LogAI-Engine / HF /error_analysis.py
NOT-OMEGA's picture
Upload 10 files
4561114 verified
raw
history blame
11.5 kB
"""
error_analysis.py — Deep Dive into Unclassified / Misclassified Logs
This script addresses the 76 unclassified logs from the 20k run.
It answers:
1. What do these logs look like? (print + group)
2. Why did the model fail? (pattern analysis)
3. What should we do? (actionable fix suggestions)
Google interview talking point:
"I performed structured error analysis on my model's failure cases.
I grouped them by failure type — vocabulary mismatch, ambiguous intent,
formatting noise — and used that to drive targeted improvements."
Usage:
python error_analysis.py --input output.csv # post-classify CSV
python error_analysis.py --simulate # demo with synthetic data
"""
from __future__ import annotations
import argparse
import re
import sys
from collections import Counter, defaultdict
from typing import Optional
import pandas as pd
# ── Failure mode taxonomy ────────────────────────────────────────────────────
class FailureMode:
    """String labels naming the heuristic reasons a log stayed unclassified."""

    # Domain-specific terms that were not in the training data.
    RARE_VOCAB: str = "rare_vocabulary"

    # The log could match multiple categories.
    AMBIGUOUS: str = "ambiguous_intent"

    # Non-standard / old-school formatting.
    LEGACY_FORMAT: str = "legacy_format"

    # Partial or malformed log line.
    TRUNCATED: str = "truncated_or_noisy"

    # ID/code-heavy line with no semantic signal.
    NUMERIC_ONLY: str = "mostly_numeric"

    # One line that carries multiple events.
    MULTI_EVENT: str = "multi_event"

    # Catch-all label.
    UNKNOWN: str = "unknown"
def _detect_failure_mode(log: str) -> str:
    """Guess heuristically why a log line was left unclassified.

    The checks run in a fixed priority order and the first match wins:
    truncated -> mostly numeric -> multi-event -> legacy format ->
    ambiguous intent -> rare vocabulary -> unknown.

    Args:
        log: Raw log message text.

    Returns:
        One of the ``FailureMode`` string constants.
    """
    # Surrounding whitespace carries no signal; without stripping, a padded
    # fragment such as "ERR" followed by spaces would pass the length check.
    text = log.strip()
    if len(text) < 20:
        return FailureMode.TRUNCATED
    lowered = text.lower()

    # Share of digit characters. len(text) >= 20 here, so no zero division.
    digit_ratio = sum(ch.isdigit() for ch in text) / len(text)
    if digit_ratio > 0.40:
        return FailureMode.NUMERIC_ONLY

    # Two or more events joined on one line. " AND " is matched
    # case-sensitively (upper-case joiner only).
    if text.count(";") >= 2 or " AND " in text or text.count(" | ") >= 2:
        return FailureMode.MULTI_EVENT

    # Legacy / unusual format signals (substring match, so "retire" also
    # covers "retired" and "crm" covers names such as "LegacyCRM").
    legacy_signals = (
        "ticket", "escalation", "crm", "deprecated", "retire",
        "module will be", "workflow", "assigned agent",
    )
    if any(signal in lowered for signal in legacy_signals):
        return FailureMode.LEGACY_FORMAT

    # Ambiguity: at least two distinct keywords that could indicate either an
    # error or a security event.
    ambiguous_signals = ("failed", "error", "unauthorized", "denied", "blocked")
    if sum(signal in lowered for signal in ambiguous_signals) >= 2:
        return FailureMode.AMBIGUOUS

    # Rare vocabulary. The short acronyms are matched as whole words: a plain
    # substring test for "sla" also fires on "translation" or "island", and
    # "gc " (with a trailing space) misses forms like "GC:".
    rare_words = ("oncall", "runbook", "pagerduty", "janitor", "eviction")
    if re.search(r"\b(?:slas?|gc)\b", lowered) or any(
        word in lowered for word in rare_words
    ):
        return FailureMode.RARE_VOCAB

    return FailureMode.UNKNOWN
def _suggest_fix(mode: str) -> str:
    """Return the recommended remediation for a failure mode.

    Args:
        mode: One of the ``FailureMode`` string constants.

    Returns:
        A one-line suggestion, or ``"Manual review required."`` when *mode*
        is not a known failure mode.
    """
    # NOTE(review): _detect_failure_mode flags lines shorter than 20 chars as
    # truncated, while the TRUNCATED text below suggests a 15-char cutoff;
    # confirm which threshold is intended.
    fixes = {
        FailureMode.RARE_VOCAB: "Add 5–10 training examples covering this vocabulary; or add regex rule.",
        FailureMode.AMBIGUOUS: "Use multi-label or add a dedicated 'Ambiguous' class; review confidence threshold.",
        FailureMode.LEGACY_FORMAT: "Route all legacy-format logs to LLM tier; add few-shot examples for LLM prompt.",
        FailureMode.TRUNCATED: "Add input validation: reject/flag logs under 15 chars before classification.",
        FailureMode.NUMERIC_ONLY: "Add regex patterns for structured numeric formats (job IDs, error codes, etc.).",
        FailureMode.MULTI_EVENT: "Pre-process: split multi-event lines on ';' or ' | ' before classifying.",
        FailureMode.UNKNOWN: "Manually review and add to training data or LLM few-shot examples.",
    }
    return fixes.get(mode, "Manual review required.")
