class OSSLLoader(DataLoader):
    """Load OSSL data and filter it by spectra type and analytes of interest.

    The loader reads a CSV file into `self.df`, keeps only the rows for which
    the requested analytes and the reference spectral column are present, and
    extracts the spectra whose positions fall inside the configured range.
    """

    # Columns read with an explicit `object` dtype; passed as `dtype=` to
    # `pd.read_csv` in `load_data`.
    # NOTE(review): presumably these are free-text columns whose dtype cannot
    # be inferred reliably -- confirm.
    DTYPE_DICT = {
        'id.layer_local_c': 'object',
        'id.location_olc_txt': 'object',
        'id.dataset.site_ascii_txt': 'object',
        'id.scan_local_c': 'object',
        'layer.texture_usda_txt': 'object',
        'pedon.taxa_usda_txt': 'object',
        'horizon.designation_usda_txt': 'object',
        'location.country_iso.3166_txt': 'object',
        'surveyor.address_utf8_txt': 'object',
        'efferv_usda.a479_class': 'object',
        'scan.mir.date.begin_iso.8601_yyyy.mm.dd': 'object',
        'scan.mir.date.end_iso.8601_yyyy.mm.dd': 'object',
        'scan.mir.model.name_utf8_txt': 'object',
        'scan.mir.model.code_any_txt': 'object',
        'scan.mir.method.optics_any_txt': 'object',
        'scan.mir.method.preparation_any_txt': 'object',
        'scan.mir.license.title_ascii_txt': 'object',
        'scan.mir.license.address_idn_url': 'object',
        'scan.mir.doi_idf_url': 'object',
        'scan.mir.contact.name_utf8_txt': 'object',
        'scan.mir.contact.email_ietf_txt': 'object',
        'scan.visnir.date.begin_iso.8601_yyyy.mm.dd': 'object',
        'scan.visnir.date.end_iso.8601_yyyy.mm.dd': 'object',
        'scan.visnir.model.name_utf8_txt': 'object',
        'scan.visnir.model.code_any_txt': 'object',
        'scan.visnir.method.optics_any_txt': 'object',
        'scan.visnir.method.preparation_any_txt': 'object',
        'scan.visnir.license.title_ascii_txt': 'object',
        'scan.visnir.license.address_idn_url': 'object',
        'scan.visnir.doi_idf_url': 'object',
        'scan.visnir.contact.name_utf8_txt': 'object',
        'scan.visnir.contact.email_ietf_txt': 'object',
    }

    def __init__(self,
                 src: Path = Path.home() / '.lssm/data/ossl/ossl_all_L0_v1.2.csv.gz',  # Data source file name
                 spectra_type: str = 'visnir',  # Spectra type
                 cfgs: dict | None = None):  # Spectra type configuration
        """Store the loader settings; no data is read until `load_data` is called.

        `cfgs` maps a spectra type to a dict with two keys:
        `'ref_col'`, a column that must be non-null for a row to be kept, and
        `'range'`, the inclusive `[lower, upper]` bounds of the spectral
        positions to retain. A falsy `cfgs` selects the built-in defaults.
        """
        self.src = src
        self.spectra_type = spectra_type
        self.df = None  # Populated by `load_data`
        self.ds_name_encoder = LabelEncoder()
        self.cfgs = cfgs or {
            'visnir': {'ref_col': 'scan_visnir.1500_ref', 'range': [400, 2500]},
            'mir': {'ref_col': 'scan_mir.1500_abs', 'range': [650, 4000]},
        }

    def _spectral_columns(self,
                          spectra_type: str  # Spectra type
                          ) -> list:  # List of `(column name, position)` pairs, in column order
        "Return the `scan_<spectra_type>.<digits>_` columns of `self.df` with their parsed positions."
        # `re.escape` keeps the spectra type literal, matching column names
        # such as `scan_visnir.1500_ref`.
        pattern = re.compile(rf'scan_{re.escape(spectra_type)}\.(\d+)_')
        found = ((name, pattern.search(name)) for name in self.df.columns)
        return [(name, int(match.group(1))) for name, match in found if match]

    def _get_spectra(self,
                     spectra_type: str  # Spectra type
                     ):
        """Return `(X, X_names)` restricted to the configured spectral range.

        `X` holds the values of the selected spectral columns and `X_names`
        the positions parsed from their names. Both are derived from the same
        column list, so column `i` of `X` always corresponds to `X_names[i]`.
        """
        columns = self._spectral_columns(spectra_type)
        col_names = [name for name, _ in columns]
        X_names = np.array([position for _, position in columns])
        X = self.df[col_names].values
        lower_limit, upper_limit = self.cfgs[spectra_type]['range']
        # Both bounds are inclusive.
        in_range = (X_names >= lower_limit) & (X_names <= upper_limit)
        return X[:, in_range], X_names[in_range]

    def _encode_dataset_names(self):
        "Fit `ds_name_encoder` on the `dataset.code_ascii_txt` column and return the encoded labels."
        return self.ds_name_encoder.fit_transform(self.df['dataset.code_ascii_txt'])

    def _get_wavelengths(self,
                         spectra_type: str  # Spectra type
                         ):
        "Return the integer positions parsed from the spectral column names, in column order."
        return np.array([position for _, position in self._spectral_columns(spectra_type)])

    def load_data(self,
                  analytes: str | list,  # Analyte(s) of interest
                  ) -> 'SpectralData':  # Spectra, targets, their names, sample ids and dataset labels
        """Load OSSL data and filter it by spectra type and analytes of interest.

        Reads `self.src` into `self.df`, drops every row with a missing value
        in any requested analyte or in the reference column of
        `self.spectra_type`, then assembles the result. As a side effect,
        `self.df` is replaced and `self.ds_name_encoder` is refitted.
        """
        print(f'Loading data from {self.src} ...')
        self.df = pd.read_csv(self.src, dtype=self.DTYPE_DICT,
                              compression='infer', low_memory=True)

        # Accept a single name or any iterable of names (list, tuple, array).
        analytes = [analytes] if isinstance(analytes, str) else list(analytes)
        y_names = np.array(analytes)

        required = analytes + [self.cfgs[self.spectra_type]['ref_col']]
        self.df = self.df.dropna(subset=required, how='any')

        X, X_names = self._get_spectra(self.spectra_type)
        y = self.df[analytes].values
        smp_indices = self.df['id.layer_uuid_txt'].values
        ds_name = self._encode_dataset_names()

        return SpectralData(
            X=X,
            X_names=X_names,
            y=y,
            y_names=y_names,
            sample_indices=smp_indices,
            dataset_names=ds_name,
            dataset_labels=self.ds_name_encoder.classes_,
        )