"""Define Parameter which holds the value."""
# Copyright (c) 2023 zfit
from __future__ import annotations
import abc
import collections
import copy
import functools
import warnings
from collections.abc import Iterable, Callable
from contextlib import suppress
from inspect import signature
from typing import Dict, Literal, Mapping, Optional, Union
from weakref import WeakSet, WeakValueDictionary
import dill
import numpy as np
import pydantic
import tensorflow as tf
import tensorflow_probability as tfp
# TF backwards compatibility
from ordered_set import OrderedSet
from pydantic import Field, validator
from tensorflow.python.framework import ops
from tensorflow.python.ops import array_ops
from tensorflow.python.ops.resource_variable_ops import (
ResourceVariable as TFVariable,
VariableSpec,
)
from tensorflow.python.ops.variables import Variable
from tensorflow.python.types.core import Tensor as TensorType
from .serialmixin import SerializableMixin
from .. import z
from ..serialization.paramrepr import make_param_constructor
from ..serialization.serializer import BaseRepr, Serializer
import zfit.z.numpy as znp
from ..core.baseobject import BaseNumeric, extract_filter_params
from ..minimizers.interface import ZfitResult
from ..settings import run, ztypes
from ..util import ztyping
from ..util.cache import invalidate_graph
from ..util.checks import NotSpecified
from ..util.container import convert_to_container
from ..util.deprecation import deprecated, deprecated_args
from ..util.exception import (
BreakingAPIChangeError,
FunctionNotImplemented,
IllegalInGraphModeError,
LogicalUndefinedOperationError,
NameAlreadyTakenError,
ParameterNotIndependentError,
)
from ..util.temporary import TemporarilySet
from . import interfaces as zinterfaces
from .dependents import _extract_dependencies
from .interfaces import ZfitIndependentParameter, ZfitModel, ZfitParameter
# TODO: add type hints in this module for the API
class MetaBaseParameter(
type(tf.Variable), type(zinterfaces.ZfitParameter)
): # resolve metaclasses
pass
def register_tensor_conversion(
    convertable, name=None, overload_operators=True, priority=10
):  # priority higher than any TF conversion
    """Register `convertable` with TF so that its instances are implicitly converted to tensors."""
def _dense_var_to_tensor(var, dtype=None, name=None, as_ref=False):
return var._dense_var_to_tensor(dtype=dtype, name=name, as_ref=as_ref)
tf.register_tensor_conversion_function(
convertable, _dense_var_to_tensor, priority=priority
)
    if name:
        # registration via _pywrap_utils.RegisterType(name, convertable) is no longer performed
        pass
if overload_operators:
convertable._OverloadAllOperators()
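# Illustrative sketch (hypothetical `MyVarLike` class, not part of zfit): any class that
# provides a `_dense_var_to_tensor` method (and, if `overload_operators` is True, an
# `_OverloadAllOperators` classmethod, e.g. by inheriting from `OverloadableMixin` below)
# can be registered; afterwards, TF ops accept its instances directly:
#
#     register_tensor_conversion(MyVarLike, name="MyVarLike")
#     tf.add(my_var_like, 1.0)  # implicitly converted via `_dense_var_to_tensor`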
class OverloadableMixin(ZfitParameter):
# Conversion to tensor.
@staticmethod
def _TensorConversionFunction(
v, dtype=None, name=None, as_ref=False
): # pylint: disable=invalid-name
"""Utility function for converting a Variable to a Tensor."""
_ = name
if dtype and not dtype.is_compatible_with(v.dtype):
raise ValueError(
"Incompatible type conversion requested to type '%s' for variable "
"of type '%s'" % (dtype.name, v.dtype.name)
)
if as_ref:
return v._ref() # pylint: disable=protected-access
else:
return v.value()
def _dense_var_to_tensor(self, dtype=None, name=None, as_ref=False):
del name
if dtype and not dtype.is_compatible_with(self.dtype):
raise ValueError(
"Incompatible type conversion requested to type '%s' for variable "
"of type '%s'" % (dtype.name, self.dtype.name)
)
if as_ref:
if hasattr(self, "_ref"):
return self._ref()
else:
                raise RuntimeError(
                    "Conversion with `as_ref=True` requested, but this object has no `_ref` method."
                )
else:
return self.value()
def _AsTensor(self):
return self.value()
@classmethod
def _OverloadAllOperators(cls): # pylint: disable=invalid-name
"""Register overloads for all operators."""
for operator in tf.Tensor.OVERLOADABLE_OPERATORS:
cls._OverloadOperator(operator)
# For slicing, bind getitem differently than a tensor (use SliceHelperVar
# instead)
# pylint: disable=protected-access
setattr(cls, "__getitem__", array_ops._SliceHelperVar)
@classmethod
def _OverloadOperator(cls, operator): # pylint: disable=invalid-name
"""Defer an operator overload to ``ops.Tensor``.
We pull the operator out of ops.Tensor dynamically to avoid ordering issues.
Args:
operator: string. The operator name.
"""
# We can't use the overload mechanism on __eq__ & __ne__ since __eq__ is
# called when adding a variable to sets. As a result we call a.value() which
# causes infinite recursion when operating within a GradientTape
# TODO(gjn): Consider removing this
if operator == "__eq__" or operator == "__ne__":
return
tensor_oper = getattr(tf.Tensor, operator)
def _run_op(a, *args, **kwargs):
# pylint: disable=protected-access
return tensor_oper(a.value(), *args, **kwargs)
functools.update_wrapper(_run_op, tensor_oper)
setattr(cls, operator, _run_op)
register_tensor_conversion(OverloadableMixin, overload_operators=True)
class WrappedVariable(metaclass=MetaBaseParameter):
def __init__(self, initial_value, constraint, *args, **kwargs):
super().__init__(*args, **kwargs)
self.variable = tf.Variable(
initial_value=initial_value,
constraint=constraint,
name=self.name,
dtype=self.dtype,
)
@property
@abc.abstractmethod
def name(self):
raise NotImplementedError
@property
def constraint(self):
return self.variable.constraint
@property
def dtype(self):
return self.variable.dtype
def value(self):
return self.variable.value()
    def read_value(self):
return self.variable.read_value()
@property
def shape(self):
return self.variable.shape
def numpy(self):
return self.variable.numpy()
def assign(self, value, use_locking=False, name=None, read_value=True):
return self.variable.assign(
value=value, use_locking=use_locking, name=name, read_value=read_value
)
def _dense_var_to_tensor(self, dtype=None, name=None, as_ref=False):
del name
if dtype is not None and dtype != self.dtype:
return NotImplemented
if as_ref:
return self.variable.read_value().op.inputs[0]
else:
return self.variable.value()
def _AsTensor(self):
return self.variable.value()
@staticmethod
def _OverloadAllOperators(): # pylint: disable=invalid-name
"""Register overloads for all operators."""
for operator in tf.Tensor.OVERLOADABLE_OPERATORS:
WrappedVariable._OverloadOperator(operator)
# For slicing, bind getitem differently than a tensor (use SliceHelperVar
# instead)
# pylint: disable=protected-access
setattr(WrappedVariable, "__getitem__", array_ops._SliceHelperVar)
@staticmethod
def _OverloadOperator(operator): # pylint: disable=invalid-name
"""Defer an operator overload to ``ops.Tensor``.
We pull the operator out of ops.Tensor dynamically to avoid ordering issues.
Args:
operator: string. The operator name.
"""
tensor_oper = getattr(tf.Tensor, operator)
def _run_op(a, *args):
# pylint: disable=protected-access
value = a._AsTensor()
return tensor_oper(value, *args)
# Propagate __doc__ to wrapper
try:
_run_op.__doc__ = tensor_oper.__doc__
except AttributeError:
pass
setattr(WrappedVariable, operator, _run_op)
register_tensor_conversion(WrappedVariable, "WrappedVariable", overload_operators=True)
class BaseParameter(Variable, ZfitParameter, TensorType, metaclass=MetaBaseParameter):
def __init__(self, *args, **kwargs):
try:
super().__init__(*args, **kwargs)
except NotImplementedError:
tmp_val = kwargs.pop(
"name", None
            )  # remove the name if present; it needs to be passed through
if args or kwargs:
kwargs["name"] = tmp_val
raise RuntimeError(
f"The following arguments reached the top of the inheritance tree, the super "
f"init is not implemented (most probably abstract tf.Variable): {args, kwargs}. "
f"If you see this error, please post it as an bug at: "
f"https://github.com/zfit/zfit/issues/new/choose"
)
def __len__(self):
return 1
class ZfitParameterMixin(BaseNumeric):
_existing_params = WeakValueDictionary()
def __init__(self, name, **kwargs):
if name in self._existing_params:
            raise NameAlreadyTakenError(
                f"Another parameter is already named {name}. Use a different, unique one."
            )
self._existing_params.update({name: self})
self._name = name
super().__init__(name=name, **kwargs)
# property needed here to overwrite the name of tf.Variable
@property
def name(self) -> str:
return self._name
def __del__(self):
with suppress(
AttributeError, NotImplementedError
): # if super does not have a __del__
super().__del__(self)
def __add__(self, other):
if isinstance(other, (ZfitModel, ZfitParameter)):
from . import operations
with suppress(FunctionNotImplemented):
return operations.add(self, other)
return super().__add__(other)
def __radd__(self, other):
if isinstance(other, (ZfitModel, ZfitParameter)):
from . import operations
with suppress(FunctionNotImplemented):
return operations.add(other, self)
return super().__radd__(other)
def __mul__(self, other):
if isinstance(other, (ZfitModel, ZfitParameter)):
from . import operations
with suppress(FunctionNotImplemented):
return operations.multiply(self, other)
return super().__mul__(other)
def __rmul__(self, other):
if isinstance(other, (ZfitModel, ZfitParameter)):
from . import operations
with suppress(FunctionNotImplemented):
return operations.multiply(other, self)
return super().__rmul__(other)
def __eq__(self, other):
return id(self) == id(other)
    def __hash__(self):
        if not hasattr(self, "_cached_hash"):
            self._cached_hash = hash(id(self))
        return self._cached_hash
class TFBaseVariable(TFVariable, metaclass=MetaBaseParameter):
    # Needed, otherwise tf.Variable complains that the name does not contain a ':'
@property
def _shared_name(self):
return self.name
class Parameter(
ZfitParameterMixin,
TFBaseVariable,
BaseParameter,
SerializableMixin,
ZfitIndependentParameter,
):
"""Class for fit parameters, derived from TF Variable class."""
_independent = True
_independent_params = WeakSet()
DEFAULT_STEP_SIZE = 0.001
@deprecated_args(None, "Use `lower` instead.", "lower_limit")
@deprecated_args(None, "Use `upper` instead.", "upper_limit")
def __init__(
self,
name: str,
value: ztyping.NumericalScalarType,
lower: ztyping.NumericalScalarType | None = None,
upper: ztyping.NumericalScalarType | None = None,
step_size: ztyping.NumericalScalarType | None = None,
floating: bool = True,
dtype: tf.DType = ztypes.float,
# legacy
lower_limit: ztyping.NumericalScalarType | None = None,
upper_limit: ztyping.NumericalScalarType | None = None,
):
"""
Args:
name : name of the parameter
value : starting value
lower : lower limit
upper : upper limit
step_size : step size
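
        Examples:
            An illustrative sketch (names and values are arbitrary):

            .. jupyter-execute::

                import zfit

                mu = zfit.Parameter("mu", 2.4, -1.0, 5.0)  # name, value, lower, upper
                mu.set_value(3.0)  # raises if the value were outside the limits
                mu.floating = False  # fix the parameter, e.g. to keep it constant in a fit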
"""
self._independent_params.add(self)
# legacy start
if lower_limit is not None:
lower = lower_limit
if upper_limit is not None:
upper = upper_limit
# legacy end
# TODO: sanitize input for TF2
self._lower_limit_neg_inf = None
self._upper_limit_neg_inf = None
if lower is None:
self._lower_limit_neg_inf = tf.cast(-np.infty, dtype)
if upper is None:
self._upper_limit_neg_inf = tf.cast(np.infty, dtype)
value = tf.cast(value, dtype=ztypes.float)
def constraint(x):
return tfp.math.clip_by_value_preserve_gradient(
x, clip_value_min=self.lower, clip_value_max=self.upper
)
super().__init__(
initial_value=value,
dtype=dtype,
name=name,
constraint=constraint,
params={},
)
self.lower = lower
self.upper = upper
self.floating = floating
self.step_size = step_size
self.set_value(value) # to check that it is in the limits
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
cls._independent = True # overwriting independent only for subclass/instance
@classmethod
def _from_name(cls, name):
for param in cls._independent_params:
if name == param.name:
return param
raise ValueError(f"Parameter {name} does not exist, please create it first.")
@property
def lower(self):
limit = self._lower
if limit is None:
limit = self._lower_limit_neg_inf
return limit
@lower.setter
@invalidate_graph
def lower(self, value):
if value is None and self._lower_limit_neg_inf is None:
self._lower_limit_neg_inf = tf.cast(-np.infty, dtype=ztypes.float)
elif value is not None:
value = tf.cast(value, dtype=ztypes.float)
self._lower = value
@property
def upper(self):
limit = self._upper
if limit is None:
limit = self._upper_limit_neg_inf
return limit
@upper.setter
@invalidate_graph
def upper(self, value):
if value is None and self._upper_limit_neg_inf is None:
self._upper_limit_neg_inf = tf.cast(np.infty, dtype=ztypes.float)
elif value is not None:
value = tf.cast(value, dtype=ztypes.float)
self._upper = value
@property
def has_limits(self) -> bool:
"""If the parameter has limits set or not."""
no_limits = self._lower is None and self._upper is None
return not no_limits
@property
def at_limit(self) -> tf.Tensor:
"""If the value is at the limit (or over it).
Returns:
Boolean ``tf.Tensor`` that tells whether the value is at the limits.
"""
return self._check_at_limit(self.value())
def _check_at_limit(self, value, exact=False):
"""The precision is up to 1e-5 relative or 1e-8 absolute if exact is None.
Args:
value ():
exact ():
Returns:
"""
if not self.has_limits:
return tf.constant(False)
# Adding a slight tolerance to make sure we're not tricked by numerics due to floating point comparison
diff = znp.abs(self.upper - self.lower) # catch if it is minus inf
if not exact:
reltol = 0.005
abstol = 1e-5
else:
reltol = 1e-5
abstol = 1e-7
        tol = znp.minimum(diff * reltol, abstol)  # cap it; if one limit is inf, diff * reltol is inf
        if not exact:  # if exact, we want to allow setting the value slightly over the limit;
            tol = -tol  # if not, we want to make sure it is well inside
at_lower = z.unstable.less_equal(value, self.lower - tol)
at_upper = z.unstable.greater_equal(value, self.upper + tol)
return z.unstable.logical_or(at_lower, at_upper)
def value(self):
value = super().value()
if self.has_limits:
value = self.constraint(value)
return value
@deprecated(None, "Use `value` instead.")
def read_value(self):
value = super().read_value()
if self.has_limits:
value = self.constraint(value)
return value
@property
def floating(self):
if self._floating and (hasattr(self, "trainable") and not self.trainable):
raise RuntimeError(
"Floating is set to true but tf Variable is not trainable."
)
return self._floating
@floating.setter
def floating(self, value):
if not isinstance(value, bool):
raise TypeError("floating has to be a boolean.")
self._floating = value
def _get_dependencies(self):
return {self}
@property
def independent(self):
return self._independent
@property
def has_step_size(self):
return self._step_size is not None
@property
def step_size(self) -> tf.Tensor: # TODO: improve default step_size?
"""Step size of the parameter, the estimated order of magnitude of the uncertainty.
This can be crucial to tune for the minimization. A too large ``step_size`` can produce NaNs, a too small won't
converge.
If the step size is not set, the ``DEFAULT_STEP_SIZE`` is used.
Returns:
The step size
"""
        step_size = self._step_size
        if step_size is None:
            # TODO: improve the default, e.g. auto-infer from the limits?
            step_size = self.DEFAULT_STEP_SIZE
        return step_size
@step_size.setter
def step_size(self, value):
if value is not None:
value = float(value)
self._step_size = value
def set_value(self, value: ztyping.NumericalScalarType):
"""Set the :py:class:`~zfit.Parameter` to `value` (temporarily if used in a context manager).
        Compared to ``assign``, this operation does not return the read value but an object that *can*
        act as a context manager.
Args:
value: The value the parameter will take on.
Raises:
ValueError: If the value is not inside the limits (in normal Python/eager mode)
InvalidArgumentError: If the value is not inside the limits (in JIT/traced/graph mode)
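
        Examples:
            An illustrative sketch:

            .. jupyter-execute::

                import zfit

                param = zfit.Parameter("param_sv", 1.0, 0.0, 10.0)
                param.set_value(2.0)  # plain assignment
                with param.set_value(5.0):
                    print(param.value())  # 5 inside the context
                print(param.value())  # 2, restored on exit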
"""
def getter():
return self.value()
def setter(value):
if self.has_limits:
                message = (
                    f"Setting value {value} invalid for parameter {self.name} with limits "
                    f"{self.lower} - {self.upper}."
                    f" In order to silence this error and clip the value to the limits, you can use"
                    f" (with caution, advanced) `Parameter.assign`."
                )
if run.executing_eagerly():
if self._check_at_limit(value, exact=True):
raise ValueError(message)
else:
tf.debugging.assert_greater(
tf.cast(value, tf.float64),
tf.cast(self.lower, tf.float64),
message=message,
)
tf.debugging.assert_less(
tf.cast(value, tf.float64),
tf.cast(self.upper, tf.float64),
message=message,
)
# tf.debugging.Assert(self._check_at_limit(value), [value])
self.assign(value=value, read_value=False)
return TemporarilySet(value=value, setter=setter, getter=getter)
def assign(self, value, use_locking=None, name=None, read_value=False):
"""Set the :py:class:`~zfit.Parameter` to `value` without any checks.
Compared to ``set_value``, this method cannot be used with a context manager and won't raise an
error
Args:
value: The value the parameter will take on.
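
        Examples:
            An illustrative sketch; no error is raised even outside the limits (reading the value
            afterwards applies the limit constraint, see ``value``):

            .. jupyter-execute::

                import zfit

                param = zfit.Parameter("param_assign", 1.0, 0.0, 2.0)
                param.assign(5.0)  # silently accepted although 5.0 is above the upper limit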
"""
return super().assign(
value=value, use_locking=use_locking, name=name, read_value=read_value
)
def randomize(
self,
minval: ztyping.NumericalScalarType | None = None,
maxval: ztyping.NumericalScalarType | None = None,
sampler: Callable = np.random.uniform,
) -> tf.Tensor:
"""Update the parameter with a randomised value between minval and maxval and return it.
Args:
minval: The lower bound of the sampler. If not given, ``lower_limit`` is used.
maxval: The upper bound of the sampler. If not given, ``upper_limit`` is used.
sampler: A sampler with the same interface as ``np.random.uniform``
Returns:
The sampled value
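
        Examples:
            An illustrative sketch, sampling within the limits of the parameter:

            .. jupyter-execute::

                import zfit

                param = zfit.Parameter("param_rnd", 1.0, 0.0, 2.0)
                value = param.randomize()  # uniformly sampled between lower and upper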
"""
        if not tf.executing_eagerly():
            raise IllegalInGraphModeError(
                "Randomizing the value of a parameter within graph mode is most probably not"
                " what is intended."
            )
if minval is None:
minval = self.lower
else:
minval = tf.cast(minval, dtype=self.dtype)
if maxval is None:
maxval = self.upper
else:
maxval = tf.cast(maxval, dtype=self.dtype)
        if maxval is None or minval is None:
            raise RuntimeError(
                "Cannot randomize a parameter without limits if no minval/maxval is given."
            )
value = sampler(size=self.shape, low=minval, high=maxval)
self.set_value(value=value)
return value
def get_params(
self,
floating: bool | None = True,
is_yield: bool | None = None,
extract_independent: bool | None = True,
only_floating=NotSpecified,
) -> set[ZfitParameter]:
return extract_filter_params(self, floating=floating, extract_independent=False)
    def __repr__(self):
        if tf.executing_eagerly():  # the value can only be read directly in eager mode
value = f"{self.numpy():.4g}"
else:
value = "graph-node"
return f"<zfit.{self.__class__.__name__} '{self.name}' floating={self.floating} value={value}>"
# LEGACY, deprecate?
@property
def lower_limit(self):
return self.lower
@lower_limit.setter
def lower_limit(self, value):
self.lower = value
@property
def upper_limit(self):
return self.upper
@upper_limit.setter
def upper_limit(self, value):
self.upper = value
def __tf_tracing_type__(self, signature_context):
return ParameterType(parameter=self)
class ParameterType(VariableSpec):
value_type = property(lambda self: Parameter)
def __init__(
self, shape=None, dtype=None, trainable=True, alias_id=None, *, parameter=None
):
        if parameter is None:
            raise ValueError("A `parameter` has to be provided to create a ParameterType.")
        # initialize from the parameter
        shape = parameter.shape
        dtype = parameter.dtype
        trainable = True
        alias_id = None
        self.parameter_value = parameter
        self._name = parameter.name
self.parameter_type = type(self)
if dtype is None:
dtype = tf.float64
super().__init__(
shape=shape, dtype=dtype, trainable=trainable, alias_id=alias_id
)
self.hash = hash(self.name)
@classmethod
def from_value(cls, value):
return cls(parameter=value)
def _to_components(self, value):
return super()._to_components(value)
def _from_components(self, components):
_ = super()._from_components(components) # checking that there is no error
return Parameter._from_name(self.name)
def _to_tensors(self, value):
return [value]
def is_subtype_of(self, other):
return (
type(other) is ParameterType
and self.parameter_type is other.parameter_type
and self.name == other.name
)
def most_specific_common_supertype(self, others):
return self if all(self == other for other in others) else None
def placeholder_value(self, placeholder_context=None):
return self.parameter_value
def __eq__(self, other) -> bool:
return self.parameter_type == type(other) and self.name == other.name
def __hash__(self):
return self.hash
class ParameterRepr(BaseRepr):
_implementation = Parameter
_constructor = pydantic.PrivateAttr(make_param_constructor(Parameter))
hs3_type: Literal["Parameter"] = Field("Parameter", alias="type")
name: str
value: float
lower: Optional[float] = Field(None, alias="min")
upper: Optional[float] = Field(None, alias="max")
step_size: Optional[float] = None
floating: Optional[bool] = None
@validator("value", pre=True)
def _validate_value(cls, v):
if cls.orm_mode(v):
v = v()
return v
def __hash__(self):
return hash(self.name) + hash(type(self))
class BaseComposedParameter(ZfitParameterMixin, OverloadableMixin, BaseParameter):
def __init__(
self, params, value_fn, dtype=None, name="BaseComposedParameter", **kwargs
):
# 0.4 breaking
if "value" in kwargs:
raise BreakingAPIChangeError(
"'value' cannot be provided any longer, `value_fn` is needed."
)
super().__init__(name=name, params=params, **kwargs)
if not hasattr(self, "_composed_param_original_order"):
self._composed_param_original_order = None
if not callable(value_fn):
raise TypeError("`value_fn` is not callable.")
n_func_params = len(signature(value_fn).parameters)
# TODO(0.6): change, remove legacy?
if n_func_params == 0:
if len(params) == 0:
                warnings.warn(
                    "No `params` specified; the `value_fn` is then supposed to return a constant. "
                    "Prefer to use `ConstantParameter` instead.",
                    RuntimeWarning,
                    stacklevel=2,
                )
else: # this is the legacy case where the function didn't take arguments
                warnings.warn(
                    "The `value_fn` for composed parameters should take the same number"
                    " of arguments as there are `params`.",
                    DeprecationWarning,
                    stacklevel=3,
                )
)
legacy_value_fn = value_fn
def value_fn(*_):
return legacy_value_fn()
# end legacy
self._value_fn = value_fn
self._dtype = dtype if dtype is not None else ztypes.float
def _get_dependencies(self):
return _extract_dependencies(list(self.params.values()))
@property
def floating(self):
raise LogicalUndefinedOperationError(
"Cannot be floating or not. Look at the dependencies."
)
@floating.setter
def floating(self, value):
raise LogicalUndefinedOperationError(
"Cannot set floating or not. Set in the dependencies (`get_params`)."
)
@property
def params(self):
return self._params
def value(self):
params = self.params
parameters = signature(self._value_fn).parameters
if (
len(parameters) == 1
and (len(params) > 1 or "params" in parameters)
and self._composed_param_original_order is None
):
value = self._value_fn(params)
elif (
self._composed_param_original_order is None
): # TODO: this is a temp fix for legacy behavior
try:
value = self._value_fn(
**params
) # since the order is None, it has to be a dict
except Exception as error:
raise RuntimeError(
"This should not be reached. To fix this, make sure that the params to"
" ComposedParameter are a dict and that the function takes one single argument."
) from error
else:
params = (
self._composed_param_original_order
) # to make sure we have the right order
value = self._value_fn(*params)
return tf.convert_to_tensor(value, dtype=self.dtype)
def read_value(self):
return tf.identity(self.value())
@property
def shape(self):
return self.value().shape
def numpy(self):
return self.value().numpy()
@property
def independent(self):
return False
def set_value(self, value):
"""Set the value of the parameter. Cannot be used for composed parameters!
Args:
value:
"""
raise LogicalUndefinedOperationError(
"Cannot set value of a composed parameter."
" Set the value on its components."
)
def randomize(self, minval=None, maxval=None, sampler=np.random.uniform):
"""Randomize the value of the parameter.
Cannot be used for composed parameters!
"""
raise LogicalUndefinedOperationError(
"Cannot randomize a composed parameter." " Randomize the components."
)
def assign(self, value, use_locking=False, name=None, read_value=True):
"""Assign the value of the parameter.
Cannot be used for composed parameters!
"""
raise LogicalUndefinedOperationError(
"Cannot assign a composed parameter." " Assign the value on its components."
)
class ConstantParameter(
OverloadableMixin, ZfitParameterMixin, BaseParameter, SerializableMixin
):
"""Constant parameter.
Value cannot change.
"""
def __init__(self, name, value):
"""Constant parameter that cannot change its value.
Args:
name: Unique identifier of the parameter.
value: Constant value.
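
        Examples:
            An illustrative sketch:

            .. jupyter-execute::

                import zfit

                const = zfit.param.ConstantParameter("const42", 42.0)
                assert const.floating is False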
"""
super().__init__(name=name, params={}, dtype=ztypes.float)
self._value_np = tf.get_static_value(value, partial=True)
self._value = tf.guarantee_const(tf.convert_to_tensor(value, dtype=self.dtype))
@property
def shape(self):
return self.value().shape
def value(self) -> tf.Tensor:
return self._value
def read_value(self) -> tf.Tensor:
return self.value()
@property
def floating(self):
return False
@floating.setter
def floating(self, value):
raise LogicalUndefinedOperationError(
"Cannot set a ConstantParameter to floating. Use a `Parameter` instead."
)
@property
def independent(self) -> bool:
return False
def _get_dependencies(self) -> ztyping.DependentsType:
return OrderedSet()
@property
def static_value(self):
return self._value_np
def numpy(self):
return self._value_np
def __repr__(self):
value = self._value_np
if value is not None:
value_str = f"{value: .4g}"
else:
value_str = "symbolic"
return f"<zfit.param.{self.__class__.__name__} '{self.name}' dtype={self.dtype.name} value={value_str}>"
register_tensor_conversion(
ConstantParameter, "ConstantParameter", overload_operators=True
)
register_tensor_conversion(
BaseComposedParameter, "BaseComposedParameter", overload_operators=True
)
class ConstantParamRepr(BaseRepr):
_implementation = ConstantParameter
_constructor = pydantic.PrivateAttr(make_param_constructor(ConstantParameter))
hs3_type: Literal["ConstantParameter"] = pydantic.Field(
"ConstantParameter", alias="type"
)
name: str
value: float
floating: bool = False
# lower: Optional[float] = Field(None, alias="min")
# upper: Optional[float] = Field(None, alias="max")
@validator("value", pre=True)
def _validate_value(cls, value):
if cls.orm_mode(value):
value = value()
return value
def _to_orm(self, init):
init = copy.copy(init)
init.pop("floating")
out = super()._to_orm(init)
return out
class ComposedParameter(SerializableMixin, BaseComposedParameter):
@deprecated_args(None, "Use `params` instead.", "dependents")
def __init__(
self,
name: str,
value_fn: Callable,
params: (
dict[str, ZfitParameter] | Iterable[ZfitParameter] | ZfitParameter
) = NotSpecified,
dtype: tf.dtypes.DType = ztypes.float,
*,
dependents: (
dict[str, ZfitParameter] | Iterable[ZfitParameter] | ZfitParameter
) = NotSpecified,
):
"""Arbitrary composition of parameters.
A `ComposedParameter` allows for arbitrary combinations of parameters and correlations using an arbitrary
function.
Examples:
.. jupyter-execute::
import zfit
param1 = zfit.Parameter('param1', 1.0, 0.1, 1.8)
param2 = zfit.Parameter('param2', 42.0, 0, 100)
# using a dict for the params
def mult_dict(params):
return params["a"] * params["b"]
mult_param_dict = zfit.ComposedParameter('mult_dict', mult_dict, params={"a": param1, "b": param2})
# using a list for the params
def mult_list(params):
return params[0] * params[1]
mult_param_list = zfit.ComposedParameter('mult_list', mult_list, params=[param1, param2])
Args:
name: Unique name of the Parameter.
            value_fn: Function that returns the value of the composed parameter, taking the
                components in `params` as arguments; it must be callable with the given `params`.
            params: If it is a `dict`, this will directly be used as the `params` attribute; otherwise, the
                parameters will be automatically named with f"param_{i}". The values act as arguments to `value_fn`.
dtype: Output of `value_fn` dtype
dependents:
.. deprecated:: unknown
use `params` instead.
"""
original_init = {"name": name, "value_fn": value_fn}
if dependents is not NotSpecified:
params = dependents
elif params is NotSpecified:
            raise ValueError("`params` have to be specified.")
if isinstance(params, collections.abc.Mapping):
self._composed_param_original_order = None
else:
self._composed_param_original_order = convert_to_container(params)
if isinstance(params, dict):
params_dict = params
else:
params = convert_to_container(params)
if params is None:
params_dict = {}
else:
params_dict = {f"param_{i}": p for i, p in enumerate(params)}
        # needs to be here; the params have to be a dict for the serialization
        original_init["params"] = params_dict
super().__init__(params=params_dict, value_fn=value_fn, name=name, dtype=dtype)
self.hs3.original_init.update(original_init)
def __repr__(self):
if tf.executing_eagerly():
value = f"{self.numpy():.4g}"
else:
value = "graph-node"
return f"<zfit.{self.__class__.__name__} '{self.name}' params={[(k, p.name) for k, p in self.params.items()]} value={value}>"
class ComposedParameterRepr(BaseRepr):
_implementation = ComposedParameter
_constructor = pydantic.PrivateAttr(make_param_constructor(ComposedParameter))
hs3_type: Literal["ComposedParameter"] = pydantic.Field(
"ComposedParameter", alias="type"
)
name: str
value_fn: str
params: Dict[str, Serializer.types.ParamTypeDiscriminated]
@validator("value_fn", pre=True)
def _validate_value_pre(cls, value):
if cls.orm_mode(value):
value = dill.dumps(value).hex()
return value
@pydantic.root_validator(pre=True)
def validate_all_functor(cls, values):
if cls.orm_mode(values):
values = values["hs3"].original_init
return values
def _to_orm(self, init):
init = copy.copy(init)
value_fn = init.pop("value_fn")
init["value_fn"] = dill.loads(bytes.fromhex(value_fn))
out = super()._to_orm(init)
return out
class ComplexParameter(ComposedParameter): # TODO: change to real, imag as input?
def __init__(self, name, value_fn, params, dtype=ztypes.complex):
"""Create a complex parameter.
.. note::
Use the constructor class methods instead of the __init__() constructor:
- :py:meth:`ComplexParameter.from_cartesian`
- :py:meth:`ComplexParameter.from_polar`
"""
super().__init__(name, value_fn=value_fn, params=params, dtype=dtype)
self._conj = None
self._mod = None
self._arg = None
self._imag = None
self._real = None
@classmethod
def from_cartesian(
cls, name, real, imag, dtype=ztypes.complex, floating=True
) -> ComplexParameter: # TODO: correct dtype handling, also below
"""Create a complex parameter from cartesian coordinates.
Args:
name: Name of the parameter.
real: Real part of the complex number.
imag: Imaginary part of the complex number.
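
        Examples:
            An illustrative sketch:

            .. jupyter-execute::

                import zfit

                c = zfit.ComplexParameter.from_cartesian("c_cart", 1.1, 0.4)
                real, imag = c.real, c.imag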
"""
real = convert_to_parameter(
real, name=name + "_real", prefer_constant=not floating
)
imag = convert_to_parameter(
imag, name=name + "_imag", prefer_constant=not floating
)
param = cls(
name=name,
value_fn=lambda _real, _imag: tf.cast(
tf.complex(_real, _imag), dtype=dtype
),
params=[real, imag],
)
param._real = real
param._imag = imag
return param
@classmethod
def from_polar(
cls, name, mod, arg, dtype=ztypes.complex, floating=True, **kwargs
) -> ComplexParameter:
"""Create a complex parameter from polar coordinates.
Args:
name: Name of the parameter.
            mod: Modulus (r) of the complex number.
arg: Argument (phi) of the complex number.
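
        Examples:
            An illustrative sketch:

            .. jupyter-execute::

                import math

                import zfit

                c = zfit.ComplexParameter.from_polar("c_pol", 1.0, math.pi / 4)
                mod, arg = c.mod, c.arg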
"""
mod = convert_to_parameter(
mod, name=name + "_mod", prefer_constant=not floating
)
arg = convert_to_parameter(
arg, name=name + "_arg", prefer_constant=not floating
)
param = cls(
name=name,
value_fn=lambda _mod, _arg: tf.cast(
tf.complex(_mod * znp.cos(_arg), _mod * znp.sin(_arg)), dtype=dtype
),
params=[mod, arg],
)
param._mod = mod
param._arg = arg
return param
@property
def conj(self):
"""Returns a complex conjugated copy of the complex parameter."""
if self._conj is None:
self._conj = ComplexParameter(
name=f"{self.name}_conj",
value_fn=lambda: znp.conj(self),
params=self.get_cache_deps(),
dtype=self.dtype,
)
return self._conj
@property
def real(self) -> tf.Tensor:
"""Real part of the complex parameter."""
return znp.real(self)
@property
def imag(self) -> tf.Tensor:
"""Imaginary part of the complex parameter."""
return znp.imag(self)
@property
def mod(self) -> tf.Tensor:
"""Modulus (r) of the complex parameter."""
return znp.abs(self)
@property
def arg(self) -> tf.Tensor:
"""Argument (phi) of the complex parameter."""
return znp.angle(self)
register_tensor_conversion(ComposedParameter, "ComposedParameter", True)
_auto_number = 0
def get_auto_number():
global _auto_number
auto_number = _auto_number
_auto_number += 1
return auto_number
def _reset_auto_number():
global _auto_number
_auto_number = 0
def convert_to_parameters(
value,
name: str | list[str] | None = None,
    prefer_constant: bool | None = None,
lower=None,
upper=None,
step_size=None,
):
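    """Convert multiple values to parameters at once; a vectorized form of :func:`convert_to_parameter`.

    An illustrative sketch (names are arbitrary)::

        params = convert_to_parameters([1.0, 2.0], name=["a", "b"], prefer_constant=False)
    """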
if prefer_constant is None:
prefer_constant = True
if isinstance(value, collections.abc.Mapping):
return convert_to_parameters(**value, prefer_constant=False)
value = convert_to_container(value)
is_param_already = [isinstance(val, ZfitIndependentParameter) for val in value]
if all(is_param_already):
return value
elif any(is_param_already):
        raise ValueError(
            f"`value` has to contain either only ZfitParameters or only numerical values,"
            f" not a mixture (currently). Is {value}."
        )
params_dict = {
"value": value,
"name": name,
"lower": lower,
"upper": upper,
"step_size": step_size,
}
params_dict = {
key: convert_to_container(val)
for key, val in params_dict.items()
if val is not None
}
lengths = {len(v) for v in params_dict.values()}
if len(lengths) != 1:
raise ValueError(
f"Inconsistent length in values when converting the parameters: {params_dict}"
)
params = []
for i in range(len(params_dict["value"])):
pdict = {k: params_dict[k][i] for k in params_dict}
params.append(convert_to_parameter(**pdict, prefer_constant=prefer_constant))
return params
@deprecated_args(None, "Use `params` instead.", "dependents")
def convert_to_parameter(
value,
name: str | None = None,
prefer_constant: bool = True,
params=None,
lower=None,
upper=None,
step_size=None,
# legacy
dependents=None,
) -> ZfitParameter:
"""Convert a *numerical* to a constant/floating parameter or return if already a parameter.
    Args:
        value: Numerical value, callable, or parameter to convert.
        name: Name of the parameter that is possibly created.
        prefer_constant: If True, create a ConstantParameter instead of a Parameter, if possible.
        params: If `value` is a callable, the parameters that it depends on.
        lower: Lower limit of the parameter that is possibly created.
        upper: Upper limit of the parameter that is possibly created.
        step_size: Step size of the parameter that is possibly created.
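
    Examples:
        An illustrative sketch:

        .. jupyter-execute::

            import zfit

            const = zfit.param.convert_to_parameter(5.0)  # a ConstantParameter by default
            free = zfit.param.convert_to_parameter(5.0, name="free5", prefer_constant=False)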
"""
# legacy start
if dependents is not None:
params = dependents
# legacy end
if name is not None:
name = str(name)
if callable(value):
if params is None:
            raise ValueError(
                "If the value is a callable, `params` has to be specified (as an empty list/tuple"
                " if the function has no dependencies)."
            )
return ComposedParameter(
f"Composed_autoparam_{get_auto_number()}", value_fn=value, params=params
)
if isinstance(value, ZfitParameter): # TODO(Mayou36): autoconvert variable. TF 2.0?
return value
elif isinstance(value, tf.Variable):
raise TypeError("Currently, cannot autoconvert tf.Variable to zfit.Parameter.")
# convert to Tensor
if not isinstance(value, tf.Tensor):
if isinstance(value, complex):
value = z.to_complex(value)
else:
value = z.to_real(value)
if not run._enable_parameter_autoconversion:
return value
if value.dtype.is_complex:
if name is None:
name = "FIXED_complex_autoparam_" + str(get_auto_number())
if prefer_constant:
complex_params = (
ConstantParameter(name + "_REALPART", value=znp.real(value)),
ConstantParameter(name + "_IMAGPART", value=znp.imag(value)),
)
else:
complex_params = (
Parameter(name + "_REALPART", value=znp.real(value)),
Parameter(name + "_IMAGPART", value=znp.imag(value)),
)
value = ComplexParameter.from_cartesian(
name, real=complex_params[0], imag=complex_params[1]
)
else:
if prefer_constant:
            if name is None:
                name = "FIXED_autoparam_" + str(get_auto_number())
value = ConstantParameter(name, value=value)
else:
name = "autoparam_" + str(get_auto_number()) if name is None else name
value = Parameter(
name=name, value=value, lower=lower, upper=upper, step_size=step_size
)
return value
@z.function(wraps="params")
def assign_values_jit(
params: Parameter | Iterable[Parameter],
values: ztyping.NumericalScalarType | Iterable[ztyping.NumericalScalarType],
use_locking=False,
):
"""Assign values to parameters jitted.
    This method can be significantly faster than `set_values`; however, it expects the correct data-type,
    cannot, for example, take a `FitResult` as input, and cannot act as a context manager. Only use it when
    performance is critical (such as inside a minimizer).
Args:
params: The parameters to assign the values to.
values: Values to assign to the parameters.
        use_locking: If True, lock the parameter to avoid race conditions.
"""
for i, param in enumerate(params):
value = values[i]
if value.dtype != param.dtype:
value = znp.cast(value, param.dtype)
param.assign(value, read_value=False, use_locking=use_locking)
def assign_values(
params: Parameter | Iterable[Parameter],
values: ztyping.NumericalScalarType | Iterable[ztyping.NumericalScalarType],
use_locking=False,
allow_partial: bool | None = None,
):
"""Set the values of multiple parameters in a fast way.
    In general, :meth:`set_values` is to be preferred. `assign_values` ignores out-of-bounds errors and
    does not offer a context manager, but is in general (an order of magnitude) faster.
Args:
params: Parameters to set the values.
values: List-like object that supports indexing.
use_locking: if true, lock the parameter to avoid race conditions.
        allow_partial: Allow setting only a subset of the parameters in case *values* is a `ZfitResult`
            and not all of them are present in *values*. If False, *params* not in *values* will raise an error.
            Note that setting this to True also allows an empty values container.
Raises:
ValueError: If not all *params* are in *values* if *values* is a `FitResult` and `allow_partial` is `False`.
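
    Examples:
        An illustrative sketch (importing `assign_values` from this module):

        .. jupyter-execute::

            import zfit
            from zfit.core.parameter import assign_values

            p1 = zfit.Parameter("av_p1", 1.0)
            p2 = zfit.Parameter("av_p2", 2.0)
            assign_values([p1, p2], [0.5, 1.5])  # fast; no limit checks, no context manager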
"""
if allow_partial is None:
allow_partial = False
params, values = check_convert_param_values_assign(
params, values, allow_partial=allow_partial
)
assign_values_jit(params=params, values=values, use_locking=use_locking)
def set_values(
params: Parameter | Iterable[Parameter],
values: (
ztyping.NumericalScalarType
| Iterable[ztyping.NumericalScalarType]
| ZfitResult
| Mapping[Union[str, ZfitParameter], ztyping.NumericalScalarType],
),
allow_partial: bool | None = None,
):
"""Set the values (using a context manager or not) of multiple parameters.
Args:
params: Parameters to set the values.
values: List-like object that supports indexing, a `FitResult` or a `Mapping` of parameters or parameter names to values.
        allow_partial: Allow setting only a subset of the parameters in case *values* is a `ZfitResult`
            and not all of them are present in *values*. If False, *params* not in *values* will raise an error.
            Note that setting this to True also allows an empty values container.
Returns:
An object for a context manager (but can also be used without), can be ignored.
Raises:
ValueError: If the value is not between the limits of the parameter.
ValueError: If not all *params* are in *values* if *values* is a `FitResult` and `allow_partial` is `False`.
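
    Examples:
        An illustrative sketch, using the returned object as a context manager:

        .. jupyter-execute::

            import zfit

            p1 = zfit.Parameter("sv_p1", 1.0, 0.0, 5.0)
            p2 = zfit.Parameter("sv_p2", 2.0, 0.0, 5.0)
            with zfit.param.set_values([p1, p2], [1.5, 2.5]):
                pass  # the new values are set inside this block
            # the previous values are restored on exit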
"""
if allow_partial is None:
allow_partial = False
params, values = check_convert_param_values_assign(params, values, allow_partial)
def setter(values):
for i, param in enumerate(params):
param.set_value(values[i])
def getter():
return [param.value() for param in params]
return TemporarilySet(values, setter=setter, getter=getter)
def check_convert_param_values_assign(params, values, allow_partial=False):
"""Check if params are valid and convert them if necessary to be used with assign_values.
Args:
params: Parameters to set the values.
values: List-like object that supports indexing or a `ZfitResult` or a `Mapping`.
allow_partial: Allow to set only parts of the parameters in case values is a `ZfitResult` or a `Mapping`. Other parameters will not be updated.
More values than parameters, if they are given in a `Mapping` or `ZfitResult`, are allowed in any case.
Returns:
A tuple of (params, values)
"""
params = convert_to_container(params)
if isinstance(values, ZfitResult):
result = values
new_params = []
values = []
for param in params:
if param in result.params:
values.append(result.params[param]["value"])
new_params.append(param)
elif not allow_partial:
raise ValueError(
f"Cannot set {param} with {repr(result)} as it is not contained. To partially set"
f" the parameters (only those in the result), use allow_partial"
)
params = new_params
elif isinstance(values, Mapping):
new_params = []
new_values = []
for param in params:
if param in values:
new_values.append(values[param])
new_params.append(param)
elif param.name in values:
new_values.append(values[param.name])
new_params.append(param)
elif not allow_partial:
raise ValueError(
f"Cannot set {param} with {repr(values)} as it is not contained. To partially set"
f" the parameters (only those in the result), use allow_partial"
)
values = new_values
params = new_params
elif len(params) > 1:
if not tf.is_tensor(values) or isinstance(values, np.ndarray):
values = convert_to_container(values)
if len(params) != len(values):
raise ValueError(
f"Incompatible length of parameters and values: {params}, {values}"
)
not_param = [param for param in params if not isinstance(param, ZfitParameter)]
if not_param:
raise TypeError(
f"The following are not parameters (but should be): {not_param}"
)
non_independent_params = [param for param in params if not param.independent]
if non_independent_params:
raise ParameterNotIndependentError(
f"trying to set value of parameters that are not independent "
f"{non_independent_params}"
)
values = znp.asarray(values, dtype=znp.float64)
return params, values