Spaces:
Build error
Build error
| import os | |
| import re | |
| import json | |
| import asyncio | |
| import zipfile | |
| import io | |
| from typing import Dict, Any, List, Optional | |
| from dotenv import load_dotenv | |
| from fastapi import FastAPI, HTTPException, Request | |
| from fastapi.responses import StreamingResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| import httpx | |
| # Load environment variables | |
| load_dotenv() | |
| # Configuration | |
| OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY") | |
| SYSTEM_PROMPT = os.getenv("SYSTEM_PROMPT", | |
| "You are a helpful coding assistant that produces clean, well-structured, " | |
| "production-ready code. Generate readable, commented code and follow best practices. " | |
| "When returning multiple files, enclose each file in a fenced code block with an optional " | |
| "filename attribute (e.g. ```html filename=index.html). If no filename is provided, default to index.html." | |
| ) | |
| # Preflight check for API key | |
| if not OPENROUTER_API_KEY: | |
| raise RuntimeError( | |
| "OPENROUTER_API_KEY is not set. Set it as an environment variable before running. " | |
| "Get your API key from https://openrouter.ai/keys" | |
| ) | |
| app = FastAPI(title="AI Coding Playground", version="1.0.0") | |
| # CORS Configuration | |
| cors_origins = os.getenv("CORS_ORIGINS", "*").split(",") | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=cors_origins, | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Code parsing regex - tolerant pattern for fenced code blocks | |
| fence_pattern = re.compile( | |
| r"```(?:\s*(?P<lang>[\w+-]+))?(?:\s+filename=(?P<fname>(?:\"[^\"]+\"|'[^']+'|\S+)))?\s*\n(?P<body>[\s\S]*?)```", | |
| re.MULTILINE | |
| ) | |
| # Language to file extension mapping | |
| LANG_TO_EXT = { | |
| 'html': '.html', | |
| 'css': '.css', | |
| 'javascript': '.js', | |
| 'js': '.js', | |
| 'typescript': '.ts', | |
| 'ts': '.ts', | |
| 'jsx': '.jsx', | |
| 'tsx': '.tsx', | |
| 'react': '.jsx', | |
| 'json': '.json', | |
| 'python': '.py', | |
| 'py': '.py', | |
| } | |
| class GenerateRequest(BaseModel): | |
| prompt: str | |
| class ParsedFile(BaseModel): | |
| filename: str | |
| content: str | |
| language: str | |
| def parse_code_blocks(text: str) -> Dict[str, str]: | |
| """Parse fenced code blocks from LLM output with improved React support""" | |
| files = {} | |
| matches = fence_pattern.finditer(text) | |
| for match in matches: | |
| lang = match.group('lang') or 'html' | |
| fname = match.group('fname') | |
| body = match.group('body').strip() | |
| # Clean filename if quoted | |
| if fname: | |
| fname = fname.strip('"\'') | |
| else: | |
| # Smart default filename based on language | |
| if lang.lower() in ['jsx', 'tsx', 'react']: | |
| # Look for component name in the code | |
| component_match = re.search(r'(?:function|const)\s+([A-Z][a-zA-Z]*)', body) | |
| if component_match: | |
| component_name = component_match.group(1) | |
| fname = f'{component_name}.jsx' | |
| else: | |
| fname = 'App.jsx' | |
| elif lang.lower() == 'javascript' or lang.lower() == 'js': | |
| # Check if this is actually React code (contains JSX) | |
| if 'return (' in body and ('<' in body and '>' in body): | |
| # This looks like JSX, use .jsx extension | |
| component_match = re.search(r'(?:function|const)\s+([A-Z][a-zA-Z]*)', body) | |
| if component_match: | |
| component_name = component_match.group(1) | |
| fname = f'{component_name}.jsx' | |
| else: | |
| fname = 'App.jsx' | |
| else: | |
| fname = 'script.js' | |
| elif lang.lower() == 'css': | |
| # Try to match CSS filename with JSX files | |
| if 'App' in body or not files: | |
| fname = 'App.css' | |
| else: | |
| fname = 'styles.css' | |
| else: | |
| ext = LANG_TO_EXT.get(lang.lower(), '.html') | |
| fname = f'index{ext}' | |
| files[fname] = body | |
| # If no code blocks found but there's content, treat as HTML | |
| if not files and text: | |
| files['index.html'] = text | |
| return files | |
| def create_zip_buffer(files: Dict[str, str]) -> io.BytesIO: | |
| """Create a zip file buffer from parsed files""" | |
| zip_buffer = io.BytesIO() | |
| with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file: | |
| for filename, content in files.items(): | |
| zip_file.writestr(filename, content) | |
| zip_buffer.seek(0) | |
| return zip_buffer | |
| async def stream_openrouter_response(prompt: str): | |
| """Stream response from OpenRouter API""" | |
| url = "https://openrouter.ai/api/v1/chat/completions" | |
| headers = { | |
| "Authorization": f"Bearer {OPENROUTER_API_KEY}", | |
| "Content-Type": "application/json", | |
| } | |
| payload = { | |
| "model": "qwen/qwen3-coder:free", | |
| "messages": [ | |
| {"role": "system", "content": SYSTEM_PROMPT}, | |
| {"role": "user", "content": prompt} | |
| ], | |
| "stream": True, | |
| "temperature": 0.7 | |
| } | |
| try: | |
| async with httpx.AsyncClient(timeout=60.0) as client: | |
| async with client.stream("POST", url, json=payload, headers=headers) as response: | |
| if response.status_code != 200: | |
| error_text = await response.aread() | |
| error_data = {"error": f"OpenRouter API Error: {response.status_code}", "details": error_text.decode()} | |
| yield f"data: {json.dumps(error_data)}\n\n" | |
| return | |
| full_content = "" | |
| async for chunk in response.aiter_lines(): | |
| if chunk.startswith("data: "): | |
| data = chunk[6:] # Remove "data: " prefix | |
| if data == "[DONE]": | |
| # Send final parsed files | |
| parsed_files = parse_code_blocks(full_content) | |
| final_data = { | |
| "type": "complete", | |
| "files": parsed_files, | |
| "full_content": full_content | |
| } | |
| yield f"data: {json.dumps(final_data)}\n\n" | |
| break | |
| try: | |
| chunk_data = json.loads(data) | |
| if "choices" in chunk_data and chunk_data["choices"]: | |
| delta = chunk_data["choices"][0].get("delta", {}) | |
| if "content" in delta: | |
| content = delta["content"] | |
| full_content += content | |
| # Send streaming content | |
| stream_data = { | |
| "type": "content", | |
| "content": content, | |
| "full_content": full_content | |
| } | |
| yield f"data: {json.dumps(stream_data)}\n\n" | |
| except json.JSONDecodeError: | |
| continue | |
| except httpx.TimeoutException: | |
| error_data = {"error": "Request timeout", "details": "The API request timed out. Please try again."} | |
| yield f"data: {json.dumps(error_data)}\n\n" | |
| except httpx.RequestError as e: | |
| error_data = {"error": "Connection error", "details": f"Failed to connect to OpenRouter API: {str(e)}"} | |
| yield f"data: {json.dumps(error_data)}\n\n" | |
| except Exception as e: | |
| error_data = {"error": "Unexpected error", "details": str(e)} | |
| yield f"data: {json.dumps(error_data)}\n\n" | |
| async def health_check(): | |
| """Health check endpoint""" | |
| return { | |
| "status": "healthy", | |
| "api_key_configured": bool(OPENROUTER_API_KEY), | |
| "system_prompt_configured": bool(SYSTEM_PROMPT) | |
| } | |
| async def generate_code(request: GenerateRequest): | |
| """Generate code using OpenRouter API with streaming""" | |
| if not request.prompt.strip(): | |
| raise HTTPException(status_code=400, detail="Prompt cannot be empty") | |
| return StreamingResponse( | |
| stream_openrouter_response(request.prompt), | |
| media_type="text/plain", | |
| headers={ | |
| "Cache-Control": "no-cache", | |
| "Connection": "keep-alive", | |
| "Access-Control-Allow-Origin": "*", | |
| } | |
| ) | |
| async def download_files(files: Dict[str, str]): | |
| """Create and download ZIP file from parsed files""" | |
| if not files: | |
| raise HTTPException(status_code=400, detail="No files provided") | |
| try: | |
| zip_buffer = create_zip_buffer(files) | |
| return StreamingResponse( | |
| io.BytesIO(zip_buffer.read()), | |
| media_type="application/zip", | |
| headers={ | |
| "Content-Disposition": "attachment; filename=generated-code.zip" | |
| } | |
| ) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Failed to create ZIP file: {str(e)}") | |
| # Serve React static files (for Hugging Face Spaces deployment) | |
| try: | |
| app.mount("/", StaticFiles(directory="/app/frontend/build", html=True), name="frontend") | |
| except Exception: | |
| # For development - React dev server handles frontend | |
| async def root(): | |
| return {"message": "AI Coding Playground API", "frontend": "Run React dev server separately"} | |
| if __name__ == "__main__": | |
| import uvicorn | |
| port = int(os.getenv("PORT", 8001)) | |
| uvicorn.run(app, host="0.0.0.0", port=port) |