| from groq import Groq |
| import json |
| import time |
| import re |
|
|
class AIEngine:
    """Security-analysis helper backed by the Groq chat-completions API.

    ``enrich_findings`` adds AI-generated impact/remediation text to
    scanner findings, and ``audit_code`` performs a full AI security
    review of a single source file. Both methods are best-effort: any
    API or JSON failure degrades to safe fallback content instead of
    raising, so a scan never aborts because of the AI layer.
    """

    def __init__(self, api_key):
        """Create the Groq client.

        Args:
            api_key: Groq API key used to authenticate requests.
        """
        self.client = Groq(api_key=api_key)
        self.model_id = 'llama-3.3-70b-versatile'

    def enrich_findings(self, vulns, log_callback):
        """Enrich each finding with impact/remediation text from the AI.

        Findings are de-duplicated by their ``type`` key so each distinct
        vulnerability type costs exactly one API call; the enriched text
        is then fanned back out to every finding of that type.

        Args:
            vulns: List of finding dicts, each with at least a ``type`` key.
            log_callback: Callable taking a single status string (UI logger).

        Returns:
            The same ``vulns`` list, mutated in place: enriched entries
            gain ``impact`` and ``remediation`` keys.
        """
        log_callback("🧠 AI THINKING: Analyzing threat landscape via Groq LPUs...")

        # One representative entry per vulnerability type (first occurrence wins).
        grouped_vulns = {}
        for v in vulns:
            grouped_vulns.setdefault(v['type'], v)

        enriched_library = {}

        for v_type in grouped_vulns:
            prompt = f"""
            Analyze this vulnerability: {v_type}
            Provide:
            1. Impact: Real-world exploitation scenario.
            2. Remediation: Technical fix.
            Output JSON format: {{"impact": "...", "remediation": "..."}}
            """
            try:
                response = self.client.chat.completions.create(
                    model=self.model_id,
                    messages=[
                        {"role": "system", "content": "You are a cyber security auditor. Always output valid JSON."},
                        {"role": "user", "content": prompt}
                    ],
                    response_format={"type": "json_object"},
                    temperature=0.2
                )

                parsed = json.loads(response.choices[0].message.content)
                # The model can emit valid JSON that is not an object
                # (e.g. a bare list); treat that as a failure.
                if not isinstance(parsed, dict):
                    raise ValueError("model returned non-object JSON")
                enriched_library[v_type] = parsed
                time.sleep(0.5)  # gentle pacing between calls to avoid rate limits
            except Exception as exc:
                # Best-effort: keep the scan usable even when the AI call or
                # its payload fails, but surface the reason instead of
                # swallowing it silently.
                log_callback(f"⚠️ AI enrichment failed for {v_type}: {exc}")
                enriched_library[v_type] = {
                    "impact": "Potential security compromise.",
                    "remediation": "Review OWASP guidelines."
                }

        # Fan the per-type enrichment back out to every matching finding.
        # .get() guards against the model omitting a key despite the prompt.
        for v in vulns:
            details = enriched_library.get(v['type'])
            if details is not None:
                v['impact'] = details.get('impact', "Potential security compromise.")
                v['remediation'] = details.get('remediation', "Review OWASP guidelines.")

        return vulns

    def audit_code(self, file_path, code_content, log_callback):
        """Run a full AI security review on one source file.

        Args:
            file_path: Path of the file under review (context for the model).
            code_content: File contents; truncated to 20 000 chars to stay
                within the model's context budget.
            log_callback: Status logger; accepted for interface symmetry with
                ``enrich_findings`` (currently unused here).

        Returns:
            A dict always containing ``vulnerable`` (bool), ``findings``
            (list), and ``message`` (str). On any API/JSON error a safe
            non-vulnerable result is returned with an explanatory message.
        """
        if len(code_content) > 20000:
            code_content = code_content[:20000] + "\n...[TRUNCATED]..."

        prompt = f"""
        Review this file for SECURITY VULNERABILITIES (Secrets, SQLi, XSS, RCE, Auth Bypass, IDOR, Insecure Config).
        File: {file_path}
        Code:
        {code_content}

        Task:
        1. Ignore best practices. Focus ONLY on severe vulnerabilities.
        2. If Clean: Return "vulnerable": false.
        3. If Vulnerable: Return "vulnerable": true and detailed findings.

        Output JSON format exactly like this:
        {{
            "vulnerable": true,
            "findings": [
                {{
                    "type": "Vulnerability Name (e.g., IDOR, SQLi)",
                    "severity": "CRITICAL, HIGH, or MEDIUM",
                    "line": "Exact line of code that is vulnerable",
                    "impact": "Explain EXACTLY how a hacker exploits this specific line of code and the real-world damage.",
                    "remediation": "Provide the EXACT corrected code snippet to fix this."
                }}
            ],
            "message": "Brief summary"
        }}
        """

        try:
            response = self.client.chat.completions.create(
                model=self.model_id,
                messages=[
                    {"role": "system", "content": "You are a strict Source Code Security Auditor. Always output valid JSON."},
                    {"role": "user", "content": prompt}
                ],
                response_format={"type": "json_object"},
                temperature=0.1
            )

            result = json.loads(response.choices[0].message.content)
            # Guard against non-object JSON and normalize the keys that
            # callers read so downstream access never raises KeyError.
            if not isinstance(result, dict):
                raise ValueError("model returned non-object JSON")
            result.setdefault("vulnerable", False)
            result.setdefault("findings", [])
            result.setdefault("message", "")
            return result

        except Exception as e:
            error_msg = str(e)
            if "Rate limit" in error_msg or "429" in error_msg:
                return {"vulnerable": False, "findings": [], "message": "⚠️ API Quota Limit Exceeded."}
            return {"vulnerable": False, "findings": [], "message": f"AI Error: {error_msg[:60]}..."}