"""
.. include:: ../../docs/subst_types.rst
# NamedSpace and limits
Limits define a certain interval in a specific dimension. This can be used to define, for example,
the limits of an integral over several dimensions or a normalization range.
### with limits
Therefore a different way of specifying limits is possible, basically by defining chunks of the
lower and the upper limits. The shape of a lower resp. upper limit is (n_limits, n_obs).
Example: 1-dim: 1 to 4, 2.-dim: 21 to 24 AND 1.-dim: 6 to 7, 2.-dim 26 to 27
>>> lower = ((1, 21), (6, 26))
>>> upper = ((4, 24), (7, 27))
>>> limits2 = Space(limits=(lower, upper), obs=('obs1', 'obs2')
General form:
lower = ((lower1_dim1, lower1_dim2, lower1_dim3), (lower2_dim1, lower2_dim2, lower2_dim3),...)
upper = ((upper1_dim1, upper1_dim2, upper1_dim3), (upper2_dim1, upper2_dim2, upper2_dim3),...)
## Using :py:class:`~zfit.Space`
:py:class:`NamedSpace` offers a few useful functions to easier deal with the intervals
### Handling areas
For example when doing a MC integration using the expectation value, it is mandatory to know
the total area of your intervals. You can retrieve the total area or (if multiple limits (=intervals
are given) the area of each interval.
>>> area = limits2.area()
>>> area_1, area_2 = limits2.iter_areas(rel=False) # if rel is True, return the fraction of 1
### Retrieve the limits
>>> lower, upper = limits2.limits
which you can now iterate through. For example, to calc an integral (assuming there is a function
`integrate` taking the lower and upper limits and returning the function), you can do
>>> def integrate(lower_limit, upper_limit): return 42 # dummy function
>>> integral = sum(integrate(lower_limit=low, upper_limit=up) for low, up in zip(lower, upper))
"""
# Copyright (c) 2020 zfit
# TODO(Mayou36): update docs above
import functools
import inspect
from collections import OrderedDict
from contextlib import suppress
from typing import Callable, Dict, List, Optional, Tuple, Union
import numpy as np
import tensorflow as tf
from .baseobject import BaseObject
from .dimension import add_spaces, combine_spaces
from .interfaces import ZfitSpace
from ..util import ztyping
from ..util.checks import NOT_SPECIFIED
from ..util.container import convert_to_container
from ..util.exception import (AxesNotSpecifiedError, IntentionNotUnambiguousError, LimitsUnderdefinedError,
MultipleLimitsNotImplementedError, NormRangeNotImplementedError, ObsNotSpecifiedError,
OverdefinedError, LimitsNotSpecifiedError, )
from ..util.temporary import TemporarilySet
# Singleton
[docs]class Any:
_singleton_instance = None
def __new__(cls, *args, **kwargs):
instance = cls._singleton_instance
if instance is None:
instance = super().__new__(cls)
cls._singleton_instance = instance
return instance
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
cls._singleton_instance = None # each subclass is a singleton of "itself"
def __repr__(self):
return '<Any>'
def __lt__(self, other):
return True
def __le__(self, other):
return True
# def __eq__(self, other):
# return True
def __ge__(self, other):
return True
def __gt__(self, other):
return True
# def __hash__(self):
# return
[docs]class AnyLower(Any):
def __repr__(self):
return '<Any Lower Limit>'
# def __eq__(self, other):
# return False
def __ge__(self, other):
return False
def __gt__(self, other):
return False
[docs]class AnyUpper(Any):
def __repr__(self):
return '<Any Upper Limit>'
# def __eq__(self, other):
# return False
def __le__(self, other):
return False
def __lt__(self, other):
return False
ANY = Any()
ANY_LOWER = AnyLower()
ANY_UPPER = AnyUpper()
[docs]class Space(ZfitSpace, BaseObject):
AUTO_FILL = object()
ANY = ANY
ANY_LOWER = ANY_LOWER # TODO: needed? or move everything inside?
ANY_UPPER = ANY_UPPER
def __init__(self, obs: ztyping.ObsTypeInput, limits: Optional[ztyping.LimitsTypeInput] = None,
name: Optional[str] = "Space"):
"""Define a space with the name (`obs`) of the axes (and it's number) and possibly it's limits.
Args:
obs (str, List[str,...]):
limits ():
name (str):
"""
obs = self._check_convert_input_obs(obs)
if name is None:
name = "Space_" + "_".join(obs)
super().__init__(name=name)
self._axes = None
self._obs = obs
self._check_set_limits(limits=limits)
@classmethod
def _from_any(cls, obs: ztyping.ObsTypeInput = None, axes: ztyping.AxesTypeInput = None,
limits: Optional[ztyping.LimitsTypeInput] = None,
name: str = None) -> "zfit.Space":
if obs is None:
new_space = cls.from_axes(axes=axes, limits=limits, name=name)
else:
new_space = cls(obs=obs, limits=limits, name=name)
new_space._axes = axes
return new_space
[docs] @classmethod
def from_axes(cls, axes: ztyping.AxesTypeInput,
limits: Optional[ztyping.LimitsTypeInput] = None,
name: str = None) -> "zfit.Space":
"""Create a space from `axes` instead of from `obs`.
Args:
axes ():
limits ():
name (str):
Returns:
:py:class:`~zfit.Space`
"""
axes = convert_to_container(value=axes, container=tuple)
if axes is None:
raise AxesNotSpecifiedError("Axes cannot be `None`")
fake_obs = (str(axis) for axis in axes) # in order to create an instance
new_space = cls(obs=fake_obs, limits=limits, name=name)
new_space._obs = None
new_space._axes = new_space._check_convert_input_axes(axes)
return new_space
@staticmethod
def _convert_obs_to_str(obs):
if isinstance(obs, Space):
obs = obs.obs
else:
obs = convert_to_container(obs, container=tuple)
return obs
@staticmethod
def _convert_axes_to_int(axes):
if isinstance(axes, Space):
axes = axes.axes
else:
axes = convert_to_container(axes, container=tuple)
return axes
def _check_set_limits(self, limits: ztyping.LimitsTypeInput):
if limits is not None and limits is not False:
lower, upper = limits
limits = self._check_convert_input_lower_upper(lower=lower, upper=upper)
self._limits = limits
def _check_convert_input_lower_upper(self, lower, upper):
lower = self._check_convert_input_limit(limit=lower)
upper = self._check_convert_input_limit(limit=upper)
lower_is_iterable = lower is not None or lower is not False
upper_is_iterable = upper is not None or upper is not False
if not (lower_is_iterable or upper_is_iterable) and lower is not upper:
ValueError("Lower and upper limits wrong:"
"\nlower = {lower}"
"\nupper = {upper}".format(lower=lower, upper=upper))
if lower_is_iterable ^ upper_is_iterable:
raise ValueError("Lower and upper limits wrong:"
"\nlower = {l}"
"\nupper = {u}".format(l=lower, u=upper))
if lower_is_iterable and upper_is_iterable:
if not shape_np_tf(lower) == shape_np_tf(upper) or (
len(shape_np_tf(lower)) not in (2, 3)): # 3 for EventSpace eager
raise ValueError("Lower and/or upper limits invalid:"
"\nlower: {lower}"
"\nupper: {upper}".format(lower=lower, upper=upper))
if not shape_np_tf(lower)[1] == self.n_obs:
raise ValueError("Limits shape not compatible with number of obs/axes"
"\nlower: {lower}"
"\nupper: {upper}"
"\nn_obs: {n_obs}".format(lower=lower, upper=upper, n_obs=self.n_obs))
return lower, upper
def _check_convert_input_limit(self, limit: Union[ztyping.LowerTypeInput, ztyping.UpperTypeInput],
replace=None) -> Union[ztyping.LowerTypeReturn, ztyping.UpperTypeReturn]:
"""Check and sanitize the lower or upper limit.
Args:
limit ():
Returns:
"""
replace = {} if replace is None else replace
if limit is NOT_SPECIFIED or limit is None:
return None
if (isinstance(limit, tuple) and limit == ()) or (isinstance(limit, np.ndarray) and limit.size == 0):
raise ValueError("Currently, () is not supported as limits. Should this be default for None?")
shape = shape_np_tf(limit)
if shape == ():
limit = ((limit,),)
shape = shape_np_tf(limit[0])
if shape == ():
raise ValueError("Shape of limit {} wrong.".format(limit))
# replace
if replace:
limit = tuple(tuple(replace.get(l, l) for l in lim) for lim in limit)
return limit
def _check_set_lower_upper(self, lower: ztyping.LowerTypeInput, upper: ztyping.UpperTypeInput):
if lower is None or lower is False:
limits = lower
else:
lower = self._check_convert_input_limit(lower)
upper = self._check_convert_input_limit(upper)
limits = lower, upper
self._check_set_limits(limits=limits)
def _check_convert_input_axes(self, axes: ztyping.AxesTypeInput,
allow_none: bool = False) -> ztyping.AxesTypeReturn:
if axes is None:
if allow_none:
return None
else:
raise AxesNotSpecifiedError("TODO: Cannot be None")
axes = convert_to_container(value=axes, container=tuple) # TODO(Mayou36): extend like _check_obs?
return axes
def _check_convert_input_obs(self, obs: ztyping.ObsTypeInput,
allow_none: bool = False) -> ztyping.ObsTypeReturn:
"""Input check: Convert `NOT_SPECIFIED` to None or check if obs are all strings.
Args:
obs (str, List[str], None, NOT_SPECIFIED):
Returns:
type:
"""
if obs is None:
if allow_none:
return None
else:
raise ObsNotSpecifiedError("TODO: Cannot be None")
if isinstance(obs, Space):
obs = obs.obs
else:
obs = convert_to_container(obs, container=tuple)
obs_not_str = tuple(o for o in obs if not isinstance(o, str))
if obs_not_str:
raise ValueError("The following observables are not strings: {}".format(obs_not_str))
return obs
@property
def limits(self) -> ztyping.LimitsTypeReturn:
"""Return the limits.
Returns:
"""
return self._limits
@property
def limit1d(self) -> Tuple[float, float]:
"""Simplified limits getter for 1 obs, 1 limit only: return the tuple(lower, upper).
Returns:
tuple(float, float): so :code:`lower, upper = space.limit1d` for a simple, 1 obs limit.
Raises:
RuntimeError: if the conditions (n_obs or n_limits) are not satisfied.
"""
if self.n_obs > 1:
raise RuntimeError("Cannot call `limit1d, as `Space` has more than one observables: {}".format(self.n_obs))
if self.n_limits > 1:
raise RuntimeError("Cannot call `limit1d, as `Space` has several limits: {}".format(self.n_limits))
limits = self.limits
if limits in (None, False):
limit = limits
else:
(lower,), (upper,) = limits
limit = lower[0], upper[0]
return limit
@property
def limit2d(self) -> Tuple[float, float, float, float]:
"""Simplified `limits` for exactly 2 obs, 1 limit: return the tuple(low_obs1, low_obs2, up_obs1, up_obs2).
Returns:
tuple(float, float, float, float): so `low_x, low_y, up_x, up_y = space.limit2d` for a single, 2 obs limit.
low_x is the lower limit in x, up_x is the upper limit in x etc.
Raises:
RuntimeError: if the conditions (n_obs or n_limits) are not satisfied.
"""
if self.n_obs != 2:
raise RuntimeError("Cannot call `limit2d, as `Space` has not two observables: {}".format(self.n_obs))
if self.n_limits > 1:
raise RuntimeError("Cannot call `limit2d, as `Space` has several limits: {}".format(self.n_limits))
limits = self.limits
if limits in (None, False):
limit = limits
else:
(lower,), (upper,) = limits
limit = *lower, *upper
return limit
@property
def limits1d(self) -> Tuple[float]:
"""Simplified `.limits` for exactly 1 obs, n limits: return the tuple(low_1, ..., low_n, up_1, ..., up_n).
Returns:
tuple(float, float, ...): so `low_1, low_2, up_1, up_2 = space.limits1d` for several, 1 obs limits.
low_1 to up_1 is the first interval, low_2 to up_2 is the second interval etc.
Raises:
RuntimeError: if the conditions (n_obs or n_limits) are not satisfied.
"""
if self.n_obs > 1:
raise RuntimeError("Cannot call `limits1d, as `Space` has more than one observable: {}".format(self.n_obs))
# if self.n_limits > 1:
# raise RuntimeError("Cannot call `limit1d, as `Space` has several limits: {}".format(self.n_limits))
limits = self.limits
if limits in (None, False):
limit = limits
else:
new_lower, new_upper = [], []
for lower, upper in self.iter_limits(as_tuple=True):
new_lower.append(lower[0])
new_upper.append(upper[0])
new_lower = tuple(new_lower)
new_upper = tuple(new_upper)
limit = *new_lower, *new_upper
return limit
@property
def lower(self) -> ztyping.LowerTypeReturn:
"""Return the lower limits.
Returns:
"""
limits = self.limits
if limits is None or limits is False:
return limits
else:
return limits[0]
@property
def upper(self) -> ztyping.UpperTypeReturn:
"""Return the upper limits.
Returns:
"""
limits = self.limits
if limits is None or limits is False:
return limits
else:
return self.limits[1]
@property
def n_obs(self) -> int: # TODO(naming): better name? Like rank?
"""Return the number of observables/axes.
Returns:
int >= 1
"""
if self.obs is None:
length = len(self.axes)
else:
length = len(self.obs)
return length
@property
def n_limits(self) -> int:
"""The number of different limits.
Returns:
int >= 1
"""
if self.lower is None or self.lower is False:
return 0
return len(self.lower)
@property
def obs(self) -> ztyping.ObsTypeReturn:
"""The observables ("axes with str")the space is defined in.
Returns:
"""
return self._obs
@property
def axes(self) -> ztyping.AxesTypeReturn:
"""The axes ("obs with int") the space is defined in.
Returns:
"""
return self._axes
[docs] def get_axes(self, obs: ztyping.ObsTypeInput = None,
as_dict: bool = False,
autofill: bool = False) -> Union[ztyping.AxesTypeReturn, Dict[str, int]]:
"""Return the axes corresponding to the `obs` (or all if None).
Args:
obs ():
as_dict (bool): If True, returns a ordered dictionary with {obs: axis}
autofill (bool): If True and the axes are not specified, automatically fill
them with the default numbering and return (not setting them).
Returns:
Tuple, OrderedDict
Raises:
ValueError: if the requested `obs` do not match with the one defined in the range
AxesNotSpecifiedError: If the axes in this :py:class:`~zfit.Space` have not been specified.
"""
# check input
obs = self._check_convert_input_obs(obs=obs, allow_none=True)
axes = self.axes
if axes is None:
if autofill:
axes = tuple(range(self.n_obs))
else:
raise AxesNotSpecifiedError("The axes have not been specified")
if obs is not None:
try:
axes = tuple(axes[self.obs.index(o)] for o in obs)
except KeyError:
missing_obs = set(obs) - set(self.obs)
raise ValueError("The requested observables {mis} are not contained in the defined "
"observables {obs}".format(mis=missing_obs, obs=self.obs))
else:
obs = self.obs
if as_dict:
axes = OrderedDict((o, ax) for o, ax in zip(obs, axes))
return axes
[docs] def iter_limits(self, as_tuple: bool = True) -> ztyping._IterLimitsTypeReturn:
"""Return the limits, either as :py:class:`~zfit.Space` objects or as pure limits-tuple.
This makes iterating over limits easier: `for limit in space.iter_limits()`
allows to, for example, pass `limit` to a function that can deal with simple limits
only or if `as_tuple` is True the `limit` can be directly used to calculate something.
Example:
.. code:: python
for lower, upper in space.iter_limits(as_tuple=True):
integrals = integrate(lower, upper) # calculate integral
integral = sum(integrals)
Returns:
List[:py:class:`~zfit.Space`] or List[limit,...]:
"""
if not self.limits:
raise LimitsNotSpecifiedError("Space does not have limits, cannot iterate over them.")
if as_tuple:
return tuple(zip(self.lower, self.upper))
else:
space_objects = []
for lower, upper in self.iter_limits(as_tuple=True):
if not (lower is None or lower is False):
lower = (lower,)
upper = (upper,)
limit = lower, upper
else:
limit = lower
space = type(self)._from_any(obs=self.obs, axes=self.axes, limits=limit)
space_objects.append(space)
return tuple(space_objects)
[docs] def with_limits(self, limits: ztyping.LimitsTypeInput, name: Optional[str] = None) -> "zfit.Space":
"""Return a copy of the space with the new `limits` (and the new `name`).
Args:
limits ():
name (str):
Returns:
:py:class:`~zfit.Space`
"""
new_space = self.copy(limits=limits, name=name)
return new_space
[docs] def with_obs(self, obs: ztyping.ObsTypeInput) -> "zfit.Space":
"""Sort by `obs` and return the new instance.
Args:
obs ():
Returns:
:py:class:`~zfit.Space`
"""
if obs is None or obs == self.obs:
return self
obs = self._convert_obs_to_str(obs)
new_indices = self.get_reorder_indices(obs=obs)
new_space = self.reorder_by_indices(indices=new_indices)
return new_space
[docs] def with_axes(self, axes: ztyping.AxesTypeInput) -> "zfit.Space":
"""Sort by `obs` and return the new instance.
Args:
axes ():
Returns:
:py:class:`~zfit.Space`
"""
# TODO: what if self.axes is None? Just add them?
if axes is None or axes == self.axes:
return self
axes = self._convert_axes_to_int(axes)
new_indices = self.get_reorder_indices(axes=axes)
new_space = self.copy()
new_space.reorder_by_indices(indices=new_indices)
return new_space
[docs] def get_reorder_indices(self, obs: ztyping.ObsTypeInput = None,
axes: ztyping.AxesTypeInput = None) -> Tuple[int]:
"""Indices that would order `self.obs` as `obs` respectively `self.axes` as `axes`.
Args:
obs ():
axes ():
Returns:
"""
obs_none = obs is None
axes_none = axes is None
obs_is_defined = self.obs is not None and not obs_none
axes_is_defined = self.axes is not None and not axes_none
if not (obs_is_defined or axes_is_defined):
raise ValueError("Neither the `obs` nor `axes` are defined.")
if obs_is_defined:
old, new = self.obs, [o for o in obs if o in self.obs]
else:
old, new = self.axes, [a for a in axes if a in self.axes]
new_indices = _reorder_indices(old=old, new=new)
return new_indices
[docs] def reorder_by_indices(self, indices: Tuple[int]):
"""Return a :py:class:`~zfit.Space` reordered by the indices.
Args:
indices ():
"""
new_space = self.copy()
new_space._reorder_limits(indices=indices)
new_space._reorder_axes(indices=indices)
new_space._reorder_obs(indices=indices)
return new_space
def _reorder_limits(self, indices: Tuple[int], inplace: bool = True) -> ztyping.LimitsTypeReturn:
limits = self.limits
if limits is not None and limits is not False:
lower, upper = limits
lower = tuple(tuple(lower[i] for i in indices) for lower in lower)
upper = tuple(tuple(upper[i] for i in indices) for upper in upper)
limits = lower, upper
if inplace:
self._limits = limits
return limits
def _reorder_axes(self, indices: Tuple[int], inplace: bool = True) -> ztyping.AxesTypeReturn:
axes = self.axes
if axes is not None:
axes = tuple(axes[i] for i in indices)
if inplace:
self._axes = axes
return axes
def _reorder_obs(self, indices: Tuple[int], inplace: bool = True) -> ztyping.ObsTypeReturn:
obs = self.obs
if obs is not None:
obs = tuple(obs[i] for i in indices)
if inplace:
self._obs = obs
return obs
[docs] def get_obs_axes(self, obs: ztyping.ObsTypeInput = None, axes: ztyping.AxesTypeInput = None):
if self.obs is None:
raise ObsNotSpecifiedError("Obs not specified, cannot create `obs_axes`")
if self.axes is None:
raise AxesNotSpecifiedError("Axes not specified, cannot create `obs_axes`")
obs = self._check_convert_input_obs(obs, allow_none=True)
axes = self._check_convert_input_axes(axes, allow_none=True)
if obs is not None and axes is not None:
raise OverdefinedError("Cannot use `obs` and `axes` to define which subset to access.")
obs = self.obs if obs is None else obs
axes = self.axes if axes is None else axes
# only membership testing below
obs = frozenset(obs)
axes = frozenset(axes)
# create obs_axes dict
obs_axes = OrderedDict((o, ax) for o, ax in self.obs_axes.items() if o in obs or ax in axes)
return obs_axes
@property
def obs_axes(self):
# TODO(Mayou36): what if axes is None?
return OrderedDict((o, ax) for o, ax in zip(self.obs, self.axes))
def _set_obs_axes(self, obs_axes: Union[ztyping.OrderedDict[str, int], Dict[str, int]], ordered: bool = False,
allow_subset=False):
"""(Reorder) set the observables and the `axes`.
Temporarily if used with a context manager.
Args:
obs_axes (OrderedDict[str, int]): An (ordered) dict with {obs: axes}.
allow_subset ():
Returns:
"""
if ordered and not isinstance(obs_axes, OrderedDict):
raise IntentionNotUnambiguousError("`ordered` is True but not an `OrderedDict` was given."
"Error due to safety (in Python <3.7, dicts are not guaranteed to be"
"ordered).")
tmp_obs = self.obs if self.obs is not None else obs_axes.keys()
self_obs_set = frozenset(tmp_obs)
tmp_axes = self.axes if self.axes is not None else obs_axes.values()
self_axes_set = frozenset(tmp_axes)
if ordered:
if self.obs is not None:
# if not frozenset(obs_axes.keys()) <= self_obs_set:
# raise ValueError("TODO observables not contained")
if not allow_subset and frozenset(obs_axes.keys()) < self_obs_set:
raise ValueError("subset not allowed but `obs` is only a subset of `self.obs`")
permutation_index = tuple(
self.obs.index(o) for o in obs_axes if o in self_obs_set) # the future index of the space
self_axes_set = set(obs_axes[o] for o in self.obs if o in obs_axes)
elif self.axes is not None:
if not frozenset(obs_axes.values()) <= self_axes_set:
raise ValueError("TODO axes not contained")
if not allow_subset and frozenset(obs_axes.values()) < self_axes_set:
raise ValueError("subset not allowed but `axes` is only a subset of `self.axes`")
permutation_index = tuple(
self.axes.index(ax) for ax in obs_axes.values() if
ax in self_axes_set) # the future index of the space
self_obs_set = set(o for o, ax in obs_axes.items() if ax in self.axes)
else:
assert False, "This should never be reached."
limits = self._reorder_limits(indices=permutation_index, inplace=False)
obs = tuple(o for o in obs_axes.keys() if o in self_obs_set)
axes = tuple(ax for ax in obs_axes.values() if ax in self_axes_set)
else:
if self.obs is not None:
if not allow_subset and frozenset(obs_axes.keys()) < self_obs_set:
raise ValueError("subset not allowed TODO")
obs = self.obs
axes = tuple(obs_axes[o] for o in obs)
elif self.axes is not None:
if not allow_subset and frozenset(obs_axes.values()) < self_axes_set:
raise ValueError("subset not allowed TODO")
axes = self.axes
axes_obs = {v: k for k, v in obs_axes.items()}
obs = tuple(axes_obs[ax] for ax in axes)
else:
raise ValueError("Either `obs` or `axes` have to be specified if the `obs_axes` dict"
"is not ordered and `ordered` is False.")
limits = self.limits
value = limits, obs, axes
def setter(arguments):
limits, obs, axes = arguments
self._obs = obs
self._axes = axes
self._check_set_limits(limits=limits)
def getter():
return self.limits, self.obs, self.axes
return TemporarilySet(value=value, setter=setter, getter=getter)
[docs] def with_obs_axes(self, obs_axes: Union[ztyping.OrderedDict[str, int], Dict[str, int]], ordered: bool = False,
allow_subset=False) -> "zfit.Space":
"""Return a new :py:class:`~zfit.Space` with reordered observables and set the `axes`.
Args:
obs_axes (OrderedDict[str, int]): An ordered dict with {obs: axes}.
ordered (bool): If True (and the `obs_axes` is an `OrderedDict`), the
allow_subset ():
Returns:
:py:class:`~zfit.Space`:
"""
new_space = type(self)._from_any(obs=self.obs, axes=self.axes, limits=self.limits)
new_space._set_obs_axes(obs_axes=obs_axes, ordered=ordered, allow_subset=allow_subset)
return new_space
[docs] def with_autofill_axes(self, overwrite: bool = False) -> "zfit.Space":
"""Return a :py:class:`~zfit.Space` with filled axes corresponding to range(len(n_obs)).
Args:
overwrite (bool): If `self.axes` is not None, replace the axes with the autofilled ones.
If axes is already set, don't do anything if `overwrite` is False.
Returns:
:py:class:`~zfit.Space`
"""
if self.axes is None or overwrite:
new_axes = tuple(range(self.n_obs))
new_space = self.copy(axes=new_axes)
else:
new_space = self
return new_space
[docs] def area(self) -> float:
"""Return the total area of all the limits and axes. Useful, for example, for MC integration."""
return sum(self.iter_areas(rel=False))
[docs] def iter_areas(self, rel: bool = False) -> Tuple[float, ...]:
"""Return the areas of each interval
Args:
rel (bool): If True, return the relative fraction of each interval
Returns:
Tuple[float]:
"""
areas = self._calculate_areas(limits=self.limits)
if rel:
areas = np.array(areas)
areas /= areas.sum()
areas = tuple(areas)
return areas
@staticmethod
@functools.lru_cache()
def _calculate_areas(limits) -> Tuple[float]:
areas = tuple(float(np.prod(np.array(up) - np.array(low))) for low, up in zip(*limits))
return areas
[docs] def get_subspace(self, obs: ztyping.ObsTypeInput = None, axes: ztyping.AxesTypeInput = None,
name: Optional[str] = None) -> "zfit.Space":
"""Create a :py:class:`~zfit.Space` consisting of only a subset of the `obs`/`axes` (only one allowed).
Args:
obs (str, Tuple[str]):
axes (int, Tuple[int]):
name ():
Returns:
"""
if obs is not None and axes is not None:
raise ValueError("Cannot specify `obs` *and* `axes` to get subspace.")
if axes is None and obs is None:
raise ValueError("Either `obs` or `axes` has to be specified and not None")
# try to use observables to get index
obs = self._check_convert_input_obs(obs=obs, allow_none=True)
if obs is not None:
try:
sub_index = tuple(self.obs.index(o) for o in obs)
except ValueError as error:
print("Original message: ", error)
raise KeyError("Cannot get subspace from `obs` {} as this observables are not defined"
"in this space. Only {} is defined.".format(set(obs) - set(self.obs), set(self.obs)))
except AttributeError: # `obs` is None -> has not attribute `index`
raise ObsNotSpecifiedError("Observables have not been specified in this space.")
# try to use axes to get index
axes = self._check_convert_input_axes(axes=axes, allow_none=True)
if axes is not None:
try:
sub_index = tuple(self.axes.index(ax) for ax in axes)
except ValueError as error:
print("Original message: ", error)
raise KeyError("Cannot get subspace from `axes` {} as this axes are not defined"
"in this space. Only the following axes are {}"
"".format(set(axes) - set(self.axes), self.axes))
except AttributeError:
raise AxesNotSpecifiedError("Axes have not been specified for this space.")
sub_obs = self.obs if self.obs is None else tuple(self.obs[i] for i in sub_index)
sub_axes = self.axes if self.axes is None else tuple(self.axes[i] for i in sub_index)
# use index to get limits
limits = self.limits
if limits is None or limits is False:
sub_limits = limits
else:
lower, upper = limits
sub_lower = tuple(tuple(lim[i] for i in sub_index) for lim in lower)
sub_upper = tuple(tuple(lim[i] for i in sub_index) for lim in upper)
sub_limits = sub_lower, sub_upper
new_space = type(self)._from_any(obs=sub_obs, axes=sub_axes, limits=sub_limits, name=name)
return new_space
# Operators
[docs] def copy(self, name: Optional[str] = None, **overwrite_kwargs) -> "zfit.Space":
"""Create a new :py:class:`~zfit.Space` using the current attributes and overwriting with
`overwrite_overwrite_kwargs`.
Args:
name (str): The new name. If not given, the new instance will be named the same as the
current one.
**overwrite_kwargs ():
Returns:
:py:class:`~zfit.Space`
"""
name = self.name if name is None else name
kwargs = {'name': name,
'limits': self.limits,
'axes': self.axes,
'obs': self.obs}
kwargs.update(overwrite_kwargs)
if set(overwrite_kwargs) - set(kwargs):
raise KeyError("Not usable keys in `overwrite_kwargs`: {}".format(set(overwrite_kwargs) - set(kwargs)))
new_space = type(self)._from_any(**kwargs)
return new_space
def __le__(self, other):
if not isinstance(other, type(self)):
return NotImplemented
axes_not_none = self.axes is not None and other.axes is not None
obs_not_none = self.obs is not None and other.obs is not None
if not (axes_not_none or obs_not_none): # if both are None
return False
if axes_not_none:
if set(self.axes) != set(other.axes):
return False
if obs_not_none:
if set(self.obs) != set(other.obs):
return False
# check limits
if self.limits is None:
if other.limits is None:
return True
else:
return False
elif self.limits is False:
if other.limits is False:
return True
else:
return False
reorder_indices = other.get_reorder_indices(obs=self.obs, axes=self.axes)
other = other.reorder_by_indices(reorder_indices)
# check explicitly if they match
# for each limit in self, find another matching in other
for lower, upper in self.iter_limits(as_tuple=True):
limit_is_le = False
for other_lower, other_upper in other.iter_limits(as_tuple=True):
# each entry *has to* match the entry of the other limit, otherwise it's not the same
for low, up, other_low, other_up in zip(lower, upper, other_lower, other_upper):
axis_le = 0 # False
axis_le += other_low == low and up == other_up # TODO: approx limit comparison?
axis_le += other_low == low and other_up is ANY_UPPER # TODO: approx limit
# comparison?
axis_le += other_low is ANY_LOWER and up == other_up # TODO: approx limit
# comparison?
axis_le += other_low is ANY_LOWER and other_up is ANY_UPPER
if not axis_le: # if not the same, don't test other dims
break
else:
limit_is_le = True # no break -> all axes coincide
if not limit_is_le: # for this `limit`, no other_limit matched
return False
return True
[docs] def add(self, other: ztyping.SpaceOrSpacesTypeInput):
"""Add the limits of the spaces. Only works for the same obs.
In case the observables are different, the order of the first space is taken.
Args:
other (:py:class:`~zfit.Space`):
Returns:
:py:class:`~zfit.Space`:
"""
other = convert_to_container(other, container=list)
new_space = add_spaces([self] + other)
return new_space
[docs] def combine(self, other: ztyping.SpaceOrSpacesTypeInput) -> ZfitSpace:
"""Combine spaces with different obs (but consistent limits).
Args:
other (:py:class:`~zfit.Space`):
Returns:
:py:class:`~zfit.Space`:
"""
other = convert_to_container(other, container=list)
new_space = combine_spaces([self] + other)
return new_space
def __add__(self, other):
if not isinstance(other, ZfitSpace):
raise TypeError("Cannot add a {} and a {}".format(type(self), type(other)))
return self.add(other)
def __mul__(self, other):
if not isinstance(other, ZfitSpace):
raise TypeError("Cannot combine a {} and a {}".format(type(self), type(other)))
return self.combine(other)
def __ge__(self, other):
return other.__le__(self)
def __eq__(self, other):
if not isinstance(self, type(other)): # TODO(Mayou36): what is a proper comparison?
return NotImplemented
is_eq = True
is_eq *= self.obs == other.obs
is_eq *= self.axes == other.axes or self.axes is None or other.axes is None
is_eq *= self.limits == other.limits
return bool(is_eq)
def __hash__(self):
lower = self.lower
upper = self.upper
if not (lower is None or lower is False): # we want to be non-restrictive: it's just a hash, not the __eq__
lower = frozenset(frozenset(lim) for lim in lower)
if not (upper is None or upper is False):
upper = frozenset(frozenset(lim) for lim in upper)
return hash((lower, upper))
[docs]def convert_to_space(obs: Optional[ztyping.ObsTypeInput] = None, axes: Optional[ztyping.AxesTypeInput] = None,
limits: Optional[ztyping.LimitsTypeInput] = None,
*, overwrite_limits: bool = False, one_dim_limits_only: bool = True,
simple_limits_only: bool = True) -> Union[None, Space, bool]:
"""Convert *limits* to a :py:class:`~zfit.Space` object if not already None or False.
Args:
obs (Union[Tuple[float, float], :py:class:`~zfit.Space`]):
limits ():
axes ():
overwrite_limits (bool): If `obs` or `axes` is a :py:class:`~zfit.Space` _and_ `limits` are given, return an instance
of :py:class:`~zfit.Space` with the new limits. If the flag is `False`, the `limits` argument will be
ignored if
one_dim_limits_only (bool):
simple_limits_only (bool):
Returns:
Union[:py:class:`~zfit.Space`, False, None]:
Raises:
OverdefinedError: if `obs` or `axes` is a :py:class:`~zfit.Space` and `axes` respectively `obs` is not `None`.
"""
space = None
# Test if already `Space` and handle
if isinstance(obs, Space):
if axes is not None:
raise OverdefinedError("if `obs` is a `Space`, `axes` cannot be defined.")
space = obs
elif isinstance(axes, Space):
if obs is not None:
raise OverdefinedError("if `axes` is a `Space`, `obs` cannot be defined.")
space = axes
elif isinstance(limits, Space):
return limits
if space is not None:
# set the limits if given
if limits is not None and (overwrite_limits or space.limits is None):
if isinstance(limits, Space): # figure out if compatible if limits is `Space`
if not (limits.obs == space.obs or
(limits.axes == space.axes and limits.obs is None and space.obs is None)):
raise IntentionNotUnambiguousError(
"`obs`/`axes` is a `Space` as well as the `limits`, but the "
"obs/axes of them do not match")
else:
limits = limits.limits
space = space.with_limits(limits=limits)
return space
# space is None again
if not (obs is None and axes is None):
# check if limits are allowed
space = Space._from_any(obs=obs, axes=axes, limits=limits) # create and test if valid
if one_dim_limits_only and space.n_obs > 1 and space.limits:
raise LimitsUnderdefinedError(
"Limits more sophisticated than 1-dim cannot be auto-created from tuples. Use `Space` instead.")
if simple_limits_only and space.limits and space.n_limits > 1:
raise LimitsUnderdefinedError("Limits with multiple limits cannot be auto-created"
" from tuples. Use `Space` instead.")
return space
def _reorder_indices(old: Union[List, Tuple], new: Union[List, Tuple]) -> Tuple[int]:
new_indices = tuple(old.index(o) for o in new)
return new_indices
[docs]def no_norm_range(func):
"""Decorator: Catch the 'norm_range' kwargs. If not None, raise NormRangeNotImplementedError."""
parameters = inspect.signature(func).parameters
keys = list(parameters.keys())
if 'norm_range' in keys:
norm_range_index = keys.index('norm_range')
else:
norm_range_index = None
@functools.wraps(func)
def new_func(*args, **kwargs):
norm_range = kwargs.get('norm_range')
if isinstance(norm_range, Space):
norm_range_not_false = not (norm_range.limits is None or norm_range.limits is False)
else:
norm_range_not_false = not (norm_range is None or norm_range is False)
if norm_range_index is not None:
norm_range_is_arg = len(args) > norm_range_index
else:
norm_range_is_arg = False
kwargs.pop('norm_range', None) # remove if in signature (= norm_range_index not None)
if norm_range_not_false or norm_range_is_arg:
raise NormRangeNotImplementedError()
else:
return func(*args, **kwargs)
return new_func
[docs]def no_multiple_limits(func):
"""Decorator: Catch the 'limits' kwargs. If it contains multiple limits, raise MultipleLimitsNotImplementedError."""
parameters = inspect.signature(func).parameters
keys = list(parameters.keys())
if 'limits' in keys:
limits_index = keys.index('limits')
else:
return func # no limits as parameters -> no problem
@functools.wraps(func)
def new_func(*args, **kwargs):
limits_is_arg = len(args) > limits_index
if limits_is_arg:
limits = args[limits_index]
else:
limits = kwargs['limits']
if limits.n_limits > 1:
raise MultipleLimitsNotImplementedError
else:
return func(*args, **kwargs)
return new_func
[docs]def supports(*, norm_range: bool = False, multiple_limits: bool = False) -> Callable:
"""Decorator: Add (mandatory for some methods) on a method to control what it can handle.
If any of the flags is set to False, it will check the arguments and, in case they match a flag
(say if a *norm_range* is passed while the *norm_range* flag is set to `False`), it will
raise a corresponding exception (in this example a `NormRangeNotImplementedError`) that will
be catched by an earlier function that knows how to handle things.
Args:
norm_range (bool): If False, no norm_range argument will be passed through resp. will be `None`
multiple_limits (bool): If False, only simple limits are to be expected and no iteration is
therefore required.
"""
decorator_stack = []
if not multiple_limits:
decorator_stack.append(no_multiple_limits)
if not norm_range:
decorator_stack.append(no_norm_range)
def create_deco_stack(func):
for decorator in reversed(decorator_stack):
func = decorator(func)
func.__wrapped__ = supports
return func
return create_deco_stack
[docs]def convert_to_obs_str(obs):
"""Convert `obs` to the list of obs, also if it is a :py:class:`~zfit.Space`.
"""
obs = convert_to_container(value=obs, container=tuple)
new_obs = []
for ob in obs:
if isinstance(ob, Space):
new_obs.extend(ob.obs)
else:
new_obs.append(ob)
return new_obs
[docs]def contains_tensor(object):
tensor_found = isinstance(object, (tf.Tensor, tf.Variable))
with suppress(TypeError):
for obj in object:
if tensor_found:
break
tensor_found += contains_tensor(obj)
return tensor_found
[docs]def shape_np_tf(object):
if contains_tensor(object):
shape = tuple(tf.convert_to_tensor(object).shape.as_list())
else:
shape = np.shape(object)
return shape