Source code for zfit.core.parameter

"""Define Parameter which holds the value."""
#  Copyright (c) 2019 zfit

from contextlib import suppress

import numpy as np
import tensorflow as tf

# TF backwards compatibility
from ordered_set import OrderedSet
from tensorflow.python import ops, array_ops

import zfit
from zfit import ztf

from tensorflow.python.ops.resource_variable_ops import ResourceVariable as TFBaseVariable
from tensorflow.python.ops.resource_variable_ops import ResourceVariable

from ..util.temporary import TemporarilySet
from ..core.baseobject import BaseNumeric, BaseObject
from ..util.cache import Cachable, invalidates_cache
from ..util import ztyping
from ..util.execution import SessionHolderMixin
from .interfaces import ZfitModel, ZfitParameter
from ..util.graph import get_dependents_auto
from ..util.exception import LogicalUndefinedOperationError, NameAlreadyTakenError
from . import baseobject as zbaseobject
from . import interfaces as zinterfaces
from ..settings import ztypes, run


[docs]class MetaBaseParameter(type(TFBaseVariable), type(zinterfaces.ZfitParameter)): # resolve metaclasses pass
# drop-in replacement for ResourceVariable # class ZfitBaseVariable(metaclass=type(TFBaseVariable)): # class ZfitBaseVariable(metaclass=MetaBaseParameter): # TODO(Mayou36): upgrade to tf2
[docs]class ZfitBaseVariable: def __init__(self, variable: tf.Variable, **kwargs): self.variable = variable # @property # def name(self): # return self.variable.op.name @property def dtype(self): return self.variable.dtype
[docs] def value(self): return self.variable.value()
[docs] def assign(self, value, use_locking=False, name=None, read_value=True): return self.variable.assign(value=value, use_locking=use_locking, name=name, read_value=read_value)
def _dense_var_to_tensor(self, dtype=None, name=None, as_ref=False): del name if dtype is not None and dtype != self.dtype: return NotImplemented if as_ref: return self.variable.read_value().op.inputs[0] else: return self.variable.value() def _AsTensor(self): return self.variable.value() @staticmethod def _OverloadAllOperators(): # pylint: disable=invalid-name """Register overloads for all operators.""" for operator in ops.Tensor.OVERLOADABLE_OPERATORS: ZfitBaseVariable._OverloadOperator(operator) # For slicing, bind getitem differently than a tensor (use SliceHelperVar # instead) # pylint: disable=protected-access setattr(ZfitBaseVariable, "__getitem__", array_ops._SliceHelperVar) @staticmethod def _OverloadOperator(operator): # pylint: disable=invalid-name """Defer an operator overload to `ops.Tensor`. We pull the operator out of ops.Tensor dynamically to avoid ordering issues. Args: operator: string. The operator name. """ tensor_oper = getattr(ops.Tensor, operator) def _run_op(a, *args): # pylint: disable=protected-access value = a._AsTensor() return tensor_oper(value, *args) # Propagate __doc__ to wrapper try: _run_op.__doc__ = tensor_oper.__doc__ except AttributeError: pass setattr(ZfitBaseVariable, operator, _run_op)
def _dense_var_to_tensor(var, dtype=None, name=None, as_ref=False): return var._dense_var_to_tensor(dtype=dtype, name=name, as_ref=as_ref) ops.register_tensor_conversion_function(ZfitBaseVariable, _dense_var_to_tensor) # ops.register_session_run_conversion_functions() ZfitBaseVariable._OverloadAllOperators()
[docs]class ComposedResourceVariable(ResourceVariable): def __init__(self, name, initial_value, **kwargs): super().__init__(name=name, initial_value=initial_value, **kwargs) self._value_tensor = initial_value
[docs] def value(self): # with tf.control_dependencies([self._value_tensor]): # return 5. return self._value_tensor
[docs] def read_value(self): # raise RuntimeError() return self._value_tensor
# class ComposedVariable(tf.Variable, metaclass=type(tf.Variable)): # class ComposedVariable(ResourceVariable, metaclass=type(tf.Variable)): # class ComposedVariable(metaclass=MetaBaseParameter): # TODO(Mayou36): upgrade to tf2
[docs]class ComposedVariable: def __init__(self, name: str, initial_value: tf.Tensor, **kwargs): # super().__init__(initial_value=initial_value, **kwargs, use_resource=True) super().__init__(name=name, **kwargs) self._value_tensor = tf.convert_to_tensor(value=initial_value, dtype_hint=ztypes.float) # self._name = name @property def name(self): return self.name @property def dtype(self): return self._value_tensor.dtype
[docs] def value(self): return self._value_tensor
[docs] def read_value(self): return self.value()
[docs] def assign(self, value, use_locking=False, name=None, read_value=True): raise LogicalUndefinedOperationError("Cannot assign to a fixed/composed parameter")
[docs] def load(self, value, session=None): raise LogicalUndefinedOperationError("Cannot load to a fixed/composed parameter")
def _dense_var_to_tensor(self, dtype=None, name=None, as_ref=False): del name if dtype is not None and dtype != self.dtype: return NotImplemented if as_ref: # return "NEVER READ THIS" raise LogicalUndefinedOperationError("There is no ref for the fixed/composed parameter") else: return self._value_tensor def _AsTensor(self): return self._value_tensor @staticmethod def _OverloadAllOperators(): # pylint: disable=invalid-name """Register overloads for all operators.""" for operator in ops.Tensor.OVERLOADABLE_OPERATORS: ComposedVariable._OverloadOperator(operator) # For slicing, bind getitem differently than a tensor (use SliceHelperVar # instead) # pylint: disable=protected-access setattr(ComposedVariable, "__getitem__", array_ops._SliceHelperVar) @staticmethod def _OverloadOperator(operator): # pylint: disable=invalid-name """Defer an operator overload to `ops.Tensor`. We pull the operator out of ops.Tensor dynamically to avoid ordering issues. Args: operator: string. The operator name. """ tensor_oper = getattr(ops.Tensor, operator) def _run_op(a, *args): # pylint: disable=protected-access value = a._AsTensor() return tensor_oper(value, *args) # Propagate __doc__ to wrapper try: _run_op.__doc__ = tensor_oper.__doc__ except AttributeError: pass setattr(ComposedVariable, operator, _run_op)
def _dense_var_to_tensor(var, dtype=None, name=None, as_ref=False): return var._dense_var_to_tensor(dtype=dtype, name=name, as_ref=as_ref) ops.register_tensor_conversion_function(ComposedVariable, _dense_var_to_tensor) fetch_function = lambda variable: ([variable.read_value()], lambda val: val[0]) feed_function = lambda feed, feed_val: [(feed.read_value(), feed_val)] feed_function_for_partial_run = lambda feed: [feed.read_value()] from tensorflow.python.client.session import register_session_run_conversion_functions # ops.register_dense_tensor_like_type() register_session_run_conversion_functions(tensor_type=ComposedResourceVariable, fetch_function=fetch_function, feed_function=feed_function, feed_function_for_partial_run=feed_function_for_partial_run) register_session_run_conversion_functions(tensor_type=ComposedVariable, fetch_function=fetch_function, feed_function=feed_function, feed_function_for_partial_run=feed_function_for_partial_run) ComposedVariable._OverloadAllOperators() # class BaseParameter(ZfitParameter, metaclass=MetaBaseParameter): # TODO(Mayou36): upgrade to tf2
[docs]class BaseParameter(ZfitParameter): pass
[docs]class ZfitParameterMixin(BaseNumeric): _existing_names = set() def __init__(self, name, initial_value, **kwargs): if name in self._existing_names: raise NameAlreadyTakenError("Another parameter is already named {}. " "Use a different, unique one.".format(name)) self._existing_names.update((name,)) self._name = name super().__init__(initial_value=initial_value, name=name, **kwargs) # try: # new_name = self.op.name # except AttributeError: # no `op` attribute -> take normal name # new_name = self.name # new_name = self.name.rsplit(':', 1)[0] # get rid of tf node # new_name = self.name # get rid of tf node # new_name = new_name.rsplit('/', 1)[-1] # get rid of the scope preceding the name # if not new_name == name: # name has been mangled because it already exists # raise NameAlreadyTakenError("Another parameter is already named {}. " # "Use a different, unique one.".format(name)) @property def name(self): return self._name @property def floating(self): if self._floating and not self.trainable: raise RuntimeError("Floating is set to true but tf Variable is not trainable.") return self._floating @floating.setter def floating(self, value): if not isinstance(value, bool): raise TypeError("floating has to be a boolean.") self._floating = value def __add__(self, other): if isinstance(other, (ZfitModel, ZfitParameter)): from . import operations with suppress(NotImplementedError): return operations.add(self, other) return super().__add__(other) def __radd__(self, other): if isinstance(other, (ZfitModel, ZfitParameter)): from . import operations with suppress(NotImplementedError): return operations.add(other, self) return super().__radd__(other) def __mul__(self, other): if isinstance(other, (ZfitModel, ZfitParameter)): from . import operations with suppress(NotImplementedError): return operations.multiply(self, other) return super().__mul__(other) def __rmul__(self, other): if isinstance(other, (ZfitModel, ZfitParameter)): from . import operations with suppress(NotImplementedError): return operations.multiply(other, self) return super().__rmul__(other) def __eq__(self, other): return id(self) == id(other) def __hash__(self): return super().__hash__()
# solve metaclass confict # class TFBaseVariable(TFBaseVariable, metaclass=MetaBaseParameter): # TODO(Mayou36): upgrade to tf2
[docs]class TFBaseVariable(TFBaseVariable): pass
[docs]class Parameter(SessionHolderMixin, ZfitParameterMixin, TFBaseVariable, BaseParameter): """Class for fit parameters, derived from TF Variable class. """ _independent = True def __init__(self, name, value, lower_limit=None, upper_limit=None, step_size=None, floating=True, dtype=ztypes.float, **kwargs): """ Constructor. name : name of the parameter, value : starting value lower_limit : lower limit upper_limit : upper limit step_size : step size (set to 0 for fixed parameters) """ # TODO: sanitize input self._lower_limit_neg_inf = None self._upper_limit_neg_inf = None if lower_limit is None: self._lower_limit_neg_inf = tf.cast(-np.infty, dtype) if upper_limit is None: self._upper_limit_neg_inf = tf.cast(np.infty, dtype) # no_limits = -lower_limit == upper_limit == np.infty value = tf.cast(value, dtype=ztypes.float) def constraint(x): return tf.clip_by_value(x, clip_value_min=self.lower_limit, clip_value_max=self.upper_limit) # self.constraint = constraint super().__init__(initial_value=value, dtype=dtype, name=name, constraint=constraint, params={}, **kwargs) self.lower_limit = tf.cast(lower_limit, dtype=ztypes.float) if lower_limit is not None else lower_limit self.upper_limit = tf.cast(upper_limit, dtype=ztypes.float) if upper_limit is not None else upper_limit if self.independent: tf.compat.v1.add_to_collection("zfit_independent", self) else: tf.compat.v1.add_to_collection("zfit_dependent", self) # value = tf.cast(value, dtype=ztypes.float) # TODO: init value mandatory? self.floating = floating self.step_size = step_size zfit.run.auto_initialize(self) def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) cls._independent = True # overwriting independent only for subclass/instance @property def lower_limit(self): limit = self._lower_limit if limit is None: limit = self._lower_limit_neg_inf return limit @lower_limit.setter @invalidates_cache def lower_limit(self, value): if value is None and self._lower_limit_neg_inf is None: self._lower_limit_neg_inf = tf.cast(-np.infty, dtype=ztypes.float) self._lower_limit = value @property def upper_limit(self): limit = self._upper_limit if limit is None: limit = self._upper_limit_neg_inf return limit @upper_limit.setter @invalidates_cache def upper_limit(self, value): if value is None and self._upper_limit_neg_inf is None: self._upper_limit_neg_inf = tf.cast(np.infty, dtype=ztypes.float) self._upper_limit = value @property def has_limits(self): no_limits = self._lower_limit is None and self._upper_limit is None return not no_limits
[docs] def value(self): value = super().value() if self.has_limits: value = self.constraint(value) return value
[docs] def read_value(self): value = super().read_value() if self.has_limits: value = self.constraint(value) return value
def _get_dependents(self): return {self} @property def independent(self): return self._independent @property def step_size(self): # TODO: improve default step_size? step_size = self._step_size if step_size is None: # auto-infer from limits # step_splits = 1e4 # if self.has_limits: # step_size = (self.upper_limit - self.lower_limit) / step_splits # TODO improve? can be tensor? # else: step_size = 0.001 if np.isnan(step_size): if self.lower_limit == -np.infty or self.upper_limit == np.infty: step_size = 0.001 else: raise ValueError("Could not set step size. Is NaN.") # TODO: how to deal with infinities? step_size = ztf.to_real(step_size) self.step_size = step_size return step_size @step_size.setter def step_size(self, value): if value is not None: value = ztf.convert_to_tensor(value, preferred_dtype=ztypes.float) value = tf.cast(value, dtype=ztypes.float) self._step_size = value
[docs] def load(self, value: ztyping.NumericalScalarType): """:py:class:`~zfit.Parameter` takes on the `value`. Is not part of the graph, does a session run. Args: value (numerical): """ return super().load(value=value, session=self.sess)
[docs] def set_value(self, value: ztyping.NumericalScalarType): """Set the :py:class:`~zfit.Parameter` to `value` (temporarily if used in a context manager). Args: value (float): The value the parameter will take on. """ super_load = super().load def getter(): return self.sess.run(self) def setter(value): super_load(value=value, session=self.sess) return TemporarilySet(value=value, setter=setter, getter=getter)
# TODO: make it a random variable? return tensor that evaluates new all the time?
[docs] def randomize(self, minval=None, maxval=None, sampler=np.random.uniform): """Update the value with a randomised value between minval and maxval. Args: minval (Numerical): maxval (Numerical): sampler (): """ if minval is None: minval = self.sess.run(self.lower_limit) # else: # minval = tf.cast(minval, dtype=self.dtype) if maxval is None: maxval = self.sess.run(self.upper_limit) # else: # maxval = tf.cast(maxval, dtype=self.dtype) # value = ztf.random_uniform(shape=self.shape, minval=minval, maxval=maxval, dtype=self.dtype) shape = self.shape.as_list() # if shape == []: # size = 1 # value = self.sess.run(value) # eps = 1e-8 # value = sampler(size=self.shape, low=minval + eps, high=maxval - eps) value = sampler(size=self.shape, low=minval, high=maxval) # value = np.random.uniform(size=size, low=minval, high=maxval) # if shape == []: # value = value[0] self.load(value=value) return value
def __repr__(self): return f"<zfit.Parameter '{self.name}' floating={self.floating}>"
[docs]class BaseZParameter(ZfitParameterMixin, ComposedVariable, BaseParameter): pass
[docs]class BaseComposedParameter(BaseZParameter): def __init__(self, params, value, name="BaseComposedParameter", **kwargs): super().__init__(initial_value=value, name=name, params=params, **kwargs) # self.params = params def _get_dependents(self): dependents = self._extract_dependents(list(self.params.values())) return dependents @property def floating(self): raise LogicalUndefinedOperationError("Cannot be floating or not. Look at the dependents.") @property def params(self): return self._params @params.setter def params(self, value): if not isinstance(value, dict): raise TypeError("Parameters has to be a dict") self._params = value @property def independent(self): return False
[docs]class ConstantParameter(BaseZParameter): def __init__(self, name, value): super().__init__(name=name, initial_value=value, dtype=ztypes.float, params={}) @property def floating(self): return False @floating.setter def floating(self, value): raise LogicalUndefinedOperationError("Cannot set a fixed parameter.") @property def independent(self) -> bool: return False def _get_dependents(self) -> ztyping.DependentsType: return OrderedSet()
[docs]class ComposedParameter(BaseComposedParameter): def __init__(self, name, tensor, dtype=ztypes.float, **kwargs): tensor = ztf.convert_to_tensor(tensor, dtype=dtype) independent_params = tf.compat.v1.get_collection("zfit_independent") params = get_dependents_auto(tensor=tensor, candidates=independent_params) # params_init_op = [param.initializer for param in params] params = {p.name: p for p in params} # with tf.control_dependencies(params_init_op): super().__init__(params=params, value=tensor, name=name, dtype=dtype, **kwargs) def __repr__(self): return f"<zfit.{self.__class__.__name__} '{self.name}' dtype={self.dtype.name}>"
[docs]class ComplexParameter(ComposedParameter): def __init__(self, name, value, dtype=ztypes.complex, **kwargs): self._conj = None self._mod = None self._arg = None self._imag = None self._real = None super().__init__(name, value, dtype, **kwargs)
[docs] @staticmethod def from_cartesian(name, real, imag, dtype=ztypes.complex, floating=True, **kwargs): # TODO: correct dtype handling, also below real = convert_to_parameter(real, name=name + "_real", prefer_floating=floating) imag = convert_to_parameter(imag, name=name + "_imag", prefer_floating=floating) param = ComplexParameter(name=name, value=tf.cast(tf.complex(real, imag), dtype=dtype), **kwargs) param._real = real param._imag = imag return param
[docs] @staticmethod def from_polar(name, mod, arg, dtype=ztypes.complex, floating=True, **kwargs): mod = convert_to_parameter(mod, name=name + "_mod", prefer_floating=floating) arg = convert_to_parameter(arg, name=name + "_arg", prefer_floating=floating) param = ComplexParameter(name=name, value=tf.cast(tf.complex(mod * tf.math.cos(arg), mod * tf.math.sin(arg)), dtype=dtype), **kwargs) param._mod = mod param._arg = arg return param
@property def conj(self): if self._conj is None: self._conj = ComplexParameter(name='{}_conj'.format(self.name), value=tf.math.conj(self), dtype=self.dtype) return self._conj @property def real(self): real = self._real if real is None: real = ztf.to_real(self) return real @property def imag(self): imag = self._imag if imag is None: imag = tf.math.imag(tf.convert_to_tensor(value=self, dtype_hint=self.dtype)) # HACK tf bug #30029 return imag @property def mod(self): mod = self._mod if mod is None: mod = tf.math.abs(self) return mod @property def arg(self): arg = self._arg if arg is None: arg = tf.math.atan(self.imag / self.real) return arg
_auto_number = 0
[docs]def get_auto_number(): global _auto_number auto_number = _auto_number _auto_number += 1 return auto_number
[docs]def convert_to_parameter(value, name=None, prefer_floating=False) -> "ZfitParameter": """Convert a *numerical* to a fixed parameter or return if already a parameter. Args: value (): name (): prefer_floating: If True, create a Parameter instead of a FixedParameter _if possible_. """ floating = False is_python = False if name is not None: name = str(name) if isinstance(value, ZfitParameter): # TODO(Mayou36): autoconvert variable. TF 2.0? return value elif isinstance(value, tf.Variable): raise TypeError("Currently, cannot autoconvert tf.Variable to zfit.Parameter.") # convert to Tensor if not yet if not isinstance(value, tf.Tensor): is_python = True if isinstance(value, complex): value = ztf.to_complex(value) else: floating = prefer_floating value = ztf.to_real(value) if not run._enable_parameter_autoconversion: return value if value.dtype.is_complex: if name is None: name = "FIXED_complex_autoparam_" + str(get_auto_number()) value = ComplexParameter(name, value=value, floating=False) else: # value = Parameter("FIXED_autoparam_" + str(get_auto_number()), value=value, floating=False) if is_python: params = {} else: independend_params = tf.compat.v1.get_collection("zfit_independent") params = get_dependents_auto(tensor=value, candidates=independend_params) if params: if name is None: name = "composite_autoparam_" + str(get_auto_number()) value = ComposedParameter(name, tensor=value) else: if prefer_floating: name = "autoparam_" + str(get_auto_number()) if name is None else name value = Parameter(name=name, value=value) else: if name is None: name = "FIXED_autoparam_" + str(get_auto_number()) if name is None else name value = ConstantParameter(name, value=value) return value