NOT-OMEGA commited on
Commit
b90444b
Β·
verified Β·
1 Parent(s): a1af93d

Update classify.py

Browse files
Files changed (1) hide show
  1. classify.py +26 -22
classify.py CHANGED
@@ -5,24 +5,26 @@ Architecture:
5
  LegacyCRM β†’ LLM directly
6
  Others β†’ Regex β†’ BERT (batch) β†’ LLM fallback
7
 
8
- Changes in V5:
9
- - Added ProcessPoolExecutor for classify_csv to distribute workload across all CPU cores.
10
- - Increased LRU cache size to 500,000 to maximize RAM usage and minimize LLM calls during 2M+ log stress tests.
11
- - High-resolution telemetry and True Batch Latency tracking retained from V4.
 
 
12
  """
13
  from __future__ import annotations
14
  import os
15
  import time
16
- import hashlib
17
  import statistics
18
  import pandas as pd
19
  from functools import lru_cache
20
- from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
21
  from processor_regex import classify_with_regex
22
  from processor_bert import classify_batch as bert_batch
23
  from processor_llm import classify_with_llm
24
 
25
- LEGACY_SOURCE = "LegacyCRM"
 
26
 
27
 
28
  # ── Result type ─────────────────────────────────────────────────────────────
@@ -37,8 +39,8 @@ def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
37
 
38
  # ── Caching Layer (V5 UPDATE: Max RAM Eater) ────────────────────────────────
39
  @lru_cache(maxsize=500000) # Increased to 500k to absorb duplicate logs in 2M+ datasets
40
- def cached_llm_call(log_hash: str, log_msg: str) -> str:
41
- """Only executes the expensive LLM call if the MD5 hash misses the cache."""
42
  return classify_with_llm(log_msg)
43
 
44
 
@@ -95,10 +97,9 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
95
  def parallel_llm(idx):
96
  src, msg = logs[idx]
97
 
98
- log_hash = hashlib.md5(msg.encode('utf-8')).hexdigest()
99
-
100
  t_llm_0 = time.perf_counter()
101
- label = cached_llm_call(log_hash, msg)
 
102
  t_llm_ms = (time.perf_counter() - t_llm_0) * 1000
103
 
104
  base_tier = "LLM" if src == LEGACY_SOURCE else "LLM (fallback)"
@@ -106,7 +107,8 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
106
 
107
  return idx, _make_result(label, tier, None, t_llm_ms)
108
 
109
- with ThreadPoolExecutor(max_workers=4) as executor:
 
110
  llm_results = list(executor.map(parallel_llm, llm_indices))
111
 
112
  for idx, res in llm_results:
@@ -153,7 +155,7 @@ def pipeline_summary(results: list[dict]) -> dict:
153
 
154
  # ── Multiprocessing Helper ───────────────────────────────────────────────────
155
  def _process_chunk(chunk: list[tuple[str, str]]) -> list[dict]:
156
- """Top-level helper function required for ProcessPoolExecutor mapping."""
157
  return classify_logs(chunk)
158
 
159
 
@@ -161,7 +163,7 @@ def _process_chunk(chunk: list[tuple[str, str]]) -> list[dict]:
161
  def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str, pd.DataFrame]:
162
  """
163
  Ultra-Optimized Batch Processing for 2M+ Logs.
164
- Uses ProcessPoolExecutor to max out CPU cores and gorge on RAM.
165
  """
166
  df = pd.read_csv(input_path)
167
  required = {"source", "log_message"}
@@ -180,10 +182,16 @@ def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str,
180
 
181
  results = []
182
 
183
- print(f"πŸ”₯ Firing up {max_cores} CPU cores to process {len(chunks)} chunks...")
184
- with ProcessPoolExecutor(max_workers=max_cores) as executor:
 
 
 
185
  for chunk_result in executor.map(_process_chunk, chunks):
186
  results.extend(chunk_result)
 
 
 
187
 
188
  df["predicted_label"] = [r["label"] for r in results]
189
  df["tier_used"] = [r["tier"] for r in results]
@@ -203,8 +211,6 @@ classify = classify_logs
203
 
204
  # ── Self-test ────────────────────────────────────────────────────────────────
205
  if __name__ == "__main__":
206
- # Required for safe multiprocessing on Windows/macOS
207
- # Ensures child processes don't recursively run the __main__ block
208
  sample = [
209
  ("ModernCRM", "IP 192.168.133.114 blocked due to potential attack"),
210
  ("BillingSystem", "User User12345 logged in."),
@@ -226,9 +232,7 @@ if __name__ == "__main__":
226
  print("\nπŸ“Š Pipeline Summary:")
227
 
228
  for tier, stats in summary["tier_stats"].items():
229
- if tier == "BERT":
230
- print(f" BERT Batch Latency: {stats['total_ms']} ms (Amortized over {stats['count']} logs)")
231
- elif "Regex" in tier:
232
  print(f" Regex Latency: < 0.1 ms (Recorded p50: {stats['p50_ms']} ms) | count={stats['count']}")
233
  else:
234
  print(f" {tier}: {stats['count']} logs ({stats['pct']}%) | "
 
5
  LegacyCRM β†’ LLM directly
6
  Others β†’ Regex β†’ BERT (batch) β†’ LLM fallback
7
 
8
+ Changes in V5 (Patched):
9
+ - Fixed multiprocessing: Switched to ThreadPoolExecutor to properly share the @lru_cache memory across chunks.
10
+ - Eliminated MD5 cache key collision risks.
11
+ - Removed hardcoded thread limits to avoid bottlenecks.
12
+ - Extracted business rules (LegacyCRM) to environment variables.
13
+ - Corrected misleading batch latency metrics.
14
  """
