"""
Core computation functions for the ODE research platform.

This module contains pure computational functions that preserve the numerical
logic from the original Streamlit implementation while being reusable and
testable.
"""

import warnings
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
from scipy.integrate import solve_ivp


def build_angles(num_points: int, circle_start: float, circle_end: float) -> np.ndarray:
    """
    Generate angles for initial conditions on a circle or sector.

    Both bounds are reduced modulo 360. When the resulting span is
    (numerically) zero the full circle starting at angle 0 is sampled;
    otherwise the sector from ``circle_start`` to ``circle_end`` is sampled,
    wrapping through 360 degrees when the end lies "before" the start.
    The end point is never included.

    Args:
        num_points: Number of points to generate
        circle_start: Starting angle in degrees (0-360)
        circle_end: Ending angle in degrees (0-360)

    Returns:
        Array of angles in radians
    """
    cs_val = circle_start % 360
    ce_val = circle_end % 360
    span = (ce_val - cs_val) % 360

    if np.isclose(span, 0.0):
        return np.linspace(0, 2 * np.pi, num_points, endpoint=False)

    if ce_val >= cs_val:
        degs = np.linspace(cs_val, ce_val, num_points, endpoint=False)
    else:
        # Sector crosses the 360 -> 0 seam: walk the unwrapped span, then wrap.
        unwrapped_span = (ce_val + 360) - cs_val
        degs = (cs_val + np.linspace(0, unwrapped_span, num_points, endpoint=False)) % 360
    return np.deg2rad(degs)


def build_initial_conditions(
    center_x: float,
    center_y: float,
    radius: float,
    angles: np.ndarray
) -> List[Tuple[float, float]]:
    """
    Build initial conditions from center, radius, and angles.

    Args:
        center_x: X coordinate of circle center
        center_y: Y coordinate of circle center
        radius: Radius of the circle
        angles: Array of angles in radians

    Returns:
        List of (x, y) initial condition tuples, one per angle, in order
    """
    return [
        (center_x + radius * np.cos(angle), center_y + radius * np.sin(angle))
        for angle in angles
    ]


def gene_regulatory_rhs(
    alpha_val: float,
    K_val: float,
    b_val: float,
    g1_val: float,
    g2_val: float
):
    """
    Create the right-hand side function for the gene regulatory ODE system.

    The system is:
        dx/dt = (K * x^(1/alpha))/(b^(1/alpha) + x^(1/alpha)) - gamma1 * x
        dy/dt = (K * y^(1/alpha))/(b^(1/alpha) + y^(1/alpha)) - gamma2 * y

    For exponents ``1/alpha`` above 1000 the production term is replaced by
    its step limit (``K`` when the state exceeds ``b``, else 0). The same
    step limit is used whenever the power form cannot be evaluated: a
    non-finite power, a zero denominator (both powers underflowed), or an
    exception raised by the arithmetic. ``alpha == 0`` is treated as the
    step limit as well.

    Args:
        alpha_val: Alpha parameter (controls exponent)
        K_val: K parameter
        b_val: b parameter
        g1_val: gamma1 parameter
        g2_val: gamma2 parameter

    Returns:
        RHS function suitable for scipy.integrate.solve_ivp
    """
    # alpha == 0 used to raise ZeroDivisionError on every RHS evaluation;
    # map it to an infinite exponent so it takes the step-limit branch below.
    n = np.inf if alpha_val == 0 else 1.0 / alpha_val

    def step(value):
        # Limit of the Hill term as the exponent goes to +infinity.
        return K_val if value > b_val else 0.0

    def hill(value, pow_b):
        # Negative states are clamped to 0 only inside the production term.
        pow_v = np.power(max(value, 0.0), n)
        if not np.isfinite(pow_v):
            return step(value)
        denom = pow_b + pow_v
        if denom == 0:
            # Both powers underflowed to 0: 0/0 would inject NaN into the
            # integration, so use the step limit instead.
            return step(value)
        return (K_val * pow_v) / denom

    def rhs(t, state):
        x, y = state
        if n > 1000:
            frac_x, frac_y = step(x), step(y)
        else:
            try:
                pow_b = np.power(b_val, n)
                frac_x, frac_y = hill(x, pow_b), hill(y, pow_b)
            except Exception:
                # Deliberate best-effort: any arithmetic failure degrades to
                # the step limit rather than aborting the integration.
                frac_x, frac_y = step(x), step(y)
        return [frac_x - g1_val * x, frac_y - g2_val * y]

    return rhs


