"""Headline sentiment and market-direction scoring for news about a single ticker."""

import re
from collections import Counter

import numpy as np
import pandas as pd
import torch
from scipy import stats as scipy_stats

try:
    from sentence_transformers import CrossEncoder
    from transformers import pipeline
except ImportError as _import_error:
    # Only _ensure_models_loaded() needs the transformer stack. Keeping the module
    # importable without it lets get_summary()/estimate_time() work on frames that
    # were already scored; the ImportError is raised lazily when models are needed.
    CrossEncoder = None
    pipeline = None
    _MODEL_IMPORT_ERROR = _import_error
else:
    _MODEL_IMPORT_ERROR = None

# Small constant guarding divisions and "effectively zero" comparisons.
EPS = 1e-9

# Rule table: (category, weight, regex patterns). Every pattern that matches a
# headline adds `weight` once, so positive weights are bullish evidence and
# negative weights are bearish evidence.
RAW_IMPACT_RULES = (
    ("earnings", 0.95, (
        r"\bbeats?\b", r"tops? estimates", r"raises? guidance", r"strong guidance",
        r"record (sales|revenue|profit)", r"profit jumps?", r"revenue jumps?",
    )),
    ("earnings", -0.95, (
        r"\bmiss(es|ed)?\b", r"below estimates", r"cuts? guidance", r"lowers? outlook",
        r"warns? on", r"profit falls?", r"revenue falls?",
    )),
    ("analyst", 0.8, (
        r"\bupgrade[sd]?\b", r"outperform", r"overweight", r"\bbuy rating\b",
        r"price target raised",
    )),
    ("analyst", -0.8, (
        r"\bdowngrade[sd]?\b", r"underperform", r"underweight", r"\bsell rating\b",
        r"price target cut",
    )),
    ("growth", 0.6, (
        r"wins? (major )?(deal|contract|order)", r"partnership", r"approval", r"launches?",
        r"expands?", r"new product",
    )),
    ("capital", 0.65, (r"buyback", r"repurchase", r"dividend hike", r"raises? dividend")),
    ("capital", -0.85, (
        r"share offering", r"secondary offering", r"dilution", r"defaults?",
        r"liquidity concerns?", r"cash burn", r"bankruptcy",
    )),
    ("operations", 0.45, (
        r"margin expansion", r"cost cuts?", r"efficiency", r"turnaround", r"productivity",
    )),
    ("operations", -0.55, (
        r"layoffs?", r"recall", r"outage", r"delay", r"strike", r"plant closure",
    )),
    ("legal", -0.75, (
        r"lawsuit", r"probe", r"investigation", r"\bsec\b", r"antitrust", r"fraud",
        r"charges?",
    )),
    ("macro", 0.35, (r"rate cut", r"stimulus", r"easing inflation", r"soft landing")),
    ("macro", -0.35, (
        r"tariff", r"inflation", r"rate hike", r"recession", r"geopolitical", r"trade war",
        r"headwind",
    )),
    ("price_action", 0.45, (
        r"\brall(y|ies)\b", r"\bsurge(s|d)?\b", r"\bjump(s|ed)?\b", r"\bsoar(s|ed)?\b",
        r"\bgain(s|ed)?\b", r"breakout", r"all-time high",
    )),
    ("price_action", -0.45, (
        r"\bplunge(s|d)?\b", r"\bslump(s|ed)?\b", r"\btumble(s|d)?\b", r"\bdrop(s|ped)?\b",
        r"\bfalls?\b", r"breakdown", r"52-week low",
    )),
)

# Same rules with patterns pre-compiled (case-insensitive) for per-headline scanning.
IMPACT_RULES = tuple(
    {
        "category": category,
        "weight": weight,
        "patterns": tuple(re.compile(pattern, re.IGNORECASE) for pattern in patterns),
    }
    for category, weight, patterns in RAW_IMPACT_RULES
)

# Hedging vocabulary; each distinct match raises a headline's uncertainty score by 0.25.
UNCERTAINTY_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r"\bmixed\b",
        r"\buncertain\b",
        r"\bvolatile\b",
        r"\bcautious\b",
        r"\bunclear\b",
        r"\bawaits?\b",
        r"\bwatch(es|ing)?\b",
        r"\brisk\b",
    )
)

# Human-readable names for rule categories, used in narratives and headline labels.
CATEGORY_LABELS = {
    "earnings": "earnings and guidance",
    "analyst": "analyst rating changes",
    "growth": "growth and demand catalysts",
    "capital": "capital allocation and balance-sheet news",
    "operations": "operational execution",
    "legal": "legal or regulatory risk",
    "macro": "macro and rates sensitivity",
    "price_action": "price-action headlines",
    "broad_news": "broad headline flow",
}


