Source code for zfit_physics.models.pdf_argus

"""ARGUS PDF (https://en.wikipedia.org/wiki/ARGUS_distribution)"""

from __future__ import annotations

import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
import zfit
from zfit import z
from zfit.util import ztyping


@z.function(wraps="tensor")
def argus_func(
    m: ztyping.NumericalType,
    m0: ztyping.NumericalType,
    c: ztyping.NumericalType,
    p: ztyping.NumericalType,
) -> tf.Tensor:
    r"""`ARGUS shape <https://en.wikipedia.org/wiki/ARGUS_distribution>`_ describing the invariant mass of a particle in
    a continuous background.

    It is defined as

    .. math::

        \mathrm{Argus}(m, m_0, c, p) = m \cdot \left[ 1 - \left( \frac{m}{m_0} \right)^2 \right]^p
        \cdot \exp\left[ c \cdot \left(1 - \left(\frac{m}{m_0}\right)^2 \right) \right]

    The implementation follows the `RooFit version <https://root.cern.ch/doc/master/classRooArgusBG.html>`_

    Args:
        m: Mass of the particle
        m0: Maximal energetically allowed mass, cutoff
        c: peakiness of the distribution
        p: Generalized ARGUS shape, for p = 0.5, the normal ARGUS shape is recovered

    Returns:
        `tf.Tensor`: the values matching the (broadcasted) shapes of the input
    """
    m = tfp.math.clip_by_value_preserve_gradient(m, 0.0, m0)
    m_frac = m / m0

    m_factor = 1 - z.square(m_frac)
    return m * z.pow(m_factor, p) * (z.exp(c * m_factor))


