Source code for zfit.core.basepdf

"""
This  module defines the `BasePdf` that can be used to inherit from in order to build a custom PDF.

The `BasePDF` implements already a lot of ready-to-use functionality like integral, automatic normalization
and sampling.

Defining your own pdf
---------------------

A simple example:
>>> class MyGauss(BasePDF):
>>>     def __init__(self, mean, stddev, name="MyGauss"):
>>>         super().__init__(mean=mean, stddev=stddev, name=name)
>>>
>>>     def _unnormalized_pdf(self, x):
>>>         return tf.exp((x - mean) ** 2 / (2 * stddev**2))

Notice that *here* we only specify the *function* and no normalization. This
**No** attempt to **explicitly** normalize the function should be done inside `_unnormalized_pdf`.
The normalization is handled with another method depending on the normalization range specified.
(It *is* possible, though discouraged, to directly provide the *normalized probability* by overriding _pdf(), but
there are other, more convenient ways to add improvements like providing an analytical integrals.)

Before we create an instance, we need to create the variables to initialize it
>>> mean = zfit.Parameter("mean1", 2., 0.1, 4.2)  # signature as in RooFit: *name, initial, lower, upper*
>>> stddev = zfit.Parameter("stddev1", 5., 0.3, 10.)
Let's create an instance and some example data
>>> gauss = MyGauss(mean=mean, stddev=stddev)
>>> example_data = np.random.random(10)
Now we can get the probability
>>> probs = gauss.pdf(x=example_data, norm_range=(-30., 30))  # `norm_range` specifies over which range to normalize
Or the integral
>>> integral = gauss.integrate(limits=(-5, 3.1), norm_range=False)  # norm_range is False -> return unnormalized
integral
Or directly sample from it
>>> sample = gauss.sample(n_draws=1000, limits=(-10, 10))  # draw 1000 samples within (-10, 10)

We can create an extended PDF, which will result in anything using a `norm_range` to not return the
probability but the number probability (the function will be normalized to `yield` instead of 1 inside
the `norm_range`)
>>> yield1 = Parameter("yield1", 100, 0, 1000)
>>> gauss_extended = gauss.create_extended(yield1)
>>> gauss.is_extended
True

>>> integral_extended = gauss.integrate(limits=(-10, 10), norm_range=(-10, 10))  # yields approx 100

For more advanced methods and ways to register analytic integrals or overwrite certain methods, see
also the advanced tutorials in `zfit tutorials <https://github.com/zfit/zfit-tutorials>`_
"""

#  Copyright (c) 2020 zfit

import abc
import warnings
from contextlib import suppress
from typing import Union, Type, Dict

import tensorflow as tf

from zfit import z
from zfit.core.sample import extended_sampling
from zfit.util.cache import invalidates_cache
from .basemodel import BaseModel
from .interfaces import ZfitPDF, ZfitParameter
from .limits import Space
from .parameter import Parameter, convert_to_parameter
from ..settings import ztypes, run
from ..util import ztyping
from ..util.container import convert_to_container
from ..util.exception import (AlreadyExtendedPDFError,
                              NotExtendedPDFError, )
from ..util.temporary import TemporarilySet

_BasePDF_USER_IMPL_METHODS_TO_CHECK = {}


def _BasePDF_register_check_support(has_support: bool):
    """Marks a method that the subclass either *has* to or *can't* use the `@supports` decorator.

    Args:
        has_support (bool): If True, flags that it **requires** the `@supports` decorator. If False,
            flags that the `@supports` decorator is **not allowed**.

    """
    if not isinstance(has_support, bool):
        raise TypeError("Has to be boolean.")

    def register(func):
        """Register a method to be checked to (if True) *has* `support` or (if False) has *no* `support`.

        Args:
            func (function):

        Returns:
            function:
        """
        name = func.__name__
        _BasePDF_USER_IMPL_METHODS_TO_CHECK[name] = has_support
        func.__wrapped__ = _BasePDF_register_check_support
        return func

    return register


