import altair as alt
import pandas as pd
import plotly.express as px
import streamlit as st
st.title("📊 Alerts and Anomalies")
if "parsed_df" not in st.session_state or st.session_state.parsed_df is None:
st.warning(
"No log data is available. Please first upload and parse a log file in the Upload section."
)
else:
df = st.session_state.parsed_df
error_patterns = [
"error",
"critical",
"fatal",
"fail",
"exception",
"crash",
"timeout",
]
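    # The keywords above are matched as case-insensitive substrings, so
    # variants such as "Failed" or "FAILURE" are still caught by "fail".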
possible_level_cols = [
"level",
"severity",
"log_level",
"type",
"status",
"content",
"message",
]
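    # Common names for the severity column across log formats; the lookup
    # below compares names case-insensitively.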
# Function to identify errors by keywords
def detect_errors(dataframe, cols_to_search=None):
if cols_to_search is None:
# Search in all textual columns
cols_to_search = dataframe.select_dtypes(include=["object"]).columns
# Create a mask for rows containing errors
error_mask = pd.Series(False, index=dataframe.index)
for col in cols_to_search:
if col in dataframe.columns: # Make sure the column exists
col_mask = (
dataframe[col]
.astype(str)
.str.contains("|".join(error_patterns), case=False, na=False)
)
error_mask = error_mask | col_mask
# Return only the rows with errors
return dataframe[error_mask].copy()
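    # Example (with hypothetical column names): detect_errors(df, ["level", "message"])
    # keeps only the rows whose "level" or "message" text contains a keyword.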
# Display overall statistics
st.subheader("Overview of logs")
col1, col2, col3 = st.columns(3)
# Initialize error_df as an empty DataFrame
error_df = pd.DataFrame()
with col1:
total_entries = len(df)
st.metric("Total number of entries", total_entries)
with col2:
        # Find severity-like columns by case-insensitive name match
        level_cols = [
            col
            for col in df.columns
            if any(
                possible_col.lower() == col.lower()
                for possible_col in possible_level_cols
            )
        ]
if level_cols:
# Create a boolean mask for rows containing errors in any relevant column
error_df = detect_errors(df, level_cols)
error_count = len(error_df)
error_percent = (
(error_count / total_entries) * 100 if total_entries > 0 else 0
)
st.metric("Error entries", f"{error_count} ({error_percent:.1f}%)")
else:
st.metric("Error entries", "Not detectable")
with col3:
# Search for a datetime type column
timestamp_col = None
# First, look for columns that are already of datetime type
datetime_cols = [
col for col in df.columns if pd.api.types.is_datetime64_any_dtype(df[col])
]
if datetime_cols:
timestamp_col = datetime_cols[0]
else:
            # If no datetime column exists, fall back to matching common column
            # names (exact, lowercase match)
            possible_ts_cols = ["timestamp", "date", "time", "datetime"]
for col in possible_ts_cols:
if col in df.columns:
timestamp_col = col
break
if timestamp_col:
time_range = f"{df[timestamp_col].min()} to {df[timestamp_col].max()}"
            st.markdown(
                f"**Time range**<br>{time_range}",
                unsafe_allow_html=True,
            )
else:
st.metric("Time range", "Not detectable")
# Detection of critical errors
st.subheader("Detected critical errors")
if not error_df.empty:
st.write(f"**{len(error_df)} critical errors detected**")
st.dataframe(error_df)
# Extraction of the most common error types
if len(error_df) > 5:
st.subheader("Frequent error types")
error_types = {}
            # Scan textual columns to extract error patterns
for col in error_df.select_dtypes(include=["object"]).columns:
for pattern in ["error", "exception", "fail"]:
                    pattern_errors = error_df[
                        error_df[col]
                        .astype(str)
                        .str.contains(pattern, case=False, na=False)
                    ]
if not pattern_errors.empty:
# Extract error context (words after the pattern)
for _, row in pattern_errors.iterrows():
text = str(row[col])
if pattern.lower() in text.lower():
idx = text.lower().find(pattern.lower())
                                context = text[idx : idx + 50].strip()
                                error_types[context] = error_types.get(context, 0) + 1
# Display the most frequent error types
sorted_errors = sorted(
error_types.items(), key=lambda x: x[1], reverse=True
)[:10]
error_types_df = pd.DataFrame(
sorted_errors, columns=["Error type", "Occurrences"]
)
st.dataframe(error_types_df)
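            # Note: keying on the 50 characters after the keyword is a rough
            # heuristic; messages with variable payloads (IDs, paths, timestamps)
            # may not collapse into a single error type.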
# Visualization of errors
if timestamp_col:
st.subheader("Temporal distribution of errors")
# Convert to datetime if necessary
if not pd.api.types.is_datetime64_any_dtype(error_df[timestamp_col]):
try:
error_df[timestamp_col] = pd.to_datetime(
error_df[timestamp_col]
)
                except (ValueError, TypeError):
                    pass  # Leave the column as-is if it cannot be parsed as datetime
if pd.api.types.is_datetime64_any_dtype(error_df[timestamp_col]):
                # Group errors into one-hour buckets (renamed from error_count to
                # avoid shadowing the integer metric computed above)
                errors_per_hour = (
                    error_df.groupby(pd.Grouper(key=timestamp_col, freq="1h"))
                    .size()
                    .reset_index()
                )
                errors_per_hour.columns = [timestamp_col, "count"]
                # Create the line chart with Plotly (imported at the top of the file)
                fig = px.line(
                    errors_per_hour, x=timestamp_col, y="count", title="Errors per hour"
                )
fig.update_layout(
xaxis_title="Time", yaxis_title="Number of errors", height=300
)
st.plotly_chart(fig, use_container_width=True)
else:
st.success("No critical errors detected in the logs.")
# Detection of anomalies
st.subheader("Anomaly detection")
# Temporal analysis if possible
    if timestamp_col is not None and (
        pd.api.types.is_datetime64_any_dtype(df[timestamp_col])
        # to_datetime(errors="coerce") always returns a datetime dtype, so test
        # whether at least one value actually parsed rather than the dtype alone
        or pd.to_datetime(df[timestamp_col], errors="coerce").notna().any()
    ):
try:
# Convert to datetime if necessary
if not pd.api.types.is_datetime64_any_dtype(df[timestamp_col]):
df[timestamp_col] = pd.to_datetime(df[timestamp_col], errors="coerce")
# Group by time period
time_df = (
                df.groupby(pd.Grouper(key=timestamp_col, freq="5min"))
.size()
.reset_index()
)
time_df.columns = [timestamp_col, "count"]
# Calculate moving average and limits
time_df["moving_avg"] = (
time_df["count"].rolling(window=5, min_periods=1).mean()
)
time_df["std"] = (
time_df["count"].rolling(window=5, min_periods=1).std().fillna(0)
)
time_df["upper_bound"] = time_df["moving_avg"] + 2 * time_df["std"]
time_df["lower_bound"] = (time_df["moving_avg"] - 2 * time_df["std"]).clip(
lower=0
)
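            # For roughly normal counts, about 95% of values fall within
            # mean ± 2 standard deviations, so buckets outside this band are
            # flagged below (a simple rolling z-score rule).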
# Detection of peaks
time_df["is_anomaly"] = (time_df["count"] > time_df["upper_bound"]) | (
time_df["count"] < time_df["lower_bound"]
)
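            # Note: each rolling window includes the current bucket, which slightly
            # dampens isolated spikes; computing the bounds on a shifted window
            # (e.g. rolling(...).mean().shift(1)) would make the test stricter.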
# Visualization
anomaly_points = time_df[time_df["is_anomaly"]]
if not anomaly_points.empty:
st.write(
f"**{len(anomaly_points)} periods with abnormal activity detected**"
)
# Create the chart
base = alt.Chart(time_df).encode(
x=alt.X(f"{timestamp_col}:T", title="Time")
)
line = base.mark_line().encode(
y=alt.Y("count:Q", title="Number of log entries")
)
bands = base.mark_area(opacity=0.2).encode(
y="lower_bound:Q",
y2="upper_bound:Q",
tooltip=[
f"{timestamp_col}:T",
"count:Q",
"moving_avg:Q",
"lower_bound:Q",
"upper_bound:Q",
],
)
points = (
base.mark_circle(size=100, color="red")
.encode(y="count:Q")
.transform_filter(alt.datum.is_anomaly == True)
)
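                # Layer the three marks: the count line, the shaded
                # moving-average band, and red circles on anomalous buckets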
chart = (line + bands + points).properties(
width=700,
height=300,
title="Anomaly detection (unusual activity)",
)
st.altair_chart(chart, use_container_width=True)
# Table of anomalies
st.write("Periods with abnormal activity:")
                anomaly_df = anomaly_points[
                    [timestamp_col, "count", "moving_avg", "upper_bound", "lower_bound"]
                ].rename(
                    columns={
                        timestamp_col: "Period",
                        "count": "Number of entries",
                        "moving_avg": "Moving average",
                        "upper_bound": "Upper limit",
                        "lower_bound": "Lower limit",
                    }
                )
st.dataframe(anomaly_df)
else:
st.success("No temporal anomalies detected.")
except Exception as e:
st.error(f"Unable to analyze the temporal distribution of logs: {e}")
# Detection of suspicious event sequences
if timestamp_col and level_cols:
st.subheader("Unusual event sequences")
try:
# Search for consecutive error sequences
df_sorted = df.sort_values(by=timestamp_col)
consecutive_errors = []
current_sequence = []
for i, row in df_sorted.iterrows():
# Check if any of the columns contain error levels
is_error = False
for col in level_cols:
if str(row[col]).upper() in ["ERROR", "CRITICAL", "FATAL"]:
is_error = True
break
if is_error:
current_sequence.append(i)
else:
if len(current_sequence) >= 3: # At least 3 consecutive errors
consecutive_errors.append(current_sequence)
current_sequence = []
if len(current_sequence) >= 3: # Don't forget the last sequence
consecutive_errors.append(current_sequence)
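            # Example: levels [INFO, ERROR, ERROR, ERROR, INFO] yield one
            # sequence of length 3; isolated errors never reach the threshold.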
if consecutive_errors:
st.write(
f"**{len(consecutive_errors)} sequences of 3+ consecutive errors detected**"
)
                # For each sequence, display the relevant entries (limited to 5 for clarity)
                for i, sequence in enumerate(consecutive_errors[:5]):
with st.expander(
f"Sequence {i + 1}: {len(sequence)} consecutive errors"
):
st.dataframe(df.loc[sequence])
else:
st.success("No sequences of consecutive errors detected.")
except Exception as e:
st.error(f"Unable to analyze event sequences: {e}")
# Recommendations
st.subheader("Recommendations")
if not error_df.empty:
        st.warning(
            "⚠️ Critical errors have been detected. Review the entries listed above for more details."
        )
if "error_types" in locals() and error_types:
top_error = sorted_errors[0][0]
st.info(
f"💡 The most frequent error is '{top_error}'. Focus your analysis on this type of error."
)
if "anomaly_points" in locals() and not anomaly_points.empty:
peak_time = anomaly_points.iloc[anomaly_points["count"].idxmax()][timestamp_col]
st.warning(
f"⚠️ A significant activity peak was detected around {peak_time}. Review this period."
)
if "consecutive_errors" in locals() and consecutive_errors:
st.warning(
"⚠️ Sequences of consecutive errors have been detected, which may indicate systemic issues."
)
if error_df.empty and ("anomaly_points" not in locals() or anomaly_points.empty):
st.success("✅ No major issues detected in the analyzed logs.")