Spaces:
Build error
Build error
| import streamlit as st | |
| import nmap | |
| import paramiko | |
| import subprocess | |
| import requests | |
| import json | |
| import re | |
| import os | |
| import pandas as pd | |
| from io import StringIO | |
| import matplotlib.pyplot as plt | |
| import logging | |
| import plotly.express as px | |
| from datetime import datetime | |
| import traceback | |
| import platform | |
| # Nouvelle dépendance pour surveiller les ressources système | |
| try: | |
| import psutil | |
| HAS_PSUTIL = True | |
| except ImportError: | |
| HAS_PSUTIL = False | |
| # Si psutil n'est pas installé, on pourrait le signaler dans les logs | |
| # Pour envoyer des alertes Slack (exemple simplifié) | |
| try: | |
| from slack_sdk import WebClient | |
| from slack_sdk.errors import SlackApiError | |
| HAS_SLACK = True | |
| except ImportError: | |
| HAS_SLACK = False | |
| # Même remarque : on log ou on ignore | |
# --- Initial configuration ---
# Page-level Streamlit settings: browser tab title, wide layout, and a
# sidebar that starts expanded.
st.set_page_config(
    initial_sidebar_state="expanded",
    layout="wide",
    page_title="Analyse de Vulnérabilités",
)
# --- Logging configuration ---
# In-memory buffer of formatted log lines; the "Logs" tab displays it.
LOG_RECORDS = []
# Name of the minimum level that ListHandler keeps (must be a key of LOG_LEVELS).
LOG_LEVEL_FILTER = "INFO"
LOG_LEVELS = {"DEBUG": logging.DEBUG, "INFO": logging.INFO, "WARNING": logging.WARNING, "ERROR": logging.ERROR}


class ListHandler(logging.Handler):
    """Logging handler that appends formatted records to LOG_RECORDS."""

    def emit(self, record):
        """Keep the record when its level reaches LOG_LEVEL_FILTER."""
        # The globals are read at call time, so a later change to
        # LOG_LEVEL_FILTER affects subsequent records.
        if record.levelno >= LOG_LEVELS[LOG_LEVEL_FILTER]:
            LOG_RECORDS.append(self.format(record))


logger = logging.getLogger("VulnScanner")
logger.setLevel(logging.DEBUG)
# logging.getLogger() hands back the same process-wide logger every time this
# script is executed again (Streamlit re-executes it on each interaction), so
# handlers installed by earlier executions must be removed; otherwise they
# pile up and every record is processed once per past execution.
for stale_handler in list(logger.handlers):
    logger.removeHandler(stale_handler)
handler = ListHandler()
handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
logger.addHandler(handler)
# --- Constants ---
# Optional file with one weak password per line; DEFAULT_WEAK_PASSWORDS is the
# fallback list used when the file is missing.
WEAK_PASSWORDS_FILE = "weak_passwords.txt"
DEFAULT_WEAK_PASSWORDS = ["123456", "password", "admin", "azerty", "qwerty", "motdepasse"]

# Deliberately tiny example of a package -> version -> CVE mapping.
# A real implementation would query an NVD/CVE database instead.
EXAMPLE_CVE_DB = {
    "apache2": {
        "2.4.48": "CVE-2021-XYZ",  # fictitious CVE
        "2.4.52": "CVE-2021-ABC",
    },
    "openssl": {
        "1.1.1f": "CVE-2020-XYZ",
        "3.0.2": "CVE-2022-XYZ",
    },
}

