NOT-OMEGA commited on
Commit
a65e2b6
Β·
verified Β·
1 Parent(s): 5ef292a

Update classify.py

Browse files
Files changed (1) hide show
  1. classify.py +21 -63
classify.py CHANGED
@@ -1,16 +1,14 @@
1
  """
2
- classify.py β€” 3-Tier Hybrid Pipeline (V5 β€” Multiprocessing & Max RAM Utilization)
3
 
4
  Architecture:
5
  LegacyCRM β†’ LLM directly
6
  Others β†’ Regex β†’ BERT (batch) β†’ LLM fallback
7
 
8
- Changes in V5 (Patched):
9
- - Fixed multiprocessing: Switched to ThreadPoolExecutor to properly share the @lru_cache memory across chunks.
10
- - Eliminated MD5 cache key collision risks.
11
- - Removed hardcoded thread limits to avoid bottlenecks.
12
- - Extracted business rules (LegacyCRM) to environment variables.
13
- - Corrected misleading batch latency metrics.
14
  """
15
  from __future__ import annotations
16
  import os
@@ -37,25 +35,21 @@ def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
37
  }
38
 
39
 
40
- # ── Caching Layer (V5 UPDATE: Max RAM Eater) ────────────────────────────────
41
- @lru_cache(maxsize=500000) # Increased to 500k to absorb duplicate logs in 2M+ datasets
42
  def cached_llm_call(log_msg: str) -> str:
43
- """Only executes the expensive LLM call if the string misses the cache."""
44
  return classify_with_llm(log_msg)
45
 
46
 
47
  # ── Single log (backward-compatible) ────────────────────────────────────────
48
  def classify_log(source: str, log_msg: str) -> dict:
49
- """Classify a single log. Returns label, tier, confidence, and latency_ms."""
50
  results = classify_logs([(source, log_msg)])
51
  return results[0]
52
 
53
 
54
  # ── Batch pipeline (main entry point) ───────────────────────────────────────
55
  def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
56
- """
57
- Batch classify with 3-tier routing + per-result latency.
58
- """
59
  n = len(logs)
60
  results = [None] * n
61
 
@@ -76,7 +70,7 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
76
  else:
77
  bert_indices.append(i)
78
 
79
- # ── Step 2: BERT batch ──────────────────────────────────────────────────
80
  if bert_indices:
81
  bert_msgs = [logs[i][1] for i in bert_indices]
82
 
@@ -92,13 +86,12 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
92
  else:
93
  llm_indices.append(idx)
94
 
95
- # ── Step 3: LLM (Parallel Concurrency & Caching) ────────────────────────
96
  if llm_indices:
97
  def parallel_llm(idx):
98
  src, msg = logs[idx]
99
 
100
  t_llm_0 = time.perf_counter()
101
- # Removed redundant MD5 hashing; standard string key is safer and faster
102
  label = cached_llm_call(msg)
103
  t_llm_ms = (time.perf_counter() - t_llm_0) * 1000
104
 
@@ -107,7 +100,6 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
107
 
108
  return idx, _make_result(label, tier, None, t_llm_ms)
109
 
110
- # Removed hardcoded max_workers=4 to allow auto-scaling based on CPU cores
111
  with ThreadPoolExecutor() as executor:
112
  llm_results = list(executor.map(parallel_llm, llm_indices))
113
 
@@ -119,10 +111,6 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
119
 
120
  # ── Pipeline summary ─────────────────────────────────────────────────────────
121
  def pipeline_summary(results: list[dict]) -> dict:
122
- """
123
- Aggregate stats from classify_logs output.
124
- Useful for dashboard and benchmark reporting.
125
- """
126
  tier_groups: dict[str, list[float]] = {}
127
  label_counts: dict[str, int] = {}
128
 
@@ -153,17 +141,12 @@ def pipeline_summary(results: list[dict]) -> dict:
153
  }
154
 
155
 
156
- # ── Multiprocessing Helper ───────────────────────────────────────────────────
157
- def _process_chunk(chunk: list[tuple[str, str]]) -> list[dict]:
158
- """Top-level helper function required for ThreadPoolExecutor mapping."""
159
- return classify_logs(chunk)
160
-
161
-
162
- # ── CSV batch classify (V5 UPDATE: Multi-Core Processor) ─────────────────────
163
  def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str, pd.DataFrame]:
