"""
error_analysis.py — Deep Dive into Unclassified / Misclassified Logs
This script addresses the 76 unclassified logs from the 20k run.
It answers:
1. What do these logs look like? (print + group)
2. Why did the model fail? (pattern analysis)
3. What should we do? (actionable fix suggestions)
Google interview talking point:
"I performed structured error analysis on my model's failure cases.
I grouped them by failure type — vocabulary mismatch, ambiguous intent,
formatting noise — and used that to drive targeted improvements."
Usage:
python error_analysis.py --input output.csv # post-classify CSV
python error_analysis.py --simulate # demo with synthetic data
"""
from __future__ import annotations
import argparse
import re
import sys
from collections import Counter, defaultdict
from typing import Optional
import pandas as pd
# ── Failure mode taxonomy ────────────────────────────────────────────────────
class FailureMode:
    """String tags for the heuristic reasons a log may have gone unclassified."""

    # Domain-specific terms not in training.
    RARE_VOCAB = "rare_vocabulary"
    # Log could match multiple categories.
    AMBIGUOUS = "ambiguous_intent"
    # Non-standard / old-school formatting.
    LEGACY_FORMAT = "legacy_format"
    # Partial / malformed log line.
    TRUNCATED = "truncated_or_noisy"
    # ID/code-heavy, no semantic signal.
    NUMERIC_ONLY = "mostly_numeric"
    # One line, multiple events.
    MULTI_EVENT = "multi_event"
    # Fallback tag.
    UNKNOWN = "unknown"
def _detect_failure_mode(log: str) -> str:
    """Guess, via ordered heuristics, why a log line ended up unclassified.

    The checks run in a fixed order and the first one that fires wins:
    short line, digit-heavy line, multi-event separators, legacy keywords,
    two or more ambiguity keywords, rare-vocabulary keywords.

    Args:
        log: The raw log message.

    Returns:
        One of the ``FailureMode`` string constants; ``FailureMode.UNKNOWN``
        when no heuristic matches.
    """
    # Lines shorter than 20 characters are treated as truncated / noisy.
    if len(log) < 20:
        return FailureMode.TRUNCATED

    # Share of characters that are digits.
    digit_count = len([ch for ch in log if ch.isdigit()])
    if digit_count / max(len(log), 1) > 0.40:
        return FailureMode.NUMERIC_ONLY

    # Separators hinting at 2+ events on one line; " AND " is matched
    # case-sensitively against the raw text.
    looks_joined = (
        log.count(";") >= 2
        or " AND " in log
        or log.count(" | ") >= 2
    )
    if looks_joined:
        return FailureMode.MULTI_EVENT

    lowered = log.lower()

    def _hits(keywords):
        """Count the keywords that occur as substrings of the lowered log."""
        return sum(1 for keyword in keywords if keyword in lowered)

    # Legacy / unusual format signals.
    legacy_keywords = (
        "ticket", "escalation", "crm", "deprecated", "retire",
        "module will be", "workflow", "assigned agent",
    )
    if _hits(legacy_keywords) >= 1:
        return FailureMode.LEGACY_FORMAT

    # Ambiguity signals: could be error OR security; needs two distinct hits.
    ambiguity_keywords = ("failed", "error", "unauthorized", "denied", "blocked")
    if _hits(ambiguity_keywords) >= 2:
        return FailureMode.AMBIGUOUS

    # Rare vocabulary.
    rare_keywords = ("sla", "oncall", "runbook", "pagerduty", "janitor", "gc ", "eviction")
    if _hits(rare_keywords) >= 1:
        return FailureMode.RARE_VOCAB

    return FailureMode.UNKNOWN
def _suggest_fix(mode: str) -> str:
    """Return the remediation hint associated with a failure-mode tag.

    Args:
        mode: One of the ``FailureMode`` string constants.

    Returns:
        The suggestion text for ``mode``, or ``"Manual review required."``
        when ``mode`` is not a known tag.
    """
    fixes = {
        # The range dash below was mis-encoded in the file; restored as an en dash.
        FailureMode.RARE_VOCAB: "Add 5–10 training examples covering this vocabulary; or add regex rule.",
        FailureMode.AMBIGUOUS: "Use multi-label or add a dedicated 'Ambiguous' class; review confidence threshold.",
        FailureMode.LEGACY_FORMAT: "Route all legacy-format logs to LLM tier; add few-shot examples for LLM prompt.",
        FailureMode.TRUNCATED: "Add input validation: reject/flag logs under 15 chars before classification.",
        FailureMode.NUMERIC_ONLY: "Add regex patterns for structured numeric formats (job IDs, error codes, etc.).",
        FailureMode.MULTI_EVENT: "Pre-process: split multi-event lines on ';' or ' | ' before classifying.",
        FailureMode.UNKNOWN: "Manually review and add to training data or LLM few-shot examples.",
    }
    return fixes.get(mode, "Manual review required.")
