import logging import os from contextlib import contextmanager from pathlib import Path from typing import Any from dotenv import load_dotenv from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel try: import psycopg except ImportError: # pragma: no cover psycopg = None from ai_agent import handle_task load_dotenv() logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") logger = logging.getLogger(__name__) BASE_DIR = Path(__file__).resolve().parent FRONTEND_BUILD_DIR = BASE_DIR / "frontend" / "build" DATABASE_URL = os.getenv("DATABASE_URL", "").strip() app = FastAPI(title="Syntax AI Server") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) class RequestModel(BaseModel): taskType: str prompt: str class AuthRequest(BaseModel): username: str password: str action: str class StoreOptionRequest(BaseModel): userId: int | None = None option: str language: str | None = None codePrompt: str | None = None modifyCode: str | None = None modifyLogic: str | None = None class AnalyzeCodeRequest(BaseModel): userId: int | None = None originalCode: str modifiedCode: str memory_users: dict[str, dict[str, Any]] = {} memory_activity: list[dict[str, Any]] = [] memory_user_id = 0 def has_database() -> bool: return bool(DATABASE_URL and psycopg is not None) @contextmanager def get_db_connection(): if not has_database(): raise RuntimeError("DATABASE_URL is not configured.") conn = psycopg.connect(DATABASE_URL) try: yield conn finally: conn.close() def setup_database() -> None: if not has_database(): logger.warning("DATABASE_URL not set. Using in-memory auth and activity storage.") return try: with get_db_connection() as conn: with conn.cursor() as cur: cur.execute( """ CREATE TABLE IF NOT EXISTS users ( id SERIAL PRIMARY KEY, username TEXT NOT NULL UNIQUE, password TEXT NOT NULL ) """ ) cur.execute( """ CREATE TABLE IF NOT EXISTS user_activity ( id SERIAL PRIMARY KEY, user_id INTEGER REFERENCES users(id) ON DELETE SET NULL, user_option TEXT, language TEXT, qn TEXT, modify_code_input TEXT, modify_code_logic TEXT, output TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """ ) conn.commit() logger.info("Database tables are ready.") except Exception: logger.exception("Database initialization failed. Falling back to in-memory mode.") @app.on_event("startup") def on_startup() -> None: setup_database() def run_ai_task(prompt: str) -> str: result = handle_task(prompt) if hasattr(result, "content"): return str(result.content) if isinstance(result, dict): return str(result) return str(result) def create_memory_user(username: str, password: str) -> int: global memory_user_id if username in memory_users: raise ValueError("Username already exists.") memory_user_id += 1 memory_users[username] = {"id": memory_user_id, "password": password} return memory_user_id @app.post("/process-request") @app.post("/api/process-request") async def process_request(request: RequestModel): try: return {"status": "success", "result": run_ai_task(request.prompt)} except Exception as exc: logger.exception("Error while processing AI request.") raise HTTPException(status_code=500, detail=str(exc)) from exc @app.post("/api/auth") async def auth(request: AuthRequest): try: if has_database(): with get_db_connection() as conn: with conn.cursor() as cur: if request.action == "signup": cur.execute( "INSERT INTO users (username, password) VALUES (%s, %s) RETURNING id", (request.username, request.password), ) user_id = cur.fetchone()[0] conn.commit() return {"success": True, "message": "Signup successful!", "userId": user_id} cur.execute( "SELECT id FROM users WHERE username = %s AND password = %s", (request.username, request.password), ) row = cur.fetchone() if row: return {"success": True, "message": "Login successful!", "userId": row[0]} return {"success": False, "message": "Invalid credentials."} if request.action == "signup": user_id = create_memory_user(request.username, request.password) return {"success": True, "message": "Signup successful!", "userId": user_id} user = memory_users.get(request.username) if user and user["password"] == request.password: return {"success": True, "message": "Login successful!", "userId": user["id"]} return {"success": False, "message": "Invalid credentials."} except Exception as exc: logger.exception("Authentication error.") return {"success": False, "message": f"DB Error: {exc}"} @app.post("/api/store-option") async def store_option(request: StoreOptionRequest): try: final_prompt = "" if request.option == "Generate Code": final_prompt = request.codePrompt or "" elif request.option == "Modify Code": final_prompt = request.modifyCode or "" ai_output = run_ai_task(final_prompt) if has_database(): with get_db_connection() as conn: with conn.cursor() as cur: cur.execute( """ INSERT INTO user_activity ( user_id, user_option, language, qn, modify_code_input, modify_code_logic, output ) VALUES (%s, %s, %s, %s, %s, %s, %s) """, ( request.userId, request.option, request.language, final_prompt, request.modifyCode if request.option == "Modify Code" else None, request.modifyLogic if request.option == "Modify Code" else None, ai_output, ), ) conn.commit() else: memory_activity.append( { "user_id": request.userId, "user_option": request.option, "language": request.language, "qn": final_prompt, "modify_code_input": request.modifyCode if request.option == "Modify Code" else None, "modify_code_logic": request.modifyLogic if request.option == "Modify Code" else None, "output": ai_output, } ) return { "success": True, "message": f"{request.option} data stored successfully!", "aiOutput": ai_output, } except Exception: logger.exception("Error in store-option.") return {"success": False, "message": "Error processing request."} @app.post("/api/analyze-code") async def analyze_code(request: AnalyzeCodeRequest): analysis_prompt = f"""You are a code efficiency analyzer. Compare the ORIGINAL and MODIFIED code below and provide a detailed efficiency analysis. ORIGINAL CODE: {request.originalCode} MODIFIED CODE: {request.modifiedCode} IMPORTANT: You MUST include numerical scores in your analysis. Use the exact format "Label: X/10" for each metric. Include ALL of the following scored sections: TIME COMPLEXITY: - Time Original: [score]/10 — [analysis] - Time Modified: [score]/10 — [analysis] - Best/Average/Worst case analysis SPACE COMPLEXITY: - Space Original: [score]/10 — [analysis] - Space Modified: [score]/10 — [analysis] EXECUTION SPEED: - Speed Original: [score]/10 — [assessment] - Speed Modified: [score]/10 — [assessment] - Bottlenecks identified CODE READABILITY: - Readability Original: [score]/10 — [assessment] - Readability Modified: [score]/10 — [assessment] MAINTAINABILITY: - Maintainability Original: [score]/10 — [assessment] - Maintainability Modified: [score]/10 — [assessment] BEST PRACTICES: - Practices Original: [score]/10 — [compliance] - Practices Modified: [score]/10 — [compliance] SUMMARY: [Overall comparative summary] OVERALL EFFICIENCY SCORE: [score]/10 — [brief justification] Higher scores = better efficiency. Be precise with your /10 ratings.""" try: result = run_ai_task(analysis_prompt) return {"success": True, "message": "Analysis complete!", "analysisResult": result} except Exception: logger.exception("Error analyzing code.") return {"success": False, "message": "Error analyzing code. Please try again later."} if FRONTEND_BUILD_DIR.exists(): app.mount("/static", StaticFiles(directory=FRONTEND_BUILD_DIR / "static"), name="static") @app.get("/healthz") @app.head("/healthz") async def healthcheck(): return {"status": "ok"} @app.get("/") @app.head("/") async def serve_index(): if not FRONTEND_BUILD_DIR.exists(): raise HTTPException( status_code=404, detail="Frontend build not found. Run the React build or deploy with the Dockerfile.", ) return FileResponse(FRONTEND_BUILD_DIR / "index.html") @app.get("/{full_path:path}") @app.head("/{full_path:path}") async def serve_frontend(full_path: str): if not FRONTEND_BUILD_DIR.exists(): raise HTTPException( status_code=404, detail="Frontend build not found. Run the React build or deploy with the Dockerfile.", ) candidate = FRONTEND_BUILD_DIR / full_path if full_path and candidate.exists() and candidate.is_file(): return FileResponse(candidate) return FileResponse(FRONTEND_BUILD_DIR / "index.html") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PORT", "7860")))