Source code for zfit.core.limits

"""
.. include:: ../../docs/subst_types.rst

# NamedSpace and limits

Limits define a certain interval in a specific dimension. This can be used to define, for example,
the limits of an integral over several dimensions or a normalization range.



### with limits

Therefore a different way of specifying limits is possible, basically by defining chunks of the
lower and the upper limits. The shape of a lower resp. upper limit is (n_limits, n_obs).

Example: 1-dim: 1 to 4, 2.-dim: 21 to 24 AND 1.-dim: 6 to 7, 2.-dim 26 to 27
>>> lower = ((1, 21), (6, 26))
>>> upper = ((4, 24), (7, 27))
>>> limits2 = Space(limits=(lower, upper), obs=('obs1', 'obs2'))

General form:

lower = ((lower1_dim1, lower1_dim2, lower1_dim3), (lower2_dim1, lower2_dim2, lower2_dim3),...)
upper = ((upper1_dim1, upper1_dim2, upper1_dim3), (upper2_dim1, upper2_dim2, upper2_dim3),...)

## Using :py:class:`~zfit.Space`

:py:class:`~zfit.Space` offers a few useful functions to deal with the intervals more easily.

### Handling areas

For example when doing a MC integration using the expectation value, it is mandatory to know
the total area of your intervals. You can retrieve the total area or (if multiple limits (= intervals)
are given) the area of each interval.

 >>> area = limits2.area()
 >>> area_1, area_2 = limits2.iter_areas(rel=False)  # if rel is True, return the fraction of 1


### Retrieve the limits

>>> lower, upper = limits2.limits

which you can now iterate through. For example, to calculate an integral (assuming there is a function
`integrate` taking the lower and upper limits and returning the integral), you can do
>>> def integrate(lower_limit, upper_limit): return 42  # dummy function
>>> integral = sum(integrate(lower_limit=low, upper_limit=up) for low, up in zip(lower, upper))
"""
#  Copyright (c) 2019 zfit

# TODO(Mayou36): update docs above

from collections import OrderedDict
import copy
import functools
import inspect
from typing import Callable, Dict, List, Optional, Tuple, Union

import numpy as np

from .dimension import add_spaces, combine_spaces
from .baseobject import BaseObject
from .interfaces import ZfitSpace
from ..util import ztyping
from ..util.checks import NOT_SPECIFIED
from ..util.container import convert_to_container
from ..util.exception import (AxesNotSpecifiedError, IntentionNotUnambiguousError, LimitsUnderdefinedError,
                              MultipleLimitsNotImplementedError, NormRangeNotImplementedError, ObsNotSpecifiedError,
                              OverdefinedError, LimitsNotSpecifiedError, )
from ..util.temporary import TemporarilySet


# Singleton
class Any:
    """Singleton placeholder that compares as "matching" against everything.

    Every ordering comparison (`<`, `<=`, `>=`, `>`) returns True. Each
    subclass gets its own single instance. `__eq__` and `__hash__` are not
    overridden, so the defaults inherited from `object` apply.
    """

    _singleton_instance = None

    def __new__(cls, *args, **kwargs):
        # Create the instance lazily on first use, afterwards always hand out the same one.
        if cls._singleton_instance is None:
            cls._singleton_instance = super().__new__(cls)
        return cls._singleton_instance

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Reset the slot so that each subclass is a singleton of "itself".
        cls._singleton_instance = None

    def __repr__(self):
        return '<Any>'

    def __lt__(self, other):
        return True

    def __le__(self, other):
        return True

    def __gt__(self, other):
        return True

    def __ge__(self, other):
        return True
class AnyLower(Any):
    """Singleton placeholder for "any lower limit".

    Compares as smaller than everything: `>` and `>=` return False, while
    `<` and `<=` are inherited from `Any` and return True.
    """

    def __gt__(self, other):
        return False

    def __ge__(self, other):
        return False

    def __repr__(self):
        return '<Any Lower Limit>'
class AnyUpper(Any):
    """Singleton placeholder for "any upper limit".

    Compares as larger than everything: `<` and `<=` return False, while
    `>` and `>=` are inherited from `Any` and return True.
    """

    def __lt__(self, other):
        return False

    def __le__(self, other):
        return False

    def __repr__(self):
        return '<Any Upper Limit>'
