Exported source
class Callback():
    "Base class for callbacks."
    order = 0
The `Transformer` class is designed to facilitate the application of a series of callbacks to a set of dataframes. It provides a structured way to apply transformations (i.e. `Callback`s) to the data, with a focus on flexibility and ease of use.
Callback ()
Base class for callbacks.
run_cbs (cbs:List[__main__.Callback], obj:Any)
Run the callbacks in the order they are specified.
| | Type | Details |
|---|---|---|
| cbs | List | List of callbacks to run |
| obj | Any | Object to pass to the callbacks |
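The source of `run_cbs` isn't shown in this section; a minimal sketch consistent with `Callback.order` and `Transformer.logs` would be (the `logs` bookkeeping is an assumption):

```python
def run_cbs(cbs: List[Callback], obj: Any):
    "Run the callbacks in the order they are specified."
    for cb in sorted(cbs, key=lambda cb: cb.order):
        # Assumption: each callback's docstring is appended to `obj.logs`
        # so a transformation can be audited afterwards.
        if cb.__doc__: obj.logs.append(cb.__doc__)
        cb(obj)
```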
Transformer (data:Union[Dict[str,pandas.core.frame.DataFrame],pandas.core.frame.DataFrame], cbs:Optional[List[__main__.Callback]]=None, inplace:bool=False)
Transform the dataframe(s) according to the specified callbacks.
| | Type | Default | Details |
|---|---|---|---|
| data | Union | | Data to be transformed |
| cbs | Optional | None | List of callbacks to run |
| inplace | bool | False | Whether to modify the dataframe(s) in place |
class Transformer():
    "Transform the dataframe(s) according to the specified callbacks."
    def __init__(self,
                 data: Union[Dict[str, pd.DataFrame], pd.DataFrame], # Data to be transformed
                 cbs: Optional[List[Callback]]=None, # List of callbacks to run
                 inplace: bool=False # Whether to modify the dataframe(s) in place
                 ):
        fc.store_attr()
        self.is_single_df = isinstance(data, pd.DataFrame)
        self.df, self.dfs = self._prepare_data(data, inplace)
        self.logs = []

    def _prepare_data(self, data, inplace):
        if self.is_single_df:
            return (data if inplace else data.copy()), None
        else:
            return None, (data if inplace else {k: v.copy() for k, v in data.items()})

    def unique(self, col_name: str) -> np.ndarray:
        "Distinct values of a specific column present in all groups."
        if self.is_single_df:
            values = self.df.get(col_name, pd.Series()).dropna().values
        else:
            columns = [df.get(col_name) for df in self.dfs.values() if df.get(col_name) is not None]
            values = np.concatenate([col.dropna().values for col in columns]) if columns else []
        return np.unique(values)

    def __call__(self):
        "Transform the dataframe(s) according to the specified callbacks."
        if self.cbs: run_cbs(self.cbs, self)
        return self.df if self.dfs is None else self.dfs
Below are a few examples of how to use the `Transformer` class. Let's first define a test callback that adds `1` to the `depth` column:
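The definition of `TestCB` is not included in this section; a minimal version consistent with the tests below would be:

```python
class TestCB(Callback):
    "Add `1` to the `depth` column of each dataframe."
    def __call__(self, tfm: Transformer):
        for grp, df in tfm.dfs.items():
            df['depth'] = df['depth'] + 1
```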
And apply it to the following dataframes:
dfs = {'biota': pd.DataFrame({'id': [0, 1, 2], 'species': [0, 2, 0], 'depth': [2, 3, 4]}),
'seawater': pd.DataFrame({'id': [0, 1, 2], 'depth': [3, 4, 5]})}
tfm = Transformer(dfs, cbs=[TestCB()])
dfs_test = tfm()
fc.test_eq(dfs_test['biota']['depth'].to_list(), [3, 4, 5])
fc.test_eq(dfs_test['seawater']['depth'].to_list(), [4, 5, 6])
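The `unique` method gathers the distinct values of a column across all groups. For instance, on the transformer above (`depth` is now `[3, 4, 5]` for `biota` and `[4, 5, 6]` for `seawater`):

```python
fc.test_eq(tfm.unique('depth').tolist(), [3, 4, 5, 6])
```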
This section gathers callbacks that are used to transform the geographical coordinates.
SanitizeLonLatCB (lon_col:str='lon', lat_col:str='lat', verbose:bool=False)
Drop rows with invalid longitude & latitude values. Convert `,` separator to `.` separator.
| | Type | Default | Details |
|---|---|---|---|
| lon_col | str | lon | Longitude column name |
| lat_col | str | lat | Latitude column name |
| verbose | bool | False | Whether to print the number of invalid longitude & latitude values |
class SanitizeLonLatCB(Callback):
    "Drop rows with invalid longitude & latitude values. Convert `,` separator to `.` separator."
    def __init__(self,
                 lon_col: str='lon', # Longitude column name
                 lat_col: str='lat', # Latitude column name
                 verbose: bool=False # Whether to print the number of invalid longitude & latitude values
                 ):
        fc.store_attr()

    def __call__(self, tfm: Transformer):
        for grp, df in tfm.dfs.items():
            # Convert `,` separator to `.` separator
            df[self.lon_col] = df[self.lon_col].apply(lambda x: float(str(x).replace(',', '.')))
            df[self.lat_col] = df[self.lat_col].apply(lambda x: float(str(x).replace(',', '.')))
            # Mask zero values
            mask_zeroes = (df[self.lon_col] == 0) & (df[self.lat_col] == 0)
            nZeroes = mask_zeroes.sum()
            if nZeroes and self.verbose:
                print(f'The "{grp}" group contains {nZeroes} data points whose ({self.lon_col}, {self.lat_col}) = (0, 0)')
            # Mask out-of-bounds values
            mask_goob = (df[self.lon_col] < -180) | (df[self.lon_col] > 180) | (df[self.lat_col] < -90) | (df[self.lat_col] > 90)
            nGoob = mask_goob.sum()
            if nGoob and self.verbose:
                print(f'The "{grp}" group contains {nGoob} data points with unrealistic {self.lon_col} or {self.lat_col} values.')
            tfm.dfs[grp] = df.loc[~(mask_zeroes | mask_goob)]
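Let's test the callback on a hedged example: `,` decimal separators are converted, while `(0, 0)` and out-of-bounds points are dropped:

```python
dfs = {'seawater': pd.DataFrame({'lon': ['4,5', 0, 200], 'lat': ['43,1', 0, 45]})}
tfm = Transformer(dfs, cbs=[SanitizeLonLatCB()])
df_test = tfm()['seawater']
fc.test_eq(df_test['lon'].to_list(), [4.5])
fc.test_eq(df_test['lat'].to_list(), [43.1])
```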
This section gathers callbacks that are used to add required columns to the dataframes.
AddSampleTypeIdColumnCB (cdl_cfg:Callable=<function cdl_cfg>, col_name:str='samptype_id')
Add a column with the sample type id as defined in the CDL.
| | Type | Default | Details |
|---|---|---|---|
| cdl_cfg | Callable | cdl_cfg | Callable to get the CDL config dictionary |
| col_name | str | samptype_id | Column name to store the sample type id |
class AddSampleTypeIdColumnCB(Callback):
    "Add a column with the sample type id as defined in the CDL."
    def __init__(self,
                 cdl_cfg: Callable=cdl_cfg, # Callable to get the CDL config dictionary
                 col_name: str='samptype_id' # Column name to store the sample type id
                 ):
        fc.store_attr()
        self.lut = {v['name']: v['id'] for v in cdl_cfg()['grps'].values()}

    def __call__(self, tfm):
        for grp, df in tfm.dfs.items(): df[self.col_name] = self.lut[grp]
Let’s test the callback:
dfs = {v['name']: pd.DataFrame({'col_test': [0, 1, 2]}) for v in CONFIGS_CDL['grps'].values()}
tfm = Transformer(dfs, cbs=[AddSampleTypeIdColumnCB(cdl_cfg=lambda: CONFIGS_CDL)])
dfs_test = tfm()
for v in CONFIGS_CDL['grps'].values():
fc.test_eq(dfs_test[v['name']]['samptype_id'].unique().item(), v['id'])
AddNuclideIdColumnCB (col_value:str, lut_fname_fn:Callable=<function nuc_lut_path>, col_name:str='nuclide_id')
Add a column with the nuclide id.
| | Type | Default | Details |
|---|---|---|---|
| col_value | str | | Column name containing the nuclide name |
| lut_fname_fn | Callable | nuc_lut_path | Function returning the lut path |
| col_name | str | nuclide_id | Column name to store the nuclide id |
class AddNuclideIdColumnCB(Callback):
    "Add a column with the nuclide id."
    def __init__(self,
                 col_value: str, # Column name containing the nuclide name
                 lut_fname_fn: Callable=nuc_lut_path, # Function returning the lut path
                 col_name: str='nuclide_id' # Column name to store the nuclide id
                 ):
        fc.store_attr()
        self.lut = get_lut(lut_fname_fn().parent, lut_fname_fn().name,
                           key='nc_name', value='nuclide_id', reverse=False)

    def __call__(self, tfm: Transformer):
        for grp, df in tfm.dfs.items():
            df[self.col_name] = df[self.col_value].map(self.lut)
dfs = {v['name']: pd.DataFrame({'Nuclide': ['cs137', 'pu239_240_tot']}) for v in CONFIGS_CDL['grps'].values()}
lut_fname_fn = lambda: Path('./files/lut/dbo_nuclide.xlsx')
tfm = Transformer(dfs, cbs=[AddNuclideIdColumnCB(col_value='Nuclide', lut_fname_fn=lut_fname_fn)])
tfm()['seawater']
expected = [33, 77]
for grp in tfm.dfs.keys():
fc.test_eq(tfm.dfs[grp]['nuclide_id'].to_list(), expected)
RemapCB (fn_lut:Callable, col_remap:str, col_src:str, dest_grps:list[str]|str=['seawater', 'biota', 'sediment', 'suspended-matter'], default_value:Any=-1, verbose:bool=False)
Generic MARIS remapping callback.
| | Type | Default | Details |
|---|---|---|---|
| fn_lut | Callable | | Function that returns the lookup table dictionary |
| col_remap | str | | Name of the column to remap |
| col_src | str | | Name of the column with the source values |
| dest_grps | list[str] \| str | ['seawater', 'biota', 'sediment', 'suspended-matter'] | List of destination groups |
| default_value | Any | -1 | Default value for unmatched entries |
| verbose | bool | False | Whether to print unmatched values |
class RemapCB(Callback):
    "Generic MARIS remapping callback."
    def __init__(self,
                 fn_lut: Callable, # Function that returns the lookup table dictionary
                 col_remap: str, # Name of the column to remap
                 col_src: str, # Name of the column with the source values
                 dest_grps: list[str]|str=grp_names(), # List of destination groups
                 default_value: Any = -1, # Default value for unmatched entries
                 verbose: bool = False, # Whether to print unmatched values
                 ):
        fc.store_attr()
        self.lut = None
        if isinstance(dest_grps, str): self.dest_grps = [dest_grps]
        # Use `self.dest_grps` so a single group passed as a string
        # is not joined character by character.
        self.__doc__ = f"Remap values from '{col_src}' to '{col_remap}' for groups: {', '.join(self.dest_grps)}."

    def __call__(self, tfm):
        self.lut = self.fn_lut()
        for grp in self.dest_grps:
            if grp in tfm.dfs:
                self._remap_group(tfm.dfs[grp])

    def _remap_group(self, df: pd.DataFrame):
        df[self.col_remap] = df[self.col_src].apply(self._remap_value)

    def _remap_value(self, value: str) -> Any:
        value = value.strip() if isinstance(value, str) else value
        match = self.lut.get(value, Match(self.default_value, None, None, None))
        if isinstance(match, Match):
            if match.matched_id == self.default_value and self.verbose:
                print(f"Unmatched value: {value}")
            return match.matched_id
        else:
            return match
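A hedged usage sketch: here `fn_lut` returns plain ids for simplicity, while in practice it typically returns `Match` objects produced by the MARIS fuzzy matcher; unmatched values fall back to `default_value`:

```python
lut = lambda: {'CS-137': 33, 'PU-239/240': 77}
dfs = {'seawater': pd.DataFrame({'NUCLIDE': ['CS-137', 'PU-239/240', 'unknown']})}
tfm = Transformer(dfs, cbs=[RemapCB(fn_lut=lut, col_remap='nuclide_id',
                                    col_src='NUCLIDE', dest_grps='seawater')])
fc.test_eq(tfm()['seawater']['nuclide_id'].to_list(), [33, 77, -1])
```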
LowerStripNameCB (col_src:str, col_dst:str=None, fn_transform:Callable=<function <lambda>>)
Convert values to lowercase and strip any trailing spaces.
| | Type | Default | Details |
|---|---|---|---|
| col_src | str | | Source column name e.g. 'Nuclide' |
| col_dst | str | None | Destination column name |
| fn_transform | Callable | lambda x: x.lower().strip() | Transformation function |
class LowerStripNameCB(Callback):
    "Convert values to lowercase and strip any trailing spaces."
    def __init__(self,
                 col_src: str, # Source column name e.g. 'Nuclide'
                 col_dst: str=None, # Destination column name
                 fn_transform: Callable=lambda x: x.lower().strip() # Transformation function
                 ):
        fc.store_attr()
        # Resolve the destination column before building the docstring,
        # so the docstring reports the actual destination.
        if not col_dst: self.col_dst = col_src
        self.__doc__ = f"Convert values from '{col_src}' to lowercase, strip spaces, and store in '{self.col_dst}'."

    def _safe_transform(self, value):
        "Ensure value is not NA and apply transformation function."
        return value if pd.isna(value) else self.fn_transform(str(value))

    def __call__(self, tfm):
        for key in tfm.dfs.keys():
            tfm.dfs[key][self.col_dst] = tfm.dfs[key][self.col_src].apply(self._safe_transform)
Let’s test the callback:
dfs = {'seawater': pd.DataFrame({'Nuclide': ['CS137', '226RA']})}
tfm = Transformer(dfs, cbs=[LowerStripNameCB(col_src='Nuclide', col_dst='NUCLIDE')])
fc.test_eq(tfm()['seawater']['NUCLIDE'].to_list(), ['cs137', '226ra'])
tfm = Transformer(dfs, cbs=[LowerStripNameCB(col_src='Nuclide')])
fc.test_eq(tfm()['seawater']['Nuclide'].to_list(), ['cs137', '226ra'])
This normalization is generally useful as a first step when (semi-automatically) remapping names.
RemoveAllNAValuesCB (cols_to_check:Dict[str,List[str]])
Remove rows where all of the checked columns are NA.

| | Type | Details |
|---|---|---|
| cols_to_check | Dict | A dictionary with the sample type as key and the columns to check as value |
class RemoveAllNAValuesCB(Callback):
    "Remove rows where all of the checked columns are NA."
    def __init__(self,
                 cols_to_check: Dict[str, List[str]] # A dictionary with the sample type as key and the columns to check as value
                 ):
        fc.store_attr()

    def __call__(self, tfm):
        for k in tfm.dfs.keys():
            cols_to_check = self.cols_to_check[k]
            mask = tfm.dfs[k][cols_to_check].isnull().all(axis=1)
            tfm.dfs[k] = tfm.dfs[k][~mask]
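Let's test the callback on a hedged example (`value` and `unc` are illustrative column names); only the row where both checked columns are NA is removed:

```python
dfs = {'seawater': pd.DataFrame({'value': [1, None, None], 'unc': [0.1, None, 0.3]})}
tfm = Transformer(dfs, cbs=[RemoveAllNAValuesCB(cols_to_check={'seawater': ['value', 'unc']})])
fc.test_eq(len(tfm()['seawater']), 2)
```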
Many data providers use a long format (e.g. `lat, lon, radionuclide, value, unc, ...`) to store their data. When encoding as netCDF, it is often required to use a wide format instead (e.g. `lat, lon, nuclide1_value, nuclide1_unc, nuclide2_value, nuclide2_unc, ...`). The `ReshapeLongToWide` class is designed to perform this transformation.
ReshapeLongToWide (columns:List[str]=['nuclide'], values:List[str]=['value'], num_fill_value:int=-999, str_fill_value:str='STR FILL VALUE')
Convert data from long to wide with renamed columns.

| | Type | Default | Details |
|---|---|---|---|
| columns | List | ['nuclide'] | Columns to use as index |
| values | List | ['value'] | Columns to use as values |
| num_fill_value | int | -999 | Fill value for numeric columns |
| str_fill_value | str | STR FILL VALUE | Fill value for string columns |
class ReshapeLongToWide(Callback):
    "Convert data from long to wide with renamed columns."
    def __init__(self,
                 columns: List[str]=['nuclide'], # Columns to use as index
                 values: List[str]=['value'], # Columns to use as values
                 num_fill_value: int=-999, # Fill value for numeric columns
                 str_fill_value: str='STR FILL VALUE' # Fill value for string columns
                 ):
        fc.store_attr()
        self.derived_cols = self._get_derived_cols()

    def _get_derived_cols(self):
        "Retrieve all possible derived vars (e.g. 'unc', 'dl', ...) from configs."
        return [value['name'] for value in cdl_cfg()['vars']['suffixes'].values()]

    def renamed_cols(self, cols):
        "Flatten columns name."
        return [inner if outer == "value" else f'{inner}{outer}' if inner else outer
                for outer, inner in cols]

    def _get_unique_fill_value(self, df, idx):
        "Get a unique fill value for NaN replacement."
        fill_value = self.num_fill_value
        while (df[idx] == fill_value).any().any():
            fill_value -= 1
        return fill_value

    def _fill_nan_values(self, df, idx):
        "Fill NaN values in index columns."
        num_fill_value = self._get_unique_fill_value(df, idx)
        for col in idx:
            fill_value = num_fill_value if pd.api.types.is_numeric_dtype(df[col]) else self.str_fill_value
            df[col] = df[col].fillna(fill_value)
        return df, num_fill_value

    def pivot(self, df):
        derived_coi = [col for col in self.derived_cols if col in df.columns]
        # TO BE REMOVED: in a past implementation we added an index column
        # before pivoting, making all rows (compound_idx) unique.
        # df.index.name = 'org_index'
        # df = df.reset_index()
        idx = list(set(df.columns) - set(self.columns + derived_coi + self.values))
        df, num_fill_value = self._fill_nan_values(df, idx)
        pivot_df = df.pivot_table(index=idx,
                                  columns=self.columns,
                                  values=self.values + derived_coi,
                                  fill_value=np.nan,
                                  aggfunc=lambda x: x).reset_index()
        pivot_df[idx] = pivot_df[idx].replace({self.str_fill_value: np.nan, num_fill_value: np.nan})
        pivot_df = self.set_index(pivot_df)
        return pivot_df

    def set_index(self, df):
        "Set the index of the dataframe."
        # TODO: Consider implementing a universal unique index
        # by hashing the compound index columns (lat, lon, time, depth, etc.)
        df.index.name = 'org_index'
        return df

    def __call__(self, tfm):
        for grp in tfm.dfs.keys():
            tfm.dfs[grp] = self.pivot(tfm.dfs[grp])
            tfm.dfs[grp].columns = self.renamed_cols(tfm.dfs[grp].columns)
Example of usage when all `compound_idx` (in our case made of `lon`, `lat`, `time`, `depth`, …) are unique:

dfs_test = {'seawater': pd.DataFrame({
'compound_idx': ['a', 'b', 'c', 'd'],
'nuclide': ['cs137', 'cs137', 'pu239_240_tot', 'pu239_240_tot'],
'value': [1, 2, 3, 4],
'_unc': [0.1, 0.2, 0.3, 0.4]})}
tfm = Transformer(dfs_test, cbs=[ReshapeLongToWide()])
tfm()['seawater']
| org_index | compound_idx | cs137_unc | pu239_240_tot_unc | cs137 | pu239_240_tot |
|---|---|---|---|---|---|
| 0 | a | 0.1 | NaN | 1.0 | NaN |
| 1 | b | 0.2 | NaN | 2.0 | NaN |
| 2 | c | NaN | 0.3 | NaN | 3.0 |
| 3 | d | NaN | 0.4 | NaN | 4.0 |
And when `compound_idx` values are not unique (here `a` appears twice, with two different nuclides):

dfs_test = {'seawater': pd.DataFrame({
'compound_idx': ['a', 'a', 'c', 'd'],
'nuclide': ['cs137', 'cs134', 'pu239_240_tot', 'pu239_240_tot'],
'value': [1, 2, 3, 4],
'_unc': [0.1, 0.2, 0.3, 0.4]})}
tfm = Transformer(dfs_test, cbs=[ReshapeLongToWide()])
tfm()['seawater']
| org_index | compound_idx | cs134_unc | cs137_unc | pu239_240_tot_unc | cs134 | cs137 | pu239_240_tot |
|---|---|---|---|---|---|---|---|
| 0 | a | 0.2 | 0.1 | NaN | 2.0 | 1.0 | NaN |
| 1 | c | NaN | NaN | 0.3 | NaN | NaN | 3.0 |
| 2 | d | NaN | NaN | 0.4 | NaN | NaN | 4.0 |
CompareDfsAndTfmCB (dfs:Dict[str,pandas.core.frame.DataFrame])
Create a dataframe of dropped data: data included in `dfs` but not in `tfm`.

| | Type | Details |
|---|---|---|
| dfs | Dict | Original dataframes |
class CompareDfsAndTfmCB(Callback):
    "Create a dataframe of dropped data: data included in `dfs` but not in `tfm`."
    def __init__(self,
                 dfs: Dict[str, pd.DataFrame] # Original dataframes
                 ):
        fc.store_attr()

    def __call__(self, tfm: Transformer) -> None:
        self._initialize_tfm_attributes(tfm)
        for grp in tfm.dfs.keys():
            dropped_df = self._get_dropped_data(grp, tfm)
            tfm.dfs_dropped[grp] = dropped_df
            tfm.compare_stats[grp] = self._compute_stats(grp, tfm)

    def _initialize_tfm_attributes(self, tfm: Transformer) -> None:
        tfm.dfs_dropped = {}
        tfm.compare_stats = {}

    def _get_dropped_data(self,
                          grp: str, # The group key
                          tfm: Transformer # The transformation object containing `dfs`
                          ) -> pd.DataFrame: # Dataframe with dropped rows
        "Get the data that is present in `dfs` but not in `tfm.dfs`."
        index_diff = self.dfs[grp].index.difference(tfm.dfs[grp].index)
        return self.dfs[grp].loc[index_diff]

    def _compute_stats(self,
                       grp: str, # The group key
                       tfm: Transformer # The transformation object containing `dfs`
                       ) -> Dict[str, int]: # Dictionary with comparison statistics
        "Compute comparison statistics between `dfs` and `tfm.dfs`."
        return {
            'Number of rows in dfs': len(self.dfs[grp].index),
            'Number of rows in tfm.dfs': len(tfm.dfs[grp].index),
            'Number of dropped rows': len(tfm.dfs_dropped[grp].index),
            'Number of rows in tfm.dfs + Number of dropped rows': len(tfm.dfs[grp].index) + len(tfm.dfs_dropped[grp].index)
        }
`CompareDfsAndTfmCB` compares the original dataframes to the transformed dataframes. A dictionary of dataframes, `tfm.dfs_dropped`, is created containing the data present in the original dataset but absent from the transformed data. `tfm.compare_stats` provides a quick overview of the number of rows in both the original and transformed dataframes.
For instance:
dfs_test = {
'seawater': pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]}),
'sediment': pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]}),
}
class TestTfmCB(Callback):
def __call__(self, tfm):
for key in tfm.dfs.keys():
df = tfm.dfs[key]
drop_idxs = [0, 1] if key == 'seawater' else [0]
df.drop(drop_idxs, inplace=True)
tfm = Transformer(dfs_test, cbs=[
TestTfmCB(),
CompareDfsAndTfmCB(dfs_test)], inplace=False)
print(tfm())
fc.test_eq(tfm.compare_stats['seawater']['Number of dropped rows'], 2)
fc.test_eq(tfm.compare_stats['sediment']['Number of dropped rows'], 1)
{'seawater': a b
2 3 6, 'sediment': a b
1 2 5
2 3 6}
These callbacks are used to transform the time variable according to netCDF CF standards. For instance, the `EncodeTimeCB` callback encodes the time variable as an integer representing seconds since the reference date specified in the `configs.ipynb` `CONFIGS_CDL` dictionary.
EncodeTimeCB (cfg:dict, verbose:bool=False)
Encode time as `int` representing seconds since the reference date defined in the configuration's time units.
| | Type | Default | Details |
|---|---|---|---|
| cfg | dict | | Configuration dictionary |
| verbose | bool | False | Whether to print the number of invalid time entries |
class EncodeTimeCB(Callback):
    "Encode time as `int` representing seconds since the reference date defined in `cfg['units']['time']`."
    def __init__(self,
                 cfg: dict, # Configuration dictionary
                 verbose: bool=False # Whether to print the number of invalid time entries
                 ):
        fc.store_attr()

    def __call__(self, tfm):
        def format_time(x):
            return date2num(x, units=self.cfg['units']['time'])

        for k in tfm.dfs.keys():
            # Report and filter invalid time entries
            if tfm.dfs[k]['time'].isna().any():
                if self.verbose:
                    invalid_time_df = tfm.dfs[k][tfm.dfs[k]['time'].isna()]
                    print(f'{len(invalid_time_df.index)} of {len(tfm.dfs[k].index)} entries for `time` are invalid for {k}.')
                # Filter out NaN values
                tfm.dfs[k] = tfm.dfs[k][tfm.dfs[k]['time'].notna()]
            tfm.dfs[k]['time'] = tfm.dfs[k]['time'].apply(format_time)
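A hedged usage sketch, assuming `date2num` comes from `cftime` and using an illustrative CF-style unit string (not the actual `CONFIGS_CDL` entry); the `NaT` row is reported and dropped:

```python
cfg = {'units': {'time': 'seconds since 1970-01-01 00:00:00.0'}}
dfs = {'seawater': pd.DataFrame({'time': [pd.Timestamp('1970-01-02'), pd.NaT]})}
tfm = Transformer(dfs, cbs=[EncodeTimeCB(cfg=cfg, verbose=True)])
fc.test_eq(tfm()['seawater']['time'].to_list(), [86400])
```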