Callback used in handlers

Core

The Transformer class is designed to facilitate the application of a series of callbacks to one or several dataframes. It provides a structured way to apply transformations (i.e., callbacks) to the data, with a focus on flexibility and ease of use.


source

Callback

 Callback ()

Base class for callbacks.

Exported source
class Callback:
    "Base class for callbacks."
    # Sort key: `run_cbs` executes callbacks in ascending `order`.
    order = 0

source

run_cbs

 run_cbs (cbs:List[__main__.Callback], obj:Any)

Run the callbacks in the order they are specified.

Type Details
cbs List List of callbacks to run
obj Any Object to pass to the callbacks
Exported source
def run_cbs(
    cbs: List[Callback], # List of callbacks to run
    obj: Any # Object to pass to the callbacks
    ):
    "Run the callbacks sorted by their `order` attribute, logging each callback's docstring."
    ordered = sorted(cbs, key=attrgetter('order'))
    for callback in ordered:
        description = callback.__doc__
        # Record what is about to run; callbacks without a docstring leave no trace.
        if description:
            obj.logs.append(description)
        callback(obj)

source

Transformer

 Transformer (data:Union[Dict[str,pandas.core.frame.DataFrame],pandas.core
              .frame.DataFrame],
              cbs:Optional[List[__main__.Callback]]=None,
              inplace:bool=False)

Transform the dataframe(s) according to the specified callbacks.

Type Default Details
data Union Data to be transformed
cbs Optional None List of callbacks to run
inplace bool False Whether to modify the dataframe(s) in place
Exported source
class Transformer():
    "Transform the dataframe(s) according to the specified callbacks."
    def __init__(self, 
                 data: Union[Dict[str, pd.DataFrame], pd.DataFrame], # Data to be transformed
                 cbs: Optional[List[Callback]]=None, # List of callbacks to run
                 inplace: bool=False # Whether to modify the dataframe(s) in place
                 ): 
        fc.store_attr()
        # A single dataframe is exposed as `self.df`, a dict of dataframes as `self.dfs`.
        self.is_single_df = isinstance(data, pd.DataFrame)
        self.df, self.dfs = self._prepare_data(data, inplace)
        # `run_cbs` appends the docstring of every executed callback here.
        self.logs = []
            
    def _prepare_data(self, data, inplace):
        "Return a `(df, dfs)` pair where exactly one item is set; data are copied unless `inplace`."
        if self.is_single_df:
            return (data if inplace else data.copy()), None
        return None, (data if inplace else {k: v.copy() for k, v in data.items()})
    
    def unique(self, col_name: str) -> np.ndarray:
        "Distinct non-NA values of `col_name`, pooled over all groups (or the single dataframe)."
        if self.is_single_df:
            # Explicit dtype: a bare `pd.Series()` warns on pandas < 2 and its
            # default dtype depends on the pandas version.
            values = self.df.get(col_name, pd.Series(dtype=object)).dropna().values
        else:
            # Look each group up once and skip the groups lacking the column.
            candidates = (df.get(col_name) for df in self.dfs.values())
            columns = [col for col in candidates if col is not None]
            values = np.concatenate([col.dropna().values for col in columns]) if columns else []
        return np.unique(values)
        
    def __call__(self):
        "Run the callbacks (if any) and return the transformed dataframe or dict of dataframes."
        if self.cbs: run_cbs(self.cbs, self)
        return self.df if self.dfs is None else self.dfs

Below, a few examples of how to use the Transformer class. Let’s define first a test callback that adds 1 to the depth:

class TestCB(Callback):
    "A test callback to add 1 to the depth."
    def __call__(self, tfm: Transformer):
        # Only the frames are needed; group names are irrelevant here.
        for frame in tfm.dfs.values():
            depth = frame['depth']
            frame['depth'] = depth.apply(lambda d: d+1)

And apply it to the following dataframes:

# Two groups that both carry a `depth` column for the callback to increment.
dfs = {'biota': pd.DataFrame({'id': [0, 1, 2], 'species': [0, 2, 0], 'depth': [2, 3, 4]}),
       'seawater': pd.DataFrame({'id': [0, 1, 2], 'depth': [3, 4, 5]})}

# `inplace` is left at its default (False), so the callback works on copies of `dfs`.
tfm = Transformer(dfs, cbs=[TestCB()])
dfs_test = tfm()

