# Fact-checker orchestrator module (Hugging Face Space).
import time
import logging
import re
from typing import Optional, Dict, Any, List, Tuple

# Import all internal modules
from ._config import trust_score_pct_from_final_score, SERPAPI_KEY
from ._utils import sanitize_text
from ._data_fetcher import phishing_checks, serpapi_search, aggregate_search_results, fetch_article_text_from_url
from ._media_analyzer import analyze_image_source
from ._model_interface import gemini_extract_claims_from_text, gemini_generate_claim_from_image, gemini_verify_claim
from ._signal_aggregator import hf_zero_shot_classify, combine_signals, media_authenticity_override

# Module-level logger for the orchestration pipeline.
logger = logging.getLogger("fact_checker_orchestrator")

# Expose fetch_article_text_from_url from the package for app.py to use
__all__ = ["Orchestrator", "fetch_article_text_from_url"]
class Orchestrator:
    """
    The central class for coordinating all fact-checking tools and reporting.

    It orchestrates data fetching (web search, phishing checks, media
    analysis), claim extraction/generation, per-claim verification, signal
    aggregation, and user-facing Q/A report formatting.
    """

    def run(self, claim_text: str, article_text: Optional[str], url: Optional[str], image_source: Optional[str], run_serpapi: bool = True) -> dict:
        """Run the full fact-checking pipeline and return a structured result.

        Args:
            claim_text: A user-supplied claim to verify (may be empty).
            article_text: Raw article text to mine claims from (may be empty).
            url: Optional URL of the analyzed page; triggers phishing checks.
            image_source: Optional image URL/path; triggers media analysis.
            run_serpapi: When True and a SerpAPI key is configured, ground the
                verification with live web search results.

        Returns:
            A dict with "metadata", "summary", and a per-claim "reports" list.
        """
        # --- PHASE 1: Data gathering and pre-analysis ---
        article_text = sanitize_text(article_text or "")
        claim_text = sanitize_text(claim_text or "")
        # image_analysis stays None when no media was provided; downstream
        # consumers must therefore treat the value as optional.
        image_analysis = analyze_image_source(image_source) if image_source else None
        phish_report = phishing_checks(url) if url else {}

        # Initial web search (if necessary) — used as a shared fallback when
        # per-claim searches are disabled.
        serpapi_result = {"available": False}
        if run_serpapi and SERPAPI_KEY:
            q = claim_text or article_text or url or image_source
            if q:
                serpapi_result = serpapi_search(q, "web_search", num=8)

        # --- PHASE 2: Claim generation/extraction ---
        claims_to_check: List[str] = []
        if article_text:
            claims_struct = gemini_extract_claims_from_text(article_text, max_claims=3)
            claims_to_check = [c["claim"] for c in claims_struct if c.get("claim")]

        # Fallbacks for claims: explicit claim > image-derived claim > first
        # sentence of the article.
        if not claims_to_check:
            if claim_text:
                claims_to_check = [claim_text]
            elif image_source and image_analysis and image_analysis.get("fetched"):
                claims_to_check = [gemini_generate_claim_from_image(image_source) or "Auto-generated (unverified): Could not form a factual claim from media."]
            elif article_text:
                sents = re.split(r'(?<=[.!?])\s+', article_text)
                claims_to_check = [sents[0][:800].strip()] if sents else ["No clear claim or article text to analyze."]

        # Deduplicate claims case-insensitively, preserving first-seen order
        # and the original casing of the first occurrence.
        unique_claims: List[str] = []
        seen = set()
        for c in claims_to_check:
            c_clean = c.strip().lower()
            if c_clean and c_clean not in seen:
                seen.add(c_clean)
                unique_claims.append(c)

        # --- PHASE 3: Verification and reporting per claim ---
        reports = []
        for claim in unique_claims:
            # Re-run search specifically for the claim for better grounding.
            serpapi_for_claim = serpapi_search(claim, "web_search", num=6) if run_serpapi and SERPAPI_KEY else serpapi_result
            hf_result = hf_zero_shot_classify(claim)
            gemini_verdict = gemini_verify_claim(claim, serpapi_for_claim, image_analysis)
            evidence_agg = aggregate_search_results(serpapi_for_claim)
            combined = combine_signals(gemini_verdict, hf_result, evidence_agg)

            # Check for media fabrication and override the combined verdict
            # if the media is judged fake.
            is_fake_media, override_reason = media_authenticity_override({"image_analysis": image_analysis, "gemini_verdict": gemini_verdict, "evidence_agg": evidence_agg})
            if is_fake_media:
                combined["final_verdict"] = "False"
                combined["confidence"] = max(combined.get("confidence", 0.6), 0.7)
                # setdefault guards against combine_signals omitting "reasons"
                # (the key is treated as optional below via .get()).
                combined.setdefault("reasons", []).insert(0, override_reason)

            report_entry = {
                "claim": claim,
                "context_snippet": article_text[:400] if article_text else "None",
                "image_analysis": image_analysis,
                "hf_classifier": hf_result,
                "gemini_verdict": gemini_verdict,
                "evidence_agg": evidence_agg,
                "phishing_analysis": phish_report,
                "final_verdict": combined["final_verdict"],
                "confidence": combined["confidence"],
                "reasons": combined.get("reasons", []),
                "final_score": combined.get("final_score"),
                "trust_score_pct": trust_score_pct_from_final_score(combined.get("final_score", 0.0)),
            }
            # Generate the final user-friendly summary.
            report_entry["qa_summary"] = self.format_user_friendly_explanation(report_entry)
            reports.append(report_entry)

        # --- PHASE 4: Summary ---
        summary = {"counts": {}, "dominant_verdict": "Unclear", "phishing_flag": False}
        for r in reports:
            v = r["final_verdict"]
            summary["counts"][v] = summary["counts"].get(v, 0) + 1
        if reports:
            dominant = max(summary["counts"].items(), key=lambda x: x[1])[0]
            summary["dominant_verdict"] = dominant

        # "or {}" guards against the keys being present with a None value.
        sb = (phish_report.get("safe_browsing") or {})
        vt = (phish_report.get("virustotal") or {})
        summary["phishing_flag"] = bool(sb.get("safe") is False or vt.get("safe") is False)

        return {
            "metadata": {
                "url_analyzed": url, "image_source_analyzed": image_source,
                "timestamp": time.time(), "claims_analyzed": len(reports),
            },
            "summary": summary,
            "reports": reports,
        }

    def format_user_friendly_explanation(self, report_entry: dict) -> str:
        """Creates a structured Q/A summary for the Gradio output box.

        Args:
            report_entry: A single report dict as built by run(); the
                "image_analysis" value may be None when no media was given.

        Returns:
            A multi-paragraph string with Q0 (verdict), Q1 (reasons),
            Q2 (verification methods), and Q3 (recommended next steps).
        """
        claim = report_entry.get("claim", "").strip() or "(no claim provided)"
        reasons = report_entry.get("reasons", []) or []
        verdict = report_entry.get("final_verdict", "Unclear")
        confidence = report_entry.get("confidence", 0.0)

        # Q0: Final Verdict
        q0 = f"Q0: Final Verdict: **{verdict.upper()}** (Confidence: {int(confidence * 100)}%)\nClaim: {claim}"

        # Q1: Why — prefer structured reason descriptions, fall back to str().
        reason_texts = [str(r["description"]) if isinstance(r, dict) and r.get("description") else str(r) for r in reasons[:3]]
        q1 = f"Q1: Why did we reach this verdict?\nA: {'; '.join(reason_texts) or 'No strong model reasons were returned.'}"

        # Q2: How verified
        evidence_agg = report_entry.get("evidence_agg", {}) or {}
        evidence = evidence_agg.get("evidence", []) or []
        top_sources = [f"{e.get('domain')}: {e.get('title', '')[:100]}" for e in evidence[:3] if e.get('link')]
        checks = ["Gemini model analysis", "Web evidence aggregation"]
        if report_entry.get("hf_classifier") and not report_entry["hf_classifier"].get("error"):
            checks.append("HF zero-shot classifier")
        # "or {}": run() stores image_analysis as None (not a missing key)
        # when no media was supplied, so .get(..., {}) alone would yield None
        # and crash on the chained .get().
        if (report_entry.get("image_analysis") or {}).get("fetched"):
            checks.append("Media analysis (EXIF/ELA/pHash/reverse-image)")
        phish = report_entry.get("phishing_analysis", {}) or {}
        if (phish.get("safe_browsing") or {}).get("safe") is False:
            checks.append("Safe Browsing Flagged")
        if (phish.get("virustotal") or {}).get("safe") is False:
            checks.append("VirusTotal Flagged")
        # Join the sources outside the f-string: a backslash inside an
        # f-string replacement field is a SyntaxError before Python 3.12.
        sources_text = "\n- ".join(top_sources) if top_sources else "No strong web sources found."
        q2_lines = [
            "Q2: How was it verified?",
            "A: Verified by:",
            f"- Top web references:\n- {sources_text}",
            f"- Automated checks: {', '.join(checks)}",
        ]
        q2 = "\n".join(q2_lines)

        # Q3: Next Steps — most urgent advice is inserted at the front.
        next_steps = ["- Seek independent confirmation from trusted outlets."]
        if top_sources:
            next_steps.insert(0, "- Read the listed sources for full context and check publication dates.")
        if phish.get("url") and (phish.get("safe_browsing") or {}).get("safe") is False:
            next_steps.insert(0, "- **CRITICAL:** Do NOT click links from this page; treat it as potentially unsafe and report it.")
        q3 = "Q3: What should you do next?\nA: " + " ".join(next_steps)

        return f"{q0}\n\n{q1}\n\n{q2}\n\n{q3}"