Various utilities to encode a MARIS dataset in NetCDF, CSV, … formats.

NetCDF


source

NetCDFEncoder

 NetCDFEncoder (dfs:Dict[str,pandas.core.frame.DataFrame], dest_fname:str,
                global_attrs:Dict[str,str],
                fn_src_fname:Callable=<function nc_tpl_path>,
                custom_maps:Dict[str,Dict[str,int]]=None,
                verbose:bool=False)

MARIS NetCDF encoder.

Type Default Details
dfs Dict dict of Dataframes to encode with group name as key {‘sediment’: df_sed, …}
dest_fname str Name of output file to produce
global_attrs Dict Global attributes
fn_src_fname Callable nc_tpl_path Function returning file name and path to the MARIS CDL template
custom_maps Dict None Custom maps to encode, keyed by group name then column name, e.g. {‘SEAWATER’: {‘SMP_ID’: {‘SMP 1’: 1, …}}}
verbose bool False Print currently written NetCDF group and variable names
Exported source
class NetCDFEncoder:
    "MARIS NetCDF encoder."
    def __init__(self, 
                 dfs: Dict[str, pd.DataFrame], # dict of Dataframes to encode with group name as key {'sediment': df_sed, ...}
                 dest_fname: str, # Name of output file to produce
                 global_attrs: Dict[str, str], # Global attributes
                 fn_src_fname: Callable=nc_tpl_path, # Function returning file name and path to the MARIS CDL template
                 custom_maps: Dict[str, Dict[str, Dict[str, int]]]=None, # Custom maps to encode, as {group: {column: {label: code}}}
                 verbose: bool=False, # Print currently written NetCDF group and variable names
                 ):
        # fastcore's `store_attr` keeps the constructor arguments on the instance
        # (the other methods read `self.dfs`, `self.dest_fname`, `self.global_attrs`, ...).
        store_attr()
        # The template path is resolved once, when the encoder is built.
        self.src_fname = fn_src_fname()
        # Enum types created in the destination file, keyed by enum type name.
        self.enum_dtypes = {}
        # Reverse lookup of NC_VARS (value -> key): gives the dataframe column behind a NetCDF variable name.
        self.nc_to_cols = {v:k for k,v in NC_VARS.items()}
# Paths of the NetCDF template and of the encoded output file.
src = './files/nc/maris-template.nc'
dest = './files/nc/encoding-test.nc'

# Global attributes to set on the encoded file.
attrs = dict(id='123', title='Test title', summary='Summary test')

# Seawater fixture: three measurements, including an `AREA` column.
df_seawater = pd.DataFrame(dict(
    ID=[0, 1, 2],
    SMP_ID=[1, 2, 3],
    LON=[141, 142, 143],
    LAT=[37.3, 38.3, 39.3],
    TIME=[1234, 1235, 1236],
    NUCLIDE=[1, 2, 3],
    VALUE=[0.1, 1.1, 2.1],
    AREA=[2374, 2379, 2401],
))

# Biota fixture: four measurements, including a `SPECIES` column.
df_biota = pd.DataFrame(dict(
    ID=[0, 1, 2, 3],
    SMP_ID=[1, 2, 3, 4],
    LON=[141, 142, 143, 144],
    LAT=[37.3, 38.3, 39.3, 40.3],
    TIME=[1234, 1235, 1236, 1237],
    NUCLIDE=[1, 2, 3, 3],
    VALUE=[0.1, 1.1, 2.1, 3.1],
    SPECIES=[1, 2, 3, 3],
))

# Groups to encode, keyed by group name.
dfs = {'SEAWATER': df_seawater, 'BIOTA': df_biota}

# Custom map for the seawater sample IDs; it deliberately holds more labels
# (4) than the seawater fixture has rows (3).
smp_dict = {f'SMP {i}': np.int64(i) for i in range(1, 5)}
custom_maps = {'SEAWATER': {'SMP_ID': smp_dict}}

