"""
error_analysis.py — Deep Dive into Unclassified / Misclassified Logs

This script addresses the 76 unclassified logs from the 20k run.
It answers:
  1. What do these logs look like?   (print + group)
  2. Why did the model fail?         (pattern analysis)
  3. What should we do?              (actionable fix suggestions)

Google interview talking point:
  "I performed structured error analysis on my model's failure cases.
   I grouped them by failure type — vocabulary mismatch, ambiguous intent,
   formatting noise — and used that to drive targeted improvements."

Usage:
    python error_analysis.py --input output.csv   # post-classify CSV
    python error_analysis.py --simulate           # demo with synthetic data
    python error_analysis.py --input output.csv --log-col message
"""

from __future__ import annotations

import argparse
import re
import sys
from collections import Counter, defaultdict
from typing import Optional

import pandas as pd


# ── Failure mode taxonomy ────────────────────────────────────────────────────

class FailureMode:
    """String constants naming the heuristic reasons a log went unclassified."""

    RARE_VOCAB = "rare_vocabulary"      # domain-specific terms not in training
    AMBIGUOUS = "ambiguous_intent"      # log could match multiple categories
    LEGACY_FORMAT = "legacy_format"     # non-standard / old-school formatting
    TRUNCATED = "truncated_or_noisy"    # partial / malformed log line
    NUMERIC_ONLY = "mostly_numeric"     # ID/code-heavy, no semantic signal
    MULTI_EVENT = "multi_event"         # one line, multiple events
    UNKNOWN = "unknown"


# Substring signals checked against the lower-cased log line.
_LEGACY_SIGNALS = (
    "ticket", "escalation", "crm", "deprecated", "retire",
    "module will be", "workflow", "assigned agent",
)
# Two or more of these in one line means it could be error OR security.
_AMBIGUOUS_SIGNALS = ("failed", "error", "unauthorized", "denied", "blocked")
_RARE_SIGNALS = (
    "sla", "oncall", "runbook", "pagerduty", "janitor", "gc ", "eviction",
)

# Token analysis helpers, built once instead of on every call.
_TOKEN_RE = re.compile(r"[a-z]{3,}")
_STOPWORDS = frozenset({
    "the", "a", "an", "is", "in", "on", "for", "to", "of", "and", "or",
    "by", "at", "with", "has", "was", "be", "this", "that", "it", "not",
    "are", "from", "as",
})

# Number of rows produced by the synthetic demo data set.
_SIMULATED_ROWS = 76


def _detect_failure_mode(log: str) -> str:
    """Heuristic: guess WHY this log was unclassified.

    The checks run in a fixed priority order and the first match wins:
    truncated, mostly numeric, multi-event, legacy format, ambiguous intent,
    rare vocabulary, and finally ``FailureMode.UNKNOWN``.

    Args:
        log: Raw log line.

    Returns:
        One of the ``FailureMode`` string constants.
    """
    log_l = log.lower()

    if len(log) < 20:
        return FailureMode.TRUNCATED

    # Check ratio of digits to total chars
    digit_ratio = sum(c.isdigit() for c in log) / max(len(log), 1)
    if digit_ratio > 0.40:
        return FailureMode.NUMERIC_ONLY

    # Looks like it has 2+ events joined (" AND " is deliberately matched
    # case-sensitively against the raw line).
    if log.count(";") >= 2 or log.count(" AND ") >= 1 or log.count(" | ") >= 2:
        return FailureMode.MULTI_EVENT

    # Legacy / unusual format signals
    if any(s in log_l for s in _LEGACY_SIGNALS):
        return FailureMode.LEGACY_FORMAT

    # Ambiguity signals — could be error OR security
    if sum(1 for s in _AMBIGUOUS_SIGNALS if s in log_l) >= 2:
        return FailureMode.AMBIGUOUS

    # Rare vocabulary
    if any(s in log_l for s in _RARE_SIGNALS):
        return FailureMode.RARE_VOCAB

    return FailureMode.UNKNOWN


def _suggest_fix(mode: str) -> str:
    """Return the remediation hint for a failure mode.

    Args:
        mode: A ``FailureMode`` constant (any other string is accepted).

    Returns:
        The suggestion text, or ``"Manual review required."`` for a mode that
        has no entry.
    """
    fixes = {
        FailureMode.RARE_VOCAB: "Add 5–10 training examples covering this vocabulary; or add regex rule.",
        FailureMode.AMBIGUOUS: "Use multi-label or add a dedicated 'Ambiguous' class; review confidence threshold.",
        FailureMode.LEGACY_FORMAT: "Route all legacy-format logs to LLM tier; add few-shot examples for LLM prompt.",
        FailureMode.TRUNCATED: "Add input validation: reject/flag logs under 15 chars before classification.",
        FailureMode.NUMERIC_ONLY: "Add regex patterns for structured numeric formats (job IDs, error codes, etc.).",
        FailureMode.MULTI_EVENT: "Pre-process: split multi-event lines on ';' or ' | ' before classifying.",
        FailureMode.UNKNOWN: "Manually review and add to training data or LLM few-shot examples.",
    }
    return fixes.get(mode, "Manual review required.")


