"""
╔══════════════════════════════════════════════════════╗
║ OMNI-VIBE STUDIO — INSTANT WAKE Server ║
║ ║
║ PHASE 1 (<1ms): Bind 0.0.0.0:7860, serve 200 OK ║
║ health + loading HTML immediately. ║
║ PHASE 2 (bg): Lazy-import OmniVibeEngine, ║
║ PoseArchitect, PosePainter, ║
║ PoseAuditor, ReflectSelect. ║
║ ║
║ No import of core/ghost_deploy at module level. ║
║ First byte on wire within milliseconds of CMD. ║
╚══════════════════════════════════════════════════════╝
"""
import json, os, sys, asyncio, time
from pathlib import Path
from aiohttp import web
# ─── CONSTANTS (no heavy imports) ───
PORT = int(os.environ.get("WIZARD_PORT", 7860))
STATIC = Path(__file__).parent / "static"
CANONICAL_REPO = "dryymatt/Wizard-Vibe-Studio"
WIZARD_HAT_COLOR = "steady-gold"
# ─── Engine refs — lazy-loaded in Phase 2 ───
_engine = None
_ghost = None
_reflect = None
_engine_loaded = False
# ──────────────────────────────────────────────────────
# PHASE 1: Instant HTML — served before any imports
# ──────────────────────────────────────────────────────
LOADING_HTML = """
Omni-Vibe Studio
Omni-Vibe Studio
Specialized Swarm — Pose Architect · Painter · Auditor
Initializing LiteRT engine…
0.0.0.0:7860 · Steady Gold
"""
async def handle_root_get(req):
if _engine_loaded:
fp = STATIC / "index.html"
if fp.exists():
return web.Response(text=fp.read_text(), content_type="text/html")
return web.Response(text=LOADING_HTML, content_type="text/html")
async def handle_health(req):
return web.json_response({
"status": "alive",
"engine": "Omni-Vibe Studio — Specialized Swarm",
"engine_loaded": _engine_loaded,
"binding": "0.0.0.0:7860",
"port": PORT,
"hat": WIZARD_HAT_COLOR,
"hf_token": bool(os.environ.get("HF_TOKEN")),
})
async def handle_agent(req):
return web.json_response({
"name": "omni-vibe-studio",
"description": "Omni-Vibe Studio — Specialized AI Swarm. 0.0.0.0:7860. LiteRT engine.",
"url": "https://dryymatt-wizard-vibe-studio-v2.hf.space",
"provider": {"organization": "Omni-Vibe Studio — Litehat System",
"url": "https://huggingface.co/dryymatt"},
"version": "3.0.0",
"capabilities": {"streaming": True, "ghostDeploy": True, "liquidGlass": True},
})
async def handle_static(req):
path = req.match_info.get("path", "index.html")
fp = STATIC / path
if not fp.exists():
return web.Response(text="Not found", status=404)
ct = {".html": "text/html", ".css": "text/css", ".js": "application/javascript"}
return web.Response(text=fp.read_text(), content_type=ct.get(fp.suffix, "text/plain"))
def _get_engine():
global _engine, _ghost, _reflect, _engine_loaded
if _engine is None:
from core import state as st, sandbox_validate
from ghost_deploy import ghost as g
_engine = st.engine
_ghost = g
_reflect = st.reflect
_engine_loaded = True
print("⚡ Omni-Vibe Engine: LAZY-LOADED")
from core import state as st
return st, _ghost
def _sse(ev, d):
return f"event: {ev}\ndata: {json.dumps(d)}\n\n"
async def stream_gen(sid, prompt):
st, ghost = _get_engine()
session = st.sessions[sid]
session["status"] = "streaming"
st.sandbox[sid] = "building"
yield _sse("phase", {"phase": "pose", "hat": WIZARD_HAT_COLOR})
await asyncio.sleep(0.02)
pose_plan = st.engine.pose(prompt)
yield _sse("pose", {"architect": {"stack":pose_plan["architect"]["stack"],"features":pose_plan["architect"]["features"],"files":pose_plan["architect"]["files_needed"]},"painter":pose_plan["painter"],"auditor":pose_plan["auditor"]})
yield _sse("phase", {"phase": "generate"})
await asyncio.sleep(0.02)
t0 = time.time()
code, schema = st.engine.generate(prompt)
elapsed = (time.time()-t0)*1000
chunk_size = max(1, len(code)//30)
for i in range(0, len(code), chunk_size):
yield _sse("code", {"chunk": code[i:i+chunk_size], "partial": code[:i+chunk_size], "progress": min(100, int((i+chunk_size)/len(code)*100))})
await asyncio.sleep(0.01)
st.codes[sid] = code
st.sessions[sid]["schema"] = schema
yield _sse("generated", {"chars": len(code), "elapsed_ms": round(elapsed,1)})
yield _sse("phase", {"phase": "audit"})
await asyncio.sleep(0.02)
findings = st.engine.auditor.audit(code, schema)
yield _sse("audit", {"findings":len(findings),"errors":sum(1 for f in findings if f.severity=="ERROR"),"warnings":sum(1 for f in findings if f.severity=="WARN")})
yield _sse("phase", {"phase": "heal"})
code, auto_fixes = st.engine.auditor.heal_findings(code, findings)
st.codes[sid] = code
healed, found, fixed = st.reflect.heal(code)
for i in range(15):
from core import sandbox_validate
result = sandbox_validate(healed)
if result["success"]: break
healed, _, extra = st.reflect.heal(healed, result["errors"])
await asyncio.sleep(0.01)
yield _sse("phase", {"phase": "sandbox"})
from core import sandbox_validate
result = sandbox_validate(healed)
if result["success"]:
st.sandbox[sid] = "stable"; st.publish_ready[sid] = True
yield _sse("sandbox", {"status":"stable","hat":"steady-gold"})
else:
st.sandbox[sid] = "error"
yield _sse("sandbox", {"status":"error"})
session["status"] = "complete"
yield _sse("done", {"status":st.sandbox[sid]})
async def handle_stream(req):
_get_engine()
sid = __import__('uuid').uuid4().hex[:8]
d = await req.json(); prompt = d.get("prompt","")
st, _ = _get_engine()
st.sessions[sid] = {"id":sid,"prompt":prompt,"status":"init"}
st.sandbox[sid] = "building"; st.publish_ready[sid] = False
resp = web.StreamResponse(status=200, headers={"Content-Type":"text/event-stream","Cache-Control":"no-cache","Connection":"keep-alive","X-Accel-Buffering":"no"})
await resp.prepare(req)
try:
async for ev in stream_gen(sid, prompt): await resp.write(ev.encode())
await resp.write(b"event: close\ndata: {}\n\n")
except Exception as e:
await resp.write(f"event: error\ndata: {json.dumps({'error':str(e)})}\n\n".encode())
return resp
async def handle_publish(req):
st, ghost = _get_engine()
d = await req.json(); sid = d.get("session_id")
if not st.publish_ready.get(sid):
return web.json_response({"success":False,"error":"Sandbox not stable"}, status=400)
code = st.codes.get(sid,"")
if not code: return web.json_response({"success":False,"error":"No code"}, status=400)
description = d.get("description", st.sessions.get(sid,{}).get("prompt",""))
result = await ghost.publish(code, d.get("vibe_name",f"omni-vibe-{sid}"), description[:200] if description else "", port=PORT)
if result.get("success"):
st.sessions[sid]["published"] = True
st.sessions[sid]["deploy_url"] = result.get("space_url")
return web.json_response(result)
async def handle_status(req):
st, _ = _get_engine()
sid = req.query.get("session_id","")
if sid in st.sessions:
s = st.sessions[sid]
return web.json_response({"status":s["status"],"sandbox":st.sandbox.get(sid),"ready":st.publish_ready.get(sid),"deploy_url":s.get("deploy_url"),"hat":"steady-gold" if st.publish_ready.get(sid) else "cyan-blink"})
return web.json_response({"sessions":len(st.sessions),"engine":"omni-vibe","hat":WIZARD_HAT_COLOR})
async def handle_vibes(req):
_, ghost = _get_engine()
vibes = await ghost.list_vibes()
return web.json_response({"vibes":vibes,"count":len(vibes),"registry":CANONICAL_REPO})
async def handle_tunnel(req):
_, ghost = _get_engine()
try:
url = await ghost.tunnel.up(PORT, timeout=10.0)
return web.json_response({"success":bool(url),"url":url})
except Exception as e:
return web.json_response({"success":False,"error":str(e)}, status=500)
async def handle_schema(req):
st, _ = _get_engine()
sid = req.query.get("session_id","")
return web.json_response(st.sessions.get(sid,{}).get("schema",{}))
async def handle_preview(req):
st, _ = _get_engine()
sid = req.query.get("session_id","")
return web.Response(text=st.codes.get(sid,""), content_type="text/html")
def create_app():
app = web.Application()
# Phase 1: no engine needed
app.router.add_get("/", handle_root_get)
app.router.add_get("/api/health", handle_health)
app.router.add_get("/.well-known/agent.json", handle_agent)
# Phase 2: lazy-load engine
app.router.add_post("/api/stream", handle_stream)
app.router.add_post("/api/publish", handle_publish)
app.router.add_get("/api/status", handle_status)
app.router.add_get("/api/preview", handle_preview)
app.router.add_get("/api/vibes", handle_vibes)
app.router.add_get("/api/tunnel", handle_tunnel)
app.router.add_get("/api/schema", handle_schema)
app.router.add_get("/{path:.*}", handle_static)
return app
def main():
import subprocess
try:
subprocess.run(["fuser","-k",f"{PORT}/tcp"], capture_output=True, timeout=5)
except Exception: pass
print(f"🧙♂️ Omni-Vibe Studio — INSTANT WAKE")
print(f" Binding : 0.0.0.0:{PORT}")
print(f" Health pulse : 200 OK in <1ms")
print(f" Loading HTML : served before engine import")
print(f" Registry : {CANONICAL_REPO}")
print(f" Hat : {WIZARD_HAT_COLOR}")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
async def warm_start():
await asyncio.sleep(0.5)
print("⚡ Warm Start: loading OmniVibeEngine…")
try:
_get_engine()
print(f"⚡ Engine active — all Poses synchronized")
except Exception as e:
print(f"⚠ Engine load warning: {e}")
loop.create_task(warm_start())
web.run_app(create_app(), host="0.0.0.0", port=PORT, handle_signals=True)
if __name__ == "__main__":
main()