Callbacks

Callbacks used by handlers to transform raw provider dataframes through an ordered pipeline of transformations.
from nbdev.showdoc import show_doc

Foundation

The Transformer class coordinates an ordered pipeline of Callback objects applied to a dict of DataFrames (one per sample type group) or a single DataFrame.

Each callback modifies its group’s DataFrame in sequence. This is how provider-specific conventions are gradually normalised to the MARIS schema.


source

Callback


def Callback(
    
):

Base class for callbacks.


source

PerGroupCB


def PerGroupCB(
    grps:list=None, # Groups to process; None = all groups in `tfm.dfs`
):

Calls each_grp for each group in tfm.dfs; set grps to restrict to specific groups.


source

PerGroupCB.each_grp


def each_grp(
    grp:str, # Group key e.g. `'SEAWATER'`, `'BIOTA'`
    df:pd.DataFrame, # DataFrame for this group
    tfm, # Parent [`Transformer`](https://franckalbinet.github.io/marisco/api/callbacks.html#transformer)
):

Override to implement per-group transformation logic.


source

run_cbs


def run_cbs(
    cbs:List[Callback], # List of callbacks to run
    obj:Any, # Object to pass to the callbacks
):

Run the callbacks in the order they are specified.

Testing that callbacks run in order and only those with docstrings get logged:

class DocCB(Callback):
    "Runs second."
    order = 1

    def __call__(self, obj):
        pass


class DocSecondCB(Callback):
    "Runs first."
    order = 0

    def __call__(self, obj):
        pass


class NoDocCB(Callback):
    # Intentionally has no docstring: the assertion below expects no log entry for it.
    def __call__(self, obj):
        pass


# Minimal stand-in target exposing only a `logs` list.
class Obj:
    logs = []


obj = Obj()

# Callbacks are deliberately supplied out of their `order` sequence.
run_cbs([DocCB(), NoDocCB(), DocSecondCB()], obj)
test_eq(obj.logs, ['Runs first.', 'Runs second.'])

source

Transformer


def Transformer(
    data:Union[Dict[str, pd.DataFrame], pd.DataFrame], # Data to be transformed
    cbs:Optional[List[Callback]]=None, # List of callbacks to run
    custom_maps:Dict=None, inplace:bool=False, # Whether to modify the dataframe(s) in place
):

Transform the dataframe(s) according to the specified callbacks.

Below are a few examples of how to use the Transformer class. Let’s first define a test callback that adds 1 to the depth:

class TestCB(Callback):
    "A test callback to add 1 to the depth."
    def __call__(self, tfm: Transformer):
        # The group key is not needed here, so iterate over the frames only.
        for frame in tfm.dfs.values():
            frame['depth'] = frame['depth'].apply(lambda depth: depth + 1)

And apply it to the following dataframes:

dfs = {
    'biota': pd.DataFrame({'id': [0, 1, 2], 'species': [0, 2, 0], 'depth': [2, 3, 4]}),
    'seawater': pd.DataFrame({'id': [0, 1, 2], 'depth': [3, 4, 5]}),
}

# Run the one-callback pipeline; each depth should come back incremented by one.
tfm = Transformer(dfs, cbs=[TestCB()])
dfs_test = tfm()

for grp, depths in (('biota', [3, 4, 5]), ('seawater', [4, 5, 6])):
    test_eq(dfs_test[grp]['depth'].to_list(), depths)

Transformer also accepts a single pd.DataFrame (pre-split handlers like geotraces). In that case callbacks access tfm.df instead of tfm.dfs:

class TestCB(Callback):
    "A test callback to add 1 to the depth."
    def __call__(self, tfm: Transformer):
        # Single-DataFrame mode: the data lives on `tfm.df` rather than `tfm.dfs`.
        bumped = tfm.df['depth'].apply(lambda depth: depth + 1)
        tfm.df['depth'] = bumped


df = pd.DataFrame({'id': [0, 1, 2], 'species': [0, 2, 0], 'depth': [2, 3, 4]})

