NOT-OMEGA commited on
Commit
3f1a1d6
Β·
verified Β·
1 Parent(s): 8401d25

Update classify.py

Browse files
Files changed (1) hide show
  1. classify.py +37 -25
classify.py CHANGED
@@ -1,23 +1,23 @@
1
  """
2
- classify.py β€” 3-Tier Hybrid Pipeline (V4 β€” MAANG-Grade Telemetry & Caching)
3
 
4
  Architecture:
5
  LegacyCRM β†’ LLM directly
6
  Others β†’ Regex β†’ BERT (batch) β†’ LLM fallback
7
 
8
- Changes in V4:
9
- - High-resolution telemetry (4 decimal places) to capture sub-ms Regex execution.
10
- - True Batch Latency tracking for BERT (decoupled from individual log spoofing).
11
- - MD5 Hashing & LRU Cache layer for the LLM to mathematically prove cost savings.
12
- - Parallelized LLM Tier using ThreadPoolExecutor for high throughput.
13
  """
14
  from __future__ import annotations
 
15
  import time
16
  import hashlib
17
  import statistics
18
  import pandas as pd
19
  from functools import lru_cache
20
- from concurrent.futures import ThreadPoolExecutor
21
  from processor_regex import classify_with_regex
22
  from processor_bert import classify_batch as bert_batch
23
  from processor_llm import classify_with_llm
@@ -31,13 +31,12 @@ def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
31
  "label": label,
32
  "tier": tier,
33
  "confidence": confidence,
34
- # FIX 2: Increased clock resolution to 4 decimal places for sub-ms accuracy
35
  "latency_ms": round(latency_ms, 4),
36
  }
37
 
38
 
39
- # ── Caching Layer (FIX 3) ───────────────────────────────────────────────────
40
- @lru_cache(maxsize=10000)
41
  def cached_llm_call(log_hash: str, log_msg: str) -> str:
42
  """Only executes the expensive LLM call if the MD5 hash misses the cache."""
43
  return classify_with_llm(log_msg)
@@ -83,8 +82,6 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
83
  bert_results = bert_batch(bert_msgs)
84
  t_bert_end = time.perf_counter()
85
 
86
- # We keep the amortized calculation strictly for the CSV line items,
87
- # but the pipeline_summary will handle reporting this as a Batch.
88
  bert_ms_per_log = (t_bert_end - t_bert_start) * 1000 / len(bert_msgs)
89
 
90
  for idx, (label, conf) in zip(bert_indices, bert_results):
@@ -98,7 +95,6 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
98
  def parallel_llm(idx):
99
  src, msg = logs[idx]
100
 
101
- # FIX 3: Generate MD5 hash of the log string
102
  log_hash = hashlib.md5(msg.encode('utf-8')).hexdigest()
103
 
104
  t_llm_0 = time.perf_counter()
@@ -106,13 +102,10 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
106
  t_llm_ms = (time.perf_counter() - t_llm_0) * 1000
107
 
108
  base_tier = "LLM" if src == LEGACY_SOURCE else "LLM (fallback)"
109
-
110
- # Categorize the telemetry based on execution time (Sub 5ms = Memory Hit)
111
  tier = f"{base_tier} (Cache Hit)" if t_llm_ms < 5 else f"{base_tier} (API Call)"
112
 
113
  return idx, _make_result(label, tier, None, t_llm_ms)
114
 
115
- # Parallelize API calls to prevent pipeline stall, restricted to 4 workers to prevent OOM
116
  with ThreadPoolExecutor(max_workers=4) as executor:
117
  llm_results = list(executor.map(parallel_llm, llm_indices))
118
 
@@ -144,12 +137,11 @@ def pipeline_summary(results: list[dict]) -> dict:
144
  tier_stats[tier] = {
145
  "count": n,
146
  "pct": round(n / total * 100, 1),
147
- # FIX 2: Prevent flatlining at 0.0 by expanding decimal precision
148
  "p50_ms": round(statistics.median(latencies_sorted), 4),
149
  "p95_ms": round(latencies_sorted[min(int(n * 0.95), n - 1)], 4),
150
  "p99_ms": round(latencies_sorted[min(int(n * 0.99), n - 1)], 4),
151
  "mean_ms": round(statistics.mean(latencies_sorted), 4),
152
- "total_ms": round(sum(latencies_sorted), 4), # Required for Batch calculation
153
  }
154
 
155
  return {
@@ -159,12 +151,17 @@ def pipeline_summary(results: list[dict]) -> dict:
159
  }
160
 
161
 
