Spaces:
Running
Running
| # ============================================================ | |
| # RFT-Ω FRAMEWORK — TOTAL-PROOF API (Sprint 1) | |
| # Author: Liam Grinstead (RFT Systems) | All Rights Reserved | |
| # ============================================================ | |
| """ | |
| This Space is a deterministic, signed validation harness for the | |
| Rendered Frame Theory (RFT-Ω) harmonic stability kernel. | |
| Core guarantees: | |
| - Deterministic replay via seed + config. | |
| - Multiple schedules (single/ramp/random/impulse/step). | |
| - Multiple noise distributions (gauss/uniform). | |
| - Signed outputs: SHA-512 over (config+results). | |
| - Downloadable bundle: run.json, run.sha512, ABOUT.json, NOTICE.txt. | |
| - Health & About endpoints for ops integration. | |
| Legal: | |
| All Rights Reserved — RFT-IPURL v1.0 (UK / Berne). | |
| Research validation use only. No reverse-engineering or derivative | |
| kernels without written consent from the Author. | |
| Contact: liamgrinstead2@gmail.com | |
| """ | |
| import os, io, json, time, math, hashlib, zipfile, random | |
| from datetime import datetime | |
| from typing import Dict, Any, List, Tuple | |
| import numpy as np | |
| import gradio as gr | |
| # Optional FastAPI for /healthz and /about | |
| try: | |
| from fastapi import FastAPI | |
| HAVE_FASTAPI = True | |
| except Exception: | |
| HAVE_FASTAPI = False | |
| # ------------------ Version / About ------------------------- | |
# Release identifiers echoed into run configs and the About payload.
RFT_VERSION = "v4.0-total-proof-sprint1"
RFT_DOI = "https://doi.org/10.5281/zenodo.17466722"
HF_URL = "https://rftsystems-rft-omega-api.hf.space"

# Licence sentence attached to responses and written to NOTICE.txt.
LEGAL_NOTICE = " ".join((
    "All Rights Reserved — RFT-IPURL v1.0 (UK / Berne).",
    "Research validation use only.",
    "No reverse-engineering or derivative kernels without written consent.",
))

# Static service description; keyword order fixes the key order of the dict.
ABOUT_BLOCK = dict(
    name="RFT-Ω Framework — Total-Proof API",
    version=RFT_VERSION,
    doi=RFT_DOI,
    space=HF_URL,
    profiles=["AI / Neural", "SpaceX / Aerospace", "Energy / RHES", "Extreme Perturbation"],
    noise_distributions=["gauss", "uniform"],
    schedules=["single", "ramp", "random", "impulse", "step"],
    metrics=["QΩ", "ζ_sync", "status"],
    integrity=["run_id", "sha512(config+results)", "bundle(zip)"],
    legal=LEGAL_NOTICE,
)
| # ------------------ Soft Rate Limit ------------------------- | |
# Wall-clock timestamps (time.time()) of runs admitted within the last minute.
RUN_HISTORY_TS: List[float] = []
# Process-wide ceiling on admitted runs per 60-second window.
MAX_RUNS_PER_MINUTE = 60


def _rate_limit_ok() -> Tuple[bool, str]:
    """Admit or reject one run against the shared per-minute budget.

    Leading history entries older than 60 seconds are discarded first. If the
    remaining history is below ``MAX_RUNS_PER_MINUTE`` the current time is
    recorded and ``(True, "ok")`` is returned; otherwise nothing is recorded
    and ``(False, <message>)`` is returned.
    """
    current = time.time()

    # Count the expired entries at the front, then drop them in one slice.
    expired = 0
    for stamp in RUN_HISTORY_TS:
        if current - stamp <= 60.0:
            break
        expired += 1
    del RUN_HISTORY_TS[:expired]

    if len(RUN_HISTORY_TS) < MAX_RUNS_PER_MINUTE:
        RUN_HISTORY_TS.append(current)
        return True, "ok"
    return False, "Rate limit exceeded (demo fairness). Please retry shortly."
| # ------------------ Profiles & Simulator -------------------- | |
# Per-profile simulator settings: "base" is the (QΩ, ζ_sync) starting pair and
# "w" is the pair of weights applied to the two noise terms in simulate_step.
PROFILES = {
    label: {"base": base_pair, "w": weight_pair}
    for label, base_pair, weight_pair in (
        ("AI / Neural", (0.86, 0.80), (0.65, 0.35)),
        ("SpaceX / Aerospace", (0.84, 0.79), (0.60, 0.40)),
        ("Energy / RHES", (0.83, 0.78), (0.55, 0.45)),
        ("Extreme Perturbation", (0.82, 0.77), (0.50, 0.50)),
    )
}
def _rng(seed: int) -> np.random.RandomState:
    """Return a fresh NumPy ``RandomState`` initialised with ``seed``."""
    generator = np.random.RandomState(seed)
    return generator