tfm = Transformer(df, cbs=[TestCB()])
df_test = tfm()

test_eq(df_test['depth'].to_list(), [3, 4, 5])

Subclass PerGroupCB and override each_grp. The loop, missing-group guard, and optional scoping are inherited. Use grps to restrict to specific groups:

# All groups: stamp every group with a constant flag
class AddFlagCB(PerGroupCB):
    def __init__(self, col, val):
        store_attr()

    def each_grp(self, grp, df, tfm):
        df[self.col] = self.val


dfs = {
    'SEAWATER': pd.DataFrame({'depth': [1, 2]}),
    'BIOTA': pd.DataFrame({'depth': [3, 4]}),
}

tfm = Transformer(dfs, cbs=[AddFlagCB(col='flag', val=1)])
dfs_result = tfm()
for grp in ('SEAWATER', 'BIOTA'):
    test_eq(dfs_result[grp]['flag'].to_list(), [1, 1])


# Scoped: only BIOTA gets the column; SEAWATER is untouched
class BiotaFlagCB(PerGroupCB):
    grps = ['BIOTA']

    def each_grp(self, grp, df, tfm):
        df['is_biota'] = True


tfm2 = Transformer(dfs, cbs=[BiotaFlagCB()])
dfs_result2 = tfm2()
for grp, has_flag in (('BIOTA', True), ('SEAWATER', False)):
    test_eq('is_biota' in dfs_result2[grp].columns, has_flag)

Cleaning & validation

Callbacks for cleaning and validating data: coordinate sanitisation (invalid lon/lat), and removal of rows with all-NA values in key columns.


source

SanitizeLonLatCB


def SanitizeLonLatCB(
    lon_col:str='LON', # Longitude column name
    lat_col:str='LAT', # Latitude column name
    verbose:bool=False, # Whether to print the number of invalid longitude & latitude values
):

Drop rows with invalid longitude & latitude values. Convert a `,` decimal separator to `.`.

# Check that measurements located at (0,0) get removed
dfs = {'BIOTA': pd.DataFrame({'LON': [0, 1, 0], 'LAT': [0, 2, 0]})}
tfm = Transformer(dfs, cbs=[SanitizeLonLatCB()])
# Bare expression (presumably kept for notebook display); note it runs the
# pipeline once here, and the assertion below runs it a second time.
tfm()['BIOTA']

# Only the middle row (LON=1, LAT=2) is expected to survive.
expected = [1., 2.]
test_eq(tfm()['BIOTA'].iloc[0].to_list(), expected)
# Check that a comma decimal separator gets replaced by a point
dfs = {'BIOTA': pd.DataFrame({'LON': ['45,2'], 'LAT': ['43,1']})}
tfm = Transformer(dfs, cbs=[SanitizeLonLatCB()])
# Bare expression (presumably kept for notebook display); runs the pipeline once.
tfm()['BIOTA']

# The string inputs are expected to come back as floats.
expected = [45.2, 43.1]
test_eq(tfm()['BIOTA'].iloc[0].to_list(), expected)
# Check that out-of-bounds lon or lat values get removed
dfs = {'BIOTA': pd.DataFrame({'LON': [-190, 190, 1, 2, 1.1], 'LAT': [1, 2, 91, -91, 2.2]})}
tfm = Transformer(dfs, cbs=[SanitizeLonLatCB()])
# Bare expression (presumably kept for notebook display); runs the pipeline once.
tfm()['BIOTA']

# Only the last row has both coordinates in range, so it becomes the first row.
expected = [1.1, 2.2]
test_eq(tfm()['BIOTA'].iloc[0].to_list(), expected)

Value mapping


source

RemapCB


def RemapCB(
    lut:dict | Callable, # Lookup: dict, or callable(dfs)->dict
    col_remap:str, # Destination column to create
    col_src:str, # Source column with provider values
    default_val:int=0, # Value assigned to unmapped source values
    grps:list[str]=None, # Groups to process (None = all)
):

