# yenugu's picture
# Upload app.py with huggingface_hub
# c143925 verified
import os
import gradio as gr
import shap
from transformers import pipeline
import torch
import xgboost as xgb
from sklearn.ensemble import RandomForestClassifier, VotingClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import LabelEncoder
from sklearn.pipeline import FeatureUnion
from sklearn.base import BaseEstimator, TransformerMixin
from sentence_transformers import SentenceTransformer
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import requests
from bs4 import BeautifulSoup
import json
import joblib
import re
import time
import warnings
from urllib.parse import urlparse
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import socket
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import ssl
from datetime import datetime
# Suppress every warning category process-wide so library warnings do not
# clutter the application's output.
warnings.filterwarnings('ignore')
# Switch matplotlib to the 'Agg' backend before any figure is created.
plt.switch_backend('Agg')
def load_from_drive(path):
    """Deserialize a joblib artifact from ``path``.

    Returns the loaded object, or ``None`` when the file does not exist or
    when loading raises (the error is printed in that case).

    NOTE(review): joblib artifacts are pickle-based; only load files from a
    trusted location.
    """
    if not os.path.exists(path):
        return None

    try:
        artifact = joblib.load(path)
    except Exception as e:
        print(f"Error loading {path}: {e}")
        return None
    return artifact
# =========== Enhanced Feature Engineering ================
class SentenceTransformerFeatures(BaseEstimator, TransformerMixin):
    """Scikit-learn style transformer that encodes inputs with a SentenceTransformer.

    The underlying model is created in ``fit`` or, if ``fit`` was never
    called, on the first ``transform`` call.
    """

    def __init__(self, model_name='all-MiniLM-L6-v2'):
        # Name handed to SentenceTransformer() when the model is created.
        self.model_name = model_name
        # Holds the SentenceTransformer instance once it has been created.
        self.model = None

    def fit(self, X, y=None):
        """Create the sentence-transformer model; ``X`` and ``y`` are not used."""
        self.model = SentenceTransformer(self.model_name)
        return self

    def transform(self, X):
        """Return ``self.model.encode(X)`` with the progress bar disabled."""
        encoder = self.model
        if encoder is None:
            # fit() was skipped: create the model on demand.
            encoder = SentenceTransformer(self.model_name)
            self.model = encoder
        return encoder.encode(X, show_progress_bar=False)
class AdvancedFeatureEngine:
    """Holds the TF-IDF vectorizer and, optionally, a sentence-embedding transformer."""

    def __init__(self, use_embeddings=True):
        # When True, build_feature_pipeline() combines TF-IDF with embeddings.
        self.use_embeddings = use_embeddings
        self.vectorizer = TfidfVectorizer(
            analyzer='word',
            ngram_range=(1, 3),
            stop_words='english',
            max_features=2000,
            min_df=2,
            max_df=0.8,
            sublinear_tf=True,
        )
        # Both are populated by build_feature_pipeline().
        self.sentence_transformer = None
        self.feature_union = None

    def build_feature_pipeline(self):
        """Build, store and return the feature extractor.

        Without embeddings the extractor is the TF-IDF vectorizer itself;
        with embeddings it is a FeatureUnion of TF-IDF and sentence embeddings.
        """
        if not self.use_embeddings:
            self.feature_union = self.vectorizer
            return self.feature_union

        self.sentence_transformer = SentenceTransformerFeatures()
        steps = [
            ('tfidf', self.vectorizer),
            ('embeddings', self.sentence_transformer),
        ]
        self.feature_union = FeatureUnion(steps)
        return self.feature_union
