Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import torch | |
| from transformers import pipeline as hf_pipeline, AutoModelForSequenceClassification, AutoTokenizer | |
| from PIL import Image | |
| import io | |
| import easyocr | |
| import numpy as np | |
| import pandas as pd | |
| # ——— Load and preprocess NRC EmoLex —————————————————————————————————— | |
# NRC word-emotion lexicon, read as tab-separated "<word> <emotion> <flag>" rows.
EMOLEX_PATH = "NRC-Emotion-Lexicon-Wordlevel-v0.92.txt"

# keep_default_na=False stops pandas from converting lexicon words that happen
# to look like NA markers (e.g. "null", "nan") into missing values, which would
# corrupt the word index.  Blank cells in the other two columns are still
# treated as missing so the fillna(0) below keeps working.
emo_raw = pd.read_csv(
    EMOLEX_PATH,
    sep="\t",
    names=["word", "emotion", "flag"],
    comment="#",
    header=None,
    keep_default_na=False,
    na_values={"emotion": [""], "flag": [""]},
)

# Wide table: one row per word, one integer column per emotion.
emo_df = (
    emo_raw
    .pivot(index="word", columns="emotion", values="flag")
    .fillna(0)
    .astype(int)
)

# word -> {emotion: flag}
EMOLEX = emo_df.to_dict(orient="index")
def score_emolex(text_lower):
    """Sum the EmoLex flags of every whitespace-separated token.

    Args:
        text_lower: message text, expected to be lower-cased already.

    Returns:
        dict mapping each emotion column of ``emo_df`` to the summed flag
        value over all tokens found in ``EMOLEX`` (0 when nothing matched).
    """
    totals = dict.fromkeys(emo_df.columns, 0)
    for token in text_lower.split():
        entry = EMOLEX.get(token)
        if entry is None:
            # token is not in the lexicon
            continue
        for emotion, flag in entry.items():
            totals[emotion] += flag
    return totals
| # ——— Load MPQA Subjectivity Lexicon ————————————————————————————————————————————— | |
MPQA_PATH = "subjclueslen1-HLTEMNLP05.tff"

# word -> list of attribute dicts (one dict per lexicon line for that word).
mpqa_lex = {}
with open(MPQA_PATH, encoding="utf-8") as lexicon_file:
    for raw_line in lexicon_file:
        stripped = raw_line.strip()
        # skip blank lines and comment lines
        if stripped == "" or stripped.startswith("#"):
            continue
        # Each line is a run of key=value tokens; tokens without '=' are ignored.
        attrs = dict(
            token.split("=", 1)
            for token in stripped.split()
            if "=" in token
        )
        # Entries without a word1 field cannot be indexed, so drop them.
        headword = attrs.pop("word1", None)
        if headword is None:
            continue
        mpqa_lex.setdefault(headword.lower(), []).append(attrs)
| # ——— 1) Emotion Pipeline ———————————————————————————————————————————————— | |
# Text-classification pipeline used for the per-message emotion profile.
# top_k=None asks for a score for every label instead of only the best one
# (see the transformers text-classification pipeline docs); truncation=True
# lets the tokenizer cut over-long inputs.
EMOTION_MODEL_ID = "j-hartmann/emotion-english-distilroberta-base"
emotion_pipeline = hf_pipeline(
    task="text-classification",
    model=EMOTION_MODEL_ID,
    top_k=None,
    truncation=True,
)
def get_emotion_profile(text):
    """Run the emotion classifier on *text*.

    Returns:
        dict mapping each lower-cased label to its score rounded to
        three decimals.
    """
    scored = emotion_pipeline(text)
    # The pipeline may hand back the per-label results wrapped in an outer
    # list (one inner list per input); unwrap that case.
    is_nested = isinstance(scored, list) and isinstance(scored[0], list)
    if is_nested:
        scored = scored[0]

    profile = {}
    for item in scored:
        profile[item["label"].lower()] = round(item["score"], 3)
    return profile
# Substrings whose presence in a message is treated as an apology cue.
APOLOGY_KEYWORDS = [
    "sorry",
    "apology",
    "forgive",
]

