Callback used in handlers

Core

The Transformer class is designed to facilitate the application of a series of callbacks to a set of dataframes. It provides a structured way to apply transformations (i.e Callback) to the data, with a focus on flexibility and ease of use.


source

Callback

 Callback ()

Base class for callbacks.

Exported source
class Callback(): 
    "Base class for callbacks."
    order = 0

source

run_cbs

 run_cbs (cbs:List[__main__.Callback], obj:Any)

Run the callbacks in the order they are specified.

Type Details
cbs List List of callbacks to run
obj Any Object to pass to the callbacks
Exported source
def run_cbs(
    cbs: List[Callback], # List of callbacks to run
    obj: Any # Object to pass to the callbacks
    ):
    "Run the callbacks in the order they are specified."
    for cb in sorted(cbs, key=attrgetter('order')):
        if cb.__doc__: obj.logs.append(cb.__doc__)
        cb(obj)

source

Transformer

 Transformer (data:Union[Dict[str,pandas.core.frame.DataFrame],pandas.core
              .frame.DataFrame],
              cbs:Optional[List[__main__.Callback]]=None,
              custom_maps:Dict=None, inplace:bool=False)

Transform the dataframe(s) according to the specified callbacks.

Type Default Details
data Union Data to be transformed
cbs Optional None List of callbacks to run
custom_maps Dict None
inplace bool False Whether to modify the dataframe(s) in place
Exported source
class Transformer():
    "Transform the dataframe(s) according to the specified callbacks."
    def __init__(self, 
                 data: Union[Dict[str, pd.DataFrame], pd.DataFrame], # Data to be transformed
                 cbs: Optional[List[Callback]]=None, # List of callbacks to run
                 custom_maps: Dict = None,
                 inplace: bool=False # Whether to modify the dataframe(s) in place
                 ): 
        fc.store_attr()
        self.is_single_df = isinstance(data, pd.DataFrame)
        self.df, self.dfs = self._prepare_data(data, inplace)
        self.logs = []
        self.custom_maps = custom_maps or defaultdict(lambda: defaultdict(dict))
            
    def _prepare_data(self, data, inplace):
        if self.is_single_df:
            return (data if inplace else data.copy()), None
        else:
            return None, (data if inplace else {k: v.copy() for k, v in data.items()})
    
    def unique(self, col_name: str) -> np.ndarray:
        "Distinct values of a specific column present in all groups."
        if self.is_single_df:
            values = self.df.get(col_name, pd.Series()).dropna().values
        else:
            columns = [df.get(col_name) for df in self.dfs.values() if df.get(col_name) is not None]
            values = np.concatenate([col.dropna().values for col in columns]) if columns else []
        return np.unique(values)
        
    def __call__(self):
        "Transform the dataframe(s) according to the specified callbacks."
        if self.cbs: run_cbs(self.cbs, self)
        return self.df if self.dfs is None else self.dfs

Below, a few examples of how to use the Transformer class. Let’s define first a test callback that adds 1 to the depth:

class TestCB(Callback):
    "A test callback to add 1 to the depth."
    def __call__(self, tfm: Transformer):
        for grp, df in tfm.dfs.items(): 
            df['depth'] = df['depth'].apply(lambda x: x+1)

And apply it to the following dataframes:

dfs = {'biota': pd.DataFrame({'id': [0, 1, 2], 'species': [0, 2, 0], 'depth': [2, 3, 4]}),
       'seawater': pd.DataFrame({'id': [0, 1, 2], 'depth': [3, 4, 5]})}

tfm = Transformer(dfs, cbs=[TestCB()])
dfs_test = tfm()

fc.test_eq(dfs_test['biota']['depth'].to_list(), [3, 4, 5])
fc.test_eq(dfs_test['seawater']['depth'].to_list(), [4, 5, 6])
class TestCB(Callback):
    "A test callback to add 1 to the depth."
    def __call__(self, tfm: Transformer):
        tfm.df['depth'] = tfm.df['depth'].apply(lambda x: x+1)
df = pd.DataFrame({'id': [0, 1, 2], 'species': [0, 2, 0], 'depth': [2, 3, 4]})

tfm = Transformer(df, cbs=[TestCB()])
df_test = tfm()

fc.test_eq(df_test['depth'].to_list(), [3, 4, 5])

Geographical

This section gathers callbacks that are used to transform the geographical coordinates.


source

