from nbdev.showdoc import show_docCallbacks
Foundation
The Transformer class coordinates an ordered pipeline of Callback objects applied to a dict of DataFrames (one per sample type group) or a single DataFrame.
Callbacks run in sequence, each one modifying the DataFrame(s) held by the transformer (`tfm.dfs`, or `tfm.df` for a single DataFrame). This is how provider-specific conventions are gradually normalised to the MARIS schema.
Callback
def Callback(
):
Base class for callbacks.
PerGroupCB
def PerGroupCB(
grps:list=None, # Groups to process; None = all groups in `tfm.dfs`
):
Calls each_grp for each group in tfm.dfs; set grps to restrict to specific groups.
PerGroupCB.each_grp
def each_grp(
grp:str, # Group key e.g. `'SEAWATER'`, `'BIOTA'`
df:pd.DataFrame, # DataFrame for this group
tfm, # Parent [`Transformer`](https://franckalbinet.github.io/marisco/api/callbacks.html#transformer)
):
Override to implement per-group transformation logic.
run_cbs
def run_cbs(
cbs:List[Callback], # List of callbacks to run
obj:Any, # Object to pass to the callbacks
):
Run the callbacks in ascending `order` (not in list order), appending the docstring of each callback that has one to `obj.logs`.
Test that callbacks run in ascending `order` (regardless of their position in the list) and that only those with a docstring get logged:
class DocCB(Callback):
    "Runs second."
    order = 1  # larger `order` than DocSecondCB, so run_cbs runs this one after it

    def __call__(self, obj):
        # Intentionally a no-op: the test only checks which docstrings get logged, and in what order.
        return None
class DocSecondCB(Callback):
    "Runs first."
    order = 0  # smallest `order`, so run_cbs runs it first even though it is listed last

    def __call__(self, obj):
        # Intentionally a no-op: only the logged docstring matters here.
        return None
class NoDocCB(Callback):
    # Deliberately has no docstring, so it must not show up in `obj.logs`.
    def __call__(self, obj):
        return None
# Minimal stand-in object exposing a `logs` list; run_cbs appends the docstrings of the
# callbacks it runs to it (as the assertion below checks).
obj = type('Obj', (), {'logs': []})()
# Callbacks are passed out of order: DocSecondCB (order=0) must still run before DocCB (order=1),
# and NoDocCB, having no docstring, must not appear in the log.
run_cbs([DocCB(), NoDocCB(), DocSecondCB()], obj)
test_eq(obj.logs, ['Runs first.', 'Runs second.'])Transformer
def Transformer(
data:Union[Dict[str, pd.DataFrame], pd.DataFrame], # Data to be transformed
cbs:Optional[List[Callback]]=None, # List of callbacks to run
custom_maps:Dict=None, inplace:bool=False, # Whether to modify the dataframe(s) in place
):
Transform the dataframe(s) according to the specified callbacks.
Below are a few examples of how to use the Transformer class. First, let’s define a test callback that adds 1 to the depth:
# Dict-of-DataFrames form: the callback loops over `tfm.dfs` and adds 1 to `depth`
# in every group, assigning the new column back into each group's DataFrame.
class TestCB(Callback):
"A test callback to add 1 to the depth."
def __call__(self, tfm: Transformer):
for grp, df in tfm.dfs.items():
df['depth'] = df['depth'].apply(lambda x: x+1)And apply it to the following dataframes:
# Two groups with different columns; TestCB only touches `depth`, which both share.
dfs = {'biota': pd.DataFrame({'id': [0, 1, 2], 'species': [0, 2, 0], 'depth': [2, 3, 4]}),
'seawater': pd.DataFrame({'id': [0, 1, 2], 'depth': [3, 4, 5]})}
tfm = Transformer(dfs, cbs=[TestCB()])
# Calling the transformer runs the callbacks and returns the dict of transformed DataFrames
dfs_test = tfm()
test_eq(dfs_test['biota']['depth'].to_list(), [3, 4, 5])
test_eq(dfs_test['seawater']['depth'].to_list(), [4, 5, 6])Transformer also accepts a single pd.DataFrame (pre-split handlers like geotraces). In that case callbacks access tfm.df instead of tfm.dfs:
# Single-DataFrame form: this redefinition of TestCB reads and writes `tfm.df`
# instead of iterating over `tfm.dfs`.
class TestCB(Callback):
"A test callback to add 1 to the depth."
def __call__(self, tfm: Transformer):
tfm.df['depth'] = tfm.df['depth'].apply(lambda x: x+1)df = pd.DataFrame({'id': [0, 1, 2], 'species': [0, 2, 0], 'depth': [2, 3, 4]})
# Given a single DataFrame, calling the transformer returns a DataFrame (not a dict)
tfm = Transformer(df, cbs=[TestCB()])
df_test = tfm()
test_eq(df_test['depth'].to_list(), [3, 4, 5])Subclass PerGroupCB and override each_grp. The loop, missing-group guard, and optional scoping are inherited. Use grps to restrict to specific groups:
# All groups: stamp every group with a constant flag.
# `grps` is not restricted here, so every group in `tfm.dfs` is processed.
# (No class docstring on purpose: run_cbs logs callback docstrings.)
class AddFlagCB(PerGroupCB):
    def __init__(self, col, val):
        # fastcore's store_attr records `col` and `val` as attributes on self
        store_attr()

    def each_grp(self, grp, df, tfm):
        # Broadcast the scalar value into a new column of this group's DataFrame
        df[self.col] = self.val
