"""
Normalization Module for FTIR Spectral Preprocessing
=====================================================
**IMPORTANT**: This module expects absorbance data (AU), not transmittance (%).
Convert transmittance to absorbance first using convert_spectra() from trans_abs.py
Provides multiple normalization methods for FTIR spectra including
SNV, vector, area, min-max, and peak normalization.
Also includes mean centering for PCA/PLS preparation.
"""
from __future__ import annotations
from typing import Union, Tuple, List, Optional
import logging
import numpy as np
import pandas as pd
import polars as pl
from scipy import ndimage
from tqdm import tqdm
# Import shared spectral utilities
from .spectral_utils import (
_infer_spectral_columns,
_sort_spectral_columns
)
# Configure module logger
logger = logging.getLogger(__name__)
if not logger.handlers:
logger.setLevel(logging.WARNING)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(levelname)s - %(message)s'))
logger.addHandler(handler)
[docs]
def normalize(
intensities: Union[np.ndarray, list],
wavenumbers: Optional[Union[np.ndarray, list]] = None,
method: str = "snv",
**kwargs
) -> np.ndarray:
"""
Normalize a 1-D FTIR spectrum.
Parameters
----------
intensities : array-like
Intensity values (1-D).
wavenumbers : array-like, optional
X-axis values (wavenumbers in cm⁻¹). Required for peak_wavenumber
and adaptive_regional methods.
method : str, default "snv"
Normalization method. Options:
- 'snv': Standard Normal Variate (mean=0, std=1 within spectrum)
- 'vector': L2 vector normalization (unit length)
- 'minmax': Min-Max scaling to [0, 1]
- 'area': Area normalization (total area = 1)
- 'peak': Normalize by peak intensity
- 'range': Normalize by intensity range
- 'max': Normalize by maximum value
- 'detrend': Polynomial detrending
- 'snv_detrend': SNV followed by detrending
Novel methods (1D-compatible):
- 'robust_snv': Robust SNV using median/MAD
- 'curvature_weighted': Curvature-weighted normalization
- 'peak_envelope': Peak envelope normalization
- 'entropy_weighted': Entropy-weighted normalization
- 'pqn': Probabilistic quotient normalization
- 'total_variation': Total variation normalization
- 'spectral_moments': Spectral moment normalization
- 'adaptive_regional': Adaptive regional normalization (requires wavenumbers)
- 'derivative_ratio': Derivative ratio normalization
- 'signal_to_baseline': Signal-to-baseline ratio normalization
**kwargs : method-specific parameters
peak: peak_idx (index), peak_wavenumber (float, requires wavenumbers),
peak_value (float), use_absolute (bool)
minmax: feature_range (tuple, default (0, 1))
detrend: order (int, default 1)
adaptive_regional: regions (list of tuples), method_per_region (dict)
Returns
-------
np.ndarray
Normalized intensity values.
Notes
-----
Methods that require multiple spectra (2D data) are NOT available here:
- 'mean_center': Use mean_center() directly on 2D array
- 'auto_scale': Use auto_scale() directly on 2D array
- 'pareto': Use pareto_scale() directly on 2D array
These methods compute column-wise statistics and should be applied via
normalize_df() for batch processing.
**PQN (Probabilistic Quotient Normalization):**
- For single spectrum WITH reference: Provide `reference` kwarg for true PQN
Example: normalize(y, method='pqn', reference=ref_spectrum)
- For single spectrum WITHOUT reference: Falls back to median scaling (NOT true PQN!)
A warning will be issued in this case.
- For batch PQN: Use normalize_df() instead (auto-computes reference from dataset)
"""
y = np.asarray(intensities, dtype=np.float64)
if y.ndim != 1:
raise ValueError("`intensities` must be a 1-D array-like object.")
if not np.all(np.isfinite(y)):
raise ValueError("`intensities` contains non-finite values (NaN or Inf).")
# Convert wavenumbers to numpy array if provided
wn = np.asarray(wavenumbers, dtype=np.float64) if wavenumbers is not None else None
if method == "snv":
return _normalize_snv(y)
elif method == "vector":
return _normalize_vector(y)
elif method == "minmax":
feature_range = kwargs.get('feature_range', (0.0, 1.0))
return _normalize_minmax(y, feature_range=feature_range)
elif method == "area":
return _normalize_area(y, wavenumbers=wn)
elif method == "peak":
# Handle peak_wavenumber parameter
peak_wn = kwargs.get("peak_wavenumber", None)
if peak_wn is not None:
if wn is None:
raise ValueError("`wavenumbers` must be provided if `peak_wavenumber` is used.")
if len(wn) != len(y):
raise ValueError("`wavenumbers` must have the same length as `intensities`.")
peak_idx = int(np.argmin(np.abs(wn - float(peak_wn))))
else:
peak_idx = kwargs.get("peak_idx", None)
peak_value = kwargs.get("peak_value", None)
use_absolute = kwargs.get("use_absolute", True)
return _normalize_peak(y, peak_idx=peak_idx, peak_value=peak_value, use_absolute=use_absolute)
elif method == "range":
return _normalize_range(y)
elif method == "max":
return _normalize_max(y)
elif method == "detrend":
order = kwargs.get('order', 1)
return detrend(y, order=order, wavenumbers=wn)
elif method == "snv_detrend":
order = kwargs.get('order', 1)
return snv_detrend(y, detrend_order=order, wavenumbers=wn)
elif method == "robust_snv":
return normalize_robust_snv(y)
elif method == "curvature_weighted":
sigma = kwargs.get('sigma', 3.0)
min_weight = kwargs.get('min_weight', 0.01)
return normalize_curvature_weighted(y, sigma=sigma, min_weight=min_weight)
elif method == "peak_envelope":
percentile = kwargs.get('percentile', 95)
window_size = kwargs.get('window_size', 50)
return normalize_peak_envelope(y, percentile=percentile, window_size=window_size)
elif method == "entropy_weighted":
n_bins = kwargs.get('n_bins', 50)
window_size = kwargs.get('window_size', 30)
return normalize_entropy_weighted(y, n_bins=n_bins, window_size=window_size)
elif method == "pqn":
reference = kwargs.get('reference', None)
reference_type = kwargs.get('reference_type', 'median')
return normalize_pqn(y, reference=reference, reference_type=reference_type)
elif method == "total_variation":
order = kwargs.get('order', 1)
return normalize_total_variation(y, order=order)
elif method == "spectral_moments":
moment_order = kwargs.get('moment_order', 2)
use_central = kwargs.get('use_central', True)
return normalize_spectral_moments(y, moment_order=moment_order, use_central=use_central)
elif method == "adaptive_regional":
regions = kwargs.get('regions', None)
method_per_region = kwargs.get('method_per_region', None)
return normalize_adaptive_regional(y, wn, regions, method_per_region)
elif method == "derivative_ratio":
sigma = kwargs.get('sigma', 2.0)
return normalize_derivative_ratio(y, sigma=sigma)
elif method == "signal_to_baseline":
baseline_percentile = kwargs.get('baseline_percentile', 10)
signal_percentile = kwargs.get('signal_percentile', 90)
return normalize_signal_to_baseline(y, baseline_percentile, signal_percentile)
else:
raise ValueError(
f"Unknown normalization method: '{method}'. "
"Valid 1D methods: snv, vector, minmax, area, peak, range, max, "
"detrend, snv_detrend, robust_snv, curvature_weighted, peak_envelope, "
"entropy_weighted, pqn, total_variation, spectral_moments, adaptive_regional, "
"derivative_ratio, signal_to_baseline. "
"For 2D methods (mean_center, auto_scale, pareto), use normalize_df() instead."
)
[docs]
def normalize_method_names() -> List[str]:
"""
Return list of available 1D normalization method names.
Note: Methods requiring 2D data (mean_center, auto_scale, pareto)
are not included. Use normalize_df() for batch processing with those methods.
"""
return sorted([
"snv", "vector", "minmax", "area", "peak", "range", "max",
"detrend", "snv_detrend", "robust_snv", "curvature_weighted",
"peak_envelope", "entropy_weighted", "pqn", "total_variation",
"spectral_moments", "adaptive_regional", "derivative_ratio",
"signal_to_baseline"
])
# ---------------------------------------------------------------------------
# INDIVIDUAL METHODS
# ---------------------------------------------------------------------------
def _normalize_snv(y: np.ndarray) -> np.ndarray:
"""
Standard Normal Variate (SNV) normalization.
Removes multiplicative scatter effects by centering and scaling
each spectrum individually. Common preprocessing for NIR/IR.
Formula: (x - mean(x)) / std(x)
"""
mean = np.mean(y)
std = np.std(y)
if std == 0:
return np.zeros_like(y)
return (y - mean) / std
def _normalize_vector(y: np.ndarray) -> np.ndarray:
"""
L2 Vector normalization.
Scales spectrum to unit length (L2 norm = 1).
Useful for comparing spectral shapes regardless of intensity.
Formula: x / ||x||_2
"""
norm = np.linalg.norm(y)
if norm == 0:
return np.zeros_like(y)
return y / norm
def _normalize_minmax(
y: np.ndarray,
feature_range: Tuple[float, float] = (0.0, 1.0)
) -> np.ndarray:
"""
Min-Max normalization.
Scales values to specified range (default [0, 1]).
Formula: (x - min) / (max - min) * (new_max - new_min) + new_min
"""
y_min, y_max = np.min(y), np.max(y)
if y_max == y_min:
return np.full_like(y, feature_range[0])
scaled = (y - y_min) / (y_max - y_min)
new_min, new_max = feature_range
return scaled * (new_max - new_min) + new_min
def _normalize_area(
y: np.ndarray,
wavenumbers: Optional[np.ndarray] = None
) -> np.ndarray:
"""
Area normalization using trapezoidal integration.
Scales spectrum so total area under curve equals 1.
Useful for comparing relative concentrations.
Parameters
----------
y : np.ndarray
Spectrum intensities
wavenumbers : np.ndarray, optional
Wavenumber array for proper area integration.
If provided, uses trapezoidal integration accounting for spacing.
If None, falls back to sum of absolute values (uniform spacing assumption).
Returns
-------
np.ndarray
Area-normalized spectrum
Notes
-----
- With wavenumbers: Uses np.trapezoid() for true area under curve,
accounting for non-uniform spacing
- Without wavenumbers: Uses sum(|y|) assuming uniform spacing
- Works on absolute values to handle spectra with negative regions
Formula:
With wavenumbers: x / trapezoid(|x|, x=wavenumbers)
Without wavenumbers: x / sum(|x|)
"""
if wavenumbers is not None:
if len(wavenumbers) != len(y):
raise ValueError(
f"Length mismatch: wavenumbers has {len(wavenumbers)} elements "
f"but intensities has {len(y)} elements."
)
# Use trapezoidal integration for proper area calculation
# This accounts for non-uniform wavenumber spacing
x = np.asarray(wavenumbers, dtype=np.float64)
yy = np.abs(y)
# Ensure integration direction doesn't flip sign (common FT-IR: descending cm^-1)
if x[0] > x[-1]:
x = x[::-1]
yy = yy[::-1]
total_area = np.trapezoid(yy, x=x)
else:
# Fallback: sum of absolute values (uniform spacing assumption)
total_area = np.sum(np.abs(y))
if total_area == 0:
return np.zeros_like(y)
return y / total_area
def _normalize_peak(
y: np.ndarray,
peak_idx: Optional[int] = None,
peak_value: Optional[float] = None,
use_absolute: bool = True
) -> np.ndarray:
"""
Peak normalization.
Normalize by the intensity at a specific peak or maximum.
Useful when reference peak intensity is known/expected.
Parameters
----------
y : np.ndarray
1-D spectrum.
peak_idx : int, optional
Index of the normalization peak. If None, uses maximum absolute value.
Must be within valid range [0, len(y)-1].
peak_value : float, optional
If provided, normalize to this value at peak_idx.
use_absolute : bool, default True
If True, use absolute value for intensity (recommended for robustness).
If False, use signed value (can invert spectrum if peak is negative).
Returns
-------
np.ndarray
Peak-normalized spectrum.
Raises
------
IndexError
If peak_idx is out of bounds.
"""
if peak_idx is None:
# Use maximum absolute value (default behavior)
ref_intensity = np.max(np.abs(y))
else:
# Validate peak_idx bounds
if peak_idx < 0 or peak_idx >= len(y):
raise IndexError(
f"peak_idx={peak_idx} is out of bounds for spectrum of length {len(y)}. "
f"Valid range: [0, {len(y)-1}]"
)
# Get intensity at specified index
if use_absolute:
ref_intensity = np.abs(y[peak_idx])
else:
ref_intensity = y[peak_idx]
if ref_intensity == 0:
return np.zeros_like(y)
if peak_value is not None:
return y * (peak_value / ref_intensity)
return y / ref_intensity
def _normalize_range(y: np.ndarray) -> np.ndarray:
"""
Range normalization.
Divides by the range (max - min).
Formula: x / (max - min)
"""
y_range = np.max(y) - np.min(y)
if y_range == 0:
return np.zeros_like(y)
return y / y_range
def _normalize_max(y: np.ndarray) -> np.ndarray:
"""
Maximum normalization.
Scales so maximum value equals 1.
Formula: x / max(|x|)
"""
max_val = np.max(np.abs(y))
if max_val == 0:
return np.zeros_like(y)
return y / max_val
# ---------------------------------------------------------------------------
# BATCH OPERATIONS
# ---------------------------------------------------------------------------
[docs]
def normalize_df(
data: Union[pd.DataFrame, pl.DataFrame, np.ndarray],
method: str = "snv",
label_column: str = "label",
sample_id_column: str = "sample_id",
exclude_columns: Optional[List[str]] = None,
wn_min: Optional[float] = None,
wn_max: Optional[float] = None,
show_progress: bool = True,
**kwargs
) -> Union[pd.DataFrame, pl.DataFrame, np.ndarray]:
"""
Normalize multiple spectra (DataFrame or numpy array).
Works with both pandas and polars DataFrames, or numpy arrays.
For DataFrames: each row is a sample, numerical columns are wavenumbers.
For numpy arrays: shape (n_samples, n_wavenumbers).
Parameters
----------
data : pd.DataFrame | pl.DataFrame | np.ndarray
Wide-format DataFrame where rows = samples, columns = wavenumbers,
OR numpy array of shape (n_samples, n_wavenumbers).
method : str, default "snv"
Normalization method. Options:
Single-spectrum methods (row-wise, 1D):
- 'snv': Standard Normal Variate (mean=0, std=1 within each spectrum)
- 'vector': L2 vector normalization (unit length)
- 'minmax': Min-Max scaling to [0, 1]
- 'area': Area normalization (total area = 1)
- 'peak': Normalize by peak intensity
- 'range': Normalize by intensity range
- 'max': Normalize by maximum value
- 'detrend': Polynomial detrending
- 'snv_detrend': SNV followed by detrending
- Plus all novel methods (robust_snv, curvature_weighted, etc.)
Multi-spectrum methods (column-wise, 2D - for PCA/PLS prep):
- 'mean_center': Column-wise mean centering (mean=0 per wavenumber)
**Requires ≥2 samples**
- 'auto_scale': Column-wise auto-scaling (mean=0, std=1 per wavenumber)
**Requires ≥2 samples**
- 'pareto': Column-wise Pareto scaling (mean=0, scaled by sqrt(std))
**Requires ≥2 samples**
Dataset-level methods (require reference from entire dataset):
- 'pqn': Probabilistic Quotient Normalization (computes reference spectrum
from dataset median/mean, then normalizes each spectrum relative to it)
**Requires ≥3 samples**
label_column : str, default "label"
Name of the label/group column to exclude from normalization.
Only used for DataFrame inputs.
exclude_columns : list[str], optional
Additional column names to exclude from normalization (e.g., 'sample', 'id').
If None, automatically excludes non-numeric columns.
Only used for DataFrame inputs.
wn_min : float, optional
Minimum wavenumber bound (cm⁻¹). If None, uses 200.0 cm⁻¹ as default,
or auto-expands if no columns found within default range.
wn_max : float, optional
Maximum wavenumber bound (cm⁻¹). If None, uses 8000.0 cm⁻¹ as default,
or auto-expands if no columns found within default range.
show_progress : bool, default True
If True, display a progress bar during processing.
Only used for DataFrame inputs.
**kwargs : method-specific parameters
peak: peak_idx (index) or peak_wavenumber (requires wavenumbers array)
minmax: feature_range (tuple, default (0, 1))
detrend: order (int, default 1)
pqn: reference_type (str, default 'median') - 'median' or 'mean' for
computing reference spectrum from dataset
Returns
-------
pd.DataFrame | pl.DataFrame | np.ndarray
Normalized data (same type as input).
Raises
------
ValueError
If using 2D methods (mean_center, auto_scale, pareto) with <2 samples.
If using PQN with <3 samples.
Examples
--------
>>> # Normalize DataFrame with SNV
>>> df_norm = normalize_batch(df_wide, method="snv")
>>> # Normalize with vector normalization
>>> df_norm = normalize_batch(df_wide, method="vector")
>>> # Normalize numpy array
>>> spectra_norm = normalize_batch(spectra_array, method="snv")
>>> # Min-max normalization to [0, 1]
>>> df_norm = normalize_batch(df_wide, method="minmax", feature_range=(0, 1))
>>> # PQN with median reference (proper batch PQN)
>>> df_norm = normalize_batch(df_wide, method="pqn", reference_type="median")
>>> # PQN with mean reference
>>> df_norm = normalize_batch(df_wide, method="pqn", reference_type="mean")
>>> # Disable progress bar
>>> df_norm = normalize_batch(df_wide, method="snv", show_progress=False)
"""
# Handle numpy array input (only 1d methods)
if isinstance(data, np.ndarray):
return np.array([normalize(s, method=method, **kwargs) for s in data])
# Handle DataFrame input (both 1d, 2d methods, with dataframes only)
is_polars = isinstance(data, pl.DataFrame)
# Convert to pandas for processing
if is_polars:
df = data.to_pandas()
else:
df = data.copy()
# Prepare exclude_columns list
if exclude_columns is None:
exclude_columns = []
elif isinstance(exclude_columns, str):
exclude_columns = [exclude_columns]
else:
exclude_columns = list(exclude_columns)
# Always exclude the label column if it exists
if label_column in df.columns and label_column not in exclude_columns:
exclude_columns.append(label_column)
if sample_id_column in df.columns and sample_id_column not in exclude_columns:
exclude_columns.append(sample_id_column)
# Use spectral_utils to infer and sort spectral columns
numeric_cols, wavenumbers = _infer_spectral_columns(
df, exclude_columns, wn_min, wn_max
)
sorted_cols, sorted_wavenumbers, sort_idx = _sort_spectral_columns(
numeric_cols, wavenumbers
)
# Identify all parseable numeric columns (for metadata separation)
all_numeric_cols = []
for c in df.columns:
if c in exclude_columns:
continue
try:
float(c)
all_numeric_cols.append(c)
except Exception:
continue
# Metadata = all columns that are NOT parseable as wavenumbers
metadata_cols = [c for c in df.columns if c not in all_numeric_cols]
# VALIDATION: Check if data appears to be transmittance instead of absorbance
# Sample spectral data for efficiency
temp_spectral_data = df[sorted_cols].values.astype(np.float64)
sample_size = min(100, temp_spectral_data.shape[0])
sample_data = temp_spectral_data[:sample_size, :].flatten()
sample_data_finite = sample_data[np.isfinite(sample_data)]
if len(sample_data_finite) > 0:
median_val = np.median(sample_data_finite)
p95_val = np.percentile(sample_data_finite, 95)
if p95_val > 10.0 and median_val > 1.0:
raise ValueError(
f"Input data appears to be transmittance (%) rather than absorbance (AU). "
f"Detected: median={median_val:.2f}, 95th percentile={p95_val:.2f}. "
f"Normalization should be performed on absorbance for physical validity. "
f"Please convert your data first using: "
f"convert_spectra(data, mode='to_absorbance') from trans_abs.py"
)
# Check if method requires dataset-level processing
# - Column-wise 2D methods: mean_center, auto_scale, pareto
# - Dataset reference methods: pqn (requires reference spectrum from dataset)
methods_2d = ['mean_center', 'auto_scale', 'pareto']
methods_dataset_ref = ['pqn']
# Validate minimum sample size for dataset-level methods
n_samples = len(df)
MIN_SAMPLES_2D = 2 # Minimum for meaningful column-wise statistics
MIN_SAMPLES_PQN = 3 # Minimum for meaningful reference spectrum
if method in methods_2d and n_samples < MIN_SAMPLES_2D:
raise ValueError(
f"Method '{method}' requires at least {MIN_SAMPLES_2D} samples for "
f"column-wise normalization, but only {n_samples} sample(s) provided. "
f"For single-spectrum normalization, use a row-wise method like 'snv', "
f"'vector', 'area', etc."
)
if method in methods_dataset_ref and n_samples < MIN_SAMPLES_PQN:
raise ValueError(
f"Method '{method}' requires at least {MIN_SAMPLES_PQN} samples to compute "
f"a meaningful reference spectrum, but only {n_samples} sample(s) provided. "
f"For single-spectrum normalization, use normalize() with a provided reference, "
f"or use a different method."
)
if method in methods_2d:
# Extract spectral data as 2D array (using sorted, filtered columns)
spectral_data = df[sorted_cols].values.astype(np.float64)
# Apply 2D method
if method == 'mean_center':
normalized_data = mean_center(spectral_data, axis=0, return_mean=False)
elif method == 'auto_scale':
normalized_data = auto_scale(spectral_data, return_params=False)
elif method == 'pareto':
normalized_data = pareto_scale(spectral_data, return_params=False)
# Create result DataFrame with metadata + normalized spectra
metadata_df = df[metadata_cols].copy()
# Create spectral DataFrame from numpy array
spectral_df = pd.DataFrame(normalized_data, columns=sorted_cols, index=df.index)
# Concatenate metadata and spectral data
result_df = pd.concat([metadata_df, spectral_df], axis=1)
# Reorder columns to match original structure (metadata first, then spectra)
final_cols = metadata_cols + sorted_cols
df = result_df[final_cols]
elif method in methods_dataset_ref:
# PQN: Requires computing reference spectrum from dataset
# Extract spectral data as 2D array
spectral_data = df[sorted_cols].values.astype(np.float64)
n_samples = spectral_data.shape[0]
n_wavenumbers = spectral_data.shape[1]
# Compute reference spectrum from dataset
reference_type = kwargs.get('reference_type', 'median')
if reference_type == 'median':
reference_spectrum = np.median(spectral_data, axis=0)
elif reference_type == 'mean':
reference_spectrum = np.mean(spectral_data, axis=0)
else:
raise ValueError(
f"Invalid reference_type '{reference_type}'. "
f"Must be 'median' or 'mean'."
)
# Apply PQN to each spectrum using the reference
normalized_data = np.empty((n_samples, n_wavenumbers), dtype=np.float64)
iterator = tqdm(
range(n_samples),
desc=f"PQN normalization (ref={reference_type})",
disable=not show_progress,
dynamic_ncols=True
)
for i in iterator:
normalized_data[i, :] = normalize_pqn(
spectral_data[i, :],
reference=reference_spectrum,
reference_type=reference_type
)
# Create result DataFrame with metadata + normalized spectra
metadata_df = df[metadata_cols].copy()
# Create spectral DataFrame from numpy array
spectral_df = pd.DataFrame(normalized_data, columns=sorted_cols, index=df.index)
# Concatenate metadata and spectral data
result_df = pd.concat([metadata_df, spectral_df], axis=1)
# Reorder columns to match original structure (metadata first, then spectra)
final_cols = metadata_cols + sorted_cols
df = result_df[final_cols]
else:
# Apply 1D normalization to each sample (row-wise)
# OPTIMIZATION: Extract numpy array and pre-allocate result
spectral_data = df[sorted_cols].values.astype(np.float64)
n_samples = spectral_data.shape[0]
n_wavenumbers = spectral_data.shape[1]
normalized_data = np.empty((n_samples, n_wavenumbers), dtype=np.float64)
# Normalization loop with progress bar
iterator = tqdm(
range(n_samples),
desc=f"Normalization ({method})",
disable=not show_progress,
dynamic_ncols=True
)
for i in iterator:
intensities = spectral_data[i, :]
normalized_data[i, :] = normalize(
intensities=intensities,
wavenumbers=sorted_wavenumbers,
method=method,
**kwargs
)
# Create result DataFrame with metadata + normalized spectra
metadata_df = df[metadata_cols].copy()
# Create spectral DataFrame from numpy array (avoids fragmentation)
spectral_df = pd.DataFrame(normalized_data, columns=sorted_cols, index=df.index)
# Concatenate metadata and spectral data
result_df = pd.concat([metadata_df, spectral_df], axis=1)
# Reorder columns to match original structure (metadata first, then spectra)
final_cols = metadata_cols + sorted_cols
df = result_df[final_cols]
# PHYSICAL CONSTRAINT VALIDATION: Method-specific validation for negative values
# Categorize methods by whether negative values are expected
# Methods that EXPECT negative values (mean-centering, detrending, baseline-shifting)
methods_expect_negative = [
# Row-wise mean-centering methods
'snv', 'snv_detrend', 'robust_snv',
# Column-wise centering (for PCA/PLS prep)
'mean_center', 'auto_scale', 'pareto',
# Detrending methods (polynomial baseline removal)
'detrend',
# Novel methods that produce centered data or can produce negatives
'spectral_moments', # With use_central=True (default), returns mean-centered data
'signal_to_baseline', # Shifts by baseline_level, can produce negatives
'derivative_ratio', # Preserves sign of input (can be negative after baseline correction)
]
# Methods that should ONLY produce non-negative values (if input is non-negative)
methods_should_be_nonnegative = [
# Simple scaling methods (preserve sign, but should be positive if input is positive)
'minmax', 'area', 'peak', 'range', 'max', 'vector',
# Ratio-based methods (should stay positive if input is positive)
'pqn',
# Novel methods that should preserve positivity
'curvature_weighted', 'peak_envelope', 'entropy_weighted',
'total_variation',
# Note: adaptive_regional depends on methods used per region
]
# Extract final normalized data for validation
final_spectral_data = df[sorted_cols].values.astype(np.float64)
finite_mask = np.isfinite(final_spectral_data)
if np.any(finite_mask):
n_negative = np.sum(final_spectral_data[finite_mask] < 0)
if n_negative > 0:
min_negative = np.min(final_spectral_data[finite_mask])
pct_negative = 100.0 * n_negative / np.sum(finite_mask)
# Decide whether to warn based on method type
if method in methods_expect_negative:
# Negative values are EXPECTED - only log at debug level
logger.debug(
f"Normalization method '{method}' produced {n_negative} negative values "
f"({pct_negative:.1f}% of valid points, min={min_negative:.4f}). "
f"This is EXPECTED for mean-centering/baseline-shifting methods."
)
elif method in methods_should_be_nonnegative:
# Negative values are UNEXPECTED - warn strongly
logger.warning(
f"Normalization method '{method}' produced {n_negative} negative values "
f"({pct_negative:.1f}% of valid points, min={min_negative:.4f}). "
f"This is UNEXPECTED for this method and indicates a problem. "
f"Possible causes: (1) Input data already has negative absorbance (check baseline correction), "
f"(2) Numerical instability in normalization, or (3) Data contains artifacts. "
f"Recommendation: Inspect input data and consider baseline correction."
)
elif method == 'adaptive_regional':
# Region-dependent method - give informational message
logger.info(
f"Normalization method 'adaptive_regional' produced {n_negative} negative values "
f"({pct_negative:.1f}% of valid points, min={min_negative:.4f}). "
f"This depends on the methods used per region (method_per_region parameter). "
f"If mean-centering methods (snv, robust_snv) are used, negatives are expected."
)
else:
# Method not categorized - give general warning
logger.info(
f"Normalization method '{method}' produced {n_negative} negative values "
f"({pct_negative:.1f}% of valid points, min={min_negative:.4f}). "
f"Verify if this is expected for your normalization method."
)
# Convert back to polars if input was polars
if is_polars:
df = pl.from_pandas(df)
return df
# ---------------------------------------------------------------------------
# MEAN CENTERING
# ---------------------------------------------------------------------------
[docs]
def mean_center(
spectra: np.ndarray,
axis: int = 0,
return_mean: bool = False
) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
"""
Mean-center spectra (essential preprocessing for PCA/PLS).
Parameters
----------
spectra : np.ndarray, shape (n_samples, n_wavenumbers)
Matrix of spectra.
axis : int, default 0
Axis along which to compute mean.
- 0: Column-wise (feature/wavenumber centering) - standard for PCA
- 1: Row-wise (sample centering)
return_mean : bool, default False
If True, return the mean array for later reconstruction.
Returns
-------
centered : np.ndarray
Mean-centered spectra.
mean : np.ndarray, optional
Mean values (returned if return_mean=True).
"""
mean = np.mean(spectra, axis=axis, keepdims=True)
centered = spectra - mean
if return_mean:
return centered, np.squeeze(mean)
return centered
[docs]
def auto_scale(
spectra: np.ndarray,
return_params: bool = False
) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray, np.ndarray]]:
"""
Auto-scaling (mean centering + unit variance scaling).
Each variable (wavenumber) is scaled to have mean=0 and std=1.
Common preprocessing for PCA/PLS when variables have different scales.
Parameters
----------
spectra : np.ndarray, shape (n_samples, n_wavenumbers)
Matrix of spectra.
return_params : bool, default False
If True, return mean and std for reconstruction.
Returns
-------
scaled : np.ndarray
Auto-scaled spectra.
mean : np.ndarray, optional
std : np.ndarray, optional
"""
mean = np.mean(spectra, axis=0)
std = np.std(spectra, axis=0)
std[std == 0] = 1 # Avoid division by zero
scaled = (spectra - mean) / std
if return_params:
return scaled, mean, std
return scaled
[docs]
def pareto_scale(
spectra: np.ndarray,
return_params: bool = False
) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray, np.ndarray]]:
"""
Pareto scaling (mean centering + sqrt(std) scaling).
Less aggressive than auto-scaling; preserves more of the
original data structure. Good for spectral data.
Parameters
----------
spectra : np.ndarray, shape (n_samples, n_wavenumbers)
Matrix of spectra.
return_params : bool, default False
If True, return mean and std for reconstruction.
Returns
-------
scaled : np.ndarray
Pareto-scaled spectra.
"""
mean = np.mean(spectra, axis=0)
std = np.std(spectra, axis=0)
std[std == 0] = 1
scaled = (spectra - mean) / np.sqrt(std)
if return_params:
return scaled, mean, std
return scaled
# ---------------------------------------------------------------------------
# DETRENDING
# ---------------------------------------------------------------------------
[docs]
def detrend(
intensities: np.ndarray,
order: int = 1,
wavenumbers: Optional[np.ndarray] = None
) -> np.ndarray:
"""
Remove polynomial trend from spectrum.
Often used after SNV to remove residual slope.
Parameters
----------
intensities : np.ndarray
1-D spectrum.
order : int, default 1
Polynomial order (1 = linear detrending).
wavenumbers : np.ndarray, optional
Wavenumber array for physical slope calculation.
If provided, polynomial fit uses actual wavenumber values (cm⁻¹).
If None, uses array indices (0, 1, 2, ...).
Returns
-------
np.ndarray
Detrended spectrum.
Notes
-----
- With wavenumbers: Fit uses physical x-axis (cm⁻¹), yielding physically
meaningful slope coefficients. Important when comparing spectra on
different grids or after resampling.
- Without wavenumbers: Fit uses indices (0, 1, 2, ...), which is
grid-dependent. Slope coefficients are in units of intensity/index.
Examples
--------
>>> # Index-based detrending (grid-dependent)
>>> detrended = detrend(spectrum)
>>>
>>> # Physical detrending (grid-independent)
>>> detrended = detrend(spectrum, wavenumbers=wn, order=1)
"""
if wavenumbers is not None:
# Use physical wavenumber values for fitting
if len(wavenumbers) != len(intensities):
raise ValueError(
f"Length mismatch: wavenumbers has {len(wavenumbers)} elements "
f"but intensities has {len(intensities)} elements."
)
x = wavenumbers
else:
# Use array indices (original behavior)
x = np.arange(len(intensities))
coeffs = np.polyfit(x, intensities, order)
trend = np.polyval(coeffs, x)
return intensities - trend
[docs]
def snv_detrend(
intensities: np.ndarray,
detrend_order: int = 1,
wavenumbers: Optional[np.ndarray] = None
) -> np.ndarray:
"""
SNV followed by detrending.
Common combined preprocessing for scatter correction.
Parameters
----------
intensities : np.ndarray
1-D spectrum.
detrend_order : int, default 1
Polynomial order for detrending (1 = linear).
wavenumbers : np.ndarray, optional
Wavenumber array for physical slope calculation in detrending.
If provided, uses actual wavenumber values (cm⁻¹).
If None, uses array indices.
Returns
-------
np.ndarray
SNV-normalized and detrended spectrum.
"""
snv_result = _normalize_snv(intensities)
return detrend(snv_result, order=detrend_order, wavenumbers=wavenumbers)
# New methods
"""
Novel Normalization Methods for FT-IR Absorbance Spectra
=========================================================
These methods address limitations of traditional approaches by incorporating
spectral physics, robust statistics, and information-theoretic principles.
"""
# ---------------------------------------------------------------------------
# 1. CURVATURE-WEIGHTED NORMALIZATION
# ---------------------------------------------------------------------------
[docs]
def normalize_curvature_weighted(
y: np.ndarray,
sigma: float = 3.0,
min_weight: float = 0.01
) -> np.ndarray:
"""
Normalize with weights proportional to local curvature (2nd derivative).
Physical motivation: In FT-IR, peaks have high curvature while baseline
regions are flat. This method emphasizes peak regions during normalization,
making it more representative of actual chemical information.
The normalization factor is the curvature-weighted L2 norm.
Parameters
----------
y : np.ndarray
1-D spectrum.
sigma : float, default 3.0
Gaussian smoothing sigma for curvature estimation (reduces noise).
min_weight : float, default 0.01
Minimum weight to avoid division issues in flat regions.
Returns
-------
np.ndarray
Curvature-weighted normalized spectrum.
Notes
-----
Handles flat spectra (zero curvature) by falling back to uniform weighting.
"""
# Smooth before computing curvature to reduce noise amplification
y_smooth = ndimage.gaussian_filter1d(y, sigma=sigma)
# Second derivative (curvature proxy)
d2y = np.gradient(np.gradient(y_smooth))
# Weights = |curvature|, normalized
weights = np.abs(d2y)
max_weight = np.max(weights)
if max_weight > 0:
# Normal case: normalize by max curvature
weights = weights / max_weight + min_weight
else:
# Flat spectrum: use uniform weighting
weights = np.ones_like(weights) * (1.0 + min_weight)
# Weighted L2 norm
weighted_norm = np.sqrt(np.sum(weights * y**2))
if weighted_norm == 0:
return np.zeros_like(y)
return y / weighted_norm
# ---------------------------------------------------------------------------
# 2. PEAK-ENVELOPE NORMALIZATION
# ---------------------------------------------------------------------------
[docs]
def normalize_peak_envelope(
y: np.ndarray,
percentile: float = 95,
window_size: int = 50
) -> np.ndarray:
"""
Normalize by the upper envelope of the spectrum.
Physical motivation: The upper envelope represents the maximum signal
level across the spectrum, accounting for varying peak densities and
intensities. This is more representative than single-peak normalization.
Works on absolute values to handle negative/baseline-corrected spectra.
Parameters
----------
y : np.ndarray
1-D spectrum.
percentile : float, default 95
Percentile for envelope estimation (avoids noise spikes).
window_size : int, default 50
Rolling window size for envelope estimation.
Returns
-------
np.ndarray
Envelope-normalized spectrum.
Notes
-----
Uses absolute values to avoid NaN issues with negative/baseline-corrected spectra.
"""
# Work with absolute values to handle negative spectra
y_abs = np.abs(y)
n = len(y_abs)
envelope = np.zeros(n)
half_win = window_size // 2
for i in range(n):
start = max(0, i - half_win)
end = min(n, i + half_win + 1)
envelope[i] = np.percentile(y_abs[start:end], percentile)
# Use median of envelope as normalization factor
# Filter finite values > 0
valid_envelope = envelope[np.isfinite(envelope) & (envelope > 0)]
if len(valid_envelope) == 0:
# Fallback: use max absolute value
max_val = np.max(y_abs)
norm_factor = max_val if max_val > 0 else 1.0
else:
norm_factor = np.median(valid_envelope)
if norm_factor == 0:
return np.zeros_like(y)
return y / norm_factor
# ---------------------------------------------------------------------------
# 3. ENTROPY-WEIGHTED NORMALIZATION
# ---------------------------------------------------------------------------
[docs]
def normalize_entropy_weighted(
y: np.ndarray,
n_bins: int = 50,
window_size: int = 30,
epsilon: float = 1e-10
) -> np.ndarray:
"""
Normalize with weights based on local spectral entropy.
Motivation: Regions with high entropy (high variability/information)
should contribute more to normalization. Flat baseline regions have
low entropy and contribute less.
Local entropy is computed in sliding windows using histogram-based
probability estimation.
Parameters
----------
y : np.ndarray
1-D spectrum.
n_bins : int, default 50
Number of bins for local histogram.
window_size : int, default 30
Window size for local entropy calculation.
epsilon : float
Small value to avoid log(0).
Returns
-------
np.ndarray
Entropy-weighted normalized spectrum.
"""
n = len(y)
local_entropy = np.zeros(n)
half_win = window_size // 2
for i in range(n):
start = max(0, i - half_win)
end = min(n, i + half_win + 1)
window = y[start:end]
# Compute local histogram and entropy
hist, _ = np.histogram(window, bins=min(n_bins, len(window)//2 + 1))
hist = hist / (hist.sum() + epsilon)
hist = hist[hist > 0]
local_entropy[i] = -np.sum(hist * np.log(hist + epsilon))
# Normalize entropy to [0, 1] and use as weights
weights = (local_entropy - local_entropy.min()) / (local_entropy.max() - local_entropy.min() + epsilon)
weights = weights + 0.1 # Floor to avoid zero weights
# Weighted normalization
weighted_norm = np.sqrt(np.sum(weights * y**2))
if weighted_norm == 0:
return np.zeros_like(y)
return y / weighted_norm
# ---------------------------------------------------------------------------
# 4. TOTAL VARIATION NORMALIZATION
# ---------------------------------------------------------------------------
[docs]
def normalize_total_variation(
y: np.ndarray,
order: int = 1
) -> np.ndarray:
"""
Normalize by total variation (sum of absolute differences).
Physical motivation: Total variation captures the "roughness" or
total signal content independent of baseline offset. It's related
to the first derivative energy and is baseline-invariant.
TV = sum(|y[i+1] - y[i]|) for first order
Parameters
----------
y : np.ndarray
1-D spectrum.
order : int, default 1
Order of differences (1 = first derivative, 2 = second derivative).
Returns
-------
np.ndarray
TV-normalized spectrum.
"""
diff = y.copy()
for _ in range(order):
diff = np.diff(diff)
tv = np.sum(np.abs(diff))
if tv == 0:
return np.zeros_like(y)
return y / tv
# ---------------------------------------------------------------------------
# 5. SPECTRAL MOMENT NORMALIZATION
# ---------------------------------------------------------------------------
[docs]
def normalize_spectral_moments(
y: np.ndarray,
moment_order: int = 2,
use_central: bool = True
) -> np.ndarray:
"""
Normalize using spectral moments.
Physical motivation: Higher-order moments capture distribution
characteristics beyond simple mean/variance. The nth moment
emphasizes larger deviations, useful for peak-rich spectra.
Parameters
----------
y : np.ndarray
1-D spectrum.
moment_order : int, default 2
Order of moment to use (2 = variance-like, 3 = skewness-like, etc.)
use_central : bool, default True
If True, use central moments (subtract mean first).
Returns
-------
np.ndarray
Moment-normalized spectrum.
"""
if use_central:
y_centered = y - np.mean(y)
else:
y_centered = y
# Compute nth moment
moment = np.mean(np.abs(y_centered) ** moment_order) ** (1.0 / moment_order)
if moment == 0:
return np.zeros_like(y)
if use_central:
return y_centered / moment
return y / moment
# ---------------------------------------------------------------------------
# 6. ADAPTIVE REGIONAL NORMALIZATION
# ---------------------------------------------------------------------------
[docs]
def normalize_adaptive_regional(
y: np.ndarray,
wavenumbers: Optional[np.ndarray],
regions: Optional[list] = None,
method_per_region: Optional[dict] = None
) -> np.ndarray:
"""
Apply different normalization to different spectral regions.
Physical motivation: Different FT-IR regions have different
characteristics:
- 3600-2800 cm⁻¹: O-H, N-H, C-H stretching (often intense)
- 1800-1500 cm⁻¹: C=O, C=C, amide bands
- 1500-400 cm⁻¹: Fingerprint region (complex)
Each region may benefit from different normalization.
Parameters
----------
y : np.ndarray
1-D spectrum.
wavenumbers : np.ndarray, optional
Wavenumber axis. Required for this method.
regions : list of tuples, optional
[(start1, end1), (start2, end2), ...] defining regions.
Default: standard FT-IR regions.
method_per_region : dict, optional
{"region_idx": "method_name"} mapping.
Default: SNV for all regions.
Returns
-------
np.ndarray
Regionally-normalized spectrum.
Raises
------
ValueError
If wavenumbers is None.
"""
if wavenumbers is None:
raise ValueError(
"adaptive_regional normalization requires wavenumbers array. "
"Provide wavenumbers to normalize() function."
)
if len(wavenumbers) != len(y):
raise ValueError(
f"Length mismatch: wavenumbers has {len(wavenumbers)} elements "
f"but spectrum has {len(y)} elements."
)
if regions is None:
# Default FT-IR regions
regions = [
(3600, 2800), # X-H stretching
(1800, 1500), # Double bonds
(1500, 400), # Fingerprint
]
if method_per_region is None:
method_per_region = {i: "snv" for i in range(len(regions))}
y_normalized = y.copy()
for i, (start, end) in enumerate(regions):
mask = (wavenumbers >= min(start, end)) & (wavenumbers <= max(start, end))
if not np.any(mask):
continue
region_data = y[mask]
method = method_per_region.get(i, "snv")
if method == "snv":
mean, std = np.mean(region_data), np.std(region_data)
if std > 0:
y_normalized[mask] = (region_data - mean) / std
elif method == "minmax":
rmin, rmax = np.min(region_data), np.max(region_data)
if rmax > rmin:
y_normalized[mask] = (region_data - rmin) / (rmax - rmin)
elif method == "robust":
y_normalized[mask] = normalize_robust_snv(region_data)
return y_normalized
# ---------------------------------------------------------------------------
# 7. DERIVATIVE RATIO NORMALIZATION
# ---------------------------------------------------------------------------
[docs]
def normalize_derivative_ratio(
y: np.ndarray,
sigma: float = 2.0
) -> np.ndarray:
"""
Normalize using the ratio of derivative energies.
Physical motivation: The ratio of second to first derivative
energy characterizes peak sharpness independent of intensity.
This provides baseline-independent normalization.
Parameters
----------
y : np.ndarray
1-D spectrum.
sigma : float
Smoothing parameter before derivative computation.
Returns
-------
np.ndarray
Derivative-ratio normalized spectrum.
"""
y_smooth = ndimage.gaussian_filter1d(y, sigma=sigma)
d1 = np.gradient(y_smooth)
d2 = np.gradient(d1)
# Energy of derivatives
e1 = np.sqrt(np.mean(d1**2))
e2 = np.sqrt(np.mean(d2**2))
# Combined normalization factor
if e1 == 0:
return np.zeros_like(y)
# Use geometric mean of derivative energies
norm_factor = np.sqrt(e1 * e2) if e2 > 0 else e1
return y / norm_factor
# ---------------------------------------------------------------------------
# 8. SIGNAL-TO-BASELINE RATIO NORMALIZATION
# ---------------------------------------------------------------------------
[docs]
def normalize_signal_to_baseline(
y: np.ndarray,
baseline_percentile: float = 10,
signal_percentile: float = 90
) -> np.ndarray:
"""
Normalize by the ratio of signal to baseline levels.
Physical motivation: This separates the "signal" (peaks) from
"baseline" (background) and normalizes by their contrast. Useful
when baseline levels vary between samples.
Parameters
----------
y : np.ndarray
1-D spectrum.
baseline_percentile : float
Percentile to estimate baseline level.
signal_percentile : float
Percentile to estimate signal level.
Returns
-------
np.ndarray
Signal-to-baseline normalized spectrum.
"""
baseline_level = np.percentile(y, baseline_percentile)
signal_level = np.percentile(y, signal_percentile)
contrast = signal_level - baseline_level
if contrast <= 0:
return np.zeros_like(y)
# Shift to baseline and normalize by contrast
return (y - baseline_level) / contrast
# ---------------------------------------------------------------------------
# Other methods - modified from original
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# 1. ROBUST SNV (RSNV) - Median/MAD-based SNV
# ---------------------------------------------------------------------------
[docs]
def normalize_robust_snv(y: np.ndarray, consistency_correction: bool = True, epsilon: float = 1e-10) -> np.ndarray:
"""
Robust Standard Normal Variate using median and MAD.
Traditional SNV uses mean and std, which are sensitive to:
- Baseline artifacts (shifts the mean)
- Outlier peaks (inflates std)
- Asymmetric intensity distributions
RSNV uses median (robust center) and MAD (robust scale).
Formula: (x - median(x)) / MAD(x)
Parameters
----------
y : np.ndarray
1-D spectrum.
consistency_correction : bool, default True
If True, scale MAD by 1.4826 to be consistent with std for normal data.
epsilon : float, default 1e-10
Small value added to MAD to avoid division by zero and prevent
zero vectors (which cause issues with cosine-based methods).
Returns
-------
np.ndarray
Robustly normalized spectrum.
Reference
---------
Novel method - combines robust statistics with scatter correction.
Notes
-----
When MAD is very small or zero (flat spectrum), epsilon prevents
returning a zero vector, which would cause errors in downstream
methods that use cosine similarity (e.g., clustering, PCA with
cosine kernel).
"""
median = np.median(y)
mad = np.median(np.abs(y - median))
if consistency_correction:
mad *= 1.4826 # Makes MAD consistent with std for normal distribution
# Add epsilon to prevent zero division and zero vectors
# This ensures compatibility with cosine-based methods
mad = max(mad, epsilon)
return (y - median) / mad
# ---------------------------------------------------------------------------
# 2. PROBABILISTIC QUOTIENT NORMALIZATION (PQN)
# ---------------------------------------------------------------------------
[docs]
def normalize_pqn(
y: np.ndarray,
reference: Optional[np.ndarray] = None,
reference_type: str = "median"
) -> np.ndarray:
"""
Probabilistic Quotient Normalization (PQN) for FT-IR spectra.
IMPORTANT: True PQN requires a reference spectrum from your dataset.
For single-spectrum processing without a reference, this function
performs "median scaling" (dividing by median intensity), which is
NOT true PQN. Use normalize_df() for proper batch PQN.
Originally from metabolomics (Dieterle et al., 2006), PQN uses
median fold-change relative to a reference spectrum, which is robust
to varying numbers/intensities of peaks.
Physical motivation: Accounts for dilution effects and path length
variations without being dominated by major peaks.
Parameters
----------
y : np.ndarray
1-D spectrum.
reference : np.ndarray, optional
Reference spectrum from your dataset.
- If provided: Performs true PQN using this reference
- If None: Falls back to "median scaling" (divides by median
of positive values). This is NOT true PQN!
reference_type : str, default "median"
Currently unused. Reserved for future use in batch processing
via normalize_df() where reference can be computed from dataset.
Returns
-------
np.ndarray
Normalized spectrum.
Notes
-----
**Single spectrum (reference=None):**
Returns: y / median(y[y > 0])
This is "median scaling", not true PQN.
**With reference spectrum:**
1. Compute quotients: q[i] = y[i] / reference[i] (where both > 0)
2. Normalization factor: median(q)
3. Returns: y / median(q)
**For batch processing:**
Use normalize_df() with method='pqn' to automatically compute
a reference spectrum (median or mean) from your dataset.
References
----------
Dieterle et al. (2006) Probabilistic quotient normalization as robust
method to account for dilution of complex biological mixtures.
Anal Chem 78(13):4281-90.
Examples
--------
>>> # Single spectrum: median scaling (NOT true PQN)
>>> y_scaled = normalize_pqn(spectrum) # reference=None
>>>
>>> # True PQN: provide reference from dataset
>>> ref = np.median(all_spectra, axis=0) # median spectrum
>>> y_pqn = normalize_pqn(spectrum, reference=ref)
>>>
>>> # Batch PQN: use normalize_df() instead
>>> df_pqn = normalize_df(df, method='pqn')
"""
if reference is None:
# FALLBACK: Median scaling for single spectrum
# This is NOT true PQN - it's just dividing by median intensity
import warnings
warnings.warn(
"PQN called without a reference spectrum. Falling back to median scaling, "
"which is NOT true PQN. For proper PQN: (1) provide a reference spectrum "
"via the 'reference' parameter, or (2) use normalize_df() for batch processing "
"which automatically computes a reference from your dataset.",
UserWarning,
stacklevel=2
)
ref_value = np.median(y[y > 0]) if np.any(y > 0) else 1.0
return y / ref_value
# TRUE PQN: Compute quotients (fold changes) relative to reference
# Only use positions where both spectra are positive
mask = (y > 0) & (reference > 0)
if not np.any(mask):
return y # Cannot normalize
quotients = y[mask] / reference[mask]
# Median quotient is the normalization factor
norm_factor = np.median(quotients)
if norm_factor == 0:
return np.zeros_like(y)
return y / norm_factor
# ---------------------------------------------------------------------------
# CONVENIENCE FUNCTION
# ---------------------------------------------------------------------------
[docs]
def normalize_novel(
y: np.ndarray,
method: str = "robust_snv",
**kwargs
) -> np.ndarray:
"""
Apply novel normalization method.
Parameters
----------
y : np.ndarray
1-D spectrum.
method : str
One of: 'robust_snv', 'curvature', 'envelope', 'entropy',
'pqn', 'total_variation', 'moments', 'derivative_ratio',
'signal_baseline'
**kwargs : method-specific parameters
Returns
-------
np.ndarray
Normalized spectrum.
"""
methods = {
'curvature': normalize_curvature_weighted,
'envelope': normalize_peak_envelope,
'entropy': normalize_entropy_weighted,
'total_variation': normalize_total_variation,
'moments': normalize_spectral_moments,
'derivative_ratio': normalize_derivative_ratio,
'signal_baseline': normalize_signal_to_baseline,
}
if method not in methods:
raise ValueError(f"Unknown method '{method}'. Choose from: {list(methods.keys())}")
return methods[method](y, **kwargs)
[docs]
def novel_normalize_method_names() -> list:
"""Return list of novel normalization method names."""
return [
'curvature', 'envelope', 'entropy',
'total_variation', 'moments', 'derivative_ratio',
'signal_baseline'
]