Spaces:
Sleeping
Sleeping
File size: 9,216 Bytes
fe15a7c | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 | """FastAPI gateway for code review service."""
import logging
import os
import time
from contextlib import asynccontextmanager
from typing import Annotated, Optional
from fastapi import Depends, FastAPI, HTTPException, Request, Security, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from fastapi.responses import JSONResponse
from app import __version__
from app.config import config
# Lazy import: get_crew imported after env cleanup in lifespan
from app.guardrails import get_guardrail_orchestrator
from app.schemas import HealthResponse, ReviewRequest, ReviewResponse
from app.utils import generate_request_id, sanitize_diff
# Configure logging
config.configure_logging()
logger = logging.getLogger(__name__)
# Security
security = HTTPBearer()
# Rate limiting (simple in-memory store for MVP)
request_timestamps: dict[str, list[float]] = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager."""
logger.info("Starting Code Reviewer CI Agent API")
logger.info(f"Version: {__version__}")
logger.info(f"LLM Provider: {config.llm_provider}")
logger.info(f"LLM Model: {config.llm_model}")
logger.info(f"Ray Serve Enabled: {config.enable_ray_serve}")
# CRITICAL: Clean up unused LLM provider API keys BEFORE importing crew
# CrewAI reads environment variables directly, must remove wrong ones early
if config.llm_provider == "groq":
# Set dummy OPENAI_API_KEY to prevent CrewAI errors (it checks even when not used)
os.environ["OPENAI_API_KEY"] = "sk-dummy-key-not-used"
logger.info("✓ Set dummy OPENAI_API_KEY (using Groq - OpenAI not used)")
elif config.llm_provider == "openai":
os.environ.pop("GROQ_API_KEY", None)
logger.info("✓ Removed GROQ_API_KEY from environment (using OpenAI)")
# Initialize crew (warm up) - import here after env cleanup
try:
from app.crew.crew import get_crew
get_crew()
logger.info("Code review crew initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize crew: {e}")
yield
logger.info("Shutting down Code Reviewer CI Agent API")
# Create FastAPI app
app = FastAPI(
title="Code Reviewer CI Agent",
description="AI-powered code review using CrewAI multi-agent framework",
version=__version__,
lifespan=lifespan,
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=config.cors_origins_list,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Middleware for request logging
@app.middleware("http")
async def log_requests(request: Request, call_next):
"""Log all requests with timing."""
request_id = generate_request_id()
start_time = time.time()
# Add request ID to state
request.state.request_id = request_id
logger.info(
f"[{request_id}] {request.method} {request.url.path} - "
f"Client: {request.client.host if request.client else 'unknown'}"
)
response = await call_next(request)
duration_ms = int((time.time() - start_time) * 1000)
logger.info(
f"[{request_id}] Completed in {duration_ms}ms - Status: {response.status_code}"
)
return response
def verify_api_key(
credentials: Optional[HTTPAuthorizationCredentials] = Security(security)
) -> str:
"""Verify API key from Authorization header.
If review_api_key is empty (demo mode), authentication is disabled.
"""
# Skip authentication if API key is not configured (demo mode)
if not config.review_api_key:
logger.warning("⚠️ Authentication disabled - review_api_key not configured (DEMO MODE)")
return "demo-mode"
if not credentials:
logger.warning("Missing authorization header")
raise HTTPException(
status_code=401,
detail="Missing authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
if credentials.credentials != config.review_api_key:
# Log first 10 chars only for security
logger.warning(f"Invalid API key attempt: {credentials.credentials[:10]}...")
raise HTTPException(
status_code=401,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return credentials.credentials
def check_rate_limit(api_key: str) -> None:
"""
Check rate limit for API key.
Args:
api_key: API key to check
Raises:
HTTPException: If rate limit exceeded
"""
current_time = time.time()
minute_ago = current_time - 60
# Clean up old timestamps
if api_key in request_timestamps:
request_timestamps[api_key] = [
ts for ts in request_timestamps[api_key] if ts > minute_ago
]
else:
request_timestamps[api_key] = []
# Check limit
if len(request_timestamps[api_key]) >= config.rate_limit_per_minute:
logger.warning(f"Rate limit exceeded for API key: {api_key[:10]}...")
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail=f"Rate limit exceeded. Maximum {config.rate_limit_per_minute} requests per minute.",
)
# Add current request
request_timestamps[api_key].append(current_time)
@app.get("/health", response_model=HealthResponse, tags=["Health"])
async def health_check() -> HealthResponse:
"""
Health check endpoint.
Returns:
Health status information
"""
return HealthResponse(
status="healthy",
version=__version__,
ray_serve_enabled=config.enable_ray_serve,
llm_provider=config.llm_provider,
)
@app.post("/review", response_model=ReviewResponse, tags=["Review"])
async def review_code(
request: ReviewRequest,
api_key: Annotated[str, Depends(verify_api_key)],
) -> ReviewResponse:
"""
Review code changes using AI agents.
Args:
request: Review request with diff and context
api_key: API key for authentication
Returns:
Structured review response with findings and summary
Raises:
HTTPException: If review fails or timeout occurs
"""
# Check rate limit
check_rate_limit(api_key)
logger.info(f"Received review request for {request.language} code")
try:
# Sanitize diff
sanitized_diff = sanitize_diff(request.diff)
request.diff = sanitized_diff
# Get crew and execute review (lazy import)
from app.crew.crew import get_crew
crew = get_crew()
# Execute with timeout
import asyncio
from concurrent.futures import TimeoutError
try:
# Run crew in thread pool to avoid blocking
loop = asyncio.get_event_loop()
response = await asyncio.wait_for(
loop.run_in_executor(None, crew.review_code, request),
timeout=config.request_timeout_seconds,
)
except asyncio.TimeoutError:
logger.error("Review timed out")
raise HTTPException(
status_code=status.HTTP_504_GATEWAY_TIMEOUT,
detail=f"Review timed out after {config.request_timeout_seconds} seconds",
)
# Apply guardrails
orchestrator = get_guardrail_orchestrator()
response = orchestrator.apply(
response,
context={
"diff": request.diff,
"language": request.language,
},
)
logger.info(
f"Review completed successfully: {len(response.findings)} findings, "
f"score: {response.score:.1f}"
)
return response
except HTTPException:
raise
except Exception as e:
logger.error(f"Error during code review: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Code review failed: {str(e)}",
)
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
"""Handle HTTP exceptions with structured error responses."""
return JSONResponse(
status_code=exc.status_code,
content={
"error": exc.detail,
"status_code": exc.status_code,
"request_id": getattr(request.state, "request_id", "unknown"),
},
)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
"""Handle unexpected exceptions."""
logger.error(f"Unhandled exception: {exc}", exc_info=True)
return JSONResponse(
status_code=500,
content={
"error": "Internal server error",
"status_code": 500,
"request_id": getattr(request.state, "request_id", "unknown"),
},
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"app.api:app",
host="0.0.0.0",
port=8000,
reload=config.debug,
log_level=config.log_level.lower(),
)
|