File size: 10,858 Bytes
d23039a
 
 
fba30db
d23039a
 
 
 
fba30db
d23039a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fba30db
d23039a
 
 
 
 
 
fba30db
d23039a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9020a5a
 
d23039a
 
 
 
 
 
 
 
 
 
 
 
fba30db
 
d23039a
 
 
 
 
 
 
 
 
fba30db
d23039a
 
 
fba30db
d23039a
 
 
 
 
 
 
 
 
 
 
 
fba30db
d23039a
 
fba30db
d23039a
 
 
fba30db
 
 
 
 
 
d23039a
 
fba30db
 
 
 
 
 
 
 
 
 
 
 
 
 
d23039a
 
 
 
 
 
 
 
 
fba30db
 
 
 
d23039a
fba30db
d23039a
 
 
fba30db
d23039a
fba30db
d23039a
 
 
 
 
 
 
 
 
 
 
 
 
fba30db
d23039a
 
 
 
 
 
 
 
 
 
 
 
 
fba30db
d23039a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fba30db
d23039a
 
 
 
 
 
 
fba30db
d23039a
 
 
 
fba30db
d23039a
 
 
 
9020a5a
f51c5bd
ed9f9c4
f51c5bd
d23039a
 
 
 
 
 
 
 
9020a5a
 
 
d23039a
 
 
9020a5a
f51c5bd
 
 
 
 
 
 
 
 
 
d23039a
fba30db
d23039a
 
 
 
 
fba30db
 
 
 
f51c5bd
 
 
d23039a
9020a5a
d23039a
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
from __future__ import annotations

import re
from dataclasses import dataclass
from typing import List, Optional

from loguru import logger

from config import settings
from models.model_loader import get_model_loader

# Substrings that identify a classifier label as the "fake" class; they are
# matched against the lower-cased label text.
FAKE_TOKENS = (
    "fake",
    "false",
    "unreliable",
    "misinformation",
)

# (regex, tag) pairs for headline-style clickbait phrases. Every entry carries
# the same "clickbait" tag; the regexes are searched case-insensitively.
CLICKBAIT_PATTERNS = [
    (regex, "clickbait")
    for regex in (
        r"\byou won'?t believe\b",
        r"\bbreaking\s*:",
        r"\bshocking\s*:",
        r"\bexclusive\s*:",
        r"\bjust\s+in\s*:",
        r"\burgent\s*:",
        r"\bwhat\s+happens\s+next\b",
        r"\bthis\s+will\s+change\b",
        r"\b(?:everyone|nobody)\s+(?:is|was)\s+talking\b",
    )
]

# Emotionally loaded words, compared against lower-cased tokens with
# surrounding punctuation stripped.
EMOTIONAL_WORDS = {
    "outrage",
    "shocking",
    "horrifying",
    "disgusting",
    "amazing",
    "incredible",
    "unbelievable",
    "devastating",
    "terrifying",
    "explosive",
    "bombshell",
    "jaw-dropping",
    "heartbreaking",
    "furious",
    "scandal",
    "crisis",
    "chaos",
    "destroyed",
    "slammed",
    "blasted",
    "exposed",
    "revealed",
}

# Superlative words, compared the same way as EMOTIONAL_WORDS.
SUPERLATIVES = {
    "best",
    "worst",
    "greatest",
    "biggest",
    "most",
    "least",
    "fastest",
    "deadliest",
    "largest",
    "smallest",
    "ultimate",
}