# Every depth must have been incremented by exactly one.
fc.test_eq(dfs_test['biota']['depth'].to_list(), [3, 4, 5])
fc.test_eq(dfs_test['seawater']['depth'].to_list(), [4, 5, 6])
class TestCB(Callback):
    "A test callback to add 1 to the depth."
    def __call__(self, tfm: Transformer):
        # Single-dataframe variant: operates on `tfm.df` rather than `tfm.dfs`.
        depth = tfm.df['depth']
        tfm.df['depth'] = depth.apply(lambda d: d+1)
# Same check as above, but passing a single dataframe instead of a dict of groups.
df = pd.DataFrame({'id': [0, 1, 2], 'species': [0, 2, 0], 'depth': [2, 3, 4]})

tfm = Transformer(df, cbs=[TestCB()])
df_test = tfm()

# With a single dataframe, calling the transformer returns that dataframe directly.
fc.test_eq(df_test['depth'].to_list(), [3, 4, 5])

Geographical

This section gathers callbacks that are used to transform the geographical coordinates.


source

SanitizeLonLatCB

 SanitizeLonLatCB (lon_col:str='LON', lat_col:str='LAT',
                   verbose:bool=False)

Drop rows with invalid longitude & latitude values. Convert , separator to . separator.

Type Default Details
lon_col str LON Longitude column name
lat_col str LAT Latitude column name
verbose bool False Whether to print the number of invalid longitude & latitude values
Exported source
class SanitizeLonLatCB(Callback):
    "Drop rows with invalid longitude & latitude values. Convert `,` separator to `.` separator."
    def __init__(self, 
                 lon_col: str='LON', # Longitude column name
                 lat_col: str='LAT', # Latitude column name
                 verbose: bool=False # Whether to print the number of invalid longitude & latitude values
                 ):
        fc.store_attr()

    @staticmethod
    def _to_float(value):
        "Parse `value` as a float, accepting `,` as decimal separator."
        return float(str(value).replace(',', '.'))
        
    def __call__(self, tfm: Transformer):
        "Convert both coordinate columns to float, then drop (0, 0) and out-of-range rows in every group."
        lon_name, lat_name = self.lon_col, self.lat_col
        for grp, df in tfm.dfs.items():
            # Longitude first, then latitude: normalize the decimal separator and cast.
            for name in (lon_name, lat_name):
                df[name] = df[name].apply(self._to_float)
            lon, lat = df[lon_name], df[lat_name]

            # Points located exactly at the (0, 0) origin
            at_origin = (lon == 0) & (lat == 0)
            n_origin = at_origin.sum()
            if self.verbose and n_origin:
                print(f'The "{grp}" group contains {n_origin} data points whose ({lon_name}, {lat_name}) = (0, 0)')

            # Points outside [-180, 180] x [-90, 90]
            lon_invalid = (lon < -180) | (lon > 180)
            lat_invalid = (lat < -90) | (lat > 90)
            out_of_bounds = lon_invalid | lat_invalid
            n_out = out_of_bounds.sum()
            if self.verbose and n_out:
                print(f'The "{grp}" group contains {n_out} data points with unrealistic {lon_name} or {lat_name} values.')

            # Keep the rows flagged by neither mask.
            keep = ~at_origin & ~out_of_bounds
            tfm.dfs[grp] = df.loc[keep]
# Check that measurements located at (0, 0) get removed.
# NOTE: `tfm()` is called twice in each check below, so the callback runs a
# second time on the already sanitized data.
dfs = {'BIOTA': pd.DataFrame({'LON': [0, 1, 0], 'LAT': [0, 2, 0]})}
tfm = Transformer(dfs, cbs=[SanitizeLonLatCB()])
tfm()['BIOTA']

expected = [1., 2.]
fc.test_eq(tfm()['BIOTA'].iloc[0].to_list(), expected)
# Check that a comma decimal separator gets replaced by a point.
dfs = {'BIOTA': pd.DataFrame({'LON': ['45,2'], 'LAT': ['43,1']})}
tfm = Transformer(dfs, cbs=[SanitizeLonLatCB()])
tfm()['BIOTA']

expected = [45.2, 43.1]
fc.test_eq(tfm()['BIOTA'].iloc[0].to_list(), expected)
# Check that rows with an out-of-bounds longitude or latitude get removed
# (only the last row lies within [-180, 180] x [-90, 90]).
dfs = {'BIOTA': pd.DataFrame({'LON': [-190, 190, 1, 2, 1.1], 'LAT': [1, 2, 91, -91, 2.2]})}
tfm = Transformer(dfs, cbs=[SanitizeLonLatCB()])
tfm()['BIOTA']