# =========== Enhanced Model Class ========================
class CalibratedVulnerabilityClassifier:
    """Vulnerability classifier combining optional saved models with keyword heuristics.

    On construction the class tries to load joblib artifacts from
    ``model_path_prefix``. Predictions use the calibrated ensemble when both
    it and the TF-IDF vectorizer were loaded; otherwise (or when the model
    yields fewer than three predictions) regex/keyword heuristics fill in.
    """

    # Vulnerability classes used to seed the label encoder when no usable
    # encoder could be loaded.
    DEFAULT_CLASSES = (
        'SQL Injection', 'XSS', 'CSRF', 'Information Disclosure',
        'Authentication Bypass', 'Secure Config', 'File Inclusion',
        'Command Injection', 'XXE', 'SSRF', 'IDOR', 'Buffer Overflow',
    )

    # Per-class decision thresholds used when none could be loaded from disk.
    DEFAULT_THRESHOLDS = {
        'SQL Injection': 0.65, 'XSS': 0.68, 'CSRF': 0.55,
        'Information Disclosure': 0.58, 'Authentication Bypass': 0.62,
        'Secure Config': 0.52, 'File Inclusion': 0.60, 'Command Injection': 0.70,
        'XXE': 0.65, 'SSRF': 0.63, 'IDOR': 0.58, 'Buffer Overflow': 0.72,
    }

    # Keyword -> base importance used by explain_prediction(). Keywords that
    # appeared twice in the original table keep the value that was effective
    # (the later one) at the position of their first occurrence.
    _KEYWORD_WEIGHTS = {
        # SQL Injection
        'sql': 0.85, 'injection': 0.85, 'select': 0.80, 'union': 0.85,
        'database': 0.75, 'query': 0.70, 'insert': 0.75, 'update': 0.75,
        'delete': 0.75, 'drop': 0.80, 'alter': 0.75, '1=1': 0.90, 'exec': 0.85,
        # XSS
        'xss': 0.88, 'script': 0.85, 'cross.site': 0.80, 'javascript': 0.75,
        'alert': 0.80, 'document.cookie': 0.85, 'onclick': 0.70, 'onload': 0.70,
        'onerror': 0.70, '<script>': 0.90, '</script>': 0.90, 'eval': 0.82,
        # Command Injection
        'command': 0.80, 'system': 0.80,
        'shell': 0.75, 'popen': 0.80, 'passthru': 0.80, 'subprocess': 0.78,
        # File Inclusion
        'file': 0.75, 'include': 0.80, 'require': 0.75, 'path': 0.70,
        'traversal': 0.85, 'directory': 0.65, '../': 0.88,
        # XXE
        'xxe': 0.82, 'xml': 0.75, 'entity': 0.78, 'DOCTYPE': 0.80,
        # SSRF
        'ssrf': 0.80, 'server.side': 0.75, 'request.forgery': 0.75, 'curl': 0.70,
        # Authentication
        'authentication': 0.80, 'bypass': 0.85, 'login': 0.75, 'password': 0.80,
        'session': 0.70, 'credential': 0.85, 'admin': 0.65, 'jwt': 0.72,
        # Information Disclosure
        'information': 0.65, 'disclosure': 0.75, 'exposed': 0.70, 'leak': 0.75,
        'key': 0.80, 'token': 0.75,
        'config': 0.65, 'debug': 0.70, 'error': 0.60,
        # Buffer Overflow
        'buffer': 0.78, 'overflow': 0.82, 'stack': 0.75, 'strcpy': 0.80,
    }

    # Keywords whose '.' stands for "any separator" (e.g. "cross-site",
    # "cross site"); these are matched as regular expressions rather than as
    # literal substrings, which could never match real text.
    _REGEX_KEYWORDS = frozenset({'cross.site', 'server.side', 'request.forgery'})

    def __init__(self, use_embeddings=True, model_path_prefix="models/"):
        """Create the classifier and immediately try to load saved artifacts.

        Args:
            use_embeddings: forwarded to ``AdvancedFeatureEngine``.
            model_path_prefix: directory that holds the ``*.joblib`` artifacts.
        """
        self.feature_engine = AdvancedFeatureEngine(use_embeddings)
        self.label_encoder = LabelEncoder()
        self.models = {}
        self.explainer = None
        self.training_complete = False
        self.calibration_thresholds = {}
        self.model_path_prefix = model_path_prefix
        self.xgb_model = None
        self.rf_model = None
        self.svm_model = None
        self.lr_model = None
        self.ensemble = None
        self.ensemble_calibrated = None
        self.load_models()

    def _artifact(self, filename):
        """Load ``filename`` from the model directory; ``None`` if unavailable."""
        return load_from_drive(os.path.join(self.model_path_prefix, filename))

    def _artifact_or(self, filename, factory):
        """Load ``filename`` or, when it is unavailable, return ``factory()``.

        An explicit ``is None`` test is used instead of ``or`` so that a
        loaded estimator is never discarded (or made to raise) because of how
        it implements truthiness.
        """
        loaded = self._artifact(filename)
        return factory() if loaded is None else loaded

    def load_models(self):
        """Load saved artifacts, falling back to fresh (unfitted) defaults.

        Sets ``training_complete`` to True only when the label encoder ends up
        with at least 8 known classes; any exception during loading is
        printed and leaves ``training_complete`` False.
        """
        try:
            # May be None when no fitted vectorizer was saved; prediction then
            # relies on the keyword heuristics only.
            self.feature_engine.vectorizer = self._artifact("tfidf_vectorizer.joblib")
            self.label_encoder = self._artifact_or("label_encoder.joblib", LabelEncoder)

            self.xgb_model = self._artifact_or(
                "xgb_model.joblib",
                lambda: xgb.XGBClassifier(
                    n_estimators=300, max_depth=10, learning_rate=0.1,
                    subsample=0.8, random_state=42))
            self.rf_model = self._artifact_or(
                "rf_model.joblib",
                lambda: RandomForestClassifier(
                    n_estimators=300, max_depth=20, min_samples_split=5,
                    random_state=42))
            self.lr_model = self._artifact_or(
                "lr_model.joblib",
                lambda: LogisticRegression(
                    C=1.0, max_iter=2000, solver='liblinear', random_state=42))
            self.svm_model = self._artifact_or(
                "svm_model.joblib",
                lambda: SVC(
                    probability=True, kernel='rbf', C=1.0, gamma='scale',
                    random_state=42))

            self.ensemble = self._artifact("ensemble_model.joblib")
            self.ensemble_calibrated = self._artifact("calibrated_ensemble.joblib")

            # A missing or empty threshold table falls back to the defaults.
            self.calibration_thresholds = (
                self._artifact("calibration_thresholds.joblib")
                or dict(self.DEFAULT_THRESHOLDS))

            encoder_ready = (
                hasattr(self.label_encoder, 'classes_')
                and len(self.label_encoder.classes_) >= 8)
            if encoder_ready:
                self.training_complete = True
            else:
                # Seed the encoder so class names are available.
                self.label_encoder.fit(list(self.DEFAULT_CLASSES))
                self.training_complete = False
        except Exception as e:
            print(f"Model loading error: {e}")
            self.training_complete = False

    @staticmethod
    def _risk_from_confidence(confidence):
        """Map a model confidence to the risk label used in predictions."""
        if confidence > 0.8:
            return 'Critical'
        if confidence > 0.65:
            return 'High'
        if confidence > 0.45:
            return 'Medium'
        return 'Low'

    def get_meaningful_predictions(self, text):
        """Return up to 8 predictions for ``text``, highest confidence first.

        Each prediction is a dict with the keys ``type``, ``confidence``,
        ``threshold``, ``above_threshold`` and ``risk_level``. The calibrated
        ensemble is used when it and the vectorizer are available; when that
        yields fewer than three predictions, keyword-based predictions for
        types not already present are merged in.
        """
        if isinstance(self.calibration_thresholds, dict):
            thresholds = self.calibration_thresholds
        else:
            thresholds = dict(self.DEFAULT_THRESHOLDS)

        preds = []
        if (self.ensemble_calibrated is not None
                and self.feature_engine.vectorizer is not None):
            try:
                X = self.feature_engine.vectorizer.transform([text])
                proba = self.ensemble_calibrated.predict_proba(X)[0]
                known_classes = self.label_encoder.classes_
                for i in np.argsort(proba)[::-1][:8]:  # top 8 by probability
                    if i >= len(known_classes):
                        continue
                    label = known_classes[i]
                    confidence = float(proba[i])
                    threshold = thresholds.get(label, 0.5)
                    preds.append({
                        'type': label,
                        'confidence': confidence,
                        'threshold': threshold,
                        # Plain bool keeps the result JSON-serializable.
                        'above_threshold': bool(confidence > threshold),
                        'risk_level': self._risk_from_confidence(confidence),
                    })
            except Exception as e:
                print(f"Model prediction error: {e}")
                # Fall through to keyword analysis.

        if len(preds) < 3:
            seen_types = {p['type'] for p in preds}
            preds.extend(
                p for p in self._keyword_based_analysis(text, thresholds)
                if p['type'] not in seen_types)

        return sorted(preds, key=lambda p: p['confidence'], reverse=True)[:8]

    @classmethod
    def _keyword_pred(cls, vuln_type, confidence, thresholds, risk_level):
        """Build one keyword-based prediction dict.

        ``above_threshold`` is computed from the effective threshold rather
        than assumed, so a stricter loaded threshold table is respected.
        """
        threshold = thresholds.get(vuln_type, cls.DEFAULT_THRESHOLDS[vuln_type])
        return {
            'type': vuln_type, 'confidence': confidence,
            'threshold': threshold,
            'above_threshold': confidence > threshold,
            'risk_level': risk_level,
        }

    @staticmethod
    def _count_matches(patterns, text):
        """Total number of ``re.findall`` hits of ``patterns`` in ``text``."""
        return sum(len(re.findall(p, text, re.IGNORECASE)) for p in patterns)

    @staticmethod
    def _any_match(patterns, text):
        """True when at least one of ``patterns`` is found in ``text``."""
        return any(re.search(p, text, re.IGNORECASE) for p in patterns)

    def _keyword_based_analysis(self, text, thresholds):
        """Heuristic vulnerability detection based on regexes and keywords.

        Args:
            text: text to inspect (matched case-insensitively).
            thresholds: mapping of vulnerability type to decision threshold.

        Returns:
            A list of prediction dicts (same shape as in
            ``get_meaningful_predictions``), in a fixed category order.
        """
        preds = []
        text_lower = text.lower()

        # SQL Injection
        sql_patterns = [
            r'\b(select|insert|update|delete|union|drop|alter|create)\b.*\b(from|into|table|database)\b',
            r'.*\b(sql|query).*(injection|bypass|escape)\b',
            r'.*(union.*select|1=1|or\s+1=1|--|;)\b',
            r'.*(exec\s*\(|sp_|xp_)\b'
        ]
        sql_matches = self._count_matches(sql_patterns, text_lower)
        if sql_matches > 0:
            confidence = min(0.85 + sql_matches * 0.08, 0.95)
            preds.append(self._keyword_pred(
                'SQL Injection', confidence, thresholds,
                'Critical' if confidence > 0.8 else 'High'))

        # XSS
        xss_patterns = [
            r'.*(script|alert|document\.cookie|onclick|onload|onerror)\b',
            r'.*(<script|</script|javascript:).*',
            r'.*(xss|cross.site).*(script|injection)\b',
            r'.*(eval\(|setTimeout|setInterval).*'
        ]
        xss_matches = self._count_matches(xss_patterns, text_lower)
        if xss_matches > 0:
            confidence = min(0.82 + xss_matches * 0.06, 0.94)
            preds.append(self._keyword_pred(
                'XSS', confidence, thresholds,
                'Critical' if confidence > 0.8 else 'High'))

        # Command Injection
        cmd_patterns = [
            r'.*(exec|system|popen|shell_exec|passthru|subprocess)\b',
            r'.*(command.*injection|cmd.*injection)\b',
            r'.*(\||&|;|`|\$\().*(command|exec)\b',
            r'.*(os\.system|subprocess\.call)\b'
        ]
        cmd_matches = self._count_matches(cmd_patterns, text_lower)
        if cmd_matches > 0:
            confidence = min(0.80 + cmd_matches * 0.07, 0.93)
            preds.append(self._keyword_pred(
                'Command Injection', confidence, thresholds,
                'Critical' if confidence > 0.8 else 'High'))

        # File Inclusion
        file_patterns = [
            r'.*(include|require|file_get_contents|fopen).*[\$|%]',
            r'.*(file.*inclusion|path.*traversal|directory.*traversal)\b',
            r'.*(\.\./|\.\.\\).*',
            r'.*(\.\.%2f|\.\.%5c)\b'
        ]
        file_matches = self._count_matches(file_patterns, text_lower)
        if file_matches > 0:
            confidence = min(0.75 + file_matches * 0.06, 0.90)
            preds.append(self._keyword_pred(
                'File Inclusion', confidence, thresholds,
                'High' if confidence > 0.7 else 'Medium'))

        # XXE
        # NOTE(review): the SYSTEM/PUBLIC alternatives match the ordinary
        # words "system"/"public" case-insensitively, so this is a broad check.
        xxe_patterns = [
            r'.*(xxe|xml.*external.*entity)\b',
            r'.*(<!DOCTYPE|<!ENTITY|SYSTEM|PUBLIC).*',
            r'.*(loadXML|DOMDocument|SimpleXML).*'
        ]
        if self._any_match(xxe_patterns, text_lower):
            preds.append(self._keyword_pred('XXE', 0.78, thresholds, 'High'))

        # SSRF
        ssrf_patterns = [
            r'.*(ssrf|server.*side.*request.*forgery)\b',
            r'.*(curl|file_get_contents|requests\.get).*(http|https|ftp|file)',
            r'.*(url_fopen|allow_url_include)\b'
        ]
        if self._any_match(ssrf_patterns, text_lower):
            preds.append(self._keyword_pred('SSRF', 0.76, thresholds, 'High'))

        # Authentication Bypass
        auth_patterns = [
            r'.*(auth|login|password).*(bypass|weak|broken)\b',
            r'.*(session.*fixation|credential.*stuffing)\b',
            r'.*(default.*password|admin.*admin|root.*root)\b',
            r'.*(jwt.*secret|token.*hardcoded)\b'
        ]
        if self._any_match(auth_patterns, text_lower):
            preds.append(self._keyword_pred(
                'Authentication Bypass', 0.81, thresholds, 'High'))

        # Information Disclosure
        info_patterns = [
            r'.*(password|credential|key|token|secret).*(exposed|leak|disclosure|visible)\b',
            r'.*(error.*message|stack.*trace|debug.*info).*exposed',
            r'.*(config|configuration).*(file|data).*exposed',
            r'.*(\.env|\.pem|\.key|\.cert)\b'
        ]
        info_matches = self._count_matches(info_patterns, text_lower)
        if info_matches > 0:
            confidence = min(0.72 + info_matches * 0.05, 0.87)
            preds.append(self._keyword_pred(
                'Information Disclosure', confidence, thresholds,
                'High' if confidence > 0.7 else 'Medium'))

        # CSRF: these terms contain '.' wildcards ("cross-site", "anti-forgery"),
        # so they are applied as regexes, not as literal substrings.
        csrf_patterns = [
            r'csrf', r'cross.site request forgery', r'anti.forgery', r'request forgery'
        ]
        if self._any_match(csrf_patterns, text_lower):
            preds.append(self._keyword_pred('CSRF', 0.68, thresholds, 'Medium'))

        # IDOR
        idor_terms = ['idor', 'insecure direct object reference', 'direct object reference']
        if any(term in text_lower for term in idor_terms):
            preds.append(self._keyword_pred('IDOR', 0.65, thresholds, 'Medium'))

        # Buffer Overflow
        buffer_patterns = [
            r'.*(buffer.*overflow|stack.*overflow)\b',
            r'.*(strcpy|strcat|gets|sprintf).*',
            r'.*(memcpy|memmove|memset).*'
        ]
        if self._any_match(buffer_patterns, text_lower):
            preds.append(self._keyword_pred('Buffer Overflow', 0.79, thresholds, 'High'))

        # Secure Config: only for texts longer than 50 characters.
        config_terms = ['config', 'setting', 'header', 'permission']
        if len(text) > 50 and any(term in text_lower for term in config_terms):
            preds.append(self._keyword_pred('Secure Config', 0.55, thresholds, 'Low'))

        return preds

    def explain_prediction(self, text, top_k=10):
        """List the weighted security keywords found in ``text``.

        Args:
            text: text to inspect (matched case-insensitively).
            top_k: maximum number of features to return.

        Returns:
            ``{'features': [...]}`` where each feature is a dict with
            ``feature``, ``importance`` (capped at 1.0), ``in_text`` and
            ``count``, sorted by descending importance.
        """
        text_lower = text.lower()

        # The context boost depends on the whole text, not on the keyword,
        # so it is computed once.
        context_terms = ['vulnerability', 'security', 'attack', 'exploit', 'injection']
        context_boost = 0.1 if any(ctx in text_lower for ctx in context_terms) else 0

        features = []
        for word, base_importance in self._KEYWORD_WEIGHTS.items():
            if word in self._REGEX_KEYWORDS:
                count = len(re.findall(word, text_lower))
            else:
                # Lower-case the keyword too: the text has been lower-cased,
                # so a key such as 'DOCTYPE' would otherwise never match.
                count = text_lower.count(word.lower())
            if count == 0:
                continue

            frequency_boost = min(count * 0.1, 0.3)
            adjusted_importance = base_importance + frequency_boost + context_boost
            features.append({
                'feature': word,
                'importance': float(min(adjusted_importance, 1.0)),
                'in_text': True,
                'count': count
            })

        features.sort(key=lambda f: f['importance'], reverse=True)
        return {'features': features[:top_k]}