164
  """
165
  Ultra-Optimized Batch Processing for 2M+ Logs.
166
- Uses ThreadPoolExecutor to share the massive lru_cache across chunks.
 
167
  """
168
  df = pd.read_csv(input_path)
169
  required = {"source", "log_message"}
@@ -173,22 +156,18 @@ def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str,
173
  log_pairs = list(zip(df["source"], df["log_message"]))
174
  total_logs = len(log_pairs)
175
 
176
- # OS ke liye 1 core chhod kar baaki sab use karo
177
- max_cores = max(1, os.cpu_count() - 1)
178
-
179
- # 50,000 logs ke chunks banao (Memory distribution)
180
  chunk_size = 50000
181
  chunks = [log_pairs[i:i + chunk_size] for i in range(0, total_logs, chunk_size)]
182
 
183
  results = []
184
 
185
- # Switched to ThreadPoolExecutor so memory (lru_cache) is shared properly
186
- print(f"πŸ”₯ Firing up {max_cores} worker threads to process {len(chunks)} chunks...")
187
 
188
  t_start = time.perf_counter()
189
- with ThreadPoolExecutor(max_workers=max_cores) as executor:
190
- for chunk_result in executor.map(_process_chunk, chunks):
191
- results.extend(chunk_result)
192
  t_end = time.perf_counter()
193
 
194
  print(f"⏱️ True Wall-Clock Processing Time: {(t_end - t_start):.2f} seconds")
@@ -214,30 +193,9 @@ if __name__ == "__main__":
214
  sample = [
215
  ("ModernCRM", "IP 192.168.133.114 blocked due to potential attack"),
216
  ("BillingSystem", "User User12345 logged in."),
217
- ("AnalyticsEngine", "File data_6957.csv uploaded successfully by user User265."),
218
- ("ModernHR", "GET /v2/servers/detail HTTP/1.1 status: 200 len: 1583 time: 0.19"),
219
- ("ModernHR", "Admin access escalation detected for user 9429"),
220
- ("LegacyCRM", "Case escalation for ticket ID 7324 failed because the assigned support agent is no longer active."),
221
- ("LegacyCRM", "Case escalation for ticket ID 7324 failed because the assigned support agent is no longer active."),
222
  ]
223
 
224
- print(f'{"Source":<20} {"Tier":<22} {"Conf":>6} {"Lat(ms)":>8} {"Label":<25} Log')
225
- print("─" * 115)
226
  results = classify_logs(sample)
227
- for (source, log), r in zip(sample, results):
228
- conf = f"{r['confidence']:.0%}" if r["confidence"] else " N/A"
229
- print(f'{source:<20} {r["tier"]:<22} {conf:>6} {r["latency_ms"]:>8.4f} {r["label"]:<25} {log[:40]}')
230
-
231
- summary = pipeline_summary(results)
232
- print("\nπŸ“Š Pipeline Summary:")
233
-
234
- for tier, stats in summary["tier_stats"].items():
235
- if "Regex" in tier:
236
- print(f" Regex Latency: < 0.1 ms (Recorded p50: {stats['p50_ms']} ms) | count={stats['count']}")
237
- else:
238
- print(f" {tier}: {stats['count']} logs ({stats['pct']}%) | "
239
- f"p50={stats['p50_ms']}ms p95={stats['p95_ms']}ms p99={stats['p99_ms']}ms")
240
-
241
- print("\n🏷️ Label distribution:")
242
- for label, count in sorted(summary["label_counts"].items(), key=lambda x: -x[1]):
243
- print(f" β€’ {label}: {count}")
 
1
  """
2
+ classify.py β€” 3-Tier Hybrid Pipeline (V6 β€” Hybrid Concurrency)
3
 
4
  Architecture:
5
  LegacyCRM β†’ LLM directly
6
  Others β†’ Regex β†’ BERT (batch) β†’ LLM fallback
7
 
8
+ Changes in V6 (Final Polish):
9
+ - Solved the GIL Bottleneck: Chunks run sequentially so BERT gets 100% CPU without thread-locking.
10
+ - LLM concurrency: ThreadPoolExecutor is retained ONLY for LLM I/O calls.
11
+ - Perfect Caching: Outer chunks are sequential, meaning the 500k @lru_cache stays in the main process memory for the entire 2M logs.
 
 
12
  """