expected = [1.1, 2.2]
fc.test_eq(tfm()['BIOTA'].iloc[0].to_list(), expected)

Map & Standardize


source

RemapCB

 RemapCB (fn_lut:Callable, col_remap:str, col_src:str,
          dest_grps:list[str]|str=dict_keys(['BIOTA', 'SEAWATER',
          'SEDIMENT', 'SUSPENDED_MATTER']), default_value:Any=0,
          verbose:bool=False)

Generic MARIS remapping callback.

Type Default Details
fn_lut Callable Function that returns the lookup table dictionary
col_remap str Name of the column to remap
col_src str Name of the column with the source values
dest_grps list[str] | str dict_keys([‘BIOTA’, ‘SEAWATER’, ‘SEDIMENT’, ‘SUSPENDED_MATTER’]) List of destination groups
default_value Any 0 Default value for unmatched entries
verbose bool False Whether to print the number of unmatched entries
Exported source
class RemapCB(Callback):
    "Generic MARIS remapping callback."
    def __init__(self, 
                 fn_lut: Callable, # Function that returns the lookup table dictionary
                 col_remap: str, # Name of the column to remap
                 col_src: str, # Name of the column with the source values
                 dest_grps: list[str]|str=NC_GROUPS.keys(), # List of destination groups
                 default_value: Any = 0, # Default value for unmatched entries
                 verbose: bool=False # Whether to print the number of unmatched entries
                ):
        fc.store_attr()
        # The lookup table is only built when the callback is executed.
        self.lut = None
        # Normalize to a plain list: a single name is wrapped, and any other iterable
        # (including the default `dict_keys` view) is materialized so that the
        # description below lists the group names instead of `dict_keys([...])`.
        self.dest_grps = [dest_grps] if isinstance(dest_grps, str) else list(dest_grps)
        grp_str = self._format_groups(self.dest_grps)
        # Instance-level docstring: this is the text recorded in the transformer logs.
        self.__doc__ = f"Remap values from '{col_src}' to '{col_remap}' for groups: {grp_str}."

    @staticmethod
    def _format_groups(groups: list) -> str:
        "Render group names as 'A', 'A and B' or 'A, B and C' ('' when empty)."
        names = [str(g) for g in groups]
        if not names: return ''
        if len(names) == 1: return names[0]
        return ', '.join(names[:-1]) + ' and ' + names[-1]

    def __call__(self, tfm):
        "Build the lookup table and remap `col_src` into `col_remap` for every destination group."
        self.lut = self.fn_lut()
        for grp in self.dest_grps:
            if grp in tfm.dfs:
                self._remap_group(tfm.dfs[grp])
            else:
                print(f"Group {grp} not found in the dataframes.")

    def _remap_group(self, df: pd.DataFrame):
        "Write the remapped values of `col_src` into `col_remap` (modifies `df`)."
        df[self.col_remap] = df[self.col_src].apply(self._remap_value)

    def _remap_value(self, value: str) -> Any:
        "Return the lookup result for `value`; fall back to `default_value` when unmatched."
        # Surrounding whitespace in string keys is ignored for the lookup.
        value = value.strip() if isinstance(value, str) else value
        match = self.lut.get(value, Match(self.default_value, None, None, None))
        # The table may hold either `Match` objects or plain values.
        if not isinstance(match, Match):
            return match
        if match.matched_id == self.default_value and self.verbose:
            print(f"Unmatched value: {value}")
        return match.matched_id

source

LowerStripNameCB

 LowerStripNameCB (col_src:str, col_dst:str=None,
                   fn_transform:Callable=<function <lambda>>)

Convert values to lowercase and strip any trailing spaces.

Type Default Details
col_src str Source column name e.g. ‘Nuclide’
col_dst str None Destination column name
fn_transform Callable Transformation function
Exported source
class LowerStripNameCB(Callback):
    "Convert values to lowercase and strip any trailing spaces."
    def __init__(self, 
                 col_src: str, # Source column name e.g. 'Nuclide'
                 col_dst: str=None, # Destination column name
                 fn_transform: Callable=lambda x: x.lower().strip() # Transformation function
                 ):
        fc.store_attr()
        # Resolve the destination first so the description below never reads "'None' column":
        # without `col_dst` the source column is overwritten.
        if not col_dst: self.col_dst = col_src
        # Instance-level docstring: this is the text recorded in the transformer logs.
        self.__doc__ = f"Convert '{col_src}' column values to lowercase, strip spaces, and store in '{self.col_dst}' column."
        
    def _safe_transform(self, value):
        "Return NA values untouched; otherwise apply `fn_transform` to `str(value)`."
        return value if pd.isna(value) else self.fn_transform(str(value))
            
    def __call__(self, tfm):
        "Apply the transformation to `col_src` of every group and store it in `col_dst`."
        for key in tfm.dfs.keys():
            tfm.dfs[key][self.col_dst] = tfm.dfs[key][self.col_src].apply(self._safe_transform)