Remap source values to MARIS standard identifiers using a lookup table.


source

RemapCB


def RemapCB(
    lut:dict | Callable, # Lookup: dict, or callable(dfs)->dict
    col_remap:str, # Destination column to create
    col_src:str, # Source column with provider values
    default_val:int=0, # Value assigned to unmapped source values
    grps:list[str]=None, # Groups to process (None = all)
):

Remap source values to MARIS standard identifiers using a lookup table.

Here are tests demonstrating various RemapCB behaviours: basic mapping, default fallback, whitespace handling, group scoping, custom default values, and lazy LUT resolution via callable factories.

# Basic remapping: values present in the LUT get mapped correctly
lut_dict = {'Cs-137': 1, 'K-40': 2, 'Sr-90': 3}
dfs = {'SEAWATER': pd.DataFrame({'nuclide': ['Cs-137', 'K-40', 'Sr-90']})}
remap_cb = RemapCB(lut=lut_dict, col_remap='NUCLIDE_ID', col_src='nuclide')
tfm = Transformer(dfs, cbs=[remap_cb])
result = tfm()['SEAWATER']
test_eq(result['NUCLIDE_ID'].to_list(), [1, 2, 3])

# Default fallback: values NOT in the LUT get default_val (0)
dfs = {'SEAWATER': pd.DataFrame({'nuclide': ['Cs-137', 'Am-241']})}
remap_cb = RemapCB(lut=lut_dict, col_remap='NUCLIDE_ID', col_src='nuclide')
tfm = Transformer(dfs, cbs=[remap_cb])
result = tfm()['SEAWATER']
test_eq(result['NUCLIDE_ID'].to_list(), [1, 0])

# Group scoping: only BIOTA gets remapped, SEAWATER untouched
dfs = {
    'SEAWATER': pd.DataFrame({'nuclide': ['Cs-137']}),
    'BIOTA': pd.DataFrame({'nuclide': ['K-40']}),
}
remap_cb = RemapCB(
    lut=lut_dict,
    col_remap='NUCLIDE_ID',
    col_src='nuclide',
    grps=['BIOTA'],
)
tfm = Transformer(dfs, cbs=[remap_cb])
result = tfm()
test_eq('NUCLIDE_ID' in result['SEAWATER'].columns, False)
test_eq(result['BIOTA']['NUCLIDE_ID'].to_list(), [2])

# Custom default_val
dfs = {'SEAWATER': pd.DataFrame({'nuclide': ['Am-241']})}
remap_cb = RemapCB(
    lut=lut_dict,
    col_remap='NUCLIDE_ID',
    col_src='nuclide',
    default_val=-1,
)
tfm = Transformer(dfs, cbs=[remap_cb])
result = tfm()['SEAWATER']
test_eq(result['NUCLIDE_ID'].to_list(), [-1])
from marisco.match import make_lut_from

# Provider supplies their own nuclide codes
prov_nomencl = pd.DataFrame({
    'code': ['h-3', 'k-40'],
    'maris_ref': ['h3', 'k40'],
})

nuclide_lut = make_lut_from(
    prov_nomencl,
    key_col='code',
    match_col='maris_ref',
    lut_key='NUCLIDE',
)

dfs = {'SEAWATER': pd.DataFrame({'NUCLIDE': ['h-3', 'k-40']})}
remap_cb = RemapCB(lut=nuclide_lut, col_remap='NUCLIDE_ID', col_src='NUCLIDE')
tfm = Transformer(dfs, cbs=[remap_cb])
result = tfm()['SEAWATER']
# h-3 → h3 → nuclide_id=1, k-40 → k40 → nuclide_id=4
test_eq(result['NUCLIDE_ID'].to_list(), [1, 4])

