# KumarArpit8649's picture
# Rename server.py (1) to server.py
# d3d6415 verified
import os
import re
import json
import asyncio
import zipfile
import io
from typing import Dict, Any, List, Optional
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import httpx
# Load environment variables
load_dotenv()

# Configuration: both settings are read from the process environment.
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY")
SYSTEM_PROMPT = os.environ.get(
    "SYSTEM_PROMPT",
    "You are a helpful coding assistant that produces clean, well-structured, "
    "production-ready code. Generate readable, commented code and follow best practices. "
    "When returning multiple files, enclose each file in a fenced code block with an optional "
    "filename attribute (e.g. ```html filename=index.html). If no filename is provided, default to index.html.",
)

# Preflight check: refuse to start at all when the API key is missing.
if not OPENROUTER_API_KEY:
    raise RuntimeError(
        "OPENROUTER_API_KEY is not set. Set it as an environment variable before running. "
        "Get your API key from https://openrouter.ai/keys"
    )
app = FastAPI(title="AI Coding Playground", version="1.0.0")

# CORS configuration.
# CORS_ORIGINS is a comma-separated list. Entries are trimmed and empty ones
# dropped so that a value such as "https://a.example, https://b.example"
# matches the Origin header exactly.
cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ORIGINS", "*").split(",")
    if origin.strip()
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=cors_origins,
    # Credentialed requests must not be combined with a wildcard origin;
    # only enable them when an explicit origin list has been configured.
    allow_credentials="*" not in cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Code parsing regex - tolerant pattern for fenced code blocks.
# Named groups:
#   lang  - optional language tag directly after the opening fence
#   fname - optional `filename=...` attribute (bare, or single/double quoted;
#           the quotes are part of the captured text)
#   body  - everything up to the next closing fence
# The info string is matched with [ \t] rather than \s so it can never run
# past the end of the opening-fence line; otherwise the first line of an
# unlabeled block (e.g. "```\nfoo\nbar\n```") would be consumed as the language.
fence_pattern = re.compile(
    r"```(?:[ \t]*(?P<lang>[\w+-]+))?(?:[ \t]+filename=(?P<fname>(?:\"[^\"]+\"|'[^']+'|\S+)))?[ \t]*\r?\n(?P<body>[\s\S]*?)```"
)
# Maps a (lower-case) fence language tag to the file extension used when a
# code block does not declare its own filename.
LANG_TO_EXT = {
    # Markup and styling
    "html": ".html",
    "css": ".css",
    # JavaScript / TypeScript
    "javascript": ".js",
    "js": ".js",
    "typescript": ".ts",
    "ts": ".ts",
    # React flavours
    "jsx": ".jsx",
    "tsx": ".tsx",
    "react": ".jsx",
    # Data and scripting
    "json": ".json",
    "python": ".py",
    "py": ".py",
}
# Request body accepted by POST /api/generate.
class GenerateRequest(BaseModel):
    # Free-form user instruction forwarded to the model.
    prompt: str
# Schema describing a single file: its name, its text and its language tag.
class ParsedFile(BaseModel):
    # Name of the file.
    filename: str
    # Full text of the file.
    content: str
    # Language tag associated with the file.
    language: str
def _component_filename(body: str, ext: str) -> str:
    """Name a React file after the first capitalised function/const in *body*, else ``App``."""
    # Digits are allowed after the first letter so "Step2Form" is not cut to "Step".
    found = re.search(r'(?:function|const)\s+([A-Z][A-Za-z0-9]*)', body)
    stem = found.group(1) if found else 'App'
    return f'{stem}{ext}'


def _default_filename(lang: str, body: str, files: Dict[str, str]) -> str:
    """Choose a filename for a block that did not declare one (*lang* is lower-case)."""
    if lang in ('jsx', 'tsx', 'react'):
        # Use the language's own extension so TSX is not written to a .jsx file.
        return _component_filename(body, LANG_TO_EXT.get(lang, '.jsx'))
    if lang in ('javascript', 'js'):
        # Heuristic: a parenthesised return plus angle brackets looks like JSX.
        if 'return (' in body and '<' in body and '>' in body:
            return _component_filename(body, '.jsx')
        return 'script.js'
    if lang == 'css':
        # Pair the stylesheet with App.jsx when it mentions App or is the first file seen.
        return 'App.css' if 'App' in body or not files else 'styles.css'
    return f"index{LANG_TO_EXT.get(lang, '.html')}"