source

NetCDFEncoder.copy_global_attributes

 NetCDFEncoder.copy_global_attributes ()

Update NetCDF template global attributes as specified by global_attrs argument.

Exported source
@patch
def copy_global_attributes(self:NetCDFEncoder):
    "Copy the template's global attributes to the destination, then apply the `global_attrs` overrides."
    target = self.dest
    # Start from everything the template defines ...
    target.setncatts(self.src.__dict__)
    # ... then set the caller-supplied attributes on top, one by one.
    for attr_name, attr_value in self.global_attrs.items():
        target.setncattr(attr_name, attr_value)

source

NetCDFEncoder.copy_dimensions

 NetCDFEncoder.copy_dimensions (grp_dest)

Copy the dimensions of the template group named like `grp_dest` into `grp_dest`.

Exported source
@patch
def copy_dimensions(self:NetCDFEncoder, grp_dest):
    "Recreate in `grp_dest` the dimensions of the template group of the same name."
    template_dims = self.src.groups[grp_dest.name].dimensions
    for dim_name, template_dim in template_dims.items():
        # An unlimited template dimension is recreated with size `None`.
        size = None if template_dim.isunlimited() else len(template_dim)
        grp_dest.createDimension(dim_name, size)

source

NetCDFEncoder.process_groups

 NetCDFEncoder.process_groups ()
Exported source
@patch
def process_groups(self:NetCDFEncoder):
    "Encode every dataframe of `dfs` into the NetCDF group that `NC_GROUPS` maps its key to."
    for key in self.dfs:
        self.process_group(NC_GROUPS[key], self.dfs[key])

source

NetCDFEncoder.process_group

 NetCDFEncoder.process_group (grp_name, df)
Exported source
@patch
def process_group(self:NetCDFEncoder, grp_name, df):
    "Create the destination group `grp_name`, then copy its dimensions and its variables from `df`."
    target_group = self.dest.createGroup(grp_name)
    # Dimensions must exist before the variables that use them are created.
    self.copy_dimensions(target_group)
    self.copy_variables(grp_name, df, target_group)

source

NetCDFEncoder.copy_variables

 NetCDFEncoder.copy_variables (grp_name, df, grp_dest)
Exported source
@patch
def copy_variables(self:NetCDFEncoder, grp_name, df, grp_dest):
    "Copy to `grp_dest` the template variables of `grp_name` that have a matching column in `df`."
    # NetCDF variable names backed by a column of `df`.
    cols = {NC_VARS[col] for col in df.columns if col in NC_VARS}
    for var_name, var_src in self.src.groups[grp_name].variables.items():
        # Template variables without a matching column are not created in the destination.
        if var_name not in cols: continue
        self.copy_variable(var_name, var_src, df, grp_dest)
        # A custom map is stored as an attribute of the destination variable, so it is
        # only attached to variables that were actually created above; looking up a
        # variable that was skipped would fail on `grp_dest[var_name]`.
        if self.custom_maps:
            self.copy_custom_map(var_name, grp_dest)

source

NetCDFEncoder.copy_variable

 NetCDFEncoder.copy_variable (var_name, var_src, df, grp_dest)
Exported source
@patch
def copy_variable(self:NetCDFEncoder, var_name, var_src, df, grp_dest):
    "Create `var_name` in `grp_dest`, fill it from `df` and copy the template variable's attributes."
    # Name of the template data type; used to look up a matching enum type in the destination file.
    dtype_name = var_src.datatype.name
    if self.verbose: 
        print(80*'-')
        print(f'Group: {grp_dest.name}, Variable: {var_name}')
    self._create_and_copy_variable(var_name, var_src, df, grp_dest, dtype_name)
    self.copy_variable_attributes(var_name, var_src, grp_dest)

source

NetCDFEncoder.sanitize_if_enum_and_nan

 NetCDFEncoder.sanitize_if_enum_and_nan (values, fill_value=-1)
