File size: 8,896 Bytes
2222383
6ffecdd
2222383
 
 
 
 
6ffecdd
 
 
 
2222383
 
3f1a1d6
2222383
 
 
f2c6de4
6ffecdd
2222383
 
 
 
b90444b
 
2222383
 
 
 
 
 
 
 
f2c6de4
2222383
 
 
6ffecdd
a65e2b6
b90444b
a65e2b6
f2c6de4
 
 
2222383
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
de30f06
2222383
de30f06
2222383
de30f06
 
2222383
 
 
6ffecdd
2222383
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a65e2b6
217890c
 
 
f2c6de4
217890c
b90444b
217890c
f2c6de4
 
 
 
217890c
 
b90444b
217890c
 
 
 
2222383
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f2c6de4
 
 
 
3f1a1d6
2222383
 
 
 
 
 
 
 
 
6ffecdd
 
 
 
 
 
a65e2b6
2222383
 
3f1a1d6
6ffecdd
 
2222383
 
 
 
de30f06
2222383
 
3f1a1d6
 
6ffecdd
3f1a1d6
 
 
 
 
6ffecdd
b90444b
 
6ffecdd
 
 
 
b90444b
 
 
2222383
f2c6de4
 
 
 
2222383
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a65e2b6
2222383
 
a65e2b6
2222383
a65e2b6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
"""
classify.py — 3-Tier Hybrid Pipeline (V7 — Maximum Speed & Sharded Caching)

Architecture:
  LegacyCRM → LLM directly
  Others    → Regex → BERT (batch) → LLM fallback

Changes in V7:
  - Unfroze the Gradio UI and restored Processing Speed: Brought back ProcessPoolExecutor for the outer CSV chunks to utilize ALL CPU cores.
  - LLM concurrency: ThreadPoolExecutor is retained inside classify_logs specifically for LLM I/O calls.
  - Cache Architecture: Using a "Sharded Cache" approach. Each CPU worker process gets its own 500k @lru_cache, which is perfectly safe for 18GB RAM and avoids GIL locks entirely.
"""
from __future__ import annotations
import os
import time
import statistics
import pandas as pd
from functools import lru_cache
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from processor_regex import classify_with_regex
from processor_bert  import classify_batch as bert_batch
from processor_llm   import classify_with_llm

# ── Config ──────────────────────────────────────────────────────────────────
# Source name whose logs bypass Regex/BERT and go straight to the LLM tier.
# Overridable through the LEGACY_SOURCE environment variable.
LEGACY_SOURCE = os.environ.get("LEGACY_SOURCE", "LegacyCRM")


# ── Result type ─────────────────────────────────────────────────────────────
def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
    return {
        "label":      label,
        "tier":       tier,
        "confidence": confidence,
        "latency_ms": round(latency_ms, 4), 
    }


# ── Caching Layer (Sharded per CPU Core) ────────────────────────────────────
@lru_cache(maxsize=500_000)
def cached_llm_call(log_msg: str) -> str:
    """Memoized front for ``classify_with_llm``.

    Up to 500,000 distinct messages are remembered; a message that is already
    cached returns its stored label without invoking ``classify_with_llm``
    again. The argument must be hashable because it is used as the cache key.
    """
    llm_label = classify_with_llm(log_msg)
    return llm_label


# ── Single log (backward-compatible) ────────────────────────────────────────
def classify_log(source: str, log_msg: str) -> dict:
    """Classify one log by sending it through the batch pipeline as a one-item batch.

    Returns the single result dict produced by ``classify_logs``.
    """
    single_item_batch = [(source, log_msg)]
    return classify_logs(single_item_batch)[0]


