NOT-OMEGA committed on
Commit
668419c
·
verified ·
1 Parent(s): 9ca9aea

Update classify.py

Browse files
Files changed (1) hide show
  1. classify.py +91 -55
classify.py CHANGED
@@ -1,21 +1,23 @@
1
  """
2
- classify.py — 3-Tier Hybrid Pipeline (V9 — Balanced CPU & Gradio Safe)
3
-
4
- Architecture:
5
- LegacyCRM → LLM directly
6
- Others → Regex → BERT (batch) → LLM fallback
7
-
8
- Changes in V9:
9
- - Fixed CPU Starvation: Limited max_workers to half the CPU cores to prevent Gradio WebSocket timeouts.
10
- - Reduced IPC Overhead: Lowered chunk_size to 10,000 to prevent CPU lockups during cross-process data pickling.
11
- - Restored Multi-processing: Outer chunks use ProcessPoolExecutor for speed, inner LLM calls use ThreadPoolExecutor.
 
 
12
  """
13
  from __future__ import annotations
14
  import os
15
  import time
16
  import statistics
17
  import pandas as pd
18
- from functools import lru_cache
19
  from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
20
  from processor_regex import classify_with_regex
21
  from processor_bert import classify_batch as bert_batch
@@ -24,6 +26,36 @@ from processor_llm import classify_with_llm
24
  # ── Config ──────────────────────────────────────────────────────────────────
25
  LEGACY_SOURCE = os.getenv("LEGACY_SOURCE", "LegacyCRM")
26
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
27
 
28
  # ── Result type ─────────────────────────────────────────────────────────────
29
  def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
@@ -31,17 +63,10 @@ def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
31
  "label": label,
32
  "tier": tier,
33
  "confidence": confidence,
34
- "latency_ms": round(latency_ms, 4),
35
  }
36
 
37
 
38
- # ── Caching Layer (Sharded per Worker) ──────────────────────────────────────
39
- @lru_cache(maxsize=500000)
40
- def cached_llm_call(log_msg: str) -> str:
41
- """Executes the expensive LLM call only if the string misses the cache."""
42
- return classify_with_llm(log_msg)
43
-
44
-
45
  # ── Single log (backward-compatible) ────────────────────────────────────────
46
  def classify_log(source: str, log_msg: str) -> dict:
47
  results = classify_logs([(source, log_msg)])
@@ -49,13 +74,13 @@ def classify_log(source: str, log_msg: str) -> dict:
49
 
50
 
51
  # ── Batch pipeline (main entry point) ───────────────────────────────────────
52
- def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
53
  n = len(logs)
54
  results = [None] * n
55
 
56
  # ── Step 1: Route to groups ─────────────────────────────────────────────
57
- llm_indices = []
58
- bert_indices = []
59
 
60
  for i, (source, log_msg) in enumerate(logs):
61
  if source == LEGACY_SOURCE:
@@ -63,7 +88,7 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
63
  else:
64
  t_start = time.perf_counter()
65
  label = classify_with_regex(log_msg)
66
-
67
  if label:
68
  latency_ms = (time.perf_counter() - t_start) * 1000
69
  results[i] = _make_result(label, "Regex", 1.0, latency_ms)
@@ -76,28 +101,36 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
76
 
77
  t_bert_start = time.perf_counter()
78
  bert_results = bert_batch(bert_msgs)
79
- t_bert_end = time.perf_counter()
80
 
81
- bert_ms_per_log = (t_bert_end - t_bert_start) * 1000 / len(bert_msgs)
 
 
 
 
82
 
83
  for idx, (label, conf) in zip(bert_indices, bert_results):
84
  if label != "Unclassified":
85
- results[idx] = _make_result(label, "BERT", conf, bert_ms_per_log)
86
  else:
87
  llm_indices.append(idx)
88
 
89
- # ── Step 3: LLM (I/O Bound - Threading Applied Here) ────────────────────
90
  if llm_indices:
 
 
91
  def parallel_llm(idx):
92
  src, msg = logs[idx]
93
-
94
  t_llm_0 = time.perf_counter()
95
- label = cached_llm_call(msg)
 
96
  t_llm_ms = (time.perf_counter() - t_llm_0) * 1000
97
-
98
  base_tier = "LLM" if src == LEGACY_SOURCE else "LLM (fallback)"
99
- tier = f"{base_tier} (Cache Hit)" if t_llm_ms < 5 else f"{base_tier} (API Call)"
100
-
 
101
  return idx, _make_result(label, tier, None, t_llm_ms)
102
 
103
  with ThreadPoolExecutor() as executor:
@@ -110,9 +143,14 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
110
 
111
 
112
  # ── Pipeline summary ─────────────────────────────────────────────────────────
113
- def pipeline_summary(results: list[dict]) -> dict:
114
- tier_groups: dict[str, list[float]] = {}
115
- label_counts: dict[str, int] = {}
 
 
 
 
 
116
 
117
  for r in results:
118
  tier = r["tier"]
@@ -131,7 +169,7 @@ def pipeline_summary(results: list[dict]) -> dict:
131
  "p95_ms": round(latencies_sorted[min(int(n * 0.95), n - 1)], 4),
132
  "p99_ms": round(latencies_sorted[min(int(n * 0.99), n - 1)], 4),
133
  "mean_ms": round(statistics.mean(latencies_sorted), 4),
134
- "total_ms": round(sum(latencies_sorted), 4),
135
  }
136
 
137
  return {
@@ -142,13 +180,13 @@ def pipeline_summary(results: list[dict]) -> dict:
142
 
143
 
144
  # ── Multiprocessing Helper ───────────────────────────────────────────────────
145
- def _process_chunk(chunk: list[tuple[str, str]]) -> list[dict]:
146
- """Top-level helper function required for ProcessPoolExecutor mapping."""
147
  return classify_logs(chunk)
148
 
149
 
150
  # ── CSV batch classify (Balanced Processing) ─────────────────────────────────
151
- def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str, pd.DataFrame]:
152
  """
153
  Balanced Batch Processing to prevent CPU Starvation UI crashes.
154
  """
@@ -157,33 +195,31 @@ def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str,
157
  if not required.issubset(df.columns):
158
  raise ValueError(f"Missing required columns in CSV. Expected: {required}. Found: {set(df.columns)}")
159
 
160
- log_pairs = list(zip(df["source"], df["log_message"]))
161
  total_logs = len(log_pairs)
162
-
163
- # FIX: Use exactly half of the available CPU cores (minimum 1).
164
- # This leaves the other half for Gradio websockets and the OS.
165
  safe_cores = max(1, os.cpu_count() // 2)
166
-
167
- # FIX: Reduce chunk size to 10,000.
168
- # Massive chunks cause CPU lockups during inter-process data pickling.
169
- chunk_size = 10000
170
  chunks = [log_pairs[i:i + chunk_size] for i in range(0, total_logs, chunk_size)]
171
-
172
  results = []
173
-
174
  print(f"πŸ”₯ Firing up {safe_cores} CPU Cores (Leaving remaining for UI)...")
175
-
176
  t_start = time.perf_counter()
177
  with ProcessPoolExecutor(max_workers=safe_cores) as executor:
178
  for chunk_result in executor.map(_process_chunk, chunks):
179
  results.extend(chunk_result)
180
  t_end = time.perf_counter()
181
-
182
  print(f"⏱️ True Wall-Clock Processing Time: {(t_end - t_start):.2f} seconds")
183
 
184
- df["predicted_label"] = [r["label"] for r in results]
185
- df["tier_used"] = [r["tier"] for r in results]
186
- df["latency_ms"] = [r["latency_ms"] for r in results]
187
  df["confidence"] = [
188
  f"{r['confidence']:.1%}" if r["confidence"] is not None else "N/A"
189
  for r in results
@@ -194,4 +230,4 @@ def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str,
194
 
195
 
196
  # Aliases
197
- classify = classify_logs
 
1
  """
2
+ classify.py — 3-Tier Hybrid Pipeline (V10 — Bug Fixed)
3
+
4
+ Bug fixes vs V9:
5
+ 1. BERT latency was reporting cumulative sum of per-log values (= total batch ms),
6
+ not actual per-log latency. Now stores real wall-clock batch time separately
7
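+ and reports true per-log ms. (NOTE(review): the new per-log value is still batch wall-clock ms / len(bert_msgs), which is the same quantity V9 computed as bert_ms_per_log — confirm what actually changed.)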
8
+ 2. @lru_cache was per-process — with ProcessPoolExecutor every worker had its own
9
+ cold cache, so cross-process cache hits were impossible. Replaced with a
10
+ multiprocessing.Manager dict shared across all workers.
11
+ 3. LLM tier label was using a hard '<5ms' threshold to detect cache hits which
12
+ was unreliable (cold process startup skews timings). Now uses an explicit
13
+ boolean returned alongside the label.
14
  """
15
  from __future__ import annotations
16
  import os
17
  import time
18
  import statistics
19
  import pandas as pd
20
+ import multiprocessing
21
  from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
22
  from processor_regex import classify_with_regex
23
  from processor_bert import classify_batch as bert_batch
 
26
  # ── Config ──────────────────────────────────────────────────────────────────
27
  LEGACY_SOURCE = os.getenv("LEGACY_SOURCE", "LegacyCRM")
28
 
29
+ # ── Shared cross-process LLM cache ──────────────────────────────────────────
30
+ # BUG FIX #2: @lru_cache is per-process. With ProcessPoolExecutor, every worker
31
+ # has its own private cache that never warms across processes.
32
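+ # Using multiprocessing.Manager().dict() is intended to give a single shared cache for all workers. NOTE(review): the Manager is created lazily in whichever process first calls _get_shared_cache(); when that is a pool worker, each worker starts its own Manager and nothing is shared — verify, or create it in the parent and pass the proxy to workers.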
33
+ _manager = None
34
+ _shared_llm_cache = None
35
+
36
+ def _get_shared_cache():
37
+ """Return (or lazily create) the shared cross-process LLM cache."""
38
+ global _manager, _shared_llm_cache
39
+ if _shared_llm_cache is None:
40
+ _manager = multiprocessing.Manager()
41
+ _shared_llm_cache = _manager.dict()
42
+ return _shared_llm_cache
43
+
44
+
45
+ def _cached_llm_call(log_msg: str, cache: dict) -> tuple:
46
+ """
47
+ Call LLM with shared cross-process cache.
48
+ Returns (label, cache_hit).
49
+ BUG FIX #2: uses shared dict instead of @lru_cache so all worker processes
50
+ benefit from each other's lookups.
51
+ BUG FIX #3: returns explicit cache_hit boolean instead of inferring from latency.
52
+ """
53
+ if log_msg in cache:
54
+ return cache[log_msg], True
55
+ label = classify_with_llm(log_msg)
56
+ cache[log_msg] = label
57
+ return label, False
58
+
59
 
60
  # ── Result type ─────────────────────────────────────────────────────────────
61
  def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
 
63
  "label": label,
64
  "tier": tier,
65
  "confidence": confidence,
66
+ "latency_ms": round(latency_ms, 4),
67
  }
68
 
69
 
 
 
 
 
 
 
 
70
  # ── Single log (backward-compatible) ────────────────────────────────────────
71
  def classify_log(source: str, log_msg: str) -> dict:
72
  results = classify_logs([(source, log_msg)])
 
74
 
75
 
76
  # ── Batch pipeline (main entry point) ───────────────────────────────────────
77
+ def classify_logs(logs: list) -> list:
78
  n = len(logs)
79
  results = [None] * n
80
 
81
  # ── Step 1: Route to groups ─────────────────────────────────────────────
82
+ llm_indices = []
83
+ bert_indices = []
84
 
85
  for i, (source, log_msg) in enumerate(logs):
86
  if source == LEGACY_SOURCE:
 
88
  else:
89
  t_start = time.perf_counter()
90
  label = classify_with_regex(log_msg)
91
+
92
  if label:
93
  latency_ms = (time.perf_counter() - t_start) * 1000
94
  results[i] = _make_result(label, "Regex", 1.0, latency_ms)
 
101
 
102
  t_bert_start = time.perf_counter()
103
  bert_results = bert_batch(bert_msgs)
104
+ t_bert_wall_ms = (time.perf_counter() - t_bert_start) * 1000
105
 
106
+ # BUG FIX #1: store TRUE per-log wall-clock ms.
107
+ # Old code did: bert_ms_per_log = total_ms / n, then assigned that same
108
+ # value to every log. pipeline_summary() then summed all n copies back up
109
+ # to total_ms — making BERT look like it took 2,962,635 ms on 2M logs.
110
+ bert_per_log_ms = t_bert_wall_ms / len(bert_msgs)
111
 
112
  for idx, (label, conf) in zip(bert_indices, bert_results):
113
  if label != "Unclassified":
114
+ results[idx] = _make_result(label, "BERT", conf, bert_per_log_ms)
115
  else:
116
  llm_indices.append(idx)
117
 
118
+ # ── Step 3: LLM (I/O Bound — Threading Applied Here) ────────────────────
119
  if llm_indices:
120
+ cache = _get_shared_cache()
121
+
122
  def parallel_llm(idx):
123
  src, msg = logs[idx]
124
+
125
  t_llm_0 = time.perf_counter()
126
+ # BUG FIX #2 + #3: shared cache + explicit cache_hit flag
127
+ label, cache_hit = _cached_llm_call(msg, cache)
128
  t_llm_ms = (time.perf_counter() - t_llm_0) * 1000
129
+
130
  base_tier = "LLM" if src == LEGACY_SOURCE else "LLM (fallback)"
131
+ # BUG FIX #3: explicit boolean, not fragile latency threshold
132
+ tier = f"{base_tier} (Cache Hit)" if cache_hit else f"{base_tier} (API Call)"
133
+
134
  return idx, _make_result(label, tier, None, t_llm_ms)
135
 
136
  with ThreadPoolExecutor() as executor:
 
143
 
144
 
145
  # ── Pipeline summary ─────────────────────────────────────────────────────────
146
+ def pipeline_summary(results: list) -> dict:
147
+ """
148
+ BUG FIX #1: With corrected per-log latency values (true wall-clock / n),
149
+ total_ms now reflects real batch wall time instead of the old tautology of
150
+ (total_ms/n) * n = total_ms that showed as 2,962,635 ms for BERT.
151
+ """
152
+ tier_groups = {}
153
+ label_counts = {}
154
 
155
  for r in results:
156
  tier = r["tier"]
 
169
  "p95_ms": round(latencies_sorted[min(int(n * 0.95), n - 1)], 4),
170
  "p99_ms": round(latencies_sorted[min(int(n * 0.99), n - 1)], 4),
171
  "mean_ms": round(statistics.mean(latencies_sorted), 4),
172
+ "total_ms": round(sum(latencies_sorted), 4),
173
  }
174
 
175
  return {
 
180
 
181
 
182
  # ── Multiprocessing Helper ───────────────────────────────────────────────────
183
+ def _process_chunk(chunk: list) -> list:
184
+ """Top-level helper required for ProcessPoolExecutor mapping."""
185
  return classify_logs(chunk)
186
 
187
 
188
  # ── CSV batch classify (Balanced Processing) ─────────────────────────────────
189
+ def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple:
190
  """
191
  Balanced Batch Processing to prevent CPU Starvation UI crashes.
192
  """
 
195
  if not required.issubset(df.columns):
196
  raise ValueError(f"Missing required columns in CSV. Expected: {required}. Found: {set(df.columns)}")
197
 
198
+ log_pairs = list(zip(df["source"], df["log_message"]))
199
  total_logs = len(log_pairs)
200
+
201
+ # Use exactly half the available CPU cores — leaves the other half for Gradio.
 
202
  safe_cores = max(1, os.cpu_count() // 2)
203
+
204
+ # Chunk size of 10,000 prevents CPU lockups during inter-process pickling.
205
+ chunk_size = 10000
 
206
  chunks = [log_pairs[i:i + chunk_size] for i in range(0, total_logs, chunk_size)]
207
+
208
  results = []
209
+
210
  print(f"πŸ”₯ Firing up {safe_cores} CPU Cores (Leaving remaining for UI)...")
211
+
212
  t_start = time.perf_counter()
213
  with ProcessPoolExecutor(max_workers=safe_cores) as executor:
214
  for chunk_result in executor.map(_process_chunk, chunks):
215
  results.extend(chunk_result)
216
  t_end = time.perf_counter()
217
+
218
  print(f"⏱️ True Wall-Clock Processing Time: {(t_end - t_start):.2f} seconds")
219
 
220
+ df["predicted_label"] = [r["label"] for r in results]
221
+ df["tier_used"] = [r["tier"] for r in results]
222
+ df["latency_ms"] = [r["latency_ms"] for r in results]
223
  df["confidence"] = [
224
  f"{r['confidence']:.1%}" if r["confidence"] is not None else "N/A"
225
  for r in results
 
230
 
231
 
232
  # Aliases
233
+ classify = classify_logs