NOT-OMEGA committed on
Commit
6ffecdd
·
verified ·
1 Parent(s): a65e2b6

Update classify.py

Browse files
Files changed (1) hide show
  1. classify.py +22 -15
classify.py CHANGED
@@ -1,14 +1,14 @@
1
  """
2
- classify.py — 3-Tier Hybrid Pipeline (V6 — Hybrid Concurrency)
3
 
4
  Architecture:
5
  LegacyCRM → LLM directly
6
  Others → Regex → BERT (batch) → LLM fallback
7
 
8
- Changes in V6 (Final Polish):
9
- - Solved the GIL Bottleneck: Chunks run sequentially so BERT gets 100% CPU without thread-locking.
10
- - LLM concurrency: ThreadPoolExecutor is retained ONLY for LLM I/O calls.
11
- - Perfect Caching: Outer chunks are sequential, meaning the 500k @lru_cache stays in the main process memory for the entire 2M logs.
12
  """
13
  from __future__ import annotations
14
  import os
@@ -16,7 +16,7 @@ import time
16
  import statistics
17
  import pandas as pd
18
  from functools import lru_cache
19
- from concurrent.futures import ThreadPoolExecutor
20
  from processor_regex import classify_with_regex
21
  from processor_bert import classify_batch as bert_batch
22
  from processor_llm import classify_with_llm
@@ -35,7 +35,7 @@ def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
35
  }
36
 
37
 
38
- # ── Caching Layer (Max RAM Eater) ───────────────────────────────────────────
39
  @lru_cache(maxsize=500000)
40
  def cached_llm_call(log_msg: str) -> str:
41
  """Executes the expensive LLM call only if the string misses the cache."""
@@ -70,7 +70,7 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
70
  else:
71
  bert_indices.append(i)
72
 
73
- # ── Step 2: BERT batch (CPU Bound - No Threads Allowed Here) ────────────
74
  if bert_indices:
75
  bert_msgs = [logs[i][1] for i in bert_indices]
76
 
@@ -141,12 +141,18 @@ def pipeline_summary(results: list[dict]) -> dict:
141
  }
142
 
143
 
 
 
 
 
 
 
144
  # ── CSV batch classify (Hybrid Processing) ───────────────────────────────────
145
  def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str, pd.DataFrame]:
146
  """
147
  Ultra-Optimized Batch Processing for 2M+ Logs.
148
- Outer chunks run sequentially (bypasses GIL for BERT, preserves main memory cache).
149
- Inner LLM calls thread automatically inside classify_logs.
150
  """
151
  df = pd.read_csv(input_path)
152
  required = {"source", "log_message"}
@@ -156,18 +162,19 @@ def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str,
156
  log_pairs = list(zip(df["source"], df["log_message"]))
157
  total_logs = len(log_pairs)
158
 
159
- # Chunk size controls how much data is in RAM at once
160
  chunk_size = 50000
161
  chunks = [log_pairs[i:i + chunk_size] for i in range(0, total_logs, chunk_size)]
162
 
163
  results = []
164
 
165
- print(f"πŸ”₯ Processing {len(chunks)} chunks... (BERT handles CPU batching, LLM handles I/O threads)")
166
 
167
  t_start = time.perf_counter()
168
- # Process sequentially to avoid GIL locks on BERT and keep the cache in one memory block
169
- for chunk in chunks:
170
- results.extend(classify_logs(chunk))
 
171
  t_end = time.perf_counter()
172
 
173
  print(f"⏱️ True Wall-Clock Processing Time: {(t_end - t_start):.2f} seconds")
 
1
  """
2
+ classify.py — 3-Tier Hybrid Pipeline (V7 — Maximum Speed & Sharded Caching)
3
 
4
  Architecture:
5
  LegacyCRM → LLM directly
6
  Others → Regex → BERT (batch) → LLM fallback
7
 
8
+ Changes in V7:
9
+ - Unfroze the Gradio UI and restored Processing Speed: Brought back ProcessPoolExecutor for the outer CSV chunks to utilize ALL CPU cores.
10
+ - LLM concurrency: ThreadPoolExecutor is retained inside classify_logs specifically for LLM I/O calls.
11
+ - Cache Architecture: Using a "Sharded Cache" approach. Each CPU worker process gets its own 500k @lru_cache, which is perfectly safe for 18GB RAM and avoids GIL locks entirely.
12
  """
13
  from __future__ import annotations
14
  import os
 
16
  import statistics
17
  import pandas as pd
18
  from functools import lru_cache
19
+ from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
20
  from processor_regex import classify_with_regex
21
  from processor_bert import classify_batch as bert_batch
22
  from processor_llm import classify_with_llm
 
35
  }
36
 
37
 
38
+ # ── Caching Layer (Sharded per CPU Core) ────────────────────────────────────
39
  @lru_cache(maxsize=500000)
40
  def cached_llm_call(log_msg: str) -> str:
41
  """Executes the expensive LLM call only if the string misses the cache."""
 
70
  else:
71
  bert_indices.append(i)
72
 
73
+ # ── Step 2: BERT batch (CPU Bound - Uses full core without GIL) ─────────
74
  if bert_indices:
75
  bert_msgs = [logs[i][1] for i in bert_indices]
76
 
 
141
  }
142
 
143
 
144
+ # ── Multiprocessing Helper ───────────────────────────────────────────────────
145
+ def _process_chunk(chunk: list[tuple[str, str]]) -> list[dict]:
146
+ """Top-level helper function required for ProcessPoolExecutor mapping."""
147
+ return classify_logs(chunk)
148
+
149
+
150
  # ── CSV batch classify (Hybrid Processing) ───────────────────────────────────
151
  def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str, pd.DataFrame]:
152
  """
153
  Ultra-Optimized Batch Processing for 2M+ Logs.
154
+ Outer chunks use ProcessPoolExecutor to smash through BERT on all CPU cores.
155
+ Inner LLM calls automatically use ThreadPoolExecutor to handle network I/O.
156
  """
157
  df = pd.read_csv(input_path)
158
  required = {"source", "log_message"}
 
162
  log_pairs = list(zip(df["source"], df["log_message"]))
163
  total_logs = len(log_pairs)
164
 
165
+ max_cores = max(1, os.cpu_count() - 1)
166
  chunk_size = 50000
167
  chunks = [log_pairs[i:i + chunk_size] for i in range(0, total_logs, chunk_size)]
168
 
169
  results = []
170
 
171
+ print(f"πŸ”₯ Firing up {max_cores} Process Cores... (BERT gets raw CPU, LLM gets Threads)")
172
 
173
  t_start = time.perf_counter()
174
+ # Brought ProcessPoolExecutor back to unblock the CPU and UI
175
+ with ProcessPoolExecutor(max_workers=max_cores) as executor:
176
+ for chunk_result in executor.map(_process_chunk, chunks):
177
+ results.extend(chunk_result)
178
  t_end = time.perf_counter()
179
 
180
  print(f"⏱️ True Wall-Clock Processing Time: {(t_end - t_start):.2f} seconds")