"""
OMNI-VIBE STUDIO — INSTANT WAKE Server

PHASE 1: Bind 0.0.0.0:<PORT> and serve the health endpoint and the loading
         HTML immediately, before any engine code is imported.
PHASE 2: Lazy-import OmniVibeEngine, PoseArchitect, PosePainter,
         PoseAuditor, ReflectSelect (via ``core`` / ``ghost_deploy``) from a
         warm-start task, or on the first request that needs the engine.

No import of core/ghost_deploy happens at module level.
"""

import asyncio
import json
import os
import subprocess
import sys
import time
import uuid
from pathlib import Path

from aiohttp import web

# ─── CONSTANTS (no heavy imports) ───
PORT = int(os.environ.get("WIZARD_PORT", 7860))
STATIC = Path(__file__).parent / "static"
CANONICAL_REPO = "dryymatt/Wizard-Vibe-Studio"
WIZARD_HAT_COLOR = "steady-gold"

# Upper bound on reflect/heal rounds attempted before the sandbox gives up.
_MAX_HEAL_ROUNDS = 15

# ─── Engine refs — lazy-loaded in Phase 2 ───
_engine = None
_ghost = None
_reflect = None
_engine_loaded = False

# ──────────────────────────────────────────────────────
# PHASE 1: Instant HTML — served before any imports
# ──────────────────────────────────────────────────────
# NOTE(review): this page is served as text/html but contains no tags; the
# markup looks like it was stripped at some point. Text is kept verbatim.
LOADING_HTML = """ Omni-Vibe Studio

Omni-Vibe Studio

Specialized Swarm — Pose Architect · Painter · Auditor

Initializing LiteRT engine…

0.0.0.0:7860 · Steady Gold

"""


def _not_found():
    """Return the plain-text 404 response used by the static handler."""
    return web.Response(text="Not found", status=404)


def _bad_json():
    """Return the 400 response used when a request body is not a JSON object."""
    return web.json_response(
        {"success": False, "error": "Invalid JSON body"}, status=400
    )


async def _read_json_object(req):
    """Parse the request body as JSON and return it only if it is a dict.

    Returns None when the body is not valid JSON or is valid JSON of another
    type, so callers can answer with a 400 instead of crashing on ``.get``.
    """
    try:
        payload = await req.json()
    except ValueError:  # json.JSONDecodeError and UnicodeDecodeError
        return None
    return payload if isinstance(payload, dict) else None


async def handle_root_get(req):
    """Serve static/index.html once the engine is loaded, else the loading page."""
    if _engine_loaded:
        index = STATIC / "index.html"
        if index.is_file():
            return web.Response(
                text=index.read_text(encoding="utf-8"), content_type="text/html"
            )
    return web.Response(text=LOADING_HTML, content_type="text/html")


async def handle_health(req):
    """Report liveness; never touches the engine, so it works during Phase 1."""
    return web.json_response({
        "status": "alive",
        "engine": "Omni-Vibe Studio — Specialized Swarm",
        "engine_loaded": _engine_loaded,
        # Derived from PORT so the report stays correct when WIZARD_PORT is set.
        "binding": f"0.0.0.0:{PORT}",
        "port": PORT,
        "hat": WIZARD_HAT_COLOR,
        "hf_token": bool(os.environ.get("HF_TOKEN")),
    })


async def handle_agent(req):
    """Serve the static agent descriptor for /.well-known/agent.json."""
    return web.json_response({
        "name": "omni-vibe-studio",
        "description": "Omni-Vibe Studio — Specialized AI Swarm. 0.0.0.0:7860. LiteRT engine.",
        "url": "https://dryymatt-wizard-vibe-studio-v2.hf.space",
        "provider": {
            "organization": "Omni-Vibe Studio — Litehat System",
            "url": "https://huggingface.co/dryymatt",
        },
        "version": "3.0.0",
        "capabilities": {"streaming": True, "ghostDeploy": True, "liquidGlass": True},
    })


