# NOTE: removed non-Python extraction artifacts (byte-size header, git-blame hashes, line-number gutter).
from __future__ import annotations
import re
from collections import defaultdict
from typing import Any, Dict, List, Tuple
import numpy as np
import pandas as pd
import nltk
from nltk.sentiment import SentimentIntensityAnalyzer
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import matplotlib.pyplot as plt
import arabic_reshaper
from bidi.algorithm import get_display
# ===================== NLTK =====================
def _ensure_nltk() -> None:
    """Download the NLTK resources this module needs, if they are missing."""
    required = (
        ("tokenizers/punkt", "punkt"),
        ("sentiment/vader_lexicon.zip", "vader_lexicon"),
    )
    for resource_path, package_name in required:
        try:
            nltk.data.find(resource_path)
        except LookupError:
            nltk.download(package_name, quiet=True)
# Fetch required NLTK data (punkt, vader_lexicon) once at import time.
_ensure_nltk()
# Shared VADER analyzer used for scoring English messages.
SIA = SentimentIntensityAnalyzer()
# ===================== Persian sentiment model =====================
# Binary (negative/positive) Persian sentiment classifier.
MODEL_NAME = "HooshvareLab/bert-fa-base-uncased-sentiment-deepsentipers-binary"
# Prefer GPU when available; batched inputs are moved to this device.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
TOKENIZER = AutoTokenizer.from_pretrained(MODEL_NAME)
MODEL = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME).to(DEVICE)
# Inference only — disables dropout/batch-norm training behavior.
MODEL.eval()
# ===================== Small lex helpers (not used for final top words scale anymore) =====================
# Positive Persian lexicon: single words, multi-word phrases, and emojis.
# NOTE(review): multi-word entries can only match via substring phrase checks
# (persian_lexicon_score), since custom_tokenize splits on word boundaries.
persian_positive = {
    "خوب","عالی","عالیه","باحال","قشنگ","زیبا","خوشحال","شاد","مرسی","ممنون","ممنونم",
    "دمت","دمتگرم","دمت گرم","ایول","آفرین","دوستت","دوستت_دارم","دوستتدارم","عزیزم",
    "محشر","فوقالعاده","فوق العاده","بینظیر","بی نظیر","خفن","توپ","کارت درسته",
    "درسته","حرفت حقه","حرفت حق","احسنت","دمت حسابی","دمت گرم حسابی",
    "حالخوب","حال خوب","انرژی","مثبت","دلنشین","دوستداشتنی","دوست داشتنی",
    "خوشم اومد","عاشقشم","عاشقتم","دوست دارم","دوستدارم","خیلی خوب","خیلی عالی",
    "عالی بود","عالیه کارت","عالیه کار","عالیه ها","خوبه","خوب بود","خوبم","خوبی",
    "سپاس","سپاسگزارم","تشکر","تشکر میکنم","تشکر میکنم","مرسیی","مرسییی","مرسی خیلی",
    "لطف کردی","لطف دارین","زحمت کشیدی","زحمت کشیدین","قدردانم","مدیونم","دستت درد نکنه",
    "دستت درد نکنه واقعا","مچکرم","ممنون ازت","ممنون از شما","خیلی ممنون","خیلی متشکرم",
    "موفق باشی","موفق","پاینده","برقرار باشی","سربلند","سربلند باشی","پیروز","پیروز باشی",
    "شاد باشی","خوشحال باشی","بهترین","بهترینه","بهترینی","عالیترین","بیرقیب","درجه یک",
    "درجهیک","تاپه","تاپ","خفنترین","بترکونی","ترکوندی","ترکوند","کارت عالیه",
    "دوستداشتنیه","دوست داشتنیه","دلبر","دلنشینه","قلبمی","قلب منی","جونمی","جونم",
    "ناز","نازی","نازنین","ماه","ماهی","فرشته","فرشتهای","عشق","عشقم","عشقی",
    "دوستداشتنیترین","محبوب","محبوبم","مهربون","مهربونی","با مرامی","باحاله",
    "حال میده","حال میده","حال کردم","حال داد","کیف کردم","کیف داد","لذت بردم",
    "لذتبخش","حالانگیز","روحنواز","آرامش","آرامشبخش","دلچسب",
    "❤️","💖","💗","💙","💚","💛","💜","💕","💞","💓","💘","😍","🥰","😘",
    "😊","😄","😁","🙂","😌","🤗","👍","👌","🙌","👏","✨","🔥","🌟","⭐","💯"
}
# Negative Persian lexicon: same structure as persian_positive.
persian_negative = {
    "بد","افتضاح","مزخرف","ناراحت","غمگین","خسته","داغون","اعصاب","کلافه","لعنت","مسخره",
    "حالم_بده","حالمبده","حالم بده","حالم خوب نیست",
    "خیلی بد","خیلی بده","بدجور","افتضاحه","افتضاح بود","مزخرفه","چرته","چرت","چرت و پرت",
    "حالم گرفته","حال بد","حال بدی","حال ندارم","بیحال","بی حال","له شدم",
    "عصبی","عصبانی","اعصابخراب","اعصاب خراب","رو اعصاب","اعصابم خورده","کلافم",
    "حرص","حرصم دراومد","حرصم گرفت","اعصاب خوردکن",
    "غم","غمگینم","افسرده","افسردگی","دپرس","دلگیر","دلشکسته","دل شکسته","دلسرد",
    "ناامید","ناامیدی","بیانگیزه","بی انگیزه","بیحوصله","بی حوصله",
    "حالم بده","حال ندارم","حال ندارم اصلا","حالم خیلی بده","حال بدی دارم",
    "حال افتضاح","حال خراب","حالم خرابه",
    "چرند","مسخرهبازی","مسخره بازی","چرت گفتن","چرت میگی","چرندیات",
    "مزخرفات","الکی","بیمعنی","بی معنی","احمقانه",
    "اعصابم داغونه","کلافهام","کلافه ام","رو مخ","رو مخمه","رو مخی",
    "خستم","خیلی خسته","له شدم","بریدم","بریدم دیگه",
    "داغونم","خراب","خرابه","ویران","بدبخت","بدبختی","بیچاره",
    "لعنتی","لعنت به","کوفت","کوفت و زهرمار","نفرتم","حالم ازش بهم میخوره",
    "😡","😠","🤬","😞","😔","😢","😭","☹️","🙁","😩","😫","😣",
    "💔","🖤","🥀","💢","👎","🚫","⛔"
}
# High-confidence multi-word phrases, weighted double in lexicon scoring.
persian_positive_phrases = {"دوستت دارم", "خیلی دوستت دارم", "دمت گرم", "آفرین"}
persian_negative_phrases = {"حالم بده", "خیلی بده", "اعصابم خورد"}
# Stopwords excluded from the "top words" summaries (not from scoring).
persian_stopwords = {
    "و","در","به","از","که","این","اون","آن","من","تو","ما","شما","او","هم","یا","اما","اگر",
    "برای","با","روی","تا","نه","را","همه","چی","چیه","چرا","کجا","کی","چه","یه","یک","بود","هست",
    "می","میشه","کرد","کردم","کن","کردی","کردن","دارم","داری","داره","داشت","شد","شده"
}
english_stopwords = {
    "the","a","an","and","or","but","if","then","else","to","of","in","on","at","for","with",
    "is","am","are","was","were","be","been","being","i","you","he","she","we","they",
    "this","that","these","those","it","as","by","from","not","no","yes","do","does","did"
}
# Combined stopword set used by tokenized-word filters below.
stopwords_all = persian_stopwords.union(english_stopwords)
# ===================== Telegram parsing =====================
def extract_text(msg_text: Any) -> str:
    """Flatten a Telegram message ``text`` field into a plain string.

    Telegram exports store text as a string, a list of strings and entity
    dicts (each with a ``text`` key), or ``None``. Anything unrecognized
    yields an empty string.
    """
    if isinstance(msg_text, str):
        return msg_text
    if isinstance(msg_text, list):
        pieces: List[str] = []
        for item in msg_text:
            if isinstance(item, str):
                pieces.append(item)
            elif isinstance(item, dict) and isinstance(item.get("text"), str):
                pieces.append(item["text"])
        return "".join(pieces)
    # Covers None and any other unexpected type.
    return ""
