import logging import re from typing import Dict, Any from transformers import pipeline import torch logger = logging.getLogger(__name__) class SentimentPipeline: def __init__(self, model_name: str = "ProsusAI/finbert"): """ Initialize NLP pipeline for sentiment analysis of news/tweets. Using FinBERT as it's tuned for financial/market sentiment. """ self.model_name = model_name self.device = 0 if torch.cuda.is_available() else ( "mps" if torch.backends.mps.is_available() else -1 ) logger.info(f"Loading NLP Pipeline '{model_name}' on device '{self.device}'...") try: self.classifier = pipeline( "sentiment-analysis", model=self.model_name, device=self.device ) logger.info("NLP Pipeline loaded successfully.") except Exception as e: logger.error(f"Failed to load NLP model: {e}") self.classifier = None def preprocess_text(self, text: str) -> str: """Clean up social media artifacts.""" # Remove URLs text = re.sub(r'http\S+', '', text) # Remove mentions text = re.sub(r'@\w+', '', text) # Tidy whitespace text = ' '.join(text.split()) return text def analyze_sentiment(self, text: str) -> Dict[str, Any]: """ Analyze sentiment of a single text. Returns score from -1.0 (Negative) to +1.0 (Positive) and raw confidence. """ if not self.classifier: return {"score": 0.0, "confidence": 0.0, "label": "neutral"} clean_text = self.preprocess_text(text) if not clean_text: return {"score": 0.0, "confidence": 0.0, "label": "neutral"} # FinBERT labels: positive, negative, neutral try: result = self.classifier(clean_text)[0] label = result['label'].lower() confidence = result['score'] # Map to continuous score [-1, 1] if label == "positive": score = confidence elif label == "negative": score = -confidence else: score = 0.0 return { "score": score, "confidence": confidence, "label": label } except Exception as e: logger.error(f"Sentiment analysis failed: {e}") return {"score": 0.0, "confidence": 0.0, "label": "error"} def aggregate_stream_sentiment(self, text_stream: list[str]) -> float: """Calculate average sentiment from a batch of texts.""" if not text_stream: return 0.0 scores = [] for text in text_stream: res = self.analyze_sentiment(text) scores.append(res['score']) return sum(scores) / len(scores)