# Rules for spotting manipulative phrasing. Each entry is a 4-tuple:
#   (regex, pattern_type, severity, description)
# The regexes are applied case-insensitively by detect_manipulation_indicators.
# pattern_type is one of "unverified_claim", "emotional_manipulation",
# "false_authority"; severity is one of "low", "medium", "high".
MANIPULATION_PATTERNS = [
    (r"\bsources?\s+(?:say|said|claim|report)\b", "unverified_claim", "medium",
     "Unverified source attribution without specific citation"),
    (r"\ballegedly\b", "unverified_claim", "low",
     "Hedging language suggests unverified information"),
    (r"\breports?\s+suggest\b", "unverified_claim", "medium",
     "Vague report attribution"),
    (r"\baccording\s+to\s+(?:some|many|several)\b", "unverified_claim", "medium",
     "Non-specific source attribution"),
    # Was r"\brunconfirmed\b": a stray "r" inside the pattern meant the rule
    # only matched the non-word "runconfirmed" and never "unconfirmed".
    (r"\bunconfirmed\b", "unverified_claim", "medium",
     "Explicitly unconfirmed information"),
    (r"\boutrage\b", "emotional_manipulation", "medium",
     "Emotional trigger word designed to provoke reaction"),
    (r"\bshocking\s+truth\b", "emotional_manipulation", "high",
     "Sensationalist phrase designed to manipulate reader emotion"),
    (r"\bwake\s+up\b", "emotional_manipulation", "medium",
     "Call-to-action implying hidden knowledge"),
    (r"\bthey\s+don'?t\s+want\s+you\s+to\s+know\b", "emotional_manipulation", "high",
     "Conspiracy framing language"),
    (r"\bopen\s+your\s+eyes\b", "emotional_manipulation", "medium",
     "Implies audience ignorance"),
    (r"\bexperts?\s+(?:confirm|say|agree|warn)\b", "false_authority", "medium",
     "Unnamed expert citation without specific attribution"),
    (r"\bscientists?\s+(?:confirm|prove|say)\b", "false_authority", "medium",
     "Unnamed scientist citation"),
    (r"\bstudies?\s+(?:show|prove|confirm)\b", "false_authority", "low",
     "Vague study reference without citation"),
    (r"\beveryone\s+knows\b", "false_authority", "medium",
     "Appeal to common knowledge fallacy"),
    (r"\bit'?s\s+(?:a\s+)?(?:well-?known|proven)\s+fact\b", "false_authority", "medium",
     "Assertion of fact without evidence"),
]

# Entity labels that extract_entities ranks ahead of all other entities.
_NER_PREFERRED = {
    "PERSON",
    "ORG",
    "GPE",
    "EVENT",
    "PRODUCT",
    "NORP",
}
# Cardinal numbers (counts, amounts) included in news queries only when short and digit-only
_NER_NUMERIC = {
    "CARDINAL",
    "MONEY",
    "QUANTITY",
}


@dataclass
class TextClassification:
    """Outcome of classifying one piece of text as fake or not fake."""

    # Highest-scoring model label, or a sentinel produced by this module:
    # "unknown" (empty text), "unsupported_language", "uncertain_label_mapping".
    label: str
    # Score of `label`; 0.0 for the sentinel labels.
    confidence: float
    # Score attributed to the "fake" class. 0.5 is used when it cannot be
    # determined, 0.0 when the input text was empty.
    fake_prob: float
    # Raw label -> score mapping from the model; empty when no model was run.
    all_scores: dict[str, float]


@dataclass
class SensationalismResult:
    """Sensationalism score for a text plus the raw counts it was built from."""

    # Overall score as an integer clamped to 0..100.
    score: int
    # "Low" (score < 30), "Medium" (score < 60) or "High".
    level: str
    # Number of "!" characters in the text.
    exclamation_count: int
    # Number of whitespace-separated tokens longer than 2 characters that are upper-case.
    caps_word_count: int
    # Number of CLICKBAIT_PATTERNS entries that match at least once.
    clickbait_matches: int
    # Number of tokens found in EMOTIONAL_WORDS.
    emotional_word_count: int
    # Number of tokens found in SUPERLATIVES.
    superlative_count: int


@dataclass
class ManipulationIndicator:
    """One regex hit from MANIPULATION_PATTERNS inside an analysed text."""

    # Category of the rule that matched, e.g. "unverified_claim",
    # "emotional_manipulation" or "false_authority".
    pattern_type: str
    # The exact substring of the text that matched.
    matched_text: str
    # Character offset where the match starts (re.Match.start()).
    start_pos: int
    # Character offset just past the match (re.Match.end()).
    end_pos: int
    # Severity assigned by the rule: "low", "medium" or "high".
    severity: str
    # Human-readable explanation taken from the rule.
    description: str


