File size: 8,316 Bytes
2222383
1a9b340
 
 
 
 
 
 
 
 
 
2222383
 
3f1a1d6
2222383
 
 
f2c6de4
7d3f899
2222383
 
 
 
b90444b
 
2222383
1a9b340
2222383
 
 
 
 
 
f2c6de4
2222383
 
1a9b340
 
 
b90444b
1a9b340
f2c6de4
 
1a9b340
 
8ca0e43
 
 
 
1a9b340
 
2222383
 
 
 
1a9b340
2222383
 
 
 
 
 
 
de30f06
2222383
de30f06
2222383
de30f06
 
2222383
 
 
1a9b340
2222383
 
1a9b340
2222383
 
 
 
 
 
 
 
 
 
 
 
1a9b340
217890c
 
 
1a9b340
217890c
b90444b
217890c
f2c6de4
 
 
 
217890c
 
1a9b340
 
 
 
 
2222383
 
 
1a9b340
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2d11b15
1a9b340
2d11b15
 
1a9b340
 
2222383
1a9b340
 
 
2222383
 
 
1a9b340
2222383
 
3f1a1d6
 
1a9b340
 
 
 
 
 
 
2d11b15
 
 
 
1a9b340
3f1a1d6
ca8312a
1a9b340
2d11b15
 
b90444b
2d11b15
b90444b
2222383
f2c6de4
 
 
 
2222383
 
 
 
 
 
 
1a9b340
 
ace5ccf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
"""
classify.py — 3-Tier Hybrid Pipeline (V9 — Balanced CPU & Gradio Safe)

Architecture:
  LegacyCRM → LLM directly
  Others    → Regex → BERT (batch) → LLM fallback

Changes in V9:
  - Fixed CPU Starvation: Limited max_workers to half the CPU cores to prevent Gradio WebSocket timeouts.
  - Reduced IPC Overhead: Lowered chunk_size to 10,000 to prevent CPU lockups during cross-process data pickling.
  - Restored Multi-processing: Outer chunks use ProcessPoolExecutor for speed, inner LLM calls use ThreadPoolExecutor.
"""
from __future__ import annotations
import os
import time
import statistics
import pandas as pd
from functools import lru_cache
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from processor_regex import classify_with_regex
from processor_bert  import classify_batch as bert_batch
from processor_llm   import classify_with_llm

# ── Config ──────────────────────────────────────────────────────────────────
# Source name whose logs skip the regex and BERT tiers and are sent straight
# to the LLM (compared by equality in classify_logs). Can be overridden with
# the LEGACY_SOURCE environment variable; defaults to "LegacyCRM".
LEGACY_SOURCE = os.getenv("LEGACY_SOURCE", "LegacyCRM")


# ── Result type ─────────────────────────────────────────────────────────────
def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
    """Build the result record for a single classified log.

    Args:
        label: Predicted label.
        tier: Name of the pipeline tier that produced the label.
        confidence: Score attached to the prediction; stored as given
            (callers in this module pass a number or ``None``).
        latency_ms: Time attributed to this log, in milliseconds.

    Returns:
        A dict with the keys ``label``, ``tier``, ``confidence`` and
        ``latency_ms``; the latency is rounded to 4 decimal places.
    """
    rounded_latency = round(latency_ms, 4)
    return dict(
        label=label,
        tier=tier,
        confidence=confidence,
        latency_ms=rounded_latency,
    )


# ── Caching Layer (Sharded per Worker) ──────────────────────────────────────
@lru_cache(maxsize=500000)
def cached_llm_call(log_msg: str) -> str:
    """Return the LLM label for ``log_msg``, memoised on the exact message text.

    ``classify_with_llm`` is only invoked when ``log_msg`` is not already in
    the LRU cache (up to 500,000 entries). The cache is held in memory by
    ``functools.lru_cache``, so each process that imports this module keeps
    its own entries; worker processes do not share hits with each other.

    Args:
        log_msg: The log message to classify; also the cache key.

    Returns:
        Whatever ``classify_with_llm`` returned for this message.
    """
    return classify_with_llm(log_msg)


# ── Single log (backward-compatible) ────────────────────────────────────────
def classify_log(source: str, log_msg: str) -> dict:
    """Classify one log line by running it through the batch pipeline.

    Args:
        source: Name of the system that emitted the log.
        log_msg: The log message text.

    Returns:
        The single result dict produced by ``classify_logs`` for this log.
    """
    single_batch = [(source, log_msg)]
    return classify_logs(single_batch)[0]