class SciPySolver:
    """
    Solver wrapper using scipy.integrate.solve_ivp.

    Holds an integration method and tolerances, and reports failures as a
    ``(False, None, None)`` result instead of raising.
    """

    DEFAULT_METHOD = 'DOP853'
    DEFAULT_RTOL = 1e-9
    DEFAULT_ATOL = 1e-9

    def __init__(
        self,
        method: str = DEFAULT_METHOD,
        rtol: float = DEFAULT_RTOL,
        atol: float = DEFAULT_ATOL
    ):
        self.method = method
        self.rtol = rtol
        self.atol = atol

    def solve(
        self,
        rhs,
        x0: Tuple[float, float],
        t_eval: np.ndarray
    ) -> Tuple[bool, Optional[np.ndarray], Optional[np.ndarray]]:
        """
        Solve ODE using scipy.integrate.solve_ivp.

        The integration interval is ``(t_eval[0], t_eval[-1])``.

        Args:
            rhs: Right-hand side function of the ODE system
            x0: Initial conditions (x0, y0)
            t_eval: Time points to evaluate

        Returns:
            Tuple of (success, x_solution, y_solution); the two arrays are
            None when the solver reports failure or raises.
        """
        try:
            sol = solve_ivp(
                rhs,
                (t_eval[0], t_eval[-1]),
                x0,
                method=self.method,
                rtol=self.rtol,
                atol=self.atol,
                t_eval=t_eval
            )
        except Exception:
            # Callers treat any solver error as "this trajectory failed".
            return False, None, None

        if not sol.success:
            return False, None, None
        return True, sol.y[0], sol.y[1]


# Method tried when the requested solver fails on a trajectory.
_FALLBACK_METHOD = 'RK45'

# Curvature above which a sample counts towards "frac_high_curv".
_HIGH_CURVATURE_THRESHOLD = 0.1


def _solve_with_fallback(primary, rhs_func, state0, t_eval):
    """Solve with ``primary``; on failure retry once with the RK45 fallback."""
    for solver in (primary, None):
        if solver is None:
            solver = SciPySolver(method=_FALLBACK_METHOD)
        ok, xs, ys = solver.solve(rhs_func, state0, t_eval)
        if ok and xs is not None and ys is not None:
            return True, xs, ys
    return False, None, None


def _perturbed_start(x0, y0):
    """Return the slightly offset initial state used for divergence metrics."""
    eps = 1e-6 * (1.0 + abs(x0) + abs(y0))
    return x0 + eps, y0 + 0.5 * eps


def _trajectory_metrics(idx, rhs_func, x0, y0, t_full, x_full, y_full) -> Dict[str, Any]:
    """Compute the per-trajectory metric record for one solved trajectory."""
    radial = np.sqrt(x_full * x_full + y_full * y_full)
    amp = float(np.max(radial) - np.min(radial))

    ftle, final_d, ftle_r2 = compute_ftle_metrics(
        rhs_func, x0, y0, t_full[-1], t_full, x_full, y_full
    )

    hx = hurst_rs(x_full)
    hy = hurst_rs(y_full)
    with warnings.catch_warnings():
        # nanmean warns ("Mean of empty slice") when both exponents are NaN;
        # the NaN result is the intended value in that case.
        warnings.simplefilter("ignore", category=RuntimeWarning)
        hurst_val = np.nanmean([hx, hy])

    cr_stats = curvature_radius_stats(x_full, y_full, t_full)
    path_len = compute_path_length(x_full, y_full)

    kappa_arr = cr_stats.get("kappa_array")
    kappa_vals = np.array(kappa_arr) if kappa_arr is not None else np.array([])
    if kappa_vals.size > 0:
        kappa_vals = kappa_vals[np.isfinite(kappa_vals)]
    if kappa_vals.size > 0:
        max_kappa = float(np.nanmax(kappa_vals))
        # Fraction is relative to ALL time samples, not only finite ones.
        frac_high_curv = float(
            np.sum(kappa_vals > _HIGH_CURVATURE_THRESHOLD) / len(t_full)
        )
    else:
        max_kappa = np.nan
        frac_high_curv = np.nan

    return {
        "idx": idx,
        "ftle": ftle,
        "ftle_r2": ftle_r2,
        "amp": amp,
        "final_dist": final_d,
        "hurst": hurst_val,
        "curv_radius_mean": cr_stats["mean"],
        "curv_radius_median": cr_stats["median"],
        "curv_radius_std": cr_stats["std"],
        "curv_p10": cr_stats["p10"],
        "curv_p90": cr_stats["p90"],
        "curv_count_finite": cr_stats["count_finite"],
        "initial_x": float(x0),
        "initial_y": float(y0),
        "path_len": path_len,
        "max_kappa": max_kappa,
        "frac_high_curv": frac_high_curv,
    }


