Source code for zfit.core.limits

"""
.. include:: ../../docs/subst_types.rst

# NamedSpace and limits

Limits define a certain interval in a specific dimension. This can be used to define, for example,
the limits of an integral over several dimensions or a normalization range.



### with limits

Therefore a different way of specifying limits is possible, basically by defining chunks of the
lower and the upper limits. The shape of a lower resp. upper limit is (n_limits, n_obs).

Example: 1-dim: 1 to 4, 2.-dim: 21 to 24 AND 1.-dim: 6 to 7, 2.-dim 26 to 27
>>> lower = ((1, 21), (6, 26))
>>> upper = ((4, 24), (7, 27))
>>> limits2 = Space(limits=(lower, upper), obs=('obs1', 'obs2')

General form:

lower = ((lower1_dim1, lower1_dim2, lower1_dim3), (lower2_dim1, lower2_dim2, lower2_dim3),...)
upper = ((upper1_dim1, upper1_dim2, upper1_dim3), (upper2_dim1, upper2_dim2, upper2_dim3),...)

## Using :py:class:`~zfit.Space`

:py:class:`NamedSpace` offers a few useful functions to easier deal with the intervals

### Handling areas

For example when doing a MC integration using the expectation value, it is mandatory to know
the total area of your intervals. You can retrieve the total area or (if multiple limits (=intervals
 are given) the area of each interval.

 >>> area = limits2.area()
 >>> area_1, area_2 = limits2.iter_areas(rel=False)  # if rel is True, return the fraction of 1


### Retrieve the limits

>>> lower, upper = limits2.limits

which you can now iterate through. For example, to calc an integral (assuming there is a function
`integrate` taking the lower and upper limits and returning the function), you can do
>>> def integrate(lower_limit, upper_limit): return 42  # dummy function
>>> integral = sum(integrate(lower_limit=low, upper_limit=up) for low, up in zip(lower, upper))
"""
#  Copyright (c) 2020 zfit

# TODO(Mayou36): update docs above

import functools
import inspect
from collections import OrderedDict
from contextlib import suppress
from typing import Callable, Dict, List, Optional, Tuple, Union

import numpy as np
import tensorflow as tf

from .baseobject import BaseObject
from .dimension import add_spaces, combine_spaces
from .interfaces import ZfitSpace
from ..util import ztyping
from ..util.checks import NOT_SPECIFIED
from ..util.container import convert_to_container
from ..util.exception import (AxesNotSpecifiedError, IntentionNotUnambiguousError, LimitsUnderdefinedError,
                              MultipleLimitsNotImplementedError, NormRangeNotImplementedError, ObsNotSpecifiedError,
                              OverdefinedError, LimitsNotSpecifiedError, )
from ..util.temporary import TemporarilySet


