import asyncio
import base64
import concurrent.futures
import os
import re

import requests
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from groq import Groq
from pydantic import BaseModel
|
|
# Pull settings (GITHUB_TOKEN, GROQ_API_KEY, CORS_ALLOW_ORIGINS) from a local
# .env file into the process environment before anything reads them.
load_dotenv()

# Comma-separated list of origins allowed to call the API from a browser,
# e.g. "https://app.example.com,http://localhost:5173". Defaults to "*".
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    # Credentialed CORS must not be combined with a wildcard origin: it would
    # let any website make cookie-bearing requests. Credentials are therefore
    # enabled only when explicit origins are configured.
    allow_credentials="*" not in _cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
# Matches GitHub "blob" URLs: https://github.com/<owner>/<repo>/blob/<ref>/<path>
# The path group stops at "?" or "#" so query strings (e.g. "?plain=1") and
# line anchors (e.g. "#L10-L20") are not mistaken for part of the file path.
_GITHUB_BLOB_URL = re.compile(
    r"https?://(?:www\.)?github\.com/"
    r"([^/?#]+)/([^/?#]+)/blob/[^/?#]+/([^?#\n]+)"
)


def parse_github_url(url):
    """Split a GitHub file URL into ``(owner, repo, path)``.

    Args:
        url: Browser URL of a file on GitHub, in the
            ``https://github.com/<owner>/<repo>/blob/<ref>/<path>`` form.
            Surrounding whitespace, a trailing query string and a trailing
            fragment are ignored.

    Returns:
        ``(owner, repo, path)`` on success, or ``(None, None, None)`` when
        the input is not a string or is not a GitHub blob URL.

    Note:
        The ``<ref>`` segment (branch, tag or commit) is matched but not
        returned, and a ref that itself contains ``/`` is split at its first
        slash. Returning the ref would change this function's return shape.
    """
    if not isinstance(url, str):
        return None, None, None
    m = _GITHUB_BLOB_URL.match(url.strip())
    if m is None:
        return None, None, None
    return m.group(1), m.group(2), m.group(3)
|
|
def get_file_content(owner, repo, path):
    """Fetch a file's text through the GitHub repository contents API.

    Args:
        owner: Repository owner, as it appears in the GitHub URL.
        repo: Repository name.
        path: Path of the file inside the repository.

    Returns:
        A ``(content, error)`` tuple. On success ``content`` is the file
        decoded as UTF-8 (undecodable bytes are replaced) and ``error`` is
        ``None``. On any failure ``content`` is ``None`` and ``error`` is a
        human-readable message; this function does not raise for network or
        payload problems.
    """
    token = os.getenv("GITHUB_TOKEN", "")
    headers = {}
    # The .env template placeholder is treated the same as "no token set".
    if token and token != "your_github_personal_access_token_here":
        headers["Authorization"] = f"token {token}"

    url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}"
    try:
        r = requests.get(url, headers=headers, timeout=15)
    except requests.RequestException as exc:
        # Timeouts, DNS failures, etc. are reported through the error slot
        # so callers only ever deal with the (content, error) contract.
        return None, f"GitHub API request failed: {exc}"
    if r.status_code != 200:
        return None, f"GitHub API error: {r.status_code}"

    try:
        data = r.json()
    except ValueError:
        return None, "GitHub API returned a non-JSON response"

    # A directory path yields a JSON list, and files above the inline size
    # limit come back with encoding "none" and empty content. Neither can be
    # decoded, so reject them instead of scanning an empty string.
    if not isinstance(data, dict) or data.get("encoding") != "base64":
        return None, (
            "GitHub did not return inline file content "
            "(path is not a regular file, or the file is larger than 1 MB)"
        )

    try:
        raw = base64.b64decode(data.get("content") or "")
    except ValueError:
        return None, "GitHub API returned malformed base64 content"
    return raw.decode("utf-8", errors="replace"), None
|
|
# Upper bound on the number of source characters sent to the model.
_MAX_CODE_CHARS = 8000
_TRUNCATION_MARKER = "\n... [truncated]"
_GROQ_MODEL = "llama-3.3-70b-versatile"


