File size: 3,276 Bytes
65f2935
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import re, os, requests, asyncio, concurrent.futures
from dotenv import load_dotenv
from groq import Groq

# Load environment variables (e.g. GITHUB_TOKEN, GROQ_API_KEY, which the
# handlers below read via os.getenv) from a local .env file, if one exists.
load_dotenv()

# The ASGI application object; all routes in this module are registered on it.
app = FastAPI()
# NOTE(review): this CORS policy accepts every origin, method and header while
# also enabling credentials. That is very permissive -- confirm it is intended,
# and consider listing the trusted front-end origins explicitly instead of "*".
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

def parse_github_url(url):
    """Split a GitHub "blob" URL into its owner, repository and file path.

    Accepts URLs of the form
    ``https://github.com/<owner>/<repo>/blob/<ref>/<path>``. Surrounding
    whitespace, a trailing ``?query`` string and a ``#fragment`` (such as the
    ``#L10-L20`` line anchors GitHub adds when a line is selected) are ignored
    so that they do not end up inside the returned path.

    The ``<ref>`` segment is matched as a single path component and is not
    returned, so refs that contain ``/`` (e.g. ``feature/x``) are not
    handled correctly: the remainder of the ref leaks into the path.

    Args:
        url: The URL string to parse.

    Returns:
        A ``(owner, repo, path)`` tuple of strings, or ``(None, None, None)``
        when the URL is not a GitHub blob URL.
    """
    # Drop the fragment first, then the query string; neither is part of the
    # repository path that the contents API expects.
    cleaned = url.strip().split("#", 1)[0].split("?", 1)[0]
    m = re.match(r"https://github\.com/([^/]+)/([^/]+)/blob/[^/]+/(.+)", cleaned)
    if m is None:
        return None, None, None
    return m.group(1), m.group(2), m.group(3)

def get_file_content(owner, repo, path):
    """Fetch a file's text through the GitHub repository-contents API.

    The request carries no ``ref`` parameter, so the file is read from
    whichever ref the API uses by default (presumably the repository's default
    branch -- verify against the GitHub documentation).

    Args:
        owner: Repository owner (user or organisation login).
        repo: Repository name.
        path: File path relative to the repository root.

    Returns:
        A ``(content, error)`` tuple. On success ``content`` is the file text
        (decoded as UTF-8, with undecodable bytes replaced) and ``error`` is
        ``None``. On any failure ``content`` is ``None`` and ``error`` is a
        short human-readable message; this function does not raise for
        network or payload problems.
    """
    import base64

    token = os.getenv("GITHUB_TOKEN", "")
    # Send the token only when it is set and is not the template placeholder.
    has_real_token = token and token != "your_github_personal_access_token_here"
    headers = {"Authorization": f"token {token}"} if has_real_token else {}
    url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}"

    try:
        r = requests.get(url, headers=headers, timeout=15)
    except requests.RequestException as exc:
        # Timeouts, DNS failures, refused connections, etc. are reported
        # through the same (None, error) channel as HTTP errors.
        return None, f"GitHub API request failed: {type(exc).__name__}"
    if r.status_code != 200:
        return None, f"GitHub API error: {r.status_code}"

    try:
        data = r.json()
    except ValueError:
        return None, "GitHub API error: response was not valid JSON"

    # A directory path yields a JSON list rather than a single file object.
    if not isinstance(data, dict):
        return None, "GitHub API error: path does not point to a single file"

    encoded = data.get("content")
    # NOTE(review): oversized files appear to come back without inline base64
    # content -- confirm the exact size limits against the GitHub docs.
    if not isinstance(encoded, str) or data.get("encoding", "base64") != "base64":
        return None, "GitHub API error: file content is not available (file may be too large)"

    try:
        content = base64.b64decode(encoded).decode("utf-8", errors="replace")
    except ValueError:
        # binascii.Error (malformed base64) is a ValueError subclass.
        return None, "GitHub API error: file content could not be decoded"
    return content, None

