This data pipeline, known as a “handler” in Marisco terminology, is designed to clean, standardize, and encode HELCOM data into NetCDF format. The handler processes raw HELCOM data, applying various transformations and lookups to align it with MARIS data standards.

Key functions of this handler:

  • load the HELCOM CSV exports and merge sample and measurement tables for each sample type (biota, seawater, sediment);
  • standardize time, measurement values, uncertainties, units, and detection limits;
  • remap nuclide names, biota species, body parts, and sediment types to MARIS nomenclatures;
  • encode the harmonized data to NetCDF.

This handler is a crucial component in the Marisco data processing workflow, ensuring HELCOM data is properly integrated into the MARIS database.

Getting Started

The present notebook aims to be an instance of Literate Programming, in the sense that it is a narrative interspersing code snippets with explanations. When a function or a class needs to be exported to a dedicated Python module (in our case marisco/handlers/helcom.py), the code snippet is added to the module using the #| exports directive provided by the wonderful nbdev library.
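
For illustration, a notebook cell flagged as below is both rendered in the documentation and written to the target module when nbdev_export runs (the function name here is hypothetical):

#| exports
def a_function():
    "Cells flagged with `#| exports` are appended to marisco/handlers/helcom.py by nbdev."
    pass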

Configuration & file paths

  • src_dir: path to the maris-crawlers folder containing the HELCOM data in CSV format.

  • fname_out: path and filename for the NetCDF output. The path can be defined as a relative path.

  • Zotero key: used to retrieve attributes related to the dataset from Zotero. The MARIS datasets are catalogued in a library available on Zotero.

Exported source
src_dir = 'https://raw.githubusercontent.com/franckalbinet/maris-crawlers/refs/heads/main/data/processed/HELCOM%20MORS'
fname_out = '../../_data/output/100-HELCOM-MORS-2024.nc'
zotero_key = '26VMZZ2Q' # HELCOM MORS zotero key

Load data

HELCOM MORS (Monitoring of Radioactive Substances in the Baltic Sea) data is provided as a zipped Microsoft Access database. We automatically fetch and convert this dataset, exporting the database tables as .csv files, using a GitHub Action here: maris-crawlers.

The dataset is then accessible in a format amenable to the marisco data pipeline.


source

read_csv

 read_csv (file_name,
           dir='https://raw.githubusercontent.com/franckalbinet/maris-
           crawlers/refs/heads/main/data/processed/HELCOM%20MORS')
Exported source
default_smp_types = {  
    'BIO': 'BIOTA', 
    'SEA': 'SEAWATER', 
    'SED': 'SEDIMENT'
}
Exported source
def read_csv(file_name, dir=src_dir):
    file_path = f'{dir}/{file_name}'
    return pd.read_csv(file_path)

source

load_data

 load_data (src_url:str, smp_types:dict={'BIO': 'BIOTA', 'SEA':
            'SEAWATER', 'SED': 'SEDIMENT'}, use_cache:bool=False,
            save_to_cache:bool=False, verbose:bool=False)

Load HELCOM data and return the data in a dictionary of dataframes with the dictionary key as the sample type.

Exported source
def load_data(src_url: str, 
              smp_types: dict = default_smp_types, 
              use_cache: bool = False,
              save_to_cache: bool = False,
              verbose: bool = False) -> Dict[str, pd.DataFrame]:
    "Load HELCOM data and return the data in a dictionary of dataframes with the dictionary key as the sample type."

    
    def load_and_merge(file_prefix: str) -> pd.DataFrame:
        
        if use_cache:
            dir=cache_path()
        else:
            dir = src_url
            
        file_smp_path = f'{dir}/{file_prefix}01.csv'
        file_meas_path = f'{dir}/{file_prefix}02.csv'

        if use_cache:
            if not Path(file_smp_path).exists():
                print(f'{file_smp_path} not found.')            
            if not Path(file_meas_path).exists():
                print(f'{file_meas_path} not found.')
        
        if verbose:
            start_time = time.time()
        df_meas = read_csv(f'{file_prefix}02.csv', dir)
        df_smp = read_csv(f'{file_prefix}01.csv', dir)
        
        df_meas.columns = df_meas.columns.str.lower()
        df_smp.columns = df_smp.columns.str.lower()
        
        merged_df = pd.merge(df_meas, df_smp, on='key', how='left')
        
        if verbose:
            print(f"Downloaded data for {file_prefix}01.csv and {file_prefix}02.csv in {time.time() - start_time:.2f} seconds.")
            
        if save_to_cache:
            dir = cache_path()
            df_smp.to_csv(f'{dir}/{file_prefix}01.csv', index=False)
            df_meas.to_csv(f'{dir}/{file_prefix}02.csv', index=False)
            if verbose:
                print(f"Saved downloaded data to cache at {dir}/{file_prefix}01.csv and {dir}/{file_prefix}02.csv")

        return merged_df
    return {smp_type: load_and_merge(file_prefix) for file_prefix, smp_type in smp_types.items()}

dfs is a dictionary of DataFrames created from the HELCOM dataset located at src_dir. The data is split by sample type, and each DataFrame is stored under a key equal to its sample type.

dfs = load_data(src_dir, verbose=True, save_to_cache=True)
print('keys/sample types: ', dfs.keys())
Downloaded data for BIO01.csv and BIO02.csv in 1.14 seconds.
Saved downloaded data to cache at /Users/franckalbinet/.marisco/cache/BIO01.csv and /Users/franckalbinet/.marisco/cache/BIO02.csv
Downloaded data for SEA01.csv and SEA02.csv in 2.57 seconds.
Saved downloaded data to cache at /Users/franckalbinet/.marisco/cache/SEA01.csv and /Users/franckalbinet/.marisco/cache/SEA02.csv
Downloaded data for SED01.csv and SED02.csv in 2.20 seconds.
Saved downloaded data to cache at /Users/franckalbinet/.marisco/cache/SED01.csv and /Users/franckalbinet/.marisco/cache/SED02.csv
keys/sample types:  dict_keys(['BIOTA', 'SEAWATER', 'SEDIMENT'])

Let’s take a look at each DataFrame:

for key in dfs.keys():
    display(Markdown(f"<b> {key} DataFrame:</b>"))
    with pd.option_context('display.max_columns', None):
        display(dfs[key].head(2))

BIOTA DataFrame:

key nuclide method < value_bq/kg value_bq/kg basis error% number date_of_entry_x country laboratory sequence date year month day station latitude ddmmmm latitude dddddd longitude ddmmmm longitude dddddd sdepth rubin biotatype tissue no length weight dw% loi% mors_subbasin helcom_subbasin date_of_entry_y
0 BVTIG2012041 CS134 VTIG01 < 0.01014 W NaN NaN 02/27/14 00:00:00 6.0 VTIG 2012041 09/23/12 00:00:00 2012 9.0 23.0 SD24 54.17 54.283333 12.19 12.316667 NaN GADU MOR F 5 16.0 45.7 948.0 18.453 92.9 2.0 16 02/27/14 00:00:00
1 BVTIG2012041 K40 VTIG01 135.30000 W 3.57 NaN 02/27/14 00:00:00 6.0 VTIG 2012041 09/23/12 00:00:00 2012 9.0 23.0 SD24 54.17 54.283333 12.19 12.316667 NaN GADU MOR F 5 16.0 45.7 948.0 18.453 92.9 2.0 16 02/27/14 00:00:00

SEAWATER DataFrame:

key nuclide method < value_bq/m³ value_bq/m³ error%_m³ date_of_entry_x country laboratory sequence date year month day station latitude (ddmmmm) latitude (dddddd) longitude (ddmmmm) longitude (dddddd) tdepth sdepth salin ttemp filt mors_subbasin helcom_subbasin date_of_entry_y
0 WKRIL2012003 CS137 NaN NaN 5.3 32.0 08/20/14 00:00:00 90.0 KRIL 2012003.0 05/23/12 00:00:00 2012.0 5.0 23.0 RU10 60.05 60.0833 29.2 29.3333 NaN 0.0 NaN NaN NaN 11.0 11.0 08/20/14 00:00:00
1 WKRIL2012004 CS137 NaN NaN 19.9 20.0 08/20/14 00:00:00 90.0 KRIL 2012004.0 05/23/12 00:00:00 2012.0 5.0 23.0 RU10 60.05 60.0833 29.2 29.3333 NaN 29.0 NaN NaN NaN 11.0 11.0 08/20/14 00:00:00

SEDIMENT DataFrame:

key nuclide method < value_bq/kg value_bq/kg error%_kg < value_bq/m² value_bq/m² error%_m² date_of_entry_x country laboratory sequence date year month day station latitude (ddmmmm) latitude (dddddd) longitude (ddmmmm) longitude (dddddd) device tdepth uppsli lowsli area sedi oxic dw% loi% mors_subbasin helcom_subbasin sum_link date_of_entry_y
0 SKRIL2012116 CS137 NaN NaN 1200.0 20.0 NaN NaN NaN 08/20/14 00:00:00 90.0 KRIL 2012116.0 05/25/12 00:00:00 2012.0 5.0 25.0 RU99 60.28 60,4667 27.48 27.8 KRIL01 25.0 15.0 20.0 0.006 NaN NaN NaN NaN 11.0 11.0 NaN 08/20/14 00:00:00
1 SKRIL2012117 CS137 NaN NaN 250.0 20.0 NaN NaN NaN 08/20/14 00:00:00 90.0 KRIL 2012117.0 05/25/12 00:00:00 2012.0 5.0 25.0 RU99 60.28 60,4667 27.48 27.8 KRIL01 25.0 20.0 25.0 0.006 NaN NaN NaN NaN 11.0 11.0 NaN 08/20/14 00:00:00

Normalize nuclide names

Lower & strip nuclide names

FEEDBACK TO DATA PROVIDER

Some nuclide names contain one or more leading and/or trailing spaces.

This is demonstrated below for the NUCLIDE column:

df = get_unique_across_dfs(load_data(src_dir, use_cache=True), 'nuclide', as_df=True, include_nchars=True)
df['stripped_chars'] = df['value'].str.strip().str.replace(' ', '').str.len()
print(df[df['n_chars'] != df['stripped_chars']])
    index      value  n_chars  stripped_chars
2       2   CO60            8               4
19     19    SR90           7               4
20     20   K40             8               3
21     21   PU238           8               5
29     29   SR90            8               4
32     32   CS137           8               5
47     47    TC99           7               4
57     57     CS137         6               5
74     74  CS137            9               5
82     82     SR90          6               4
87     87   CS134           8               5
90     90      SR90         5               4
93     93   AM241           8               5

To fix this issue, we use the LowerStripNameCB callback. For each dataframe in the dictionary of dataframes, it corrects the nuclide name by converting it to lowercase and stripping any leading or trailing whitespace.
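
As a minimal sketch of what this amounts to in plain pandas (toy values, not the actual callback implementation):

import pandas as pd

s = pd.Series([' CO60   ', 'K40  ', 'cs137'])
print(s.str.lower().str.strip().tolist())  # ['co60', 'k40', 'cs137']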

dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[LowerStripNameCB(col_src='nuclide', col_dst='NUCLIDE')])
tfm()
for key, df in tfm.dfs.items():
    print(f'{key} Nuclides: ')
    print(df['NUCLIDE'].unique())
    
print(tfm.logs)
BIOTA Nuclides: 
['cs134' 'k40' 'co60' 'cs137' 'sr90' 'ag108m' 'mn54' 'co58' 'ag110m'
 'zn65' 'sb125' 'pu239240' 'ru106' 'be7' 'ce144' 'pb210' 'po210' 'sb124'
 'sr89' 'zr95' 'te129m' 'ru103' 'nb95' 'ce141' 'la140' 'i131' 'ba140'
 'pu238' 'u235' 'bi214' 'pb214' 'pb212' 'tl208' 'ac228' 'ra223' 'eu155'
 'ra226' 'gd153' 'sn113' 'fe59' 'tc99' 'co57' 'sn117m' 'eu152' 'sc46'
 'rb86' 'ra224' 'th232' 'cs134137' 'am241' 'ra228' 'th228' 'k-40' 'cs138'
 'cs139' 'cs140' 'cs141' 'cs142' 'cs143' 'cs144' 'cs145' 'cs146']
SEAWATER Nuclides: 
['cs137' 'sr90' 'h3' 'cs134' 'pu238' 'pu239240' 'am241' 'cm242' 'cm244'
 'tc99' 'k40' 'ru103' 'sr89' 'sb125' 'nb95' 'ru106' 'zr95' 'ag110m'
 'cm243244' 'ba140' 'ce144' 'u234' 'u238' 'co60' 'pu239' 'pb210' 'po210'
 'np237' 'pu240' 'mn54']
SEDIMENT Nuclides: 
['cs137' 'ra226' 'ra228' 'k40' 'sr90' 'cs134137' 'cs134' 'pu239240'
 'pu238' 'co60' 'ru103' 'ru106' 'sb125' 'ag110m' 'ce144' 'am241' 'be7'
 'th228' 'pb210' 'co58' 'mn54' 'zr95' 'ba140' 'po210' 'ra224' 'nb95'
 'pu238240' 'pu241' 'pu239' 'eu155' 'ir192' 'th232' 'cd109' 'sb124' 'zn65'
 'th234' 'tl208' 'pb212' 'pb214' 'bi214' 'ac228' 'ra223' 'u235' 'bi212']
["Convert 'nuclide' column values to lowercase, strip spaces, and store in 'NUCLIDE' column."]

Remap nuclide names to MARIS data formats

Below, we map nuclide names used by HELCOM to the MARIS standard nuclide names.

Remapping data provider nomenclatures to MARIS standards is a recurrent operation and is done in a semi-automated manner according to the following pattern:

  1. Inspect the data provider’s nomenclature;
  2. Match automatically against MARIS nomenclature (using a fuzzy matching algorithm);
  3. Fix potential mismatches;
  4. Apply the lookup table to the dataframe.

We will refer to this process as IMFA (Inspect, Match, Fix, Apply).
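
As a toy illustration of these four steps on plain Python objects (schematic only; the sections below use marisco’s Remapper instead):

provider_names = ['k-40', 'cs137 ']                              # 1. Inspect provider values
maris = {'k40': 4, 'cs137': 33}                                  # toy MARIS ids
auto = {p: p.strip().replace('-', '') for p in provider_names}   # 2. Match (naive)
fixes = {}                                                       # 3. Fix mismatches by hand
lut = {p: maris[fixes.get(p, auto[p])] for p in provider_names}  # 4. Apply
print(lut)  # {'k-40': 4, 'cs137 ': 33}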

The get_unique_across_dfs function is a utility in MARISCO that retrieves unique values from a specified column across all DataFrames. Note that there is one DataFrame for each sample type, such as biota, sediment, etc.

dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[LowerStripNameCB(col_src='nuclide', col_dst='NUCLIDE')])

dfs_output = tfm()

# Transpose to display the dataframe horizontally
get_unique_across_dfs(dfs_output, col_name='NUCLIDE', as_df=True).T
0 1 2 3 4 5 6 7 8 9 ... 67 68 69 70 71 72 73 74 75 76
index 0 1 2 3 4 5 6 7 8 9 ... 67 68 69 70 71 72 73 74 75 76
value pb212 ce141 sn113 cs144 co57 am241 co60 ba140 cs139 k-40 ... pu241 eu152 ra223 th232 sr90 pu239 ag108m mn54 sc46 sb125

2 rows × 77 columns

Let’s now create an instance of the fuzzy-matching Remapper. This instance will match the nuclide names of the HELCOM dataset to the MARIS standard nuclide names.

remapper = Remapper(provider_lut_df=get_unique_across_dfs(dfs_output, col_name='NUCLIDE', as_df=True),
                    maris_lut_fn=nuc_lut_path,
                    maris_col_id='nuclide_id',
                    maris_col_name='nc_name',
                    provider_col_to_match='value',
                    provider_col_key='value',
                    fname_cache='nuclides_helcom.pkl')

Let’s try to match HELCOM nuclide names to MARIS standard nuclide names as automatically as possible. The match_score column allows us to assess the results:

remapper.generate_lookup_table(as_df=True)
remapper.select_match(match_score_threshold=1, verbose=True)
Processing: 100%|██████████| 77/77 [00:01<00:00, 48.67it/s]
63 entries matched the criteria, while 14 entries had a match score of 1 or higher.
matched_maris_name source_name match_score
source_key
cm243244 cm242 cm243244 3
pu238240 pu240 pu238240 3
cs134137 cs137 cs134137 3
pu239240 pu239 pu239240 3
cs142 ce140 cs142 2
cs145 ce140 cs145 2
cs143 ce140 cs143 2
cs144 cs134 cs144 1
cs139 ce139 cs139 1
k-40 k40 k-40 1
cs141 ce141 cs141 1
cs146 cs136 cs146 1
cs140 ce140 cs140 1
cs138 cs134 cs138 1
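
The scores above suggest that match_score behaves like an edit distance between the provider name and the best MARIS candidate (e.g. ‘k-40’ vs ‘k40’ scores 1, ‘cs134137’ vs ‘cs137’ scores 3). Assuming so, here is a minimal Levenshtein sketch, for intuition only (not the Remapper internals):

def levenshtein(a: str, b: str) -> int:
    "Minimum number of single-character edits turning `a` into `b`."
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, 1):
        curr = [i]
        for j, cb in enumerate(b, 1):
            curr.append(min(prev[j] + 1,                # deletion
                            curr[j - 1] + 1,            # insertion
                            prev[j - 1] + (ca != cb)))  # substitution
        prev = curr
    return prev[-1]

print(levenshtein('k-40', 'k40'))        # 1
print(levenshtein('cs134137', 'cs137'))  # 3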

We can now manually inspect the unmatched nuclide names and create a table to correct them to the MARIS standard:

Exported source
fixes_nuclide_names = {
    'cs134137': 'cs134_137_tot',
    'cm243244': 'cm243_244_tot',
    'pu239240': 'pu239_240_tot',
    'pu238240': 'pu238_240_tot',
    'cs143': 'cs137',
    'cs145': 'cs137',
    'cs142': 'cs137',
    'cs141': 'cs137',
    'cs144': 'cs137',
    'k-40': 'k40',
    'cs140': 'cs137',
    'cs146': 'cs137',
    'cs139': 'cs137',
    'cs138': 'cs137'
    }

We now introduce the table fixes_nuclide_names, which applies manual corrections to the nuclide names before the remapping process. The generate_lookup_table function has an overwrite parameter (default True); when True, it creates a pickle-file cache of the lookup table. We can now test the remapping process:

remapper.generate_lookup_table(as_df=True, fixes=fixes_nuclide_names)
fc.test_eq(len(remapper.select_match(match_score_threshold=1, verbose=True)), 0)
Processing: 100%|██████████| 77/77 [00:01<00:00, 43.74it/s]
77 entries matched the criteria, while 0 entries had a match score of 1 or higher.

Test passes! We can now create a callback RemapNuclideNameCB to remap the nuclide names. Note that we pass overwrite=False to generate_lookup_table so that the cached version of the lookup table is used.

Exported source
# Create a lookup table for nuclide names
lut_nuclides = lambda df: Remapper(provider_lut_df=df,
                                   maris_lut_fn=nuc_lut_path,
                                   maris_col_id='nuclide_id',
                                   maris_col_name='nc_name',
                                   provider_col_to_match='value',
                                   provider_col_key='value',
                                   fname_cache='nuclides_helcom.pkl').generate_lookup_table(fixes=fixes_nuclide_names, 
                                                                                            as_df=False, overwrite=False)

The callback RemapNuclideNameCB is now created to remap the nuclide names using the lut_nuclides lookup table.


source

RemapNuclideNameCB

 RemapNuclideNameCB (fn_lut:Callable, col_name:str)

Remap data provider nuclide names to standardized MARIS nuclide names.

Type Details
fn_lut Callable Function that returns the lookup table dictionary
col_name str Column name to remap
Exported source
class RemapNuclideNameCB(Callback):
    "Remap data provider nuclide names to standardized MARIS nuclide names."
    def __init__(self, 
                 fn_lut: Callable, # Function that returns the lookup table dictionary
                 col_name: str # Column name to remap
                ):
        fc.store_attr()

    def __call__(self, tfm: Transformer):
        df_uniques = get_unique_across_dfs(tfm.dfs, col_name=self.col_name, as_df=True)
        #lut = {k: v.matched_maris_name for k, v in self.fn_lut(df_uniques).items()}    
        lut = {k: v.matched_id for k, v in self.fn_lut(df_uniques).items()}    
        for k in tfm.dfs.keys():
            tfm.dfs[k]['NUCLIDE'] = tfm.dfs[k][self.col_name].replace(lut)

Let’s see it in action, along with the LowerStripNameCB callback:

dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[LowerStripNameCB(col_src='nuclide', col_dst='NUCLIDE'),
                            RemapNuclideNameCB(lut_nuclides, col_name='NUCLIDE'),
                            CompareDfsAndTfmCB(dfs)
                            ])
dfs_out = tfm()

# For instance
for key in dfs_out.keys():
    print(f'{key} NUCLIDE unique: ', dfs_out[key]['NUCLIDE'].unique())
    
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')
BIOTA NUCLIDE unique:  [31  4  9 33 12 21  6  8 22 10 24 77 17  2 37 41 47 23 11 13 25 16 14 36
 35 29 34 67 63 46 43 42 94 55 50 40 53 87 92 86 15  7 93 85 91 90 51 59
 76 72 54 57]
SEAWATER NUCLIDE unique:  [33 12  1 31 67 77 72 73 75 15  4 16 11 24 14 17 13 22 80 34 37 62 64  9
 68 41 47 65 69  6]
SEDIMENT NUCLIDE unique:  [ 33  53  54   4  12  76  31  77  67   9  16  17  24  22  37  72   2  57
  41   8   6  13  34  47  51  14  89  70  68  40  88  59  84  23  10  60
  94  42  43  46  55  50  63 130]
                                               BIOTA  SEAWATER  SEDIMENT
Original row count (dfs)                       16124     21634     40744
Transformed row count (tfm.dfs)                16124     21634     40744
Rows removed from original (tfm.dfs_removed)       0         0         0
Rows created in transformed (tfm.dfs_created)      0         0         0 

Standardize Time

FEEDBACK TO DATA PROVIDER

Time/date is provided in the DATE, YEAR, MONTH, and DAY columns. Note that the DATE column contains missing values, as shown below. When DATE is missing, we fall back on the YEAR, MONTH, and DAY columns. Also note that DAY and MONTH sometimes contain 0; in that case we systematically set them to 1.

dfs = load_data(src_dir, use_cache=True)
for key in dfs.keys():
    print(f'{key} DATE null values: ', dfs[key]['date'].isna().sum())
BIOTA DATE null values:  88
SEAWATER DATE null values:  554
SEDIMENT DATE null values:  830
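
A minimal sketch of the fallback rules described above, on toy data (not the actual ParseTimeCB, which is shown next):

import pandas as pd

df = pd.DataFrame({'date': ['05/23/12 00:00:00', None],
                   'year': [2012, 2012], 'month': [5, 0], 'day': [23, 0]})
df['TIME'] = pd.to_datetime(df['date'], format='%m/%d/%y %H:%M:%S', errors='coerce')
df.loc[df['day'] == 0, 'day'] = 1      # DAY of 0 becomes 1
df.loc[df['month'] == 0, 'month'] = 1  # MONTH of 0 becomes 1
missing = df['TIME'].isna()
df.loc[missing, 'TIME'] = pd.to_datetime(df.loc[missing, ['year', 'month', 'day']])
print(df['TIME'].tolist())
# [Timestamp('2012-05-23 00:00:00'), Timestamp('2012-01-01 00:00:00')]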

source

ParseTimeCB

 ParseTimeCB ()

Standardize time format across all dataframes.

Exported source
class ParseTimeCB(Callback):
    "Standardize time format across all dataframes."
    def __call__(self, tfm: Transformer):
        for df in tfm.dfs.values():
            self._process_dates(df)

    def _process_dates(self, df: pd.DataFrame) -> None:
        "Process and correct date and time information in the DataFrame."
        df['TIME'] = self._parse_date(df)
        self._handle_missing_dates(df)
        self._fill_missing_time(df)

    def _parse_date(self, df: pd.DataFrame) -> pd.Series:
        "Parse the DATE column if present."
        return pd.to_datetime(df['date'], format='%m/%d/%y %H:%M:%S', errors='coerce')

    def _handle_missing_dates(self, df: pd.DataFrame):
        "Handle cases where DAY or MONTH is 0 or missing."
        df.loc[df["day"] == 0, "day"] = 1
        df.loc[df["month"] == 0, "month"] = 1
        
        missing_day_month = (df["day"].isna()) & (df["month"].isna()) & (df["year"].notna())
        df.loc[missing_day_month, ["day", "month"]] = 1

    def _fill_missing_time(self, df: pd.DataFrame) -> None:
        "Fill missing time values using year, month, and day columns."
        missing_time = df['TIME'].isna()
        df.loc[missing_time, 'TIME'] = pd.to_datetime(
            df.loc[missing_time, ['year', 'month', 'day']], 
            format='%Y%m%d', 
            errors='coerce'
        )

Apply the transformer with the ParseTimeCB callback. Then, print the TIME data for seawater. Passing the CompareDfsAndTfmCB callback allows us to compare the original dataframes with the transformed dataframes using the compare_stats attribute.

dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[ParseTimeCB(),
                            CompareDfsAndTfmCB(dfs)
                            ])
tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')
print(tfm.dfs['SEAWATER'][['TIME']])
                                               BIOTA  SEAWATER  SEDIMENT
Original row count (dfs)                       16124     21634     40744
Transformed row count (tfm.dfs)                16124     21634     40744
Rows removed from original (tfm.dfs_removed)       0         0         0
Rows created in transformed (tfm.dfs_created)      0         0         0 

            TIME
0     2012-05-23
1     2012-05-23
2     2012-06-17
3     2012-05-24
4     2012-05-24
...          ...
21629 2023-06-11
21630 2023-06-11
21631 2023-06-13
21632 2023-06-13
21633 2023-06-13

[21634 rows x 1 columns]

The NetCDF time format requires that time be encoded as the number of milliseconds since a specified origin. In our case, the origin is 1970-01-01, as indicated in the cdl.toml file under the [vars.defaults.time.attrs] section.

EncodeTimeCB converts the HELCOM time format to the MARIS NetCDF time format.
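
A back-of-the-envelope sketch of that encoding (assuming EncodeTimeCB implements the convention just described; this is not the marisco implementation):

import pandas as pd

times = pd.Series(pd.to_datetime(['2012-05-23', '2023-06-13']))
origin = pd.Timestamp('1970-01-01')
print(((times - origin) // pd.Timedelta(milliseconds=1)).tolist())
# [1337731200000, 1686614400000]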

dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[ParseTimeCB(),
                            EncodeTimeCB(),
                            CompareDfsAndTfmCB(dfs)
                            ])
tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')
#print(tfm.logs)
Warning: 8 missing time value(s) in SEAWATER
Warning: 1 missing time value(s) in SEDIMENT
                                               BIOTA  SEAWATER  SEDIMENT
Original row count (dfs)                       16124     21634     40744
Transformed row count (tfm.dfs)                16124     21626     40743
Rows removed from original (tfm.dfs_removed)       0         8         1
Rows created in transformed (tfm.dfs_created)      0         0         0 

Split Sediment Values

HELCOM reports two value columns for the SEDIMENT sample type: value_bq/kg and value_bq/m². We need to split these into separate rows, each using the single VALUE column required by the MARIS standard, with the UNIT column identifying which unit each value was reported in.

Let’s take a look at the unit lookup table for MARIS:

pd.read_excel(unit_lut_path())
unit_id unit unit_sanitized ordlist Unnamed: 4 Unnamed: 5 Unnamed: 6
0 -1 Not applicable Not applicable NaN NaN NaN NaN
1 0 NOT AVAILABLE NOT AVAILABLE 0.0 NaN NaN NaN
2 1 Bq/m3 Bq per m3 1.0 Bq/m3 NaN Bq/m<sup>3</sup>
3 2 Bq/m2 Bq per m2 2.0 NaN NaN NaN
4 3 Bq/kg Bq per kg 3.0 NaN NaN NaN
5 4 Bq/kgd Bq per kgd 4.0 NaN NaN NaN
6 5 Bq/kgw Bq per kgw 5.0 NaN NaN NaN
7 6 kg/kg kg per kg 6.0 NaN NaN NaN
8 7 TU TU 7.0 NaN NaN NaN
9 8 DELTA/mill DELTA per mill 8.0 NaN NaN NaN
10 9 atom/kg atom per kg 9.0 NaN NaN NaN
11 10 atom/kgd atom per kgd 10.0 NaN NaN NaN
12 11 atom/kgw atom per kgw 11.0 NaN NaN NaN
13 12 atom/l atom per l 12.0 NaN NaN NaN
14 13 Bq/kgC Bq per kgC 13.0 NaN NaN NaN

We will define the columns of interest for the SEDIMENT measurement types:

Exported source
coi_sediment = {
    'kg_type': {
        'VALUE': 'value_bq/kg',
        'UNC': 'error%_kg',
        'DL': '< value_bq/kg',
        'UNIT': 3,  # Unit ID for Bq/kg
    },
    'm2_type': {
        'VALUE': 'value_bq/m²',
        'UNC': 'error%_m²',
        'DL': '< value_bq/m²',
        'UNIT': 2,  # Unit ID for Bq/m²
    }
}

We define the SplitSedimentValuesCB callback to split the sediment entries into separate rows for Bq/kg and Bq/m² measurements. We prefix column names with an underscore to denote temporary columns created during the splitting process.
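
As a toy illustration of this wide-to-long split (hypothetical two-column frame, not the full callback below):

import pandas as pd

df = pd.DataFrame({'value_bq/kg': [1200.0, None], 'value_bq/m²': [None, 350.0]})
rows = []
for unit_id, col in [(3, 'value_bq/kg'), (2, 'value_bq/m²')]:
    sub = df[df[col].notna()].copy()             # keep rows measured in this unit
    sub['_VALUE'], sub['_UNIT'] = sub[col], unit_id
    rows.append(sub)
print(pd.concat(rows, ignore_index=True)[['_VALUE', '_UNIT']])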


source

SplitSedimentValuesCB

 SplitSedimentValuesCB (coi:Dict[str,Dict[str,Any]])

Separate sediment entries into distinct rows for Bq/kg and Bq/m² measurements.

Type Details
coi Dict Columns of interest with value, uncertainty, DL columns and units
Exported source
class SplitSedimentValuesCB(Callback):
    "Separate sediment entries into distinct rows for Bq/kg and Bq/m² measurements."
    def __init__(self, 
                 coi: Dict[str, Dict[str, Any]] # Columns of interest with value, uncertainty, DL columns and units
                ):
        fc.store_attr()
        
    def __call__(self, tfm: Transformer):
        if 'SEDIMENT' not in tfm.dfs:
            return
            
        df = tfm.dfs['SEDIMENT']
        dfs_to_concat = []
        
        # For each measurement type (kg and m2)
        for measure_type, cols in self.coi.items():
            # If any of value/uncertainty/DL exists, keep the row
            has_data = (
                df[cols['VALUE']].notna() | 
                df[cols['UNC']].notna() | 
                df[cols['DL']].notna()
            )
            
            if has_data.any():
                df_measure = df[has_data].copy()
                
                # Copy columns to standardized names
                df_measure['_VALUE'] = df_measure[cols['VALUE']]
                df_measure['_UNC'] = df_measure[cols['UNC']]
                df_measure['_DL'] = df_measure[cols['DL']]
                df_measure['_UNIT'] = cols['UNIT']
                
                dfs_to_concat.append(df_measure)
        
        # Combine all measurement type dataframes
        if dfs_to_concat:
            tfm.dfs['SEDIMENT'] = pd.concat(dfs_to_concat, ignore_index=True)
dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[SplitSedimentValuesCB(coi_sediment),
                            CompareDfsAndTfmCB(dfs)
                            ])

tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')
with pd.option_context('display.max_columns', None):
    display(tfm.dfs['SEDIMENT'].head())
                                               BIOTA  SEAWATER  SEDIMENT
Original row count (dfs)                       16124     21634     40744
Transformed row count (tfm.dfs)                16124     21634     70697
Rows removed from original (tfm.dfs_removed)       0         0         0
Rows created in transformed (tfm.dfs_created)      0         0     29953 
key nuclide method < value_bq/kg value_bq/kg error%_kg < value_bq/m² value_bq/m² error%_m² date_of_entry_x country laboratory sequence date year month day station latitude (ddmmmm) latitude (dddddd) longitude (ddmmmm) longitude (dddddd) device tdepth uppsli lowsli area sedi oxic dw% loi% mors_subbasin helcom_subbasin sum_link date_of_entry_y _VALUE _UNC _DL _UNIT
0 SKRIL2012116 CS137 NaN NaN 1200.0 20.0 NaN NaN NaN 08/20/14 00:00:00 90.0 KRIL 2012116.0 05/25/12 00:00:00 2012.0 5.0 25.0 RU99 60.28 60,4667 27.48 27.8 KRIL01 25.0 15.0 20.0 0.006 NaN NaN NaN NaN 11.0 11.0 NaN 08/20/14 00:00:00 1200.0 20.0 NaN 3
1 SKRIL2012117 CS137 NaN NaN 250.0 20.0 NaN NaN NaN 08/20/14 00:00:00 90.0 KRIL 2012117.0 05/25/12 00:00:00 2012.0 5.0 25.0 RU99 60.28 60,4667 27.48 27.8 KRIL01 25.0 20.0 25.0 0.006 NaN NaN NaN NaN 11.0 11.0 NaN 08/20/14 00:00:00 250.0 20.0 NaN 3
2 SKRIL2012118 CS137 NaN NaN 140.0 21.0 NaN NaN NaN 08/20/14 00:00:00 90.0 KRIL 2012118.0 05/25/12 00:00:00 2012.0 5.0 25.0 RU99 60.28 60,4667 27.48 27.8 KRIL01 25.0 25.0 30.0 0.006 NaN NaN NaN NaN 11.0 11.0 NaN 08/20/14 00:00:00 140.0 21.0 NaN 3
3 SKRIL2012119 CS137 NaN NaN 79.0 20.0 NaN NaN NaN 08/20/14 00:00:00 90.0 KRIL 2012119.0 05/25/12 00:00:00 2012.0 5.0 25.0 RU99 60.28 60,4667 27.48 27.8 KRIL01 25.0 30.0 35.0 0.006 NaN NaN NaN NaN 11.0 11.0 NaN 08/20/14 00:00:00 79.0 20.0 NaN 3
4 SKRIL2012120 CS137 NaN NaN 29.0 24.0 NaN NaN NaN 08/20/14 00:00:00 90.0 KRIL 2012120.0 05/25/12 00:00:00 2012.0 5.0 25.0 RU99 60.28 60,4667 27.48 27.8 KRIL01 25.0 35.0 40.0 0.006 NaN NaN NaN NaN 11.0 11.0 NaN 08/20/14 00:00:00 29.0 24.0 NaN 3

Sanitize value

FEEDBACK TO DATA PROVIDER

Some of the HELCOM datasets contain missing values in the VALUE column; see the output after applying the SanitizeValueCB callback.

We consolidate each column containing measurement values (named differently across sample types) into a single VALUE column and remove rows with missing values where needed.


source

SanitizeValueCB

 SanitizeValueCB (coi:Dict[str,Dict[str,str]], verbose:bool=False)

Sanitize measurement values by removing blanks and standardizing to use the VALUE column.

Type Default Details
coi Dict Columns of interest. Format: {group_name: {‘VALUE’: ‘column_name’}}
verbose bool False
Exported source
coi_val = {'SEAWATER' : {'VALUE': 'value_bq/m³'},
           'BIOTA':  {'VALUE': 'value_bq/kg'},
           'SEDIMENT': {'VALUE': '_VALUE'}}
Exported source
class SanitizeValueCB(Callback):
    "Sanitize measurement values by removing blanks and standardizing to use the `VALUE` column."
    def __init__(self, 
                 coi: Dict[str, Dict[str, str]], # Columns of interest. Format: {group_name: {'VALUE': 'column_name'}}
                 verbose: bool=False
                 ): 
        fc.store_attr()

    def __call__(self, tfm: Transformer):
        tfm.dfs_dropped = {}
        for grp, df in tfm.dfs.items():
            value_col = self.coi[grp]['VALUE']
            # Count NaN values before dropping
            initial_nan_count = df[value_col].isna().sum()
                        
            # define a dataframe with the rows that were dropped    
            tfm.dfs_dropped[grp] = df[df[value_col].isna()]
            
            df.dropna(subset=[value_col], inplace=True)

            # Count NaN values after dropping
            final_nan_count = df[value_col].isna().sum()
            dropped_nan_count = initial_nan_count - final_nan_count
            
            # Print the number of dropped NaN values
            if dropped_nan_count > 0 and self.verbose:
                print(f"Warning: {dropped_nan_count} missing value(s) in {value_col} for group {grp}.")
            
            
            df['VALUE'] = df[value_col]
dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[SplitSedimentValuesCB(coi_sediment),
                            SanitizeValueCB(coi_val, verbose=True),
                            CompareDfsAndTfmCB(dfs)
                            ])

tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')
Warning: 30 missing value(s) in value_bq/kg for group BIOTA.
Warning: 153 missing value(s) in value_bq/m³ for group SEAWATER.
Warning: 246 missing value(s) in _VALUE for group SEDIMENT.
                                               BIOTA  SEAWATER  SEDIMENT
Original row count (dfs)                       16124     21634     40744
Transformed row count (tfm.dfs)                16094     21481     70451
Rows removed from original (tfm.dfs_removed)      30       153       144
Rows created in transformed (tfm.dfs_created)      0         0     29851 

Normalize uncertainty

The unc_rel2stan function converts uncertainty from relative uncertainty (a percentage of the measured value) to standard uncertainty.


source

unc_rel2stan

 unc_rel2stan (df:pandas.core.frame.DataFrame, meas_col:str, unc_col:str)

Convert relative uncertainty to absolute uncertainty.

Type Details
df DataFrame DataFrame containing measurement and uncertainty columns
meas_col str Name of the column with measurement values
unc_col str Name of the column with relative uncertainty values (percentages)
Returns Series Series with calculated absolute uncertainties
Exported source
def unc_rel2stan(
    df: pd.DataFrame, # DataFrame containing measurement and uncertainty columns
    meas_col: str, # Name of the column with measurement values
    unc_col: str # Name of the column with relative uncertainty values (percentages)
) -> pd.Series: # Series with calculated absolute uncertainties
    "Convert relative uncertainty to absolute uncertainty."
    return df.apply(lambda row: row[unc_col] * row[meas_col] / 100, axis=1)
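
For example, the first SEAWATER row (shown in the demo below) reports a value of 5.3 Bq/m³ with a relative uncertainty of 32%, giving a standard uncertainty of 5.3 × 32 / 100 = 1.696 Bq/m³.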

For each sample type in the HELCOM dataset, the uncertainty is provided as a relative uncertainty. The column names for both the VALUE and the UNC vary by sample type. The coi_units_unc list defines the VALUE and UNC column names for each sample type.

Exported source
# Columns of interest
coi_units_unc = [('SEAWATER', 'value_bq/m³', 'error%_m³'),
                 ('BIOTA', 'value_bq/kg', 'error%'),
                 ('SEDIMENT', '_VALUE', '_UNC')]

The NormalizeUncCB callback normalizes the UNC column by converting relative uncertainty to standard uncertainty.


source

NormalizeUncCB

 NormalizeUncCB (fn_convert_unc:Callable=<function unc_rel2stan>,
                 coi:List[Tuple[str,str,str]]=[('SEAWATER', 'value_bq/m³',
                 'error%_m³'), ('BIOTA', 'value_bq/kg', 'error%'),
                 ('SEDIMENT', '_VALUE', '_UNC')])

Convert from relative error to standard uncertainty.

Type Default Details
fn_convert_unc Callable unc_rel2stan Function converting relative uncertainty to absolute uncertainty
coi List [(‘SEAWATER’, ‘value_bq/m³’, ’error%_m³’), (‘BIOTA’, ‘value_bq/kg’, ‘error%’), (‘SEDIMENT’, ’_VALUE’, ’_UNC’)] List of columns of interest
Exported source
class NormalizeUncCB(Callback):
    "Convert from relative error to standard uncertainty."
    def __init__(self, 
                 fn_convert_unc: Callable=unc_rel2stan, # Function converting relative uncertainty to absolute uncertainty
                 coi: List[Tuple[str, str, str]]=coi_units_unc # List of columns of interest
                ):
        fc.store_attr()
    
    def __call__(self, tfm: Transformer):
        for grp, val, unc in self.coi:
            if grp in tfm.dfs:
                df = tfm.dfs[grp]
                df['UNC'] = self.fn_convert_unc(df, val, unc)

Apply the transformer with the [NormalizeUncCB](https://franckalbinet.github.io/marisco/handlers/ospar.html#normalizeunccb) callback. Then, print the value (i.e. activity per unit volume or mass) and standard uncertainty for each sample type.

dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[SplitSedimentValuesCB(coi_sediment),
                            SanitizeValueCB(coi_val),
                            NormalizeUncCB(),
                            CompareDfsAndTfmCB(dfs)
                            ])
tfm()

print(tfm.dfs['SEAWATER'][['VALUE', 'UNC']][:2])
print(tfm.dfs['BIOTA'][['VALUE', 'UNC']][:2])
print(tfm.dfs['SEDIMENT'][['VALUE', 'UNC']][:2])
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')
   VALUE    UNC
0    5.3  1.696
1   19.9  3.980
       VALUE      UNC
0    0.01014      NaN
1  135.30000  4.83021
    VALUE    UNC
0  1200.0  240.0
1   250.0   50.0
                                               BIOTA  SEAWATER  SEDIMENT
Original row count (dfs)                       16124     21634     40744
Transformed row count (tfm.dfs)                16094     21481     70451
Rows removed from original (tfm.dfs_removed)      30       153       144
Rows created in transformed (tfm.dfs_created)      0         0     29851 

Remap units

HELCOM incorporates the unit directly into the column name. For the SEDIMENT sample type, the units are accounted for when splitting the sediment values (i.e. in SplitSedimentValuesCB). Let’s examine the units associated with the other sample types.

For the BIOTA sample type, the base unit is Bq/kg, as indicated in the value_bq/kg column. The distinction between wet (W) and dry weight (D) is specified in the basis column.

dfs['BIOTA'][['value_bq/kg', 'basis']].head(1)
value_bq/kg basis
0 0.01014 W

For the SEAWATER sample type, the unit is Bq/m³ as indicated in the value_bq/m³ column.

dfs['SEAWATER'][['value_bq/m³']].head(1)
value_bq/m³
0 5.3

We can now review the units that are available in MARIS:

pd.read_excel(unit_lut_path())[['unit_id', 'unit', 'unit_sanitized']]
unit_id unit unit_sanitized
0 -1 Not applicable Not applicable
1 0 NOT AVAILABLE NOT AVAILABLE
2 1 Bq/m3 Bq per m3
3 2 Bq/m2 Bq per m2
4 3 Bq/kg Bq per kg
5 4 Bq/kgd Bq per kgd
6 5 Bq/kgw Bq per kgw
7 6 kg/kg kg per kg
8 7 TU TU
9 8 DELTA/mill DELTA per mill
10 9 atom/kg atom per kg
11 10 atom/kgd atom per kgd
12 11 atom/kgw atom per kgw
13 12 atom/l atom per l
14 13 Bq/kgC Bq per kgC

We define unit renaming rules for HELCOM in an ad hoc way:

Exported source
lut_units = {
    'SEAWATER': 1,  # 'Bq/m3'
    'SEDIMENT': '_UNIT', # accounted for in SplitSedimentValuesCB
    'BIOTA': {
        'D': 4,  # 'Bq/kgd'
        'W': 5,  # 'Bq/kgw'
        'F': 5   # 'Bq/kgw' (assumed to be 'Fresh', so set to wet)
    }
}

We define the RemapUnitCB callback to set the UNIT column in the DataFrames based on the lookup table lut_units.


source

RemapUnitCB

 RemapUnitCB (lut_units:dict={'SEAWATER': 1, 'SEDIMENT': '_UNIT', 'BIOTA':
              {'D': 4, 'W': 5, 'F': 5}})

Set the unit id column in the DataFrames based on a lookup table.

Type Default Details
lut_units dict {‘SEAWATER’: 1, ‘SEDIMENT’: ’_UNIT’, ‘BIOTA’: {‘D’: 4, ‘W’: 5, ‘F’: 5}} Dictionary containing renaming rules for different unit categories
Exported source
class RemapUnitCB(Callback):
    "Set the `unit` id column in the DataFrames based on a lookup table."
    def __init__(self, 
                 lut_units: dict=lut_units # Dictionary containing renaming rules for different unit categories
                ):
        fc.store_attr()

    def __call__(self, tfm: Transformer):
        for grp in tfm.dfs.keys():
            if grp == 'SEAWATER':
                tfm.dfs[grp]['UNIT'] = self.lut_units[grp]
            elif grp == 'BIOTA':
                tfm.dfs[grp]['UNIT'] = tfm.dfs[grp]['basis'].apply(lambda x: self.lut_units[grp].get(x, 0))
            elif grp == 'SEDIMENT':
                tfm.dfs[grp]['UNIT'] = tfm.dfs[grp]['_UNIT']

Apply the transformer with the RemapUnitCB callback. Then, print the unique UNIT values for each dataframe.

dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[
                            SplitSedimentValuesCB(coi_sediment),
                            SanitizeValueCB(coi_val),
                            NormalizeUncCB(),
                            RemapUnitCB(),
                            CompareDfsAndTfmCB(dfs)
                            ])

tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')

for grp in ['BIOTA', 'SEDIMENT', 'SEAWATER']:
    print(f"{grp}: {tfm()[grp]['UNIT'].unique()}")
                                               BIOTA  SEAWATER  SEDIMENT
Original row count (dfs)                       16124     21634     40744
Transformed row count (tfm.dfs)                16094     21481     70451
Rows removed from original (tfm.dfs_removed)      30       153       144
Rows created in transformed (tfm.dfs_created)      0         0     29851 

BIOTA: [5 0 4]
SEDIMENT: [3 2]
SEAWATER: [1]

Remap detection limit

Detection limits are encoded as follows in MARIS:

pd.read_excel(detection_limit_lut_path())
id name name_sanitized
0 -1 Not applicable Not applicable
1 0 Not Available Not available
2 1 = Detected value
3 2 < Detection limit
4 3 ND Not detected
5 4 DE Derived
Exported source
lut_dl = lambda: pd.read_excel(detection_limit_lut_path(), usecols=['name','id']).set_index('name').to_dict()['id']

Based on columns of interest for each sample type:

Exported source
coi_dl = {'SEAWATER' : {'VALUE' : 'value_bq/m³',
                       'UNC' : 'error%_m³',
                       'DL' : '< value_bq/m³'},
          'BIOTA':  {'VALUE' : 'value_bq/kg',
                     'UNC' : 'error%',
                     'DL' : '< value_bq/kg'},
          'SEDIMENT': {
              'VALUE' : '_VALUE',
              'UNC' : '_UNC',
              'DL' : '_DL'}}

In some cases the detection limit is not provided in the HELCOM dataset. To handle this, we define the RemapDetectionLimitCB callback to process the detection limit (DL) column with the following logic:

  • Lookup mapping: map existing detection limit values using a lookup table provided by fn_lut. This table translates specific detection limit indicators to standardized values.

  • Equal condition assignment: if both the activity value and its uncertainty are present, and the detection limit is not already defined in the lookup table, set the detection limit to ‘=’ (indicating a detected value).

  • Handling unmatched values: set any detection limit values not found in the lookup table to ‘Not Available’, ensuring all entries are accounted for in the final dataset.
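
A sketch of this logic on toy rows, using the MARIS ids shown above (‘=’ → 1, ‘<’ → 2, ‘Not Available’ → 0); this is illustrative only, not the callback itself, which follows:

import pandas as pd

lut = {'=': 1, '<': 2, 'Not Available': 0}
df = pd.DataFrame({'VALUE': [5.3, 0.01, None],
                   'UNC':   [1.7, None, None],
                   'DL':    [None, '<', None]})
eq = df['VALUE'].notna() & df['UNC'].notna() & ~df['DL'].isin(lut.keys())
df.loc[eq, 'DL'] = '='                                       # detected value
df.loc[~df['DL'].isin(lut.keys()), 'DL'] = 'Not Available'   # unmatched
df['DL'] = df['DL'].map(lut)
print(df['DL'].tolist())  # [1, 2, 0]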


source

RemapDetectionLimitCB

 RemapDetectionLimitCB (coi:dict, fn_lut:Callable)

Remap detection limit values to MARIS format.

Type Details
coi dict Configuration options for column names
fn_lut Callable Function that returns a lookup table
Exported source
class RemapDetectionLimitCB(Callback):
    "Remap value type to MARIS format."
    
    def __init__(self, 
                 coi: dict,  # Configuration options for column names
                 fn_lut: Callable  # Function that returns a lookup table
                ):
        fc.store_attr()

    def __call__(self, tfm: Transformer):
        lut = self.fn_lut()
        for grp in tfm.dfs:
            df = tfm.dfs[grp]
            self._update_detection_limit(df, grp, lut)

    def _update_detection_limit(self, df: pd.DataFrame, grp: str, lut: dict) -> None:
        if grp not in self.coi:
            raise ValueError(f"Group '{grp}' not found in coi configuration.")
        
        value_col, uncertainty_col, detection_col = self._get_column_names(grp)
        df['DL'] = df[detection_col]
        self._set_detection_limits(df, value_col, uncertainty_col, lut)

    def _get_column_names(self, grp: str) -> tuple:
        "Retrieve column names for the group."
        return self.coi[grp]['VALUE'], self.coi[grp]['UNC'], self.coi[grp]['DL']

    def _set_detection_limits(self, df: pd.DataFrame, value_col: str, uncertainty_col: str, lut: dict) -> None:
        self._apply_equal_condition(df, value_col, uncertainty_col, lut)
        self._set_unmatched_to_not_available(df, lut)
        self._map_detection_limits(df, lut)

    def _apply_equal_condition(self, df: pd.DataFrame, value_col: str, uncertainty_col: str, lut: dict) -> None:
        "Apply condition to set detection limits to '='."
        # Set detection limits to '=' if there is a value and uncertainty and 'DL' value is not 
        # in the lookup table.
        condition_eq = (df[value_col].notna() & df[uncertainty_col].notna() & ~df['DL'].isin(lut.keys()))
        df.loc[condition_eq, 'DL'] = '='

    def _set_unmatched_to_not_available(self, df: pd.DataFrame, lut: dict) -> None:
        "Set unmatched detection limits to 'Not Available'."
        # Set detection limits to 'Not Available' if 'DL' value is not in the lookup table.
        df.loc[~df['DL'].isin(lut.keys()), 'DL'] = 'Not Available'

    def _map_detection_limits(self, df: pd.DataFrame, lut: dict) -> None:
        "Map detection limits using the lookup table."
        df['DL'] = df['DL'].map(lut)
dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[
                            SplitSedimentValuesCB(coi_sediment),
                            SanitizeValueCB(coi_val),
                            NormalizeUncCB(),                  
                            RemapUnitCB(),
                            RemapDetectionLimitCB(coi_dl, lut_dl),
                            CompareDfsAndTfmCB(dfs)
                            ])

tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')

for grp in ['BIOTA', 'SEDIMENT', 'SEAWATER']:
    print(f'Unique DL values for {grp}: {tfm.dfs[grp]["DL"].unique()}')
                                               BIOTA  SEAWATER  SEDIMENT
Original row count (dfs)                       16124     21634     40744
Transformed row count (tfm.dfs)                16094     21481     70451
Rows removed from original (tfm.dfs_removed)      30       153       144
Rows created in transformed (tfm.dfs_created)      0         0     29851 

Unique DL values for BIOTA: [2 1 0]
Unique DL values for SEDIMENT: [1 2 0]
Unique DL values for SEAWATER: [1 2 0]

Remap Biota species

FEEDBACK TO DATA PROVIDER

Discrepancies have been identified between some rubin codes in the HELCOM Biota dataset and the entries in the RUBIN_NAME lookup table. These discrepancies include typographical errors and trailing spaces, as illustrated below.

set(dfs['BIOTA']['rubin']) - set(read_csv('RUBIN_NAME.csv')['RUBIN'])
{'CHAR BALT', 'FUCU SPP', 'FUCU VES ', 'FURC LUMB', 'GADU MOR  ', 'STUC PECT'}

Let’s review the rows containing inconsistent entries in the rubin column:

rows_to_show = 5 
df = dfs['BIOTA'][dfs['BIOTA']['rubin'].isin(set(dfs['BIOTA']['rubin']) - set(read_csv('RUBIN_NAME.csv')['RUBIN']))]
print (f"Number of inconsistent entries for the `rubin` column: {len(df)}")
with pd.option_context('display.max_columns', None):
    display(df.head(rows_to_show))
Number of inconsistent entries for the `rubin` column: 34
key nuclide method < value_bq/kg value_bq/kg basis error% number date_of_entry_x country laboratory sequence date year month day station latitude ddmmmm latitude dddddd longitude ddmmmm longitude dddddd sdepth rubin biotatype tissue no length weight dw% loi% mors_subbasin helcom_subbasin date_of_entry_y
13585 BVTIG2012042 K40 VTIG01 NaN 144.00000 W 6.63 NaN 04/07/16 00:00:00 6.0 VTIG 2012042 12/15/12 00:00:00 2012 12.0 15.0 BARC11 54.4717 54.7862 13.5096 13.8493 37.0 GADU MOR F 5 14.0 48.79 1414.29 19.2 92.9 2.0 2 04/07/16 00:00:00
13586 BVTIG2012042 CS137 VTIG01 NaN 6.17000 W 6.03 NaN 04/07/16 00:00:00 6.0 VTIG 2012042 12/15/12 00:00:00 2012 12.0 15.0 BARC11 54.4717 54.7862 13.5096 13.8493 37.0 GADU MOR F 5 14.0 48.79 1414.29 19.2 92.9 2.0 2 04/07/16 00:00:00
13587 BVTIG2012042 CS134 VTIG01 < 0.02366 W NaN NaN 04/07/16 00:00:00 6.0 VTIG 2012042 12/15/12 00:00:00 2012 12.0 15.0 BARC11 54.4717 54.7862 13.5096 13.8493 37.0 GADU MOR F 5 14.0 48.79 1414.29 19.2 92.9 2.0 2 04/07/16 00:00:00
13594 BVTIG2012045 K40 VTIG01 NaN 131.00000 W 6.62 NaN 04/07/16 00:00:00 6.0 VTIG 2012045 12/16/12 00:00:00 2012 12.0 16.0 B12 54.1385 54.2308 11.4691 11.7818 21.0 GADU MOR F 5 15.0 38.87 1128.67 18.7 92.7 5.0 16 04/07/16 00:00:00
13595 BVTIG2012045 CS137 VTIG01 NaN 5.77000 W 6.03 NaN 04/07/16 00:00:00 6.0 VTIG 2012045 12/16/12 00:00:00 2012 12.0 16.0 B12 54.1385 54.2308 11.4691 11.7818 21.0 GADU MOR F 5 15.0 38.87 1128.67 18.7 92.7 5.0 16 04/07/16 00:00:00

We will remap the HELCOM RUBIN column to the MARIS SPECIES column using the IMFA (Inspect, Match, Fix, Apply) pattern. First, let’s inspect the RUBIN_NAME.csv file provided by HELCOM, which describes the nomenclature of BIOTA species.

read_csv('RUBIN_NAME.csv').head()
RUBIN_ID RUBIN SCIENTIFIC NAME ENGLISH NAME
0 11 ABRA BRA ABRAMIS BRAMA BREAM
1 12 ANGU ANG ANGUILLA ANGUILLA EEL
2 13 ARCT ISL ARCTICA ISLANDICA ISLAND CYPRINE
3 14 ASTE RUB ASTERIAS RUBENS COMMON STARFISH
4 15 CARD EDU CARDIUM EDULE COCKLE

Now we try to match the SCIENTIFIC NAME column of the HELCOM RUBIN_NAME.csv lookup table to the species column of the MARIS species lookup table, again using a Remapper object:

remapper = Remapper(provider_lut_df=read_csv('RUBIN_NAME.csv'),
                    maris_lut_fn=species_lut_path,
                    maris_col_id='species_id',
                    maris_col_name='species',
                    provider_col_to_match='SCIENTIFIC NAME',
                    provider_col_key='RUBIN',
                    fname_cache='species_helcom.pkl'
                    )

remapper.generate_lookup_table(as_df=True)
remapper.select_match(match_score_threshold=1, verbose=True)
Processing: 100%|██████████| 46/46 [00:07<00:00,  5.80it/s]
38 entries matched the criteria, while 8 entries had a match score of 1 or higher.
matched_maris_name source_name match_score
source_key
STIZ LUC Sander lucioperca STIZOSTEDION LUCIOPERCA 10
LAMI SAC Laminaria japonica LAMINARIA SACCHARINA 7
CARD EDU Cardiidae CARDIUM EDULE 6
CH HI;BA Macoma balthica CHARA BALTICA 6
ENCH CIM Echinodermata ENCHINODERMATA CIM 5
PSET MAX Pinctada maxima PSETTA MAXIMA 5
MACO BAL Macoma balthica MACOMA BALTICA 1
STUC PEC Stuckenia pectinata STUCKENIA PECTINATE 1

Below, we will correct the entries that were not properly matched by the Remapper object:

Exported source
fixes_biota_species = {
    'STIZOSTEDION LUCIOPERCA': 'Sander luciopercas',
    'LAMINARIA SACCHARINA': 'Saccharina latissima',
    'CARDIUM EDULE': 'Cerastoderma edule',
    'CHARA BALTICA': NA,
    'PSETTA MAXIMA': 'Scophthalmus maximus'
    }

And give the remapper another try:

remapper.generate_lookup_table(fixes=fixes_biota_species)
remapper.select_match(match_score_threshold=1, verbose=True)
Processing: 100%|██████████| 46/46 [00:07<00:00,  6.23it/s]
42 entries matched the criteria, while 4 entries had a match score of 1 or higher.
matched_maris_name source_name match_score
source_key
ENCH CIM Echinodermata ENCHINODERMATA CIM 5
MACO BAL Macoma balthica MACOMA BALTICA 1
STIZ LUC Sander lucioperca STIZOSTEDION LUCIOPERCA 1
STUC PEC Stuckenia pectinata STUCKENIA PECTINATE 1

Visual inspection of the remaining imperfectly matched entries suggests they are acceptable, so we proceed.

We can now use the generic RemapCB callback to remap the RUBIN column to the SPECIES column, after having defined the lookup table lut_biota.

Exported source
lut_biota = lambda: Remapper(provider_lut_df=read_csv('RUBIN_NAME.csv'),
                             maris_lut_fn=species_lut_path,
                             maris_col_id='species_id',
                             maris_col_name='species',
                             provider_col_to_match='SCIENTIFIC NAME',
                             provider_col_key='RUBIN',
                             fname_cache='species_helcom.pkl'
                             ).generate_lookup_table(fixes=fixes_biota_species, as_df=False, overwrite=False)
dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[
    RemapCB(fn_lut=lut_biota, col_remap='SPECIES', col_src='rubin', dest_grps='BIOTA')
    ])
tfm()
tfm.dfs['BIOTA'].columns
tfm.dfs['BIOTA']['SPECIES'].unique()
array([  99,  243,   50,  139,  270,  192,  191,  284,   84,  269,  122,
         96,  287,  279,  278,  288,  286,  244,  129,  275,  271,  285,
        283,  247,  120,   59,  280,  274,  273,  290,  289,  272,  277,
        276,   21,  282,  110,  281,  245,  704, 1524,  703,    0,  621,
         60])

Remap Body Part

Let’s inspect the TISSUE.csv file provided by HELCOM, which describes the tissue nomenclature. Biota tissue is referred to as body part in the MARIS dataset.

remapper = Remapper(provider_lut_df=read_csv('TISSUE.csv'),
                    maris_lut_fn=bodyparts_lut_path,
                    maris_col_id='bodypar_id',
                    maris_col_name='bodypar',
                    provider_col_to_match='TISSUE_DESCRIPTION',
                    provider_col_key='TISSUE',
                    fname_cache='tissues_helcom.pkl'
                    )

remapper.generate_lookup_table(as_df=True)
remapper.select_match(match_score_threshold=1, verbose=True)
Processing: 100%|██████████| 29/29 [00:00<00:00, 137.11it/s]
21 entries matched the criteria, while 8 entries had a match score of 1 or higher.
matched_maris_name source_name match_score
source_key
3 Flesh without bones WHOLE FISH WITHOUT HEAD AND ENTRAILS 20
2 Flesh without bones WHOLE FISH WITHOUT ENTRAILS 13
8 Soft parts SKIN/EPIDERMIS 10
5 Flesh without bones FLESH WITHOUT BONES (FILETS) 9
1 Whole animal WHOLE FISH 5
12 Brain ENTRAILS 5
15 Stomach and intestine STOMACH + INTESTINE 3
41 Whole animal WHOLE ANIMALS 1

We address several entries that were not correctly matched by the Remapper object, as detailed below:

Exported source
fixes_biota_tissues = {
    'WHOLE FISH WITHOUT HEAD AND ENTRAILS': 'Whole animal eviscerated without head',
    'WHOLE FISH WITHOUT ENTRAILS': 'Whole animal eviscerated',
    'SKIN/EPIDERMIS': 'Skin',
    'ENTRAILS': 'Viscera'
    }
remapper.generate_lookup_table(as_df=True, fixes=fixes_biota_tissues)
remapper.select_match(match_score_threshold=1, verbose=True)
Processing: 100%|██████████| 29/29 [00:00<00:00, 138.84it/s]
25 entries matched the criteria, while 4 entries had a match score of 1 or higher.
matched_maris_name source_name match_score
source_key
5 Flesh without bones FLESH WITHOUT BONES (FILETS) 9
1 Whole animal WHOLE FISH 5
15 Stomach and intestine STOMACH + INTESTINE 3
41 Whole animal WHOLE ANIMALS 1

Visual inspection of the remaining imperfectly matched entries suggests they are acceptable, so we proceed.

We can now use the generic RemapCB callback to remap the TISSUE column to the BODY_PART column, after having defined the lookup table lut_tissues.

Exported source
lut_tissues = lambda: Remapper(provider_lut_df=read_csv('TISSUE.csv'),
                               maris_lut_fn=bodyparts_lut_path,
                               maris_col_id='bodypar_id',
                               maris_col_name='bodypar',
                               provider_col_to_match='TISSUE_DESCRIPTION',
                               provider_col_key='TISSUE',
                               fname_cache='tissues_helcom.pkl'
                               ).generate_lookup_table(fixes=fixes_biota_tissues, as_df=False, overwrite=False)
dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[
    RemapCB(fn_lut=lut_biota, col_remap='SPECIES', col_src='rubin', dest_grps='BIOTA'),
    RemapCB(fn_lut=lut_tissues, col_remap='BODY_PART', col_src='tissue', dest_grps='BIOTA'),
    ])

print(tfm()['BIOTA'][['tissue', 'BODY_PART']][:5])
   tissue  BODY_PART
0       5         52
1       5         52
2       5         52
3       5         52
4       5         52

Remap Biological Group

lut_biogroup_from_biota reads the species lookup table at species_lut_path() and builds a dictionary mapping species_id to biogroup_id from its contents.

Exported source
lut_biogroup_from_biota = lambda: get_lut(src_dir=species_lut_path().parent, fname=species_lut_path().name, 
                               key='species_id', value='biogroup_id')
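
get_lut is a marisco helper; a rough equivalent sketched with plain pandas (assuming the species lookup table is readable with read_excel, as the unit table above is):

import pandas as pd

def lut_from_file(path, key, value):
    "Build a {key: value} dictionary from a tabular lookup file."
    return pd.read_excel(path).set_index(key)[value].to_dict()

# e.g. lut_from_file(species_lut_path(), 'species_id', 'biogroup_id')
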
dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[
    RemapCB(fn_lut=lut_biota, col_remap='SPECIES', col_src='rubin', dest_grps='BIOTA'),
    RemapCB(fn_lut=lut_tissues, col_remap='BODY_PART', col_src='tissue', dest_grps='BIOTA'),
    RemapCB(fn_lut=lut_biogroup_from_biota, col_remap='BIO_GROUP', col_src='SPECIES', dest_grps='BIOTA')
    ])

print(tfm()['BIOTA']['BIO_GROUP'].unique())
[ 4  2 14 11  8  3  0]

Remap Sediment Types

FEEDBACK TO DATA PROVIDER

The SEDI values 56 and 73 are not found in the SEDIMENT_TYPE.csv lookup table provided. Note that there are also many NaN values. We reassign all of these to -99 for now, but they should be clarified/fixed with the data provider. This is demonstrated below.

# Load the sediment type lookup table
df_sed_lut = read_csv('SEDIMENT_TYPE.csv')

# Load data with caching enabled
dfs = load_data(src_dir, use_cache=True)

# Extract unique sediment types from the dataset and lookup table
sediment_sedi = set(dfs['SEDIMENT']['sedi'].unique())
lookup_sedi = set(df_sed_lut['SEDI'])

# Identify missing sediment types
missing = sediment_sedi - lookup_sedi

# Output results
print(f"Missing sediment type values in HELCOM lookup table: {missing if missing else 'None'}")
print(f"Number of `56.0` values: {(dfs['SEDIMENT']['sedi']== 56.0).sum()}")
print(f"Number of `73.0` values: {(dfs['SEDIMENT']['sedi']== 73.0).sum()}")
print(f"Number of `NA` values: {(dfs['SEDIMENT']['sedi'].isna()).sum()}")
Missing sediment type values in HELCOM lookup table: {56.0, 73.0, nan}
Number of `56.0` values: 12
Number of `73.0` values: 3
Number of `NA` values: 1239

Once again, we employ the IMFA (Inspect, Match, Fix, Apply) pattern to remap the HELCOM sediment types. Let’s inspect the SEDIMENT_TYPE.csv file provided by HELCOM describing the sediment type nomenclature:

with pd.option_context('display.max_columns', None):
    display(read_csv('SEDIMENT_TYPE.csv').T)
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
SEDI -99 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 20 21 22 23 24 25 30 31 32 33 34 35 40 41 42 43 44 45 46 47 48 49 50 51 52 54 55 57 58 59
SEDIMENT TYPE NO DATA GRAVEL SAND FINE SAND SILT CLAY MUD GLACIAL SOFT SULPHIDIC Fe-Mg CONCRETIONS SAND AND GRAVEL PURE SAND SAND AND FINE SAND SAND AND SILT SAND AND CLAY SAND AND MUD FINE SAND AND GRAVEL FINE SAND AND SAND PURE FINE SAND FINE SAND AND SILT FINE SAND AND CLAY FINE SAND AND MUD SILT AND GRAVEL SILT AND SAND SILT AND FINE SAND PURE SILT SILT AND CLAY SILT AND MUD CLAY AND GRAVEL CLAY AND SAND CLAY AND FINE SAND CLAY AND SILT PURE CLAY CLAY AND MUD CLACIAL CLAY SOFT CLAY SULPHIDIC CLAY CLAY AND Fe-Mg CONCRETIONS MUD AND GARVEL MUD AND SAND MUD AND FINE SAND MUD AND CLAY PURE MUD SOFT MUD SULPHIDIC MUD MUD AND Fe-Mg CONCRETIONS
RECOMMENDED TO BE USED NaN YES YES NO YES YES YES NO NO NO (ONLY TO USE AS ADJECTIVE) NO (ONLY TO USE AS ADJECTIVE) YES NO NO YES YES YES NO NO NO NO NO NO YES YES NO NO YES YES YES YES NO YES NO YES NO NO YES YES YES YES NO YES NO NO YES YES

Let’s try to match as many as possible of the HELCOM sediment types to the MARIS standard sediment types:

remapper = Remapper(provider_lut_df=read_csv('SEDIMENT_TYPE.csv'),
                    maris_lut_fn=sediments_lut_path,
                    maris_col_id='sedtype_id',
                    maris_col_name='sedtype',
                    provider_col_to_match='SEDIMENT TYPE',
                    provider_col_key='SEDI',
                    fname_cache='sediments_helcom.pkl'
                    )

remapper.generate_lookup_table(as_df=True)
remapper.select_match(match_score_threshold=1, verbose=True)
Processing: 100%|██████████| 47/47 [00:00<00:00, 137.98it/s]
44 entries matched the criteria, while 3 entries had a match score of 1 or higher.
matched_maris_name source_name match_score
source_key
-99 Soft NO DATA 5
50 Mud and gravel MUD AND GARVEL 2
46 Glacial clay CLACIAL CLAY 1

We address the remaining unmatched values by adding fixes_sediments:

Exported source
fixes_sediments = {
    'NO DATA': NA
}
remapper.generate_lookup_table(as_df=True, fixes=fixes_sediments)
remapper.select_match(match_score_threshold=1, verbose=True)
Processing: 100%|██████████| 47/47 [00:00<00:00, 120.73it/s]
44 entries matched the criteria, while 3 entries had a match score of 1 or higher.
matched_maris_name source_name match_score
source_key
-99 (Not available) NO DATA 2
50 Mud and gravel MUD AND GARVEL 2
46 Glacial clay CLACIAL CLAY 1

Upon visual inspection, the remaining values are deemed acceptable for further processing. We will now implement a callback to remap the SEDI values to their corresponding MARIS standard sediment types, designated as SED_TYPE. The HELCOM SEDIMENT dataset contains SEDI values that are absent from the HELCOM lookup table. These values will be reassigned to -99, indicating ‘Not Available’ as per the HELCOM standards.

Reassign the SEDI values of 56, 73, and nan to -99 (Not available):


source

RemapSedimentCB

 RemapSedimentCB (fn_lut:Callable, sed_grp_name:str='SEDIMENT',
                  sed_col_name:str='sedi', replace_lut:dict=None)

Lookup sediment id using lookup table.

Type Default Details
fn_lut Callable Function that returns the lookup table dictionary
sed_grp_name str SEDIMENT The name of the sediment group
sed_col_name str sedi The name of the sediment column
replace_lut dict None Dictionary for replacing SEDI values
Exported source
sed_replace_lut = {
    56: -99,
    73: -99,
    NA: -99
}
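As an illustration of how _fix_inconsistent_values below applies this table, here is a minimal sketch on a toy series (using np.nan as the missing-value marker):

# Illustration of the replace/fillna step on a toy series (hypothetical values)
import numpy as np
import pandas as pd

s = pd.Series([56, 73, np.nan, 2])
s = s.replace({56: -99, 73: -99})  # remap codes missing from the lookup table
s = s.fillna(-99)                  # remap missing values
print(s.tolist())  # [-99.0, -99.0, -99.0, 2.0]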
Exported source
class RemapSedimentCB(Callback):
    "Lookup sediment id using lookup table."
    def __init__(self, 
                 fn_lut: Callable,  # Function that returns the lookup table dictionary
                 sed_grp_name: str = 'SEDIMENT',  # The name of the sediment group
                 sed_col_name: str = 'sedi',  # The name of the sediment column
                 replace_lut: dict = None  # Dictionary for replacing SEDI values
                 ):
        fc.store_attr()

    def __call__(self, tfm: Transformer):
        "Remap sediment types using lookup table."
        df = tfm.dfs[self.sed_grp_name]
        self._fix_inconsistent_values(df)
        self._map_sediment_types(df)

    def _fix_inconsistent_values(self, df: pd.DataFrame) -> None:
        "Fix inconsistent values using the replace lookup table."
        if self.replace_lut:
            df[self.sed_col_name] = df[self.sed_col_name].replace(self.replace_lut)
            if NA in self.replace_lut:
                df[self.sed_col_name] = df[self.sed_col_name].fillna(self.replace_lut[NA])

    def _map_sediment_types(self, df: pd.DataFrame) -> None:
        "Map sediment types using the lookup table."
        lut = self.fn_lut()
        df['SED_TYPE'] = df[self.sed_col_name].map(
            lambda x: lut.get(x, Match(0, None, None, None)).matched_id
        )
Exported source
lut_sediments = lambda: Remapper(provider_lut_df=read_csv('SEDIMENT_TYPE.csv'),
                                 maris_lut_fn=sediments_lut_path,
                                 maris_col_id='sedtype_id',
                                 maris_col_name='sedtype',
                                 provider_col_to_match='SEDIMENT TYPE',
                                 provider_col_key='SEDI',
                                 fname_cache='sediments_helcom.pkl'
                                 ).generate_lookup_table(fixes=fixes_sediments, as_df=False, overwrite=False)

Use the RemapSedimentCB callback to remap the SEDI values in the HELCOM dataset to the corresponding MARIS standard sediment type, referred to as SED_TYPE.

dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[
    RemapSedimentCB(fn_lut=lut_sediments, replace_lut=sed_replace_lut)
    ])

tfm()
tfm.dfs['SEDIMENT']['SED_TYPE'].unique()
array([ 0,  2, 58, 30, 59, 55, 56, 36, 29, 47,  4, 54, 33,  6, 44, 42, 48,
       61, 57, 28, 49, 32, 45, 39, 46, 38, 31, 60, 62, 26, 53, 52,  1, 51,
       37, 34, 50,  7, 10, 41, 43, 35])

Remap Filtering Status

HELCOM filtered status is encoded as follows in the FILT column:

dfs = load_data(src_dir, use_cache=True)
get_unique_across_dfs(dfs, col_name='filt', as_df=True).head(5)
index value
0 0 F
1 1 NaN
2 2 N
3 3 n

MARIS uses a different encoding for filtered status:

pd.read_excel(filtered_lut_path())
id name
0 -1 Not applicable
1 0 Not available
2 1 Yes
3 2 No

With only four categories to remap, the Remapper is overkill; a simple dictionary suffices:

Exported source
lut_filtered = {
    'N': 2, # No
    'n': 2, # No
    'F': 1 # Yes
}

RemapFiltCB converts the HELCOM filt data to the MARIS FILT format.


source

RemapFiltCB

 RemapFiltCB (lut_filtered:dict={'N': 2, 'n': 2, 'F': 1})

Lookup filt value in dataframe using the lookup table.

Type Default Details
lut_filtered dict {‘N’: 2, ‘n’: 2, ‘F’: 1} Dictionary mapping filt codes to their corresponding names
Exported source
class RemapFiltCB(Callback):
    "Lookup filt value in dataframe using the lookup table."
    def __init__(self,
                 lut_filtered: dict=lut_filtered, # Dictionary mapping filt codes to their corresponding names
                ):
        fc.store_attr()

    def __call__(self, tfm):
        for df in tfm.dfs.values():
            if 'filt' in df.columns:
                df['FILT'] = df['filt'].map(lambda x: self.lut_filtered.get(x, 0))

For instance:

dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[RemapFiltCB(lut_filtered)])

print(tfm()['SEAWATER']['FILT'].unique())
[0 2 1]

Add Sample ID

The AddSampleIDCB callback generates a sample ID, SMP_ID, from the HELCOM KEY. The custom_maps attribute of the Transformer stores, for each group, a dictionary mapping the HELCOM KEY to an integer, which is then used to create the SMP_ID. These custom maps are created for each group and included in the output NetCDF file as enumerations.

Note that the sample ID values need to be sanitised before being used in the enumeration.
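A minimal sketch of the KEY-to-integer mapping idea (keys as they appear in the SEAWATER output below):

# Enumerate unique keys and map them back onto the column
import pandas as pd

keys = pd.Series(['WKRIL2012003', 'WKRIL2012004', 'WKRIL2012003'])
smp_id_map = {key: idx for idx, key in enumerate(keys.unique())}
print(smp_id_map)                     # {'WKRIL2012003': 0, 'WKRIL2012004': 1}
print(keys.map(smp_id_map).tolist())  # [0, 1, 0]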


source

AddSampleIDCB

 AddSampleIDCB ()

Generate a SMP_ID from the KEY values in the HELCOM dataset. Each KEY is mapped to a unique integer, with the mapping stored in an enumeration (i.e., smp_id).

Exported source
class AddSampleIDCB(Callback):
    "Generate a SMP_ID from the KEY values in the HELCOM dataset. Each KEY is mapped to a unique integer, with the mapping stored in an enumeration (i.e., smp_id)."
    def __call__(self, tfm: Transformer):
        for grp, df in tfm.dfs.items():
            # Generate and store the SMP_ID map
            smp_id_map = self._generate_sample_id_map(df)
            tfm.custom_maps[grp]['SMP_ID'] = smp_id_map
            # Create SMP_ID column in the DataFrame
            self._create_smp_id(df, smp_id_map)
        
    def _generate_sample_id_map(self, df: pd.DataFrame) -> dict:
        """Enumerate unique 'key' values and map them to integers."""
        return {key: idx for idx, key in enumerate(df['key'].unique())}

    def _create_smp_id(self, df: pd.DataFrame, smp_id_map: dict) -> None:
        """Map 'key' values to 'SMP_ID' using the provided enum."""
        df['SMP_ID'] = df['key'].map(smp_id_map)
dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[
                        AddSampleIDCB(),
                        CompareDfsAndTfmCB(dfs)
                        ])
tfm()

print(f'Number of unique sample ids in SEAWATER: {tfm.dfs["SEAWATER"]["SMP_ID"].unique().size}')
print(f'Number of unique sample ids in BIOTA: {tfm.dfs["BIOTA"]["SMP_ID"].unique().size}')
print(f'Number of unique sample ids in SEDIMENT: {tfm.dfs["SEDIMENT"]["SMP_ID"].unique().size}')
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')
Number of unique sample ids in SEAWATER: 9784
Number of unique sample ids in BIOTA: 4792
Number of unique sample ids in SEDIMENT: 14234
                                               BIOTA  SEAWATER  SEDIMENT
Original row count (dfs)                       16124     21634     40744
Transformed row count (tfm.dfs)                16124     21634     40744
Rows removed from original (tfm.dfs_removed)       0         0         0
Rows created in transformed (tfm.dfs_created)      0         0         0 
with pd.option_context('display.max_columns', None):
    display(tfm.dfs['SEAWATER'].head())
key nuclide method < value_bq/m³ value_bq/m³ error%_m³ date_of_entry_x country laboratory sequence date year month day station latitude (ddmmmm) latitude (dddddd) longitude (ddmmmm) longitude (dddddd) tdepth sdepth salin ttemp filt mors_subbasin helcom_subbasin date_of_entry_y SMP_ID
0 WKRIL2012003 CS137 NaN NaN 5.3 32.0 08/20/14 00:00:00 90.0 KRIL 2012003.0 05/23/12 00:00:00 2012.0 5.0 23.0 RU10 60.05 60.0833 29.20 29.3333 NaN 0.0 NaN NaN NaN 11.0 11.0 08/20/14 00:00:00 0
1 WKRIL2012004 CS137 NaN NaN 19.9 20.0 08/20/14 00:00:00 90.0 KRIL 2012004.0 05/23/12 00:00:00 2012.0 5.0 23.0 RU10 60.05 60.0833 29.20 29.3333 NaN 29.0 NaN NaN NaN 11.0 11.0 08/20/14 00:00:00 1
2 WKRIL2012005 CS137 NaN NaN 25.5 20.0 08/20/14 00:00:00 90.0 KRIL 2012005.0 06/17/12 00:00:00 2012.0 6.0 17.0 RU11 59.26 59.4333 23.09 23.1500 NaN 0.0 NaN NaN NaN 11.0 3.0 08/20/14 00:00:00 2
3 WKRIL2012006 CS137 NaN NaN 17.0 29.0 08/20/14 00:00:00 90.0 KRIL 2012006.0 05/24/12 00:00:00 2012.0 5.0 24.0 RU19 60.15 60.2500 27.59 27.9833 NaN 0.0 NaN NaN NaN 11.0 11.0 08/20/14 00:00:00 3
4 WKRIL2012007 CS137 NaN NaN 22.2 18.0 08/20/14 00:00:00 90.0 KRIL 2012007.0 05/24/12 00:00:00 2012.0 5.0 24.0 RU19 60.15 60.2500 27.59 27.9833 NaN 39.0 NaN NaN NaN 11.0 11.0 08/20/14 00:00:00 4

Add depths

The HELCOM dataset includes a column for the sampling depth (SDEPTH) for the SEAWATER and BIOTA datasets. Additionally, it contains a column for the total depth (TDEPTH) applicable to both the SEDIMENT and SEAWATER datasets. In this section, we will create a callback to incorporate both the sampling depth (smp_depth) and total depth (tot_depth) into the MARIS dataset.


source

AddDepthCB

 AddDepthCB ()

Ensure depth values are floats and add ‘SMP_DEPTH’ and ‘TOT_DEPTH’ columns.

Exported source
class AddDepthCB(Callback):
    "Ensure depth values are floats and add 'SMP_DEPTH' and 'TOT_DEPTH' columns."
    def __call__(self, tfm: Transformer):
        for df in tfm.dfs.values():
            if 'sdepth' in df.columns:
                df['SMP_DEPTH'] = df['sdepth'].astype(float)
            if 'tdepth' in df.columns:
                df['TOT_DEPTH'] = df['tdepth'].astype(float)
dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[AddDepthCB()])
tfm()
for grp in tfm.dfs.keys():  
    if 'SMP_DEPTH' in tfm.dfs[grp].columns and 'TOT_DEPTH' in tfm.dfs[grp].columns:
        print(f'{grp}:', tfm.dfs[grp][['SMP_DEPTH','TOT_DEPTH']].drop_duplicates())
    elif 'SMP_DEPTH' in tfm.dfs[grp].columns:
        print(f'{grp}:', tfm.dfs[grp][['SMP_DEPTH']].drop_duplicates())
    elif 'TOT_DEPTH' in tfm.dfs[grp].columns:
        print(f'{grp}:', tfm.dfs[grp][['TOT_DEPTH']].drop_duplicates())
BIOTA:        SMP_DEPTH
0            NaN
78         22.00
88         39.00
96         40.00
183        65.00
...          ...
15874      43.10
15921      30.43
15984       7.60
15985       5.50
15988      11.20

[301 rows x 1 columns]
SEAWATER:        SMP_DEPTH  TOT_DEPTH
0            0.0        NaN
1           29.0        NaN
4           39.0        NaN
6           62.0        NaN
10          71.0        NaN
...          ...        ...
21059       15.0       15.0
21217        7.0       16.0
21235       19.2       21.0
21312        1.0        5.5
21521        0.5        NaN

[1686 rows x 2 columns]
SEDIMENT:        TOT_DEPTH
0           25.0
6           61.0
19          31.0
33          39.0
42          36.0
...          ...
35882        3.9
36086      103.0
36449      108.9
36498        4.5
36899      125.0

[195 rows x 1 columns]

Add Salinity

FEEDBACK TO DATA PROVIDER

The HELCOM dataset includes a column for the salinity of the water (SALIN). According to the HELCOM documentation, the SALIN column represents “Salinity of water in PSU units”.

In the SEAWATER dataset, three entries have salinity values greater than 50 PSU. While salinity values greater than 50 PSU are possible, these entries may require further verification. Notably, these three entries have a salinity value of 99.99 PSU, which suggests potential data entry errors.

tfm.dfs['SEAWATER'][tfm.dfs['SEAWATER']['salin'] > 50]
key nuclide method < value_bq/m³ value_bq/m³ error%_m³ date_of_entry_x country laboratory sequence ... tdepth sdepth salin ttemp filt mors_subbasin helcom_subbasin date_of_entry_y SMP_DEPTH TOT_DEPTH
12288 WDHIG1998072 CS137 3 NaN 40.1 1.6 NaN 6.0 DHIG 1998072.0 ... 25.0 0.0 99.99 5.0 F 5.0 15.0 NaN 0.0 25.0
12289 WDHIG1998072 CS134 3 NaN 1.1 23.6 NaN 6.0 DHIG 1998072.0 ... 25.0 0.0 99.99 5.0 F 5.0 15.0 NaN 0.0 25.0
12290 WDHIG1998072 SR90 2 NaN 8.5 1.9 NaN 6.0 DHIG 1998072.0 ... 25.0 0.0 99.99 5.0 F 5.0 15.0 NaN 0.0 25.0

3 rows × 29 columns

Let’s add the salinity values to the SEAWATER DataFrame.


source

AddSalinityCB

 AddSalinityCB (salinity_col:str='salin')

Add salinity to the SEAWATER DataFrame.

Exported source
class AddSalinityCB(Callback):
    "Add salinity to the SEAWATER DataFrame."
    def __init__(self, salinity_col: str = 'salin'):  # Source column holding salinity in PSU
        self.salinity_col = salinity_col
    def __call__(self, tfm: Transformer):
        for df in tfm.dfs.values():
            if self.salinity_col in df.columns:
                df['SALINITY'] = df[self.salinity_col].astype(float)
dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[AddSalinityCB()])
tfm()
for grp in tfm.dfs.keys():  
    if 'SALINITY' in tfm.dfs[grp].columns:
        print(f'{grp}:', tfm.dfs[grp][['SALINITY']].drop_duplicates())
SEAWATER:        SALINITY
0           NaN
97        7.570
98        7.210
101       7.280
104       7.470
...         ...
21449    11.244
21450     7.426
21451     9.895
21452     2.805
21453     7.341

[2766 rows x 1 columns]

Add Temperature

FEEDBACK TO DATA PROVIDER

The HELCOM dataset includes a column for the temperature of the water (TTEMP). According to the HELCOM documentation, the TTEMP column represents the ‘Water temperature in Celsius (ºC) degrees of sampled water’.

In the SEAWATER dataset, several entries have temperature values greater than 50ºC. These entries may require further verification. Notably, these entries report values of 99.9ºC, which suggests potential data entry errors; see below.

t_df= tfm.dfs['SEAWATER'][tfm.dfs['SEAWATER']['ttemp'] > 50]
print('Number of entries with temperature greater than 50ºC: ', t_df.shape[0])
t_df.head()
Number of entries with temperature greater than 50ºC:  92
key nuclide method < value_bq/m³ value_bq/m³ error%_m³ date_of_entry_x country laboratory sequence ... longitude (dddddd) tdepth sdepth salin ttemp filt mors_subbasin helcom_subbasin date_of_entry_y SALINITY
5954 WDHIG1995559 CS134 4 NaN 1.7 15.0 NaN 6.0 DHIG 1995559.0 ... 10.2033 13.0 11.0 14.81 99.9 N 5.0 15.0 NaN 14.81
5955 WDHIG1995559 CS137 4 NaN 58.7 2.0 NaN 6.0 DHIG 1995559.0 ... 10.2033 13.0 11.0 14.81 99.9 N 5.0 15.0 NaN 14.81
5960 WDHIG1995569 CS134 4 NaN 1.4 12.0 NaN 6.0 DHIG 1995569.0 ... 10.2777 14.0 12.0 14.80 99.9 N 5.0 15.0 NaN 14.80
5961 WDHIG1995569 CS137 4 NaN 62.8 1.0 NaN 6.0 DHIG 1995569.0 ... 10.2777 14.0 12.0 14.80 99.9 N 5.0 15.0 NaN 14.80
5964 WDHIG1995571 CS134 4 NaN 1.5 17.0 NaN 6.0 DHIG 1995571.0 ... 10.2000 19.0 17.0 14.59 99.9 N 5.0 15.0 NaN 14.59

5 rows × 28 columns

Let’s add the temperature values to the SEAWATER DataFrame.


source

AddTemperatureCB

 AddTemperatureCB (temperature_col:str='ttemp')

Add temperature to the SEAWATER DataFrame.

Exported source
class AddTemperatureCB(Callback):
    "Add temperature to the SEAWATER DataFrame."
    def __init__(self, temperature_col: str = 'ttemp'):  # Source column holding temperature in ºC
        self.temperature_col = temperature_col
    def __call__(self, tfm: Transformer):
        for df in tfm.dfs.values():
            if self.temperature_col in df.columns:
                df['TEMPERATURE'] = df[self.temperature_col].astype(float)
dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[AddTemperatureCB()])
tfm()
for grp in tfm.dfs.keys():  
    if 'TEMPERATURE' in tfm.dfs[grp].columns:
        print(f'{grp}:', tfm.dfs[grp][['TEMPERATURE']].drop_duplicates())
SEAWATER:        TEMPERATURE
0              NaN
987           7.80
990           6.50
993           4.10
996           4.80
...            ...
21521         0.57
21523        18.27
21525        21.54
21529         4.94
21537         2.35

[1086 rows x 1 columns]

Add slice position (TOP and BOTTOM)


source

RemapSedSliceTopBottomCB

 RemapSedSliceTopBottomCB ()

Remap Sediment slice top and bottom to MARIS format.

Exported source
class RemapSedSliceTopBottomCB(Callback):
    "Remap Sediment slice top and bottom to MARIS format."
    def __call__(self, tfm: Transformer):
        "Iterate through all DataFrames in the transformer object and remap sediment slice top and bottom."
        tfm.dfs['SEDIMENT']['TOP'] = tfm.dfs['SEDIMENT']['uppsli']
        tfm.dfs['SEDIMENT']['BOTTOM'] = tfm.dfs['SEDIMENT']['lowsli']
dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[RemapSedSliceTopBottomCB()])
tfm()
print(tfm.dfs['SEDIMENT'][['TOP','BOTTOM']].head())
    TOP  BOTTOM
0  15.0    20.0
1  20.0    25.0
2  25.0    30.0
3  30.0    35.0
4  35.0    40.0

Add dry weight, wet weight and percentage weight

FEEDBACK TO DATA PROVIDER

Some entries in the BIOTA dataset report a BASIS value of F, which is not consistent with the HELCOM description provided in the metadata. The GUIDELINES FOR MONITORING OF RADIOACTIVE SUBSTANCES were obtained from here.

Let’s take a look at the BIOTA BASIS values:

dfs['BIOTA']['basis'].unique()
array(['W', nan, 'D', 'F'], dtype=object)

Number of entries for each BASIS value:

dfs['BIOTA']['basis'].value_counts()
basis
W    12164
D     3868
F       25
Name: count, dtype: int64
FEEDBACK TO DATA PROVIDER

Some entries for DW% (Dry weight as percentage (%) of fresh weight) are much higher than 100%. Additionally, DW% is reported as 0% in some cases.

For BIOTA, the number of entries for DW% higher than 100%:

dfs['BIOTA']['dw%'][dfs['BIOTA']['dw%'] > 100].count()
20

For BIOTA, the number of entries for DW% equal to 0%:

dfs['BIOTA']['dw%'][dfs['BIOTA']['dw%'] == 0].count()
6

For SEDIMENT, the number of entries for DW% higher than 100%:

dfs['SEDIMENT']['dw%'][dfs['SEDIMENT']['dw%'] > 100].count()
625

For SEDIMENT, the number of entries for DW% equal to 0%:

dfs['SEDIMENT']['dw%'][dfs['SEDIMENT']['dw%'] == 0].count()
302
FEEDBACK TO DATA PROVIDER

Several SEDIMENT entries have DW% (Dry weight as percentage of fresh weight) values less than 1%. While technically possible, this would indicate samples contained more than 99% water content.

For SEDIMENT, the number of entries for DW% less than 1% but greater than 0.001%:

percent=1
dfs['SEDIMENT']['dw%'][(dfs['SEDIMENT']['dw%'] < percent) & (dfs['SEDIMENT']['dw%'] > 0.001)].count()
24

Let’s take a look at the MARIS description of the percentwt, drywt and wetwt variables:

  • percentwt: Dry weight as a ratio of fresh weight, expressed as a decimal.
  • drywt: Dry weight in grams.
  • wetwt: Fresh weight in grams.

Looking at the HELCOM dataset, the sample weight is not reported for SEDIMENT; however, the percentage dry weight is reported as DW%.

dfs['SEDIMENT'].columns
Index(['key', 'nuclide', 'method', '< value_bq/kg', 'value_bq/kg', 'error%_kg',
       '< value_bq/m²', 'value_bq/m²', 'error%_m²', 'date_of_entry_x',
       'country', 'laboratory', 'sequence', 'date', 'year', 'month', 'day',
       'station', 'latitude (ddmmmm)', 'latitude (dddddd)',
       'longitude (ddmmmm)', 'longitude (dddddd)', 'device', 'tdepth',
       'uppsli', 'lowsli', 'area', 'sedi', 'oxic', 'dw%', 'loi%',
       'mors_subbasin', 'helcom_subbasin', 'sum_link', 'date_of_entry_y'],
      dtype='object')

The BIOTA dataset reports the weight of the sample as WEIGHT and the percentage dry weight as DW%. The BASIS column describes the basis of the value reported. Let’s create a callback to include the PERCENTWT, DRYWT and WETWT columns in the MARIS dataset.


source

LookupDryWetPercentWeightCB

 LookupDryWetPercentWeightCB ()

Lookup dry-wet ratio and format for MARIS.

Exported source
class LookupDryWetPercentWeightCB(Callback):
    "Lookup dry-wet ratio and format for MARIS."
    def __call__(self, tfm: Transformer):
        "Iterate through all DataFrames in the transformer object and apply the dry-wet ratio lookup."
        for grp in tfm.dfs.keys():
            if 'dw%' in tfm.dfs[grp].columns:
                self._apply_dry_wet_ratio(tfm.dfs[grp])
            if 'weight' in tfm.dfs[grp].columns and 'basis' in tfm.dfs[grp].columns:
                self._correct_basis(tfm.dfs[grp])
                self._apply_weight(tfm.dfs[grp])

    def _apply_dry_wet_ratio(self, df: pd.DataFrame) -> None:
        "Apply dry-wet ratio conversion and formatting to the given DataFrame."
        df['PERCENTWT'] = df['dw%'] / 100  # Convert percentage to fraction
        df.loc[df['PERCENTWT'] == 0, 'PERCENTWT'] = np.nan  # Convert 0% to NaN (lowercase np.nan works across NumPy versions)

    def _correct_basis(self, df: pd.DataFrame) -> None:
        "Correct BASIS values. Assuming F = Fresh weight, so F = W"
        df.loc[df['basis'] == 'F', 'basis'] = 'W'

    def _apply_weight(self, df: pd.DataFrame) -> None:
        "Apply weight conversion and formatting to the given DataFrame."
        dry_condition = df['basis'] == 'D'
        wet_condition = df['basis'] == 'W'
        
        df.loc[dry_condition, 'DRYWT'] = df['weight']
        df.loc[dry_condition & df['PERCENTWT'].notna(), 'WETWT'] = df['weight'] / df['PERCENTWT']
        
        df.loc[wet_condition, 'WETWT'] = df['weight']
        df.loc[wet_condition & df['PERCENTWT'].notna(), 'DRYWT'] = df['weight'] * df['PERCENTWT']
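To sanity-check the arithmetic, here is a worked example using the values of the first BIOTA row shown in the output below (weight of 948 g reported on a wet basis, DW% of 18.453):

# Worked example of the dry/wet conversion (values from the first BIOTA row below)
weight, dw_pct = 948.0, 18.453   # reported WEIGHT and DW%, basis 'W'
percentwt = dw_pct / 100         # 0.18453
wetwt = weight                   # basis 'W': the reported weight is the wet weight
drywt = weight * percentwt       # 948.0 * 0.18453 = 174.93444
print(percentwt, drywt, wetwt)   # approximately: 0.18453 174.93444 948.0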
dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[
                            LookupDryWetPercentWeightCB(),
                            CompareDfsAndTfmCB(dfs)
                            ])

tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')
print('BIOTA:', tfm.dfs['BIOTA'][['PERCENTWT','DRYWT','WETWT']].head(), '\n')
print('SEDIMENT:', tfm.dfs['SEDIMENT']['PERCENTWT'].unique())
                                               BIOTA  SEAWATER  SEDIMENT
Original row count (dfs)                       16124     21634     40744
Transformed row count (tfm.dfs)                16124     21634     40744
Rows removed from original (tfm.dfs_removed)       0         0         0
Rows created in transformed (tfm.dfs_created)      0         0         0 

BIOTA:    PERCENTWT      DRYWT  WETWT
0    0.18453  174.93444  948.0
1    0.18453  174.93444  948.0
2    0.18453  174.93444  948.0
3    0.18453  174.93444  948.0
4    0.18458  177.93512  964.0 

SEDIMENT: [       nan 0.1        0.13       ... 0.24418605 0.25764192 0.26396495]

Note that the dry weight is greater than the wet weight for some BIOTA entries because their DW% exceeds 100%, as shown above. Let’s count the entries where this is the case:

tfm.dfs['BIOTA'][['DRYWT','WETWT']][tfm.dfs['BIOTA']['DRYWT'] > tfm.dfs['BIOTA']['WETWT']].count()
DRYWT    20
WETWT    20
dtype: int64

Standardize Coordinates

FEEDBACK TO DATA PROVIDER

Column names for geographical coordinates are inconsistent across sample types (biota, sediment, seawater): some include parentheses, some do not.

dfs = load_data(src_dir, use_cache=True)
for grp in dfs.keys():
    print(f'{grp}: {[col for col in dfs[grp].columns if "lon" in col or "lat" in col]}')
BIOTA: ['latitude ddmmmm', 'latitude dddddd', 'longitude ddmmmm', 'longitude dddddd']
SEAWATER: ['latitude (ddmmmm)', 'latitude (dddddd)', 'longitude (ddmmmm)', 'longitude (dddddd)']
SEDIMENT: ['latitude (ddmmmm)', 'latitude (dddddd)', 'longitude (ddmmmm)', 'longitude (dddddd)']
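The ParseCoordinates callback below copes with this inconsistency by locating the coordinate columns with a case-insensitive regular expression rather than exact names. A small illustration of the matching used in _find_coord_column:

# Both naming variants match the same pattern
import re

pattern = re.compile('lat.*ddmmmm', re.IGNORECASE)
cols = ['latitude ddmmmm', 'latitude (ddmmmm)']
print([col for col in cols if pattern.search(col)])  # both variants match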
FEEDBACK TO DATA PROVIDER

HELCOM SEAWATER data includes values of 0 or nan for both latitude and longitude.

Let’s create a callback to parse the coordinates of the HELCOM dataset.


source

ParseCoordinates

 ParseCoordinates (fn_convert_cor:Callable)

Get geographical coordinates from columns expressed in degrees decimal format or from columns in degrees/minutes decimal format where degrees decimal format is missing or zero.

Type Details
fn_convert_cor Callable Function that converts coordinates from degree-minute to decimal degree format
Exported source
class ParseCoordinates(Callback):
    "Get geographical coordinates from columns expressed in degrees decimal format or from columns in degrees/minutes decimal format where degrees decimal format is missing or zero."
    def __init__(self, 
                 fn_convert_cor: Callable # Function that converts coordinates from degree-minute to decimal degree format
                 ):
        self.fn_convert_cor = fn_convert_cor

    def __call__(self, tfm:Transformer):
        for df in tfm.dfs.values():
            self._format_coordinates(df)

    def _format_coordinates(self, df: pd.DataFrame) -> None:
        coord_cols = self._get_coord_columns(df.columns)

        for coord in ['lat', 'lon']:
            decimal_col, minute_col = coord_cols[f'{coord}_d'], coord_cols[f'{coord}_m']
            # Attempt to convert columns to numeric, coercing errors to NaN.
            df[decimal_col] = pd.to_numeric(df[decimal_col], errors='coerce')
            df[minute_col] = pd.to_numeric(df[minute_col], errors='coerce')
            condition = df[decimal_col].isna() | (df[decimal_col] == 0)
            df[coord.upper()] = np.where(condition,
                                 df[minute_col].apply(self._safe_convert),
                                 df[decimal_col])
        
        df.dropna(subset=['LAT', 'LON'], inplace=True)

    def _get_coord_columns(self, columns) -> dict:
        return {
            'lon_d': self._find_coord_column(columns, 'lon', 'dddddd'),
            'lat_d': self._find_coord_column(columns, 'lat', 'dddddd'),
            'lon_m': self._find_coord_column(columns, 'lon', 'ddmmmm'),
            'lat_m': self._find_coord_column(columns, 'lat', 'ddmmmm')
        }

    def _find_coord_column(self, columns, coord_type, coord_format) -> str:
        pattern = re.compile(f'{coord_type}.*{coord_format}', re.IGNORECASE)
        matching_columns = [col for col in columns if pattern.search(col)]
        return matching_columns[0] if matching_columns else None

    def _safe_convert(self, value) -> float:
        if pd.isna(value):
            return value
        try:
            return self.fn_convert_cor(value)
        except Exception as e:
            print(f"Error converting value {value}: {e}")
            return value
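ddmm_to_dd, passed below as fn_convert_cor, is defined elsewhere in marisco. Here is an assumed sketch of the conversion, consistent with the data shown earlier (e.g. a latitude of 60.05, read as 60°05′, becomes 60.0833):

# Hypothetical stand-in for ddmm_to_dd (assumed implementation, illustrative only)
def ddmm_to_dd_sketch(ddmm: float) -> float:
    "Convert dd.mmmm (fractional part encodes minutes) to decimal degrees."
    degrees = int(ddmm)
    minutes = (ddmm - degrees) * 100
    return degrees + minutes / 60

print(round(ddmm_to_dd_sketch(60.05), 4))  # 60.0833
print(round(ddmm_to_dd_sketch(29.20), 4))  # 29.3333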
dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[                    
                            ParseCoordinates(ddmm_to_dd),
                            CompareDfsAndTfmCB(dfs)
                            ])
tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')
print(tfm.dfs['BIOTA'][['LAT','LON']])
                                               BIOTA  SEAWATER  SEDIMENT
Original row count (dfs)                       16124     21634     40744
Transformed row count (tfm.dfs)                16124     21626     40743
Rows removed from original (tfm.dfs_removed)       0         8         1
Rows created in transformed (tfm.dfs_created)      0         0         0 

             LAT        LON
0      54.283333  12.316667
1      54.283333  12.316667
2      54.283333  12.316667
3      54.283333  12.316667
4      54.283333  12.316667
...          ...        ...
16119  61.241500  21.395000
16120  61.241500  21.395000
16121  61.343333  21.385000
16122  61.343333  21.385000
16123  61.343333  21.385000

[16124 rows x 2 columns]

Let’s review the rows removed from the SEAWATER dataset during the parsing of coordinates:

with pd.option_context('display.max_columns', None, 'display.max_colwidth', None):
    display(tfm.dfs_removed['SEAWATER'])
key nuclide method < value_bq/m³ value_bq/m³ error%_m³ date_of_entry_x country laboratory sequence date year month day station latitude (ddmmmm) latitude (dddddd) longitude (ddmmmm) longitude (dddddd) tdepth sdepth salin ttemp filt mors_subbasin helcom_subbasin date_of_entry_y
20556 WSSSM2015009 H3 STYR201 < 2450.0 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
20557 WSSSM2015010 H3 STYR201 NaN 2510.0 29.17 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
20558 WSSSM2015011 H3 STYR201 < 2450.0 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
20559 WSSSM2015012 H3 STYR201 NaN 1740.0 41.26 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
20560 WSSSM2015013 H3 STYR201 NaN 1650.0 43.53 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
20561 WSSSM2015014 H3 STYR201 < 2277.0 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
20562 WSSSM2015015 H3 STYR201 < 2277.0 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
20563 WSSSM2015016 H3 STYR201 < 2277.0 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN

Next, SanitizeLonLatCB sanitizes the coordinates by dropping rows where both longitude and latitude are zero or contain unrealistic values, and by converting the `,` separator in longitude and latitude to a `.` separator.
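A minimal sketch of the kind of filtering this implies (assumed logic for illustration; SanitizeLonLatCB itself is defined elsewhere in marisco):

# Keep rows with a non-zero position inside valid lat/lon bounds (toy data)
import pandas as pd

df = pd.DataFrame({'LAT': [54.28, 0.0, 95.0], 'LON': [12.32, 0.0, 10.0]})
valid = (((df['LAT'] != 0) | (df['LON'] != 0))
         & df['LAT'].between(-90, 90) & df['LON'].between(-180, 180))
print(df[valid])  # keeps only the first row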

dfs = load_data(src_dir,  use_cache=True)
tfm = Transformer(dfs, cbs=[
                            ParseCoordinates(ddmm_to_dd),
                            SanitizeLonLatCB(),
                            CompareDfsAndTfmCB(dfs)
                            ])

tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')
print(tfm.dfs['BIOTA'][['LAT','LON']])
                                               BIOTA  SEAWATER  SEDIMENT
Original row count (dfs)                       16124     21634     40744
Transformed row count (tfm.dfs)                16124     21626     40743
Rows removed from original (tfm.dfs_removed)       0         8         1
Rows created in transformed (tfm.dfs_created)      0         0         0 

             LAT        LON
0      54.283333  12.316667
1      54.283333  12.316667
2      54.283333  12.316667
3      54.283333  12.316667
4      54.283333  12.316667
...          ...        ...
16119  61.241500  21.395000
16120  61.241500  21.395000
16121  61.343333  21.385000
16122  61.343333  21.385000
16123  61.343333  21.385000

[16124 rows x 2 columns]

Review all callbacks

dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[
                            LowerStripNameCB(col_src='nuclide', col_dst='NUCLIDE'),
                            RemapNuclideNameCB(lut_nuclides, col_name='NUCLIDE'),
                            ParseTimeCB(),
                            EncodeTimeCB(),
                            SplitSedimentValuesCB(coi_sediment),
                            SanitizeValueCB(coi_val),       
                            NormalizeUncCB(),
                            RemapUnitCB(),
                            RemapDetectionLimitCB(coi_dl, lut_dl),                           
                            RemapCB(fn_lut=lut_biota, col_remap='SPECIES', col_src='rubin', dest_grps='BIOTA'),
                            RemapCB(fn_lut=lut_tissues, col_remap='BODY_PART', col_src='tissue', dest_grps='BIOTA'),
                            RemapCB(fn_lut=lut_biogroup_from_biota, col_remap='BIO_GROUP', col_src='SPECIES', dest_grps='BIOTA'),
                            RemapSedimentCB(fn_lut=lut_sediments, replace_lut=sed_replace_lut),
                            RemapFiltCB(lut_filtered),
                            AddSampleIDCB(),
                            AddDepthCB(),
                            AddSalinityCB(),
                            AddTemperatureCB(),
                            RemapSedSliceTopBottomCB(),
                            LookupDryWetPercentWeightCB(),
                            ParseCoordinates(ddmm_to_dd),
                            SanitizeLonLatCB(),
                            CompareDfsAndTfmCB(dfs)
                            ])

tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')
Warning: 8 missing time value(s) in SEAWATER
Warning: 1 missing time value(s) in SEDIMENT
                                               BIOTA  SEAWATER  SEDIMENT
Original row count (dfs)                       16124     21634     40744
Transformed row count (tfm.dfs)                16094     21473     70449
Rows removed from original (tfm.dfs_removed)      30       161       144
Rows created in transformed (tfm.dfs_created)      0         0     29849 

Let’s inspect the rows removed from the SEAWATER data:

grp='SEAWATER' # 'SEAWATER', 'BIOTA' or 'SEDIMENT'
print(f'{grp}, number of dropped rows: {tfm.dfs_removed[grp].shape[0]}.')
print(f'Viewing dropped rows for {grp}:')
tfm.dfs_removed[grp]
SEAWATER, number of dropped rows: 161.
Viewing dropped rows for SEAWATER:
key nuclide method < value_bq/m³ value_bq/m³ error%_m³ date_of_entry_x country laboratory sequence ... longitude (ddmmmm) longitude (dddddd) tdepth sdepth salin ttemp filt mors_subbasin helcom_subbasin date_of_entry_y
13439 WRISO2001025 CS137 RISO02 NaN NaN 10.0 NaN 26.0 RISO 2001025.0 ... 10.500 10.833333 22.0 20.0 0.00 NaN N 5.0 5.0 NaN
14017 WLEPA2002001 CS134 LEPA02 < NaN NaN NaN 93.0 LEPA 2002001.0 ... 21.030 21.050000 16.0 0.0 3.77 14.40 N 4.0 9.0 NaN
14020 WLEPA2002002 CS134 LEPA02 < NaN NaN NaN 93.0 LEPA 2002004.0 ... 20.574 20.956667 14.0 0.0 6.57 11.95 N 4.0 9.0 NaN
14023 WLEPA2002003 CS134 LEPA02 < NaN NaN NaN 93.0 LEPA 2002007.0 ... 19.236 19.393333 73.0 0.0 7.00 9.19 N 4.0 9.0 NaN
14026 WLEPA2002004 CS134 LEPA02 < NaN NaN NaN 93.0 LEPA 2002010.0 ... 20.205 20.341700 47.0 0.0 7.06 8.65 N 4.0 9.0 NaN
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
21542 WLRPC2023011 SR90 LRPC02 NaN NaN NaN 05/03/24 00:00:00 93.0 LRPC 2023011.0 ... 20.480 20.800000 45.0 1.0 7.22 19.80 N 4.0 9.0 05/03/24 00:00:00
21543 WLRPC2023012 CS137 LRPC01 NaN NaN NaN 05/03/24 00:00:00 93.0 LRPC 2023012.0 ... 20.480 20.800000 45.0 1.0 7.23 8.80 N 4.0 9.0 05/03/24 00:00:00
21544 WLRPC2023012 SR90 LRPC02 NaN NaN NaN 05/03/24 00:00:00 93.0 LRPC 2023012.0 ... 20.480 20.800000 45.0 1.0 7.23 8.80 N 4.0 9.0 05/03/24 00:00:00
21545 WLRPC2023013 CS137 LRPC01 NaN NaN NaN 05/03/24 00:00:00 93.0 LRPC 2023013.0 ... 20.427 20.711700 41.0 1.0 7.23 19.30 N 4.0 9.0 05/03/24 00:00:00
21546 WLRPC2023013 SR90 LRPC02 NaN NaN NaN 05/03/24 00:00:00 93.0 LRPC 2023013.0 ... 20.427 20.711700 41.0 1.0 7.23 19.30 N 4.0 9.0 05/03/24 00:00:00

161 rows × 27 columns

Example change logs

dfs = load_data(src_dir, use_cache=True)

tfm = Transformer(dfs, cbs=[
                            LowerStripNameCB(col_src='nuclide', col_dst='NUCLIDE'),
                            RemapNuclideNameCB(lut_nuclides, col_name='NUCLIDE'),
                            ParseTimeCB(),
                            EncodeTimeCB(),
                            SplitSedimentValuesCB(coi_sediment),
                            SanitizeValueCB(coi_val),       
                            NormalizeUncCB(),
                            RemapUnitCB(),
                            RemapDetectionLimitCB(coi_dl, lut_dl),                           
                            RemapCB(fn_lut=lut_biota, col_remap='SPECIES', col_src='rubin', dest_grps='BIOTA'),
                            RemapCB(fn_lut=lut_tissues, col_remap='BODY_PART', col_src='tissue', dest_grps='BIOTA'),
                            RemapCB(fn_lut=lut_biogroup_from_biota, col_remap='BIO_GROUP', col_src='SPECIES', dest_grps='BIOTA'),
                            RemapSedimentCB(fn_lut=lut_sediments, replace_lut=sed_replace_lut),
                            RemapFiltCB(lut_filtered),
                            AddSampleIDCB(),
                            AddDepthCB(),
                            AddSalinityCB(),
                            AddTemperatureCB(),
                            RemapSedSliceTopBottomCB(),
                            LookupDryWetPercentWeightCB(),
                            ParseCoordinates(ddmm_to_dd),
                            SanitizeLonLatCB(),
                            ])

tfm()
tfm.logs
Warning: 8 missing time value(s) in SEAWATER
Warning: 1 missing time value(s) in SEDIMENT
["Convert 'nuclide' column values to lowercase, strip spaces, and store in 'NUCLIDE' column.",
 'Remap data provider nuclide names to standardized MARIS nuclide names.',
 'Standardize time format across all dataframes.',
 'Encode time as seconds since epoch.',
 'Separate sediment entries into distinct rows for Bq/kg and Bq/m² measurements.',
 'Sanitize measurement values by removing blanks and standardizing to use the `VALUE` column.',
 'Convert from relative error to standard uncertainty.',
 'Set the `unit` id column in the DataFrames based on a lookup table.',
 'Remap value type to MARIS format.',
 "Remap values from 'rubin' to 'SPECIES' for groups: BIOTA.",
 "Remap values from 'tissue' to 'BODY_PART' for groups: BIOTA.",
 "Remap values from 'SPECIES' to 'BIO_GROUP' for groups: BIOTA.",
 'Lookup sediment id using lookup table.',
 'Lookup filt value in dataframe using the lookup table.',
 'Generate a SMP_ID from the KEY values in the HELCOM dataset. Each KEY is mapped to a unique integer, with the mapping stored in an enumeration (i.e., smp_id).',
 "Ensure depth values are floats and add 'SMP_DEPTH' and 'TOT_DEPTH' columns.",
 'Remap Sediment slice top and bottom to MARIS format.',
 'Lookup dry-wet ratio and format for MARIS.',
 'Get geographical coordinates from columns expressed in degrees decimal format or from columns in degrees/minutes decimal format where degrees decimal format is missing or zero.',
 'Drop rows with invalid longitude & latitude values. Convert `,` separator to `.` separator.']

Feed global attributes


source

get_attrs

 get_attrs (tfm:marisco.callbacks.Transformer, zotero_key:str,
            kw:list=['oceanography', 'Earth Science > Oceans > Ocean
            Chemistry> Radionuclides', 'Earth Science > Human Dimensions >
            Environmental Impacts > Nuclear Radiation Exposure', 'Earth
            Science > Oceans > Ocean Chemistry > Ocean Tracers, Earth
            Science > Oceans > Marine Sediments', 'Earth Science > Oceans
            > Ocean Chemistry, Earth Science > Oceans > Sea Ice >
            Isotopes', 'Earth Science > Oceans > Water Quality > Ocean
            Contaminants', 'Earth Science > Biological Classification >
            Animals/Vertebrates > Fish', 'Earth Science > Biosphere >
            Ecosystems > Marine Ecosystems', 'Earth Science > Biological
            Classification > Animals/Invertebrates > Mollusks', 'Earth
            Science > Biological Classification > Animals/Invertebrates >
            Arthropods > Crustaceans', 'Earth Science > Biological
            Classification > Plants > Macroalgae (Seaweeds)'])

Retrieve all global attributes.

Type Default Details
tfm Transformer Transformer object
zotero_key str Zotero dataset record key
kw list [‘oceanography’, ‘Earth Science > Oceans > Ocean Chemistry> Radionuclides’, ‘Earth Science > Human Dimensions > Environmental Impacts > Nuclear Radiation Exposure’, ‘Earth Science > Oceans > Ocean Chemistry > Ocean Tracers, Earth Science > Oceans > Marine Sediments’, ‘Earth Science > Oceans > Ocean Chemistry, Earth Science > Oceans > Sea Ice > Isotopes’, ‘Earth Science > Oceans > Water Quality > Ocean Contaminants’, ‘Earth Science > Biological Classification > Animals/Vertebrates > Fish’, ‘Earth Science > Biosphere > Ecosystems > Marine Ecosystems’, ‘Earth Science > Biological Classification > Animals/Invertebrates > Mollusks’, ‘Earth Science > Biological Classification > Animals/Invertebrates > Arthropods > Crustaceans’, ‘Earth Science > Biological Classification > Plants > Macroalgae (Seaweeds)’] List of keywords
Returns dict Global attributes
Exported source
def get_attrs(
    tfm: Transformer, # Transformer object
    zotero_key: str, # Zotero dataset record key
    kw: list = kw # List of keywords
    ) -> dict: # Global attributes
    "Retrieve all global attributes."
    return GlobAttrsFeeder(tfm.dfs, cbs=[
        BboxCB(),
        DepthRangeCB(),
        TimeRangeCB(),
        ZoteroCB(zotero_key, cfg=cfg()),
        KeyValuePairCB('keywords', ', '.join(kw)),
        KeyValuePairCB('publisher_postprocess_logs', ', '.join(tfm.logs))
        ])()
get_attrs(tfm, zotero_key=zotero_key, kw=kw)
{'geospatial_lat_min': '31.17',
 'geospatial_lat_max': '65.75',
 'geospatial_lon_min': '9.6333',
 'geospatial_lon_max': '53.5',
 'geospatial_bounds': 'POLYGON ((9.6333 53.5, 31.17 53.5, 31.17 65.75, 9.6333 65.75, 9.6333 53.5))',
 'geospatial_vertical_max': '437.0',
 'geospatial_vertical_min': '0.0',
 'time_coverage_start': '1984-01-10T00:00:00',
 'time_coverage_end': '2023-11-30T00:00:00',
 'id': '26VMZZ2Q',
 'title': 'Environmental database - Helsinki Commission Monitoring of Radioactive Substances',
 'summary': 'MORS Environment database has been used to collate data resulting from monitoring of environmental radioactivity in the Baltic Sea based on HELCOM Recommendation 26/3.\n\nThe database is structured according to HELCOM Guidelines on Monitoring of Radioactive Substances (https://www.helcom.fi/wp-content/uploads/2019/08/Guidelines-for-Monitoring-of-Radioactive-Substances.pdf), which specifies reporting format, database structure, data types and obligatory parameters used for reporting data under Recommendation 26/3.\n\nThe database is updated and quality assured annually by HELCOM MORS EG.',
 'creator_name': '[{"creatorType": "author", "name": "HELCOM MORS"}]',
 'keywords': 'oceanography, Earth Science > Oceans > Ocean Chemistry> Radionuclides, Earth Science > Human Dimensions > Environmental Impacts > Nuclear Radiation Exposure, Earth Science > Oceans > Ocean Chemistry > Ocean Tracers, Earth Science > Oceans > Marine Sediments, Earth Science > Oceans > Ocean Chemistry, Earth Science > Oceans > Sea Ice > Isotopes, Earth Science > Oceans > Water Quality > Ocean Contaminants, Earth Science > Biological Classification > Animals/Vertebrates > Fish, Earth Science > Biosphere > Ecosystems > Marine Ecosystems, Earth Science > Biological Classification > Animals/Invertebrates > Mollusks, Earth Science > Biological Classification > Animals/Invertebrates > Arthropods > Crustaceans, Earth Science > Biological Classification > Plants > Macroalgae (Seaweeds)',
 'publisher_postprocess_logs': "Convert 'nuclide' column values to lowercase, strip spaces, and store in 'NUCLIDE' column., Remap data provider nuclide names to standardized MARIS nuclide names., Standardize time format across all dataframes., Encode time as seconds since epoch., Separate sediment entries into distinct rows for Bq/kg and Bq/m² measurements., Sanitize measurement values by removing blanks and standardizing to use the `VALUE` column., Convert from relative error to standard uncertainty., Set the `unit` id column in the DataFrames based on a lookup table., Remap value type to MARIS format., Remap values from 'rubin' to 'SPECIES' for groups: BIOTA., Remap values from 'tissue' to 'BODY_PART' for groups: BIOTA., Remap values from 'SPECIES' to 'BIO_GROUP' for groups: BIOTA., Lookup sediment id using lookup table., Lookup filt value in dataframe using the lookup table., Generate a SMP_ID from the KEY values in the HELCOM dataset. Each KEY is mapped to a unique integer, with the mapping stored in an enumeration (i.e., smp_id)., Ensure depth values are floats and add 'SMP_DEPTH' and 'TOT_DEPTH' columns., Remap Sediment slice top and bottom to MARIS format., Lookup dry-wet ratio and format for MARIS., Get geographical coordinates from columns expressed in degrees decimal format or from columns in degrees/minutes decimal format where degrees decimal format is missing or zero., Drop rows with invalid longitude & latitude values. Convert `,` separator to `.` separator."}

Encoding NetCDF


source

encode

 encode (fname_out:str, **kwargs)

Encode data to NetCDF.

Type Details
fname_out str Output file name
kwargs VAR_KEYWORD Additional arguments
Returns None
Exported source
def encode(
    fname_out: str, # Output file name
    **kwargs # Additional arguments
    ) -> None:
    "Encode data to NetCDF."
    dfs = load_data(src_dir)
    tfm = Transformer(dfs, cbs=[
                            LowerStripNameCB(col_src='nuclide', col_dst='NUCLIDE'),
                            RemapNuclideNameCB(lut_nuclides, col_name='NUCLIDE'),
                            ParseTimeCB(),
                            EncodeTimeCB(),
                            SplitSedimentValuesCB(coi_sediment),
                            SanitizeValueCB(coi_val),       
                            NormalizeUncCB(),
                            RemapUnitCB(),
                            RemapDetectionLimitCB(coi_dl, lut_dl),                           
                            RemapCB(fn_lut=lut_biota, col_remap='SPECIES', col_src='rubin', dest_grps='BIOTA'),
                            RemapCB(fn_lut=lut_tissues, col_remap='BODY_PART', col_src='tissue', dest_grps='BIOTA'),
                            RemapCB(fn_lut=lut_biogroup_from_biota, col_remap='BIO_GROUP', col_src='SPECIES', dest_grps='BIOTA'),
                            RemapSedimentCB(fn_lut=lut_sediments, replace_lut=sed_replace_lut),
                            RemapFiltCB(lut_filtered),
                            AddSampleIDCB(),
                            AddDepthCB(),
                            AddSalinityCB(),
                            AddTemperatureCB(),
                            RemapSedSliceTopBottomCB(),
                            LookupDryWetPercentWeightCB(),
                            ParseCoordinates(ddmm_to_dd),
                            SanitizeLonLatCB(),
                            ])
    tfm()
    encoder = NetCDFEncoder(tfm.dfs, 
                            dest_fname=fname_out, 
                            global_attrs=get_attrs(tfm, zotero_key=zotero_key, kw=kw),
                            # custom_maps=tfm.custom_maps,
                            verbose=kwargs.get('verbose', False),
                           )
    encoder.encode()
encode(fname_out, verbose=False)
Warning: 8 missing time value(s) in SEAWATER
Warning: 1 missing time value(s) in SEDIMENT

NetCDF Review

First, let’s review the global attributes of the NetCDF file:

contents = ExtractNetcdfContents(fname_out_nc)
print(contents.global_attrs)
{'id': '26VMZZ2Q', 'title': 'Environmental database - Helsinki Commission Monitoring of Radioactive Substances', 'summary': 'MORS Environment database has been used to collate data resulting from monitoring of environmental radioactivity in the Baltic Sea based on HELCOM Recommendation 26/3.\n\nThe database is structured according to HELCOM Guidelines on Monitoring of Radioactive Substances (https://www.helcom.fi/wp-content/uploads/2019/08/Guidelines-for-Monitoring-of-Radioactive-Substances.pdf), which specifies reporting format, database structure, data types and obligatory parameters used for reporting data under Recommendation 26/3.\n\nThe database is updated and quality assured annually by HELCOM MORS EG.', 'keywords': 'oceanography, Earth Science > Oceans > Ocean Chemistry> Radionuclides, Earth Science > Human Dimensions > Environmental Impacts > Nuclear Radiation Exposure, Earth Science > Oceans > Ocean Chemistry > Ocean Tracers, Earth Science > Oceans > Marine Sediments, Earth Science > Oceans > Ocean Chemistry, Earth Science > Oceans > Sea Ice > Isotopes, Earth Science > Oceans > Water Quality > Ocean Contaminants, Earth Science > Biological Classification > Animals/Vertebrates > Fish, Earth Science > Biosphere > Ecosystems > Marine Ecosystems, Earth Science > Biological Classification > Animals/Invertebrates > Mollusks, Earth Science > Biological Classification > Animals/Invertebrates > Arthropods > Crustaceans, Earth Science > Biological Classification > Plants > Macroalgae (Seaweeds)', 'history': 'TBD', 'keywords_vocabulary': 'GCMD Science Keywords', 'keywords_vocabulary_url': 'https://gcmd.earthdata.nasa.gov/static/kms/', 'record': 'TBD', 'featureType': 'TBD', 'cdm_data_type': 'TBD', 'Conventions': 'CF-1.10 ACDD-1.3', 'publisher_name': 'Paul MCGINNITY, Iolanda OSVATH, Florence DESCROIX-COMANDUCCI', 'publisher_email': 'p.mc-ginnity@iaea.org, i.osvath@iaea.org, F.Descroix-Comanducci@iaea.org', 'publisher_url': 'https://maris.iaea.org', 'publisher_institution': 'International Atomic Energy Agency - IAEA', 'creator_name': '[{"creatorType": "author", "name": "HELCOM MORS"}]', 'institution': 'TBD', 'metadata_link': 'TBD', 'creator_email': 'TBD', 'creator_url': 'TBD', 'references': 'TBD', 'license': 'Without prejudice to the applicable Terms and Conditions (https://nucleus.iaea.org/Pages/Others/Disclaimer.aspx), I hereby agree that any use of the data will contain appropriate acknowledgement of the data source(s) and the IAEA Marine Radioactivity Information System (MARIS).', 'comment': 'TBD', 'geospatial_lat_min': '31.17', 'geospatial_lon_min': '9.6333', 'geospatial_lat_max': '65.75', 'geospatial_lon_max': '53.5', 'geospatial_vertical_min': '0.0', 'geospatial_vertical_max': '437.0', 'geospatial_bounds': 'POLYGON ((9.6333 53.5, 31.17 53.5, 31.17 65.75, 9.6333 65.75, 9.6333 53.5))', 'geospatial_bounds_crs': 'EPSG:4326', 'time_coverage_start': '1984-01-10T00:00:00', 'time_coverage_end': '2023-11-30T00:00:00', 'local_time_zone': 'TBD', 'date_created': 'TBD', 'date_modified': 'TBD', 'publisher_postprocess_logs': "Convert 'nuclide' column values to lowercase, strip spaces, and store in 'NUCLIDE' column., Remap data provider nuclide names to standardized MARIS nuclide names., Standardize time format across all dataframes., Encode time as seconds since epoch., Separate sediment entries into distinct rows for Bq/kg and Bq/m² measurements., Sanitize measurement values by removing blanks and standardizing to use the `VALUE` column., Convert from relative error to standard uncertainty., Set the 
`unit` id column in the DataFrames based on a lookup table., Remap value type to MARIS format., Remap values from 'rubin' to 'SPECIES' for groups: BIOTA., Remap values from 'tissue' to 'BODY_PART' for groups: BIOTA., Remap values from 'SPECIES' to 'BIO_GROUP' for groups: BIOTA., Lookup sediment id using lookup table., Lookup filt value in dataframe using the lookup table., Generate a SMP_ID from the KEY values in the HELCOM dataset. Each KEY is mapped to a unique integer, with the mapping stored in an enumeration (i.e., smp_id)., Ensure depth values are floats and add 'SMP_DEPTH' and 'TOT_DEPTH' columns., Remap Sediment slice top and bottom to MARIS format., Lookup dry-wet ratio and format for MARIS., Get geographical coordinates from columns expressed in degrees decimal format or from columns in degrees/minutes decimal format where degrees decimal format is missing or zero., Drop rows with invalid longitude & latitude values. Convert `,` separator to `.` separator."}

Review the publisher_postprocess_logs.

print(contents.global_attrs['publisher_postprocess_logs'])
Convert 'nuclide' column values to lowercase, strip spaces, and store in 'NUCLIDE' column., Remap data provider nuclide names to standardized MARIS nuclide names., Standardize time format across all dataframes., Encode time as seconds since epoch., Separate sediment entries into distinct rows for Bq/kg and Bq/m² measurements., Sanitize measurement values by removing blanks and standardizing to use the `VALUE` column., Convert from relative error to standard uncertainty., Set the `unit` id column in the DataFrames based on a lookup table., Remap value type to MARIS format., Remap values from 'rubin' to 'SPECIES' for groups: BIOTA., Remap values from 'tissue' to 'BODY_PART' for groups: BIOTA., Remap values from 'SPECIES' to 'BIO_GROUP' for groups: BIOTA., Lookup sediment id using lookup table., Lookup filt value in dataframe using the lookup table., Generate a SMP_ID from the KEY values in the HELCOM dataset. Each KEY is mapped to a unique integer, with the mapping stored in an enumeration (i.e., smp_id)., Ensure depth values are floats and add 'SMP_DEPTH' and 'TOT_DEPTH' columns., Remap Sediment slice top and bottom to MARIS format., Lookup dry-wet ratio and format for MARIS., Get geographical coordinates from columns expressed in degrees decimal format or from columns in degrees/minutes decimal format where degrees decimal format is missing or zero., Drop rows with invalid longitude & latitude values. Convert `,` separator to `.` separator.

Now let’s review the enums of the groups in the NetCDF file:

print('Example of enum_dicts:')
print(contents.enum_dicts['BIOTA']['bio_group'])
Example of enum_dicts:
{'Not applicable': '-1', 'Not available': '0', 'Birds': '1', 'Crustaceans': '2', 'Echinoderms': '3', 'Fish': '4', 'Mammals': '5', 'Molluscs': '6', 'Others': '7', 'Plankton': '8', 'Polychaete worms': '9', 'Reptile': '10', 'Seaweeds and plants': '11', 'Cephalopods': '12', 'Gastropods': '13', 'Bivalves': '14'}

Let’s review the custom maps of the NetCDF file:

print('Example of custom_maps, first 10 key-value pairs:')
dict(list(contents.custom_maps['SEAWATER']['SMP_ID'].items())[:10])
Example of custom_maps, first 10 key-value pairs:
{'WKRIL2012003': 0,
 'WKRIL2012004': 1,
 'WKRIL2012005': 2,
 'WKRIL2012006': 3,
 'WKRIL2012007': 4,
 'WKRIL2012008': 5,
 'WKRIL2012009': 6,
 'WKRIL2012010': 7,
 'WKRIL2012011': 8,
 'WKRIL2012012': 9}

Let’s retrieve the data contained in the NetCDF file:

dfs = contents.dfs

Let’s review the biota data:

nc_dfs_biota=dfs['BIOTA']
with pd.option_context('display.max_columns', None):
    display(nc_dfs_biota)
LON LAT SMP_DEPTH TIME SMP_ID NUCLIDE VALUE UNIT UNC DL BIO_GROUP SPECIES BODY_PART DRYWT WETWT PERCENTWT
0 12.316667 54.283333 NaN 1348358400 0 31 0.010140 5 NaN 2 4 99 52 174.934433 948.0 0.18453
1 12.316667 54.283333 NaN 1348358400 0 4 135.300003 5 4.830210 1 4 99 52 174.934433 948.0 0.18453
2 12.316667 54.283333 NaN 1348358400 0 9 0.013980 5 NaN 2 4 99 52 174.934433 948.0 0.18453
3 12.316667 54.283333 NaN 1348358400 0 33 4.338000 5 0.150962 1 4 99 52 174.934433 948.0 0.18453
4 12.316667 54.283333 NaN 1348358400 1 31 0.009614 5 NaN 2 4 99 52 177.935120 964.0 0.18458
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
16089 21.395000 61.241501 2.0 1652140800 4789 33 13.700000 4 0.520600 1 11 96 55 NaN NaN NaN
16090 21.395000 61.241501 2.0 1652140800 4789 9 0.500000 4 0.045500 1 11 96 55 NaN NaN NaN
16091 21.385000 61.343334 NaN 1663200000 4790 4 50.700001 4 4.106700 1 14 129 1 NaN NaN NaN
16092 21.385000 61.343334 NaN 1663200000 4790 33 0.880000 4 0.140800 1 14 129 1 NaN NaN NaN
16093 21.385000 61.343334 NaN 1663200000 4790 12 6.600000 4 0.349800 1 14 129 1 NaN NaN NaN

16094 rows × 16 columns

Let’s review the sediment data:

nc_dfs_sediment = dfs['SEDIMENT']
with pd.option_context('display.max_columns', None):
    display(nc_dfs_sediment)
LON LAT TOT_DEPTH TIME SMP_ID NUCLIDE VALUE UNIT UNC DL SED_TYPE TOP BOTTOM PERCENTWT
0 27.799999 60.466667 25.0 1337904000 0 33 1200.000000 3 240.000000 1 0 15.0 20.0 NaN
1 27.799999 60.466667 25.0 1337904000 1 33 250.000000 3 50.000000 1 0 20.0 25.0 NaN
2 27.799999 60.466667 25.0 1337904000 2 33 140.000000 3 29.400000 1 0 25.0 30.0 NaN
3 27.799999 60.466667 25.0 1337904000 3 33 79.000000 3 15.800000 1 0 30.0 35.0 NaN
4 27.799999 60.466667 25.0 1337904000 4 33 29.000000 3 6.960000 1 0 35.0 40.0 NaN
... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
70444 15.537800 54.617832 62.0 1654646400 14121 67 0.044000 2 0.015312 1 10 15.0 17.0 0.257642
70445 15.537800 54.617832 62.0 1654646400 14121 77 2.500000 2 0.185000 1 10 15.0 17.0 0.257642
70446 15.537800 54.617832 62.0 1654646400 14122 4 5873.000000 2 164.444000 1 10 17.0 19.0 0.263965
70447 15.537800 54.617832 62.0 1654646400 14122 33 21.200001 2 2.162400 1 10 17.0 19.0 0.263965
70448 15.537800 54.617832 62.0 1654646400 14122 77 0.370000 2 0.048100 1 10 17.0 19.0 0.263965

70449 rows × 14 columns

Let’s review the seawater data:

nc_dfs_seawater = dfs['SEAWATER']
with pd.option_context('display.max_columns', None):
    display(nc_dfs_seawater)
LON LAT SMP_DEPTH TOT_DEPTH TIME SMP_ID NUCLIDE VALUE UNIT UNC DL FILT
0 29.333300 60.083302 0.0 NaN 1337731200 0 33 5.300000 1 1.696000 1 0
1 29.333300 60.083302 29.0 NaN 1337731200 1 33 19.900000 1 3.980000 1 0
2 23.150000 59.433300 0.0 NaN 1339891200 2 33 25.500000 1 5.100000 1 0
3 27.983299 60.250000 0.0 NaN 1337817600 3 33 17.000000 1 4.930000 1 0
4 27.983299 60.250000 39.0 NaN 1337817600 4 33 22.200001 1 3.996000 1 0
... ... ... ... ... ... ... ... ... ... ... ... ...
21468 13.499833 54.600334 0.0 47.0 1686441600 9724 1 702.838074 1 51.276207 1 0
21469 13.499833 54.600334 45.0 47.0 1686441600 9725 1 725.855713 1 52.686260 1 0
21470 14.200833 54.600334 0.0 11.0 1686614400 9731 1 648.992920 1 48.154419 1 0
21471 14.665500 54.600334 0.0 20.0 1686614400 9732 1 627.178406 1 46.245316 1 0
21472 14.330000 54.600334 0.0 17.0 1686614400 9734 1 605.715088 1 45.691143 1 0

21473 rows × 12 columns

Data Format Conversion

The MARIS data processing workflow involves two key steps:

  1. NetCDF to Standardized CSV Compatible with OpenRefine Pipeline
    • Convert standardized NetCDF files to CSV formats compatible with OpenRefine using the NetCDFDecoder.
    • Preserve data integrity and variable relationships.
    • Maintain standardized nomenclature and units.
  2. Database Integration
    • Process the converted CSV files using OpenRefine.
    • Apply data cleaning and standardization rules.
    • Export validated data to the MARIS master database.

This section focuses on the first step: converting NetCDF files to a format suitable for OpenRefine processing using the NetCDFDecoder class.
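For orientation, here is a minimal sketch of what step 1 amounts to using xarray directly (the pipeline itself uses marisco’s NetCDFDecoder, called through decode below; the sketch assumes the NetCDF file stores one group per sample type, as reviewed above):

# Rough equivalent of the NetCDF-to-CSV step using xarray (illustrative only)
import xarray as xr

for grp in ['BIOTA', 'SEAWATER', 'SEDIMENT']:
    ds = xr.open_dataset(fname_out, group=grp)  # one NetCDF group per sample type
    ds.to_dataframe().reset_index().to_csv(f'{grp}.csv', index=False)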

decode(fname_in=fname_out_nc, verbose=True)
Saved BIOTA to ../../_data/output/100-HELCOM-MORS-2024_BIOTA.csv
Saved SEAWATER to ../../_data/output/100-HELCOM-MORS-2024_SEAWATER.csv
Saved SEDIMENT to ../../_data/output/100-HELCOM-MORS-2024_SEDIMENT.csv