162
- # ── CSV batch classify ───────────────────────────────────────────────────────
 
 
 
 
 
 
163
  def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str, pd.DataFrame]:
164
  """
165
- Process a batch of logs from a CSV file.
166
- Required columns: 'source', 'log_message'
167
- Output: appends 'predicted_label', 'tier_used', 'confidence', 'latency_ms'
168
  """
169
  df = pd.read_csv(input_path)
170
  required = {"source", "log_message"}
@@ -172,7 +169,21 @@ def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str,
172
  raise ValueError(f"Missing required columns in CSV. Expected: {required}. Found: {set(df.columns)}")
173
 
174
  log_pairs = list(zip(df["source"], df["log_message"]))
175
- results = classify_logs(log_pairs)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
176
 
177
  df["predicted_label"] = [r["label"] for r in results]
178
  df["tier_used"] = [r["tier"] for r in results]
@@ -192,6 +203,8 @@ classify = classify_logs
192
 
193
  # ── Self-test ────────────────────────────────────────────────────────────────
194
  if __name__ == "__main__":
 
 
195
  sample = [
196
  ("ModernCRM", "IP 192.168.133.114 blocked due to potential attack"),
197
  ("BillingSystem", "User User12345 logged in."),
@@ -199,7 +212,7 @@ if __name__ == "__main__":
199
  ("ModernHR", "GET /v2/servers/detail HTTP/1.1 status: 200 len: 1583 time: 0.19"),
200
  ("ModernHR", "Admin access escalation detected for user 9429"),
201
  ("LegacyCRM", "Case escalation for ticket ID 7324 failed because the assigned support agent is no longer active."),
202
- ("LegacyCRM", "Case escalation for ticket ID 7324 failed because the assigned support agent is no longer active."), # Deliberate duplicate to test MD5 Cache
203
  ]
204
 
205
  print(f'{"Source":<20} {"Tier":<22} {"Conf":>6} {"Lat(ms)":>8} {"Label":<25} Log')
@@ -212,7 +225,6 @@ if __name__ == "__main__":
212
  summary = pipeline_summary(results)
213
  print("\nπŸ“Š Pipeline Summary:")
214
 
215
- # FIX 1: Decoupling the reporting output to reflect architectural reality
216
  for tier, stats in summary["tier_stats"].items():
217
  if tier == "BERT":
218
  print(f" BERT Batch Latency: {stats['total_ms']} ms (Amortized over {stats['count']} logs)")
 
1
  """
2
+ classify.py β€” 3-Tier Hybrid Pipeline (V5 β€” Multiprocessing & Max RAM Utilization)
3
 
4
  Architecture:
5
  LegacyCRM β†’ LLM directly
6
  Others β†’ Regex β†’ BERT (batch) β†’ LLM fallback
7
 
8
+ Changes in V5:
9
+ - Added ProcessPoolExecutor for classify_csv to distribute workload across all CPU cores.
10
+ - Increased LRU cache size to 500,000 to maximize RAM usage and minimize LLM calls during 2M+ log stress tests.
11
+ - High-resolution telemetry and True Batch Latency tracking retained from V4.
 
12
  """
13
  from __future__ import annotations
14
+ import os
15
  import time
16
  import hashlib
17
  import statistics
18
  import pandas as pd
19
  from functools import lru_cache
20
+ from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
21
  from processor_regex import classify_with_regex
22
  from processor_bert import classify_batch as bert_batch
23
  from processor_llm import classify_with_llm
 
31
  "label": label,
32
  "tier": tier,
33
  "confidence": confidence,
 
34
  "latency_ms": round(latency_ms, 4),
35
  }
36
 
37
 
38
+ # ── Caching Layer (V5 UPDATE: Max RAM Eater) ────────────────────────────────
39
+ @lru_cache(maxsize=500000) # Increased to 500k to absorb duplicate logs in 2M+ datasets
40
  def cached_llm_call(log_hash: str, log_msg: str) -> str:
41
  """Only executes the expensive LLM call if the MD5 hash misses the cache."""
42
  return classify_with_llm(log_msg)
 
82
  bert_results = bert_batch(bert_msgs)
83
  t_bert_end = time.perf_counter()
84
 
 
 
85
  bert_ms_per_log = (t_bert_end - t_bert_start) * 1000 / len(bert_msgs)
86
 
87
  for idx, (label, conf) in zip(bert_indices, bert_results):
 
95
  def parallel_llm(idx):
96
  src, msg = logs[idx]
97
 
 
98
  log_hash = hashlib.md5(msg.encode('utf-8')).hexdigest()
