| | import logging |
| | import re |
| | from typing import Dict, Any |
| | from transformers import pipeline |
| | import torch |
| |
|
# Module-scoped logger, named after this module so log output can be filtered per module.
logger = logging.getLogger(name=__name__)
| |
|
class SentimentPipeline:
    """Score the sentiment of news headlines / tweets with a transformers pipeline.

    Wraps a ``transformers`` "sentiment-analysis" pipeline (FinBERT by default,
    as it is tuned for financial/market language) and maps its
    ``{label, score}`` output onto a signed score in ``[-1.0, +1.0]``.

    If the model fails to load, ``classifier`` is left as ``None`` and every
    analysis call degrades to a neutral result instead of raising.
    """

    # Compiled once at class creation instead of on every preprocess call.
    _URL_RE = re.compile(r'http\S+')
    _MENTION_RE = re.compile(r'@\w+')

    def __init__(self, model_name: str = "ProsusAI/finbert"):
        """
        Initialize NLP pipeline for sentiment analysis of news/tweets.

        Args:
            model_name: Model id handed to ``transformers.pipeline``. Defaults
                to FinBERT, which is tuned for financial/market sentiment.
        """
        self.model_name = model_name
        # Device selection: 0 = first CUDA GPU, "mps" = Apple Silicon,
        # -1 = CPU (transformers' pipeline convention). ``torch.backends.mps``
        # does not exist on older torch builds, hence the getattr guard.
        mps_backend = getattr(torch.backends, "mps", None)
        if torch.cuda.is_available():
            self.device = 0
        elif mps_backend is not None and mps_backend.is_available():
            self.device = "mps"
        else:
            self.device = -1
        logger.info("Loading NLP Pipeline '%s' on device '%s'...", model_name, self.device)

        try:
            self.classifier = pipeline(
                "sentiment-analysis",
                model=self.model_name,
                device=self.device,
            )
            logger.info("NLP Pipeline loaded successfully.")
        except Exception as e:
            # Deliberately best-effort: keep the object usable (neutral scores)
            # when the model cannot be loaded, but keep the traceback in the log.
            logger.exception("Failed to load NLP model: %s", e)
            self.classifier = None

    def preprocess_text(self, text: str) -> str:
        """Clean up social media artifacts.

        Removes ``http...`` URLs and ``@mentions``, then collapses every run of
        whitespace into a single space (which also trims both ends).

        Returns:
            The cleaned text, or ``""`` when *text* is ``None`` or empty.
        """
        if not text:
            return ""
        text = self._URL_RE.sub('', text)
        text = self._MENTION_RE.sub('', text)
        return ' '.join(text.split())

    def analyze_sentiment(self, text: str) -> Dict[str, Any]:
        """
        Analyze sentiment of a single text.

        Returns:
            A dict with keys:
              - ``score``: signed score from -1.0 (Negative) to +1.0 (Positive):
                +confidence for "positive", -confidence for "negative",
                0.0 for any other label.
              - ``confidence``: the classifier's raw ``score`` for ``label``.
              - ``label``: the lower-cased model label; ``"neutral"`` when the
                model is unavailable or the cleaned text is empty; ``"error"``
                when inference raised.
        """
        if self.classifier is None:
            return {"score": 0.0, "confidence": 0.0, "label": "neutral"}

        clean_text = self.preprocess_text(text)
        if not clean_text:
            return {"score": 0.0, "confidence": 0.0, "label": "neutral"}

        try:
            # truncation=True: BERT-style models have a fixed maximum sequence
            # length (512 tokens for FinBERT). Without it, long articles raise
            # inside the model and would be reported as "error" instead of
            # being scored on their leading tokens.
            result = self.classifier(clean_text, truncation=True)[0]
            label = result['label'].lower()
            confidence = result['score']
        except Exception as e:
            logger.error("Sentiment analysis failed: %s", e)
            return {"score": 0.0, "confidence": 0.0, "label": "error"}

        if label == "positive":
            score = confidence
        elif label == "negative":
            score = -confidence
        else:
            score = 0.0

        return {
            "score": score,
            "confidence": confidence,
            "label": label,
        }

    def aggregate_stream_sentiment(self, text_stream: list[str]) -> float:
        """Calculate average sentiment from a batch of texts.

        Texts whose analysis failed (label ``"error"``) are excluded from the
        average, so an inference failure is not counted as a neutral vote.
        Any iterable of strings (including a generator) is accepted.

        Returns:
            The mean ``score`` of the successfully analyzed texts, or 0.0 when
            there is nothing to average (``None``/empty input, or every text
            failed).
        """
        if not text_stream:
            return 0.0

        # Collect scores in a single pass so generators work too; checking
        # emptiness afterwards avoids dividing by zero for an empty iterator.
        scores = [
            res["score"]
            for res in map(self.analyze_sentiment, text_stream)
            if res["label"] != "error"
        ]
        if not scores:
            return 0.0
        return sum(scores) / len(scores)
| |
|