| from fastapi import FastAPI |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.responses import FileResponse |
| from pydantic import BaseModel |
| import re, os, requests, base64 |
| from groq import Groq |
|
|
# The single application object; every route decorator in this module
# attaches to it.
app = FastAPI()

# CORS is fully permissive: any origin, any method and any request header
# is accepted.
_CORS_SETTINGS = {
    "allow_origins": ["*"],
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_SETTINGS)
|
|
| |
|
|
def parse_github_url(url):
    """Split a GitHub "blob" URL into ``(owner, repo, path)``.

    Accepted form::

        [http(s)://][www.]github.com/<owner>/<repo>/blob/<ref>/<path>

    The scheme is optional so the scheme-less format advertised in the API's
    own error message is accepted as well.  A trailing query string or
    fragment (for example ``?plain=1`` or ``#L10-L20``) is dropped because it
    is not part of the file path.

    The ``<ref>`` segment is matched but not returned.  A ref that itself
    contains a slash (``feature/x``) cannot be told apart from the path and
    is therefore not supported.

    Args:
        url: URL text supplied by the client; treated as untrusted.

    Returns:
        ``(owner, repo, path)`` on success, or ``(None, None, None)`` when the
        value is not a string, does not have the expected shape, or contains
        ``.`` / ``..`` segments.
    """
    # Local import: only this function needs it.
    from urllib.parse import unquote

    no_match = (None, None, None)
    if not isinstance(url, str):
        return no_match

    # "#" and "?" inside a real file name are percent-encoded in a URL, so a
    # literal one always ends the path.  Fragment first, then query.
    cleaned = url.strip().partition("#")[0].partition("?")[0].rstrip("/")

    # fullmatch (rather than match) also rejects input with an embedded
    # newline, which "." would otherwise silently stop at.
    m = re.fullmatch(
        r"(?:https?://)?(?:www\.)?github\.com/([^/]+)/([^/]+)/blob/[^/]+/(.+)",
        cleaned,
    )
    if m is None:
        return no_match

    owner, repo, path = m.groups()

    # These values are interpolated into an API URL that may carry a token.
    # Dot segments (literal or percent-encoded) never occur in a genuine blob
    # URL and could steer that request to a different endpoint, so refuse them.
    segments = (owner, repo, *path.split("/"))
    if any(unquote(segment) in (".", "..") for segment in segments):
        return no_match

    return owner, repo, path
|
|
def get_file_content(owner, repo, path):
    """Fetch a file's text through the GitHub repository-contents API.

    The request carries ``GITHUB_TOKEN`` when that environment variable is
    set and is anonymous otherwise.  No ``ref`` parameter is sent, so the
    branch named in the original blob URL is not taken into account here.

    Args:
        owner: Repository owner.
        repo: Repository name.
        path: File path inside the repository.

    Returns:
        ``(text, None)`` on success or ``(None, message)`` on any failure.
        The text is decoded as UTF-8 with undecodable bytes replaced.
    """
    token = os.getenv("GITHUB_TOKEN", "")
    headers = {"Authorization": f"token {token}"} if token else {}
    api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}"

    # Timeouts, DNS failures and similar transport problems are reported via
    # the error slot instead of escaping as an unhandled exception.
    try:
        r = requests.get(api_url, headers=headers, timeout=15)
    except requests.RequestException as exc:
        return None, f"GitHub API request failed: {exc}"

    if r.status_code != 200:
        return None, f"GitHub API error: {r.status_code}"

    try:
        payload = r.json()
    except ValueError:
        return None, "GitHub API error: response was not valid JSON"

    # A directory path is answered with a JSON list rather than an object.
    if not isinstance(payload, dict):
        return None, "GitHub path is not a single file"

    # Some responses carry no inline base64 body (GitHub documents this for
    # large files); indexing "content" blindly would raise.
    encoded = payload.get("content")
    if not isinstance(encoded, str) or payload.get("encoding", "base64") != "base64":
        return None, "GitHub API returned no inline file content"

    try:
        raw = base64.b64decode(encoded)
    except ValueError:  # binascii.Error is a ValueError subclass
        return None, "GitHub API returned malformed base64 content"

    return raw.decode("utf-8", errors="replace"), None
