Source code for capytaine.tools.block_circulant_matrices

"""Implementation of block circulant matrices to be used for optimizing resolution with symmetries."""
# Copyright (C) 2025 Capytaine developers
# See LICENSE file at <https://github.com/capytaine/capytaine>

import logging
import numpy as np
from typing import List, Union
from numpy.typing import NDArray, ArrayLike
import scipy.linalg as sl

LOG = logging.getLogger(__name__)


[docs] def circular_permutation(l: List, i: int) -> List: return l[-i:] + l[:-i]
[docs] def leading_dimensions_at_the_end(a): """Transform an array of shape (n, m, ...) into (..., n, m). Invert of `leading_dimensions_at_the_end`""" return np.moveaxis(a, [0, 1], [-2, -1])
[docs] def ending_dimensions_at_the_beginning(a): """Transform an array of shape (..., n, m) into (n, m, ...). Invert of `leading_dimensions_at_the_end`""" return np.moveaxis(a, [-2, -1], [0, 1])
[docs] class BlockCirculantMatrix: """Data-sparse representation of a block matrix of the following form ( a d c b ) ( b a d c ) ( c b a d ) ( d c b a ) where a, b, c and d are matrices of the same shape. Parameters ---------- blocks: iterable of matrix-like The **first column** of blocks [a, b, c, d, ...] Each block should have the same shape. """ def __init__(self, blocks: List[ArrayLike]): self.blocks = [b for b in blocks] self.nb_blocks = len(blocks) assert all(self.blocks[0].shape == b.shape for b in self.blocks[1:]) assert all(self.blocks[0].dtype == b.dtype for b in self.blocks[1:]) self.shape = ( self.nb_blocks*self.blocks[0].shape[0], self.nb_blocks*self.blocks[0].shape[1], *self.blocks[0].shape[2:] ) self.ndim = len(self.shape) self.dtype = self.blocks[0].dtype def __array__(self, dtype=None, copy=True): if not copy: raise NotImplementedError if dtype is None: dtype = self.dtype full_blocks = [np.array(b) for b in self.blocks] # Transform all blocks to numpy arrays first_row = [full_blocks[0], *(full_blocks[1:][::-1])] if self.ndim >= 3: first_row = [leading_dimensions_at_the_end(b) for b in first_row] # Need to permute_dims to conform to `block` usage when the array is more than 2D full_matrix = np.block([[b for b in circular_permutation(first_row, i)] for i in range(self.nb_blocks)]).astype(dtype) if self.ndim >= 3: full_matrix = ending_dimensions_at_the_beginning(full_matrix) return full_matrix def __add__(self, other): if isinstance(other, BlockCirculantMatrix) and self.shape == other.shape: return BlockCirculantMatrix([a + b for (a, b) in zip(self.blocks, other.blocks)]) else: return NotImplemented def __sub__(self, other): if isinstance(other, BlockCirculantMatrix) and self.shape == other.shape: return BlockCirculantMatrix([a - b for (a, b) in zip(self.blocks, other.blocks)]) else: return NotImplemented
[docs] def block_diagonalize(self) -> "BlockDiagonalMatrix": if self.ndim == 2 and all(isinstance(b, BlockCirculantMatrix) for b in self.blocks) and self.nb_blocks == 2: a, b = self.blocks return BlockDiagonalMatrix([a + b, a - b]) elif self.ndim == 2 and all(isinstance(b, np.ndarray) for b in self.blocks): return BlockDiagonalMatrix(np.fft.fft(np.array(self.blocks), axis=0)) else: raise NotImplementedError()
[docs] def solve(self, b: np.ndarray) -> np.ndarray: LOG.debug("Called solve on %s of shape %s", self.__class__.__name__, self.shape) n = self.nb_blocks b_fft = np.fft.fft(b.reshape((n, b.shape[0]//n)), axis=0).reshape(b.shape) res_fft = self.block_diagonalize().solve(b_fft) res = np.fft.ifft(res_fft.reshape((n, b.shape[0]//n)), axis=0).reshape(b.shape) LOG.debug("Done") return res
[docs] class BlockDiagonalMatrix: """Data-sparse representation of a block matrix of the following form ( a 0 0 0 ) ( 0 b 0 0 ) ( 0 0 c 0 ) ( 0 0 0 d ) where a, b, c and d are matrices of the same shape. Parameters ---------- blocks: iterable of matrix-like The blocks [a, b, c, d, ...] """ def __init__(self, blocks): self.blocks = [b for b in blocks] self.nb_blocks = len(blocks) assert all(blocks[0].shape == b.shape for b in blocks[1:]) self.shape = ( sum(bl.shape[0] for bl in blocks), sum(bl.shape[1] for bl in blocks) ) assert all(blocks[0].dtype == b.dtype for b in blocks[1:]) self.dtype = blocks[0].dtype def __array__(self, dtype=None, copy=True): if not copy: raise NotImplementedError if dtype is None: dtype = self.dtype full_blocks = [np.array(b) for b in self.blocks] # Transform all blocks to numpy arrays if self.ndim >= 3: full_blocks = [leading_dimensions_at_the_end(b) for b in full_blocks] full_matrix = np.block([ [full_blocks[i] if i == j else np.zeros(full_blocks[i].shape) for j in range(self.nb_blocks)] for i in range(self.nb_blocks)]) if self.ndim >= 3: full_matrix = ending_dimensions_at_the_beginning(full_matrix) return full_matrix
[docs] def solve(self, b: np.ndarray) -> np.ndarray: LOG.debug("Called solve on %s of shape %s", self.__class__.__name__, self.shape) n = self.nb_blocks rhs = np.split(b, n) res = [np.linalg.solve(Ai, bi) if isinstance(Ai, np.ndarray) else Ai.solve(bi) for (Ai, bi) in zip(self.blocks, rhs)] LOG.debug("Done") return np.hstack(res)
MatrixLike = Union[np.ndarray, BlockDiagonalMatrix, BlockCirculantMatrix]
[docs] def lu_decompose(A: MatrixLike, *, overwrite_a : bool = False): if isinstance(A, np.ndarray): return LUDecomposedMatrix(A, overwrite_a=overwrite_a) elif isinstance(A, BlockDiagonalMatrix): return LUDecomposedBlockDiagonalMatrix(A, overwrite_a=overwrite_a) elif isinstance(A, BlockCirculantMatrix): return LUDecomposedBlockCirculantMatrix(A, overwrite_a=overwrite_a) else: raise NotImplementedError()
[docs] class LUDecomposedMatrix: def __init__(self, A: NDArray, *, overwrite_a : bool = False): LOG.debug("LU decomp of %s of shape %s", A.__class__.__name__, A.shape) self._lu_decomp = sl.lu_factor(A, overwrite_a=overwrite_a) self.shape = A.shape self.dtype = A.dtype
[docs] def solve(self, b: np.ndarray) -> np.ndarray: LOG.debug("Called solve on %s of shape %s", self.__class__.__name__, self.shape) return sl.lu_solve(self._lu_decomp, b)
[docs] class LUDecomposedBlockDiagonalMatrix: """LU decomposition of a BlockDiagonalMatrix, stored as the LU decomposition of each block.""" def __init__(self, bdm: BlockDiagonalMatrix, *, overwrite_a : bool = False): LOG.debug("LU decomp of %s of shape %s", bdm.__class__.__name__, bdm.shape) self._lu_decomp = [lu_decompose(bl, overwrite_a=overwrite_a) for bl in bdm.blocks] self.shape = bdm.shape self.nb_blocks = bdm.nb_blocks self.dtype = bdm.dtype
[docs] def solve(self, b: np.ndarray) -> np.ndarray: LOG.debug("Called solve on %s of shape %s", self.__class__.__name__, self.shape) rhs = np.split(b, self.nb_blocks) res = [Ai.solve(bi) for (Ai, bi) in zip(self._lu_decomp, rhs)] return np.hstack(res)
[docs] class LUDecomposedBlockCirculantMatrix: def __init__(self, bcm: BlockCirculantMatrix, *, overwrite_a : bool = False): LOG.debug("LU decomp of %s of shape %s", bcm.__class__.__name__, bcm.shape) self._lu_decomp = lu_decompose(bcm.block_diagonalize(), overwrite_a=overwrite_a) self.shape = bcm.shape self.nb_blocks = bcm.nb_blocks self.dtype = bcm.dtype
[docs] def solve(self, b: np.ndarray) -> np.ndarray: LOG.debug("Called solve on %s of shape %s", self.__class__.__name__, self.shape) n = self.nb_blocks b_fft = np.fft.fft(b.reshape((n, b.shape[0]//n)), axis=0).reshape(b.shape) res_fft = self._lu_decomp.solve(b_fft) res = np.fft.ifft(res_fft.reshape((n, b.shape[0]//n)), axis=0).reshape(b.shape) return res
LUDecomposedMatrixLike = Union[LUDecomposedMatrix, LUDecomposedBlockDiagonalMatrix, LUDecomposedBlockCirculantMatrix]
[docs] def has_been_lu_decomposed(A): # Python 3.8 does not support isinstance(A, LUDecomposedMatrixLike) return isinstance(A, (LUDecomposedMatrix, LUDecomposedBlockDiagonalMatrix, LUDecomposedBlockCirculantMatrix))