async def handle_static(req):
    """Serve a text file from STATIC for the catch-all ``/{path}`` route.

    The requested path comes straight from the URL, so it is resolved and
    required to stay inside STATIC; ``..`` segments and absolute paths
    (e.g. a request for ``//etc/passwd``) get a 404. Directories and missing
    files also get a 404.
    """
    requested = req.match_info.get("path", "index.html")
    root = STATIC.resolve()
    try:
        target = (root / requested).resolve()
        target.relative_to(root)  # raises ValueError when outside STATIC
    except (OSError, RuntimeError, ValueError):
        return _not_found()
    if not target.is_file():
        return _not_found()
    content_types = {
        ".html": "text/html",
        ".css": "text/css",
        ".js": "application/javascript",
    }
    return web.Response(
        text=target.read_text(encoding="utf-8"),
        content_type=content_types.get(target.suffix, "text/plain"),
    )


def _get_engine():
    """Import the engine modules on first use and return ``(state, ghost)``.

    The first successful call fills the module-level engine references and
    flips ``_engine_loaded``; later calls only re-bind the already-imported
    ``core.state`` module. If an import raises, nothing is marked loaded and
    the next call tries again.
    """
    global _engine, _ghost, _reflect, _engine_loaded
    from core import state as st
    if not _engine_loaded:
        from ghost_deploy import ghost as g
        _engine = st.engine
        _ghost = g
        _reflect = st.reflect
        _engine_loaded = True
        print("⚡ Omni-Vibe Engine: LAZY-LOADED")
    return st, _ghost


def _sse(ev, d):
    """Format one Server-Sent Events frame with event name ``ev`` and JSON data ``d``."""
    return f"event: {ev}\ndata: {json.dumps(d)}\n\n"


