Each step is a single-responsibility function that transforms a dict of DataFrames keyed by sample group. They are composed in to_csv() below.
Read NetCDF groups
Open a standardised MARIS NetCDF file, extract each group’s variable arrays, and return them as {group_name: DataFrame}.
source
read_nc_grps
def read_nc_grps(
fname, # Path to netcdf file
):
Read a MARIS NetCDF file and return {group: DataFrame} dict.
fname_in = Path('files/nc/100_HELCOM_MORS_2024.nc' )
dfs = read_nc_grps(fname_in)
print ('groups: ' , list (dfs.keys()), ' \n ' )
print ('A single row: ' , dfs['BIOTA' ].sample(1 ).T)
groups: ['BIOTA', 'SEAWATER', 'SEDIMENT']
A single row: 6
SMP_ID_PROVIDER BCLOR1995035
LON 18.1667
LAT 54.916698
SMP_DEPTH 0.0
TIME 810864000
STATION PL2
NUCLIDE 4
VALUE 104.0
UNIT 5
UNC 7.28
DL 1
BIO_GROUP 4
SPECIES 191
BODY_PART 52
DRYWT 60.900002
WETWT 210.0
PERCENTWT 0.29
Filter columns
Keep only columns in the CSV schema. The NetCDF has internal columns (like SMP_ID) not part of the CSV output.
source
keep_csv_cols
def keep_csv_cols(
dfs:dict , # dict of DataFrames keyed by sample group
cols:list = {'AREA' : 'area' , 'BODY_PART' : 'bodypar_id' , 'BOTTOM' : 'slicedown' , 'COUNT_MET' : 'counmet_id' , 'DL' : 'detection' , 'DLV' : 'detection_lim' , 'DRYWT' : 'drywt' , 'FILT' : 'filtered' , 'LAB' : 'lab_id' , 'LAT' : 'latitude' , 'LON' : 'longitude' , 'NUCLIDE' : 'nuclide_id' , 'PERCENTWT' : 'percentwt' , 'PREP_MET' : 'prepmet_id' , 'PROFILE_ID' : 'profile_id' , 'REF_ID' : 'ref_id' , 'SAL' : 'salinity' , 'SAMP_MET' : 'sampmet_id' , 'SAMPLE_TYPE' : 'samptype_id' , 'SED_TYPE' : 'sedtype_id' , 'SMP_DEPTH' : 'sampdepth' , 'SMP_ID_PROVIDER' : 'samplabcode' , 'SPECIES' : 'species_id' , 'STATION' : 'station' , 'TAXONDB' : 'taxondb' , 'TAXONDBID' : 'taxondb_id' , 'TAXONDBURL' : 'taxondb_url' , 'TAXONNAME' : 'taxonname' , 'TAXONRANK' : 'taxonrank' , 'TAXONREPNAME' : 'taxonrepname' , 'TEMP' : 'temperatur' , 'TIME' : 'begperiod' , 'TOP' : 'sliceup' , 'TOT_DEPTH' : 'totdepth' , 'UNC' : 'uncertaint' , 'UNIT' : 'unit_id' , 'VALUE' : 'activity' , 'VOL' : 'volume' , 'WETWT' : 'wetwt' }, # columns to keep (defaults to CSV_VARS)
):
Keep only columns listed in cols.
test = {'SEAWATER' : pd.DataFrame({'SMP_ID' : [1 ], 'TIME' : [1 ], 'VALUE' : [1 ], 'EXTRA' : [1 ]})}
res = keep_csv_cols(test)
test_eq(list (res['SEAWATER' ].columns), ['TIME' , 'VALUE' ])
Decode time
NetCDF stores time as seconds since epoch. Convert to datetime.
source
decode_time
def decode_time(
dfs:dict , # dict of DataFrames keyed by sample group
):
Decode TIME from epoch seconds to datetime.
test = {'SEAWATER' : pd.DataFrame({'TIME' : [1672531200 ]})}
decode_time(test)
test_eq(test['SEAWATER' ]['TIME' ].iloc[0 ], pd.Timestamp('2023-01-01' ))
Add sample type
SQL import expects a sample type column. Each group has a fixed identifier.
source
add_sample_type
def add_sample_type(
dfs:dict , # dict of DataFrames keyed by sample group
):
Add SAMPLE_TYPE column using group ID mapping.
test = {'SEAWATER' : pd.DataFrame({'VALUE' : [1 ]}), 'BIOTA' : pd.DataFrame({'VALUE' : [2 ]})}
add_sample_type(test)
test_eq(test['SEAWATER' ]['SAMPLE_TYPE' ].iloc[0 ], 1 )
test_eq(test['BIOTA' ]['SAMPLE_TYPE' ].iloc[0 ], 2 )
Add reference ID
Optional column from Zotero / INIS archive location. Omitted if not provided.
source
add_ref_id
def add_ref_id(
dfs:dict , # dict of DataFrames keyed by sample group
ref_id:int = None , # Reference ID to add as REF_ID column
):
Add REF_ID column if ref_id is provided.
test = {'SEAWATER' : pd.DataFrame({'VALUE' : [1 ]})}
add_ref_id(test)
test_eq('REF_ID' in test['SEAWATER' ].columns, False )
test = {'SEAWATER' : pd.DataFrame({'VALUE' : [1 ]})}
add_ref_id(test, ref_id= 42 )
test_eq(test['SEAWATER' ]['REF_ID' ].iloc[0 ], 42 )
Map lookup-table columns
Convert integer enum IDs to display names. DL and FILT use the Excel name column rather than the sanitised version.
source
map_lut
def map_lut(
dfs:dict , # dict of DataFrames keyed by sample group
cols:list , # Column names to map
key:str = 'name' , # LUT key column
value:str = 'id' , # LUT value column
reverse:bool = True , # Reverse the mapping direction
):
Map columns using get_lut.
test = {'SEAWATER' : pd.DataFrame({'DL' : [1 , 2 , 3 ]})}
map_lut(test, ['DL' ])
test_eq(test['SEAWATER' ]['DL' ].tolist(), ['=' , '<' , 'ND' ])
test = {'SEAWATER' : pd.DataFrame({'FILT' : [0 , 1 , 2 ]})}
map_lut(test, ['FILT' ])
test_eq(test['SEAWATER' ]['FILT' ].tolist(), ['Not available' , 'Yes' , 'No' ])
test = {'SEAWATER' : pd.DataFrame({'VALUE' : [1 ]})}
map_lut(test, ['DL' ])
test_eq(list (test['SEAWATER' ].columns), ['VALUE' ])
Decode remaining enumerated columns
CSV_DTYPES marks decoded vs encoded columns. DL and FILT are excluded since they use a different LUT column.
source
decode_csv_vars
def decode_csv_vars(
dfs:dict , # dict of DataFrames keyed by sample group
):
Decode enumerated columns marked as state='decoded' in CSV_DTYPES.
It would be cleaner/more consistent to expect all MARIS LUT values in their encoded (integer) form during SQL import, rather than receiving a mix of decoded display names and encoded IDs.
Write CSV files
Rename columns via CSV_VARS, then write one file per group.
source
to_csv_files
def to_csv_files(
dfs:dict , # dict of DataFrames keyed by sample group
fname_in:str , # Input NetCDF file path
dest_out:str = None , # Destination path stem; defaults to fname_in stem
):
Rename columns and write one CSV per group.
test = {'SEAWATER' : pd.DataFrame({'TIME' : [1 ], 'VALUE' : [2 ]})}
fname = Path('/tmp/test_nc2csv' )
to_csv_files(test, fname)
res = pd.read_csv(f' { fname} _SEAWATER.csv' )
test_eq(list (res.columns), ['begperiod' , 'activity' ])
Path(f' { fname} _SEAWATER.csv' ).unlink()
Entry point
to_csv composes the helpers in order.
source
to_csv
def to_csv(
fname_in:str , # MARIS standard NetCDF file path
dest_out:str = None , # Destination path stem; defaults to parent of fname_in
ref_id:int = None , # Reference ID to add as REF_ID column
):
Convert MARIS standard NetCDF file to import-ready CSV files.
Usage example
Convert a MARIS standard NetCDF file to import-ready CSV files using to_csv .
fname_in = Path('files/nc/100_HELCOM_MORS_2024.nc' )
to_csv(fname_in, ref_id= 100 )
[Path('files/nc/100_HELCOM_MORS_2024_BIOTA.csv'),
Path('files/nc/100_HELCOM_MORS_2024_SEAWATER.csv'),
Path('files/nc/100_HELCOM_MORS_2024_SEDIMENT.csv')]
to_csv produces one CSV per sample type group found in the NetCDF file.
for f in sorted (Path('files/nc/' ).glob('100_HELCOM_MORS_2024_*.csv' )): print (f.name)
100_HELCOM_MORS_2024_BIOTA.csv
100_HELCOM_MORS_2024_SEAWATER.csv
100_HELCOM_MORS_2024_SEDIMENT.csv
The output CSV includes 23 columns: sample metadata, activity, detection limit info, taxon details for biota, and reference ID. Below the first row:
df = pd.read_csv('files/nc/100_HELCOM_MORS_2024_BIOTA.csv' )
print (df.head(1 ).T)
0
samplabcode BBFFG1999001
longitude 13.72
latitude 54.22
sampdepth 0.0
begperiod 929404800
station BGBODD
nuclide_id 4
activity 841.0
unit_id 4
uncertaint 58.87
detection =
BIO_GROUP 11
species_id 96
bodypar_id 54
drywt NaN
wetwt NaN
percentwt 0.1692
samptype_id 2
ref_id 100
taxonname Fucus vesiculosus
taxonrank species
taxondb Wikidata
taxondb_id Q754755
taxondb_url https://www.wikidata.org/wiki/Q754755