Spaces:
Sleeping
Sleeping
File size: 5,144 Bytes
792e3b9 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 | import sys
import io
if sys.stdout.encoding != 'utf-8':
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
from flask import Flask, request, jsonify
from flask_cors import CORS
import threading
import uuid
import time
import os
from agents.recon_agent import run as run_recon
from agents.vuln_scanner import run as run_vuln
from agents.report_writer import run as run_report
def sanitize_error(e):
error_str = str(e)
# Hide Groq organization IDs and technical rate limit details
if "rate_limit_exceeded" in error_str or "413" in error_str or "TPM" in error_str:
return "AI Rate Limit Exceeded (TPM). Please wait 1 minute and try again. The free-tier API is currently congested."
if "org_" in error_str:
import re
error_str = re.sub(r'org_[a-zA-Z0-9]+', '[REDACTED_ORG]', error_str)
# Generic clean up for other common API errors
if "Authentication" in error_str:
return "API Authentication Error. Please check your API keys in Settings."
return error_str
app = Flask(__name__)
CORS(app) # Enable CORS for the React frontend
@app.route('/', methods=['GET'])
def health_check():
return """
<html>
<head>
<title>AI Pentest API</title>
<style>
body { font-family: sans-serif; display: flex; justify-content: center; align-items: center; height: 100vh; background: #0f1117; color: #e2e4ef; margin: 0; }
.card { background: #1a1d27; padding: 40px; border-radius: 16px; border: 1px solid #2e3141; text-align: center; }
h1 { color: #7c6af7; margin-bottom: 10px; }
.status { color: #10b981; font-weight: bold; }
</style>
</head>
<body>
<div class="card">
<h1>AI Pentest Scanner Backend</h1>
<p>Status: <span class="status">RUNNING</span></p>
<p style="color: #8b8fa8; font-size: 0.9em;">API Endpoint: /api/scan</p>
</div>
</body>
</html>
"""
# In-memory storage for scan jobs (For production, use Redis/DB)
scan_jobs = {}
def execute_scan(job_id, target):
scan_jobs[job_id]['status'] = 'running'
scan_jobs[job_id]['progress'] = 'Initializing target acquisition and reconnaissance...'
def progress_cb(msg):
scan_jobs[job_id]['progress'] = msg
try:
# 1. Recon Phase
progress_cb('Engaging Multi-Agent Reconnaissance Protocol...')
recon_results = run_recon(target, progress_callback=progress_cb)
scan_jobs[job_id]['recon'] = recon_results
# 2. Vulnerability Scanning Phase
progress_cb('Commencing advanced heuristic vulnerability assessment...')
vuln_results = run_vuln(recon_results, progress_callback=progress_cb)
scan_jobs[job_id]['vulns'] = vuln_results
# 3. Final Report Generation Phase
progress_cb('Synthesizing professional security assessment report...')
# We mock exploit data for now as it's not always run
exploit_data = {"results": [], "session_start": time.strftime("%Y-%m-%d %H:%M:%S")}
report_results = run_report(recon_results, vuln_results, exploit_data)
scan_jobs[job_id]['report'] = report_results
scan_jobs[job_id]['status'] = 'completed'
scan_jobs[job_id]['progress'] = 'Scan complete. Professional report generated.'
except Exception as e:
scan_jobs[job_id]['status'] = 'failed'
scan_jobs[job_id]['error'] = sanitize_error(e)
scan_jobs[job_id]['progress'] = 'Security analysis terminated due to a critical exception.'
@app.route('/api/scan', methods=['POST'])
def start_scan():
data = request.json
target = data.get('target')
if not target:
return jsonify({'error': 'Target is required'}), 400
job_id = str(uuid.uuid4())
scan_jobs[job_id] = {
'target': target,
'status': 'pending',
'progress': 'Queued...',
'recon': {},
'vulns': {},
'timestamp': time.time()
}
# Run scan asynchronously to avoid HTTP timeouts
thread = threading.Thread(target=execute_scan, args=(job_id, target))
thread.daemon = True
thread.start()
return jsonify({'job_id': job_id, 'status': 'started'})
@app.route('/api/scan/<job_id>', methods=['GET'])
def get_scan_status(job_id):
job = scan_jobs.get(job_id)
if not job:
return jsonify({'error': 'Job not found'}), 404
return jsonify(job)
@app.route('/api/report/<job_id>', methods=['GET'])
def get_report(job_id):
job = scan_jobs.get(job_id)
if not job or 'report' not in job:
return jsonify({'error': 'Report not found'}), 404
html_path = job['report'].get('html_path')
if not html_path or not os.path.exists(html_path):
return jsonify({'error': 'Report file missing'}), 404
from flask import send_file
return send_file(html_path)
if __name__ == '__main__':
port = int(os.environ.get('PORT', 5000))
app.run(host='0.0.0.0', port=port, threaded=True)
|