""" ╔══════════════════════════════════════════════════════╗ ║ OBSIDIAN FORGE — Kinetic RLM Server ║ ║ ║ ║ PHASE 1 (<1ms): Bind 0.0.0.0:7860, serve 200 OK ║ ║ PHASE 2 (bg): Lazy-import OmniVibeEngine, ║ ║ PoseArchitect, PosePainter, ║ ║ PoseAuditor, ReflectSelect. ║ ║ ║ ║ v4.0: Obsidian Forge — Absolute Black · RLM Core ║ ╚══════════════════════════════════════════════════════╝ """ import json, os, sys, asyncio, time from pathlib import Path from aiohttp import web # ─── CONSTANTS (no heavy imports) ─── PORT = int(os.environ.get("WIZARD_PORT", 7860)) STATIC = Path(__file__).parent / "static" CANONICAL_REPO = "dryymatt/Wizard-Vibe-Studio" # ─── Engine refs — lazy-loaded in Phase 2 ─── _engine = None _ghost = None _reflect = None _engine_loaded = False # ────────────────────────────────────────────────────── # PHASE 1: Instant HTML — absolute black loading page # ────────────────────────────────────────────────────── LOADING_HTML = """ Obsidian Forge

Build anything.

Obsidian Forge initializing…

0.0.0.0:7860 · RLM Core