def simulate_step(rng: np.random.RandomState, profile: str, sigma: float, noise_dist: str) -> Dict[str, Any]:
    """Draw one noisy (QΩ, ζ_sync) sample for ``profile`` and classify it.

    Two noise terms are drawn from ``rng`` (the QΩ term first, then the ζ_sync
    term at 0.8x the scale). ``noise_dist == "uniform"`` selects uniform noise;
    any other value selects Gaussian noise. Each term is weighted, added to the
    profile's base value and clipped to [0.0, 0.99].

    Returns:
        A dict with keys "sigma" (rounded to 6 decimals), "QΩ", "ζ_sync" and
        "status" ("nominal", "perturbed" or "critical").

    Raises:
        KeyError: if ``profile`` is not a key of ``PROFILES``.
    """
    spec = PROFILES[profile]
    (q_base, z_base), (q_weight, z_weight) = spec["base"], spec["w"]

    z_scale = sigma * 0.8
    if noise_dist != "uniform":
        # Everything except "uniform" is treated as Gaussian.
        dq = rng.normal(0, sigma)
        dz = rng.normal(0, z_scale)
    else:
        dq = rng.uniform(-sigma, sigma)
        dz = rng.uniform(-z_scale, z_scale)

    q_value = float(np.clip(q_base + q_weight * dq, 0.0, 0.99))
    z_value = float(np.clip(z_base + z_weight * dz, 0.0, 0.99))

    # The status is driven by the combined magnitude of the raw noise terms.
    spread = abs(dq) + abs(dz)
    status = "nominal"
    for limit, label in ((0.15, "critical"), (0.07, "perturbed")):
        if spread > limit:
            status = label
            break

    return {"sigma": float(round(sigma, 6)), "QΩ": q_value, "ζ_sync": z_value, "status": status}
| # ------------------ Schedules ------------------------------- | |
def build_sigma_series(schedule_type: str, params: Dict[str, Any]) -> List[float]:
    """Expand a schedule description into the sigma value for every step.

    Args:
        schedule_type: "single", "ramp", "random", "impulse" or "step". Any
            other value falls back to the "single" behaviour.
        params: Schedule options; missing keys use the defaults shown below.
            - single:  sigma=0.05
            - ramp:    start=0.0, stop=0.3, steps=10 (evenly spaced, inclusive)
            - random:  min=0.0, max=0.3, steps=10, seed=0 (uniform draws)
            - impulse: base=0.05, spike=0.25, at=5, steps=10
            - step:    before=0.05, after=0.20, at=5, steps=10

    Returns:
        A non-empty list of plain Python floats. ``steps`` is clamped to at
        least 1.

    Raises:
        TypeError, ValueError, OverflowError: if an option cannot be converted
            to the number it is expected to be.
    """
    # NOTE(review): `steps` comes straight from `params` with no upper bound;
    # confirm callers cap it if `params` can be user-supplied.
    if schedule_type == "ramp":
        start = float(params.get("start", 0.0))
        stop = float(params.get("stop", 0.3))
        steps = max(1, int(params.get("steps", 10)))
        # tolist() yields built-in floats rather than numpy scalars.
        return np.linspace(start, stop, steps).tolist()

    if schedule_type == "random":
        low = float(params.get("min", 0.0))
        high = float(params.get("max", 0.3))
        steps = max(1, int(params.get("steps", 10)))
        # Dedicated generator so the schedule depends only on its own seed.
        picker = random.Random(int(params.get("seed", 0)))
        return [picker.uniform(low, high) for _ in range(steps)]

    if schedule_type == "impulse":
        base = float(params.get("base", 0.05))
        spike = float(params.get("spike", 0.25))
        at = int(params.get("at", 5))
        steps = max(1, int(params.get("steps", 10)))
        series = [base] * steps
        # An out-of-range position simply leaves the series flat.
        if 0 <= at < steps:
            series[at] = spike
        return series

    if schedule_type == "step":
        before = float(params.get("before", 0.05))
        after = float(params.get("after", 0.20))
        at = int(params.get("at", 5))
        steps = max(1, int(params.get("steps", 10)))
        return [after if index >= at else before for index in range(steps)]

    # "single" and any unrecognised schedule type: one step at a fixed sigma.
    return [float(params.get("sigma", 0.05))]