def detect_language(text: str) -> str:
    """Return a language code for *text*, falling back to ``"en"``.

    The stripped text is passed to ``langdetect.detect`` and its result is
    returned unchanged. ``"en"`` is returned instead when the text is empty
    or shorter than 10 characters after stripping, when ``langdetect`` is
    not installed, or when detection raises any exception.
    """
    fallback = "en"
    sample = text.strip() if text else ""
    if len(sample) < 10:
        return fallback

    try:
        # Imported lazily so the module still works without langdetect.
        from langdetect import detect  # type: ignore

        detected = detect(sample)
        logger.info(f"Language detected: {detected}")
        return detected
    except ImportError:
        logger.debug("langdetect not installed - defaulting to 'en'")
    except Exception as e:
        logger.debug(f"Language detection failed: {e} - defaulting to 'en'")
    return fallback


def _scores_to_classification(items, *, allow_label0_fallback: bool = True) -> TextClassification:
    """Build a TextClassification from text-classification pipeline output.

    *items* is an iterable of mappings with ``"label"`` and ``"score"`` keys.

    The fake probability is resolved in this order:

    1. The highest score among labels whose lower-cased name contains one of
       ``FAKE_TOKENS`` (semantic labels are preferred).
    2. If none exist, *allow_label0_fallback* is true, and both ``LABEL_0``
       and ``LABEL_1`` are present, the score of ``LABEL_0``. The bundled
       jy46604790 model uses LABEL_0=fake/LABEL_1=real, but arbitrary
       replacement models may not.
    3. Otherwise the mapping is unknown: a warning is logged and the result
       is ``"uncertain_label_mapping"`` with confidence 0.0 and fake
       probability 0.5.
    """
    scores = {}
    for entry in items:
        scores[entry["label"]] = float(entry["score"])

    top_label = max(scores, key=scores.get)
    top_conf = scores[top_label]

    semantic_fake = [
        prob
        for name, prob in scores.items()
        if any(token in name.lower() for token in FAKE_TOKENS)
    ]
    if semantic_fake:
        return TextClassification(top_label, top_conf, max(semantic_fake), scores)

    if allow_label0_fallback and "LABEL_0" in scores and "LABEL_1" in scores:
        return TextClassification(top_label, top_conf, scores["LABEL_0"], scores)

    logger.warning(f"Could not infer fake label from text model labels: {list(scores)}")
    return TextClassification("uncertain_label_mapping", 0.0, 0.5, scores)


def classify_text(text: str, language: Optional[str] = None) -> TextClassification:
    """Classify *text* with the English or the multilingual text model.

    Args:
        text: Text to classify. Empty or whitespace-only text yields an
            ``"unknown"`` result without running any model.
        language: Language code of the text. ``None``, ``""`` and ``"en"``
            select the default text model; anything else selects the
            multilingual model.

    Returns:
        The classification. When a non-English language is requested but
        ``settings.TEXT_MULTILANG_MODEL_ID`` is not set, the result is
        ``"unsupported_language"`` with fake probability 0.5.
    """
    cleaned = (text or "").strip()
    if not cleaned:
        return TextClassification("unknown", 0.0, 0.0, {})

    loader = get_model_loader()
    is_non_english = bool(language and language != "en")

    if is_non_english:
        if not settings.TEXT_MULTILANG_MODEL_ID:
            logger.warning(f"No multilingual text model configured for language={language}; returning uncertain score")
            return TextClassification("unsupported_language", 0.0, 0.5, {})
        pipe = loader.load_multilang_text_model()
    else:
        pipe = loader.load_text_model()

    # Only the first 2000 characters are sent to the pipeline.
    out = pipe(cleaned[:2000], truncation=True, top_k=None)
    # The pipeline output may be nested one level deep; unwrap it if so.
    if isinstance(out[0], list):
        items = out[0]
    else:
        items = out

    # The LABEL_0 fallback is only allowed for the default (English) model.
    clf = _scores_to_classification(items, allow_label0_fallback=not is_non_english)
    logger.info(
        f"Text classify [{language or 'en'}] -> {clf.label} @ {clf.confidence:.3f} "
        f"fake_p={clf.fake_prob:.3f}"
    )
    return clf


