"""
classify.py — 3-Tier Hybrid Pipeline (V9 — Balanced CPU & Gradio Safe)
Architecture:
LegacyCRM → LLM directly
Others → Regex → BERT (batch) → LLM fallback
Changes in V9:
- Fixed CPU Starvation: Limited max_workers to half the CPU cores to prevent Gradio WebSocket timeouts.
- Reduced IPC Overhead: Lowered chunk_size to 10,000 to prevent CPU lockups during cross-process data pickling.
- Restored Multi-processing: Outer chunks use ProcessPoolExecutor for speed, inner LLM calls use ThreadPoolExecutor.
"""
from __future__ import annotations
import os
import time
import statistics
import pandas as pd
from functools import lru_cache
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from processor_regex import classify_with_regex
from processor_bert import classify_batch as bert_batch
from processor_llm import classify_with_llm
# ── Config ──────────────────────────────────────────────────────────────────
# Source name whose logs skip the regex and BERT tiers and go straight to the
# LLM; can be overridden through the LEGACY_SOURCE environment variable.
LEGACY_SOURCE = os.environ.get("LEGACY_SOURCE", "LegacyCRM")
# ── Result type ─────────────────────────────────────────────────────────────
def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
return {
"label": label,
"tier": tier,
"confidence": confidence,
"latency_ms": round(latency_ms, 4),
}
# ── Caching Layer (Sharded per Worker) ──────────────────────────────────────
@lru_cache(maxsize=500_000)
def cached_llm_call(log_msg: str) -> str:
    """Return the LLM classification for *log_msg*, memoised on the message text.

    ``classify_with_llm`` is only invoked when this exact string is not
    already among the 500,000 most recently used entries. The cache lives in
    process memory, so every worker process keeps its own independent copy.
    """
    llm_label = classify_with_llm(log_msg)
    return llm_label
# ── Single log (backward-compatible) ────────────────────────────────────────
def classify_log(source: str, log_msg: str) -> dict:
    """Classify a single log line by running it through the batch pipeline.

    Args:
        source: Name of the system that emitted the log.
        log_msg: Raw log message text.

    Returns:
        The result dict produced by ``classify_logs`` for this one entry.
    """
    single_entry_batch = [(source, log_msg)]
    return classify_logs(single_entry_batch)[0]
# ── Batch pipeline (main entry point) ───────────────────────────────────────
def _run_regex_tier(logs: list[tuple[str, str]], results: list) -> tuple[list[int], list[int]]:
    """Step 1: route each log; store regex hits in *results*.

    Returns ``(llm_indices, bert_indices)``: positions that must go straight
    to the LLM (legacy source) and positions the regex could not label.
    """
    llm_indices: list[int] = []
    bert_indices: list[int] = []
    for i, (source, log_msg) in enumerate(logs):
        if source == LEGACY_SOURCE:
            llm_indices.append(i)
            continue
        t_start = time.perf_counter()
        label = classify_with_regex(log_msg)
        if label:
            latency_ms = (time.perf_counter() - t_start) * 1000
            results[i] = _make_result(label, "Regex", 1.0, latency_ms)
        else:
            bert_indices.append(i)
    return llm_indices, bert_indices


def _run_bert_tier(logs: list[tuple[str, str]], bert_indices: list[int], results: list) -> list[int]:
    """Step 2: classify the regex misses in one BERT batch.

    Labelled logs are stored in *results*; the indices BERT reported as
    "Unclassified" are returned so the caller can send them to the LLM.
    """
    bert_msgs = [logs[i][1] for i in bert_indices]
    t_bert_start = time.perf_counter()
    bert_results = list(bert_batch(bert_msgs))
    elapsed_ms = (time.perf_counter() - t_bert_start) * 1000
    # zip() below would silently drop unmatched logs and leave None entries in
    # the output; fail here, where the cause is still visible.
    if len(bert_results) != len(bert_msgs):
        raise ValueError(
            f"BERT returned {len(bert_results)} predictions for {len(bert_msgs)} messages"
        )
    # The batch is timed as a whole, so each log is charged the average cost.
    bert_ms_per_log = elapsed_ms / len(bert_msgs)
    unresolved: list[int] = []
    for idx, (label, conf) in zip(bert_indices, bert_results):
        if label != "Unclassified":
            results[idx] = _make_result(label, "BERT", conf, bert_ms_per_log)
        else:
            unresolved.append(idx)
    return unresolved


def _run_llm_tier(logs: list[tuple[str, str]], llm_indices: list[int], results: list) -> None:
    """Step 3: resolve the remaining logs through the cached LLM call, using threads."""

    def classify_one(idx: int):
        src, msg = logs[idx]
        t_llm_0 = time.perf_counter()
        label = cached_llm_call(msg)
        t_llm_ms = (time.perf_counter() - t_llm_0) * 1000
        base_tier = "LLM" if src == LEGACY_SOURCE else "LLM (fallback)"
        # Heuristic only: a call that returns in under 5 ms is reported as a
        # cache hit; the cache itself is not inspected.
        tier = f"{base_tier} (Cache Hit)" if t_llm_ms < 5 else f"{base_tier} (API Call)"
        return idx, _make_result(label, tier, None, t_llm_ms)

    with ThreadPoolExecutor() as executor:
        for idx, res in executor.map(classify_one, llm_indices):
            results[idx] = res


