LogAI-Engine / classify.py
NOT-OMEGA's picture
Upload 13 files
2222383 verified
raw
history blame
8.34 kB
"""
classify.py — 3-Tier Hybrid Pipeline (V3 — Latency-Tracked)

Architecture:
    LegacyCRM → LLM directly
    Others    → Regex → BERT (batch) → LLM fallback

Changes in V3:
    - Tier-wise latency tracking (regex_ms, bert_ms, llm_ms)
    - Pipeline summary with p50/p95 per tier
    - Defensive: LLM timeout + retry baked in via processor_llm
    - classify_logs returns richer result dict
"""
from __future__ import annotations
import time
import statistics
import pandas as pd
from processor_regex import classify_with_regex
from processor_bert import classify_batch as bert_batch
from processor_llm import classify_with_llm
LEGACY_SOURCE = "LegacyCRM"
# ── Result type ───────────────────────────────────────────────────────────────
def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
return {
"label": label,
"tier": tier,
"confidence": confidence,
"latency_ms": round(latency_ms, 3),
}
# ── Single log (backward-compatible) ──────────────────────────────────────────
def classify_log(source: str, log_msg: str) -> dict:
    """Classify a single log line.

    The ``(source, log_msg)`` pair is sent through ``classify_logs`` as a
    one-element batch and the only result of that batch is returned.

    Returns:
        The result dict for this log, carrying ``label``, ``tier``,
        ``confidence`` and ``latency_ms``.
    """
    single_batch = [(source, log_msg)]
    return classify_logs(single_batch)[0]
# ── Batch pipeline (main entry point) ─────────────────────────────────────────
def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
    """Classify a batch of logs with 3-tier routing and per-result latency.

    Tier routing:
        * ``source == LEGACY_SOURCE``     -> LLM directly (tier ``"LLM"``).
        * regex returns a truthy label    -> done (tier ``"Regex"``,
          confidence ``1.0``).
        * everything else                 -> one BERT batch call; items BERT
          labels ``"Unclassified"`` are re-sent to the LLM
          (tier ``"LLM (fallback)"``).

    Args:
        logs: ``(source, log_message)`` pairs.

    Returns:
        One result dict per input, in input order, each with the keys
        ``label``, ``tier``, ``confidence`` and ``latency_ms``.
        ``latency_ms`` is the wall time of the single regex or LLM call for
        those tiers; for BERT it is the batch wall time divided evenly by
        the batch size. LLM results carry ``confidence=None``.

    Raises:
        ValueError: If the BERT batch call returns a different number of
            results than the number of messages it was given. Without this
            check the unmatched positions would silently stay ``None`` in
            the returned list.
    """
    results: list = [None] * len(logs)

    # ── Step 1: Route to groups ──────────────────────────────────────────────
    llm_indices = []
    bert_indices = []
    for i, (source, log_msg) in enumerate(logs):
        if source == LEGACY_SOURCE:
            llm_indices.append(i)
            continue

        t0 = time.perf_counter()
        label = classify_with_regex(log_msg)
        t1 = time.perf_counter()
        if label:
            results[i] = _make_result(label, "Regex", 1.0, (t1 - t0) * 1000)
        else:
            bert_indices.append(i)

    # ── Step 2: BERT batch ───────────────────────────────────────────────────
    if bert_indices:
        bert_msgs = [logs[i][1] for i in bert_indices]
        t_bert_start = time.perf_counter()
        # Materialize inside the timed window so the length can be verified
        # even if the batch call hands back a lazy iterable.
        bert_results = list(bert_batch(bert_msgs))
        t_bert_end = time.perf_counter()

        if len(bert_results) != len(bert_msgs):
            raise ValueError(
                f"bert_batch returned {len(bert_results)} results "
                f"for {len(bert_msgs)} messages"
            )

        # The batch is timed as a whole; each log gets an equal share.
        bert_ms_per_log = (t_bert_end - t_bert_start) * 1000 / len(bert_msgs)
        for idx, (label, conf) in zip(bert_indices, bert_results):
            if label != "Unclassified":
                results[idx] = _make_result(label, "BERT", conf, bert_ms_per_log)
            else:
                llm_indices.append(idx)

    # ── Step 3: LLM (LegacyCRM + BERT fallback) ──────────────────────────────
    for i in llm_indices:
        source, log_msg = logs[i]
        t0 = time.perf_counter()
        label = classify_with_llm(log_msg)
        t1 = time.perf_counter()
        # Direct legacy routing and BERT fallbacks get distinct tier names.
        tier = "LLM" if source == LEGACY_SOURCE else "LLM (fallback)"
        results[i] = _make_result(label, tier, None, (t1 - t0) * 1000)

    return results
