NOT-OMEGA commited on
Commit
1a9b340
Β·
verified Β·
1 Parent(s): 7d3f899

Update classify.py

Browse files
Files changed (1) hide show
  1. classify.py +81 -27
classify.py CHANGED
@@ -1,12 +1,20 @@
1
  """
2
- classify.py β€” 3-Tier Hybrid Pipeline (V11 β€” MAX SPEED + SAFE MULTIPROCESSING + UI FIX)
 
 
 
 
 
 
 
 
 
3
  """
4
  from __future__ import annotations
5
  import os
6
  import time
7
  import statistics
8
  import pandas as pd
9
- import multiprocessing as mp
10
  from functools import lru_cache
11
  from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
12
  from processor_regex import classify_with_regex
@@ -16,6 +24,7 @@ from processor_llm import classify_with_llm
16
  # ── Config ──────────────────────────────────────────────────────────────────
17
  LEGACY_SOURCE = os.getenv("LEGACY_SOURCE", "LegacyCRM")
18
 
 
19
  # ── Result type ─────────────────────────────────────────────────────────────
20
  def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
21
  return {
@@ -25,26 +34,29 @@ def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
25
  "latency_ms": round(latency_ms, 4),
26
  }
27
 
28
- # ── Caching Layer ───────────────────────────────────────────────────────────
29
- @lru_cache(maxsize=10000) # Reduced maxsize per-worker to prevent OOM
 
30
  def cached_llm_call(log_msg: str) -> str:
 
31
  return classify_with_llm(log_msg)
32
 
33
- # ── Single log (backward-compatible for UI) ─────────────────────────────────
 
34
  def classify_log(source: str, log_msg: str) -> dict:
35
- """Used by Gradio real-time analyzer tab."""
36
  results = classify_logs([(source, log_msg)])
37
  return results[0]
38
 
 
 
39
  def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
40
- """Processes a chunk of logs."""
41
  n = len(logs)
42
  results = [None] * n
43
 
 
44
  llm_indices = []
45
  bert_indices = []
46
 
47
- # Step 1: Regex (Now running on multiple cores in parallel!)
48
  for i, (source, log_msg) in enumerate(logs):
49
  if source == LEGACY_SOURCE:
50
  llm_indices.append(i)
@@ -58,9 +70,10 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
58
  else:
59
  bert_indices.append(i)
60
 
61
- # Step 2: BERT
62
  if bert_indices:
63
  bert_msgs = [logs[i][1] for i in bert_indices]
 
64
  t_bert_start = time.perf_counter()
65
  bert_results = bert_batch(bert_msgs)
66
  t_bert_end = time.perf_counter()
@@ -73,10 +86,11 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
73
  else:
74
  llm_indices.append(idx)
75
 
76
- # Step 3: LLM (Threaded inside each process)
77
  if llm_indices:
78
  def parallel_llm(idx):
79
  src, msg = logs[idx]
 
80
  t_llm_0 = time.perf_counter()
81
  label = cached_llm_call(msg)
82
  t_llm_ms = (time.perf_counter() - t_llm_0) * 1000
@@ -86,45 +100,83 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
86
 
87
  return idx, _make_result(label, tier, None, t_llm_ms)
88
 
89
- # Inner ThreadPool for API network requests
90
- with ThreadPoolExecutor(max_workers=10) as executor:
91
- for idx, res in executor.map(parallel_llm, llm_indices):
92
- results[idx] = res
 
93
 
94
  return results
95
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
96
  def _process_chunk(chunk: list[tuple[str, str]]) -> list[dict]:
97
- """Helper function for mapping."""
98
  return classify_logs(chunk)
99
 
100
- # ── CSV batch classify (Safe Spawn Multiprocessing) ─────────────────────────
 
101
  def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str, pd.DataFrame]:
 
 
 
102
  df = pd.read_csv(input_path)
103
  required = {"source", "log_message"}
104
  if not required.issubset(df.columns):
105
- raise ValueError(f"Missing required columns in CSV.")
106
 
107
  log_pairs = list(zip(df["source"], df["log_message"]))
108
  total_logs = len(log_pairs)
109
 
