Spaces:
Sleeping
Sleeping
"""
classify.py — 3-Tier Hybrid Pipeline (V7 — Maximum Speed & Sharded Caching)

Architecture:
    LegacyCRM → LLM directly
    Others    → Regex → BERT (batch) → LLM fallback

Changes in V7:
- Unfroze the Gradio UI and restored processing speed: brought back ProcessPoolExecutor for the outer CSV chunks to utilize ALL CPU cores.
- LLM concurrency: ThreadPoolExecutor is retained inside classify_logs specifically for LLM I/O calls.
- Cache architecture: uses a "Sharded Cache" approach. Each CPU worker process gets its own 500k @lru_cache, which is perfectly safe for 18GB RAM and avoids GIL locks entirely.
"""
| from __future__ import annotations | |
| import os | |
| import time | |
| import statistics | |
| import pandas as pd | |
| from functools import lru_cache | |
| from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor | |
| from processor_regex import classify_with_regex | |
| from processor_bert import classify_batch as bert_batch | |
| from processor_llm import classify_with_llm | |
# ── Config ──────────────────────────────────────────────────────────────────
# Source name whose logs skip the regex/BERT tiers and go straight to the LLM.
# Overridable through the LEGACY_SOURCE environment variable.
LEGACY_SOURCE = os.environ.get("LEGACY_SOURCE", "LegacyCRM")
# ── Result type ─────────────────────────────────────────────────────────────
| def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict: | |
| return { | |
| "label": label, | |
| "tier": tier, | |
| "confidence": confidence, | |
| "latency_ms": round(latency_ms, 4), | |
| } | |
# ── Caching Layer (Sharded per CPU Core) ────────────────────────────────────
@lru_cache(maxsize=500_000)
def cached_llm_call(log_msg: str) -> str:
    """Return the LLM label for ``log_msg``, calling the LLM only on a cache miss.

    The result is memoised with a bounded LRU cache keyed on the exact message
    string. The cache lives in module state, so every worker process started
    by ``ProcessPoolExecutor`` holds its own independent copy.

    Args:
        log_msg: Raw log message to classify. Must be hashable (a ``str``).

    Returns:
        Whatever ``classify_with_llm`` returns for this message.

    Note:
        Exceptions raised by ``classify_with_llm`` are not cached; a failing
        message is retried on the next call.
    """
    return classify_with_llm(log_msg)
# ── Single log (backward-compatible) ────────────────────────────────────────
def classify_log(source: str, log_msg: str) -> dict:
    """Classify a single log by running it through the batch pipeline.

    Args:
        source: Name of the system that emitted the log.
        log_msg: The log message text.

    Returns:
        The result dict produced by ``classify_logs`` for this one entry.
    """
    single_batch = [(source, log_msg)]
    return classify_logs(single_batch)[0]
# ── Batch pipeline (main entry point) ───────────────────────────────────────
def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
    """Classify a batch of ``(source, log_message)`` pairs through the tiers.

    Routing:
        * Logs whose source equals ``LEGACY_SOURCE`` go straight to the LLM.
        * All other logs try the regex classifier first; misses are sent to
          BERT as one batch; BERT results labelled ``"Unclassified"`` fall
          back to the LLM.

    Args:
        logs: Sequence of ``(source, log_message)`` tuples.

    Returns:
        One result dict per input, in input order (see ``_make_result``).
    """
    results = [None] * len(logs)
    pending_llm = []
    pending_bert = []

    # ── Tier 1: regex, or direct routing to the LLM for the legacy source ──
    for pos, (source, log_msg) in enumerate(logs):
        if source == LEGACY_SOURCE:
            pending_llm.append(pos)
            continue
        started = time.perf_counter()
        regex_label = classify_with_regex(log_msg)
        if not regex_label:
            pending_bert.append(pos)
            continue
        elapsed_ms = (time.perf_counter() - started) * 1000
        results[pos] = _make_result(regex_label, "Regex", 1.0, elapsed_ms)

    # ── Tier 2: one BERT call for every regex miss ─────────────────────────
    if pending_bert:
        messages = [logs[pos][1] for pos in pending_bert]
        started = time.perf_counter()
        predictions = bert_batch(messages)
        finished = time.perf_counter()
        # The batch time is spread evenly across the logs in the batch.
        per_log_ms = (finished - started) * 1000 / len(messages)
        for pos, (bert_label, confidence) in zip(pending_bert, predictions):
            if bert_label == "Unclassified":
                pending_llm.append(pos)
            else:
                results[pos] = _make_result(bert_label, "BERT", confidence, per_log_ms)

    # ── Tier 3: LLM calls fanned out over a thread pool ────────────────────
    if not pending_llm:
        return results

    def _classify_via_llm(pos):
        source, log_msg = logs[pos]
        started = time.perf_counter()
        llm_label = cached_llm_call(log_msg)
        elapsed_ms = (time.perf_counter() - started) * 1000
        if source == LEGACY_SOURCE:
            base_tier = "LLM"
        else:
            base_tier = "LLM (fallback)"
        # Timing heuristic only: a call under 5 ms is reported as a cache hit.
        if elapsed_ms < 5:
            tier = f"{base_tier} (Cache Hit)"
        else:
            tier = f"{base_tier} (API Call)"
        return pos, _make_result(llm_label, tier, None, elapsed_ms)

    with ThreadPoolExecutor() as pool:
        llm_outcomes = list(pool.map(_classify_via_llm, pending_llm))
    for pos, outcome in llm_outcomes:
        results[pos] = outcome
    return results
