from typing import Union, Any
from .kspace_partitioner import MakeVectors, MakeBounds
from ._common import (
Vectors,
Bounds,
_split_hybrid_kvectors,
_split_hybrid_kbounds,
split_list,
discretize_bounds,
)
# Names exported by ``from <module> import *``: only the executor class.
__all__ = ["KspaceExecutor"]
class KspaceExecutor:
    """Parallel execution over k_vectors or k_bounds.

    The input is partitioned on rank 0, scattered over the ranks of
    ``MPI.COMM_WORLD``, processed by a user supplied callable on every rank
    and the per-rank results are reduced with ``MPI.SUM`` onto rank 0
    (see ``__call__``).

    Attributes:
        k_vectors: The momentum vectors to be shared between workers, if not
            specified default model k_vectors are used.
        k_bounds: Initial bounds to be shared between workers, if not
            specified default model bounds are used.
        comm: The ``MPI.COMM_WORLD`` communicator.
        rank: Rank of the current process in ``comm``.
        size: Number of processes in ``comm``.
        op: Reduction operation used to combine results (``MPI.SUM``).

    Note:
        We recommend using a number of processors = m^dim when scipy
        integration is expected. When a discrete kspace sum is to be
        performed, the splitting is anyway deterministic.

        NOTE(review): the "default model" fallback mentioned above is not
        implemented in this class; when neither input is given the data to
        scatter is simply ``None`` -- confirm where the defaults are applied.
    """

    def __init__(
        self,
        k_vectors: Union[MakeVectors, Vectors, None] = None,
        k_bounds: Union[MakeBounds, Bounds, None] = None,
    ):
        """Set up the MPI communicator and store the data to distribute.

        Args:
            k_vectors: Vectors (or a ``MakeVectors`` instance) to split
                between the workers.
            k_bounds: Bounds (or a ``MakeBounds`` instance) to split between
                the workers.

        Raises:
            ImportError: If ``mpi4py`` cannot be imported.
            ValueError: If both ``k_vectors`` and ``k_bounds`` are given, or
                if the given input is rejected by the input validation.
        """
        # executor related package availability; only the import is guarded
        # so that unrelated errors are not relabelled as "mpi not available"
        try:
            from mpi4py import MPI
        except ImportError as err:
            # chain explicitly so the underlying reason stays visible
            raise ImportError("mpi is not available") from err

        self.comm = MPI.COMM_WORLD
        self.rank = self.comm.Get_rank()
        self.op = MPI.SUM
        self.size = self.comm.Get_size()

        # make data to be shared
        if (k_vectors is not None) and (k_bounds is not None):
            raise ValueError(
                "only 'k_vectors' or 'k_bounds' "
                "must be supplied for 'discrete' or 'scipy' methods respectively."
            )
        self._k_vectors = k_vectors
        self._k_bounds = k_bounds
        if self._k_vectors is not None:
            _ensure_input_is_supported(self._k_vectors, main_type=MakeVectors)
        if self._k_bounds is not None:
            _ensure_input_is_supported(self._k_bounds, main_type=MakeBounds)

    @property
    def k_vectors(self):
        """Vectors to be shared between the workers (may be ``None``)."""
        return self._k_vectors

    @k_vectors.setter
    def k_vectors(self, vectors):
        # stored as given: unlike __init__, no validation is performed here
        self._k_vectors = vectors

    @property
    def k_bounds(self):
        """Bounds to be shared between the workers (may be ``None``)."""
        return self._k_bounds

    @k_bounds.setter
    def k_bounds(self, bounds):
        # stored as given: unlike __init__, no validation is performed here
        self._k_bounds = bounds

    @property
    def input_data(self):
        """Partition of the stored input for ``self.size`` workers.

        ``k_bounds`` takes precedence when it is set, otherwise ``k_vectors``
        is used; the selected data is passed to ``partitioned_data``.
        """
        data = self.k_vectors if self.k_bounds is None else self.k_bounds
        return partitioned_data(data, self.size)

    def __call__(self, fun):
        """Apply ``fun`` to every rank's chunk and sum the results on rank 0.

        Args:
            fun: Callable invoked on each rank with the chunk scattered to
                that rank; its result is combined with ``self.op``.

        Returns:
            The reduced result on rank 0, ``None`` on every other rank.
        """
        rank = self.rank
        # only the root builds the partition; every rank gets its own chunk
        if rank == 0:
            input_data = self.input_data
        else:
            input_data = None
        chunk_per_rank = self.comm.scatter(input_data, root=0)
        integral_per_rank = fun(chunk_per_rank)
        integral = self.comm.reduce(integral_per_rank, op=self.op, root=0)
        return integral if rank == 0 else None