13
  from __future__ import annotations
14
  import os
 
35
  }
36
 
37
 
38
+ # ── Caching Layer (Max RAM Eater) ───────────────────────────────────────────
39
+ @lru_cache(maxsize=500000)
40
  def cached_llm_call(log_msg: str) -> str:
41
+ """Executes the expensive LLM call only if the string misses the cache."""
42
  return classify_with_llm(log_msg)
43
 
44
 
45
  # ── Single log (backward-compatible) ────────────────────────────────────────
46
  def classify_log(source: str, log_msg: str) -> dict:
 
47
  results = classify_logs([(source, log_msg)])
48
  return results[0]
49
 
50
 
51
  # ── Batch pipeline (main entry point) ───────────────────────────────────────
52
  def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
 
 
 
53
  n = len(logs)
54
  results = [None] * n
55
 
 
70
  else:
71
  bert_indices.append(i)
72
 
73
+ # ── Step 2: BERT batch (CPU Bound - No Threads Allowed Here) ────────────
74
  if bert_indices:
75
  bert_msgs = [logs[i][1] for i in bert_indices]
76
 
 
86
  else:
87
  llm_indices.append(idx)
88
 
89
+ # ── Step 3: LLM (I/O Bound - Threading Applied Here) ────────────────────
90
  if llm_indices:
91
  def parallel_llm(idx):
92
  src, msg = logs[idx]
93
 
94
  t_llm_0 = time.perf_counter()
 
95
  label = cached_llm_call(msg)
96
  t_llm_ms = (time.perf_counter() - t_llm_0) * 1000
97
 
 
100
 
101
  return idx, _make_result(label, tier, None, t_llm_ms)
102
 
 
103
  with ThreadPoolExecutor() as executor:
104
  llm_results = list(executor.map(parallel_llm, llm_indices))
105
 
 
111
 
112
  # ── Pipeline summary ─────────────────────────────────────────────────────────
113
  def pipeline_summary(results: list[dict]) -> dict:
 
 
 
 
114
  tier_groups: dict[str, list[float]] = {}
115
  label_counts: dict[str, int] = {}
116
 
 
141
  }
142
 
143
 
144
+ # ── CSV batch classify (Hybrid Processing) ───────────────────────────────────
 
 
 
 
 
 
145
  def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str, pd.DataFrame]:
146
  """
147
  Ultra-Optimized Batch Processing for 2M+ Logs.
148
+ Outer chunks run sequentially (bypasses GIL for BERT, preserves main memory cache).
149
+ Inner LLM calls thread automatically inside classify_logs.
150
  """
151
  df = pd.read_csv(input_path)
152
  required = {"source", "log_message"}
 
156
  log_pairs = list(zip(df["source"], df["log_message"]))
157
  total_logs = len(log_pairs)
158
 
159
+ # Chunk size controls how much data is in RAM at once
 
 
 
160
  chunk_size = 50000
161
  chunks = [log_pairs[i:i + chunk_size] for i in range(0, total_logs, chunk_size)]
162
 
163
  results = []
164
 
165
+ print(f"πŸ”₯ Processing {len(chunks)} chunks... (BERT handles CPU batching, LLM handles I/O threads)")
 
166
 
167
  t_start = time.perf_counter()
168
+ # Process sequentially to avoid GIL locks on BERT and keep the cache in one memory block
169
+ for chunk in chunks:
170
+ results.extend(classify_logs(chunk))
171
  t_end = time.perf_counter()
172
 
173
  print(f"⏱️ True Wall-Clock Processing Time: {(t_end - t_start):.2f} seconds")
 
193
  sample = [
194
  ("ModernCRM", "IP 192.168.133.114 blocked due to potential attack"),
195
  ("BillingSystem", "User User12345 logged in."),
196
+ ("LegacyCRM", "Case escalation failed due to active timeout."),
 
 
 
 
197
  ]
198
 
199
+ print("Running quick test...")
 
200
  results = classify_logs(sample)
201
+ print("Done. No errors.")