wizard-vibe-studio / server.py
dryymatt's picture
✨ Wizard-Vibe: server.py
695c646 verified
"""
Wizard-Vibe Studio — Liquid Glass Server
=========================================
SSE streaming server with AsyncIterator StreamResponse.
Reflect-Select self-healing loop. Sandbox preview bridge.
GitHub MCP + HF Spaces one-click deploy.
A2A discoverable agent card at /.well-known/agent.json
"""
import asyncio
import json
import os
import sys
import time
import uuid
from datetime import datetime, timezone
from html.parser import HTMLParser
from pathlib import Path

import aiohttp
from aiohttp import web
# Add parent for Litehat imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from orchestrator import LitehatOrchestrator
from reflect_select import ReflectSelectLoop
from github_mcp import GitHubMCP
from deploy_pipeline import DeployPipeline
# ─── Configuration ───────────────────────────────────────────────────────────
# TCP port the server listens on; overridable through the WIZARD_PORT env var.
PORT = int(os.getenv("WIZARD_PORT", 8765))

# All working directories live next to this file.
_HERE = Path(__file__).parent
STATIC_DIR = _HERE / "static"
GENERATED_DIR = _HERE / "generated"
DEPLOY_DIR = _HERE / "deploy"

# The output directories are created eagerly at import time; the static
# directory is only read from, so it is not created here.
for _output_dir in (GENERATED_DIR, DEPLOY_DIR):
    _output_dir.mkdir(parents=True, exist_ok=True)
# ─── Global State ─────────────────────────────────────────────────────────────
class StudioState:
    """Container for the studio's service objects and per-session bookkeeping.

    The four dictionaries are keyed by session id and are filled in by the
    request handlers and the streaming generator in this module.
    """

    def __init__(self):
        # Service objects, constructed once per StudioState instance.
        self.orchestrator = LitehatOrchestrator()
        self.reflect_loop = ReflectSelectLoop()
        self.github = GitHubMCP()
        self.deploy = DeployPipeline()

        # session id -> session record (id, prompt, status, created_at, ...)
        self.active_sessions: dict = dict()
        # session id -> sandbox state string ("building", "stable", "error")
        self.sandbox_status: dict = dict()
        # session id -> bool, True once the sandbox validated the code
        self.publish_ready: dict = dict()
        # session id -> latest generated (or healed) source text
        self.generated_code: dict = dict()
# Module-level singleton shared by every handler and by the stream generator.
state = StudioState()
# ─── SSE Streaming Engine ─────────────────────────────────────────────────────
def _sse_event(event: str, payload: dict) -> str:
    """Format one Server-Sent Events frame with a JSON-encoded ``data`` line."""
    return f"event: {event}\ndata: {json.dumps(payload)}\n\n"


