Sajjadistic committed on
Commit
f4b0a68
·
verified ·
1 Parent(s): c2b5d94

Update analysis_core.py

Browse files
Files changed (1) hide show
  1. analysis_core.py +66 -59
analysis_core.py CHANGED
@@ -1,7 +1,7 @@
1
  from __future__ import annotations
2
 
3
  import re
4
- from collections import Counter
5
  from typing import Any, Dict, List, Tuple
6
 
7
  import numpy as np
@@ -45,7 +45,7 @@ MODEL = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME).to(DEVICE
45
  MODEL.eval()
46
 
47
 
48
- # -------------------- Lexicons --------------------
49
  persian_positive = {
50
  "خوب","عالی","عالیه","باحال","قشنگ","زیبا","خوشحال","شاد","مرسی","ممنون","ممنونم",
51
  "دمت","دمتگرم","دمت گرم","ایول","آفرین","دوستت","دوستت_دارم","دوستتدارم","عزیزم",
@@ -159,16 +159,17 @@ def score_en_vader(text: str) -> float:
159
 
160
 
161
  @torch.inference_mode()
162
- def score_fa_bert(text: str) -> float:
163
- if not text:
164
- return 0.0
165
- inputs = TOKENIZER(text, return_tensors="pt", truncation=True, max_length=256).to(DEVICE)
166
- out = MODEL(**inputs)
167
- probs = torch.softmax(out.logits, dim=-1).squeeze(0)
168
- if probs.numel() >= 2:
169
- score = float(probs[1] - probs[0]) # assume 0=neg, 1=pos
170
- return float(max(-1.0, min(1.0, score)))
171
- return 0.0
 
172
 
173
 
174
  def persian_lexicon_score(text: str) -> float:
@@ -193,12 +194,16 @@ def persian_lexicon_score(text: str) -> float:
193
  return float((pos - neg) / max(1, (pos + neg)))
194
 
195
 
196
- def compute_sentiments(df: pd.DataFrame, max_bert_persian: int = 200) -> Tuple[pd.DataFrame, int]:
 
 
 
 
197
  if df.empty:
198
  return df, 0
199
 
200
  df = df.copy()
201
- df["sentiment_final"] = np.nan
202
 
203
  pers_mask = df["text"].astype(str).apply(is_persian)
204
  pers_idx = df.index[pers_mask].tolist()
@@ -212,7 +217,9 @@ def compute_sentiments(df: pd.DataFrame, max_bert_persian: int = 200) -> Tuple[p
212
 
213
  bert_idx = pers_idx[: max(0, int(max_bert_persian))]
214
  if bert_idx:
215
- df.loc[bert_idx, "sentiment_final"] = df.loc[bert_idx, "text"].astype(str).apply(score_fa_bert)
 
 
216
 
217
  df["sentiment_final"] = df["sentiment_final"].fillna(0.0)
218
  return df, len(bert_idx)
@@ -243,9 +250,9 @@ def extract_lex_words_from_text(text: str, polarity: str, min_words: int = 4) ->
243
 
244
  out: List[str] = []
245
  for t in tokens:
246
- if polarity == "pos" and t in persian_positive:
247
  out.append(t)
248
- if polarity == "neg" and t in persian_negative:
249
  out.append(t)
250
 
251
  if len(out) < min_words:
@@ -278,7 +285,7 @@ def make_weekly_plot(df: pd.DataFrame, chat_name: str):
278
  "low_word_main": None,
279
  }
280
 
281
- fig = plt.figure(figsize=(18, 7))
282
  plt.title(_shape_fa(f"Emotion Trajectory in Chat: {chat_name}"))
283
  plt.xlabel("Time (weeks)")
284
  plt.ylabel("Average sentiment score")
@@ -292,18 +299,16 @@ def make_weekly_plot(df: pd.DataFrame, chat_name: str):
292
  x = ws.index
293
  y = ws.values.astype(float)
294
 
295
- q_lo = float(np.quantile(y, 0.05))
296
- q_hi = float(np.quantile(y, 0.95))
297
- if q_hi - q_lo < 0.15:
298
- q_lo = min(q_lo, 0.0) - 0.1
299
- q_hi = max(q_hi, 0.0) + 0.1
300
- pad = 0.08 * (q_hi - q_lo)
301
- y_min_plot = q_lo - pad
302
- y_max_plot = q_hi + pad
303
 
304
  plt.plot(x, y, color="red", linewidth=2, marker="o", markersize=4)
305
  plt.ylim(y_min_plot, y_max_plot)
306
- plt.margins(x=0.05, y=0.2)
307
 
308
  peak_week = ws.idxmax()
309
  low_week = ws.idxmin()
@@ -335,62 +340,63 @@ def make_weekly_plot(df: pd.DataFrame, chat_name: str):
335
  plt.annotate(
336
  _shape_fa("، ".join(peak_words) + f" (peak={peak_y:.3f})"),
337
  xy=(peak_week, peak_y_plot),
338
- xytext=(peak_week, y_max_plot + 0.08 * (y_max_plot - y_min_plot)),
339
  arrowprops=dict(arrowstyle="->"),
340
  ha="center",
341
- fontsize=9,
342
  )
343
 
344
  plt.scatter([low_week], [low_y_plot])
345
  plt.annotate(
346
  _shape_fa("، ".join(low_words) + f" (low={low_y:.3f})"),
347
  xy=(low_week, low_y_plot),
348
- xytext=(low_week, y_min_plot - 0.08 * (y_max_plot - y_min_plot)),
349
  arrowprops=dict(arrowstyle="->"),
350
  ha="center",
351
- fontsize=9,
352
  )
353
 
354
  plt.tight_layout()
355
  return fig, info
356
 
357
 
358
- # -------------------- Top lex words with score --------------------
359
- def top_lex_words(df: pd.DataFrame, top_n: int = 5) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
360
- tokens_all: List[str] = []
361
- for t in df["text"].astype(str).tolist():
362
- tokens_all.extend([w for w in custom_tokenize(t) if len(w) > 1 and w not in stopwords_all])
 
 
 
363
 
364
- if not tokens_all:
365
- return [], []
366
-
367
- total = len(tokens_all)
368
- cnt = Counter(tokens_all)
369
 
370
- pos_items = []
371
- for w in persian_positive:
372
- c = cnt.get(w, 0)
373
- if c > 0:
374
- pos_items.append((w, c / total))
375
 
376
- neg_items = []
377
- for w in persian_negative:
378
- c = cnt.get(w, 0)
379
- if c > 0:
380
- neg_items.append((w, -c / total))
 
381
 
382
- pos_items.sort(key=lambda x: x[1], reverse=True)
383
- neg_items.sort(key=lambda x: x[1]) # more negative first
384
 
385
- pos_top = [{"word": w, "score": float(s)} for w, s in pos_items[:top_n]]
386
- neg_top = [{"word": w, "score": float(s)} for w, s in neg_items[:top_n]]
387
  return pos_top, neg_top
388
 
389
 
390
  # -------------------- Main entry --------------------
391
  def analyze_chat(
392
  chat: Dict[str, Any],
393
- max_bert_persian: int = 200
394
  ) -> Tuple[Dict[str, Any], Any, List[Dict[str, Any]], List[Dict[str, Any]]]:
395
 
396
  df = build_df(chat)
@@ -410,12 +416,12 @@ def analyze_chat(
410
  "top5_positive_lex": [],
411
  "top5_negative_lex": [],
412
  }
413
- fig = plt.figure(figsize=(18, 7))
414
  plt.title(_shape_fa(f"Emotion Trajectory in Chat: {name}"))
415
  plt.tight_layout()
416
  return empty, fig, [], []
417
 
418
- df, used = compute_sentiments(df, max_bert_persian=max_bert_persian)
419
 
420
  ws = weekly_series(df)
421
  weekly_records = [{"week_end": idx.isoformat(), "avg_sentiment": float(val)} for idx, val in ws.items()]
@@ -423,7 +429,8 @@ def analyze_chat(
423
 
424
  fig, info = make_weekly_plot(df, name)
425
 
426
- pos_top, neg_top = top_lex_words(df, top_n=5)
 
427
 
428
  result = {
429
  "chat_name": name,
 
1
  from __future__ import annotations
2
 
3
  import re
4
+ from collections import Counter, defaultdict
5
  from typing import Any, Dict, List, Tuple
6
 
7
  import numpy as np
 
45
  MODEL.eval()
46
 
47
 
48
+ # -------------------- Lexicons (small helper lists; not the only source anymore) --------------------
49
  persian_positive = {
50
  "خوب","عالی","عالیه","باحال","قشنگ","زیبا","خوشحال","شاد","مرسی","ممنون","ممنونم",
51
  "دمت","دمتگرم","دمت گرم","ایول","آفرین","دوستت","دوستت_دارم","دوستتدارم","عزیزم",
 
159
 
160
 
161
  @torch.inference_mode()
162
+ def score_fa_bert_batch(texts: List[str], batch_size: int = 16) -> List[float]:
163
+ scores: List[float] = []
164
+ for i in range(0, len(texts), batch_size):
165
+ chunk = texts[i:i + batch_size]
166
+ inputs = TOKENIZER(chunk, return_tensors="pt", truncation=True, padding=True, max_length=256).to(DEVICE)
167
+ out = MODEL(**inputs)
168
+ probs = torch.softmax(out.logits, dim=-1)
169
+ diff = (probs[:, 1] - probs[:, 0]).detach().cpu().numpy().astype(float).tolist()
170
+ diff = [float(max(-1.0, min(1.0, d))) for d in diff]
171
+ scores.extend(diff)
172
+ return scores
173
 
174
 
175
  def persian_lexicon_score(text: str) -> float:
 
194
  return float((pos - neg) / max(1, (pos + neg)))
195
 
196
 
197
+ def compute_sentiments(df: pd.DataFrame, max_bert_persian: int = 500, bert_batch_size: int = 16) -> Tuple[pd.DataFrame, int]:
198
+ """
199
+ English messages: VADER
200
+ Persian messages: BERT on first N Persian messages (batched for speed), lexicon for the rest
201
+ """
202
  if df.empty:
203
  return df, 0
204
 
205
  df = df.copy()
206
+ df["sentiment_final"] = 0.0
207
 
208
  pers_mask = df["text"].astype(str).apply(is_persian)
209
  pers_idx = df.index[pers_mask].tolist()
 
217
 
218
  bert_idx = pers_idx[: max(0, int(max_bert_persian))]
219
  if bert_idx:
220
+ texts = df.loc[bert_idx, "text"].astype(str).tolist()
221
+ scores = score_fa_bert_batch(texts, batch_size=int(bert_batch_size))
222
+ df.loc[bert_idx, "sentiment_final"] = scores
223
 
224
  df["sentiment_final"] = df["sentiment_final"].fillna(0.0)
225
  return df, len(bert_idx)
 
250
 
251
  out: List[str] = []
252
  for t in tokens:
253
+ if polarity == "pos" and (t in persian_positive):
254
  out.append(t)
255
+ if polarity == "neg" and (t in persian_negative):
256
  out.append(t)
257
 
258
  if len(out) < min_words:
 
285
  "low_word_main": None,
286
  }
287
 
288
+ fig = plt.figure(figsize=(22, 8))
289
  plt.title(_shape_fa(f"Emotion Trajectory in Chat: {chat_name}"))
290
  plt.xlabel("Time (weeks)")
291
  plt.ylabel("Average sentiment score")
 
299
  x = ws.index
300
  y = ws.values.astype(float)
301
 
302
+ # IMPORTANT: show true peaks (use full min/max range)
303
+ y_min = float(np.min(y))
304
+ y_max = float(np.max(y))
305
+ pad = 0.08 * max(1e-9, (y_max - y_min))
306
+ y_min_plot = y_min - pad
307
+ y_max_plot = y_max + pad
 
 
308
 
309
  plt.plot(x, y, color="red", linewidth=2, marker="o", markersize=4)
310
  plt.ylim(y_min_plot, y_max_plot)
311
+ plt.margins(x=0.03, y=0.15)
312
 
313
  peak_week = ws.idxmax()
314
  low_week = ws.idxmin()
 
340
  plt.annotate(
341
  _shape_fa("، ".join(peak_words) + f" (peak={peak_y:.3f})"),
342
  xy=(peak_week, peak_y_plot),
343
+ xytext=(peak_week, y_max_plot + 0.06 * (y_max_plot - y_min_plot)),
344
  arrowprops=dict(arrowstyle="->"),
345
  ha="center",
346
+ fontsize=10,
347
  )
348
 
349
  plt.scatter([low_week], [low_y_plot])
350
  plt.annotate(
351
  _shape_fa("، ".join(low_words) + f" (low={low_y:.3f})"),
352
  xy=(low_week, low_y_plot),
353
+ xytext=(low_week, y_min_plot - 0.06 * (y_max_plot - y_min_plot)),
354
  arrowprops=dict(arrowstyle="->"),
355
  ha="center",
356
+ fontsize=10,
357
  )
358
 
359
  plt.tight_layout()
360
  return fig, info
361
 
362
 
363
+ # -------------------- Weighted top words (fixes "no negative words") --------------------
364
+ def top_words_weighted_by_sentiment(df: pd.DataFrame, top_n: int = 5) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
365
+ """
366
+ Extracts top positive/negative words by weighting tokens with message sentiment magnitude.
367
+ This does NOT depend on a tiny predefined negative list, so negative words will surface whenever the chat actually contains negativity.
368
+ """
369
+ pos_w = defaultdict(float)
370
+ neg_w = defaultdict(float)
371
 
372
+ for text, s in zip(df["text"].astype(str).tolist(), df["sentiment_final"].astype(float).tolist()):
373
+ tokens = [t for t in custom_tokenize(text) if len(t) > 1 and t not in stopwords_all]
374
+ if not tokens:
375
+ continue
 
376
 
377
+ mag = float(abs(s))
378
+ if mag < 1e-9:
379
+ continue
 
 
380
 
381
+ if s > 0:
382
+ for t in tokens:
383
+ pos_w[t] += mag
384
+ elif s < 0:
385
+ for t in tokens:
386
+ neg_w[t] += mag
387
 
388
+ pos_items = sorted(pos_w.items(), key=lambda x: x[1], reverse=True)[:top_n]
389
+ neg_items = sorted(neg_w.items(), key=lambda x: x[1], reverse=True)[:top_n]
390
 
391
+ pos_top = [{"word": w, "score": float(v)} for w, v in pos_items]
392
+ neg_top = [{"word": w, "score": float(-v)} for w, v in neg_items] # negative sign to show negativity
393
  return pos_top, neg_top
394
 
395
 
396
  # -------------------- Main entry --------------------
397
  def analyze_chat(
398
  chat: Dict[str, Any],
399
+ max_bert_persian: int = 500
400
  ) -> Tuple[Dict[str, Any], Any, List[Dict[str, Any]], List[Dict[str, Any]]]:
401
 
402
  df = build_df(chat)
 
416
  "top5_positive_lex": [],
417
  "top5_negative_lex": [],
418
  }
419
+ fig = plt.figure(figsize=(22, 8))
420
  plt.title(_shape_fa(f"Emotion Trajectory in Chat: {name}"))
421
  plt.tight_layout()
422
  return empty, fig, [], []
423
 
424
+ df, used = compute_sentiments(df, max_bert_persian=max_bert_persian, bert_batch_size=16)
425
 
426
  ws = weekly_series(df)
427
  weekly_records = [{"week_end": idx.isoformat(), "avg_sentiment": float(val)} for idx, val in ws.items()]
 
429
 
430
  fig, info = make_weekly_plot(df, name)
431
 
432
+ # IMPORTANT: this fixes "no negative words" even when negatives exist
433
+ pos_top, neg_top = top_words_weighted_by_sentiment(df, top_n=5)
434
 
435
  result = {
436
  "chat_name": name,