# ——— 2) Abuse-Patterns Model ———
# Pattern names, in the order they are zipped against the model's output
# scores in analyze_message.
LABELS = [
    "blame shifting",
    "contradictory statements",
    "control",
    "dismissiveness",
    "gaslighting",
    "guilt tripping",
    "insults",
    "obscure language",
    "projection",
    "recovery phase",
    "threat",
]

# Per-label cut-off: a label counts as active when its sigmoid score is
# greater than or equal to this value.
THRESHOLDS = {
    "blame shifting":           0.28,
    "contradictory statements": 0.27,
    "control":                  0.08,
    "dismissiveness":           0.32,
    "gaslighting":              0.27,
    "guilt tripping":           0.31,
    "insults":                  0.10,
    "obscure language":         0.55,
    "projection":               0.09,
    "recovery phase":           0.33,
    "threat":                   0.15,
}

# Multi-label sequence classifier and its (slow) tokenizer.
model_name = "SamanthaStorm/tether-multilabel-v3"
model = AutoModelForSequenceClassification.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False)
| # ——— 3) Initialize EasyOCR reader ———————————————————————————————————————————— | |
# Shared EasyOCR reader for uploaded images: English only, GPU disabled.
OCR_LANGUAGES = ["en"]
ocr_reader = easyocr.Reader(OCR_LANGUAGES, gpu=False)
| # ——— 4) Emotional-Tone Tagging ————————————————————————————————————————————— | |
def get_emotional_tone_tag(emotion_profile, patterns, text_lower):
    """Pick a single tone label for a message, or None if no rule applies.

    The rules below are checked top to bottom and the first one that matches
    wins, so their order is significant.  Most rules combine three signals:
    a classifier score threshold, at least one matching NRC-EmoLex word, and
    the presence (or absence) of specific abuse patterns.

    Args:
        emotion_profile: mapping of emotion name -> score; emotions that are
            missing from the mapping are treated as 0.
        patterns: collection of active abuse-pattern labels (may be empty).
        text_lower: the message text, expected to be lower-cased already.

    Returns:
        The tone tag as a string, or None when nothing matched.
    """
    sadness = emotion_profile.get("sadness", 0)
    joy = emotion_profile.get("joy", 0)
    neutral = emotion_profile.get("neutral", 0)
    disgust = emotion_profile.get("disgust", 0)
    anger = emotion_profile.get("anger", 0)
    fear = emotion_profile.get("fear", 0)

    # NRC-EmoLex counts: summed lexicon flags per emotion over all tokens.
    words = text_lower.split()
    lex_counts = {
        emo: sum(EMOLEX.get(w, {}).get(emo, 0) for w in words)
        for emo in ("anger", "joy", "sadness", "fear", "disgust")
    }

    # NOTE: an MPQA type/polarity tally used to be computed here.  No rule
    # ever read it, and it raised KeyError for any lexicon entry whose
    # priorpolarity was not "positive"/"negative" (e.g. "neutral", "both"),
    # so it has been removed.

    def has_any(*labels):
        """True when at least one of *labels* is an active pattern."""
        return any(label in patterns for label in labels)

    # 0. Support override
    if lex_counts["joy"] > 0 and any(
        k in text_lower for k in ("support", "hope", "grace")
    ):
        return "supportive"

    # 1. Performative Regret
    if sadness > 0.4 and has_any(
        "blame shifting", "guilt tripping", "recovery phase"
    ):
        return "performative regret"

    # 2. Coercive Warmth
    if (
        (joy > 0.3 or sadness > 0.4)
        and (lex_counts["joy"] > 0 or lex_counts["sadness"] > 0)
        and has_any("control", "gaslighting")
    ):
        return "coercive warmth"

    # 3. Cold Invalidation
    if (
        (neutral + disgust) > 0.5
        and lex_counts["disgust"] > 0
        and has_any("dismissiveness", "projection", "obscure language")
    ):
        return "cold invalidation"

    # 4. Genuine Vulnerability
    # all() is also true for an empty pattern list, so "no patterns at all"
    # qualifies here just like "only recovery phase".
    if (
        (sadness + fear) > 0.5
        and lex_counts["sadness"] > 0
        and lex_counts["fear"] > 0
        and all(p == "recovery phase" for p in patterns)
    ):
        return "genuine vulnerability"

    # 5. Emotional Threat
    if (
        (anger + disgust) > 0.5
        and (lex_counts["anger"] > 0 or lex_counts["disgust"] > 0)
        and has_any("control", "threat", "insults", "dismissiveness")
    ):
        return "emotional threat"

    # 6. Weaponized Sadness
    if (
        sadness > 0.6
        and lex_counts["sadness"] > 0
        and has_any("guilt tripping", "projection")
    ):
        return "weaponized sadness"

    # 7. Toxic Resignation
    if (
        neutral > 0.5
        and has_any("dismissiveness", "obscure language")
        and lex_counts["disgust"] == 0
    ):
        return "toxic resignation"

    # 8. Indignant Reproach
    if (
        anger > 0.5
        and lex_counts["anger"] > 0
        and has_any("guilt tripping", "contradictory statements")
    ):
        return "indignant reproach"

    # 9. Confrontational
    if anger > 0.6 and lex_counts["anger"] > 0 and patterns:
        return "confrontational"

    # 10. Passive Aggression
    if (
        neutral > 0.6
        and lex_counts["disgust"] > 0
        and has_any("dismissiveness", "projection")
    ):
        return "passive aggression"

    # 11. Sarcastic Mockery
    if joy > 0.3 and lex_counts["joy"] > 0 and "insults" in patterns:
        return "sarcastic mockery"

    # 12. Menacing Threat
    if fear > 0.3 and lex_counts["fear"] > 0 and "threat" in patterns:
        return "menacing threat"

    # 13. Pleading Concern
    if (
        sadness > 0.3
        and lex_counts["sadness"] > 0
        and any(k in text_lower for k in APOLOGY_KEYWORDS)
        and not patterns
    ):
        return "pleading concern"

    # 14. Fear-mongering
    if (
        (fear + disgust) > 0.5
        and lex_counts["fear"] > 0
        and "projection" in patterns
    ):
        return "fear-mongering"

    # 16. Empathetic Solidarity
    if (
        joy > 0.2
        and sadness > 0.2
        and lex_counts["joy"] > 0
        and lex_counts["sadness"] > 0
        and not patterns
    ):
        return "empathetic solidarity"

    # 17. Assertive Boundary
    if anger > 0.4 and lex_counts["anger"] > 0 and "control" in patterns:
        return "assertive boundary"

    # 18. Stonewalling
    if neutral > 0.7 and lex_counts["disgust"] == 0 and not patterns:
        return "stonewalling"

    return None
