File size: 24,089 Bytes
1ac9f32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
import os
import re
from urllib.parse import urlparse
from itertools import chain
import pandas as pd
import numpy as np
from transformers import pipeline
import torch
import emoji
from collections import Counter

# Pre-compiled Regex Patterns for Analytics (V5.4 Optimization)
# Module-level compilation avoids redundant overhead in high-traffic request cycles.
# NOTE(review): all patterns are unanchored substring alternations, so short
# alternatives like 'k', 'ok' or 'we' also match inside longer words
# (e.g. 'thanks' hits DISMISSIVE_RE via 'k') — confirm the loose matching
# is intentional before adding \b word boundaries, as that would shift counts.
STRESS_RE = re.compile(r'work|tired|sad|stressed|deadline|exhausted|unhappy|worry|anxious|sick|bad day|hard time')
AFFIRMATIVE_RE = re.compile(r'love|thanks|happy|we|miss|appreciate|glad|proud|beautiful|care')
DISMISSIVE_RE = re.compile(r'whatever|fine|okay|sure|k|ok|busy|tired|idk|anyway')

# Topic-specific regexes (shared across connection types by calculate_topic_mix)
LOGISTICS_RE = re.compile(r'dinner|lunch|bill|home|work|done|todo|buy|shop|cleaning')
EXTERNAL_RE = re.compile(r'friends|party|movie|news|gym|weather|job')
CONFLICT_RE = re.compile(r'sorry|why|fight|angry|stop|listen|mean|hurt|annoyed')
INTIMACY_RE = re.compile(r'love|miss|baby|darling|honey|kiss|hug|beautiful|forever')
BONDING_RE = re.compile(r'miss|love|haha|lol|fun|crazy|remember|bro|dude|bestie')
COLLABORATION_RE = re.compile(r'help|thanks|appreciate|great|good job|team|meeting|sync')
FALLBACK_BONDING_RE = re.compile(r'miss|care|fun')

# Process-wide singleton for the sentiment model; populated lazily by
# get_sentiment_pipeline() on first use.
sentiment_pipeline = None

def get_sentiment_pipeline():
    """Return the shared Hinglish sentiment pipeline, loading it on first use.

    The model runs on CPU (device=-1) and its Linear layers are dynamically
    quantized to int8, so every caller reuses one memory-efficient instance.
    """
    global sentiment_pipeline
    if sentiment_pipeline is not None:
        return sentiment_pipeline

    print("Loading and quantizing Hinglish sentiment model...")
    source = "pascalrai/hinglish-twitter-roberta-base-sentiment"

    # Prefer a pre-downloaded model directory (Docker); otherwise let
    # HuggingFace resolve the model name via its cache (local dev).
    local_dir = os.environ.get("MODEL_DIR")
    if local_dir and os.path.isdir(local_dir):
        print(f"Loading model from local directory: {local_dir}")
        source = local_dir

    pipe = pipeline("sentiment-analysis", model=source, device=-1)
    # Dynamic quantization of Linear layers for 50% RAM reduction.
    pipe.model = torch.quantization.quantize_dynamic(
        pipe.model,
        {torch.nn.Linear},
        dtype=torch.qint8
    )
    sentiment_pipeline = pipe
    print("Model loaded successfully.")
    return sentiment_pipeline

def validate_cloud_url(url: str) -> bool:
    """Return True only for secure Lightning AI endpoints (https://*.lit.ai).

    SSRF guard: rejects empty URLs, URLs embedding credentials via '@',
    non-HTTPS schemes, and any host outside the *.lit.ai domain.
    """
    if not url:
        return False
    # 🛡️ Sentinel: credential-style URLs (https://trusted@evil.com) are
    # rejected outright rather than relying on parser behavior.
    if '@' in url:
        return False
    try:
        # hostname (unlike netloc) safely strips ports and userinfo.
        host = urlparse(url).hostname
        if not host:
            return False
        return urlparse(url).scheme == 'https' and host.endswith('.lit.ai')
    except Exception:
        return False