SanitizeLonLatCB

 SanitizeLonLatCB (lon_col:str='LON', lat_col:str='LAT',
                   verbose:bool=False)

Drop rows with invalid longitude & latitude values. Convert , separator to . separator.

Type Default Details
lon_col str LON Longitude column name
lat_col str LAT Latitude column name
verbose bool False Whether to print the number of invalid longitude & latitude values
Exported source
class SanitizeLonLatCB(Callback):
    "Drop rows with invalid longitude & latitude values. Convert `,` separator to `.` separator."
    def __init__(self, 
                 lon_col: str='LON', # Longitude column name
                 lat_col: str='LAT', # Latitude column name
                 verbose: bool=False # Whether to print the number of invalid longitude & latitude values
                 ):
        fc.store_attr()
        
    def __call__(self, tfm: Transformer):
        for grp, df in tfm.dfs.items():
            # Convert `,` separator to `.` separator
            df[self.lon_col] = df[self.lon_col].apply(lambda x: float(str(x).replace(',', '.')))
            df[self.lat_col] = df[self.lat_col].apply(lambda x: float(str(x).replace(',', '.')))
            
            # Mask zero values
            mask_zeroes = (df[self.lon_col] == 0) & (df[self.lat_col] == 0) 
            nZeroes = mask_zeroes.sum()
            if nZeroes and self.verbose: 
                print(f'The "{grp}" group contains {nZeroes} data points whose ({self.lon_col}, {self.lat_col}) = (0, 0)')
            
            # Mask out of bounds values
            mask_goob = (df[self.lon_col] < -180) | (df[self.lon_col] > 180) | (df[self.lat_col] < -90) | (df[self.lat_col] > 90)
            nGoob = mask_goob.sum()
            if nGoob and self.verbose: 
                print(f'The "{grp}" group contains {nGoob} data points with unrealistic {self.lon_col} or {self.lat_col} values.')
                
            tfm.dfs[grp] = df.loc[~(mask_zeroes | mask_goob)]
# Check that measurements located at (0,0) get removed
dfs = {'BIOTA': pd.DataFrame({'LON': [0, 1, 0], 'LAT': [0, 2, 0]})}
tfm = Transformer(dfs, cbs=[SanitizeLonLatCB()])
tfm()['BIOTA']

expected = [1., 2.]
fc.test_eq(tfm()['BIOTA'].iloc[0].to_list(), expected)
# Check that comma decimal separator get replaced by point instead
dfs = {'BIOTA': pd.DataFrame({'LON': ['45,2'], 'LAT': ['43,1']})}
tfm = Transformer(dfs, cbs=[SanitizeLonLatCB()])
tfm()['BIOTA']

expected = [45.2, 43.1]
fc.test_eq(tfm()['BIOTA'].iloc[0].to_list(), expected)
# Check that out of bounds lon or lat get removed
dfs = {'BIOTA': pd.DataFrame({'LON': [-190, 190, 1, 2, 1.1], 'LAT': [1, 2, 91, -91, 2.2]})}
tfm = Transformer(dfs, cbs=[SanitizeLonLatCB()])
tfm()['BIOTA']

expected = [1.1, 2.2]
fc.test_eq(tfm()['BIOTA'].iloc[0].to_list(), expected)

Map & Standardize


source

RemapCB

 RemapCB (fn_lut:Callable, col_remap:str, col_src:str,
          dest_grps:list[str]|str=dict_keys(['BIOTA', 'SEAWATER',
          'SEDIMENT', 'SUSPENDED_MATTER']), default_value:Any=0,
          verbose:bool=False)

Generic MARIS remapping callback.