| # ——— 5) Single-message analysis ——————————————————————————————————————————— | |
def analyze_message(text):
    """Analyze one message: emotions, active abuse patterns and tone tag.

    Args:
        text: the raw message text.

    Returns:
        dict with keys ``emotion_profile`` (emotion -> score),
        ``active_patterns`` (list of pattern labels) and ``tone_tag``
        (string or None).
    """
    text_lower = text.lower()

    # Classifier scores, blended with NRC-EmoLex: lexicon counts are scaled
    # by the largest count, and each emotion keeps the higher of the two.
    emotion_profile = get_emotion_profile(text)
    lex_counts = score_emolex(text_lower)
    max_lex = max(lex_counts.values()) or 1.0  # avoid dividing by zero
    lex_scores = {}
    for emo, cnt in lex_counts.items():
        lex_scores[emo] = cnt / max_lex
    emotion_profile = {
        emo: max(score, lex_scores.get(emo, 0))
        for emo, score in emotion_profile.items()
    }

    # Abuse patterns: independent sigmoid score per label vs. its threshold.
    encoded = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
    with torch.no_grad():
        logits = model(**encoded).logits.squeeze(0)
    scores = torch.sigmoid(logits).cpu().numpy()

    active_patterns = []
    for label, score in zip(LABELS, scores):
        if score >= THRESHOLDS[label]:
            active_patterns.append(label)

    # An apology keyword forces the "recovery phase" pattern on.
    mentions_apology = any(k in text_lower for k in APOLOGY_KEYWORDS)
    if mentions_apology and "recovery phase" not in active_patterns:
        active_patterns.append("recovery phase")

    return {
        "emotion_profile": emotion_profile,
        "active_patterns": active_patterns,
        "tone_tag": get_emotional_tone_tag(
            emotion_profile, active_patterns, text_lower
        ),
    }