""" # ────────────────────────────────────────────────────── # PHASE 1 HANDLERS # ────────────────────────────────────────────────────── async def handle_root_get(req): if _engine_loaded: fp = STATIC / "index.html" if fp.exists(): return web.Response(text=fp.read_text(), content_type="text/html") return web.Response(text=LOADING_HTML, content_type="text/html") async def handle_health(req): return web.json_response({ "status": "alive", "engine": "Obsidian Forge — Kinetic RLM Environment", "engine_loaded": _engine_loaded, "binding": "0.0.0.0:7860", "port": PORT, "hf_token": bool(os.environ.get("HF_TOKEN")), }) async def handle_agent(req): return web.json_response({ "name": "obsidian-forge", "description": "Obsidian Forge — Kinetic RLM Environment. Build anything. Recursive Language Modeling core with Context Folding and Quantum Sandbox.", "url": "https://dryymatt-obsidian-forge.hf.space", "provider": {"organization": "Obsidian Forge — Litehat System", "url": "https://huggingface.co/dryymatt"}, "version": "4.0.0", "capabilities": {"streaming": True, "ghostDeploy": True, "rlm": True, "contextFolding": True, "quantumSandbox": True, "silentHandshake": True}, }) # ────────────────────────────────────────────────────── # E2E HANDSHAKE — enhanced for Obsidian Forge # ────────────────────────────────────────────────────── async def handle_verify_handshake(req): """ POST /api/verify-handshake Silent DNS + HTTP verification for litheat.app and deployed Spaces. Body: {"url": "litheat.app" or "https://deployed-space.hf.space"} Returns verification status, latency, and detailed checks. """ try: data = await req.json() target = data.get("url", "") except Exception: target = "" if not target: return web.json_response({"verified": False, "error": "No URL provided"}, status=400) if not target.startswith("http"): target = f"https://{target}" results = {"url": target, "verified": False, "latency_ms": None, "checks": {}} # Check 1: DNS resolution import socket t0 = time.time() host = target.replace("https://", "").replace("http://", "").split("/")[0] try: socket.getaddrinfo(host, 443 if target.startswith("https") else 80) results["checks"]["dns"] = True except Exception as e: results["checks"]["dns"] = False results["error"] = f"DNS: {str(e)[:80]}" return web.json_response(results, status=200) # Check 2: HTTP reachability import urllib.request try: req = urllib.request.Request(target, headers={"User-Agent": "Obsidian-Forge-Handshake/4.0"}) resp = urllib.request.urlopen(req, timeout=8) latency = round((time.time() - t0) * 1000) results["latency_ms"] = latency results["checks"]["http"] = True results["checks"]["status_code"] = resp.getcode() # Accept 200-499 (some sites return 301/302/401 — still reachable) if 200 <= resp.getcode() < 500: results["verified"] = True else: results["error"] = f"HTTP {resp.getcode()}" except Exception as e: results["checks"]["http"] = False results["error"] = f"HTTP: {str(e)[:120]}" return web.json_response(results) async def handle_static(req): path = req.match_info.get("path", "index.html") fp = STATIC / path if not fp.exists(): return web.Response(text="Not found", status=404) ct = {".html": "text/html", ".css": "text/css", ".js": "application/javascript"} return web.Response(text=fp.read_text(), content_type=ct.get(fp.suffix, "text/plain")) # ────────────────────────────────────────────────────── # PHASE 2: Heavy imports, lazy-loaded # ────────────────────────────────────────────────────── def _get_engine(): global _engine, _ghost, _reflect, _engine_loaded if _engine is None: from core import state as st, sandbox_validate from ghost_deploy import ghost as g _engine = st.engine _ghost = g _reflect = st.reflect _engine_loaded = True print("◈ Obsidian Forge: RLM Engine LAZY-LOADED") from core import state as st return st, _ghost def _sse(ev, d): return f"event: {ev}\ndata: {json.dumps(d)}\n\n" async def stream_gen(sid, prompt): st, ghost = _get_engine() session = st.sessions[sid] session["status"] = "streaming" st.sandbox[sid] = "building" yield _sse("phase", {"phase": "pose"}) await asyncio.sleep(0.02) pose_plan = st.engine.pose(prompt) yield _sse("pose", { "architect": { "stack": pose_plan["architect"]["stack"], "features": pose_plan["architect"]["features"], "files": pose_plan["architect"]["files_needed"], }, "painter": pose_plan["painter"], "auditor": pose_plan["auditor"], }) yield _sse("phase", {"phase": "generate"}) await asyncio.sleep(0.02) t0 = time.time() code, schema = st.engine.generate(prompt) elapsed = (time.time() - t0) * 1000 chunk_size = max(1, len(code) // 30) for i in range(0, len(code), chunk_size): yield _sse("code", {"chunk": code[i:i+chunk_size], "partial": code[:i+chunk_size], "progress": min(100, int((i+chunk_size)/len(code)*100))}) await asyncio.sleep(0.01) st.codes[sid] = code st.sessions[sid]["schema"] = schema yield _sse("generated", {"chars": len(code), "elapsed_ms": round(elapsed, 1)}) yield _sse("phase", {"phase": "audit"}) await asyncio.sleep(0.02) findings = st.engine.auditor.audit(code, schema) yield _sse("audit", {"findings": len(findings), "errors": sum(1 for f in findings if f.severity=="ERROR"), "warnings": sum(1 for f in findings if f.severity=="WARN")}) yield _sse("phase", {"phase": "heal"}) healed, found, fixed = st.reflect.heal(code) for i in range(15): from core import sandbox_validate result = sandbox_validate(healed) if result["success"]: break healed, _, extra = st.reflect.heal(healed, result["errors"]) await asyncio.sleep(0.01) yield _sse("phase", {"phase": "sandbox"}) from core import sandbox_validate result = sandbox_validate(healed) if result["success"]: st.sandbox[sid] = "stable"; st.publish_ready[sid] = True yield _sse("sandbox", {"status": "stable"}) else: st.sandbox[sid] = "error" yield _sse("sandbox", {"status": "error"}) session["status"] = "complete" yield _sse("done", {"status": st.sandbox[sid]}) async def handle_stream(req): _get_engine() import uuid as _uuid sid = str(_uuid.uuid4())[:8] d = await req.json(); prompt = d.get("prompt", "") st, _ = _get_engine() st.sessions[sid] = {"id": sid, "prompt": prompt, "status": "init"} st.sandbox[sid] = "building"; st.publish_ready[sid] = False resp = web.StreamResponse(status=200, headers={ "Content-Type": "text/event-stream", "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"}) await resp.prepare(req) try: async for ev in stream_gen(sid, prompt): await resp.write(ev.encode()) await resp.write(b"event: close\ndata: {}\n\n") except Exception as e: await resp.write(f"event: error\ndata: {json.dumps({'error':str(e)})}\n\n".encode()) return resp async def handle_publish(req): st, ghost = _get_engine() d = await req.json(); sid = d.get("session_id") if not st.publish_ready.get(sid): return web.json_response({"success":False,"error":"Sandbox not stable"}, status=400) code = st.codes.get(sid,"") if not code: return web.json_response({"success":False,"error":"No code"}, status=400) description = d.get("description", st.sessions.get(sid,{}).get("prompt","")) result = await ghost.publish(code, d.get("vibe_name",f"obsidian-{sid}"), description[:200] if description else "", port=PORT) if result.get("success"): st.sessions[sid]["published"] = True st.sessions[sid]["deploy_url"] = result.get("space_url") return web.json_response(result) async def handle_status(req): st, _ = _get_engine() sid = req.query.get("session_id", "") if sid in st.sessions: s = st.sessions[sid] return web.json_response({"status":s["status"],"sandbox":st.sandbox.get(sid), "ready":st.publish_ready.get(sid),"deploy_url":s.get("deploy_url")}) return web.json_response({"sessions":len(st.sessions),"engine":"obsidian-forge-rlm"}) async def handle_vibes(req): _, ghost = _get_engine() vibes = await ghost.list_vibes() return web.json_response({"vibes":vibes,"count":len(vibes),"registry":CANONICAL_REPO}) async def handle_tunnel(req): _, ghost = _get_engine() try: url = await ghost.tunnel.up(PORT, timeout=10.0) return web.json_response({"success":bool(url),"url":url}) except Exception as e: return web.json_response({"success":False,"error":str(e)}, status=500) async def handle_schema(req): st, _ = _get_engine() sid = req.query.get("session_id", "") return web.json_response(st.sessions.get(sid,{}).get("schema",{})) async def handle_preview(req): st, _ = _get_engine() sid = req.query.get("session_id", "") return web.Response(text=st.codes.get(sid,""), content_type="text/html") # ────────────────────────────────────────────────────── # APP BUILD # ────────────────────────────────────────────────────── def create_app(): app = web.Application() # Phase 1 routes app.router.add_get("/", handle_root_get) app.router.add_get("/api/health", handle_health) app.router.add_get("/.well-known/agent.json", handle_agent) app.router.add_post("/api/verify-handshake", handle_verify_handshake) # Phase 2 routes app.router.add_post("/api/stream", handle_stream) app.router.add_post("/api/publish", handle_publish) app.router.add_get("/api/status", handle_status) app.router.add_get("/api/preview", handle_preview) app.router.add_get("/api/vibes", handle_vibes) app.router.add_get("/api/tunnel", handle_tunnel) app.router.add_get("/api/schema", handle_schema) app.router.add_get("/{path:.*}", handle_static) return app def main(): import subprocess try: result = subprocess.run( ["fuser", "-k", f"{PORT}/tcp"], capture_output=True, timeout=5 ) if result.returncode == 0: print(f"◈ Killed ghost on port {PORT}") except Exception: pass print(f"◈ Obsidian Forge v4.0 — Kinetic RLM Environment") print(f" Binding : 0.0.0.0:{PORT}") print(f" Health pulse : 200 OK in <1ms") print(f" Loading HTML : absolute black · 1px glass") print(f" Registry : {CANONICAL_REPO}") print(f" RLM Core : Recursive Language Modeling") print(f" Handshake : silent DNS + poll-until-live") loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) async def warm_start(): await asyncio.sleep(0.5) print("◈ Warm Start: loading RLM engine…") try: st, ghost = _get_engine() print(f"◈ RLM Engine: loaded ({_engine_loaded})") print(f" Poses: Architect, Painter, Auditor — synchronized") except Exception as e: print(f"◈ Engine warning (non-fatal): {e}") loop.create_task(warm_start()) web.run_app(create_app(), host="0.0.0.0", port=PORT, handle_signals=True) if __name__ == "__main__": main()