Spaces:
Runtime error
Runtime error
| """ | |
| Wizard-Vibe Studio — Liquid Glass Server | |
| ========================================= | |
| SSE streaming server with AsyncIterator StreamResponse. | |
| Reflect-Select self-healing loop. Sandbox preview bridge. | |
| GitHub MCP + HF Spaces one-click deploy. | |
| A2A discoverable agent card at /.well-known/agent.json | |
| """ | |
| import asyncio | |
| import json | |
| import os | |
| import sys | |
| import time | |
| import uuid | |
| from pathlib import Path | |
| from datetime import datetime, timezone | |
| import aiohttp | |
| from aiohttp import web | |
| # Add parent for Litehat imports | |
| sys.path.insert(0, str(Path(__file__).parent.parent)) | |
| from orchestrator import LitehatOrchestrator | |
| from reflect_select import ReflectSelectLoop | |
| from github_mcp import GitHubMCP | |
| from deploy_pipeline import DeployPipeline | |
| # ─── Configuration ─────────────────────────────────────────────────────────── | |
# Port the server listens on; the WIZARD_PORT environment variable overrides
# the 8765 default.
PORT = int(os.environ.get("WIZARD_PORT", 8765))

# All working directories are siblings of this source file.
_BASE_DIR = Path(__file__).parent
STATIC_DIR = _BASE_DIR / "static"
GENERATED_DIR = _BASE_DIR / "generated"
DEPLOY_DIR = _BASE_DIR / "deploy"

# Only the two output directories are created at import time; STATIC_DIR is
# left as-is.
for _output_dir in (GENERATED_DIR, DEPLOY_DIR):
    _output_dir.mkdir(parents=True, exist_ok=True)
| # ─── Global State ──────────────────────────────────────────────────────────── | |
class StudioState:
    """Process-wide container for the studio's services and per-session data.

    The four service objects are constructed once, with no arguments, in the
    order listed. The four dictionaries start empty; the handlers in this
    module key every one of them by session id.
    """

    def __init__(self) -> None:
        # Service collaborators.
        self.orchestrator = LitehatOrchestrator()
        self.reflect_loop = ReflectSelectLoop()
        self.github = GitHubMCP()
        self.deploy = DeployPipeline()

        # session id -> session record (id, prompt, status, created_at, ...)
        self.active_sessions: dict[str, dict] = {}
        # session id -> "building" | "stable" | "error"
        self.sandbox_status: dict[str, str] = {}
        # session id -> whether publishing is currently allowed
        self.publish_ready: dict[str, bool] = {}
        # session id -> latest generated (or healed) source text
        self.generated_code: dict[str, str] = {}


# Single shared instance used by every handler in this module.
state = StudioState()
| # ─── SSE Streaming Engine ──────────────────────────────────────────────────── | |
def _sse_event(event: str, payload: dict) -> str:
    """Format one Server-Sent Event frame: event line, JSON data line, blank line."""
    return f"event: {event}\ndata: {json.dumps(payload)}\n\n"