| # ------------------ Integrity / Bundling -------------------- | |
def sha512_hex(s: str) -> str:
    """Return the lowercase hex SHA-512 digest of ``s`` encoded as UTF-8."""
    hasher = hashlib.sha512()
    hasher.update(s.encode("utf-8"))
    return hasher.hexdigest()
def make_run_id(seed: int, profile: str) -> str:
    """Return a 16-hex-character identifier for one run.

    The id is the first 16 characters of a SHA-256 over the current
    nanosecond clock, ``seed``, ``profile`` and a random float, so repeated
    calls with the same arguments give different ids.
    """
    parts = (time.time_ns(), seed, profile, random.random())
    token = "::".join(str(part) for part in parts)
    full_digest = hashlib.sha256(token.encode("utf-8")).hexdigest()
    return full_digest[:16]
def write_bundle(run_dir: str, config: Dict[str, Any], results: Dict[str, Any]) -> Tuple[str, str]:
    """Write the run artefacts into ``run_dir`` and zip them.

    Files written (and added to the archive under the same names):
        run.json    - {"config": ..., "results": ...}, indented by 2
        run.sha512  - hex digest followed by a newline
        ABOUT.json  - the module's ABOUT_BLOCK
        NOTICE.txt  - the module's LEGAL_NOTICE followed by a newline

    The digest is SHA-512 over ``json.dumps(payload, sort_keys=True)``, i.e.
    the canonical compact form, NOT over the bytes of run.json (which is
    indented and keeps insertion order). To verify a bundle, load run.json and
    re-serialise it with ``sort_keys=True`` before hashing.

    Args:
        run_dir: Directory to create (if needed) and write into.
        config: JSON-serialisable run configuration.
        results: JSON-serialisable run results.

    Returns:
        ``(zip_path, digest)``: path of ``rft_run_bundle.zip`` inside
        ``run_dir`` and the hex SHA-512 digest.

    Raises:
        OSError: if the directory or any file cannot be written.
        TypeError, ValueError: if the payload is not JSON-serialisable.
    """
    os.makedirs(run_dir, exist_ok=True)

    payload = {"config": config, "results": results}
    digest = sha512_hex(json.dumps(payload, sort_keys=True))

    # Archive member name -> text content, in the order they are zipped.
    members = {
        "run.json": json.dumps(payload, indent=2),
        "run.sha512": digest + "\n",
        "ABOUT.json": json.dumps(ABOUT_BLOCK, indent=2),
        "NOTICE.txt": LEGAL_NOTICE + "\n",
    }

    written = []
    for member_name, text in members.items():
        member_path = os.path.join(run_dir, member_name)
        # Explicit UTF-8: the notice contains non-ASCII characters, so the
        # platform default encoding must not decide whether this succeeds.
        with open(member_path, "w", encoding="utf-8") as handle:
            handle.write(text)
        written.append((member_path, member_name))

    zip_path = os.path.join(run_dir, "rft_run_bundle.zip")
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for member_path, member_name in written:
            archive.write(member_path, arcname=member_name)

    return zip_path, digest
| # ------------------ Core Runner ----------------------------- | |
def _aggregate_step(step_rng, profile: str, sigma: float, noise_dist: str, n_samples: int):
    """Average ``n_samples`` draws of ``simulate_step`` and pick the majority status."""
    draws = [simulate_step(step_rng, profile, sigma, noise_dist) for _ in range(n_samples)]
    q_mean = float(np.mean([draw["QΩ"] for draw in draws]))
    z_mean = float(np.mean([draw["ζ_sync"] for draw in draws]))
    statuses = [draw["status"] for draw in draws]
    # Most frequent status wins; on a tie the more severe one (later in the
    # tuple) is chosen.
    severity = ("nominal", "perturbed", "critical")
    _, majority = max(enumerate(severity), key=lambda item: (statuses.count(item[1]), item[0]))
    return q_mean, z_mean, majority


