"""AI Coding Playground backend.

A FastAPI application that forwards a user prompt to the OpenRouter chat
completions API, streams the generated text back to the client as
server-sent-event style ``data:`` lines, parses fenced code blocks out of the
final text into named files, and can bundle such files into a ZIP download.
"""

import os
import re
import json
import asyncio
import zipfile
import io
from typing import Dict, Any, List, Optional

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import httpx

# Load environment variables
load_dotenv()

# Configuration
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
SYSTEM_PROMPT = os.getenv(
    "SYSTEM_PROMPT",
    "You are a helpful coding assistant that produces clean, well-structured, "
    "production-ready code. Generate readable, commented code and follow best practices. "
    "When returning multiple files, enclose each file in a fenced code block with an optional "
    "filename attribute (e.g. ```html filename=index.html). If no filename is provided, default to index.html."
)
OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"
OPENROUTER_MODEL = "qwen/qwen3-coder:free"
FRONTEND_BUILD_DIR = "/app/frontend/build"

# Preflight check for API key: fail at import time rather than on first request.
if not OPENROUTER_API_KEY:
    raise RuntimeError(
        "OPENROUTER_API_KEY is not set. Set it as an environment variable before running. "
        "Get your API key from https://openrouter.ai/keys"
    )

app = FastAPI(title="AI Coding Playground", version="1.0.0")

# CORS Configuration
# CORS_ORIGINS is a comma-separated list; surrounding whitespace is ignored and
# an empty value falls back to the wildcard default.
cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=cors_origins,
    # Credentialed requests must not be combined with a wildcard origin, so
    # credentials are only enabled when explicit origins are configured.
    allow_credentials="*" not in cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Code parsing regex - tolerant pattern for fenced code blocks.
# Groups: ``lang`` (optional language tag), ``fname`` (optional
# ``filename=...`` attribute, possibly quoted) and ``body`` (the block text).
# The info string is matched with [ \t] rather than \s so that it cannot spill
# over the newline and swallow the first line of the body as a language tag.
fence_pattern = re.compile(
    r"```(?:[ \t]*(?P<lang>[\w+-]+))?"
    r"(?:[ \t]+filename=(?P<fname>(?:\"[^\"]+\"|'[^']+'|\S+)))?"
    r"[ \t]*\r?\n(?P<body>[\s\S]*?)```",
    re.MULTILINE
)

# First capitalised identifier declared with ``function`` or ``const``; used
# as a guess for the React component name.
_COMPONENT_PATTERN = re.compile(r'(?:function|const)\s+([A-Z][a-zA-Z]*)')

# Language to file extension mapping
LANG_TO_EXT = {
    'html': '.html',
    'css': '.css',
    'javascript': '.js',
    'js': '.js',
    'typescript': '.ts',
    'ts': '.ts',
    'jsx': '.jsx',
    'tsx': '.tsx',
    'react': '.jsx',
    'json': '.json',
    'python': '.py',
    'py': '.py',
}


class GenerateRequest(BaseModel):
    """Request body for ``POST /api/generate``."""

    # The user's natural-language request forwarded to the model.
    prompt: str


class ParsedFile(BaseModel):
    """A single file extracted from model output."""

    filename: str
    content: str
    language: str


def _component_filename(body: str) -> str:
    """Return ``<Component>.jsx`` for the first component-like name in *body*.

    Falls back to ``App.jsx`` when no capitalised ``function``/``const``
    declaration is found.
    """
    component_match = _COMPONENT_PATTERN.search(body)
    if component_match:
        return f'{component_match.group(1)}.jsx'
    return 'App.jsx'


def _default_filename(lang: str, body: str, has_files: bool) -> str:
    """Choose a filename for a code block that did not declare one.

    Args:
        lang: Language tag from the fence (any case).
        body: The code block's text.
        has_files: Whether any file has already been collected; the first
            CSS block of a response is named ``App.css``.
    """
    lang = lang.lower()
    if lang in ('jsx', 'tsx', 'react'):
        return _component_filename(body)
    if lang in ('javascript', 'js'):
        # A parenthesised return plus angle brackets is treated as JSX.
        looks_like_jsx = 'return (' in body and '<' in body and '>' in body
        return _component_filename(body) if looks_like_jsx else 'script.js'
    if lang == 'css':
        # Try to match the CSS filename with the JSX files.
        return 'App.css' if ('App' in body or not has_files) else 'styles.css'
    return f"index{LANG_TO_EXT.get(lang, '.html')}"


def parse_code_blocks(text: str) -> Dict[str, str]:
    """Parse fenced code blocks from LLM output with improved React support.

    Args:
        text: Raw model output, possibly containing several fenced blocks.

    Returns:
        A mapping of filename to stripped block content. A block without a
        ``filename=`` attribute gets a name derived from its language and
        content. When several blocks resolve to the same name the last one
        wins. If no fenced block is found and *text* is non-empty, the whole
        text is returned as ``index.html``.
    """
    files: Dict[str, str] = {}
    for match in fence_pattern.finditer(text):
        lang = match.group('lang') or 'html'
        fname = match.group('fname')
        body = match.group('body').strip()

        if fname:
            # Clean filename if quoted
            fname = fname.strip('"\'')
        else:
            fname = _default_filename(lang, body, bool(files))

        files[fname] = body

    # If no code blocks found but there's content, treat as HTML
    if not files and text:
        files['index.html'] = text

    return files


def _safe_archive_name(filename: str) -> str:
    """Return *filename* as a relative archive path with no ``..`` parts.

    Backslashes are treated as separators; empty, ``.`` and ``..`` components
    are dropped so an entry can neither be absolute nor escape the extraction
    directory. A name that reduces to nothing becomes ``file``.
    """
    parts = [
        part
        for part in filename.replace("\\", "/").split("/")
        if part not in ("", ".", "..")
    ]
    return "/".join(parts) or "file"


