Spaces:
Sleeping
Sleeping
File size: 8,896 Bytes
2222383 6ffecdd 2222383 6ffecdd 2222383 3f1a1d6 2222383 f2c6de4 6ffecdd 2222383 b90444b 2222383 f2c6de4 2222383 6ffecdd a65e2b6 b90444b a65e2b6 f2c6de4 2222383 de30f06 2222383 de30f06 2222383 de30f06 2222383 6ffecdd 2222383 a65e2b6 217890c f2c6de4 217890c b90444b 217890c f2c6de4 217890c b90444b 217890c 2222383 f2c6de4 3f1a1d6 2222383 6ffecdd a65e2b6 2222383 3f1a1d6 6ffecdd 2222383 de30f06 2222383 3f1a1d6 6ffecdd 3f1a1d6 6ffecdd b90444b 6ffecdd b90444b 2222383 f2c6de4 2222383 a65e2b6 2222383 a65e2b6 2222383 a65e2b6 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 | """
classify.py — 3-Tier Hybrid Pipeline (V7 — Maximum Speed & Sharded Caching)
Architecture:
LegacyCRM → LLM directly
Others → Regex → BERT (batch) → LLM fallback
Changes in V7:
- Unfroze the Gradio UI and restored processing speed: brought back ProcessPoolExecutor for the outer CSV chunks so they run across all but one CPU core.
- LLM concurrency: ThreadPoolExecutor is retained inside classify_logs specifically for LLM I/O calls.
- Cache Architecture: Using a "Sharded Cache" approach. Each CPU worker process gets its own 500k @lru_cache, which is perfectly safe for 18GB RAM and avoids GIL locks entirely.
"""
from __future__ import annotations
import os
import time
import statistics
import pandas as pd
from functools import lru_cache
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from processor_regex import classify_with_regex
from processor_bert import classify_batch as bert_batch
from processor_llm import classify_with_llm
# ── Config ────────────────────────────────────────────────────────────────────
# Source name whose logs bypass the Regex and BERT tiers and are sent straight
# to the LLM. Read from the LEGACY_SOURCE environment variable, defaulting to
# "LegacyCRM" when the variable is not set.
LEGACY_SOURCE = os.getenv("LEGACY_SOURCE", "LegacyCRM")
# ββ Result type βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
return {
"label": label,
"tier": tier,
"confidence": confidence,
"latency_ms": round(latency_ms, 4),
}
# ── Caching layer (one cache per process) ─────────────────────────────────────
@lru_cache(maxsize=500_000)
def cached_llm_call(log_msg: str) -> str:
    """Return the LLM label for ``log_msg``, memoised on the exact string.

    ``classify_with_llm`` is only invoked when the message is not already in
    the cache. The cache is module-level state, so every process that imports
    this module holds its own independent copy of up to 500,000 entries.
    """
    llm_label = classify_with_llm(log_msg)
    return llm_label
# ── Single log (backward-compatible) ──────────────────────────────────────────
def classify_log(source: str, log_msg: str) -> dict:
    """Classify a single log message.

    Wraps the pair in a one-element batch, runs it through ``classify_logs``
    and returns the only result record.
    """
    single_batch = [(source, log_msg)]
    (first, *_rest) = classify_logs(single_batch)
    return first