| # ——— 6) Composite wrapper ——————————————————————————————————————————————— | |
# File-name endings that are routed through OCR instead of text decoding.
_IMAGE_SUFFIXES = (".png", ".jpg", ".jpeg", ".bmp", ".gif", ".tiff")


def _read_upload(uploaded_file):
    """Return ``(raw_bytes, lowercased_name)`` for a path or file-like upload."""
    import os  # local import: only needed for the PathLike check

    is_path = isinstance(uploaded_file, (str, os.PathLike))

    raw = None
    reader = getattr(uploaded_file, "read", None)
    if callable(reader):
        try:
            raw = reader()
        except (OSError, ValueError):
            # e.g. the handle was already closed; fall back to reopening
            raw = None

    if raw is None:
        # Plain paths are opened directly; file wrappers are reopened by name.
        path = uploaded_file if is_path else uploaded_file.name
        with open(path, "rb") as fh:
            raw = fh.read()
    elif isinstance(raw, str):
        # handle was opened in text mode; downstream code expects bytes
        raw = raw.encode("utf-8")

    if isinstance(uploaded_file, str):
        name = uploaded_file
    else:
        name = getattr(uploaded_file, "name", "")
    return raw, str(name).lower()


def _format_report(title, result):
    """Render one analyze_message result as a titled text section."""
    return (
        f"── {title} ──\n"
        f"Emotion Profile : {result['emotion_profile']}\n"
        f"Active Patterns : {result['active_patterns']}\n"
        f"Emotional Tone : {result['tone_tag']}\n"
    )


def analyze_composite(uploaded_file, *texts):
    """Analyze an optional uploaded file plus any number of text messages.

    Args:
        uploaded_file: None, a file path, or a file-like object with
            ``read()`` and/or ``name``.  Files whose name ends in an image
            suffix are OCR'd; anything else is decoded as UTF-8 with a
            latin-1 fallback.
        *texts: message strings; None, empty and whitespace-only entries are
            skipped (their position still counts for the "Message N" title).

    Returns:
        The per-input reports joined by newlines, or a prompt asking for
        input when there was nothing to analyze.
    """
    outputs = []

    # file handling / OCR
    if uploaded_file is not None:
        raw, name = _read_upload(uploaded_file)
        if name.endswith(_IMAGE_SUFFIXES):
            img = Image.open(io.BytesIO(raw))
            arr = np.array(img.convert("RGB"))
            content = "\n".join(ocr_reader.readtext(arr, detail=0))
        else:
            try:
                content = raw.decode("utf-8")
            except UnicodeDecodeError:
                # latin-1 maps every byte, so this never fails
                content = raw.decode("latin-1")
        outputs.append(_format_report("Uploaded File", analyze_message(content)))

    # free-text messages
    for idx, txt in enumerate(texts, start=1):
        if not txt or not txt.strip():
            continue
        outputs.append(_format_report(f"Message {idx}", analyze_message(txt)))

    if not outputs:
        return "Please enter at least one message."
    return "\n".join(outputs)
| # ——— 7) Gradio interface ——————————————————————————————————————————————— | |
# One free-text box; analyze_composite accepts any number of these via *texts.
message_inputs = [gr.Textbox(label="Message")]

# Optional upload, passed to analyze_composite as its first argument.
file_input = gr.File(
    file_types=[".txt", ".png", ".jpg", ".jpeg"],
    label="Upload text or image",
)

iface = gr.Interface(
    fn=analyze_composite,
    inputs=[file_input, *message_inputs],
    outputs=gr.Textbox(label="Analysis"),
    title="Tether Analyzer (extended tone tags)",
    description="Emotion profiling, pattern tags, and a wide set of nuanced tone categories—no abuse score or DARVO.",
)

# Start the web UI only when this file is run as a script.
if __name__ == "__main__":
    iface.launch()