# ── Pipeline summary ────────────────────────────────────────────────────────
def pipeline_summary(results: list[dict]) -> dict:
    """Aggregate per-tier latency statistics and per-label counts.

    Args:
        results: Result dicts carrying at least ``tier``, ``latency_ms`` and
            ``label`` keys.

    Returns:
        A dict with:
            ``total``: number of results,
            ``tier_stats``: per tier, the count, share of total (percent),
                p50/p95/p99/mean latency and summed latency in milliseconds,
            ``label_counts``: number of occurrences of each label.
    """
    latencies_by_tier: dict[str, list[float]] = {}
    label_counts: dict[str, int] = {}
    for entry in results:
        tier_name = entry["tier"]
        if tier_name not in latencies_by_tier:
            latencies_by_tier[tier_name] = []
        latencies_by_tier[tier_name].append(entry["latency_ms"])
        label = entry["label"]
        if label in label_counts:
            label_counts[label] += 1
        else:
            label_counts[label] = 1

    total = len(results)

    def _at_fraction(ordered, fraction):
        # Index is floor(len * fraction), clamped to the last element.
        size = len(ordered)
        return ordered[min(int(size * fraction), size - 1)]

    tier_stats = {}
    for tier_name, latencies in latencies_by_tier.items():
        ordered = sorted(latencies)
        count = len(ordered)
        share_pct = round(count / total * 100, 1)
        tier_stats[tier_name] = {
            "count": count,
            "pct": share_pct,
            "p50_ms": round(statistics.median(ordered), 4),
            "p95_ms": round(_at_fraction(ordered, 0.95), 4),
            "p99_ms": round(_at_fraction(ordered, 0.99), 4),
            "mean_ms": round(statistics.mean(ordered), 4),
            "total_ms": round(sum(ordered), 4),
        }

    return {
        "total": total,
        "tier_stats": tier_stats,
        "label_counts": label_counts,
    }
# ── Multiprocessing Helper ──────────────────────────────────────────────────
def _process_chunk(chunk: list[tuple[str, str]]) -> list[dict]:
    """Classify one chunk of ``(source, log_message)`` pairs.

    Kept as a module-level function so it can be handed to
    ``ProcessPoolExecutor.map``.
    """
    chunk_results = classify_logs(chunk)
    return chunk_results
# ── CSV batch classify (Hybrid Processing) ──────────────────────────────────
def classify_csv(
    input_path: str,
    output_path: str = "output.csv",
    *,
    chunk_size: int = 50000,
) -> tuple[str, pd.DataFrame]:
    """Classify every row of a CSV file and write the annotated result.

    The rows are split into chunks that are classified in parallel worker
    processes via ``_process_chunk``; inside each chunk ``classify_logs``
    uses a thread pool for its LLM calls.

    Args:
        input_path: CSV file containing ``source`` and ``log_message`` columns.
        output_path: Destination for the annotated CSV.
        chunk_size: Number of rows handed to a worker process at a time.

    Returns:
        ``(output_path, df)`` where ``df`` is the input frame extended with
        ``predicted_label``, ``tier_used``, ``latency_ms`` and ``confidence``
        columns.

    Raises:
        ValueError: If a required column is missing, or ``chunk_size`` is
            smaller than 1.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

    df = pd.read_csv(input_path)
    required = {"source", "log_message"}
    if not required.issubset(df.columns):
        raise ValueError(f"Missing required columns in CSV. Expected: {required}. Found: {set(df.columns)}")

    log_pairs = list(zip(df["source"], df["log_message"]))
    total_logs = len(log_pairs)

    # os.cpu_count() returns None when the core count cannot be determined;
    # fall back to a single worker instead of failing on ``None - 1``.
    # One core is left free for the parent process.
    max_cores = max(1, (os.cpu_count() or 1) - 1)
    chunks = [log_pairs[i:i + chunk_size] for i in range(0, total_logs, chunk_size)]
    results = []

    print(f"🔥 Firing up {max_cores} Process Cores... (BERT gets raw CPU, LLM gets Threads)")
    t_start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=max_cores) as executor:
        # executor.map yields chunk results in submission order, so the
        # flattened list stays aligned with the DataFrame rows.
        for chunk_result in executor.map(_process_chunk, chunks):
            results.extend(chunk_result)
    t_end = time.perf_counter()
    print(f"⏱️ True Wall-Clock Processing Time: {(t_end - t_start):.2f} seconds")

    df["predicted_label"] = [r["label"] for r in results]
    df["tier_used"] = [r["tier"] for r in results]
    df["latency_ms"] = [r["latency_ms"] for r in results]
    # Results without a confidence value are rendered as "N/A".
    df["confidence"] = [
        f"{r['confidence']:.1%}" if r["confidence"] is not None else "N/A"
        for r in results
    ]
    df.to_csv(output_path, index=False)
    return output_path, df
# Aliases
classify = classify_logs

# ── Self-test ───────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Smoke test: two non-legacy sources and one LegacyCRM entry.
    demo_logs = [
        ("ModernCRM", "IP 192.168.133.114 blocked due to potential attack"),
        ("BillingSystem", "User User12345 logged in."),
        ("LegacyCRM", "Case escalation failed due to active timeout."),
    ]
    print("Running quick test...")
    demo_results = classify_logs(demo_logs)
    print("Done. No errors.")