# --- Repository page header captured with the file (not part of the program) ---
# LogAI-Engine / classify.py
# NOT-OMEGA's picture
# Update classify.py
# de30f06 verified
# raw
# history blame
# 8.72 kB
"""
classify.py — 3-Tier Hybrid Pipeline (V3 — Latency-Tracked & Parallelized)
Architecture:
  LegacyCRM → LLM directly
  Others    → Regex → BERT (batch) → LLM fallback
Changes in V3:
  - Tier-wise latency tracking (regex_ms, bert_ms, llm_ms)
  - Pipeline summary with p50/p95 per tier
  - Defensive: LLM timeout + circuit breaker baked in via processor_llm
  - Parallelized LLM Tier using ThreadPoolExecutor for high throughput
"""
from __future__ import annotations
import time
import statistics
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from processor_regex import classify_with_regex
from processor_bert import classify_batch as bert_batch
from processor_llm import classify_with_llm
# Logs whose source equals this value bypass the regex and BERT tiers and are
# routed straight to the LLM tier.
LEGACY_SOURCE = "LegacyCRM"
# ── Result type ─────────────────────────────────────────────────────────────
def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
return {
"label": label,
"tier": tier,
"confidence": confidence,
"latency_ms": round(latency_ms, 3),
}
# ── Single log (backward-compatible) ────────────────────────────────────────
def classify_log(source: str, log_msg: str) -> dict:
    """Classify one log line by delegating to the batch pipeline.

    The (source, log_msg) pair is wrapped in a single-element batch, passed
    to ``classify_logs``, and the first (only) result dict is returned.

    Returns:
        The result dict for this log: label, tier, confidence, latency_ms.
    """
    single_batch = [(source, log_msg)]
    return classify_logs(single_batch)[0]
# ── Batch pipeline (main entry point) ───────────────────────────────────────
def classify_logs(
    logs: list[tuple[str, str]],
    *,
    max_workers: int = 4,
) -> list[dict]:
    """Batch classify with 3-tier routing + per-result latency.

    Tier routing:
        LegacyCRM source → LLM directly
        Regex match      → done
        Remainder        → BERT batch → LLM when BERT returns "Unclassified"

    Args:
        logs: Sequence of ``(source, log_message)`` pairs.
        max_workers: Size of the thread pool used for the LLM tier. Defaults
            to 4, the cap the pipeline previously hard-coded (the original
            note cites avoiding OOM as the reason for the cap).

    Returns:
        One dict per input, in input order, each of the form
        ``{label, tier, confidence, latency_ms}``. ``tier`` is one of
        ``"Regex"``, ``"BERT"``, ``"LLM"`` or ``"LLM (fallback)"``.
        ``confidence`` is 1.0 for regex hits, the BERT-reported value for
        BERT hits, and ``None`` for LLM results. BERT latency is the batch
        wall time divided evenly across the batch.

    Raises:
        ValueError: If the BERT batch call returns a different number of
            results than messages submitted (previously this silently left
            ``None`` entries in the output).
    """
    results: list = [None] * len(logs)

    # ── Step 1: Route to groups ──────────────────────────────────────────────
    llm_indices = []
    bert_indices = []
    for i, (source, log_msg) in enumerate(logs):
        if source == LEGACY_SOURCE:
            llm_indices.append(i)
            continue
        t_start = time.perf_counter()
        label = classify_with_regex(log_msg)
        latency_ms = (time.perf_counter() - t_start) * 1000
        if label:
            results[i] = _make_result(label, "Regex", 1.0, latency_ms)
        else:
            bert_indices.append(i)

    # ── Step 2: BERT batch ───────────────────────────────────────────────────
    if bert_indices:
        bert_msgs = [logs[i][1] for i in bert_indices]
        t_bert_start = time.perf_counter()
        # Materialize inside the timed region so the length can be validated
        # and a lazily-produced result is still included in the measurement.
        bert_results = list(bert_batch(bert_msgs))
        bert_elapsed_ms = (time.perf_counter() - t_bert_start) * 1000
        if len(bert_results) != len(bert_indices):
            # zip() would silently truncate and leave None placeholders that
            # break every downstream consumer; fail loudly instead.
            raise ValueError(
                f"BERT batch returned {len(bert_results)} results "
                f"for {len(bert_indices)} messages"
            )
        bert_ms_per_log = bert_elapsed_ms / len(bert_msgs)
        for idx, (label, conf) in zip(bert_indices, bert_results):
            if label != "Unclassified":
                results[idx] = _make_result(label, "BERT", conf, bert_ms_per_log)
            else:
                # BERT could not decide: hand the log to the LLM tier.
                llm_indices.append(idx)

    # ── Step 3: LLM (parallel) ───────────────────────────────────────────────
    if llm_indices:

        def _classify_with_llm_at(idx):
            """Run the LLM on logs[idx]; return (idx, result) with its own latency."""
            src, msg = logs[idx]
            t_llm_0 = time.perf_counter()
            label = classify_with_llm(msg)
            t_llm_ms = (time.perf_counter() - t_llm_0) * 1000
            # Distinguish direct LegacyCRM routing from the BERT fallback path.
            tier = "LLM" if src == LEGACY_SOURCE else "LLM (fallback)"
            return idx, _make_result(label, tier, None, t_llm_ms)

        # Run the LLM calls concurrently so one slow call does not stall the
        # whole batch; the pool size is bounded by max_workers.
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            for idx, res in executor.map(_classify_with_llm_at, llm_indices):
                results[idx] = res

    return results