def score_sensationalism(text: str) -> SensationalismResult:
    """Score how sensational *text* reads on a 0-100 scale.

    The score is the sum of five capped components:

    * exclamation marks: 8 points each, at most 25
    * share of upper-case tokens (longer than 2 characters): ratio * 200, at most 25
    * clickbait patterns that match: 12 points each, at most 25
    * emotional words: 6 points each, at most 15
    * superlatives: 5 points each, at most 10

    Scores below 30 are "Low", below 60 "Medium", otherwise "High". Empty
    text yields a zeroed "Low" result.
    """
    if not text:
        return SensationalismResult(0, "Low", 0, 0, 0, 0, 0)

    tokens = text.split()
    token_total = max(len(tokens), 1)  # guards the division for whitespace-only text
    normalized = [tok.lower().strip(".,!?;:") for tok in tokens]

    excl = text.count("!")
    caps = len([tok for tok in tokens if tok.isupper() and len(tok) > 2])
    clickbait = len(
        [regex for regex, _ in CLICKBAIT_PATTERNS if re.search(regex, text, re.IGNORECASE)]
    )
    emotional = sum(word in EMOTIONAL_WORDS for word in normalized)
    superlative = sum(word in SUPERLATIVES for word in normalized)

    components = (
        min(excl * 8, 25),
        min(caps / token_total * 200, 25),
        min(clickbait * 12, 25),
        min(emotional * 6, 15),
        min(superlative * 5, 10),
    )
    score = int(min(100, max(0, sum(components))))

    if score < 30:
        level = "Low"
    elif score < 60:
        level = "Medium"
    else:
        level = "High"

    logger.info(f"Sensationalism -> {score} ({level}) excl={excl} caps={caps} cb={clickbait} emo={emotional}")
    return SensationalismResult(score, level, excl, caps, clickbait, emotional, superlative)


def detect_manipulation_indicators(text: str) -> List[ManipulationIndicator]:
    """Find every MANIPULATION_PATTERNS match in *text*.

    Each rule is searched case-insensitively and every match becomes one
    ManipulationIndicator carrying the rule's type, severity and description
    together with the matched substring and its start/end offsets.

    Returns:
        The indicators ordered by start offset; matches starting at the same
        offset keep rule order. Empty text yields an empty list.
    """
    if not text:
        return []

    found = [
        ManipulationIndicator(
            pattern_type=ptype,
            matched_text=hit.group(),
            start_pos=hit.start(),
            end_pos=hit.end(),
            severity=severity,
            description=description,
        )
        for pattern, ptype, severity, description in MANIPULATION_PATTERNS
        for hit in re.finditer(pattern, text, re.IGNORECASE)
    ]
    # sorted() is stable, so ties on start offset preserve rule order.
    indicators = sorted(found, key=lambda item: item.start_pos)
    logger.info(f"Manipulation indicators -> {len(indicators)} found")
    return indicators