# ── Core analysis ────────────────────────────────────────────────────────────

def _as_text(value: object) -> str:
    """Convert one cell to text, mapping missing values to an empty string."""
    if isinstance(value, str):
        return value
    # Without this, NaN/None would be analysed as the words "nan"/"None".
    if value is None or (pd.api.types.is_scalar(value) and pd.isna(value)):
        return ""
    return str(value)


def _pick_log_column(df: pd.DataFrame, label_col: str) -> str:
    """Choose the column holding the log text when none was given explicitly."""
    if "log_message" in df.columns:
        return "log_message"
    # Fall back to the last column, but never the label column (usually the
    # last column of a classified CSV) and preferably not the "source"
    # metadata column either.
    for excluded in ({label_col, "source"}, {label_col}):
        candidates = [c for c in df.columns if c not in excluded]
        if candidates:
            return candidates[-1]
    return df.columns[-1]


def analyze_unclassified(
    df: pd.DataFrame,
    label_col: str = "predicted_label",
    log_col: Optional[str] = None,
) -> None:
    """Full error analysis on a classified CSV DataFrame.

    Prints, in order: every unclassified log, the logs grouped by heuristic
    failure mode, the most common tokens, the length distribution, a per-source
    breakdown (only when a ``source`` column exists) and a prioritized fix
    list.

    Args:
        df: Classified logs; rows whose ``label_col`` equals
            ``"Unclassified"`` are analysed.
        label_col: Name of the column holding the predicted label.
        log_col: Name of the column holding the log text. When ``None``,
            ``log_message`` is used if present, otherwise the last column that
            is neither the label column nor ``source``.

    Raises:
        KeyError: If ``label_col`` or an explicitly given ``log_col`` is not a
            column of ``df``.
    """
    if label_col not in df.columns:
        raise KeyError(
            f"label column {label_col!r} not found; "
            f"available columns: {list(df.columns)}"
        )
    if log_col is None:
        log_col = _pick_log_column(df, label_col)
    elif log_col not in df.columns:
        raise KeyError(
            f"log column {log_col!r} not found; "
            f"available columns: {list(df.columns)}"
        )

    unclassified = df[df[label_col] == "Unclassified"]
    total_unclassified = len(unclassified)

    if total_unclassified == 0:
        print("✅ No unclassified logs found!")
        return

    # Extract the text once; every step below works on this list.
    logs = [_as_text(value) for value in unclassified[log_col]]

    print(f"\n{'='*70}")
    print(f"🔍 ERROR ANALYSIS: {total_unclassified} Unclassified Logs")
    print(f"{'='*70}\n")

    # ── Step 1: Print all unclassified logs ─────────────────────────────────
    print(f"{'#':>4} {'Log Message'}")
    print("─" * 80)
    for i, log in enumerate(logs, 1):
        print(f"{i:>4}. {log[:120]}")

    # ── Step 2: Group by failure mode ───────────────────────────────────────
    print(f"\n{'='*70}")
    print("📂 GROUPING BY FAILURE MODE")
    print("─" * 70)

    groups: dict[str, list[str]] = defaultdict(list)
    for log in logs:
        groups[_detect_failure_mode(log)].append(log)

    # Largest group first; ties keep first-seen order (stable sort).
    for mode, members in sorted(groups.items(), key=lambda x: -len(x[1])):
        pct = len(members) / total_unclassified * 100
        print(f"\n🔹 {mode} — {len(members)} logs ({pct:.1f}%)")
        print(f" 💡 Fix: {_suggest_fix(mode)}")
        print(f" Examples:")
        for log in members[:3]:
            print(f" • {log[:110]}")

    # ── Step 3: Token frequency analysis ────────────────────────────────────
    print(f"\n{'='*70}")
    print("📊 COMMON TOKENS IN UNCLASSIFIED LOGS")
    print("─" * 70)

    counter = Counter(
        token
        for log in logs
        for token in _TOKEN_RE.findall(log.lower())
        if token not in _STOPWORDS
    )
    print("Top 20 tokens in unclassified logs:")
    for token, count in counter.most_common(20):
        bar = "█" * min(count, 40)
        print(f" {token:<20} {count:>4} {bar}")

    # ── Step 4: Length distribution ─────────────────────────────────────────
    lengths = pd.Series([len(log) for log in logs])
    print(f"\n{'='*70}")
    print("📏 LOG LENGTH DISTRIBUTION (Unclassified)")
    print(f" Mean: {lengths.mean():.1f} chars")
    print(f" Median: {lengths.median():.1f} chars")
    print(f" Min: {lengths.min()} chars")
    print(f" Max: {lengths.max()} chars")
    short = (lengths < 30).sum()
    if short:
        print(f" ⚠️ {short} logs under 30 chars — likely truncated/noisy")

    # ── Step 5: Source breakdown ─────────────────────────────────────────────
    if "source" in df.columns:
        print(f"\n{'='*70}")
        print("🏷️ UNCLASSIFIED BY SOURCE")
        src_counts = unclassified["source"].value_counts()
        for src, cnt in src_counts.items():
            bar = "█" * min(cnt, 40)
            print(f" {src:<22} {cnt:>4} {bar}")

    # ── Step 6: Actionable summary ───────────────────────────────────────────
    print(f"\n{'='*70}")
    print("✅ ACTIONABLE FIXES (Priority Order)")
    print("─" * 70)

    # max() returns the first-seen mode among equally large groups.
    dominant_mode = (
        max(groups, key=lambda m: len(groups[m])) if groups else FailureMode.UNKNOWN
    )

    fixes = [
        (1, "regex", "Add patterns for top unclassified tokens to processor_regex.py"),
        (2, "training", "Add 10–20 examples per failure mode to training data"),
        (3, "llm", "For LEGACY_FORMAT failures: add to LLM few-shot examples"),
        (4, "preproc", "Pre-process: split multi-event logs, reject truncated logs"),
        (5, "threshold", "Tune BERT confidence threshold (currently 0.30 — try 0.40)"),
    ]
    for priority, area, fix in fixes:
        print(f" {priority}. [{area.upper():^10}] {fix}")

    print(f"\n📌 Dominant failure mode: '{dominant_mode}' ({len(groups.get(dominant_mode,[]))} logs)")
    print(f" Start here: {_suggest_fix(dominant_mode)}\n")


