# differ-hug / differ_hug /compute.py
# componavt's picture
# fix of step 1
# 2e27876
# Raw
# History Blame Contribute Delete
# 18.6 kB
"""
Core computation functions for the ODE research platform.
This module contains pure computational functions that preserve the
numerical logic from the original Streamlit implementation while
being reusable and testable.
"""
import numpy as np
from scipy.integrate import solve_ivp
from typing import Tuple, List, Dict, Any, Optional
import pandas as pd
def build_angles(num_points: int, circle_start: float, circle_end: float) -> np.ndarray:
    """
    Produce evenly spaced angles covering a full circle or an arc of it.

    Both bounds are reduced modulo 360 first. If they coincide after the
    reduction (for example 0 and 360) the whole circle is sampled, starting
    at angle 0. Otherwise the arc from ``circle_start`` towards increasing
    angle up to ``circle_end`` is sampled, wrapping through 360 if needed.
    The end angle itself is never part of the result.

    Args:
        num_points: Number of angles to generate
        circle_start: Start of the arc in degrees
        circle_end: End of the arc in degrees

    Returns:
        Array of ``num_points`` angles in radians
    """
    start_deg = circle_start % 360
    end_deg = circle_end % 360

    # Coinciding bounds mean "the whole circle".
    if np.isclose((end_deg - start_deg) % 360, 0.0):
        return np.linspace(0, 2 * np.pi, num_points, endpoint=False)

    if end_deg < start_deg:
        # The arc crosses the 360 -> 0 seam: walk past 360 and fold back.
        wrapped_span = (end_deg + 360) - start_deg
        offsets = np.linspace(0, wrapped_span, num_points, endpoint=False)
        degrees = (start_deg + offsets) % 360
    else:
        degrees = np.linspace(start_deg, end_deg, num_points, endpoint=False)

    return np.deg2rad(degrees)
def build_initial_conditions(
    center_x: float,
    center_y: float,
    radius: float,
    angles: np.ndarray
) -> List[Tuple[float, float]]:
    """
    Place one starting point on a circle for every supplied angle.

    Args:
        center_x: X coordinate of the circle center
        center_y: Y coordinate of the circle center
        radius: Circle radius
        angles: Angles in radians, one per starting point

    Returns:
        List of (x, y) tuples, in the same order as ``angles``
    """
    return [
        (center_x + radius * np.cos(theta), center_y + radius * np.sin(theta))
        for theta in angles
    ]
def gene_regulatory_rhs(
    alpha_val: float,
    K_val: float,
    b_val: float,
    g1_val: float,
    g2_val: float
):
    """
    Create the right-hand side function for the gene regulatory ODE system.

    The system is:
        dx/dt = (K * x^(1/alpha))/(b^(1/alpha) + x^(1/alpha)) - gamma1 * x
        dy/dt = (K * y^(1/alpha))/(b^(1/alpha) + y^(1/alpha)) - gamma2 * y

    The production term is replaced by a step function (``K`` above ``b``,
    ``0`` otherwise) whenever the Hill form cannot be evaluated reliably:
    for exponents above 1000 (including ``alpha == 0``), when the power of
    the state overflows, and when both powers underflow so that the
    fraction would be 0/0.

    Args:
        alpha_val: Alpha parameter; the Hill exponent is ``1 / alpha``
        K_val: K parameter (maximum production rate)
        b_val: b parameter (threshold)
        g1_val: gamma1 parameter (decay rate of x)
        g2_val: gamma2 parameter (decay rate of y)

    Returns:
        RHS function ``rhs(t, state)`` suitable for scipy.integrate.solve_ivp,
        returning ``[dx/dt, dy/dt]`` as a list
    """
    # The exponent depends only on alpha, so compute it once. alpha == 0 is
    # the infinitely steep limit and is routed to the step-function branch
    # instead of raising ZeroDivisionError on every evaluation.
    hill_exp = np.inf if alpha_val == 0 else 1.0 / alpha_val

    def _step(value):
        # Limiting shape of the Hill term for a very large exponent.
        return K_val if value > b_val else 0.0

    def _production(value):
        if hill_exp > 1000:
            return _step(value)
        try:
            pow_b = np.power(b_val, hill_exp)
            # Negative states are clamped so fractional powers stay real.
            pow_v = np.power(max(value, 0.0), hill_exp)
            denom = pow_b + pow_v
            # Overflow in the numerator, or a zero/NaN denominator (both
            # powers underflowed), would otherwise yield NaN.
            if not np.isfinite(pow_v) or denom == 0 or np.isnan(denom):
                return _step(value)
            return (K_val * pow_v) / denom
        except ArithmeticError:
            # Only reachable when NumPy is configured to raise on
            # floating-point errors.
            return _step(value)

    def rhs(t, state):
        x, y = state
        return [_production(x) - g1_val * x, _production(y) - g2_val * y]

    return rhs