# ── Core analysis ────────────────────────────────────────────────────────────
def _resolve_log_column(
    df: pd.DataFrame, label_col: str, log_col: Optional[str] = None
) -> str:
    """Return the name of the column that holds the raw log text."""
    if log_col is not None:
        if log_col not in df.columns:
            raise KeyError(
                f"Log column {log_col!r} not found; "
                f"available columns: {list(df.columns)}"
            )
        return log_col
    if "log_message" in df.columns:
        return "log_message"
    # Fall back to the right-most column, skipping the label and source
    # columns: if the label happens to be the last column, using it would
    # analyse the label text instead of the log messages.
    candidates = [col for col in df.columns if col not in (label_col, "source")]
    return candidates[-1] if candidates else df.columns[-1]


def analyze_unclassified(
    df: pd.DataFrame,
    label_col: str = "predicted_label",
    unclassified_label: str = "Unclassified",
    log_col: Optional[str] = None,
) -> None:
    """Print a full error-analysis report for the unclassified rows of *df*.

    The report lists every unclassified log, groups the logs by heuristic
    failure mode, shows the most common tokens, summarises message lengths,
    breaks the rows down by ``source`` (when that column exists) and ends
    with a prioritised list of fixes.

    Args:
        df: Classified logs, one row per log line.
        label_col: Column holding the predicted label.
        unclassified_label: Label value that marks a row as unclassified.
        log_col: Column holding the log text. When omitted, ``log_message``
            is used if present, otherwise the last column that is neither
            the label nor the ``source`` column.

    Raises:
        KeyError: If *label_col* (or an explicit *log_col*) is not a column
            of *df*.
    """
    if label_col not in df.columns:
        raise KeyError(
            f"Label column {label_col!r} not found; "
            f"available columns: {list(df.columns)}"
        )

    unclassified = df[df[label_col] == unclassified_label]
    total_unclassified = len(unclassified)
    if total_unclassified == 0:
        print("✅ No unclassified logs found!")
        return

    rule = "=" * 70
    print(f"\n{rule}")
    print(f"🔍 ERROR ANALYSIS: {total_unclassified} Unclassified Logs")
    print(f"{rule}\n")

    # Extract the messages once; missing values become "" rather than the
    # literal string "nan", which would skew the token and length statistics.
    text_col = _resolve_log_column(df, label_col, log_col)
    logs = ["" if pd.isna(value) else str(value) for value in unclassified[text_col]]

    # ── Step 1: Print all unclassified logs ─────────────────────────────────
    print(f"{'#':>4} {'Log Message'}")
    print("─" * 80)
    for idx, log in enumerate(logs, 1):
        print(f"{idx:>4}. {log[:120]}")

    # ── Step 2: Group by failure mode ───────────────────────────────────────
    print(f"\n{rule}")
    print("📂 GROUPING BY FAILURE MODE")
    print("─" * 70)
    groups: dict[str, list[str]] = defaultdict(list)
    for log in logs:
        groups[_detect_failure_mode(log)].append(log)
    # Largest group first; the sort is stable, so ties keep first-seen order.
    ranked = sorted(groups.items(), key=lambda item: -len(item[1]))
    for mode, members in ranked:
        pct = len(members) / total_unclassified * 100
        print(f"\n🔹 {mode} — {len(members)} logs ({pct:.1f}%)")
        print(f" 💡 Fix: {_suggest_fix(mode)}")
        print(" Examples:")
        for example in members[:3]:
            print(f" • {example[:110]}")

    # ── Step 3: Token frequency analysis ────────────────────────────────────
    print(f"\n{rule}")
    print("📊 COMMON TOKENS IN UNCLASSIFIED LOGS")
    print("─" * 70)
    stopwords = {"the", "a", "an", "is", "in", "on", "for", "to", "of",
                 "and", "or", "by", "at", "with", "has", "was", "be",
                 "this", "that", "it", "not", "are", "from", "as"}
    token_re = re.compile(r"[a-z]{3,}")
    counter = Counter(
        token
        for log in logs
        for token in token_re.findall(log.lower())
        if token not in stopwords
    )
    print("Top 20 tokens in unclassified logs:")
    for token, count in counter.most_common(20):
        bar = "█" * min(count, 40)  # cap the bar so it fits on one line
        print(f" {token:<20} {count:>4} {bar}")

    # ── Step 4: Length distribution ─────────────────────────────────────────
    lengths = pd.Series([len(log) for log in logs])
    print(f"\n{rule}")
    print("📏 LOG LENGTH DISTRIBUTION (Unclassified)")
    print(f" Mean: {lengths.mean():.1f} chars")
    print(f" Median: {lengths.median():.1f} chars")
    print(f" Min: {lengths.min()} chars")
    print(f" Max: {lengths.max()} chars")
    short = int((lengths < 30).sum())
    if short:
        print(f" ⚠️ {short} logs under 30 chars — likely truncated/noisy")

    # ── Step 5: Source breakdown ─────────────────────────────────────────────
    if "source" in df.columns:
        print(f"\n{rule}")
        print("🏷️ UNCLASSIFIED BY SOURCE")
        for src, cnt in unclassified["source"].value_counts().items():
            bar = "█" * min(cnt, 40)
            print(f" {str(src):<22} {cnt:>4} {bar}")

    # ── Step 6: Actionable summary ───────────────────────────────────────────
    print(f"\n{rule}")
    print("✅ ACTIONABLE FIXES (Priority Order)")
    print("─" * 70)
    fixes = [
        (1, "regex", "Add patterns for top unclassified tokens to processor_regex.py"),
        (2, "training", "Add 10–20 examples per failure mode to training data"),
        (3, "llm", "For LEGACY_FORMAT failures: add to LLM few-shot examples"),
        (4, "preproc", "Pre-process: split multi-event logs, reject truncated logs"),
        (5, "threshold", "Tune BERT confidence threshold (currently 0.30 — try 0.40)"),
    ]
    for priority, area, fix in fixes:
        print(f" {priority}. [{area.upper():^10}] {fix}")

    # At least one row is unclassified here, so `ranked` is never empty.
    dominant_mode, dominant_logs = ranked[0]
    print(f"\n📌 Dominant failure mode: '{dominant_mode}' ({len(dominant_logs)} logs)")
    print(f" Start here: {_suggest_fix(dominant_mode)}\n")
