Source code for xpectrass.utils.denoise

"""
Denoising Module for FTIR Spectral Preprocessing
=================================================

**IMPORTANT**: This module expects absorbance data (AU), not transmittance (%).
Convert transmittance to absorbance first using convert_spectra() from trans_abs.py

Features:
- Single spectrum denoising via denoise()
- Batch DataFrame processing via apply_denoising()
- Automatic column detection and sorting by wavenumber
- Performance optimized for large datasets (vectorized operations)
- Pandas and Polars DataFrame support
- Method evaluation via evaluate_denoising_methods()
- Memory-safe evaluation via evaluate_denoising_methods_safe()
- Composite scoring for method selection via find_best_denoising_method()

Memory Management:
For systems with limited RAM or large datasets, use evaluate_denoising_methods_safe()
instead of evaluate_denoising_methods(). See MEMORY_MANAGEMENT_GUIDE.md for details.

Logging:
This module uses Python's logging module for warnings and informational messages.
Configure the logger to control output:

    import logging
    logging.getLogger('utils.denoise').setLevel(logging.INFO)  # Show all messages
    logging.getLogger('utils.denoise').setLevel(logging.ERROR)  # Only errors

Available Methods:
Run denoise_method_names() to see all available denoising algorithms.
Common methods: savgol, wavelet, moving_average, gaussian, median, whittaker, lowpass
"""

from __future__ import annotations
from typing import Union, Tuple, List, Optional
import logging
import numpy as np
import pandas as pd

# Optional polars support
try:
    import polars as pl  # type: ignore
except Exception:
    pl = None  # type: ignore
from scipy import signal
from scipy.ndimage import uniform_filter1d, gaussian_filter1d
from scipy.sparse import eye
from scipy.sparse.linalg import spsolve

# Optional wavelet support
try:
    import pywt  # type: ignore
    HAS_PYWT = True
except ImportError:
    HAS_PYWT = False
    pywt = None  # type: ignore
import joblib
from tqdm import tqdm
from contextlib import contextmanager
import matplotlib.pyplot as plt

from .spectral_utils import (
    _infer_spectral_columns,
    _sort_spectral_columns
)

# Configure module logger
logger = logging.getLogger(__name__)
if not logger.handlers:
    logger.setLevel(logging.WARNING)
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(levelname)s - %(message)s'))
    logger.addHandler(handler)