Exported source
@patch
def _create_and_copy_variable(self:NetCDFEncoder, var_name, var_src, df, grp_dest, dtype_name):
    "Create `var_name` in `grp_dest` with the proper data type and write the matching `df` column into it."
    # Prefer the enum type registered in `enum_dtypes` under this name; otherwise reuse the template's data type.
    variable_type = self.enum_dtypes.get(dtype_name, var_src.datatype)
    grp_dest.createVariable(var_name, variable_type, NC_DIM, compression='zlib', complevel=9)
    values = df[self.nc_to_cols[var_name]].values
    # `isinstance` instead of an exact `type(...)` comparison, so subclasses are recognised as well.
    if isinstance(variable_type, netCDF4._netCDF4.EnumType):
        # Enum variables are written as integers: NaN is replaced first.
        values = self.sanitize_if_enum_and_nan(values)
    grp_dest[var_name][:] = values
Exported source
@patch
def sanitize_if_enum_and_nan(self:NetCDFEncoder, values, fill_value=-1):
    "Return an integer copy of `values` in which NaN entries are replaced by `fill_value`."
    nan_mask = np.isnan(values)
    if nan_mask.any():
        # Build a new array rather than assigning into `values`: the caller passes
        # `df[...].values`, which may share memory with the DataFrame (and may be
        # read-only, e.g. under pandas Copy-on-Write), so in-place assignment would
        # either alter the caller's data or fail.
        values = np.where(nan_mask, int(fill_value), values)
    # `astype` always returns a new array, so the input is never modified.
    return values.astype(int)
# Disabled code kept for reference only: the bare string literal below is evaluated and
# discarded, so `copy_enum_type` is never defined.
# NOTE(review): it refers to `self.enum_types` and `self.enums_xtra`, neither of which is set
# by the `NetCDFEncoder.__init__` shown in this file — confirm before re-enabling.
'''
#| exports - Not used in this notebook - NM/01-30-2025.
 
@patch
def copy_enum_type(self:NetCDFEncoder, dtype_name):
    # if enum type not already created
    if dtype_name not in self.enum_types:
        enum_info = self.src.enumtypes[dtype_name]
        # If a subset of an enum is defined in enums_xtra (typically for the lengthy species_t)
        if enum_info.name in self.enums_xtra:
            # add "not applicable"
            enum_info.enum_dict = self.enums_xtra[enum_info.name]
            enum_info.enum_dict['Not applicable'] = -1 # TBD
        self.enum_types[dtype_name] = self.dest.createEnumType(enum_info.dtype, 
                                                               enum_info.name, 
                                                               enum_info.enum_dict)
'''
'\n#| exports - Not used in this notebook - NM/01-30-2025.\n \n@patch\ndef copy_enum_type(self:NetCDFEncoder, dtype_name):\n    # if enum type not already created\n    if dtype_name not in self.enum_types:\n        enum_info = self.src.enumtypes[dtype_name]\n        # If a subset of an enum is defined in enums_xtra (typically for the lengthy species_t)\n        if enum_info.name in self.enums_xtra:\n            # add "not applicable"\n            enum_info.enum_dict = self.enums_xtra[enum_info.name]\n            enum_info.enum_dict[\'Not applicable\'] = -1 # TBD\n        self.enum_types[dtype_name] = self.dest.createEnumType(enum_info.dtype, \n                                                               enum_info.name, \n                                                               enum_info.enum_dict)\n'

source

NetCDFEncoder.copy_variable_attributes

 NetCDFEncoder.copy_variable_attributes (var_name, var_src, grp_dest)
Exported source
@patch
def copy_variable_attributes(self:NetCDFEncoder, var_name, var_src, grp_dest):
    "Copy all attributes of the template variable `var_src` onto `var_name` in `grp_dest`."
    dest_var = grp_dest[var_name]
    dest_var.setncatts(var_src.__dict__)

source