def solve_trajectory_set(
    rhs_func,
    initial_conditions: List[Tuple[float, float]],
    t_full: np.ndarray,
    t_train: np.ndarray,
    solver_method: str = 'DOP853'
) -> Tuple[List[Dict], List[Dict]]:
    """
    Solve ODE for a set of initial conditions and compute metrics.

    Each initial condition is integrated on ``t_full`` and on ``t_train``.
    Both integrations first use ``solver_method`` and retry with RK45 if
    that fails; a trajectory is skipped (it appears in neither output list)
    when either integration still fails. The ``idx`` field of every record
    is the position of the initial condition in ``initial_conditions``.

    After all trajectories are processed, a neighbour-based z-score
    (``curv_radius_local_zscore``) and a combined ``anomaly_score`` are
    added to every metric record.

    Args:
        rhs_func: RHS function for the ODE system
        initial_conditions: List of (x0, y0) tuples
        t_full: Time array for full integration
        t_train: Time array for training interval
        solver_method: Solver method to use ('DOP853' or 'RK45')

    Returns:
        Tuple of (solutions, metrics) where each is a list of dicts
    """
    solutions: List[Dict] = []
    metrics: List[Dict] = []
    solver = SciPySolver(method=solver_method)

    for idx, (x0, y0) in enumerate(initial_conditions):
        ok, x_full, y_full = _solve_with_fallback(solver, rhs_func, (x0, y0), t_full)
        if not ok:
            continue

        # The training solve gets the same fallback as the full solve, so a
        # trajectory rescued by RK45 above is not discarded here.
        ok, x_train, y_train = _solve_with_fallback(solver, rhs_func, (x0, y0), t_train)
        if not ok:
            continue

        solutions.append({
            "idx": idx,
            "x": x_full,
            "y": y_full,
            "t_full": t_full,
            "x_train": x_train,
            "y_train": y_train,
            "t_train": t_train,
        })
        metrics.append(
            _trajectory_metrics(idx, rhs_func, x0, y0, t_full, x_full, y_full)
        )

    if not metrics:
        return solutions, metrics

    # Compute local z-score after all metrics are collected
    local_z = compute_local_zscore(metrics)
    for record, z_val in zip(metrics, local_z):
        record["curv_radius_local_zscore"] = float(z_val) if z_val is not None else np.nan

    # Compute anomaly score
    scores = compute_anomaly_score(pd.DataFrame(metrics))
    for record, score in zip(metrics, scores):
        record["anomaly_score"] = float(score)

    return solutions, metrics