async def stream_generator(session_id: str, prompt: str):
    """Yield the SSE frames for one generation run, as ready-to-send strings.

    Phases, in order:
      1. ``orchestrate`` - ask the orchestrator for a model plan (``plan`` event).
      2. ``generate``    - stream code chunks (``code`` events carrying the
         chunk and the accumulated text).
      3. ``validate``    - run the reflect/select healer once (``heal`` event).
      4. ``sandbox``     - validate the healed code; on failure retry heal +
         validate up to three times (``heal_attempt`` / ``sandbox`` events).
    Finally the code is written to ``GENERATED_DIR/<session_id>/index.html``
    and a ``done`` event reports the final sandbox status.

    Args:
        session_id: Key of an existing entry in ``state.active_sessions``.
        prompt: User prompt forwarded to the orchestrator and the healer.

    Raises:
        KeyError: If ``session_id`` is not registered in ``state.active_sessions``.

    Side effects: updates ``state.sandbox_status``, ``state.publish_ready``,
    ``state.generated_code`` and the session's ``status`` field.
    """
    max_heal_attempts = 3

    session = state.active_sessions[session_id]
    session["status"] = "streaming"
    state.sandbox_status[session_id] = "building"

    # Phase 1: Orchestrate
    yield _sse_event("phase", {"phase": "orchestrate", "msg": "Posing optimal models..."})
    await asyncio.sleep(0.1)  # suspends here so the caller can flush the frame
    model_plan = state.orchestrator.pose(prompt)
    yield _sse_event("plan", {"plan": model_plan["summary"], "models": model_plan["assignments"]})
    await asyncio.sleep(0.05)

    # Phase 2: Generate code in chunks
    yield _sse_event("phase", {"phase": "generate", "msg": "Generating code..."})
    chunks = []
    for chunk in state.orchestrator.generate_stream(prompt, model_plan):
        chunks.append(chunk)
        partial = "".join(chunks)
        # Stored on every chunk so the preview endpoint can serve work in progress.
        state.generated_code[session_id] = partial
        yield _sse_event("code", {"chunk": chunk, "partial": partial})
        await asyncio.sleep(0.05)
    final_code = "".join(chunks)

    # Phase 3: Reflect-Select self-healing
    yield _sse_event("phase", {"phase": "validate", "msg": "Running Reflect-Select self-healing..."})
    healed_code, errors_found, errors_fixed = state.reflect_loop.heal(final_code, prompt)
    state.generated_code[session_id] = healed_code
    yield _sse_event("heal", {"errors_found": errors_found, "errors_fixed": errors_fixed})
    await asyncio.sleep(0.05)

    # Phase 4: Sandbox validation
    yield _sse_event("phase", {"phase": "sandbox", "msg": "Validating in sandbox..."})
    sandbox_result = await validate_in_sandbox(session_id, healed_code)
    if sandbox_result["success"]:
        state.sandbox_status[session_id] = "stable"
        state.publish_ready[session_id] = True
        yield _sse_event("sandbox", {"status": "stable", "msg": "Sandbox stable β Publish unlocked"})
    else:
        state.sandbox_status[session_id] = "error"
        state.publish_ready[session_id] = False
        yield _sse_event("sandbox", {"status": "error", "errors": sandbox_result["errors"]})

        # Feed the sandbox errors back into the healer; stop at the first
        # attempt that validates. If every attempt fails the status stays
        # "error" and publishing stays locked.
        for attempt_no in range(1, max_heal_attempts + 1):
            yield _sse_event("heal_attempt", {"attempt": attempt_no})
            healed_code = state.reflect_loop.heal(
                healed_code, prompt, sandbox_errors=sandbox_result["errors"]
            )[0]
            state.generated_code[session_id] = healed_code
            sandbox_result = await validate_in_sandbox(session_id, healed_code)
            if sandbox_result["success"]:
                state.sandbox_status[session_id] = "stable"
                state.publish_ready[session_id] = True
                yield _sse_event("sandbox", {
                    "status": "stable",
                    "msg": f"Sandbox stable after {attempt_no} heal attempt(s) β Publish unlocked",
                })
                break

    # Save generated code. The encoding is explicit so non-ASCII output does
    # not depend on the host's locale.
    code_path = GENERATED_DIR / session_id
    code_path.mkdir(exist_ok=True)
    (code_path / "index.html").write_text(healed_code, encoding="utf-8")

    session["status"] = "complete"
    yield _sse_event("done", {"status": state.sandbox_status[session_id]})
