Sajjadistic committed on
Commit
8b31d5e
·
verified ·
1 Parent(s): 94c5181

Update analysis_core.py

Browse files
Files changed (1) hide show
  1. analysis_core.py +130 -204
analysis_core.py CHANGED
@@ -1,7 +1,7 @@
1
  from __future__ import annotations
2
 
3
  import re
4
- from typing import Any, Dict, List, Optional, Tuple
5
 
6
  import numpy as np
7
  import pandas as pd
@@ -30,64 +30,18 @@ def _ensure_nltk() -> None:
30
  except LookupError:
31
  nltk.download("vader_lexicon", quiet=True)
32
 
33
- try:
34
- nltk.data.find("tokenizers/punkt_tab")
35
- except LookupError:
36
- try:
37
- nltk.download("punkt_tab", quiet=True)
38
- except Exception:
39
- pass
40
-
41
 
42
  _ensure_nltk()
43
- sia = SentimentIntensityAnalyzer()
44
 
45
 
46
- # -------------------- Transformer (ParsBERT DeepSentiPers) --------------------
47
  MODEL_NAME = "HooshvareLab/bert-fa-base-uncased-sentiment-deepsentipers-binary"
48
  DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
49
 
50
  TOKENIZER = AutoTokenizer.from_pretrained(MODEL_NAME)
51
- BERT_MODEL = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME).to(DEVICE)
52
- BERT_MODEL.eval()
53
-
54
-
55
- # -------------------- Lexicons (from your notebook idea) --------------------
56
- persian_positive = {
57
- "خوب","خیلی خوب","عالی","عالیه","خفن","باحال","نایس","قشنگ","زیبا","خوشگل",
58
- "عاشق","عاشقتم","دوستت","دوستت_دارم","دوستتدارم","مرسی","ممنون","ممنونم",
59
- "دمت گرم","دمت‌گرم","شاد","خوشحال","خوشحالم","آرومم","آرامشم","خوشبختم","راضیم",
60
- "بی‌نظیر","فوق‌العاده","توپ","محشر","شگفت‌انگیز","ایول","قربونت","قربانت","عزیزمی","عزیزم",
61
- "❤️","💖","💗","💙","💚","💛","💜","💕"
62
- }
63
-
64
- persian_negative = {
65
- "بد","خیلی بد","بدم","افتضاح","مزخرف","چرند","حالم_بده","حالمبده","ناراحت","غمگین",
66
- "اعصابم","اعصاب","کلافه","خسته","داغون","نفرت","متنفرم","لعنت","لعنتی","مسخره",
67
- "😡","😠","😞","😔","😭","💔"
68
- }
69
-
70
- persian_positive_phrases = {
71
- "خیلی دوستت دارم", "دوستت دارم", "دمت گرم", "آفرین", "دمت", "به‌به", "به به"
72
- }
73
-
74
- persian_negative_phrases = {
75
- "حالم بده", "خیلی بده", "اعصابم خورد", "حوصله ندارم"
76
- }
77
-
78
- persian_stopwords = {
79
- "و","در","به","از","که","این","اون","آن","من","تو","ما","شما","او","هیچ","هم","یا","اما","اگر",
80
- "برای","با","روی","تا","نه","را","همه","چی","چیه","چرا","کجا","کی","چه","یه","یک","بود","هست",
81
- "می","میخوام","میشه","کرد","کردم","کن","کردی","کردن","دارم","داری","داره","داشت","شد","شده"
82
- }
83
-
84
- english_stopwords = {
85
- "the","a","an","and","or","but","if","then","else","to","of","in","on","at","for","with",
86
- "is","am","are","was","were","be","been","being","i","you","he","she","we","they",
87
- "this","that","these","those","it","as","by","from","not","no","yes","do","does","did"
88
- }
89
-
90
- stopwords_all = persian_stopwords.union(english_stopwords)
91
 
92
 
93
  # -------------------- Telegram parsing --------------------
@@ -101,8 +55,10 @@ def extract_text(msg_text: Any) -> str:
101
  for part in msg_text:
102
  if isinstance(part, str):
103
  parts.append(part)
104
- elif isinstance(part, dict) and "text" in part and isinstance(part["text"], str):
105
- parts.append(part["text"])
 
 
106
  return "".join(parts)
107
  return ""
108
 
