""" Wizard-Vibe Studio — Liquid Glass Server ========================================= SSE streaming server with AsyncIterator StreamResponse. Reflect-Select self-healing loop. Sandbox preview bridge. GitHub MCP + HF Spaces one-click deploy. A2A discoverable agent card at /.well-known/agent.json """ import asyncio import json import os import sys import time import uuid from pathlib import Path from datetime import datetime, timezone import aiohttp from aiohttp import web # Add parent for Litehat imports sys.path.insert(0, str(Path(__file__).parent.parent)) from orchestrator import LitehatOrchestrator from reflect_select import ReflectSelectLoop from github_mcp import GitHubMCP from deploy_pipeline import DeployPipeline # ─── Configuration ─────────────────────────────────────────────────────────── PORT = int(os.environ.get("WIZARD_PORT", 8765)) STATIC_DIR = Path(__file__).parent / "static" GENERATED_DIR = Path(__file__).parent / "generated" DEPLOY_DIR = Path(__file__).parent / "deploy" GENERATED_DIR.mkdir(parents=True, exist_ok=True) DEPLOY_DIR.mkdir(parents=True, exist_ok=True) # ─── Global State ───────────────────────────────────────────────────────────── class StudioState: def __init__(self): self.orchestrator = LitehatOrchestrator() self.reflect_loop = ReflectSelectLoop() self.github = GitHubMCP() self.deploy = DeployPipeline() self.active_sessions: dict = {} self.sandbox_status: dict = {} self.publish_ready: dict = {} self.generated_code: dict = {} state = StudioState() # ─── SSE Streaming Engine ───────────────────────────────────────────────────── async def stream_generator(session_id: str, prompt: str): """ AsyncIterator[StreamResponse] — Zero-Delay streaming. Each yield sends an SSE event. Frontend hot-reloads sandbox iframe per chunk. """ session = state.active_sessions[session_id] session["status"] = "streaming" state.sandbox_status[session_id] = "building" # Phase 1: Orchestrate yield f"event: phase\ndata: {json.dumps({'phase': 'orchestrate', 'msg': 'Posing optimal models...'})}\n\n" await asyncio.sleep(0.1) model_plan = state.orchestrator.pose(prompt) yield f"event: plan\ndata: {json.dumps({'plan': model_plan['summary'], 'models': model_plan['assignments']})}\n\n" await asyncio.sleep(0.05) # Phase 2: Generate code in chunks yield f"event: phase\ndata: {json.dumps({'phase': 'generate', 'msg': 'Generating code...'})}\n\n" code_chunks = state.orchestrator.generate_stream(prompt, model_plan) full_code = [] for chunk in code_chunks: full_code.append(chunk) partial = ''.join(full_code) state.generated_code[session_id] = partial yield f"event: code\ndata: {json.dumps({'chunk': chunk, 'partial': partial})}\n\n" await asyncio.sleep(0.05) final_code = ''.join(full_code) # Phase 3: Reflect-Select self-healing yield f"event: phase\ndata: {json.dumps({'phase': 'validate', 'msg': 'Running Reflect-Select self-healing...'})}\n\n" healed_code, errors_found, errors_fixed = state.reflect_loop.heal(final_code, prompt) state.generated_code[session_id] = healed_code yield f"event: heal\ndata: {json.dumps({'errors_found': errors_found, 'errors_fixed': errors_fixed})}\n\n" await asyncio.sleep(0.05) # Phase 4: Sandbox validation yield f"event: phase\ndata: {json.dumps({'phase': 'sandbox', 'msg': 'Validating in sandbox...'})}\n\n" sandbox_result = await validate_in_sandbox(session_id, healed_code) if sandbox_result["success"]: state.sandbox_status[session_id] = "stable" state.publish_ready[session_id] = True yield f"event: sandbox\ndata: {json.dumps({'status': 'stable', 'msg': 'Sandbox stable — Publish unlocked'})}\n\n" else: state.sandbox_status[session_id] = "error" state.publish_ready[session_id] = False yield f"event: sandbox\ndata: {json.dumps({'status': 'error', 'errors': sandbox_result['errors']})}\n\n" # Reflect-Select loop: autonomously fix sandbox errors for attempt in range(3): yield f"event: heal_attempt\ndata: {json.dumps({'attempt': attempt + 1})}\n\n" healed_code = state.reflect_loop.heal(healed_code, prompt, sandbox_errors=sandbox_result['errors'])[0] state.generated_code[session_id] = healed_code sandbox_result = await validate_in_sandbox(session_id, healed_code) if sandbox_result["success"]: state.sandbox_status[session_id] = "stable" state.publish_ready[session_id] = True yield f"event: sandbox\ndata: {json.dumps({'status': 'stable', 'msg': f'Sandbox stable after {attempt + 1} heal attempt(s) — Publish unlocked'})}\n\n" break # Save generated code code_path = GENERATED_DIR / session_id code_path.mkdir(exist_ok=True) (code_path / "index.html").write_text(healed_code) session["status"] = "complete" yield f"event: done\ndata: {json.dumps({'status': state.sandbox_status[session_id]})}\n\n" async def validate_in_sandbox(session_id: str, code: str) -> dict: """Basic sandbox validation — HTML tag checking, JS error detection, Python syntax.""" errors = [] if code.strip().startswith("") else: stack.append(t) if stack: errors.append(f"Unclosed tags: {stack}") if "console.loge(" in code: errors.append("Typo: console.loge → console.log") elif code.strip().startswith("import") or "def " in code: try: compile(code, '', 'exec') except SyntaxError as e: errors.append(f"Python syntax error: {e}") return {"success": len(errors) == 0, "errors": errors} # ─── HTTP Handlers ─────────────────────────────────────────────────────────── async def handle_stream(request: web.Request) -> web.StreamResponse: session_id = str(uuid.uuid4())[:8] data = await request.json() prompt = data.get("prompt", "") state.active_sessions[session_id] = { "id": session_id, "prompt": prompt, "status": "init", "created_at": datetime.now(timezone.utc).isoformat(), } state.sandbox_status[session_id] = "building" state.publish_ready[session_id] = False response = web.StreamResponse( status=200, reason="OK", headers={ "Content-Type": "text/event-stream", "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", }, ) await response.prepare(request) try: async for event in stream_generator(session_id, prompt): await response.write(event.encode("utf-8")) await response.write(b"event: close\ndata: {}\n\n") except Exception as e: error_event = f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n" await response.write(error_event.encode("utf-8")) state.sandbox_status[session_id] = "error" return response async def handle_publish(request: web.Request) -> web.Response: data = await request.json() session_id = data.get("session_id") repo_name = data.get("repo_name", f"wizard-vibe-{session_id}") description = data.get("description", "") if not state.publish_ready.get(session_id): return web.json_response({"error": "Sandbox not stable"}, status=400) code = state.generated_code.get(session_id, "") if not code: return web.json_response({"error": "No generated code"}, status=400) github_result = await state.github.create_and_push(repo_name=repo_name, files={"index.html": code}, description=description) spaces_result = await state.deploy.deploy_to_spaces(repo_name=repo_name, code=code, session_id=session_id) agent_card = state.deploy.generate_agent_card( name=repo_name, description=description or f"Wizard-Vibe: {session_id}", url=spaces_result.get("url"), capabilities=["code-generation", "vibe-deploy"], ) await state.github.push_file(repo_name=repo_name, path=".well-known/agent.json", content=json.dumps(agent_card, indent=2)) state.active_sessions[session_id]["published"] = True return web.json_response({ "success": True, "github": github_result, "spaces": spaces_result, "agent_card": agent_card, "agent_url": f"{spaces_result.get('url', '')}/.well-known/agent.json", }) async def handle_status(request: web.Request) -> web.Response: sid = request.query.get("session_id", "") if sid and sid in state.active_sessions: return web.json_response({ "status": state.active_sessions[sid]["status"], "sandbox": state.sandbox_status.get(sid), "publish_ready": state.publish_ready.get(sid), }) return web.json_response({"sessions": len(state.active_sessions)}) async def handle_agent_card(request: web.Request) -> web.Response: card = state.deploy.generate_agent_card( name="wizard-vibe-studio", description="Liquid Glass Wizard-Vibe Studio — Zero-Delay code generation with SSE streaming, sandbox preview, Reflect-Select self-healing, and one-click A2A publishing.", url="https://dryymatt-wizard-vibe-studio.hf.space", capabilities=["code-generation", "sse-streaming", "sandbox-preview", "github-publish", "hf-spaces-deploy"], ) return web.json_response(card) async def handle_preview(request: web.Request) -> web.Response: sid = request.query.get("session_id", "") code = state.generated_code.get(sid, "") if code: return web.Response(text=code, content_type="text/html") return web.Response(text="", content_type="text/html") async def handle_health(request: web.Request) -> web.Response: return web.json_response({"status": "alive", "studio": "Wizard-Vibe"}) # ─── Static File Handler ───────────────────────────────────────────────────── async def handle_static(request: web.Request) -> web.Response: path = request.match_info.get("path", "index.html") file_path = STATIC_DIR / path if not file_path.exists() or not file_path.is_file(): return web.Response(text="Not found", status=404) ct = {".html": "text/html", ".css": "text/css", ".js": "application/javascript"}.get(file_path.suffix, "application/octet-stream") return web.Response(text=file_path.read_text(), content_type=ct) # ─── App Factory ───────────────────────────────────────────────────────────── def create_app() -> web.Application: app = web.Application() app.router.add_post("/api/stream", handle_stream) app.router.add_post("/api/publish", handle_publish) app.router.add_get("/api/status", handle_status) app.router.add_get("/api/preview", handle_preview) app.router.add_get("/api/health", handle_health) app.router.add_get("/.well-known/agent.json", handle_agent_card) app.router.add_get("/", handle_static) app.router.add_get("/{path:.*}", handle_static) return app def main(): print("🧙‍♂️ Wizard-Vibe Studio — Liquid Glass Server starting on port", PORT) app = create_app() web.run_app(app, host="0.0.0.0", port=PORT) if __name__ == "__main__": main()