"""
Synthetic data generation for testing, validation, ML training data generation.
This module provides tools for generating realistic simulated spectroscopy
data from models, with support for different detector types and noise models.
Use for:
- Testing fitting algorithms with known ground truth
- Exploring parameter sensitivity and identifiability
- Optimizing experimental design (SNR requirements)
- Generating training data for machine learning
- Validating analysis pipelines
Key Features
------------
- Two detector types: analog and photon counting
- Multiple noise models: Poisson, Gaussian, or none
- 1D and 2D spectrum simulation
- Batch generation for statistical analysis
- Parameter sweeping (grid/random/uniform) for ML training
Detector Types
--------------
**Analog Detectors** (CCD, photodiodes, lock-in amplifiers):
- Continuous signal output
- Additive noise (Gaussian or Poisson)
- Noise level controlled by noise_level parameter
**Photon Counting** (APD, photomultiplier, event mode):
- Discrete photon events
- Shot noise inherent (Poisson statistics)
- Count rate determines signal-to-noise ratio
Workflow
--------
1. Testing and Validation
- Create model with trspecfit.mcp.Model
- Initialize Simulator with model and noise parameters
- Generate data with simulate_1d() or simulate_2d()
OR generate multiple realizations with simulate_n()
- Save data and ground truth with save_data()
- Fit simulated data to validate fitting pipeline
2. Machine Learning Training Data Generation
- Create model with trspecfit.mcp.Model
- Define parameter space using trspecfit.utils.sweep.ParameterSweep
- Initialize Simulator with model and noise parameters
- Generate multiple realizations (n) for each parameter combination
(data, ground truth, and relevant metadata get saved automatically)
Examples
--------
See examples/simulator/ directory for complete workflows.
"""
import json
import warnings
from pathlib import Path
from typing import cast
import h5py
import matplotlib.pyplot as plt
import numpy as np
from trspecfit.config.plot import PlotConfig
from trspecfit.mcp import Model
from trspecfit.utils import plot as uplt
from trspecfit.utils.hdf5 import require_group
from trspecfit.utils.sweep import ParameterSweep
#
#
[docs]
class Simulator:
"""
Simulate 2D time- and energy-resolved spectroscopy data with noise.
This class generates synthetic data based on a model, adding realistic noise
to simulate experimental measurements. Supports both analog detectors (with
additive noise) and photon counting detectors (with shot noise).
Parameters
----------
model : Model
Model instance from trspecfit.mcp with defined components and parameters.
Must have energy and time axes set before simulation.
detection : {'analog', 'photon_counting'}, default='analog'
Detection technique to simulate:
- 'analog': Continuous signal with additive noise
- 'photon_counting': Discrete photon events with Poisson statistics
noise_level : float, default=0.05
Noise amplitude for analog detectors (0.0-1.0 for relative noise).
Larger values = more noise. Ignored for photon_counting.
noise_type : {'poisson', 'gaussian', 'none'}, default='poisson'
Type of noise for analog detectors:
- 'poisson': Shot noise (realistic for low light)
- 'gaussian': White noise (simpler, faster)
- 'none': No noise (testing and debugging)
Ignored for photon_counting (always Poisson).
counts_per_delay : int, optional
Total photon count per time delay (photon_counting only).
Directly sets signal-to-noise ratio. Mutually exclusive with
count_rate + integration_time.
count_rate : float, optional
Photon count rate in Hz (photon_counting only).
Combined with integration_time to compute counts_per_delay.
integration_time : float, optional
Integration time per delay point in seconds (photon_counting only).
Combined with count_rate to compute counts_per_delay.
seed : int, optional
Random seed for reproducibility. If None, uses random initialization.
Attributes
----------
model : Model
Model instance used for simulation
detection : str
Detection type ('analog' or 'photon_counting')
seed : int or None
Random seed value
noise_level : float
Analog detector noise level
noise_type : str
Analog detector noise type
counts_per_delay : int
Photon counting detector count budget
count_rate : float or None
Photon counting detector rate
integration_time : float or None
Photon counting integration time per delay
data_clean : ndarray or None
Most recently generated clean (noiseless) data
data_noisy : ndarray or None
Most recently generated noisy data
noise : ndarray or None
Most recently generated noise component (noisy - clean)
Examples
--------
See examples/simulator/ directory for complete workflows.
Notes
-----
**Analog vs. Photon Counting:**
Analog detectors (CCD, photodiode, lock-in):
- Pros: High dynamic range, simple operation
- Cons: Read noise, dark current
- Simulation: Continuous signal + additive noise
Photon counting (APD, PMT, event mode):
- Pros: No read noise, single-photon sensitivity
- Cons: Dead time, count rate limits, pulse pileup
- Simulation: Discrete events following Poisson statistics
**Noise Level Selection:**
For analog detectors, noise_level is relative to signal:
- 0.01 (1%): Very clean, ideal conditions
- 0.05 (5%): Typical good data
- 0.10 (10%): Moderate noise, still fittable
- 0.20 (20%): Challenging, may need averaging
For photon counting, SNR set by counts_per_delay:
- 100 counts: SNR ~ 10 (marginal)
- 1000 counts: SNR ~ 32 (good)
- 10000 counts: SNR ~ 100 (excellent)
**Photon Counting Parameter Resolution:**
The simulator resolves photon counting parameters as:
1. If counts_per_delay specified directly → use it
2. Else if count_rate and integration_time specified → compute counts_per_delay
3. Else → estimate from model scale (prints warning)
The third case assumes model amplitudes represent realistic count rates,
which may not be true. Always specify counts_per_delay or (count_rate,
integration_time) explicitly for accurate photon counting simulation.
**Memory Usage:**
Large 2D datasets can use significant memory:
- Single dataset: ~8 MB per 1000×500 spectrum (float64)
- simulate_n(n=100): ~800 MB for same size
- Consider smaller grids or batch processing for large n
See Also
--------
trspecfit.mcp.Model : Model class for simulation
simulate_1d : Generate 1D spectrum
simulate_2d : Generate 2D spectrum
simulate_n : Generate multiple realizations
save_data : Save simulated data to HDF5
"""
#
def __init__(
self,
model: Model,
detection: str = "analog",
noise_level: float = 0.05,
noise_type: str = "poisson",
counts_per_delay: int | None = None,
count_rate: float | None = None,
integration_time: float | None = None,
seed: int | None = None,
) -> None:
"""
Initialize simulator with a model and noise parameters.
Parameters
----------
model : Model
Model instance with defined components and parameters.
Must have model.energy and (for 2D) model.time set.
detection : {'analog', 'photon_counting'}, default='analog'
Detection technique to simulate
noise_level : float, default=0.05
Noise amplitude for analog detectors (0.0-1.0 relative to signal)
noise_type : {'poisson', 'gaussian', 'none'}, default='poisson'
Noise type for analog detectors
counts_per_delay : int, optional
Total photons per delay (photon_counting only)
count_rate : float, optional
Photon rate in Hz (photon_counting only)
integration_time : float, optional
Integration time per delay in seconds (photon_counting only)
seed : int, optional
Random seed for reproducibility
Raises
------
ValueError
If detection type is invalid
If counts_per_delay ≤ 0 (after estimation)
Examples
--------
>>> # Analog with default settings
>>> sim = Simulator(model)
>>> # Analog with custom noise
>>> sim = Simulator(model, noise_level=0.1, noise_type='gaussian')
>>> # Photon counting with direct count specification
>>> sim = Simulator(model, detection='photon_counting',
... counts_per_delay=5000)
>>> # Photon counting with rate specification
>>> sim = Simulator(model, detection='photon_counting',
... count_rate=1e6, integration_time=0.01)
>>> # Reproducible simulation
>>> sim = Simulator(model, seed=42)
"""
self.model = model
self.detection = detection.lower()
self.seed = seed
# Analog detector parameters
self.noise_level = noise_level
self.noise_type = noise_type.lower()
# Photon counting parameters
self.counts_per_delay: int | None = counts_per_delay
self.count_rate: float | None = count_rate
self.integration_time: float | None = integration_time
# Validate and resolve photon counting parameters
if self.detection == "photon_counting":
self._resolve_photon_counting_params()
# Validate detection type
if self.detection not in ["analog", "photon_counting"]:
raise ValueError(
f"detection must be 'analog' or 'photon_counting', got '{detection}'"
)
# Set random seed if provided
self.rng = np.random.default_rng(seed)
# Storage for simulated data
self.data_clean: np.ndarray | None = None # Without noise
self.data_noisy: np.ndarray | None = None # With noise
self.noise: np.ndarray | None = None # Just the noise component
#
def __repr__(self) -> str:
return f"Simulator(model='{self.model.name}', detection='{self.detection}')"
#
def _resolve_photon_counting_params(self) -> None:
"""
Resolve photon counting parameters. User can specify either:
- counts_per_delay directly, OR
- count_rate and integration_time (will compute counts_per_delay), OR
- nothing (will estimate from model scale with warning)
"""
if self.counts_per_delay is not None:
# Direct specification takes precedence
if self.count_rate is not None or self.integration_time is not None:
warnings.warn(
"counts_per_delay specified directly."
" Ignoring count_rate and integration_time.",
stacklevel=2,
)
elif self.count_rate is not None and self.integration_time is not None:
# Calculate from rate and time
self.counts_per_delay = int(self.count_rate * self.integration_time)
else:
# Estimate from model: assume clean_data represents count rates
# Generate clean data to get the scale
if self.model.time is not None and len(self.model.time) > 0:
# 2D data
self.model.create_value_2d()
clean_data = self.model.value_2d
else:
# 1D data
self.model.create_value_1d()
clean_data = self.model.value_1d
if clean_data is None:
raise RuntimeError(
"Model evaluation did not produce clean data"
" for photon counting estimation"
)
# Estimate counts_per_delay as the average total signal per time step
# This assumes the model amplitudes are in a realistic count rate range
signal_positive = np.abs(clean_data)
if clean_data.ndim == 2:
row_totals = np.sum(signal_positive, axis=1)
self.counts_per_delay = int(np.mean(row_totals))
else:
self.counts_per_delay = int(np.sum(signal_positive))
warnings.warn(
f"No photon count specified for photon_counting detection. "
f"Estimating from model: {self.counts_per_delay:.2e} counts/delay. "
f"For accurate simulation, specify counts_per_delay"
f" or (count_rate, integration_time). "
f"This estimate assumes your model amplitudes"
f" represent realistic count rates.",
stacklevel=2,
)
# Ensure counts_per_delay is positive
if self.counts_per_delay <= 0:
raise ValueError(
f"counts_per_delay must be positive, got {self.counts_per_delay}.\n"
f"Model may have negative or zero signal."
f" Check your model or specify counts_per_delay manually."
)
#
[docs]
def generate_clean_data(self, dim: int = 2, t_ind: int = 0) -> np.ndarray:
"""
Generate clean data from model (no noise).
Parameters
----------
dim : int
Dimension (1 for 1D, 2 for 2D).
t_ind : int
Time index for 1D simulations (ignored for 2D).
Returns
-------
ndarray
Clean data array (1D or 2D depending on dim).
"""
if dim == 1:
self.model.create_value_1d(t_ind=t_ind, return_1d=False)
assert self.model.value_1d is not None
self.data_clean = self.model.value_1d.copy()
elif dim == 2:
self.model.create_value_2d()
assert self.model.value_2d is not None
self.data_clean = self.model.value_2d.copy()
else:
raise ValueError(f"dim must be 1 or 2, got {dim}")
if self.data_clean is None:
raise RuntimeError("Model evaluation did not produce clean data")
return self.data_clean
#
[docs]
def add_noise(
self, clean_data: np.ndarray, dim: int = 2
) -> tuple[np.ndarray, np.ndarray]:
"""
Add noise to clean data based on detection technique.
Parameters
----------
clean_data : ndarray
Clean data array (1D or 2D).
dim : int
Dimension (1 for 1D, 2 for 2D).
Returns
-------
tuple of (ndarray, ndarray)
Tuple of (noisy_data, noise).
"""
if self.detection == "analog":
# Use traditional noise addition
if dim == 1:
noise = self._generate_noise_analog_1d(clean_data)
elif dim == 2:
noise = self._generate_noise_analog_2d(clean_data)
else:
raise ValueError(f"dim must be 1 or 2, got {dim}")
noisy_data = clean_data + noise
elif self.detection == "photon_counting":
# Sample photons according to signal distribution
if dim == 1:
noisy_data = self._sample_photons_1d(clean_data)
elif dim == 2:
noisy_data = self._sample_photons_2d(clean_data)
else:
raise ValueError(f"dim must be 1 or 2, got {dim}")
noise = noisy_data - clean_data
else:
raise ValueError(f"Unknown detection type: {self.detection}")
return noisy_data, noise
#
[docs]
def simulate_1d(self, t_ind: int = 0) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Simulate 1D spectrum (energy-resolved) at a specific time point.
Generates a single energy-resolved spectrum from the model at the
specified time index, adds appropriate noise for the detector type,
and stores results for later access.
Parameters
----------
t_ind : int, default=0
Time index for which to generate spectrum.
For models without time-dependence, use default 0.
Returns
-------
clean_data : ndarray
Noiseless spectrum from model (shape: [n_energy])
noisy_data : ndarray
Spectrum with added noise (shape: [n_energy])
noise : ndarray
Noise component (noisy - clean, shape: [n_energy])
Examples
--------
>>> # Simulate baseline spectrum
>>> sim = Simulator(model, noise_level=0.05)
>>> clean, noisy, noise = sim.simulate_1d(t_ind=0)
>>>
>>> # Plot comparison
>>> plt.plot(model.energy, clean, 'k-', label='Clean')
>>> plt.plot(model.energy, noisy, 'r.', label='Noisy', ms=2)
>>> plt.legend()
>>> # Calculate SNR
>>> snr = sim.get_snr()
>>> print(f"Signal-to-noise ratio: {snr:.1f}")
>>> # Simulate different time points
>>> for t_i in [0, 50, 100]:
... clean, noisy, noise = sim.simulate_1d(t_ind=t_i)
... plt.plot(model.energy, noisy, label=f't={model.time[t_i]:.1f}')
Notes
-----
Results are stored in simulator attributes for later access:
- self.data_clean: (Last) clean spectrum
- self.data_noisy: Last noisy spectrum
- self.noise: Last noise realization
Can access these without re-simulation:
>>> sim.simulate_1d()
>>> snr = sim.get_snr() # Uses stored data
>>> sim.plot_comparison(dim=1) # Uses stored data
See Also
--------
simulate_2d : Simulate full 2D spectrum
simulate_n : Generate multiple 1D realizations
plot_comparison : Visualize results
"""
# Generate clean spectrum from model
clean_data = self.generate_clean_data(dim=1, t_ind=t_ind)
# Add noise
noisy_data, noise = self.add_noise(clean_data, dim=1)
# Store for later use (e.g., plotting, SNR calculation)
self.data_clean = clean_data
self.data_noisy = noisy_data
self.noise = noise
return clean_data, noisy_data, noise
#
[docs]
def simulate_2d(self) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Simulate 2D spectrum (time- and energy-resolved).
Generates a complete 2D time- and energy-resolved spectrum from the
model, adds appropriate noise for each time point, and stores results.
Returns
-------
clean_data : ndarray
Noiseless 2D spectrum from model (shape: [n_time, n_energy])
noisy_data : ndarray
2D spectrum with added noise (shape: [n_time, n_energy])
noise : ndarray
Noise component (noisy - clean, shape: [n_time, n_energy])
Examples
--------
>>> # Basic 2D simulation
>>> sim = Simulator(model, noise_level=0.05)
>>> clean, noisy, noise = sim.simulate_2d()
>>>
>>> # Visualize with built-in plotter
>>> sim.plot_comparison(dim=2)
>>> # Manual visualization
>>> fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
>>> ax1.pcolormesh(model.energy, model.time, clean)
>>> ax1.set_title('Clean Model')
>>> ax2.pcolormesh(model.energy, model.time, noisy)
>>> ax2.set_title(f'Noisy (SNR={sim.get_snr():.1f})')
>>> # Test fitting on simulated data
>>> clean, noisy, noise = sim.simulate_2d()
>>> # ... set up fitting ...
>>> file.data = noisy # Use noisy data for fit
>>> file.fit_2d(model_name='test', stages=2)
>>> # Compare fitted vs. true parameters
>>> # Vary noise level to study impact
>>> for noise_level in [0.01, 0.05, 0.10]:
... sim.set_noise_level(noise_level)
... clean, noisy, noise = sim.simulate_2d()
... snr = sim.get_snr()
... print(f"Noise level {noise_level:.2f}: SNR = {snr:.1f}")
Notes
-----
**Noise Application:**
For analog detectors, noise is added independently at each pixel.
For photon counting, photons are distributed across all pixels
according to the signal probability distribution, then reconverted
to same scale as input for direct comparison.
**Performance:**
Simulation time scales with:
- Model evaluation time (dominates for complex models)
- Array size (n_time × n_energy)
- Noise generation method
Typical times:
- Simple model, 200×500 array: ~0.1-1 second
- Complex model with time-dependence: ~1-10 seconds
- Photon counting slightly slower than analog
**Memory:**
Three arrays stored (clean, noisy, noise), each:
- Size: n_time × n_energy × 8 bytes (float64)
- Example: 200×500 = ~2.4 MB per array, ~7.2 MB total
See Also
--------
simulate_1d : Simulate single spectrum
simulate_n : Generate multiple 2D realizations
plot_comparison : Visualize results
save_data : Save to HDF5 file
"""
# Generate clean spectrum from model
clean_data = self.generate_clean_data(dim=2)
# Add noise
noisy_data, noise = self.add_noise(clean_data, dim=2)
# Store for later use (e.g., plotting, SNR calculation)
self.data_clean = clean_data
self.data_noisy = noisy_data
self.noise = noise
return clean_data, noisy_data, noise
#
[docs]
def simulate_n(
self, n: int, *, dim: int = 2, t_ind: int = 0, show_progress: bool = True
) -> tuple[np.ndarray, list[np.ndarray], list[np.ndarray]]:
"""
Generate n simulated datasets with independent noise realizations.
Generates the clean data ONCE from the model, then adds n independent
noise realizations. Use for statistical analysis of fitting algorithms
and uncertainty quantification or machine learning model training.
Parameters
----------
n : int
Number of datasets to generate (must be >= 1)
dim : {1, 2}, default=2
Dimensionality:
- 1: Generate 1D spectra
- 2: Generate 2D spectra
t_ind : int, default=0
Time index for 1D simulations (ignored for dim=2)
show_progress : bool, default=True
Print progress updates during generation
Returns
-------
clean_data : ndarray
Single clean dataset (1D or 2D depending on dim).
Same for all n realizations (generated once).
noisy_data_list : list of ndarray
List of n noisy datasets, each with independent noise.
Each element has same shape as clean_data.
noise_list : list of ndarray
List of n noise realizations (noisy - clean for each dataset).
Each element has same shape as clean_data.
Examples
--------
>>> # Generate 20 independent noisy datasets
>>> sim = Simulator(model, noise_level=0.05)
>>> clean, noisy_list, noise_list = sim.simulate_n(n=20, dim=2)
>>>
>>> # Fit each dataset and analyze parameter distribution
>>> fitted_params = []
>>> for noisy_data in noisy_list:
... file.data = noisy_data
... file.fit_2d('test', stages=2)
... fitted_params.append(model.lmfit_pars['amplitude'].value)
>>>
>>> # Check parameter recovery
>>> true_value = model.lmfit_pars['amplitude'].value
>>> mean_fitted = np.mean(fitted_params)
>>> std_fitted = np.std(fitted_params)
>>> print(f"True: {true_value:.2f}")
>>> print(f"Mean fitted: {mean_fitted:.2f} ± {std_fitted:.2f}")
>>> # Analyze noise statistics
>>> noise_mean = np.mean(noise_list, axis=0)
>>> noise_std = np.std(noise_list, axis=0)
>>>
>>> # Should be close to zero (unbiased)
>>> print(f"Noise mean: {np.mean(noise_mean):.2e}")
>>> # Should match noise_level * signal scale
>>> print(f"Noise std: {np.mean(noise_std):.2e}")
>>> # Save multiple realizations for later use
>>> clean, noisy_list, noise_list = sim.simulate_n(n=100, dim=2)
>>> sim.save_data(
... filepath='simulations/batch_001.h5',
... n_data=noisy_list
... )
>>> # Test convergence of fitted parameters with n
>>> for n_datasets in [5, 10, 20, 50]:
... clean, noisy_list, _ = sim.simulate_n(n=n_datasets, dim=2)
... # ... fit each and compute parameter statistics ...
... print(f"n={n_datasets}: parameter std = {param_std:.3f}")
Notes
-----
**Efficiency:**
Generating clean data once and adding n noise realizations is much
faster than generating n complete simulations:
- This method: 1 model evaluation + n noise additions
- n separate simulate_2d calls: n model evaluations + n noise additions
For complex models where evaluation is slow, this can save
minutes to hours of computation time.
**Statistical Analysis:**
This function enables:
- Monte Carlo analysis of fitting uncertainty
- Algorithm validation (can recover true parameters?)
- Bias detection (systematic fitting errors)
- Confidence interval validation (coverage probability)
- Experimental design optimization (required SNR)
**Memory Considerations:**
All n datasets stored in memory as lists:
- Memory usage: n × (n_time × n_energy × 8 bytes)
- Example: 100 datasets of 200×500 = ~800 MB
For very large n or large arrays, consider:
- Processing in batches
- Saving to disk incrementally
- Using generator pattern instead of list
**Progress Display:**
When show_progress=True, prints:
- "Generating clean data from model... Done"
- "Adding noise to dataset i/n" (updates in place)
- "Generated n noisy datasets successfully"
Set show_progress=False for batch processing or when redirecting output.
See Also
--------
simulate_1d : Single 1D simulation
simulate_2d : Single 2D simulation
save_data : Save multiple datasets to HDF5
"""
if n < 1:
raise ValueError(f"n must be >= 1, got {n}")
# Generate clean data ONCE
if show_progress:
print("Generating clean data from model...", end=" ")
clean_data = self.generate_clean_data(dim=dim, t_ind=t_ind)
if show_progress:
print("Done")
# Generate n independent noise realizations
noisy_data_list = []
noise_list = []
for i in range(n):
if show_progress:
print(f"Adding noise to dataset {i + 1}/{n}", end="\r")
# Just add noise to the same clean data
noisy_data, noise = self.add_noise(clean_data, dim=dim)
noisy_data_list.append(noisy_data)
noise_list.append(noise)
if show_progress:
print(f"Generated {n} noisy datasets successfully")
# Store last realization for later use
self.data_clean = clean_data
self.data_noisy = noisy_data_list[-1]
self.noise = noise_list[-1]
return clean_data, noisy_data_list, noise_list
#
def _generate_noise_analog_1d(self, signal: np.ndarray) -> np.ndarray:
"""
Generate 1D noise array for analog detectors.
Parameters
----------
signal : ndarray
Clean signal array.
Returns
-------
ndarray
Noise array with same shape as signal.
"""
if self.noise_type == "none":
return np.zeros_like(signal)
if self.noise_type == "gaussian":
# Gaussian noise with amplitude proportional to noise_level
noise_amp = self.noise_level * np.max(np.abs(signal))
return cast("np.ndarray", self.rng.normal(0, noise_amp, signal.shape))
if self.noise_type == "poisson":
# Poisson noise: scale signal to photon counts, add noise, scale back
# Avoid negative values in Poisson distribution
signal_positive = np.abs(signal)
# Scale signal to photon counts (higher = less relative noise)
scale_factor = 1.0 / (self.noise_level + 1e-10) # Avoid division by zero
signal_scaled = signal_positive * scale_factor
# Generate Poisson noise
noisy_scaled = self.rng.poisson(signal_scaled)
# Scale back and compute noise component
signal_noisy = noisy_scaled / scale_factor
noise = signal_noisy - signal_positive
# Restore original sign
return cast("np.ndarray", noise * np.sign(signal))
raise ValueError(
f"Unknown noise type: {self.noise_type}. "
"Use 'poisson', 'gaussian', or 'none'"
)
#
def _generate_noise_analog_2d(self, signal: np.ndarray) -> np.ndarray:
"""
Generate 2D noise array for analog detectors.
Parameters
----------
signal : ndarray
Clean 2D signal array.
Returns
-------
ndarray
2D noise array with same shape as signal.
"""
if self.noise_type == "none":
return np.zeros_like(signal)
if self.noise_type == "gaussian":
# Gaussian noise with amplitude proportional to noise_level
noise_amp = self.noise_level * np.max(np.abs(signal))
return cast("np.ndarray", self.rng.normal(0, noise_amp, signal.shape))
if self.noise_type == "poisson":
# Poisson noise: scale signal to photon counts, add noise, scale back
signal_positive = np.abs(signal)
# Scale signal to photon counts
scale_factor = 1.0 / (self.noise_level + 1e-10)
signal_scaled = signal_positive * scale_factor
# Generate Poisson noise
noisy_scaled = self.rng.poisson(signal_scaled)
# Scale back and compute noise component
signal_noisy = noisy_scaled / scale_factor
noise = signal_noisy - signal_positive
# Restore original sign
return cast("np.ndarray", noise * np.sign(signal))
raise ValueError(
f"Unknown noise type: {self.noise_type}. "
"Use 'poisson', 'gaussian', or 'none'"
)
#
def _sample_photons_1d(self, signal: np.ndarray) -> np.ndarray:
"""
Sample photons for 1D photon counting detection.
Each energy pixel independently draws from a Poisson distribution
where the expected count is proportional to the signal intensity.
The signal is scaled so the total expected counts across all energy
pixels equals counts_per_delay.
Parameters
----------
signal : ndarray
Clean signal array (represents expected count rate).
Returns
-------
ndarray
Noisy data array in same units as input signal.
"""
signal_positive = np.abs(signal)
total_signal = np.sum(signal_positive)
if total_signal == 0:
return np.zeros_like(signal)
if self.counts_per_delay is None:
raise ValueError(
"counts_per_delay must be defined for photon counting simulation"
)
# Scale signal to expected photon counts
# Total expected counts across all pixels = counts_per_delay
scale_factor = self.counts_per_delay / total_signal
expected_counts = signal_positive * scale_factor
# Independent Poisson draw per pixel
photon_counts = self.rng.poisson(expected_counts)
# Scale back to original signal units and restore sign
# (negative signal = bleach/emission, positive = absorption)
noisy_data = photon_counts / scale_factor * np.sign(signal)
return cast("np.ndarray", noisy_data)
#
def _sample_photons_2d(self, signal: np.ndarray) -> np.ndarray:
"""
Sample photons for 2D photon counting detection.
Applies independent Poisson noise per pixel across the full 2D array.
The signal is scaled so the average total expected counts per time step
equals counts_per_delay. Time steps with stronger signal naturally
accumulate more photons (better SNR), matching real experiments where
each time delay is measured for the same integration time.
Parameters
----------
signal : ndarray
Clean 2D signal array (shape: [n_time, n_energy]).
Returns
-------
ndarray
Noisy 2D data array in same units as input signal.
"""
signal_positive = np.abs(signal)
# Average total signal per time step
row_totals = np.sum(signal_positive, axis=1)
mean_row_total = np.mean(row_totals)
if mean_row_total == 0:
return np.zeros_like(signal)
if self.counts_per_delay is None:
raise ValueError(
"counts_per_delay must be defined for photon counting simulation"
)
# Scale so the average row has counts_per_delay total expected counts
scale_factor = self.counts_per_delay / mean_row_total
expected_counts = signal_positive * scale_factor
# Independent Poisson draw per pixel
photon_counts = self.rng.poisson(expected_counts)
# Scale back to original signal units and restore sign
# (negative signal = bleach/emission, positive = absorption)
noisy_data = photon_counts / scale_factor * np.sign(signal)
return cast("np.ndarray", noisy_data)
#
[docs]
def set_noise_level(self, noise_level: float) -> None:
"""
Update noise level (analog detectors only).
Parameters
----------
noise_level : float
Standard deviation of Gaussian noise (absolute units).
"""
if self.detection != "analog":
warnings.warn(
"noise_level only applies to analog detection",
stacklevel=2,
)
self.noise_level = noise_level
#
[docs]
def set_noise_type(self, noise_type: str) -> None:
"""
Update noise type (analog detectors only).
Parameters
----------
noise_type : str
Noise distribution: ``'gaussian'`` or ``'uniform'``.
"""
if self.detection != "analog":
warnings.warn(
"noise_type only applies to analog detection",
stacklevel=2,
)
self.noise_type = noise_type.lower()
#
[docs]
def set_counts_per_delay(self, counts_per_delay: int) -> None:
"""
Update counts per delay (photon counting only).
Parameters
----------
counts_per_delay : int
Total photon counts collected per delay step.
"""
if self.detection != "photon_counting":
warnings.warn(
"counts_per_delay only applies to photon_counting detection",
stacklevel=2,
)
self.counts_per_delay = counts_per_delay
#
[docs]
def set_count_rate(
self, count_rate: float, integration_time: float | None = None
) -> None:
"""
Update count rate (photon counting only).
Parameters
----------
count_rate : float
Photon rate in Hz.
integration_time : float | None
Integration time per delay in seconds.
If None, uses existing value.
"""
if self.detection != "photon_counting":
warnings.warn(
"count_rate only applies to photon_counting detection",
stacklevel=2,
)
return
self.count_rate = count_rate
if integration_time is not None:
self.integration_time = integration_time
if self.integration_time is not None:
self.counts_per_delay = int(self.count_rate * self.integration_time)
else:
raise ValueError(
"integration_time must be set to calculate"
" counts_per_delay from count_rate"
)
#
[docs]
def set_seed(self, seed: int | None) -> None:
"""
Update random seed.
Parameters
----------
seed : int or None
Random seed for reproducibility. None for non-deterministic.
"""
self.seed = seed
self.rng = np.random.default_rng(seed)
#
[docs]
def get_snr(self, scale: str = "linear") -> float:
"""
Calculate Signal-to-Noise Ratio (SNR).
Computes SNR from the most recently simulated data using power-based
definition: SNR = signal_power / noise_power.
Parameters
----------
scale : {'linear', 'dB'}, default='linear'
Output scale:
- 'linear': SNR as ratio (e.g., 25.0)
- 'dB': SNR in decibels (e.g., 13.98 dB)
Returns
-------
float
SNR value in requested scale.
Returns np.inf if noise_power is exactly zero.
Raises
------
ValueError
If no simulated data available (must call simulate_1d/2d/n first),
or if scale is not 'linear' or 'dB'.
Examples
--------
>>> # Calculate SNR after simulation
>>> sim = Simulator(model, noise_level=0.05)
>>> clean, noisy, noise = sim.simulate_2d()
>>>
>>> snr_linear = sim.get_snr(scale='linear')
>>> print(f"SNR: {snr_linear:.1f}")
SNR: 25.3
>>>
>>> snr_db = sim.get_snr(scale='dB')
>>> print(f"SNR: {snr_db:.1f} dB")
SNR: 14.0 dB
>>> # Compare SNR across noise levels
>>> for noise_level in [0.01, 0.05, 0.10, 0.20]:
... sim.set_noise_level(noise_level)
... sim.simulate_2d()
... snr = sim.get_snr()
... print(f"Noise {noise_level:.2f}: SNR = {snr:.1f}")
Noise 0.01: SNR = 625.0
Noise 0.05: SNR = 25.0
Noise 0.10: SNR = 6.2
Noise 0.20: SNR = 1.6
>>> # Plot SNR vs photon count
>>> counts = [100, 500, 1000, 5000, 10000]
>>> snrs = []
>>> for count in counts:
... sim = Simulator(model, detection='photon_counting',
... counts_per_delay=count)
... sim.simulate_2d()
... snrs.append(sim.get_snr())
>>> plt.loglog(counts, snrs, 'o-')
>>> plt.xlabel('Counts per delay')
>>> plt.ylabel('SNR')
Notes
-----
**SNR Definition:**
Uses power-based (energy) definition:
SNR_linear = (mean(signal²)) / (mean(noise²))
SNR_dB = 10 × log₁₀(SNR_linear)
This differs from amplitude-based definition (20 log₁₀) by factor of 2.
Power-based is standard in signal processing and communications.
**Interpretation:**
Linear scale:
- SNR = 1: Signal and noise have equal power (marginal)
- SNR = 10: Signal 10× stronger than noise (good)
- SNR = 100: Signal 100× stronger than noise (excellent)
dB scale:
- 0 dB: Equal signal and noise
- 10 dB: 10× signal power (good)
- 20 dB: 100× signal power (excellent)
- Each 10 dB = 10× power ratio
**Typical Values:**
For spectroscopy data:
- SNR < 5 (< 7 dB): Difficult to fit reliably
- SNR 5-20 (7-13 dB): Good quality, typical experimental data
- SNR 20-100 (13-20 dB): High quality
- SNR > 100 (> 20 dB): Exceptional, near ideal
**Limitations:**
This is a global SNR averaged over entire spectrum. Local SNR
may vary significantly, especially for:
- Weak features vs. strong peaks
- Time-dependent signals (varying amplitude)
- Non-uniform noise (detector artifacts)
For accurate local SNR, compute on regions of interest separately.
See Also
--------
simulate_1d : Must call before get_snr
simulate_2d : Must call before get_snr
plot_comparison : Shows SNR in title
"""
if self.data_clean is None or self.noise is None:
raise ValueError(
"No simulated data available. Run simulate_1d or simulate_2d first."
)
signal_power = np.mean(self.data_clean**2)
noise_power = np.mean(self.noise**2)
if noise_power == 0:
return float(np.inf)
if scale == "linear":
return float(signal_power / noise_power)
if scale == "dB":
return float(10 * np.log10(signal_power / noise_power))
raise ValueError("scale must be either 'linear' or 'dB'")
#
[docs]
def plot_comparison(
self,
t_ind: int = 0,
dim: int = 1,
snr_scale: str = "linear",
*,
save_img: int = 0,
config: PlotConfig | None = None,
**plot_kwargs,
) -> None:
"""
Plot comparison of clean vs noisy data.
Creates visualization showing clean model data, noisy simulated data,
and noise component side-by-side. Essential for visually assessing
simulation quality and noise characteristics.
Parameters
----------
t_ind : int, default=0
Time index for 1D plots (ignored for dim=2)
dim : {1, 2}, default=1
Dimensionality:
- 1: Create 1D plot with clean, noisy, and noise curves
- 2: Create three-panel 2D plot (clean, noisy, noise)
snr_scale : {'linear', 'dB'}, default='linear'
Scale for SNR display in title:
- 'linear': Show as ratio (e.g., "SNR: 25.0 linear")
- 'dB': Show in decibels (e.g., "SNR: 14.0 dB")
save_img : int, default=0
0: display, 1: save+display, -1: save only, -2: close (no display/save)
config : PlotConfig, optional
Override the model's inherited plot configuration for this call.
If None, uses the model's own plot_config.
**plot_kwargs : dict
Per-call overrides for any PlotConfig field (e.g. ``z_colormap``,
``ticksize``). Applied on top of *config*.
Examples
--------
>>> # 1D comparison
>>> sim = Simulator(model, noise_level=0.05)
>>> sim.simulate_1d(t_ind=0)
>>> sim.plot_comparison(dim=1)
>>> # 2D comparison with dB scale
>>> sim = Simulator(model, noise_level=0.05)
>>> sim.simulate_2d()
>>> sim.plot_comparison(dim=2, snr_scale='dB')
>>> # Compare different noise levels visually
>>> fig, axes = plt.subplots(3, 1, figsize=(10, 12))
>>> for i, noise_level in enumerate([0.01, 0.05, 0.10]):
... sim.set_noise_level(noise_level)
... sim.simulate_1d()
... # ... manual plotting on axes[i] ...
>>> # Check photon counting vs analog
>>> sim_analog = Simulator(model, detection='analog', noise_level=0.05)
>>> sim_photon = Simulator(model, detection='photon_counting',
... counts_per_delay=1000)
>>> sim_analog.simulate_2d()
>>> sim_photon.simulate_2d()
>>> # ... compare visually ...
Notes
-----
**1D Plot Layout:**
Single plot with three traces:
- Clean: Black line (ground truth)
- Noisy: Red scatter points (simulated data)
- Noise: Gray line (noise component)
Scatter points for noisy data help visualize noise granularity.
**2D Plot Layout:**
Three side-by-side panels:
- Left: Clean model data
- Center: Noisy simulated data (with SNR in title)
- Right: Noise component (difference)
All use same colormap from model.plot_config for consistency.
**Visual Assessment:**
Good simulation should show:
- Noisy data follows clean data trend
- Noise is randomly distributed (no patterns)
- SNR appropriate for intended use case
- Peak features still distinguishable in noisy data
If noise dominates signal (SNR << 1), features may be
completely obscured - increase signal or reduce noise.
**Configuration:**
Plot uses model.plot_config for:
- Axis labels (energy/time labels)
- Axis direction (e.g., reversed energy)
- Colormap (for 2D plots)
- DPI settings
This ensures consistency with other trspecfit plots.
See Also
--------
simulate_1d : Generate 1D data to plot
simulate_2d : Generate 2D data to plot
get_snr : SNR calculation shown in title
"""
detection_str = f" [{self.detection}]"
plt_title = (
f"Simulated Data (SNR: {self.get_snr(scale=snr_scale):.1f}"
f" {snr_scale}){detection_str}"
)
if dim == 1:
if self.data_clean is None:
self.simulate_1d(t_ind)
if self.data_clean is None or self.data_noisy is None or self.noise is None:
raise RuntimeError("Simulation data not available for plotting")
# Get config from model
resolved_config = config or self.model.plot_config
# Plot with noisy data as scatter
_call_kwargs: dict = {
"title": plt_title,
"legend": ["Clean", "Noisy", "Noise"],
"linestyles": ["-", "", "-"], # Empty string = no line for noisy data
"markers": [None, "o", None], # Scatter points for noisy data
"markersizes": [6, 3, 6], # Smaller markers for noisy data
"save_img": save_img,
}
_call_kwargs.update(plot_kwargs)
uplt.plot_1d(
data=[self.data_clean, self.data_noisy, self.noise],
x=self.model.energy,
config=resolved_config,
**_call_kwargs,
)
elif dim == 2:
if self.data_clean is None:
self.simulate_2d()
if self.data_clean is None or self.data_noisy is None or self.noise is None:
raise RuntimeError("Simulation data not available for plotting")
# Get config from model
resolved_config = config or self.model.plot_config
if plot_kwargs:
resolved_config = resolved_config.copy(**plot_kwargs)
# Create 3-panel plot
_fig, axes = plt.subplots(1, 3, figsize=(15, 4))
panels = [
(self.data_clean, "Clean Model Data"),
(self.data_noisy, plt_title),
(self.noise, "Noise (Simulated - Clean)"),
]
for ax, (data, title) in zip(axes, panels, strict=True):
im = ax.pcolormesh(
self.model.energy,
self.model.time,
data,
shading="nearest",
cmap=resolved_config.z_colormap,
)
ax.set_title(title)
ax.set_xlabel(resolved_config.x_label)
ax.set_ylabel(resolved_config.y_label)
if resolved_config.ticksize is not None:
ax.tick_params(labelsize=resolved_config.ticksize)
uplt._apply_axis_settings(
ax,
x_type=resolved_config.x_type,
x_dir=resolved_config.x_dir,
y_type=resolved_config.y_type,
y_dir=resolved_config.y_dir,
x_lim=resolved_config.x_lim,
y_lim=resolved_config.y_lim,
)
plt.colorbar(im, ax=ax)
plt.tight_layout()
uplt._finalize_plot(
save_img=save_img,
save_path="",
dpi_save=resolved_config.dpi_save,
)
#
[docs]
def save_data(
self,
*,
filepath: str | None = None,
save_format: str = "hdf5",
n_data: list[np.ndarray] | None = None,
overwrite: bool = True,
show_output: int = 1,
) -> None:
"""
Save simulated data to file with metadata.
Exports simulated data in HDF5 format with complete metadata including
model parameters, noise settings, and experimental axes. Essential for
sharing simulated datasets and ensuring reproducibility.
Parameters
----------
filepath : str or Path, optional
Path where to save data. If None, uses default:
'./simulated_data/simulated_data.h5'
If provided path doesn't include 'simulated_data' directory,
it will be automatically placed there.
save_format : str, default='hdf5'
File format. Currently only 'hdf5' supported.
Future: could add .mat, .npz, etc.
n_data : list of ndarray, optional
Multiple noisy datasets from simulate_n() to save.
If None, saves single dataset from simulate_1d() or simulate_2d().
overwrite : bool, default=True
If True, overwrite existing files.
If False, raise FileExistsError if file exists.
show_output : int, default=1
Output mode:
- 0: Silent / programmatic / API mode -- no prints
- 1: Interactive / notebook / UI mode -- show timing and
save confirmation
Raises
------
ValueError
If no simulated data available (must call simulate first)
FileExistsError
If file exists and overwrite=False
Examples
--------
>>> # Save single simulation
>>> sim = Simulator(model, noise_level=0.05, seed=42)
>>> clean, noisy, noise = sim.simulate_2d()
>>> sim.save_data('simulation_001.h5')
Data saved to: ./simulated_data/simulation_001.h5
>>> # Save multiple realizations
>>> clean, noisy_list, noise_list = sim.simulate_n(n=50, dim=2)
>>> sim.save_data(
... filepath='batch_simulation.h5',
... n_data=noisy_list
... )
Data saved to: ./simulated_data/batch_simulation.h5
>>> # Prevent accidental overwrites
>>> sim.save_data('important_data.h5', overwrite=False)
FileExistsError: File already exists: ./simulated_data/important_data.h5
Set overwrite=True to overwrite, or provide a different filepath.
>>> # Load saved data later
>>> import h5py
>>> with h5py.File('simulated_data/simulation_001.h5', 'r') as f:
... energy = f['energy'][:]
... time = f['time'][:]
... clean = f['clean_data'][:]
... noisy = f['simulated_data/000000'][:]
...
... # Read metadata
... noise_level = f['metadata'].attrs['noise_level']
... model_params = f['metadata'].attrs['model_parameters']
Notes
-----
**HDF5 File Structure:**
::
/
├── energy (dataset: 1D array)
├── time (dataset: 1D array, empty for 1D simulations)
├── clean_data (dataset: 1D or 2D array)
├── simulated_data/ (group)
│ ├── 000000 (dataset: first noisy realization)
│ ├── 000001 (dataset: second noisy realization)
│ └── ...
└── metadata/ (group with [optional]attributes)
├── detection ('analog' or 'photon_counting')
├── noise_level (analog noise level)
├── noise_type (analog noise type)
├── counts_per_delay (photon counting counts)
├── count_rate ([optional] photon counting rate)
├── integration_time ([optional] photon counting integration time)
├── seed ([optional] random seed, if set)
├── dimension (1 or 2)
├── n_datasets (number of noisy datasets)
├── model_parameters (JSON string of all parameters)
└── model_name (model name)
**Why HDF5?**
HDF5 format chosen because:
- Efficient for large multidimensional arrays
- Self-describing (metadata embedded)
- Widely supported (Python, MATLAB, Igor, etc.)
- Allows partial loading (don't need entire file in memory)
- Standard in scientific computing
**Model Parameters:**
All model parameters saved as JSON string in metadata for complete
reproducibility. Includes:
- Parameter values
- vary flags (which parameters were free)
- Bounds (min/max)
- Expressions (parameter constraints)
This allows exact recreation of the model used for simulation.
**File Organization:**
Default directory structure::
project_directory/
└── simulated_data/
├── simulation_001.h5
├── simulation_002.h5
└── batch_001.h5
Keeps simulated data organized and separate from experimental data.
**Multiple Datasets:**
When n_data provided (from simulate_n), all realizations saved in
simulated_data group with sequential names:
- 000000, 000001, ..., 000099 for 100 datasets
- Zero-padded for proper sorting
Clean data saved once (same for all realizations).
**Loading Data:**
Standard h5py usage::
import h5py
with h5py.File('simulated_data/data.h5', 'r') as f:
# Load axes
energy = f['energy'][:]
time = f['time'][:]
# Load clean data
clean = f['clean_data'][:]
# Load all noisy datasets
noisy_datasets = []
for key in sorted(f['simulated_data'].keys()):
noisy_datasets.append(f['simulated_data'][key][:])
# Load metadata
detection = f['metadata'].attrs['detection']
n_datasets = f['metadata'].attrs['n_datasets']
See Also
--------
simulate_n : Generate multiple datasets to save
simulate_1d : Generate 1D data
simulate_2d : Generate 2D data
h5py : Python HDF5 library
"""
if self.data_noisy is None and n_data is None:
raise ValueError(
"No simulated data available."
" Run simulate_1d, simulate_2d, or simulate_n first."
)
# Create simulated_data directory if it doesn't exist
sim_dir = Path.cwd() / "simulated_data"
sim_dir.mkdir(parents=True, exist_ok=True)
# Set default filepath
if filepath is None:
if save_format == "hdf5":
filepath = str(sim_dir / "simulated_data.h5")
else:
# User provided filepath - make sure it's in simulated_data directory
if not Path(filepath).is_relative_to(sim_dir):
filepath = str(sim_dir / Path(filepath).name)
# Check if file exists and handle overwrite
if filepath is None:
raise ValueError("filepath could not be resolved")
if Path(filepath).exists() and not overwrite:
raise FileExistsError(
f"File already exists: {filepath}\n"
f"Set overwrite=True to overwrite, or provide a different filepath."
)
if save_format == "hdf5":
self._save_hdf5(filepath, n_data)
# add other formats here
else:
raise ValueError(f"Unknown save format: {save_format}. Use 'hdf5'.")
if show_output >= 1:
print(f"Data saved to: {filepath}")
#
def _save_hdf5(self, filepath: str, n_data: list[np.ndarray] | None = None) -> None:
"""
Save data to HDF5 format with proper structure
HDF5 structure:
/energy - energy axis array
/time - time axis array (empty if 1D)
/clean_data - clean data without noise
/simulated_data/000000 - first noisy dataset
/simulated_data/000001 - second noisy dataset
...
/metadata - group containing simulation parameters
"""
with h5py.File(filepath, "w") as f:
# Save axes at root level
f.create_dataset("energy", data=self.model.energy)
# Handle time axis - check if it exists and has valid data
if self.model.time is not None and len(self.model.time) > 0:
f.create_dataset("time", data=self.model.time)
else:
# For 1D simulations, save empty array
f.create_dataset("time", data=np.array([]))
# Save clean data at root level
if self.data_clean is not None:
f.create_dataset("clean_data", data=self.data_clean)
# Create simulated_data group for noisy datasets
sim_group = f.create_group("simulated_data")
# Save noisy data - either from n_data list or single simulation
if n_data is not None:
# Multiple datasets from simulate_n
for i, noisy_data in enumerate(n_data):
dataset_name = f"{i:06d}"
sim_group.create_dataset(dataset_name, data=noisy_data)
elif self.data_noisy is not None:
# Single dataset
sim_group.create_dataset("000000", data=self.data_noisy)
# Save metadata group at root level
meta = f.create_group("metadata")
meta.attrs["detection"] = self.detection
# Save detection-specific parameters
if self.detection == "analog":
meta.attrs["noise_level"] = self.noise_level
meta.attrs["noise_type"] = self.noise_type
elif self.detection == "photon_counting":
meta.attrs["counts_per_delay"] = self.counts_per_delay
if self.count_rate is not None:
meta.attrs["count_rate"] = self.count_rate
if self.integration_time is not None:
meta.attrs["integration_time"] = self.integration_time
if self.seed is not None:
meta.attrs["seed"] = self.seed
# Determine dimensionality from clean data
if self.data_clean is not None:
if self.data_clean.ndim == 1:
meta.attrs["dimension"] = 1
else:
meta.attrs["dimension"] = 2
# Save number of datasets
if n_data is not None:
meta.attrs["n_datasets"] = len(n_data)
else:
meta.attrs["n_datasets"] = 1
# Save model parameters as JSON in metadata
params_dict = {}
for par_name in self.model.lmfit_pars:
par = self.model.lmfit_pars[par_name]
params_dict[par_name] = {
"value": float(par.value),
"vary": bool(par.vary),
"min": float(par.min) if par.min is not None else None,
"max": float(par.max) if par.max is not None else None,
"expr": par.expr or None,
}
# Save as JSON string in metadata
meta.attrs["model_parameters"] = json.dumps(params_dict, indent=2)
meta.attrs["model_name"] = self.model.name
#
[docs]
def simulate_parameter_sweep(
self,
parameter_sweep: ParameterSweep,
n_realizations: int,
*,
dim: int = 2,
filepath: str = "ml_training_data.h5",
show_progress: bool = True,
) -> None:
"""
Generate ML training dataset by sweeping parameters.
Processes configurations one at a time, immediately saving to disk.
Memory usage remains constant regardless of parameter space size.
Parameters
----------
parameter_sweep : ParameterSweep
Generator yielding parameter configurations
n_realizations : int
Number of noisy realizations per parameter configuration
dim : {1, 2}, default=2
Dimensionality of simulated data:
- 1: Generate 1D spectra
- 2: Generate 2D spectra
filepath : str, default='ml_training_data.h5'
HDF5 file path for output
show_progress : bool, default=True
Print progress updates during generation
Examples
--------
>>> # Set up parameter space
>>> sweep = ParameterSweep(strategy='random', seed=42)
>>> sweep.add_uniform('GLP_01_A', 5, 30, n_samples=100)
>>> sweep.add_uniform('GLP_01_x0', 5, 15, n_samples=100)
>>>
>>> # Generate dataset
>>> sim = Simulator(model, noise_level=0.05, seed=42)
>>> sim.simulate_parameter_sweep(
... parameter_sweep=sweep,
... n_realizations=20,
... filepath='training_data.h5'
... )
Processing config 1/100: {'GLP_01_A': 12.5, 'GLP_01_x0': 8.3}
Saved config 1 with 20 realizations
...
Parameter sweep complete!
Generated 100 configs × 20 realizations
Data saved to: ./simulated_data/training_data.h5
Notes
-----
**Memory Efficiency:**
Only one configuration is in memory at a time. Each is immediately
written to disk before processing the next. Total memory usage is
independent of parameter space size.
**Resumability:**
If interrupted, completed configurations are already saved to disk.
Currently does not support automatic resume (will overwrite file).
**File Structure:**
See _initialize_sweep_hdf5 for complete HDF5 structure description.
See Also
--------
ParameterSweep : Define parameter space to sweep
simulate_n : Generate multiple noisy realizations
_initialize_sweep_hdf5 : HDF5 file structure
_append_config_to_hdf5 : Incremental saving logic
"""
# Convert to Path object
filepath_obj = Path(filepath)
# If filepath is just a filename (no directory component),
# put it in simulated_data subdirectory
if filepath_obj.parent == Path():
sim_dir = Path.cwd() / "simulated_data"
sim_dir.mkdir(parents=True, exist_ok=True)
filepath = str(sim_dir / filepath_obj.name)
else:
# User provided a path with directory, use it as-is but ensure parent exists
filepath_obj.parent.mkdir(parents=True, exist_ok=True)
filepath = str(filepath_obj)
# Get total number of configurations
n_configs = parameter_sweep.get_n_configs()
if show_progress:
print(
f"Starting parameter sweep:\n"
f" Total configurations: {n_configs}\n"
f" Realizations per config: {n_realizations}\n"
f" Total datasets: {n_configs * n_realizations}\n"
f" Output file: {filepath}\n"
)
# Initialize HDF5 file with structure
self._initialize_sweep_hdf5(
filepath, parameter_sweep, n_realizations, n_configs, dim
)
# Process each configuration
for config_idx, param_config in enumerate(parameter_sweep):
if show_progress:
# Format parameters nicely
param_str = ", ".join(f"{k}={v:.3g}" for k, v in param_config.items())
print(
f"Processing config {config_idx + 1}/{n_configs}: {{{param_str}}}"
)
# Update model parameters
param_names = list(param_config.keys())
param_values = list(param_config.values())
self.model.update_value(param_values, par_select=param_names)
# Generate noisy realizations for this config
clean, noisy_list, _noise_list = self.simulate_n(
n=n_realizations,
dim=dim,
show_progress=False, # Don't clutter output
)
# Append to HDF5 immediately (memory-efficient)
self._append_config_to_hdf5(
filepath, config_idx, param_config, clean, noisy_list
)
if show_progress:
print(
f" ✓ Saved config {config_idx + 1}"
f" with {n_realizations} realizations"
)
if show_progress:
print(
f"\n{'=' * 60}\n"
f"Parameter sweep complete!\n"
f"Generated {n_configs} configs × {n_realizations} realizations\n"
f"Total datasets: {n_configs * n_realizations}\n"
f"Data saved to: {filepath}\n"
f"{'=' * 60}"
)
#
def _initialize_sweep_hdf5(
self,
filepath: str,
parameter_sweep: ParameterSweep,
n_realizations: int,
n_configs: int,
dim: int,
) -> None:
"""
Create HDF5 file structure for parameter sweep data.
File Structure
--------------
/
├── energy (dataset) # Energy axis
├── time (dataset) # Time axis (empty for 1D)
├── metadata/ (group) # Sweep metadata
│ ├── attrs: n_configs, n_realizations_per_config, ...
│ └── attrs: parameter_space (JSON)
├── parameter_configs/ (group) # Parameter configurations
│ ├── config_000000/ (group)
│ │ ├── attrs: GLP_01_A, GLP_01_x0, ...
│ │ ├── attrs: all_parameter_values (JSON)
│ │ └── clean (dataset) # Clean data for this config
│ ├── config_000001/ (group)
│ └── ...
└── simulated_data/ (group) # Noisy realizations
├── config_000000/ (group)
│ ├── 000000 (dataset) # First realization
│ ├── 000001 (dataset) # Second realization
│ └── ...
├── config_000001/ (group)
└── ...
Parameters
----------
filepath : str
Path to HDF5 file to create
parameter_sweep : ParameterSweep
Parameter sweep object (for metadata)
n_realizations : int
Number of noisy realizations per config
n_configs : int
Total number of parameter configurations
dim : {1, 2}
Dimensionality of simulated data
"""
with h5py.File(filepath, "w") as f:
# Save axes (same for all configs)
f.create_dataset("energy", data=self.model.energy)
if self.model.time is not None and len(self.model.time) > 0:
f.create_dataset("time", data=self.model.time)
else:
f.create_dataset("time", data=np.array([]))
# Create groups for organization
f.create_group("parameter_configs")
f.create_group("simulated_data")
# Save sweep metadata
meta = f.create_group("metadata")
meta.attrs["n_configs"] = n_configs
meta.attrs["n_realizations_per_config"] = n_realizations
meta.attrs["total_datasets"] = n_configs * n_realizations
# Simulator settings
meta.attrs["detection"] = self.detection
if self.detection == "analog":
meta.attrs["noise_level"] = self.noise_level
meta.attrs["noise_type"] = self.noise_type
elif self.detection == "photon_counting":
meta.attrs["counts_per_delay"] = self.counts_per_delay
if self.count_rate is not None:
meta.attrs["count_rate"] = self.count_rate
if self.integration_time is not None:
meta.attrs["integration_time"] = self.integration_time
if self.seed is not None:
meta.attrs["seed"] = self.seed
# Parameter sweep settings
meta.attrs["sweep_strategy"] = parameter_sweep.strategy
meta.attrs["sweep_seed"] = (
"None" if parameter_sweep.seed is None else parameter_sweep.seed
)
# Save parameter space definition as JSON
param_space = {}
for par_name, spec in parameter_sweep.parameter_specs.items():
# Convert numpy arrays to lists for JSON serialization
spec_copy = spec.copy()
if "values" in spec_copy:
spec_copy["values"] = spec_copy["values"].tolist()
param_space[par_name] = spec_copy
meta.attrs["parameter_space"] = json.dumps(param_space, indent=2)
# Save full model definition once (static across all configs)
model_params = {}
for par_name in self.model.lmfit_pars:
par = self.model.lmfit_pars[par_name]
model_params[par_name] = {
"value": float(par.value),
"vary": bool(par.vary),
"min": float(par.min) if par.min is not None else None,
"max": float(par.max) if par.max is not None else None,
"expr": par.expr or None,
}
meta.attrs["model_parameters"] = json.dumps(model_params, indent=2)
meta.attrs["model_name"] = self.model.name
# Dimension matches the actual data written, not model capability
meta.attrs["dimension"] = dim
#
def _append_config_to_hdf5(
self,
filepath: str,
config_idx: int,
param_config: dict[str, float],
clean: np.ndarray,
noisy_list: list[np.ndarray],
) -> None:
"""
Append single parameter configuration and its realizations to HDF5.
Parameters
----------
filepath : str
Path to HDF5 file
config_idx : int
Configuration index (for naming)
param_config : dict
Parameter values for this configuration
clean : ndarray
Clean (noiseless) data
noisy_list : list of ndarray
List of noisy realizations
"""
with h5py.File(filepath, "a") as f:
# Create group for this configuration
config_name = f"config_{config_idx:06d}"
configs_group = require_group(f["parameter_configs"], "parameter_configs")
config_group = configs_group.create_group(config_name)
# Save swept parameters as attributes
for par_name, value in param_config.items():
config_group.attrs[par_name] = float(value)
# Save all parameter values for this config (full model state)
param_values = {
par_name: float(self.model.lmfit_pars[par_name].value)
for par_name in self.model.lmfit_pars
}
config_group.attrs["all_parameter_values"] = json.dumps(param_values)
# Save clean data for this configuration
config_group.create_dataset("clean", data=clean)
# Save noisy realizations
simulated_group = require_group(f["simulated_data"], "simulated_data")
data_group = simulated_group.create_group(config_name)
for real_idx, noisy_data in enumerate(noisy_list):
data_group.create_dataset(f"{real_idx:06d}", data=noisy_data)