"""
╔══════════════════════════════════════════════════════╗
║ OBSIDIAN FORGE — Kinetic RLM Server ║
║ ║
║ PHASE 1 (<1ms): Bind 0.0.0.0:7860, serve 200 OK ║
║ PHASE 2 (bg): Lazy-import OmniVibeEngine, ║
║ PoseArchitect, PosePainter, ║
║ PoseAuditor, ReflectSelect. ║
║ ║
║ v4.0: Obsidian Forge — Absolute Black · RLM Core ║
╚══════════════════════════════════════════════════════╝
"""
import json, os, sys, asyncio, time
from pathlib import Path
from aiohttp import web
# ─── CONSTANTS (no heavy imports) ───
PORT = int(os.environ.get("WIZARD_PORT", 7860))
STATIC = Path(__file__).parent / "static"
CANONICAL_REPO = "dryymatt/Wizard-Vibe-Studio"
# ─── Engine refs — lazy-loaded in Phase 2 ───
_engine = None
_ghost = None
_reflect = None
_engine_loaded = False
# ──────────────────────────────────────────────────────
# PHASE 1: Instant HTML — absolute black loading page
# ──────────────────────────────────────────────────────
LOADING_HTML = """
Obsidian Forge
Build anything.
Obsidian Forge initializing…
0.0.0.0:7860 · RLM Core
"""
# ──────────────────────────────────────────────────────
# PHASE 1 HANDLERS
# ──────────────────────────────────────────────────────
async def handle_root_get(req):
if _engine_loaded:
fp = STATIC / "index.html"
if fp.exists():
return web.Response(text=fp.read_text(), content_type="text/html")
return web.Response(text=LOADING_HTML, content_type="text/html")
async def handle_health(req):
return web.json_response({
"status": "alive",
"engine": "Obsidian Forge — Kinetic RLM Environment",
"engine_loaded": _engine_loaded,
"binding": "0.0.0.0:7860",
"port": PORT,
"hf_token": bool(os.environ.get("HF_TOKEN")),
})
async def handle_agent(req):
return web.json_response({
"name": "obsidian-forge",
"description": "Obsidian Forge — Kinetic RLM Environment. Build anything. Recursive Language Modeling core with Context Folding and Quantum Sandbox.",
"url": "https://dryymatt-obsidian-forge.hf.space",
"provider": {"organization": "Obsidian Forge — Litehat System",
"url": "https://huggingface.co/dryymatt"},
"version": "4.0.0",
"capabilities": {"streaming": True, "ghostDeploy": True, "rlm": True, "contextFolding": True, "quantumSandbox": True, "silentHandshake": True},
})
# ──────────────────────────────────────────────────────
# E2E HANDSHAKE — enhanced for Obsidian Forge
# ──────────────────────────────────────────────────────
async def handle_verify_handshake(req):
"""
POST /api/verify-handshake
Silent DNS + HTTP verification for litheat.app and deployed Spaces.
Body: {"url": "litheat.app" or "https://deployed-space.hf.space"}
Returns verification status, latency, and detailed checks.
"""
try:
data = await req.json()
target = data.get("url", "")
except Exception:
target = ""
if not target:
return web.json_response({"verified": False, "error": "No URL provided"}, status=400)
if not target.startswith("http"):
target = f"https://{target}"
results = {"url": target, "verified": False, "latency_ms": None, "checks": {}}
# Check 1: DNS resolution
import socket
t0 = time.time()
host = target.replace("https://", "").replace("http://", "").split("/")[0]
try:
socket.getaddrinfo(host, 443 if target.startswith("https") else 80)
results["checks"]["dns"] = True
except Exception as e:
results["checks"]["dns"] = False
results["error"] = f"DNS: {str(e)[:80]}"
return web.json_response(results, status=200)
# Check 2: HTTP reachability
import urllib.request
try:
req = urllib.request.Request(target, headers={"User-Agent": "Obsidian-Forge-Handshake/4.0"})
resp = urllib.request.urlopen(req, timeout=8)
latency = round((time.time() - t0) * 1000)
results["latency_ms"] = latency
results["checks"]["http"] = True
results["checks"]["status_code"] = resp.getcode()
# Accept 200-499 (some sites return 301/302/401 — still reachable)
if 200 <= resp.getcode() < 500:
results["verified"] = True
else:
results["error"] = f"HTTP {resp.getcode()}"
except Exception as e:
results["checks"]["http"] = False
results["error"] = f"HTTP: {str(e)[:120]}"
return web.json_response(results)
async def handle_static(req):
path = req.match_info.get("path", "index.html")
fp = STATIC / path
if not fp.exists():
return web.Response(text="Not found", status=404)
ct = {".html": "text/html", ".css": "text/css", ".js": "application/javascript"}
return web.Response(text=fp.read_text(), content_type=ct.get(fp.suffix, "text/plain"))
# ──────────────────────────────────────────────────────
# PHASE 2: Heavy imports, lazy-loaded
# ──────────────────────────────────────────────────────
def _get_engine():
global _engine, _ghost, _reflect, _engine_loaded
if _engine is None:
from core import state as st, sandbox_validate
from ghost_deploy import ghost as g
_engine = st.engine
_ghost = g
_reflect = st.reflect
_engine_loaded = True
print("◈ Obsidian Forge: RLM Engine LAZY-LOADED")
from core import state as st
return st, _ghost
def _sse(ev, d):
return f"event: {ev}\ndata: {json.dumps(d)}\n\n"
async def stream_gen(sid, prompt):
st, ghost = _get_engine()
session = st.sessions[sid]
session["status"] = "streaming"
st.sandbox[sid] = "building"
yield _sse("phase", {"phase": "pose"})
await asyncio.sleep(0.02)
pose_plan = st.engine.pose(prompt)
yield _sse("pose", {
"architect": {
"stack": pose_plan["architect"]["stack"],
"features": pose_plan["architect"]["features"],
"files": pose_plan["architect"]["files_needed"],
},
"painter": pose_plan["painter"],
"auditor": pose_plan["auditor"],
})
yield _sse("phase", {"phase": "generate"})
await asyncio.sleep(0.02)
t0 = time.time()
code, schema = st.engine.generate(prompt)
elapsed = (time.time() - t0) * 1000
chunk_size = max(1, len(code) // 30)
for i in range(0, len(code), chunk_size):
yield _sse("code", {"chunk": code[i:i+chunk_size],
"partial": code[:i+chunk_size],
"progress": min(100, int((i+chunk_size)/len(code)*100))})
await asyncio.sleep(0.01)
st.codes[sid] = code
st.sessions[sid]["schema"] = schema
yield _sse("generated", {"chars": len(code), "elapsed_ms": round(elapsed, 1)})
yield _sse("phase", {"phase": "audit"})
await asyncio.sleep(0.02)
findings = st.engine.auditor.audit(code, schema)
yield _sse("audit", {"findings": len(findings),
"errors": sum(1 for f in findings if f.severity=="ERROR"),
"warnings": sum(1 for f in findings if f.severity=="WARN")})
yield _sse("phase", {"phase": "heal"})
healed, found, fixed = st.reflect.heal(code)
for i in range(15):
from core import sandbox_validate
result = sandbox_validate(healed)
if result["success"]: break
healed, _, extra = st.reflect.heal(healed, result["errors"])
await asyncio.sleep(0.01)
yield _sse("phase", {"phase": "sandbox"})
from core import sandbox_validate
result = sandbox_validate(healed)
if result["success"]:
st.sandbox[sid] = "stable"; st.publish_ready[sid] = True
yield _sse("sandbox", {"status": "stable"})
else:
st.sandbox[sid] = "error"
yield _sse("sandbox", {"status": "error"})
session["status"] = "complete"
yield _sse("done", {"status": st.sandbox[sid]})
async def handle_stream(req):
_get_engine()
import uuid as _uuid
sid = str(_uuid.uuid4())[:8]
d = await req.json(); prompt = d.get("prompt", "")
st, _ = _get_engine()
st.sessions[sid] = {"id": sid, "prompt": prompt, "status": "init"}
st.sandbox[sid] = "building"; st.publish_ready[sid] = False
resp = web.StreamResponse(status=200, headers={
"Content-Type": "text/event-stream", "Cache-Control": "no-cache",
"Connection": "keep-alive", "X-Accel-Buffering": "no"})
await resp.prepare(req)
try:
async for ev in stream_gen(sid, prompt):
await resp.write(ev.encode())
await resp.write(b"event: close\ndata: {}\n\n")
except Exception as e:
await resp.write(f"event: error\ndata: {json.dumps({'error':str(e)})}\n\n".encode())
return resp
async def handle_publish(req):
st, ghost = _get_engine()
d = await req.json(); sid = d.get("session_id")
if not st.publish_ready.get(sid):
return web.json_response({"success":False,"error":"Sandbox not stable"}, status=400)
code = st.codes.get(sid,"")
if not code: return web.json_response({"success":False,"error":"No code"}, status=400)
description = d.get("description", st.sessions.get(sid,{}).get("prompt",""))
result = await ghost.publish(code, d.get("vibe_name",f"obsidian-{sid}"),
description[:200] if description else "", port=PORT)
if result.get("success"):
st.sessions[sid]["published"] = True
st.sessions[sid]["deploy_url"] = result.get("space_url")
return web.json_response(result)
async def handle_status(req):
st, _ = _get_engine()
sid = req.query.get("session_id", "")
if sid in st.sessions:
s = st.sessions[sid]
return web.json_response({"status":s["status"],"sandbox":st.sandbox.get(sid),
"ready":st.publish_ready.get(sid),"deploy_url":s.get("deploy_url")})
return web.json_response({"sessions":len(st.sessions),"engine":"obsidian-forge-rlm"})
async def handle_vibes(req):
_, ghost = _get_engine()
vibes = await ghost.list_vibes()
return web.json_response({"vibes":vibes,"count":len(vibes),"registry":CANONICAL_REPO})
async def handle_tunnel(req):
_, ghost = _get_engine()
try:
url = await ghost.tunnel.up(PORT, timeout=10.0)
return web.json_response({"success":bool(url),"url":url})
except Exception as e:
return web.json_response({"success":False,"error":str(e)}, status=500)
async def handle_schema(req):
st, _ = _get_engine()
sid = req.query.get("session_id", "")
return web.json_response(st.sessions.get(sid,{}).get("schema",{}))
async def handle_preview(req):
st, _ = _get_engine()
sid = req.query.get("session_id", "")
return web.Response(text=st.codes.get(sid,""), content_type="text/html")
# ──────────────────────────────────────────────────────
# APP BUILD
# ──────────────────────────────────────────────────────
def create_app():
app = web.Application()
# Phase 1 routes
app.router.add_get("/", handle_root_get)
app.router.add_get("/api/health", handle_health)
app.router.add_get("/.well-known/agent.json", handle_agent)
app.router.add_post("/api/verify-handshake", handle_verify_handshake)
# Phase 2 routes
app.router.add_post("/api/stream", handle_stream)
app.router.add_post("/api/publish", handle_publish)
app.router.add_get("/api/status", handle_status)
app.router.add_get("/api/preview", handle_preview)
app.router.add_get("/api/vibes", handle_vibes)
app.router.add_get("/api/tunnel", handle_tunnel)
app.router.add_get("/api/schema", handle_schema)
app.router.add_get("/{path:.*}", handle_static)
return app
def main():
import subprocess
try:
result = subprocess.run(
["fuser", "-k", f"{PORT}/tcp"],
capture_output=True, timeout=5
)
if result.returncode == 0:
print(f"◈ Killed ghost on port {PORT}")
except Exception:
pass
print(f"◈ Obsidian Forge v4.0 — Kinetic RLM Environment")
print(f" Binding : 0.0.0.0:{PORT}")
print(f" Health pulse : 200 OK in <1ms")
print(f" Loading HTML : absolute black · 1px glass")
print(f" Registry : {CANONICAL_REPO}")
print(f" RLM Core : Recursive Language Modeling")
print(f" Handshake : silent DNS + poll-until-live")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
async def warm_start():
await asyncio.sleep(0.5)
print("◈ Warm Start: loading RLM engine…")
try:
st, ghost = _get_engine()
print(f"◈ RLM Engine: loaded ({_engine_loaded})")
print(f" Poses: Architect, Painter, Auditor — synchronized")
except Exception as e:
print(f"◈ Engine warning (non-fatal): {e}")
loop.create_task(warm_start())
web.run_app(create_app(), host="0.0.0.0", port=PORT, handle_signals=True)
if __name__ == "__main__":
main()