# ── Batch pipeline (main entry point) ───────────────────────────────────────
def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
    """Classify a batch of ``(source, log_message)`` pairs.

    Routing:
      * logs whose source equals ``LEGACY_SOURCE`` go straight to the LLM;
      * every other log is tried against the regex classifier first, then the
        BERT batch classifier, and only falls back to the LLM when BERT
        answers ``"Unclassified"``.

    Args:
        logs: Sequence of ``(source, log_message)`` tuples.

    Returns:
        One result dict per input log, in input order, as built by
        ``_make_result`` (keys ``label``, ``tier``, ``confidence``,
        ``latency_ms``). An empty input yields an empty list.

    Any exception raised by a classifier propagates to the caller.
    """
    results = [None] * len(logs)

    # Step 1: route each log. Legacy sources skip regex/BERT entirely.
    llm_indices = []
    bert_indices = []

    for i, (source, log_msg) in enumerate(logs):
        if source == LEGACY_SOURCE:
            llm_indices.append(i)
            continue

        t_start = time.perf_counter()
        label = classify_with_regex(log_msg)

        if label:
            latency_ms = (time.perf_counter() - t_start) * 1000
            results[i] = _make_result(label, "Regex", 1.0, latency_ms)
        else:
            bert_indices.append(i)

    # Step 2: one BERT call for everything regex could not label.
    if bert_indices:
        bert_msgs = [logs[i][1] for i in bert_indices]

        t_bert_start = time.perf_counter()
        bert_results = bert_batch(bert_msgs)
        t_bert_end = time.perf_counter()

        # The batch is timed as a whole, so every log is charged the average.
        bert_ms_per_log = (t_bert_end - t_bert_start) * 1000 / len(bert_msgs)

        for idx, (label, conf) in zip(bert_indices, bert_results):
            if label != "Unclassified":
                results[idx] = _make_result(label, "BERT", conf, bert_ms_per_log)
            else:
                llm_indices.append(idx)

    # Step 3: LLM tier, fanned out over threads.
    if llm_indices:
        def timed_llm(msg):
            """Return ``(label, elapsed_ms)`` for one cached LLM lookup."""
            t_llm_0 = time.perf_counter()
            label = cached_llm_call(msg)
            return label, (time.perf_counter() - t_llm_0) * 1000

        # lru_cache does not coalesce concurrent calls for the same key: two
        # threads that miss on the same message would both hit the LLM. Send
        # each distinct message to the pool exactly once (first-seen order).
        unique_msgs = list(dict.fromkeys(logs[i][1] for i in llm_indices))
        with ThreadPoolExecutor() as executor:
            first_calls = dict(zip(unique_msgs, executor.map(timed_llm, unique_msgs)))

        for idx in llm_indices:
            src, msg = logs[idx]

            # The first occurrence of a message owns the pooled timing; any
            # repeat is looked up again and is now served from the cache.
            timed = first_calls.pop(msg, None)
            if timed is None:
                timed = timed_llm(msg)
            label, t_llm_ms = timed

            base_tier = "LLM" if src == LEGACY_SOURCE else "LLM (fallback)"
            # Timing heuristic, not a real cache signal: anything answered in
            # under 5 ms is reported as a cache hit.
            tier = f"{base_tier} (Cache Hit)" if t_llm_ms < 5 else f"{base_tier} (API Call)"

            results[idx] = _make_result(label, tier, None, t_llm_ms)

    return results


# ── Pipeline summary ─────────────────────────────────────────────────────────
def pipeline_summary(results: list[dict]) -> dict:
    """Aggregate per-tier latency statistics and per-label counts.

    Args:
        results: Result dicts carrying at least ``tier``, ``latency_ms`` and
            ``label`` keys.

    Returns:
        A dict with ``total`` (number of results), ``tier_stats`` (per tier:
        ``count``, ``pct`` of all results, ``p50_ms``, ``p95_ms``, ``p99_ms``,
        ``mean_ms``, ``total_ms``) and ``label_counts`` (label -> occurrences).
    """
    latencies_by_tier: dict[str, list[float]] = {}
    label_counts: dict[str, int] = {}

    for entry in results:
        tier_name = entry["tier"]
        if tier_name not in latencies_by_tier:
            latencies_by_tier[tier_name] = []
        latencies_by_tier[tier_name].append(entry["latency_ms"])

        label_name = entry["label"]
        label_counts[label_name] = label_counts.get(label_name, 0) + 1

    total = len(results)

    def value_at(ordered, fraction):
        # Index is floor(len * fraction), clamped to the last element.
        position = min(int(len(ordered) * fraction), len(ordered) - 1)
        return round(ordered[position], 4)

    tier_stats = {}
    for tier_name, raw_latencies in latencies_by_tier.items():
        ordered = sorted(raw_latencies)
        count = len(ordered)
        tier_stats[tier_name] = dict(
            count=count,
            pct=round(count / total * 100, 1),
            p50_ms=round(statistics.median(ordered), 4),
            p95_ms=value_at(ordered, 0.95),
            p99_ms=value_at(ordered, 0.99),
            mean_ms=round(statistics.mean(ordered), 4),
            total_ms=round(sum(ordered), 4),
        )

    return dict(total=total, tier_stats=tier_stats, label_counts=label_counts)