def partitioned_data(
    data: Union[Vectors, MakeVectors, MakeBounds, Bounds], size: int
) -> Union[
    list[Vectors],
    list[tuple[Vectors]],
    list[Bounds],
]:
    """Split ``data`` into pieces meant for ``size`` workers.

    Dispatch, in order:

    * ``None`` is handed back unchanged.
    * ``MakeVectors`` -> ``_split_hybrid_kvectors(data.data(), size=size)``.
    * ``MakeBounds`` -> ``_split_hybrid_kbounds(data.data(), size=size)``.
    * any other input with more than 3 entries is treated as vectors and
      goes through ``split_list``.
    * everything else is treated as bounds and goes through
      ``_default_bounds_partition``.

    Args:
        data: The vectors / bounds (or their ``Make*`` wrappers) to split.
        size: Number of workers to split for.

    Returns:
        The partition produced by the selected helper, or ``None`` when
        ``data`` is ``None``.
    """
    if data is None:
        # nothing to partition: pass the missing input straight through
        return data

    # wrapper objects carry their payload behind ``.data()``
    if isinstance(data, MakeVectors):
        return _split_hybrid_kvectors(data.data(), size=size)
    if isinstance(data, MakeBounds):
        return _split_hybrid_kbounds(data.data(), size=size)

    # Plain input: told apart by length only.
    # NOTE(review): presumably bounds hold one entry per dimension (<= 3),
    # so a vectors list with 3 or fewer entries would be taken for bounds
    # -- confirm against callers.
    short_enough_for_bounds = len(data) <= 3
    if short_enough_for_bounds:
        return _default_bounds_partition(data, size)
    return split_list(data, size)
def _default_bounds_partition(bounds: Bounds, size: int) -> list[Bounds]:
    """Discretize ``bounds`` using a per-axis count derived from ``size``.

    The per-axis count is the ``len(bounds)``-th root of ``size`` rounded to
    the nearest integer, so the split is only exact when
    ``size == count ** len(bounds)``; for small ``size`` it can be very
    rough.

    Args:
        bounds: The bounds to discretize; its length is used as dimension.
        size: Number of workers the partition is meant for.

    Returns:
        Whatever ``discretize_bounds(bounds, count)`` returns (expected to
        hold ``count ** len(bounds)`` elements -- TODO confirm against
        ``discretize_bounds``).
    """
    n_dims = len(bounds)
    # real-valued root first, then snap to the closest whole count
    exact_root = size ** (1.0 / n_dims)
    cells_per_axis = int(round(exact_root))
    return discretize_bounds(bounds, cells_per_axis)
def _is_list_of_tuples(data: Any) -> bool:
sample = data[0]
return isinstance(sample, tuple) and isinstance(sample[0], (int, float))
def _ensure_input_is_supported(data: Any, main_type: Union[MakeBounds, MakeVectors]):
if not any((isinstance(data, main_type), _is_list_of_tuples(data))):
raise ValueError(
"k_vectors can only be An instance of MakeVectors, "
"a list or a list of k_vectors lists"
)
pass