"""
Basic PDFs are provided here. Gauss, exponential... that can be used together with Functors to
build larger models.
"""
# Copyright (c) 2019 zfit
import math as mt
from typing import Type, Any
import warnings
import numpy as np
import tensorflow as tf
from zfit import z
from ..util.exception import WorkInProgressError
from ..util.temporary import TemporarilySet
from ..settings import ztypes
from ..util import ztyping
from ..core.limits import Space, ANY_LOWER, ANY_UPPER
from ..core.basepdf import BasePDF
infinity = mt.inf
[docs]class CustomGaussOLD(BasePDF):
def __init__(self, mu, sigma, obs, name="Gauss"):
super().__init__(name=name, obs=obs, params=dict(mu=mu, sigma=sigma))
def _unnormalized_pdf(self, x):
x = x.unstack_x()
mu = self.params['mu']
sigma = self.params['sigma']
gauss = tf.exp(- 0.5 * tf.square((x - mu) / sigma))
return gauss
def _gauss_integral_from_inf_to_inf(limits, params, model):
return tf.sqrt(2 * z.pi) * params['sigma']
CustomGaussOLD.register_analytic_integral(func=_gauss_integral_from_inf_to_inf,
limits=Space.from_axes(limits=(-infinity, infinity), axes=(0,)))
[docs]class Exponential(BasePDF):
_N_OBS = 1
def __init__(self, lambda_, obs: ztyping.ObsTypeInput, name: str = "Exponential",
**kwargs):
"""Exponential function exp(lambda * x).
The function is normalized over a finite range and therefore a pdf. So the PDF is precisely
defined as :math:`\\frac{ e^{\\lambda \\cdot x}}{ \\int_{lower}^{upper} e^{\\lambda \\cdot x} dx}`
Args:
lambda_ (:py:class:`~zfit.Parameter`): Accessed as parameter "lambda".
obs (:py:class:`~zfit.Space`): The :py:class:`~zfit.Space` the pdf is defined in.
name (str): Name of the pdf.
dtype (DType):
"""
params = {'lambda': lambda_}
super().__init__(obs, name=name, params=params, **kwargs)
self._numerics_data_shift = None
def _unnormalized_pdf(self, x):
lambda_ = self.params['lambda']
x = x.unstack_x()
return self._numerics_shifted_exp(x=x, lambda_=lambda_) # Don't use exp! will overflow.
def _numerics_shifted_exp(self, x, lambda_): # needed due to overflow in exp otherwise, prevents by shift
return z.exp(lambda_ * (x - self._numerics_data_shift))
def _set_numerics_data_shift(self, limits):
lower, upper = limits.limits
lower_val = min([lim[0] for lim in lower])
upper_val = max([lim[0] for lim in upper])
value = (upper_val + lower_val) / 2
if max(abs(lower_val - value), abs(upper_val - value)) > 710:
warnings.warn(
"Boundaries can be too wide for exponential (assuming lambda ~ 1), expect `inf` in exp(x) and `NaN`s."
"(upper - lower) * lambda should be smaller than 1400 roughly",
category=RuntimeWarning)
def setter(value):
self._numerics_data_shift = value
def getter():
return self._numerics_data_shift
return TemporarilySet(value=value, getter=getter, setter=setter)
# All hooks are needed to set the right shift when "entering" the pdf. The norm range is taken where both are
# available. No special need needs to be taken for sampling (it samples from the correct region, the limits, and
# uses the predictions by the `unnormalized_prob` -> that is shifted correctly
def _single_hook_integrate(self, limits, norm_range, name='_hook_integrate'):
with self._set_numerics_data_shift(limits=limits):
return super()._single_hook_integrate(limits, norm_range, name)
def _single_hook_analytic_integrate(self, limits, norm_range, name="_hook_analytic_integrate"):
with self._set_numerics_data_shift(limits=limits):
return super()._single_hook_analytic_integrate(limits, norm_range, name)
def _single_hook_numeric_integrate(self, limits, norm_range, name='_hook_numeric_integrate'):
with self._set_numerics_data_shift(limits=limits):
return super()._single_hook_numeric_integrate(limits, norm_range, name)
def _single_hook_partial_integrate(self, x, limits, norm_range, name='_hook_partial_integrate'):
with self._set_numerics_data_shift(limits=limits):
return super()._single_hook_partial_integrate(x, limits, norm_range, name)
def _single_hook_partial_analytic_integrate(self, x, limits, norm_range, name='_hook_partial_analytic_integrate'):
with self._set_numerics_data_shift(limits=limits):
return super()._single_hook_partial_analytic_integrate(x, limits, norm_range, name)
def _single_hook_partial_numeric_integrate(self, x, limits, norm_range, name='_hook_partial_numeric_integrate'):
with self._set_numerics_data_shift(limits=limits):
return super()._single_hook_partial_numeric_integrate(x, limits, norm_range, name)
def _single_hook_normalization(self, limits, name):
with self._set_numerics_data_shift(limits=limits):
return super()._single_hook_normalization(limits, name)
def _single_hook_unnormalized_pdf(self, x, component_norm_range, name):
if component_norm_range.limits is not None:
with self._set_numerics_data_shift(limits=component_norm_range):
return super()._single_hook_unnormalized_pdf(x, component_norm_range, name)
else:
return super()._single_hook_unnormalized_pdf(x, component_norm_range, name)
def _single_hook_pdf(self, x, norm_range, name):
with self._set_numerics_data_shift(limits=norm_range):
return super()._single_hook_pdf(x, norm_range, name)
def _single_hook_log_pdf(self, x, norm_range, name):
with self._set_numerics_data_shift(limits=norm_range):
return super()._single_hook_log_pdf(x, norm_range, name)
def _single_hook_sample(self, n, limits, name):
with self._set_numerics_data_shift(limits=limits):
return super()._single_hook_sample(n, limits, name)
# def _log_pdf(self, x, norm_range: Space):
# lambda_ = self.params['lambda']
# x = z.unstack_x(x)
# func = x * lambda_
# if norm_range.n_limits > 1:
# raise DueToLazynessNotImplementedError(
# "Not implemented, it's more of a hack. I Should implement log_pdf and "
# "norm probarly")
# (lower,), (upper,) = norm_range.limits
# lower = lower[0]
# upper = upper[0]
#
# assert False, "WIP, add log integral"
def _exp_integral_from_any_to_any(limits, params, model):
lambda_ = params['lambda']
def raw_integral(x):
return model._numerics_shifted_exp(x=x, lambda_=lambda_) / lambda_ # needed due to overflow in exp otherwise
(lower,), (upper,) = limits.limits
if lower[0] == - upper[0] == np.inf:
raise NotImplementedError
lower_int = raw_integral(x=z.constant(lower))
upper_int = raw_integral(x=z.constant(upper))
return (upper_int - lower_int)[0]
# Exponential.register_inverse_analytic_integral()
limits = Space.from_axes(axes=0, limits=(ANY_LOWER, ANY_UPPER))
Exponential.register_analytic_integral(func=_exp_integral_from_any_to_any, limits=limits)