99
 
100
  t_llm_0 = time.perf_counter()
 
102
  t_llm_ms = (time.perf_counter() - t_llm_0) * 1000
103
 
104
  base_tier = "LLM" if src == LEGACY_SOURCE else "LLM (fallback)"
 
 
105
  tier = f"{base_tier} (Cache Hit)" if t_llm_ms < 5 else f"{base_tier} (API Call)"
106
 
107
  return idx, _make_result(label, tier, None, t_llm_ms)
108
 
 
109
  with ThreadPoolExecutor(max_workers=4) as executor:
110
  llm_results = list(executor.map(parallel_llm, llm_indices))
111
 
 
137
  tier_stats[tier] = {
138
  "count": n,
139
  "pct": round(n / total * 100, 1),
 
140
  "p50_ms": round(statistics.median(latencies_sorted), 4),
141
  "p95_ms": round(latencies_sorted[min(int(n * 0.95), n - 1)], 4),
142
  "p99_ms": round(latencies_sorted[min(int(n * 0.99), n - 1)], 4),
143
  "mean_ms": round(statistics.mean(latencies_sorted), 4),
144
+ "total_ms": round(sum(latencies_sorted), 4),
145
  }
146
 
147
  return {
 
151
  }
152
 
153
 
154
+ # ── Multiprocessing Helper ───────────────────────────────────────────────────
155
+ def _process_chunk(chunk: list[tuple[str, str]]) -> list[dict]:
156
+ """Top-level helper function required for ProcessPoolExecutor mapping."""
157
+ return classify_logs(chunk)
158
+
159
+
160
+ # ── CSV batch classify (V5 UPDATE: Multi-Core Processor) ─────────────────────
161
  def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str, pd.DataFrame]:
162
  """
163
+ Ultra-Optimized Batch Processing for 2M+ Logs.
164
+ Uses ProcessPoolExecutor to max out CPU cores and gorge on RAM.
 
165
  """
166
  df = pd.read_csv(input_path)
167
  required = {"source", "log_message"}
 
169
  raise ValueError(f"Missing required columns in CSV. Expected: {required}. Found: {set(df.columns)}")
170
 
171
  log_pairs = list(zip(df["source"], df["log_message"]))
172
+ total_logs = len(log_pairs)
173
+
174
+ # OS ke liye 1 core chhod kar baaki sab use karo
175
+ max_cores = max(1, os.cpu_count() - 1)
176
+
177
+ # 50,000 logs ke chunks banao (Memory distribution)
178
+ chunk_size = 50000
179
+ chunks = [log_pairs[i:i + chunk_size] for i in range(0, total_logs, chunk_size)]
180
+
181
+ results = []
182
+
183
+ print(f"πŸ”₯ Firing up {max_cores} CPU cores to process {len(chunks)} chunks...")
184
+ with ProcessPoolExecutor(max_workers=max_cores) as executor:
185
+ for chunk_result in executor.map(_process_chunk, chunks):
186
+ results.extend(chunk_result)
187
 
188
  df["predicted_label"] = [r["label"] for r in results]
189
  df["tier_used"] = [r["tier"] for r in results]
 
203
 
204
  # ── Self-test ────────────────────────────────────────────────────────────────
205
  if __name__ == "__main__":
206
+ # Required for safe multiprocessing on Windows/macOS
207
+ # Ensures child processes don't recursively run the __main__ block
208
  sample = [
209
  ("ModernCRM", "IP 192.168.133.114 blocked due to potential attack"),
210
  ("BillingSystem", "User User12345 logged in."),
 
212
  ("ModernHR", "GET /v2/servers/detail HTTP/1.1 status: 200 len: 1583 time: 0.19"),
213
  ("ModernHR", "Admin access escalation detected for user 9429"),
214
  ("LegacyCRM", "Case escalation for ticket ID 7324 failed because the assigned support agent is no longer active."),
215
+ ("LegacyCRM", "Case escalation for ticket ID 7324 failed because the assigned support agent is no longer active."),
216
  ]
217
 
218
  print(f'{"Source":<20} {"Tier":<22} {"Conf":>6} {"Lat(ms)":>8} {"Label":<25} Log')
 
225
  summary = pipeline_summary(results)
226
  print("\nπŸ“Š Pipeline Summary:")
227
 
 
228
  for tier, stats in summary["tier_stats"].items():
229
  if tier == "BERT":
230
  print(f" BERT Batch Latency: {stats['total_ms']} ms (Amortized over {stats['count']} logs)")