from soilspecdata.datasets.ossl import get_ossl
from sklearn.pipeline import Pipeline
from soilspectfm.visualization import plot_spectra
from matplotlib import pyplot as plt
from soilspectfm.visualization import plot_spectra_comparison
from soilspectfm.utils import load_toy_mir, load_toy_noisy_mir
Core API
= load_toy_mir() X, ws
Baseline corrections
SNV
SNV (center_func:Callable=<function mean at 0x7f2b71133130>, scale_func:Callable=<function std at 0x7f2b71133330>, eps:float=1e-10)
*Standard Normal Variate transformation with flexible centering and scaling.
Common centering functions:
- np.mean: Standard choice, sensitive to outliers
- np.median: Robust to outliers, slower computation
- np.min: Ensures positive values, sensitive to noise
- lambda x, **kw: 0: No centering, preserves absolute values
Common scaling functions:
- np.std: Standard choice, assumes normal distribution
- lambda x, **kw: np.sqrt(np.mean(x**2, **kw)): RMS, good for baseline variations
- scipy.stats.iqr: Robust to outliers, ignores extreme peaks
- lambda x, **kw: np.max(x, **kw) - np.min(x, **kw): Preserves relative peaks
- lambda x, **kw: np.median(np.abs(x - np.median(x, **kw)), **kw): Most robust, slower*
Type | Default | Details | |
---|---|---|---|
center_func | Callable | mean | Function to center the data |
scale_func | Callable | std | Function to scale the data |
eps | float | 1e-10 | Small value to avoid division by zero |
Exported source
class SNV(BaseEstimator, TransformerMixin):
"""
Standard Normal Variate transformation with flexible centering and scaling.
Common centering functions:
- np.mean: Standard choice, sensitive to outliers
- np.median: Robust to outliers, slower computation
- np.min: Ensures positive values, sensitive to noise
- lambda x, **kw: 0: No centering, preserves absolute values
Common scaling functions:
- np.std: Standard choice, assumes normal distribution
- lambda x, **kw: np.sqrt(np.mean(x**2, **kw)): RMS, good for baseline variations
- scipy.stats.iqr: Robust to outliers, ignores extreme peaks
- lambda x, **kw: np.max(x, **kw) - np.min(x, **kw): Preserves relative peaks
- lambda x, **kw: np.median(np.abs(x - np.median(x, **kw)), **kw): Most robust, slower
"""
def __init__(self,
=np.mean, # Function to center the data
center_func: Callable=np.std, # Function to scale the data
scale_func: Callablefloat=1e-10 # Small value to avoid division by zero
eps:
):
store_attr()def fit(self, X, y=None): return self
def transform(self,
# Spectral data to be transformed
X: np.ndarray -> np.ndarray: # Transformed spectra
) = self.center_func(X, axis=1, keepdims=True)
center = self.scale_func(X - center, axis=1, keepdims=True) + self.eps
scale return (X - center) / scale
= SNV().fit_transform(X) X_tfm
plot_spectra_comparison(
X,
SNV().fit_transform(X),
ws,='Raw Spectra',
raw_title='Transformed Spectra'
transformed_title; )
MSC
MSC (reference_method:Union[str,numpy.ndarray]='mean', n_jobs:Optional[int]=None)
Multiplicative Scatter Correction with fastai-style implementation
Type | Default | Details | |
---|---|---|---|
reference_method | Union | mean | Method to compute reference spectrum (‘mean’/‘median’) or custom reference spectrum |
n_jobs | Optional | None | Number of parallel jobs to run. None means using all processors |
Exported source
class MSC(BaseEstimator, TransformerMixin):
"Multiplicative Scatter Correction with fastai-style implementation"
def __init__(self,
str, np.ndarray] = 'mean', # Method to compute reference spectrum ('mean'/'median') or custom reference spectrum
reference_method: Union[int] = None # Number of parallel jobs to run. None means using all processors
n_jobs: Optional[
):
store_attr()self.reference_ = None
def _compute_reference(self, x: np.ndarray):
"Compute reference spectrum from array using specified method"
if isinstance(self.reference_method, str):
assert self.reference_method in ['mean', 'median'], "reference_method must be 'mean' or 'median'"
return np.mean(x, axis=0) if self.reference_method == 'mean' else np.median(x, axis=0)
return np.array(self.reference_method)
def fit(self, X: np.ndarray, y=None):
"Compute the reference spectrum"
self.reference_ = self._compute_reference(X)
return self
def _transform_single(self,
# Spectral data to be transformed
x: np.ndarray -> np.ndarray: # Transformed spectra
) "Transform a single spectrum"
= np.polyfit(self.reference_, x, deg=1)
coef return (x - coef[1]) / coef[0]
def transform(self,
# Spectral data to be transformed
X: np.ndarray -> np.ndarray: # Transformed spectra
) "Apply MSC to the spectra"
if self.reference_ is None: raise ValueError("MSC not fitted. Call 'fit' first.")
return np.array(parallel(self._transform_single, X, n_workers=self.n_jobs))
= MSC(reference_method='median').fit_transform(X) X_tfm
plot_spectra_comparison(
X,
MSC().fit_transform(X),
ws,='Raw Spectra',
raw_title='MSC Transformed Spectra'
transformed_title; )
References: - https://eigenvector.com/wp-content/uploads/2020/01/RobustFittingtoBasisFunctionsIII.pdf - https://nirpyresearch.com/two-methods-baseline-correction-spectral-data/ - https://diposit.ub.edu/dspace/bitstream/2445/188026/1/2014_IEEE_Adaptive_MarcoS_postprint.pdf
class ALS(BaseEstimator, TransformerMixin):
"Asymmetric least squares detrending"
pass
Derivatives
TakeDerivative
TakeDerivative (window_length=11, polyorder=1, deriv=1)
Creates scikit-learn derivation + savitsky-golay smoothing custom transformer
Type | Default | Details | |
---|---|---|---|
window_length | int | 11 | Window length for the savgol filter |
polyorder | int | 1 | Polynomial order for the savgol filter |
deriv | int | 1 | Derivation degree |
Exported source
class TakeDerivative(BaseEstimator, TransformerMixin):
"Creates scikit-learn derivation + savitsky-golay smoothing custom transformer"
def __init__(self,
=11, # Window length for the savgol filter
window_length=1, # Polynomial order for the savgol filter
polyorder=1 # Derivation degree
deriv
):self.window_length = window_length
self.polyorder = polyorder
self.deriv = deriv
def fit(self, X, y=None):
return self
def transform(self, X, y=None):
return savgol_filter(X, self.window_length, self.polyorder, self.deriv)
How to use the TakeDerivative transformer in combination with the SNV transformer?
= Pipeline([('snv', SNV()),
pipe 'deriv', TakeDerivative(deriv=1))
(
])
= pipe.fit_transform(X) X_tfm
plot_spectra_comparison(
X,
MSC().fit_transform(X),
ws,='Raw Spectra',
raw_title='SNV + Derivative (1st order) Transformed Spectra'
transformed_title; )
Smoothing
WaveletDenoise
WaveletDenoise (wavelet:str='db6', level:Optional[int]=None, threshold_mode:str='soft')
Wavelet denoising transformer compatible with scikit-learn.
Type | Default | Details | |
---|---|---|---|
wavelet | str | db6 | Wavelet to use for decomposition |
level | Optional | None | Decomposition level. If None, maximum level is used |
threshold_mode | str | soft | Thresholding mode (‘soft’/‘hard’) |
Exported source
class WaveletDenoise(BaseEstimator, TransformerMixin):
"Wavelet denoising transformer compatible with scikit-learn."
def __init__(self,
str='db6', # Wavelet to use for decomposition
wavelet:int]=None, # Decomposition level. If None, maximum level is used
level:Optional[str='soft' # Thresholding mode ('soft'/'hard')
threshold_mode:
):
store_attr()
def _denoise_single(self, spectrum):
"Denoise a single spectrum"
# If level is None, calculate maximum possible level
if self.level is None:
self.level_ = pywt.dwt_max_level(len(spectrum),
self.wavelet).dec_len)
pywt.Wavelet(else:
self.level_ = self.level
= pywt.wavedec(spectrum, self.wavelet, level=self.level_)
coeffs
# Calculate threshold using MAD estimator
= np.concatenate([c for c in coeffs[1:]])
detail_coeffs = np.median(np.abs(detail_coeffs)) / 0.6745
sigma = sigma * np.sqrt(2 * np.log(len(spectrum)))
threshold
# Apply threshold to detail coefficients
= list(coeffs)
new_coeffs for i in range(1, len(coeffs)):
= pywt.threshold(coeffs[i],
new_coeffs[i] * (1/2**((self.level_-i)/2)),
threshold =self.threshold_mode)
mode
= pywt.waverec(new_coeffs, self.wavelet)
denoised return denoised[:len(spectrum)]
def fit(self, X, y=None):
"Fit the transformer (no-op)"
return self
def transform(self, X):
"Apply wavelet denoising to spectra."
= np.asarray(X)
X = np.zeros_like(X)
X_denoised for i in range(X.shape[0]): X_denoised[i] = self._denoise_single(X[i])
return X_denoised
= load_toy_noisy_mir()
X, ws, _ print(f'X shape: {X.shape}, First 5 wavenumbers: {ws[:5]}')
= WaveletDenoise(wavelet='db6', level=5, threshold_mode='soft')
denoiser = denoiser.fit_transform(X)
X_denoised - X_denoised, ws, alpha=0.1, title='Raw Spectra - Wavelet Denoised'); plot_spectra(X
X shape: (48, 3315), First 5 wavenumbers: [599.91153162 600.93702142 601.96251122 602.98800102 604.01349081]
SavGolSmooth
SavGolSmooth (window_length:int=15, polyorder:int=3, deriv:int=0)
Savitzky-Golay smoothing transformer compatible with scikit-learn.
Type | Default | Details | |
---|---|---|---|
window_length | int | 15 | Window length for the savgol filter |
polyorder | int | 3 | Polynomial order for the savgol filter |
deriv | int | 0 | Derivation degree |
Exported source
class SavGolSmooth(BaseEstimator, TransformerMixin):
"Savitzky-Golay smoothing transformer compatible with scikit-learn."
def __init__(self,
int=15, # Window length for the savgol filter
window_length:int=3, # Polynomial order for the savgol filter
polyorder:int=0 # Derivation degree
deriv:
):
store_attr()
def _validate_params(self):
"Validate parameters."
if self.window_length % 2 == 0:
raise ValueError("window_length must be odd")
if self.window_length <= self.polyorder:
raise ValueError("window_length must be greater than polyorder")
if self.deriv > self.polyorder:
raise ValueError("deriv must be <= polyorder")
def fit(self,
# Spectral data to be smoothed.
X:np.ndarray,=None # Ignored
y:Optional[np.ndarray]
):"Validate parameters and fit the transformer."
self._validate_params()
return self
def transform(self,
# Spectral data to be smoothed.
X: np.ndarray -> np.ndarray: # Smoothed spectra
) "Apply Savitzky-Golay filter to spectra."
= np.asarray(X)
X = np.zeros_like(X)
X_smoothed
for i in range(X.shape[0]):
= savgol_filter(X[i],
X_smoothed[i] =self.window_length,
window_length=self.polyorder,
polyorder=self.deriv)
deriv
return X_smoothed
= SavGolSmooth(window_length=15, polyorder=3)
smoother = smoother.fit_transform(X)
X_smoothed ='Raw Spectra', transformed_title='SavGolSmooth Transformed Spectra'); plot_spectra_comparison(X, X_smoothed, ws, raw_title
Other Transformations
ToAbsorbance
ToAbsorbance (eps:float=1e-05)
Creates scikit-learn transformer to transform reflectance to absorbance
Type | Default | Details | |
---|---|---|---|
eps | float | 1e-05 | Small value to avoid log(0) |
Exported source
class ToAbsorbance(BaseEstimator, TransformerMixin):
"Creates scikit-learn transformer to transform reflectance to absorbance"
def __init__(self,
float=1e-5 # Small value to avoid log(0)
eps: self.eps = eps
): def fit(self, X, y=None): return self
def transform(self, X, y=None): return -np.log10(np.clip(X, self.eps, 1))
Resample
Resample (source_x:numpy.ndarray, target_x:numpy.ndarray, interpolation_kind:str='cubic')
Resampling transformer compatible with scikit-learn.
Type | Default | Details | |
---|---|---|---|
source_x | ndarray | Source x-axis points (wavenumbers or wavelengths) | |
target_x | ndarray | Target x-axis points (wavenumbers or wavelengths) for resampling | |
interpolation_kind | str | cubic | Type of spline interpolation to use |
Exported source
class Resample(BaseEstimator, TransformerMixin):
"Resampling transformer compatible with scikit-learn."
def __init__(self,
# Source x-axis points (wavenumbers or wavelengths)
source_x: np.ndarray, # Target x-axis points (wavenumbers or wavelengths) for resampling
target_x: np.ndarray, str='cubic' # Type of spline interpolation to use
interpolation_kind:
):
store_attr()
def fit(self,
# Spectral data to be resampled
X: np.ndarray, =None # Not used
y: np.ndarray
):"No-op in that particular case"
return self
def transform(self,
# Spectral data to be resampled
X: np.ndarray
):"Resample spectra to new x-axis points."
= np.asarray(X)
X = np.zeros((X.shape[0], len(self.target_x)))
X_transformed
for i in range(X.shape[0]):
= CubicSpline(self.source_x, X[i])
cs = cs(self.target_x)
X_transformed[i]
return X_transformed
= np.arange(600, 4001, 2) # New wavenumber axis
new_ws = Resample(source_x=ws, target_x=new_ws)
resampler = resampler.fit_transform(X)
X_resampled print(f'X_resampled.shape: {X_resampled.shape}, New wavenumbers: {new_ws[:10]}')
=False,title='Original Spectra', alpha=0.2);
plot_spectra(X, ws, ascending=False,title='Resampled Spectra', alpha=0.2, color='steelblue'); plot_spectra(X_resampled, new_ws, ascending
X_resampled.shape: (48, 1701), New wavenumbers: [600 602 604 606 608 610 612 614 616 618]
Trim
Trim (ws:numpy.ndarray, w_min:Optional[float]=None, w_max:Optional[float]=None)
Trims the spectra to the specified wavenumbers (or wavelengths) range.
Type | Default | Details | |
---|---|---|---|
ws | ndarray | Wavenumbers or wavelengths | |
w_min | Optional | None | Minimum wavenumber or wavelength |
w_max | Optional | None | Maximum wavenumber or wavelength |
Exported source
class Trim(BaseEstimator, TransformerMixin):
"Trims the spectra to the specified wavenumbers (or wavelengths) range."
def __init__(self,
# Wavenumbers or wavelengths
ws: np.ndarray, float]=None, # Minimum wavenumber or wavelength
w_min: Optional[float]=None # Maximum wavenumber or wavelength
w_max: Optional[
):
store_attr()
def fit(self,
# Spectra to be trimmed
X: np.ndarray, =None # Ignored
y: Optional[np.ndarray]
):"Store wavenumbers and compute indices for trimming"
self.mask_ = (self.ws >= (self.w_min if self.w_min else -np.inf)) & \
self.ws <= (self.w_max if self.w_max else np.inf))
(return self
def transform(self, X: np.ndarray) -> np.ndarray:
"Trim the spectra"
return X[:, self.mask_]
def get_wavenumbers(self) -> np.ndarray:
"Return the trimmed wavenumbers"
return self.ws[self.mask_]
= Trim(ws, w_min=650, w_max=4000)
trimmer = trimmer.fit_transform(X)
X_trimmed
print(f'X_trimmed.shape: {X_trimmed.shape}, Trimmed wavenumbers: {trimmer.get_wavenumbers()[:10]}')
='Trimmed Spectra', ascending=False); plot_spectra(X_trimmed, trimmer.get_wavenumbers(), title
X_trimmed.shape: (50, 1676), Trimmed wavenumbers: [650 652 654 656 658 660 662 664 666 668]