# AddFlagCB does not restrict `grps`, so both groups receive the constant `flag` column
dfs = {'SEAWATER': pd.DataFrame({'depth': [1, 2]}),
'BIOTA': pd.DataFrame({'depth': [3, 4]})}
tfm = Transformer(dfs, cbs=[AddFlagCB(col='flag', val=1)])
dfs_result = tfm()
test_eq(dfs_result['SEAWATER']['flag'].to_list(), [1, 1])
test_eq(dfs_result['BIOTA']['flag'].to_list(), [1, 1])# Scoped: only BIOTA gets the column; SEAWATER is untouched
class BiotaFlagCB(PerGroupCB):
    # Class-level scoping: only the BIOTA group is handed to each_grp
    grps = ['BIOTA']

    def each_grp(self, grp, df, tfm):
        # Mark every row of the (BIOTA) group
        df['is_biota'] = True
# Reuses `dfs` from the previous example; BiotaFlagCB's class-level `grps` limits it to BIOTA
tfm2 = Transformer(dfs, cbs=[BiotaFlagCB()])
dfs_result2 = tfm2()
test_eq('is_biota' in dfs_result2['BIOTA'].columns, True)
test_eq('is_biota' in dfs_result2['SEAWATER'].columns, False)Cleaning & validation
Callbacks for cleaning and validating data: coordinate sanitisation (invalid lon/lat), and removal of rows with all-NA values in key columns.
SanitizeLonLatCB
def SanitizeLonLatCB(
lon_col:str='LON', # Longitude column name
lat_col:str='LAT', # Latitude column name
verbose:bool=False, # Whether to print the number of invalid longitude & latitude values
):
Drop rows with invalid longitude & latitude values (out of range, or located at (0, 0)). Convert `,` decimal separators to `.`.
# Check that measurements located at (0,0) get removed
dfs = {'BIOTA': pd.DataFrame({'LON': [0, 1, 0], 'LAT': [0, 2, 0]})}
tfm = Transformer(dfs, cbs=[SanitizeLonLatCB()])
# Bare expression: only displays the result when run as a notebook cell
tfm()['BIOTA']
# Only the (1, 2) row should survive, so it becomes the first row
expected = [1., 2.]
# NOTE(review): tfm() is called a second time here; this relies on repeated calls giving
# the same result — confirm that is guaranteed rather than incidental.
test_eq(tfm()['BIOTA'].iloc[0].to_list(), expected)# Check that comma decimal separators get replaced by points
dfs = {'BIOTA': pd.DataFrame({'LON': ['45,2'], 'LAT': ['43,1']})}
tfm = Transformer(dfs, cbs=[SanitizeLonLatCB()])
tfm()['BIOTA']
# String coordinates with ',' decimals must come back as floats
expected = [45.2, 43.1]
test_eq(tfm()['BIOTA'].iloc[0].to_list(), expected)# Check that out of bounds lon or lat get removed
# Rows 1-4 each have a coordinate beyond ±180 (lon) or ±90 (lat); only the last row is valid.
dfs = {'BIOTA': pd.DataFrame({'LON': [-190, 190, 1, 2, 1.1], 'LAT': [1, 2, 91, -91, 2.2]})}
tfm = Transformer(dfs, cbs=[SanitizeLonLatCB()])
tfm()['BIOTA']
expected = [1.1, 2.2]
test_eq(tfm()['BIOTA'].iloc[0].to_list(), expected)Value mapping
RemapCB
def RemapCB(
lut:dict | Callable, # Lookup: dict, or callable(dfs)->dict
col_remap:str, # Destination column to create
col_src:str, # Source column with provider values
default_val:int=0, # Value assigned to unmapped source values
grps:list[str]=None, # Groups to process (None = all)
):
Remap source values to MARIS standard identifiers using a lookup table.
RemapCB
def RemapCB(
lut:dict | Callable, # Lookup: dict, or callable(dfs)->dict
col_remap:str, # Destination column to create
col_src:str, # Source column with provider values
default_val:int=0, # Value assigned to unmapped source values
grps:list[str]=None, # Groups to process (None = all)
):
Remap source values to MARIS standard identifiers using a lookup table.
Here are tests demonstrating various RemapCB behaviours: basic mapping, default fallback, whitespace handling, group scoping, custom default values, and lazy LUT resolution via callable factories.
# Basic remapping: values present in the LUT get mapped correctly
lut_dict = {'Cs-137': 1, 'K-40': 2, 'Sr-90': 3}
dfs = {'SEAWATER': pd.DataFrame({'nuclide': ['Cs-137', 'K-40', 'Sr-90']})}
tfm = Transformer(dfs, cbs=[RemapCB(lut=lut_dict, col_remap='NUCLIDE_ID', col_src='nuclide')])
result = tfm()['SEAWATER']
test_eq(result['NUCLIDE_ID'].to_list(), [1, 2, 3])# Default fallback: values NOT in the LUT get default_val (0 unless overridden)
dfs = {'SEAWATER': pd.DataFrame({'nuclide': ['Cs-137', 'Am-241']})}
tfm = Transformer(dfs, cbs=[RemapCB(lut=lut_dict, col_remap='NUCLIDE_ID', col_src='nuclide')])
result = tfm()['SEAWATER']
test_eq(result['NUCLIDE_ID'].to_list(), [1, 0])# Group scoping: only BIOTA gets remapped, SEAWATER untouched
dfs = {'SEAWATER': pd.DataFrame({'nuclide': ['Cs-137']}),
'BIOTA': pd.DataFrame({'nuclide': ['K-40']})}
tfm = Transformer(dfs, cbs=[RemapCB(lut=lut_dict, col_remap='NUCLIDE_ID', col_src='nuclide',
grps=['BIOTA'])])
result = tfm()
# SEAWATER is excluded by `grps`, so no destination column is created there
test_eq('NUCLIDE_ID' in result['SEAWATER'].columns, False)
test_eq(result['BIOTA']['NUCLIDE_ID'].to_list(), [2])# Custom default_val
dfs = {'SEAWATER': pd.DataFrame({'nuclide': ['Am-241']})}
tfm = Transformer(dfs, cbs=[RemapCB(lut=lut_dict, col_remap='NUCLIDE_ID', col_src='nuclide',
default_val=-1)])
result = tfm()['SEAWATER']
test_eq(result['NUCLIDE_ID'].to_list(), [-1])from marisco.match import make_lut_from
# Provider supplies their own nuclide codes
prov_nomencl = pd.DataFrame({
'code': ['h-3', 'k-40'],
'maris_ref': ['h3', 'k40']
})
# LUT keyed on the provider's `code` column, matched to MARIS via `maris_ref`
# (the expected chain is spelled out in the comment before the assertion below)
nuclide_lut = make_lut_from(prov_nomencl, key_col='code', match_col='maris_ref', lut_key='NUCLIDE')
dfs = {'SEAWATER': pd.DataFrame({'NUCLIDE': ['h-3', 'k-40']})}
tfm = Transformer(dfs, cbs=[RemapCB(lut=nuclide_lut, col_remap='NUCLIDE_ID', col_src='NUCLIDE')])
result = tfm()['SEAWATER']
# h-3 → h3 → nuclide_id=1, k-40 → k40 → nuclide_id=4
test_eq(result['NUCLIDE_ID'].to_list(), [1, 4])# Only BIOTA gets remapped
dfs = {'SEAWATER': pd.DataFrame({'NUCLIDE': ['h-3']}),
'BIOTA': pd.DataFrame({'NUCLIDE': ['k-40']})}
tfm = Transformer(dfs, cbs=[RemapCB(lut=nuclide_lut, col_remap='NUCLIDE_ID', col_src='NUCLIDE',
grps=['BIOTA'])])
result = tfm()
test_eq('NUCLIDE_ID' in result['SEAWATER'].columns, False)
test_eq(result['BIOTA']['NUCLIDE_ID'].to_list(), [4])from marisco.match import make_lut
# LUT built from the MARIS nomenclature itself; presumably keyed on MARIS nuclide names,
# since the source value 'k40' is expected to map to 4 below
nuclide_lut = make_lut('NUCLIDE')
dfs = {'SEAWATER': pd.DataFrame({'NUCLIDE': ['h3']}),
'BIOTA': pd.DataFrame({'NUCLIDE': ['k40']})}
tfm = Transformer(dfs, cbs=[RemapCB(lut=nuclide_lut, col_remap='NUCLIDE_ID', col_src='NUCLIDE',
grps=['BIOTA'])])
result = tfm()
test_eq('NUCLIDE_ID' in result['SEAWATER'].columns, False)
test_eq(result['BIOTA']['NUCLIDE_ID'].to_list(), [4])LowerStripNameCB
def LowerStripNameCB(
col_src:str, # Source column name e.g. 'Nuclide'
col_dst:str=None, # Destination column name
fn_transform:Callable=<lambda>, # Transformation function
):
Convert values to lowercase and strip any trailing spaces.
Let’s test the callback:
dfs = {'seawater': pd.DataFrame({'Nuclide': ['CS137', '226RA']})}
# With col_dst: lowercased values are written to a new 'NUCLIDE' column
tfm = Transformer(dfs, cbs=[LowerStripNameCB(col_src='Nuclide', col_dst='NUCLIDE')])
test_eq(tfm()['seawater']['NUCLIDE'].to_list(), ['cs137', '226ra'])
# Without col_dst: the lowercased values replace those in the source column 'Nuclide'
tfm = Transformer(dfs, cbs=[LowerStripNameCB(col_src='Nuclide')])
test_eq(tfm()['seawater']['Nuclide'].to_list(), ['cs137', '226ra'])Schema alignment
AddSampleTypeIdColumnCB
def AddSampleTypeIdColumnCB(
lut:dict={'SEAWATER': 1, 'BIOTA': 2, 'SEDIMENT': 3, 'SUSPENDED_MATTER': 4}, # Lookup table for sample type
col_name:str='SAMPLE_TYPE', # Column name to store the sample type id
):
Add a column with the sample type as defined in the CDL.
Let’s test the callback:
# One DataFrame per sample type listed in SMP_TYPE_LUT
dfs = {smp_type: pd.DataFrame({'col_test': [0, 1, 2]}) for smp_type in SMP_TYPE_LUT.keys()};
tfm = Transformer(dfs, cbs=[AddSampleTypeIdColumnCB()])
dfs_test = tfm()
# Every row of a group must carry that group's id from SMP_TYPE_LUT
# (`.item()` fails unless exactly one unique value is present)
for smp_type in SMP_TYPE_LUT.keys():
test_eq(dfs_test[smp_type]['SAMPLE_TYPE'].unique().item(), SMP_TYPE_LUT[smp_type])RenameColumnsCB
def RenameColumnsCB(
renaming_rules:dict, # Renaming rules {old_name: new_name}
):
Rename variables to MARIS standard names, keeping only renamed columns.
# RenameColumnsCB renames and selects: only renamed columns survive
dfs = {'SEAWATER': pd.DataFrame({'a': [1, 2], 'b': [3, 4], 'c': [5, 6]})}
rules = {'a': 'ALPHA', 'b': 'BETA'}
tfm = Transformer(dfs, cbs=[RenameColumnsCB(rules)])
result = tfm()['SEAWATER']
test_eq(list(result.columns), ['ALPHA', 'BETA'])
test_eq(result['ALPHA'].to_list(), [1, 2])
# Column 'c' is dropped: not in the renaming rules
test_eq('c' in result.columns, False)
# Works across multiple groups
dfs = {'SEAWATER': pd.DataFrame({'a': [1], 'b': [2]}),
'BIOTA': pd.DataFrame({'a': [3], 'b': [4]})}
# A single rule: 'b' is dropped from both groups
rules = {'a': 'VAL'}
tfm = Transformer(dfs, cbs=[RenameColumnsCB(rules)])
result = tfm()
test_eq(list(result['SEAWATER'].columns), ['VAL'])
test_eq(list(result['BIOTA'].columns), ['VAL'])
test_eq(result['SEAWATER']['VAL'].iloc[0], 1)
test_eq(result['BIOTA']['VAL'].iloc[0], 3)Comparison & audit
CompareDfsAndTfmCB
def CompareDfsAndTfmCB(
dfs:Dict[str, pd.DataFrame], # Original dataframes
):
Create a dataframe of removed data and track changes in row counts due to transformations.
CompareDfsAndTfmCB compares original vs. transformed dataframes. It creates:
- `tfm.dfs_removed`: data present in the original but absent after transformation
- `tfm.compare_stats`: row count summary per group
UniqueIndexCB
def UniqueIndexCB(
index_name:str='ID'
):
Set unique index for each group.
Test UniqueIndexCB: sequential IDs per group, starting from 0 independently for each group.
# Test UniqueIndexCB: each group gets a sequential ID starting from 0,
# numbered independently of the other groups (group lengths differ on purpose)
dfs = {'SEAWATER': pd.DataFrame({'val': [10, 20, 30]}),
'SEDIMENT': pd.DataFrame({'val': [100, 200]})}
tfm = Transformer(dfs, cbs=[UniqueIndexCB()])
result = tfm()
# 'ID' is the default index_name
test_eq(result['SEAWATER']['ID'].to_list(), [0, 1, 2])
test_eq(result['SEDIMENT']['ID'].to_list(), [0, 1])Time
Callbacks for parsing, encoding, and decoding time columns to/from NetCDF-compatible numeric values.
ParseTimeCB
def ParseTimeCB(
time_col_name:str='TIME'
):
Parse the time column from ISO 8601 strings to datetimes.
# ISO 8601 strings in two groups; ParseTimeCB is used with its default time_col_name='TIME'
dfs_test = {
'SEAWATER': pd.DataFrame({'TIME': ['2023-01-01T00:00:00', '2023-06-15T12:30:00']}),
'BIOTA': pd.DataFrame({'TIME': ['2010-03-22T08:00:00']}),
}
tfm = Transformer(dfs_test, cbs=[ParseTimeCB()])
dfs_result = tfm()
# dtype kind 'M' is numpy's datetime64 kind
test_eq(dfs_result['SEAWATER']['TIME'].dtype.kind, 'M')
test_eq(dfs_result['BIOTA']['TIME'].dtype.kind, 'M')EncodeTimeCB
def EncodeTimeCB(
col_time:str='TIME', # Time column name
verbose:bool=False, # Print warning about missing time values
fn_units:Callable=get_time_units, # Function returning the time units
):
Encode datetime values as integers in the time units returned by `fn_units` (e.g. seconds since epoch).
# pd.Timestamp inputs for two groups
dfs_test = {
'SEAWATER': pd.DataFrame({
'TIME': [pd.Timestamp(f'2023-01-0{t}') for t in [1, 2]],
'value': [1, 2]
}),
'SEDIMENT': pd.DataFrame({
'TIME': [pd.Timestamp(f'2023-01-0{t}') for t in [3, 4]],
'value': [3, 4]
}),
}
# Fixed units injected through fn_units, so the expected values below use the same reference
units = 'seconds since 1970-01-01 00:00:00.0'
# inplace=False keeps dfs_test's original Timestamps for computing the expected values
tfm = Transformer(dfs_test, cbs=[
EncodeTimeCB(fn_units=lambda: units)
], inplace=False)
dfs_result = tfm()
test_eq(dfs_result['SEAWATER'].TIME.dtype, 'int64')
test_eq(dfs_result['SEDIMENT'].TIME.dtype, 'int64')
# Encoded values must equal date2num applied element-wise with the same units
test_eq(dfs_result['SEAWATER'].TIME, dfs_test['SEAWATER'].TIME.apply(lambda x: date2num(x, units=units)))
test_eq(dfs_result['SEDIMENT'].TIME, dfs_test['SEDIMENT'].TIME.apply(lambda x: date2num(x, units=units)))DecodeTimeCB
def DecodeTimeCB(
col_time:str='TIME', fn_units:Callable=get_time_units, # Function returning the time units
):
Decode time from seconds since epoch to datetime format.
# Integer epoch seconds (the encoded form) to be decoded back to datetimes
dfs_test = {
'SEAWATER': pd.DataFrame({
'TIME': [1672531200, 1672617600], # 2023-01-01, 2023-01-02 in seconds since epoch
'value': [1, 2]
}),
'SEDIMENT': pd.DataFrame({
'TIME': [1672704000, 1672790400], # 2023-01-03, 2023-01-04 in seconds since epoch
'value': [3, 4]
}),
}
units = 'seconds since 1970-01-01 00:00:00.0'
tfm = Transformer(dfs_test, cbs=[
DecodeTimeCB(fn_units=lambda: units)
], inplace=False)
dfs_result = tfm()
# Test that times were converted to datetime
test_eq(dfs_result['SEAWATER'].TIME.dtype.kind, 'M')
test_eq(dfs_result['SEDIMENT'].TIME.dtype.kind, 'M')
# Test specific datetime values (only the calendar date is compared, not the time of day)
expected_times_seawater = pd.to_datetime(['2023-01-01', '2023-01-02'])
expected_times_sediment = pd.to_datetime(['2023-01-03', '2023-01-04'])
test_eq(dfs_result['SEAWATER'].TIME.dt.date, expected_times_seawater.date)
test_eq(dfs_result['SEDIMENT'].TIME.dt.date, expected_times_sediment.date)