NOT-OMEGA commited on
Commit
ace5ccf
Β·
verified Β·
1 Parent(s): 5b20649

Update classify.py

Browse files
Files changed (1) hide show
  1. classify.py +29 -33
classify.py CHANGED
@@ -1,14 +1,14 @@
1
  """
2
- classify.py β€” 3-Tier Hybrid Pipeline (V8 β€” Cloud Container Safe)
3
 
4
  Architecture:
5
  LegacyCRM β†’ LLM directly
6
  Others β†’ Regex β†’ BERT (batch) β†’ LLM fallback
7
 
8
- Changes in V8 (Stability First):
9
- - Removed ProcessPoolExecutor: It was causing Out-Of-Memory (OOM) crashes on Hugging Face Spaces by duplicating the BERT model across CPU cores.
10
- - Reverted to Sequential Chunks: Protects the 16GB RAM limit and keeps the 500k @lru_cache perfectly intact in the main process.
11
- - Retained ThreadPoolExecutor: Only used for LLM API calls (I/O bound), which is safe and won't crash the container.
12
  """
13
  from __future__ import annotations
14
  import os
@@ -16,7 +16,7 @@ import time
16
  import statistics
17
  import pandas as pd
18
  from functools import lru_cache
19
- from concurrent.futures import ThreadPoolExecutor
20
  from processor_regex import classify_with_regex
21
  from processor_bert import classify_batch as bert_batch
22
  from processor_llm import classify_with_llm
@@ -35,7 +35,7 @@ def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
35
  }
36
 
37
 
38
- # ── Caching Layer (Single Process - RAM Safe) ───────────────────────────────
39
  @lru_cache(maxsize=500000)
40
  def cached_llm_call(log_msg: str) -> str:
41
  """Executes the expensive LLM call only if the string misses the cache."""
@@ -70,7 +70,7 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
70
  else:
71
  bert_indices.append(i)
72
 
73
- # ── Step 2: BERT batch (Sequential - RAM Safe) ──────────────────────────
74
  if bert_indices:
75
  bert_msgs = [logs[i][1] for i in bert_indices]
76
 
@@ -86,7 +86,7 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
86
  else:
87
  llm_indices.append(idx)
88
 
89
- # ── Step 3: LLM (I/O Bound - Threading Safe) ────────────────────────────
90
  if llm_indices:
91
  def parallel_llm(idx):
92
  src, msg = logs[idx]
@@ -100,7 +100,6 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
100
 
101
  return idx, _make_result(label, tier, None, t_llm_ms)
102
 
103
- # ThreadPoolExecutor is safe for Gradio/HF Spaces because it shares memory
104
  with ThreadPoolExecutor() as executor:
105
  llm_results = list(executor.map(parallel_llm, llm_indices))
106
 
@@ -142,11 +141,16 @@ def pipeline_summary(results: list[dict]) -> dict:
142
  }
143
 
144
 
145
- # ── CSV batch classify (Container Safe Processing) ───────────────────────────
 
 
 
 
 
 
146
  def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str, pd.DataFrame]:
147
  """
148
- Stable Batch Processing for 2M+ Logs on Hugging Face Spaces.
149
- Runs chunks sequentially to prevent OOM memory crashes.
150
  """
151
  df = pd.read_csv(input_path)
152
  required = {"source", "log_message"}
@@ -156,18 +160,23 @@ def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str,
156
  log_pairs = list(zip(df["source"], df["log_message"]))
157
  total_logs = len(log_pairs)
158
 
159
- # Reduced chunk size slightly to give the container more breathing room
160
- chunk_size = 25000
 
 
 
 
 
161
  chunks = [log_pairs[i:i + chunk_size] for i in range(0, total_logs, chunk_size)]
162
 
163
  results = []
164
 
165
- print(f"πŸ”₯ Processing {len(chunks)} chunks sequentially to protect RAM...")
166
 
167
  t_start = time.perf_counter()
168
- # Sequential loop: Prevents Gradio from crashing and keeps memory stable
169
- for chunk in chunks:
170
- results.extend(classify_logs(chunk))
171
  t_end = time.perf_counter()
172
 
173
  print(f"⏱️ True Wall-Clock Processing Time: {(t_end - t_start):.2f} seconds")
@@ -185,17 +194,4 @@ def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str,
185
 
186
 
187
  # Aliases
