File size: 2,919 Bytes
7b4f5dd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
"""
CodeSentry Backend — FastAPI Application
AI Security Copilot for AI-Generated Code

Endpoints:
  GET  /                 — Service info and endpoint index
  POST /api/scan         — Initiate a scan, returns scanId
  GET  /api/scan/stream/{scanId} — SSE stream of agent events
  GET  /api/health       — Health check
"""

import asyncio
import json
import logging
import uuid
from typing import AsyncGenerator

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from agents.orchestrator import run_scan_pipeline

app = FastAPI(
    title="CodeSentry API",
    description="AI Security Copilot — Backend API",
    version="1.0.0",
)

# CORS: the API is open to every origin. The wildcard already covers the local
# dev servers (localhost:5173 / 5174 / 3000) that used to be listed next to it.
#
# Credentials are disabled on purpose: FastAPI's CORS documentation requires
# explicit origins whenever allow_credentials=True, because combining it with
# "*" makes the middleware reflect whatever Origin the caller sends, letting
# any site issue credentialed requests. None of the endpoints in this module
# rely on cookies. If cookie-based cross-origin access is ever needed, replace
# the wildcard with an explicit origin list and re-enable credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/")
async def root():
    """Return basic service metadata plus an index of the main routes."""
    endpoint_index = dict(health="/api/health", docs="/docs", scan="/api/scan")
    service_info = {
        "status": "online",
        "name": "CodeSentry AI Security API",
        "version": "1.0.0",
    }
    # Added last so the key order of the response matches the documented shape.
    service_info["endpoints"] = endpoint_index
    return service_info


# In-memory scan registry, keyed by scan id ("cs-" followed by 8 hex characters).
# Each entry is a dict with the keys "id", "request" (the ScanRequest as a plain
# dict), "status" and "events", as written by create_scan().
# NOTE(review): this is plain module state, so it is lost when the process
# restarts, and no code visible in this file ever removes entries — confirm
# whether eviction is handled elsewhere.
scans: dict[str, dict] = {}


class ScanRequest(BaseModel):
    """Request body accepted by ``POST /api/scan`` and passed to the scan pipeline."""

    # Scan source selector. Only the two values in the trailing comment are
    # expected; the model itself accepts any string.
    type: str  # "github" | "code"
    # presumably the repository URL used when type == "github" — TODO confirm
    # against run_scan_pipeline
    url: str | None = None
    # presumably the raw source text used when type == "code" — TODO confirm
    code: str | None = None
    # Language of the submitted code; defaults to "python" when omitted.
    language: str | None = "python"


@app.get("/api/health")
async def health():
    """Liveness probe: report a static OK payload identifying this service."""
    return dict(status="ok", service="codesentry-api")


@app.post("/api/scan")
async def create_scan(request: ScanRequest):
    """Register a new scan and return its identifier.

    Nothing is executed here: the request is only stored in ``scans``. The
    pipeline runs when the client opens ``GET /api/scan/stream/{scanId}``.

    Args:
        request: Validated scan parameters from the request body.

    Returns:
        ``{"scanId": <id>, "status": "pending"}``.
    """
    # The id keeps only 8 hex characters (32 bits) of the UUID, so a collision
    # is unlikely but possible. Regenerate rather than silently overwrite a
    # scan that is already registered.
    scan_id = f"cs-{uuid.uuid4().hex[:8]}"
    while scan_id in scans:
        scan_id = f"cs-{uuid.uuid4().hex[:8]}"

    # Pydantic v2 deprecates ``.dict()`` in favour of ``.model_dump()``.
    # Prefer the new name when it exists and fall back for Pydantic v1.
    to_dict = getattr(request, "model_dump", None) or request.dict

    scans[scan_id] = {
        "id": scan_id,
        "request": to_dict(),
        "status": "pending",
        "events": [],
    }
    return {"scanId": scan_id, "status": "pending"}


def _format_sse(event_type: str, data: object) -> str:
    """Render one Server-Sent Events frame with a JSON-encoded ``data`` field."""
    return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"


@app.get("/api/scan/stream/{scan_id}")
async def stream_scan(scan_id: str):
    """Run the scan pipeline for ``scan_id`` and stream its events as SSE.

    Every ``(event_type, event_data)`` pair produced by ``run_scan_pipeline``
    becomes one SSE frame. Failures are reported in-band as an ``error``
    event rather than as an HTTP error status: once when ``scan_id`` is
    unknown, and once if the pipeline or the JSON encoding raises.

    Args:
        scan_id: Identifier returned by ``POST /api/scan``.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """
    # Sent on every stream, including the "not found" one, so caches and
    # buffering reverse proxies do not hold events back.
    sse_headers = {
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",
        "Connection": "keep-alive",
    }

    scan = scans.get(scan_id)
    if scan is None:
        async def error_stream() -> AsyncGenerator[str, None]:
            yield _format_sse("error", {"message": "Scan not found"})

        return StreamingResponse(
            error_stream(),
            media_type="text/event-stream",
            headers=sse_headers,
        )

    request = ScanRequest(**scan["request"])

    async def event_stream() -> AsyncGenerator[str, None]:
        try:
            async for event_type, event_data in run_scan_pipeline(request):
                yield _format_sse(event_type, event_data)
                # Give the event loop a turn between events.
                await asyncio.sleep(0)
        except Exception as exc:
            # Top-level boundary for the stream: keep a server-side record
            # with the traceback, then tell the client in-band.
            logging.getLogger(__name__).exception(
                "Scan %s failed while streaming", scan_id
            )
            yield _format_sse("error", {"message": str(exc)})

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers=sse_headers,
    )