def _find_html_tag_errors(code: str) -> list:
    """Return tag-balance error messages for an HTML document.

    Uses the standard-library HTML parser, so text inside ``<script>`` and
    ``<style>`` bodies (e.g. ``i<n``) and inside comments is not mistaken for
    markup, and self-closing elements such as ``<circle ... />`` do not count
    as left open.
    """
    from html.parser import HTMLParser

    void_elements = {
        'br', 'hr', 'img', 'input', 'meta', 'link', 'area', 'base',
        'col', 'embed', 'source', 'track', 'wbr',
    }

    class _TagBalanceChecker(HTMLParser):
        """Tracks a stack of open elements; tag names arrive lower-cased."""

        def __init__(self):
            super().__init__()
            self.open_tags = []
            self.errors = []

        def handle_starttag(self, tag, attrs):
            if tag not in void_elements:
                self.open_tags.append(tag)

        def handle_startendtag(self, tag, attrs):
            # ``<tag ... />`` opens and closes itself: nothing to balance.
            pass

        def handle_endtag(self, tag):
            if tag in void_elements:
                return
            if self.open_tags and self.open_tags[-1] == tag:
                self.open_tags.pop()
            else:
                # The stack is deliberately left untouched on a mismatch.
                self.errors.append(f"Mismatched closing tag: </{tag}>")

    checker = _TagBalanceChecker()
    try:
        checker.feed(code)
        checker.close()
    except AssertionError as exc:
        # Some CPython versions raise AssertionError on malformed
        # ``<![...`` marked sections; report it instead of crashing.
        checker.errors.append(f"HTML parse error: {exc}")

    if checker.open_tags:
        checker.errors.append(f"Unclosed tags: {checker.open_tags}")
    return checker.errors


async def validate_in_sandbox(session_id: str, code: str) -> dict:
    """Run lightweight static checks on generated code.

    HTML (text starting with ``<!DOCTYPE`` or containing ``<html``) gets a
    tag-balance check plus a check for the ``console.loge(`` typo. Otherwise,
    text that starts with ``import`` or contains ``def `` is compiled as
    Python to detect syntax errors. Anything else passes with no checks.

    Args:
        session_id: Accepted for interface compatibility; not used by the checks.
        code: Source text to validate.

    Returns:
        ``{"success": bool, "errors": list_of_messages}`` where ``success``
        is true exactly when no errors were found.
    """
    errors = []
    stripped = code.strip()

    if stripped.startswith("<!DOCTYPE") or "<html" in code.lower():
        errors.extend(_find_html_tag_errors(code))
        if "console.loge(" in code:
            errors.append("Typo: console.loge β console.log")
    elif stripped.startswith("import") or "def " in code:
        try:
            # compile() only parses; the code is never executed.
            compile(code, '<sandbox>', 'exec')
        except (SyntaxError, ValueError) as e:
            # ValueError: null bytes in the source on Python < 3.12.
            errors.append(f"Python syntax error: {e}")

    return {"success": not errors, "errors": errors}