async def stream_generator(session_id: str, prompt: str):
    """Yield the SSE frames for one generation run.

    The run goes through four phases, each announced with a ``phase`` event:

    1. ``orchestrate`` - ask the orchestrator for a model plan (``plan`` event).
    2. ``generate``    - stream code chunks (``code`` events carrying both the
       new chunk and the accumulated text).
    3. ``validate``    - run the Reflect-Select healer once (``heal`` event).
    4. ``sandbox``     - validate the healed code; on failure retry healing up
       to three times (``heal_attempt`` / ``sandbox`` events).

    The final code is written to ``GENERATED_DIR/<session_id>/index.html`` and
    a ``done`` event carrying the final sandbox status closes the run.

    Args:
        session_id: Key of an existing entry in ``state.active_sessions``.
        prompt: The user's generation prompt.

    Yields:
        str: Complete SSE frames (``event: ...\\ndata: ...\\n\\n``).

    Raises:
        KeyError: If ``session_id`` is not registered in ``state.active_sessions``.
    """
    session = state.active_sessions[session_id]
    session["status"] = "streaming"
    state.sandbox_status[session_id] = "building"

    # Phase 1: Orchestrate
    yield _sse_event("phase", {"phase": "orchestrate", "msg": "Posing optimal models..."})
    await asyncio.sleep(0.1)
    model_plan = state.orchestrator.pose(prompt)
    yield _sse_event("plan", {"plan": model_plan["summary"], "models": model_plan["assignments"]})
    await asyncio.sleep(0.05)

    # Phase 2: Generate code in chunks
    yield _sse_event("phase", {"phase": "generate", "msg": "Generating code..."})
    chunks = []
    for chunk in state.orchestrator.generate_stream(prompt, model_plan):
        chunks.append(chunk)
        partial = "".join(chunks)
        # Keep the preview endpoint in sync with what has been streamed so far.
        state.generated_code[session_id] = partial
        yield _sse_event("code", {"chunk": chunk, "partial": partial})
        await asyncio.sleep(0.05)
    final_code = "".join(chunks)

    # Phase 3: Reflect-Select self-healing
    yield _sse_event("phase", {"phase": "validate", "msg": "Running Reflect-Select self-healing..."})
    healed_code, errors_found, errors_fixed = state.reflect_loop.heal(final_code, prompt)
    state.generated_code[session_id] = healed_code
    yield _sse_event("heal", {"errors_found": errors_found, "errors_fixed": errors_fixed})
    await asyncio.sleep(0.05)

    # Phase 4: Sandbox validation
    yield _sse_event("phase", {"phase": "sandbox", "msg": "Validating in sandbox..."})
    sandbox_result = await validate_in_sandbox(session_id, healed_code)
    if sandbox_result["success"]:
        state.sandbox_status[session_id] = "stable"
        state.publish_ready[session_id] = True
        yield _sse_event("sandbox", {"status": "stable", "msg": "Sandbox stable — Publish unlocked"})
    else:
        state.sandbox_status[session_id] = "error"
        state.publish_ready[session_id] = False
        yield _sse_event("sandbox", {"status": "error", "errors": sandbox_result["errors"]})
        # Reflect-Select loop: feed the sandbox errors back to the healer and
        # re-validate, giving up after three attempts (status stays "error").
        for attempt in range(3):
            yield _sse_event("heal_attempt", {"attempt": attempt + 1})
            healed_code = state.reflect_loop.heal(
                healed_code, prompt, sandbox_errors=sandbox_result["errors"]
            )[0]
            state.generated_code[session_id] = healed_code
            sandbox_result = await validate_in_sandbox(session_id, healed_code)
            if sandbox_result["success"]:
                state.sandbox_status[session_id] = "stable"
                state.publish_ready[session_id] = True
                yield _sse_event(
                    "sandbox",
                    {
                        "status": "stable",
                        "msg": f"Sandbox stable after {attempt + 1} heal attempt(s) — Publish unlocked",
                    },
                )
                break

    # Save generated code. The encoding is pinned so non-ASCII output does not
    # depend on (or fail under) the process locale.
    code_path = GENERATED_DIR / session_id
    code_path.mkdir(exist_ok=True)
    (code_path / "index.html").write_text(healed_code, encoding="utf-8")

    session["status"] = "complete"
    yield _sse_event("done", {"status": state.sandbox_status[session_id]})
# HTML elements that never take a closing tag.
_VOID_ELEMENTS = frozenset({
    'br', 'hr', 'img', 'input', 'meta', 'link', 'area', 'base',
    'col', 'embed', 'source', 'track', 'wbr',
})