def create_zip_buffer(files: Dict[str, str]) -> io.BytesIO:
    """Create a zip file buffer from parsed files.

    Args:
        files: Mapping of filename to file content.

    Returns:
        An in-memory deflate-compressed ZIP archive, positioned at offset 0.
    """
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
        for filename, content in files.items():
            # Filenames come from the client / model output, so sanitise them.
            zip_file.writestr(_safe_archive_name(filename), content)
    zip_buffer.seek(0)
    return zip_buffer


def _sse_event(payload: Dict[str, Any]) -> str:
    """Serialise *payload* as one ``data:`` event terminated by a blank line."""
    return f"data: {json.dumps(payload)}\n\n"


def _extract_delta_content(data: str) -> Optional[str]:
    """Return the text delta carried by one upstream stream payload.

    Returns ``None`` when *data* is not valid JSON, has no choices, or its
    delta content is missing or not a string (for example ``null``).
    """
    try:
        chunk_data = json.loads(data)
    except json.JSONDecodeError:
        return None
    if not isinstance(chunk_data, dict):
        return None
    choices = chunk_data.get("choices")
    if not choices:
        return None
    delta = choices[0].get("delta") or {}
    content = delta.get("content")
    return content if isinstance(content, str) else None


async def stream_openrouter_response(prompt: str):
    """Stream response from OpenRouter API.

    Yields ``data: <json>`` events. Each text delta produces an event with
    ``type: "content"`` carrying the delta and the accumulated text. When the
    upstream stream finishes (``[DONE]`` or end of stream) a final
    ``type: "complete"`` event carries the parsed files and the full text.
    Failures are reported as events with ``error`` and ``details`` keys
    instead of being raised.

    Args:
        prompt: The user's prompt, sent together with ``SYSTEM_PROMPT``.
    """
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": OPENROUTER_MODEL,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": prompt}
        ],
        "stream": True,
        "temperature": 0.7
    }

    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            async with client.stream("POST", OPENROUTER_URL, json=payload, headers=headers) as response:
                if response.status_code != 200:
                    error_text = await response.aread()
                    yield _sse_event({
                        "error": f"OpenRouter API Error: {response.status_code}",
                        # The error body is not guaranteed to be valid UTF-8.
                        "details": error_text.decode("utf-8", errors="replace"),
                    })
                    return

                full_content = ""
                async for line in response.aiter_lines():
                    if not line.startswith("data:"):
                        continue
                    # The space after "data:" is optional in event streams.
                    data = line[len("data:"):].strip()
                    if data == "[DONE]":
                        break

                    content = _extract_delta_content(data)
                    if not content:
                        continue

                    full_content += content
                    # Send streaming content
                    yield _sse_event({
                        "type": "content",
                        "content": content,
                        "full_content": full_content
                    })

                # Send final parsed files. Emitted even when the upstream
                # closes without a [DONE] marker so the client always gets
                # the parsed result.
                yield _sse_event({
                    "type": "complete",
                    "files": parse_code_blocks(full_content),
                    "full_content": full_content
                })

    except httpx.TimeoutException:
        yield _sse_event({
            "error": "Request timeout",
            "details": "The API request timed out. Please try again."
        })
    except httpx.RequestError as e:
        yield _sse_event({
            "error": "Connection error",
            "details": f"Failed to connect to OpenRouter API: {str(e)}"
        })
    except Exception as e:
        # Last-resort boundary: report the failure to the client in-band.
        yield _sse_event({"error": "Unexpected error", "details": str(e)})


@app.get("/api/health")
async def health_check():
    """Health check endpoint"""
    return {
        "status": "healthy",
        "api_key_configured": bool(OPENROUTER_API_KEY),
        "system_prompt_configured": bool(SYSTEM_PROMPT)
    }


@app.post("/api/generate")
async def generate_code(request: GenerateRequest):
    """Generate code using OpenRouter API with streaming.

    Raises:
        HTTPException: 400 when the prompt is empty or whitespace only.
    """
    if not request.prompt.strip():
        raise HTTPException(status_code=400, detail="Prompt cannot be empty")

    return StreamingResponse(
        stream_openrouter_response(request.prompt),
        media_type="text/plain",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Access-Control-Allow-Origin": "*",
        }
    )


@app.post("/api/download")
async def download_files(files: Dict[str, str]):
    """Create and download ZIP file from parsed files.

    Args:
        files: Mapping of filename to file content.

    Raises:
        HTTPException: 400 when *files* is empty; 500 when the archive
            cannot be built.
    """
    if not files:
        raise HTTPException(status_code=400, detail="No files provided")

    try:
        zip_buffer = create_zip_buffer(files)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to create ZIP file: {str(e)}") from e

    # The buffer is already rewound, so it can be streamed without copying.
    return StreamingResponse(
        zip_buffer,
        media_type="application/zip",
        headers={
            "Content-Disposition": "attachment; filename=generated-code.zip"
        }
    )


# Serve React static files (for Hugging Face Spaces deployment).
# Mounted after the API routes so that "/api/..." paths are matched first.
try:
    app.mount("/", StaticFiles(directory=FRONTEND_BUILD_DIR, html=True), name="frontend")
except Exception:
    # For development - React dev server handles frontend
    @app.get("/")
    async def root():
        return {"message": "AI Coding Playground API", "frontend": "Run React dev server separately"}


if __name__ == "__main__":
    import uvicorn

    port = int(os.getenv("PORT", "8001"))
    uvicorn.run(app, host="0.0.0.0", port=port)