This data pipeline, known as a “handler” in Marisco terminology, is designed to clean, standardize, and encode HELCOM data into NetCDF format. The handler processes raw HELCOM data, applying various transformations and lookups to align it with MARIS data standards.

This handler is a crucial component of the Marisco data processing workflow, ensuring HELCOM data is properly integrated into the MARIS database.

Note: An optional encoder (pipeline) is also provided below to process data into a .csv format compatible with the MARIS master database. This feature is maintained for legacy purposes, as data ingestion was previously performed using OpenRefine.

Tip

For new MARIS users, please refer to Understanding MARIS Data Formats (NetCDF and Open Refine) for detailed information.

The present notebook is an instance of Literate Programming in the sense that it is a narrative interspersing code snippets with explanations. When a function or a class needs to be exported into a dedicated Python module (in our case marisco/handlers/helcom.py), the code cell is flagged with the #| exports directive provided by the wonderful nbdev library.
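For instance, a cell flagged for export looks as follows (a hypothetical cell, shown purely for illustration; the #| exports directive on the first line is what nbdev acts on):

#| exports
def a_minimal_example(df):
    "Hypothetical cell: nbdev copies its content into marisco/handlers/helcom.py."
    return df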

Configuration & file paths

  • fname_in: path to the folder containing the HELCOM data in CSV format. The path can be defined as a relative path.

  • fname_out_nc: path and filename for the NetCDF output. The path can be defined as a relative path.

  • fname_out_csv: path and filename for the Open Refine CSV output. The path can be defined as a relative path.

  • Zotero key: used to retrieve attributes related to the dataset from Zotero. The MARIS datasets include a library available on Zotero.

  • ref_id: the reference id of the dataset as defined by MARIS; it refers to the dataset’s location in the Archive of the Zotero library.

Exported source
fname_in = '../../_data/accdb/mors/csv'
fname_out_nc = '../../_data/output/100-HELCOM-MORS-2024.nc'
fname_out_csv = '../../_data/output/100-HELCOM-MORS-2024.csv'
zotero_key = '26VMZZ2Q' # HELCOM MORS zotero key
ref_id = 100 # HELCOM MORS reference id as defined by MARIS

Load data

HELCOM MORS (Monitoring of Radioactive Substances in the Baltic Sea) data is provided as a Microsoft Access database. On Unix-like OSes, mdbtools can be used to convert the tables of the Microsoft Access database to .csv files.

Example steps:

  1. Download data

  2. Install mdbtools via the VS Code terminal:

    sudo apt-get -y install mdbtools
  3. Install unzip via the VS Code terminal:

    sudo apt-get -y install unzip
  4. In VS Code terminal (for instance), navigate to the marisco data folder:

    cd /home/marisco/downloads/marisco/_data/accdb/mors_19840101_20211231
  5. Unzip MORS_ENVIRONMENT.zip:

    unzip MORS_ENVIRONMENT.zip 
  6. Run preprocess.sh to generate the required data files:

    ./preprocess.sh MORS_ENVIRONMENT.zip
  7. Content of preprocess.sh script:

    #!/bin/bash
    
    # Example of use: ./preprocess.sh MORS_ENVIRONMENT.zip
    unzip "$1"
    dbname=$(ls *.accdb)
    mkdir csv
    for table in $(mdb-tables -1 "$dbname"); do
        echo "Export table $table"
        mdb-export "$dbname" "$table" > "csv/$table.csv"
    done
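The same conversion can also be scripted from Python by shelling out to mdbtools. Below is a minimal sketch (assuming mdbtools is installed and on the PATH; accdb_to_csv is a hypothetical helper, not part of marisco):

import subprocess
from pathlib import Path

def accdb_to_csv(dbname: str, dst: str = 'csv') -> None:
    "Export every table of an Access database to CSV by calling mdbtools."
    Path(dst).mkdir(exist_ok=True)
    tables = subprocess.run(['mdb-tables', '-1', dbname], capture_output=True,
                            text=True, check=True).stdout.splitlines()
    for table in filter(None, tables):
        print(f'Export table {table}')
        csv_text = subprocess.run(['mdb-export', dbname, table], capture_output=True,
                                  text=True, check=True).stdout
        (Path(dst) / f'{table}.csv').write_text(csv_text)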

Once converted to .csv files, the data is ready to be loaded into a dictionary of dataframes.


source

load_data

 load_data (src_dir:str|pathlib.Path, smp_types:list=[('SEA', 'seawater'),
            ('SED', 'sediment'), ('BIO', 'biota')])

Load HELCOM data and return the data in a dictionary of dataframes with the dictionary key as the sample type.

Type Default Details
src_dir str | pathlib.Path The directory where the source CSV files are located
smp_types list [(‘SEA’, ‘seawater’), (‘SED’, ‘sediment’), (‘BIO’, ‘biota’)] A list of tuples, each containing the file prefix and the corresponding sample type name
Returns Dict A dictionary with sample types as keys and their corresponding dataframes as values
Exported source
default_smp_types = [('SEA', 'seawater'), ('SED', 'sediment'), ('BIO', 'biota')]
Exported source
def load_data(src_dir: str|Path, # The directory where the source CSV files are located
              smp_types: list=default_smp_types # A list of tuples, each containing the file prefix and the corresponding sample type name
             ) -> Dict[str, pd.DataFrame]: # A dictionary with sample types as keys and their corresponding dataframes as values
    "Load HELCOM data and return the data in a dictionary of dataframes with the dictionary key as the sample type."
    src_path = Path(src_dir)
    
    def load_and_merge(file_prefix: str) -> pd.DataFrame:
        try:
            df_meas = pd.read_csv(src_path / f'{file_prefix}02.csv')
            df_smp = pd.read_csv(src_path / f'{file_prefix}01.csv')
            return pd.merge(df_meas, df_smp, on='KEY', how='left')
        except FileNotFoundError as e:
            print(f"Error loading files for {file_prefix}: {e}")
            return pd.DataFrame()  # Return an empty DataFrame if files are not found
    
    return {smp_type: load_and_merge(file_prefix) for file_prefix, smp_type in smp_types}

dfs is a dictionary of dataframes created from the HELCOM dataset located at the path fname_in. The data is split into one dataframe per sample type, and each dictionary key is the corresponding sample type name.

#|eval: false
dfs = load_data(fname_in)
print('keys/sample types: ', dfs.keys())

for key in dfs.keys():
    print(f'{key} columns: ', dfs[key].columns)
keys/sample types:  dict_keys(['seawater', 'sediment', 'biota'])
seawater columns:  Index(['KEY', 'NUCLIDE', 'METHOD', '< VALUE_Bq/m³', 'VALUE_Bq/m³', 'ERROR%_m³',
       'DATE_OF_ENTRY_x', 'COUNTRY', 'LABORATORY', 'SEQUENCE', 'DATE', 'YEAR',
       'MONTH', 'DAY', 'STATION', 'LATITUDE (ddmmmm)', 'LATITUDE (dddddd)',
       'LONGITUDE (ddmmmm)', 'LONGITUDE (dddddd)', 'TDEPTH', 'SDEPTH', 'SALIN',
       'TTEMP', 'FILT', 'MORS_SUBBASIN', 'HELCOM_SUBBASIN', 'DATE_OF_ENTRY_y'],
      dtype='object')
sediment columns:  Index(['KEY', 'NUCLIDE', 'METHOD', '< VALUE_Bq/kg', 'VALUE_Bq/kg', 'ERROR%_kg',
       '< VALUE_Bq/m²', 'VALUE_Bq/m²', 'ERROR%_m²', 'DATE_OF_ENTRY_x',
       'COUNTRY', 'LABORATORY', 'SEQUENCE', 'DATE', 'YEAR', 'MONTH', 'DAY',
       'STATION', 'LATITUDE (ddmmmm)', 'LATITUDE (dddddd)',
       'LONGITUDE (ddmmmm)', 'LONGITUDE (dddddd)', 'DEVICE', 'TDEPTH',
       'UPPSLI', 'LOWSLI', 'AREA', 'SEDI', 'OXIC', 'DW%', 'LOI%',
       'MORS_SUBBASIN', 'HELCOM_SUBBASIN', 'SUM_LINK', 'DATE_OF_ENTRY_y'],
      dtype='object')
biota columns:  Index(['KEY', 'NUCLIDE', 'METHOD', '< VALUE_Bq/kg', 'VALUE_Bq/kg', 'BASIS',
       'ERROR%', 'NUMBER', 'DATE_OF_ENTRY_x', 'COUNTRY', 'LABORATORY',
       'SEQUENCE', 'DATE', 'YEAR', 'MONTH', 'DAY', 'STATION',
       'LATITUDE ddmmmm', 'LATITUDE dddddd', 'LONGITUDE ddmmmm',
       'LONGITUDE dddddd', 'SDEPTH', 'RUBIN', 'BIOTATYPE', 'TISSUE', 'NO',
       'LENGTH', 'WEIGHT', 'DW%', 'LOI%', 'MORS_SUBBASIN', 'HELCOM_SUBBASIN',
       'DATE_OF_ENTRY_y'],
      dtype='object')

Add sample type column

The sample types (seawater, biota, sediment, …) defined in configs.ipynb are encoded as group names in the produced NetCDF file. Sample type ids are also added to the individual dataframes by the AddSampleTypeIdColumnCB callback, for legacy purposes (i.e. the Open Refine output).

dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[AddSampleTypeIdColumnCB(),
                            CompareDfsAndTfmCB(dfs)
                            ])

print(tfm()['seawater'][['KEY', 'samptype_id']].head())
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')
            KEY  samptype_id
0  WKRIL2012003            1
1  WKRIL2012004            1
2  WKRIL2012005            1
3  WKRIL2012006            1
4  WKRIL2012007            1
                                                    seawater  sediment  biota
Number of rows in dfs                                  21216     39817  15827
Number of rows in tfm.dfs                              21216     39817  15827
Number of dropped rows                                     0         0      0
Number of rows in tfm.dfs + Number of dropped rows     21216     39817  15827 

Normalize nuclide names

Lower & strip nuclide names

Tip

FEEDBACK TO DATA PROVIDER: Some nuclide names contain one or multiple trailing spaces.

This is demonstrated below for the NUCLIDE column:

df = get_unique_across_dfs(load_data(fname_in), 'NUCLIDE', as_df=True, include_nchars=True)
df['stripped_chars'] = df['value'].str.strip().str.replace(' ', '').str.len()
print(df[df['n_chars'] != df['stripped_chars']])
    index      value  n_chars  stripped_chars
6       6    TC99           7               4
16     16     CS137         6               5
33     33  CS137            9               5
41     41   CS134           8               5
43     43     SR90          6               4
46     46      SR90         5               4
48     48   K40             8               3
49     49   PU238           8               5
64     64   CO60            8               4
65     65   AM241           8               5
66     66   CS137           8               5
83     83   SR90            8               4
86     86    SR90           7               4

To fix this issue, we use the LowerStripNameCB callback. For each dataframe in the dictionary of dataframes, it normalizes the nuclide name by converting it to lowercase and stripping any leading or trailing whitespace(s) (e.g. 'CS137 ' becomes 'cs137').

dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[LowerStripNameCB(col_src='NUCLIDE')])

for key in tfm().keys():
    print(f'{key} nuclides: ')
    print(tfm()[key]['NUCLIDE'].unique())
seawater nuclides: 
['cs137' 'sr90' 'h3' 'cs134' 'pu238' 'pu239240' 'am241' 'cm242' 'cm244'
 'tc99' 'k40' 'ru103' 'sr89' 'sb125' 'nb95' 'ru106' 'zr95' 'ag110m'
 'cm243244' 'ba140' 'ce144' 'u234' 'u238' 'co60' 'pu239' 'pb210' 'po210'
 'np237' 'pu240' 'mn54']
sediment nuclides: 
['ra226' 'cs137' 'ra228' 'k40' 'sr90' 'cs134137' 'cs134' 'pu239240'
 'pu238' 'co60' 'ru103' 'ru106' 'sb125' 'ag110m' 'ce144' 'am241' 'be7'
 'th228' 'pb210' 'co58' 'mn54' 'zr95' 'ba140' 'po210' 'ra224' 'nb95'
 'pu238240' 'pu241' 'pu239' 'eu155' 'ir192' 'th232' 'cd109' 'sb124' 'zn65'
 'th234' 'tl208' 'pb212' 'pb214' 'bi214' 'ac228' 'ra223' 'u235' 'bi212']
biota nuclides: 
['cs134' 'k40' 'co60' 'cs137' 'sr90' 'ag108m' 'mn54' 'co58' 'ag110m'
 'zn65' 'sb125' 'pu239240' 'ru106' 'be7' 'ce144' 'pb210' 'po210' 'sb124'
 'sr89' 'zr95' 'te129m' 'ru103' 'nb95' 'ce141' 'la140' 'i131' 'ba140'
 'pu238' 'u235' 'bi214' 'pb214' 'pb212' 'tl208' 'ac228' 'ra223' 'eu155'
 'ra226' 'gd153' 'sn113' 'fe59' 'tc99' 'co57' 'sn117m' 'eu152' 'sc46'
 'rb86' 'ra224' 'th232' 'cs134137' 'am241' 'ra228' 'th228' 'k-40' 'cs138'
 'cs139' 'cs140' 'cs141' 'cs142' 'cs143' 'cs144' 'cs145' 'cs146']

Remap nuclide names to MARIS data formats

Below we map the nuclide names used by HELCOM to the MARIS standard nuclide names.

Remapping data provider nomenclatures into MARIS standards is one recurrent operation and is done in a semi-automated manner according to the following pattern:

  1. Inspect the data provider nomenclature;
  2. Match it automatically against the MARIS nomenclature (using a fuzzy matching algorithm);
  3. Fix potential mismatches;
  4. Apply the lookup table to the dataframe.

From now on, we will use this pattern to remap the HELCOM data provider nomenclatures into MARIS standards and, for the sake of brevity, refer to it as IMFA (Inspect, Match, Fix, Apply).

Let’s first retrieve the unique values of the data provider nuclide names. get_unique_across_dfs is a utility function that retrieves the unique values of a specific column across all dataframes (remember that we have one dataframe per sample type: seawater, sediment, biota).

dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[LowerStripNameCB(col_src='NUCLIDE')])
dfs_output = tfm()

get_unique_across_dfs(dfs_output, col_name='NUCLIDE', as_df=True).head(5)
index value
0 0 sb125
1 1 ce141
2 2 gd153
3 3 ra226
4 4 ra228

Let’s now create an instance of the fuzzy-matching Remapper:

remapper = Remapper(provider_lut_df=get_unique_across_dfs(dfs_output, col_name='NUCLIDE', as_df=True),
                    maris_lut_fn=nuc_lut_path,
                    maris_col_id='nuclide_id',
                    maris_col_name='nc_name',
                    provider_col_to_match='value',
                    provider_col_key='value',
                    fname_cache='nuclides_helcom.pkl')

And try to match HELCOM to MARIS nuclide names as automatically as possible. The match_score column (the lower the score, the better the match) allows us to assess the results:

remapper.generate_lookup_table(as_df=True)
remapper.select_match(match_score_threshold=1)
Processing: 100%|██████████| 77/77 [00:01<00:00, 39.99it/s]
matched_maris_name source_name match_score
source_key
cm243244 cm244 cm243244 3
cs134137 cs137 cs134137 3
pu238240 pu240 pu238240 3
pu239240 pu240 pu239240 3
cs143 cs127 cs143 2
cs145 cs136 cs145 2
cs142 ce144 cs142 2
cs140 ce140 cs140 1
k-40 k40 k-40 1
cs144 ce144 cs144 1
cs141 ce141 cs141 1
cs138 cs137 cs138 1
cs139 ce139 cs139 1
cs146 cs136 cs146 1

We then manually inspect the remaining unmatched names and create a fixes table to map them to the correct MARIS standards:

Exported source
fixes_nuclide_names = {
    'cs134137': 'cs134_137_tot',
    'cm243244': 'cm243_244_tot',
    'pu239240': 'pu239_240_tot',
    'pu238240': 'pu238_240_tot',
    'cs143': 'cs137',
    'cs145': 'cs137',
    'cs142': 'cs137',
    'cs141': 'cs137',
    'cs144': 'cs137',
    'k-40': 'k40',
    'cs140': 'cs137',
    'cs146': 'cs137',
    'cs139': 'cs137',
    'cs138': 'cs137'
    }

Let’s try to match again but this time we use the fixes_nuclide_names to map the nuclide names to the MARIS standards:

remapper.generate_lookup_table(as_df=True, fixes=fixes_nuclide_names)
fc.test_eq(len(remapper.select_match(match_score_threshold=1)), 0)
Processing: 100%|██████████| 77/77 [00:01<00:00, 51.81it/s]

Test passes! We can now create a callback RemapNuclideNameCB to remap the nuclide names. Note that we pass overwrite=False to generate_lookup_table so that the cached version of the lookup table is used.


source

RemapNuclideNameCB

 RemapNuclideNameCB (fn_lut:Callable)

Remap data provider nuclide names to MARIS nuclide names.

Type Details
fn_lut Callable Function that returns the lookup table dictionary
Exported source
# Create a lookup table for nuclide names
lut_nuclides = lambda df: Remapper(provider_lut_df=df,
                                   maris_lut_fn=nuc_lut_path,
                                   maris_col_id='nuclide_id',
                                   maris_col_name='nc_name',
                                   provider_col_to_match='value',
                                   provider_col_key='value',
                                   fname_cache='nuclides_helcom.pkl').generate_lookup_table(fixes=fixes_nuclide_names, 
                                                                                            as_df=False, overwrite=False)
Exported source
class RemapNuclideNameCB(Callback):
    "Remap data provider nuclide names to MARIS nuclide names."
    def __init__(self, 
                 fn_lut: Callable # Function that returns the lookup table dictionary
                ):
        fc.store_attr()

    def __call__(self, tfm: Transformer):
        df_uniques = get_unique_across_dfs(tfm.dfs, col_name='NUCLIDE', as_df=True)
        lut = {k: v.matched_maris_name for k, v in self.fn_lut(df_uniques).items()}    
        for k in tfm.dfs.keys():
            tfm.dfs[k]['NUCLIDE'] = tfm.dfs[k]['NUCLIDE'].replace(lut)

Let’s see it in action, along with the LowerStripNameCB callback:

dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[LowerStripNameCB(col_src='NUCLIDE'),
                            RemapNuclideNameCB(lut_nuclides)
                            ])
dfs_out = tfm()

# For instance
dfs_out['biota'].NUCLIDE.unique()
array(['cs134', 'k40', 'co60', 'cs137', 'sr90', 'ag108m', 'mn54', 'co58',
       'ag110m', 'zn65', 'sb125', 'pu239_240_tot', 'ru106', 'be7',
       'ce144', 'pb210', 'po210', 'sb124', 'sr89', 'zr95', 'te129m',
       'ru103', 'nb95', 'ce141', 'la140', 'i131', 'ba140', 'pu238',
       'u235', 'bi214', 'pb214', 'pb212', 'tl208', 'ac228', 'ra223',
       'eu155', 'ra226', 'gd153', 'sn113', 'fe59', 'tc99', 'co57',
       'sn117m', 'eu152', 'sc46', 'rb86', 'ra224', 'th232',
       'cs134_137_tot', 'am241', 'ra228', 'th228'], dtype=object)

Add Nuclide Id column

The nuclide_id column is added to the dataframe for legacy reasons (again Open Refine output).

dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[LowerStripNameCB(col_src='NUCLIDE'),
                            RemapNuclideNameCB(lut_nuclides),
                            AddNuclideIdColumnCB(col_value='NUCLIDE')
                            ])
dfs_out = tfm()

# For instance
dfs_out['biota'][['NUCLIDE', 'nuclide_id']]
NUCLIDE nuclide_id
0 cs134 31
1 k40 4
2 co60 9
3 cs137 33
4 cs134 31
... ... ...
15822 k40 4
15823 cs137 33
15824 be7 2
15825 k40 4
15826 cs137 33

15827 rows × 2 columns

Standardize Time

Tip

FEEDBACK TO DATA PROVIDER: Time/date is provided in the DATE, YEAR, MONTH, DAY columns. Note that the DATE column contains missing values, as indicated below. When DATE is missing, we fall back on the YEAR, MONTH, DAY columns. Note also that DAY and MONTH sometimes contain 0; in that case we systematically set them to 1.

dfs = load_data(fname_in)
for key in dfs.keys():
    print(f'{key} DATE null values: ', dfs[key]['DATE'].isna().sum())
seawater DATE null values:  502
sediment DATE null values:  741
biota DATE null values:  72

source

ParseTimeCB

 ParseTimeCB ()

Parse and standardize time information in the dataframe.

Exported source
class ParseTimeCB(Callback):
    "Parse and standardize time information in the dataframe."
    def __call__(self, tfm: Transformer):
        for df in tfm.dfs.values():
            self._process_dates(df)
            self._define_beg_period(df)

    def _process_dates(self, df: pd.DataFrame) -> None:
        "Process and correct date and time information in the DataFrame."
        df['time'] = self._parse_date(df)
        self._handle_missing_dates(df)
        self._fill_missing_time(df)

    def _parse_date(self, df: pd.DataFrame) -> pd.Series:
        "Parse the DATE column if present."
        return pd.to_datetime(df['DATE'], format='%m/%d/%y %H:%M:%S', errors='coerce')

    def _handle_missing_dates(self, df: pd.DataFrame):
        "Handle cases where DAY or MONTH is 0 or missing."
        df.loc[df["DAY"] == 0, "DAY"] = 1
        df.loc[df["MONTH"] == 0, "MONTH"] = 1
        
        missing_day_month = (df["DAY"].isna()) & (df["MONTH"].isna()) & (df["YEAR"].notna())
        df.loc[missing_day_month, ["DAY", "MONTH"]] = 1

    def _fill_missing_time(self, df: pd.DataFrame) -> None:
        "Fill missing time values using YEAR, MONTH, and DAY columns."
        missing_time = df['time'].isna()
        df.loc[missing_time, 'time'] = pd.to_datetime(
            df.loc[missing_time, ['YEAR', 'MONTH', 'DAY']], 
            format='%Y%m%d', 
            errors='coerce'
        )

    def _define_beg_period(self, df: pd.DataFrame) -> None:
        "Create a standardized date representation for Open Refine."
        df['begperiod'] = df['time']

Apply the transformer with the ParseTimeCB callback. Then print the begperiod and time data for seawater.

dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[ParseTimeCB(),
                            CompareDfsAndTfmCB(dfs)
                            ])
tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')
print(tfm.dfs['seawater'][['begperiod','time']])
                                                    seawater  sediment  biota
Number of rows in dfs                                  21216     39817  15827
Number of rows in tfm.dfs                              21216     39817  15827
Number of dropped rows                                     0         0      0
Number of rows in tfm.dfs + Number of dropped rows     21216     39817  15827 

       begperiod       time
0     2012-05-23 2012-05-23
1     2012-05-23 2012-05-23
2     2012-06-17 2012-06-17
3     2012-05-24 2012-05-24
4     2012-05-24 2012-05-24
...          ...        ...
21211 2021-10-15 2021-10-15
21212 2021-11-04 2021-11-04
21213 2021-10-15 2021-10-15
21214 2021-05-17 2021-05-17
21215 2021-05-13 2021-05-13

[21216 rows x 2 columns]

NetCDF time format requires the time to be encoded as the number of seconds since a time of origin. In our case the time of origin is 1970-01-01, as indicated in the configs.ipynb CONFIGS['units']['time'] dictionary.
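As a quick sanity check of this encoding (a standalone sketch, independent of marisco; the result matches the first seawater row shown further below):

import pandas as pd

# 2012-05-23 00:00:00 UTC expressed as seconds since 1970-01-01
t = pd.Timestamp('2012-05-23', tz='UTC')
print(int(t.timestamp()))  # 1337731200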

EncodeTimeCB converts the HELCOM time format to the MARIS NetCDF time format.

dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[ParseTimeCB(),
                            EncodeTimeCB(cfg(), verbose=True),
                            CompareDfsAndTfmCB(dfs)
                            ])
tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')
8 of 21216 entries for `time` are invalid for seawater.
1 of 39817 entries for `time` are invalid for sediment.
                                                    seawater  sediment  biota
Number of rows in dfs                                  21216     39817  15827
Number of rows in tfm.dfs                              21208     39816  15827
Number of dropped rows                                     8         1      0
Number of rows in tfm.dfs + Number of dropped rows     21216     39817  15827 
tfm.dfs['seawater']
KEY NUCLIDE METHOD < VALUE_Bq/m³ VALUE_Bq/m³ ERROR%_m³ DATE_OF_ENTRY_x COUNTRY LABORATORY SEQUENCE ... TDEPTH SDEPTH SALIN TTEMP FILT MORS_SUBBASIN HELCOM_SUBBASIN DATE_OF_ENTRY_y time begperiod
0 WKRIL2012003 CS137 NaN NaN 5.3 32.000000 08/20/14 00:00:00 90.0 KRIL 2012003.0 ... NaN 0.0 NaN NaN NaN 11.0 11.0 08/20/14 00:00:00 1337731200 2012-05-23
1 WKRIL2012004 CS137 NaN NaN 19.9 20.000000 08/20/14 00:00:00 90.0 KRIL 2012004.0 ... NaN 29.0 NaN NaN NaN 11.0 11.0 08/20/14 00:00:00 1337731200 2012-05-23
2 WKRIL2012005 CS137 NaN NaN 25.5 20.000000 08/20/14 00:00:00 90.0 KRIL 2012005.0 ... NaN 0.0 NaN NaN NaN 11.0 3.0 08/20/14 00:00:00 1339891200 2012-06-17
3 WKRIL2012006 CS137 NaN NaN 17.0 29.000000 08/20/14 00:00:00 90.0 KRIL 2012006.0 ... NaN 0.0 NaN NaN NaN 11.0 11.0 08/20/14 00:00:00 1337817600 2012-05-24
4 WKRIL2012007 CS137 NaN NaN 22.2 18.000000 08/20/14 00:00:00 90.0 KRIL 2012007.0 ... NaN 39.0 NaN NaN NaN 11.0 11.0 08/20/14 00:00:00 1337817600 2012-05-24
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
21211 WSSSM2021005 H3 SSM45 NaN 1030.0 93.203883 09/06/22 00:00:00 77.0 SSSM 202105.0 ... NaN 1.0 NaN NaN N 1.0 8.0 09/06/22 00:00:00 1634256000 2021-10-15
21212 WSSSM2021006 H3 SSM45 NaN 2240.0 43.303571 09/06/22 00:00:00 77.0 SSSM 202106.0 ... NaN 1.0 NaN NaN N 10.0 10.0 09/06/22 00:00:00 1635984000 2021-11-04
21213 WSSSM2021007 H3 SSM45 NaN 2060.0 47.087379 09/06/22 00:00:00 77.0 SSSM 202107.0 ... NaN 1.0 NaN NaN N 12.0 12.0 09/06/22 00:00:00 1634256000 2021-10-15
21214 WSSSM2021008 H3 SSM45 NaN 2300.0 43.478261 09/06/22 00:00:00 77.0 SSSM 202108.0 ... NaN 1.0 NaN NaN N 12.0 12.0 09/06/22 00:00:00 1621209600 2021-05-17
21215 WSSSM2021004 H3 SSM45 < NaN NaN 09/06/22 00:00:00 77.0 SSSM 202104.0 ... NaN 1.0 NaN NaN N 15.0 18.0 09/06/22 00:00:00 1620864000 2021-05-13

21208 rows × 29 columns

Sanitize value

We consolidate each column containing measurement values (named differently across sample types, since the units are part of the column names) into a single value column, and remove rows with missing values where needed.


source

SanitizeValue

 SanitizeValue (coi:Dict[str,Dict[str,str]])

Sanitize value/measurement by removing blank entries and populating value column.

Type Details
coi Dict Columns of interest. Format: {group_name: {‘val’: ‘column_name’}}
Exported source
coi_val = {'seawater' : {'val': 'VALUE_Bq/m³'},
           'biota':  {'val': 'VALUE_Bq/kg'},
           'sediment': {'val': 'VALUE_Bq/kg'}}
Exported source
class SanitizeValue(Callback):
    "Sanitize value/measurement by removing blank entries and populating `value` column."
    def __init__(self, 
                 coi: Dict[str, Dict[str, str]] # Columns of interest. Format: {group_name: {'val': 'column_name'}}
                 ): 
        fc.store_attr()

    def __call__(self, tfm: Transformer):
        for grp, df in tfm.dfs.items():
            value_col = self.coi[grp]['val']
            df.dropna(subset=[value_col], inplace=True)
            df['value'] = df[value_col]
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[SanitizeValue(coi_val),
                            CompareDfsAndTfmCB(dfs)
                            ])

tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')
                                                    seawater  sediment  biota
Number of rows in dfs                                  21216     39817  15827
Number of rows in tfm.dfs                              21122     39532  15798
Number of dropped rows                                    94       285     29
Number of rows in tfm.dfs + Number of dropped rows     21216     39817  15827 

Normalize uncertainty

Function unc_rel2stan converts uncertainty from relative uncertainty to standard uncertainty.


source

unc_rel2stan

 unc_rel2stan (df:pandas.core.frame.DataFrame, meas_col:str, unc_col:str)

Convert relative uncertainty to absolute uncertainty.

Type Details
df DataFrame DataFrame containing measurement and uncertainty columns
meas_col str Name of the column with measurement values
unc_col str Name of the column with relative uncertainty values (percentages)
Returns Series Series with calculated absolute uncertainties
Exported source
def unc_rel2stan(
    df: pd.DataFrame, # DataFrame containing measurement and uncertainty columns
    meas_col: str, # Name of the column with measurement values
    unc_col: str # Name of the column with relative uncertainty values (percentages)
) -> pd.Series: # Series with calculated absolute uncertainties
    "Convert relative uncertainty to absolute uncertainty."
    return df.apply(lambda row: row[unc_col] * row[meas_col] / 100, axis=1)
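
For instance, a 32% relative error on a measurement of 5.3 Bq/m³ yields an absolute uncertainty of 5.3 × 32 / 100 ≈ 1.70 Bq/m³ (a toy dataframe for illustration):

import pandas as pd

# Toy illustration: 5.3 Bq/m³ measured with a 32% relative error
df_demo = pd.DataFrame({'VALUE_Bq/m³': [5.3], 'ERROR%_m³': [32.0]})
print(unc_rel2stan(df_demo, 'VALUE_Bq/m³', 'ERROR%_m³'))  # 0    1.696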

For each sample type in the HELCOM dataset, the uncertainty is given as a relative uncertainty. The column names for both the value and the uncertainty vary by sample type. The coi_units_unc list defines, for each sample type, the names of the value and uncertainty columns.

Exported source
# Columns of interest
coi_units_unc = [('seawater', 'VALUE_Bq/m³', 'ERROR%_m³'),
                 ('biota', 'VALUE_Bq/kg', 'ERROR%'),
                 ('sediment', 'VALUE_Bq/kg', 'ERROR%_kg')]

NormalizeUncCB callback normalizes the uncertainty by converting from relative uncertainty to standard uncertainty.


source

NormalizeUncCB

 NormalizeUncCB (fn_convert_unc:Callable=<function unc_rel2stan>,
                 coi:List[Tuple[str,str,str]]=[('seawater', 'VALUE_Bq/m³',
                 'ERROR%_m³'), ('biota', 'VALUE_Bq/kg', 'ERROR%'),
                 ('sediment', 'VALUE_Bq/kg', 'ERROR%_kg')])

Convert from relative error % to uncertainty of activity unit.

Type Default Details
fn_convert_unc Callable unc_rel2stan Function converting relative uncertainty to absolute uncertainty
coi List [(‘seawater’, ‘VALUE_Bq/m³’, ’ERROR%_m³’), (‘biota’, ‘VALUE_Bq/kg’, ‘ERROR%’), (‘sediment’, ‘VALUE_Bq/kg’, ’ERROR%_kg’)] List of columns of interest
Exported source
class NormalizeUncCB(Callback):
    "Convert from relative error % to uncertainty of activity unit."
    def __init__(self, 
                 fn_convert_unc: Callable=unc_rel2stan, # Function converting relative uncertainty to absolute uncertainty
                 coi: List[Tuple[str, str, str]]=coi_units_unc # List of columns of interest
                ):
        fc.store_attr()
    
    def __call__(self, tfm: Transformer):
        for grp, val, unc in self.coi:
            if grp in tfm.dfs:
                df = tfm.dfs[grp]
                df['uncertainty'] = self.fn_convert_unc(df, val, unc)

Apply the transformer with the NormalizeUncCB callback. Then print the value (i.e. activity per unit) and standard uncertainty for each sample type.

dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[NormalizeUncCB(),
                            SanitizeValue(coi_val)])

print(tfm()['seawater'][['value', 'uncertainty']][:5])
print(tfm()['biota'][['value', 'uncertainty']][:5])
print(tfm()['sediment'][['value', 'uncertainty']][:5])
   value  uncertainty
0    5.3        1.696
1   19.9        3.980
2   25.5        5.100
3   17.0        4.930
4   22.2        3.996
        value  uncertainty
0    0.010140          NaN
1  135.300000     4.830210
2    0.013980          NaN
3    4.338000     0.150962
4    0.009614          NaN
   value  uncertainty
0   35.0         9.10
1   36.0         7.92
2   38.0         9.12
3   36.0         9.00
4   30.0         6.90

Remap Biota species

In the next processing steps, we follow the same approach as for the remapping of nuclide names above.

Let’s inspect the RUBIN_NAME.csv file provided by HELCOM describing the biota species nomenclature.

pd.read_csv(Path(fname_in) / 'RUBIN_NAME.csv').head()
RUBIN_ID RUBIN SCIENTIFIC NAME ENGLISH NAME
0 11 ABRA BRA ABRAMIS BRAMA BREAM
1 12 ANGU ANG ANGUILLA ANGUILLA EEL
2 13 ARCT ISL ARCTICA ISLANDICA ISLAND CYPRINE
3 14 ASTE RUB ASTERIAS RUBENS COMMON STARFISH
4 15 CARD EDU CARDIUM EDULE COCKLE

We try to remap the SCIENTIFIC NAME column to the species column of the MARIS nomenclature, again using a Remapper object:

remapper = Remapper(provider_lut_df=pd.read_csv(Path(fname_in) / 'RUBIN_NAME.csv'),
                    maris_lut_fn=species_lut_path,
                    maris_col_id='species_id',
                    maris_col_name='species',
                    provider_col_to_match='SCIENTIFIC NAME',
                    provider_col_key='RUBIN',
                    fname_cache='species_helcom.pkl'
                    )

remapper.generate_lookup_table(as_df=True)
remapper.select_match(match_score_threshold=1)
Processing: 100%|██████████| 46/46 [00:06<00:00,  6.81it/s]
matched_maris_name source_name match_score
source_key
STIZ LUC Sander lucioperca STIZOSTEDION LUCIOPERCA 10
LAMI SAC Laminaria japonica LAMINARIA SACCHARINA 7
CARD EDU Cardiidae CARDIUM EDULE 6
ENCH CIM Echinodermata ENCHINODERMATA CIM 5
PSET MAX Pinctada maxima PSETTA MAXIMA 5
MACO BAL Macoma balthica MACOMA BALTICA 1
STUC PEC Stuckenia pectinata STUCKENIA PECTINATE 1

We fix below some of the entries that are not properly matched by the Remapper object:

Exported source
fixes_biota_species = {
    'CARDIUM EDULE': 'Cerastoderma edule',
    'LAMINARIA SACCHARINA': 'Saccharina latissima',
    'PSETTA MAXIMA': 'Scophthalmus maximus',
    'STIZOSTEDION LUCIOPERCA': 'Sander luciopercas'}

And give it another try:

remapper.generate_lookup_table(fixes=fixes_biota_species)
remapper.select_match(match_score_threshold=1)
Processing: 100%|██████████| 46/46 [00:06<00:00,  6.78it/s]
matched_maris_name source_name match_score
source_key
ENCH CIM Echinodermata ENCHINODERMATA CIM 5
MACO BAL Macoma balthica MACOMA BALTICA 1
STIZ LUC Sander lucioperca STIZOSTEDION LUCIOPERCA 1
STUC PEC Stuckenia pectinata STUCKENIA PECTINATE 1

Visual inspection of the remaining imperfectly matched entries suggests they are acceptable, so we proceed.

We can now use the generic RemapCB callback to perform the remapping of the RUBIN column to the species column after having defined the lookup table lut_biota.

Exported source
lut_biota = lambda: Remapper(provider_lut_df=pd.read_csv(Path(fname_in) / 'RUBIN_NAME.csv'),
                             maris_lut_fn=species_lut_path,
                             maris_col_id='species_id',
                             maris_col_name='species',
                             provider_col_to_match='SCIENTIFIC NAME',
                             provider_col_key='RUBIN',
                             fname_cache='species_helcom.pkl'
                             ).generate_lookup_table(fixes=fixes_biota_species, as_df=False, overwrite=False)
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
    RemapCB(fn_lut=lut_biota, col_remap='species', col_src='RUBIN', dest_grps='biota')
    ])

# For instance:
print(tfm()['biota']['species'].unique())
[  99  243   50  139  270  192  191  284   84  269  122   96  287  279
  278  288  286  244  129  275  271  285  283  247  120   59  280  274
  273  290  289  272  277  276   21  282  110  281  245  704 1524  703
 1611  621   60]

Remap Biota tissues

Let’s inspect the TISSUE.csv file provided by HELCOM describing the tissue nomenclature. Biota tissue is known as body part in the MARIS data set.

pd.read_csv('../../_data/accdb/mors/csv/TISSUE.csv').head()
TISSUE TISSUE_DESCRIPTION
0 1 WHOLE FISH
1 2 WHOLE FISH WITHOUT ENTRAILS
2 3 WHOLE FISH WITHOUT HEAD AND ENTRAILS
3 4 FLESH WITH BONES
4 5 FLESH WITHOUT BONES (FILETS)
remapper = Remapper(provider_lut_df=pd.read_csv('../../_data/accdb/mors/csv/TISSUE.csv'),
                    maris_lut_fn=bodyparts_lut_path,
                    maris_col_id='bodypar_id',
                    maris_col_name='bodypar',
                    provider_col_to_match='TISSUE_DESCRIPTION',
                    provider_col_key='TISSUE',
                    fname_cache='tissues_helcom.pkl'
                    )

remapper.generate_lookup_table(as_df=True)
remapper.select_match(match_score_threshold=1)
Processing: 100%|██████████| 29/29 [00:00<00:00, 137.28it/s]
matched_maris_name source_name match_score
source_key
3 Flesh without bones WHOLE FISH WITHOUT HEAD AND ENTRAILS 20
2 Flesh without bones WHOLE FISH WITHOUT ENTRAILS 13
8 Soft parts SKIN/EPIDERMIS 10
5 Flesh without bones FLESH WITHOUT BONES (FILETS) 9
1 Whole animal WHOLE FISH 5
12 Brain ENTRAILS 5
15 Stomach and intestine STOMACH + INTESTINE 3
41 Whole animal WHOLE ANIMALS 1

We fix below some of the entries that are not properly matched by the Remapper object:

Exported source
fixes_biota_tissues = {
    'WHOLE FISH WITHOUT HEAD AND ENTRAILS': 'Whole animal eviscerated without head',
    'ENTRAILS': 'Viscera',
    'SKIN/EPIDERMIS': 'Skin'}
remapper.generate_lookup_table(as_df=True, fixes=fixes_biota_tissues)
remapper.select_match(match_score_threshold=1)
Processing: 100%|██████████| 29/29 [00:00<00:00, 137.01it/s]
matched_maris_name source_name match_score
source_key
2 Flesh without bones WHOLE FISH WITHOUT ENTRAILS 13
5 Flesh without bones FLESH WITHOUT BONES (FILETS) 9
1 Whole animal WHOLE FISH 5
15 Stomach and intestine STOMACH + INTESTINE 3
41 Whole animal WHOLE ANIMALS 1

Visual inspection of the remaining imperfectly matched entries suggests they are acceptable, so we proceed.

We can now use the generic RemapCB callback to perform the remapping of the TISSUE column to the body_part column after having defined the lookup table lut_tissues.

Exported source
lut_tissues = lambda: Remapper(provider_lut_df=pd.read_csv('../../_data/accdb/mors/csv/TISSUE.csv'),
                               maris_lut_fn=bodyparts_lut_path,
                               maris_col_id='bodypar_id',
                               maris_col_name='bodypar',
                               provider_col_to_match='TISSUE_DESCRIPTION',
                               provider_col_key='TISSUE',
                               fname_cache='tissues_helcom.pkl'
                               ).generate_lookup_table(fixes=fixes_biota_tissues, as_df=False, overwrite=False)
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
    RemapCB(fn_lut=lut_biota, col_remap='species', col_src='RUBIN', dest_grps='biota'),
    RemapCB(lut_tissues, 'body_part', 'TISSUE', 'biota')
    ])

print(tfm()['biota'][['TISSUE', 'body_part']][:5])
   TISSUE  body_part
0       5         52
1       5         52
2       5         52
3       5         52
4       5         52

Remap biogroup

lut_biogroup uses get_lut to read the file at species_lut_path() and, from its contents, creates a dictionary linking species_id to biogroup_id.

Exported source
lut_biogroup = lambda: get_lut(species_lut_path().parent, species_lut_path().name, 
                               key='species_id', value='biogroup_id')
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
    RemapCB(fn_lut=lut_biota, col_remap='species', col_src='RUBIN', dest_grps='biota'),
    RemapCB(lut_tissues, 'body_part', 'TISSUE', 'biota'),
    RemapCB(lut_biogroup, 'bio_group', 'species', 'biota')
    ])

print(tfm()['biota']['bio_group'].unique())
[ 4  2 14 11  8  3]

Remap Taxon Information

Currently, the details (Taxonname, TaxonRepName, Taxonrank) are used for importing into the MARIS master database, but they are not included in the NetCDF encoding.

We first need to retrieve the taxon information from the dbo_species.xlsx file.


source

get_taxon_info_lut

 get_taxon_info_lut (maris_lut:str)

Retrieve a lookup table for Taxonname from a MARIS lookup table.

Type Details
maris_lut str Path to the MARIS lookup table (Excel file)
Returns dict A dictionary mapping species_id to taxon information (Taxonname, Taxonrank, TaxonDB, TaxonDBID, TaxonDBURL)
Exported source
# TODO: Include Commonname field after next MARIS data reconciling process.
def get_taxon_info_lut(
    maris_lut:str # Path to the MARIS lookup table (Excel file)
) -> dict: # A dictionary mapping species_id to taxon information
    "Retrieve a lookup table for Taxonname from a MARIS lookup table."
    species = pd.read_excel(maris_lut)
    return species[['species_id', 'Taxonname', 'Taxonrank','TaxonDB','TaxonDBID','TaxonDBURL']].set_index('species_id').to_dict()

lut_taxon = lambda: get_taxon_info_lut(species_lut_path())

source

RemapTaxonInformationCB

 RemapTaxonInformationCB (fn_lut:Callable)

Update taxon information based on MARIS species LUT.

Exported source
class RemapTaxonInformationCB(Callback):
    "Update taxon information based on MARIS species LUT."
    def __init__(self, fn_lut: Callable):
        self.fn_lut = fn_lut

    def __call__(self, tfm: Transformer):
        lut = self.fn_lut()
        df = tfm.dfs['biota']
        
        df['TaxonRepName'] = df.get('RUBIN', 'Unknown')
        
        taxon_columns = ['Taxonname', 'Taxonrank', 'TaxonDB', 'TaxonDBID', 'TaxonDBURL']
        for col in taxon_columns:
            df[col] = df['species'].map(lut[col]).fillna('Unknown')
        
        unmatched = df[df['Taxonname'] == 'Unknown']['species'].unique()
        if len(unmatched) > 0:
            print(f"Unmatched species IDs: {', '.join(unmatched)}")
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[ 
                            RemapCB(fn_lut=lut_biota, col_remap='species', col_src='RUBIN', dest_grps='biota'),
                            RemapCB(lut_tissues, 'body_part', 'TISSUE', 'biota'),
                            RemapCB(lut_biogroup, 'bio_group', 'species', 'biota'),
                            RemapTaxonInformationCB(lut_taxon)
                            ])
tfm()
print(tfm.dfs['biota'][['TaxonRepName', 'Taxonname', 'Taxonrank',
                        'TaxonDB','TaxonDBID','TaxonDBURL']].drop_duplicates().head())
   TaxonRepName             Taxonname Taxonrank   TaxonDB TaxonDBID  \
0      GADU MOR          Gadus morhua   species  Wikidata   Q199788   
40     SPRA SPR     Sprattus sprattus   species  Wikidata   Q506823   
44     CLUP HAR       Clupea harengus   species  Wikidata  Q2396858   
77     MERL MNG  Merlangius merlangus   species  Wikidata   Q273083   
78     LIMA LIM       Limanda limanda   species  Wikidata  Q1135526   

                                TaxonDBURL  
0    https://www.wikidata.org/wiki/Q199788  
40   https://www.wikidata.org/wiki/Q506823  
44  https://www.wikidata.org/wiki/Q2396858  
77   https://www.wikidata.org/wiki/Q273083  
78  https://www.wikidata.org/wiki/Q1135526  

Remap Sediment types

We use again the same IMFA (Inspect, Match, Fix, Apply) pattern to remap the HELCOM sediment types.

Let’s inspect the SEDIMENT_TYPE.csv file provided by HELCOM describing the sediment type nomenclature:

pd.read_csv(Path(fname_in) / 'SEDIMENT_TYPE.csv').head()
SEDI SEDIMENT TYPE RECOMMENDED TO BE USED
0 -99 NO DATA NaN
1 0 GRAVEL YES
2 1 SAND YES
3 2 FINE SAND NO
4 3 SILT YES
Tip

FEEDBACK TO DATA PROVIDER: The SEDI values 56 and 73 are not found in the SEDIMENT_TYPE.csv lookup table provided. Note also that the SEDI column of the sediment data contains many NaN values.

We reassign all of these to -99 ('NO DATA') for now, but they should be clarified/fixed by the data provider. This is demonstrated below.

df_sed_lut = pd.read_csv(Path(fname_in) / 'SEDIMENT_TYPE.csv')
dfs = load_data(fname_in)

sediment_sedi = set(dfs['sediment'].SEDI.unique())
lookup_sedi = set(df_sed_lut['SEDI'])
missing = sediment_sedi - lookup_sedi
print(f"Missing SEDI values: {missing if missing else 'None'}")
Missing SEDI values: {56.0, 73.0, nan}

Let’s try to match as many as possible:

remapper = Remapper(provider_lut_df=pd.read_csv(Path(fname_in)/'SEDIMENT_TYPE.csv'),
                    maris_lut_fn=sediments_lut_path,
                    maris_col_id='sedtype_id',
                    maris_col_name='sedtype',
                    provider_col_to_match='SEDIMENT TYPE',
                    provider_col_key='SEDI',
                    fname_cache='sediments_helcom.pkl'
                    )

remapper.generate_lookup_table(as_df=True)
remapper.select_match(match_score_threshold=1)
Processing: 100%|██████████| 47/47 [00:00<00:00, 139.47it/s]
matched_maris_name source_name match_score
source_key
-99 Soft NO DATA 5
50 Mud and gravel MUD AND GARVEL 2
46 Glacial clay CLACIAL CLAY 1
Exported source
fixes_sediments = {
    'NO DATA': '(Not available)'
}
remapper.generate_lookup_table(as_df=True, fixes=fixes_sediments)
remapper.select_match(match_score_threshold=1)
Processing: 100%|██████████| 47/47 [00:00<00:00, 140.51it/s]
matched_maris_name source_name match_score
source_key
50 Mud and gravel MUD AND GARVEL 2
46 Glacial clay CLACIAL CLAY 1

source

RemapSedimentCB

 RemapSedimentCB (fn_lut:Callable)

Update sediment id based on MARIS sediment LUT (dbo_sedtype.xlsx).

Type Details
fn_lut Callable Function that returns the lookup table dictionary
Exported source
class RemapSedimentCB(Callback):
    "Update sediment id based on MARIS species LUT (dbo_sedtype.xlsx)."
    def __init__(self, 
                 fn_lut: Callable, # Function that returns the lookup table dictionary
                ):
        fc.store_attr()

    def _fix_inconsistent_sedi(self, df:pd.DataFrame) -> pd.DataFrame:
        "Temporary fix for inconsistent SEDI values. Data provider to confirm and clarify."
        df['SEDI'] = df['SEDI'].replace({56: -99, 73: -99, np.nan: -99})
        return df
    
    def __call__(self, tfm: Transformer):
        "Remap sediment types in the DataFrame using the lookup table and handle specific replacements."
        lut = self.fn_lut()
        
        # Set SedRepName (TBC: what is it used for?)
        tfm.dfs['sediment']['SedRepName']  = tfm.dfs['sediment']['SEDI'] 
        
        tfm.dfs['sediment'] = self._fix_inconsistent_sedi(tfm.dfs['sediment'])
        tfm.dfs['sediment']['sed_type'] = tfm.dfs['sediment']['SEDI'].apply(lambda x: self._get_sediment_type(x, lut))

    def _get_sediment_type(self, 
                           sedi_value: int, # The `SEDI` value from the DataFrame
                           lut: dict # The lookup table dictionary
                          ) -> Match: # The Match object
        "Get the matched_id from the lookup table and print SEDI if the matched_id is -1."
        match = lut.get(sedi_value, Match(-1, None, None, None))
        
        if match.matched_id == -1:
            self._print_unmatched_sedi(sedi_value)
        return match.matched_id

    def _print_unmatched_sedi(self, 
                              sedi_value: int # The `SEDI` value from the DataFrame
                             ) -> None:
        "Print the SEDI value if the matched_id is -1."
        print(f"Unmatched SEDI: {sedi_value}")
Exported source
lut_sediments = lambda: Remapper(provider_lut_df=pd.read_csv(Path(fname_in) / 'SEDIMENT_TYPE.csv'),
                                 maris_lut_fn=sediments_lut_path,
                                 maris_col_id='sedtype_id',
                                 maris_col_name='sedtype',
                                 provider_col_to_match='SEDIMENT TYPE',
                                 provider_col_key='SEDI',
                                 fname_cache='sediments_helcom.pkl'
                                 ).generate_lookup_table(fixes=fixes_sediments, as_df=False, overwrite=False)

Apply the transformer with the RemapSedimentCB(lut_sediments) callback. Then print the unique sed_type values for the sediment dataframe.

dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[RemapSedimentCB(lut_sediments)])

tfm()['sediment']['sed_type'].unique()
array([ 0,  2, 58, 30, 59, 55, 56, 36, 29, 47,  4, 54, 33,  6, 44, 42, 48,
       61, 57, 28, 49, 32, 45, 39, 46, 38, 31, 60, 62, 26, 53, 52,  1, 51,
       37, 34, 50,  7, 10, 41, 43, 35])

Remap units

Tip

FEEDBACK TO DATA PROVIDER: The handling of unit types varies across sample types. For consistency and ease of use, it would be beneficial to have dedicated unit columns for all sample types.

For seawater and sediment sample types, the HELCOM dataset refers to units directly in the names of certain columns, such as VALUE_Bq/m³ or VALUE_Bq/kg. For biota, the units are encoded in the BASIS column. This is shown below:

dfs = load_data(fname_in)
for grp in ['biota', 'sediment', 'seawater']:
    print(f"{grp}: {dfs[grp].columns}")
    
dfs['biota']['BASIS'].unique()
biota: Index(['KEY', 'NUCLIDE', 'METHOD', '< VALUE_Bq/kg', 'VALUE_Bq/kg', 'BASIS',
       'ERROR%', 'NUMBER', 'DATE_OF_ENTRY_x', 'COUNTRY', 'LABORATORY',
       'SEQUENCE', 'DATE', 'YEAR', 'MONTH', 'DAY', 'STATION',
       'LATITUDE ddmmmm', 'LATITUDE dddddd', 'LONGITUDE ddmmmm',
       'LONGITUDE dddddd', 'SDEPTH', 'RUBIN', 'BIOTATYPE', 'TISSUE', 'NO',
       'LENGTH', 'WEIGHT', 'DW%', 'LOI%', 'MORS_SUBBASIN', 'HELCOM_SUBBASIN',
       'DATE_OF_ENTRY_y'],
      dtype='object')
sediment: Index(['KEY', 'NUCLIDE', 'METHOD', '< VALUE_Bq/kg', 'VALUE_Bq/kg', 'ERROR%_kg',
       '< VALUE_Bq/m²', 'VALUE_Bq/m²', 'ERROR%_m²', 'DATE_OF_ENTRY_x',
       'COUNTRY', 'LABORATORY', 'SEQUENCE', 'DATE', 'YEAR', 'MONTH', 'DAY',
       'STATION', 'LATITUDE (ddmmmm)', 'LATITUDE (dddddd)',
       'LONGITUDE (ddmmmm)', 'LONGITUDE (dddddd)', 'DEVICE', 'TDEPTH',
       'UPPSLI', 'LOWSLI', 'AREA', 'SEDI', 'OXIC', 'DW%', 'LOI%',
       'MORS_SUBBASIN', 'HELCOM_SUBBASIN', 'SUM_LINK', 'DATE_OF_ENTRY_y'],
      dtype='object')
seawater: Index(['KEY', 'NUCLIDE', 'METHOD', '< VALUE_Bq/m³', 'VALUE_Bq/m³', 'ERROR%_m³',
       'DATE_OF_ENTRY_x', 'COUNTRY', 'LABORATORY', 'SEQUENCE', 'DATE', 'YEAR',
       'MONTH', 'DAY', 'STATION', 'LATITUDE (ddmmmm)', 'LATITUDE (dddddd)',
       'LONGITUDE (ddmmmm)', 'LONGITUDE (dddddd)', 'TDEPTH', 'SDEPTH', 'SALIN',
       'TTEMP', 'FILT', 'MORS_SUBBASIN', 'HELCOM_SUBBASIN', 'DATE_OF_ENTRY_y'],
      dtype='object')
array(['W', nan, 'D', 'F'], dtype=object)

Given the inconsistent handling of units across sample types, we need to define custom mapping rules for standardizing the units. Below are the MARIS unit types:

pd.read_excel(unit_lut_path())[['unit_id', 'unit', 'unit_sanitized']]
unit_id unit unit_sanitized
0 -1 Not applicable Not applicable
1 0 NOT AVAILABLE NOT AVAILABLE
2 1 Bq/m3 Bq per m3
3 2 Bq/m2 Bq per m2
4 3 Bq/kg Bq per kg
5 4 Bq/kgd Bq per kgd
6 5 Bq/kgw Bq per kgw
7 6 kg/kg kg per kg
8 7 TU TU
9 8 DELTA/mill DELTA per mill
10 9 atom/kg atom per kg
11 10 atom/kgd atom per kgd
12 11 atom/kgw atom per kgw
13 12 atom/l atom per l
14 13 Bq/kgC Bq per kgC

For now, we define the HELCOM unit renaming rules in an ad hoc way:


source

RemapUnitCB

 RemapUnitCB (lut_units:dict={'seawater': 1, 'sediment': 4, 'biota': {'D':
              4, 'W': 5, 'F': 5}})

Set the unit id column in the DataFrames based on a lookup table.

Type Default Details
lut_units dict {‘seawater’: 1, ‘sediment’: 4, ‘biota’: {‘D’: 4, ‘W’: 5, ‘F’: 5}} Dictionary containing renaming rules for different unit categories
Exported source
lut_units = {
    'seawater': 1,  # 'Bq/m3'
    'sediment': 4,  # 'Bq/kgd' for sediment
    'biota': {
        'D': 4,  # 'Bq/kgd'
        'W': 5,  # 'Bq/kgw'
        'F': 5   # 'Bq/kgw' (assumed to be 'Fresh', so set to wet)
    }
}
Exported source
class RemapUnitCB(Callback):
    "Set the `unit` id column in the DataFrames based on a lookup table."
    def __init__(self, 
                 lut_units: dict=lut_units # Dictionary containing renaming rules for different unit categories
                ):
        fc.store_attr()

    def __call__(self, tfm: Transformer):
        for grp in tfm.dfs.keys():
            if grp in ['seawater', 'sediment']:
                tfm.dfs[grp]['unit'] = self.lut_units[grp]
            else:
                tfm.dfs[grp]['unit'] = tfm.dfs[grp]['BASIS'].apply(lambda x: self.lut_units[grp].get(x, 0))

Apply the transformer with the RemapUnitCB callback. Then print the unique unit values for each dataframe.

dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[RemapUnitCB()])

for grp in ['biota', 'sediment', 'seawater']:
    print(f"{grp}: {tfm()[grp]['unit'].unique()}")
biota: [5 0 4]
sediment: [4]
seawater: [1]

Remap detection limit

Detection limits are encoded as follows in MARIS:

pd.read_excel(detection_limit_lut_path())
id name name_sanitized
0 -1 Not applicable Not applicable
1 0 Not Available Not available
2 1 = Detected value
3 2 < Detection limit
4 3 ND Not detected
5 4 DE Derived
Exported source
lut_dl = lambda: pd.read_excel(detection_limit_lut_path(), usecols=['name','id']).set_index('name').to_dict()['id']

Based on columns of interest for each sample type:

Exported source
coi_dl = {'seawater' : {'val' : 'VALUE_Bq/m³',
                       'unc' : 'ERROR%_m³',
                       'dl' : '< VALUE_Bq/m³'},
          'biota':  {'val' : 'VALUE_Bq/kg',
                     'unc' : 'ERROR%',
                     'dl' : '< VALUE_Bq/kg'},
          'sediment': {
              'val' : 'VALUE_Bq/kg',
              'unc' : 'ERROR%_kg',
              'dl' : '< VALUE_Bq/kg'}}

We encode the detection limit according to the following business logic:

RemapDetectionLimitCB creates a detection_limit column with values determined as follows:

  1. Look up the value type (detection limit) column (< VALUE_Bq/m³ or < VALUE_Bq/kg) in the lookup table returned by lut_dl.
  2. If < VALUE_Bq/m³ or < VALUE_Bq/kg is NaN but both the activity value (VALUE_Bq/m³ or VALUE_Bq/kg) and the standard uncertainty (ERROR%_m³, ERROR%, or ERROR%_kg) are provided, assign the id 1 (i.e. “Detected value”).
  3. Set any remaining NaN values in the detection_limit column to 0 (i.e. “Not Available”).
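A toy illustration of these three rules (hypothetical rows, sketched outside the Transformer pipeline; the ids follow the MARIS lookup table above):

import numpy as np
import pandas as pd

# Three hypothetical seawater rows, one per rule:
demo = pd.DataFrame({'< VALUE_Bq/m³': ['<', np.nan, np.nan],
                     'VALUE_Bq/m³':   [0.1, 5.3, 5.3],
                     'ERROR%_m³':     [np.nan, 32.0, np.nan]})
# Rule 1: row 0 carries '<'             -> detection_limit = 2 ('Detection limit')
# Rule 2: row 1 has value + uncertainty -> detection_limit = 1 ('Detected value')
# Rule 3: row 2 has neither             -> detection_limit = 0 ('Not Available')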


source

RemapDetectionLimitCB

 RemapDetectionLimitCB (coi:dict, fn_lut:Callable)

Remap value type to MARIS format.

Type Details
coi dict Configuration options for column names
fn_lut Callable Function that returns a lookup table
Exported source
# TO BE REFACTORED
class RemapDetectionLimitCB(Callback):
    "Remap value type to MARIS format."
    def __init__(self, 
                 coi: dict, # Configuration options for column names
                 fn_lut: Callable # Function that returns a lookup table
                ):
        fc.store_attr()

    def __call__(self, tfm: Transformer):
        "Remap detection limits in the DataFrames using the lookup table."
        lut = self.fn_lut()
        
        for grp in tfm.dfs:
            df = tfm.dfs[grp]
            self._update_detection_limit(df, grp, lut)
    
    def _update_detection_limit(self, 
                                df: pd.DataFrame, # The DataFrame to modify
                                grp: str, # The group name to get the column configuration
                                lut: dict # The lookup table dictionary
                               ) -> None:
        "Update detection limit column in the DataFrame based on lookup table and rules."
        detection_col = self.coi[grp]['dl']
        value_col = self.coi[grp]['val']
        uncertainty_col = self.coi[grp]['unc']
        
        # Copy detection limit column
        df['detection_limit'] = df[detection_col]
        
        # Fill values with '=' or 'Not Available'
        condition = ((df[value_col].notna()) & (df[uncertainty_col].notna()) &
                     (~df['detection_limit'].isin(lut.keys())))
        df.loc[condition, 'detection_limit'] = '='
        df.loc[~df['detection_limit'].isin(lut.keys()), 'detection_limit'] = 'Not Available'
        
        # Perform lookup
        df['detection_limit'] = df['detection_limit'].map(lut)
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
                            NormalizeUncCB(),
                            SanitizeValue(coi_val),                       
                            RemapUnitCB(),
                            RemapDetectionLimitCB(coi_dl, lut_dl)])


for grp in ['biota', 'sediment', 'seawater']:
    print(f"{grp}: {tfm()[grp]['detection_limit'].unique()}")
biota: [2 1 0]
sediment: [1 2 0]
seawater: [1 2 0]

Remap filtering status

HELCOM filtered status is encoded as follows in the FILT column:

dfs = load_data(fname_in)
get_unique_across_dfs(dfs, col_name='FILT', as_df=True).head(5)
index value
0 0 NaN
1 1 F
2 2 n
3 3 N

While MARIS uses a different encoding for filtered status:

pd.read_excel(filtered_lut_path())
id name
0 -1 Not applicable
1 0 Not available
2 1 Yes
3 2 No

With only four categories to remap, the Remapper is overkill. We can use a simple dictionary to map the values:

Exported source
lut_filtered = {
    'N': 2,
    'n': 2,
    'F': 1
}

RemapFiltCB converts the HELCOM FILT format to the MARIS FILT format.


source

RemapFiltCB

 RemapFiltCB (lut_filtered:dict={'N': 2, 'n': 2, 'F': 1})

Lookup FILT value in dataframe using the lookup table.

Type Default Details
lut_filtered dict {‘N’: 2, ‘n’: 2, ‘F’: 1} Dictionary mapping FILT codes to their corresponding names
Exported source
class RemapFiltCB(Callback):
    "Lookup FILT value in dataframe using the lookup table."
    def __init__(self,
                 lut_filtered: dict=lut_filtered, # Dictionary mapping FILT codes to their corresponding names
                ):
        fc.store_attr()

    def __call__(self, tfm):
        for df in tfm.dfs.values():
            if 'FILT' in df.columns:
                df['FILT'] = df['FILT'].map(lambda x: self.lut_filtered.get(x, 0))

For instance:

dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[RemapFiltCB(lut_filtered)])

print(tfm()['seawater']['FILT'].unique())
[0 2 1]

Add Sample Laboratory code

The sample laboratory code is currently stored in the MARIS master DB but not encoded as a NetCDF variable. Whether to include it in the NetCDF output is TBD.


source

AddSampleLabCodeCB

 AddSampleLabCodeCB ()

Remap KEY column to samplabcode in each DataFrame.

Exported source
class AddSampleLabCodeCB(Callback):
    "Remap `KEY` column to `samplabcode` in each DataFrame."
    def __call__(self, tfm: Transformer):
        for grp in tfm.dfs:
            self._remap_sample_id(tfm.dfs[grp])
    
    def _remap_sample_id(self, df: pd.DataFrame):
        df['samplabcode'] = df['KEY']
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
                            AddSampleLabCodeCB(),
                            CompareDfsAndTfmCB(dfs)
                            ])

print(tfm()['seawater']['samplabcode'].unique())
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')
['WKRIL2012003' 'WKRIL2012004' 'WKRIL2012005' ... 'WSSSM2021006'
 'WSSSM2021007' 'WSSSM2021008']
                                                    seawater  sediment  biota
Number of rows in dfs                                  21216     39817  15827
Number of rows in tfm.dfs                              21216     39817  15827
Number of dropped rows                                     0         0      0
Number of rows in tfm.dfs + Number of dropped rows     21216     39817  15827 

Add measurement note

The HELCOM dataset includes a look-up table, ANALYSIS_METHOD.csv, capturing the measurement methods used, as described by HELCOM. For instance:

pd.read_csv(Path(fname_in) / 'ANALYSIS_METHOD.csv').head()
METHOD DESCRIPTION COUNTRY
0 BFFG01 Gammaspectrometric analysis with Germanium det... 6
1 BFFG02 Sr-90, a) Y-90 extraction method dried ash and... 6
2 BFFG03 Pu238, Pu239241; Ashing and and drying the tra... 6
3 BFFG04 Am-241 (not to in use any more) 6
4 CLOR01 137Cs and 40K activity concentrations are dete... 67

source

AddMeasurementNoteCB

 AddMeasurementNoteCB (fn_lut:Callable)

Record measurement notes by adding a ‘measurementnote’ column to DataFrames.

Type Details
fn_lut Callable Function that returns the lookup dictionary with METHOD as key and DESCRIPTION as value
Exported source
lut_method = lambda: pd.read_csv(Path(fname_in) / 'ANALYSIS_METHOD.csv').set_index('METHOD').to_dict()['DESCRIPTION']
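
As a quick sanity check (not part of the exported pipeline), the first keys of the resulting lookup match the METHOD column of the extract above:

lut = lut_method()
list(lut)[:3]
['BFFG01', 'BFFG02', 'BFFG03']
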
Exported source
class AddMeasurementNoteCB(Callback):
    "Record measurement notes by adding a 'measurenote' column to DataFrames."
    def __init__(self, 
                 fn_lut: Callable # Function that returns the lookup dictionary with `METHOD` as key and `DESCRIPTION` as value
                ):
        fc.store_attr()
        
    def __call__(self, tfm: Transformer):
        lut = self.fn_lut()
        for df in tfm.dfs.values():
            if 'METHOD' in df.columns:
                df['measurenote'] = df['METHOD'].map(lambda x: lut.get(x, 0)) # column name must match the 'measurenote' renaming rule used later
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
    AddMeasurementNoteCB(lut_method),
    CompareDfsAndTfmCB(dfs)])

tfm()
print(tfm.dfs['seawater']['measurenote'].unique()[:5])
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')
[0
 'Radiochemical method Radiocaesium separation from seawater samples.134+137Cs was adsorbed on AMP mat,  dissolved with NaOH and after purification precipitated as chloroplatinate (Cs2PtCl6).Counting with low background anticoincidence beta counter.'
 'Radiochem. meth of Sr90. Precipation with oxalate and separation of calcium, barium, radium and ytrium couting with low background anticoincidence beta counter. 1982-1994'
 'For tritium liquid scintialtion counting, combined with electrolytic enrichment of analysed water samples, double distilled, before and after electrolysis in cells. Liquid Scintillation spectrometer LKB Wallac model 1410'
 'Pretreatment drying (sediment, biota samples) and ashing (biota samples)or vaporization to 1000 ml (sea water samples), measured by gamma-spectrometry using HPGe detectors sediment, biota, sea water /Cs-137, Cs-134, K-40']
                                                    seawater  sediment  biota
Number of rows in dfs                                  21216     39817  15827
Number of rows in tfm.dfs                              21216     39817  15827
Number of dropped rows                                     0         0      0
Number of rows in tfm.dfs + Number of dropped rows     21216     39817  15827 

Add station

For MARIS master DB import only (not included in the NetCDF output).


source

RemapStationIdCB

 RemapStationIdCB ()

Remap Station ID to MARIS format.

Exported source
class RemapStationIdCB(Callback):
    "Remap Station ID to MARIS format."
    def __init__(self):
        fc.store_attr()

    def __call__(self, tfm: Transformer):
        "Iterate through all DataFrames in the transformer object and remap `STATION` to `station_id`."
        for grp in tfm.dfs.keys(): 
            tfm.dfs[grp]['station'] = tfm.dfs[grp]['STATION']
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
                            RemapStationIdCB(),
                            CompareDfsAndTfmCB(dfs)
                            ])
tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')
                                                    seawater  sediment  biota
Number of rows in dfs                                  21216     39817  15827
Number of rows in tfm.dfs                              21216     39817  15827
Number of dropped rows                                     0         0      0
Number of rows in tfm.dfs + Number of dropped rows     21216     39817  15827 

Add slice position (top and bottom)

For MARIS master DB import only (not included in the NetCDF output).


source

RemapSedSliceTopBottomCB

 RemapSedSliceTopBottomCB ()

Remap Sediment slice top and bottom to MARIS format.

Exported source
class RemapSedSliceTopBottomCB(Callback):
    "Remap Sediment slice top and bottom to MARIS format."
    def __call__(self, tfm: Transformer):
        "Iterate through all DataFrames in the transformer object and remap sediment slice top and bottom."
        tfm.dfs['sediment']['top'] = tfm.dfs['sediment']['UPPSLI']
        tfm.dfs['sediment']['bottom'] = tfm.dfs['sediment']['LOWSLI']
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[RemapSedSliceTopBottomCB()])
tfm()
print(tfm.dfs['sediment'][['top','bottom']].head())
    top  bottom
0  15.0    20.0
1  20.0    27.0
2   0.0     2.0
3   2.0     4.0
4   4.0     6.0

Add dry to wet ratio

DW% is currently not included in the NetCDF output.

HELCOM Description:

Sediment:
  1. DW%: DRY WEIGHT AS PERCENTAGE (%) OF FRESH WEIGHT
  2. VALUE_Bq/kg: Measured radioactivity concentration in Bq/kg dry wt. in scientific format (e.g. 123 = 1.23E+02, 0.076 = 7.6E-02)

Biota:
  1. WEIGHT: Average weight (in g) of specimen in the sample
  2. DW%: DRY WEIGHT AS PERCENTAGE (%) OF FRESH WEIGHT


source

LookupDryWetRatio

 LookupDryWetRatio ()

Lookup dry-wet ratio and format for MARIS.

Exported source
class LookupDryWetRatio(Callback):
    "Lookup dry-wet ratio and format for MARIS."
    def __call__(self, tfm: Transformer):
        "Iterate through all DataFrames in the transformer object and apply the dry-wet ratio lookup."
        for grp in tfm.dfs.keys():
            if 'DW%' in tfm.dfs[grp].columns:
                self._apply_dry_wet_ratio(tfm.dfs[grp])

    def _apply_dry_wet_ratio(self, df: pd.DataFrame) -> None:
        "Apply dry-wet ratio conversion and formatting to the given DataFrame."
        df['dry_wet_ratio'] = df['DW%']
        # Convert 'DW%' = 0% to NaN.
        df.loc[df['dry_wet_ratio'] == 0, 'dry_wet_ratio'] = np.NaN
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
                            LookupDryWetRatio(),
                            CompareDfsAndTfmCB(dfs)
                            ])

tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')
print(tfm.dfs['biota']['dry_wet_ratio'].head())
                                                    seawater  sediment  biota
Number of rows in dfs                                  21216     39817  15827
Number of rows in tfm.dfs                              21216     39817  15827
Number of dropped rows                                     0         0      0
Number of rows in tfm.dfs + Number of dropped rows     21216     39817  15827 

0    18.453
1    18.453
2    18.453
3    18.453
4    18.458
Name: dry_wet_ratio, dtype: float64

Standardize Coordinates

Tip

FEEDBACK TO DATA PROVIDER: Column names for geographical coordinates are inconsistent across sample types (biota, sediment, seawater). Sometimes using parentheses, sometimes not.

dfs = load_data(fname_in)
for grp in dfs.keys():
    print(f'{grp}: {[col for col in dfs[grp].columns if "LON" in col or "LAT" in col]}')
seawater: ['LATITUDE (ddmmmm)', 'LATITUDE (dddddd)', 'LONGITUDE (ddmmmm)', 'LONGITUDE (dddddd)']
sediment: ['LATITUDE (ddmmmm)', 'LATITUDE (dddddd)', 'LONGITUDE (ddmmmm)', 'LONGITUDE (dddddd)']
biota: ['LATITUDE ddmmmm', 'LATITUDE dddddd', 'LONGITUDE ddmmmm', 'LONGITUDE dddddd']
Tip

FEEDBACK TO DATA PROVIDER:

  • Geographical coordinates are provided in both decimal-degree and degree-minute formats. Some records are missing the decimal-degree values, forcing use of the less precise degree-minute format.
  • Also note that latitude values use `,` as the decimal separator while longitude values use `.` (see below).
dfs['sediment'][['LATITUDE (ddmmmm)', 'LATITUDE (dddddd)']].head()
LATITUDE (ddmmmm) LATITUDE (dddddd)
0 59.400 59,6667
1 59.400 59,6667
2 59.516 59,86
3 59.516 59,86
4 59.516 59,86
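
The ddmmmm columns encode coordinates as dd.mmmm, i.e. degrees followed by decimal minutes. A minimal sketch of the conversion performed by ddmm_to_dd, assuming this layout and positive coordinates, reproduces the rows above (59.400 → 59.6667, 59.516 → 59.86):

import math

def ddmm_to_dd_sketch(value: float) -> float:
    "Sketch only: split dd.mmmm into degrees and decimal minutes, then combine."
    degrees = math.floor(value)        # 59.400 -> 59
    minutes = (value - degrees) * 100  # 59.400 -> 40.0 decimal minutes
    return degrees + minutes / 60      # 59 + 40/60 = 59.6667

round(ddmm_to_dd_sketch(59.400), 4), round(ddmm_to_dd_sketch(59.516), 4)
(59.6667, 59.86)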

source

ParseCoordinates

 ParseCoordinates (fn_convert_cor:Callable)

Get geographical coordinates from columns expressed in degrees decimal format or from columns in degrees/minutes decimal format where degrees decimal format is missing.

Type Details
fn_convert_cor Callable Function that converts coordinates from degree-minute to decimal degree format
Exported source
class ParseCoordinates(Callback):
    """
    Get geographical coordinates from columns expressed in degrees decimal format 
    or from columns in degrees/minutes decimal format where degrees decimal format is missing.
    """
    def __init__(self, 
                 fn_convert_cor: Callable # Function that converts coordinates from degree-minute to decimal degree format
                 ):
        self.fn_convert_cor = fn_convert_cor

    def __call__(self, tfm:Transformer):
        for df in tfm.dfs.values():
            self._format_coordinates(df)

    def _format_coordinates(self, df:pd.DataFrame) -> None:
        coord_cols = self._get_coord_columns(df.columns)
        
        for coord in ['lat', 'lon']:
            decimal_col, minute_col = coord_cols[f'{coord}_d'], coord_cols[f'{coord}_m']
            
            condition = df[decimal_col].isna() | (df[decimal_col] == 0)
            df[coord] = np.where(condition,
                                 df[minute_col].apply(self._safe_convert),
                                 df[decimal_col])
        
        df.dropna(subset=['lat', 'lon'], inplace=True)

    def _get_coord_columns(self, columns) -> dict:
        return {
            'lon_d': self._find_coord_column(columns, 'LON', 'dddddd'),
            'lat_d': self._find_coord_column(columns, 'LAT', 'dddddd'),
            'lon_m': self._find_coord_column(columns, 'LON', 'ddmmmm'),
            'lat_m': self._find_coord_column(columns, 'LAT', 'ddmmmm')
        }

    def _find_coord_column(self, columns, coord_type, coord_format) -> str:
        pattern = re.compile(f'{coord_type}.*{coord_format}', re.IGNORECASE)
        matching_columns = [col for col in columns if pattern.search(col)]
        return matching_columns[0] if matching_columns else None

    def _safe_convert(self, value) -> str:
        if pd.isna(value):
            return value
        try:
            return self.fn_convert_cor(value)
        except Exception as e:
            print(f"Error converting value {value}: {e}")
            return value
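
Note that the regex in _find_coord_column is what makes the inconsistent headers (with or without parentheses) transparent to the callback:

import re

pattern = re.compile('LAT.*dddddd', re.IGNORECASE)
[col for col in ['LATITUDE (dddddd)', 'LATITUDE dddddd', 'LONGITUDE (dddddd)'] if pattern.search(col)]
['LATITUDE (dddddd)', 'LATITUDE dddddd']
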
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[                    
                            ParseCoordinates(ddmm_to_dd),
                            CompareDfsAndTfmCB(dfs)
                            ])
tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')
print(tfm.dfs['biota'][['lat','lon']])
                                                    seawater  sediment  biota
Number of rows in dfs                                  21216     39817  15827
Number of rows in tfm.dfs                              21208     39816  15827
Number of dropped rows                                     8         1      0
Number of rows in tfm.dfs + Number of dropped rows     21216     39817  15827 

             lat        lon
0      54.283333  12.316667
1      54.283333  12.316667
2      54.283333  12.316667
3      54.283333  12.316667
4      54.283333  12.316667
...          ...        ...
15822  60.373333  18.395667
15823  60.373333  18.395667
15824  60.503333  18.366667
15825  60.503333  18.366667
15826  60.503333  18.366667

[15827 rows x 2 columns]
Tip

FEEDBACK TO DATA PROVIDER: Some samples have (lon, lat): (0, 0) or are outside lon/lat possible values.

SanitizeLonLatCB drops a row when both longitude and latitude equal 0, or when the data contains unrealistic longitude or latitude values, and converts the `,` decimal separator in longitude and latitude to `.`.
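
A minimal sketch of that logic (the actual implementation is provided by marisco's SanitizeLonLatCB) could look like:

def sanitize_lon_lat_sketch(df: pd.DataFrame) -> pd.DataFrame:
    "Sketch only: normalize decimal separators, then drop (0, 0) and out-of-range coordinates."
    for col in ['lon', 'lat']:
        # Handle values read as strings with ',' as decimal separator
        df[col] = pd.to_numeric(df[col].astype(str).str.replace(',', '.'), errors='coerce')
    keep = ~((df['lon'] == 0) & (df['lat'] == 0))
    keep &= df['lon'].between(-180, 180) & df['lat'].between(-90, 90)
    return df[keep]
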

dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
                            ParseCoordinates(ddmm_to_dd),
                            SanitizeLonLatCB(),
                            CompareDfsAndTfmCB(dfs)
                            ])

tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')
print(tfm.dfs['biota'][['lat','lon']])
                                                    seawater  sediment  biota
Number of rows in dfs                                  21216     39817  15827
Number of rows in tfm.dfs                              21208     39816  15827
Number of dropped rows                                     8         1      0
Number of rows in tfm.dfs + Number of dropped rows     21216     39817  15827 

             lat        lon
0      54.283333  12.316667
1      54.283333  12.316667
2      54.283333  12.316667
3      54.283333  12.316667
4      54.283333  12.316667
...          ...        ...
15822  60.373333  18.395667
15823  60.373333  18.395667
15824  60.503333  18.366667
15825  60.503333  18.366667
15826  60.503333  18.366667

[15827 rows x 2 columns]

Review all callbacks

dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
                            AddSampleTypeIdColumnCB(),
                            LowerStripNameCB(col_src='NUCLIDE'),
                            RemapNuclideNameCB(lut_nuclides),
                            AddNuclideIdColumnCB(col_value='NUCLIDE'),
                            ParseTimeCB(),
                            EncodeTimeCB(cfg()),
                            SanitizeValue(coi_val),       
                            NormalizeUncCB(),
                            RemapCB(fn_lut=lut_biota, col_remap='species', col_src='RUBIN', dest_grps='biota'),
                            RemapCB(lut_tissues, 'body_part', 'TISSUE', 'biota'),
                            RemapCB(lut_biogroup, 'bio_group', 'species', 'biota'),
                            RemapTaxonInformationCB(lut_taxon),
                            RemapSedimentCB(lut_sediments),
                            RemapUnitCB(),
                            RemapDetectionLimitCB(coi_dl, lut_dl),
                            RemapFiltCB(lut_filtered),
                            AddSampleLabCodeCB(),
                            AddMeasurementNoteCB(lut_method),
                            RemapStationIdCB(),
                            RemapSedSliceTopBottomCB(),
                            LookupDryWetRatio(),
                            ParseCoordinates(ddmm_to_dd),
                            SanitizeLonLatCB(),
                            CompareDfsAndTfmCB(dfs)
                            ])

tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')
                                                    seawater  sediment  biota
Number of rows in dfs                                  21216     39817  15827
Number of rows in tfm.dfs                              21114     39531  15798
Number of dropped rows                                   102       286     29
Number of rows in tfm.dfs + Number of dropped rows     21216     39817  15827 

For instance, to inspect dropped rows:

tfm.dfs_dropped['seawater'].head()
KEY NUCLIDE METHOD < VALUE_Bq/m³ VALUE_Bq/m³ ERROR%_m³ DATE_OF_ENTRY_x COUNTRY LABORATORY SEQUENCE ... LONGITUDE (ddmmmm) LONGITUDE (dddddd) TDEPTH SDEPTH SALIN TTEMP FILT MORS_SUBBASIN HELCOM_SUBBASIN DATE_OF_ENTRY_y
13439 WRISO2001025 CS137 RISO02 NaN NaN 10.0 NaN 26.0 RISO 2001025.0 ... 10.500 10.833333 22.0 20.0 0.00 NaN N 5.0 5.0 NaN
14017 WLEPA2002001 CS134 LEPA02 < NaN NaN NaN 93.0 LEPA 2002001.0 ... 21.030 21.050000 16.0 0.0 3.77 14.40 N 4.0 9.0 NaN
14020 WLEPA2002002 CS134 LEPA02 < NaN NaN NaN 93.0 LEPA 2002004.0 ... 20.574 20.956667 14.0 0.0 6.57 11.95 N 4.0 9.0 NaN
14023 WLEPA2002003 CS134 LEPA02 < NaN NaN NaN 93.0 LEPA 2002007.0 ... 19.236 19.393333 73.0 0.0 7.00 9.19 N 4.0 9.0 NaN
14026 WLEPA2002004 CS134 LEPA02 < NaN NaN NaN 93.0 LEPA 2002010.0 ... 20.205 20.341700 47.0 0.0 7.06 8.65 N 4.0 9.0 NaN

5 rows × 27 columns

Rename columns of interest for NetCDF or Open Refine

Column names are standardized to the MARIS NetCDF format (i.e. PEP 8 compliant).


source

get_common_rules

 get_common_rules (vars:dict, encoding_type:str)

Get common renaming rules for NetCDF and OpenRefine.

Type Details
vars dict Configuration dictionary
encoding_type str Encoding type (netcdf or openrefine)
Returns dict Common renaming rules for NetCDF and OpenRefine.
Exported source
def get_common_rules(
    vars: dict, # Configuration dictionary
    encoding_type: str # Encoding type (`netcdf` or `openrefine`)
    ) -> dict: # Common renaming rules for NetCDF and OpenRefine.
    "Get common renaming rules for NetCDF and OpenRefine."
    common = {
        'KEY': 'key',
        'lat': 'latitude' if encoding_type == 'openrefine' else vars['defaults']['lat']['name'],
        'lon': 'longitude' if encoding_type == 'openrefine' else vars['defaults']['lon']['name'],
        'time': 'begperiod' if encoding_type == 'openrefine' else vars['defaults']['time']['name'],
        'NUCLIDE': 'nuclide_id' if encoding_type == 'openrefine' else 'nuclide',
        'detection_limit': 'detection' if encoding_type == 'openrefine' else vars['suffixes']['detection_limit']['name'],
        'unit': 'unit_id' if encoding_type == 'openrefine' else vars['suffixes']['unit']['name'],
        'value': 'activity' if encoding_type == 'openrefine' else 'value',
        'uncertainty': 'uncertaint' if encoding_type == 'openrefine' else vars['suffixes']['uncertainty']['name'],
        'SDEPTH': 'sampdepth' if encoding_type == 'openrefine' else vars['defaults']['smp_depth']['name'],
        'TDEPTH': 'totdepth' if encoding_type == 'openrefine' else vars['defaults']['tot_depth']['name'],
    }
    
    if encoding_type == 'openrefine':
        common.update({
            'samptype_id': 'samptype_id',
            'station': 'station',
            'samplabcode': 'samplabcode',
            'SALIN': 'salinity',
            'TTEMP': 'temperatur',
            'FILT': 'filtered',
            'measurenote': 'measurenote'
        })
    else:
        common.update({
            'counting_method': vars['suffixes']['counting_method']['name'],
            'sampling_method': vars['suffixes']['sampling_method']['name'],
            'preparation_method': vars['suffixes']['preparation_method']['name'],
            'SALIN': vars['suffixes']['salinity']['name'],
            'TTEMP': vars['suffixes']['temperature']['name'],
        })
    
    return common

source

get_specific_rules

 get_specific_rules (vars:dict, encoding_type:str)

Get specific renaming rules for NetCDF and OpenRefine.

Type Details
vars dict Configuration dictionary
encoding_type str Encoding type (netcdf or openrefine)
Returns dict Specific renaming rules for NetCDF and OpenRefine.
Exported source
def get_specific_rules(
    vars: dict, # Configuration dictionary
    encoding_type: str # Encoding type (`netcdf` or `openrefine`)
    ) -> dict: # Specific renaming rules for NetCDF and OpenRefine.
    "Get specific renaming rules for NetCDF and OpenRefine."
    if encoding_type == 'netcdf':
        return {
            'biota': {
                'species': vars['bio']['species']['name'],
                'body_part': vars['bio']['body_part']['name'],
                'bio_group': vars['bio']['bio_group']['name']
            },
            'sediment': {
                'sed_type': vars['sed']['sed_type']['name'],
                'top': vars['sed']['top']['name'],
                'bottom': vars['sed']['bottom']['name'],
            }
        }
    elif encoding_type == 'openrefine':
        return {
            'biota': {
                'species': 'species_id',
                'Taxonname': 'Taxonname',
                'TaxonRepName': 'TaxonRepName',
                'Taxonrank': 'Taxonrank',
                'TaxonDB': 'TaxonDB',
                'TaxonDBID': 'TaxonDBID',
                'TaxonDBURL': 'TaxonDBURL',
                'body_part': 'bodypar_id',
                'dry_wet_ratio': 'percentwt',
            },
            'sediment': {
                'sed_type': 'sedtype_id',
                'top': 'sliceup',
                'bottom': 'slicedown',
                'SedRepName': 'SedRepName',
                'dry_wet_ratio': 'percentwt',
            }
        }

source

get_renaming_rules

 get_renaming_rules (encoding_type:str='netcdf')

Get renaming rules for NetCDF and OpenRefine.

Type Default Details
encoding_type str netcdf Encoding type (netcdf or openrefine)
Returns dict Renaming rules for NetCDF and OpenRefine.
Exported source
def get_renaming_rules(
    encoding_type: str = 'netcdf' # Encoding type (`netcdf` or `openrefine`)
    ) -> dict: # Renaming rules for NetCDF and OpenRefine.
    "Get renaming rules for NetCDF and OpenRefine."
    vars = cdl_cfg()['vars']
    
    if encoding_type not in ['netcdf', 'openrefine']:
        raise ValueError("Invalid encoding_type provided. Please use 'netcdf' or 'openrefine'.")
    
    common_rules = get_common_rules(vars, encoding_type)
    specific_rules = get_specific_rules(vars, encoding_type)
    
    rules = defaultdict(dict)
    for sample_type in ['seawater', 'biota', 'sediment']:
        rules[sample_type] = common_rules.copy()
        rules[sample_type].update(specific_rules.get(sample_type, {}))
    
    return dict(rules)
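
The literal mappings defined above can be checked directly, for instance:

rules = get_renaming_rules('openrefine')
print(rules['seawater']['KEY'])      # 'key' (common rule)
print(rules['seawater']['NUCLIDE'])  # 'nuclide_id'
print(rules['biota']['species'])     # 'species_id' (biota-specific rule)
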

source

SelectAndRenameColumnCB

 SelectAndRenameColumnCB (fn_renaming_rules:Callable,
                          encoding_type:str='netcdf', verbose:bool=False)

Select and rename columns in a DataFrame based on renaming rules for a specified encoding type.

Type Default Details
fn_renaming_rules Callable A function that returns an OrderedDict of renaming rules
encoding_type str netcdf The encoding type (netcdf or openrefine) to determine which renaming rules to use
verbose bool False Whether to print out renaming rules that were not applied
Exported source
class SelectAndRenameColumnCB(Callback):
    "Select and rename columns in a DataFrame based on renaming rules for a specified encoding type."
    def __init__(self, 
                 fn_renaming_rules: Callable, # A function that returns an OrderedDict of renaming rules 
                 encoding_type: str='netcdf', # The encoding type (`netcdf` or `openrefine`) to determine which renaming rules to use
                 verbose: bool=False # Whether to print out renaming rules that were not applied
                 ):
        fc.store_attr()

    def __call__(self, tfm: Transformer):
        "Apply column selection and renaming to DataFrames in the transformer, and identify unused rules."
        try:
            renaming_rules = self.fn_renaming_rules(self.encoding_type)
        except ValueError as e:
            print(f"Error fetching renaming rules: {e}")
            return

        for group in tfm.dfs.keys():
            # Get relevant renaming rules for the current group
            group_rules = self._get_group_rules(renaming_rules, group)

            if not group_rules:
                continue

            # Apply renaming rules and track keys not found in the DataFrame
            df = tfm.dfs[group]
            df, not_found_keys = self._apply_renaming(df, group_rules)
            tfm.dfs[group] = df
            
            # Print any renaming rules that were not used
            if not_found_keys and self.verbose:
                print(f"\nGroup '{group}' has the following renaming rules not applied:")
                for old_col in not_found_keys:
                    print(f"Key '{old_col}' from renaming rules was not found in the DataFrame.")

    def _get_group_rules(self, 
                         renaming_rules: OrderedDict, # Renaming rules
                         group: str # Group name to filter rules
                         ) -> OrderedDict: # Renaming rules applicable to the specified group
        "Retrieve and merge renaming rules for the specified group based on the encoding type."
        relevant_rules = [rules for key, rules in renaming_rules.items() if group in key]
        merged_rules = OrderedDict()
        for rules in relevant_rules:
            merged_rules.update(rules)
        return merged_rules

    def _apply_renaming(self, 
                        df: pd.DataFrame, # DataFrame to modify
                        rename_rules: OrderedDict # Renaming rules
                        ) -> tuple: # (Renamed and filtered df, Column names from renaming rules that were not found in the DataFrame)
        """
        Select columns based on renaming rules and apply renaming, only for existing columns
        while maintaining the order of the dictionary columns."""
        existing_columns = set(df.columns)
        valid_rules = OrderedDict((old_col, new_col) for old_col, new_col in rename_rules.items() if old_col in existing_columns)

        # Create a list to maintain the order of columns
        columns_to_keep = [col for col in rename_rules.keys() if col in existing_columns]
        columns_to_keep += [new_col for old_col, new_col in valid_rules.items() if new_col in df.columns]

        df = df[list(OrderedDict.fromkeys(columns_to_keep))]

        # Apply renaming
        df.rename(columns=valid_rules, inplace=True)

        # Determine which keys were not found
        not_found_keys = set(rename_rules.keys()) - existing_columns
        return df, not_found_keys
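The selection-then-rename pattern at the heart of _apply_renaming can be illustrated on a toy frame, using a few of the literal OpenRefine rules defined above:

from collections import OrderedDict

toy = pd.DataFrame({'KEY': ['WKRIL2012003'], 'SALIN': [7.0], 'UNRELATED': [0]})
rules = OrderedDict([('KEY', 'key'), ('SALIN', 'salinity'), ('FILT', 'filtered')])

valid = OrderedDict((old, new) for old, new in rules.items() if old in toy.columns)
print(toy[list(valid)].rename(columns=valid).columns.tolist())  # ['key', 'salinity']
print(set(rules) - set(toy.columns))  # {'FILT'}: reported when verbose=True
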
dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[AddSampleTypeIdColumnCB(),
                            LowerStripNameCB(col_src='NUCLIDE'),
                            RemapNuclideNameCB(lut_nuclides),
                            AddNuclideIdColumnCB(col_value='NUCLIDE'),
                            ParseTimeCB(),
                            EncodeTimeCB(cfg()),
                            SanitizeValue(coi_val),       
                            NormalizeUncCB(),
                            RemapCB(fn_lut=lut_biota, col_remap='species', col_src='RUBIN', dest_grps='biota'),
                            RemapCB(lut_tissues, 'body_part', 'TISSUE', 'biota'),
                            RemapCB(lut_biogroup, 'bio_group', 'species', 'biota'),
                            RemapTaxonInformationCB(lut_taxon),
                            RemapSedimentCB(lut_sediments),
                            RemapUnitCB(),
                            RemapDetectionLimitCB(coi_dl, lut_dl),
                            RemapFiltCB(lut_filtered),
                            AddSampleLabCodeCB(),
                            AddMeasurementNoteCB(lut_method),
                            RemapStationIdCB(),
                            RemapSedSliceTopBottomCB(),
                            LookupDryWetRatio(),
                            ParseCoordinates(ddmm_to_dd),
                            SanitizeLonLatCB(),
                            CompareDfsAndTfmCB(dfs),
                            SelectAndRenameColumnCB(get_renaming_rules, encoding_type='netcdf'),
                            ])

tfm()
for grp in tfm.dfs.keys():
    print(f'{grp} columns:')
    print(tfm.dfs[grp].columns)
seawater columns:
Index(['key', 'lat', 'lon', 'time', 'nuclide', '_dl', '_unit', 'value', '_unc',
       'smp_depth', 'tot_depth', '_sal', '_temp'],
      dtype='object')
sediment columns:
Index(['key', 'lat', 'lon', 'time', 'nuclide', '_dl', '_unit', 'value', '_unc',
       'tot_depth', 'sed_type', 'top', 'bottom'],
      dtype='object')
biota columns:
Index(['key', 'lat', 'lon', 'time', 'nuclide', '_dl', '_unit', 'value', '_unc',
       'smp_depth', 'species', 'body_part', 'bio_group'],
      dtype='object')
result = tfm.dfs['sediment']; result.head()
key lat lon time nuclide _dl _unit value _unc tot_depth sed_type top bottom
0 SKRIL2012048 59.6667 24.0000 1339891200 ra226 1 4 35.0 9.10 71.0 0 15.0 20.0
1 SKRIL2012049 59.6667 24.0000 1339891200 ra226 1 4 36.0 7.92 71.0 0 20.0 27.0
2 SKRIL2012050 59.8600 28.8433 1344556800 ra226 1 4 38.0 9.12 23.0 0 0.0 2.0
3 SKRIL2012051 59.8600 28.8433 1344556800 ra226 1 4 36.0 9.00 23.0 0 2.0 4.0
4 SKRIL2012052 59.8600 28.8433 1344556800 ra226 1 4 30.0 6.90 23.0 0 4.0 6.0

Reshape: long to wide

Convert data from long to wide and rename columns to comply with NetCDF format.

dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[AddSampleTypeIdColumnCB(),
                            LowerStripNameCB(col_src='NUCLIDE'),
                            RemapNuclideNameCB(lut_nuclides),
                            AddNuclideIdColumnCB(col_value='NUCLIDE'),
                            ParseTimeCB(),
                            EncodeTimeCB(cfg()),
                            SanitizeValue(coi_val),       
                            NormalizeUncCB(),
                            RemapCB(fn_lut=lut_biota, col_remap='species', col_src='RUBIN', dest_grps='biota'),
                            RemapCB(lut_tissues, 'body_part', 'TISSUE', 'biota'),
                            RemapCB(lut_biogroup, 'bio_group', 'species', 'biota'),
                            RemapTaxonInformationCB(lut_taxon),
                            RemapSedimentCB(lut_sediments),
                            RemapUnitCB(),
                            RemapDetectionLimitCB(coi_dl, lut_dl),
                            RemapFiltCB(lut_filtered),
                            AddSampleLabCodeCB(),
                            AddMeasurementNoteCB(lut_method),
                            RemapStationIdCB(),
                            RemapSedSliceTopBottomCB(),
                            LookupDryWetRatio(),
                            ParseCoordinates(ddmm_to_dd),
                            SanitizeLonLatCB(),
                            SelectAndRenameColumnCB(get_renaming_rules, encoding_type='netcdf'),
                            ReshapeLongToWide()
                            ])

tfm()
for grp in tfm.dfs.keys():
    print(f'{grp} columns:')
    print(tfm.dfs[grp].columns)
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In [98], line 30
      2 dfs = load_data(fname_in)
      3 tfm = Transformer(dfs, cbs=[AddSampleTypeIdColumnCB(),
      4                             LowerStripNameCB(col_src='NUCLIDE'),
      5                             RemapNuclideNameCB(lut_nuclides),
   (...)
     27                             ReshapeLongToWide()
     28                             ])
---> 30 tfm()
     31 for grp in tfm.dfs.keys():
     32     print(f'{grp} columns:')

File ~/pro/IAEA/MARIS/marisco/marisco/callbacks.py:266, in ReshapeLongToWide.__call__(self, tfm)
    264 def __call__(self, tfm):
    265     for grp in tfm.dfs.keys():
--> 266         tfm.dfs[grp] = self.pivot(tfm.dfs[grp])
    267         tfm.dfs[grp].columns = self.renamed_cols(tfm.dfs[grp].columns)

File ~/pro/IAEA/MARIS/marisco/marisco/callbacks.py:247, in ReshapeLongToWide.pivot(self, df)
    243 idx = list(set(df.columns) - set(self.columns + derived_coi + self.values))
    245 df, num_fill_value = self._fill_nan_values(df, idx)
--> 247 pivot_df = df.pivot_table(index=idx,
    248                           columns=self.columns,
    249                           values=self.values + derived_coi,
    250                           fill_value=np.nan,
    251                           aggfunc=lambda x: x).reset_index()

    [... pandas pivot_table / groupby internals elided ...]

File ~/mambaforge/envs/marisco/lib/python3.10/site-packages/pandas/core/groupby/ops.py:88, in check_result_array(obj, dtype)
     84 if isinstance(obj, np.ndarray):
     85     if dtype != object:
---> 88         raise ValueError("Must produce aggregated value")

ValueError: Must produce aggregated value
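
This “Must produce aggregated value” error is typically raised when pivot_table’s identity aggfunc receives more than one row per (index, columns) combination: the lambda then returns an array rather than a scalar. A minimal reproduction, independent of the HELCOM data:

import pandas as pd

df = pd.DataFrame({'idx': [1, 1], 'nuclide': ['cs137', 'cs137'], 'value': [1.0, 2.0]})
# Two rows share the same (idx, nuclide) pair, so the identity aggfunc
# returns a length-2 array instead of a scalar, raising the ValueError above.
df.pivot_table(index='idx', columns='nuclide', values='value', aggfunc=lambda x: x)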

NetCDF encoder

Example change logs

dfs = load_data(fname_in)

tfm = Transformer(dfs, cbs=[AddSampleTypeIdColumnCB(),
                            LowerStripNameCB(col_src='NUCLIDE'),
                            RemapNuclideNameCB(lut_nuclides),
                            AddNuclideIdColumnCB(col_value='NUCLIDE'),
                            ParseTimeCB(),
                            EncodeTimeCB(cfg()),
                            SanitizeValue(coi_val),       
                            NormalizeUncCB(),
                            RemapCB(fn_lut=lut_biota, col_remap='species', col_src='RUBIN', dest_grps='biota'),
                            RemapCB(lut_tissues, 'body_part', 'TISSUE', 'biota'),
                            RemapCB(lut_biogroup, 'bio_group', 'species', 'biota'),
                            RemapTaxonInformationCB(lut_taxon),
                            RemapSedimentCB(lut_sediments),
                            RemapUnitCB(),
                            RemapDetectionLimitCB(coi_dl, lut_dl),
                            RemapFiltCB(lut_filtered),
                            AddSampleLabCodeCB(),
                            AddMeasurementNoteCB(lut_method),
                            RemapStationIdCB(),
                            RemapSedSliceTopBottomCB(),
                            LookupDryWetRatio(),
                            ParseCoordinates(ddmm_to_dd),
                            SanitizeLonLatCB(),
                            SelectAndRenameColumnCB(get_renaming_rules, encoding_type='netcdf'),
                            ReshapeLongToWide()
                            ])

tfm()
tfm.logs
["Convert values from 'NUCLIDE' to lowercase, strip spaces, and store in 'None'.",
 'Parse and standardize time information in the dataframe.',
 'Encode time as `int` representing seconds since xxx',
 'Sanitize value/measurement by removing blank entries and populating `value` column.',
 'Convert from relative error % to uncertainty of activity unit.',
 "Remap values from 'RUBIN' to 'species' for groups: b, i, o, t, a.",
 "Remap values from 'TISSUE' to 'body_part' for groups: b, i, o, t, a.",
 "Remap values from 'species' to 'bio_group' for groups: b, i, o, t, a.",
 'Update taxon information based on MARIS species LUT.',
 'Update sediment id based on MARIS species LUT (dbo_sedtype.xlsx).',
 'Set the `unit` id column in the DataFrames based on a lookup table.',
 'Remap value type to MARIS format.',
 'Lookup FILT value in dataframe using the lookup table.',
 'Remap `KEY` column to `samplabcode` in each DataFrame.',
 "Record measurement notes by adding a 'measurenote' column to DataFrames.",
 'Remap Station ID to MARIS format.',
 'Remap Sediment slice top and bottom to MARIS format.',
 'Lookup dry-wet ratio and format for MARIS.',
 '\n    Get geographical coordinates from columns expressed in degrees decimal format \n    or from columns in degrees/minutes decimal format where degrees decimal format is missing.\n    ',
 'Drop row when both longitude & latitude equal 0. Drop unrealistic longitude & latitude values. Convert longitude & latitude `,` separator to `.` separator.',
 'Select and rename columns in a DataFrame based on renaming rules for a specified encoding type.']

Feed global attributes


source

get_attrs

 get_attrs (tfm:marisco.callbacks.Transformer, zotero_key:str,
            kw:list=['oceanography', 'Earth Science > Oceans > Ocean
            Chemistry> Radionuclides', 'Earth Science > Human Dimensions >
            Environmental Impacts > Nuclear Radiation Exposure', 'Earth
            Science > Oceans > Ocean Chemistry > Ocean Tracers, Earth
            Science > Oceans > Marine Sediments', 'Earth Science > Oceans
            > Ocean Chemistry, Earth Science > Oceans > Sea Ice >
            Isotopes', 'Earth Science > Oceans > Water Quality > Ocean
            Contaminants', 'Earth Science > Biological Classification >
            Animals/Vertebrates > Fish', 'Earth Science > Biosphere >
            Ecosystems > Marine Ecosystems', 'Earth Science > Biological
            Classification > Animals/Invertebrates > Mollusks', 'Earth
            Science > Biological Classification > Animals/Invertebrates >
            Arthropods > Crustaceans', 'Earth Science > Biological
            Classification > Plants > Macroalgae (Seaweeds)'])

Retrieve all global attributes.

Type Default Details
tfm Transformer Transformer object
zotero_key str Zotero dataset record key
kw list [‘oceanography’, ‘Earth Science > Oceans > Ocean Chemistry> Radionuclides’, ‘Earth Science > Human Dimensions > Environmental Impacts > Nuclear Radiation Exposure’, ‘Earth Science > Oceans > Ocean Chemistry > Ocean Tracers, Earth Science > Oceans > Marine Sediments’, ‘Earth Science > Oceans > Ocean Chemistry, Earth Science > Oceans > Sea Ice > Isotopes’, ‘Earth Science > Oceans > Water Quality > Ocean Contaminants’, ‘Earth Science > Biological Classification > Animals/Vertebrates > Fish’, ‘Earth Science > Biosphere > Ecosystems > Marine Ecosystems’, ‘Earth Science > Biological Classification > Animals/Invertebrates > Mollusks’, ‘Earth Science > Biological Classification > Animals/Invertebrates > Arthropods > Crustaceans’, ‘Earth Science > Biological Classification > Plants > Macroalgae (Seaweeds)’] List of keywords
Returns dict Global attributes
Exported source
def get_attrs(
    tfm: Transformer, # Transformer object
    zotero_key: str, # Zotero dataset record key
    kw: list = kw # List of keywords
    ) -> dict: # Global attributes
    "Retrieve all global attributes."
    return GlobAttrsFeeder(tfm.dfs, cbs=[
        BboxCB(),
        DepthRangeCB(),
        TimeRangeCB(cfg()),
        ZoteroCB(zotero_key, cfg=cfg()),
        KeyValuePairCB('keywords', ', '.join(kw)),
        KeyValuePairCB('publisher_postprocess_logs', ', '.join(tfm.logs))
        ])()
get_attrs(tfm, zotero_key=zotero_key, kw=kw)
{'geospatial_lat_min': '31.17',
 'geospatial_lat_max': '65.75',
 'geospatial_lon_min': '9.6333',
 'geospatial_lon_max': '53.5',
 'geospatial_bounds': 'POLYGON ((9.6333 53.5, 31.17 53.5, 31.17 65.75, 9.6333 65.75, 9.6333 53.5))',
 'time_coverage_start': '1984-01-10T00:00:00',
 'time_coverage_end': '2021-12-15T00:00:00',
 'title': 'Environmental database - Helsinki Commission Monitoring of Radioactive Substances',
 'summary': 'MORS Environment database has been used to collate data resulting from monitoring of environmental radioactivity in the Baltic Sea based on HELCOM Recommendation 26/3.\n\nThe database is structured according to HELCOM Guidelines on Monitoring of Radioactive Substances (https://www.helcom.fi/wp-content/uploads/2019/08/Guidelines-for-Monitoring-of-Radioactive-Substances.pdf), which specifies reporting format, database structure, data types and obligatory parameters used for reporting data under Recommendation 26/3.\n\nThe database is updated and quality assured annually by HELCOM MORS EG.',
 'creator_name': '[{"creatorType": "author", "name": "HELCOM MORS"}]',
 'keywords': 'oceanography, Earth Science > Oceans > Ocean Chemistry> Radionuclides, Earth Science > Human Dimensions > Environmental Impacts > Nuclear Radiation Exposure, Earth Science > Oceans > Ocean Chemistry > Ocean Tracers, Earth Science > Oceans > Marine Sediments, Earth Science > Oceans > Ocean Chemistry, Earth Science > Oceans > Sea Ice > Isotopes, Earth Science > Oceans > Water Quality > Ocean Contaminants, Earth Science > Biological Classification > Animals/Vertebrates > Fish, Earth Science > Biosphere > Ecosystems > Marine Ecosystems, Earth Science > Biological Classification > Animals/Invertebrates > Mollusks, Earth Science > Biological Classification > Animals/Invertebrates > Arthropods > Crustaceans, Earth Science > Biological Classification > Plants > Macroalgae (Seaweeds)',
 'publisher_postprocess_logs': "Convert values from 'NUCLIDE' to lowercase, strip spaces, and store in 'None'., Parse and standardize time information in the dataframe., Encode time as `int` representing seconds since xxx, Sanitize value/measurement by removing blank entries and populating `value` column., Convert from relative error % to uncertainty of activity unit., Remap values from 'RUBIN' to 'species' for groups: b, i, o, t, a., Remap values from 'TISSUE' to 'body_part' for groups: b, i, o, t, a., Remap values from 'species' to 'bio_group' for groups: b, i, o, t, a., Update taxon information based on MARIS species LUT., Update sediment id based on MARIS species LUT (dbo_sedtype.xlsx)., Set the `unit` id column in the DataFrames based on a lookup table., Remap value type to MARIS format., Lookup FILT value in dataframe using the lookup table., Remap `KEY` column to `samplabcode` in each DataFrame., Record measurement notes by adding a 'measurenote' column to DataFrames., Remap Station ID to MARIS format., Remap Sediment slice top and bottom to MARIS format., Lookup dry-wet ratio and format for MARIS., \n    Get geographical coordinates from columns expressed in degrees decimal format \n    or from columns in degrees/minutes decimal format where degrees decimal format is missing.\n    , Drop row when both longitude & latitude equal 0. Drop unrealistic longitude & latitude values. Convert longitude & latitude `,` separator to `.` separator., Select and rename columns in a DataFrame based on renaming rules for a specified encoding type."}

source

enums_xtra

 enums_xtra (tfm:marisco.callbacks.Transformer, vars:list)

Retrieve a subset of the lengthy enum as species_t for instance.

Type Details
tfm Transformer Transformer object
vars list List of variables to extract from the transformer
Exported source
def enums_xtra(
    tfm: Transformer, # Transformer object
    vars: list # List of variables to extract from the transformer
    ):
    "Retrieve a subset of the lengthy enum as `species_t` for instance."
    enums = Enums(lut_src_dir=lut_path(), cdl_enums=cdl_cfg()['enums'])
    xtras = {}
    for var in vars:
        unique_vals = tfm.unique(var)
        if unique_vals.any():
            xtras[f'{var}_t'] = enums.filter(f'{var}_t', unique_vals)
    return xtras
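
enums_xtra is used by the NetCDF encoder below to restrict lengthy enums, such as species_t, to the values actually present in the transformed data. For instance:

xtras = enums_xtra(tfm, vars=['species', 'body_part'])
list(xtras)  # ['species_t', 'body_part_t'] when both variables contain values
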

Encoding NetCDF


source

encode

 encode (fname_in:str, fname_out_nc:str, nc_tpl_path:str, **kwargs)

Encode data to NetCDF.

Type Details
fname_in str Input file name
fname_out_nc str Output file name
nc_tpl_path str NetCDF template file name
kwargs Additional arguments
Returns None
Exported source
def encode(
    fname_in: str, # Input file name
    fname_out_nc: str, # Output file name
    nc_tpl_path: str, # NetCDF template file name
    **kwargs # Additional arguments
    ) -> None:
    "Encode data to NetCDF."
    dfs = load_data(fname_in)
    tfm = Transformer(dfs, cbs=[AddSampleTypeIdColumnCB(),
                            LowerStripNameCB(col_src='NUCLIDE'),
                            RemapNuclideNameCB(lut_nuclides),
                            AddNuclideIdColumnCB(col_value='NUCLIDE'),
                            ParseTimeCB(),
                            EncodeTimeCB(cfg()),
                            SanitizeValue(coi_val),       
                            NormalizeUncCB(),
                            RemapCB(fn_lut=lut_biota, col_remap='species', col_src='RUBIN', dest_grps='biota'),
                            RemapCB(lut_tissues, 'body_part', 'TISSUE', 'biota'),
                            RemapCB(lut_biogroup, 'bio_group', 'species', 'biota'),
                            RemapTaxonInformationCB(lut_taxon),
                            RemapSedimentCB(lut_sediments),
                            RemapUnitCB(),
                            RemapDetectionLimitCB(coi_dl, lut_dl),
                            RemapFiltCB(lut_filtered),
                            AddSampleLabCodeCB(),
                            AddMeasurementNoteCB(lut_method),
                            RemapStationIdCB(),
                            RemapSedSliceTopBottomCB(),
                            LookupDryWetRatio(),
                            ParseCoordinates(ddmm_to_dd),
                            SanitizeLonLatCB(),
                            SelectAndRenameColumnCB(get_renaming_rules, encoding_type='netcdf'),
                            ReshapeLongToWide()
                            ])
    tfm()
    encoder = NetCDFEncoder(tfm.dfs, 
                            src_fname=nc_tpl_path,
                            dest_fname=fname_out_nc, 
                            global_attrs=get_attrs(tfm, zotero_key=zotero_key, kw=kw),
                            verbose=kwargs.get('verbose', False),
                            enums_xtra=enums_xtra(tfm, vars=['species', 'body_part'])
                           )
    encoder.encode()
encode(fname_in, fname_out_nc, nc_tpl_path(), verbose=False)

Open Refine Pipeline (WIP)

Rename columns for Open Refine

dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[
    AddSampleTypeIdColumnCB(),
    LowerStripNameCB(col_src='NUCLIDE'),
    RemapNuclideNameCB(lut_nuclides),
    AddNuclideIdColumnCB(col_value='NUCLIDE'),
    ParseTimeCB(),
    EncodeTimeCB(cfg()),        
    SanitizeValue(coi_val),                       
    NormalizeUncCB(),
    RemapCB(fn_lut=lut_biota, col_remap='species', col_src='RUBIN', dest_grps='biota'),
    RemapCB(lut_tissues, 'body_part', 'TISSUE', 'biota'),
    RemapCB(lut_biogroup, 'bio_group', 'species', 'biota'),
    RemapTaxonInformationCB(lut_taxon),
    RemapSedimentCB(lut_sediments),
    RemapUnitCB(),
    RemapDetectionLimitCB(coi_dl, lut_dl),
    RemapFiltCB(lut_filtered),
    AddSampleLabCodeCB(),
    AddMeasurementNoteCB(lut_method),
    RemapStationIdCB(),
    RemapSedSliceTopBottomCB(),
    LookupDryWetRatio(),
    ParseCoordinates(ddmm_to_dd),
    SanitizeLonLatCB(),
    SelectAndRenameColumnCB(get_renaming_rules, encoding_type='openrefine', verbose=True),
    CompareDfsAndTfmCB(dfs)
    ])

tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')

Group 'seawater' has the following renaming rules not applied:
Key 'measurenote' from renaming rules was not found in the DataFrame.

Group 'sediment' has the following renaming rules not applied:
Key 'SDEPTH' from renaming rules was not found in the DataFrame.
Key 'measurenote' from renaming rules was not found in the DataFrame.
Key 'TTEMP' from renaming rules was not found in the DataFrame.
Key 'FILT' from renaming rules was not found in the DataFrame.
Key 'SALIN' from renaming rules was not found in the DataFrame.

Group 'biota' has the following renaming rules not applied:
Key 'TDEPTH' from renaming rules was not found in the DataFrame.
Key 'measurenote' from renaming rules was not found in the DataFrame.
Key 'TTEMP' from renaming rules was not found in the DataFrame.
Key 'FILT' from renaming rules was not found in the DataFrame.
Key 'SALIN' from renaming rules was not found in the DataFrame.
                                                    seawater  sediment  biota
Number of rows in dfs                                  21216     39817  15827
Number of rows in tfm.dfs                              21114     39531  15798
Number of dropped rows                                   102       286     29
Number of rows in tfm.dfs + Number of dropped rows     21216     39817  15827 

Example of data included in dfs_dropped.

Main reasons for data to be dropped from dfs:

  • No activity value reported (e.g. VALUE_Bq/kg)
  • No time value reported
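
Those reasons can be verified directly on dfs_dropped, e.g. by counting sediment rows with no reported activity at all (a quick check, not part of the pipeline):

dropped = tfm.dfs_dropped['sediment']
no_activity = dropped[['VALUE_Bq/kg', 'VALUE_Bq/m²']].isna().all(axis=1)
print(f'{no_activity.sum()} of {len(dropped)} dropped sediment rows report no activity value')
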

grp='sediment'
#grp='seawater'
#grp='biota'

tfm.dfs_dropped[grp]
KEY NUCLIDE METHOD < VALUE_Bq/kg VALUE_Bq/kg ERROR%_kg < VALUE_Bq/m² VALUE_Bq/m² ERROR%_m² DATE_OF_ENTRY_x ... LOWSLI AREA SEDI OXIC DW% LOI% MORS_SUBBASIN HELCOM_SUBBASIN SUM_LINK DATE_OF_ENTRY_y
11784 SLREB1998021 SR90 2 NaN NaN NaN NaN NaN NaN NaN ... 12.0 0.02100 55.0 O NaN NaN 14.0 14.0 a NaN
11824 SLVDC1997023 CS137 1 NaN NaN NaN NaN NaN NaN NaN ... 14.0 0.02100 55.0 O NaN NaN 9.0 9.0 a NaN
11832 SLVDC1997031 CS137 1 NaN NaN NaN NaN NaN NaN NaN ... 14.0 0.02100 55.0 O NaN NaN 9.0 9.0 a NaN
11841 SLVDC1997040 CS137 1 NaN NaN NaN NaN NaN NaN NaN ... 16.0 0.02100 55.0 O NaN NaN 9.0 9.0 a NaN
11849 SLVDC1998011 CS137 1 NaN NaN NaN NaN NaN NaN NaN ... 16.0 0.02100 55.0 O NaN NaN 14.0 14.0 a NaN
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
39769 SSSSM2021030 CO60 SSSM43 < NaN NaN < NaN NaN 09/06/22 00:00:00 ... 2.0 0.01608 NaN NaN 28.200000 15.0 12.0 12.0 NaN 09/06/22 00:00:00
39774 SSSSM2021030 RA226 SSSM43 < NaN NaN < NaN NaN 09/06/22 00:00:00 ... 2.0 0.01608 NaN NaN 28.200000 15.0 12.0 12.0 NaN 09/06/22 00:00:00
39775 SSSSM2021030 RA223 SSSM43 < NaN NaN < NaN NaN 09/06/22 00:00:00 ... 2.0 0.01608 NaN NaN 28.200000 15.0 12.0 12.0 NaN 09/06/22 00:00:00
39777 SSSSM2021031 CS137 SSSM43 < NaN NaN < 0.0 NaN 09/06/22 00:00:00 ... 2.0 0.01608 NaN NaN 31.993243 NaN 13.0 13.0 NaN 09/06/22 00:00:00
39779 SSSSM2021031 CO60 SSSM43 < NaN NaN < NaN NaN 09/06/22 00:00:00 ... 2.0 0.01608 NaN NaN 31.993243 NaN 13.0 13.0 NaN 09/06/22 00:00:00

286 rows × 35 columns

Open Refine encoder (WIP)

def encode_or(
    fname_in: str, # Input file name
    fname_out_csv: str, # Output file name
    ref_id: int, # Reference ID as defined in MARIS master DB
    **kwargs # Additional arguments
    ) -> None:
    "Encode data to Open Refine CSV."
    dfs = load_data(fname_in)
    tfm = Transformer(dfs, cbs=[
        AddSampleTypeIdColumnCB(),
        LowerStripNameCB(col_src='NUCLIDE'),
        RemapNuclideNameCB(lut_nuclides),
        AddNuclideIdColumnCB(col_value='NUCLIDE'),
        ParseTimeCB(),
        EncodeTimeCB(cfg()),        
        SanitizeValue(coi_val),                       
        NormalizeUncCB(),
        RemapCB(fn_lut=lut_biota, col_remap='species', col_src='RUBIN', dest_grps='biota'),
        RemapCB(lut_tissues, 'body_part', 'TISSUE', 'biota'),
        RemapCB(lut_biogroup, 'bio_group', 'species', 'biota'),
        RemapTaxonInformationCB(lut_taxon),
        RemapSedimentCB(lut_sediments),
        RemapUnitCB(),
        RemapDetectionLimitCB(coi_dl, lut_dl),
        RemapFiltCB(lut_filtered),
        AddSampleLabCodeCB(),
        AddMeasurementNoteCB(lut_method),
        RemapStationIdCB(),
        RemapSedSliceTopBottomCB(),
        LookupDryWetRatio(),
        ParseCoordinates(ddmm_to_dd),
        SanitizeLonLatCB(),
        SelectAndRenameColumnCB(get_renaming_rules, encoding_type='openrefine', verbose=True),
        CompareDfsAndTfmCB(dfs)
        ])
    
    tfm()

    encoder = OpenRefineCsvEncoder(tfm.dfs, 
                                    dest_fname=fname_out_csv, 
                                    ref_id = ref_id,
                                    verbose = True
                                )
    encoder.encode()
encode_or(fname_in, fname_out_csv, ref_id, verbose=True)

Open Refine Variables not included in Helcom

Field name Full name HELCOM
sampquality Sample quality N
lab_id Laboratory ID N
profile_id Profile ID N
transect_id Transect ID N
endperiod End period N
vartype Variable type N
freq Frequency N
rl_detection Range low detection N
rangelow Range low N
rangeupp Range upper N
Commonname Common name N
volume Volume N
filtpore Filter pore N
acid Acidified N
oxygen Oxygen N
samparea Sample area N
drywt Dry weight N
wetwt Wet weight N
sampmet_id Sampling method ID N
drymet_id Drying method ID N
prepmet_id Preparation method ID N
counmet_id Counting method ID N
refnote Reference note N
sampnote Sample note N
gfe Good for export ?

TODO:

  • Should we use a single encoder for both NetCDF and OpenRefine? If so, should we have a single encode function that accepts an ‘encoding_type’ argument?

TODO: Include FILT for NetCDF

TODO: Check sediment ‘DW%’ data that is less than 1%. Is this realistic? Also check the ‘DW%’ data that is 0%. Run the check below on the raw frames, before SelectAndRenameColumnCB.
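
A quick check along those lines (a sketch on the raw sediment frame, before the DW% column is dropped):

df_sed = load_data(fname_in)['sediment']
print((df_sed['DW%'] == 0).sum(), 'rows with DW% equal to 0%')
print(df_sed.loc[(df_sed['DW%'] > 0) & (df_sed['DW%'] < 1), 'DW%'].describe())
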

dfs = load_data(fname_in)
tfm = Transformer(dfs, cbs=[LowerStripNameCB(col_src='NUCLIDE'),
                            ])
tfm()
{'seawater':                 KEY NUCLIDE METHOD < VALUE_Bq/m³  VALUE_Bq/m³  ERROR%_m³  \
 0      WKRIL2012003   cs137    NaN           NaN          5.3  32.000000   
 1      WKRIL2012004   cs137    NaN           NaN         19.9  20.000000   
 2      WKRIL2012005   cs137    NaN           NaN         25.5  20.000000   
 3      WKRIL2012006   cs137    NaN           NaN         17.0  29.000000   
 4      WKRIL2012007   cs137    NaN           NaN         22.2  18.000000   
 ...             ...     ...    ...           ...          ...        ...   
 21211  WSSSM2021005      h3  SSM45           NaN       1030.0  93.203883   
 21212  WSSSM2021006      h3  SSM45           NaN       2240.0  43.303571   
 21213  WSSSM2021007      h3  SSM45           NaN       2060.0  47.087379   
 21214  WSSSM2021008      h3  SSM45           NaN       2300.0  43.478261   
 21215  WSSSM2021004      h3  SSM45             <          NaN        NaN   
 
          DATE_OF_ENTRY_x  COUNTRY LABORATORY   SEQUENCE  ...  \
 0      08/20/14 00:00:00     90.0       KRIL  2012003.0  ...   
 1      08/20/14 00:00:00     90.0       KRIL  2012004.0  ...   
 2      08/20/14 00:00:00     90.0       KRIL  2012005.0  ...   
 3      08/20/14 00:00:00     90.0       KRIL  2012006.0  ...   
 4      08/20/14 00:00:00     90.0       KRIL  2012007.0  ...   
 ...                  ...      ...        ...        ...  ...   
 21211  09/06/22 00:00:00     77.0       SSSM   202105.0  ...   
 21212  09/06/22 00:00:00     77.0       SSSM   202106.0  ...   
 21213  09/06/22 00:00:00     77.0       SSSM   202107.0  ...   
 21214  09/06/22 00:00:00     77.0       SSSM   202108.0  ...   
 21215  09/06/22 00:00:00     77.0       SSSM   202104.0  ...   
 
       LONGITUDE (ddmmmm)  LONGITUDE (dddddd)  TDEPTH  SDEPTH SALIN  TTEMP  \
 0                29.2000             29.3333     NaN     0.0   NaN    NaN   
 1                29.2000             29.3333     NaN    29.0   NaN    NaN   
 2                23.0900             23.1500     NaN     0.0   NaN    NaN   
 3                27.5900             27.9833     NaN     0.0   NaN    NaN   
 4                27.5900             27.9833     NaN    39.0   NaN    NaN   
 ...                  ...                 ...     ...     ...   ...    ...   
 21211            18.2143             18.3572     NaN     1.0   NaN    NaN   
 21212            17.0000             17.0000     NaN     1.0   NaN    NaN   
 21213            11.5671             11.9452     NaN     1.0   NaN    NaN   
 21214            11.5671             11.9452     NaN     1.0   NaN    NaN   
 21215            11.1470             11.2450     NaN     1.0   NaN    NaN   
 
        FILT  MORS_SUBBASIN  HELCOM_SUBBASIN    DATE_OF_ENTRY_y  
 0       NaN           11.0             11.0  08/20/14 00:00:00  
 1       NaN           11.0             11.0  08/20/14 00:00:00  
 2       NaN           11.0              3.0  08/20/14 00:00:00  
 3       NaN           11.0             11.0  08/20/14 00:00:00  
 4       NaN           11.0             11.0  08/20/14 00:00:00  
 ...     ...            ...              ...                ...  
 21211     N            1.0              8.0  09/06/22 00:00:00  
 21212     N           10.0             10.0  09/06/22 00:00:00  
 21213     N           12.0             12.0  09/06/22 00:00:00  
 21214     N           12.0             12.0  09/06/22 00:00:00  
 21215     N           15.0             18.0  09/06/22 00:00:00  
 
 [21216 rows x 27 columns],
 'sediment':                 KEY NUCLIDE  METHOD < VALUE_Bq/kg  VALUE_Bq/kg  ERROR%_kg  \
 0      SKRIL2012048   ra226     NaN           NaN         35.0      26.00   
 1      SKRIL2012049   ra226     NaN           NaN         36.0      22.00   
 2      SKRIL2012050   ra226     NaN           NaN         38.0      24.00   
 3      SKRIL2012051   ra226     NaN           NaN         36.0      25.00   
 4      SKRIL2012052   ra226     NaN           NaN         30.0      23.00   
 ...             ...     ...     ...           ...          ...        ...   
 39812  SSSSM2020029   ac228  SSSM43           NaN         37.5       5.00   
 39813  SSSSM2020030     k40  SSSM43           NaN        526.0       1.72   
 39814  SSSSM2020030   cs137  SSSM43           NaN         17.2       2.21   
 39815  SSSSM2020031     k40  SSSM43           NaN       1000.0       1.80   
 39816  SSSSM2020031   cs137  SSSM43           NaN         64.0       1.20   
 
       < VALUE_Bq/m²  VALUE_Bq/m²  ERROR%_m²    DATE_OF_ENTRY_x  ...  LOWSLI  \
 0               NaN          NaN        NaN  08/20/14 00:00:00  ...    20.0   
 1               NaN          NaN        NaN  08/20/14 00:00:00  ...    27.0   
 2               NaN          NaN        NaN  08/20/14 00:00:00  ...     2.0   
 3               NaN          NaN        NaN  08/20/14 00:00:00  ...     4.0   
 4               NaN          NaN        NaN  08/20/14 00:00:00  ...     6.0   
 ...             ...          ...        ...                ...  ...     ...   
 39812           NaN        255.0       28.0  04/22/22 00:00:00  ...     2.0   
 39813           NaN       5690.0        2.0  04/22/22 00:00:00  ...     2.0   
 39814           NaN        186.0        2.0  04/22/22 00:00:00  ...     2.0   
 39815           NaN      16000.0        2.0  04/22/22 00:00:00  ...     2.0   
 39816           NaN       1020.0        1.0  04/22/22 00:00:00  ...     2.0   
 
         AREA  SEDI OXIC    DW%  LOI%  MORS_SUBBASIN HELCOM_SUBBASIN  SUM_LINK  \
 0      0.006   NaN  NaN    NaN   NaN           11.0            11.0       NaN   
 1      0.006   NaN  NaN    NaN   NaN           11.0            11.0       NaN   
 2      0.006   NaN  NaN    NaN   NaN           11.0            11.0       NaN   
 3      0.006   NaN  NaN    NaN   NaN           11.0            11.0       NaN   
 4      0.006   NaN  NaN    NaN   NaN           11.0            11.0       NaN   
 ...      ...   ...  ...    ...   ...            ...             ...       ...   
 39812  0.019   0.0    O  28.73  14.0           13.0            13.0       NaN   
 39813  0.019   0.0    O  32.03   NaN           12.0            12.0       NaN   
 39814  0.019   0.0    O  32.03   NaN           12.0            12.0       NaN   
 39815  0.017   0.0    O  48.77   NaN            1.0             8.0       NaN   
 39816  0.017   0.0    O  48.77   NaN            1.0             8.0       NaN   
 
          DATE_OF_ENTRY_y  
 0      08/20/14 00:00:00  
 1      08/20/14 00:00:00  
 2      08/20/14 00:00:00  
 3      08/20/14 00:00:00  
 4      08/20/14 00:00:00  
 ...                  ...  
 39812  04/22/22 00:00:00  
 39813  04/22/22 00:00:00  
 39814  04/22/22 00:00:00  
 39815  04/22/22 00:00:00  
 39816  04/22/22 00:00:00  
 
 [39817 rows x 35 columns],
 'biota':                 KEY NUCLIDE  METHOD < VALUE_Bq/kg  VALUE_Bq/kg BASIS  ERROR%  \
 0      BVTIG2012041   cs134  VTIG01             <     0.010140     W     NaN   
 1      BVTIG2012041     k40  VTIG01                 135.300000     W    3.57   
 2      BVTIG2012041    co60  VTIG01             <     0.013980     W     NaN   
 3      BVTIG2012041   cs137  VTIG01                   4.338000     W    3.48   
 4      BVTIG2012040   cs134  VTIG01             <     0.009614     W     NaN   
 ...             ...     ...     ...           ...          ...   ...     ...   
 15822  BSSSM2020016     k40  SSSM42           NaN    65.000000     D   10.20   
 15823  BSSSM2020016   cs137  SSSM42           NaN     4.500000     D    6.20   
 15824  BSSSM2020017     be7  SSSM42           NaN    94.000000     D    3.40   
 15825  BSSSM2020017     k40  SSSM42           NaN  1100.000000     D    1.60   
 15826  BSSSM2020017   cs137  SSSM42           NaN    13.000000     D    2.50   
 
        NUMBER    DATE_OF_ENTRY_x  COUNTRY  ... BIOTATYPE  TISSUE     NO  \
 0         NaN  02/27/14 00:00:00      6.0  ...         F       5   16.0   
 1         NaN  02/27/14 00:00:00      6.0  ...         F       5   16.0   
 2         NaN  02/27/14 00:00:00      6.0  ...         F       5   16.0   
 3         NaN  02/27/14 00:00:00      6.0  ...         F       5   16.0   
 4         NaN  02/27/14 00:00:00      6.0  ...         F       5   17.0   
 ...       ...                ...      ...  ...       ...     ...    ...   
 15822     NaN  04/22/22 00:00:00     77.0  ...         B      41  319.0   
 15823     NaN  04/22/22 00:00:00     77.0  ...         B      41  319.0   
 15824     NaN  04/22/22 00:00:00     77.0  ...         P      51    NaN   
 15825     NaN  04/22/22 00:00:00     77.0  ...         P      51    NaN   
 15826     NaN  04/22/22 00:00:00     77.0  ...         P      51    NaN   
 
        LENGTH  WEIGHT     DW%  LOI%  MORS_SUBBASIN  HELCOM_SUBBASIN  \
 0        45.7   948.0  18.453  92.9            2.0               16   
 1        45.7   948.0  18.453  92.9            2.0               16   
 2        45.7   948.0  18.453  92.9            2.0               16   
 3        45.7   948.0  18.453  92.9            2.0               16   
 4        45.9   964.0  18.458  92.9            2.0               16   
 ...       ...     ...     ...   ...            ...              ...   
 15822     NaN     NaN  41.000   0.0            1.0                8   
 15823     NaN     NaN  41.000   0.0            1.0                8   
 15824     NaN     NaN  21.000   0.0            1.0                8   
 15825     NaN     NaN  21.000   0.0            1.0                8   
 15826     NaN     NaN  21.000   0.0            1.0                8   
 
          DATE_OF_ENTRY_y  
 0      02/27/14 00:00:00  
 1      02/27/14 00:00:00  
 2      02/27/14 00:00:00  
 3      02/27/14 00:00:00  
 4      02/27/14 00:00:00  
 ...                  ...  
 15822  04/22/22 00:00:00  
 15823  04/22/22 00:00:00  
 15824  04/22/22 00:00:00  
 15825  04/22/22 00:00:00  
 15826  04/22/22 00:00:00  
 
 [15827 rows x 33 columns]}
grp = 'sediment'
check_data_sediment = tfm.dfs[grp][(tfm.dfs[grp]['DW%'] < 1) & (tfm.dfs[grp]['DW%'] > 0.001)]
check_data_sediment
|   | KEY | NUCLIDE | METHOD | < VALUE_Bq/kg | VALUE_Bq/kg | ERROR%_kg | < VALUE_Bq/m² | VALUE_Bq/m² | ERROR%_m² | DATE_OF_ENTRY_x | ... | LOWSLI | AREA | SEDI | OXIC | DW% | LOI% | MORS_SUBBASIN | HELCOM_SUBBASIN | SUM_LINK | DATE_OF_ENTRY_y |
|---|-----|---------|--------|---------------|-------------|-----------|----------------|-------------|-----------|-----------------|-----|--------|------|------|------|-----|------|---------------|-----------------|----------|-----------------|
| 30938 | SLVEA2010001 | cs137 | LVEA01 | NaN | 334.25 | 1.57 | NaN | 131.886 | 41179.0 | NaN | ... | 2.0 | 0.0151 | 5.0 | O | 0.115 | 0.9 | 14.0 | 14.0 | NaN | 11/11/11 00:00:00 |
| 30939 | SLVEA2010002 | cs137 | LVEA01 | NaN | 343.58 | 1.49 | NaN | 132.092 | 41179.0 | NaN | ... | 4.0 | 0.0151 | 5.0 | A | 0.159 | 0.8 | 14.0 | 14.0 | NaN | 11/11/11 00:00:00 |
| 30940 | SLVEA2010003 | cs137 | LVEA01 | NaN | 334.69 | 1.56 | NaN | 134.390 | 41179.0 | NaN | ... | 6.0 | 0.0151 | 5.0 | A | 0.189 | 0.8 | 14.0 | 14.0 | NaN | 11/11/11 00:00:00 |
| 30941 | SLVEA2010004 | cs137 | LVEA01 | NaN | 348.50 | 1.56 | NaN | 136.699 | 41179.0 | NaN | ... | 8.0 | 0.0151 | 5.0 | A | 0.194 | 0.8 | 14.0 | 14.0 | NaN | 11/11/11 00:00:00 |
| 30942 | SLVEA2010005 | cs137 | LVEA01 | NaN | 258.67 | 1.73 | NaN | 104.894 | 41179.0 | NaN | ... | 10.0 | 0.0151 | 5.0 | A | 0.195 | 0.8 | 14.0 | 14.0 | NaN | 11/11/11 00:00:00 |
| 30943 | SLVEA2010006 | cs137 | LVEA01 | NaN | 182.02 | 2.05 | NaN | 77.523 | 41179.0 | NaN | ... | 12.0 | 0.0151 | 5.0 | A | 0.221 | 0.8 | 14.0 | 14.0 | NaN | 11/11/11 00:00:00 |
| 30944 | SLVEA2010007 | cs137 | LVEA01 | NaN | 116.34 | 2.79 | NaN | 46.946 | 41179.0 | NaN | ... | 14.0 | 0.0151 | 5.0 | A | 0.238 | 0.8 | 14.0 | 14.0 | NaN | 11/11/11 00:00:00 |
| 30945 | SLVEA2010008 | cs137 | LVEA01 | NaN | 94.07 | 2.61 | NaN | 38.162 | 41179.0 | NaN | ... | 16.0 | 0.0151 | 5.0 | A | 0.234 | 0.8 | 14.0 | 14.0 | NaN | 11/11/11 00:00:00 |
| 30946 | SLVEA2010009 | cs137 | LVEA01 | NaN | 69.70 | 3.12 | NaN | 27.444 | 41179.0 | NaN | ... | 18.0 | 0.0151 | 5.0 | A | 0.242 | 0.8 | 14.0 | 14.0 | NaN | 11/11/11 00:00:00 |
| 30947 | SLVEA2010010 | cs137 | LVEA01 | NaN | 59.63 | 3.40 | NaN | 24.220 | 41179.0 | NaN | ... | 20.0 | 0.0151 | 5.0 | A | 0.257 | 0.7 | 14.0 | 14.0 | NaN | 11/11/11 00:00:00 |
| 30948 | SLVEA2010011 | cs137 | LVEA01 | < | 12.24 | 3.88 | < | 5.035 | 41179.0 | NaN | ... | 22.0 | 0.0151 | 5.0 | A | 0.264 | 0.7 | 14.0 | 14.0 | NaN | 11/11/11 00:00:00 |
| 30949 | SLVEA2010012 | cs137 | LVEA01 | < | 0.83 | NaN | < | 0.330 | 41179.0 | NaN | ... | 24.0 | 0.0151 | 5.0 | A | 0.244 | 0.8 | 14.0 | 14.0 | NaN | 11/11/11 00:00:00 |
| 30950 | SLVEA2010013 | cs137 | LVEA01 | NaN | 331.61 | 1.40 | NaN | 125.566 | 41179.0 | NaN | ... | 2.0 | 0.0151 | 5.0 | O | 0.115 | 0.9 | 14.0 | 14.0 | NaN | 11/11/11 00:00:00 |
| 30951 | SLVEA2010014 | cs137 | LVEA01 | NaN | 352.06 | 1.33 | NaN | 144.516 | 41179.0 | NaN | ... | 4.0 | 0.0151 | 5.0 | A | 0.164 | 0.8 | 14.0 | 14.0 | NaN | 11/11/11 00:00:00 |
| 30952 | SLVEA2010015 | cs137 | LVEA01 | NaN | 367.11 | 1.36 | NaN | 139.434 | 41179.0 | NaN | ... | 6.0 | 0.0151 | 5.0 | A | 0.191 | 0.8 | 14.0 | 14.0 | NaN | 11/11/11 00:00:00 |
| 30953 | SLVEA2010016 | cs137 | LVEA01 | NaN | 328.97 | 1.42 | NaN | 124.348 | 41179.0 | NaN | ... | 8.0 | 0.0151 | 5.0 | A | 0.188 | 0.8 | 14.0 | 14.0 | NaN | 11/11/11 00:00:00 |
| 30954 | SLVEA2010017 | cs137 | LVEA01 | NaN | 356.30 | 1.37 | NaN | 135.447 | 41179.0 | NaN | ... | 10.0 | 0.0151 | 5.0 | A | 0.179 | 0.8 | 14.0 | 14.0 | NaN | 11/11/11 00:00:00 |
| 30955 | SLVEA2010018 | cs137 | LVEA01 | NaN | 314.75 | 1.42 | NaN | 118.765 | 41179.0 | NaN | ... | 12.0 | 0.0151 | 5.0 | A | 0.186 | 0.8 | 14.0 | 14.0 | NaN | 11/11/11 00:00:00 |
| 30956 | SLVEA2010019 | cs137 | LVEA01 | NaN | 261.64 | 1.52 | NaN | 104.580 | 41179.0 | NaN | ... | 14.0 | 0.0151 | 5.0 | A | 0.194 | 0.8 | 14.0 | 14.0 | NaN | 11/11/11 00:00:00 |
| 30957 | SLVEA2010020 | cs137 | LVEA01 | NaN | 181.00 | 1.76 | NaN | 74.058 | 41179.0 | NaN | ... | 16.0 | 0.0151 | 5.0 | A | 0.209 | 0.8 | 14.0 | 14.0 | NaN | 11/11/11 00:00:00 |
| 30958 | SLVEA2010021 | cs137 | LVEA01 | NaN | 143.65 | 2.02 | NaN | 57.680 | 41179.0 | NaN | ... | 18.0 | 0.0151 | 5.0 | A | 0.214 | 0.8 | 14.0 | 14.0 | NaN | 11/11/11 00:00:00 |
| 30959 | SLVEA2010022 | cs137 | LVEA01 | NaN | 109.36 | 2.15 | NaN | 42.153 | 41179.0 | NaN | ... | 20.0 | 0.0151 | 5.0 | A | 0.218 | 0.8 | 14.0 | 14.0 | NaN | 11/11/11 00:00:00 |
| 30960 | SLVEA2010023 | cs137 | LVEA01 | NaN | 94.12 | 1.39 | NaN | 35.873 | 41179.0 | NaN | ... | 22.0 | 0.0151 | 5.0 | A | 0.212 | 0.8 | 14.0 | 14.0 | NaN | 11/11/11 00:00:00 |
| 30961 | SLVEA2010024 | cs137 | LVEA01 | NaN | 96.63 | 1.35 | NaN | 38.864 | 41179.0 | NaN | ... | 24.0 | 0.0151 | 5.0 | A | 0.217 | 0.8 | 14.0 | 14.0 | NaN | 11/11/11 00:00:00 |

24 rows × 35 columns

grp = 'sediment'
check_data_sediment = tfm.dfs[grp][tfm.dfs[grp]['DW%'] == 0]
check_data_sediment
|   | KEY | NUCLIDE | METHOD | < VALUE_Bq/kg | VALUE_Bq/kg | ERROR%_kg | < VALUE_Bq/m² | VALUE_Bq/m² | ERROR%_m² | DATE_OF_ENTRY_x | ... | LOWSLI | AREA | SEDI | OXIC | DW% | LOI% | MORS_SUBBASIN | HELCOM_SUBBASIN | SUM_LINK | DATE_OF_ENTRY_y |
|---|-----|---------|--------|---------------|-------------|-----------|----------------|-------------|-----------|-----------------|-----|--------|------|------|------|-----|------|---------------|-----------------|----------|-----------------|
| 9824 | SERPC1997001 | cs134 | NaN | NaN | 3.80 | 20.0 | NaN | 5.75 | NaN | NaN | ... | 2.0 | 0.008 | 5.0 | A | 0.0 | 0.0 | 11.0 | 11.0 | a | NaN |
| 9825 | SERPC1997001 | cs137 | NaN | NaN | 389.00 | 4.0 | NaN | 589.00 | NaN | NaN | ... | 2.0 | 0.008 | 5.0 | A | 0.0 | 0.0 | 11.0 | 11.0 | a | NaN |
| 9826 | SERPC1997002 | cs134 | NaN | NaN | 4.78 | 13.0 | NaN | 12.00 | NaN | NaN | ... | 4.0 | 0.008 | 5.0 | A | 0.0 | 0.0 | 11.0 | 11.0 | a | NaN |
| 9827 | SERPC1997002 | cs137 | NaN | NaN | 420.00 | 4.0 | NaN | 1060.00 | NaN | NaN | ... | 4.0 | 0.008 | 5.0 | A | 0.0 | 0.0 | 11.0 | 11.0 | a | NaN |
| 9828 | SERPC1997003 | cs134 | NaN | NaN | 3.12 | 17.0 | NaN | 12.00 | NaN | NaN | ... | 6.0 | 0.008 | 5.0 | A | 0.0 | 0.0 | 11.0 | 11.0 | a | NaN |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 15257 | SKRIL1999062 | th228 | 1 | NaN | 68.00 | NaN | NaN | NaN | NaN | NaN | ... | 15.0 | 0.006 | 0.0 | O | 0.0 | 0.0 | 11.0 | 11.0 | a | NaN |
| 15258 | SKRIL1999063 | k40 | 1 | NaN | 1210.00 | NaN | NaN | NaN | NaN | NaN | ... | 21.5 | 0.006 | 0.0 | O | 0.0 | 0.0 | 11.0 | 11.0 | a | NaN |
| 15259 | SKRIL1999063 | ra226 | KRIL01 | NaN | 56.50 | NaN | NaN | NaN | NaN | NaN | ... | 21.5 | 0.006 | 0.0 | O | 0.0 | 0.0 | 11.0 | 11.0 | a | NaN |
| 15260 | SKRIL1999063 | ra228 | KRIL01 | NaN | 72.20 | NaN | NaN | NaN | NaN | NaN | ... | 21.5 | 0.006 | 0.0 | O | 0.0 | 0.0 | 11.0 | 11.0 | a | NaN |
| 15261 | SKRIL1999063 | th228 | 1 | NaN | 74.20 | NaN | NaN | NaN | NaN | NaN | ... | 21.5 | 0.006 | 0.0 | O | 0.0 | 0.0 | 11.0 | 11.0 | a | NaN |

302 rows × 35 columns

grp = 'biota'
check_data_biota = tfm.dfs[grp][tfm.dfs[grp]['DW%'] == 0]
check_data_biota
|   | KEY | NUCLIDE | METHOD | < VALUE_Bq/kg | VALUE_Bq/kg | BASIS | ERROR% | NUMBER | DATE_OF_ENTRY_x | COUNTRY | ... | BIOTATYPE | TISSUE | NO | LENGTH | WEIGHT | DW% | LOI% | MORS_SUBBASIN | HELCOM_SUBBASIN | DATE_OF_ENTRY_y |
|---|-----|---------|--------|---------------|-------------|-------|--------|--------|-----------------|---------|-----|-----------|--------|----|--------|--------|-----|------|---------------|-----------------|-----------------|
| 5971 | BERPC1997002 | k40 | NaN | NaN | 116.00 | W | 3.0 | NaN | NaN | 91.0 | ... | F | 5 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 11.0 | 11 | NaN |
| 5972 | BERPC1997002 | cs137 | NaN | NaN | 12.60 | W | 4.0 | NaN | NaN | 91.0 | ... | F | 5 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 11.0 | 11 | NaN |
| 5973 | BERPC1997002 | cs134 | NaN | NaN | 0.14 | W | 18.0 | NaN | NaN | 91.0 | ... | F | 5 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 11.0 | 11 | NaN |
| 5974 | BERPC1997001 | k40 | NaN | NaN | 116.00 | W | 4.0 | NaN | NaN | 91.0 | ... | F | 5 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 11.0 | 11 | NaN |
| 5975 | BERPC1997001 | cs137 | NaN | NaN | 12.00 | W | 4.0 | NaN | NaN | 91.0 | ... | F | 5 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 11.0 | 11 | NaN |
| 5976 | BERPC1997001 | cs134 | NaN | NaN | 0.21 | W | 24.0 | NaN | NaN | 91.0 | ... | F | 5 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 11.0 | 11 | NaN |

6 rows × 33 columns
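
If these zero and sub-1% ‘DW%’ values turn out to be data-entry artifacts rather than real measurements, one option (a sketch only, not applied by the handler; the threshold is illustrative) is to blank them out before encoding:

import numpy as np

for grp in ['sediment', 'biota']:
    suspicious = tfm.dfs[grp]['DW%'] < 1  # also catches the DW% == 0 rows
    # Replace suspicious dry-weight percentages with NaN so they are not
    # propagated into the encoded outputs.
    tfm.dfs[grp].loc[suspicious, 'DW%'] = np.nan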