# Only BIOTA gets remapped
dfs = {
    'SEAWATER': pd.DataFrame({'NUCLIDE': ['h-3']}),
    'BIOTA': pd.DataFrame({'NUCLIDE': ['k-40']}),
}
remap_cb = RemapCB(
    lut=nuclide_lut,
    col_remap='NUCLIDE_ID',
    col_src='NUCLIDE',
    grps=['BIOTA'],
)
tfm = Transformer(dfs, cbs=[remap_cb])
result = tfm()
test_eq('NUCLIDE_ID' in result['SEAWATER'].columns, False)
test_eq(result['BIOTA']['NUCLIDE_ID'].to_list(), [4])

from marisco.match import make_lut

# LUT built via make_lut('NUCLIDE'); the source values here are 'h3' / 'k40',
# and the callback is again scoped to BIOTA.
nuclide_lut = make_lut('NUCLIDE')

dfs = {
    'SEAWATER': pd.DataFrame({'NUCLIDE': ['h3']}),
    'BIOTA': pd.DataFrame({'NUCLIDE': ['k40']}),
}
remap_cb = RemapCB(
    lut=nuclide_lut,
    col_remap='NUCLIDE_ID',
    col_src='NUCLIDE',
    grps=['BIOTA'],
)
tfm = Transformer(dfs, cbs=[remap_cb])
result = tfm()
test_eq('NUCLIDE_ID' in result['SEAWATER'].columns, False)
test_eq(result['BIOTA']['NUCLIDE_ID'].to_list(), [4])

source

LowerStripNameCB


def LowerStripNameCB(
    col_src:str, # Source column name e.g. 'Nuclide'
    col_dst:str=None, # Destination column name
    fn_transform:Callable=<lambda>, # Transformation function
):

Convert values to lowercase and strip any trailing spaces.

Let’s test the callback:

dfs = {'seawater': pd.DataFrame({'Nuclide': ['CS137', '226RA']})}
lowered = ['cs137', '226ra']

# With an explicit destination column, the result is read back from `NUCLIDE`.
to_new_col = LowerStripNameCB(col_src='Nuclide', col_dst='NUCLIDE')
tfm = Transformer(dfs, cbs=[to_new_col])
test_eq(tfm()['seawater']['NUCLIDE'].to_list(), lowered)

# Without a destination, the result is read back from the source column itself.
same_col = LowerStripNameCB(col_src='Nuclide')
tfm = Transformer(dfs, cbs=[same_col])
test_eq(tfm()['seawater']['Nuclide'].to_list(), lowered)

Schema alignment


source

AddSampleTypeIdColumnCB


def AddSampleTypeIdColumnCB(
    lut:dict={'SEAWATER': 1, 'BIOTA': 2, 'SEDIMENT': 3, 'SUSPENDED_MATTER': 4}, # Lookup table for sample type
    col_name:str='SAMPLE_TYPE', # Column name to store the sample type id
):

Add a column with the sample type as defined in the CDL.

Let’s test the callback:

# One small frame per sample type known to SMP_TYPE_LUT.
dfs = {
    smp_type: pd.DataFrame({'col_test': [0, 1, 2]})
    for smp_type in SMP_TYPE_LUT.keys()
}

tfm = Transformer(dfs, cbs=[AddSampleTypeIdColumnCB()])
dfs_test = tfm()

# Each group must carry exactly one SAMPLE_TYPE value, equal to its LUT id.
for smp_type in SMP_TYPE_LUT.keys():
    sample_type_ids = dfs_test[smp_type]['SAMPLE_TYPE'].unique()
    test_eq(sample_type_ids.item(), SMP_TYPE_LUT[smp_type])

source

RenameColumnsCB


def RenameColumnsCB(
    renaming_rules:dict, # Renaming rules {old_name: new_name}
):

Rename variables to MARIS standard names, keeping only renamed columns.

