NOT-OMEGA commited on
Commit
ca8312a
Β·
verified Β·
1 Parent(s): 3812273

Update classify.py

Browse files
Files changed (1) hide show
  1. classify.py +20 -77
classify.py CHANGED
@@ -1,14 +1,14 @@
1
  """
2
- classify.py β€” 3-Tier Hybrid Pipeline (V9 β€” Balanced CPU & Gradio Safe)
3
 
4
  Architecture:
5
  LegacyCRM β†’ LLM directly
6
  Others β†’ Regex β†’ BERT (batch) β†’ LLM fallback
7
 
8
- Changes in V9:
9
- - Fixed CPU Starvation: Limited max_workers to half the CPU cores to prevent Gradio WebSocket timeouts.
10
- - Reduced IPC Overhead: Lowered chunk_size to 10,000 to prevent CPU lockups during cross-process data pickling.
11
- - Restored Multi-processing: Outer chunks use ProcessPoolExecutor for speed, inner LLM calls use ThreadPoolExecutor.
12
  """
13
  from __future__ import annotations
14
  import os
@@ -16,7 +16,7 @@ import time
16
  import statistics
17
  import pandas as pd
18
  from functools import lru_cache
19
- from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
20
  from processor_regex import classify_with_regex
21
  from processor_bert import classify_batch as bert_batch
22
  from processor_llm import classify_with_llm
@@ -24,6 +24,8 @@ from processor_llm import classify_with_llm
24
  # ── Config ──────────────────────────────────────────────────────────────────
25
  LEGACY_SOURCE = os.getenv("LEGACY_SOURCE", "LegacyCRM")
26
 
 
 
27
 
28
  # ── Result type ─────────────────────────────────────────────────────────────
29
  def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
@@ -34,26 +36,22 @@ def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
34
  "latency_ms": round(latency_ms, 4),
35
  }
36
 
37
-
38
- # ── Caching Layer (Sharded per Worker) ──────────────────────────────────────
39
  @lru_cache(maxsize=500000)
40
  def cached_llm_call(log_msg: str) -> str:
41
  """Executes the expensive LLM call only if the string misses the cache."""
42
  return classify_with_llm(log_msg)
43
 
44
-
45
  # ── Single log (backward-compatible) ────────────────────────────────────────
46
  def classify_log(source: str, log_msg: str) -> dict:
47
  results = classify_logs([(source, log_msg)])
48
  return results[0]
49
 
50
-
51
  # ── Batch pipeline (main entry point) ───────────────────────────────────────
52
  def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
53
  n = len(logs)
54
  results = [None] * n
55
 
56
- # ── Step 1: Route to groups ─────────────────────────────────────────────
57
  llm_indices = []
58
  bert_indices = []
59
 
@@ -70,7 +68,7 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
70
  else:
71
  bert_indices.append(i)
72
 
73
- # ── Step 2: BERT batch (CPU Bound) ──────────────────────────────────────
74
  if bert_indices:
75
  bert_msgs = [logs[i][1] for i in bert_indices]
76
 
@@ -86,7 +84,7 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
86
  else:
87
  llm_indices.append(idx)
88
 
89
- # ── Step 3: LLM (I/O Bound - Threading Applied Here) ────────────────────
90
  if llm_indices:
91
  def parallel_llm(idx):
92
  src, msg = logs[idx]
@@ -100,58 +98,16 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
100
 
101
  return idx, _make_result(label, tier, None, t_llm_ms)
102
 
103
- with ThreadPoolExecutor() as executor:
104
- llm_results = list(executor.map(parallel_llm, llm_indices))
105
-
106
- for idx, res in llm_results:
107
  results[idx] = res
108
 
109
  return results
110
 