class AnalyticsEngine:
    """Score headlines with two sentiment models plus a rule-based impact layer.

    ``analyze`` runs the models over a DataFrame of headlines (``title`` column);
    ``get_summary`` turns a scored frame into a directional call, confidence,
    pressure split and narrative text.
    """

    # Number of headlines per model call; also used by estimate_time().
    BATCH_SIZE = 32

    # Class-level and therefore shared by every instance: record_timing() mutates
    # this dict in place, so timing history persists across engine instances.
    _timing_history = {
        "scrape_per_article": [],
        "finbert_per_batch": [],
        "roberta_per_batch": [],
        "ranker_per_batch": [],
    }

    def __init__(self, preload_models=False):
        """Pick devices and optionally load all models eagerly.

        Args:
            preload_models: When true, load the models now instead of on the
                first ``analyze`` call.
        """
        # transformers pipelines take a device index (-1 = CPU); CrossEncoder takes a string.
        self.device = 0 if torch.cuda.is_available() else -1
        self.torch_device = "cuda" if torch.cuda.is_available() else "cpu"
        self.finbert = None
        self.distilroberta = None
        self.ranker = None
        if preload_models:
            self._ensure_models_loaded()

    def _ensure_models_loaded(self):
        """Lazily construct any model that has not been loaded yet.

        Raises:
            ImportError: If a model must be built but ``transformers`` /
                ``sentence_transformers`` could not be imported.
        """
        needs_pipeline = self.finbert is None or self.distilroberta is None
        needs_ranker = self.ranker is None
        if (needs_pipeline and pipeline is None) or (needs_ranker and CrossEncoder is None):
            raise ImportError(
                "AnalyticsEngine needs the 'transformers' and 'sentence-transformers' "
                "packages to load its models."
            ) from _MODEL_IMPORT_ERROR
        if self.finbert is None:
            self.finbert = pipeline(
                "sentiment-analysis",
                model="ProsusAI/finbert",
                device=self.device,
                max_length=512,
                truncation=True,
            )
        if self.distilroberta is None:
            # NOTE: despite the attribute name, this loads a DistilBERT SST-2 checkpoint.
            self.distilroberta = pipeline(
                "sentiment-analysis",
                model="distilbert/distilbert-base-uncased-finetuned-sst-2-english",
                device=self.device,
                max_length=512,
                truncation=True,
            )
        if self.ranker is None:
            self.ranker = CrossEncoder(
                "cross-encoder/ms-marco-MiniLM-L-6-v2",
                device=self.torch_device,
            )

    def _map_finbert(self, label, score):
        """Return a signed polarity: +score / 0 / -score for positive / neutral / negative."""
        mapping = {"positive": 1.0, "neutral": 0.0, "negative": -1.0}
        return mapping.get(str(label).lower(), 0.0) * float(score)

    def _map_distilroberta(self, label, score):
        """Return +score for POSITIVE, -score for NEGATIVE, 0.0 for anything else."""
        label = str(label).upper()
        if label == "POSITIVE":
            return float(score)
        if label == "NEGATIVE":
            return -float(score)
        return 0.0

    def _numeric_column(self, frame, column, default):
        """Return ``frame[column]`` coerced to numbers, or a constant Series if absent.

        Unparseable and missing values become ``default``; the result is always
        aligned to ``frame.index``.
        """
        if column in frame.columns:
            return pd.to_numeric(frame[column], errors="coerce").fillna(default)
        return pd.Series(default, index=frame.index, dtype=float)

    def _prepare_frame(self, df):
        """Copy ``df``, ensure a UTC ``timestamp_parsed`` column and sort by it.

        The timestamp comes from ``timestamp`` or else ``pub_date``; unparseable
        values become NaT. The copy is stably sorted oldest-first when any
        timestamp parsed, and always gets a fresh 0..n-1 index.
        """
        frame = df.copy()
        if "timestamp_parsed" not in frame.columns:
            if "timestamp" in frame.columns:
                ts = pd.to_datetime(frame["timestamp"], errors="coerce", utc=True)
            elif "pub_date" in frame.columns:
                ts = pd.to_datetime(frame["pub_date"], errors="coerce", utc=True)
            else:
                ts = pd.Series(pd.NaT, index=frame.index, dtype="datetime64[ns, UTC]")
            frame["timestamp_parsed"] = ts
        if frame["timestamp_parsed"].notna().any():
            frame = frame.sort_values("timestamp_parsed", kind="stable").reset_index(drop=True)
        else:
            frame = frame.reset_index(drop=True)
        return frame

    def _score_market_impact(self, title):
        """Apply the regex rule table to one headline.

        Returns:
            dict with ``impact_bias`` (tanh-squashed net weight, in (-1, 1)),
            ``event_strength`` (total absolute weight / 2.5, capped at 1),
            ``uncertainty_score`` (0.25 per hedging pattern, capped at 1), the summed
            bullish/bearish weights, and the heaviest bullish/bearish category
            ("" when none matched).
        """
        headline = str(title or "")
        positive_scores = Counter()
        negative_scores = Counter()
        total_score = 0.0
        total_strength = 0.0
        for rule in IMPACT_RULES:
            hits = sum(1 for pattern in rule["patterns"] if pattern.search(headline))
            if not hits:
                continue
            contribution = rule["weight"] * hits
            total_score += contribution
            total_strength += abs(contribution)
            if contribution >= 0:
                positive_scores[rule["category"]] += contribution
            else:
                negative_scores[rule["category"]] += abs(contribution)
        uncertainty_hits = sum(1 for pattern in UNCERTAINTY_PATTERNS if pattern.search(headline))
        return {
            "impact_bias": float(np.tanh(total_score / 1.75)),
            "event_strength": float(min(total_strength / 2.5, 1.0)),
            "uncertainty_score": float(min(uncertainty_hits * 0.25, 1.0)),
            "bullish_impact": float(sum(positive_scores.values())),
            "bearish_impact": float(sum(negative_scores.values())),
            "primary_bullish_catalyst": max(positive_scores, key=positive_scores.get, default=""),
            "primary_bearish_catalyst": max(negative_scores, key=negative_scores.get, default=""),
        }

    def _add_text_signals(self, frame):
        """Add the ``_score_market_impact`` fields as columns (empty titles if no ``title``)."""
        titles = frame["title"] if "title" in frame.columns else pd.Series("", index=frame.index)
        scored = [self._score_market_impact(title) for title in titles.fillna("")]
        for key in (
            "impact_bias",
            "event_strength",
            "uncertainty_score",
            "bullish_impact",
            "bearish_impact",
            "primary_bullish_catalyst",
            "primary_bearish_catalyst",
        ):
            frame[key] = [item[key] for item in scored]
        return frame

    def _normalize_significance(self, values):
        """Map raw ranker scores into the range (0.55, 1.0).

        NaNs are replaced by the median. Scores are centred on the median, scaled
        by the std and passed through a logistic; a constant input maps to 0.75.
        """
        scores = np.asarray(values, dtype=float)
        if scores.size == 0:
            return scores
        scores = np.nan_to_num(scores, nan=np.nanmedian(scores) if np.isfinite(scores).any() else 0.0)
        spread = float(np.nanstd(scores))
        if spread < EPS:
            return np.full(scores.shape, 0.75, dtype=float)
        centered = (scores - float(np.nanmedian(scores))) / (spread + EPS)
        normalized = 1.0 / (1.0 + np.exp(-centered))
        return 0.55 + (0.45 * normalized)

    def _build_recency_weight(self, timestamps):
        """Weight each row by freshness relative to the newest timestamp.

        With parseable timestamps: ``0.35 + 0.65 * exp(-age_hours / 72)``, with
        unparseable rows filled by the median weight. Without any timestamp, a
        linear ramp 0.6 -> 1.0 in row order is used (1.0 for a single row).
        """
        ts = pd.to_datetime(timestamps, errors="coerce", utc=True)
        if ts.notna().any():
            newest = ts.max()
            age_hours = (newest - ts).dt.total_seconds().div(3600.0).clip(lower=0.0)
            weights = 0.35 + (0.65 * np.exp(-(age_hours / 72.0)))
            return weights.fillna(float(weights.median()) if weights.notna().any() else 0.7)
        if len(ts) == 0:
            return pd.Series(dtype=float)
        index = timestamps.index if hasattr(timestamps, "index") else None
        if len(ts) == 1:
            return pd.Series([1.0], index=index)
        return pd.Series(np.linspace(0.6, 1.0, len(ts)), index=index)

    def _safe_weighted_average(self, values, weights, default=0.0):
        """Weighted mean of ``values``; ``default`` when the weights sum to ~0 (or are empty)."""
        values = np.asarray(values, dtype=float)
        weights = np.asarray(weights, dtype=float)
        total = float(np.sum(weights))
        if total <= EPS:
            return float(default)
        return float(np.average(values, weights=weights))

    def _effective_sample_size(self, weights):
        """Kish effective sample size ``(sum w)^2 / sum w^2``; 0.0 for empty/zero weights."""
        sample_weights = np.asarray(weights, dtype=float)
        total = float(np.sum(sample_weights))
        square_total = float(np.sum(np.square(sample_weights)))
        if total <= EPS or square_total <= EPS:
            return 0.0
        return float((total * total) / (square_total + EPS))

    def _calibrate_direction_score(
        self,
        raw_edge,
        signal_magnitude,
        agreement_rate,
        significance_support,
        recency_support,
        event_support,
        uncertainty_load,
        conflict_load,
        effective_articles,
        headline_count,
        headline_concentration,
    ):
        """Shrink ``raw_edge`` according to evidence quality and decide the call.

        Thin, stale, concentrated or conflicting evidence damps the edge. A single
        strong, fresh, agreed-upon event ("major singleton") is damped less. The
        call is UP/DOWN only for a decisive setup; otherwise it is MIXED and the
        score is pulled into [35, 65].

        Returns:
            dict with ``direction_call``, ``direction_score`` (0-100),
            ``direction_edge`` (calibrated) and the support/calibration factors.
        """
        evidence_support = float(
            np.clip((effective_articles - 1.0) / max(headline_count * 0.45, 1.0), 0.0, 1.0)
        )
        freshness_support = float(np.clip((recency_support - 0.55) / 0.30, 0.0, 1.0))
        diversification_support = float(np.clip((0.88 - headline_concentration) / 0.38, 0.0, 1.0))
        event_support = float(np.clip(event_support, 0.0, 1.0))
        major_singleton = (
            (effective_articles < 1.2)
            and (event_support >= 0.72)
            and (significance_support >= 0.72)
            and (agreement_rate >= 0.85)
            and (recency_support >= 0.72)
        )
        calibration_factor = float(
            np.clip(
                (0.25 * evidence_support)
                + (0.18 * freshness_support)
                + (0.18 * diversification_support)
                + (0.16 * event_support)
                + (0.12 * significance_support)
                + (0.11 * agreement_rate)
                - (0.16 * conflict_load)
                - (0.10 * uncertainty_load),
                0.0,
                1.0,
            )
        )
        calibrated_edge = float(raw_edge * (0.3 + (0.7 * calibration_factor)))
        if effective_articles < 1.2:
            calibrated_edge *= 0.92 if major_singleton else 0.4
        elif effective_articles < 1.6:
            calibrated_edge *= 0.7
        if headline_concentration > 0.82:
            calibrated_edge *= 0.96 if major_singleton else 0.62
        elif headline_concentration > 0.68:
            calibrated_edge *= 0.82
        if recency_support < 0.64:
            calibrated_edge *= max(0.35, recency_support / 0.64)
        if conflict_load > 0.45:
            calibrated_edge *= max(0.3, 1.0 - (0.95 * conflict_load))
        if signal_magnitude < 0.2:
            calibrated_edge *= 0.55
        direction_score = int(np.clip(round((calibrated_edge + 1.0) * 50.0), 0, 100))
        decisive_setup = (
            (
                (abs(calibrated_edge) >= 0.16)
                or (major_singleton and abs(calibrated_edge) >= 0.1)
            )
            and (
                (effective_articles >= 1.45)
                or (
                    major_singleton
                    and (abs(raw_edge) >= 0.35)
                    and (signal_magnitude >= 0.4)
                )
            )
            and (major_singleton or (headline_concentration <= 0.86))
            and (conflict_load <= 0.58)
        )
        if conflict_load > 0.18 and abs(calibrated_edge) < 0.3:
            decisive_setup = False
        if decisive_setup:
            direction_call = self._direction_call(direction_score)
        else:
            direction_call = "MIXED"
            direction_score = int(np.clip(round(50.0 + (calibrated_edge * 20.0)), 35, 65))
        return {
            "direction_call": direction_call,
            "direction_score": direction_score,
            "direction_edge": calibrated_edge,
            "evidence_support": evidence_support,
            "freshness_support": freshness_support,
            "diversification_support": diversification_support,
            "calibration_factor": calibration_factor,
        }

    def _augment_direction_features(self, df):
        """Return a prepared copy of ``df`` with all per-headline direction features.

        Missing model columns are back-filled: ``ensemble_pol`` from ``pol`` (else
        0.0), per-model polarities from ``ensemble_pol``, per-model scores from
        ``score`` (else 0.8). Then rule signals, recency/significance weights, a
        signed ``direction_signal`` with its ``direction_weight``, pressure
        components, z-score and rolling momentum are added.
        """
        frame = self._prepare_frame(df)
        ensemble_source = "ensemble_pol" if "ensemble_pol" in frame.columns else "pol"
        frame["ensemble_pol"] = self._numeric_column(frame, ensemble_source, 0.0)
        if "finbert_pol" not in frame.columns:
            frame["finbert_pol"] = frame["ensemble_pol"]
        if "roberta_pol" not in frame.columns:
            frame["roberta_pol"] = frame["ensemble_pol"]
        if "finbert_score" not in frame.columns:
            frame["finbert_score"] = self._numeric_column(frame, "score", 0.8)
        if "roberta_score" not in frame.columns:
            frame["roberta_score"] = self._numeric_column(frame, "score", 0.8)
        if "agreement" not in frame.columns:
            frame["agreement"] = (
                np.sign(frame["finbert_pol"]).astype(float)
                == np.sign(frame["roberta_pol"]).astype(float)
            ).astype(float)
        else:
            frame["agreement"] = self._numeric_column(frame, "agreement", 0.0)
        if "conviction" not in frame.columns:
            frame["conviction"] = (
                np.sqrt(frame["finbert_score"] * frame["roberta_score"])
                * frame["ensemble_pol"].abs()
            )
        else:
            # A single NaN here would propagate into every direction weight.
            frame["conviction"] = self._numeric_column(frame, "conviction", 0.0)
        frame["significance"] = self._numeric_column(frame, "significance", 1.0)

        frame = self._add_text_signals(frame)
        frame["recency_weight"] = self._build_recency_weight(frame["timestamp_parsed"])
        frame["significance_weight"] = self._normalize_significance(frame["significance"])

        confidence_weight = 0.55 + (0.45 * np.clip(frame["conviction"].to_numpy(float), 0.0, 1.0))
        agreement_factor = np.where(frame["agreement"].to_numpy(float) >= 1.0, 1.0, 0.72)
        uncertainty_penalty = 1.0 - (
            0.35 * np.clip(frame["uncertainty_score"].to_numpy(float), 0.0, 1.0)
        )
        event_weight = 0.75 + (0.25 * frame["event_strength"].to_numpy(float))
        # Headlines carrying both bullish and bearish catalysts are damped twice:
        # once in the signal and once in the weight.
        mixed_catalyst = (
            (frame["bullish_impact"].to_numpy(float) > 0.0)
            & (frame["bearish_impact"].to_numpy(float) > 0.0)
        )
        mixed_penalty = np.where(mixed_catalyst, 0.55, 1.0)
        ensemble = frame["ensemble_pol"].to_numpy(float)
        impact = frame["impact_bias"].to_numpy(float)
        alignment = np.sign(ensemble) * np.sign(impact)
        alignment_boost = np.where(alignment > 0, 1.08, np.where(alignment < 0, 0.88, 1.0))
        raw_direction = np.tanh(
            ((ensemble * 0.68) + (impact * 0.32)) * alignment_boost * mixed_penalty
        )
        direction_weight = (
            frame["recency_weight"].to_numpy(float)
            * frame["significance_weight"].to_numpy(float)
            * confidence_weight
            * agreement_factor
            * uncertainty_penalty
            * event_weight
        )
        direction_weight = direction_weight * np.where(mixed_catalyst, 0.82, 1.0)

        frame["direction_signal"] = np.clip(raw_direction, -1.0, 1.0)
        frame["direction_weight"] = np.clip(direction_weight, 0.15, None)
        frame["direction_contribution"] = frame["direction_signal"] * frame["direction_weight"]
        frame["bullish_pressure_component"] = frame["direction_contribution"].clip(lower=0.0)
        frame["bearish_pressure_component"] = -frame["direction_contribution"].clip(upper=0.0)

        if len(frame) > 1:
            z_scores = scipy_stats.zscore(frame["ensemble_pol"].to_numpy(float), nan_policy="omit")
            if np.isscalar(z_scores):
                z_scores = np.zeros(len(frame), dtype=float)
            frame["z_score"] = np.nan_to_num(z_scores, nan=0.0, posinf=0.0, neginf=0.0)
        else:
            frame["z_score"] = 0.0
        frame["momentum"] = (
            frame["ensemble_pol"].rolling(window=max(6, len(frame) // 15), min_periods=1).mean()
        )
        frame["label"] = frame.get("label", frame.get("finbert_label", "neutral"))
        frame["score"] = frame.get("score", frame.get("finbert_score", 0.0))
        frame["pol"] = frame["ensemble_pol"]
        return frame

    def _theme_weights(self, frame, theme_col, value_col):
        """Sum positive ``value_col`` amounts per non-empty theme in ``theme_col``."""
        weights = Counter()
        for theme, value in zip(frame[theme_col].fillna(""), frame[value_col].fillna(0.0)):
            if theme and float(value) > 0.0:
                weights[str(theme)] += float(value)
        return weights

    def _theme_label(self, theme):
        """Human-readable label for a category; unknown/empty maps to broad headline flow."""
        return CATEGORY_LABELS.get(theme or "broad_news", CATEGORY_LABELS["broad_news"])

    def _direction_call(self, score):
        """UP for score >= 55, DOWN for score <= 45, otherwise MIXED."""
        if score >= 55:
            return "UP"
        if score <= 45:
            return "DOWN"
        return "MIXED"

    def _state_title(self, direction_call, confidence, risk_balance, direction_edge):
        """Short headline for the overall state, refined by confidence and risk."""
        if direction_call == "UP":
            if confidence >= 75 and risk_balance < 0.35:
                return "Strong Bullish"
            if risk_balance >= 0.5:
                return "Bullish but Fragile"
            return "Constructive Bullish"
        if direction_call == "DOWN":
            if confidence >= 75 and risk_balance >= 0.55:
                return "High-Risk Bearish"
            if abs(direction_edge) < 0.2:
                return "Bearish but Choppy"
            return "Defensive Bearish"
        if risk_balance >= 0.5:
            return "Mixed / Fragile"
        return "Mixed / Rangebound"

    def _build_summary_narrative(
        self,
        direction_call,
        state_title,
        direction_score,
        direction_confidence,
        bullish_pressure,
        bearish_pressure,
        agreement_rate,
        top_positive_theme,
        top_negative_theme,
        momentum_delta,
        recency_support,
        uncertainty_load,
    ):
        """Render the summary text.

        Returns:
            ``(state_summary, state_explanation, upside_drivers, downside_risks)``:
            one sentence pair plus three lists of three sentences each.
        """
        tilt_word = {
            "UP": "an upside",
            "DOWN": "a downside",
            "MIXED": "a mixed",
        }[direction_call]
        state_summary = (
            f"{state_title}. The system sees {tilt_word} bias with direction score "
            f"{direction_score}/100 and confidence {direction_confidence}/100."
        )
        positive_theme_label = self._theme_label(top_positive_theme)
        negative_theme_label = self._theme_label(top_negative_theme)
        state_explanation = [
            (
                f"Weighted news pressure is {bullish_pressure:.0%} bullish versus "
                f"{bearish_pressure:.0%} bearish after recency and headline importance are applied."
            ),
            (
                f"Model agreement is {agreement_rate:.0%}, and recent headlines contribute "
                f"about {recency_support:.0%} of the usable signal."
            ),
            (
                f"The dominant upside theme is {positive_theme_label}, while the main risk theme is "
                f"{negative_theme_label}."
            ),
        ]

        upside_drivers = []
        if top_positive_theme:
            upside_drivers.append(
                f"{positive_theme_label.capitalize()} are the strongest bullish driver in the current flow."
            )
        else:
            upside_drivers.append(
                "There are bullish headlines, but they are not concentrated into one clean catalyst bucket yet."
            )
        if momentum_delta > 0.05:
            upside_drivers.append("Sentiment momentum is improving in the newest headlines.")
        elif momentum_delta < -0.05:
            upside_drivers.append(
                "A few bullish headlines exist, but the newest flow is weaker than the earlier flow."
            )
        else:
            upside_drivers.append("The headline trend is stable rather than accelerating sharply.")
        if recency_support >= 0.75:
            upside_drivers.append(
                "Support is relatively fresh, so the signal is not leaning on stale articles."
            )
        else:
            upside_drivers.append(
                "Some of the support comes from older headlines, which lowers near-term punch."
            )

        downside_risks = []
        if top_negative_theme:
            downside_risks.append(
                f"{negative_theme_label.capitalize()} are the largest downside risk in the current tape."
            )
        else:
            downside_risks.append(
                "There is no single dominant risk theme, but the flow is still not fully clean."
            )
        if bearish_pressure >= 0.4:
            downside_risks.append(
                "Bearish pressure is still large enough to overpower weak bullish follow-through."
            )
        else:
            downside_risks.append(
                "Bearish pressure is contained for now, but it would not take much to lift it."
            )
        if uncertainty_load >= 0.2:
            downside_risks.append(
                "Headline uncertainty is elevated, so the direction call deserves caution."
            )
        elif agreement_rate < 0.6:
            downside_risks.append("The models disagree too often, which reduces directional reliability.")
        else:
            downside_risks.append(
                "Execution risk is moderate because the models are broadly aligned on direction."
            )
        return state_summary, state_explanation, upside_drivers, downside_risks

    def _run_in_batches(self, model, titles, batch_size, progress_cb, start, span, label):
        """Call ``model`` on ``titles`` in chunks, reporting progress after each chunk.

        Progress moves from ``start`` to ``start + span`` in proportion to the
        number of titles already processed, matching the "done/total" message.
        """
        total = len(titles)
        results = []
        for offset in range(0, total, batch_size):
            results.extend(model(titles[offset : offset + batch_size]))
            if progress_cb:
                done = min(offset + batch_size, total)
                progress_cb(start + (done / total) * span, f"{label}: {done}/{total}")
        return results

    def analyze(self, df, ticker, progress_cb=None):
        """Score every headline in ``df["title"]`` and add direction features.

        Args:
            df: DataFrame with a ``title`` column (optionally ``timestamp``/``pub_date``).
            ticker: Symbol used in the significance-ranking query.
            progress_cb: Optional ``callback(fraction, message)`` for progress updates.

        Returns:
            The prepared frame with model outputs and the columns added by
            ``_augment_direction_features``; an empty input is returned prepared
            but unscored.
        """
        self._ensure_models_loaded()
        frame = self._prepare_frame(df)
        # Models expect strings; stray non-string titles would break tokenization.
        titles = frame["title"].fillna("").astype(str).tolist()
        total = len(titles)
        batch_size = self.BATCH_SIZE
        if total == 0:
            return frame

        if progress_cb:
            progress_cb(0.05, f"Model 1/2: FinBERT analyzing {total} headlines...")
        finbert_results = self._run_in_batches(
            self.finbert, titles, batch_size, progress_cb, 0.05, 0.25, "FinBERT"
        )
        frame["finbert_label"] = [r["label"] for r in finbert_results]
        frame["finbert_score"] = [r["score"] for r in finbert_results]
        frame["finbert_pol"] = [self._map_finbert(r["label"], r["score"]) for r in finbert_results]

        if progress_cb:
            progress_cb(0.35, f"Model 2/2: DistilRoBERTa analyzing {total} headlines...")
        roberta_results = self._run_in_batches(
            self.distilroberta, titles, batch_size, progress_cb, 0.35, 0.25, "DistilRoBERTa"
        )
        frame["roberta_label"] = [r["label"] for r in roberta_results]
        frame["roberta_score"] = [r["score"] for r in roberta_results]
        frame["roberta_pol"] = [
            self._map_distilroberta(r["label"], r["score"]) for r in roberta_results
        ]

        if progress_cb:
            progress_cb(0.65, "Computing ensemble sentiment fusion...")
        frame["ensemble_pol"] = (frame["finbert_pol"] * 0.6) + (frame["roberta_pol"] * 0.4)
        frame["conviction"] = (
            np.sqrt(frame["finbert_score"] * frame["roberta_score"]) * frame["ensemble_pol"].abs()
        )
        frame["agreement"] = (
            np.sign(frame["finbert_pol"]).astype(float)
            == np.sign(frame["roberta_pol"]).astype(float)
        ).astype(float)

        if progress_cb:
            progress_cb(0.75, "Ranking headline significance...")
        query = f"Major market moving news for {ticker} stock"
        pairs = [[query, title] for title in titles]
        frame["significance"] = self.ranker.predict(pairs, batch_size=batch_size)

        if progress_cb:
            progress_cb(0.87, "Scoring likely market direction...")
        frame = self._augment_direction_features(frame)
        if progress_cb:
            progress_cb(1.0, "Analysis complete.")
        return frame

    def get_summary(self, df):
        """Aggregate a (scored or raw) headline frame into a directional summary.

        Returns:
            dict with polarity statistics, the calibrated direction call/score/
            confidence, pressure split, risk measures, narrative text and up to
            eight ``heavy_hitters`` rows (each with ``direction_label`` and
            ``catalyst_label``).
        """
        frame = self._augment_direction_features(df)
        has_rows = not frame.empty

        # --- Plain sentiment statistics -------------------------------------
        mean_pol = float(frame["ensemble_pol"].mean()) if has_rows else 0.0
        directional = frame["direction_signal"]
        pos_count = int((directional > 0.12).sum())
        neg_count = int((directional < -0.12).sum())
        total_directional = pos_count + neg_count
        dir_ratio = float((pos_count - neg_count) / total_directional) if total_directional else 0.0
        if float(frame["conviction"].sum()) > EPS:
            conv_weighted = float(
                np.average(
                    frame["ensemble_pol"].to_numpy(float),
                    weights=frame["conviction"].to_numpy(float),
                )
            )
        else:
            conv_weighted = mean_pol
        agreed = frame[frame["agreement"] >= 1.0]
        agreed_pol = float(agreed["ensemble_pol"].mean()) if not agreed.empty else mean_pol
        if len(frame) >= 6:
            window = max(2, len(frame) // 3)
            recent = float(frame["direction_contribution"].tail(window).mean())
            older = float(frame["direction_contribution"].head(window).mean())
            momentum_delta = recent - older
        else:
            momentum_delta = float(frame["direction_contribution"].mean()) if has_rows else 0.0

        # --- Directional pressure and raw edge -------------------------------
        signals = frame["direction_signal"].to_numpy(float)
        weights = frame["direction_weight"].to_numpy(float)
        evidence_weights = np.abs(frame["direction_contribution"].to_numpy(float))
        # +0.05 keeps near-zero contributions from vanishing out of support averages.
        support_weights = evidence_weights + 0.05
        # Empty arrays make _safe_weighted_average return its default.
        up_pressure_raw = float(frame["bullish_pressure_component"].sum())
        down_pressure_raw = float(frame["bearish_pressure_component"].sum())
        pressure_total = up_pressure_raw + down_pressure_raw + EPS
        bullish_pressure = up_pressure_raw / pressure_total
        bearish_pressure = down_pressure_raw / pressure_total
        weighted_direction = self._safe_weighted_average(signals, weights, default=0.0)
        direction_edge = float(
            (0.7 * (bullish_pressure - bearish_pressure))
            + (0.2 * weighted_direction)
            + (0.1 * np.tanh(momentum_delta * 2.5))
        )

        # --- Evidence-quality inputs for calibration -------------------------
        agreement_rate = float(frame["agreement"].mean()) if has_rows else 0.0
        directional_coverage = float((frame["direction_signal"].abs() > 0.12).mean()) if has_rows else 0.0
        uncertainty_load = self._safe_weighted_average(
            frame["uncertainty_score"].to_numpy(float), weights, default=0.0
        )
        recency_support = self._safe_weighted_average(
            frame["recency_weight"].to_numpy(float), support_weights, default=0.65
        )
        signal_magnitude = self._safe_weighted_average(np.abs(signals), weights, default=0.0)
        significance_support = self._safe_weighted_average(
            frame["significance_weight"].to_numpy(float), support_weights, default=0.75
        )
        event_support = self._safe_weighted_average(
            frame["event_strength"].to_numpy(float), support_weights, default=0.0
        )
        effective_articles = self._effective_sample_size(evidence_weights)
        headline_count = max(len(frame), 1)
        headline_concentration = (
            float(np.max(evidence_weights) / (float(np.sum(evidence_weights)) + EPS))
            if evidence_weights.size
            else 0.0
        )
        pressure_skew = float(abs(bullish_pressure - bearish_pressure))
        mixed_rows = (
            (frame["bullish_impact"].to_numpy(float) > 0.0)
            & (frame["bearish_impact"].to_numpy(float) > 0.0)
        ).astype(float)
        catalyst_mix = self._safe_weighted_average(mixed_rows, support_weights, default=0.0)
        if (up_pressure_raw > EPS and down_pressure_raw > EPS) or catalyst_mix > 0.0:
            pressure_volume = min(
                1.0, (up_pressure_raw + down_pressure_raw) / max(len(frame) * 0.55, 1.0)
            )
            conflict_load = float(
                np.clip(((1.0 - pressure_skew) * pressure_volume) + (0.55 * catalyst_mix), 0.0, 1.0)
            )
        else:
            conflict_load = 0.0

        direction_calibration = self._calibrate_direction_score(
            raw_edge=direction_edge,
            signal_magnitude=signal_magnitude,
            agreement_rate=agreement_rate,
            significance_support=significance_support,
            recency_support=recency_support,
            event_support=event_support,
            uncertainty_load=uncertainty_load,
            conflict_load=conflict_load,
            effective_articles=effective_articles,
            headline_count=headline_count,
            headline_concentration=headline_concentration,
        )
        direction_edge = float(direction_calibration["direction_edge"])
        direction_score = int(direction_calibration["direction_score"])
        direction_call = str(direction_calibration["direction_call"])

        # --- Confidence, risk and composite scores ---------------------------
        direction_confidence = int(
            np.clip(
                round(
                    100.0
                    * (
                        (0.24 * abs(direction_edge))
                        + (0.15 * agreement_rate)
                        + (0.12 * directional_coverage)
                        + (0.13 * signal_magnitude)
                        + (0.1 * significance_support)
                        + (0.08 * recency_support)
                        + (0.08 * event_support)
                        + (0.08 * direction_calibration["evidence_support"])
                        + (0.06 * direction_calibration["diversification_support"])
                        - (0.12 * uncertainty_load)
                        - (0.1 * conflict_load)
                    )
                ),
                0,
                100,
            )
        )
        if direction_call == "MIXED":
            direction_confidence = int(round(direction_confidence * 0.9))
        tail_risk = self._safe_weighted_average((signals < -0.55).astype(float), weights, default=0.0)
        risk_balance = float(
            np.clip(
                (bearish_pressure * 0.55) + (tail_risk * 0.25) + (uncertainty_load * 0.2),
                0.0,
                1.0,
            )
        )
        composite = float(
            (mean_pol * 0.14)
            + (dir_ratio * 0.18)
            + (conv_weighted * 0.18)
            + (agreed_pol * 0.10)
            + (momentum_delta * 0.10)
            + (direction_edge * 0.30)
        )
        vibe_balance = float(
            np.clip(
                (0.34 * direction_edge)
                + (0.18 * (bullish_pressure - bearish_pressure))
                + (0.14 * conv_weighted)
                + (0.12 * mean_pol)
                + (0.1 * agreed_pol)
                + (0.12 * np.tanh(momentum_delta * 2.0)),
                -1.0,
                1.0,
            )
        )
        vibe_amplifier = float(
            np.clip(
                0.55
                + (0.25 * (direction_confidence / 100.0))
                + (0.2 * direction_calibration["calibration_factor"]),
                0.45,
                1.0,
            )
        )
        if direction_call == "MIXED":
            vibe_amplifier *= 0.82
        # Square-root tilt expands small balances so the 1-10 vibe is not stuck near 5.
        vibe_tilt = float(np.sign(vibe_balance) * np.sqrt(abs(vibe_balance)))
        vibe = int(np.clip(round(5.0 + (4.4 * vibe_tilt * vibe_amplifier)), 1, 10))

        # --- Themes and narrative --------------------------------------------
        positive_themes = self._theme_weights(frame, "primary_bullish_catalyst", "bullish_pressure_component")
        negative_themes = self._theme_weights(frame, "primary_bearish_catalyst", "bearish_pressure_component")
        top_positive_theme = max(positive_themes, key=positive_themes.get, default="")
        top_negative_theme = max(negative_themes, key=negative_themes.get, default="")
        dominant_theme = top_positive_theme or top_negative_theme or "broad_news"
        news_regime = f"{self._theme_label(dominant_theme)}-led"
        state_title = self._state_title(direction_call, direction_confidence, risk_balance, direction_edge)
        state_summary, state_explanation, bullish_drivers, bearish_risks = self._build_summary_narrative(
            direction_call=direction_call,
            state_title=state_title,
            direction_score=direction_score,
            direction_confidence=direction_confidence,
            bullish_pressure=bullish_pressure,
            bearish_pressure=bearish_pressure,
            agreement_rate=agreement_rate,
            top_positive_theme=top_positive_theme,
            top_negative_theme=top_negative_theme,
            momentum_delta=momentum_delta,
            recency_support=recency_support,
            uncertainty_load=uncertainty_load,
        )

        # --- Most influential headlines --------------------------------------
        frame["headline_priority"] = (
            np.abs(frame["direction_contribution"])
            * (0.45 + (0.55 * frame["significance_weight"]))
            * frame["recency_weight"]
            * (0.8 + (0.2 * frame["event_strength"]))
        )
        heavy = frame.sort_values(by="headline_priority", ascending=False).head(8)
        heavy_hitters = []
        for row in heavy.to_dict("records"):
            signal = float(row["direction_signal"])
            direction_label = self._direction_call(
                int(np.clip(round((signal + 1.0) * 50.0), 0, 100))
            )
            bullish_theme = row.get("primary_bullish_catalyst")
            bearish_theme = row.get("primary_bearish_catalyst")
            # Label a headline with the catalyst on the same side as its net signal.
            if signal < 0.0:
                theme = bearish_theme or bullish_theme
            else:
                theme = bullish_theme or bearish_theme
            heavy_hitters.append(
                {
                    **row,
                    "direction_label": direction_label,
                    "catalyst_label": self._theme_label(theme or "broad_news"),
                }
            )

        avg_conviction = float(frame["conviction"].mean()) if has_rows else 0.0
        quant_confidence = float(
            np.clip((avg_conviction * 0.45) + ((direction_confidence / 100.0) * 0.55), 0.0, 1.0)
        )
        return {
            "avg_polarity": mean_pol,
            "vibe": vibe,
            "dir_ratio": dir_ratio,
            "conviction_weighted": conv_weighted,
            "agreement_rate": agreement_rate,
            "momentum_delta": float(momentum_delta),
            "composite_score": composite,
            "avg_conviction": avg_conviction,
            "tail_risk": float(tail_risk),
            "quant_confidence": quant_confidence,
            "direction_score": direction_score,
            "direction_call": direction_call,
            "direction_confidence": direction_confidence,
            "effective_articles": float(effective_articles),
            "headline_concentration": headline_concentration,
            "conflict_load": conflict_load,
            "event_support": float(event_support),
            "calibration_factor": float(direction_calibration["calibration_factor"]),
            "bullish_pressure": float(bullish_pressure),
            "bearish_pressure": float(bearish_pressure),
            "risk_balance": risk_balance,
            "recency_support": float(recency_support),
            "news_regime": news_regime,
            "state_title": state_title,
            "state_summary": state_summary,
            "state_explanation": state_explanation,
            "bullish_drivers": bullish_drivers,
            "bearish_risks": bearish_risks,
            "heavy_hitters": heavy_hitters,
        }

    def record_timing(self, phase, elapsed, units):
        """Record per-unit time for ``phase``, keeping only the last 10 samples.

        Writes to the class-level history shared by all instances; calls with
        ``units <= 0`` are ignored.
        """
        if units > 0:
            per_unit = elapsed / units
            history = self._timing_history.get(phase, [])
            history.append(per_unit)
            self._timing_history[phase] = history[-10:]

    def _avg_timing(self, phase, default):
        """Mean recorded per-unit time for ``phase``, or ``default`` with no history."""
        history = self._timing_history.get(phase, [])
        if history:
            return sum(history) / len(history)
        return default

    def estimate_time(self, article_count):
        """Estimate total seconds for scraping plus analysing ``article_count`` articles.

        Uses recorded timings when available, otherwise built-in defaults, adds a
        fixed 5 s overhead and a 15% margin, and rounds to whole seconds.
        """
        batches = max(1, (article_count + self.BATCH_SIZE - 1) // self.BATCH_SIZE)
        scrape_time = article_count * self._avg_timing("scrape_per_article", 0.02)
        finbert_time = batches * self._avg_timing("finbert_per_batch", 0.8)
        roberta_time = batches * self._avg_timing("roberta_per_batch", 0.5)
        ranker_time = batches * self._avg_timing("ranker_per_batch", 1.2)
        overhead = 5
        total = scrape_time + finbert_time + roberta_time + ranker_time + overhead
        return round(total * 1.15)