class _TagBalanceChecker(HTMLParser):
    """Record unbalanced HTML tags while parsing a document.

    Using the real HTML tokenizer (rather than a regex over the raw text)
    means ``<`` characters inside ``<script>``/``<style>`` bodies and inside
    comments are not mistaken for tags.
    """

    def __init__(self):
        super().__init__(convert_charrefs=True)
        self.open_tags = []  # stack of currently open (lower-cased) tag names
        self.errors = []     # mismatch messages, in document order

    def handle_starttag(self, tag, attrs):
        if tag not in _VOID_ELEMENTS:
            self.open_tags.append(tag)

    def handle_startendtag(self, tag, attrs):
        # Self-closing syntax (e.g. SVG ``<path ... />``) opens and closes in
        # one token, so it never affects the balance.
        pass

    def handle_endtag(self, tag):
        if tag in _VOID_ELEMENTS:
            return
        if self.open_tags and self.open_tags[-1] == tag:
            self.open_tags.pop()
        else:
            # The stack is deliberately left untouched on a mismatch.
            self.errors.append(f"Mismatched closing tag: </{tag}>")


async def validate_in_sandbox(session_id: str, code: str) -> dict:
    """Run lightweight static checks on generated code.

    HTML documents (text starting with ``<!DOCTYPE`` or containing ``<html``)
    are checked for balanced tags and for the ``console.loge(`` typo.
    Otherwise, text that starts with ``import`` or contains ``def `` is
    compiled (never executed) as Python to detect syntax errors. Anything
    else passes without checks.

    Args:
        session_id: Session the code belongs to (currently unused).
        code: Source text to validate.

    Returns:
        dict: ``{"success": bool, "errors": list[str]}``; ``success`` is True
        exactly when ``errors`` is empty.
    """
    errors = []
    stripped = code.strip()
    if stripped.startswith("<!DOCTYPE") or "<html" in code.lower():
        checker = _TagBalanceChecker()
        checker.feed(code)
        checker.close()
        errors.extend(checker.errors)
        if checker.open_tags:
            errors.append(f"Unclosed tags: {checker.open_tags}")
        if "console.loge(" in code:
            errors.append("Typo: console.loge → console.log")
    elif stripped.startswith("import") or "def " in code:
        try:
            compile(code, '<sandbox>', 'exec')
        except (SyntaxError, ValueError) as e:
            # ValueError covers sources containing NUL bytes on interpreters
            # that do not report them as SyntaxError.
            errors.append(f"Python syntax error: {e}")
    return {"success": not errors, "errors": errors}