# Slack credentials are taken from the environment when available so that a
# real token never has to be written into this file; the placeholders below
# are only fallbacks and must be adapted otherwise.
SLACK_TOKEN = os.environ.get("SLACK_BOT_TOKEN") or "xoxb-xxxxxxxxxxx-xxxxxxxxxxxxx-xxxxxxxxxxx"
SLACK_CHANNEL = os.environ.get("SLACK_CHANNEL") or "#vulnerabilities"
| # --- Fonctions Utilitaires --- | |
def load_weak_passwords():
    """Return the weak-password list.

    Reads WEAK_PASSWORDS_FILE (one entry per line, blank lines skipped,
    surrounding whitespace stripped). When the file does not exist, a warning
    is logged and DEFAULT_WEAK_PASSWORDS is returned instead.
    """
    try:
        with open(WEAK_PASSWORDS_FILE, "r", encoding="utf-8") as handle:
            stripped_lines = (raw_line.strip() for raw_line in handle)
            entries = [entry for entry in stripped_lines if entry]
    except FileNotFoundError:
        logger.warning(f"Fichier {WEAK_PASSWORDS_FILE} non trouvé. Utilisation de la liste par défaut.")
        return DEFAULT_WEAK_PASSWORDS
    logger.info(f"Chargement de {len(entries)} mots de passe faibles depuis {WEAK_PASSWORDS_FILE}")
    return entries
def validate_ip(ip):
    """Return True when *ip* is a dotted-quad IPv4 address (x.x.x.x).

    Each of the four parts must be 1 to 3 ASCII digits with a value of at
    most 255. Anything else, including trailing whitespace or a trailing
    newline, is rejected.
    """
    # fullmatch (rather than match + "$") so that "1.2.3.4\n" is refused.
    if not re.fullmatch(r"(?:[0-9]{1,3}\.){3}[0-9]{1,3}", ip):
        return False
    # The pattern alone would accept 999.999.999.999; bound every octet.
    return all(int(octet) <= 255 for octet in ip.split("."))
def validate_url(url):
    """Return True when *url* starts with http:// or https:// and has a body.

    The part after the scheme must be non-empty and free of whitespace, so
    values such as "http:// " or "http://host\\n" are rejected before they
    are passed to the external scanners.
    """
    return bool(re.fullmatch(r"https?://\S+", url))
def send_notification(message, severity="INFO"):
    """Placeholder for a future notification integration.

    For now the message is only written to the application logger.

    Args:
        message: Text of the notification.
        severity: Name of a level in LOG_LEVELS (case-insensitive). Unknown
            names fall back to INFO instead of raising, because this helper
            is called from error handlers where a KeyError would mask the
            original failure.
    """
    level = LOG_LEVELS.get(str(severity).upper(), logging.INFO)
    logger.log(level, f"Notification: {message}")
| # Exemple d'envoi Slack | |
def slack_alert(message, token=SLACK_TOKEN, channel=SLACK_CHANNEL):
    """Post *message* to a Slack channel (very basic example).

    Returns a status string in every case: a notice when the Slack SDK is
    not installed, the message timestamp on success, or the error text when
    the Slack API rejects the call.
    """
    if HAS_SLACK:
        slack_client = WebClient(token=token)
        try:
            api_reply = slack_client.chat_postMessage(channel=channel, text=message)
            sent_at = api_reply['ts']
            return f"Slack Alert sent: {sent_at}"
        except SlackApiError as slack_error:
            logger.error(f"Erreur Slack: {slack_error}")
            return f"Erreur Slack: {slack_error}"

    logger.warning("Slack SDK non installé. Alerte non envoyée.")
    return "Slack SDK non installé."