@@ -114,7 +70,7 @@ def extract_chats(data: Dict[str, Any]) -> List[Dict[str, Any]]:
114
  return lst
115
  if isinstance(data, dict) and "messages" in data and isinstance(data["messages"], list):
116
  return [data]
117
- raise ValueError("JSON format not recognized. expected Telegram export with data['chats']['list'].")
118
 
119
 
120
  def get_chat_name(chat: Dict[str, Any], fallback: str) -> str:
@@ -124,28 +80,27 @@ def get_chat_name(chat: Dict[str, Any], fallback: str) -> str:
124
  return fallback
125
 
126
 
127
- def build_df(selected_chat: Dict[str, Any]) -> pd.DataFrame:
128
- records: List[Dict[str, Any]] = []
129
- for msg in selected_chat.get("messages", []):
130
  if not isinstance(msg, dict):
131
  continue
 
132
  text = extract_text(msg.get("text", "")).strip()
133
  if not text:
134
  continue
135
- date_str = msg.get("date")
136
- if not isinstance(date_str, str) or not date_str:
 
137
  continue
 
138
  sender = msg.get("from") or msg.get("actor") or "Unknown"
139
- records.append(
140
- {
141
- "id": msg.get("id"),
142
- "date_raw": date_str,
143
- "sender": sender,
144
- "text": text,
145
- }
146
- )
147
-
148
- df = pd.DataFrame(records)
149
  if df.empty:
150
  return df
151
 
@@ -154,178 +109,149 @@ def build_df(selected_chat: Dict[str, Any]) -> pd.DataFrame:
154
  return df
155
 
156
 
157
- # -------------------- Tokenization + language detection --------------------
158
- def contains_persian(text: str) -> bool:
159
- return any("\u0600" <= ch <= "\u06FF" for ch in str(text))
160
-
161
-
162
- def custom_tokenize(text: str) -> List[str]:
163
- text = re.sub(r"http\S+|www\.\S+", " ", str(text))
164
- text = text.replace("\u200c", " ")
165
- tokens = re.findall(r"[\w\u0600-\u06FF]+", text)
166
- tokens = [t.replace("دوستتدارم", "دوستت_دارم").replace("حالمبده", "حالم_بده") for t in tokens]
167
- return tokens
168
-
169
 
170
- def persian_lexicon_score(text: str) -> float:
171
- text = str(text)
172
- tokens = custom_tokenize(text)
173
 
174
- pos = 0
175
- neg = 0
176
 
177
- for t in tokens:
178
- if t in persian_positive:
179
- pos += 1
180
- elif t in persian_negative:
181
- neg += 1
182
 
183
- norm_text = text.replace("\u200c", " ")
184
- for phrase in persian_positive_phrases:
185
- if phrase in norm_text:
186
- pos += 2
187
- for phrase in persian_negative_phrases:
188
- if phrase in norm_text:
189
- neg += 2
190
-
191
- score = (pos - neg) / max(1, (pos + neg))
192
- return float(max(-1.0, min(1.0, score)))
193
 
194
 
195
  @torch.inference_mode()
196
- def persian_sent_bert(text: str) -> float:
 
 
197
  inputs = TOKENIZER(text, return_tensors="pt", truncation=True, max_length=256).to(DEVICE)
198
- out = BERT_MODEL(**inputs)
199
  probs = torch.softmax(out.logits, dim=-1).squeeze(0)
200
-
201
  if probs.numel() >= 2:
202
- score = float(probs[1] - probs[0]) # 0=neg, 1=pos
203
  return float(max(-1.0, min(1.0, score)))
204
  return 0.0
205
 
206
 
207
- def persian_sentiment_hybrid(text: str) -> float:
208
- # hybrid: transformer + lexicon (keeps your notebook spirit)
209
- trf = persian_sent_bert(text)
210
- lex = persian_lexicon_score(text)
211
- return float(0.7 * trf + 0.3 * lex)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
212
 
 
 
 
 
213
 
214
- def label_sentiment(score: float, th: float = 0.1) -> str:
215
- if score > th:
216
- return "positive"
217
- if score < -th:
218
- return "negative"
219
- return "neutral"
220
 
221
 
222
- # -------------------- Persian shaping for plot --------------------
223
- def shape_text(text: str) -> str:
224
- text = str(text)
225
  try:
226
- reshaped = arabic_reshaper.reshape(text)
227
- return get_display(reshaped)
228
  except Exception:
