broskiiii committed on
Commit
0bd4607
·
1 Parent(s): 36bc547

feat: Add fake news detection with OCR text extraction

Browse files
app/agent.py CHANGED
@@ -38,7 +38,32 @@ def invoke_with_fallback(messages: list) -> str:
38
  raise RuntimeError(f"All Gemini models exhausted. Last error: {last_err}")
39
 
40
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
41
  def run_text_agent(text: str, url_flags: dict) -> AnalysisResult:
 
 
42
  system = (
43
  "You are a cybersecurity expert specializing in phishing detection. "
44
  "Analyse the provided text for phishing indicators: urgency language, "
@@ -51,7 +76,8 @@ def run_text_agent(text: str, url_flags: dict) -> AnalysisResult:
51
  raw = raw.strip().strip("```json").strip("```").strip()
52
  data = json.loads(raw)
53
  score = float(data["risk_score"])
54
- return AnalysisResult(
 
55
  risk_score=score,
56
  risk_level=_risk_level(score),
57
  threat_types=data.get("threat_types", []),
@@ -59,8 +85,11 @@ def run_text_agent(text: str, url_flags: dict) -> AnalysisResult:
59
  tool_outputs={"gemini_text": data, "url_scan": url_flags},
60
  )
61
 
 
 
62
 
63
- def run_image_agent(hf_result: dict, gemini_result: dict) -> AnalysisResult:
 
64
  hf_score = hf_result.get("deepfake_score", 0.0)
65
  gemini_score = gemini_result.get("risk_score", 0.0)
66
  combined = round((hf_score * 0.5) + (gemini_score * 0.5), 3)
@@ -68,17 +97,20 @@ def run_image_agent(hf_result: dict, gemini_result: dict) -> AnalysisResult:
68
  set(hf_result.get("threat_types", []) + gemini_result.get("threat_types", []))
69
  )
70
  explanation = (
71
- f"HuggingFace deepfake model: {hf_result.get('label', 'N/A')} "
72
  f"(confidence {hf_score:.2f}). "
73
  f"Gemini vision analysis: {gemini_result.get('explanation', '')}"
74
  )
75
- return AnalysisResult(
76
  risk_score=combined,
77
  risk_level=_risk_level(combined),
78
  threat_types=threat_types,
79
  explanation=explanation,
80
  tool_outputs={"hf_deepfake": hf_result, "gemini_vision": gemini_result},
81
  )
 
 
 
82
 
83
 
84
  def run_video_agent(gemini_result: dict, frame_scores: list[float]) -> AnalysisResult:
@@ -93,8 +125,15 @@ def run_video_agent(gemini_result: dict, frame_scores: list[float]) -> AnalysisR
93
  return AnalysisResult(
94
  risk_score=combined,
95
  risk_level=_risk_level(combined),
 
 
96
  threat_types=gemini_result.get("threat_types", ["deepfake_video"]),
97
  explanation=explanation,
 
 
 
 
 
98
  tool_outputs={"gemini_video": gemini_result, "frame_scores": frame_scores},
99
  )
100
 
@@ -114,7 +153,14 @@ def run_audio_agent(hf_result: dict, gemini_result: dict) -> AnalysisResult:
114
  return AnalysisResult(
115
  risk_score=combined,
116
  risk_level=_risk_level(combined),
 
 
117
  threat_types=threat_types,
118
  explanation=explanation,
 
 
 
 
 
119
  tool_outputs={"hf_audio": hf_result, "gemini_audio": gemini_result},
120
  )
 
38
  raise RuntimeError(f"All Gemini models exhausted. Last error: {last_err}")
39
 
40
 
41
def _merge_factcheck(result: AnalysisResult, fc: dict) -> AnalysisResult:
    """Fold a fact-check payload into an existing AnalysisResult.

    The fact-check score is damped by 0.6 and can only raise (never lower)
    the original risk score; threat types are unioned; verdict,
    content_type and simplified_explanation are taken from the
    fact-check dict, with the original explanation kept as-is.
    """
    fc_score = float(fc.get("risk_score", 0.5))
    # max() keeps the stronger of the two signals; 0.6 damps the fact-check
    # so an unverifiable claim alone cannot push the score to critical.
    merged_score = round(max(result.risk_score, fc_score * 0.6), 3)
    merged_threats = list(set(result.threat_types + fc.get("threat_types", [])))

    return AnalysisResult(
        risk_score=merged_score,
        risk_level=_risk_level(merged_score),
        verdict=fc.get("verdict", "UNVERIFIABLE"),
        content_type=fc.get("content_type", "unknown"),
        threat_types=merged_threats,
        explanation=result.explanation,
        simplified_explanation=fc.get("simplified_explanation", ""),
        tool_outputs={**result.tool_outputs, "fact_check": fc},
    )