| # --- Fonctions d'Analyse --- | |
def scan_ports(target_ip, ports="1-1024", args="-T4"):
    """Run an Nmap port scan and collect the open ports.

    Args:
        target_ip: Host to scan.
        ports: Port specification, e.g. "22,80" or "1-1024".
        args: Extra command-line arguments passed to Nmap.

    Returns:
        A tuple (scan_info, hosts, csv_output, open_ports) where open_ports
        is a list of (host, port, service_name) tuples covering both TCP and
        UDP results. On any failure the error is logged and
        (None, None, None, None) is returned.
    """
    with st.spinner(f"Scanning {target_ip}..."):
        try:
            # fullmatch so that a trailing newline does not slip through.
            if not re.fullmatch(r"\d+(-\d+)?(?:,\d+(-\d+)?)*", ports):
                raise ValueError("Format de ports invalide (ex: '22,80' ou '1-1024').")
            # Instantiated inside the try block: PortScanner() itself raises
            # when the nmap executable cannot be found, and that must be
            # reported like any other scan failure instead of crashing.
            nm = nmap.PortScanner()
            nm.scan(target_ip, ports=ports, arguments=args)
            logger.info(f"Scan terminé pour {target_ip} sur les ports {ports}")
            hosts = nm.all_hosts()
            open_ports = []
            for host in hosts:
                if nm[host].state() != "up":
                    continue
                # "-sU" is one of the offered options, so UDP results are
                # read as well as TCP ones.
                for protocol in ("tcp", "udp"):
                    port_table = nm[host].get(protocol, {})
                    open_ports.extend(
                        (host, port, details["name"])
                        for port, details in port_table.items()
                        if details["state"] == "open"
                    )
            return nm.scaninfo(), hosts, nm.csv(), open_ports
        except Exception as e:
            logger.error(f"Erreur lors du scan de {target_ip}: {str(e)}")
            send_notification(f"Échec du scan de {target_ip}: {str(e)}", "ERROR")
            return None, None, None, None
def test_ssh(hostname, port, username, password):
    """Attempt an SSH password login and report the outcome as a string.

    Unknown host keys are accepted automatically (AutoAddPolicy). The
    connection attempt uses a 5 second timeout and the client is always
    closed afterwards.
    """
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(
            hostname,
            port=port,
            username=username,
            password=password,
            timeout=5,
        )
        logger.info(f"Connexion SSH réussie à {username}@{hostname}:{port}")
        outcome = "✅ Connexion SSH réussie"
        return outcome
    except Exception as failure:
        reason = str(failure)
        logger.warning(f"Échec SSH à {hostname}:{port} - {reason}")
        return f"❌ Échec SSH: {reason}"
    finally:
        ssh.close()
def audit_software():
    """Audit installed packages by parsing the output of ``dpkg --list``.

    Only lines starting with "ii" and having at least five
    whitespace-separated fields are kept.

    Returns:
        A DataFrame with the columns
        [Status, Name, Version, Architecture, Description, CVE], or None
        when the audit fails (the error is logged and notified). The columns
        are present even when no package was parsed, so callers can always
        index df["CVE"].
    """
    columns = ["Status", "Name", "Version", "Architecture", "Description", "CVE"]
    try:
        result = subprocess.run(["dpkg", "--list"], capture_output=True, text=True, check=True)
        software_data = []
        for line in result.stdout.splitlines():
            if not line.startswith("ii"):
                continue
            parts = line.split()
            if len(parts) < 5:
                continue
            name = parts[1].lower()
            version = parts[2].lower()
            # dpkg lists multi-arch packages as "name:arch"; the example CVE
            # table is keyed by the bare package name, so drop the qualifier
            # for the lookup only (the full name is still reported).
            base_name = name.partition(":")[0]
            cve = EXAMPLE_CVE_DB.get(base_name, {}).get(version, "N/A")
            software_data.append({
                "Status": parts[0],
                "Name": name,
                "Version": version,
                "Architecture": parts[3],
                "Description": " ".join(parts[4:]),
                "CVE": cve,
            })
        logger.info(f"Audit terminé: {len(software_data)} logiciels trouvés")
        # Explicit columns: without them an empty result would produce a
        # frame with no columns at all.
        return pd.DataFrame(software_data, columns=columns)
    except Exception as e:
        logger.error(f"Erreur lors de l'audit logiciel: {str(e)}")
        send_notification(f"Échec de l'audit logiciel: {str(e)}", "ERROR")
        return None