class SciPySolver:
    """
    Small adapter around ``scipy.integrate.solve_ivp`` for two-component systems.

    The integration method and tolerances are fixed at construction time;
    ``solve`` reports failure through its return value instead of raising.
    """
    DEFAULT_METHOD = 'DOP853'
    DEFAULT_RTOL = 1e-9
    DEFAULT_ATOL = 1e-9

    def __init__(
        self,
        method: str = DEFAULT_METHOD,
        rtol: float = DEFAULT_RTOL,
        atol: float = DEFAULT_ATOL
    ):
        self.method, self.rtol, self.atol = method, rtol, atol

    def solve(
        self,
        rhs,
        x0: Tuple[float, float],
        t_eval: np.ndarray
    ) -> Tuple[bool, Optional[np.ndarray], Optional[np.ndarray]]:
        """
        Integrate ``rhs`` from ``t_eval[0]`` to ``t_eval[-1]``.

        Args:
            rhs: Right-hand side ``rhs(t, state)`` of the ODE system
            x0: Initial state (x0, y0) at ``t_eval[0]``
            t_eval: Time points at which the solution is sampled

        Returns:
            ``(True, x_solution, y_solution)`` on success. On any failure,
            whether the solver reports it or an exception is raised along
            the way, ``(False, None, None)``.
        """
        failure = (False, None, None)
        try:
            result = solve_ivp(
                rhs,
                (t_eval[0], t_eval[-1]),
                x0,
                method=self.method,
                rtol=self.rtol,
                atol=self.atol,
                t_eval=t_eval,
            )
            outcome = (True, result.y[0], result.y[1]) if result.success else failure
        except Exception:
            # Best-effort contract: callers decide how to react to a failure.
            outcome = failure
        return outcome
def _solve_with_rk45_fallback(primary, rhs_func, start, t_eval):
    """Integrate with ``primary``; if that fails, retry once with a default RK45 solver."""
    ok, xs, ys = primary.solve(rhs_func, start, t_eval)
    if ok and xs is not None and ys is not None:
        return True, xs, ys
    if primary.method == 'RK45':
        # The fallback solver would repeat the exact same configuration.
        return False, None, None
    return SciPySolver(method='RK45').solve(rhs_func, start, t_eval)