def run_total_proof(
    profile: str,
    noise_dist: str,
    schedule_type: str,
    schedule_params_text: str,
    seed: int,
    samples: int,
):
    """Run one seeded simulation sweep, write its bundle, and return a summary.

    The sigma schedule is built from ``schedule_type`` and the JSON object in
    ``schedule_params_text``. Each step gets its own generator, seeded from a
    master generator created from ``seed``, and is sampled ``samples`` times
    (at least once). The simulated series therefore depends only on the
    inputs, while ``run_id`` and ``timestamp_utc`` (and hence the digest, which
    covers the config) differ on every call.

    Args:
        profile: Key of ``PROFILES``.
        noise_dist: "uniform" or "gauss" (see ``simulate_step``).
        schedule_type: Schedule name understood by ``build_sigma_series``.
        schedule_params_text: JSON object with schedule options; empty or
            ``None`` means "use defaults".
        seed: Master seed; must be accepted by ``numpy.random.RandomState``.
        samples: Repeats per step; values below 1 are treated as 1.

    Returns:
        ``(payload, zip_path)``. On success ``payload`` has the keys "config",
        "head" and "results" and ``zip_path`` is the bundle path. On a rejected
        request (rate limit or invalid input) ``payload`` is
        ``{"error": ..., "rft_notice": ...}`` and ``zip_path`` is ``None``.

    Raises:
        OSError: if the bundle cannot be written.
    """
    # Local import: the module header only imports the `datetime` class.
    from datetime import timezone

    def _fail(message: str):
        # Single shape for every rejected request.
        return {"error": message, "rft_notice": LEGAL_NOTICE}, None

    ok, msg = _rate_limit_ok()
    if not ok:
        return _fail(msg)

    # --- parse schedule params -------------------------------------------
    try:
        text = (schedule_params_text or "").strip()
        params = json.loads(text) if text else {}
        if not isinstance(params, dict):
            raise ValueError("schedule_params must be a JSON object.")
    except (AttributeError, TypeError, ValueError, RecursionError) as exc:
        return _fail(f"Invalid schedule_params JSON: {exc}")

    # --- validate the remaining inputs -----------------------------------
    # These used to escape as uncaught exceptions; report them like any other
    # invalid request instead.
    if profile not in PROFILES:
        return _fail(f"Unknown profile: {profile!r}")
    try:
        seed = int(seed)
        n_samples = max(1, int(samples))
        rng_master = _rng(seed)  # rejects seeds outside numpy's accepted range
    except (TypeError, ValueError, OverflowError) as exc:
        return _fail(f"Invalid seed or samples: {exc}")
    try:
        sigma_series = [float(value) for value in build_sigma_series(schedule_type, params)]
    except (TypeError, ValueError, OverflowError) as exc:
        return _fail(f"Invalid schedule_params values: {exc}")

    # --- simulate ---------------------------------------------------------
    results_steps = []
    try:
        for index, sigma in enumerate(sigma_series):
            # One sub-seed per step, drawn in order from the master generator,
            # so each step has its own noise stream derived from `seed`.
            sub_seed = int(rng_master.randint(0, 2**31 - 1))
            q_mean, z_mean, status_final = _aggregate_step(
                _rng(sub_seed), profile, sigma, noise_dist, n_samples
            )
            results_steps.append({
                "index": index,
                "sigma": float(round(sigma, 6)),
                "QΩ_mean": float(round(q_mean, 6)),
                "ζ_sync_mean": float(round(z_mean, 6)),
                "status_majority": status_final,
                "samples": n_samples,
            })
    except ValueError as exc:
        # e.g. a negative sigma handed to the Gaussian sampler.
        return _fail(f"Simulation rejected the schedule values: {exc}")

    # --- summary ----------------------------------------------------------
    statuses = [step["status_majority"] for step in results_steps]
    nominal_count = statuses.count("nominal")
    summary = {
        "steps": len(results_steps),
        "nominal_count": nominal_count,
        "perturbed_count": statuses.count("perturbed"),
        "critical_count": statuses.count("critical"),
        "triggers_non_nominal": len(statuses) - nominal_count,
    }

    # --- config / results blocks -----------------------------------------
    run_id = make_run_id(seed, profile)
    # Same "YYYY-MM-DDTHH:MM:SS[.ffffff]Z" text as utcnow().isoformat() + "Z",
    # without the deprecated naive-UTC constructor.
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    config = {
        "run_id": run_id,
        "timestamp_utc": timestamp,
        "profile": profile,
        "noise_distribution": noise_dist,
        "schedule_type": schedule_type,
        "schedule_params": params,
        "seed": seed,
        "samples_per_step": n_samples,
        "about_version": RFT_VERSION,
        "about_doi": RFT_DOI,
    }
    results = {
        "series": results_steps,
        "summary": summary,
        "rft_notice": LEGAL_NOTICE,
    }

    # --- bundle -----------------------------------------------------------
    # NOTE(review): nothing in this function removes the run directory
    # afterwards; confirm cleanup happens elsewhere.
    run_dir = f"/tmp/rft_run_{run_id}"
    zip_path, digest = write_bundle(run_dir, config, results)

    # Compact header so callers need not dig through the full results.
    head = {
        "run_id": run_id,
        "sha512": digest,
        "sha512_short": digest[:16] + "…",
        "steps": summary["steps"],
        "counts": {
            "nominal": summary["nominal_count"],
            "perturbed": summary["perturbed_count"],
            "critical": summary["critical_count"],
        },
        "rft_notice": LEGAL_NOTICE,
    }
    return {"config": config, "head": head, "results": results}, zip_path