def check_weak_passwords(password):
    """Run a basic strength check on *password*.

    The password is rejected when it appears in the weak-password list
    returned by load_weak_passwords() or when it is shorter than 8
    characters.

    Returns:
        A user-facing status string describing the verdict.
    """
    weak_passwords = load_weak_passwords()
    # The password itself is never written to the log: log records are
    # displayed in the application's "Logs" tab.
    if password in weak_passwords:
        logger.warning("Mot de passe faible détecté (présent dans la liste connue)")
        return "⚠️ Mot de passe faible (liste connue)"
    if len(password) < 8:
        logger.warning("Mot de passe trop court (< 8 caractères)")
        return "⚠️ Mot de passe trop court (< 8 caractères)"
    logger.info("Mot de passe semble robuste")
    return "✅ Mot de passe robuste (vérification basique)"
def ssl_scan(domain):
    """Run ``sslscan`` against *domain* and return its standard output.

    Returns:
        The tool's stdout on success, otherwise a string starting with
        "Erreur:" (invalid target, sslscan missing, or any other failure).
    """
    try:
        target = domain.strip()
        # The value comes straight from a text field. An empty string is
        # useless and a leading "-" would be interpreted by sslscan as a
        # command-line option rather than as a host.
        if not target or target.startswith("-"):
            logger.error(f"Cible SSL/TLS invalide: {domain!r}")
            return "Erreur: domaine invalide."
        result = subprocess.run(["sslscan", target], capture_output=True, text=True, check=True)
        logger.info(f"Scan SSL/TLS terminé pour {domain}")
        return result.stdout
    except FileNotFoundError:
        logger.error("sslscan non installé")
        return "Erreur: sslscan requis mais non installé."
    except Exception as e:
        logger.error(f"Erreur SSL/TLS pour {domain}: {str(e)}")
        return f"Erreur: {str(e)}"
def web_dir_bruteforce(url, wordlist="/usr/share/wordlists/dirbuster/directory-list-2.3-medium.txt"):
    """Enumerate web directories of *url* with Gobuster.

    Returns Gobuster's standard output, or a string starting with "Erreur:"
    when the executable cannot be launched or the run fails.
    """
    command = ["gobuster", "dir", "-u", url, "-w", wordlist]
    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            check=True,
        )
        logger.info(f"Bruteforce terminé pour {url}")
        return completed.stdout
    except FileNotFoundError:
        not_found = "gobuster ou wordlist introuvable"
        logger.error(not_found)
        return "Erreur: gobuster ou wordlist introuvable."
    except Exception as failure:
        reason = str(failure)
        logger.error(f"Erreur bruteforce pour {url}: {reason}")
        return f"Erreur: {reason}"
def nikto_scan(url):
    """Scan *url* with Nikto.

    Returns Nikto's standard output, or a string starting with "Erreur:"
    when Nikto is not installed or the run fails.
    """
    command = ["nikto", "-h", url]
    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            check=True,
        )
        logger.info(f"Scan Nikto terminé pour {url}")
        return completed.stdout
    except FileNotFoundError:
        logger.error("Nikto non installé")
        return "Erreur: Nikto requis mais non installé."
    except Exception as failure:
        reason = str(failure)
        logger.error(f"Erreur Nikto pour {url}: {reason}")
        return f"Erreur: {reason}"