async def stream_gen(sid, prompt):
    """Run pose → generate → audit → heal → sandbox for a session, yielding SSE frames.

    ``sid`` must already exist in ``state.sessions``. Progress is recorded in
    ``state.sandbox``, ``state.codes`` and ``state.publish_ready`` as the
    pipeline advances.
    """
    st, _ = _get_engine()
    from core import sandbox_validate

    session = st.sessions[sid]
    session["status"] = "streaming"
    st.sandbox[sid] = "building"

    # ── Pose ──
    yield _sse("phase", {"phase": "pose", "hat": WIZARD_HAT_COLOR})
    await asyncio.sleep(0.02)
    pose_plan = st.engine.pose(prompt)
    architect = pose_plan["architect"]
    yield _sse("pose", {
        "architect": {
            "stack": architect["stack"],
            "features": architect["features"],
            "files": architect["files_needed"],
        },
        "painter": pose_plan["painter"],
        "auditor": pose_plan["auditor"],
    })

    # ── Generate ──
    yield _sse("phase", {"phase": "generate"})
    await asyncio.sleep(0.02)
    started = time.perf_counter()
    code, schema = st.engine.generate(prompt)
    elapsed = (time.perf_counter() - started) * 1000
    total = len(code)
    chunk_size = max(1, total // 30)
    # An empty result yields no "code" frames, so the division is never hit.
    for start in range(0, total, chunk_size):
        end = start + chunk_size
        yield _sse("code", {
            "chunk": code[start:end],
            "partial": code[:end],
            "progress": min(100, int(end / total * 100)),
        })
        await asyncio.sleep(0.01)
    st.codes[sid] = code
    st.sessions[sid]["schema"] = schema
    yield _sse("generated", {"chars": total, "elapsed_ms": round(elapsed, 1)})

    # ── Audit ──
    yield _sse("phase", {"phase": "audit"})
    await asyncio.sleep(0.02)
    findings = st.engine.auditor.audit(code, schema)
    yield _sse("audit", {
        "findings": len(findings),
        "errors": sum(1 for f in findings if f.severity == "ERROR"),
        "warnings": sum(1 for f in findings if f.severity == "WARN"),
    })

    # ── Heal ──
    yield _sse("phase", {"phase": "heal"})
    code, _auto_fixes = st.engine.auditor.heal_findings(code, findings)
    st.codes[sid] = code
    healed, _found, _fixed = st.reflect.heal(code)
    # Validate after every heal so the final verdict always refers to the
    # latest `healed` text and a passing result is not validated twice.
    result = sandbox_validate(healed)
    for _attempt in range(_MAX_HEAL_ROUNDS):
        if result["success"]:
            break
        healed, _found, _fixed = st.reflect.heal(healed, result["errors"])
        await asyncio.sleep(0.01)
        result = sandbox_validate(healed)

    # ── Sandbox verdict ──
    yield _sse("phase", {"phase": "sandbox"})
    if result["success"]:
        # Store the text that actually passed validation; publish and preview
        # read from st.codes, so they must not serve the pre-heal version.
        st.codes[sid] = healed
        st.sandbox[sid] = "stable"
        st.publish_ready[sid] = True
        yield _sse("sandbox", {"status": "stable", "hat": "steady-gold"})
    else:
        st.sandbox[sid] = "error"
        yield _sse("sandbox", {"status": "error"})

    session["status"] = "complete"
    yield _sse("done", {"status": st.sandbox[sid]})


async def handle_stream(req):
    """POST /api/stream — create a session and stream the pipeline as SSE.

    Expects a JSON object with an optional ``prompt``. A body that is not a
    JSON object gets a 400. Failures inside the pipeline are reported to the
    client as an ``error`` event and the session is marked as failed.
    """
    st, _ = _get_engine()
    payload = await _read_json_object(req)
    if payload is None:
        return _bad_json()
    prompt = payload.get("prompt", "")

    sid = uuid.uuid4().hex[:8]
    st.sessions[sid] = {"id": sid, "prompt": prompt, "status": "init"}
    st.sandbox[sid] = "building"
    st.publish_ready[sid] = False

    resp = web.StreamResponse(status=200, headers={
        "Content-Type": "text/event-stream",
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    })
    await resp.prepare(req)
    try:
        async for frame in stream_gen(sid, prompt):
            await resp.write(frame.encode())
        await resp.write(b"event: close\ndata: {}\n\n")
    except Exception as e:
        # Do not leave the session looking like it is still building.
        st.sessions[sid]["status"] = "error"
        st.sandbox[sid] = "error"
        try:
            await resp.write(
                f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n".encode()
            )
        except ConnectionError:
            pass  # client already disconnected; nothing left to tell it
    return resp


async def handle_publish(req):
    """POST /api/publish — publish a session's code once its sandbox is stable.

    Expects a JSON object with ``session_id`` and optional ``vibe_name`` and
    ``description``. Returns 400 when the body is not a JSON object, the
    session is not publish-ready, or no code is stored for it.
    """
    st, ghost = _get_engine()
    payload = await _read_json_object(req)
    if payload is None:
        return _bad_json()

    sid = payload.get("session_id")
    # A non-string id can never be a session key (and may be unhashable).
    if not isinstance(sid, str) or not st.publish_ready.get(sid):
        return web.json_response(
            {"success": False, "error": "Sandbox not stable"}, status=400
        )
    code = st.codes.get(sid, "")
    if not code:
        return web.json_response({"success": False, "error": "No code"}, status=400)

    session = st.sessions.get(sid, {})
    description = payload.get("description", session.get("prompt", ""))
    result = await ghost.publish(
        code,
        payload.get("vibe_name", f"omni-vibe-{sid}"),
        description[:200] if description else "",
        port=PORT,
    )
    if result.get("success") and sid in st.sessions:
        st.sessions[sid]["published"] = True
        st.sessions[sid]["deploy_url"] = result.get("space_url")
    return web.json_response(result)


async def handle_status(req):
    """GET /api/status — report one session (``?session_id=``) or a summary."""
    st, _ = _get_engine()
    sid = req.query.get("session_id", "")
    if sid in st.sessions:
        session = st.sessions[sid]
        ready = st.publish_ready.get(sid)
        return web.json_response({
            "status": session["status"],
            "sandbox": st.sandbox.get(sid),
            "ready": ready,
            "deploy_url": session.get("deploy_url"),
            "hat": "steady-gold" if ready else "cyan-blink",
        })
    return web.json_response({
        "sessions": len(st.sessions),
        "engine": "omni-vibe",
        "hat": WIZARD_HAT_COLOR,
    })


async def handle_vibes(req):
    """GET /api/vibes — list vibes as returned by ``ghost.list_vibes()``."""
    _, ghost = _get_engine()
    vibes = await ghost.list_vibes()
    return web.json_response(
        {"vibes": vibes, "count": len(vibes), "registry": CANONICAL_REPO}
    )


async def handle_tunnel(req):
    """GET /api/tunnel — bring up a tunnel to PORT; 500 with the error on failure."""
    _, ghost = _get_engine()
    try:
        url = await ghost.tunnel.up(PORT, timeout=10.0)
        return web.json_response({"success": bool(url), "url": url})
    except Exception as e:
        return web.json_response({"success": False, "error": str(e)}, status=500)


async def handle_schema(req):
    """GET /api/schema — return the stored schema for a session, or ``{}``."""
    st, _ = _get_engine()
    sid = req.query.get("session_id", "")
    return web.json_response(st.sessions.get(sid, {}).get("schema", {}))


async def handle_preview(req):
    """GET /api/preview — return the stored code for a session as text/html."""
    st, _ = _get_engine()
    sid = req.query.get("session_id", "")
    return web.Response(text=st.codes.get(sid, ""), content_type="text/html")


def create_app():
    """Build the aiohttp application and register every route."""
    app = web.Application()
    # Phase 1: no engine needed
    app.router.add_get("/", handle_root_get)
    app.router.add_get("/api/health", handle_health)
    app.router.add_get("/.well-known/agent.json", handle_agent)
    # Phase 2: lazy-load engine
    app.router.add_post("/api/stream", handle_stream)
    app.router.add_post("/api/publish", handle_publish)
    app.router.add_get("/api/status", handle_status)
    app.router.add_get("/api/preview", handle_preview)
    app.router.add_get("/api/vibes", handle_vibes)
    app.router.add_get("/api/tunnel", handle_tunnel)
    app.router.add_get("/api/schema", handle_schema)
    # Catch-all must stay last so it does not shadow the routes above.
    app.router.add_get("/{path:.*}", handle_static)
    return app


def main():
    """Free the port (best effort), print the banner and run the server."""
    try:
        subprocess.run(
            ["fuser", "-k", f"{PORT}/tcp"], capture_output=True, timeout=5
        )
    except (OSError, subprocess.SubprocessError):
        pass  # best effort: fuser may be missing or may time out

    print("🧙‍♂️ Omni-Vibe Studio — INSTANT WAKE")
    print(f" Binding : 0.0.0.0:{PORT}")
    print(" Health pulse : 200 OK in <1ms")
    print(" Loading HTML : served before engine import")
    print(f" Registry : {CANONICAL_REPO}")
    print(f" Hat : {WIZARD_HAT_COLOR}")

    async def warm_start():
        await asyncio.sleep(0.5)
        print("⚡ Warm Start: loading OmniVibeEngine…")
        try:
            # Runs synchronously on the event loop thread, so requests wait
            # while the engine modules import.
            _get_engine()
            print("⚡ Engine active — all Poses synchronized")
        except Exception as e:
            print(f"⚠ Engine load warning: {e}")

    warm_tasks = []  # keeps a strong reference so the task is not collected

    async def schedule_warm_start(app):
        warm_tasks.append(asyncio.create_task(warm_start()))

    # Schedule from on_startup so the task is created on whichever loop
    # run_app actually drives; a task created on a separately constructed
    # loop is never executed when run_app uses its own loop.
    app = create_app()
    app.on_startup.append(schedule_warm_start)
    web.run_app(app, host="0.0.0.0", port=PORT, handle_signals=True)


if __name__ == "__main__":
    main()