Various utilities to encode MARIS dataset as NetCDF, csv, … formats.

source

NetCDFEncoder

 NetCDFEncoder (dfs:dict[pandas.core.frame.DataFrame], src_fname:str,
                dest_fname:str, global_attrs:Dict, enums_xtra:Dict={},
                verbose:bool=False)

MARIS NetCDF encoder.

Type Default Details
dfs dict dict of Dataframes to encode with group name as key {‘sediment’: df_sed, …}
src_fname str File name and path to the MARIS CDL template
dest_fname str Name of output file to produce
global_attrs Dict Global attributes
enums_xtra Dict {} Enumeration types to overwrite
verbose bool False Print currently written NetCDF group and variable names
Exported source
class NetCDFEncoder:
    "MARIS NetCDF encoder."
    def __init__(self, 
                 dfs:dict[pd.DataFrame], # dict of Dataframes to encode with group name as key {'sediment': df_sed, ...}
                 src_fname:str, # File name and path to the MARIS CDL template
                 dest_fname:str, # Name of output file to produce
                 global_attrs:Dict, # Global attributes
                 enums_xtra:Dict={}, # Enumeration types to overwrite
                 verbose:bool=False, # Print currently written NetCDF group and variable names
                 ):
        store_attr()
        self.enum_types = {}
df_seawater = pd.DataFrame({
    'sample': [0, 1, 5], 
    'lon': [141, 142, 143], 
    'lat': [37.3, 38.3, 39.3], 
    'time': [1234, 1235, 1236], 
    'i131': [1, 1.5, 2],
    'i131_dl': [0, 1, 2], 
    'i131_unit': [1, 1, 2],
    'species': [134, 136, 137]
    })

df_biota = pd.DataFrame({
    'sample': [0, 1], 
    'lon': [141, 142], 
    'lat': [37.3, 38.3], 
    'time': [1234, 1235], 
    'i131': [1, 1.5],
    'i131_dl': [0, 1], 
    'i131_unit': [1, 1],
    'species': [134, 136]
    })

dfs = {'seawater': df_seawater, 'biota': df_biota}
attrs = {'id': '123', 'title': 'Test title', 'summary': 'Summary test'}
src = './files/nc/template-test.nc'
dest = './files/nc/encoding-test.nc'
enums_xtra = {
    'species_t': {'Aristeus antennatus': 134, 'Apostichopus': 136}
    }

source

NetCDFEncoder.copy_global_attributes

 NetCDFEncoder.copy_global_attributes ()

Update NetCDF template global attributes as specified by global_attrs argument.

Exported source
@patch 
def copy_global_attributes(self:NetCDFEncoder):
    "Update NetCDF template global attributes as specified by `global_attrs` argument."
    self.dest.setncatts(self.src.__dict__)
    for k, v in self.global_attrs.items(): self.dest.setncattr(k, v)

source

NetCDFEncoder.copy_dimensions

 NetCDFEncoder.copy_dimensions ()
Exported source
@patch
def copy_dimensions(self:NetCDFEncoder):
    for name, dimension in self.src.dimensions.items():
        self.dest.createDimension(name, (len(dimension) if not dimension.isunlimited() else None))

source

NetCDFEncoder.process_groups

 NetCDFEncoder.process_groups ()
Exported source
@patch
def process_groups(self:NetCDFEncoder):
    for grp_name, df in self.dfs.items():
        self.process_group(grp_name, df)

source

NetCDFEncoder.process_group

 NetCDFEncoder.process_group (group_name, df)
Exported source
@patch
def process_group(self:NetCDFEncoder, group_name, df):
    group_dest = self.dest.createGroup(group_name)
    # Set the dimensions for each group
    group_dest.createDimension(group_name, len(df.index))    
    self.copy_variables(group_name, df, group_dest)

source

NetCDFEncoder.copy_variables

 NetCDFEncoder.copy_variables (group_name, df, group_dest)
Exported source
@patch
def copy_variables(self:NetCDFEncoder, group_name, df, group_dest):
    for var_name, var_src in self.src.groups[group_name].variables.items():
        if var_name in df.reset_index().columns: 
            self.copy_variable(var_name, var_src, df, group_dest)

