Spaces:
Sleeping
Sleeping
| import logging | |
| import os | |
| from contextlib import contextmanager | |
| from pathlib import Path | |
| from typing import Any | |
| from dotenv import load_dotenv | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import FileResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from pydantic import BaseModel | |
| try: | |
| import psycopg | |
| except ImportError: # pragma: no cover | |
| psycopg = None | |
| from ai_agent import handle_task | |
# Load settings from a .env file (python-dotenv) before the os.getenv()
# lookup further down reads the environment.
load_dotenv()

logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Directory containing this file; the frontend build is looked up beneath it.
BASE_DIR = Path(__file__).resolve().parent
FRONTEND_BUILD_DIR = BASE_DIR.joinpath("frontend", "build")

# Empty string when DATABASE_URL is unset or contains only whitespace.
DATABASE_URL = os.getenv("DATABASE_URL", "").strip()
# The ASGI application object; it is also what uvicorn runs when this file is
# executed as a script.
app = FastAPI(title="Syntax AI Server")

# NOTE(review): every origin, method and header is allowed while credentials
# are enabled. That is a very permissive combination - confirm it is intended
# before exposing cookie- or token-based sessions through this server.
_cors_options = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
# Request body consumed by process_request.
class RequestModel(BaseModel):
    # Declared by the schema; process_request does not read it.
    taskType: str
    # Text forwarded unchanged to run_ai_task.
    prompt: str
# Request body consumed by auth.
class AuthRequest(BaseModel):
    username: str
    # NOTE(review): stored and compared as plain text by the auth code.
    password: str
    # "signup" creates an account; any other value is handled as a login.
    action: str
# Request body consumed by store_option.
class StoreOptionRequest(BaseModel):
    # Recorded with the activity entry; may be omitted.
    userId: int | None = None
    # store_option recognises "Generate Code" and "Modify Code".
    option: str
    # Recorded with the activity entry only.
    language: str | None = None
    # Prompt used when option is "Generate Code".
    codePrompt: str | None = None
    # Prompt used when option is "Modify Code".
    modifyCode: str | None = None
    # Recorded for "Modify Code"; store_option does not pass it to the AI task.
    modifyLogic: str | None = None
# Request body consumed by analyze_code.
class AnalyzeCodeRequest(BaseModel):
    # Accepted by the schema; analyze_code does not read it.
    userId: int | None = None
    # Both snippets are interpolated verbatim into the analysis prompt.
    originalCode: str
    modifiedCode: str
# Process-local stores used when no database is configured.

# username -> {"id": <int>, "password": <str>}
memory_users: dict[str, dict[str, Any]] = {}

# Activity records appended by store_option when no database is used.
memory_activity: list[dict[str, Any]] = []

# Most recent id handed out by create_memory_user (0 means none yet).
memory_user_id = 0
def has_database() -> bool:
    """Return True when DATABASE_URL is non-empty and psycopg was imported."""
    if not DATABASE_URL:
        return False
    return psycopg is not None
@contextmanager
def get_db_connection():
    """Yield an open psycopg connection and close it when the block exits.

    Every caller uses this as ``with get_db_connection() as conn:``, which
    requires a context manager. A bare generator function does not implement
    that protocol, hence the ``@contextmanager`` decorator.

    Callers are responsible for calling ``conn.commit()``; this helper only
    guarantees that the connection is closed, including when the body raises.

    Raises:
        RuntimeError: if ``has_database()`` reports that no database is
            available.
    """
    if not has_database():
        raise RuntimeError("DATABASE_URL is not configured.")

    conn = psycopg.connect(DATABASE_URL)
    try:
        yield conn
    finally:
        conn.close()