Type Default Details
fn_lut Callable Function that returns the lookup table dictionary
col_remap str Name of the column to remap
col_src str Name of the column with the source values
dest_grps list[str] | str dict_keys([‘BIOTA’, ‘SEAWATER’, ‘SEDIMENT’, ‘SUSPENDED_MATTER’]) List of destination groups
default_value Any 0 Default value for unmatched entries
verbose bool False Whether to print the number of unmatched entries
Exported source
class RemapCB(Callback):
    "Generic MARIS remapping callback."
    def __init__(self, 
                 fn_lut: Callable, # Function that returns the lookup table dictionary
                 col_remap: str, # Name of the column to remap
                 col_src: str, # Name of the column with the source values
                 dest_grps: list[str]|str=NC_GROUPS.keys(), # List of destination groups
                 default_value: Any = 0, # Default value for unmatched entries
                 verbose: bool=False # Whether to print the number of unmatched entries
                ):
        fc.store_attr()
        self.lut = None
        if isinstance(dest_grps, str): self.dest_grps = [dest_grps]
        # Format the documentation string based on the type and content of dest_grps
        if isinstance(self.dest_grps, list):
            if len(self.dest_grps) > 1:
                grp_str = ', '.join(self.dest_grps[:-1]) + ' and ' + self.dest_grps[-1]
            else:
                grp_str = self.dest_grps[0]
        else:
            grp_str = self.dest_grps
                
        self.__doc__ = f"Remap values from '{col_src}' to '{col_remap}' for groups: {grp_str}."

    def __call__(self, tfm):
        self.lut = self.fn_lut()
        for grp in self.dest_grps:
            if grp in tfm.dfs:
                self._remap_group(tfm.dfs[grp])
            else:
                print(f"Group {grp} not found in the dataframes.")

    def _remap_group(self, df: pd.DataFrame):
        df[self.col_remap] = df[self.col_src].apply(self._remap_value)

    def _remap_value(self, value: str) -> Any:
        value = value.strip() if isinstance(value, str) else value
        match = self.lut.get(value, Match(self.default_value, None, None, None))
        if isinstance(match, Match):
            if match.matched_id == self.default_value:
                if self.verbose:
                    print(f"Unmatched value: {value}")
            return match.matched_id 
        else:
            return match

source

LowerStripNameCB

 LowerStripNameCB (col_src:str, col_dst:str=None,
                   fn_transform:Callable=<function <lambda>>)

Convert values to lowercase and strip any trailing spaces.

Type Default Details
col_src str Source column name e.g. ‘Nuclide’
col_dst str None Destination column name
fn_transform Callable Transformation function
Exported source
class LowerStripNameCB(Callback):
    "Convert values to lowercase and strip any trailing spaces."
    def __init__(self, 
                 col_src: str, # Source column name e.g. 'Nuclide'
                 col_dst: str=None, # Destination column name
                 fn_transform: Callable=lambda x: x.lower().strip() # Transformation function
                 ):
        fc.store_attr()
        self.__doc__ = f"Convert '{col_src}' column values to lowercase, strip spaces, and store in '{col_dst}' column."
        if not col_dst: self.col_dst = col_src
        
    def _safe_transform(self, value):
        "Ensure value is not NA and apply transformation function."
        return value if pd.isna(value) else self.fn_transform(str(value))
            
    def __call__(self, tfm):
        for key in tfm.dfs.keys():
            tfm.dfs[key][self.col_dst] = tfm.dfs[key][self.col_src].apply(self._safe_transform)

Let’s test the callback:

dfs = {'seawater': pd.DataFrame({'Nuclide': ['CS137', '226RA']})}

tfm = Transformer(dfs, cbs=[LowerStripNameCB(col_src='Nuclide', col_dst='NUCLIDE')])
fc.test_eq(tfm()['seawater']['NUCLIDE'].to_list(), ['cs137', '226ra'])


tfm = Transformer(dfs, cbs=[LowerStripNameCB(col_src='Nuclide')])
fc.test_eq(tfm()['seawater']['Nuclide'].to_list(), ['cs137', '226ra'])

The point is when (semi-automatic) remapping names generally:

  1. we need first to guess (fuzzy matching or other) the right nuclide name.
  2. Then manually check the result and eventually update the lookup table.
  3. Finally we can apply the lookup table to the dataframe.

Change structure


source

AddSampleTypeIdColumnCB

 AddSampleTypeIdColumnCB (lut:dict={'SEAWATER': 1, 'BIOTA': 2, 'SEDIMENT':
                          3, 'SUSPENDED_MATTER': 4},
                          col_name:str='SAMPLE_TYPE')

Base class for callbacks.

Type Default Details
lut dict {‘SEAWATER’: 1, ‘BIOTA’: 2, ‘SEDIMENT’: 3, ‘SUSPENDED_MATTER’: 4} Lookup table for sample type
col_name str SAMPLE_TYPE Column name to store the sample type id
Exported source
class AddSampleTypeIdColumnCB(Callback):
    def __init__(self, 
                 lut: dict=SMP_TYPE_LUT, # Lookup table for sample type
                 col_name: str='SAMPLE_TYPE' # Column name to store the sample type id
                 ): 
        "Add a column with the sample type as defined in the CDL."
        fc.store_attr()
        
    def __call__(self, tfm):
        for grp, df in tfm.dfs.items():             
            df[self.col_name] = self.lut[grp]

