# news/__init__.py
# Package entry point for the fact-checking orchestrator.
import time
import logging
import re
from typing import Optional, Dict, Any, List, Tuple
# Import all internal modules
from ._config import trust_score_pct_from_final_score, SERPAPI_KEY
from ._utils import sanitize_text
from ._data_fetcher import phishing_checks, serpapi_search, aggregate_search_results, fetch_article_text_from_url
from ._media_analyzer import analyze_image_source
from ._model_interface import gemini_extract_claims_from_text, gemini_generate_claim_from_image, gemini_verify_claim
from ._signal_aggregator import hf_zero_shot_classify, combine_signals, media_authenticity_override
# Module-level logger for the orchestration pipeline (named, not __name__,
# so log output is recognizable regardless of package layout).
logger = logging.getLogger("fact_checker_orchestrator")
# Expose fetch_article_text_from_url from the package for app.py to use
__all__ = ["Orchestrator", "fetch_article_text_from_url"]
class Orchestrator:
    """
    The central class for coordinating all fact-checking tools and reporting.

    It orchestrates data fetching (web search, phishing checks, media
    analysis), claim extraction/generation, per-claim verification, and
    signal aggregation into a single structured report dict.
    """

    def run(self, claim_text: str, article_text: Optional[str], url: Optional[str], image_source: Optional[str], run_serpapi: bool = True) -> dict:
        """Run the full fact-checking pipeline.

        Args:
            claim_text: User-supplied claim to verify (may be empty).
            article_text: Full article text to extract claims from (optional).
            url: URL of the page under analysis, used for phishing checks.
            image_source: URL/path of an image to analyze (optional).
            run_serpapi: Whether to ground claims with SerpAPI web search.

        Returns:
            A dict with "metadata", "summary", and a per-claim "reports" list.
        """
        # --- PHASE 1: Data Gathering and Pre-analysis ---
        article_text = sanitize_text(article_text or "")
        claim_text = sanitize_text(claim_text or "")
        # NOTE: image_analysis is None (not {}) when no image was given;
        # all downstream consumers must handle that.
        image_analysis = analyze_image_source(image_source) if image_source else None
        phish_report = phishing_checks(url) if url else {}

        # Initial web search (if necessary)
        serpapi_result = {"available": False}
        if run_serpapi and SERPAPI_KEY:
            q = claim_text or article_text or url or image_source
            if q:
                serpapi_result = serpapi_search(q, "web_search", num=8)

        # --- PHASE 2: Claim Generation/Extraction ---
        claims_to_check = []
        if article_text:
            # `or []` guards against the model interface returning None on failure.
            claims_struct = gemini_extract_claims_from_text(article_text, max_claims=3) or []
            claims_to_check = [c["claim"] for c in claims_struct if c.get("claim")]
        # Fallbacks for claims: explicit claim > image-derived claim > first sentence.
        if not claims_to_check:
            if claim_text:
                claims_to_check = [claim_text]
            elif image_source and image_analysis and image_analysis.get("fetched"):
                claims_to_check = [gemini_generate_claim_from_image(image_source) or "Auto-generated (unverified): Could not form a factual claim from media."]
            elif article_text:
                # Use the article's first sentence (capped at 800 chars) as the claim.
                sents = re.split(r'(?<=[.!?])\s+', article_text)
                claims_to_check = [sents[0][:800].strip()] if sents else ["No clear claim or article text to analyze."]

        # Deduplicate claims case-insensitively, preserving first-seen order/casing.
        unique_claims = []
        seen = set()
        for c in claims_to_check:
            c_clean = c.strip().lower()
            if c_clean and c_clean not in seen:
                seen.add(c_clean)
                unique_claims.append(c)

        # --- PHASE 3: Verification and Reporting per Claim ---
        reports = []
        for claim in unique_claims:
            # Re-run search specifically for the claim for better grounding
            serpapi_for_claim = serpapi_search(claim, "web_search", num=6) if run_serpapi and SERPAPI_KEY else serpapi_result
            hf_result = hf_zero_shot_classify(claim)
            gemini_verdict = gemini_verify_claim(claim, serpapi_for_claim, image_analysis)
            evidence_agg = aggregate_search_results(serpapi_for_claim)
            combined = combine_signals(gemini_verdict, hf_result, evidence_agg)
            # Check for media fabrication and override the verdict if needed.
            is_fake_media, override_reason = media_authenticity_override({"image_analysis": image_analysis, "gemini_verdict": gemini_verdict, "evidence_agg": evidence_agg})
            if is_fake_media:
                combined["final_verdict"] = "False"
                combined["confidence"] = max(combined.get("confidence", 0.6), 0.7)
                # setdefault: combine_signals may not have populated "reasons"
                # (every other access uses .get("reasons", [])), so a plain
                # combined["reasons"] could raise KeyError here.
                combined.setdefault("reasons", []).insert(0, override_reason)
            report_entry = {
                "claim": claim,
                "context_snippet": article_text[:400] if article_text else "None",
                "image_analysis": image_analysis,
                "hf_classifier": hf_result,
                "gemini_verdict": gemini_verdict,
                "evidence_agg": evidence_agg,
                "phishing_analysis": phish_report,
                "final_verdict": combined["final_verdict"],
                "confidence": combined["confidence"],
                "reasons": combined.get("reasons", []),
                "final_score": combined.get("final_score"),
                "trust_score_pct": trust_score_pct_from_final_score(combined.get("final_score", 0.0))
            }
            # Generate the final user-friendly summary
            report_entry["qa_summary"] = self.format_user_friendly_explanation(report_entry)
            reports.append(report_entry)

        # --- PHASE 4: Summary ---
        summary = {"counts": {}, "dominant_verdict": "Unclear", "phishing_flag": False}
        for r in reports:
            v = r["final_verdict"]
            summary["counts"][v] = summary["counts"].get(v, 0) + 1
        if reports:
            # Dominant verdict = the most frequent per-claim verdict.
            dominant = max(summary["counts"].items(), key=lambda x: x[1])[0]
            summary["dominant_verdict"] = dominant
        # `or {}` because these sub-reports may be present but None.
        sb = (phish_report.get("safe_browsing") or {})
        vt = (phish_report.get("virustotal") or {})
        # Flag only an explicit "unsafe" verdict (is False), not a missing one.
        summary["phishing_flag"] = bool(sb.get("safe") is False or vt.get("safe") is False)

        return {
            "metadata": {
                "url_analyzed": url, "image_source_analyzed": image_source,
                "timestamp": time.time(), "claims_analyzed": len(reports)
            },
            "summary": summary,
            "reports": reports,
        }

    def format_user_friendly_explanation(self, report_entry: dict) -> str:
        """Creates a structured Q/A summary for the Gradio output box."""
        claim = report_entry.get("claim", "").strip() or "(no claim provided)"
        reasons = report_entry.get("reasons", []) or []
        verdict = report_entry.get("final_verdict", "Unclear")
        # `or 0.0` guards the int() conversion below against a None confidence.
        confidence = report_entry.get("confidence", 0.0) or 0.0
        # Q0: Final Verdict
        q0 = f"Q0: Final Verdict: **{verdict.upper()}** (Confidence: {int(confidence * 100)}%)\nClaim: {claim}"
        # Q1: Why — prefer a structured reason's "description", else str() it.
        reason_texts = [str(r["description"]) if isinstance(r, dict) and r.get("description") else str(r) for r in reasons[:3]]
        q1 = f"Q1: Why did we reach this verdict?\nA: {'; '.join(reason_texts) or 'No strong model reasons were returned.'}"
        # Q2: How verified
        evidence_agg = report_entry.get("evidence_agg", {}) or {}
        evidence = evidence_agg.get("evidence", []) or []
        top_sources = [f"{e.get('domain')}: {e.get('title', '')[:100]}" for e in evidence[:3] if e.get('link')]
        checks = ["Gemini model analysis", "Web evidence aggregation"]
        if report_entry.get("hf_classifier") and not report_entry["hf_classifier"].get("error"):
            checks.append("HF zero-shot classifier")
        # `or {}` is required here: run() stores image_analysis as None when no
        # image was given, so .get("image_analysis", {}) returns None and
        # .get("fetched") on it would raise AttributeError.
        if (report_entry.get("image_analysis") or {}).get("fetched"):
            checks.append("Media analysis (EXIF/ELA/pHash/reverse-image)")
        phish = report_entry.get("phishing_analysis", {}) or {}
        # Same None-safety idiom as run() uses for the summary phishing flag.
        if (phish.get("safe_browsing") or {}).get("safe") is False:
            checks.append("Safe Browsing Flagged")
        if (phish.get("virustotal") or {}).get("safe") is False:
            checks.append("VirusTotal Flagged")
        # Join hoisted out of the f-string: a backslash ('\n- ') inside an
        # f-string expression is a SyntaxError on Python < 3.12 (pre-PEP 701).
        sources_block = "\n- ".join(top_sources) if top_sources else "No strong web sources found."
        q2_lines = [
            "Q2: How was it verified?",
            "A: Verified by:",
            f"- Top web references:\n- {sources_block}",
            f"- Automated checks: {', '.join(checks)}"
        ]
        q2 = "\n".join(q2_lines)
        # Q3: Next Steps
        next_steps = ["- Seek independent confirmation from trusted outlets."]
        if top_sources:
            next_steps.insert(0, "- Read the listed sources for full context and check publication dates.")
        if phish.get("url") and (phish.get("safe_browsing") or {}).get("safe") is False:
            next_steps.insert(0, "- **CRITICAL:** Do NOT click links from this page; treat it as potentially unsafe and report it.")
        q3 = "Q3: What should you do next?\nA: " + " ".join(next_steps)
        return f"{q0}\n\n{q1}\n\n{q2}\n\n{q3}"