def extract_entities(text: str, max_k: int = 6) -> List[str]:
    """Return up to *max_k* key terms from *text*, preferring named entities.

    Frequency-based keyword extraction is used on its own when the text is
    empty or shorter than 20 characters after stripping, when no spaCy
    pipeline is available, or when entity extraction raises.

    Otherwise the first 5000 characters are run through spaCy. Entities are
    de-duplicated case-insensitively and ordered as: labels in
    ``_NER_PREFERRED``, then short digit-only entities with a label in
    ``_NER_NUMERIC``, then everything else. If fewer than *max_k* entities
    were found, frequency keywords not already present are appended.
    """
    too_short = not text or len(text.strip()) < 20
    if too_short:
        return _extract_keywords_freq(text, max_k)

    nlp = get_model_loader().load_spacy_nlp()
    if nlp is None:
        return _extract_keywords_freq(text, max_k)

    try:
        seen: set[str] = set()
        buckets: dict[str, List[str]] = {"preferred": [], "numeric": [], "other": []}

        for ent in nlp(text[:5000]).ents:
            surface = ent.text.strip()
            key = surface.lower()
            # Skip blanks, single characters and case-insensitive repeats.
            if len(surface) < 2 or key in seen:
                continue
            seen.add(key)

            if ent.label_ in _NER_PREFERRED:
                bucket = "preferred"
            elif (
                ent.label_ in _NER_NUMERIC
                and surface.replace(",", "").isdigit()
                and len(surface) <= 6
            ):
                # Include small cardinal numbers (e.g. "38", "55") — they're key facts
                bucket = "numeric"
            else:
                bucket = "other"
            buckets[bucket].append(surface)

        ranked = buckets["preferred"] + buckets["numeric"] + buckets["other"]

        if len(ranked) < max_k:
            # Top up with frequency keywords that are not already present.
            for keyword in _extract_keywords_freq(text, max_k * 2):
                keyword_key = keyword.lower()
                if keyword_key not in seen:
                    ranked.append(keyword)
                    seen.add(keyword_key)

        result = ranked[:max_k]
        logger.info(f"NER extracted {len(result)} entities: {result}")
        return result
    except Exception as e:
        logger.warning(f"spaCy NER failed: {e} - falling back to frequency extraction")
        return _extract_keywords_freq(text, max_k)


def _extract_keywords_freq(text: str, max_k: int = 6) -> List[str]:
    """Return the *max_k* most frequent non-stop-word tokens in *text*.

    Tokens are either runs of at least three ASCII letters (hyphens and
    apostrophes allowed after the first letter) or standalone numbers of
    one to five digits. Tokens are lower-cased before counting.

    Args:
        text: Text to analyse; ``None`` or ``""`` yields an empty list.
        max_k: Maximum number of keywords to return. Zero or negative
            values yield an empty list.

    Returns:
        Lower-cased keywords ordered by descending frequency, with ties
        broken alphabetically.
    """
    from collections import Counter  # local import: block stays self-contained

    # A negative slice bound would silently drop items from the END of the
    # ranking instead of returning nothing, so reject it up front.
    if max_k <= 0:
        return []

    stop = {
        "the", "a", "an", "is", "are", "was", "were", "be", "been", "being", "to", "of", "and", "or", "but",
        "in", "on", "at", "for", "with", "by", "from", "as", "that", "this", "it", "its", "has", "have", "had",
        "will", "would", "can", "could", "should", "may", "might", "do", "does", "did", "not", "no", "so",
        "than", "then", "there", "their", "they", "them", "we", "our", "you", "your", "he", "she", "his", "her",
        "during", "several", "also", "about", "which", "who", "whom", "what", "where", "when", "why", "how",
        "all", "any", "both", "each", "few", "more", "most", "other", "some", "such", "only", "own", "same", "very",
        "these", "those", "into", "through", "after", "before", "over", "under", "between", "out", "against",
    }
    tokens = (w.lower() for w in re.findall(r"[A-Za-z][A-Za-z\-']{2,}|\b\d{1,5}\b", text or ""))
    freq = Counter(tok for tok in tokens if tok not in stop)
    # Sort by (-count, word) so ties come out alphabetically.
    ranked = sorted(freq.items(), key=lambda kv: (-kv[1], kv[0]))
    return [word for word, _ in ranked[:max_k]]


# Alias: `extract_keywords` is bound to the same function object as `extract_entities`.
# NOTE(review): presumably kept for callers that still import the older name — confirm.
extract_keywords = extract_entities