Let’s test the callback:

dfs = {'seawater': pd.DataFrame({'Nuclide': ['CS137', '226RA']})}

# With an explicit destination, the lowercased names go to a new 'NUCLIDE' column.
tfm = Transformer(dfs, cbs=[LowerStripNameCB(col_src='Nuclide', col_dst='NUCLIDE')])
fc.test_eq(tfm()['seawater']['NUCLIDE'].to_list(), ['cs137', '226ra'])


# Without `col_dst`, the source column itself is overwritten.
tfm = Transformer(dfs, cbs=[LowerStripNameCB(col_src='Nuclide')])
fc.test_eq(tfm()['seawater']['Nuclide'].to_list(), ['cs137', '226ra'])

When remapping names (semi-automatically), the general workflow is:

  1. we need first to guess (fuzzy matching or other) the right nuclide name.
  2. Then manually check the result and eventually update the lookup table.
  3. Finally we can apply the lookup table to the dataframe.

Change structure


source

AddSampleTypeIdColumnCB

 AddSampleTypeIdColumnCB (lut:dict={'SEAWATER': 1, 'BIOTA': 2, 'SEDIMENT':
                          3, 'SUSPENDED_MATTER': 4},
                          col_name:str='SAMPLE_TYPE')

Add a column with the sample type as defined in the CDL.

Type Default Details
lut dict {‘SEAWATER’: 1, ‘BIOTA’: 2, ‘SEDIMENT’: 3, ‘SUSPENDED_MATTER’: 4} Lookup table for sample type
col_name str SAMPLE_TYPE Column name to store the sample type id
Exported source
class AddSampleTypeIdColumnCB(Callback):
    # The docstring lives on the class (not on `__init__`) so that `cb.__doc__`
    # is set and gets recorded in the transformer logs.
    "Add a column with the sample type as defined in the CDL."
    def __init__(self, 
                 lut: dict=SMP_TYPE_LUT, # Lookup table for sample type
                 col_name: str='SAMPLE_TYPE' # Column name to store the sample type id
                 ): 
        fc.store_attr()
        
    def __call__(self, tfm):
        "Fill `col_name` of every group with the id looked up from the group name."
        for grp, df in tfm.dfs.items():
            # Scalar assignment: every row of the group gets the same id.
            # A group name missing from `lut` raises `KeyError`.
            df[self.col_name] = self.lut[grp]

Let’s test the callback:

# One small dataframe per sample type known to the lookup table.
dfs = {smp_type: pd.DataFrame({'col_test': [0, 1, 2]}) for smp_type in SMP_TYPE_LUT.keys()};

tfm = Transformer(dfs, cbs=[AddSampleTypeIdColumnCB()])
dfs_test = tfm()

# Each group must hold a single distinct id, equal to the one in the lookup table.
for smp_type in SMP_TYPE_LUT.keys():
    fc.test_eq(dfs_test[smp_type]['SAMPLE_TYPE'].unique().item(), SMP_TYPE_LUT[smp_type])

source

AddNuclideIdColumnCB

 AddNuclideIdColumnCB (col_value:str, lut_fname_fn:Callable=<function
                       nuc_lut_path>, col_name:str='nuclide_id')

Add a column with the nuclide id.

Type Default Details
col_value str Column name containing the nuclide name
lut_fname_fn Callable nuc_lut_path Function returning the lut path
col_name str nuclide_id Column name to store the nuclide id
Exported source
class AddNuclideIdColumnCB(Callback):
    # The docstring lives on the class (not on `__init__`) so that `cb.__doc__`
    # is set and gets recorded in the transformer logs.
    "Add a column with the nuclide id."
    def __init__(self, 
                 col_value: str, # Column name containing the nuclide name
                 lut_fname_fn: Callable=nuc_lut_path, # Function returning the lut path
                 col_name: str='nuclide_id' # Column name to store the nuclide id
                 ): 
        fc.store_attr()
        # Resolve the lookup-table path once instead of calling `lut_fname_fn` twice.
        lut_path = lut_fname_fn()
        self.lut = get_lut(lut_path.parent, lut_path.name, 
                           key='nc_name', value='nuclide_id', reverse=False)
        
    def __call__(self, tfm: Transformer):
        "Map the values of `col_value` through the lookup table into `col_name` for every group."
        for grp, df in tfm.dfs.items(): 
            # `Series.map` yields NaN for names absent from the lookup table.
            df[self.col_name] = df[self.col_value].map(self.lut)

