File size: 2,989 Bytes
77fd2f6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import logging
import re
from typing import Dict, Any
from transformers import pipeline
import torch

logger = logging.getLogger(__name__)

class SentimentPipeline:
    def __init__(self, model_name: str = "ProsusAI/finbert"):
        """
        Initialize NLP pipeline for sentiment analysis of news/tweets.
        Using FinBERT as it's tuned for financial/market sentiment.
        """
        self.model_name = model_name
        self.device = 0 if torch.cuda.is_available() else (
            "mps" if torch.backends.mps.is_available() else -1
        )
        logger.info(f"Loading NLP Pipeline '{model_name}' on device '{self.device}'...")
        
        try:
            self.classifier = pipeline(
                "sentiment-analysis", 
                model=self.model_name, 
                device=self.device
            )
            logger.info("NLP Pipeline loaded successfully.")
        except Exception as e:
            logger.error(f"Failed to load NLP model: {e}")
            self.classifier = None

    def preprocess_text(self, text: str) -> str:
        """Clean up social media artifacts."""
        # Remove URLs
        text = re.sub(r'http\S+', '', text)
        # Remove mentions
        text = re.sub(r'@\w+', '', text)
        # Tidy whitespace
        text = ' '.join(text.split())
        return text

    def analyze_sentiment(self, text: str) -> Dict[str, Any]:
        """
        Analyze sentiment of a single text.
        Returns score from -1.0 (Negative) to +1.0 (Positive) and raw confidence.
        """
        if not self.classifier:
            return {"score": 0.0, "confidence": 0.0, "label": "neutral"}
            
        clean_text = self.preprocess_text(text)
        if not clean_text:
             return {"score": 0.0, "confidence": 0.0, "label": "neutral"}
             
        # FinBERT labels: positive, negative, neutral
        try:
            result = self.classifier(clean_text)[0]
            label = result['label'].lower()
            confidence = result['score']
            
            # Map to continuous score [-1, 1]
            if label == "positive":
                score = confidence
            elif label == "negative":
                score = -confidence
            else:
                score = 0.0
                
            return {
                "score": score,
                "confidence": confidence,
                "label": label
            }
        except Exception as e:
            logger.error(f"Sentiment analysis failed: {e}")
            return {"score": 0.0, "confidence": 0.0, "label": "error"}

    def aggregate_stream_sentiment(self, text_stream: list[str]) -> float:
        """Calculate average sentiment from a batch of texts."""
        if not text_stream: return 0.0
        
        scores = []
        for text in text_stream:
            res = self.analyze_sentiment(text)
            scores.append(res['score'])
            
        return sum(scores) / len(scores)