Exported source
class Callback():
    "Base class for callbacks."
    order = 0
The `Transformer` class is designed to facilitate the application of a series of callbacks to a set of dataframes. It provides a structured way to apply transformations (i.e. `Callback`s) to the data, with a focus on flexibility and ease of use.
Callback ()
Base class for callbacks.
run_cbs (cbs:List[__main__.Callback], obj:Any)
Run the callbacks in the order they are specified.
| | Type | Details |
|---|---|---|
| cbs | List | List of callbacks to run |
| obj | Any | Object to pass to the callbacks |
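The source of `run_cbs` is not reproduced here; below is a minimal sketch of its expected behaviour, assuming callbacks are executed in ascending `order` and that their docstrings are appended to the transformer's `logs` (both assumptions):

```python
from operator import attrgetter

def run_cbs(cbs: List[Callback], obj: Any):
    "Run the callbacks in the order they are specified."
    for cb in sorted(cbs, key=attrgetter('order')):
        # Keep a trace of the applied transformations (assumed behaviour)
        if cb.__doc__: obj.logs.append(cb.__doc__)
        cb(obj)
```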
Transformer (data:Union[Dict[str,pandas.core.frame.DataFrame],pandas.core.frame.DataFrame], cbs:Optional[List[__main__.Callback]]=None, inplace:bool=False)
Transform the dataframe(s) according to the specified callbacks.
| | Type | Default | Details |
|---|---|---|---|
| data | Union | | Data to be transformed |
| cbs | Optional | None | List of callbacks to run |
| inplace | bool | False | Whether to modify the dataframe(s) in place |
class Transformer():
    "Transform the dataframe(s) according to the specified callbacks."
    def __init__(self,
                 data: Union[Dict[str, pd.DataFrame], pd.DataFrame], # Data to be transformed
                 cbs: Optional[List[Callback]]=None, # List of callbacks to run
                 inplace: bool=False # Whether to modify the dataframe(s) in place
                ):
        fc.store_attr()
        self.is_single_df = isinstance(data, pd.DataFrame)
        self.df, self.dfs = self._prepare_data(data, inplace)
        self.logs = []

    def _prepare_data(self, data, inplace):
        if self.is_single_df:
            return (data if inplace else data.copy()), None
        else:
            return None, (data if inplace else {k: v.copy() for k, v in data.items()})

    def unique(self, col_name: str) -> np.ndarray:
        "Distinct values of a specific column present in all groups."
        if self.is_single_df:
            values = self.df.get(col_name, pd.Series()).dropna().values
        else:
            columns = [df.get(col_name) for df in self.dfs.values() if df.get(col_name) is not None]
            values = np.concatenate([col.dropna().values for col in columns]) if columns else []
        return np.unique(values)

    def __call__(self):
        "Transform the dataframe(s) according to the specified callbacks."
        if self.cbs: run_cbs(self.cbs, self)
        return self.df if self.dfs is None else self.dfs
Below are a few examples of how to use the `Transformer` class. Let's first define a test callback that adds `1` to the `depth` column:
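A minimal version of this callback (a sketch; any callback incrementing `depth` would behave the same in the tests below):

```python
class TestCB(Callback):
    "Add `1` to the `depth` column of each dataframe."
    def __call__(self, tfm: Transformer):
        for grp, df in tfm.dfs.items():
            df['depth'] = df['depth'] + 1
```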
And apply it to the following dataframes:
dfs = {'biota': pd.DataFrame({'id': [0, 1, 2], 'species': [0, 2, 0], 'depth': [2, 3, 4]}),
       'seawater': pd.DataFrame({'id': [0, 1, 2], 'depth': [3, 4, 5]})}
tfm = Transformer(dfs, cbs=[TestCB()])
dfs_test = tfm()
fc.test_eq(dfs_test['biota']['depth'].to_list(), [3, 4, 5])
fc.test_eq(dfs_test['seawater']['depth'].to_list(), [4, 5, 6])
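The `unique` method gathers the distinct values of a column across all groups. For instance, on the transformed dataframes above:

```python
fc.test_eq(tfm.unique('depth'), [3, 4, 5, 6])
fc.test_eq(tfm.unique('species'), [0, 2]) # `species` is only present in the `biota` group
```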
This section gathers the callbacks used to transform geographical coordinates.
SanitizeLonLatCB (lon_col:str='LON', lat_col:str='LAT', verbose:bool=False)
Drop rows with invalid longitude & latitude values. Convert `,` separator to `.` separator.
| | Type | Default | Details |
|---|---|---|---|
| lon_col | str | LON | Longitude column name |
| lat_col | str | LAT | Latitude column name |
| verbose | bool | False | Whether to print the number of invalid longitude & latitude values |
class SanitizeLonLatCB(Callback):
    "Drop rows with invalid longitude & latitude values. Convert `,` separator to `.` separator."
    def __init__(self,
                 lon_col: str='LON', # Longitude column name
                 lat_col: str='LAT', # Latitude column name
                 verbose: bool=False # Whether to print the number of invalid longitude & latitude values
                ):
        fc.store_attr()

    def __call__(self, tfm: Transformer):
        for grp, df in tfm.dfs.items():
            # Convert `,` separator to `.` separator
            df[self.lon_col] = df[self.lon_col].apply(lambda x: float(str(x).replace(',', '.')))
            df[self.lat_col] = df[self.lat_col].apply(lambda x: float(str(x).replace(',', '.')))

            # Mask zero values
            mask_zeroes = (df[self.lon_col] == 0) & (df[self.lat_col] == 0)
            nZeroes = mask_zeroes.sum()
            if nZeroes and self.verbose:
                print(f'The "{grp}" group contains {nZeroes} data points whose ({self.lon_col}, {self.lat_col}) = (0, 0)')

            # Mask out-of-bounds values
            mask_goob = (df[self.lon_col] < -180) | (df[self.lon_col] > 180) | (df[self.lat_col] < -90) | (df[self.lat_col] > 90)
            nGoob = mask_goob.sum()
            if nGoob and self.verbose:
                print(f'The "{grp}" group contains {nGoob} data points with unrealistic {self.lon_col} or {self.lat_col} values.')

            tfm.dfs[grp] = df.loc[~(mask_zeroes | mask_goob)]
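For instance, with hypothetical coordinates (the `(0, 0)` row and the out-of-bounds row are dropped, and the `,` decimal separator is converted):

```python
dfs = {'SEAWATER': pd.DataFrame({'LON': ['45,2', 0, 200], 'LAT': [60.0, 0, 50.0]})}
tfm = Transformer(dfs, cbs=[SanitizeLonLatCB()])
df_out = tfm()['SEAWATER']
fc.test_eq(df_out['LON'].to_list(), [45.2])
fc.test_eq(df_out['LAT'].to_list(), [60.0])
```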
RemapCB (fn_lut:Callable, col_remap:str, col_src:str, dest_grps:list[str]|str=dict_keys(['BIOTA', 'SEAWATER', 'SEDIMENT', 'SUSPENDED_MATTER']), default_value:Any=0, verbose:bool=False)
Generic MARIS remapping callback.
| | Type | Default | Details |
|---|---|---|---|
| fn_lut | Callable | | Function that returns the lookup table dictionary |
| col_remap | str | | Name of the column to remap |
| col_src | str | | Name of the column with the source values |
| dest_grps | list[str] \| str | dict_keys(['BIOTA', 'SEAWATER', 'SEDIMENT', 'SUSPENDED_MATTER']) | List of destination groups |
| default_value | Any | 0 | Default value for unmatched entries |
| verbose | bool | False | Whether to print the number of unmatched entries |
class RemapCB(Callback):
    "Generic MARIS remapping callback."
    def __init__(self,
                 fn_lut: Callable, # Function that returns the lookup table dictionary
                 col_remap: str, # Name of the column to remap
                 col_src: str, # Name of the column with the source values
                 dest_grps: list[str]|str=NC_GROUPS.keys(), # List of destination groups
                 default_value: Any=0, # Default value for unmatched entries
                 verbose: bool=False # Whether to print the number of unmatched entries
                ):
        fc.store_attr()
        self.lut = None
        if isinstance(dest_grps, str): self.dest_grps = [dest_grps]
        # Format the documentation string based on the type and content of dest_grps
        if isinstance(self.dest_grps, list):
            if len(self.dest_grps) > 1:
                grp_str = ', '.join(self.dest_grps[:-1]) + ' and ' + self.dest_grps[-1]
            else:
                grp_str = self.dest_grps[0]
        else:
            grp_str = self.dest_grps
        self.__doc__ = f"Remap values from '{col_src}' to '{col_remap}' for groups: {grp_str}."

    def __call__(self, tfm):
        self.lut = self.fn_lut()
        for grp in self.dest_grps:
            if grp in tfm.dfs:
                self._remap_group(tfm.dfs[grp])
            else:
                print(f"Group {grp} not found in the dataframes.")

    def _remap_group(self, df: pd.DataFrame):
        df[self.col_remap] = df[self.col_src].apply(self._remap_value)

    def _remap_value(self, value: str) -> Any:
        value = value.strip() if isinstance(value, str) else value
        match = self.lut.get(value, Match(self.default_value, None, None, None))
        if isinstance(match, Match):
            if match.matched_id == self.default_value:
                if self.verbose:
                    print(f"Unmatched value: {value}")
            return match.matched_id
        else:
            return match
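A hypothetical usage sketch. It assumes `Match` behaves as in `_remap_value` above (its first field being `matched_id`); the lookup table and column names are made up for illustration:

```python
lut = {'CS137': Match(33, None, None, None)} # Hypothetical lookup table
dfs = {'SEAWATER': pd.DataFrame({'NUCLIDE': ['CS137 ', 'unknown']})}
tfm = Transformer(dfs, cbs=[RemapCB(fn_lut=lambda: lut, col_remap='NUCLIDE_ID',
                                    col_src='NUCLIDE', dest_grps='SEAWATER')])
# Matched values are remapped (note the stripped trailing space);
# unmatched ones get `default_value` (0)
fc.test_eq(tfm()['SEAWATER']['NUCLIDE_ID'].to_list(), [33, 0])
```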
LowerStripNameCB (col_src:str, col_dst:str=None, fn_transform:Callable=<function <lambda>>)
Convert values to lowercase and strip any trailing spaces.
| | Type | Default | Details |
|---|---|---|---|
| col_src | str | | Source column name e.g. 'Nuclide' |
| col_dst | str | None | Destination column name |
| fn_transform | Callable | `<lambda>` | Transformation function |
class LowerStripNameCB(Callback):
    "Convert values to lowercase and strip any trailing spaces."
    def __init__(self,
                 col_src: str, # Source column name e.g. 'Nuclide'
                 col_dst: str=None, # Destination column name
                 fn_transform: Callable=lambda x: x.lower().strip() # Transformation function
                ):
        fc.store_attr()
        if not col_dst: self.col_dst = col_src
        self.__doc__ = f"Convert '{col_src}' column values to lowercase, strip spaces, and store in '{self.col_dst}' column."

    def _safe_transform(self, value):
        "Ensure value is not NA and apply transformation function."
        return value if pd.isna(value) else self.fn_transform(str(value))

    def __call__(self, tfm):
        for key in tfm.dfs.keys():
            tfm.dfs[key][self.col_dst] = tfm.dfs[key][self.col_src].apply(self._safe_transform)
Let’s test the callback:
dfs = {'seawater': pd.DataFrame({'Nuclide': ['CS137', '226RA']})}
tfm = Transformer(dfs, cbs=[LowerStripNameCB(col_src='Nuclide', col_dst='NUCLIDE')])
fc.test_eq(tfm()['seawater']['NUCLIDE'].to_list(), ['cs137', '226ra'])
tfm = Transformer(dfs, cbs=[LowerStripNameCB(col_src='Nuclide')])
fc.test_eq(tfm()['seawater']['Nuclide'].to_list(), ['cs137', '226ra'])
This callback is generally useful as a first step when (semi-automatically) remapping names.
AddSampleTypeIdColumnCB (lut:dict={'SEAWATER': 1, 'BIOTA': 2, 'SEDIMENT': 3, 'SUSPENDED_MATTER': 4}, col_name:str='SAMPLE_TYPE')
Add a column with the sample type as defined in the CDL.
| | Type | Default | Details |
|---|---|---|---|
| lut | dict | {'SEAWATER': 1, 'BIOTA': 2, 'SEDIMENT': 3, 'SUSPENDED_MATTER': 4} | Lookup table for sample type |
| col_name | str | SAMPLE_TYPE | Column name to store the sample type id |
class AddSampleTypeIdColumnCB(Callback):
    "Add a column with the sample type as defined in the CDL."
    def __init__(self,
                 lut: dict=SMP_TYPE_LUT, # Lookup table for sample type
                 col_name: str='SAMPLE_TYPE' # Column name to store the sample type id
                ):
        fc.store_attr()

    def __call__(self, tfm):
        for grp, df in tfm.dfs.items():
            df[self.col_name] = self.lut[grp]
Let’s test the callback:
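A minimal check, assuming the default `SMP_TYPE_LUT` shown in the signature above:

```python
dfs = {'BIOTA': pd.DataFrame({'value': [1, 2]}),
       'SEAWATER': pd.DataFrame({'value': [3]})}
tfm = Transformer(dfs, cbs=[AddSampleTypeIdColumnCB()])
dfs_out = tfm()
fc.test_eq(dfs_out['BIOTA']['SAMPLE_TYPE'].to_list(), [2, 2])
fc.test_eq(dfs_out['SEAWATER']['SAMPLE_TYPE'].to_list(), [1])
```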
AddNuclideIdColumnCB (col_value:str, lut_fname_fn:Callable=<function nuc_lut_path>, col_name:str='nuclide_id')
Add a column with the nuclide id.
| | Type | Default | Details |
|---|---|---|---|
| col_value | str | | Column name containing the nuclide name |
| lut_fname_fn | Callable | nuc_lut_path | Function returning the lut path |
| col_name | str | nuclide_id | Column name to store the nuclide id |
class AddNuclideIdColumnCB(Callback):
    "Add a column with the nuclide id."
    def __init__(self,
                 col_value: str, # Column name containing the nuclide name
                 lut_fname_fn: Callable=nuc_lut_path, # Function returning the lut path
                 col_name: str='nuclide_id' # Column name to store the nuclide id
                ):
        fc.store_attr()
        self.lut = get_lut(lut_fname_fn().parent, lut_fname_fn().name,
                           key='nc_name', value='nuclide_id', reverse=False)

    def __call__(self, tfm: Transformer):
        for grp, df in tfm.dfs.items():
            df[self.col_name] = df[self.col_value].map(self.lut)
SelectColumnsCB (cois:dict)
Select columns of interest.
| | Type | Details |
|---|---|---|
| cois | dict | Columns of interest |
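The exported source of `SelectColumnsCB` is not reproduced here; a minimal sketch consistent with the signature, assuming the keys of `cois` are the columns to keep:

```python
class SelectColumnsCB(Callback):
    "Select columns of interest."
    def __init__(self,
                 cois: dict # Columns of interest
                ):
        fc.store_attr()

    def __call__(self, tfm):
        # Keep only the columns listed as keys of `cois` (assumption)
        for grp, df in tfm.dfs.items():
            tfm.dfs[grp] = df.loc[:, list(self.cois.keys())]
```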
RenameColumnsCB (renaming_rules:dict)
Renaming variables to MARIS standard names.
| | Type | Details |
|---|---|---|
| renaming_rules | dict | Renaming rules |
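Likewise, a minimal sketch of `RenameColumnsCB` consistent with its signature, assuming `renaming_rules` maps source names to MARIS standard names:

```python
class RenameColumnsCB(Callback):
    "Renaming variables to MARIS standard names."
    def __init__(self,
                 renaming_rules: dict # Renaming rules
                ):
        fc.store_attr()

    def __call__(self, tfm):
        # Rename columns according to the provided rules (assumption)
        for grp, df in tfm.dfs.items():
            tfm.dfs[grp] = df.rename(columns=self.renaming_rules)
```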
RemoveAllNAValuesCB (cols_to_check:Dict[str,list])

Remove rows with all NA values.

| | Type | Details |
|---|---|---|
| cols_to_check | Dict | A dictionary with the sample type as key and the list of columns to check as value |
class RemoveAllNAValuesCB(Callback):
    "Remove rows with all NA values."
    def __init__(self,
                 cols_to_check: Dict[str, list] # A dictionary with the sample type as key and the list of columns to check as value
                ):
        fc.store_attr()

    def __call__(self, tfm):
        for k in tfm.dfs.keys():
            cols = self.cols_to_check[k]
            mask = tfm.dfs[k][cols].isnull().all(axis=1)
            tfm.dfs[k] = tfm.dfs[k][~mask]
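For instance, a row is dropped only when all checked columns are NA:

```python
dfs = {'SEAWATER': pd.DataFrame({'VALUE': [1.0, None], 'UNC': [None, None]})}
tfm = Transformer(dfs, cbs=[RemoveAllNAValuesCB({'SEAWATER': ['VALUE', 'UNC']})])
fc.test_eq(len(tfm()['SEAWATER']), 1) # Only the fully-NA row is removed
```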
[TO BE REMOVED] Many data providers use a long format (e.g. `lat, lon, radionuclide, value, unc, ...`) to store their data. When encoding as netCDF, it is often required to use a wide format (e.g. `lat, lon, nuclide1_value, nuclide1_unc, nuclide2_value, nuclide2_unc, ...`). The `ReshapeLongToWide` class is designed to perform this transformation.
# class ReshapeLongToWide(Callback):
#     "Convert data from long to wide with renamed columns."
#     def __init__(self,
#                  columns: List[str]=['nuclide'], # Columns to use as index
#                  values: List[str]=['value'], # Columns to use as values
#                  num_fill_value: int=-999, # Fill value for numeric columns
#                  str_fill_value='STR FILL VALUE'
#                 ):
#         fc.store_attr()
#         self.derived_cols = self._get_derived_cols()
#     def _get_derived_cols(self):
#         "Retrieve all possible derived vars (e.g 'unc', 'dl', ...) from configs."
#         return [value['name'] for value in cdl_cfg()['vars']['suffixes'].values()]
#     def renamed_cols(self, cols):
#         "Flatten columns name."
#         return [inner if outer == "value" else f'{inner}{outer}' if inner else outer
#                 for outer, inner in cols]
#     def _get_unique_fill_value(self, df, idx):
#         "Get a unique fill value for NaN replacement."
#         fill_value = self.num_fill_value
#         while (df[idx] == fill_value).any().any():
#             fill_value -= 1
#         return fill_value
#     def _fill_nan_values(self, df, idx):
#         "Fill NaN values in index columns."
#         num_fill_value = self._get_unique_fill_value(df, idx)
#         for col in idx:
#             fill_value = num_fill_value if pd.api.types.is_numeric_dtype(df[col]) else self.str_fill_value
#             df[col] = df[col].fillna(fill_value)
#         return df, num_fill_value
#     def pivot(self, df):
#         derived_coi = [col for col in self.derived_cols if col in df.columns]
#         # In past implementation we added an index column before pivoting,
#         # making all rows (compound_idx) unique. TO BE REMOVED:
#         # df.index.name = 'org_index'
#         # df = df.reset_index()
#         idx = list(set(df.columns) - set(self.columns + derived_coi + self.values))
#         df, num_fill_value = self._fill_nan_values(df, idx)
#         pivot_df = df.pivot_table(index=idx,
#                                   columns=self.columns,
#                                   values=self.values + derived_coi,
#                                   aggfunc=lambda x: x
#                                  ).reset_index()
#         pivot_df[idx] = pivot_df[idx].replace({self.str_fill_value: np.nan, num_fill_value: np.nan})
#         pivot_df = self.set_index(pivot_df)
#         return pivot_df
#     def set_index(self, df):
#         "Set the index of the dataframe."
#         # TODO: Consider implementing a universal unique index
#         # by hashing the compound index columns (lat, lon, time, depth, etc.)
#         df.index.name = 'org_index'
#         return df
#     def __call__(self, tfm):
#         for grp in tfm.dfs.keys():
#             tfm.dfs[grp] = self.pivot(tfm.dfs[grp])
#             tfm.dfs[grp].columns = self.renamed_cols(tfm.dfs[grp].columns)
[TO BE DELETED] Example of usage when the `compound_idx` values (in our case made of `lon`, `lat`, `time`, `depth`, ...) are unique:

# #| eval: false
# dfs_test = {'seawater': pd.DataFrame({
#     'compound_idx': ['a', 'b', 'c', 'd'],
#     'nuclide': ['cs137', 'cs137', 'pu239_240_tot', 'pu239_240_tot'],
#     'value': [1, 2, 3, 4],
#     '_unc': [0.1, 0.2, 0.3, 0.4]})}
# tfm = Transformer(dfs_test, cbs=[ReshapeLongToWide()])
# tfm()['seawater']

# #| eval: false
# dfs_test = {'seawater': pd.DataFrame({
#     'compound_idx': ['a', 'a', 'c', 'd'],
#     'nuclide': ['cs137', 'cs134', 'pu239_240_tot', 'pu239_240_tot'],
#     'value': [1, 2, 3, 4],
#     '_unc': [0.1, 0.2, 0.3, 0.4]})}
# tfm = Transformer(dfs_test, cbs=[ReshapeLongToWide()])
# tfm()['seawater']
CompareDfsAndTfmCB (dfs:Dict[str,pandas.core.frame.DataFrame])
Create a dataframe of removed data: data present in the `dfs` but absent from the `tfm`.

| | Type | Details |
|---|---|---|
| dfs | Dict | Original dataframes |
class CompareDfsAndTfmCB(Callback):
    "Create a dataframe of removed data: data present in the `dfs` but absent from the `tfm`."
    def __init__(self,
                 dfs: Dict[str, pd.DataFrame] # Original dataframes
                ):
        fc.store_attr()

    def __call__(self, tfm: Transformer) -> None:
        self._initialize_tfm_attributes(tfm)
        for grp in tfm.dfs.keys():
            removed_df = self._get_removed_data(grp, tfm)
            tfm.dfs_removed[grp] = removed_df
            tfm.compare_stats[grp] = self._compute_stats(grp, tfm)

    def _initialize_tfm_attributes(self, tfm: Transformer) -> None:
        tfm.dfs_removed = {}
        tfm.compare_stats = {}

    def _get_removed_data(self,
                          grp: str, # The group key
                          tfm: Transformer # The transformation object containing `dfs`
                         ) -> pd.DataFrame: # Dataframe with dropped rows
        "Return the data that is present in `dfs` but not in `tfm.dfs`."
        index_diff = self.dfs[grp].index.difference(tfm.dfs[grp].index)
        return self.dfs[grp].loc[index_diff]

    def _compute_stats(self,
                       grp: str, # The group key
                       tfm: Transformer # The transformation object containing `dfs`
                      ) -> Dict[str, int]: # Dictionary with comparison statistics
        "Compute comparison statistics between `dfs` and `tfm.dfs`."
        return {
            'Number of rows in original dataframes (dfs):': len(self.dfs[grp].index),
            'Number of rows in transformed dataframes (tfm.dfs):': len(tfm.dfs[grp].index),
            'Number of rows removed (tfm.dfs_removed):': len(tfm.dfs_removed[grp].index),
        }
`CompareDfsAndTfmCB` compares the original dataframes to the transformed dataframes. A dictionary of dataframes, `tfm.dfs_removed`, is created containing the data present in the original dataset but absent from the transformed data. `tfm.compare_stats` provides a quick overview of the number of rows in both the original and transformed dataframes.
For instance:
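A minimal sketch combining it with `SanitizeLonLatCB`, which drops rows (hypothetical data):

```python
dfs = {'SEAWATER': pd.DataFrame({'LON': [50.0, 200.0], 'LAT': [60.0, 65.0]})}
tfm = Transformer(dfs, cbs=[SanitizeLonLatCB(), CompareDfsAndTfmCB(dfs)])
tfm()
fc.test_eq(len(tfm.dfs_removed['SEAWATER']), 1) # The out-of-bounds row
print(tfm.compare_stats['SEAWATER'])
```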
UniqueIndexCB (index_name='ID')
Set unique index for each group.
class UniqueIndexCB(Callback):
    "Set unique index for each group."
    def __init__(self,
                 index_name='ID'):
        fc.store_attr()

    def __call__(self, tfm):
        for k in tfm.dfs.keys():
            # Reset the index of the DataFrame and drop the old index
            tfm.dfs[k] = tfm.dfs[k].reset_index(drop=True)
            # Reset the index again and set the name of the new index to `index_name`
            tfm.dfs[k] = tfm.dfs[k].reset_index(names=[self.index_name])
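For instance, the old index is dropped and replaced by a fresh `ID` column:

```python
dfs = {'SEAWATER': pd.DataFrame({'value': [10, 20]}, index=[3, 7])}
tfm = Transformer(dfs, cbs=[UniqueIndexCB()])
fc.test_eq(tfm()['SEAWATER']['ID'].to_list(), [0, 1])
```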
EncodeTimeCB (col_time:str='TIME', fn_units:Callable=<function get_time_units>)
Encode time as seconds since epoch.
| | Type | Default | Details |
|---|---|---|---|
| col_time | str | TIME | Time column name |
| fn_units | Callable | get_time_units | Function returning the time units |
class EncodeTimeCB(Callback):
    "Encode time as seconds since epoch."
    def __init__(self,
                 col_time: str='TIME', # Time column name
                 fn_units: Callable=get_time_units # Function returning the time units
                ):
        fc.store_attr()
        self.units = fn_units()

    def __call__(self, tfm):
        for grp, df in tfm.dfs.items():
            n_missing = df[self.col_time].isna().sum()
            if n_missing:
                print(f"Warning: {n_missing} missing time value(s) in {grp}")
            # Remove NaN times and convert to seconds since epoch
            tfm.dfs[grp] = tfm.dfs[grp][tfm.dfs[grp][self.col_time].notna()]
            tfm.dfs[grp][self.col_time] = tfm.dfs[grp][self.col_time].apply(lambda x: date2num(x, units=self.units))
dfs_test = {
'SEAWATER': pd.DataFrame({
'TIME': [pd.Timestamp(f'2023-01-0{t}') for t in [1, 2]],
'value': [1, 2]
}),
'SEDIMENT': pd.DataFrame({
'TIME': [pd.Timestamp(f'2023-01-0{t}') for t in [3, 4]],
'value': [3, 4]
}),
}
units = 'seconds since 1970-01-01 00:00:00.0'
tfm = Transformer(dfs_test, cbs=[
EncodeTimeCB(fn_units=lambda: units)
], inplace=False)
dfs_result = tfm()
fc.test_eq(dfs_result['SEAWATER'].TIME.dtype, 'int64')
fc.test_eq(dfs_result['SEDIMENT'].TIME.dtype, 'int64')
fc.test_eq(dfs_result['SEAWATER'].TIME, dfs_test['SEAWATER'].TIME.apply(lambda x: date2num(x, units=units)))
fc.test_eq(dfs_result['SEDIMENT'].TIME, dfs_test['SEDIMENT'].TIME.apply(lambda x: date2num(x, units=units)))
DecodeTimeCB (col_time:str='TIME', fn_units:Callable=<function get_time_units>)
Decode time from seconds since epoch to datetime format.
| | Type | Default | Details |
|---|---|---|---|
| col_time | str | TIME | Time column name |
| fn_units | Callable | get_time_units | Function returning the time units |
class DecodeTimeCB(Callback):
    "Decode time from seconds since epoch to datetime format."
    def __init__(self,
                 col_time: str='TIME', # Time column name
                 fn_units: Callable=get_time_units # Function returning the time units
                ):
        fc.store_attr()
        self.units = fn_units()

    def __call__(self, tfm):
        for grp, df in tfm.dfs.items():
            n_missing = df[self.col_time].isna().sum()
            if n_missing:
                print(f"Warning: {n_missing} missing time value(s) in {grp}.")
            # Remove NaN times and convert to datetime
            tfm.dfs[grp] = tfm.dfs[grp][tfm.dfs[grp][self.col_time].notna()]
            tfm.dfs[grp][self.col_time] = tfm.dfs[grp][self.col_time].apply(
                lambda x: num2date(x, units=self.units, only_use_cftime_datetimes=False)
            )
dfs_test = {
'SEAWATER': pd.DataFrame({
'TIME': [1672531200, 1672617600], # 2023-01-01, 2023-01-02 in seconds since epoch
'value': [1, 2]
}),
'SEDIMENT': pd.DataFrame({
'TIME': [1672704000, 1672790400], # 2023-01-03, 2023-01-04 in seconds since epoch
'value': [3, 4]
}),
}
units = 'seconds since 1970-01-01 00:00:00.0'
tfm = Transformer(dfs_test, cbs=[
DecodeTimeCB(fn_units=lambda: units)
], inplace=False)
dfs_result = tfm()
# Test that times were converted to datetime
fc.test_eq(dfs_result['SEAWATER'].TIME.dtype, 'datetime64[ns]')
fc.test_eq(dfs_result['SEDIMENT'].TIME.dtype, 'datetime64[ns]')
# Test specific datetime values
expected_times_seawater = pd.to_datetime(['2023-01-01', '2023-01-02'])
expected_times_sediment = pd.to_datetime(['2023-01-03', '2023-01-04'])
fc.test_eq(dfs_result['SEAWATER'].TIME.dt.date, expected_times_seawater.date)
fc.test_eq(dfs_result['SEDIMENT'].TIME.dt.date, expected_times_sediment.date)