# ─── HTTP Handlers ───────────────────────────────────────────────────────────
async def handle_stream(request: web.Request) -> web.StreamResponse:
    """POST /api/stream - start a generation run and stream it back as SSE.

    Expects a JSON object body with an optional ``"prompt"`` string. A new
    session is registered in ``state`` and every frame produced by
    ``stream_generator`` is forwarded to the client, followed by a ``close``
    event. If generation fails, an ``error`` event is sent instead and the
    session is marked as errored.

    The new session id is exposed in the ``X-Session-Id`` response header so
    the client can refer to the session in later requests.

    Returns:
        The prepared SSE ``StreamResponse``, or a 400 JSON response when the
        request body is not a JSON object.
    """
    try:
        data = await request.json()
    except ValueError:
        # Covers json.JSONDecodeError and undecodable request bodies.
        return web.json_response({"error": "Invalid JSON body"}, status=400)
    if not isinstance(data, dict):
        return web.json_response({"error": "JSON body must be an object"}, status=400)
    prompt = data.get("prompt", "")

    session_id = str(uuid.uuid4())[:8]
    session = {
        "id": session_id, "prompt": prompt, "status": "init",
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    state.active_sessions[session_id] = session
    state.sandbox_status[session_id] = "building"
    state.publish_ready[session_id] = False

    response = web.StreamResponse(
        status=200, reason="OK",
        headers={
            "Content-Type": "text/event-stream",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
            "X-Session-Id": session_id,
        },
    )
    await response.prepare(request)
    try:
        async for event in stream_generator(session_id, prompt):
            await response.write(event.encode("utf-8"))
        await response.write(b"event: close\ndata: {}\n\n")
    except Exception as e:
        # Record the failure first so /api/status reflects it even if the
        # client has already gone away and the error frame cannot be sent.
        session["status"] = "error"
        state.sandbox_status[session_id] = "error"
        error_event = f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n"
        try:
            await response.write(error_event.encode("utf-8"))
        except ConnectionResetError:
            # Client disconnected; nothing left to notify.
            pass
    return response
async def handle_publish(request: web.Request) -> web.Response:
    """POST /api/publish - push a validated session's code to GitHub and Spaces.

    Expects a JSON object with ``session_id`` and optional ``repo_name`` and
    ``description``. Publishing is refused unless the session's sandbox run
    was marked publish-ready and generated code is available.

    On success the code is pushed as ``index.html``, deployed to Spaces, an
    agent card is generated and pushed to ``.well-known/agent.json``, and the
    session is flagged as published.

    Returns:
        200 JSON with the GitHub/Spaces results and the agent card, or
        400 JSON with an ``error`` message when the request is invalid or the
        session is not ready.
    """
    try:
        data = await request.json()
    except ValueError:
        # Covers json.JSONDecodeError and undecodable request bodies.
        return web.json_response({"error": "Invalid JSON body"}, status=400)
    if not isinstance(data, dict):
        return web.json_response({"error": "JSON body must be an object"}, status=400)

    session_id = data.get("session_id")
    # `or` also replaces explicit nulls/empty strings with the defaults.
    repo_name = data.get("repo_name") or f"wizard-vibe-{session_id}"
    description = data.get("description") or ""

    # A non-string id can never name a session (and may be unhashable).
    if not isinstance(session_id, str) or not state.publish_ready.get(session_id):
        return web.json_response({"error": "Sandbox not stable"}, status=400)
    code = state.generated_code.get(session_id, "")
    if not code:
        return web.json_response({"error": "No generated code"}, status=400)

    github_result = await state.github.create_and_push(
        repo_name=repo_name, files={"index.html": code}, description=description
    )
    spaces_result = await state.deploy.deploy_to_spaces(
        repo_name=repo_name, code=code, session_id=session_id
    )
    agent_card = state.deploy.generate_agent_card(
        name=repo_name, description=description or f"Wizard-Vibe: {session_id}",
        url=spaces_result.get("url"),
        capabilities=["code-generation", "vibe-deploy"],
    )
    await state.github.push_file(
        repo_name=repo_name, path=".well-known/agent.json",
        content=json.dumps(agent_card, indent=2),
    )
    state.active_sessions[session_id]["published"] = True

    # Avoid rendering a missing/None URL as the literal text "None".
    spaces_url = spaces_result.get("url") or ""
    return web.json_response({
        "success": True, "github": github_result, "spaces": spaces_result,
        "agent_card": agent_card,
        "agent_url": f"{spaces_url}/.well-known/agent.json",
    })
async def handle_status(request: web.Request) -> web.Response:
    """GET /api/status - report one session's state, or the session count.

    With a ``session_id`` query parameter naming a known session, the reply
    carries that session's status, sandbox state and publish flag. In every
    other case the reply only carries the number of registered sessions.
    """
    sid = request.query.get("session_id", "")

    # Unknown or missing id: fall back to the aggregate answer.
    if not sid or sid not in state.active_sessions:
        return web.json_response({"sessions": len(state.active_sessions)})

    payload = {
        "status": state.active_sessions[sid]["status"],
        "sandbox": state.sandbox_status.get(sid),
        "publish_ready": state.publish_ready.get(sid),
    }
    return web.json_response(payload)
async def handle_agent_card(request: web.Request) -> web.Response:
    """GET /.well-known/agent.json - serve the studio's own agent card.

    The card is built by the deploy pipeline from fixed metadata (name,
    description, public URL and capability list) and returned as JSON.
    """
    card = state.deploy.generate_agent_card(
        name="wizard-vibe-studio",
        description="Liquid Glass Wizard-Vibe Studio — Zero-Delay code generation with SSE streaming, sandbox preview, Reflect-Select self-healing, and one-click A2A publishing.",
        url="https://dryymatt-wizard-vibe-studio.hf.space",
        capabilities=["code-generation", "sse-streaming", "sandbox-preview", "github-publish", "hf-spaces-deploy"],
    )
    return web.json_response(card)
async def handle_preview(request: web.Request) -> web.Response:
    """GET /api/preview - return a session's current code as an HTML page.

    The session is selected with the ``session_id`` query parameter. When no
    code is stored for it, an HTML comment placeholder is returned instead.
    """
    session_key = request.query.get("session_id", "")
    stored = state.generated_code.get(session_key, "")
    body = stored if stored else "<!-- No code yet -->"
    return web.Response(text=body, content_type="text/html")
async def handle_health(request: web.Request) -> web.Response:
    """GET /api/health - fixed liveness reply; does not inspect any state."""
    payload = {"status": "alive", "studio": "Wizard-Vibe"}
    return web.json_response(payload)
# ─── Static File Handler ─────────────────────────────────────────────────────
async def handle_static(request: web.Request) -> web.Response:
    """Serve a file from ``STATIC_DIR`` (``index.html`` for the root route).

    The requested path comes straight from the URL, so it is resolved and
    checked to lie inside ``STATIC_DIR`` before anything is read; ``..``
    segments, absolute paths and symlinks that escape the directory all get
    a 404. HTML, CSS and JavaScript are served as UTF-8 text; every other
    file is served as raw bytes with ``application/octet-stream``.

    Returns:
        The file contents with a matching content type, or a plain-text 404.
    """
    requested = request.match_info.get("path", "index.html")
    root = STATIC_DIR.resolve()
    try:
        file_path = (root / requested).resolve()
        # Raises ValueError when the resolved path is outside the static root.
        file_path.relative_to(root)
    except (ValueError, OSError, RuntimeError):
        # Also covers embedded NUL bytes, over-long names and symlink loops.
        return web.Response(text="Not found", status=404)
    if not file_path.is_file():
        return web.Response(text="Not found", status=404)

    text_types = {".html": "text/html", ".css": "text/css", ".js": "application/javascript"}
    ct = text_types.get(file_path.suffix)
    if ct is None:
        # Unknown type: send the bytes untouched instead of decoding them.
        return web.Response(body=file_path.read_bytes(), content_type="application/octet-stream")
    return web.Response(text=file_path.read_text(encoding="utf-8"), content_type=ct)
# ─── App Factory ─────────────────────────────────────────────────────────────
def create_app() -> web.Application:
    """Build the aiohttp application and register every route.

    Registration order matters: the API and agent-card routes are added
    before the catch-all static route so they take precedence over it.
    """
    app = web.Application()
    router = app.router

    # Mutating API endpoints.
    router.add_post("/api/stream", handle_stream)
    router.add_post("/api/publish", handle_publish)

    # Read-only endpoints, with the static catch-all registered last.
    get_routes = (
        ("/api/status", handle_status),
        ("/api/preview", handle_preview),
        ("/api/health", handle_health),
        ("/.well-known/agent.json", handle_agent_card),
        ("/", handle_static),
        ("/{path:.*}", handle_static),
    )
    for route_path, route_handler in get_routes:
        router.add_get(route_path, route_handler)

    return app
def main():
    """Print a startup banner and serve the application.

    The server binds to all interfaces (``0.0.0.0``) on ``PORT``;
    ``web.run_app`` blocks until the server is stopped.
    """
    print("🧙‍♂️ Wizard-Vibe Studio — Liquid Glass Server starting on port", PORT)
    app = create_app()
    web.run_app(app, host="0.0.0.0", port=PORT)


# Only start the server when executed as a script, not when imported.
if __name__ == "__main__":
    main()