source

NetCDFEncoder.copy_variable

 NetCDFEncoder.copy_variable (var_name, var_src, df, group_dest)
Exported source
@patch
def copy_variable(self:NetCDFEncoder, var_name, var_src, df, group_dest):
    dtype_name = var_src.datatype.name
    enums_src = self.src.enumtypes
    if self.verbose: 
        print(80*'-')
        print(f'Group: {group_dest.name}, Variable: {var_name}')
    # If the type of the var is an enum (meaning present in the template src) then create it
    if dtype_name in enums_src: self.copy_enum_type(dtype_name) 
    self._create_and_copy_variable(var_name, var_src, df, group_dest, dtype_name)
    self.copy_variable_attributes(var_name, var_src, group_dest)

source

NetCDFEncoder.sanitize_if_enum_and_nan

 NetCDFEncoder.sanitize_if_enum_and_nan (values, fill_value=-1)
Exported source
@patch
def _create_and_copy_variable(self:NetCDFEncoder, var_name, var_src, df, group_dest, dtype_name):
    variable_type = self.enum_types.get(dtype_name, var_src.datatype)
    # Use the group_dest dimensions
    group_dest.createVariable(var_name, variable_type, group_dest.dimensions, compression='zlib', complevel=9)            
    isNotEnum = type(variable_type) != netCDF4._netCDF4.EnumType
    values = df[var_name].values
    group_dest[var_name][:] = values if isNotEnum else self.sanitize_if_enum_and_nan(values)
Exported source
@patch
def sanitize_if_enum_and_nan(self:NetCDFEncoder, values, fill_value=-1):
    values[np.isnan(values)] = int(fill_value)
    values = values.astype(int)
    return values

source

NetCDFEncoder.copy_enum_type

 NetCDFEncoder.copy_enum_type (dtype_name)
Exported source
@patch
def copy_enum_type(self:NetCDFEncoder, dtype_name):
    # if enum type not already created
    if dtype_name not in self.enum_types:
        enum_info = self.src.enumtypes[dtype_name]
        # If a subset of an enum is defined in enums_xtra (typically for the lengthy species_t)
        if enum_info.name in self.enums_xtra:
            # add "not applicable"
            enum_info.enum_dict = self.enums_xtra[enum_info.name]
            enum_info.enum_dict['Not applicable'] = -1 # TBD
        self.enum_types[dtype_name] = self.dest.createEnumType(enum_info.dtype, 
                                                               enum_info.name, 
                                                               enum_info.enum_dict)

source

NetCDFEncoder.copy_variable_attributes

 NetCDFEncoder.copy_variable_attributes (var_name, var_src, group_dest)
Exported source
@patch
def copy_variable_attributes(self:NetCDFEncoder, var_name, var_src, group_dest):
    group_dest[var_name].setncatts(var_src.__dict__)
# DEPRECATED
@patch
def cast_verbose_rf(self:NetCDFEncoder, 
                    df, 
                    col):
    """
    Try to cast df column to numeric type:
        - Silently coerce to nan if not possible
        - But log when it failed
    """
    n_before = sum(df.reset_index()[col].notna())
    df_after = pd.to_numeric(df.reset_index()[col], errors='coerce', downcast=None)
    n_after = sum(df_after.notna())
    if n_before != n_after: print(f'Failed to convert type of {col} in {n_before - n_after} occurences')
    return df_after

source

NetCDFEncoder.encode

 NetCDFEncoder.encode ()

Encode MARIS NetCDF based on template and dataframes.

Exported source
@patch
def encode(self:NetCDFEncoder):
    "Encode MARIS NetCDF based on template and dataframes."
    with Dataset(self.src_fname, format='NETCDF4') as self.src, Dataset(self.dest_fname, 'w', format='NETCDF4') as self.dest:
        self.copy_global_attributes()
        self.copy_dimensions()
        self.process_groups()
encoder = NetCDFEncoder(dfs, src_fname=src, dest_fname=dest, 
                        global_attrs=attrs, enums_xtra=enums_xtra, verbose=False)
