Spaces:
Running
Running
| import pandas as pd | |
| import numpy as np | |
| import warnings | |
| import feedparser | |
| from datetime import datetime | |
| import gradio as gr | |
| warnings.filterwarnings('ignore') | |
| print("β Core imports done.") | |
| from datasets import load_dataset | |
| # ββ Fallback generators (non-negotiable for reproducibility) βββββββββββββββββ | |
| def generate_fallback_liar(): | |
| """Synthetic LIAR-style dataset if HuggingFace load fails.""" | |
| data = [ | |
| ("The unemployment rate is at a 50-year low.", "half-true"), | |
| ("Vaccines contain microchips for government tracking.", "pants-fire"), | |
| ("Climate change is causing more frequent hurricanes.", "mostly-true"), | |
| ("The stock market had its best year ever last year.", "false"), | |
| ("Water covers about 71% of Earth's surface.", "true"), | |
| ("The moon landing was filmed in a Hollywood studio.", "pants-fire"), | |
| ("Eating carrots improves night vision significantly.", "barely-true"), | |
| ("5G towers spread the COVID-19 virus.", "pants-fire"), | |
| ("Exercise reduces the risk of type 2 diabetes.", "true"), | |
| ("The Eiffel Tower grows taller in summer.", "mostly-true"), | |
| ] * 50 # 500 samples | |
| df = pd.DataFrame(data, columns=['statement', 'label']) | |
| print("β οΈ Using synthetic LIAR fallback (500 samples).") | |
| return df | |
| def generate_fallback_hallucination(): | |
| """Synthetic hallucination dataset if HuggingFace load fails.""" | |
| data = [ | |
| ("The Eiffel Tower is located in Berlin.", True), | |
| ("Python was created by Guido van Rossum.", False), | |
| ("Shakespeare wrote 'War and Peace'.", True), | |
| ("The speed of light is approximately 3Γ10βΈ m/s.", False), | |
| ("The Great Wall of China is visible from space with the naked eye.", True), | |
| ] * 40 | |
| df = pd.DataFrame(data, columns=['claim', 'is_hallucination']) | |
| print("β οΈ Using synthetic hallucination fallback (200 samples).") | |
| return df | |
| # ββ Load LIAR dataset βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| try: | |
| liar_raw = load_dataset("liar", trust_remote_code=True) | |
| liar_df = pd.DataFrame({ | |
| 'statement': liar_raw['train']['statement'], | |
| 'label': liar_raw['train']['label'] | |
| }) | |
| label_names = ['pants-fire','false','barely-true','half-true','mostly-true','true'] | |
| liar_df['label'] = liar_df['label'].apply(lambda x: label_names[x] if isinstance(x, int) else x) | |
| print(f"β LIAR dataset loaded: {len(liar_df)} samples") | |
| except Exception as e: | |
| print(f"LIAR load failed ({e}), using fallback.") | |
| liar_df = generate_fallback_liar() | |
| # ββ Load TruthfulQA βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| try: | |
| tqa_raw = load_dataset("truthful_qa", "generation", trust_remote_code=True) | |
| tqa_df = pd.DataFrame({ | |
| 'question': tqa_raw['validation']['question'], | |
| 'best_answer': tqa_raw['validation']['best_answer'], | |
| }) | |
| print(f"β TruthfulQA loaded: {len(tqa_df)} samples") | |
| except Exception as e: | |
| print(f"TruthfulQA load failed ({e}), using fallback.") | |
| tqa_df = generate_fallback_hallucination() | |
| # ββ Load HaluEval βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| try: | |
| halu_raw = load_dataset("pminervini/HaluEval", "general_samples", trust_remote_code=True) | |
| halu_df = pd.DataFrame(halu_raw['data']) | |
| print(f"β HaluEval loaded: {len(halu_df)} samples") | |
| except Exception as e: | |
| print(f"HaluEval load failed ({e}), using fallback.") | |
| halu_df = generate_fallback_hallucination() | |
| print("\nπ Dataset summary:") | |
| print(f" LIAR: {len(liar_df)} rows, columns: {list(liar_df.columns)}") | |
| print(f" TruthfulQA: {len(tqa_df)} rows") | |
| print(f" HaluEval: {len(halu_df)} rows") | |
| # ββ Live RSS News Feed ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| RSS_FEEDS = { | |
| 'BBC': 'http://feeds.bbci.co.uk/news/world/rss.xml', | |
| 'Reuters': 'https://feeds.reuters.com/reuters/topNews', | |
| 'AP': 'https://rsshub.app/apnews/topics/apf-topnews', | |
| } | |
| headlines = [] | |
| for source, url in RSS_FEEDS.items(): | |
| try: | |
| feed = feedparser.parse(url) | |
| for entry in feed.entries[:10]: | |
| pub = entry.get('published', str(datetime.now())) | |
| headlines.append({ | |
| 'headline': entry.get('title', ''), | |
| 'summary': entry.get('summary', ''), | |
| 'source': source, | |
| 'published_at': pub, | |
| 'link': entry.get('link', '') | |
| }) | |
| except Exception as e: | |
| print(f" β οΈ {source} RSS failed: {e}") | |
| if not headlines: | |
| # Fallback static headlines for offline environments | |
| headlines = [ | |
| {'headline': 'Global temperatures hit record highs in 2024', 'summary': '', 'source': 'synthetic', 'published_at': '2024-01-01', 'link': ''}, | |
| {'headline': 'AI models show improved reasoning capabilities', 'summary': '', 'source': 'synthetic', 'published_at': '2024-01-02', 'link': ''}, | |
| {'headline': 'New vaccine approved for respiratory illness', 'summary': '', 'source': 'synthetic', 'published_at': '2024-01-03', 'link': ''}, | |
| ] * 5 | |
| print("β οΈ Using synthetic headlines (no network access).") | |
| news_df = pd.DataFrame(headlines) | |
| news_df['published_at'] = pd.to_datetime(news_df['published_at'], errors='coerce', utc=True) | |
| print(f"β Live news loaded: {len(news_df)} headlines from {news_df['source'].nunique()} sources") | |
| from transformers import pipeline | |
| from sentence_transformers import SentenceTransformer | |
| import faiss | |
| import re | |
| # ββ Load lightweight models βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| print("Loading sentiment pipeline...") | |
| sentiment_pipeline = pipeline( | |
| "sentiment-analysis", | |
| model="distilbert-base-uncased-finetuned-sst-2-english", | |
| truncation=True, max_length=512 | |
| ) | |
| print("Loading NLI pipeline (DeBERTa)...") | |
| nli_pipeline = pipeline( | |
| "zero-shot-classification", | |
| model="cross-encoder/nli-deberta-v3-small", | |
| device=-1 # CPU | |
| ) | |
| print("Loading sentence embedding model...") | |
| embedder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") | |
| print("β All models loaded.") | |
| # ββ Build FAISS Vector Index of Trusted Facts βββββββββββββββββββββββββββββββββ | |
| TRUSTED_FACTS = [ | |
| "Water boils at 100 degrees Celsius at sea level.", | |
| "The Earth orbits the Sun, not the other way around.", | |
| "The speed of light in a vacuum is approximately 299,792 kilometers per second.", | |
| "DNA carries genetic information in living organisms.", | |
| "The Great Wall of China is not visible from space with the naked eye.", | |
| "Humans and chimpanzees share approximately 98.7% of their DNA.", | |
| "The moon is approximately 384,400 kilometers from Earth.", | |
| "Mount Everest is the highest mountain above sea level at 8,849 meters.", | |
| "Vaccines work by stimulating the immune system to recognize pathogens.", | |
| "The human brain contains approximately 86 billion neurons.", | |
| "Carbon dioxide concentration in the atmosphere has increased since industrialization.", | |
| "The Eiffel Tower is located in Paris, France.", | |
| "Python was created by Guido van Rossum and first released in 1991.", | |
| "Shakespeare wrote Hamlet, Macbeth, and Romeo and Juliet.", | |
| "The United States has 50 states.", | |
| "Albert Einstein published the special theory of relativity in 1905.", | |
| "Antibiotics are not effective against viral infections.", | |
| "The Pacific Ocean is the largest ocean on Earth.", | |
| "The human body has 206 bones in adulthood.", | |
| "Climate change is driven primarily by human greenhouse gas emissions according to scientific consensus.", | |
| ] | |
| # Encode and index | |
| fact_embeddings = embedder.encode(TRUSTED_FACTS, convert_to_numpy=True) | |
| dim = fact_embeddings.shape[1] | |
| faiss_index = faiss.IndexFlatL2(dim) | |
| faiss_index.add(fact_embeddings.astype(np.float32)) | |
| print(f"β FAISS index built with {faiss_index.ntotal} trusted facts (dim={dim})") | |
| # ββ Feature Extraction Functions ββββββββββββββββββββββββββββββββββββββββββββββ | |
| SOURCE_CREDIBILITY = { | |
| 'bbc.co.uk': 0.92, 'reuters.com': 0.94, 'apnews.com': 0.93, | |
| 'nytimes.com': 0.88, 'theguardian.com': 0.87, 'nature.com': 0.98, | |
| 'who.int': 0.97, 'cdc.gov': 0.97, 'infowars.com': 0.05, | |
| 'naturalnews.com': 0.08, 'breitbart.com': 0.22, 'synthetic': 0.50, | |
| 'BBC': 0.92, 'Reuters': 0.94, 'AP': 0.93, | |
| } | |
| FAKE_DOI_PATTERN = re.compile( | |
| r'10\.\d{4,}/[a-zA-Z0-9./_-]+' | |
| ) | |
| IMPOSSIBLE_YEAR = re.compile(r'\b(19[0-2]\d|2[1-9]\d{2})\b') | |
| INVENTED_INSTITUTIONS = re.compile( | |
| r'(Institute of [A-Z][a-z]+ [A-Z][a-z]+|Foundation for [A-Z][a-z]+ Research)', | |
| re.IGNORECASE | |
| ) | |
| def get_sentiment_score(text: str) -> float: | |
| try: | |
| result = sentiment_pipeline(text[:512])[0] | |
| score = result['score'] | |
| return score if result['label'] == 'POSITIVE' else -score | |
| except: | |
| return 0.0 | |
| def get_source_credibility(source: str) -> float: | |
| for domain, score in SOURCE_CREDIBILITY.items(): | |
| if domain.lower() in source.lower(): | |
| return score | |
| return 0.5 | |
| def get_citation_anomaly_score(text: str) -> float: | |
| score = 0.0 | |
| if FAKE_DOI_PATTERN.search(text): score += 0.3 | |
| if IMPOSSIBLE_YEAR.search(text): score += 0.3 | |
| if INVENTED_INSTITUTIONS.search(text): score += 0.4 | |
| return min(score, 1.0) | |
| def get_semantic_similarity(text: str, k: int = 3) -> float: | |
| try: | |
| emb = embedder.encode([text], convert_to_numpy=True).astype(np.float32) | |
| distances, _ = faiss_index.search(emb, k) | |
| avg_dist = np.mean(distances[0]) | |
| similarity = 1.0 / (1.0 + avg_dist) | |
| return float(np.clip(similarity, 0, 1)) | |
| except: | |
| return 0.5 | |
| def get_nli_contradiction_score(claim: str, references: list) -> float: | |
| try: | |
| result = nli_pipeline( | |
| claim, | |
| candidate_labels=["entailment", "neutral", "contradiction"], | |
| hypothesis_template="This claim is related to: {}", | |
| ) | |
| scores = dict(zip(result['labels'], result['scores'])) | |
| return float(scores.get('contradiction', 0.0)) | |
| except: | |
| return 0.5 | |
| def retrieve_reference_sentences(claim: str, k: int = 5) -> list: | |
| try: | |
| emb = embedder.encode([claim], convert_to_numpy=True).astype(np.float32) | |
| _, indices = faiss_index.search(emb, k) | |
| return [TRUSTED_FACTS[i] for i in indices[0] if i < len(TRUSTED_FACTS)] | |
| except: | |
| return TRUSTED_FACTS[:k] | |
| print("β Feature extraction functions defined.") | |
| # ββ A. Fake News Classifier (LIAR β 3-class) ββββββββββββββββββββββββββββββββββ | |
| from sklearn.linear_model import LogisticRegression | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.metrics import classification_report | |
| LIAR_MAP = { | |
| 'pants-fire': 'misinformation', | |
| 'false': 'misinformation', | |
| 'barely-true': 'uncertain', | |
| 'half-true': 'uncertain', | |
| 'mostly-true': 'credible', | |
| 'true': 'credible', | |
| } | |
| liar_sample = liar_df.sample(min(500, len(liar_df)), random_state=42).copy() | |
| liar_sample['label_3'] = liar_sample['label'].map(LIAR_MAP).fillna('uncertain') | |
| print("Encoding LIAR statements...") | |
| X_liar = embedder.encode(liar_sample['statement'].tolist(), show_progress_bar=False) | |
| y_liar = liar_sample['label_3'].values | |
| X_train, X_test, y_train, y_test = train_test_split(X_liar, y_liar, test_size=0.2, random_state=42) | |
| fake_news_clf = LogisticRegression(max_iter=500, random_state=42) | |
| fake_news_clf.fit(X_train, y_train) | |
| print("β Fake news classifier trained.") | |
| # ββ B. Hallucination Scorer βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def score_hallucination(claim: str) -> dict: | |
| try: | |
| references = retrieve_reference_sentences(claim, k=5) | |
| contradiction_score = get_nli_contradiction_score(claim, references) | |
| similarity = get_semantic_similarity(claim) | |
| citation_anomaly = get_citation_anomaly_score(claim) | |
| raw_risk = ( | |
| 0.50 * contradiction_score + | |
| 0.30 * (1 - similarity) + | |
| 0.20 * citation_anomaly | |
| ) | |
| hallucination_risk = int(np.clip(raw_risk * 100, 0, 100)) | |
| return { | |
| 'hallucination_risk': hallucination_risk, | |
| 'contradiction_score': round(contradiction_score, 3), | |
| 'semantic_similarity': round(similarity, 3), | |
| 'evidence_snippets': references[:3] | |
| } | |
| except Exception as e: | |
| return {'hallucination_risk': 50, 'contradiction_score': 0.5, | |
| 'semantic_similarity': 0.5, 'evidence_snippets': [], 'error': str(e)} | |
| print("β Hallucination scorer working.") | |
| # ββ C. Event Volatility Forecaster βββββββββββββββββββββββββββββββββββββββββββ | |
| try: | |
| from statsforecast import StatsForecast | |
| from statsforecast.models import AutoARIMA | |
| HAS_STATSFORECAST = True | |
| except ImportError: | |
| HAS_STATSFORECAST = False | |
| print("β οΈ statsforecast not available, using EWMA fallback.") | |
| def compute_volatility_series(df: pd.DataFrame, window: int = 7) -> pd.Series: | |
| df = df.copy().sort_values('published_at') | |
| sentiments = df['headline'].apply(get_sentiment_score) | |
| volatility = sentiments.rolling(window=min(window, len(df)), min_periods=1).std().fillna(0) | |
| return volatility | |
| def forecast_volatility(series: pd.Series, horizon: int = 3) -> dict: | |
| if HAS_STATSFORECAST and len(series) >= 10: | |
| try: | |
| sf_df = pd.DataFrame({ | |
| 'unique_id': 'news_vol', | |
| 'ds': pd.date_range(start='2024-01-01', periods=len(series), freq='D'), | |
| 'y': series.values | |
| }) | |
| sf = StatsForecast(models=[AutoARIMA()], freq='D') | |
| forecast = sf.forecast(df=sf_df, h=horizon) | |
| forecasted_values = forecast['AutoARIMA'].values.tolist() | |
| trend = 'rising' if forecasted_values[-1] > series.mean() else 'stable' | |
| return {'method': 'AutoARIMA', 'forecast': forecasted_values, 'trend': trend} | |
| except: | |
| pass | |
| ewma = series.ewm(span=min(5, len(series))).mean() | |
| last = ewma.iloc[-1] | |
| forecasted = [last * (1 + 0.02 * i) for i in range(1, horizon + 1)] | |
| trend = 'rising' if forecasted[-1] > series.mean() else 'stable' | |
| return {'method': 'EWMA', 'forecast': forecasted, 'trend': trend} | |
| # ββ D. Final Risk Score Aggregator ββββββββββββββββββββββββββββββββββββββββββββ | |
| W_HALLUCINATION = 0.40 | |
| W_FAKE_NEWS = 0.35 | |
| W_CITATION = 0.15 | |
| W_SIMILARITY = 0.10 | |
| COLOR_MAP = { | |
| 'confirmed': 'rgba(52, 199, 89, 0.15)', | |
| 'uncertain': 'rgba(255, 204, 0, 0.15)', | |
| 'misinformation':'rgba(255, 59, 48, 0.15)', | |
| 'hallucination': 'rgba(175, 82, 222, 0.15)', | |
| } | |
| def get_fake_news_probability(text: str) -> tuple[str, float]: | |
| try: | |
| emb = embedder.encode([text]) | |
| proba = fake_news_clf.predict_proba(emb)[0] | |
| classes = fake_news_clf.classes_ | |
| label = classes[np.argmax(proba)] | |
| confidence = float(np.max(proba)) | |
| return label, confidence | |
| except: | |
| return 'uncertain', 0.5 | |
| def analyze_text(text: str, source: str = 'unknown') -> dict: | |
| try: | |
| halu_result = score_hallucination(text) | |
| fake_label, fake_conf = get_fake_news_probability(text) | |
| citation_score = get_citation_anomaly_score(text) | |
| similarity = get_semantic_similarity(text) | |
| credibility = get_source_credibility(source) | |
| fake_risk = {'misinformation': 0.9, 'uncertain': 0.5, 'credible': 0.1}.get(fake_label, 0.5) | |
| combined_risk = ( | |
| W_HALLUCINATION * (halu_result['hallucination_risk'] / 100) + | |
| W_FAKE_NEWS * fake_risk + | |
| W_CITATION * citation_score + | |
| W_SIMILARITY * (1 - similarity) | |
| ) | |
| combined_risk = float(np.clip(combined_risk, 0, 1)) | |
| if combined_risk < 0.25: | |
| status = 'confirmed' | |
| elif combined_risk < 0.55: | |
| status = 'uncertain' | |
| elif halu_result['hallucination_risk'] > 60: | |
| status = 'hallucination' | |
| else: | |
| status = 'misinformation' | |
| confidence = abs(combined_risk - 0.5) * 2 | |
| tooltip = ( | |
| f"{status.title()} risk: {int(combined_risk*100)}%. " | |
| f"Hallucination: {halu_result['hallucination_risk']}%. " | |
| f"Source credibility: {int(credibility*100)}%." | |
| ) | |
| return { | |
| 'text': text, | |
| 'status': status, | |
| 'color': COLOR_MAP[status], | |
| 'hallucination_risk': halu_result['hallucination_risk'], | |
| 'fake_news_label': fake_label, | |
| 'combined_risk': round(combined_risk, 3), | |
| 'confidence': round(confidence, 3), | |
| 'volatility_index': round(1 - similarity, 3), | |
| 'tooltip_message': tooltip, | |
| 'evidence_snippets': halu_result['evidence_snippets'] | |
| } | |
| except Exception as e: | |
| return { | |
| 'text': text, 'status': 'uncertain', 'color': COLOR_MAP['uncertain'], | |
| 'hallucination_risk': 50, 'fake_news_label': 'uncertain', | |
| 'combined_risk': 0.5, 'confidence': 0.0, 'volatility_index': 0.5, | |
| 'tooltip_message': f'Analysis failed gracefully: {str(e)}', | |
| 'evidence_snippets': [] | |
| } | |
| # ββ E. Main Web Application with Gradio βββββββββββββββββββββββββββββββββββββββ | |
| def predict(text): | |
| return analyze_text(text) | |
| with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
| gr.Markdown("# π‘οΈ AI Risk & Fact-Checking Dashboard") | |
| gr.Markdown("Analyze text for hallucination risk, fake news probability, and citation anomalies.") | |
| with gr.Row(): | |
| with gr.Column(): | |
| input_text = gr.Textbox( | |
| lines=5, | |
| placeholder="Enter a claim or news snippet here...", | |
| label="Text to Analyze" | |
| ) | |
| submit_btn = gr.Button("Analyze Risk", variant="primary") | |
| with gr.Column(): | |
| output_json = gr.JSON(label="Detailed Analysis Results") | |
| submit_btn.click(fn=predict, inputs=input_text, outputs=output_json) | |
| gr.Examples( | |
| examples=[ | |
| "The moon is made of cheese.", | |
| "Water boils at 100 degrees Celsius at sea level.", | |
| "According to a 2031 study from the Institute of Neural Enhancement, humans only use 10% of their brain.", | |
| "Global temperatures hit record highs in 2024.", | |
| ], | |
| inputs=input_text | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch(server_name="0.0.0.0", server_port=7860) | |