def extract_chats(data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the list of chat dicts from a Telegram ``result.json`` payload.

    Accepts either a full export (``data["chats"]["list"]``) or a single-chat
    export (top-level ``messages`` list). Raises ValueError otherwise.
    """
    if isinstance(data, dict):
        chats = data.get("chats")
        if isinstance(chats, dict) and "list" in chats:
            chat_list = chats["list"]
            if isinstance(chat_list, list):
                return chat_list
        # Single-chat export: the payload itself is the chat.
        if isinstance(data.get("messages"), list):
            return [data]
    raise ValueError("JSON format not recognized. Need Telegram export result.json with chats.list.")
def get_chat_name(chat: Dict[str, Any], fallback: str) -> str:
    """Display name for *chat*: ``name`` preferred over ``title``, else *fallback*."""
    raw = chat.get("name") or chat.get("title")
    if isinstance(raw, str):
        stripped = raw.strip()
        if stripped:
            return stripped
    return fallback
def build_df(chat: Dict[str, Any]) -> pd.DataFrame:
    """Build a message DataFrame (columns: date_raw, text, date) from one chat.

    Messages without usable text or a string date are skipped. The ``date``
    column is parsed as UTC and then made tz-naive; rows that fail to parse
    are dropped, and the result is sorted chronologically.
    """
    records: List[Dict[str, Any]] = []
    for message in chat.get("messages", []):
        if not isinstance(message, dict):
            continue
        body = extract_text(message.get("text", "")).strip()
        if not body:
            continue
        stamp = message.get("date")
        if not isinstance(stamp, str) or not stamp:
            continue
        records.append({"date_raw": stamp, "text": body})
    frame = pd.DataFrame(records)
    if frame.empty:
        return frame
    frame["date"] = pd.to_datetime(frame["date_raw"], errors="coerce", utc=True).dt.tz_convert(None)
    return frame.dropna(subset=["date"]).sort_values("date").reset_index(drop=True)
# ===================== Tokenize + Persian detect =====================
# Matches any character in the Arabic/Persian Unicode block.
_FA_RE = re.compile(r"[\u0600-\u06FF]")
def is_persian(text: str) -> bool:
    """True when *text* contains at least one Persian/Arabic-script character."""
    return _FA_RE.search(text or "") is not None
def custom_tokenize(text: str) -> List[str]:
    """Tokenize mixed Persian/English text.

    URLs are removed, the zero-width non-joiner is treated as a space, and
    tokens are runs of word characters plus Persian-block characters. Two
    glued phrases are canonicalized to their underscore lexicon forms.
    """
    cleaned = re.sub(r"http\S+|www\.\S+", " ", str(text))
    cleaned = cleaned.replace("\u200c", " ")
    return [
        token.replace("دوستتدارم", "دوستت_دارم").replace("حالمبده", "حالم_بده")
        for token in re.findall(r"[\w\u0600-\u06FF]+", cleaned)
    ]
# ===================== Sentiment scoring =====================
def score_en_vader(text: str) -> float:
    """VADER compound polarity of an English *text*, as a float in [-1, 1]."""
    compound = SIA.polarity_scores(text)["compound"]
    return float(compound)
@torch.inference_mode()
def score_fa_bert_batch(texts: List[str], batch_size: int = 16) -> List[float]:
    """Score Persian *texts* with the binary sentiment BERT model.

    Each score is P(positive) - P(negative) from the softmaxed logits,
    clamped to [-1, 1]. Texts are processed in batches of *batch_size*.
    """
    results: List[float] = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        encoded = TOKENIZER(batch, return_tensors="pt", truncation=True, padding=True, max_length=256).to(DEVICE)
        logits = MODEL(**encoded).logits
        probs = torch.softmax(logits, dim=-1)
        raw = (probs[:, 1] - probs[:, 0]).detach().cpu().numpy().astype(float).tolist()
        results.extend(min(1.0, max(-1.0, float(v))) for v in raw)
    return results
def persian_lexicon_score(text: str) -> float:
    """Lexicon-based Persian sentiment score in [-1, 1].

    Counts positive/negative token hits, adds double-weighted hits for
    multi-word phrases (matched as substrings after ZWNJ normalization),
    and returns (pos - neg) / (pos + neg), or 0.0 with no hits.
    """
    pos_hits = 0
    neg_hits = 0
    for token in custom_tokenize(text):
        if token in persian_positive:
            pos_hits += 1
        elif token in persian_negative:
            neg_hits += 1
    normalized = str(text).replace("\u200c", " ")
    pos_hits += 2 * sum(1 for phrase in persian_positive_phrases if phrase in normalized)
    neg_hits += 2 * sum(1 for phrase in persian_negative_phrases if phrase in normalized)
    total = pos_hits + neg_hits
    return float((pos_hits - neg_hits) / max(1, total))
def compute_sentiments(df: pd.DataFrame, max_bert_persian: int = 500, bert_batch_size: int = 16) -> Tuple[pd.DataFrame, int]:
    """Attach a ``sentiment_final`` column in [-1, 1] to a copy of *df*.

    English messages are scored with VADER. Persian messages first get a
    lexicon score; the first *max_bert_persian* of them are then re-scored
    with the BERT model (overriding the lexicon estimate).

    Returns:
        (scored DataFrame copy, number of messages scored with BERT)
    """
    if df.empty:
        return df, 0
    df = df.copy()
    df["sentiment_final"] = 0.0
    pers_mask = df["text"].astype(str).apply(is_persian)
    pers_idx = df.index[pers_mask].tolist()
    en_idx = df.index[~pers_mask].tolist()
    # Bug fix: bert_idx must exist even when there are no Persian messages,
    # otherwise `len(bert_idx)` at the end raised NameError.
    bert_idx: List[int] = []
    if en_idx:
        df.loc[en_idx, "sentiment_final"] = df.loc[en_idx, "text"].astype(str).apply(score_en_vader)
    if pers_idx:
        df.loc[pers_idx, "sentiment_final"] = df.loc[pers_idx, "text"].astype(str).apply(persian_lexicon_score)
        bert_idx = pers_idx[: max(0, int(max_bert_persian))]
        if bert_idx:
            texts = df.loc[bert_idx, "text"].astype(str).tolist()
            scores = score_fa_bert_batch(texts, batch_size=int(bert_batch_size))
            # BERT scores override the lexicon estimate for these rows.
            df.loc[bert_idx, "sentiment_final"] = scores
    df["sentiment_final"] = df["sentiment_final"].fillna(0.0)
    df["sentiment_final"] = df["sentiment_final"].clip(-1.0, 1.0)
    return df, len(bert_idx)
# ===================== Weekly aggregation + extremes =====================
def weekly_series(df: pd.DataFrame) -> pd.Series:
    """Weekly mean of ``sentiment_final``, indexed by week-end date.

    Weeks with no messages are dropped rather than reported as NaN.
    """
    indexed = df.set_index("date")
    weekly_mean = indexed["sentiment_final"].resample("W").mean()
    return weekly_mean.dropna()
def message_in_week(df: pd.DataFrame, week_end: pd.Timestamp, mode: str):
    """Row with the extreme sentiment in the 7-day window ending at *week_end*.

    *mode* == "max" picks the highest-scoring message; anything else picks
    the lowest. Returns None when the window holds no messages.
    """
    window_start = week_end - pd.Timedelta(days=7)
    window = df[(df["date"] > window_start) & (df["date"] <= week_end)]
    if window.empty:
        return None
    extreme = window["sentiment_final"].idxmax() if mode == "max" else window["sentiment_final"].idxmin()
    return window.loc[extreme]
def extract_lex_words_from_text(text: str, polarity: str, min_words: int = 4) -> List[str]:
    """Representative words from *text* for the given polarity ("pos"/"neg").

    Lexicon-matching tokens come first; if fewer than *min_words* are found,
    the list is topped up with remaining (non-stopword) content tokens.
    The result is capped at max(min_words, 4) entries.
    """
    candidates = [w for w in custom_tokenize(text) if len(w) > 1 and w not in stopwords_all]
    if polarity == "pos":
        lexicon = persian_positive
    elif polarity == "neg":
        lexicon = persian_negative
    else:
        lexicon = set()
    chosen = [w for w in candidates if w in lexicon]
    if len(chosen) < min_words:
        for w in candidates:
            if w not in chosen:
                chosen.append(w)
                if len(chosen) >= min_words:
                    break
    return chosen[:max(min_words, 4)]
# ===================== Plot helpers =====================
def _shape_fa(s: str) -> str:
try:
return get_display(arabic_reshaper.reshape(str(s)))
except Exception:
return str(s)
def make_weekly_plot(df: pd.DataFrame, chat_name: str):
    """Plot the weekly average sentiment curve and annotate peak/low weeks.

    Returns ``(fig, info)`` where *info* holds the peak/low week-end ISO
    dates and representative words pulled from the most extreme message
    inside each of those weeks (all None/empty when there is no data).
    """
    ws = weekly_series(df)
    info: Dict[str, Any] = {
        "peak_week_end": None,
        "low_week_end": None,
        "peak_words": [],
        "low_words": [],
        "peak_word_main": None,
        "low_word_main": None,
    }
    fig, ax = plt.subplots(figsize=(22, 8))
    ax.set_title(_shape_fa(f"Emotion Trajectory in Chat: {chat_name}"))
    ax.set_xlabel("Time (weeks)")
    ax.set_ylabel("Average sentiment score")
    ax.grid(True)
    # Neutral-sentiment reference line.
    ax.axhline(0, linestyle="--")
    # No weekly data: return the labeled but empty axes with default info.
    if ws.empty:
        fig.tight_layout()
        return fig, info
    x = ws.index
    y = ws.values.astype(float)
    y_min = float(np.min(y))
    y_max = float(np.max(y))
    # Guard against a zero range (single week / constant series).
    yr = max(1e-9, (y_max - y_min))
    # Vertical padding so the annotations below fit inside the axes.
    top_pad = 0.30 * yr
    bot_pad = 0.20 * yr
    ax.plot(x, y, color="red", linewidth=2, marker="o", markersize=4)
    ax.set_ylim(y_min - bot_pad, y_max + top_pad)
    ax.margins(x=0.03)
    peak_week = ws.idxmax()
    low_week = ws.idxmin()
    info["peak_week_end"] = peak_week.isoformat()
    info["low_week_end"] = low_week.isoformat()
    # Most extreme single message inside each extreme week (may be None).
    peak_msg = message_in_week(df, peak_week, "max")
    low_msg = message_in_week(df, low_week, "min")
    peak_words = extract_lex_words_from_text(str(peak_msg["text"]) if peak_msg is not None else "", "pos", 4)
    low_words = extract_lex_words_from_text(str(low_msg["text"]) if low_msg is not None else "", "neg", 4)
    info["peak_words"] = peak_words
    info["low_words"] = low_words
    info["peak_word_main"] = peak_words[0] if peak_words else None
    info["low_word_main"] = low_words[0] if low_words else None
    peak_y = float(ws.loc[peak_week])
    low_y = float(ws.loc[low_week])
    # Offset annotation text vertically away from the data point.
    peak_text_y = peak_y + 0.18 * yr
    low_text_y = low_y - 0.18 * yr
    ax.scatter([peak_week], [peak_y], zorder=3)
    ax.annotate(
        _shape_fa("، ".join(peak_words) + f" (peak={peak_y:.3f})"),
        xy=(peak_week, peak_y),
        xytext=(peak_week, peak_text_y),
        arrowprops=dict(arrowstyle="->"),
        ha="center",
        fontsize=10,
        clip_on=False,
    )
    ax.scatter([low_week], [low_y], zorder=3)
    ax.annotate(
        _shape_fa("، ".join(low_words) + f" (low={low_y:.3f})"),
        xy=(low_week, low_y),
        xytext=(low_week, low_text_y),
        arrowprops=dict(arrowstyle="->"),
        ha="center",
        fontsize=10,
        clip_on=False,
    )
    fig.subplots_adjust(top=0.90)
    fig.tight_layout()
    return fig, info
# ===================== Top words (normalized to [-1, +1]) =====================
def top_words_weighted_by_sentiment(df: pd.DataFrame, top_n: int = 5) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Top sentiment-bearing words/phrases, weighted by message sentiment.

    Fix: only count sentiment-bearing tokens/phrases (lexicon + phrase lexicon).
    This prevents neutral words like 'خیلی', 'کار', 'امروز' from showing up as negative/positive.
    Scores are normalized to [-1, +1].
    """
    pos_weight = defaultdict(float)
    neg_weight = defaultdict(float)
    # Phrase lists, so multi-word evidence can be weighted as well.
    pos_phrases = list(persian_positive_phrases) if isinstance(persian_positive_phrases, (set, list, tuple)) else []
    neg_phrases = list(persian_negative_phrases) if isinstance(persian_negative_phrases, (set, list, tuple)) else []
    texts = df["text"].astype(str).tolist()
    sentiments = df["sentiment_final"].astype(float).tolist()
    for text, score in zip(texts, sentiments):
        # Neutral messages contribute no evidence.
        if abs(score) < 1e-9:
            continue
        # 1) Phrase-level evidence (stronger, double weight).
        normalized = text.replace("\u200c", " ")
        if score > 0:
            for phrase in pos_phrases:
                if phrase and phrase in normalized:
                    pos_weight[phrase] += float(score) * 2.0
        elif score < 0:
            for phrase in neg_phrases:
                if phrase and phrase in normalized:
                    neg_weight[phrase] += float(abs(score)) * 2.0
        # 2) Token-level evidence (lexicon tokens only).
        for token in custom_tokenize(text):
            if len(token) <= 1 or token in stopwords_all:
                continue
            if score > 0 and token in persian_positive:
                pos_weight[token] += float(score)
            elif score < 0 and token in persian_negative:
                neg_weight[token] += float(abs(score))

    def _ranked(weights, negative: bool) -> List[Dict[str, Any]]:
        # Normalize by the heaviest entry; empty when nothing meaningful found.
        if not weights:
            return []
        peak = max(weights.values(), default=0.0)
        if peak <= 1e-12:
            return []
        best = sorted(weights.items(), key=lambda kv: kv[1], reverse=True)[:top_n]
        if negative:
            return [{"word": w, "score": float(min(0.0, max(-1.0, -(v / peak))))} for w, v in best]
        return [{"word": w, "score": float(max(0.0, min(1.0, v / peak)))} for w, v in best]

    return _ranked(pos_weight, False), _ranked(neg_weight, True)
# ===================== Main entry =====================
def analyze_chat(
    chat: Dict[str, Any],
    max_bert_persian: int = 500
) -> Tuple[Dict[str, Any], Any, List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Run the full pipeline on one chat: parse → score → aggregate → plot.

    Returns (summary dict, matplotlib figure, top positive words,
    top negative words). An empty chat yields zeroed summary fields and a
    figure containing only the title.
    """
    frame = build_df(chat)
    chat_name = get_chat_name(chat, "Selected chat")
    if frame.empty:
        summary = {
            "chat_name": chat_name,
            "message_count": 0,
            "bert_used_on_persian_messages": 0,
            "overall_avg_sentiment": 0.0,
            "peak_word_main": None,
            "low_word_main": None,
            "peak_words": [],
            "low_words": [],
            "weekly": [],
            "top5_positive_lex": [],
            "top5_negative_lex": [],
        }
        fig, ax = plt.subplots(figsize=(22, 8))
        ax.set_title(_shape_fa(f"Emotion Trajectory in Chat: {chat_name}"))
        fig.tight_layout()
        return summary, fig, [], []
    frame, bert_count = compute_sentiments(frame, max_bert_persian=max_bert_persian, bert_batch_size=16)
    weekly = weekly_series(frame)
    weekly_records = [
        {"week_end": stamp.isoformat(), "avg_sentiment": float(value)}
        for stamp, value in weekly.items()
    ]
    fig, info = make_weekly_plot(frame, chat_name)
    pos_top, neg_top = top_words_weighted_by_sentiment(frame, top_n=5)
    summary = {
        "chat_name": chat_name,
        "message_count": int(len(frame)),
        "bert_used_on_persian_messages": int(bert_count),
        "overall_avg_sentiment": float(frame["sentiment_final"].mean()),
        "weekly": weekly_records,
        "peak_week_end": info["peak_week_end"],
        "low_week_end": info["low_week_end"],
        "peak_word_main": info["peak_word_main"],
        "low_word_main": info["low_word_main"],
        "peak_words": info["peak_words"],
        "low_words": info["low_words"],
        "top5_positive_lex": pos_top,
        "top5_negative_lex": neg_top,
    }
    return summary, fig, pos_top, neg_top