# ── Pipeline summary ─────────────────────────────────────────────────────────
def pipeline_summary(results: list[dict]) -> dict:
    """Summarize a list of result dicts per tier and per label.

    Args:
        results: Result dicts, each providing ``tier``, ``label`` and
            ``latency_ms``.

    Returns:
        A dict with:
          * ``total``: number of results,
          * ``tier_stats``: per tier, the ``count``, its share ``pct`` of the
            total (one decimal), and ``p50_ms`` / ``p95_ms`` / ``p99_ms`` /
            ``mean_ms`` rounded to two decimals,
          * ``label_counts``: number of occurrences of each label.
        The percentile figures are the sorted latency at index
        ``min(int(n * q), n - 1)``.
    """
    latencies_by_tier: dict[str, list[float]] = {}
    label_counts: dict[str, int] = {}
    for entry in results:
        tier_name = entry["tier"]
        if tier_name not in latencies_by_tier:
            latencies_by_tier[tier_name] = []
        latencies_by_tier[tier_name].append(entry["latency_ms"])
        label = entry["label"]
        label_counts[label] = label_counts.get(label, 0) + 1

    total = len(results)

    def _describe(samples):
        """Build the stats record for one tier's latency samples."""
        ordered = sorted(samples)
        count = len(ordered)
        last = count - 1

        def _at(fraction):
            # Clamp so the index never runs past the final element.
            return ordered[min(int(count * fraction), last)]

        return {
            "count": count,
            "pct": round(count / total * 100, 1),
            "p50_ms": round(statistics.median(ordered), 2),
            "p95_ms": round(_at(0.95), 2),
            "p99_ms": round(_at(0.99), 2),
            "mean_ms": round(statistics.mean(ordered), 2),
        }

    tier_stats = {
        tier_name: _describe(samples)
        for tier_name, samples in latencies_by_tier.items()
    }
    return {
        "total": total,
        "tier_stats": tier_stats,
        "label_counts": label_counts,
    }
# ── CSV batch classify ───────────────────────────────────────────────────────
def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str, pd.DataFrame]:
    """Classify every row of a CSV file and write the annotated table back out.

    The input must contain the columns ``source`` and ``log_message``. The
    columns ``predicted_label``, ``tier_used``, ``latency_ms`` and
    ``confidence`` are then set (in that order); ``confidence`` is rendered
    as a one-decimal percentage string, or ``"N/A"`` when the result carries
    no confidence.

    Args:
        input_path: Path of the CSV file to read.
        output_path: Path the annotated CSV is written to (without the index).

    Returns:
        ``(output_path, dataframe)`` where ``dataframe`` is the annotated table.

    Raises:
        ValueError: If either required column is missing from the input.
    """
    frame = pd.read_csv(input_path)

    required = {"source", "log_message"}
    if required.difference(frame.columns):
        raise ValueError(f"Missing required columns in CSV. Expected: {required}. Found: {set(frame.columns)}")

    outcomes = classify_logs(list(zip(frame["source"], frame["log_message"])))

    # Copy the plain result fields into their output columns.
    for column, key in (
        ("predicted_label", "label"),
        ("tier_used", "tier"),
        ("latency_ms", "latency_ms"),
    ):
        frame[column] = [outcome[key] for outcome in outcomes]

    # Confidence is presented as text so missing values can read "N/A".
    confidence_text = []
    for outcome in outcomes:
        value = outcome["confidence"]
        confidence_text.append("N/A" if value is None else f"{value:.1%}")
    frame["confidence"] = confidence_text

    frame.to_csv(output_path, index=False)
    return output_path, frame
# Alias: `classify` is a second name bound to the batch entry point
# `classify_logs` (same function object, same signature).
classify = classify_logs
# ── Self-test ────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Smoke test: push a small mixed-source sample through the pipeline, print
    # one row per log, then print the aggregated per-tier and per-label summary.
    sample = [
        ("ModernCRM", "IP 192.168.133.114 blocked due to potential attack"),
        ("BillingSystem", "User User12345 logged in."),
        ("AnalyticsEngine", "File data_6957.csv uploaded successfully by user User265."),
        ("ModernHR", "GET /v2/servers/detail HTTP/1.1 status: 200 len: 1583 time: 0.19"),
        ("ModernHR", "Admin access escalation detected for user 9429"),
        ("LegacyCRM", "Case escalation for ticket ID 7324 failed because the assigned support agent is no longer active."),
        ("LegacyCRM", "The 'ReportGenerator' module will be retired in version 4.0."),
    ]

    print(f'{"Source":<20} {"Tier":<18} {"Conf":>6} {"Lat(ms)":>8} {"Label":<25} Log')
    print("─" * 115)

    results = classify_logs(sample)
    for (source, log), r in zip(sample, results):
        # Compare against None explicitly so a genuine 0.0 confidence prints
        # as 0% rather than N/A (same rule classify_csv uses).
        conf = f"{r['confidence']:.0%}" if r["confidence"] is not None else " N/A"
        print(f'{source:<20} {r["tier"]:<18} {conf:>6} {r["latency_ms"]:>8.1f} {r["label"]:<25} {log[:40]}')

    summary = pipeline_summary(results)

    print("\n📊 Pipeline Summary:")
    for tier, stats in summary["tier_stats"].items():
        print(f" {tier}: {stats['count']} logs ({stats['pct']}%) | "
              f"p50={stats['p50_ms']}ms p95={stats['p95_ms']}ms p99={stats['p99_ms']}ms")

    print("\n🏷️ Label distribution:")
    # Most frequent labels first.
    for label, count in sorted(summary["label_counts"].items(), key=lambda x: -x[1]):
        print(f" • {label}: {count}")