229
- return text
230
 
231
 
232
- # -------------------- Plot helpers (weekly + annotate peak/low) --------------------
233
- def _weekly_df(df: pd.DataFrame) -> pd.DataFrame:
234
- df_time = (
 
 
 
 
 
 
 
 
 
235
  df.set_index("date")
236
  .resample("W")["sentiment_final"]
237
  .mean()
238
- .to_frame("avg_sentiment")
239
  .dropna()
240
  )
241
- return df_time
242
-
243
-
244
- def _extreme_message(df: pd.DataFrame, week_end: pd.Timestamp, mode: str = "max"):
245
- start = week_end - pd.Timedelta(days=7)
246
- sub = df[(df["date"] > start) & (df["date"] <= week_end)]
247
- if sub.empty:
248
- return None
249
- if mode == "max":
250
- return sub.loc[sub["sentiment_final"].idxmax()]
251
- return sub.loc[sub["sentiment_final"].idxmin()]
252
-
253
-
254
- def _extract_words_from_message(text: str, polarity: str = "pos", min_words: int = 4) -> List[str]:
255
- tokens = custom_tokenize(text)
256
- tokens = [t for t in tokens if len(t) > 1 and t not in stopwords_all]
257
-
258
- words: List[str] = []
259
-
260
- for t in tokens:
261
- if polarity == "pos" and t in persian_positive:
262
- words.append(t)
263
- elif polarity == "neg" and t in persian_negative:
264
- words.append(t)
265
-
266
- if len(words) < min_words:
267
- for t in tokens:
268
- if t not in words:
269
- words.append(t)
270
- if len(words) >= min_words:
271
- break
272
-
273
- return words[:max(min_words, 4)]
274
-
275
 
276
- def make_weekly_plot(df: pd.DataFrame, chat_name: str):
277
- df_time = _weekly_df(df)
278
- fig = plt.figure(figsize=(12, 5))
279
 
280
- plt.plot(df_time.index, df_time["avg_sentiment"], color="red")
281
  plt.axhline(0, linestyle="--")
282
- plt.title(shape_text(f"Emotion Trajectory in Chat: {chat_name}"))
283
- plt.xlabel("Time (weeks)")
284
- plt.ylabel("Average sentiment score")
285
- plt.grid(True)
286
 
287
- if not df_time.empty:
288
- peak_week = df_time["avg_sentiment"].idxmax()
289
- low_week = df_time["avg_sentiment"].idxmin()
290
-
291
- peak_msg = _extreme_message(df, peak_week, "max")
292
- low_msg = _extreme_message(df, low_week, "min")
293
-
294
- peak_words = _extract_words_from_message(peak_msg["text"], "pos", 4) if peak_msg is not None else []
295
- low_words = _extract_words_from_message(low_msg["text"], "neg", 4) if low_msg is not None else []
296
-
297
- plt.scatter([peak_week], [df_time.loc[peak_week, "avg_sentiment"]])
298
- plt.annotate(
299
- shape_text("، ".join(peak_words)),
300
- xy=(peak_week, df_time.loc[peak_week, "avg_sentiment"]),
301
- xytext=(peak_week, df_time["avg_sentiment"].max() + 0.05),
302
- arrowprops=dict(arrowstyle="->"),
303
- ha="center",
304
- fontsize=10,
305
- )
306
-
307
- plt.scatter([low_week], [df_time.loc[low_week, "avg_sentiment"]])
308
- plt.annotate(
309
- shape_text("، ".join(low_words)),
310
- xy=(low_week, df_time.loc[low_week, "avg_sentiment"]),
311
- xytext=(low_week, df_time["avg_sentiment"].min() - 0.05),
312
- arrowprops=dict(arrowstyle="->"),
313
- ha="center",
314
- fontsize=10,
315
- )
316
 
317
  plt.tight_layout()
318
  return fig
319
 
320
 
321
- # -------------------- Main analysis (matches your notebook behavior) --------------------
322
- def analyze_selected_chat(chat: Dict[str, Any], n_trf: int = 100) -> Tuple[Dict[str, Any], Any]:
323
  df = build_df(chat)
324
- chat_name = get_chat_name(chat, "Selected chat")
325
 
326
  if df.empty:
