NOT-OMEGA committed on
Commit
5b20649
·
verified ·
1 Parent(s): 6ffecdd

Update classify.py

Browse files
Files changed (1) hide show
  1. classify.py +19 -26
classify.py CHANGED
@@ -1,14 +1,14 @@
1
  """
2
- classify.py — 3-Tier Hybrid Pipeline (V7 — Maximum Speed & Sharded Caching)
3
 
4
  Architecture:
5
  LegacyCRM → LLM directly
6
  Others → Regex → BERT (batch) → LLM fallback
7
 
8
- Changes in V7:
9
- - Unfroze the Gradio UI and restored Processing Speed: Brought back ProcessPoolExecutor for the outer CSV chunks to utilize ALL CPU cores.
10
- - LLM concurrency: ThreadPoolExecutor is retained inside classify_logs specifically for LLM I/O calls.
11
- - Cache Architecture: Using a "Sharded Cache" approach. Each CPU worker process gets its own 500k @lru_cache, which is perfectly safe for 18GB RAM and avoids GIL locks entirely.
12
  """
13
  from __future__ import annotations
14
  import os
@@ -16,7 +16,7 @@ import time
16
  import statistics
17
  import pandas as pd
18
  from functools import lru_cache
19
- from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
20
  from processor_regex import classify_with_regex
21
  from processor_bert import classify_batch as bert_batch
22
  from processor_llm import classify_with_llm
@@ -35,7 +35,7 @@ def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
35
  }
36
 
37
 
38
- # ── Caching Layer (Sharded per CPU Core) ────────────────────────────────────
39
  @lru_cache(maxsize=500000)
40
  def cached_llm_call(log_msg: str) -> str:
41
  """Executes the expensive LLM call only if the string misses the cache."""
@@ -70,7 +70,7 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
70
  else:
71
  bert_indices.append(i)
72
 
73
- # ── Step 2: BERT batch (CPU Bound - Uses full core without GIL) ─────────
74
  if bert_indices:
75
  bert_msgs = [logs[i][1] for i in bert_indices]
76
 
@@ -86,7 +86,7 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
86
  else:
87
  llm_indices.append(idx)
88
 
89
- # ── Step 3: LLM (I/O Bound - Threading Applied Here) ────────────────────
90
  if llm_indices:
91
  def parallel_llm(idx):
92
  src, msg = logs[idx]
@@ -100,6 +100,7 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
100
 
101
  return idx, _make_result(label, tier, None, t_llm_ms)
102
 
 
103
  with ThreadPoolExecutor() as executor:
104
  llm_results = list(executor.map(parallel_llm, llm_indices))
105
 
@@ -141,18 +142,11 @@ def pipeline_summary(results: list[dict]) -> dict:
141
  }
142
 
143
 
144
- # ── Multiprocessing Helper ───────────────────────────────────────────────────
145
- def _process_chunk(chunk: list[tuple[str, str]]) -> list[dict]:
146
- """Top-level helper function required for ProcessPoolExecutor mapping."""
147
- return classify_logs(chunk)
148
-
149
-
150
- # ── CSV batch classify (Hybrid Processing) ───────────────────────────────────
151
  def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str, pd.DataFrame]:
152
  """
153
- Ultra-Optimized Batch Processing for 2M+ Logs.
154
- Outer chunks use ProcessPoolExecutor to smash through BERT on all CPU cores.
155
- Inner LLM calls automatically use ThreadPoolExecutor to handle network I/O.
156
  """
157
  df = pd.read_csv(input_path)
158
  required = {"source", "log_message"}
@@ -162,19 +156,18 @@ def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str,
162
  log_pairs = list(zip(df["source"], df["log_message"]))
163
  total_logs = len(log_pairs)
164
 
165
- max_cores = max(1, os.cpu_count() - 1)
166
- chunk_size = 50000
167
  chunks = [log_pairs[i:i + chunk_size] for i in range(0, total_logs, chunk_size)]
168
 
169
  results = []
170
 
171
- print(f"πŸ”₯ Firing up {max_cores} Process Cores... (BERT gets raw CPU, LLM gets Threads)")
172
 