source

SelectColumnsCB

 SelectColumnsCB (cois:dict)

Select columns of interest.

Type Details
cois dict Columns of interest
Exported source
class SelectColumnsCB(Callback):
    "Select columns of interest."
    def __init__(self, 
                 cois: dict # Columns of interest
                 ): 
        fc.store_attr()
        
    def __call__(self, tfm):
        "Keep, in every group, only the columns named by the keys of `cois`."
        frames = tfm.dfs
        for grp, frame in frames.items():
            wanted = self.cois.keys()
            frames[grp] = frame.loc[:, wanted]

source

RenameColumnsCB

 RenameColumnsCB (renaming_rules:dict)

Renaming variables to MARIS standard names.

Type Details
renaming_rules dict Renaming rules
Exported source
class RenameColumnsCB(Callback):
    "Renaming variables to MARIS standard names."
    def __init__(self,
                 renaming_rules: dict # Renaming rules
                 ): 
        fc.store_attr()
        
    def __call__(self, tfm):
        "Rename, in place, the columns of every group according to `renaming_rules`."
        for frame in tfm.dfs.values():
            # In-place rename: the dataframe objects held by `tfm.dfs` are modified directly.
            frame.rename(columns=self.renaming_rules, inplace=True)

source

RemoveAllNAValuesCB

 RemoveAllNAValuesCB (cols_to_check:Dict[str,str])

Remove rows with all NA values.

Type Details
cols_to_check Dict A dictionary with the sample type as key and the column name to check as value
Exported source
class RemoveAllNAValuesCB(Callback):
    "Remove rows with all NA values."
    def __init__(self, 
                 cols_to_check: Dict[str, Union[str, List[str]]] # A dictionary with the sample type as key and the column name(s) to check as value
                ):
        fc.store_attr()

    def __call__(self, tfm):
        "Drop, in every group, the rows whose checked columns are all NA."
        for k in tfm.dfs.keys():
            cols = self.cols_to_check[k]
            # A single column name is wrapped in a list: selecting with a list yields
            # a DataFrame, which the row-wise `.all(axis=1)` below requires.
            if isinstance(cols, str): cols = [cols]
            mask = tfm.dfs[k][cols].isnull().all(axis=1)
            tfm.dfs[k] = tfm.dfs[k][~mask]

source

RemoveAllNAValuesCB

 RemoveAllNAValuesCB (cols_to_check:Dict[str,str])

Remove rows with all NA values.

Type Details
cols_to_check Dict A dictionary with the sample type as key and the column name to check as value
Exported source
class RemoveAllNAValuesCB(Callback):
    "Remove rows with all NA values."
    def __init__(self, 
                 cols_to_check: Dict[str, Union[str, List[str]]] # A dictionary with the sample type as key and the column name(s) to check as value
                ):
        fc.store_attr()

    def __call__(self, tfm):
        "Drop, in every group, the rows whose checked columns are all NA."
        for k in tfm.dfs.keys():
            cols = self.cols_to_check[k]
            # A single column name is wrapped in a list: selecting with a list yields
            # a DataFrame, which the row-wise `.all(axis=1)` below requires.
            if isinstance(cols, str): cols = [cols]
            mask = tfm.dfs[k][cols].isnull().all(axis=1)
            tfm.dfs[k] = tfm.dfs[k][~mask]

[TO BE REMOVED] Many data providers use a long format (e.g lat, lon, radionuclide, value, unc, ...) to store their data. When encoding as netCDF, it is often required to use a wide format (e.g lat, lon, nuclide1_value, nuclide1_unc, nuclide2_value, nuclide2_unc, ...). The class ReshapeLongToWide is designed to perform this transformation.

