import streamlit as st import nmap import paramiko import subprocess import requests import json import re import os import pandas as pd from io import StringIO import matplotlib.pyplot as plt import logging import plotly.express as px from datetime import datetime import traceback import platform # Nouvelle dépendance pour surveiller les ressources système try: import psutil HAS_PSUTIL = True except ImportError: HAS_PSUTIL = False # Si psutil n'est pas installé, on pourrait le signaler dans les logs # Pour envoyer des alertes Slack (exemple simplifié) try: from slack_sdk import WebClient from slack_sdk.errors import SlackApiError HAS_SLACK = True except ImportError: HAS_SLACK = False # Même remarque : on log ou on ignore # --- Configuration Initiale --- st.set_page_config( page_title="Analyse de Vulnérabilités", layout="wide", initial_sidebar_state="expanded" ) # --- Configuration des Logs --- LOG_RECORDS = [] LOG_LEVEL_FILTER = "INFO" LOG_LEVELS = {"DEBUG": logging.DEBUG, "INFO": logging.INFO, "WARNING": logging.WARNING, "ERROR": logging.ERROR} class ListHandler(logging.Handler): def emit(self, record): log_entry = self.format(record) # Compare le niveau du log au niveau défini par LOG_LEVEL_FILTER if record.levelno >= LOG_LEVELS[LOG_LEVEL_FILTER]: LOG_RECORDS.append(log_entry) logger = logging.getLogger("VulnScanner") logger.setLevel(logging.DEBUG) handler = ListHandler() handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")) logger.addHandler(handler) # --- Constantes --- WEAK_PASSWORDS_FILE = "weak_passwords.txt" DEFAULT_WEAK_PASSWORDS = ["123456", "password", "admin", "azerty", "qwerty", "motdepasse"] # Exemple très simplifié de mappage CVE # Dans la réalité, on irait interroger une base NVD/CVE EXAMPLE_CVE_DB = { "apache2": { "2.4.48": "CVE-2021-XYZ", # CVE fictif "2.4.52": "CVE-2021-ABC" }, "openssl": { "1.1.1f": "CVE-2020-XYZ", "3.0.2": "CVE-2022-XYZ" } } SLACK_TOKEN = "xoxb-xxxxxxxxxxx-xxxxxxxxxxxxx-xxxxxxxxxxx" # À adapter SLACK_CHANNEL = "#vulnerabilities" # À adapter # --- Fonctions Utilitaires --- @st.cache_data def load_weak_passwords(): """Charge la liste des mots de passe faibles depuis un fichier ou utilise une liste par défaut.""" try: with open(WEAK_PASSWORDS_FILE, "r", encoding="utf-8") as f: passwords = [line.strip() for line in f if line.strip()] logger.info(f"Chargement de {len(passwords)} mots de passe faibles depuis {WEAK_PASSWORDS_FILE}") return passwords except FileNotFoundError: logger.warning(f"Fichier {WEAK_PASSWORDS_FILE} non trouvé. Utilisation de la liste par défaut.") return DEFAULT_WEAK_PASSWORDS def validate_ip(ip): """Valide une adresse IP (x.x.x.x).""" return bool(re.match(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$", ip)) def validate_url(url): """Valide une URL (http:// ou https://).""" return bool(re.match(r"^(http://|https://).+$", url)) def send_notification(message, severity="INFO"): """Placeholder pour une intégration de notification future.""" logger.log(LOG_LEVELS[severity], f"Notification: {message}") # Exemple d'envoi Slack def slack_alert(message, token=SLACK_TOKEN, channel=SLACK_CHANNEL): """Envoie un message Slack (exemple très basique).""" if not HAS_SLACK: logger.warning("Slack SDK non installé. Alerte non envoyée.") return "Slack SDK non installé." client = WebClient(token=token) try: response = client.chat_postMessage(channel=channel, text=message) return f"Slack Alert sent: {response['ts']}" except SlackApiError as e: logger.error(f"Erreur Slack: {e}") return f"Erreur Slack: {str(e)}" # --- Fonctions d'Analyse --- def scan_ports(target_ip, ports="1-1024", args="-T4"): """Effectue un scan de ports avec Nmap.""" nm = nmap.PortScanner() with st.spinner(f"Scanning {target_ip}..."): try: # Vérifie la validité du champ 'ports' if not re.match(r"^(\d+(-\d+)?(?:,\d+(-\d+)?)*)$", ports): raise ValueError("Format de ports invalide (ex: '22,80' ou '1-1024').") nm.scan(target_ip, ports=ports, arguments=args) logger.info(f"Scan terminé pour {target_ip} sur les ports {ports}") hosts = nm.all_hosts() open_ports = [] for host in hosts: if nm[host].state() == "up": tcp_ports = nm[host].get("tcp", {}) for p in tcp_ports: if tcp_ports[p]["state"] == "open": open_ports.append((host, p, tcp_ports[p]["name"])) return nm.scaninfo(), hosts, nm.csv(), open_ports except Exception as e: logger.error(f"Erreur lors du scan de {target_ip}: {str(e)}") send_notification(f"Échec du scan de {target_ip}: {str(e)}", "ERROR") return None, None, None, None def test_ssh(hostname, port, username, password): """Teste une connexion SSH.""" client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: client.connect(hostname, port=port, username=username, password=password, timeout=5) logger.info(f"Connexion SSH réussie à {username}@{hostname}:{port}") return "✅ Connexion SSH réussie" except Exception as e: logger.warning(f"Échec SSH à {hostname}:{port} - {str(e)}") return f"❌ Échec SSH: {str(e)}" finally: client.close() def audit_software(): """ Audit les logiciels installés (dpkg). Retourne un DataFrame avec les colonnes suivantes : [Status, Name, Version, Architecture, Description, CVE] """ try: result = subprocess.run(["dpkg", "--list"], capture_output=True, text=True, check=True) software_data = [] for line in result.stdout.splitlines(): if line.startswith("ii"): parts = line.split() if len(parts) >= 5: name = parts[1].lower() version = parts[2].lower() # On cherche dans la base de CVE simplifiée cve = EXAMPLE_CVE_DB.get(name, {}).get(version, "N/A") software_data.append({ "Status": parts[0], "Name": name, "Version": version, "Architecture": parts[3], "Description": " ".join(parts[4:]), "CVE": cve }) logger.info(f"Audit terminé: {len(software_data)} logiciels trouvés") return pd.DataFrame(software_data) except Exception as e: logger.error(f"Erreur lors de l'audit logiciel: {str(e)}") send_notification(f"Échec de l'audit logiciel: {str(e)}", "ERROR") return None def check_weak_passwords(password): """Vérifie la robustesse d'un mot de passe (longueur, liste de mots de passe faibles).""" weak_passwords = load_weak_passwords() if password in weak_passwords: logger.warning(f"Mot de passe faible détecté: {password}") return "⚠️ Mot de passe faible (liste connue)" if len(password) < 8: logger.warning(f"Mot de passe trop court: {password}") return "⚠️ Mot de passe trop court (< 8 caractères)" logger.info("Mot de passe semble robuste") return "✅ Mot de passe robuste (vérification basique)" def ssl_scan(domain): """Effectue un scan SSL/TLS avec sslscan.""" try: result = subprocess.run(["sslscan", domain], capture_output=True, text=True, check=True) logger.info(f"Scan SSL/TLS terminé pour {domain}") return result.stdout except FileNotFoundError: logger.error("sslscan non installé") return "Erreur: sslscan requis mais non installé." except Exception as e: logger.error(f"Erreur SSL/TLS pour {domain}: {str(e)}") return f"Erreur: {str(e)}" def web_dir_bruteforce(url, wordlist="/usr/share/wordlists/dirbuster/directory-list-2.3-medium.txt"): """Bruteforce les répertoires web avec Gobuster.""" try: result = subprocess.run(["gobuster", "dir", "-u", url, "-w", wordlist], capture_output=True, text=True, check=True) logger.info(f"Bruteforce terminé pour {url}") return result.stdout except FileNotFoundError: logger.error("gobuster ou wordlist introuvable") return "Erreur: gobuster ou wordlist introuvable." except Exception as e: logger.error(f"Erreur bruteforce pour {url}: {str(e)}") return f"Erreur: {str(e)}" def nikto_scan(url): """Scanne une URL avec Nikto.""" try: result = subprocess.run(["nikto", "-h", url], capture_output=True, text=True, check=True) logger.info(f"Scan Nikto terminé pour {url}") return result.stdout except FileNotFoundError: logger.error("Nikto non installé") return "Erreur: Nikto requis mais non installé." except Exception as e: logger.error(f"Erreur Nikto pour {url}: {str(e)}") return f"Erreur: {str(e)}" def analyze_http_headers(url): """Analyse les headers HTTP d'une URL.""" try: response = requests.get(url, timeout=10) headers = response.headers security_headers = { "Strict-Transport-Security": "Force HTTPS", "X-Frame-Options": "Protège contre le clickjacking", "X-Content-Type-Options": "Empêche le MIME-sniffing", "Referrer-Policy": "Contrôle les références", "Permissions-Policy": "Limite les fonctionnalités", "Content-Security-Policy": "Protège contre XSS" } result = "".join( f"✅ **{h}**: Présent - {desc}\n" if h in headers else f"⚠️ **{h}**: Absent - {desc}\n" for h, desc in security_headers.items() ) result += "\n**Headers complets:**\n" + "\n".join(f"* {k}: {v}" for k, v in headers.items()) logger.info(f"Analyse des headers terminée pour {url}") return result except Exception as e: logger.error(f"Erreur lors de l'analyse des headers pour {url}: {str(e)}") return f"Erreur: {str(e)}" def get_system_info(): """ Récupère quelques informations système grâce à 'platform'. Utile pour savoir sur quel OS/Architecture tourne le script. """ info = { "OS": platform.system(), "OS Version": platform.version(), "Platform": platform.platform(), "Machine": platform.machine(), "Processor": platform.processor() } return info def wpscan_run(url): """Lance WPScan sur un site WordPress.""" try: # On passe des options d'exemple comme --random-user-agent et --disable-tls-checks result = subprocess.run( ["wpscan", "--url", url, "--random-user-agent", "--disable-tls-checks"], capture_output=True, text=True, check=True ) logger.info(f"WPScan terminé pour {url}") return result.stdout except FileNotFoundError: logger.error("WPScan non installé ou introuvable.") return "Erreur: WPScan requis mais non installé." except Exception as e: logger.error(f"Erreur WPScan pour {url}: {str(e)}") return f"Erreur: {str(e)}" def get_resource_usage(): """Retourne des infos CPU/RAM via psutil (si dispo).""" if not HAS_PSUTIL: return "psutil non installé. Veuillez l'installer pour monitorer les ressources." usage = { "cpu_percent": psutil.cpu_percent(interval=1), "memory": psutil.virtual_memory()._asdict(), # RAM : total, used, free, ... } return usage def generate_report(results): """Génère un rapport textuel des résultats.""" report = f"RAPPORT D'ANALYSE - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n{'=' * 50}\n" for key, data in results.items(): report += f"\n--- {key.replace('_', ' ').title()} ---\n" if key == "port_scan" and data.get("scan_info"): report += f"Infos: {data['scan_info']}\nHôtes: {data['hosts']}\nPorts ouverts: {data['open_ports']}\n" elif key == "ssh_test": report += f"Résultat: {data}\n" elif key == "software_audit" and data is not None: # On profite de to_string() de pandas report += f"Logiciels: {data.to_string(index=False)}\n" elif key == "password_check": report += f"Résultat: {data['result']} (masqué)\n" elif key in ["ssl_scan", "web_dir_bruteforce", "nikto_scan", "http_headers_analysis", "wpscan"]: report += f"Cible: {data['target']}\nRésultat: {data['result']}\n" elif key == "system_info": report += f"Informations Système:\n{json.dumps(data, indent=2)}\n" elif key == "resources": report += f"Usage CPU: {data['cpu_percent']}%\n" if isinstance(data['memory'], dict): report += f"Usage RAM: {data['memory']['percent']}%\n" return report # --- Interface Streamlit --- st.title("🛡️ Scanner de Vulnérabilités") results = {} tabs = st.tabs([ "Scan Ports", "Test SSH", "Audit Logiciels", "Mots de Passe", "SSL/TLS", "Bruteforce Web", "Scan Nikto", "Headers HTTP", "Infos Système", "Scan WordPress", "Ressources Système", "Guide", "Logs" ]) # Onglet 1: Scan des Ports with tabs[0]: st.header("Scan des Ports") col1, col2 = st.columns(2) with col1: ip = st.text_input("Adresse IP", "127.0.0.1") with col2: ports = st.text_input("Ports (ex: 1-1024)", "1-1024") args = st.selectbox("Options Nmap", ["-T4", "-sS", "-sT", "-sU", "-sV -O"]) if st.button("Scanner"): if not validate_ip(ip): st.error("IP invalide") else: scan_info, hosts, csv_results, open_ports = scan_ports(ip, ports, args) if scan_info: results["port_scan"] = { "scan_info": scan_info, "hosts": hosts, "csv_results": csv_results, "open_ports": open_ports } st.write("Hôtes:", hosts) if open_ports: df = pd.DataFrame(open_ports, columns=["IP", "Port", "Service"]) st.dataframe(df) fig = px.bar(df, x="Port", color="Service", title="Ports Ouverts") st.plotly_chart(fig) else: st.info("Aucun port ouvert détecté") # Onglet 2: Test SSH with tabs[1]: st.header("Test SSH") col1, col2 = st.columns(2) with col1: host = st.text_input("Hôte", "127.0.0.1") port = st.number_input("Port", 1, 65535, 22) with col2: user = st.text_input("Utilisateur", key="ssh_user") pwd_ssh = st.text_input("Mot de passe", type="password", key="ssh_password") if st.button("Tester SSH"): result = test_ssh(host, port, user, pwd_ssh) results["ssh_test"] = result st.write(result) # Onglet 3: Audit Logiciels with tabs[2]: st.header("Audit Logiciels") if st.button("Lancer Audit"): df = audit_software() if df is not None: results["software_audit"] = df st.dataframe(df) fig = px.pie(df, names="Architecture", title="Répartition par Architecture") st.plotly_chart(fig) # On affiche un avertissement si l'on détecte une CVE cves_detectees = df[df["CVE"] != "N/A"] if not cves_detectees.empty: st.warning("Certaines CVE potentiellement détectées !") # Optionnellement, on peut déclencher un Slack if st.button("Notifier Slack (CVE)"): alert_text = f"Des vulnérabilités ont été détectées: {cves_detectees['CVE'].unique().tolist()}" resp = slack_alert(alert_text) st.write(resp) # Onglet 4: Vérification Mots de Passe with tabs[3]: st.header("Vérification Mots de Passe") pwd_check = st.text_input("Mot de passe", type="password", key="check_password") if st.button("Vérifier"): result = check_weak_passwords(pwd_check) results["password_check"] = {"result": result} st.write(result) # Onglet 5: Scan SSL/TLS with tabs[4]: st.header("Scan SSL/TLS") domain = st.text_input("Domaine", "example.com") if st.button("Scanner SSL"): result = ssl_scan(domain) results["ssl_scan"] = {"target": domain, "result": result} st.text_area("Résultat", result, height=300) # Onglet 6: Bruteforce Web with tabs[5]: st.header("Bruteforce Web") url_gobuster = st.text_input("URL", "http://example.com", key="gobuster_url") if st.button("Lancer Bruteforce"): if validate_url(url_gobuster): result = web_dir_bruteforce(url_gobuster) results["web_dir_bruteforce"] = {"target": url_gobuster, "result": result} st.text_area("Résultat", result, height=300) else: st.error("URL invalide") # Onglet 7: Scan Nikto with tabs[6]: st.header("Scan Nikto") url_nikto = st.text_input("URL", "http://example.com", key="nikto_url") if st.button("Lancer Nikto"): if validate_url(url_nikto): result = nikto_scan(url_nikto) results["nikto_scan"] = {"target": url_nikto, "result": result} st.text_area("Résultat", result, height=300) else: st.error("URL invalide") # Onglet 8: Headers HTTP with tabs[7]: st.header("Headers HTTP") url_headers = st.text_input("URL", "http://example.com", key="headers_url") if st.button("Analyser"): if validate_url(url_headers): result = analyze_http_headers(url_headers) results["http_headers_analysis"] = {"target": url_headers, "result": result} st.markdown(result) else: st.error("URL invalide") # Onglet 9: Infos Système with tabs[8]: st.header("Infos Système") if st.button("Afficher Infos"): sys_info = get_system_info() results["system_info"] = sys_info st.json(sys_info) # Onglet 10: Scan WordPress with tabs[9]: st.header("Scan WordPress") wp_url = st.text_input("URL du site WordPress", "http://example.com", key="wpscan_url") if st.button("Lancer WPScan"): if validate_url(wp_url): result = wpscan_run(wp_url) results["wpscan"] = {"target": wp_url, "result": result} st.text_area("Résultat WPScan", result, height=300) else: st.error("URL invalide") # Onglet 11: Ressources Système with tabs[10]: st.header("Ressources Système") if st.button("Montrer l'état des ressources"): usage = get_resource_usage() if isinstance(usage, str): # psutil n'est pas installé st.error(usage) else: results["resources"] = usage st.write(f"CPU usage: {usage['cpu_percent']} %") st.write(f"RAM usage: {usage['memory']['percent']} %") # Onglet 12: Guide with tabs[11]: st.header("Guide Utilisateur") st.markdown(""" ### Utilisation - **Scan Ports**: Analyse les ports ouverts avec Nmap. - **Test SSH**: Vérifie les connexions SSH. - **Audit Logiciels**: Liste les logiciels installés (Debian/Ubuntu) et tente de matcher des CVE (exemple simplifié). - **Mots de Passe**: Vérifie la robustesse d'un mot de passe. - **SSL/TLS**: Analyse la configuration SSL/TLS via sslscan. - **Bruteforce Web**: Recherche des répertoires web via Gobuster. - **Scan Nikto**: Test de vulnérabilités web via Nikto. - **Headers HTTP**: Vérification des en-têtes de sécurité. - **Infos Système**: Récupère les informations sur l'OS, la machine et le processeur. - **Scan WordPress**: Lance WPScan pour détecter des vulnérabilités sur un site WordPress. - **Ressources Système**: Affiche l'état CPU/RAM (via psutil) si installé. ### Fonctions Avancées - **Slack Alert**: Envoie une alerte sur un canal Slack si des vulnérabilités sont détectées (nécessite la Slack SDK). """) # Onglet 13: Logs with tabs[12]: st.header("Logs") level = st.selectbox("Niveau", list(LOG_LEVELS.keys()), index=1) LOG_LEVEL_FILTER = level st.text_area("Logs", "\n".join(LOG_RECORDS) or "Aucun log", height=400) # --- Sidebar pour Export --- with st.sidebar: st.header("Export") if st.button("Générer Rapport"): report = generate_report(results) st.download_button("Télécharger", report, "rapport.txt", "text/plain") if st.button("Exporter JSON"): json_data = json.dumps(results, indent=2) st.download_button("Télécharger JSON", json_data, "resultats.json", "application/json") # Exemple de bouton Slack en sidebar if HAS_SLACK: with st.sidebar: st.header("Slack") if st.button("Envoyer Slack: Hello !"): resp = slack_alert("Hello from VulnScanner!") st.write(resp) if __name__ == "__main__": logger.info("Application démarrée")