Spaces:
Sleeping
Sleeping
Create __init__.py
Browse files- __init__.py +163 -0
__init__.py
ADDED
|
@@ -0,0 +1,163 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
import time
|
| 2 |
+
import logging
|
| 3 |
+
import re
|
| 4 |
+
from typing import Optional, Dict, Any, List, Tuple
|
| 5 |
+
|
| 6 |
+
# Import all internal modules
|
| 7 |
+
from ._config import trust_score_pct_from_final_score, SERPAPI_KEY
|
| 8 |
+
from ._utils import sanitize_text
|
| 9 |
+
from ._data_fetcher import phishing_checks, serpapi_search, aggregate_search_results, fetch_article_text_from_url
|
| 10 |
+
from ._media_analyzer import analyze_image_source
|
| 11 |
+
from ._model_interface import gemini_extract_claims_from_text, gemini_generate_claim_from_image, gemini_verify_claim
|
| 12 |
+
from ._signal_aggregator import hf_zero_shot_classify, combine_signals, media_authenticity_override
|
| 13 |
+
|
| 14 |
+
logger = logging.getLogger("fact_checker_orchestrator")
|
| 15 |
+
|
| 16 |
+
# Expose fetch_article_text_from_url from the package for app.py to use
|
| 17 |
+
__all__ = ["Orchestrator", "fetch_article_text_from_url"]
|
| 18 |
+
|
| 19 |
+
class Orchestrator:
    """
    The central class for coordinating all fact-checking tools and reporting.
    It orchestrates data fetching, analysis, and signal aggregation.
    """

    def run(self, claim_text: str, article_text: Optional[str], url: Optional[str], image_source: Optional[str], run_serpapi: bool = True) -> dict:
        """Run the full fact-checking pipeline and return a structured report.

        Args:
            claim_text: User-supplied claim to verify (may be empty).
            article_text: Optional full article text to extract claims from.
            url: Optional page URL; also triggers phishing checks.
            image_source: Optional image URL/path; triggers media analysis.
            run_serpapi: When True (and SERPAPI_KEY is configured), query
                SerpAPI for web evidence; otherwise skip web search.

        Returns:
            dict with "metadata", "summary", and per-claim "reports".
        """
        # --- PHASE 1: Data gathering and pre-analysis ---
        article_text = sanitize_text(article_text or "")
        claim_text = sanitize_text(claim_text or "")

        # image_analysis stays None when no image was provided; downstream
        # consumers must treat it as optional.
        image_analysis = analyze_image_source(image_source) if image_source else None
        phish_report = phishing_checks(url) if url else {}

        # Initial broad web search; reused as a fallback when per-claim search
        # is unavailable in PHASE 3.
        serpapi_result = {"available": False}
        if run_serpapi and SERPAPI_KEY:
            q = claim_text or article_text or url or image_source
            if q:
                serpapi_result = serpapi_search(q, "web_search", num=8)

        # --- PHASE 2: Claim generation/extraction ---
        claims_to_check: List[str] = []
        if article_text:
            claims_struct = gemini_extract_claims_from_text(article_text, max_claims=3)
            claims_to_check = [c["claim"] for c in claims_struct if c.get("claim")]

        # Fallbacks when no claims could be extracted.
        if not claims_to_check:
            if claim_text:
                claims_to_check = [claim_text]
            elif image_source and image_analysis and image_analysis.get("fetched"):
                claims_to_check = [
                    gemini_generate_claim_from_image(image_source)
                    or "Auto-generated (unverified): Could not form a factual claim from media."
                ]
            elif article_text:
                # Last resort: treat the article's first sentence as the claim.
                sents = re.split(r'(?<=[.!?])\s+', article_text)
                claims_to_check = [sents[0][:800].strip()] if sents else ["No clear claim or article text to analyze."]

        # Deduplicate claims case-insensitively, preserving first-seen order.
        unique_claims = []
        seen = set()
        for c in claims_to_check:
            c_clean = c.strip().lower()
            if c_clean and c_clean not in seen:
                seen.add(c_clean)
                unique_claims.append(c)

        # --- PHASE 3: Verification and reporting per claim ---
        reports = []
        for claim in unique_claims:
            # Re-run search specifically for the claim for better grounding;
            # fall back to the PHASE 1 result when search is disabled.
            serpapi_for_claim = serpapi_search(claim, "web_search", num=6) if run_serpapi and SERPAPI_KEY else serpapi_result

            hf_result = hf_zero_shot_classify(claim)
            gemini_verdict = gemini_verify_claim(claim, serpapi_for_claim, image_analysis)
            evidence_agg = aggregate_search_results(serpapi_for_claim)
            combined = combine_signals(gemini_verdict, hf_result, evidence_agg)

            # Check for media fabrication and override the verdict if needed.
            is_fake_media, override_reason = media_authenticity_override(
                {"image_analysis": image_analysis, "gemini_verdict": gemini_verdict, "evidence_agg": evidence_agg}
            )
            if is_fake_media:
                combined["final_verdict"] = "False"
                combined["confidence"] = max(combined.get("confidence", 0.6), 0.7)
                # setdefault guards against combine_signals() omitting "reasons"
                # (every other access here uses .get("reasons", [])).
                combined.setdefault("reasons", []).insert(0, override_reason)

            report_entry = {
                "claim": claim,
                "context_snippet": article_text[:400] if article_text else "None",
                "image_analysis": image_analysis,
                "hf_classifier": hf_result,
                "gemini_verdict": gemini_verdict,
                "evidence_agg": evidence_agg,
                "phishing_analysis": phish_report,
                "final_verdict": combined["final_verdict"],
                "confidence": combined["confidence"],
                "reasons": combined.get("reasons", []),
                "final_score": combined.get("final_score"),
                "trust_score_pct": trust_score_pct_from_final_score(combined.get("final_score", 0.0)),
            }
            # Generate the final user-friendly summary.
            report_entry["qa_summary"] = self.format_user_friendly_explanation(report_entry)
            reports.append(report_entry)

        # --- PHASE 4: Summary ---
        summary = {"counts": {}, "dominant_verdict": "Unclear", "phishing_flag": False}
        for r in reports:
            v = r["final_verdict"]
            summary["counts"][v] = summary["counts"].get(v, 0) + 1
        if reports:
            dominant = max(summary["counts"].items(), key=lambda x: x[1])[0]
            summary["dominant_verdict"] = dominant

        # Flag the URL only when a scanner explicitly reports safe=False
        # (missing/None results do not count as unsafe).
        sb = (phish_report.get("safe_browsing") or {})
        vt = (phish_report.get("virustotal") or {})
        summary["phishing_flag"] = bool(sb.get("safe") is False or vt.get("safe") is False)

        return {
            "metadata": {
                "url_analyzed": url, "image_source_analyzed": image_source,
                "timestamp": time.time(), "claims_analyzed": len(reports),
            },
            "summary": summary,
            "reports": reports,
        }

    def format_user_friendly_explanation(self, report_entry: dict) -> str:
        """Creates a structured Q/A summary for the Gradio output box.

        Args:
            report_entry: One report dict as built in run() (all keys optional;
                missing keys degrade to neutral defaults).

        Returns:
            A four-section Q0–Q3 plain-text summary.
        """
        claim = report_entry.get("claim", "").strip() or "(no claim provided)"
        reasons = report_entry.get("reasons", []) or []
        verdict = report_entry.get("final_verdict", "Unclear")
        confidence = report_entry.get("confidence", 0.0)

        # Q0: Final verdict headline.
        q0 = f"Q0: Final Verdict: **{verdict.upper()}** (Confidence: {int(confidence * 100)}%)\nClaim: {claim}"

        # Q1: Why — up to three reasons; dict reasons expose a "description".
        reason_texts = [str(r["description"]) if isinstance(r, dict) and r.get("description") else str(r) for r in reasons[:3]]
        q1 = f"Q1: Why did we reach this verdict?\nA: {'; '.join(reason_texts) or 'No strong model reasons were returned.'}"

        # Q2: How it was verified — top sources plus which automated checks ran.
        evidence_agg = report_entry.get("evidence_agg", {}) or {}
        evidence = evidence_agg.get("evidence", []) or []
        # `or ""` guards against a search result carrying title=None.
        top_sources = [f"{e.get('domain')}: {(e.get('title') or '')[:100]}" for e in evidence[:3] if e.get('link')]

        checks = ["Gemini model analysis", "Web evidence aggregation"]
        if report_entry.get("hf_classifier") and not report_entry["hf_classifier"].get("error"):
            checks.append("HF zero-shot classifier")
        # `or {}` guard: run() stores image_analysis as None when no image was
        # analyzed, so a plain .get(..., {}) would still yield None here.
        if (report_entry.get("image_analysis") or {}).get("fetched"):
            checks.append("Media analysis (EXIF/ELA/pHash/reverse-image)")
        phish = report_entry.get("phishing_analysis", {}) or {}
        if (phish.get("safe_browsing") or {}).get("safe") is False:
            checks.append("Safe Browsing Flagged")
        if (phish.get("virustotal") or {}).get("safe") is False:
            checks.append("VirusTotal Flagged")

        # Join outside the f-string: a backslash inside an f-string expression
        # is a SyntaxError before Python 3.12 (PEP 701).
        if top_sources:
            source_block = "\n- ".join(top_sources)
        else:
            source_block = "No strong web sources found."
        q2_lines = [
            "Q2: How was it verified?",
            "A: Verified by:",
            f"- Top web references:\n- {source_block}",
            f"- Automated checks: {', '.join(checks)}",
        ]
        q2 = "\n".join(q2_lines)

        # Q3: Next steps, most urgent first.
        next_steps = ["- Seek independent confirmation from trusted outlets."]
        if top_sources:
            next_steps.insert(0, "- Read the listed sources for full context and check publication dates.")
        if phish.get("url") and (phish.get("safe_browsing") or {}).get("safe") is False:
            next_steps.insert(0, "- **CRITICAL:** Do NOT click links from this page; treat it as potentially unsafe and report it.")
        # Newline-join so each "- " item renders as its own bullet line.
        q3 = "Q3: What should you do next?\nA: " + "\n".join(next_steps)

        return f"{q0}\n\n{q1}\n\n{q2}\n\n{q3}"