"""
FastAPI backend for AnyCoder - provides REST, server-sent-event and
WebSocket API endpoints.
"""
| | from fastapi import FastAPI, HTTPException, Header, WebSocket, WebSocketDisconnect |
| | from fastapi.middleware.cors import CORSMiddleware |
| | from fastapi.responses import StreamingResponse |
| | from pydantic import BaseModel |
| | from typing import Optional, List, Dict, AsyncGenerator |
| | import json |
| | import asyncio |
| | from datetime import datetime |
| |
|
| | |
| | import sys |
| | import os |
| | from huggingface_hub import InferenceClient |
| |
|
| | |
# Models advertised by the API. Each entry carries a display name, the model
# id sent to the provider, and a human-readable description.
AVAILABLE_MODELS = [
    {
        "name": "Sherlock Dash Alpha",
        "id": "openrouter/sherlock-dash-alpha",
        "description": "Sherlock Dash Alpha model via OpenRouter",
    },
    {
        "name": "MiniMax M2",
        "id": "MiniMaxAI/MiniMax-M2",
        "description": "MiniMax M2 model via HuggingFace InferenceClient with Novita provider",
    },
    {
        "name": "DeepSeek V3.2-Exp",
        "id": "deepseek-ai/DeepSeek-V3.2-Exp",
        "description": "DeepSeek V3.2 Experimental via HuggingFace",
    },
    {
        "name": "DeepSeek R1",
        "id": "deepseek-ai/DeepSeek-R1-0528",
        "description": "DeepSeek R1 model for code generation",
    },
    {
        "name": "GPT-5",
        "id": "gpt-5",
        "description": "OpenAI GPT-5 via OpenRouter",
    },
    {
        "name": "Gemini Flash Latest",
        "id": "gemini-flash-latest",
        "description": "Google Gemini Flash via OpenRouter",
    },
    {
        "name": "Qwen3 Max Preview",
        "id": "qwen3-max-preview",
        "description": "Qwen3 Max Preview via DashScope API",
    },
]

# Target languages / frameworks the API reports as supported.
LANGUAGE_CHOICES = [
    "html",
    "gradio",
    "transformers.js",
    "streamlit",
    "comfyui",
    "react",
]
| |
|
# The ASGI application; every route in this module is registered on it.
app = FastAPI(title="AnyCoder API", version="1.0.0")

# Browser origins allowed to call the API cross-origin (local dev servers).
_CORS_ALLOWED_ORIGINS = [
    "http://localhost:3000",
    "http://localhost:3001",
]

