This data pipeline, known as a “handler” in Marisco terminology, is designed to clean, standardize, and encode the BODC Geotraces dataset into the MARIS NetCDF format. The handler processes Geotraces data, applying various transformations and lookups to align it with MARIS data standards.

The key functions of this handler are described step by step in the sections below.

This handler is a crucial component in the Marisco data processing workflow, ensuring Geotraces data is properly integrated into the MARIS database.

Getting Started

For new MARIS users, please refer to Understanding MARIS Data Formats (NetCDF and Open Refine) for detailed information.

The present notebook is intended as an instance of Literate Programming: a narrative in which code snippets are interspersed with explanations. When a function or a class needs to be exported to a dedicated Python module (in our case marisco/handlers/geotraces.py), the corresponding cell is tagged with the #| exports directive provided by the wonderful nbdev library.
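For readers new to nbdev, here is a toy cell (not part of the handler; the function name is invented for illustration). Cells tagged with #| exports are written to the target module and their source is also shown in the rendered documentation, which is why the handler code below appears under “Exported source” blocks.

#| exports
def hello_geotraces():
    "Toy example: this cell would be written to marisco/handlers/geotraces.py."
    return 'exported'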

Configuration & file paths

  • fname_in: path to the Geotraces data file in CSV format. The path can be defined as a relative path.

  • fname_out: path and filename for the NetCDF output. The path can be defined as a relative path.

  • zotero_key: used to retrieve dataset-related attributes from Zotero. The MARIS datasets are catalogued in a library available on Zotero.

Exported source
fname_in = '../../_data/geotraces/GEOTRACES_IDP2021_v2/seawater/ascii/GEOTRACES_IDP2021_Seawater_Discrete_Sample_Data_v2.csv'
fname_out = '../../_data/output/190-geotraces-2021.nc'
zotero_key = '97UIMEXN'

Load data

Exported source
load_data = lambda fname: pd.read_csv(fname)
df = load_data(fname_in)
print(f'df shape: {df.shape}')
df.head()
df shape: (105417, 1188)
Cruise Station:METAVAR:INDEXED_TEXT Type yyyy-mm-ddThh:mm:ss.sss Longitude [degrees_east] Latitude [degrees_north] Bot. Depth [m] Operator's Cruise Name:METAVAR:INDEXED_TEXT Ship Name:METAVAR:INDEXED_TEXT Period:METAVAR:INDEXED_TEXT ... QV:SEADATANET.581 Co_CELL_CONC_BOTTLE [amol/cell] QV:SEADATANET.582 Ni_CELL_CONC_BOTTLE [amol/cell] QV:SEADATANET.583 Cu_CELL_CONC_BOTTLE [amol/cell] QV:SEADATANET.584 Zn_CELL_CONC_BOTTLE [amol/cell] QV:SEADATANET.585 QV:ODV:SAMPLE
0 GA01 0 B 2014-05-17T22:29:00 349.29999 38.4329 4854.0 GEOVIDE Pourquoi pas? 15/05/2014 - 30/06/2014 ... 9 NaN 9 NaN 9 NaN 9 NaN 9 1
1 GA01 0 B 2014-05-17T22:29:00 349.29999 38.4329 4854.0 GEOVIDE Pourquoi pas? 15/05/2014 - 30/06/2014 ... 9 NaN 9 NaN 9 NaN 9 NaN 9 1
2 GA01 0 B 2014-05-17T22:29:00 349.29999 38.4329 4854.0 GEOVIDE Pourquoi pas? 15/05/2014 - 30/06/2014 ... 9 NaN 9 NaN 9 NaN 9 NaN 9 1
3 GA01 0 B 2014-05-17T22:29:00 349.29999 38.4329 4854.0 GEOVIDE Pourquoi pas? 15/05/2014 - 30/06/2014 ... 9 NaN 9 NaN 9 NaN 9 NaN 9 1
4 GA01 0 B 2014-05-17T22:29:00 349.29999 38.4329 4854.0 GEOVIDE Pourquoi pas? 15/05/2014 - 30/06/2014 ... 9 NaN 9 NaN 9 NaN 9 NaN 9 1

5 rows × 1188 columns

Select columns of interest

We select the columns of interest, in particular those reporting the elements of interest, in our case radionuclides.


source

SelectColsOfInterestCB

 SelectColsOfInterestCB (common_coi, nuclides_pattern)

Select columns of interest.

Exported source
common_coi = ['yyyy-mm-ddThh:mm:ss.sss', 'Longitude [degrees_east]',
              'Latitude [degrees_north]', 'Bot. Depth [m]', 'DEPTH [m]', 'BODC Bottle Number:INTEGER']

nuclides_pattern = ['^TRITI', '^Th_228', '^Th_23[024]', '^Pa_231', 
                    '^U_236_[DT]', '^Be_', '^Cs_137', '^Pb_210', '^Po_210',
                    '^Ra_22[3468]', 'Np_237', '^Pu_239_[D]', '^Pu_240', '^Pu_239_Pu_240',
                    '^I_129', '^Ac_227']  

class SelectColsOfInterestCB(Callback):
    "Select columns of interest."
    def __init__(self, common_coi, nuclides_pattern): fc.store_attr()
    def __call__(self, tfm):
        nuc_of_interest = [c for c in tfm.df.columns if 
                           any(re.match(pattern, c) for pattern in self.nuclides_pattern)]

        tfm.df = tfm.df[self.common_coi + nuc_of_interest]
tfm = Transformer(df, cbs=[
    SelectColsOfInterestCB(common_coi, nuclides_pattern)
])
df_test = tfm()
print(f'df_test shape: {df_test.shape}')
df_test.head()
df_test shape: (105417, 86)
yyyy-mm-ddThh:mm:ss.sss Longitude [degrees_east] Latitude [degrees_north] Bot. Depth [m] DEPTH [m] BODC Bottle Number:INTEGER TRITIUM_D_CONC_BOTTLE [TU] Cs_137_D_CONC_BOTTLE [uBq/kg] I_129_D_CONC_BOTTLE [atoms/kg] Np_237_D_CONC_BOTTLE [uBq/kg] ... Th_230_TP_CONC_PUMP [uBq/kg] Th_230_SPT_CONC_PUMP [uBq/kg] Th_230_LPT_CONC_PUMP [uBq/kg] Th_232_TP_CONC_PUMP [pmol/kg] Th_232_SPT_CONC_PUMP [pmol/kg] Th_232_LPT_CONC_PUMP [pmol/kg] Th_234_SPT_CONC_PUMP [mBq/kg] Th_234_LPT_CONC_PUMP [mBq/kg] Po_210_TP_CONC_UWAY [mBq/kg] Pb_210_TP_CONC_UWAY [mBq/kg]
0 2014-05-17T22:29:00 349.29999 38.4329 4854.0 2957.1 1214048 NaN NaN NaN NaN ... NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
1 2014-05-17T22:29:00 349.29999 38.4329 4854.0 2957.2 1214039 NaN NaN NaN NaN ... NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
2 2014-05-17T22:29:00 349.29999 38.4329 4854.0 2957.2 1214027 NaN NaN NaN NaN ... NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
3 2014-05-17T22:29:00 349.29999 38.4329 4854.0 2957.2 1214018 NaN NaN NaN NaN ... NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
4 2014-05-17T22:29:00 349.29999 38.4329 4854.0 2957.2 1214036 NaN NaN NaN NaN ... NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN

5 rows × 86 columns

The BODC Bottle Number:INTEGER field uniquely identifies each sample, as shown below:

cols_measurements = [col for col in df_test.columns if col not in common_coi]

unique_key = ['BODC Bottle Number:INTEGER']

df_test.dropna(subset=cols_measurements, how='all', inplace=True);
print(f'df_test shape after dropping rows with no measurements: {df_test.shape}')
print(f'df_test duplicated keys: {df_test[unique_key].duplicated().sum()}')

df_test[df_test[unique_key].duplicated(keep=False)].sort_values(by=unique_key)
df_test shape after dropping rows with no measurements: (9389, 86)
df_test duplicated keys: 0
yyyy-mm-ddThh:mm:ss.sss Longitude [degrees_east] Latitude [degrees_north] Bot. Depth [m] DEPTH [m] BODC Bottle Number:INTEGER TRITIUM_D_CONC_BOTTLE [TU] Cs_137_D_CONC_BOTTLE [uBq/kg] I_129_D_CONC_BOTTLE [atoms/kg] Np_237_D_CONC_BOTTLE [uBq/kg] ... Th_230_TP_CONC_PUMP [uBq/kg] Th_230_SPT_CONC_PUMP [uBq/kg] Th_230_LPT_CONC_PUMP [uBq/kg] Th_232_TP_CONC_PUMP [pmol/kg] Th_232_SPT_CONC_PUMP [pmol/kg] Th_232_LPT_CONC_PUMP [pmol/kg] Th_234_SPT_CONC_PUMP [mBq/kg] Th_234_LPT_CONC_PUMP [mBq/kg] Po_210_TP_CONC_UWAY [mBq/kg] Pb_210_TP_CONC_UWAY [mBq/kg]

0 rows × 86 columns

Reshape: wide to long

We reshape the dataframe from wide to long format so that we can extract the information embedded in the Geotraces nuclide column names, such as sampling method, filtering status, and unit.


source

WideToLongCB

 WideToLongCB (common_coi, nuclides_pattern, var_name='NUCLIDE',
               value_name='VALUE')

Get Geotraces nuclide names as values not column names to extract contained information (unit, sampling method, …).

Exported source
class WideToLongCB(Callback):
    """
    Get Geotraces nuclide names as values not column names 
    to extract contained information (unit, sampling method, ...).
    """
    def __init__(self, common_coi, nuclides_pattern, 
                 var_name='NUCLIDE', value_name='VALUE'): 
        fc.store_attr()
        
    def __call__(self, tfm):
        nuc_of_interest = [c for c in tfm.df.columns if 
                           any(re.match(pattern, c) for pattern in self.nuclides_pattern)]
        tfm.df = pd.melt(tfm.df, id_vars=self.common_coi, value_vars=nuc_of_interest, 
                          var_name=self.var_name, value_name=self.value_name)
        tfm.df.dropna(subset=self.value_name, inplace=True)
tfm = Transformer(df, cbs=[
    SelectColsOfInterestCB(common_coi, nuclides_pattern),
    WideToLongCB(common_coi, nuclides_pattern)
])
df_test = tfm()
print(f'df_test shape: {df_test.shape}')
df_test.head()
df_test shape: (26745, 8)
yyyy-mm-ddThh:mm:ss.sss Longitude [degrees_east] Latitude [degrees_north] Bot. Depth [m] DEPTH [m] BODC Bottle Number:INTEGER NUCLIDE VALUE
9223 2010-10-17T00:13:29 350.33792 38.3271 2827.0 17.8 842525 TRITIUM_D_CONC_BOTTLE [TU] 0.733
9231 2010-10-17T00:13:29 350.33792 38.3271 2827.0 34.7 842528 TRITIUM_D_CONC_BOTTLE [TU] 0.696
9237 2010-10-17T00:13:29 350.33792 38.3271 2827.0 67.5 842531 TRITIUM_D_CONC_BOTTLE [TU] 0.718
9244 2010-10-17T00:13:29 350.33792 38.3271 2827.0 91.9 842534 TRITIUM_D_CONC_BOTTLE [TU] 0.709
9256 2010-10-17T00:13:29 350.33792 38.3271 2827.0 136.6 842540 TRITIUM_D_CONC_BOTTLE [TU] 0.692

Extract

Unit, filtering status, and sampling method are extracted from the column names, in which the Geotraces data source embeds this information.
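For illustration, the standalone snippet below decomposes one Geotraces column name (taken from the data shown above) in the same way the callbacks of this section do; the regular expressions are written out inline but mirror the ones used by the callbacks.

import re

col = 'Th_230_TP_CONC_PUMP [uBq/kg]'                            # example Geotraces column name

unit = re.search(r'\[(.*?)\]', col).group(1)                    # 'uBq/kg'
phase = re.search(r'_(D|T|TP|LPT|SPT)_', col).group(1)          # 'TP' -> suspended matter
method = re.search(r'_(BOTTLE|FISH|PUMP|UWAY) ', col).group(1)  # 'PUMP'
print(unit, phase, method)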

Unit


source

ExtractUnitCB

 ExtractUnitCB (var_name='NUCLIDE')

Extract units from nuclide names.

Exported source
class ExtractUnitCB(Callback):
    """
    Extract units from nuclide names.
    """
    def __init__(self, var_name='NUCLIDE'): 
        fc.store_attr()
        self.unit_col_name = 'UNIT'

    def extract_unit(self, s):
        match = re.search(r'\[(.*?)\]', s)
        return match.group(1) if match else None
        
    def __call__(self, tfm):
        tfm.df[self.unit_col_name] = tfm.df[self.var_name].apply(self.extract_unit)
tfm = Transformer(df, cbs=[
    SelectColsOfInterestCB(common_coi, nuclides_pattern),
    WideToLongCB(common_coi, nuclides_pattern),
    ExtractUnitCB()
])

df_test = tfm()
df_test.head()
yyyy-mm-ddThh:mm:ss.sss Longitude [degrees_east] Latitude [degrees_north] Bot. Depth [m] DEPTH [m] BODC Bottle Number:INTEGER NUCLIDE VALUE UNIT
9223 2010-10-17T00:13:29 350.33792 38.3271 2827.0 17.8 842525 TRITIUM_D_CONC_BOTTLE [TU] 0.733 TU
9231 2010-10-17T00:13:29 350.33792 38.3271 2827.0 34.7 842528 TRITIUM_D_CONC_BOTTLE [TU] 0.696 TU
9237 2010-10-17T00:13:29 350.33792 38.3271 2827.0 67.5 842531 TRITIUM_D_CONC_BOTTLE [TU] 0.718 TU
9244 2010-10-17T00:13:29 350.33792 38.3271 2827.0 91.9 842534 TRITIUM_D_CONC_BOTTLE [TU] 0.709 TU
9256 2010-10-17T00:13:29 350.33792 38.3271 2827.0 136.6 842540 TRITIUM_D_CONC_BOTTLE [TU] 0.692 TU

Filtering status


source

ExtractFilteringStatusCB

 ExtractFilteringStatusCB (phase, var_name='NUCLIDE')

Extract filtering status from nuclide names.

Exported source
phase = {
    'D': {'FILT': 1, 'group': 'SEAWATER'},
    'T': {'FILT': 2, 'group': 'SEAWATER'},
    'TP': {'FILT': 1, 'group': 'SUSPENDED_MATTER'}, 
    'LPT': {'FILT': 1, 'group': 'SUSPENDED_MATTER'},
    'SPT': {'FILT': 1, 'group': 'SUSPENDED_MATTER'}}
Exported source
class ExtractFilteringStatusCB(Callback):
    "Extract filtering status from nuclide names."
    def __init__(self, phase, var_name='NUCLIDE'): 
        fc.store_attr()
        # self.filt_col_name = cdl_cfg()['vars']['suffixes']['filtered']['name']
        self.filt_col_name = 'FILT'

    def extract_filt_status(self, s):
        matched_string = self.match(s)
        return self.phase[matched_string.group(1)][self.filt_col_name] if matched_string else None

    def match(self, s):
        return re.search(r'_(' + '|'.join(self.phase.keys()) + ')_', s)
        
    def extract_group(self, s):
        matched_string = self.match(s)
        return self.phase[matched_string.group(1)]['group'] if matched_string else None
        
    def __call__(self, tfm):
        tfm.df[self.filt_col_name] = tfm.df[self.var_name].apply(self.extract_filt_status)
        tfm.df['GROUP'] = tfm.df[self.var_name].apply(self.extract_group)
tfm = Transformer(df, cbs=[
    SelectColsOfInterestCB(common_coi, nuclides_pattern),
    WideToLongCB(common_coi, nuclides_pattern),
    ExtractUnitCB(),
    ExtractFilteringStatusCB(phase)
])

df_test = tfm()
df_test.head()
yyyy-mm-ddThh:mm:ss.sss Longitude [degrees_east] Latitude [degrees_north] Bot. Depth [m] DEPTH [m] BODC Bottle Number:INTEGER NUCLIDE VALUE UNIT FILT GROUP
9223 2010-10-17T00:13:29 350.33792 38.3271 2827.0 17.8 842525 TRITIUM_D_CONC_BOTTLE [TU] 0.733 TU 1 SEAWATER
9231 2010-10-17T00:13:29 350.33792 38.3271 2827.0 34.7 842528 TRITIUM_D_CONC_BOTTLE [TU] 0.696 TU 1 SEAWATER
9237 2010-10-17T00:13:29 350.33792 38.3271 2827.0 67.5 842531 TRITIUM_D_CONC_BOTTLE [TU] 0.718 TU 1 SEAWATER
9244 2010-10-17T00:13:29 350.33792 38.3271 2827.0 91.9 842534 TRITIUM_D_CONC_BOTTLE [TU] 0.709 TU 1 SEAWATER
9256 2010-10-17T00:13:29 350.33792 38.3271 2827.0 136.6 842540 TRITIUM_D_CONC_BOTTLE [TU] 0.692 TU 1 SEAWATER

Sampling method


source

ExtractSamplingMethodCB

 ExtractSamplingMethodCB (smp_method:dict={'BOTTLE': 1, 'FISH': 18,
                          'PUMP': 14, 'UWAY': 24}, var_name='NUCLIDE',
                          smp_method_col_name='SAMP_MET')

Extract sampling method from nuclide names.

Type Default Details
smp_method dict {‘BOTTLE’: 1, ‘FISH’: 18, ‘PUMP’: 14, ‘UWAY’: 24} Sampling method lookup table
var_name str NUCLIDE Column name containing nuclide names
smp_method_col_name str SAMP_MET Column name for sampling method in output df
Exported source
# To be validated
smp_method = {
    'BOTTLE': 1,
    'FISH': 18,
    'PUMP': 14,
    'UWAY': 24}
Exported source
class ExtractSamplingMethodCB(Callback):
    "Extract sampling method from nuclide names."
    def __init__(self, 
                 smp_method:dict = smp_method, # Sampling method lookup table
                 var_name='NUCLIDE', # Column name containing nuclide names
                 smp_method_col_name = 'SAMP_MET' # Column name for sampling method in output df
                 ): 
        fc.store_attr()

    def extract_smp_method(self, s):
        match = re.search(r'_(' + '|'.join(self.smp_method.keys()) + ') ', s)
        return self.smp_method[match.group(1)] if match else None
        
    def __call__(self, tfm):
        tfm.df[self.smp_method_col_name] = tfm.df[self.var_name].apply(self.extract_smp_method)
tfm = Transformer(df, cbs=[
    SelectColsOfInterestCB(common_coi, nuclides_pattern),
    WideToLongCB(common_coi, nuclides_pattern),
    ExtractUnitCB(),
    ExtractFilteringStatusCB(phase),
    ExtractSamplingMethodCB(smp_method)
])

df_test = tfm()
df_test.head()
yyyy-mm-ddThh:mm:ss.sss Longitude [degrees_east] Latitude [degrees_north] Bot. Depth [m] DEPTH [m] BODC Bottle Number:INTEGER NUCLIDE VALUE UNIT FILT GROUP SAMP_MET
9223 2010-10-17T00:13:29 350.33792 38.3271 2827.0 17.8 842525 TRITIUM_D_CONC_BOTTLE [TU] 0.733 TU 1 SEAWATER 1
9231 2010-10-17T00:13:29 350.33792 38.3271 2827.0 34.7 842528 TRITIUM_D_CONC_BOTTLE [TU] 0.696 TU 1 SEAWATER 1
9237 2010-10-17T00:13:29 350.33792 38.3271 2827.0 67.5 842531 TRITIUM_D_CONC_BOTTLE [TU] 0.718 TU 1 SEAWATER 1
9244 2010-10-17T00:13:29 350.33792 38.3271 2827.0 91.9 842534 TRITIUM_D_CONC_BOTTLE [TU] 0.709 TU 1 SEAWATER 1
9256 2010-10-17T00:13:29 350.33792 38.3271 2827.0 136.6 842540 TRITIUM_D_CONC_BOTTLE [TU] 0.692 TU 1 SEAWATER 1

Remap to MARIS nuclide names

We normalize the nuclide names to MARIS standard for further lookup.


source

RenameNuclideCB

 RenameNuclideCB (nuclides_name, var_name='NUCLIDE')

Remap nuclides name to MARIS standard.

Exported source
nuclides_name = {'TRITIUM': 'h3', 'Pu_239_Pu_240': 'pu239_240_tot'}
Exported source
class RenameNuclideCB(Callback):
    "Remap nuclides name to MARIS standard."
    def __init__(self, nuclides_name, var_name='NUCLIDE'): 
        fc.store_attr()
        self.patterns = ['_D', '_T', '_TP', '_LPT', '_SPT']

    def extract_nuclide_name(self, s):
        match = re.search(r'(.*?)(' + '|'.join(self.patterns) + ')', s)
        return match.group(1) if match else None

    def standardize_name(self, s):
        s = self.extract_nuclide_name(s)
        return self.nuclides_name[s] if s in self.nuclides_name else s.lower().replace('_', '')
        
    def __call__(self, tfm):
        tfm.df[self.var_name] = tfm.df[self.var_name].apply(self.standardize_name)
tfm = Transformer(df, cbs=[
    SelectColsOfInterestCB(common_coi, nuclides_pattern),
    WideToLongCB(common_coi, nuclides_pattern),
    ExtractUnitCB(),
    ExtractFilteringStatusCB(phase),
    ExtractSamplingMethodCB(smp_method),
    RenameNuclideCB(nuclides_name)
])

df_test = tfm()
df_test.head()
yyyy-mm-ddThh:mm:ss.sss Longitude [degrees_east] Latitude [degrees_north] Bot. Depth [m] DEPTH [m] BODC Bottle Number:INTEGER NUCLIDE VALUE UNIT FILT GROUP SAMP_MET
9223 2010-10-17T00:13:29 350.33792 38.3271 2827.0 17.8 842525 h3 0.733 TU 1 SEAWATER 1
9231 2010-10-17T00:13:29 350.33792 38.3271 2827.0 34.7 842528 h3 0.696 TU 1 SEAWATER 1
9237 2010-10-17T00:13:29 350.33792 38.3271 2827.0 67.5 842531 h3 0.718 TU 1 SEAWATER 1
9244 2010-10-17T00:13:29 350.33792 38.3271 2827.0 91.9 842534 h3 0.709 TU 1 SEAWATER 1
9256 2010-10-17T00:13:29 350.33792 38.3271 2827.0 136.6 842540 h3 0.692 TU 1 SEAWATER 1
df_test.NUCLIDE.unique()
array(['h3', 'cs137', 'i129', 'np237', 'pu239', 'pu239_240_tot', 'pu240',
       'u236', 'pa231', 'pb210', 'po210', 'ra224', 'ra226', 'ra228',
       'th230', 'th232', 'th234', 'ac227', 'be7', 'ra223', 'th228'],
      dtype=object)
FEEDBACK TO DATA PROVIDER

Note that several measurements are negative as shown below. Further clarification is needed.

df_test[df_test.VALUE < 0].groupby('NUCLIDE').size()
NUCLIDE
h3       71
pa231     3
th228    22
th230     1
th232     6
dtype: int64

Standardize unit

Below are the unit values used by the Geotraces data source. We need to remap them to MARIS standard units and, where needed, convert the values.

df_test['UNIT'].unique()
array(['TU', 'uBq/kg', 'atoms/kg', 'mBq/kg', 'pmol/kg'], dtype=object)
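Most units remap one-to-one, but pmol/kg is converted to the same MARIS unit id as atoms/kg using Avogadro's number, as encoded in units_lut below. A quick self-contained check of that factor (Avogadro's number is hard-coded here; the handler relies on an AVOGADRO constant imported elsewhere in the notebook):

avogadro = 6.02214076e23    # atoms per mole
factor = 1e-12 * avogadro   # atoms per picomole
print(factor)               # ~6.022e+11, i.e. 1 pmol/kg corresponds to ~6.022e11 atoms/kg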

source

StandardizeUnitCB

 StandardizeUnitCB (units_lut, unit_col_name='UNIT', var_name='VALUE')

Remap unit to MARIS standard ones and apply conversion where needed.

Exported source
units_lut = {
    'TU': {'id': 7, 'factor': 1},
    'uBq/kg': {'id': 3, 'factor': 1e-6},
    'atoms/kg': {'id': 9, 'factor': 1},
    'mBq/kg': {'id': 3, 'factor': 1e-3},
    'pmol/kg': {'id': 9, 'factor': 1e-12 * AVOGADRO}
    }
Exported source
class StandardizeUnitCB(Callback):
    "Remap unit to MARIS standard ones and apply conversion where needed."
    def __init__(self, 
                 units_lut, 
                 unit_col_name='UNIT',
                 var_name='VALUE'): 
        fc.store_attr()
        # self.unit_col_name = cdl_cfg()['vars']['suffixes']['unit']['name']
        
    def __call__(self, tfm):
        # Convert/rescale values
        tfm.df[self.var_name] *= tfm.df[self.unit_col_name].map(
            {k: v['factor'] for k, v in self.units_lut.items()})
        
        # Match MARIS unit id
        tfm.df[self.unit_col_name] = tfm.df[self.unit_col_name].map(
            {k: v['id'] for k, v in self.units_lut.items()})
tfm = Transformer(df, cbs=[
    SelectColsOfInterestCB(common_coi, nuclides_pattern),
    WideToLongCB(common_coi, nuclides_pattern),
    ExtractUnitCB(),
    ExtractFilteringStatusCB(phase),
    ExtractSamplingMethodCB(smp_method),
    RenameNuclideCB(nuclides_name),
    StandardizeUnitCB(units_lut)
])

df_test = tfm()
print(f'df_test.UNIT.unique(): {df_test.UNIT.unique()}')
df_test.UNIT.unique(): [7 3 9]

Rename common columns

We rename the common columns to MARIS standard names before NetCDF encoding.


source

RenameColumnCB

 RenameColumnCB (lut={'yyyy-mm-ddThh:mm:ss.sss': 'TIME', 'Longitude
                 [degrees_east]': 'LON', 'Latitude [degrees_north]':
                 'LAT', 'DEPTH [m]': 'SMP_DEPTH', 'Bot. Depth [m]':
                 'TOT_DEPTH', 'BODC Bottle Number:INTEGER': 'SMP_ID'})

Renaming variables to MARIS standard names.

Exported source
renaming_rules = {
    'yyyy-mm-ddThh:mm:ss.sss': 'TIME',
    'Longitude [degrees_east]': 'LON',
    'Latitude [degrees_north]': 'LAT',
    'DEPTH [m]': 'SMP_DEPTH',
    'Bot. Depth [m]': 'TOT_DEPTH',
    'BODC Bottle Number:INTEGER': 'SMP_ID'
}
Exported source
class RenameColumnCB(Callback):
    "Renaming variables to MARIS standard names."
    def __init__(self, lut=renaming_rules): fc.store_attr()
    def __call__(self, tfm):
        # lut = self.renaming_rules()
        new_col_names = [self.lut[name] if name in self.lut else name for name in tfm.df.columns]
        tfm.df.columns = new_col_names
tfm = Transformer(df, cbs=[
    SelectColsOfInterestCB(common_coi, nuclides_pattern),
    WideToLongCB(common_coi, nuclides_pattern),
    ExtractUnitCB(),
    ExtractFilteringStatusCB(phase),
    ExtractSamplingMethodCB(smp_method),
    RenameNuclideCB(nuclides_name),
    StandardizeUnitCB(units_lut),
    RenameColumnCB(renaming_rules)
])

df_test = tfm()
df_test.head()
TIME LON LAT TOT_DEPTH SMP_DEPTH SMP_ID NUCLIDE VALUE UNIT FILT GROUP SAMP_MET
9223 2010-10-17T00:13:29 350.33792 38.3271 2827.0 17.8 842525 h3 0.733 7 1 SEAWATER 1
9231 2010-10-17T00:13:29 350.33792 38.3271 2827.0 34.7 842528 h3 0.696 7 1 SEAWATER 1
9237 2010-10-17T00:13:29 350.33792 38.3271 2827.0 67.5 842531 h3 0.718 7 1 SEAWATER 1
9244 2010-10-17T00:13:29 350.33792 38.3271 2827.0 91.9 842534 h3 0.709 7 1 SEAWATER 1
9256 2010-10-17T00:13:29 350.33792 38.3271 2827.0 136.6 842540 h3 0.692 7 1 SEAWATER 1

Unshift longitudes

In Geotraces, longitudes are coded between 0 and 360. We rescale them to the range -180 to 180 instead.


source

UnshiftLongitudeCB

 UnshiftLongitudeCB (lon_col_name='LON')

Longitudes are coded between 0 and 360 in Geotraces. We rescale it between -180 and 180 instead.

Exported source
class UnshiftLongitudeCB(Callback):
    "Longitudes are coded between 0 and 360 in Geotraces. We rescale it between -180 and 180 instead."
    def __init__(self, lon_col_name='LON'): 
        fc.store_attr()
    def __call__(self, tfm):
        tfm.df[self.lon_col_name] = tfm.df[self.lon_col_name] - 180
tfm = Transformer(df, cbs=[
    SelectColsOfInterestCB(common_coi, nuclides_pattern),
    WideToLongCB(common_coi, nuclides_pattern),
    ExtractUnitCB(),
    ExtractFilteringStatusCB(phase),
    ExtractSamplingMethodCB(smp_method),
    RenameNuclideCB(nuclides_name),
    StandardizeUnitCB(units_lut),
    RenameColumnCB(renaming_rules),
    UnshiftLongitudeCB()
])

df_test = tfm()
df_test.head()
TIME LON LAT TOT_DEPTH SMP_DEPTH SMP_ID NUCLIDE VALUE UNIT FILT GROUP SAMP_MET
9223 2010-10-17T00:13:29 170.33792 38.3271 2827.0 17.8 842525 h3 0.733 7 1 SEAWATER 1
9231 2010-10-17T00:13:29 170.33792 38.3271 2827.0 34.7 842528 h3 0.696 7 1 SEAWATER 1
9237 2010-10-17T00:13:29 170.33792 38.3271 2827.0 67.5 842531 h3 0.718 7 1 SEAWATER 1
9244 2010-10-17T00:13:29 170.33792 38.3271 2827.0 91.9 842534 h3 0.709 7 1 SEAWATER 1
9256 2010-10-17T00:13:29 170.33792 38.3271 2827.0 136.6 842540 h3 0.692 7 1 SEAWATER 1
np.min(df_test.LON), np.max(df_test.LON)
(-180.0, 179.9986)

Dispatch to groups

We split the data into one dedicated dataframe per sample type (seawater, suspended matter, …), as each sample type is further encoded as a separate NetCDF group.


source

DispatchToGroupCB

 DispatchToGroupCB (group_name='GROUP')

Convert to a dictionary of dataframe with sample type (seawater,…) as keys.

Exported source
class DispatchToGroupCB(Callback):
    "Convert to a dictionary of dataframe with sample type (seawater,...) as keys."
    def __init__(self, group_name='GROUP'): 
        fc.store_attr()
        
    def __call__(self, tfm):
        tfm.dfs = dict(tuple(tfm.df.groupby(self.group_name)))
        for key in tfm.dfs:
            tfm.dfs[key] = tfm.dfs[key].drop(self.group_name, axis=1)
df = pd.read_csv(fname_in)

tfm = Transformer(df, cbs=[
    SelectColsOfInterestCB(common_coi, nuclides_pattern),
    WideToLongCB(common_coi, nuclides_pattern),
    ExtractUnitCB(),
    ExtractFilteringStatusCB(phase),
    ExtractSamplingMethodCB(smp_method),
    RenameNuclideCB(nuclides_name),
    StandardizeUnitCB(units_lut),
    RenameColumnCB(renaming_rules),
    UnshiftLongitudeCB(),
    DispatchToGroupCB()
])

dfs_test = tfm()
print(f'dfs_test keys: {dfs_test.keys()}')
print(dfs_test['SEAWATER'].head())
dfs_test keys: dict_keys(['SEAWATER', 'SUSPENDED_MATTER'])
                     TIME        LON      LAT  TOT_DEPTH  SMP_DEPTH  SMP_ID  \
9223  2010-10-17T00:13:29  170.33792  38.3271     2827.0       17.8  842525   
9231  2010-10-17T00:13:29  170.33792  38.3271     2827.0       34.7  842528   
9237  2010-10-17T00:13:29  170.33792  38.3271     2827.0       67.5  842531   
9244  2010-10-17T00:13:29  170.33792  38.3271     2827.0       91.9  842534   
9256  2010-10-17T00:13:29  170.33792  38.3271     2827.0      136.6  842540   

     NUCLIDE  VALUE  UNIT  FILT  SAMP_MET  
9223      h3  0.733     7     1         1  
9231      h3  0.696     7     1         1  
9237      h3  0.718     7     1         1  
9244      h3  0.709     7     1         1  
9256      h3  0.692     7     1         1  

Parse time

We parse the time column to datetime format.


source

ParseTimeCB

 ParseTimeCB ()

Base class for callbacks.
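The source of ParseTimeCB is not exported in this notebook (hence the generic docstring above). As an illustrative sketch only, not the Marisco implementation, the parsing step amounts to converting the ISO 8601 strings of the TIME column to datetimes:

import pandas as pd

def parse_time(df, time_col='TIME'):
    "Illustrative only: parse strings such as '2010-10-17T00:13:29' into datetime64[ns]."
    df[time_col] = pd.to_datetime(df[time_col])
    return df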

df = pd.read_csv(fname_in)

tfm = Transformer(df, cbs=[
    SelectColsOfInterestCB(common_coi, nuclides_pattern),
    WideToLongCB(common_coi, nuclides_pattern),
    ExtractUnitCB(),
    ExtractFilteringStatusCB(phase),
    ExtractSamplingMethodCB(smp_method),
    RenameNuclideCB(nuclides_name),
    StandardizeUnitCB(units_lut),
    RenameColumnCB(renaming_rules),
    UnshiftLongitudeCB(),
    DispatchToGroupCB(),
    ParseTimeCB()
])

dfs_test = tfm()
print('time data type: ', dfs_test['SEAWATER'].TIME.dtype)
print(dfs_test['SEAWATER'].head())
time data type:  datetime64[ns]
                    TIME        LON      LAT  TOT_DEPTH  SMP_DEPTH  SMP_ID  \
9223 2010-10-17 00:13:29  170.33792  38.3271     2827.0       17.8  842525   
9231 2010-10-17 00:13:29  170.33792  38.3271     2827.0       34.7  842528   
9237 2010-10-17 00:13:29  170.33792  38.3271     2827.0       67.5  842531   
9244 2010-10-17 00:13:29  170.33792  38.3271     2827.0       91.9  842534   
9256 2010-10-17 00:13:29  170.33792  38.3271     2827.0      136.6  842540   

     NUCLIDE  VALUE  UNIT  FILT  SAMP_MET  
9223      h3  0.733     7     1         1  
9231      h3  0.696     7     1         1  
9237      h3  0.718     7     1         1  
9244      h3  0.709     7     1         1  
9256      h3  0.692     7     1         1  

Encode time (seconds since …)

We then encode it as seconds since 1970-01-01, as specified in the MARIS NetCDF CDL and template.
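A minimal self-contained sketch of this encoding step (illustrative only, not the actual EncodeTimeCB implementation):

import pandas as pd

def encode_time(df, time_col='TIME'):
    "Illustrative only: encode parsed datetimes as integer seconds since 1970-01-01."
    epoch = pd.Timestamp('1970-01-01')
    df[time_col] = ((df[time_col] - epoch) // pd.Timedelta(seconds=1)).astype('int64')
    return df

# e.g. pd.Timestamp('2010-10-17T00:13:29') encodes to 1287274409, as in the output below.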

df = pd.read_csv(fname_in)

tfm = Transformer(df, cbs=[
    SelectColsOfInterestCB(common_coi, nuclides_pattern),
    WideToLongCB(common_coi, nuclides_pattern),
    ExtractUnitCB(),
    ExtractFilteringStatusCB(phase),
    ExtractSamplingMethodCB(smp_method),
    RenameNuclideCB(nuclides_name),
    StandardizeUnitCB(units_lut),
    RenameColumnCB(renaming_rules),
    UnshiftLongitudeCB(),
    DispatchToGroupCB(),
    ParseTimeCB(),
    EncodeTimeCB()
])

dfs_test = tfm()['SEAWATER']
dfs_test.head()
TIME LON LAT TOT_DEPTH SMP_DEPTH SMP_ID NUCLIDE VALUE UNIT FILT SAMP_MET
9223 1287274409 170.33792 38.3271 2827.0 17.8 842525 h3 0.733 7 1 1
9231 1287274409 170.33792 38.3271 2827.0 34.7 842528 h3 0.696 7 1 1
9237 1287274409 170.33792 38.3271 2827.0 67.5 842531 h3 0.718 7 1 1
9244 1287274409 170.33792 38.3271 2827.0 91.9 842534 h3 0.709 7 1 1
9256 1287274409 170.33792 38.3271 2827.0 136.6 842540 h3 0.692 7 1 1

Sanitize coordinates
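SanitizeLonLatCB drops rows with invalid longitude and latitude values and converts ',' decimal separators to '.' (see the change logs further below); its source is not exported in this notebook. An illustrative sketch of that kind of sanitization, not the Marisco implementation, could look like:

import pandas as pd

def sanitize_lon_lat(df, lon='LON', lat='LAT'):
    "Illustrative only: fix decimal separators and drop out-of-range coordinates."
    for col in (lon, lat):
        if df[col].dtype == object:
            df[col] = df[col].astype(str).str.replace(',', '.', regex=False).astype(float)
    return df[df[lon].between(-180, 180) & df[lat].between(-90, 90)]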

df = pd.read_csv(fname_in)

tfm = Transformer(df, cbs=[
    SelectColsOfInterestCB(common_coi, nuclides_pattern),
    WideToLongCB(common_coi, nuclides_pattern),
    ExtractUnitCB(),
    ExtractFilteringStatusCB(phase),
    ExtractSamplingMethodCB(smp_method),
    RenameNuclideCB(nuclides_name),
    StandardizeUnitCB(units_lut),
    RenameColumnCB(renaming_rules),
    UnshiftLongitudeCB(),
    DispatchToGroupCB(),
    ParseTimeCB(),
    EncodeTimeCB(),
    SanitizeLonLatCB()
])
dfs_test = tfm()
dfs_test['SEAWATER'].head()
TIME LON LAT TOT_DEPTH SMP_DEPTH SMP_ID NUCLIDE VALUE UNIT FILT SAMP_MET
9223 1287274409 170.33792 38.3271 2827.0 17.8 842525 h3 0.733 7 1 1
9231 1287274409 170.33792 38.3271 2827.0 34.7 842528 h3 0.696 7 1 1
9237 1287274409 170.33792 38.3271 2827.0 67.5 842531 h3 0.718 7 1 1
9244 1287274409 170.33792 38.3271 2827.0 91.9 842534 h3 0.709 7 1 1
9256 1287274409 170.33792 38.3271 2827.0 136.6 842540 h3 0.692 7 1 1

Remap nuclides name to id

All MARIS lookup tables are embedded in the NetCDF file as enumeration types. The data itself is encoded as integers for the sake of space efficiency. We therefore need to remap each nuclide name to its corresponding MARIS nuclide id.

Exported source
lut_nuclides = lambda: get_lut(lut_path(), 'dbo_nuclide.xlsx', 
                               key='nc_name', value='nuclide_id', reverse=False)
lut_nuclides()
{'NOT APPLICABLE': -1,
 'NOT AVAILABLE': 0,
 'h3': 1,
 'be7': 2,
 'c14': 3,
 'k40': 4,
 'cr51': 5,
 'mn54': 6,
 'co57': 7,
 'co58': 8,
 'co60': 9,
 'zn65': 10,
 'sr89': 11,
 'sr90': 12,
 'zr95': 13,
 'nb95': 14,
 'tc99': 15,
 'ru103': 16,
 'ru106': 17,
 'rh106': 18,
 'ag106m': 19,
 'ag108': 20,
 'ag108m': 21,
 'ag110m': 22,
 'sb124': 23,
 'sb125': 24,
 'te129m': 25,
 'i129': 28,
 'i131': 29,
 'cs127': 30,
 'cs134': 31,
 'cs137': 33,
 'ba140': 34,
 'la140': 35,
 'ce141': 36,
 'ce144': 37,
 'pm147': 38,
 'eu154': 39,
 'eu155': 40,
 'pb210': 41,
 'pb212': 42,
 'pb214': 43,
 'bi207': 44,
 'bi211': 45,
 'bi214': 46,
 'po210': 47,
 'rn220': 48,
 'rn222': 49,
 'ra223': 50,
 'ra224': 51,
 'ra225': 52,
 'ra226': 53,
 'ra228': 54,
 'ac228': 55,
 'th227': 56,
 'th228': 57,
 'th232': 59,
 'th234': 60,
 'pa234': 61,
 'u234': 62,
 'u235': 63,
 'u238': 64,
 'np237': 65,
 'np239': 66,
 'pu238': 67,
 'pu239': 68,
 'pu240': 69,
 'pu241': 70,
 'am240': 71,
 'am241': 72,
 'cm242': 73,
 'cm243': 74,
 'cm244': 75,
 'cs134_137_tot': 76,
 'pu239_240_tot': 77,
 'pu239_240_iii_iv_tot': 78,
 'pu239_240_v_vi_tot': 79,
 'cm243_244_tot': 80,
 'pu238_pu239_240_tot_ratio': 81,
 'am241_pu239_240_tot_ratio': 82,
 'cs137_134_ratio': 83,
 'cd109': 84,
 'eu152': 85,
 'fe59': 86,
 'gd153': 87,
 'ir192': 88,
 'pu238_240_tot': 89,
 'rb86': 90,
 'sc46': 91,
 'sn113': 92,
 'sn117m': 93,
 'tl208': 94,
 'mo99': 95,
 'tc99m': 96,
 'ru105': 97,
 'te129': 98,
 'te132': 99,
 'i132': 100,
 'i135': 101,
 'cs136': 102,
 'tbeta': 103,
 'talpha': 104,
 'i133': 105,
 'th230': 106,
 'pa231': 107,
 'u236': 108,
 'ag111': 109,
 'in116m': 110,
 'te123m': 111,
 'sb127': 112,
 'ba133': 113,
 'ce139': 114,
 'tl201': 116,
 'hg203': 117,
 'na22': 122,
 'pa234m': 123,
 'am243': 124,
 'se75': 126,
 'sr85': 127,
 'y88': 128,
 'ce140': 129,
 'bi212': 130,
 'u236_238_ratio': 131,
 'i125': 132,
 'ba137m': 133,
 'u232': 134,
 'pa233': 135,
 'ru106_rh106_tot': 136,
 'tu': 137,
 'tbeta40k': 138,
 'fe55': 139,
 'ce144_pr144_tot': 140,
 'pu240_pu239_ratio': 141,
 'u233': 142,
 'pu239_242_tot': 143,
 'ac227': 144}
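For illustration only, the same lookup can be applied with pandas map; RemapCB presumably does the equivalent per group (and reports the groups it does not find, as shown below):

import pandas as pd

names = pd.Series(['h3', 'cs137', 'pu239_240_tot'])
ids = names.map(lut_nuclides())   # -> 1, 33, 77 according to the lookup above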
df = pd.read_csv(fname_in)

tfm = Transformer(df, cbs=[
    SelectColsOfInterestCB(common_coi, nuclides_pattern),
    WideToLongCB(common_coi, nuclides_pattern),
    ExtractUnitCB(),
    ExtractFilteringStatusCB(phase),
    ExtractSamplingMethodCB(smp_method),
    RenameNuclideCB(nuclides_name),
    StandardizeUnitCB(units_lut),
    RenameColumnCB(renaming_rules),
    UnshiftLongitudeCB(),
    DispatchToGroupCB(),
    ParseTimeCB(),
    EncodeTimeCB(),
    SanitizeLonLatCB(),
    RemapCB(fn_lut=lut_nuclides, col_remap='NUCLIDE', col_src='NUCLIDE')
])

dfs_test = tfm()
dfs_test['SEAWATER'].NUCLIDE.unique()
Group BIOTA not found in the dataframes.
Group SEDIMENT not found in the dataframes.
array([  1,  33,  28,  65,  68,  77,  69, 108, 107,  41,  47,  51,  53,
        54, 106,  59,  60, 144,   2,  50,  57])

NetCDF encoder

Example change logs

df = pd.read_csv(fname_in)

tfm = Transformer(df, cbs=[
    SelectColsOfInterestCB(common_coi, nuclides_pattern),
    WideToLongCB(common_coi, nuclides_pattern),
    ExtractUnitCB(),
    ExtractFilteringStatusCB(phase),
    ExtractSamplingMethodCB(smp_method),
    RenameNuclideCB(nuclides_name),
    StandardizeUnitCB(units_lut),
    RenameColumnCB(renaming_rules),
    UnshiftLongitudeCB(),
    DispatchToGroupCB(),
    ParseTimeCB(),
    EncodeTimeCB(),
    SanitizeLonLatCB(),
    RemapCB(fn_lut=lut_nuclides, col_remap='NUCLIDE', col_src='NUCLIDE')
])

tfm();
Group BIOTA not found in the dataframes.
Group SEDIMENT not found in the dataframes.
tfm.logs
['Select columns of interest.',
 '\n    Get Geotraces nuclide names as values not column names \n    to extract contained information (unit, sampling method, ...).\n    ',
 '\n    Extract units from nuclide names.\n    ',
 'Extract filtering status from nuclide names.',
 'Extract sampling method from nuclide names.',
 'Remap nuclides name to MARIS standard.',
 'Remap unit to MARIS standard ones and apply conversion where needed.',
 'Renaming variables to MARIS standard names.',
 'Longitudes are coded between 0 and 360 in Geotraces. We rescale it between -180 and 180 instead.',
 'Convert to a dictionary of dataframe with sample type (seawater,...) as keys.',
 'Encode time as seconds since epoch.',
 'Drop rows with invalid longitude & latitude values. Convert `,` separator to `.` separator.',
 "Remap values from 'NUCLIDE' to 'NUCLIDE' for groups: dict_keys(['BIOTA', 'SEAWATER', 'SEDIMENT', 'SUSPENDED_MATTER'])."]

Feed global attributes


source

get_attrs

 get_attrs (tfm, zotero_key, kw=['oceanography', 'Earth Science > Oceans >
            Ocean Chemistry> Radionuclides', 'Earth Science > Human
            Dimensions > Environmental Impacts > Nuclear Radiation
            Exposure', 'Earth Science > Oceans > Ocean Chemistry > Ocean
            Tracers, Earth Science > Oceans > Marine Sediments', 'Earth
            Science > Oceans > Ocean Chemistry, Earth Science > Oceans >
            Sea Ice > Isotopes', 'Earth Science > Oceans > Water Quality >
            Ocean Contaminants', 'Earth Science > Biological
            Classification > Animals/Vertebrates > Fish', 'Earth Science >
            Biosphere > Ecosystems > Marine Ecosystems', 'Earth Science >
            Biological Classification > Animals/Invertebrates > Mollusks',
            'Earth Science > Biological Classification >
            Animals/Invertebrates > Arthropods > Crustaceans', 'Earth
            Science > Biological Classification > Plants > Macroalgae
            (Seaweeds)'])

Retrieve global attributes from Geotraces dataset.

Exported source
def get_attrs(tfm, zotero_key, kw=kw):
    "Retrieve global attributes from Geotraces dataset."
    return GlobAttrsFeeder(tfm.dfs, cbs=[
        BboxCB(),
        DepthRangeCB(),
        TimeRangeCB(),
        ZoteroCB(zotero_key, cfg=cfg()),
        KeyValuePairCB('keywords', ', '.join(kw)),
        KeyValuePairCB('publisher_postprocess_logs', ', '.join(tfm.logs))
        ])()
zotero_metadata = get_attrs(tfm, zotero_key=zotero_key, kw=kw)
print('Keys: ', zotero_metadata.keys())
print('Title: ', zotero_metadata['title'])
Keys:  dict_keys(['geospatial_lat_min', 'geospatial_lat_max', 'geospatial_lon_min', 'geospatial_lon_max', 'geospatial_bounds', 'geospatial_vertical_max', 'geospatial_vertical_min', 'time_coverage_start', 'time_coverage_end', 'id', 'title', 'summary', 'creator_name', 'keywords', 'publisher_postprocess_logs'])
Title:  The GEOTRACES Intermediate Data Product 2017

Encoding


source

encode

 encode (fname_in, fname_out, **kwargs)
encode(fname_in, fname_out, verbose=False)
Group BIOTA not found in the dataframes.
Group SEDIMENT not found in the dataframes.

TODO:

  • Add salinity, temperature, oxygen variables
decode(fname_in=fname_out, verbose=True)
Saved SEAWATER to ../../_data/output/190-geotraces-2021_SEAWATER.csv
Saved SUSPENDED_MATTER to ../../_data/output/190-geotraces-2021_SUSPENDED_MATTER.csv