# Module-level instances of the placeholder limit classes. As the classes are
# singletons, these can be compared by identity (`is ANY_LOWER`, `is ANY_UPPER`).
ANY = Any()
ANY_LOWER = AnyLower()
ANY_UPPER = AnyUpper()
class Space(ZfitSpace, BaseObject):
    """A set of observables (`obs`, str) and/or axes (`axes`, int) with optional limits.

    The limits are stored as a tuple `(lower, upper)`; `lower` and `upper` are
    nested tuples of shape (n_limits, n_obs). `None` or `False` are stored
    unchanged and mean that no limits are set.
    """

    AUTO_FILL = object()
    ANY = ANY
    ANY_LOWER = ANY_LOWER  # TODO: needed? or move everything inside?
    ANY_UPPER = ANY_UPPER

    def __init__(self, obs: ztyping.ObsTypeInput, limits: Optional[ztyping.LimitsTypeInput] = None,
                 name: Optional[str] = "Space"):
        """Define a space with the names (`obs`) of the axes (and their number) and possibly its limits.

        Args:
            obs (str, List[str,...]): Observable name(s) or a `Space` whose obs are taken.
            limits (): `(lower, upper)`, `None` or `False`.
            name (str): Name of the space. If `None`, "Space_" joined with the obs is used.
        """
        obs = self._check_convert_input_obs(obs)
        if name is None:
            name = "Space_" + "_".join(obs)
        super().__init__(name=name)
        self._axes = None
        self._obs = obs  # has to be set before the limits: the limit check needs `n_obs`
        self._check_set_limits(limits=limits)

    @classmethod
    def _from_any(cls, obs: ztyping.ObsTypeInput = None, axes: ztyping.AxesTypeInput = None,
                  limits: Optional[ztyping.LimitsTypeInput] = None,
                  name: str = None) -> "zfit.Space":
        """Create a space from `obs` (with `axes` attached as given) or, if `obs` is None, from `axes` only."""
        if obs is None:
            new_space = cls.from_axes(axes=axes, limits=limits, name=name)
        else:
            new_space = cls(obs=obs, limits=limits, name=name)
            new_space._axes = axes
        return new_space

    @classmethod
    def from_axes(cls, axes: ztyping.AxesTypeInput, limits: Optional[ztyping.LimitsTypeInput] = None,
                  name: str = None) -> "zfit.Space":
        """Create a space from `axes` instead of from `obs`.

        Args:
            axes ():
            limits ():
            name (str):

        Returns:
            :py:class:`~zfit.Space`

        Raises:
            AxesNotSpecifiedError: if `axes` is None.
        """
        axes = convert_to_container(value=axes, container=tuple)
        if axes is None:
            raise AxesNotSpecifiedError("Axes cannot be `None`")
        fake_obs = (str(axis) for axis in axes)  # in order to create an instance
        new_space = cls(obs=fake_obs, limits=limits, name=name)
        new_space._obs = None  # the fake obs were only needed for the constructor
        new_space._axes = new_space._check_convert_input_axes(axes)
        return new_space

    @staticmethod
    def _convert_obs_to_str(obs):
        """Return the obs of a `Space` or convert `obs` to a tuple."""
        if isinstance(obs, Space):
            obs = obs.obs
        else:
            obs = convert_to_container(obs, container=tuple)
        return obs

    @staticmethod
    def _convert_axes_to_int(axes):
        """Return the axes of a `Space` or convert `axes` to a tuple."""
        if isinstance(axes, Space):
            axes = axes.axes
        else:
            axes = convert_to_container(axes, container=tuple)
        return axes

    def _check_set_limits(self, limits: ztyping.LimitsTypeInput):
        """Validate `limits` (unless `None`/`False`) and store them in `self._limits`."""
        if limits is not None and limits is not False:
            lower, upper = limits
            limits = self._check_convert_input_lower_upper(lower=lower, upper=upper)
        self._limits = limits

    def _check_convert_input_lower_upper(self, lower, upper):
        """Sanitize `lower` and `upper` and check that their shapes are consistent.

        Returns:
            tuple: the converted `(lower, upper)`, both of shape (n_limits, n_obs).

        Raises:
            ValueError: if one of the limits is missing, the shapes of `lower` and
                `upper` differ, are not 2-dimensional or do not match `n_obs`.
        """
        lower = self._check_convert_input_limit(limit=lower)
        upper = self._check_convert_input_limit(limit=upper)

        # Both limits have to be given. The former check used `or`, was therefore always
        # True, and such input only failed later in the shape check with a misleading message.
        lower_is_iterable = lower is not None and lower is not False
        upper_is_iterable = upper is not None and upper is not False
        if not (lower_is_iterable and upper_is_iterable):
            raise ValueError("Lower and upper limits wrong:"
                             "\nlower = {lower}"
                             "\nupper = {upper}".format(lower=lower, upper=upper))

        if not np.shape(lower) == np.shape(upper) or len(np.shape(lower)) != 2:
            raise ValueError("Lower and/or upper limits invalid:"
                             "\nlower: {lower}"
                             "\nupper: {upper}".format(lower=lower, upper=upper))
        if not np.shape(lower)[1] == self.n_obs:
            raise ValueError("Limits shape not compatible with number of obs/axes"
                             "\nlower: {lower}"
                             "\nupper: {upper}"
                             "\nn_obs: {n_obs}".format(lower=lower, upper=upper, n_obs=self.n_obs))
        return lower, upper

    def _check_convert_input_limit(self, limit: Union[ztyping.LowerTypeInput, ztyping.UpperTypeInput],
                                   replace=None) -> Union[ztyping.LowerTypeReturn, ztyping.UpperTypeReturn]:
        """Check and sanitize the lower or upper limit.

        Args:
            limit (): A scalar (wrapped to `((limit,),)`) or a nested sequence.
            replace (dict): Optional mapping; entries of `limit` that are keys are replaced by the values.

        Returns:
            The sanitized limit or `None` if `limit` is `None` or `NOT_SPECIFIED`.

        Raises:
            ValueError: if `limit` is empty or its first element is a scalar (wrong nesting).
        """
        replace = {} if replace is None else replace
        if limit is NOT_SPECIFIED or limit is None:
            return None
        if (isinstance(limit, tuple) and limit == ()) or (isinstance(limit, np.ndarray) and limit.size == 0):
            raise ValueError("Currently, () is not supported as limits. Should this be default for None?")
        if np.shape(limit) == ():
            limit = ((limit,),)
        if np.shape(limit[0]) == ():
            raise ValueError("Shape of limit {} wrong.".format(limit))

        # replace
        if replace:
            limit = tuple(tuple(replace.get(l, l) for l in lim) for lim in limit)
        return limit

    def _check_set_lower_upper(self, lower: ztyping.LowerTypeInput, upper: ztyping.UpperTypeInput):
        """Set the limits from separate `lower` and `upper`; `lower` being `None`/`False` unsets them."""
        if lower is None or lower is False:
            limits = lower
        else:
            lower = self._check_convert_input_limit(lower)
            upper = self._check_convert_input_limit(upper)
            limits = lower, upper
        self._check_set_limits(limits=limits)

    def _check_convert_input_axes(self, axes: ztyping.AxesTypeInput,
                                  allow_none: bool = False) -> ztyping.AxesTypeReturn:
        """Convert `axes` to a tuple; `None` is returned as is if `allow_none`, otherwise raises."""
        if axes is None:
            if allow_none:
                return None
            else:
                raise AxesNotSpecifiedError("TODO: Cannot be None")
        axes = convert_to_container(value=axes, container=tuple)  # TODO(Mayou36): extend like _check_obs?
        return axes

    def _check_convert_input_obs(self, obs: ztyping.ObsTypeInput,
                                 allow_none: bool = False) -> ztyping.ObsTypeReturn:
        """Input check: convert `obs` to a tuple and check that all obs are strings.

        Args:
            obs (str, List[str], Space, None):
            allow_none (bool): If True, `None` is returned as is instead of raising.

        Returns:
            tuple(str) or None

        Raises:
            ObsNotSpecifiedError: if `obs` is None and `allow_none` is False.
            ValueError: if any of the obs is not a string.
        """
        if obs is None:
            if allow_none:
                return None
            else:
                raise ObsNotSpecifiedError("TODO: Cannot be None")

        if isinstance(obs, Space):
            obs = obs.obs
        else:
            obs = convert_to_container(obs, container=tuple)
            obs_not_str = tuple(o for o in obs if not isinstance(o, str))
            if obs_not_str:
                raise ValueError("The following observables are not strings: {}".format(obs_not_str))
        return obs

    @property
    def limits(self) -> ztyping.LimitsTypeReturn:
        """Return the limits: `(lower, upper)`, `None` or `False`."""
        return self._limits

    @property
    def limit1d(self) -> Tuple[float, float]:
        """Simplified limits getter for 1 obs, 1 limit only: return the tuple(lower, upper).

        Returns:
            tuple(float, float): so :code:`lower, upper = space.limit1d` for a simple, 1 obs limit.

        Raises:
            RuntimeError: if the conditions (n_obs or n_limits) are not satisfied.
        """
        if self.n_obs > 1:
            raise RuntimeError("Cannot call `limit1d, as `Space` has more than one observables: {}".format(self.n_obs))
        if self.n_limits > 1:
            raise RuntimeError("Cannot call `limit1d, as `Space` has several limits: {}".format(self.n_limits))

        limits = self.limits
        if limits in (None, False):
            limit = limits
        else:
            (lower,), (upper,) = limits
            limit = lower[0], upper[0]
        return limit

    @property
    def limit2d(self) -> Tuple[float, float, float, float]:
        """Simplified `limits` for exactly 2 obs, 1 limit: return the tuple(low_obs1, low_obs2, up_obs1, up_obs2).

        Returns:
            tuple(float, float, float, float): so `low_x, low_y, up_x, up_y = space.limit2d` for a single,
                2 obs limit. low_x is the lower limit in x, up_x is the upper limit in x etc.

        Raises:
            RuntimeError: if the conditions (n_obs or n_limits) are not satisfied.
        """
        if self.n_obs != 2:
            raise RuntimeError("Cannot call `limit2d, as `Space` has not two observables: {}".format(self.n_obs))
        if self.n_limits > 1:
            raise RuntimeError("Cannot call `limit2d, as `Space` has several limits: {}".format(self.n_limits))

        limits = self.limits
        if limits in (None, False):
            limit = limits
        else:
            (lower,), (upper,) = limits
            limit = (*lower, *upper)
        return limit

    @property
    def limits1d(self) -> Tuple[float]:
        """Simplified `.limits` for exactly 1 obs, n limits: return the tuple(low_1, ..., low_n, up_1, ..., up_n).

        Returns:
            tuple(float, float, ...): so `low_1, low_2, up_1, up_2 = space.limits1d` for several, 1 obs limits.
                low_1 to up_1 is the first interval, low_2 to up_2 is the second interval etc.

        Raises:
            RuntimeError: if the space has more than one observable.
        """
        if self.n_obs > 1:
            raise RuntimeError("Cannot call `limits1d, as `Space` has more than one observable: {}".format(self.n_obs))

        limits = self.limits
        if limits in (None, False):
            limit = limits
        else:
            new_lower, new_upper = [], []
            for lower, upper in self.iter_limits(as_tuple=True):
                new_lower.append(lower[0])
                new_upper.append(upper[0])
            limit = (*new_lower, *new_upper)
        return limit

    @property
    def lower(self) -> ztyping.LowerTypeReturn:
        """Return the lower limits (or `None`/`False` if no limits are set)."""
        limits = self.limits
        if limits is None or limits is False:
            return limits
        else:
            return limits[0]

    @property
    def upper(self) -> ztyping.UpperTypeReturn:
        """Return the upper limits (or `None`/`False` if no limits are set)."""
        limits = self.limits
        if limits is None or limits is False:
            return limits
        else:
            return self.limits[1]

    @property
    def n_obs(self) -> int:  # TODO(naming): better name? Like rank?
        """Return the number of observables/axes.

        Returns:
            int >= 1
        """
        if self.obs is None:
            length = len(self.axes)
        else:
            length = len(self.obs)
        return length

    @property
    def n_limits(self) -> int:
        """The number of different limits; 0 if no limits are set."""
        if self.lower is None or self.lower is False:
            return 0
        return len(self.lower)

    @property
    def obs(self) -> ztyping.ObsTypeReturn:
        """The observables ("axes with str") the space is defined in."""
        return self._obs

    @property
    def axes(self) -> ztyping.AxesTypeReturn:
        """The axes ("obs with int") the space is defined in."""
        return self._axes

    def get_axes(self, obs: ztyping.ObsTypeInput = None,
                 as_dict: bool = False,
                 autofill: bool = False) -> Union[ztyping.AxesTypeReturn, Dict[str, int]]:
        """Return the axes corresponding to the `obs` (or all if None).

        Args:
            obs ():
            as_dict (bool): If True, returns a ordered dictionary with {obs: axis}
            autofill (bool): If True and the axes are not specified, automatically fill
                them with the default numbering and return (not setting them).

        Returns:
            Tuple, OrderedDict

        Raises:
            ValueError: if the requested `obs` do not match with the one defined in the range
            AxesNotSpecifiedError: If the axes in this :py:class:`~zfit.Space` have not been specified.
        """
        # check input
        obs = self._check_convert_input_obs(obs=obs, allow_none=True)
        axes = self.axes
        if axes is None:
            if autofill:
                axes = tuple(range(self.n_obs))
            else:
                raise AxesNotSpecifiedError("The axes have not been specified")

        if obs is not None:
            try:
                axes = tuple(axes[self.obs.index(o)] for o in obs)
            except ValueError as error:  # `tuple.index` raises ValueError (not KeyError) for a missing element
                missing_obs = set(obs) - set(self.obs)
                raise ValueError("The requested observables {mis} are not contained in the defined "
                                 "observables {obs}".format(mis=missing_obs, obs=self.obs)) from error
        else:
            obs = self.obs
        if as_dict:
            axes = OrderedDict((o, ax) for o, ax in zip(obs, axes))
        return axes

    def iter_limits(self, as_tuple: bool = True) -> ztyping._IterLimitsTypeReturn:
        """Return the limits, either as :py:class:`~zfit.Space` objects or as pure limits-tuple.

        This makes iterating over limits easier: `for limit in space.iter_limits()`
        allows to, for example, pass `limit` to a function that can deal with
        simple limits only or if `as_tuple` is True the `limit` can be directly used
        to calculate something.

        Example:
            .. code:: python

                for lower, upper in space.iter_limits(as_tuple=True):
                    integrals = integrate(lower, upper)  # calculate integral
                integral = sum(integrals)

        Returns:
            List[:py:class:`~zfit.Space`] or List[limit,...]:

        Raises:
            LimitsNotSpecifiedError: if the space has no limits.
        """
        if not self.limits:
            raise LimitsNotSpecifiedError("Space does not have limits, cannot iterate over them.")
        if as_tuple:
            return tuple(zip(self.lower, self.upper))
        else:
            space_objects = []
            for lower, upper in self.iter_limits(as_tuple=True):
                if not (lower is None or lower is False):
                    # wrap the single limit again to get the shape (1, n_obs)
                    lower = (lower,)
                    upper = (upper,)
                    limit = lower, upper
                else:
                    limit = lower
                space = type(self)._from_any(obs=self.obs, axes=self.axes, limits=limit)
                space_objects.append(space)
            return tuple(space_objects)

    def with_limits(self, limits: ztyping.LimitsTypeInput, name: Optional[str] = None) -> "zfit.Space":
        """Return a copy of the space with the new `limits` (and the new `name`).

        Args:
            limits ():
            name (str):

        Returns:
            :py:class:`~zfit.Space`
        """
        new_space = self.copy(limits=limits, name=name)
        return new_space

    def with_obs(self, obs: ztyping.ObsTypeInput) -> "zfit.Space":
        """Sort by `obs` and return the new instance.

        Args:
            obs ():

        Returns:
            :py:class:`~zfit.Space`: `self` if `obs` is None or already equal to `self.obs`.
        """
        if obs is None or obs == self.obs:
            return self

        obs = self._convert_obs_to_str(obs)
        new_indices = self.get_reorder_indices(obs=obs)
        new_space = self.reorder_by_indices(indices=new_indices)
        return new_space

    def with_axes(self, axes: ztyping.AxesTypeInput) -> "zfit.Space":
        """Sort by `axes` and return the new instance.

        Args:
            axes ():

        Returns:
            :py:class:`~zfit.Space`: `self` if `axes` is None or already equal to `self.axes`.
        """
        # TODO: what if self.axes is None? Just add them?
        if axes is None or axes == self.axes:
            return self

        axes = self._convert_axes_to_int(axes)
        new_indices = self.get_reorder_indices(axes=axes)
        # `reorder_by_indices` does not work inplace but returns the reordered copy
        new_space = self.reorder_by_indices(indices=new_indices)
        return new_space

    def get_reorder_indices(self, obs: ztyping.ObsTypeInput = None,
                            axes: ztyping.AxesTypeInput = None) -> Tuple[int]:
        """Indices that would order `self.obs` as `obs` respectively `self.axes` as `axes`.

        Elements of `obs`/`axes` that are not contained in this space are ignored.
        `obs` takes precedence if both can be used.

        Args:
            obs ():
            axes ():

        Returns:
            Tuple[int]

        Raises:
            ValueError: if neither `obs` (with `self.obs`) nor `axes` (with `self.axes`) is usable.
        """
        obs_none = obs is None
        axes_none = axes is None

        obs_is_defined = self.obs is not None and not obs_none
        axes_is_defined = self.axes is not None and not axes_none
        if not (obs_is_defined or axes_is_defined):
            raise ValueError("Neither the `obs` nor `axes` are defined.")

        if obs_is_defined:
            old, new = self.obs, [o for o in obs if o in self.obs]
        else:
            old, new = self.axes, [a for a in axes if a in self.axes]

        new_indices = _reorder_indices(old=old, new=new)
        return new_indices

    def reorder_by_indices(self, indices: Tuple[int]):
        """Return a :py:class:`~zfit.Space` reordered by the indices (`self` is left unchanged).

        Args:
            indices ():
        """
        new_space = self.copy()
        new_space._reorder_limits(indices=indices)
        new_space._reorder_axes(indices=indices)
        new_space._reorder_obs(indices=indices)
        return new_space

    def _reorder_limits(self, indices: Tuple[int], inplace: bool = True) -> ztyping.LimitsTypeReturn:
        """Return the limits reordered by `indices`; also store them if `inplace`."""
        limits = self.limits
        if limits is not None and limits is not False:
            lower, upper = limits
            lower = tuple(tuple(lower[i] for i in indices) for lower in lower)
            upper = tuple(tuple(upper[i] for i in indices) for upper in upper)
            limits = lower, upper

        if inplace:
            self._limits = limits
        return limits

    def _reorder_axes(self, indices: Tuple[int], inplace: bool = True) -> ztyping.AxesTypeReturn:
        """Return the axes reordered by `indices`; also store them if `inplace`."""
        axes = self.axes
        if axes is not None:
            axes = tuple(axes[i] for i in indices)
        if inplace:
            self._axes = axes
        return axes

    def _reorder_obs(self, indices: Tuple[int], inplace: bool = True) -> ztyping.ObsTypeReturn:
        """Return the obs reordered by `indices`; also store them if `inplace`."""
        obs = self.obs
        if obs is not None:
            obs = tuple(obs[i] for i in indices)
        if inplace:
            self._obs = obs
        return obs

    def get_obs_axes(self, obs: ztyping.ObsTypeInput = None, axes: ztyping.AxesTypeInput = None):
        """Return an ordered dict {obs: axis} of this space, restricted to the given `obs` *or* `axes`.

        Raises:
            ObsNotSpecifiedError: if this space has no obs.
            AxesNotSpecifiedError: if this space has no axes.
            OverdefinedError: if both `obs` and `axes` are given.
        """
        if self.obs is None:
            raise ObsNotSpecifiedError("Obs not specified, cannot create `obs_axes`")
        if self.axes is None:
            raise AxesNotSpecifiedError("Axes not specified, cannot create `obs_axes`")

        obs = self._check_convert_input_obs(obs, allow_none=True)
        axes = self._check_convert_input_axes(axes, allow_none=True)
        if obs is not None and axes is not None:
            raise OverdefinedError("Cannot use `obs` and `axes` to define which subset to access.")
        obs = self.obs if obs is None else obs
        axes = self.axes if axes is None else axes
        # only membership testing below
        obs = frozenset(obs)
        axes = frozenset(axes)

        # create obs_axes dict
        obs_axes = OrderedDict((o, ax) for o, ax in self.obs_axes.items() if o in obs or ax in axes)
        return obs_axes

    @property
    def obs_axes(self):
        """Ordered dict {obs: axis} built by pairing `self.obs` with `self.axes`."""
        # TODO(Mayou36): what if axes is None?
        return OrderedDict((o, ax) for o, ax in zip(self.obs, self.axes))

    def _set_obs_axes(self, obs_axes: Union[ztyping.OrderedDict[str, int], Dict[str, int]], ordered: bool = False,
                      allow_subset=False):
        """(Reorder) set the observables and the `axes`. Temporarily if used with a context manager.

        Args:
            obs_axes (OrderedDict[str, int]): An (ordered) dict with {obs: axes}.
            ordered (bool): If True, the space is reordered according to the order of `obs_axes`.
            allow_subset (bool): If False, raise if `obs_axes` only covers a proper subset of
                this space's obs (respectively axes).

        Returns:
            The `TemporarilySet` object wrapping the change.

        Raises:
            IntentionNotUnambiguousError: if `ordered` is True but `obs_axes` is not an `OrderedDict`.
            ValueError: if a subset is given but not allowed or the axes are not contained.
        """
        if ordered and not isinstance(obs_axes, OrderedDict):
            raise IntentionNotUnambiguousError("`ordered` is True but not an `OrderedDict` was given."
                                               "Error due to safety (in Python <3.7, dicts are not guaranteed to be"
                                               "ordered).")
        tmp_obs = self.obs if self.obs is not None else obs_axes.keys()
        self_obs_set = frozenset(tmp_obs)
        tmp_axes = self.axes if self.axes is not None else obs_axes.values()
        self_axes_set = frozenset(tmp_axes)
        if ordered:
            if self.obs is not None:
                if not allow_subset and frozenset(obs_axes.keys()) < self_obs_set:
                    raise ValueError("subset not allowed but `obs` is only a subset of `self.obs`")
                permutation_index = tuple(
                    self.obs.index(o) for o in obs_axes if o in self_obs_set)  # the future index of the space
                self_axes_set = set(obs_axes[o] for o in self.obs if o in obs_axes)
            elif self.axes is not None:
                if not frozenset(obs_axes.values()) <= self_axes_set:
                    raise ValueError("TODO axes not contained")
                if not allow_subset and frozenset(obs_axes.values()) < self_axes_set:
                    raise ValueError("subset not allowed but `axes` is only a subset of `self.axes`")
                permutation_index = tuple(
                    self.axes.index(ax) for ax in obs_axes.values() if
                    ax in self_axes_set)  # the future index of the space
                self_obs_set = set(o for o, ax in obs_axes.items() if ax in self.axes)
            else:
                # explicit raise instead of `assert False`: asserts are stripped with `python -O`
                raise AssertionError("This should never be reached.")
            limits = self._reorder_limits(indices=permutation_index, inplace=False)
            obs = tuple(o for o in obs_axes.keys() if o in self_obs_set)
            axes = tuple(ax for ax in obs_axes.values() if ax in self_axes_set)
        else:
            if self.obs is not None:
                if not allow_subset and frozenset(obs_axes.keys()) < self_obs_set:
                    raise ValueError("subset not allowed TODO")
                obs = self.obs
                axes = tuple(obs_axes[o] for o in obs)
            elif self.axes is not None:
                if not allow_subset and frozenset(obs_axes.values()) < self_axes_set:
                    raise ValueError("subset not allowed TODO")
                axes = self.axes
                axes_obs = {v: k for k, v in obs_axes.items()}
                obs = tuple(axes_obs[ax] for ax in axes)
            else:
                raise ValueError("Either `obs` or `axes` have to be specified if the `obs_axes` dict"
                                 "is not ordered and `ordered` is False.")
            limits = self.limits

        value = limits, obs, axes

        def setter(arguments):
            limits, obs, axes = arguments
            self._obs = obs
            self._axes = axes
            self._check_set_limits(limits=limits)

        def getter():
            return self.limits, self.obs, self.axes

        return TemporarilySet(value=value, setter=setter, getter=getter)

    def with_obs_axes(self, obs_axes: Union[ztyping.OrderedDict[str, int], Dict[str, int]], ordered: bool = False,
                      allow_subset=False) -> "zfit.Space":
        """Return a new :py:class:`~zfit.Space` with reordered observables and set the `axes`.

        Args:
            obs_axes (OrderedDict[str, int]): An ordered dict with {obs: axes}.
            ordered (bool): If True (and the `obs_axes` is an `OrderedDict`), the new space
                is ordered as `obs_axes`.
            allow_subset (): Whether `obs_axes` may cover only a subset of this space.

        Returns:
            :py:class:`~zfit.Space`:
        """
        new_space = type(self)._from_any(obs=self.obs, axes=self.axes, limits=self.limits)
        new_space._set_obs_axes(obs_axes=obs_axes, ordered=ordered, allow_subset=allow_subset)
        return new_space

    def with_autofill_axes(self, overwrite: bool = False) -> "zfit.Space":
        """Return a :py:class:`~zfit.Space` with filled axes corresponding to range(len(n_obs)).

        Args:
            overwrite (bool): If `self.axes` is not None, replace the axes with the autofilled ones.
                If axes is already set, don't do anything if `overwrite` is False.

        Returns:
            :py:class:`~zfit.Space`
        """
        if self.axes is None or overwrite:
            new_axes = tuple(range(self.n_obs))
            new_space = self.copy(axes=new_axes)
        else:
            new_space = self
        return new_space

    def area(self) -> float:
        """Return the total area of all the limits and axes. Useful, for example, for MC integration."""
        return sum(self.iter_areas(rel=False))

    def iter_areas(self, rel: bool = False) -> Tuple[float, ...]:
        """Return the areas of each interval

        Args:
            rel (bool): If True, return the relative fraction of each interval

        Returns:
            Tuple[float]:
        """
        areas = self._calculate_areas(limits=self.limits)
        if rel:
            areas = np.array(areas)
            areas /= areas.sum()
            areas = tuple(areas)
        return areas

    @staticmethod
    @functools.lru_cache()
    def _calculate_areas(limits) -> Tuple[float]:
        """Return, for each (lower, upper) pair, the product of `upper - lower` over all obs (cached)."""
        areas = tuple(float(np.prod(np.array(up) - np.array(low))) for low, up in zip(*limits))
        return areas

    def get_subspace(self, obs: ztyping.ObsTypeInput = None, axes: ztyping.AxesTypeInput = None,
                     name: Optional[str] = None) -> "zfit.Space":
        """Create a :py:class:`~zfit.Space` consisting of only a subset of the `obs`/`axes` (only one allowed).

        Args:
            obs (str, Tuple[str]):
            axes (int, Tuple[int]):
            name ():

        Returns:
            :py:class:`~zfit.Space`

        Raises:
            ValueError: if both or none of `obs` and `axes` are given.
            KeyError: if a requested obs/axis is not part of this space.
            ObsNotSpecifiedError: if `obs` is requested but this space has no obs.
            AxesNotSpecifiedError: if `axes` is requested but this space has no axes.
        """
        if obs is not None and axes is not None:
            raise ValueError("Cannot specify `obs` *and* `axes` to get subspace.")
        if axes is None and obs is None:
            raise ValueError("Either `obs` or `axes` has to be specified and not None")

        # try to use observables to get index
        obs = self._check_convert_input_obs(obs=obs, allow_none=True)
        if obs is not None:
            try:
                sub_index = tuple(self.obs.index(o) for o in obs)
            except ValueError as error:
                # chain the original error instead of printing it
                raise KeyError("Cannot get subspace from `obs` {} as this observables are not defined"
                               "in this space. Only {} is defined.".format(set(obs) - set(self.obs),
                                                                           set(self.obs))) from error
            except AttributeError:  # `obs` is None -> has not attribute `index`
                raise ObsNotSpecifiedError("Observables have not been specified in this space.")

        # try to use axes to get index
        axes = self._check_convert_input_axes(axes=axes, allow_none=True)
        if axes is not None:
            try:
                sub_index = tuple(self.axes.index(ax) for ax in axes)
            except ValueError as error:
                raise KeyError("Cannot get subspace from `axes` {} as this axes are not defined"
                               "in this space. Only the following axes are {}"
                               "".format(set(axes) - set(self.axes), self.axes)) from error
            except AttributeError:
                raise AxesNotSpecifiedError("Axes have not been specified for this space.")

        sub_obs = self.obs if self.obs is None else tuple(self.obs[i] for i in sub_index)
        sub_axes = self.axes if self.axes is None else tuple(self.axes[i] for i in sub_index)

        # use index to get limits
        limits = self.limits
        if limits is None or limits is False:
            sub_limits = limits
        else:
            lower, upper = limits
            sub_lower = tuple(tuple(lim[i] for i in sub_index) for lim in lower)
            sub_upper = tuple(tuple(lim[i] for i in sub_index) for lim in upper)
            sub_limits = sub_lower, sub_upper

        new_space = type(self)._from_any(obs=sub_obs, axes=sub_axes, limits=sub_limits, name=name)
        return new_space

    # Operators

    def copy(self, name: Optional[str] = None, **overwrite_kwargs) -> "zfit.Space":
        """Create a new :py:class:`~zfit.Space` using the current attributes and overwriting with
        `overwrite_kwargs`.

        Args:
            name (str): The new name. If not given, the new instance will be named the same as the
                current one.
            **overwrite_kwargs (): Any of `limits`, `axes`, `obs`.

        Returns:
            :py:class:`~zfit.Space`

        Raises:
            KeyError: if `overwrite_kwargs` contains keys other than the ones listed above.
        """
        name = self.name if name is None else name

        kwargs = {'name': name,
                  'limits': self.limits,
                  'axes': self.axes,
                  'obs': self.obs}
        # check *before* updating, otherwise the difference is always empty
        invalid_keys = set(overwrite_kwargs) - set(kwargs)
        if invalid_keys:
            raise KeyError("Not usable keys in `overwrite_kwargs`: {}".format(invalid_keys))
        kwargs.update(overwrite_kwargs)

        new_space = type(self)._from_any(**kwargs)
        return new_space

    def __le__(self, other):
        """Return True if every limit of `self` is matched by a limit of `other` on the same obs/axes.

        A limit of `other` matches if, in each dimension, its lower (upper) value equals
        the one of `self` or is `ANY_LOWER` (`ANY_UPPER`).
        """
        if not isinstance(other, type(self)):
            return NotImplemented

        axes_not_none = self.axes is not None and other.axes is not None
        obs_not_none = self.obs is not None and other.obs is not None
        if not (axes_not_none or obs_not_none):  # if both are None
            return False
        if axes_not_none:
            if set(self.axes) != set(other.axes):
                return False
        if obs_not_none:
            if set(self.obs) != set(other.obs):
                return False

        # check limits
        if self.limits is None:
            if other.limits is None:
                return True
            else:
                return False

        elif self.limits is False:
            if other.limits is False:
                return True
            else:
                return False

        # bring `other` into the same order as `self` to compare dimension by dimension
        reorder_indices = other.get_reorder_indices(obs=self.obs, axes=self.axes)
        other = other.reorder_by_indices(reorder_indices)

        # check explicitly if they match
        # for each limit in self, find another matching in other
        for lower, upper in self.iter_limits(as_tuple=True):
            limit_is_le = False
            for other_lower, other_upper in other.iter_limits(as_tuple=True):
                # each entry *has to* match the entry of the other limit, otherwise it's not the same
                for low, up, other_low, other_up in zip(lower, upper, other_lower, other_upper):
                    axis_le = 0  # False
                    axis_le += other_low == low and up == other_up  # TODO: approx limit comparison?
                    axis_le += other_low == low and other_up is ANY_UPPER  # TODO: approx limit comparison?
                    axis_le += other_low is ANY_LOWER and up == other_up  # TODO: approx limit comparison?
                    axis_le += other_low is ANY_LOWER and other_up is ANY_UPPER
                    if not axis_le:  # if not the same, don't test other dims
                        break
                else:
                    limit_is_le = True  # no break -> all axes coincide
            if not limit_is_le:  # for this `limit`, no other_limit matched
                return False
        return True

    def add(self, other: ztyping.SpaceOrSpacesTypeInput):
        """Add the limits of the spaces. Only works for the same obs.

        In case the observables are different, the order of the first space is taken.

        Args:
            other (:py:class:`~zfit.Space`):

        Returns:
            :py:class:`~zfit.Space`:
        """
        other = convert_to_container(other, container=list)
        new_space = add_spaces([self] + other)
        return new_space

    def combine(self, other: ztyping.SpaceOrSpacesTypeInput) -> ZfitSpace:
        """Combine spaces with different obs (but consistent limits).

        Args:
            other (:py:class:`~zfit.Space`):

        Returns:
            :py:class:`~zfit.Space`:
        """
        other = convert_to_container(other, container=list)
        new_space = combine_spaces([self] + other)
        return new_space

    def __add__(self, other):
        if not isinstance(other, ZfitSpace):
            raise TypeError("Cannot add a {} and a {}".format(type(self), type(other)))
        return self.add(other)

    def __mul__(self, other):
        if not isinstance(other, ZfitSpace):
            raise TypeError("Cannot combine a {} and a {}".format(type(self), type(other)))
        return self.combine(other)

    def __ge__(self, other):
        return other.__le__(self)

    def __eq__(self, other):
        """Equal if obs and limits are equal and the axes are equal or unset in one of the spaces."""
        if not isinstance(self, type(other)):  # TODO(Mayou36): what is a proper comparison?
            return NotImplemented

        is_eq = True
        is_eq *= self.obs == other.obs
        is_eq *= self.axes == other.axes or self.axes is None or other.axes is None
        is_eq *= self.limits == other.limits
        return bool(is_eq)

    def __hash__(self):
        """Hash of the limits only, ignoring their order (obs and axes do not enter)."""
        lower = self.lower
        upper = self.upper
        # we want to be non-restrictive: it's just a hash, not the __eq__
        if not (lower is None or lower is False):
            lower = frozenset(frozenset(lim) for lim in lower)
        if not (upper is None or upper is False):
            upper = frozenset(frozenset(lim) for lim in upper)
        return hash((lower, upper))