# Permit credentialed cross-origin requests from the origins above, with any
# HTTP method and any request header.
app.add_middleware(
    CORSMiddleware,
    allow_origins=_CORS_ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
| |
|
| |
|
| | |
class CodeGenerationRequest(BaseModel):
    """Schema describing a code-generation request.

    Only ``query`` is required; every other field has a default.
    """

    # What the caller wants generated.
    query: str
    # Target language/framework; defaults to plain HTML.
    language: str = "html"
    # Identifier of the model to use.
    model_id: str = "openrouter/sherlock-dash-alpha"
    # Provider selector; "auto" is the default value.
    provider: str = "auto"
    # Earlier conversation, as a list of string lists
    # (presumably [user, assistant] pairs - TODO confirm against callers).
    history: List[List[str]] = []
    # Opt-in flag for agent mode; off by default.
    agent_mode: bool = False
| |
|
| |
|
class DeploymentRequest(BaseModel):
    """Request body accepted by the ``POST /api/deploy`` endpoint."""

    # Source code to publish.
    code: str
    # Name of the Space to create or update.
    space_name: str
    # Language/framework of ``code``.
    language: str
    # Optional extra dependency text
    # (presumably requirements.txt content - TODO confirm).
    requirements: Optional[str] = None
| |
|
| |
|
class AuthStatus(BaseModel):
    """Response body returned by ``GET /api/auth/status``."""

    # True when the request carried a usable token.
    authenticated: bool
    # Name associated with the token; None when unauthenticated.
    username: Optional[str] = None
    # Human-readable summary of the authentication state.
    message: str
| |
|
| |
|
class ModelInfo(BaseModel):
    """One entry of the model catalogue returned by ``GET /api/models``."""

    # Display name.
    name: str
    # Model identifier that clients send back when requesting generation.
    id: str
    # Short human-readable description.
    description: str
| |
|
| |
|
class CodeGenerationResponse(BaseModel):
    """Schema for a complete (non-streamed) code-generation result."""

    # The generated source code.
    code: str
    # Conversation history, as a list of string lists
    # (presumably [user, assistant] pairs - TODO confirm against callers).
    history: List[List[str]]
    # Outcome indicator for the request.
    status: str
| |
|
| |
|
| | |
| | |
class MockAuth:
    """Lightweight stand-in for real authentication, built from a raw token.

    Attributes:
        token: The raw token string, or None when no credentials were given.
        username: Name derived from the token. None when there is no token,
            the embedded name for ``dev_token_<name>...`` tokens, and the
            placeholder "user" for every other token.
    """

    # Prefix that marks a development token.
    _DEV_TOKEN_PREFIX = "dev_token_"
    # Name used when a token carries no usable username.
    _FALLBACK_USERNAME = "user"

    def __init__(self, token: Optional[str] = None):
        """Store *token* and derive ``username`` from it.

        Args:
            token: Raw credential string; None or "" means unauthenticated.
        """
        self.token = token

        if not token:
            self.username = None
        elif token.startswith(self._DEV_TOKEN_PREFIX):
            # The username is the underscore-delimited field right after the
            # prefix; anything after the next underscore is ignored.
            remainder = token[len(self._DEV_TOKEN_PREFIX):]
            embedded_name = remainder.split("_", 1)[0]
            # An empty field (e.g. "dev_token_") must not produce an empty
            # username, so fall back to the placeholder.
            self.username = embedded_name or self._FALLBACK_USERNAME
        else:
            self.username = self._FALLBACK_USERNAME

    def is_authenticated(self):
        """Return True when a non-empty token is present."""
        return bool(self.token)
| |
|
| |
|
def get_auth_from_header(authorization: Optional[str] = None):
    """Build a ``MockAuth`` from the value of an ``Authorization`` header.

    Args:
        authorization: Raw header value. May be ``"Bearer <token>"`` or a
            bare token; None or "" means no credentials were supplied.

    Returns:
        A ``MockAuth`` wrapping the extracted token (``MockAuth(None)`` when
        the header is missing or empty).
    """
    if not authorization:
        return MockAuth(None)

    bearer_prefix = "Bearer "
    if authorization.startswith(bearer_prefix):
        # Remove only the leading scheme. str.replace would also delete any
        # later "Bearer " occurrence and corrupt the token.
        token = authorization[len(bearer_prefix):]
    else:
        token = authorization

    return MockAuth(token)
| |
|
| |
|
@app.get("/")
async def root():
    """Liveness probe: report that the API process is up."""
    health_payload = {
        "status": "ok",
        "message": "AnyCoder API is running",
    }
    return health_payload
| |
|
| |
|
@app.get("/api/models", response_model=List[ModelInfo])
async def get_models():
    """Return the model catalogue as a list of ``ModelInfo`` objects.

    Entries are produced in the same order as ``AVAILABLE_MODELS``.
    """
    catalogue = []
    for entry in AVAILABLE_MODELS:
        info = ModelInfo(
            name=entry["name"],
            id=entry["id"],
            description=entry["description"],
        )
        catalogue.append(info)
    return catalogue
| |
|
| |
|
@app.get("/api/languages")
async def get_languages():
    """Return the supported languages/frameworks under the "languages" key."""
    response_body = {"languages": LANGUAGE_CHOICES}
    return response_body
| |
|
| |
|
@app.get("/api/auth/status")
async def auth_status(authorization: Optional[str] = Header(None)):
    """Report whether the request's ``Authorization`` header authenticates.

    Args:
        authorization: Raw ``Authorization`` header value, if any.

    Returns:
        An ``AuthStatus`` describing the caller.
    """
    auth = get_auth_from_header(authorization)

    # Guard clause: handle the unauthenticated case first.
    if not auth.is_authenticated():
        return AuthStatus(
            authenticated=False,
            username=None,
            message="Not authenticated",
        )

    status_message = f"Authenticated as {auth.username}"
    return AuthStatus(
        authenticated=True,
        username=auth.username,
        message=status_message,
    )
| |
|
| |
|
def _sse_event(event_type: str, **fields) -> str:
    """Serialize one server-sent-event frame.

    The JSON payload lists "type" first, then the caller's fields in the
    order given, then an ISO-formatted local "timestamp".
    """
    payload = {"type": event_type}
    payload.update(fields)
    payload["timestamp"] = datetime.now().isoformat()
    return f"data: {json.dumps(payload)}\n\n"


def _chunk_text(chunk) -> Optional[str]:
    """Return the text delta carried by a streaming chunk, or None if absent."""
    choices = getattr(chunk, "choices", None)
    if not choices:
        return None
    delta = getattr(choices[0], "delta", None)
    return getattr(delta, "content", None) or None


@app.get("/api/generate")
async def generate_code(
    query: str,
    language: str = "html",
    model_id: str = "openrouter/sherlock-dash-alpha",
    provider: str = "auto",
    authorization: Optional[str] = Header(None)
):
    """Generate code for *query* and stream it back as server-sent events.

    Args:
        query: What the user wants built.
        language: Target language/framework, interpolated into the prompt.
        model_id: Requested model id; an id not present in
            ``AVAILABLE_MODELS`` falls back to the first catalogue entry.
        provider: Accepted for interface compatibility; not used here.
        authorization: Accepted for interface compatibility; not used here.

    Returns:
        A ``text/event-stream`` ``StreamingResponse``. Each frame is a JSON
        object whose "type" is "chunk" (with "content"), "complete" (with the
        full "code"), or "error" (with "message").
    """

    async def event_stream() -> AsyncGenerator[str, None]:
        """Yield SSE frames for the generation run."""
        try:
            # Resolve the requested model, defaulting to the first entry.
            selected_model = next(
                (entry for entry in AVAILABLE_MODELS if entry["id"] == model_id),
                AVAILABLE_MODELS[0],
            )
            actual_model_id = selected_model["id"]

            system_prompt = "You are a helpful AI assistant that generates code based on user requirements. Generate clean, well-commented code."

            # Pick the client that matches the model family.
            if actual_model_id.startswith("openrouter/"):
                api_key = os.getenv("OPENROUTER_API_KEY") or os.getenv("HF_TOKEN")
                client = InferenceClient(api_key=api_key, provider="openai", base_url="https://openrouter.ai/api/v1")
            elif actual_model_id == "MiniMaxAI/MiniMax-M2":
                hf_token = os.getenv("HF_TOKEN")
                if not hf_token:
                    yield _sse_event(
                        "error",
                        message="HF_TOKEN environment variable not set. Please set it in your terminal.",
                    )
                    return

                # Imported lazily so the dependency is only needed for this model.
                from openai import OpenAI
                client = OpenAI(
                    base_url="https://router.huggingface.co/v1",
                    api_key=hf_token,
                    default_headers={
                        "X-HF-Bill-To": "huggingface"
                    }
                )
                # This model is addressed with an explicit ":novita" suffix.
                actual_model_id = "MiniMaxAI/MiniMax-M2:novita"
            else:
                # Every other model goes through the default InferenceClient.
                client = InferenceClient(token=os.getenv("HF_TOKEN"))

            messages = [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": f"Generate a {language} application: {query}"}
            ]

            try:
                # The client call and the iteration over its stream are
                # synchronous; running them on the event loop would stall
                # every other request while waiting on the network. Push
                # both into the default thread pool instead.
                loop = asyncio.get_running_loop()

                def _open_stream():
                    return client.chat.completions.create(
                        model=actual_model_id,
                        messages=messages,
                        temperature=0.7,
                        max_tokens=10000,
                        stream=True
                    )

                stream = await loop.run_in_executor(None, _open_stream)
                chunk_iter = iter(stream)
                # Sentinel: StopIteration cannot cross the executor boundary.
                exhausted = object()
                pieces = []

                while True:
                    chunk = await loop.run_in_executor(None, next, chunk_iter, exhausted)
                    if chunk is exhausted:
                        break
                    content = _chunk_text(chunk)
                    if not content:
                        continue
                    pieces.append(content)
                    yield _sse_event("chunk", content=content)

                yield _sse_event("complete", code="".join(pieces))

            except Exception as e:
                # Provider/streaming failure: report it on the stream.
                yield _sse_event("error", message=str(e))

        except Exception as e:
            # Setup failure (model lookup, client construction, import).
            yield _sse_event("error", message=f"Generation error: {str(e)}")

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )
| |
|
| |
|
# Hugging Face Space SDK used for each language; unknown languages fall back
# to "gradio" (see _space_sdk_for).
_SPACE_SDK_BY_LANGUAGE = {
    "gradio": "gradio",
    "streamlit": "docker",
    "react": "docker",
    "html": "static",
    "transformers.js": "static",
    "comfyui": "static",
}

