shadowlog / sections /analyze.py
Cyr-CK's picture
Changed the clustering approach
e76e281
import datetime
import ipaddress
import pandas as pd
import plotly.express as px
import plotly.graph_objs as go
import polars as pl
import streamlit as st
if "parsed_df" not in st.session_state:
st.session_state.parsed_df = None
# Page title
st.title("Data Analysis")
# Vérifier que les données sont chargées
if st.session_state.parsed_df is None:
st.info("Please upload a log file on the 'Upload' page.")
st.stop()
data = st.session_state.parsed_df
university_subnets = [
ipaddress.ip_network("192.168.0.0/16"),
ipaddress.ip_network("10.79.0.0/16"),
ipaddress.ip_network("159.84.0.0/16"),
]
# Fonction pour vérifier si une IP appartient aux sous-réseaux universitaires
def is_university_ip(ip):
try:
ip_obj = ipaddress.ip_address(ip)
return any(ip_obj in subnet for subnet in university_subnets)
except ValueError:
return False
# Créer les onglets principaux
tab1, tab2, tab3, tab4, tab5 = st.tabs(
["Explore data", "Dataviz", "Analysis", "Foreign IP addresses", "Sankey"]
)
# Onglet Analysis
with tab1:
st.subheader("Explore data")
# Vérifier que la colonne timestamp existe et est bien de type datetime
if "timestamp" in data.columns and data["timestamp"].dtype == pl.Datetime:
# Obtenir les valeurs min et max des dates
min_date = data["timestamp"].min().date()
max_date = data["timestamp"].max().date()
# Convertir les dates min et max en datetime pour inclure heures et minutes
min_date_time = datetime.datetime.combine(min_date, datetime.time(0, 0))
max_date_time = datetime.datetime.combine(max_date, datetime.time(23, 59))
selected_date_range = st.slider(
"Filter by date & time",
min_value=min_date_time,
max_value=max_date_time,
value=(min_date_time, max_date_time),
format="YYYY-MM-DD HH:mm",
)
start_date, end_date = selected_date_range
# Disposition des filtres en colonnes
col1, col2, col3 = st.columns(3)
# ---- FILTRE DATE ----
with col1:
st.markdown("### 🛠 Protocol")
if "protocole" in data.columns:
unique_protocols = sorted(
data["protocole"].unique().cast(pl.Utf8).to_list()
)
selected_protocol = st.selectbox(
"Select a protocol", ["All"] + unique_protocols
)
else:
selected_protocol = "All"
st.warning("Column 'protocole' not found.")
# ---- FILTRE action----
with col2:
st.markdown("### 🔄 Action")
if "action" in data.columns:
unique_action = sorted(
data["action"].unique().cast(pl.Utf8).to_list()
) # S'assurer du bon format
selected_action = st.selectbox(
"Select an action", ["All"] + unique_action
)
else:
selected_action = "All"
st.warning("Column 'action' not found.")
# ---- FILTRE portdst ----
with col3:
st.markdown("### 🔢 Port")
if "portdst" in data.columns:
min_port, max_port = (
int(data["portdst"].cast(pl.Utf8).min()),
int(data["portdst"].cast(pl.Utf8).max()),
)
# Initialize port range in session state if not present
if "port_range" not in st.session_state:
st.session_state.port_range = (min_port, max_port)
# Quick port range selection buttons
col_ports1, col_ports2, col_ports3 = st.columns(3)
# Define button handlers to update session state
def set_well_known():
st.session_state.port_range = (0, 1023)
def set_registered():
st.session_state.port_range = (1024, 49151)
def set_dynamic():
st.session_state.port_range = (49152, 65535)
with col_ports1:
st.button("Well-known (0-1023)", on_click=set_well_known)
with col_ports2:
st.button("Registered (1024-49151)", on_click=set_registered)
with col_ports3:
st.button("Dynamic (49152-65535)", on_click=set_dynamic)
# Custom range slider that uses and updates the session state
selected_port = st.slider(
"Custom port range",
min_port,
max_port,
value=st.session_state.port_range,
key="port_slider",
)
# Update port_range when slider changes
st.session_state.port_range = selected_port
else:
min_port, max_port = 0, 65535 # Standard TCP/IP port range
selected_port = (min_port, max_port)
st.warning("Column 'portdst' not found, default values applied.")
# Vérification des dates sélectionnées
if start_date > end_date:
st.error("The start date cannot be later than the end date.")
else:
# Conversion des dates en datetime
start_datetime = pl.datetime(
start_date.year, start_date.month, start_date.day
)
end_datetime = pl.datetime(
end_date.year, end_date.month, end_date.day, 23, 59, 59
)
# ---- APPLICATION DES FILTRES ----
filtered_data = data.filter(
(pl.col("timestamp") >= start_datetime)
& (pl.col("timestamp") <= end_datetime)
)
# Correction du filtrage par action(forcer conversion Utf8)
if "action" in data.columns and selected_action != "All":
filtered_data = filtered_data.filter(
pl.col("action").cast(pl.Utf8) == selected_action
)
# Filtrer par portdst en prenant en compte min/max
if "portdst" in data.columns:
filtered_data = filtered_data.filter(
(pl.col("portdst").cast(pl.Int64) >= selected_port[0])
& (pl.col("portdst").cast(pl.Int64) <= selected_port[1])
)
# Affichage des données filtrées
st.write(f"### 🔍 Data filtered : {filtered_data.shape[0]} entries")
st.dataframe(filtered_data, use_container_width=True)
else:
st.warning(
"The 'timestamp' column does not exist or is not in datetime format."
)
# Onglet Analysis
with tab2:
st.subheader("Dataviz")
# Créer ici un scatter plot permettant une Visualisation interactive des données (IP source avec le nombre
# d’occurrences de destination contactées, incluant le nombre de flux rejetés et autorisés).
# Agréger les données par IP source
df_agg = data.group_by("ipsrc").agg(
[
pl.col("ipdst").n_unique().alias("distinct_destinations"),
((pl.col("action") == "PERMIT").cast(pl.Int64)).sum().alias("count_permit"),
((pl.col("action") == "DENY").cast(pl.Int64)).sum().alias("count_deny"),
]
)
# Créer un scatter plot
if not df_agg.is_empty():
# We need to recreate the aggregation to count distinct destinations per action type
permit_agg = (
data.filter(pl.col("action") == "PERMIT")
.group_by("ipsrc")
.agg(
[
pl.col("ipdst").n_unique().alias("distinct_destinations"),
pl.count("ipsrc").alias("connections"),
]
)
.with_columns(pl.lit("PERMIT").alias("action"))
)
deny_agg = (
data.filter(pl.col("action") == "DENY")
.group_by("ipsrc")
.agg(
[
pl.col("ipdst").n_unique().alias("distinct_destinations"),
pl.count("ipsrc").alias("connections"),
]
)
.with_columns(pl.lit("DENY").alias("action"))
)
# Combine both datasets
combined_df = pl.concat([permit_agg, deny_agg])
# Convert to pandas
df_pandas = combined_df.to_pandas()
# Create the scatter plot with two points per IP source (one for PERMIT, one for DENY)
fig = px.scatter(
df_pandas,
x="ipsrc",
y="distinct_destinations",
color="action",
size="connections",
color_discrete_map={"PERMIT": "blue", "DENY": "red"},
hover_data=["connections", "action"],
title="Number of Distinct Destinations Contacted by Each IP Source",
labels={
"ipsrc": "Source IP Address",
"distinct_destinations": "Number of Distinct Destinations",
"connections": "Number of Connections",
"action": "Action",
},
)
# Improve layout for better readability
fig.update_layout(
xaxis={"categoryorder": "total descending"}, legend_title="Action Type"
)
st.plotly_chart(fig, use_container_width=True)
else:
st.info("No data available for scatter plot.")
# Onglet Analysis
with tab3:
st.subheader("Analysis")
# Afficher ici le top 10 des ports inférieurs à 1024 avec accès autorisé
st.write(
"### 🔢 Top 10 ports with authorized access"
" (portdst < 1024 and action == 'PERMIT')"
)
top_ports = (
data.filter((pl.col("portdst").cast(pl.Int64) < 1024) & (pl.col("action") == "PERMIT"))
.group_by("portdst")
.agg(pl.count("portdst").alias("count"))
.sort("count", descending=True)
.head(10)
)
st.dataframe(top_ports, use_container_width=True)
# Afficher ici le top 5 des IP sources les plus émettrices
st.write("### 🌐 Top 5 emitting IP addresses (ipsource and action == 'PERMIT')")
top_ips = (
data.filter(pl.col("action") == "PERMIT")
.group_by("ipsrc")
.agg(pl.count("ipsrc").alias("count"))
.sort("count", descending=True)
.head(5)
)
st.dataframe(top_ips, use_container_width=True)
# Graphique
st.write("### 🔴 Analysis of Blocked Attempts")
if "ipsrc" in data.columns and "action" in data.columns:
# Filtrer uniquement les tentatives bloquées
blocked_attempts = data.filter(pl.col("action") == "DENY")
# Compter les occurrences des IP sources bloquées
blocked_ips = (
blocked_attempts.group_by("ipsrc")
.agg(pl.count("ipsrc").alias("count"))
.sort("count", descending=True)
)
top_n = st.slider(" ", 5, 20, 10, key="top_n_slider")
# Sélectionner le Top N des IP bloquées
top_blocked_ips = blocked_ips.head(top_n)
# ---- GRAPHIQUE AVEC PLOTLY ----
color_palette = px.colors.sequential.Blues
if not top_blocked_ips.is_empty():
fig = px.bar(
top_blocked_ips.to_pandas(), # Convertir en DataFrame Pandas pour Plotly
x="count",
y="ipsrc",
orientation="h",
text="count",
title=f"Top {top_n} Most Blocked IPs",
labels={"ipsrc": "IP Source", "count": "Number of Blocked Attempts"},
color_discrete_sequence=["#3d85c6"],
)
# Amélioration du layout
fig.update_traces(texttemplate="%{text}", textposition="inside")
fig.update_layout(yaxis=dict(categoryorder="total ascending"))
# Afficher le graphique interactif
st.plotly_chart(fig, use_container_width=True)
else:
st.info("No blocked attempts found.")
else:
st.warning("Columns 'ipsrc' or 'action' not found.")
# Graphique de série temporelle des connexions par heure
st.write("### 📊 Connection Activity Analysis")
if "timestamp" in data.columns:
# 📌 Ajout d'un sélecteur de fréquence
frequency = st.selectbox(
"Select frequency", ["second", "minute", "hour", "day"], index=1
)
# Définition des formats selon la fréquence choisie
if frequency == "second":
time_format = "%Y-%m-%d %H:%M:%S"
time_label = "Second"
elif frequency == "minute":
time_format = "%Y-%m-%d %H:%M:00"
time_label = "Minute"
elif frequency == "hour":
time_format = "%Y-%m-%d %H:00:00"
time_label = "Hour"
else:
time_format = "%Y-%m-%d"
time_label = "Day"
# Filtrage et regroupement
activity_data = (
data.filter(pl.col("action") == "PERMIT")
.with_columns(
pl.col("timestamp").dt.strftime(time_format).alias("time_period")
)
.group_by("time_period")
.agg(pl.count("time_period").alias("connection_count"))
.sort("time_period")
)
# Vérifier s'il y a des données
if not activity_data.is_empty():
# Convertir en Pandas
df_activity = activity_data.to_pandas()
df_activity["time_period"] = pd.to_datetime(df_activity["time_period"])
# Tracer le graphique
fig = px.line(
df_activity,
x="time_period",
y="connection_count",
markers=True,
title=f"Connection Activity ({time_label} level)",
labels={
"time_period": time_label,
"connection_count": "Number of Connections",
},
line_shape="spline",
)
# Afficher le graphique
st.plotly_chart(fig, use_container_width=True)
else:
st.info("No connection data found for the selected period.")
else:
st.warning("Column 'timestamp' not found.")
# Onglet Foreign IP addresses
with tab4:
st.subheader("🚫 List of access outside the university network")
if "ipsrc" in data.columns and "action" in data.columns:
# Conversion des IPs en chaînes de caractères pour éviter les erreurs de type
data = data.with_columns(
[
pl.col("ipsrc").cast(pl.Utf8).alias("ipsrc"),
pl.col("action").cast(pl.Utf8).alias("action"),
]
)
# Vérification des IPs avec la fonction is_university_ip
data = data.with_columns(
[
pl.col("ipsrc")
.map_elements(is_university_ip, return_dtype=pl.Boolean)
.alias("is_src_university_ip")
]
)
# filtrer toutes les connexions impliquant une adresse externe
intrusion_attempts = data.filter((~pl.col("is_src_university_ip")))
# Ajout d'un filtre par action
selected_action = st.selectbox("Select action type", ["All", "PERMIT", "DENY"])
if selected_action != "All":
intrusion_attempts = intrusion_attempts.filter(
pl.col("action") == selected_action
)
# Affichage des accès externes
st.write(f"### 🔍 External accesses: {intrusion_attempts.shape[0]} entries")
st.dataframe(
intrusion_attempts.drop(["is_src_university_ip"]), use_container_width=True
)
else:
st.warning("Columns 'ipsrc' not found.")
# Onglet Sankey
with tab5:
st.subheader("Sankey Diagram")
def create_sankey(df, source_col, target_col):
"""Crée un diagramme de Sankey entre deux colonnes"""
df_grouped = df.group_by([source_col, target_col]).len().to_pandas()
# Création des nœuds
labels = list(
pd.concat([df_grouped[source_col], df_grouped[target_col]]).unique()
)
label_to_index = {label: i for i, label in enumerate(labels)}
# Création des liens
sources = df_grouped[source_col].map(label_to_index)
targets = df_grouped[target_col].map(label_to_index)
values = df_grouped["len"]
# Création du Sankey Diagram
fig = go.Figure(
go.Sankey(
node=dict(
pad=15,
thickness=20,
line=dict(color="black", width=0.5),
label=labels,
),
link=dict(source=sources, target=targets, value=values),
)
)
fig.update_layout(
title_text=f"Flow between {source_col} and {target_col}", font_size=10
)
st.plotly_chart(fig, use_container_width=True)
st.subheader("Connections where access were identified as : PERMIT")
data_filtered = data.filter(pl.col("action") == "PERMIT")
# 🔹 Sankey entre IP source et IP destination
create_sankey(data_filtered, "ipsrc", "ipdst")
# 🔹 Sankey entre IP source et port destination
df = data_filtered.with_columns(
data_filtered["portdst"]
)
create_sankey(df, "ipsrc", "portdst")
st.subheader("Connections where access were identified as : DENY")
data_filtered = data.filter(pl.col("action") == "DENY")
# 🔹 Sankey entre IP source et IP destination
create_sankey(data_filtered, "ipsrc", "ipdst")
# 🔹 Sankey entre IP source et port destination
df = data_filtered.with_columns(
data_filtered["portdst"]
)
create_sankey(df, "ipsrc", "portdst")