111
-
112
- # ── Pipeline summary ─────────────────────────────────────────────────────────
113
- def pipeline_summary(results: list[dict]) -> dict:
114
- tier_groups: dict[str, list[float]] = {}
115
- label_counts: dict[str, int] = {}
116
-
117
- for r in results:
118
- tier = r["tier"]
119
- tier_groups.setdefault(tier, []).append(r["latency_ms"])
120
- label_counts[r["label"]] = label_counts.get(r["label"], 0) + 1
121
-
122
- total = len(results)
123
- tier_stats = {}
124
- for tier, latencies in tier_groups.items():
125
- latencies_sorted = sorted(latencies)
126
- n = len(latencies_sorted)
127
- tier_stats[tier] = {
128
- "count": n,
129
- "pct": round(n / total * 100, 1),
130
- "p50_ms": round(statistics.median(latencies_sorted), 4),
131
- "p95_ms": round(latencies_sorted[min(int(n * 0.95), n - 1)], 4),
132
- "p99_ms": round(latencies_sorted[min(int(n * 0.99), n - 1)], 4),
133
- "mean_ms": round(statistics.mean(latencies_sorted), 4),
134
- "total_ms": round(sum(latencies_sorted), 4),
135
- }
136
-
137
- return {
138
- "total": total,
139
- "tier_stats": tier_stats,
140
- "label_counts": label_counts,
141
- }
142
-
143
-
144
- # ── Multiprocessing Helper ───────────────────────────────────────────────────
145
- def _process_chunk(chunk: list[tuple[str, str]]) -> list[dict]:
146
- """Top-level helper function required for ProcessPoolExecutor mapping."""
147
- return classify_logs(chunk)
148
-
149
-
150
- # ── CSV batch classify (Balanced Processing) ─────────────────────────────────
151
  def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str, pd.DataFrame]:
152
- """
153
- Balanced Batch Processing to prevent CPU Starvation UI crashes.
154
- """
155
  df = pd.read_csv(input_path)
156
  required = {"source", "log_message"}
157
  if not required.issubset(df.columns):
@@ -160,25 +116,14 @@ def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str,
160
  log_pairs = list(zip(df["source"], df["log_message"]))
161
  total_logs = len(log_pairs)
162
 
163
- # FIX: Use exactly half of the available CPU cores (minimum 1).
164
- # This leaves the other half for Gradio websockets and the OS.
165
- safe_cores = max(1, os.cpu_count() // 2)
166
-
167
- # FIX: Reduce chunk size to 10,000.
168
- # Massive chunks cause CPU lockups during inter-process data pickling.
169
- chunk_size = 10000
170
- chunks = [log_pairs[i:i + chunk_size] for i in range(0, total_logs, chunk_size)]
171
 
172
- results = []
173
 
174
- print(f"πŸ”₯ Firing up {safe_cores} CPU Cores (Leaving remaining for UI)...")
 
175
 
176
- t_start = time.perf_counter()
177
- with ProcessPoolExecutor(max_workers=safe_cores) as executor:
178
- for chunk_result in executor.map(_process_chunk, chunks):
179
- results.extend(chunk_result)
180
  t_end = time.perf_counter()
181
-
182
  print(f"⏱️ True Wall-Clock Processing Time: {(t_end - t_start):.2f} seconds")
183
 
184
  df["predicted_label"] = [r["label"] for r in results]
@@ -192,6 +137,4 @@ def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str,
192
  df.to_csv(output_path, index=False)
193
  return output_path, df
194
 
195
-
196
- # Aliases
197
  classify = classify_logs
 
1
  """
2
+ classify.py β€” 3-Tier Hybrid Pipeline (V10 β€” Thread-Safe & Shared Cache)
3
 
4
  Architecture:
5
  LegacyCRM β†’ LLM directly
6
  Others β†’ Regex β†’ BERT (batch) β†’ LLM fallback
7
 
8
+ Changes in V10:
9
+ - Removed buggy ProcessPoolExecutor (Fixes fork deadlocks & memory spikes).
10
+ - Global ThreadPoolExecutor for LLM (Fixes thread thrashing & context switching).
11
+ - LRU Cache is now genuinely shared across the entire run.
12
  """
13
  from __future__ import annotations
14
  import os
 
16
  import statistics
17
  import pandas as pd
18
  from functools import lru_cache
19
+ from concurrent.futures import ThreadPoolExecutor
20
  from processor_regex import classify_with_regex
21
  from processor_bert import classify_batch as bert_batch
22
  from processor_llm import classify_with_llm
 
24
  # ── Config ──────────────────────────────────────────────────────────────────
25
  LEGACY_SOURCE = os.getenv("LEGACY_SOURCE", "LegacyCRM")
26
 
27
+ # FIX: One global pool to prevent OS thread thrashing per chunk.
28
+ _llm_executor = ThreadPoolExecutor(max_workers=min(32, (os.cpu_count() or 1) * 4))
29
 
30
  # ── Result type ─────────────────────────────────────────────────────────────
31
  def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
 
36
  "latency_ms": round(latency_ms, 4),
37
  }
