# finbert_anaylzer/engine/analytics.py
# Uploaded by Jitendra12421 ("Upload 12 files", commit 16ae9d0, verified).
from collections import Counter
import re
import numpy as np
import pandas as pd
import torch
from scipy import stats as scipy_stats
from sentence_transformers import CrossEncoder
from transformers import pipeline
# Tolerance used to guard divisions and "effectively zero" comparisons.
EPS = 1e-9


def _compile_patterns(sources):
    """Compile each regex source case-insensitively, preserving order."""
    return tuple(re.compile(source, re.IGNORECASE) for source in sources)


# Keyword rules applied to headlines: (category, signed weight, regex sources).
# A positive weight marks a bullish catalyst, a negative weight a bearish one.
# NOTE(review): the bearish macro pattern r"inflation" also matches the text
# "easing inflation", which the bullish macro rule matches too, so such a
# headline hits both rules.
RAW_IMPACT_RULES = (
    (
        "earnings",
        0.95,
        (
            r"\bbeats?\b",
            r"tops? estimates",
            r"raises? guidance",
            r"strong guidance",
            r"record (sales|revenue|profit)",
            r"profit jumps?",
            r"revenue jumps?",
        ),
    ),
    (
        "earnings",
        -0.95,
        (
            r"\bmiss(es|ed)?\b",
            r"below estimates",
            r"cuts? guidance",
            r"lowers? outlook",
            r"warns? on",
            r"profit falls?",
            r"revenue falls?",
        ),
    ),
    (
        "analyst",
        0.8,
        (
            r"\bupgrade[sd]?\b",
            r"outperform",
            r"overweight",
            r"\bbuy rating\b",
            r"price target raised",
        ),
    ),
    (
        "analyst",
        -0.8,
        (
            r"\bdowngrade[sd]?\b",
            r"underperform",
            r"underweight",
            r"\bsell rating\b",
            r"price target cut",
        ),
    ),
    (
        "growth",
        0.6,
        (
            r"wins? (major )?(deal|contract|order)",
            r"partnership",
            r"approval",
            r"launches?",
            r"expands?",
            r"new product",
        ),
    ),
    (
        "capital",
        0.65,
        (r"buyback", r"repurchase", r"dividend hike", r"raises? dividend"),
    ),
    (
        "capital",
        -0.85,
        (
            r"share offering",
            r"secondary offering",
            r"dilution",
            r"defaults?",
            r"liquidity concerns?",
            r"cash burn",
            r"bankruptcy",
        ),
    ),
    (
        "operations",
        0.45,
        (
            r"margin expansion",
            r"cost cuts?",
            r"efficiency",
            r"turnaround",
            r"productivity",
        ),
    ),
    (
        "operations",
        -0.55,
        (
            r"layoffs?",
            r"recall",
            r"outage",
            r"delay",
            r"strike",
            r"plant closure",
        ),
    ),
    (
        "legal",
        -0.75,
        (
            r"lawsuit",
            r"probe",
            r"investigation",
            r"\bsec\b",
            r"antitrust",
            r"fraud",
            r"charges?",
        ),
    ),
    (
        "macro",
        0.35,
        (r"rate cut", r"stimulus", r"easing inflation", r"soft landing"),
    ),
    (
        "macro",
        -0.35,
        (
            r"tariff",
            r"inflation",
            r"rate hike",
            r"recession",
            r"geopolitical",
            r"trade war",
            r"headwind",
        ),
    ),
    (
        "price_action",
        0.45,
        (
            r"\brall(y|ies)\b",
            r"\bsurge(s|d)?\b",
            r"\bjump(s|ed)?\b",
            r"\bsoar(s|ed)?\b",
            r"\bgain(s|ed)?\b",
            r"breakout",
            r"all-time high",
        ),
    ),
    (
        "price_action",
        -0.45,
        (
            r"\bplunge(s|d)?\b",
            r"\bslump(s|ed)?\b",
            r"\btumble(s|d)?\b",
            r"\bdrop(s|ped)?\b",
            r"\bfalls?\b",
            r"breakdown",
            r"52-week low",
        ),
    ),
)

# Same rules with the regex sources compiled once at import time.
IMPACT_RULES = tuple(
    dict(
        category=rule_category,
        weight=rule_weight,
        patterns=_compile_patterns(rule_sources),
    )
    for rule_category, rule_weight, rule_sources in RAW_IMPACT_RULES
)

# Hedging vocabulary; each matching pattern raises a headline's uncertainty score.
UNCERTAINTY_PATTERNS = _compile_patterns(
    (
        r"\bmixed\b",
        r"\buncertain\b",
        r"\bvolatile\b",
        r"\bcautious\b",
        r"\bunclear\b",
        r"\bawaits?\b",
        r"\bwatch(es|ing)?\b",
        r"\brisk\b",
    )
)