# class ReshapeLongToWide(Callback):
#     "Convert data from long to wide with renamed columns."
#     def __init__(self, 
#                  columns: List[str]=['nuclide'], # Columns to use as index
#                  values: List[str]=['value'], # Columns to use as values
#                  num_fill_value: int=-999, # Fill value for numeric columns
#                  str_fill_value='STR FILL VALUE'
#                  ):
#         fc.store_attr()
#         self.derived_cols = self._get_derived_cols()
    
#     def _get_derived_cols(self):
#         "Retrieve all possible derived vars (e.g 'unc', 'dl', ...) from configs."
#         return [value['name'] for value in cdl_cfg()['vars']['suffixes'].values()]

#     def renamed_cols(self, cols):
#         "Flatten columns name."
#         return [inner if outer == "value" else f'{inner}{outer}' if inner else outer
#                 for outer, inner in cols]

#     def _get_unique_fill_value(self, df, idx):
#         "Get a unique fill value for NaN replacement."
#         fill_value = self.num_fill_value
#         while (df[idx] == fill_value).any().any():
#             fill_value -= 1
#         return fill_value

#     def _fill_nan_values(self, df, idx):
#         "Fill NaN values in index columns."
#         num_fill_value = self._get_unique_fill_value(df, idx)
#         for col in idx:
#             fill_value = num_fill_value if pd.api.types.is_numeric_dtype(df[col]) else self.str_fill_value
#             df[col] = df[col].fillna(fill_value)
#         return df, num_fill_value

#     def pivot(self, df):
#         derived_coi = [col for col in self.derived_cols if col in df.columns]
#         # In past implementation we added an index column before pivoting 
#         # TO BE REMOVED
#         # making all rows (compound_idx) unique.
#         # df.index.name = 'org_index'
#         # df = df.reset_index()
#         idx = list(set(df.columns) - set(self.columns + derived_coi + self.values))
        
#         df, num_fill_value = self._fill_nan_values(df, idx)

#         pivot_df = df.pivot_table(index=idx,
#                                   columns=self.columns,
#                                   values=self.values + derived_coi,
                                  
                                  
#                                   aggfunc=lambda x: x
#                                   ).reset_index()

#         pivot_df[idx] = pivot_df[idx].replace({self.str_fill_value: np.nan, num_fill_value: np.nan})
#         pivot_df = self.set_index(pivot_df)
#         return pivot_df

#         def set_index(self, df):
#             "Set the index of the dataframe."
#             # TODO: Consider implementing a universal unique index
#             # by hashing the compound index columns (lat, lon, time, depth, etc.)
#             df.index.name = 'org_index'
#             return df
    
#     def __call__(self, tfm):
#         for grp in tfm.dfs.keys():
#             tfm.dfs[grp] = self.pivot(tfm.dfs[grp])
#             tfm.dfs[grp].columns = self.renamed_cols(tfm.dfs[grp].columns)

[TO BE DELETED] Example of usage:

  • Case 1: compound_idx (in our case made of lon, lat, time, depth, …) are unique
# #| eval: false
# dfs_test = {'seawater': pd.DataFrame({
#     'compound_idx': ['a', 'b', 'c', 'd'], 
#     'nuclide': ['cs137', 'cs137', 'pu239_240_tot', 'pu239_240_tot'], 
#     'value': [1, 2, 3, 4],
#     '_unc': [0.1, 0.2, 0.3, 0.4]})}

# tfm = Transformer(dfs_test, cbs=[ReshapeLongToWide()])
# tfm()['seawater']
  • Case 2: compound_idx are not unique
# #| eval: false
# dfs_test = {'seawater': pd.DataFrame({
#     'compound_idx': ['a', 'a', 'c', 'd'], 
#     'nuclide': ['cs137', 'cs134', 'pu239_240_tot', 'pu239_240_tot'], 
#     'value': [1, 2, 3, 4],
#     '_unc': [0.1, 0.2, 0.3, 0.4]})}

# tfm = Transformer(dfs_test, cbs=[ReshapeLongToWide()])
# tfm()['seawater']

source

CompareDfsAndTfmCB

 CompareDfsAndTfmCB (dfs:Dict[str,pandas.core.frame.DataFrame])

Create a dataframe of removed data, i.e. data included in the original dfs but no longer in the tfm.