def parse_code_blocks(text: str) -> Dict[str, str]:
    """Parse fenced code blocks from LLM output into a filename -> content mapping.

    Each fence may carry a language tag and a ``filename=`` attribute. A block
    without a filename gets one derived from its language (missing language is
    treated as ``html``) and, for React code, from the component name found in
    the body. Bodies are stored with surrounding whitespace stripped. When two
    blocks resolve to the same filename the later one replaces the earlier.

    Args:
        text: Raw model output.

    Returns:
        Mapping of filename to file content. If no fenced block is found and
        *text* contains non-whitespace, the whole text is returned unchanged
        as ``index.html``; otherwise the mapping is empty.
    """
    files: Dict[str, str] = {}
    for match in fence_pattern.finditer(text):
        lang = (match.group('lang') or 'html').lower()
        body = match.group('body').strip()

        # Clean filename if quoted; an attribute that is empty once the quotes
        # are removed is treated the same as a missing one.
        declared = match.group('fname')
        fname = declared.strip('"\'') if declared else ''
        if not fname:
            fname = _default_filename(lang, body, files)

        files[fname] = body

    # If no code blocks were found but there is real content, treat it as HTML.
    if not files and text.strip():
        files['index.html'] = text
    return files
def create_zip_buffer(files: Dict[str, str]) -> io.BytesIO:
    """Pack *files* (archive name -> text content) into an in-memory ZIP.

    Entries are written with DEFLATE compression in the mapping's iteration
    order. The returned buffer is rewound to position 0, ready to be read.
    """
    archive_bytes = io.BytesIO()
    with zipfile.ZipFile(archive_bytes, mode='w', compression=zipfile.ZIP_DEFLATED) as archive:
        for name in files:
            archive.writestr(name, files[name])
    archive_bytes.seek(0)
    return archive_bytes
def _sse_event(event: Dict[str, Any]) -> str:
    """Serialise *event* as one ``data: <json>`` frame terminated by a blank line."""
    return f"data: {json.dumps(event)}\n\n"


async def stream_openrouter_response(prompt: str):
    """Stream a chat completion from the OpenRouter API as ``data:`` frames.

    Yields JSON frames of these shapes:
      * ``{"type": "content", "content": <delta>, "full_content": <text so far>}``
        for every text delta received;
      * ``{"type": "complete", "files": <parsed files>, "full_content": <text>}``
        once the upstream stream finishes;
      * ``{"error": ..., "details": ...}`` on a non-200 response, a timeout,
        a connection failure or any other exception.

    Args:
        prompt: User prompt, sent together with the configured system prompt.
    """
    url = "https://openrouter.ai/api/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": "qwen/qwen3-coder:free",
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": prompt},
        ],
        "stream": True,
        "temperature": 0.7,
    }

    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            async with client.stream("POST", url, json=payload, headers=headers) as response:
                if response.status_code != 200:
                    error_text = await response.aread()
                    yield _sse_event({
                        "error": f"OpenRouter API Error: {response.status_code}",
                        # Never let an undecodable error body mask the real failure.
                        "details": error_text.decode(errors="replace"),
                    })
                    return

                full_content = ""
                async for line in response.aiter_lines():
                    if not line.startswith("data: "):
                        continue
                    data = line[6:]  # Remove "data: " prefix
                    if data == "[DONE]":
                        break

                    try:
                        chunk_data = json.loads(data)
                    except json.JSONDecodeError:
                        continue
                    if not isinstance(chunk_data, dict):
                        continue

                    choices = chunk_data.get("choices")
                    if not choices:
                        continue
                    delta = choices[0].get("delta") or {}
                    content = delta.get("content")
                    # A delta may carry "content": null; concatenating it would
                    # raise TypeError and abort the whole stream.
                    if not isinstance(content, str):
                        continue

                    full_content += content
                    yield _sse_event({
                        "type": "content",
                        "content": content,
                        "full_content": full_content,
                    })

                # Emitted after the loop so the client also receives the parsed
                # files when the upstream closes without sending "[DONE]".
                yield _sse_event({
                    "type": "complete",
                    "files": parse_code_blocks(full_content),
                    "full_content": full_content,
                })
    except httpx.TimeoutException:
        yield _sse_event({
            "error": "Request timeout",
            "details": "The API request timed out. Please try again.",
        })
    except httpx.RequestError as e:
        yield _sse_event({
            "error": "Connection error",
            "details": f"Failed to connect to OpenRouter API: {str(e)}",
        })
    except Exception as e:
        # Last-resort boundary: report the failure to the client instead of
        # dropping the connection silently.
        yield _sse_event({"error": "Unexpected error", "details": str(e)})
