# telegram-sentiment-analysis / analysis_core.py
from __future__ import annotations
import re
from collections import defaultdict
from typing import Any, Dict, List, Tuple
import numpy as np
import pandas as pd
import nltk
from nltk.sentiment import SentimentIntensityAnalyzer
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import matplotlib.pyplot as plt
import arabic_reshaper
from bidi.algorithm import get_display
# ===================== NLTK =====================
def _ensure_nltk() -> None:
    """Download the NLTK resources this module needs, if they are absent."""
    required = (
        ("tokenizers/punkt", "punkt"),
        ("sentiment/vader_lexicon.zip", "vader_lexicon"),
    )
    for resource_path, package_name in required:
        try:
            nltk.data.find(resource_path)
        except LookupError:
            nltk.download(package_name, quiet=True)
# Fetch required NLTK data at import time, then build one shared VADER
# analyzer used for scoring English messages.
_ensure_nltk()
SIA = SentimentIntensityAnalyzer()
# ===================== Persian sentiment model =====================
# ParsBERT binary sentiment model (DeepSentiPers); loaded once at import
# time, moved to GPU when available, and kept in eval mode since the
# module only runs inference.
MODEL_NAME = "HooshvareLab/bert-fa-base-uncased-sentiment-deepsentipers-binary"
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
TOKENIZER = AutoTokenizer.from_pretrained(MODEL_NAME)
MODEL = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME).to(DEVICE)
MODEL.eval()
# ===================== Small lex helpers (not used for final top words scale anymore) =====================
# Persian positive-sentiment lexicon: single tokens, multi-word phrases,
# and positive emoji. Matched against tokens from custom_tokenize() and
# (for multi-word entries) against the raw message text.
persian_positive = {
"خوب","عالی","عالیه","باحال","قشنگ","زیبا","خوشحال","شاد","مرسی","ممنون","ممنونم",
"دمت","دمتگرم","دمت گرم","ایول","آفرین","دوستت","دوستت_دارم","دوستتدارم","عزیزم",
"محشر","فوق‌العاده","فوق العاده","بی‌نظیر","بی نظیر","خفن","توپ","کارت درسته",
"درسته","حرفت حقه","حرفت حق","احسنت","دمت حسابی","دمت گرم حسابی",
"حال‌خوب","حال خوب","انرژی","مثبت","دلنشین","دوست‌داشتنی","دوست داشتنی",
"خوشم اومد","عاشقشم","عاشقتم","دوست دارم","دوستدارم","خیلی خوب","خیلی عالی",
"عالی بود","عالیه کارت","عالیه کار","عالیه ها","خوبه","خوب بود","خوبم","خوبی",
"سپاس","سپاسگزارم","تشکر","تشکر میکنم","تشکر می‌کنم","مرسیی","مرسییی","مرسی خیلی",
"لطف کردی","لطف دارین","زحمت کشیدی","زحمت کشیدین","قدردانم","مدیونم","دستت درد نکنه",
"دستت درد نکنه واقعا","مچکرم","ممنون ازت","ممنون از شما","خیلی ممنون","خیلی متشکرم",
"موفق باشی","موفق","پاینده","برقرار باشی","سربلند","سربلند باشی","پیروز","پیروز باشی",
"شاد باشی","خوشحال باشی","بهترین","بهترینه","بهترینی","عالی‌ترین","بی‌رقیب","درجه یک",
"درجه‌یک","تاپه","تاپ","خفن‌ترین","بترکونی","ترکوندی","ترکوند","کارت عالیه",
"دوست‌داشتنیه","دوست داشتنیه","دلبر","دلنشینه","قلبمی","قلب منی","جونمی","جونم",
"ناز","نازی","نازنین","ماه","ماهی","فرشته","فرشته‌ای","عشق","عشقم","عشقی",
"دوست‌داشتنی‌ترین","محبوب","محبوبم","مهربون","مهربونی","با مرامی","باحاله",
"حال میده","حال می‌ده","حال کردم","حال داد","کیف کردم","کیف داد","لذت بردم",
"لذت‌بخش","حال‌انگیز","روح‌نواز","آرامش","آرامش‌بخش","دلچسب",
"❤️","💖","💗","💙","💚","💛","💜","💕","💞","💓","💘","😍","🥰","😘",
"😊","😄","😁","🙂","😌","🤗","👍","👌","🙌","👏","✨","🔥","🌟","⭐","💯"
}
# Persian negative-sentiment lexicon: mirror of persian_positive for
# negative tokens, phrases, and emoji.
persian_negative = {
"بد","افتضاح","مزخرف","ناراحت","غمگین","خسته","داغون","اعصاب","کلافه","لعنت","مسخره",
"حالم_بده","حالمبده","حالم بده","حالم خوب نیست",
"خیلی بد","خیلی بده","بدجور","افتضاحه","افتضاح بود","مزخرفه","چرته","چرت","چرت و پرت",
"حالم گرفته","حال بد","حال بدی","حال ندارم","بی‌حال","بی حال","له شدم",
"عصبی","عصبانی","اعصاب‌خراب","اعصاب خراب","رو اعصاب","اعصابم خورده","کلافم",
"حرص","حرصم دراومد","حرصم گرفت","اعصاب خوردکن",
"غم","غمگینم","افسرده","افسردگی","دپرس","دلگیر","دل‌شکسته","دل شکسته","دلسرد",
"ناامید","ناامیدی","بی‌انگیزه","بی انگیزه","بی‌حوصله","بی حوصله",
"حالم بده","حال ندارم","حال ندارم اصلا","حالم خیلی بده","حال بدی دارم",
"حال افتضاح","حال خراب","حالم خرابه",
"چرند","مسخره‌بازی","مسخره بازی","چرت گفتن","چرت میگی","چرندیات",
"مزخرفات","الکی","بی‌معنی","بی معنی","احمقانه",
"اعصابم داغونه","کلافه‌ام","کلافه ام","رو مخ","رو مخمه","رو مخی",
"خستم","خیلی خسته","له شدم","بریدم","بریدم دیگه",
"داغونم","خراب","خرابه","ویران","بدبخت","بدبختی","بیچاره",
"لعنتی","لعنت به","کوفت","کوفت و زهرمار","نفرتم","حالم ازش بهم می‌خوره",
"😡","😠","🤬","😞","😔","😢","😭","☹️","🙁","😩","😫","😣",
"💔","🖤","🥀","💢","👎","🚫","⛔"
}
# Short phrase lexicons: substring-matched against the raw text and given
# double weight in persian_lexicon_score / top_words_weighted_by_sentiment.
persian_positive_phrases = {"دوستت دارم", "خیلی دوستت دارم", "دمت گرم", "آفرین"}
persian_negative_phrases = {"حالم بده", "خیلی بده", "اعصابم خورد"}
# Stopwords filtered out of token-level scoring and "top word" extraction.
persian_stopwords = {
"و","در","به","از","که","این","اون","آن","من","تو","ما","شما","او","هم","یا","اما","اگر",
"برای","با","روی","تا","نه","را","همه","چی","چیه","چرا","کجا","کی","چه","یه","یک","بود","هست",
"می","میشه","کرد","کردم","کن","کردی","کردن","دارم","داری","داره","داشت","شد","شده"
}
english_stopwords = {
"the","a","an","and","or","but","if","then","else","to","of","in","on","at","for","with",
"is","am","are","was","were","be","been","being","i","you","he","she","we","they",
"this","that","these","those","it","as","by","from","not","no","yes","do","does","did"
}
# Combined stopword set used by tokenize-and-filter passes.
stopwords_all = persian_stopwords.union(english_stopwords)
# ===================== Telegram parsing =====================
def extract_text(msg_text: Any) -> str:
    """Flatten a Telegram message `text` field into a plain string.

    Telegram exports store text either as a str or as a list mixing raw
    strings and {"type": ..., "text": ...} entity dicts. Anything else
    (including None) yields "".
    """
    if msg_text is None:
        return ""
    if isinstance(msg_text, str):
        return msg_text
    if not isinstance(msg_text, list):
        return ""
    pieces = []
    for item in msg_text:
        if isinstance(item, str):
            pieces.append(item)
        elif isinstance(item, dict) and isinstance(item.get("text"), str):
            pieces.append(item["text"])
    return "".join(pieces)
