Spaces:
Running
Running
| """ | |
| PhilVerify β Scoring Engine (Orchestrator) | |
| Ties together all NLP modules, Layer 1, and Layer 2 into a final VerificationResponse. | |
| Final Score = (ML Confidence Γ 0.40) + (Evidence Score Γ 0.60) | |
| """ | |
| import asyncio | |
| import json | |
| import logging | |
| import uuid | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from config import get_settings | |
| from api.schemas import ( | |
| VerificationResponse, Verdict, Language, DomainTier, | |
| Layer1Result, Layer2Result, EntitiesResult, EvidenceSource, Stance, | |
| ) | |
| logger = logging.getLogger(__name__) | |
| settings = get_settings() | |
| # ββ Module-level NLP singleton cache βββββββββββββββββββββββββββββββββββββββββ | |
| # These are created once per process and reused across all requests. | |
| # Creating fresh instances on every request causes unnecessary model reloads | |
| # from disk (300β500 ms each) which compounds into multi-second latency. | |
| _nlp_cache: dict = {} | |
| def _get_nlp(key: str, factory): | |
| """Return cached NLP instance, creating via factory() on first call.""" | |
| if key not in _nlp_cache: | |
| _nlp_cache[key] = factory() | |
| return _nlp_cache[key] | |
# ── Domain credibility lookup ────────────────────────────────────────────────
_DOMAIN_DB_PATH = Path(__file__).parent.parent / "domain_credibility.json"
_DOMAIN_DB: dict = {}


def _load_domain_db() -> dict:
    """Return the domain-credibility mapping, loading it from disk on first use.

    On any read/parse failure the error is logged, an empty dict is returned,
    and the next call will retry the load.
    """
    global _DOMAIN_DB
    if _DOMAIN_DB:
        return _DOMAIN_DB
    try:
        _DOMAIN_DB = json.loads(_DOMAIN_DB_PATH.read_text())
    except Exception as exc:
        logger.warning("Could not load domain_credibility.json: %s", exc)
    return _DOMAIN_DB
def get_domain_tier(domain: str) -> DomainTier | None:
    """Map a source hostname to its credibility tier.

    Args:
        domain: Hostname such as "www.rappler.com". Falsy input returns None.

    Returns:
        The matching DomainTier from domain_credibility.json, or
        DomainTier.SUSPICIOUS (Tier 3) when the domain is not listed.
    """
    if not domain:
        return None
    db = _load_domain_db()
    # Strip only a *leading* "www." — the previous str.replace("www.", "")
    # removed the substring anywhere in the name, corrupting hostnames that
    # merely contain "www." mid-string.
    domain = domain.lower().removeprefix("www.")
    for tier_key, tier_data in db.items():
        if domain in tier_data.get("domains", []):
            # Tier keys are expected to end in the tier digit (e.g. "tier_1").
            # NOTE(review): confirm against domain_credibility.json's schema.
            return DomainTier(int(tier_key[-1]))
    return DomainTier.SUSPICIOUS  # Unknown domains default to Tier 3
def _map_verdict(final_score: float) -> Verdict:
    """Translate a 0–100 final score into a Verdict using configured thresholds."""
    # Checked in descending order: credible threshold first, then fake threshold.
    if final_score >= settings.credible_threshold:
        return Verdict.CREDIBLE
    if final_score >= settings.fake_threshold:
        return Verdict.UNVERIFIED
    return Verdict.LIKELY_FAKE