def analyze_http_headers(url):
    """Fetch *url* and report which security headers the response carries.

    Returns a Markdown string: one line per checked security header
    (present or absent, with a short description), followed by the full list
    of response headers. On any failure a string starting with "Erreur:" is
    returned instead.
    """
    try:
        headers = requests.get(url, timeout=10).headers
        security_headers = {
            "Strict-Transport-Security": "Force HTTPS",
            "X-Frame-Options": "Protège contre le clickjacking",
            "X-Content-Type-Options": "Empêche le MIME-sniffing",
            "Referrer-Policy": "Contrôle les références",
            "Permissions-Policy": "Limite les fonctionnalités",
            "Content-Security-Policy": "Protège contre XSS"
        }
        lines = []
        for header_name, purpose in security_headers.items():
            if header_name in headers:
                lines.append(f"✅ **{header_name}**: Présent - {purpose}\n")
            else:
                lines.append(f"⚠️ **{header_name}**: Absent - {purpose}\n")
        summary = "".join(lines)
        full_listing = "\n".join(f"* {key}: {value}" for key, value in headers.items())
        summary = summary + "\n**Headers complets:**\n" + full_listing
        logger.info(f"Analyse des headers terminée pour {url}")
        return summary
    except Exception as failure:
        reason = str(failure)
        logger.error(f"Erreur lors de l'analyse des headers pour {url}: {reason}")
        return f"Erreur: {reason}"
def get_system_info():
    """Collect basic host information from the ``platform`` module.

    Returns a dict with the keys "OS", "OS Version", "Platform", "Machine"
    and "Processor", which tells on which OS/architecture the script runs.
    """
    probes = (
        ("OS", platform.system),
        ("OS Version", platform.version),
        ("Platform", platform.platform),
        ("Machine", platform.machine),
        ("Processor", platform.processor),
    )
    return {label: probe() for label, probe in probes}
def wpscan_run(url):
    """Run WPScan against a WordPress site and return its standard output.

    The scan is started with the example options --random-user-agent and
    --disable-tls-checks.

    Returns:
        WPScan's stdout when the scan completed (with or without findings),
        otherwise a string starting with "Erreur:".
    """
    command = ["wpscan", "--url", url, "--random-user-agent", "--disable-tls-checks"]
    try:
        completed = subprocess.run(command, capture_output=True, text=True)
        # WPScan's exit codes use 5 for "vulnerabilities found" (see its
        # documentation). That is a successful scan whose report must be
        # shown, not an execution error, so only other non-zero statuses are
        # turned into CalledProcessError.
        if completed.returncode not in (0, 5):
            completed.check_returncode()
        logger.info(f"WPScan terminé pour {url}")
        return completed.stdout
    except FileNotFoundError:
        logger.error("WPScan non installé ou introuvable.")
        return "Erreur: WPScan requis mais non installé."
    except Exception as e:
        logger.error(f"Erreur WPScan pour {url}: {str(e)}")
        return f"Erreur: {str(e)}"
def get_resource_usage():
    """Report CPU and RAM usage through psutil.

    Returns a dict with "cpu_percent" (sampled over one second) and "memory"
    (the fields of psutil.virtual_memory() as a dict). When psutil is not
    installed, an explanatory string is returned instead.
    """
    if HAS_PSUTIL:
        cpu_load = psutil.cpu_percent(interval=1)
        memory_stats = psutil.virtual_memory()._asdict()
        return {"cpu_percent": cpu_load, "memory": memory_stats}
    return "psutil non installé. Veuillez l'installer pour monitorer les ressources."
def generate_report(results):
    """Build a plain-text report from the collected results.

    *results* maps a section key (e.g. "port_scan", "ssh_test") to that
    section's data. Every key gets a titled section; keys without a known
    layout, or with missing data, produce the title only.
    """
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    chunks = [f"RAPPORT D'ANALYSE - {timestamp}\n{'=' * 50}\n"]
    # Sections that all share the "target + raw tool output" layout.
    tool_sections = ("ssl_scan", "web_dir_bruteforce", "nikto_scan", "http_headers_analysis", "wpscan")

    for section, payload in results.items():
        heading = section.replace('_', ' ').title()
        chunks.append(f"\n--- {heading} ---\n")

        if section in tool_sections:
            chunks.append(f"Cible: {payload['target']}\nRésultat: {payload['result']}\n")
        elif section == "ssh_test":
            chunks.append(f"Résultat: {payload}\n")
        elif section == "password_check":
            chunks.append(f"Résultat: {payload['result']} (masqué)\n")
        elif section == "system_info":
            chunks.append(f"Informations Système:\n{json.dumps(payload, indent=2)}\n")
        elif section == "resources":
            chunks.append(f"Usage CPU: {payload['cpu_percent']}%\n")
            memory = payload['memory']
            if isinstance(memory, dict):
                chunks.append(f"Usage RAM: {memory['percent']}%\n")
        elif section == "port_scan":
            if payload.get("scan_info"):
                chunks.append(
                    f"Infos: {payload['scan_info']}\nHôtes: {payload['hosts']}\nPorts ouverts: {payload['open_ports']}\n"
                )
        elif section == "software_audit":
            if payload is not None:
                # Relies on pandas' DataFrame.to_string() for the table.
                chunks.append(f"Logiciels: {payload.to_string(index=False)}\n")

    return "".join(chunks)