| # ─── HTTP Handlers ─────────────────────────────────────────────────────────── | |
async def handle_stream(request: web.Request) -> web.StreamResponse:
    """POST /api/stream - start a generation session and stream it as SSE.

    Expects a JSON object body with an optional ``"prompt"`` string. A new
    8-character session id is registered in the shared state and returned to
    the client in the ``X-Session-Id`` response header, so it can be used with
    the status, preview and publish endpoints.

    Returns:
        The prepared ``text/event-stream`` response, or a 400 JSON response
        when the body is not a valid JSON object.
    """
    # Validate the body before any session state is created.
    try:
        data = await request.json()
    except ValueError:  # invalid JSON or undecodable body
        return web.json_response({"error": "Request body must be valid JSON"}, status=400)
    if not isinstance(data, dict):
        return web.json_response({"error": "Request body must be a JSON object"}, status=400)
    prompt = data.get("prompt", "")

    session_id = str(uuid.uuid4())[:8]
    state.active_sessions[session_id] = {
        "id": session_id, "prompt": prompt, "status": "init",
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    state.sandbox_status[session_id] = "building"
    state.publish_ready[session_id] = False

    response = web.StreamResponse(
        status=200, reason="OK",
        headers={
            "Content-Type": "text/event-stream",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # ask reverse proxies not to buffer the stream
            "X-Session-Id": session_id,
        },
    )
    await response.prepare(request)

    try:
        async for event in stream_generator(session_id, prompt):
            await response.write(event.encode("utf-8"))
        await response.write(b"event: close\ndata: {}\n\n")
    except ConnectionResetError:
        # The client went away mid-stream; there is nobody left to notify.
        state.active_sessions[session_id]["status"] = "error"
        state.sandbox_status[session_id] = "error"
    except Exception as e:
        # Top-level boundary: record the failure, then try to tell the client.
        state.active_sessions[session_id]["status"] = "error"
        state.sandbox_status[session_id] = "error"
        error_event = f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n"
        try:
            await response.write(error_event.encode("utf-8"))
        except ConnectionResetError:
            pass  # connection already closed; the state above is still updated
    return response
async def handle_publish(request: web.Request) -> web.Response:
    """POST /api/publish - push a session's code to GitHub and deploy it to Spaces.

    Expects a JSON object with ``session_id`` and optional ``repo_name`` /
    ``description``. Publishing is refused unless the session's sandbox run
    marked it publish-ready and generated code is stored for it.

    Steps: create and push the repository, deploy to Spaces, generate an agent
    card pointing at the Spaces URL, push the card to
    ``.well-known/agent.json``, then flag the session as published.

    Returns:
        200 JSON with the GitHub result, Spaces result and agent card, or
        400 JSON with an ``error`` message when validation fails.
    """
    try:
        data = await request.json()
    except ValueError:  # invalid JSON or undecodable body
        return web.json_response({"error": "Request body must be valid JSON"}, status=400)
    if not isinstance(data, dict):
        return web.json_response({"error": "Request body must be a JSON object"}, status=400)

    session_id = data.get("session_id")
    # A non-string id (e.g. a JSON list) is unhashable and would crash the
    # dict lookup, so it is rejected with the same message as an unknown id.
    if not isinstance(session_id, str) or not state.publish_ready.get(session_id):
        return web.json_response({"error": "Sandbox not stable"}, status=400)

    code = state.generated_code.get(session_id, "")
    if not code:
        return web.json_response({"error": "No generated code"}, status=400)

    repo_name = data.get("repo_name", f"wizard-vibe-{session_id}")
    description = data.get("description", "")

    github_result = await state.github.create_and_push(
        repo_name=repo_name, files={"index.html": code}, description=description
    )
    spaces_result = await state.deploy.deploy_to_spaces(
        repo_name=repo_name, code=code, session_id=session_id
    )
    agent_card = state.deploy.generate_agent_card(
        name=repo_name, description=description or f"Wizard-Vibe: {session_id}",
        url=spaces_result.get("url"),
        capabilities=["code-generation", "vibe-deploy"],
    )
    await state.github.push_file(
        repo_name=repo_name, path=".well-known/agent.json",
        content=json.dumps(agent_card, indent=2),
    )

    # Guarded lookup: never fail a completed publish on a missing session record.
    session = state.active_sessions.get(session_id)
    if session is not None:
        session["published"] = True

    return web.json_response({
        "success": True, "github": github_result, "spaces": spaces_result,
        "agent_card": agent_card,
        "agent_url": f"{spaces_result.get('url', '')}/.well-known/agent.json",
    })
async def handle_status(request: web.Request) -> web.Response:
    """GET /api/status - report one session's state, or the session count.

    With a known ``session_id`` query parameter the response carries that
    session's status, sandbox status and publish-ready flag. Otherwise it
    carries only the number of registered sessions.
    """
    session_id = request.query.get("session_id", "")

    # Missing or unknown id: fall back to the aggregate view.
    if not session_id or session_id not in state.active_sessions:
        return web.json_response({"sessions": len(state.active_sessions)})

    report = {
        "status": state.active_sessions[session_id]["status"],
        "sandbox": state.sandbox_status.get(session_id),
        "publish_ready": state.publish_ready.get(session_id),
    }
    return web.json_response(report)
async def handle_agent_card(request: web.Request) -> web.Response:
    """GET /.well-known/agent.json - serve this studio's own agent card as JSON."""
    studio_capabilities = [
        "code-generation",
        "sse-streaming",
        "sandbox-preview",
        "github-publish",
        "hf-spaces-deploy",
    ]
    studio_card = state.deploy.generate_agent_card(
        name="wizard-vibe-studio",
        description="Liquid Glass Wizard-Vibe Studio β Zero-Delay code generation with SSE streaming, sandbox preview, Reflect-Select self-healing, and one-click A2A publishing.",
        url="https://dryymatt-wizard-vibe-studio.hf.space",
        capabilities=studio_capabilities,
    )
    return web.json_response(studio_card)
async def handle_preview(request: web.Request) -> web.Response:
    """GET /api/preview - return a session's current code as an HTML page.

    Falls back to a placeholder HTML comment when nothing is stored for the
    ``session_id`` query parameter.
    """
    session_id = request.query.get("session_id", "")
    stored_code = state.generated_code.get(session_id, "")
    page = stored_code if stored_code else "<!-- No code yet -->"
    return web.Response(text=page, content_type="text/html")
async def handle_health(request: web.Request) -> web.Response:
    """GET /api/health - liveness probe returning a fixed JSON payload."""
    liveness = {"status": "alive", "studio": "Wizard-Vibe"}
    return web.json_response(liveness)
| # ─── Static File Handler ───────────────────────────────────────────────────── | |
async def handle_static(request: web.Request) -> web.Response:
    """Serve a file from ``STATIC_DIR`` (``index.html`` for the root route).

    The requested path comes straight from the URL, so it is resolved and
    checked to lie inside ``STATIC_DIR`` before anything is read; ``..``
    segments, absolute paths and symlinks that escape the directory get a 404.
    Files are sent as raw bytes, so binary assets work too; ``.html``,
    ``.css`` and ``.js`` are labelled as UTF-8 text.

    Returns:
        200 with the file contents, or a plain-text 404.
    """
    requested = request.match_info.get("path", "index.html")
    static_root = STATIC_DIR.resolve()

    try:
        file_path = (static_root / requested).resolve()
        # Raises ValueError when the resolved path is outside static_root.
        file_path.relative_to(static_root)
    except ValueError:  # escapes the root, or the path has an embedded null byte
        return web.Response(text="Not found", status=404)

    if not file_path.is_file():
        return web.Response(text="Not found", status=404)

    text_types = {".html": "text/html", ".css": "text/css", ".js": "application/javascript"}
    content_type = text_types.get(file_path.suffix)
    body = file_path.read_bytes()
    if content_type is None:
        return web.Response(body=body, content_type="application/octet-stream")
    return web.Response(body=body, content_type=content_type, charset="utf-8")
| # ─── App Factory ───────────────────────────────────────────────────────────── | |
def create_app() -> web.Application:
    """Build the aiohttp application with every route registered.

    Order matters: the API and agent-card routes are listed before the
    catch-all static route so they are matched first.
    """
    app = web.Application()
    app.add_routes([
        web.post("/api/stream", handle_stream),
        web.post("/api/publish", handle_publish),
        web.get("/api/status", handle_status),
        web.get("/api/preview", handle_preview),
        web.get("/api/health", handle_health),
        web.get("/.well-known/agent.json", handle_agent_card),
        web.get("/", handle_static),
        web.get("/{path:.*}", handle_static),
    ])
    return app
def main():
    """Print the startup banner, then serve the application on all interfaces at PORT."""
    banner = "π§ββοΈ Wizard-Vibe Studio β Liquid Glass Server starting on port"
    print(banner, PORT)
    web.run_app(create_app(), host="0.0.0.0", port=PORT)


if __name__ == "__main__":
    main()