# RenameColumnsCB now also selects: only renamed columns survive
dfs = {'SEAWATER': pd.DataFrame({'a': [1, 2], 'b': [3, 4], 'c': [5, 6]})}
rules = {'a': 'ALPHA', 'b': 'BETA'}
rename_cb = RenameColumnsCB(rules)
tfm = Transformer(dfs, cbs=[rename_cb])
result = tfm()['SEAWATER']
test_eq(list(result.columns), ['ALPHA', 'BETA'])
test_eq(result['ALPHA'].to_list(), [1, 2])

# Column 'c' is dropped: not in the renaming rules
test_eq('c' in result.columns, False)

# Works across multiple groups
dfs = {
    'SEAWATER': pd.DataFrame({'a': [1], 'b': [2]}),
    'BIOTA': pd.DataFrame({'a': [3], 'b': [4]}),
}
rules = {'a': 'VAL'}
rename_cb = RenameColumnsCB(rules)
tfm = Transformer(dfs, cbs=[rename_cb])
result = tfm()
for grp in ('SEAWATER', 'BIOTA'):
    test_eq(list(result[grp].columns), ['VAL'])
for grp, first_val in (('SEAWATER', 1), ('BIOTA', 3)):
    test_eq(result[grp]['VAL'].iloc[0], first_val)

source

RemoveAllNAValuesCB


def RemoveAllNAValuesCB(
    cols_to_check:Union[Dict[str, list], list], # Dict or list of columns to check
    how:str='all', # How to handle NA values 'all' or 'any'
):

Remove rows with all NA values in specified columns.

Tests for list and dict cols_to_check inputs: rows where all specified columns are NA are removed, per group.

result
{'SEAWATER':    VAL
 0    1,
 'BIOTA':    VAL
 0    3}
# List input: only row 'c' has all NA in subset cols → it's dropped, row 'b' survives (uncertainty=0.2)
dfs = {
    'SEAWATER': pd.DataFrame({
        'value': [1.0, np.nan, np.nan, 4.0],
        'uncertainty': [0.1, 0.2, np.nan, 0.4],
        'meta': ['a', 'b', 'c', 'd'],
    }),
}
drop_na_cb = RemoveAllNAValuesCB(cols_to_check=['value', 'uncertainty'], how='all')
tfm = Transformer(dfs, cbs=[drop_na_cb])
result = tfm()['SEAWATER']
test_eq(len(result), 3)
test_eq(result['meta'].to_list(), ['a', 'b', 'd'])

# Dict input: same behaviour, per-group column lists
dfs = {
    'SEAWATER': pd.DataFrame({'value': [1.0, np.nan], 'uncertainty': [0.1, np.nan]}),
    'SEDIMENT': pd.DataFrame({'value': [np.nan, 2.0], 'uncertainty': [np.nan, 0.2]}),
}
cols_by_grp = {
    'SEAWATER': ['value', 'uncertainty'],
    'SEDIMENT': ['value', 'uncertainty'],
}
drop_na_cb = RemoveAllNAValuesCB(cols_to_check=cols_by_grp, how='all')
tfm2 = Transformer(dfs, cbs=[drop_na_cb])
result2 = tfm2()
for grp in ('SEAWATER', 'SEDIMENT'):
    test_eq(len(result2[grp]), 1)

Comparison & audit


source

CompareDfsAndTfmCB


def CompareDfsAndTfmCB(
    dfs:Dict[str, pd.DataFrame], # Original dataframes
):

Create a dataframe of removed data and track changes in row counts due to transformations.

CompareDfsAndTfmCB compares original vs. transformed dataframes. It creates:

  • tfm.dfs_removed: data present in the original but absent after transformation
  • tfm.compare_stats: row count summary per group

source

UniqueIndexCB


def UniqueIndexCB(
    index_name:str='ID'
):

Set unique index for each group.

Test UniqueIndexCB: sequential IDs per group, starting from 0 independently for each group.

# Test UniqueIndexCB: check sequential ID per group, starting from
# 0 for each group independently
dfs = {
    'SEAWATER': pd.DataFrame({'val': [10, 20, 30]}),
    'SEDIMENT': pd.DataFrame({'val': [100, 200]}),
}