# ── Simulate 76 unclassified logs for demo ────────────────────────────────────

def _simulate_unclassified() -> pd.DataFrame:
    """Generate synthetic 'unclassified' logs that mimic real failure patterns.

    Returns:
        A 76-row DataFrame with ``source``, ``log_message`` and
        ``predicted_label`` columns; every label is ``"Unclassified"``.
    """
    logs = [
        # Legacy format / CRM
        "Case escalation for ticket ID 9021 failed: agent inactive.",
        "CRM module 'ReportGenerator' will be retired in v4.1.",
        "Workflow for approval chain #4421 stalled at step 3.",
        "SLA breach detected for case ID 7701 (P1, 4h breach).",
        # Ambiguous
        "Service auth-api failed and unauthorized access was logged.",
        "Error: blocked request from 10.0.0.5 — reason unknown.",
        # Truncated / noisy
        "ERR",
        "srv timeout",
        "node-7",
        # Numeric-heavy
        "8821 9001 443 0 0 DROP IN=eth0 OUT= MAC=",
        "16 0 0 1 2024-01-14 03:21:00.001",
        # Multi-event
        "Backup started; disk usage at 92%; health check failed | node-3",
        # Rare vocab
        "PagerDuty alert triggered for on-call rotation P1-incident.",
        "GC eviction: 3.2GB heap compacted in 420ms.",
        "Janitor job completed: 14,000 stale tokens purged.",
        "Runbook auto-remediation triggered for alert ALT-9021.",
    ]
    # Pad to ~76 by repeating the samples (16 * 5 = 80, then trim).
    padded = (logs * 5)[:_SIMULATED_ROWS]
    return pd.DataFrame({
        "source": ["ModernCRM"] * 30 + ["LegacyCRM"] * 20 + ["AnalyticsEngine"] * 26,
        "log_message": padded,
        "predicted_label": ["Unclassified"] * _SIMULATED_ROWS,
    })


# ── CLI ──────────────────────────────────────────────────────────────────────

def main() -> None:
    """Parse command-line arguments and run the error analysis.

    With ``--simulate`` the synthetic data set is analysed; otherwise the CSV
    given by ``--input`` is loaded. With neither, the help text is printed and
    the process exits with status 1. An unreadable CSV or an unknown column
    name is reported as a usage error instead of a traceback.
    """
    parser = argparse.ArgumentParser(description="Analyze unclassified/misclassified logs")
    parser.add_argument("--input", help="Path to classified CSV from classify_csv()")
    parser.add_argument("--simulate", action="store_true",
                        help="Run with synthetic unclassified logs (no CSV needed)")
    parser.add_argument("--label-col", default="predicted_label",
                        help="Column name that holds the predicted label")
    parser.add_argument("--log-col", default=None,
                        help="Column name that holds the log text "
                             "(default: 'log_message', else auto-detected)")
    args = parser.parse_args()

    if args.simulate:
        df = _simulate_unclassified()
        print("🎭 Running with SIMULATED 76 unclassified logs…")
    elif args.input:
        try:
            df = pd.read_csv(args.input)
        except (OSError, pd.errors.EmptyDataError, pd.errors.ParserError) as exc:
            parser.error(f"could not read {args.input!r}: {exc}")
    else:
        parser.print_help()
        sys.exit(1)

    # Validate here so a typo in a column name is a usage error.
    for option, column in (("--label-col", args.label_col), ("--log-col", args.log_col)):
        if column is not None and column not in df.columns:
            parser.error(
                f"{option} {column!r} is not a column of the input "
                f"(available: {', '.join(map(str, df.columns))})"
            )

    analyze_unclassified(df, label_col=args.label_col, log_col=args.log_col)


if __name__ == "__main__":
    main()