def _build_prompt(owner, repo, file_path, code):
    """Render the security-analysis prompt for one file."""
    # NOTE(review): `code` is untrusted repository content interpolated into
    # the prompt; treat the model's report as advisory, not authoritative.
    return f"""You are a cybersecurity expert. Analyze this code for security vulnerabilities.

File: {file_path}
Repository: {owner}/{repo}

```
{code}
```

Provide a detailed security analysis in this exact markdown format:

# Security Analysis Report

## File Overview
- Repository: {owner}/{repo}
- File: {file_path}
- Language: [detected language]
- Lines analyzed: [count]

## Vulnerabilities Found

[For each vulnerability found:]
### [Vulnerability Name] — [CRITICAL/HIGH/MEDIUM/LOW]
- **Line**: [line number or range]
- **Code**: `[vulnerable snippet]`
- **Issue**: [clear explanation]
- **CVE Reference**: [relevant CVE ID if applicable]

## Remediation
[Specific fix for each vulnerability with corrected code]

## Risk Summary
- Critical: [n] | High: [n] | Medium: [n] | Low: [n]
- **Overall Risk**: [CRITICAL/HIGH/MEDIUM/LOW]
"""


def run_scan(owner, repo, file_path):
    """Download one GitHub file and ask the Groq model for a security report.

    This function blocks on two network calls (GitHub, then Groq), so async
    callers should run it in a worker thread.

    Args:
        owner: Repository owner.
        repo: Repository name.
        file_path: Path of the file inside the repository.

    Returns:
        ``{"result": <markdown report>}`` on success, or
        ``{"error": <message>}`` if the file cannot be fetched, the Groq API
        key is missing, or the model request fails.
    """
    code, err = get_file_content(owner, repo, file_path)
    if err:
        return {"error": err}

    api_key = os.getenv("GROQ_API_KEY", "")
    if not api_key:
        # Fail with a clear message instead of an authentication error
        # raised from deep inside the client.
        return {"error": "GROQ_API_KEY is not set"}

    if len(code) > _MAX_CODE_CHARS:
        code = code[:_MAX_CODE_CHARS] + _TRUNCATION_MARKER

    prompt = _build_prompt(owner, repo, file_path, code)

    try:
        client = Groq(api_key=api_key)
        response = client.chat.completions.create(
            model=_GROQ_MODEL,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.1,
            max_tokens=4096,
        )
    except Exception as exc:
        # Worker-thread boundary: report any client/API failure in the same
        # {"error": ...} shape used for GitHub failures.
        return {"error": f"LLM request failed: {exc}"}

    result = response.choices[0].message.content
    return {"result": result}
|
|
|
|
class ScanRequest(BaseModel):
    """JSON body accepted by the scan endpoint."""

    # URL of the GitHub file to analyze, e.g.
    # https://github.com/<owner>/<repo>/blob/<ref>/<path>
    url: str
|
|
|
|
@app.post("/api/scan")
async def scan(req: ScanRequest):
    """Scan the GitHub file referenced by ``req.url``.

    Args:
        req: Request body carrying the GitHub file URL.

    Returns:
        The dict produced by ``run_scan`` (``{"result": ...}`` or
        ``{"error": ...}``), or an ``{"error": ...}`` dict when the URL is
        not a GitHub blob URL.
    """
    owner, repo, file_path = parse_github_url(req.url)
    if not (owner and repo and file_path):
        return {"error": "Invalid GitHub file URL. Must contain /blob/."}

    # run_scan blocks on network I/O, so run it on the loop's shared default
    # thread pool. A per-request pool inside a `with` block would call
    # shutdown(wait=True) on the event loop thread and stall every other
    # request if this one is cancelled mid-scan.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, run_scan, owner, repo, file_path)
|
|
|
|
@app.get("/api/health")
def health():
    """Report service status and whether ``GROQ_API_KEY`` is non-empty."""
    groq_key = os.getenv("GROQ_API_KEY")
    payload = {"status": "ok", "groq_key_set": bool(groq_key)}
    return payload
|
|
|
|
@app.get("/")
def root():
    """Return a fixed status message for the API root."""
    banner = {"status": "Vulnerability Scanner API running"}
    return banner
|
|
|
|