188
- classify = classify_logs
189
-
190
-
191
- # ── Self-test ────────────────────────────────────────────────────────────────
192
- if __name__ == "__main__":
193
- sample = [
194
- ("ModernCRM", "IP 192.168.133.114 blocked due to potential attack"),
195
- ("BillingSystem", "User User12345 logged in."),
196
- ("LegacyCRM", "Case escalation failed due to active timeout."),
197
- ]
198
-
199
- print("Running quick test...")
200
- results = classify_logs(sample)
201
- print("Done. No errors.")
 
1
  """
2
+ classify.py β€” 3-Tier Hybrid Pipeline (V9 β€” Balanced CPU & Gradio Safe)
3
 
4
  Architecture:
5
  LegacyCRM β†’ LLM directly
6
  Others β†’ Regex β†’ BERT (batch) β†’ LLM fallback
7
 
8
+ Changes in V9:
9
+ - Fixed CPU Starvation: Limited max_workers to half the CPU cores to prevent Gradio WebSocket timeouts.
10
+ - Reduced IPC Overhead: Lowered chunk_size to 10,000 to prevent CPU lockups during cross-process data pickling.
11
+ - Restored Multi-processing: Outer chunks use ProcessPoolExecutor for speed, inner LLM calls use ThreadPoolExecutor.
12
  """
13
  from __future__ import annotations
14
  import os
 
16
  import statistics
17
  import pandas as pd
18
  from functools import lru_cache
19
+ from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
20
  from processor_regex import classify_with_regex
21
  from processor_bert import classify_batch as bert_batch
22
  from processor_llm import classify_with_llm
 
35
  }
36
 
37
 
38
+ # ── Caching Layer (Sharded per Worker) ──────────────────────────────────────
39
  @lru_cache(maxsize=500000)
40
  def cached_llm_call(log_msg: str) -> str:
41
  """Executes the expensive LLM call only if the string misses the cache."""
 
70
  else:
71
  bert_indices.append(i)
72
 
73
+ # ── Step 2: BERT batch (CPU Bound) ──────────────────────────────────────
74
  if bert_indices:
75
  bert_msgs = [logs[i][1] for i in bert_indices]
76
 
 
86
  else:
87
  llm_indices.append(idx)
88
 
89
+ # ── Step 3: LLM (I/O Bound - Threading Applied Here) ────────────────────
90
  if llm_indices:
91
  def parallel_llm(idx):
92
  src, msg = logs[idx]
 
100
 
101
  return idx, _make_result(label, tier, None, t_llm_ms)
102
 
 
103
  with ThreadPoolExecutor() as executor:
104
  llm_results = list(executor.map(parallel_llm, llm_indices))
105
 
 
141
  }
142
 
143
 
144
+ # ── Multiprocessing Helper ───────────────────────────────────────────────────
145
+ def _process_chunk(chunk: list[tuple[str, str]]) -> list[dict]:
146
+ """Top-level helper function required for ProcessPoolExecutor mapping."""
147
+ return classify_logs(chunk)
148
+
149
+
150
+ # ── CSV batch classify (Balanced Processing) ─────────────────────────────────
151
  def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str, pd.DataFrame]:
152
  """
153
+ Balanced Batch Processing to prevent CPU Starvation UI crashes.
 
154
  """
155
  df = pd.read_csv(input_path)
156
  required = {"source", "log_message"}
 
160
  log_pairs = list(zip(df["source"], df["log_message"]))
161
  total_logs = len(log_pairs)
162
 
163
+ # FIX: Use exactly half of the available CPU cores (minimum 1).
164
+ # This leaves the other half for Gradio websockets and the OS.
165
+ safe_cores = max(1, os.cpu_count() // 2)
166
+
167
+ # FIX: Reduce chunk size to 10,000.
168
+ # Massive chunks cause CPU lockups during inter-process data pickling.
169
+ chunk_size = 10000
170
  chunks = [log_pairs[i:i + chunk_size] for i in range(0, total_logs, chunk_size)]
171
 
172
  results = []
173
 
174
+ print(f"πŸ”₯ Firing up {safe_cores} CPU Cores (Leaving remaining for UI)...")
175
 
176
  t_start = time.perf_counter()
177
+ with ProcessPoolExecutor(max_workers=safe_cores) as executor:
178
+ for chunk_result in executor.map(_process_chunk, chunks):
179
+ results.extend(chunk_result)
180
  t_end = time.perf_counter()
181
 
182
  print(f"⏱️ True Wall-Clock Processing Time: {(t_end - t_start):.2f} seconds")
 
194
 
195
 
196
  # Aliases
197
+ classify = classify_logs