# Copyright (c) 2024 zfit
from __future__ import annotations
from collections.abc import Callable
from typing import ClassVar, Literal, Optional, Union
import numpy as np
import pydantic
import tensorflow as tf
import tensorflow_probability as tfp
from tensorflow_probability.python import distributions as tfd
import zfit.z.numpy as znp
from .. import z
from ..core.basepdf import BasePDF
from ..core.interfaces import ZfitData, ZfitParameter, ZfitSpace
from ..core.serialmixin import SerializableMixin
from ..serialization import Serializer, SpaceRepr
from ..serialization.pdfrepr import BasePDFRepr
from ..settings import run, ztypes
from ..util import (
binning as binning_util,
)
from ..util import (
convolution as convolution_util,
)
from ..util import (
improved_sheather_jones as isj_util,
)
from ..util import (
ztyping,
)
from ..util.exception import OverdefinedError, ShapeIncompatibleError
from ..util.ztyping import ExtendedInputType, NormInputType
from ..z.math import weighted_quantile
from .dist_tfp import WrapDistribution
@z.function(wraps="tensor", keepalive=True)
def bandwidth_rule_of_thumb(
data: znp.array,
weights: znp.array | None,
factor: float | int | znp.array = None,
) -> znp.array:
r"""Calculate the bandwidth of *data* using a rule of thumb.
This calculates a global, single bandwidth for all kernels using a rule of thumb.
|@doc:pdf.kde.bandwidth.explain_global| A global bandwidth
is a single parameter that is shared amongst all kernels.
While this is a fast and robust method,
it is a rule of thumb approximation. Due to its global nature,
it cannot take into account the different varying
local densities. It uses notably the least amount of memory
of all methods. |@docend:pdf.kde.bandwidth.explain_global|
.. math::
h = factor * min\left(\hat{\sigma}, \frac{IQR}{1.34}\right)\,n^{-\frac{1}{5}}
Args:
data: |@doc:pdf.kde.bandwidth.data| Data points to determine the bandwidth
from. |@docend:pdf.kde.bandwidth.data|
weights: |@doc:pdf.kde.bandwidth.weights| Weights of each event
in *data*, can be None or Tensor-like with shape compatible
with *data*. This will change the count of the events, whereas
weight :math:`w_i` of :math:`x_i`. |@docend:pdf.kde.bandwidth.weights|
factor (default: 0.9): Factor that scales the rule of thumb. Ofter used are 0.9 for
silvermans rule of thumb or 1.059 for scotts rule of thumb.
Returns:
Estimated bandwidth
"""
if factor is None:
factor = tf.constant(0.9)
return min_std_or_iqr(data, weights) * znp.asarray(tf.shape(data)[0], ztypes.float) ** (-1 / 5.0) * factor
@z.function(wraps="tensor", keepalive=True)
def bandwidth_silverman(data, weights):
r"""Calculate the bandwidth of *data* using silvermans rule of thumb.
|@doc:pdf.kde.bandwidth.explain_global| A global bandwidth
is a single parameter that is shared amongst all kernels.
While this is a fast and robust method,
it is a rule of thumb approximation. Due to its global nature,
it cannot take into account the different varying
local densities. It uses notably the least amount of memory
of all methods. |@docend:pdf.kde.bandwidth.explain_global|
.. math::
h = 0.9 * min\left(\hat{\sigma}, \frac{IQR}{1.34}\right)\,n^{-\frac{1}{5}}
Args:
data: |@doc:pdf.kde.bandwidth.data| Data points to determine the bandwidth
from. |@docend:pdf.kde.bandwidth.data|
weights: |@doc:pdf.kde.bandwidth.weights| Weights of each event
in *data*, can be None or Tensor-like with shape compatible
with *data*. This will change the count of the events, whereas
weight :math:`w_i` of :math:`x_i`. |@docend:pdf.kde.bandwidth.weights|
Returns:
Estimated bandwidth
"""
return bandwidth_rule_of_thumb(data=data, weights=weights, factor=znp.array(0.9, dtype=ztypes.float))
@z.function(wraps="tensor", keepalive=True)
def bandwidth_scott(data, weights):
r"""Calculate the bandwidth of *data* using silvermans rule of thumb.
|@doc:pdf.kde.bandwidth.explain_global| A global bandwidth
is a single parameter that is shared amongst all kernels.
While this is a fast and robust method,
it is a rule of thumb approximation. Due to its global nature,
it cannot take into account the different varying
local densities. It uses notably the least amount of memory
of all methods. |@docend:pdf.kde.bandwidth.explain_global|
.. math::
h = 1.059 * min\left(\hat{\sigma}, \frac{IQR}{1.34}\right)\,n^{-\frac{1}{5}}
Args:
data: |@doc:pdf.kde.bandwidth.data| Data points to determine the bandwidth
from. |@docend:pdf.kde.bandwidth.data|
weights: |@doc:pdf.kde.bandwidth.weights| Weights of each event
in *data*, can be None or Tensor-like with shape compatible
with *data*. This will change the count of the events, whereas
weight :math:`w_i` of :math:`x_i`. |@docend:pdf.kde.bandwidth.weights|
Returns:
Estimated bandwidth
"""
return bandwidth_rule_of_thumb(data=data, weights=weights, factor=znp.array(1.059, dtype=ztypes.float))
def bandwidth_isj(data, weights):
r"""Calculate the bandwidth of *data* using the improved Sheather-Jones Algorithm.
The ISJ is an adaptive kernel density estimator based on linear diffusion processes
and an estimation for the optimal bandwidth :footcite:t:`Botev_2010`
|@doc:pdf.kde.bandwidth.explain_global| A global bandwidth
is a single parameter that is shared amongst all kernels.
While this is a fast and robust method,
it is a rule of thumb approximation. Due to its global nature,
it cannot take into account the different varying
local densities. It uses notably the least amount of memory
of all methods. |@docend:pdf.kde.bandwidth.explain_global|
.. footbibliography::
Args:
data: |@doc:pdf.kde.bandwidth.data| Data points to determine the bandwidth
from. |@docend:pdf.kde.bandwidth.data|
weights: |@doc:pdf.kde.bandwidth.weights| Weights of each event
in *data*, can be None or Tensor-like with shape compatible
with *data*. This will change the count of the events, whereas
weight :math:`w_i` of :math:`x_i`. |@docend:pdf.kde.bandwidth.weights|
Returns:
Estimated bandwidth
"""
return isj_util.calculate_bandwidth(data, num_grid_points=1024, binning_method="linear", weights=weights)
def bandwidth_adaptive_geomV1(data, func, weights):
r"""Local, adaptive bandwidth estimation scaling by the geometric mean.
The implementation follows Eq. 2 and 3 of :footcite:t:`7761150`. However,
experimental results hint towards a non-optimal performance, which can be caused
by a mistake in the implementation.
|@doc:pdf.kde.bandwidth.explain_adaptive| Adaptive bandwidths are
a way to reduce the dependence on the bandwidth parameter
and are usually local bandwidths that take into account
the local densities.
Adaptive bandwidths are constructed by using an initial estimate
of the local density in order to calculate a sensible bandwidth
for each kernel. The initial estimator can be a kernel density
estimation using a global bandwidth with a rule of thumb.
The adaptive bandwidth h is obtained using this estimate, where
usually
.. math::
h_{i} \propto f( x_{i} ) ^ {-1/2}
Estimates can still differ in the overall scaling of this
bandwidth. |@docend:pdf.kde.bandwidth.explain_adaptive|
|@doc:pdf.kde.bandwidth.explain_local| A local bandwidth
means that each kernel :math:`i` has a different bandwidth.
In other words, given some data points with size n,
we will need n bandwidth parameters.
This is often more accurate than a global bandwidth,
as it allows to have larger bandwiths in areas of smaller density,
where, due to the small local sample size, we have less certainty
over the true density while having a smaller bandwidth in denser
populated areas.
The accuracy comes at the cost of a longer pre-calculation to obtain
the local bandwidth (there are different methods available), an
increased runtime and - most importantly - a peak memory usage.
This can be especially costly for a large number (> few thousand) of
kernels and/or evaluating on large datasets (> 10'000). |@docend:pdf.kde.bandwidth.explain_local|
.. footbibliography::
Args:
data: |@doc:pdf.kde.bandwidth.data| Data points to determine the bandwidth
from. |@docend:pdf.kde.bandwidth.data|
weights: |@doc:pdf.kde.bandwidth.weights| Weights of each event
in *data*, can be None or Tensor-like with shape compatible
with *data*. This will change the count of the events, whereas
weight :math:`w_i` of :math:`x_i`. |@docend:pdf.kde.bandwidth.weights|
Returns:
Estimated bandwidth of size data
"""
data = z.convert_to_tensor(data)
n = znp.sum(weights) if weights is not None else znp.asarray(tf.shape(data)[0], ztypes.float)
probs = func(data)
lambda_i = 1 / znp.sqrt(probs / z.math.reduce_geometric_mean(probs, weights=weights))
return lambda_i * n ** (-1.0 / 5.0) * min_std_or_iqr(data, weights)
def bandwidth_adaptive_zfitV1(data, func, weights) -> znp.array:
r"""(Naive) Local, adaptive bandwidth estimation using a normalized scaling.
This approach is an ad-hoc scaling. It works well but is not found in any paper.
|@doc:pdf.kde.bandwidth.explain_adaptive| Adaptive bandwidths are
a way to reduce the dependence on the bandwidth parameter
and are usually local bandwidths that take into account
the local densities.
Adaptive bandwidths are constructed by using an initial estimate
of the local density in order to calculate a sensible bandwidth
for each kernel. The initial estimator can be a kernel density
estimation using a global bandwidth with a rule of thumb.
The adaptive bandwidth h is obtained using this estimate, where
usually
.. math::
h_{i} \propto f( x_{i} ) ^ {-1/2}
Estimates can still differ in the overall scaling of this
bandwidth. |@docend:pdf.kde.bandwidth.explain_adaptive|
|@doc:pdf.kde.bandwidth.explain_local| A local bandwidth
means that each kernel :math:`i` has a different bandwidth.
In other words, given some data points with size n,
we will need n bandwidth parameters.
This is often more accurate than a global bandwidth,
as it allows to have larger bandwiths in areas of smaller density,
where, due to the small local sample size, we have less certainty
over the true density while having a smaller bandwidth in denser
populated areas.
The accuracy comes at the cost of a longer pre-calculation to obtain
the local bandwidth (there are different methods available), an
increased runtime and - most importantly - a peak memory usage.
This can be especially costly for a large number (> few thousand) of
kernels and/or evaluating on large datasets (> 10'000). |@docend:pdf.kde.bandwidth.explain_local|
.. footbibliography::
Args:
data: |@doc:pdf.kde.bandwidth.data| Data points to determine the bandwidth
from. |@docend:pdf.kde.bandwidth.data|
weights: |@doc:pdf.kde.bandwidth.weights| Weights of each event
in *data*, can be None or Tensor-like with shape compatible
with *data*. This will change the count of the events, whereas
weight :math:`w_i` of :math:`x_i`. |@docend:pdf.kde.bandwidth.weights|
Returns:
Estimated bandwidth array of same size as data
"""
data = z.convert_to_tensor(data)
probs = func(data)
estimate = bandwidth_scott(data, weights=weights)
factor = znp.sqrt(probs) / znp.mean(znp.sqrt(probs))
return estimate / factor
def bandwidth_adaptive_stdV1(data, func, weights):
r"""Local, adaptive bandwidth estimation scaling by the std of the data.
The implementation follows Eq. 2 and 3 of :footcite:t:`Cranmer_2001`. However,
experimental results hint towards a non-optimal performance, which can be caused
by a mistake in the implementation.
|@doc:pdf.kde.bandwidth.explain_adaptive| Adaptive bandwidths are
a way to reduce the dependence on the bandwidth parameter
and are usually local bandwidths that take into account
the local densities.
Adaptive bandwidths are constructed by using an initial estimate
of the local density in order to calculate a sensible bandwidth
for each kernel. The initial estimator can be a kernel density
estimation using a global bandwidth with a rule of thumb.
The adaptive bandwidth h is obtained using this estimate, where
usually
.. math::
h_{i} \propto f( x_{i} ) ^ {-1/2}
Estimates can still differ in the overall scaling of this
bandwidth. |@docend:pdf.kde.bandwidth.explain_adaptive|
|@doc:pdf.kde.bandwidth.explain_local| A local bandwidth
means that each kernel :math:`i` has a different bandwidth.
In other words, given some data points with size n,
we will need n bandwidth parameters.
This is often more accurate than a global bandwidth,
as it allows to have larger bandwiths in areas of smaller density,
where, due to the small local sample size, we have less certainty
over the true density while having a smaller bandwidth in denser
populated areas.
The accuracy comes at the cost of a longer pre-calculation to obtain
the local bandwidth (there are different methods available), an
increased runtime and - most importantly - a peak memory usage.
This can be especially costly for a large number (> few thousand) of
kernels and/or evaluating on large datasets (> 10'000). |@docend:pdf.kde.bandwidth.explain_local|
.. footbibliography::
Args:
data: |@doc:pdf.kde.bandwidth.data| Data points to determine the bandwidth
from. |@docend:pdf.kde.bandwidth.data|
weights: |@doc:pdf.kde.bandwidth.weights| Weights of each event
in *data*, can be None or Tensor-like with shape compatible
with *data*. This will change the count of the events, whereas
weight :math:`w_i` of :math:`x_i`. |@docend:pdf.kde.bandwidth.weights|
Returns:
Estimated bandwidth array of same size as data
"""
data = z.convert_to_tensor(data)
n = znp.sum(weights) if weights is not None else znp.asarray(tf.shape(data)[0], ztypes.float)
probs = func(data)
divisor = min_std_or_iqr(data, weights)
bandwidth = z.sqrt(divisor / probs)
bandwidth *= znp.asarray(n, ztypes.float) ** (-1.0 / 5.0) * 1.059
return bandwidth
def adaptive_factory(func, grid):
if grid:
def adaptive(constructor, data, **kwargs):
kwargs.pop("name", None)
kde_silverman = constructor(
bandwidth="silverman",
data=data,
name="INTERNAL_for_adaptive_kde",
**kwargs,
)
grid = kde_silverman._grid
weights = kde_silverman._grid_data
return func(
data=grid,
func=kde_silverman.pdf,
weights=weights * znp.asarray(tf.shape(data)[0], ztypes.float),
)
else:
def adaptive(constructor, data, weights, **kwargs):
kwargs.pop("name", None)
kde_silverman = constructor(
bandwidth="silverman",
data=data,
name="INTERNAL_for_adaptive_kde",
**kwargs,
)
return func(data=data, func=kde_silverman.pdf, weights=weights)
return adaptive
_adaptive_geom_bandwidth_grid_KDEV1 = adaptive_factory(bandwidth_adaptive_geomV1, grid=True)
_adaptive_geom_bandwidth_KDEV1 = adaptive_factory(bandwidth_adaptive_geomV1, grid=False)
_adaptive_std_bandwidth_grid_KDEV1 = adaptive_factory(bandwidth_adaptive_stdV1, grid=True)
_adaptive_std_bandwidth_KDEV1 = adaptive_factory(bandwidth_adaptive_stdV1, grid=False)
_adaptive_zfit_bandwidth_grid_KDEV1 = adaptive_factory(bandwidth_adaptive_zfitV1, grid=True)
_adaptive_zfit_bandwidth_KDEV1 = adaptive_factory(bandwidth_adaptive_zfitV1, grid=False)
def _bandwidth_scott_KDEV1(data, weights, *_, **__):
return bandwidth_scott(
data,
weights=weights,
)
def _bandwidth_silverman_KDEV1(data, weights, *_, **__):
return bandwidth_silverman(
data,
weights=weights,
)
def _bandwidth_isj_KDEV1(data, weights, *_, **__):
return bandwidth_isj(data, weights=weights)
def check_bw_grid_shapes(bandwidth, grid=None, n_grid=None):
if run.executing_eagerly() and bw_is_arraylike(bandwidth, allow1d=False):
n_grid = grid.shape[0] if grid is not None else n_grid
if n_grid is None:
msg = "Either the grid or n_grid must be given."
raise ValueError(msg)
if bandwidth.shape[0] != n_grid:
msg = "The bandwidth array must have the same length as the grid"
raise ShapeIncompatibleError(msg)
@z.function(wraps="tensor", keepalive=True)
def min_std_or_iqr(x, weights):
if weights is not None:
return znp.minimum(
znp.sqrt(tf.nn.weighted_moments(x, axes=[0], frequency_weights=weights)[1]),
weighted_quantile(x, 0.75, weights=weights)[0] - weighted_quantile(x, 0.25, weights=weights)[0],
)
else:
return znp.minimum(znp.std(x), (tfp.stats.percentile(x, 75) - tfp.stats.percentile(x, 25)))
@z.function(wraps="tensor", keepalive=True)
def calc_kernel_probs(size, weights):
if weights is not None:
return weights / znp.sum(weights)
else:
return tf.broadcast_to(1 / size, shape=(znp.asarray(size, tf.int32),))
class KDEHelper:
_bandwidth_methods: ClassVar = {
"scott": _bandwidth_scott_KDEV1,
"silverman": _bandwidth_silverman_KDEV1,
}
_default_padding = False
_default_num_grid_points = 1024
def _convert_init_data_weights_size(self, data, weights, padding, limits=None, bandwidth=None):
self._original_data = data # for copying
if isinstance(data, ZfitData):
if data.weights is not None:
if weights is not None:
msg = "Cannot specify weights and use a `ZfitData` with weights."
raise OverdefinedError(msg)
weights = data.weights
if data.n_obs > 1:
msg = f"KDE is 1 dimensional, but data {data} has {data.n_obs} observables."
raise ShapeIncompatibleError(msg)
data = z.unstack_x(data)
if callable(padding):
data, weights, bandwidth = padding(data=data, weights=weights, limits=limits, bandwidth=bandwidth)
elif padding is not False:
data, weights, bandwidth = padreflect_data_weights_1dim(
data, weights=weights, mode=padding, limits=limits, bandwidth=bandwidth
)
shape_data = tf.shape(data)
size = znp.asarray(shape_data[0], ztypes.float)
data = znp.asarray(data, dtype=ztypes.float)
return data, size, weights, bandwidth
def _convert_input_bandwidth(self, bandwidth, data, **kwargs):
if bandwidth is None:
bandwidth = "silverman"
# estimate bandwidth
bandwidth_param = bandwidth
if isinstance(bandwidth, str):
bandwidth = self._bandwidth_methods.get(bandwidth)
if bandwidth is None:
msg = (
f"Cannot use {bandwidth} as a bandwidth method. Use a numerical value or one of"
f" the defined methods: {list(self._bandwidth_methods.keys())}"
)
raise ValueError(msg)
if (not isinstance(bandwidth, ZfitParameter)) and callable(bandwidth):
bandwidth = bandwidth(constructor=type(self), data=data, **kwargs)
is_arraylike = bw_is_arraylike(bandwidth_param, allow1d=True)
if (
bandwidth_param is None
or is_arraylike
or bandwidth_param
in (
"adaptiveV1",
"adaptive",
"adaptive_zfit",
"adaptive_std",
"adaptive_geom",
)
):
bandwidth_param = -999999 # dummy value, not needed. Better solution?
else:
bandwidth_param = bandwidth
if is_arraylike:
bandwidth = znp.asarray(bandwidth)
return bandwidth, bandwidth_param
def padreflect_data_weights_1dim(data, mode, weights=None, limits=None, bandwidth=None):
if mode is False:
return data, weights
if mode is True:
mode = znp.array(0.1)
if not isinstance(mode, dict):
mode = {"lowermirror": mode, "uppermirror": mode}
for key in mode:
if key not in ("lowermirror", "uppermirror"):
msg = (
f"Key '{key}' is not a valid padding specification, use 'lowermirror' or 'uppermirror'"
f" in order to mirror the data."
)
raise ValueError(msg)
if limits is None:
minimum = znp.min(data)
maximum = znp.max(data)
else: # todo: debug: check if limits are correct?
minimum = znp.array(limits[0])
maximum = znp.array(limits[1])
diff = maximum - minimum
new_data = []
new_weights = []
bw_is_array = bw_is_arraylike(bandwidth, allow1d=False)
if bw_is_array:
new_bw = []
if (lower := mode.get("lowermirror")) is not None:
dx_lower = diff * lower
lower_area = data < minimum + dx_lower
lower_index = znp.where(lower_area)[0]
lower_data = tf.gather(data, indices=lower_index)
lower_data_mirrored = -lower_data + 2 * minimum
new_data.append(lower_data_mirrored)
if weights is not None:
lower_weights = tf.gather(weights, indices=lower_index)
new_weights.append(lower_weights)
if bw_is_array:
new_bw.append(tf.gather(bandwidth, indices=lower_index))
new_data.append(data)
new_weights.append(weights)
if bw_is_array:
new_bw.append(bandwidth)
if (upper := mode.get("uppermirror")) is not None:
dx_upper = diff * upper
upper_area = data > maximum - dx_upper
upper_index = znp.where(upper_area)[0]
upper_data = tf.gather(data, indices=upper_index)
upper_data_mirrored = -upper_data + 2 * maximum
new_data.append(upper_data_mirrored)
if weights is not None:
upper_weights = tf.gather(weights, indices=upper_index)
new_weights.append(upper_weights)
if bw_is_array:
new_bw.append(tf.gather(bandwidth, indices=upper_index))
data = tf.concat(new_data, axis=0)
if weights is not None:
weights = tf.concat(new_weights, axis=0)
if bw_is_array:
bandwidth = tf.concat(new_bw, axis=0)
return data, weights, bandwidth
[docs]
class GaussianKDE1DimV1(KDEHelper, WrapDistribution):
_N_OBS = 1
_bandwidth_methods = KDEHelper._bandwidth_methods.copy()
_bandwidth_methods.update({"adaptive": _adaptive_std_bandwidth_KDEV1, "isj": _bandwidth_isj_KDEV1})
def __init__(
self,
obs: ztyping.ObsTypeInput,
data: ztyping.ParamTypeInput,
bandwidth: ztyping.ParamTypeInput | str = None,
weights: None | np.ndarray | tf.Tensor = None,
truncate: bool = False,
*,
extended: ExtendedInputType = None,
norm: NormInputType = None,
name: str = "GaussianKDE1DimV1",
label: str | None = None,
):
r"""EXPERIMENTAL, `FEEDBACK WELCOME.
<https://github.com/zfit/zfit/issues/new?assignees=&labels=&template=other.md&title=>`_ Exact, one dimensional,
(truncated) Kernel Density Estimation with a Gaussian Kernel.
This implementation features an exact implementation as is preferably used for smaller (max. ~ a few thousand
points) data sets. For larger data sets, methods such as :py:class:`~zfit.pdf.KDE1DimGrid`
that bin the dataset may be more efficient
Kernel Density Estimation is a non-parametric method to approximate the density of given points.
.. math::
f_h(x) = \frac{1}{nh} \sum_{i=1}^n K\Big(\frac{x-x_i}{h}\Big)
where the kernel in this case is a (truncated) Gaussian
.. math::
K = \exp \Big(\frac{(x - x_i)^2}{\sigma^2}\Big)
The bandwidth of the kernel can be estimated in different ways. It can either be a global bandwidth,
corresponding to a single value, or a local bandwidth, each corresponding to one data point.
**Usage**
The KDE can be instantiated by using a numpy-like data sample, preferably a `zfit.Data` object. This
will be used as the mean of the kernels. The bandwidth can either be given as a parameter (with length
1 for a global one or length equal to the data size) - a rather advanced concept for methods such as
cross validation - or determined from the data automatically, either through a simple method like
scott or silverman rule of thumbs or through an iterative, adaptive method.
Examples
--------
.. code-block:: python
# generate some example kernels
size = 150
data = np.random.normal(size=size, loc=2, scale=3)
limits = (-15, 5)
obs = zfit.Space("obs1", limits=limits)
kde_silverman = zfit.pdf.GaussianKDE1DimV1(data=data, obs=obs, bandwidth='silverman')
# for a better smoothing of the kernels, use an adaptive approach
kde = zfit.pdf.GaussianKDE1DimV1(data=data, obs=obs, bandwidth='adaptive')
Args:
data: |@doc:pdf.kde.init.data| Data sample to approximate
the density from. The points represent positions of the *kernel*,
the :math:`x_i`. This is preferrably a ``ZfitData``, but can also
be an array-like object.
If the data has weights, they will be taken into account.
This will change the count of the events, whereas
weight :math:`w_i` of :math:`x_i` will scale the value of
:math:`K_i( x_i)`, resulting in a factor of :math:`\frac{w_i}{\sum w_i} `.
If no weights are given, each kernel will be scaled by the same
constant :math:`\frac{1}{n_{data}}`. |@docend:pdf.kde.init.data|
obs: |@doc:pdf.kde.init.obs| Observable space of the KDE.
As with any other PDF, this will be used as the default *norm*, but
does not define the domain of the PDF. Namely, this can be a smaller
space than *data*, as long as the name of the observable match.
Using a larger dataset is actually good practice avoiding
bountary biases, see also :ref:`sec-boundary-bias-and-padding`. |@docend:pdf.kde.init.obs|
bandwidth: Valid pre-defined options are {'silverman', 'scott', 'adaptive'}.
|@doc:pdf.kde.init.bandwidth| Bandwidth of the kernel,
often also denoted as :math:`h`. For a Gaussian kernel, this
corresponds to *sigma*. This can be calculated using
pre-defined options or by specifying a numerical value that is
broadcastable to *data* -- a scalar or an array-like
object with the same size as *data*.
A scalar value is usually referred to as a global bandwidth while
an array holds local bandwidths |@docend:pdf.kde.init.bandwidth|
The bandwidth can also be a parameter, which should be used with caution. However,
it allows to use it in cross-valitadion with a likelihood method.
weights: |@doc:pdf.kde.init.weights| Weights of each event
in *data*, can be None or Tensor-like with shape compatible
with *data*. Instead of using this parameter, it is preferred
to use a ``ZfitData`` as *data* that contains weights.
This will change the count of the events, whereas
weight :math:`w_i` of :math:`x_i` will scale the value of :math:`K_i( x_i)`,
resulting in a factor of :math:`\frac{w_i}{\sum w_i} `.
If no weights are given, each kernel will be scaled by the same
constant :math:`\frac{1}{n_{data}}`. |@docend:pdf.kde.init.weights|
truncate: If a truncated Gaussian kernel should be used with the limits given by the `obs` lower and
upper limits. This can cause NaNs in case datapoints are outside the limits.
extended: |@doc:pdf.init.extended| The overall yield of the PDF.
If this is parameter-like, it will be used as the yield,
the expected number of events, and the PDF will be extended.
An extended PDF has additional functionality, such as the
``ext_*`` methods and the ``counts`` (for binned PDFs). |@docend:pdf.init.extended|
norm: |@doc:pdf.init.norm| Normalization of the PDF.
By default, this is the same as the default space of the PDF. |@docend:pdf.init.norm|
extended: |@doc:pdf.init.extended| The overall yield of the PDF.
If this is parameter-like, it will be used as the yield,
the expected number of events, and the PDF will be extended.
An extended PDF has additional functionality, such as the
``ext_*`` methods and the ``counts`` (for binned PDFs). |@docend:pdf.init.extended|
name: |@doc:pdf.init.name| Name of the PDF.
Maybe has implications on the serialization and deserialization of the PDF.
For a human-readable name, use the label. |@docend:pdf.init.name|
label: |@doc:pdf.init.label| Human-readable name
or label of
the PDF for a better description, to be used with plots etc.
Has no programmatical functional purpose as identification. |@docend:pdf.init.label|
"""
original_data = data
data, size, weights, _ = self._convert_init_data_weights_size(
data,
weights,
padding=False,
limits=None,
bandwidth=bandwidth,
)
bandwidth, bandwidth_param = self._convert_input_bandwidth(
bandwidth=bandwidth,
data=data,
truncate=truncate,
name=name,
obs=obs,
weights=weights,
)
params = {"bandwidth": bandwidth_param}
probs = calc_kernel_probs(size, weights)
categorical = tfd.Categorical(probs=probs) # no grad -> no need to recreate
# create distribution factory
if truncate:
if not isinstance(obs, ZfitSpace):
msg = "`obs` has to be a `ZfitSpace` if `truncated` is True."
raise ValueError(msg)
inside = obs.inside(data)
all_inside = znp.all(inside)
tf.debugging.assert_equal(
all_inside,
True,
message="Not all data points are inside the limits but" " a truncate kernel was chosen.",
)
def kernel_factory():
return tfp.distributions.TruncatedNormal(
loc=self._data,
scale=self._bandwidth,
low=self.space.v1.lower,
high=self.space.v1.upper,
)
else:
def kernel_factory():
return tfp.distributions.Normal(loc=self._data, scale=self._bandwidth)
def dist_kwargs():
return {
"mixture_distribution": categorical,
"components_distribution": kernel_factory(),
}
distribution = tfd.MixtureSameFamily
super().__init__(
obs=obs,
params=params,
dist_params={},
dist_kwargs=dist_kwargs,
distribution=distribution,
extended=extended,
norm=norm,
name=name,
label=label,
)
self._data_weights = weights
self._bandwidth = bandwidth
self._data = data
self._original_data = original_data # for copying
self._truncate = truncate
[docs]
class KDE1DimExact(KDEHelper, WrapDistribution, SerializableMixin):
_bandwidth_methods = KDEHelper._bandwidth_methods.copy()
_bandwidth_methods.update(
{
"adaptive_geom": _adaptive_geom_bandwidth_KDEV1,
"adaptive_std": _adaptive_std_bandwidth_KDEV1,
"adaptive_zfit": _adaptive_zfit_bandwidth_KDEV1,
"isj": _bandwidth_isj_KDEV1,
}
)
def __init__(
self,
data: ztyping.XTypeInput,
*,
obs: ztyping.ObsTypeInput | None = None,
bandwidth: ztyping.ParamTypeInput | str | Callable | None = None,
kernel: tfd.Distribution = None,
padding: callable | str | bool | None = None,
weights: np.ndarray | tf.Tensor | None = None,
extended: ExtendedInputType = None,
norm: NormInputType = None,
name: str | None = "ExactKDE1DimV1",
label: str | None = None,
):
r"""Kernel Density Estimation is a non-parametric method to approximate the density of given points.
For a more in-depth explanation, see also in the section about Kernel Density Estimation
:ref:`section-exact-kdes`
Given a sample *data* we want to estimate the
.. math::
f_h(x) = \frac{1}{nh} \sum_{i=1}^n K\Big(\frac{x-x_i}{h}\Big)
This PDF features an exact implementation as is preferable for smaller (max. ~ a few thousand
points) data sets. For larger data sets, methods such as :py:class:`~zfit.pdf.KDE1DimGrid`
that bin the dataset may be more efficient
Kernel Density Estimation is a non-parametric method to approximate the density of given points.
.. math::
f_h(x) = \frac{1}{nh} \sum_{i=1}^n K\Big(\frac{x-x_i}{h}\Big)
where the kernel in this case is a (truncated) Gaussian
.. math::
K = \exp \Big(\frac{(x - x_i)^2}{\sigma^2}\Big)
The bandwidth of the kernel can be estimated in different ways. It can either be a global bandwidth,
corresponding to a single value, or a local bandwidth, each corresponding to one data point.
Args:
data: |@doc:pdf.kde.init.data| Data sample to approximate
the density from. The points represent positions of the *kernel*,
the :math:`x_i`. This is preferrably a ``ZfitData``, but can also
be an array-like object.
If the data has weights, they will be taken into account.
This will change the count of the events, whereas
weight :math:`w_i` of :math:`x_i` will scale the value of
:math:`K_i( x_i)`, resulting in a factor of :math:`\frac{w_i}{\sum w_i} `.
If no weights are given, each kernel will be scaled by the same
constant :math:`\frac{1}{n_{data}}`. |@docend:pdf.kde.init.data|
obs: |@doc:pdf.kde.init.obs| Observable space of the KDE.
As with any other PDF, this will be used as the default *norm*, but
does not define the domain of the PDF. Namely, this can be a smaller
space than *data*, as long as the name of the observable match.
Using a larger dataset is actually good practice avoiding
bountary biases, see also :ref:`sec-boundary-bias-and-padding`. |@docend:pdf.kde.init.obs|
bandwidth: Valid pre-defined options are {'silverman', 'scott',
'adaptive_zfit', 'adaptive_geom', 'adaptive_std', 'isj'}.
|@doc:pdf.kde.init.bandwidth| Bandwidth of the kernel,
often also denoted as :math:`h`. For a Gaussian kernel, this
corresponds to *sigma*. This can be calculated using
pre-defined options or by specifying a numerical value that is
broadcastable to *data* -- a scalar or an array-like
object with the same size as *data*.
A scalar value is usually referred to as a global bandwidth while
an array holds local bandwidths |@docend:pdf.kde.init.bandwidth|
The bandwidth can also be a parameter, which should be used with caution. However,
it allows to use it in cross-valitadion with a likelihood method.
kernel: |@doc:pdf.kde.init.kernel| The kernel is the heart
of the Kernel Density Estimation, which consists of the sum of
kernels around each sample point. Therefore, a kernel should represent
the distribution probability of a single data point as close as
possible.
The most widespread kernel is a Gaussian, or Normal, distribution. Due
to the law of large numbers, the sum of many (arbitrary) random variables
-- this is the case for most real world observable as they are the result of
multiple consecutive random effects -- results in a Gaussian distribution.
However, there are many cases where this assumption is not per-se true. In
this cases an alternative kernel may offer a better choice.
Valid choices are callables that return a
:py:class:`~tensorflow_probability.distribution.Distribution`, such as all distributions
that belong to the loc-scale family. |@docend:pdf.kde.init.kernel|
padding: |@doc:pdf.kde.init.padding| KDEs have a peculiar
weakness: the boundaries, as the outside has a zero density. This makes the KDE
go down at the bountary as well, as the density approaches zero, no matter what the
density inside the boundary was.
There are two ways to circumvent this problem:
- the best solution: providing a larger dataset than the default space the PDF is used in
- mirroring the existing data at the boundaries, which is equivalent to a boundary condition
with a zero derivative. This is a padding technique and can improve the boundaries.
However, one important drawback of this method is to keep in mind that this will actually
alter the PDF *to look mirrored*. If the PDF is plotted in a larger range, this becomes
clear.
Possible options are a number (default 0.1) that depicts the fraction of the overall space
that defines the data mirrored on both sides. For example, for a space from 0 to 5, a value of
0.3 means that all data in the region of 0 to 1.5 is taken, mirrored around 0 as well as
all data from 3.5 to 5 and mirrored at 5. The new data will go from -1.5 to 6.5, so the
KDE is also having a shape outside the desired range. Using it only for the range 0 to 5
hides this.
Using a dict, each side separately (or only a single one) can be mirrored, like ``{'lowermirror: 0.1}``
or ``{'lowermirror: 0.2, 'uppermirror': 0.1}``.
For more control, a callable that takes data and weights can also be used. |@docend:pdf.kde.init.padding|
weights: |@doc:pdf.kde.init.weights| Weights of each event
in *data*, can be None or Tensor-like with shape compatible
with *data*. Instead of using this parameter, it is preferred
to use a ``ZfitData`` as *data* that contains weights.
This will change the count of the events, whereas
weight :math:`w_i` of :math:`x_i` will scale the value of :math:`K_i( x_i)`,
resulting in a factor of :math:`\frac{w_i}{\sum w_i} `.
If no weights are given, each kernel will be scaled by the same
constant :math:`\frac{1}{n_{data}}`. |@docend:pdf.kde.init.weights|
extended: |@doc:pdf.init.extended| The overall yield of the PDF.
If this is parameter-like, it will be used as the yield,
the expected number of events, and the PDF will be extended.
An extended PDF has additional functionality, such as the
``ext_*`` methods and the ``counts`` (for binned PDFs). |@docend:pdf.init.extended|
norm: |@doc:pdf.init.norm| Normalization of the PDF.
By default, this is the same as the default space of the PDF. |@docend:pdf.init.norm|
extended: |@doc:pdf.init.extended| The overall yield of the PDF.
If this is parameter-like, it will be used as the yield,
the expected number of events, and the PDF will be extended.
An extended PDF has additional functionality, such as the
``ext_*`` methods and the ``counts`` (for binned PDFs). |@docend:pdf.init.extended|
name: |@doc:pdf.init.name| Name of the PDF.
Maybe has implications on the serialization and deserialization of the PDF.
For a human-readable name, use the label. |@docend:pdf.init.name|
label: |@doc:pdf.init.label| Human-readable name
or label of
the PDF for a better description, to be used with plots etc.
Has no programmatical functional purpose as identification. |@docend:pdf.init.label|
"""
original_init = {
"data": data,
"obs": obs,
"bandwidth": bandwidth,
"kernel": kernel,
"padding": padding,
"weights": weights,
"extended": extended,
"name": name,
}
if kernel is None:
kernel = tfd.Normal
if padding is None:
padding = self._default_padding
if obs is None:
if not isinstance(data, ZfitData) or not data.space.has_limits:
msg = "obs can only be None if data is ZfitData with limits."
raise ValueError(msg)
obs = data.space
data, size, weights, bandwidth = self._convert_init_data_weights_size(
data, weights, padding=padding, limits=obs.v1.limits, bandwidth=bandwidth
)
self._padding = padding
bandwidth, bandwidth_param = self._convert_input_bandwidth(
bandwidth=bandwidth,
data=data,
name=name,
obs=obs,
weights=weights,
padding=False,
kernel=kernel,
label=label,
)
self._original_data = data # for copying
def components_distribution_generator(loc, scale):
return tfd.Independent(kernel(loc=loc, scale=scale))
self._data = data
self._bandwidth = bandwidth
self._kernel = kernel
self._weights = weights
probs = calc_kernel_probs(size, weights)
mixture_distribution = tfd.Categorical(probs=probs)
components_distribution = components_distribution_generator(loc=self._data, scale=self._bandwidth)
def dist_kwargs():
return {
"mixture_distribution": mixture_distribution,
"components_distribution": components_distribution,
}
distribution = tfd.MixtureSameFamily
params = {"bandwidth": bandwidth_param}
super().__init__(
obs=obs,
params=params,
dist_params={},
dist_kwargs=dist_kwargs,
distribution=distribution,
extended=extended,
norm=norm,
name=name,
)
self.hs3.original_init.update(original_init)
class KDE1DimExactRepr(BasePDFRepr):
_implementation = KDE1DimExact
hs3_type: Literal["KDE1DimExact"] = pydantic.Field("KDE1DimExact", alias="type")
data: Union[np.ndarray, Serializer.types.DataTypeDiscriminated]
obs: Optional[SpaceRepr] = None
bandwidth: Optional[Union[str, float]] = None
kernel: None = None
padding: Optional[Union[bool, str]] = None
weights: Optional[Union[np.ndarray, tf.Tensor]] = None
name: Optional[str] = "KDE1DimExact"
@pydantic.validator("kernel", pre=True)
def validate_kernel(cls, v):
if v is not None:
if v != tfd.Normal:
msg = "Kernel must be None for KDE1DimExact to be serialized."
raise ValueError(msg)
v = None
return v
@pydantic.root_validator(pre=True)
def validate_all(cls, values):
values = dict(values)
if cls.orm_mode(values):
for k, v in values["hs3"].original_init.items():
values[k] = v
return values
[docs]
class KDE1DimGrid(KDEHelper, WrapDistribution, SerializableMixin):
_N_OBS = 1
_bandwidth_methods = KDEHelper._bandwidth_methods.copy()
_bandwidth_methods.update(
{
"adaptive_geom": _adaptive_geom_bandwidth_grid_KDEV1,
"adaptive_zfit": _adaptive_zfit_bandwidth_grid_KDEV1,
# 'adaptive_std': _adaptive_std_bandwidth_grid_KDEV1,
}
)
def __init__(
self,
data: ztyping.XTypeInput,
*,
bandwidth: ztyping.ParamTypeInput | str | Callable | None = None,
kernel: tfd.Distribution = None,
padding: callable | str | bool | None = None,
num_grid_points: int | None = None,
binning_method: str | None = None,
obs: ztyping.ObsTypeInput | None = None,
weights: np.ndarray | tf.Tensor | None = None,
extended: ExtendedInputType = None,
norm: NormInputType = None,
name: str = "GridKDE1DimV1",
label: str | None = None,
):
r"""Kernel Density Estimation is a non-parametric method to approximate the density of given points.
For a more in-depth explanation, see also in the section about Kernel Density Estimation
:ref:`sec-grid-kdes`
.. math::
f_h(x) = \frac{1}{nh} \sum_{i=1}^n K\Big(\frac{x-x_i}{h}\Big)
Args:
data: |@doc:pdf.kde.init.data| Data sample to approximate
the density from. The points represent positions of the *kernel*,
the :math:`x_i`. This is preferrably a ``ZfitData``, but can also
be an array-like object.
If the data has weights, they will be taken into account.
This will change the count of the events, whereas
weight :math:`w_i` of :math:`x_i` will scale the value of
:math:`K_i( x_i)`, resulting in a factor of :math:`\frac{w_i}{\sum w_i} `.
If no weights are given, each kernel will be scaled by the same
constant :math:`\frac{1}{n_{data}}`. |@docend:pdf.kde.init.data|
bandwidth: Valid pre-defined options are {'silverman', 'scott',
'adaptive_zfit', 'adaptive_geom'}.
|@doc:pdf.kde.init.bandwidth| Bandwidth of the kernel,
often also denoted as :math:`h`. For a Gaussian kernel, this
corresponds to *sigma*. This can be calculated using
pre-defined options or by specifying a numerical value that is
broadcastable to *data* -- a scalar or an array-like
object with the same size as *data*.
A scalar value is usually referred to as a global bandwidth while
an array holds local bandwidths |@docend:pdf.kde.init.bandwidth|
kernel: |@doc:pdf.kde.init.kernel| The kernel is the heart
of the Kernel Density Estimation, which consists of the sum of
kernels around each sample point. Therefore, a kernel should represent
the distribution probability of a single data point as close as
possible.
The most widespread kernel is a Gaussian, or Normal, distribution. Due
to the law of large numbers, the sum of many (arbitrary) random variables
-- this is the case for most real world observable as they are the result of
multiple consecutive random effects -- results in a Gaussian distribution.
However, there are many cases where this assumption is not per-se true. In
this cases an alternative kernel may offer a better choice.
Valid choices are callables that return a
:py:class:`~tensorflow_probability.distribution.Distribution`, such as all distributions
that belong to the loc-scale family. |@docend:pdf.kde.init.kernel|
padding: |@doc:pdf.kde.init.padding| KDEs have a peculiar
weakness: the boundaries, as the outside has a zero density. This makes the KDE
go down at the bountary as well, as the density approaches zero, no matter what the
density inside the boundary was.
There are two ways to circumvent this problem:
- the best solution: providing a larger dataset than the default space the PDF is used in
- mirroring the existing data at the boundaries, which is equivalent to a boundary condition
with a zero derivative. This is a padding technique and can improve the boundaries.
However, one important drawback of this method is to keep in mind that this will actually
alter the PDF *to look mirrored*. If the PDF is plotted in a larger range, this becomes
clear.
Possible options are a number (default 0.1) that depicts the fraction of the overall space
that defines the data mirrored on both sides. For example, for a space from 0 to 5, a value of
0.3 means that all data in the region of 0 to 1.5 is taken, mirrored around 0 as well as
all data from 3.5 to 5 and mirrored at 5. The new data will go from -1.5 to 6.5, so the
KDE is also having a shape outside the desired range. Using it only for the range 0 to 5
hides this.
Using a dict, each side separately (or only a single one) can be mirrored, like ``{'lowermirror: 0.1}``
or ``{'lowermirror: 0.2, 'uppermirror': 0.1}``.
For more control, a callable that takes data and weights can also be used. |@docend:pdf.kde.init.padding|
num_grid_points: |@doc:pdf.kde.init.num_grid_points| Number of points in
the binning grid.
The data will be binned using the *binning_method* in *num_grid_points*
and this histogram grid will then be used as kernel points. This has the
advantage to have a constant computational complexity independent of the data
size.
A number from 32 on can already yield good results, while the default is set
to 1024, creating a fine grid. Lowering the number increases the performance
at the cost of accuracy. |@docend:pdf.kde.init.num_grid_points|
binning_method: |@doc:pdf.kde.init.binning_method| Method to be used for
binning the data. Options are 'linear', 'simple'.
The data can be binned in the usual way ('simple'), but this is less precise
for KDEs, where we are interested in the shape of the histogram and smoothing
it. Therefore, a better suited method, 'linear', is available.
In normal binnig, each event (or weight) falls into the bin within the bin edges,
while the neighbouring bins get zero counts from this event.
In linear binning, the event is split between two bins, proportional to its
closeness to each bin.
The 'linear' method provides superior performance, most notably in small (~32)
grids. |@docend:pdf.kde.init.binning_method|
obs: |@doc:pdf.kde.init.obs| Observable space of the KDE.
As with any other PDF, this will be used as the default *norm*, but
does not define the domain of the PDF. Namely, this can be a smaller
space than *data*, as long as the name of the observable match.
Using a larger dataset is actually good practice avoiding
bountary biases, see also :ref:`sec-boundary-bias-and-padding`. |@docend:pdf.kde.init.obs|
weights: |@doc:pdf.kde.init.weights| Weights of each event
in *data*, can be None or Tensor-like with shape compatible
with *data*. Instead of using this parameter, it is preferred
to use a ``ZfitData`` as *data* that contains weights.
This will change the count of the events, whereas
weight :math:`w_i` of :math:`x_i` will scale the value of :math:`K_i( x_i)`,
resulting in a factor of :math:`\frac{w_i}{\sum w_i} `.
If no weights are given, each kernel will be scaled by the same
constant :math:`\frac{1}{n_{data}}`. |@docend:pdf.kde.init.weights|
extended: |@doc:pdf.init.extended| The overall yield of the PDF.
If this is parameter-like, it will be used as the yield,
the expected number of events, and the PDF will be extended.
An extended PDF has additional functionality, such as the
``ext_*`` methods and the ``counts`` (for binned PDFs). |@docend:pdf.init.extended|
norm: |@doc:pdf.init.norm| Normalization of the PDF.
By default, this is the same as the default space of the PDF. |@docend:pdf.init.norm|
extended: |@doc:pdf.init.extended| The overall yield of the PDF.
If this is parameter-like, it will be used as the yield,
the expected number of events, and the PDF will be extended.
An extended PDF has additional functionality, such as the
``ext_*`` methods and the ``counts`` (for binned PDFs). |@docend:pdf.init.extended|
name: |@doc:pdf.init.name| Name of the PDF.
Maybe has implications on the serialization and deserialization of the PDF.
For a human-readable name, use the label. |@docend:pdf.init.name|
label: |@doc:pdf.init.label| Human-readable name
or label of
the PDF for a better description, to be used with plots etc.
Has no programmatical functional purpose as identification. |@docend:pdf.init.label|
"""
original_init = {
"data": data,
"bandwidth": bandwidth,
"kernel": kernel,
"binning_method": binning_method,
"num_grid_points": num_grid_points,
"obs": obs,
"weights": weights,
"padding": padding,
"name": name,
"extended": extended,
"norm": norm,
}
if kernel is None:
kernel = tfd.Normal
if binning_method is None:
binning_method = "linear"
if num_grid_points is None:
num_grid_points = self._default_num_grid_points
if isinstance(bandwidth, str): # numpy arrays cannot be compared with equal-> "use any, all"
if bandwidth == "isj":
msg = "isj not supported in GridKDE, use directly 'KDE1DimISJ'"
raise ValueError(msg)
if bandwidth == "adaptive_std":
msg = (
"adaptive_std not supported in GridKDE due to very bad results. This is maybe caused"
" by an issue regarding weights of the underlaying implementation."
)
raise ValueError(msg)
if padding is None:
padding = self._default_padding
if obs is None:
if not isinstance(data, ZfitData) or not data.space.has_limits:
msg = "obs can only be None if data is ZfitData with limits."
raise ValueError(msg)
obs = data.space
data, size, weights, _ = self._convert_init_data_weights_size(
data, weights, padding=padding, limits=obs.v1.limits, bandwidth=bandwidth
)
self._padding = padding
self._original_data = data # for copying
def components_distribution_generator(loc, scale):
return tfd.Independent(kernel(loc=loc, scale=scale))
if num_grid_points is not None:
num_grid_points = tf.minimum(znp.asarray(size, ztypes.int), znp.asarray(num_grid_points, ztypes.int))
self._num_grid_points = num_grid_points
self._binning_method = binning_method
self._data = data
self._grid = binning_util.generate_1d_grid(self._data, num_grid_points=self._num_grid_points)
bandwidth, bandwidth_param = self._convert_input_bandwidth(
bandwidth=bandwidth,
data=data,
binning_method=binning_method,
num_grid_points=num_grid_points,
padding=False,
kernel=kernel,
name=name,
obs=obs,
weights=weights,
)
self._bandwidth = bandwidth
self._kernel = kernel
self._weights = weights
self._grid_data = binning_util.bin_1d(self._binning_method, self._data, self._grid, self._weights)
mixture_distribution = tfd.Categorical(probs=self._grid_data)
check_bw_grid_shapes(self._bandwidth, self._grid)
components_distribution = components_distribution_generator(loc=self._grid, scale=self._bandwidth)
def dist_kwargs():
return {
"mixture_distribution": mixture_distribution,
"components_distribution": components_distribution,
}
distribution = tfd.MixtureSameFamily
params = {"bandwidth": bandwidth_param}
super().__init__(
obs=obs,
params=params,
dist_params={},
dist_kwargs=dist_kwargs,
distribution=distribution,
extended=extended,
norm=norm,
name=name,
label=label,
)
self.hs3.original_init.update(original_init)
def bw_is_arraylike(bw, allow1d):
return (
hasattr(bw, "shape")
and bw.shape
and len(bw.shape) > 0
and (bw.shape[0] is None or (bw.shape[0] > 1 or allow1d))
)
class KDE1DimGridRepr(BasePDFRepr):
_implementation = KDE1DimGrid
hs3_type: Literal["KDE1DimGrid"] = pydantic.Field("KDE1DimGrid", alias="type")
data: Union[np.ndarray, Serializer.types.DataTypeDiscriminated]
obs: Optional[SpaceRepr] = None
bandwidth: Optional[Union[str, float]] = None
num_grid_points: Optional[int] = None
binning_method: Optional[str] = None
kernel: None = None
padding: Optional[Union[bool, str]] = None
weights: Optional[Union[np.ndarray, tf.Tensor]] = None
name: Optional[str] = "GridKDE1DimV1"
@pydantic.validator("kernel", pre=True)
def validate_kernel(cls, v):
if v is not None:
if v != tfd.Normal:
msg = "Kernel must be None for GridKDE1DimV1 to be serialized."
raise ValueError(msg)
v = None
return v
@pydantic.root_validator(pre=True)
def validate_all(cls, values):
values = dict(values)
if cls.orm_mode(values):
for k, v in values["hs3"].original_init.items():
values[k] = v
return values
[docs]
class KDE1DimFFT(KDEHelper, BasePDF, SerializableMixin):
_N_OBS = 1
def __init__(
self,
data: ztyping.XTypeInput,
*,
obs: ztyping.ObsTypeInput | None = None,
bandwidth: ztyping.ParamTypeInput | str | Callable | None = None,
kernel: tfd.Distribution = None,
num_grid_points: int | None = None,
binning_method: str | None = None,
support=None,
fft_method: str | None = None,
padding: callable | str | bool | None = None,
weights: np.ndarray | tf.Tensor | None = None,
extended: ExtendedInputType = None,
norm: NormInputType = None,
name: str = "KDE1DimFFT",
label: str | None = None,
):
r"""Kernel Density Estimation is a non-parametric method to approximate the density of given points.
For a more in-depth explanation, see also in the section about Kernel Density Estimation
:ref:`sec-fft-kdes`
.. math::
f_h(x) = \frac{1}{nh} \sum_{i=1}^n K\Big(\frac{x-x_i}{h}\Big)
It is computed by using a convolution of the data with the kernels evaluated at fixed grid points and then
interpolating between this points to get an estimate for x.
Args:
data: |@doc:pdf.kde.init.data| Data sample to approximate
the density from. The points represent positions of the *kernel*,
the :math:`x_i`. This is preferrably a ``ZfitData``, but can also
be an array-like object.
If the data has weights, they will be taken into account.
This will change the count of the events, whereas
weight :math:`w_i` of :math:`x_i` will scale the value of
:math:`K_i( x_i)`, resulting in a factor of :math:`\frac{w_i}{\sum w_i} `.
If no weights are given, each kernel will be scaled by the same
constant :math:`\frac{1}{n_{data}}`. |@docend:pdf.kde.init.data|
bandwidth: |@doc:pdf.kde.init.bandwidth| Bandwidth of the kernel,
often also denoted as :math:`h`. For a Gaussian kernel, this
corresponds to *sigma*. This can be calculated using
pre-defined options or by specifying a numerical value that is
broadcastable to *data* -- a scalar or an array-like
object with the same size as *data*.
A scalar value is usually referred to as a global bandwidth while
an array holds local bandwidths |@docend:pdf.kde.init.bandwidth|
kernel: |@doc:pdf.kde.init.kernel| The kernel is the heart
of the Kernel Density Estimation, which consists of the sum of
kernels around each sample point. Therefore, a kernel should represent
the distribution probability of a single data point as close as
possible.
The most widespread kernel is a Gaussian, or Normal, distribution. Due
to the law of large numbers, the sum of many (arbitrary) random variables
-- this is the case for most real world observable as they are the result of
multiple consecutive random effects -- results in a Gaussian distribution.
However, there are many cases where this assumption is not per-se true. In
this cases an alternative kernel may offer a better choice.
Valid choices are callables that return a
:py:class:`~tensorflow_probability.distribution.Distribution`, such as all distributions
that belong to the loc-scale family. |@docend:pdf.kde.init.kernel|
padding: |@doc:pdf.kde.init.padding| KDEs have a peculiar
weakness: the boundaries, as the outside has a zero density. This makes the KDE
go down at the bountary as well, as the density approaches zero, no matter what the
density inside the boundary was.
There are two ways to circumvent this problem:
- the best solution: providing a larger dataset than the default space the PDF is used in
- mirroring the existing data at the boundaries, which is equivalent to a boundary condition
with a zero derivative. This is a padding technique and can improve the boundaries.
However, one important drawback of this method is to keep in mind that this will actually
alter the PDF *to look mirrored*. If the PDF is plotted in a larger range, this becomes
clear.
Possible options are a number (default 0.1) that depicts the fraction of the overall space
that defines the data mirrored on both sides. For example, for a space from 0 to 5, a value of
0.3 means that all data in the region of 0 to 1.5 is taken, mirrored around 0 as well as
all data from 3.5 to 5 and mirrored at 5. The new data will go from -1.5 to 6.5, so the
KDE is also having a shape outside the desired range. Using it only for the range 0 to 5
hides this.
Using a dict, each side separately (or only a single one) can be mirrored, like ``{'lowermirror: 0.1}``
or ``{'lowermirror: 0.2, 'uppermirror': 0.1}``.
For more control, a callable that takes data and weights can also be used. |@docend:pdf.kde.init.padding|
num_grid_points: |@doc:pdf.kde.init.num_grid_points| Number of points in
the binning grid.
The data will be binned using the *binning_method* in *num_grid_points*
and this histogram grid will then be used as kernel points. This has the
advantage to have a constant computational complexity independent of the data
size.
A number from 32 on can already yield good results, while the default is set
to 1024, creating a fine grid. Lowering the number increases the performance
at the cost of accuracy. |@docend:pdf.kde.init.num_grid_points|
binning_method: |@doc:pdf.kde.init.binning_method| Method to be used for
binning the data. Options are 'linear', 'simple'.
The data can be binned in the usual way ('simple'), but this is less precise
for KDEs, where we are interested in the shape of the histogram and smoothing
it. Therefore, a better suited method, 'linear', is available.
In normal binnig, each event (or weight) falls into the bin within the bin edges,
while the neighbouring bins get zero counts from this event.
In linear binning, the event is split between two bins, proportional to its
closeness to each bin.
The 'linear' method provides superior performance, most notably in small (~32)
grids. |@docend:pdf.kde.init.binning_method|
support:
fft_method:
obs: |@doc:pdf.kde.init.obs| Observable space of the KDE.
As with any other PDF, this will be used as the default *norm*, but
does not define the domain of the PDF. Namely, this can be a smaller
space than *data*, as long as the name of the observable match.
Using a larger dataset is actually good practice avoiding
bountary biases, see also :ref:`sec-boundary-bias-and-padding`. |@docend:pdf.kde.init.obs|
weights: |@doc:pdf.kde.init.weights| Weights of each event
in *data*, can be None or Tensor-like with shape compatible
with *data*. Instead of using this parameter, it is preferred
to use a ``ZfitData`` as *data* that contains weights.
This will change the count of the events, whereas
weight :math:`w_i` of :math:`x_i` will scale the value of :math:`K_i( x_i)`,
resulting in a factor of :math:`\frac{w_i}{\sum w_i} `.
If no weights are given, each kernel will be scaled by the same
constant :math:`\frac{1}{n_{data}}`. |@docend:pdf.kde.init.weights|
extended: |@doc:pdf.init.extended| The overall yield of the PDF.
If this is parameter-like, it will be used as the yield,
the expected number of events, and the PDF will be extended.
An extended PDF has additional functionality, such as the
``ext_*`` methods and the ``counts`` (for binned PDFs). |@docend:pdf.init.extended|
norm: |@doc:pdf.init.norm| Normalization of the PDF.
By default, this is the same as the default space of the PDF. |@docend:pdf.init.norm|
extended: |@doc:pdf.init.extended| The overall yield of the PDF.
If this is parameter-like, it will be used as the yield,
the expected number of events, and the PDF will be extended.
An extended PDF has additional functionality, such as the
``ext_*`` methods and the ``counts`` (for binned PDFs). |@docend:pdf.init.extended|
name: |@doc:pdf.init.name| Name of the PDF.
Maybe has implications on the serialization and deserialization of the PDF.
For a human-readable name, use the label. |@docend:pdf.init.name|
label: |@doc:pdf.init.label| Human-readable name
or label of
the PDF for a better description, to be used with plots etc.
Has no programmatical functional purpose as identification. |@docend:pdf.init.label|
"""
original_init = {
"data": data,
"bandwidth": bandwidth,
"num_grid_points": num_grid_points,
"binning_method": binning_method,
"support": support,
"fft_method": fft_method,
"obs": obs,
"weights": weights,
"extended": extended,
"norm": norm,
"name": name,
}
if isinstance(bandwidth, ZfitParameter):
msg = "bandwidth cannot be a Parameter for the FFT KDE."
raise TypeError(msg)
if num_grid_points is None:
num_grid_points = self._default_num_grid_points
if binning_method is None:
binning_method = "linear"
if fft_method is None:
fft_method = "conv1d"
if kernel is None:
kernel = tfd.Normal
if padding is None:
padding = self._default_padding
if obs is None:
if not isinstance(data, ZfitData) or not data.space.has_limits:
msg = "obs can only be None if data is ZfitData with limits."
raise ValueError(msg)
obs = data.space
data, size, weights, _ = self._convert_init_data_weights_size(
data, weights, padding=padding, limits=obs.v1.limits, bandwidth=bandwidth
)
self._padding = padding
bandwidth, bandwidth_param = self._convert_input_bandwidth(
bandwidth=bandwidth,
data=data,
padding=False,
kernel=kernel,
support=support,
fft_method=fft_method,
name=name,
obs=obs,
weights=weights,
)
num_grid_points = tf.minimum(znp.asarray(size, ztypes.int), tf.constant(num_grid_points, ztypes.int))
check_bw_grid_shapes(bandwidth, n_grid=num_grid_points)
self._num_grid_points = num_grid_points
self._binning_method = binning_method
self._fft_method = fft_method
self._data = data
self._bandwidth = bandwidth
params = {"bandwidth": self._bandwidth}
super().__init__(obs=obs, name=name, params=params, extended=extended, norm=norm, label=label)
self._kernel = kernel
self._weights = weights
if support is None:
area = znp.reshape(self.space.volume, ())
if area is not None:
support = area * 1.2
self._support = support
self._grid = None
self._grid_data = None
self._grid = binning_util.generate_1d_grid(self._data, num_grid_points=self._num_grid_points)
self._grid_data = binning_util.bin_1d(self._binning_method, self._data, self._grid, self._weights)
self._grid_estimations = convolution_util.convolve_1d_data_with_kernel(
self._kernel,
self._bandwidth,
self._grid_data,
self._grid,
self._support,
self._fft_method,
)
self.hs3.original_init.update(original_init)
def _unnormalized_pdf(self, x):
x = z.unstack_x(x)
x_min = self._grid[0]
x_max = self._grid[-1]
value = tfp.math.interp_regular_1d_grid(x, x_min, x_max, self._grid_estimations)
value.set_shape(x.shape)
return value
class KDE1DimFFTRepr(BasePDFRepr):
_implementation = KDE1DimFFT
hs3_type: Literal["KDE1DimFFT"] = pydantic.Field("KDE1DimFFT", alias="type")
data: Union[np.ndarray, Serializer.types.DataTypeDiscriminated]
obs: Optional[SpaceRepr] = None
bandwidth: Optional[Union[str, float]] = None
num_grid_points: Optional[int] = None
binning_method: Optional[str] = None
kernel: None = None
support: Optional[float] = None
fft_method: Optional[str] = None
padding: Optional[Union[bool, str]] = None
weights: Optional[Union[np.ndarray, tf.Tensor]] = None
name: Optional[str] = "KDE1DimFFT"
@pydantic.validator("kernel", pre=True)
def validate_kernel(cls, v):
if v is not None:
if v != tfd.Normal:
msg = "Kernel must be None for GridKDE1DimV1 to be serialized."
raise ValueError(msg)
v = None
return v
@pydantic.root_validator(pre=True)
def validate_all(cls, values):
values = dict(values)
if cls.orm_mode(values):
for k, v in values["hs3"].original_init.items():
values[k] = v
return values
[docs]
class KDE1DimISJ(KDEHelper, BasePDF, SerializableMixin):
_N_OBS = 1
def __init__(
self,
data: ztyping.XTypeInput,
*,
obs: ztyping.ObsTypeInput | None = None,
padding: callable | str | bool | None = None,
num_grid_points: int | None = None,
binning_method: str | None = None,
weights: np.ndarray | tf.Tensor | None = None,
extended: ExtendedInputType = None,
norm: NormInputType = None,
name: str = "KDE1DimISJ",
label: str | None = None,
):
r"""Kernel Density Estimation is a non-parametric method to approximate the density of given points.
For a more in-depth explanation, see also in the section about Kernel Density Estimation
:ref:`sec-isj-kde`
.. math::
f_h(x) = \frac{1}{nh} \sum_{i=1}^n K\Big(\frac{x-x_i}{h}\Big)
The bandwidth is computed by using a trick described in a paper by Botev et al. that uses the fact,
that the Kernel Density Estimation
with a Gaussian Kernel is a solution to the Heat Equation.
Args:
data: |@doc:pdf.kde.init.data| Data sample to approximate
the density from. The points represent positions of the *kernel*,
the :math:`x_i`. This is preferrably a ``ZfitData``, but can also
be an array-like object.
If the data has weights, they will be taken into account.
This will change the count of the events, whereas
weight :math:`w_i` of :math:`x_i` will scale the value of
:math:`K_i( x_i)`, resulting in a factor of :math:`\frac{w_i}{\sum w_i} `.
If no weights are given, each kernel will be scaled by the same
constant :math:`\frac{1}{n_{data}}`. |@docend:pdf.kde.init.data|
padding: |@doc:pdf.kde.init.padding| KDEs have a peculiar
weakness: the boundaries, as the outside has a zero density. This makes the KDE
go down at the bountary as well, as the density approaches zero, no matter what the
density inside the boundary was.
There are two ways to circumvent this problem:
- the best solution: providing a larger dataset than the default space the PDF is used in
- mirroring the existing data at the boundaries, which is equivalent to a boundary condition
with a zero derivative. This is a padding technique and can improve the boundaries.
However, one important drawback of this method is to keep in mind that this will actually
alter the PDF *to look mirrored*. If the PDF is plotted in a larger range, this becomes
clear.
Possible options are a number (default 0.1) that depicts the fraction of the overall space
that defines the data mirrored on both sides. For example, for a space from 0 to 5, a value of
0.3 means that all data in the region of 0 to 1.5 is taken, mirrored around 0 as well as
all data from 3.5 to 5 and mirrored at 5. The new data will go from -1.5 to 6.5, so the
KDE is also having a shape outside the desired range. Using it only for the range 0 to 5
hides this.
Using a dict, each side separately (or only a single one) can be mirrored, like ``{'lowermirror: 0.1}``
or ``{'lowermirror: 0.2, 'uppermirror': 0.1}``.
For more control, a callable that takes data and weights can also be used. |@docend:pdf.kde.init.padding|
num_grid_points: |@doc:pdf.kde.init.num_grid_points| Number of points in
the binning grid.
The data will be binned using the *binning_method* in *num_grid_points*
and this histogram grid will then be used as kernel points. This has the
advantage to have a constant computational complexity independent of the data
size.
A number from 32 on can already yield good results, while the default is set
to 1024, creating a fine grid. Lowering the number increases the performance
at the cost of accuracy. |@docend:pdf.kde.init.num_grid_points|
binning_method: |@doc:pdf.kde.init.binning_method| Method to be used for
binning the data. Options are 'linear', 'simple'.
The data can be binned in the usual way ('simple'), but this is less precise
for KDEs, where we are interested in the shape of the histogram and smoothing
it. Therefore, a better suited method, 'linear', is available.
In normal binnig, each event (or weight) falls into the bin within the bin edges,
while the neighbouring bins get zero counts from this event.
In linear binning, the event is split between two bins, proportional to its
closeness to each bin.
The 'linear' method provides superior performance, most notably in small (~32)
grids. |@docend:pdf.kde.init.binning_method|
obs: |@doc:pdf.kde.init.obs| Observable space of the KDE.
As with any other PDF, this will be used as the default *norm*, but
does not define the domain of the PDF. Namely, this can be a smaller
space than *data*, as long as the name of the observable match.
Using a larger dataset is actually good practice avoiding
bountary biases, see also :ref:`sec-boundary-bias-and-padding`. |@docend:pdf.kde.init.obs|
weights: |@doc:pdf.kde.init.weights| Weights of each event
in *data*, can be None or Tensor-like with shape compatible
with *data*. Instead of using this parameter, it is preferred
to use a ``ZfitData`` as *data* that contains weights.
This will change the count of the events, whereas
weight :math:`w_i` of :math:`x_i` will scale the value of :math:`K_i( x_i)`,
resulting in a factor of :math:`\frac{w_i}{\sum w_i} `.
If no weights are given, each kernel will be scaled by the same
constant :math:`\frac{1}{n_{data}}`. |@docend:pdf.kde.init.weights|
norm: |@doc:pdf.init.norm| Normalization of the PDF.
By default, this is the same as the default space of the PDF. |@docend:pdf.init.norm|
extended: |@doc:pdf.init.extended| The overall yield of the PDF.
If this is parameter-like, it will be used as the yield,
the expected number of events, and the PDF will be extended.
An extended PDF has additional functionality, such as the
``ext_*`` methods and the ``counts`` (for binned PDFs). |@docend:pdf.init.extended|
name: |@doc:pdf.init.name| Name of the PDF.
Maybe has implications on the serialization and deserialization of the PDF.
For a human-readable name, use the label. |@docend:pdf.init.name|
label: |@doc:pdf.init.label| Human-readable name
or label of
the PDF for a better description, to be used with plots etc.
Has no programmatical functional purpose as identification. |@docend:pdf.init.label|
"""
original_init = {
"data": data,
"weights": weights,
"num_grid_points": num_grid_points,
"binning_method": binning_method,
"obs": obs,
"padding": padding,
"name": name,
"norm": norm,
"extended": extended,
}
if num_grid_points is None:
num_grid_points = self._default_num_grid_points
if binning_method is None:
binning_method = "linear"
if padding is None:
padding = self._default_padding
if obs is None:
if not isinstance(data, ZfitData) or not data.space.has_limits:
msg = "obs can only be None if data is ZfitData with limits."
raise ValueError(msg)
obs = data.space
data, size, weights, _ = self._convert_init_data_weights_size(
data, weights, padding=padding, limits=obs.v1.limits, bandwidth=None
)
self._padding = padding
num_grid_points = tf.minimum(znp.asarray(size, ztypes.int), tf.constant(num_grid_points, ztypes.int))
self._num_grid_points = num_grid_points
self._binning_method = binning_method
self._data = tf.convert_to_tensor(data, ztypes.float)
self._weights = weights
self._grid = None
self._grid_data = None
(
self._bandwidth,
self._grid_estimations,
self._grid,
) = isj_util.calculate_bandwidth_and_density(
self._data, self._num_grid_points, self._binning_method, self._weights
)
params = {}
super().__init__(obs=obs, name=name, params=params, extended=extended, norm=norm, label=label)
self.hs3.original_init.update(original_init)
def _unnormalized_pdf(self, x):
x = z.unstack_x(x)
x_min = self._grid[0]
x_max = self._grid[-1]
value = tfp.math.interp_regular_1d_grid(x, x_min, x_max, self._grid_estimations)
value.set_shape(x.shape)
return value
class KDE1DimISJRepr(BasePDFRepr):
_implementation = KDE1DimISJ
hs3_type: Literal["KDE1DimISJ"] = pydantic.Field("KDE1DimISJ", alias="type")
data: Union[np.ndarray, Serializer.types.DataTypeDiscriminated]
obs: Optional[SpaceRepr] = None
bandwidth: Optional[Union[str, float]] = None
num_grid_points: Optional[int] = None
binning_method: Optional[str] = None
kernel: None = None
padding: Optional[Union[bool, str]] = None
weights: Optional[Union[np.ndarray, tf.Tensor]] = None
name: Optional[str] = "KDE1DimISJ"
@pydantic.validator("kernel", pre=True)
def validate_kernel(cls, v):
if v is not None:
if v != tfd.Normal:
msg = "Kernel must be None for GridKDE1DimV1 to be serialized."
raise ValueError(msg)
v = None
return v
@pydantic.root_validator(pre=True)
def validate_all(cls, values):
values = dict(values)
if cls.orm_mode(values):
for k, v in values["hs3"].original_init.items():
values[k] = v
return values