tengfeiluo's picture
Upload folder using huggingface_hub
01a0b26 verified
"""Model runner with registry and adapter pattern for heterogeneous signatures.
The :class:`ModelRunner` provides a unified interface for running any registered
model against a :class:`CompositeSystem`. Each model is registered with an
adapter callable that extracts the correct arguments from the system object.
References
----------
Architecture follows the Adapter pattern: each model function has its own
signature (core uses P_m/P_f, thermal uses k_m/k_f, CTE needs K/G/alpha, etc.).
The adapter normalises these into a dict of kwargs for the model function.
"""
from __future__ import annotations
import warnings
from dataclasses import dataclass, field
from typing import Any, Callable, Literal
import numpy as np
from compositecalc.materials import CompositeSystem, PropertyDomain
from compositecalc.results import ModelResults
# ---------------------------------------------------------------------------
# ModelSpec — registry entry for a single model
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class ModelSpec:
"""Registry entry describing how to call one model function.
Parameters
----------
func : Callable
The model function to call.
domain : str
Property domain: ``"thermal"``, ``"electrical"``, etc., or ``"any"``
for core models that work with any constitutive property.
category : str
Grouping: ``"bounds"``, ``"conductivity"``, ``"cte"``,
``"percolation"``, ``"auxiliary"``.
description : str
One-line description with citation.
adapter : Callable[[CompositeSystem, dict], dict]
Extracts the right keyword arguments from the system + user kwargs.
Returns a dict that is splatted into *func*. Should raise
:class:`SkipModel` if required properties are missing.
optional_kwargs : dict[str, Any]
Default values for model-specific parameters (e.g. ``n``, ``A``).
returns_bounds : bool
True when the function returns a ``(lower, upper)`` tuple.
"""
func: Callable
domain: str
category: str
description: str
adapter: Callable[[CompositeSystem, dict], dict]
optional_kwargs: dict[str, Any] = field(default_factory=dict)
returns_bounds: bool = False
class SkipModel(Exception):
"""Raised by an adapter when required properties are missing."""
# ---------------------------------------------------------------------------
# Adapters — extract args from CompositeSystem
# ---------------------------------------------------------------------------
def _get_property(system: CompositeSystem, domain: str) -> tuple[float, float]:
"""Return (P_m, P_f) for the given domain, or raise SkipModel."""
mapping = {
"thermal": ("thermal_conductivity", "thermal_conductivity"),
"electrical": ("electrical_conductivity", "electrical_conductivity"),
"dielectric": ("dielectric_permittivity", "dielectric_permittivity"),
"optical": ("dielectric_permittivity", "dielectric_permittivity"),
}
m_attr, f_attr = mapping[domain]
P_m = getattr(system.matrix, m_attr, None)
P_f = getattr(system.primary_filler().material, f_attr, None)
if P_m is None or P_f is None:
raise SkipModel(f"Missing {domain} property for matrix or filler.")
return P_m, P_f
def _adapt_core(system: CompositeSystem, kwargs: dict, domain: str) -> dict:
"""Adapter shared by all core (P_m, P_f, phi) models."""
P_m, P_f = _get_property(system, domain)
# Core models enforce dtype=float; use real part for complex (optical) inputs
if isinstance(P_m, complex):
P_m = P_m.real
if isinstance(P_f, complex):
P_f = P_f.real
filler = system.primary_filler()
return {"P_m": P_m, "P_f": P_f, "phi": filler.volume_fraction}
def _adapt_core_L(system: CompositeSystem, kwargs: dict, domain: str) -> dict:
"""Core model with optional depolarization factor L."""
d = _adapt_core(system, kwargs, domain)
if "L" in kwargs:
d["L"] = kwargs["L"]
return d
def _adapt_thermal(system: CompositeSystem, kwargs: dict) -> dict:
"""Adapter for thermal (k_m, k_f, phi) models."""
P_m, P_f = _get_property(system, "thermal")
filler = system.primary_filler()
return {"k_m": P_m, "k_f": P_f, "phi": filler.volume_fraction}
def _adapt_hamilton_crosser(system: CompositeSystem, kwargs: dict) -> dict:
d = _adapt_thermal(system, kwargs)
if "n" in kwargs:
d["n"] = kwargs["n"]
return d
def _adapt_lewis_nielsen(system: CompositeSystem, kwargs: dict) -> dict:
d = _adapt_thermal(system, kwargs)
if "A" in kwargs:
d["A"] = kwargs["A"]
if "phi_max" in kwargs:
d["phi_max"] = kwargs["phi_max"]
return d
def _adapt_agari_uno(system: CompositeSystem, kwargs: dict) -> dict:
d = _adapt_thermal(system, kwargs)
if "C1" in kwargs:
d["C1"] = kwargs["C1"]
if "C2" in kwargs:
d["C2"] = kwargs["C2"]
return d
def _adapt_nan_spherical(system: CompositeSystem, kwargs: dict) -> dict:
d = _adapt_thermal(system, kwargs)
filler = system.primary_filler()
# R_K from interphase or kwargs
R_K = kwargs.get("R_K")
if R_K is None and system.interphase is not None:
R_K = system.interphase.kapitza_resistance
if R_K is None:
raise SkipModel("nan_spherical requires R_K (Kapitza resistance).")
# radius from filler or kwargs
radius = kwargs.get("radius")
if radius is None:
radius = filler.radius
if radius is None:
raise SkipModel("nan_spherical requires particle radius.")
d["R_K"] = R_K
d["radius"] = radius
return d
def _adapt_nan_ellipsoid(system: CompositeSystem, kwargs: dict) -> dict:
d = _adapt_thermal(system, kwargs)
filler = system.primary_filler()
R_K = kwargs.get("R_K")
if R_K is None and system.interphase is not None:
R_K = system.interphase.kapitza_resistance
if R_K is None:
raise SkipModel("nan_ellipsoid requires R_K (Kapitza resistance).")
a_1 = kwargs.get("a_1")
if a_1 is None:
a_1 = filler.radius
if a_1 is None:
raise SkipModel("nan_ellipsoid requires semi-axis a_1 (or particle radius).")
L_11 = kwargs.get("L_11")
L_33 = kwargs.get("L_33")
if L_11 is None or L_33 is None:
raise SkipModel("nan_ellipsoid requires L_11 and L_33.")
d.update({"R_K": R_K, "a_1": a_1, "L_11": L_11, "L_33": L_33})
return d
def _adapt_nan_fiber_cnt(system: CompositeSystem, kwargs: dict) -> dict:
d = _adapt_thermal(system, kwargs)
filler = system.primary_filler()
R_K = kwargs.get("R_K")
if R_K is None and system.interphase is not None:
R_K = system.interphase.kapitza_resistance
if R_K is None:
raise SkipModel("nan_fiber_cnt requires R_K (Kapitza resistance).")
dia = kwargs.get("d")
if dia is None and filler.radius is not None:
dia = 2.0 * filler.radius
if dia is None:
raise SkipModel("nan_fiber_cnt requires fiber diameter d.")
length = kwargs.get("length")
if length is None and filler.radius is not None and filler.aspect_ratio > 1:
length = filler.radius * filler.aspect_ratio
if length is None:
raise SkipModel("nan_fiber_cnt requires fiber length.")
d.update({"R_K": R_K, "d": dia, "length": length})
return d
def _adapt_foygel(system: CompositeSystem, kwargs: dict) -> dict:
filler = system.primary_filler()
k_0 = kwargs.get("k_0")
phi_c = kwargs.get("phi_c")
t = kwargs.get("t")
if k_0 is None or phi_c is None or t is None:
raise SkipModel("foygel requires k_0, phi_c, and t kwargs.")
return {"k_0": k_0, "phi": filler.volume_fraction, "phi_c": phi_c, "t": t}
def _adapt_turner_cte(system: CompositeSystem, kwargs: dict) -> dict:
filler = system.primary_filler()
alpha_m = system.matrix.cte
alpha_f = filler.material.cte
K_m = system.matrix.bulk_modulus
K_f = filler.material.bulk_modulus
if any(v is None for v in (alpha_m, alpha_f, K_m, K_f)):
raise SkipModel("turner_cte requires cte and bulk_modulus on both phases.")
return {
"alpha_m": alpha_m, "alpha_f": alpha_f,
"K_m": K_m, "K_f": K_f,
"phi": filler.volume_fraction,
}
def _adapt_kerner_cte(system: CompositeSystem, kwargs: dict) -> dict:
filler = system.primary_filler()
alpha_m = system.matrix.cte
alpha_f = filler.material.cte
K_m = system.matrix.bulk_modulus
K_f = filler.material.bulk_modulus
G_m = system.matrix.shear_modulus
if any(v is None for v in (alpha_m, alpha_f, K_m, K_f, G_m)):
raise SkipModel(
"kerner_cte requires cte, bulk_modulus on both phases "
"and shear_modulus on matrix."
)
return {
"alpha_m": alpha_m, "alpha_f": alpha_f,
"K_m": K_m, "K_f": K_f, "G_m": G_m,
"phi": filler.volume_fraction,
}
def _adapt_schapery_bounds(system: CompositeSystem, kwargs: dict) -> dict:
filler = system.primary_filler()
alpha_m = system.matrix.cte
alpha_f = filler.material.cte
K_m = system.matrix.bulk_modulus
K_f = filler.material.bulk_modulus
G_m = system.matrix.shear_modulus
G_f = filler.material.shear_modulus
if any(v is None for v in (alpha_m, alpha_f, K_m, K_f, G_m, G_f)):
raise SkipModel(
"schapery_bounds requires cte, bulk_modulus, and shear_modulus "
"on both phases."
)
return {
"alpha_m": alpha_m, "alpha_f": alpha_f,
"K_m": K_m, "K_f": K_f,
"G_m": G_m, "G_f": G_f,
"phi": filler.volume_fraction,
}
def _adapt_density_mix(system: CompositeSystem, kwargs: dict) -> dict:
filler = system.primary_filler()
rho_m = system.matrix.density
rho_f = filler.material.density
if rho_m is None or rho_f is None:
raise SkipModel("density_mix requires density on both phases.")
return {"rho_m": rho_m, "rho_f": rho_f, "phi": filler.volume_fraction}
def _adapt_specific_heat_mix(system: CompositeSystem, kwargs: dict) -> dict:
filler = system.primary_filler()
c_m = system.matrix.specific_heat
c_f = filler.material.specific_heat
rho_m = system.matrix.density
rho_f = filler.material.density
if any(v is None for v in (c_m, c_f, rho_m, rho_f)):
raise SkipModel(
"specific_heat_mix requires specific_heat and density on both phases."
)
return {
"c_m": c_m, "c_f": c_f,
"rho_m": rho_m, "rho_f": rho_f,
"phi": filler.volume_fraction,
}
def _adapt_sihvola_kong(system: CompositeSystem, kwargs: dict, domain: str) -> dict:
d = _adapt_core(system, kwargs, domain)
if "nu" in kwargs:
d["nu"] = kwargs["nu"]
return d
def _adapt_mclachlan_gem(system: CompositeSystem, kwargs: dict, domain: str) -> dict:
d = _adapt_core(system, kwargs, domain)
if "phi_c" in kwargs:
d["phi_c"] = kwargs["phi_c"]
if "t" in kwargs:
d["t"] = kwargs["t"]
return d
def _adapt_bruggeman_dem(system: CompositeSystem, kwargs: dict, domain: str) -> dict:
"""bruggeman_dem uses phi_eval (not phi) as its parameter name."""
P_m, P_f = _get_property(system, domain)
# Core models enforce dtype=float; use real part for complex (optical) inputs
if isinstance(P_m, complex):
P_m = P_m.real
if isinstance(P_f, complex):
P_f = P_f.real
filler = system.primary_filler()
d: dict[str, Any] = {
"P_m": P_m, "P_f": P_f, "phi_eval": filler.volume_fraction,
}
if "L" in kwargs:
d["L"] = kwargs["L"]
return d
def _adapt_power_law(system: CompositeSystem, kwargs: dict, domain: str) -> dict:
d = _adapt_core(system, kwargs, domain)
if "n" in kwargs:
d["n"] = kwargs["n"]
return d
# --- Electrical adapters ---
def _adapt_electrical(system: CompositeSystem, kwargs: dict) -> dict:
"""Adapter for electrical (sigma_m, sigma_f, phi) models."""
P_m, P_f = _get_property(system, "electrical")
filler = system.primary_filler()
return {"sigma_m": P_m, "sigma_f": P_f, "phi": filler.volume_fraction}
def _adapt_kirkpatrick(system: CompositeSystem, kwargs: dict) -> dict:
d = _adapt_electrical(system, kwargs)
if "z" in kwargs:
d["z"] = kwargs["z"]
return d
def _adapt_percolation_power_law(system: CompositeSystem, kwargs: dict) -> dict:
filler = system.primary_filler()
sigma_0 = kwargs.get("sigma_0")
phi_c = kwargs.get("phi_c")
t = kwargs.get("t")
if any(v is None for v in (sigma_0, phi_c, t)):
raise SkipModel("percolation_power_law requires sigma_0, phi_c, and t kwargs.")
return {"sigma_0": sigma_0, "phi": filler.volume_fraction, "phi_c": phi_c, "t": t}
# --- Dielectric adapters ---
def _adapt_dielectric(system: CompositeSystem, kwargs: dict) -> dict:
"""Adapter for dielectric (eps_m, eps_f, phi) models."""
P_m, P_f = _get_property(system, "dielectric")
filler = system.primary_filler()
return {"eps_m": P_m, "eps_f": P_f, "phi": filler.volume_fraction}
def _adapt_yamada(system: CompositeSystem, kwargs: dict) -> dict:
d = _adapt_dielectric(system, kwargs)
if "n" in kwargs:
d["n"] = kwargs["n"]
return d
# --- Optical adapters ---
def _adapt_ri_mixing(system: CompositeSystem, kwargs: dict) -> dict:
"""Adapter for refractive-index mixing (n_m, n_f, phi) models."""
filler = system.primary_filler()
n_m = kwargs.get("n_m")
if n_m is None and system.matrix.refractive_index is not None:
n_m = system.matrix.refractive_index.real if isinstance(
system.matrix.refractive_index, complex
) else system.matrix.refractive_index
n_f = kwargs.get("n_f")
if n_f is None and filler.material.refractive_index is not None:
n_f = filler.material.refractive_index.real if isinstance(
filler.material.refractive_index, complex
) else filler.material.refractive_index
if n_m is None or n_f is None:
raise SkipModel("Requires n_m and n_f (refractive indices).")
return {"n_m": float(n_m), "n_f": float(n_f), "phi": filler.volume_fraction}
def _adapt_complex_emt(system: CompositeSystem, kwargs: dict) -> dict:
"""Adapter for complex-valued EMT (eps_m, eps_f, phi) optical models."""
P_m, P_f = _get_property(system, "optical")
filler = system.primary_filler()
return {"eps_m": complex(P_m), "eps_f": complex(P_f), "phi": filler.volume_fraction}
# ---------------------------------------------------------------------------
# Registry — built lazily to avoid import cycles
# ---------------------------------------------------------------------------
_REGISTRY: dict[str, ModelSpec] | None = None
def _build_registry() -> dict[str, ModelSpec]:
"""Lazily build the model registry on first access."""
from compositecalc.core import (
bruggeman,
bruggeman_dem,
geometric_mean,
hashin_shtrikman_lower,
hashin_shtrikman_upper,
looyenga,
maxwell_garnett,
mclachlan_gem,
power_law,
reuss,
sihvola_kong,
voigt,
)
from compositecalc.thermal import (
agari_uno,
cheng_vachon,
density_mix,
foygel,
hamilton_crosser,
kerner_cte,
lewis_nielsen,
nan_ellipsoid,
nan_fiber_cnt,
nan_spherical,
russell,
schapery_bounds,
specific_heat_mix,
turner_cte,
)
from compositecalc.electrical.percolation import (
kirkpatrick_resistor_network,
percolation_power_law,
)
from compositecalc.dielectric.mixing import (
jayasundere_smith,
poon_shin,
yamada,
)
from compositecalc.optical.refractive_index import (
arago_biot,
gladstone_dale,
heller,
lorentz_lorenz,
newton_mixing,
)
from compositecalc.optical.complex_emt import (
complex_bruggeman,
complex_maxwell_garnett,
)
registry: dict[str, ModelSpec] = {}
# --- Bounds (domain-generic) ---
registry["voigt"] = ModelSpec(
func=voigt, domain="any", category="bounds",
description="Voigt (1889) parallel upper bound.",
adapter=lambda s, kw: _adapt_core(s, kw, kw.pop("_domain", "thermal")),
)
registry["reuss"] = ModelSpec(
func=reuss, domain="any", category="bounds",
description="Reuss (1929) series lower bound.",
adapter=lambda s, kw: _adapt_core(s, kw, kw.pop("_domain", "thermal")),
)
registry["hashin_shtrikman_lower"] = ModelSpec(
func=hashin_shtrikman_lower, domain="any", category="bounds",
description="Hashin–Shtrikman (1962) lower bound.",
adapter=lambda s, kw: _adapt_core(s, kw, kw.pop("_domain", "thermal")),
)
registry["hashin_shtrikman_upper"] = ModelSpec(
func=hashin_shtrikman_upper, domain="any", category="bounds",
description="Hashin–Shtrikman (1962) upper bound.",
adapter=lambda s, kw: _adapt_core(s, kw, kw.pop("_domain", "thermal")),
)
# --- Core conductivity models (domain-generic) ---
registry["maxwell_garnett"] = ModelSpec(
func=maxwell_garnett, domain="any", category="conductivity",
description="Maxwell-Garnett (1904) / Clausius–Mossotti EMT.",
adapter=lambda s, kw: _adapt_core_L(s, kw, kw.pop("_domain", "thermal")),
optional_kwargs={"L": 1.0 / 3.0},
)
registry["mori_tanaka"] = ModelSpec(
func=maxwell_garnett, domain="any", category="conductivity",
description="Mori–Tanaka (1973) mean-field (= MG for scalar L).",
adapter=lambda s, kw: _adapt_core_L(s, kw, kw.pop("_domain", "thermal")),
optional_kwargs={"L": 1.0 / 3.0},
)
registry["bruggeman"] = ModelSpec(
func=bruggeman, domain="any", category="conductivity",
description="Bruggeman (1935) symmetric self-consistent EMT.",
adapter=lambda s, kw: _adapt_core_L(s, kw, kw.pop("_domain", "thermal")),
optional_kwargs={"L": 1.0 / 3.0},
)
registry["bruggeman_dem"] = ModelSpec(
func=bruggeman_dem, domain="any", category="conductivity",
description="Bruggeman DEM (differential effective medium).",
adapter=lambda s, kw: _adapt_bruggeman_dem(s, kw, kw.pop("_domain", "thermal")),
optional_kwargs={"L": 1.0 / 3.0},
)
registry["geometric_mean"] = ModelSpec(
func=geometric_mean, domain="any", category="conductivity",
description="Lichtenecker (1926) geometric / logarithmic mixing rule.",
adapter=lambda s, kw: _adapt_core(s, kw, kw.pop("_domain", "thermal")),
)
registry["power_law"] = ModelSpec(
func=power_law, domain="any", category="conductivity",
description="General power-law mixing rule (Lichtenecker family).",
adapter=lambda s, kw: _adapt_power_law(s, kw, kw.pop("_domain", "thermal")),
optional_kwargs={"n": 1.0 / 3.0},
)
registry["looyenga"] = ModelSpec(
func=looyenga, domain="any", category="conductivity",
description="Looyenga (1965) cube-root mixing rule (power_law n=1/3).",
adapter=lambda s, kw: _adapt_core(s, kw, kw.pop("_domain", "thermal")),
)
registry["sihvola_kong"] = ModelSpec(
func=sihvola_kong, domain="any", category="conductivity",
description="Sihvola–Kong (1988) unified ν-parameter EMT.",
adapter=lambda s, kw: _adapt_sihvola_kong(
s, kw, kw.pop("_domain", "thermal")
),
optional_kwargs={"nu": 0.0},
)
registry["mclachlan_gem"] = ModelSpec(
func=mclachlan_gem, domain="any", category="conductivity",
description="McLachlan (1987) General Effective Media equation.",
adapter=lambda s, kw: _adapt_mclachlan_gem(
s, kw, kw.pop("_domain", "thermal")
),
optional_kwargs={"phi_c": 1.0 / 3.0, "t": 2.0},
)
# --- Thermal-specific conductivity models ---
registry["hamilton_crosser"] = ModelSpec(
func=hamilton_crosser, domain="thermal", category="conductivity",
description="Hamilton–Crosser (1962) shape-factor model.",
adapter=_adapt_hamilton_crosser,
optional_kwargs={"n": 3.0},
)
registry["lewis_nielsen"] = ModelSpec(
func=lewis_nielsen, domain="thermal", category="conductivity",
description="Lewis–Nielsen (1970) max-packing model.",
adapter=_adapt_lewis_nielsen,
optional_kwargs={"A": 1.5, "phi_max": 0.637},
)
registry["russell"] = ModelSpec(
func=russell, domain="thermal", category="conductivity",
description="Russell (1935) cubic-particle model.",
adapter=_adapt_thermal,
)
registry["cheng_vachon"] = ModelSpec(
func=cheng_vachon, domain="thermal", category="conductivity",
description="Cheng–Vachon (1969) parabolic distribution model.",
adapter=_adapt_thermal,
)
registry["agari_uno"] = ModelSpec(
func=agari_uno, domain="thermal", category="conductivity",
description="Agari–Uno (1986) semi-empirical logarithmic model.",
adapter=_adapt_agari_uno,
optional_kwargs={"C1": 1.0, "C2": 1.0},
)
registry["nan_spherical"] = ModelSpec(
func=nan_spherical, domain="thermal", category="conductivity",
description="Nan (1997) spheres with Kapitza resistance.",
adapter=_adapt_nan_spherical,
)
registry["nan_ellipsoid"] = ModelSpec(
func=nan_ellipsoid, domain="thermal", category="conductivity",
description="Nan (1997) random spheroids with Kapitza resistance.",
adapter=_adapt_nan_ellipsoid,
)
registry["nan_fiber_cnt"] = ModelSpec(
func=nan_fiber_cnt, domain="thermal", category="conductivity",
description="Nan (2004) CNT/fiber dilute Kapitza model.",
adapter=_adapt_nan_fiber_cnt,
)
# --- CTE models ---
registry["turner_cte"] = ModelSpec(
func=turner_cte, domain="thermal", category="cte",
description="Turner (1946) CTE — K-weighted average.",
adapter=_adapt_turner_cte,
)
registry["kerner_cte"] = ModelSpec(
func=kerner_cte, domain="thermal", category="cte",
description="Kerner (1956) CTE — spherical inclusions.",
adapter=_adapt_kerner_cte,
)
registry["schapery_bounds"] = ModelSpec(
func=schapery_bounds, domain="thermal", category="cte",
description="Schapery (1968) tightest isotropic CTE bounds.",
adapter=_adapt_schapery_bounds,
returns_bounds=True,
)
# --- Auxiliary thermal models ---
registry["density_mix"] = ModelSpec(
func=density_mix, domain="thermal", category="auxiliary",
description="Exact volume-weighted density mixing rule.",
adapter=_adapt_density_mix,
)
registry["specific_heat_mix"] = ModelSpec(
func=specific_heat_mix, domain="thermal", category="auxiliary",
description="Exact mass-weighted specific heat mixing rule.",
adapter=_adapt_specific_heat_mix,
)
# --- Percolation ---
registry["foygel"] = ModelSpec(
func=foygel, domain="thermal", category="percolation",
description="Foygel (2005) percolation power-law model.",
adapter=_adapt_foygel,
)
# --- Electrical-specific models ---
registry["kirkpatrick"] = ModelSpec(
func=kirkpatrick_resistor_network, domain="electrical",
category="conductivity",
description="Kirkpatrick (1973) random resistor network EMT.",
adapter=_adapt_kirkpatrick,
optional_kwargs={"z": 6.0},
)
registry["percolation_power_law"] = ModelSpec(
func=percolation_power_law, domain="electrical",
category="percolation",
description="Classical percolation power law for σ above φ_c.",
adapter=_adapt_percolation_power_law,
)
# --- Dielectric-specific mixing models ---
registry["yamada"] = ModelSpec(
func=yamada, domain="dielectric", category="conductivity",
description="Yamada (1982) shape-factor dielectric mixing.",
adapter=_adapt_yamada,
optional_kwargs={"n": 3.0},
)
registry["jayasundere_smith"] = ModelSpec(
func=jayasundere_smith, domain="dielectric", category="conductivity",
description="Jayasundere–Smith (1993) dipole-interaction mixing.",
adapter=_adapt_dielectric,
)
registry["poon_shin"] = ModelSpec(
func=poon_shin, domain="dielectric", category="conductivity",
description="Poon–Shin (2004) self-consistent dielectric mixing.",
adapter=_adapt_dielectric,
)
# --- Optical refractive-index mixing models ---
registry["lorentz_lorenz"] = ModelSpec(
func=lorentz_lorenz, domain="optical", category="conductivity",
description="Lorentz–Lorenz / Clausius–Mossotti refractive index mixing.",
adapter=_adapt_ri_mixing,
)
registry["gladstone_dale"] = ModelSpec(
func=gladstone_dale, domain="optical", category="conductivity",
description="Gladstone–Dale (1863) linear (n-1) mixing.",
adapter=_adapt_ri_mixing,
)
registry["arago_biot"] = ModelSpec(
func=arago_biot, domain="optical", category="conductivity",
description="Arago–Biot linear n mixing.",
adapter=_adapt_ri_mixing,
)
registry["heller"] = ModelSpec(
func=heller, domain="optical", category="conductivity",
description="Heller (1945) dilute-suspension approximation.",
adapter=_adapt_ri_mixing,
)
registry["newton_mixing"] = ModelSpec(
func=newton_mixing, domain="optical", category="conductivity",
description="Newton n² averaging rule.",
adapter=_adapt_ri_mixing,
)
registry["complex_maxwell_garnett"] = ModelSpec(
func=complex_maxwell_garnett, domain="optical",
category="conductivity",
description="Maxwell-Garnett EMT for complex dielectric functions.",
adapter=_adapt_complex_emt,
)
registry["complex_bruggeman"] = ModelSpec(
func=complex_bruggeman, domain="optical", category="conductivity",
description="Bruggeman self-consistent EMT for complex ε.",
adapter=_adapt_complex_emt,
)
return registry
def _get_registry() -> dict[str, ModelSpec]:
"""Return the global model registry, building it on first call."""
global _REGISTRY
if _REGISTRY is None:
_REGISTRY = _build_registry()
return _REGISTRY
# ---------------------------------------------------------------------------
# ModelRunner
# ---------------------------------------------------------------------------
# Domain → units mapping
_DOMAIN_UNITS: dict[str, str] = {
"thermal": "W/(m·K)",
"electrical": "S/m",
"dielectric": "",
"optical": "",
}
class ModelRunner:
"""Run multiple analytical models against a :class:`CompositeSystem`.
Parameters
----------
system : CompositeSystem
The composite to evaluate.
domain : str
Property domain: ``"thermal"``, ``"electrical"``, etc.
Examples
--------
>>> from compositecalc import Material, Filler, CompositeSystem, ModelRunner
>>> matrix = Material(name="epoxy", thermal_conductivity=0.2)
>>> filler_mat = Material(name="Al2O3", thermal_conductivity=30.0)
>>> filler = Filler(material=filler_mat, volume_fraction=0.2, shape="sphere")
>>> system = CompositeSystem(matrix=matrix, fillers=[filler])
>>> runner = ModelRunner(system, domain="thermal")
>>> results = runner.run()
>>> "maxwell_garnett" in results.predictions
True
"""
def __init__(
self,
system: CompositeSystem,
domain: str = "thermal",
) -> None:
self.system = system
self.domain = domain
self._registry = _get_registry()
# ---- Querying available models ----
def available_models(self, category: str | None = None) -> list[str]:
"""Return names of models usable with the current system and domain.
Parameters
----------
category : str, optional
Filter by category (``"bounds"``, ``"conductivity"``, ``"cte"``,
``"percolation"``, ``"auxiliary"``).
Returns
-------
list[str]
Sorted list of model names.
"""
result = []
for name, spec in self._registry.items():
if spec.domain not in ("any", self.domain):
continue
if category is not None and spec.category != category:
continue
result.append(name)
return sorted(result)
@staticmethod
def list_all_models(domain: str | None = None) -> list[str]:
"""Return all registered model names, optionally filtered by domain.
Parameters
----------
domain : str, optional
If given, only models matching this domain (or ``"any"``) are
returned.
Returns
-------
list[str]
Sorted list of model names.
"""
registry = _get_registry()
result = []
for name, spec in registry.items():
if domain is not None and spec.domain not in ("any", domain):
continue
result.append(name)
return sorted(result)
# ---- Running models ----
def run(
self,
models: list[str] | None = None,
category: str | None = None,
**kwargs: Any,
) -> ModelResults:
"""Evaluate models at the system's filler volume fraction.
Parameters
----------
models : list[str], optional
Specific model names to run. If ``None``, runs all available.
category : str, optional
Filter to a single category. Ignored if *models* is given.
**kwargs
Model-specific overrides (e.g. ``n=6``, ``R_K=1e-8``).
Returns
-------
ModelResults
Container with one scalar prediction per model.
"""
targets = self._resolve_targets(models, category)
predictions: dict[str, float | complex | np.ndarray] = {}
for name in targets:
spec = self._registry[name]
try:
adapted = self._call_adapter(spec, kwargs)
result = spec.func(**adapted)
if spec.returns_bounds:
lower, upper = result
predictions[f"{name}_lower"] = float(lower)
predictions[f"{name}_upper"] = float(upper)
elif isinstance(result, complex) and result.imag != 0:
predictions[name] = result
else:
predictions[name] = float(
result.real if isinstance(result, complex) else result
)
except SkipModel:
continue
except Exception as exc:
warnings.warn(
f"Model '{name}' raised {type(exc).__name__}: {exc}",
stacklevel=2,
)
continue
phi = self.system.primary_filler().volume_fraction
return ModelResults(
domain=self.domain,
phi=phi,
predictions=predictions,
inputs={"system": repr(self.system), **kwargs},
units=_DOMAIN_UNITS.get(self.domain, ""),
)
def sweep(
self,
phi: np.ndarray,
models: list[str] | None = None,
category: str | None = None,
**kwargs: Any,
) -> ModelResults:
"""Evaluate models over a range of volume fractions.
The phi array is passed directly to vectorized model functions where
possible. For models that raise on certain phi values (e.g.
``cheng_vachon`` for phi > ~0.67), the offending entries are filled
with NaN.
Parameters
----------
phi : np.ndarray
1-D array of filler volume fractions.
models : list[str], optional
Specific model names. If ``None``, runs all available.
category : str, optional
Filter to a single category. Ignored if *models* is given.
**kwargs
Model-specific overrides.
Returns
-------
ModelResults
Container with one array prediction per model.
"""
phi = np.asarray(phi, dtype=float)
targets = self._resolve_targets(models, category)
predictions: dict[str, float | complex | np.ndarray] = {}
for name in targets:
spec = self._registry[name]
try:
adapted = self._call_adapter(spec, kwargs)
# Override the phi value with the sweep array
phi_key = self._find_phi_key(adapted)
adapted[phi_key] = phi
result = spec.func(**adapted)
if spec.returns_bounds:
lower, upper = result
predictions[f"{name}_lower"] = np.asarray(lower, dtype=float)
predictions[f"{name}_upper"] = np.asarray(upper, dtype=float)
else:
arr = np.asarray(result)
# Preserve complex arrays; cast real to float
if not np.issubdtype(arr.dtype, np.complexfloating):
arr = arr.astype(float)
predictions[name] = arr
except SkipModel:
continue
except Exception:
# Try element-wise fallback for models that fail on some phi
try:
arr = self._sweep_elementwise(name, spec, phi, kwargs)
if spec.returns_bounds:
predictions[f"{name}_lower"] = arr[0]
predictions[f"{name}_upper"] = arr[1]
else:
predictions[name] = arr
except SkipModel:
continue
return ModelResults(
domain=self.domain,
phi=phi,
predictions=predictions,
inputs={"system": repr(self.system), **kwargs},
units=_DOMAIN_UNITS.get(self.domain, ""),
)
# ---- Internal helpers ----
def _resolve_targets(
self,
models: list[str] | None,
category: str | None,
) -> list[str]:
"""Return the ordered list of model names to evaluate."""
if models is not None:
# Validate requested names
unknown = set(models) - set(self._registry)
if unknown:
raise ValueError(f"Unknown model(s): {sorted(unknown)}")
return list(models)
return self.available_models(category=category)
def _call_adapter(self, spec: ModelSpec, kwargs: dict) -> dict:
"""Call a spec's adapter with domain injection for core models."""
kw = dict(kwargs) # copy so adapter can pop _domain
if spec.domain == "any":
kw["_domain"] = self.domain
return spec.adapter(self.system, kw)
@staticmethod
def _find_phi_key(adapted: dict) -> str:
"""Find the phi-like key in an adapted kwargs dict."""
for key in ("phi", "phi_eval"):
if key in adapted:
return key
raise KeyError("No phi key found in adapted kwargs.")
def _sweep_elementwise(
self,
name: str,
spec: ModelSpec,
phi_arr: np.ndarray,
kwargs: dict,
) -> np.ndarray | tuple[np.ndarray, np.ndarray]:
"""Element-wise sweep fallback for models that fail on vectorized phi."""
if spec.returns_bounds:
lower = np.full_like(phi_arr, np.nan)
upper = np.full_like(phi_arr, np.nan)
else:
result_arr: np.ndarray | None = None
for i, phi_val in enumerate(phi_arr):
try:
adapted = self._call_adapter(spec, kwargs)
phi_key = self._find_phi_key(adapted)
adapted[phi_key] = phi_val
val = spec.func(**adapted)
if spec.returns_bounds:
lower[i], upper[i] = val
else:
# Allocate on first success to pick correct dtype
if result_arr is None:
dt = complex if isinstance(val, complex) else float
result_arr = np.full(len(phi_arr), np.nan, dtype=dt)
result_arr[i] = val
except Exception:
continue
if spec.returns_bounds:
return lower, upper
if result_arr is None:
result_arr = np.full_like(phi_arr, np.nan)
return result_arr