# Copyright (c) 2023 zfit
from __future__ import annotations
from collections.abc import Callable
from collections.abc import Mapping
from typing import TYPE_CHECKING
from ..core.parameter import set_values
if TYPE_CHECKING:
import zfit
import boost_histogram as bh
import hist
import tensorflow as tf
from zfit._variables.axis import histaxes_to_binning, binning_to_histaxes
from zfit.core.interfaces import ZfitBinnedData, ZfitSpace, ZfitData
from zfit.z import numpy as znp
from ..util import ztyping
from ..util.exception import ShapeIncompatibleError
# @tfp.experimental.auto_composite_tensor()
class BinnedHolder(
# tfp.experimental.AutoCompositeTensor
):
def __init__(self, space, values, variances):
self._check_init_values(space, values, variances)
self.space = space
self.values = values
self.variances = variances
def _check_init_values(self, space, values, variances):
value_shape = tf.shape(values)
edges_shape = znp.array(
[tf.shape(znp.reshape(edge, (-1,)))[0] for edge in space.binning.edges]
)
values_rank = value_shape.shape[0]
if variances is not None:
variances_shape = tf.shape(variances)
variances_rank = variances_shape.shape[0]
if values_rank != variances_rank:
raise ShapeIncompatibleError(
f"Values {values} and variances {variances} differ in rank: {values_rank} vs {variances_rank}"
)
tf.assert_equal(
variances_shape,
value_shape,
message=f"Variances and values do not have the same shape:"
f" {variances_shape} vs {value_shape}",
)
binning_rank = len(space.binning.edges)
if binning_rank != values_rank:
raise ShapeIncompatibleError(
f"Values and binning differ in rank: {values_rank} vs {binning_rank}"
)
tf.assert_equal(
edges_shape - 1,
value_shape,
message=f"Edges (minus one) and values do not have the same shape:"
f" {edges_shape} vs {value_shape}",
)
def with_obs(self, obs):
space = self.space.with_obs(obs)
values = move_axis_obs(self.space, space, self.values)
variances = self.variances
if variances is not None:
variances = move_axis_obs(self.space, space, self.variances)
return type(self)(space=space, values=values, variances=variances)
def move_axis_obs(original, target, values):
new_axes = [original.obs.index(ob) for ob in target.obs]
values = znp.moveaxis(values, tuple(range(target.n_obs)), new_axes)
return values
flow = False # TODO: track the flow or not?
# @tfp.experimental.auto_composite_tensor()
[docs]
class BinnedData(
ZfitBinnedData,
# tfp.experimental.AutoCompositeTensor, OverloadableMixinValues, ZfitBinnedData
):
def __init__(self, *, holder):
"""Create a binned data object from a :py:class:`~zfit.core.data.BinnedHolder`.
Prefer to use the constructors ``from_*`` of :py:class:`~zfit.core.data.BinnedData`
like :py:meth:`~zfit.core.data.BinnedData.from_hist`, :py:meth:`~zfit.core.data.BinnedData.from_tensor`
or :py:meth:`~zfit.core.data.BinnedData.from_unbinned`.
Args:
holder:
"""
self.holder: BinnedHolder = holder
self.name = "BinnedData" # TODO: improve naming
[docs]
@classmethod # TODO: add overflow bins if needed
def from_tensor(
cls, space: ZfitSpace, values: znp.array, variances: znp.array | None = None
) -> BinnedData:
"""Create a binned dataset defined in *space* where values are considered to be the counts.
Args:
space: |@doc:binneddata.param.space| Binned space of the data.
The space is used to define the binning and the limits of the data. |@docend:binneddata.param.space|
values: |@doc:binneddata.param.values| Corresponds to the counts of the histogram.
Follows the definition of the
`Unified Histogram Interface (UHI) <https://uhi.readthedocs.io/en/latest/plotting.html#plotting>`_. |@docend:binneddata.param.values|
variances: |@doc:binneddata.param.variances| Corresponds to the uncertainties of the histogram.
If ``True``, the uncertainties are created assuming that ``values``
have been drawn from a Poisson distribution. Follows the definition of the
`Unified Histogram Interface (UHI) <https://uhi.readthedocs.io/en/latest/plotting.html#plotting>`_. |@docend:binneddata.param.variances|
"""
values = znp.asarray(values, znp.float64)
if variances is True:
variances = znp.sqrt(values)
elif variances is not None:
variances = znp.asarray(variances)
return cls(holder=BinnedHolder(space=space, values=values, variances=variances))
[docs]
@classmethod
def from_unbinned(cls, space: ZfitSpace, data: ZfitData):
"""Convert an unbinned dataset to a binned dataset.
Args:
space: |@doc:binneddata.param.space| Binned space of the data.
The space is used to define the binning and the limits of the data. |@docend:binneddata.param.space|
data: Unbinned data to be converted to binned data
Returns:
ZfitBinnedData: The binned data
"""
from zfit.core.binning import unbinned_to_binned
return unbinned_to_binned(data, space)
[docs]
@classmethod
def from_hist(cls, h: hist.NamedHist) -> BinnedData:
"""Create a binned dataset from a ``hist`` histogram.
A histogram (following the UHI definition) with named axes.
Args:
h: A NamedHist. The axes will be used as the binning in zfit.
"""
from zfit import Space
space = Space(binning=histaxes_to_binning(h.axes))
values = znp.asarray(h.values(flow=flow))
variances = h.variances(flow=flow)
if variances is not None:
variances = znp.asarray(variances)
holder = BinnedHolder(space=space, values=values, variances=variances)
return cls(holder=holder)
[docs]
def with_obs(self, obs: ztyping.ObsTypeInput) -> BinnedData:
"""Return a subset of the data in the ordering of *obs*.
Args:
obs: Which obs to return
"""
return type(self)(holder=self.holder.with_obs(obs))
@property
def kind(self):
return "COUNT"
@property
def n_obs(self) -> int:
return self.rank
@property
def rank(self) -> int:
return self.space.n_obs
@property
def obs(self):
return self.space.obs
[docs]
def to_hist(self) -> hist.Hist:
"""Convert the binned data to a :py:class:`~hist.NamedHist`.
While a binned data object can be used inside zfit (PDFs,...), it lacks many convenience features that the
`hist library <https://hist.readthedocs.io/>`_
offers, such as plots.
"""
binning = binning_to_histaxes(self.holder.space.binning)
h = hist.Hist(*binning, storage=bh.storage.Weight())
h.view(flow=flow).value = self.values() # TODO: flow?
h.view(flow=flow).variance = self.variances() # TODO: flow?
return h
def _to_boost_histogram_(self):
binning = binning_to_histaxes(self.holder.space.binning)
h = bh.Histogram(*binning, storage=bh.storage.Weight())
h.view(flow=flow).value = self.values() # TODO: flow?
h.view(flow=flow).variance = self.variances() # TODO: flow?
return h
@property
def space(self):
return self.holder.space
@property
def axes(self):
return self.binning
@property
def binning(self):
return self.space.binning
[docs]
def values(self) -> znp.array: # , flow=False
"""Values of the histogram as an ndim array.
Compared to ``hist``, zfit does not make a difference between a view and a copy; tensors are immutable.
This distinction is made in the traced function by the compilation backend.
Returns:
Tensor of shape (nbins0, nbins1, ...) with nbins the number of bins in each observable.
"""
vals = self.holder.values
# if not flow: # TODO: flow?
# shape = tf.shape(vals)
# vals = tf.slice(vals, znp.ones_like(shape), shape - 2)
return vals
[docs]
def variances(self) -> None | znp.array: # , flow=False
"""Variances, if available, of the histogram as an ndim array.
Compared to ``hist``, zfit does not make a difference between a view and a copy; tensors are immutable.
This distinction is made in the traced function by the compilation backend.
Returns:
Tensor of shape (nbins0, nbins1, ...) with nbins the number of bins in each observable.
"""
vals = self.holder.variances
# if not flow: # TODO: flow?
# shape = tf.shape(vals)
# vals = tf.slice(vals, znp.ones_like(shape), shape - 2)
return vals
[docs]
def counts(self):
"""Effective counts of the histogram as a ndim array.
Compared to ``hist``, zfit does not make a difference between a view and a copy; tensors are immutable.
This distinction is made in the traced function by the compilation backend.
Returns:
Tensor of shape (nbins0, nbins1, ...) with nbins the number of bins in each observable.
"""
return self.values()
# dummy
@property
def data_range(self):
return self.space
@property
def nevents(self):
return znp.sum(self.values())
@property
def n_events(self): # LEGACY, what should be the name?
return self.nevents
@property
def _approx_nevents(self):
return znp.sum(self.values())
def __eq__(self, other):
return id(self) == id(other)
def __hash__(self):
return hash(id(self))
[docs]
def to_unbinned(self):
"""Use the bincenters as unbinned data with values as counts.
Returns:
``ZfitData``: Unbinned data
"""
meshed_center = znp.meshgrid(*self.axes.centers, indexing="ij")
flat_centers = [znp.reshape(center, (-1,)) for center in meshed_center]
centers = znp.stack(flat_centers, axis=-1)
flat_weights = znp.reshape(self.values(), (-1,)) # TODO: flow?
space = self.space.copy(binning=None)
from zfit import Data
return Data.from_tensor(obs=space, tensor=centers, weights=flat_weights)
def __str__(self):
import zfit
if zfit.run.executing_eagerly():
return self.to_hist().__str__()
else:
return f"Binned data, {self.obs} (non-eager)"
def _repr_html_(self):
import zfit
if zfit.run.executing_eagerly():
return self.to_hist()._repr_html_()
else:
return f"Binned data, {self.obs} (non-eager)"
# tensorlike.register_tensor_conversion(BinnedData, name='BinnedData', overload_operators=True)
class SampleHolder(BinnedHolder):
def with_obs(self, obs):
assert False, "INTERNAL ERROR, should never be used directly"
class BinnedSampler(BinnedData):
_cache_counting = 0
def __init__(
self,
dataset: SampleHolder,
sample_func: Callable,
sample_holder: tf.Variable,
n: ztyping.NumericalScalarType | Callable,
fixed_params: dict[zfit.Parameter, ztyping.NumericalScalarType] = None,
):
super().__init__(holder=dataset)
if fixed_params is None:
fixed_params = {}
if isinstance(fixed_params, (list, tuple)):
fixed_params = {param: param.numpy() for param in fixed_params}
self._initial_resampled = False
self.fixed_params = fixed_params
self.sample_holder = sample_holder
self.sample_func = sample_func
self.n = n
self._n_holder = n
self.resample() # to be used for precompilations etc
@property
def n_samples(self):
return self._n_holder
@property
def _approx_nevents(self):
nevents = super()._approx_nevents
if nevents is None:
nevents = self.n
return nevents
@property
def hashint(self) -> int | None:
return None # since the variable can be changed but this may stays static... and using 128 bits we can't have
# a tf.Variable that keeps the int
@classmethod
def get_cache_counting(cls):
counting = cls._cache_counting
cls._cache_counting += 1
return counting
@classmethod
def from_sample(
cls,
sample_func: Callable,
n: ztyping.NumericalScalarType,
obs: ztyping.ObsTypeInput,
fixed_params=None,
dtype=None,
):
from ..core.space import convert_to_space
obs = convert_to_space(obs)
if fixed_params is None:
fixed_params = []
if dtype is None:
from .. import ztypes
dtype = ztypes.float
# from tensorflow.python.ops.variables import VariableV1
sample_holder = tf.Variable(
initial_value=sample_func(n),
dtype=dtype,
trainable=False,
# validate_shape=False,
shape=(None,) * obs.n_obs,
name=f"sample_hist_holder_{cls.get_cache_counting()}",
)
dataset = SampleHolder(space=obs, values=sample_holder, variances=None)
return cls(
dataset=dataset,
sample_holder=sample_holder,
sample_func=sample_func,
fixed_params=fixed_params,
n=n,
)
def resample(self, param_values: Mapping = None, n: int | tf.Tensor = None):
"""Update the sample by newly sampling. This affects any object that used this data already.
All params that are not in the attribute ``fixed_params`` will use their current value for
the creation of the new sample. The value can also be overwritten for one sampling by providing
a mapping with ``param_values`` from ``Parameter`` to the temporary ``value``.
Args:
param_values: a mapping from :py:class:`~zfit.Parameter` to a `value`. For the current sampling,
`Parameter` will use the `value`.
n: the number of samples to produce. If the `Sampler` was created with
anything else then a numerical or tf.Tensor, this can't be used.
"""
if n is None:
n = self.n
temp_param_values = self.fixed_params.copy()
if param_values is not None:
temp_param_values.update(param_values)
with set_values(
list(temp_param_values.keys()), list(temp_param_values.values())
):
new_sample = self.sample_func(n)
self.sample_holder.assign(new_sample, read_value=False)
self._initial_resampled = True
def with_obs(self, obs: ztyping.ObsTypeInput) -> BinnedSampler:
"""Create a new :py:class:`~zfit.core.data.BinnedSampler` with the same sample but different ordered
observables.
Args:
obs: The new observables
"""
from ..core.space import convert_to_space
obs = convert_to_space(obs)
if obs.obs == self.obs:
return self
def new_sample_func(n):
sample = self.sample_func(n)
values = move_axis_obs(self.space, obs, sample)
return values
return BinnedSampler.from_sample(
sample_func=new_sample_func,
n=self.n,
obs=obs,
fixed_params=self.fixed_params,
)
def values(self) -> znp.array:
return znp.asarray(super().values())
def __str__(self) -> str:
return f"<BinnedSampler: {self.name} obs={self.obs}>"