
"""
This module contains functions for the numeric as well as the analytic (partial) integration.
"""

#  Copyright (c) 2019 zfit

import collections
from typing import Callable, List, Optional, Tuple, Type, Union

import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp

import zfit
from zfit import ztf
from zfit.core.dimension import BaseDimensional
from zfit.core.interfaces import ZfitData, ZfitSpace, ZfitModel
from zfit.util.container import convert_to_container
from zfit.util.temporary import TemporarilySet
from ..util import ztyping
from ..util.exception import DueToLazynessNotImplementedError
from .limits import convert_to_space, Space, supports
from ..settings import ztypes


@supports()
def auto_integrate(func, limits, n_axes, x=None, method="AUTO", dtype=ztypes.float,
                   mc_sampler=tfp.mcmc.sample_halton_sequence,
                   mc_options=None):
    if method == "AUTO":  # TODO unfinished, other methods?
        method = "mc"
    # TODO method
    if method.lower() == "mc":
        mc_options = mc_options or {}
        draws_per_dim = mc_options.get('draws_per_dim', 20000)  # default matches `mc_integrate`
        integral = mc_integrate(x=x, func=func, limits=limits, n_axes=n_axes, method=method,
                                dtype=dtype, mc_sampler=mc_sampler, draws_per_dim=draws_per_dim,
                                importance_sampling=None)
    else:
        raise ValueError("Integration method {} not (yet) supported.".format(method))
    return integral

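
# A minimal usage sketch: integrating x^2 over [-1, 1] with `auto_integrate`. It assumes
# the graph-mode zfit API of this release, where `zfit.run` evaluates tensors; names and
# values below are illustrative only.
def _example_auto_integrate():  # pragma: no cover
    space = zfit.Space("x", limits=(-1., 1.))
    integral = auto_integrate(func=lambda x: x ** 2, limits=space, n_axes=1,
                              mc_options={'draws_per_dim': 20000})
    return zfit.run(integral)  # expected to be close to 2/3
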
# TODO implement numerical integration method
def numeric_integrate():
    """Integrate `func` using numerical methods."""
    integral = None
    return integral

def mc_integrate(func: Callable, limits: ztyping.LimitsType, axes: Optional[ztyping.AxesTypeInput] = None,
                 x: Optional[ztyping.XType] = None, n_axes: Optional[int] = None, draws_per_dim: int = 20000,
                 method: Optional[str] = None, dtype: Type = ztypes.float,
                 mc_sampler: Callable = tfp.mcmc.sample_halton_sequence,
                 importance_sampling: Optional[Callable] = None) -> tf.Tensor:
    """Monte Carlo integration of `func` over `limits`.

    Args:
        func (callable): The function to be integrated
        limits (:py:class:`~zfit.Space`): The limits of the integral
        axes (tuple(int)): The axes to integrate over. `None` means integration over all axes.
        x (numeric): If a partial integration is performed, these are the values at which the
            partially integrated function will be evaluated.
        n_axes (int): the number of total dimensions (old?)
        draws_per_dim (int): How many random points to draw per dimension
        method (str): Which integration method to use
        dtype (dtype): |dtype_arg_descr|
        mc_sampler (callable): A function that takes one argument (`n_draws` or similar) and
            returns random values between 0 and 1.
        importance_sampling ():

    Returns:
        numerical: the integral
    """
    if axes is not None and n_axes is not None:
        raise ValueError("Either specify axes or n_axes")
    limits = convert_to_space(limits)

    axes = limits.axes
    partial = (axes is not None) and (x is not None)  # axes, value can be tensors

    if axes is not None and n_axes is None:
        n_axes = len(axes)
    if n_axes is not None and axes is None:
        axes = tuple(range(n_axes))

    lower, upper = limits.limits
    if np.infty in upper[0] or -np.infty in lower[0]:
        raise ValueError("MC integration does (currently) not support unbounded limits (np.infty) as given here:"
                         "\nlower: {}, upper: {}".format(lower, upper))

    lower = ztf.convert_to_tensor(lower, dtype=dtype)
    upper = ztf.convert_to_tensor(upper, dtype=dtype)

    n_samples = draws_per_dim
    chunked_normalization = zfit.run.chunksize < n_samples
    # chunked_normalization = True
    if chunked_normalization:
        if partial:
            raise DueToLazynessNotImplementedError("This feature is not yet implemented: needs new Datasets")
        n_chunks = int(np.ceil(n_samples / zfit.run.chunksize))
        chunksize = int(np.ceil(n_samples / n_chunks))
        print("starting normalization with {} chunks and a chunksize of {}".format(n_chunks, chunksize))
        avg = normalization_chunked(func=func, n_axes=n_axes, dtype=dtype, x=x,
                                    num_batches=n_chunks, batch_size=chunksize, space=limits)
    else:
        # TODO: deal with n_obs properly?
        samples_normed = mc_sampler(dim=n_axes, num_results=n_samples, dtype=dtype)
        # samples_normed = tf.reshape(samples_normed, shape=(n_vals, int(n_samples / n_vals), n_axes))
        # samples_normed = tf.expand_dims(samples_normed, axis=0)
        samples = samples_normed * (upper - lower) + lower  # samples is [0, 1], stretch it
        # samples = tf.transpose(samples, perm=[2, 0, 1])

        if partial:  # TODO(Mayou36): shape of partial integral?
            data_obs = x.obs
            new_obs = []
            x = x.value()
            value_list = []
            index_samples = 0
            index_values = 0
            if len(x.shape) == 1:
                x = tf.expand_dims(x, axis=1)
            for i in range(n_axes + x.shape[-1]):
                if i in axes:
                    new_obs.append(limits.obs[index_samples])
                    value_list.append(samples[:, index_samples])
                    index_samples += 1
                else:
                    new_obs.append(data_obs[index_values])
                    value_list.append(tf.expand_dims(x[:, index_values], axis=1))
                    index_values += 1
            value_list = [tf.cast(val, dtype=dtype) for val in value_list]
            x = PartialIntegralSampleData(sample=value_list, space=Space(obs=new_obs))
        else:
            x = samples

        # convert the random samples with values to a feedable vector
        reduce_axis = 1 if partial else None
        avg = tf.reduce_mean(input_tensor=func(x), axis=reduce_axis)
        # avg = tfp.monte_carlo.expectation(f=func, samples=x, axis=reduce_axis)
        # TODO: importance sampling?
        # avg = tfb.monte_carlo.expectation_importance_sampler(f=func, samples=x, axis=reduce_axis)
    integral = avg * tf.cast(ztf.convert_to_tensor(limits.area()), dtype=avg.dtype)
    return integral
    # return ztf.to_real(integral, dtype=dtype)

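
# A minimal sketch of calling `mc_integrate` directly: integrating x^2 over [0, 2]
# (the analytic result is 8/3). It assumes default settings where chunked normalization
# is off and the graph-mode `zfit.run` evaluation of this zfit release.
def _example_mc_integrate():  # pragma: no cover
    space = zfit.Space("x", limits=(0., 2.))
    integral = mc_integrate(func=lambda x: x ** 2, limits=space, n_axes=1,
                            draws_per_dim=30000)
    return zfit.run(integral)  # expected to be close to 8/3
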
# TODO(Mayou36): Make more flexible for sampling
def normalization_nograd(func, n_axes, batch_size, num_batches, dtype, space, x=None, shape_after=()):
    lower, upper = space.limits
    lower = ztf.convert_to_tensor(lower, dtype=dtype)
    upper = ztf.convert_to_tensor(upper, dtype=dtype)

    def body(batch_num, mean):
        start_idx = batch_num * batch_size
        end_idx = start_idx + batch_size
        indices = tf.range(start_idx, end_idx, dtype=tf.int32)
        samples_normed = tfp.mcmc.sample_halton_sequence(n_axes,
                                                         # num_results=batch_size,
                                                         sequence_indices=indices,
                                                         dtype=dtype,
                                                         randomized=False)
        # halton_sample = tf.random_uniform(shape=(n_axes, batch_size), dtype=dtype)
        samples_normed.set_shape((batch_size, n_axes))
        samples_normed = tf.expand_dims(samples_normed, axis=0)
        samples = samples_normed * (upper - lower) + lower
        func_vals = func(samples)
        if shape_after == ():
            reduce_axis = None
        else:
            reduce_axis = 1
            if len(func_vals.shape) == 1:
                func_vals = tf.expand_dims(func_vals, -1)
        batch_mean = tf.reduce_mean(input_tensor=func_vals, axis=reduce_axis)
        # if there are gradients
        # batch_mean = tf.reduce_mean(sample)
        # batch_mean = tf.guarantee_const(batch_mean)
        # with tf.control_dependencies([batch_mean]):
        err_weight = 1 / tf.cast(batch_num + 1, dtype=tf.float64)
        # err_weight /= err_weight + 1
        # print_op = tf.print(batch_mean)
        do_print = False
        if do_print:
            deps = [tf.print(batch_num + 1)]
        else:
            deps = []
        with tf.control_dependencies(deps):
            return batch_num + 1, mean + err_weight * (batch_mean - mean)

    cond = lambda batch_num, _: batch_num < num_batches
    initial_mean = tf.constant(0, shape=shape_after, dtype=dtype)
    initial_body_args = (0, initial_mean)
    _, final_mean = tf.while_loop(cond=cond, body=body, loop_vars=initial_body_args,
                                  parallel_iterations=1, swap_memory=False, back_prop=True)
    # def normalization_grad(x):
    return final_mean

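
# The while-loop in `normalization_nograd` maintains a streaming mean over batches,
# mean_{k+1} = mean_k + (batch_mean_k - mean_k) / (k + 1), which equals the plain mean
# of all batch means without holding them in memory. A small NumPy sketch
# (illustrative only):
def _example_running_mean():  # pragma: no cover
    batch_means = np.random.uniform(size=10)
    mean = 0.
    for batch_num, batch_mean in enumerate(batch_means):
        mean += (batch_mean - mean) / (batch_num + 1)
    assert np.isclose(mean, batch_means.mean())
    return mean
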
def normalization_chunked(func, n_axes, batch_size, num_batches, dtype, space, x=None, shape_after=()):
    x_is_none = x is None

    @tf.custom_gradient
    def normalization_func(x):
        if x_is_none:
            x = None
        value = normalization_nograd(func=func, n_axes=n_axes, batch_size=batch_size,
                                     num_batches=num_batches, dtype=dtype, space=space,
                                     x=x, shape_after=shape_after)

        def grad_fn(dy, variables=None):
            if variables is None:
                return dy, None
            normed_grad = normalization_nograd(
                func=lambda x: tf.stack(tf.gradients(ys=func(x), xs=variables)),
                n_axes=n_axes, batch_size=batch_size, num_batches=num_batches,
                dtype=dtype, space=space, x=x, shape_after=(len(variables),))
            return dy, tf.unstack(normed_grad)

        return value, grad_fn

    fake_x = 1 if x_is_none else x
    return normalization_func(fake_x)

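
# `normalization_chunked` pairs a forward value computed without backpropagation with a
# hand-built gradient via `tf.custom_gradient`. A self-contained sketch of that pattern
# (illustrative; not this module's API):
def _example_custom_gradient():  # pragma: no cover
    @tf.custom_gradient
    def square_with_custom_grad(x):
        value = x ** 2

        def grad_fn(dy):
            return dy * 2. * x  # hand-written derivative of x^2

        return value, grad_fn

    return square_with_custom_grad(tf.constant(3.))
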
def chunked_average(func, x, num_batches, batch_size, space, mc_sampler):
    lower, upper = space.limits

    fake_resource_var = tf.compat.v1.get_variable("fake_hack_ResVar_for_custom_gradient",
                                                  initializer=ztf.constant(4242.))
    fake_x = ztf.constant(42.) * fake_resource_var

    @tf.custom_gradient
    def dummy_func(fake_x):  # to make working with custom_gradient
        if x is not None:
            raise DueToLazynessNotImplementedError("partial not yet implemented")

        def body(batch_num, mean):
            if mc_sampler == tfp.mcmc.sample_halton_sequence:
                start_idx = batch_num * batch_size
                end_idx = start_idx + batch_size
                indices = tf.range(start_idx, end_idx, dtype=tf.int32)
                sample = mc_sampler(space.n_obs, sequence_indices=indices,
                                    dtype=ztypes.float, randomized=False)
            else:
                sample = mc_sampler(shape=(batch_size, space.n_obs), dtype=ztypes.float)
            sample = tf.guarantee_const(sample)
            sample = (np.array(upper[0]) - np.array(lower[0])) * sample + lower[0]
            sample = tf.transpose(a=sample)
            sample = func(sample)
            sample = tf.guarantee_const(sample)

            batch_mean = tf.reduce_mean(input_tensor=sample)
            batch_mean = tf.guarantee_const(batch_mean)
            # with tf.control_dependencies([batch_mean]):
            err_weight = 1 / tf.cast(batch_num + 1, dtype=tf.float64)
            # err_weight /= err_weight + 1
            # print_op = tf.print(batch_mean)
            do_print = False
            if do_print:
                deps = [tf.print(batch_num + 1, mean, err_weight * (batch_mean - mean))]
            else:
                deps = []
            with tf.control_dependencies(deps):
                return batch_num + 1, mean + err_weight * (batch_mean - mean)
                # return batch_num + 1, tf.guarantee_const(mean + err_weight * (batch_mean - mean))

        cond = lambda batch_num, _: batch_num < num_batches
        initial_mean = tf.convert_to_tensor(value=0, dtype=ztypes.float)
        _, final_mean = tf.while_loop(cond=cond, body=body, loop_vars=(0, initial_mean),
                                      parallel_iterations=1, swap_memory=False,
                                      back_prop=False, maximum_iterations=num_batches)

        def dummy_grad_with_var(dy, variables=None):
            raise DueToLazynessNotImplementedError("Who called me? Mayou36")
            # the code below is currently unreachable; kept as a sketch for the gradient
            if variables is None:
                raise DueToLazynessNotImplementedError("Is this needed? Why? It's not a NN. Please make an issue.")

            def dummy_grad_func(x):
                values = func(x)
                if variables:
                    gradients = tf.gradients(ys=values, xs=variables, grad_ys=dy)
                else:
                    gradients = None
                return gradients

            return chunked_average(func=dummy_grad_func, x=x, num_batches=num_batches,
                                   batch_size=batch_size, space=space, mc_sampler=mc_sampler)

        def dummy_grad_without_var(dy):
            return dummy_grad_with_var(dy=dy, variables=None)

        do_print = False
        if do_print:
            deps = [tf.print(final_mean)]
        else:
            deps = []
        with tf.control_dependencies(deps):
            return tf.guarantee_const(final_mean), dummy_grad_with_var

    try:
        return dummy_func(fake_x)
    except TypeError:
        return dummy_func(fake_x)

class PartialIntegralSampleData(BaseDimensional, ZfitData):

    def __init__(self, sample: List[tf.Tensor], space: ZfitSpace):
        """Takes a list of tensors and "fakes" a dataset. Useful for tensors with non-matching shapes.

        Args:
            sample (List[tf.Tensor]):
            space ():
        """
        if not isinstance(sample, list):
            raise TypeError("Sample has to be a list of tf.Tensors")
        super().__init__()
        self._space = space
        self._sample = sample
        self._reorder_indices_list = list(range(len(sample)))

    @property
    def weights(self):
        raise NotImplementedError("Weights for PartialIntegralSampleData are not implemented. Are they needed?")

    @property
    def space(self) -> "zfit.Space":
        return self._space

    def sort_by_axes(self, axes, allow_superset: bool = False):
        axes = convert_to_container(axes)
        new_reorder_list = [self._reorder_indices_list[self.space.axes.index(ax)] for ax in axes]
        value = self.space.with_axes(axes=axes), new_reorder_list

        getter = lambda: (self.space, self._reorder_indices_list)

        def setter(value):
            self._space, self._reorder_indices_list = value

        return TemporarilySet(value=value, getter=getter, setter=setter)

    def sort_by_obs(self, obs, allow_superset: bool = False):
        obs = convert_to_container(obs)
        new_reorder_list = [self._reorder_indices_list[self.space.obs.index(ob)] for ob in obs]
        value = self.space.with_obs(obs=obs), new_reorder_list

        getter = lambda: (self.space, self._reorder_indices_list)

        def setter(value):
            self._space, self._reorder_indices_list = value

        return TemporarilySet(value=value, getter=getter, setter=setter)

    def value(self, obs: Optional[List[str]] = None):
        return self

    def unstack_x(self, always_list=False):
        unstacked_x = [self._sample[i] for i in self._reorder_indices_list]
        if len(unstacked_x) == 1 and not always_list:
            unstacked_x = unstacked_x[0]
        return unstacked_x

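
# A minimal sketch of how `PartialIntegralSampleData` "fakes" a dataset from tensors of
# non-matching shapes; the observable names and shapes are illustrative, and the sort
# methods are used as context managers via `TemporarilySet`.
def _example_partial_sample_data():  # pragma: no cover
    samples = tf.random.uniform(shape=(100, 1), dtype=ztypes.float)  # integrated axis
    values = tf.random.uniform(shape=(50, 1), dtype=ztypes.float)  # partial values
    data = PartialIntegralSampleData(sample=[samples, values],
                                     space=Space(obs=["x", "y"]))
    with data.sort_by_obs(["y", "x"]):  # temporary reordering, restored on exit
        reordered = data.unstack_x()
    return reordered
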
class AnalyticIntegral:

    def __init__(self, *args, **kwargs):
        """Hold analytic integrals and manage their dimensions, limits etc."""
        super().__init__(*args, **kwargs)
        self._integrals = collections.defaultdict(dict)

    def get_max_axes(self, limits: ztyping.LimitsType, axes: ztyping.AxesTypeInput = None) -> Tuple[int]:
        """Return the maximal available axes to integrate over analytically for the given limits.

        Args:
            limits (:py:class:`~zfit.Space`): The limits over which the integral function will be able to integrate
            axes (tuple): The axes over which (or over a subset of which) it will integrate

        Returns:
            Tuple[int]:
        """
        if not isinstance(limits, Space):
            raise TypeError("`limits` have to be a `Space`")
        # limits = convert_to_space(limits=limits)

        return self._get_max_axes_limits(limits, out_of_axes=limits.axes)[0]  # only axes

    def _get_max_axes_limits(self, limits, out_of_axes):  # TODO: automatic caching? but most probably not relevant
        if out_of_axes:
            out_of_axes = frozenset(out_of_axes)
            implemented_axes = frozenset(d for d in self._integrals.keys() if d <= out_of_axes)
        else:
            implemented_axes = set(self._integrals.keys())
        implemented_axes = sorted(implemented_axes, key=len, reverse=True)  # iterate through the biggest first
        for axes in implemented_axes:
            limits_matched = []
            for lim, integ in self._integrals[axes].items():
                if integ.limits >= limits:
                    limits_matched.append(lim)

            if limits_matched:  # one or more integrals available
                return tuple(sorted(axes)), limits_matched
        return (), ()  # no integral available for these axes

    def get_max_integral(self, limits: ztyping.LimitsType,
                         axes: ztyping.AxesTypeInput = None) -> Union[None, "Integral"]:
        """Return the integral over the `limits` with `axes` (or a subset of them).

        Args:
            limits (:py:class:`~zfit.Space`):
            axes (Tuple[int]):

        Returns:
            Union[None, Integral]: A callable that integrates over the given limits.
        """
        limits = convert_to_space(limits=limits, axes=axes)

        axes, limits = self._get_max_axes_limits(limits=limits, out_of_axes=axes)
        axes = frozenset(axes)
        integrals = [self._integrals[axes][lim] for lim in limits]
        integral_fn = max(integrals, key=lambda l: l.priority, default=None)
        return integral_fn

    def register(self, func: Callable, limits: ztyping.LimitsType, priority: int = 50, *,
                 supports_norm_range: bool = False, supports_multiple_limits: bool = False) -> None:
        """Register an analytic integral.

        Args:
            func (callable): The integral function. Takes 1 argument.
            limits (:py:class:`~zfit.Space`): |limits_arg_descr| `Limits` can be None if `func` works for any
                possible limits
            priority (int): If two or more integrals can integrate over certain limits, the one with the higher
                priority is taken (usually around 0-100).
            supports_norm_range (bool): If True, norm_range will (if needed) be given to `func` as an argument.
            supports_multiple_limits (bool): If True, multiple limits may be given as an argument to `func`.
        """
        # if limits is False:
        #     raise ValueError("Limits for the analytical integral have to be specified or None (for any limits).")
        # if limits is None:
        #     limits = tuple((Space.ANY_LOWER, Space.ANY_UPPER) for _ in range(len(axes)))
        #     limits = convert_to_space(axes=axes, limits=limits)
        # else:
        #     limits = convert_to_space(axes=self.axes, limits=limits)
        #     limits = limits.get_limits()
        if not isinstance(limits, Space):
            raise TypeError("Limits for registering an integral have to be `Space`")
        axes = frozenset(limits.axes)

        # add catching everything unsupported:
        func = supports(norm_range=supports_norm_range, multiple_limits=supports_multiple_limits)(func)
        limits = limits.with_axes(axes=tuple(sorted(limits.axes)))
        self._integrals[axes][limits.limits] = Integral(func=func, limits=limits,
                                                        priority=priority)  # TODO improve with database-like access

    def integrate(self, x: Optional[ztyping.XType], limits: ztyping.LimitsType, axes: ztyping.AxesTypeInput = None,
                  norm_range: ztyping.LimitsType = None, model: ZfitModel = None,
                  params: dict = None) -> ztyping.XType:
        """Integrate analytically over the axes if available.

        Args:
            x (numeric): If a partial integration is made, x are the values at which the partially
                integrated function will be evaluated. If a full integration is performed,
                this should be `None`.
            limits (:py:class:`~zfit.Space`): The limits to integrate
            axes (Tuple[int]): The dimensions to integrate over
            norm_range (:py:class:`~zfit.Space`): |norm_range_arg_descr|
            model (:py:class:`~zfit.core.interfaces.ZfitModel`): The model that is integrated
            params (dict): The parameters of the function

        Returns:
            Union[tf.Tensor, float]:

        Raises:
            NotImplementedError: If the requested integral is not available.
        """
        if axes is None:
            axes = limits.axes
        axes = frozenset(axes)
        integral_holder = self._integrals.get(axes)
        # limits = convert_to_space(axes=self.axes, limits=limits)
        if integral_holder is None:
            raise NotImplementedError("Analytic integral is not available for axes {}".format(axes))
        integral_fn = self.get_max_integral(limits=limits)
        if integral_fn is None:
            raise NotImplementedError(
                "Integral is available for axes {}, but not for limits {}".format(axes, limits))

        if x is None:
            try:
                integral = integral_fn(x=x, limits=limits, norm_range=norm_range, params=params, model=model)
            except TypeError:
                integral = integral_fn(limits=limits, norm_range=norm_range, params=params, model=model)
        else:
            integral = integral_fn(x=x, limits=limits, norm_range=norm_range, params=params, model=model)
        return integral

class Integral:  # TODO analytic integral

    def __init__(self, func: Callable, limits: "zfit.Space", priority: Union[int, float]):
        """A lightweight holder for the integral function."""
        self.limits = limits
        self.integrate = func
        self.axes = limits.axes
        self.priority = priority

    def __call__(self, *args, **kwargs):
        return self.integrate(*args, **kwargs)

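
# A minimal sketch of registering and dispatching an analytic integral through
# `AnalyticIntegral`: the integrand x^2 with its primitive x^3 / 3. The callback
# signature follows the dispatch in `AnalyticIntegral.integrate` above; treat it as an
# illustrative sketch rather than the canonical registration path (models register via
# `register_analytic_integral`).
def _example_analytic_integral():  # pragma: no cover
    def integral_x_squared(limits, params, model):
        lower, upper = limits.limits
        return (upper[0][0] ** 3 - lower[0][0] ** 3) / 3.  # primitive of x^2

    holder = AnalyticIntegral()
    space = zfit.Space("x", limits=(0., 2.)).with_axes(axes=(0,))
    holder.register(func=integral_x_squared, limits=space)
    return holder.integrate(x=None, limits=space)  # expected 8/3
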
# to be "the" future integral class
class Integration:

    def __init__(self, mc_sampler, draws_per_dim):
        self.mc_sampler = mc_sampler
        self.draws_per_dim = draws_per_dim