# ── Core analysis ────────────────────────────────────────────────────────────
def analyze_unclassified(df: pd.DataFrame, label_col: str = "predicted_label") -> None:
    """Print a multi-section error-analysis report for unclassified logs.

    Rows whose ``label_col`` value equals ``"Unclassified"`` are selected and
    reported on stdout in six steps: full listing, grouping by heuristic
    failure mode, token frequencies, length statistics, per-source counts
    (only when a ``source`` column exists), and a prioritised fix list.

    Args:
        df: Classified logs. The log text is read from the ``log_message``
            column when present, otherwise from the last column of ``df``.
        label_col: Name of the column holding the predicted label.

    Returns:
        None. The report is written to stdout.

    Raises:
        KeyError: If ``label_col`` is not a column of ``df``.
    """
    # NOTE(review): several header strings below start with a bare "π". It
    # looks like a mis-decoded emoji whose intended symbol cannot be determined
    # from this file, so those literals are left exactly as found. The other
    # symbols (separators, bars, bullets, dashes, arrow, check mark, warning
    # sign) were restored from context.
    unclassified = df[df[label_col] == "Unclassified"].copy()
    total_unclassified = len(unclassified)
    if total_unclassified == 0:
        print("✅ No unclassified logs found!")
        return

    banner = "=" * 70
    print(f"\n{banner}")
    print(f"π ERROR ANALYSIS: {total_unclassified} Unclassified Logs")
    print(f"{banner}\n")

    # ── Step 1: Print all unclassified logs ──────────────────────────────────
    log_col = "log_message" if "log_message" in df.columns else df.columns[-1]
    # Read the text once, straight from the column, so every step sees the
    # same strings (row-wise iteration may upcast values before str()).
    logs = [str(value) for value in unclassified[log_col]]
    print(f"{'#':>4} {'Log Message'}")
    print("─" * 80)
    for position, log in enumerate(logs, 1):
        print(f"{position:>4}. {log[:120]}")

    # ── Step 2: Group by failure mode ────────────────────────────────────────
    print(f"\n{banner}")
    print("π GROUPING BY FAILURE MODE")
    print("─" * 70)
    groups: dict[str, list[str]] = defaultdict(list)
    for log in logs:
        groups[_detect_failure_mode(log)].append(log)
    # Largest group first; the sort is stable, so ties keep first-seen order.
    ranked = sorted(groups.items(), key=lambda item: -len(item[1]))
    for mode, members in ranked:
        pct = len(members) / total_unclassified * 100
        print(f"\n🔹 {mode} — {len(members)} logs ({pct:.1f}%)")
        print(f" 💡 Fix: {_suggest_fix(mode)}")
        print(" Examples:")
        for log in members[:3]:
            print(f" • {log[:110]}")

    # ── Step 3: Token frequency analysis ─────────────────────────────────────
    print(f"\n{banner}")
    print("π COMMON TOKENS IN UNCLASSIFIED LOGS")
    print("─" * 70)
    stopwords = {"the", "a", "an", "is", "in", "on", "for", "to", "of",
                 "and", "or", "by", "at", "with", "has", "was", "be",
                 "this", "that", "it", "not", "are", "from", "as"}
    # Tokens are runs of 3+ ASCII letters taken from the lower-cased text.
    counter = Counter(
        token
        for log in logs
        for token in re.findall(r"[a-z]{3,}", log.lower())
        if token not in stopwords
    )
    print("Top 20 tokens in unclassified logs:")
    for token, count in counter.most_common(20):
        bar = "█" * min(count, 40)  # cap bar width at 40 cells
        print(f" {token:<20} {count:>4} {bar}")

    # ── Step 4: Length distribution ──────────────────────────────────────────
    lengths = unclassified[log_col].apply(lambda x: len(str(x)))
    print(f"\n{banner}")
    print("π LOG LENGTH DISTRIBUTION (Unclassified)")
    print(f" Mean: {lengths.mean():.1f} chars")
    print(f" Median: {lengths.median():.1f} chars")
    print(f" Min: {lengths.min()} chars")
    print(f" Max: {lengths.max()} chars")
    short = (lengths < 30).sum()
    if short:
        print(f" ⚠️ {short} logs under 30 chars — likely truncated/noisy")

    # ── Step 5: Source breakdown ─────────────────────────────────────────────
    if "source" in df.columns:
        print(f"\n{banner}")
        print("🏷️ UNCLASSIFIED BY SOURCE")
        src_counts = unclassified["source"].value_counts()
        for src, cnt in src_counts.items():
            bar = "█" * min(cnt, 40)
            print(f" {src:<22} {cnt:>4} {bar}")

    # ── Step 6: Actionable summary ───────────────────────────────────────────
    print(f"\n{banner}")
    print("✅ ACTIONABLE FIXES (Priority Order)")
    print("─" * 70)
    # At least one row exists here, so `ranked` is non-empty; its head is the
    # first-seen largest group.
    dominant_mode, dominant_logs = ranked[0]
    fixes = [
        (1, "regex", "Add patterns for top unclassified tokens to processor_regex.py"),
        (2, "training", "Add 10–20 examples per failure mode to training data"),
        (3, "llm", "For LEGACY_FORMAT failures: add to LLM few-shot examples"),
        (4, "preproc", "Pre-process: split multi-event logs, reject truncated logs"),
        (5, "threshold", "Tune BERT confidence threshold (currently 0.30 → try 0.40)"),
    ]
    for priority, area, fix in fixes:
        print(f" {priority}. [{area.upper():^10}] {fix}")
    print(f"\nπ Dominant failure mode: '{dominant_mode}' ({len(dominant_logs)} logs)")
    print(f" Start here: {_suggest_fix(dominant_mode)}\n")