# --- Streamlit interface ---
# Streamlit re-executes this whole script on every widget interaction, and a
# button is only True during the run triggered by its own click. Anything
# that has to survive beyond that single run (collected results, log lines)
# is therefore kept in st.session_state.


def _to_jsonable(value):
    """Convert result objects that json cannot encode natively."""
    if isinstance(value, pd.DataFrame):
        return value.to_dict(orient="records")
    # Best-effort export: anything else is written as its string form.
    return str(value)


def _entry_level(entry):
    """Return the numeric level of a formatted log line.

    Lines follow "<time> - <LEVEL> - <message>". Lines that do not parse are
    given the highest known level so that they are never filtered out.
    """
    fields = entry.split(" - ", 2)
    if len(fields) < 3:
        return logging.ERROR
    return LOG_LEVELS.get(fields[1], logging.ERROR)


st.title("🛡️ Scanner de Vulnérabilités")

if "results" not in st.session_state:
    st.session_state["results"] = {}
if "log_history" not in st.session_state:
    st.session_state["log_history"] = []
# Same dict object on every run of this session: results accumulate across
# button clicks, so the report/export contain more than the last action.
results = st.session_state["results"]

tabs = st.tabs([
    "Scan Ports", "Test SSH", "Audit Logiciels", "Mots de Passe", "SSL/TLS",
    "Bruteforce Web", "Scan Nikto", "Headers HTTP", "Infos Système",
    "Scan WordPress", "Ressources Système", "Guide", "Logs"
])

# Tab 1: port scan
with tabs[0]:
    st.header("Scan des Ports")
    col1, col2 = st.columns(2)
    with col1:
        ip = st.text_input("Adresse IP", "127.0.0.1")
    with col2:
        ports = st.text_input("Ports (ex: 1-1024)", "1-1024")
    args = st.selectbox("Options Nmap", ["-T4", "-sS", "-sT", "-sU", "-sV -O"])
    if st.button("Scanner"):
        if not validate_ip(ip):
            st.error("IP invalide")
        else:
            scan_info, hosts, csv_results, open_ports = scan_ports(ip, ports, args)
            if scan_info is None:
                # scan_ports() signals failure with None values; tell the
                # user instead of silently showing nothing.
                st.error("Échec du scan (détails dans l'onglet Logs)")
            elif scan_info:
                results["port_scan"] = {
                    "scan_info": scan_info,
                    "hosts": hosts,
                    "csv_results": csv_results,
                    "open_ports": open_ports
                }
                st.write("Hôtes:", hosts)
                if open_ports:
                    ports_df = pd.DataFrame(open_ports, columns=["IP", "Port", "Service"])
                    st.dataframe(ports_df)
                    fig = px.bar(ports_df, x="Port", color="Service", title="Ports Ouverts")
                    st.plotly_chart(fig)
                else:
                    st.info("Aucun port ouvert détecté")