327
- empty = {
328
- "chat_name": chat_name,
329
  "message_count": 0,
330
- "transformer_used_on_persian_messages": 0,
331
- "over
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  from __future__ import annotations
2
 
3
  import re
4
+ from typing import Any, Dict, List, Tuple
5
 
6
  import numpy as np
7
  import pandas as pd
 
30
  except LookupError:
31
  nltk.download("vader_lexicon", quiet=True)
32
 
 
 
 
 
 
 
 
 
33
 
34
  _ensure_nltk()
35
+ SIA = SentimentIntensityAnalyzer()
36
 
37
 
38
+ # -------------------- Model (Persian sentiment) --------------------
39
  MODEL_NAME = "HooshvareLab/bert-fa-base-uncased-sentiment-deepsentipers-binary"
40
  DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
41
 
42
  TOKENIZER = AutoTokenizer.from_pretrained(MODEL_NAME)
43
+ MODEL = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME).to(DEVICE)
44
+ MODEL.eval()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
45
 
46
 
47
  # -------------------- Telegram parsing --------------------
 
55
  for part in msg_text:
56
  if isinstance(part, str):
57
  parts.append(part)
58
+ elif isinstance(part, dict):
59
+ t = part.get("text")
60
+ if isinstance(t, str):
61
+ parts.append(t)
62
  return "".join(parts)
63
  return ""
64
 
 
70
  return lst
71
  if isinstance(data, dict) and "messages" in data and isinstance(data["messages"], list):
72
  return [data]
73
+ raise ValueError("JSON format not recognized. Need Telegram export result.json with chats.list.")
74
 
75
 
76
  def get_chat_name(chat: Dict[str, Any], fallback: str) -> str:
 
80
  return fallback
81
 
82
 
83
+ def build_df(chat: Dict[str, Any]) -> pd.DataFrame:
84
+ rows: List[Dict[str, Any]] = []
85
+ for msg in chat.get("messages", []):
86
  if not isinstance(msg, dict):
87
  continue
88
+
89
  text = extract_text(msg.get("text", "")).strip()
90
  if not text:
91
  continue
92
+
93
+ date_raw = msg.get("date")
94
+ if not isinstance(date_raw, str) or not date_raw:
95
  continue
96
+
97
  sender = msg.get("from") or msg.get("actor") or "Unknown"
98
+ if not isinstance(sender, str):
99
+ sender = str(sender)
100
+
101
+ rows.append({"date_raw": date_raw, "sender": sender, "text": text})
102
+
103
+ df = pd.DataFrame(rows)
 
 
 
 
104
  if df.empty:
105
  return df
106
 
 
109
  return df
110
 
111
 
112
+ # -------------------- Sentiment scoring --------------------
113
+ _FA_RE = re.compile(r"[\u0600-\u06FF]")
 
 
 
 
 
 
 
 
 
 
114
 
 
 
 
115
 
116
+ def is_persian(text: str) -> bool:
117
+ return bool(_FA_RE.search(text or ""))
118
 
 
 
 
 
 
119
 
120
+ def score_en_vader(text: str) -> float:
121
+ return float(SIA.polarity_scores(text)["compound"])
 
 
 
 
 
 
 
 
122
 
123
 
124
  @torch.inference_mode()
125
+ def score_fa_bert(text: str) -> float:
126
+ if not text:
127
+ return 0.0
128
  inputs = TOKENIZER(text, return_tensors="pt", truncation=True, max_length=256).to(DEVICE)
129
+ out = MODEL(**inputs)
130
  probs = torch.softmax(out.logits, dim=-1).squeeze(0)
 
131
  if probs.numel() >= 2:
132
+ score = float(probs[1] - probs[0]) # assume 0=neg, 1=pos
133
  return float(max(-1.0, min(1.0, score)))
134
  return 0.0
135
 
136
 
