# Copyright (c) 2024 zfit
from __future__ import annotations
import warnings
from typing import TYPE_CHECKING
import jacobi
if TYPE_CHECKING:
import zfit
import logging
from collections.abc import Callable
from functools import lru_cache, wraps
import numpy as np
import scipy.stats
import tensorflow as tf
from scipy import optimize
import zfit.z.numpy as znp
from .. import z
from ..core.interfaces import ZfitIndependentParameter
from ..core.parameter import assign_values
from ..util.container import convert_to_container
class NewMinimum(Exception):
"""Exception class for cases where a new minimum is found."""
class FailEvalLossNaN(Exception):
"""Exception class for cases where the loss evaluation fails, e.g. with NaN values."""
class RootFound(Exception):
"""Exception class for cases where a root is found, since SciPy root solvers don't really respect tol or xtol on
initial evaluation."""
def compute_errors(
result: zfit.result.FitResult,
params: list[ZfitIndependentParameter],
*,
cl: float | None = None,
rtol: float | None = None,
method: str | None = None,
covariance_method: str | Callable | None = None,
sigma: float | None = None,
) -> tuple[
dict[ZfitIndependentParameter, dict[str, float]],
zfit.result.FitResult | None,
]:
"""Compute asymmetric errors of parameters by profiling the loss function in the fit result.
This method finds, for each requested parameter, the values at which the profiled loss function has risen
by the amount corresponding to ``cl``: for a ``cl`` of 68.3%, this is one unit of the ``errordef``. The other
parameters are not fixed but re-minimized at each point (profiled). This method is comparatively
computationally intensive and, if a parabolic approximation suffices, ``hesse`` should be used instead.
However, since ``hesse`` does not capture asymmetric or non-parabolic profiles, this method is preferable
in those cases.
Args:
result: fit result to be used to compute the uncertainties.
params: The parameters to calculate the errors for. If None, use all parameters.
sigma: Number of sigmas to calculate the error. Alternative to ``cl``.
cl: Confidence Level of the parameter to be determined. Defaults to 68.3%. Alternative to ``sigma``.
rtol: Relative tolerance between the computed and the exact roots. Defaults to 0.003.
method: type of solver, ``method`` argument of :py:func:`scipy.optimize.root`. Defaults to "hybr".
covariance_method: The method to use to calculate the covariance matrix; it is forwarded directly
to :py:meth:`FitResult.covariance`. Valid choices are
by default {'minuit_hesse', 'hesse_np'} (or any other method defined in the result)
or a Callable.
Returns:
out:
A ``dict`` containing the parameters as keys and, as values, a ``dict`` with the two keys
'lower' and 'upper' holding the calculated errors.
Example: result[par1]['upper'] -> the asymmetric upper error of 'par1'
out: A fit result if a new minimum was found during the loss scan, otherwise None.
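Example:
A minimal sketch; the toy model, loss and minimizer below are only illustrative and not part of this module:
>>> import zfit
>>> obs = zfit.Space("x", limits=(-5, 5))
>>> mu = zfit.Parameter("mu", 0.2, -1, 1)
>>> sigma_param = zfit.Parameter("sigma_param", 1.1, 0.1, 10)
>>> gauss = zfit.pdf.Gauss(mu=mu, sigma=sigma_param, obs=obs)
>>> data = gauss.sample(1000)
>>> loss = zfit.loss.UnbinnedNLL(model=gauss, data=data)
>>> result = zfit.minimize.Minuit().minimize(loss)
>>> errors, new_result = compute_errors(result, params=[mu], cl=0.683)
>>> lower, upper = errors[mu]["lower"], errors[mu]["upper"]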
"""
if rtol is None:
rtol = 0.003
method = "hybr" if method is None else method
if cl is None:
if sigma is None:
sigma = 1
elif sigma is None:
sigma = scipy.stats.chi2(1).ppf(cl) ** 0.5
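# e.g. cl=0.683 -> sigma ~ 1, cl=0.95 -> sigma ~ 1.96 (quantile of a chi2 with 1 degree of freedom)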
else:
msg = "Cannot specify both sigma and cl."
raise ValueError(msg)
# TODO: how to name things, sigma or cl?
params = convert_to_container(params)
new_result = None
all_params = list(result.params.keys())
loss = result.loss
errordef = loss.errordef
fmin = result.fminopt
rtol *= errordef
minimizer = result.minimizer
covariance = result.covariance(method=covariance_method, as_dict=True)
if covariance is None:
covariance = result.covariance(method="hesse_np", as_dict=True)
if covariance is None:
msg = "Could not compute covariance matrix. Check if the minimum is valid."
raise RuntimeError(msg)
param_errors = {param: covariance[(param, param)] ** 0.5 for param in all_params}
param_scale = np.array(list(param_errors.values()))
ncalls = 0
loss_min_tol = minimizer.tol * errordef * 2 # 2 is just to be tolerant
try:
to_return = {}
for param in params:
assign_values(all_params, result)
logging.info(f"profiling the parameter {param}") # noqa: G004
param_error = param_errors[param]
param_value = result.params[param]["value"]
initial_values = {"lower": [], "upper": []}
direction = {"lower": -1, "upper": 1}
for ap in all_params:
ap_value = result.params[ap]["value"]
error_factor = covariance[(param, ap)] * (2 * errordef / param_error**2) ** 0.5
for d in ["lower", "upper"]:
step = direction[d] * error_factor * sigma
for ntrial in range(50): # noqa: B007
ap_value_init = ap_value + step
if ap_value_init < ap.lower or ap_value_init > ap.upper:
step *= 0.8
else:
break
else:
msg = (
f"Could not find a valid initial value for {ap} in {d} direction after {ntrial + 1} trials."
f" step tried: {step}. This should not happes, the error probably looks weird. Maybe plot"
f" the loss function for different parameter values and check if it looks reasonable."
)
raise RuntimeError(msg)
initial_values[d].append(ap_value_init)
index_poi = all_params.index(param) # remember the index
_ = loss.value_gradient(params=all_params, full=False) # to make sure the loss is compiled
def make_optimized_loss_gradient_func(index_poi):
@z.function(wraps="gradient", keepalive=True)
def wrappedfunc(values, index):
assert isinstance(index, int)
assign_values(all_params, values)
loss_value, gradient = loss.value_gradient(params=all_params, full=False)
if isinstance(gradient, (tuple, list)):
gradient = znp.asarray(gradient)
gradient = znp.concatenate([gradient[:index_poi], gradient[index_poi + 1 :]])
return loss_value, gradient
return wrappedfunc
optimized_loss_gradient = make_optimized_loss_gradient_func(index_poi)
# TODO: improvement, use jacobian?
root = None
ntol = 999 # if it's right in the beginning, we think it's fine
# TODO: should we add a "robust" or similar option to not skip this?
# or evaluate and then decide; maybe use Krylov, as it does not do many calls in the beginning and it
# approximates the Jacobian
def make_func(index_poi, optimized_loss_gradient, param): # bind these values
def wrappedfunc(values, args):
nonlocal ncalls, root, ntol
ncalls += 1
swap_sign = args
try:
loss_value, gradient = optimized_loss_gradient(values, index_poi)
except tf.errors.InvalidArgumentError:
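# loss evaluation failed (e.g. NaN values): return a large loss and a small random gradient
# so that the root solver moves away from this region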
loss_value = znp.array(9999999999.0)
gradient = z.random.normal(stddev=0.1, shape=(len(all_params) - 1,))
# raise FailEvalLossNaN(msg)
zeroed_loss = loss_value - fmin
gradient = znp.array(gradient)
if swap_sign(param): # mirror at x-axis to remove second zero
zeroed_loss = -zeroed_loss
gradient = -gradient
logging.info("Swapping sign in error calculation 'zfit_error'")
elif zeroed_loss < -loss_min_tol:
assign_values(all_params, values) # set values to the new minimum
msg = "A new minimum is found."
raise NewMinimum(msg)
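# the root is where the profiled loss has risen by errordef * sigma**2 above the minimum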
downward_shift = errordef * sigma**2
shifted_loss = zeroed_loss - downward_shift
if abs(shifted_loss) < rtol:
if ntol > 3:
root = values[index_poi]
raise RootFound()
ntol += 1
else:
ntol = 0
return znp.concatenate([[shifted_loss], gradient])
return wrappedfunc
func = make_func(index_poi, optimized_loss_gradient, param)
to_return[param] = {}
swap_sign = {
"lower": lambda p, pval=param_value: p > pval,
"upper": lambda p, pval=param_value: p < pval,
}
for d in ["lower", "upper"]:
try:
root_result = optimize.root(
fun=func,
args=(swap_sign[d],),
x0=np.array(initial_values[d]),
tol=rtol * 0.1, # we won't stop like this anyway
options={
"diag": 1 / param_scale, # scale factor for variables
},
method=method,
)
except RootFound:
assert root is not None, "Should be changed inside function."
else:
warnings.warn(
f"The root finding did not converge below {rtol} but stopped by its own criteria.",
RuntimeWarning,
stacklevel=2,
)
root = root_result.x[index_poi]
to_return[param][d] = root - param_value
assign_values(all_params, result)
except NewMinimum as error:
from .. import settings
if settings.get_verbosity() >= 5:
print(error)
minimizer = result.minimizer
loss = result.loss
new_found_fmin = loss.value(full=False)
new_result = minimizer.minimize(loss=loss)
if new_result.fminopt >= new_found_fmin + loss_min_tol:
msg = (
"A new minimum was discovered but the minimizer was not able to find this on himself. "
"This behavior is currently an exception but will most likely change in the future."
)
raise RuntimeError(msg) from None
to_return, new_result_ = compute_errors(result=new_result, params=params, cl=cl, rtol=rtol, method=method)
if new_result_ is not None:
new_result = new_result_
return to_return, new_result
def numerical_pdf_jacobian(func, params): # TODO: jit?
"""
Args:
func: The function for which the Jacobian will be computed.
params: A dictionary of parameter values for the function.
Returns:
The numerical Jacobian of the given function with respect to the parameters.
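Example:
A minimal sketch with a toy vector-valued function (purely illustrative, not part of zfit):
>>> import numpy as np
>>> def toy(values):
...     a, b = values
...     return np.array([a * b, a + b])
>>> jac = numerical_pdf_jacobian(toy, {"a": 1.0, "b": 2.0})  # shape (n_params, n_outputs) = (2, 2)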
"""
params = list(params.values())
return znp.asarray(jacobi.jacobi(func, params)[0].T)
# @z.function(wraps="autodiff")
def autodiff_pdf_jacobian(func, params):
"""Computes the Jacobian matrix of a function using automatic differentiation.
Args:
func: A callable representing the function for which the Jacobian is to be calculated.
params: A dictionary of parameters with their values that are passed to the function.
"""
params = list(params.values())
# TODO(WrappedVariable): this is needed if we want to use wrapped Variables
# params = z.math._extract_tfparams(params)
# the below fails for some cases (i.e. CB) with an internal error
# ValueError: Internal error: Tried to take gradients (or similar) of a variable without handle data:
# Tensor("Placeholder_1:0", shape=(), dtype=resource)
# we didn't report that yet, it's too hard to reproduce with a minimal example currently.
# with tf.GradientTape(watch_accessed_variables=False) as t2:
# t2.watch(params)
# with tf.GradientTape(watch_accessed_variables=False) as t1:
# t1.watch(params)
# values = func()
#
# grad = t1.gradient(values, params)
# grad = tf.convert_to_tensor(grad)
# jacobian = t2.jacobian(grad, params)
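# forward-mode autodiff: seed a one-hot tangent vector for each parameter and collect the JVP,
# i.e. the derivative of all outputs with respect to that single parameter (one Jacobian column per pass)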
columns = []
for p in params:
vector = np.zeros(len(params))
vector[params.index(p)] = 1.0
with tf.autodiff.ForwardAccumulator(params, list(vector)) as acc:
values = func()
columns.append(acc.jvp(values))
return z.convert_to_tensor(columns)
def covariance_with_weights(method, result, params):
"""Compute the covariance matrix of the parameters with weights."""
from .. import run
run.assert_executing_eagerly()
loss = result.loss
model = loss.model
data = loss.data
yields = [m.get_yield() for m in model] if loss.is_extended else None
constraints = None
if loss.constraints:
constraints = loss.constraints
old_vals = np.asarray(params)
Hinv_dict = method(result=result, params=params) # inverse of the hessian matrix
if not Hinv_dict:
return {}
Hinv = dict_to_matrix(params, Hinv_dict)
def func():
values = []
for i, (m, d) in enumerate(zip(model, data)):
v = m.log_pdf(d)
weights = d.weights
if weights is not None:
v *= weights
values.append(v)
if yields is not None:
yi = yields[i]
nevents_collected = tf.reduce_sum(weights) if weights is not None else d.nevents
term_new = tf.nn.log_poisson_loss(nevents_collected, znp.log(yi), compute_full_loss=True)[
..., None
] # make it an array like the others
values.append(term_new)
if constraints is not None:
for constraint in constraints:
values.append(znp.reshape(constraint.value(), (-1,)))
return znp.concatenate(values, axis=0)
params_dict = {p.name: p for p in params}
if run.get_autograd_mode():
try:
jacobian = autodiff_pdf_jacobian(func=func, params=params_dict)
except ValueError as error:
msg = (
"The autodiff could not be performed for the jacobian matrix. This can have a natural cause (see error above)"
" or be a bug in the autodiff. In the latter case, you can try to use the numerical jacobian instead using:\n"
"with zfit.run.set_autograd_mode(False):\n"
" # calculate here, i.e. result.errors()"
)
raise ValueError(msg) from error
else:
def wrapped_func(values):
assign_values(list(params_dict.values()), values)
return np.array(func())
jacobian = numerical_pdf_jacobian(func=wrapped_func, params=params_dict)
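# sandwich estimator: C sums the per-event score outer products; the weight-corrected covariance is Hinv @ C @ Hinv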
C = znp.matmul(jacobian, znp.transpose(jacobian))
covariance = np.asarray(znp.matmul(Hinv, znp.matmul(C, Hinv)))
assign_values(params, old_vals)
return matrix_to_dict(params, covariance)
def dict_to_matrix(params, matrix_dict):
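"""Convert a symmetric ``{(param_i, param_j): value}`` mapping into a dense matrix (``None`` if the dict is empty)."""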
nparams = len(params)
matrix = np.empty((nparams, nparams))
if not matrix_dict:
return None
for i in range(nparams):
pi = params[i]
for j in range(i, nparams):
pj = params[j]
matrix[i, j] = matrix_dict[(pi, pj)]
if i != j:
matrix[j, i] = matrix_dict[(pi, pj)]
return matrix
def matrix_to_dict(params, matrix):
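"""Convert a dense symmetric matrix into a ``{(param_i, param_j): value}`` mapping (empty if the matrix is ``None``)."""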
nparams = len(params)
matrix_dict = {}
if matrix is None:
return matrix_dict
for i in range(nparams):
pi = params[i]
for j in range(i, nparams):
pj = params[j]
matrix_dict[(pi, pj)] = matrix[i, j]
if i != j:
matrix_dict[(pj, pi)] = matrix[i, j]
return matrix_dict
def np_cache(*args, **kwargs):
"""LRU cache implementation for functions whose FIRST parameter is a numpy array.
>>> array = np.array([[1, 2, 3], [4, 5, 6]])
>>> @np_cache(maxsize=256)
... def multiply(array, factor):
... print("Calculating...")
... return factor*array
>>> multiply(array, 2)
Calculating...
array([[ 2, 4, 6],
[ 8, 10, 12]])
>>> multiply(array, 2)
array([[ 2, 4, 6],
[ 8, 10, 12]])
>>> multiply.cache_info()
CacheInfo(hits=1, misses=1, maxsize=256, currsize=1)
"""
def decorator(function):
@wraps(function)
def wrapper(np_array, *args, **kwargs):
hashable_array = array_to_tuple(np_array)
return cached_wrapper(hashable_array, *args, **kwargs)
@lru_cache(*args, **kwargs)
def cached_wrapper(hashable_array, *args, **kwargs):
array = np.array(hashable_array)
return function(array, *args, **kwargs)
def array_to_tuple(np_array):
"""Iterates recursively."""
try:
return tuple(array_to_tuple(_) for _ in np_array)
except TypeError:
return np_array
# copy lru_cache attributes over too
wrapper.cache_info = cached_wrapper.cache_info
wrapper.cache_clear = cached_wrapper.cache_clear
return wrapper
return decorator