This data pipeline, known as a “handler” in Marisco terminology, is designed to clean, standardize, and encode HELCOM data into NetCDF format. The handler processes raw HELCOM data, applying various transformations and lookups to align it with MARIS data standards.
Key functions of this handler:
Cleans and normalizes raw HELCOM data
Applies standardized nomenclature and units
Encodes the processed data into NetCDF format compatible with MARIS requirements
This handler is a crucial component in the Marisco data processing workflow, ensuring HELCOM data is properly integrated into the MARIS database.
Note: Additionally, an optional encoder (pipeline) is provided below to process data into a .csv format compatible with the MARIS master database. This feature is maintained for legacy purposes, as data ingestion was previously performed using OpenRefine.
The present notebook is intended as an instance of Literate Programming: a narrative that includes code snippets interspersed with explanations. When a function or a class needs to be exported to a dedicated Python module (in our case marisco/handlers/helcom.py), the code snippet is added to the module using the #| exports directive provided by the wonderful nbdev library.
Configuration & file paths
fname_in: path to the folder containing the HELCOM data in CSV format. The path can be defined as a relative path.
fname_out_nc: path and filename for the NetCDF output. The path can be defined as a relative path.
fname_out_csv: path and filename for the OpenRefine CSV output. The path can be defined as a relative path.
zotero_key: used to retrieve attributes related to the dataset from Zotero. The MARIS datasets are catalogued in a library available on Zotero.
ref_id: the reference id of the dataset as defined by MARIS, corresponding to its location in the Archive of the Zotero library.
Exported source
fname_in = '../../_data/accdb/mors/csv'
fname_out_nc = '../../_data/output/100-HELCOM-MORS-2024.nc'
fname_out_csv = '../../_data/output/100-HELCOM-MORS-2024.csv'
zotero_key = '26VMZZ2Q'  # HELCOM MORS Zotero key
ref_id = 100  # HELCOM MORS reference id as defined by MARIS
def load_data(src_dir: str|Path,  # The directory where the source CSV files are located
              smp_types: list=default_smp_types  # A list of tuples, each containing the file prefix and the corresponding sample type name
              ) -> Dict[str, pd.DataFrame]:  # A dictionary with sample types as keys and their corresponding dataframes as values
    "Load HELCOM data and return the data in a dictionary of dataframes with the dictionary key as the sample type."
    src_path = Path(src_dir)

    def load_and_merge(file_prefix: str) -> pd.DataFrame:
        try:
            df_meas = pd.read_csv(src_path / f'{file_prefix}02.csv')
            df_smp = pd.read_csv(src_path / f'{file_prefix}01.csv')
            return pd.merge(df_meas, df_smp, on='KEY', how='left')
        except FileNotFoundError as e:
            print(f"Error loading files for {file_prefix}: {e}")
            return pd.DataFrame()  # Return an empty DataFrame if files are not found

    return {smp_type: load_and_merge(file_prefix) for file_prefix, smp_type in smp_types}
dfs is a dictionary of dataframes created from the HELCOM dataset located at the path fname_in. The data are split by sample type, and each dataframe is stored under a key equal to its sample type.
The sample types (seawater, biota, sediment, …) defined in configs.ipynb are encoded as group names in the produced NetCDF. Sample type ids are added to the individual dataframes by the AddSampleTypeIdColumnCB callback, for legacy purposes (i.e. the OpenRefine output).
KEY samptype_id
0 WKRIL2012003 1
1 WKRIL2012004 1
2 WKRIL2012005 1
3 WKRIL2012006 1
4 WKRIL2012007 1
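The effect of the callback can be sketched in isolation. This is a minimal, hypothetical stand-in (only seawater = 1 is confirmed by the output above; the other ids are illustrative assumptions — the real ids come from Marisco's configs):

```python
# Illustrative sample-type ids; only seawater = 1 is confirmed above.
smp_type_ids = {'seawater': 1, 'sediment': 2, 'biota': 3}

def add_sample_type_id(dfs: dict) -> None:
    # Add a constant `samptype_id` column to each sample-type table,
    # mirroring what AddSampleTypeIdColumnCB does on real dataframes.
    for smp_type, df in dfs.items():
        df['samptype_id'] = smp_type_ids[smp_type]
```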
seawater sediment biota
Number of rows in dfs 21216 39817 15827
Number of rows in tfm.dfs 21216 39817 15827
Number of dropped rows 0 0 0
Number of rows in tfm.dfs + Number of dropped rows 21216 39817 15827
Normalize nuclide names
Lower & strip nuclide names
Tip
FEEDBACK TO DATA PROVIDER: Some nuclide names contain one or multiple trailing spaces.
This is demonstrated below for the NUCLIDE column:
To fix this issue, we use the LowerStripNameCB callback. For each dataframe in the dictionary of dataframes, it corrects the nuclide name by converting it to lowercase, stripping any leading or trailing whitespace, and ensuring the number comes before the letters (e.g. 137cs).
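A standalone sketch of that normalization (hypothetical helper, not the callback itself) could look like this:

```python
import re

def normalize_nuclide_name(name: str) -> str:
    # Lowercase, strip whitespace, and move the mass number in front of the
    # element symbol, e.g. 'CS137 ' -> '137cs'. Names already in the target
    # form (or not matching the pattern) are returned unchanged.
    s = name.lower().strip()
    m = re.fullmatch(r'([a-z]+)[\s-]?(\d+m?)', s)
    return f'{m.group(2)}{m.group(1)}' if m else s
```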
Below, we map the nuclide names used by HELCOM to the MARIS standard nuclide names.
Remapping data provider nomenclatures into MARIS standards is a recurrent operation and is done in a semi-automated manner according to the following pattern:
Inspect data provider nomenclature:
Match automatically against MARIS nomenclature (using a fuzzy matching algorithm);
Fix potential mismatches;
Apply the lookup table to the dataframe.
From now on, we will use this pattern to remap the HELCOM data provider nomenclatures into MARIS standards, and for the sake of brevity we name it IMFA (Inspect, Match, Fix, Apply).
Let's inspect the unique values of the data provider nuclide names. get_unique_across_dfs is a utility function that retrieves the unique values of a specific column across all dataframes (recall that we have one dataframe per sample type: biota, …).
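Its assumed behaviour can be sketched generically (a minimal stand-in working on any mapping of name to rows, not the real Marisco function):

```python
def unique_across(dfs: dict, col: str) -> set:
    # Union of a column's values across all per-sample-type tables.
    return set().union(*(set(df[col]) for df in dfs.values()))
```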
Test passes! We can now create a callback RemapNuclideNameCB to remap the nuclide names. Note that we pass overwrite=False to the Remapper constructor to use the cached version.
Remap data provider nuclide names to MARIS nuclide names.
Type
Details
fn_lut
Callable
Function that returns the lookup table dictionary
Exported source
# Create a lookup table for nuclide names
lut_nuclides = lambda df: Remapper(provider_lut_df=df,
                                   maris_lut_fn=nuc_lut_path,
                                   maris_col_id='nuclide_id',
                                   maris_col_name='nc_name',
                                   provider_col_to_match='value',
                                   provider_col_key='value',
                                   fname_cache='nuclides_helcom.pkl'
                                   ).generate_lookup_table(fixes=fixes_nuclide_names,
                                                           as_df=False, overwrite=False)
Exported source
class RemapNuclideNameCB(Callback):
    "Remap data provider nuclide names to MARIS nuclide names."
    def __init__(self,
                 fn_lut: Callable  # Function that returns the lookup table dictionary
                 ):
        fc.store_attr()

    def __call__(self, tfm: Transformer):
        df_uniques = get_unique_across_dfs(tfm.dfs, col_name='NUCLIDE', as_df=True)
        lut = {k: v.matched_maris_name for k, v in self.fn_lut(df_uniques).items()}
        for k in tfm.dfs.keys():
            tfm.dfs[k]['NUCLIDE'] = tfm.dfs[k]['NUCLIDE'].replace(lut)
Let’s see the RemapNuclideNameCB callback in action:
FEEDBACK TO DATA PROVIDER: Time/date is provided in the DATE, YEAR, MONTH, DAY columns. Note that DATE contains missing values, as indicated below. When missing, we fall back on the YEAR, MONTH, DAY columns. Note also that DAY and MONTH sometimes contain 0; in that case we systematically set them to 1.
dfs = load_data(fname_in)
for key in dfs.keys():
    print(f'{key} DATE null values: ', dfs[key]['DATE'].isna().sum())
seawater DATE null values: 502
sediment DATE null values: 741
biota DATE null values: 72
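The fallback rule described above can be sketched for a single record (a hypothetical helper, not the actual callback, which operates on whole dataframes):

```python
from datetime import datetime

def fallback_date(year: int, month: int, day: int) -> datetime:
    # Build the date from YEAR/MONTH/DAY when DATE is missing,
    # treating a 0 month or day as 1.
    return datetime(int(year), int(month) or 1, int(day) or 1)
```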
Parse and standardize time information in the dataframe.
Exported source
class ParseTimeCB(Callback):
    "Parse and standardize time information in the dataframe."
    def __call__(self, tfm: Transformer):
        for df in tfm.dfs.values():
            self._process_dates(df)
            self._define_beg_period(df)

    def _process_dates(self, df: pd.DataFrame) -> None:
        "Process and correct date and time information in the DataFrame."
        df['time'] = self._parse_date(df)
        self._handle_missing_dates(df)
        self._fill_missing_time(df)

    def _parse_date(self, df: pd.DataFrame) -> pd.Series:
        "Parse the DATE column if present."
        return pd.to_datetime(df['DATE'], format='%m/%d/%y %H:%M:%S', errors='coerce')

    def _handle_missing_dates(self, df: pd.DataFrame):
        "Handle cases where DAY or MONTH is 0 or missing."
        df.loc[df["DAY"] == 0, "DAY"] = 1
        df.loc[df["MONTH"] == 0, "MONTH"] = 1
        missing_day_month = (df["DAY"].isna()) & (df["MONTH"].isna()) & (df["YEAR"].notna())
        df.loc[missing_day_month, ["DAY", "MONTH"]] = 1

    def _fill_missing_time(self, df: pd.DataFrame) -> None:
        "Fill missing time values using YEAR, MONTH, and DAY columns."
        missing_time = df['time'].isna()
        df.loc[missing_time, 'time'] = pd.to_datetime(
            df.loc[missing_time, ['YEAR', 'MONTH', 'DAY']],
            format='%Y%m%d', errors='coerce')

    def _define_beg_period(self, df: pd.DataFrame) -> None:
        "Create a standardized date representation for Open Refine."
        df['begperiod'] = df['time']
Apply the transformer for the ParseTimeCB callback. Then print the begperiod and time data for seawater.
seawater sediment biota
Number of rows in dfs 21216 39817 15827
Number of rows in tfm.dfs 21216 39817 15827
Number of dropped rows 0 0 0
Number of rows in tfm.dfs + Number of dropped rows 21216 39817 15827
begperiod time
0 2012-05-23 2012-05-23
1 2012-05-23 2012-05-23
2 2012-06-17 2012-06-17
3 2012-05-24 2012-05-24
4 2012-05-24 2012-05-24
... ... ...
21211 2021-10-15 2021-10-15
21212 2021-11-04 2021-11-04
21213 2021-10-15 2021-10-15
21214 2021-05-17 2021-05-17
21215 2021-05-13 2021-05-13
[21216 rows x 2 columns]
NetCDF time format requires the time to be encoded as the number of seconds since a time of origin. In our case the time of origin is 1970-01-01, as indicated in the configs.ipynb CONFIGS['units']['time'] dictionary.
EncodeTimeCB converts the HELCOM time format to the MARIS NetCDF time format.
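A minimal standalone sketch of that encoding (hypothetical helper; the integer values shown in the `time` column below, e.g. 1337731200 for 2012-05-23, are seconds elapsed since the 1970-01-01 origin):

```python
from datetime import datetime, timezone

def encode_time_seconds(dt: datetime) -> int:
    # Integer seconds elapsed since 1970-01-01 (UTC), the MARIS NetCDF
    # time origin.
    return int(dt.replace(tzinfo=timezone.utc).timestamp())
```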
8 of 21216 entries for `time` are invalid for seawater.
1 of 39817 entries for `time` are invalid for sediment.
seawater sediment biota
Number of rows in dfs 21216 39817 15827
Number of rows in tfm.dfs 21208 39816 15827
Number of dropped rows 8 1 0
Number of rows in tfm.dfs + Number of dropped rows 21216 39817 15827
tfm.dfs['seawater']

                KEY NUCLIDE METHOD < VALUE_Bq/m³  VALUE_Bq/m³  ERROR%_m³    DATE_OF_ENTRY_x  COUNTRY LABORATORY   SEQUENCE  ...  TDEPTH  SDEPTH  SALIN  TTEMP FILT  MORS_SUBBASIN  HELCOM_SUBBASIN    DATE_OF_ENTRY_y        time   begperiod
0      WKRIL2012003   CS137    NaN           NaN          5.3  32.000000  08/20/14 00:00:00     90.0       KRIL  2012003.0  ...     NaN     0.0    NaN    NaN  NaN           11.0             11.0  08/20/14 00:00:00  1337731200  2012-05-23
1      WKRIL2012004   CS137    NaN           NaN         19.9  20.000000  08/20/14 00:00:00     90.0       KRIL  2012004.0  ...     NaN    29.0    NaN    NaN  NaN           11.0             11.0  08/20/14 00:00:00  1337731200  2012-05-23
2      WKRIL2012005   CS137    NaN           NaN         25.5  20.000000  08/20/14 00:00:00     90.0       KRIL  2012005.0  ...     NaN     0.0    NaN    NaN  NaN           11.0              3.0  08/20/14 00:00:00  1339891200  2012-06-17
3      WKRIL2012006   CS137    NaN           NaN         17.0  29.000000  08/20/14 00:00:00     90.0       KRIL  2012006.0  ...     NaN     0.0    NaN    NaN  NaN           11.0             11.0  08/20/14 00:00:00  1337817600  2012-05-24
4      WKRIL2012007   CS137    NaN           NaN         22.2  18.000000  08/20/14 00:00:00     90.0       KRIL  2012007.0  ...     NaN    39.0    NaN    NaN  NaN           11.0             11.0  08/20/14 00:00:00  1337817600  2012-05-24
...             ...     ...    ...           ...          ...        ...                ...      ...        ...        ...  ...     ...     ...    ...    ...  ...            ...              ...                ...         ...         ...
21211  WSSSM2021005      H3  SSM45           NaN       1030.0  93.203883  09/06/22 00:00:00     77.0       SSSM   202105.0  ...     NaN     1.0    NaN    NaN    N            1.0              8.0  09/06/22 00:00:00  1634256000  2021-10-15
21212  WSSSM2021006      H3  SSM45           NaN       2240.0  43.303571  09/06/22 00:00:00     77.0       SSSM   202106.0  ...     NaN     1.0    NaN    NaN    N           10.0             10.0  09/06/22 00:00:00  1635984000  2021-11-04
21213  WSSSM2021007      H3  SSM45           NaN       2060.0  47.087379  09/06/22 00:00:00     77.0       SSSM   202107.0  ...     NaN     1.0    NaN    NaN    N           12.0             12.0  09/06/22 00:00:00  1634256000  2021-10-15
21214  WSSSM2021008      H3  SSM45           NaN       2300.0  43.478261  09/06/22 00:00:00     77.0       SSSM   202108.0  ...     NaN     1.0    NaN    NaN    N           12.0             12.0  09/06/22 00:00:00  1621209600  2021-05-17
21215  WSSSM2021004      H3  SSM45             <          NaN        NaN  09/06/22 00:00:00     77.0       SSSM   202104.0  ...     NaN     1.0    NaN    NaN    N           15.0             18.0  09/06/22 00:00:00  1620864000  2021-05-13

[21208 rows × 29 columns]
Sanitize value
We allocate each column containing measurement values (named differently across sample types, as units are mentioned in the column names as well) into a single value column, and remove NAs where needed.
seawater sediment biota
Number of rows in dfs 21216 39817 15827
Number of rows in tfm.dfs 21122 39532 15798
Number of dropped rows 94 285 29
Number of rows in tfm.dfs + Number of dropped rows 21216 39817 15827
Normalize uncertainty
The function unc_rel2stan converts relative uncertainty to standard (absolute) uncertainty.
Convert relative uncertainty to absolute uncertainty.
          Type       Details
df        DataFrame  DataFrame containing measurement and uncertainty columns
meas_col  str        Name of the column with measurement values
unc_col   str        Name of the column with relative uncertainty values (percentages)
Returns   Series     Series with calculated absolute uncertainties
Exported source
def unc_rel2stan(
    df: pd.DataFrame,  # DataFrame containing measurement and uncertainty columns
    meas_col: str,  # Name of the column with measurement values
    unc_col: str  # Name of the column with relative uncertainty values (percentages)
    ) -> pd.Series:  # Series with calculated absolute uncertainties
    "Convert relative uncertainty to absolute uncertainty."
    return df.apply(lambda row: row[unc_col] * row[meas_col] / 100, axis=1)
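For a single measurement, the arithmetic reduces to the following (a sketch, not the exported function; e.g. a value of 5.3 Bq/m³ with a 32 % relative error yields an absolute uncertainty of about 1.7 Bq/m³):

```python
def rel_to_abs_uncertainty(value: float, rel_pct: float) -> float:
    # Same arithmetic as `unc_rel2stan`, applied to one measurement:
    # relative uncertainty (%) -> absolute uncertainty in the activity unit.
    return value * rel_pct / 100
```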
For each sample type in the HELCOM dataset, the uncertainty is given as a relative uncertainty. The column names for both the value and the uncertainty vary by sample type. The coi_units_unc dictionary defines the value and uncertainty column names for each sample type.
class NormalizeUncCB(Callback):
    "Convert from relative error % to uncertainty of activity unit."
    def __init__(self,
                 fn_convert_unc: Callable=unc_rel2stan,  # Function converting relative uncertainty to absolute uncertainty
                 coi: List[Tuple[str, str, str]]=coi_units_unc  # List of columns of interest
                 ):
        fc.store_attr()

    def __call__(self, tfm: Transformer):
        for grp, val, unc in self.coi:
            if grp in tfm.dfs:
                df = tfm.dfs[grp]
                df['uncertainty'] = self.fn_convert_unc(df, val, unc)
Apply the transformer for the NormalizeUncCB() callback. Then print the value (i.e. activity per unit) and standard uncertainty for each sample type.
Visual inspection of the remaining imperfectly matched entries suggests it is acceptable to proceed.
We can now use the generic RemapCB callback to perform the remapping of the RUBIN column to the species column after having defined the lookup table lut_biota.
Visual inspection of the remaining imperfectly matched entries suggests it is acceptable to proceed.
We can now use the generic RemapCB callback to perform the remapping of the TISSUE column to the body_part column after having defined the lookup table lut_tissues.
Currently, the details (Taxonname, TaxonRepName, Taxonrank) are used for importing into the MARIS master database, but they are not included in the NetCDF encoding.
We first need to retrieve the taxon information from the dbo_species.xlsx file.
Retrieve a lookup table for Taxonname from a MARIS lookup table.
           Type  Details
maris_lut  str   Path to the MARIS lookup table (Excel file)
Returns    dict  A dictionary mapping species_id to biogroup_id
Exported source
# TODO: Include Commonname field after next MARIS data reconciling process.
def get_taxon_info_lut(
    maris_lut: str  # Path to the MARIS lookup table (Excel file)
    ) -> dict:  # A dictionary mapping species_id to biogroup_id
    "Retrieve a lookup table for Taxonname from a MARIS lookup table."
    species = pd.read_excel(maris_lut)
    return species[['species_id', 'Taxonname', 'Taxonrank',
                    'TaxonDB', 'TaxonDBID', 'TaxonDBURL']].set_index('species_id').to_dict()

lut_taxon = lambda: get_taxon_info_lut(species_lut_path())
FEEDBACK TO DATA PROVIDER: The SEDI values 56 and 73 are not found in the SEDIMENT_TYPE.csv lookup table provided. Note also that there are many NaN values in the SEDIMENT_TYPE.csv file.
We reassign them to -99 for now, but this should be clarified/fixed. This is demonstrated below.
Update sediment id based on MARIS species LUT (dbo_sedtype.xlsx).
        Type      Details
fn_lut  Callable  Function that returns the lookup table dictionary
Exported source
class RemapSedimentCB(Callback):
    "Update sediment id based on MARIS species LUT (dbo_sedtype.xlsx)."
    def __init__(self,
                 fn_lut: Callable,  # Function that returns the lookup table dictionary
                 ):
        fc.store_attr()

    def _fix_inconsistent_sedi(self, df: pd.DataFrame) -> pd.DataFrame:
        "Temporary fix for inconsistent SEDI values. Data provider to confirm and clarify."
        df['SEDI'] = df['SEDI'].replace({56: -99, 73: -99, np.nan: -99})
        return df

    def __call__(self, tfm: Transformer):
        "Remap sediment types in the DataFrame using the lookup table and handle specific replacements."
        lut = self.fn_lut()
        # Set SedRepName (TBC: what's used for?)
        tfm.dfs['sediment']['SedRepName'] = tfm.dfs['sediment']['SEDI']
        tfm.dfs['sediment'] = self._fix_inconsistent_sedi(tfm.dfs['sediment'])
        tfm.dfs['sediment']['sed_type'] = tfm.dfs['sediment']['SEDI'].apply(
            lambda x: self._get_sediment_type(x, lut))

    def _get_sediment_type(self,
                           sedi_value: int,  # The `SEDI` value from the DataFrame
                           lut: dict  # The lookup table dictionary
                           ) -> Match:  # The Match object
        "Get the matched_id from the lookup table and print SEDI if the matched_id is -1."
        match = lut.get(sedi_value, Match(-1, None, None, None))
        if match.matched_id == -1:
            self._print_unmatched_sedi(sedi_value)
        return match.matched_id

    def _print_unmatched_sedi(self,
                              sedi_value: int  # The `SEDI` value from the DataFrame
                              ) -> None:
        "Print the SEDI value if the matched_id is -1."
        print(f"Unmatched SEDI: {sedi_value}")
FEEDBACK TO DATA PROVIDER: The handling of unit types varies between biota and sediment sample types. For consistency and ease of use, it would be beneficial to have dedicated unit columns for all sample types.
For the seawater and sediment sample types, the HELCOM dataset refers to units directly in the names of certain columns, such as VALUE_Bq/m³ or VALUE_Bq/kg. As for biota, the units are included in the BASIS column. This is shown below:
dfs = load_data(fname_in)
for grp in ['biota', 'sediment', 'seawater']:
    print(f"{grp}: {dfs[grp].columns}")
dfs['biota']['BASIS'].unique()
Given the inconsistent handling of units across sample types, we need to define custom mapping rules to standardize the units. The MARIS unit types are listed below:
Dictionary containing renaming rules for different unit categories
Exported source
lut_units = {
    'seawater': 1,  # 'Bq/m3'
    'sediment': 4,  # 'Bq/kgd' for sediment
    'biota': {
        'D': 4,  # 'Bq/kgd'
        'W': 5,  # 'Bq/kgw'
        'F': 5   # 'Bq/kgw' (assumed to be 'Fresh', so set to wet)
    }
}
Exported source
class RemapUnitCB(Callback):
    "Set the `unit` id column in the DataFrames based on a lookup table."
    def __init__(self,
                 lut_units: dict=lut_units  # Dictionary containing renaming rules for different unit categories
                 ):
        fc.store_attr()

    def __call__(self, tfm: Transformer):
        for grp in tfm.dfs.keys():
            if grp in ['seawater', 'sediment']:
                tfm.dfs[grp]['unit'] = self.lut_units[grp]
            else:
                tfm.dfs[grp]['unit'] = tfm.dfs[grp]['BASIS'].apply(lambda x: lut_units[grp].get(x, 0))
Apply the transformer for the RemapUnitCB() callback. Then print the unique unit for the seawater dataframe.
We apply the following business logic to encode the detection limit:
RemapDetectionLimitCB creates a detection_limit column with values determined as follows:
1. Perform a lookup with the appropriate value type (or detection limit) columns (< VALUE_Bq/m³ or < VALUE_Bq/kg) against the table returned by the function get_detectionlimit_lut.
2. If < VALUE_Bq/m³ or < VALUE_Bq/kg is NaN but both the activity value (VALUE_Bq/m³ or VALUE_Bq/kg) and the standard uncertainty (ERROR%_m³, ERROR%, or ERROR%_kg) are provided, assign the ID 1 (i.e. “Detected value”).
3. For other NaN values in the detection_limit column, set them to 0 (i.e. “Not Available”).
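The rules above can be sketched for a single record (a simplified, hypothetical helper: ids 1 = “Detected value” and 0 = “Not Available” follow the rules stated above, while 2 for the '<' flag is an assumption of this sketch; the real callback works on whole DataFrames via the lookup table):

```python
import math

# Illustrative lookup: '<' -> detection-limit id (assumed 2), '=' -> 1.
LUT = {'<': 2, '=': 1}

def detection_limit_id(flag, value, uncertainty):
    if flag in LUT:                                    # rule 1: direct lookup
        return LUT[flag]
    if not math.isnan(value) and not math.isnan(uncertainty):
        return LUT['=']                                # rule 2: detected value
    return 0                                           # rule 3: not available
```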
# TO BE REFACTORED
class RemapDetectionLimitCB(Callback):
    "Remap value type to MARIS format."
    def __init__(self,
                 coi: dict,  # Configuration options for column names
                 fn_lut: Callable  # Function that returns a lookup table
                 ):
        fc.store_attr()

    def __call__(self, tfm: Transformer):
        "Remap detection limits in the DataFrames using the lookup table."
        lut = self.fn_lut()
        for grp in tfm.dfs:
            df = tfm.dfs[grp]
            self._update_detection_limit(df, grp, lut)

    def _update_detection_limit(self,
                                df: pd.DataFrame,  # The DataFrame to modify
                                grp: str,  # The group name to get the column configuration
                                lut: dict  # The lookup table dictionary
                                ) -> None:
        "Update detection limit column in the DataFrame based on lookup table and rules."
        detection_col = self.coi[grp]['dl']
        value_col = self.coi[grp]['val']
        uncertainty_col = self.coi[grp]['unc']
        # Copy detection limit column
        df['detection_limit'] = df[detection_col]
        # Fill values with '=' or 'Not Available'
        condition = ((df[value_col].notna()) & (df[uncertainty_col].notna()) &
                     (~df['detection_limit'].isin(lut.keys())))
        df.loc[condition, 'detection_limit'] = '='
        df.loc[~df['detection_limit'].isin(lut.keys()), 'detection_limit'] = 'Not Available'
        # Perform lookup
        df['detection_limit'] = df['detection_limit'].map(lut)
Lookup FILT value in dataframe using the lookup table.
              Type  Default                    Details
lut_filtered  dict  {'N': 2, 'n': 2, 'F': 1}  Dictionary mapping FILT codes to their corresponding names
Exported source
class RemapFiltCB(Callback):
    "Lookup FILT value in dataframe using the lookup table."
    def __init__(self,
                 lut_filtered: dict=lut_filtered,  # Dictionary mapping FILT codes to their corresponding names
                 ):
        fc.store_attr()

    def __call__(self, tfm):
        for df in tfm.dfs.values():
            if 'FILT' in df.columns:
                df['FILT'] = df['FILT'].map(lambda x: self.lut_filtered.get(x, 0))
Remap KEY column to samplabcode in each DataFrame.
Exported source
class AddSampleLabCodeCB(Callback):
    "Remap `KEY` column to `samplabcode` in each DataFrame."
    def __call__(self, tfm: Transformer):
        for grp in tfm.dfs:
            self._remap_sample_id(tfm.dfs[grp])

    def _remap_sample_id(self, df: pd.DataFrame):
        df['samplabcode'] = df['KEY']
['WKRIL2012003' 'WKRIL2012004' 'WKRIL2012005' ... 'WSSSM2021006'
'WSSSM2021007' 'WSSSM2021008']
seawater sediment biota
Number of rows in dfs 21216 39817 15827
Number of rows in tfm.dfs 21216 39817 15827
Number of dropped rows 0 0 0
Number of rows in tfm.dfs + Number of dropped rows 21216 39817 15827
Add measurement note
The HELCOM dataset includes a look-up table ANALYSIS_METHOD.csv capturing the measurement method used as described by HELCOM. For instance:
class AddMeasurementNoteCB(Callback):
    "Record measurement notes by adding a 'measurenote' column to DataFrames."
    def __init__(self,
                 fn_lut: Callable  # Function that returns the lookup dictionary with `METHOD` as key and `DESCRIPTION` as value
                 ):
        fc.store_attr()

    def __call__(self, tfm: Transformer):
        lut = self.fn_lut()
        for df in tfm.dfs.values():
            if 'METHOD' in df.columns:
                df['measurementnote'] = df['METHOD'].map(lambda x: lut.get(x, 0))
[0
'Radiochemical method Radiocaesium separation from seawater samples.134+137Cs was adsorbed on AMP mat, dissolved with NaOH and after purification precipitated as chloroplatinate (Cs2PtCl6).Counting with low background anticoincidence beta counter.'
'Radiochem. meth of Sr90. Precipation with oxalate and separation of calcium, barium, radium and ytrium couting with low background anticoincidence beta counter. 1982-1994'
'For tritium liquid scintialtion counting, combined with electrolytic enrichment of analysed water samples, double distilled, before and after electrolysis in cells. Liquid Scintillation spectrometer LKB Wallac model 1410'
'Pretreatment drying (sediment, biota samples) and ashing (biota samples)or vaporization to 1000 ml (sea water samples), measured by gamma-spectrometry using HPGe detectors sediment, biota, sea water /Cs-137, Cs-134, K-40']
seawater sediment biota
Number of rows in dfs 21216 39817 15827
Number of rows in tfm.dfs 21216 39817 15827
Number of dropped rows 0 0 0
Number of rows in tfm.dfs + Number of dropped rows 21216 39817 15827
Add station
For MARIS master DB import only (not included in the NetCDF output).
class RemapStationIdCB(Callback):
    "Remap Station ID to MARIS format."
    def __init__(self):
        fc.store_attr()

    def __call__(self, tfm: Transformer):
        "Iterate through all DataFrames in the transformer object and remap `STATION` to `station_id`."
        for grp in tfm.dfs.keys():
            tfm.dfs[grp]['station'] = tfm.dfs[grp]['STATION']
seawater sediment biota
Number of rows in dfs 21216 39817 15827
Number of rows in tfm.dfs 21216 39817 15827
Number of dropped rows 0 0 0
Number of rows in tfm.dfs + Number of dropped rows 21216 39817 15827
Add slice position (top and bottom)
For MARIS master DB import only (not included in the NetCDF output).
Remap Sediment slice top and bottom to MARIS format.
Exported source
class RemapSedSliceTopBottomCB(Callback):
    "Remap Sediment slice top and bottom to MARIS format."
    def __call__(self, tfm: Transformer):
        "Iterate through all DataFrames in the transformer object and remap sediment slice top and bottom."
        tfm.dfs['sediment']['top'] = tfm.dfs['sediment']['UPPSLI']
        tfm.dfs['sediment']['bottom'] = tfm.dfs['sediment']['LOWSLI']
class LookupDryWetRatio(Callback):
    "Lookup dry-wet ratio and format for MARIS."
    def __call__(self, tfm: Transformer):
        "Iterate through all DataFrames in the transformer object and apply the dry-wet ratio lookup."
        for grp in tfm.dfs.keys():
            if 'DW%' in tfm.dfs[grp].columns:
                self._apply_dry_wet_ratio(tfm.dfs[grp])

    def _apply_dry_wet_ratio(self, df: pd.DataFrame) -> None:
        "Apply dry-wet ratio conversion and formatting to the given DataFrame."
        df['dry_wet_ratio'] = df['DW%']
        # Convert 'DW%' = 0% to NaN.
        df.loc[df['dry_wet_ratio'] == 0, 'dry_wet_ratio'] = np.NaN
seawater sediment biota
Number of rows in dfs 21216 39817 15827
Number of rows in tfm.dfs 21216 39817 15827
Number of dropped rows 0 0 0
Number of rows in tfm.dfs + Number of dropped rows 21216 39817 15827
0 18.453
1 18.453
2 18.453
3 18.453
4 18.458
Name: dry_wet_ratio, dtype: float64
Standardize Coordinates
Tip
FEEDBACK TO DATA PROVIDER: Column names for geographical coordinates are inconsistent across sample types (biota, sediment, seawater). Sometimes using parentheses, sometimes not.
dfs = load_data(fname_in)
for grp in dfs.keys():
    print(f'{grp}: {[col for col in dfs[grp].columns if "LON" in col or "LAT" in col]}')
Geographical coordinates are provided in both decimal-degree and degree-minute formats. Some coordinates lack the decimal format, which obliges us to use the less precise degree-minute format.
Also note that latitude values use , as decimal separator while longitude values use . as decimal separator (see below).
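The degree-minute conversion can be sketched as follows. This is a hypothetical helper for the assumed 'ddmmmm' convention, where the integer part is degrees and the fractional part is minutes/100 (consistent with the paired columns in the data, e.g. LONGITUDE (ddmmmm) 10.500 corresponding to LONGITUDE (dddddd) 10.833333):

```python
def ddmm_to_decimal(value: float) -> float:
    # 10.500 -> 10 degrees 50.0 minutes -> 10 + 50/60 = 10.8333...
    degrees = int(value)
    minutes = (value - degrees) * 100
    return degrees + minutes / 60
```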
Get geographical coordinates from columns expressed in degrees decimal format or from columns in degrees/minutes decimal format where degrees decimal format is missing.
                Type      Details
fn_convert_cor  Callable  Function that converts coordinates from degree-minute to decimal degree format
Exported source
class ParseCoordinates(Callback):
    """
    Get geographical coordinates from columns expressed in degrees decimal format
    or from columns in degrees/minutes decimal format where degrees decimal format
    is missing.
    """
    def __init__(self,
                 fn_convert_cor: Callable  # Function that converts coordinates from degree-minute to decimal degree format
                 ):
        self.fn_convert_cor = fn_convert_cor

    def __call__(self, tfm: Transformer):
        for df in tfm.dfs.values():
            self._format_coordinates(df)

    def _format_coordinates(self, df: pd.DataFrame) -> None:
        coord_cols = self._get_coord_columns(df.columns)
        for coord in ['lat', 'lon']:
            decimal_col, minute_col = coord_cols[f'{coord}_d'], coord_cols[f'{coord}_m']
            condition = df[decimal_col].isna() | (df[decimal_col] == 0)
            df[coord] = np.where(condition,
                                 df[minute_col].apply(self._safe_convert),
                                 df[decimal_col])
        df.dropna(subset=['lat', 'lon'], inplace=True)

    def _get_coord_columns(self, columns) -> dict:
        return {
            'lon_d': self._find_coord_column(columns, 'LON', 'dddddd'),
            'lat_d': self._find_coord_column(columns, 'LAT', 'dddddd'),
            'lon_m': self._find_coord_column(columns, 'LON', 'ddmmmm'),
            'lat_m': self._find_coord_column(columns, 'LAT', 'ddmmmm')
        }

    def _find_coord_column(self, columns, coord_type, coord_format) -> str:
        pattern = re.compile(f'{coord_type}.*{coord_format}', re.IGNORECASE)
        matching_columns = [col for col in columns if pattern.search(col)]
        return matching_columns[0] if matching_columns else None

    def _safe_convert(self, value) -> str:
        if pd.isna(value):
            return value
        try:
            return self.fn_convert_cor(value)
        except Exception as e:
            print(f"Error converting value {value}: {e}")
            return value
seawater sediment biota
Number of rows in dfs 21216 39817 15827
Number of rows in tfm.dfs 21208 39816 15827
Number of dropped rows 8 1 0
Number of rows in tfm.dfs + Number of dropped rows 21216 39817 15827
lat lon
0 54.283333 12.316667
1 54.283333 12.316667
2 54.283333 12.316667
3 54.283333 12.316667
4 54.283333 12.316667
... ... ...
15822 60.373333 18.395667
15823 60.373333 18.395667
15824 60.503333 18.366667
15825 60.503333 18.366667
15826 60.503333 18.366667
[15827 rows x 2 columns]
Tip
FEEDBACK TO DATA PROVIDER: Some samples have (lon, lat): (0, 0) or are outside lon/lat possible values.
The sanitize-coordinates step drops a row when both longitude and latitude equal 0, or when the data contains unrealistic longitude or latitude values. It also converts the , decimal separator in longitude and latitude to the . separator.
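The row-filtering rule can be sketched for a single coordinate pair (a hypothetical helper; the real step operates on whole dataframes):

```python
def keep_coordinate(lon: float, lat: float) -> bool:
    # Drop (0, 0) and any out-of-range longitude/latitude values.
    if lon == 0 and lat == 0:
        return False
    return -180 <= lon <= 180 and -90 <= lat <= 90
```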
seawater sediment biota
Number of rows in dfs 21216 39817 15827
Number of rows in tfm.dfs 21114 39531 15798
Number of dropped rows 102 286 29
Number of rows in tfm.dfs + Number of dropped rows 21216 39817 15827
For instance, to inspect dropped rows:
tfm.dfs_dropped['seawater'].head()

                KEY NUCLIDE  METHOD < VALUE_Bq/m³  VALUE_Bq/m³  ERROR%_m³ DATE_OF_ENTRY_x  COUNTRY LABORATORY   SEQUENCE  ...  LONGITUDE (ddmmmm)  LONGITUDE (dddddd)  TDEPTH  SDEPTH  SALIN  TTEMP FILT  MORS_SUBBASIN  HELCOM_SUBBASIN DATE_OF_ENTRY_y
13439  WRISO2001025   CS137  RISO02           NaN          NaN       10.0             NaN     26.0       RISO  2001025.0  ...              10.500           10.833333    22.0    20.0   0.00    NaN    N            5.0              5.0             NaN
14017  WLEPA2002001   CS134  LEPA02             <          NaN        NaN             NaN     93.0       LEPA  2002001.0  ...              21.030           21.050000    16.0     0.0   3.77  14.40    N            4.0              9.0             NaN
14020  WLEPA2002002   CS134  LEPA02             <          NaN        NaN             NaN     93.0       LEPA  2002004.0  ...              20.574           20.956667    14.0     0.0   6.57  11.95    N            4.0              9.0             NaN
14023  WLEPA2002003   CS134  LEPA02             <          NaN        NaN             NaN     93.0       LEPA  2002007.0  ...              19.236           19.393333    73.0     0.0   7.00   9.19    N            4.0              9.0             NaN
14026  WLEPA2002004   CS134  LEPA02             <          NaN        NaN             NaN     93.0       LEPA  2002010.0  ...              20.205           20.341700    47.0     0.0   7.06   8.65    N            4.0              9.0             NaN

5 rows × 27 columns
Rename columns of interest for NetCDF or Open Refine
Column names are standardized to the MARIS NetCDF format (i.e. PEP8).
Select and rename columns in a DataFrame based on renaming rules for a specified encoding type.
                   Type      Default  Details
fn_renaming_rules  Callable           A function that returns an OrderedDict of renaming rules
encoding_type      str      netcdf    The encoding type (netcdf or openrefine) to determine which renaming rules to use
verbose            bool     False     Whether to print out renaming rules that were not applied
Exported source
class SelectAndRenameColumnCB(Callback):
    "Select and rename columns in a DataFrame based on renaming rules for a specified encoding type."
    def __init__(self,
                 fn_renaming_rules: Callable,  # A function that returns an OrderedDict of renaming rules
                 encoding_type: str='netcdf',  # The encoding type (`netcdf` or `openrefine`) to determine which renaming rules to use
                 verbose: bool=False  # Whether to print out renaming rules that were not applied
                 ):
        fc.store_attr()

    def __call__(self, tfm: Transformer):
        "Apply column selection and renaming to DataFrames in the transformer, and identify unused rules."
        try:
            renaming_rules = self.fn_renaming_rules(self.encoding_type)
        except ValueError as e:
            print(f"Error fetching renaming rules: {e}")
            return
        for group in tfm.dfs.keys():
            # Get relevant renaming rules for the current group
            group_rules = self._get_group_rules(renaming_rules, group)
            if not group_rules:
                continue
            # Apply renaming rules and track keys not found in the DataFrame
            df = tfm.dfs[group]
            df, not_found_keys = self._apply_renaming(df, group_rules)
            tfm.dfs[group] = df
            # Print any renaming rules that were not used
            if not_found_keys and self.verbose:
                print(f"\nGroup '{group}' has the following renaming rules not applied:")
                for old_col in not_found_keys:
                    print(f"Key '{old_col}' from renaming rules was not found in the DataFrame.")

    def _get_group_rules(self,
                         renaming_rules: OrderedDict,  # Renaming rules
                         group: str  # Group name to filter rules
                         ) -> OrderedDict:  # Renaming rules applicable to the specified group
        "Retrieve and merge renaming rules for the specified group based on the encoding type."
        relevant_rules = [rules for key, rules in renaming_rules.items() if group in key]
        merged_rules = OrderedDict()
        for rules in relevant_rules:
            merged_rules.update(rules)
        return merged_rules

    def _apply_renaming(self,
                        df: pd.DataFrame,  # DataFrame to modify
                        rename_rules: OrderedDict  # Renaming rules
                        ) -> tuple:  # (Renamed and filtered df, column names from renaming rules that were not found in the DataFrame)
        """
        Select columns based on renaming rules and apply renaming, only for existing
        columns, while maintaining the order of the dictionary columns.
        """
        existing_columns = set(df.columns)
        valid_rules = OrderedDict((old_col, new_col)
                                  for old_col, new_col in rename_rules.items()
                                  if old_col in existing_columns)
        # Create a list to maintain the order of columns
        columns_to_keep = [col for col in rename_rules.keys() if col in existing_columns]
        columns_to_keep += [new_col for old_col, new_col in valid_rules.items()
                            if new_col in df.columns]
        df = df[list(OrderedDict.fromkeys(columns_to_keep))]
        # Apply renaming
        df.rename(columns=valid_rules, inplace=True)
        # Determine which keys were not found
        not_found_keys = set(rename_rules.keys()) - existing_columns
        return df, not_found_keys
["Convert values from 'NUCLIDE' to lowercase, strip spaces, and store in 'None'.",
'Parse and standardize time information in the dataframe.',
'Encode time as `int` representing seconds since xxx',
'Sanitize value/measurement by removing blank entries and populating `value` column.',
'Convert from relative error % to uncertainty of activity unit.',
"Remap values from 'RUBIN' to 'species' for groups: b, i, o, t, a.",
"Remap values from 'TISSUE' to 'body_part' for groups: b, i, o, t, a.",
"Remap values from 'species' to 'bio_group' for groups: b, i, o, t, a.",
'Update taxon information based on MARIS species LUT.',
'Update sediment id based on MARIS species LUT (dbo_sedtype.xlsx).',
'Set the `unit` id column in the DataFrames based on a lookup table.',
'Remap value type to MARIS format.',
'Lookup FILT value in dataframe using the lookup table.',
'Remap `KEY` column to `samplabcode` in each DataFrame.',
"Record measurement notes by adding a 'measurenote' column to DataFrames.",
'Remap Station ID to MARIS format.',
'Remap Sediment slice top and bottom to MARIS format.',
'Lookup dry-wet ratio and format for MARIS.',
'\n Get geographical coordinates from columns expressed in degrees decimal format \n or from columns in degrees/minutes decimal format where degrees decimal format is missing.\n ',
'Drop row when both longitude & latitude equal 0. Drop unrealistic longitude & latitude values. Convert longitude & latitude `,` separator to `.` separator.',
'Select and rename columns in a DataFrame based on renaming rules for a specified encoding type.']
```python
def get_attrs(
    tfm: Transformer, # Transformer object
    zotero_key: str, # Zotero dataset record key
    kw: list=kw # List of keywords
    ) -> dict: # Global attributes
    "Retrieve all global attributes."
    return GlobAttrsFeeder(tfm.dfs, cbs=[
        BboxCB(),
        DepthRangeCB(),
        TimeRangeCB(cfg()),
        ZoteroCB(zotero_key, cfg=cfg()),
        KeyValuePairCB('keywords', ', '.join(kw)),
        KeyValuePairCB('publisher_postprocess_logs', ', '.join(tfm.logs))
        ])()
```
get_attrs(tfm, zotero_key=zotero_key, kw=kw)
{'geospatial_lat_min': '31.17',
'geospatial_lat_max': '65.75',
'geospatial_lon_min': '9.6333',
'geospatial_lon_max': '53.5',
'geospatial_bounds': 'POLYGON ((9.6333 53.5, 31.17 53.5, 31.17 65.75, 9.6333 65.75, 9.6333 53.5))',
'time_coverage_start': '1984-01-10T00:00:00',
'time_coverage_end': '2021-12-15T00:00:00',
'title': 'Environmental database - Helsinki Commission Monitoring of Radioactive Substances',
'summary': 'MORS Environment database has been used to collate data resulting from monitoring of environmental radioactivity in the Baltic Sea based on HELCOM Recommendation 26/3.\n\nThe database is structured according to HELCOM Guidelines on Monitoring of Radioactive Substances (https://www.helcom.fi/wp-content/uploads/2019/08/Guidelines-for-Monitoring-of-Radioactive-Substances.pdf), which specifies reporting format, database structure, data types and obligatory parameters used for reporting data under Recommendation 26/3.\n\nThe database is updated and quality assured annually by HELCOM MORS EG.',
'creator_name': '[{"creatorType": "author", "name": "HELCOM MORS"}]',
'keywords': 'oceanography, Earth Science > Oceans > Ocean Chemistry> Radionuclides, Earth Science > Human Dimensions > Environmental Impacts > Nuclear Radiation Exposure, Earth Science > Oceans > Ocean Chemistry > Ocean Tracers, Earth Science > Oceans > Marine Sediments, Earth Science > Oceans > Ocean Chemistry, Earth Science > Oceans > Sea Ice > Isotopes, Earth Science > Oceans > Water Quality > Ocean Contaminants, Earth Science > Biological Classification > Animals/Vertebrates > Fish, Earth Science > Biosphere > Ecosystems > Marine Ecosystems, Earth Science > Biological Classification > Animals/Invertebrates > Mollusks, Earth Science > Biological Classification > Animals/Invertebrates > Arthropods > Crustaceans, Earth Science > Biological Classification > Plants > Macroalgae (Seaweeds)',
'publisher_postprocess_logs': "Convert values from 'NUCLIDE' to lowercase, strip spaces, and store in 'None'., Parse and standardize time information in the dataframe., Encode time as `int` representing seconds since xxx, Sanitize value/measurement by removing blank entries and populating `value` column., Convert from relative error % to uncertainty of activity unit., Remap values from 'RUBIN' to 'species' for groups: b, i, o, t, a., Remap values from 'TISSUE' to 'body_part' for groups: b, i, o, t, a., Remap values from 'species' to 'bio_group' for groups: b, i, o, t, a., Update taxon information based on MARIS species LUT., Update sediment id based on MARIS species LUT (dbo_sedtype.xlsx)., Set the `unit` id column in the DataFrames based on a lookup table., Remap value type to MARIS format., Lookup FILT value in dataframe using the lookup table., Remap `KEY` column to `samplabcode` in each DataFrame., Record measurement notes by adding a 'measurenote' column to DataFrames., Remap Station ID to MARIS format., Remap Sediment slice top and bottom to MARIS format., Lookup dry-wet ratio and format for MARIS., \n Get geographical coordinates from columns expressed in degrees decimal format \n or from columns in degrees/minutes decimal format where degrees decimal format is missing.\n , Drop row when both longitude & latitude equal 0. Drop unrealistic longitude & latitude values. Convert longitude & latitude `,` separator to `.` separator., Select and rename columns in a DataFrame based on renaming rules for a specified encoding type."}
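The `geospatial_bounds` attribute above is a WKT polygon built from the lon/lat extremes. A minimal sketch of that construction (an illustration only, not the actual `BboxCB` implementation):

```python
def bbox_wkt(lon_min, lat_min, lon_max, lat_max):
    "Closed counter-clockwise ring over the bounding box, as a WKT POLYGON string."
    return (f"POLYGON (({lon_min} {lat_min}, {lon_max} {lat_min}, "
            f"{lon_max} {lat_max}, {lon_min} {lat_max}, {lon_min} {lat_min}))")

# Reproduces the geospatial_bounds value shown in the output above
print(bbox_wkt(9.6333, 53.5, 31.17, 65.75))
# → POLYGON ((9.6333 53.5, 31.17 53.5, 31.17 65.75, 9.6333 65.75, 9.6333 53.5))
```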
Retrieve a subset of the lengthy enum as `species_t` for instance.

|      | Type        | Details                                            |
|------|-------------|----------------------------------------------------|
| tfm  | Transformer | Transformer object                                 |
| vars | list        | List of variables to extract from the transformer  |
Exported source
```python
def enums_xtra(
    tfm: Transformer, # Transformer object
    vars: list # List of variables to extract from the transformer
    ):
    "Retrieve a subset of the lengthy enum as `species_t` for instance."
    enums = Enums(lut_src_dir=lut_path(), cdl_enums=cdl_cfg()['enums'])
    xtras = {}
    for var in vars:
        unique_vals = tfm.unique(var)
        if unique_vals.any():
            xtras[f'{var}_t'] = enums.filter(f'{var}_t', unique_vals)
    return xtras
```
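The filtering idea behind `enums_xtra` can be illustrated with plain dicts. Here `filter_enum` stands in for `Enums.filter`, and the species names and ids below are hypothetical examples, not MARIS LUT values:

```python
# Hypothetical (name -> id) enum, standing in for the full species_t enum
species_t = {'Not applicable': -1, 'Clupea harengus': 243,
             'Gadus morhua': 270, 'Fucus vesiculosus': 96}

def filter_enum(enum, used_values):
    "Keep only the (name, id) pairs whose id actually occurs in the dataset."
    return {name: v for name, v in enum.items() if v in used_values}

# Only the two species present in the (hypothetical) data survive
subset = filter_enum(species_t, {243, 96})
print(subset)  # → {'Clupea harengus': 243, 'Fucus vesiculosus': 96}
```

Trimming each enum to the values actually used keeps the NetCDF variable-level enums small instead of embedding the full lookup table.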
Group 'seawater' has the following renaming rules not applied:
Key 'measurenote' from renaming rules was not found in the DataFrame.
Group 'sediment' has the following renaming rules not applied:
Key 'SDEPTH' from renaming rules was not found in the DataFrame.
Key 'measurenote' from renaming rules was not found in the DataFrame.
Key 'TTEMP' from renaming rules was not found in the DataFrame.
Key 'FILT' from renaming rules was not found in the DataFrame.
Key 'SALIN' from renaming rules was not found in the DataFrame.
Group 'biota' has the following renaming rules not applied:
Key 'TDEPTH' from renaming rules was not found in the DataFrame.
Key 'measurenote' from renaming rules was not found in the DataFrame.
Key 'TTEMP' from renaming rules was not found in the DataFrame.
Key 'FILT' from renaming rules was not found in the DataFrame.
Key 'SALIN' from renaming rules was not found in the DataFrame.
|                                                     | seawater | sediment | biota |
|-----------------------------------------------------|----------|----------|-------|
| Number of rows in dfs                               | 21216    | 39817    | 15827 |
| Number of rows in tfm.dfs                           | 21114    | 39531    | 15798 |
| Number of dropped rows                              | 102      | 286      | 29    |
| Number of rows in tfm.dfs + Number of dropped rows  | 21216    | 39817    | 15827 |
Example of data included in `dfs_dropped`.

Main reasons for data to be dropped from `dfs`:

- No activity value reported (e.g. `VALUE_Bq/kg`)
- No time value reported
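A minimal sketch of that dropping logic, using hypothetical rows and column names rather than the handler's actual callbacks:

```python
# Hypothetical sample rows; A2 lacks an activity value, A3 lacks a time value
rows = [
    {'KEY': 'A1', 'VALUE_Bq/kg': 1.2, 'DATE': '2021-05-01'},
    {'KEY': 'A2', 'VALUE_Bq/kg': None, 'DATE': '2021-05-02'},
    {'KEY': 'A3', 'VALUE_Bq/kg': 0.8, 'DATE': None},
]

# Keep rows with both an activity value and a time; set the rest aside
kept = [r for r in rows if r['VALUE_Bq/kg'] is not None and r['DATE'] is not None]
dropped = [r for r in rows if r not in kept]

# Kept + dropped always adds back up to the original row count,
# matching the bookkeeping shown in the table above
assert len(kept) + len(dropped) == len(rows)
print(len(kept), len(dropped))  # → 1 2
```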