Source code for zfit.minimizers.errors

#  Copyright (c) 2025 zfit

from __future__ import annotations

import logging
import typing
import warnings
from collections.abc import Callable
from enum import Enum
from functools import lru_cache, wraps

import jacobi
import numpy as np
import scipy.stats
import tensorflow as tf
from scipy import optimize

import zfit.z.numpy as znp
from zfit._interfaces import ZfitIndependentParameter

from .. import z
from ..core.parameter import assign_values
from ..util.container import convert_to_container
from ..util.exception import BreakingAPIChangeError

if typing.TYPE_CHECKING:
    import zfit


class NewMinimum(Exception):
    """Exception class for cases where a new minimum is found."""


class FailEvalLossNaN(Exception):
    pass


class RootFound(Exception):
    """Exception class for cases where a root is found, since SciPy root solvers don't really respect tol or xtol on
    initial evaluation."""


# make enum of WeightCorr
class WeightCorr(Enum):
    ASYMPTOTIC: str = "asymptotic"
    FALSE: bool = False
    SUMW2: str = "sumw2"  # sum of weights squared, not the sum of squares of weights


[docs] def compute_errors( result: zfit.result.FitResult, params: list[ZfitIndependentParameter], *, cl: float | None = None, rtol: float | None = None, method: str | None = None, covariance_method: str | Callable | None = None, sigma: float | None = None, ) -> tuple[ dict[ZfitIndependentParameter, dict[str, float]], zfit.result.FitResult | None, ]: """Compute asymmetric errors of parameters by profiling the loss function in the fit result. This method finds the value for a given parameter where the loss function is ``cl`` away: for example for a cl of 68.3%, this is one (multiplied by the errordef). The other parameters are also minimized and not fixed. This method is comparably computationally intensive and, if possible, ``hesse`` should be used. However, since ``hesse`` does not capture asymmetric or non-parabolic shaped profiles well, this method is preferable. Args: result: fit result to be used to compute the uncertainties. params: The parameters to calculate the error. If None, use all parameters. sigma: Number of sigmas to calculate the error. Alternative to ``cl``. cl: Confidence Level of the parameter to be determined. Defaults to 68.3%. Alternative to ``sigma``. rtol: relative tol between the computed and the exact roots method: type of solver, ``method`` argument of :py:func:`scipy.optimize.root`. Defaults to "hybr". covariance_method: The method to use to calculate the correlation matrix, will be forwarded directly to :py:meth:`FitResult.covariance`. Valid choices are by default {'minuit_hesse', 'hesse_np'} (or any other method defined in the result) or a Callable. Returns: out: A ``dict`` containing as keys the parameter and as value a ``dict`` which contains two keys 'lower' and 'upper', holding the calculated errors. Example: result[par1]['upper'] -> the asymmetric upper error of 'par1' out: a fit result is returned when a new minimum is found during the loss scan """ if rtol is None: rtol = 0.003 method = "hybr" if method is None else method if cl is None: if sigma is None: sigma = 1 elif sigma is None: sigma = scipy.stats.chi2(1).ppf(cl) ** 0.5 else: msg = "Cannot specify both sigma and cl." raise ValueError(msg) params = convert_to_container(params) new_result = None all_params = list(result.params.keys()) loss = result.loss errordef = loss.errordef fmin = result.fminopt rtol *= errordef minimizer = result.minimizer covariance = result.covariance(method=covariance_method, as_dict=True) if covariance is None: covariance = result.covariance(method="hesse_np", as_dict=True) if covariance is None: msg = "Could not compute covariance matrix. Check if the minimum is valid." raise RuntimeError(msg) param_errors = {param: covariance[(param, param)] ** 0.5 for param in all_params} param_scale = np.array(list(param_errors.values())) ncalls = 0 loss_min_tol = minimizer.tol * errordef * 2 # 2 is just to be tolerant try: to_return = {} for param in params: assign_values(all_params, result) logging.info(f"profiling the parameter {param}") # noqa: G004 param_error = param_errors[param] param_value = result.params[param]["value"] initial_values = {"lower": [], "upper": []} direction = {"lower": -1, "upper": 1} for ap in all_params: ap_value = result.params[ap]["value"] error_factor = covariance[(param, ap)] * (2 * errordef / param_error**2) ** 0.5 for d in ["lower", "upper"]: step = direction[d] * error_factor * sigma for ntrial in range(50): # noqa: B007 ap_value_init = ap_value + step if ap_value_init < ap.lower or ap_value_init > ap.upper: step *= 0.8 else: break else: msg = ( f"Could not find a valid initial value for {ap} in {d} direction after {ntrial + 1} trials." f" step tried: {step}. This should not happes, the error probably looks weird. Maybe plot" f" the loss function for different parameter values and check if it looks reasonable." ) raise RuntimeError(msg) initial_values[d].append(ap_value_init) index_poi = all_params.index(param) # remember the index _ = loss.value_gradient(params=all_params, full=False) # to make sure the loss is compiled def make_optimized_loss_gradient_func(index_poi): @z.function(wraps="gradient", keepalive=True) def wrappedfunc(values, index): assert isinstance(index, int) assign_values(all_params, values) loss_value, gradient = loss.value_gradient(params=all_params, full=False) if isinstance(gradient, tuple | list): gradient = znp.asarray(gradient) gradient = znp.concatenate([gradient[:index_poi], gradient[index_poi + 1 :]]) return loss_value, gradient return wrappedfunc optimized_loss_gradient = make_optimized_loss_gradient_func(index_poi) # TODO: improvement, use jacobian? root = None ntol = 999 # if it's right in the beginning, we think it's fine # TODO: should we add a "robust" or similar option to not skip this? # or evaluate and then decide ,maybe use krylov as it doesn't do a lot of calls in the beginning, it # approximates the jacobian def make_func(index_poi, optimized_loss_gradient, param): # bind these values def wrappedfunc(values, args): nonlocal ncalls, root, ntol ncalls += 1 swap_sign = args try: loss_value, gradient = optimized_loss_gradient(values, index_poi) except tf.errors.InvalidArgumentError: loss_value = znp.array(9999999999.0) gradient = z.random.normal(stddev=0.1, shape=(len(all_params) - 1,)) # raise FailEvalLossNaN(msg) zeroed_loss = loss_value - fmin gradient = znp.array(gradient) if swap_sign(param): # mirror at x-axis to remove second zero zeroed_loss = -zeroed_loss gradient = -gradient logging.info("Swapping sign in error calculation 'zfit_error'") elif zeroed_loss < -loss_min_tol: assign_values(all_params, values) # set values to the new minimum msg = "A new minimum is found." raise NewMinimum(msg) downward_shift = errordef * sigma**2 shifted_loss = zeroed_loss - downward_shift if abs(shifted_loss) < rtol: if ntol > 3: root = values[index_poi] raise RootFound() ntol += 1 else: ntol = 0 return znp.concatenate([[shifted_loss], gradient]) return wrappedfunc func = make_func(index_poi, optimized_loss_gradient, param) to_return[param] = {} swap_sign = { "lower": lambda p, pval=param_value: p > pval, "upper": lambda p, pval=param_value: p < pval, } for d in ["lower", "upper"]: try: root_result = optimize.root( fun=func, args=(swap_sign[d],), x0=np.array(initial_values[d]), tol=rtol * 0.1, # we won't stop like this anyway options={ "diag": 1 / param_scale, # scale factor for variables }, method=method, ) except RootFound: assert root is not None, "Should be changed inside function." else: warnings.warn( f"The root finding did not converge below {rtol} but stopped by its own criteria.", RuntimeWarning, stacklevel=2, ) root = root_result.x[index_poi] to_return[param][d] = root - param_value assign_values(all_params, result) except NewMinimum: from .. import settings # noqa: PLC0415 if settings.get_verbosity() >= 5: pass minimizer = result.minimizer loss = result.loss new_found_fmin = loss.value(full=False) new_result = minimizer.minimize(loss=loss) if new_result.fminopt >= new_found_fmin + loss_min_tol: msg = ( "A new minimum was discovered but the minimizer was not able to find this on himself. " "This behavior is currently an exception but will most likely change in the future." ) raise RuntimeError(msg) from None to_return, new_result_ = compute_errors(result=new_result, params=params, cl=cl, rtol=rtol, method=method) if new_result_ is not None: new_result = new_result_ return to_return, new_result
def numerical_pdf_jacobian(func, params): # TODO: jit? """ Args: func: The function for which the Jacobian will be computed. params: A dictionary of parameter values for the function. Returns: The numerical Jacobian of the given function with respect to the parameters. """ params = list(params.values()) return znp.asarray(jacobi.jacobi(func, params)[0].T) @z.function(wraps="autodiff") def autodiff_pdf_jacobian(func, params): """Computes the Jacobian matrix of a function using automatic differentiation. Args: func: A callable representing the function for which the Jacobian is to be calculated. params: A dictionary of parameters with their values that are passed to the function. """ params = list(params.values()) # TODO(WrappedVariable): this is needed if we want to use wrapped Variables # params = z.math._extract_tfparams(params) # the below fails for some cases (i.e. CB) with an internal error # ValueError: Internal error: Tried to take gradients (or similar) of a variable without handle data: # Tensor("Placeholder_1:0", shape=(), dtype=resource) # we didn't report that yet, it's too hard to reproduce with a minimal example currently. # with tf.GradientTape(watch_accessed_variables=False) as t2: # t2.watch(params) # with tf.GradientTape(watch_accessed_variables=False) as t1: # t1.watch(params) # values = func() # # grad = t1.gradient(values, params) # grad = tf.convert_to_tensor(grad) # jacobian = t2.jacobian(grad, params) columns = [] for p in params: vector = np.zeros(len(params)) vector[params.index(p)] = 1.0 with tf.autodiff.ForwardAccumulator(params, list(vector)) as acc: values = func() columns.append(acc.jvp(values)) return znp.asarray(columns) # TODO: refactor below, extract separate methods def covariance_with_weights(hinv, result, params, *, weightcorr: WeightCorr = None): """Compute the covariance matrix of the parameters with weights. Args: hinv: The inverse of the Hessian matrix. result: The fit result. params: The parameters for which the covariance matrix is to be calculated. weightcorr: |@doc:result.hesse.weightcorr.method| Method to correct the estimation of the covariance matrix/hesse error for a weighted likelihood. The following methods are available, for a comparison and the derivation of the methods, see [langenbruch1]_: - `False`: no correction, the covariance matrix is calculated as if the likelihood was unweighted. This will generally underestimate the errors. - `asymptotic`: the covariance matrix is corrected by the asymptotic formula for the weighted likelihood. This is the default, yet computationally most expensive method. - `sumw2`: the covariance matrix is corrected by the effective sample size. This is the fastest method but won't yield asymptotically correct results. This is not (yet) guaranteed to fully work for binned fits and maybe under/over represents errors. .. [langenbruch1] Langenbruch, C. Parameter uncertainties in weighted unbinned maximum likelihood fits `Eur. Phys. J. C 82, 393 (2022). <https://doi.org/10.1140/epjc/s10052-022-10254-8>`_. |@docend:result.hesse.weightcorr.method| """ from .. import run # noqa: PLC0415 if weightcorr == "sumw2": msg = "The 'sumw2' option has been renamed to 'sumw2'." raise BreakingAPIChangeError(msg) weightcorr = WeightCorr.ASYMPTOTIC if weightcorr is None else WeightCorr(weightcorr) run.assert_executing_eagerly() loss = result.loss model = loss.model data = loss.data yields = None if loss.is_extended: yields = [] for m in model: yields.append(m.get_yield()) constraints = None if loss.constraints: constraints = loss.constraints old_vals = np.asarray(params) Hinv_dict = hinv(result=result, params=params) # inverse of the hessian matrix if not Hinv_dict: return {} Hinv = dict_to_matrix(params, Hinv_dict) dataweights = [ d.weights if d.has_weights else znp.ones((d.nevents,)) # sum(ones_nevents ** 2) = nevents for d in data ] # extendedweights = [znp.reshape(znp.sum(w), (-1,)) for w in dataweights] if yields else [] # extendedweights = [znp.ones((len(yields),))] if yields else [] constrweights = [znp.ones((len(constraints),))] if constraints else [] allweights = znp.concatenate(dataweights + constrweights, axis=0) allweights2 = allweights**2 sumw2 = znp.sum(allweights2) sumw = znp.sum(allweights) if weightcorr == WeightCorr.ASYMPTOTIC: corrfactor = 1.0 elif weightcorr == WeightCorr.SUMW2: corrfactor = sumw2 / sumw del allweights else: msg = f"Unknown method {weightcorr}, has to be one of {WeightCorr}" raise ValueError(msg) @z.function(wraps="weightcorrfunc") def func(): values = [] for i, (m, d) in enumerate(zip(model, data, strict=True)): v = m.log_pdf(d) # we calculate the unweighted likelihood, correct? # weights = d.weights # print(f"weights: {weights}, corrfactor: {corrfactor}") # if weights is not None: # print(f"weights: {weights}, corrfactor: {corrfactor}") # v *= weights if yields is not None: yi = yields[i] # shouldn't the normal yield be enough? Not quite sure why not # Probably because it's a sum of the weights # nevents_collected = d.samplesize # extendedterm = tf.nn.log_poisson_loss(nevents_collected, znp.log(yi), compute_full_loss=True) # values.append(term_new) extendedterm = znp.log(yi) v += extendedterm # values.append(extendedterm[..., None]) # make it an array like the others values.append(v) if constraints is not None: for constraint in constraints: values.append(znp.reshape(constraint.value(), (-1,))) return znp.concatenate(values, axis=0) params_dict = {p.name: p for p in params} if weightcorr == WeightCorr.ASYMPTOTIC: if run.get_autograd_mode(): try: jacobian = autodiff_pdf_jacobian(func=func, params=params_dict) except ValueError as error: msg = ( "The autodiff could not be performed for the jacobian matrix. This can have a natural cause (see error above)" " or be a bug in the autodiff. In the latter case, you can try to use the numerical jacobian instead using:\n" "with zfit.run.set_autograd_mode(False):\n" " # calculate here, i.e. result.errors()" ) raise ValueError(msg) from error else: def wrapped_func(values): assign_values(list(params_dict.values()), values) return np.array(func()) jacobian = numerical_pdf_jacobian(func=wrapped_func, params=params_dict) C = znp.matmul(jacobian * allweights2, znp.transpose(jacobian)) covariance = np.asarray(znp.matmul(Hinv, znp.matmul(C, Hinv))) elif weightcorr == WeightCorr.SUMW2: # Not quite correct, technically. The weights should be squared in the # NLL calculation covariance = Hinv * corrfactor assign_values(params, old_vals) return matrix_to_dict(params, covariance) def dict_to_matrix(params, matrix_dict): nparams = len(params) matrix = np.empty((nparams, nparams)) if not matrix_dict: return None for i in range(nparams): pi = params[i] for j in range(i, nparams): pj = params[j] matrix[i, j] = matrix_dict[(pi, pj)] if i != j: matrix[j, i] = matrix_dict[(pi, pj)] return matrix def matrix_to_dict(params, matrix): nparams = len(params) matrix_dict = {} if matrix is None: return matrix_dict for i in range(nparams): pi = params[i] for j in range(i, nparams): pj = params[j] matrix_dict[(pi, pj)] = matrix[i, j] if i != j: matrix_dict[(pj, pi)] = matrix[i, j] return matrix_dict def np_cache(*args, **kwargs): """LRU cache implementation for functions whose FIRST parameter is a numpy array. >>> array = np.array([[1, 2, 3], [4, 5, 6]]) >>> @np_cache(maxsize=256) ... def multiply(array, factor): ... print("Calculating...") ... return factor*array >>> multiply(array, 2) Calculating... array([[ 2, 4, 6], [ 8, 10, 12]]) >>> multiply(array, 2) array([[ 2, 4, 6], [ 8, 10, 12]]) >>> multiply.cache_info() CacheInfo(hits=1, misses=1, maxsize=256, currsize=1) """ def decorator(function): @wraps(function) def wrapper(np_array, *args, **kwargs): hashable_array = array_to_tuple(np_array) return cached_wrapper(hashable_array, *args, **kwargs) @lru_cache(*args, **kwargs) def cached_wrapper(hashable_array, *args, **kwargs): array = np.array(hashable_array) return function(array, *args, **kwargs) def array_to_tuple(np_array): """Iterates recursively.""" try: return tuple(array_to_tuple(_) for _ in np_array) except TypeError: return np_array # copy lru_cache attributes over too wrapper.cache_info = cached_wrapper.cache_info wrapper.cache_clear = cached_wrapper.cache_clear return wrapper return decorator