def run_scan(owner, repo, file_path):
    """Fetch one file from GitHub and ask the Groq LLM for a security review.

    This function is synchronous and performs blocking network calls.

    Args:
        owner: Repository owner.
        repo: Repository name.
        file_path: Path of the file inside the repository.

    Returns:
        ``{"result": <markdown report>}`` on success, or
        ``{"error": <message>}`` when the file cannot be fetched, the
        ``GROQ_API_KEY`` environment variable is not set, or the Groq request
        fails or returns no choices.
    """
    code, err = get_file_content(owner, repo, file_path)
    if err:
        return {"error": err}

    api_key = os.getenv("GROQ_API_KEY", "")
    if not api_key:
        # Report the misconfiguration in the same shape as other failures
        # instead of letting the API call fail with an authentication error.
        return {"error": "GROQ_API_KEY is not configured"}

    # Truncate large files so the prompt stays at a bounded size.
    max_chars = 8000
    if len(code) > max_chars:
        code = code[:max_chars] + "\n... [truncated]"

    prompt = f"""You are a cybersecurity expert. Analyze this code for security vulnerabilities.

File: {file_path}
Repository: {owner}/{repo}

```
{code}
```

Provide a detailed security analysis in this exact markdown format:

# Security Analysis Report

## File Overview
- Repository: {owner}/{repo}
- File: {file_path}
- Language: [detected language]
- Lines analyzed: [count]

## Vulnerabilities Found

[For each vulnerability found:]
### [Vulnerability Name] — [CRITICAL/HIGH/MEDIUM/LOW]
- **Line**: [line number or range]
- **Code**: `[vulnerable snippet]`
- **Issue**: [clear explanation]
- **CVE Reference**: [relevant CVE ID if applicable]

## Remediation
[Specific fix for each vulnerability with corrected code]

## Risk Summary
- Critical: [n] | High: [n] | Medium: [n] | Low: [n]
- **Overall Risk**: [CRITICAL/HIGH/MEDIUM/LOW]
"""

    try:
        client = Groq(api_key=api_key)
        response = client.chat.completions.create(
            model="llama-3.3-70b-versatile",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.1,
            max_tokens=4096,
        )
    except Exception as exc:
        # Service boundary: any client/API failure (auth, rate limit, network)
        # becomes an error payload. Only the exception type is exposed so that
        # internal details are not leaked to API consumers.
        return {"error": f"Groq API error: {type(exc).__name__}"}

    if not response.choices:
        return {"error": "Groq API error: empty response"}
    return {"result": response.choices[0].message.content}


# JSON request body accepted by the POST /api/scan endpoint.
class ScanRequest(BaseModel):
    # URL of the GitHub file to scan; the scan handler passes it to
    # parse_github_url, which expects a ".../blob/<ref>/<path>" link.
    url: str


@app.post("/api/scan")
async def scan(req: ScanRequest):
    """Scan a single GitHub-hosted file for security vulnerabilities.

    Expects a GitHub "blob" file URL in the request body. Returns
    ``{"result": <markdown report>}`` on success or ``{"error": <message>}``
    when the URL is not a GitHub blob URL or the scan fails.
    """
    owner, repo, file_path = parse_github_url(req.url)
    if not (owner and repo and file_path):
        return {"error": "Invalid GitHub file URL. Must contain /blob/."}

    # run_scan makes blocking HTTP calls, so it is run on a worker thread.
    # The loop's default executor (None) is used instead of a per-request
    # ThreadPoolExecutor: leaving a `with ThreadPoolExecutor()` block calls
    # shutdown(wait=True), which would stall the event loop if this request
    # were cancelled while the scan was still in progress.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, run_scan, owner, repo, file_path)


@app.get("/api/health")
def health():
    # Liveness probe: always reports "ok" and additionally tells the caller
    # whether a non-empty GROQ_API_KEY is present in the environment (the key
    # itself is never returned).
    groq_key = os.getenv("GROQ_API_KEY")
    key_present = bool(groq_key)
    return {"status": "ok", "groq_key_set": key_present}


@app.get("/")
def root():
    # Landing endpoint: returns a fixed banner confirming the API is up.
    banner = "Vulnerability Scanner API running"
    return {"status": banner}