def convert_to_space(obs: Optional[ztyping.ObsTypeInput] = None, axes: Optional[ztyping.AxesTypeInput] = None,
                     limits: Optional[ztyping.LimitsTypeInput] = None,
                     *, overwrite_limits: bool = False, one_dim_limits_only: bool = True,
                     simple_limits_only: bool = True) -> Union[None, Space, bool]:
    """Convert *limits* to a :py:class:`~zfit.Space` object if not already None or False.

    Args:
        obs (Union[Tuple[float, float], :py:class:`~zfit.Space`]):
        limits ():
        axes ():
        overwrite_limits (bool): If `obs` or `axes` is a :py:class:`~zfit.Space` _and_ `limits` are given, return
            an instance of :py:class:`~zfit.Space` with the new limits. If the flag is `False`, the `limits`
            argument will be ignored if the given space already has limits.
        one_dim_limits_only (bool): If True, raise if a newly created space has limits and more than one obs.
        simple_limits_only (bool): If True, raise if a newly created space has more than one limit.

    Returns:
        Union[:py:class:`~zfit.Space`, False, None]:

    Raises:
        OverdefinedError: if `obs` or `axes` is a :py:class:`~zfit.Space` and `axes` respectively `obs` is not
            `None`.
        IntentionNotUnambiguousError: if `limits` is a `Space` whose obs/axes do not match the given space.
        LimitsUnderdefinedError: if the limits violate `one_dim_limits_only` or `simple_limits_only`.
    """
    # Find out whether one of the arguments already is a `Space`
    if isinstance(obs, Space):
        if axes is not None:
            raise OverdefinedError("if `obs` is a `Space`, `axes` cannot be defined.")
        given_space = obs
    elif isinstance(axes, Space):
        if obs is not None:
            raise OverdefinedError("if `axes` is a `Space`, `obs` cannot be defined.")
        given_space = axes
    elif isinstance(limits, Space):
        return limits
    else:
        given_space = None

    if given_space is not None:
        # only touch the limits if new ones are given and either allowed to overwrite or none are set yet
        if limits is not None and (overwrite_limits or given_space.limits is None):
            if isinstance(limits, Space):
                compatible = (limits.obs == given_space.obs
                              or (limits.axes == given_space.axes
                                  and limits.obs is None and given_space.obs is None))
                if not compatible:
                    raise IntentionNotUnambiguousError(
                        "`obs`/`axes` is a `Space` as well as the `limits`, but the "
                        "obs/axes of them do not match")
                limits = limits.limits
            given_space = given_space.with_limits(limits=limits)
        return given_space

    # nothing to build a space from
    if obs is None and axes is None:
        return None

    # build a new space and check whether its limits are allowed
    created = Space._from_any(obs=obs, axes=axes, limits=limits)
    if one_dim_limits_only and created.n_obs > 1 and created.limits:
        raise LimitsUnderdefinedError(
            "Limits more sophisticated than 1-dim cannot be auto-created from tuples. Use `Space` instead.")
    if simple_limits_only and created.limits and created.n_limits > 1:
        raise LimitsUnderdefinedError("Limits with multiple limits cannot be auto-created"
                                      " from tuples. Use `Space` instead.")
    return created
