# alpha-predict / src/processor.py
# NOTE: header reconstructed from the Hugging Face file-viewer chrome that was
# pasted above the module (uploader: DevKX, commit 4cb21eb "Upload 14 files",
# 5.54 kB) — the raw page text was not valid Python and broke the import.
import pandas as pd
import numpy as np
import torch
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
import joblib
class Processor:
    """Feature-engineering pipeline combining FinBERT news sentiment with
    market technical indicators.

    Produces the scaled ``(1, 30, 14)`` float32 window expected by the
    downstream GRU model, plus latest-day metrics and the full feature
    history for the Streamlit UI.
    """

    def __init__(self, scaler_path: str = "models/robust_scaler.pkl"):
        print("⚙️ Initializing AlphaProcessor...")
        self.device = 0 if torch.cuda.is_available() else -1
        self.model_name = "ProsusAI/finbert"

        # Load the RobustScaler fitted at training time — inference inputs
        # must be normalized exactly like the training data.
        try:
            self.scaler = joblib.load(scaler_path)
            print(f"✅ Scaler loaded from {scaler_path}")
        except (OSError, FileNotFoundError) as err:
            # FIX: the previous bare `except:` caught everything (even
            # KeyboardInterrupt) and left `self.scaler` undefined, so a
            # missing file surfaced later as an opaque AttributeError in
            # process(). Record the failure explicitly instead.
            self.scaler = None
            print(f"⚠️ Scaler not found ({err}). Ensure robust_scaler.pkl is in models/ folder.")

        # Initialize the FinBERT sentiment pipeline (safetensors weights,
        # GPU if available via `device`).
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(
            self.model_name, use_safetensors=True
        )
        self.sentiment_pipe = pipeline(
            "sentiment-analysis",
            model=self.model,
            tokenizer=self.tokenizer,
            device=self.device,
        )

    def process(self, df_market, df_news):
        """Build the model input tensor and UI artifacts.

        Parameters
        ----------
        df_market : pandas.DataFrame
            OHLCV + VIX history, indexed by date (needs columns
            High/Low/Close/Volume/VIX).
        df_news : pandas.DataFrame
            Raw headlines with at least 'Title' and 'Date' columns.

        Returns
        -------
        tuple
            ``(input_tensor, latest_metrics, df_features, df_news_scored)``
            where ``input_tensor`` is the scaled (1, 30, 14) float32 window,
            ``latest_metrics`` is a dict of latest-day display values,
            ``df_features`` is the full engineered-feature history (so the
            UI can plot the 30-day sentiment), and ``df_news_scored`` is the
            per-headline sentiment table.

        Raises
        ------
        RuntimeError
            If the scaler failed to load in ``__init__``.
        ValueError
            If fewer than 30 feature rows are available for the window.
        """
        if self.scaler is None:
            # FIX: fail loudly with an actionable message instead of an
            # AttributeError deep inside the transform call.
            raise RuntimeError(
                "Scaler is not loaded; cannot normalize features. "
                "Place robust_scaler.pkl in the models/ folder."
            )

        # 1. Daily sentiment profile from headlines (plus per-headline scores).
        df_sent, df_news_scored = self._generate_sentiment_profile(df_news)

        # 2. Merge with market data and engineer all 14 features.
        df_features = self._engineer_14_features(df_market, df_sent)

        # FIX: the GRU expects exactly 30 timesteps — a shorter history would
        # previously produce a silently mis-shaped tensor.
        if len(df_features) < 30:
            raise ValueError(
                f"Need at least 30 feature rows for the model window, got {len(df_features)}."
            )

        # 3. Latest-day metadata for the UI, converted back to display scales.
        latest = df_features.iloc[-1]
        latest_metrics = {
            "Sent_Mean": latest['Sent_Mean'],
            "News_Volume": np.exp(latest['News_Volume']) - 1,  # invert the log1p
            "Panic_Interaction": latest['Sent_x_VIX'],
            "RSI": latest['RSI'] * 100,  # back to the conventional 0-100 scale
        }

        # 4. Last 30 days, scaled with the training-time scaler.
        final_window = df_features.tail(30).values
        scaled_window = self.scaler.transform(final_window)
        input_tensor = np.expand_dims(scaled_window, axis=0).astype('float32')

        return input_tensor, latest_metrics, df_features, df_news_scored

    def _generate_sentiment_profile(self, df_news):
        """Score every headline with FinBERT and aggregate to daily features.

        Returns a ``(daily, scored)`` pair: ``daily`` is indexed by datetime
        with columns Sent_Mean / Sent_Intensity / News_Volume / Net_Bull;
        ``scored`` is a copy of ``df_news`` with an added 'Score' column.
        """
        print("🧠 Running FinBERT Batch Analysis...")
        # FIX: work on a copy — the original mutated the caller's DataFrame
        # (added 'Score' and rewrote 'Date' in place).
        df_news = df_news.copy()
        titles = df_news['Title'].astype(str).tolist()
        results = self.sentiment_pipe(titles, batch_size=32, truncation=True)

        # Signed score: +p for positive, -p for negative, 0.0 for neutral.
        scores = []
        for res in results:
            label, score = res['label'].lower(), res['score']
            scores.append(score if label == 'positive' else -score if label == 'negative' else 0.0)
        df_news['Score'] = scores

        # Normalize dates to calendar days for the daily grouping.
        df_news['Date'] = pd.to_datetime(df_news['Date']).dt.date
        grouped = df_news.groupby('Date')['Score']
        daily = pd.DataFrame({
            'Sent_Mean': grouped.mean(),
            'Sent_Intensity': grouped.apply(lambda x: x.abs().mean()),
            'News_Volume': np.log1p(grouped.count()),
            # +1 in the denominator shrinks single-headline days toward 0.
            'Net_Bull': grouped.apply(lambda x: x.sum() / (len(x) + 1)),
        }).fillna(0.0)

        # Back to datetime index so it joins cleanly with the market index.
        daily.index = pd.to_datetime(daily.index)
        return daily, df_news

    def _engineer_14_features(self, df, df_sent):
        """Engineer the 7 quant + 7 sentiment features the model was trained on.

        ``df`` is OHLCV+VIX market data; ``df_sent`` is the daily sentiment
        frame from :meth:`_generate_sentiment_profile`. Returns a DataFrame
        with exactly the 14 training columns, in training order.
        """
        data = df.copy()

        # --- QUANT BRANCH (7 features) ---
        # Log-distance of close from the 20-day volume-weighted average price.
        tp = (data['High'] + data['Low'] + data['Close']) / 3
        vwap = (tp * data['Volume']).rolling(20).sum() / (data['Volume'].rolling(20).sum() + 1e-9)
        data['VWAP_Dist'] = np.log(data['Close'] / vwap)

        # 14-day RSI, rescaled from 0-100 to 0-1.
        delta = data['Close'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
        data['RSI'] = (100 - (100 / (1 + (gain / (loss + 1e-9))))) / 100.0

        # MACD histogram (12/26 EMA spread minus its 9-EMA signal),
        # normalized by price so it is scale-free.
        ema_12, ema_26 = data['Close'].ewm(span=12).mean(), data['Close'].ewm(span=26).mean()
        data['MACD_Hist'] = ((ema_12 - ema_26) - (ema_12 - ema_26).ewm(span=9).mean()) / data['Close']

        data['VIX_Norm'] = data['VIX'] / 100.0
        data['VIX_Change'] = data['VIX'].pct_change()

        # True range → tanh-squashed distance from the 22-day mean, in ATRs.
        tr = pd.concat([data['High'] - data['Low'],
                        abs(data['High'] - data['Close'].shift()),
                        abs(data['Low'] - data['Close'].shift())], axis=1).max(axis=1)
        data['ATR_Dist'] = np.tanh((data['Close'] - data['Close'].rolling(22).mean()) / (tr.rolling(14).mean() + 1e-9))

        data['Realized_Vol'] = data['Close'].pct_change().rolling(10).std() * 10

        # --- SENTIMENT BRANCH (7 features) ---
        # Align indices, then left-join daily sentiment; no-news days become 0.
        # NOTE(review): this fillna(0.0) also zero-fills the rolling-window
        # warm-up NaNs in the quant features above, so the final dropna() is
        # mostly a no-op — kept as-is because the scaler/model were trained
        # on features built exactly this way.
        data.index = pd.to_datetime(data.index)
        data = data.join(df_sent, how='left').fillna(0.0)
        data['Sent_Mean_Delta'] = data['Sent_Mean'].diff().fillna(0.0)
        data['Sent_Mean_EMA'] = data['Sent_Mean'].ewm(span=3).mean()
        data['Sent_x_VIX'] = data['Sent_Mean'] * data['VIX_Norm']  # panic interaction

        feature_cols = [
            'VWAP_Dist', 'RSI', 'MACD_Hist', 'VIX_Norm', 'VIX_Change', 'ATR_Dist', 'Realized_Vol',
            'Sent_Mean', 'Sent_Intensity', 'News_Volume', 'Net_Bull', 'Sent_Mean_Delta', 'Sent_Mean_EMA', 'Sent_x_VIX'
        ]
        return data[feature_cols].dropna()