[docs]class BasePDF(ZfitPDF, BaseModel): def __init__(self, obs: ztyping.ObsTypeInput, params: Dict[str, ZfitParameter] = None, dtype: Type = ztypes.float, name: str = "BasePDF", **kwargs): super().__init__(obs=obs, dtype=dtype, name=name, params=params, **kwargs) self._yield = None self._temp_yield = None self._norm_range = None self._normalization_value = None def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) cls._subclass_check_support(methods_to_check=_BasePDF_USER_IMPL_METHODS_TO_CHECK, wrapper_not_overwritten=_BasePDF_register_check_support) @property def space(self) -> "zfit.Space": if self._norm_range is not None: space = self._norm_range else: space = super().space return space def _check_input_norm_range(self, norm_range, caller_name="", none_is_error=True): if norm_range is None: norm_range = self.norm_range return super()._check_input_norm_range(norm_range=norm_range, caller_name=caller_name, none_is_error=none_is_error) def _check_input_params(self, *params): return tuple(convert_to_parameter(p) for p in params) def _func_to_integrate(self, x: ztyping.XType): return self.unnormalized_pdf(x) def _func_to_sample_from(self, x): return self.unnormalized_pdf(x) def _single_hook_integrate(self, limits, norm_range, name='hook_integrate'): integral = super()._single_hook_integrate(limits=limits, norm_range=norm_range, name=name) integral = self.apply_yield(integral, norm_range=norm_range) return integral def _single_hook_analytic_integrate(self, limits, norm_range, name='hook_analytic_integrate'): integral = super()._single_hook_analytic_integrate(limits=limits, norm_range=norm_range, name=name) integral = self.apply_yield(integral, norm_range=norm_range) return integral def _single_hook_numeric_integrate(self, limits, norm_range, name='hook_numeric_integrate'): numeric_integral = super()._single_hook_numeric_integrate(limits=limits, norm_range=norm_range, name=name) numeric_integral = self.apply_yield(numeric_integral, norm_range=norm_range) return numeric_integral def _single_hook_partial_integrate(self, x, limits, norm_range, name='hook_partial_integrate'): partial_integral = super()._single_hook_partial_integrate(x=x, limits=limits, norm_range=norm_range, name=name) partial_integral = self.apply_yield(partial_integral, norm_range=norm_range) return partial_integral def _single_hook_partial_analytic_integrate(self, x, limits, norm_range, name='hook_partial_analytic_integrate'): part_analytic_int = super()._single_hook_partial_analytic_integrate(x=x, limits=limits, norm_range=norm_range, name=name) part_analytic_int = self.apply_yield(part_analytic_int, norm_range=norm_range) return part_analytic_int def _single_hook_partial_numeric_integrate(self, x, limits, norm_range, name='hook_partial_numeric_integrate'): part_numeric_int = super()._single_hook_partial_numeric_integrate(x=x, limits=limits, norm_range=norm_range, name=name) part_numeric_int = self.apply_yield(part_numeric_int, norm_range=norm_range) return part_numeric_int @property def norm_range(self) -> Union[Space, None, bool]: """Return the current normalization range. If None and the `obs`have limits, they are returned. Returns: :py:class:`~zfit.Space` or None: The current normalization range """ norm_range = self._norm_range if norm_range is None: norm_range = self.space return norm_range
[docs] @invalidates_cache def set_norm_range(self, norm_range: ztyping.LimitsTypeInput): """Set the normalization range (temporarily if used with contextmanager). Args: norm_range (tuple, :py:class:`~zfit.Space`): """ norm_range = self._check_input_norm_range(norm_range=norm_range) def setter(value): self._norm_range = value def getter(): return self._norm_range return TemporarilySet(value=norm_range, setter=setter, getter=getter)
@property def _yield(self): """For internal use, the yield or None""" return self.params.get('yield') @_yield.setter def _yield(self, value): if value is None: # unset self._params.pop('yield', None) # safely remove if still there else: self._params['yield'] = value @_BasePDF_register_check_support(True) def _normalization(self, limits): raise NotImplementedError
[docs] def normalization(self, limits: ztyping.LimitsType, name: str = "normalization") -> ztyping.XType: """Return the normalization of the function (usually the integral over `limits`). Args: limits (tuple, :py:class:`~zfit.Space`): The limits on where to normalize over name (str): Returns: Tensor: the normalization value """ limits = self._check_input_limits(limits=limits, caller_name=name) return self._single_hook_normalization(limits=limits, name=name)
def _single_hook_normalization(self, limits, name): # TODO(Mayou36): add yield? return self._hook_normalization(limits=limits, name=name) def _hook_normalization(self, limits, name="_hook_normalization"): return self._call_normalization(limits=limits, name=name) # no _norm_* needed def _call_normalization(self, limits, name): # TODO: caching? alternative with self._name_scope(name, values=[limits]): with suppress(NotImplementedError): return self._normalization(limits=limits) return self._fallback_normalization(limits) def _fallback_normalization(self, limits): return self._hook_integrate(limits=limits, norm_range=False) @abc.abstractmethod def _unnormalized_pdf(self, x): raise NotImplementedError
[docs] def unnormalized_pdf(self, x: ztyping.XType, component_norm_range: ztyping.LimitsTypeInput = None, name: str = "unnormalized_pdf") -> ztyping.XType: """PDF "unnormalized". Use `functions` for unnormalized pdfs. this is only for performance in special cases. Args: x (numerical): The value, have to be convertible to a Tensor component_norm_range (:py:class:`~zfit.Space`): The normalization range for the components. Needed for certain composition pdfs. name (str): Returns: :py:class:`tf.Tensor`: 1-dimensional :py:class:`tf.Tensor` containing the unnormalized pdf. """ # if component_norm_range is None: # component_norm_range = self._get with self._convert_sort_x(x) as x: component_norm_range = self._check_input_norm_range(component_norm_range, caller_name=name, none_is_error=False) return self._single_hook_unnormalized_pdf(x, component_norm_range, name)
def _single_hook_unnormalized_pdf(self, x, component_norm_range, name): return self._call_unnormalized_pdf(x=x, name=name) def _call_unnormalized_pdf(self, x, name): with self._name_scope(name, values=[x]): # try: return self._unnormalized_pdf(x) # except ValueError as error: # raise ShapeIncompatibleError("Most probably, the number of obs the pdf was designed for" # "does not coincide with the `n_obs` from the `space`/`obs`" # "it received on initialization." # "Original Error: {}".format(error)) @_BasePDF_register_check_support(False) def _pdf(self, x, norm_range): raise NotImplementedError # @func_simple @z.function def pdf(self, x: ztyping.XTypeInput, norm_range: ztyping.LimitsTypeInput = None, name: str = "model") -> ztyping.XType: """Probability density function, normalized over `norm_range`. Args: x (numerical): `float` or `double` `Tensor`. norm_range (tuple, :py:class:`~zfit.Space`): :py:class:`~zfit.Space` to normalize over name (str): Prepended to names of ops created by this function. Returns: :py:class:`tf.Tensor` of type `self.dtype`. """ norm_range = self._check_input_norm_range(norm_range, caller_name=name, none_is_error=True) with self._convert_sort_x(x) as x: value = self._single_hook_pdf(x=x, norm_range=norm_range, name=name) if run.numeric_checks: assert_op = z.check_numerics(value, message="Check if pdf output contains any NaNs of Infs") assert_op = [assert_op] else: assert_op = [] with tf.control_dependencies(assert_op): return z.to_real(value) def _single_hook_pdf(self, x, norm_range, name): return self._hook_pdf(x=x, norm_range=norm_range, name=name) def _hook_pdf(self, x, norm_range, name="_hook_pdf"): return self._norm_pdf(x=x, norm_range=norm_range, name=name) def _norm_pdf(self, x, norm_range, name='norm_pdf'): return self._call_pdf(x=x, norm_range=norm_range, name=name) def _call_pdf(self, x, norm_range, name): with self._name_scope(name, values=[x, norm_range]): with suppress(NotImplementedError): return self._pdf(x, norm_range=norm_range) with suppress(NotImplementedError): return tf.exp(self._log_pdf(x=x, norm_range=norm_range)) return self._fallback_pdf(x=x, norm_range=norm_range) def _fallback_pdf(self, x, norm_range): pdf = self._call_unnormalized_pdf(x, name="_call_unnormalized_pdf") if norm_range.limits is not False: # identity check! pdf /= self._hook_normalization(limits=norm_range) return pdf @_BasePDF_register_check_support(False) def _log_pdf(self, x, norm_range): raise NotImplementedError
[docs] def log_pdf(self, x: ztyping.XType, norm_range: ztyping.LimitsType = None, name: str = "log_pdf") -> ztyping.XType: """Log probability density function normalized over `norm_range`. Args: x (numerical): `float` or `double` `Tensor`. norm_range (tuple, :py:class:`~zfit.Space`): :py:class:`~zfit.Space` to normalize over name (str): Prepended to names of ops created by this function. Returns: log_pdf: a `Tensor` of type `self.dtype`. """ norm_range = self._check_input_norm_range(norm_range, caller_name=name) with self._convert_sort_x(x) as x: return self._single_hook_log_pdf(x=x, norm_range=norm_range, name=name)
def _single_hook_log_pdf(self, x, norm_range, name): return self._hook_log_pdf(x=x, norm_range=norm_range, name=name) def _hook_log_pdf(self, x, norm_range, name): log_prob = self._norm_log_pdf(x=x, norm_range=norm_range, name=name) return log_prob def _norm_log_pdf(self, x, norm_range, name='norm_log_pdf'): return self._call_log_pdf(x=x, norm_range=norm_range, name=name) def _call_log_pdf(self, x, norm_range, name): with self._name_scope(name, values=[x, norm_range]): with suppress(NotImplementedError): return self._log_pdf(x=x, norm_range=norm_range) with suppress(NotImplementedError): return tf.math.log(self._pdf(x=x, norm_range=norm_range)) return self._fallback_log_pdf(x=x, norm_range=norm_range) def _fallback_log_pdf(self, x, norm_range): return tf.math.log(self._hook_pdf(x=x, norm_range=norm_range))
[docs] def gradients(self, x: ztyping.XType, norm_range: ztyping.LimitsType, params: ztyping.ParamsTypeOpt = None): warnings.warn("Taking the gradient *this way* in TensorFlow is inefficient! Consider taking it with" "respect to the loss function.") if params is not None: params = convert_to_container(params) if params is None or isinstance(params[0], str): params = self.get_params(only_floating=False, names=params) probs = self.pdf(x, norm_range=norm_range) gradients = [tf.gradients(ys=prob, xs=params) for prob in z.unstack_x(probs, always_list=True)] return tf.stack(gradients)
def _apply_yield(self, value: float, norm_range: ztyping.LimitsType, log: bool) -> Union[float, tf.Tensor]: if self.is_extended and norm_range.limits is not False: if log: value += tf.math.log(self.get_yield()) else: value *= self.get_yield() return value
[docs] def apply_yield(self, value: Union[float, tf.Tensor], norm_range: ztyping.LimitsTypeInput = False, log: bool = False) -> Union[float, tf.Tensor]: """If a norm_range is given, the value will be multiplied by the yield. Args: value (numerical): norm_range (): log (bool): Returns: numerical """ norm_range = self._check_input_norm_range(norm_range=norm_range) return self._apply_yield(value=value, norm_range=norm_range, log=log)
@invalidates_cache def _set_yield_inplace(self, value: Union[ZfitParameter, float, None]): """Make the model extended by (temporarily) setting a yield. This alters the behavior of `model` and similar and `integrate` and similar. If there is a `norm_range` given, the output of the above functions does not represent a normalized probability density function anymore but corresponds to a number probability. Args: value (): """ # TODO(Mayou36): check input for yield? def setter(value): self._set_yield(value=value) def getter(): return self.get_yield() return TemporarilySet(value=value, setter=setter, getter=getter)
[docs] def create_extended(self, yield_: ztyping.ParamTypeInput, name_addition="_extended") -> "ZfitPDF": """Return an extended version of this pdf with yield `yield_`. The parameters are shared. Args: yield_ (numeric, :py:class:`~zfit.Parameter`): name_addition (str): Returns: :py:class:`~zfit.core.interfaces.ZfitPDF` """ # TODO(Mayou36): fix copy from zfit.models.functor import ProductPDF if isinstance(self, ProductPDF): warnings.warn( "As `copy` is not yet properly implemented, this may fails (for ProductPDF for example?). This" "will be fixed in the future.") if self.is_extended: raise AlreadyExtendedPDFError("This PDF is already extended, cannot create an extended one.") new_pdf = self.copy(name=self.name + str(name_addition)) new_pdf._set_yield_inplace(value=yield_) return new_pdf
def _set_yield(self, value: Union[Parameter, None]): if value is not None: value = convert_to_parameter(value) self._yield = value @property def is_extended(self) -> bool: """Flag to tell whether the model is extended or not. Returns: bool: """ return self._yield is not None def _hook_sample(self, limits, n, name='hook_sample'): if n is None and self.is_extended: n = 'extended' if isinstance(n, str) and n == 'extended': if not self.is_extended: raise NotExtendedPDFError("Cannot use 'extended' as value for `n` on a non-extended pdf.") samples = extended_sampling(pdfs=self, limits=limits) elif isinstance(n, str): raise ValueError("`n` is a string and not 'extended'. Other options are currently not implemented.") elif n is None: raise tf.errors.InvalidArgumentError("`n` cannot be `None` if pdf is not extended.") else: samples = super()._hook_sample(limits=limits, n=n, name=name) return samples
[docs] def get_yield(self) -> Union[Parameter, None]: """Return the yield (only for extended models). Returns: :py:class:`~zfit.Parameter`: the yield of the current model or None """ # if not self.is_extended: # raise zexception.ExtendedPDFError("PDF is not extended, cannot get yield.") return self._yield
[docs] def create_projection_pdf(self, limits_to_integrate: ztyping.LimitsTypeInput) -> 'ZfitPDF': """Create a PDF projection by integrating out some of the dimensions. The new projection pdf is still fully dependent on the pdf it was created with. Args: limits_to_integrate (:py:class:`~zfit.Space`): Returns: ZfitPDF: a pdf without the dimensions from `limits_to_integrate`. """ from ..models.special import SimpleFunctorPDF def partial_integrate_wrapped(self_simple, x): norm_range = self_simple._get_component_norm_range() if norm_range not in (None, False) and norm_range.limits not in (None, False): from zfit.models.functor import BaseFunctor if isinstance(self, BaseFunctor): self._set_component_norm_range(norm_range) return self.partial_integrate(x, limits=limits_to_integrate, norm_range=False) new_pdf = SimpleFunctorPDF(obs=self.space.get_subspace(obs=[obs for obs in self.obs if obs not in limits_to_integrate.obs]), pdfs=(self,), func=partial_integrate_wrapped) return new_pdf
[docs] def copy(self, **override_parameters) -> 'BasePDF': """Creates a copy of the model. Note: the copy model may continue to depend on the original initialization arguments. Args: **override_parameters: String/value dictionary of initialization arguments to override with new value. Returns: model: A new instance of `type(self)` initialized from the union of self.parameters and override_parameters, i.e., `dict(self.parameters, **override_parameters)`. """ obs = self.norm_range # if obs.limits is None: # obs = self.space # HACK(Mayou36): remove once copy is proper implemented from ..models.dist_tfp import WrapDistribution from ..models.polynomials import RecursivePolynomial if type(self) == WrapDistribution: # NOT isinstance! Because e.g. Gauss wraps that and takes different args parameters = dict(distribution=self._distribution, dist_params=self.dist_params) else: # HACK END parameters = dict(self.params) lambda_ = parameters.pop('lambda', None) if lambda_ is not None: parameters['lambda_'] = lambda_ # HACK(Mayou36): copy the polynomial correct, replace 'c_0' with coeff0/coeff_0 or similar if isinstance(self, RecursivePolynomial): parameters['coeff0'] = parameters.pop('c_0', None) coeffs = [] i_coeff = 1 # collect coeffs and convert to 'coeff' list while True: coeff_name = f'c_{i_coeff}' try: coeff = parameters.pop(coeff_name) except KeyError: break else: coeffs.append(coeff) i_coeff += 1 parameters['coeffs'] = coeffs from zfit.models.functor import BaseFunctor, SumPDF if isinstance(self, BaseFunctor): parameters = {} if isinstance(self, SumPDF): fracs = self.fracs if not self.is_extended: fracs = fracs[:-1] parameters.update(fracs=fracs) parameters.update(pdfs=self.pdfs) parameters.update(obs=obs, name=self.name) parameters.update(**override_parameters) # if hasattr(self, "distribution"): # parameters.update(distribution=self.distribution) yield_ = parameters.pop('yield', None) new_instance = type(self)(**parameters) if yield_ is not None: new_instance._set_yield_inplace(yield_) return new_instance
[docs] def as_func(self, norm_range: ztyping.LimitsType = False): """Return a `Function` with the function `model(x, norm_range=norm_range)`. Args: norm_range (): """ from .operations import convert_pdf_to_func # prevent circular import return convert_pdf_to_func(pdf=self, norm_range=norm_range)
def __str__(self): return ("zfit.model.{type_name}(" "\"{self_name}\"" ", dtype={dtype})".format( type_name=type(self).__name__, self_name=self.name, dtype=self.dtype.name))