# ── Simulate 76 unclassified logs for demo ──────────────────────────────────
def _simulate_unclassified() -> pd.DataFrame:
"""Generate synthetic 'unclassified' logs that mimic real failure patterns."""
logs = [
# Legacy format / CRM
"Case escalation for ticket ID 9021 failed: agent inactive.",
"CRM module 'ReportGenerator' will be retired in v4.1.",
"Workflow for approval chain #4421 stalled at step 3.",
"SLA breach detected for case ID 7701 (P1, 4h breach).",
# Ambiguous
"Service auth-api failed and unauthorized access was logged.",
"Error: blocked request from 10.0.0.5 β reason unknown.",
# Truncated / noisy
"ERR",
"srv timeout",
"node-7",
# Numeric-heavy
"8821 9001 443 0 0 DROP IN=eth0 OUT= MAC=",
"16 0 0 1 2024-01-14 03:21:00.001",
# Multi-event
"Backup started; disk usage at 92%; health check failed | node-3",
# Rare vocab
"PagerDuty alert triggered for on-call rotation P1-incident.",
"GC eviction: 3.2GB heap compacted in 420ms.",
"Janitor job completed: 14,000 stale tokens purged.",
"Runbook auto-remediation triggered for alert ALT-9021.",
]
# Pad to ~76
padded = (logs * 5)[:76]
return pd.DataFrame({
"source": ["ModernCRM"] * 30 + ["LegacyCRM"] * 20 + ["AnalyticsEngine"] * 26,
"log_message": padded,
"predicted_label": ["Unclassified"] * 76,
})
# ── CLI ──────────────────────────────────────────────────────────────────────
def main() -> None:
    """Parse command-line options and run the unclassified-log analysis.

    ``--simulate`` takes precedence over ``--input``. With neither option the
    help text is printed and the process exits with status 1.
    """
    parser = argparse.ArgumentParser(description="Analyze unclassified/misclassified logs")
    parser.add_argument("--input", help="Path to classified CSV from classify_csv()")
    parser.add_argument("--simulate", action="store_true",
                        help="Run with synthetic unclassified logs (no CSV needed)")
    parser.add_argument("--label-col", default="predicted_label",
                        help="Column name that holds the predicted label")
    args = parser.parse_args()

    if args.simulate:
        df = _simulate_unclassified()
        # NOTE(review): the leading "π" looks like a mis-decoded emoji whose
        # intended symbol cannot be determined from this file; it is left as
        # found. The trailing ellipsis was mis-encoded and is restored.
        print("π Running with SIMULATED 76 unclassified logs…")
    elif args.input:
        df = pd.read_csv(args.input)
    else:
        # Nothing to analyse: show usage and signal failure to the shell.
        parser.print_help()
        sys.exit(1)

    analyze_unclassified(df, label_col=args.label_col)
if __name__ == "__main__":
    # Run the CLI only when executed as a script, not when imported.
    main()