Let’s test the callback:

dfs = {smp_type: pd.DataFrame({'col_test': [0, 1, 2]}) for smp_type in SMP_TYPE_LUT.keys()};

tfm = Transformer(dfs, cbs=[AddSampleTypeIdColumnCB()])
dfs_test = tfm()

for smp_type in SMP_TYPE_LUT.keys():
    fc.test_eq(dfs_test[smp_type]['SAMPLE_TYPE'].unique().item(), SMP_TYPE_LUT[smp_type])
''' 05 Feb 2025 - NM - AddNuclideIdColumnCB is not used anymore.
#| exports
class AddNuclideIdColumnCB(Callback):
    def __init__(self, 
                 col_value: str, # Column name containing the nuclide name
                 lut_fname_fn: Callable=nuc_lut_path, # Function returning the lut path
                 col_name: str='nuclide_id' # Column name to store the nuclide id
                 ): 
        "Add a column with the nuclide id."
        fc.store_attr()
        self.lut = get_lut(lut_fname_fn().parent, lut_fname_fn().name, 
                           key='nc_name', value='nuclide_id', reverse=False)
        
    def __call__(self, tfm: Transformer):
        for grp, df in tfm.dfs.items(): 
            df[self.col_name] = df[self.col_value].map(self.lut)
'''
' 05 Feb 2025 - NM - AddNuclideIdColumnCB is not used anymore.\n#| exports\nclass AddNuclideIdColumnCB(Callback):\n    def __init__(self, \n                 col_value: str, # Column name containing the nuclide name\n                 lut_fname_fn: Callable=nuc_lut_path, # Function returning the lut path\n                 col_name: str=\'nuclide_id\' # Column name to store the nuclide id\n                 ): \n        "Add a column with the nuclide id."\n        fc.store_attr()\n        self.lut = get_lut(lut_fname_fn().parent, lut_fname_fn().name, \n                           key=\'nc_name\', value=\'nuclide_id\', reverse=False)\n        \n    def __call__(self, tfm: Transformer):\n        for grp, df in tfm.dfs.items(): \n            df[self.col_name] = df[self.col_value].map(self.lut)\n'

source

SelectColumnsCB

 SelectColumnsCB (cois:dict)

Select columns of interest.

Type Details
cois dict Columns of interest
Exported source
class SelectColumnsCB(Callback):
    "Select columns of interest."
    def __init__(self, 
                 cois: dict # Columns of interest
                 ): 
        fc.store_attr()
        
    def __call__(self, tfm):
        "Select columns of interest."
        for grp, df in tfm.dfs.items(): 
            tfm.dfs[grp] = df.loc[:, self.cois.keys()]

source

RenameColumnsCB

 RenameColumnsCB (renaming_rules:dict)

Renaming variables to MARIS standard names.

Type Details
renaming_rules dict Renaming rules
Exported source
class RenameColumnsCB(Callback):
    "Renaming variables to MARIS standard names."
    def __init__(self,
                 renaming_rules: dict # Renaming rules
                 ): 
        fc.store_attr()
        
    def __call__(self, tfm):
        for grp in tfm.dfs.keys(): 
            tfm.dfs[grp].rename(columns=self.renaming_rules, inplace=True)

source

RemoveAllNAValuesCB

 RemoveAllNAValuesCB (cols_to_check:Union[Dict[str,list],list],
                      how:str='all')

Remove rows with all NA values in specified columns.

Type Default Details
cols_to_check Union Dict or list of columns to check
how str all How to handle NA values ‘all’ or ‘any’
Exported source
class RemoveAllNAValuesCB(Callback):
    "Remove rows with all NA values in specified columns."
    def __init__(self, 
                 cols_to_check: Union[Dict[str, list], list],  # Dict or list of columns to check
                 how: str='all'  # How to handle NA values 'all' or 'any'
                ):
        fc.store_attr()

    def __call__(self, tfm):
        # Convert list to dict if cols_to_check is a list
        cols_dict = (self.cols_to_check if isinstance(self.cols_to_check, dict) 
                    else {k: self.cols_to_check for k in tfm.dfs.keys()})
        
        for sample_type, columns in cols_dict.items():
            tfm.dfs[sample_type].dropna(
                subset=columns,
                how=self.how,
                inplace=True
            )

Let’s test the callback:

