Exported source
class Callback():
    "Base class for callbacks."
    order = 0
The `Transformer` class is designed to facilitate the application of a series of callbacks to a set of dataframes. It provides a structured way to apply transformations (i.e. `Callback`s) to the data, with a focus on flexibility and ease of use.
Callback ()
Base class for callbacks.
run_cbs (cbs:List[__main__.Callback], obj:Any)
Run the callbacks in the order they are specified.
| | Type | Details |
|---|---|---|
| cbs | List | List of callbacks to run |
| obj | Any | Object to pass to the callbacks |
Transformer (data:Union[Dict[str,pandas.core.frame.DataFrame],pandas.core.frame.DataFrame], cbs:Optional[List[__main__.Callback]]=None, custom_maps:Dict=None, inplace:bool=False)
Transform the dataframe(s) according to the specified callbacks.
| | Type | Default | Details |
|---|---|---|---|
| data | Union | | Data to be transformed |
| cbs | Optional | None | List of callbacks to run |
| custom_maps | Dict | None | |
| inplace | bool | False | Whether to modify the dataframe(s) in place |
class Transformer():
"Transform the dataframe(s) according to the specified callbacks."
def __init__(self,
data: Union[Dict[str, pd.DataFrame], pd.DataFrame], # Data to be transformed
cbs: Optional[List[Callback]]=None, # List of callbacks to run
custom_maps: Dict = None,
inplace: bool=False # Whether to modify the dataframe(s) in place
):
fc.store_attr()
self.is_single_df = isinstance(data, pd.DataFrame)
self.df, self.dfs = self._prepare_data(data, inplace)
self.logs = []
self.custom_maps = custom_maps or defaultdict(lambda: defaultdict(dict))
def _prepare_data(self, data, inplace):
if self.is_single_df:
return (data if inplace else data.copy()), None
else:
return None, (data if inplace else {k: v.copy() for k, v in data.items()})
def unique(self, col_name: str) -> np.ndarray:
"Distinct values of a specific column present in all groups."
if self.is_single_df:
values = self.df.get(col_name, pd.Series()).dropna().values
else:
columns = [df.get(col_name) for df in self.dfs.values() if df.get(col_name) is not None]
values = np.concatenate([col.dropna().values for col in columns]) if columns else []
return np.unique(values)
def __call__(self):
"Transform the dataframe(s) according to the specified callbacks."
if self.cbs: run_cbs(self.cbs, self)
return self.df if self.dfs is None else self.dfs
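The `unique` helper gathers the distinct values of a column across all groups. A quick illustration on hypothetical data:

```python
dfs = {'biota': pd.DataFrame({'NUCLIDE': ['cs137', 'ra226']}),
       'seawater': pd.DataFrame({'NUCLIDE': ['cs137']})}
tfm = Transformer(dfs)
fc.test_eq(tfm.unique('NUCLIDE').tolist(), ['cs137', 'ra226'])
```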
Below are a few examples of how to use the `Transformer` class. Let's first define a test callback that adds `1` to the `depth` column:
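```python
class TestCB(Callback):
    "Minimal sketch, consistent with the tests below: add `1` to the `depth` column."
    def __call__(self, tfm):
        for df in tfm.dfs.values():
            df['depth'] = df['depth'] + 1
```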
And apply it to the following dataframes:
dfs = {'biota': pd.DataFrame({'id': [0, 1, 2], 'species': [0, 2, 0], 'depth': [2, 3, 4]}),
'seawater': pd.DataFrame({'id': [0, 1, 2], 'depth': [3, 4, 5]})}
tfm = Transformer(dfs, cbs=[TestCB()])
dfs_test = tfm()
fc.test_eq(dfs_test['biota']['depth'].to_list(), [3, 4, 5])
fc.test_eq(dfs_test['seawater']['depth'].to_list(), [4, 5, 6])
This section gathers callbacks used to transform geographical coordinates.
SanitizeLonLatCB (lon_col:str='LON', lat_col:str='LAT', verbose:bool=False)
Drop rows with invalid longitude & latitude values. Convert `,` separator to `.` separator.
| | Type | Default | Details |
|---|---|---|---|
| lon_col | str | LON | Longitude column name |
| lat_col | str | LAT | Latitude column name |
| verbose | bool | False | Whether to print the number of invalid longitude & latitude values |
class SanitizeLonLatCB(Callback):
"Drop rows with invalid longitude & latitude values. Convert `,` separator to `.` separator."
def __init__(self,
lon_col: str='LON', # Longitude column name
lat_col: str='LAT', # Latitude column name
verbose: bool=False # Whether to print the number of invalid longitude & latitude values
):
fc.store_attr()
def __call__(self, tfm: Transformer):
for grp, df in tfm.dfs.items():
# Convert `,` separator to `.` separator
df[self.lon_col] = df[self.lon_col].apply(lambda x: float(str(x).replace(',', '.')))
df[self.lat_col] = df[self.lat_col].apply(lambda x: float(str(x).replace(',', '.')))
# Mask zero values
mask_zeroes = (df[self.lon_col] == 0) & (df[self.lat_col] == 0)
nZeroes = mask_zeroes.sum()
if nZeroes and self.verbose:
print(f'The "{grp}" group contains {nZeroes} data points whose ({self.lon_col}, {self.lat_col}) = (0, 0)')
# Mask out of bounds values
mask_goob = (df[self.lon_col] < -180) | (df[self.lon_col] > 180) | (df[self.lat_col] < -90) | (df[self.lat_col] > 90)
nGoob = mask_goob.sum()
if nGoob and self.verbose:
print(f'The "{grp}" group contains {nGoob} data points with unrealistic {self.lon_col} or {self.lat_col} values.')
tfm.dfs[grp] = df.loc[~(mask_zeroes | mask_goob)]
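Let's test the callback on a small, hypothetical example (one valid row with `,` separators, one (0, 0) row, one out-of-bounds row):

```python
dfs = {'seawater': pd.DataFrame({'LON': ['4,5', 0, 200], 'LAT': ['52,3', 0, 45]})}
tfm = Transformer(dfs, cbs=[SanitizeLonLatCB()])
dfs_test = tfm()
# Only the valid row survives, with separators converted
fc.test_eq(dfs_test['seawater']['LON'].to_list(), [4.5])
fc.test_eq(dfs_test['seawater']['LAT'].to_list(), [52.3])
```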
RemapCB (fn_lut:Callable, col_remap:str, col_src:str, dest_grps:list[str]|str=dict_keys(['BIOTA', 'SEAWATER', 'SEDIMENT', 'SUSPENDED_MATTER']), default_value:Any=0, verbose:bool=False)
Generic MARIS remapping callback.
| | Type | Default | Details |
|---|---|---|---|
| fn_lut | Callable | | Function that returns the lookup table dictionary |
| col_remap | str | | Name of the column to remap |
| col_src | str | | Name of the column with the source values |
| dest_grps | list[str] \| str | dict_keys(['BIOTA', 'SEAWATER', 'SEDIMENT', 'SUSPENDED_MATTER']) | List of destination groups |
| default_value | Any | 0 | Default value for unmatched entries |
| verbose | bool | False | Whether to print the number of unmatched entries |
class RemapCB(Callback):
"Generic MARIS remapping callback."
def __init__(self,
fn_lut: Callable, # Function that returns the lookup table dictionary
col_remap: str, # Name of the column to remap
col_src: str, # Name of the column with the source values
dest_grps: list[str]|str=NC_GROUPS.keys(), # List of destination groups
default_value: Any = 0, # Default value for unmatched entries
verbose: bool=False # Whether to print the number of unmatched entries
):
fc.store_attr()
self.lut = None
if isinstance(dest_grps, str): self.dest_grps = [dest_grps]
# Format the documentation string based on the type and content of dest_grps
if isinstance(self.dest_grps, list):
if len(self.dest_grps) > 1:
grp_str = ', '.join(self.dest_grps[:-1]) + ' and ' + self.dest_grps[-1]
else:
grp_str = self.dest_grps[0]
else:
grp_str = self.dest_grps
self.__doc__ = f"Remap values from '{col_src}' to '{col_remap}' for groups: {grp_str}."
def __call__(self, tfm):
self.lut = self.fn_lut()
for grp in self.dest_grps:
if grp in tfm.dfs:
self._remap_group(tfm.dfs[grp])
else:
print(f"Group {grp} not found in the dataframes.")
def _remap_group(self, df: pd.DataFrame):
df[self.col_remap] = df[self.col_src].apply(self._remap_value)
def _remap_value(self, value: str) -> Any:
value = value.strip() if isinstance(value, str) else value
match = self.lut.get(value, Match(self.default_value, None, None, None))
if isinstance(match, Match):
if match.matched_id == self.default_value:
if self.verbose:
print(f"Unmatched value: {value}")
return match.matched_id
else:
return match
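For instance, a minimal sketch with a hand-built lookup table (the `Match` fields beyond `matched_id` are unused here, and the id `33` is illustrative):

```python
lut = {'cs137': Match(33, None, None, None)}  # illustrative matched_id
dfs = {'BIOTA': pd.DataFrame({'NUCLIDE': ['cs137', 'unknown']})}
tfm = Transformer(dfs, cbs=[RemapCB(fn_lut=lambda: lut,
                                    col_remap='NUCLIDE_ID',
                                    col_src='NUCLIDE',
                                    dest_grps='BIOTA')])
fc.test_eq(tfm()['BIOTA']['NUCLIDE_ID'].to_list(), [33, 0])
```

The unmatched value falls back to `default_value` (`0` by default).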
LowerStripNameCB (col_src:str, col_dst:str=None, fn_transform:Callable=<function <lambda>>)
Convert values to lowercase and strip any trailing spaces.
| | Type | Default | Details |
|---|---|---|---|
| col_src | str | | Source column name e.g. 'Nuclide' |
| col_dst | str | None | Destination column name |
| fn_transform | Callable | <lambda> | Transformation function |
class LowerStripNameCB(Callback):
"Convert values to lowercase and strip any trailing spaces."
def __init__(self,
col_src: str, # Source column name e.g. 'Nuclide'
col_dst: str=None, # Destination column name
fn_transform: Callable=lambda x: x.lower().strip() # Transformation function
):
fc.store_attr()
        if not col_dst: self.col_dst = col_src
        self.__doc__ = f"Convert '{col_src}' column values to lowercase, strip spaces, and store in '{self.col_dst}' column."
def _safe_transform(self, value):
"Ensure value is not NA and apply transformation function."
return value if pd.isna(value) else self.fn_transform(str(value))
def __call__(self, tfm):
for key in tfm.dfs.keys():
tfm.dfs[key][self.col_dst] = tfm.dfs[key][self.col_src].apply(self._safe_transform)
Let’s test the callback:
dfs = {'seawater': pd.DataFrame({'Nuclide': ['CS137', '226RA']})}
tfm = Transformer(dfs, cbs=[LowerStripNameCB(col_src='Nuclide', col_dst='NUCLIDE')])
fc.test_eq(tfm()['seawater']['NUCLIDE'].to_list(), ['cs137', '226ra'])
tfm = Transformer(dfs, cbs=[LowerStripNameCB(col_src='Nuclide')])
fc.test_eq(tfm()['seawater']['Nuclide'].to_list(), ['cs137', '226ra'])
This kind of normalization is generally useful as a first step when (semi-automatically) remapping names.
AddSampleTypeIdColumnCB (lut:dict={'SEAWATER': 1, 'BIOTA': 2, 'SEDIMENT': 3, 'SUSPENDED_MATTER': 4}, col_name:str='SAMPLE_TYPE')
Add a column with the sample type as defined in the CDL.
| | Type | Default | Details |
|---|---|---|---|
| lut | dict | {'SEAWATER': 1, 'BIOTA': 2, 'SEDIMENT': 3, 'SUSPENDED_MATTER': 4} | Lookup table for sample type |
| col_name | str | SAMPLE_TYPE | Column name to store the sample type id |
class AddSampleTypeIdColumnCB(Callback):
    "Add a column with the sample type as defined in the CDL."
    def __init__(self, 
                 lut: dict=SMP_TYPE_LUT, # Lookup table for sample type
                 col_name: str='SAMPLE_TYPE' # Column name to store the sample type id
                ):
        fc.store_attr()
def __call__(self, tfm):
for grp, df in tfm.dfs.items():
df[self.col_name] = self.lut[grp]
Let’s test the callback:
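Below is a minimal check using the default `SMP_TYPE_LUT` from the signature above:

```python
dfs = {'SEAWATER': pd.DataFrame({'value': [1.0]}),
       'BIOTA': pd.DataFrame({'value': [2.0]})}
tfm = Transformer(dfs, cbs=[AddSampleTypeIdColumnCB()])
dfs_test = tfm()
fc.test_eq(dfs_test['SEAWATER']['SAMPLE_TYPE'].to_list(), [1])
fc.test_eq(dfs_test['BIOTA']['SAMPLE_TYPE'].to_list(), [2])
```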
''' 05 Feb 2025 - NM - AddNuclideIdColumnCB is not used anymore.
#| exports
class AddNuclideIdColumnCB(Callback):
def __init__(self,
col_value: str, # Column name containing the nuclide name
lut_fname_fn: Callable=nuc_lut_path, # Function returning the lut path
col_name: str='nuclide_id' # Column name to store the nuclide id
):
"Add a column with the nuclide id."
fc.store_attr()
self.lut = get_lut(lut_fname_fn().parent, lut_fname_fn().name,
key='nc_name', value='nuclide_id', reverse=False)
def __call__(self, tfm: Transformer):
for grp, df in tfm.dfs.items():
df[self.col_name] = df[self.col_value].map(self.lut)
'''
SelectColumnsCB (cois:dict)
Select columns of interest.
| | Type | Details |
|---|---|---|
| cois | dict | Columns of interest |
RenameColumnsCB (renaming_rules:dict)
Renaming variables to MARIS standard names.
| | Type | Details |
|---|---|---|
| renaming_rules | dict | Renaming rules |
RemoveAllNAValuesCB (cols_to_check:Union[Dict[str,list],list], how:str='all')
Remove rows with all NA values in specified columns.
| | Type | Default | Details |
|---|---|---|---|
| cols_to_check | Union | | Dict or list of columns to check |
| how | str | all | How to handle NA values: 'all' or 'any' |
class RemoveAllNAValuesCB(Callback):
"Remove rows with all NA values in specified columns."
def __init__(self,
cols_to_check: Union[Dict[str, list], list], # Dict or list of columns to check
how: str='all' # How to handle NA values 'all' or 'any'
):
fc.store_attr()
def __call__(self, tfm):
# Convert list to dict if cols_to_check is a list
cols_dict = (self.cols_to_check if isinstance(self.cols_to_check, dict)
else {k: self.cols_to_check for k in tfm.dfs.keys()})
for sample_type, columns in cols_dict.items():
tfm.dfs[sample_type].dropna(
subset=columns,
how=self.how,
inplace=True
)
Let’s test the callback:
def test_remove_all_na_values_cb():
"""Test RemoveAllNAValuesCB with both dict and list inputs."""
# Create sample data
sample_data = {
'SEAWATER': pd.DataFrame({
'value': [1.0, np.nan, np.nan, 4.0],
'uncertainty': [0.1, np.nan, np.nan, 0.4],
'other_col': ['a', 'b', 'c', 'd']
}),
'SEDIMENT': pd.DataFrame({
'value': [np.nan, 2.0, np.nan, np.nan],
'uncertainty': [np.nan, 0.2, np.nan, np.nan],
'meta': ['x', 'y', 'z', 'w']
})
}
tfm = Transformer(sample_data)
# Test with list input
cb1 = RemoveAllNAValuesCB(cols_to_check=['value', 'uncertainty'], how='all')
tfm_list = copy.deepcopy(tfm)
cb1(tfm_list)
# Test with dict input
cb2 = RemoveAllNAValuesCB(
cols_to_check={'SEAWATER': ['value', 'uncertainty'],
'SEDIMENT': ['value', 'uncertainty']},
how='all'
)
tfm_dict = copy.deepcopy(tfm)
cb2(tfm_dict)
# Verify both approaches give same results
for k in tfm.dfs.keys():
assert tfm_list.dfs[k].equals(tfm_dict.dfs[k])
assert len(tfm_list.dfs[k]) == len(tfm_dict.dfs[k])
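And run it:

```python
test_remove_all_na_values_cb()
```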
CompareDfsAndTfmCB (dfs:Dict[str,pandas.core.frame.DataFrame])
Create a dataframe of removed data and track changes in row counts due to transformations.
| | Type | Details |
|---|---|---|
| dfs | Dict | Original dataframes |
class CompareDfsAndTfmCB(Callback):
"Create a dataframe of removed data and track changes in row counts due to transformations."
def __init__(self,
dfs: Dict[str, pd.DataFrame] # Original dataframes
):
fc.store_attr()
def __call__(self, tfm: Transformer) -> None:
self._initialize_tfm_attributes(tfm)
for grp in tfm.dfs.keys():
self._compute_changes(grp, tfm)
def _initialize_tfm_attributes(self, tfm: Transformer) -> None:
tfm.dfs_removed = {}
tfm.compare_stats = {}
def _compute_changes(self,
grp: str, # The group key
tfm: Transformer # The transformation object containing `dfs`
) -> None:
"Compute and store changes including data removed and created during transformation."
original_df = self.dfs[grp]
transformed_df = tfm.dfs[grp]
# Calculate differences
original_count = len(original_df.index)
transformed_count = len(transformed_df.index)
removed_count = len(original_df.index.difference(transformed_df.index))
created_count = len(transformed_df.index.difference(original_df.index))
# Store results
tfm.dfs_removed[grp] = original_df.loc[original_df.index.difference(transformed_df.index)]
tfm.compare_stats[grp] = {
'Original row count (dfs)': original_count,
'Transformed row count (tfm.dfs)': transformed_count,
'Rows removed from original (tfm.dfs_removed)': removed_count,
'Rows created in transformed (tfm.dfs_created)': created_count
}
`CompareDfsAndTfmCB` compares the original dataframes to the transformed dataframes. A dictionary of dataframes, `tfm.dfs_removed`, is created to hold the data present in the original dataset but absent from the transformed data, while `tfm.compare_stats` provides a quick overview of the number of rows in both the original and transformed dataframes.
For instance, chaining it after `SanitizeLonLatCB` (the data and expected counts below are illustrative):
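```python
dfs = {'SEAWATER': pd.DataFrame({'LON': [4.5, 0, 200], 'LAT': [52.3, 0, 45]})}
tfm = Transformer(dfs, cbs=[SanitizeLonLatCB(),
                            CompareDfsAndTfmCB(dfs)])
tfm()
# The (0, 0) row and the out-of-bounds row end up in `tfm.dfs_removed`
fc.test_eq(len(tfm.dfs_removed['SEAWATER']), 2)
fc.test_eq(tfm.compare_stats['SEAWATER']['Transformed row count (tfm.dfs)'], 1)
```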
UniqueIndexCB (index_name='ID')
Set unique index for each group.
class UniqueIndexCB(Callback):
"Set unique index for each group."
def __init__(self,
index_name='ID'):
fc.store_attr()
def __call__(self, tfm):
for k in tfm.dfs.keys():
# Reset the index of the DataFrame and drop the old index
tfm.dfs[k] = tfm.dfs[k].reset_index(drop=True)
            # Reset the index again and set the name of the new index to `index_name`
tfm.dfs[k] = tfm.dfs[k].reset_index(names=[self.index_name])
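A quick illustration on hypothetical data:

```python
dfs = {'SEAWATER': pd.DataFrame({'value': [10, 20]}, index=[5, 7])}
tfm = Transformer(dfs, cbs=[UniqueIndexCB()])
dfs_test = tfm()
# The old index is dropped and a fresh `ID` column is created
fc.test_eq(dfs_test['SEAWATER']['ID'].to_list(), [0, 1])
```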
EncodeTimeCB (col_time:str='TIME', fn_units:Callable=<function get_time_units>)
Encode time as seconds since epoch.
| | Type | Default | Details |
|---|---|---|---|
| col_time | str | TIME | |
| fn_units | Callable | get_time_units | Function returning the time units |
class EncodeTimeCB(Callback):
"Encode time as seconds since epoch."
def __init__(self,
col_time: str='TIME',
fn_units: Callable=get_time_units # Function returning the time units
):
fc.store_attr()
self.units = fn_units()
def __call__(self, tfm):
for grp, df in tfm.dfs.items():
n_missing = df[self.col_time].isna().sum()
if n_missing:
print(f"Warning: {n_missing} missing time value(s) in {grp}")
# Remove NaN times and convert to seconds since epoch
tfm.dfs[grp] = tfm.dfs[grp][tfm.dfs[grp][self.col_time].notna()]
tfm.dfs[grp][self.col_time] = tfm.dfs[grp][self.col_time].apply(lambda x: date2num(x, units=self.units))
dfs_test = {
'SEAWATER': pd.DataFrame({
'TIME': [pd.Timestamp(f'2023-01-0{t}') for t in [1, 2]],
'value': [1, 2]
}),
'SEDIMENT': pd.DataFrame({
'TIME': [pd.Timestamp(f'2023-01-0{t}') for t in [3, 4]],
'value': [3, 4]
}),
}
units = 'seconds since 1970-01-01 00:00:00.0'
tfm = Transformer(dfs_test, cbs=[
EncodeTimeCB(fn_units=lambda: units)
], inplace=False)
dfs_result = tfm()
fc.test_eq(dfs_result['SEAWATER'].TIME.dtype, 'int64')
fc.test_eq(dfs_result['SEDIMENT'].TIME.dtype, 'int64')
fc.test_eq(dfs_result['SEAWATER'].TIME, dfs_test['SEAWATER'].TIME.apply(lambda x: date2num(x, units=units)))
fc.test_eq(dfs_result['SEDIMENT'].TIME, dfs_test['SEDIMENT'].TIME.apply(lambda x: date2num(x, units=units)))
DecodeTimeCB (col_time:str='TIME', fn_units:Callable=<function get_time_units>)
Decode time from seconds since epoch to datetime format.
| | Type | Default | Details |
|---|---|---|---|
| col_time | str | TIME | |
| fn_units | Callable | get_time_units | Function returning the time units |
class DecodeTimeCB(Callback):
"Decode time from seconds since epoch to datetime format."
def __init__(self,
col_time: str='TIME',
fn_units: Callable=get_time_units # Function returning the time units
):
fc.store_attr()
self.units = fn_units()
def __call__(self, tfm):
for grp, df in tfm.dfs.items():
n_missing = df[self.col_time].isna().sum()
if n_missing:
print(f"Warning: {n_missing} missing time value(s) in {grp}.")
# Remove NaN times and convert to datetime
tfm.dfs[grp] = tfm.dfs[grp][tfm.dfs[grp][self.col_time].notna()]
            tfm.dfs[grp][self.col_time] = tfm.dfs[grp][self.col_time].apply(
lambda x: num2date(x, units=self.units, only_use_cftime_datetimes=False)
)
dfs_test = {
'SEAWATER': pd.DataFrame({
'TIME': [1672531200, 1672617600], # 2023-01-01, 2023-01-02 in seconds since epoch
'value': [1, 2]
}),
'SEDIMENT': pd.DataFrame({
'TIME': [1672704000, 1672790400], # 2023-01-03, 2023-01-04 in seconds since epoch
'value': [3, 4]
}),
}
units = 'seconds since 1970-01-01 00:00:00.0'
tfm = Transformer(dfs_test, cbs=[
DecodeTimeCB(fn_units=lambda: units)
], inplace=False)
dfs_result = tfm()
# Test that times were converted to datetime
fc.test_eq(dfs_result['SEAWATER'].TIME.dtype, 'datetime64[ns]')
fc.test_eq(dfs_result['SEDIMENT'].TIME.dtype, 'datetime64[ns]')
# Test specific datetime values
expected_times_seawater = pd.to_datetime(['2023-01-01', '2023-01-02'])
expected_times_sediment = pd.to_datetime(['2023-01-03', '2023-01-04'])
fc.test_eq(dfs_result['SEAWATER'].TIME.dt.date, expected_times_seawater.date)
fc.test_eq(dfs_result['SEDIMENT'].TIME.dt.date, expected_times_sediment.date)