def compute_ftle_metrics(
    rhs,
    x0: float,
    y0: float,
    te: float,
    t_eval: np.ndarray,
    x: np.ndarray,
    y: np.ndarray
) -> Tuple[float, float, float]:
    """
    Computes FTLE (Finite-Time Lyapunov Exponent) and related metrics.

    A second trajectory is integrated from a slightly perturbed initial
    state over ``(t_eval[0], te)``. The FTLE estimate is the slope of a
    linear fit of ``ln(distance)`` against time over the middle half of the
    samples (indices 25%..75%); ``ftle_r2`` is the R^2 of that fit.

    Args:
        rhs: Right-hand side function of the ODE system
        x0, y0: Initial conditions
        te: End time
        t_eval: Time points array
        x, y: Solution arrays from the main trajectory

    Returns:
        tuple: (ftle, final_d, ftle_r2) or (np.nan, np.nan, np.nan) if
        computation fails or there are too few samples for the fit
    """
    nan_result = (np.nan, np.nan, np.nan)
    xp0, yp0 = _perturbed_start(x0, y0)

    try:
        # Start at t_eval[0] (not a hard-coded 0) so the perturbed run is
        # aligned in time with the reference trajectory.
        # NOTE(review): this solve uses scipy's default tolerances while the
        # reference trajectory is typically solved at 1e-9; confirm the
        # integration error is small relative to the ~1e-6 perturbation.
        sol_p = solve_ivp(
            rhs, (t_eval[0], te), (xp0, yp0), method='DOP853', t_eval=t_eval
        )
        if not sol_p.success:
            return nan_result

        xp, yp = sol_p.y
        dist = np.sqrt((x - xp) ** 2 + (y - yp) ** 2)
        dist = np.where(dist <= 0, 1e-12, dist)  # keep the logarithm finite
        final_d = float(dist[-1])

        n_samples = len(t_eval)
        s_idx, e_idx = int(0.25 * n_samples), int(0.75 * n_samples)
        if e_idx <= s_idx + 1:
            # Too few samples for a meaningful fit; all three values are NaN.
            return nan_result

        t_slice = t_eval[s_idx:e_idx]
        ln_d = np.log(np.clip(dist[s_idx:e_idx], 1e-12, None))
        slope, intercept = np.polyfit(t_slice, ln_d, 1)

        resid = ln_d - (slope * t_slice + intercept)
        ss_res = np.sum(resid ** 2)
        ss_tot = np.sum((ln_d - np.mean(ln_d)) ** 2)
        ftle_r2 = 1 - ss_res / ss_tot if ss_tot > 0 else np.nan
        return float(slope), final_d, ftle_r2
    except Exception:
        return nan_result