def _reorder_indices(old: Union[List, Tuple], new: Union[List, Tuple]) -> Tuple[int]: new_indices = tuple(old.index(o) for o in new) return new_indices
def no_norm_range(func):
    """Decorator: Catch the 'norm_range' kwargs. If not None, raise NormRangeNotImplementedError."""
    param_names = list(inspect.signature(func).parameters.keys())
    # positional index of `norm_range` in the signature of `func` (None if it has no such parameter)
    norm_range_index = param_names.index('norm_range') if 'norm_range' in param_names else None

    @functools.wraps(func)
    def new_func(*args, **kwargs):
        norm_range = kwargs.get('norm_range')
        # for a `Space`, its limits decide whether a norm_range is actually given
        candidate = norm_range.limits if isinstance(norm_range, Space) else norm_range
        norm_range_not_false = not (candidate is None or candidate is False)

        if norm_range_index is None:
            norm_range_is_arg = False
            kwargs.pop('norm_range', None)  # `func` does not accept it, so do not pass it on
        else:
            norm_range_is_arg = len(args) > norm_range_index

        if norm_range_not_false or norm_range_is_arg:
            raise NormRangeNotImplementedError()
        return func(*args, **kwargs)

    return new_func
def no_multiple_limits(func):
    """Decorator: Catch the 'limits' kwargs. If it contains multiple limits, raise
    MultipleLimitsNotImplementedError.

    If `func` has no parameter called `limits`, it is returned unchanged. If `limits`
    is not passed in a call, the default declared by `func` is checked instead. Objects
    without an `n_limits` attribute (e.g. `None`) are treated as not having multiple limits.
    """
    parameters = inspect.signature(func).parameters
    keys = list(parameters.keys())
    if 'limits' in keys:
        limits_index = keys.index('limits')
    else:
        return func  # no limits as parameters -> no problem
    default_limits = parameters['limits'].default

    @functools.wraps(func)
    def new_func(*args, **kwargs):
        limits_is_arg = len(args) > limits_index
        if limits_is_arg:
            limits = args[limits_index]
        else:
            # `limits` may be omitted completely; fall back to what `func` would use
            limits = kwargs.get('limits', default_limits)

        if getattr(limits, 'n_limits', 0) > 1:
            raise MultipleLimitsNotImplementedError
        return func(*args, **kwargs)

    return new_func
