Source code for py4mulas.mpi.kspace_partitioner

from typing import Optional

import matplotlib.pyplot as plt
import matplotlib.patches as patches
import matplotlib.cm as cm
import numpy as np

from ..models import Kmodel
from ._common import Vectors, Bounds, split_list, discretize_bounds

__all__ = ["MakeBounds", "MakeVectors"]


[docs] class MakeVectors: r"""Makes k_vectors of the a model considering centers where a denser discretization is required. Attributes: model: An instance of :class:`~py4mulas.models.Kmodel` centers: Contains the kpoints at which a zooming is wanted format: {coord:(side_length, n)} where n is the number of points along each direction coarse_n: Number of sampling points along each dimension of the coarse kspace grid. If not provided, available `k_vectors` are considered for coarse grid within which centers are discretized. """ def __init__(self, model: Kmodel, centers: dict, coarse_n: Optional[int] = None): if coarse_n is not None: if not isinstance(coarse_n, int): raise TypeError( f"coarse_n is expected to be an int but a {type(coarse_n).__name__} was given" ) self.model = model self.zone = model.zone self.bounds = model.bounds self.dim = len(self.bounds) self.centers = centers self.size = coarse_n self.local_grids = {} self.coarse_grid = None self.filtered_coarse_grid = None self.non_uniform_grid_points = [] def _generate_patch(self, center: tuple) -> np.ndarray: patch_size, patch_n = self.centers[center] axes = [ np.linspace(center[i] - patch_size, center[i] + patch_size, patch_n) for i in range(self.dim) ] grid = np.meshgrid(*axes, indexing="ij") points = np.vstack([g.ravel() for g in grid]).T # shape (N^dim, dim) if self.zone is None: return points zone_mask = np.array([self.zone(*p) for p in points]) return points[zone_mask] def _generate_coarse_grid(self) -> None: if self.size is None: points = np.asarray(self.model.k_vectors) else: axes = [ np.linspace(self.bounds[i][0], self.bounds[i][1], self.size) for i in range(self.dim) ] grid = np.meshgrid(*axes, indexing="ij") points = np.vstack([g.ravel() for g in grid]).T if self.zone is None: self.coarse_grid = points else: zone_mask = np.array([self.zone(*p) for p in points]) self.coarse_grid = points[zone_mask] def _exclude_overlap_with_coarse_grid(self) -> None: if self.coarse_grid is None: self._generate_coarse_grid() mask = np.ones(len(self.coarse_grid), dtype=bool) for box in self.local_grids.values(): min_bounds = box.min(axis=0) max_bounds = box.max(axis=0) in_box = np.ones(len(self.coarse_grid), dtype=bool) for dim_i in range(self.dim): # mask k_points belonging to the coarse region # falling into small box in all dimensions in_box &= (self.coarse_grid[:, dim_i] >= min_bounds[dim_i]) & ( self.coarse_grid[:, dim_i] <= max_bounds[dim_i] ) # mask these mask[in_box] = False # then eliminate them # only coarse points not falling in small boxes will be kept in the filter self.filtered_coarse_grid = self.coarse_grid[mask]
[docs] def data(self) -> list[np.ndarray]: """ Returns: A list of k_vectors each consisting of discretized centers in addition to the coarse grid. """ # Generate all local grids for i, center in enumerate(self.centers): pts = self._generate_patch(center) self.local_grids[i] = pts self.non_uniform_grid_points.append(pts) # Generate coarse grid and exclude patches self._generate_coarse_grid() self._exclude_overlap_with_coarse_grid() self.non_uniform_grid_points.append(self.filtered_coarse_grid) return self.non_uniform_grid_points
[docs] def view(self) -> None: if self.dim == 1: raise NotImplementedError("Vizualization is not supported for 1D grids.") kvectors = self.data() if self.dim == 2: for patch_points in kvectors: plt.scatter(patch_points[:, 0], patch_points[:, 1], s=3) elif self.dim == 3: fig = plt.figure() ax = fig.add_subplot(projection="3d") n = len(kvectors) cmap = cm.get_cmap("viridis", n) colors = [cmap(i) for i in range(n)] for i, patch_points in enumerate(kvectors): x, y, z = patch_points[:, 0], patch_points[:, 1], patch_points[:, 2] # Plot the data ax.scatter(x, y, z, c=colors[i], marker="o") plt.show()
[docs] class MakeBounds: r"""Makes a list of bounds with focus at some particular centers in kspace. This works perfectly for periodically ordered centers within model's kspace. Attributes: model: An instance of :class:`~py4mulas.models.Kmodel` centers: In the form {center:length}, with length being the half side-length of the box to be constructed around center. If centers is None, size will be used to uniformly split model's bounds. size: The number of points per dimention. Corresponds ideally to the number of processors for which bounds are discretized. Example: >>> centers = {(i, j):0.1 for i in range(-4, 5) for j in range(-3, 4)} >>> bounds = MakeBounds(model, centers, size=4) >>> bounds.view() """ def __init__(self, model: Kmodel, centers: Optional[dict] = None, size: int = 4): self.initial_bounds = model.bounds self.bounds_factory = [self.initial_bounds] self.centers = centers self.size = size self.dim = model.dim # TODO: when a part of the box falls otside the model this shoud not work # TODO: when there is overlap between boxes this may not work # bounds_factory: # One starts from model bounds in the list [initial_bounds] # we take a center and project it into the geometry check where it falls # then crealte new bounds. The intial bounds is removed (as it is splitted). Only # its childs remain within the factory. Then take the second center # it shold full in some child room then do the same process within the corresponding area # where it falls.
[docs] def update_bounds(self, box): where = self.where_it_belongs(box) if where is not None: room = where self.delete(room) # any updated room should disapear self.generate_bounds(room, box) else: # if box does not fall in the bulk of some room # it should be because they have same bounds in some directions # so the rigid bulk appartenance statment is not fulfiled. # then remove the corresponding room and recrate bounds by appropriate splitting where = self.where_it_partially_belongs(box) # in this case box is confined in some directions # so it does not respond to the rigid isinside statement # initial empty region: # a b # -------------- # |..............| before box arives # -------------- # l r # -------------- # |...|...|......| after box arrived # -------------- # in this scenario we apply a chales continuation: # new_bound = (a, l) + (r, b) # the box bounds are never considered here as they are initially known # only the bounds resulting from the holes they make are reconstructed # it is understood that the bounds in the other directions remain the same. if where is not None: room = where self.delete(room) # any updated room should disapear self.partial_generation(room, box) else: # other scenarios are not implemented: # we assume boxes should not interact: # a box should not intersect with a line parallel to other box borders. # boxes should have equal sizes. # in this case only the above scenarios are expected. raise Exception( "Boxes should not interact. " "MakeBounds expects separated non-interacting sub-regions with same size." )
[docs] def partial_generation(self, room, box): # generates bounds only in relevent directions # as room and box share bounds in some directions. directions = self.complementary_directions(room, box) # the ones intercepted by box so they should be splitted for direction_i in directions: a, b = room[direction_i] left, right = box[direction_i] extended = [(a, left), (right, b)] room = list(room) for i in range(len(extended)): room[direction_i] = extended[i] self.bounds_factory.append(tuple(room))
[docs] def generate_bounds(self, room, box): room = _padded(room, dim=self.dim) box = _padded(box, dim=self.dim) ((xm, xp), (ym, yp), (zm, zp)) = room ((xbm, xbp), (ybm, ybp), (zbm, zbp)) = box # build region around box z_constant = (zbm, zbp) # the strip surronding the box bounded by z_constant limits region1 = ((xm, xp), (ym, ybm), z_constant) # full front region2 = ((xm, xp), (ybp, yp), z_constant) # full back region3 = ((xm, xbm), (ybm, ybp), z_constant) # left poket region4 = ((xbp, xp), (ybm, ybp), z_constant) # right poket xy_constant = ((xm, xp), (ym, yp)) # region above zbp region5 = (*xy_constant, (zm, zbm)) # full lower region region6 = (*xy_constant, (zbp, zp)) # full upper region for new_bounds in [region1, region2, region3, region4, region5, region6]: if self.valid(new_bounds, room): new_bounds = new_bounds[: self.dim] # get rid of padding self.bounds_factory.append(new_bounds)
[docs] def delete(self, previous_room): # remove from factory list self.bounds_factory.remove(previous_room)
[docs] def valid(self, new_bounds, room): # new_bounds cannot be room return not np.allclose(np.array(new_bounds), np.array(room))
[docs] def where_it_belongs(self, box): # where the box falls intirely within the bulk of room without shared bounds # so bounds jumps occur in all directions around box. for room in self.bounds_factory: if self.isinside(room, box): return room
[docs] def where_it_partially_belongs(self, box): # where it falls with shared bounds (walls) for room in self.bounds_factory: if any( self.equal_in_some_directions(room, box) ) and self.valid_appartenance(room, box): return room
[docs] def isinside(self, room, box, dim=None): # rigid statement: box inside or not if not dim: dim = self.dim box_min = np.array(box)[:, 0] # (xbm, ybm, zbm) box_max = np.array(box)[:, 1] # (xbp, ybp, zbp) room_min = np.array(room)[:, 0] # (xm, ym, zm) room_max = np.array(room)[:, 1] # (xp, yp, zp) condition = all( list( room_min[i] < box_min[i] < room_max[i] and room_min[i] < box_max[i] < room_max[i] for i in range(dim) ) ) return condition
[docs] def equal_in_some_directions(self, room, box): condition = list((room[i] == box[i] for i in range(self.dim))) # returns a list of booleans, True in shared directions return condition
[docs] def valid_appartenance(self, room, box): # the appartenance of box to room is only valid # if some bounds are shared and the complementary directions # still fall within room. # Other rooms might share same bounds but situated a part from box. complementary = list((room[i] != box[i] for i in range(self.dim))) others = np.where(complementary)[0] return self.isinside(room, box, dim=len(others))
[docs] def complementary_directions(self, room, box): # corresponding complementary directions where bounds should be updated complementary = list((room[i] != box[i] for i in range(self.dim))) return np.where(complementary)[0]
[docs] def build_box_from_center(self, center): side_length = self.centers[center] return _make_box_from_center(center, side_length)
[docs] def apply(self): self.bounds_factory = [self.initial_bounds] for center in self.centers: box = self.build_box_from_center(center) self.update_bounds(box)
[docs] def boxes_factory(self): boxes = [] for center, side_length in self.centers.items(): center_bounds = _make_box_from_center(center, side_length) boxes.append(center_bounds) return boxes
[docs] def split_boxes(self): # straitforward split from centers boxes_split_data = [] for center, side_length in self.centers.items(): center_bounds = _make_box_from_center(center, side_length) splitted = _default_bounds_partition(center_bounds, self.size) boxes_split_data.append(splitted) return boxes_split_data
[docs] def split_others(self) -> list[Bounds]: # split generated bounds coarsely # in fact bounds_facory only contains coarse data # actually it extracts the centers coarse_slpit_data = [] for coarse_bounds in self.bounds_factory: splitted = _default_bounds_partition(coarse_bounds, self.size) coarse_slpit_data.append(splitted) return coarse_slpit_data
[docs] def data(self) -> list[list[Bounds]]: if self.centers is not None: self.apply() all_data_splitted = self.split_others() + self.split_boxes() return all_data_splitted return _default_bounds_partition(self.bounds, self.size)
[docs] def view(self): if self.dim != 2: raise NotImplementedError("this is only designed for 2 dimensional bounds") (xmin, xmax), (ymin, ymax) = self.initial_bounds self.apply() rectangles = [_rectngle_from_bounds(bounds) for bounds in self.bounds_factory] n = len(rectangles) fig, ax = plt.subplots() cmap = cm.get_cmap("viridis", n) colors = [cmap(i) for i in range(n)] for (x, y, w, h), color in zip(rectangles, colors): rect = patches.Rectangle( (x, y), w, h, linewidth=2, edgecolor=color, facecolor="none", alpha=0.5, ) ax.add_patch(rect) ax.set_xlim(xmin, xmax) ax.set_ylim(ymin, ymax) ax.set_aspect("equal") plt.grid(True) plt.show()
class _NonUniformVectorsSplitter: def __init__(self, k_vectors: list[Vectors], size: int = 8): # splits a non uniform k_vectors in 'size' sublists # k_vectors = [k_list_i for i in range(n)]. # The logice is : when we have a list in the form [l1, l2] # where l1 is much larger than l2, we wants the splitting # to be democratized in a way l1 gets splitted in more pieces. # size is the target size of sublists for the largest list. # Smaller len lists will deserve less sublists. # These k_vectors can be produced using MakeVectors class # n is expected to not be large # usually it is the number of special points in the Brouilloin zone # where difficulties are expected. # This class will be used to prepare the corresponding k_vectors # for being scattered in the parallelization process. # each sublist should be uniformly spaced. # TODO: raise a worning! k_vectors = sorted(k_vectors, key=len) lengths = sorted([len(li) for li in k_vectors]) # assert size <= lengths[0], 'size cannot be smaller than the length of a sublist.' # how many subdivisions every sub_list would deserve deserve = {} # deserve = {length of sub list i : number of chunks it deserves being splitted} portions = 0 # corresponds to number of times size would fit over the entire list # for instance if k_vectors = [list(range(size*5)), list(range(size*7))] # portions = 5 + 7 = 12. But in general it's not integer. for len_i in lengths: deserve_i = len_i / size portions += deserve_i portions = int(portions) ds = size / portions # elemetary portion for i in range(len(lengths)): len_i = lengths[i] deserve_i = len_i / size deserve[(i, lengths[i])] = int(_each_takes(deserve_i, ds)) self.deserve = deserve self.size = size # the wished final number of splits self.k_vectors = k_vectors def __call__(self) -> list[list[Vectors]]: k_vectors = self.k_vectors deserve = _finalize(self.deserve, size=self.size) final_list = [] for i, sub_list in enumerate(k_vectors): len_i = len(sub_list) deserved = deserve[(i, len_i)] splitted = split_list(sub_list, deserved) final_list += splitted return final_list def _default_bounds_partition(bounds: Bounds, size: int) -> list[Bounds]: # to roughly have a bounds list of len = size # if size is chosen like a power of dim this would be perfect dim = len(bounds) side_length = int(round(size ** (1.0 / dim))) # we recomand chose size such that size=side_length^dim. # In this case the splitting is deterministic # For low size, this can be very rough. # we discretize into a list of len = side_length^dim elements return discretize_bounds(bounds, side_length) def _each_takes(deserve_i, ds): return int(deserve_i) * ds def _distribute_the_excess(initial_dict, gift): dict_with_gift = initial_dict # initial gift the_gift = gift keys = list(dict_with_gift.keys()) i = 0 if gift != 0: step = 1 if gift > 0 else -1 while the_gift != 0: key = keys[i % len(keys)] dict_with_gift[key] += step the_gift -= step i += 1 return dict_with_gift def _finalize(deserve, size=89): s = sum(deserve.values()) # this should ideally correspond to size if s == size: return deserve else: excess = size - s deserve = _distribute_the_excess(deserve, excess) return deserve def _rectngle_from_bounds(bounds): ((xmin, xmax), (ymin, ymax)) = bounds return (xmin, ymin, xmax - xmin, ymax - ymin) def _padded(bounds: tuple, dim: int) -> tuple: if _is_one_d(bounds): bounds = (bounds,) padding = ((0, 0),) * (3 - dim) return tuple(bounds) + padding def _is_one_d(bounds: tuple) -> tuple: return isinstance(bounds[0], (int, float)) def _make_box_from_center(center: tuple, size: float) -> tuple: mins = np.array(center) - size maxs = np.array(center) + size bounds = zip(mins, maxs) return tuple(bounds)