def test_remove_all_na_values_cb():
    """Test RemoveAllNAValuesCB with both dict and list inputs."""
    # Create sample data
    sample_data = {
        'SEAWATER': pd.DataFrame({
            'value': [1.0, np.nan, np.nan, 4.0],
            'uncertainty': [0.1, np.nan, np.nan, 0.4],
            'other_col': ['a', 'b', 'c', 'd']
        }),
        'SEDIMENT': pd.DataFrame({
            'value': [np.nan, 2.0, np.nan, np.nan],
            'uncertainty': [np.nan, 0.2, np.nan, np.nan],
            'meta': ['x', 'y', 'z', 'w']
        })
    }
    
    tfm = Transformer(sample_data)
    
    # Test with list input
    cb1 = RemoveAllNAValuesCB(cols_to_check=['value', 'uncertainty'], how='all')
    tfm_list = copy.deepcopy(tfm)
    cb1(tfm_list)
    
    # Test with dict input
    cb2 = RemoveAllNAValuesCB(
        cols_to_check={'SEAWATER': ['value', 'uncertainty'], 
                       'SEDIMENT': ['value', 'uncertainty']},
        how='all'
    )
    tfm_dict = copy.deepcopy(tfm)
    cb2(tfm_dict)
    
    # Verify both approaches give same results
    for k in tfm.dfs.keys():
        assert tfm_list.dfs[k].equals(tfm_dict.dfs[k])
        assert len(tfm_list.dfs[k]) == len(tfm_dict.dfs[k])

source

CompareDfsAndTfmCB

 CompareDfsAndTfmCB (dfs:Dict[str,pandas.core.frame.DataFrame])

Create a dataframe of removed data and track changes in row counts due to transformations.

Type Details
dfs Dict Original dataframes
Exported source
class CompareDfsAndTfmCB(Callback):
    "Create a dataframe of removed data and track changes in row counts due to transformations."
    def __init__(self, 
                 dfs: Dict[str, pd.DataFrame]  # Original dataframes
                 ): 
        fc.store_attr()
        
    def __call__(self, tfm: Transformer) -> None:
        self._initialize_tfm_attributes(tfm)
        for grp in tfm.dfs.keys():
            self._compute_changes(grp, tfm)

    def _initialize_tfm_attributes(self, tfm: Transformer) -> None:
        tfm.dfs_removed = {}
        tfm.compare_stats = {}

    def _compute_changes(self, 
                         grp: str,  # The group key
                         tfm: Transformer  # The transformation object containing `dfs`
                        ) -> None:
        "Compute and store changes including data removed and created during transformation."
        original_df = self.dfs[grp]
        transformed_df = tfm.dfs[grp]

        # Calculate differences
        original_count = len(original_df.index)
        transformed_count = len(transformed_df.index)
        removed_count = len(original_df.index.difference(transformed_df.index))
        created_count = len(transformed_df.index.difference(original_df.index))

        # Store results
        tfm.dfs_removed[grp] = original_df.loc[original_df.index.difference(transformed_df.index)]
        tfm.compare_stats[grp] = {
            'Original row count (dfs)': original_count,
            'Transformed row count (tfm.dfs)': transformed_count,
            'Rows removed from original (tfm.dfs_removed)': removed_count,
            'Rows created in transformed (tfm.dfs_created)': created_count
        }

CompareDfsAndTfmCB compares the original dataframes to the transformed dataframe. A dictionary of dataframes, tfm.dfs_dropped, is created to include the data present in the original dataset but absent from the transformed data. tfm.compare_stats provides a quick overview of the number of rows in both the original dataframes and the transformed dataframe.

For instance:


source

UniqueIndexCB

 UniqueIndexCB (index_name='ID')

Set unique index for each group.

Exported source
class UniqueIndexCB(Callback):
    "Set unique index for each group."
    def __init__(self,
                 index_name='ID'):
        fc.store_attr()
        
    def __call__(self, tfm):
        for k in tfm.dfs.keys():
            # Reset the index of the DataFrame and drop the old index
            tfm.dfs[k] = tfm.dfs[k].reset_index(drop=True)
            # Reset the index again and set the name of the new index to `ìndex_name``
            tfm.dfs[k] = tfm.dfs[k].reset_index(names=[self.index_name])

Time


source

EncodeTimeCB

 EncodeTimeCB (col_time:str='TIME', fn_units:Callable=<function
               get_time_units>)

Encode time as seconds since epoch.

