# Copyright (c) 2024 zfit
from __future__ import annotations
from collections.abc import Callable
from typing import TYPE_CHECKING, Optional
import numpy as np
import xxhash
from tensorflow.python.util.deprecation import deprecated
from zfit_interface.typing import TensorLike
from ..core.baseobject import convert_param_values
if TYPE_CHECKING:
pass
import boost_histogram as bh
import hist
import tensorflow as tf
from zfit.z import numpy as znp
from .._variables.axis import binning_to_histaxes, histaxes_to_binning
from ..core.interfaces import ZfitBinnedData, ZfitData, ZfitSpace
from ..util import ztyping
from ..util.exception import BreakingAPIChangeError, ShapeIncompatibleError
# @tfp.experimental.auto_composite_tensor()
class BinnedHolder:
    """Internal container pairing a binned space with values and optional variances.

    Shape consistency between the binning, the values and (if given) the
    variances is verified once at construction time.
    """

    def __init__(self, space, values, variances):
        self._check_init_values(space, values, variances)
        self.space = space
        self.values = values
        self.variances = variances

    def _check_init_values(self, space, values, variances):
        """Raise ``ShapeIncompatibleError`` (or trigger a TF assertion) on inconsistent shapes."""
        shape_values = tf.shape(values)
        shape_edges = znp.array([tf.shape(znp.reshape(edge, (-1,)))[0] for edge in space.binning.edges])
        rank_values = shape_values.shape[0]
        if variances is not None:
            shape_variances = tf.shape(variances)
            rank_variances = shape_variances.shape[0]
            if rank_values != rank_variances:
                msg = f"Values {values} and variances {variances} differ in rank: {rank_values} vs {rank_variances}"
                raise ShapeIncompatibleError(msg)
            tf.assert_equal(
                shape_variances,
                shape_values,
                message=f"Variances and values do not have the same shape: {shape_variances} vs {shape_values}",
            )
        binning_rank = len(space.binning.edges)
        if binning_rank != rank_values:
            msg = f"Values and binning differ in rank: {rank_values} vs {binning_rank}"
            raise ShapeIncompatibleError(msg)
        # Each axis has (nbins + 1) edges, hence the "- 1" when comparing to the values shape.
        tf.assert_equal(
            shape_edges - 1,
            shape_values,
            message=f"Edges (minus one) and values do not have the same shape: {shape_edges} vs {shape_values}",
        )

    def with_obs(self, obs: ztyping.ObsTypeInput):
        """Return a new binned data object with updated observables order.

        Args:
            obs: The observables in the new order.
        """
        new_space = self.space.with_obs(obs)
        reordered_values, reordered_variances = move_axis_obs(
            self.space, new_space, values=self.values, variances=self.variances
        )
        return type(self)(space=new_space, values=reordered_values, variances=reordered_variances)

    def with_variances(self, variances: znp.array | None):
        """Return a new binned data object with updated variances.

        Args:
            variances: The new variances

        Returns:
            BinnedHolder: A new binned data object with updated variances.
        """
        if variances is None:
            return type(self)(space=self.space, values=self.values, variances=None)
        variances = znp.asarray(variances)
        from zfit import run

        if run.numeric_checks:
            tf.debugging.assert_equal(
                tf.shape(variances),
                tf.shape(self.values),
                message=f"Variances {variances} and values {self.values} do not have the same shape",
            )
        return type(self)(space=self.space, values=self.values, variances=variances)
def move_axis_obs(original, target, values, variances=None):
    """Reorder the axes of *values* (and *variances*) from *original* obs order to *target* obs order.

    Args:
        original: Space whose obs order matches the current axis order of ``values``.
        target: Space whose obs order the returned arrays should follow.
        values: Histogram counts with one axis per observable, ordered as ``original.obs``.
        variances: Optional variances with the same layout as ``values``.

    Returns:
        Tuple ``(values, variances)`` with axes ordered as ``target.obs``;
        ``variances`` is ``None`` if it was not given.
    """
    # For each target position i, the axis currently holding target.obs[i].
    new_axes = [original.obs.index(ob) for ob in target.obs]
    newobs = tuple(range(target.n_obs))
    # BUGFIX: moveaxis takes (source, destination); the previous call passed them
    # swapped, applying the INVERSE permutation. That is only correct when the
    # permutation is its own inverse (e.g. any two-observable swap) and silently
    # wrong for rank >= 3 cyclic reorderings.
    values = znp.moveaxis(values, new_axes, newobs)
    if variances is not None:
        variances = znp.moveaxis(variances, new_axes, newobs)
    return values, variances
