Source code for zfit.models.basic

"""
Basic PDFs are provided here. Gauss, exponential... that can be used together with Functors to
build larger models.
"""

#  Copyright (c) 2019 zfit

import math as mt
from typing import Type, Any
import warnings

import numpy as np
import tensorflow as tf



from zfit import z
from ..util.exception import WorkInProgressError
from ..util.temporary import TemporarilySet
from ..settings import ztypes
from ..util import ztyping
from ..core.limits import Space, ANY_LOWER, ANY_UPPER
from ..core.basepdf import BasePDF

infinity = mt.inf


[docs]class CustomGaussOLD(BasePDF): def __init__(self, mu, sigma, obs, name="Gauss"): super().__init__(name=name, obs=obs, params=dict(mu=mu, sigma=sigma)) def _unnormalized_pdf(self, x): x = x.unstack_x() mu = self.params['mu'] sigma = self.params['sigma'] gauss = tf.exp(- 0.5 * tf.square((x - mu) / sigma)) return gauss
def _gauss_integral_from_inf_to_inf(limits, params, model): return tf.sqrt(2 * z.pi) * params['sigma'] CustomGaussOLD.register_analytic_integral(func=_gauss_integral_from_inf_to_inf, limits=Space.from_axes(limits=(-infinity, infinity), axes=(0,)))
[docs]class Exponential(BasePDF): _N_OBS = 1 def __init__(self, lambda_, obs: ztyping.ObsTypeInput, name: str = "Exponential", **kwargs): """Exponential function exp(lambda * x). The function is normalized over a finite range and therefore a pdf. So the PDF is precisely defined as :math:`\\frac{ e^{\\lambda \\cdot x}}{ \\int_{lower}^{upper} e^{\\lambda \\cdot x} dx}` Args: lambda_ (:py:class:`~zfit.Parameter`): Accessed as parameter "lambda". obs (:py:class:`~zfit.Space`): The :py:class:`~zfit.Space` the pdf is defined in. name (str): Name of the pdf. dtype (DType): """ params = {'lambda': lambda_} super().__init__(obs, name=name, params=params, **kwargs) self._numerics_data_shift = None def _unnormalized_pdf(self, x): lambda_ = self.params['lambda'] x = x.unstack_x() return self._numerics_shifted_exp(x=x, lambda_=lambda_) # Don't use exp! will overflow. def _numerics_shifted_exp(self, x, lambda_): # needed due to overflow in exp otherwise, prevents by shift return z.exp(lambda_ * (x - self._numerics_data_shift)) def _set_numerics_data_shift(self, limits): lower, upper = limits.limits lower_val = min([lim[0] for lim in lower]) upper_val = max([lim[0] for lim in upper]) value = (upper_val + lower_val) / 2 if max(abs(lower_val - value), abs(upper_val - value)) > 710: warnings.warn( "Boundaries can be too wide for exponential (assuming lambda ~ 1), expect `inf` in exp(x) and `NaN`s." "(upper - lower) * lambda should be smaller than 1400 roughly", category=RuntimeWarning) def setter(value): self._numerics_data_shift = value def getter(): return self._numerics_data_shift return TemporarilySet(value=value, getter=getter, setter=setter) # All hooks are needed to set the right shift when "entering" the pdf. The norm range is taken where both are # available. No special need needs to be taken for sampling (it samples from the correct region, the limits, and # uses the predictions by the `unnormalized_prob` -> that is shifted correctly def _single_hook_integrate(self, limits, norm_range, name='_hook_integrate'): with self._set_numerics_data_shift(limits=limits): return super()._single_hook_integrate(limits, norm_range, name) def _single_hook_analytic_integrate(self, limits, norm_range, name="_hook_analytic_integrate"): with self._set_numerics_data_shift(limits=limits): return super()._single_hook_analytic_integrate(limits, norm_range, name) def _single_hook_numeric_integrate(self, limits, norm_range, name='_hook_numeric_integrate'): with self._set_numerics_data_shift(limits=limits): return super()._single_hook_numeric_integrate(limits, norm_range, name) def _single_hook_partial_integrate(self, x, limits, norm_range, name='_hook_partial_integrate'): with self._set_numerics_data_shift(limits=limits): return super()._single_hook_partial_integrate(x, limits, norm_range, name) def _single_hook_partial_analytic_integrate(self, x, limits, norm_range, name='_hook_partial_analytic_integrate'): with self._set_numerics_data_shift(limits=limits): return super()._single_hook_partial_analytic_integrate(x, limits, norm_range, name) def _single_hook_partial_numeric_integrate(self, x, limits, norm_range, name='_hook_partial_numeric_integrate'): with self._set_numerics_data_shift(limits=limits): return super()._single_hook_partial_numeric_integrate(x, limits, norm_range, name) def _single_hook_normalization(self, limits, name): with self._set_numerics_data_shift(limits=limits): return super()._single_hook_normalization(limits, name) def _single_hook_unnormalized_pdf(self, x, component_norm_range, name): if component_norm_range.limits is not None: with self._set_numerics_data_shift(limits=component_norm_range): return super()._single_hook_unnormalized_pdf(x, component_norm_range, name) else: return super()._single_hook_unnormalized_pdf(x, component_norm_range, name) def _single_hook_pdf(self, x, norm_range, name): with self._set_numerics_data_shift(limits=norm_range): return super()._single_hook_pdf(x, norm_range, name) def _single_hook_log_pdf(self, x, norm_range, name): with self._set_numerics_data_shift(limits=norm_range): return super()._single_hook_log_pdf(x, norm_range, name) def _single_hook_sample(self, n, limits, name): with self._set_numerics_data_shift(limits=limits): return super()._single_hook_sample(n, limits, name)
# def _log_pdf(self, x, norm_range: Space): # lambda_ = self.params['lambda'] # x = z.unstack_x(x) # func = x * lambda_ # if norm_range.n_limits > 1: # raise DueToLazynessNotImplementedError( # "Not implemented, it's more of a hack. I Should implement log_pdf and " # "norm probarly") # (lower,), (upper,) = norm_range.limits # lower = lower[0] # upper = upper[0] # # assert False, "WIP, add log integral" def _exp_integral_from_any_to_any(limits, params, model): lambda_ = params['lambda'] def raw_integral(x): return model._numerics_shifted_exp(x=x, lambda_=lambda_) / lambda_ # needed due to overflow in exp otherwise (lower,), (upper,) = limits.limits if lower[0] == - upper[0] == np.inf: raise NotImplementedError lower_int = raw_integral(x=z.constant(lower)) upper_int = raw_integral(x=z.constant(upper)) return (upper_int - lower_int)[0] # Exponential.register_inverse_analytic_integral() limits = Space.from_axes(axes=0, limits=(ANY_LOWER, ANY_UPPER)) Exponential.register_analytic_integral(func=_exp_integral_from_any_to_any, limits=limits)