async def run_verification(
    text: str,
    input_type: str = "text",
    source_domain: str | None = None,
) -> VerificationResponse:
    """
    Full verification pipeline orchestrator.

    Runs NLP analysis and the Layer-1 ML classifier synchronously, performs
    Layer-2 evidence retrieval asynchronously, then blends both into a final
    score:

        final = ml_credibility × ml_weight + evidence_score × evidence_weight
                (± a domain-credibility adjustment when the source is known)

    Args:
        text: Raw article/headline text to verify.
        input_type: Origin of the text (e.g. "text"); echoed in the response.
        source_domain: Hostname the text came from, if known.

    Returns:
        A fully populated VerificationResponse. The result is also recorded
        to Firestore, with an in-memory history store as best-effort fallback.
    """
    # ── Lazy imports so the app starts without heavy deps ─────────────────────
    from nlp.preprocessor import TextPreprocessor
    from nlp.language_detector import LanguageDetector
    from nlp.ner import EntityExtractor
    from nlp.sentiment import SentimentAnalyzer
    from nlp.clickbait import ClickbaitDetector
    from nlp.claim_extractor import ClaimExtractor
    from evidence.news_fetcher import fetch_evidence, compute_similarity

    # ── Step 1: Preprocess ────────────────────────────────────────────────────
    preprocessor = _get_nlp("preprocessor", TextPreprocessor)
    proc = preprocessor.preprocess(text)

    # ── Step 2: Language detection ────────────────────────────────────────────
    lang_detector = _get_nlp("lang_detector", LanguageDetector)
    lang_result = lang_detector.detect(text)
    # Codes outside the Language enum fall back to TAGLISH.
    language = (
        Language(lang_result.language)
        if lang_result.language in Language._value2member_map_
        else Language.TAGLISH
    )

    # ── Steps 3–6: NLP analysis ───────────────────────────────────────────────
    ner_extractor = _get_nlp("ner_extractor", EntityExtractor)
    sentiment_analyzer = _get_nlp("sentiment", SentimentAnalyzer)
    clickbait_detector = _get_nlp("clickbait", ClickbaitDetector)
    claim_extractor = _get_nlp("claim_extractor", ClaimExtractor)
    ner_result = ner_extractor.extract(text)
    sentiment_result = sentiment_analyzer.analyze(proc.cleaned)
    clickbait_result = clickbait_detector.detect(text)
    claim_result = claim_extractor.extract(proc.cleaned)

    # ── Step 7: Layer 1 — ML classifier ───────────────────────────────────────
    # Try the fine-tuned XLM-RoBERTa first; fall back to the TF-IDF baseline
    # when the checkpoint is missing (ml/train_xlmr.py not yet run) or the
    # model fails to load for any other reason.
    def _tfidf_fallback():
        """Build (or fetch the cached) TF-IDF baseline classifier."""
        from ml.tfidf_classifier import TFIDFClassifier

        def _make():
            c = TFIDFClassifier()
            c.train()
            return c

        return _get_nlp("tfidf_classifier", _make)

    model_tier = "xlmr"  # for observability in logs
    try:
        from ml.xlm_roberta_classifier import XLMRobertaClassifier, ModelNotFoundError
        classifier = _get_nlp("xlmr_classifier", XLMRobertaClassifier)
    except ModelNotFoundError:
        logger.info("XLM-RoBERTa checkpoint not found — falling back to TF-IDF baseline")
        classifier = _tfidf_fallback()
        model_tier = "tfidf"
    except Exception as exc:
        logger.warning("XLM-RoBERTa load failed (%s) — falling back to TF-IDF", exc)
        classifier = _tfidf_fallback()
        model_tier = "tfidf"
    l1 = classifier.predict(proc.cleaned)
    logger.debug("Layer-1 (%s): %s %.1f%%", model_tier, l1.verdict, l1.confidence)

    # Enrich triggered features with NLP signals.
    if clickbait_result.is_clickbait:
        l1.triggered_features.extend(clickbait_result.triggered_patterns[:3])
    if sentiment_result.sentiment in ("high negative",):
        # NOTE(review): only this exact label triggers the flag — confirm the
        # sentiment analyzer actually emits "high negative".
        l1.triggered_features.append("high emotional language")
    layer1 = Layer1Result(
        verdict=Verdict(l1.verdict),
        confidence=l1.confidence,
        triggered_features=l1.triggered_features,
    )

    # ── Step 8: Layer 2 — evidence retrieval ──────────────────────────────────
    # get_domain_tier() is a pure lookup; compute it once and reuse it for the
    # evidence prior, the final-score adjustment, and the response field
    # (previously it was called three times with the same argument).
    domain_tier = get_domain_tier(source_domain) if source_domain else None

    # Default evidence score (used when no API key is set) encodes a prior
    # from the source's tier:
    #   Tier 1 (Inquirer, GMA, Rappler…) → 65 — known credible, not neutral
    #   Tier 2 (satire/opinion)          → 45 — slight skepticism
    #   Tier 3 (unknown)                 → 50 — neutral
    #   Tier 4 (blacklisted)             → 25 — heavy prior against
    _EVIDENCE_DEFAULTS: dict = {
        DomainTier.CREDIBLE: 65.0,
        DomainTier.SATIRE_OPINION: 45.0,
        DomainTier.SUSPICIOUS: 50.0,
        DomainTier.KNOWN_FAKE: 25.0,
    }
    # .get() with domain_tier=None falls through to the neutral 50.0 default.
    evidence_score = _EVIDENCE_DEFAULTS.get(domain_tier, 50.0)
    evidence_sources: list[EvidenceSource] = []
    l2_verdict = Verdict.UNVERIFIED
    if settings.news_api_key:
        try:
            query_entities = ner_result.persons + ner_result.organizations + ner_result.locations
            articles = await fetch_evidence(
                claim_result.claim,
                settings.news_api_key,
                entities=query_entities,
            )
            for art in articles[:5]:
                article_text = f"{art.get('title', '')} {art.get('description', '')}"
                sim = compute_similarity(claim_result.claim, article_text)
                domain = (art.get("source", {}) or {}).get("name", "unknown").lower()
                tier = get_domain_tier(domain)
                # Simple stance heuristic: fact-check keywords in the title
                # → Refutes; otherwise high similarity → Supports.
                title_lower = (art.get("title") or "").lower()
                stance = Stance.NOT_ENOUGH_INFO
                if any(w in title_lower for w in ["false", "fake", "hoax", "wrong", "debunked", "fact check"]):
                    stance = Stance.REFUTES
                elif sim > 0.6:
                    stance = Stance.SUPPORTS
                evidence_sources.append(EvidenceSource(
                    title=art.get("title", ""),
                    url=art.get("url", ""),
                    similarity=sim,
                    stance=stance,
                    domain_tier=tier or DomainTier.SUSPICIOUS,
                    published_at=art.get("publishedAt"),
                    source_name=art.get("source", {}).get("name"),
                ))
            # Evidence score: average similarity × 100, minus 15 pts per
            # refuting source, clamped to [0, 100].
            if evidence_sources:
                supporting = [s for s in evidence_sources if s.stance == Stance.SUPPORTS]
                refuting = [s for s in evidence_sources if s.stance == Stance.REFUTES]
                avg_sim = sum(s.similarity for s in evidence_sources) / len(evidence_sources)
                refute_penalty = len(refuting) * 15
                evidence_score = max(0.0, min(100.0, avg_sim * 100 - refute_penalty))
                if len(refuting) > len(supporting):
                    l2_verdict = Verdict.LIKELY_FAKE
                elif len(supporting) >= 2:
                    l2_verdict = Verdict.CREDIBLE
        except Exception as e:
            logger.warning("Evidence retrieval failed: %s — using neutral score", e)
    layer2 = Layer2Result(
        verdict=l2_verdict,
        evidence_score=round(evidence_score, 1),
        sources=evidence_sources,
        claim_used=claim_result.claim,
    )

    # ── Step 9: Final score ───────────────────────────────────────────────────
    # ML confidence is 0–100 for the *predicted* class; convert it into a
    # credibility score so a confident "Fake" prediction counts against.
    ml_credibility = l1.confidence if l1.verdict == "Credible" else (100 - l1.confidence)
    base_score = (ml_credibility * settings.ml_weight) + (evidence_score * settings.evidence_weight)

    # Domain credibility adjustment — applied when we know the source URL.
    # The adjustment scales with how much ML disagrees with the domain tier:
    #   - Tier 1 source but ML says Fake at high confidence → bigger boost
    #   - Tier 4 source but ML says Credible at high confidence → bigger penalty
    # Base adjustments are scaled by a "disagreement multiplier" (1.0–1.5,
    # matching the cap below) so that a 95%-confident ML prediction on a
    # Tier 1 source still respects that the article came from a verified outlet.
    domain_adjustment = 0.0
    if domain_tier is not None:
        _BASE_ADJ = {
            DomainTier.CREDIBLE: +20.0,       # Tier 1 — established PH news orgs
            DomainTier.SATIRE_OPINION: -5.0,  # Tier 2 — satire / opinion blogs
            DomainTier.SUSPICIOUS: -10.0,     # Tier 3 — unknown / unverified
            DomainTier.KNOWN_FAKE: -35.0,     # Tier 4 — blacklisted
        }
        base_adj = _BASE_ADJ.get(domain_tier, 0.0)
        # Disagreement = divergence between ML credibility and the score the
        # tier implies (Tier 1 → 75, Tier 4 → 25, others neutral 50).
        _TIER_IMPLIED_SCORE = {
            DomainTier.CREDIBLE: 75.0,
            DomainTier.SATIRE_OPINION: 50.0,
            DomainTier.SUSPICIOUS: 50.0,
            DomainTier.KNOWN_FAKE: 25.0,
        }
        implied = _TIER_IMPLIED_SCORE.get(domain_tier, 50.0)
        disagreement = abs(ml_credibility - implied) / 50.0  # 0.0 – 1.0+, capped below
        multiplier = min(1.5, 1.0 + disagreement * 0.5)      # 1.0 (agree) – 1.5 (hard disagree)
        domain_adjustment = base_adj * multiplier
        logger.info(
            "Domain credibility: %s (Tier %s) base=%+.0f × multiplier=%.2f → %+.1f pts "
            "(ml_credibility=%.1f, implied=%.0f)",
            source_domain, domain_tier.value, base_adj, multiplier, domain_adjustment,
            ml_credibility, implied,
        )
    final_score = round(min(100.0, max(0.0, base_score + domain_adjustment)), 1)
    verdict = _map_verdict(final_score)

    # ── Step 10: Assemble response ────────────────────────────────────────────
    result = VerificationResponse(
        verdict=verdict,
        # Headline confidence: the stronger of ML confidence and evidence
        # score — both already on a 0–100 scale (the previous
        # `evidence_score / 100 * 100` was a no-op conversion).
        confidence=round(max(l1.confidence, evidence_score), 1),
        final_score=final_score,
        layer1=layer1,
        layer2=layer2,
        entities=EntitiesResult(
            persons=ner_result.persons,
            organizations=ner_result.organizations,
            locations=ner_result.locations,
            dates=ner_result.dates,
        ),
        sentiment=sentiment_result.sentiment,
        emotion=sentiment_result.emotion,
        language=language,
        domain_credibility=domain_tier,
        input_type=input_type,
    )

    # ── Record to Firestore (falls back to in-memory if Firebase not configured)
    history_entry = {
        "id": str(uuid.uuid4()),
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "input_type": input_type,
        "text_preview": text[:120],
        "verdict": verdict.value,
        "confidence": result.confidence,
        "final_score": final_score,
        "entities": ner_result.to_dict(),
        "claim_used": claim_result.claim,
        "layer1": {
            "verdict": layer1.verdict.value,
            "confidence": layer1.confidence,
            "triggered_features": layer1.triggered_features,
        },
        "layer2": {
            "verdict": layer2.verdict.value,
            "evidence_score": layer2.evidence_score,
            "claim_used": layer2.claim_used,
        },
        "sentiment": sentiment_result.sentiment,
        "emotion": sentiment_result.emotion,
        "language": language.value,
    }
    try:
        from firebase_client import save_verification
        saved = await save_verification(history_entry)
        if not saved:
            # Firestore unavailable — fall back to the in-memory store.
            from api.routes.history import record_verification
            record_verification(history_entry)
    except Exception as e:
        logger.warning("Failed to record history: %s", e)
        # History recording is best-effort: still try the in-memory store,
        # but never let a failure here break the verification response.
        try:
            from api.routes.history import record_verification
            record_verification(history_entry)
        except Exception:
            pass
    return result