RFTs_Forecasts / app.py
RFTSystems's picture
Update app.py
754b80e verified
raw
history blame
17.7 kB
# app.py
# ===============================================================
# Rendered Frame Theory — Live Prediction Console
# Domains: Atmospheric / Seismic / Magnetic / Solar
# Full transparency: exact inputs + computed z, τ_eff, Ω_obs, α_R, index, and decision rule.
# Single-file.
# ===============================================================
import math
from datetime import datetime, timezone, timedelta
import gradio as gr
import httpx
import numpy as np
import pandas as pd
APP_NAME = "Rendered Frame Theory — Live Prediction Console"
UA = {"User-Agent": "RFTSystems/LivePredictionConsole"}
# ---------------------------
# Core constants
# ---------------------------
T_EARTH = 365.2422 * 24 * 3600.0
OMEGA_OBS = 2.0 * math.pi / T_EARTH
K_TAU = 1.38
ALPHA_R = 1.02
# ---------------------------
# Regions
# ---------------------------
REGION_BBOX = {
"Global": None,
"EMEA": (-35.0, -20.0, 70.0, 60.0),
"AMER": (-60.0, -170.0, 72.0, -30.0),
"APAC": (-50.0, 60.0, 60.0, 180.0),
}
RING_OF_FIRE_BBOXES = [
(-60.0, 120.0, 60.0, 180.0),
(-60.0, -180.0, 60.0, -100.0),
(10.0, -90.0, 60.0, -60.0),
]
# ---------------------------
# Helpers
# ---------------------------
def utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
def clamp(x: float, a: float, b: float) -> float:
return max(a, min(b, x))
def tau_eff_from_z(z: float) -> float:
z = max(0.0, float(z))
return K_TAU * math.log(1.0 + z)
def stable_log_ratio(x: float, x0: float) -> float:
x = max(float(x), 1e-30)
x0 = max(float(x0), 1e-30)
return math.log(x / x0)
def index_from_tau(tau: float) -> float:
return float(OMEGA_OBS * float(tau) * ALPHA_R)
# ---------------------------
# Live adapters
# ---------------------------
def geocode_location(q: str):
q = (q or "").strip()
if not q:
return None, None, "Empty location"
url = "https://geocoding-api.open-meteo.com/v1/search"
params = {"name": q, "count": 1, "language": "en", "format": "json"}
r = httpx.get(url, params=params, headers=UA, timeout=12)
r.raise_for_status()
js = r.json()
results = js.get("results") or []
if not results:
return None, None, f"Could not geocode '{q}'"
top = results[0]
lat = float(top["latitude"])
lon = float(top["longitude"])
display = f"{top.get('name','')}, {top.get('country_code','')}".strip().strip(",")
return lat, lon, display
def fetch_openmeteo_hourly(lat: float, lon: float, past_days: int = 1):
url = "https://api.open-meteo.com/v1/forecast"
params = {
"latitude": lat,
"longitude": lon,
"hourly": "temperature_2m,relative_humidity_2m,pressure_msl,wind_speed_10m",
"past_days": past_days,
"forecast_days": 1,
"timezone": "UTC",
}
r = httpx.get(url, params=params, headers=UA, timeout=18)
r.raise_for_status()
js = r.json()
hourly = js.get("hourly") or {}
return {
"time": hourly.get("time") or [],
"temp": hourly.get("temperature_2m") or [],
"rh": hourly.get("relative_humidity_2m") or [],
"p": hourly.get("pressure_msl") or [],
"wind": hourly.get("wind_speed_10m") or [],
}
def fetch_kp_last_24h():
url = "https://services.swpc.noaa.gov/json/planetary_k_index_1m.json"
r = httpx.get(url, headers=UA, timeout=15)
r.raise_for_status()
js = r.json()
if not isinstance(js, list) or not js:
return []
vals = []
for row in js:
kp = row.get("kp_index")
if kp is None:
continue
try:
vals.append(float(kp))
except Exception:
pass
return vals[-1440:]
def fetch_goes_xray_1day():
url = "https://services.swpc.noaa.gov/json/goes/primary/xrays-1-day.json"
r = httpx.get(url, headers=UA, timeout=15)
r.raise_for_status()
js = r.json()
if not isinstance(js, list) or not js:
return []
out = []
for row in js:
f = row.get("flux")
if f is None:
continue
try:
out.append(float(f))
except Exception:
pass
return out
def fetch_usgs_quakes(hours: int, minmag: float, bbox=None):
url = "https://earthquake.usgs.gov/fdsnws/event/1/query"
end = datetime.now(timezone.utc)
start = end - timedelta(hours=int(hours))
params = {
"format": "geojson",
"starttime": start.isoformat().replace("+00:00", "Z"),
"endtime": end.isoformat().replace("+00:00", "Z"),
"minmagnitude": str(float(minmag)),
"orderby": "time",
}
if bbox is not None:
minlat, minlon, maxlat, maxlon = bbox
params.update(
{
"minlatitude": str(minlat),
"minlongitude": str(minlon),
"maxlatitude": str(maxlat),
"maxlongitude": str(maxlon),
}
)
r = httpx.get(url, params=params, headers=UA, timeout=22)
r.raise_for_status()
js = r.json()
feats = js.get("features") if isinstance(js, dict) else None
if not feats:
return []
out = []
for f in feats:
props = f.get("properties") or {}
out.append({"id": f.get("id"), "mag": props.get("mag"), "place": props.get("place"), "time": props.get("time")})
return out
# ---------------------------
# Agents
# ---------------------------
def magnetic_agent():
kp = fetch_kp_last_24h()
if len(kp) < 30:
return {"enabled": False, "reason": "NOAA Kp feed too short"}
last = float(kp[-1])
tail = kp[-360:] if len(kp) >= 360 else kp
drift = float(np.std(tail)) if len(tail) >= 10 else 0.0
slope = float((tail[-1] - tail[0]) / max(1, len(tail) - 1))
z = clamp((last / 9.0) + (drift / 2.0) + 2.0 * abs(slope), 0.0, 3.0)
tau = tau_eff_from_z(z)
idx = index_from_tau(tau)
if last >= 7.0 or z >= 2.0:
pred = "warning"
rule = "Kp>=7 OR z>=2.0"
elif last >= 5.0 or z >= 1.2:
pred = "watch"
rule = "Kp>=5 OR z>=1.2"
elif last >= 4.0 or z >= 0.8:
pred = "monitor"
rule = "Kp>=4 OR z>=0.8"
else:
pred = "hold"
rule = "else"
live = f"Global Kp={last:.1f} | drift={drift:.2f} | slope={slope:.4f}"
return {
"enabled": True,
"domain": "Magnetic",
"prediction": pred,
"rule_fired": rule,
"z": float(z),
"tau_eff": float(tau),
"omega_obs": float(OMEGA_OBS),
"alpha_r": float(ALPHA_R),
"index": float(idx),
"live_status": live,
"truth_source": "NOAA SWPC planetary_k_index_1m (global)",
"inputs_used": {"kp_last": last, "kp_drift": drift, "kp_slope": slope, "tail_len": len(tail)},
"why": "z_mag compresses magnitude+variability into a bounded stress coordinate; τ_eff rises as ln(1+z).",
"how": "Fetch Kp → compute last/variability/slope → z_mag → τ_eff=1.38 ln(1+z) → Index=Ω_obs·τ_eff·α_R → label via fixed thresholds.",
}
def solar_agent():
flux = fetch_goes_xray_1day()
if len(flux) < 50:
return {"enabled": False, "reason": "GOES X-ray feed too short"}
tail = flux[-120:] if len(flux) >= 120 else flux[-60:]
f_mean = float(np.mean(tail))
f_peak = float(np.max(tail))
lr = stable_log_ratio(f_mean, 1e-8)
z = clamp(lr / 10.0, 0.0, 3.0)
tau = tau_eff_from_z(z)
idx = index_from_tau(tau)
if f_peak >= 1e-4 or z >= 2.2:
pred = "flare likely"
rule = "peak>=1e-4 OR z>=2.2"
elif f_peak >= 1e-5 or z >= 1.5:
pred = "flare watch"
rule = "peak>=1e-5 OR z>=1.5"
elif f_mean >= 1e-6 or z >= 0.9:
pred = "monitor"
rule = "mean>=1e-6 OR z>=0.9"
else:
pred = "hold"
rule = "else"
live = f"Global GOES mean={f_mean:.2e} | peak={f_peak:.2e}"
return {
"enabled": True,
"domain": "Solar",
"prediction": pred,
"rule_fired": rule,
"z": float(z),
"tau_eff": float(tau),
"omega_obs": float(OMEGA_OBS),
"alpha_r": float(ALPHA_R),
"index": float(idx),
"live_status": live,
"truth_source": "NOAA SWPC GOES primary xrays-1-day (global)",
"inputs_used": {"flux_mean": f_mean, "flux_peak": f_peak, "tail_len": len(tail)},
"why": "z_solar is derived from flux relative to baseline via a log ratio; τ_eff rises as ln(1+z).",
"how": "Fetch GOES flux → mean/peak → z_solar=clamp(ln(F_mean/1e-8)/10) → τ_eff → Index → label via fixed thresholds.",
}
def atmospheric_agent(lat: float, lon: float, display: str):
wx = fetch_openmeteo_hourly(lat, lon, past_days=1)
temp = wx["temp"]
p = wx["p"]
wind = wx["wind"]
if len(temp) < 13:
return {"enabled": False, "reason": "Open-Meteo hourly series too short"}
t12 = [float(x) for x in temp[-13:]]
dT = float(max(t12) - min(t12))
dp = None
if len(p) >= 13:
p12 = [float(x) for x in p[-13:]]
dp = float(p12[-1] - p12[0])
w_mean = None
if len(wind) >= 13:
w12 = [float(x) for x in wind[-13:]]
w_mean = float(np.mean(w12))
z_dt = clamp(dT / 10.0, 0.0, 2.0)
z_dp = clamp((abs(dp) / 12.0) if dp is not None else 0.0, 0.0, 1.5)
z = clamp(z_dt + z_dp, 0.0, 3.0)
tau = tau_eff_from_z(z)
idx = index_from_tau(tau)
if dT >= 10.0 or (dp is not None and dp <= -10.0):
pred = "storm risk"
rule = "ΔT>=10 OR ΔP<=-10"
elif dT >= 7.0 or (dp is not None and dp <= -6.0):
pred = "swing"
rule = "ΔT>=7 OR ΔP<=-6"
elif dT >= 4.0:
pred = "mild swing"
rule = "ΔT>=4"
else:
pred = "stable"
rule = "else"
parts = [f"{display} ΔT(12h)={dT:.1f}°C"]
if dp is not None:
parts.append(f"ΔP(12h)={dp:.1f} hPa")
if w_mean is not None:
parts.append(f"wind≈{w_mean:.1f} m/s")
live = " | ".join(parts)
return {
"enabled": True,
"domain": "Atmospheric",
"prediction": pred,
"rule_fired": rule,
"z": float(z),
"tau_eff": float(tau),
"omega_obs": float(OMEGA_OBS),
"alpha_r": float(ALPHA_R),
"index": float(idx),
"live_status": live,
"truth_source": "Open-Meteo hourly (location-based)",
"inputs_used": {"dT_12h": dT, "dP_12h": dp, "wind_mean": w_mean, "lat": lat, "lon": lon},
"why": "z_atm is derived from thermal swing and pressure change as a bounded stress coordinate; τ_eff rises as ln(1+z).",
"how": "Geocode → fetch hourly series → compute ΔT and ΔP (last ~12h) → z_atm → τ_eff → Index → label via fixed thresholds.",
}
def seismic_agent(region: str):
if region == "RingOfFire":
seen = set()
eqs = []
for bb in RING_OF_FIRE_BBOXES:
chunk = fetch_usgs_quakes(hours=24, minmag=2.5, bbox=bb)
for e in chunk:
eid = e.get("id")
if eid and eid not in seen:
seen.add(eid)
eqs.append(e)
else:
bbox = REGION_BBOX.get(region, None)
eqs = fetch_usgs_quakes(hours=24, minmag=2.5, bbox=bbox)
N = int(len(eqs))
mags = []
for e in eqs:
m = e.get("mag")
if m is None:
continue
try:
mags.append(float(m))
except Exception:
pass
Mmax = float(max(mags)) if mags else 0.0
z_count = clamp(N / 60.0, 0.0, 1.5)
z_mag = clamp(max(0.0, Mmax - 4.0) / 2.5, 0.0, 1.5)
z = clamp(z_count + z_mag, 0.0, 3.0)
tau = tau_eff_from_z(z)
idx = index_from_tau(tau)
if Mmax >= 6.5 or z >= 2.2:
pred = "alert"
rule = "Mmax>=6.5 OR z>=2.2"
elif Mmax >= 5.5 or z >= 1.5:
pred = "watch"
rule = "Mmax>=5.5 OR z>=1.5"
elif N >= 25 or z >= 1.0:
pred = "monitor"
rule = "N>=25 OR z>=1.0"
else:
pred = "quiet"
rule = "else"
scope = "Global" if region == "Global" else region
live = f"{scope} quakes(24h,M≥2.5)={N} | max M{Mmax:.1f}"
return {
"enabled": True,
"domain": "Seismic",
"prediction": pred,
"rule_fired": rule,
"z": float(z),
"tau_eff": float(tau),
"omega_obs": float(OMEGA_OBS),
"alpha_r": float(ALPHA_R),
"index": float(idx),
"live_status": live,
"truth_source": "USGS FDSN event feed (region-filtered)",
"inputs_used": {"count_24h": N, "max_mag_24h": Mmax, "region": region},
"why": "z_seis compresses activity density and severity into a bounded stress coordinate; τ_eff rises as ln(1+z).",
"how": "Fetch USGS in region → count + max magnitude → z_seis → τ_eff → Index → label via fixed thresholds.",
}
# ---------------------------
# Orchestrator
# ---------------------------
def run_forecast(location_text: str, seismic_region: str):
try:
lat, lon, display = geocode_location(location_text)
except Exception as e:
df = pd.DataFrame([{"Domain": "Error", "RFT Prediction": "DISABLED", "Live Status": f"Geocode error: {e}"}])
empty = {"enabled": False, "reason": f"Geocode error: {e}"}
return f"❌ Geocode error: {e}", df, empty, empty, empty, empty
if lat is None:
df = pd.DataFrame([{"Domain": "Error", "RFT Prediction": "DISABLED", "Live Status": display}])
empty = {"enabled": False, "reason": display}
return f"❌ {display}", df, empty, empty, empty, empty
try:
atm = atmospheric_agent(lat, lon, display)
except Exception as e:
atm = {"enabled": False, "reason": f"atmos error: {e}"}
try:
sei = seismic_agent(seismic_region)
except Exception as e:
sei = {"enabled": False, "reason": f"seismic error: {e}"}
try:
mag = magnetic_agent()
except Exception as e:
mag = {"enabled": False, "reason": f"magnetic error: {e}"}
try:
sol = solar_agent()
except Exception as e:
sol = {"enabled": False, "reason": f"solar error: {e}"}
def fmt_row(domain: str, out: dict):
if not out.get("enabled"):
return {"Domain": domain, "RFT Prediction": "DISABLED", "Live Status": out.get("reason", "missing inputs")}
idx = out.get("index", None)
z = out.get("z", None)
tau = out.get("tau_eff", None)
idx_s = f"{float(idx):.3e}" if isinstance(idx, (int, float)) else "n/a"
z_s = f"{float(z):.2f}" if isinstance(z, (int, float)) else "n/a"
t_s = f"{float(tau):.3f}" if isinstance(tau, (int, float)) else "n/a"
return {
"Domain": domain,
"RFT Prediction": f"{out.get('prediction','hold')} | idx={idx_s} | z={z_s} | τ={t_s}",
"Live Status": out.get("live_status", ""),
}
df = pd.DataFrame(
[
fmt_row("Atmospheric", atm),
fmt_row("Seismic", sei),
fmt_row("Magnetic", mag),
fmt_row("Solar", sol),
]
)
ts = utc_now_iso()
header = f"**Location:** {display} (lat {lat:.3f}, lon {lon:.3f}) | **UTC:** {ts}"
return header, df, atm, sei, mag, sol
# ---------------------------
# Open Method
# ---------------------------
METHOD_MD = f"""
## Open method
**Shared core:**
- `τ_eff = {K_TAU} · ln(1 + z)`
- `Ω_obs = 2π / T_earth = {OMEGA_OBS:.6e}`
- `α_R = {ALPHA_R}`
- `Index = Ω_obs · τ_eff · α_R`
### Atmospheric (Open-Meteo, location)
- Inputs: hourly temp, pressure, wind (last ~12h).
- `ΔT = max(T) - min(T)`
- `ΔP = P_end - P_start` (if available)
- `z_atm = clamp( clamp(ΔT/10,0..2) + clamp(|ΔP|/12,0..1.5), 0..3 )`
- Label rule in agent output.
### Seismic (USGS, region)
- Inputs: USGS events in last 24h (M≥2.5), filtered by region.
- Count `N`, max magnitude `Mmax`
- `z_seis = clamp( clamp(N/60,0..1.5) + clamp(max(0,Mmax-4)/2.5,0..1.5), 0..3 )`
- Label rule in agent output.
### Magnetic (NOAA Kp, global)
- Inputs: planetary Kp 1-min stream.
- `z_mag = clamp( (Kp_last/9) + (drift/2) + 2·|slope|, 0..3 )`
- Label rule in agent output.
### Solar (GOES X-ray, global)
- Inputs: GOES 1–8Å flux (1-day stream).
- `z_solar = clamp( ln(F_mean/1e-8) / 10, 0..3 )`
- Label rule in agent output.
Missing/short feeds show **DISABLED**.
"""
# ---------------------------
# UI
# ---------------------------
with gr.Blocks(title=APP_NAME) as demo:
gr.Markdown(f"# {APP_NAME}")
with gr.Tab("Live Forecast"):
with gr.Row():
loc = gr.Textbox(label="Location", value="London")
region = gr.Dropdown(["Global", "EMEA", "AMER", "APAC", "RingOfFire"], value="EMEA", label="Seismic Region")
btn = gr.Button("Run Forecast", variant="primary")
header_md = gr.Markdown()
table = gr.Dataframe(headers=["Domain", "RFT Prediction", "Live Status"], interactive=False)
with gr.Accordion("Atmospheric details", open=False):
atm_json = gr.JSON(label="Atmospheric agent output")
with gr.Accordion("Seismic details", open=False):
sei_json = gr.JSON(label="Seismic agent output")
with gr.Accordion("Magnetic details", open=False):
mag_json = gr.JSON(label="Magnetic agent output")
with gr.Accordion("Solar details", open=False):
sol_json = gr.JSON(label="Solar agent output")
btn.click(run_forecast, inputs=[loc, region], outputs=[header_md, table, atm_json, sei_json, mag_json, sol_json])
with gr.Tab("Method (Open)"):
gr.Markdown(METHOD_MD)
if __name__ == "__main__":
demo.launch()