Source code for zfit.core.basepdf

"""This  module defines the `BasePdf` that can be used to inherit from in order to build a custom PDF.

The `BasePDF` implements already a lot of ready-to-use functionality like integral, automatic normalization
and sampling.

Defining your own pdf

A simple example:
>>> import zfit
>>> import zfit.z.numpy as znp
>>> class MyGauss(BasePDF):
>>>     def __init__(self, mean, stddev, name="MyGauss"):
>>>         super().__init__(mean=mean, stddev=stddev, name=name)
>>>     def _unnormalized_pdf(self, x):
>>>         return znp.exp((x - mean) ** 2 / (2 * stddev**2))

Notice that *here* we only specify the *function* and no normalization. This
**No** attempt to **explicitly** normalize the function should be done inside `_unnormalized_pdf`.
The normalization is handled with another method depending on the normalization range specified.
(It *is* possible, though discouraged, to directly provide the *normalized probability* by overriding _pdf(), but
there are other, more convenient ways to add improvements like providing an analytical integrals.)

Before we create an instance, we need to create the variables to initialize it
>>> mean = zfit.Parameter("mean1", 2., 0.1, 4.2)  # signature as in RooFit: *name, initial, lower, upper*
>>> stddev = zfit.Parameter("stddev1", 5., 0.3, 10.)
Let's create an instance and some example data
>>> gauss = MyGauss(mean=mean, stddev=stddev)
>>> example_data = np.random.random(10)
Now we can get the probability
>>> probs = gauss.pdf(example_data)  # `norm` specifies over which range to normalize
Or the integral
>>> integral = gauss.integrate(limits=(-5, 3.1),norm=False)  # norm_range is False -> return unnormalized
Or directly sample from it
>>> sample = gauss.sample(n_draws=1000, limits=(-10, 10))  # draw 1000 samples within (-10, 10)

We can create an extended PDF, which will result in anything using a `norm_range` to not return the
probability but the number probability (the function will be normalized to `yield` instead of 1 inside
the `norm_range`)
>>> yield1 = Parameter("yield1", 100, 0, 1000)
>>> gauss_extended = gauss.create_extended(yield1)
>>> gauss.is_extended

>>> integral_extended = gauss.ext_integrate(limits=(-10, 10),norm=(-10, 10))  # yields approx 100

For more advanced methods and ways to register analytic integrals or overwrite certain methods, see
also the advanced models in `zfit models <>`_
#  Copyright (c) 2022 zfit

from __future__ import annotations

from typing import TYPE_CHECKING

    import zfit

import warnings
from contextlib import suppress

import tensorflow as tf

import zfit.z.numpy as znp
from zfit import z
from .basemodel import BaseModel
from .baseobject import extract_filter_params
from .interfaces import ZfitParameter, ZfitPDF
from .parameter import Parameter, convert_to_parameter
from .sample import extended_sampling
from .space import Space
from ..settings import run, ztypes
from ..util import ztyping
from ..util.cache import invalidate_graph
from ..util.deprecation import deprecated, deprecated_norm_range, deprecated_args
from ..util.exception import (
from ..util.temporary import TemporarilySet


def _BasePDF_register_check_support(has_support: bool):
    """Marks a method that the subclass either *has* to or *can't* use the `@supports` decorator.

        has_support: If True, flags that it **requires** the `@supports` decorator. If False,
            flags that the `@supports` decorator is **not allowed**.
    if not isinstance(has_support, bool):
        raise TypeError("Has to be boolean.")

    def register(func):
        """Register a method to be checked to (if True) *has* `support` or (if False) has *no* `support`.


        name = func.__name__
        _BasePDF_USER_IMPL_METHODS_TO_CHECK[name] = has_support
        func.__wrapped__ = _BasePDF_register_check_support
        return func

    return register

[docs]class BasePDF(ZfitPDF, BaseModel): def __init__( self, obs: ztyping.ObsTypeInput, params: dict[str, ZfitParameter] = None, dtype: type = ztypes.float, name: str = "BasePDF", extended=None, norm=None, **kwargs, ): super().__init__(obs=obs, dtype=dtype, name=name, params=params, **kwargs) self._yield = None self._norm = norm if extended not in (False, None): self._set_yield(extended) def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) cls._subclass_check_support( methods_to_check=_BasePDF_USER_IMPL_METHODS_TO_CHECK, wrapper_not_overwritten=_BasePDF_register_check_support, ) # @property # def space(self) -> "zfit.Space": # if self._norm_range is not None: # space = self._norm_range # else: # space = super().space # # return space def _check_input_norm(self, norm, none_is_error=False): if norm is None: norm = self.norm return super()._check_input_norm(norm=norm, none_is_error=none_is_error) def _check_input_params(self, *params): return tuple(convert_to_parameter(p) for p in params) def _func_to_integrate(self, x: ztyping.XType): return self.pdf(x, norm=False) def _func_to_sample_from(self, x): return self.pdf(x, norm=False) @property @deprecated(None, "Use the `norm` attribute instead.") def norm_range(self) -> Space | None | bool: """Return the current normalization range. If None and the `obs` have limits, they are returned. Returns: The current normalization range. """ return self.norm @property def norm(self) -> Space | None | bool: """Return the current normalization range. If None and the `obs` have limits, they are returned. Returns: The current normalization range. """ norm = self._norm if norm is None: norm = return norm
[docs] @invalidate_graph def set_norm_range(self, norm: ztyping.LimitsTypeInput): """Set the normalization range (temporarily if used with contextmanager). Args: norm: """ norm = self._check_input_norm(norm) def setter(value): self._norm = value def getter(): return self._norm return TemporarilySet(value=norm, setter=setter, getter=getter)
@_BasePDF_register_check_support(True) def _normalization(self, limits): raise SpecificFunctionNotImplemented
[docs] def normalization( self, limits: ztyping.LimitsType, *, options=None ) -> ztyping.XType: """Return the normalization of the function (usually the integral over `limits`). Args: * (): options (): limits: The limits on where to normalize over Returns: The normalization value """ if options is None: options = {} limits = self._check_input_limits(limits=limits) return self._single_hook_normalization(limits=limits)
def _single_hook_normalization(self, limits): # TODO(Mayou36): add yield? return self._hook_normalization(limits=limits) def _hook_normalization(self, limits): return self._call_normalization(limits=limits) # no _norm_* needed def _call_normalization(self, limits): # TODO: caching? alternative with suppress(FunctionNotImplemented): return self._normalization(limits=limits) return self._fallback_normalization(limits) def _fallback_normalization(self, limits): return self._hook_integrate(limits=limits, norm=False, options=None) def _unnormalized_pdf(self, x): raise SpecificFunctionNotImplemented
[docs] @deprecated(None, "Use `pdf(norm=False)` instead") def unnormalized_pdf(self, x: ztyping.XType) -> ztyping.XType: """PDF "unnormalized". Use `functions` for unnormalized pdfs. this is only for performance in special cases. Args: x: The value, have to be convertible to a Tensor Returns: 1-dimensional :py:class:`tf.Tensor` containing the unnormalized pdf. """ with self._convert_sort_x(x) as x: return self._single_hook_unnormalized_pdf(x)
def _single_hook_unnormalized_pdf(self, x): return self._call_unnormalized_pdf(x=x) def _call_unnormalized_pdf(self, x): # try: return self._unnormalized_pdf(x) # except ValueError as error: # raise ShapeIncompatibleError("Most probably, the number of obs the pdf was designed for" # "does not coincide with the `n_obs` from the `space`/`obs`" # "it received on initialization." # "Original Error: {}".format(error)) @z.function(wraps="model") @deprecated_norm_range def ext_pdf( self, x: ztyping.XTypeInput, norm: ztyping.LimitsTypeInput = None, *, norm_range=None, ) -> ztyping.XType: """Probability density function scaled by yield, normalized over `norm_range`. Args: x: `float` or `double` `Tensor`. norm: :py:class:`~zfit.Space` to normalize over Returns: :py:class:`tf.Tensor` of type `self.dtype`. """ assert norm_range is None if not self.is_extended: raise NotExtendedPDFError(f"{self} is not extended, cannot call `ext_pdf`") with self._convert_sort_x(x) as x: return self._call_ext_pdf(x, norm) def _call_ext_pdf(self, x, norm): with suppress(SpecificFunctionNotImplemented): return self._auto_ext_pdf(x, norm) # fallback return self.pdf(x=x, norm=norm) * self.get_yield() def _auto_ext_pdf(self, x, norm): try: probs = self._ext_pdf(x, norm) except NormNotImplemented: unnorm_probs = self._ext_pdf(x, False) normalization = self.normalization(norm) probs = unnorm_probs / normalization return probs @_BasePDF_register_check_support(True) def _ext_pdf(self, x, norm, *, norm_range=None): raise SpecificFunctionNotImplemented # TODO: implement properly @z.function(wraps="model") @deprecated_norm_range def ext_log_pdf( self, x: ztyping.XTypeInput, norm: ztyping.LimitsTypeInput = None, *, norm_range=None, ) -> ztyping.XType: """Log of probability density function scaled by yield, normalized over `norm_range`. Args: x: `float` or `double` `Tensor`. norm: :py:class:`~zfit.Space` to normalize over Returns: :py:class:`tf.Tensor` of type `self.dtype`. """ assert norm_range is None if not self.is_extended: raise NotExtendedPDFError(f"{self} is not extended, cannot call `ext_pdf`") with self._convert_sort_x(x) as x: return self._call_ext_log_pdf(x, norm) def _call_ext_log_pdf(self, x, norm): with suppress(SpecificFunctionNotImplemented): return self._auto_ext_log_pdf(x, norm) # fallback return self.log_pdf(x=x, norm=norm) + znp.log(self.get_yield()) def _auto_ext_log_pdf(self, x, norm): try: pdf = self._ext_log_pdf(x, norm) except NormNotImplemented: unnormed_pdf = self._ext_log_pdf(x, False) normalization = znp.log(self.normalization(norm)) pdf = unnormed_pdf - normalization return pdf @_BasePDF_register_check_support(True) def _ext_log_pdf(self, x, norm): raise SpecificFunctionNotImplemented @_BasePDF_register_check_support(True) def _pdf(self, x, norm, *, norm_range=None): raise SpecificFunctionNotImplemented @deprecated_norm_range @z.function(wraps="model") def pdf( self, x: ztyping.XTypeInput, norm: ztyping.LimitsTypeInput = None, *, norm_range=None, ) -> ztyping.XType: """Probability density function, normalized over `norm`. Args: norm (): x: `float` or `double` `Tensor`. norm: :py:class:`~zfit.Space` to normalize over Returns: :py:class:`tf.Tensor` of type `self.dtype`. """ assert norm_range is None norm = self._check_input_norm(norm, none_is_error=True) with self._convert_sort_x(x) as x: value = self._single_hook_pdf(x=x, norm=norm) if run.numeric_checks: z.check_numerics( value, message="Check if pdf output contains any NaNs of Infs" ) return znp.asarray(z.to_real(value)) def _single_hook_pdf(self, x, norm): return self._hook_pdf(x=x, norm=norm) def _hook_pdf(self, x, norm): return self._norm_pdf(x=x, norm=norm) def _norm_pdf(self, x, norm): return self._call_pdf(x=x, norm=norm) def _call_pdf(self, x, norm): with suppress(FunctionNotImplemented): return self._pdf(x, norm) with suppress(FunctionNotImplemented): return znp.exp(self._log_pdf(x, norm)) if self.is_extended: with suppress(FunctionNotImplemented): return ( self._ext_pdf(x, norm) / self.get_yield() ) # TODO: extend/refactor the calling return self._fallback_pdf(x, norm) def _fallback_pdf(self, x, norm): pdf = self._call_unnormalized_pdf(x) if norm.has_limits: pdf /= self._hook_normalization(limits=norm) return pdf @_BasePDF_register_check_support(False) @deprecated_norm_range def _log_pdf(self, x, norm): raise SpecificFunctionNotImplemented
[docs] @deprecated_norm_range def log_pdf( self, x: ztyping.XType, norm: ztyping.LimitsType = None, *, norm_range=None ) -> ztyping.XType: """Log probability density function normalized over `norm_range`. Args: x: `float` or `double` `Tensor`. norm: :py:class:`~zfit.Space` to normalize over Returns: A `Tensor` of type `self.dtype`. """ assert norm_range is None norm = self._check_input_norm(norm) with self._convert_sort_x(x) as x: return znp.asarray(z.to_real(self._single_hook_log_pdf(x=x, norm=norm)))
def _single_hook_log_pdf(self, x, norm): return self._hook_log_pdf(x=x, norm=norm) def _hook_log_pdf(self, x, norm): log_prob = self._norm_log_pdf(x=x, norm=norm) return log_prob def _norm_log_pdf(self, x, norm): return self._call_log_pdf(x=x, norm=norm) def _call_log_pdf(self, x, norm): with suppress(FunctionNotImplemented): return self._log_pdf(x, norm) with suppress(FunctionNotImplemented): return znp.log(self._pdf(x, norm)) return self._fallback_log_pdf(x, norm) def _fallback_log_pdf(self, x, norm): return znp.log(self._hook_pdf(x=x, norm=norm)) def gradient( self, x: ztyping.XType, norm: ztyping.LimitsType, params: ztyping.ParamsTypeOpt = None, ): raise BreakingAPIChangeError("Removed with 0.5.x: is this needed?") @z.function(wraps="model") @deprecated_norm_range def ext_integrate( self, limits: ztyping.LimitsType, norm: ztyping.LimitsType = None, *, norm_range=None, options=None, ) -> ztyping.XType: """Integrate the function over `limits` (normalized over `norm_range` if not False). Args: options (): limits: the limits to integrate over norm: the limits to normalize over or False to integrate the unnormalized probability Returns: The integral value as a scalar with shape () """ if options is None: options = {} assert norm_range is None norm = self._check_input_norm(norm) limits = self._check_input_limits(limits=limits) if not self.is_extended: raise NotExtendedPDFError(f"{self} is not extended, cannot call `ext_pdf`") return ( self.integrate(limits=limits, norm=norm, options=options) * self.get_yield() ) def _apply_yield( self, value: float, norm: ztyping.LimitsType, log: bool ) -> float | tf.Tensor: if self.is_extended and not norm.limits_are_false: if log: value += znp.log(self.get_yield()) else: value *= self.get_yield() return value
[docs] def apply_yield( self, value: float | tf.Tensor, norm: ztyping.LimitsTypeInput = False, log: bool = False, ) -> float | tf.Tensor: """If a norm_range is given, the value will be multiplied by the yield. Args: value: norm: log: Returns: Numerical """ norm = self._check_input_norm() return self._apply_yield(value=value, norm=norm, log=log)
@deprecated(None, "Use the public `set_yield` instead.") def _set_yield_inplace(self, value: ZfitParameter | float | None): """Make the model extended by setting a yield. This does not alter the general behavior of the PDF. If there is a `norm_range` given, the output of the above functions does not represent a normalized probability density function anymore but corresponds to a number probability. Args: value: """ self._set_yield(value=value)
[docs] def create_extended( self, yield_: ztyping.ParamTypeInput, name_addition="_extended" ) -> ZfitPDF: """Return an extended version of this pdf with yield `yield_`. The parameters are shared. Args: yield_: name_addition: Returns: :py:class:`~zfit.core.interfaces.ZfitPDF` """ # TODO(Mayou36): fix copy from zfit.models.functor import ProductPDF if isinstance(self, ProductPDF): warnings.warn( "As `copy` is not yet properly implemented, this may fails (for ProductPDF for example?). This" "will be fixed in the future." ) if self.is_extended: raise AlreadyExtendedPDFError( "This PDF is already extended, cannot create an extended one." ) new_pdf = self.copy( + str(name_addition)) new_pdf.set_yield(value=yield_) return new_pdf
[docs] def set_yield(self, value): """Make the model extended by setting a yield. If possible, prefer to use `create_extended`. This does not alter the general behavior of the PDF. The `pdf` and `integrate` and similar methods will continue to return the same - normalized to 1 - values. However, not only can this parameter be accessed via `get_yield`, the methods `ext_pdf` and `ext_integral` provide a version of `pdf` and `integrate` respecetively that is multiplied by the yield. These can be useful for plotting and for binned likelihoods. Args: value (): """ self._set_yield(value=value)
def _set_yield(self, value: ztyping.ParamTypeInput): if value is None: raise BreakingAPIChangeError("Cannot unset a yield (anymore).") if self.is_extended: raise AlreadyExtendedPDFError(f"Cannot extend {self}, is already extended.") value = convert_to_parameter(value) self.add_cache_deps(value) self._yield = value @property def is_extended(self) -> bool: """Flag to tell whether the model is extended or not. Returns: A boolean. """ return self._yield is not None def _hook_sample(self, limits, n): if n is None and self.is_extended: n = "extended" if isinstance(n, str) and n == "extended": if not self.is_extended: raise NotExtendedPDFError( "Cannot use 'extended' as value for `n` on a non-extended pdf." ) samples = extended_sampling(pdfs=self, limits=limits) elif isinstance(n, str): raise ValueError( "`n` is a string and not 'extended'. Other options are currently not implemented." ) elif n is None: raise tf.errors.InvalidArgumentError( "`n` cannot be `None` if pdf is not extended." ) else: samples = super()._hook_sample(limits=limits, n=n) return samples
[docs] def get_yield(self) -> Parameter | None: """Return the yield (only for extended models). Returns: The yield of the current model or None """ # if not self.is_extended: # raise zexception.ExtendedPDFError("PDF is not extended, cannot get yield.") return self._yield
def _get_params( self, floating: bool | None = True, is_yield: bool | None = None, extract_independent: bool | None = True, ) -> set[ZfitParameter]: params = super()._get_params( floating, is_yield=is_yield, extract_independent=extract_independent ) if is_yield is not False: if self.is_extended: yield_params = extract_filter_params( self.get_yield(), floating=floating, extract_independent=extract_independent, ) yield_params.update(params) # putting the yields at the beginning params = yield_params elif is_yield is True: raise NotExtendedPDFError( "PDF is not extended but only yield parameters were requested." ) return params
[docs] @deprecated_args(None, "Use `limits` instead.", "limits_to_integrate") def create_projection_pdf( self, limits: ztyping.LimitsTypeInput, *, options=None, limits_to_integrate=None ) -> ZfitPDF: """Create a PDF projection by integrating out some of the dimensions. The new projection pdf is still fully dependent on the pdf it was created with. Args: * (): options (): limits: Returns: A pdf without the dimensions from `limits_to_integrate`. """ from ..models.special import SimpleFunctorPDF if limits_to_integrate is not None: limits = limits_to_integrate def partial_integrate_wrapped(self_simple, x): return self.partial_integrate(x, limits=limits, options=options) new_pdf = SimpleFunctorPDF( obs=[obs for obs in self.obs if obs not in limits.obs] ), pdfs=(self,), func=partial_integrate_wrapped, ) return new_pdf
[docs] def copy(self, **override_parameters) -> BasePDF: """Creates a copy of the model. Note: the copy model may continue to depend on the original initialization arguments. Args: **override_parameters: String/value dictionary of initialization arguments to override with new value. Returns: A new instance of `type(self)` initialized from the union of self.parameters and override_parameters, i.e., `dict(self.parameters, **override_parameters)`. """ obs = self.norm # HACK(Mayou36): remove once copy is proper implemented from ..models.dist_tfp import WrapDistribution from ..models.kde import GaussianKDE1DimV1 from ..models.polynomials import RecursivePolynomial if ( type(self) == WrapDistribution ): # NOT isinstance! Because e.g. Gauss wraps that and takes different args parameters = dict( distribution=self._distribution, dist_params=self.dist_params ) else: # HACK END parameters = dict(self.params) lam = parameters.pop("lambda", None) if lam is not None: parameters["lam"] = lam if type(self) == GaussianKDE1DimV1: raise RuntimeError( "Cannot copy `GaussianKDE1DimV1` (yet). If you tried to make it extended, use " "`set_yield`" " instead and set it inplace." ) parameters["data"] = self._original_data # HACK(Mayou36): copy the polynomial correct, replace 'c_0' with coeff0/coeff_0 or similar if isinstance(self, RecursivePolynomial): parameters["coeff0"] = parameters.pop("c_0", None) coeffs = [] i_coeff = 1 # collect coeffs and convert to 'coeff' list while True: coeff_name = f"c_{i_coeff}" try: coeff = parameters.pop(coeff_name) except KeyError: break else: coeffs.append(coeff) i_coeff += 1 parameters["coeffs"] = coeffs from zfit.models.functor import BaseFunctor, SumPDF if isinstance(self, BaseFunctor): parameters = {} if isinstance(self, SumPDF): fracs = self.fracs if not self.is_extended: fracs = fracs[:-1] parameters.update(fracs=fracs) parameters.update(pdfs=self.pdfs) parameters.update(obs=obs, parameters.update(**override_parameters) # if hasattr(self, "distribution"): # parameters.update(distribution=self.distribution) yield_ = parameters.pop("yield", None) new_instance = type(self)(**parameters) if yield_ is not None: new_instance.set_yield(yield_) return new_instance
[docs] @deprecated_norm_range def as_func(self, norm: ztyping.LimitsType = False, *, norm_range=None): """Return a `Function` with the function `model(x, norm=norm)`. Args: norm: """ from .operations import convert_pdf_to_func # prevent circular import return convert_pdf_to_func(pdf=self, norm=norm)
def __str__(self): return ( "zfit.model.{type_name}(" '"{self_name}"' ", dtype={dtype})".format( type_name=type(self).__name__,,, ) )