def hurst_rs(ts: np.ndarray) -> float:
    """
    Compute the Hurst exponent using the Rescaled Range (R/S) method.

    The series is mean-centred once; for every prefix length ``n`` from 10
    to ``N // 2`` the range of the cumulative sum and the standard deviation
    of the prefix are taken, and the exponent is the slope of
    ``log(R/S)`` against ``log(n)``.

    Args:
        ts: Time series data

    Returns:
        float: Hurst exponent or np.nan if computation fails (fewer than 20
        samples, or fewer than 3 usable prefix lengths)
    """
    x = np.array(ts, dtype=float)
    N = len(x)
    if N < 20:
        return np.nan

    x = x - np.mean(x)
    Y = np.cumsum(x)

    # Slot n-1 holds the values for prefix length n; unused slots stay 0 and
    # are filtered out by the validity mask below.
    R = np.zeros(N)
    S = np.zeros(N)
    for n in range(10, N // 2 + 1):
        Sn = np.std(x[:n], ddof=0)
        if Sn > 0:
            prefix_sum = Y[:n]
            R[n - 1] = np.max(prefix_sum) - np.min(prefix_sum)
            S[n - 1] = Sn

    valid = (S > 0) & (R > 0)
    if np.sum(valid) < 3:
        return np.nan

    rs = R[valid] / S[valid]
    ns = np.arange(1, N + 1)[valid]
    try:
        H = np.polyfit(np.log(ns), np.log(rs), 1)[0]
    except Exception:
        H = np.nan
    return float(H)


def _finite_stat(func, values, *args) -> float:
    """Evaluate a nan-aware statistic quietly; non-finite results become NaN."""
    with warnings.catch_warnings():
        # All-NaN input is an expected case; the NaN result is what we want.
        warnings.simplefilter("ignore", category=RuntimeWarning)
        value = func(values, *args)
    return float(value) if np.isfinite(value) else np.nan


def curvature_radius_stats(
    x: np.ndarray,
    y: np.ndarray,
    t: np.ndarray,
    max_radius: float = 1e6,
    clip_inf: bool = True
) -> Dict[str, Any]:
    """
    Compute robust curvature/radius statistics for a parametric curve (x(t), y(t)).

    Derivatives are estimated with ``np.gradient``. Samples with zero speed
    or zero curvature have no radius (NaN), and with ``clip_inf`` radii above
    ``max_radius`` are discarded as well.

    Args:
        x, y: Coordinates of the curve
        t: Parameter values
        max_radius: Maximum radius to consider
        clip_inf: Whether to clip infinite/very large radii

    Returns:
        dict: ``count_total``, ``count_finite``, ``frac_finite``, the
        ``mean``/``median``/``p10``/``p90``/``std`` of the radius (NaN when
        not finite), plus ``radius_array`` and ``kappa_array``. Note that
        ``kappa_array`` is the reciprocal of the *filtered* radius, so it is
        NaN wherever the radius was discarded.
    """
    x_t = np.gradient(x, t)
    y_t = np.gradient(y, t)
    x_tt = np.gradient(x_t, t)
    y_tt = np.gradient(y_t, t)

    denom = (x_t ** 2 + y_t ** 2) ** 1.5
    num = np.abs(x_t * y_tt - y_t * x_tt)

    with np.errstate(divide='ignore', invalid='ignore'):
        kappa = np.where(denom > 0, num / denom, np.nan)
        radius = np.where(np.isfinite(kappa) & (kappa != 0), 1.0 / kappa, np.nan)
        if clip_inf:
            radius = np.where(radius > max_radius, np.nan, radius)
        kappa_filtered = 1.0 / radius

    finite = np.isfinite(radius)
    n_finite = np.sum(finite)

    return {
        "count_total": len(radius),
        "count_finite": int(n_finite),
        "frac_finite": float(n_finite / len(radius)),
        "mean": _finite_stat(np.nanmean, radius),
        "median": _finite_stat(np.nanmedian, radius),
        "p10": _finite_stat(np.nanpercentile, radius, 10),
        "p90": _finite_stat(np.nanpercentile, radius, 90),
        "std": _finite_stat(np.nanstd, radius),
        "radius_array": radius,
        "kappa_array": kappa_filtered,
    }


def compute_path_length(x: np.ndarray, y: np.ndarray) -> float:
    """
    Compute the total path length of a curve (x(t), y(t)).

    Args:
        x, y: Coordinates of the curve

    Returns:
        float: Sum of the straight-line distances between consecutive points
    """
    dx = np.diff(x)
    dy = np.diff(y)
    return float(np.sum(np.sqrt(dx * dx + dy * dy)))


def compute_local_zscore(metrics: List[Dict]) -> List[Optional[float]]:
    """
    Compute local z-score of curvature median versus nearest neighbors.

    Neighbours are the (up to 5) records whose initial conditions are closest
    in the (initial_x, initial_y) plane, never including the record itself.
    scikit-learn's ``NearestNeighbors`` is used when it can be imported;
    otherwise a brute-force distance sort is used.

    Args:
        metrics: List of metric dictionaries (each with ``initial_x``,
            ``initial_y`` and ``curv_radius_median``)

    Returns:
        List of local z-scores, one per record. An entry is NaN when the
        record's own median is not finite, no neighbour has a finite median,
        or the neighbours have zero spread.
    """
    n = len(metrics)
    if n <= 1:
        return [np.nan] * n

    arr_init = np.array([[m["initial_x"], m["initial_y"]] for m in metrics])
    rad_meds = np.array([m["curv_radius_median"] for m in metrics])
    nbrs_k = min(5, n - 1)

    try:
        from sklearn.neighbors import NearestNeighbors
    except Exception:
        # Optional dependency: any import problem selects the numpy fallback.
        NearestNeighbors = None

    if NearestNeighbors is not None:
        # Ask for one extra neighbour because the query point itself is
        # normally among the hits.
        model = NearestNeighbors(n_neighbors=nbrs_k + 1).fit(arr_init)
        _, knn_idx = model.kneighbors(arr_init)

        def ranked_candidates(i):
            return knn_idx[i]
    else:
        def ranked_candidates(i):
            dists = np.linalg.norm(arr_init - arr_init[i: i + 1], axis=1)
            return np.argsort(dists)

    local_z = np.full(n, np.nan)
    for i in range(n):
        order = ranked_candidates(i)
        # Drop the point itself explicitly: with coincident initial points it
        # is not guaranteed to be ranked first.
        neigh_idx = order[order != i][:nbrs_k]
        neigh_vals = rad_meds[neigh_idx]
        neigh_vals = neigh_vals[np.isfinite(neigh_vals)]
        if not np.isfinite(rad_meds[i]) or len(neigh_vals) < 1:
            continue
        sigma = np.std(neigh_vals)
        if sigma != 0:
            local_z[i] = (rad_meds[i] - np.mean(neigh_vals)) / sigma

    return list(local_z)


def robust_z(arr: np.ndarray) -> np.ndarray:
    """
    Compute robust z-score using median and IQR.

    Only finite entries take part in the median/IQR; non-finite entries stay
    NaN in the output. A zero IQR is replaced by 1.0 to avoid division by
    zero.

    Args:
        arr: Input array

    Returns:
        Array of robust z-scores
    """
    arr = np.array(arr, dtype=float)
    finite = np.isfinite(arr)
    out = np.full_like(arr, np.nan)
    if np.sum(finite) == 0:
        return out

    values = arr[finite]
    median = np.nanmedian(values)
    q1 = np.nanpercentile(values, 25)
    q3 = np.nanpercentile(values, 75)
    iqr = q3 - q1 if q3 - q1 != 0 else 1.0
    out[finite] = (values - median) / iqr
    return out


def compute_anomaly_score(df) -> np.ndarray:
    """
    Compute anomaly score combining multiple indicators.

    Uses robust z-scores (IQR-based) to combine:
    - FTLE (higher = more anomalous)
    - path_len (higher = more anomalous)
    - max_kappa (higher = more anomalous)
    - ftle_r2 (lower = more anomalous, so we subtract)
    - hurst (higher = more anomalous)

    Missing ``ftle_r2`` and ``hurst`` values are replaced by 0 before
    scoring; a missing ``ftle``, ``path_len`` or ``max_kappa`` makes that
    row's score NaN.

    Args:
        df: DataFrame with metrics

    Returns:
        Array of anomaly scores
    """
    if df.empty:
        return np.array([])

    ftle_z = robust_z(df['ftle'].values)
    path_z = robust_z(df['path_len'].values)
    kappa_z = robust_z(df['max_kappa'].values)
    r2_z = robust_z(df['ftle_r2'].fillna(0).values)
    hurst_z = robust_z(df['hurst'].fillna(0).values)

    return ftle_z + path_z + kappa_z - r2_z + hurst_z


def compute_shadowing_diagnostics(
    solutions: List[Dict],
    rhs_func,
    epsilon_threshold: float = 1e-3
) -> Dict[str, Any]:
    """
    Compute shadowing diagnostics comparing trajectory solutions.

    For every solution a perturbed trajectory is integrated on the same time
    grid and the running maximum of the distance to the reference is
    recorded. Solutions whose perturbed integration fails or raises are
    skipped, so the result lists can be shorter than ``solutions``;
    ``solution_indices`` maps each result entry back to its position in
    ``solutions``.

    Args:
        solutions: List of solution dictionaries (each with 'x', 'y', 't_full')
        rhs_func: Right-hand side function for the ODE system
        epsilon_threshold: Threshold for shadowing breakdown

    Returns:
        Dictionary with:
        - 'epsilon_t_list': running-maximum distance array per kept solution
        - 'shadowing_times': first time the running maximum exceeds
          ``epsilon_threshold``, or None if it never does
        - 'solution_indices': position in ``solutions`` of each kept entry
    """
    epsilon_t_list = []
    shadowing_times = []
    solution_indices = []

    for position, sol in enumerate(solutions):
        x = sol['x']
        y = sol['y']
        t_full = sol['t_full']
        xp0, yp0 = _perturbed_start(x[0], y[0])

        try:
            sol_p = solve_ivp(
                rhs_func,
                (t_full[0], t_full[-1]),
                (xp0, yp0),
                method='DOP853',
                t_eval=t_full
            )
            if not sol_p.success:
                continue

            xp, yp = sol_p.y
            dist = np.sqrt((x - xp) ** 2 + (y - yp) ** 2)
            epsilon_t = np.maximum.accumulate(dist)
            exceed_indices = np.where(epsilon_t > epsilon_threshold)[0]
            breakdown_time = t_full[exceed_indices[0]] if len(exceed_indices) > 0 else None
        except Exception:
            # Best-effort diagnostic: a failing trajectory is left out.
            continue

        # Append together so the three lists always stay aligned.
        epsilon_t_list.append(epsilon_t)
        shadowing_times.append(breakdown_time)
        solution_indices.append(position)

    return {
        'epsilon_t_list': epsilon_t_list,
        'shadowing_times': shadowing_times,
        'solution_indices': solution_indices,
    }