137
+ def compute_sentiments(df: pd.DataFrame, max_bert_persian: int = 200) -> Tuple[pd.DataFrame, int]:
138
+ """
139
+ - english: vader only
140
+ - persian: bert, but only first max_bert_persian persian messages (speed)
141
+ - for remaining persian messages: use vader (fallback) to avoid NaNs
142
+ """
143
+ if df.empty:
144
+ return df, 0
145
+
146
+ df = df.copy()
147
+ df["sentiment_final"] = np.nan
148
+
149
+ pers_mask = df["text"].astype(str).apply(is_persian)
150
+ pers_idx = df.index[pers_mask].tolist()
151
+ en_idx = df.index[~pers_mask].tolist()
152
+
153
+ # english
154
+ if en_idx:
155
+ df.loc[en_idx, "sentiment_final"] = df.loc[en_idx, "text"].astype(str).apply(score_en_vader)
156
+
157
+ # persian - bert on first N
158
+ bert_idx = pers_idx[: max(0, int(max_bert_persian))]
159
+ if bert_idx:
160
+ df.loc[bert_idx, "sentiment_final"] = df.loc[bert_idx, "text"].astype(str).apply(score_fa_bert)
161
 
162
+ # persian fallback - vader
163
+ rest_idx = [i for i in pers_idx if i not in set(bert_idx)]
164
+ if rest_idx:
165
+ df.loc[rest_idx, "sentiment_final"] = df.loc[rest_idx, "text"].astype(str).apply(score_en_vader)
166
 
167
+ df["sentiment_final"] = df["sentiment_final"].fillna(0.0)
168
+ return df, len(bert_idx)
 
 
 
 
169
 
170
 
171
+ # -------------------- Plotting --------------------
172
+ def _shape_fa(s: str) -> str:
 
173
  try:
174
+ return get_display(arabic_reshaper.reshape(str(s)))
 
175
  except Exception:
176
+ return str(s)
177
 
178
 
179
+ def make_weekly_plot(df: pd.DataFrame, chat_name: str):
180
+ fig = plt.figure(figsize=(12, 5))
181
+ plt.title(_shape_fa(f"Emotion Trajectory in Chat: {chat_name}"))
182
+ plt.xlabel("Time (weeks)")
183
+ plt.ylabel("Average sentiment score")
184
+ plt.grid(True)
185
+
186
+ if df.empty:
187
+ plt.tight_layout()
188
+ return fig
189
+
190
+ weekly = (
191
  df.set_index("date")
192
  .resample("W")["sentiment_final"]
193
  .mean()
 
194
  .dropna()
195
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
196
 
197
+ if weekly.empty:
198
+ plt.tight_layout()
199
+ return fig
200
 
201
+ plt.plot(weekly.index, weekly.values, color="red")
202
  plt.axhline(0, linestyle="--")
 
 
 
 
203
 
204
+ # mark max/min weeks
205
+ wmax = weekly.idxmax()
206
+ wmin = weekly.idxmin()
207
+ plt.scatter([wmax], [float(weekly.loc[wmax])])
208
+ plt.scatter([wmin], [float(weekly.loc[wmin])])
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
209
 
210
  plt.tight_layout()
211
  return fig
212
 
213
 
214
+ # -------------------- Main API for UI --------------------
215
+ def analyze_chat(chat: Dict[str, Any], max_bert_persian: int = 200) -> Tuple[Dict[str, Any], Any]:
216
  df = build_df(chat)
217
+ name = get_chat_name(chat, "Selected chat")
218
 
219
  if df.empty:
220
+ result = {
221
+ "chat_name": name,
222
  "message_count": 0,
223
+ "bert_used_on_persian_messages": 0,
224
+ "overall_avg_sentiment": 0.0,
225
+ "weekly": [],
226
+ }
227
+ fig = make_weekly_plot(df, name)
228
+ return result, fig
229
+
230
+ df, used = compute_sentiments(df, max_bert_persian=max_bert_persian)
231
+
232
+ overall = float(df["sentiment_final"].mean())
233
+
234
+ weekly = (
235
+ df.set_index("date")
236
+ .resample("W")["sentiment_final"]
237
+ .mean()
238
+ .dropna()
239
+ .reset_index()
240
+ .rename(columns={"date": "week_end", "sentiment_final": "avg_sentiment"})
241
+ )
242
+
243
+ weekly_records = [
244
+ {"week_end": r["week_end"].isoformat(), "avg_sentiment": float(r["avg_sentiment"])}
245
+ for _, r in weekly.iterrows()
246
+ ]
247
+
248
+ result = {
249
+ "chat_name": name,
250
+ "message_count": int(len(df)),
251
+ "bert_used_on_persian_messages": int(used),
252
+ "overall_avg_sentiment": overall,
253
+ "weekly": weekly_records,
254
+ }
255
+
256
+ fig = make_weekly_plot(df, name)
257
+ return result, fig