# ── Simulate 76 unclassified logs for demo ────────────────────────────────────
def _simulate_unclassified() -> pd.DataFrame:
"""Generate synthetic 'unclassified' logs that mimic real failure patterns."""
logs = [
# Legacy format / CRM
"Case escalation for ticket ID 9021 failed: agent inactive.",
"CRM module 'ReportGenerator' will be retired in v4.1.",
"Workflow for approval chain #4421 stalled at step 3.",
"SLA breach detected for case ID 7701 (P1, 4h breach).",
# Ambiguous
"Service auth-api failed and unauthorized access was logged.",
"Error: blocked request from 10.0.0.5 β€” reason unknown.",
# Truncated / noisy
"ERR",
"srv timeout",
"node-7",
# Numeric-heavy
"8821 9001 443 0 0 DROP IN=eth0 OUT= MAC=",
"16 0 0 1 2024-01-14 03:21:00.001",
# Multi-event
"Backup started; disk usage at 92%; health check failed | node-3",
# Rare vocab
"PagerDuty alert triggered for on-call rotation P1-incident.",
"GC eviction: 3.2GB heap compacted in 420ms.",
"Janitor job completed: 14,000 stale tokens purged.",
"Runbook auto-remediation triggered for alert ALT-9021.",
]
# Pad to ~76
padded = (logs * 5)[:76]
return pd.DataFrame({
"source": ["ModernCRM"] * 30 + ["LegacyCRM"] * 20 + ["AnalyticsEngine"] * 26,
"log_message": padded,
"predicted_label": ["Unclassified"] * 76,
})
# ── CLI ──────────────────────────────────────────────────────────────────────
def main(argv: Optional[list[str]] = None) -> None:
    """Command-line entry point: load (or simulate) logs and print the report.

    Args:
        argv: Argument list to parse; ``None`` makes argparse read
            ``sys.argv[1:]``.

    Exits with status 1 when neither ``--input`` nor ``--simulate`` is given,
    when the input CSV cannot be read, or when the label column is missing.
    If both flags are given, ``--simulate`` takes precedence.
    """
    parser = argparse.ArgumentParser(description="Analyze unclassified/misclassified logs")
    parser.add_argument("--input", help="Path to classified CSV from classify_csv()")
    parser.add_argument("--simulate", action="store_true",
                        help="Run with synthetic unclassified logs (no CSV needed)")
    parser.add_argument("--label-col", default="predicted_label",
                        help="Column name that holds the predicted label")
    args = parser.parse_args(argv)

    if args.simulate:
        df = _simulate_unclassified()
        print(f"🎭 Running with SIMULATED {len(df)} unclassified logs…")
    elif args.input:
        try:
            df = pd.read_csv(args.input)
        except (OSError, UnicodeDecodeError,
                pd.errors.EmptyDataError, pd.errors.ParserError) as exc:
            # Report unreadable input as a one-line error, not a traceback.
            print(f"error: could not read {args.input!r}: {exc}", file=sys.stderr)
            sys.exit(1)
    else:
        parser.print_help()
        sys.exit(1)

    # Validate here so a mistyped --label-col gives a clear message.
    if args.label_col not in df.columns:
        available = ", ".join(str(col) for col in df.columns)
        print(f"error: column {args.label_col!r} not found (available: {available})",
              file=sys.stderr)
        sys.exit(1)

    analyze_unclassified(df, label_col=args.label_col)
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()