# Languages whose code is stored as index.html; all others use app.py.
_HTML_ENTRY_LANGUAGES = ("html", "transformers.js", "comfyui")

# requirements.txt content used for gradio Spaces when none is supplied.
_DEFAULT_GRADIO_REQUIREMENTS = "gradio>=4.0.0\n"


def _space_sdk_for(language: str) -> str:
    """Return the Space SDK name for *language* ("gradio" when unknown)."""
    return _SPACE_SDK_BY_LANGUAGE.get(language, "gradio")


def _entry_file_for(language: str) -> str:
    """Return the file name under which the generated code is stored."""
    return "index.html" if language in _HTML_ENTRY_LANGUAGES else "app.py"


def _publish_space(token, space_name, language, code, requirements=None) -> str:
    """Create (or reuse) a Space and upload the generated files. Blocking.

    Returns the ``<owner>/<space>`` repo id that was written to.
    """
    from huggingface_hub import HfApi
    import uuid

    api = HfApi(token=token)

    # Ask the Hub who owns the token; the locally derived username is only a
    # placeholder and is not a valid namespace.
    owner = api.whoami()["name"]
    final_name = space_name or f"anycoder-{uuid.uuid4().hex[:8]}"
    repo_id = f"{owner}/{final_name}"

    # exist_ok=True makes re-deploying to an existing Space a no-op here.
    api.create_repo(
        repo_id=repo_id,
        repo_type="space",
        space_sdk=_space_sdk_for(language),
        exist_ok=True,
    )

    # Upload explicit UTF-8 bytes: no temp files, no locale-dependent encoding.
    api.upload_file(
        path_or_fileobj=code.encode("utf-8"),
        path_in_repo=_entry_file_for(language),
        repo_id=repo_id,
        repo_type="space",
    )

    if language == "gradio":
        requirements_text = requirements or _DEFAULT_GRADIO_REQUIREMENTS
        api.upload_file(
            path_or_fileobj=requirements_text.encode("utf-8"),
            path_in_repo="requirements.txt",
            repo_id=repo_id,
            repo_type="space",
        )

    return repo_id