110
- # Use max cores for speed, but leave 1 for the OS/Gradio UI
111
- safe_cores = max(1, (os.cpu_count() or 1) - 1)
112
- chunk_size = 5000 # Slightly smaller chunks so data copies faster between processes
 
 
 
 
113
  chunks = [log_pairs[i:i + chunk_size] for i in range(0, total_logs, chunk_size)]
114
 
115
  results = []
116
 
117
- print(f"πŸ”₯ Firing up {safe_cores} CPU Cores with SAFE SPAWN context...")
118
 
119
  t_start = time.perf_counter()
120
-
121
- # FIX: Correctly pass the spawn context to ProcessPoolExecutor
122
- ctx = mp.get_context('spawn')
123
-
124
- with ProcessPoolExecutor(max_workers=safe_cores, mp_context=ctx) as executor:
125
  for chunk_result in executor.map(_process_chunk, chunks):
126
  results.extend(chunk_result)
127
-
128
  t_end = time.perf_counter()
129
 
130
  print(f"⏱️ True Wall-Clock Processing Time: {(t_end - t_start):.2f} seconds")
@@ -140,4 +192,6 @@ def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str,
140
  df.to_csv(output_path, index=False)
141
  return output_path, df
142
 
 
 
143
  classify = classify_logs
 
1
  """
2
+ classify.py β€” 3-Tier Hybrid Pipeline (V9 β€” Balanced CPU & Gradio Safe)
3
+
4
+ Architecture:
5
+ LegacyCRM β†’ LLM directly
6
+ Others β†’ Regex β†’ BERT (batch) β†’ LLM fallback
7
+
8
+ Changes in V9:
9
+ - Fixed CPU Starvation: Limited max_workers to half the CPU cores to prevent Gradio WebSocket timeouts.
10
+ - Reduced IPC Overhead: Lowered chunk_size to 10,000 to prevent CPU lockups during cross-process data pickling.
11
+ - Restored Multi-processing: Outer chunks use ProcessPoolExecutor for speed, inner LLM calls use ThreadPoolExecutor.
12
  """
13
  from __future__ import annotations
14
  import os
15
  import time
16
  import statistics
17
  import pandas as pd
 
18
  from functools import lru_cache
19
  from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
20
  from processor_regex import classify_with_regex
 
24
  # ── Config ──────────────────────────────────────────────────────────────────
25
  LEGACY_SOURCE = os.getenv("LEGACY_SOURCE", "LegacyCRM")
26
 
27
+
28
  # ── Result type ─────────────────────────────────────────────────────────────
29
  def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
30
  return {
 
34
  "latency_ms": round(latency_ms, 4),
35
  }
36
 
37
+
38
+ # ── Caching Layer (Sharded per Worker) ──────────────────────────────────────
39
+ @lru_cache(maxsize=500000)
40
  def cached_llm_call(log_msg: str) -> str:
41
+ """Executes the expensive LLM call only if the string misses the cache."""
42
  return classify_with_llm(log_msg)
43
 
44
+
45
+ # ── Single log (backward-compatible) ────────────────────────────────────────
46
  def classify_log(source: str, log_msg: str) -> dict:
 
47
  results = classify_logs([(source, log_msg)])
48
  return results[0]
49
 
50
+
51
+ # ── Batch pipeline (main entry point) ───────────────────────────────────────
52
  def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
 
53
  n = len(logs)
54
  results = [None] * n
55
 
56
+ # ── Step 1: Route to groups ─────────────────────────────────────────────
57
  llm_indices = []
58
  bert_indices = []
59
 
 
60
  for i, (source, log_msg) in enumerate(logs):
61
  if source == LEGACY_SOURCE:
62
  llm_indices.append(i)
 
70
  else:
71
  bert_indices.append(i)
72
 
73
+ # ── Step 2: BERT batch (CPU Bound) ──────────────────────────────────────
74
  if bert_indices:
75
  bert_msgs = [logs[i][1] for i in bert_indices]
76
+
77
  t_bert_start = time.perf_counter()
78
  bert_results = bert_batch(bert_msgs)
79
  t_bert_end = time.perf_counter()
 
86
  else:
87
  llm_indices.append(idx)
88
 
89
+ # ── Step 3: LLM (I/O Bound - Threading Applied Here) ────────────────────
90
  if llm_indices:
91
  def parallel_llm(idx):
92
  src, msg = logs[idx]
93
+
94
  t_llm_0 = time.perf_counter()
95
  label = cached_llm_call(msg)
96
  t_llm_ms = (time.perf_counter() - t_llm_0) * 1000
 
100
 
101
  return idx, _make_result(label, tier, None, t_llm_ms)
102
 
103
+ with ThreadPoolExecutor() as executor:
104
+ llm_results = list(executor.map(parallel_llm, llm_indices))
105
+
106
+ for idx, res in llm_results:
107
+ results[idx] = res
108
 
109
  return results
110
 
111
+
112
+ # ── Pipeline summary ─────────────────────────────────────────────────────────
113
+ def pipeline_summary(results: list[dict]) -> dict:
114
+ tier_groups: dict[str, list[float]] = {}
115
+ label_counts: dict[str, int] = {}
116
+
117
+ for r in results:
118
+ tier = r["tier"]
119
+ tier_groups.setdefault(tier, []).append(r["latency_ms"])
120
+ label_counts[r["label"]] = label_counts.get(r["label"], 0) + 1
121
+
122
+ total = len(results)
123
+ tier_stats = {}
124
+ for tier, latencies in tier_groups.items():
125
+ latencies_sorted = sorted(latencies)
126
+ n = len(latencies_sorted)
127
+ tier_stats[tier] = {
128
+ "count": n,
129
+ "pct": round(n / total * 100, 1),
130
+ "p50_ms": round(statistics.median(latencies_sorted), 4),
131
+ "p95_ms": round(latencies_sorted[min(int(n * 0.95), n - 1)], 4),
132
+ "p99_ms": round(latencies_sorted[min(int(n * 0.99), n - 1)], 4),
133
+ "mean_ms": round(statistics.mean(latencies_sorted), 4),
134
+ "total_ms": round(sum(latencies_sorted), 4),
135
+ }
136
+
137
+ return {
138
+ "total": total,
139
+ "tier_stats": tier_stats,
140
+ "label_counts": label_counts,
141
+ }
142
+
143
+
144
+ # ── Multiprocessing Helper ───────────────────────────────────────────────────
145
  def _process_chunk(chunk: list[tuple[str, str]]) -> list[dict]:
146
+ """Top-level helper function required for ProcessPoolExecutor mapping."""
147
  return classify_logs(chunk)
148
 
149
+
150
+ # ── CSV batch classify (Balanced Processing) ─────────────────────────────────
151
  def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str, pd.DataFrame]:
152
+ """
153
+ Balanced Batch Processing to prevent CPU Starvation UI crashes.
154
+ """
155
  df = pd.read_csv(input_path)
156
  required = {"source", "log_message"}
157
  if not required.issubset(df.columns):
158
+ raise ValueError(f"Missing required columns in CSV. Expected: {required}. Found: {set(df.columns)}")
159
 
160
  log_pairs = list(zip(df["source"], df["log_message"]))
161
  total_logs = len(log_pairs)
162
 
163
+ # FIX: Use exactly half of the available CPU cores (minimum 1).
164
+ # This leaves the other half for Gradio websockets and the OS.
165
+ safe_cores = max(1, os.cpu_count() // 2)
166
+
167
+ # FIX: Reduce chunk size to 10,000.
168
+ # Massive chunks cause CPU lockups during inter-process data pickling.
169
+ chunk_size = 10000
170
  chunks = [log_pairs[i:i + chunk_size] for i in range(0, total_logs, chunk_size)]
171
 
172
  results = []
173
 
174
+ print(f"πŸ”₯ Firing up {safe_cores} CPU Cores (Leaving remaining for UI)...")
175
 
176
  t_start = time.perf_counter()
177
+ with ProcessPoolExecutor(max_workers=safe_cores) as executor:
 
 
 
 
178
  for chunk_result in executor.map(_process_chunk, chunks):
179
  results.extend(chunk_result)
 
180
  t_end = time.perf_counter()
181
 
182
  print(f"⏱️ True Wall-Clock Processing Time: {(t_end - t_start):.2f} seconds")
 
192
  df.to_csv(output_path, index=False)
193
  return output_path, df
194
 
195
+
196
+ # Aliases
197
  classify = classify_logs