Spaces:
Sleeping
Sleeping
| from collections import Counter | |
| import re | |
| import numpy as np | |
| import pandas as pd | |
| import torch | |
| from scipy import stats as scipy_stats | |
| from sentence_transformers import CrossEncoder | |
| from transformers import pipeline | |
# Small constant used by the engine to guard divisions and "is this sum
# effectively zero" comparisons.
EPS = 1e-9

# Headline keyword rules as (category, signed weight, regex sources).
# A positive weight marks a bullish catalyst, a negative weight a bearish one.
# Every pattern is compiled case-insensitively into IMPACT_RULES below.
RAW_IMPACT_RULES = (
    ("earnings", 0.95, (r"\bbeats?\b", r"tops? estimates", r"raises? guidance", r"strong guidance", r"record (sales|revenue|profit)", r"profit jumps?", r"revenue jumps?")),
    ("earnings", -0.95, (r"\bmiss(es|ed)?\b", r"below estimates", r"cuts? guidance", r"lowers? outlook", r"warns? on", r"profit falls?", r"revenue falls?")),
    ("analyst", 0.8, (r"\bupgrade[sd]?\b", r"outperform", r"overweight", r"\bbuy rating\b", r"price target raised")),
    ("analyst", -0.8, (r"\bdowngrade[sd]?\b", r"underperform", r"underweight", r"\bsell rating\b", r"price target cut")),
    ("growth", 0.6, (r"wins? (major )?(deal|contract|order)", r"partnership", r"approval", r"launches?", r"expands?", r"new product")),
    ("capital", 0.65, (r"buyback", r"repurchase", r"dividend hike", r"raises? dividend")),
    ("capital", -0.85, (r"share offering", r"secondary offering", r"dilution", r"defaults?", r"liquidity concerns?", r"cash burn", r"bankruptcy")),
    ("operations", 0.45, (r"margin expansion", r"cost cuts?", r"efficiency", r"turnaround", r"productivity")),
    ("operations", -0.55, (r"layoffs?", r"recall", r"outage", r"delay", r"strike", r"plant closure")),
    ("legal", -0.75, (r"lawsuit", r"probe", r"investigation", r"\bsec\b", r"antitrust", r"fraud", r"charges?")),
    ("macro", 0.35, (r"rate cut", r"stimulus", r"easing inflation", r"soft landing")),
    # The lookbehind keeps the bearish "inflation" rule from also firing on the
    # bullish phrase "easing inflation"; without it that phrase hits both macro
    # rules, nets to zero, and is treated as a mixed catalyst.
    ("macro", -0.35, (r"tariff", r"(?<!easing )inflation", r"rate hike", r"recession", r"geopolitical", r"trade war", r"headwind")),
    ("price_action", 0.45, (r"\brall(y|ies)\b", r"\bsurge(s|d)?\b", r"\bjump(s|ed)?\b", r"\bsoar(s|ed)?\b", r"\bgain(s|ed)?\b", r"breakout", r"all-time high")),
    ("price_action", -0.45, (r"\bplunge(s|d)?\b", r"\bslump(s|ed)?\b", r"\btumble(s|d)?\b", r"\bdrop(s|ped)?\b", r"\bfalls?\b", r"breakdown", r"52-week low")),
)

# Compiled form of RAW_IMPACT_RULES: one dict per rule with keys
# "category", "weight" and "patterns" (a tuple of compiled regexes).
IMPACT_RULES = tuple(
    {
        "category": category,
        "weight": weight,
        "patterns": tuple(re.compile(pattern, re.IGNORECASE) for pattern in patterns),
    }
    for category, weight, patterns in RAW_IMPACT_RULES
)

# Hedging words; each pattern that matches a headline raises its
# uncertainty score.
UNCERTAINTY_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r"\bmixed\b",
        r"\buncertain\b",
        r"\bvolatile\b",
        r"\bcautious\b",
        r"\bunclear\b",
        r"\bawaits?\b",
        r"\bwatch(es|ing)?\b",
        r"\brisk\b",
    )
)

