tutosutiles's picture
Create app.py
6e37674 verified
import streamlit as st
import nmap
import paramiko
import subprocess
import requests
import json
import re
import os
import pandas as pd
from io import StringIO
import matplotlib.pyplot as plt
import logging
import plotly.express as px
from datetime import datetime
import traceback
import platform
# Nouvelle dépendance pour surveiller les ressources système
try:
import psutil
HAS_PSUTIL = True
except ImportError:
HAS_PSUTIL = False
# Si psutil n'est pas installé, on pourrait le signaler dans les logs
# Pour envoyer des alertes Slack (exemple simplifié)
try:
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
HAS_SLACK = True
except ImportError:
HAS_SLACK = False
# Même remarque : on log ou on ignore
# --- Configuration Initiale ---
st.set_page_config(
page_title="Analyse de Vulnérabilités",
layout="wide",
initial_sidebar_state="expanded"
)
# --- Configuration des Logs ---
LOG_RECORDS = []
LOG_LEVEL_FILTER = "INFO"
LOG_LEVELS = {"DEBUG": logging.DEBUG, "INFO": logging.INFO, "WARNING": logging.WARNING, "ERROR": logging.ERROR}
class ListHandler(logging.Handler):
def emit(self, record):
log_entry = self.format(record)
# Compare le niveau du log au niveau défini par LOG_LEVEL_FILTER
if record.levelno >= LOG_LEVELS[LOG_LEVEL_FILTER]:
LOG_RECORDS.append(log_entry)
logger = logging.getLogger("VulnScanner")
logger.setLevel(logging.DEBUG)
handler = ListHandler()
handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
logger.addHandler(handler)
# --- Constantes ---
WEAK_PASSWORDS_FILE = "weak_passwords.txt"
DEFAULT_WEAK_PASSWORDS = ["123456", "password", "admin", "azerty", "qwerty", "motdepasse"]
# Exemple très simplifié de mappage CVE
# Dans la réalité, on irait interroger une base NVD/CVE
EXAMPLE_CVE_DB = {
"apache2": {
"2.4.48": "CVE-2021-XYZ", # CVE fictif
"2.4.52": "CVE-2021-ABC"
},
"openssl": {
"1.1.1f": "CVE-2020-XYZ",
"3.0.2": "CVE-2022-XYZ"
}
}
SLACK_TOKEN = "xoxb-xxxxxxxxxxx-xxxxxxxxxxxxx-xxxxxxxxxxx" # À adapter
SLACK_CHANNEL = "#vulnerabilities" # À adapter
# --- Fonctions Utilitaires ---
@st.cache_data
def load_weak_passwords():
"""Charge la liste des mots de passe faibles depuis un fichier ou utilise une liste par défaut."""
try:
with open(WEAK_PASSWORDS_FILE, "r", encoding="utf-8") as f:
passwords = [line.strip() for line in f if line.strip()]
logger.info(f"Chargement de {len(passwords)} mots de passe faibles depuis {WEAK_PASSWORDS_FILE}")
return passwords
except FileNotFoundError:
logger.warning(f"Fichier {WEAK_PASSWORDS_FILE} non trouvé. Utilisation de la liste par défaut.")
return DEFAULT_WEAK_PASSWORDS
def validate_ip(ip):
"""Valide une adresse IP (x.x.x.x)."""
return bool(re.match(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$", ip))
def validate_url(url):
"""Valide une URL (http:// ou https://)."""
return bool(re.match(r"^(http://|https://).+$", url))
def send_notification(message, severity="INFO"):
"""Placeholder pour une intégration de notification future."""
logger.log(LOG_LEVELS[severity], f"Notification: {message}")
# Exemple d'envoi Slack
def slack_alert(message, token=SLACK_TOKEN, channel=SLACK_CHANNEL):
"""Envoie un message Slack (exemple très basique)."""
if not HAS_SLACK:
logger.warning("Slack SDK non installé. Alerte non envoyée.")
return "Slack SDK non installé."
client = WebClient(token=token)
try:
response = client.chat_postMessage(channel=channel, text=message)
return f"Slack Alert sent: {response['ts']}"
except SlackApiError as e:
logger.error(f"Erreur Slack: {e}")
return f"Erreur Slack: {str(e)}"
# --- Fonctions d'Analyse ---
def scan_ports(target_ip, ports="1-1024", args="-T4"):
"""Effectue un scan de ports avec Nmap."""
nm = nmap.PortScanner()
with st.spinner(f"Scanning {target_ip}..."):
try:
# Vérifie la validité du champ 'ports'
if not re.match(r"^(\d+(-\d+)?(?:,\d+(-\d+)?)*)$", ports):
raise ValueError("Format de ports invalide (ex: '22,80' ou '1-1024').")
nm.scan(target_ip, ports=ports, arguments=args)
logger.info(f"Scan terminé pour {target_ip} sur les ports {ports}")
hosts = nm.all_hosts()
open_ports = []
for host in hosts:
if nm[host].state() == "up":
tcp_ports = nm[host].get("tcp", {})
for p in tcp_ports:
if tcp_ports[p]["state"] == "open":
open_ports.append((host, p, tcp_ports[p]["name"]))
return nm.scaninfo(), hosts, nm.csv(), open_ports
except Exception as e:
logger.error(f"Erreur lors du scan de {target_ip}: {str(e)}")
send_notification(f"Échec du scan de {target_ip}: {str(e)}", "ERROR")
return None, None, None, None
def test_ssh(hostname, port, username, password):
"""Teste une connexion SSH."""
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
client.connect(hostname, port=port, username=username, password=password, timeout=5)
logger.info(f"Connexion SSH réussie à {username}@{hostname}:{port}")
return "✅ Connexion SSH réussie"
except Exception as e:
logger.warning(f"Échec SSH à {hostname}:{port} - {str(e)}")
return f"❌ Échec SSH: {str(e)}"
finally:
client.close()
def audit_software():
"""
Audit les logiciels installés (dpkg).
Retourne un DataFrame avec les colonnes suivantes :
[Status, Name, Version, Architecture, Description, CVE]
"""
try:
result = subprocess.run(["dpkg", "--list"], capture_output=True, text=True, check=True)
software_data = []
for line in result.stdout.splitlines():
if line.startswith("ii"):
parts = line.split()
if len(parts) >= 5:
name = parts[1].lower()
version = parts[2].lower()
# On cherche dans la base de CVE simplifiée
cve = EXAMPLE_CVE_DB.get(name, {}).get(version, "N/A")
software_data.append({
"Status": parts[0],
"Name": name,
"Version": version,
"Architecture": parts[3],
"Description": " ".join(parts[4:]),
"CVE": cve
})
logger.info(f"Audit terminé: {len(software_data)} logiciels trouvés")
return pd.DataFrame(software_data)
except Exception as e:
logger.error(f"Erreur lors de l'audit logiciel: {str(e)}")
send_notification(f"Échec de l'audit logiciel: {str(e)}", "ERROR")
return None
def check_weak_passwords(password):
"""Vérifie la robustesse d'un mot de passe (longueur, liste de mots de passe faibles)."""
weak_passwords = load_weak_passwords()
if password in weak_passwords:
logger.warning(f"Mot de passe faible détecté: {password}")
return "⚠️ Mot de passe faible (liste connue)"
if len(password) < 8:
logger.warning(f"Mot de passe trop court: {password}")
return "⚠️ Mot de passe trop court (< 8 caractères)"
logger.info("Mot de passe semble robuste")
return "✅ Mot de passe robuste (vérification basique)"
def ssl_scan(domain):
"""Effectue un scan SSL/TLS avec sslscan."""
try:
result = subprocess.run(["sslscan", domain], capture_output=True, text=True, check=True)
logger.info(f"Scan SSL/TLS terminé pour {domain}")
return result.stdout
except FileNotFoundError:
logger.error("sslscan non installé")
return "Erreur: sslscan requis mais non installé."
except Exception as e:
logger.error(f"Erreur SSL/TLS pour {domain}: {str(e)}")
return f"Erreur: {str(e)}"
def web_dir_bruteforce(url, wordlist="/usr/share/wordlists/dirbuster/directory-list-2.3-medium.txt"):
"""Bruteforce les répertoires web avec Gobuster."""
try:
result = subprocess.run(["gobuster", "dir", "-u", url, "-w", wordlist], capture_output=True, text=True,
check=True)
logger.info(f"Bruteforce terminé pour {url}")
return result.stdout
except FileNotFoundError:
logger.error("gobuster ou wordlist introuvable")
return "Erreur: gobuster ou wordlist introuvable."
except Exception as e:
logger.error(f"Erreur bruteforce pour {url}: {str(e)}")
return f"Erreur: {str(e)}"
def nikto_scan(url):
"""Scanne une URL avec Nikto."""
try:
result = subprocess.run(["nikto", "-h", url], capture_output=True, text=True, check=True)
logger.info(f"Scan Nikto terminé pour {url}")
return result.stdout
except FileNotFoundError:
logger.error("Nikto non installé")
return "Erreur: Nikto requis mais non installé."
except Exception as e:
logger.error(f"Erreur Nikto pour {url}: {str(e)}")
return f"Erreur: {str(e)}"
def analyze_http_headers(url):
"""Analyse les headers HTTP d'une URL."""
try:
response = requests.get(url, timeout=10)
headers = response.headers
security_headers = {
"Strict-Transport-Security": "Force HTTPS",
"X-Frame-Options": "Protège contre le clickjacking",
"X-Content-Type-Options": "Empêche le MIME-sniffing",
"Referrer-Policy": "Contrôle les références",
"Permissions-Policy": "Limite les fonctionnalités",
"Content-Security-Policy": "Protège contre XSS"
}
result = "".join(
f"✅ **{h}**: Présent - {desc}\n" if h in headers else f"⚠️ **{h}**: Absent - {desc}\n"
for h, desc in security_headers.items()
)
result += "\n**Headers complets:**\n" + "\n".join(f"* {k}: {v}" for k, v in headers.items())
logger.info(f"Analyse des headers terminée pour {url}")
return result
except Exception as e:
logger.error(f"Erreur lors de l'analyse des headers pour {url}: {str(e)}")
return f"Erreur: {str(e)}"
def get_system_info():
"""
Récupère quelques informations système grâce à 'platform'.
Utile pour savoir sur quel OS/Architecture tourne le script.
"""
info = {
"OS": platform.system(),
"OS Version": platform.version(),
"Platform": platform.platform(),
"Machine": platform.machine(),
"Processor": platform.processor()
}
return info
def wpscan_run(url):
"""Lance WPScan sur un site WordPress."""
try:
# On passe des options d'exemple comme --random-user-agent et --disable-tls-checks
result = subprocess.run(
["wpscan", "--url", url, "--random-user-agent", "--disable-tls-checks"],
capture_output=True, text=True, check=True
)
logger.info(f"WPScan terminé pour {url}")
return result.stdout
except FileNotFoundError:
logger.error("WPScan non installé ou introuvable.")
return "Erreur: WPScan requis mais non installé."
except Exception as e:
logger.error(f"Erreur WPScan pour {url}: {str(e)}")
return f"Erreur: {str(e)}"
def get_resource_usage():
"""Retourne des infos CPU/RAM via psutil (si dispo)."""
if not HAS_PSUTIL:
return "psutil non installé. Veuillez l'installer pour monitorer les ressources."
usage = {
"cpu_percent": psutil.cpu_percent(interval=1),
"memory": psutil.virtual_memory()._asdict(), # RAM : total, used, free, ...
}
return usage
def generate_report(results):
"""Génère un rapport textuel des résultats."""
report = f"RAPPORT D'ANALYSE - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n{'=' * 50}\n"
for key, data in results.items():
report += f"\n--- {key.replace('_', ' ').title()} ---\n"
if key == "port_scan" and data.get("scan_info"):
report += f"Infos: {data['scan_info']}\nHôtes: {data['hosts']}\nPorts ouverts: {data['open_ports']}\n"
elif key == "ssh_test":
report += f"Résultat: {data}\n"
elif key == "software_audit" and data is not None:
# On profite de to_string() de pandas
report += f"Logiciels: {data.to_string(index=False)}\n"
elif key == "password_check":
report += f"Résultat: {data['result']} (masqué)\n"
elif key in ["ssl_scan", "web_dir_bruteforce", "nikto_scan", "http_headers_analysis", "wpscan"]:
report += f"Cible: {data['target']}\nRésultat: {data['result']}\n"
elif key == "system_info":
report += f"Informations Système:\n{json.dumps(data, indent=2)}\n"
elif key == "resources":
report += f"Usage CPU: {data['cpu_percent']}%\n"
if isinstance(data['memory'], dict):
report += f"Usage RAM: {data['memory']['percent']}%\n"
return report
# --- Interface Streamlit ---
st.title("🛡️ Scanner de Vulnérabilités")
results = {}
tabs = st.tabs([
"Scan Ports", "Test SSH", "Audit Logiciels", "Mots de Passe", "SSL/TLS",
"Bruteforce Web", "Scan Nikto", "Headers HTTP", "Infos Système",
"Scan WordPress", "Ressources Système", "Guide", "Logs"
])
# Onglet 1: Scan des Ports
with tabs[0]:
st.header("Scan des Ports")
col1, col2 = st.columns(2)
with col1:
ip = st.text_input("Adresse IP", "127.0.0.1")
with col2:
ports = st.text_input("Ports (ex: 1-1024)", "1-1024")
args = st.selectbox("Options Nmap", ["-T4", "-sS", "-sT", "-sU", "-sV -O"])
if st.button("Scanner"):
if not validate_ip(ip):
st.error("IP invalide")
else:
scan_info, hosts, csv_results, open_ports = scan_ports(ip, ports, args)
if scan_info:
results["port_scan"] = {
"scan_info": scan_info,
"hosts": hosts,
"csv_results": csv_results,
"open_ports": open_ports
}
st.write("Hôtes:", hosts)
if open_ports:
df = pd.DataFrame(open_ports, columns=["IP", "Port", "Service"])
st.dataframe(df)
fig = px.bar(df, x="Port", color="Service", title="Ports Ouverts")
st.plotly_chart(fig)
else:
st.info("Aucun port ouvert détecté")
# Onglet 2: Test SSH
with tabs[1]:
st.header("Test SSH")
col1, col2 = st.columns(2)
with col1:
host = st.text_input("Hôte", "127.0.0.1")
port = st.number_input("Port", 1, 65535, 22)
with col2:
user = st.text_input("Utilisateur", key="ssh_user")
pwd_ssh = st.text_input("Mot de passe", type="password", key="ssh_password")
if st.button("Tester SSH"):
result = test_ssh(host, port, user, pwd_ssh)
results["ssh_test"] = result
st.write(result)
# Onglet 3: Audit Logiciels
with tabs[2]:
st.header("Audit Logiciels")
if st.button("Lancer Audit"):
df = audit_software()
if df is not None:
results["software_audit"] = df
st.dataframe(df)
fig = px.pie(df, names="Architecture", title="Répartition par Architecture")
st.plotly_chart(fig)
# On affiche un avertissement si l'on détecte une CVE
cves_detectees = df[df["CVE"] != "N/A"]
if not cves_detectees.empty:
st.warning("Certaines CVE potentiellement détectées !")
# Optionnellement, on peut déclencher un Slack
if st.button("Notifier Slack (CVE)"):
alert_text = f"Des vulnérabilités ont été détectées: {cves_detectees['CVE'].unique().tolist()}"
resp = slack_alert(alert_text)
st.write(resp)
# Onglet 4: Vérification Mots de Passe
with tabs[3]:
st.header("Vérification Mots de Passe")
pwd_check = st.text_input("Mot de passe", type="password", key="check_password")
if st.button("Vérifier"):
result = check_weak_passwords(pwd_check)
results["password_check"] = {"result": result}
st.write(result)
# Onglet 5: Scan SSL/TLS
with tabs[4]:
st.header("Scan SSL/TLS")
domain = st.text_input("Domaine", "example.com")
if st.button("Scanner SSL"):
result = ssl_scan(domain)
results["ssl_scan"] = {"target": domain, "result": result}
st.text_area("Résultat", result, height=300)
# Onglet 6: Bruteforce Web
with tabs[5]:
st.header("Bruteforce Web")
url_gobuster = st.text_input("URL", "http://example.com", key="gobuster_url")
if st.button("Lancer Bruteforce"):
if validate_url(url_gobuster):
result = web_dir_bruteforce(url_gobuster)
results["web_dir_bruteforce"] = {"target": url_gobuster, "result": result}
st.text_area("Résultat", result, height=300)
else:
st.error("URL invalide")
# Onglet 7: Scan Nikto
with tabs[6]:
st.header("Scan Nikto")
url_nikto = st.text_input("URL", "http://example.com", key="nikto_url")
if st.button("Lancer Nikto"):
if validate_url(url_nikto):
result = nikto_scan(url_nikto)
results["nikto_scan"] = {"target": url_nikto, "result": result}
st.text_area("Résultat", result, height=300)
else:
st.error("URL invalide")
# Onglet 8: Headers HTTP
with tabs[7]:
st.header("Headers HTTP")
url_headers = st.text_input("URL", "http://example.com", key="headers_url")
if st.button("Analyser"):
if validate_url(url_headers):
result = analyze_http_headers(url_headers)
results["http_headers_analysis"] = {"target": url_headers, "result": result}
st.markdown(result)
else:
st.error("URL invalide")
# Onglet 9: Infos Système
with tabs[8]:
st.header("Infos Système")
if st.button("Afficher Infos"):
sys_info = get_system_info()
results["system_info"] = sys_info
st.json(sys_info)
# Onglet 10: Scan WordPress
with tabs[9]:
st.header("Scan WordPress")
wp_url = st.text_input("URL du site WordPress", "http://example.com", key="wpscan_url")
if st.button("Lancer WPScan"):
if validate_url(wp_url):
result = wpscan_run(wp_url)
results["wpscan"] = {"target": wp_url, "result": result}
st.text_area("Résultat WPScan", result, height=300)
else:
st.error("URL invalide")
# Onglet 11: Ressources Système
with tabs[10]:
st.header("Ressources Système")
if st.button("Montrer l'état des ressources"):
usage = get_resource_usage()
if isinstance(usage, str):
# psutil n'est pas installé
st.error(usage)
else:
results["resources"] = usage
st.write(f"CPU usage: {usage['cpu_percent']} %")
st.write(f"RAM usage: {usage['memory']['percent']} %")
# Onglet 12: Guide
with tabs[11]:
st.header("Guide Utilisateur")
st.markdown("""
### Utilisation
- **Scan Ports**: Analyse les ports ouverts avec Nmap.
- **Test SSH**: Vérifie les connexions SSH.
- **Audit Logiciels**: Liste les logiciels installés (Debian/Ubuntu) et tente de matcher des CVE (exemple simplifié).
- **Mots de Passe**: Vérifie la robustesse d'un mot de passe.
- **SSL/TLS**: Analyse la configuration SSL/TLS via sslscan.
- **Bruteforce Web**: Recherche des répertoires web via Gobuster.
- **Scan Nikto**: Test de vulnérabilités web via Nikto.
- **Headers HTTP**: Vérification des en-têtes de sécurité.
- **Infos Système**: Récupère les informations sur l'OS, la machine et le processeur.
- **Scan WordPress**: Lance WPScan pour détecter des vulnérabilités sur un site WordPress.
- **Ressources Système**: Affiche l'état CPU/RAM (via psutil) si installé.
### Fonctions Avancées
- **Slack Alert**: Envoie une alerte sur un canal Slack si des vulnérabilités sont détectées (nécessite la Slack SDK).
""")
# Onglet 13: Logs
with tabs[12]:
st.header("Logs")
level = st.selectbox("Niveau", list(LOG_LEVELS.keys()), index=1)
LOG_LEVEL_FILTER = level
st.text_area("Logs", "\n".join(LOG_RECORDS) or "Aucun log", height=400)
# --- Sidebar pour Export ---
with st.sidebar:
st.header("Export")
if st.button("Générer Rapport"):
report = generate_report(results)
st.download_button("Télécharger", report, "rapport.txt", "text/plain")
if st.button("Exporter JSON"):
json_data = json.dumps(results, indent=2)
st.download_button("Télécharger JSON", json_data, "resultats.json", "application/json")
# Exemple de bouton Slack en sidebar
if HAS_SLACK:
with st.sidebar:
st.header("Slack")
if st.button("Envoyer Slack: Hello !"):
resp = slack_alert("Hello from VulnScanner!")
st.write(resp)
if __name__ == "__main__":
logger.info("Application démarrée")