def calculate_latency(df: pd.DataFrame) -> pd.DataFrame:
    """Annotate df with 'gap_mins' and 'latency_mins' columns.

    Assumes df is already sorted by timestamp with a fresh RangeIndex
    (run_analytics_pipeline guarantees this).  A row counts as a reply when
    its sender differs from the previous row AND the gap is at most 24 hours;
    only reply rows receive a non-NaN 'latency_mins'.
    """
    prev_sender = df['sender'].shift(1)
    prev_ts = df['timestamp'].shift(1)

    df['gap_mins'] = (df['timestamp'] - prev_ts).dt.total_seconds() / 60.0

    # Valid reply: different sender, gap <= 24 hours (1440 mins).
    is_reply = (df['sender'] != prev_sender) & (df['gap_mins'] <= 1440)
    df['latency_mins'] = df['gap_mins'].where(is_reply)

    # gap_mins is deliberately kept: initiator-ratio and re-engagement
    # calculations read it downstream.
    return df

def apply_sentiment(df: pd.DataFrame, hf_url: str = "", text_str: pd.Series = None) -> pd.DataFrame:
    """Score PARTNER messages with sentiment in {-1, 0, 1}.

    Adds a 'sentiment' column to df (ME rows stay 0).  When hf_url is given,
    inference is offloaded to a validated https://*.lit.ai endpoint in
    parallel chunks with retries and deliberately NO local fallback; when it
    is empty, the quantized local pipeline scores messages in batches.

    Args:
        df: Message frame with 'sender' and 'text'; index must be aligned
            with text_str (run_analytics_pipeline resets it at entry).
        hf_url: Optional cloud GPU base URL; must pass validate_cloud_url.
        text_str: Optional pre-computed df['text'].astype(str).

    Returns:
        The same df, mutated with the 'sentiment' column.

    Raises:
        ValueError: if hf_url fails SSRF validation.
        RuntimeError: if any cloud chunk fails after all retries.
    """
    # We only score PARTNER messages for the risk algorithm
    partner_mask = df['sender'] == 'PARTNER'
    
    # ⚡ Bolt Optimization: Use pre-calculated text_str if provided to avoid redundant astype(str)
    # Note: when text_str is None the mask is applied during the astype slice,
    # so only one of the two branches below masks again.
    t_series = text_str if text_str is not None else df.loc[partner_mask, 'text'].astype(str)
    # Truncate to 512 chars to bound tokenizer/payload cost per message.
    partner_msgs_series = t_series[partner_mask].str[:512] if text_str is not None else t_series.str[:512]
    partner_msgs = partner_msgs_series.tolist()
    
    sentiment_scores = []
    
    if hf_url:
        # 🛡️ Sentinel: Validate URL to prevent SSRF
        if not validate_cloud_url(hf_url):
            raise ValueError("Security Error: Invalid cloud GPU URL. Must be a secure https://*.lit.ai endpoint.")

        print(f"Offloading sentiment analysis of {len(partner_msgs)} messages to Cloud GPU...")
        import requests
        import concurrent.futures
        import time as _time
        
        # Ensure URL has /analyze endpoint precisely once
        base_url = hf_url.rstrip('/').replace('/analyze', '')
        api_endpoint = base_url + "/analyze"
        
        chunk_size = 1500 # Send in batches of 1500 to prevent payload too large/timeouts
        total_chunks = (len(partner_msgs) + chunk_size - 1) // chunk_size
        
        # Pre-fill with neutral scores; each chunk writes into its own slice.
        sentiment_scores = [0] * len(partner_msgs)
        
        MAX_RETRIES = 3
        BASE_TIMEOUT = 120  # seconds; increased from 90 to handle cold starts
        
        def fetch_chunk(chunk, chunk_index, start_idx):
            """Send a chunk to the Cloud GPU. Retries up to MAX_RETRIES on failure.

            Returns (start_idx, scores) so the caller can place results
            regardless of completion order.
            """
            last_error = None
            for attempt in range(1, MAX_RETRIES + 1):
                # Timeout grows per attempt: 120s, 180s, 240s.
                timeout = BASE_TIMEOUT + (attempt - 1) * 60  # 120s, 180s, 240s
                try:
                    print(f"  Chunk {chunk_index}/{total_chunks} ({len(chunk)} msgs) → Cloud GPU (attempt {attempt}/{MAX_RETRIES}, timeout={timeout}s)...")
                    response = requests.post(
                        api_endpoint,
                        json={"texts": chunk},
                        headers={"Content-Type": "application/json"},
                        timeout=timeout,
                        allow_redirects=False  # 🛡️ Sentinel: Prevent SSRF redirect bypass
                    )
                    response.raise_for_status()
                    result = response.json()
                    if "scores" in result:
                        print(f"  ✓ Chunk {chunk_index}/{total_chunks} completed.")
                        return start_idx, result["scores"]
                    else:
                        raise ValueError(f"Invalid API response format for chunk {chunk_index}: missing 'scores' key")
                except Exception as e:
                    last_error = e
                    if attempt < MAX_RETRIES:
                        # Linear backoff: 5s, 10s between attempts.
                        wait = 5 * attempt
                        print(f"  ✗ Chunk {chunk_index} attempt {attempt} failed ({e}). Retrying in {wait}s...")
                        _time.sleep(wait)
            # All retries exhausted — propagate the error (NO local fallback)
            raise RuntimeError(f"Chunk {chunk_index} failed after {MAX_RETRIES} attempts: {last_error}")
            
        chunks_data = []
        for i in range(0, len(partner_msgs), chunk_size):
            chunk = partner_msgs[i:i + chunk_size]
            chunk_index = (i // chunk_size) + 1
            chunks_data.append((chunk, chunk_index, i))
        
        try:
            # Up to 5 chunks in flight at once; as_completed places results
            # by start_idx so out-of-order completion is safe.
            with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
                futures = [executor.submit(fetch_chunk, c, ci, si) for c, ci, si in chunks_data]
                for future in concurrent.futures.as_completed(futures):
                    start_idx, scores = future.result()
                    for idx, score in enumerate(scores):
                        if start_idx + idx < len(sentiment_scores):
                            sentiment_scores[start_idx + idx] = score
                             
        except Exception as e:
            print(f"CRITICAL: Cloud GPU offload failed: {e}.")
            # No local fallback when a cloud URL is provided — this prevents 70k+ messages from locking up the CPU.
            raise RuntimeError(f"Cloud Sentiment Analysis Failed: {e}. Check your Lightning Studio instance.")
            
    else:
        # ONLY run local scoring if NO cloud URL was provided at all
        if partner_msgs:
            pipe = get_sentiment_pipeline()
            batch_size = 32
            print(f"Scoring {len(partner_msgs)} messages locally (pipeline batch_size={batch_size})...")

            try:
                # Performance Optimization (V5.2): Leverage Transformers native batching.
                results = pipe(partner_msgs, batch_size=batch_size)
                
                # Performance Optimization (V5.3): Replaced multiple str.contains scans
                # and np.select with a direct O(1) dictionary map for label-to-score conversion.
                labels = pd.Series([r['label'].lower() for r in results])

                # Unknown labels fall back to neutral (0) via fillna below.
                label_map = {
                    'label_0': -1, 'negative': -1,
                    'label_1': 0, 'neutral': 0,
                    'label_2': 1, 'positive': 1
                }
                sentiment_scores = labels.map(label_map).fillna(0).astype(int).tolist()

            except Exception as e:
                # Best-effort: local failure degrades to all-neutral scores
                # rather than aborting the whole pipeline.
                print(f"Local sentiment analysis failed: {e}")
                sentiment_scores = [0] * len(partner_msgs)
                
    # ME rows keep the neutral default of 0; only PARTNER rows are scored.
    df['sentiment'] = 0
    if sentiment_scores:
        df.loc[partner_mask, 'sentiment'] = sentiment_scores
    
    return df

def aggregate_weekly(df: pd.DataFrame) -> pd.DataFrame:
    """Roll messages up into one row per calendar week (Monday-anchored).

    Returns a frame with week_start, volume (message count), median_latency,
    and mean_sentiment (PARTNER messages only); missing medians/means become
    0.  Side effect: adds a 'week_start' column to df.
    """
    df['week_start'] = df['timestamp'].dt.to_period('W').dt.start_time

    # Restrict sentiment to PARTNER rows so ME's always-zero scores don't
    # dilute the weekly mean; non-PARTNER rows become NaN and are ignored.
    df['_partner_sent'] = df['sentiment'].where(df['sender'] == 'PARTNER')

    weekly = (
        df.groupby('week_start')
          .agg(
              volume=('sentiment', 'size'),
              median_latency=('latency_mins', 'median'),
              mean_sentiment=('_partner_sent', 'mean'),
          )
          .reset_index()
    )

    # Remove the scratch column so df leaves this function clean.
    df.drop(columns=['_partner_sent'], inplace=True)

    weekly.fillna({'median_latency': 0, 'mean_sentiment': 0}, inplace=True)
    return weekly

def calculate_emoji_frequency(df: pd.DataFrame, text_str: pd.Series = None) -> dict:
    """Return the top-10 emojis (with counts) for each sender.

    Must be called BEFORE the privacy firewall drops message text.
    text_str, when given, is a pre-computed str-cast of df['text'] that
    avoids a redundant O(N) astype.
    """
    series = df['text'].astype(str) if text_str is None else text_str
    top_emojis = {}
    for who in ['ME', 'PARTNER']:
        msgs = series[df['sender'] == who]
        # Count every character in one C-level pass (chain + Counter), then
        # keep only the emoji characters — emoji.is_emoji runs once per
        # unique character rather than once per character occurrence.
        char_counts = Counter(chain.from_iterable(msgs))
        ranked = Counter(
            {ch: n for ch, n in char_counts.items() if emoji.is_emoji(ch)}
        ).most_common(10)
        top_emojis[who] = [{'emoji': ch, 'count': n} for ch, n in ranked]
    return top_emojis

def calculate_initiator_ratio(df: pd.DataFrame) -> dict:
    """Count conversation initiations per sender.

    An initiation is the first message of the whole history (index 0) or any
    message arriving after a gap of at least 4 hours.  Relies on the
    'gap_mins' column produced earlier by calculate_latency.
    """
    if len(df) < 2:
        return {'me_initiations': 0, 'partner_initiations': 0, 'me_ratio': 0.0}

    FOUR_HOURS_MINS = 240

    # Index 0 always initiates (its gap is NaN, which fails the >= test).
    starters = df.loc[(df['gap_mins'] >= FOUR_HOURS_MINS) | (df.index == 0), 'sender']

    me = int((starters == 'ME').sum())
    partner = int((starters == 'PARTNER').sum())
    total = me + partner
    ratio = round(me / total, 4) if total > 0 else 0.0

    return {
        'me_initiations': me,
        'partner_initiations': partner,
        'me_ratio': ratio
    }

def calculate_risk_score(weekly_df: pd.DataFrame) -> pd.DataFrame:
    """Compute the weekly risk score per PRD 2.0.

    risk = 0.5 * inverted sentiment + 0.3 * normalized latency
         + 0.2 * inverted normalized volume.
    Adds sentiment_inv / latency_norm / volume_inv columns and rounds the
    display metrics in place.  An empty frame is returned unchanged.
    """
    if weekly_df.empty:
        return weekly_df

    # Map mean sentiment from [-1 (bad), 1 (good)] onto risk [1, 0].
    weekly_df['sentiment_inv'] = (1 - weekly_df['mean_sentiment']) / 2.0

    # Min-max normalize latency; a flat series contributes zero risk.
    lat = weekly_df['median_latency']
    lat_span = lat.max() - lat.min()
    weekly_df['latency_norm'] = (lat - lat.min()) / lat_span if lat_span > 0 else 0

    # Min-max normalize volume, then invert: low volume means high risk.
    vol = weekly_df['volume']
    vol_span = vol.max() - vol.min()
    weekly_df['volume_inv'] = 1.0 - (vol - vol.min()) / vol_span if vol_span > 0 else 0

    weekly_df['risk_score'] = (
        0.5 * weekly_df['sentiment_inv']
        + 0.3 * weekly_df['latency_norm']
        + 0.2 * weekly_df['volume_inv']
    )

    # Round metrics so the UI payload stays clean.
    weekly_df['risk_score'] = weekly_df['risk_score'].round(4)
    weekly_df['mean_sentiment'] = weekly_df['mean_sentiment'].round(4)
    weekly_df['median_latency'] = weekly_df['median_latency'].round(2)

    return weekly_df

def detect_risk_phases(weekly_df: pd.DataFrame) -> pd.DataFrame:
    """Attach a human-readable 'phase' label derived from each week's risk.

    Bands: <0.3 Honeymoon, <0.6 Stable, <0.85 Tension, otherwise Danger.
    """
    bands = ((0.3, 'Honeymoon'), (0.6, 'Stable'), (0.85, 'Tension'))

    def _label(score):
        for upper, name in bands:
            if score < upper:
                return name
        return 'Danger'

    if not weekly_df.empty:
        weekly_df['phase'] = weekly_df['risk_score'].apply(_label)
    return weekly_df

def calculate_power_dynamics(df: pd.DataFrame, text_str: pd.Series = None) -> dict:
    """Word-count ratio between ME and PARTNER (V3.0 power dynamics).

    power_ratio = ME words / PARTNER words; a value > 1 means ME dominates
    conversation volume.  Side effect: adds a 'word_count' column to df.
    Returns {} when df has no 'text' column.
    """
    if 'text' not in df.columns:
        return {}

    series = df['text'].astype(str) if text_str is None else text_str
    # \S+ counts whitespace-separated tokens in one vectorized pass.
    df['word_count'] = series.str.count(r'\S+')
    totals = df.groupby('sender')['word_count'].sum().to_dict()

    me_total = int(totals.get('ME', 0))
    partner_total = int(totals.get('PARTNER', 0))

    # Guard against division by zero when PARTNER never wrote anything.
    ratio = float(round(me_total / partner_total, 2)) if partner_total > 0 else 0.0

    return {
        'me_word_count': me_total,
        'partner_word_count': partner_total,
        'power_ratio': ratio
    }

def calculate_affection_friction(df: pd.DataFrame, text_lower: pd.Series = None) -> dict:
    """Tally affirmative vs dismissive language to surface burnout (V3.0).

    text_lower, when given, is a pre-lowercased str-cast of df['text'].
    Returns {} when df has no 'text' column.
    """
    if 'text' not in df.columns:
        return {}

    lowered = df['text'].astype(str).str.lower() if text_lower is None else text_lower

    # Pre-compiled module-level patterns keep this two vectorized scans.
    return {
        'affirmative_count': int(lowered.str.contains(AFFIRMATIVE_RE).sum()),
        'dismissive_count': int(lowered.str.contains(DISMISSIVE_RE).sum())
    }

def calculate_support_gap(df: pd.DataFrame, text_lower: pd.Series = None, text_str: pd.Series = None) -> dict:
    """Identify stress messages and measure partner's response quality (V4.0).

    A "stress" message is one matching STRESS_RE.  The other party earns a
    support credit when they send any message longer than 10 characters
    within 60 minutes of the most recent unanswered stress message; each
    stress event is credited at most once.

    Args:
        df: Sorted message frame with 'sender', 'timestamp', 'text'.
        text_lower: Optional pre-lowercased df['text'] (index-aligned).
        text_str: Optional pre-cast df['text'].astype(str).

    Returns:
        {'ME': {'stress_count', 'support_received'}, 'PARTNER': {...}},
        or {} when df lacks text or has fewer than 5 rows.
    """
    if 'text' not in df.columns or len(df) < 5: return {}
    
    # Use input df directly as it is already sorted
    df_temp = df

    # Use pre-calculated series if provided
    t_lower = text_lower if text_lower is not None else df_temp['text'].astype(str).str.lower()

    # Performance Optimization (V5.4): Use pre-compiled STRESS_RE.
    # Vectorized stress detection outside the loop is much faster
    is_stress = t_lower.str.contains(STRESS_RE).values

    # Performance Optimization (V5.3): Refactored the Python loop to use integer indexing
    # and NumPy-native state tracking. This eliminates multiple dictionary lookups
    # and string key overhead in every iteration of the hot loop (O(N)).
    senders = df_temp['sender'].values
    # ME -> 0, PARTNER -> 1
    s_idx = (senders == 'PARTNER').astype(np.int8)
    timestamps = df_temp['timestamp'].values
    
    # Pre-calculate message lengths to avoid calling len() in the loop
    text_lens = text_str.str.len().values if text_str is not None else df_temp['text'].astype(str).str.len().values
    
    # State tracking using arrays (index 0: ME, index 1: PARTNER).
    # active_stress_ts holds each person's latest *unanswered* stress
    # timestamp; NaT means no stress is currently pending for them.
    stress_counts = np.zeros(2, dtype=np.int32)
    support_received = np.zeros(2, dtype=np.int32)
    active_stress_ts = np.full(2, np.datetime64('NaT'), dtype=timestamps.dtype)
    
    # Comparison threshold for response time
    threshold = np.timedelta64(60, 'm')

    for i in range(len(s_idx)):
        s = s_idx[i]
        ts = timestamps[i]
        
        # Did this person just send a stress message?
        # (Checked BEFORE the support check so a stress message can itself
        # count as support for the other person's pending stress below.)
        if is_stress[i]:
            stress_counts[s] += 1
            active_stress_ts[s] = ts
            
        # Did this person just respond to the OTHER person's stress message?
        other_s = 1 - s # Flip 0 to 1, 1 to 0
        ast = active_stress_ts[other_s]

        if not np.isnat(ast):
            # Direct comparison of timedeltas avoids division overhead.
            # Replies must arrive within 60 mins and be more than 10 chars
            # long to count as substantive support.
            if (ts - ast) <= threshold and text_lens[i] > 10:
                support_received[other_s] += 1
                # Clear their stress state so we don't double count
                active_stress_ts[other_s] = np.datetime64('NaT')
                
    return {
        'ME': {'stress_count': int(stress_counts[0]), 'support_received': int(support_received[0])},
        'PARTNER': {'stress_count': int(stress_counts[1]), 'support_received': int(support_received[1])}
    }

def calculate_reengagement(df: pd.DataFrame) -> dict:
    """Count who breaks silences longer than 24 hours (V4.0).

    Relies on the 'gap_mins' column produced by calculate_latency earlier in
    the pipeline.  Returns {} when the history is too short (< 10 messages).
    """
    if len(df) < 10:
        return {}

    # A re-engagement is any message arriving after > 1440 mins of silence.
    silence_breakers = df.loc[df['gap_mins'] > 1440, 'sender'].value_counts()

    return {
        'me_reengagements': int(silence_breakers.get('ME', 0)),
        'partner_reengagements': int(silence_breakers.get('PARTNER', 0))
    }

def calculate_linguistic_mirroring(df: pd.DataFrame, text_lower: pd.Series = None) -> dict:
    """Score vocabulary mirroring via shared texting habits (V4.0).

    For each sender, counts how many habit markers ('!!!', '...', '??',
    'haha', 'lol', 'lmao') appear in BOTH parties' messages — a simplified
    proxy for vocabulary adoption.  Requires at least 100 messages.
    """
    if 'text' not in df.columns or len(df) < 100:
        return {}

    habits = ['!!!', '...', '??', 'haha', 'lol', 'lmao']
    lowered = df['text'].astype(str).str.lower() if text_lower is None else text_lower

    # One vectorized contains() per (sender, habit) pair — avoids joining
    # 100k+ messages into a single giant string (memory spike, slow search).
    uses_habit = {}
    for who in ['ME', 'PARTNER']:
        msgs = lowered[df['sender'] == who]
        uses_habit[who] = {h: msgs.str.contains(h, regex=False).any() for h in habits}

    scores = {}
    for who, other in (('ME', 'PARTNER'), ('PARTNER', 'ME')):
        shared = sum(1 for h in habits if uses_habit[who][h] and uses_habit[other][h])
        scores[f"{who}_mirroring"] = shared

    return scores

def calculate_topic_mix(df: pd.DataFrame, connection_type: str, text_lower: pd.Series = None) -> dict:
    """Bucket messages into topic categories tuned to the connection type (V4.0).

    Category names vary by relationship flavor but all reuse the
    pre-compiled module-level regexes (V5.4).  Returns {category: count}.
    """
    if 'text' not in df.columns:
        return {}

    if connection_type == 'romantic':
        categories = {'Logistics': LOGISTICS_RE, 'Intimacy': INTIMACY_RE, 'Conflict': CONFLICT_RE, 'External': EXTERNAL_RE}
    elif connection_type in ['friendship', 'casual', 'family']:
        categories = {'Logistics': LOGISTICS_RE, 'Bonding': BONDING_RE, 'Disagreement': CONFLICT_RE, 'External': EXTERNAL_RE}
    elif connection_type == 'professional':
        categories = {'Operations': LOGISTICS_RE, 'Collaboration': COLLABORATION_RE, 'Blockers': CONFLICT_RE, 'External': EXTERNAL_RE}
    else:
        # Unknown types fall back to a conservative generic mix.
        categories = {'Logistics': LOGISTICS_RE, 'Bonding': FALLBACK_BONDING_RE, 'Conflict': CONFLICT_RE, 'External': EXTERNAL_RE}

    lowered = df['text'].astype(str).str.lower() if text_lower is None else text_lower

    # One vectorized regex scan per category.
    return {name: int(lowered.str.contains(pattern).sum()) for name, pattern in categories.items()}

def run_analytics_pipeline(df: pd.DataFrame, hf_url: str = "", connection_type: str = "romantic") -> dict:
    """Run the full analytics pipeline over a chat history.

    Args:
        df: Messages with 'sender', 'timestamp' and 'text' columns, already
            sorted by timestamp (app.py performs the sort).
        hf_url: Optional cloud GPU endpoint for sentiment offload; validated
            inside apply_sentiment.
        connection_type: Relationship flavor ('romantic', 'friendship',
            'casual', 'family', 'professional') used by calculate_topic_mix.

    Returns:
        Dict with keys 'weekly', 'emoji_freq', 'initiator_ratio',
        'power_dynamics', 'affection_friction', 'support_gap',
        'reengagement', 'mirroring', 'topic_mix'.
    """
    # ⚡ Bolt Optimization (V5.4): Reset index at entry to ensure alignment for pre-calculated
    # series and remove redundant O(N) reset_index calls from downstream functions.
    # DF is already sorted by timestamp in app.py.
    df = df.reset_index(drop=True)

    # ⚡ Bolt Optimization: Pre-calculate common series once at the pipeline entry
    # to avoid redundant O(N) operations across multiple analytics functions.
    text_str = df['text'].astype(str)
    text_lower = text_str.str.lower()

    # calculate_latency must run first: it produces gap_mins/latency_mins
    # that initiator_ratio, reengagement and aggregate_weekly depend on.
    df = calculate_latency(df)
    df = apply_sentiment(df, hf_url=hf_url, text_str=text_str)
    
    # Phase 6: Extract enhanced features BEFORE privacy firewall
    emoji_freq = calculate_emoji_frequency(df, text_str=text_str)
    initiator_ratio = calculate_initiator_ratio(df)
    
    # Phase 8 (V3.0): Power Dynamics & Burnout NLP
    power_dynamics = calculate_power_dynamics(df, text_str=text_str)
    affection_friction = calculate_affection_friction(df, text_lower=text_lower)
    
    # Phase 11 (V4.0): Advanced Personalization
    support_gap = calculate_support_gap(df, text_lower=text_lower, text_str=text_str)
    reengagement = calculate_reengagement(df)
    mirroring = calculate_linguistic_mirroring(df, text_lower=text_lower)
    topic_mix = calculate_topic_mix(df, connection_type, text_lower=text_lower)
    
    # Privacy handling: text is needed for flashbacks in app.py, so we don't drop it here anymore.
    # The app.py will handle the session storage and eventual purging.
        
    weekly_df = aggregate_weekly(df)
    weekly_df = calculate_risk_score(weekly_df)
    weekly_df = detect_risk_phases(weekly_df)
    
    # Format date for JSON
    weekly_df['week_start'] = weekly_df['week_start'].dt.strftime('%Y-%m-%d')
    
    return {
        'weekly': weekly_df.to_dict(orient='records'),
        'emoji_freq': emoji_freq,
        'initiator_ratio': initiator_ratio,
        'power_dynamics': power_dynamics,
        'affection_friction': affection_friction,
        'support_gap': support_gap,
        'reengagement': reengagement,
        'mirroring': mirroring,
        'topic_mix': topic_mix
    }