# Human-readable label for each rule category; "broad_news" is the fallback
# label when no rule category applies.
CATEGORY_LABELS = {
    "earnings": "earnings and guidance",
    "analyst": "analyst rating changes",
    "growth": "growth and demand catalysts",
    "capital": "capital allocation and balance-sheet news",
    "operations": "operational execution",
    "legal": "legal or regulatory risk",
    "macro": "macro and rates sensitivity",
    "price_action": "price-action headlines",
    "broad_news": "broad headline flow",
}
class AnalyticsEngine:
    """Headline sentiment ensemble with a rule-based market-direction summary.

    ``analyze`` scores every headline with two Hugging Face sentiment
    pipelines and a cross-encoder ranker, then derives per-headline direction
    features. ``get_summary`` aggregates those features into a direction call,
    confidence figures, and narrative text.
    """

    # Per-unit timing samples used by ``estimate_time``. This is a class
    # attribute, so the history is shared by every instance in the process.
    _timing_history = {
        "scrape_per_article": [],
        "finbert_per_batch": [],
        "roberta_per_batch": [],
        "ranker_per_batch": [],
    }

    def __init__(self, preload_models=False):
        """Pick the compute device and optionally load the models eagerly.

        Args:
            preload_models: When true, load all three models now instead of
                on the first ``analyze`` call.
        """
        # ``device`` goes to the transformers pipelines (0 when CUDA is
        # available, otherwise -1); ``torch_device`` goes to the CrossEncoder.
        self.device = 0 if torch.cuda.is_available() else -1
        self.torch_device = "cuda" if torch.cuda.is_available() else "cpu"
        self.finbert = None
        self.distilroberta = None
        self.ranker = None
        if preload_models:
            self._ensure_models_loaded()

    def _ensure_models_loaded(self):
        """Create any model that has not been created yet (idempotent)."""
        if self.finbert is None:
            self.finbert = pipeline(
                "sentiment-analysis",
                model="ProsusAI/finbert",
                device=self.device,
                max_length=512,
                truncation=True,
            )
        if self.distilroberta is None:
            # NOTE: despite the attribute name, the checkpoint loaded here is
            # the DistilBERT SST-2 model.
            self.distilroberta = pipeline(
                "sentiment-analysis",
                model="distilbert/distilbert-base-uncased-finetuned-sst-2-english",
                device=self.device,
                max_length=512,
                truncation=True,
            )
        if self.ranker is None:
            self.ranker = CrossEncoder(
                "cross-encoder/ms-marco-MiniLM-L-6-v2",
                device=self.torch_device,
            )

    def _map_finbert(self, label, score):
        """Return signed polarity for a positive/neutral/negative label.

        The label is matched case-insensitively; unknown labels yield 0.0.
        """
        mapping = {"positive": 1.0, "neutral": 0.0, "negative": -1.0}
        return mapping.get(str(label).lower(), 0.0) * float(score)

    def _map_distilroberta(self, label, score):
        """Return +score for POSITIVE, -score for NEGATIVE, otherwise 0.0."""
        label = str(label).upper()
        if label == "POSITIVE":
            return float(score)
        if label == "NEGATIVE":
            return -float(score)
        return 0.0

    def _prepare_frame(self, df):
        """Return a copy of ``df`` with a ``timestamp_parsed`` column, time-sorted.

        ``timestamp_parsed`` is derived from ``timestamp`` or, failing that,
        ``pub_date`` (parsed as UTC, unparseable values become NaT). If neither
        column exists it is all NaT. Rows are stably sorted by it when at
        least one timestamp is valid; the index is always reset.
        """
        frame = df.copy()
        if "timestamp_parsed" not in frame.columns:
            if "timestamp" in frame.columns:
                ts = pd.to_datetime(frame["timestamp"], errors="coerce", utc=True)
            elif "pub_date" in frame.columns:
                ts = pd.to_datetime(frame["pub_date"], errors="coerce", utc=True)
            else:
                ts = pd.Series(pd.NaT, index=frame.index, dtype="datetime64[ns, UTC]")
            frame["timestamp_parsed"] = ts
        if frame["timestamp_parsed"].notna().any():
            frame = frame.sort_values("timestamp_parsed", kind="stable").reset_index(drop=True)
        else:
            frame = frame.reset_index(drop=True)
        return frame

    def _score_market_impact(self, title):
        """Score one headline against IMPACT_RULES and UNCERTAINTY_PATTERNS.

        Returns:
            dict with ``impact_bias`` (tanh-squashed net weight),
            ``event_strength`` and ``uncertainty_score`` (both capped at 1.0),
            ``bullish_impact`` / ``bearish_impact`` (summed absolute weights),
            and the heaviest bullish / bearish category ("" when none hit).
        """
        headline = str(title or "")
        positive_scores = Counter()
        negative_scores = Counter()
        total_score = 0.0
        total_strength = 0.0
        for rule in IMPACT_RULES:
            # Each distinct pattern of a rule counts once, however many times
            # it occurs in the headline.
            hits = sum(1 for pattern in rule["patterns"] if pattern.search(headline))
            if not hits:
                continue
            contribution = rule["weight"] * hits
            total_score += contribution
            total_strength += abs(contribution)
            if contribution >= 0:
                positive_scores[rule["category"]] += contribution
            else:
                negative_scores[rule["category"]] += abs(contribution)
        uncertainty_hits = sum(1 for pattern in UNCERTAINTY_PATTERNS if pattern.search(headline))
        return {
            "impact_bias": float(np.tanh(total_score / 1.75)),
            "event_strength": float(min(total_strength / 2.5, 1.0)),
            "uncertainty_score": float(min(uncertainty_hits * 0.25, 1.0)),
            "bullish_impact": float(sum(positive_scores.values())),
            "bearish_impact": float(sum(negative_scores.values())),
            "primary_bullish_catalyst": max(positive_scores, key=positive_scores.get, default=""),
            "primary_bearish_catalyst": max(negative_scores, key=negative_scores.get, default=""),
        }

    def _add_text_signals(self, frame):
        """Add the ``_score_market_impact`` fields as columns of ``frame``.

        A missing ``title`` column is treated as all-empty headlines. The
        frame is modified in place and also returned.
        """
        titles = frame["title"] if "title" in frame.columns else pd.Series("", index=frame.index)
        scored = [self._score_market_impact(title) for title in titles.fillna("")]
        for key in (
            "impact_bias",
            "event_strength",
            "uncertainty_score",
            "bullish_impact",
            "bearish_impact",
            "primary_bullish_catalyst",
            "primary_bearish_catalyst",
        ):
            frame[key] = [item[key] for item in scored]
        return frame

    def _normalize_significance(self, values):
        """Map raw significance scores into the range [0.55, 1.0].

        NaNs are replaced by the median of the finite-capable input (0.0 when
        nothing is finite). Scores are centred on the median, scaled by the
        standard deviation, and passed through a logistic curve. When the
        spread is below EPS every element gets 0.75. Empty input is returned
        as an empty array.
        """
        scores = np.asarray(values, dtype=float)
        if scores.size == 0:
            return scores
        scores = np.nan_to_num(scores, nan=np.nanmedian(scores) if np.isfinite(scores).any() else 0.0)
        spread = float(np.nanstd(scores))
        if spread < EPS:
            return np.full(scores.shape, 0.75, dtype=float)
        centered = (scores - float(np.nanmedian(scores))) / (spread + EPS)
        normalized = 1.0 / (1.0 + np.exp(-centered))
        return 0.55 + (0.45 * normalized)

    def _build_recency_weight(self, timestamps):
        """Return a recency weight per timestamp as a float Series.

        With at least one valid timestamp, the weight decays exponentially
        with age relative to the newest entry (72-hour scale, floor 0.35,
        newest = 1.0); entries without a timestamp get the median weight.
        With no valid timestamps, position stands in for time: a single entry
        gets 1.0 and several entries get a linear ramp from 0.6 to 1.0.

        Args:
            timestamps: A Series (its index is preserved) or any list-like of
                values accepted by ``pd.to_datetime``.
        """
        parsed = pd.to_datetime(timestamps, errors="coerce", utc=True)
        # pd.to_datetime only returns a Series for Series input; list-likes
        # come back as a DatetimeIndex, which has no ``.dt`` accessor.
        # Normalising here also avoids probing ``timestamps.index``, which on
        # a list or tuple is a method rather than an index.
        ts = parsed if isinstance(parsed, pd.Series) else pd.Series(parsed)
        if ts.notna().any():
            newest = ts.max()
            age_hours = (newest - ts).dt.total_seconds().div(3600.0).clip(lower=0.0)
            weights = 0.35 + (0.65 * np.exp(-(age_hours / 72.0)))
            return weights.fillna(float(weights.median()) if weights.notna().any() else 0.7)
        if len(ts) == 0:
            return pd.Series(dtype=float)
        if len(ts) == 1:
            return pd.Series([1.0], index=ts.index)
        ramp = np.linspace(0.6, 1.0, len(ts))
        return pd.Series(ramp, index=ts.index)

    def _safe_weighted_average(self, values, weights, default=0.0):
        """Weighted mean of ``values``; ``default`` when the weights sum to <= EPS."""
        values = np.asarray(values, dtype=float)
        weights = np.asarray(weights, dtype=float)
        total = float(np.sum(weights))
        if total <= EPS:
            return float(default)
        return float(np.average(values, weights=weights))

    def _effective_sample_size(self, weights):
        """Return (sum w)^2 / sum(w^2), or 0.0 when either sum is <= EPS."""
        sample_weights = np.asarray(weights, dtype=float)
        total = float(np.sum(sample_weights))
        square_total = float(np.sum(np.square(sample_weights)))
        if total <= EPS or square_total <= EPS:
            return 0.0
        return float((total * total) / (square_total + EPS))

    def _calibrate_direction_score(
        self,
        raw_edge,
        signal_magnitude,
        agreement_rate,
        significance_support,
        recency_support,
        event_support,
        uncertainty_load,
        conflict_load,
        effective_articles,
        headline_count,
        headline_concentration,
    ):
        """Shrink the raw direction edge according to evidence quality.

        The raw edge is scaled by a blended calibration factor and then by a
        sequence of multiplicative haircuts (thin evidence, concentration in
        one headline, stale support, conflicting pressure, weak signal). A
        call of UP/DOWN is only made for a "decisive" setup; otherwise the
        call is MIXED and the score is confined to 35..65.

        Returns:
            dict with ``direction_call``, ``direction_score`` (0..100),
            ``direction_edge`` (the calibrated edge), ``evidence_support``,
            ``freshness_support``, ``diversification_support`` and
            ``calibration_factor``.
        """
        evidence_support = float(
            np.clip((effective_articles - 1.0) / max(headline_count * 0.45, 1.0), 0.0, 1.0)
        )
        freshness_support = float(np.clip((recency_support - 0.55) / 0.30, 0.0, 1.0))
        diversification_support = float(np.clip((0.88 - headline_concentration) / 0.38, 0.0, 1.0))
        event_support = float(np.clip(event_support, 0.0, 1.0))
        # A single headline that is nonetheless strong, significant, agreed
        # on by both models and fresh is exempted from most haircuts below.
        major_singleton = (
            (effective_articles < 1.2)
            and (event_support >= 0.72)
            and (significance_support >= 0.72)
            and (agreement_rate >= 0.85)
            and (recency_support >= 0.72)
        )
        calibration_factor = float(
            np.clip(
                (0.25 * evidence_support)
                + (0.18 * freshness_support)
                + (0.18 * diversification_support)
                + (0.16 * event_support)
                + (0.12 * significance_support)
                + (0.11 * agreement_rate)
                - (0.16 * conflict_load)
                - (0.10 * uncertainty_load),
                0.0,
                1.0,
            )
        )
        calibrated_edge = float(raw_edge * (0.3 + (0.7 * calibration_factor)))
        # The haircuts are multiplicative and applied in sequence.
        if effective_articles < 1.2:
            calibrated_edge *= 0.92 if major_singleton else 0.4
        elif effective_articles < 1.6:
            calibrated_edge *= 0.7
        if headline_concentration > 0.82:
            calibrated_edge *= 0.96 if major_singleton else 0.62
        elif headline_concentration > 0.68:
            calibrated_edge *= 0.82
        if recency_support < 0.64:
            calibrated_edge *= max(0.35, recency_support / 0.64)
        if conflict_load > 0.45:
            calibrated_edge *= max(0.3, 1.0 - (0.95 * conflict_load))
        if signal_magnitude < 0.2:
            calibrated_edge *= 0.55
        direction_score = int(np.clip(round((calibrated_edge + 1.0) * 50.0), 0, 100))
        decisive_setup = (
            (
                (abs(calibrated_edge) >= 0.16)
                or (major_singleton and abs(calibrated_edge) >= 0.1)
            )
            and (
                (effective_articles >= 1.45)
                or (
                    major_singleton
                    and (abs(raw_edge) >= 0.35)
                    and (signal_magnitude >= 0.4)
                )
            )
            and (major_singleton or (headline_concentration <= 0.86))
            and (conflict_load <= 0.58)
        )
        if conflict_load > 0.18 and abs(calibrated_edge) < 0.3:
            decisive_setup = False
        if decisive_setup:
            direction_call = self._direction_call(direction_score)
        else:
            direction_call = "MIXED"
            # Compress the score towards 50 so a non-decisive setup cannot
            # display an extreme reading.
            direction_score = int(np.clip(round(50.0 + (calibrated_edge * 20.0)), 35, 65))
        return {
            "direction_call": direction_call,
            "direction_score": direction_score,
            "direction_edge": calibrated_edge,
            "evidence_support": evidence_support,
            "freshness_support": freshness_support,
            "diversification_support": diversification_support,
            "calibration_factor": calibration_factor,
        }

    def _numeric_column(self, frame, column, fallback):
        """Return ``frame[column]`` as numbers, or a constant Series if absent.

        Unparseable and missing values become ``fallback``. When the column
        does not exist the result is a float Series of ``fallback`` aligned to
        ``frame.index`` (empty for an empty frame).
        """
        if column in frame.columns:
            return pd.to_numeric(frame[column], errors="coerce").fillna(fallback)
        return pd.Series(float(fallback), index=frame.index, dtype=float)

    def _augment_direction_features(self, df):
        """Return a prepared copy of ``df`` with all direction features added.

        Missing model columns are back-filled from simpler ones
        (``pol`` -> ``ensemble_pol``, ``score`` -> model scores) or from
        defaults, so frames that did not come from ``analyze`` can still be
        summarised. Adds, among others, ``direction_signal``,
        ``direction_weight``, ``direction_contribution``, the bullish/bearish
        pressure components, ``z_score`` and ``momentum``.
        """
        frame = self._prepare_frame(df)
        # _numeric_column is used instead of pd.to_numeric(frame.get(col, x)):
        # with an absent column the latter receives a scalar, returns a
        # scalar, and the chained .fillna() raises AttributeError.
        polarity_source = "ensemble_pol" if "ensemble_pol" in frame.columns else "pol"
        frame["ensemble_pol"] = self._numeric_column(frame, polarity_source, 0.0)
        if "finbert_pol" not in frame.columns:
            frame["finbert_pol"] = frame["ensemble_pol"]
        if "roberta_pol" not in frame.columns:
            frame["roberta_pol"] = frame["ensemble_pol"]
        if "finbert_score" not in frame.columns:
            frame["finbert_score"] = self._numeric_column(frame, "score", 0.8)
        if "roberta_score" not in frame.columns:
            frame["roberta_score"] = self._numeric_column(frame, "score", 0.8)
        if "agreement" not in frame.columns:
            frame["agreement"] = (
                np.sign(frame["finbert_pol"]).astype(float) == np.sign(frame["roberta_pol"]).astype(float)
            ).astype(float)
        else:
            frame["agreement"] = pd.to_numeric(frame["agreement"], errors="coerce").fillna(0.0)
        if "conviction" not in frame.columns:
            frame["conviction"] = (
                np.sqrt(frame["finbert_score"] * frame["roberta_score"]) * frame["ensemble_pol"].abs()
            )
        if "significance" not in frame.columns:
            frame["significance"] = 1.0
        frame["significance"] = pd.to_numeric(frame["significance"], errors="coerce").fillna(1.0)
        frame = self._add_text_signals(frame)
        frame["recency_weight"] = self._build_recency_weight(frame["timestamp_parsed"])
        frame["significance_weight"] = self._normalize_significance(frame["significance"])

        # Per-headline weight factors.
        confidence_weight = 0.55 + (0.45 * np.clip(frame["conviction"].to_numpy(float), 0.0, 1.0))
        agreement_factor = np.where(frame["agreement"].to_numpy(float) >= 1.0, 1.0, 0.72)
        uncertainty_penalty = 1.0 - (0.35 * np.clip(frame["uncertainty_score"].to_numpy(float), 0.0, 1.0))
        event_weight = 0.75 + (0.25 * frame["event_strength"].to_numpy(float))
        # A headline that hit both bullish and bearish rules is damped in
        # both its signal and its weight.
        mixed_catalyst = (
            (frame["bullish_impact"].to_numpy(float) > 0.0)
            & (frame["bearish_impact"].to_numpy(float) > 0.0)
        )
        mixed_penalty = np.where(mixed_catalyst, 0.55, 1.0)
        ensemble = frame["ensemble_pol"].to_numpy(float)
        impact = frame["impact_bias"].to_numpy(float)
        # Boost when model sentiment and keyword impact share a sign, damp
        # when they disagree, neutral when either is zero.
        alignment = np.sign(ensemble) * np.sign(impact)
        alignment_boost = np.where(alignment > 0, 1.08, np.where(alignment < 0, 0.88, 1.0))
        raw_direction = np.tanh(((ensemble * 0.68) + (impact * 0.32)) * alignment_boost * mixed_penalty)
        direction_weight = (
            frame["recency_weight"].to_numpy(float)
            * frame["significance_weight"].to_numpy(float)
            * confidence_weight
            * agreement_factor
            * uncertainty_penalty
            * event_weight
        )
        direction_weight = direction_weight * np.where(mixed_catalyst, 0.82, 1.0)
        frame["direction_signal"] = np.clip(raw_direction, -1.0, 1.0)
        frame["direction_weight"] = np.clip(direction_weight, 0.15, None)
        frame["direction_contribution"] = frame["direction_signal"] * frame["direction_weight"]
        frame["bullish_pressure_component"] = frame["direction_contribution"].clip(lower=0.0)
        frame["bearish_pressure_component"] = -frame["direction_contribution"].clip(upper=0.0)
        if len(frame) > 1:
            z_scores = scipy_stats.zscore(frame["ensemble_pol"].to_numpy(float), nan_policy="omit")
            if np.isscalar(z_scores):
                z_scores = np.zeros(len(frame), dtype=float)
            # Non-finite z-scores (e.g. from zero variance) are zeroed.
            frame["z_score"] = np.nan_to_num(z_scores, nan=0.0, posinf=0.0, neginf=0.0)
        else:
            frame["z_score"] = 0.0
        frame["momentum"] = frame["ensemble_pol"].rolling(window=max(6, len(frame) // 15), min_periods=1).mean()
        frame["label"] = frame.get("label", frame.get("finbert_label", "neutral"))
        frame["score"] = frame.get("score", frame.get("finbert_score", 0.0))
        frame["pol"] = frame["ensemble_pol"]
        return frame

    def _theme_weights(self, frame, theme_col, value_col):
        """Sum the positive ``value_col`` values per non-empty ``theme_col`` label."""
        weights = Counter()
        for theme, value in zip(frame[theme_col].fillna(""), frame[value_col].fillna(0.0)):
            if theme and float(value) > 0.0:
                weights[str(theme)] += float(value)
        return weights

    def _theme_label(self, theme):
        """Return the display label for a category, defaulting to broad news."""
        return CATEGORY_LABELS.get(theme or "broad_news", CATEGORY_LABELS["broad_news"])

    def _direction_call(self, score):
        """Map a 0..100 score to "UP" (>= 55), "DOWN" (<= 45) or "MIXED"."""
        if score >= 55:
            return "UP"
        if score <= 45:
            return "DOWN"
        return "MIXED"

    def _state_title(self, direction_call, confidence, risk_balance, direction_edge):
        """Return the short state title for the call, confidence and risk mix."""
        if direction_call == "UP":
            if confidence >= 75 and risk_balance < 0.35:
                return "Strong Bullish"
            if risk_balance >= 0.5:
                return "Bullish but Fragile"
            return "Constructive Bullish"
        if direction_call == "DOWN":
            if confidence >= 75 and risk_balance >= 0.55:
                return "High-Risk Bearish"
            if abs(direction_edge) < 0.2:
                return "Bearish but Choppy"
            return "Defensive Bearish"
        if risk_balance >= 0.5:
            return "Mixed / Fragile"
        return "Mixed / Rangebound"

    def _build_summary_narrative(
        self,
        direction_call,
        state_title,
        direction_score,
        direction_confidence,
        bullish_pressure,
        bearish_pressure,
        agreement_rate,
        top_positive_theme,
        top_negative_theme,
        momentum_delta,
        recency_support,
        uncertainty_load,
    ):
        """Build the human-readable summary text for a direction result.

        Returns:
            tuple ``(state_summary, state_explanation, upside_drivers,
            downside_risks)``: one sentence string followed by three lists of
            three sentences each.

        Raises:
            KeyError: if ``direction_call`` is not "UP", "DOWN" or "MIXED".
        """
        tilt_word = {
            "UP": "an upside",
            "DOWN": "a downside",
            "MIXED": "a mixed",
        }[direction_call]
        state_summary = (
            f"{state_title}. The system sees {tilt_word} bias with direction score "
            f"{direction_score}/100 and confidence {direction_confidence}/100."
        )
        positive_theme_label = self._theme_label(top_positive_theme)
        negative_theme_label = self._theme_label(top_negative_theme)
        state_explanation = [
            (
                f"Weighted news pressure is {bullish_pressure:.0%} bullish versus "
                f"{bearish_pressure:.0%} bearish after recency and headline importance are applied."
            ),
            (
                f"Model agreement is {agreement_rate:.0%}, and recent headlines contribute "
                f"about {recency_support:.0%} of the usable signal."
            ),
            (
                f"The dominant upside theme is {positive_theme_label}, while the main risk theme is "
                f"{negative_theme_label}."
            ),
        ]
        upside_drivers = []
        downside_risks = []

        # Upside: theme, momentum, freshness.
        if top_positive_theme:
            upside_drivers.append(
                f"{positive_theme_label.capitalize()} are the strongest bullish driver in the current flow."
            )
        else:
            upside_drivers.append(
                "There are bullish headlines, but they are not concentrated into one clean catalyst bucket yet."
            )
        if momentum_delta > 0.05:
            upside_drivers.append("Sentiment momentum is improving in the newest headlines.")
        elif momentum_delta < -0.05:
            upside_drivers.append("A few bullish headlines exist, but the newest flow is weaker than the earlier flow.")
        else:
            upside_drivers.append("The headline trend is stable rather than accelerating sharply.")
        if recency_support >= 0.75:
            upside_drivers.append("Support is relatively fresh, so the signal is not leaning on stale articles.")
        else:
            upside_drivers.append("Some of the support comes from older headlines, which lowers near-term punch.")

        # Downside: theme, bearish pressure, reliability.
        if top_negative_theme:
            downside_risks.append(
                f"{negative_theme_label.capitalize()} are the largest downside risk in the current tape."
            )
        else:
            downside_risks.append("There is no single dominant risk theme, but the flow is still not fully clean.")
        if bearish_pressure >= 0.4:
            downside_risks.append("Bearish pressure is still large enough to overpower weak bullish follow-through.")
        else:
            downside_risks.append("Bearish pressure is contained for now, but it would not take much to lift it.")
        if uncertainty_load >= 0.2:
            downside_risks.append("Headline uncertainty is elevated, so the direction call deserves caution.")
        elif agreement_rate < 0.6:
            downside_risks.append("The models disagree too often, which reduces directional reliability.")
        else:
            downside_risks.append("Execution risk is moderate because the models are broadly aligned on direction.")
        return state_summary, state_explanation, upside_drivers, downside_risks

    def analyze(self, df, ticker, progress_cb=None):
        """Score every headline in ``df`` and return the augmented frame.

        Args:
            df: DataFrame with a ``title`` column (required) and optionally
                ``timestamp`` / ``pub_date``.
            ticker: Symbol inserted into the ranker query.
            progress_cb: Optional callable ``(fraction, message)`` invoked as
                each stage advances.

        Returns:
            The prepared frame with model outputs, ensemble columns,
            ``significance`` and all direction features. An input without
            rows is returned after preparation only, with no model columns.
        """
        self._ensure_models_loaded()
        frame = self._prepare_frame(df)
        titles = frame["title"].fillna("").tolist()
        total = len(titles)
        batch_size = 32
        if total == 0:
            return frame

        if progress_cb:
            progress_cb(0.05, f"Model 1/2: FinBERT analyzing {total} headlines...")
        finbert_results = []
        for i in range(0, total, batch_size):
            batch = titles[i : i + batch_size]
            finbert_results.extend(self.finbert(batch))
            if progress_cb:
                progress_cb(0.05 + (i / total) * 0.25, f"FinBERT: {min(i + batch_size, total)}/{total}")
        frame["finbert_label"] = [r["label"] for r in finbert_results]
        frame["finbert_score"] = [r["score"] for r in finbert_results]
        frame["finbert_pol"] = [
            self._map_finbert(label, score)
            for label, score in zip(frame["finbert_label"], frame["finbert_score"])
        ]

        if progress_cb:
            progress_cb(0.35, f"Model 2/2: DistilRoBERTa analyzing {total} headlines...")
        roberta_results = []
        for i in range(0, total, batch_size):
            batch = titles[i : i + batch_size]
            roberta_results.extend(self.distilroberta(batch))
            if progress_cb:
                progress_cb(0.35 + (i / total) * 0.25, f"DistilRoBERTa: {min(i + batch_size, total)}/{total}")
        frame["roberta_label"] = [r["label"] for r in roberta_results]
        frame["roberta_score"] = [r["score"] for r in roberta_results]
        frame["roberta_pol"] = [
            self._map_distilroberta(label, score)
            for label, score in zip(frame["roberta_label"], frame["roberta_score"])
        ]

        if progress_cb:
            progress_cb(0.65, "Computing ensemble sentiment fusion...")
        # 60/40 blend of the two model polarities.
        frame["ensemble_pol"] = (frame["finbert_pol"] * 0.6) + (frame["roberta_pol"] * 0.4)
        frame["conviction"] = np.sqrt(frame["finbert_score"] * frame["roberta_score"]) * frame["ensemble_pol"].abs()
        frame["agreement"] = (
            np.sign(frame["finbert_pol"]).astype(float) == np.sign(frame["roberta_pol"]).astype(float)
        ).astype(float)

        if progress_cb:
            progress_cb(0.75, "Ranking headline significance...")
        query = f"Major market moving news for {ticker} stock"
        pairs = [[query, title] for title in titles]
        frame["significance"] = self.ranker.predict(pairs, batch_size=batch_size)

        if progress_cb:
            progress_cb(0.87, "Scoring likely market direction...")
        frame = self._augment_direction_features(frame)
        if progress_cb:
            progress_cb(1.0, "Analysis complete.")
        return frame

    def get_summary(self, df):
        """Aggregate per-headline features into a summary dict.

        ``df`` is passed through ``_augment_direction_features`` first, so it
        may be the output of ``analyze`` or a simpler frame. The returned dict
        carries the direction call/score/confidence, pressure and risk
        figures, narrative text, and ``heavy_hitters`` (up to eight row dicts
        ranked by ``headline_priority``).
        """
        frame = self._augment_direction_features(df)

        # --- Simple polarity aggregates -------------------------------------
        mean_pol = float(frame["ensemble_pol"].mean()) if not frame.empty else 0.0
        directional = frame["direction_signal"] if "direction_signal" in frame.columns else pd.Series(dtype=float)
        pos_count = int((directional > 0.12).sum())
        neg_count = int((directional < -0.12).sum())
        total_directional = pos_count + neg_count
        dir_ratio = float((pos_count - neg_count) / total_directional) if total_directional else 0.0
        conviction_total = float(frame["conviction"].sum()) if "conviction" in frame.columns else 0.0
        if conviction_total > EPS:
            conv_weighted = float(
                np.average(frame["ensemble_pol"].to_numpy(float), weights=frame["conviction"].to_numpy(float))
            )
        else:
            conv_weighted = mean_pol
        agreed = frame[frame["agreement"] >= 1.0] if "agreement" in frame.columns else frame
        agreed_pol = float(agreed["ensemble_pol"].mean()) if not agreed.empty else mean_pol

        # --- Momentum: newest third versus oldest third ---------------------
        if len(frame) >= 6:
            recent = float(frame["direction_contribution"].tail(max(2, len(frame) // 3)).mean())
            older = float(frame["direction_contribution"].head(max(2, len(frame) // 3)).mean())
            momentum_delta = recent - older
        else:
            momentum_delta = float(frame["direction_contribution"].mean()) if not frame.empty else 0.0

        # --- Pressure split and raw direction edge --------------------------
        up_pressure_raw = float(frame["bullish_pressure_component"].sum()) if not frame.empty else 0.0
        down_pressure_raw = float(frame["bearish_pressure_component"].sum()) if not frame.empty else 0.0
        pressure_total = up_pressure_raw + down_pressure_raw + EPS
        bullish_pressure = up_pressure_raw / pressure_total
        bearish_pressure = down_pressure_raw / pressure_total
        weighted_direction = self._safe_weighted_average(
            frame["direction_signal"].to_numpy(float) if not frame.empty else np.array([0.0]),
            frame["direction_weight"].to_numpy(float) if not frame.empty else np.array([1.0]),
            default=0.0,
        )
        direction_edge = float(
            (0.7 * (bullish_pressure - bearish_pressure))
            + (0.2 * weighted_direction)
            + (0.1 * np.tanh(momentum_delta * 2.5))
        )
        direction_score = int(np.clip(round((direction_edge + 1.0) * 50.0), 0, 100))
        direction_call = self._direction_call(direction_score)

        # --- Support / load metrics fed to the calibration ------------------
        agreement_rate = float(frame["agreement"].mean()) if not frame.empty else 0.0
        directional_coverage = float((frame["direction_signal"].abs() > 0.12).mean()) if not frame.empty else 0.0
        uncertainty_load = self._safe_weighted_average(
            frame["uncertainty_score"].to_numpy(float) if not frame.empty else np.array([0.0]),
            frame["direction_weight"].to_numpy(float) if not frame.empty else np.array([1.0]),
            default=0.0,
        )
        # The +0.05 keeps zero-contribution headlines from dropping out of
        # these contribution-weighted averages entirely.
        recency_support = self._safe_weighted_average(
            frame["recency_weight"].to_numpy(float) if not frame.empty else np.array([0.65]),
            np.abs(frame["direction_contribution"].to_numpy(float)) + 0.05 if not frame.empty else np.array([1.0]),
            default=0.65,
        )
        signal_magnitude = self._safe_weighted_average(
            np.abs(frame["direction_signal"].to_numpy(float)) if not frame.empty else np.array([0.0]),
            frame["direction_weight"].to_numpy(float) if not frame.empty else np.array([1.0]),
            default=0.0,
        )
        significance_support = self._safe_weighted_average(
            frame["significance_weight"].to_numpy(float) if not frame.empty else np.array([0.75]),
            np.abs(frame["direction_contribution"].to_numpy(float)) + 0.05 if not frame.empty else np.array([1.0]),
            default=0.75,
        )
        event_support = self._safe_weighted_average(
            frame["event_strength"].to_numpy(float) if not frame.empty else np.array([0.0]),
            np.abs(frame["direction_contribution"].to_numpy(float)) + 0.05 if not frame.empty else np.array([1.0]),
            default=0.0,
        )
        evidence_weights = (
            np.abs(frame["direction_contribution"].to_numpy(float)) if not frame.empty else np.array([0.0])
        )
        effective_articles = self._effective_sample_size(evidence_weights)
        headline_count = max(len(frame), 1)
        # Share of total absolute contribution carried by the single largest
        # headline.
        headline_concentration = float(
            np.max(evidence_weights) / (float(np.sum(evidence_weights)) + EPS)
        ) if evidence_weights.size else 1.0
        pressure_skew = float(abs(bullish_pressure - bearish_pressure))
        catalyst_mix = self._safe_weighted_average(
            (
                (frame["bullish_impact"].to_numpy(float) > 0.0)
                & (frame["bearish_impact"].to_numpy(float) > 0.0)
            ).astype(float) if not frame.empty else np.array([0.0]),
            np.abs(frame["direction_contribution"].to_numpy(float)) + 0.05 if not frame.empty else np.array([1.0]),
            default=0.0,
        )
        # Conflict is only non-zero when both sides carry pressure or some
        # headline mixes bullish and bearish catalysts.
        conflict_load = float(
            np.clip(
                (
                    ((1.0 - pressure_skew) * min(1.0, (up_pressure_raw + down_pressure_raw) / max(len(frame) * 0.55, 1.0)))
                    + (0.55 * catalyst_mix)
                ),
                0.0,
                1.0,
            )
        ) if ((up_pressure_raw > EPS and down_pressure_raw > EPS) or catalyst_mix > 0.0) else 0.0

        # --- Calibration replaces the raw edge / score / call ---------------
        direction_calibration = self._calibrate_direction_score(
            raw_edge=direction_edge,
            signal_magnitude=signal_magnitude,
            agreement_rate=agreement_rate,
            significance_support=significance_support,
            recency_support=recency_support,
            event_support=event_support,
            uncertainty_load=uncertainty_load,
            conflict_load=conflict_load,
            effective_articles=effective_articles,
            headline_count=headline_count,
            headline_concentration=headline_concentration,
        )
        direction_edge = float(direction_calibration["direction_edge"])
        direction_score = int(direction_calibration["direction_score"])
        direction_call = str(direction_calibration["direction_call"])
        direction_confidence = int(
            np.clip(
                round(
                    100.0
                    * (
                        (0.24 * abs(direction_edge))
                        + (0.15 * agreement_rate)
                        + (0.12 * directional_coverage)
                        + (0.13 * signal_magnitude)
                        + (0.1 * significance_support)
                        + (0.08 * recency_support)
                        + (0.08 * event_support)
                        + (0.08 * direction_calibration["evidence_support"])
                        + (0.06 * direction_calibration["diversification_support"])
                        - (0.12 * uncertainty_load)
                        - (0.1 * conflict_load)
                    )
                ),
                0,
                100,
            )
        )
        if direction_call == "MIXED":
            direction_confidence = int(round(direction_confidence * 0.9))

        # --- Risk, composite and "vibe" scores ------------------------------
        tail_risk = self._safe_weighted_average(
            (frame["direction_signal"].to_numpy(float) < -0.55).astype(float) if not frame.empty else np.array([0.0]),
            frame["direction_weight"].to_numpy(float) if not frame.empty else np.array([1.0]),
            default=0.0,
        )
        risk_balance = float(
            np.clip(
                (bearish_pressure * 0.55) + (tail_risk * 0.25) + (uncertainty_load * 0.2),
                0.0,
                1.0,
            )
        )
        composite = float(
            (mean_pol * 0.14)
            + (dir_ratio * 0.18)
            + (conv_weighted * 0.18)
            + (agreed_pol * 0.10)
            + (momentum_delta * 0.10)
            + (direction_edge * 0.30)
        )
        vibe_balance = float(
            np.clip(
                (0.34 * direction_edge)
                + (0.18 * (bullish_pressure - bearish_pressure))
                + (0.14 * conv_weighted)
                + (0.12 * mean_pol)
                + (0.1 * agreed_pol)
                + (0.12 * np.tanh(momentum_delta * 2.0)),
                -1.0,
                1.0,
            )
        )
        vibe_amplifier = float(
            np.clip(
                0.55
                + (0.25 * (direction_confidence / 100.0))
                + (0.2 * direction_calibration["calibration_factor"]),
                0.45,
                1.0,
            )
        )
        if direction_call == "MIXED":
            vibe_amplifier *= 0.82
        # Signed square root of the balance before scaling onto 1..10.
        vibe_tilt = float(np.sign(vibe_balance) * np.sqrt(abs(vibe_balance)))
        vibe = int(np.clip(round(5.0 + (4.4 * vibe_tilt * vibe_amplifier)), 1, 10))

        # --- Themes and narrative -------------------------------------------
        positive_themes = self._theme_weights(frame, "primary_bullish_catalyst", "bullish_pressure_component")
        negative_themes = self._theme_weights(frame, "primary_bearish_catalyst", "bearish_pressure_component")
        top_positive_theme = max(positive_themes, key=positive_themes.get, default="")
        top_negative_theme = max(negative_themes, key=negative_themes.get, default="")
        dominant_theme = top_positive_theme or top_negative_theme or "broad_news"
        news_regime = f"{self._theme_label(dominant_theme)}-led"
        state_title = self._state_title(direction_call, direction_confidence, risk_balance, direction_edge)
        state_summary, state_explanation, bullish_drivers, bearish_risks = self._build_summary_narrative(
            direction_call=direction_call,
            state_title=state_title,
            direction_score=direction_score,
            direction_confidence=direction_confidence,
            bullish_pressure=bullish_pressure,
            bearish_pressure=bearish_pressure,
            agreement_rate=agreement_rate,
            top_positive_theme=top_positive_theme,
            top_negative_theme=top_negative_theme,
            momentum_delta=momentum_delta,
            recency_support=recency_support,
            uncertainty_load=uncertainty_load,
        )

        # --- Heavy hitters: top eight headlines by priority ----------------
        frame["headline_priority"] = (
            np.abs(frame["direction_contribution"])
            * (0.45 + (0.55 * frame["significance_weight"]))
            * frame["recency_weight"]
            * (0.8 + (0.2 * frame["event_strength"]))
        )
        heavy = frame.sort_values(by="headline_priority", ascending=False).head(8)
        heavy_hitters = []
        for row in heavy.to_dict("records"):
            direction_label = self._direction_call(
                int(np.clip(round((float(row["direction_signal"]) + 1.0) * 50.0), 0, 100))
            )
            catalyst = self._theme_label(
                row.get("primary_bullish_catalyst") or row.get("primary_bearish_catalyst") or "broad_news"
            )
            heavy_hitters.append(
                {
                    **row,
                    "direction_label": direction_label,
                    "catalyst_label": catalyst,
                }
            )
        avg_conviction = float(frame["conviction"].mean()) if not frame.empty else 0.0
        quant_confidence = float(
            np.clip((avg_conviction * 0.45) + ((direction_confidence / 100.0) * 0.55), 0.0, 1.0)
        )
        return {
            "avg_polarity": mean_pol,
            "vibe": vibe,
            "dir_ratio": dir_ratio,
            "conviction_weighted": conv_weighted,
            "agreement_rate": agreement_rate,
            "momentum_delta": float(momentum_delta),
            "composite_score": composite,
            "avg_conviction": avg_conviction,
            "tail_risk": float(tail_risk),
            "quant_confidence": quant_confidence,
            "direction_score": direction_score,
            "direction_call": direction_call,
            "direction_confidence": direction_confidence,
            "effective_articles": float(effective_articles),
            "headline_concentration": headline_concentration,
            "conflict_load": conflict_load,
            "event_support": float(event_support),
            "calibration_factor": float(direction_calibration["calibration_factor"]),
            "bullish_pressure": float(bullish_pressure),
            "bearish_pressure": float(bearish_pressure),
            "risk_balance": risk_balance,
            "recency_support": float(recency_support),
            "news_regime": news_regime,
            "state_title": state_title,
            "state_summary": state_summary,
            "state_explanation": state_explanation,
            "bullish_drivers": bullish_drivers,
            "bearish_risks": bearish_risks,
            "heavy_hitters": heavy_hitters,
        }

    def record_timing(self, phase, elapsed, units):
        """Record ``elapsed / units`` for ``phase``, keeping the last ten samples.

        Calls with ``units <= 0`` are ignored. The history lives on the class,
        so samples are shared by all instances.
        """
        if units > 0:
            per_unit = elapsed / units
            history = self._timing_history.get(phase, [])
            history.append(per_unit)
            self._timing_history[phase] = history[-10:]

    def _avg_timing(self, phase, default):
        """Return the mean recorded timing for ``phase``, or ``default`` if none."""
        history = self._timing_history.get(phase, [])
        if history:
            return sum(history) / len(history)
        return default

    def estimate_time(self, article_count):
        """Estimate total processing time for ``article_count`` articles.

        Uses recorded per-unit timings where available and built-in defaults
        otherwise, assumes batches of 32 (at least one batch), adds a fixed
        overhead of 5 and a 15% margin, and returns the rounded result.
        """
        batches = max(1, (article_count + 31) // 32)
        scrape_time = article_count * self._avg_timing("scrape_per_article", 0.02)
        finbert_time = batches * self._avg_timing("finbert_per_batch", 0.8)
        roberta_time = batches * self._avg_timing("roberta_per_batch", 0.5)
        ranker_time = batches * self._avg_timing("ranker_per_batch", 1.2)
        overhead = 5
        total = scrape_time + finbert_time + roberta_time + ranker_time + overhead
        return round(total * 1.15)