File size: 7,755 Bytes
d23039a
 
711bdfc
 
d23039a
 
 
 
711bdfc
 
 
 
d23039a
 
711bdfc
 
 
1b18758
711bdfc
d23039a
26f3f24
 
 
 
 
 
fba30db
 
 
 
 
 
 
d23039a
fba30db
d23039a
 
 
 
 
 
 
 
 
1b18758
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fba30db
 
 
36529c1
 
fba30db
 
 
 
 
 
 
 
 
 
36529c1
 
 
 
 
 
 
 
fba30db
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
36529c1
 
 
 
 
 
fba30db
 
26f3f24
fba30db
 
26f3f24
fba30db
 
36529c1
 
 
 
 
 
 
 
 
 
fba30db
 
 
 
711bdfc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d23039a
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
from __future__ import annotations

import math
from typing import Optional, Tuple

TRUST_SCALE = [
    (0, 20, "Very Likely Fake", "critical"),
    (21, 40, "Likely Fake", "danger"),
    (41, 55, "Possibly Manipulated", "warning"),
    (56, 69, "Uncertain — Needs Verification", "warning"),
    (70, 88, "Likely Real", "positive"),
    (89, 100, "Very Likely Real", "safe"),
]

# Score range for forced disagreement clamp
UNCERTAIN_SCORE_LO = 56
UNCERTAIN_SCORE_HI = 69
UNVERIFIED_NEWS_SCORE_CAP = 55


def _validate_weight_total(weights: list[float], context: str) -> None:
    total = sum(weights)
    if total > 1.000001:
        raise ValueError(f"{context} weights must not sum above 1.0 (got {total:.3f})")


def compute_authenticity_score(fake_probability: float, label: str = "") -> int:
    """Map a fake probability [0.0, 1.0] to a 0-100 authenticity score.

    The first argument must always be the model's fake-probability (not the
    top-label confidence). 0.0 (no fake signal) → 100, 1.0 (certain fake) → 0.

    The `label` parameter is accepted for backward compatibility but not used.
    """
    return int(round(max(0.0, min(100.0, (1.0 - float(fake_probability)) * 100.0))))


def get_verdict_label(score: int) -> Tuple[str, str]:
    for lo, hi, label, severity in TRUST_SCALE:
        if lo <= score <= hi:
            return label, severity
    return "Unknown", "warning"


def apply_unverified_news_gate(
    score: int,
    *,
    has_trusted_sources: bool,
    has_contradicting_evidence: bool,
    truth_override_applied: bool,
) -> Tuple[int, str, str, str | None]:
    """Prevent unverifiable news claims from receiving a real verdict.

    The text classifier can judge writing style, but a news claim with no
    corroborating trusted source should stay in the suspicious/verification band.
    Already-fake scores remain fake; the gate only caps overly-real scores.
    """
    if has_trusted_sources or has_contradicting_evidence or truth_override_applied:
        label, severity = get_verdict_label(score)
        return score, label, severity, None

    gated_score = min(score, UNVERIFIED_NEWS_SCORE_CAP)
    if gated_score > 40:
        return gated_score, "Suspicious", "warning", "no_trusted_source"

    label, severity = get_verdict_label(gated_score)
    return gated_score, label, severity, "no_trusted_source"