Type Default Details
col_time str TIME
fn_units Callable get_time_units Function returning the time units
Exported source
class EncodeTimeCB(Callback):
    "Encode time as seconds since epoch."    
    def __init__(self, 
                 col_time: str='TIME',
                 fn_units: Callable=get_time_units # Function returning the time units
                 ): 
        fc.store_attr()
        self.units = fn_units()
    
    def __call__(self, tfm): 
        for grp, df in tfm.dfs.items():
            n_missing = df[self.col_time].isna().sum()
            if n_missing:
                print(f"Warning: {n_missing} missing time value(s) in {grp}")
            
            # Remove NaN times and convert to seconds since epoch
            tfm.dfs[grp] = tfm.dfs[grp][tfm.dfs[grp][self.col_time].notna()]
            tfm.dfs[grp][self.col_time] = tfm.dfs[grp][self.col_time].apply(lambda x: date2num(x, units=self.units))
dfs_test = {
    'SEAWATER': pd.DataFrame({
        'TIME': [pd.Timestamp(f'2023-01-0{t}') for t in [1, 2]],
        'value': [1, 2]
        }),
    'SEDIMENT': pd.DataFrame({
        'TIME': [pd.Timestamp(f'2023-01-0{t}') for t in [3, 4]],
        'value': [3, 4]
        }),
}

units = 'seconds since 1970-01-01 00:00:00.0'
tfm = Transformer(dfs_test, cbs=[
    EncodeTimeCB(fn_units=lambda: units)
    ], inplace=False)
dfs_result = tfm()

fc.test_eq(dfs_result['SEAWATER'].TIME.dtype, 'int64')
fc.test_eq(dfs_result['SEDIMENT'].TIME.dtype, 'int64')


fc.test_eq(dfs_result['SEAWATER'].TIME, dfs_test['SEAWATER'].TIME.apply(lambda x: date2num(x, units=units)))
fc.test_eq(dfs_result['SEDIMENT'].TIME, dfs_test['SEDIMENT'].TIME.apply(lambda x: date2num(x, units=units)))

source

DecodeTimeCB

 DecodeTimeCB (col_time:str='TIME', fn_units:Callable=<function
               get_time_units>)

Decode time from seconds since epoch to datetime format.

Type Default Details
col_time str TIME
fn_units Callable get_time_units Function returning the time units
Exported source
class DecodeTimeCB(Callback):
    "Decode time from seconds since epoch to datetime format."    
    def __init__(self, 
                 col_time: str='TIME',
                 fn_units: Callable=get_time_units # Function returning the time units
                 ): 
        fc.store_attr()
        self.units = fn_units()

    def __call__(self, tfm): 
        for grp, df in tfm.dfs.items():
            n_missing = df[self.col_time].isna().sum()
            if n_missing:
                print(f"Warning: {n_missing} missing time value(s) in {grp}.")
            
            # Remove NaN times and convert to datetime
            tfm.dfs[grp] = tfm.dfs[grp][tfm.dfs[grp][self.col_time].notna()]
            tfm.dfs[grp][self.col_time] = df[self.col_time].apply(
                lambda x: num2date(x, units=self.units, only_use_cftime_datetimes=False)
            )
dfs_test = {
    'SEAWATER': pd.DataFrame({
        'TIME': [1672531200, 1672617600],  # 2023-01-01, 2023-01-02 in seconds since epoch
        'value': [1, 2]
        }),
    'SEDIMENT': pd.DataFrame({
        'TIME': [1672704000, 1672790400],  # 2023-01-03, 2023-01-04 in seconds since epoch
        'value': [3, 4]
        }),
}

units = 'seconds since 1970-01-01 00:00:00.0'
tfm = Transformer(dfs_test, cbs=[
    DecodeTimeCB(fn_units=lambda: units)
    ], inplace=False)
dfs_result = tfm()


# Test that times were converted to datetime
fc.test_eq(dfs_result['SEAWATER'].TIME.dtype, 'datetime64[ns]')
fc.test_eq(dfs_result['SEDIMENT'].TIME.dtype, 'datetime64[ns]')

# Test specific datetime values
expected_times_seawater = pd.to_datetime(['2023-01-01', '2023-01-02'])
expected_times_sediment = pd.to_datetime(['2023-01-03', '2023-01-04'])

fc.test_eq(dfs_result['SEAWATER'].TIME.dt.date, expected_times_seawater.date)
fc.test_eq(dfs_result['SEDIMENT'].TIME.dt.date, expected_times_sediment.date)