# ── Pipeline summary ──────────────────────────────────────────────────────────
def pipeline_summary(results: list[dict]) -> dict:
    """Aggregate per-tier latency statistics and label counts.

    Args:
        results: Result dicts as produced by ``classify_logs``; each must
            provide ``tier``, ``latency_ms`` and ``label``.

    Returns:
        A dict with:
            ``total``        - number of results,
            ``tier_stats``   - per tier: ``count``, ``pct`` (share of total,
                               1 decimal), and ``p50_ms`` / ``p95_ms`` /
                               ``p99_ms`` / ``mean_ms`` (2 decimals),
            ``label_counts`` - occurrences of each label.
        The p95/p99 values are the element at sorted index
        ``min(int(n * q), n - 1)``; p50 is ``statistics.median``.
    """
    latencies_by_tier: dict[str, list[float]] = {}
    label_counts: dict[str, int] = {}
    for entry in results:
        latencies_by_tier.setdefault(entry["tier"], []).append(entry["latency_ms"])
        label = entry["label"]
        label_counts[label] = label_counts.get(label, 0) + 1

    total = len(results)

    def _value_at(ordered: list[float], fraction: float) -> float:
        # Index is clamped so it never runs past the last element.
        position = min(int(len(ordered) * fraction), len(ordered) - 1)
        return ordered[position]

    tier_stats = {}
    for tier, samples in latencies_by_tier.items():
        ordered = sorted(samples)
        count = len(ordered)
        tier_stats[tier] = {
            "count": count,
            "pct": round(count / total * 100, 1),
            "p50_ms": round(statistics.median(ordered), 2),
            "p95_ms": round(_value_at(ordered, 0.95), 2),
            "p99_ms": round(_value_at(ordered, 0.99), 2),
            "mean_ms": round(statistics.mean(ordered), 2),
        }

    return {
        "total": total,
        "tier_stats": tier_stats,
        "label_counts": label_counts,
    }
# ── CSV batch classify ────────────────────────────────────────────────────────
def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str, pd.DataFrame]:
    """Classify every row of a CSV file and write the annotated copy.

    Args:
        input_path: CSV to read; must contain the columns ``source`` and
            ``log_message``.
        output_path: Where the annotated CSV is written (no index column).

    Returns:
        ``(output_path, df)`` where ``df`` is the input frame with the added
        columns ``predicted_label``, ``tier_used``, ``latency_ms`` and
        ``confidence``. Confidence is a percentage string with one decimal,
        or ``"N/A"`` when the result carries no confidence.

    Raises:
        ValueError: If either required column is missing.
    """
    df = pd.read_csv(input_path)

    required = {"source", "log_message"}
    if not required <= set(df.columns):
        raise ValueError(f"CSV mein ye columns chahiye: {required}. Mila: {set(df.columns)}")

    outcomes = classify_logs(list(zip(df["source"], df["log_message"])))

    labels, tiers, latencies, confidences = [], [], [], []
    for outcome in outcomes:
        labels.append(outcome["label"])
        tiers.append(outcome["tier"])
        latencies.append(outcome["latency_ms"])
        score = outcome["confidence"]
        confidences.append("N/A" if score is None else f"{score:.1%}")

    # Column insertion order is kept as-is so the written CSV layout is stable.
    df["predicted_label"] = labels
    df["tier_used"] = tiers
    df["latency_ms"] = latencies
    df["confidence"] = confidences

    df.to_csv(output_path, index=False)
    return output_path, df
# Aliases
# `classify` is a second module-level name bound to the same function object
# as `classify_logs`.
classify = classify_logs
# ── Self-test ─────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Smoke run: push a handful of sample logs through the batch pipeline,
    # print one row per log, then the aggregated per-tier summary.
    sample = [
        ("ModernCRM", "IP 192.168.133.114 blocked due to potential attack"),
        ("BillingSystem", "User User12345 logged in."),
        ("AnalyticsEngine", "File data_6957.csv uploaded successfully by user User265."),
        ("ModernHR", "GET /v2/servers/detail HTTP/1.1 status: 200 len: 1583 time: 0.19"),
        ("ModernHR", "Admin access escalation detected for user 9429"),
        ("LegacyCRM", "Case escalation for ticket ID 7324 failed because the assigned support agent is no longer active."),
        ("LegacyCRM", "The 'ReportGenerator' module will be retired in version 4.0."),
    ]
    print(f'{"Source":<20} {"Tier":<18} {"Conf":>6} {"Lat(ms)":>8} {"Label":<25} Log')
    print("─" * 115)
    results = classify_logs(sample)
    for (source, log), r in zip(sample, results):
        # Compare against None (as classify_csv does) so that a genuine
        # confidence of 0.0 is shown as "0%" rather than "N/A".
        conf = f"{r['confidence']:.0%}" if r["confidence"] is not None else " N/A"
        print(f'{source:<20} {r["tier"]:<18} {conf:>6} {r["latency_ms"]:>8.1f} {r["label"]:<25} {log[:40]}')
    summary = pipeline_summary(results)
    print("\n📊 Pipeline Summary:")
    for tier, stats in summary["tier_stats"].items():
        print(f" {tier}: {stats['count']} logs ({stats['pct']}%) | "
              f"p50={stats['p50_ms']}ms p95={stats['p95_ms']}ms p99={stats['p99_ms']}ms")
    print("\n🏷️ Label distribution:")
    # Most frequent label first.
    for label, count in sorted(summary["label_counts"].items(), key=lambda x: -x[1]):
        print(f" • {label}: {count}")