# Module-wide switch: whether under/overflow bins are exchanged with `hist`/`boost_histogram`.
flow = False  # TODO: track the flow or not?
# @tfp.experimental.auto_composite_tensor()
[docs]
class BinnedData(
    ZfitBinnedData,
    # tfp.experimental.AutoCompositeTensor, OverloadableMixinValues, ZfitBinnedData
):
    """Binned dataset: histogram-like counts over a binned space, with optional variances."""

    # Class-level default for ``use_hash`` when ``__init__`` receives None.
    USE_HASH = False

    def __init__(self, *, holder, use_hash=None, name: Optional[str] = None, label: Optional[str] = None):
        """Create a binned data object from a :py:class:`~zfit.core.data.BinnedHolder`.

        Prefer to use the constructors ``from_*`` of :py:class:`~zfit.core.data.BinnedData`
        like :py:meth:`~zfit.core.data.BinnedData.from_hist`, :py:meth:`~zfit.core.data.BinnedData.from_tensor`
        or :py:meth:`~zfit.core.data.BinnedData.from_unbinned`.

        Args:
            holder: The :py:class:`BinnedHolder` carrying space, values and variances.
        """
        if use_hash is None:
            use_hash = self.USE_HASH
        self._use_hash = use_hash
        self._hashint = None  # filled by _update_hash below if hashing is possible
        self.holder: BinnedHolder = holder
        self.name = name or "BinnedData"
        self.label = label or self.name
        self._update_hash()

    [docs]
    def with_variances(self, variances: znp.array) -> BinnedData:
        """Return a new binned data object with updated variances.

        Args:
            variances: The new variances
        """
        return type(self)(holder=self.holder.with_variances(variances), name=self.name, label=self.label)

    [docs]
    def enable_hashing(self):
        """Enable hashing for this data object if it was disabled.

        A hash allows some objects to be cached and reused. If a hash is enabled, the data object will be hashed and the
        hash _can_ be used for caching. This can speedup various objects, however, it maybe doesn't have an effect at
        all. For example, if an object was already called before with the data object, the hash will probably not be
        used, as the object is already compiled.
        """
        from zfit import run

        run.assert_executing_eagerly()  # hashing needs concrete (eager) values
        self._use_hash = True
        self._update_hash()

    @property
    def _using_hash(self):
        # Active only if both this object and the global run setting allow hashing.
        from zfit import run

        return self._use_hash and run.hashing_data()

    [docs]
    @classmethod  # TODO: add overflow bins if needed
    def from_tensor(
        cls,
        space: ZfitSpace,
        values: znp.array,
        variances: znp.array | None = None,
        name: str | None = None,
        label: str | None = None,
        use_hash: bool | None = None,
    ) -> BinnedData:
        """Create a binned dataset defined in *space* where values are considered to be the counts.

        Args:
            space: |@doc:binneddata.param.space| Binned space of the data.
               The space is used to define the binning and the limits of the data. |@docend:binneddata.param.space|
            values: |@doc:binneddata.param.values| Corresponds to the counts of the histogram.
               Follows the definition of the
               `Unified Histogram Interface (UHI) <https://uhi.readthedocs.io/en/latest/plotting.html#plotting>`_. |@docend:binneddata.param.values|
            variances: |@doc:binneddata.param.variances| Corresponds to the uncertainties of the histogram.
               If ``True``, the uncertainties are created assuming that ``values``
               have been drawn from a Poisson distribution. Follows the definition of the
               `Unified Histogram Interface (UHI) <https://uhi.readthedocs.io/en/latest/plotting.html#plotting>`_. |@docend:binneddata.param.variances|
        """
        values = znp.asarray(values, znp.float64)
        if variances is True:
            # NOTE(review): for Poisson counts the *variance* equals `values`;
            # sqrt(values) is the standard deviation — confirm this is intended.
            variances = znp.sqrt(values)
        elif variances is not None:
            variances = znp.asarray(variances)
        return cls(
            holder=BinnedHolder(space=space, values=values, variances=variances),
            name=name,
            label=label,
            use_hash=use_hash,
        )

    [docs]
    @classmethod
    def from_unbinned(
        cls,
        space: ZfitSpace,
        data: ZfitData,
        *,
        use_hash: bool | None = None,
        name: str | None = None,
        label: str | None = None,
    ) -> BinnedData:
        """Convert an unbinned dataset to a binned dataset.

        Args:
            space: |@doc:binneddata.param.space| Binned space of the data.
               The space is used to define the binning and the limits of the data. |@docend:binneddata.param.space|
            data: Unbinned data to be converted to binned data

        Returns:
            ZfitBinnedData: The binned data
        """
        from zfit.core.binning import unbinned_to_binned

        return unbinned_to_binned(
            data,
            space,
            binned_class=cls,
            initkwargs={
                "name": name or data.name,
                "label": label or data.label,
                # NOTE(review): reaches into the private `_use_hash` of the unbinned data.
                "use_hash": use_hash if use_hash is not None else data._use_hash,
            },
        )

    [docs]
    @classmethod
    def from_hist(cls, h: hist.NamedHist) -> BinnedData:
        """Create a binned dataset from a ``hist`` histogram.

        A histogram (following the UHI definition) with named axes.

        Args:
            h: A NamedHist. The axes will be used as the binning in zfit.
        """
        from zfit import Space

        space = Space(binning=histaxes_to_binning(h.axes))
        values = znp.asarray(h.values(flow=flow))  # module-level `flow` is False: no under/overflow
        variances = h.variances(flow=flow)
        if variances is not None:
            variances = znp.asarray(variances)
        holder = BinnedHolder(space=space, values=values, variances=variances)
        return cls(holder=holder)

    [docs]
    def with_obs(self, obs: ztyping.ObsTypeInput) -> BinnedData:
        """Return a subset of the data in the ordering of *obs*.

        Args:
            obs: Which obs to return
        """
        # Deliberately constructs a plain BinnedData (not type(self)); see the comment below.
        return BinnedData(holder=self.holder.with_obs(obs), name=self.name, label=self.label)

    # no subclass, as this allows the sampler to be the same still and not reinitiated
    def _update_hash(self):
        # Recompute the cached integer hash from values (and variances); cleared in graph mode.
        from zfit import run

        if not run.executing_eagerly() or not self._using_hash:
            self._hashint = None
        else:
            hashval = xxhash.xxh128(np.asarray(self.values()))
            if (variances := self.variances()) is not None:
                hashval.update(np.asarray(variances))
            if hasattr(self, "_hashint"):
                # Folded to 0..4095; small range, so this is a cheap change-detection
                # signal, not a collision-resistant digest.
                self._hashint = hashval.intdigest() % (64**2)
            else:  # if the dataset is not yet initialized; this is allowed
                self._hashint = None

    @property
    def hashint(self) -> int | None:
        # Small integer hash of the data, or None if hashing is disabled/not possible.
        return self._hashint

    @property
    def kind(self):
        # UHI "kind": these are counts, not densities.
        return "COUNT"

    @property
    def n_obs(self) -> int:
        # Number of observables; alias of `rank`.
        return self.rank

    @property
    def rank(self) -> int:
        # Dimensionality of the histogram (number of axes).
        return self.space.n_obs

    @property
    def obs(self):
        # Observable names, in axis order.
        return self.space.obs

    [docs]
    def to_hist(self) -> hist.Hist:
        """Convert the binned data to a :py:class:`~hist.Hist`.

        While a binned data object can be used inside zfit (PDFs,...), it lacks many convenience features that the
        `hist library <https://hist.readthedocs.io/>`_
        offers, such as plots.
        """
        binning = binning_to_histaxes(self.holder.space.binning)
        h = hist.Hist(*binning, storage=bh.storage.Weight())
        h.view(flow=flow).value = self.values()  # TODO: flow?
        # NOTE(review): variances() may be None; assigning None into the view — confirm.
        h.view(flow=flow).variance = self.variances()  # TODO: flow?
        return h

    def _to_boost_histogram_(self):
        # Like to_hist, but returns a plain boost-histogram object.
        binning = binning_to_histaxes(self.holder.space.binning)
        h = bh.Histogram(*binning, storage=bh.storage.Weight())
        h.view(flow=flow).value = self.values()  # TODO: flow?
        h.view(flow=flow).variance = self.variances()  # TODO: flow?
        return h

    @property
    def space(self):
        # The binned space this data is defined in.
        return self.holder.space

    @property
    def axes(self):
        # Alias for `binning`, mirroring hist/UHI naming.
        return self.binning

    @property
    def binning(self):
        return self.space.binning

    [docs]
    def values(self) -> znp.array:  # , flow=False
        """Values of the histogram as an ndim array.

        Compared to ``hist``, zfit does not make a difference between a view and a copy; tensors are immutable.
        This distinction is made in the traced function by the compilation backend.

        Returns:
            Tensor of shape (nbins0, nbins1, ...) with nbins the number of bins in each observable.
        """
        return self.holder.values
        # if not flow:  # TODO: flow?
        #     shape = tf.shape(vals)
        #     vals = tf.slice(vals, znp.ones_like(shape), shape - 2)

    [docs]
    def variances(self) -> None | znp.array:  # , flow=False
        """Variances, if available, of the histogram as an ndim array.

        Compared to ``hist``, zfit does not make a difference between a view and a copy; tensors are immutable.
        This distinction is made in the traced function by the compilation backend.

        Returns:
            Tensor of shape (nbins0, nbins1, ...) with nbins the number of bins in each observable.
        """
        return self.holder.variances
        # if not flow:  # TODO: flow?
        #     shape = tf.shape(vals)
        #     vals = tf.slice(vals, znp.ones_like(shape), shape - 2)

    [docs]
    def counts(self):
        """Effective counts of the histogram as a ndim array.

        Compared to ``hist``, zfit does not make a difference between a view and a copy; tensors are immutable.
        This distinction is made in the traced function by the compilation backend.

        Returns:
            Tensor of shape (nbins0, nbins1, ...) with nbins the number of bins in each observable.
        """
        # Effective counts equal the stored values here (COUNT kind, no weights tracked).
        return self.values()

    # dummy
    @property
    def data_range(self):
        return self.space

    @property
    def nevents(self):
        # Total number of events: the sum over all bin counts.
        return znp.sum(self.values())

    @property
    def n_events(self):  # LEGACY, what should be the name?
        return self.nevents

    @property
    def _approx_nevents(self):
        return znp.sum(self.values())

    def __eq__(self, other):
        # Identity semantics: two datasets are equal iff they are the same object.
        return id(self) == id(other)

    def __hash__(self):
        return hash(id(self))

    [docs]
    def to_unbinned(self):
        """Use the bincenters as unbinned data with values as counts.

        Returns:
            ``ZfitData``: Unbinned data
        """
        # Cartesian product of the bin centers -> one "event" per bin, weighted by its count.
        meshed_center = znp.meshgrid(*self.axes.centers, indexing="ij")
        flat_centers = [znp.reshape(center, (-1,)) for center in meshed_center]
        centers = znp.stack(flat_centers, axis=-1)
        flat_weights = znp.reshape(self.values(), (-1,))  # TODO: flow?
        space = self.space.copy(binning=None)
        from zfit import Data

        return Data.from_tensor(obs=space, tensor=centers, weights=flat_weights)

    def __str__(self):
        import zfit

        # Only eager mode has concrete values to render a histogram preview.
        if zfit.run.executing_eagerly():
            return self.to_hist().__str__()
        return f"Binned data {self.axes} (compiled, no preview)"

    def _repr_html_(self):
        import zfit

        if zfit.run.executing_eagerly():
            return self.to_hist()._repr_html_()
        return f"Binned data {self.axes} (compiled, no preview)"