def _trajectory_metrics(idx, rhs_func, x0, y0, t_full, x_full, y_full) -> Dict:
    """Collect the per-trajectory scalar metrics for one solved trajectory."""
    # Amplitude: spread of the distance from the origin along the trajectory.
    radial = np.sqrt(x_full * x_full + y_full * y_full)
    amp = float(np.max(radial) - np.min(radial))

    ftle, final_d, ftle_r2 = compute_ftle_metrics(
        rhs_func, x0, y0, t_full[-1], t_full, x_full, y_full
    )

    # Average the Hurst exponents of both components, ignoring a NaN one;
    # the explicit guard avoids NumPy's "mean of empty slice" warning.
    hurst_pair = np.array([hurst_rs(x_full), hurst_rs(y_full)], dtype=float)
    hurst_val = np.nan if np.isnan(hurst_pair).all() else np.nanmean(hurst_pair)

    cr_stats = curvature_radius_stats(x_full, y_full, t_full)
    path_len = compute_path_length(x_full, y_full)

    kappa_arr = cr_stats.get("kappa_array")
    kappa_vals = np.array(kappa_arr) if kappa_arr is not None else np.array([])
    if kappa_vals.size > 0:
        kappa_vals = kappa_vals[np.isfinite(kappa_vals)]
    if kappa_vals.size > 0:
        max_kappa = float(np.max(kappa_vals))
        # Share of all time samples whose curvature exceeds 0.1.
        frac_high_curv = float(np.sum(kappa_vals > 0.1) / len(t_full))
    else:
        max_kappa = np.nan
        frac_high_curv = np.nan

    return {
        "idx": idx,
        "ftle": ftle,
        "ftle_r2": ftle_r2,
        "amp": amp,
        "final_dist": final_d,
        "hurst": hurst_val,
        "curv_radius_mean": cr_stats["mean"],
        "curv_radius_median": cr_stats["median"],
        "curv_radius_std": cr_stats["std"],
        "curv_p10": cr_stats["p10"],
        "curv_p90": cr_stats["p90"],
        "curv_count_finite": cr_stats["count_finite"],
        "initial_x": float(x0),
        "initial_y": float(y0),
        "path_len": path_len,
        "max_kappa": max_kappa,
        "frac_high_curv": frac_high_curv,
    }


def solve_trajectory_set(
    rhs_func,
    initial_conditions: List[Tuple[float, float]],
    t_full: np.ndarray,
    t_train: np.ndarray,
    solver_method: str = 'DOP853'
) -> Tuple[List[Dict], List[Dict]]:
    """
    Solve ODE for a set of initial conditions and compute metrics.

    Each initial condition is integrated over ``t_full`` and over ``t_train``.
    Both integrations first use ``solver_method`` and fall back to RK45 if
    that fails. A trajectory for which either integration still fails is
    skipped, so the ``idx`` values in the outputs may have gaps.

    Args:
        rhs_func: RHS function for the ODE system
        initial_conditions: List of (x0, y0) tuples
        t_full: Time array for full integration
        t_train: Time array for training interval
        solver_method: Solver method to use ('DOP853' or 'RK45')

    Returns:
        Tuple of (solutions, metrics) where each is a list of dicts.
        ``solutions`` entries hold the sampled trajectories (``x``, ``y``,
        ``t_full``, ``x_train``, ``y_train``, ``t_train``) and ``metrics``
        entries hold scalar descriptors, including ``curv_radius_local_zscore``
        and ``anomaly_score``, which are computed across the whole set.
    """
    solutions = []
    metrics = []
    solver = SciPySolver(method=solver_method)

    for idx, (x0, y0) in enumerate(initial_conditions):
        start = (x0, y0)

        ok, x_full, y_full = _solve_with_rk45_fallback(solver, rhs_func, start, t_full)
        if not ok or x_full is None or y_full is None:
            continue

        # Give the training interval the same fallback as the full interval,
        # so a trajectory rescued by RK45 is not dropped at this step.
        ok, x_train, y_train = _solve_with_rk45_fallback(solver, rhs_func, start, t_train)
        if not ok:
            continue

        solutions.append({
            "idx": idx,
            "x": x_full,
            "y": y_full,
            "t_full": t_full,
            "x_train": x_train,
            "y_train": y_train,
            "t_train": t_train,
        })
        metrics.append(
            _trajectory_metrics(idx, rhs_func, x0, y0, t_full, x_full, y_full)
        )

    # Compute local z-score after all metrics are collected
    local_z = compute_local_zscore(metrics)
    for m, z in zip(metrics, local_z):
        m["curv_radius_local_zscore"] = float(z) if z is not None else np.nan

    # Compute anomaly score (needs every trajectory's metrics at once)
    scores = compute_anomaly_score(pd.DataFrame(metrics))
    for m, score in zip(metrics, scores):
        m["anomaly_score"] = float(score)

    return solutions, metrics
