"""ARGUS PDF (https://en.wikipedia.org/wiki/ARGUS_distribution)"""
from __future__ import annotations
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
import zfit
from zfit import z
from zfit.util import ztyping
@z.function(wraps="tensor")
def argus_func(
m: ztyping.NumericalType,
m0: ztyping.NumericalType,
c: ztyping.NumericalType,
p: ztyping.NumericalType,
) -> tf.Tensor:
r"""`ARGUS shape <https://en.wikipedia.org/wiki/ARGUS_distribution>`_ describing the invariant mass of a particle in
a continuous background.
It is defined as
.. math::
\mathrm{Argus}(m, m_0, c, p) = m \cdot \left[ 1 - \left( \frac{m}{m_0} \right)^2 \right]^p
\cdot \exp\left[ c \cdot \left(1 - \left(\frac{m}{m_0}\right)^2 \right) \right]
The implementation follows the `RooFit version <https://root.cern.ch/doc/master/classRooArgusBG.html>`_
Args:
m: Mass of the particle
m0: Maximal energetically allowed mass, cutoff
c: peakiness of the distribution
p: Generalized ARGUS shape, for p = 0.5, the normal ARGUS shape is recovered
Returns:
`tf.Tensor`: the values matching the (broadcasted) shapes of the input
"""
m = tfp.math.clip_by_value_preserve_gradient(m, 0.0, m0)
m_frac = m / m0
m_factor = 1 - z.square(m_frac)
return m * z.pow(m_factor, p) * (z.exp(c * m_factor))
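# A minimal usage sketch of ``argus_func`` (illustrative only; the numbers are made-up example
# values, not taken from any analysis): evaluate the unnormalized shape on a grid of masses
# below the cutoff ``m0``.
#
#     masses = np.linspace(5.0, 5.29, 100)
#     values = argus_func(m=masses, m0=5.29, c=-20.0, p=0.5)  # tf.Tensor with 100 entries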
class Argus(zfit.pdf.BasePDF):
def __init__(
self,
*,
m0: ztyping.ParamTypeInput,
c: ztyping.ParamTypeInput,
p: ztyping.ParamTypeInput,
obs: ztyping.ObsTypeInput,
extended: ztyping.ParamTypeInput | None = None,
norm: ztyping.NormTypeInput = None,
name: str = "ArgusPDF",
label: str | None = None,
):
r"""`ARGUS shape <https://en.wikipedia.org/wiki/ARGUS_distribution>`_ describing the invariant mass of a particle
in a continuous background.
The ARGUS shaped function describes the reconstructed invariant mass of a decayed particle, especially at the
kinematic boundaries of the maximum beam energy. It is defined as
.. math::
\mathrm{Argus}(m, m_0, c, p) = m \cdot \left[ 1 - \left( \frac{m}{m_0} \right)^2 \right]^p
\cdot \exp\left[ c \cdot \left(1 - \left(\frac{m}{m_0}\right)^2 \right) \right]
and normalized to one over the `norm` (which defaults to `obs`).
The implementation follows the `RooFit version <https://root.cern.ch/doc/master/classRooArgusBG.html>`_
Args:
obs: |@doc:pdf.init.obs| Observables of the
model. This will be used as the default space of the PDF and,
if not given explicitly, as the normalization range.
The default space is used for example in the sample method: if no
sampling limits are given, the default space is used.
If the observables are binned and the model is unbinned, the
model will be a binned model, by wrapping the model in a
:py:class:`~zfit.pdf.BinnedFromUnbinnedPDF`, equivalent to
calling :py:meth:`~zfit.pdf.BasePDF.to_binned`.
If the observables are binned and the model is unbinned, the
model will be a binned model, by wrapping the model in a
:py:class:`~zfit.pdf.BinnedFromUnbinnedPDF`, equivalent to
calling :py:meth:`~zfit.pdf.BasePDF.to_binned`.
The observables are not equal to the domain as it does not restrict or
truncate the model outside this range. |@docend:pdf.init.obs|
m0: Maximal energetically allowed mass, cutoff
c: Shape parameter; "peakiness" of the distribution
p: Generalization of the ARGUS shape, for p = 0.5, the normal ARGUS shape is recovered
extended: |@doc:pdf.init.extended| The overall yield of the PDF.
If this is parameter-like, it will be used as the yield,
the expected number of events, and the PDF will be extended.
An extended PDF has additional functionality, such as the
``ext_*`` methods and the ``counts`` (for binned PDFs). |@docend:pdf.init.extended|
norm: |@doc:pdf.init.norm| Normalization of the PDF.
By default, this is the same as the default space of the PDF. |@docend:pdf.init.norm|
name: |@doc:pdf.init.name| Name of the PDF.
Maybe has implications on the serialization and deserialization of the PDF.
For a human-readable name, use the label. |@docend:pdf.init.name|
label: |@doc:pdf.init.label| Human-readable name
or label of
the PDF for a better description, to be used with plots etc.
Has no programmatical functional purpose as identification. |@docend:pdf.init.label|
Returns:
`tf.Tensor`: the values matching the (broadcasted) shapes of the input
"""
params = {"m0": m0, "c": c, "p": p}
super().__init__(obs=obs, name=name, params=params, extended=extended, norm=norm, label=label)
if isinstance(p, zfit.param.ConstantParameter):
p_is_half = np.isclose(p.static_value, 0.5)
else:
try:
p_is_half = np.isclose(p, 0.5)
except NotImplementedError:
p_is_half = False # we cannot know, unfortunately
if isinstance(c, zfit.param.ConstantParameter):
c_is_negative = c.static_value < 0
else:
try:
c_is_negative = c < 0
except NotImplementedError:
c_is_negative = False
        # These flags are checked by the analytic integral (argus_integral_p_half) registered
        # below: it is only used for p = 0.5 and, in this implementation, for c known to be negative.
        self._argus_p_is_half = p_is_half
        self._argus_c_is_negative = c_is_negative
@zfit.supports(norm=False)
def _pdf(self, x, norm, params):
"""
Calculation of ARGUS PDF value
(Docs: https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.argus.html)
"""
m = x[0]
del norm
m0 = params["m0"]
c = params["c"]
p = params["p"]
return argus_func(m, m0, c, p)
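# A minimal usage sketch of the ``Argus`` PDF (illustrative only; observable name, limits and
# parameter values below are made-up examples):
#
#     obs = zfit.Space("mass", limits=(5.0, 5.29))
#     m0 = zfit.Parameter("m0", 5.29)
#     c = zfit.Parameter("c", -20.0)
#     p = zfit.Parameter("p", 0.5, floating=False)
#     argus = Argus(m0=m0, c=c, p=p, obs=obs)
#     probs = argus.pdf(np.linspace(5.0, 5.28, 50))  # normalized over ``obs`` by default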
# Keep? move to math?
# @z.function_tf
def uppergamma(s, x):
    # Upper incomplete gamma function: Gamma(s, x) = Q(s, x) * Gamma(s),
    # where Q(s, x) = tf.math.igammac(s, x) is the regularized upper incomplete gamma.
    return tf.math.igammac(s, x=x) * z.exp(tf.math.lgamma(s))
@z.function(wraps="tensor")
def argus_cdf_p_half_nonpositive(lim, c, m0):
lim = tf.clip_by_value(lim, 0.0, m0)
return tf.cond(
tf.math.less(c, 0.0),
lambda: argus_cdf_p_half_c_neg(lim=lim, c=c, m0=m0),
lambda: argus_cdf_p_half_c_zero(lim=lim, c=c, m0=m0),
)
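# Derivation sketch for the two antiderivatives below (p = 1/2 case): substituting
# u = 1 - (m / m0)^2, so that m dm = -(m0^2 / 2) du, the integrand m * sqrt(u) * exp(c * u)
# integrates to -(m0^2 / 2) * int sqrt(u) exp(c * u) du. For c < 0, integration by parts gives
#     int sqrt(u) exp(c * u) du = sqrt(u) * exp(c * u) / c
#                                 + sqrt(pi) / (2 * (-c)^(3/2)) * erf(sqrt(-c * u)),
# which is what ``argus_cdf_p_half_c_neg`` evaluates (including the -m0^2 / 2 prefactor);
# for c = 0 it reduces to (2 / 3) * u^(3/2), matching ``argus_cdf_p_half_c_zero``.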
# Does not work, why?
# # @z.function_tf
# def argus_cdf_p_half_sympy(lim, c, m0):
# # lim = tf.where(tf.less_equal(lim, m0), lim, m0) # take the smaller one, only integrate up to m0
# # lim = tf.where(tf.greater(lim, 0.), lim, z.constant(0.)) # start from 0 as minimum
# lim = tf.clip_by_value(lim, 0., m0)
# lim_square = z.square(lim)
# m0_squared = z.square(m0)
# return (-0.5 * m0_squared * z.pow(-c * (1 - lim_square / m0_squared), -0.5)
# * z.sqrt(1 - lim_square / m0_squared) * uppergamma((z.constant(1.5)),
# -c * (1 - lim_square / m0_squared)) / c)
@z.function(wraps="tensor")
def argus_cdf_p_half_c_neg(lim, c, m0):
f1 = 1 - z.square(lim / m0)
cdf = -0.5 * z.square(m0)
cdf *= z.exp(c * f1) * z.sqrt(f1) / c + 0.5 / z.pow(-c, 1.5) * z.sqrt(z.pi) * tf.math.erf(z.sqrt(-c * f1))
return cdf
@z.function(wraps="tensor")
def argus_cdf_p_half_c_zero(lim, c, m0):
del c
f1 = 1 - z.square(lim / m0)
return -z.square(m0) / 3.0 * f1 * z.sqrt(f1)
# TODO: add Faddeev function approximation
# def argus_cdf_p_half_c_pos(lim, c, m0):
# f1 = 1 - z.square(lim)
# cdf = 0.5 * z.square(m0) * z.exp(c * f1) / (c * z.sqrt(c))
# # cdf *= (0.5 * z.sqrt(z.pi) * (RooMath::faddeeva(sqrt(c * f1))).imag() - z.sqrt(c * f1))
# return cdf
@z.function(wraps="tensor")
def argus_integral_p_half_func(lower, upper, c, m0):
return argus_cdf_p_half_nonpositive(upper, c=c, m0=m0) - argus_cdf_p_half_nonpositive(lower, c=c, m0=m0)
def argus_integral_p_half(limits, params, model):
c = params["c"]
if not model._argus_p_is_half:
raise zfit.exception.AnalyticIntegralNotImplementedError()
    if not model._argus_c_is_negative:
raise zfit.exception.AnalyticIntegralNotImplementedError()
m0 = params["m0"]
lower, upper = limits.limit1d
lower = z.convert_to_tensor(lower)
upper = z.convert_to_tensor(upper)
return argus_integral_p_half_func(lower=lower, upper=upper, c=c, m0=m0)
argus_integral_limits = zfit.Space(axes=(0,), limits=(zfit.Space.ANY_LOWER, zfit.Space.ANY_UPPER))
Argus.register_analytic_integral(func=argus_integral_p_half, limits=argus_integral_limits)
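# Note: the analytic integral registered above is only taken when ``p`` is fixed to 0.5 and
# ``c`` is known to be negative at construction time (the flags cached in ``Argus.__init__``);
# otherwise ``AnalyticIntegralNotImplementedError`` is raised internally and zfit falls back to
# numerical integration. Illustrative call (values are made up):
#
#     argus = Argus(m0=5.29, c=-20.0, p=0.5, obs=zfit.Space("mass", limits=(5.0, 5.29)))
#     integral = argus.integrate(limits=(5.0, 5.28))  # uses argus_integral_p_half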
if __name__ == "__main__":
# create the integral
import sympy as sp
N = sp.Symbol("N")
m = sp.Symbol("m")
m0 = sp.Symbol("m0")
c = sp.Symbol("c")
t = sp.Symbol("t")
mu = sp.Symbol("mu")
sigma = sp.Symbol("sigma")
# p = sp.Symbol('p')
p = 0.5
low = sp.Symbol("low")
lim = sp.Symbol("up")
from sympy.assumptions.assume import global_assumptions
global_assumptions.add(sp.Q.positive(N))
global_assumptions.add(sp.Q.finite(N))
global_assumptions.add(sp.Q.positive(sigma))
global_assumptions.add(sp.Q.finite(sigma))
global_assumptions.add(sp.Q.positive(m))
global_assumptions.add(sp.Q.finite(m))
global_assumptions.add(sp.Q.positive(m / m0))
global_assumptions.add(sp.Q.finite(m / m0))
global_assumptions.add(sp.Q.positive(p))
global_assumptions.add(sp.Q.finite(p))
# global_assumptions.add(sp.Q.integer(p))
global_assumptions.add(sp.Q.finite(c))
global_assumptions.add(sp.Q.positive(c))
m_factor = 1 - (m / m0) ** 2
integral_expression = m * m_factor**p * (sp.exp(c * m_factor))
# integral_expression = (N * m * (1 - (m / m0) ** 2) ** p * sp.exp(c * (1 - (m / m0) ** 2)))
integral = sp.integrate(integral_expression, m)
func1 = sp.lambdify(integral.free_symbols, integral, "tensorflow")
import inspect
source = inspect.getsource(func1)
# sp.lambdify()
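    # Note: ``integral.free_symbols`` is an (unordered) set, so the positional argument order of
    # ``func1`` is not guaranteed. For a reproducible signature, the symbols could be passed
    # explicitly, e.g. (illustrative):
    #
    #     func1 = sp.lambdify((m, m0, c), integral, "tensorflow")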