def supports(*, norm_range: bool = False, multiple_limits: bool = False) -> Callable:
    """Decorator: Add (mandatory for some methods) on a method to control what it can handle.

    If any of the flags is set to False, it will check the arguments and, in case they match a flag
    (say if a *norm_range* is passed while the *norm_range* flag is set to `False`), it will
    raise a corresponding exception (in this example a `NormRangeNotImplementedError`) that will
    be caught by an earlier function that knows how to handle things.

    Args:
        norm_range (bool): If False, no norm_range argument will be passed through resp. will be `None`
        multiple_limits (bool): If False, only simple limits are to be expected and no iteration is
            therefore required.

    Returns:
        Callable: the decorator; the function it returns has `__wrapped__` set to `supports`.
    """
    # Guards in the order they are applied: the first one ends up innermost.
    guards = []
    if not norm_range:
        guards.append(no_norm_range)
    if not multiple_limits:
        guards.append(no_multiple_limits)

    def create_deco_stack(func):
        decorated = func
        for guard in guards:
            decorated = guard(decorated)
        decorated.__wrapped__ = supports
        return decorated

    return create_deco_stack
def convert_to_obs_str(obs):
    """Convert `obs` to the list of obs, also if it is a :py:class:`~zfit.Space`.

    Every :py:class:`~zfit.Space` contained in `obs` is replaced by its own obs,
    all other elements are kept as they are.

    Returns:
        list: the flattened observables.
    """
    flat_obs = []
    for element in convert_to_container(value=obs, container=tuple):
        if isinstance(element, Space):
            flat_obs += list(element.obs)
        else:
            flat_obs.append(element)
    return flat_obs