# tensorlike.register_tensor_conversion(BinnedData, name='BinnedData', overload_operators=True)
[docs]
class BinnedSamplerData(BinnedData):
    """Binned data whose sample can be re-drawn *in place* via ``resample``/``update_data``."""

    # Counter used to give the underlying tf.Variables unique names.
    _cache_counting = 0

    def __init__(
        self,
        dataset: BinnedHolder,
        *,
        sample_and_variances_func: Callable,
        sample_holder: tf.Variable = None,
        variances_holder: tf.Variable = None,
        n: ztyping.NumericalScalarType | Callable = None,
        params: ztyping.ParamValuesMap = None,
        name: str | None = None,
        label: str | None = None,
    ):
        """The ``BinnedSampler`` is a binned data object that can be resampled, i.e. modified in-place.

        Use `from_sampler` to create a `BinnedSampler`.

        Args:
            dataset: The data holder that contains the sample and the variances.
            sample_and_variances_func: A function that samples the data and returns the sample and the variances.
            sample_holder: The tensor that holds the sample.
            variances_holder: The tensor that holds the variances.
            n: The number of samples to produce. If the `SamplerData` was created with
                anything else then a numerical or tf.Tensor, this can't be used.
            params: A mapping from `Parameter` to a fixed value that should be used for the sampling.
            name: The name of the data object.
            label: The label of the data object.
        """
        super().__init__(holder=dataset, name=name, label=label, use_hash=True)
        params = convert_param_values(params)
        self._initial_resampled = False
        self.params = params
        self._sample_holder = sample_holder
        self._variances_holder = variances_holder
        self._sample_and_variances_func = sample_and_variances_func
        self.n = n
        self._n_holder = n
        # we need to use a hash because it could change -> for loss etc to know when data changes
        self._hashint_holder = tf.Variable(
            initial_value=0,
            dtype=tf.int64,
            trainable=False,
            shape=(),
        )
        # Push the initial sample into the holders (and refresh the hash).
        self.update_data(dataset.values, variances=dataset.variances)

    @property
    @deprecated(None, "Use `params` instead.")
    def fixed_params(self):
        # Deprecated alias for `params`.
        return self.params

    @property
    def n_samples(self):
        # The `n` this sampler was created with (may also be a Callable).
        return self._n_holder

    @property
    def _approx_nevents(self):
        # Fall back to `n` if the summed values are not available.
        nevents = super()._approx_nevents
        if nevents is None:
            nevents = self.n
        return nevents

    @property
    def hashint(self) -> int | None:
        # Read through the tf.Variable so consumers see in-place updates.
        return self._hashint_holder.value()

    def _update_hash(self):
        super()._update_hash()
        if hasattr(self, "_hashint_holder"):  # initialization
            # NOTE(review): `_hashint` is None when not executing eagerly or hashing is
            # off; `None % (64**2)` would raise here — confirm this path cannot occur.
            self._hashint_holder.assign(self._hashint % (64**2))

    @classmethod
    def get_cache_counting(cls):
        # Return the current counter and advance it; used for unique variable names.
        counting = cls._cache_counting
        cls._cache_counting += 1
        return counting

    @classmethod
    def from_sample(
        cls,
        sample_func: Callable,  # noqa: ARG003
        n: ztyping.NumericalScalarType,  # noqa: ARG003
        obs: ztyping.ObsTypeInput,  # noqa: ARG003
        fixed_params=None,  # noqa: ARG003
    ):
        # Removed API: kept only to raise a helpful error.
        # NOTE(review): the message has a leading space and an unbalanced parenthesis.
        msg = " Use `from_sampler` (with `r` at the end instead."
        raise BreakingAPIChangeError(msg)

    [docs]
    @classmethod
    def from_sampler(
        cls,
        *,
        sample_func: Callable | None = None,
        sample_and_variances_func: Callable | None = None,
        n: ztyping.NumericalScalarType,
        obs: ztyping.AxesTypeInput,
        params: ztyping.ParamValuesMap = None,
        fixed_params=None,
        name: str | None = None,
        label: str | None = None,
    ):
        """Create a binned sampler from a sample function.

        This is a binned data object that can be modified in-place by updating/resampling the sample.

        Args:
            sample_func: A function that samples the data.
            sample_and_variances_func: A function that samples the data and returns the sample and the variances.
            n: The number of samples to produce.
            obs: The observables of the data.
            params: A mapping from :py:class:~`zfit.Parameter` or string (the name) to a fixed value that should be used for the sampling.
            name: The name of the data object.
            label: The label of the data object.
        """
        if fixed_params is not None:
            msg = "Use `params` instead of `fixed_params`."
            raise BreakingAPIChangeError(msg)
        # Exactly one of the two sampling callables must be given.
        # NOTE(review): the message lists three option names but only two parameters exist.
        if int(sample_func is not None) + int(sample_and_variances_func is not None) != 1:
            msg = "Exactly one of `sample`, `sample_func` or `sample_and_variances_func` must be provided."
            raise ValueError(msg)
        if sample_func is not None:
            # Adapt a plain sample function to the (sample, variances) interface.
            def sample_and_variances_func(n, params, *, sample_func=sample_func):
                sample = sample_func(n, params=params)
                return sample, None

            del sample_func
        from ..core.space import convert_to_space

        obs = convert_to_space(obs)
        from .. import ztypes

        dtype = ztypes.float
        params = convert_param_values(params)
        initval, initvar = sample_and_variances_func(n, params=params)  # todo: preprocess, cut data?
        # shape=(None, ...) keeps the per-axis bin count flexible across updates.
        sample_holder = tf.Variable(
            initial_value=initval,
            dtype=dtype,
            trainable=False,
            # validate_shape=False,
            shape=(None,) * obs.n_obs,
            name=f"sample_hist_holder_{cls.get_cache_counting()}",
        )
        if initvar is not None:
            variances_holder = tf.Variable(
                initial_value=initvar,
                dtype=dtype,
                trainable=False,
                # validate_shape=False,
                shape=(None,) * obs.n_obs,
                name=f"variances_hist_holder_{cls.get_cache_counting()}",
            )
        else:
            variances_holder = None
        dataset = BinnedHolder(space=obs, values=sample_holder, variances=variances_holder)
        return cls(
            dataset=dataset,
            sample_holder=sample_holder,
            sample_and_variances_func=sample_and_variances_func,
            variances_holder=variances_holder,
            name=name,
            label=label,
            params=params,
            n=n,
        )

    [docs]
    def resample(
        self,
        params: ztyping.ParamValuesMap = None,
        *,
        n: int | tf.Tensor = None,
        param_values: ztyping.ParamValuesMap = None,
    ):
        """Update the sample by new sampling *inplace*; This affects any object that used this data already.

        All params that are not in the attribute ``params`` will use their current value for
        the creation of the new sample. The value can also be overwritten for one sampling by providing
        a mapping with ``param_values`` from ``Parameter`` to the temporary ``value``.

        Args:
            params: a mapping from :py:class:`~zfit.Parameter` to a `value` that should be used for the sampling.
                Any parameter that is not in this mapping will use the value in `params`.
            n: the number of samples to produce. If the `SamplerData` was created with
                anything else then a numerical or tf.Tensor, this can't be used.
        """
        if n is None:
            n = self.n
        if param_values is not None:  # legacy alias for `params`; both at once is ambiguous
            if params is not None:
                msg = "Cannot specify both `fixed_params` and `params`."
                raise ValueError(msg)
            params = param_values
        # Start from the stored params, overridden only for this call by the given ones.
        temp_param_values = self.params.copy()
        if params is not None:
            params = convert_param_values(params)
            temp_param_values.update(params)
        new_sample, new_variances = self._sample_and_variances_func(n, params=temp_param_values)
        self.update_data(new_sample, new_variances)

    [docs]
    def update_data(self, sample: TensorLike, variances: TensorLike | None = None):
        """Update the data, and optionally the variances, of the sampler in-place.

        This change will be reflected in any object that used this data already.

        Args:
            sample: The new sample.
            variances: The new variances. Can only be provided if the sampler was initialized with variances and *must* be
                provided if the sampler was initialized with variances.
        """
        sample = znp.asarray(sample)
        if variances is not None:
            variances = znp.asarray(variances)
        self._sample_holder.assign(sample, read_value=False)
        if variances is not None:
            if self._variances_holder is None:
                msg = "Variances were not initialized, cannot update them."
                raise ValueError(msg)
            self._variances_holder.assign(variances, read_value=False)
        elif self._variances_holder is not None:
            # Dropping variances after initializing with them is not allowed.
            msg = "Variances were initialized, cannot remove them."
            raise ValueError(msg)
        self._initial_resampled = True
        self._update_hash()

    [docs]
    def values(self) -> znp.array:
        """Values/counts of the histogram as an ndim array.

        Returns:
            Tensor of shape (nbins0, nbins1, ...) with nbins the number of bins in each observable.
        """
        return znp.asarray(super().values())  # otherwise, shape is not correct -> use handler if variable is needed

    [docs]
    def variances(self) -> znp.array:
        """Variances of the histogram as an ndim array or `None` if no variances are available.

        Returns:
            Tensor of shape (nbins0, nbins1, ...) with nbins the number of bins in each observable.
        """
        if (variances := super().variances()) is not None:
            variances = znp.asarray(variances)
        return variances

    def __repr__(self) -> str:
        return f"<BinnedSampler: {self.name} obs={self.obs}>"