NetCDFEncoder.retrieve_all_cols

 NetCDFEncoder.retrieve_all_cols (dtypes={'AREA': {'name': 'area_t',
                                  'fname': 'dbo_area.xlsx', 'key':
                                  'displayName', 'value': 'areaId'},
                                  'BIO_GROUP': {'name': 'bio_group_t',
                                  'fname': 'dbo_biogroup.xlsx', 'key':
                                  'biogroup', 'value': 'biogroup_id'},
                                  'BODY_PART': {'name': 'body_part_t',
                                  'fname': 'dbo_bodypar.xlsx', 'key':
                                  'bodypar', 'value': 'bodypar_id'},
                                  'COUNT_MET': {'name': 'count_met_t',
                                  'fname': 'dbo_counmet.xlsx', 'key':
                                  'counmet', 'value': 'counmet_id'}, 'DL':
                                  {'name': 'dl_t', 'fname':
                                  'dbo_detectlimit.xlsx', 'key':
                                  'name_sanitized', 'value': 'id'},
                                  'FILT': {'name': 'filt_t', 'fname':
                                  'dbo_filtered.xlsx', 'key': 'name',
                                  'value': 'id'}, 'NUCLIDE': {'name':
                                  'nuclide_t', 'fname':
                                  'dbo_nuclide.xlsx', 'key': 'nc_name',
                                  'value': 'nuclide_id'}, 'PREP_MET':
                                  {'name': 'prep_met_t', 'fname':
                                  'dbo_prepmet.xlsx', 'key': 'prepmet',
                                  'value': 'prepmet_id'}, 'SAMP_MET':
                                  {'name': 'samp_met_t', 'fname':
                                  'dbo_sampmet.xlsx', 'key': 'sampmet',
                                  'value': 'sampmet_id'}, 'SED_TYPE':
                                  {'name': 'sed_type_t', 'fname':
                                  'dbo_sedtype.xlsx', 'key': 'sedtype',
                                  'value': 'sedtype_id'}, 'SPECIES':
                                  {'name': 'species_t', 'fname':
                                  'dbo_species_2024_11_19.xlsx', 'key':
                                  'species', 'value': 'species_id'},
                                  'UNIT': {'name': 'unit_t', 'fname':
                                  'dbo_unit.xlsx', 'key':
                                  'unit_sanitized', 'value': 'unit_id'},
                                  'LAB': {'name': 'lab_t', 'fname':
                                  'dbo_lab_cleaned.xlsx', 'key': 'lab',
                                  'value': 'lab_id'}})

Retrieve all unique columns from the dict of dataframes.

Exported source
@patch
def retrieve_all_cols(self:NetCDFEncoder, 
                      dtypes=NC_DTYPES
                      ):
    "Return the distinct columns, across all dataframes of `dfs`, that are keys of `dtypes`."
    found = set()
    for frame in self.dfs.values():
        for col in frame.columns:
            if col in dtypes.keys():
                found.add(col)
    return list(found)

source

NetCDFEncoder.create_enums

 NetCDFEncoder.create_enums ()
Exported source
@patch
def create_enums(self:NetCDFEncoder):
    "Create one enum type in the destination file for each column returned by `retrieve_all_cols`."
    enum_cols = self.retrieve_all_cols()
    lut_enums = Enums(lut_src_dir=lut_path())
    for col in enum_cols:
        type_name = NC_DTYPES[col]['name']
        if self.verbose:
            print(f'Creating enum for {type_name} with values {lut_enums.types[col]}.')
        # Keep the created type so variables can later be declared with it by name.
        self.enum_dtypes[type_name] = self.dest.createEnumType(np.int64, type_name, lut_enums.types[col])

source

NetCDFEncoder.copy_custom_map

 NetCDFEncoder.copy_custom_map (var_name, grp_dest)

Copy custom maps for variables.

