Source code for py4mulas.mpi.executors

from typing import Union, Any
from .kspace_partitioner import MakeVectors, MakeBounds
from ._common import (
    Vectors,
    Bounds,
    _split_hybrid_kvectors,
    _split_hybrid_kbounds,
    split_list,
    discretize_bounds,
)

__all__ = ["KspaceExecutor"]


[docs] class KspaceExecutor: """Parallel execution over k_vectors or k_bounds Attributes: k_vectors: The momentum vectors to be shared between workers, if not specified default model k_vectors are used. k_bounds: Initial bounds to be shared between workers, if not specified default model bounds are used. Note: We recommand to use num of processors = m^dim when scipy integration is expected. When discrete kspace sum is to be performed, the splitting is anyway deterministic. """ def __init__( self, k_vectors: Union[MakeVectors, Vectors] = None, k_bounds: Union[MakeBounds, Bounds] = None, ): # executor related package availability try: from mpi4py import MPI self.comm = MPI.COMM_WORLD self.rank = self.comm.Get_rank() self.op = MPI.SUM self.size = MPI.COMM_WORLD.Get_size() except ImportError: raise ImportError("mpi is not available") # make data to be shared if (k_vectors is not None) and (k_bounds is not None): raise ValueError( "only 'k_vectors' or 'k_bounds' " "must be supplied for 'discrete' or 'scipy' methods respectively." ) self._k_vectors = k_vectors self._k_bounds = k_bounds if self._k_vectors is not None: _ensure_input_is_supported(self._k_vectors, main_type=MakeVectors) if self._k_bounds is not None: _ensure_input_is_supported(self._k_bounds, main_type=MakeBounds) @property def k_vectors(self): return self._k_vectors @k_vectors.setter def k_vectors(self, vectors): self._k_vectors = vectors @property def k_bounds(self): return self._k_bounds @k_bounds.setter def k_bounds(self, bounds): self._k_bounds = bounds @property def input_data(self): data = self.k_vectors if self.k_bounds is None else self.k_bounds return partitioned_data(data, self.size)
[docs] def __call__(self, fun): rank = self.rank # every rank gets its own data_chunk if rank == 0: input_data = self.input_data else: input_data = None chunk_per_rank = self.comm.scatter(input_data, root=0) integral_per_rank = fun(chunk_per_rank) integral = self.comm.reduce(integral_per_rank, op=self.op, root=0) return integral if rank == 0 else None
def partitioned_data( data: Union[Vectors, MakeVectors, MakeBounds, Bounds], size: int ) -> Union[ list[Vectors], list[tuple[Vectors]], list[Bounds], ]: if data is not None: if isinstance(data, MakeVectors): # -> list[tuple[Vectors]] return _split_hybrid_kvectors(data.data(), size=size) if isinstance(data, MakeBounds): # -> list[list[Bounds]] return _split_hybrid_kbounds(data.data(), size=size) # otherwise data is either Bounds or Vectors if len(data) > 3: # this should be Vectors k_vectors = data # -> list[Vectors] return split_list(k_vectors, size) # otherwise this is k_bounds bounds = data # we obtain list[Bounds]: return _default_bounds_partition(bounds, size) return data def _default_bounds_partition(bounds: Bounds, size: int) -> list[Bounds]: # to roughly have a bounds list of len = size # if size is chosen like a power of dim this would be perfect dim = len(bounds) side_length = int(round(size ** (1.0 / dim))) # we recomand chose size such that size=side_length^dim. # In this case the splitting is deterministic # For low size, this can be very rough. # we discretize into a list of len = side_length^dim elements return discretize_bounds(bounds, side_length) def _is_list_of_tuples(data: Any) -> bool: sample = data[0] return isinstance(sample, tuple) and isinstance(sample[0], (int, float)) def _ensure_input_is_supported(data: Any, main_type: Union[MakeBounds, MakeVectors]): if not any((isinstance(data, main_type), _is_list_of_tuples(data))): raise ValueError( "k_vectors can only be An instance of MakeVectors, " "a list or a list of k_vectors lists" ) pass