encoder.encode()
# Test that global attributes are copied
with Dataset(dest, 'r', format='NETCDF4') as nc:
    for k, v in {'id': '123', 'title': 'Test title', 'summary': 'Summary test'}.items():
        fc.test_eq(getattr(nc, k), v)
# Test that dimension is `sample` and unlimited
with Dataset(dest, 'r', format='NETCDF4') as nc:
    fc.test_eq('sample' in nc.dimensions, True)
    fc.test_eq(nc.dimensions['sample'].isunlimited(), True)
# Test that groups are created
with Dataset(dest, 'r', format='NETCDF4') as nc:
    fc.test_eq(nc.groups.keys(), ['seawater', 'biota'])
# Test that groups are created
with Dataset(dest, 'r', format='NETCDF4') as nc:
    fc.test_eq(nc.groups.keys(), ['seawater', 'biota'])
# Test that correct variables are created in groups
with Dataset(dest, 'r', format='NETCDF4') as nc:
    fc.test_eq(nc['biota'].variables.keys(), 
               ['sample', 'lon', 'lat', 'time', 'species', 'i131', 'i131_dl', 'i131_unit'])
    
    fc.test_eq(nc['seawater'].variables.keys(), 
               ['sample', 'lon', 'lat', 'time', 'i131', 'i131_dl', 'i131_unit'])
# Test that correct variables are created in groups
with Dataset(dest, 'r', format='NETCDF4') as nc:
    print(nc.dimensions.items())
    print(nc['biota'].dimensions.items())
    print(nc['seawater'].dimensions.items())
dict_items([('sample', <class 'netCDF4._netCDF4.Dimension'> (unlimited): name = 'sample', size = 0)])
dict_items([('biota', <class 'netCDF4._netCDF4.Dimension'>: name = 'biota', size = 2)])
dict_items([('seawater', <class 'netCDF4._netCDF4.Dimension'>: name = 'seawater', size = 3)])

OpenRefine CSV encoder


source

OpenRefineCsvEncoder

 OpenRefineCsvEncoder (dfs:dict[pandas.core.frame.DataFrame],
                       dest_fname:str, ref_id=-1, verbose:bool=False)

OpenRefine CSV from NetCDF.

Type Default Details
dfs dict dict of Dataframes to encode with group name as key {‘sediment’: df_sed, …}
dest_fname str Name of output file to produce
ref_id int -1 ref_id to include
verbose bool False Print
Exported source
class OpenRefineCsvEncoder:
    "OpenRefine CSV from NetCDF."
    def __init__(self, 
                 dfs:dict[pd.DataFrame], # dict of Dataframes to encode with group name as key {'sediment': df_sed, ...}
                 dest_fname:str, # Name of output file to produce
                 ref_id = -1, # ref_id to include 
                 verbose:bool=False, # Print 
                 ):
        store_attr()

source

OpenRefineCsvEncoder.process_groups_to_csv

 OpenRefineCsvEncoder.process_groups_to_csv ()
Exported source
@patch
def process_groups_to_csv(self:OpenRefineCsvEncoder):
    for grp_name, df in self.dfs.items():
        # include ref_id
        if self.ref_id != -1:
            df['ref_id'] = self.ref_id
        self.process_group_to_csv(grp_name, df)

source

OpenRefineCsvEncoder.process_group_to_csv

 OpenRefineCsvEncoder.process_group_to_csv (group_name, df)
Exported source
@patch
def process_group_to_csv(self:OpenRefineCsvEncoder, group_name, df):
    filename, file_extension=os.path.splitext(self.dest_fname)
    path = filename + '_' + group_name + file_extension
    df.to_csv( path_or_buf= path, sep=',', index=False)

source

OpenRefineCsvEncoder.encode

 OpenRefineCsvEncoder.encode ()

Encode OpenRefine CSV based on dataframes from NetCDF.

Exported source
@patch
def encode(self:OpenRefineCsvEncoder):
    "Encode OpenRefine CSV based on dataframes from NetCDF."
    # Include ref_id
    
    # Process to csv
    self.process_groups_to_csv()
dest = '../files/csv/encoding-test.csv'

encoder = OpenRefineCsvEncoder(dfs,  dest_fname=dest)
encoder.encode()