def compute_ftle_metrics(
    rhs,
    x0: float,
    y0: float,
    te: float,
    t_eval: np.ndarray,
    x: np.ndarray,
    y: np.ndarray,
    rtol: float = 1e-3,
    atol: float = 1e-6
) -> Tuple[float, float, float]:
    """
    Computes FTLE (Finite-Time Lyapunov Exponent) and related metrics.

    A second trajectory is started from a slightly perturbed initial state
    and integrated over the same time grid. The FTLE is the slope of a
    straight-line fit of ``log(distance)`` against time over the middle
    half of the grid (from 25% to 75% of the samples).

    Args:
        rhs: Right-hand side function of the ODE system
        x0, y0: Initial conditions of the main trajectory at ``t_eval[0]``
        te: End time
        t_eval: Time points array
        x, y: Solution arrays from the main trajectory, sampled at ``t_eval``
        rtol, atol: Tolerances for integrating the perturbed trajectory.
            The defaults equal ``solve_ivp``'s own defaults. The initial
            perturbation is only about 1e-6 in relative size, so pass
            tighter values to keep solver error below the measured separation.

    Returns:
        tuple: (ftle, final_d, ftle_r2) or (np.nan, np.nan, np.nan) if
        computation fails or the grid is too short for the fit. ``final_d``
        is the separation at the last time point and ``ftle_r2`` the R^2 of
        the linear fit (NaN when ``log(distance)`` is constant).
    """
    failure = (np.nan, np.nan, np.nan)

    eps = 1e-6 * (1.0 + abs(x0) + abs(y0))
    xp0, yp0 = x0 + eps, y0 + 0.5 * eps
    try:
        # Start where the main trajectory starts; a hard-coded 0 breaks
        # time grids that do not begin at zero.
        sol_p = solve_ivp(
            rhs,
            (t_eval[0], te),
            (xp0, yp0),
            method='DOP853',
            t_eval=t_eval,
            rtol=rtol,
            atol=atol,
        )
        if not sol_p.success:
            return failure

        xp, yp = sol_p.y
        dist = np.sqrt((x - xp) ** 2 + (y - yp) ** 2)
        dist = np.where(dist <= 0, 1e-12, dist)
        final_d = float(dist[-1])

        n_samples = len(t_eval)
        s_idx, e_idx = int(0.25 * n_samples), int(0.75 * n_samples)
        if e_idx <= s_idx + 1:
            # Fewer than two samples in the fit window.
            return failure

        t_slice = t_eval[s_idx:e_idx]
        # Floor the distances so the logarithm stays finite.
        ln_d = np.log(np.clip(dist[s_idx:e_idx], 1e-12, None))
        slope, intercept = np.polyfit(t_slice, ln_d, 1)
        ftle = float(slope)

        resid = ln_d - (slope * t_slice + intercept)
        ss_res = np.sum(resid ** 2)
        ss_tot = np.sum((ln_d - np.mean(ln_d)) ** 2)
        ftle_r2 = float(1 - ss_res / ss_tot) if ss_tot > 0 else np.nan
        return ftle, final_d, ftle_r2
    except Exception:
        # Best-effort metric: any failure is reported as NaNs.
        return failure