| # ------------------ Gradio UI ------------------------------- | |
# Initial content of the schedule-parameter editor; valid for the "single" schedule.
DEFAULT_PARAMS = json.dumps(dict(sigma=0.05), indent=2)

# One example parameter object per schedule type, shown under the editor.
HELP_TEXT = "".join(
    line + "\n"
    for line in (
        'Schedule JSON examples:',
        '- single: {"sigma": 0.05}',
        '- ramp: {"start": 0.0, "stop": 0.30, "steps": 12}',
        '- random: {"min": 0.01, "max": 0.30, "steps": 10, "seed": 0}',
        '- impulse:{"base": 0.05, "spike": 0.25, "at": 5, "steps": 12}',
        '- step: {"before": 0.05, "after": 0.20, "at": 6, "steps": 12}',
    )
)
with gr.Blocks(title="RFT-Ω Total-Proof API") as demo:
    # Page header: version, one-line description, DOI and the licence notice.
    gr.Markdown(" \n".join([
        f"### RFT-Ω Total-Proof API ({RFT_VERSION})",
        "A deterministic, signed validation harness for harmonic stability (QΩ) and coherence (ζ_sync).",
        f"**DOI:** {RFT_DOI}",
        f"**Legal:** {LEGAL_NOTICE}",
    ]))

    # Row 1: which system profile and which noise distribution to simulate.
    with gr.Row():
        profile = gr.Radio(choices=list(PROFILES), value="AI / Neural", label="System Profile")
        noise_dist = gr.Radio(choices=["gauss", "uniform"], value="gauss", label="Noise Distribution")

    # Row 2: schedule type, master seed and repeats per step.
    with gr.Row():
        schedule_type = gr.Radio(
            choices=["single", "ramp", "random", "impulse", "step"],
            value="single",
            label="Schedule Type",
        )
        seed_in = gr.Number(label="Seed (int)", value=123, precision=0)
        samples_in = gr.Slider(minimum=1, maximum=20, step=1, value=5, label="Samples per Step")

    # Free-form JSON options for the chosen schedule, with examples underneath.
    schedule_params = gr.Code(label="Schedule Parameters (JSON)", language="json", value=DEFAULT_PARAMS)
    gr.Markdown(HELP_TEXT)

    run_btn = gr.Button("Run Deterministic Simulation & Sign Results")
    out_json = gr.JSON(label="Signed Run Summary (config, head, results)")
    out_bundle = gr.File(label="Download Signed Bundle (zip)")

    def _on_run(p, nd, st, sp_json, seed, samples):
        """Button handler: coerce the numeric widgets to int and run the simulation."""
        return run_total_proof(p, nd, st, sp_json, int(seed), int(samples))

    run_btn.click(
        fn=_on_run,
        inputs=[profile, noise_dist, schedule_type, schedule_params, seed_in, samples_in],
        outputs=[out_json, out_bundle],
    )

    gr.Markdown("**Ops endpoints:** `/healthz`, `/about` (FastAPI mounted if available).")
| # ------------------ FastAPI Mount (optional) ---------------- | |
if HAVE_FASTAPI:
    api = FastAPI(title="RFT-Ω Total-Proof Ops")

    # The handlers must be registered on `api` with path decorators; a bare
    # `def` is never exposed as a route. They are declared before the Gradio
    # app is mounted at "/" so the catch-all mount does not shadow them.
    @api.get("/healthz")
    def healthz():
        """Liveness probe: report that the service is up and which version it runs."""
        return {"ok": True, "service": "RFT-Ω Total-Proof", "version": RFT_VERSION}

    @api.get("/about")
    def about():
        """Return the static service description (ABOUT_BLOCK)."""
        return ABOUT_BLOCK

    # ASGI application serving the ops routes plus the Gradio UI at the root.
    app = gr.mount_gradio_app(api, demo, path="/")
else:
    # FastAPI is unavailable: expose the Gradio app on its own.
    app = demo
| # ------------------ Main ----------------------------------- | |
if __name__ == "__main__":
    # Direct execution: serve the Gradio UI on all interfaces, port 7860.
    # NOTE(review): this launches `demo`, not `app`, so routes registered on
    # the FastAPI wrapper are not served through this entry point - confirm
    # that `app` is what the deployment actually runs if they are needed.
    launch_options = {"server_name": "0.0.0.0", "server_port": 7860}
    demo.launch(**launch_options)