from __future__ import annotations
import logging
from itertools import chain, accumulate
from typing import Union, List, Optional
from functools import cached_property, lru_cache
import numpy as np
import xarray as xr
from capytaine.bodies.dofs import (
AbstractDof,
DofOnSubmesh,
add_dofs_labels_to_vector,
add_dofs_labels_to_matrix
)
LOG = logging.getLogger(__name__)
[docs]
class Multibody:
def __init__(
self,
bodies: List[Union[FloatingBody, Multibody]],
# own_dofs: Optional[Dict[str, np.array]] = None,
*,
name: Optional[str] = None
):
self.bodies = bodies
if len(set(b.name for b in self.bodies)) < len(self.bodies):
raise ValueError(
"In Multibody, all component bodies must have a distinct name.\n"
f"Got: {[b.name for b in self.bodies]}"
)
# if own_dofs is None:
# self.own_dofs = {}
# else:
# self.own_dofs = own_dofs
if name is None:
self.name = '+'.join(b.name for b in self.bodies)
else:
self.name = name
# Keep legacy behavior of former mesh joining
for matrix_name in ["inertia_matrix", "hydrostatic_stiffness"]:
if all(hasattr(body, matrix_name) for body in bodies):
from scipy.linalg import block_diag
setattr(self, matrix_name, self.add_dofs_labels_to_matrix(
block_diag(*[getattr(body, matrix_name) for body in bodies])
))
LOG.debug(f"New multibody: {self.__str__()}.")
[docs]
@lru_cache
def as_FloatingBody(self):
from capytaine.bodies.bodies import FloatingBody
if all(body.mass is not None for body in self.bodies):
total_mass = sum(body.mass for body in self.bodies)
else:
total_mass = None
if (all(body.mass is not None for body in self.bodies)
and all(body.center_of_mass is not None for body in self.bodies)):
new_cog = sum(body.mass*np.asarray(body.center_of_mass) for body in self.bodies)/total_mass
else:
new_cog = None
return FloatingBody(
mesh=self.mesh,
dofs=self.dofs,
lid_mesh=self.lid_mesh,
mass=total_mass,
center_of_mass=new_cog,
name=self.name,
)
def __str__(self):
short_bodies = ', '.join(b.__short_str__() for b in self.bodies)
# short_dofs = '{' + ', '.join('"{}": ...'.format(d) for d in self.own_dofs) + '}'
return f"Multibody({short_bodies})" #, own_dofs={short_dofs})"
def __short_str__(self):
return str(self)
def __repr__(self):
return str(self)
def _check_dofs_shape_consistency(self):
# TODO
...
@cached_property
def minimal_computable_wavelength(self):
return min(b.minimal_computable_wavelength for b in self.bodies)
[docs]
def first_irregular_frequency_estimate(self, *args, **kwargs):
return min(b.first_irregular_frequency_estimate(*args, **kwargs) for b in self.bodies)
@cached_property
def mesh(self):
return self.bodies[0].mesh.join_meshes(*[b.mesh for b in self.bodies[1:]])
@cached_property
def lid_mesh(self):
if all(body.lid_mesh is None for body in self.bodies):
return None
else:
lid_meshes = [body.lid_mesh.copy() for body in self.bodies if body.lid_mesh is not None]
joined_lid = lid_meshes[0].join_meshes(*lid_meshes[1:], name=f"{self.name}_lid_mesh")
return joined_lid
@cached_property
def mesh_including_lid(self):
return self.bodies[0].mesh_including_lid.join_meshes(*[b.mesh_including_lid for b in self.bodies[1:]])
@cached_property
def hull_mask(self):
return np.concatenate([b.hull_mask for b in self.bodies])
@property
def nb_dofs(self):
return sum(b.nb_dofs for b in self.bodies) # + len(self.own_dofs)
@cached_property
def dofs(self):
for body in self.bodies:
body._check_dofs_shape_consistency()
componenents_dofs = {}
cum_nb_faces = accumulate(chain([0], (body.mesh.nb_faces for body in self.bodies)))
total_nb_faces = sum(body.mesh.nb_faces for body in self.bodies)
for body, nbf in zip(self.bodies, cum_nb_faces):
# nbf is the cumulative number of faces of the previous subbodies,
# that is the offset of the indices of the faces of the current body.
for name, dof in body.dofs.items():
if isinstance(dof, AbstractDof):
new_dof = DofOnSubmesh(dof, range(nbf, nbf+body.mesh.nb_faces))
else:
new_dof = np.zeros((total_nb_faces, 3))
new_dof[nbf:nbf+len(dof), :] = dof
if '__' not in name:
new_dof_name = '__'.join([body.name, name])
else:
# The body is probably a combination of bodies already.
# So for the associativity of the + operation,
# it is better to keep the same name.
new_dof_name = name
componenents_dofs[new_dof_name] = new_dof
return {**componenents_dofs} #, **self.own_dofs}
[docs]
def add_dofs_labels_to_vector(self, vector):
"""Helper function turning a bare vector into a vector labelled by the name of the dofs of the body,
to be used for instance for the computation of RAO."""
return add_dofs_labels_to_vector(self.dofs.keys(), vector)
[docs]
def add_dofs_labels_to_matrix(self, matrix):
"""Helper function turning a bare matrix into a matrix labelled by the name of the dofs of the body,
to be used for instance for the computation of RAO."""
return add_dofs_labels_to_matrix(self.dofs.keys(), matrix)
[docs]
def immersed_part(self, *args, **kwargs):
return Multibody(
[b.immersed_part() for b in self.bodies],
# own_dofs=None, # TODO
)
def __add__(self, body_to_add):
return self.join_bodies(body_to_add)
[docs]
def join_bodies(*bodies, name=None):
from capytaine.bodies.multibodies import Multibody
return Multibody(bodies, name=name)
[docs]
def integrate_pressure(self, pressure):
return self.as_FloatingBody().integrate_pressure(pressure)
@cached_property
def center_of_buoyancy(self):
return {b.name: b.center_of_buoyancy for b in self.bodies}
@cached_property
def center_of_mass(self):
return {b.name: b.center_of_mass for b in self.bodies}
@cached_property
def volume(self):
return {b.name: b.volume for b in self.bodies}
@cached_property
def mass(self):
return {b.name: b.mass for b in self.bodies}
def _combine_component_matrices(self, matrices):
for m, b in zip(matrices, self.bodies):
m.coords['radiating_dof'] = np.array([b.name + '__' + k for k in m.coords['radiating_dof'].values])
m.coords['influenced_dof'] = np.array([b.name + '__' + k for k in m.coords['influenced_dof'].values])
return xr.concat(
matrices,
dim="radiating_dof",
fill_value=0.0
).sel(
radiating_dof=list(self.dofs.keys()),
influenced_dof=list(self.dofs.keys())
)
[docs]
def compute_hydrostatic_stiffness(self, *, rho=1000.0, g=9.81):
return self._combine_component_matrices([b.compute_hydrostatic_stiffness(rho=rho, g=g) for b in self.bodies])
[docs]
def compute_rigid_body_inertia(self, rho=1000.0):
return self._combine_component_matrices([b.compute_rigid_body_inertia(rho=rho) for b in self.bodies])