Callbacks used in handlers

Core

The Transformer class is designed to facilitate the application of a series of callbacks to a set of dataframes. It provides a structured way to apply transformations (i.e. Callback instances) to the data, with a focus on flexibility and ease of use.


source

Callback

 Callback ()

Base class for callbacks.

Exported source
class Callback(): 
    "Base class for callbacks."
    order = 0

source

run_cbs

 run_cbs (cbs:List[__main__.Callback], obj:Any)

Run the callbacks in the order they are specified.

Type Details
cbs List List of callbacks to run
obj Any Object to pass to the callbacks
Exported source
def run_cbs(
    cbs: List[Callback], # List of callbacks to run
    obj: Any # Object to pass to the callbacks
    ):
    "Run the callbacks in the order they are specified."
    for cb in sorted(cbs, key=attrgetter('order')):
        if cb.__doc__: obj.logs.append(cb.__doc__)
        cb(obj)
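
For instance, callbacks with a lower order run first regardless of the order in which they are passed. A minimal sketch (using a plain namespace object in place of a Transformer, which is defined below):

from types import SimpleNamespace

class FirstCB(Callback):
    "Runs first."
    order = 0
    def __call__(self, obj): obj.seen.append('first')

class SecondCB(Callback):
    "Runs second."
    order = 1
    def __call__(self, obj): obj.seen.append('second')

obj = SimpleNamespace(logs=[], seen=[])
run_cbs([SecondCB(), FirstCB()], obj)

fc.test_eq(obj.seen, ['first', 'second'])
fc.test_eq(obj.logs, ['Runs first.', 'Runs second.'])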

source

Transformer

 Transformer (data:Union[Dict[str,pandas.core.frame.DataFrame],pandas.core
              .frame.DataFrame],
              cbs:Optional[List[__main__.Callback]]=None,
              inplace:bool=False)

Transform the dataframe(s) according to the specified callbacks.

Type Default Details
data Union Data to be transformed
cbs Optional None List of callbacks to run
inplace bool False Whether to modify the dataframe(s) in place
Exported source
class Transformer():
    "Transform the dataframe(s) according to the specified callbacks."
    def __init__(self, 
                 data: Union[Dict[str, pd.DataFrame], pd.DataFrame], # Data to be transformed
                 cbs: Optional[List[Callback]]=None, # List of callbacks to run
                 inplace: bool=False # Whether to modify the dataframe(s) in place
                 ): 
        fc.store_attr()
        self.is_single_df = isinstance(data, pd.DataFrame)
        self.df, self.dfs = self._prepare_data(data, inplace)
        self.logs = []
            
    def _prepare_data(self, data, inplace):
        if self.is_single_df:
            return (data if inplace else data.copy()), None
        else:
            return None, (data if inplace else {k: v.copy() for k, v in data.items()})
    
    def unique(self, col_name: str) -> np.ndarray:
        "Distinct values of a specific column present in all groups."
        if self.is_single_df:
            values = self.df.get(col_name, pd.Series()).dropna().values
        else:
            columns = [df.get(col_name) for df in self.dfs.values() if df.get(col_name) is not None]
            values = np.concatenate([col.dropna().values for col in columns]) if columns else []
        return np.unique(values)
        
    def __call__(self):
        "Transform the dataframe(s) according to the specified callbacks."
        if self.cbs: run_cbs(self.cbs, self)
        return self.df if self.dfs is None else self.dfs

Below are a few examples of how to use the Transformer class. Let's first define a test callback that adds 1 to the depth:

class TestCB(Callback):
    "A test callback to add 1 to the depth."
    def __call__(self, tfm: Transformer):
        for grp, df in tfm.dfs.items(): 
            df['depth'] = df['depth'].apply(lambda x: x+1)

And apply it to the following dataframes:

dfs = {'biota': pd.DataFrame({'id': [0, 1, 2], 'species': [0, 2, 0], 'depth': [2, 3, 4]}),
       'seawater': pd.DataFrame({'id': [0, 1, 2], 'depth': [3, 4, 5]})}

tfm = Transformer(dfs, cbs=[TestCB()])
dfs_test = tfm()

fc.test_eq(dfs_test['biota']['depth'].to_list(), [3, 4, 5])
fc.test_eq(dfs_test['seawater']['depth'].to_list(), [4, 5, 6])

The Transformer also works on a single dataframe:

class TestCB(Callback):
    "A test callback to add 1 to the depth."
    def __call__(self, tfm: Transformer):
        tfm.df['depth'] = tfm.df['depth'].apply(lambda x: x+1)
df = pd.DataFrame({'id': [0, 1, 2], 'species': [0, 2, 0], 'depth': [2, 3, 4]})

tfm = Transformer(df, cbs=[TestCB()])
df_test = tfm()

fc.test_eq(df_test['depth'].to_list(), [3, 4, 5])
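
The unique helper, not used above, collects the distinct values of a column across all groups:

dfs = {'biota': pd.DataFrame({'nuclide': ['cs137', 'cs134', 'cs137']}),
       'seawater': pd.DataFrame({'nuclide': ['cs137', 'ru106']})}

tfm = Transformer(dfs)
fc.test_eq(tfm.unique('nuclide').tolist(), ['cs134', 'cs137', 'ru106'])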

Geographical

This section gathers callbacks that are used to transform the geographical coordinates.


source

SanitizeLonLatCB

 SanitizeLonLatCB (lon_col:str='lon', lat_col:str='lat',
                   verbose:bool=False)

Drop rows with invalid longitude & latitude values. Convert , separator to . separator.

Type Default Details
lon_col str lon Longitude column name
lat_col str lat Latitude column name
verbose bool False Whether to print the number of invalid longitude & latitude values
Exported source
class SanitizeLonLatCB(Callback):
    "Drop rows with invalid longitude & latitude values. Convert `,` separator to `.` separator."
    def __init__(self, 
                 lon_col: str='lon', # Longitude column name
                 lat_col: str='lat', # Latitude column name
                 verbose: bool=False # Whether to print the number of invalid longitude & latitude values
                 ):
        fc.store_attr()
        
    def __call__(self, tfm: Transformer):
        for grp, df in tfm.dfs.items():
            # Convert `,` separator to `.` separator
            df[self.lon_col] = df[self.lon_col].apply(lambda x: float(str(x).replace(',', '.')))
            df[self.lat_col] = df[self.lat_col].apply(lambda x: float(str(x).replace(',', '.')))
            
            # Mask zero values
            mask_zeroes = (df[self.lon_col] == 0) & (df[self.lat_col] == 0) 
            nZeroes = mask_zeroes.sum()
            if nZeroes and self.verbose: 
                print(f'The "{grp}" group contains {nZeroes} data points whose ({self.lon_col}, {self.lat_col}) = (0, 0)')
            
            # Mask out of bounds values
            mask_goob = (df[self.lon_col] < -180) | (df[self.lon_col] > 180) | (df[self.lat_col] < -90) | (df[self.lat_col] > 90)
            nGoob = mask_goob.sum()
            if nGoob and self.verbose: 
                print(f'The "{grp}" group contains {nGoob} data points with unrealistic {self.lon_col} or {self.lat_col} values.')
                
            tfm.dfs[grp] = df.loc[~(mask_zeroes | mask_goob)]
# Check that measurements located at (0,0) get removed
dfs = {'biota': pd.DataFrame({'lon': [0, 1, 0], 'lat': [0, 2, 0]})}
tfm = Transformer(dfs, cbs=[SanitizeLonLatCB()])
tfm()['biota']

expected = [1., 2.]
fc.test_eq(tfm()['biota'].iloc[0].to_list(), expected)
# Check that the comma decimal separator is replaced by a point
dfs = {'biota': pd.DataFrame({'lon': ['45,2'], 'lat': ['43,1']})}
tfm = Transformer(dfs, cbs=[SanitizeLonLatCB()])
tfm()['biota']

expected = [45.2, 43.1]
fc.test_eq(tfm()['biota'].iloc[0].to_list(), expected)
# Check that out of bounds lon or lat get removed
dfs = {'biota': pd.DataFrame({'lon': [-190, 190, 1, 2, 1.1], 'lat': [1, 2, 91, -91, 2.2]})}
tfm = Transformer(dfs, cbs=[SanitizeLonLatCB()])
tfm()['biota']

expected = [1.1, 2.2]
fc.test_eq(tfm()['biota'].iloc[0].to_list(), expected)
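
The verbose flag is not exercised above; with illustrative coordinates it reports how many rows are masked:

# With `verbose=True` the callback reports how many rows it masks
dfs = {'seawater': pd.DataFrame({'lon': [0., 200., 5.3], 'lat': [0., 10., 52.1]})}
tfm = Transformer(dfs, cbs=[SanitizeLonLatCB(verbose=True)])
fc.test_eq(tfm()['seawater'].iloc[0].to_list(), [5.3, 52.1])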

Add columns

This section gathers callbacks that are used to add required columns to the dataframes.


source

AddSampleTypeIdColumnCB

 AddSampleTypeIdColumnCB (cdl_cfg:Callable=<function cdl_cfg>,
                          col_name:str='samptype_id')

Add a column with the sample type id as defined in the CDL.

Type Default Details
cdl_cfg Callable cdl_cfg Callable to get the CDL config dictionary
col_name str samptype_id Column name to store the sample type id
Exported source
class AddSampleTypeIdColumnCB(Callback):
    "Add a column with the sample type id as defined in the CDL."
    def __init__(self, 
                 cdl_cfg: Callable=cdl_cfg, # Callable to get the CDL config dictionary
                 col_name: str='samptype_id' # Column name to store the sample type id
                 ): 
        fc.store_attr()
        self.lut = {v['name']: v['id'] for v in cdl_cfg()['grps'].values()}
        
    def __call__(self, tfm):
        for grp, df in tfm.dfs.items(): df[self.col_name] = self.lut[grp]

Let’s test the callback:

dfs = {v['name']: pd.DataFrame({'col_test': [0, 1, 2]}) for v in CONFIGS_CDL['grps'].values()}

tfm = Transformer(dfs, cbs=[AddSampleTypeIdColumnCB(cdl_cfg=lambda: CONFIGS_CDL)])
dfs_test = tfm()

for v in CONFIGS_CDL['grps'].values():
    fc.test_eq(dfs_test[v['name']]['samptype_id'].unique().item(), v['id'])

source

AddNuclideIdColumnCB

 AddNuclideIdColumnCB (col_value:str, lut_fname_fn:Callable=<function
                       nuc_lut_path>, col_name:str='nuclide_id')

Add a column with the nuclide id.

Type Default Details
col_value str Column name containing the nuclide name
lut_fname_fn Callable nuc_lut_path Function returning the lut path
col_name str nuclide_id Column name to store the nuclide id
Exported source
class AddNuclideIdColumnCB(Callback):
    "Add a column with the nuclide id."
    def __init__(self, 
                 col_value: str, # Column name containing the nuclide name
                 lut_fname_fn: Callable=nuc_lut_path, # Function returning the lut path
                 col_name: str='nuclide_id' # Column name to store the nuclide id
                 ): 
        fc.store_attr()
        self.lut = get_lut(lut_fname_fn().parent, lut_fname_fn().name, 
                           key='nc_name', value='nuclide_id', reverse=False)
        
    def __call__(self, tfm: Transformer):
        for grp, df in tfm.dfs.items(): 
            df[self.col_name] = df[self.col_value].map(self.lut)
dfs = {v['name']: pd.DataFrame({'Nuclide': ['cs137', 'pu239_240_tot']}) for v in CONFIGS_CDL['grps'].values()}

lut_fname_fn = lambda: Path('./files/lut/dbo_nuclide.xlsx')

tfm = Transformer(dfs, cbs=[AddNuclideIdColumnCB(col_value='Nuclide', lut_fname_fn=lut_fname_fn)])
tfm()['seawater']

expected = [33, 77]
for grp in tfm.dfs.keys():
    fc.test_eq(tfm.dfs[grp]['nuclide_id'].to_list(), expected)

Map & Standardize


source

RemapCB

 RemapCB (fn_lut:Callable, col_remap:str, col_src:str,
          dest_grps:list[str]|str=['seawater', 'biota', 'sediment',
          'suspended-matter'], default_value:Any=-1, verbose:bool=False)

Generic MARIS remapping callback.

Type Default Details
fn_lut Callable Function that returns the lookup table dictionary
col_remap str Name of the column to remap
col_src str Name of the column with the source values
dest_grps list[str] | str [‘seawater’, ‘biota’, ‘sediment’, ‘suspended-matter’] List of destination groups
default_value Any -1 Default value for unmatched entries
verbose bool False Whether to print unmatched values
Exported source
class RemapCB(Callback):
    "Generic MARIS remapping callback."
    def __init__(self, 
                 fn_lut: Callable, # Function that returns the lookup table dictionary
                 col_remap: str, # Name of the column to remap
                 col_src: str, # Name of the column with the source values
                 dest_grps: list[str]|str=grp_names(), # List of destination groups
                 default_value: Any = -1, # Default value for unmatched entries
                 verbose: bool = False, # Whether to print unmatched values
                ):
        fc.store_attr()
        self.lut = None
        if isinstance(dest_grps, str): self.dest_grps = [dest_grps]
        self.__doc__ = f"Remap values from '{col_src}' to '{col_remap}' for groups: {', '.join(dest_grps)}."

    def __call__(self, tfm):
        self.lut = self.fn_lut()
        for grp in self.dest_grps:
            if grp in tfm.dfs:
                self._remap_group(tfm.dfs[grp])

    def _remap_group(self, df: pd.DataFrame):
        df[self.col_remap] = df[self.col_src].apply(self._remap_value)

    def _remap_value(self, value: str) -> Any:
        value = value.strip() if isinstance(value, str) else value
        match = self.lut.get(value, Match(self.default_value, None, None, None))
        if isinstance(match, Match):
            if match.matched_id == self.default_value and self.verbose:
                print(f"Unmatched value: {value}")
            return match.matched_id 
        else:
            return match
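
No usage example is given here since fn_lut is project-specific; below is a minimal sketch where a plain dictionary stands in for the lookup table (the species names and ids are purely illustrative, not real MARIS ids):

dfs = {'biota': pd.DataFrame({'SPECIES': ['Mytilus edulis', 'Gadus morhua', 'unknown']})}
lut = {'Mytilus edulis': 129, 'Gadus morhua': 139}  # illustrative ids only

tfm = Transformer(dfs, cbs=[RemapCB(fn_lut=lambda: lut,
                                    col_remap='species_id',
                                    col_src='SPECIES',
                                    dest_grps='biota')])

# Unmatched values fall back to the default value (-1)
fc.test_eq(tfm()['biota']['species_id'].to_list(), [129, 139, -1])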

source

LowerStripNameCB

 LowerStripNameCB (col_src:str, col_dst:str=None,
                   fn_transform:Callable=<function <lambda>>)

Convert values to lowercase and strip any trailing spaces.

Type Default Details
col_src str Source column name e.g. ‘Nuclide’
col_dst str None Destination column name
fn_transform Callable Transformation function
Exported source
class LowerStripNameCB(Callback):
    "Convert values to lowercase and strip any trailing spaces."
    def __init__(self, 
                 col_src: str, # Source column name e.g. 'Nuclide'
                 col_dst: str=None, # Destination column name
                 fn_transform: Callable=lambda x: x.lower().strip() # Transformation function
                 ):
        fc.store_attr()
        self.__doc__ = f"Convert values from '{col_src}' to lowercase, strip spaces, and store in '{col_dst}'."
        if not col_dst: self.col_dst = col_src
        
    def _safe_transform(self, value):
        "Ensure value is not NA and apply transformation function."
        return value if pd.isna(value) else self.fn_transform(str(value))
            
    def __call__(self, tfm):
        for key in tfm.dfs.keys():
            tfm.dfs[key][self.col_dst] = tfm.dfs[key][self.col_src].apply(self._safe_transform)

Let’s test the callback:

dfs = {'seawater': pd.DataFrame({'Nuclide': ['CS137', '226RA']})}

tfm = Transformer(dfs, cbs=[LowerStripNameCB(col_src='Nuclide', col_dst='NUCLIDE')])
fc.test_eq(tfm()['seawater']['NUCLIDE'].to_list(), ['cs137', '226ra'])


tfm = Transformer(dfs, cbs=[LowerStripNameCB(col_src='Nuclide')])
fc.test_eq(tfm()['seawater']['Nuclide'].to_list(), ['cs137', '226ra'])
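
The fn_transform argument can be swapped for any callable; for instance (a hypothetical tweak) one that also removes dashes:

dfs = {'seawater': pd.DataFrame({'Nuclide': ['CS-137 ', ' RA-226']})}

tfm = Transformer(dfs, cbs=[LowerStripNameCB(col_src='Nuclide',
                                             fn_transform=lambda x: x.lower().strip().replace('-', ''))])
fc.test_eq(tfm()['seawater']['Nuclide'].to_list(), ['cs137', 'ra226'])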

When remapping names (semi-automatically), the general workflow is:

  1. First guess the right nuclide name (via fuzzy matching or otherwise).
  2. Then manually check the result and, if needed, update the lookup table.
  3. Finally, apply the lookup table to the dataframe.

Change structure


source

RemoveAllNAValuesCB

 RemoveAllNAValuesCB (cols_to_check:Dict[str,List[str]])

Remove rows with all NA values.

Type Details
cols_to_check Dict A dictionary with the sample type as key and the list of column names to check as value
Exported source
class RemoveAllNAValuesCB(Callback):
    "Remove rows with all NA values."
    def __init__(self, 
                 cols_to_check: Dict[str, List[str]] # A dictionary with the sample type as key and the list of column names to check as value
                ):
        fc.store_attr()

    def __call__(self, tfm):
        for k in tfm.dfs.keys():
            col_to_check = self.cols_to_check[k]
            mask = tfm.dfs[k][col_to_check].isnull().all(axis=1)
            tfm.dfs[k] = tfm.dfs[k][~mask]
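
For instance, dropping rows where both the value and the uncertainty are missing (the column names below are illustrative):

dfs = {'seawater': pd.DataFrame({'value': [1.0, None, 3.0], 'unc': [0.1, None, None]}),
       'sediment': pd.DataFrame({'value': [None, 2.0], 'unc': [None, 0.2]})}

cols_to_check = {'seawater': ['value', 'unc'], 'sediment': ['value', 'unc']}

tfm = Transformer(dfs, cbs=[RemoveAllNAValuesCB(cols_to_check)])
dfs_out = tfm()

fc.test_eq(len(dfs_out['seawater']), 2)
fc.test_eq(len(dfs_out['sediment']), 1)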

Many data providers use a long format (e.g. lat, lon, radionuclide, value, unc, ...) to store their data. When encoding as netCDF, a wide format (e.g. lat, lon, nuclide1_value, nuclide1_unc, nuclide2_value, nuclide2_unc, ...) is often required. The ReshapeLongToWide class performs this transformation.


source

ReshapeLongToWide

 ReshapeLongToWide (columns:List[str]=['nuclide'],
                    values:List[str]=['value'], num_fill_value:int=-999,
                    str_fill_value='STR FILL VALUE')

Convert data from long to wide with renamed columns.

Type Default Details
columns List [‘nuclide’] Columns to pivot into wide format
values List [‘value’] Columns to use as values
num_fill_value int -999 Fill value for numeric columns
str_fill_value str STR FILL VALUE Fill value for string columns
Exported source
class ReshapeLongToWide(Callback):
    "Convert data from long to wide with renamed columns."
    def __init__(self, 
                 columns: List[str]=['nuclide'], # Columns to pivot into wide format
                 values: List[str]=['value'], # Columns to use as values
                 num_fill_value: int=-999, # Fill value for numeric columns
                 str_fill_value='STR FILL VALUE' # Fill value for string columns
                 ):
        fc.store_attr()
        self.derived_cols = self._get_derived_cols()
    
    def _get_derived_cols(self):
        "Retrieve all possible derived vars (e.g 'unc', 'dl', ...) from configs."
        return [value['name'] for value in cdl_cfg()['vars']['suffixes'].values()]

    def renamed_cols(self, cols):
        "Flatten columns name."
        return [inner if outer == "value" else f'{inner}{outer}' if inner else outer
                for outer, inner in cols]

    def _get_unique_fill_value(self, df, idx):
        "Get a unique fill value for NaN replacement."
        fill_value = self.num_fill_value
        while (df[idx] == fill_value).any().any():
            fill_value -= 1
        return fill_value

    def _fill_nan_values(self, df, idx):
        "Fill NaN values in index columns."
        num_fill_value = self._get_unique_fill_value(df, idx)
        for col in idx:
            fill_value = num_fill_value if pd.api.types.is_numeric_dtype(df[col]) else self.str_fill_value
            df[col] = df[col].fillna(fill_value)
        return df, num_fill_value

    def pivot(self, df):
        derived_coi = [col for col in self.derived_cols if col in df.columns]
        # In past implementation we added an index column before pivoting 
        # TO BE REMOVED
        # making all rows (compound_idx) unique.
        # df.index.name = 'org_index'
        # df = df.reset_index()
        idx = list(set(df.columns) - set(self.columns + derived_coi + self.values))
        
        df, num_fill_value = self._fill_nan_values(df, idx)

        pivot_df = df.pivot_table(index=idx,
                                  columns=self.columns,
                                  values=self.values + derived_coi,
                                  fill_value=np.nan,
                                  aggfunc=lambda x: x).reset_index()

        pivot_df[idx] = pivot_df[idx].replace({self.str_fill_value: np.nan, num_fill_value: np.nan})
        pivot_df = self.set_index(pivot_df)
        return pivot_df

    def set_index(self, df):
        "Set the index of the dataframe."
        # TODO: Consider implementing a universal unique index
        # by hashing the compound index columns (lat, lon, time, depth, etc.)
        df.index.name = 'org_index'
        return df
    
    def __call__(self, tfm):
        for grp in tfm.dfs.keys():
            tfm.dfs[grp] = self.pivot(tfm.dfs[grp])
            tfm.dfs[grp].columns = self.renamed_cols(tfm.dfs[grp].columns)

Example of usage:

  • Case 1: compound_idx values (in our case made of lon, lat, time, depth, …) are unique
dfs_test = {'seawater': pd.DataFrame({
    'compound_idx': ['a', 'b', 'c', 'd'], 
    'nuclide': ['cs137', 'cs137', 'pu239_240_tot', 'pu239_240_tot'], 
    'value': [1, 2, 3, 4],
    '_unc': [0.1, 0.2, 0.3, 0.4]})}

tfm = Transformer(dfs_test, cbs=[ReshapeLongToWide()])
tfm()['seawater']
          compound_idx  cs137_unc  pu239_240_tot_unc  cs137  pu239_240_tot
org_index
0                    a        0.1                NaN    1.0            NaN
1                    b        0.2                NaN    2.0            NaN
2                    c        NaN                0.3    NaN            3.0
3                    d        NaN                0.4    NaN            4.0
  • Case 2: compound_idx values are not unique
dfs_test = {'seawater': pd.DataFrame({
    'compound_idx': ['a', 'a', 'c', 'd'], 
    'nuclide': ['cs137', 'cs134', 'pu239_240_tot', 'pu239_240_tot'], 
    'value': [1, 2, 3, 4],
    '_unc': [0.1, 0.2, 0.3, 0.4]})}

tfm = Transformer(dfs_test, cbs=[ReshapeLongToWide()])
tfm()['seawater']
          compound_idx  cs134_unc  cs137_unc  pu239_240_tot_unc  cs134  cs137  pu239_240_tot
org_index
0                    a        0.2        0.1                NaN    2.0    1.0            NaN
1                    c        NaN        NaN                0.3    NaN    NaN            3.0
2                    d        NaN        NaN                0.4    NaN    NaN            4.0
dfs_test = fc.load_pickle('./files/pkl/dfs_reshape_test_in.pkl')
dfs_expected = fc.load_pickle('./files/pkl/dfs_reshape_test_expected.pkl')

tfm = Transformer(dfs_test, cbs=[ReshapeLongToWide()])
dfs_output = tfm()
test_dfs(dfs_output, dfs_expected)

source

CompareDfsAndTfmCB

 CompareDfsAndTfmCB (dfs:Dict[str,pandas.core.frame.DataFrame])

Create a dataframe of dropped data. Data included in the dfs not in the tfm.

Type Details
dfs Dict Original dataframes
Exported source
class CompareDfsAndTfmCB(Callback):
    "Create a dataframe of dropped data. Data included in the `dfs` not in the `tfm`."
    def __init__(self, 
                 dfs: Dict[str, pd.DataFrame] # Original dataframes
                 ): 
        fc.store_attr()
        
    def __call__(self, tfm: Transformer) -> None:
        self._initialize_tfm_attributes(tfm)
        for grp in tfm.dfs.keys():
            dropped_df = self._get_dropped_data(grp, tfm)
            tfm.dfs_dropped[grp] = dropped_df
            tfm.compare_stats[grp] = self._compute_stats(grp, tfm)

    def _initialize_tfm_attributes(self, tfm: Transformer) -> None:
        tfm.dfs_dropped = {}
        tfm.compare_stats = {}

    def _get_dropped_data(self, 
                          grp: str, # The group key
                          tfm: Transformer # The transformation object containing `dfs`
                         ) -> pd.DataFrame: # Dataframe with dropped rows
        "Get the data that is present in `dfs` but not in `tfm.dfs`."
        index_diff = self.dfs[grp].index.difference(tfm.dfs[grp].index)
        return self.dfs[grp].loc[index_diff]
    
    def _compute_stats(self, 
                       grp: str, # The group key
                       tfm: Transformer # The transformation object containing `dfs`
                      ) -> Dict[str, int]: # Dictionary with comparison statistics
        "Compute comparison statistics between `dfs` and `tfm.dfs`."
        return {
            'Number of rows in dfs': len(self.dfs[grp].index),
            'Number of rows in tfm.dfs': len(tfm.dfs[grp].index),
            'Number of dropped rows': len(tfm.dfs_dropped[grp].index),
            'Number of rows in tfm.dfs + Number of dropped rows': len(tfm.dfs[grp].index) + len(tfm.dfs_dropped[grp].index)
        }

CompareDfsAndTfmCB compares the original dataframes to the transformed dataframes. A dictionary of dataframes, tfm.dfs_dropped, is created to hold the data present in the original dataset but absent from the transformed data. tfm.compare_stats provides a quick overview of the number of rows in both the original and the transformed dataframes.

For instance:

dfs_test = {
    'seawater': pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]}),
    'sediment': pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]}),
}

class TestTfmCB(Callback):
    def __call__(self, tfm):
        for key in tfm.dfs.keys():
            df = tfm.dfs[key]
            drop_idxs = [0, 1] if key == 'seawater' else [0]
            df.drop(drop_idxs, inplace=True)
            
tfm = Transformer(dfs_test, cbs=[
    TestTfmCB(), 
    CompareDfsAndTfmCB(dfs_test)], inplace=False)

print(tfm())

fc.test_eq(tfm.compare_stats['seawater']['Number of dropped rows'], 2)
fc.test_eq(tfm.compare_stats['sediment']['Number of dropped rows'], 1)
{'seawater':    a  b
2  3  6, 'sediment':    a  b
1  2  5
2  3  6}

Time

These callbacks are used to transform the time variable according to netCDF CF standards. For instance, the EncodeTimeCB callback encodes the time variable as an integer representing seconds since a reference date, as specified in the CONFIGS_CDL dictionary of configs.ipynb.


source

EncodeTimeCB

 EncodeTimeCB (cfg:dict, verbose:bool=False)

Encode time as int representing seconds since xxx.

Type Default Details
cfg dict Configuration dictionary
verbose bool False Whether to print the number of invalid time entries
Exported source
class EncodeTimeCB(Callback):
    "Encode time as `int` representing seconds since xxx."    
    def __init__(self, 
                 cfg: dict, # Configuration dictionary
                 verbose: bool=False # Whether to print the number of invalid time entries
                 ): 
        fc.store_attr()
        
    def __call__(self, tfm): 
        def format_time(x): 
            return date2num(x, units=self.cfg['units']['time'])
        
        for k in tfm.dfs.keys():
            # If invalid time entries.
            if tfm.dfs[k]['time'].isna().any():
                if self.verbose:
                    invalid_time_df=tfm.dfs[k][tfm.dfs[k]['time'].isna()]
                    print (f'{len(invalid_time_df.index)} of {len(tfm.dfs[k].index)} entries for `time` are invalid for {k}.')
                # Filter nan values
                tfm.dfs[k] = tfm.dfs[k][tfm.dfs[k]['time'].notna()]
            
            tfm.dfs[k]['time'] = tfm.dfs[k]['time'].apply(format_time)
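
A minimal sketch, assuming a cfg dictionary that exposes the time unit string (in practice taken from CONFIGS_CDL in configs.ipynb):

cfg = {'units': {'time': 'seconds since 1970-01-01 00:00:00.0'}}  # illustrative unit string

dfs = {'seawater': pd.DataFrame({'time': pd.to_datetime(['2000-01-01', None])})}

tfm = Transformer(dfs, cbs=[EncodeTimeCB(cfg=cfg, verbose=True)])
tfm()['seawater']  # the NaT row is dropped and `time` becomes seconds since the reference date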