# modules/nuclei_engine.py import subprocess import json import os from utils import logger class NucleiEngine: def __init__(self, target_url, auth_header=None, logger_callback=None): self.target_url = target_url self.auth_header = auth_header self.log = logger_callback if logger_callback else logger.info def start_scan(self): self.log(f"☢️ [NUCLEI] Initiating Advanced CVE & Zero-Day Scan on {self.target_url}...") output_file = "nuclei_results.json" # Command: Scan for CVEs, High/Critical vulnerabilities, and Exposed Panels cmd = [ "nuclei", "-u", self.target_url, "-t", "cves,vulnerabilities,misconfiguration,exposed-panels", "-severity", "critical,high,medium", "-json-export", output_file, "-disable-update-check" ] if self.auth_header: # Inject the cookie into Nuclei so it scans authenticated areas! cmd.extend(["-H", f"Cookie: {self.auth_header}"]) findings = [] try: # Run Nuclei (Timeout after 5 minutes to keep it fast) subprocess.run(cmd, capture_output=True, text=True, timeout=300) # Parse the JSON results exported by Nuclei # Parse the JSON results exported by Nuclei if os.path.exists(output_file): with open(output_file, "r") as f: for line in f: try: parsed = json.loads(line.strip()) # Force everything into a list so we can loop safely items = parsed if isinstance(parsed, list) else [parsed] for data in items: if not isinstance(data, dict): continue info = data.get('info', {}) if not isinstance(info, dict): info = {} # Extract CVE ID if it exists cve_id = "" classification = info.get('classification', {}) if isinstance(classification, dict) and 'cve-id' in classification and classification['cve-id']: cve_id = f"[{classification['cve-id'][0]}] " poc_payload = data.get('curl-command', 'N/A') finding = { 'type': f"NUCLEI: {cve_id}{info.get('name', 'Unknown Vulnerability')}", 'severity': info.get('severity', 'high').upper(), 'url': data.get('matched-at', self.target_url), 'payload': data.get('extracted-results', [''])[0] if data.get('extracted-results') else data.get('matcher-name', 'Template Match'), 'proof_of_concept': poc_payload, 'impact': info.get('description', 'Exploitable CVE identified by Nuclei template engine.'), 'remediation': info.get('remediation', 'Apply the latest vendor patches immediately.') } findings.append(finding) except json.JSONDecodeError: continue os.remove(output_file) # Cleanup if findings: self.log(f"🔥 [NUCLEI] Critical Hit! Found {len(findings)} CVEs/Misconfigurations.") else: self.log("✅ [NUCLEI] No known CVEs detected on the external attack surface.") return findings except subprocess.TimeoutExpired: self.log("⚠️ [NUCLEI] Scan timed out. Returning partial results.") return findings except Exception as e: self.log(f"❌ [NUCLEI] Engine failure: {str(e)}") return []