|
|
|
|
def _build_prompt(owner, repo, file_path, code):
    """Render the security-analysis prompt for one file's source text."""
    return f"""You are a cybersecurity expert. Analyze this code for security vulnerabilities.

File: {file_path}
Repository: {owner}/{repo}

```
{code}
```

Provide a detailed security analysis in this exact markdown format:

# Security Analysis Report

## File Overview
- Repository: {owner}/{repo}
- File: {file_path}
- Language: [detected language]
- Lines analyzed: [count]

## Vulnerabilities Found

### [Vulnerability Name] — [CRITICAL/HIGH/MEDIUM/LOW]
- **Line**: [line number]
- **Code**: `[snippet]`
- **Issue**: [explanation]
- **CVE Reference**: [CVE ID if applicable]

## Remediation
[Specific fixes with corrected code]

## Risk Summary
- Critical: [n] | High: [n] | Medium: [n] | Low: [n]
- **Overall Risk**: [CRITICAL/HIGH/MEDIUM/LOW]
"""


def scan_file(owner, repo, file_path, *, max_chars=6000,
              model="llama-3.3-70b-versatile"):
    """Fetch one repository file and ask the Groq model to audit it.

    Args:
        owner: Repository owner.
        repo: Repository name.
        file_path: Path of the file inside the repository.
        max_chars: Source text longer than this is cut and marked as
            truncated before being placed in the prompt.
        model: Groq chat model identifier.

    Returns:
        ``{"result": <markdown report>}`` on success, otherwise
        ``{"error": <message>}``.  Errors are never raised to the caller.
    """
    # Check configuration first so a missing key does not cost a GitHub API
    # call before failing.
    groq_key = os.getenv("GROQ_API_KEY", "")
    if not groq_key:
        return {"error": "GROQ_API_KEY not configured"}

    code, err = get_file_content(owner, repo, file_path)
    if err:
        return {"error": err}

    if len(code) > max_chars:
        code = code[:max_chars] + "\n... [truncated]"

    prompt = _build_prompt(owner, repo, file_path, code)

    # Broad catch is intentional: this is the boundary that converts any
    # client/SDK failure (including client construction) into an error
    # payload for the HTTP layer.
    try:
        client = Groq(api_key=groq_key)
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.1,
            max_tokens=4096,
        )
        return {"result": response.choices[0].message.content}
    except Exception as e:
        return {"error": f"AI analysis failed: {str(e)}"}
|
|
|
|
| |
|
|
class ScanRequest(BaseModel):
    """JSON body accepted by ``POST /api/scan``."""

    # URL of the GitHub file to analyse; handed to parse_github_url().
    url: str
|
|
@app.get("/api/health")
def health():
    """Report liveness and whether ``GROQ_API_KEY`` is set to a non-empty value."""
    groq_key = os.getenv("GROQ_API_KEY")
    has_key = bool(groq_key)
    return {"status": "ok", "groq_key_set": has_key}
|
|
@app.post("/api/scan")
def scan(req: ScanRequest):
    """Analyse the GitHub file named by ``req.url``.

    Returns whatever ``scan_file`` returns, or an ``{"error": ...}`` payload
    when the URL cannot be parsed into owner, repository and file path.
    """
    owner, repo, file_path = parse_github_url(req.url)
    if owner and repo and file_path:
        return scan_file(owner, repo, file_path)
    return {"error": "Invalid GitHub file URL. Use format: github.com/owner/repo/blob/branch/file"}
|
|
| |
|
|
# Mount the bundled front-end assets only when the directory that is actually
# mounted exists.  Testing just "static" is not enough: StaticFiles validates
# its directory when constructed, so a "static" folder without an "assets"
# sub-folder would abort application start-up.
if os.path.isdir("static/assets"):
    app.mount("/assets", StaticFiles(directory="static/assets"), name="assets")
|
|
@app.get("/")
def serve_root():
    """Serve the front-end entry document at the site root.

    Returns:
        A ``FileResponse`` for ``static/index.html``.

    Raises:
        HTTPException: 404 when ``static/index.html`` does not exist (for
            example an API-only deployment without a built front-end),
            instead of letting the file response fail as a server error.
    """
    # Imported locally so this handler does not depend on a module-level import.
    from fastapi import HTTPException

    index_file = "static/index.html"
    if not os.path.isfile(index_file):
        raise HTTPException(status_code=404, detail="Frontend is not available")
    return FileResponse(index_file)
|
|
@app.get("/{full_path:path}")
def serve_spa(full_path: str):
    """Catch-all route: answer any otherwise unmatched GET with the SPA shell.

    Args:
        full_path: The requested path.  It is not used to pick a file; the
            same ``static/index.html`` is returned for every value.

    Returns:
        A ``FileResponse`` for ``static/index.html``.

    Raises:
        HTTPException: 404 when ``static/index.html`` does not exist,
            instead of letting the file response fail as a server error.
    """
    # Imported locally so this handler does not depend on a module-level import.
    from fastapi import HTTPException

    index_file = "static/index.html"
    if not os.path.isfile(index_file):
        raise HTTPException(status_code=404, detail="Frontend is not available")
    return FileResponse(index_file)
|
|