# ── Batch pipeline (main entry point) ─────────────────────────────────────────
def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
    """Classify a batch of ``(source, log_message)`` pairs.

    Routing, in order:
      1. Logs whose source equals ``LEGACY_SOURCE`` go straight to the LLM.
      2. Every other log is tried against ``classify_with_regex``; a truthy
         label yields a "Regex" result with confidence 1.0.
      3. Logs the regex tier did not label are sent to ``bert_batch`` in a
         single call. Any label other than "Unclassified" yields a "BERT"
         result whose latency is the batch time divided by the batch size.
      4. Whatever is left is sent to ``cached_llm_call`` from a thread pool.

    Args:
        logs: Sequence of ``(source, log_message)`` pairs.

    Returns:
        One result record per input pair, in the same order as ``logs``.
    """
    outcomes: list = [None] * len(logs)

    # ── Step 1: route each log, resolving regex matches immediately ──────────
    needs_llm = []
    needs_bert = []
    for pos, (source, message) in enumerate(logs):
        if source == LEGACY_SOURCE:
            needs_llm.append(pos)
            continue

        started = time.perf_counter()
        regex_label = classify_with_regex(message)
        if not regex_label:
            needs_bert.append(pos)
            continue

        elapsed_ms = (time.perf_counter() - started) * 1000
        outcomes[pos] = _make_result(regex_label, "Regex", 1.0, elapsed_ms)

    # ── Step 2: one BERT call for everything the regex tier missed ───────────
    if needs_bert:
        messages = [logs[pos][1] for pos in needs_bert]
        started = time.perf_counter()
        predictions = bert_batch(messages)
        batch_ms = (time.perf_counter() - started) * 1000
        # The batch is timed as a whole, so each log is charged an equal share.
        per_log_ms = batch_ms / len(messages)

        for pos, (bert_label, confidence) in zip(needs_bert, predictions):
            if bert_label == "Unclassified":
                needs_llm.append(pos)
            else:
                outcomes[pos] = _make_result(bert_label, "BERT", confidence, per_log_ms)

    # ── Step 3: LLM calls, fanned out over threads ───────────────────────────
    if needs_llm:

        def _ask_llm(pos):
            source, message = logs[pos]
            started = time.perf_counter()
            llm_label = cached_llm_call(message)
            elapsed_ms = (time.perf_counter() - started) * 1000

            tier_name = "LLM" if source == LEGACY_SOURCE else "LLM (fallback)"
            # Cache hits are inferred from timing: a call that returned in
            # under 5 ms is reported as a hit, anything slower as an API call.
            suffix = "(Cache Hit)" if elapsed_ms < 5 else "(API Call)"
            return pos, _make_result(llm_label, f"{tier_name} {suffix}", None, elapsed_ms)

        with ThreadPoolExecutor() as pool:
            answered = list(pool.map(_ask_llm, needs_llm))

        for pos, record in answered:
            outcomes[pos] = record

    return outcomes
# ── Pipeline summary ──────────────────────────────────────────────────────────
def pipeline_summary(results: list[dict]) -> dict:
    """Aggregate result records into per-tier latency stats and label counts.

    Args:
        results: Result records carrying at least the keys ``tier``,
            ``latency_ms`` and ``label``.

    Returns:
        A dict with:
          * ``total``: number of records,
          * ``tier_stats``: for each tier, its ``count``, share of the total
            (``pct``), ``p50_ms``/``p95_ms``/``p99_ms``, ``mean_ms`` and
            ``total_ms``, all latencies rounded to four decimals,
          * ``label_counts``: occurrences of each label.
    """
    total = len(results)

    latencies_by_tier: dict[str, list[float]] = {}
    label_counts: dict[str, int] = {}
    for record in results:
        latencies_by_tier.setdefault(record["tier"], []).append(record["latency_ms"])
        label = record["label"]
        label_counts[label] = label_counts.get(label, 0) + 1

    def _rank_value(ordered, fraction):
        # Index is floor(len * fraction), clamped to the last element.
        size = len(ordered)
        return ordered[min(int(size * fraction), size - 1)]

    tier_stats = {}
    for tier, latencies in latencies_by_tier.items():
        ordered = sorted(latencies)
        count = len(ordered)
        share = count / total * 100
        tier_stats[tier] = {
            "count": count,
            "pct": round(share, 1),
            "p50_ms": round(statistics.median(ordered), 4),
            "p95_ms": round(_rank_value(ordered, 0.95), 4),
            "p99_ms": round(_rank_value(ordered, 0.99), 4),
            "mean_ms": round(statistics.mean(ordered), 4),
            "total_ms": round(sum(ordered), 4),
        }

    return {
        "total": total,
        "tier_stats": tier_stats,
        "label_counts": label_counts,
    }
