|
|
"""Thread length and sentiment correlation analysis.""" |
|
|
|
|
|
import logging |
|
|
from typing import List, Dict, Optional, Tuple |
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
from scipy import stats |
|
|
from sklearn.linear_model import LinearRegression |
|
|
|
|
|
from analysis.sentiment_analyzer import SentimentAnalyzer |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class ThreadAnalyzer: |
|
|
""" |
|
|
Analyze correlation between thread length and sentiment. |
|
|
|
|
|
Thread length is the number of comments under a news article. |
|
|
Temperature is the probability that a comment is negative. |
|
|
""" |
|
|
|
|
|
def __init__(self, sentiment_analyzer: Optional[SentimentAnalyzer] = None): |
|
|
""" |
|
|
Initialize thread analyzer. |
|
|
|
|
|
Args: |
|
|
sentiment_analyzer: SentimentAnalyzer instance (creates new if None) |
|
|
""" |
|
|
if sentiment_analyzer is None: |
|
|
self.analyzer = SentimentAnalyzer() |
|
|
else: |
|
|
self.analyzer = sentiment_analyzer |
|
|
|
|
|
def calculate_thread_lengths( |
|
|
self, |
|
|
data: List[Dict], |
|
|
news_id_key: str = "news_id", |
|
|
comment_id_key: str = "id" |
|
|
) -> Dict[str, int]: |
|
|
""" |
|
|
Calculate thread length (number of comments) for each news item. |
|
|
|
|
|
Args: |
|
|
data: List of comment dictionaries |
|
|
news_id_key: Key for news ID in data dict |
|
|
comment_id_key: Key for comment ID in data dict |
|
|
|
|
|
Returns: |
|
|
Dictionary mapping news_id to thread length |
|
|
|
|
|
Example: |
|
|
>>> analyzer = ThreadAnalyzer() |
|
|
>>> data = [ |
|
|
... {"news_id": 1, "id": 1}, |
|
|
... {"news_id": 1, "id": 2}, |
|
|
... {"news_id": 2, "id": 3}, |
|
|
... ] |
|
|
>>> lengths = analyzer.calculate_thread_lengths(data) |
|
|
>>> lengths[1] |
|
|
2 |
|
|
""" |
|
|
thread_lengths = {} |
|
|
|
|
|
for item in data: |
|
|
news_id = item.get(news_id_key) |
|
|
if news_id: |
|
|
thread_lengths[news_id] = thread_lengths.get(news_id, 0) + 1 |
|
|
|
|
|
return thread_lengths |
|
|
|
|
|
def calculate_temperature( |
|
|
self, |
|
|
data: List[Dict], |
|
|
news_id_key: str = "news_id", |
|
|
text_key: str = "text" |
|
|
) -> Dict[str, float]: |
|
|
""" |
|
|
Calculate temperature (negative sentiment probability) for each news item. |
|
|
|
|
|
Temperature is the probability that a comment is negative. |
|
|
|
|
|
Args: |
|
|
data: List of comment dictionaries |
|
|
news_id_key: Key for news ID in data dict |
|
|
text_key: Key for text in data dict |
|
|
|
|
|
Returns: |
|
|
Dictionary mapping news_id to average temperature |
|
|
""" |
|
|
|
|
|
news_comments = {} |
|
|
for item in data: |
|
|
news_id = item.get(news_id_key) |
|
|
text = item.get(text_key) |
|
|
if news_id and text: |
|
|
if news_id not in news_comments: |
|
|
news_comments[news_id] = [] |
|
|
news_comments[news_id].append(text) |
|
|
|
|
|
|
|
|
temperatures = {} |
|
|
|
|
|
for news_id, texts in news_comments.items(): |
|
|
|
|
|
results = self.analyzer.analyze_batch(texts) |
|
|
|
|
|
|
|
|
negative_scores = [] |
|
|
for result in results: |
|
|
label = result["label"] |
|
|
score = result["score"] |
|
|
|
|
|
if label == "NEGATIVE": |
|
|
|
|
|
negative_scores.append(score) |
|
|
elif label == "POSITIVE": |
|
|
|
|
|
negative_scores.append(1.0 - score) |
|
|
else: |
|
|
|
|
|
negative_scores.append(0.5) |
|
|
|
|
|
avg_temperature = np.mean(negative_scores) if negative_scores else 0.5 |
|
|
temperatures[news_id] = avg_temperature |
|
|
|
|
|
return temperatures |
|
|
|
|
|
def analyze_correlation( |
|
|
self, |
|
|
thread_lengths: Dict[str, int], |
|
|
temperatures: Dict[str, float] |
|
|
) -> Dict: |
|
|
""" |
|
|
Analyze correlation between thread length and temperature. |
|
|
|
|
|
Args: |
|
|
thread_lengths: Dictionary mapping news_id to thread length |
|
|
temperatures: Dictionary mapping news_id to temperature |
|
|
|
|
|
Returns: |
|
|
Dictionary with correlation statistics |
|
|
""" |
|
|
|
|
|
common_ids = set(thread_lengths.keys()) & set(temperatures.keys()) |
|
|
|
|
|
if len(common_ids) < 2: |
|
|
return { |
|
|
"correlation": 0.0, |
|
|
"p_value": 1.0, |
|
|
"significant": False, |
|
|
"sample_size": len(common_ids), |
|
|
"error": "Insufficient data for correlation analysis" |
|
|
} |
|
|
|
|
|
|
|
|
lengths = [thread_lengths[id] for id in common_ids] |
|
|
temps = [temperatures[id] for id in common_ids] |
|
|
|
|
|
|
|
|
correlation, p_value = stats.pearsonr(lengths, temps) |
|
|
|
|
|
|
|
|
X = np.array(lengths).reshape(-1, 1) |
|
|
y = np.array(temps) |
|
|
|
|
|
reg = LinearRegression() |
|
|
reg.fit(X, y) |
|
|
|
|
|
slope = reg.coef_[0] |
|
|
intercept = reg.intercept_ |
|
|
r_squared = reg.score(X, y) |
|
|
|
|
|
return { |
|
|
"correlation": float(correlation), |
|
|
"p_value": float(p_value), |
|
|
"significant": p_value < 0.05, |
|
|
"sample_size": len(common_ids), |
|
|
"slope": float(slope), |
|
|
"intercept": float(intercept), |
|
|
"r_squared": float(r_squared), |
|
|
"interpretation": self._interpret_correlation(correlation, p_value) |
|
|
} |
|
|
|
|
|
def _interpret_correlation(self, correlation: float, p_value: float) -> str: |
|
|
"""Interpret correlation results.""" |
|
|
if p_value >= 0.05: |
|
|
return "No significant correlation (p >= 0.05)" |
|
|
|
|
|
if abs(correlation) < 0.1: |
|
|
strength = "negligible" |
|
|
elif abs(correlation) < 0.3: |
|
|
strength = "weak" |
|
|
elif abs(correlation) < 0.5: |
|
|
strength = "moderate" |
|
|
elif abs(correlation) < 0.7: |
|
|
strength = "strong" |
|
|
else: |
|
|
strength = "very strong" |
|
|
|
|
|
direction = "positive" if correlation > 0 else "negative" |
|
|
|
|
|
return f"{strength.capitalize()} {direction} correlation (r={correlation:.3f}, p={p_value:.4f})" |
|
|
|
|
|
def analyze_from_dataframe( |
|
|
self, |
|
|
df: pd.DataFrame, |
|
|
news_id_col: str = "news_id", |
|
|
text_col: str = "text" |
|
|
) -> Tuple[pd.DataFrame, Dict]: |
|
|
""" |
|
|
Analyze thread-sentiment correlation from DataFrame. |
|
|
|
|
|
Args: |
|
|
df: DataFrame with news_id and text columns |
|
|
news_id_col: Name of news_id column |
|
|
text_col: Name of text column |
|
|
|
|
|
Returns: |
|
|
Tuple of (DataFrame with thread stats, correlation results) |
|
|
""" |
|
|
|
|
|
data = df[[news_id_col, text_col]].to_dict('records') |
|
|
|
|
|
|
|
|
thread_lengths = self.calculate_thread_lengths( |
|
|
data, |
|
|
news_id_key=news_id_col |
|
|
) |
|
|
temperatures = self.calculate_temperature( |
|
|
data, |
|
|
news_id_key=news_id_col, |
|
|
text_key=text_col |
|
|
) |
|
|
|
|
|
|
|
|
correlation_results = self.analyze_correlation(thread_lengths, temperatures) |
|
|
|
|
|
|
|
|
common_ids = set(thread_lengths.keys()) & set(temperatures.keys()) |
|
|
thread_stats = pd.DataFrame([ |
|
|
{ |
|
|
"news_id": news_id, |
|
|
"thread_length": thread_lengths[news_id], |
|
|
"temperature": temperatures[news_id] |
|
|
} |
|
|
for news_id in common_ids |
|
|
]) |
|
|
|
|
|
return thread_stats, correlation_results |
|
|
|
|
|
|
|
|
|
|
|
|