This data pipeline, known as a “handler” in Marisco terminology, is designed to clean, standardize, and encode OSPAR data into NetCDF format. The handler processes raw OSPAR data, applying various transformations and lookups to align it with MARIS data standards.

Key functions of this handler:

This handler is a crucial component in the Marisco data processing workflow, ensuring OSPAR data is properly integrated into the MARIS database.

Tip: Getting Started

For new MARIS users, please refer to Understanding MARIS Data Formats (NetCDF and Open Refine) for detailed information.

This notebook is written in the spirit of Literate Programming: it is a narrative in which code snippets are interspersed with explanations. When a function or a class needs to be exported to a dedicated Python module (in our case marisco/handlers/ospar.py), the code snippet is added to the module using #| export, as provided by the wonderful nbdev library.

Configuration and File Paths

The handler requires several configuration parameters:

  1. src_dir: path to the maris-crawlers folder containing the OSPAR data in CSV format
  2. fname_out: Output path and filename for NetCDF file (relative paths supported)
  3. zotero_key: Key for retrieving dataset attributes from Zotero
Exported source
src_dir = 'https://raw.githubusercontent.com/franckalbinet/maris-crawlers/refs/heads/main/data/processed/OSPAR'
fname_out = '../../_data/output/191-OSPAR-2024.nc'
zotero_key ='LQRA4MMK' # OSPAR MORS zotero key

Load data

OSPAR data is provided as a zipped Microsoft Access database. To facilitate easier access and integration, we process this dataset and convert it into .csv files. These processed files are then made available in the maris-crawlers repository on GitHub. Once converted, the dataset is in a format that is readily compatible with the marisco data pipeline, ensuring seamless data handling and analysis.


source

read_csv

 read_csv (file_name,
           dir='https://raw.githubusercontent.com/franckalbinet/maris-
           crawlers/refs/heads/main/data/processed/OSPAR')
Exported source
default_smp_types = {  
    'Biota': 'BIOTA', 
    'Seawater': 'SEAWATER', 
}
Exported source
def read_csv(file_name, dir=src_dir):
    file_path = f'{dir}/{file_name}'
    return pd.read_csv(file_path)

source

load_data

 load_data (src_url:str, smp_types:dict={'Biota': 'BIOTA', 'Seawater':
            'SEAWATER'}, use_cache:bool=False, save_to_cache:bool=False,
            verbose:bool=False)

Load OSPAR data and return the data in a dictionary of dataframes with the dictionary key as the sample type.

Type Default Details
src_url str
smp_types dict {‘Biota’: ‘BIOTA’, ‘Seawater’: ‘SEAWATER’} Sample types to load
use_cache bool False Use cache
save_to_cache bool False Save to cache
verbose bool False Verbose
Returns Dict
Exported source
def load_data(src_url: str, 
              smp_types: dict = default_smp_types, # Sample types to load
              use_cache: bool = False, # Use cache
              save_to_cache: bool = False, # Save to cache
              verbose: bool = False # Verbose
              ) -> Dict[str, pd.DataFrame]:
    "Load OSPAR data and return the data in a dictionary of dataframes with the dictionary key as the sample type."
    
    def safe_file_path(url: str) -> str:
        "Safely encode spaces in a URL."
        return url.replace(" ", "%20")

    def get_file_path(dir_path: str, file_prefix: str) -> str:
        """Construct the full file path based on directory and file prefix."""
        file_path = f"{dir_path}/{file_prefix} data.csv"
        return safe_file_path(file_path) if not use_cache else file_path

    def load_and_process_csv(file_path: str) -> pd.DataFrame:
        """Load a CSV file and process it."""
        if use_cache and not Path(file_path).exists():
            if verbose:
                print(f"{file_path} not found in cache.")
            return pd.DataFrame()

        if verbose:
            start_time = time.time()

        try:
            df = pd.read_csv(file_path)
            df.columns = df.columns.str.lower()
            if verbose:
                print(f"Data loaded from {file_path} in {time.time() - start_time:.2f} seconds.")
            return df
        except Exception as e:
            if verbose:
                print(f"Failed to load {file_path}: {e}")
            return pd.DataFrame()

    def save_to_cache_dir(df: pd.DataFrame, file_prefix: str):
        """Save the DataFrame to the cache directory."""
        cache_dir = cache_path()
        cache_file_path = f"{cache_dir}/{file_prefix} data.csv"
        df.to_csv(cache_file_path, index=False)
        if verbose:
            print(f"Data saved to cache at {cache_file_path}")

    data = {}
    for file_prefix, smp_type in smp_types.items():
        dir_path = cache_path() if use_cache else src_url
        file_path = get_file_path(dir_path, file_prefix)
        df = load_and_process_csv(file_path)

        if save_to_cache and not df.empty:
            save_to_cache_dir(df, file_prefix)

        data[smp_type] = df

    return data
dfs = load_data(src_dir, save_to_cache=True, verbose=False)
dfs['SEAWATER'].columns
Index(['id', 'contracting party', 'rsc sub-division', 'station id',
       'sample id', 'latd', 'latm', 'lats', 'latdir', 'longd', 'longm',
       'longs', 'longdir', 'sample type', 'sampling depth', 'sampling date',
       'nuclide', 'value type', 'activity or mda', 'uncertainty', 'unit',
       'data provider', 'measurement comment', 'sample comment',
       'reference comment'],
      dtype='object')
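
As a rough illustration of how load_data locates the remote files, the sketch below reproduces only the URL construction step: the file prefix is combined with the " data.csv" suffix and spaces are percent-encoded. The helper name build_remote_url is hypothetical and is not part of marisco.

```python
# Minimal sketch of the remote file naming convention used by load_data.
# `build_remote_url` is a hypothetical helper introduced for illustration only.
SRC_DIR = (
    "https://raw.githubusercontent.com/franckalbinet/maris-crawlers/"
    "refs/heads/main/data/processed/OSPAR"
)
SMP_TYPES = {"Biota": "BIOTA", "Seawater": "SEAWATER"}


def build_remote_url(dir_path: str, file_prefix: str) -> str:
    """Join directory and file name, then percent-encode spaces."""
    return f"{dir_path}/{file_prefix} data.csv".replace(" ", "%20")


# One URL per sample type, keyed by the MARIS sample type name.
urls = {smp_type: build_remote_url(SRC_DIR, prefix)
        for prefix, smp_type in SMP_TYPES.items()}

for smp_type, url in urls.items():
    print(smp_type, url)
```

When use_cache=True, load_data skips the encoding step because the path then points to a local file rather than a URL.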

Remove Missing Values

Important: FEEDBACK TO DATA PROVIDER

We consider a record incomplete if either the activity or mda field or the sampling date field is empty. These are the two criteria we use to identify missing data.

As shown below: 10 rows are missing the sampling date and 10 rows are missing the activity or mda field.

print(f'Missing sampling date: {dfs["SEAWATER"]["sampling date"].isnull().sum()}')
print(f'Missing activity or mda: {dfs["SEAWATER"]["activity or mda"].isnull().sum()}')
dfs['SEAWATER'][dfs['SEAWATER']['sampling date'].isnull()].sample(2)
Missing sampling date: 10
Missing activity or mda: 10
id contracting party rsc sub-division station id sample id latd latm lats latdir longd ... sampling date nuclide value type activity or mda uncertainty unit data provider measurement comment sample comment reference comment
19191 120367 Ireland 4.0 N9 NaN 53 53.0 0.0 N 5 ... NaN NaN NaN NaN NaN NaN NaN 2021 data The Irish Navy attempted a few times to collec... NaN
16161 120369 Ireland 1.0 Salthill NaN 53 15.0 40.0 N 9 ... NaN NaN NaN NaN NaN NaN NaN 2021 data Woodstown (County Waterford) and Salthill (Cou... NaN

2 rows × 25 columns

To quickly remove all missing values, we can use the RemoveAllNAValuesCB callback.

Exported source
nan_cols_to_check = ['sampling date', 'activity or mda']
dfs = load_data(src_dir)
tfm = Transformer(dfs, cbs = [
    RemoveAllNAValuesCB(nan_cols_to_check)])

dfs_out = tfm()

Now we can see that the sampling date and activity or mda columns have no missing values.

len(dfs_out['SEAWATER'][dfs_out['SEAWATER']['sampling date'].isnull()])
0
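
For readers who want to see the completeness criterion in isolation, here is a minimal pandas sketch on made-up data. It flags a row as incomplete when either key column is empty, as stated above. It is not the implementation of RemoveAllNAValuesCB, whose internals are not shown in this notebook.

```python
import numpy as np
import pandas as pd

# Toy data (invented for illustration): only the first row is complete.
df = pd.DataFrame({
    "sampling date": ["01/27/10 00:00:00", None, "02/03/10 00:00:00"],
    "activity or mda": [0.2, np.nan, np.nan],
    "nuclide": ["137Cs", "3H", "99Tc"],
})

cols_to_check = ["sampling date", "activity or mda"]

# A row is incomplete if ANY of the key columns is missing.
incomplete = df[cols_to_check].isna().any(axis=1)
df_clean = df[~incomplete]

print(f"Incomplete rows: {int(incomplete.sum())}")
print(df_clean)
```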

Nuclide Name Normalization

We must standardize the nuclide names in the OSPAR dataset to align with the standardized names provided in the MARISCO lookup table. The lookup process utilizes three key columns:

  • nuclide_id: a unique identifier for each nuclide
  • nuclide: the standardized name of the nuclide as per our conventions
  • nc_name: the corresponding name used in NetCDF files

Below, we will examine the structure and contents of the lookup table:

nuc_lut_df = pd.read_excel(nuc_lut_path())
nuc_lut_df.sample(5)
nuclide_id nuclide atomicnb massnb nusymbol half_life hl_unit nc_name
119 128 YTRIUM 39.0 88.0 88Y 0.00 - y88
40 42 LEAD 82.0 212.0 212Pb 10.64 H pb212
34 36 CERIUM 58.0 141.0 141Ce 32.55 D ce141
77 80 CURIUM COMB 96.0 243.0 243,244Cm 0.00 - cm243_244_tot
50 52 RADIUM 88.0 225.0 225Ra 14.80 D ra225
Important: FEEDBACK TO DATA PROVIDER

In the OSPAR dataset, the nuclide column has inconsistent naming:

  • Cs-137, 137Cs or CS-137
  • 239, 240 pu or 239,240 pu
  • ra-226 and 226ra
  • duplicates due to the presence of trailing spaces

See below:

print(get_unique_across_dfs(dfs, 'nuclide', as_df=False))
[
    'Cs-137',
    '210Pb',
    '137Cs  ',
    '238Pu',
    nan,
    '226Ra',
    '99Tc  ',
    'CS-137',
    '239,240Pu',
    '239, 240 Pu',
    '241Am',
    '210Po',
    '210Po  ',
    '228Ra',
    '99Tc',
    '99Tc   ',
    '137Cs',
    '3H'
]

Regardless of these inconsistencies, OSPAR’s nuclide column needs to be standardized according to MARIS nomenclature.

Lower & strip nuclide names

To streamline the process of standardizing nuclide data, we employ the LowerStripNameCB callback. This function is applied to each DataFrame within our dictionary of DataFrames. Specifically, LowerStripNameCB simplifies the nuclide names by converting them to lowercase and removing any leading or trailing whitespace.

dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[LowerStripNameCB(col_src='nuclide', col_dst='nuclide')])
dfs_output=tfm()
for key, df in dfs_output.items(): print(f'{key} nuclides: {df["nuclide"].unique()}')
BIOTA nuclides: ['137cs' '226ra' '228ra' '239,240pu' '99tc' '210po' '210pb' '3h' 'cs-137'
 '238pu' '239, 240 pu' '241am']
SEAWATER nuclides: ['137cs' '239,240pu' '226ra' '228ra' '99tc' '3h' '210po' '210pb' nan]
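
The effect of lowercasing and stripping can be reproduced with plain pandas string methods. The sketch below uses a few of the raw values listed earlier and only illustrates the idea. It does not reproduce the LowerStripNameCB source.

```python
import pandas as pd

# A subset of the raw OSPAR nuclide spellings shown earlier.
raw = pd.Series(["Cs-137", "137Cs  ", "CS-137", "99Tc", "99Tc   ", "210Po  ", "210Po"])

# Lowercase, then remove leading/trailing whitespace.
normalized = raw.str.lower().str.strip()
unique_names = sorted(normalized.unique())

print(unique_names)
```

Seven raw spellings collapse to four distinct names. Variants such as cs-137 and 137cs remain distinct at this stage and are reconciled by the remapping step.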

Remap nuclide names to MARIS data formats

Next, we map nuclide names used by OSPAR to the MARIS standard nuclide names.

Note: The “IMFA” MARISCO PATTERN

Remapping data provider nomenclatures to MARIS standards is a recurrent operation and is done in a semi-automated manner according to the following pattern:

  1. Inspect data provider nomenclature
  2. Match automatically against MARIS nomenclature (using a fuzzy matching algorithm)
  3. Fix potential mismatches
  4. Apply the lookup table to the DataFrame

We will refer to this process as IMFA (Inspect, Match, Fix, Apply).

Let’s now create an instance of a fuzzy matching algorithm Remapper. This instance will align the nuclide names from the OSPAR dataset with the MARIS standard nuclide names, as defined in the lookup table located at nuc_lut_path and previously shown as nuc_lut_df.

remapper = Remapper(
    provider_lut_df=get_unique_across_dfs(dfs_output, col_name='nuclide', as_df=True), 
    maris_lut_fn=nuc_lut_path,
    maris_col_id='nuclide_id',
    maris_col_name='nc_name',
    provider_col_to_match='value',
    provider_col_key='value',
    fname_cache='nuclides_ospar.pkl'
    )

Let’s clarify the meaning of the Remapper parameters:

  • provider_lut_df: It is the nomenclature/lookup table used by the data provider for a certain attribute/variable. When the data provider does not provide such nomenclature, get_unique_across_dfs is used to extract the unique values from data provider data.
  • maris_lut_fn: The path to the lookup table containing the MARIS standard nuclide names
  • maris_col_id: The column name in the lookup table containing the MARIS nuclide identifiers (here nuclide_id)
  • maris_col_name: The column name in the lookup table containing the MARIS standard nuclide names (here nc_name)
  • provider_col_to_match: The column name in the OSPAR dataset containing the nuclide names used for the remapping
  • provider_col_key: The column name in the OSPAR dataset containing the nuclide names to remap from
  • fname_cache: The filename for the cache file

Here, provider_col_to_match and provider_col_key refer to the same column, because OSPAR does not supply a separate nomenclature. In other cases, data providers supply an associated nomenclature such as the one below (see the HELCOM handler for an example).

data-provider-nuclide-lut DataFrame:

nuclide_id nuclide
0 Cs-137
1 Cs-134
2 I-131

and uses the nuclide_id value in the data themselves. In such a case:

  • provider_lut_df would be data-provider-nuclide-lut
  • provider_col_to_match would be nuclide
  • provider_col_key would be nuclide_id
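
To make the distinction between the key column and the column to match more tangible, here is a small pandas sketch of the second situation, in which the data reference the provider's nomenclature by identifier. The DataFrames are made up from the example table above, and the name_to_match column is purely illustrative.

```python
import pandas as pd

# Provider nomenclature: `nuclide_id` is the key, `nuclide` is the name to match.
provider_lut = pd.DataFrame({
    "nuclide_id": [0, 1, 2],
    "nuclide": ["Cs-137", "Cs-134", "I-131"],
})

# The data themselves only carry the key.
data = pd.DataFrame({"nuclide_id": [2, 0, 0, 1]})

# Key -> normalized name, i.e. what would be compared against MARIS names.
key_to_name = (provider_lut.set_index("nuclide_id")["nuclide"]
               .str.lower().str.strip().to_dict())
data["name_to_match"] = data["nuclide_id"].map(key_to_name)

print(data)
```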

Now, we can automatically match the OSPAR nuclide names to the MARIS standard. The match_score column helps us evaluate the results.

Note: Pay Attention

Note that the data provider’s name to match is always converted to lowercase and stripped of any leading or trailing whitespace to streamline the matching process, as mentioned above.

remapper.generate_lookup_table(as_df=True)
remapper.select_match(match_score_threshold=0, verbose=True)
Processing: 100%|██████████| 13/13 [00:00<00:00, 57.47it/s]
0 entries matched the criteria, while 13 entries had a match score of 0 or higher.
matched_maris_name source_name match_score
source_key
239, 240 pu pu240 239, 240 pu 8
239,240pu pu240 239,240pu 6
137cs i133 137cs 4
241am pu241 241am 4
210po ru106 210po 4
226ra u234 226ra 4
210pb ru106 210pb 4
228ra u235 228ra 4
99tc tu 99tc 3
238pu u238 238pu 3
3h tu 3h 2
cs-137 cs137 cs-137 1
NaN Unknown NaN 0
Note: Fuzzy Matching

To try matching/reconciling two nomenclatures, we compute the Levenshtein distance between the OSPAR nuclide names and the MARIS standard nuclide names as indicated in the match_score column. A score of 0 indicates a perfect match.
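
For readers unfamiliar with the metric, the following is a textbook dynamic-programming implementation of the Levenshtein distance. It is a standalone sketch, not the code used inside Remapper, but it yields the same scores as several rows of the table above.

```python
def levenshtein(a: str, b: str) -> int:
    """Minimum number of single-character insertions, deletions
    or substitutions needed to turn `a` into `b`."""
    # prev[j] holds the distance between the processed prefix of `a` and b[:j].
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, start=1):
        curr = [i]  # distance between a[:i] and the empty string
        for j, cb in enumerate(b, start=1):
            cost = 0 if ca == cb else 1
            curr.append(min(prev[j] + 1,          # deletion
                            curr[j - 1] + 1,      # insertion
                            prev[j - 1] + cost))  # substitution or match
        prev = curr
    return prev[-1]


for provider_name, maris_name in [("cs-137", "cs137"), ("99tc", "tu"), ("3h", "tu")]:
    print(provider_name, maris_name, levenshtein(provider_name, maris_name))
```

A small distance does not guarantee a correct match: 3h is only two edits away from tu, yet the intended MARIS name is h3. This is why the manual "Fix" step of IMFA is needed.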

We now manually review the unmatched nuclide names and construct a dictionary to map them to the MARIS standard.

Exported source
fixes_nuclide_names = {
    '99tc': 'tc99',
    '238pu': 'pu238',
    '226ra': 'ra226',
    'ra-226': 'ra226',
    'ra-228': 'ra228',    
    '210pb': 'pb210',
    '241am': 'am241',
    '228ra': 'ra228',
    '137cs': 'cs137',
    '210po': 'po210',
    '239,240pu': 'pu239_240_tot',
    '239, 240 pu': 'pu239_240_tot',
    '3h': 'h3'
    }

The dictionary fixes_nuclide_names applies manual corrections to the nuclide names before the remapping process begins. Note that we did not remap cs-137 to cs137 as the fuzzy matching algorithm already matched cs-137 to cs137 (though the match score was 1).

The generate_lookup_table function constructs a lookup table for this purpose and includes an overwrite parameter, set to True by default. When activated, this parameter enables the function to update the existing cache with a new pickle file containing the updated lookup table. We are now prepared to test the remapping process.

remapper.generate_lookup_table(as_df=True, fixes=fixes_nuclide_names)
Processing: 100%|██████████| 13/13 [00:00<00:00, 52.43it/s]
matched_maris_name source_name match_score
source_key
cs-137 cs137 cs-137 1
137cs cs137 137cs 0
241am am241 241am 0
210po po210 210po 0
226ra ra226 226ra 0
99tc tc99 99tc 0
239,240pu pu239_240_tot 239,240pu 0
238pu pu238 238pu 0
3h h3 3h 0
239, 240 pu pu239_240_tot 239, 240 pu 0
210pb pb210 210pb 0
228ra ra228 228ra 0
NaN Unknown NaN 0

To view all remapped nuclides as a lookup table that will be later passed to our RemapNuclideNameCB callback:

remapper.generate_lookup_table(as_df=False, fixes=fixes_nuclide_names, overwrite=True)
Processing: 100%|██████████| 13/13 [00:00<00:00, 56.68it/s]
{'137cs': Match(matched_id=np.int64(33), matched_maris_name='cs137', source_name='137cs', match_score=np.int64(0)),
 '241am': Match(matched_id=np.int64(72), matched_maris_name='am241', source_name='241am', match_score=np.int64(0)),
 '210po': Match(matched_id=np.int64(47), matched_maris_name='po210', source_name='210po', match_score=np.int64(0)),
 '226ra': Match(matched_id=np.int64(53), matched_maris_name='ra226', source_name='226ra', match_score=np.int64(0)),
 '99tc': Match(matched_id=np.int64(15), matched_maris_name='tc99', source_name='99tc', match_score=np.int64(0)),
 '239,240pu': Match(matched_id=np.int64(77), matched_maris_name='pu239_240_tot', source_name='239,240pu', match_score=np.int64(0)),
 '238pu': Match(matched_id=np.int64(67), matched_maris_name='pu238', source_name='238pu', match_score=np.int64(0)),
 '3h': Match(matched_id=np.int64(1), matched_maris_name='h3', source_name='3h', match_score=np.int64(0)),
 'cs-137': Match(matched_id=np.int64(33), matched_maris_name='cs137', source_name='cs-137', match_score=np.int64(1)),
 '239, 240 pu': Match(matched_id=np.int64(77), matched_maris_name='pu239_240_tot', source_name='239, 240 pu', match_score=np.int64(0)),
 '210pb': Match(matched_id=np.int64(41), matched_maris_name='pb210', source_name='210pb', match_score=np.int64(0)),
 '228ra': Match(matched_id=np.int64(54), matched_maris_name='ra228', source_name='228ra', match_score=np.int64(0)),
 nan: Match(matched_id=-1, matched_maris_name='Unknown', source_name=nan, match_score=0)}

The nuclide names have been successfully remapped. We now create a callback named RemapNuclideNameCB to translate the OSPAR dataset’s nuclide names into the standard nuclide_ids used by MARIS. This callback employs the lut_nuclides lambda function, which provides the required lookup table. Note that, in the exported source below, overwrite=True is passed to generate_lookup_table inside lut_nuclides, so the cached lookup table is rebuilt with the current fixes; set overwrite=False to reuse the cached version instead.


source

RemapNuclideNameCB

 RemapNuclideNameCB (fn_lut:Callable, col_name:str)

Remap data provider nuclide names to standardized MARIS nuclide names.

Type Details
fn_lut Callable Function that returns the lookup table dictionary
col_name str Column name to remap
Exported source
# Create a lookup table for nuclide names
lut_nuclides = lambda df: Remapper(provider_lut_df=df,
                                   maris_lut_fn=nuc_lut_path,
                                   maris_col_id='nuclide_id',
                                   maris_col_name='nc_name',
                                   provider_col_to_match='value',
                                   provider_col_key='value',
                                   fname_cache='nuclides_ospar.pkl').generate_lookup_table(fixes=fixes_nuclide_names, 
                                                                                            as_df=False, overwrite=True)
Exported source
class RemapNuclideNameCB(Callback):
    "Remap data provider nuclide names to standardized MARIS nuclide names."
    def __init__(self, 
                 fn_lut: Callable, # Function that returns the lookup table dictionary
                 col_name: str # Column name to remap
                ):
        fc.store_attr()

    def __call__(self, tfm: Transformer):
        df_uniques = get_unique_across_dfs(tfm.dfs, col_name=self.col_name, as_df=True)
        lut = {k: v.matched_id for k, v in self.fn_lut(df_uniques).items()}    
        for k in tfm.dfs.keys():
            tfm.dfs[k]['NUCLIDE'] = tfm.dfs[k][self.col_name].replace(lut)

Let’s see it in action, along with the LowerStripNameCB callback:

dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[
    RemoveAllNAValuesCB(nan_cols_to_check),
    LowerStripNameCB(col_src='nuclide', col_dst='nuclide'),
    RemapNuclideNameCB(lut_nuclides, col_name='nuclide')
    ])
dfs_out = tfm()

# For instance
for key in dfs_out.keys():
    print(f'Unique nuclide_ids for {key} NUCLIDE column: ', dfs_out[key]['NUCLIDE'].unique())
Processing: 100%|██████████| 12/12 [00:00<00:00, 46.32it/s]
Unique nuclide_ids for BIOTA NUCLIDE column:  [33 53 54 77 15 47 41  1 67 72]
Unique nuclide_ids for SEAWATER NUCLIDE column:  [33 77 53 54 15  1 47 41]
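
The core of the remapping is a dictionary lookup from normalized provider names to MARIS nuclide_ids. The sketch below isolates that step on toy data, using a few identifiers from the lookup table shown above. It uses Series.map rather than the Series.replace call of the exported callback, an assumption made here so that the result is an integer column. Names absent from the dictionary would become NaN with map, whereas replace leaves them unchanged.

```python
import pandas as pd

# Subset of the lookup table printed above: provider name -> MARIS nuclide_id.
lut = {"137cs": 33, "cs-137": 33, "99tc": 15, "3h": 1}

df = pd.DataFrame({"nuclide": ["137cs", "cs-137", "3h", "99tc"]})

# Translate each normalized provider name into its MARIS identifier.
df["NUCLIDE"] = df["nuclide"].map(lut)

print(df)
```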

Standardize Time

We create a callback that parses the date-time strings in the dictionary of DataFrames (format %m/%d/%y %H:%M:%S) into datetime objects and, in the process, handles missing dates and times.


source

ParseTimeCB

 ParseTimeCB (col_src:dict={'BIOTA': 'sampling date', 'SEAWATER':
              'sampling date'}, col_dst:str='TIME', format:str='%m/%d/%y
              %H:%M:%S')

Parse the time format in the dataframe and check for inconsistencies.

Type Default Details
col_src dict {‘BIOTA’: ‘sampling date’, ‘SEAWATER’: ‘sampling date’} Column name to remap
col_dst str TIME Destination column name for the parsed datetime
format str %m/%d/%y %H:%M:%S Time format
Exported source
time_cols = {'BIOTA': 'sampling date', 'SEAWATER': 'sampling date'}
time_format = '%m/%d/%y %H:%M:%S'
Exported source
class ParseTimeCB(Callback):
    "Parse the time format in the dataframe and check for inconsistencies."
    def __init__(self, 
                 col_src: dict=time_cols, # Column name to remap
                 col_dst: str='TIME', # Destination column name for the parsed datetime
                 format: str=time_format # Time format
                 ):
        fc.store_attr()
    
    def __call__(self, tfm):
        for grp, df in tfm.dfs.items():
            src_col = self.col_src.get(grp)
            df[self.col_dst] = pd.to_datetime(df[src_col], format=self.format, errors='coerce')
        return tfm

Apply the transformer for callback ParseTimeCB.

dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[
    RemoveAllNAValuesCB(nan_cols_to_check),
    ParseTimeCB(),
    CompareDfsAndTfmCB(dfs)])
tfm()


display(Markdown("<b> Row Count Comparison Before and After Transformation:</b>"))
with pd.option_context('display.max_rows', None):
    display(pd.DataFrame.from_dict(tfm.compare_stats))

display(Markdown("<b> Example of parsed time column:</b>"))
with pd.option_context('display.max_rows', None):
    display(tfm.dfs['SEAWATER']['TIME'].head(2))

Row Count Comparison Before and After Transformation:

BIOTA SEAWATER
Original row count (dfs) 15951 19193
Transformed row count (tfm.dfs) 15951 19183
Rows removed from original (tfm.dfs_removed) 0 10
Rows created in transformed (tfm.dfs_created) 0 0

Example of parsed time column:

0   2010-01-27
1   2010-01-27
Name: TIME, dtype: datetime64[ns]
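
The behaviour of errors='coerce' is worth seeing in isolation: values that do not match the expected format become NaT instead of raising an exception. The sketch below applies the same pd.to_datetime call as ParseTimeCB to a small made-up Series.

```python
import pandas as pd

time_format = "%m/%d/%y %H:%M:%S"

# One valid timestamp, one malformed string and one missing value (toy data).
raw = pd.Series(["01/27/10 00:00:00", "not a date", None])

# Unparseable or missing entries are coerced to NaT rather than raising.
parsed = pd.to_datetime(raw, format=time_format, errors="coerce")

print(parsed)
print(f"Unparsed entries: {int(parsed.isna().sum())}")
```

Because failures are silent, it is worth counting NaT values after parsing, as done here, when a new data release is processed.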

The NetCDF time format requires the time to be encoded as the number of milliseconds since a time of origin. In our case the time of origin is 1970-01-01, as indicated by the CONFIGS['units']['time'] entry in configs.ipynb.

EncodeTimeCB transforms the datetime object from ParseTimeCB into the MARIS NetCDF time format.
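
As an illustration of what such an encoding amounts to, the sketch below converts datetimes into integer counts since 1970-01-01. The function encode_time and its unit argument are hypothetical; the unit actually applied by EncodeTimeCB is the one defined in the configuration, and this is not its implementation.

```python
import pandas as pd

EPOCH = pd.Timestamp("1970-01-01")


def encode_time(times: pd.Series, unit: str = "ms") -> pd.Series:
    """Hypothetical helper: whole number of `unit` elapsed since EPOCH."""
    return (times - EPOCH) // pd.Timedelta(1, unit=unit)


times = pd.Series(pd.to_datetime(["2010-01-27", "1970-01-02"]))

print(encode_time(times, unit="ms").tolist())
print(encode_time(times, unit="s").tolist())
```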

dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[
    RemoveAllNAValuesCB(nan_cols_to_check),
    ParseTimeCB(),
    EncodeTimeCB(),
    CompareDfsAndTfmCB(dfs)
    ])

tfm()

display(Markdown("<b> Row Count Comparison Before and After Transformation:</b>"))
with pd.option_context('display.max_rows', None):
    display(pd.DataFrame.from_dict(tfm.compare_stats))

Row Count Comparison Before and After Transformation:

BIOTA SEAWATER
Original row count (dfs) 15951 19193
Transformed row count (tfm.dfs) 15951 19183
Rows removed from original (tfm.dfs_removed) 0 10
Rows created in transformed (tfm.dfs_created) 0 0

Sanitize value

We create a callback, SanitizeValueCB, to consolidate measurement values into a single column named VALUE and remove any NaN entries.


source

SanitizeValueCB

 SanitizeValueCB (value_col:dict={'BIOTA': 'activity or mda', 'SEAWATER':
                  'activity or mda'})

Sanitize value by removing blank entries and populating value column.

Type Default Details
value_col dict {‘BIOTA’: ‘activity or mda’, ‘SEAWATER’: ‘activity or mda’} Column name to sanitize
Exported source
value_cols = {'BIOTA': 'activity or mda', 'SEAWATER': 'activity or mda'}
Exported source
class SanitizeValueCB(Callback):
    "Sanitize value by removing blank entries and populating `value` column."
    def __init__(self, 
                 value_col: dict = value_cols # Column name to sanitize
                 ):
        fc.store_attr()

    def __call__(self, tfm):
        for grp, df in tfm.dfs.items():
            # Drop rows where the value column is missing (NaN)
            invalid_rows = df[df[self.value_col.get(grp)].isna()]
            if not invalid_rows.empty:     
                print(f"{len(invalid_rows)} invalid rows found in group '{grp}' during sanitize value callback.")
                df.dropna(subset=[self.value_col.get(grp)], inplace=True)
                
            df['VALUE'] = df[self.value_col.get(grp)]
dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[
    RemoveAllNAValuesCB(nan_cols_to_check),
    SanitizeValueCB(),
    CompareDfsAndTfmCB(dfs)])

tfm()

display(Markdown("<b> Example of VALUE column:</b>"))
with pd.option_context('display.max_rows', None):
    display(tfm.dfs['SEAWATER'][['VALUE']].head())

display(Markdown("<b> Row Count Comparison Before and After Transformation:</b>"))
with pd.option_context('display.max_rows', None):
    display(pd.DataFrame.from_dict(tfm.compare_stats))

display(Markdown("<b> Example of removed data:</b>"))
with pd.option_context('display.max_columns', None):
    display(tfm.dfs_removed['SEAWATER'].head(2))

Example of VALUE column:

VALUE
0 0.20
1 0.27
2 0.26
3 0.25
4 0.20

Row Count Comparison Before and After Transformation:

BIOTA SEAWATER
Original row count (dfs) 15951 19193
Transformed row count (tfm.dfs) 15951 19183
Rows removed from original (tfm.dfs_removed) 0 10
Rows created in transformed (tfm.dfs_created) 0 0

Example of removed data:

id contracting party rsc sub-division station id sample id latd latm lats latdir longd longm longs longdir sample type sampling depth sampling date nuclide value type activity or mda uncertainty unit data provider measurement comment sample comment reference comment
14776 97948 Sweden 11.0 SW7 1 58 36.0 12.0 N 11 14.0 42.0 E WATER 1.0 NaN 3H NaN NaN NaN Bq/l Swedish Radiation Safety Authority no 3H this year due to broken LSC NaN NaN
14780 97952 Sweden 12.0 Ringhals (R35) 7 57 14.0 5.0 N 11 56.0 8.0 E WATER 1.0 NaN 3H NaN NaN NaN Bq/l Swedish Radiation Safety Authority no 3H this year due to broken LSC NaN NaN

Normalize uncertainty

We create a callback, NormalizeUncCB, to standardize the uncertainty value to the MARIS format. For each sample type in the OSPAR dataset, the reported uncertainty is given as an expanded uncertainty with a coverage factor 𝑘=2. For further details, refer to the OSPAR reporting guidelines. In MARIS the uncertainty values are reported as standard uncertainty with a coverage factor 𝑘=1.

NormalizeUncCB callback normalizes the uncertainty using the following lambda function:


source

NormalizeUncCB

 NormalizeUncCB (col_unc:dict={'BIOTA': 'uncertainty', 'SEAWATER':
                 'uncertainty'}, fn_convert_unc:Callable=<function
                 <lambda>>)

Normalize uncertainty values in DataFrames.

Type Default Details
col_unc dict {‘BIOTA’: ‘uncertainty’, ‘SEAWATER’: ‘uncertainty’} Column name to normalize
fn_convert_unc Callable Function correcting coverage factor
Exported source
unc_exp2stan = lambda df, unc_col: df[unc_col] / 2
Exported source
unc_cols = {'BIOTA': 'uncertainty', 'SEAWATER': 'uncertainty'}
Exported source
class NormalizeUncCB(Callback):
    """Normalize uncertainty values in DataFrames."""
    def __init__(self, 
                 col_unc: dict = unc_cols, # Column name to normalize
                 fn_convert_unc: Callable=unc_exp2stan, # Function correcting coverage factor
                 ): 
        fc.store_attr()

    def __call__(self, tfm):
        for grp, df in tfm.dfs.items():
            self._convert_commas_to_periods(df, self.col_unc.get(grp))
            self._convert_to_float(df, self.col_unc.get(grp))
            self._apply_conversion_function(df, self.col_unc.get(grp))

    def _convert_commas_to_periods(self, df, col_unc):
        """Convert commas to periods in the uncertainty column."""
        df[col_unc] = df[col_unc].astype(str).str.replace(',', '.')

    def _convert_to_float(self, df, col_unc):
        """Convert uncertainty column to float, handling errors by setting them to NaN."""
        df[col_unc] = pd.to_numeric(df[col_unc], errors='coerce')

    def _apply_conversion_function(self, df, col_unc):
        """Apply the conversion function to normalize the uncertainty values."""
        df['UNC'] = self.fn_convert_unc(df, col_unc)
dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[
    RemoveAllNAValuesCB(nan_cols_to_check),
    SanitizeValueCB(),               
    NormalizeUncCB()
    ])

tfm()

display(Markdown("<b> Example of VALUE and UNC columns:</b>"))  
for grp in ['SEAWATER', 'BIOTA']:
    print(f'\n{grp}:')
    print(tfm.dfs[grp][['VALUE', 'UNC']])

Example of VALUE and UNC columns:

SEAWATER:
          VALUE           UNC
0      0.200000           NaN
1      0.270000           NaN
2      0.260000           NaN
3      0.250000           NaN
4      0.200000           NaN
...         ...           ...
19183  0.000005  2.600000e-07
19184  6.152000  3.076000e-01
19185  0.005390  1.078000e-03
19186  0.001420  2.840000e-04
19187  6.078000  3.039000e-01

[19183 rows x 2 columns]
BIOTA:
          VALUE       UNC
0      0.326416       NaN
1      0.442704       NaN
2      0.412989       NaN
3      0.202768       NaN
4      0.652833       NaN
...         ...       ...
15946  0.384000  0.012096
15947  0.456000  0.012084
15948  0.122000  0.031000
15949  0.310000       NaN
15950  0.306000  0.007191

[15951 rows x 2 columns]
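
The three steps performed by NormalizeUncCB (decimal comma to period, conversion to float, division by the coverage factor) can be traced on a handful of made-up values. The variable names below are illustrative only.

```python
import pandas as pd

# Toy reported uncertainties (k=2): decimal comma, plain float string,
# a non-numeric entry and a missing value.
raw_unc = pd.Series(["0,02", "0.5", "n/a", None])

# 1. Decimal comma -> period; 2. to float, invalid entries become NaN.
as_float = pd.to_numeric(
    raw_unc.astype(str).str.replace(",", ".", regex=False),
    errors="coerce",
)

# 3. Expanded uncertainty (k=2) -> standard uncertainty (k=1).
standard_unc = as_float / 2

print(standard_unc)
```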
Important: FEEDBACK TO DATA PROVIDER

The SEAWATER dataset includes instances where the uncertainty values significantly exceed the corresponding measurement values. While such occurrences are not inherently erroneous, they merit attention and may warrant further verification.

To demonstrate instances where the uncertainty significantly surpasses the measurement values, we will initially compute the ‘relative uncertainty’ as a percentage for the seawater dataset.

dfs = load_data(src_dir, use_cache=True)
for grp in ['SEAWATER', 'BIOTA']:
    tfm.dfs[grp]['relative_uncertainty'] = (
    # Divide 'uncertainty' by 'value'
    (tfm.dfs[grp]['uncertainty'] / tfm.dfs[grp]['activity or mda'])
    # Multiply by 100 to convert to percentage
    * 100)

Now we will retrieve all rows where the relative uncertainty exceeds 100% for the seawater dataset.

threshold = 100
grp = 'SEAWATER'
cols_to_show = ['id', 'contracting party', 'nuclide', 'value type', 'activity or mda', 'uncertainty', 'unit', 'relative_uncertainty']
df = tfm.dfs[grp][cols_to_show][tfm.dfs[grp]['relative_uncertainty'] > threshold]

print(f'Number of rows where relative uncertainty is greater than {threshold}%: \n {df.shape[0]} \n')

display(Markdown(f"<b> Example of data with relative uncertainty greater than {threshold}%:</b>"))
with pd.option_context('display.max_rows', None):
    display(df.head())
Number of rows where relative uncertainty is greater than 100%: 
 95 

Example of data with relative uncertainty greater than 100%:

id contracting party nuclide value type activity or mda uncertainty unit relative_uncertainty
969 11075 United Kingdom 137Cs = 0.0028 0.3276 Bq/l 11700.0
971 11077 United Kingdom 137Cs = 0.0029 0.3364 Bq/l 11600.0
973 11079 United Kingdom 137Cs = 0.0025 0.3325 Bq/l 13300.0
975 11081 United Kingdom 137Cs = 0.0025 0.3450 Bq/l 13800.0
977 11083 United Kingdom 137Cs = 0.0038 0.3344 Bq/l 8800.0
Important: FEEDBACK TO DATA PROVIDER

The BIOTA dataset includes instances where the uncertainty values significantly exceed the corresponding measurement values. While such occurrences are not inherently erroneous, they merit attention and may warrant further verification.

Now we will retrieve all rows where the relative uncertainty exceeds 100% for the biota dataset.

threshold = 100
grp = 'BIOTA' 
cols_to_show=['id', 'contracting party', 'nuclide', 'value type', 'activity or mda', 'uncertainty', 'unit', 'relative_uncertainty']
df=tfm.dfs[grp][cols_to_show][tfm.dfs[grp]['relative_uncertainty'] > threshold]

print(f'Number of rows where relative uncertainty is greater than {threshold}%: \n {df.shape[0]} \n')

display(Markdown(f"<b> Example of data with relative uncertainty greater than {threshold}%:</b>"))
with pd.option_context('display.max_rows', None):
    display(df.head())
Number of rows where relative uncertainty is greater than 100%: 
 100 

Example of data with relative uncertainty greater than 100%:

id contracting party nuclide value type activity or mda uncertainty unit relative_uncertainty
249 3101 Norway 137Cs = 0.0500 0.1000 Bq/kg f.w. 200.000000
306 3158 Norway 137Cs = 0.1500 0.1600 Bq/kg f.w. 106.666667
775 8152 Norway 137Cs = 0.0340 0.0500 Bq/kg f.w. 147.058824
788 8165 Norway 137Cs = 0.0300 0.0500 Bq/kg f.w. 166.666667
1839 19571 Belgium 239,240Pu = 0.0074 0.0093 Bq/kg f.w. 125.675676
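
The relative_uncertainty values in these tables are consistent with the uncertainty expressed as a percentage of the measured value (for example, 0.1000 / 0.0500 × 100 = 200). The following is a minimal sketch of the check on a toy DataFrame, not the handler's own code: the formula is inferred from the displayed numbers, the first two rows are copied from the BIOTA output, and the row with id 9999 is a hypothetical measurement added for contrast.

```python
import pandas as pd

# Toy data: first two rows copied from the BIOTA output,
# id 9999 is a hypothetical row with a small uncertainty.
df = pd.DataFrame({
    'id': [3101, 3158, 9999],
    'activity or mda': [0.05, 0.15, 2.0],
    'uncertainty': [0.10, 0.16, 0.5],
})

# Assumed definition: uncertainty as a percentage of the measured value.
df['relative_uncertainty'] = df['uncertainty'] / df['activity or mda'] * 100

threshold = 100  # percent
flagged = df.loc[df['relative_uncertainty'] > threshold]
print(flagged[['id', 'relative_uncertainty']])
```

Only the two rows whose uncertainty exceeds the measured value are flagged; the hypothetical row, at 25%, is not.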

Remap units

Let’s inspect the unique units used by OSPAR:

get_unique_across_dfs(dfs, col_name='unit', as_df=True)
index value
0 0 BQ/L
1 1 Bq/L
2 2 Bq/l
3 3 Bq/kg f.w.
4 4 NaN
ImportantFEEDBACK TO DATA PROVIDER

Standardizing the units would simplify data processing, as the units are not consistent across the dataset. For example, BQ/L, Bq/l, and Bq/L are used interchangeably.

We will establish unit renaming rules for the OSPAR dataset:

Exported source
# Define unit names renaming rules
renaming_unit_rules = {'Bq/l': 1, #'Bq/m3'
                       'Bq/L': 1,
                       'BQ/L': 1,
                       'Bq/kg f.w.': 5, # Bq/kgw
                       }

Now we will create a callback, RemapUnitCB, to remap the units in the dataframes. For the SEAWATER dataset, rows with a missing unit are assigned the default unit Bq/l before the lookup is applied.


source

RemapUnitCB

 RemapUnitCB (lut:Dict[str,str], default_units:Dict[str,str]={'SEAWATER':
              'Bq/l', 'BIOTA': 'Bq/kg f.w.'}, verbose:bool=False)

Callback to update DataFrame ‘UNIT’ columns based on a lookup table.

Exported source
default_units = {'SEAWATER': 'Bq/l',
                 'BIOTA': 'Bq/kg f.w.'}
Exported source
class RemapUnitCB(Callback):
    """Callback to update DataFrame 'UNIT' columns based on a lookup table."""

    def __init__(self,
                 lut: Dict[str, str],
                 default_units: Dict[str, str] = default_units,
                 verbose: bool = False
                 ):
        fc.store_attr()  # Store lut, default_units and verbose as attributes

    def __call__(self, tfm: 'Transformer'):
        for grp, df in tfm.dfs.items():
            # Apply default units to SEAWATER dataset
            if grp == 'SEAWATER':
                self._apply_default_units(df, unit=self.default_units.get(grp))
            # self._print_na_units(df)
            self._update_units(df)

    def _apply_default_units(self, df: pd.DataFrame , unit = None):
        df.loc[df['unit'].isnull(), 'unit'] = unit

    # def _print_na_units(self, df: pd.DataFrame):
    #     na_count = df['unit'].isnull().sum()
    #     if na_count > 0 and self.verbose:
    #         print(f"Number of rows with NaN in 'unit' column: {na_count}")

    def _update_units(self, df: pd.DataFrame):
        df['UNIT'] = df['unit'].apply(lambda x: self.lut.get(x, 'Unknown'))
dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[
    RemoveAllNAValuesCB(nan_cols_to_check),
    SanitizeValueCB(), # Remove blank value entries (also removes NaN values in Unit column) 
    RemapUnitCB(renaming_unit_rules, verbose=True),
    CompareDfsAndTfmCB(dfs)
    ])

tfm()

display(Markdown("<b> Row Count Comparison Before and After Transformation:</b>"))
with pd.option_context('display.max_rows', None):
    display(pd.DataFrame.from_dict(tfm.compare_stats))

print('Unique Unit values:')
for grp in ['BIOTA', 'SEAWATER']:
    print(f"{grp}: {tfm.dfs[grp]['UNIT'].unique()}")

Row Count Comparison Before and After Transformation:

BIOTA SEAWATER
Original row count (dfs) 15951 19193
Transformed row count (tfm.dfs) 15951 19183
Rows removed from original (tfm.dfs_removed) 0 10
Rows created in transformed (tfm.dfs_created) 0 0
Unique Unit values:
BIOTA: [5]
SEAWATER: [1]
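
The two steps performed by RemapUnitCB (fill missing units with a default, then look each unit up in the renaming rules) can be reproduced on a toy DataFrame. This is only a sketch of the logic shown in the exported source: the sample rows are invented, and 'mSv' is a hypothetical unit included to show the 'Unknown' fallback.

```python
import pandas as pd

# Same renaming rules as defined for the handler.
renaming_unit_rules = {'Bq/l': 1, 'Bq/L': 1, 'BQ/L': 1, 'Bq/kg f.w.': 5}

# Invented sample: one missing unit and one unit absent from the rules.
df = pd.DataFrame({'unit': ['BQ/L', None, 'Bq/l', 'mSv']})

# Step 1: fill missing units with the SEAWATER default.
df.loc[df['unit'].isnull(), 'unit'] = 'Bq/l'

# Step 2: map provider units to MARIS unit IDs, falling back to 'Unknown'.
df['UNIT'] = df['unit'].apply(lambda x: renaming_unit_rules.get(x, 'Unknown'))
print(df)
```

The three spellings of becquerel per litre collapse to the same ID, while any unit missing from the rules surfaces as 'Unknown' rather than being silently dropped.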

Remap detection limit

ImportantFEEDBACK TO DATA PROVIDER

The Value type column contains numerous NaN entries.

# Count the number of NaN entries in the 'value type' column for 'SEAWATER'
na_count_seawater = dfs['SEAWATER']['value type'].isnull().sum()
print(f"Number of NaN 'Value type' entries in 'SEAWATER': {na_count_seawater}")

# Count the number of NaN entries in the 'value type' column for 'BIOTA'
na_count_biota = dfs['BIOTA']['value type'].isnull().sum()
print(f"Number of NaN 'Value type' entries in 'BIOTA': {na_count_biota}")
Number of NaN 'Value type' entries in 'SEAWATER': 64
Number of NaN 'Value type' entries in 'BIOTA': 23

In the OSPAR dataset, the detection limit is denoted by < in the Value type column. When the Value type is <, the Activity or MDA column specifies the detection limit. Conversely, when the Value type is =, the Activity or MDA column holds an actual measurement. Let’s review the entries in the Value type column for the OSPAR dataset:

for grp in dfs.keys():
    print(f'{grp}:')
    print(tfm.dfs[grp]['value type'].unique())
BIOTA:
['<' '=' nan]
SEAWATER:
['<' '=' nan]

In MARIS, detection limits are encoded as follows:

pd.read_excel(detection_limit_lut_path())
id name name_sanitized
0 -1 Not applicable Not applicable
1 0 Not Available Not available
2 1 = Detected value
3 2 < Detection limit
4 3 ND Not detected
5 4 DE Derived

We can create a lambda function to retrieve the MARIS lookup table.

Exported source
lut_dl = lambda: pd.read_excel(detection_limit_lut_path(), usecols=['name','id']).set_index('name').to_dict()['id']
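
To make the shape of this lookup table concrete, the sketch below applies the same set_index('name').to_dict()['id'] chain to an in-memory DataFrame. It assumes the Excel file contains exactly the rows displayed in the table; lut_df is a stand-in for the file contents, not part of the handler.

```python
import pandas as pd

# Stand-in for the Excel lookup file, rebuilt from the rows displayed in the table.
lut_df = pd.DataFrame({
    'id': [-1, 0, 1, 2, 3, 4],
    'name': ['Not applicable', 'Not Available', '=', '<', 'ND', 'DE'],
    'name_sanitized': ['Not applicable', 'Not available', 'Detected value',
                       'Detection limit', 'Not detected', 'Derived'],
})

# Keep only 'name' and 'id', index by name, and extract a {name: id} dictionary.
lut = lut_df[['name', 'id']].set_index('name').to_dict()['id']
print(lut)
```

The result is a plain dictionary keyed by the provider-side symbol, so '<' resolves to 2 and '=' to 1.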

We can define the columns of interest in both the SEAWATER and BIOTA DataFrames for the detection limit column.

Exported source
coi_dl = {'SEAWATER' : {'DL' : 'value type'},
          'BIOTA':  {'DL' : 'value type'}
          }

We now create a callback, RemapDetectionLimitCB, to remap OSPAR detection limit values to MARIS formatted values using the lookup table. Since the detection limit column contains NaN entries, we set the detection limit to ‘=’ when both the value and the uncertainty are present and the current detection limit is not among the lookup keys. Any entry that is still unmatched after this step is set to ‘Not Available’.


source

RemapDetectionLimitCB

 RemapDetectionLimitCB (coi:dict, fn_lut:Callable)

Remap detection limit values to MARIS format using a lookup table.

Exported source
class RemapDetectionLimitCB(Callback):
    """Remap detection limit values to MARIS format using a lookup table."""

    def __init__(self, coi: dict, fn_lut: Callable):
        """Initialize with column configuration and a function to get the lookup table."""
        fc.store_attr()        

    def __call__(self, tfm: Transformer):
        """Apply the remapping of detection limits across all dataframes"""
        lut = self.fn_lut()  # Retrieve the lookup table
        for grp, df in tfm.dfs.items():
            df['DL'] = df[self.coi[grp]['DL']]
            self._set_detection_limits(df, lut)

    def _set_detection_limits(self, df: pd.DataFrame, lut: dict):
        """Set detection limits based on value and uncertainty columns using specified conditions."""
        # Condition to set '=' when value and uncertainty are present and the current detection limit is not in the lookup keys
        condition_eq = (df['VALUE'].notna() & df['UNC'].notna() & ~df['DL'].isin(lut.keys()))
        df.loc[condition_eq, 'DL'] = '='

        # Set 'Not Available' for unmatched detection limits
        df.loc[~df['DL'].isin(lut.keys()), 'DL'] = 'Not Available'

        # Map existing detection limits using the lookup table
        df['DL'] = df['DL'].map(lut)
dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[
    RemoveAllNAValuesCB(nan_cols_to_check),
    SanitizeValueCB(),
    NormalizeUncCB(),                  
    RemapUnitCB(renaming_unit_rules, verbose=True),
    RemapDetectionLimitCB(coi_dl, lut_dl)])

tfm()
for grp in ['BIOTA', 'SEAWATER']:
    print(f"{grp}: {tfm.dfs[grp]['DL'].unique()}")
BIOTA: [2 1]
SEAWATER: [2 1]
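
The decision rules of RemapDetectionLimitCB can be traced on a small example. The sketch below reuses the three steps from the exported source on invented rows; the lookup dictionary is typed in by hand from the MARIS table rather than read from the Excel file.

```python
import numpy as np
import pandas as pd

# Hand-typed copy of the MARIS detection limit lookup ({name: id}).
lut = {'Not applicable': -1, 'Not Available': 0, '=': 1, '<': 2, 'ND': 3, 'DE': 4}

# Invented rows covering each case.
df = pd.DataFrame({
    'VALUE': [1.2, 0.5, 0.8, np.nan],
    'UNC':   [0.1, np.nan, 0.2, np.nan],
    'DL':    ['=', '<', np.nan, np.nan],
})

# 1. Value and uncertainty present but DL not recognised: treat as a measurement.
known = df['DL'].isin(list(lut))
df.loc[df['VALUE'].notna() & df['UNC'].notna() & ~known, 'DL'] = '='

# 2. Anything still unrecognised becomes 'Not Available'.
df.loc[~df['DL'].isin(list(lut)), 'DL'] = 'Not Available'

# 3. Replace symbols with MARIS IDs.
df['DL'] = df['DL'].map(lut)
print(df)
```

The third row, which has no value type but does have a value and an uncertainty, is encoded as a detected value (1), while the last row, with nothing to go on, is encoded as not available (0).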

Remap Biota species

The OSPAR dataset contains biota species information in the Species column of the biota DataFrame. To ensure consistency with MARIS standards, it is necessary to remap these species names. We will employ a similar approach to that used for standardizing nuclide names, IMFA (Inspect, Match, Fix, Apply).

We first inspect the unique Species values of the OSPAR Biota dataset:

dfs = load_data(src_dir, use_cache=True)
with pd.option_context('display.max_columns', None):
    display(get_unique_across_dfs(dfs, col_name='species', as_df=True).T)
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
index 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
value FUCUS SPP. BUCCINUM UNDATUM Brosme brosme Thunnus sp. Mixture of green, red and brown algae SALMO SALAR Gadiculus argenteus thori Sardina pilchardus Nephrops norvegicus Fucus distichus Anarhichas minor Molva molva GALEUS MELASTOMUS ANARHICHAS LUPUS Buccinum undatum Phoca vitulina Gadus morhua unknown Dasyatis pastinaca MOLVA DYPTERYGIA PLATICHTHYS FLESUS SOLEA SOLEA (S.VULGARIS) LAMINARIA DIGITATA Galeus melastomus Fucus serratus Anarhichas denticulatus Hippoglossus hippoglossus Penaeus vannamei Anguilla anguilla FUCUS spp HIPPOGLOSSUS HIPPOGLOSSUS Gadus morhua Sebastes marinus Scomber scombrus Merluccius merluccius Mallotus villosus Capros aper Homarus gammarus SCOMBER SCOMBRUS DIPTURUS BATIS SEBASTES MENTELLA Clupea harengus Limanda Limanda Sepia spp. Cerastoderma edule CHIMAERA MONSTROSA MONODONTA LINEATA Dicentrarchus labrax LIMANDA LIMANDA PLEURONECTES PLATESSA Unknown PORPHYRA UMBILICALIS PATELLA RHODYMENIA spp Glyptocephalus cynoglossus MOLVA MOLVA Boreogadus saida Trisopterus esmarki Reinhardtius hippoglossoides Boreogadus Saida Ostrea edulis Flatfish Pelvetia canaliculata BOREOGADUS SAIDA Patella sp. Ascophyllum nodosum SCOPHTHALMUS RHOMBUS MERLUCCIUS MERLUCCIUS Mytilus Edulis Gaidropsarus argenteus RHODYMENIA PSEUDOPALAMATA & PALMARIA PALMATA Cyclopterus lumpus MICROMESISTIUS POUTASSOU Fucus Vesiculosus PECTEN MAXIMUS REINHARDTIUS HIPPOGLOSSOIDES Squalus acanthias TRACHURUS TRACHURUS SPRATTUS SPRATTUS Mytilus edulis PELVETIA CANALICULATA CRASSOSTREA GIGAS Trisopterus minutus Melanogrammus aeglefinus PLUERONECTES PLATESSA MERLANGIUS MERLANGUS PATELLA VULGATA Salmo salar Cerastoderma (Cardium) Edule Limanda limanda MYTILUS EDULIS Clupea Harengus Pollachius virens Trisopterus esmarkii ASCOPHYLLUN NODOSUM Fucus sp. Merlangius merlangus Melanogrammus aeglefinus Tapes sp. Fucus vesiculosus Coryphaenoides rupestris Rhodymenia spp. 
Argentina silus PALMARIA PALMATA DICENTRARCHUS (MORONE) LABRAX Gadus Morhua OSTREA EDULIS Lumpenus lampretaeformis MELANOGRAMMUS AEGLEFINUS Pollachius pollachius Sebastes mentella Pleuronectiformes [order] FUCUS SPIRALIS NUCELLA LAPILLUS CERASTODERMA (CARDIUM) EDULE Anarhichas lupus Gadiculus argenteus Clupea harengus BROSME BROSME Trachurus trachurus Lycodes vahlii Platichthys flesus LITTORINA LITTOREA OSILINUS LINEATUS FUCUS VESICULOSUS Argentina sphyraena CLUPEA HARENGUS PECTINIDAE Eutrigla gurnardus Lophius piscatorius GLYPTOCEPHALUS CYNOGLOSSUS EUTRIGLA GURNARDUS Sprattus sprattus Pleuronectes platessa Sebastes viviparus MERLUCCIUS MERLUCCIUS POLLACHIUS VIRENS RAJIDAE/BATOIDEA Ostrea Edulis Sebastes Mentella Crassostrea gigas GADUS MORHUA Littorina littorea Phycis blennoides Merlangius Merlangus Raja montagui Micromesistius poutassou ASCOPHYLLUM NODOSUM HIPPOGLOSSOIDES PLATESSOIDES NaN CYCLOPTERUS LUMPUS Hippoglossoides platessoides Modiolus modiolus Gadus sp. ETMOPTERUS SPINAX RAJA DIPTURUS BATIS Hyperoplus lanceolatus Sebastes norvegicus Pleuronectes platessa SEBASTES MARINUS Solea solea (S.vulgaris) Sebastes vivipares Pecten maximus Thunnus thynnus MERLANGUIS MERLANGUIS Microstomus kitt FUCUS SERRATUS

We attempt to match the OSPAR species column to the species column of the MARIS nomenclature using the Remapper. First, we initialize the Remapper:

remapper = Remapper(provider_lut_df=get_unique_across_dfs(dfs, col_name='species', as_df=True),
                    maris_lut_fn=species_lut_path,
                    maris_col_id='species_id',
                    maris_col_name='species',
                    provider_col_to_match='value',
                    provider_col_key='value',
                    fname_cache='species_ospar.pkl')

Next, we perform the matching and generate a lookup table that includes the match score, which quantifies the degree of match accuracy:

remapper.generate_lookup_table(as_df=True)
remapper.select_match(match_score_threshold=1, verbose=True)
Processing: 100%|██████████| 167/167 [00:23<00:00,  7.21it/s]
129 entries matched the criteria, while 38 entries had a match score of 1 or higher.
matched_maris_name source_name match_score
source_key
RHODYMENIA PSEUDOPALAMATA & PALMARIA PALMATA Lomentaria catenata RHODYMENIA PSEUDOPALAMATA & PALMARIA PALMATA 31
Mixture of green, red and brown algae Mercenaria mercenaria Mixture of green, red and brown algae 26
Solea solea (S.vulgaris) Loligo vulgaris Solea solea (S.vulgaris) 12
SOLEA SOLEA (S.VULGARIS) Loligo vulgaris SOLEA SOLEA (S.VULGARIS) 12
Cerastoderma (Cardium) Edule Cerastoderma edule Cerastoderma (Cardium) Edule 10
CERASTODERMA (CARDIUM) EDULE Cerastoderma edule CERASTODERMA (CARDIUM) EDULE 10
DICENTRARCHUS (MORONE) LABRAX Dicentrarchus labrax DICENTRARCHUS (MORONE) LABRAX 9
RAJIDAE/BATOIDEA Batoidea RAJIDAE/BATOIDEA 8
Pleuronectiformes [order] Pleuronectiformes Pleuronectiformes [order] 8
PALMARIA PALMATA Alaria marginata PALMARIA PALMATA 7
Gadiculus argenteus Pampus argenteus Gadiculus argenteus 6
MONODONTA LINEATA Monodonta labio MONODONTA LINEATA 6
RAJA DIPTURUS BATIS Dipturus batis RAJA DIPTURUS BATIS 5
Rhodymenia spp. Rhodymenia Rhodymenia spp. 5
FUCUS SPP. Fucus FUCUS SPP. 5
Unknown Undaria Unknown 5
unknown Undaria unknown 5
Flatfish Lambia Flatfish 5
Sepia spp. Sepia Sepia spp. 5
Patella sp. Patella Patella sp. 4
Gadus sp. Penaeus sp. Gadus sp. 4
Thunnus sp. Thunnus Thunnus sp. 4
FUCUS spp Fucus FUCUS spp 4
Fucus sp. Fucus Fucus sp. 4
Tapes sp. Tapes Tapes sp. 4
RHODYMENIA spp Rhodymenia RHODYMENIA spp 4
MERLANGUIS MERLANGUIS Merlangius merlangus MERLANGUIS MERLANGUIS 3
PLUERONECTES PLATESSA Pleuronectes platessa PLUERONECTES PLATESSA 2
Gaidropsarus argenteus Gaidropsarus argentatus Gaidropsarus argenteus 2
Trisopterus esmarki Trisopterus esmarkii Trisopterus esmarki 1
ASCOPHYLLUN NODOSUM Ascophyllum nodosum ASCOPHYLLUN NODOSUM 1
Clupea harengus Clupea harengus Clupea harengus 1
MERLUCCIUS MERLUCCIUS Merluccius merluccius MERLUCCIUS MERLUCCIUS 1
Hippoglossus hippoglossus Hippoglossus hippoglossus Hippoglossus hippoglossus 1
Melanogrammus aeglefinus Melanogrammus aeglefinus Melanogrammus aeglefinus 1
Pleuronectes platessa Pleuronectes platessa Pleuronectes platessa 1
Gadus morhua Gadus morhua Gadus morhua 1
Sebastes vivipares Sebastes viviparus Sebastes vivipares 1
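
The scores in this table behave like a case-insensitive edit distance: 'Trisopterus esmarki' is one insertion away from 'Trisopterus esmarkii', and 'FUCUS SPP.' is five deletions away from 'Fucus'. The sketch below illustrates that idea with a plain Levenshtein distance. It is an assumption about how the score can be read, not a description of the Remapper internals; levenshtein and best_match are hypothetical helper names, and the candidate list is a tiny invented subset of the MARIS nomenclature.

```python
def levenshtein(a: str, b: str) -> int:
    """Number of single-character insertions, deletions or substitutions turning a into b."""
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, start=1):
        curr = [i]
        for j, cb in enumerate(b, start=1):
            cost = 0 if ca == cb else 1
            curr.append(min(prev[j] + 1,          # deletion
                            curr[j - 1] + 1,      # insertion
                            prev[j - 1] + cost))  # substitution
        prev = curr
    return prev[-1]


def best_match(name: str, candidates: list) -> tuple:
    """Return (score, candidate) for the closest candidate, ignoring case."""
    return min((levenshtein(name.lower(), c.lower()), c) for c in candidates)


# Invented subset of MARIS species names, for illustration only.
maris_names = ['Fucus', 'Trisopterus esmarkii', 'Gadus morhua']

print(best_match('Trisopterus esmarki', maris_names))
print(best_match('FUCUS SPP.', maris_names))
```

Read this way, a score of 0 is an exact match, small scores usually indicate typos or trailing qualifiers such as 'sp.', and large scores point to entries that need a manual fix.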

Below, we fix the entries that are not properly matched by the Remapper:

Exported source
fixes_biota_species = {
    'RHODYMENIA PSEUDOPALAMATA & PALMARIA PALMATA': NA,  # Mix of species, no direct mapping
    'Mixture of green, red and brown algae': NA,  # Mix of species, no direct mapping
    'Solea solea (S.vulgaris)': 'Solea solea',
    'SOLEA SOLEA (S.VULGARIS)': 'Solea solea',
    'RAJIDAE/BATOIDEA': NA, #Mix of species, no direct mapping
    'PALMARIA PALMATA': NA,  # Not defined
    'Unknown': NA,
    'unknown': NA,
    'Flatfish': NA,
    'Gadus sp.': NA,  # Not defined
}

We can now review the remapping results, incorporating the adjustments from the fixes_biota_species dictionary:

remapper.generate_lookup_table(fixes=fixes_biota_species)
remapper.select_match(match_score_threshold=1, verbose=True)
Processing: 100%|██████████| 167/167 [00:24<00:00,  6.88it/s]
139 entries matched the criteria, while 28 entries had a match score of 1 or higher.
matched_maris_name source_name match_score
source_key
CERASTODERMA (CARDIUM) EDULE Cerastoderma edule CERASTODERMA (CARDIUM) EDULE 10
Cerastoderma (Cardium) Edule Cerastoderma edule Cerastoderma (Cardium) Edule 10
DICENTRARCHUS (MORONE) LABRAX Dicentrarchus labrax DICENTRARCHUS (MORONE) LABRAX 9
Pleuronectiformes [order] Pleuronectiformes Pleuronectiformes [order] 8
Gadiculus argenteus Pampus argenteus Gadiculus argenteus 6
MONODONTA LINEATA Monodonta labio MONODONTA LINEATA 6
FUCUS SPP. Fucus FUCUS SPP. 5
RAJA DIPTURUS BATIS Dipturus batis RAJA DIPTURUS BATIS 5
Sepia spp. Sepia Sepia spp. 5
Rhodymenia spp. Rhodymenia Rhodymenia spp. 5
Tapes sp. Tapes Tapes sp. 4
FUCUS spp Fucus FUCUS spp 4
RHODYMENIA spp Rhodymenia RHODYMENIA spp 4
Patella sp. Patella Patella sp. 4
Thunnus sp. Thunnus Thunnus sp. 4
Fucus sp. Fucus Fucus sp. 4
MERLANGUIS MERLANGUIS Merlangius merlangus MERLANGUIS MERLANGUIS 3
PLUERONECTES PLATESSA Pleuronectes platessa PLUERONECTES PLATESSA 2
Gaidropsarus argenteus Gaidropsarus argentatus Gaidropsarus argenteus 2
Melanogrammus aeglefinus Melanogrammus aeglefinus Melanogrammus aeglefinus 1
Trisopterus esmarki Trisopterus esmarkii Trisopterus esmarki 1
Hippoglossus hippoglossus Hippoglossus hippoglossus Hippoglossus hippoglossus 1
Clupea harengus Clupea harengus Clupea harengus 1
MERLUCCIUS MERLUCCIUS Merluccius merluccius MERLUCCIUS MERLUCCIUS 1
Gadus morhua Gadus morhua Gadus morhua 1
Pleuronectes platessa Pleuronectes platessa Pleuronectes platessa 1
Sebastes vivipares Sebastes viviparus Sebastes vivipares 1
ASCOPHYLLUN NODOSUM Ascophyllum nodosum ASCOPHYLLUN NODOSUM 1

On visual inspection, the remaining imperfectly matched entries appear acceptable. We can now define a lambda function that instantiates the Remapper and returns the corrected lookup table.

Exported source
lut_biota = lambda: Remapper(provider_lut_df=get_unique_across_dfs(dfs, col_name='species', as_df=True),
                             maris_lut_fn=species_lut_path,
                             maris_col_id='species_id',
                             maris_col_name='species',
                             provider_col_to_match='value',
                             provider_col_key='value',
                             fname_cache='species_ospar.pkl').generate_lookup_table(fixes=fixes_biota_species, 
                                                                                    as_df=False, overwrite=False)

Putting it all together, we now apply the RemapCB callback to our data. This process adds a SPECIES column to our BIOTA dataframe, which contains the standardized species IDs.

dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[
    RemoveAllNAValuesCB(nan_cols_to_check),
    RemapCB(fn_lut=lut_biota, col_remap='SPECIES', col_src='species', dest_grps='BIOTA')    
    ])

tfm()['BIOTA']['SPECIES'].unique()
array([ 377,  129,   96,    0,  192,   99,   50,  378,  270,  379,  380,
        381,  382,  383,  384,  385,  244,  386,  387,  388,  389,  390,
        391,  392,  393,  394,  395,  396,  274,  397,  398,  243,  399,
        400,  401,  402,  403,  404,  405,  406,  407,  191,  139,  408,
        410,  412,  413,  272,  414,  415,  416,  417,  418,  419,  420,
        421,  422,  423,  424,  425,  426,  427,  428,  411,  429,  430,
        431,  432,  433,  434,  435,  436,  437,  438,  439,  440,  441,
        442,  443,  444,  294, 1684, 1610, 1609, 1605, 1608,   23, 1606,
        234,  556, 1701, 1752,  158,  223])

Enhance species data using Biological group

The Biological group column in the OSPAR dataset provides valuable insights related to species. We will leverage this information to enrich the SPECIES column. To achieve this, we will employ the generic RemapCB callback to create an enhanced_species column. Subsequently, this enhanced_species column will be used to further enrich the SPECIES column.

First we inspect the unique values in the biological group column.

get_unique_across_dfs(dfs, col_name='biological group', as_df=True)
index value
0 0 Seaweed
1 1 FISH
2 2 Molluscs
3 3 MOLLUSCS
4 4 seaweed
5 5 SEAWEED
6 6 Seaweeds
7 7 Fish
8 8 fish
9 9 molluscs

We will remap the biological group column’s data to the species column of the MARIS nomenclature, again using a Remapper object:

remapper = Remapper(provider_lut_df=get_unique_across_dfs(dfs, col_name='biological group', as_df=True),
                    maris_lut_fn=species_lut_path,
                    maris_col_id='species_id',
                    maris_col_name='species',
                    provider_col_to_match='value',
                    provider_col_key='value',
                    fname_cache='enhance_species_ospar.pkl')

As before, we generate the lookup table and inspect the matches:

remapper.generate_lookup_table(as_df=True)
remapper.select_match(match_score_threshold=1)
Processing: 100%|██████████| 10/10 [00:02<00:00,  4.80it/s]
matched_maris_name source_name match_score
source_key
FISH Fucus FISH 4
Fish Fucus Fish 4
fish Fucus fish 4
Molluscs Mollusca Molluscs 1
MOLLUSCS Mollusca MOLLUSCS 1
Seaweeds Seaweed Seaweeds 1
molluscs Mollusca molluscs 1

We can see that some entries require manual fixes.

Exported source
fixes_enhanced_biota_species = {
    'fish': 'Pisces',
    'FISH': 'Pisces',
    'Fish': 'Pisces'    
}

Now we will apply the manual fixes to the lookup table and review.

remapper.generate_lookup_table(fixes=fixes_enhanced_biota_species)
remapper.select_match(match_score_threshold=1)
Processing: 100%|██████████| 10/10 [00:01<00:00,  7.11it/s]
matched_maris_name source_name match_score
source_key
Molluscs Mollusca Molluscs 1
MOLLUSCS Mollusca MOLLUSCS 1
Seaweeds Seaweed Seaweeds 1
molluscs Mollusca molluscs 1

On visual inspection, the remaining imperfectly matched entries appear acceptable. We can now define a lambda function that instantiates the Remapper and returns the corrected lookup table.

Exported source
lut_biota_enhanced = lambda: Remapper(provider_lut_df=get_unique_across_dfs(dfs, col_name='biological group', as_df=True),
                             maris_lut_fn=species_lut_path,
                             maris_col_id='species_id',
                             maris_col_name='species',
                             provider_col_to_match='value',
                             provider_col_key='value',
                             fname_cache='enhance_species_ospar.pkl').generate_lookup_table(
                                 fixes=fixes_enhanced_biota_species, 
                                 as_df=False, 
                                 overwrite=False)

Now we can apply RemapCB which results in the addition of an enhanced_species column in our BIOTA DataFrame.

dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[
    RemoveAllNAValuesCB(nan_cols_to_check),
    RemapCB(fn_lut=lut_biota_enhanced, col_remap='enhanced_species', col_src='biological group', dest_grps='BIOTA')    
    ])

tfm()['BIOTA']['enhanced_species'].unique()
array([ 873, 1059,  712])

With the enhanced_species column, we can enrich the SPECIES column. Where SPECIES has no match (an ID of -1 or 0) and enhanced_species is not null, we use the enhanced_species value instead.


source

EnhanceSpeciesCB

 EnhanceSpeciesCB ()

Enhance the ‘SPECIES’ column using the ‘enhanced_species’ column if conditions are met.

Exported source
class EnhanceSpeciesCB(Callback):
    """Enhance the 'SPECIES' column using the 'enhanced_species' column if conditions are met."""

    def __init__(self):
        fc.store_attr()

    def __call__(self, tfm: 'Transformer'):
        self._enhance_species(tfm.dfs['BIOTA'])

    def _enhance_species(self, df: pd.DataFrame):
        df['SPECIES'] = df.apply(
            lambda row: row['enhanced_species'] if row['SPECIES'] in [-1, 0] and pd.notnull(row['enhanced_species']) else row['SPECIES'],
            axis=1
        )
dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[
    RemoveAllNAValuesCB(nan_cols_to_check),
    RemapCB(fn_lut=lut_biota, col_remap='SPECIES', col_src='species', dest_grps='BIOTA'),
    RemapCB(fn_lut=lut_biota_enhanced, col_remap='enhanced_species', col_src='biological group', dest_grps='BIOTA'),
    EnhanceSpeciesCB()
    ])

tfm()['BIOTA']['SPECIES'].unique()
array([ 377,  129,   96,  712,  192,   99,   50,  378,  270,  379,  380,
        381,  382,  383,  384,  385,  244,  386,  387,  388,  389,  390,
        391,  392,  393,  394,  395,  396,  274,  397,  398,  243,  399,
        400,  401,  402,  403,  404,  405,  406,  407, 1059,  191,  139,
        408,  410,  412,  413,  272,  414,  415,  416,  417,  418,  419,
        420,  421,  422,  423,  424,  425,  426,  427,  428,  411,  429,
        430,  431,  432,  433,  434,  435,  436,  437,  438,  439,  440,
        441,  442,  443,  444,  294, 1684, 1610, 1609, 1605, 1608,   23,
       1606,  234,  556, 1701, 1752,  158,  223])

All entries are matched for the SPECIES column.
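
The fallback rule used by EnhanceSpeciesCB can also be written without a row-wise apply. The sketch below is a vectorized equivalent on invented rows, offered as an illustration rather than a replacement for the exported callback; the species IDs are taken from the outputs above.

```python
import numpy as np
import pandas as pd

# Invented rows: a matched species, two unmatched ones with a usable
# biological group, and one unmatched row with no biological group.
df = pd.DataFrame({
    'SPECIES': [377, 0, -1, 0],
    'enhanced_species': [873, 712, 1059, np.nan],
})

# Rows where SPECIES is unmatched (-1 or 0) and a fallback ID is available.
use_fallback = df['SPECIES'].isin([-1, 0]) & df['enhanced_species'].notna()

# Overwrite only those rows; cast back to int because NaN makes the column float.
df.loc[use_fallback, 'SPECIES'] = df.loc[use_fallback, 'enhanced_species'].astype(int)
print(df['SPECIES'].tolist())
```

A species that was already matched keeps its own ID, and a row with neither a species match nor a biological group stays at 0.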

Remap Biota tissues

The OSPAR dataset includes entries where the Body Part is labeled as whole. However, the MARIS data standard requires a more specific distinction for the body_part field, differentiating between Whole animal and Whole plant. Fortunately, the OSPAR dataset provides a Biological group field that allows us to make this distinction.

To address this discrepancy and ensure compatibility with MARIS standards, we will:

  1. Create a temporary column body_part_temp that combines information from both Body Part and Biological group.
  2. Use this temporary column to perform the lookup using our Remapper object.

Let’s create the temporary column, body_part_temp, that combines Body Part and Biological group.


source

AddBodypartTempCB

 AddBodypartTempCB ()

Add a temporary column with the body part and biological group combined.

Exported source
class AddBodypartTempCB(Callback):
    "Add a temporary column with the body part and biological group combined."    
    def __call__(self, tfm):
        tfm.dfs['BIOTA']['body_part_temp'] = (
            tfm.dfs['BIOTA']['body part'] + ' ' + 
            tfm.dfs['BIOTA']['biological group']
            ).str.strip().str.lower()
dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[  
                            AddBodypartTempCB(),
                            ])
dfs_test = tfm()
dfs_test['BIOTA']['body_part_temp'].unique()
array(['whole animal molluscs', 'whole plant seaweed', 'whole fish fish',
       'flesh without bones fish', 'whole animal fish', 'muscle fish',
       'head fish', 'soft parts molluscs', 'growing tips seaweed',
       'soft parts fish', 'unknown fish', 'flesh without bone fish',
       'flesh fish', 'flesh with scales fish', 'liver fish',
       'flesh without bones seaweed', 'whole  fish',
       'flesh without bones molluscs', 'whole  seaweed',
       'whole plant seaweeds', 'whole fish', 'whole without head fish',
       'mix of muscle and whole fish without liver fish',
       'whole fisk fish', 'muscle  fish', 'cod medallion fish',
       'tail and claws fish'], dtype=object)
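
Several of these values contain a double space (for example 'whole  fish'). One plausible cause, assumed here rather than verified against the raw data, is trailing whitespace in the body part column, since str.strip() only trims the ends of the combined string. The sketch below reproduces the effect on invented rows and shows an optional whitespace normalization that is not part of the handler; body_part_norm is a hypothetical column name.

```python
import pandas as pd

# Invented rows; the second body part carries a trailing space.
df = pd.DataFrame({
    'body part': ['WHOLE', 'Whole ', 'Muscle'],
    'biological group': ['Seaweed', 'Fish', 'FISH'],
})

# Same combination as in AddBodypartTempCB.
df['body_part_temp'] = (
    df['body part'] + ' ' + df['biological group']
).str.strip().str.lower()

# Optional extra step (not in the handler): collapse runs of whitespace.
df['body_part_norm'] = df['body_part_temp'].str.replace(r'\s+', ' ', regex=True)
print(df[['body_part_temp', 'body_part_norm']])
```

Collapsing internal whitespace would reduce the number of distinct keys to remap, at the cost of no longer mirroring the provider's raw strings exactly.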

To align the body_part_temp column with the bodypar column in the MARIS nomenclature, we will use the Remapper. However, since the OSPAR dataset lacks a predefined lookup table for the body_part column, we must first create one. This is accomplished by extracting unique values from the body_part_temp column.

get_unique_across_dfs(dfs_test, col_name='body_part_temp', as_df=True).head()
index value
0 0 whole fish
1 1 whole without head fish
2 2 flesh with scales fish
3 3 mix of muscle and whole fish without liver fish
4 4 whole animal fish

We can now remap the body_part_temp column to the bodypar column in the MARIS nomenclature using the Remapper. Subsequently, we will inspect the results:

remapper = Remapper(provider_lut_df=get_unique_across_dfs(dfs_test, col_name='body_part_temp', as_df=True),
                    maris_lut_fn=bodyparts_lut_path,
                    maris_col_id='bodypar_id',
                    maris_col_name='bodypar',
                    provider_col_to_match='value',
                    provider_col_key='value',
                    fname_cache='tissues_ospar.pkl'
                    )

remapper.generate_lookup_table(as_df=True)
remapper.select_match(match_score_threshold=0, verbose=True)
Processing: 100%|██████████| 27/27 [00:00<00:00, 128.32it/s]
0 entries matched the criteria, while 27 entries had a match score of 0 or higher.
matched_maris_name source_name match_score
source_key
mix of muscle and whole fish without liver fish Flesh without bones mix of muscle and whole fish without liver fish 31
tail and claws fish Stomach and intestine tail and claws fish 13
cod medallion fish Old leaf cod medallion fish 13
whole without head fish Flesh without bones whole without head fish 13
whole fish fish Whole animal whole fish fish 9
whole fisk fish Whole animal whole fisk fish 9
whole animal molluscs Whole animal whole animal molluscs 9
flesh without bones molluscs Flesh without bones flesh without bones molluscs 9
whole plant seaweeds Whole plant whole plant seaweeds 9
soft parts molluscs Soft parts soft parts molluscs 9
unknown fish Growing tips unknown fish 9
flesh without bones seaweed Flesh without bones flesh without bones seaweed 8
whole plant seaweed Whole plant whole plant seaweed 8
growing tips seaweed Growing tips growing tips seaweed 8
flesh fish Shells flesh fish 7
whole seaweed Whole plant whole seaweed 7
muscle fish Muscle muscle fish 6
soft parts fish Soft parts soft parts fish 5
whole fish Whole animal whole fish 5
head fish Head head fish 5
liver fish Liver liver fish 5
whole fish Whole animal whole fish 5
muscle fish Muscle muscle fish 5
whole animal fish Whole animal whole animal fish 5
flesh with scales fish Flesh with scales flesh with scales fish 5
flesh without bones fish Flesh without bones flesh without bones fish 5
flesh without bone fish Flesh without bones flesh without bone fish 4

Many of the lookup entries are sufficient for our needs. However, for values that don’t find a match, we can use the fixes_biota_tissues dictionary to apply manual corrections. First we will create the dictionary.

Exported source
fixes_biota_tissues = {
    'whole seaweed' : 'Whole plant',
    'flesh fish' : 'Flesh with bones', # Assumed to be 'Flesh with bones', since that category also exists
    'unknown fish' : NA,
    'cod medallion fish' : NA, # TO BE DETERMINED
    'mix of muscle and whole fish without liver fish' : NA, # TO BE DETERMINED
    'whole without head fish' : NA, # TO BE DETERMINED
    'flesh without bones seaweed' : NA, # TO BE DETERMINED
    'tail and claws fish' : NA # TO BE DETERMINED
}

Now we will generate the lookup table and apply the manual fixes defined in the fixes_biota_tissues dictionary.

remapper.generate_lookup_table(fixes=fixes_biota_tissues)
remapper.select_match(match_score_threshold=1, verbose=True)
Processing: 100%|██████████| 27/27 [00:00<00:00, 143.57it/s]
1 entries matched the criteria, while 26 entries had a match score of 1 or higher.
matched_maris_name source_name match_score
source_key
whole fisk fish Whole animal whole fisk fish 9
soft parts molluscs Soft parts soft parts molluscs 9
whole fish fish Whole animal whole fish fish 9
flesh without bones molluscs Flesh without bones flesh without bones molluscs 9
whole animal molluscs Whole animal whole animal molluscs 9
whole plant seaweeds Whole plant whole plant seaweeds 9
growing tips seaweed Growing tips growing tips seaweed 8
whole plant seaweed Whole plant whole plant seaweed 8
whole seaweed Whole plant whole seaweed 7
muscle fish Muscle muscle fish 6
muscle fish Muscle muscle fish 5
liver fish Liver liver fish 5
flesh with scales fish Flesh with scales flesh with scales fish 5
soft parts fish Soft parts soft parts fish 5
whole fish Whole animal whole fish 5
head fish Head head fish 5
flesh without bones fish Flesh without bones flesh without bones fish 5
whole animal fish Whole animal whole animal fish 5
whole fish Whole animal whole fish 5
flesh without bone fish Flesh without bones flesh without bone fish 4
cod medallion fish (Not available) cod medallion fish 2
flesh without bones seaweed (Not available) flesh without bones seaweed 2
unknown fish (Not available) unknown fish 2
whole without head fish (Not available) whole without head fish 2
mix of muscle and whole fish without liver fish (Not available) mix of muscle and whole fish without liver fish 2
tail and claws fish (Not available) tail and claws fish 2
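The match_score values above are consistent with a case-insensitive edit (Levenshtein) distance between the provider string and the MARIS name, where a lower score means a closer match. The sketch below illustrates the idea in plain Python under that assumption. The helpers levenshtein and resolve, and the short list of MARIS names, are hypothetical and are not part of the Marisco API; the only behaviour borrowed from the text is that a manual fix takes precedence over the nearest match.

```python
def levenshtein(a: str, b: str) -> int:
    """Edit distance between a and b (insertions, deletions, substitutions)."""
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, start=1):
        curr = [i]
        for j, cb in enumerate(b, start=1):
            cost = 0 if ca == cb else 1
            curr.append(min(prev[j] + 1,          # delete ca
                            curr[j - 1] + 1,      # insert cb
                            prev[j - 1] + cost))  # substitute or match
        prev = curr
    return prev[-1]


def resolve(source_name: str, maris_names: list, fixes: dict) -> tuple:
    """Return (matched_name, score); a manual fix wins over the nearest match."""
    if source_name in fixes:
        return fixes[source_name], None
    best = min(maris_names, key=lambda name: levenshtein(source_name, name.lower()))
    return best, levenshtein(source_name, best.lower())


# Hypothetical subset of MARIS body part names, for illustration only.
maris_names = ['Flesh without bones', 'Whole plant', 'Muscle']
fixes = {'whole seaweed': 'Whole plant'}

print(resolve('flesh without bone fish', maris_names, fixes))
print(resolve('whole seaweed', maris_names, fixes))
```

With these inputs, 'flesh without bone fish' resolves to 'Flesh without bones' with a distance of 4, the same score reported for that entry in the table above.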

At this stage, the majority of entries have been successfully matched to the MARIS nomenclature. Entries that remain unmatched are appropriately marked as ‘not available’. We are now ready to proceed with the final remapping process. We will define a lambda function to instantiate the Remapper, which will then generate and return the corrected lookup table.

Exported source
lut_bodyparts = lambda: Remapper(provider_lut_df=get_unique_across_dfs(tfm.dfs, col_name='body_part_temp', as_df=True),
                               maris_lut_fn=bodyparts_lut_path,
                               maris_col_id='bodypar_id',
                               maris_col_name='bodypar',
                               provider_col_to_match='value',
                               provider_col_key='value',
                               fname_cache='tissues_ospar.pkl'
                               ).generate_lookup_table(fixes=fixes_biota_tissues, as_df=False, overwrite=False)

Putting it all together, we now apply the RemapCB callback. This process results in the addition of a BODY_PART column to our BIOTA DataFrame.

dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[  
                            RemoveAllNAValuesCB(nan_cols_to_check),
                            AddBodypartTempCB(),
                            RemapCB(fn_lut=lut_bodyparts, col_remap='BODY_PART', col_src='body_part_temp' , dest_grps='BIOTA')
                            ])
tfm()
tfm.dfs['BIOTA']['BODY_PART'].unique()
array([ 1, 40, 52, 34, 13, 19, 56,  0,  4, 60, 25])

Remap biogroup

The MARIS species lookup table contains a biogroup_id column that associates each species with its corresponding biogroup. We will leverage this relationship to create a BIO_GROUP column in the BIOTA DataFrame.
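As a rough illustration of this lookup, the sketch below derives a biogroup from a species id with a plain dictionary. The pair 377 → 14 appears in the BIOTA output of this section; the second entry, the fallback value -1 and the function name add_bio_group are hypothetical and only serve the example.

```python
# species_id -> biogroup_id; 377 -> 14 appears in the BIOTA output,
# the second entry is a made-up placeholder for illustration.
species_to_biogroup = {377: 14, 9999: 4}


def add_bio_group(rows: list, lut: dict, missing: int = -1) -> list:
    """Return copies of rows with a BIO_GROUP key derived from SPECIES."""
    # `missing` is an assumed fallback for species ids absent from the lookup.
    return [{**row, 'BIO_GROUP': lut.get(row['SPECIES'], missing)} for row in rows]


rows = [{'SPECIES': 377}, {'SPECIES': 9999}, {'SPECIES': 0}]
for row in add_bio_group(rows, species_to_biogroup):
    print(row)
```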

Exported source
lut_biogroup_from_biota = lambda: get_lut(src_dir=species_lut_path().parent, fname=species_lut_path().name, 
                               key='species_id', value='biogroup_id')
dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[ 
    RemoveAllNAValuesCB(nan_cols_to_check),                            
    RemapCB(fn_lut=lut_biota, col_remap='SPECIES', col_src='species', dest_grps='BIOTA'),
    RemapCB(fn_lut=lut_biota_enhanced, col_remap='enhanced_species', col_src='biological group', dest_grps='BIOTA'),
    EnhanceSpeciesCB(),
    RemapCB(fn_lut=lut_biogroup_from_biota, col_remap='BIO_GROUP', col_src='SPECIES', dest_grps='BIOTA')
    ])

tfm()
print(tfm.dfs['BIOTA']['BIO_GROUP'].unique())
[14 11  4 13 12  2  5]
tfm.dfs['BIOTA'].head()
id contracting party rsc sub-division station id sample id latd latm lats latdir longd ... activity or mda uncertainty unit data provider measurement comment sample comment reference comment SPECIES enhanced_species BIO_GROUP
0 1 Belgium 8 Kloosterzande-Schelde DA 17531 51 23.0 36.0 N 4 ... 0.326416 NaN Bq/kg f.w. SCK•CEN NaN NaN NaN 377 873 14
1 2 Belgium 8 Kloosterzande-Schelde DA 17534 51 23.0 36.0 N 4 ... 0.442704 NaN Bq/kg f.w. SCK•CEN NaN NaN NaN 377 873 14
2 3 Belgium 8 Kloosterzande-Schelde DA 17537 51 23.0 36.0 N 4 ... 0.412989 NaN Bq/kg f.w. SCK•CEN NaN NaN NaN 377 873 14
3 4 Belgium 8 Kloosterzande-Schelde DA 17540 51 23.0 36.0 N 4 ... 0.202768 NaN Bq/kg f.w. SCK•CEN NaN NaN NaN 377 873 14
4 5 Belgium 8 Kloosterzande-Schelde DA 17531 51 23.0 36.0 N 4 ... 0.652833 NaN Bq/kg f.w. SCK•CEN NaN NaN NaN 377 873 14

5 rows × 30 columns

Add Sample ID

  • SMP_ID is an internal unique identifier for each sample.
  • SMP_ID_PROVIDER is the original sample identifier supplied by the data provider.
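The distinction matters because provider identifiers are not guaranteed to be unique: in the BIOTA table of this section, DA 17531 appears on more than one row. A minimal sketch of the idea in plain Python follows; the helper name add_sample_ids is hypothetical.

```python
def add_sample_ids(provider_ids: list) -> list:
    """Pair each row with a 1-based running SMP_ID and keep the provider id as text."""
    return [
        {'SMP_ID': i, 'SMP_ID_PROVIDER': str(pid)}
        for i, pid in enumerate(provider_ids, start=1)
    ]


# First five BIOTA sample ids as shown in this section.
rows = add_sample_ids(['DA 17531', 'DA 17534', 'DA 17537', 'DA 17540', 'DA 17531'])
smp_ids = [r['SMP_ID'] for r in rows]
provider_ids = [r['SMP_ID_PROVIDER'] for r in rows]

print(len(set(smp_ids)) == len(rows))       # SMP_ID is unique per row
print(len(set(provider_ids)) == len(rows))  # provider ids repeat
```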

source

AddSampleIdCB

 AddSampleIdCB ()

Create incremental SMP_ID and store original sample id in SMP_ID_PROVIDER

Exported source
class AddSampleIdCB(Callback):
    "Create incremental SMP_ID and store original sample id in SMP_ID_PROVIDER"
    def __call__(self, tfm):
        for _, df in tfm.dfs.items():
            df['SMP_ID'] = range(1, len(df) + 1)
            df['SMP_ID_PROVIDER'] = df['sample id'].astype(str)
dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[RemoveAllNAValuesCB(nan_cols_to_check),
                            AddSampleIdCB(),
                            CompareDfsAndTfmCB(dfs)
                            ])
tfm()

tfm.dfs['BIOTA'].head()
id contracting party rsc sub-division station id sample id latd latm lats latdir longd ... value type activity or mda uncertainty unit data provider measurement comment sample comment reference comment SMP_ID SMP_ID_PROVIDER
0 1 Belgium 8 Kloosterzande-Schelde DA 17531 51 23.0 36.0 N 4 ... < 0.326416 NaN Bq/kg f.w. SCK•CEN NaN NaN NaN 1 DA 17531
1 2 Belgium 8 Kloosterzande-Schelde DA 17534 51 23.0 36.0 N 4 ... < 0.442704 NaN Bq/kg f.w. SCK•CEN NaN NaN NaN 2 DA 17534
2 3 Belgium 8 Kloosterzande-Schelde DA 17537 51 23.0 36.0 N 4 ... < 0.412989 NaN Bq/kg f.w. SCK•CEN NaN NaN NaN 3 DA 17537
3 4 Belgium 8 Kloosterzande-Schelde DA 17540 51 23.0 36.0 N 4 ... < 0.202768 NaN Bq/kg f.w. SCK•CEN NaN NaN NaN 4 DA 17540
4 5 Belgium 8 Kloosterzande-Schelde DA 17531 51 23.0 36.0 N 4 ... < 0.652833 NaN Bq/kg f.w. SCK•CEN NaN NaN NaN 5 DA 17531

5 rows × 29 columns

Add depth

The OSPAR dataset provides a sampling depth column for the SEAWATER group only. In this section, we develop a callback that stores it as SMP_DEPTH in the MARIS dataset.


source

AddDepthCB

 AddDepthCB ()

Ensure depth values are floats and add ‘SMP_DEPTH’ columns.

Exported source
class AddDepthCB(Callback):
    "Ensure depth values are floats and add 'SMP_DEPTH' columns."
    def __call__(self, tfm: Transformer):
        for grp, df in tfm.dfs.items():
            if grp == 'SEAWATER':
                if 'sampling depth' in df.columns:
                    df['SMP_DEPTH'] = df['sampling depth'].astype(float)
dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[
    AddDepthCB()
    ])
tfm()
for grp in tfm.dfs.keys():  
    if 'SMP_DEPTH' in tfm.dfs[grp].columns:
        print(f'{grp}:', tfm.dfs[grp][['SMP_DEPTH']].drop_duplicates())
SEAWATER:        SMP_DEPTH
0            3.0
80           2.0
81          21.0
85          31.0
87          32.0
...          ...
16022       71.0
16023       66.0
16025       81.0
16385     1660.0
16389     1500.0

[134 rows x 1 columns]

Standardize Coordinates

The OSPAR dataset offers coordinates in degrees, minutes, and seconds (DMS). The following callback is designed to convert DMS to decimal degrees.
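As a worked example of the conversion, decimal degrees are degrees + minutes / 60 + seconds / 3600, negated for the southern and western hemispheres. The scalar sketch below uses the first SEAWATER row of this section (51°22'31" N, 3°11'17" E); the function name and signature are illustrative only and differ from the vectorized callback.

```python
def dms_to_decimal(degrees: float, minutes: float, seconds: float, direction: str) -> float:
    """Convert degrees/minutes/seconds plus a hemisphere letter to signed decimal degrees."""
    value = degrees + minutes / 60 + seconds / 3600
    # South latitudes and west longitudes are negative by convention.
    return -value if direction in ('S', 'W') else value


# First SEAWATER row: 51°22'31" N, 3°11'17" E
lat = dms_to_decimal(51, 22.0, 31.0, 'N')
lon = dms_to_decimal(3, 11.0, 17.0, 'E')
print(round(lat, 6), round(lon, 6))
```

Rounded to six decimals this gives 51.375278 and 3.188056, the LAT and LON values listed for that row.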


source

ConvertLonLatCB

 ConvertLonLatCB ()

Convert Coordinates to decimal degrees (DDD.DDDDD°).

Exported source
class ConvertLonLatCB(Callback):
    """Convert Coordinates to decimal degrees (DDD.DDDDD°)."""
    def __init__(self):
        fc.store_attr()

    def __call__(self, tfm: 'Transformer'):
        for grp, df in tfm.dfs.items():
            df['LAT'] = self._convert_latitude(df)
            df['LON'] = self._convert_longitude(df)

    def _convert_latitude(self, df: pd.DataFrame) -> np.ndarray:
        return np.where(
            df['latdir'].isin(['S']),
            self._dms_to_decimal(df['latd'], df['latm'], df['lats']) * -1,
            self._dms_to_decimal(df['latd'], df['latm'], df['lats'])
        )

    def _convert_longitude(self, df: pd.DataFrame) -> np.ndarray:
        return np.where(
            df['longdir'].isin(['W']),
            self._dms_to_decimal(df['longd'], df['longm'], df['longs']) * -1,
            self._dms_to_decimal(df['longd'], df['longm'], df['longs'])
        )

    def _dms_to_decimal(self, degrees: pd.Series, minutes: pd.Series, seconds: pd.Series) -> pd.Series:
        return degrees + minutes / 60 + seconds / 3600
dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[
    RemoveAllNAValuesCB(nan_cols_to_check),
    ConvertLonLatCB()
    ])

tfm()

with pd.option_context('display.max_columns', None):
    display(tfm.dfs['SEAWATER'][['LAT','latd', 'latm', 'lats', 'LON', 'latdir', 'longd', 'longm','longs', 'longdir']])
LAT latd latm lats LON latdir longd longm longs longdir
0 51.375278 51 22.0 31.0 3.188056 N 3 11.0 17.0 E
1 51.223611 51 13.0 25.0 2.859444 N 2 51.0 34.0 E
2 51.184444 51 11.0 4.0 2.713611 N 2 42.0 49.0 E
3 51.420278 51 25.0 13.0 3.262222 N 3 15.0 44.0 E
4 51.416111 51 24.0 58.0 2.809722 N 2 48.0 35.0 E
... ... ... ... ... ... ... ... ... ... ...
19183 52.831944 52 49.0 55.0 4.615278 N 4 36.0 55.0 E
19184 51.411944 51 24.0 43.0 3.565556 N 3 33.0 56.0 E
19185 51.411944 51 24.0 43.0 3.565556 N 3 33.0 56.0 E
19186 51.411944 51 24.0 43.0 3.565556 N 3 33.0 56.0 E
19187 51.719444 51 43.0 10.0 3.493889 N 3 29.0 38.0 E

19183 rows × 10 columns

SanitizeLonLatCB drops a row when both longitude and latitude equal 0, or when the row contains unrealistic longitude or latitude values. It also converts the , decimal separator to the . separator.
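The rule can be sketched in plain Python as follows. This is not the SanitizeLonLatCB implementation: the helper names are hypothetical, and "unrealistic" is assumed here to mean outside [-180, 180] for longitude and [-90, 90] for latitude.

```python
def to_float(value) -> float:
    """Parse a coordinate, accepting ',' as decimal separator (e.g. '3,188056')."""
    return float(str(value).replace(',', '.'))


def is_valid_lonlat(lon: float, lat: float) -> bool:
    """Reject the (0, 0) placeholder and out-of-range coordinates."""
    if lon == 0 and lat == 0:
        return False
    # Assumed valid ranges: longitude in [-180, 180], latitude in [-90, 90].
    return -180 <= lon <= 180 and -90 <= lat <= 90


# Hypothetical (lon, lat) rows; only the first one passes the checks.
rows = [('3,188056', '51,375278'), (0, 0), (190.0, 45.0), (10.0, -95.0)]
clean = [(to_float(lon), to_float(lat)) for lon, lat in rows]
kept = [pair for pair in clean if is_valid_lonlat(*pair)]
print(kept)
```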

dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[
                            ConvertLonLatCB(),
                            SanitizeLonLatCB(),
                            CompareDfsAndTfmCB(dfs)
                            ])

tfm()

display(Markdown("<b> Row Count Comparison Before and After Transformation:</b>"))
with pd.option_context('display.max_rows', None):
    display(pd.DataFrame.from_dict(tfm.compare_stats))

with pd.option_context('display.max_columns', None):
    display(tfm.dfs['SEAWATER'][['LAT','LON']])

Row Count Comparison Before and After Transformation:

BIOTA SEAWATER
Original row count (dfs) 15951 19193
Transformed row count (tfm.dfs) 15951 19193
Rows removed from original (tfm.dfs_removed) 0 0
Rows created in transformed (tfm.dfs_created) 0 0
LAT LON
0 51.375278 3.188056
1 51.223611 2.859444
2 51.184444 2.713611
3 51.420278 3.262222
4 51.416111 2.809722
... ... ...
19188 53.600000 -5.933333
19189 53.733333 -5.416667
19190 53.650000 -5.233333
19191 53.883333 -5.550000
19192 53.866667 -5.883333

19193 rows × 2 columns

Add Station


source

AddStationCB

 AddStationCB ()

Ensure station values are strings and add ‘STATION’ columns.

Exported source
class AddStationCB(Callback):
    "Ensure station values are strings and add 'STATION' columns."
    def __call__(self, tfm: Transformer):
        for grp, df in tfm.dfs.items():
            df['STATION'] = df['station id'].astype(str)

Review all callbacks

dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[
                            RemoveAllNAValuesCB(nan_cols_to_check),
                            LowerStripNameCB(col_src='nuclide', col_dst='nuclide'),
                            RemapNuclideNameCB(lut_nuclides, col_name='nuclide'),
                            ParseTimeCB(),
                            EncodeTimeCB(),
                            SanitizeValueCB(),
                            NormalizeUncCB(),
                            RemapUnitCB(renaming_unit_rules),
                            RemapDetectionLimitCB(coi_dl, lut_dl),
                            RemapCB(fn_lut=lut_biota, col_remap='SPECIES', col_src='species', dest_grps='BIOTA'),    
                            RemapCB(fn_lut=lut_biota_enhanced, col_remap='enhanced_species', col_src='biological group', dest_grps='BIOTA'),    
                            EnhanceSpeciesCB(),
                            AddBodypartTempCB(),
                            RemapCB(fn_lut=lut_bodyparts, col_remap='BODY_PART', col_src='body_part_temp' , dest_grps='BIOTA'),
                            AddSampleIdCB(),
                            AddDepthCB(),    
                            ConvertLonLatCB(),
                            SanitizeLonLatCB(),
                            AddStationCB(),
                            CompareDfsAndTfmCB(dfs)
                            ])

tfm()
print(pd.DataFrame.from_dict(tfm.compare_stats) , '\n')
Processing: 100%|██████████| 12/12 [00:00<00:00, 55.39it/s]
                                               BIOTA  SEAWATER
Original row count (dfs)                       15951     19193
Transformed row count (tfm.dfs)                15951     19183
Rows removed from original (tfm.dfs_removed)       0        10
Rows created in transformed (tfm.dfs_created)      0         0 

tfm.dfs['SEAWATER'].head()
id contracting party rsc sub-division station id sample id latd latm lats latdir longd ... VALUE UNC UNIT DL SMP_ID SMP_ID_PROVIDER SMP_DEPTH LAT LON STATION
0 1 Belgium 8.0 Belgica-W01 WNZ 01 51 22.0 31.0 N 3 ... 0.20 NaN 1 2 1 WNZ 01 3.0 51.375278 3.188056 Belgica-W01
1 2 Belgium 8.0 Belgica-W02 WNZ 02 51 13.0 25.0 N 2 ... 0.27 NaN 1 2 2 WNZ 02 3.0 51.223611 2.859444 Belgica-W02
2 3 Belgium 8.0 Belgica-W03 WNZ 03 51 11.0 4.0 N 2 ... 0.26 NaN 1 2 3 WNZ 03 3.0 51.184444 2.713611 Belgica-W03
3 4 Belgium 8.0 Belgica-W04 WNZ 04 51 25.0 13.0 N 3 ... 0.25 NaN 1 2 4 WNZ 04 3.0 51.420278 3.262222 Belgica-W04
4 5 Belgium 8.0 Belgica-W05 WNZ 05 51 24.0 58.0 N 2 ... 0.20 NaN 1 2 5 WNZ 05 3.0 51.416111 2.809722 Belgica-W05

5 rows × 37 columns

Example change logs

Review the change logs for the NetCDF encoding.
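The change log holds one description per callback, in execution order. As a rough sketch of how such a log can be built, the example below assumes that each callback's docstring serves as its log entry. MiniTransformer and the two callbacks are hypothetical stand-ins written for this illustration, not Marisco classes.

```python
class Callback:
    """Base class: subclasses describe themselves in their docstring."""
    def __call__(self, tfm):
        pass


class StripNamesCB(Callback):
    "Strip surrounding spaces from every name."
    def __call__(self, tfm):
        tfm.data = [name.strip() for name in tfm.data]


class LowerNamesCB(Callback):
    "Convert every name to lowercase."
    def __call__(self, tfm):
        tfm.data = [name.lower() for name in tfm.data]


class MiniTransformer:
    """Run callbacks in order and record one log line per callback."""
    def __init__(self, data, cbs):
        self.data, self.cbs, self.logs = list(data), cbs, []

    def __call__(self):
        for cb in self.cbs:
            cb(self)
            self.logs.append(type(cb).__doc__)  # the docstring doubles as the log entry
        return self.data


tfm_demo = MiniTransformer([' Cs-137 ', 'PU-239'], cbs=[StripNamesCB(), LowerNamesCB()])
result = tfm_demo()
print(result)
print(tfm_demo.logs)
```

Keeping the description next to the code that performs the step means the log cannot drift from the pipeline definition when callbacks are added, removed or reordered.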

dfs = load_data(src_dir, use_cache=True)
tfm = Transformer(dfs, cbs=[
                            RemoveAllNAValuesCB(nan_cols_to_check), 
                            LowerStripNameCB(col_src='nuclide', col_dst='nuclide'),
                            RemapNuclideNameCB(lut_nuclides, col_name='nuclide'),
                            ParseTimeCB(),
                            EncodeTimeCB(),
                            SanitizeValueCB(),
                            NormalizeUncCB(),
                            RemapUnitCB(renaming_unit_rules),
                            RemapDetectionLimitCB(coi_dl, lut_dl),
                            RemapCB(fn_lut=lut_biota, col_remap='SPECIES', col_src='species', dest_grps='BIOTA'),    
                            RemapCB(fn_lut=lut_biota_enhanced, col_remap='enhanced_species', col_src='biological group', dest_grps='BIOTA'),    
                            EnhanceSpeciesCB(),
                            AddBodypartTempCB(),
                            RemapCB(fn_lut=lut_bodyparts, col_remap='BODY_PART', col_src='body_part_temp' , dest_grps='BIOTA'),
                            AddSampleIdCB(),
                            AddDepthCB(),    
                            ConvertLonLatCB(),
                            SanitizeLonLatCB(),
                            AddStationCB(),
                            ])

# Transform
tfm()
# Check transformation logs
tfm.logs
Processing: 100%|██████████| 12/12 [00:00<00:00, 56.55it/s]
['Remove rows with all NA values in specified columns.',
 "Convert 'nuclide' column values to lowercase, strip spaces, and store in 'nuclide' column.",
 'Remap data provider nuclide names to standardized MARIS nuclide names.',
 'Parse the time format in the dataframe and check for inconsistencies.',
 'Encode time as seconds since epoch.',
 'Sanitize value by removing blank entries and populating `value` column.',
 'Normalize uncertainty values in DataFrames.',
 "Callback to update DataFrame 'UNIT' columns based on a lookup table.",
 'Remap detection limit values to MARIS format using a lookup table.',
 "Remap values from 'species' to 'SPECIES' for groups: BIOTA.",
 "Remap values from 'biological group' to 'enhanced_species' for groups: BIOTA.",
 "Enhance the 'SPECIES' column using the 'enhanced_species' column if conditions are met.",
 'Add a temporary column with the body part and biological group combined.',
 "Remap values from 'body_part_temp' to 'BODY_PART' for groups: BIOTA.",
 'Create incremental SMP_ID and store original sample id in SMP_ID_PROVIDER',
 "Ensure depth values are floats and add 'SMP_DEPTH' columns.",
 'Convert Coordinates to decimal degrees (DDD.DDDDD°).',
 'Drop rows with invalid longitude & latitude values. Convert `,` separator to `.` separator.',
 "Ensure station values are floats and add 'STATION' columns."]

Feed global attributes


source

get_attrs

 get_attrs (tfm:marisco.callbacks.Transformer, zotero_key:str,
            kw:list=['oceanography', 'Earth Science > Oceans > Ocean
            Chemistry> Radionuclides', 'Earth Science > Human Dimensions >
            Environmental Impacts > Nuclear Radiation Exposure', 'Earth
            Science > Oceans > Ocean Chemistry > Ocean Tracers, Earth
            Science > Oceans > Marine Sediments', 'Earth Science > Oceans
            > Ocean Chemistry, Earth Science > Oceans > Sea Ice >
            Isotopes', 'Earth Science > Oceans > Water Quality > Ocean
            Contaminants', 'Earth Science > Biological Classification >
            Animals/Vertebrates > Fish', 'Earth Science > Biosphere >
            Ecosystems > Marine Ecosystems', 'Earth Science > Biological
            Classification > Animals/Invertebrates > Mollusks', 'Earth
            Science > Biological Classification > Animals/Invertebrates >
            Arthropods > Crustaceans', 'Earth Science > Biological
            Classification > Plants > Macroalgae (Seaweeds)'])

Retrieve all global attributes.

Type Default Details
tfm Transformer Transformer object
zotero_key str Zotero dataset record key
kw list [‘oceanography’, ‘Earth Science > Oceans > Ocean Chemistry> Radionuclides’, ‘Earth Science > Human Dimensions > Environmental Impacts > Nuclear Radiation Exposure’, ‘Earth Science > Oceans > Ocean Chemistry > Ocean Tracers, Earth Science > Oceans > Marine Sediments’, ‘Earth Science > Oceans > Ocean Chemistry, Earth Science > Oceans > Sea Ice > Isotopes’, ‘Earth Science > Oceans > Water Quality > Ocean Contaminants’, ‘Earth Science > Biological Classification > Animals/Vertebrates > Fish’, ‘Earth Science > Biosphere > Ecosystems > Marine Ecosystems’, ‘Earth Science > Biological Classification > Animals/Invertebrates > Mollusks’, ‘Earth Science > Biological Classification > Animals/Invertebrates > Arthropods > Crustaceans’, ‘Earth Science > Biological Classification > Plants > Macroalgae (Seaweeds)’] List of keywords
Returns dict Global attributes
Exported source
kw = ['oceanography', 'Earth Science > Oceans > Ocean Chemistry> Radionuclides',
      'Earth Science > Human Dimensions > Environmental Impacts > Nuclear Radiation Exposure',
      'Earth Science > Oceans > Ocean Chemistry > Ocean Tracers, Earth Science > Oceans > Marine Sediments',
      'Earth Science > Oceans > Ocean Chemistry, Earth Science > Oceans > Sea Ice > Isotopes',
      'Earth Science > Oceans > Water Quality > Ocean Contaminants',
      'Earth Science > Biological Classification > Animals/Vertebrates > Fish',
      'Earth Science > Biosphere > Ecosystems > Marine Ecosystems',
      'Earth Science > Biological Classification > Animals/Invertebrates > Mollusks',
      'Earth Science > Biological Classification > Animals/Invertebrates > Arthropods > Crustaceans',
      'Earth Science > Biological Classification > Plants > Macroalgae (Seaweeds)']
Exported source
def get_attrs(
    tfm: Transformer, # Transformer object
    zotero_key: str, # Zotero dataset record key
    kw: list = kw # List of keywords
    ) -> dict: # Global attributes
    "Retrieve all global attributes."
    return GlobAttrsFeeder(tfm.dfs, cbs=[
        BboxCB(),
        DepthRangeCB(),
        TimeRangeCB(),
        ZoteroCB(zotero_key, cfg=cfg()),
        KeyValuePairCB('keywords', ', '.join(kw)),
        KeyValuePairCB('publisher_postprocess_logs', ', '.join(tfm.logs))
        ])()
get_attrs(tfm, zotero_key=zotero_key, kw=kw)
{'geospatial_lat_min': '49.43222222222222',
 'geospatial_lat_max': '81.26805555555555',
 'geospatial_lon_min': '-58.23166666666667',
 'geospatial_lon_max': '36.181666666666665',
 'geospatial_bounds': 'POLYGON ((-58.23166666666667 36.181666666666665, 49.43222222222222 36.181666666666665, 49.43222222222222 81.26805555555555, -58.23166666666667 81.26805555555555, -58.23166666666667 36.181666666666665))',
 'geospatial_vertical_max': '1850.0',
 'geospatial_vertical_min': '0.0',
 'time_coverage_start': '1995-01-01T00:00:00',
 'time_coverage_end': '2022-12-31T00:00:00',
 'id': 'LQRA4MMK',
 'title': 'OSPAR Environmental Monitoring of Radioactive Substances',
 'summary': '',
 'creator_name': '[{"creatorType": "author", "firstName": "", "lastName": "OSPAR Comission\'s Radioactive Substances Committee (RSC)"}]',
 'keywords': 'oceanography, Earth Science > Oceans > Ocean Chemistry> Radionuclides, Earth Science > Human Dimensions > Environmental Impacts > Nuclear Radiation Exposure, Earth Science > Oceans > Ocean Chemistry > Ocean Tracers, Earth Science > Oceans > Marine Sediments, Earth Science > Oceans > Ocean Chemistry, Earth Science > Oceans > Sea Ice > Isotopes, Earth Science > Oceans > Water Quality > Ocean Contaminants, Earth Science > Biological Classification > Animals/Vertebrates > Fish, Earth Science > Biosphere > Ecosystems > Marine Ecosystems, Earth Science > Biological Classification > Animals/Invertebrates > Mollusks, Earth Science > Biological Classification > Animals/Invertebrates > Arthropods > Crustaceans, Earth Science > Biological Classification > Plants > Macroalgae (Seaweeds)',
 'publisher_postprocess_logs': "Remove rows with all NA values in specified columns., Convert 'nuclide' column values to lowercase, strip spaces, and store in 'nuclide' column., Remap data provider nuclide names to standardized MARIS nuclide names., Parse the time format in the dataframe and check for inconsistencies., Encode time as seconds since epoch., Sanitize value by removing blank entries and populating `value` column., Normalize uncertainty values in DataFrames., Callback to update DataFrame 'UNIT' columns based on a lookup table., Remap detection limit values to MARIS format using a lookup table., Remap values from 'species' to 'SPECIES' for groups: BIOTA., Remap values from 'biological group' to 'enhanced_species' for groups: BIOTA., Enhance the 'SPECIES' column using the 'enhanced_species' column if conditions are met., Add a temporary column with the body part and biological group combined., Remap values from 'body_part_temp' to 'BODY_PART' for groups: BIOTA., Create incremental SMP_ID and store original sample id in SMP_ID_PROVIDER, Ensure depth values are floats and add 'SMP_DEPTH' columns., Convert Coordinates to decimal degrees (DDD.DDDDD°)., Drop rows with invalid longitude & latitude values. Convert `,` separator to `.` separator., Ensure station values are floats and add 'STATION' columns."}
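To make the geospatial and temporal attributes less opaque, the sketch below shows one way such values can be derived from the data: minima and maxima of the coordinates, and the earliest and latest TIME values converted from seconds since the epoch to ISO 8601 strings. It is not the GlobAttrsFeeder implementation; the functions bbox and time_coverage are hypothetical, and the sample values are taken from the first and one of the last BIOTA rows shown in the NetCDF review of this document.

```python
from datetime import datetime, timezone


def bbox(lons: list, lats: list) -> dict:
    """Min/max of the coordinates, stored as strings like the attributes above."""
    return {
        'geospatial_lat_min': str(min(lats)),
        'geospatial_lat_max': str(max(lats)),
        'geospatial_lon_min': str(min(lons)),
        'geospatial_lon_max': str(max(lons)),
    }


def time_coverage(epoch_seconds: list) -> dict:
    """ISO 8601 start/end (UTC) from times encoded as seconds since 1970-01-01."""
    def iso(ts):
        return datetime.fromtimestamp(ts, tz=timezone.utc).strftime('%Y-%m-%dT%H:%M:%S')
    return {'time_coverage_start': iso(min(epoch_seconds)),
            'time_coverage_end': iso(max(epoch_seconds))}


# Two BIOTA rows (LON, LAT, TIME) from the NetCDF review output.
attrs = {**bbox([4.031111, 12.087778], [51.393333, 57.252499]),
         **time_coverage([1267574400, 1660003200])}
print(attrs)
```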

Encoding NetCDF


source

encode

 encode (fname_out:str, **kwargs)

Encode data to NetCDF.

Type Details
fname_out str Output file name
kwargs VAR_KEYWORD Additional arguments
Returns None
Exported source
def encode(
    fname_out: str, # Output file name
    **kwargs # Additional arguments
    ) -> None:
    "Encode data to NetCDF."
    dfs = load_data(src_dir, use_cache=True)
    tfm = Transformer(dfs, cbs=[
                            RemoveAllNAValuesCB(nan_cols_to_check),
                            LowerStripNameCB(col_src='nuclide', col_dst='nuclide'),
                            RemapNuclideNameCB(lut_nuclides, col_name='nuclide'),
                            ParseTimeCB(),
                            EncodeTimeCB(),
                            SanitizeValueCB(),
                            NormalizeUncCB(),
                            RemapUnitCB(renaming_unit_rules),
                            RemapDetectionLimitCB(coi_dl, lut_dl),
                            RemapCB(fn_lut=lut_biota, col_remap='SPECIES', col_src='species', dest_grps='BIOTA'),    
                            RemapCB(fn_lut=lut_biota_enhanced, col_remap='enhanced_species', col_src='biological group', dest_grps='BIOTA'),    
                            EnhanceSpeciesCB(),
                            AddBodypartTempCB(),
                            RemapCB(fn_lut=lut_bodyparts, col_remap='BODY_PART', col_src='body_part_temp' , dest_grps='BIOTA'),
                            AddSampleIdCB(),
                            AddDepthCB(),    
                            ConvertLonLatCB(),
                            SanitizeLonLatCB(),
                            AddStationCB()
                                ])
    tfm()
    encoder = NetCDFEncoder(tfm.dfs, 
                            dest_fname=fname_out, 
                            global_attrs=get_attrs(tfm, zotero_key=zotero_key, kw=kw),
                            verbose=kwargs.get('verbose', False),
                           )
    encoder.encode()
encode(fname_out, verbose=False)
Processing: 100%|██████████| 12/12 [00:00<00:00, 38.63it/s]

NetCDF Review

First, let's review the global attributes of the NetCDF file:

contents = ExtractNetcdfContents(fname_out)
print(contents.global_attrs)
{
    'id': 'LQRA4MMK',
    'title': 'OSPAR Environmental Monitoring of Radioactive Substances',
    'summary': '',
    'keywords': 'oceanography, Earth Science > Oceans > Ocean Chemistry> Radionuclides, Earth Science > Human 
Dimensions > Environmental Impacts > Nuclear Radiation Exposure, Earth Science > Oceans > Ocean Chemistry > Ocean 
Tracers, Earth Science > Oceans > Marine Sediments, Earth Science > Oceans > Ocean Chemistry, Earth Science > 
Oceans > Sea Ice > Isotopes, Earth Science > Oceans > Water Quality > Ocean Contaminants, Earth Science > 
Biological Classification > Animals/Vertebrates > Fish, Earth Science > Biosphere > Ecosystems > Marine Ecosystems,
Earth Science > Biological Classification > Animals/Invertebrates > Mollusks, Earth Science > Biological 
Classification > Animals/Invertebrates > Arthropods > Crustaceans, Earth Science > Biological Classification > 
Plants > Macroalgae (Seaweeds)',
    'history': 'TBD',
    'keywords_vocabulary': 'GCMD Science Keywords',
    'keywords_vocabulary_url': 'https://gcmd.earthdata.nasa.gov/static/kms/',
    'record': 'TBD',
    'featureType': 'TBD',
    'cdm_data_type': 'TBD',
    'Conventions': 'CF-1.10 ACDD-1.3',
    'publisher_name': 'Paul MCGINNITY, Iolanda OSVATH, Florence DESCROIX-COMANDUCCI',
    'publisher_email': 'p.mc-ginnity@iaea.org, i.osvath@iaea.org, F.Descroix-Comanducci@iaea.org',
    'publisher_url': 'https://maris.iaea.org',
    'publisher_institution': 'International Atomic Energy Agency - IAEA',
    'creator_name': '[{"creatorType": "author", "firstName": "", "lastName": "OSPAR Comission\'s Radioactive 
Substances Committee (RSC)"}]',
    'institution': 'TBD',
    'metadata_link': 'TBD',
    'creator_email': 'TBD',
    'creator_url': 'TBD',
    'references': 'TBD',
    'license': 'Without prejudice to the applicable Terms and Conditions 
(https://nucleus.iaea.org/Pages/Others/Disclaimer.aspx), I hereby agree that any use of the data will contain 
appropriate acknowledgement of the data source(s) and the IAEA Marine Radioactivity Information System (MARIS).',
    'comment': 'TBD',
    'geospatial_lat_min': '49.43222222222222',
    'geospatial_lon_min': '-58.23166666666667',
    'geospatial_lat_max': '81.26805555555555',
    'geospatial_lon_max': '36.181666666666665',
    'geospatial_vertical_min': '0.0',
    'geospatial_vertical_max': '1850.0',
    'geospatial_bounds': 'POLYGON ((-58.23166666666667 36.181666666666665, 49.43222222222222 36.181666666666665, 
49.43222222222222 81.26805555555555, -58.23166666666667 81.26805555555555, -58.23166666666667 
36.181666666666665))',
    'geospatial_bounds_crs': 'EPSG:4326',
    'time_coverage_start': '1995-01-01T00:00:00',
    'time_coverage_end': '2022-12-31T00:00:00',
    'local_time_zone': 'TBD',
    'date_created': 'TBD',
    'date_modified': 'TBD',
    'publisher_postprocess_logs': "Remove rows with all NA values in specified columns., Convert 'nuclide' column 
values to lowercase, strip spaces, and store in 'nuclide' column., Remap data provider nuclide names to 
standardized MARIS nuclide names., Parse the time format in the dataframe and check for inconsistencies., Encode 
time as seconds since epoch., Sanitize value by removing blank entries and populating `value` column., Normalize 
uncertainty values in DataFrames., Callback to update DataFrame 'UNIT' columns based on a lookup table., Remap 
detection limit values to MARIS format using a lookup table., Remap values from 'species' to 'SPECIES' for groups: 
BIOTA., Remap values from 'biological group' to 'enhanced_species' for groups: BIOTA., Enhance the 'SPECIES' column
using the 'enhanced_species' column if conditions are met., Add a temporary column with the body part and 
biological group combined., Remap values from 'body_part_temp' to 'BODY_PART' for groups: BIOTA., Create 
incremental SMP_ID and store original sample id in SMP_ID_PROVIDER, Ensure depth values are floats and add 
'SMP_DEPTH' columns., Convert Coordinates to decimal degrees (DDD.DDDDD°)., Drop rows with invalid longitude & 
latitude values. Convert `,` separator to `.` separator., Ensure station values are floats and add 'STATION' 
columns."
}

Review the publisher_postprocess_logs.

print(contents.global_attrs['publisher_postprocess_logs'])
Remove rows with all NA values in specified columns., Convert 'nuclide' column values to lowercase, strip spaces, 
and store in 'nuclide' column., Remap data provider nuclide names to standardized MARIS nuclide names., Parse the 
time format in the dataframe and check for inconsistencies., Encode time as seconds since epoch., Sanitize value by
removing blank entries and populating `value` column., Normalize uncertainty values in DataFrames., Callback to 
update DataFrame 'UNIT' columns based on a lookup table., Remap detection limit values to MARIS format using a 
lookup table., Remap values from 'species' to 'SPECIES' for groups: BIOTA., Remap values from 'biological group' to
'enhanced_species' for groups: BIOTA., Enhance the 'SPECIES' column using the 'enhanced_species' column if 
conditions are met., Add a temporary column with the body part and biological group combined., Remap values from 
'body_part_temp' to 'BODY_PART' for groups: BIOTA., Create incremental SMP_ID and store original sample id in 
SMP_ID_PROVIDER, Ensure depth values are floats and add 'SMP_DEPTH' columns., Convert Coordinates to decimal 
degrees (DDD.DDDDD°)., Drop rows with invalid longitude & latitude values. Convert `,` separator to `.` separator.,
Ensure station values are floats and add 'STATION' columns.

Let's review the data of the NetCDF file:

dfs = contents.dfs
dfs
{'BIOTA':       SMP_ID_PROVIDER        LON        LAT        TIME  \
 0            DA 17531   4.031111  51.393333  1267574400   
 1            DA 17534   4.031111  51.393333  1276473600   
 2            DA 17537   4.031111  51.393333  1285545600   
 3            DA 17540   4.031111  51.393333  1291766400   
 4            DA 17531   4.031111  51.393333  1267574400   
 ...               ...        ...        ...         ...   
 15946             nan  12.087778  57.252499  1660003200   
 15947             nan  12.107500  57.306389  1663891200   
 15948             nan  11.245000  58.603333  1667779200   
 15949             nan  11.905278  57.302502  1663632000   
 15950             nan  12.076667  57.335278  1662076800   
 
                      STATION  SMP_ID  NUCLIDE     VALUE  UNIT       UNC  DL  \
 0      Kloosterzande-Schelde       1       33  0.326416     5       NaN   2   
 1      Kloosterzande-Schelde       2       33  0.442704     5       NaN   2   
 2      Kloosterzande-Schelde       3       33  0.412989     5       NaN   2   
 3      Kloosterzande-Schelde       4       33  0.202768     5       NaN   2   
 4      Kloosterzande-Schelde       5       53  0.652833     5       NaN   2   
 ...                      ...     ...      ...       ...   ...       ...  ..   
 15946         Ringhals (R22)   15947       33  0.384000     5  0.012096   1   
 15947         Ringhals (R23)   15948       33  0.456000     5  0.012084   1   
 15948                    SW7   15949       33  0.122000     5  0.031000   1   
 15949                   SW6a   15950       33  0.310000     5       NaN   2   
 15950         Ringhals (R25)   15951       33  0.306000     5  0.007191   1   
 
        SPECIES  BODY_PART  
 0          377          1  
 1          377          1  
 2          377          1  
 3          377          1  
 4          377          1  
 ...        ...        ...  
 15946      272         52  
 15947      272         52  
 15948      129         19  
 15949      129         19  
 15950       96         40  
 
 [15951 rows x 13 columns],
 'SEAWATER':       SMP_ID_PROVIDER       LON        LAT  SMP_DEPTH        TIME  \
 0              WNZ 01  3.188056  51.375278        3.0  1264550400   
 1              WNZ 02  2.859444  51.223610        3.0  1264550400   
 2              WNZ 03  2.713611  51.184444        3.0  1264550400   
 3              WNZ 04  3.262222  51.420277        3.0  1264550400   
 4              WNZ 05  2.809722  51.416111        3.0  1264464000   
 ...               ...       ...        ...        ...         ...   
 19178      2019010074  4.615278  52.831944        1.0  1573649640   
 19179      2019010420  3.565556  51.411945        1.0  1575977820   
 19180      2019010420  3.565556  51.411945        1.0  1575977820   
 19181      2019010420  3.565556  51.411945        1.0  1575977820   
 19182      2019010526  3.493889  51.719444        1.0  1576680180   
 
             STATION  SMP_ID  NUCLIDE     VALUE  UNIT           UNC  DL  
 0       Belgica-W01       1       33  0.200000     1           NaN   2  
 1       Belgica-W02       2       33  0.270000     1           NaN   2  
 2       Belgica-W03       3       33  0.260000     1           NaN   2  
 3       Belgica-W04       4       33  0.250000     1           NaN   2  
 4       Belgica-W05       5       33  0.200000     1           NaN   2  
 ...             ...     ...      ...       ...   ...           ...  ..  
 19178         PETT5   19179       77  0.000005     1  2.600000e-07   1  
 19179  VLISSGBISSVH   19180        1  6.152000     1  3.076000e-01   1  
 19180  VLISSGBISSVH   19181       53  0.005390     1  1.078000e-03   1  
 19181  VLISSGBISSVH   19182       54  0.001420     1  2.840000e-04   1  
 19182     SCHOUWN10   19183        1  6.078000     1  3.039000e-01   1  
 
 [19183 rows x 12 columns]}
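The `TIME` column holds integers, which matches the "Encode time as seconds since epoch" step listed in the log above. As a quick sanity check, the sketch below decodes a few of the values shown in the output back into readable timestamps. It assumes the epoch is the Unix epoch (1970-01-01) and that the values are in UTC; `epoch_to_utc` is a hypothetical helper written for this illustration, not part of marisco.

```python
from datetime import datetime, timezone


def epoch_to_utc(seconds: int) -> str:
    """Format seconds since 1970-01-01 as 'YYYY-MM-DD HH:MM:SS' (UTC assumed)."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


# TIME values copied from the output above: first BIOTA row,
# first SEAWATER row and last SEAWATER row.
samples = {
    "BIOTA[0]": 1267574400,
    "SEAWATER[0]": 1264550400,
    "SEAWATER[19182]": 1576680180,
}

decoded = {label: epoch_to_utc(ts) for label, ts in samples.items()}
for label, stamp in decoded.items():
    print(f"{label}: {stamp}")
```

Under these assumptions the first BIOTA row decodes to 2010-03-03 and the last SEAWATER row to 2019-12-18, which agrees with the `2019...` prefix of its provider sample id.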

Let's review the biota data:

nc_dfs_biota = dfs['BIOTA']
nc_dfs_biota
       SMP_ID_PROVIDER        LON        LAT        TIME                STATION  SMP_ID  NUCLIDE     VALUE  UNIT       UNC  DL  SPECIES  BODY_PART
0             DA 17531   4.031111  51.393333  1267574400  Kloosterzande-Schelde       1       33  0.326416     5       NaN   2      377          1
1             DA 17534   4.031111  51.393333  1276473600  Kloosterzande-Schelde       2       33  0.442704     5       NaN   2      377          1
2             DA 17537   4.031111  51.393333  1285545600  Kloosterzande-Schelde       3       33  0.412989     5       NaN   2      377          1
3             DA 17540   4.031111  51.393333  1291766400  Kloosterzande-Schelde       4       33  0.202768     5       NaN   2      377          1
4             DA 17531   4.031111  51.393333  1267574400  Kloosterzande-Schelde       5       53  0.652833     5       NaN   2      377          1
...                ...        ...        ...         ...                    ...     ...      ...       ...   ...       ...  ..      ...        ...
15946              nan  12.087778  57.252499  1660003200         Ringhals (R22)   15947       33  0.384000     5  0.012096   1      272         52
15947              nan  12.107500  57.306389  1663891200         Ringhals (R23)   15948       33  0.456000     5  0.012084   1      272         52
15948              nan  11.245000  58.603333  1667779200                    SW7   15949       33  0.122000     5  0.031000   1      129         19
15949              nan  11.905278  57.302502  1663632000                   SW6a   15950       33  0.310000     5       NaN   2      129         19
15950              nan  12.076667  57.335278  1662076800         Ringhals (R25)   15951       33  0.306000     5  0.007191   1       96         40

15951 rows × 13 columns

Let's review the seawater data:

nc_dfs_seawater = dfs['SEAWATER']
nc_dfs_seawater
       SMP_ID_PROVIDER       LON        LAT  SMP_DEPTH        TIME       STATION  SMP_ID  NUCLIDE     VALUE  UNIT           UNC  DL
0               WNZ 01  3.188056  51.375278        3.0  1264550400   Belgica-W01       1       33  0.200000     1           NaN   2
1               WNZ 02  2.859444  51.223610        3.0  1264550400   Belgica-W02       2       33  0.270000     1           NaN   2
2               WNZ 03  2.713611  51.184444        3.0  1264550400   Belgica-W03       3       33  0.260000     1           NaN   2
3               WNZ 04  3.262222  51.420277        3.0  1264550400   Belgica-W04       4       33  0.250000     1           NaN   2
4               WNZ 05  2.809722  51.416111        3.0  1264464000   Belgica-W05       5       33  0.200000     1           NaN   2
...                ...       ...        ...        ...         ...           ...     ...      ...       ...   ...           ...  ..
19178       2019010074  4.615278  52.831944        1.0  1573649640         PETT5   19179       77  0.000005     1  2.600000e-07   1
19179       2019010420  3.565556  51.411945        1.0  1575977820  VLISSGBISSVH   19180        1  6.152000     1  3.076000e-01   1
19180       2019010420  3.565556  51.411945        1.0  1575977820  VLISSGBISSVH   19181       53  0.005390     1  1.078000e-03   1
19181       2019010420  3.565556  51.411945        1.0  1575977820  VLISSGBISSVH   19182       54  0.001420     1  2.840000e-04   1
19182       2019010526  3.493889  51.719444        1.0  1576680180     SCHOUWN10   19183        1  6.078000     1  3.039000e-01   1

19183 rows × 12 columns

Data Format Conversion

The MARIS data processing workflow involves two key steps:

  1. NetCDF to Standardized CSV Compatible with OpenRefine Pipeline
    • Convert standardized NetCDF files to CSV files compatible with OpenRefine using the NetCDFDecoder.
    • Preserve data integrity and variable relationships.
    • Maintain standardized nomenclature and units.
  2. Database Integration
    • Process the converted CSV files using OpenRefine.
    • Apply data cleaning and standardization rules.
    • Export validated data to the MARIS master database.

This section focuses on the first step: converting NetCDF files to a format suitable for OpenRefine processing using the NetCDFDecoder class.

decode(fname_in=fname_out, verbose=True)
Saved BIOTA to ../../_data/output/191-OSPAR-2024_BIOTA.csv
Saved SEAWATER to ../../_data/output/191-OSPAR-2024_SEAWATER.csv
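
The messages above show one CSV per sample group, named `<stem>_<GROUP>.csv` and saved next to the NetCDF file. The sketch below reproduces only that naming pattern and a plain CSV write, using the standard library. It is not the `decode` or `NetCDFDecoder` implementation: `group_csv_path` and `write_group_csv` are hypothetical helpers, and the single row is copied from the SEAWATER output shown earlier.

```python
import csv
import tempfile
from pathlib import Path


def group_csv_path(fname_in: str, group: str) -> Path:
    """Build '<stem>_<GROUP>.csv' alongside the NetCDF file (hypothetical helper)."""
    src = Path(fname_in)
    return src.with_name(f"{src.stem}_{group}.csv")


def write_group_csv(path: Path, columns: list, rows: list) -> None:
    """Write one group's rows to CSV, preceded by a header line."""
    with path.open("w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(columns)
        writer.writerows(rows)


# Illustrative subset of columns and one row from the SEAWATER output above.
columns = ["SMP_ID_PROVIDER", "LON", "LAT", "SMP_DEPTH", "TIME", "STATION"]
rows = [["WNZ 01", 3.188056, 51.375278, 3.0, 1264550400, "Belgica-W01"]]

# Write into a temporary directory so the sketch leaves no files behind.
with tempfile.TemporaryDirectory() as tmp:
    out = group_csv_path(str(Path(tmp) / "191-OSPAR-2024.nc"), "SEAWATER")
    write_group_csv(out, columns, rows)
    written = out.read_text().splitlines()

print(out.name)
print(written[0])
```

Deriving the output name from the input path keeps each group's CSV traceable to the NetCDF file it came from.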