62
+
63
+
64
  def run_text_agent(text: str, url_flags: dict) -> AnalysisResult:
65
+ from app.tools.fakenews_tools import classify_and_fact_check
66
+
67
  system = (
68
  "You are a cybersecurity expert specializing in phishing detection. "
69
  "Analyse the provided text for phishing indicators: urgency language, "
 
76
  raw = raw.strip().strip("```json").strip("```").strip()
77
  data = json.loads(raw)
78
  score = float(data["risk_score"])
79
+
80
+ base = AnalysisResult(
81
  risk_score=score,
82
  risk_level=_risk_level(score),
83
  threat_types=data.get("threat_types", []),
 
85
  tool_outputs={"gemini_text": data, "url_scan": url_flags},
86
  )
87
 
88
+ fc = classify_and_fact_check(text)
89
+ return _merge_factcheck(base, fc)
90
 
91
+
92
+ def run_image_agent(hf_result: dict, gemini_result: dict, fc_result: dict | None = None) -> AnalysisResult:
93
  hf_score = hf_result.get("deepfake_score", 0.0)
94
  gemini_score = gemini_result.get("risk_score", 0.0)
95
  combined = round((hf_score * 0.5) + (gemini_score * 0.5), 3)
 
97
  set(hf_result.get("threat_types", []) + gemini_result.get("threat_types", []))
98
  )
99
  explanation = (
100
+ f"Deepfake model: {hf_result.get('label', 'N/A')} "
101
  f"(confidence {hf_score:.2f}). "
102
  f"Gemini vision analysis: {gemini_result.get('explanation', '')}"
103
  )
104
+ base = AnalysisResult(
105
  risk_score=combined,
106
  risk_level=_risk_level(combined),
107
  threat_types=threat_types,
108
  explanation=explanation,
109
  tool_outputs={"hf_deepfake": hf_result, "gemini_vision": gemini_result},
110
  )
111
+ if fc_result:
112
+ return _merge_factcheck(base, fc_result)
113
+ return base
114
 
115
 
116
  def run_video_agent(gemini_result: dict, frame_scores: list[float]) -> AnalysisResult:
 
125
  return AnalysisResult(
126
  risk_score=combined,
127
  risk_level=_risk_level(combined),
128
+ verdict="FAKE" if combined > 0.5 else "UNVERIFIABLE",
129
+ content_type="unknown",
130
  threat_types=gemini_result.get("threat_types", ["deepfake_video"]),
131
  explanation=explanation,
132
+ simplified_explanation=(
133
+ "This video shows signs of AI manipulation or deepfake content."
134
+ if combined > 0.5 else
135
+ "No definitive deepfake signals detected, but proceed with caution."
136
+ ),
137
  tool_outputs={"gemini_video": gemini_result, "frame_scores": frame_scores},
138
  )
139
 
 
153
  return AnalysisResult(
154
  risk_score=combined,
155
  risk_level=_risk_level(combined),
156
+ verdict="FAKE" if combined > 0.5 else "UNVERIFIABLE",
157
+ content_type="unknown",
158
  threat_types=threat_types,
159
  explanation=explanation,
160
+ simplified_explanation=(
161
+ "This audio appears to be AI-generated or synthetically cloned. Do not trust its authenticity."
162
+ if combined > 0.5 else
163
+ "No strong deepfake signals in audio, but remain cautious."
164
+ ),
165
  tool_outputs={"hf_audio": hf_result, "gemini_audio": gemini_result},
166
  )
app/models.py CHANGED
@@ -9,6 +9,9 @@ class TextRequest(BaseModel):
9
  class AnalysisResult(BaseModel):
10
  risk_score: float = Field(..., ge=0.0, le=1.0, description="0.0 = safe, 1.0 = critical threat")
11
  risk_level: str = Field(..., description="LOW | MEDIUM | HIGH | CRITICAL")
 
 