[docs] def denoise( intensities: Union[np.ndarray, list], wavenumbers: Optional[Union[np.ndarray, list]] = None, method: str = "savgol", **kwargs ) -> np.ndarray: """ Denoise a 1-D FTIR spectrum using various filtering methods. Parameters ---------- intensities : array-like Raw intensity values (1-D). wavenumbers : array-like, optional X-axis values (wavenumbers in cm⁻¹). If provided, validates that data is sorted in ascending order and warns if not. Ensures API consistency with other preprocessing modules (baseline, atmospheric). method : str, default "savgol" Denoising method. Options: - 'savgol': Savitzky-Golay filter (preserves peak shape) - 'wavelet': Discrete wavelet transform denoising - 'moving_average': Simple moving average - 'gaussian': Gaussian filter - 'median': Median filter (good for spike noise) - 'whittaker': Penalized least squares smoother - 'lowpass': Low-pass Butterworth filter **kwargs : method-specific parameters savgol: window_length (15), polyorder (3) wavelet: wavelet ('db4'), level (3), threshold_mode ('soft') moving_average: window (11) gaussian: sigma (2.0) median: kernel_size (5) whittaker: lam (1e4), d (2) lowpass: cutoff (0.1), order (4) Returns ------- np.ndarray Denoised intensity values. NaN values in input are preserved at their original positions; denoising is applied only to finite values. """ y = np.asarray(intensities, dtype=np.float64) if y.ndim != 1: raise ValueError("`intensities` must be a 1-D array-like object.") # Validate wavenumbers if provided if wavenumbers is not None: x = np.asarray(wavenumbers, dtype=np.float64) if x.ndim != 1: raise ValueError("`wavenumbers` must be a 1-D array-like object.") if len(x) != len(y): raise ValueError("`wavenumbers` and `intensities` must have the same length.") # Check if wavenumbers are monotonic (required for proper denoising) from .spectral_utils import _is_monotonic_strict if not _is_monotonic_strict(x): logger.warning( "Wavenumbers are not strictly monotonic. Denoising methods assume " "uniform spacing or sorted data. Results may be incorrect." ) # Handle NaN values: preserve positions but compute denoising on finite values only nan_mask = ~np.isfinite(y) has_nans = np.any(nan_mask) if has_nans: # If all values are NaN, return NaN array if np.all(nan_mask): return np.full_like(y, np.nan) # Store original array and extract finite values y_original = y.copy() y_finite = y[~nan_mask] # Need at least 2 points for denoising if len(y_finite) < 2: logger.warning( f"Insufficient finite data points ({len(y_finite)}) for denoising. " "Returning original spectrum with NaN preserved." ) return y_original # Process finite values only if NaN present y_to_process = y_finite if has_nans else y try: if method == "savgol": denoised_finite = _denoise_savgol(y_to_process, **kwargs) elif method == "wavelet": denoised_finite = _denoise_wavelet(y_to_process, **kwargs) elif method == "moving_average": denoised_finite = _denoise_moving_average(y_to_process, **kwargs) elif method == "gaussian": denoised_finite = _denoise_gaussian(y_to_process, **kwargs) elif method == "median": denoised_finite = _denoise_median(y_to_process, **kwargs) elif method == "whittaker": denoised_finite = _denoise_whittaker(y_to_process, **kwargs) elif method == "lowpass": denoised_finite = _denoise_lowpass(y_to_process, **kwargs) else: raise ValueError( f"Unknown denoising method: '{method}'. " "Valid options: savgol, wavelet, moving_average, gaussian, " "median, whittaker, lowpass" ) except Exception as e: if isinstance(e, ValueError) and "Unknown denoising method" in str(e): raise raise RuntimeError( f"Denoising failed for method '{method}'. " f"Error: {type(e).__name__}: {str(e)}. " f"Check parameter compatibility with method documentation." ) from e # Restore NaN positions if needed if has_nans: result = np.full_like(y, np.nan) result[~nan_mask] = denoised_finite return result else: return denoised_finite
[docs] def denoise_method_names() -> List[str]: """Return list of available denoising method names.""" return sorted([ "savgol", "wavelet", "moving_average", "gaussian", "median", "whittaker", "lowpass" ])
# --------------------------------------------------------------------------- # DATAFRAME-COMPATIBLE BATCH DENOISING # ---------------------------------------------------------------------------
[docs] def apply_denoising( data: Union[pd.DataFrame, "pl.DataFrame"], method: str = "savgol", label_column: str = "label", sample_id_column: str = "sample_id", exclude_columns: Optional[List[str]] = None, wn_min: Optional[float] = None, wn_max: Optional[float] = None, show_progress: bool = True, **kwargs ) -> Union[pd.DataFrame, "pl.DataFrame"]: """ Apply denoising to a DataFrame of FTIR spectra (batch processing). Works with both pandas and polars DataFrames. Each row is a sample, numerical columns are wavenumbers. Applies denoising to all samples. Parameters ---------- data : pd.DataFrame | pl.DataFrame Wide-format DataFrame where rows = samples, columns = wavenumbers. Should contain numerical columns with spectral data and optional metadata columns (e.g., 'sample', 'label'). method : str, default "savgol" Denoising method. Options: - 'savgol': Savitzky-Golay filter (preserves peak shape) - 'wavelet': Discrete wavelet transform denoising - 'moving_average': Simple moving average - 'gaussian': Gaussian filter - 'median': Median filter (good for spike noise) - 'whittaker': Penalized least squares smoother - 'lowpass': Low-pass Butterworth filter label_column : str, default "label" Name of the label/group column to exclude from denoising. exclude_columns : list[str], optional Additional column names to exclude from denoising (e.g., 'sample', 'id'). If None, automatically excludes non-numeric columns. wn_min : float, optional Minimum wavenumber bound (cm⁻¹). If None, uses 200.0 cm⁻¹ as default, or auto-expands if no columns found within default range. wn_max : float, optional Maximum wavenumber bound (cm⁻¹). If None, uses 8000.0 cm⁻¹ as default, or auto-expands if no columns found within default range. show_progress : bool, default True If True, display a progress bar during processing. **kwargs : additional parameters Method-specific parameters forwarded to the denoising algorithm: - savgol: window_length (15), polyorder (3) - wavelet: wavelet ('db4'), level (3), threshold_mode ('soft') - moving_average: window (11) - gaussian: sigma (2.0) - median: kernel_size (5) - whittaker: lam (1e4), d (2) - lowpass: cutoff (0.1), order (4) Returns ------- pd.DataFrame | pl.DataFrame Denoised DataFrame (same type as input) with spectral data denoised and metadata columns preserved. Columns are sorted by ascending wavenumber order. Examples -------- >>> # Apply Savitzky-Golay denoising to all samples >>> df_denoised = apply_denoising(df_wide, method="savgol") >>> # Use wavelet denoising with custom parameters >>> df_denoised = apply_denoising( ... df_wide, ... method="wavelet", ... wavelet="db4", ... level=3, ... threshold_mode="soft" ... ) >>> # Use Gaussian smoothing >>> df_denoised = apply_denoising( ... df_wide, ... method="gaussian", ... sigma=2.0 ... ) >>> # Works with both pandas and polars >>> df_pd_denoised = apply_denoising(df_pandas) >>> df_pl_denoised = apply_denoising(df_polars) >>> # Disable progress bar for cleaner output >>> df_denoised = apply_denoising(df_wide, show_progress=False) """ # Check for polars support is_polars = False if pl is not None: is_polars = isinstance(data, pl.DataFrame) # Convert to pandas for processing if is_polars: df = data.to_pandas() else: df = data.copy() # Prepare exclude_columns list if exclude_columns is None: exclude_columns = [] elif isinstance(exclude_columns, str): exclude_columns = [exclude_columns] else: exclude_columns = list(exclude_columns) # Always exclude the label column if it exists if label_column in df.columns and label_column not in exclude_columns: exclude_columns.append(label_column) if sample_id_column in df.columns and sample_id_column not in exclude_columns: exclude_columns.append(sample_id_column) # Use spectral_utils to infer and sort spectral columns numeric_cols, wavenumbers = _infer_spectral_columns( df, exclude_columns, wn_min, wn_max ) sorted_cols, sorted_wavenumbers, sort_idx = _sort_spectral_columns( numeric_cols, wavenumbers ) # OPTIMIZATION: Extract numpy array and pre-allocate result spectral_data = df[sorted_cols].values.astype(np.float64) n_samples = spectral_data.shape[0] n_wavenumbers = spectral_data.shape[1] # VALIDATION: Check if data appears to be transmittance instead of absorbance sample_size = min(100, n_samples) sample_data = spectral_data[:sample_size, :].flatten() sample_data_finite = sample_data[np.isfinite(sample_data)] if len(sample_data_finite) > 0: median_val = np.median(sample_data_finite) p95_val = np.percentile(sample_data_finite, 95) if p95_val > 10.0 and median_val > 1.0: raise ValueError( f"Input data appears to be transmittance (%) rather than absorbance (AU). " f"Detected: median={median_val:.2f}, 95th percentile={p95_val:.2f}. " f"Denoising should be performed on absorbance for physical validity. " f"Please convert your data first using: " f"convert_spectra(data, mode='to_absorbance') from trans_abs.py" ) denoised_data = np.empty((n_samples, n_wavenumbers), dtype=np.float64) # Denoising loop with progress bar iterator = tqdm( range(n_samples), desc=f"Denoising ({method})", disable=not show_progress, dynamic_ncols=True ) for i in iterator: intensities = spectral_data[i, :] denoised_data[i, :] = denoise( intensities=intensities, wavenumbers=sorted_wavenumbers, method=method, **kwargs ) # PHYSICAL CONSTRAINT VALIDATION: Check for negative absorbance after denoising finite_mask = np.isfinite(denoised_data) if np.any(finite_mask): n_negative = np.sum(denoised_data[finite_mask] < 0) if n_negative > 0: min_negative = np.min(denoised_data[finite_mask]) pct_negative = 100.0 * n_negative / np.sum(finite_mask) logger.warning( f"Denoising produced {n_negative} negative absorbance values " f"({pct_negative:.1f}% of valid points, min={min_negative:.4f}). " f"This is physically invalid and may indicate: " f"(1) aggressive smoothing parameters, (2) baseline drift in input data, " f"or (3) input data already near zero. " f"Recommendations: Apply baseline correction before denoising, or adjust " f"denoising parameters (e.g., reduce window_length for savgol)." ) # Create result DataFrame with metadata + denoised spectra metadata_cols = [c for c in df.columns if c not in numeric_cols] metadata_df = df[metadata_cols].copy() # Create spectral DataFrame from numpy array (avoids fragmentation) spectral_df = pd.DataFrame(denoised_data, columns=sorted_cols, index=df.index) # Concatenate metadata and spectral data result_df = pd.concat([metadata_df, spectral_df], axis=1) # Reorder columns to match original structure (metadata first, then spectra) final_cols = metadata_cols + sorted_cols result_df = result_df[final_cols] # Convert back to polars if input was polars if is_polars: result_df = pl.from_pandas(result_df) return result_df
# --------------------------------------------------------------------------- # INDIVIDUAL METHODS # --------------------------------------------------------------------------- def _denoise_savgol( y: np.ndarray, window_length: int = 15, polyorder: int = 3 ) -> np.ndarray: """ Savitzky-Golay filter. Fits successive sub-sets of adjacent data points with a low-degree polynomial by the method of linear least squares. Excellent for preserving peak shapes. """ original_window = window_length original_poly = polyorder # Ensure window_length is odd if window_length % 2 == 0: window_length += 1 logger.debug(f"Savitzky-Golay: Adjusted window_length from {original_window} to {window_length} (must be odd)") # Ensure polyorder < window_length if polyorder >= window_length: polyorder = window_length - 1 logger.debug(f"Savitzky-Golay: Adjusted polyorder from {original_poly} to {polyorder} (must be < window_length)") return signal.savgol_filter(y, window_length, polyorder) def _denoise_wavelet( y: np.ndarray, wavelet: str = "db4", level: int = 3, threshold_mode: str = "soft" ) -> np.ndarray: """ Wavelet denoising using thresholding. Decomposes signal into wavelet coefficients, thresholds small coefficients, and reconstructs. Good for multi-scale noise reduction. Requires pywt (PyWavelets) package. If not installed, raises ImportError. """ if not HAS_PYWT: raise ImportError( "Wavelet denoising requires the 'pywt' (PyWavelets) package. " "Install it with: pip install PyWavelets" ) # Decompose coeffs = pywt.wavedec(y, wavelet, level=level) # Estimate noise level from finest detail coefficients sigma = np.median(np.abs(coeffs[-1])) / 0.6745 # Universal threshold threshold = sigma * np.sqrt(2 * np.log(len(y))) # Threshold detail coefficients (keep approximation coefficients) denoised_coeffs = [coeffs[0]] # Keep approximation for c in coeffs[1:]: if threshold_mode == "soft": denoised_coeffs.append(pywt.threshold(c, threshold, mode='soft')) else: denoised_coeffs.append(pywt.threshold(c, threshold, mode='hard')) # Reconstruct return pywt.waverec(denoised_coeffs, wavelet)[:len(y)] def _denoise_moving_average( y: np.ndarray, window: int = 11 ) -> np.ndarray: """Simple moving average filter.""" return uniform_filter1d(y, size=window, mode='nearest') def _denoise_gaussian( y: np.ndarray, sigma: float = 2.0 ) -> np.ndarray: """Gaussian smoothing filter.""" return gaussian_filter1d(y, sigma=sigma, mode='nearest') def _denoise_median( y: np.ndarray, kernel_size: int = 5 ) -> np.ndarray: """ Median filter. Non-linear filter excellent for removing impulse/spike noise while preserving edges. """ original_kernel = kernel_size # Ensure kernel_size is odd if kernel_size % 2 == 0: kernel_size += 1 logger.debug(f"Median filter: Adjusted kernel_size from {original_kernel} to {kernel_size} (must be odd)") return signal.medfilt(y, kernel_size=kernel_size) def _denoise_whittaker( y: np.ndarray, lam: float = 1e4, d: int = 2 ) -> np.ndarray: """ Whittaker smoother (penalized least squares) with sparse matrices. Balances fidelity to data with smoothness penalty. Higher lambda = smoother result. Uses sparse matrix operations for O(n) performance on large spectra (vs O(n^3) for dense implementation). Efficient for n > 1000. Parameters ---------- y : np.ndarray Input signal lam : float, default 1e4 Smoothing parameter. Higher values = smoother result. d : int, default 2 Order of differences for penalty term (2 = penalize curvature) Returns ------- np.ndarray Smoothed signal """ n = len(y) # Create sparse difference matrix (d-th order differences) # For d=2: D is (n-2) x n matrix computing second differences E = eye(n, format='csr') D = E[1:, :] - E[:-1, :] # First difference for _ in range(d - 1): D = D[1:, :] - D[:-1, :] # Higher-order differences # Solve sparse system: (I + lam * D'D) z = y # D'D is (n x n), symmetric positive definite W = E + lam * (D.T @ D) z = spsolve(W, y) return z def _denoise_lowpass( y: np.ndarray, cutoff: float = 0.1, order: int = 4 ) -> np.ndarray: """ Low-pass Butterworth filter. Removes high-frequency noise components. cutoff: normalized frequency (0 to 1, relative to Nyquist). """ # Design filter b, a = signal.butter(order, cutoff, btype='low') # Apply forward-backward filtering (zero phase) return signal.filtfilt(b, a, y) # --------------------------------------------------------------------------- # EVALUATION UTILITIES # ---------------------------------------------------------------------------
[docs] def estimate_snr( y_raw: np.ndarray, y_denoised: np.ndarray, flat_regions: Optional[Union[List[Tuple[int, int]], List[Tuple[float, float]]]] = None, wavenumbers: Optional[np.ndarray] = None ) -> float: """ Estimate Signal-to-Noise Ratio improvement (NaN-aware). Parameters ---------- y_raw : np.ndarray Original noisy spectrum. y_denoised : np.ndarray Denoised spectrum. flat_regions : list of tuples, optional Regions known to be baseline-only (for noise estimation). Can be either: - List of (start_idx, end_idx) integer index tuples (legacy) - List of (wn_min, wn_max) float wavenumber tuples (recommended) If wavenumbers provided, flat_regions interpreted as wavenumber ranges. If None, uses high-frequency residual estimation. wavenumbers : np.ndarray, optional Wavenumber array. Required if flat_regions specified as wavenumber ranges. Returns ------- float Estimated SNR in dB. Returns np.nan if insufficient finite data. Notes ----- - Uses NaN-aware statistics to handle missing values in spectra - Wavenumber-based regions (recommended): More robust across different spectral resolutions - Index-based regions (legacy): Faster but resolution-dependent Examples -------- >>> # Index-based (legacy) >>> snr = estimate_snr(y_raw, y_denoised, flat_regions=[(10, 50), (200, 250)]) >>> # Wavenumber-based (recommended) >>> wn = np.linspace(650, 4000, 1000) >>> snr = estimate_snr(y_raw, y_denoised, flat_regions=[(2500, 2600), (3200, 3500)], wavenumbers=wn) """ if flat_regions: # Estimate noise from difference in flat regions (NaN-aware) noise_samples = [] # Determine if flat_regions are wavenumber-based or index-based if wavenumbers is not None and len(flat_regions) > 0: # Check if first element looks like wavenumbers (floats > 100) vs indices (ints < data length) first_start, first_end = flat_regions[0] if isinstance(first_start, float) or first_start > len(y_raw): # Wavenumber-based: convert to indices for wn_min, wn_max in flat_regions: mask = (wavenumbers >= wn_min) & (wavenumbers <= wn_max) if np.any(mask): indices = np.where(mask)[0] start_idx, end_idx = indices[0], indices[-1] + 1 region_diff = y_raw[start_idx:end_idx] - y_denoised[start_idx:end_idx] finite_diff = region_diff[np.isfinite(region_diff)] if len(finite_diff) > 0: noise_samples.extend(finite_diff) else: # Index-based (legacy): use directly for start, end in flat_regions: region_diff = y_raw[start:end] - y_denoised[start:end] finite_diff = region_diff[np.isfinite(region_diff)] if len(finite_diff) > 0: noise_samples.extend(finite_diff) else: # No wavenumbers provided: assume index-based for start, end in flat_regions: region_diff = y_raw[start:end] - y_denoised[start:end] finite_diff = region_diff[np.isfinite(region_diff)] if len(finite_diff) > 0: noise_samples.extend(finite_diff) if len(noise_samples) == 0: return np.nan noise_std = np.std(noise_samples) else: # Estimate noise from high-frequency residuals (NaN-aware) residual = y_raw - y_denoised finite_residual = residual[np.isfinite(residual)] if len(finite_residual) == 0: return np.nan noise_std = np.std(finite_residual) # Use NaN-aware variance for signal power finite_signal = y_denoised[np.isfinite(y_denoised)] if len(finite_signal) == 0: return np.nan signal_power = np.var(finite_signal) noise_power = noise_std ** 2 if noise_power > 0 and np.isfinite(signal_power): return 10 * np.log10(signal_power / noise_power) elif noise_power == 0 and signal_power > 0: return np.inf # Perfect denoising else: return np.nan
# --------------------------------------------------------------------------- # PARALLEL PROCESSING HELPERS # --------------------------------------------------------------------------- def _evaluate_one_sample( sample_name: str, y_raw: np.ndarray, methods: List[str] ) -> List[dict]: """ Worker function: evaluate all denoising methods for one sample (NaN-aware). Returns list of result dictionaries for each method. Notes ----- Uses NaN-aware statistics to handle missing values in evaluation metrics. Includes computation time measurement for performance comparison. """ import time results = [] for method in methods: try: # Time the denoising operation start_time = time.perf_counter() y_denoised = denoise(y_raw, method=method) elapsed_time = time.perf_counter() - start_time snr_improvement = estimate_snr(y_raw, y_denoised) # Compute smoothness (inverse of 2nd derivative variance) - NaN-aware finite_denoised = y_denoised[np.isfinite(y_denoised)] if len(finite_denoised) >= 3: # Need at least 3 points for 2nd derivative d2 = np.diff(finite_denoised, n=2) finite_d2 = d2[np.isfinite(d2)] if len(finite_d2) > 0: smoothness = 1.0 / (np.var(finite_d2) + 1e-10) else: smoothness = np.nan else: smoothness = np.nan # Compute fidelity (correlation with original) - NaN-aware # Use only positions where both signals are finite valid_mask = np.isfinite(y_raw) & np.isfinite(y_denoised) if np.sum(valid_mask) >= 2: # Need at least 2 points for correlation y_raw_finite = y_raw[valid_mask] y_denoised_finite = y_denoised[valid_mask] # Check for constant arrays (correlation undefined) if np.std(y_raw_finite) > 0 and np.std(y_denoised_finite) > 0: fidelity = np.corrcoef(y_raw_finite, y_denoised_finite)[0, 1] else: fidelity = np.nan else: fidelity = np.nan results.append({ 'sample': sample_name, 'method': method, 'snr_db': snr_improvement, 'smoothness': smoothness, 'fidelity': fidelity, 'time_ms': elapsed_time * 1000 # Convert to milliseconds }) except Exception as e: # Log warning for debugging logger.debug( f"Evaluation failed for sample '{sample_name}' with method '{method}': " f"{type(e).__name__}: {str(e)}" ) results.append({ 'sample': sample_name, 'method': method, 'snr_db': np.nan, 'smoothness': np.nan, 'fidelity': np.nan, 'time_ms': np.nan }) return results @contextmanager def _tqdm_joblib(tqdm_object): """Context manager to patch joblib to report into tqdm progress bar.""" class TqdmBatchCompletionCallback(joblib.parallel.BatchCompletionCallBack): def __call__(self, *args, **kwargs): tqdm_object.update(n=self.batch_size) return super().__call__(*args, **kwargs) old_batch_callback = joblib.parallel.BatchCompletionCallBack joblib.parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback try: yield tqdm_object finally: joblib.parallel.BatchCompletionCallBack = old_batch_callback tqdm_object.close()
[docs] def evaluate_denoising_methods_safe( data: Union[pd.DataFrame, "pl.DataFrame"], methods: Optional[List[str]] = None, **kwargs ) -> pd.DataFrame: """ Memory-safe wrapper for evaluate_denoising_methods() with conservative defaults. This function automatically sets safe defaults to prevent memory issues: - n_samples=50 (instead of all samples) - n_jobs=2 (instead of all CPU cores) - methods=['savgol', 'gaussian', 'median'] if not specified Use this function for initial exploration, then switch to full evaluate_denoising_methods() with custom parameters if needed. Parameters ---------- data : pd.DataFrame | pl.DataFrame Spectral DataFrame methods : list of str, optional Denoising methods to test. If None, uses ['savgol', 'gaussian', 'median']. **kwargs : additional parameters Forwarded to evaluate_denoising_methods(). Note that n_samples and n_jobs will be overridden to safe defaults unless explicitly provided. Returns ------- pd.DataFrame Evaluation results with columns: sample, method, snr_db, smoothness, fidelity, time_ms Examples -------- >>> # Safe evaluation (won't cause memory issues) >>> results = evaluate_denoising_methods_safe(df) >>> recommendations = find_best_denoising_method(results) >>> # With custom methods but still safe >>> results = evaluate_denoising_methods_safe(df, methods=['savgol', 'wavelet']) """ # Set safe defaults safe_defaults = { 'n_samples': 50, 'n_jobs': 2, 'sample_selection': 'random', 'random_state': 42 } # Use default safe methods if not specified if methods is None: methods = ['savgol', 'gaussian', 'median'] # Merge user kwargs with safe defaults (user kwargs take precedence) params = {**safe_defaults, **kwargs} logger.info( f"Running safe evaluation with: {len(methods)} methods, " f"n_samples={params['n_samples']}, n_jobs={params['n_jobs']}" ) return evaluate_denoising_methods(data, methods=methods, **params)
[docs] def evaluate_denoising_methods( data: Union[pd.DataFrame, "pl.DataFrame"], methods: Optional[List[str]] = None, label_column: str = "label", sample_id_column: str = "sample_id", exclude_columns: Optional[List[str]] = None, wn_min: Optional[float] = None, wn_max: Optional[float] = None, n_samples: Optional[int] = None, # None = use all samples sample_selection: str = "random", # "random", "first", "last" random_state: Optional[int] = None, # for reproducibility n_jobs: int = -1, # -1 → all CPU cores ) -> pd.DataFrame: """ Compare denoising methods on a subset of spectra. **MEMORY WARNING**: Parallel processing can consume significant memory. For systems with <16 GB RAM or large datasets (>1000 samples): - Set n_jobs=2 (not -1) - Set n_samples=50 (not None) - Test with methods=['savgol', 'gaussian'] first See MEMORY_MANAGEMENT_GUIDE.md for detailed recommendations. Parameters ---------- data : pd.DataFrame | pl.DataFrame Wide-format spectral DataFrame where rows = samples, columns = wavenumbers. Should contain numerical columns with spectral data and optional metadata columns (e.g., 'sample', 'label'). methods : list of str, optional Methods to evaluate. If None, evaluates all available methods. Available: 'savgol', 'wavelet', 'moving_average', 'gaussian', 'median', 'whittaker', 'lowpass'. **Recommendation**: Start with 2-3 methods to test memory usage. label_column : str, default "label" Name of the label/group column to exclude from evaluation. exclude_columns : list[str], optional Additional column names to exclude from evaluation (e.g., 'sample', 'id'). If None, automatically excludes non-numeric columns. wn_min : float, optional Minimum wavenumber bound (cm⁻¹). If None, uses 200.0 cm⁻¹ as default. wn_max : float, optional Maximum wavenumber bound (cm⁻¹). If None, uses 8000.0 cm⁻¹ as default. n_samples : int, optional Number of samples to evaluate. If None, uses all samples. sample_selection : str, default "random" How to select samples: "random", "first", or "last". random_state : int, optional Random seed for reproducibility when sample_selection="random". n_jobs : int, default -1 Number of parallel jobs. -1 uses all CPU cores. Returns ------- pd.DataFrame Evaluation metrics for each (sample, method) combination with columns: - sample: Sample identifier - method: Denoising method name - snr_db: Signal-to-noise ratio improvement (dB) - smoothness: Inverse of 2nd derivative variance (higher = smoother) - fidelity: Correlation with original signal (0-1, higher = better) - time_ms: Computation time in milliseconds (for performance comparison) """ # Check for polars support is_polars = False if pl is not None: is_polars = isinstance(data, pl.DataFrame) # Convert to pandas for processing if is_polars: df = data.to_pandas() else: df = data.copy() # Prepare exclude_columns list if exclude_columns is None: exclude_columns = [] elif isinstance(exclude_columns, str): exclude_columns = [exclude_columns] else: exclude_columns = list(exclude_columns) # Always exclude the label column if it exists if label_column in df.columns and label_column not in exclude_columns: exclude_columns.append(label_column) if sample_id_column in df.columns and sample_id_column not in exclude_columns: exclude_columns.append(sample_id_column) # Use spectral_utils to infer and sort spectral columns numeric_cols, wavenumbers = _infer_spectral_columns( df, exclude_columns, wn_min, wn_max ) sorted_cols, sorted_wavenumbers, sort_idx = _sort_spectral_columns( numeric_cols, wavenumbers ) if methods is None: methods = denoise_method_names() # Get sample indices (up to n_samples) samples = df.index.tolist() # ---------------- Memory safety check ------------------------------------ n_samples_actual = n_samples if n_samples is not None else len(samples) n_methods = len(methods) n_wavenumbers = len(sorted_cols) n_workers = n_jobs if n_jobs > 0 else joblib.cpu_count() # Estimate memory usage (rough heuristic) estimated_mb_per_worker = (n_samples_actual * n_wavenumbers * 8) / (1024 * 1024) # 8 bytes per float64 estimated_total_mb = estimated_mb_per_worker * n_workers * 2 # 2x for processing overhead # Warn if configuration is likely to cause memory issues if estimated_total_mb > 8192: logger.warning( f"Memory warning: Current configuration may use ~{estimated_total_mb:.0f} MB RAM.\n" f" - Samples: {n_samples_actual}\n" f" - Methods: {n_methods}\n" f" - Wavenumbers: {n_wavenumbers}\n" f" - Parallel workers: {n_workers}\n" f"Recommendations to reduce memory usage:\n" f" 1. Set n_jobs=2 (instead of {n_jobs})\n" f" 2. Set n_samples=50 (instead of {n_samples_actual})\n" f" 3. Reduce methods list to 2-3 methods\n" f"See MEMORY_MANAGEMENT_GUIDE.md for details." ) # ---------------- sample selection ------------------------------------ if n_samples is not None and n_samples < len(samples): if sample_selection == "random": rng = np.random.default_rng(random_state) samples = list(rng.choice(samples, size=n_samples, replace=False)) elif sample_selection == "first": samples = samples[:n_samples] elif sample_selection == "last": samples = samples[-n_samples:] else: raise ValueError( f"Unknown sample_selection '{sample_selection}'. " "Choose from: 'random', 'first', 'last'." ) # Extract spectra to NumPy once (parent process) using sorted columns spectra = { s: df.loc[s, sorted_cols].astype(float).values for s in samples } # ---------------- parallel loop -------------------------------------- worker = joblib.delayed(_evaluate_one_sample) with _tqdm_joblib(tqdm(desc="denoise eval", total=len(samples), dynamic_ncols=True)) as _: sample_results = joblib.Parallel(n_jobs=n_jobs, backend="loky")( worker(s, spectra[s], methods) for s in samples ) # ---------------- assemble results ----------------------------------- # Flatten the list of lists into a single list results = [item for sublist in sample_results for item in sublist] return pd.DataFrame(results)
# --------------------------------------------------------------------------- # EVALUATION VISUALIZATION # ---------------------------------------------------------------------------
[docs] def plot_denoising_evaluation( eval_df: pd.DataFrame, metrics: Optional[List[str]] = None, figsize: Tuple[int, int] = (14, 5), show_mean_sd: bool = True, save_plot: Optional[bool] = None, save_path: Optional[str] = None ) -> None: """ Plot evaluation metrics from evaluate_denoising_methods as box plots. Creates box plots for each metric (SNR, smoothness, fidelity) across different denoising methods to help select the best method. Parameters ---------- eval_df : pd.DataFrame Output from evaluate_denoising_methods() with columns: ['sample', 'method', 'snr_db', 'smoothness', 'fidelity'] metrics : list of str, optional Metrics to plot. If None, plots all three: ['snr_db', 'smoothness', 'fidelity'] Available: 'snr_db', 'smoothness', 'fidelity' figsize : tuple, default (14, 5) Figure size (width, height) show_mean_sd : bool, default True If True, overlay mean ± SD on box plots save_path : str, optional If provided, save the figure to this path (e.g., 'denoising_eval.pdf') save_path : str, optional If provided, save the figure to this path (e.g., 'denoising_eval.pdf') Returns ------- None Displays matplotlib figure Examples -------- >>> # Evaluate methods >>> eval_results = evaluate_denoising_methods(df_wide, n_samples=50) >>> >>> # Plot all metrics >>> plot_denoising_evaluation(eval_results) >>> >>> # Plot only SNR and fidelity >>> plot_denoising_evaluation(eval_results, metrics=['snr_db', 'fidelity']) >>> >>> # Save to file >>> plot_denoising_evaluation(eval_results, save_path='denoise_eval.pdf') """ if metrics is None: metrics = ['snr_db', 'smoothness', 'fidelity'] # Validate metrics available_metrics = ['snr_db', 'smoothness', 'fidelity'] for metric in metrics: if metric not in available_metrics: raise ValueError(f"Unknown metric '{metric}'. Choose from: {available_metrics}") # Get unique methods methods = sorted(eval_df['method'].unique()) n_metrics = len(metrics) # Create subplots fig, axes = plt.subplots(1, n_metrics, figsize=figsize) if n_metrics == 1: axes = [axes] # Metric display names and better labels metric_labels = { 'snr_db': 'SNR (dB)', 'smoothness': 'Smoothness', 'fidelity': 'Fidelity (correlation)' } for idx, metric in enumerate(metrics): ax = axes[idx] # Prepare data for box plot data = [eval_df[eval_df['method'] == m][metric].dropna().values for m in methods] # Create box plot bp = ax.boxplot( data, labels=methods, patch_artist=True, showfliers=False, widths=0.6, medianprops=dict(color='black', linewidth=1.5), boxprops=dict(facecolor='lightblue', alpha=0.7) ) # Overlay mean ± SD if show_mean_sd: means = [np.nanmean(d) for d in data] sds = [np.nanstd(d, ddof=1) for d in data] x_positions = np.arange(1, len(methods) + 1) ax.errorbar( x_positions, means, yerr=sds, fmt='o', color='red', ecolor='red', elinewidth=2, capsize=4, markersize=6, linewidth=0, label='mean ± SD', zorder=10 ) # Formatting ax.set_ylabel(metric_labels[metric], fontsize=11) ax.set_xlabel('Denoising Method', fontsize=10) ax.set_title(f'{metric_labels[metric]} Comparison', fontsize=12, fontweight='bold') ax.tick_params(axis='x', rotation=45) ax.grid(axis='y', alpha=0.3, linestyle='--') if show_mean_sd and idx == n_metrics - 1: ax.legend(loc='best', frameon=False, fontsize=9) plt.tight_layout() # Save if path provided if save_plot: plt.savefig(save_path, dpi=300, bbox_inches='tight') print(f"Figure saved to: {save_path}") plt.show()
[docs] def plot_denoising_evaluation_summary( eval_df: pd.DataFrame, figsize: Tuple[int, int] = (10, 6), save_plot: Optional[bool] = None, save_path: Optional[str] = None ) -> None: """ Create a summary table showing mean ± SD for all metrics across methods. Displays a colored heatmap-style visualization to quickly identify the best performing denoising methods. Parameters ---------- eval_df : pd.DataFrame Output from evaluate_denoising_methods() figsize : tuple, default (10, 6) Figure size save_plot : bool, optional If True, save the figure to save_path save_path : str, optional If provided, save the figure to this path Examples -------- >>> eval_results = evaluate_denoising_methods(df_wide, n_samples=50) >>> plot_denoising_evaluation_summary(eval_results) """ # Calculate summary statistics summary = eval_df.groupby('method').agg({ 'snr_db': ['mean', 'std'], 'smoothness': ['mean', 'std'], 'fidelity': ['mean', 'std'] }).round(3) # Flatten column names summary.columns = ['_'.join(col).strip() for col in summary.columns.values] summary = summary.reset_index() # Create figure fig, ax = plt.subplots(figsize=figsize) ax.axis('off') # Create table table_data = [] headers = ['Method', 'SNR (dB)', 'Smoothness', 'Fidelity'] for _, row in summary.iterrows(): method = row['method'] snr = f"{row['snr_db_mean']:.2f} ± {row['snr_db_std']:.2f}" smooth = f"{row['smoothness_mean']:.2e} ± {row['smoothness_std']:.2e}" fidelity = f"{row['fidelity_mean']:.3f} ± {row['fidelity_std']:.3f}" table_data.append([method, snr, smooth, fidelity]) table = ax.table( cellText=table_data, colLabels=headers, cellLoc='center', loc='center', bbox=[0, 0, 1, 1] ) # Style the table table.auto_set_font_size(False) table.set_fontsize(10) table.scale(1, 2) # Color header for i in range(len(headers)): table[(0, i)].set_facecolor('#4472C4') table[(0, i)].set_text_props(weight='bold', color='white') # Alternate row colors for i in range(1, len(table_data) + 1): for j in range(len(headers)): if i % 2 == 0: table[(i, j)].set_facecolor('#E7E6E6') else: table[(i, j)].set_facecolor('#F2F2F2') plt.title('Denoising Methods: Summary Statistics', fontsize=14, fontweight='bold', pad=20) # Save if path provided if save_plot: plt.savefig(save_path, dpi=300, bbox_inches='tight') print(f"Summary table saved to: {save_path}") plt.show()
[docs] def find_best_denoising_method( eval_df: pd.DataFrame, snr_min: float = 10.0, smoothness_min: float = 1e3, fidelity_min: float = 0.9, time_max_ms: float = 100.0, top_n: int = 5, ) -> pd.DataFrame: """ Recommend best denoising methods based on evaluation metrics. Analyzes SNR, smoothness, fidelity, and computation time across all samples to identify methods that consistently perform well. Methods are ranked by a composite score combining all metrics. Parameters ---------- eval_df : pd.DataFrame Output from evaluate_denoising_methods() with columns: ['sample', 'method', 'snr_db', 'smoothness', 'fidelity', 'time_ms'] snr_min : float, default 10.0 Minimum acceptable SNR in dB (higher is better). smoothness_min : float, default 1e3 Minimum acceptable smoothness (higher is better). fidelity_min : float, default 0.9 Minimum acceptable fidelity correlation (0-1, higher is better). time_max_ms : float, default 100.0 Maximum acceptable computation time in milliseconds (lower is better). top_n : int, default 5 Number of top methods to return. Returns ------- pd.DataFrame Ranked methods with columns: - method: Method name - median_snr_db: Median SNR across samples - median_smoothness: Median smoothness across samples - median_fidelity: Median fidelity across samples - median_time_ms: Median computation time across samples - pass_rate: Fraction of samples passing all thresholds (0-1) - composite_score: Weighted score (higher is better) Sorted by composite_score descending (best methods first). Notes ----- Composite Score Calculation: - Normalizes each metric to [0, 1] range - SNR: Higher is better - Smoothness: Higher is better - Fidelity: Higher is better - Time: Lower is better (inverted for scoring) - Pass rate: Bonus for consistent performance - Composite = (0.3 * SNR_score) + (0.25 * smoothness_score) + (0.3 * fidelity_score) + (0.05 * time_score) + (0.1 * pass_rate) Example ------- >>> eval_results = evaluate_denoising_methods(df, n_samples=50) >>> recommendations = find_best_denoising_method(eval_results, top_n=3) >>> print(recommendations) method median_snr_db median_smoothness median_fidelity median_time_ms pass_rate composite_score 0 savgol 18.5 2.5e4 0.985 12.3 0.94 0.87 1 wavelet 16.2 1.8e4 0.972 45.2 0.88 0.82 2 gaussian 15.8 2.1e4 0.968 8.5 0.86 0.80 """ import warnings # Suppress warnings in this function with warnings.catch_warnings(): warnings.simplefilter("ignore") # Compute median metrics across samples for each method grouped = eval_df.groupby('method') median_snr = grouped['snr_db'].median() median_smoothness = grouped['smoothness'].median() median_fidelity = grouped['fidelity'].median() median_time = grouped['time_ms'].median() # Compute pass rate: fraction of samples meeting all thresholds snr_pass = (eval_df.groupby('method')['snr_db'].apply(lambda x: (x >= snr_min).sum()) / eval_df.groupby('method').size()) smoothness_pass = (eval_df.groupby('method')['smoothness'].apply(lambda x: (x >= smoothness_min).sum()) / eval_df.groupby('method').size()) fidelity_pass = (eval_df.groupby('method')['fidelity'].apply(lambda x: (x >= fidelity_min).sum()) / eval_df.groupby('method').size()) time_pass = (eval_df.groupby('method')['time_ms'].apply(lambda x: (x <= time_max_ms).sum()) / eval_df.groupby('method').size()) # Average pass rate across all metrics pass_rate = (snr_pass + smoothness_pass + fidelity_pass + time_pass) / 4.0 # Normalize metrics to [0, 1] for composite scoring # SNR, smoothness, fidelity: higher is better # Time: lower is better (invert for scoring) snr_min_val, snr_max = median_snr.min(), median_snr.max() smooth_min_val, smooth_max = median_smoothness.min(), median_smoothness.max() fid_min_val, fid_max = median_fidelity.min(), median_fidelity.max() time_min_val, time_max_val = median_time.min(), median_time.max() # Avoid division by zero snr_range = snr_max - snr_min_val if snr_max > snr_min_val else 1.0 smooth_range = smooth_max - smooth_min_val if smooth_max > smooth_min_val else 1.0 fid_range = fid_max - fid_min_val if fid_max > fid_min_val else 1.0 time_range = time_max_val - time_min_val if time_max_val > time_min_val else 1.0 # Normalize (higher is better for all scores) snr_score = (median_snr - snr_min_val) / snr_range smooth_score = (median_smoothness - smooth_min_val) / smooth_range fid_score = (median_fidelity - fid_min_val) / fid_range time_score = 1.0 - (median_time - time_min_val) / time_range # Invert time # Composite score (weighted average) # Weights: SNR=0.3, Smoothness=0.25, Fidelity=0.3, Time=0.05, PassRate=0.1 composite_score = (0.30 * snr_score + 0.25 * smooth_score + 0.30 * fid_score + 0.05 * time_score + 0.10 * pass_rate) # Build results DataFrame results = pd.DataFrame({ 'method': median_snr.index, 'median_snr_db': median_snr.values, 'median_smoothness': median_smoothness.values, 'median_fidelity': median_fidelity.values, 'median_time_ms': median_time.values, 'pass_rate': pass_rate.values, 'composite_score': composite_score.values }) # Sort by composite score (best first) and return top_n results = results.sort_values('composite_score', ascending=False).reset_index(drop=True) return results.head(top_n)
[docs] def plot_denoising_comparison( y_raw: np.ndarray, wavenumbers: np.ndarray, methods: Optional[List[str]] = None, sample_name: str = "", figsize: Tuple[int, int] = (12, 8) ) -> None: """ Plot comparison of multiple denoising methods. Parameters ---------- y_raw : np.ndarray Raw spectrum. wavenumbers : np.ndarray Wavenumber axis. methods : list of str, optional Methods to compare. Default: all. sample_name : str Sample name for title. figsize : tuple Figure size. """ if methods is None: methods = denoise_method_names() n_methods = len(methods) fig, axes = plt.subplots(n_methods + 1, 1, figsize=figsize, sharex=True) # Plot raw axes[0].plot(wavenumbers, y_raw, 'k-', lw=0.8) axes[0].set_ylabel('Raw') axes[0].set_title(f'Denoising Comparison: {sample_name}') # Plot each method for i, method in enumerate(methods, 1): y_denoised = denoise(y_raw, method=method) axes[i].plot(wavenumbers, y_denoised, 'b-', lw=0.8) axes[i].set_ylabel(method) axes[-1].set_xlabel('Wavenumber (cm⁻¹)') plt.gca().invert_xaxis() plt.tight_layout() plt.show()