Type Details
dfs Dict Original dataframes
Exported source
class CompareDfsAndTfmCB(Callback):
    "Create a dataframe of removed data, i.e. data included in the original `dfs` but no longer in the `tfm`."
    def __init__(self, 
                 dfs: Dict[str, pd.DataFrame] # Original dataframes
                 ): 
        fc.store_attr()
        
    def __call__(self, tfm: Transformer) -> None:
        "Populate `tfm.dfs_removed` and `tfm.compare_stats` for every group of `tfm.dfs`."
        self._initialize_tfm_attributes(tfm)
        for grp in tfm.dfs.keys():
            removed_df = self._get_removed_data(grp, tfm)
            # Store the removed rows first: `_compute_stats` reads `tfm.dfs_removed[grp]`.
            tfm.dfs_removed[grp] = removed_df
            tfm.compare_stats[grp] = self._compute_stats(grp, tfm)
            

    def _initialize_tfm_attributes(self, tfm: Transformer) -> None:
        "Reset the result containers on `tfm`."
        tfm.dfs_removed = {}
        tfm.compare_stats = {}

    def _get_removed_data(self, 
                          grp: str, # The group key
                          tfm: Transformer # The transformation object containing `dfs`
                         ) -> pd.DataFrame: # Dataframe with dropped rows
        "Return the data that is present in `dfs` but not in `tfm.dfs`."
        # Rows are matched on index labels only, not on their content.
        index_diff = self.dfs[grp].index.difference(tfm.dfs[grp].index)
        return self.dfs[grp].loc[index_diff]
    
    def _compute_stats(self, 
                       grp: str, # The group key
                       tfm: Transformer # The transformation object containing `dfs`
                      ) -> Dict[str, int]: # Dictionary with comparison statistics
        "Compute comparison statistics between `dfs` and `tfm.dfs`."
        return {
            'Number of rows in original dataframes (dfs):': len(self.dfs[grp].index),
            'Number of rows in transformed dataframes (tfm.dfs):': len(tfm.dfs[grp].index),
            'Number of rows removed (tfm.dfs_removed):': len(tfm.dfs_removed[grp].index),
        }

CompareDfsAndTfmCB compares the original dataframes to the transformed dataframes. A dictionary of dataframes, tfm.dfs_removed, is created to hold the data present in the original dataset but absent from the transformed data. tfm.compare_stats provides a quick overview of the number of rows in the original dataframes, in the transformed dataframes, and in the removed data.

For instance:

class UniqueIndexCB(Callback):
    "Set unique index for each group."
    def __init__(self, index_name='ID'):
        fc.store_attr()
        
    def __call__(self, tfm):
        "Renumber the rows of every group and expose that numbering as the `index_name` column."
        frames = tfm.dfs
        for grp in frames:
            # Discard the current index in favour of a fresh default one
            frames[grp] = frames[grp].reset_index(drop=True)
            # Move that fresh index into a column named `index_name`
            frames[grp] = frames[grp].reset_index(names=[self.index_name])

source

UniqueIndexCB

 UniqueIndexCB (index_name='ID')

Set unique index for each group.

Exported source
class UniqueIndexCB(Callback):
    "Set unique index for each group."
    def __init__(self, index_name='ID'):
        fc.store_attr()
        
    def __call__(self, tfm):
        "Renumber the rows of every group and expose that numbering as the `index_name` column."
        frames = tfm.dfs
        for grp in frames:
            # Discard the current index in favour of a fresh default one
            frames[grp] = frames[grp].reset_index(drop=True)
            # Move that fresh index into a column named `index_name`
            frames[grp] = frames[grp].reset_index(names=[self.index_name])

Time


source

EncodeTimeCB

 EncodeTimeCB (col_time:str='TIME', fn_units:Callable=<function
               get_time_units>)

Encode time as seconds since epoch.

Type Default Details
col_time str TIME
fn_units Callable get_time_units Function returning the time units
Exported source
class EncodeTimeCB(Callback):
    "Encode time as seconds since epoch."    
    def __init__(self, 
                 col_time: str='TIME',
                 fn_units: Callable=get_time_units # Function returning the time units
                 ): 
        fc.store_attr()
        # Units string handed to `date2num`; resolved once at construction time.
        self.units = fn_units()
    
    def __call__(self, tfm): 
        "Drop rows without a time value, then convert `col_time` with `date2num` in every group."
        for grp, df in tfm.dfs.items():
            has_time = df[self.col_time].notna()
            n_missing = df[self.col_time].isna().sum()
            if n_missing:
                print(f"Warning: {n_missing} missing time value(s) in {grp}")
            
            # Explicit copy: assigning a column on a boolean-filtered slice is a
            # chained assignment and triggers pandas' SettingWithCopyWarning.
            encoded = df[has_time].copy()
            encoded[self.col_time] = encoded[self.col_time].apply(lambda x: date2num(x, units=self.units))
            tfm.dfs[grp] = encoded
