Source code for trspecfit.trspecfit

"""
Project and file management for spectroscopy data analysis.

This module provides high-level classes for organizing and managing spectroscopy
fitting workflows. It handles project configuration, data loading, model management,
and orchestrates the complete fitting pipeline from data import to result export.

Core Classes
------------
Project : Project configuration and directory management
    Manages project-wide settings including plot configuration, file I/O formats,
    and fitting parameters. Supports YAML-based configuration for reproducible
    workflows across different users and spectroscopy types.

File : Data container with model fitting capabilities
    Represents a single spectroscopy dataset (1D or 2D) with its associated
    energy/time/auxiliary axes. Manages multiple models for comparison, handles
    baseline extraction, and provides methods for baseline fitting, Slice-by-Slice
    fitting, and full 2D time- and energy-resolved fitting. Supports parameter
    profiles over an auxiliary axis via Profile models.

Workflow
--------
1. **Setup**: Create Project with configuration settings
2. **Load Data**: Create File object with data, energy, time, and optionally aux_axis
3. **Define Models**: Load models from YAML files using File.load_model()
4. **Attach Profiles** (optional): Add parameter profiles with File.add_par_profile()
5. **Add Dynamics** (optional): Add time-dependence with File.add_time_dependence()
6. **Set Limits**: Define fitting regions with File.set_fit_limits()
7. **Fit**: Execute appropriate fitting method:
   - File.fit_baseline() for ground state spectrum
   - File.fit_slice_by_slice() for time-independent analysis
   - File.fit_2d() for global time- and energy-resolved fitting
8. **Export**: Results automatically saved to project directory structure

Features
--------
- Hierarchical project/file organization with automatic directory creation
- Multiple model management and comparison within single File
- YAML-based project and model configuration
- Flexible plot customization via PlotConfig
- Support for 1D (energy-only) and 2D (time + energy) datasets
- Parameter profiles over auxiliary axis (e.g. depth) via Profile models
- Baseline/pre-trigger spectrum extraction and fitting
- Slice-by-Slice time-series fitting with parameter evolution tracking
- Global 2D fitting with time-dependent parameters
- Automatic result export (parameters, plots, confidence intervals)

Examples
--------
See examples/ directory for complete workflows.
"""

import concurrent.futures
import copy
import multiprocessing
import os
import pathlib
import re
import time
import types
import warnings
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any, Literal, cast

# TYPE_CHECKING is False at runtime so this import is skipped during execution.
# Exists only for type checkers (mypy/pyright) to resolve types.ModuleType annotations.
if TYPE_CHECKING:
    import types

import lmfit
import numpy as np
import pandas as pd
from IPython.display import display
from ruamel.yaml import YAML
from tqdm import tqdm

from trspecfit import fitlib, mcp

# standardized plotting configuration
from trspecfit.config.plot import PlotConfig
from trspecfit.fit_results import FitResults

# function library for energy, time, and profile components
from trspecfit.functions import energy as fcts_energy
from trspecfit.functions import profile as fcts_profile
from trspecfit.functions import time as fcts_time
from trspecfit.utils import fit_io
from trspecfit.utils import lmfit as ulmfit
from trspecfit.utils import parsing as uparsing
from trspecfit.utils import plot as uplt
from trspecfit.utils import sbs as usbs

PathLike = str | pathlib.Path
ModelRef = str | int | list[str]


#
def _fp_key(fingerprint: dict[str, Any]) -> tuple[Any, ...]:
    """Hashable key for a file fingerprint (used for slot grouping/match)."""

    return (
        fingerprint["data_sha256"],
        fingerprint["energy_sha256"],
        fingerprint["time_sha256"],
        tuple(int(x) for x in fingerprint["shape"]),
    )


#
def _to_str_set(arg: str | Sequence[str] | None) -> set[str] | None:
    """Normalize a string-or-sequence filter arg to a set; ``None`` → no filter."""

    if arg is None:
        return None
    if isinstance(arg, str):
        return {arg}
    return set(arg)


#
def _trspecfit_version() -> str:
    """Best-effort package version for archive metadata."""

    import importlib.metadata as md

    try:
        return md.version("trspecfit")
    except md.PackageNotFoundError:
        return "unknown"


# multi-subcycle models allow for convolution only in the "0th subcycle"
# i.e. first model_info element which affects all times t.
# "conv" functions in individual subcycles are currently ignored