def setup_database() -> None:
    """Create the ``users`` and ``user_activity`` tables if they are missing.

    When no database is available the function only logs a warning. When
    table creation fails, the error is logged and ``DATABASE_URL`` is cleared
    so that ``has_database()`` returns False from then on. That makes the
    "falling back to in-memory mode" log message true; without it every later
    request would keep trying the broken database.
    """
    global DATABASE_URL

    if not has_database():
        logger.warning("DATABASE_URL not set. Using in-memory auth and activity storage.")
        return

    try:
        with get_db_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    CREATE TABLE IF NOT EXISTS users (
                        id SERIAL PRIMARY KEY,
                        username TEXT NOT NULL UNIQUE,
                        password TEXT NOT NULL
                    )
                    """
                )
                cur.execute(
                    """
                    CREATE TABLE IF NOT EXISTS user_activity (
                        id SERIAL PRIMARY KEY,
                        user_id INTEGER REFERENCES users(id) ON DELETE SET NULL,
                        user_option TEXT,
                        language TEXT,
                        qn TEXT,
                        modify_code_input TEXT,
                        modify_code_logic TEXT,
                        output TEXT,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                    """
                )
            conn.commit()
        logger.info("Database tables are ready.")
    except Exception:
        logger.exception("Database initialization failed. Falling back to in-memory mode.")
        # Actually perform the fallback announced above: with an empty URL,
        # has_database() is False and the in-memory code paths are used.
        DATABASE_URL = ""
# NOTE(review): no startup-hook registration is visible for this function in
# this view of the file - confirm that it is wired to `app`.
def on_startup() -> None:
    """Prepare storage by delegating to setup_database()."""
    setup_database()
def run_ai_task(prompt: str) -> str:
    """Run handle_task on *prompt* and return the outcome as text.

    If the outcome exposes a ``content`` attribute, that attribute is
    stringified; any other outcome (dicts included) is stringified whole.
    """
    outcome = handle_task(prompt)
    payload = outcome.content if hasattr(outcome, "content") else outcome
    return str(payload)
def create_memory_user(username: str, password: str) -> int:
    """Register *username* in the in-memory user store and return its new id.

    Ids are taken from the module-level ``memory_user_id`` counter, which is
    advanced by one for every user created.

    Raises:
        ValueError: if *username* is already present in ``memory_users``.
    """
    global memory_user_id

    if username in memory_users:
        raise ValueError("Username already exists.")

    new_id = memory_user_id + 1
    # NOTE(review): the password is kept exactly as supplied (plain text).
    memory_users[username] = {"id": new_id, "password": password}
    memory_user_id = new_id
    return new_id
# NOTE(review): no route decorator is visible for this handler in this view
# of the file - confirm how it is registered on `app`.
async def process_request(request: RequestModel):
    """Run the AI task for ``request.prompt`` and wrap the text in a success payload.

    Raises:
        HTTPException: status 500, with the exception text as ``detail``,
            when the AI task raises.
    """
    try:
        answer = run_ai_task(request.prompt)
    except Exception as exc:
        logger.exception("Error while processing AI request.")
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return {"status": "success", "result": answer}
def _auth_with_database(request: AuthRequest) -> dict[str, Any]:
    """Sign up or log in against the ``users`` table."""
    with get_db_connection() as conn, conn.cursor() as cur:
        if request.action == "signup":
            try:
                cur.execute(
                    "INSERT INTO users (username, password) VALUES (%s, %s) RETURNING id",
                    (request.username, request.password),
                )
            except psycopg.errors.UniqueViolation:
                # The UNIQUE constraint on users.username rejected the row:
                # an ordinary user error, not a server fault.
                return {"success": False, "message": "Username already exists."}
            user_id = cur.fetchone()[0]
            conn.commit()
            return {"success": True, "message": "Signup successful!", "userId": user_id}

        cur.execute(
            "SELECT id FROM users WHERE username = %s AND password = %s",
            (request.username, request.password),
        )
        row = cur.fetchone()

    if row:
        return {"success": True, "message": "Login successful!", "userId": row[0]}
    return {"success": False, "message": "Invalid credentials."}


def _auth_in_memory(request: AuthRequest) -> dict[str, Any]:
    """Sign up or log in against the process-local ``memory_users`` store."""
    if request.action == "signup":
        try:
            user_id = create_memory_user(request.username, request.password)
        except ValueError:
            # create_memory_user raises ValueError for a taken username.
            return {"success": False, "message": "Username already exists."}
        return {"success": True, "message": "Signup successful!", "userId": user_id}

    user = memory_users.get(request.username)
    if user and user["password"] == request.password:
        return {"success": True, "message": "Login successful!", "userId": user["id"]}
    return {"success": False, "message": "Invalid credentials."}


# NOTE(review): no route decorator is visible for this handler in this view
# of the file - confirm how it is registered on `app`.
async def auth(request: AuthRequest):
    """Sign a user up or log them in, using the database when one is available.

    ``request.action == "signup"`` creates an account; any other action value
    is treated as a login attempt. The response is always a dict with
    ``success`` and ``message`` keys, plus ``userId`` on success.

    A duplicate username is reported as "Username already exists." in both
    storage modes. Unexpected failures are logged with a traceback and
    answered with a generic message, so exception text (which can contain
    connection details) is never sent to the client.

    NOTE(review): passwords are stored and compared in plain text in both
    storage modes; changing that requires migrating existing rows.
    """
    try:
        if has_database():
            return _auth_with_database(request)
        return _auth_in_memory(request)
    except Exception:
        logger.exception("Authentication error.")
        return {"success": False, "message": "Authentication failed due to a server error."}
# NOTE(review): no route decorator is visible for this handler in this view
# of the file - confirm how it is registered on `app`.
async def store_option(request: StoreOptionRequest):
    """Run the AI task for the chosen option and record the activity.

    The prompt is ``codePrompt`` for "Generate Code", ``modifyCode`` for
    "Modify Code", and an empty string for any other option. The record goes
    to the ``user_activity`` table when a database is available, otherwise to
    ``memory_activity``. Any failure is logged and reported with a generic
    error payload.

    NOTE(review): ``modifyLogic`` and ``language`` are recorded but are not
    part of the prompt given to the AI task - confirm that is intended.
    """
    try:
        is_modify = request.option == "Modify Code"
        if request.option == "Generate Code":
            final_prompt = request.codePrompt or ""
        elif is_modify:
            final_prompt = request.modifyCode or ""
        else:
            final_prompt = ""

        ai_output = run_ai_task(final_prompt)

        # Key order mirrors the column list of the INSERT statement below.
        record = {
            "user_id": request.userId,
            "user_option": request.option,
            "language": request.language,
            "qn": final_prompt,
            "modify_code_input": request.modifyCode if is_modify else None,
            "modify_code_logic": request.modifyLogic if is_modify else None,
            "output": ai_output,
        }

        if has_database():
            with get_db_connection() as conn:
                with conn.cursor() as cur:
                    cur.execute(
                        """
                        INSERT INTO user_activity (
                            user_id, user_option, language, qn,
                            modify_code_input, modify_code_logic, output
                        )
                        VALUES (%s, %s, %s, %s, %s, %s, %s)
                        """,
                        tuple(record.values()),
                    )
                conn.commit()
        else:
            memory_activity.append(record)

        return {
            "success": True,
            "message": f"{request.option} data stored successfully!",
            "aiOutput": ai_output,
        }
    except Exception:
        logger.exception("Error in store-option.")
        return {"success": False, "message": "Error processing request."}
# NOTE(review): no route decorator is visible for this handler in this view
# of the file - confirm how it is registered on `app`.
async def analyze_code(request: AnalyzeCodeRequest):
    """Ask the AI task to compare the original and modified code.

    Both snippets are interpolated verbatim into a fixed prompt that asks for
    "/10" scores per metric. Returns a dict with ``success`` and ``message``,
    plus ``analysisResult`` on success. Failures are logged and reported with
    a generic message.

    The separator between each score and its explanation is an em dash; the
    prompt previously carried a mis-encoded character in its place.
    """
    analysis_prompt = f"""You are a code efficiency analyzer. Compare the ORIGINAL and MODIFIED code below and provide a detailed efficiency analysis.
ORIGINAL CODE:
{request.originalCode}
MODIFIED CODE:
{request.modifiedCode}
IMPORTANT: You MUST include numerical scores in your analysis. Use the exact format "Label: X/10" for each metric. Include ALL of the following scored sections:
TIME COMPLEXITY:
- Time Original: [score]/10 — [analysis]
- Time Modified: [score]/10 — [analysis]
- Best/Average/Worst case analysis
SPACE COMPLEXITY:
- Space Original: [score]/10 — [analysis]
- Space Modified: [score]/10 — [analysis]
EXECUTION SPEED:
- Speed Original: [score]/10 — [assessment]
- Speed Modified: [score]/10 — [assessment]
- Bottlenecks identified
CODE READABILITY:
- Readability Original: [score]/10 — [assessment]
- Readability Modified: [score]/10 — [assessment]
MAINTAINABILITY:
- Maintainability Original: [score]/10 — [assessment]
- Maintainability Modified: [score]/10 — [assessment]
BEST PRACTICES:
- Practices Original: [score]/10 — [compliance]
- Practices Modified: [score]/10 — [compliance]
SUMMARY: [Overall comparative summary]
OVERALL EFFICIENCY SCORE: [score]/10 — [brief justification]
Higher scores = better efficiency. Be precise with your /10 ratings."""
    try:
        result = run_ai_task(analysis_prompt)
        return {"success": True, "message": "Analysis complete!", "analysisResult": result}
    except Exception:
        logger.exception("Error analyzing code.")
        return {"success": False, "message": "Error analyzing code. Please try again later."}
# Mount the built frontend assets under /static. The guard checks the
# directory that is actually mounted: a build folder without a "static"
# subfolder would otherwise make StaticFiles fail while this module is being
# imported.
_static_dir = FRONTEND_BUILD_DIR / "static"
if _static_dir.is_dir():
    app.mount("/static", StaticFiles(directory=_static_dir), name="static")
# NOTE(review): no route decorator is visible for this handler in this view
# of the file - confirm how it is registered on `app`.
async def healthcheck():
    """Return the constant liveness payload ``{"status": "ok"}``."""
    status_payload = {"status": "ok"}
    return status_payload
# NOTE(review): no route decorator is visible for this handler in this view
# of the file - confirm how it is registered on `app`.
async def serve_index():
    """Serve ``index.html`` from the frontend build directory.

    Raises:
        HTTPException: status 404 when the build directory does not exist.
    """
    if FRONTEND_BUILD_DIR.exists():
        return FileResponse(FRONTEND_BUILD_DIR / "index.html")

    raise HTTPException(
        status_code=404,
        detail="Frontend build not found. Run the React build or deploy with the Dockerfile.",
    )
# NOTE(review): no route decorator is visible for this handler in this view
# of the file - confirm how it is registered on `app`.
async def serve_frontend(full_path: str):
    """Serve a file from the frontend build, falling back to ``index.html``.

    *full_path* is caller-controlled, so it is resolved and must stay inside
    the build directory before it is served. Paths that escape it (``..``
    segments, absolute paths, links pointing elsewhere), that cannot be
    resolved, or that do not name a regular file all receive ``index.html``.

    Raises:
        HTTPException: status 404 when the build directory does not exist.
    """
    if not FRONTEND_BUILD_DIR.exists():
        raise HTTPException(
            status_code=404,
            detail="Frontend build not found. Run the React build or deploy with the Dockerfile.",
        )

    if full_path:
        build_root = FRONTEND_BUILD_DIR.resolve()
        try:
            candidate = (build_root / full_path).resolve()
        except (OSError, ValueError):
            # Malformed path (for example an embedded NUL byte): treat it
            # like any other unknown path.
            candidate = None
        if (
            candidate is not None
            and candidate.is_relative_to(build_root)
            and candidate.is_file()
        ):
            return FileResponse(candidate)

    return FileResponse(FRONTEND_BUILD_DIR / "index.html")
if __name__ == "__main__":
    # Script entry point: run the app with uvicorn on all interfaces, using
    # the PORT environment variable when set and 7860 otherwise.
    import uvicorn

    listen_port = int(os.getenv("PORT", "7860"))
    uvicorn.run(app, host="0.0.0.0", port=listen_port)