# Two groups with consecutive daily timestamps and no missing value.
dfs_test = {
    'SEAWATER': pd.DataFrame({
        'TIME': [pd.Timestamp(f'2023-01-0{t}') for t in [1, 2]],
        'value': [1, 2]
        }),
    'SEDIMENT': pd.DataFrame({
        'TIME': [pd.Timestamp(f'2023-01-0{t}') for t in [3, 4]],
        'value': [3, 4]
        }),
}

units = 'seconds since 1970-01-01 00:00:00.0'
# `inplace=False` leaves `dfs_test` untouched: it is re-encoded below to build the expected values.
tfm = Transformer(dfs_test, cbs=[
    EncodeTimeCB(fn_units=lambda: units)
    ], inplace=False)
dfs_result = tfm()

# Encoded times must be integers
fc.test_eq(dfs_result['SEAWATER'].TIME.dtype, 'int64')
fc.test_eq(dfs_result['SEDIMENT'].TIME.dtype, 'int64')


# Encoded values must equal a direct `date2num` conversion of the original timestamps
fc.test_eq(dfs_result['SEAWATER'].TIME, dfs_test['SEAWATER'].TIME.apply(lambda x: date2num(x, units=units)))
fc.test_eq(dfs_result['SEDIMENT'].TIME, dfs_test['SEDIMENT'].TIME.apply(lambda x: date2num(x, units=units)))

source

DecodeTimeCB

 DecodeTimeCB (col_time:str='TIME', fn_units:Callable=<function
               get_time_units>)

Decode time from seconds since epoch to datetime format.

Type Default Details
col_time str TIME
fn_units Callable get_time_units Function returning the time units
Exported source
class DecodeTimeCB(Callback):
    "Decode time from seconds since epoch to datetime format."    
    def __init__(self, 
                 col_time: str='TIME',
                 fn_units: Callable=get_time_units # Function returning the time units
                 ): 
        fc.store_attr()
        # Units string handed to `num2date`; resolved once at construction time.
        self.units = fn_units()

    def __call__(self, tfm): 
        "Drop rows without a time value, then convert `col_time` with `num2date` in every group."
        for grp, df in tfm.dfs.items():
            has_time = df[self.col_time].notna()
            n_missing = df[self.col_time].isna().sum()
            if n_missing:
                print(f"Warning: {n_missing} missing time value(s) in {grp}.")
            
            # Explicit copy: assigning a column on a boolean-filtered slice is a
            # chained assignment and triggers pandas' SettingWithCopyWarning.
            decoded = df[has_time].copy()
            # Convert the filtered values only, so `num2date` never sees the
            # NaN entries that were just removed.
            decoded[self.col_time] = decoded[self.col_time].apply(
                lambda x: num2date(x, units=self.units, only_use_cftime_datetimes=False)
            )
            tfm.dfs[grp] = decoded
# Two groups holding already encoded times (integers expressed in `units` below).
dfs_test = {
    'SEAWATER': pd.DataFrame({
        'TIME': [1672531200, 1672617600],  # 2023-01-01, 2023-01-02 in seconds since epoch
        'value': [1, 2]
        }),
    'SEDIMENT': pd.DataFrame({
        'TIME': [1672704000, 1672790400],  # 2023-01-03, 2023-01-04 in seconds since epoch
        'value': [3, 4]
        }),
}

units = 'seconds since 1970-01-01 00:00:00.0'
# `inplace=False`: the callback works on copies, `dfs_test` keeps its integer times.
tfm = Transformer(dfs_test, cbs=[
    DecodeTimeCB(fn_units=lambda: units)
    ], inplace=False)
dfs_result = tfm()


# Test that times were converted to datetime
fc.test_eq(dfs_result['SEAWATER'].TIME.dtype, 'datetime64[ns]')
fc.test_eq(dfs_result['SEDIMENT'].TIME.dtype, 'datetime64[ns]')

# Test specific datetime values (compared at day resolution)
expected_times_seawater = pd.to_datetime(['2023-01-01', '2023-01-02'])
expected_times_sediment = pd.to_datetime(['2023-01-03', '2023-01-04'])

fc.test_eq(dfs_result['SEAWATER'].TIME.dt.date, expected_times_seawater.date)
fc.test_eq(dfs_result['SEDIMENT'].TIME.dt.date, expected_times_sediment.date)