@app.get("/api/health")
async def health_check():
    """Return a static "healthy" status plus flags for the loaded settings."""
    has_api_key = bool(OPENROUTER_API_KEY)
    has_system_prompt = bool(SYSTEM_PROMPT)
    return {
        "status": "healthy",
        "api_key_configured": has_api_key,
        "system_prompt_configured": has_system_prompt,
    }
@app.post("/api/generate")
async def generate_code(request: GenerateRequest):
    """Generate code using the OpenRouter API and stream the result.

    Args:
        request: Body carrying the user prompt.

    Returns:
        A streaming response whose body is produced by
        ``stream_openrouter_response``.

    Raises:
        HTTPException: 400 when the prompt is empty or whitespace only.
    """
    if not request.prompt.strip():
        raise HTTPException(status_code=400, detail="Prompt cannot be empty")

    # No Access-Control-Allow-Origin header is set here: CORS is owned by the
    # CORSMiddleware registered on the app, and a hard-coded "*" on this
    # response would expose the endpoint to origins excluded via CORS_ORIGINS.
    # NOTE(review): the body is framed as "data: ..." events; text/plain is
    # kept so existing clients see the same content type.
    return StreamingResponse(
        stream_openrouter_response(request.prompt),
        media_type="text/plain",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )
@app.post("/api/download")
async def download_files(files: Dict[str, str]):
    """Create a ZIP archive from the posted files and return it as a download.

    Args:
        files: Mapping of filename to file content.

    Returns:
        A streaming ``application/zip`` response named ``generated-code.zip``.

    Raises:
        HTTPException: 400 when no files are provided, 500 when the archive
            cannot be built.
    """
    if not files:
        raise HTTPException(status_code=400, detail="No files provided")

    # Only archive creation is guarded; the original exception is chained so
    # the root cause stays visible in server logs.
    try:
        zip_buffer = create_zip_buffer(files)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to create ZIP file: {str(e)}") from e

    # The buffer is already rewound by create_zip_buffer, so it can be streamed
    # as-is without copying it into a second BytesIO.
    return StreamingResponse(
        zip_buffer,
        media_type="application/zip",
        headers={
            "Content-Disposition": "attachment; filename=generated-code.zip"
        },
    )
# Serve the built React frontend from "/" (for Hugging Face Spaces deployment).
# If mounting fails for any reason, fall back to a small JSON endpoint at "/";
# during development the React dev server handles the frontend instead.
try:
    app.mount(
        "/",
        StaticFiles(directory="/app/frontend/build", html=True),
        name="frontend",
    )
except Exception:
    @app.get("/")
    async def root():
        """Describe the API when no frontend build is being served."""
        return {
            "message": "AI Coding Playground API",
            "frontend": "Run React dev server separately",
        }
# Script entry point: run the app with uvicorn on all interfaces.
if __name__ == "__main__":
    import uvicorn

    # PORT comes from the environment; 8001 is used when it is unset.
    listen_port = int(os.getenv("PORT", 8001))
    uvicorn.run(app, host="0.0.0.0", port=listen_port)