# ── Batch pipeline (main entry point) ───────────────────────────────────────
def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
    """Classify a batch of ``(source, log_message)`` pairs through the tiers.

    Routing:
      * Logs whose source equals ``LEGACY_SOURCE`` go directly to the LLM.
      * Every other log is tried against the regex classifier first.
      * Regex misses are sent to BERT in one batch.
      * Logs that BERT labels ``"Unclassified"`` fall back to the LLM.

    Args:
        logs: ``(source, log_message)`` tuples.

    Returns:
        One result dict per input, in input order, as built by
        ``_make_result``. The ``tier`` value is one of ``"Regex"``,
        ``"BERT"``, ``"LLM (Cache Hit)"``, ``"LLM (API Call)"``,
        ``"LLM (fallback) (Cache Hit)"`` or ``"LLM (fallback) (API Call)"``.
        ``confidence`` is ``1.0`` for regex, the BERT score for BERT, and
        ``None`` for LLM results.

    Raises:
        ValueError: If the BERT batch returns a different number of
            predictions than the number of messages submitted to it.
    """
    n       = len(logs)
    results = [None] * n

    # ── Step 1: Route to groups ─────────────────────────────────────────────
    llm_indices  = []
    bert_indices = []

    for i, (source, log_msg) in enumerate(logs):
        if source == LEGACY_SOURCE:
            llm_indices.append(i)
            continue

        t_start = time.perf_counter()
        label = classify_with_regex(log_msg)

        if label:
            latency_ms = (time.perf_counter() - t_start) * 1000
            results[i] = _make_result(label, "Regex", 1.0, latency_ms)
        else:
            bert_indices.append(i)

    # ── Step 2: BERT batch (CPU Bound) ──────────────────────────────────────
    if bert_indices:
        bert_msgs = [logs[i][1] for i in bert_indices]

        t_bert_start = time.perf_counter()
        bert_results = bert_batch(bert_msgs)
        t_bert_end   = time.perf_counter()

        # zip() stops at the shorter input, so a short BERT response would
        # silently leave None entries in `results`. Fail loudly instead.
        bert_results = list(bert_results)
        if len(bert_results) != len(bert_msgs):
            raise ValueError(
                f"BERT returned {len(bert_results)} predictions for "
                f"{len(bert_msgs)} messages"
            )

        # The batch is timed as a whole; each log is charged an equal share.
        bert_ms_per_log = (t_bert_end - t_bert_start) * 1000 / len(bert_msgs)

        for idx, (label, conf) in zip(bert_indices, bert_results):
            if label != "Unclassified":
                results[idx] = _make_result(label, "BERT", conf, bert_ms_per_log)
            else:
                llm_indices.append(idx)

    # ── Step 3: LLM (I/O Bound - Threading Applied Here) ────────────────────
    if llm_indices:
        # Heuristic: a call that returns in under this many milliseconds is
        # reported as a cache hit; anything slower is reported as an API call.
        cache_hit_threshold_ms = 5

        def parallel_llm(idx):
            src, msg = logs[idx]

            t_llm_0 = time.perf_counter()
            label = cached_llm_call(msg)
            t_llm_ms = (time.perf_counter() - t_llm_0) * 1000

            base_tier = "LLM" if src == LEGACY_SOURCE else "LLM (fallback)"
            if t_llm_ms < cache_hit_threshold_ms:
                tier = f"{base_tier} (Cache Hit)"
            else:
                tier = f"{base_tier} (API Call)"

            return idx, _make_result(label, tier, None, t_llm_ms)

        with ThreadPoolExecutor() as executor:
            llm_results = list(executor.map(parallel_llm, llm_indices))

        for idx, res in llm_results:
            results[idx] = res

    return results