Exported source
@patch
def copy_custom_map(self:NetCDFEncoder, var_name, grp_dest):
    """Attach the custom map defined for `var_name`, if any, as a `<var_name>_map` attribute."""
    # Re-key the maps by NetCDF group name (via NC_GROUPS).
    maps_by_group = {NC_GROUPS[grp]: var_maps for grp, var_maps in self.custom_maps.items()}
    # Re-key the maps of this group by NetCDF variable name (via NC_VARS).
    maps_by_var = {NC_VARS[col]: mapping
                   for col, mapping in maps_by_group.get(grp_dest.name, {}).items()}
    if var_name not in maps_by_var:
        return
    # The map is stored in its string form as an attribute of the variable.
    grp_dest[var_name].setncatts({f"{var_name}_map": str(maps_by_var[var_name])})

source

NetCDFEncoder.encode

 NetCDFEncoder.encode ()

Encode MARIS NetCDF based on template and dataframes.

Exported source
@patch
def encode(self:NetCDFEncoder):
    "Write the destination NetCDF file from the template and the dataframes."
    # Both datasets are bound to the instance so the helper methods can reach them;
    # they are closed when the `with` blocks exit.
    with Dataset(self.src_fname, format='NETCDF4') as self.src:
        with Dataset(self.dest_fname, 'w', format='NETCDF4') as self.dest:
            self.copy_global_attributes()
            self.create_enums()
            self.process_groups()
# Encode the sample groups into `dest`, attaching the custom sample-ID map.
encoder = NetCDFEncoder(dfs, dest_fname=dest, global_attrs=attrs,
                        custom_maps=custom_maps, verbose=False)
encoder.encode()
# Test that global attributes are copied
#with Dataset(dest, 'r', format='NETCDF4') as nc:
#        for k, v in {'id': '123', 'title': 'Test title', 'summary': 'Summary test'}.items():
#           fc.test_eq(getattr(nc, k), v)
# Test that dimension is `sample` and unlimited
# with Dataset(dest, 'r', format='NETCDF4') as nc:
#     fc.test_eq('sample' in nc.dimensions, True)
#     fc.test_eq(nc.dimensions['sample'].isunlimited(), True)
# Test that groups are created
# with Dataset(dest, 'r', format='NETCDF4') as nc:
#     fc.test_eq(nc.groups.keys(), ['seawater', 'biota'])
# Test that groups are created
# with Dataset(dest, 'r', format='NETCDF4') as nc:
#     fc.test_eq(nc.groups.keys(), ['seawater', 'biota'])
# Test that correct variables are created in groups
# with Dataset(dest, 'r', format='NETCDF4') as nc:
#     fc.test_eq(nc['biota'].variables.keys(), 
#                ['sample', 'lon', 'lat', 'time', 'species', 'i131', 'i131_dl', 'i131_unit'])
    