38
 
39
+ # ── Caching Layer (Now Global) ──────────────────────────────────────────────
 
40
  @lru_cache(maxsize=500000)
41
  def cached_llm_call(log_msg: str) -> str:
42
  """Executes the expensive LLM call only if the string misses the cache."""
43
  return classify_with_llm(log_msg)
44
 
 
45
  # ── Single log (backward-compatible) ────────────────────────────────────────
46
  def classify_log(source: str, log_msg: str) -> dict:
47
  results = classify_logs([(source, log_msg)])
48
  return results[0]
49
 
 
50
  # ── Batch pipeline (main entry point) ───────────────────────────────────────
51
  def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
52
  n = len(logs)
53
  results = [None] * n
54
 
 
55
  llm_indices = []
56
  bert_indices = []
57
 
 
68
  else:
69
  bert_indices.append(i)
70
 
71
+ # ── Step 2: BERT batch (ONNX handles its own multi-threading) ───────────
72
  if bert_indices:
73
  bert_msgs = [logs[i][1] for i in bert_indices]
74
 
 
84
  else:
85
  llm_indices.append(idx)
86
 
87
+ # ── Step 3: LLM (I/O Bound - Using Global Thread Pool) ──────────────────
88
  if llm_indices:
89
  def parallel_llm(idx):
90
  src, msg = logs[idx]
 
98
 
99
  return idx, _make_result(label, tier, None, t_llm_ms)
100
 
101
+ # Delegate entirely to the pre-warmed global thread pool
102
+ futures = [_llm_executor.submit(parallel_llm, idx) for idx in llm_indices]
103
+ for future in futures:
104
+ idx, res = future.result()
105
  results[idx] = res
106
 
107
  return results
108
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
109
  def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str, pd.DataFrame]:
110
+ """Single-process batch processing (relying on ONNX C++ threads + Python network threads)"""
 
 
111
  df = pd.read_csv(input_path)
112
  required = {"source", "log_message"}
113
  if not required.issubset(df.columns):
 
116
  log_pairs = list(zip(df["source"], df["log_message"]))
117
  total_logs = len(log_pairs)
118
 
119
+ print(f"πŸ”₯ Processing {total_logs} logs (Thread Pool active for LLMs)...")
 
 
 
 
 
 
 
120
 
121
+ t_start = time.perf_counter()
122
 
123
+ # Process everything in one go - let classify_logs handle the internal batching
124
+ results = classify_logs(log_pairs)
125
 
 
 
 
 
126
  t_end = time.perf_counter()
 
127
  print(f"⏱️ True Wall-Clock Processing Time: {(t_end - t_start):.2f} seconds")
128
 
129
  df["predicted_label"] = [r["label"] for r in results]
 
137
  df.to_csv(output_path, index=False)
138
  return output_path, df
139
 
 
 
140
  classify = classify_logs