Spaces:
Running
Running
| """ | |
error_analysis.py — Deep Dive into Unclassified / Misclassified Logs

This script addresses the 76 unclassified logs from the 20k run.
It answers:
  1. What do these logs look like? (print + group)
  2. Why did the model fail? (pattern analysis)
  3. What should we do? (actionable fix suggestions)

Google interview talking point:
  "I performed structured error analysis on my model's failure cases.
  I grouped them by failure type — vocabulary mismatch, ambiguous intent,
  formatting noise — and used that to drive targeted improvements."

Usage:
  python error_analysis.py --input output.csv   # post-classify CSV
  python error_analysis.py --simulate           # demo with synthetic data
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import re | |
| import sys | |
| from collections import Counter, defaultdict | |
| from typing import Optional | |
| import pandas as pd | |
# ── Failure mode taxonomy ─────────────────────────────────────────────────────
class FailureMode:
    """Plain string constants naming the failure-mode buckets of the analysis."""

    # Domain-specific terms not in training.
    RARE_VOCAB = "rare_vocabulary"
    # Log could match multiple categories.
    AMBIGUOUS = "ambiguous_intent"
    # Non-standard / old-school formatting.
    LEGACY_FORMAT = "legacy_format"
    # Partial / malformed log line.
    TRUNCATED = "truncated_or_noisy"
    # ID/code-heavy, no semantic signal.
    NUMERIC_ONLY = "mostly_numeric"
    # One line, multiple events.
    MULTI_EVENT = "multi_event"
    # None of the heuristics matched.
    UNKNOWN = "unknown"
# Rare-vocabulary terms, matched as whole alphabetic words so that "sla" does
# not fire inside "translation" and "gc" is found at end of line or before
# punctuation. Only letters count as word characters here, so identifiers such
# as "eviction_count" or "gc1" still match.
_RARE_VOCAB_RE = re.compile(
    r"(?<![a-z])(?:slas?|oncall|runbooks?|pagerduty|janitors?|gc|evictions?)(?![a-z])"
)


def _detect_failure_mode(log: str) -> str:
    """Heuristically guess why a log line was left unclassified.

    The checks run in a fixed priority order and the first match wins:
    truncated, mostly numeric, multi-event, legacy format, ambiguous intent,
    rare vocabulary, and finally unknown.

    Args:
        log: Raw log message text.

    Returns:
        One of the ``FailureMode`` string constants.
    """
    # Padding whitespace carries no signal; ignore it so a short message
    # surrounded by blanks is still recognised as truncated.
    text = log.strip()
    lowered = text.lower()

    if len(text) < 20:
        return FailureMode.TRUNCATED

    # Share of digit characters; len(text) is at least 20 here, so the
    # division is safe.
    digit_ratio = sum(ch.isdigit() for ch in text) / len(text)
    if digit_ratio > 0.40:
        return FailureMode.NUMERIC_ONLY

    # Looks like two or more events joined on one line.
    if text.count(";") >= 2 or " AND " in text or text.count(" | ") >= 2:
        return FailureMode.MULTI_EVENT

    # Legacy / unusual format signals (substring match, so inflected forms
    # such as "retired" or "tickets" are caught as well).
    legacy_signals = ("ticket", "escalation", "crm", "deprecated", "retire",
                      "module will be", "workflow", "assigned agent")
    if any(signal in lowered for signal in legacy_signals):
        return FailureMode.LEGACY_FORMAT

    # Ambiguity: at least two distinct signals that could mean either an
    # error or a security event.
    ambiguous_signals = ("failed", "error", "unauthorized", "denied", "blocked")
    if sum(signal in lowered for signal in ambiguous_signals) >= 2:
        return FailureMode.AMBIGUOUS

    if _RARE_VOCAB_RE.search(lowered):
        return FailureMode.RARE_VOCAB

    return FailureMode.UNKNOWN
def _suggest_fix(mode: str) -> str:
    """Return the recommended remediation for a failure mode.

    Args:
        mode: One of the ``FailureMode`` string constants.

    Returns:
        A one-line suggestion, or ``"Manual review required."`` when the mode
        has no entry in the table.
    """
    # NOTE(review): the dash in the RARE_VOCAB message looks mis-encoded; it is
    # kept verbatim here -- confirm against the file's intended encoding.
    fixes = {
        FailureMode.RARE_VOCAB: "Add 5β10 training examples covering this vocabulary; or add regex rule.",
        FailureMode.AMBIGUOUS: "Use multi-label or add a dedicated 'Ambiguous' class; review confidence threshold.",
        FailureMode.LEGACY_FORMAT: "Route all legacy-format logs to LLM tier; add few-shot examples for LLM prompt.",
        # The length quoted here matches the cutoff the detector uses for
        # TRUNCATED (fewer than 20 characters).
        FailureMode.TRUNCATED: "Add input validation: reject/flag logs under 20 chars before classification.",
        FailureMode.NUMERIC_ONLY: "Add regex patterns for structured numeric formats (job IDs, error codes, etc.).",
        # Lists every separator the detector treats as a multi-event marker.
        FailureMode.MULTI_EVENT: "Pre-process: split multi-event lines on ';', ' | ' or ' AND ' before classifying.",
        FailureMode.UNKNOWN: "Manually review and add to training data or LLM few-shot examples.",
    }
    return fixes.get(mode, "Manual review required.")
# ── Core analysis ─────────────────────────────────────────────────────────────
# NOTE(review): several glyphs in the output strings below (rule and bar
# characters, icons, dashes) look mis-encoded. They are kept verbatim so the
# printed report is unchanged -- confirm against the file's intended encoding.

# Alphabetic tokens of three or more letters, applied to lower-cased text.
_TOKEN_RE = re.compile(r"[a-z]{3,}")

# Words ignored by the token-frequency step.
_STOPWORDS = frozenset({
    "the", "a", "an", "is", "in", "on", "for", "to", "of",
    "and", "or", "by", "at", "with", "has", "was", "be",
    "this", "that", "it", "not", "are", "from", "as",
})


def _pick_log_column(df: pd.DataFrame, label_col: str) -> str:
    """Return the name of the column that holds the raw log text."""
    if "log_message" in df.columns:
        return "log_message"
    # Fall back to the last column, but skip the label and source columns so
    # the analysis never runs on the predictions themselves.
    candidates = [col for col in df.columns if col not in (label_col, "source")]
    return candidates[-1] if candidates else df.columns[-1]


def _print_log_listing(logs: list[str]) -> None:
    """Step 1: print every unclassified log, numbered and cut to 120 chars."""
    print(f"{'#':>4} {'Log Message'}")
    print("β" * 80)
    for index, log in enumerate(logs, 1):
        print(f"{index:>4}. {log[:120]}")


def _print_failure_groups(logs: list[str]) -> dict[str, list[str]]:
    """Step 2: bucket logs by detected failure mode, print and return the buckets."""
    print(f"\n{'='*70}")
    print("π GROUPING BY FAILURE MODE")
    print("β" * 70)
    groups: dict[str, list[str]] = defaultdict(list)
    for log in logs:
        groups[_detect_failure_mode(log)].append(log)
    total = len(logs)
    # Largest bucket first; ties keep first-seen order because sorting is stable.
    for mode, members in sorted(groups.items(), key=lambda item: -len(item[1])):
        pct = len(members) / total * 100
        print(f"\nπΉ {mode} β {len(members)} logs ({pct:.1f}%)")
        print(f" π‘ Fix: {_suggest_fix(mode)}")
        print(" Examples:")
        for example in members[:3]:
            print(f" β’ {example[:110]}")
    return groups


def _print_token_frequencies(logs: list[str]) -> None:
    """Step 3: print the 20 most common non-stopword tokens with a bar each."""
    print(f"\n{'='*70}")
    print("π COMMON TOKENS IN UNCLASSIFIED LOGS")
    print("β" * 70)
    counter = Counter(
        token
        for log in logs
        for token in _TOKEN_RE.findall(log.lower())
        if token not in _STOPWORDS
    )
    print("Top 20 tokens in unclassified logs:")
    for token, count in counter.most_common(20):
        bar = "β" * min(count, 40)  # bar length is capped at 40 characters
        print(f" {token:<20} {count:>4} {bar}")


def _print_length_distribution(logs: list[str]) -> None:
    """Step 4: print mean/median/min/max log length and flag short logs."""
    lengths = pd.Series([len(log) for log in logs])
    print(f"\n{'='*70}")
    print("π LOG LENGTH DISTRIBUTION (Unclassified)")
    print(f" Mean: {lengths.mean():.1f} chars")
    print(f" Median: {lengths.median():.1f} chars")
    print(f" Min: {lengths.min()} chars")
    print(f" Max: {lengths.max()} chars")
    short = (lengths < 30).sum()
    if short:
        print(f" β οΈ {short} logs under 30 chars β likely truncated/noisy")


def _print_source_breakdown(sources: pd.Series) -> None:
    """Step 5: print how many unclassified logs came from each source."""
    print(f"\n{'='*70}")
    print("π·οΈ UNCLASSIFIED BY SOURCE")
    for src, cnt in sources.value_counts().items():
        bar = "β" * min(cnt, 40)
        print(f" {src:<22} {cnt:>4} {bar}")


def _print_action_plan(groups: dict[str, list[str]]) -> None:
    """Step 6: print the fixed priority list and the dominant failure mode."""
    print(f"\n{'='*70}")
    print("β ACTIONABLE FIXES (Priority Order)")
    print("β" * 70)
    if groups:
        dominant_mode = max(groups.items(), key=lambda item: len(item[1]))[0]
    else:
        dominant_mode = FailureMode.UNKNOWN
    fixes = [
        (1, "regex", "Add patterns for top unclassified tokens to processor_regex.py"),
        (2, "training", "Add 10β20 examples per failure mode to training data"),
        (3, "llm", "For LEGACY_FORMAT failures: add to LLM few-shot examples"),
        (4, "preproc", "Pre-process: split multi-event logs, reject truncated logs"),
        (5, "threshold", "Tune BERT confidence threshold (currently 0.30 β try 0.40)"),
    ]
    for priority, area, fix in fixes:
        print(f" {priority}. [{area.upper():^10}] {fix}")
    print(f"\nπ Dominant failure mode: '{dominant_mode}' ({len(groups.get(dominant_mode, []))} logs)")
    print(f" Start here: {_suggest_fix(dominant_mode)}\n")


def analyze_unclassified(df: pd.DataFrame, label_col: str = "predicted_label") -> None:
    """Print a full error-analysis report for the rows labelled 'Unclassified'.

    The report lists the logs, groups them by heuristic failure mode, shows
    the most common tokens, summarises log lengths, breaks the rows down by
    ``source`` (when that column exists) and ends with a list of fixes.

    Args:
        df: Classified logs. Log text is read from ``log_message`` when that
            column exists, otherwise from the last column that is neither
            ``label_col`` nor ``source``.
        label_col: Name of the column holding the predicted label.

    Raises:
        KeyError: If ``label_col`` is not a column of ``df``.
    """
    unclassified = df[df[label_col] == "Unclassified"]
    total_unclassified = len(unclassified)
    if total_unclassified == 0:
        print("β No unclassified logs found!")
        return

    print(f"\n{'='*70}")
    print(f"π ERROR ANALYSIS: {total_unclassified} Unclassified Logs")
    print(f"{'='*70}\n")

    # Extract the log text once; every step below works on this list.
    log_col = _pick_log_column(df, label_col)
    logs = [str(value) for value in unclassified[log_col]]

    _print_log_listing(logs)
    groups = _print_failure_groups(logs)
    _print_token_frequencies(logs)
    _print_length_distribution(logs)
    if "source" in df.columns:
        _print_source_breakdown(unclassified["source"])
    _print_action_plan(groups)
# ── Simulate 76 unclassified logs for demo ────────────────────────────────────
| def _simulate_unclassified() -> pd.DataFrame: | |
| """Generate synthetic 'unclassified' logs that mimic real failure patterns.""" | |
| logs = [ | |
| # Legacy format / CRM | |
| "Case escalation for ticket ID 9021 failed: agent inactive.", | |
| "CRM module 'ReportGenerator' will be retired in v4.1.", | |
| "Workflow for approval chain #4421 stalled at step 3.", | |
| "SLA breach detected for case ID 7701 (P1, 4h breach).", | |
| # Ambiguous | |
| "Service auth-api failed and unauthorized access was logged.", | |
| "Error: blocked request from 10.0.0.5 β reason unknown.", | |
| # Truncated / noisy | |
| "ERR", | |
| "srv timeout", | |
| "node-7", | |
| # Numeric-heavy | |
| "8821 9001 443 0 0 DROP IN=eth0 OUT= MAC=", | |
| "16 0 0 1 2024-01-14 03:21:00.001", | |
| # Multi-event | |
| "Backup started; disk usage at 92%; health check failed | node-3", | |
| # Rare vocab | |
| "PagerDuty alert triggered for on-call rotation P1-incident.", | |
| "GC eviction: 3.2GB heap compacted in 420ms.", | |
| "Janitor job completed: 14,000 stale tokens purged.", | |
| "Runbook auto-remediation triggered for alert ALT-9021.", | |
| ] | |
| # Pad to ~76 | |
| padded = (logs * 5)[:76] | |
| return pd.DataFrame({ | |
| "source": ["ModernCRM"] * 30 + ["LegacyCRM"] * 20 + ["AnalyticsEngine"] * 26, | |
| "log_message": padded, | |
| "predicted_label": ["Unclassified"] * 76, | |
| }) | |
# ── CLI ───────────────────────────────────────────────────────────────────────
def main() -> None:
    """Command-line entry point.

    With ``--simulate`` the analysis runs on synthetic data; with ``--input``
    it runs on the given CSV. If both are given, ``--simulate`` wins. With
    neither, the help text is printed and the process exits with status 1.
    An unreadable CSV or a missing label column is reported through
    ``parser.error`` (usage message, exit status 2) instead of a traceback.
    """
    parser = argparse.ArgumentParser(description="Analyze unclassified/misclassified logs")
    parser.add_argument("--input", help="Path to classified CSV from classify_csv()")
    parser.add_argument("--simulate", action="store_true",
                        help="Run with synthetic unclassified logs (no CSV needed)")
    parser.add_argument("--label-col", default="predicted_label",
                        help="Column name that holds the predicted label")
    args = parser.parse_args()

    if args.simulate:
        df = _simulate_unclassified()
        # NOTE(review): the leading icon and trailing ellipsis look
        # mis-encoded; kept verbatim -- confirm against the intended encoding.
        print("π Running with SIMULATED 76 unclassified logsβ¦")
    elif args.input:
        try:
            df = pd.read_csv(args.input)
        except (OSError, pd.errors.EmptyDataError, pd.errors.ParserError) as exc:
            # parser.error prints the usage line plus this message and exits.
            parser.error(f"could not read --input {args.input!r}: {exc}")
    else:
        parser.print_help()
        sys.exit(1)

    # Fail early with a readable message rather than a KeyError deep inside
    # the analysis.
    if args.label_col not in df.columns:
        parser.error(
            f"label column {args.label_col!r} not found; "
            f"available columns: {', '.join(map(str, df.columns))}"
        )

    analyze_unclassified(df, label_col=args.label_col)


if __name__ == "__main__":
    main()