def compute_video_authenticity_score(
    *,
    mean_suspicious_prob: float,
    max_suspicious_prob: float = 0.0,
    suspicious_ratio: float = 0.0,
    insufficient_faces: bool,
    temporal_score: float | None = None,
    audio_authenticity_score: float | None = None,
    has_audio: bool = False,
) -> Tuple[int, str, str]:
    """Combine video evidence into an authenticity verdict.

    Face-model evidence is authoritative only when enough face frames were
    scored. If face content is insufficient, use temporal/audio evidence when
    available instead of forcing a neutral result.

    The effective visual fake probability blends the per-frame mean with the
    per-frame maximum (65/35 split). This prevents a deepfake from hiding
    behind many clean frames: even a cluster of highly-suspicious frames
    raises the combined score meaningfully.

    A suspicious_ratio cap prevents a misleadingly high authenticity score when
    a significant fraction of frames are flagged regardless of the mean.
    """
    if insufficient_faces:
        evidence: list[tuple[float, float]] = []
        if temporal_score is not None:
            evidence.append((0.60, float(temporal_score)))
        if has_audio and audio_authenticity_score is not None:
            evidence.append((0.40, float(audio_authenticity_score)))

        if not evidence:
            return 50, "Insufficient face content", "warning"

        total_weight = sum(weight for weight, _score in evidence)
        combined = sum(weight * score for weight, score in evidence) / total_weight
        score = int(round(max(0.0, min(100.0, combined))))
        label, severity = get_verdict_label(score)
        return score, label, severity

    # Blend mean and max: mean alone is easily diluted by clean frames.
    # 65% mean keeps the overall distribution; 35% max ensures a cluster of
    # highly-suspicious frames cannot be hidden by majority-clean frames.
    effective_prob = 0.65 * float(mean_suspicious_prob) + 0.35 * float(max_suspicious_prob)
    visual_score = (1.0 - effective_prob) * 100.0

    temporal_sc = float(temporal_score) if temporal_score is not None else visual_score
    if has_audio and audio_authenticity_score is not None:
        _validate_weight_total([0.50, 0.30, 0.20], "video audio+temporal fusion")
        combined = 0.50 * visual_score + 0.30 * temporal_sc + 0.20 * float(audio_authenticity_score)
    else:
        _validate_weight_total([0.70, 0.30], "video visual+temporal fusion")
        combined = 0.70 * visual_score + 0.30 * temporal_sc
    score = int(round(max(0.0, min(100.0, combined))))

    # Suspicious-ratio caps: when a meaningful fraction of frames are flagged,
    # prevent the score from landing in a confident "Likely Real" band.
    # ≥40% suspicious → cap at 35 (Likely Fake zone).
    # ≥20% suspicious → cap at 50 (Uncertain/Suspicious zone).
    if suspicious_ratio >= 0.40:
        score = min(score, 35)
    elif suspicious_ratio >= 0.20:
        score = min(score, 50)

    label, severity = get_verdict_label(score)
    return score, label, severity


DISAGREEMENT_THRESHOLD = 0.25


def compute_signal_disagreement(components: dict[str, float]) -> Optional[float]:
    """Compute stdev of the primary evidence signals.

    Only considers signals that carry real model opinion (excludes exif/vlm
    which are weaker modifiers). Returns None when fewer than 2 signals present.
    """
    primary_keys = {"face_stack", "general", "forensics"}
    values = [v for k, v in components.items() if k in primary_keys]
    if len(values) < 2:
        return None
    mean = sum(values) / len(values)
    variance = sum((v - mean) ** 2 for v in values) / len(values)
    return math.sqrt(variance)


def maybe_clamp_to_uncertain(score: int, components: dict[str, float]) -> Tuple[int, Optional[str]]:
    """If primary signals disagree significantly, clamp score into the Uncertain band.

    Returns (final_score, disagreement_reason) where reason is None when no
    clamp was applied.
    """
    stdev = compute_signal_disagreement(components)
    if stdev is None or stdev < DISAGREEMENT_THRESHOLD:
        return score, None

    # Only clamp scores that would otherwise land in a confident verdict
    # (Very Likely Fake is still kept — if everything except one signal says
    # fake, the anomaly is informational but doesn't override).
    if score > UNCERTAIN_SCORE_HI:
        clamped = UNCERTAIN_SCORE_HI
    elif score < UNCERTAIN_SCORE_LO and score > 20:
        clamped = UNCERTAIN_SCORE_LO
    else:
        return score, None

    signal_summary = ", ".join(f"{k}={v:.2f}" for k, v in components.items()
                               if k in {"face_stack", "general", "forensics"})
    reason = f"signal_disagreement(stdev={stdev:.2f}; {signal_summary})"
    return clamped, reason


def get_score_color(score: int) -> str:
    """Linear interpolate Red (#E53935) → Amber (#FFA726) → Green (#43A047)."""
    def lerp(a: int, b: int, t: float) -> int:
        return int(round(a + (b - a) * t))

    score = max(0, min(100, score))
    if score <= 50:
        t = score / 50.0
        r, g, b = lerp(0xE5, 0xFF, t), lerp(0x39, 0xA7, t), lerp(0x35, 0x26, t)
    else:
        t = (score - 50) / 50.0
        r, g, b = lerp(0xFF, 0x43, t), lerp(0xA7, 0xA0, t), lerp(0x26, 0x47, t)
    return f"#{r:02X}{g:02X}{b:02X}"