# Tab 2: SSH test
with tabs[1]:
    st.header("Test SSH")
    col1, col2 = st.columns(2)
    with col1:
        host = st.text_input("Hôte", "127.0.0.1")
        port = st.number_input("Port", 1, 65535, 22)
    with col2:
        user = st.text_input("Utilisateur", key="ssh_user")
        pwd_ssh = st.text_input("Mot de passe", type="password", key="ssh_password")
    if st.button("Tester SSH"):
        result = test_ssh(host, port, user, pwd_ssh)
        results["ssh_test"] = result
        st.write(result)

# Tab 3: software audit
with tabs[2]:
    st.header("Audit Logiciels")
    if st.button("Lancer Audit"):
        fresh_audit = audit_software()
        if fresh_audit is None:
            st.error("Échec de l'audit logiciel (détails dans l'onglet Logs)")
        else:
            results["software_audit"] = fresh_audit
    # Rendered from the stored result rather than inside the button branch:
    # the Slack button below triggers a new run in which "Lancer Audit" is
    # False, so a button nested under it could never be observed as clicked.
    audit_df = results.get("software_audit")
    if audit_df is not None:
        st.dataframe(audit_df)
        if not audit_df.empty:
            fig = px.pie(audit_df, names="Architecture", title="Répartition par Architecture")
            st.plotly_chart(fig)
            # Warn when at least one package matched a CVE.
            cves_detectees = audit_df[audit_df["CVE"] != "N/A"]
            if not cves_detectees.empty:
                st.warning("Certaines CVE potentiellement détectées !")
                # Optional Slack notification.
                if st.button("Notifier Slack (CVE)"):
                    alert_text = f"Des vulnérabilités ont été détectées: {cves_detectees['CVE'].unique().tolist()}"
                    resp = slack_alert(alert_text)
                    st.write(resp)

# Tab 4: password check
with tabs[3]:
    st.header("Vérification Mots de Passe")
    pwd_check = st.text_input("Mot de passe", type="password", key="check_password")
    if st.button("Vérifier"):
        result = check_weak_passwords(pwd_check)
        results["password_check"] = {"result": result}
        st.write(result)

# Tab 5: SSL/TLS scan
with tabs[4]:
    st.header("Scan SSL/TLS")
    domain = st.text_input("Domaine", "example.com")
    if st.button("Scanner SSL"):
        result = ssl_scan(domain)
        results["ssl_scan"] = {"target": domain, "result": result}
        st.text_area("Résultat", result, height=300)

# Tab 6: web directory brute force
with tabs[5]:
    st.header("Bruteforce Web")
    url_gobuster = st.text_input("URL", "http://example.com", key="gobuster_url")
    if st.button("Lancer Bruteforce"):
        if validate_url(url_gobuster):
            result = web_dir_bruteforce(url_gobuster)
            results["web_dir_bruteforce"] = {"target": url_gobuster, "result": result}
            st.text_area("Résultat", result, height=300)
        else:
            st.error("URL invalide")

# Tab 7: Nikto scan
with tabs[6]:
    st.header("Scan Nikto")
    url_nikto = st.text_input("URL", "http://example.com", key="nikto_url")
    if st.button("Lancer Nikto"):
        if validate_url(url_nikto):
            result = nikto_scan(url_nikto)
            results["nikto_scan"] = {"target": url_nikto, "result": result}
            st.text_area("Résultat", result, height=300)
        else:
            st.error("URL invalide")

# Tab 8: HTTP headers
with tabs[7]:
    st.header("Headers HTTP")
    url_headers = st.text_input("URL", "http://example.com", key="headers_url")
    if st.button("Analyser"):
        if validate_url(url_headers):
            result = analyze_http_headers(url_headers)
            results["http_headers_analysis"] = {"target": url_headers, "result": result}
            st.markdown(result)
        else:
            st.error("URL invalide")

# Tab 9: system information
with tabs[8]:
    st.header("Infos Système")
    if st.button("Afficher Infos"):
        sys_info = get_system_info()
        results["system_info"] = sys_info
        st.json(sys_info)

