from typing import Optional
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import matplotlib.cm as cm
import numpy as np
from ..models import Kmodel
from ._common import Vectors, Bounds, split_list, discretize_bounds
__all__ = ["MakeBounds", "MakeVectors"]
[docs]
class MakeVectors:
r"""Makes k_vectors of the a model considering centers
where a denser discretization is required.
Attributes:
model: An instance of :class:`~py4mulas.models.Kmodel`
centers: Contains the kpoints at which a zooming is wanted
format: {coord:(side_length, n)} where n is the number of points along each direction
coarse_n: Number of sampling points along each dimension of the coarse kspace grid.
If not provided, available `k_vectors` are considered for coarse grid
within which centers are discretized.
"""
def __init__(self, model: Kmodel, centers: dict, coarse_n: Optional[int] = None):
if coarse_n is not None:
if not isinstance(coarse_n, int):
raise TypeError(
f"coarse_n is expected to be an int but a {type(coarse_n).__name__} was given"
)
self.model = model
self.zone = model.zone
self.bounds = model.bounds
self.dim = len(self.bounds)
self.centers = centers
self.size = coarse_n
self.local_grids = {}
self.coarse_grid = None
self.filtered_coarse_grid = None
self.non_uniform_grid_points = []
def _generate_patch(self, center: tuple) -> np.ndarray:
patch_size, patch_n = self.centers[center]
axes = [
np.linspace(center[i] - patch_size, center[i] + patch_size, patch_n)
for i in range(self.dim)
]
grid = np.meshgrid(*axes, indexing="ij")
points = np.vstack([g.ravel() for g in grid]).T # shape (N^dim, dim)
if self.zone is None:
return points
zone_mask = np.array([self.zone(*p) for p in points])
return points[zone_mask]
def _generate_coarse_grid(self) -> None:
if self.size is None:
points = np.asarray(self.model.k_vectors)
else:
axes = [
np.linspace(self.bounds[i][0], self.bounds[i][1], self.size)
for i in range(self.dim)
]
grid = np.meshgrid(*axes, indexing="ij")
points = np.vstack([g.ravel() for g in grid]).T
if self.zone is None:
self.coarse_grid = points
else:
zone_mask = np.array([self.zone(*p) for p in points])
self.coarse_grid = points[zone_mask]
def _exclude_overlap_with_coarse_grid(self) -> None:
if self.coarse_grid is None:
self._generate_coarse_grid()
mask = np.ones(len(self.coarse_grid), dtype=bool)
for box in self.local_grids.values():
min_bounds = box.min(axis=0)
max_bounds = box.max(axis=0)
in_box = np.ones(len(self.coarse_grid), dtype=bool)
for dim_i in range(self.dim):
# mask k_points belonging to the coarse region
# falling into small box in all dimensions
in_box &= (self.coarse_grid[:, dim_i] >= min_bounds[dim_i]) & (
self.coarse_grid[:, dim_i] <= max_bounds[dim_i]
)
# mask these
mask[in_box] = False
# then eliminate them
# only coarse points not falling in small boxes will be kept in the filter
self.filtered_coarse_grid = self.coarse_grid[mask]
[docs]
def data(self) -> list[np.ndarray]:
"""
Returns:
A list of k_vectors each consisting of discretized centers
in addition to the coarse grid.
"""
# Generate all local grids
for i, center in enumerate(self.centers):
pts = self._generate_patch(center)
self.local_grids[i] = pts
self.non_uniform_grid_points.append(pts)
# Generate coarse grid and exclude patches
self._generate_coarse_grid()
self._exclude_overlap_with_coarse_grid()
self.non_uniform_grid_points.append(self.filtered_coarse_grid)
return self.non_uniform_grid_points
[docs]
def view(self) -> None:
if self.dim == 1:
raise NotImplementedError("Vizualization is not supported for 1D grids.")
kvectors = self.data()
if self.dim == 2:
for patch_points in kvectors:
plt.scatter(patch_points[:, 0], patch_points[:, 1], s=3)
elif self.dim == 3:
fig = plt.figure()
ax = fig.add_subplot(projection="3d")
n = len(kvectors)
cmap = cm.get_cmap("viridis", n)
colors = [cmap(i) for i in range(n)]
for i, patch_points in enumerate(kvectors):
x, y, z = patch_points[:, 0], patch_points[:, 1], patch_points[:, 2]
# Plot the data
ax.scatter(x, y, z, c=colors[i], marker="o")
plt.show()
[docs]
class MakeBounds:
r"""Makes a list of bounds with focus at some particular centers in kspace.
This works perfectly for periodically ordered centers within model's kspace.
Attributes:
model: An instance of :class:`~py4mulas.models.Kmodel`
centers: In the form {center:length},
with length being the half side-length of the box to be constructed around center.
If centers is None, size will be used to uniformly split model's bounds.
size: The number of points per dimention. Corresponds ideally to the number of processors for which
bounds are discretized.
Example:
>>> centers = {(i, j):0.1 for i in range(-4, 5) for j in range(-3, 4)}
>>> bounds = MakeBounds(model, centers, size=4)
>>> bounds.view()
"""
def __init__(self, model: Kmodel, centers: Optional[dict] = None, size: int = 4):
self.initial_bounds = model.bounds
self.bounds_factory = [self.initial_bounds]
self.centers = centers
self.size = size
self.dim = model.dim
# TODO: when a part of the box falls otside the model this shoud not work
# TODO: when there is overlap between boxes this may not work
# bounds_factory:
# One starts from model bounds in the list [initial_bounds]
# we take a center and project it into the geometry check where it falls
# then crealte new bounds. The intial bounds is removed (as it is splitted). Only
# its childs remain within the factory. Then take the second center
# it shold full in some child room then do the same process within the corresponding area
# where it falls.
[docs]
def update_bounds(self, box):
where = self.where_it_belongs(box)
if where is not None:
room = where
self.delete(room) # any updated room should disapear
self.generate_bounds(room, box)
else:
# if box does not fall in the bulk of some room
# it should be because they have same bounds in some directions
# so the rigid bulk appartenance statment is not fulfiled.
# then remove the corresponding room and recrate bounds by appropriate splitting
where = self.where_it_partially_belongs(box)
# in this case box is confined in some directions
# so it does not respond to the rigid isinside statement
# initial empty region:
# a b
# --------------
# |..............| before box arives
# --------------
# l r
# --------------
# |...|...|......| after box arrived
# --------------
# in this scenario we apply a chales continuation:
# new_bound = (a, l) + (r, b)
# the box bounds are never considered here as they are initially known
# only the bounds resulting from the holes they make are reconstructed
# it is understood that the bounds in the other directions remain the same.
if where is not None:
room = where
self.delete(room) # any updated room should disapear
self.partial_generation(room, box)
else:
# other scenarios are not implemented:
# we assume boxes should not interact:
# a box should not intersect with a line parallel to other box borders.
# boxes should have equal sizes.
# in this case only the above scenarios are expected.
raise Exception(
"Boxes should not interact. "
"MakeBounds expects separated non-interacting sub-regions with same size."
)
[docs]
def partial_generation(self, room, box):
# generates bounds only in relevent directions
# as room and box share bounds in some directions.
directions = self.complementary_directions(room, box)
# the ones intercepted by box so they should be splitted
for direction_i in directions:
a, b = room[direction_i]
left, right = box[direction_i]
extended = [(a, left), (right, b)]
room = list(room)
for i in range(len(extended)):
room[direction_i] = extended[i]
self.bounds_factory.append(tuple(room))
[docs]
def generate_bounds(self, room, box):
room = _padded(room, dim=self.dim)
box = _padded(box, dim=self.dim)
((xm, xp), (ym, yp), (zm, zp)) = room
((xbm, xbp), (ybm, ybp), (zbm, zbp)) = box
# build region around box
z_constant = (zbm, zbp)
# the strip surronding the box bounded by z_constant limits
region1 = ((xm, xp), (ym, ybm), z_constant) # full front
region2 = ((xm, xp), (ybp, yp), z_constant) # full back
region3 = ((xm, xbm), (ybm, ybp), z_constant) # left poket
region4 = ((xbp, xp), (ybm, ybp), z_constant) # right poket
xy_constant = ((xm, xp), (ym, yp))
# region above zbp
region5 = (*xy_constant, (zm, zbm)) # full lower region
region6 = (*xy_constant, (zbp, zp)) # full upper region
for new_bounds in [region1, region2, region3, region4, region5, region6]:
if self.valid(new_bounds, room):
new_bounds = new_bounds[: self.dim] # get rid of padding
self.bounds_factory.append(new_bounds)
[docs]
def delete(self, previous_room):
# remove from factory list
self.bounds_factory.remove(previous_room)
[docs]
def valid(self, new_bounds, room):
# new_bounds cannot be room
return not np.allclose(np.array(new_bounds), np.array(room))
[docs]
def where_it_belongs(self, box):
# where the box falls intirely within the bulk of room without shared bounds
# so bounds jumps occur in all directions around box.
for room in self.bounds_factory:
if self.isinside(room, box):
return room
[docs]
def where_it_partially_belongs(self, box):
# where it falls with shared bounds (walls)
for room in self.bounds_factory:
if any(
self.equal_in_some_directions(room, box)
) and self.valid_appartenance(room, box):
return room
[docs]
def isinside(self, room, box, dim=None):
# rigid statement: box inside or not
if not dim:
dim = self.dim
box_min = np.array(box)[:, 0] # (xbm, ybm, zbm)
box_max = np.array(box)[:, 1] # (xbp, ybp, zbp)
room_min = np.array(room)[:, 0] # (xm, ym, zm)
room_max = np.array(room)[:, 1] # (xp, yp, zp)
condition = all(
list(
room_min[i] < box_min[i] < room_max[i]
and room_min[i] < box_max[i] < room_max[i]
for i in range(dim)
)
)
return condition
[docs]
def equal_in_some_directions(self, room, box):
condition = list((room[i] == box[i] for i in range(self.dim)))
# returns a list of booleans, True in shared directions
return condition
[docs]
def valid_appartenance(self, room, box):
# the appartenance of box to room is only valid
# if some bounds are shared and the complementary directions
# still fall within room.
# Other rooms might share same bounds but situated a part from box.
complementary = list((room[i] != box[i] for i in range(self.dim)))
others = np.where(complementary)[0]
return self.isinside(room, box, dim=len(others))
[docs]
def complementary_directions(self, room, box):
# corresponding complementary directions where bounds should be updated
complementary = list((room[i] != box[i] for i in range(self.dim)))
return np.where(complementary)[0]
[docs]
def build_box_from_center(self, center):
side_length = self.centers[center]
return _make_box_from_center(center, side_length)
[docs]
def apply(self):
self.bounds_factory = [self.initial_bounds]
for center in self.centers:
box = self.build_box_from_center(center)
self.update_bounds(box)
[docs]
def boxes_factory(self):
boxes = []
for center, side_length in self.centers.items():
center_bounds = _make_box_from_center(center, side_length)
boxes.append(center_bounds)
return boxes
[docs]
def split_boxes(self):
# straitforward split from centers
boxes_split_data = []
for center, side_length in self.centers.items():
center_bounds = _make_box_from_center(center, side_length)
splitted = _default_bounds_partition(center_bounds, self.size)
boxes_split_data.append(splitted)
return boxes_split_data
[docs]
def split_others(self) -> list[Bounds]:
# split generated bounds coarsely
# in fact bounds_facory only contains coarse data
# actually it extracts the centers
coarse_slpit_data = []
for coarse_bounds in self.bounds_factory:
splitted = _default_bounds_partition(coarse_bounds, self.size)
coarse_slpit_data.append(splitted)
return coarse_slpit_data
[docs]
def data(self) -> list[list[Bounds]]:
if self.centers is not None:
self.apply()
all_data_splitted = self.split_others() + self.split_boxes()
return all_data_splitted
return _default_bounds_partition(self.bounds, self.size)
[docs]
def view(self):
if self.dim != 2:
raise NotImplementedError("this is only designed for 2 dimensional bounds")
(xmin, xmax), (ymin, ymax) = self.initial_bounds
self.apply()
rectangles = [_rectngle_from_bounds(bounds) for bounds in self.bounds_factory]
n = len(rectangles)
fig, ax = plt.subplots()
cmap = cm.get_cmap("viridis", n)
colors = [cmap(i) for i in range(n)]
for (x, y, w, h), color in zip(rectangles, colors):
rect = patches.Rectangle(
(x, y),
w,
h,
linewidth=2,
edgecolor=color,
facecolor="none",
alpha=0.5,
)
ax.add_patch(rect)
ax.set_xlim(xmin, xmax)
ax.set_ylim(ymin, ymax)
ax.set_aspect("equal")
plt.grid(True)
plt.show()
class _NonUniformVectorsSplitter:
def __init__(self, k_vectors: list[Vectors], size: int = 8):
# splits a non uniform k_vectors in 'size' sublists
# k_vectors = [k_list_i for i in range(n)].
# The logice is : when we have a list in the form [l1, l2]
# where l1 is much larger than l2, we wants the splitting
# to be democratized in a way l1 gets splitted in more pieces.
# size is the target size of sublists for the largest list.
# Smaller len lists will deserve less sublists.
# These k_vectors can be produced using MakeVectors class
# n is expected to not be large
# usually it is the number of special points in the Brouilloin zone
# where difficulties are expected.
# This class will be used to prepare the corresponding k_vectors
# for being scattered in the parallelization process.
# each sublist should be uniformly spaced.
# TODO: raise a worning!
k_vectors = sorted(k_vectors, key=len)
lengths = sorted([len(li) for li in k_vectors])
# assert size <= lengths[0], 'size cannot be smaller than the length of a sublist.'
# how many subdivisions every sub_list would deserve
deserve = {}
# deserve = {length of sub list i : number of chunks it deserves being splitted}
portions = 0
# corresponds to number of times size would fit over the entire list
# for instance if k_vectors = [list(range(size*5)), list(range(size*7))]
# portions = 5 + 7 = 12. But in general it's not integer.
for len_i in lengths:
deserve_i = len_i / size
portions += deserve_i
portions = int(portions)
ds = size / portions # elemetary portion
for i in range(len(lengths)):
len_i = lengths[i]
deserve_i = len_i / size
deserve[(i, lengths[i])] = int(_each_takes(deserve_i, ds))
self.deserve = deserve
self.size = size # the wished final number of splits
self.k_vectors = k_vectors
def __call__(self) -> list[list[Vectors]]:
k_vectors = self.k_vectors
deserve = _finalize(self.deserve, size=self.size)
final_list = []
for i, sub_list in enumerate(k_vectors):
len_i = len(sub_list)
deserved = deserve[(i, len_i)]
splitted = split_list(sub_list, deserved)
final_list += splitted
return final_list
def _default_bounds_partition(bounds: Bounds, size: int) -> list[Bounds]:
# to roughly have a bounds list of len = size
# if size is chosen like a power of dim this would be perfect
dim = len(bounds)
side_length = int(round(size ** (1.0 / dim)))
# we recomand chose size such that size=side_length^dim.
# In this case the splitting is deterministic
# For low size, this can be very rough.
# we discretize into a list of len = side_length^dim elements
return discretize_bounds(bounds, side_length)
def _each_takes(deserve_i, ds):
return int(deserve_i) * ds
def _distribute_the_excess(initial_dict, gift):
dict_with_gift = initial_dict
# initial gift
the_gift = gift
keys = list(dict_with_gift.keys())
i = 0
if gift != 0:
step = 1 if gift > 0 else -1
while the_gift != 0:
key = keys[i % len(keys)]
dict_with_gift[key] += step
the_gift -= step
i += 1
return dict_with_gift
def _finalize(deserve, size=89):
s = sum(deserve.values()) # this should ideally correspond to size
if s == size:
return deserve
else:
excess = size - s
deserve = _distribute_the_excess(deserve, excess)
return deserve
def _rectngle_from_bounds(bounds):
((xmin, xmax), (ymin, ymax)) = bounds
return (xmin, ymin, xmax - xmin, ymax - ymin)
def _padded(bounds: tuple, dim: int) -> tuple:
if _is_one_d(bounds):
bounds = (bounds,)
padding = ((0, 0),) * (3 - dim)
return tuple(bounds) + padding
def _is_one_d(bounds: tuple) -> tuple:
return isinstance(bounds[0], (int, float))
def _make_box_from_center(center: tuple, size: float) -> tuple:
mins = np.array(center) - size
maxs = np.array(center) + size
bounds = zip(mins, maxs)
return tuple(bounds)