#
#
[docs] class Project: """ Project-wide configuration and directory structure management. Project provides centralized configuration for spectroscopy analysis workflows. It manages project directories, plot settings, file I/O formats, and analysis parameters. Configuration can be customized via YAML files to support different users, instruments, or spectroscopy techniques. Parameters ---------- path : str or Path Base directory for project data files and YAML configuration. If None, defaults to 'test' directory. name : str, default='test' Name for this analysis run. Creates subdirectory in results folder. config_file : str or Path, optional YAML configuration file name (located in path directory). If None, uses default settings only. Attributes ---------- path : Path Base project directory containing data and configuration path_results : Path Results directory (path + '_fits' suffix) name : str Name for this analysis run files : list of File All File instances registered with this Project show_output : int Verbosity level: - 0: Silent / programmatic / API mode -- no prints, no plots displayed or saved - 1: Interactive / notebook / UI mode -- show timing, fit results, save plots and data spec_fun_str : str Name of fitting function in ``trspecfit.spectra`` (e.g. ``'fit_model_gir'``, ``'fit_model_mcp'``, ``'fit_model_compare'``) Notes ----- **Plot Configuration:** Plot-related attributes (axis labels, directions, colormaps, DPI, etc.) are used to construct PlotConfig objects. See trspecfit.config.plot.PlotConfig for full documentation of available plot settings. **YAML Configuration:** Create a YAML file in the project directory to override defaults. Only specified settings need to be included; others use defaults. **File I/O Settings:** Attributes ext, num_fmt, delim, da_fmt, and da_slices_fmt control file export formats and can be customized per project via YAML or direct attribute assignment. """ # def __init__( self, path: PathLike | None, name: str = "test", config_file: PathLike | None = "project.yaml", ) -> None: self.path = pathlib.Path(path) if path is not None else pathlib.Path("test") self.path_results = pathlib.Path(f"{path}_fits") self.name = name self._config_file: PathLike | None = None self.files: list[File] = [] self._project_fit_result: list[Any] | None = None # Append-only log of completed fit slots, populated eagerly at fit # completion by the _slot_from_<fit_type> helpers in utils/fit_io.py. self._fit_history: list[fit_io.SavedFitSlot] = [] # Set defaults first self._set_defaults() # Override with YAML config if provided if config_file is not None: self._load_config(config_file) # def _set_defaults(self) -> None: """Set default project configuration.""" self.show_output = 1 # When False, fit_* methods skip the automatic CSV/PNG side effects # (fit_wrapper(save_output=1) and the legacy save_*_fit calls). # Explicit File.export_fit / Project.export_fits still write. self.auto_export = True # Plot settings self.e_label = "Energy" self.t_label = "Time" self.z_label = "Intensity" self.x_dir = "def" self.x_type = "lin" self.y_dir = "def" self.y_type = "lin" self.z_colormap = "viridis" self.z_colorbar = "ver" self.z_type = "lin" self.dpi_plt = 100 self.dpi_save = 300 self.res_mult = 5 self.title = "" self.x_lim = None self.y_lim = None self.data_slice = None self.z_lim = None self.colors = None self.linestyles = None self.linewidths = None self.markers = None self.markersizes = None self.alphas = None self.legend = None self.waterfall = 0 self.vlines = None self.hlines = None self.ticksize = None self.y_norm = 0 self.y_scale = None # File I/O settings self.ext = ".dat" self.num_fmt = "%.6e" self.delim = "," self.da_fmt = "%04d" self.da_slices_fmt = "%06d" # Advanced settings self.spec_fun_str = "fit_model_gir" # Noise / sigma defaults — applied to every File at construction time. # Files inherit these values at __init__ and may override them via # File.set_sigma(...). self.noise_type: str = fit_io.NOISE_TYPE_UNKNOWN self.sigma_source: str = fit_io.SIGMA_SOURCE_USER self.sigma_type: str = fit_io.SIGMA_TYPE_CONSTANT self.sigma_data: float = float("nan") # def __repr__(self) -> str: return f"Project(path='{self.path}', name='{self.name}')" # @property def results(self) -> FitResults: """ Snapshot view over the in-session fit history. Returns a fresh ``FitResults`` wrapping a copy of ``_fit_history``; subsequent fits append to the log and do not affect previously returned ``FitResults``. Object identity is unstable (``p.results is p.results`` is False); the contents at a given access are fixed. """ return FitResults(slots=list(self._fit_history)) #
[docs] def save_fits( self, filepath: PathLike | str | None = None, *, file: "int | str | File | Sequence[int | str | File] | None" = None, model: str | Sequence[str] | None = None, fit_type: fit_io.FitType | Sequence[fit_io.FitType] | None = None, overwrite: bool = False, show_output: int = 1, ) -> None: """ Save filtered fit slots from ``_fit_history`` to an HDF5 archive. Filters ``_fit_history`` by ``(file, model, fit_type)``, collapses to latest-per-``history_key`` (snapshot semantics), assembles a ``SavedProject``, and writes via ``utils.fit_io.write_archive``. Parameters ---------- filepath : path, optional Archive path. Default: ``./fit_results/<project_name>.fit.h5``. Append-mode: existing archives are augmented in place; pass a new path to start fresh. file : int | str | File | sequence, optional Restrict to slots whose live ``Project.files`` entry matches. ``int`` indexes into ``self.files``; ``str`` matches ``File.name``; ``File`` is taken directly. Resolved to file fingerprints, then matched against ``slot.file_fingerprint``. model : str | sequence, optional String filter on ``slot.model_name``. fit_type : str | sequence, optional String filter on ``slot.fit_type``. overwrite : bool, default False Slot-scoped: a slot collision (same canonical identity already in the archive) raises ``FileExistsError`` unless True. show_output : int, default 1 ``0`` to silence the per-call summary line. Notes ----- v1 behavior is snapshot-only (one slot per ``history_key``). ``keep_history=True`` for full-log save is deferred. """ project = self._build_saved_project_from_history( file=file, model=model, fit_type=fit_type ) if project is None: if show_output: print("No fit slots match the filter; nothing to save.") return if filepath is None: path = pathlib.Path("fit_results") / f"{self.name}.fit.h5" else: path = pathlib.Path(filepath) fit_io.write_archive(path, project=project, overwrite=overwrite) if show_output: n_slots = sum(len(sf.slots) for sf in project.files) print( f"Saved {n_slots} slot(s) across {len(project.files)} file(s) to {path}" )
#
[docs] def export_fits( self, filepath: PathLike | str | None = None, *, format: Literal["csv"] = "csv", file: "int | str | File | Sequence[int | str | File] | None" = None, model: str | Sequence[str] | None = None, fit_type: fit_io.FitType | Sequence[fit_io.FitType] | None = None, overwrite: bool = False, show_output: int = 1, ) -> None: """ Export filtered fit slots from ``_fit_history`` as a CSV/PNG tree. Same filter + snapshot-collapse pipeline as :meth:`save_fits`, but the output is a directory of human-readable artifacts rather than an HDF5 archive. One-way export — there is no ``load`` counterpart; round-tripping fits to disk is HDF5's job (use ``save_fits`` / ``load_fits`` for that). Parameters ---------- filepath : path, optional Output directory. Default: ``./fit_results/<project_name>``. Created if missing. format : {"csv"}, default ``"csv"`` Reserved kwarg; only CSV is implemented in v1. file : int | str | File | sequence, optional Restrict to slots whose live ``Project.files`` entry matches; same semantics as :meth:`save_fits`. model : str | sequence, optional String filter on ``slot.model_name``. fit_type : str | sequence, optional String filter on ``slot.fit_type``. overwrite : bool, default False Per-slot directory: a non-empty target dir raises ``FileExistsError`` unless True. Pre-checked across all slots before any writes (single conflict aborts the entire export). show_output : int, default 1 ``0`` to silence the per-call summary line. Output layout ------------- Exports are grouped by output directory, file name, model name, and fit type. The model and fit-type parts are joined with two underscores; when multiple slots share the same file, model, and fit type, an additional two-underscore hash suffix is appended. Each slot contains params.csv, metrics.csv (or metrics_per_slice.csv for SbS), conf_ci.csv / mcmc/flatchain.csv when present, plus per-fit-type artifacts: - baseline / spectrum: fit_1d.csv (energy, observed, fit, residual). - 2d / sbs: fit_2d.csv, observed_2d.csv, energy.csv, time.csv, 2D_data_fit_res.png. - sbs only: fit_pars.csv (per-slice param values) and one PNG per parameter from plt_fit_res_pars. The optional hash directory suffix appears only when more than one slot in the snapshot shares the same file, model, and fit type (i.e. different selections); the hash is the first 8 chars of the history key. """ if format != "csv": raise ValueError( f"Unsupported export format: {format!r}. Only 'csv' is " f"implemented in v1." ) project = self._build_saved_project_from_history( file=file, model=model, fit_type=fit_type ) if project is None: if show_output: print("No fit slots match the filter; nothing to export.") return if filepath is None: root = pathlib.Path("fit_results") / self.name else: root = pathlib.Path(filepath) # Per-file plot_config preserves File.plot_config customizations # (axis labels, colormaps, etc.) — File.save_sbs_fit / save_2d_fit # used self.plot_config for plotting, and we keep that contract. plot_configs: dict[str, Any] = {} for sf in project.files: live = self._find_file_for_slot(sf.slots[0]) if live is not None and live.plot_config is not None: plot_configs[sf.name] = live.plot_config n_written = fit_io.write_csv_export( root, project=project, num_fmt=self.num_fmt, delim=self.delim, plot_config=plot_configs, overwrite=overwrite, ) if show_output: print( f"Exported {n_written} slot(s) across {len(project.files)} " f"file(s) to {root}" )
# def _build_saved_project_from_history( self, *, file: "int | str | File | Sequence[int | str | File] | None", model: str | Sequence[str] | None, fit_type: fit_io.FitType | Sequence[fit_io.FitType] | None, ) -> fit_io.SavedProject | None: """ Apply the standard filter + collapse pipeline to ``_fit_history`` and return a fully-populated ``SavedProject``. Returns ``None`` when no slots survive the filter so callers can emit a "nothing to do" message and short-circuit. Used by :meth:`save_fits` and :meth:`export_fits` so both go through the identical filter / collapse / file-grouping logic. """ file_ids = self._resolve_save_file_filter(file) models_filter = _to_str_set(model) types_filter = _to_str_set(fit_type) filtered: list[fit_io.SavedFitSlot] = [] for slot in self._fit_history: slot_id = (_fp_key(slot.file_fingerprint), slot.file_name) if file_ids is not None and slot_id not in file_ids: continue if models_filter is not None and slot.model_name not in models_filter: continue if types_filter is not None and slot.fit_type not in types_filter: continue filtered.append(slot) snapshot = fit_io.collapse_history_to_snapshot(filtered) if not snapshot: return None # Group slots by source file identity (fingerprint + file_name) so # two distinct Project.files with byte-identical raw arrays but # different names are kept separate. Project enforces unique # File.name in-session, so name disambiguates fingerprint # collisions; the archive's full identity tuple is # (fingerprint, name, original_path) — see fit_archive_schema.md. by_id: dict[tuple[tuple[Any, ...], str], list[fit_io.SavedFitSlot]] = {} for s in snapshot: by_id.setdefault((_fp_key(s.file_fingerprint), s.file_name), []).append(s) saved_files: list[fit_io.SavedFile] = [] for slots in by_id.values(): live = self._find_file_for_slot(slots[0]) if live is None: raise ValueError( f"Slot for file {slots[0].file_name!r} has no matching " f"Project.files entry; cannot read raw arrays. " f"Re-attach the file or filter it out." ) assert live.data is not None # type guard assert live.energy is not None # type guard saved_files.append( fit_io.SavedFile( name=live.name, original_path=str(live.path), dim=int(live.dim), shape=tuple(int(x) for x in live.data.shape), fingerprint=slots[0].file_fingerprint, data=live.data, energy=live.energy, time=( live.time if live.time is not None else np.array([], dtype=np.float64) ), e_lim=list(live.e_lim) if live.e_lim else None, t_lim=list(live.t_lim) if live.t_lim else None, slots=tuple(slots), ) ) now = fit_io._now_iso() return fit_io.SavedProject( name=self.name, trspecfit_version=_trspecfit_version(), schema_version=fit_io.SCHEMA_VERSION, timestamp_created=now, timestamp_updated=now, files=tuple(saved_files), ) #
[docs] def load_fits( self, filepath: PathLike | str, *, file: str | Sequence[str] | None = None, model: str | Sequence[str] | None = None, fit_type: fit_io.FitType | Sequence[fit_io.FitType] | None = None, show_output: int = 1, ) -> FitResults: """ Load a fit archive and return a ``FitResults`` view. Convenience wrapper around ``FitResults.load`` for users who already have a ``Project`` in hand. Does **not** mutate Project state — the returned ``FitResults`` is independent of ``_fit_history`` and never merges into it. """ out = FitResults.load(filepath, file=file, model=model, fit_type=fit_type) if show_output: print(f"Loaded {len(out)} slot(s) from {filepath}") return out
# def _resolve_save_file_filter( self, arg: "int | str | File | Sequence[int | str | File] | None", ) -> set[tuple[tuple[Any, ...], str]] | None: """ Map a ``file=...`` arg for :meth:`save_fits` to a set of slot-identity keys ``(fingerprint_key, file_name)``. Both components are matched against the slot to disambiguate two ``Project.files`` with byte-identical raw arrays but distinct names. Returns ``None`` if ``arg`` is ``None`` (no filter). """ if arg is None: return None items: Sequence[int | str | File] if isinstance(arg, int | str | File): items = [arg] else: items = arg keys: set[tuple[tuple[Any, ...], str]] = set() for item in items: if isinstance(item, int): f = self.files[item] elif isinstance(item, str): f = self[item] elif isinstance(item, File): f = item else: raise TypeError( f"Unsupported file filter entry type: {type(item).__name__}" ) keys.add((_fp_key(f.fingerprint()), f.name)) return keys # def _find_file_for_slot(self, slot: fit_io.SavedFitSlot) -> "File | None": """ Look up the live ``File`` whose name and fingerprint match a slot. Both name and fingerprint must match; either alone can produce a false positive when (a) the user has two byte-identical files in the project under different names, or (b) a file was renamed post-fit so the slot's name no longer matches the live one. The slot's recorded ``file_name`` is the authoritative in-session identity (Project enforces unique names), and the fingerprint cross-checks that the live arrays still match. """ target_fp = _fp_key(slot.file_fingerprint) for f in self.files: if f.name != slot.file_name: continue if f.data is None or f.energy is None: continue if _fp_key(f.fingerprint()) == target_fp: return f return None # def __getitem__(self, key: int | str) -> "File": """ Access a File by index or name. Parameters ---------- key : int or str Integer index into ``self.files``, or string matching a File's ``name`` attribute. Returns ------- File Raises ------ KeyError If string key does not match any file name. IndexError If integer index is out of range. """ if isinstance(key, int): return self.files[key] for f in self.files: if f.name == key: return f available = [f.name for f in self.files] raise KeyError(f"No file with name '{key}'. Available: {available}") #
[docs] def describe(self, detail: int = 0) -> None: """ Display project configuration summary. Parameters ---------- detail : int, default=0 Verbosity level. - 0: project paths and config source. - 1: also list attached Files (path, dim, shape, models) and plot 2D data grid. - 2: also show plot and file I/O settings. """ if self.show_output < 1: return print("Project") print(f" path: {self.path}") print(f" results: {self.path_results}") print(f" name: {self.name}") if self._config_file is not None: print(f" config: {self._config_file}") else: print(" config: defaults (no YAML loaded)") if detail >= 1: print(f"\n Files ({len(self.files)}):") if not self.files: print(" (none)") for f in self.files: if f.data is None: print(f" {f.name}: no data") continue e_range = ( f"energy [{f.energy.min():.2f}, {f.energy.max():.2f}]" if f.energy is not None else "" ) t_range = ( f"time [{f.time.min():.2f}, {f.time.max():.2f}]" if f.time is not None else "" ) z_range = f"z [{f.data.min():.4g}, {f.data.max():.4g}]" parts = [p for p in [e_range, t_range, z_range] if p] print(f" {f.name}: {f.dim}D {f.data.shape}, {', '.join(parts)}") # Plot 2D data grid (only if all files share axes) if self.show_output >= 1: files_2d = [f for f in self.files if f.dim == 2 and f.data is not None] try: self._validate_shared_axes() axes_ok = True except ValueError: axes_ok = False if files_2d and axes_ok: config = files_2d[0].plot_config datasets = [f.data for f in files_2d if f.data is not None] uplt.plot_2d_grid( datasets=datasets, x=files_2d[0].energy, y=files_2d[0].time, titles=[f.name for f in files_2d], config=config, vlines=[f.e_lim_abs for f in files_2d], hlines=[f.t_lim_abs for f in files_2d], ) elif files_2d: print( " (2D grid plot skipped: files have" " different axis shapes and/or values)" ) if detail >= 2: print("\n Plot settings:") print(f" e_label: {self.e_label}") print(f" t_label: {self.t_label}") print(f" z_label: {self.z_label}") print(f" x_dir: {self.x_dir}") print(f" x_type: {self.x_type}") print(f" y_dir: {self.y_dir}") print(f" y_type: {self.y_type}") print(f" z_colormap: {self.z_colormap}") print(f" z_colorbar: {self.z_colorbar}") print(f" z_type: {self.z_type}") print(f" dpi_plt: {self.dpi_plt}") print(f" dpi_save: {self.dpi_save}") print(f" res_mult: {self.res_mult}") print("\n File I/O settings:") print(f" ext: {self.ext}") print(f" num_fmt: {self.num_fmt}") print(f" delim: {repr(self.delim)}") print(f" da_fmt: {self.da_fmt}") print(f" DA_slices: {self.da_slices_fmt}")
# def _load_config(self, config_file: PathLike) -> None: """ Load project configuration from YAML file. Allow different users or types of spectroscopy to overwrite project attributes Parameters ---------- config_file : str or Path Name or path of config file (looks in self.path) """ yaml = YAML(typ="safe") config_path = self.path / config_file try: with config_path.open() as f: config = yaml.load(f) if config is None: if self.show_output >= 1: print(f"Warning: {config_file} is empty, using defaults") return # Update attributes from config for key, value in config.items(): normalized_key = key.replace("-", "_") _key_map = { "x_label": "e_label", "y_label": "t_label", "dpi_plot": "dpi_plt", } project_key = _key_map.get(normalized_key) or normalized_key if hasattr(self, project_key): setattr(self, project_key, value) else: if self.show_output >= 1: print(f"Warning: Unknown config key '{key}' ignored") # Coerce + validate noise metadata. YAML "null" arrives as None # for sigma_data; normalize to NaN so downstream code can treat # the "unset" case as a finite-check, not a None-check. self.sigma_data = fit_io.normalize_sigma_data(self.sigma_data) fit_io.validate_noise_metadata( noise_type=self.noise_type, sigma_source=self.sigma_source, sigma_type=self.sigma_type, ) self._config_file = config_path except FileNotFoundError: if self.show_output >= 1: print(f"Config file {config_path} not found, using defaults") except Exception as e: # noqa: BLE001 if self.show_output >= 1: print(f"Error loading config: {e}") print("Using default settings") # ------------------------------------------------------------------ # Project-level model loading and baseline fitting # ------------------------------------------------------------------ # def _validate_shared_axes(self) -> None: """Verify all files share the same energy and time axes.""" files_with_data = [f for f in self.files if f.data is not None] if len(files_with_data) < 2: return ref = files_with_data[0] for f in files_with_data[1:]: if f.energy is not None and ref.energy is not None: if f.energy.shape != ref.energy.shape or not np.allclose( f.energy, ref.energy ): raise ValueError( f'Energy axis mismatch: "{ref.name}" vs "{f.name}". ' f"Project-level methods require shared axes." ) if f.time is not None and ref.time is not None: if f.time.shape != ref.time.shape or not np.allclose(f.time, ref.time): raise ValueError( f'Time axis mismatch: "{ref.name}" vs "{f.name}". ' f"Project-level methods require shared axes." ) #
[docs] def load_models( self, *, model_yaml: PathLike, model_info: str | list[str], ) -> None: """ Load the same energy model onto every file in the project. Parameters ---------- model_yaml : str or Path YAML file name (located in project path) defining the model. model_info : str or list of str Model name(s) to load (see ``File.load_model``). """ if not self.files: raise ValueError("Project has no files.") saved = self.show_output self.show_output = 0 try: for f in self.files: f.load_model(model_yaml=model_yaml, model_info=model_info) finally: self.show_output = saved if self.show_output >= 1: name = model_info if isinstance(model_info, str) else "_".join(model_info) print(f"Model '{name}' loaded for {len(self.files)} files")
#
[docs] def add_time_dependences( self, *, target_model: str, target_parameter: str, dynamics_yaml: PathLike, dynamics_model: str | list[str], frequency: float = -1, ) -> None: """ Add time dependence to a parameter on every file in the project. Parameters ---------- target_model : str Name of the energy model (must exist on every file). target_parameter : str Parameter to make time-dependent (e.g. ``'GLP_01_x0'``). dynamics_yaml : str or Path YAML file defining the dynamics model. dynamics_model : str or list of str Dynamics model name(s) (see ``File.add_time_dependence``). frequency : float, default=-1 Sub-cycle frequency (-1 = single cycle). """ if not self.files: raise ValueError("Project has no files.") saved = self.show_output self.show_output = 0 try: for f in self.files: f.add_time_dependence( target_model=target_model, target_parameter=target_parameter, dynamics_yaml=dynamics_yaml, dynamics_model=dynamics_model, frequency=frequency, ) finally: self.show_output = saved if self.show_output >= 1: print( f"Time dependence '{target_parameter}' added " f"for {len(self.files)} files" )
#
[docs] def set_fit_limits( self, energy_limits: Sequence[float] | None, *, time_limits: Sequence[float] | None = None, ) -> None: """ Set energy and time fitting limits on every file in the project. Parameters ---------- energy_limits : list of float or None Energy range ``[min, max]`` in absolute values. If None, uses each file's full energy range. time_limits : list of float, optional Time range ``[min, max]`` in absolute values. """ if not self.files: raise ValueError("Project has no files.") for f in self.files: f.set_fit_limits( energy_limits, time_limits=time_limits, show_plot=False, ) if self.show_output >= 1: e_str = ( f"energy {list(energy_limits)}" if energy_limits is not None else "energy [full]" ) t_str = f", time {list(time_limits)}" if time_limits is not None else "" print(f"Fit limits set for {len(self.files)} files: {e_str}{t_str}")
#
[docs] def define_baselines( self, *, time_start: float, time_stop: float, time_type: str = "abs", ) -> None: """ Define baseline on every file in the project. Parameters ---------- time_start : float or int Start of baseline region (absolute value or index). time_stop : float or int End of baseline region (absolute value or index). time_type : {'abs', 'ind'}, default='abs' Interpretation of time_start/time_stop. """ if not self.files: raise ValueError("Project has no files.") self._validate_shared_axes() for f in self.files: f.define_baseline( time_start=time_start, time_stop=time_stop, time_type=time_type, show_plot=False, ) if self.show_output >= 1: type_label = "index" if time_type == "ind" else "absolute" print( f"Baseline defined for {len(self.files)} files: " f"time ({type_label}) [{time_start}, {time_stop}]" )
#
[docs] def fit_baselines( self, *, model_name: str, stages: int = 2, **fit_wrapper_kwargs, ) -> None: """ Fit baseline on every file in the project. Each file must already have the named model loaded and a baseline defined (via ``define_baselines`` or ``File.define_baseline``). Parameters ---------- model_name : str Name of baseline model (must exist on every file). stages : {1, 2}, default=2 Number of optimization stages. **fit_wrapper_kwargs Additional keyword arguments passed to ``fitlib.fit_wrapper``. """ if not self.files: raise ValueError("Project has no files.") for f in self.files: if f.select_model(model_name) is None: raise ValueError(f'File "{f.name}": model "{model_name}" not loaded.') if f.data_base is None: raise ValueError( f'File "{f.name}": baseline not defined. ' f"Run define_baselines() first." ) t_start = time.time() saved = self.show_output self.show_output = 0 try: for f in self.files: f.fit_baseline( model_name=model_name, stages=stages, **fit_wrapper_kwargs, ) finally: self.show_output = saved if self.show_output >= 1: fitlib.time_display( t_start=t_start, print_str=(f"Baseline fit complete for {len(self.files)} files: "), ) # Show saved baseline fit plots in a grid import matplotlib.image as mpimg images = [] for f in self.files: img_path = ( f.create_model_path(model_name, fit_type="baseline") / "base_fit.png" ) if img_path.exists(): images.append(mpimg.imread(str(img_path))) if images: uplt.plot_grid(images, columns=min(3, len(images)))
# ------------------------------------------------------------------ # Project-level fitting # ------------------------------------------------------------------ # def _build_fit_params( self, *, model_name: str, ) -> tuple[lmfit.Parameters, dict]: """ Build combined lmfit.Parameters and mapping for project-level fit. Walks every File's active 2D model, reads each parameter's ``vary_level`` (``"project"``/``"file"``/``"static"``), and assembles: * A single ``lmfit.Parameters`` object for the optimizer. * A ``project_fit_info`` dict consumed by ``spectra.fit_project_mcp``. Parameters ---------- model_name : str Name of the model to fit (must exist on every File). Returns ------- combined_pars : lmfit.Parameters Optimizer parameters (project-vary once, file-vary per file). project_fit_info : dict Context dict for ``fit_project_mcp`` containing mapping, file/model references, and parameter names. """ # (project_param_name, file_index, local_param_name) mapping: list[tuple[str, int, str]] = [] # Track which project-vary params we've already added project_seen: set[str] = set() models: list[mcp.Model] = [] # Pass 1: collect models, build per-file name remapping, create # parameters (without expressions — those need the full remap). # name_remap[file_idx] maps local_name -> project_name name_remaps: list[dict[str, str]] = [] # Deferred expression params: (project_name, file_idx, expr) expr_deferred: list[tuple[str, int, str]] = [] combined_pars = lmfit.Parameters() for file_idx, f in enumerate(self.files): model = f.select_model(model_name) if model is None: raise ValueError(f'File "{f.path}" does not have model "{model_name}"') models.append(model) vary_levels = model.get_vary_levels() name_remap: dict[str, str] = {} for local_name, lmf_par in model.lmfit_pars.items(): level = vary_levels.get(local_name, "static") if level == "project": proj_name = local_name name_remap[local_name] = proj_name if local_name not in project_seen: combined_pars.add( lmfit.Parameter( proj_name, value=lmf_par.value, vary=lmf_par.vary, min=lmf_par.min, max=lmf_par.max, ) ) project_seen.add(local_name) else: existing = combined_pars[proj_name] if not np.isclose(existing.value, lmf_par.value): warnings.warn( f'Project-vary param "{local_name}" has ' f"different initial values across files: " f"using {existing.value} (file 0), " f"ignoring {lmf_par.value} " f"(file {file_idx}).", stacklevel=2, ) if not np.isclose(existing.min, lmf_par.min): raise ValueError( f'Project-vary param "{local_name}" has ' f"different min bounds across files: " f"{existing.min} (file 0) vs " f"{lmf_par.min} (file {file_idx}). " f"Shared parameters must use identical bounds " f"across all files." ) if not np.isclose(existing.max, lmf_par.max): raise ValueError( f'Project-vary param "{local_name}" has ' f"different max bounds across files: " f"{existing.max} (file 0) vs " f"{lmf_par.max} (file {file_idx}). " f"Shared parameters must use identical bounds " f"across all files." ) mapping.append((proj_name, file_idx, local_name)) else: proj_name = f"file{file_idx:02d}_{local_name}" name_remap[local_name] = proj_name vary = lmf_par.vary if level == "file" else False if lmf_par.expr: # Defer — need full remap to rewrite references combined_pars.add( lmfit.Parameter( proj_name, value=lmf_par.value, vary=False, ) ) expr_deferred.append((proj_name, file_idx, lmf_par.expr)) else: combined_pars.add( lmfit.Parameter( proj_name, value=lmf_par.value, vary=vary, min=lmf_par.min, max=lmf_par.max, ) ) mapping.append((proj_name, file_idx, local_name)) name_remaps.append(name_remap) # Pass 2: resolve deferred expressions using per-file name remaps. for proj_name, file_idx, expr in expr_deferred: remap = name_remaps[file_idx] # Replace longest names first to avoid partial substitution rewritten = expr for local_name in sorted(remap, key=len, reverse=True): rewritten = re.sub( rf"\b{re.escape(local_name)}\b", remap[local_name], rewritten, ) combined_pars[proj_name].set(expr=rewritten) par_names = list(combined_pars.keys()) project_fit_info: dict = { "mapping": mapping, "files": self.files, "models": models, "par_names": par_names, } return combined_pars, project_fit_info #
[docs] def fit_2d( self, *, model_name: str, stages: int = 2, **fit_wrapper_kwargs, ) -> None: """ Fit all files simultaneously with shared/independent parameters. Builds a combined parameter set from all files, runs the optimizer once, and distributes final results back to each file's model. Parameter sharing is controlled by the ``vary`` field in each model's YAML (``"project"``/``"file"``/``"static"``). Parameters ---------- model_name : str Model name (must exist on every File in the project). stages : {1, 2}, default=2 Number of optimization stages (see ``fitlib.fit_wrapper``). **fit_wrapper_kwargs Additional keyword arguments passed to ``fitlib.fit_wrapper``. """ t_start = time.time() if not self.files: raise ValueError("Project has no files to fit.") # Validate: each file must have baseline fitted and 2D model ready for f in self.files: if f.model_base is None: raise ValueError( f'File "{f.name}": baseline not fitted. ' f"Fit individual baselines with file.fit_baseline() " f"or all at once with project.fit_baselines()." ) model = f.select_model(model_name) if model is None: raise ValueError(f'File "{f.path}" does not have model "{model_name}"') if f.energy is None or f.time is None or f.data is None: raise ValueError(f'File "{f.path}": data or axes missing.') # Set fixed params from baseline (same as File.fit_2d does) base_df = ulmfit.par_to_df(f.model_base.lmfit_pars, col_type="min") model.update_value( new_par_values=list(base_df["value"]), par_select=list(base_df["name"]), ) # Build combined parameters and project fit context combined_pars, project_fit_info = self._build_fit_params( model_name=model_name, ) # Build concatenated data array (sliced per file) data_slices: list[np.ndarray] = [] for f in self.files: assert f.data is not None # type guard d = f.data if f.e_lim and f.t_lim: d = d[f.t_lim[0] : f.t_lim[1], f.e_lim[0] : f.e_lim[1]] elif f.e_lim: d = d[:, f.e_lim[0] : f.e_lim[1]] elif f.t_lim: d = d[f.t_lim[0] : f.t_lim[1], :] data_slices.append(d.flatten()) concat_data = np.concatenate(data_slices) # const: (x, data, package, fit_fun_str, unpack, e_lim, t_lim) # e_lim and t_lim are empty — slicing is handled inside # fit_project_mcp const: tuple[Any, ...] = ( np.array([]), # x — unused by fit_project_mcp concat_data, "fit_project_mcp", 0, [], # no additional e_lim slicing [], # no additional t_lim slicing ) args = (project_fit_info, 2) par_names = project_fit_info["par_names"] if self.show_output >= 1: n_project = sum( 1 for p in combined_pars.values() if p.vary and not p.name.startswith("file") ) n_file = sum( 1 for p in combined_pars.values() if p.vary and p.name.startswith("file") ) n_static = sum(1 for p in combined_pars.values() if not p.vary) print( f"Project fit: {len(self.files)} files, " f"{n_project} project-vary, {n_file} file-vary, " f"{n_static} static params" ) result = fitlib.fit_wrapper( const=const, args=args, par_names=par_names, par=combined_pars, stages=stages, show_output=1 if self.show_output >= 1 else 0, save_output=0, **fit_wrapper_kwargs, ) # Distribute final parameters back to file models and hook into # the standard File 2D-fit lifecycle so that save_2d_fit() and # get_fit_results("2d") work on project-fitted files. mapping = project_fit_info["mapping"] models = project_fit_info["models"] joint_result = result[1] if result[1] else None if joint_result: final_pars = joint_result.params for proj_name, file_idx, local_name in mapping: if proj_name in final_pars: models[file_idx].lmfit_pars[local_name].value = final_pars[ proj_name ].value # method/nvarys come from the joint optimizer; per-file stderr/CI are # absent by design (joint covariance does not decompose cleanly per # file). conf_ci is an empty DataFrame so _append_2d_slot's # `result[2].empty` check works without branching. joint_method = ( getattr(joint_result, "method", "unknown") if joint_result else "unknown" ) joint_nvarys = int(getattr(joint_result, "nvarys", 0)) if joint_result else 0 for f, model in zip(self.files, models, strict=True): f.model_2d = model assert model is not None # type guard assert f.energy is not None and f.data is not None # type guard # Length-5 list mirrors fit_wrapper's [par_ini, par_fin, conf_ci, # emcee_fin, emcee_ci]. Project fits do not run per-file MCMC, so # emcee slots are inert (None / empty DataFrame). model.result = [ None, types.SimpleNamespace( params=model.lmfit_pars, method=joint_method, nvarys=joint_nvarys, ), pd.DataFrame(), None, pd.DataFrame(), ] # const/args mirror File.fit_2d so _append_2d_slot can evaluate # the per-file fit grid via fitlib.residual_fun. Project fits # always go through the MCP path. model.const = (f.energy, f.data, "fit_model_mcp", 0, f.e_lim, f.t_lim) model.args = (model, 2) self._project_fit_result = result # Append a per-file 2D slot to Project._fit_history so the joint fit # is discoverable via Project.results, matching the File.fit_2d() # entry point. if joint_result: for f in self.files: f._append_2d_slot(model_name=model_name, fit_fun_str="fit_model_mcp") # Save per-file 2D fit results (silently) if self.auto_export: saved = self.show_output self.show_output = 0 try: for f in self.files: if f.model_2d is not None: path_2d = f.create_model_path(model_name, fit_type="2d") f._save_2d_fit_legacy(save_path=path_2d) finally: self.show_output = saved if self.show_output >= 1: fitlib.time_display( t_start=t_start, print_str="Time elapsed for project-level 2D fit: ", ) # Show saved 2D fit plots in a grid import matplotlib.image as mpimg images = [] for f in self.files: img_path = ( f.create_model_path(model_name, fit_type="2d") / "2D_data_fit_res.png" ) if img_path.exists(): images.append(mpimg.imread(str(img_path))) if images: uplt.plot_grid(images, columns=min(3, len(images)))
# # class File: """ Data container with model management and fitting capabilities. File represents a single spectroscopy dataset (1D or 2D) with associated energy and time axes. It manages multiple models for comparison, handles baseline extraction, set fitting limits, and orchestrates baseline fitting, Slice-by-Slice fitting, and full 2D fitting workflows. Parameters ---------- parent_project : Project, optional Parent Project instance for configuration. If None, creates default test project internally. path : str or Path, default='test' File path for result directory structure and data I/O. name : str, optional Human-readable identifier used for ``project["name"]`` lookup and plot titles. Defaults to the stem of ``path`` (e.g. ``"data_1"`` for ``path="data_1.h5"``). Useful when multiple files share a path (e.g. HDF5 groups) or when the path is not descriptive. data : ndarray, optional Spectroscopy data to fit: - 1D: shape [n_energy] for energy-resolved spectra - 2D: shape [n_time, n_energy] for time- and energy-resolved data energy : array-like, optional Energy axis values. If None and data provided, uses indices. time : array-like, optional Time axis values (for 2D data only). If None and 2D data provided, uses indices. aux_axis : array-like, optional Auxiliary physical axis (e.g. depth, position) for Profile models. Propagated to models loaded via ``load_model()``. Attributes ---------- p : Project Parent project providing configuration path : str or Path File identifier data : ndarray Spectroscopy data (1D or 2D), with dark subtraction and sensitivity calibration applied (if any) data_raw : ndarray Original unmodified spectroscopy data as passed to the constructor dim : int Data dimensionality (1 or 2) energy : ndarray Energy axis dark : ndarray Dark/background spectrum subtracted from data (zeros by default) calibration : ndarray Sensitivity calibration spectrum dividing data (ones by default) time : ndarray or None Time axis (None for 1D data) aux_axis : ndarray or None Auxiliary physical axis for Profile models (None if unused) models : list of Model All models loaded for this file model_active : Model or None Currently active model (default for operations) e_lim_abs, e_lim : list Energy fitting limits (absolute values and indices) t_lim_abs, t_lim : list Time fitting limits (absolute values and indices) data_base : ndarray or None Baseline spectrum (averaged from specified time range) base_t_abs, base_t_ind : list Time range for baseline extraction (absolute and indices) model_base : Model or None Model used for baseline fitting model_sbs : Model or None Model used for Slice-by-Slice fitting results_sbs : list Slice-by-Slice fit results for all time slices model_spec : Model or None Model used for individual spectrum fitting data_spec : ndarray or None Extracted 1D spectrum for individual spectrum fitting spec_t_abs, spec_t_ind : list Time bounds for spectrum extraction (absolute values and indices) plot_config : PlotConfig Plot configuration (created from parent Project on first access) Notes ----- **Typical Workflow:** 1. Create File with data, energy, time (and optionally aux_axis) 2. Load model(s) with load_model() 3. Set active model with set_active_model() 4. Optionally attach Profile models with add_par_profile() 5. Optionally add time-dependence with add_time_dependence() 6. Define baseline with define_baseline() (for 2D data) 7. Set fit limits with set_fit_limits() 8. Fit baseline with fit_baseline() 9. Perform Slice-by-Slice or 2D fit with fit_slice_by_slice() or fit_2d() **Model Management:** File can manage multiple models simultaneously for comparison. Use select_model() to retrieve specific models by name or index, and set_active_model() to change which model is used by default. **Baseline Spectrum:** For 2D data, extract a baseline/pre-trigger/ground-state reference spectrum using define_baseline(). This is required before fitting time-dependent data. """ #
[docs] def __init__( self, parent_project: Project | None = None, path: PathLike = "test", name: str | None = None, data: np.ndarray | None = None, energy: np.ndarray | None = None, time: np.ndarray | None = None, aux_axis: np.ndarray | None = None, ) -> None: # pass parent project or (default) create a functioning test project environment self.p = parent_project if parent_project is not None else Project(path=None) self.path = path # path to load/save [?] data from self.name = name if name is not None else pathlib.Path(path).stem # Check for duplicate names before registering existing = [f.name for f in self.p.files] if self.name in existing: raise ValueError( f'Duplicate file name "{self.name}". ' f"Pass an explicit name= to disambiguate " f'(e.g. name="{self.name}_2").' ) self.p.files.append(self) # register with parent project self._plot_config: PlotConfig | None = None # create plot config from project self.data = data # (time-[optional] and) energy-dependent data to fit self.data_raw: np.ndarray | None = data.copy() if data is not None else None self.dim = 0 if data is None else data.ndim # 1/2 D for energy/+time # take energy and time input or create a generic axis if None is passed if energy is not None or data is None: self.energy = energy elif self.dim == 1: self.energy = np.arange(data.shape[0]) else: self.energy = np.arange(data.shape[1]) if time is not None or self.dim <= 1 or data is None: self.time = time else: self.time = np.arange(data.shape[0]) self.aux_axis: np.ndarray | None = ( aux_axis # auxiliary physical axis (e.g. depth) ) # data correction arrays (dark subtraction, sensitivity calibration) n_energy = self.energy.shape[0] if self.energy is not None else 0 self.dark: np.ndarray | None = np.zeros(n_energy) if data is not None else None self.calibration: np.ndarray | None = ( np.ones(n_energy) if data is not None else None ) # keep track of models that are used to fit this file/data self.models: list[mcp.Model] = [] self.model_active: mcp.Model | None = None # default model to work with # Energy and time limits for fitting methods self.e_lim_abs: list[float] = [] # energy limits (low, high) user-defined self.e_lim: list[int] = [] # index [start, stop) for energy[start:stop] self.t_lim_abs: list[float] = [] # time limits (low, high) user-defined self.t_lim: list[int] = [] # index [start, stop) for time[start:stop] # self.base_t_abs: list[ float ] = [] # start and stop time of the baseline spectrum self.base_t_ind: list[int] = [] # index of the above start and stop time self.data_base = None # average spectrum between above indices self.model_base: mcp.Model | None = None # self.model_sbs: mcp.Model | None = None self.model_2d: mcp.Model | None = None # all Slice-by-Slice fit results (different from model_sbs.result) self.results_sbs: list = [] # self.model_spec: mcp.Model | None = None # model for individual spectrum fit self.data_spec: np.ndarray | None = None # extracted 1D spectrum self.spec_t_abs: list[float] = [] # time bounds (absolute) self.spec_t_ind: list[int] = [] # time bounds (indices) # Noise metadata — inherited from parent Project at construction # users override per file via File.set_sigma() # materialized into each saved slot at fit completion self.noise_type: str = getattr(self.p, "noise_type", fit_io.NOISE_TYPE_UNKNOWN) self.sigma_source: str = getattr( self.p, "sigma_source", fit_io.SIGMA_SOURCE_USER ) self.sigma_type: str = getattr(self.p, "sigma_type", fit_io.SIGMA_TYPE_CONSTANT) self.sigma_data: float = float(getattr(self.p, "sigma_data", float("nan"))) # default fit limits to entire dataset (energy is None only for bare File()) if self.energy is not None: self.set_fit_limits(energy_limits=None, show_plot=False)
@property def plot_config(self) -> PlotConfig: """ Get plot config for this File. Created from parent Project on first access. File can then customize persistently (e.g., for different time axes across files). """ if self._plot_config is None: self._plot_config = PlotConfig.from_project(self.p) return self._plot_config @plot_config.setter def plot_config(self, config: PlotConfig) -> None: """Allow setting a custom config for this File""" self._plot_config = config # def __repr__(self) -> str: shape = self.data.shape if self.data is not None else None n_models = len(self.models) return ( f"File(name='{self.name}', data={shape}, {n_models} models, dim={self.dim})" ) #
[docs] def describe(self, *, waterfall: float | None = None) -> None: """ Display information about this file's data. Plots the data with current fit limits indicated by reference lines (or reduced opacity in waterfall mode). For 1D data, shows energy spectrum. For 2D data, shows time- and energy-resolved map or a waterfall plot for small datasets. Parameters ---------- waterfall : float or None, optional Controls 2D data visualization mode. * ``None`` (default) — auto-select: waterfall plot if ``len(self.time) <= 12``, 2D map otherwise. The waterfall offset is set to the maximum peak-to-peak range across spectra. * ``0`` — force 2D map regardless of dataset size. * nonzero float — force waterfall plot with this y-offset between traces. Notes ----- In waterfall mode, traces outside the active time fit window (``t_lim_abs``) are drawn at reduced opacity (alpha = 0.35). """ if self.p.show_output < 1: return print(f"File: {self.name} [path: {self.path}]") if self.data is None: warnings.warn("No data loaded; nothing to describe.", stacklevel=2) return if self.energy is None: self.energy = np.arange(self.data.shape[-1]) warnings.warn("Energy axis missing; using index axis.", stacklevel=2) if self.dim == 2 and self.time is None: self.time = np.arange(self.data.shape[0]) warnings.warn("Time axis missing; using index axis.", stacklevel=2) config = self.plot_config if self.dim == 1: uplt.plot_1d( data=[ self.data, ], x=self.energy, config=config, vlines=self.e_lim_abs, ) elif self.dim == 2: assert self.time is not None # type guard — ensured above _WATERFALL_MAX_SPECTRA = 12 use_waterfall = ( waterfall is None and len(self.time) <= _WATERFALL_MAX_SPECTRA ) use_waterfall = use_waterfall or (waterfall is not None and waterfall != 0) if use_waterfall: if waterfall is None or waterfall == 0: ptp = np.nanmax(self.data, axis=1) - np.nanmin(self.data, axis=1) waterfall = float(np.nanmax(ptp)) legend = [f"{t:.4g}" for t in self.time] if len(self.t_lim_abs) == 2: t_lo, t_hi = self.t_lim_abs alphas = [1.0 if t_lo <= t <= t_hi else 0.35 for t in self.time] else: alphas = None uplt.plot_1d( data=self.data, x=self.energy, config=config, waterfall=waterfall, legend=legend, vlines=self.e_lim_abs, alphas=alphas, y_label=config.z_label, y_dir="def", y_type=config.z_type, ) else: uplt.plot_2d( data=self.data, x=self.energy, y=self.time, config=config, vlines=self.e_lim_abs, hlines=self.t_lim_abs, )
#
[docs] def model_list_to_name(self, model_list: Sequence[str]) -> str: """ Create composite model name from list of submodel names. Joins individual model names with underscores. Used primarily for mcp.Dynamics models with multiple subcycles. For single-element lists, returns that element unchanged. Parameters ---------- model_list : list of str List of model names to combine Returns ------- str Combined model name (e.g., ['model1', 'model2'] -> 'model1_model2') """ return "_".join(model_list) # see str.join()
#
[docs] def select_model(self, model_info: ModelRef) -> mcp.Model | None: """ Select model by name, index, or multi-cycle name list. Parameters ---------- model_info : str, int, or list of str Model identifier: name, position in ``self.models``, or list of submodel names (joined with ``_`` to match composite name). Returns ------- Model or None The matched model, or None if not found. """ if isinstance(model_info, str): for m in self.models: if m.name == model_info: return m return None if isinstance(model_info, int): if 0 <= model_info < len(self.models): return self.models[model_info] return None if isinstance(model_info, list): m_name = self.model_list_to_name(model_info) for m in self.models: if m.name == m_name: return m return None
#
[docs] def set_active_model(self, model_info: ModelRef) -> None: """ Set model to be used as active model. All functions requiring a model input will default to the currently active model unless a model is specified as input to the respective function (via ``model_info``). Parameters ---------- model_info : str or int Model identifier (name or index) """ self.model_active = self.select_model(model_info)
#
[docs] def load_model( self, model_yaml: PathLike, model_info: str | list[str], par_name: str = "", model_type: Literal["energy", "dynamics", "profile"] = "energy", ) -> mcp.Model: """ Load a model from YAML file. Loads a model defined in ``model_yaml`` file located in Parent.path. Parameters ---------- model_yaml : str or Path YAML file name (located in project path) defining the model model_info : str or list of str Model name(s) to load. A bare string is treated as a single-element list. - For energy-dependent models (1D or 2D): ``'model_name'`` or ``['model_name']`` (single element). Model will be set as active model. - For standard time-dependent models: ``'model_name'`` or ``['model_name']``. Single element applies to entire time axis. - For multi-cycle time-dependent models: ``['model1', 'model2', ...]``. Element 0 applies to entire time axis, elements 1+ apply to respective subcycles only. - For profile models: ``'model_name'`` or ``['model_name']`` (single element). par_name : str, default='' Parameter name for dynamics and profile models. Empty string (default) indicates an energy-dependent model. For ``"dynamics"`` and ``"profile"`` model types, must match the name of the energy model parameter that the loaded model will be attached to. model_type : {'energy', 'dynamics', 'profile'}, default='energy' Type of model to load: - ``'energy'``: energy- (and time-)dependent spectral model - ``'dynamics'``: time-dependence of a single model parameter - ``'profile'``: spatial profile of a single model parameter over aux_axis Returns ------- Model The loaded model. For ``'energy'`` models the model is also registered in ``self.models`` and set as the active model. """ # normalize bare string to single-element list if isinstance(model_info, str): model_info = [model_info] if model_type == "energy" and len(model_info) != 1: raise ValueError( 'Energy-resolved data (model_type="energy") require a single model' " name in model_info.\nPass model name as the only element in the" " model_info list." ) if model_type == "profile" and len(model_info) != 1: raise ValueError( 'Profile models (model_type="profile") require a single model name' " in model_info." ) if model_type == "energy": if self.select_model(model_info) is not None: raise ValueError( f'Model named "{self.model_list_to_name(model_info)}"' " already exists. Delete the existing model" " or change the name of the new model." ) # Load and process YAML file with appropriate numbering strategy model_yaml_path = self.p.path / pathlib.Path(model_yaml) model_info_dict = uparsing.load_and_number_yaml_components( model_yaml_path=model_yaml_path, model_info=model_info, is_dynamics=model_type == "dynamics", ) # Initialize model fcts_package: types.ModuleType if model_type == "energy": if self.p.show_output >= 1: print( f"Loading model to describe energy- (and time-)dependent data: " f"{self.model_list_to_name(model_info)}" ) fcts_package = fcts_energy loaded_model = mcp.Model(self.model_list_to_name(model_info)) elif model_type == "dynamics": if self.p.show_output >= 1: print( f"Loading model to describe time-dependence of a model parameter: " f"{par_name} of {self.model_list_to_name(model_info)} model" ) fcts_package = fcts_time loaded_model = mcp.Dynamics(par_name) elif model_type == "profile": if self.p.show_output >= 1: print( f"Loading profile model for parameter: " f"{par_name} of {self.model_list_to_name(model_info)} model" ) fcts_package = fcts_profile loaded_model = mcp.Profile(par_name) else: raise ValueError( f'Model type "{model_type}" not recognized. Must be one of:' ' "energy", "dynamics", or "profile".' ) # Inherit necessary model attributes from function input, file, and project loaded_model.yaml_f_name = pathlib.Path(model_yaml).stem # yaml file name loaded_model.dim = 1 # start with 1, +1 when adding dynamics if isinstance(loaded_model, mcp.Dynamics): loaded_model.subcycles = len(model_info) - 1 loaded_model.parent_file = self loaded_model.energy = self.energy loaded_model.time = self.time loaded_model.aux_axis = self.aux_axis all_comps = [] # initialize component list # Go through (sub)model(s) # (for mcp.Dynamics model instances length model_info could be larger than 1) for subcycle, submodel in enumerate(model_info): # Get the section defined by model_info try: submodel_info = model_info_dict[submodel] except KeyError as err: available_models = list(model_info_dict.keys()) raise ValueError( f'Model "{submodel}" not found in {model_yaml}\n' f"Available models in this file: {available_models}\n" f"Check for typos in model name." ) from err # Create components for this submodel using existing mcp.Component logic for c_name, c_info in submodel_info.items(): c_temp = mcp.Component(c_name, fcts_package, subcycle) c_temp.add_pars(c_info) all_comps.append(c_temp) # Add all components (and their parameters) to model loaded_model.add_components(all_comps) # Add model to file if model_type == "energy": self.models.append(loaded_model) self.set_active_model(model_info) # set as current active model return loaded_model
#
[docs] def describe_model( self, model_info: ModelRef | None = None, detail: int = 0 ) -> None: """ Display information about a specific model. Shows model parameters and optionally plots data with initial guess and residual. Useful for inspecting models before fitting. Parameters ---------- model_info : str or int, optional Model identifier (name or index). If None, uses currently active model. detail : {0, 1}, default=0 Level of detail: - 0: Show parameter table only - 1: Show parameters and plot data/initial guess/residual """ if self.p.show_output < 1: return mod = self.model_active if model_info is None else self.select_model(model_info) if mod is None: warnings.warn("Model not found; nothing to describe.", stacklevel=2) return # parameter list mod.describe(detail=0) if detail == 1 and isinstance(mod, mcp.Dynamics): mod.create_value_1d(store_1d=1) # update individual component spectra mod.plot_1d(plot_sum=False) # plot guess only (individual components) if detail == 1 and mod.dim == 1: if self.energy is None or self.data_base is None: warnings.warn( "Energy axis or baseline data missing;" " cannot plot 1D model summary.", stacklevel=2, ) return mod.create_value_1d(store_1d=1) # update individual component spectra # plot initial guess (individual components), data, and residual title_mod = ( f"File: {self.path}, " f'Model: "{model_info}" (from "{mod.yaml_f_name}.yaml")' ": initial guess" ) fitlib.plt_fit_res_1d( x=self.energy, y=self.data_base, fit_fun_str=self.p.spec_fun_str, par_init=[], par_fin=mod.lmfit_pars, args=(mod, 1), plot_sum=False, show_init=False, title=title_mod, fit_lim=self.e_lim, config=self.plot_config, legend=[comp.name for comp in mod.components], ) if detail == 1 and mod.dim == 2: mod.create_value_2d() # update spectrum if self.data is None or mod.value_2d is None: warnings.warn( "2D data/model values missing; cannot plot 2D model summary.", stacklevel=2, ) return # plot data, fit, and residual 2D maps fitlib.plt_fit_res_2d( data=self.data, fit=mod.value_2d, x=self.energy, y=self.time, config=self.plot_config, x_lim=self.e_lim, y_lim=self.t_lim, )
#
[docs] def delete_model(self, model_to_delete: ModelRef | None = None) -> None: """ Remove a model from this file's model list. Parameters ---------- model_to_delete : str, int, list of str, or None, optional Model to delete (name, index, multi-cycle name list, or None for the currently active model). Notes ----- After deletion, model_active may be invalid. Set a new active model if needed using set_active_model(). """ if model_to_delete is None: if self.model_active is None: warnings.warn("No active model to delete.", stacklevel=2) return self.models.remove(self.model_active) return mod = self.select_model(model_to_delete) if mod is None: warnings.warn( f"delete_model: Model {model_to_delete!r} not found.", stacklevel=2, ) return self.models.remove(mod)
#
[docs] def reset_models(self) -> None: """ Remove all models from this file. Clears the models list and resets model_active to None. Use this to start fresh with model loading without creating a new File object. """ self.models = []
# def fingerprint(self) -> dict[str, Any]: """ Multi-sha content fingerprint of this file. Recomputed on every call so corrections that mutate ``self.data`` (subtract_dark, calibrate_data, reset_corrections) propagate into slot identity. Sha256 over typical data is sub-ms; the cost is negligible compared to a fit, and a stale cache silently collapses pre- and post-correction slots into the same ``history_key``. """ if self.data is None or self.energy is None: raise ValueError("Cannot fingerprint a File without data and energy axis.") return fit_io.compute_file_fingerprint( data=self.data, energy=self.energy, time=self.time ) #
[docs] def create_model_path( self, model_name: str, *, fit_type: Literal["baseline", "spectrum", "sbs", "2d"], subfolders: list[str] | None = None, ) -> pathlib.Path: """ Create directory structure for saving model fit results. Layout: ``{Project.path_results}/{File.name}/{fit_type}/{model_name}/``. Creates directories if they don't exist. Parameters ---------- model_name : str Name of model (must exist in self.models) fit_type : {"baseline", "spectrum", "sbs", "2d"} Fit type segment in the output path. subfolders : list of str, default=[] Additional subdirs to create (e.g., ['slices'] for Slice-by-Slice fits) Returns ------- Path Path to model results directory """ path_model = self.p.path_results / self.name / fit_type / model_name path_model.mkdir(parents=True, exist_ok=True) if subfolders is None: subfolders = [] for subfolder in subfolders: (path_model / subfolder).mkdir(parents=True, exist_ok=True) return path_model
# def _apply_corrections(self) -> None: """Rebuild ``data`` from ``data_raw`` by applying dark and calibration.""" assert self.data_raw is not None # type guard assert self.dark is not None # type guard assert self.calibration is not None # type guard self.data = (self.data_raw - self.dark) / self.calibration # recompute baseline if it was previously defined if self.base_t_abs: self.define_baseline( self.base_t_abs[0], self.base_t_abs[1], show_plot=False ) #
[docs] def subtract_dark(self, dark: np.ndarray) -> None: """ Subtract a dark/background spectrum from the data. The dark spectrum is subtracted from every row (time slice) of the data before dividing by the sensitivity calibration. Replaces any previously set dark spectrum. Parameters ---------- dark : ndarray, shape (n_energy,) Dark/background spectrum to subtract. Must have the same length as the energy axis. Notes ----- Apply dark subtraction before fitting. The correction is stored and can be replaced by calling this method again, or removed with ``reset_dark()``. The original data is always preserved in ``data_raw``. """ if self.data_raw is None or self.energy is None: raise ValueError("No data loaded; cannot subtract dark.") n_energy = self.energy.shape[0] if dark.ndim != 1 or dark.shape[0] != n_energy: raise ValueError( f"dark must be 1D with length {n_energy}, got shape {dark.shape}." ) self.dark = dark self._apply_corrections()
#
[docs] def calibrate_data(self, calibration: np.ndarray) -> None: """ Apply sensitivity calibration by dividing the data. The (dark-subtracted) data is divided by the calibration spectrum. Replaces any previously set calibration. Parameters ---------- calibration : ndarray, shape (n_energy,) Sensitivity/response spectrum to divide by. Must have the same length as the energy axis and must not contain zeros. Notes ----- Apply calibration before fitting. The correction is stored and can be replaced by calling this method again, or removed with ``reset_calibration()``. The original data is always preserved in ``data_raw``. """ if self.data_raw is None or self.energy is None: raise ValueError("No data loaded; cannot calibrate.") n_energy = self.energy.shape[0] if calibration.ndim != 1 or calibration.shape[0] != n_energy: raise ValueError( f"calibration must be 1D with length {n_energy}, " f"got shape {calibration.shape}." ) if np.any(calibration == 0): raise ValueError( "calibration contains zeros; division by zero is not allowed." ) self.calibration = calibration self._apply_corrections()
#
[docs] def reset_dark(self) -> None: """Reset dark subtraction to zeros (no subtraction).""" if self.data_raw is None or self.energy is None: raise ValueError("No data loaded; cannot reset dark.") self.dark = np.zeros(self.energy.shape[0]) self._apply_corrections()
#
[docs] def reset_calibration(self) -> None: """Reset sensitivity calibration to ones (no calibration).""" if self.data_raw is None or self.energy is None: raise ValueError("No data loaded; cannot reset calibration.") self.calibration = np.ones(self.energy.shape[0]) self._apply_corrections()
#
[docs] def define_baseline( self, time_start: float, time_stop: float, *, time_type: str = "abs", show_plot: bool = True, ) -> None: """ Define ground state/pre-trigger/baseline reference spectrum. 2D data will be cut and averaged between the specified time points to produce the baseline spectrum. Parameters ---------- time_start : float or int Start point in time, inclusive (absolute value or index, see time_type) time_stop : float or int Stop point in time, inclusive (absolute value or index, see time_type) time_type : {'abs', 'ind'}, default='abs' Type of time specification: - 'abs': Absolute time stamps - 'ind': Time array indices show_plot : bool, default=True If True, plot the resulting baseline spectrum """ if self.dim == 1: raise ValueError("Cannot define baseline for 1D data.") if self.data is None: raise ValueError("No data loaded; cannot define baseline.") if self.time is None: self.time = np.arange(self.data.shape[0]) warnings.warn( "Time axis missing; using index axis for baseline definition.", stacklevel=2, ) if self.energy is None: self.energy = np.arange(self.data.shape[1]) warnings.warn("Energy axis missing; using index axis.", stacklevel=2) self.base_t_ind = self._resolve_time_selection( time_start, time_stop, time_type=time_type ) self.base_t_abs = [ self.time[self.base_t_ind[0]], self.time[self.base_t_ind[1] - 1], ] # cut and average self.data_base = np.mean( self.data[self.base_t_ind[0] : self.base_t_ind[1], :], axis=0 ) # plot if show_plot: if self.data_base is None: warnings.warn( "Baseline data is unavailable; skipping baseline plot.", stacklevel=2, ) return uplt.plot_1d( data=[ self.data_base, ], x=self.energy, config=self.plot_config, title=f"Baseline data: t in {self.base_t_abs} (idx: {self.base_t_ind})", )
#
[docs] def set_fit_limits( self, energy_limits: Sequence[float] | None, *, time_limits: Sequence[float] | None = None, show_plot: bool = True, ) -> None: """ Set energy (and time) limits for fits. Pass absolute values (NOT indices). Parameters ---------- energy_limits : list of float or None Energy range for fitting ``[min, max]`` in absolute values. If None, uses full energy range ``[np.min(energy), np.max(energy)]``. time_limits : list of float, optional Time range for fitting ``[min, max]`` in absolute values. If None, no time limits are applied. show_plot : bool, default=True If True, plot data with fit limits indicated """ if self.data is None and self.energy is None: raise ValueError("No data/energy axis loaded; cannot set fit limits.") if self.energy is None and self.data is not None: self.energy = np.arange(self.data.shape[-1]) warnings.warn("Energy axis missing; using index axis.", stacklevel=2) if self.energy is None: raise ValueError("Energy axis unavailable; cannot set fit limits.") energy = self.energy if energy_limits is None: energy_limits = [float(np.min(energy)), float(np.max(energy))] self.e_lim_abs = [np.min(energy_limits), np.max(energy_limits)] # convert energy limits to [start, stop) slice indices # searchsorted with side='left' for lower bound, side='right' for upper bound if energy[0] > energy[-1]: # descending energy rev = energy[::-1] start = len(energy) - int( np.searchsorted(rev, np.max(energy_limits), side="right") ) stop = len(energy) - int( np.searchsorted(rev, np.min(energy_limits), side="left") ) self.e_lim = [start, stop] else: # ascending energy start = int(np.searchsorted(energy, np.min(energy_limits), side="left")) stop = int(np.searchsorted(energy, np.max(energy_limits), side="right")) self.e_lim = [start, stop] if time_limits is None and self.time is not None: time_limits = [float(np.min(self.time)), float(np.max(self.time))] if time_limits is not None: if self.time is None: if self.data is None or self.dim != 2: raise ValueError("Time axis missing; cannot apply time limits.") self.time = np.arange(self.data.shape[0]) warnings.warn( "Time axis missing; using index axis for time limits.", stacklevel=2, ) self.t_lim_abs = list(time_limits) self.t_lim = self._resolve_time_selection( float(np.min(time_limits)), float(np.max(time_limits)) ) if show_plot: # show data with limits if self.dim == 1: if self.data is None: warnings.warn("Data missing; cannot plot fit limits.", stacklevel=2) return x_cut = energy[self.e_lim[0] : self.e_lim[1]] y_cut = self.data[self.e_lim[0] : self.e_lim[1]] uplt.plot_1d( data=[self.data, y_cut], x=[energy, x_cut], config=self.plot_config, waterfall=(np.max(np.abs(y_cut)) - np.min(np.abs(y_cut))) / 8, legend=["all", "cut"], vlines=self.e_lim_abs, ) elif self.dim == 2: if self.data is None: warnings.warn( "Data missing; cannot plot 2D fit limits.", stacklevel=2, ) return uplt.plot_2d( data=self.data, x=energy, y=self.time, config=self.plot_config, vlines=self.e_lim_abs, hlines=self.t_lim_abs, )
# def set_sigma( self, sigma: float | None, *, noise_type: str | None = None, sigma_source: str = fit_io.SIGMA_SOURCE_USER, sigma_type: str = fit_io.SIGMA_TYPE_CONSTANT, ) -> float | None: """ Set the per-pixel noise σ for this file. Subsequent fits on this file will materialize the σ into their saved slots (chi2 / chi2_red calibrated from chi2_raw / chi2_red_raw). The change is stateful but does **not** retroactively rewrite existing slots. Parameters ---------- sigma : float or None Per-pixel σ in data units. Pass ``None`` to clear (slots fit afterwards will record ``noise_type='unknown'`` and ``NaN`` σ fields, so calibrated metrics resolve to ``NaN``). Must be a finite positive number when not ``None``. noise_type : str, optional ``"gaussian"`` or ``"unknown"``. Defaults to ``"gaussian"`` when ``sigma`` is set, ``"unknown"`` when ``sigma`` is ``None``. sigma_source : str, default ``"user_supplied"`` v1 only supports ``"user_supplied"``. sigma_type : str, default ``"constant"`` v1 only supports ``"constant"``. Returns ------- float or None The previous ``sigma_data`` value (``None`` if unset). Stash this if you intend to run additional fits under a different σ and restore the file's prior σ state afterwards. Notes ----- ``set_sigma`` only affects **future** fits on this file. Slots already in ``Project._fit_history`` keep the σ snapshot that was materialized at their fit completion; ``compare_models()`` reads those snapshots and is therefore unaffected by σ changes made after the fit. For an alternative calibration of *existing* results, divide the always-present ``chi2_red_raw`` column by ``alt_sigma**2`` directly on the returned DataFrame — no API needed. Raises ------ ValueError If ``sigma`` is not ``None`` and not a finite positive number, or if any of the discriminator fields is outside v1's supported subset. """ new_sigma_data = fit_io.normalize_sigma_data(sigma) is_unset = not np.isfinite(new_sigma_data) new_noise_type = noise_type if new_noise_type is None: new_noise_type = ( fit_io.NOISE_TYPE_UNKNOWN if is_unset else fit_io.NOISE_TYPE_GAUSSIAN ) fit_io.validate_noise_metadata( noise_type=new_noise_type, sigma_source=sigma_source, sigma_type=sigma_type, ) previous = None if not np.isfinite(self.sigma_data) else float(self.sigma_data) self.sigma_data = new_sigma_data self.noise_type = new_noise_type self.sigma_source = sigma_source self.sigma_type = sigma_type return previous #
[docs] def fit_baseline( self, model_name: str, stages: int = 1, **lmfit_wrapper_kwargs ) -> None: """ Fit the baseline/ground state/pre-trigger reference spectrum. Parameters ---------- model_name : str Name of a previously loaded model (use File.load_model first) stages : {1, 2}, default=1 Number of optimization stages: - 1: Single optimization with ``fit_alg_1`` - 2: Two-stage fit (``fit_alg_1`` then ``fit_alg_2``) **lmfit_wrapper_kwargs Additional keyword arguments passed to fitlib.fit_wrapper """ t_base = time.time() # start timing for baseline fit self.model_base = self._resolve_model(model_name) if self.model_base.dim == 2: raise ValueError( f'Model "{model_name}" has time dependence (dim=2) and cannot ' "be used for a 1D baseline fit. Use a model without dynamics." ) if self.energy is None or self.data_base is None: raise ValueError( "Baseline data/energy axis missing; cannot fit baseline.\n" "Run define_baseline() first to extract the baseline region." ) # get initial guess initial_guess = ulmfit.par_extract( self.model_base.lmfit_pars, return_type="list" ) # define (and create) path where basline fit results will be saved to path_base_results = self.create_model_path(model_name, fit_type="baseline") # const = (x, data, package, fnctn string, unpack, energy limits, time limits) _fun_str = self.p.spec_fun_str self.model_base.const = ( self.energy, self.data_base, _fun_str, 0, self.e_lim, [], ) # --- dispatch: GIR fast path vs interpreter --- _args = self._build_1d_dispatch_args(self.model_base, _fun_str) self.model_base.args = _args # CSV format/delimiter: project defaults unless caller overrides lmfit_wrapper_kwargs.setdefault("num_fmt", self.p.num_fmt) lmfit_wrapper_kwargs.setdefault("delim", self.p.delim) # fit (optionally) with confidence intervals self.model_base.result = fitlib.fit_wrapper( const=self.model_base.const, args=self.model_base.args, par_names=self.model_base.parameter_names, par=self.model_base.lmfit_pars, stages=stages, show_output=1 if self.p.show_output >= 1 else 0, save_output=1 if self.p.auto_export else 0, save_path=path_base_results / model_name, **lmfit_wrapper_kwargs, ) # Write optimized values back to model.lmfit_pars. fit_wrapper # optimizes a deepcopy, so model.lmfit_pars may be stale when # the GIR path was used (it never calls model.update_value). if stages >= 1 and self.model_base.result[1] != []: self.model_base.update_value( new_par_values=ulmfit.par_extract( self.model_base.result[1], return_type="list" ) ) self._append_baseline_slot(model_name=model_name, fit_fun_str=_fun_str) if self.p.auto_export: self.save_baseline_fit(save_path=path_base_results) # display/plot and save baseline fit summary title_base = ( f"File: {self.path}, " f'Model: "{model_name}" (from "{self.model_base.yaml_f_name}.yaml")' ) save_plot = self.p.auto_export show_plot = self.p.show_output >= 1 if save_plot or show_plot: fitlib.plt_fit_res_1d( x=self.energy, y=self.data_base, fit_fun_str=self.p.spec_fun_str, par_init=initial_guess, par_fin=self.model_base.result[1], args=self.model_base.args, plot_sum=False, show_init=True, title=title_base, fit_lim=self.e_lim, config=self.plot_config, legend=[comp.name for comp in self.model_base.components], save_img=uplt._save_img_flag(save=save_plot, show=show_plot), save_path=path_base_results / "base_fit.png", ) if stages >= 1 and self.p.show_output >= 1: fitlib.time_display( t_start=t_base, print_str="Time elapsed for baseline fit: " ) display(self.model_base.result[1].params) # display final pars below figure
# def _save_1d_fit(self, model: mcp.Model | None, save_path: PathLike) -> None: """ Internal helper: write ``fit_1d.csv`` for a 1D fitted model. Columns: ``energy``, ``sum``, then one column per component (named after ``component.name``). """ if model is None or self.energy is None: raise ValueError("Model/energy missing; nothing to save.") if not model.result or not getattr(model.result[1], "params", None): return # mocked / placeholder; nothing to dump model.create_value_1d(store_1d=1) if model.value_1d is None: raise ValueError( "Model evaluation did not produce value_1d; nothing to save." ) columns: dict[str, np.ndarray] = { "energy": np.asarray(self.energy), "sum": np.asarray(model.value_1d), } for comp, arr in zip(model.components, model.component_spectra, strict=True): columns[comp.name] = np.asarray(arr) pd.DataFrame(columns).to_csv( pathlib.Path(save_path) / "fit_1d.csv", index=False, float_format=self.p.num_fmt, sep=self.p.delim, ) # def save_baseline_fit(self, save_path: PathLike) -> None: """ Export baseline fit as ``fit_1d.csv``. Evaluates the baseline model at final parameters and writes a CSV with columns ``energy``, ``sum``, and one column per component. Parameters ---------- save_path : str or Path Directory path for saving (file: ``save_path/fit_1d.csv``). """ self._save_1d_fit(self.model_base, save_path) #
[docs] def fit_spectrum( self, model_name: str, *, time_point: float | None = None, time_range: tuple[float, float] | None = None, time_type: Literal["abs", "ind"] = "abs", stages: int = 1, show_plot: bool = True, **lmfit_wrapper_kwargs, ) -> None: """ Fit a 1D model to an individual spectrum at a selected time point or range. Extracts a single spectrum from 2D data at the given time point or by averaging over a time range, then fits it with the specified model. Parameters ---------- model_name : str Name of a previously loaded model (use File.load_model first). time_point : float or int, optional Single time value to extract the spectrum at. Mutually exclusive with *time_range*. time_range : tuple of (float, float), optional ``(start, stop)`` time bounds (inclusive). Slices in this window are averaged to produce the spectrum. Mutually exclusive with *time_point*. time_type : {'abs', 'ind'}, default='abs' How *time_point* / *time_range* values are interpreted: - ``'abs'``: absolute time-axis values - ``'ind'``: integer indices into the time array stages : {1, 2}, default=1 Number of optimization stages: - 1: single optimization with ``fit_alg_1`` - 2: two-stage fit (``fit_alg_1`` then ``fit_alg_2``) show_plot : bool, default=True If True, display the fit result plot. **lmfit_wrapper_kwargs Additional keyword arguments passed to ``fitlib.fit_wrapper``. Raises ------ ValueError If the data is not 2D, or neither *time_point* nor *time_range* is provided, or *time_type* is invalid. """ t_spec = time.time() # start timing for spectrum fit if self.dim != 2: raise ValueError( "fit_spectrum() requires 2D data. " "For 1D data use fit_baseline() instead." ) if self.data is None or self.energy is None or self.time is None: raise ValueError( "Data, energy axis, or time axis is missing; " "cannot fit individual spectrum." ) if time_point is None and time_range is None: raise ValueError( "Provide either time_point or time_range to select a spectrum." ) if time_point is not None and time_range is not None: raise ValueError("time_point and time_range are mutually exclusive.") self.model_spec = self._resolve_model(model_name) if self.model_spec.dim == 2: raise ValueError( f'Model "{model_name}" has time dependence (dim=2) and cannot ' "be used for a 1D spectrum fit. Use a model without dynamics, " "or use fit_slice_by_slice() / fit_2d() instead." ) # extract 1D spectrum at selected time point / range if time_point is not None: self.spec_t_ind = self._resolve_time_selection( time_point, time_point, time_type=time_type ) self.spec_t_abs = [ float(self.time[self.spec_t_ind[0]]), float(self.time[self.spec_t_ind[0]]), ] self.data_spec = self.data[self.spec_t_ind[0], :] else: assert time_range is not None # type guard self.spec_t_ind = self._resolve_time_selection( time_range[0], time_range[1], time_type=time_type ) self.spec_t_abs = [ float(self.time[self.spec_t_ind[0]]), float(self.time[self.spec_t_ind[1] - 1]), ] self.data_spec = np.mean( self.data[self.spec_t_ind[0] : self.spec_t_ind[1], :], axis=0 ) assert self.data_spec is not None # type guard # get initial guess initial_guess = ulmfit.par_extract( self.model_spec.lmfit_pars, return_type="list" ) # define (and create) path where spectrum fit results will be saved to path_spec_results = self.create_model_path(model_name, fit_type="spectrum") # const = (x, data, fnctn string, unpack, energy limits, time limits) _fun_str = self.p.spec_fun_str self.model_spec.const = ( self.energy, self.data_spec, _fun_str, 0, self.e_lim, [], ) # --- dispatch: GIR fast path vs interpreter --- _args = self._build_1d_dispatch_args(self.model_spec, _fun_str) self.model_spec.args = _args # CSV format/delimiter: project defaults unless caller overrides lmfit_wrapper_kwargs.setdefault("num_fmt", self.p.num_fmt) lmfit_wrapper_kwargs.setdefault("delim", self.p.delim) # fit self.model_spec.result = fitlib.fit_wrapper( const=self.model_spec.const, args=self.model_spec.args, par_names=self.model_spec.parameter_names, par=self.model_spec.lmfit_pars, stages=stages, show_output=1 if self.p.show_output >= 1 else 0, save_output=1 if self.p.auto_export else 0, save_path=path_spec_results / model_name, **lmfit_wrapper_kwargs, ) # Write optimized values back to model.lmfit_pars (see fit_baseline). if stages >= 1 and self.model_spec.result[1] != []: self.model_spec.update_value( new_par_values=ulmfit.par_extract( self.model_spec.result[1], return_type="list" ) ) self._append_spectrum_slot( model_name=model_name, fit_fun_str=_fun_str, time_point=time_point, time_range=list(time_range) if time_range is not None else None, time_type=time_type, ) if self.p.auto_export: self.save_spectrum_fit(save_path=path_spec_results) # display/plot and save spectrum fit summary time_label = ( f"t = {self.spec_t_abs[0]}" if self.spec_t_abs[0] == self.spec_t_abs[1] else f"t in [{self.spec_t_abs[0]}, {self.spec_t_abs[1]}]" ) title_spec = ( f"File: {self.path}, {time_label}, " f'Model: "{model_name}" ' f'(from "{self.model_spec.yaml_f_name}.yaml")' ) save_fig = self.p.auto_export show_fig = show_plot and self.p.show_output >= 1 if save_fig or show_fig: fitlib.plt_fit_res_1d( x=self.energy, y=self.data_spec, fit_fun_str=self.p.spec_fun_str, par_init=initial_guess, par_fin=self.model_spec.result[1], args=self.model_spec.args, plot_sum=False, show_init=True, title=title_spec, fit_lim=self.e_lim, config=self.plot_config, legend=[comp.name for comp in self.model_spec.components], save_img=uplt._save_img_flag(save=save_fig, show=show_fig), save_path=path_spec_results / "spec_fit.png", ) if stages >= 1 and self.p.show_output >= 1: fitlib.time_display( t_start=t_spec, print_str="Time elapsed for spectrum fit: " ) display(self.model_spec.result[1].params)
# def save_spectrum_fit(self, save_path: PathLike) -> None: """ Export single-spectrum fit as ``fit_1d.csv``. Evaluates the spectrum model at final parameters and writes a CSV with columns ``energy``, ``sum``, and one column per component. Parameters ---------- save_path : str or Path Directory path for saving (file: ``save_path/fit_1d.csv``). """ self._save_1d_fit(self.model_spec, save_path) # def save_fit( self, filepath: PathLike | str | None = None, *, model: str | Sequence[str] | None = None, fit_type: fit_io.FitType | Sequence[fit_io.FitType] | None = None, overwrite: bool = False, show_output: int = 1, ) -> None: """ Save this file's fit slots to an HDF5 archive. One-line delegate to ``self.p.save_fits(file=self, ...)``. Useful when the user holds a ``File`` reference and wants to persist its fits without first reaching into the parent ``Project``. See :meth:`Project.save_fits` for full semantics. """ self.p.save_fits( filepath, file=self, model=model, fit_type=fit_type, overwrite=overwrite, show_output=show_output, ) # def export_fit( self, filepath: PathLike | str | None = None, *, format: Literal["csv"] = "csv", model: str | Sequence[str] | None = None, fit_type: fit_io.FitType | Sequence[fit_io.FitType] | None = None, overwrite: bool = False, show_output: int = 1, ) -> None: """ Export this file's fit slots as a CSV/PNG tree. One-line delegate to ``self.p.export_fits(file=self, ...)``. See :meth:`Project.export_fits` for full semantics and output layout. """ self.p.export_fits( filepath, format=format, file=self, model=model, fit_type=fit_type, overwrite=overwrite, show_output=show_output, ) # # ------------------------------------------------------------------ # Deprecated aliases — scheduled for removal before v1.0.0. # See TODO.md "Build & release → Remove legacy/backwards-compat code". # ------------------------------------------------------------------ #
[docs] def save_sbs_fit(self, save_path: PathLike) -> None: """ .. deprecated:: Use :meth:`File.export_fit` (``fit_type="sbs"``) instead. ``save_sbs_fit`` is scheduled for removal before v1.0.0. Behavior is preserved: this wrapper still writes the legacy on-disk layout. Only the warning is new. """ warnings.warn( "File.save_sbs_fit is deprecated and will be removed before " "v1.0.0; use File.export_fit(fit_type='sbs', filepath=...) " "for the supported export pipeline.", DeprecationWarning, stacklevel=2, ) self._save_sbs_fit_legacy(save_path)
#
[docs] def save_2d_fit(self, save_path: PathLike) -> None: """ .. deprecated:: Use :meth:`File.export_fit` (``fit_type="2d"``) instead. ``save_2d_fit`` is scheduled for removal before v1.0.0. Behavior is preserved: this wrapper still writes the legacy on-disk layout. Only the warning is new. """ warnings.warn( "File.save_2d_fit is deprecated and will be removed before " "v1.0.0; use File.export_fit(fit_type='2d', filepath=...) " "for the supported export pipeline.", DeprecationWarning, stacklevel=2, ) self._save_2d_fit_legacy(save_path)
#
[docs] def fit_slice_by_slice( self, model_name: str, stages: int = 1, n_workers: int | None = None, *, seed_source: Literal["model", "baseline", "explicit"] = "baseline", seed_values: Any | None = None, seed_adapt: Literal["argmax_shift"] | None = "argmax_shift", **fit_wrapper_kwargs, ) -> None: """ Fit time- and energy-resolved spectrum Slice-by-Slice (SbS). Treats every time step as independent from other times. Each slice starts from a shared seed template selected via ``seed_source``, optionally adapted per slice via ``seed_adapt``. There is no cross-slice warm start, which keeps the workflow embarrassingly parallel. Parameters ---------- model_name : str Name of a previously loaded model (use File.load_model first) stages : {1, 2}, default=1 Number of optimization stages: - 1: Single optimization with ``fit_alg_1`` - 2: Two-stage fit (``fit_alg_1`` then ``fit_alg_2``) n_workers : int or None, default=None Number of parallel worker processes. Slices are independent so SbS is embarrassingly parallel. - ``None``: auto, ``os.cpu_count() - 1`` (leaves one core free) - ``1``: serial path (debug escape hatch — no pool overhead) - ``N > 1``: ``ProcessPoolExecutor`` with the ``spawn`` start method, the only portable option (Windows lacks ``fork``). Workers reuse one pickled model installed at startup, so per-slice pickle overhead is bounded. Notes for very small fits: spawn worker startup costs ~200-500ms on Linux/macOS and ~1-2s on Windows per worker, so for fits of fewer than ~20 slices, ``n_workers=1`` is usually faster. seed_source : {'model', 'baseline', 'explicit'}, default='baseline' Shared parameter template used to seed every slice before any per-slice adaptation is applied. - ``'model'``: use the current ``model_sbs.lmfit_pars`` values - ``'baseline'``: use the stored ``fit_baseline()`` result values - ``'explicit'``: use ``seed_values`` after normalizing it to the model parameter order seed_values : optional Explicit seed template used when ``seed_source='explicit'``. Accepts the same broad value shapes as ``ulmfit.par_extract()`` plus dicts keyed by parameter name. seed_adapt : {None, 'argmax_shift'}, default='argmax_shift' Optional per-slice tweak applied after resolving the shared seed template and before fitting a slice. - ``None``: use the same initial guess for every slice - ``'argmax_shift'``: shift all parameters ending in ``'_x0'`` by the difference between the current slice's argmax energy and the baseline spectrum's argmax energy **fit_wrapper_kwargs Additional keyword arguments passed to fitlib.fit_wrapper Notes ----- The actual per-slice fit results live in ``self.results_sbs``. ``self.model_sbs`` is retained as shared model/reconstruction context and is restored to the unadapted seed template after fitting. """ t_sbs = time.time() # start timing for SbS fit self.model_sbs = self._resolve_model(model_name) if self.model_sbs.dim == 2: raise ValueError( f'Model "{model_name}" has time dependence (dim=2) and cannot ' "be used for Slice-by-Slice fitting. " "Use a model without dynamics, or use fit_2d() instead." ) if self.data is None or self.time is None or self.energy is None: raise ValueError("Data/axes missing; cannot run Slice-by-Slice fit.") if seed_source not in ("model", "baseline", "explicit"): raise ValueError("seed_source must be 'model', 'baseline', or 'explicit'.") if seed_adapt not in (None, "argmax_shift"): raise ValueError("seed_adapt must be None or 'argmax_shift'.") if seed_source == "baseline" and ( self.model_base is None or not self.model_base.result ): raise ValueError( "Baseline seed requested but baseline model is not fitted yet; " "run fit_baseline() first or use seed_source='model'/'explicit'." ) if seed_source != "explicit" and seed_values is not None: raise ValueError("seed_values is only used when seed_source='explicit'.") if seed_source == "explicit" and seed_values is None: raise ValueError("seed_source='explicit' requires seed_values.") if seed_adapt == "argmax_shift" and self.data_base is None: raise ValueError( "seed_adapt='argmax_shift' requires baseline data; " "run define_baseline() first or use seed_adapt=None." ) # define (and create) path where SbS fit results will be saved to path_sbs_results = self.create_model_path( model_name, fit_type="sbs", subfolders=[ "slices", ], ) if seed_source == "model": seed_template = ulmfit.par_extract( self.model_sbs.lmfit_pars, return_type="list" ) elif seed_source == "baseline": assert self.model_base is not None # type guard seed_template = ulmfit.par_extract( self.model_base.result[1], return_type="list" ) else: seed_template = usbs.extract_sbs_seed_template( seed_values, self.model_sbs.parameter_names, ) self.model_sbs.update_value(new_par_values=seed_template, par_select="all") # find all parameters with names ending in "x0" # so they can be updated for every slice e_pos_pars = [ name for name in self.model_sbs.parameter_names if name.endswith("_x0") ] e_pos_vals: pd.Series | None data_base_argmax_energy: float | None if seed_adapt == "argmax_shift": e_pos_vals = pd.Series( data=[self.model_sbs.lmfit_pars[name].value for name in e_pos_pars], index=e_pos_pars, dtype=float, ) assert self.data_base is not None # type guard data_base_argmax_energy = float(self.energy[np.argmax(self.data_base)]) else: e_pos_vals = None data_base_argmax_energy = None # --- dispatch: GIR fast path vs interpreter --- _fun_str = self.p.spec_fun_str _args_sbs = self._build_1d_dispatch_args(self.model_sbs, _fun_str) n_slices = len(self.data) def _slice_path(s_i: int) -> pathlib.Path: return path_sbs_results / "slices" / str(self.p.da_slices_fmt % s_i) # resolve worker count: None -> auto, otherwise honour user. if n_workers is None: n_workers = max(1, (os.cpu_count() or 1) - 1) # No point spawning more workers than slices. n_workers = max(1, min(n_workers, n_slices)) # CSV format/delimiter: project defaults unless caller overrides # (forwarded into both the serial and parallel SbS dispatch paths) fit_wrapper_kwargs.setdefault("num_fmt", self.p.num_fmt) fit_wrapper_kwargs.setdefault("delim", self.p.delim) if n_workers == 1: # serial path (debug escape hatch). self.results_sbs = [] for s_i, s in enumerate(self.data): print(f"Analyzing slice number {s_i + 1}/{len(self.time)}", end="\r") path_slice = _slice_path(s_i) initial_guess = usbs.prepare_sbs_model_for_slice( self.model_sbs, _args_sbs, seed_template, seed_adapt=seed_adapt, s=s, energy=self.energy, e_lim=self.e_lim, e_pos_pars=e_pos_pars, e_pos_vals=e_pos_vals, data_base_argmax_energy=data_base_argmax_energy, fit_fun_str=_fun_str, ) const = self.model_sbs.const args = self.model_sbs.args assert const is not None assert args is not None result_sbs = fitlib.fit_wrapper( const=const, args=args, par_names=self.model_sbs.parameter_names, par=self.model_sbs.lmfit_pars, stages=stages, show_output=0, save_output=1 if self.p.auto_export else 0, save_path=path_slice, **fit_wrapper_kwargs, ) self.results_sbs.append(result_sbs) if self.p.auto_export: fitlib.plt_fit_res_1d( x=const[0], y=const[1], fit_fun_str=self.p.spec_fun_str, par_init=initial_guess, par_fin=result_sbs[1], args=args, plot_sum=False, show_init=True, fit_lim=self.e_lim, config=self.plot_config, save_img=-1, save_path=path_slice.with_suffix(".png"), ) else: # parallel path: spawn pool, install model once per worker. ctx = multiprocessing.get_context("spawn") by_id: dict[int, list[Any]] = {} with concurrent.futures.ProcessPoolExecutor( max_workers=n_workers, mp_context=ctx, initializer=usbs.sbs_worker_init, initargs=(self.model_sbs, _args_sbs, seed_template), ) as executor: futures = { executor.submit( usbs.sbs_fit_one_slice, s_i, self.data[s_i], energy=self.energy, e_lim=self.e_lim, seed_adapt=seed_adapt, e_pos_pars=e_pos_pars, e_pos_vals=e_pos_vals, data_base_argmax_energy=data_base_argmax_energy, fit_fun_str=_fun_str, stages=stages, path_slice=_slice_path(s_i), plot_config=self.plot_config, fit_wrapper_kwargs=fit_wrapper_kwargs, auto_export=self.p.auto_export, ): s_i for s_i in range(n_slices) } try: for fut in tqdm( concurrent.futures.as_completed(futures), total=len(futures), desc=f"SbS fit ({n_workers} workers)", ): slice_idx, result_sbs = fut.result() by_id[slice_idx] = result_sbs except BaseException: # fail-fast: cancel remaining futures and re-raise for f in futures: f.cancel() raise self.results_sbs = [by_id[i] for i in sorted(by_id)] # mirror the serial path's final model state so downstream # consumers (save_sbs_fit, plot helpers) see identical # const/args regardless of which path produced the results. self.model_sbs.const = ( self.energy, self.data[n_slices - 1], _fun_str, 0, self.e_lim, [], ) self.model_sbs.args = _args_sbs if stages >= 1: # Extract slot BEFORE save_sbs_fit / seed restoration so the slot # captures pristine per-slice fit state (results_sbs and parameter # names taken before any post-fit cleanup). self._append_sbs_slot(model_name=model_name, fit_fun_str=_fun_str) if self.p.auto_export: self._save_sbs_fit_legacy(save_path=path_sbs_results) self.model_sbs.update_value(new_par_values=seed_template, par_select="all") self.model_sbs.args = _args_sbs if stages >= 1: fitlib.time_display( t_start=t_sbs, print_str="Time elapsed for Slice-by-Slice fit: " )
# def _save_sbs_fit_legacy(self, save_path: PathLike) -> None: """ Legacy SbS export — preserves the original on-disk layout used by the auto-export path inside :meth:`fit_slice_by_slice`. Internal use only. The public ``save_sbs_fit`` is a deprecated wrapper that forwards to :meth:`export_fit` (different layout); prefer :meth:`export_fit` for new code. """ if self.model_sbs is None or self.time is None: raise ValueError( "Slice-by-Slice model/results are incomplete; nothing to save." ) if self.data is None: raise ValueError("Data missing; cannot save Slice-by-Slice fit.") if self.energy is None: raise ValueError("Energy axis missing; cannot save Slice-by-Slice fit.") if self.model_sbs.const is None or self.model_sbs.args is None: raise ValueError( "Slice-by-Slice model const/args missing; cannot reconstruct 2D fit." ) # axis sidecars (one value per row); paired with fit_2d.csv np.savetxt( pathlib.Path(save_path) / "energy.csv", np.asarray(self.energy), fmt=self.p.num_fmt, delimiter=self.p.delim, ) np.savetxt( pathlib.Path(save_path) / "time.csv", np.asarray(self.time), fmt=self.p.num_fmt, delimiter=self.p.delim, ) # convert results, specifically par_fin to dataframe and save # this also plots all parameters as a function of time df_sbs = fitlib.results_to_df( results=self.results_sbs, x=self.time, index=np.arange(0, len(self.time)), config=self.plot_config, save_df=-1 if self.p.show_output == 0 else 1, save_path=save_path, num_fmt=self.p.num_fmt, delim=self.p.delim, ) # get slice-by-slice fit spectra as a 2D map df_sbs_pars = df_sbs.loc[:, self.model_sbs.parameter_names] fit_2d_sbs = fitlib.results_to_fit_2d( results=df_sbs_pars, const=self.model_sbs.const, args=self.model_sbs.args, num_fmt=self.p.num_fmt, delim=self.p.delim, save_2d=-1 if self.p.show_output == 0 else 1, save_path=save_path, ) # plot data, fit, and residual 2D maps fitlib.plt_fit_res_2d( data=self.data, fit=fit_2d_sbs, x=self.energy, y=self.time, config=self.plot_config, x_lim=self.e_lim, y_lim=self.t_lim, save_img=-1 if self.p.show_output == 0 else 1, save_path=save_path, ) # # ------------------------------------------------------------------ # Slot capture (eager extraction into Project._fit_history) # ------------------------------------------------------------------ # def _append_baseline_slot(self, *, model_name: str, fit_fun_str: str) -> None: """ Build a SavedFitSlot from the just-completed baseline fit and append it to ``self.p._fit_history``. Uses copied snapshot args so the slot is invariant to subsequent state changes on ``self.model_base``. """ assert self.model_base is not None # type guard assert self.data_base is not None # type guard assert self.energy is not None # type guard if self.data is None: return # data_base-only fixture / no source File data to fingerprint result_fin = self.model_base.result[1] if not hasattr(result_fin, "params"): return # mocked / placeholder result; nothing to record # Evaluate model on the same grid as data_base, then crop to e_lim # so observed.shape == fit.shape and matches the residual grid. fit_full = np.asarray( fitlib.residual_fun( par=result_fin.params, x=self.energy, data=self.data_base, fit_fun_str=fit_fun_str, args=self.model_base.args, res_type="fit", ) ) e_lim = list(self.e_lim) if self.e_lim else None if e_lim: observed = self.data_base[e_lim[0] : e_lim[1]].copy() fit_arr = fit_full[e_lim[0] : e_lim[1]].copy() else: observed = self.data_base.copy() fit_arr = fit_full.copy() params_df = ulmfit.par_to_df( result_fin.params, col_type="min", par_names=self.model_base.parameter_names, ) conf_ci = self.model_base.result[2] mcmc = fit_io._mcmc_payload( self.model_base.result[3], self.model_base.result[4], ) slot = fit_io._slot_from_baseline( file_fingerprint=self.fingerprint(), file_name=self.name, model_name=model_name, fit_alg=str(getattr(result_fin, "method", "unknown")), yaml_filename=self.model_base.yaml_f_name, params_df=params_df, observed=observed, fit=fit_arr, base_t_ind=list(self.base_t_ind), e_lim=e_lim, n_free_pars=int(getattr(result_fin, "nvarys", 0)), noise_type=self.noise_type, sigma_source=self.sigma_source, sigma_type=self.sigma_type, sigma_data=self.sigma_data, conf_ci=conf_ci if not conf_ci.empty else None, mcmc=mcmc, ) self.p._fit_history.append(slot) # def _append_spectrum_slot( self, *, model_name: str, fit_fun_str: str, time_point: float | None, time_range: list[float] | None, time_type: str, ) -> None: """Build and append a SavedFitSlot for a completed spectrum fit.""" assert self.model_spec is not None # type guard assert self.data_spec is not None # type guard assert self.energy is not None # type guard result_fin = self.model_spec.result[1] if not hasattr(result_fin, "params"): return # mocked / placeholder result; nothing to record fit_full = np.asarray( fitlib.residual_fun( par=result_fin.params, x=self.energy, data=self.data_spec, fit_fun_str=fit_fun_str, args=self.model_spec.args, res_type="fit", ) ) e_lim = list(self.e_lim) if self.e_lim else None if e_lim: observed = self.data_spec[e_lim[0] : e_lim[1]].copy() fit_arr = fit_full[e_lim[0] : e_lim[1]].copy() else: observed = self.data_spec.copy() fit_arr = fit_full.copy() params_df = ulmfit.par_to_df( result_fin.params, col_type="min", par_names=self.model_spec.parameter_names, ) conf_ci = self.model_spec.result[2] mcmc = fit_io._mcmc_payload( self.model_spec.result[3], self.model_spec.result[4], ) slot = fit_io._slot_from_spectrum( file_fingerprint=self.fingerprint(), file_name=self.name, model_name=model_name, fit_alg=str(getattr(result_fin, "method", "unknown")), yaml_filename=self.model_spec.yaml_f_name, params_df=params_df, observed=observed, fit=fit_arr, time_point=time_point, time_range=time_range, time_type=time_type, e_lim=e_lim, n_free_pars=int(getattr(result_fin, "nvarys", 0)), noise_type=self.noise_type, sigma_source=self.sigma_source, sigma_type=self.sigma_type, sigma_data=self.sigma_data, conf_ci=conf_ci if not conf_ci.empty else None, mcmc=mcmc, ) self.p._fit_history.append(slot) # def _append_sbs_slot(self, *, model_name: str, fit_fun_str: str) -> None: """ Build and append a SavedFitSlot for a completed slice-by-slice fit. Must be called BEFORE the seed-template restoration that runs at the end of ``fit_slice_by_slice`` — the helper takes copied snapshot args and is invariant to subsequent state changes, but the per-slice ``residual_fun`` evaluation here uses the still-correct ``self.model_sbs.const/args``. """ assert self.model_sbs is not None # type guard assert self.data is not None # type guard assert self.energy is not None # type guard if not self.results_sbs or not hasattr(self.results_sbs[0][1], "params"): return # mocked / placeholder results; nothing to record n_slices = len(self.data) e_lim = list(self.e_lim) if self.e_lim else None # Per-slice observed view + per-slice model evaluation. residual_fun # is called once per slice with that slice's final params. observed_rows = [] fit_rows = [] for s_i in range(n_slices): slice_data = self.data[s_i] slice_par = self.results_sbs[s_i][1].params fit_full = np.asarray( fitlib.residual_fun( par=slice_par, x=self.energy, data=slice_data, fit_fun_str=fit_fun_str, args=self.model_sbs.args, res_type="fit", ) ) if e_lim: observed_rows.append(slice_data[e_lim[0] : e_lim[1]].copy()) fit_rows.append(fit_full[e_lim[0] : e_lim[1]].copy()) else: observed_rows.append(slice_data.copy()) fit_rows.append(fit_full.copy()) observed = np.stack(observed_rows, axis=0) fit_arr = np.stack(fit_rows, axis=0) # Per-slice DataFrame (one row per slice, columns = parameter values). params_df = ulmfit.list_of_par_to_df(self.results_sbs) # n_free_pars + fit_alg captured from slice 0 (consistent across slices # because the same model_sbs is used for every slice). slice0_result = self.results_sbs[0][1] slice0_conf_ci = self.results_sbs[0][2] # MCMC payload — captured from slice 0, mirroring fit_alg / nvarys. # Per-slice MCMC chains are not stored; aggregate analysis uses # slice 0 as the representative. slice0_mcmc = fit_io._mcmc_payload( self.results_sbs[0][3], self.results_sbs[0][4], ) slot = fit_io._slot_from_sbs( file_fingerprint=self.fingerprint(), file_name=self.name, model_name=model_name, fit_alg=str(getattr(slice0_result, "method", "unknown")), yaml_filename=self.model_sbs.yaml_f_name, params_df=params_df, observed=observed, fit=fit_arr, e_lim=e_lim, t_lim=None, n_free_pars=int(getattr(slice0_result, "nvarys", 0)), noise_type=self.noise_type, sigma_source=self.sigma_source, sigma_type=self.sigma_type, sigma_data=self.sigma_data, conf_ci=slice0_conf_ci if not slice0_conf_ci.empty else None, mcmc=slice0_mcmc, ) self.p._fit_history.append(slot) # def _append_2d_slot(self, *, model_name: str, fit_fun_str: str) -> None: """Build and append a SavedFitSlot for a completed 2D global fit.""" assert self.model_2d is not None # type guard assert self.data is not None # type guard assert self.energy is not None # type guard result_fin = self.model_2d.result[1] if not hasattr(result_fin, "params"): return # mocked / placeholder result; nothing to record # Evaluate the 2D model on the full grid; crop to (t_lim, e_lim) so # observed.shape == fit.shape and matches the residual grid. fit_full = np.asarray( fitlib.residual_fun( par=result_fin.params, x=self.energy, data=self.data, fit_fun_str=fit_fun_str, args=self.model_2d.args, res_type="fit", ) ) e_lim = list(self.e_lim) if self.e_lim else None t_lim = list(self.t_lim) if self.t_lim else None observed = self.data fit_arr = fit_full if t_lim: observed = observed[t_lim[0] : t_lim[1], :] fit_arr = fit_arr[t_lim[0] : t_lim[1], :] if e_lim: observed = observed[:, e_lim[0] : e_lim[1]] fit_arr = fit_arr[:, e_lim[0] : e_lim[1]] observed = observed.copy() fit_arr = fit_arr.copy() params_df = ulmfit.par_to_df( result_fin.params, col_type="min", par_names=self.model_2d.parameter_names, ) conf_ci = self.model_2d.result[2] mcmc = fit_io._mcmc_payload( self.model_2d.result[3], self.model_2d.result[4], ) slot = fit_io._slot_from_2d( file_fingerprint=self.fingerprint(), file_name=self.name, model_name=model_name, fit_alg=str(getattr(result_fin, "method", "unknown")), yaml_filename=self.model_2d.yaml_f_name, params_df=params_df, observed=observed, fit=fit_arr, e_lim=e_lim, t_lim=t_lim, n_free_pars=int(getattr(result_fin, "nvarys", 0)), noise_type=self.noise_type, sigma_source=self.sigma_source, sigma_type=self.sigma_type, sigma_data=self.sigma_data, conf_ci=conf_ci if not conf_ci.empty else None, mcmc=mcmc, ) self.p._fit_history.append(slot) # def _resolve_model(self, model_name: str | None) -> mcp.Model: """ Resolve a model by name, falling back to model_active. Raises ValueError if the model cannot be found. """ if model_name is None: if self.model_active is None: available = [m.name for m in self.models] raise ValueError( "No active model set. Call set_active_model() first " "or pass target_model explicitly.\n" f"Available models: {available or 'none loaded'}" ) return self.model_active mod = self.select_model(model_name) if mod is None: available = [m.name for m in self.models] raise ValueError( f"Model '{model_name}' not found.\n" f"Available models: {available or 'none loaded'}" ) return mod # def _build_1d_dispatch_args( self, model: mcp.Model, fit_fun_str: str, ) -> tuple[Any, ...]: """Return compiled 1D evaluator args when available. Plain energy models loaded on a 2D File inherit the file time axis for convenience. For 1D fitting workflows we compile against a copied model with ``time=None`` so the graph lowers as ``ENERGY_1D`` while the runtime still evaluates against the original model object. """ if fit_fun_str not in ("fit_model_gir", "fit_model_compare"): return (model, 1) from trspecfit.graph_ir import build_graph, can_lower_1d, schedule_1d model_1d = copy.copy(model) model_1d.time = None graph = build_graph(model_1d) if can_lower_1d(graph): plan = schedule_1d(graph) name_to_idx = {n: i for i, n in enumerate(model.parameter_names)} theta_indices = np.array( [name_to_idx[n] for n in plan.opt_param_names], dtype=np.intp, ) return (plan, theta_indices, model, 1) return (model, 1) # def _resolve_time_selection( self, t_start: float, t_stop: float, *, time_type: str = "abs", ) -> list[int]: """ Convert time bounds to validated ``[ind_start, ind_stop)`` slice indices. For a single time point pass ``t_start == t_stop``. Both bounds are inclusive in the input; the returned stop is exclusive. Raises ValueError if the result is out of range or empty. """ if self.time is None: raise ValueError("Time axis is not set.") n = len(self.time) if time_type == "abs": if t_start == t_stop: ind_start = int(np.searchsorted(self.time, t_start, side="left")) ind_stop = ind_start + 1 else: ind_start = int(np.searchsorted(self.time, t_start, side="left")) ind_stop = int(np.searchsorted(self.time, t_stop, side="right")) elif time_type == "ind": ind_start = int(t_start) ind_stop = int(t_stop) + 1 if t_start == t_stop else int(t_stop + 1) else: raise ValueError( f"Unknown time_type '{time_type}'. Expected 'abs' or 'ind'." ) if ind_start >= ind_stop or ind_start >= n or ind_stop <= 0: raise ValueError( f"Time selection resolves to an empty or out-of-range slice " f"[{ind_start}:{ind_stop}). " f"Time axis has {n} points [{self.time[0]}, {self.time[-1]}]." ) ind_start = max(ind_start, 0) ind_stop = min(ind_stop, n) return [ind_start, ind_stop] #
[docs] def add_time_dependence( self, target_model: str, target_parameter: str, dynamics_yaml: PathLike, dynamics_model: str | list[str], *, frequency: float = -1, ) -> None: """ Add time dependence for one parameter of a model. Loads a "Dynamics"-type model to describe time-dependent behavior. The parameter can live either directly in the energy model or inside a Profile model attached to an energy model parameter. To avoid strongly correlated fits, adding dynamics directly to an energy-model parameter that already has a profile (``p_vary=True``) is currently disallowed. In that case, add dynamics to a profile parameter instead. Parameters ---------- target_model : str Name of the energy model to add dynamics to. target_parameter : str Name of parameter to make time-dependent. Can be an energy model parameter or a profile model parameter. dynamics_yaml : str or Path YAML file name defining the Dynamics model. dynamics_model : str or list of str Model name(s) for time-dependent behavior. frequency : float, default=-1 Repetition frequency for time-dependent behavior. -1 (default) means no repetition (single cycle). """ model = self._resolve_model(target_model) t_mod = self.load_model( dynamics_yaml, dynamics_model, target_parameter, model_type="dynamics", ) # Try energy model first ci, pi = model.find_par_by_name(target_parameter) if ci is not None and pi is not None: target_par = model.components[ci].pars[pi] if target_par.p_vary: raise ValueError( f"Cannot add time dependence to parameter " f"'{target_parameter}' because it already has a profile " "(p_vary=True). This is currently disabled to avoid strongly " "correlated fits. Add dynamics to a profile parameter " "instead, or remove/fix the profile first." ) model.add_dynamics(cast("mcp.Dynamics", t_mod), frequency) model.dim = 2 return # Search profile models attached to energy model parameters for comp in model.components: for par in comp.pars: if par.p_vary and par.p_model is not None: pci, _ppi = par.p_model.find_par_by_name(target_parameter) if pci is not None: # Add dynamics to the profile model par.p_model.add_dynamics(cast("mcp.Dynamics", t_mod), frequency) # Sync dynamics params into the energy model's parameter list par.lmfit_par_list.extend(t_mod.lmfit_par_list) model.update() model.dim = 2 return # Not found in energy model or any attached profile available = model.parameter_names profile_pars: list[str] = [] for comp in model.components: for par in comp.pars: if par.p_vary and par.p_model is not None: profile_pars.extend(par.p_model.parameter_names) if profile_pars: available = available + profile_pars raise ValueError( f"Parameter '{target_parameter}' not found in model " f"'{model.name}' or any attached profile models.\n" f"Available parameters: {available}" )
#
[docs] def add_par_profile( self, target_model: str, target_parameter: str, profile_yaml: PathLike, profile_model: str | list[str], ) -> None: """ Add a parameter profile over the auxiliary axis to a model. Loads a ``"profile"``-type model from a YAML file and attaches it to the named parameter, so that the parameter varies over ``aux_axis`` (uniform averaging over the auxiliary dimension). If any parameters inside the profile model are time-dependent (``t_vary=True``), the model's dim is automatically set to 2. To avoid strongly correlated fits, adding a profile to an energy-model parameter that already has time dependence (``t_vary=True``) is currently disallowed. Parameters ---------- target_model : str Name of the energy model to add the profile to. target_parameter : str Name of the parameter to attach the profile to (e.g. ``'GLP_01_A'``). profile_yaml : str or Path YAML file name defining the Profile model. profile_model : str or list of str Model name(s) for the profile (single element). """ model = self._resolve_model(target_model) p_mod = self.load_model( profile_yaml, profile_model, target_parameter, model_type="profile", ) ci, pi = model.find_par_by_name(target_parameter) if ci is None or pi is None: raise ValueError( f"Parameter '{target_parameter}' not found in model '{model.name}'.\n" f"Available parameters: {model.parameter_names}" ) target_par = model.components[ci].pars[pi] if target_par.t_vary: raise ValueError( f"Cannot add profile to parameter '{target_parameter}' because " "it already has time dependence (t_vary=True). This is currently " "disabled to avoid strongly correlated fits. Add profile first " "and dynamics to a profile parameter instead, or remove/fix " "time dependence first." ) model.add_profile(cast("mcp.Profile", p_mod)) # auto-promote to 2D if any parameter inside the profile is time-dependent if any(p.t_vary for comp in p_mod.components for p in comp.pars): model.dim = 2
#
[docs] def fit_2d(self, model_name: str, stages: int = 1, **fit_wrapper_kwargs) -> None: """ Perform energy- and time-dependent 2D model fit. Parameters ---------- model_name : str Name of the model to fit (loaded via File.load_model) stages : {1, 2}, default=1 Number of optimization stages: - 1: Single optimization with ``fit_alg_1`` - 2: Two-stage fit (``fit_alg_1`` then ``fit_alg_2``) **fit_wrapper_kwargs Additional keyword arguments passed to fitlib.fit_wrapper (see fitlib.fit_wrapper for details) """ t_2d = time.time() # start timing for 2D fit self.model_2d = self._resolve_model(model_name) if self.model_base is None: raise ValueError( "Baseline model is not fitted yet; run fit_baseline() first." ) if self.energy is None or self.time is None or self.data is None: raise ValueError("Data/axes missing; cannot run 2D fit.") # define (and create) path where 2D fit results will be saved to path_2d_results = self.create_model_path(model_name, fit_type="2d") # set all fixed 2D fit parameters equal to baseline model results base_df = ulmfit.par_to_df(self.model_base.lmfit_pars, col_type="min") self.model_2d.update_value( new_par_values=list(base_df["value"]), par_select=list(base_df["name"]) ) # --- dispatch: GIR fast path vs interpreter --- _fun_str = self.p.spec_fun_str if _fun_str in ("fit_model_gir", "fit_model_compare"): from trspecfit.graph_ir import build_graph, can_lower_2d, schedule_2d _graph = build_graph(self.model_2d) if can_lower_2d(_graph): _plan = schedule_2d(_graph) _name_to_idx = { n: i for i, n in enumerate(self.model_2d.parameter_names) } _theta_indices = np.array( [_name_to_idx[n] for n in _plan.opt_param_names], dtype=np.intp, ) _args: tuple[Any, ...] = ( _plan, _theta_indices, self.model_2d, 2, ) else: # Not lowerable — fit_model_gir delegates to fit_model_mcp _args = (self.model_2d, 2) else: _args = (self.model_2d, 2) # const [x, data, function string, unpack, energy limits, time limits] self.model_2d.const = ( self.energy, self.data, _fun_str, 0, self.e_lim, self.t_lim, ) self.model_2d.args = _args # CSV format/delimiter: project defaults unless caller overrides fit_wrapper_kwargs.setdefault("num_fmt", self.p.num_fmt) fit_wrapper_kwargs.setdefault("delim", self.p.delim) # fit (with confidence intervals) self.model_2d.result = fitlib.fit_wrapper( const=self.model_2d.const, args=self.model_2d.args, par_names=self.model_2d.parameter_names, par=self.model_2d.lmfit_pars, stages=stages, show_output=1 if self.p.show_output >= 1 else 0, save_output=1 if self.p.auto_export else 0, save_path=path_2d_results / model_name, **fit_wrapper_kwargs, ) # Write optimized values back to model.lmfit_pars. fit_wrapper # optimizes a deepcopy, so model.lmfit_pars may be stale — especially # on the GIR path where fit_model_gir never calls model.update_value. if stages >= 1 and self.model_2d.result[1] != []: final_params = self.model_2d.result[1].params for name in self.model_2d.parameter_names: if name in final_params: self.model_2d.lmfit_pars[name].value = final_params[name].value self._append_2d_slot(model_name=model_name, fit_fun_str=_fun_str) if stages >= 1: if self.p.auto_export: self._save_2d_fit_legacy(save_path=path_2d_results) fitlib.time_display( t_start=t_2d, print_str="Time elapsed for 2D model fit: " ) display(self.model_2d.result[1].params) # display final pars below figure
# def _save_2d_fit_legacy(self, save_path: PathLike) -> None: """ Legacy 2D export — preserves the original on-disk layout used by the auto-export path inside :meth:`fit_2d` and :meth:`Project.fit_2d`. Internal use only. The public ``save_2d_fit`` is a deprecated wrapper that forwards to :meth:`export_fit` (different layout); prefer :meth:`export_fit` for new code. """ if ( self.model_2d is None or self.energy is None or self.time is None or self.data is None ): raise ValueError("2D model/data/axes missing; nothing to save.") self.model_2d.create_value_2d() # update 2D spectrum to final fit result if self.model_2d.value_2d is None: raise ValueError( "2D model evaluation did not produce value_2d; nothing to save." ) # save 2D fit map as CSV (rows=time, cols=energy), mirroring save_sbs_fit np.savetxt( pathlib.Path(save_path) / "fit_2d.csv", self.model_2d.value_2d, fmt=self.p.num_fmt, delimiter=self.p.delim, ) # axis sidecars (one value per row); paired with fit_2d.csv np.savetxt( pathlib.Path(save_path) / "energy.csv", np.asarray(self.energy), fmt=self.p.num_fmt, delimiter=self.p.delim, ) np.savetxt( pathlib.Path(save_path) / "time.csv", np.asarray(self.time), fmt=self.p.num_fmt, delimiter=self.p.delim, ) # plot data, fit, and residual 2D maps fitlib.plt_fit_res_2d( data=self.data, fit=self.model_2d.value_2d, x=self.energy, y=self.time, config=self.plot_config, x_lim=self.e_lim, y_lim=self.t_lim, save_img=-1 if self.p.show_output == 0 else 1, save_path=save_path, ) # dpi_plot = round(1.5 *self.p.dpi_plt), NOT AVAILABLE YET (fig_size) #
[docs] def get_fit_results( self, *, fit_type: Literal["baseline", "spectrum", "sbs", "2d"] = "baseline", ) -> pd.DataFrame: """ Return fit results as a DataFrame for programmatic access. Parameters ---------- fit_type : {'baseline', 'spectrum', 'sbs', '2d'}, default='baseline' Which fit results to return: - 'baseline': Baseline/ground-state fit (from ``fit_baseline``) - 'spectrum': Single-spectrum fit (from ``fit_spectrum``) - 'sbs': Slice-by-Slice fit (from ``fit_slice_by_slice``) - '2d': 2D global fit (from ``fit_2d``) Returns ------- pd.DataFrame For 'baseline', 'spectrum', and '2d': one row per parameter with columns ``['name', 'value', 'stderr', 'init_value', 'min', 'max', 'vary', 'expr']``. For 'sbs': one row per time slice with columns = parameter names. Raises ------ ValueError If the requested fit has not been performed yet. """ if fit_type == "baseline": if self.model_base is None or not self.model_base.result: raise ValueError("No baseline fit results. Run fit_baseline() first.") return ulmfit.par_to_df( self.model_base.result[1].params, col_type="min", par_names=self.model_base.parameter_names, ) if fit_type == "spectrum": if self.model_spec is None or not self.model_spec.result: raise ValueError("No spectrum fit results. Run fit_spectrum() first.") return ulmfit.par_to_df( self.model_spec.result[1].params, col_type="min", par_names=self.model_spec.parameter_names, ) if fit_type == "sbs": if not self.results_sbs: raise ValueError( "No Slice-by-Slice fit results. Run fit_slice_by_slice() first." ) return ulmfit.list_of_par_to_df(self.results_sbs) if fit_type == "2d": if self.model_2d is None or not self.model_2d.result: raise ValueError("No 2D fit results. Run fit_2d() first.") return ulmfit.par_to_df( self.model_2d.result[1].params, col_type="min", par_names=self.model_2d.parameter_names, ) raise ValueError( f"Unknown fit_type={fit_type!r}; " "use 'baseline', 'spectrum', 'sbs', or '2d'." )
# def compare_models( self, *models: str, fit_type: fit_io.FitType | Sequence[fit_io.FitType] | None = None, metrics: Sequence[str] | None = None, sbs_aggregation: Literal["median", "mean", "sum", "long"] = "median", ) -> pd.DataFrame: """ Compare fit-quality metrics across this file's models. Sugar for ``self.p.results.compare_models(file=self, models=...)``. Pass model names as positional arguments; omit them to include every model fit on this file. See :meth:`FitResults.compare_models` for the full kwarg semantics, the defensive ``observed_sha256`` cross-check, and the dynamic column set driven by whether a sigma was set via :meth:`File.set_sigma`. Examples -------- >>> file.set_sigma(0.23) >>> file.compare_models("modelA", "modelB", fit_type="baseline") """ return self.p.results.compare_models( file=self, models=list(models) if models else None, fit_type=fit_type, metrics=metrics, sbs_aggregation=sbs_aggregation, )