15
  from __future__ import annotations
16
  import os
17
  import time
 
18
  import statistics
19
  import pandas as pd
20
  from functools import lru_cache
21
+ from concurrent.futures import ThreadPoolExecutor
22
  from processor_regex import classify_with_regex
23
  from processor_bert import classify_batch as bert_batch
24
  from processor_llm import classify_with_llm
25
 
26
+ # ── Config ──────────────────────────────────────────────────────────────────
27
+ LEGACY_SOURCE = os.getenv("LEGACY_SOURCE", "LegacyCRM")
28
 
29
 
30
  # ── Result type ─────────────────────────────────────────────────────────────
 
39
 
40
  # ── Caching Layer (V5 UPDATE: Max RAM Eater) ────────────────────────────────
41
  @lru_cache(maxsize=500000) # Increased to 500k to absorb duplicate logs in 2M+ datasets
42
+ def cached_llm_call(log_msg: str) -> str:
43
+ """Only executes the expensive LLM call if the string misses the cache."""
44
  return classify_with_llm(log_msg)
45
 
46
 
 
97
  def parallel_llm(idx):
98
  src, msg = logs[idx]
99
 
 
 
100
  t_llm_0 = time.perf_counter()
101
+ # Removed redundant MD5 hashing; standard string key is safer and faster
102
+ label = cached_llm_call(msg)
103
  t_llm_ms = (time.perf_counter() - t_llm_0) * 1000
104
 
105
  base_tier = "LLM" if src == LEGACY_SOURCE else "LLM (fallback)"
 
107
 
108
  return idx, _make_result(label, tier, None, t_llm_ms)
109
 
110
+ # Removed hardcoded max_workers=4 to allow auto-scaling based on CPU cores
111
+ with ThreadPoolExecutor() as executor:
112
  llm_results = list(executor.map(parallel_llm, llm_indices))
113
 
114
  for idx, res in llm_results:
 
155
 
156
  # ── Multiprocessing Helper ───────────────────────────────────────────────────
157
  def _process_chunk(chunk: list[tuple[str, str]]) -> list[dict]:
158
+ """Top-level helper function required for ThreadPoolExecutor mapping."""
159
  return classify_logs(chunk)
160
 
161
 
 
163
  def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str, pd.DataFrame]:
164
  """
165
  Ultra-Optimized Batch Processing for 2M+ Logs.
166
+ Uses ThreadPoolExecutor to share the massive lru_cache across chunks.
167
  """
168
  df = pd.read_csv(input_path)
169
  required = {"source", "log_message"}
 
182
 
183
  results = []
184
 
185
+ # Switched to ThreadPoolExecutor so memory (lru_cache) is shared properly
186
+ print(f"πŸ”₯ Firing up {max_cores} worker threads to process {len(chunks)} chunks...")
187
+
188
+ t_start = time.perf_counter()
189
+ with ThreadPoolExecutor(max_workers=max_cores) as executor:
190
  for chunk_result in executor.map(_process_chunk, chunks):
191
  results.extend(chunk_result)
192
+ t_end = time.perf_counter()
193
+
194
+ print(f"⏱️ True Wall-Clock Processing Time: {(t_end - t_start):.2f} seconds")
195
 
196
  df["predicted_label"] = [r["label"] for r in results]
197
  df["tier_used"] = [r["tier"] for r in results]
 
211
 
212
  # ── Self-test ────────────────────────────────────────────────────────────────
213
  if __name__ == "__main__":
 
 
214
  sample = [
215
  ("ModernCRM", "IP 192.168.133.114 blocked due to potential attack"),
216
  ("BillingSystem", "User User12345 logged in."),
 
232
  print("\nπŸ“Š Pipeline Summary:")
233
 
234
  for tier, stats in summary["tier_stats"].items():
235
+ if "Regex" in tier:
 
 
236
  print(f" Regex Latency: < 0.1 ms (Recorded p50: {stats['p50_ms']} ms) | count={stats['count']}")
237
  else:
238
  print(f" {tier}: {stats['count']} logs ({stats['pct']}%) | "