12
  threat_types: list[str] = Field(default_factory=list)
13
  explanation: str
 
14
  tool_outputs: dict[str, Any] = Field(default_factory=dict)
 
9
class AnalysisResult(BaseModel):
    """Unified analysis payload returned by every modality agent
    (text, image, video, audio)."""

    risk_score: float = Field(..., ge=0.0, le=1.0, description="0.0 = safe, 1.0 = critical threat")
    risk_level: str = Field(..., description="LOW | MEDIUM | HIGH | CRITICAL")
    # Fact-check fields default to "" so callers that predate the
    # fact-check feature still validate without changes.
    verdict: str = Field(default="", description="REAL | FAKE | UNVERIFIABLE | PHISHING")
    content_type: str = Field(default="", description="phishing | fake_news | real_news | unknown")
    threat_types: list[str] = Field(default_factory=list)
    # Required free-text rationale produced by the analysing agent.
    explanation: str
    simplified_explanation: str = Field(default="", description="Plain-language verdict summary")
    # Raw per-tool results (model outputs, scan flags) for debugging/UI.
    tool_outputs: dict[str, Any] = Field(default_factory=dict)
app/routers/image.py CHANGED
@@ -1,6 +1,7 @@
1
  from fastapi import APIRouter, UploadFile, File, HTTPException
2
  from app.models import AnalysisResult
3
- from app.tools.image_tools import hf_detect_image_deepfake, gemini_analyze_image
 
4
  from app.agent import run_image_agent
5
 
6
  router = APIRouter()
@@ -15,6 +16,10 @@ async def analyze_image(file: UploadFile = File(...)):
15
  image_bytes = await file.read()
16
  hf_result = hf_detect_image_deepfake(image_bytes, mime_type=file.content_type)
17
  gemini_result = gemini_analyze_image(image_bytes, mime_type=file.content_type)
18
- return run_image_agent(hf_result, gemini_result)
 
 
 
 
19
  except Exception as e:
20
  raise HTTPException(status_code=500, detail=str(e))
 
1
  from fastapi import APIRouter, UploadFile, File, HTTPException
2
  from app.models import AnalysisResult
3
+ from app.tools.image_tools import hf_detect_image_deepfake, gemini_analyze_image, gemini_extract_image_text
4
+ from app.tools.fakenews_tools import extract_text_from_image_for_factcheck
5
  from app.agent import run_image_agent
6
 
7
  router = APIRouter()
 
16
  image_bytes = await file.read()
17
  hf_result = hf_detect_image_deepfake(image_bytes, mime_type=file.content_type)
18
  gemini_result = gemini_analyze_image(image_bytes, mime_type=file.content_type)
19
+
20
+ extracted_text = gemini_extract_image_text(image_bytes, mime_type=file.content_type)
21
+ fc_result = extract_text_from_image_for_factcheck(extracted_text) if extracted_text else None
22
+
23
+ return run_image_agent(hf_result, gemini_result, fc_result)
24
  except Exception as e:
25
  raise HTTPException(status_code=500, detail=str(e))
app/routers/text.py CHANGED
@@ -1,6 +1,6 @@
1
  from fastapi import APIRouter, HTTPException
2
  from app.models import TextRequest, AnalysisResult
3
- from app.tools.text_tools import analyze_urls_in_text, gemini_analyze_text
4
  from app.agent import run_text_agent
5
 
6
  router = APIRouter()
@@ -10,7 +10,7 @@ router = APIRouter()
10
  async def analyze_text(request: TextRequest):
11
  try:
12
  url_flags = analyze_urls_in_text(request.text)
13
- gemini_data = gemini_analyze_text(request.text)
14
  return run_text_agent(request.text, url_flags)
15
  except Exception as e:
16
  raise HTTPException(status_code=500, detail=str(e))
 
 
1
  from fastapi import APIRouter, HTTPException
2
  from app.models import TextRequest, AnalysisResult
3
+ from app.tools.text_tools import analyze_urls_in_text
4
  from app.agent import run_text_agent
5
 
6
  router = APIRouter()
 
10
  async def analyze_text(request: TextRequest):
11
  try:
12
  url_flags = analyze_urls_in_text(request.text)
 
13
  return run_text_agent(request.text, url_flags)
14
  except Exception as e:
15
  raise HTTPException(status_code=500, detail=str(e))