[docs] class Argus(zfit.pdf.BasePDF): def __init__( self, *, m0, c, p, obs: ztyping.ObsTypeInput, extended: ztyping.ParamTypeInput | None = None, norm: ztyping.NormTypeInput = None, name: str = "ArgusPDF", label: str | None = None, ): r"""`ARGUS shape <https://en.wikipedia.org/wiki/ARGUS_distribution>`_ describing the invariant mass of a particle in a continuous background. The ARGUS shaped function describes the reconstructed invariant mass of a decayed particle, especially at the kinematic boundaries of the maximum beam energy. It is defined as .. math:: \mathrm{Argus}(m, m_0, c, p) = m \cdot \left[ 1 - \left( \frac{m}{m_0} \right)^2 \right]^p \cdot \exp\left[ c \cdot \left(1 - \left(\frac{m}{m_0}\right)^2 \right) \right] and normalized to one over the `norm_range` (which defaults to `obs`). The implementation follows the `RooFit version <https://root.cern.ch/doc/master/classRooArgusBG.html>`_ Args: obs: |@doc:pdf.init.obs| Observables of the model. This will be used as the default space of the PDF and, if not given explicitly, as the normalization range. The default space is used for example in the sample method: if no sampling limits are given, the default space is used. The observables are not equal to the domain as it does not restrict or truncate the model outside this range. |@docend:pdf.init.obs| m0: Maximal energetically allowed mass, cutoff c: Shape parameter; "peakiness" of the distribution p: Generalization of the ARGUS shape, for p = 0.5, the normal ARGUS shape is recovered extended: |@doc:pdf.init.extended| The overall yield of the PDF. If this is parameter-like, it will be used as the yield, the expected number of events, and the PDF will be extended. An extended PDF has additional functionality, such as the ``ext_*`` methods and the ``counts`` (for binned PDFs). |@docend:pdf.init.extended| norm: |@doc:pdf.init.norm| Normalization of the PDF. By default, this is the same as the default space of the PDF. |@docend:pdf.init.norm| name: |@doc:pdf.init.name| Human-readable name or label of the PDF for better identification. |@docend:pdf.init.name| label: |@doc:pdf.init.label| Label of the PDF, if None is given, it will be the name. |@docend:pdf.init.label| Returns: `tf.Tensor`: the values matching the (broadcasted) shapes of the input """ params = {"m0": m0, "c": c, "p": p} super().__init__(obs=obs, name=name, params=params, extended=extended, norm=norm, label=label) _N_OBS = 1 @zfit.supports() def _unnormalized_pdf(self, x, params): """ Calculation of ARGUS PDF value (Docs: https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.argus.html) """ m = x[0] m0 = params["m0"] c = params["c"] p = params["p"] return argus_func(m, m0, c, p)
# Keep? move to math? # @z.function_tf def uppergamma(s, x): return tf.math.igammac(s, x=x) * z.exp(tf.math.lgamma(x)) @z.function(wraps="tensor") def argus_cdf_p_half_nonpositive(lim, c, m0): lim = tf.clip_by_value(lim, 0.0, m0) return tf.cond( tf.math.less(c, 0.0), lambda: argus_cdf_p_half_c_neg(lim=lim, c=c, m0=m0), lambda: argus_cdf_p_half_c_zero(lim=lim, c=c, m0=m0), ) # Does not work, why? # # @z.function_tf # def argus_cdf_p_half_sympy(lim, c, m0): # # lim = tf.where(tf.less_equal(lim, m0), lim, m0) # take the smaller one, only integrate up to m0 # # lim = tf.where(tf.greater(lim, 0.), lim, z.constant(0.)) # start from 0 as minimum # lim = tf.clip_by_value(lim, 0., m0) # lim_square = z.square(lim) # m0_squared = z.square(m0) # return (-0.5 * m0_squared * z.pow(-c * (1 - lim_square / m0_squared), -0.5) # * z.sqrt(1 - lim_square / m0_squared) * uppergamma((z.constant(1.5)), # -c * (1 - lim_square / m0_squared)) / c) @z.function(wraps="tensor") def argus_cdf_p_half_c_neg(lim, c, m0): f1 = 1 - z.square(lim / m0) cdf = -0.5 * z.square(m0) cdf *= z.exp(c * f1) * z.sqrt(f1) / c + 0.5 / z.pow(-c, 1.5) * z.sqrt(z.pi) * tf.math.erf(z.sqrt(-c * f1)) return cdf @z.function(wraps="tensor") def argus_cdf_p_half_c_zero(lim, c, m0): del c f1 = 1 - z.square(lim / m0) return -z.square(m0) / 3.0 * f1 * z.sqrt(f1) # TODO: add Faddeev function approximation # def argus_cdf_p_half_c_pos(lim, c, m0): # f1 = 1 - z.square(lim) # cdf = 0.5 * z.square(m0) * z.exp(c * f1) / (c * z.sqrt(c)) # # cdf *= (0.5 * z.sqrt(z.pi) * (RooMath::faddeeva(sqrt(c * f1))).imag() - z.sqrt(c * f1)) # return cdf @z.function(wraps="tensor") def argus_integral_p_half_func(lower, upper, c, m0): return argus_cdf_p_half_nonpositive(upper, c=c, m0=m0) - argus_cdf_p_half_nonpositive(lower, c=c, m0=m0) def argus_integral_p_half(limits, params, model): del model p = params["p"] if not isinstance(p, zfit.param.ConstantParameter) or not np.isclose(p.static_value, 0.5): raise zfit.exception.AnalyticIntegralNotImplementedError() c = params["c"] if not isinstance(c, zfit.param.ConstantParameter) or c.static_value > 0: raise zfit.exception.AnalyticIntegralNotImplementedError() m0 = params["m0"] lower, upper = limits.limit1d lower = z.convert_to_tensor(lower) upper = z.convert_to_tensor(upper) return argus_integral_p_half_func(lower=lower, upper=upper, c=c, m0=m0) argus_integral_limits = zfit.Space(axes=(0,), limits=(zfit.Space.ANY_LOWER, zfit.Space.ANY_UPPER)) Argus.register_analytic_integral(func=argus_integral_p_half, limits=argus_integral_limits) if __name__ == "__main__": # create the integral import sympy as sp N = sp.Symbol("N") m = sp.Symbol("m") m0 = sp.Symbol("m0") c = sp.Symbol("c") t = sp.Symbol("t") mu = sp.Symbol("mu") sigma = sp.Symbol("sigma") # p = sp.Symbol('p') p = 0.5 low = sp.Symbol("low") lim = sp.Symbol("up") from sympy.assumptions.assume import global_assumptions global_assumptions.add(sp.Q.positive(N)) global_assumptions.add(sp.Q.finite(N)) global_assumptions.add(sp.Q.positive(sigma)) global_assumptions.add(sp.Q.finite(sigma)) global_assumptions.add(sp.Q.positive(m)) global_assumptions.add(sp.Q.finite(m)) global_assumptions.add(sp.Q.positive(m / m0)) global_assumptions.add(sp.Q.finite(m / m0)) global_assumptions.add(sp.Q.positive(p)) global_assumptions.add(sp.Q.finite(p)) # global_assumptions.add(sp.Q.integer(p)) global_assumptions.add(sp.Q.finite(c)) global_assumptions.add(sp.Q.positive(c)) m_factor = 1 - (m / m0) ** 2 integral_expression = m * m_factor**p * (sp.exp(c * m_factor)) # integral_expression = (N * m * (1 - (m / m0) ** 2) ** p * sp.exp(c * (1 - (m / m0) ** 2))) integral = sp.integrate(integral_expression, m) func1 = sp.lambdify(integral.free_symbols, integral, "tensorflow") import inspect source = inspect.getsource(func1) # sp.lambdify()