@app.post("/api/deploy")
async def deploy(
    request: DeploymentRequest,
    authorization: Optional[str] = Header(None)
):
    """Deploy generated code to HuggingFace Spaces.

    Args:
        request: Code, Space name, language and optional requirements.
        authorization: ``Authorization`` header carrying the caller's token.

    Returns:
        A dict with "success", "space_url" and "message". For ``dev_token_``
        tokens nothing is uploaded: the dict also has ``"dev_mode": True``
        and "space_url" is a pre-filled "new Space" link.

    Raises:
        HTTPException: 401 when no usable token is available; 500 when the
            Hub operations fail.
    """
    auth = get_auth_from_header(authorization)

    if not auth.is_authenticated():
        raise HTTPException(status_code=401, detail="Authentication required")

    # Dev tokens are not real Hub credentials: hand back a creation link.
    if auth.token and auth.token.startswith("dev_token_"):
        import urllib.parse

        query_string = urllib.parse.urlencode({
            "name": request.space_name or "my-anycoder-app",
            "sdk": _space_sdk_for(request.language),
            "files[0][path]": _entry_file_for(request.language),
            "files[0][content]": request.code,
        })
        space_url = f"https://huggingface.co/new-space?{query_string}"

        return {
            "success": True,
            "space_url": space_url,
            "message": "Dev mode: Please create the space manually",
            "dev_mode": True
        }

    # Checked outside the try below so the 401 is not rewrapped as a 500.
    user_token = auth.token if auth.token else os.getenv("HF_TOKEN")
    if not user_token:
        raise HTTPException(status_code=401, detail="No HuggingFace token available")

    try:
        # The Hub client is synchronous; keep it off the event loop.
        loop = asyncio.get_running_loop()
        repo_id = await loop.run_in_executor(
            None,
            lambda: _publish_space(
                user_token,
                request.space_name,
                request.language,
                request.code,
                request.requirements,
            ),
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Deployment failed: {str(e)}") from e

    return {
        "success": True,
        "space_url": f"https://huggingface.co/spaces/{repo_id}",
        "message": f"✅ Deployed successfully to {repo_id}!"
    }
| |
|
| |
|
@app.websocket("/ws/generate")
async def websocket_generate(websocket: WebSocket):
    """WebSocket endpoint that streams placeholder generated code.

    For every JSON message received, the handler sends a "status" message,
    then a fixed sample snippet one character at a time as "chunk" messages
    (each with a percentage "progress"), then a "complete" message holding
    the whole snippet. Only the message's "language" field (default "html")
    influences the output.

    Args:
        websocket: The connection, accepted on entry.
    """
    await websocket.accept()

    try:
        while True:
            data = await websocket.receive_json()
            language = data.get("language", "html")

            await websocket.send_json({
                "type": "status",
                "message": "Generating code..."
            })

            # Short pause before streaming starts.
            await asyncio.sleep(0.5)

            sample_code = f"<!-- Generated {language} code -->\n<h1>Hello from AnyCoder!</h1>"
            total_chars = len(sample_code)

            for position, char in enumerate(sample_code, start=1):
                await websocket.send_json({
                    "type": "chunk",
                    "content": char,
                    "progress": position / total_chars * 100
                })
                await asyncio.sleep(0.01)

            await websocket.send_json({
                "type": "complete",
                "code": sample_code
            })

    except WebSocketDisconnect:
        print("Client disconnected")
    except Exception as e:
        # Best effort: tell the client what went wrong, then close. The
        # connection may already be unusable, in which case sending or
        # closing raises again; that second failure carries no new
        # information, so it is deliberately dropped.
        try:
            await websocket.send_json({
                "type": "error",
                "message": str(e)
            })
            await websocket.close()
        except (RuntimeError, WebSocketDisconnect):
            pass
| |
|
| |
|
if __name__ == "__main__":
    # Run a uvicorn server when executed as a script: all interfaces,
    # port 8000, auto-reload on. The app is referenced by import string
    # ("backend_api:app").
    import uvicorn

    uvicorn.run(
        "backend_api:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
    )
| |
|
| |
|