def extract_chats(data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the list of chat dicts from a Telegram export structure.

    Accepts either a full result.json ({"chats": {"list": [...]}}) or a
    single-chat export ({"messages": [...]}). Raises ValueError otherwise.
    """
    if isinstance(data, dict):
        chats = data.get("chats")
        if isinstance(chats, dict) and "list" in chats:
            chat_list = chats["list"]
            if isinstance(chat_list, list):
                return chat_list
        if isinstance(data.get("messages"), list):
            return [data]
    raise ValueError("JSON format not recognized. Need Telegram export result.json with chats.list.")
def get_chat_name(chat: Dict[str, Any], fallback: str) -> str:
    """Return a clean display name for a chat, or `fallback` if none is usable.

    Prefers the truthy value of `name` over `title`; whitespace-only or
    non-string values fall back.
    """
    raw = chat.get("name") or chat.get("title")
    if not isinstance(raw, str):
        return fallback
    cleaned = raw.strip()
    return cleaned if cleaned else fallback
def build_df(chat: Dict[str, Any]) -> pd.DataFrame:
    """Build a chronological DataFrame of (date_raw, text, date) from one chat.

    Messages with empty text or a missing/non-string date are dropped.
    Dates are parsed as UTC then made naive; unparseable dates are dropped.
    Returns an empty frame when no usable messages exist.
    """
    records: List[Dict[str, Any]] = []
    for message in chat.get("messages", []):
        if not isinstance(message, dict):
            continue
        body = extract_text(message.get("text", "")).strip()
        if not body:
            continue
        stamp = message.get("date")
        if not isinstance(stamp, str) or not stamp:
            continue
        records.append({"date_raw": stamp, "text": body})
    frame = pd.DataFrame(records)
    if frame.empty:
        return frame
    frame["date"] = pd.to_datetime(frame["date_raw"], errors="coerce", utc=True).dt.tz_convert(None)
    return frame.dropna(subset=["date"]).sort_values("date").reset_index(drop=True)
# ===================== Tokenize + Persian detect =====================
# Any code point in the Arabic Unicode block marks the text as Persian.
_FA_RE = re.compile(r"[\u0600-\u06FF]")

def is_persian(text: str) -> bool:
    """Return True when `text` contains at least one Arabic-block character."""
    if not text:
        return False
    return _FA_RE.search(text) is not None
def custom_tokenize(text: str) -> List[str]:
    """Tokenize mixed Persian/English text.

    Strips URLs and zero-width non-joiners, keeps runs of word characters
    (Latin and Arabic-block), and canonicalizes two fused Persian forms so
    they match the lexicon entries.
    """
    cleaned = re.sub(r"http\S+|www\.\S+", " ", str(text)).replace("\u200c", " ")
    raw_tokens = re.findall(r"[\w\u0600-\u06FF]+", cleaned)
    return [
        tok.replace("دوستتدارم", "دوستت_دارم").replace("حالمبده", "حالم_بده")
        for tok in raw_tokens
    ]
# ===================== Sentiment scoring =====================
def score_en_vader(text: str) -> float:
    """Return the VADER compound sentiment score in [-1, 1] for English text."""
    polarity = SIA.polarity_scores(text)
    return float(polarity["compound"])
@torch.inference_mode()
def score_fa_bert_batch(texts: List[str], batch_size: int = 16) -> List[float]:
    """Score Persian texts with the ParsBERT binary sentiment model.

    Returns one value per text: P(positive) - P(negative), clamped to
    [-1, 1]. Work is done in mini-batches of `batch_size` to bound memory.
    """
    results: List[float] = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        encoded = TOKENIZER(
            batch, return_tensors="pt", truncation=True, padding=True, max_length=256
        ).to(DEVICE)
        logits = MODEL(**encoded).logits
        probs = torch.softmax(logits, dim=-1)
        raw = (probs[:, 1] - probs[:, 0]).detach().cpu().numpy().astype(float).tolist()
        results.extend(float(min(1.0, max(-1.0, value))) for value in raw)
    return results
def persian_lexicon_score(text: str) -> float:
    """Lexicon-based Persian sentiment score in [-1, 1].

    Counts positive/negative lexicon tokens (weight 1) and phrase matches
    against the ZWNJ-flattened text (weight 2), then returns the
    normalized difference (pos - neg) / (pos + neg); 0 when nothing hits.
    """
    pos_hits = 0
    neg_hits = 0
    for token in custom_tokenize(text):
        if token in persian_positive:
            pos_hits += 1
        elif token in persian_negative:
            neg_hits += 1
    flattened = str(text).replace("\u200c", " ")
    pos_hits += 2 * sum(1 for phrase in persian_positive_phrases if phrase in flattened)
    neg_hits += 2 * sum(1 for phrase in persian_negative_phrases if phrase in flattened)
    total = pos_hits + neg_hits
    return float((pos_hits - neg_hits) / max(1, total))
def compute_sentiments(df: pd.DataFrame, max_bert_persian: int = 500, bert_batch_size: int = 16) -> Tuple[pd.DataFrame, int]:
    """Score every message; return (scored copy of df, #Persian msgs scored by BERT).

    English messages are scored with VADER. Persian messages first get a
    lexicon score as a fallback, and the first `max_bert_persian` of them
    are then re-scored with the ParsBERT model. All scores end up in the
    `sentiment_final` column, clipped to [-1, 1].
    """
    if df.empty:
        return df, 0
    df = df.copy()
    df["sentiment_final"] = 0.0
    pers_mask = df["text"].astype(str).apply(is_persian)
    pers_idx = df.index[pers_mask].tolist()
    en_idx = df.index[~pers_mask].tolist()
    if en_idx:
        df.loc[en_idx, "sentiment_final"] = df.loc[en_idx, "text"].astype(str).apply(score_en_vader)
    # BUG FIX: `bert_idx` was previously assigned only inside the Persian
    # branch, so a chat with zero Persian messages raised NameError at the
    # final `len(bert_idx)`. Initialize it up front.
    bert_idx: List[Any] = []
    if pers_idx:
        # Lexicon scores first so Persian messages beyond the BERT budget
        # still get a sentiment value.
        df.loc[pers_idx, "sentiment_final"] = df.loc[pers_idx, "text"].astype(str).apply(persian_lexicon_score)
        bert_idx = pers_idx[: max(0, int(max_bert_persian))]
        if bert_idx:
            texts = df.loc[bert_idx, "text"].astype(str).tolist()
            scores = score_fa_bert_batch(texts, batch_size=int(bert_batch_size))
            df.loc[bert_idx, "sentiment_final"] = scores
    df["sentiment_final"] = df["sentiment_final"].fillna(0.0).clip(-1.0, 1.0)
    return df, len(bert_idx)
# ===================== Weekly aggregation + extremes =====================
def weekly_series(df: pd.DataFrame) -> pd.Series:
    """Mean sentiment per calendar week; weeks with no messages are dropped."""
    by_date = df.set_index("date")
    weekly_mean = by_date.resample("W")["sentiment_final"].mean()
    return weekly_mean.dropna()
def message_in_week(df: pd.DataFrame, week_end: pd.Timestamp, mode: str):
    """Return the row with the extreme sentiment in the 7 days ending at `week_end`.

    `mode == "max"` selects the most positive message; any other value
    selects the most negative. Returns None when the window is empty.
    """
    window_start = week_end - pd.Timedelta(days=7)
    in_window = df[(df["date"] > window_start) & (df["date"] <= week_end)]
    if in_window.empty:
        return None
    if mode == "max":
        extreme = in_window["sentiment_final"].idxmax()
    else:
        extreme = in_window["sentiment_final"].idxmin()
    return in_window.loc[extreme]
def extract_lex_words_from_text(text: str, polarity: str, min_words: int = 4) -> List[str]:
    """Pull highlight words from a message: lexicon hits first, filler after.

    Tokens of length <= 1 and stopwords are ignored. Lexicon matches for
    the requested polarity ("pos"/"neg") come first (duplicates kept);
    if fewer than `min_words` are found, other tokens from the message
    fill the remaining slots in order. At most max(min_words, 4) words
    are returned.
    """
    candidates = [t for t in custom_tokenize(text) if len(t) > 1 and t not in stopwords_all]
    picked: List[str] = []
    for tok in candidates:
        if polarity == "pos" and tok in persian_positive:
            picked.append(tok)
        if polarity == "neg" and tok in persian_negative:
            picked.append(tok)
    if len(picked) < min_words:
        for tok in candidates:
            if tok in picked:
                continue
            picked.append(tok)
            if len(picked) >= min_words:
                break
    return picked[:max(min_words, 4)]
# ===================== Plot helpers =====================
def _shape_fa(s: str) -> str:
try:
return get_display(arabic_reshaper.reshape(str(s)))
except Exception:
return str(s)
def make_weekly_plot(df: pd.DataFrame, chat_name: str):
    """Plot weekly average sentiment and annotate the peak and low weeks.

    Expects `df` with `date` and `sentiment_final` columns (as produced by
    build_df + compute_sentiments). Returns (Figure, info dict); info holds
    the peak/low week-end dates and the highlight words pulled from the
    most extreme message of each week.
    """
    ws = weekly_series(df)
    info: Dict[str, Any] = {
        "peak_week_end": None,
        "low_week_end": None,
        "peak_words": [],
        "low_words": [],
        "peak_word_main": None,
        "low_word_main": None,
    }
    fig, ax = plt.subplots(figsize=(22, 8))
    ax.set_title(_shape_fa(f"Emotion Trajectory in Chat: {chat_name}"))
    ax.set_xlabel("Time (weeks)")
    ax.set_ylabel("Average sentiment score")
    ax.grid(True)
    ax.axhline(0, linestyle="--")
    if ws.empty:
        # No weekly data: return the labeled but empty axes.
        fig.tight_layout()
        return fig, info
    x = ws.index
    y = ws.values.astype(float)
    y_min = float(np.min(y))
    y_max = float(np.max(y))
    # Pad the y-limits (more headroom on top) so the annotations below fit
    # inside the axes; 1e-9 floor avoids a zero range when all values match.
    yr = max(1e-9, (y_max - y_min))
    top_pad = 0.30 * yr
    bot_pad = 0.20 * yr
    ax.plot(x, y, color="red", linewidth=2, marker="o", markersize=4)
    ax.set_ylim(y_min - bot_pad, y_max + top_pad)
    ax.margins(x=0.03)
    peak_week = ws.idxmax()
    low_week = ws.idxmin()
    info["peak_week_end"] = peak_week.isoformat()
    info["low_week_end"] = low_week.isoformat()
    # Highlight words come from the single most extreme message within each
    # extreme week (may be None when the window has no messages).
    peak_msg = message_in_week(df, peak_week, "max")
    low_msg = message_in_week(df, low_week, "min")
    peak_words = extract_lex_words_from_text(str(peak_msg["text"]) if peak_msg is not None else "", "pos", 4)
    low_words = extract_lex_words_from_text(str(low_msg["text"]) if low_msg is not None else "", "neg", 4)
    info["peak_words"] = peak_words
    info["low_words"] = low_words
    info["peak_word_main"] = peak_words[0] if peak_words else None
    info["low_word_main"] = low_words[0] if low_words else None
    peak_y = float(ws.loc[peak_week])
    low_y = float(ws.loc[low_week])
    # Annotation labels sit 18% of the y-range above/below their markers.
    peak_text_y = peak_y + 0.18 * yr
    low_text_y = low_y - 0.18 * yr
    ax.scatter([peak_week], [peak_y], zorder=3)
    ax.annotate(
        _shape_fa("، ".join(peak_words) + f" (peak={peak_y:.3f})"),
        xy=(peak_week, peak_y),
        xytext=(peak_week, peak_text_y),
        arrowprops=dict(arrowstyle="->"),
        ha="center",
        fontsize=10,
        clip_on=False,
    )
    ax.scatter([low_week], [low_y], zorder=3)
    ax.annotate(
        _shape_fa("، ".join(low_words) + f" (low={low_y:.3f})"),
        xy=(low_week, low_y),
        xytext=(low_week, low_text_y),
        arrowprops=dict(arrowstyle="->"),
        ha="center",
        fontsize=10,
        clip_on=False,
    )
    fig.subplots_adjust(top=0.90)
    fig.tight_layout()
    return fig, info
# ===================== Top words (normalized to [-1, +1]) =====================
def top_words_weighted_by_sentiment(df: pd.DataFrame, top_n: int = 5) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Rank sentiment-bearing lexicon words/phrases by accumulated evidence.

    Only tokens that appear in the Persian sentiment lexicons contribute
    (phrases are weighted double), so neutral words like 'خیلی' or 'کار'
    never show up. Positive scores are normalized to (0, 1], negative to
    [-1, 0). Returns (top positive list, top negative list) of
    {"word": ..., "score": ...} dicts.
    """
    pos_weight: Dict[str, float] = defaultdict(float)
    neg_weight: Dict[str, float] = defaultdict(float)
    pos_phrases = list(persian_positive_phrases) if isinstance(persian_positive_phrases, (set, list, tuple)) else []
    neg_phrases = list(persian_negative_phrases) if isinstance(persian_negative_phrases, (set, list, tuple)) else []
    for text, score in zip(df["text"].astype(str).tolist(), df["sentiment_final"].astype(float).tolist()):
        if abs(score) < 1e-9:
            continue  # neutral messages carry no evidence
        flattened = str(text).replace("\u200c", " ")
        # Phrase-level evidence counts double: phrases are stronger signals.
        if score > 0:
            for phrase in pos_phrases:
                if phrase and phrase in flattened:
                    pos_weight[phrase] += 2.0 * float(score)
        elif score < 0:
            for phrase in neg_phrases:
                if phrase and phrase in flattened:
                    neg_weight[phrase] += 2.0 * float(abs(score))
        # Token-level evidence: lexicon members only.
        for token in custom_tokenize(text):
            if len(token) <= 1 or token in stopwords_all:
                continue
            if score > 0 and token in persian_positive:
                pos_weight[token] += float(score)
            elif score < 0 and token in persian_negative:
                neg_weight[token] += float(abs(score))

    def _ranked(weights: Dict[str, float], sign: float) -> List[Dict[str, Any]]:
        # Normalize by the peak weight; empty when nothing meaningful was found.
        if not weights:
            return []
        peak = max(weights.values(), default=0.0)
        if peak <= 1e-12:
            return []
        best = sorted(weights.items(), key=lambda kv: kv[1], reverse=True)[:top_n]
        entries: List[Dict[str, Any]] = []
        for word, weight in best:
            value = sign * (weight / peak)
            if sign > 0:
                value = max(0.0, min(1.0, value))
            else:
                value = min(0.0, max(-1.0, value))
            entries.append({"word": word, "score": float(value)})
        return entries

    return _ranked(pos_weight, 1.0), _ranked(neg_weight, -1.0)
# ===================== Main entry =====================
def analyze_chat(
    chat: Dict[str, Any],
    max_bert_persian: int = 500
) -> Tuple[Dict[str, Any], Any, List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Run the full pipeline for one chat: parse, score, plot, rank words.

    Returns (summary dict, matplotlib figure, top positive lexicon words,
    top negative lexicon words). A chat with no usable messages yields a
    zeroed summary and an empty titled figure.
    """
    frame = build_df(chat)
    chat_name = get_chat_name(chat, "Selected chat")
    if frame.empty:
        summary = {
            "chat_name": chat_name,
            "message_count": 0,
            "bert_used_on_persian_messages": 0,
            "overall_avg_sentiment": 0.0,
            "peak_word_main": None,
            "low_word_main": None,
            "peak_words": [],
            "low_words": [],
            "weekly": [],
            "top5_positive_lex": [],
            "top5_negative_lex": [],
        }
        fig, ax = plt.subplots(figsize=(22, 8))
        ax.set_title(_shape_fa(f"Emotion Trajectory in Chat: {chat_name}"))
        fig.tight_layout()
        return summary, fig, [], []
    frame, bert_count = compute_sentiments(frame, max_bert_persian=max_bert_persian, bert_batch_size=16)
    weekly = weekly_series(frame)
    weekly_records = [
        {"week_end": week_end.isoformat(), "avg_sentiment": float(avg)}
        for week_end, avg in weekly.items()
    ]
    fig, info = make_weekly_plot(frame, chat_name)
    pos_top, neg_top = top_words_weighted_by_sentiment(frame, top_n=5)
    summary = {
        "chat_name": chat_name,
        "message_count": int(len(frame)),
        "bert_used_on_persian_messages": int(bert_count),
        "overall_avg_sentiment": float(frame["sentiment_final"].mean()),
        "weekly": weekly_records,
        "peak_week_end": info["peak_week_end"],
        "low_week_end": info["low_week_end"],
        "peak_word_main": info["peak_word_main"],
        "low_word_main": info["low_word_main"],
        "peak_words": info["peak_words"],
        "low_words": info["low_words"],
        "top5_positive_lex": pos_top,
        "top5_negative_lex": neg_top,
    }
    return summary, fig, pos_top, neg_top