# Tab 10: WordPress scan
with tabs[9]:
    st.header("Scan WordPress")
    wp_url = st.text_input("URL du site WordPress", "http://example.com", key="wpscan_url")
    if st.button("Lancer WPScan"):
        if validate_url(wp_url):
            result = wpscan_run(wp_url)
            results["wpscan"] = {"target": wp_url, "result": result}
            st.text_area("Résultat WPScan", result, height=300)
        else:
            st.error("URL invalide")

# Tab 11: system resources
with tabs[10]:
    st.header("Ressources Système")
    if st.button("Montrer l'état des ressources"):
        usage = get_resource_usage()
        if isinstance(usage, str):
            # psutil is not installed: the helper returned an explanation.
            st.error(usage)
        else:
            results["resources"] = usage
            st.write(f"CPU usage: {usage['cpu_percent']} %")
            st.write(f"RAM usage: {usage['memory']['percent']} %")

# Tab 12: guide
with tabs[11]:
    st.header("Guide Utilisateur")
    st.markdown("""
### Utilisation
- **Scan Ports**: Analyse les ports ouverts avec Nmap.
- **Test SSH**: Vérifie les connexions SSH.
- **Audit Logiciels**: Liste les logiciels installés (Debian/Ubuntu) et tente de matcher des CVE (exemple simplifié).
- **Mots de Passe**: Vérifie la robustesse d'un mot de passe.
- **SSL/TLS**: Analyse la configuration SSL/TLS via sslscan.
- **Bruteforce Web**: Recherche des répertoires web via Gobuster.
- **Scan Nikto**: Test de vulnérabilités web via Nikto.
- **Headers HTTP**: Vérification des en-têtes de sécurité.
- **Infos Système**: Récupère les informations sur l'OS, la machine et le processeur.
- **Scan WordPress**: Lance WPScan pour détecter des vulnérabilités sur un site WordPress.
- **Ressources Système**: Affiche l'état CPU/RAM (via psutil) si installé.
### Fonctions Avancées
- **Slack Alert**: Envoie une alerte sur un canal Slack si des vulnérabilités sont détectées (nécessite la Slack SDK).
""")

# --- Sidebar: export ---
with st.sidebar:
    st.header("Export")
    if st.button("Générer Rapport"):
        report = generate_report(results)
        st.download_button("Télécharger", report, "rapport.txt", "text/plain")
    if st.button("Exporter JSON"):
        # The software audit is stored as a DataFrame, which json.dumps
        # cannot encode on its own.
        json_data = json.dumps(results, indent=2, default=_to_jsonable)
        st.download_button("Télécharger JSON", json_data, "resultats.json", "application/json")

# Example Slack button in the sidebar
if HAS_SLACK:
    with st.sidebar:
        st.header("Slack")
        if st.button("Envoyer Slack: Hello !"):
            resp = slack_alert("Hello from VulnScanner!")
            st.write(resp)

# Logged once per session rather than on every re-execution of the script.
if __name__ == "__main__" and not st.session_state.get("startup_logged"):
    st.session_state["startup_logged"] = True
    logger.info("Application démarrée")

# Tab 13: logs. Filled last (the tab's position on the page is fixed by
# st.tabs above) so that records emitted anywhere in this run are included.
with tabs[12]:
    st.header("Logs")
    level = st.selectbox("Niveau", list(LOG_LEVELS.keys()), index=1)
    LOG_LEVEL_FILTER = level
    # LOG_RECORDS only holds the lines of the current run; append them to
    # the per-session history so that changing the level (which triggers a
    # new run) does not wipe the display.
    log_history = st.session_state["log_history"]
    log_history.extend(LOG_RECORDS)
    threshold = LOG_LEVELS[level]
    visible_logs = [entry for entry in log_history if _entry_level(entry) >= threshold]
    st.text_area("Logs", "\n".join(visible_logs) or "Aucun log", height=400)