# ── Multiprocessing helper ────────────────────────────────────────────────────
def _process_chunk(chunk: list[tuple[str, str]]) -> list[dict]:
    """Classify one chunk of ``(source, log_message)`` pairs.

    Defined at module level so it can be handed to
    ``ProcessPoolExecutor.map``; it simply delegates to ``classify_logs``.
    """
    chunk_results = classify_logs(chunk)
    return chunk_results
# ── CSV batch classify (Hybrid Processing) ────────────────────────────────────
def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str, pd.DataFrame]:
    """Classify every row of a CSV file and write an annotated copy.

    The rows are split into chunks of 50,000 and each chunk is classified in
    a worker process via ``_process_chunk``; chunk results are collected in
    input order. Four columns are then added to the frame:
    ``predicted_label``, ``tier_used``, ``latency_ms`` and ``confidence``
    (formatted as a percentage, or "N/A" when the tier reported none).

    Args:
        input_path: CSV file containing at least the columns ``source`` and
            ``log_message``.
        output_path: Where the annotated CSV is written.

    Returns:
        ``(output_path, df)`` where ``df`` is the annotated DataFrame.

    Raises:
        ValueError: If a required column is missing from the CSV.
    """
    df = pd.read_csv(input_path)
    required = {"source", "log_message"}
    if not required.issubset(df.columns):
        raise ValueError(f"Missing required columns in CSV. Expected: {required}. Found: {set(df.columns)}")

    # NOTE(review): pandas reads empty cells as NaN (float); the classifiers
    # presumably expect str messages — confirm how blank rows should behave.
    log_pairs = list(zip(df["source"], df["log_message"]))
    total_logs = len(log_pairs)

    # Leave one core free for the caller. os.cpu_count() returns None when the
    # core count cannot be determined, so fall back to a single worker instead
    # of failing on `None - 1`.
    max_cores = max(1, (os.cpu_count() or 1) - 1)
    chunk_size = 50000
    chunks = [log_pairs[i:i + chunk_size] for i in range(0, total_logs, chunk_size)]
    results = []

    print(f"π₯ Firing up {max_cores} Process Cores... (BERT gets raw CPU, LLM gets Threads)")
    t_start = time.perf_counter()
    # executor.map yields chunk results in submission order, which keeps
    # `results` aligned with the DataFrame rows.
    with ProcessPoolExecutor(max_workers=max_cores) as executor:
        for chunk_result in executor.map(_process_chunk, chunks):
            results.extend(chunk_result)
    t_end = time.perf_counter()
    print(f"β±οΈ True Wall-Clock Processing Time: {(t_end - t_start):.2f} seconds")

    df["predicted_label"] = [r["label"] for r in results]
    df["tier_used"] = [r["tier"] for r in results]
    df["latency_ms"] = [r["latency_ms"] for r in results]
    df["confidence"] = [
        f"{r['confidence']:.1%}" if r["confidence"] is not None else "N/A"
        for r in results
    ]
    df.to_csv(output_path, index=False)
    return output_path, df
# Aliases
# `classify` is a second module-level name bound to `classify_logs`.
classify = classify_logs
# ── Self-test ─────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Smoke test: push three sample logs through the batch pipeline when the
    # module is run as a script. The results are not inspected; reaching the
    # final print means nothing raised.
    sources = ("ModernCRM", "BillingSystem", "LegacyCRM")
    messages = (
        "IP 192.168.133.114 blocked due to potential attack",
        "User User12345 logged in.",
        "Case escalation failed due to active timeout.",
    )
    sample = list(zip(sources, messages))

    print("Running quick test...")
    results = classify_logs(sample)
    print("Done. No errors.")