16
+
app/tools/fakenews_tools.py ADDED
@@ -0,0 +1,119 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Fake-news / fact-check tools.
3
+ Uses Gemini with Google Search grounding (already configured).
4
+ No additional API keys needed.
5
+ """
6
+ import json
7
+ import re
8
+ from langchain_google_genai import ChatGoogleGenerativeAI
9
+ from langchain_core.messages import HumanMessage, SystemMessage
10
+ from app.config import GEMINI_API_KEY, GEMINI_MODEL, GEMINI_MODEL_FALLBACKS
11
+ from app.tools.retry_utils import execute_with_retry
12
+
13
# Google Search grounding is only available in newer google-genai SDKs;
# probe for it at import time and degrade gracefully when absent.
try:
    from google.genai import types as genai_types
    _SEARCH_TOOL = genai_types.Tool(google_search=genai_types.GoogleSearch())
    _HAS_SEARCH = True
except Exception:
    # Any failure (missing package, renamed API) disables grounding;
    # fact-checking then runs ungrounded rather than crashing at import.
    _SEARCH_TOOL = None
    _HAS_SEARCH = False
20
+
21
+
22
def _invoke_grounded(messages: list) -> str:
    """Invoke Gemini with Search grounding (when available), walking the
    primary model and its fallbacks on quota errors.

    Non-quota errors propagate immediately; exhausting every model raises
    RuntimeError.
    """
    # Grounding kwargs are invariant across candidate models.
    grounding_kwargs = {"tools": [_SEARCH_TOOL]} if _HAS_SEARCH and _SEARCH_TOOL else {}
    for model_name in [GEMINI_MODEL] + GEMINI_MODEL_FALLBACKS:
        try:
            client = ChatGoogleGenerativeAI(
                model=model_name,
                google_api_key=GEMINI_API_KEY,
                temperature=0.1,
            )
            return execute_with_retry(
                lambda c=client: c.invoke(messages, **grounding_kwargs).content
            )
        except Exception as e:
            message = str(e)
            # Only quota exhaustion falls through to the next model.
            if "429" not in message and "RESOURCE_EXHAUSTED" not in message:
                raise
    raise RuntimeError("All Gemini models quota exhausted")
36
+
37
+
38
+ def _clean_json(raw: str) -> str:
39
+ raw = raw.strip()
40
+ raw = re.sub(r"^```(?:json)?", "", raw).rstrip("`").strip()
41
+ return raw
42
+
43
+
44
def classify_and_fact_check(text: str) -> dict:
    """
    Determines if the content is a phishing attempt or news/claim,
    fact-checks it using Google Search grounding, and returns a
    skeptical verdict leaning towards FAKE when evidence is weak.

    Returns:
        {
            "content_type": "phishing|fake_news|real_news|unknown",
            "verdict": "FAKE|REAL|UNVERIFIABLE|PHISHING",
            "risk_score": float,
            "checked_claims": [{"claim": str, "verdict": str, "source": str}],
            "explanation": str,
            "simplified_explanation": str,
            "threat_types": [str]
        }
    """
    system = (
        "You are an extremely skeptical AI fact-checker and cybersecurity analyst, "
        "similar to Twitter/X Community Notes and Grok fact-checking. "
        "Your default assumption is that unverified or sensational content is FAKE or PHISHING. "
        "You MUST use the Google Search tool to look up every specific claim, company, event, or person mentioned. "
        "\n\nYour analysis pipeline:"
        "\n1. CLASSIFY: Is this (a) a phishing/scam attempt, (b) a news claim or viral content, or (c) unknown?"
        "\n2. EXTRACT: Identify all specific claims, company names, events, people, and dates."
        "\n3. VERIFY: Search for each claim. Trusted sources: established news outlets, official .gov/.edu sites, Wikipedia."
        "\n4. VERDICT: Apply these strict rules:"
        "\n - If phishing indicators found → verdict=PHISHING"
        "\n - If claims are confirmed by 2+ credible sources → verdict=REAL"
        "\n - If claims are contradicted by credible sources → verdict=FAKE"
        "\n - If evidence is weak, missing, or only from unverified sources → verdict=FAKE (lean skeptical)"
        "\n - If completely unverifiable even after searching → verdict=UNVERIFIABLE"
        "\n5. EXPLAIN: Write a simplified_explanation as if explaining to a non-technical person in 2-3 sentences. "
        "Be direct: start with 'This appears to be FAKE/REAL/etc because...'"
        "\n\nRespond ONLY with valid JSON:"
        '{"content_type": "phishing|fake_news|real_news|unknown", '
        '"verdict": "FAKE|REAL|UNVERIFIABLE|PHISHING", '
        '"risk_score": <float 0.0-1.0>, '
        '"checked_claims": [{"claim": <str>, "verdict": <str>, "source": <str>}], '
        '"explanation": <str>, '
        '"simplified_explanation": <str>, '
        '"threat_types": [<str>]}'
    )
    try:
        raw = _invoke_grounded([SystemMessage(content=system), HumanMessage(content=text)])
        # LangChain message content may be a list of parts; normalize to str
        # before fence-stripping and JSON parsing.
        if not isinstance(raw, str):
            raw = str(raw)
        return json.loads(_clean_json(raw))
    except Exception as e:
        # Fail closed but non-fatally: a neutral UNVERIFIABLE verdict at
        # mid risk so the caller's merge logic still has usable fields.
        return {
            "content_type": "unknown",
            "verdict": "UNVERIFIABLE",
            "risk_score": 0.5,
            "checked_claims": [],
            "explanation": f"Fact-check failed: {e}",
            "simplified_explanation": "Could not verify this content. Treat with caution.",
            "threat_types": [],
        }
102
+
103
+
104
def extract_text_from_image_for_factcheck(gemini_text_response: str) -> dict:
    """Run the fact-check pipeline on text already extracted from an image
    (e.g. by gemini_analyze_image / gemini_extract_image_text).

    Text shorter than 10 characters after stripping (or None/empty) is
    treated as "no readable text" and short-circuits to an UNVERIFIABLE
    zero-risk result without calling the LLM.
    """
    usable = (gemini_text_response or "").strip()
    if len(usable) >= 10:
        return classify_and_fact_check(gemini_text_response)
    return {
        "content_type": "unknown",
        "verdict": "UNVERIFIABLE",
        "risk_score": 0.0,
        "checked_claims": [],
        "explanation": "No text extracted from image to fact-check.",
        "simplified_explanation": "No readable text was found in the image.",
        "threat_types": [],
    }
app/tools/image_tools.py CHANGED
@@ -55,6 +55,36 @@ def hf_detect_image_deepfake(image_bytes: bytes, mime_type: str = "image/jpeg")
55
 
56
 
57
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
58
  def gemini_analyze_image(image_bytes: bytes, mime_type: str = "image/jpeg") -> dict:
59
  b64 = base64.b64encode(image_bytes).decode()
60
  system = (
 
55
 
56
 
57
 
58
def gemini_extract_image_text(image_bytes: bytes, mime_type: str = "image/jpeg") -> str:
    """OCR-style extraction of all visible text/claims from an image.

    Walks the primary Gemini model and its fallbacks on quota errors and
    returns plain text; returns "" when every model's quota is exhausted.
    Non-quota errors propagate to the caller.
    """
    from app.tools.retry_utils import execute_with_retry

    prompt = (
        "You are an OCR and content extraction system. "
        "Extract ALL visible text from this image verbatim: headlines, captions, tweets, posts, labels, watermarks. "
        "If it contains news or claims, summarize the key claim in one sentence at the end. "
        "Return plain text only, no JSON."
    )
    encoded = base64.b64encode(image_bytes).decode()
    message = HumanMessage(
        content=[
            {"type": "text", "text": prompt},
            {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{encoded}"}},
        ]
    )
    for candidate in [GEMINI_MODEL] + GEMINI_MODEL_FALLBACKS:
        try:
            response = execute_with_retry(
                lambda m=candidate: ChatGoogleGenerativeAI(
                    model=m, google_api_key=GEMINI_API_KEY, temperature=0.0
                ).invoke([message])
            )
            content = response.content
            return content if isinstance(content, str) else str(content)
        except Exception as e:
            # Quota exhaustion → try the next fallback model.
            if "429" not in str(e) and "RESOURCE_EXHAUSTED" not in str(e):
                raise
    return ""
86
+
87
+
88
  def gemini_analyze_image(image_bytes: bytes, mime_type: str = "image/jpeg") -> dict:
89
  b64 = base64.b64encode(image_bytes).decode()
90
  system = (