Source code for zfit.core.loss

#  Copyright (c) 2020 zfit

import abc
import warnings
from typing import Optional, Union, List, Callable, Iterable

import tensorflow as tf
from ordered_set import OrderedSet

from zfit import z, settings

ztf = z
from zfit.z.math import numerical_gradient, autodiff_gradient, autodiff_value_gradients, numerical_value_gradients, \
    automatic_value_gradients_hessian, numerical_value_gradients_hessian
from ..util import ztyping
from ..util.cache import Cachable
from ..util.checks import ZfitNotImplemented, NOT_SPECIFIED
from .baseobject import BaseObject
from .dependents import BaseDependentsMixin
from .interfaces import ZfitLoss, ZfitSpace, ZfitModel, ZfitData, ZfitConstraint
from ..util.container import convert_to_container, is_container
from ..util.exception import IntentionNotUnambiguousError, NotExtendedPDFError, WorkInProgressError, \
    BreakingAPIChangeError
from .constraint import BaseConstraint, SimpleConstraint


# @z.function
def _unbinned_nll_tf(model: ztyping.PDFInputType, data: ztyping.DataInputType, fit_range: ZfitSpace):
    """Return unbinned negative log likelihood graph for a PDF

    Args:
        model (ZfitModel): PDFs with a `.pdf` method. Has to be as many models as data
        data (ZfitData):
        fit_range ():

    Returns:
        graph: the unbinned nll

    Raises:
        ValueError: if both `probs` and `log_probs` are specified.
    """

    if is_container(model):
        nlls = [_unbinned_nll_tf(model=p, data=d, fit_range=r)
                for p, d, r in zip(model, data, fit_range)]
        nll_finished = tf.reduce_sum(input_tensor=nlls)
    else:
        with data.set_data_range(fit_range):
            probs = model.pdf(data, norm_range=fit_range)
        log_probs = tf.math.log(probs)
        if data.weights is not None:
            log_probs *= data.weights  # weighted NLL: log(prob ** weight) = weight * log(prob)
        nll = -tf.reduce_sum(input_tensor=log_probs)
        nll_finished = nll
    return nll_finished
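
# A minimal sketch of what `_unbinned_nll_tf` computes (illustrative only; `obs`, `mu`,
# `sigma` and the other names below are assumptions, not part of this module):
#
#     import numpy as np
#     import zfit
#
#     obs = zfit.Space('x', limits=(-5, 5))
#     mu = zfit.Parameter('mu', 0.)
#     sigma = zfit.Parameter('sigma', 1.)
#     gauss = zfit.pdf.Gauss(mu=mu, sigma=sigma, obs=obs)
#     data = zfit.Data.from_numpy(obs=obs, array=np.random.normal(size=(1000, 1)))
#
#     # equivalent to -sum_i w_i * log(pdf(x_i)); the weights default to one
#     nll = _unbinned_nll_tf(model=gauss, data=data, fit_range=obs)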


def _nll_constraints_tf(constraints):
    if not constraints:
        return ztf.constant(0.)  # adding 0 to nll
    probs = []
    for param, dist in constraints.items():
        probs.append(dist.pdf(param))
    # probs = [dist.pdf(param) for param, dist in constraints.items()]
    constraints_neg_log_prob = -tf.reduce_sum(input_tensor=tf.math.log(probs))
    return constraints_neg_log_prob


def _constraint_check_convert(constraints):
    checked_constraints = []
    for constr in constraints:
        if isinstance(constr, BaseConstraint):
            checked_constraints.append(constr)
        else:
            # bind the loop variable via a default argument: a plain `lambda: constr` would
            # late-bind and make every wrapped constraint return the last element of the loop
            checked_constraints.append(SimpleConstraint(func=lambda constr=constr: constr))
    return checked_constraints


class BaseLoss(BaseDependentsMixin, ZfitLoss, Cachable, BaseObject):

    def __init__(self, model: ztyping.ModelsInputType, data: ztyping.DataInputType,
                 fit_range: ztyping.LimitsTypeInput = NOT_SPECIFIED,
                 constraints: Iterable[Union[ZfitConstraint, Callable]] = None):
        # first doc line left blank on purpose; the subclass adds the class docstring
        # (Sphinx autodoc concatenates the two)
        """
        A "simultaneous fit" can be performed by giving one or more `model`, `data`, `fit_range`
        to the loss. The length of each has to match the length of the others.

        Args:
            model (Iterable[ZfitModel]): The model or models to evaluate the data on
            data (Iterable[ZfitData]): Data to use
            fit_range (Iterable[:py:class:`~zfit.Space`]): The fitting range. It's the norm_range for the models (if
                they have a norm_range) and the data_range for the data.
            constraints (Iterable[Union[ZfitConstraint, Callable]]): Constraints on the loss, each a
                `ZfitConstraint` or a Tensor-returning callable. Using `zfit.constraint.*` allows for
                easy use of predefined constraints.
        """
        super().__init__(name=type(self).__name__)
        if fit_range is NOT_SPECIFIED:  # deprecation
            fit_range = None
        else:
            warnings.warn("The fit_range argument is deprecated and may be removed in future releases. "
                          "It is preferred to define the range in the space"
                          " when creating the data and the model.")

        self.computed_gradients = {}
        model, data, fit_range = self._input_check(pdf=model, data=data, fit_range=fit_range)
        self._model = model
        self._data = data
        self._fit_range = fit_range
        if constraints is None:
            constraints = []
        self._constraints = _constraint_check_convert(convert_to_container(constraints, list))

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        cls._name = "UnnamedSubBaseLoss"

    def _input_check(self, pdf, data, fit_range):
        if is_container(pdf) ^ is_container(data):
            raise ValueError("`pdf` and `data` either both have to be a list or not.")
        if not is_container(pdf):
            if isinstance(fit_range, list):
                raise TypeError("`pdf` and `data` are not a `list`, `fit_range` can't be a `list` then.")
        if isinstance(pdf, tuple):
            raise TypeError("`pdf` has to be a pdf or a list of pdfs, not a tuple.")
        if isinstance(data, tuple):
            raise TypeError("`data` has to be a data or a list of data, not a tuple.")

        pdf, data = (convert_to_container(obj, non_containers=[tuple]) for obj in (pdf, data))
        # TODO: data, range consistency?
        if fit_range is None:
            fit_range = []
            for p, d in zip(pdf, data):
                if not p.norm_range == d.data_range:
                    raise IntentionNotUnambiguousError("No `fit_range` is specified and `pdf` {} as "
                                                       "well as `data` {} have different ranges they "
                                                       "are defined in. Either make them (all) consistent "
                                                       "or specify the `fit_range`.".format(p, d))
                fit_range.append(p.norm_range)
        else:
            fit_range = convert_to_container(fit_range, non_containers=[tuple])

        # simultaneous fit
        # if is_container(pdf):
        #     if not is_container(fit_range) or not isinstance(fit_range[0], Space):
        #         raise ValueError(
        #             "If several pdfs are specified, the `fit_range` has to be given as a list of `Space` "
        #             "objects and not as pure tuples.")
        # else:
        #     fit_range = pdf.convert_sort_space(limits=fit_range)  # fit_range may be a tuple

        if not len(pdf) == len(data) == len(fit_range):
            raise ValueError("pdf, data and fit_range don't have the same number of components:"
                             "\npdf: {}"
                             "\ndata: {}"
                             "\nfit_range: {}".format(pdf, data, fit_range))

        # sanitize fit_range
        fit_range = [p.convert_sort_space(limits=range_) for p, range_ in zip(pdf, fit_range)]
        # TODO: sanitize pdf, data?
        self.add_cache_dependents(cache_dependents=pdf)
        self.add_cache_dependents(cache_dependents=data)
        self.add_cache_dependents(cache_dependents=fit_range)
        return pdf, data, fit_range

    def gradients(self, params: ztyping.ParamTypeInput = None) -> List[tf.Tensor]:
        if params is None:
            params = list(self.get_dependents())
        else:
            params = convert_to_container(params)
        return self._gradients(params=params)

    def add_constraints(self, constraints):
        constraints = convert_to_container(constraints)
        return self._add_constraints(constraints)

    def _add_constraints(self, constraints):
        constraints = _constraint_check_convert(convert_to_container(constraints, container=list))
        self._constraints.extend(constraints)
        return constraints

    @property
    def name(self):
        return self._name

    @property
    def model(self):
        return self._model

    @property
    def data(self):
        return self._data

    @property
    def fit_range(self):
        fit_range = self._fit_range
        return fit_range

    @property
    def constraints(self):
        return self._constraints

    def _get_dependents(self):
        pdf_dependents = self._extract_dependents(self.model)
        return pdf_dependents

    @abc.abstractmethod
    def _loss_func(self, model, data, fit_range, constraints):
        raise NotImplementedError

    def value(self):
        return self._value()

    @property
    def errordef(self) -> Union[float, int]:
        return self._errordef

    def _value(self):
        try:
            return self._loss_func(model=self.model, data=self.data, fit_range=self.fit_range,
                                   constraints=self.constraints)
        except NotImplementedError:
            raise NotImplementedError("_loss_func not properly defined!")

    def __add__(self, other):
        if not isinstance(other, BaseLoss):
            raise TypeError("Has to be a subclass of `BaseLoss` or overwrite `__add__`.")
        if not type(other) == type(self):
            raise ValueError("Cannot safely add two different kinds of loss.")
        model = self.model + other.model
        data = self.data + other.data
        fit_range = self.fit_range + other.fit_range
        constraints = self.constraints + other.constraints
        loss = type(self)(model=model, data=data, fit_range=fit_range, constraints=constraints)
        return loss

    def _gradients(self, params):
        if settings.options['numerical_grad']:
            gradients = numerical_gradient(self.value, params=params)
        else:
            gradients = autodiff_gradient(self.value, params=params)
        return gradients

    def value_gradients(self, params):
        return self._value_gradients(params=params)

    @z.function
    def _value_gradients(self, params):
        if settings.options['numerical_grad']:
            value, gradients = numerical_value_gradients(self.value, params=params)
        else:
            value, gradients = autodiff_value_gradients(self.value, params=params)
        return value, gradients

    def value_gradients_hessian(self, params):
        vals = self._value_gradients_hessian(params=params)
        if vals is ZfitNotImplemented:
            vals = self._value_gradients_hessian_fallback(params=params)
        return vals

    @z.function
    def _value_gradients_hessian_fallback(self, params):
        # both helpers return a (value, gradients, hessian) triple
        if settings.options['numerical_grad']:
            value, gradients, hessian = numerical_value_gradients_hessian(self.value, params=params)
        else:
            value, gradients, hessian = automatic_value_gradients_hessian(self.value, params=params)
        return value, gradients, hessian

    def _value_gradients_hessian(self, params):
        return ZfitNotImplemented
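
# A hedged usage sketch for `BaseLoss` subclasses (not part of the module): a simultaneous
# fit is built by passing lists of equal length. `pdf1`, `pdf2`, `data1` and `data2` stand
# for any already-constructed zfit models and datasets.
#
#     nll = zfit.loss.UnbinnedNLL(model=[pdf1, pdf2], data=[data1, data2])
#     value = nll.value()                           # scalar loss tensor
#     params = list(nll.get_dependents())           # independent zfit.Parameter objects
#     value, grads = nll.value_gradients(params=params)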

class CachedLoss(BaseLoss):

    def __init__(self, model, data, fit_range=None, constraints=None):
        raise WorkInProgressError("Currently, caching is not implemented in the loss and does not make "
                                  "sense; it is 'not yet upgraded to TF2'.")
        super().__init__(model=model, data=data, fit_range=fit_range, constraints=constraints)

    @abc.abstractmethod
    def _cache_add_constraints(self, constraints):
        raise NotImplementedError

    def _value(self):
        if self._cache.get('loss') is None:
            loss = super()._value()
            self._cache['loss'] = loss
        else:
            loss = self._cache['loss']
        return loss

    def _add_constraints(self, constraints):
        super()._add_constraints(constraints=constraints)
        self._cache_add_constraints(constraints=constraints)

    def _gradients(self, params):
        params_cache = self._cache.get('gradients', {})
        params_todo = []
        for param in params:
            if param not in params_cache:
                params_todo.append(param)
        if params_todo:
            # map each still-missing parameter to its freshly computed gradient
            gradients = {p: grad for p, grad in zip(params_todo, super()._gradients(params_todo))}
            params_cache.update(gradients)
        self._cache['gradients'] = params_cache
        param_gradients = [params_cache[param] for param in params]
        return param_gradients

# class UnbinnedNLL(CachedLoss):
class UnbinnedNLL(BaseLoss):
    """The Unbinned Negative Log Likelihood."""

    _name = "UnbinnedNLL"

    def __init__(self, model, data, fit_range=NOT_SPECIFIED, constraints=None):
        super().__init__(model=model, data=data, fit_range=fit_range, constraints=constraints)
        self._errordef = 0.5

    @z.function
    def _loss_func(self, model, data, fit_range, constraints):
        # with tf.GradientTape(persistent=True) as tape:
        nll = self._loss_func_watched(constraints, data, fit_range, model)
        # variables = tape.watched_variables()
        # gradients = tape.gradient(nll, sources=variables)
        # if any(grad is None for grad in tf.unstack(gradients, axis=0)):
        #     none_dict = {var: grad for var, grad in zip(variables, tf.unstack(gradients, axis=0))
        #                  if grad is None}
        #     raise LogicalUndefinedOperationError(f"One or more gradients are None and therefore the function does"
        #                                          f" not depend on them:"
        #                                          f" {none_dict}")
        # for param, grad in zip(variables, gradients):
        #     if param in self.computed_gradients:
        #         continue
        #     self.computed_gradients[param] = grad
        return nll

    @z.function
    def _loss_func_watched(self, constraints, data, fit_range, model):
        nll = _unbinned_nll_tf(model=model, data=data, fit_range=fit_range)
        if constraints:
            constraints = ztf.reduce_sum([c.value() for c in constraints])
            nll += constraints
        return nll
    # def _cache_add_constraints(self, constraints):
    #     if self._cache.get('loss') is not None:
    #         constraints = [c.value() for c in constraints]
    #         self._cache['loss'] += ztf.reduce_sum(constraints)
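
# Sketch: building an `UnbinnedNLL` with a constraint, reusing the assumed `gauss`, `data`
# and `mu` from the sketch after `_unbinned_nll_tf`. The `GaussianConstraint` signature may
# differ between zfit versions; treat this as an assumption, not a reference.
#
#     constraint = zfit.constraint.GaussianConstraint(params=mu, mu=0., sigma=0.1)
#     nll = UnbinnedNLL(model=gauss, data=data, constraints=constraint)
#     minimum = zfit.minimize.Minuit().minimize(nll)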

class ExtendedUnbinnedNLL(UnbinnedNLL):
    """An Unbinned Negative Log Likelihood with an additional Poisson term for the number of events."""

    @z.function
    def _loss_func(self, model, data, fit_range, constraints):
        nll = super()._loss_func(model=model, data=data, fit_range=fit_range, constraints=constraints)
        poisson_terms = []
        for mod, dat in zip(model, data):
            if not mod.is_extended:
                raise NotExtendedPDFError("The pdf {} is not extended but has to be (for an extended fit)".format(mod))
            nevents = dat.nevents if dat.weights is None else ztf.reduce_sum(dat.weights)
            poisson_terms.append(-mod.get_yield() + ztf.to_real(nevents) * tf.math.log(mod.get_yield()))
        nll -= tf.reduce_sum(input_tensor=poisson_terms)
        return nll
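
# Sketch: an extended fit needs an extended pdf (one carrying a yield). Assuming `gauss`
# and `data` from the earlier sketches:
#
#     yield_ = zfit.Parameter('yield', 1000., 0., 10000.)
#     gauss_ext = gauss.create_extended(yield_)
#     extended_nll = ExtendedUnbinnedNLL(model=gauss_ext, data=data)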

class SimpleLoss(BaseLoss):
    _name = "SimpleLoss"

    def __init__(self, func: Callable, dependents: Iterable["zfit.Parameter"] = NOT_SPECIFIED,
                 errordef: Optional[float] = None):
        """Loss from a function returning a Tensor.

        Args:
            func: Callable that constructs the loss and returns a tensor.
            dependents: The dependents (independent `zfit.Parameter`) of the loss. They have to be
                specified explicitly.
            errordef: Definition of which change in the loss corresponds to a change of 1 sigma.
                For example, 1 for Chi squared, 0.5 for negative log-likelihood.
        """
        if dependents is NOT_SPECIFIED:  # deprecation
            raise BreakingAPIChangeError("Dependents need to be specified explicitly due to the upgrade to 0.4. "
                                         "More information can be found in the upgrade guide on the website.")

        @z.function
        def wrapped_func():
            return func()

        self._simple_func = wrapped_func
        self._simple_errordef = errordef
        self._errordef = errordef
        self.computed_gradients = {}
        dependents = convert_to_container(dependents, container=OrderedSet)
        self._simple_func_dependents = self._extract_dependents(dependents)

        super().__init__(model=[], data=[], fit_range=[])

    def _get_dependents(self):
        dependents = self._simple_func_dependents
        # if dependents is None:
        #     independent_params = tf.compat.v1.get_collection("zfit_independent")
        #     dependents = get_dependents_auto(tensor=self.value(), candidates=independent_params)
        #     self._simple_func_dependents = dependents
        return dependents

    @property
    def errordef(self):
        errordef = self._simple_errordef
        if errordef is None:
            errordef = -999  # sentinel value; no meaningful error definition was given
            # raise RuntimeError("For this SimpleLoss, no error calculation is possible.")
        return errordef

    def _loss_func(self, model, data, fit_range, constraints=None):
        return self._simple_func()

    def __add__(self, other):
        raise IntentionNotUnambiguousError("Cannot add a SimpleLoss; 'addition' of losses can mean anything. "
                                           "Add them manually.")

    def _cache_add_constraints(self, constraints):
        raise WorkInProgressError("Needed? Will probably be provided in the future.")
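
# Sketch: wrapping an arbitrary Tensor-returning callable as a loss. `param` is a
# placeholder parameter; errordef=1 corresponds to a chi-squared-like loss.
#
#     param = zfit.Parameter('a', 1.)
#
#     def chi2():
#         return (param - 2.) ** 2
#
#     loss = SimpleLoss(func=chi2, dependents=[param], errordef=1.)
#     loss.value()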