def hurst_rs(ts: np.ndarray) -> float:
    """
    Estimate the Hurst exponent with a Rescaled Range (R/S) fit.

    The series is centred on its overall mean and cumulatively summed. For
    every prefix length ``n`` from 10 up to half the series length, the
    range of the cumulative sum over the first ``n`` samples is divided by
    the standard deviation of those samples. The exponent is the slope of
    ``log(R/S)`` against ``log(n)``.

    Args:
        ts: Time series data

    Returns:
        float: Hurst exponent, or np.nan if the series has fewer than 20
        samples, fewer than 3 usable prefix lengths, or the fit fails
    """
    series = np.array(ts, dtype=float)
    total = len(series)
    if total < 20:
        return np.nan

    centered = series - np.mean(series)
    cumulative = np.cumsum(centered)

    window_sizes = []
    rs_values = []
    for n in range(10, total // 2 + 1):
        spread = np.std(centered[:n], ddof=0)
        extent = np.max(cumulative[:n]) - np.min(cumulative[:n])
        # Only windows with a positive range and deviation enter the fit.
        if spread > 0 and extent > 0:
            window_sizes.append(n)
            rs_values.append(extent / spread)

    if len(window_sizes) < 3:
        return np.nan

    try:
        exponent = np.polyfit(np.log(window_sizes), np.log(rs_values), 1)[0]
    except Exception:
        exponent = np.nan
    return float(exponent)
def curvature_radius_stats(
    x: np.ndarray,
    y: np.ndarray,
    t: np.ndarray,
    max_radius: float = 1e6,
    clip_inf: bool = True
) -> Dict[str, Any]:
    """
    Compute robust curvature/radius statistics for a parametric curve (x(t), y(t)).

    Derivatives are estimated with ``np.gradient``. The curvature is
    ``|x' y'' - y' x''| / (x'^2 + y'^2)^1.5`` and the radius of curvature is
    its reciprocal. Samples where the radius is undefined (zero speed or zero
    curvature) are NaN and are excluded from all summary statistics.

    Args:
        x, y: Coordinates of the curve
        t: Parameter values
        max_radius: Maximum radius to consider
        clip_inf: Whether radii above ``max_radius`` are discarded (set to NaN)

    Returns:
        dict: ``count_total``, ``count_finite``, ``frac_finite``, the summary
        statistics ``mean``, ``median``, ``p10``, ``p90`` and ``std`` of the
        radius (NaN when no finite radius exists), plus ``radius_array`` and
        ``kappa_array`` (the reciprocal of ``radius_array``). Curves with
        fewer than two samples yield all-NaN statistics, with
        ``frac_finite`` set to 0.0.
    """
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    t = np.asarray(t, dtype=float)
    n_samples = len(x)

    if n_samples < 2:
        # np.gradient needs at least two samples; nothing can be measured.
        blank = np.full(n_samples, np.nan)
        return {
            "count_total": n_samples,
            "count_finite": 0,
            "frac_finite": 0.0,
            "mean": np.nan,
            "median": np.nan,
            "p10": np.nan,
            "p90": np.nan,
            "std": np.nan,
            "radius_array": blank,
            "kappa_array": blank.copy(),
        }

    # Degenerate samples are expected here and are mapped to NaN explicitly,
    # so the floating-point warnings they would trigger are silenced.
    with np.errstate(divide='ignore', invalid='ignore', over='ignore'):
        x_t = np.gradient(x, t)
        y_t = np.gradient(y, t)
        x_tt = np.gradient(x_t, t)
        y_tt = np.gradient(y_t, t)
        denom = (x_t ** 2 + y_t ** 2) ** 1.5
        num = np.abs(x_t * y_tt - y_t * x_tt)
        kappa = np.where(denom > 0, num / denom, np.nan)
        radius = np.where(np.isfinite(kappa) & (kappa != 0), 1.0 / kappa, np.nan)
        if clip_inf:
            radius = np.where(radius > max_radius, np.nan, radius)
        kappa_from_radius = 1.0 / radius

    finite = np.isfinite(radius)
    n_finite = int(np.sum(finite))

    def _summary(reducer, *extra) -> float:
        # Skip the NaN-aware reducers when nothing is finite: they would only
        # emit an "all-NaN" warning and return NaN anyway.
        if n_finite == 0:
            return np.nan
        value = float(reducer(radius, *extra))
        return value if np.isfinite(value) else np.nan

    with np.errstate(invalid='ignore', over='ignore'):
        stats = {
            "count_total": n_samples,
            "count_finite": n_finite,
            "frac_finite": float(n_finite / n_samples),
            "mean": _summary(np.nanmean),
            "median": _summary(np.nanmedian),
            "p10": _summary(np.nanpercentile, 10),
            "p90": _summary(np.nanpercentile, 90),
            "std": _summary(np.nanstd),
            "radius_array": radius,
            "kappa_array": kappa_from_radius,
        }
    return stats
def compute_path_length(x: np.ndarray, y: np.ndarray) -> float:
    """
    Sum the straight-line distances between consecutive points of a curve.

    Args:
        x, y: Coordinates of the curve samples, in order

    Returns:
        float: Length of the polyline through the samples (0.0 when there
        are fewer than two samples)
    """
    step_sq = np.square(np.diff(x)) + np.square(np.diff(y))
    return float(np.sqrt(step_sq).sum())
def compute_local_zscore(metrics: List[Dict]) -> List[Optional[float]]:
    """
    Compute local z-score of curvature median versus nearest neighbors.

    For every trajectory, the (up to) five trajectories with the closest
    initial conditions are looked up, and the trajectory's
    ``curv_radius_median`` is standardised against the mean and standard
    deviation of its neighbours' finite medians. A trajectory is never its
    own neighbour, even when several initial conditions coincide.

    scikit-learn is used for the neighbour search when it can be imported;
    otherwise a plain NumPy distance computation is used.

    Args:
        metrics: List of metric dictionaries with the keys ``initial_x``,
            ``initial_y`` and ``curv_radius_median``

    Returns:
        List of local z-scores, one per entry of ``metrics``. An entry is
        NaN when the trajectory's own median is not finite, when no
        neighbour has a finite median, or when the neighbours' standard
        deviation is zero. With fewer than two trajectories every entry
        is NaN.
    """
    n = len(metrics)
    if n <= 1:
        return [np.nan] * n

    points = np.array(
        [[m["initial_x"], m["initial_y"]] for m in metrics], dtype=float
    )
    rad_meds = np.array([m["curv_radius_median"] for m in metrics], dtype=float)
    nbrs_k = min(5, n - 1)

    # Optional accelerator. Any import-time failure, including binary
    # incompatibilities that surface as non-ImportError exceptions, simply
    # selects the NumPy fallback.
    try:
        from sklearn.neighbors import NearestNeighbors
    except Exception:
        NearestNeighbors = None

    neighbor_table = None
    if NearestNeighbors is not None:
        # Request one extra neighbour because the query point is itself
        # part of the fitted set.
        finder = NearestNeighbors(n_neighbors=nbrs_k + 1).fit(points)
        _, neighbor_table = finder.kneighbors(points)

    local_z = np.full(n, np.nan)
    for i in range(n):
        if neighbor_table is not None:
            candidates = neighbor_table[i]
            # Drop the point itself wherever it appears; with duplicated
            # points it is not guaranteed to come first.
            neigh_idx = candidates[candidates != i][:nbrs_k]
        else:
            dists = np.linalg.norm(points - points[i], axis=1)
            dists[i] = np.inf  # never select the point itself
            neigh_idx = np.argsort(dists, kind="stable")[:nbrs_k]

        neigh_vals = rad_meds[neigh_idx]
        neigh_vals = neigh_vals[np.isfinite(neigh_vals)]
        if not np.isfinite(rad_meds[i]) or neigh_vals.size == 0:
            continue

        sigma = np.std(neigh_vals)
        if sigma != 0:
            local_z[i] = (rad_meds[i] - np.mean(neigh_vals)) / sigma

    return list(local_z)
def robust_z(arr: np.ndarray) -> np.ndarray:
    """
    Standardise values around the median, scaled by the interquartile range.

    Only finite entries take part in the median and quartile estimates;
    non-finite entries come back as NaN. When the interquartile range is
    zero, a scale of 1.0 is used instead.

    Args:
        arr: Input values

    Returns:
        Float array of the same shape holding ``(value - median) / IQR``
    """
    values = np.array(arr, dtype=float)
    usable_mask = np.isfinite(values)
    scores = np.full_like(values, np.nan)
    if not usable_mask.any():
        return scores

    usable = values[usable_mask]
    center = np.nanmedian(usable)
    spread = np.nanpercentile(usable, 75) - np.nanpercentile(usable, 25)
    if spread == 0:
        # Avoid dividing by zero when the middle half of the data is constant.
        spread = 1.0

    scores[usable_mask] = (usable - center) / spread
    return scores
def compute_anomaly_score(df) -> np.ndarray:
    """
    Combine several per-trajectory indicators into one anomaly score.

    Every indicator is converted to a robust (median/IQR) z-score and the
    results are added up:
    - ftle (higher = more anomalous)
    - path_len (higher = more anomalous)
    - max_kappa (higher = more anomalous)
    - ftle_r2 (lower = more anomalous, so it is subtracted)
    - hurst (higher = more anomalous)

    Missing ``ftle_r2`` and ``hurst`` values are replaced by 0 before
    scoring; the other columns are used as they are.

    Args:
        df: DataFrame with the metric columns listed above

    Returns:
        Array of anomaly scores, one per row (empty for an empty frame)
    """
    if df.empty:
        return np.array([])

    z = {
        column: robust_z(df[column].values)
        for column in ('ftle', 'path_len', 'max_kappa')
    }
    z.update(
        (column, robust_z(df[column].fillna(0).values))
        for column in ('ftle_r2', 'hurst')
    )
    return z['ftle'] + z['path_len'] + z['max_kappa'] - z['ftle_r2'] + z['hurst']
def compute_shadowing_diagnostics(
    solutions: List[Dict],
    rhs_func,
    epsilon_threshold: float = 1e-3
) -> Dict[str, Any]:
    """
    Compute shadowing diagnostics comparing trajectory solutions.

    For each solution a second trajectory is started from a slightly
    perturbed initial state and integrated over the same time grid. The
    running maximum of the distance between the two trajectories is
    recorded, together with the first time at which it exceeds
    ``epsilon_threshold``.

    Solutions whose perturbed integration fails are left out of the result;
    ``solution_indices`` tells which entries of ``solutions`` the remaining
    results belong to.

    Args:
        solutions: List of solution dictionaries (each with 'x', 'y', 't_full')
        rhs_func: Right-hand side function for the ODE system
        epsilon_threshold: Threshold for shadowing breakdown

    Returns:
        Dictionary with three equally long lists:
        - 'epsilon_t_list': running-maximum distance arrays
        - 'shadowing_times': first time the threshold is exceeded, or None
          if it never is
        - 'solution_indices': position in ``solutions`` of each result
    """
    epsilon_t_list = []
    shadowing_times = []
    solution_indices = []

    for position, sol in enumerate(solutions):
        x = sol['x']
        y = sol['y']
        t_full = sol['t_full']
        eps = 1e-6 * (1.0 + abs(x[0]) + abs(y[0]))
        xp0, yp0 = x[0] + eps, y[0] + 0.5 * eps
        try:
            sol_p = solve_ivp(
                rhs_func,
                (t_full[0], t_full[-1]),
                (xp0, yp0),
                method='DOP853',
                t_eval=t_full
            )
            if not sol_p.success:
                continue
            xp, yp = sol_p.y
            dist = np.sqrt((x - xp) ** 2 + (y - yp) ** 2)
            # Largest separation seen so far at every time point.
            epsilon_t = np.maximum.accumulate(dist)
            exceed_indices = np.where(epsilon_t > epsilon_threshold)[0]
            breakdown_time = t_full[exceed_indices[0]] if len(exceed_indices) > 0 else None
        except Exception:
            # Best-effort diagnostic: skip trajectories that cannot be processed.
            continue

        # Append only once everything succeeded so the lists stay aligned.
        epsilon_t_list.append(epsilon_t)
        shadowing_times.append(breakdown_time)
        solution_indices.append(position)

    return {
        'epsilon_t_list': epsilon_t_list,
        'shadowing_times': shadowing_times,
        'solution_indices': solution_indices,
    }