#     fc.test_eq(nc['seawater'].variables.keys(), 
#                ['sample', 'lon', 'lat', 'time', 'i131', 'i131_dl', 'i131_unit'])
# Inspect the dimensions defined at the root and in each group
# with Dataset(dest, 'r', format='NETCDF4') as nc:
#     print(nc.dimensions.items())
#     print(nc['biota'].dimensions.items())
#     print(nc['seawater'].dimensions.items())
# Test that custom maps are copied
#with Dataset(dest, 'r', format='NETCDF4') as nc:
#    print(nc['seawater'].variables.items())
#    print(nc['biota'].variables.items())
dict_items([('id', <class 'netCDF4.Variable'>
uint64 id(id)
    long_name: Measurement ID
path = /seawater
unlimited dimensions: id
current shape = (3,)
filling on, default _FillValue of 18446744073709551614 used), ('lon', <class 'netCDF4.Variable'>
float32 lon(id)
    long_name: Measurement longitude
    standard_name: longitude
    units: degrees_east
path = /seawater
unlimited dimensions: id
current shape = (3,)
filling on, default _FillValue of 9.969209968386869e+36 used), ('lat', <class 'netCDF4.Variable'>
float32 lat(id)
    long_name: Measurement latitude
    standard_name: latitude
    units: degrees_north
path = /seawater
unlimited dimensions: id
current shape = (3,)
filling on, default _FillValue of 9.969209968386869e+36 used), ('time', <class 'netCDF4.Variable'>
uint64 time(id)
    long_name: Time of measurement
    standard_name: time
    units: seconds since 1970-01-01 00:00:00.0
    time_origin: 1970-01-01 00:00:00
    time_zone: UTC
    abbreviation: Date/Time
    axis: T
    calendar: gregorian
path = /seawater
unlimited dimensions: id
current shape = (3,)
filling on, default _FillValue of 18446744073709551614 used), ('area', <class 'netCDF4.Variable'>
enum area(id)
    long_name: Marine area/region id
    standard_name: area_id
enum data type: int64
path = /seawater
unlimited dimensions: id
current shape = (3,)), ('smp_id', <class 'netCDF4.Variable'>
uint64 smp_id(id)
    long_name: Data provider sample ID
    standard_name: sample_id
    smp_id_map: {'SMP 1': 1, 'SMP 2': 2, 'SMP 3': 3, 'SMP 4': 4}
path = /seawater
unlimited dimensions: id
current shape = (3,)
filling on, default _FillValue of 18446744073709551614 used), ('nuclide', <class 'netCDF4.Variable'>
enum nuclide(id)
    long_name: Nuclide
    standard_name: nuclide
enum data type: int64
path = /seawater
unlimited dimensions: id
current shape = (3,)), ('value', <class 'netCDF4.Variable'>
float32 value(id)
    long_name: Activity
    standard_name: activity
path = /seawater
unlimited dimensions: id
current shape = (3,)
filling on, default _FillValue of 9.969209968386869e+36 used)])
dict_items([('id', <class 'netCDF4.Variable'>
uint64 id(id)
    long_name: Measurement ID
path = /biota
unlimited dimensions: id
current shape = (4,)
filling on, default _FillValue of 18446744073709551614 used), ('lon', <class 'netCDF4.Variable'>
float32 lon(id)
    long_name: Measurement longitude
    standard_name: longitude
    units: degrees_east
path = /biota
unlimited dimensions: id
current shape = (4,)
filling on, default _FillValue of 9.969209968386869e+36 used), ('lat', <class 'netCDF4.Variable'>
float32 lat(id)
    long_name: Measurement latitude
    standard_name: latitude
    units: degrees_north
path = /biota
unlimited dimensions: id
current shape = (4,)
filling on, default _FillValue of 9.969209968386869e+36 used), ('time', <class 'netCDF4.Variable'>
uint64 time(id)
    long_name: Time of measurement
    standard_name: time
    units: seconds since 1970-01-01 00:00:00.0
    time_origin: 1970-01-01 00:00:00
    time_zone: UTC
    abbreviation: Date/Time
    axis: T
    calendar: gregorian
path = /biota
unlimited dimensions: id
current shape = (4,)
filling on, default _FillValue of 18446744073709551614 used), ('smp_id', <class 'netCDF4.Variable'>
uint64 smp_id(id)
    long_name: Data provider sample ID
    standard_name: sample_id
path = /biota
unlimited dimensions: id
current shape = (4,)
filling on, default _FillValue of 18446744073709551614 used), ('nuclide', <class 'netCDF4.Variable'>
enum nuclide(id)
    long_name: Nuclide
    standard_name: nuclide
enum data type: int64
path = /biota
unlimited dimensions: id
current shape = (4,)), ('value', <class 'netCDF4.Variable'>
float32 value(id)
    long_name: Activity
    standard_name: activity
path = /biota
unlimited dimensions: id
current shape = (4,)
filling on, default _FillValue of 9.969209968386869e+36 used), ('species', <class 'netCDF4.Variable'>
enum species(id)
    long_name: Species
    standard_name: species
enum data type: int64
path = /biota
unlimited dimensions: id
current shape = (4,))])