"""Define Parameter which holds the value."""
# Copyright (c) 2020 zfit
from collections import OrderedDict
from contextlib import suppress
from typing import Callable
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
from ordered_set import OrderedSet
# TF internals, for backwards compatibility
from tensorflow.python import ops, array_ops
from tensorflow.python.client.session import register_session_run_conversion_functions
from tensorflow.python.ops.resource_variable_ops import ResourceVariable as TFBaseVariable
from zfit import z
from zfit.util.container import convert_to_container
from . import interfaces as zinterfaces
from .interfaces import ZfitModel, ZfitParameter
from ..core.baseobject import BaseNumeric
from ..settings import ztypes, run
from ..util import ztyping
from ..util.cache import invalidates_cache
from ..util.exception import LogicalUndefinedOperationError, NameAlreadyTakenError, BreakingAPIChangeError, \
WorkInProgressError
from ..util.temporary import TemporarilySet


# Resolve the metaclass conflict between `tf.Variable` and the `ZfitParameter`
# interface; required by the `metaclass=MetaBaseParameter` classes below.
class MetaBaseParameter(type(tf.Variable), type(zinterfaces.ZfitParameter)):
    pass


def register_tensor_conversion(convertable, overload_operators=True, priority=1):  # higher than any tf conversion
fetch_function = lambda variable: ([variable.read_value()],
lambda val: val[0])
feed_function = lambda feed, feed_val: [(feed.read_value(), feed_val)]
feed_function_for_partial_run = lambda feed: [feed.read_value()]
register_session_run_conversion_functions(tensor_type=convertable, fetch_function=fetch_function,
feed_function=feed_function,
feed_function_for_partial_run=feed_function_for_partial_run)
def _dense_var_to_tensor(var, dtype=None, name=None, as_ref=False):
return var._dense_var_to_tensor(dtype=dtype, name=name, as_ref=as_ref)
ops.register_tensor_conversion_function(convertable, _dense_var_to_tensor, priority=priority)
if overload_operators:
convertable._OverloadAllOperators()
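
# Example (sketch): after registering, instances can be used like tensors in TF ops.
# `MyVariable` is hypothetical; it must expose `read_value`, `value` and
# `_dense_var_to_tensor`, as `ZfitBaseVariable` below does.
#
#     register_tensor_conversion(MyVariable, overload_operators=True)
#     tf.add(my_var_instance, 1.)  # converted implicitly via `_dense_var_to_tensor`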


# drop-in replacement for ResourceVariable
class ZfitBaseVariable(metaclass=MetaBaseParameter):  # TODO(Mayou36): upgrade to tf2
def __init__(self, variable: tf.Variable, **kwargs):
self.variable = variable
# @property
# def name(self):
# return self.variable.op.name
@property
def dtype(self):
return self.variable.dtype
    def value(self):
        return self.variable.value()

    def assign(self, value, use_locking=False, name=None, read_value=True):
        return self.variable.assign(value=value, use_locking=use_locking,
                                    name=name, read_value=read_value)
def _dense_var_to_tensor(self, dtype=None, name=None, as_ref=False):
del name
if dtype is not None and dtype != self.dtype:
return NotImplemented
if as_ref:
return self.variable.read_value().op.inputs[0]
else:
return self.variable.value()
def _AsTensor(self):
return self.variable.value()
@staticmethod
def _OverloadAllOperators(): # pylint: disable=invalid-name
"""Register overloads for all operators."""
for operator in ops.Tensor.OVERLOADABLE_OPERATORS:
ZfitBaseVariable._OverloadOperator(operator)
# For slicing, bind getitem differently than a tensor (use SliceHelperVar
# instead)
# pylint: disable=protected-access
setattr(ZfitBaseVariable, "__getitem__", array_ops._SliceHelperVar)
@staticmethod
def _OverloadOperator(operator): # pylint: disable=invalid-name
"""Defer an operator overload to `ops.Tensor`.
We pull the operator out of ops.Tensor dynamically to avoid ordering issues.
Args:
operator: string. The operator name.
"""
tensor_oper = getattr(ops.Tensor, operator)
def _run_op(a, *args):
# pylint: disable=protected-access
value = a._AsTensor()
return tensor_oper(value, *args)
# Propagate __doc__ to wrapper
try:
_run_op.__doc__ = tensor_oper.__doc__
except AttributeError:
pass
setattr(ZfitBaseVariable, operator, _run_op)
register_tensor_conversion(ZfitBaseVariable, overload_operators=True)


class ComposedVariable:
    def __init__(self, name: str, value_fn: Callable, **kwargs):
        super().__init__(name=name, **kwargs)
        if not callable(value_fn):
            raise TypeError("`value_fn` is not callable.")
        self._value_fn = value_fn

    def value(self):
        return tf.convert_to_tensor(self._value_fn(), dtype=self.dtype)

    def read_value(self):
        return self.value()

    def numpy(self):
        return self.value().numpy()

    def assign(self, value, use_locking=False, name=None, read_value=True):
        raise LogicalUndefinedOperationError("Cannot assign to a fixed/composed parameter.")
def _dense_var_to_tensor(self, dtype=None, name=None, as_ref=False):
del name
if dtype is not None and dtype != self.dtype:
return NotImplemented
        if as_ref:
            raise LogicalUndefinedOperationError("There is no ref for the fixed/composed parameter")
else:
return self.value()
def _AsTensor(self):
return self.value()
@staticmethod
def _OverloadAllOperators(): # pylint: disable=invalid-name
"""Register overloads for all operators."""
for operator in ops.Tensor.OVERLOADABLE_OPERATORS:
ComposedVariable._OverloadOperator(operator)
# For slicing, bind getitem differently than a tensor (use SliceHelperVar
# instead)
# pylint: disable=protected-access
setattr(ComposedVariable, "__getitem__", array_ops._SliceHelperVar)
@staticmethod
def _OverloadOperator(operator): # pylint: disable=invalid-name
"""Defer an operator overload to `ops.Tensor`.
We pull the operator out of ops.Tensor dynamically to avoid ordering issues.
Args:
operator: string. The operator name.
"""
tensor_oper = getattr(ops.Tensor, operator)
def _run_op(a, *args):
# pylint: disable=protected-access
value = a._AsTensor()
return tensor_oper(value, *args)
# Propagate __doc__ to wrapper
try:
_run_op.__doc__ = tensor_oper.__doc__
except AttributeError:
pass
setattr(ComposedVariable, operator, _run_op)
register_tensor_conversion(ComposedVariable, overload_operators=True)


class BaseParameter(ZfitParameter, metaclass=MetaBaseParameter):  # TODO(Mayou36): upgrade to tf2
    pass


class ZfitParameterMixin(BaseNumeric):
_existing_names = OrderedDict()
def __init__(self, name, **kwargs):
if name in self._existing_names:
raise NameAlreadyTakenError("Another parameter is already named {}. "
"Use a different, unique one.".format(name))
self._existing_names.update({name: self})
self._name = name
super().__init__(name=name, **kwargs)
@property
def floating(self):
if self._floating and not self.trainable:
raise RuntimeError("Floating is set to true but tf Variable is not trainable.")
return self._floating
@floating.setter
def floating(self, value):
if not isinstance(value, bool):
raise TypeError("floating has to be a boolean.")
self._floating = value
def __del__(self):
if self.name in self._existing_names: # bug, creates segmentation fault in unittests
del self._existing_names[self.name]
        with suppress(AttributeError):  # if super does not have a __del__
            super().__del__()
def __add__(self, other):
if isinstance(other, (ZfitModel, ZfitParameter)):
from . import operations
with suppress(NotImplementedError):
return operations.add(self, other)
return super().__add__(other)
def __radd__(self, other):
if isinstance(other, (ZfitModel, ZfitParameter)):
from . import operations
with suppress(NotImplementedError):
return operations.add(other, self)
return super().__radd__(other)
def __mul__(self, other):
if isinstance(other, (ZfitModel, ZfitParameter)):
from . import operations
with suppress(NotImplementedError):
return operations.multiply(self, other)
return super().__mul__(other)
def __rmul__(self, other):
if isinstance(other, (ZfitModel, ZfitParameter)):
from . import operations
with suppress(NotImplementedError):
return operations.multiply(other, self)
return super().__rmul__(other)
def __eq__(self, other):
return id(self) == id(other)
def __hash__(self):
return id(self)
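
# Example (sketch): parameter names are unique per process; re-using a name raises.
#
#     a = zfit.Parameter("a", 1.)
#     a_again = zfit.Parameter("a", 2.)  # raises NameAlreadyTakenError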


# Shadow the imported ResourceVariable with a subclass whose metaclass is compatible
# with the ZfitParameter interface.
class TFBaseVariable(TFBaseVariable, metaclass=MetaBaseParameter):
    pass


class Parameter(ZfitParameterMixin, TFBaseVariable, BaseParameter):
"""Class for fit parameters, derived from TF Variable class.
"""
_independent = True
_independent_params = []
def __init__(self, name, value, lower_limit=None, upper_limit=None, step_size=None, floating=True,
dtype=ztypes.float, **kwargs):
"""
name : name of the parameter,
value : starting value
lower_limit : lower limit
upper_limit : upper limit
step_size : step size (set to 0 for fixed parameters)
"""
self._independent_params.append(self)
# TODO: sanitize input for TF2
self._lower_limit_neg_inf = None
self._upper_limit_neg_inf = None
if lower_limit is None:
self._lower_limit_neg_inf = tf.cast(-np.infty, dtype)
if upper_limit is None:
self._upper_limit_neg_inf = tf.cast(np.infty, dtype)
# no_limits = -lower_limit == upper_limit == np.infty
value = tf.cast(value, dtype=ztypes.float)
def constraint(x):
return tfp.math.clip_by_value_preserve_gradient(x, clip_value_min=self.lower_limit,
clip_value_max=self.upper_limit)
# self.constraint = constraint
super().__init__(initial_value=value, dtype=dtype, name=name, constraint=constraint,
params={}, **kwargs)
self._true_name = name
self.lower_limit = tf.cast(lower_limit, dtype=ztypes.float) if lower_limit is not None else lower_limit
self.upper_limit = tf.cast(upper_limit, dtype=ztypes.float) if upper_limit is not None else upper_limit
# value = tf.cast(value, dtype=ztypes.float) # TODO: init value mandatory?
self.floating = floating
self.step_size = step_size
# zfit.run.auto_initialize(self)
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
cls._independent = True # overwriting independent only for subclass/instance
@property
def name(self):
return self._true_name
def _hack_set_tf_name(self):
def setter(name):
self._true_name = name
def getter():
return self.name
return TemporarilySet(value=super().name, getter=getter, setter=setter)
@property
def lower_limit(self):
limit = self._lower_limit
if limit is None:
limit = self._lower_limit_neg_inf
return limit
@lower_limit.setter
@invalidates_cache
def lower_limit(self, value):
if value is None and self._lower_limit_neg_inf is None:
self._lower_limit_neg_inf = tf.cast(-np.infty, dtype=ztypes.float)
self._lower_limit = value
@property
def upper_limit(self):
limit = self._upper_limit
if limit is None:
limit = self._upper_limit_neg_inf
return limit
@upper_limit.setter
@invalidates_cache
def upper_limit(self, value):
if value is None and self._upper_limit_neg_inf is None:
self._upper_limit_neg_inf = tf.cast(np.infty, dtype=ztypes.float)
self._upper_limit = value
@property
def has_limits(self):
no_limits = self._lower_limit is None and self._upper_limit is None
return not no_limits
    def value(self):
        value = super().value()
        if self.has_limits:
            value = self.constraint(value)
        return value

    def read_value(self):
        value = super().read_value()
        if self.has_limits:
            value = self.constraint(value)
        return value
def _get_dependents(self):
return {self}
@property
def independent(self):
return self._independent
@property
def step_size(self): # TODO: improve default step_size?
step_size = self._step_size
if step_size is None:
            # TODO: auto-infer from limits?
step_size = 0.001
if np.isnan(step_size):
if self.lower_limit == -np.infty or self.upper_limit == np.infty:
step_size = 0.001
else:
raise ValueError("Could not set step size. Is NaN.")
# TODO: how to deal with infinities?
step_size = z.to_real(step_size)
self.step_size = step_size
return step_size
@step_size.setter
def step_size(self, value):
if value is not None:
value = z.convert_to_tensor(value, preferred_dtype=ztypes.float)
value = tf.cast(value, dtype=ztypes.float)
self._step_size = value
    def set_value(self, value: ztyping.NumericalScalarType):
"""Set the :py:class:`~zfit.Parameter` to `value` (temporarily if used in a context manager).
Args:
value (float): The value the parameter will take on.
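
        Example (illustrative; `param` is an existing `Parameter`)::

            >>> with param.set_value(0.5):
            ...     param.value()  # 0.5 inside the `with` block; restored afterwards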
"""
super_assign = super().assign
def getter():
return self.value()
def setter(value):
super_assign(value=value)
return TemporarilySet(value=value, setter=setter, getter=getter)
    # TODO: make it a random variable? return a tensor that re-evaluates every time?
    def randomize(self, minval=None, maxval=None, sampler=np.random.uniform):
"""Update the value with a randomised value between minval and maxval.
Args:
            minval (Numerical): Lower bound of the sampled value. Defaults to `lower_limit`.
            maxval (Numerical): Upper bound of the sampled value. Defaults to `upper_limit`.
            sampler (Callable): Sampling function, called as ``sampler(size=..., low=..., high=...)``
                like `numpy.random.uniform`.
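
        Example (illustrative)::

            >>> param.randomize(minval=0., maxval=1.)  # uniform draw in [0, 1)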
"""
        if minval is None:
            minval = self.sess.run(self.lower_limit)
        if maxval is None:
            maxval = self.sess.run(self.upper_limit)
        value = sampler(size=self.shape, low=minval, high=maxval)
        self.load(value=value)
return value
def __del__(self):
self._independent_params.remove(self)
super().__del__()
    def __repr__(self):
        if hasattr(self, "numpy"):  # more explicit: we check for exactly this attribute, nothing inside numpy
            value = f"{self.numpy():.4g}"
        else:
            value = "graph-node"  # no numerical value available; formatting a str with ':.4g' would raise
        return f"<zfit.{self.__class__.__name__} '{self.name}' floating={self.floating} value={value}>"


class BaseZParameter(ZfitParameterMixin, ComposedVariable, BaseParameter):
    pass


class BaseComposedParameter(BaseZParameter):
    def __init__(self, params, value_fn, name="BaseComposedParameter", **kwargs):
        # 0.4 breaking
        if 'value' in kwargs:
            raise BreakingAPIChangeError("'value' cannot be provided any longer, `value_fn` is needed.")
        super().__init__(value_fn=value_fn, name=name, params=params, **kwargs)
def _get_dependents(self):
dependents = self._extract_dependents(list(self.params.values()))
return dependents
@property
    def floating(self):
        raise LogicalUndefinedOperationError("A composed parameter is neither floating nor fixed;"
                                             " check its dependents instead.")
@property
def params(self):
return self._params
@params.setter
def params(self, value):
if not isinstance(value, dict):
            raise TypeError("`params` has to be a dict.")
self._params = value
@property
def independent(self):
return False


class ConstantParameter(BaseZParameter):
def __init__(self, name, value, dtype=ztypes.float):
super().__init__(name=name, value_fn=lambda: value, params={}, dtype=dtype)
@property
def floating(self):
return False
@floating.setter
def floating(self, value):
        raise LogicalUndefinedOperationError("Cannot set `floating` of a constant parameter.")
@property
def independent(self) -> bool:
return False
def _get_dependents(self) -> ztyping.DependentsType:
return OrderedSet()
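
# Example (sketch): a constant, non-floating parameter.
#
#     n_bkg = ConstantParameter("n_bkg", 42.)
#     n_bkg.floating  # always False; assigning raises LogicalUndefinedOperationError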


class ComposedParameter(BaseComposedParameter):
    def __init__(self, name, value_fn, dependents, dtype=ztypes.float, **kwargs):  # TODO: automate dependents extraction
dependents = convert_to_container(dependents)
if dependents is None:
params = OrderedSet()
else:
params = self._extract_dependents(dependents)
params = {p.name: p for p in params}
super().__init__(params=params, value_fn=value_fn, name=name, dtype=dtype, **kwargs)
    def __repr__(self):
        if hasattr(self, "numpy"):  # more explicit: we check for exactly this attribute, nothing inside numpy
            value = f"{self.numpy():.4g}"
        else:
            value = "graph-node"  # no numerical value available; formatting a str with ':.4g' would raise
        return f"<zfit.{self.__class__.__name__} '{self.name}' dtype={self.dtype.name} value={value}>"


class ComplexParameter(ComposedParameter):
def __init__(self, name, value_fn, dependents, dtype=ztypes.complex, **kwargs):
super().__init__(name, value_fn=value_fn, dependents=dependents, dtype=dtype, **kwargs)
self._conj = None
self._mod = None
self._arg = None
self._imag = None
self._real = None
    @staticmethod
    def from_cartesian(name, real, imag, dtype=ztypes.complex, floating=True,
                       **kwargs):  # TODO: correct dtype handling, also below
real = convert_to_parameter(real, name=name + "_real", prefer_floating=floating)
imag = convert_to_parameter(imag, name=name + "_imag", prefer_floating=floating)
param = ComplexParameter(name=name, value_fn=lambda: tf.cast(tf.complex(real, imag), dtype=dtype),
dependents=[real, imag],
**kwargs)
param._real = real
param._imag = imag
return param
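
    # Example (sketch): floats are auto-converted to parameters named
    # "<name>_real" / "<name>_imag".
    #
    #     c = ComplexParameter.from_cartesian("c", real=0.3, imag=1.1)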
    @staticmethod
    def from_polar(name, mod, arg, dtype=ztypes.complex, floating=True, **kwargs):
mod = convert_to_parameter(mod, name=name + "_mod", prefer_floating=floating)
arg = convert_to_parameter(arg, name=name + "_arg", prefer_floating=floating)
param = ComplexParameter(name=name,
value_fn=lambda: tf.cast(tf.complex(mod * tf.math.cos(arg),
mod * tf.math.sin(arg)),
dtype=dtype),
dependents=[mod, arg],
**kwargs)
param._mod = mod
param._arg = arg
return param
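
    # Example (sketch): construct from modulus and argument; `mod`/`arg` floats
    # become parameters named "<name>_mod" / "<name>_arg".
    #
    #     c = ComplexParameter.from_polar("c", mod=1.2, arg=0.5)
    #     c.conj, c.mod, c.arg  # derived quantities, created lazily below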
@property
def conj(self):
if self._conj is None:
self._conj = ComplexParameter(name='{}_conj'.format(self.name), value_fn=lambda: tf.math.conj(self),
dependents=self.get_dependents(),
dtype=self.dtype)
return self._conj
@property
def real(self):
real = self._real
if real is None:
real = z.to_real(self)
return real
@property
def imag(self):
imag = self._imag
if imag is None:
imag = tf.math.imag(tf.convert_to_tensor(value=self, dtype_hint=self.dtype)) # HACK tf bug #30029
return imag
@property
def mod(self):
mod = self._mod
if mod is None:
mod = tf.math.abs(self)
return mod
@property
def arg(self):
arg = self._arg
        if arg is None:
            arg = tf.math.atan2(self.imag, self.real)  # atan2 keeps the correct quadrant
return arg


_auto_number = 0


def get_auto_number():
    global _auto_number
    auto_number = _auto_number
    _auto_number += 1
    return auto_number


def convert_to_parameter(value, name=None, prefer_floating=False, dependents=None,
                         graph_mode=False) -> "ZfitParameter":
    """Convert a *numerical* to a fixed/floating parameter or return if already a parameter.

    Args:
        value: Numerical value, `tf.Tensor`, callable or `ZfitParameter` to convert.
        name: Name of the created parameter; auto-generated if not given.
        prefer_floating: If True, create a `Parameter` instead of a fixed/constant one _if possible_.
        dependents: Parameters a callable `value` depends on; mandatory (may be an empty
            list/tuple) if `value` is callable.
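
    Example (illustrative; `mu` is an existing `zfit.Parameter`)::

        >>> convert_to_parameter(5.)                                # -> ConstantParameter
        >>> convert_to_parameter(5., prefer_floating=True)          # -> floating Parameter
        >>> convert_to_parameter(lambda: 2. * mu, dependents=[mu])  # -> ComposedParameter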
"""
is_python = False
if name is not None:
name = str(name)
if callable(value):
        if dependents is None:
            raise ValueError("If `value` is a callable, `dependents` have to be specified"
                             " (an empty list/tuple if there are none).")
return ComposedParameter(f"Composed_autoparam_{get_auto_number()}", value_fn=value, dependents=dependents)
if isinstance(value, ZfitParameter): # TODO(Mayou36): autoconvert variable. TF 2.0?
return value
elif isinstance(value, tf.Variable):
raise TypeError("Currently, cannot autoconvert tf.Variable to zfit.Parameter.")
# convert to Tensor
if not isinstance(value, tf.Tensor):
is_python = True
if isinstance(value, complex):
value = z.to_complex(value)
else:
value = z.to_real(value)
if not run._enable_parameter_autoconversion:
return value
if value.dtype.is_complex:
if name is None:
name = "FIXED_complex_autoparam_" + str(get_auto_number())
        if not prefer_floating:
            raise WorkInProgressError("Constant complex param not here yet, complex Mixin?")
        complex_value = value  # `value` is a tensor here; `value_fn` needs a callable
        value = ComplexParameter(name, value_fn=lambda: complex_value, dependents=[])
else:
if prefer_floating:
name = "autoparam_" + str(get_auto_number()) if name is None else name
value = Parameter(name=name, value=value)
else:
            if name is None:
                name = "FIXED_autoparam_" + str(get_auto_number())
value = ConstantParameter(name, value=value)
return value