Source code for capytaine.bem.solver

# Copyright (C) 2017-2024 Matthieu Ancellin
# See LICENSE file at <https://github.com/capytaine/capytaine>
"""Solver for the BEM problem.

.. code-block:: python

    problem = RadiationProblem(...)
    result = BEMSolver(green_functions=..., engine=...).solve(problem)

"""

import os
import logging

import numpy as np
import pandas as pd

from datetime import datetime

from rich.progress import track

from capytaine.bem.problems_and_results import LinearPotentialFlowProblem, DiffractionProblem
from capytaine.green_functions.delhommeau import Delhommeau
from capytaine.bem.engines import BasicMatrixEngine
from capytaine.io.xarray import problems_from_dataset, assemble_dataset, kochin_data_array
from capytaine.tools.optional_imports import silently_import_optional_dependency
from capytaine.tools.lists_of_points import _normalize_points, _normalize_free_surface_points
from capytaine.tools.symbolic_multiplication import supporting_symbolic_multiplication
from capytaine.tools.timer import Timer

LOG = logging.getLogger(__name__)

[docs] class BEMSolver: """ Solver for linear potential flow problems. Parameters ---------- green_function: AbstractGreenFunction, optional Object handling the computation of the Green function. (default: :class:`~capytaine.green_function.delhommeau.Delhommeau`) engine: MatrixEngine, optional Object handling the building of matrices and the resolution of linear systems with these matrices. (default: :class:`~capytaine.bem.engines.BasicMatrixEngine`) method: string, optional select boundary integral equation used to solve the problems. Accepted values: "indirect" (as in e.g. Nemoh), "direct" (as in e.g. WAMIT) Default value: "indirect" Attributes ---------- timer: dict[str, Timer] Storing the time spent on each subtasks of the resolution exportable_settings : dict Settings of the solver that can be saved to reinit the same solver later. """ def __init__(self, *, green_function=None, engine=None, method="indirect"): self.green_function = Delhommeau() if green_function is None else green_function self.engine = BasicMatrixEngine() if engine is None else engine if method.lower() not in {"direct", "indirect"}: raise ValueError(f"Unrecognized method when initializing solver: {repr(method)}. Expected \"direct\" or \"indirect\".") self.method = method.lower() self.timer = {"Solve total": Timer(), " Green function": Timer(), " Linear solver": Timer()} self.solve = self.timer["Solve total"].wraps_function(self.solve) try: self.exportable_settings = { **self.green_function.exportable_settings, **self.engine.exportable_settings, "method": self.method, } except AttributeError: self.exportable_settings = {} def __str__(self): return f"BEMSolver(engine={self.engine}, green_function={self.green_function})" def __repr__(self): return self.__str__()
[docs] def timer_summary(self): return pd.DataFrame([ { "task": name, "total": self.timer[name].total, "nb_calls": self.timer[name].nb_timings, "mean": self.timer[name].mean } for name in self.timer]).set_index("task")
def _repr_pretty_(self, p, cycle): p.text(self.__str__())
[docs] @classmethod def from_exported_settings(settings): raise NotImplementedError
[docs] def solve(self, problem, method=None, keep_details=True, _check_wavelength=True): """Solve the linear potential flow problem. Parameters ---------- problem: LinearPotentialFlowProblem the problem to be solved method: string, optional select boundary integral equation used to solve the problem. It is recommended to set the method more globally when initializing the solver. If provided here, the value in argument of `solve` overrides the global one. keep_details: bool, optional if True, store the sources and the potential on the floating body in the output object (default: True) _check_wavelength: bool, optional (default: True) If True, the frequencies are compared to the mesh resolution and the estimated first irregular frequency to warn the user. Returns ------- LinearPotentialFlowResult an object storing the problem data and its results """ LOG.info("Solve %s.", problem) if _check_wavelength: self._check_wavelength_and_mesh_resolution([problem]) self._check_wavelength_and_irregular_frequencies([problem]) if isinstance(problem, DiffractionProblem) and float(problem.encounter_omega) in {0.0, np.inf}: raise ValueError("Diffraction problems at zero or infinite frequency are not defined") # This error used to be raised when initializing the problem. # It is now raised here, in order to be catchable by # _solve_and_catch_errors, such that batch resolution # can include this kind of problems without the full batch # failing. # Note that if this error was not raised here, the resolution # would still fail with a less explicit error message. if problem.forward_speed != 0.0: omega, wavenumber = problem.encounter_omega, problem.encounter_wavenumber else: omega, wavenumber = problem.omega, problem.wavenumber linear_solver = supporting_symbolic_multiplication(self.engine.linear_solver) method = method if method is not None else self.method if (method == 'direct'): if problem.forward_speed != 0.0: raise NotImplementedError("Direct solver is not able to solve problems with forward speed.") with self.timer[" Green function"]: S, D = self.engine.build_matrices( problem.body.mesh_including_lid, problem.body.mesh_including_lid, problem.free_surface, problem.water_depth, wavenumber, self.green_function, adjoint_double_layer=False ) rhs = S @ problem.boundary_condition with self.timer[" Linear solver"]: potential = linear_solver(D, rhs) if not potential.shape == problem.boundary_condition.shape: raise ValueError(f"Error in linear solver of {self.engine}: the shape of the output ({potential.shape}) " f"does not match the expected shape ({problem.boundary_condition.shape})") pressure = 1j * omega * problem.rho * potential sources = None else: with self.timer[" Green function"]: S, K = self.engine.build_matrices( problem.body.mesh_including_lid, problem.body.mesh_including_lid, problem.free_surface, problem.water_depth, wavenumber, self.green_function, adjoint_double_layer=True ) with self.timer[" Linear solver"]: sources = linear_solver(K, problem.boundary_condition) if not sources.shape == problem.boundary_condition.shape: raise ValueError(f"Error in linear solver of {self.engine}: the shape of the output ({sources.shape}) " f"does not match the expected shape ({problem.boundary_condition.shape})") potential = S @ sources pressure = 1j * omega * problem.rho * potential if problem.forward_speed != 0.0: result = problem.make_results_container(sources=sources) # Temporary result object to compute the ∇Φ term nabla_phi = self._compute_potential_gradient(problem.body.mesh_including_lid, result) pressure += problem.rho * problem.forward_speed * nabla_phi[:, 0] pressure_on_hull = pressure[:problem.body.mesh.nb_faces] # Discards pressure on lid if any forces = problem.body.integrate_pressure(pressure_on_hull) if not keep_details: result = problem.make_results_container(forces) else: result = problem.make_results_container(forces, sources, potential, pressure) LOG.debug("Done!") return result
def _solve_and_catch_errors(self, problem, *args, **kwargs): """Same as BEMSolver.solve() but returns a FailedLinearPotentialFlowResult when the resolution failed.""" try: res = self.solve(problem, *args, **kwargs) except Exception as e: LOG.info(f"Skipped {problem}\nbecause of {repr(e)}") res = problem.make_failed_results_container(e) return res
[docs] def solve_all(self, problems, *, method=None, n_jobs=1, progress_bar=None, _check_wavelength=True, **kwargs): """Solve several problems. Optional keyword arguments are passed to `BEMSolver.solve`. Parameters ---------- problems: list of LinearPotentialFlowProblem several problems to be solved method: string, optional select boundary integral equation used to solve the problems. It is recommended to set the method more globally when initializing the solver. If provided here, the value in argument of `solve_all` overrides the global one. n_jobs: int, optional (default: 1) the number of jobs to run in parallel using the optional dependency `joblib` By defaults: do not use joblib and solve sequentially. progress_bar: bool, optional Display a progress bar while solving. If no value is provided to this method directly, check whether the environment variable `CAPYTAINE_PROGRESS_BAR` is defined and otherwise default to True. _check_wavelength: bool, optional (default: True) If True, the frequencies are compared to the mesh resolution and the estimated first irregular frequency to warn the user. Returns ------- list of LinearPotentialFlowResult the solved problems """ if _check_wavelength: self._check_wavelength_and_mesh_resolution(problems) self._check_wavelength_and_irregular_frequencies(problems) if progress_bar is None: if "CAPYTAINE_PROGRESS_BAR" in os.environ: env_var = os.environ["CAPYTAINE_PROGRESS_BAR"].lower() if env_var in {'true', '1', 't'}: progress_bar = True elif env_var in {'false', '0', 'f'}: progress_bar = False else: raise ValueError("Invalid value '{}' for the environment variable CAPYTAINE_PROGRESS_BAR.".format(os.environ["CAPYTAINE_PROGRESS_BAR"])) else: progress_bar = True if n_jobs == 1: # force sequential resolution problems = sorted(problems) if progress_bar: problems = track(problems, total=len(problems), description="Solving BEM problems") results = [self._solve_and_catch_errors(pb, method=method, _check_wavelength=False, **kwargs) for pb in problems] else: joblib = silently_import_optional_dependency("joblib") if joblib is None: raise ImportError(f"Setting the `n_jobs` argument to {n_jobs} requires the missing optional dependency 'joblib'.") groups_of_problems = LinearPotentialFlowProblem._group_for_parallel_resolution(problems) parallel = joblib.Parallel(return_as="generator", n_jobs=n_jobs) groups_of_results = parallel(joblib.delayed(self.solve_all)(grp, method=method, n_jobs=1, progress_bar=False, _check_wavelength=False, **kwargs) for grp in groups_of_problems) if progress_bar: groups_of_results = track(groups_of_results, total=len(groups_of_problems), description=f"Solving BEM problems with {n_jobs} threads:") results = [res for grp in groups_of_results for res in grp] # flatten the nested list LOG.info("Solver timer summary:\n%s", self.timer_summary()) return results
@staticmethod def _check_wavelength_and_mesh_resolution(problems): """Display a warning if some of the problems have a mesh resolution that might not be sufficient for the given wavelength.""" LOG.debug("Check wavelength with mesh resolution.") risky_problems = [pb for pb in problems if 0.0 < pb.wavelength < pb.body.minimal_computable_wavelength] nb_risky_problems = len(risky_problems) if nb_risky_problems == 1: pb = risky_problems[0] freq_type = risky_problems[0].provided_freq_type freq = pb.__getattribute__(freq_type) LOG.warning(f"Mesh resolution for {pb}:\n" f"The resolution of the mesh of the body {pb.body.__short_str__()} might " f"be insufficient for {freq_type}={freq}.\n" "This warning appears because the largest panel of this mesh " f"has radius {pb.body.mesh.faces_radiuses.max():.3f} > wavelength/8." ) elif nb_risky_problems > 1: freq_type = risky_problems[0].provided_freq_type freqs = np.array([float(pb.__getattribute__(freq_type)) for pb in risky_problems]) LOG.warning(f"Mesh resolution for {nb_risky_problems} problems:\n" "The resolution of the mesh might be insufficient " f"for {freq_type} ranging from {freqs.min():.3f} to {freqs.max():.3f}.\n" "This warning appears when the largest panel of this mesh " "has radius > wavelength/8." ) @staticmethod def _check_wavelength_and_irregular_frequencies(problems): """Display a warning if some of the problems might encounter irregular frequencies.""" LOG.debug("Check wavelength with estimated irregular frequency.") risky_problems = [pb for pb in problems if pb.free_surface != np.inf and pb.body.first_irregular_frequency_estimate(g=pb.g) < pb.omega < np.inf] nb_risky_problems = len(risky_problems) if nb_risky_problems >= 1: if any(pb.body.lid_mesh is None for pb in problems): recommendation = "Setting a lid for the floating body is recommended." else: recommendation = "The lid might need to be closer to the free surface." if nb_risky_problems == 1: pb = risky_problems[0] freq_type = risky_problems[0].provided_freq_type freq = pb.__getattribute__(freq_type) LOG.warning(f"Irregular frequencies for {pb}:\n" f"The body {pb.body.__short_str__()} might display irregular frequencies " f"for {freq_type}={freq}.\n" + recommendation ) elif nb_risky_problems > 1: freq_type = risky_problems[0].provided_freq_type freqs = np.array([float(pb.__getattribute__(freq_type)) for pb in risky_problems]) LOG.warning(f"Irregular frequencies for {nb_risky_problems} problems:\n" "Irregular frequencies might be encountered " f"for {freq_type} ranging from {freqs.min():.3f} to {freqs.max():.3f}.\n" + recommendation )
[docs] def fill_dataset(self, dataset, bodies, *, method=None, n_jobs=1, _check_wavelength=True, progress_bar=None, **kwargs): """Solve a set of problems defined by the coordinates of an xarray dataset. Parameters ---------- dataset : xarray Dataset dataset containing the problems parameters: frequency, radiating_dof, water_depth, ... bodies : FloatingBody or list of FloatingBody The body or bodies involved in the problems They should all have different names. method: string, optional select boundary integral equation used to solve the problems. It is recommended to set the method more globally when initializing the solver. If provided here, the value in argument of `fill_dataset` overrides the global one. n_jobs: int, optional (default: 1) the number of jobs to run in parallel using the optional dependency `joblib` By defaults: do not use joblib and solve sequentially. progress_bar: bool, optional Display a progress bar while solving. If no value is provided to this method directly, check whether the environment variable `CAPYTAINE_PROGRESS_BAR` is defined and otherwise default to True. _check_wavelength: bool, optional (default: True) If True, the frequencies are compared to the mesh resolution and the estimated first irregular frequency to warn the user. Returns ------- xarray Dataset """ attrs = {'start_of_computation': datetime.now().isoformat(), **self.exportable_settings} if method is not None: # Overrides the method in self.exportable_settings attrs["method"] = method problems = problems_from_dataset(dataset, bodies) if 'theta' in dataset.coords: results = self.solve_all(problems, keep_details=True, method=method, n_jobs=n_jobs, _check_wavelength=_check_wavelength, progress_bar=progress_bar) kochin = kochin_data_array(results, dataset.coords['theta']) dataset = assemble_dataset(results, attrs=attrs, **kwargs) dataset.update(kochin) else: results = self.solve_all(problems, keep_details=False, method=method, n_jobs=n_jobs, _check_wavelength=_check_wavelength, progress_bar=progress_bar) dataset = assemble_dataset(results, attrs=attrs, **kwargs) return dataset
[docs] def compute_potential(self, points, result): """Compute the value of the potential at given points for a previously solved potential flow problem. Parameters ---------- points: array of shape (3,) or (N, 3), or 3-ple of arrays returned by meshgrid, or MeshLike object Coordinates of the point(s) at which the potential should be computed result: LinearPotentialFlowResult The return of the BEM solver Returns ------- complex-valued array of shape (1,) or (N,) or (nx, ny, nz) or (mesh.nb_faces,) depending of the kind of input The value of the potential at the points Raises ------ Exception: if the :code:`LinearPotentialFlowResult` object given as input does not contain the source distribution. """ points, output_shape = _normalize_points(points, keep_mesh=True) if result.sources is None: raise Exception(f"""The values of the sources of {result} cannot been found. They probably have not been stored by the solver because the option keep_details=True have not been set or the direct method has been used. Please re-run the resolution with the indirect method and keep_details=True.""") with self.timer[" Green function"]: S, _ = self.green_function.evaluate(points, result.body.mesh_including_lid, result.free_surface, result.water_depth, result.encounter_wavenumber) potential = S @ result.sources # Sum the contributions of all panels in the mesh return potential.reshape(output_shape)
def _compute_potential_gradient(self, points, result): points, output_shape = _normalize_points(points, keep_mesh=True) if result.sources is None: raise Exception(f"""The values of the sources of {result} cannot been found. They probably have not been stored by the solver because the option keep_details=True have not been set. Please re-run the resolution with this option.""") with self.timer[" Green function"]: _, gradG = self.green_function.evaluate(points, result.body.mesh_including_lid, result.free_surface, result.water_depth, result.encounter_wavenumber, early_dot_product=False) velocities = np.einsum('ijk,j->ik', gradG, result.sources) # Sum the contributions of all panels in the mesh return velocities.reshape((*output_shape, 3))
[docs] def compute_velocity(self, points, result): """Compute the value of the velocity vector at given points for a previously solved potential flow problem. Parameters ---------- points: array of shape (3,) or (N, 3), or 3-ple of arrays returned by meshgrid, or MeshLike object Coordinates of the point(s) at which the velocity should be computed result: LinearPotentialFlowResult The return of the BEM solver Returns ------- complex-valued array of shape (3,) or (N,, 3) or (nx, ny, nz, 3) or (mesh.nb_faces, 3) depending of the kind of input The value of the velocity at the points Raises ------ Exception: if the :code:`LinearPotentialFlowResult` object given as input does not contain the source distribution. """ nabla_phi = self._compute_potential_gradient(points, result) if result.forward_speed != 0.0: nabla_phi[..., 0] -= result.forward_speed return nabla_phi
[docs] def compute_pressure(self, points, result): """Compute the value of the pressure at given points for a previously solved potential flow problem. Parameters ---------- points: array of shape (3,) or (N, 3), or 3-ple of arrays returned by meshgrid, or MeshLike object Coordinates of the point(s) at which the pressure should be computed result: LinearPotentialFlowResult The return of the BEM solver Returns ------- complex-valued array of shape (1,) or (N,) or (nx, ny, nz) or (mesh.nb_faces,) depending of the kind of input The value of the pressure at the points Raises ------ Exception: if the :code:`LinearPotentialFlowResult` object given as input does not contain the source distribution. """ if result.forward_speed != 0: pressure = 1j * result.encounter_omega * result.rho * self.compute_potential(points, result) nabla_phi = self._compute_potential_gradient(points, result) pressure += result.rho * result.forward_speed * nabla_phi[..., 0] else: pressure = 1j * result.omega * result.rho * self.compute_potential(points, result) return pressure
[docs] def compute_free_surface_elevation(self, points, result): """Compute the value of the free surface elevation at given points for a previously solved potential flow problem. Parameters ---------- points: array of shape (2,) or (N, 2), or 2-ple of arrays returned by meshgrid, or MeshLike object Coordinates of the point(s) at which the free surface elevation should be computed result: LinearPotentialFlowResult The return of the BEM solver Returns ------- complex-valued array of shape (1,) or (N,) or (nx, ny, nz) or (mesh.nb_faces,) depending of the kind of input The value of the free surface elevation at the points Raises ------ Exception: if the :code:`LinearPotentialFlowResult` object given as input does not contain the source distribution. """ points, output_shape = _normalize_free_surface_points(points, keep_mesh=True) if result.forward_speed != 0: fs_elevation = -1/result.g * (-1j*result.encounter_omega) * self.compute_potential(points, result) nabla_phi = self._compute_potential_gradient(points, result) fs_elevation += -1/result.g * result.forward_speed * nabla_phi[..., 0] else: fs_elevation = -1/result.g * (-1j*result.omega) * self.compute_potential(points, result) return fs_elevation.reshape(output_shape)
## Legacy
[docs] def get_potential_on_mesh(self, result, mesh, chunk_size=50): """Compute the potential on a mesh for the potential field of a previously solved problem. Since the interaction matrix does not need to be computed in full to compute the matrix-vector product, only a few lines are evaluated at a time to reduce the memory cost of the operation. The newer method :code:`compute_potential` should be preferred in the future. Parameters ---------- result : LinearPotentialFlowResult the return of the BEM solver mesh : MeshLike a mesh chunk_size: int, optional Number of lines to compute in the matrix. (legacy, should be passed as an engine setting instead). Returns ------- array of shape (mesh.nb_faces,) potential on the faces of the mesh Raises ------ Exception: if the :code:`Result` object given as input does not contain the source distribution. """ LOG.info(f"Compute potential on {mesh.name} for {result}.") if result.sources is None: raise Exception(f"""The values of the sources of {result} cannot been found. They probably have not been stored by the solver because the option keep_details=True have not been set or the direct method has been used. Please re-run the resolution with the indirect method and keep_details=True.""") if chunk_size > mesh.nb_faces: S = self.engine.build_S_matrix( mesh, result.body.mesh_including_lid, result.free_surface, result.water_depth, result.wavenumber, self.green_function ) phi = S @ result.sources else: phi = np.empty((mesh.nb_faces,), dtype=np.complex128) for i in range(0, mesh.nb_faces, chunk_size): faces_to_extract = list(range(i, min(i+chunk_size, mesh.nb_faces))) S = self.engine.build_S_matrix( mesh.extract_faces(faces_to_extract), result.body.mesh_including_lid, result.free_surface, result.water_depth, result.wavenumber, self.green_function ) phi[i:i+chunk_size] = S @ result.sources LOG.debug(f"Done computing potential on {mesh.name} for {result}.") return phi
[docs] def get_free_surface_elevation(self, result, free_surface, keep_details=False): """Compute the elevation of the free surface on a mesh for a previously solved problem. The newer method :code:`compute_free_surface_elevation` should be preferred in the future. Parameters ---------- result : LinearPotentialFlowResult the return of the solver free_surface : FreeSurface a meshed free surface keep_details : bool, optional if True, keep the free surface elevation in the LinearPotentialFlowResult (default:False) Returns ------- array of shape (free_surface.nb_faces,) the free surface elevation on each faces of the meshed free surface Raises ------ Exception: if the :code:`Result` object given as input does not contain the source distribution. """ if result.forward_speed != 0.0: raise NotImplementedError("For free surface elevation with forward speed, please use the `compute_free_surface_elevation` method.") fs_elevation = 1j*result.omega/result.g * self.get_potential_on_mesh(result, free_surface.mesh) if keep_details: result.fs_elevation[free_surface] = fs_elevation return fs_elevation