AdityaDevx's picture
fix: better 404 error
65f2935
raw
history blame
3.28 kB
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import re, os, requests, asyncio, concurrent.futures
from dotenv import load_dotenv
from groq import Groq
load_dotenv()
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
def parse_github_url(url):
m = re.match(r"https://github\.com/([^/]+)/([^/]+)/blob/[^/]+/(.+)", url.strip())
if m:
return m.group(1), m.group(2), m.group(3)
return None, None, None
def get_file_content(owner, repo, path):
token = os.getenv("GITHUB_TOKEN", "")
headers = {"Authorization": f"token {token}"} if (token and token != "your_github_personal_access_token_here") else {}
url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}"
r = requests.get(url, headers=headers, timeout=15)
if r.status_code != 200:
return None, f"GitHub API error: {r.status_code}"
import base64
data = r.json()
content = base64.b64decode(data["content"]).decode("utf-8", errors="replace")
return content, None
def run_scan(owner, repo, file_path):
code, err = get_file_content(owner, repo, file_path)
if err:
return {"error": err}
# Truncate large files
if len(code) > 8000:
code = code[:8000] + "\n... [truncated]"
client = Groq(api_key=os.getenv("GROQ_API_KEY", ""))
prompt = f"""You are a cybersecurity expert. Analyze this code for security vulnerabilities.
File: {file_path}
Repository: {owner}/{repo}
```
{code}
```
Provide a detailed security analysis in this exact markdown format:
# Security Analysis Report
## File Overview
- Repository: {owner}/{repo}
- File: {file_path}
- Language: [detected language]
- Lines analyzed: [count]
## Vulnerabilities Found
[For each vulnerability found:]
### [Vulnerability Name] — [CRITICAL/HIGH/MEDIUM/LOW]
- **Line**: [line number or range]
- **Code**: `[vulnerable snippet]`
- **Issue**: [clear explanation]
- **CVE Reference**: [relevant CVE ID if applicable]
## Remediation
[Specific fix for each vulnerability with corrected code]
## Risk Summary
- Critical: [n] | High: [n] | Medium: [n] | Low: [n]
- **Overall Risk**: [CRITICAL/HIGH/MEDIUM/LOW]
"""
response = client.chat.completions.create(
model="llama-3.3-70b-versatile",
messages=[{"role": "user", "content": prompt}],
temperature=0.1,
max_tokens=4096,
)
result = response.choices[0].message.content
return {"result": result}
class ScanRequest(BaseModel):
url: str
@app.post("/api/scan")
async def scan(req: ScanRequest):
owner, repo, file_path = parse_github_url(req.url)
if not owner or not repo or not file_path:
return {"error": "Invalid GitHub file URL. Must contain /blob/."}
loop = asyncio.get_event_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, run_scan, owner, repo, file_path)
return result
@app.get("/api/health")
def health():
return {"status": "ok", "groq_key_set": bool(os.getenv("GROQ_API_KEY"))}
@app.get("/")
def root():
return {"status": "Vulnerability Scanner API running"}