# Human-readable phrase for each rule category, used in narrative text.
CATEGORY_LABELS = {
    "earnings": "earnings and guidance",
    "analyst": "analyst rating changes",
    "growth": "growth and demand catalysts",
    "capital": "capital allocation and balance-sheet news",
    "operations": "operational execution",
    "legal": "legal or regulatory risk",
    "macro": "macro and rates sensitivity",
    "price_action": "price-action headlines",
    "broad_news": "broad headline flow",
}
class AnalyticsEngine:
    """Score news headlines with two sentiment models plus keyword rules.

    ``analyze`` runs both sentiment pipelines and a cross-encoder ranker over
    the headline titles and attaches per-headline direction features.
    ``get_summary`` aggregates those features into a direction call, a
    confidence score, and narrative text.
    """

    # Recent per-unit timings for each phase (trimmed to the last 10 samples).
    # This dict lives on the class, so every instance reads and writes the
    # same history.
    _timing_history = {
        "scrape_per_article": [],
        "finbert_per_batch": [],
        "roberta_per_batch": [],
        "ranker_per_batch": [],
    }

    def __init__(self, preload_models=False):
        """Pick the compute device; load models now only if ``preload_models``."""
        cuda_available = torch.cuda.is_available()
        self.device = 0 if cuda_available else -1
        self.torch_device = "cuda" if cuda_available else "cpu"
        self.finbert = None
        self.distilroberta = None
        self.ranker = None
        if preload_models:
            self._ensure_models_loaded()

    def _ensure_models_loaded(self):
        """Instantiate any model that has not been loaded yet."""
        if self.finbert is None:
            self.finbert = pipeline(
                "sentiment-analysis",
                model="ProsusAI/finbert",
                device=self.device,
                max_length=512,
                truncation=True,
            )
        if self.distilroberta is None:
            self.distilroberta = pipeline(
                "sentiment-analysis",
                model="distilbert/distilbert-base-uncased-finetuned-sst-2-english",
                device=self.device,
                max_length=512,
                truncation=True,
            )
        if self.ranker is None:
            self.ranker = CrossEncoder(
                "cross-encoder/ms-marco-MiniLM-L-6-v2",
                device=self.torch_device,
            )

    def _map_finbert(self, label, score):
        """Return a signed polarity: +score, 0, or -score by label."""
        mapping = {"positive": 1.0, "neutral": 0.0, "negative": -1.0}
        return mapping.get(str(label).lower(), 0.0) * float(score)

    def _map_distilroberta(self, label, score):
        """Return +score for POSITIVE, -score for NEGATIVE, else 0.0."""
        label = str(label).upper()
        if label == "POSITIVE":
            return float(score)
        if label == "NEGATIVE":
            return -float(score)
        return 0.0

    def _prepare_frame(self, df):
        """Return a copy of ``df`` with ``timestamp_parsed``, oldest rows first.

        The timestamp is parsed from ``timestamp`` or, failing that,
        ``pub_date``; with neither column it is all ``NaT``. The frame is only
        re-sorted when at least one timestamp parsed. The index is always reset.
        """
        frame = df.copy()
        if "timestamp_parsed" not in frame.columns:
            if "timestamp" in frame.columns:
                ts = pd.to_datetime(frame["timestamp"], errors="coerce", utc=True)
            elif "pub_date" in frame.columns:
                ts = pd.to_datetime(frame["pub_date"], errors="coerce", utc=True)
            else:
                ts = pd.Series(pd.NaT, index=frame.index, dtype="datetime64[ns, UTC]")
            frame["timestamp_parsed"] = ts
        if frame["timestamp_parsed"].notna().any():
            # Stable sort keeps the incoming order for equal timestamps.
            frame = frame.sort_values("timestamp_parsed", kind="stable")
        return frame.reset_index(drop=True)

    def _score_market_impact(self, title):
        """Apply the keyword rules to one headline.

        Returns a dict with the squashed net bias, the capped total rule
        strength, the capped uncertainty score, the summed bullish and bearish
        weights, and the highest-scoring category on each side ("" if none).
        """
        headline = str(title or "")
        positive_scores = Counter()
        negative_scores = Counter()
        total_score = 0.0
        total_strength = 0.0
        for rule in IMPACT_RULES:
            # Each distinct pattern of a rule counts once, however often it occurs.
            hits = sum(1 for pattern in rule["patterns"] if pattern.search(headline))
            if not hits:
                continue
            contribution = rule["weight"] * hits
            total_score += contribution
            total_strength += abs(contribution)
            if contribution >= 0:
                positive_scores[rule["category"]] += contribution
            else:
                negative_scores[rule["category"]] += abs(contribution)
        uncertainty_hits = sum(
            1 for pattern in UNCERTAINTY_PATTERNS if pattern.search(headline)
        )
        return {
            "impact_bias": float(np.tanh(total_score / 1.75)),
            "event_strength": float(min(total_strength / 2.5, 1.0)),
            "uncertainty_score": float(min(uncertainty_hits * 0.25, 1.0)),
            "bullish_impact": float(sum(positive_scores.values())),
            "bearish_impact": float(sum(negative_scores.values())),
            "primary_bullish_catalyst": max(
                positive_scores, key=positive_scores.get, default=""
            ),
            "primary_bearish_catalyst": max(
                negative_scores, key=negative_scores.get, default=""
            ),
        }

    def _add_text_signals(self, frame):
        """Add the keyword-rule columns to ``frame`` in place and return it."""
        if "title" in frame.columns:
            titles = frame["title"]
        else:
            titles = pd.Series("", index=frame.index)
        scored = [self._score_market_impact(title) for title in titles.fillna("")]
        for key in (
            "impact_bias",
            "event_strength",
            "uncertainty_score",
            "bullish_impact",
            "bearish_impact",
            "primary_bullish_catalyst",
            "primary_bearish_catalyst",
        ):
            frame[key] = [item[key] for item in scored]
        return frame

    def _normalize_significance(self, values):
        """Map raw significance scores into the range 0.55 to 1.0.

        NaNs are replaced by the median. Scores are centred on the median,
        scaled by the standard deviation, and passed through a logistic curve.
        When the scores have no spread every row gets 0.75.
        """
        scores = np.asarray(values, dtype=float)
        if scores.size == 0:
            return scores
        fill_value = np.nanmedian(scores) if np.isfinite(scores).any() else 0.0
        scores = np.nan_to_num(scores, nan=fill_value)
        spread = float(np.nanstd(scores))
        if spread < EPS:
            return np.full(scores.shape, 0.75, dtype=float)
        centered = (scores - float(np.nanmedian(scores))) / (spread + EPS)
        normalized = 1.0 / (1.0 + np.exp(-centered))
        return 0.55 + (0.45 * normalized)

    def _build_recency_weight(self, timestamps):
        """Return one recency weight per timestamp.

        With at least one parseable timestamp the weight decays from 1.0
        towards 0.35 with age relative to the newest entry (72-hour scale);
        unparseable entries get the median weight. With no parseable
        timestamps the weights ramp linearly from 0.6 to 1.0 in row order
        (a single row gets 1.0).
        """
        parsed = pd.to_datetime(timestamps, errors="coerce", utc=True)
        # Non-Series input (e.g. a list) parses to a DatetimeIndex, which has no
        # ``.dt`` accessor; wrap it so both input kinds take the same path.
        ts = parsed if isinstance(parsed, pd.Series) else pd.Series(parsed)
        if ts.notna().any():
            newest = ts.max()
            age_hours = (newest - ts).dt.total_seconds().div(3600.0).clip(lower=0.0)
            weights = 0.35 + (0.65 * np.exp(-(age_hours / 72.0)))
            fallback = float(weights.median()) if weights.notna().any() else 0.7
            return weights.fillna(fallback)
        if len(ts) == 0:
            return pd.Series(dtype=float)
        if len(ts) == 1:
            return pd.Series([1.0], index=ts.index)
        ramp = np.linspace(0.6, 1.0, len(ts))
        return pd.Series(ramp, index=ts.index)

    def _safe_weighted_average(self, values, weights, default=0.0):
        """Weighted mean of ``values``; ``default`` when the weights sum to ~0."""
        values = np.asarray(values, dtype=float)
        weights = np.asarray(weights, dtype=float)
        total = float(np.sum(weights))
        if total <= EPS:
            return float(default)
        return float(np.average(values, weights=weights))

    def _effective_sample_size(self, weights):
        """Return (sum w)^2 / sum(w^2), or 0.0 when the weights are ~0."""
        sample_weights = np.asarray(weights, dtype=float)
        total = float(np.sum(sample_weights))
        square_total = float(np.sum(np.square(sample_weights)))
        if total <= EPS or square_total <= EPS:
            return 0.0
        return float((total * total) / (square_total + EPS))

    def _calibrate_direction_score(
        self,
        raw_edge,
        signal_magnitude,
        agreement_rate,
        significance_support,
        recency_support,
        event_support,
        uncertainty_load,
        conflict_load,
        effective_articles,
        headline_count,
        headline_concentration,
    ):
        """Shrink the raw edge according to evidence quality and make the call.

        Returns a dict with ``direction_call`` ("UP"/"DOWN"/"MIXED"),
        ``direction_score`` (0-100), the calibrated ``direction_edge``, and
        the intermediate support terms.
        """
        evidence_support = float(
            np.clip(
                (effective_articles - 1.0) / max(headline_count * 0.45, 1.0),
                0.0,
                1.0,
            )
        )
        freshness_support = float(np.clip((recency_support - 0.55) / 0.30, 0.0, 1.0))
        diversification_support = float(
            np.clip((0.88 - headline_concentration) / 0.38, 0.0, 1.0)
        )
        event_support = float(np.clip(event_support, 0.0, 1.0))
        # One dominant headline is still trusted when it is a strong, fresh,
        # significant event that both models agree on.
        major_singleton = (
            (effective_articles < 1.2)
            and (event_support >= 0.72)
            and (significance_support >= 0.72)
            and (agreement_rate >= 0.85)
            and (recency_support >= 0.72)
        )
        calibration_factor = float(
            np.clip(
                (0.25 * evidence_support)
                + (0.18 * freshness_support)
                + (0.18 * diversification_support)
                + (0.16 * event_support)
                + (0.12 * significance_support)
                + (0.11 * agreement_rate)
                - (0.16 * conflict_load)
                - (0.10 * uncertainty_load),
                0.0,
                1.0,
            )
        )
        calibrated_edge = float(raw_edge * (0.3 + (0.7 * calibration_factor)))
        # Successive damping for thin, concentrated, stale, conflicted or weak
        # evidence.
        if effective_articles < 1.2:
            calibrated_edge *= 0.92 if major_singleton else 0.4
        elif effective_articles < 1.6:
            calibrated_edge *= 0.7
        if headline_concentration > 0.82:
            calibrated_edge *= 0.96 if major_singleton else 0.62
        elif headline_concentration > 0.68:
            calibrated_edge *= 0.82
        if recency_support < 0.64:
            calibrated_edge *= max(0.35, recency_support / 0.64)
        if conflict_load > 0.45:
            calibrated_edge *= max(0.3, 1.0 - (0.95 * conflict_load))
        if signal_magnitude < 0.2:
            calibrated_edge *= 0.55
        direction_score = int(np.clip(round((calibrated_edge + 1.0) * 50.0), 0, 100))
        decisive_setup = (
            (
                (abs(calibrated_edge) >= 0.16)
                or (major_singleton and abs(calibrated_edge) >= 0.1)
            )
            and (
                (effective_articles >= 1.45)
                or (
                    major_singleton
                    and (abs(raw_edge) >= 0.35)
                    and (signal_magnitude >= 0.4)
                )
            )
            and (major_singleton or (headline_concentration <= 0.86))
            and (conflict_load <= 0.58)
        )
        if conflict_load > 0.18 and abs(calibrated_edge) < 0.3:
            decisive_setup = False
        if decisive_setup:
            direction_call = self._direction_call(direction_score)
        else:
            # Indecisive setups are reported as MIXED with a score held near 50.
            direction_call = "MIXED"
            direction_score = int(
                np.clip(round(50.0 + (calibrated_edge * 20.0)), 35, 65)
            )
        return {
            "direction_call": direction_call,
            "direction_score": direction_score,
            "direction_edge": calibrated_edge,
            "evidence_support": evidence_support,
            "freshness_support": freshness_support,
            "diversification_support": diversification_support,
            "calibration_factor": calibration_factor,
        }

    def _numeric_column(self, frame, name, default):
        """Return ``frame[name]`` coerced to numbers, NaNs replaced by ``default``.

        When the column is missing, return a constant float Series aligned to
        the frame's index. Going through ``pd.to_numeric`` with a scalar default
        would return a scalar, which has no ``fillna``.
        """
        if name in frame.columns:
            return pd.to_numeric(frame[name], errors="coerce").fillna(default)
        return pd.Series(float(default), index=frame.index, dtype=float)

    def _augment_direction_features(self, df):
        """Return a prepared copy of ``df`` with all direction features added.

        Missing model columns are back-filled (from ``pol``/``score`` where
        present, otherwise from fixed defaults), so the method also works on
        frames that never went through ``analyze``, including empty ones.
        """
        frame = self._prepare_frame(df)
        polarity_source = "ensemble_pol" if "ensemble_pol" in frame.columns else "pol"
        frame["ensemble_pol"] = self._numeric_column(frame, polarity_source, 0.0)
        if "finbert_pol" not in frame.columns:
            frame["finbert_pol"] = frame["ensemble_pol"]
        if "roberta_pol" not in frame.columns:
            frame["roberta_pol"] = frame["ensemble_pol"]
        if "finbert_score" not in frame.columns:
            frame["finbert_score"] = self._numeric_column(frame, "score", 0.8)
        if "roberta_score" not in frame.columns:
            frame["roberta_score"] = self._numeric_column(frame, "score", 0.8)
        if "agreement" in frame.columns:
            frame["agreement"] = self._numeric_column(frame, "agreement", 0.0)
        else:
            frame["agreement"] = (
                np.sign(frame["finbert_pol"]).astype(float)
                == np.sign(frame["roberta_pol"]).astype(float)
            ).astype(float)
        if "conviction" not in frame.columns:
            frame["conviction"] = (
                np.sqrt(frame["finbert_score"] * frame["roberta_score"])
                * frame["ensemble_pol"].abs()
            )
        frame["significance"] = self._numeric_column(frame, "significance", 1.0)

        frame = self._add_text_signals(frame)
        frame["recency_weight"] = self._build_recency_weight(frame["timestamp_parsed"])
        frame["significance_weight"] = self._normalize_significance(frame["significance"])

        # Per-headline multipliers that make up the direction weight.
        confidence_weight = 0.55 + (
            0.45 * np.clip(frame["conviction"].to_numpy(float), 0.0, 1.0)
        )
        agreement_factor = np.where(frame["agreement"].to_numpy(float) >= 1.0, 1.0, 0.72)
        uncertainty_penalty = 1.0 - (
            0.35 * np.clip(frame["uncertainty_score"].to_numpy(float), 0.0, 1.0)
        )
        event_weight = 0.75 + (0.25 * frame["event_strength"].to_numpy(float))
        # A headline that triggers both bullish and bearish rules is "mixed".
        mixed_catalyst = (frame["bullish_impact"].to_numpy(float) > 0.0) & (
            frame["bearish_impact"].to_numpy(float) > 0.0
        )
        mixed_penalty = np.where(mixed_catalyst, 0.55, 1.0)

        ensemble = frame["ensemble_pol"].to_numpy(float)
        impact = frame["impact_bias"].to_numpy(float)
        # Boost when model sentiment and keyword bias share a sign, damp when
        # they oppose each other.
        alignment = np.sign(ensemble) * np.sign(impact)
        alignment_boost = np.where(
            alignment > 0, 1.08, np.where(alignment < 0, 0.88, 1.0)
        )
        raw_direction = np.tanh(
            ((ensemble * 0.68) + (impact * 0.32)) * alignment_boost * mixed_penalty
        )
        direction_weight = (
            frame["recency_weight"].to_numpy(float)
            * frame["significance_weight"].to_numpy(float)
            * confidence_weight
            * agreement_factor
            * uncertainty_penalty
            * event_weight
        )
        direction_weight = direction_weight * np.where(mixed_catalyst, 0.82, 1.0)

        frame["direction_signal"] = np.clip(raw_direction, -1.0, 1.0)
        frame["direction_weight"] = np.clip(direction_weight, 0.15, None)
        frame["direction_contribution"] = (
            frame["direction_signal"] * frame["direction_weight"]
        )
        frame["bullish_pressure_component"] = frame["direction_contribution"].clip(lower=0.0)
        frame["bearish_pressure_component"] = -frame["direction_contribution"].clip(upper=0.0)

        if len(frame) > 1:
            z_scores = scipy_stats.zscore(
                frame["ensemble_pol"].to_numpy(float), nan_policy="omit"
            )
            if np.isscalar(z_scores):
                z_scores = np.zeros(len(frame), dtype=float)
            # A constant column yields NaN z-scores; report those as 0.
            frame["z_score"] = np.nan_to_num(z_scores, nan=0.0, posinf=0.0, neginf=0.0)
        else:
            frame["z_score"] = 0.0
        frame["momentum"] = (
            frame["ensemble_pol"]
            .rolling(window=max(6, len(frame) // 15), min_periods=1)
            .mean()
        )
        frame["label"] = frame.get("label", frame.get("finbert_label", "neutral"))
        frame["score"] = frame.get("score", frame.get("finbert_score", 0.0))
        frame["pol"] = frame["ensemble_pol"]
        return frame

    def _theme_weights(self, frame, theme_col, value_col):
        """Sum ``value_col`` per non-empty theme, counting positive values only."""
        weights = Counter()
        themes = frame[theme_col].fillna("")
        values = frame[value_col].fillna(0.0)
        for theme, value in zip(themes, values):
            if theme and float(value) > 0.0:
                weights[str(theme)] += float(value)
        return weights

    def _theme_label(self, theme):
        """Return the display phrase for ``theme`` (broad-news phrase if unknown)."""
        return CATEGORY_LABELS.get(theme or "broad_news", CATEGORY_LABELS["broad_news"])

    def _direction_call(self, score):
        """Map a 0-100 score to "UP" (>= 55), "DOWN" (<= 45) or "MIXED"."""
        if score >= 55:
            return "UP"
        if score <= 45:
            return "DOWN"
        return "MIXED"

    def _state_title(self, direction_call, confidence, risk_balance, direction_edge):
        """Return the short headline label for the overall market state."""
        if direction_call == "UP":
            if confidence >= 75 and risk_balance < 0.35:
                return "Strong Bullish"
            if risk_balance >= 0.5:
                return "Bullish but Fragile"
            return "Constructive Bullish"
        if direction_call == "DOWN":
            if confidence >= 75 and risk_balance >= 0.55:
                return "High-Risk Bearish"
            if abs(direction_edge) < 0.2:
                return "Bearish but Choppy"
            return "Defensive Bearish"
        if risk_balance >= 0.5:
            return "Mixed / Fragile"
        return "Mixed / Rangebound"

    def _build_summary_narrative(
        self,
        direction_call,
        state_title,
        direction_score,
        direction_confidence,
        bullish_pressure,
        bearish_pressure,
        agreement_rate,
        top_positive_theme,
        top_negative_theme,
        momentum_delta,
        recency_support,
        uncertainty_load,
    ):
        """Build the narrative text for the summary.

        Returns ``(state_summary, state_explanation, upside_drivers,
        downside_risks)``: one string followed by three lists of strings.
        Raises ``KeyError`` if ``direction_call`` is not UP, DOWN or MIXED.
        """
        tilt_word = {
            "UP": "an upside",
            "DOWN": "a downside",
            "MIXED": "a mixed",
        }[direction_call]
        state_summary = (
            f"{state_title}. The system sees {tilt_word} bias with direction score "
            f"{direction_score}/100 and confidence {direction_confidence}/100."
        )
        positive_theme_label = self._theme_label(top_positive_theme)
        negative_theme_label = self._theme_label(top_negative_theme)
        state_explanation = [
            (
                f"Weighted news pressure is {bullish_pressure:.0%} bullish versus "
                f"{bearish_pressure:.0%} bearish after recency and headline importance are applied."
            ),
            (
                f"Model agreement is {agreement_rate:.0%}, and recent headlines contribute "
                f"about {recency_support:.0%} of the usable signal."
            ),
            (
                f"The dominant upside theme is {positive_theme_label}, while the main risk theme is "
                f"{negative_theme_label}."
            ),
        ]

        upside_drivers = []
        if top_positive_theme:
            upside_drivers.append(
                f"{positive_theme_label.capitalize()} are the strongest bullish driver in the current flow."
            )
        else:
            upside_drivers.append(
                "There are bullish headlines, but they are not concentrated into one clean catalyst bucket yet."
            )
        if momentum_delta > 0.05:
            upside_drivers.append("Sentiment momentum is improving in the newest headlines.")
        elif momentum_delta < -0.05:
            upside_drivers.append(
                "A few bullish headlines exist, but the newest flow is weaker than the earlier flow."
            )
        else:
            upside_drivers.append("The headline trend is stable rather than accelerating sharply.")
        if recency_support >= 0.75:
            upside_drivers.append(
                "Support is relatively fresh, so the signal is not leaning on stale articles."
            )
        else:
            upside_drivers.append(
                "Some of the support comes from older headlines, which lowers near-term punch."
            )

        downside_risks = []
        if top_negative_theme:
            downside_risks.append(
                f"{negative_theme_label.capitalize()} are the largest downside risk in the current tape."
            )
        else:
            downside_risks.append(
                "There is no single dominant risk theme, but the flow is still not fully clean."
            )
        if bearish_pressure >= 0.4:
            downside_risks.append(
                "Bearish pressure is still large enough to overpower weak bullish follow-through."
            )
        else:
            downside_risks.append(
                "Bearish pressure is contained for now, but it would not take much to lift it."
            )
        if uncertainty_load >= 0.2:
            downside_risks.append(
                "Headline uncertainty is elevated, so the direction call deserves caution."
            )
        elif agreement_rate < 0.6:
            downside_risks.append(
                "The models disagree too often, which reduces directional reliability."
            )
        else:
            downside_risks.append(
                "Execution risk is moderate because the models are broadly aligned on direction."
            )
        return state_summary, state_explanation, upside_drivers, downside_risks

    def analyze(self, df, ticker, progress_cb=None):
        """Score every headline in ``df`` and return the augmented frame.

        Args:
            df: DataFrame with a ``title`` column (and optionally ``timestamp``
                or ``pub_date``).
            ticker: Symbol interpolated into the significance-ranking query.
            progress_cb: Optional ``callable(fraction, message)``.

        Returns:
            The prepared frame with model outputs and direction features. When
            there are no rows, the prepared frame is returned without them.
        """
        self._ensure_models_loaded()
        frame = self._prepare_frame(df)
        titles = frame["title"].fillna("").tolist()
        total = len(titles)
        batch_size = 32
        if total == 0:
            return frame

        def report(fraction, message):
            if progress_cb:
                progress_cb(fraction, message)

        def run_batched(model, display_name, start_fraction):
            results = []
            for start in range(0, total, batch_size):
                results.extend(model(titles[start : start + batch_size]))
                # Report the number of headlines finished, so the fraction
                # matches the count in the message and reaches the band's end.
                done = min(start + batch_size, total)
                report(
                    start_fraction + (done / total) * 0.25,
                    f"{display_name}: {done}/{total}",
                )
            return results

        report(0.05, f"Model 1/2: FinBERT analyzing {total} headlines...")
        finbert_results = run_batched(self.finbert, "FinBERT", 0.05)
        frame["finbert_label"] = [r["label"] for r in finbert_results]
        frame["finbert_score"] = [r["score"] for r in finbert_results]
        frame["finbert_pol"] = [
            self._map_finbert(r["label"], r["score"]) for r in finbert_results
        ]

        report(0.35, f"Model 2/2: DistilRoBERTa analyzing {total} headlines...")
        roberta_results = run_batched(self.distilroberta, "DistilRoBERTa", 0.35)
        frame["roberta_label"] = [r["label"] for r in roberta_results]
        frame["roberta_score"] = [r["score"] for r in roberta_results]
        frame["roberta_pol"] = [
            self._map_distilroberta(r["label"], r["score"]) for r in roberta_results
        ]

        report(0.65, "Computing ensemble sentiment fusion...")
        frame["ensemble_pol"] = (frame["finbert_pol"] * 0.6) + (frame["roberta_pol"] * 0.4)
        frame["conviction"] = (
            np.sqrt(frame["finbert_score"] * frame["roberta_score"])
            * frame["ensemble_pol"].abs()
        )
        frame["agreement"] = (
            np.sign(frame["finbert_pol"]).astype(float)
            == np.sign(frame["roberta_pol"]).astype(float)
        ).astype(float)

        report(0.75, "Ranking headline significance...")
        query = f"Major market moving news for {ticker} stock"
        pairs = [[query, title] for title in titles]
        frame["significance"] = self.ranker.predict(pairs, batch_size=batch_size)

        report(0.87, "Scoring likely market direction...")
        frame = self._augment_direction_features(frame)
        report(1.0, "Analysis complete.")
        return frame

    def get_summary(self, df):
        """Aggregate headline features into a summary dict.

        ``df`` may be the output of ``analyze`` or any frame accepted by
        ``_augment_direction_features`` (features are recomputed here). The
        result holds the direction call, score and confidence, pressure and
        risk metrics, narrative strings, and up to eight top headlines.
        """
        frame = self._augment_direction_features(df)
        has_rows = not frame.empty

        signal = frame["direction_signal"].to_numpy(float)
        weight = frame["direction_weight"].to_numpy(float)
        contribution = frame["direction_contribution"].to_numpy(float)
        # The 0.05 floor keeps zero-contribution rows from dropping out of the
        # contribution-weighted averages.
        impact_weight = np.abs(contribution) + 0.05

        mean_pol = float(frame["ensemble_pol"].mean()) if has_rows else 0.0
        pos_count = int((signal > 0.12).sum())
        neg_count = int((signal < -0.12).sum())
        total_directional = pos_count + neg_count
        dir_ratio = (
            float((pos_count - neg_count) / total_directional)
            if total_directional
            else 0.0
        )

        conviction_total = float(frame["conviction"].sum())
        if conviction_total > EPS:
            conv_weighted = float(
                np.average(
                    frame["ensemble_pol"].to_numpy(float),
                    weights=frame["conviction"].to_numpy(float),
                )
            )
        else:
            conv_weighted = mean_pol
        agreed = frame[frame["agreement"] >= 1.0]
        agreed_pol = float(agreed["ensemble_pol"].mean()) if not agreed.empty else mean_pol

        # Momentum compares the newest third of the rows with the oldest third
        # (the frame is ordered oldest to newest whenever timestamps parsed).
        if len(frame) >= 6:
            window = max(2, len(frame) // 3)
            recent = float(frame["direction_contribution"].tail(window).mean())
            older = float(frame["direction_contribution"].head(window).mean())
            momentum_delta = recent - older
        else:
            momentum_delta = (
                float(frame["direction_contribution"].mean()) if has_rows else 0.0
            )

        up_pressure_raw = float(frame["bullish_pressure_component"].sum()) if has_rows else 0.0
        down_pressure_raw = float(frame["bearish_pressure_component"].sum()) if has_rows else 0.0
        pressure_total = up_pressure_raw + down_pressure_raw + EPS
        bullish_pressure = up_pressure_raw / pressure_total
        bearish_pressure = down_pressure_raw / pressure_total

        # With no rows every weighted average below falls back to its default.
        weighted_direction = self._safe_weighted_average(signal, weight, default=0.0)
        direction_edge = float(
            (0.7 * (bullish_pressure - bearish_pressure))
            + (0.2 * weighted_direction)
            + (0.1 * np.tanh(momentum_delta * 2.5))
        )

        agreement_rate = float(frame["agreement"].mean()) if has_rows else 0.0
        directional_coverage = float((np.abs(signal) > 0.12).mean()) if has_rows else 0.0
        uncertainty_load = self._safe_weighted_average(
            frame["uncertainty_score"].to_numpy(float), weight, default=0.0
        )
        recency_support = self._safe_weighted_average(
            frame["recency_weight"].to_numpy(float), impact_weight, default=0.65
        )
        signal_magnitude = self._safe_weighted_average(
            np.abs(signal), weight, default=0.0
        )
        significance_support = self._safe_weighted_average(
            frame["significance_weight"].to_numpy(float), impact_weight, default=0.75
        )
        event_support = self._safe_weighted_average(
            frame["event_strength"].to_numpy(float), impact_weight, default=0.0
        )

        evidence_weights = np.abs(contribution) if has_rows else np.array([0.0])
        effective_articles = self._effective_sample_size(evidence_weights)
        headline_count = max(len(frame), 1)
        headline_concentration = (
            float(np.max(evidence_weights) / (float(np.sum(evidence_weights)) + EPS))
            if evidence_weights.size
            else 1.0
        )

        pressure_skew = float(abs(bullish_pressure - bearish_pressure))
        mixed_catalyst = (
            (frame["bullish_impact"].to_numpy(float) > 0.0)
            & (frame["bearish_impact"].to_numpy(float) > 0.0)
        ).astype(float)
        catalyst_mix = self._safe_weighted_average(
            mixed_catalyst, impact_weight, default=0.0
        )
        two_sided = up_pressure_raw > EPS and down_pressure_raw > EPS
        if two_sided or catalyst_mix > 0.0:
            pressure_volume = min(
                1.0,
                (up_pressure_raw + down_pressure_raw) / max(len(frame) * 0.55, 1.0),
            )
            conflict_load = float(
                np.clip(
                    ((1.0 - pressure_skew) * pressure_volume) + (0.55 * catalyst_mix),
                    0.0,
                    1.0,
                )
            )
        else:
            conflict_load = 0.0

        direction_calibration = self._calibrate_direction_score(
            raw_edge=direction_edge,
            signal_magnitude=signal_magnitude,
            agreement_rate=agreement_rate,
            significance_support=significance_support,
            recency_support=recency_support,
            event_support=event_support,
            uncertainty_load=uncertainty_load,
            conflict_load=conflict_load,
            effective_articles=effective_articles,
            headline_count=headline_count,
            headline_concentration=headline_concentration,
        )
        # From here on the calibrated values replace the raw edge.
        direction_edge = float(direction_calibration["direction_edge"])
        direction_score = int(direction_calibration["direction_score"])
        direction_call = str(direction_calibration["direction_call"])

        direction_confidence = int(
            np.clip(
                round(
                    100.0
                    * (
                        (0.24 * abs(direction_edge))
                        + (0.15 * agreement_rate)
                        + (0.12 * directional_coverage)
                        + (0.13 * signal_magnitude)
                        + (0.1 * significance_support)
                        + (0.08 * recency_support)
                        + (0.08 * event_support)
                        + (0.08 * direction_calibration["evidence_support"])
                        + (0.06 * direction_calibration["diversification_support"])
                        - (0.12 * uncertainty_load)
                        - (0.1 * conflict_load)
                    )
                ),
                0,
                100,
            )
        )
        if direction_call == "MIXED":
            direction_confidence = int(round(direction_confidence * 0.9))

        # Weighted share of strongly bearish headlines.
        tail_risk = self._safe_weighted_average(
            (signal < -0.55).astype(float), weight, default=0.0
        )
        risk_balance = float(
            np.clip(
                (bearish_pressure * 0.55) + (tail_risk * 0.25) + (uncertainty_load * 0.2),
                0.0,
                1.0,
            )
        )
        composite = float(
            (mean_pol * 0.14)
            + (dir_ratio * 0.18)
            + (conv_weighted * 0.18)
            + (agreed_pol * 0.10)
            + (momentum_delta * 0.10)
            + (direction_edge * 0.30)
        )
        vibe_balance = float(
            np.clip(
                (0.34 * direction_edge)
                + (0.18 * (bullish_pressure - bearish_pressure))
                + (0.14 * conv_weighted)
                + (0.12 * mean_pol)
                + (0.1 * agreed_pol)
                + (0.12 * np.tanh(momentum_delta * 2.0)),
                -1.0,
                1.0,
            )
        )
        vibe_amplifier = float(
            np.clip(
                0.55
                + (0.25 * (direction_confidence / 100.0))
                + (0.2 * direction_calibration["calibration_factor"]),
                0.45,
                1.0,
            )
        )
        if direction_call == "MIXED":
            vibe_amplifier *= 0.82
        # Signed square root stretches small balances away from the midpoint.
        vibe_tilt = float(np.sign(vibe_balance) * np.sqrt(abs(vibe_balance)))
        vibe = int(np.clip(round(5.0 + (4.4 * vibe_tilt * vibe_amplifier)), 1, 10))

        positive_themes = self._theme_weights(
            frame, "primary_bullish_catalyst", "bullish_pressure_component"
        )
        negative_themes = self._theme_weights(
            frame, "primary_bearish_catalyst", "bearish_pressure_component"
        )
        top_positive_theme = max(positive_themes, key=positive_themes.get, default="")
        top_negative_theme = max(negative_themes, key=negative_themes.get, default="")
        dominant_theme = top_positive_theme or top_negative_theme or "broad_news"
        news_regime = f"{self._theme_label(dominant_theme)}-led"
        state_title = self._state_title(
            direction_call, direction_confidence, risk_balance, direction_edge
        )
        state_summary, state_explanation, bullish_drivers, bearish_risks = (
            self._build_summary_narrative(
                direction_call=direction_call,
                state_title=state_title,
                direction_score=direction_score,
                direction_confidence=direction_confidence,
                bullish_pressure=bullish_pressure,
                bearish_pressure=bearish_pressure,
                agreement_rate=agreement_rate,
                top_positive_theme=top_positive_theme,
                top_negative_theme=top_negative_theme,
                momentum_delta=momentum_delta,
                recency_support=recency_support,
                uncertainty_load=uncertainty_load,
            )
        )

        frame["headline_priority"] = (
            np.abs(frame["direction_contribution"])
            * (0.45 + (0.55 * frame["significance_weight"]))
            * frame["recency_weight"]
            * (0.8 + (0.2 * frame["event_strength"]))
        )
        heavy = frame.sort_values(by="headline_priority", ascending=False).head(8)
        heavy_hitters = []
        for row in heavy.to_dict("records"):
            row_score = int(
                np.clip(round((float(row["direction_signal"]) + 1.0) * 50.0), 0, 100)
            )
            catalyst = self._theme_label(
                row.get("primary_bullish_catalyst")
                or row.get("primary_bearish_catalyst")
                or "broad_news"
            )
            heavy_hitters.append(
                {
                    **row,
                    "direction_label": self._direction_call(row_score),
                    "catalyst_label": catalyst,
                }
            )

        avg_conviction = float(frame["conviction"].mean()) if has_rows else 0.0
        quant_confidence = float(
            np.clip(
                (avg_conviction * 0.45) + ((direction_confidence / 100.0) * 0.55),
                0.0,
                1.0,
            )
        )
        return {
            "avg_polarity": mean_pol,
            "vibe": vibe,
            "dir_ratio": dir_ratio,
            "conviction_weighted": conv_weighted,
            "agreement_rate": agreement_rate,
            "momentum_delta": float(momentum_delta),
            "composite_score": composite,
            "avg_conviction": avg_conviction,
            "tail_risk": float(tail_risk),
            "quant_confidence": quant_confidence,
            "direction_score": direction_score,
            "direction_call": direction_call,
            "direction_confidence": direction_confidence,
            "effective_articles": float(effective_articles),
            "headline_concentration": headline_concentration,
            "conflict_load": conflict_load,
            "event_support": float(event_support),
            "calibration_factor": float(direction_calibration["calibration_factor"]),
            "bullish_pressure": float(bullish_pressure),
            "bearish_pressure": float(bearish_pressure),
            "risk_balance": risk_balance,
            "recency_support": float(recency_support),
            "news_regime": news_regime,
            "state_title": state_title,
            "state_summary": state_summary,
            "state_explanation": state_explanation,
            "bullish_drivers": bullish_drivers,
            "bearish_risks": bearish_risks,
            "heavy_hitters": heavy_hitters,
        }

    def record_timing(self, phase, elapsed, units):
        """Record ``elapsed / units`` for ``phase``, keeping the last 10 samples.

        Calls with ``units <= 0`` are ignored.
        """
        if units > 0:
            per_unit = elapsed / units
            history = self._timing_history.get(phase, [])
            history.append(per_unit)
            self._timing_history[phase] = history[-10:]

    def _avg_timing(self, phase, default):
        """Return the mean recorded timing for ``phase``, or ``default`` if none."""
        history = self._timing_history.get(phase, [])
        if history:
            return sum(history) / len(history)
        return default

    def estimate_time(self, article_count):
        """Estimate total run time for ``article_count`` articles.

        Uses recorded per-unit timings where available (fixed defaults
        otherwise), adds a constant overhead of 5 and a 15% margin, and rounds
        to an integer.
        """
        batches = max(1, (article_count + 31) // 32)
        scrape_time = article_count * self._avg_timing("scrape_per_article", 0.02)
        finbert_time = batches * self._avg_timing("finbert_per_batch", 0.8)
        roberta_time = batches * self._avg_timing("roberta_per_batch", 0.5)
        ranker_time = batches * self._avg_timing("ranker_per_batch", 1.2)
        overhead = 5
        total = scrape_time + finbert_time + roberta_time + ranker_time + overhead
        return round(total * 1.15)