173
  t_start = time.perf_counter()
174
- # Brought ProcessPoolExecutor back to unblock the CPU and UI
175
- with ProcessPoolExecutor(max_workers=max_cores) as executor:
176
- for chunk_result in executor.map(_process_chunk, chunks):
177
- results.extend(chunk_result)
178
  t_end = time.perf_counter()
179
 
180
  print(f"⏱️ True Wall-Clock Processing Time: {(t_end - t_start):.2f} seconds")
 
1
  """
2
+ classify.py — 3-Tier Hybrid Pipeline (V8 — Cloud Container Safe)
3
 
4
  Architecture:
5
  LegacyCRM → LLM directly
6
  Others → Regex → BERT (batch) → LLM fallback
7
 
8
+ Changes in V8 (Stability First):
9
+ - Removed ProcessPoolExecutor: It was causing Out-Of-Memory (OOM) crashes on Hugging Face Spaces by duplicating the BERT model across CPU cores.
10
+ - Reverted to Sequential Chunks: Protects the 16GB RAM limit and keeps the 500k @lru_cache perfectly intact in the main process.
11
+ - Retained ThreadPoolExecutor: Only used for LLM API calls (I/O bound), which is safe and won't crash the container.
12
  """
13
  from __future__ import annotations
14
  import os
 
16
  import statistics
17
  import pandas as pd
18
  from functools import lru_cache
19
+ from concurrent.futures import ThreadPoolExecutor
20
  from processor_regex import classify_with_regex
21
  from processor_bert import classify_batch as bert_batch
22
  from processor_llm import classify_with_llm
 
35
  }
36
 
37
 
38
+ # ── Caching Layer (Single Process - RAM Safe) ───────────────────────────────
39
  @lru_cache(maxsize=500000)
40
  def cached_llm_call(log_msg: str) -> str:
41
  """Executes the expensive LLM call only if the string misses the cache."""
 
70
  else:
71
  bert_indices.append(i)
72
 
73
+ # ── Step 2: BERT batch (Sequential - RAM Safe) ──────────────────────────
74
  if bert_indices:
75
  bert_msgs = [logs[i][1] for i in bert_indices]
76
 
 
86
  else:
87
  llm_indices.append(idx)
88
 
89
+ # ── Step 3: LLM (I/O Bound - Threading Safe) ────────────────────────────
90
  if llm_indices:
91
  def parallel_llm(idx):
92
  src, msg = logs[idx]
 
100
 
101
  return idx, _make_result(label, tier, None, t_llm_ms)
102
 
103
+ # ThreadPoolExecutor is safe for Gradio/HF Spaces because it shares memory
104
  with ThreadPoolExecutor() as executor:
105
  llm_results = list(executor.map(parallel_llm, llm_indices))
106
 
 
142
  }
143
 
144
 
145
+ # ── CSV batch classify (Container Safe Processing) ───────────────────────────
 
 
 
 
 
 
146
  def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str, pd.DataFrame]:
147
  """
148
+ Stable Batch Processing for 2M+ Logs on Hugging Face Spaces.
149
+ Runs chunks sequentially to prevent out-of-memory (OOM) crashes.
 
150
  """
151
  df = pd.read_csv(input_path)
152
  required = {"source", "log_message"}
 
156
  log_pairs = list(zip(df["source"], df["log_message"]))
157
  total_logs = len(log_pairs)
158
 
159
+ # Reduced chunk size slightly to give the container more breathing room
160
+ chunk_size = 25000
161
  chunks = [log_pairs[i:i + chunk_size] for i in range(0, total_logs, chunk_size)]
162
 
163
  results = []
164
 
165
+ print(f"πŸ”₯ Processing {len(chunks)} chunks sequentially to protect RAM...")
166
 
167
  t_start = time.perf_counter()
168
+ # Sequential loop: Prevents Gradio from crashing and keeps memory stable
169
+ for chunk in chunks:
170
+ results.extend(classify_logs(chunk))
 
171
  t_end = time.perf_counter()
172
 
173
  print(f"⏱️ True Wall-Clock Processing Time: {(t_end - t_start):.2f} seconds")