# Module-level classifier instance, created at import time. Construction calls
# load_models(), which reads the joblib artifacts under "models/".
classifier = CalibratedVulnerabilityClassifier(use_embeddings=True, model_path_prefix="models/")
# =========== Enhanced Port Scanner ================
class PortScanner:
    """TCP connect scanner over a fixed table of well-known service ports."""

    def __init__(self):
        # Port number -> service name; this is the set that quick_scan() probes.
        self.common_ports = {
            21: 'FTP', 22: 'SSH', 23: 'Telnet', 25: 'SMTP', 53: 'DNS',
            80: 'HTTP', 110: 'POP3', 443: 'HTTPS', 993: 'IMAPS',
            995: 'POP3S', 1433: 'MSSQL', 3306: 'MySQL', 3389: 'RDP',
            5432: 'PostgreSQL', 5900: 'VNC', 27017: 'MongoDB',
            8080: 'HTTP-Alt', 8443: 'HTTPS-Alt', 9200: 'Elasticsearch',
            11211: 'Memcached', 6379: 'Redis', 5984: 'CouchDB'
        }
        # Port number -> note attached to the result when that port is open.
        self.vulnerable_ports = {
            21: 'FTP - Anonymous access possible',
            23: 'Telnet - Unencrypted communication',
            80: 'HTTP - Potential web vulnerabilities',
            443: 'HTTPS - SSL/TLS configuration issues',
            3389: 'RDP - Remote Desktop vulnerabilities',
            5900: 'VNC - Unencrypted remote access',
            8080: 'HTTP-Alt - Alternative web service',
            9200: 'Elasticsearch - Database exposure risk',
            11211: 'Memcached - Unauthenticated access',
            6379: 'Redis - Unauthenticated access'
        }

    def scan_port(self, host, port, timeout=2):
        """Try a TCP connection to ``host:port``.

        Args:
            host: host name or IPv4 address.
            port: TCP port number.
            timeout: socket timeout in seconds.

        Returns:
            A dict with ``port``, ``status``, ``service`` and
            ``vulnerability_note`` when the connection succeeds, else ``None``.
            Failures (resolution errors, timeouts, invalid ports, ...) are
            treated as "not open" and never raised.
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(timeout)
                is_open = sock.connect_ex((host, port)) == 0
        except Exception:
            # Best-effort probe: any ordinary error means "not open". Unlike a
            # bare ``except``, this lets KeyboardInterrupt/SystemExit through.
            return None

        if not is_open:
            return None
        return {
            'port': port,
            'status': 'open',
            'service': self.common_ports.get(port, 'Unknown'),
            'vulnerability_note': self.vulnerable_ports.get(port, '')
        }

    def quick_scan(self, host, max_workers=20, timeout=2):
        """Probe every port in ``common_ports`` concurrently.

        Args:
            host: host name or IPv4 address.
            max_workers: size of the thread pool.
            timeout: per-port socket timeout in seconds.

        Returns:
            The ``scan_port`` results for open ports, sorted by port number.
        """
        open_ports = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(self.scan_port, host, port, timeout)
                for port in self.common_ports
            ]
            for future in as_completed(futures):
                result = future.result()
                if result:
                    open_ports.append(result)
        return sorted(open_ports, key=lambda entry: entry['port'])
# =========== Enhanced Passive Website Analyzer ========
class EnhancedPassiveAnalyzer:
    """Website analyzer: HTTP fetch, headers, TLS, optional port scan and risk scoring.

    NOTE(review): besides the main page this also requests robots.txt,
    sitemap.xml and a few well-known sensitive file names, and can run a
    port scan; only use it against sites you are authorized to test.
    """

    def __init__(self, classifier):
        """Store the classifier and prepare a retrying HTTP session.

        Args:
            classifier: object exposing ``get_meaningful_predictions(text)``.
        """
        self.classifier = classifier
        self.port_scanner = PortScanner()
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
        # Requests below are made with verify=False; silence the resulting
        # urllib3 warnings.
        requests.packages.urllib3.disable_warnings()

    @staticmethod
    def _site_root(url):
        """Return ``scheme://netloc`` for ``url`` (or ``url`` sans trailing '/')."""
        parts = urlparse(url)
        if parts.scheme and parts.netloc:
            return f"{parts.scheme}://{parts.netloc}"
        return url.rstrip('/')

    def analyze_website(self, url, quick_mode=False, enable_port_scan=False):
        """Run the full analysis for ``url`` and return the result dict.

        Args:
            url: target URL; ``https://`` is prepended when no scheme is given.
            quick_mode: skip content analysis, extra probes and port scanning.
            enable_port_scan: run the port scan (ignored in quick mode).

        Returns:
            A dict with the analysis sections; on failure an ``error`` key
            describes the problem and later sections keep their empty defaults.
        """
        analysis = {
            'url': url,
            'timestamp': pd.Timestamp.now().isoformat(),
            'quick_mode': quick_mode,
            'network_info': {},
            'content_analysis': {},
            'security_headers': {},
            'technologies': [],
            'vulnerability_predictions': [],
            'risk_assessment': {},
            'enhanced_features': [],
            'port_scan': {},
            'ssl_info': {}
        }
        try:
            if not url.startswith(('http://', 'https://')):
                url = 'https://' + url
            parsed_url = urlparse(url)
            # hostname (not netloc) so that "host:port" or "user@host" URLs
            # still yield a name usable for DNS, TLS and port scanning.
            domain = parsed_url.hostname
            if not parsed_url.netloc or not domain:
                analysis['error'] = "Invalid URL format"
                return analysis

            response = self.session.get(
                url,
                timeout=10 if quick_mode else 15,
                verify=False,
                allow_redirects=True
            )

            analysis['network_info'] = self.get_network_info(domain)
            analysis['ssl_info'] = self.get_ssl_info(domain)

            if enable_port_scan and not quick_mode:
                try:
                    analysis['port_scan'] = self.port_scanner.quick_scan(domain)
                except Exception as e:
                    analysis['port_scan'] = {'error': f'Port scan failed: {str(e)}'}

            analysis['security_headers'] = self.analyze_security_headers(response)
            analysis['technologies'] = self.detect_technologies(response)

            if not quick_mode:
                analysis['content_analysis'] = self.analyze_content(response)
                analysis['enhanced_features'] = self.extract_enhanced_passive_features(url, response)

            analysis['vulnerability_predictions'] = self.predict_vulnerabilities(analysis, quick_mode)
            analysis['risk_assessment'] = self.assess_risk(analysis)
        except requests.exceptions.Timeout:
            analysis['error'] = "Request timeout - site may be unavailable"
        except requests.exceptions.SSLError:
            analysis['error'] = "SSL certificate verification failed"
        except requests.exceptions.ConnectionError:
            analysis['error'] = "Connection error - site may be unreachable"
        except Exception as e:
            analysis['error'] = f"Analysis error: {str(e)}"
        return analysis

    def get_network_info(self, domain):
        """Resolve ``domain`` to an IPv4 address.

        Returns a dict with ``domain``, ``ip_address`` and ``resolved``;
        resolution failures are reported in the dict, not raised.
        """
        try:
            ip = socket.gethostbyname(domain)
        except Exception:
            return {
                'domain': domain,
                'ip_address': 'Unresolvable',
                'resolved': False
            }
        return {
            'domain': domain,
            'ip_address': ip,
            'resolved': True
        }

    def get_ssl_info(self, domain):
        """Fetch the TLS certificate presented by ``domain`` on port 443.

        Returns a dict with ``has_ssl``, ``issuer``, ``subject``,
        ``expires_in_days`` and ``valid``; on any failure a dict with
        ``has_ssl``/``valid`` False and an ``error`` message.
        """
        try:
            context = ssl.create_default_context()
            with socket.create_connection((domain, 443), timeout=5) as sock:
                with context.wrap_socket(sock, server_hostname=domain) as ssock:
                    cert = ssock.getpeercert()

            # notAfter is expressed in GMT; convert it with the ssl helper and
            # compare against the current epoch time instead of naive local time.
            expiry_ts = ssl.cert_time_to_seconds(cert['notAfter'])
            days_until_expiry = int((expiry_ts - time.time()) // 86400)
            return {
                'has_ssl': True,
                'issuer': dict(x[0] for x in cert['issuer']) if isinstance(cert['issuer'], tuple) else str(cert['issuer']),
                'subject': dict(x[0] for x in cert['subject']) if isinstance(cert['subject'], tuple) else str(cert['subject']),
                'expires_in_days': days_until_expiry,
                'valid': days_until_expiry > 0
            }
        except Exception as e:
            return {
                'has_ssl': False,
                'valid': False,
                'error': str(e)
            }

    def analyze_security_headers(self, response):
        """Report presence of common security headers on ``response``.

        Returns a dict keyed by header name; each value holds ``value``
        (the header value or ``'MISSING'``), ``status``, ``purpose`` and
        ``required``.
        """
        headers = response.headers
        important_headers = {
            'X-Frame-Options': {'purpose': 'Clickjacking protection', 'required': True},
            'X-Content-Type-Options': {'purpose': 'MIME sniffing protection', 'required': True},
            'Strict-Transport-Security': {'purpose': 'HTTPS enforcement', 'required': True},
            'Content-Security-Policy': {'purpose': 'XSS protection', 'required': True},
            'X-XSS-Protection': {'purpose': 'XSS protection', 'required': False},
            'Referrer-Policy': {'purpose': 'Referrer info control', 'required': False},
            'Permissions-Policy': {'purpose': 'Browser features control', 'required': False}
        }
        security_headers = {}
        for header, info in important_headers.items():
            value = headers.get(header, 'MISSING')
            security_headers[header] = {
                'value': value,
                'status': 'PRESENT' if value != 'MISSING' else 'MISSING',
                'purpose': info['purpose'],
                'required': info['required']
            }
        return security_headers

    def detect_technologies(self, response):
        """Guess server and framework names from the Server header and body text.

        Returns a list of unique technology names in detection order.
        """
        technologies = []
        server = response.headers.get('Server', '').lower()
        content = response.text.lower()

        if 'apache' in server:
            technologies.append('Apache Web Server')
        elif 'nginx' in server:
            technologies.append('Nginx Web Server')
        elif 'iis' in server:
            technologies.append('Microsoft IIS')
        elif 'cloudflare' in server:
            technologies.append('Cloudflare')

        tech_patterns = {
            'WordPress': ['wp-content', 'wp-includes', 'wordpress'],
            'React': ['react', 'next.js', 'gatsby'],
            'Angular': ['angular', 'ng-'],
            'Vue.js': ['vue', 'vue.js'],
            'Django': ['django', 'csrfmiddleware'],
            'Laravel': ['laravel'],
            'PHP': ['.php', 'php/'],
            'jQuery': ['jquery'],
            'Bootstrap': ['bootstrap'],
            'Google Analytics': ['ga.js', 'google-analytics'],
            'Font Awesome': ['font-awesome']
        }
        technologies.extend(
            tech for tech, patterns in tech_patterns.items()
            if any(pattern in content for pattern in patterns))

        # De-duplicate while keeping a deterministic order (set() would not).
        return list(dict.fromkeys(technologies))

    def analyze_content(self, response):
        """Inspect the page body for simple security indicators.

        Returns a dict with ``text_sample``, ``security_indicators`` and the
        counts of forms, scripts, inputs and links. On failure the same keys
        are returned with empty/zero values and the error in ``text_sample``.
        """
        try:
            soup = BeautifulSoup(response.content, 'html.parser')
            text_content = soup.get_text()[:2000]
            lowered = text_content.lower()

            # HTML comments are not included in get_text() output, so they
            # are looked for in the raw markup instead.
            comment_bodies = re.findall(r'<!--(.*?)-->', response.text, re.DOTALL)
            comments_with_info = any(
                re.search(r'password|key|token|admin', body, re.IGNORECASE)
                for body in comment_bodies)

            security_indicators = {
                'exposed_emails': len(re.findall(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', text_content)),
                'php_errors': 'php' in lowered and any(err in lowered for err in ['error', 'warning', 'notice']),
                'database_errors': any(db in lowered for db in ['mysql', 'postgresql', 'oracle', 'sql server', 'database error']),
                'debug_info': any(term in lowered for term in ['debug', 'test', 'development', 'staging']),
                'exposed_paths': len(re.findall(r'/[\w/.-]+', text_content)) > 50,
                'comments_with_info': comments_with_info
            }
            return {
                'text_sample': text_content[:800],
                'security_indicators': security_indicators,
                'forms_count': len(soup.find_all('form')),
                'scripts_count': len(soup.find_all('script')),
                'inputs_count': len(soup.find_all('input')),
                'links_count': len(soup.find_all('a'))
            }
        except Exception as e:
            return {
                'text_sample': f'Content analysis failed: {str(e)}',
                'security_indicators': {},
                'forms_count': 0,
                'scripts_count': 0,
                'inputs_count': 0,
                'links_count': 0
            }

    def extract_enhanced_passive_features(self, url, response):
        """Collect findings from robots.txt, sitemap.xml and sensitive-file probes.

        ``response`` is accepted for interface compatibility but not used.
        Returns a list of human-readable finding strings.
        """
        features = []
        try:
            features.extend(self.check_robots_txt(url))
            features.extend(self.check_sitemap(url))
            features.extend(self.check_sensitive_files(url))
        except Exception as e:
            features.append(f"Feature extraction error: {str(e)}")
        return features

    def check_robots_txt(self, url):
        """Fetch ``/robots.txt`` from the site root and flag sensitive path names."""
        features = []
        try:
            # robots.txt lives at the root of the site, not under the page path.
            robots_url = f"{self._site_root(url)}/robots.txt"
            response = self.session.get(robots_url, timeout=3, verify=False)
        except requests.RequestException:
            return features

        if response.status_code == 200:
            features.append("robots.txt present")
            content = response.text.lower()
            sensitive_paths = ['admin', 'login', 'config', 'backup', 'database', 'sql']
            if any(path in content for path in sensitive_paths):
                features.append("sensitive paths exposed in robots.txt")
        return features

    def check_sitemap(self, url):
        """Report whether ``/sitemap.xml`` at the site root answers with HTTP 200."""
        features = []
        try:
            sitemap_url = f"{self._site_root(url)}/sitemap.xml"
            response = self.session.get(sitemap_url, timeout=3, verify=False)
        except requests.RequestException:
            return features

        if response.status_code == 200:
            features.append("sitemap.xml present")
        return features

    def check_sensitive_files(self, url, max_files=3):
        """Probe the site root for well-known sensitive file names.

        Args:
            url: any URL on the target site.
            max_files: how many entries of the built-in list to request
                (kept small to limit the number of requests).

        Returns:
            One finding string per file that answered with HTTP 200.
            NOTE(review): sites that answer 200 for unknown paths will
            produce false positives here.
        """
        features = []
        sensitive_files = [
            '.env', 'config.php', 'backup.sql', 'wp-config.php',
            'web.config', '.git/config', 'phpinfo.php'
        ]
        root = self._site_root(url)
        for file in sensitive_files[:max_files]:
            try:
                response = self.session.get(f"{root}/{file}", timeout=2, verify=False)
            except requests.RequestException:
                continue
            if response.status_code == 200:
                features.append(f"sensitive file accessible: {file}")
        return features

    def predict_vulnerabilities(self, analysis, quick_mode=False):
        """Ask the classifier for predictions based on the collected analysis.

        ``quick_mode`` is accepted for interface compatibility but not used.
        Returns the classifier's prediction list, or ``[]`` when there is no
        feature text or the classifier raises.
        """
        feature_text = self.create_feature_text(analysis)
        if not feature_text:
            return []
        try:
            return self.classifier.get_meaningful_predictions(feature_text)
        except Exception as e:
            print(f"Prediction error: {e}")
            return []

    def create_feature_text(self, analysis):
        """Flatten the analysis dict into one space-joined text for the classifier."""
        text_parts = []

        if 'content_analysis' in analysis:
            content = analysis['content_analysis']
            text_parts.append(content.get('text_sample', ''))
            indicators = content.get('security_indicators', {})
            if indicators.get('php_errors'):
                text_parts.append("php error messages exposed")
            if indicators.get('database_errors'):
                text_parts.append("database errors visible")
            if indicators.get('exposed_emails', 0) > 0:
                text_parts.append(f"{indicators['exposed_emails']} emails exposed")
            if indicators.get('comments_with_info'):
                text_parts.append("sensitive information in comments")

        text_parts.append(" ".join(analysis.get('technologies', [])))

        missing_headers = [
            h for h, info in analysis.get('security_headers', {}).items()
            if info.get('status') == 'MISSING' and info.get('required', False)
        ]
        if missing_headers:
            text_parts.append(f"missing security headers: {', '.join(missing_headers)}")

        text_parts.extend(analysis.get('enhanced_features', []))

        # port_scan is a list of results on success and a dict otherwise.
        open_ports = analysis.get('port_scan', [])
        if open_ports and isinstance(open_ports, list):
            vulnerable_ports = [p for p in open_ports if p.get('vulnerability_note')]
            if vulnerable_ports:
                text_parts.append(f"vulnerable ports open: {[p['port'] for p in vulnerable_ports]}")

        if not analysis.get('ssl_info', {}).get('valid', False):
            text_parts.append("ssl certificate issues")

        return " ".join(text_parts)

    def assess_risk(self, analysis):
        """Compute a risk score and level from the collected analysis.

        Returns a dict with ``level``, ``score`` (capped at 100), ``color``
        and ``factors``. The level is derived from the uncapped score.
        """
        risk_score = 0
        factors = []

        missing_headers = sum(
            1 for info in analysis.get('security_headers', {}).values()
            if info.get('status') == 'MISSING' and info.get('required', False)
        )
        if missing_headers > 0:
            risk_score += missing_headers * 12
            factors.append(f"Missing {missing_headers} critical security headers")

        indicators = analysis.get('content_analysis', {}).get('security_indicators', {})
        if indicators.get('php_errors'):
            risk_score += 25
            factors.append("PHP errors exposed to users")
        if indicators.get('database_errors'):
            risk_score += 30
            factors.append("Database errors visible")
        if indicators.get('exposed_emails', 0) > 0:
            risk_score += indicators['exposed_emails'] * 5
            factors.append(f"{indicators['exposed_emails']} email addresses exposed")
        if indicators.get('comments_with_info'):
            risk_score += 20
            factors.append("Sensitive information in HTML comments")

        # .get() so a prediction without a risk level is ignored, not fatal.
        levels = [v.get('risk_level') for v in analysis.get('vulnerability_predictions', [])]
        for label, weight, name in (
                ('Critical', 40, 'critical'),
                ('High', 25, 'high'),
                ('Medium', 15, 'medium')):
            found = levels.count(label)
            if found > 0:
                risk_score += found * weight
                factors.append(f"{found} {name}-risk vulnerabilities predicted")

        open_ports = analysis.get('port_scan', [])
        if open_ports and isinstance(open_ports, list):
            vulnerable_ports = [p for p in open_ports if p.get('vulnerability_note')]
            if vulnerable_ports:
                risk_score += len(vulnerable_ports) * 10
                factors.append(f"{len(vulnerable_ports)} potentially vulnerable ports open")

        if not analysis.get('ssl_info', {}).get('valid', False):
            risk_score += 20
            factors.append("SSL certificate issues detected")

        if risk_score >= 85:
            level, color = "CRITICAL", "#dc2626"
        elif risk_score >= 65:
            level, color = "HIGH", "#ea580c"
        elif risk_score >= 45:
            level, color = "MEDIUM", "#d97706"
        elif risk_score >= 20:
            level, color = "LOW", "#2563eb"
        else:
            level, color = "MINIMAL", "#16a34a"

        return {
            'level': level,
            'score': min(risk_score, 100),
            'color': color,
            'factors': factors
        }
# Module-level analyzer instance that uses the module-level classifier for
# its vulnerability predictions.
analyzer = EnhancedPassiveAnalyzer(classifier)
# =============== Enhanced UI Visualization ============
def create_confidence_chart(result):
    """Render the vulnerability-confidence bar chart and save it as a PNG.

    Args:
        result: Analysis result dict. Only ``'vulnerability_predictions'`` is
            read: a list of dicts providing ``'type'``, ``'confidence'``,
            ``'risk_level'`` and, optionally, ``'threshold'`` (defaults to
            0.3, the same default the HTML report uses). At most the first
            eight predictions are drawn.

    Returns:
        The path of the written image (``"conf_chart.png"``).
    """
    vulns = result.get('vulnerability_predictions', [])
    # Create figure with better styling
    plt.style.use('default')
    fig, ax = plt.subplots(figsize=(14, 8))
    try:
        if not vulns:
            # Create a proper empty chart with message
            ax.text(0.5, 0.5, 'No vulnerabilities detected\nAll systems secure!',
                    ha='center', va='center', transform=ax.transAxes, fontsize=18,
                    bbox=dict(boxstyle="round,pad=0.5", facecolor="#d1fae5", edgecolor="#10b981", alpha=0.8))
            ax.set_xlim(0, 1)
            ax.set_ylim(0, 1)
            ax.axis('off')
        else:
            # Prepare data for chart (only the first eight predictions are shown)
            shown = vulns[:8]
            vuln_types = [v['type'] for v in shown]
            confidences = [v['confidence'] for v in shown]
            thresholds = [v.get('threshold', 0.3) for v in shown]
            # Enhanced color coding based on risk level
            risk_colors = {
                'Critical': '#991b1b',
                'High': '#dc2626',
                'Medium': '#ea580c',
                'Low': '#2563eb'
            }
            colors = [risk_colors.get(v['risk_level'], '#6b7280') for v in shown]
            # Create horizontal bar chart
            y_pos = np.arange(len(vuln_types))
            bar_height = 0.6
            # Create main bars
            bars = ax.barh(y_pos, confidences, color=colors, alpha=0.85, height=bar_height, label='Confidence')
            # Add threshold markers
            for i, threshold in enumerate(thresholds):
                # vlines takes data coordinates, so the marker spans exactly
                # the bar at row i (axvline's ymin/ymax are axes fractions and
                # would not line up with the bars).
                ax.vlines(threshold, i - bar_height / 2, i + bar_height / 2,
                          colors='#6b7280', linestyles='--', alpha=0.8, linewidth=2)
                # Add threshold label
                ax.text(threshold + 0.01, i, f'Threshold: {threshold:.0%}',
                        va='center', fontsize=9, color='#6b7280', fontweight='bold')
            # Customize the chart
            ax.set_yticks(y_pos)
            ax.set_yticklabels(vuln_types, fontsize=12, fontweight='bold')
            ax.set_xlabel('Confidence Score', fontsize=14, fontweight='bold', color='#374151')
            ax.set_title('Vulnerability Confidence Analysis', fontsize=16, fontweight='bold',
                         color='#1f2937', pad=20)
            ax.set_xlim(0, 1.1)  # Extra space for labels
            # Remove spines and add grid
            for spine in ['top', 'right']:
                ax.spines[spine].set_visible(False)
            ax.spines['left'].set_color('#d1d5db')
            ax.spines['bottom'].set_color('#d1d5db')
            ax.grid(axis='x', alpha=0.3, linestyle='--', color='#9ca3af')
            ax.set_axisbelow(True)
            # Add value labels on bars with better positioning
            for bar, confidence, vuln in zip(bars, confidences, shown):
                width = bar.get_width()
                bar_middle = bar.get_y() + bar.get_height() / 2
                # Add confidence percentage just right of the bar end
                ax.text(width + 0.02, bar_middle,
                        f'{confidence:.1%}', ha='left', va='center', fontweight='bold',
                        fontsize=11, color='#1f2937')
                # Add risk level inside bar if space permits
                if width > 0.15:
                    ax.text(width / 2, bar_middle,
                            vuln['risk_level'], ha='center', va='center',
                            fontweight='bold', fontsize=10, color='white')
            # Add legend for risk levels
            legend_elements = [
                plt.Rectangle((0, 0), 1, 1, facecolor=risk_colors[level], alpha=0.85, label=level)
                for level in ('Critical', 'High', 'Medium', 'Low')
            ]
            ax.legend(handles=legend_elements, loc='lower right', framealpha=0.9)
        fig.tight_layout()
        # Save with higher quality
        # NOTE(review): the filename is fixed, so every call writes to the same
        # path; concurrent analyses would overwrite each other's chart.
        chart_path = "conf_chart.png"
        fig.savefig(chart_path, bbox_inches='tight', dpi=150, facecolor='white',
                    edgecolor='none', transparent=False, pad_inches=0.1)
    finally:
        # Always release the figure, even when drawing or saving fails.
        plt.close(fig)
    return chart_path
def format_results(result):
    """Build the HTML report and companion outputs for one analysis result.

    Every value taken from ``result`` is HTML-escaped before it is placed in
    the markup, with one exception: ``result['explainability_data']`` is
    inserted verbatim because it is itself an HTML fragment.

    Args:
        result: Analysis result dict. Keys read here: ``risk_assessment``,
            ``input_data``, ``input_type``, ``quick_mode``, ``timestamp``,
            ``vulnerability_predictions``, ``port_scan``, ``technologies``,
            ``security_headers`` and ``explainability_data``.

    Returns:
        A 4-tuple ``(html, score, json_report, chart_path)`` where ``html`` is
        the rendered report, ``score`` is ``risk_assessment['score']`` (0 when
        absent), ``json_report`` is ``result`` serialized with ``indent=2``
        and ``chart_path`` is the chart image path, or ``None`` when chart
        generation failed.

    Raises:
        TypeError: if ``result`` is not JSON-serializable (from ``json.dumps``).
    """
    from html import escape  # function-scope import keeps this block self-contained

    def esc(value):
        """Return *value* as text that is safe in HTML content and attributes."""
        return escape(str(value), quote=True)

    risk = result.get('risk_assessment', {})
    risk_color = esc(risk.get('color', 'gray'))
    risk_level = esc(risk.get('level', 'UNKNOWN'))
    score = risk.get('score', 0)
    score_text = esc(score)
    mode_label = "QUICK" if result.get('quick_mode') else "FULL"
    sections = []

    # Main Card
    sections.append(f"""
<div style="padding:20px;border-radius:12px;background: linear-gradient(135deg, #2d3748 0%, #1e293b 100%); color:white; margin-bottom: 20px; box-shadow:0 4px 12px rgba(0,0,0,0.15);">
<h2 style="color: black;background: #fff;padding:15px 20px;border-radius:10px;font-size:2.6rem;margin-bottom:1rem;font-weight:800;text-shadow:0 2px 10px rgba(0,0,0,0.1);text-align:center;">🔒 Security AI Vulnerability Analyzer</h2>
<div style="color:black;background:#f8fafc;font-size:1.3rem;border-radius:12px;padding:6px 15px;font-weight:900;margin-bottom:12px;letter-spacing:0.8px;border:2px solid #e2e8f0;text-align:center;">Ethical ML for Security Research</div>
<div style="display: grid; grid-template-columns: 1fr 1fr; gap: 15px; margin-top:15px;color:white;">
<div><b>Input:</b> {esc(result.get('input_data', 'N/A'))}</div>
<div><b>Type:</b> {esc(str(result.get('input_type', 'unknown')).upper())}</div>
<div><b>Mode:</b> {mode_label}</div>
<div><b>Timestamp:</b> {esc(result.get('timestamp', 'N/A'))}</div>
</div>
</div>
""")

    # Risk Assessment Card
    factor_rows = ''.join(
        '<div style="margin:8px 0;padding:8px 12px;background:rgba(255,255,255,0.1);border-radius:8px;border-left:4px solid white;">• '
        f'{esc(factor)}</div>'
        for factor in risk.get('factors', [])
    )
    sections.append(f"""
<div style="padding:20px; margin:20px 0; border-radius:12px; background:{risk_color}; color:white; box-shadow:0 4px 12px rgba(0,0,0,0.1);">
<h3 style="margin:0 0 15px 0; font-size:1.6rem;text-align:center;">Risk Assessment: {risk_level}</h3>
<div style="text-align:center;margin-bottom:15px;">
<strong style="font-size:1.2rem;">Risk Score: {score_text}/100</strong>
</div>
<div style="width:100%;background:rgba(255,255,255,0.2);border-radius:12px;margin:12px 0;overflow:hidden;">
<div style="width:{score_text}%;height:25px;background:white;border-radius:12px;transition:width 0.5s ease;display:flex;align-items:center;justify-content:center;">
<span style="color:{risk_color};font-weight:bold;font-size:0.9rem;">{score_text}%</span>
</div>
</div>
<div style="margin-top:15px;">
<b style="font-size:1.1rem;">Key Factors:</b>
{factor_rows}
</div>
</div>
""")

    # Confidence Chart - Always show this section
    try:
        chart_path = create_confidence_chart(result)
        # NOTE(review): the "file/" URL prefix depends on how the installed
        # Gradio version serves local files - confirm it resolves; the same
        # path is also returned for the separate image component.
        sections.append(f"""
<div style="background:white;padding:20px; border-radius:12px;margin:20px 0; border:1px solid #e2e8f0;box-shadow:0 4px 12px rgba(0,0,0,0.05);">
<h3 style="color:#1a202c; margin:0 0 15px 0;text-align:center;">📊 Vulnerability Confidence Analysis</h3>
<img src="file/{esc(chart_path)}" style="width:100%; border-radius:10px; box-shadow:0 4px 12px rgba(0,0,0,0.1);">
</div>
""")
    except Exception as e:
        # Chart failure is reported inside the page instead of aborting the report.
        sections.append(f"""
<div style="background:#fef2f2;padding:20px; border-radius:12px;margin:20px 0; border:2px solid #fecaca;">
<h3 style="color:#dc2626; margin:0 0 15px 0;text-align:center;">❌ Chart Generation Error</h3>
<p style="color:#7f1d1d;text-align:center;">Unable to generate confidence chart: {esc(e)}</p>
</div>
""")
        chart_path = None

    # Vulnerability Predictions
    vulns = result.get('vulnerability_predictions', [])
    if vulns:
        sections.append("""<div style="background:white;padding:20px;border-radius:12px;margin:20px 0; border:1px solid #e2e8f0;box-shadow:0 4px 12px rgba(0,0,0,0.05);">
<h3 style="color:#2d3748; margin:0 0 15px 0;text-align:center;">🚨 Predicted Vulnerabilities</h3>""")
        for vuln in vulns[:8]:
            confidence = vuln['confidence']
            if confidence > 0.8:
                color, emoji = "#991b1b", "💀"
            elif confidence > 0.65:
                color, emoji = "#dc2626", "🔴"
            elif confidence > 0.45:
                color, emoji = "#ea580c", "🟠"
            else:
                color, emoji = "#2563eb", "🔵"
            if vuln['above_threshold']:
                threshold_color, threshold_label = '#22c55e', "✅ ABOVE THRESHOLD"
            else:
                threshold_color, threshold_label = '#ef4444', "⚠️ BELOW THRESHOLD"
            # `border` comes before `border-left` so the shorthand does not
            # wipe out the coloured accent.
            sections.append(f"""
<div style="padding:15px;margin:10px 0; background:#f7fafc;border-radius:10px;border:1px solid #e2e8f0;border-left:6px solid {color};transition:transform 0.2s;">
<div style="display:flex;justify-content:space-between;align-items:center;flex-wrap:wrap;gap:10px;">
<b style="color:#2d3748;font-size:1.2rem;flex:1;">{emoji} {esc(vuln['type'])}</b>
<span style="color:{color};font-weight:600;font-size:1.1rem;">{confidence:.1%}</span>
<span style="background:{color};color:white;padding:6px 15px;border-radius:20px;font-size:0.9rem;font-weight:600;">{esc(vuln['risk_level'])} Risk</span>
</div>
<div style="margin-top:8px;color:#4a5568;font-size:0.95rem;display:flex;justify-content:space-between;flex-wrap:wrap;gap:10px;">
<span>Threshold: {vuln.get('threshold',0.3):.1%}</span>
<span style="color:{threshold_color};font-weight:600;">
{threshold_label}
</span>
</div>
</div>
""")
        sections.append("</div>")
    else:
        sections.append("""<div style="background:#f0fdf4;padding:20px;border-radius:12px;margin:20px 0; border:2px solid #bbf7d0;">
<h3 style="color:#166534; margin:0 0 12px 0;text-align:center;">✅ No Significant Vulnerabilities Detected</h3>
<p style="color:#3f6212;text-align:center;font-size:1.1rem;">All predictions are below calibrated confidence thresholds or no vulnerabilities were identified in the input.</p>
</div>""")

    # Port Scan Results
    open_ports = result.get('port_scan')
    if open_ports and isinstance(open_ports, list):
        sections.append("""<div style="background:white;padding:20px;border-radius:12px;margin:20px 0; border:1px solid #e2e8f0;">
<h3 style="color:#2d3748; margin:0 0 15px 0;text-align:center;">🌐 Port Scan Results</h3>
<div style="display: grid; grid-template-columns: repeat(auto-fit, minmax(220px, 1fr)); gap: 12px;">""")
        for port in open_ports[:10]:  # Show first 10 open ports
            vuln_note = port.get('vulnerability_note', '')
            border_color = '#ef4444' if vuln_note else '#22c55e'
            bg_color = '#fef2f2' if vuln_note else '#f0fdf4'
            note_html = (
                '<div style="color:#dc2626;font-size:0.85rem;margin-top:6px;padding:4px 8px;background:#fecaca;border-radius:6px;">⚠️ '
                f'{esc(vuln_note)}</div>'
            ) if vuln_note else ''
            sections.append(f"""
<div style="padding:12px; border:2px solid {border_color}; border-radius:10px; background:{bg_color};">
<div style="font-weight:bold;color:#1e293b;font-size:1.1rem;">Port {esc(port['port'])}</div>
<div style="color:#475569;margin:4px 0;">Service: {esc(port['service'])}</div>
<div style="color:#16a34a;font-size:0.9rem;font-weight:600;">Status: OPEN</div>
{note_html}
</div>
""")
        sections.append("</div></div>")

    # Technologies Detected
    if 'technologies' in result and result['technologies']:
        tech_badges = ''.join(
            '<span style="background:#e0e7ff;color:#3730a3;padding:8px 16px;border-radius:20px;font-weight:600;border:1px solid #c7d2fe;">'
            f'{esc(tech)}</span>'
            for tech in result['technologies']
        )
        sections.append(f"""<div style="background:white;padding:18px;border-radius:12px;margin:18px 0; border:1px solid #e2e8f0;">
<h3 style="color:#3730a3;font-size:1.3rem;margin:0 0 12px 0;text-align:center;">🛠️ Technologies Detected</h3>
<div style="margin-top:12px; display: flex; flex-wrap:wrap; gap:10px;justify-content:center;">
{tech_badges}
</div></div>""")

    # Security Headers Status
    if 'security_headers' in result:
        missing = [
            esc(header)
            for header, info in result['security_headers'].items()
            if info.get('status') == 'MISSING' and info.get('required', False)
        ]
        if missing:
            sections.append(f"""<div style="background:#fef2f2;padding:18px;border-radius:12px;margin:18px 0; border:2px solid #fecaca;">
<h3 style="color:#dc2626;margin:0 0 12px 0;text-align:center;">⚠️ Missing Critical Security Headers</h3>
<div style="margin-top:10px;color:#7f1d1d;text-align:center;">
{', '.join(missing)}
</div>
</div>""")

    # Explainability Section - Fixed black text color
    if result.get('explainability_data'):
        # Inserted verbatim on purpose: this value is an HTML fragment.
        sections.append(f"""
<details style='margin:20px 0;border:1px solid #e2e8f0;border-radius:12px;overflow:hidden;'>
<summary style='cursor:pointer;padding:16px;background:#f8fafc;font-weight:bold;color:#000000;font-size:1.1rem;list-style:none;'>
🔍 Show Explainability Insights
</summary>
<div style='margin-top:0;color:#000000;padding:20px;background:#ffffff;border-top:1px solid #e2e8f0;'>
{result.get('explainability_data','No explanation available')}
</div>
</details>
""")

    # Export Section
    # The JSON literal lives inside a double-quoted attribute, so it must be
    # attribute-escaped; the browser decodes the entities before running the JS.
    report_literal = esc(json.dumps(result))
    sections.append(f"""
<div style="background:#fff;padding:20px; border-radius:12px; margin:20px 0; border:1px solid #e2e8f0;box-shadow:0 4px 12px rgba(0,0,0,0.05);text-align:center;">
<h3 style="color:#1e293b;font-size:1.3rem;margin:0 0 15px 0;">📤 Export Results</h3>
<button onclick="navigator.clipboard.writeText(JSON.stringify({report_literal},null,2));alert('JSON report copied to clipboard!');"
style="padding:12px 24px; background: #2563eb; color: white; border: none; border-radius: 8px; font-size:16px;cursor:pointer;font-weight:600;transition:background 0.3s;">
📋 Copy JSON Report
</button>
</div>
""")

    # Enhanced Developer details footer
    sections.append("""
<div style="background:#1f2937;color:white;padding:30px 20px 20px 20px;border-radius:12px;margin:20px 0 0 0;font-size:1.1rem;box-shadow:0 4px 12px rgba(0,0,0,0.15);line-height:1.6;">
<h3 style="font-size:1.8rem;color:#fbbf24;text-align:center;margin-bottom:20px;border-bottom:2px solid #374151;padding-bottom:10px;">Developed By</h3>
<div style="display:grid;grid-template-columns:repeat(auto-fit, minmax(300px, 1fr));gap:20px;margin-bottom:20px;">
<div style="background:#374151;padding:15px;border-radius:8px;">
<strong style="color:#e5e7eb;">👨‍💻 Developer:</strong>
<div style="color:#f9fafb;margin-top:5px;">Chenna Kesava Reddy Yenugu</div>
</div>
<div style="background:#374151;padding:15px;border-radius:8px;">
<strong style="color:#e5e7eb;">🎯 Model Accuracy:</strong>
<div style="background:#2563eb;color:#fff;border-radius:8px;padding:6px 12px;margin-top:5px;display:inline-block;font-weight:600;">
Industry ML/AI Practices Applied, Human Calibrated
</div>
</div>
</div>
<div style="display:grid;grid-template-columns:repeat(auto-fit, minmax(250px, 1fr));gap:15px;">
<div style="background:#374151;padding:12px;border-radius:8px;">
<strong style="color:#e5e7eb;">🌐 Portfolio:</strong>
<div style="margin-top:5px;">
<a href='https://mycareer.ccbp.tech/' target='_blank' style='color:#60a5fa;font-weight:600;text-decoration:none;'>https://mycareer.ccbp.tech/</a>
</div>
</div>
<div style="background:#374151;padding:12px;border-radius:8px;">
<strong style="color:#e5e7eb;">💼 LinkedIn:</strong>
<div style="margin-top:5px;">
<a href='https://www.linkedin.com/in/ychennakesavareddy' target='_blank' style='color:#34d399;font-weight:600;text-decoration:none;'>ychennakesavareddy</a>
</div>
</div>
<div style="background:#374151;padding:12px;border-radius:8px;">
<strong style="color:#e5e7eb;">🤗 HuggingFace:</strong>
<div style="margin-top:5px;">
<a href='https://huggingface.co/yenugu' target='_blank' style='color:#a3e635;font-weight:600;text-decoration:none;'>yenugu</a>
</div>
</div>
<div style="background:#374151;padding:12px;border-radius:8px;">
<strong style="color:#e5e7eb;">🐙 GitHub:</strong>
<div style="margin-top:5px;">
<a href='https://github.com/ychennakesavareddy' target='_blank' style='color:#60a5fa;font-weight:600;text-decoration:none;'>ychennakesavareddy</a>
</div>
</div>
</div>
<div style="display:grid;grid-template-columns:repeat(auto-fit, minmax(250px, 1fr));gap:15px;margin-top:15px;">
<div style="background:#374151;padding:12px;border-radius:8px;">
<strong style="color:#e5e7eb;">🐦 Twitter:</strong>
<div style="margin-top:5px;">
<a href='https://twitter.com/chenna332003' target='_blank' style='color:#38bdf8;font-weight:600;text-decoration:none;'>@chenna332003</a>
</div>
</div>
<div style="background:#374151;padding:12px;border-radius:8px;grid-column:span 2;">
<strong style="color:#e5e7eb;">📧 Email:</strong>
<div style="margin-top:5px;color:#f5d558;">
<a href='mailto:chenna.yenugu.tech@zohomail.in' style='color:#f5d558;text-decoration:none;font-weight:600;'>chenna.yenugu.tech@zohomail.in</a> |
<a href='mailto:c.yenugu.tech@gmail.com' style='color:#f5d558;text-decoration:none;font-weight:600;'>c.yenugu.tech@gmail.com</a>
</div>
</div>
</div>
</div>
""")

    html_output = ''.join(sections)
    return html_output, score, json.dumps(result, indent=2), chart_path
def format_error(error_msg):
    """Build the dashboard outputs for a failed analysis.

    Args:
        error_msg: Message to show the user. It is HTML-escaped before being
            placed in the markup.

    Returns:
        A 4-tuple ``(html, 0, "{}", None)``, the same arity as
        ``format_results``: error card HTML, a zero risk score, an empty JSON
        report and no chart path.
    """
    from html import escape  # function-scope import keeps this block self-contained

    safe_msg = escape(str(error_msg), quote=True)
    # `border` precedes `border-left` so the shorthand does not override the
    # red left accent.
    error_html = f"""<div style="padding:25px;border-radius:12px;background:#fef2f2;border:2px solid #fecaca;border-left:6px solid #dc2626;text-align:center;">
<h2 style="color:#dc2626;margin:0 0 15px 0;font-size:1.8rem;">❌ Analysis Error</h2>
<b style="color:#7f1d1d;font-size:1.2rem;">{safe_msg}</b>
<p style="color:#991b1b;margin:15px 0 0 0;font-size:1.1rem;">Please check your input and try again.</p></div>"""
    return error_html, 0, "{}", None
def generate_explainability(input_data, advanced_explainability):
    """Render the classifier's top influencing features as an HTML fragment.

    Args:
        input_data: Text passed to ``classifier.explain_prediction``.
        advanced_explainability: Accepted for interface compatibility; this
            function does not read it.

    Returns:
        HTML with one card per feature returned by the classifier (it is
        asked for ``top_k=10``), or a short "no significant features"
        paragraph when the explanation is empty.
    """
    from html import escape  # function-scope import keeps this block self-contained

    explanation = classifier.explain_prediction(input_data, top_k=10)
    if not (explanation and explanation.get('features')):
        return "<p style='color:#000000;padding:20px;background:#f8fafc;border-radius:10px;text-align:center;font-size:1.1rem;'>No significant features identified in the input.</p>"

    parts = [
        "<h4 style='color:#000000;margin-bottom:15px;font-size:1.3rem;text-align:center;'>Top Influencing Features</h4><div style='display:grid;grid-template-columns:repeat(auto-fit, minmax(280px, 1fr));gap:15px;'>"
    ]
    for feature in explanation['features']:
        importance = feature['importance']
        # One decision drives both the accent colour and the icon.
        if importance > 0.7:
            color, icon = "#16a34a", "📈"
        elif importance > 0.5:
            color, icon = "#ca8a04", "📊"
        else:
            color, icon = "#dc2626", "📉"
        feature_name = escape(str(feature['feature']).title(), quote=True)
        in_text_mark = '✅' if feature.get('in_text') else '❌'
        # `border` precedes `border-left` so the shorthand does not override
        # the coloured accent.
        parts.append(f"""<div style="background:white;padding:15px;border-radius:10px;border:1px solid #e2e8f0;border-left:5px solid {color};box-shadow:0 2px 8px rgba(0,0,0,0.1);">
<div style="font-weight:bold;color:#000000;margin-bottom:6px;font-size:1.1rem;">{icon} {feature_name}</div>
<div style="color:#475569;font-size:1rem;">Importance: <b style="color:{color};font-size:1.1rem;">{importance:.3f}</b></div>
<div style="color:#64748b;font-size:0.9rem;">Count: {escape(str(feature.get('count', 1)))} • In text: {in_text_mark}</div>
</div>""")
    parts.append("</div>")
    return "".join(parts)
def create_enhanced_dashboard():
    """Build and return the Gradio Blocks interface for the analyzer.

    The layout has a consent checkbox that enables the analyze button, the
    input controls, and four outputs (HTML report, risk score, JSON report,
    confidence chart) that are filled by the nested ``analyze_input``
    callback.

    Returns:
        The constructed ``gr.Blocks`` instance (not yet launched).
    """
    legal_notice = """
## ⚠️ Legal & Ethical Notice
**Security AI is for authorized security research only.**
By using this tool, you agree to:
- Only scan sites you own or have explicit permission to test
- Comply with all applicable laws and regulations
- Not use for malicious purposes or unauthorized testing
- Accept full responsibility for your actions
**Analysis is PASSIVE only.** No active exploitation or intrusive scanning is performed.
Port scanning is limited to common ports and should only be used on authorized systems.
"""
    with gr.Blocks(
        theme=gr.themes.Soft(primary_hue="blue", secondary_hue="orange"),
        title="Security AI Vulnerability Analyzer",
        css="""
.gradio-container {
background: linear-gradient(135deg, #e0eaff 0%, #f8fafc 100%);
font-family: 'Inter', sans-serif;
}
.container {
max-width: 1400px;
margin: 0 auto;
}
.footer {
background: #1f2937;
color: white;
padding: 30px 20px;
border-radius: 12px;
margin-top: 30px;
}
"""
    ) as interface:
        gr.Markdown("""
<h2 style="color: black; background: #fff; padding:20px 25px; border-radius:12px; font-size:2.8rem; margin-bottom:0.5rem; font-weight:800; box-shadow:0 4px 12px rgba(0,0,0,0.1); text-align:center;">
🔒 Security AI Vulnerability Analyzer
</h2>
<div style="color:black; background:#f8fafc; font-size:1.4rem; border-radius:12px; padding:8px 20px; font-weight:900; display:inline-block; margin-bottom:2rem; letter-spacing:0.8px; border:2px solid #e2e8f0; margin:0 auto; display:table;">
Ethical ML for Security Research
</div>
""")
        with gr.Accordion("⚠️ Legal & Ethical Notice", open=False):
            gr.Markdown(legal_notice)
        with gr.Row():
            with gr.Column(scale=1):
                consent_checkbox = gr.Checkbox(
                    label="I agree to the terms above and will use this tool ethically",
                    value=False,
                    info="Required to enable analysis"
                )
                input_data = gr.Textbox(
                    label="Website URL or Security Text",
                    placeholder="https://example.com or paste security-related text...",
                    lines=3,
                    max_lines=5
                )
                with gr.Row():
                    analysis_type = gr.Radio(
                        choices=["url", "text"],
                        label="Analysis Type",
                        value="url",
                        info="URL: Passive website analysis | Text: Predict vulnerabilities"
                    )
                    quick_mode = gr.Checkbox(
                        label="Quick Mode",
                        value=False,
                        info="Faster, less detailed analysis"
                    )
                enable_port_scan = gr.Checkbox(
                    label="Enable Port Scanning",
                    value=False,
                    info="Scan common ports (URL analysis only)",
                    interactive=True
                )
                enable_explainability = gr.Checkbox(
                    label="Enable Explainability",
                    value=True,
                    info="Show why vulnerabilities are predicted"
                )
                # Starts disabled; enabled by the consent checkbox below.
                analyze_btn = gr.Button(
                    "🔍 Analyze Security",
                    variant="primary",
                    size="lg",
                    interactive=False
                )
            with gr.Column(scale=2):
                output_html = gr.HTML(
                    label="Security Analysis Results",
                    value="<div style='padding:60px;text-align:center;color:#64748b; background:white; border-radius:12px; border:2px dashed #cbd5e1;'><h3 style='color:#374151;'>Configure Your Security Analysis</h3><p style='font-size:1.1rem;'>Agree to the terms, enter data, and click 'Analyze Security'</p></div>"
                )
                risk_score = gr.Number(
                    label="Risk Score",
                    precision=1,
                    value=0.0,
                    info="Overall security risk score (0-100)"
                )
                with gr.Row():
                    json_output = gr.JSON(
                        label="Technical Report",
                        height=400,
                        value={},
                        show_label=True
                    )
                    chart_output = gr.Image(
                        label="Confidence Chart",
                        height=300,
                        show_label=True,
                        show_download_button=True
                    )
        # Enhanced examples
        gr.Examples(
            examples=[
                ["https://example.com", "url", False, False],
                ["SQL injection vulnerability in login form with unsanitized user input", "text", False, False],
                ["XSS cross-site scripting in comment section allowing script execution", "text", False, False],
                ["Missing security headers and exposed configuration files with passwords", "text", False, False],
                ["Authentication bypass vulnerability in admin panel access control", "text", False, False],
                ["Command injection through user input in system() function call", "text", False, False]
            ],
            inputs=[input_data, analysis_type, quick_mode, enable_port_scan],
            label="Try These Security Examples:"
        )
        # How it works section with black text
        gr.Markdown("""
<div style="color: black; background: #f8fafc; padding: 25px; border-radius: 12px; border: 1px solid #e2e8f0; margin-top: 25px;">
<h3 style="color: black; margin-bottom: 20px;text-align:center;font-size:1.8rem;">🔧 How It Works</h3>
<div style="display: grid; grid-template-columns: repeat(auto-fit, minmax(220px, 1fr)); gap: 20px; color: black;">
<div style="background: white; padding: 20px; border-radius: 10px; border-left: 5px solid #3b82f6;box-shadow:0 2px 8px rgba(0,0,0,0.1);">
<b style="color: black;font-size:1.2rem;">Passive Analysis</b><br>
<span style="color: #4b5563;">Non-intrusive security assessment without active exploitation</span>
</div>
<div style="background: white; padding: 20px; border-radius: 10px; border-left: 5px solid #10b981;box-shadow:0 2px 8px rgba(0,0,0,0.1);">
<b style="color: black;font-size:1.2rem;">ML Ensemble</b><br>
<span style="color: #4b5563;">Multiple machine learning models combined for accuracy</span>
</div>
<div style="background: white; padding: 20px; border-radius: 10px; border-left: 5px solid #f59e0b;box-shadow:0 2px 8px rgba(0,0,0,0.1);">
<b style="color: black;font-size:1.2rem;">Calibrated Confidence</b><br>
<span style="color: #4b5563;">Industry-standard confidence thresholds for reliable results</span>
</div>
<div style="background: white; padding: 20px; border-radius: 10px; border-left: 5px solid #ef4444;box-shadow:0 2px 8px rgba(0,0,0,0.1);">
<b style="color: black;font-size:1.2rem;">Explainable AI</b><br>
<span style="color: #4b5563;">Understand why vulnerabilities are predicted with feature importance</span>
</div>
<div style="background: white; padding: 20px; border-radius: 10px; border-left: 5px solid #8b5cf6;box-shadow:0 2px 8px rgba(0,0,0,0.1);">
<b style="color: black;font-size:1.2rem;">Risk Assessment</b><br>
<span style="color: #4b5563;">Comprehensive risk scoring based on multiple factors</span>
</div>
</div>
<div style="margin-top: 25px; color: black;text-align:center;">
<b style="font-size:1.3rem;">Supported Vulnerability Types:</b>
<div style="display:flex;flex-wrap:wrap;gap:10px;justify-content:center;margin-top:15px;">
<span style="background: #fee2e2; color: #dc2626; padding: 8px 16px; border-radius: 20px; font-size: 1rem;font-weight:600;">SQL Injection</span>
<span style="background: #fef3c7; color: #d97706; padding: 8px 16px; border-radius: 20px; font-size: 1rem;font-weight:600;">XSS</span>
<span style="background: #dbeafe; color: #2563eb; padding: 8px 16px; border-radius: 20px; font-size: 1rem;font-weight:600;">CSRF</span>
<span style="background: #f3e8ff; color: #7c3aed; padding: 8px 16px; border-radius: 20px; font-size: 1rem;font-weight:600;">Info Disclosure</span>
<span style="background: #dcfce7; color: #16a34a; padding: 8px 16px; border-radius: 20px; font-size: 1rem;font-weight:600;">Auth Bypass</span>
<span style="background: #fce7f3; color: #db2777; padding: 8px 16px; border-radius: 20px; font-size: 1rem;font-weight:600;">Secure Config</span>
</div>
</div>
</div>
""")

        def toggle_analyze(consent):
            """Enable the analyze button only while consent is ticked."""
            return gr.Button(interactive=consent)

        consent_checkbox.change(
            fn=toggle_analyze,
            inputs=[consent_checkbox],
            outputs=[analyze_btn]
        )

        def analyze_input(input_data, analysis_type, quick_mode, enable_port_scan, enable_explainability, progress=gr.Progress()):
            """Run one analysis and return the four dashboard outputs.

            Returns the tuple produced by ``format_results`` on success, or by
            ``format_error`` when the input is empty or the analysis raises.
            """
            # Guard None as well as blank text so a missing value yields the
            # friendly error instead of an AttributeError outside the try.
            if not input_data or not input_data.strip():
                return format_error("Input URL or security text is required.")
            try:
                if analysis_type == "url":
                    progress(0.2, "Connecting to target website...")
                    result = analyzer.analyze_website(
                        input_data,
                        quick_mode,
                        enable_port_scan=enable_port_scan
                    )
                    result['input_type'] = 'url'
                else:
                    progress(0.4, "Analyzing security text...")
                    predictions = classifier.get_meaningful_predictions(input_data)
                    # Calculate risk score for text analysis: fixed points per
                    # predicted risk level, 8 for anything unlisted.
                    # Named text_score so it does not shadow the enclosing
                    # `risk_score` Number component.
                    level_points = {'Critical': 35, 'High': 25, 'Medium': 15}
                    text_score = sum(
                        level_points.get(pred['risk_level'], 8) for pred in predictions
                    )
                    factors = []
                    if predictions:
                        factors.append(f"Found {len(predictions)} potential vulnerabilities")
                    if text_score >= 80:
                        level, color = 'CRITICAL', '#dc2626'
                    elif text_score >= 60:
                        level, color = 'HIGH', '#ea580c'
                    elif text_score >= 30:
                        level, color = 'MEDIUM', '#d97706'
                    else:
                        level, color = 'LOW', '#2563eb'
                    result = {
                        'input_type': 'text',
                        'input_data': input_data,
                        'quick_mode': quick_mode,
                        'vulnerability_predictions': predictions,
                        'risk_assessment': {
                            'level': level,
                            'score': min(text_score, 100),
                            'color': color,
                            'factors': factors
                        }
                    }
                result['input_data'] = input_data
                result['timestamp'] = pd.Timestamp.now().isoformat()
                # Generate explainability data if enabled
                if enable_explainability and not quick_mode:
                    progress(0.8, "Generating explainability insights...")
                    explainability_data = generate_explainability(input_data, False)
                    result['explainability_data'] = explainability_data
                progress(0.95, "Finalizing security report...")
                return format_results(result)
            except Exception as e:
                # UI boundary: surface any failure as an error card.
                return format_error(f"Security analysis failed: {str(e)}")

        analyze_btn.click(
            fn=analyze_input,
            inputs=[input_data, analysis_type, quick_mode, enable_port_scan, enable_explainability],
            outputs=[output_html, risk_score, json_output, chart_output]
        )
    return interface
# Build the dashboard at import time so `interface` exists as a module-level
# name whether this file is imported or executed directly.
interface = create_enhanced_dashboard()

if __name__ == "__main__":
    # Walk a fixed window of candidate ports; the first successful launch wins.
    ports_to_try = list(range(7860, 7870))
    for candidate_port in ports_to_try:
        try:
            print(f"Trying to launch on port {candidate_port}...")
            interface.launch(
                server_name="0.0.0.0",
                server_port=candidate_port,
                show_error=True,
                share=False,
                inbrowser=True,  # Automatically open browser
            )
        except OSError as launch_error:
            reason = str(launch_error)
            port_related = "Address already in use" in reason or "port" in reason.lower()
            if not port_related:
                # Anything that is not a port conflict is a real failure.
                raise
            print(f"Port {candidate_port} is busy, trying next port...")
        else:
            break
    else:
        # Every candidate was busy: let Gradio choose the port itself.
        print("Could not find an available port. Please check if any Gradio apps are running.")
        # Launch with default port selection
        interface.launch(
            server_name="0.0.0.0",
            show_error=True,
            share=False,
            inbrowser=True,
        )