# ── Multiprocessing Helper ───────────────────────────────────────────────────
def _process_chunk(chunk: list[tuple[str, str]]) -> list[dict]:
    """Run ``classify_logs`` on one chunk.

    Kept as a module-level function so it can be handed to
    ``ProcessPoolExecutor.map`` in ``classify_csv``.
    """
    chunk_results = classify_logs(chunk)
    return chunk_results


# ── CSV batch classify (Hybrid Processing) ───────────────────────────────────
def classify_csv(
    input_path: str,
    output_path: str = "output.csv",
    *,
    chunk_size: int = 50000,
) -> tuple[str, pd.DataFrame]:
    """Classify every row of a CSV and write the annotated copy to disk.

    The CSV is split into chunks of ``chunk_size`` rows. Chunks are classified
    in worker processes via ``ProcessPoolExecutor`` (one fewer than the
    detected CPU count, minimum one); inside each worker ``classify_logs``
    handles its own LLM threading.

    Args:
        input_path: CSV file containing ``source`` and ``log_message`` columns.
        output_path: Destination for the annotated CSV.
        chunk_size: Rows handed to a worker per task. Must be >= 1.

    Returns:
        ``(output_path, df)`` where ``df`` is the input frame plus the columns
        ``predicted_label``, ``tier_used``, ``latency_ms`` and ``confidence``
        (formatted as a percentage, or ``"N/A"`` when no confidence exists).

    Raises:
        ValueError: If ``chunk_size`` is below 1 or a required column is
            missing from the CSV.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    df = pd.read_csv(input_path)
    required = {"source", "log_message"}
    if not required.issubset(df.columns):
        raise ValueError(f"Missing required columns in CSV. Expected: {required}. Found: {set(df.columns)}")

    log_pairs = list(zip(df["source"], df["log_message"]))
    total_logs = len(log_pairs)

    # os.cpu_count() returns None when the count cannot be determined; treat
    # that as a single core instead of crashing on `None - 1`.
    max_cores = max(1, (os.cpu_count() or 1) - 1)
    chunks = [log_pairs[i:i + chunk_size] for i in range(0, total_logs, chunk_size)]

    results = []

    print(f"🔥 Firing up {max_cores} Process Cores... (BERT gets raw CPU, LLM gets Threads)")

    t_start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=max_cores) as executor:
        # executor.map yields chunk results in submission order, so the
        # flattened list stays aligned with the DataFrame rows.
        for chunk_result in executor.map(_process_chunk, chunks):
            results.extend(chunk_result)
    t_end = time.perf_counter()

    print(f"⏱️ True Wall-Clock Processing Time: {(t_end - t_start):.2f} seconds")

    df["predicted_label"] = [r["label"] for r in results]
    df["tier_used"] = [r["tier"] for r in results]
    df["latency_ms"] = [r["latency_ms"] for r in results]
    df["confidence"] = [
        f"{r['confidence']:.1%}" if r["confidence"] is not None else "N/A"
        for r in results
    ]

    df.to_csv(output_path, index=False)
    return output_path, df


# Alias: `classify` is bound to the same function object as `classify_logs`,
# so callers can use either name for the batch entry point.
classify = classify_logs


# ── Self-test ────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Smoke test: push three sample logs through the batch pipeline. It only
    # checks that the call completes; the results are not inspected.
    print("Running quick test...")

    sample = [
        ("ModernCRM", "IP 192.168.133.114 blocked due to potential attack"),
        ("BillingSystem", "User User12345 logged in."),
        ("LegacyCRM", "Case escalation failed due to active timeout."),
    ]
    results = classify_logs(sample)

    print("Done. No errors.")