Source code for py4mulas.formulas
from abc import ABC, abstractmethod
import inspect
from typing import Optional, Union
import warnings
import numpy as np
from .models import Kmodel
from .utils import KspaceOptions
from ._common import compute_dk
__all__ = ["KuboFormula", "batcher"]
class _Prebuilder(ABC):
def __init__(self) -> None:
# initialize the store
self.have_store = False
self._store_signature = None
def _get_store_signature(self, eta: float) -> float:
"""Ensures that changing eta rebuilds everything
in case kernel depends on it
"""
return float(eta)
@abstractmethod
def _build_store(self, eta):
pass
def _ensure_bulid_store(self, eta: Optional[float] = None) -> None:
if eta is None:
if not (self.have_store):
# discard eta
self._build_store(eta)
else:
sig = self._get_store_signature(eta)
if not (self.have_store) or (self._store_signature != sig):
self._build_store(eta)
[docs]
class KuboFormula(_Prebuilder):
r"""Computes an arbitrary response formula
.. math::
\sigma = \sum_k\sum_{mn} K_{mn}(k) O_{mn}(k)
with :math:`K_{mn}` being the matrix elements of the energy kernel K
which may depend on transport properties such as :math:`\mu`, :math:`T`, :math:`\eta`
and :math:`E` but should not nvolve :math:`\psi`.
The memeber :math:`O_{mn}` is the operator kernel which may depend on :math:`\psi`.
It is assumed to depend on py4mulas operators.
This can be used to implement an arbitrary formula with tenorial contractions.
The contractions einsum (subscripts) should be provided as an argument.
This separation enables to store the operator kernel for the whole kspace.
So varying transport properties becomes quite cheap.
In cases, where these kernels have further symmetries or simplifications
we should provide the shape of the matrix :math:`O` through ``kernel_norbs``.
This is exploited in :class:`py4mulas.responses.Kubo`.
Attributes:
model: An instance of :class:`~py4mulas.models.Kmodel`
kspace_options: A dictionary specifying chunk_size and precomp.
If chunk_size is not given, a default is used.
Note that if precomp is True, the energy and temperature scans are momentum space free.
If eta is to be changed, then set precomp = False, as the kernels are eta-dependent.
kernel_norbs: The shape of the energy kernel.
If it is set at None, (norbs, norbs) is taken as shape.
The stored array will then have the shape (nk, norbs, norbs)
contractions: Einsum subscripts to be used in the formula.
If not given default 'kij,kij' contraction is taken
for the element wise product of type :math:`K_{ij} * O_{ij}`
Exemple:
>>> kspace_options = dict(chunk_size=1000, precomp=True)
>>> kernel_norbs = (3, 3, 3)
>>> contractions = 'kijl,kijl->'
>>> G = KuboFormula(some_model, kspace_options=kspace_options,
>>> kernel_norbs=kernel_norbs, contractions=contractions)
"""
def __init__(
self,
kmodel: Kmodel,
kspace_options: Optional[dict] = None,
contractions: str = None,
) -> None:
super().__init__()
self.kmodel = kmodel
self.kspace_options = kspace_options
self.dim = kmodel.dim
self.norbs = kmodel.norbs
self._k_vectors = np.asarray(kmodel.k_vectors, dtype=float)
self.bounds = kmodel.bounds
self._chunk_size = _read_from_data(kspace_options, "chunk_size")
self.precomp = _read_from_data(kspace_options, "precomp")
self.k_prefactor = (2 * np.pi) ** (1 - self.dim)
self.contractions = contractions
left, right = self.contractions.split(",")
self.kernel_norbs = (len(right) - 3) * (self.norbs,) # remove 'k' and '->'
# energies for all k
self.Ek = np.empty(self.e_shape, dtype=float)
# operator kernel for all k
self.op_kernel = np.empty(self.kernel_shape, dtype=complex)
@property
def H(self):
return self.kmodel.H
@property
def k_vectors(self):
return self._k_vectors
@k_vectors.setter
def k_vectors(self, vectors):
self._k_vectors = vectors
self.Ek = np.empty(self.e_shape, dtype=float)
self.op_kernel = np.empty(self.kernel_shape, dtype=complex)
self.have_store = False
@property
def num_k(self):
return self.k_vectors.shape[0]
@property
def e_shape(self):
return (self.num_k, self.norbs)
@property
def kernel_shape(self):
return (self.num_k,) + self.kernel_norbs
@property
def chunk_size(self):
if self._chunk_size is None:
size = self.num_k
else:
size = self._chunk_size
return size
@property
def _dk(self):
return compute_dk(self.k_vectors, self.dim)
@property
def prefactor(self):
try:
return self.k_prefactor * self._dk
except IndexError:
# single k_vectors
return self.k_prefactor
def _build_store(self, eta):
"""Precompute and store operator kernel with energies."""
# TODO: separate _build store into eta dependent and independent
k_vectors = self.k_vectors
num_k = self.num_k
dim = self.dim
chunk_size = self.chunk_size
for start in range(0, num_k, chunk_size):
end = start + chunk_size
k_chunk = k_vectors[start:end] # shape: (chunk_size, dim)
k_args = [k_chunk[:, d] for d in range(dim)]
# TODO: check main_kernel taking eta=None is always clean
self.op_kernel[start:end], self.Ek[start:end] = self.main_kernel(
k_args, eta
)
self.have_store = True
if eta is not None:
self._store_signature = self._get_store_signature(eta)
[docs]
@abstractmethod
def energy_kernel(self) -> list:
r"""The kernel which can be called with :math:`E`, :math:`\mu` and :math:`T`.
Typically a distribution function.
But can also be :math:`\eta` dependent.
Returns:
A list of :class:`~py4mulas.energy_kernels.KuboKernel`
"""
# kernel K
pass
[docs]
@abstractmethod
def main_kernel(self, k_args: np.ndarray, eta: float):
"""The main kernal of the formula, which involves the operators product.
Args:
k_args: momentum arguments
eta: mroadening
"""
# kernel O
pass
[docs]
def integrand(
self,
*k: Union[list, tuple, np.ndarray],
mu: float,
temperature: float,
eta: float,
) -> complex:
"""unvectorized integrand, for single k evaluation"""
prefactor = self.k_prefactor
k_args = [[k[d]] for d in range(self.dim)]
op_kernel, en = self.main_kernel(k_args, eta)
sum_terms = 0
for kernel_term in self.energy_kernel():
contractions = kernel_term.contractions
e_kernel = self.energy_kernel(en, mu, temperature, eta)
sum_terms += np.einsum(contractions, e_kernel, op_kernel)
return prefactor * sum_terms
[docs]
def __call__(
self,
mu: float = 0.0,
temperature: float = 0.0,
eta: float = 0.0,
k_resolved: bool = False,
) -> complex:
r"""Computes the response fomula at :math:`\mu`, :math:`T` and :math:`\eta`.
Args:
mu: Chemical potential.
temperature: Temperature.
eta: Broadening (the infinitesimal parameter).
k_resolved: Specifies whether we want a summed transport response
or momentum resolved.
Returns:
A numpy arry, if k_resolved is `True`
or a complex number if it is `False`.
"""
prefactor = self.prefactor
if k_resolved:
if self.kspace_options is not None:
warnings.warn("k_resolved is not compatible with kspace_options")
# as k_space resolved would only be True for representation purposes
# no variation of energy and temperature is expected.
# In this case precomping is not a big deal.
full_k_args = [self.k_vectors[:, d] for d in range(self.dim)]
op_kernel, Ek = self.main_kernel(full_k_args, eta)
sum_terms = 0
for kernel_term in self.energy_kernel():
contractions = kernel_term.contractions
if contractions[-1] == ">":
# then we should add k for k_resolved
contractions += "k"
# keep first axis (momentum)
# otherwise the proveide contraction should
# appropriately handle the momentum indice ("k")
# if k_resolved = False, this distingtion does not really matter.
# TODO: add a contractions input checker!
e_kernel = kernel_term(Ek, mu, temperature, eta)
sum_terms += np.einsum(contractions, e_kernel, op_kernel)
return sum_terms
if self.precomp:
if _is_eta_independent(self.energy_kernel()):
# then in contrast main_kernel should be eta dependent.
# their will be no precomputation advantage for varied eta.
# But is good for varying \mu and T.
self._ensure_bulid_store(eta)
else:
# main_kernel is eta independent and precomputation
# can be frozen regardless of eta values.
# this is disigned to enable eta independent precomputation
# when an external energy_kernel (eta dependent) is provided.
# This is a good compromise as one can still:
# (i) use reduced opera_kernel but with eta dependent precomputation
# which is fast for varying \mu and T,
# slow for variying eta. See above.
# (ii) use unoptimized opera_kernel but with eta independent energy_kernels.
# This should be faster when eta is to be varied.
self._ensure_bulid_store(eta=None)
dim = self.dim
chunk_size = self.chunk_size
num_k = self.num_k
k_vectors = self.k_vectors
chunk_sum = 0
for kernel_term in self.energy_kernel():
contractions = kernel_term.contractions
for start in range(0, num_k, chunk_size):
end = start + chunk_size
if self.precomp:
e_kernel = kernel_term(self.Ek[start:end], mu, temperature, eta)
opera_kernel = self.op_kernel[start:end]
else:
k_chunk = k_vectors[start:end] # shape: (chunk_size, dim)
k_args = [k_chunk[:, d] for d in range(dim)]
opera_kernel, Ek = self.main_kernel(k_args, eta)
e_kernel = kernel_term(Ek, mu, temperature, eta)
chunk_sum += np.einsum(
contractions, e_kernel, opera_kernel, optimize=True
)
return prefactor * chunk_sum
[docs]
def batcher(formula: KuboFormula, data: list[tuple], n: int = 10) -> np.ndarray:
r"""Computes formula, partitioning the kspace into `n` batches, for a full set of `data`.
This can be used when the kspace is too large, to reduce memory load and still
use precomp.
Args:
formula: An instance of :class:`~py4mulas.formulas.KuboFormula`
or :class:`~py4mulas.responses.Kubo`
data: List of tuples in the form of (:math:`\mu`, :math:`T`, :math:`\eta`)
to be passed as arguments to formula.
n: Number of wanted kspace batches
Raises:
ValueError: When precomp is set to `False`
Returns:
An array in the same order of the ``data``
"""
if not formula.precomp:
raise ValueError("partitioner only works for precomptuing formulas.")
k_vectors = np.asarray(formula.kmodel.k_vectors, dtype="float64")
# TODO: make k_vectors array like at level of Kmodel
splitted = _split(k_vectors, n)
response = 0
for sub_kvectors in splitted:
formula.k_vectors = sub_kvectors
sub_result = np.empty(len(data), dtype="float64")
for i, elem in enumerate(data):
mu_T_eta = dict(zip(("mu", "temperature", "eta"), elem))
sub_result[i] = formula(**mu_T_eta)
response += sub_result
return response
def _split(k_vectors: np.ndarray, n: int) -> list[np.ndarray]:
"""Splits k_vectors into n batches
Args:
k_vectors: Array of momentum vectors
n: Number of batches
Returns:
A list of `n` arrays
"""
return np.array_split(k_vectors, n)
def _read_from_data(
data: Union[dict, KspaceOptions], param: str
) -> Union[bool, int, tuple, None]:
"""Reads the value of ``param`` from a dictionary or a dataclass.
Args:
param: The param we read
data: The input data from which we read
Returns:
The values are booleans for `precomplute`,
int for `chunk_size`
"""
if data is None:
data = dict(precomp=False, chunk_size=None)
if isinstance(data, dict):
data = KspaceOptions(**data)
elif not isinstance(data, KspaceOptions):
raise TypeError("unrecognized input data")
return getattr(data, param)
def _is_eta_independent(energy_kernels):
for kernel in energy_kernels:
sig = inspect.signature(kernel)
params = sig.parameters
if "eta" in params:
default = params["eta"].default
if default is inspect.Parameter.empty or default is not None:
return False
return True