tfm = Transformer(dfs, cbs=[UniqueIndexCB()])
result = tfm()

for grp, ids in (('SEAWATER', [0, 1, 2]), ('SEDIMENT', [0, 1])):
    test_eq(result[grp]['ID'].to_list(), ids)

Time

Callbacks for parsing, encoding, and decoding time columns to/from NetCDF-compatible numeric values.


source

ParseTimeCB


def ParseTimeCB(
    time_col_name:str='TIME'
):

Parse time column from ISO8601 string to datetime.

dfs_test = {
    'SEAWATER': pd.DataFrame({'TIME': ['2023-01-01T00:00:00', '2023-06-15T12:30:00']}),
    'BIOTA': pd.DataFrame({'TIME': ['2010-03-22T08:00:00']}),
}
tfm = Transformer(dfs_test, cbs=[ParseTimeCB()])
dfs_result = tfm()

# dtype kind 'M' is NumPy's code for a datetime type.
for grp in ('SEAWATER', 'BIOTA'):
    test_eq(dfs_result[grp]['TIME'].dtype.kind, 'M')

source

EncodeTimeCB


def EncodeTimeCB(
    col_time:str='TIME', # Time column name
    verbose:bool=False, # Print warning about missing time values
    fn_units:Callable=get_time_units, # Function returning the time units
):

Encode time as seconds since epoch.

dfs_test = {
    'SEAWATER': pd.DataFrame({
        'TIME': [pd.Timestamp(f'2023-01-0{t}') for t in [1, 2]],
        'value': [1, 2],
    }),
    'SEDIMENT': pd.DataFrame({
        'TIME': [pd.Timestamp(f'2023-01-0{t}') for t in [3, 4]],
        'value': [3, 4],
    }),
}

units = 'seconds since 1970-01-01 00:00:00.0'
encode_cb = EncodeTimeCB(fn_units=lambda: units)
tfm = Transformer(dfs_test, cbs=[encode_cb], inplace=False)
dfs_result = tfm()

# The encoded column must be integer-typed in every group...
for grp in ('SEAWATER', 'SEDIMENT'):
    test_eq(dfs_result[grp].TIME.dtype, 'int64')

# ...and match a direct date2num conversion of the untouched input frames.
for grp in ('SEAWATER', 'SEDIMENT'):
    reference = dfs_test[grp].TIME.apply(lambda x: date2num(x, units=units))
    test_eq(dfs_result[grp].TIME, reference)

source

DecodeTimeCB


def DecodeTimeCB(
    col_time:str='TIME', fn_units:Callable=get_time_units, # Function returning the time units
):

Decode time from seconds since epoch to datetime format.

dfs_test = {
    'SEAWATER': pd.DataFrame({
        'TIME': [1672531200, 1672617600],  # 2023-01-01, 2023-01-02 in seconds since epoch
        'value': [1, 2],
    }),
    'SEDIMENT': pd.DataFrame({
        'TIME': [1672704000, 1672790400],  # 2023-01-03, 2023-01-04 in seconds since epoch
        'value': [3, 4],
    }),
}

units = 'seconds since 1970-01-01 00:00:00.0'
decode_cb = DecodeTimeCB(fn_units=lambda: units)
tfm = Transformer(dfs_test, cbs=[decode_cb], inplace=False)
dfs_result = tfm()

# Test that times were converted to datetime
for grp in ('SEAWATER', 'SEDIMENT'):
    test_eq(dfs_result[grp].TIME.dtype.kind, 'M')

# Test specific datetime values
expected_times_seawater = pd.to_datetime(['2023-01-01', '2023-01-02'])
expected_times_sediment = pd.to_datetime(['2023-01-03', '2023-01-04'])

expected_by_grp = (
    ('SEAWATER', expected_times_seawater),
    ('SEDIMENT', expected_times_sediment),
)
for grp, expected_times in expected_by_grp:
    test_eq(dfs_result[grp].TIME.dt.date, expected_times.date)