import os
import sqlite3
import tempfile
from datetime import datetime
import polars as pl
import streamlit as st
st.title("ShadowLog - Log File Analyzer")
st.write("Upload a log file to analyze with the following format :")
st.write(
"""
| Column name |
timestamp |
ipsrc |
ipdst |
protocole |
portsrc |
portdst |
rule |
action |
interface |
unknown |
fw |
| Format |
YYYY-MM-DD HH:MM:SS |
str |
str |
str |
int |
int |
int |
str |
str |
str |
int |
""",
unsafe_allow_html=True,
)
# Add checkbox for date filtering
apply_date_filter = st.checkbox(
"Apply date filtering (Nov 1, 2024 - Mar 1, 2025)", value=True
)
uploaded_file = st.file_uploader("Choose a log file")
if "parsed_df" not in st.session_state:
st.session_state.parsed_df = None
if uploaded_file is not None:
with st.spinner("Parsing and filtering the file..."):
try:
# Read the CSV
st.session_state.parsed_df = pl.read_csv(
uploaded_file,
separator=";",
has_header=False,
infer_schema_length=10000,
dtypes={
"timestamp": pl.Datetime,
"ipsrc": pl.Utf8,
"ipdst": pl.Utf8,
"protocole": pl.Utf8,
"portsrc": pl.Utf8,
"portdst": pl.Utf8,
"rule": pl.Utf8,
"action": pl.Utf8,
"interface": pl.Utf8,
"unknown": pl.Utf8,
"fw": pl.Int64,
},
).drop(["portsrc", "unknown", "fw"])
# Apply date filter only if checkbox is checked
if apply_date_filter:
st.session_state.parsed_df = st.session_state.parsed_df.filter(
(pl.col("timestamp") >= pl.datetime(2024, 11, 1))
& (pl.col("timestamp") < pl.datetime(2025, 3, 1))
)
row_count = st.session_state.parsed_df.height
if row_count == 0:
st.error(
"No data found in the file. Try uncheck the date filter option."
)
else:
st.success(
f"File parsed and filtered successfully! After filtering, {row_count:,} rows remain."
)
except Exception as e:
st.error(f"Error parsing the file: {e}")
if st.session_state.parsed_df is not None:
if st.button("Convert to SQLite"):
with st.spinner("Converting to SQLite..."):
# Create a temporary file for the SQLite database
temp_db_file = tempfile.NamedTemporaryFile(delete=False, suffix=".db")
temp_db_path = temp_db_file.name
temp_db_file.close()
# Connect to SQLite database
conn = sqlite3.connect(temp_db_path)
# Convert Polars DataFrame to Pandas for easy SQLite export
pandas_df = st.session_state.parsed_df.to_pandas()
# Write to SQLite
pandas_df.to_sql("logs", conn, if_exists="replace", index=False)
conn.close()
# Prepare file for download
with open(temp_db_path, "rb") as file:
db_contents = file.read()
# Create download button
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
st.download_button(
label="Download SQLite Database",
data=db_contents,
file_name=f"logs_{timestamp}.sqlite3",
mime="application/octet-stream",
)
# Clean up
os.unlink(temp_db_path)
st.success("SQLite conversion complete!")