def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
    """Classify a batch of ``(source, log_message)`` pairs.

    Logs whose source equals ``LEGACY_SOURCE`` go directly to the LLM. All
    other logs are tried against the regex classifier first, then a single
    BERT batch, and finally fall back to the LLM when BERT answers
    "Unclassified".

    Args:
        logs: Sequence of ``(source, log_message)`` pairs.

    Returns:
        One result dict per input pair, in input order (see ``_make_result``
        for the keys). An empty input yields an empty list.

    Raises:
        ValueError: If the BERT batch returns a different number of
            predictions than the number of messages it was given.
    """
    results: list = [None] * len(logs)

    llm_indices, bert_indices = _run_regex_tier(logs, results)
    if bert_indices:
        llm_indices.extend(_run_bert_tier(logs, bert_indices, results))
    if llm_indices:
        _run_llm_tier(logs, llm_indices, results)
    return results
# ── Pipeline summary ────────────────────────────────────────────────────────
def pipeline_summary(results: list[dict]) -> dict:
    """Aggregate classification results into per-tier latency stats and label counts.

    Args:
        results: Result dicts carrying at least the keys ``tier``,
            ``latency_ms`` and ``label``.

    Returns:
        A dict with ``total`` (number of results), ``tier_stats`` (per tier:
        count, share of total in percent, p50/p95/p99/mean/total latency in
        milliseconds, each rounded to four decimals) and ``label_counts``
        (occurrences per label).
    """
    total = len(results)
    latencies_by_tier: dict[str, list[float]] = {}
    label_counts: dict[str, int] = {}

    for record in results:
        latencies_by_tier.setdefault(record["tier"], []).append(record["latency_ms"])
        label = record["label"]
        if label in label_counts:
            label_counts[label] += 1
        else:
            label_counts[label] = 1

    def summarize(latencies: list[float]) -> dict:
        ordered = sorted(latencies)
        count = len(ordered)
        last = count - 1
        # Percentiles pick the element at floor(count * q), clamped to the end.
        p95_at = min(int(count * 0.95), last)
        p99_at = min(int(count * 0.99), last)
        return {
            "count": count,
            "pct": round(count / total * 100, 1),
            "p50_ms": round(statistics.median(ordered), 4),
            "p95_ms": round(ordered[p95_at], 4),
            "p99_ms": round(ordered[p99_at], 4),
            "mean_ms": round(statistics.mean(ordered), 4),
            "total_ms": round(sum(ordered), 4),
        }

    return {
        "total": total,
        "tier_stats": {
            tier: summarize(latencies)
            for tier, latencies in latencies_by_tier.items()
        },
        "label_counts": label_counts,
    }
# ── Multiprocessing Helper ──────────────────────────────────────────────────
def _process_chunk(chunk: list[tuple[str, str]]) -> list[dict]:
    """Classify one chunk of ``(source, log_message)`` pairs.

    Kept as a module-level function because it is the callable handed to
    ``ProcessPoolExecutor.map`` by ``classify_csv``.
    """
    chunk_results = classify_logs(chunk)
    return chunk_results
# ── CSV batch classify (Balanced Processing) ────────────────────────────────
def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str, pd.DataFrame]:
    """Classify every row of a CSV file and write the annotated copy to disk.

    The rows are split into fixed-size chunks that are classified in a
    process pool sized to half the machine's cores, so the remaining cores
    stay free for the UI and the OS.

    Args:
        input_path: CSV file containing at least the columns ``source`` and
            ``log_message``.
        output_path: Destination of the annotated CSV.

    Returns:
        ``(output_path, df)`` where ``df`` is the input frame extended with
        the columns ``predicted_label``, ``tier_used``, ``latency_ms`` and
        ``confidence`` (a percentage string, or "N/A" when no confidence was
        reported).

    Raises:
        ValueError: If a required column is missing from the CSV.
    """
    df = pd.read_csv(input_path)
    required = {"source", "log_message"}
    if not required.issubset(df.columns):
        raise ValueError(f"Missing required columns in CSV. Expected: {required}. Found: {set(df.columns)}")

    log_pairs = list(zip(df["source"], df["log_message"]))
    total_logs = len(log_pairs)

    # Use half of the available cores (minimum 1). os.cpu_count() returns None
    # when the count cannot be determined; treat that as a single core instead
    # of failing on `None // 2`.
    safe_cores = max(1, (os.cpu_count() or 1) // 2)

    # Moderate chunk size: very large chunks make the pickling between
    # processes expensive enough to stall the CPU.
    chunk_size = 10000
    chunks = [log_pairs[i:i + chunk_size] for i in range(0, total_logs, chunk_size)]

    results = []
    print(f"🔥 Firing up {safe_cores} CPU Cores (Leaving remaining for UI)...")
    t_start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=safe_cores) as executor:
        # executor.map yields chunk results in submission order, so the
        # flattened list lines up with the DataFrame rows.
        for chunk_result in executor.map(_process_chunk, chunks):
            results.extend(chunk_result)
    t_end = time.perf_counter()
    print(f"⏱️ True Wall-Clock Processing Time: {(t_end - t_start):.2f} seconds")

    df["predicted_label"] = [r["label"] for r in results]
    df["tier_used"] = [r["tier"] for r in results]
    df["latency_ms"] = [r["latency_ms"] for r in results]
    df["confidence"] = [
        f"{r['confidence']:.1%}" if r["confidence"] is not None else "N/A"
        for r in results
    ]
    df.to_csv(output_path, index=False)
    return output_path, df
# Aliases
# `classify` is kept as a second public name for the batch entry point.
classify = classify_logs