# Source code for zfit.util.execution
# Copyright (c) 2020 zfit
import contextlib
import multiprocessing
import os
import sys
import warnings
from typing import List, Union
import numpy as np
import tensorflow as tf
from .container import DotDict, is_container
class RunManager:
    """Handle the resources and runtime-specific options.

    Keeps track of the (virtual) cpus available to zfit, configures the
    TensorFlow inter-/intra-op thread pools and holds the chunking options.
    Calling the instance (``zfit.run(...)``) evaluates Tensors eagerly,
    mimicking the old ``sess.run`` interface.
    """

    def __init__(self, n_cpu='auto'):
        """Handle the resources and runtime specific options. The `run` method is equivalent to `sess.run`.

        Args:
            n_cpu: Number of cpus to use, or 'auto' to detect the cpus available
                to this process.
        """
        self.MAX_CHUNK_SIZE = sys.maxsize
        self.chunking = DotDict()
        self._cpu = []  # dummy tokens, one per available (virtual) cpu
        self._n_cpu = None
        self._inter_cpus = None
        self._intra_cpus = None
        self._strict = False
        self.numeric_checks = True
        self.set_n_cpu(n_cpu=n_cpu)
        # HACK
        self._enable_parameter_autoconversion = True
        # HACK END
        # set default values
        self.chunking.active = False  # not yet implemented the chunking...
        self.chunking.max_n_points = 1000000

    @property
    def chunksize(self):
        """Maximal number of points per chunk; effectively unlimited while chunking is inactive."""
        if self.chunking.active:
            return self.chunking.max_n_points
        else:
            return self.MAX_CHUNK_SIZE

    @property
    def n_cpu(self):
        """Number of cpus currently held (i.e. not acquired via `aquire_cpu`)."""
        return len(self._cpu)

    def set_n_cpu(self, n_cpu: Union[str, int] = 'auto', strict: bool = False) -> None:
        """Set the number of cpus to be used by zfit. For more control, use `set_cpus_explicit`.

        Args:
            n_cpu: Number of cpus, will be the number for inter-op parallelism.
                'auto' detects the cpus available to this process.
            strict: If strict, sets intra parallelism to 1

        Raises:
            ValueError: If `n_cpu` is neither 'auto' nor an integer.
        """
        if n_cpu == 'auto':
            try:
                # Linux-only: the cpus this process is actually allowed to run on.
                cpu = sorted(os.sched_getaffinity(0))
            except AttributeError:
                cpu = range(multiprocessing.cpu_count())
                warnings.warn("Not running on Linux. Determining available cpus for thread can fail "
                              "and be overestimated. Workaround (only if too many cpus are used): "
                              "`zfit.run.set_n_cpu(your_cpu_number)`")
        elif isinstance(n_cpu, int):
            cpu = range(n_cpu)
        else:
            # Previously an unhandled type fell through and raised a NameError below.
            raise ValueError(f"n_cpu has to be 'auto' or an integer, not {n_cpu!r}")
        self._cpu = [f'dummy_cpu{i}' for i in cpu]
        n_cpu = len(cpu)
        # NOTE(review): `self._strict` is set in `__init__` but never updated here, so the
        # TF thread pools are only (re)configured when `strict` differs from the initial
        # value -- confirm whether this is intended.
        if strict ^ self._strict:
            intra = 1 if strict else 2
            inter = n_cpu
            self.set_cpus_explicit(intra=intra, inter=inter)

    def set_cpus_explicit(self, intra: int, inter: int) -> None:
        """Set the number of threads (cpus) used for inter-op and intra-op parallelism.

        Args:
            intra: Number of threads used to perform an operation. For larger operations, e.g. large Tensors, this
                is usually beneficial to have >= 2.
            inter: Parallelization on the level of ops. This is beneficial, if many operations can be computed
                independently in parallel.

        Raises:
            RuntimeError: If TensorFlow was already initialized; the thread pools can only
                be configured before the first TF operation runs.
        """
        try:
            tf.config.threading.set_intra_op_parallelism_threads(intra)
            tf.config.threading.set_inter_op_parallelism_threads(inter)
            # Remember the explicit settings for introspection (were initialized but never stored).
            self._intra_cpus = intra
            self._inter_cpus = inter
            self._n_cpu = inter + intra
        except RuntimeError as err:
            # Chain the cause so the original TF message/traceback is preserved.
            raise RuntimeError("Cannot set the number of cpus after initialization, has to be at the beginning."
                               f" Original message: {err}") from err

    @contextlib.contextmanager
    def aquire_cpu(self, max_cpu: int = -1) -> List[str]:
        """Temporarily reserve up to `max_cpu` cpus; they are given back on exit.

        Args:
            max_cpu: Maximal number of cpus to acquire. A negative value counts from the
                end: -1 means all available, -2 all but one, etc.

        Yields:
            The list of acquired cpu tokens (possibly empty).

        Raises:
            TypeError: If `max_cpu` is not an integer (previously this crashed contextlib
                with "generator didn't yield").
        """
        if not isinstance(max_cpu, int):
            raise TypeError(f"max_cpu has to be an int, not {type(max_cpu)}")
        if max_cpu < 0:
            max_cpu = max((self.n_cpu + 1 + max_cpu, 0))  # -1 means all
        n_take = min(max_cpu, self.n_cpu)
        if n_take == 0:
            cpu = []
        else:
            # Take from the end of the pool; guard above avoids the `[-0:]` full-slice pitfall.
            cpu = self._cpu[-n_take:]
            self._cpu = self._cpu[:-n_take]
        try:
            yield cpu
        finally:
            # Return the cpus even if the managed body raised, so none are leaked.
            self._cpu.extend(cpu)

    def __call__(self, *args, **kwargs):
        """Evaluate the given (possibly nested) Tensors eagerly, like the old `sess.run`.

        Args:
            *args: A structure of Tensors/values to evaluate; at least one positional
                argument is expected.

        Returns:
            The evaluated values as numpy objects with the same nesting as the input;
            a bare value if a single non-container object was given.
        """
        if kwargs:
            raise RuntimeError("Why kwargs provided? Still under conversion from TF 1.x to 2.x")
        flattened_args = tf.nest.flatten(args)
        # Convert Tensors/Variables to numpy; leave plain python/numpy leaves untouched
        # (previously any non-Tensor leaf raised an AttributeError on `.numpy()`).
        evaluated_args = [arg.numpy() if hasattr(arg, 'numpy') else arg for arg in flattened_args]
        values = tf.nest.pack_sequence_as(args, flat_sequence=evaluated_args)
        was_container = is_container(args[0]) and not isinstance(args[0], np.ndarray)
        if not was_container and values:
            values = values[0]
        return values