# Singleton
[docs]class Any: _singleton_instance = None def __new__(cls, *args, **kwargs): instance = cls._singleton_instance if instance is None: instance = super().__new__(cls) cls._singleton_instance = instance return instance def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) cls._singleton_instance = None # each subclass is a singleton of "itself" def __repr__(self): return '<Any>' def __lt__(self, other): return True def __le__(self, other): return True # def __eq__(self, other): # return True def __ge__(self, other): return True def __gt__(self, other): return True
# def __hash__(self): # return
[docs]class AnyLower(Any): def __repr__(self): return '<Any Lower Limit>' # def __eq__(self, other): # return False def __ge__(self, other): return False def __gt__(self, other): return False
[docs]class AnyUpper(Any): def __repr__(self): return '<Any Upper Limit>' # def __eq__(self, other): # return False def __le__(self, other): return False def __lt__(self, other): return False
ANY = Any() ANY_LOWER = AnyLower() ANY_UPPER = AnyUpper()
[docs]class Space(ZfitSpace, BaseObject): AUTO_FILL = object() ANY = ANY ANY_LOWER = ANY_LOWER # TODO: needed? or move everything inside? ANY_UPPER = ANY_UPPER def __init__(self, obs: ztyping.ObsTypeInput, limits: Optional[ztyping.LimitsTypeInput] = None, name: Optional[str] = "Space"): """Define a space with the name (`obs`) of the axes (and it's number) and possibly it's limits. Args: obs (str, List[str,...]): limits (): name (str): """ obs = self._check_convert_input_obs(obs) if name is None: name = "Space_" + "_".join(obs) super().__init__(name=name) self._axes = None self._obs = obs self._check_set_limits(limits=limits) @classmethod def _from_any(cls, obs: ztyping.ObsTypeInput = None, axes: ztyping.AxesTypeInput = None, limits: Optional[ztyping.LimitsTypeInput] = None, name: str = None) -> "zfit.Space": if obs is None: new_space = cls.from_axes(axes=axes, limits=limits, name=name) else: new_space = cls(obs=obs, limits=limits, name=name) new_space._axes = axes return new_space
[docs] @classmethod def from_axes(cls, axes: ztyping.AxesTypeInput, limits: Optional[ztyping.LimitsTypeInput] = None, name: str = None) -> "zfit.Space": """Create a space from `axes` instead of from `obs`. Args: axes (): limits (): name (str): Returns: :py:class:`~zfit.Space` """ axes = convert_to_container(value=axes, container=tuple) if axes is None: raise AxesNotSpecifiedError("Axes cannot be `None`") fake_obs = (str(axis) for axis in axes) # in order to create an instance new_space = cls(obs=fake_obs, limits=limits, name=name) new_space._obs = None new_space._axes = new_space._check_convert_input_axes(axes) return new_space
@staticmethod def _convert_obs_to_str(obs): if isinstance(obs, Space): obs = obs.obs else: obs = convert_to_container(obs, container=tuple) return obs @staticmethod def _convert_axes_to_int(axes): if isinstance(axes, Space): axes = axes.axes else: axes = convert_to_container(axes, container=tuple) return axes def _check_set_limits(self, limits: ztyping.LimitsTypeInput): if limits is not None and limits is not False: lower, upper = limits limits = self._check_convert_input_lower_upper(lower=lower, upper=upper) self._limits = limits def _check_convert_input_lower_upper(self, lower, upper): lower = self._check_convert_input_limit(limit=lower) upper = self._check_convert_input_limit(limit=upper) lower_is_iterable = lower is not None or lower is not False upper_is_iterable = upper is not None or upper is not False if not (lower_is_iterable or upper_is_iterable) and lower is not upper: ValueError("Lower and upper limits wrong:" "\nlower = {lower}" "\nupper = {upper}".format(lower=lower, upper=upper)) if lower_is_iterable ^ upper_is_iterable: raise ValueError("Lower and upper limits wrong:" "\nlower = {l}" "\nupper = {u}".format(l=lower, u=upper)) if lower_is_iterable and upper_is_iterable: if not shape_np_tf(lower) == shape_np_tf(upper) or ( len(shape_np_tf(lower)) not in (2, 3)): # 3 for EventSpace eager raise ValueError("Lower and/or upper limits invalid:" "\nlower: {lower}" "\nupper: {upper}".format(lower=lower, upper=upper)) if not shape_np_tf(lower)[1] == self.n_obs: raise ValueError("Limits shape not compatible with number of obs/axes" "\nlower: {lower}" "\nupper: {upper}" "\nn_obs: {n_obs}".format(lower=lower, upper=upper, n_obs=self.n_obs)) return lower, upper def _check_convert_input_limit(self, limit: Union[ztyping.LowerTypeInput, ztyping.UpperTypeInput], replace=None) -> Union[ztyping.LowerTypeReturn, ztyping.UpperTypeReturn]: """Check and sanitize the lower or upper limit. Args: limit (): Returns: """ replace = {} if replace is None else replace if limit is NOT_SPECIFIED or limit is None: return None if (isinstance(limit, tuple) and limit == ()) or (isinstance(limit, np.ndarray) and limit.size == 0): raise ValueError("Currently, () is not supported as limits. Should this be default for None?") shape = shape_np_tf(limit) if shape == (): limit = ((limit,),) shape = shape_np_tf(limit[0]) if shape == (): raise ValueError("Shape of limit {} wrong.".format(limit)) # replace if replace: limit = tuple(tuple(replace.get(l, l) for l in lim) for lim in limit) return limit def _check_set_lower_upper(self, lower: ztyping.LowerTypeInput, upper: ztyping.UpperTypeInput): if lower is None or lower is False: limits = lower else: lower = self._check_convert_input_limit(lower) upper = self._check_convert_input_limit(upper) limits = lower, upper self._check_set_limits(limits=limits) def _check_convert_input_axes(self, axes: ztyping.AxesTypeInput, allow_none: bool = False) -> ztyping.AxesTypeReturn: if axes is None: if allow_none: return None else: raise AxesNotSpecifiedError("TODO: Cannot be None") axes = convert_to_container(value=axes, container=tuple) # TODO(Mayou36): extend like _check_obs? return axes def _check_convert_input_obs(self, obs: ztyping.ObsTypeInput, allow_none: bool = False) -> ztyping.ObsTypeReturn: """Input check: Convert `NOT_SPECIFIED` to None or check if obs are all strings. Args: obs (str, List[str], None, NOT_SPECIFIED): Returns: type: """ if obs is None: if allow_none: return None else: raise ObsNotSpecifiedError("TODO: Cannot be None") if isinstance(obs, Space): obs = obs.obs else: obs = convert_to_container(obs, container=tuple) obs_not_str = tuple(o for o in obs if not isinstance(o, str)) if obs_not_str: raise ValueError("The following observables are not strings: {}".format(obs_not_str)) return obs @property def limits(self) -> ztyping.LimitsTypeReturn: """Return the limits. Returns: """ return self._limits @property def limit1d(self) -> Tuple[float, float]: """Simplified limits getter for 1 obs, 1 limit only: return the tuple(lower, upper). Returns: tuple(float, float): so :code:`lower, upper = space.limit1d` for a simple, 1 obs limit. Raises: RuntimeError: if the conditions (n_obs or n_limits) are not satisfied. """ if self.n_obs > 1: raise RuntimeError("Cannot call `limit1d, as `Space` has more than one observables: {}".format(self.n_obs)) if self.n_limits > 1: raise RuntimeError("Cannot call `limit1d, as `Space` has several limits: {}".format(self.n_limits)) limits = self.limits if limits in (None, False): limit = limits else: (lower,), (upper,) = limits limit = lower[0], upper[0] return limit @property def limit2d(self) -> Tuple[float, float, float, float]: """Simplified `limits` for exactly 2 obs, 1 limit: return the tuple(low_obs1, low_obs2, up_obs1, up_obs2). Returns: tuple(float, float, float, float): so `low_x, low_y, up_x, up_y = space.limit2d` for a single, 2 obs limit. low_x is the lower limit in x, up_x is the upper limit in x etc. Raises: RuntimeError: if the conditions (n_obs or n_limits) are not satisfied. """ if self.n_obs != 2: raise RuntimeError("Cannot call `limit2d, as `Space` has not two observables: {}".format(self.n_obs)) if self.n_limits > 1: raise RuntimeError("Cannot call `limit2d, as `Space` has several limits: {}".format(self.n_limits)) limits = self.limits if limits in (None, False): limit = limits else: (lower,), (upper,) = limits limit = *lower, *upper return limit @property def limits1d(self) -> Tuple[float]: """Simplified `.limits` for exactly 1 obs, n limits: return the tuple(low_1, ..., low_n, up_1, ..., up_n). Returns: tuple(float, float, ...): so `low_1, low_2, up_1, up_2 = space.limits1d` for several, 1 obs limits. low_1 to up_1 is the first interval, low_2 to up_2 is the second interval etc. Raises: RuntimeError: if the conditions (n_obs or n_limits) are not satisfied. """ if self.n_obs > 1: raise RuntimeError("Cannot call `limits1d, as `Space` has more than one observable: {}".format(self.n_obs)) # if self.n_limits > 1: # raise RuntimeError("Cannot call `limit1d, as `Space` has several limits: {}".format(self.n_limits)) limits = self.limits if limits in (None, False): limit = limits else: new_lower, new_upper = [], [] for lower, upper in self.iter_limits(as_tuple=True): new_lower.append(lower[0]) new_upper.append(upper[0]) new_lower = tuple(new_lower) new_upper = tuple(new_upper) limit = *new_lower, *new_upper return limit @property def lower(self) -> ztyping.LowerTypeReturn: """Return the lower limits. Returns: """ limits = self.limits if limits is None or limits is False: return limits else: return limits[0] @property def upper(self) -> ztyping.UpperTypeReturn: """Return the upper limits. Returns: """ limits = self.limits if limits is None or limits is False: return limits else: return self.limits[1] @property def n_obs(self) -> int: # TODO(naming): better name? Like rank? """Return the number of observables/axes. Returns: int >= 1 """ if self.obs is None: length = len(self.axes) else: length = len(self.obs) return length @property def n_limits(self) -> int: """The number of different limits. Returns: int >= 1 """ if self.lower is None or self.lower is False: return 0 return len(self.lower) @property def obs(self) -> ztyping.ObsTypeReturn: """The observables ("axes with str")the space is defined in. Returns: """ return self._obs @property def axes(self) -> ztyping.AxesTypeReturn: """The axes ("obs with int") the space is defined in. Returns: """ return self._axes
[docs] def get_axes(self, obs: ztyping.ObsTypeInput = None, as_dict: bool = False, autofill: bool = False) -> Union[ztyping.AxesTypeReturn, Dict[str, int]]: """Return the axes corresponding to the `obs` (or all if None). Args: obs (): as_dict (bool): If True, returns a ordered dictionary with {obs: axis} autofill (bool): If True and the axes are not specified, automatically fill them with the default numbering and return (not setting them). Returns: Tuple, OrderedDict Raises: ValueError: if the requested `obs` do not match with the one defined in the range AxesNotSpecifiedError: If the axes in this :py:class:`~zfit.Space` have not been specified. """ # check input obs = self._check_convert_input_obs(obs=obs, allow_none=True) axes = self.axes if axes is None: if autofill: axes = tuple(range(self.n_obs)) else: raise AxesNotSpecifiedError("The axes have not been specified") if obs is not None: try: axes = tuple(axes[self.obs.index(o)] for o in obs) except KeyError: missing_obs = set(obs) - set(self.obs) raise ValueError("The requested observables {mis} are not contained in the defined " "observables {obs}".format(mis=missing_obs, obs=self.obs)) else: obs = self.obs if as_dict: axes = OrderedDict((o, ax) for o, ax in zip(obs, axes)) return axes
[docs] def iter_limits(self, as_tuple: bool = True) -> ztyping._IterLimitsTypeReturn: """Return the limits, either as :py:class:`~zfit.Space` objects or as pure limits-tuple. This makes iterating over limits easier: `for limit in space.iter_limits()` allows to, for example, pass `limit` to a function that can deal with simple limits only or if `as_tuple` is True the `limit` can be directly used to calculate something. Example: .. code:: python for lower, upper in space.iter_limits(as_tuple=True): integrals = integrate(lower, upper) # calculate integral integral = sum(integrals) Returns: List[:py:class:`~zfit.Space`] or List[limit,...]: """ if not self.limits: raise LimitsNotSpecifiedError("Space does not have limits, cannot iterate over them.") if as_tuple: return tuple(zip(self.lower, self.upper)) else: space_objects = [] for lower, upper in self.iter_limits(as_tuple=True): if not (lower is None or lower is False): lower = (lower,) upper = (upper,) limit = lower, upper else: limit = lower space = type(self)._from_any(obs=self.obs, axes=self.axes, limits=limit) space_objects.append(space) return tuple(space_objects)
[docs] def with_limits(self, limits: ztyping.LimitsTypeInput, name: Optional[str] = None) -> "zfit.Space": """Return a copy of the space with the new `limits` (and the new `name`). Args: limits (): name (str): Returns: :py:class:`~zfit.Space` """ new_space = self.copy(limits=limits, name=name) return new_space
[docs] def with_obs(self, obs: ztyping.ObsTypeInput) -> "zfit.Space": """Sort by `obs` and return the new instance. Args: obs (): Returns: :py:class:`~zfit.Space` """ if obs is None or obs == self.obs: return self obs = self._convert_obs_to_str(obs) new_indices = self.get_reorder_indices(obs=obs) new_space = self.reorder_by_indices(indices=new_indices) return new_space
[docs] def with_axes(self, axes: ztyping.AxesTypeInput) -> "zfit.Space": """Sort by `obs` and return the new instance. Args: axes (): Returns: :py:class:`~zfit.Space` """ # TODO: what if self.axes is None? Just add them? if axes is None or axes == self.axes: return self axes = self._convert_axes_to_int(axes) new_indices = self.get_reorder_indices(axes=axes) new_space = self.copy() new_space.reorder_by_indices(indices=new_indices) return new_space
[docs] def get_reorder_indices(self, obs: ztyping.ObsTypeInput = None, axes: ztyping.AxesTypeInput = None) -> Tuple[int]: """Indices that would order `self.obs` as `obs` respectively `self.axes` as `axes`. Args: obs (): axes (): Returns: """ obs_none = obs is None axes_none = axes is None obs_is_defined = self.obs is not None and not obs_none axes_is_defined = self.axes is not None and not axes_none if not (obs_is_defined or axes_is_defined): raise ValueError("Neither the `obs` nor `axes` are defined.") if obs_is_defined: old, new = self.obs, [o for o in obs if o in self.obs] else: old, new = self.axes, [a for a in axes if a in self.axes] new_indices = _reorder_indices(old=old, new=new) return new_indices
[docs] def reorder_by_indices(self, indices: Tuple[int]): """Return a :py:class:`~zfit.Space` reordered by the indices. Args: indices (): """ new_space = self.copy() new_space._reorder_limits(indices=indices) new_space._reorder_axes(indices=indices) new_space._reorder_obs(indices=indices) return new_space
def _reorder_limits(self, indices: Tuple[int], inplace: bool = True) -> ztyping.LimitsTypeReturn: limits = self.limits if limits is not None and limits is not False: lower, upper = limits lower = tuple(tuple(lower[i] for i in indices) for lower in lower) upper = tuple(tuple(upper[i] for i in indices) for upper in upper) limits = lower, upper if inplace: self._limits = limits return limits def _reorder_axes(self, indices: Tuple[int], inplace: bool = True) -> ztyping.AxesTypeReturn: axes = self.axes if axes is not None: axes = tuple(axes[i] for i in indices) if inplace: self._axes = axes return axes def _reorder_obs(self, indices: Tuple[int], inplace: bool = True) -> ztyping.ObsTypeReturn: obs = self.obs if obs is not None: obs = tuple(obs[i] for i in indices) if inplace: self._obs = obs return obs
[docs] def get_obs_axes(self, obs: ztyping.ObsTypeInput = None, axes: ztyping.AxesTypeInput = None): if self.obs is None: raise ObsNotSpecifiedError("Obs not specified, cannot create `obs_axes`") if self.axes is None: raise AxesNotSpecifiedError("Axes not specified, cannot create `obs_axes`") obs = self._check_convert_input_obs(obs, allow_none=True) axes = self._check_convert_input_axes(axes, allow_none=True) if obs is not None and axes is not None: raise OverdefinedError("Cannot use `obs` and `axes` to define which subset to access.") obs = self.obs if obs is None else obs axes = self.axes if axes is None else axes # only membership testing below obs = frozenset(obs) axes = frozenset(axes) # create obs_axes dict obs_axes = OrderedDict((o, ax) for o, ax in self.obs_axes.items() if o in obs or ax in axes) return obs_axes
@property def obs_axes(self): # TODO(Mayou36): what if axes is None? return OrderedDict((o, ax) for o, ax in zip(self.obs, self.axes)) def _set_obs_axes(self, obs_axes: Union[ztyping.OrderedDict[str, int], Dict[str, int]], ordered: bool = False, allow_subset=False): """(Reorder) set the observables and the `axes`. Temporarily if used with a context manager. Args: obs_axes (OrderedDict[str, int]): An (ordered) dict with {obs: axes}. allow_subset (): Returns: """ if ordered and not isinstance(obs_axes, OrderedDict): raise IntentionNotUnambiguousError("`ordered` is True but not an `OrderedDict` was given." "Error due to safety (in Python <3.7, dicts are not guaranteed to be" "ordered).") tmp_obs = self.obs if self.obs is not None else obs_axes.keys() self_obs_set = frozenset(tmp_obs) tmp_axes = self.axes if self.axes is not None else obs_axes.values() self_axes_set = frozenset(tmp_axes) if ordered: if self.obs is not None: # if not frozenset(obs_axes.keys()) <= self_obs_set: # raise ValueError("TODO observables not contained") if not allow_subset and frozenset(obs_axes.keys()) < self_obs_set: raise ValueError("subset not allowed but `obs` is only a subset of `self.obs`") permutation_index = tuple( self.obs.index(o) for o in obs_axes if o in self_obs_set) # the future index of the space self_axes_set = set(obs_axes[o] for o in self.obs if o in obs_axes) elif self.axes is not None: if not frozenset(obs_axes.values()) <= self_axes_set: raise ValueError("TODO axes not contained") if not allow_subset and frozenset(obs_axes.values()) < self_axes_set: raise ValueError("subset not allowed but `axes` is only a subset of `self.axes`") permutation_index = tuple( self.axes.index(ax) for ax in obs_axes.values() if ax in self_axes_set) # the future index of the space self_obs_set = set(o for o, ax in obs_axes.items() if ax in self.axes) else: assert False, "This should never be reached." limits = self._reorder_limits(indices=permutation_index, inplace=False) obs = tuple(o for o in obs_axes.keys() if o in self_obs_set) axes = tuple(ax for ax in obs_axes.values() if ax in self_axes_set) else: if self.obs is not None: if not allow_subset and frozenset(obs_axes.keys()) < self_obs_set: raise ValueError("subset not allowed TODO") obs = self.obs axes = tuple(obs_axes[o] for o in obs) elif self.axes is not None: if not allow_subset and frozenset(obs_axes.values()) < self_axes_set: raise ValueError("subset not allowed TODO") axes = self.axes axes_obs = {v: k for k, v in obs_axes.items()} obs = tuple(axes_obs[ax] for ax in axes) else: raise ValueError("Either `obs` or `axes` have to be specified if the `obs_axes` dict" "is not ordered and `ordered` is False.") limits = self.limits value = limits, obs, axes def setter(arguments): limits, obs, axes = arguments self._obs = obs self._axes = axes self._check_set_limits(limits=limits) def getter(): return self.limits, self.obs, self.axes return TemporarilySet(value=value, setter=setter, getter=getter)
[docs] def with_obs_axes(self, obs_axes: Union[ztyping.OrderedDict[str, int], Dict[str, int]], ordered: bool = False, allow_subset=False) -> "zfit.Space": """Return a new :py:class:`~zfit.Space` with reordered observables and set the `axes`. Args: obs_axes (OrderedDict[str, int]): An ordered dict with {obs: axes}. ordered (bool): If True (and the `obs_axes` is an `OrderedDict`), the allow_subset (): Returns: :py:class:`~zfit.Space`: """ new_space = type(self)._from_any(obs=self.obs, axes=self.axes, limits=self.limits) new_space._set_obs_axes(obs_axes=obs_axes, ordered=ordered, allow_subset=allow_subset) return new_space
[docs] def with_autofill_axes(self, overwrite: bool = False) -> "zfit.Space": """Return a :py:class:`~zfit.Space` with filled axes corresponding to range(len(n_obs)). Args: overwrite (bool): If `self.axes` is not None, replace the axes with the autofilled ones. If axes is already set, don't do anything if `overwrite` is False. Returns: :py:class:`~zfit.Space` """ if self.axes is None or overwrite: new_axes = tuple(range(self.n_obs)) new_space = self.copy(axes=new_axes) else: new_space = self return new_space
[docs] def area(self) -> float: """Return the total area of all the limits and axes. Useful, for example, for MC integration.""" return sum(self.iter_areas(rel=False))
[docs] def iter_areas(self, rel: bool = False) -> Tuple[float, ...]: """Return the areas of each interval Args: rel (bool): If True, return the relative fraction of each interval Returns: Tuple[float]: """ areas = self._calculate_areas(limits=self.limits) if rel: areas = np.array(areas) areas /= areas.sum() areas = tuple(areas) return areas
@staticmethod @functools.lru_cache() def _calculate_areas(limits) -> Tuple[float]: areas = tuple(float(np.prod(np.array(up) - np.array(low))) for low, up in zip(*limits)) return areas
[docs] def get_subspace(self, obs: ztyping.ObsTypeInput = None, axes: ztyping.AxesTypeInput = None, name: Optional[str] = None) -> "zfit.Space": """Create a :py:class:`~zfit.Space` consisting of only a subset of the `obs`/`axes` (only one allowed). Args: obs (str, Tuple[str]): axes (int, Tuple[int]): name (): Returns: """ if obs is not None and axes is not None: raise ValueError("Cannot specify `obs` *and* `axes` to get subspace.") if axes is None and obs is None: raise ValueError("Either `obs` or `axes` has to be specified and not None") # try to use observables to get index obs = self._check_convert_input_obs(obs=obs, allow_none=True) if obs is not None: try: sub_index = tuple(self.obs.index(o) for o in obs) except ValueError as error: print("Original message: ", error) raise KeyError("Cannot get subspace from `obs` {} as this observables are not defined" "in this space. Only {} is defined.".format(set(obs) - set(self.obs), set(self.obs))) except AttributeError: # `obs` is None -> has not attribute `index` raise ObsNotSpecifiedError("Observables have not been specified in this space.") # try to use axes to get index axes = self._check_convert_input_axes(axes=axes, allow_none=True) if axes is not None: try: sub_index = tuple(self.axes.index(ax) for ax in axes) except ValueError as error: print("Original message: ", error) raise KeyError("Cannot get subspace from `axes` {} as this axes are not defined" "in this space. Only the following axes are {}" "".format(set(axes) - set(self.axes), self.axes)) except AttributeError: raise AxesNotSpecifiedError("Axes have not been specified for this space.") sub_obs = self.obs if self.obs is None else tuple(self.obs[i] for i in sub_index) sub_axes = self.axes if self.axes is None else tuple(self.axes[i] for i in sub_index) # use index to get limits limits = self.limits if limits is None or limits is False: sub_limits = limits else: lower, upper = limits sub_lower = tuple(tuple(lim[i] for i in sub_index) for lim in lower) sub_upper = tuple(tuple(lim[i] for i in sub_index) for lim in upper) sub_limits = sub_lower, sub_upper new_space = type(self)._from_any(obs=sub_obs, axes=sub_axes, limits=sub_limits, name=name) return new_space
# Operators
[docs] def copy(self, name: Optional[str] = None, **overwrite_kwargs) -> "zfit.Space": """Create a new :py:class:`~zfit.Space` using the current attributes and overwriting with `overwrite_overwrite_kwargs`. Args: name (str): The new name. If not given, the new instance will be named the same as the current one. **overwrite_kwargs (): Returns: :py:class:`~zfit.Space` """ name = self.name if name is None else name kwargs = {'name': name, 'limits': self.limits, 'axes': self.axes, 'obs': self.obs} kwargs.update(overwrite_kwargs) if set(overwrite_kwargs) - set(kwargs): raise KeyError("Not usable keys in `overwrite_kwargs`: {}".format(set(overwrite_kwargs) - set(kwargs))) new_space = type(self)._from_any(**kwargs) return new_space
def __le__(self, other): if not isinstance(other, type(self)): return NotImplemented axes_not_none = self.axes is not None and other.axes is not None obs_not_none = self.obs is not None and other.obs is not None if not (axes_not_none or obs_not_none): # if both are None return False if axes_not_none: if set(self.axes) != set(other.axes): return False if obs_not_none: if set(self.obs) != set(other.obs): return False # check limits if self.limits is None: if other.limits is None: return True else: return False elif self.limits is False: if other.limits is False: return True else: return False reorder_indices = other.get_reorder_indices(obs=self.obs, axes=self.axes) other = other.reorder_by_indices(reorder_indices) # check explicitly if they match # for each limit in self, find another matching in other for lower, upper in self.iter_limits(as_tuple=True): limit_is_le = False for other_lower, other_upper in other.iter_limits(as_tuple=True): # each entry *has to* match the entry of the other limit, otherwise it's not the same for low, up, other_low, other_up in zip(lower, upper, other_lower, other_upper): axis_le = 0 # False axis_le += other_low == low and up == other_up # TODO: approx limit comparison? axis_le += other_low == low and other_up is ANY_UPPER # TODO: approx limit # comparison? axis_le += other_low is ANY_LOWER and up == other_up # TODO: approx limit # comparison? axis_le += other_low is ANY_LOWER and other_up is ANY_UPPER if not axis_le: # if not the same, don't test other dims break else: limit_is_le = True # no break -> all axes coincide if not limit_is_le: # for this `limit`, no other_limit matched return False return True
[docs] def add(self, other: ztyping.SpaceOrSpacesTypeInput): """Add the limits of the spaces. Only works for the same obs. In case the observables are different, the order of the first space is taken. Args: other (:py:class:`~zfit.Space`): Returns: :py:class:`~zfit.Space`: """ other = convert_to_container(other, container=list) new_space = add_spaces([self] + other) return new_space
[docs] def combine(self, other: ztyping.SpaceOrSpacesTypeInput) -> ZfitSpace: """Combine spaces with different obs (but consistent limits). Args: other (:py:class:`~zfit.Space`): Returns: :py:class:`~zfit.Space`: """ other = convert_to_container(other, container=list) new_space = combine_spaces([self] + other) return new_space
def __add__(self, other): if not isinstance(other, ZfitSpace): raise TypeError("Cannot add a {} and a {}".format(type(self), type(other))) return self.add(other) def __mul__(self, other): if not isinstance(other, ZfitSpace): raise TypeError("Cannot combine a {} and a {}".format(type(self), type(other))) return self.combine(other) def __ge__(self, other): return other.__le__(self) def __eq__(self, other): if not isinstance(self, type(other)): # TODO(Mayou36): what is a proper comparison? return NotImplemented is_eq = True is_eq *= self.obs == other.obs is_eq *= self.axes == other.axes or self.axes is None or other.axes is None is_eq *= self.limits == other.limits return bool(is_eq) def __hash__(self): lower = self.lower upper = self.upper if not (lower is None or lower is False): # we want to be non-restrictive: it's just a hash, not the __eq__ lower = frozenset(frozenset(lim) for lim in lower) if not (upper is None or upper is False): upper = frozenset(frozenset(lim) for lim in upper) return hash((lower, upper))
[docs]def convert_to_space(obs: Optional[ztyping.ObsTypeInput] = None, axes: Optional[ztyping.AxesTypeInput] = None, limits: Optional[ztyping.LimitsTypeInput] = None, *, overwrite_limits: bool = False, one_dim_limits_only: bool = True, simple_limits_only: bool = True) -> Union[None, Space, bool]: """Convert *limits* to a :py:class:`~zfit.Space` object if not already None or False. Args: obs (Union[Tuple[float, float], :py:class:`~zfit.Space`]): limits (): axes (): overwrite_limits (bool): If `obs` or `axes` is a :py:class:`~zfit.Space` _and_ `limits` are given, return an instance of :py:class:`~zfit.Space` with the new limits. If the flag is `False`, the `limits` argument will be ignored if one_dim_limits_only (bool): simple_limits_only (bool): Returns: Union[:py:class:`~zfit.Space`, False, None]: Raises: OverdefinedError: if `obs` or `axes` is a :py:class:`~zfit.Space` and `axes` respectively `obs` is not `None`. """ space = None # Test if already `Space` and handle if isinstance(obs, Space): if axes is not None: raise OverdefinedError("if `obs` is a `Space`, `axes` cannot be defined.") space = obs elif isinstance(axes, Space): if obs is not None: raise OverdefinedError("if `axes` is a `Space`, `obs` cannot be defined.") space = axes elif isinstance(limits, Space): return limits if space is not None: # set the limits if given if limits is not None and (overwrite_limits or space.limits is None): if isinstance(limits, Space): # figure out if compatible if limits is `Space` if not (limits.obs == space.obs or (limits.axes == space.axes and limits.obs is None and space.obs is None)): raise IntentionNotUnambiguousError( "`obs`/`axes` is a `Space` as well as the `limits`, but the " "obs/axes of them do not match") else: limits = limits.limits space = space.with_limits(limits=limits) return space # space is None again if not (obs is None and axes is None): # check if limits are allowed space = Space._from_any(obs=obs, axes=axes, limits=limits) # create and test if valid if one_dim_limits_only and space.n_obs > 1 and space.limits: raise LimitsUnderdefinedError( "Limits more sophisticated than 1-dim cannot be auto-created from tuples. Use `Space` instead.") if simple_limits_only and space.limits and space.n_limits > 1: raise LimitsUnderdefinedError("Limits with multiple limits cannot be auto-created" " from tuples. Use `Space` instead.") return space
def _reorder_indices(old: Union[List, Tuple], new: Union[List, Tuple]) -> Tuple[int]: new_indices = tuple(old.index(o) for o in new) return new_indices
[docs]def no_norm_range(func): """Decorator: Catch the 'norm_range' kwargs. If not None, raise NormRangeNotImplementedError.""" parameters = inspect.signature(func).parameters keys = list(parameters.keys()) if 'norm_range' in keys: norm_range_index = keys.index('norm_range') else: norm_range_index = None @functools.wraps(func) def new_func(*args, **kwargs): norm_range = kwargs.get('norm_range') if isinstance(norm_range, Space): norm_range_not_false = not (norm_range.limits is None or norm_range.limits is False) else: norm_range_not_false = not (norm_range is None or norm_range is False) if norm_range_index is not None: norm_range_is_arg = len(args) > norm_range_index else: norm_range_is_arg = False kwargs.pop('norm_range', None) # remove if in signature (= norm_range_index not None) if norm_range_not_false or norm_range_is_arg: raise NormRangeNotImplementedError() else: return func(*args, **kwargs) return new_func
[docs]def no_multiple_limits(func): """Decorator: Catch the 'limits' kwargs. If it contains multiple limits, raise MultipleLimitsNotImplementedError.""" parameters = inspect.signature(func).parameters keys = list(parameters.keys()) if 'limits' in keys: limits_index = keys.index('limits') else: return func # no limits as parameters -> no problem @functools.wraps(func) def new_func(*args, **kwargs): limits_is_arg = len(args) > limits_index if limits_is_arg: limits = args[limits_index] else: limits = kwargs['limits'] if limits.n_limits > 1: raise MultipleLimitsNotImplementedError else: return func(*args, **kwargs) return new_func
[docs]def supports(*, norm_range: bool = False, multiple_limits: bool = False) -> Callable: """Decorator: Add (mandatory for some methods) on a method to control what it can handle. If any of the flags is set to False, it will check the arguments and, in case they match a flag (say if a *norm_range* is passed while the *norm_range* flag is set to `False`), it will raise a corresponding exception (in this example a `NormRangeNotImplementedError`) that will be catched by an earlier function that knows how to handle things. Args: norm_range (bool): If False, no norm_range argument will be passed through resp. will be `None` multiple_limits (bool): If False, only simple limits are to be expected and no iteration is therefore required. """ decorator_stack = [] if not multiple_limits: decorator_stack.append(no_multiple_limits) if not norm_range: decorator_stack.append(no_norm_range) def create_deco_stack(func): for decorator in reversed(decorator_stack): func = decorator(func) func.__wrapped__ = supports return func return create_deco_stack
[docs]def convert_to_obs_str(obs): """Convert `obs` to the list of obs, also if it is a :py:class:`~zfit.Space`. """ obs = convert_to_container(value=obs, container=tuple) new_obs = [] for ob in obs: if isinstance(ob, Space): new_obs.extend(ob.obs) else: new_obs.append(ob) return new_obs
[docs]def contains_tensor(object): tensor_found = isinstance(object, (tf.Tensor, tf.Variable)) with suppress(TypeError): for obj in object: if tensor_found: break tensor_found += contains_tensor(obj) return tensor_found
[docs]def shape_np_tf(object): if contains_tensor(object): shape = tuple(tf.convert_to_tensor(object).shape.as_list()) else: shape = np.shape(object) return shape