"""
Model/Component/Parameter (MCP) system for spectroscopy fitting.
This module implements the hierarchical model construction system that is the
heart of trspecfit. It provides classes for building complex spectral models
from reusable components with flexible parameter management.
Core Classes
------------
Model : Container for spectral components and parameters
Represents a complete 1D or 2D spectral model built from components.
Handles parameter management, model evaluation, and plotting.
Component : Individual spectral, temporal, or profile function
Wraps functions from trspecfit.functions (energy/time/profile) into
model-ready objects with parameter management and axis handling.
Par : Parameter with optional time-dependence and profile variation
Extends lmfit.Parameter to support time-varying parameters through
the Dynamics system, profile-varying parameters through the Profile
system, and handles parameter expressions/constraints.
Dynamics : Model subclass for time-dependent behavior
Special Model type that describes how parameters evolve over time,
with support for multi-cycle dynamics and convolution kernels.
Profile : Model subclass for auxiliary-axis parameter variation
Special Model type that describes how a parameter varies over an
auxiliary physical axis (e.g. depth, position, fluence). Used for
inhomogeneous averaging over the auxiliary dimension.
Architecture
------------
The MCP system uses composition to build models from bottom-up::
Par (values + time-dependence + profile variation)
↓
Component (function + parameters)
↓
Model (components + combination rules)
↓
Spectrum (1D or 2D evaluated model)
Key Features
------------
- Hierarchical model construction from reusable components
- Automatic parameter naming and numbering for multi-component models
- Time-dependent parameters via Dynamics models
- Profile-varying parameters via Profile models (auxiliary-axis averaging)
- Expression-based parameter constraints and relationships
- Support for convolution with instrumental response functions
- Multi-cycle dynamics with subcycle support
- Automatic component combination (addition, convolution, backgrounds)
"""
import importlib
import inspect
import re
import types
from collections.abc import Callable
from typing import Any, Literal, cast
import lmfit
import numpy as np
# asteval is used for expressions referencing time-dependent parameters
from asteval import Interpreter
from IPython.display import display
# function configurations
from trspecfit.config.functions import (
background_functions,
convolution_functions,
energy_functions,
time_functions,
)
# plot configuration
from trspecfit.config.plot import PlotConfig
# function library for energy, time, and profile components
from trspecfit.functions import energy as fcts_energy
from trspecfit.functions import profile as fcts_profile
from trspecfit.functions import time as fcts_time
from trspecfit.utils import arrays as uarr
from trspecfit.utils import lmfit as ulmfit
from trspecfit.utils import parsing as uparsing
from trspecfit.utils import plot as uplt
#
#
[docs]
class Model:
"""
Define a 2D time- and energy-resolved fit model using lmfit.
Model is the top-level container for spectral fitting. It manages a
collection of Component objects and their parameters, handles model
evaluation, and provides methods for fitting and visualization.
Parameters
----------
model_name : str, default='test'
Name identifier for this model (used in file I/O and plotting)
Attributes
----------
name : str
Model identifier
yaml_f_name : str or None
Name of YAML file this model was loaded from (without extension)
peak_fcts : list
Function objects for all components (extracted from Component.fct)
components : list of Component
Component objects that define this model's behavior
lmfit_par_list : list of lmfit.Parameter
Flattened list of all individual parameters (spectral + temporal + profile)
lmfit_pars : lmfit.Parameters
Complete parameter object for fitting (from lmfit_par_list)
parameter_names : list of str
Names of all parameters in the model
component_spectra : list of ndarray
Individual component spectra from last evaluation (when store_1d=1)
value_1d : ndarray or None
1D spectrum (sum of all components) from last evaluation
value_2d : ndarray or None
2D spectrum (time × energy) from last evaluation
const : tuple or None
Constants for residual function (x, data, package, function_str, ...)
args : tuple or None
Arguments for fit function (model, dim)
result : list
Fit results from fit_wrapper [par_ini, par_fin, conf_ci, emcee_fin, emcee_ci]
parent_file : File or None
Parent File object (set when model is loaded)
dim : int or None
Dimensionality (1 for energy/time only, 2 for time+energy)
energy : ndarray or None
Energy axis for spectral components
time : ndarray or None
Time axis for temporal dynamics
aux_axis : ndarray or None
Auxiliary physical axis (e.g. depth, position) for Profile models
Notes
-----
**Component Combination:**
Components are combined in reverse order (last to first) via the
Model.combine() static method, which handles:
- Addition (regular peaks)
- Convolution (with kernels)
- Background addition (requires existing spectrum)
**Parameter Management:**
All parameters are stored in a flat lmfit.Parameters object for fitting.
Time-dependent parameters add additional parameters from their Dynamics
models to this flat structure. Profile-varying parameters add parameters
from their Profile models similarly.
**Inheritance:**
Model attributes (energy, time, aux_axis, parent_file) are inherited by:
- Components (need axes for evaluation)
- Parameters (need time axis for dynamics)
- Dynamics models (need time axis and parent reference)
- Profile models (need aux_axis and parent reference)
See Also
--------
Component : Individual spectral/temporal/profile components
Par : Parameters with optional time-dependence and profile variation
Dynamics : Model subclass for time-dependent parameters
Profile : Model subclass for auxiliary-axis parameter variation
File.load_model : Load models from YAML definitions
"""
#
def __init__(self, model_name: str = "test") -> None:
self.name: str = model_name
# file name of yaml file containing model details
self.yaml_f_name: str | None = None
# functions of spectral components of fit
self.peak_fcts: list[Callable] = []
# list of objects of type defined in Component class
self.components: list[Component] = []
# flattened lmfit parameters list (1D with time- and energy-components)
self.lmfit_par_list: list[lmfit.Parameter] = [] # (individual objects)
# lmfit.Parameters object corresponding to lmfit_par_list attribute
self.lmfit_pars: lmfit.Parameters = lmfit.Parameters()
# list of all parameter names
self.parameter_names: list[str] = []
# list of component spectra (from last evaluation/ current parameters)
self.component_spectra: list[np.ndarray] = []
# 1D spectrum (i.e. sum/ combination of all components)
self.value_1d: np.ndarray | None = None
# 2D spectrum (i.e. 1D spectra one per time step)
self.value_2d: np.ndarray | None = None
# fit parameters and results
self.const: tuple | None = None
self.args: tuple | None = None
self.result: list = []
# ATTRIBUTES THAT SHOULD BE INHERITED FROM A PARENT ENTITY WHEN LOADING MODEL
self.parent_file: Any | None = None # parent reference
# self.data = None # (currently) not necessary
self.dim: int | None = None
self.energy: np.ndarray | None = (
None # necessarry or should just point to file?
)
self.time: np.ndarray | None = None # necessarry or should just point to file?
self.aux_axis: np.ndarray | None = None # auxiliary physical axis (e.g. depth)
#
def __repr__(self) -> str:
n_comp = len(self.components)
n_par = len(self.parameter_names)
dim = self.dim or "?"
cls = type(self).__name__
return f"{cls}('{self.name}', {n_comp} comp, {n_par} pars, dim={dim})"
#
def __getstate__(self) -> dict[str, Any]:
"""Pickle protocol: strip back-refs and transient state.
Pickled Models are for short-lived process-boundary transfer
(multiprocessing, ``copy.deepcopy``), **not** for persistence.
``parent_file`` (and, on Dynamics/Profile subclasses,
``parent_model``) is nulled to keep pickles bounded in size —
otherwise the whole ``Project`` graph would be dragged in.
Transient fit state (``const``, ``args``) is stripped because it
is meant to be re-populated by the caller before the next fit.
"""
state = self.__dict__.copy()
for key in ("parent_file", "parent_model", "const", "args"):
if key in state:
state[key] = None
return state
#
def __setstate__(self, state: dict[str, Any]) -> None:
"""Pickle protocol: restore intra-Model ``parent_model`` back-refs.
``parent_file`` stays ``None`` (caller must re-attach if needed —
see ``__getstate__``). ``parent_model`` is different: the back-ref
is internal to this Model graph, since this Model owns its
Components, their Pars, and any Dynamics/Profile sub-Models
attached to those Pars. Rewire from ``self`` so expression-on-Par
evaluation paths that traverse ``Par.get_all_parameters`` keep
working post-unpickle. Sub-Models (Dynamics/Profile) inherit this
method, so their internal back-refs are restored by the same
recursion.
"""
self.__dict__.update(state)
for comp in self.components:
comp.parent_model = self
for par in comp.pars:
par.parent_model = self
if par.t_model is not None:
par.t_model.parent_model = self
if par.p_model is not None:
par.p_model.parent_model = self
@property
def plot_config(self) -> PlotConfig:
"""
Get plot configuration from parent File.
Models inherit plot settings from their parent File, ensuring
consistent plotting across all models for the same dataset.
Returns
-------
PlotConfig
Configuration object with plot settings (axes, colors, DPI, etc.)
"""
if hasattr(self, "parent_file") and self.parent_file is not None:
return cast("PlotConfig", self.parent_file.plot_config)
# Fallback to defaults if no parent
return PlotConfig()
#
[docs]
def describe(self, detail: int = 0) -> None:
"""
Display information about model structure and parameters.
Parameters
----------
detail : int, default=0
Level of detail to display:
- 0: Component list and parameters only
- 1+: Also plot initial guess and (for 2D) data comparison
"""
# minimum model description
print("model name: " + self.name)
if self.components:
for comp in self.components:
comp.describe(detail=detail - 1)
else:
print("no elements in this model")
print("all lmfit.Parameters() [flattened and sorted alphabetically]:")
if self.lmfit_pars:
self.lmfit_pars.pretty_print()
else:
print("lmfit.Parameters() object is empty")
print()
# plot initial guess of model
if detail >= 1:
if isinstance(self, Dynamics):
self.create_value_1d(store_1d=1)
self.plot_1d()
else: # energy-resolved model
if self.dim == 1:
self.create_value_1d(store_1d=1)
self.plot_1d()
elif self.dim == 2:
self.create_value_2d()
self.plot_2d()
#
[docs]
def visualize(
self,
*,
rendering: Literal["graphviz", "string"] = "graphviz",
collapse_profiles: bool = True,
) -> str | None:
"""Display the model's dependency graph.
Builds a ``GraphIR`` from this model and renders it as a DAG.
Parameters
----------
rendering : {'graphviz', 'string'}, default='graphviz'
How to render the graph:
- ``'graphviz'``: Render inline SVG via the ``graphviz`` Python
package (install with ``pip install "trspecfit[lab]"`` or
``pip install graphviz``; also requires the ``dot`` system
binary). Falls back to ``'string'`` with a warning if the
package is not installed.
- ``'string'``: Print the raw DOT source and return it.
collapse_profiles : bool, default=True
When True, per-sample profile nodes are collapsed into single
representative nodes showing the sample count, keeping profile
models readable.
Returns
-------
str or None
The DOT source string when ``rendering='string'``, otherwise
``None`` (the graph is displayed inline).
"""
from trspecfit.graph_ir import build_graph
graph = build_graph(self)
dot_source = graph.to_dot(collapse_profiles=collapse_profiles)
if rendering == "graphviz":
try:
gv = cast(Any, importlib.import_module("graphviz"))
except ImportError:
import warnings
warnings.warn(
"graphviz Python package not installed; falling back"
" to DOT string output.\n"
'Install with: pip install "trspecfit[lab]"'
" or pip install graphviz",
stacklevel=2,
)
print(dot_source)
return dot_source
try:
source = gv.Source(dot_source)
display(source)
except gv.ExecutableNotFound:
import warnings
warnings.warn(
"Graphviz 'dot' executable not found on PATH;"
" falling back to DOT string output.\n"
"The Python graphviz package is installed, but the"
" Graphviz system binary is missing.\n"
"Install it with your package manager, e.g.:\n"
" Linux: sudo apt-get install graphviz\n"
" macOS: brew install graphviz\n"
" Windows: choco install graphviz",
stacklevel=2,
)
print(dot_source)
return dot_source
return None
# rendering == "string"
print(dot_source)
return dot_source
#
[docs]
def add_components(self, comps_list: list["Component"]) -> None:
"""
Add components to model and initialize their parameters.
This is the primary method for building up a model. It takes a list
of Component objects, assigns them appropriate names/prefixes, creates
their parameters, and updates the model's parameter structure.
Parameters
----------
comps_list : list of Component
Components to add to this model. Components should already have
their parameter dictionaries populated via Component.add_pars().
Notes
-----
**Component Naming:**
Components are expected to have their names already set from YAML
parsing (e.g., GLP_01, GLP_02, Offset, Shirley). The numbering is
handled during YAML parsing, not here.
**Parameter Naming:**
Parameter names are constructed as: prefix + component_name + '_' + param_name
- For Dynamics models: prefix = model.name (e.g., ``'GLP_01_x0_'``)
- For regular models: prefix = ``''`` (e.g., ``'GLP_01_A'``)
**Component Preparation:**
Each component receives:
- energy/time axes from model
- parent_model reference (for finding other parameters)
- subcycle time axis (for multi-cycle Dynamics)
- kernel time axis (for convolution components)
**Model Updates:**
After adding components, the model's lmfit_pars and parameter_names
are automatically updated via self.update().
"""
# add list to components attribute
self.components = comps_list
# prefix is model name for Dynamics and Profile, empty string for general models
prefix = self.name if isinstance(self, (Dynamics, Profile)) else ""
# assemble parameter list for all components
# [components can be energy or time functions]
for comp in self.components:
# add current component function to list of all functions
self.peak_fcts.append(comp.fct)
# Share axis references (not copies) with components.
# Contract: parent must not rebind these after propagation.
# Exception: conv components get a kernel time axis below.
comp.energy = self.energy
comp.time = self.time
comp.aux_axis = self.aux_axis
# set parent model reference
comp.parent_model = self
# subcycle time axis [updated when using Dynamics.set_frequency]
if isinstance(self, Dynamics):
if self.time is None:
raise ValueError("Model time axis required for Dynamics components")
comp.time_n_sub = np.ones(len(self.time)) # initialize all active
# if comp should be convoluted it will be defined on a t_kernel axis
if comp.comp_type == "conv":
comp.time = Component.create_t_kernel(comp)
# populate pars attribute in the component
comp.create_pars(prefix=prefix)
# update model lmfit_par_list (+parameter_names) and components
self.update()
#
[docs]
def find_par_by_name(self, par_name: str) -> tuple[int | None, int | None]:
"""
Find the component and parameter indices for a given parameter name.
Searches through all components and their parameters to locate the
indices needed to access a specific parameter by name (exact match).
Parameters
----------
par_name : str
Full parameter name to search for (e.g., 'GLP_01_x0', 'Offset_y0')
Returns
-------
ci : int or None
Component index in self.components, or None if not found
pi : int or None
Parameter index in component.pars, or None if not found
"""
for ci, comp in enumerate(self.components):
for pi, par in enumerate(comp.pars):
if par.name == par_name:
return ci, pi
# parameter name not found in any component of the model
return None, None
#
[docs]
def print_all_pars(self, detail: int = 0) -> None:
"""
Print information on all parameters individually.
Debugging utility to inspect parameter structure and values.
For routine parameter inspection, use model.describe() or
model.lmfit_pars.pretty_print().
Parameters
----------
detail : int, default=0
Verbosity level passed to each Par.describe() call.
"""
for c in self.components:
for p in c.pars:
p.describe(detail)
#
[docs]
def update(self) -> None:
"""
Update model from bottom up: parameters → components → model.
Recompiles all parameters from all components and recreates the
flattened lmfit parameter structures. Call this after modifying
parameter structure.
(automatically called after add_components, add_profile, add_dynamics)
"""
# re-initialize
self.lmfit_par_list = []
self.lmfit_pars = lmfit.Parameters()
self.parameter_names = []
for comp in self.components:
# create a flattened lmfit.Parameter list for the component
comp.update_lmfit_par_list()
# add lmfit.Parameter list of this component to corresponding model list
self.lmfit_par_list.extend(comp.lmfit_par_list)
# create lmfit.Parameters object from the lmfit_par_list
self.lmfit_pars.add_many(*self.lmfit_par_list)
# update list of all parameter names
self.parameter_names = [par.name for par in self.lmfit_par_list]
#
[docs]
def update_value(
self,
new_par_values: list[float] | np.ndarray,
par_select: str | list[str] = "all",
) -> None:
"""
Update model from top down: model → components → parameters.
Updates parameter values in the model's lmfit_pars based on new
values (e.g., from optimizer). Used during fitting to apply
proposed parameter values before model evaluation.
Parameters
----------
new_par_values : array-like
New parameter values to apply. Length must match number of
parameters being updated.
par_select : str or list of str, default='all'
Which parameters to update:
- 'all': Update all parameters in order
- list: Update only parameters with names in this list
Notes
-----
Called by spectra.fit_model_mcp() on every iteration during fitting
to update model parameters before evaluation.
Does not trigger model re-evaluation; call create_value_1d() or
create_value_2d() after updating values.
"""
p_count = 0 # initialize counter for parameters in par_select
for i, p in enumerate(self.lmfit_pars):
# update ALL values in model.lmfit_pars attribute
if par_select == "all":
self.lmfit_pars[p].value = new_par_values[i]
# update only selected values in model.lmfit_pars
else:
if self.lmfit_pars[p].name in par_select:
self.lmfit_pars[p].value = new_par_values[p_count]
p_count += 1
#
[docs]
def add_dynamics(self, dynamics_model: "Dynamics", frequency: float = -1) -> None:
"""
Add temporal dynamics model to a parameter.
Makes a parameter time-dependent by attaching a Dynamics model that
describes how the parameter evolves over time. The Dynamics model name
must match the parameter name exactly.
Parameters
----------
dynamics_model : Dynamics
Dynamics instance describing time evolution. The model name
must match a parameter in this model (e.g., 'GLP_01_x0').
frequency : float, default=-1
Repetition frequency for cyclic dynamics (Hz):
- -1: Single cycle over entire time axis
- >0: Dynamics repeat at this frequency
"""
# set the model instance calling this method as parent model for Dynamics
dynamics_model.parent_model = self
if frequency != -1: # set a repetition frequency
dynamics_model.set_frequency(frequency)
# find component and parameter index from Dynamics model name
ci, pi = self.find_par_by_name(dynamics_model.name)
if ci is None or pi is None:
raise ValueError(
f'Parameter "{dynamics_model.name}" not found in model '
f"{self.name}.\n"
f"Available parameters: {self.parameter_names}"
)
target_par = self.components[ci].pars[pi]
# Disallow adding dynamics to expression-linked parameters.
# Their value is already constrained by the expression.
if len(target_par.info) == 1 and isinstance(target_par.info[0], str):
raise ValueError(
f"Cannot add time dependence to expression parameter "
f'"{dynamics_model.name}" '
f"(expression: {target_par.info[0]}). "
"Add dynamics to the referenced base parameter instead."
)
# add Dynamics model and update corresponding parameter
target_par.update(dynamics_model)
# update model lmfit_par_list, parameter_names and components
self.update()
# Re-analyze all expressions since time-dependence status may have changed
self._analyze_expression_dependencies()
#
[docs]
def add_profile(self, profile_model: "Profile") -> None:
"""
Add a profile variation to a parameter over the auxiliary axis.
Makes a parameter vary over ``aux_axis`` by attaching a Profile model.
During Component.value(), the component is evaluated at every aux_axis
point with the profile value substituted for this parameter, and the
average is returned (uniform integration over the auxiliary dimension).
The Profile model name must match the target parameter name exactly,
using the same convention as Dynamics (e.g., ``'GLP_01_A'``).
Parameters
----------
profile_model : Profile
Profile instance describing the parameter variation over aux_axis.
Its name must match a parameter in this model.
Raises
------
ValueError
If the parameter is not found, is expression-linked, or if
``aux_axis`` has not been set on this model.
"""
# set this model as parent for the Profile
profile_model.parent_model = self
# find component and parameter index from Profile model name
ci, pi = self.find_par_by_name(profile_model.name)
if ci is None or pi is None:
raise ValueError(
f'Parameter "{profile_model.name}" not found in model '
f"{self.name}.\n"
f"Available parameters: {self.parameter_names}"
)
target_par = self.components[ci].pars[pi]
# Disallow adding profile to expression-linked parameters
if len(target_par.info) == 1 and isinstance(target_par.info[0], str):
raise ValueError(
f"Cannot add profile to expression parameter "
f'"{profile_model.name}" '
f"(expression: {target_par.info[0]}). "
"Add profile to the referenced base parameter instead."
)
# aux_axis must be set before adding a profile
if self.aux_axis is None:
raise ValueError(
f"Cannot add profile to model '{self.name}': aux_axis is not set. "
"Set File.aux_axis and reload the model, "
"or set model.aux_axis directly."
)
# propagate aux_axis to profile model and its components
profile_model.aux_axis = self.aux_axis
for comp in profile_model.components:
comp.aux_axis = self.aux_axis
# update target parameter
target_par.p_vary = True
target_par.p_model = profile_model
# evaluate profile to initialize value_1d
profile_model.create_value_1d()
# include profile parameters in this model's lmfit parameter list
target_par.lmfit_par_list.extend(profile_model.lmfit_par_list)
# update model lmfit_par_list, parameter_names and components
self.update()
# Re-analyze all expressions since profile status may have changed
self._analyze_expression_dependencies()
#
def _analyze_expression_dependencies(self) -> None:
"""
Analyze all parameter expressions for dynamic references.
Single pass: sets ``expr_refs_time_dep`` / ``expr_refs_profile_dep``
for expressions that *directly* reference a t_vary / p_vary parameter.
Then checks for transitive chains (expression → expression → dynamic
parameter) and raises if any are found. Pure expression chains
(no dynamics/profiles) are fine — lmfit resolves those natively.
Called automatically after add_dynamics() and add_profile().
"""
all_parameters = self.get_all_parameters()
# Single pass: set direct flags only
for par in all_parameters:
par.analyze_expression_dependencies(all_parameters)
# Detect transitive chains through dynamic parameters.
for par in all_parameters:
if not par.expr_refs:
continue
for ref_name in par.expr_refs:
ref_par = par._find_parameter_by_name(ref_name, all_parameters)
if ref_par and ref_par.expr_refs_time_dep:
raise ValueError(
f"Parameter '{par.name}' indirectly references "
f"time-varying parameter via '{ref_name}'. "
f"Transitive expression chains are not supported. "
f"Rewrite the expression to reference the "
f"t_vary parameter directly."
)
if ref_par and ref_par.expr_refs_profile_dep:
raise ValueError(
f"Parameter '{par.name}' indirectly references "
f"profile-varying parameter via '{ref_name}'. "
f"Transitive expression chains are not supported. "
f"Rewrite the expression to reference the "
f"p_vary parameter directly."
)
#
[docs]
def get_all_parameters(self) -> list["Par"]:
"""
Get all Par objects from all components in this model.
"""
all_parameters = []
for comp in self.components:
all_parameters.extend(comp.pars)
return all_parameters
#
@staticmethod
def _collect_par_levels(par: "Par", levels: dict[str, str]) -> None:
"""
Recursively collect vary levels for a Par and all nested sub-model parameters.
Handles arbitrary nesting depth (e.g. profile params that themselves
have dynamics sub-models).
"""
for lmf_par in par.lmfit_par.values():
levels[lmf_par.name] = par.vary_level
if par.t_vary and par.t_model is not None:
for sub_par in par.t_model.get_all_parameters():
Model._collect_par_levels(sub_par, levels)
if par.p_vary and par.p_model is not None:
for sub_par in par.p_model.get_all_parameters():
Model._collect_par_levels(sub_par, levels)
#
[docs]
def get_vary_levels(self) -> dict[str, str]:
"""
Get vary level for every lmfit parameter name in this model.
Walks the Par tree including Dynamics and Profile sub-model
parameters at any nesting depth, returning a mapping from lmfit
parameter name to vary level (``"project"``, ``"file"``, or
``"static"``).
Returns
-------
dict of str to str
``{lmfit_param_name: vary_level}`` for all parameters.
"""
levels: dict[str, str] = {}
for par in self.get_all_parameters():
Model._collect_par_levels(par, levels)
return levels
#
[docs]
@staticmethod
def combine(value: np.ndarray, comp: "Component", t_ind: int = 0) -> np.ndarray:
"""
Combine component value with input spectrum via addition or convolution.
This is the core method that defines how components are combined to
build up a complete spectrum. Components are processed in reverse order
(last to first) during model evaluation.
Parameters
----------
value : ndarray
Current spectrum being built up
comp : Component
Component to add/convolve with current spectrum
t_ind : int, default=0
Time index for evaluation (for time-dependent parameters)
Returns
-------
ndarray
Updated spectrum after combining with component
"""
# skip 'none' type components entirely (no-op)
if comp.comp_type == "none":
return value
# add a component to existing spectrum
if comp.comp_type == "add":
return cast("np.ndarray", value + np.asarray(comp.value(t_ind)))
# add a background component to exisiting spectrum
if comp.comp_type == "back":
return cast(
"np.ndarray", value + np.asarray(comp.value(t_ind, spectrum=value))
)
# convolute component with existing spectrum
if comp.comp_type == "conv":
if comp.package == fcts_energy:
x_axis = comp.energy
print("convolution of spectral components not defined")
elif comp.package == fcts_time:
x_axis = comp.time
else:
x_axis = None
#
if x_axis is None:
raise ValueError(
f"Convolution axis not defined for component '{comp.name}'"
)
return uarr.my_conv(x=x_axis, y=value, kernel=np.asarray(comp.value(t_ind)))
raise ValueError(f"Unknown component type: {comp.comp_type}")
#
[docs]
def create_value_1d(
self, t_ind: int = 0, *, store_1d: int = 0, return_1d: int = 0
) -> np.ndarray | None:
"""
Evaluate model to create 1D spectrum (energy or time).
Combines all components according to their types (addition, convolution,
background) to generate the complete model spectrum at a specific time
point (or for a Dynamics model, the complete time evolution).
Parameters
----------
t_ind : int, default=0
Time index for evaluation. For energy-resolved models, this selects
which time point to evaluate. For Dynamics models, affects which
parameters have time-dependence applied.
store_1d : int, default=0
If 1, store individual component spectra in self.component_spectra
for later plotting or analysis.
return_1d : int, default=0
If 1, return the computed spectrum. Otherwise return None and store
in self.value_1d only.
Returns
-------
ndarray or None
If return_1d=1, returns the 1D spectrum. Otherwise returns None.
Spectrum is always stored in self.value_1d regardless of return setting.
Notes
-----
**Component Combination Order:**
Components are combined in REVERSE order (last to first):
1. Initialize with last component
2. Combine with second-to-last, third-to-last, etc.
3. This allows backgrounds to access the sum of all peaks
**Stored Components:**
When store_1d=1, self.component_spectra contains individual contributions
in the ORIGINAL order (not reversed). This matches the order components
were defined and makes plotting intuitive.
**Performance:**
For 2D models, this function is called repeatedly (once per time point).
If components have no time-dependence, their evaluation could be cached,
but current implementation re-evaluates for simplicity and correctness
with convolution/background interactions.
"""
# re-initialize list containing individual component spectra
if store_1d == 1:
self.component_spectra = []
# initialize value_1d by evaluating last component
self.value_1d = self.components[-1].value(t_ind)
if store_1d == 1:
self.component_spectra.append(self.value_1d)
# combine the components into a spectrum/ time dynamics curve
for i in range(len(self.components) - 1):
if store_1d == 1:
current_spec = self.value_1d.copy()
#
self.value_1d = Model.combine(
self.value_1d, self.components[-(i + 2)], t_ind
)
# check on last component value added to model
if store_1d == 1:
self.component_spectra.append(self.value_1d - current_spec)
# flip component spectra list as components are combined LIFO in this function
if store_1d == 1:
self.component_spectra = self.component_spectra[::-1]
#
if return_1d == 1:
return self.value_1d
return None
#
[docs]
def create_value_2d(self, t_ind: list[int] | None = None) -> None:
"""
Evaluate model to create 2D spectrum (time × energy).
Generates the complete time- and energy-resolved spectrum by calling
create_value_1d() for each time point. This is where time-dependent
parameters dynamically modify the model at each time step.
Parameters
----------
t_ind : list of int or None, optional
Time index range to process:
- None (default): Process entire time axis
- [start, stop]: Process self.time[start:stop] only
Notes
-----
**Performance:**
Evaluation time scales linearly with:
- Number of time points (len(self.time))
- Number of energy points (len(self.energy))
- Model complexity (number of components, time-dependent parameters)
**Memory:**
Result stored in self.value_2d has shape (n_time, n_energy).
For 1000 time points × 500 energy points × 8 bytes/float:
~4 MB per model evaluation.
**Time-Dependence:**
For each time point t_i:
1. Time-dependent parameters evaluate their Dynamics at t_i
2. Model components use these parameter values
3. 1D spectrum computed and stored in value_2d[t_i, :]
"""
if self.time is None or self.energy is None:
raise ValueError("Model time and energy axes required for 2D evaluation")
time_slice = self.time if t_ind is None else self.time[t_ind[0] : t_ind[1]]
self.value_2d = np.empty((len(time_slice), len(self.energy)))
for ti, _t in enumerate(time_slice):
val = self.create_value_1d(t_ind=ti, return_1d=1)
if val is None:
raise RuntimeError("create_value_1d returned None during 2D eval")
self.value_2d[ti, :] = val
#
[docs]
def plot_1d(
self,
t_ind: int = 0,
*,
plot_sum: bool = True,
x_lim: tuple[float, float] | None = None,
y_lim: tuple[float, float] | None = None,
save_img: int = 0,
save_path: str = "",
config: PlotConfig | None = None,
**plot_kwargs,
) -> None:
"""
Plot 1D model spectrum (energy or time).
Visualizes model evaluation results, either as sum of all components
or as individual component contributions. Useful for checking initial
guesses and understanding fit results.
Parameters
----------
t_ind : int, default=0
Time index for energy-resolved models. Ignored for Dynamics models
which show time evolution.
plot_sum : bool, default=True
Component display mode:
- True: Plot only sum of all components
- False: Plot each component separately
x_lim : tuple of float, optional
X-axis display range (min, max) in axis coordinates
y_lim : tuple of float, optional
Y-axis display range (min, max)
save_img : int, default=0
Save/display control:
- 0: Display only
- 1: Display and save
- -1: Save only (no display)
save_path : str, default=''
Path for saving figure (if save_img != 0)
config : PlotConfig, optional
Override the model's inherited plot configuration for this call.
If None, uses the model's own plot_config.
**plot_kwargs : dict
Per-call overrides for any PlotConfig field (e.g. ``colors``,
``ticksize``, ``legend``). Applied on top of *config*.
"""
# Get model config for plotting
config = config or self.plot_config
# Model calling this method is a ...
# ... time-resolved model (mcp.Dynamics)
if isinstance(self, Dynamics):
x_dir = "def" # Always default for Dynamics
if self.time is None:
raise ValueError("Dynamics model time axis is not defined")
x = self.time
x_label = config.y_label # t_label from project
info = ""
# ... energy-resolved model
else:
x_dir = config.x_dir
if self.energy is None or self.time is None:
raise ValueError("Model energy/time axes are not defined")
x = self.energy
x_label = config.x_label # e_label from project
info = f"[{config.y_label}={round(self.time[t_ind], 3)} (index={t_ind})]"
# Populate component_spectra argument of the model
self.create_value_1d(t_ind, store_1d=1)
if plot_sum and self.value_1d is None:
raise RuntimeError("Model evaluation did not produce value_1d")
# Plot
plot_data = (
[cast("np.ndarray", self.value_1d)] if plot_sum else self.component_spectra
)
_call_kwargs: dict = {
"title": f'Model "{self.name}" {info}',
"x_label": x_label,
"y_label": config.z_label,
"x_dir": x_dir,
"x_lim": x_lim,
"y_lim": y_lim,
"legend": ["sum"] if plot_sum else [c.name for c in self.components],
"save_img": save_img,
"save_path": save_path,
}
_call_kwargs.update(plot_kwargs)
uplt.plot_1d(data=plot_data, x=x, config=config, **_call_kwargs)
#
[docs]
def plot_2d(
self,
save_img: int = 0,
save_path: str = "",
x_lim: tuple[float, float] | None = None,
y_lim: tuple[float, float] | None = None,
z_lim: tuple[float, float] | None = None,
config: PlotConfig | None = None,
**plot_kwargs,
) -> None:
"""
Plot 2D time-and-energy spectrum as heatmap.
Visualizes the complete 2D model evaluation showing how the spectrum
evolves over time. Essential for understanding time-dependent models
before and after fitting.
Parameters
----------
save_img : int, default=0
Save/display control:
- 0: Display only
- 1: Display and save
- -1: Save only (no display)
save_path : str, default=''
Path for saving figure (if save_img != 0)
x_lim : tuple of float, optional
Energy axis display range (min, max)
y_lim : tuple of float, optional
Time axis display range (min, max)
z_lim : tuple of float, optional
Color scale limits (min, max)
config : PlotConfig, optional
Override the model's inherited plot configuration for this call.
If None, uses the model's own plot_config.
**plot_kwargs : dict
Per-call overrides for any PlotConfig field (e.g. ``z_colormap``,
``ticksize``). Applied on top of *config*.
"""
if self.value_2d is None:
self.create_value_2d()
if self.value_2d is None or self.energy is None or self.time is None:
raise ValueError("Model value_2d, energy, and time required for plot_2d")
# Plot using the utility plot_2d
_call_kwargs: dict = {
"title": f'model "{self.name}"',
"x_lim": x_lim,
"y_lim": y_lim,
"z_lim": z_lim,
"save_img": save_img,
"save_path": save_path,
}
_call_kwargs.update(plot_kwargs)
uplt.plot_2d(
data=self.value_2d,
x=self.energy,
y=self.time,
config=config or self.plot_config,
**_call_kwargs,
)
#
#
[docs]
class Component:
"""
Individual spectral, temporal, or profile component with parameter management.
Component wraps a function from trspecfit.functions (energy, time, or
profile) into a model-ready object with parameter management, axis handling,
and integration within the Model/Component/Parameter hierarchy.
Parameters
----------
comp_name : str
Component name, possibly with numbering (e.g., 'GLP_01', 'expFun_02').
Numbering is typically assigned during YAML parsing.
package : module, optional
Python module containing the component function (fcts_energy,
fcts_time, or fcts_profile). Defaults to fcts_energy.
comp_subcycle : int, default=0
Subcycle number for multi-cycle Dynamics models:
- 0: Active for entire time axis (default)
- 1, 2, ...: Active only during specific subcycle
Attributes
----------
package : module
Module containing the function (fcts_energy, fcts_time, or fcts_profile)
comp_name : str
Full component name including any numbering
fct_str : str
Base function name without numbering (e.g., 'GLP', 'expFun')
N : int
Component number (e.g., 1, 2) or -1 if unnumbered
comp_type : str
Component type determining combination method:
- 'add': Regular addition (peaks, lineshapes)
- 'back': Background (requires existing spectrum)
- 'conv': Convolution kernel
- 'none': Placeholder (no operation)
par_dict : dict
Parameter specifications from YAML: {name: [value, vary, min, max]}
subcycle : int
Subcycle number for multi-cycle dynamics
time_n_sub : ndarray or None
Binary mask (1=active, 0=inactive) for subcycle timing
time_norm : ndarray or None
Normalized time axis (resets to 0 at each subcycle start)
pars : list of Par
Parameter objects for this component
lmfit_par_list : list of lmfit.Parameter
Flattened list of lmfit parameters
lmfit_pars : lmfit.Parameters
lmfit.Parameters object built from lmfit_par_list
time : ndarray or None
Time axis (inherited from model, or kernel axis for convolutions)
energy : ndarray or None
Energy axis (inherited from model)
aux_axis : ndarray or None
Auxiliary physical axis (inherited from model, used by Profile components)
parent_model : Model or None
Parent model reference (for parameter lookups)
Notes
-----
**Component Properties:**
The Component class provides several computed properties for accessing
function information:
- ``fct`` : callable - Function object (auto-updates if package or fct_str changes)
- ``fct_specs`` : inspect.FullArgSpec - Function signature information
- ``fct_args`` : list of str - Function argument names
- ``prefix`` : str - Prefix for parameter names ('' for exceptions,
comp_name+'_' otherwise)
- ``name`` : str - Component display name
- ``package_name`` : str - Name of the package module (e.g. 'fcts_energy')
"""
#
def __init__(
self,
comp_name: str,
package: types.ModuleType | None = None,
comp_subcycle: int = 0,
) -> None:
# package containing component (either fcts_energy or fcts_time)
if package is None:
package = fcts_energy
self.package: types.ModuleType = package
# name of the component (str)
self.comp_name: str = comp_name
# parse the component name into function string and component number
self.fct_str: str
self.num: int | None
self.fct_str, self.num = uparsing.parse_component_name(comp_name)
# determine component type: 'add', 'conv', 'back', or 'none'
if self.fct_str in background_functions():
self.comp_type: str = "back"
elif self.fct_str == "none": # placeholder function (see src/functions/time.py)
self.comp_type = "none"
elif "CONV" in self.fct_str or self.fct_str.endswith("CONV"):
self.comp_type = "conv"
else:
self.comp_type = "add"
# dict of par_name: par_info from yaml file passed by user
self.par_dict: dict[str, list] = {}
# (for self.package=fcts_time) which subcycle is this component part of
self.subcycle: int = (
comp_subcycle # see "t-dynamics.normalize_time" for details
)
self.time_n_sub: np.ndarray | None = (
None # (0/)1 where component is (in/)active
)
self.time_norm: np.ndarray | None = None # restarts at zero for every subcycle
# list of Par objects needed to construct component
self.pars: list[Par] = [] # used to create component value during fit
# flattened list of all lmfit parameters defining this component
self.lmfit_par_list: list[lmfit.Parameter] = []
self.lmfit_pars: lmfit.Parameters = lmfit.Parameters() # for describe() method
# time, energy, and aux axes are inherited from model
self.time: np.ndarray | None = None
self.energy: np.ndarray | None = None
self.aux_axis: np.ndarray | None = None
# parent model reference
self.parent_model: Model | None = None
#
def __repr__(self) -> str:
n = len(self.pars)
return f"Component('{self.comp_name}', type='{self.comp_type}', {n} pars)"
#
def __getstate__(self) -> dict[str, Any]:
"""Pickle protocol: strip module and back-refs.
``package`` is a live module reference (e.g.
``trspecfit.functions.energy``), which is not picklable. Replace
it with the dotted module name; ``__setstate__`` restores the
module via ``importlib.import_module``. ``parent_model`` is
nulled — pickled Components are detached from their parent Model.
"""
state = self.__dict__.copy()
state["package"] = self.package.__name__
state["parent_model"] = None
return state
#
def __setstate__(self, state: dict[str, Any]) -> None:
"""Pickle protocol: restore the ``package`` module from its name."""
package_name = state.pop("package")
self.__dict__.update(state)
self.package = importlib.import_module(package_name)
# [automatic] create self.fct attribute that will update if either
# self.package or self.fct_str changes [attribute is read only]
@property
def fct(self) -> Callable:
"""
Function object for this component.
Automatically retrieves function from package using fct_str.
Updates dynamically if package or fct_str changes.
Returns
-------
callable
Function object (e.g., fcts_energy.GLP, fcts_time.expFun)
"""
return cast("Callable", getattr(self.package, self.fct_str))
# [automatic] do the same for function argument specs
@property
def fct_specs(self) -> inspect.FullArgSpec:
"""
Function signature specifications.
Returns
-------
inspect.FullArgSpec
Complete function signature information
"""
return inspect.getfullargspec(self.fct)
# [automatic] and function arguments specifically
@property
def fct_args(self) -> list[str]:
"""
Function argument names.
Returns
-------
list of str
Argument names from function signature
"""
return self.fct_specs.args
# [automatic] create a prefix for parameters of this component
@property
def prefix(self) -> str:
"""
Prefix for parameter names.
Returns
-------
str
Parameter prefix:
comp_name + '_': For regular components
"""
# component number handled by self.num
return self.comp_name + "_"
# [automatic] create a name for this component
@property
def name(self) -> str:
"""
Component display name.
Returns
-------
str
Component name (same as comp_name)
"""
# use the stored component name (which includes numbering if applicable)
return self.comp_name
# [automatic] get name of package this component belongs to
@property
def package_name(self) -> str:
"""
Name of the package this component belongs to.
Returns
-------
str
Package name (e.g., 'fcts_energy', 'fcts_time')
"""
return getattr(self.package, "__name__", str(self.package))
#
def _add_prefix_to_expression(self, expr: str, prefix: str) -> str:
"""
Add prefix to time function parameter references in Dynamics models.
For Dynamics models, parameters of time functions need the Dynamics model
name as a prefix, while energy function parameters reference the parent
energy model directly without prefix.
Parameters
----------
expr : str
Expression string from YAML (e.g., "expFun_01_tau + GLP_01_A")
prefix : str
Dynamics model name to prepend (e.g., "GLP_01_x0")
Returns
-------
str
Expression with time function parameters prefixed
(e.g., "GLP_01_x0_expFun_01_tau + GLP_01_A")
Examples
--------
For Dynamics model "GLP_01_x0":
- "expFun_01_tau" -> "GLP_01_x0_expFun_01_tau" (time function)
- "GLP_01_A" -> "GLP_01_A" (energy function, unchanged)
- "expFun_01_tau * 0.5 + GLP_01_A" -> "GLP_01_x0_expFun_01_tau * 0.5 + GLP_01_A"
"""
# Get function names from both libraries
time_funcs = time_functions()
energy_funcs = energy_functions()
conv_funcs = convolution_functions()
# Pattern to match parameter references: function_name_NN_param_name
# Captures: (function_name)(_NN_param_name)
pattern = r"\b([A-Za-z_][A-Za-z0-9_]*?)(_\d{2,}_[A-Za-z_][A-Za-z0-9_]*)\b"
def _replace_with_prefix(match: re.Match[str]) -> str:
func_name = match.group(1) # e.g., "expFun" or "GLP"
rest = match.group(2) # e.g., "_01_tau"
full_match = match.group(0) # e.g., "expFun_01_tau"
# Defensive: Check if already prefixed
if full_match.startswith(prefix + "_"):
return full_match
# Time functions need prefix (they're in this Dynamics model)
if func_name in time_funcs or func_name in conv_funcs:
return f"{prefix}{func_name}{rest}"
# Energy functions don't need prefix (they reference parent energy model)
if func_name in energy_funcs:
return full_match
# Unknown function - leave unchanged and let lmfit error naturally
return full_match
return re.sub(pattern, _replace_with_prefix, expr)
#
[docs]
def add_pars(self, par_info_dict: dict[str, list]) -> None:
"""
Add parameter specifications to component.
Stores parameter information that will be used to create Par objects
when create_pars() is called. Typically populated from YAML model
definitions.
Parameters
----------
par_info_dict : dict
Parameter specifications: {name: [value, vary, min, max]} for
constrained or {name: [value], vary} for unconstrained parameters,
or {name: ['expression']} for dependent parameters
Notes
-----
Does not create the actual Par objects yet - that happens in
create_pars(). This separation allows parameters to be defined
before axes are known.
"""
self.par_dict = par_info_dict
#
[docs]
def create_pars(self, prefix: str = "") -> None:
"""
Create Par objects from parameter dictionary.
Populates self.pars with Par objects for each entry in self.par_dict.
Uses two-pass approach to handle expression parameters that may
reference parameters defined later in the component.
Parameters
----------
prefix : str, default=''
Prefix to prepend to parameter names (e.g., for Dynamics models)
Notes
-----
**Two-Pass Creation:**
1. First pass: Create all Par objects with values/bounds
2. Second pass: Set expressions (so forward references work)
This ensures expressions like 'GLP_02_A' work even when GLP_02
is defined after GLP_01 in the component list.
"""
lst = [] # initialize pars list
if len(prefix) > 0:
prefix += "_"
# First pass: create all Par objects, but do not set expressions
expr_params: list[tuple[Par, str]] = []
for p_name, p_info in self.par_dict.items():
temp = Par(name=prefix + self.prefix + p_name)
temp.info = p_info # see Par class for details
# Extract vary level from info before lmfit creation
if len(p_info) >= 2 and not isinstance(p_info[0], str):
temp.vary_level = ulmfit.vary_to_level(p_info[1])
# Set parent model reference
temp.parent_model = self.parent_model
# If this is an expression, skip setting it for now
if len(p_info) == 1 and isinstance(p_info[0], str):
expr = p_info[0]
# If this component is part of a Dynamics model,
# auto-prefix time function parameter references
if isinstance(self.parent_model, Dynamics) and prefix:
expr = self._add_prefix_to_expression(expr, prefix)
expr_params.append((temp, expr))
# Temporarily set a dummy value (needed for lmfit creation)
temp.create(expr_skip=True)
else:
temp.create()
lst.append(temp)
self.pars = lst
# Second pass: set expressions now that all parameters exist
for temp, expr in expr_params:
# Set the expression on the lmfit parameter
par_name = temp.name
try:
temp.lmfit_par[par_name].set(expr=expr)
except Exception as e: # noqa: BLE001
raise ValueError(
f"Failed to set expression '{expr}' for parameter '{par_name}': {e}"
) from e
#
[docs]
def update_lmfit_par_list(self) -> None:
"""
Update flattened list of lmfit parameters and lmfit.Parameters object.
Collects all lmfit.Parameter objects from all Par objects in this
component and stores them in a flat list. This includes both spectral
and temporal parameters (if any Par has time-dependence or a profile).
Notes
-----
Called automatically by Model.update(). Users typically don't need
to call this directly.
The flattened list is used by Model to construct the complete
lmfit.Parameters object for fitting.
"""
# re-initialize the list and lmfit.Parameters object
self.lmfit_par_list = []
self.lmfit_pars = lmfit.Parameters()
# go through all pars of this component ...
for p in self.pars:
# ... and add their list of all lmfit.Parameter objects
self.lmfit_par_list.extend(p.lmfit_par_list)
# update lmfit.Parameters object from the lmfit_par_list
self.lmfit_pars.add_many(*self.lmfit_par_list)
#
[docs]
def describe(self, detail: int = 1) -> None:
"""
Print component information.
Parameters
----------
detail : int, default=1
Detail level:
- 0: Function name only
- 1+: Function name, type, subcycle, and parameters
"""
# print info on function
print(f"function: {self.fct_str} from {self.package_name}")
# detailed description
if detail >= 1:
# addition or convolution?
if self.comp_type == "add":
comp_type_str = "added to other components"
elif self.comp_type == "conv":
comp_type_str = "convoluted with other components"
elif self.comp_type == "back":
comp_type_str = "added as background to other components"
elif self.comp_type == "none":
comp_type_str = "skipped (no operation)"
# subcycle info
if self.subcycle == 0:
subcycle_str = "for all times t"
else:
subcycle_str = f"within subcycle {self.subcycle}"
# print info
print(f"function will be {comp_type_str} [{subcycle_str}]\n")
print("all lmfit.Parameters() [flattened and sorted alphabetically]:")
if self.lmfit_pars:
self.lmfit_pars.pretty_print()
else:
print("lmfit.Parameters() object is empty")
print()
#
[docs]
def create_t_kernel(self) -> np.ndarray:
"""
Create time axis for convolution kernel.
Convolution kernels need a time axis that extends beyond the data
time axis to properly handle edge effects. This method creates an
appropriately sized kernel axis based on the kernel width.
Returns
-------
ndarray
Kernel time axis, centered at 0 and extending ±(width * kernel_parameter)
"""
# get kernel parameters i.e. component parameters
par_k = cast("list[Any]", ulmfit.par_extract(self.par_dict, return_type="list"))
# define kernel time axis. Kernel-width helpers may inspect the
# full parameter list for multi-parameter kernels such as Voigt.
kernel_width = getattr(fcts_time, self.fct_str + "_kernel_width")(*par_k)
t_range = par_k[0] * kernel_width
if self.time is None or len(self.time) < 2:
raise ValueError(f"time axis of component {self.fct_str} not defined")
t_step = self.time[1] - self.time[0]
return np.arange(-t_range, t_range + t_step, t_step)
#
[docs]
def value(self, t_ind: int = 0, **kwargs) -> np.ndarray:
"""
Evaluate component at specific time point.
Computes the component's contribution to the spectrum using current
parameter values. Handles time-dependent parameters, subcycle masking,
and passes appropriate axes.
Parameters
----------
t_ind : int, default=0
Time index for evaluation (affects time-dependent parameters)
**kwargs : dict
Additional arguments passed to function (e.g., spectrum for backgrounds)
Returns
-------
ndarray
Component value as function of energy or time
Notes
-----
**Parameter Evaluation:**
Each Par object is evaluated at t_ind, which:
- Returns current value for time-independent parameters
- Returns value(t_ind) for time-dependent parameters (via Dynamics)
**Subcycle Handling:**
For multi-cycle Dynamics models (subcycle != 0):
- Uses time_norm instead of time (resets to 0 each subcycle)
- Multiplies result by time_n_sub mask (1=active, 0=inactive)
**Background Functions:**
Background functions receive the 'spectrum' kwarg containing the
current peak sum, which they use to compute backgrounds like Shirley.
"""
# get component parameters as list
pars = []
for p in self.pars:
pars.append(p.value(t_ind, update_t_model=t_ind == 0))
# get x axis and create component function evaluation
if self.package == fcts_energy:
if self.energy is None:
raise ValueError(
f"Energy axis not defined for component '{self.comp_name}'"
)
# Profile averaging: if any parameter has p_vary or references
# a profiled parameter via expression, loop over aux_axis
p_vary_pars = [p for p in self.pars if p.p_vary or p.expr_refs_profile_dep]
if p_vary_pars:
traces = self._value_profile_instances(t_ind=t_ind, **kwargs)
avg: np.ndarray = np.sum(np.asarray(traces), axis=0) / len(traces)
return avg
return np.asarray(self.fct(self.energy, *pars, **kwargs))
if self.package == fcts_profile:
if self.aux_axis is None:
raise ValueError(
f"Auxiliary axis not defined for profile "
f"component '{self.comp_name}'"
)
return np.asarray(self.fct(self.aux_axis, *pars, **kwargs))
if self.package == fcts_time:
if self.time is None:
raise ValueError(
f"Time axis not defined for component '{self.comp_name}'"
)
if self.subcycle == 0: # single cycle
return np.asarray(self.fct(self.time, *pars, **kwargs))
# multi-cycle
# multpliy value with 1 where subcycle applies, 0 otherwise
# use normalized time instead of standard time for sub!=0]
if self.time_norm is None or self.time_n_sub is None:
raise ValueError(
f"Subcycle axes not defined for component '{self.comp_name}'"
)
return np.asarray(
self.fct(self.time_norm, *pars, **kwargs) * self.time_n_sub
)
raise ValueError(
f"Unsupported function package for component '{self.comp_name}'"
)
#
def _value_profile_instances(self, t_ind: int = 0, **kwargs) -> list[np.ndarray]:
"""
Evaluate one energy trace per aux-axis point for profile-varying parameters.
Parameters
----------
t_ind : int, default=0
Time index for evaluation
**kwargs : dict
Additional arguments passed to component function
Returns
-------
list of ndarray
One component trace per aux-axis index
"""
if self.energy is None:
raise ValueError(
f"Energy axis not defined for component '{self.comp_name}'"
)
p_vary_pars = [p for p in self.pars if p.p_vary]
profile_expr_pars = [p for p in self.pars if p.expr_refs_profile_dep]
if not p_vary_pars and not profile_expr_pars:
return [np.asarray(self.value(t_ind, **kwargs))]
# Get aux_axis length: prefer direct p_vary par, fall back to
# component's aux_axis (set for expr_refs_profile_dep components)
if p_vary_pars:
p_model_ref = p_vary_pars[0].p_model
if p_model_ref is None or p_model_ref.aux_axis is None:
raise ValueError(
f"Profile model not initialized for component "
f"'{self.comp_name}'. "
"Call model.add_profile() before evaluating."
)
n_aux = len(p_model_ref.aux_axis)
else:
if self.aux_axis is None:
raise ValueError(
f"aux_axis not set for component '{self.comp_name}'. "
"Cannot evaluate profile-dependent expressions."
)
n_aux = len(self.aux_axis)
values: list[np.ndarray] = []
for i in range(n_aux):
pars_i: list[Any] = []
for p in self.pars:
if p.p_vary and p.p_model is not None:
if i == 0:
# update profile once per time step (handles t_vary profiles)
p.p_model.create_value_1d(t_ind=t_ind)
if p.p_model.value_1d is None:
raise ValueError(f"Profile value_1d is None for par '{p.name}'")
base = cast("list[Any]", ulmfit.par_extract(p.lmfit_par))
pars_i.append(base[0] + p.p_model.value_1d[i])
else:
pars_i.append(
p.value(
t_ind,
update_t_model=(t_ind == 0 and i == 0),
aux_ind=i if p.expr_refs_profile_dep else None,
)
)
values.append(np.asarray(self.fct(self.energy, *pars_i, **kwargs)))
return values
#
[docs]
def plot(
self,
t_ind: int = 0,
*,
plot_traces: bool = True,
plot_every: int = 1,
plot_max: int | None = None,
save_img: int = 0,
config: PlotConfig | None = None,
plot_kwargs: dict | None = None,
**kwargs,
) -> None:
"""
Plot component as standalone spectrum/dynamics.
Quick visualization of individual component behavior, useful for
debugging component definitions and understanding parameter effects.
Parameters
----------
t_ind : int, default=0
Time index for evaluation
plot_traces : bool, default=True
For components with profile-varying parameter(s) (p_vary=True):
- True: plot one trace per aux-axis point
- False: plot single combined trace (average over aux-axis traces)
For all other components, a single trace is plotted.
plot_every : int, default=1
When plotting individual aux-axis traces (plot_traces=True), show
every N-th curve (N=1 means show all curves).
plot_max : int or None, default=None
Optional hard cap on number of individual aux-axis curves to plot.
First plot_max traces are shown (spaced according to plot_every).
save_img : int, default=0
0: display, 1: save+display, -1: save only, -2: close (no display/save)
config : PlotConfig, optional
Override the inherited plot configuration for this call.
If None, falls back to the parent model's plot_config.
plot_kwargs : dict, optional
Per-call overrides for any PlotConfig field (e.g. ``colors``,
``ticksize``). Applied on top of *config*.
**kwargs : dict
Additional arguments passed to component function.
Background components require ``spectrum`` to be provided.
"""
if plot_every < 1:
raise ValueError("plot_every must be >= 1")
if plot_max is not None and plot_max < 1:
raise ValueError("plot_max must be >= 1 when provided")
if self.fct_str in background_functions() and kwargs.get("spectrum") is None:
raise ValueError(
f"Background component '{self.comp_name}' requires keyword argument "
"'spectrum' for plotting. Call "
"component.plot(..., spectrum=<peak_sum_array>)."
)
# get x axis, label, and plot config
if config is None and self.parent_model is not None:
config = self.parent_model.plot_config
if self.package == fcts_energy:
x_axis = self.energy
x_name = config.x_label if config else "Energy"
elif self.package == fcts_time:
x_axis = self.time
x_name = config.y_label if config else "Time"
elif self.package == fcts_profile:
x_axis = self.aux_axis
x_name = "Auxiliary axis"
else:
x_axis = None
x_name = "x"
# For energy components with profile-varying parameters, optionally
# show all aux-axis contributions instead of only the combined value.
profile_dep = any(p.p_vary or p.expr_refs_profile_dep for p in self.pars)
if self.package == fcts_energy and profile_dep:
traces = self._value_profile_instances(t_ind=t_ind, **kwargs)
if plot_traces:
if x_axis is None:
raise ValueError(
f"Energy axis not defined for component '{self.comp_name}'"
)
p_vary_pars = [p for p in self.pars if p.p_vary]
if p_vary_pars and p_vary_pars[0].p_model is not None:
aux_axis = p_vary_pars[0].p_model.aux_axis
else:
aux_axis = self.aux_axis
if aux_axis is None:
raise ValueError(
f"Auxiliary axis not defined for component '{self.comp_name}'"
)
# Select curves to display (stride + optional hard cap).
indices = list(range(0, len(traces), plot_every))
if indices[-1] != len(traces) - 1:
indices.append(len(traces) - 1)
if plot_max is not None and len(indices) > plot_max:
indices = indices[:plot_max]
legend = [f"aux[{i}]={aux_axis[i]:g}" for i in indices]
plot_data = [traces[i] for i in indices]
else:
legend = ["avg(aux-axis instances)"]
plot_data = [np.mean(np.asarray(traces), axis=0)]
else:
plot_data = [self.value(t_ind, **kwargs)]
legend = [self.comp_name]
# Energy components inherit x_dir from config (e.g. reversed eV axis);
# time and profile components always use default (forward) direction.
x_dir = "def"
if self.package == fcts_energy and config is not None:
x_dir = config.x_dir
#
_call_kwargs: dict = {
"title": f"function: {self.fct_str} from {self.package_name}",
"x_label": x_name,
"x_dir": x_dir,
"y_label": "Amplitude",
"legend": legend,
"save_img": save_img,
}
_call_kwargs.update(plot_kwargs or {})
uplt.plot_1d(data=plot_data, config=config, x=x_axis, **_call_kwargs)
#
#
[docs]
class Par:
"""
Parameter with optional time-dependence, profile variation, and expression support.
Par extends lmfit.Parameter to support:
- Time-varying parameters via Dynamics models
- Profile-varying parameters via Profile models (auxiliary-axis averaging)
- Expression-based constraints referencing other parameters
- Tracking of time-dependent expression references
Parameters
----------
name : str
Parameter name (typically prefixed with component name)
info : list, default=[]
Parameter specification:
- [value, vary, min, max]: Standard parameter
- ['expression']: Expression-based parameter
Attributes
----------
name : str
Parameter name
info : list
Parameter specification from initialization
lmfit_par : lmfit.Parameters
lmfit Parameters object (contains 1+ parameters)
t_vary : bool
Whether parameter has time-dependence (via Dynamics)
t_model : Dynamics or None
Dynamics model describing time evolution (if t_vary=True)
p_vary : bool
Whether parameter varies over the auxiliary axis (via Profile)
p_model : Profile or None
Profile model describing variation over aux_axis (if p_vary=True)
lmfit_par_list : list
Flattened list of all lmfit parameters (spectral + temporal + profile)
expr_refs_time_dep : bool
Whether expression references time-dependent parameters
expr_string : str or None
Original expression string (for expression parameters)
expr_refs : list of str
Parameter names referenced in expression
parent_model : Model or None
Parent model reference (for finding other parameters)
Notes
-----
**Time-Dependence:**
When t_vary=True, the parameter value at time t is::
value(t) = base_value + dynamics_model.value_1d[t]
**Profile Variation:**
When p_vary=True, Component.value() evaluates the component at every
aux_axis point with the profile value added to the base parameter value,
then returns the uniform average (integration over auxiliary dimension).
**Expression Handling:**
Expressions are evaluated using asteval (same as lmfit) for safety.
Can reference other parameters, including time-dependent ones.
**Expression + Dynamics:**
A parameter can have an expression that references a time-dependent
parameter. In this case, expr_refs_time_dep=True and the expression
is re-evaluated at each time point during model evaluation.
**Parameter Flattening:**
lmfit_par_list contains all parameters defining this Par:
- Without time/profile-dependence: 1 parameter (the spectral one)
- With time-dependence: N parameters (spectral + all from Dynamics model)
- With profile variation: M parameters (spectral + all from Profile model)
- Both: N + M - 1 parameters (spectral counted once)
"""
#
def __init__(self, name: str, info: list[Any] | None = None) -> None:
self.name = name
self.info: list[Any] = [] if info is None else list(info)
self.vary_level: str = "static" # "project", "file", or "static"
self.lmfit_par: lmfit.Parameters = lmfit.Parameters()
self.t_vary: bool = False
self.t_model: Dynamics | None = None # set by add_dynamics()
self.p_vary: bool = False
self.p_model: Profile | None = None # set by add_profile()
self.lmfit_par_list: list[lmfit.Parameter] = []
# Expression analysis attributes
self.expr_refs_time_dep: bool = False # flag for time-dependent references
self.expr_refs_profile_dep: bool = False # flag for profile-dependent refs
self.expr_string: str | None = None # store original expression
self.expr_refs: list[str] = [] # list of referenced parameter names
self.parent_model: Model | None = None # reference to parent model
#
def __repr__(self) -> str:
flags = []
if self.t_vary:
flags.append("t_vary")
if self.p_vary:
flags.append("p_vary")
if self.expr_string:
flags.append(f"expr='{self.expr_string}'")
extra = f" [{', '.join(flags)}]" if flags else ""
return f"Par('{self.name}'{extra})"
#
def __getstate__(self) -> dict[str, Any]:
"""Pickle protocol: null the ``parent_model`` back-reference.
Pickled ``Par`` instances are detached from their parent Model;
anything that relied on ``parent_model`` (e.g. expression
resolution that needs the full parameter set) must re-attach
after unpickling.
"""
state = self.__dict__.copy()
state["parent_model"] = None
return state
#
[docs]
def describe(self, detail: int = 0) -> None:
"""
Print parameter information.
Parameters
----------
detail : int, default=0
Detail level for display (passed to t_model.describe if applicable)
"""
print(
f"par name: {self.name} [value: {self.value()}]"
" and its lmfit_par attribute:"
)
if isinstance(self.lmfit_par, lmfit.Parameters):
self.lmfit_par.pretty_print()
else:
print("[this is not an lmfit.Parameter instance]")
display(self.lmfit_par)
#
if not self.t_vary:
print("parameter has no time dependence")
elif self.t_vary and self.t_model is not None:
print(
f"parameter has time-dependence described by model {self.t_model.name}"
)
if detail == 1 and self.t_model is not None:
self.t_model.describe()
if not self.p_vary:
print("parameter has no profile (aux_axis) dependence")
else:
p_name = self.p_model.name if self.p_model is not None else "?"
print(f"parameter has profile described by model {p_name}")
if detail == 1 and self.p_model is not None:
self.p_model.describe()
print()
#
[docs]
def create(
self,
*,
prefix: str = "",
suffix: str = "",
expr_skip: bool = False,
) -> None:
"""
Create lmfit parameter from info specification.
Initializes the lmfit.Parameters object with the parameter defined
by self.info. Handles both standard parameters and expression-based
parameters.
Parameters
----------
prefix : str, default=''
Prefix to prepend to parameter name
suffix : str, default=''
Suffix to append to parameter name
expr_skip : bool, default=False
If True and info is expression, create with dummy value first
(expression set later in two-pass creation)
Notes
-----
For expression parameters, expr_skip=True creates a temporary
parameter with dummy value. The actual expression is set in a
second pass (see Component.create_pars) to handle forward references.
"""
# create standard lmfit parameter (spectral component)
if expr_skip and len(self.info) == 1 and isinstance(self.info[0], str):
# if skipping expression, use a dummy value for now
lmfit_par = ulmfit.par_create(
self.name, [1, True, -np.inf, np.inf], prefix, suffix
)
else:
lmfit_par = ulmfit.par_create(self.name, self.info, prefix, suffix)
# add to lmfit_par attribute
self.lmfit_par.add_many(lmfit_par)
# and list of individual lmfit paramters
self.lmfit_par_list.extend([lmfit_par])
#
[docs]
def update(self, t_model: "Dynamics") -> None:
"""
Add time-dependence to parameter via Dynamics model.
Converts a static parameter into a time-dependent one by attaching
a Dynamics model that describes temporal evolution.
Parameters
----------
t_model : Dynamics
Dynamics model describing time evolution
"""
# update t_vary (default = False)
self.t_vary = True
# update t_model attribute
self.t_model = t_model
# evaluate t_model to update/create model.value_1d
self.t_model.create_value_1d()
# add t_model pars to list of individual lmfit parameters
self.lmfit_par_list.extend(self.t_model.lmfit_par_list)
#
[docs]
def value(
self,
t_ind: int = 0,
*,
update_t_model: bool = True,
aux_ind: int | None = None,
) -> float:
"""
Get parameter value at specific time point and aux-axis index.
Returns the parameter value, accounting for time-dependence,
profile-dependence, and expressions that reference either.
Parameters
----------
t_ind : int, default=0
Time index for evaluation
update_t_model : bool, default=True
If True, recompute Dynamics model before evaluation.
Set False when calling repeatedly during 2D model evaluation.
aux_ind : int or None, default=None
Auxiliary axis index for profile evaluation. When set,
p_vary parameters return base + profile.value_1d[aux_ind],
and expressions referencing p_vary parameters are
re-evaluated with the profiled values.
Returns
-------
float
Parameter value at time point t_ind
"""
if not self.t_vary:
if self.expr_refs_time_dep or (
self.expr_refs_profile_dep and aux_ind is not None
):
all_parameters = self.get_all_parameters()
return self._evaluate_dynamic_expression(
t_ind,
all_parameters,
update_t_model=update_t_model,
aux_ind=aux_ind,
)
# Profile-varying parameter with aux_ind
if self.p_vary and aux_ind is not None and self.p_model is not None:
# Ensure profile is fresh for this t_ind (no-op if already
# evaluated via the owning component, cheap cache check).
self.p_model.create_value_1d(t_ind=t_ind)
base = cast("list[float]", ulmfit.par_extract(self.lmfit_par))
if self.p_model.value_1d is None:
raise RuntimeError(
f'Profile model "{self.p_model.name}" has no value_1d'
)
return float(base[0] + self.p_model.value_1d[aux_ind])
# Standard lmfit evaluation
value = cast("list[float]", ulmfit.par_extract(self.lmfit_par))[0]
elif self.t_vary and self.t_model is not None:
if update_t_model:
# update t_model, specifically self.t_model.value_1d
self.t_model.create_value_1d()
base = cast("list[float]", ulmfit.par_extract(self.lmfit_par))
if self.t_model.value_1d is None:
raise RuntimeError(
f'Dynamics model "{self.t_model.name}" has no value_1d'
)
value = float(base[0] + self.t_model.value_1d[t_ind])
else:
value = -1.0
print(f't_vary attribute of Par "{self.name}" is not valid')
return value
#
[docs]
def analyze_expression_dependencies(self, all_parameters: list["Par"]) -> None:
"""
Analyze expression for time-dependent parameter references.
Checks if this parameter's expression references any time-dependent
parameters. If so, sets expr_refs_time_dep flag so value() can
handle dynamic expression evaluation.
Called automatically after adding time-dependence to any parameter.
Parameters
----------
all_parameters : list of Par
All parameters in parent model (to check time-dependence)
"""
if len(self.info) == 1 and isinstance(self.info[0], str):
self.expr_string = self.info[0]
self.expr_refs = uparsing.extract_expression_parameters(self.expr_string)
# Reset before re-analysis (handles removed dependencies)
self.expr_refs_time_dep = False
self.expr_refs_profile_dep = False
# Check direct dependencies only (chains detected at model level)
for ref_name in self.expr_refs:
ref_par = self._find_parameter_by_name(ref_name, all_parameters)
if ref_par and ref_par.t_vary:
self.expr_refs_time_dep = True
if ref_par and ref_par.p_vary:
self.expr_refs_profile_dep = True
#
def _find_parameter_by_name(
self, par_name: str, all_parameters: list["Par"]
) -> "Par | None":
"""
Find parameter by name in list.
Parameters
----------
par_name : str
Parameter name to find
all_parameters : list of Par
List to search
Returns
-------
Par or None
Found parameter or None if not found
"""
# Search through all parameters
for par in all_parameters:
if par.name == par_name:
return par
return None
#
def _evaluate_dynamic_expression(
self,
t_ind: int,
all_parameters: list["Par"],
*,
update_t_model: bool = True,
aux_ind: int | None = None,
) -> float:
"""
Evaluate expression with time/profile-dependent parameter values.
For expressions that reference time-dependent or profile-varying
parameters, this evaluates the expression using the current values
of all referenced parameters at the specified time point and
aux-axis index.
Parameters
----------
t_ind : int
Time index for evaluation
all_parameters : list of Par
All parameters in model (to get current values)
update_t_model : bool, default=True
Whether to update Dynamics models before evaluation
aux_ind : int or None, default=None
Auxiliary axis index for profile evaluation
"""
namespace = {}
for ref_name in self.expr_refs:
ref_par = self._find_parameter_by_name(ref_name, all_parameters)
if ref_par:
namespace[ref_name] = ref_par.value(
t_ind, update_t_model=update_t_model, aux_ind=aux_ind
)
# Evaluate expression using asteval (safe, same as lmfit uses)
try:
aeval = Interpreter()
# populate the interpreter symbol table with current parameter values
# (Interpreter.__call__ doesn't accept a namespace argument)
for k, v in namespace.items():
aeval.symtable[k] = v
expr = cast("str", self.expr_string)
result = aeval(expr)
# HOTFIX: asteval may record errors and return None without raising.
if aeval.error:
msg = "; ".join(err.get_error()[1] for err in aeval.error)
raise ValueError(
f"Asteval error while evaluating expression "
f"'{self.expr_string}': {msg}"
)
if result is None:
raise ValueError(f"Expression '{self.expr_string}' evaluated to None")
return float(cast("float", result))
except Exception as e:
raise ValueError(
f"Error evaluating expression '{self.expr_string}': {e}"
) from e
#
[docs]
def get_all_parameters(self) -> list["Par"]:
"""
Get all parameters from parent model.
Returns
-------
list of Par
All parameters in parent model
"""
if hasattr(self, "parent_model") and self.parent_model:
return self.parent_model.get_all_parameters()
return []
#
#
[docs]
class Dynamics(Model):
"""
Time-dependence model for parameters with multi-cycle support.
Dynamics is a specialized Model that describes how a parameter evolves
over time. It uses temporal functions (from trspecfit.functions.time) to
model dynamics like exponential decays, rises, oscillations, and can
include convolution with instrumental response functions.
The name of a Dynamics model must match the parameter it describes
(e.g., 'GLP_01_x0' for the x0 parameter of the GLP_01 component).
Parameters
----------
model_name : str
Name matching the parameter to control (e.g., 'GLP_01_x0')
Attributes
----------
All Model attributes, plus:
frequency : float
Repetition frequency for cyclic dynamics (Hz):
- -1: Single cycle over entire time axis (default)
- >0: Dynamics repeat at this frequency
subcycles : int
Number of subcycles within each main cycle:
- 0: No subcycles (single dynamics for entire cycle)
- N>0: N different dynamics that activate sequentially
time_norm : ndarray or None
Normalized time that resets to 0 at start of each subcycle
n_sub : ndarray or None
Subcycle number active at each time point (1, 2, ..., subcycles)
n_counter : ndarray or None
Cumulative subcycle counter (increments each subcycle)
Notes
-----
**Single Cycle (frequency=-1):**
Dynamics apply once over the entire time axis. Appropriate for:
- Single pump-probe experiments
- Irreversible reactions
- One-time perturbations
**Multi-Cycle (frequency>0):**
Dynamics repeat periodically. Appropriate for:
- Lock-in detection experiments
- Periodic excitation (lasers, electrical pulses)
- Steady-state oscillations
**Subcycles:**
Within each main cycle, different dynamics can activate sequentially.
Example: pump-probe-pump experiments where:
- Subcycle 1: First pump response
- Subcycle 2: Second pump response
Components are assigned to subcycles via comp_subcycle parameter:
- subcycle=0: Active for all times (e.g., IRF convolution)
- subcycle=1,2,...: Active only during that subcycle
**Time Normalization:**
For multi-cycle dynamics:
- time_norm resets to 0 at each subcycle start
- n_sub tracks which subcycle is active (1, 2, 3, ...)
- n_counter cumulative count of subcycles
**Evaluation:**
The dynamics model evaluates to value_1d, which is added to the base
parameter value: param_total(t) = param_base + dynamics.value_1d[t]
"""
#
def __init__(self, model_name: str) -> None:
super().__init__(model_name)
# repetition frequency of time-dependent model behaviour
self.frequency: float = -1
# number of subcycles (within time = 1/frequency)
# "number of models -1" (model_info) in File.load_model()]
self.subcycles: int = 0
# "normalized time" attributes (all have same length as time axis)
self.time_norm: np.ndarray | None = None # restarts at 0 for every subcycle
self.n_sub: np.ndarray | None = None # active subcycle at time step (t_i)
self.n_counter: np.ndarray | None = None # cummulative counter of subcycles
self.parent_model: Model | None = None
#
[docs]
def set_frequency(self, frequency: float) -> None:
"""
Set repetition frequency and update time normalization.
Configures the Dynamics for cyclic behavior and updates all components
with proper subcycle timing information.
Parameters
----------
frequency : float
Repetition frequency in Hz:
- -1: No repetition (single cycle)
- >0: Repeat at this frequency
Raises
------
ValueError
If frequency > 0 and subcycles == 0 (single dynamics model).
Multi-cycle requires at least 2 entries in model_info.
Notes
-----
After setting frequency:
- time_norm, n_sub, n_counter are computed via normalize_time()
- Each component receives time_n_sub mask (1=active, 0=inactive)
- Components with subcycle>0 use time_norm instead of time
**model_info length and subcycle assignment:**
The number of entries in model_info determines the subcycle structure.
The first entry is always the global component (subcycle=0), which
evaluates on the raw time axis regardless of frequency. The remaining
entries are repeating subcycles.
- 1 entry: single dynamics, no frequency support (subcycles=0)
- 2 entries: e.g. ``["none", "MonoExp"]`` or ``["IRF", "MonoExp"]``.
The first model applies globally (IRF convolution or no-op
placeholder), the second repeats at the set frequency as a single
full-period cycle (subcycles=1).
- 3+ entries: e.g. ``["IRF", "ModelA", "ModelB"]``. First model is
global, the rest are subcycles that alternate within each period
(subcycles=len-1).
**Component Updates:**
For each component:
- time_n_sub mask applied (zeros where subcycle doesn't match)
- Normalized time axis inherited (if subcycle != 0)
"""
if frequency > 0 and self.subcycles == 0:
raise ValueError(
"Cannot set frequency on a single dynamics model (subcycles=0). "
"Multi-cycle requires at least 2 entries in model_info: the "
"first is the global component (e.g. IRF or 'none'), the rest "
"are repeating subcycles."
)
self.frequency = frequency
self.normalize_time() # update the normalization of the time axis
if self.n_sub is None:
raise RuntimeError("n_sub not initialized; call normalize_time() first")
# update components accordingly
for comp in self.components:
# <time_n_sub> is 0/1 where subcomponent is in-/active
# reset to all-active before applying mask (idempotent on re-call)
comp.time_n_sub = np.ones(len(self.n_sub))
comp.time_n_sub[self.n_sub != comp.subcycle] = 0
# inherit normalized time from Dynamics model
if comp.subcycle != 0:
comp.time_norm = self.time_norm
# no updating needed on the parameter level as time irrelevant
# parameter level uses index to refer to par.t_model=Dynamics
#
[docs]
def normalize_time(self, time_unit: int = 0, *, show_plot: bool = False) -> None:
"""
Normalize time axis for multi-cycle dynamics with subcycles.
Creates normalized time arrays that reset periodically based on
frequency and subcycle count. This enables complex multi-cycle
dynamics like pump-probe-pump experiments.
Parameters
----------
time_unit : int, default=0
Power of 10 for time units (currently unused)
show_plot : bool, default=False
If True, plot normalized time arrays
Examples
--------
>>> t_model = Dynamics('param')
>>> t_model.time = np.array([0, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3])
>>> t_model.frequency = 10 # 10 Hz = 0.1 s period
>>> t_model.subcycles = 2 # Two subcycles per period
>>> t_model.normalize_time()
>>>
>>> print(t_model.time_norm) # Resets every 0.05 s (half period)
[0, 0.05, 0, 0.05, 0, 0.05, 0]
>>> print(t_model.n_sub) # Which subcycle (1 or 2)
[1, 1, 2, 2, 1, 1, 2]
>>> print(t_model.n_counter) # Cumulative count
[1, 1, 2, 2, 3, 3, 4]
Notes
-----
**Normalization Logic:**
- Subcycle duration = 1 / (frequency * subcycles)
- time_norm resets to 0 at start of each subcycle
- n_sub cycles through 1, 2, ..., subcycles
- n_counter increments by 1 each subcycle
**Negative Times:**
Times t < 0 are assigned:
- time_norm = 0
- n_sub = 0 (baseline/pre-trigger)
- n_counter = 0
**No Repetition (frequency=-1):**
- time_norm = time (unchanged)
- n_sub = 0 (all zeros)
- n_counter = 0 (all zeros)
**Validation:**
Raises ValueError for:
- frequency < 0 and != -1
Note: subcycles=0 with frequency > 0 is rejected upstream in
set_frequency().
- subcycles > 1 with frequency = -1 (inconsistent)
Note: subcycles=1 is rejected at model load time (File.load_model).
"""
if self.time is None:
raise ValueError("Dynamics.time axis must be defined")
if self.frequency < 0 and self.frequency != -1:
raise ValueError(
f'Frequency (f) must be >0 (or "-1" for no repetition). '
f"Got: {self.frequency}"
)
# No repetition within data/time window
if self.frequency == -1:
if self.subcycles > 1:
raise ValueError(
"Cannot use subcycles (N > 1) without a positive frequency. "
f"Got subcycles={self.subcycles} with frequency=-1"
)
self.time_norm = np.asarray(self.time)
self.n_sub = np.zeros(len(self.time))
self.n_counter = np.zeros(len(self.time))
# Frequency >0 is passed
else:
# Compute repetition/normalization number
norm = 10 ** (-time_unit) / self.frequency / self.subcycles
t = np.asarray(self.time)
n_temp = np.floor(t / norm).astype(int)
mask = t >= 0 # Subcycles start at t=0
self.time_norm = np.where(mask, t - n_temp * norm, 0.0)
self.n_sub = np.where(mask, np.floor(n_temp % self.subcycles) + 1, 0.0)
self.n_counter = np.where(mask, n_temp + 1, 0.0)
if show_plot:
legends = ["normalized time", "subcycle counter", "cummulative counter"]
uplt.plot_1d(
data=[self.time_norm, self.n_sub, self.n_counter],
x=self.time,
x_label=f"Time (1E{time_unit}s)",
y_type="log",
legend=legends,
)
#
#
[docs]
class Profile(Model):
"""
Profile model: parameter variation over an auxiliary physical axis.
A Profile model describes how one or more spectral parameters vary across
an auxiliary axis (e.g. depth, position, fluence). When a parameter has
``p_vary=True``, Component.value() evaluates the component at every point
along aux_axis and returns the uniform average — physically representing
integration over the auxiliary dimension.
Analogous to ``Dynamics`` (which varies parameters over the time axis),
but simpler: no frequency or subcycles.
Parameters
----------
model_name : str
Name of this Profile model. Must match the name of the parameter in
the parent model that it describes (same convention as Dynamics).
Attributes
----------
parent_model : Model or None
Reference to the parent energy/2D model. Set by Model.add_profile().
aux_axis : ndarray or None
The auxiliary axis this profile is evaluated over. Inherited from
the parent model when add_profile() is called.
Notes
-----
Profile model components use functions from ``fcts_profile`` and are
evaluated with ``aux_axis`` as their x-axis.
Multiple p_vary parameters on the same component share the same aux_axis
and are evaluated jointly at each aux point (correlated variation).
Profile model parameters can themselves be time-dependent (``t_vary=True``),
enabling spectral diffusion: inhomogeneous broadening that evolves in time.
"""
def __init__(self, model_name: str) -> None:
super().__init__(model_name)
self.parent_model: Model | None = None