# ── Pipeline summary ─────────────────────────────────────────────────────────
def pipeline_summary(results: list[dict]) -> dict:
    """Aggregate per-tier latency statistics and per-label counts.

    Args:
        results: Result dicts carrying at least the keys ``tier``,
            ``latency_ms`` and ``label``.

    Returns:
        A dict with:
          * ``total``: number of results;
          * ``tier_stats``: for each tier, ``count``, ``pct`` (share of all
            results, one decimal), ``p50_ms``, ``p95_ms``, ``p99_ms``,
            ``mean_ms`` and ``total_ms`` (all rounded to 4 decimals);
          * ``label_counts``: number of results per label.
    """
    latencies_by_tier: dict[str, list[float]] = {}
    counts_by_label: dict[str, int] = {}

    for entry in results:
        tier_name = entry["tier"]
        if tier_name not in latencies_by_tier:
            latencies_by_tier[tier_name] = []
        latencies_by_tier[tier_name].append(entry["latency_ms"])

        label = entry["label"]
        counts_by_label[label] = counts_by_label.get(label, 0) + 1

    total = len(results)

    def percentile(ordered, fraction):
        # Picks the element at index int(size * fraction), clamped to the last one.
        size = len(ordered)
        position = min(int(size * fraction), size - 1)
        return ordered[position]

    def summarise(samples):
        ordered = sorted(samples)
        count = len(ordered)
        return {
            "count":    count,
            "pct":      round(count / total * 100, 1),
            "p50_ms":   round(statistics.median(ordered), 4),
            "p95_ms":   round(percentile(ordered, 0.95), 4),
            "p99_ms":   round(percentile(ordered, 0.99), 4),
            "mean_ms":  round(statistics.mean(ordered), 4),
            "total_ms": round(sum(ordered), 4),
        }

    tier_stats = {
        tier_name: summarise(samples)
        for tier_name, samples in latencies_by_tier.items()
    }

    return {
        "total":        total,
        "tier_stats":   tier_stats,
        "label_counts": counts_by_label,
    }


# ── Multiprocessing Helper ───────────────────────────────────────────────────
def _process_chunk(chunk: list[tuple[str, str]]) -> list[dict]:
    """Classify one chunk of logs; the callable handed to ProcessPoolExecutor.

    Defined at module top level because it is the function mapped over chunks
    by the process pool in ``classify_csv``.

    Args:
        chunk: ``(source, log_message)`` tuples for this chunk.

    Returns:
        The list returned by ``classify_logs`` for the chunk.
    """
    return classify_logs(chunk)


# ── CSV batch classify (Balanced Processing) ─────────────────────────────────
def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str, pd.DataFrame]:
    """Classify every row of a CSV file and write the annotated result.

    The rows are split into fixed-size chunks that are classified in a
    process pool limited to half of the detected CPU cores.

    Args:
        input_path: CSV file containing at least the columns ``source`` and
            ``log_message``.
        output_path: Where the annotated CSV is written.

    Returns:
        ``(output_path, df)`` where ``df`` is the input frame with the added
        columns ``predicted_label``, ``tier_used``, ``latency_ms`` and
        ``confidence`` (formatted as a percentage, or ``"N/A"`` when the
        result carries no confidence).

    Raises:
        ValueError: If a required column is missing from the CSV.
    """
    df = pd.read_csv(input_path)
    required = {"source", "log_message"}
    if not required.issubset(df.columns):
        raise ValueError(f"Missing required columns in CSV. Expected: {required}. Found: {set(df.columns)}")

    log_pairs = list(zip(df["source"], df["log_message"]))
    total_logs = len(log_pairs)

    # Use half of the available cores (minimum 1) so the rest stay free for
    # the UI and the OS. os.cpu_count() returns None when the count cannot be
    # determined, so fall back to 1 instead of failing on `None // 2`.
    detected_cores = os.cpu_count() or 1
    safe_cores = max(1, detected_cores // 2)

    # Chunks are kept at 10,000 rows to limit the cost of pickling data
    # between processes.
    chunk_size = 10000
    chunks = [log_pairs[i:i + chunk_size] for i in range(0, total_logs, chunk_size)]

    results = []

    print(f"πŸ”₯ Firing up {safe_cores} CPU Cores (Leaving remaining for UI)...")

    t_start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=safe_cores) as executor:
        # executor.map yields chunk results in submission order, so the
        # flattened list lines up with the rows of `df`.
        for chunk_result in executor.map(_process_chunk, chunks):
            results.extend(chunk_result)
    t_end = time.perf_counter()

    print(f"⏱️ True Wall-Clock Processing Time: {(t_end - t_start):.2f} seconds")

    df["predicted_label"] = [r["label"]       for r in results]
    df["tier_used"]       = [r["tier"]        for r in results]
    df["latency_ms"]      = [r["latency_ms"]  for r in results]
    df["confidence"]      = [
        f"{r['confidence']:.1%}" if r["confidence"] is not None else "N/A"
        for r in results
    ]

    df.to_csv(output_path, index=False)
    return output_path, df


# Aliases
# `classify` is bound to the same function object as `classify_logs`.
classify = classify_logs