| |
| """ |
| main.py - Server for the Fake News Detection system |
| |
| This script creates a Flask server that exposes API endpoints to: |
| 1. Take user input (news query) from the UI |
| 2. Process the request through the fake news detection pipeline |
| 3. Return the results to the UI for display |
| """ |
|
|
| import os |
| import json |
| import time |
| from dotenv import load_dotenv |
| from flask import Flask, request, jsonify |
| from flask_cors import CORS |
|
|
| |
| from gdelt_api import ( |
| fetch_articles_from_gdelt, |
| filter_by_whitelisted_domains, |
| normalize_gdelt_articles |
| ) |
| from ranker import ArticleRanker |
| from gdelt_query_builder import generate_query, GEMINI_MODEL |
| import bias_analyzer |
|
|
| |
# Instantiate the shared embedding ranker once at import time so the first
# API request does not pay the (slow) model-loading cost.
# NOTE(review): assumes ArticleRanker() loads its default model in __init__ —
# confirm in ranker.py.
print("Preloading embedding model for faster request processing...")

global_ranker = ArticleRanker()
|
|
|
|
| |
| |
|
|
|
|
def format_results(query, ranked_articles):
    """
    Build the UI-facing payload for a set of ranked articles.

    Args:
        query (str): The original query
        ranked_articles (list): List of ranked article dictionaries

    Returns:
        dict: Dictionary with formatted results
    """
    # Empty ranking means no reliable source matched the claim.
    if not ranked_articles:
        return {
            "status": "no_results",
            "message": "⚠️ No news found. Possibly Fake.",
            "details": "No reliable sources could verify this information.",
            "articles": []
        }

    # Env toggles decide which optional fields each article entry carries;
    # all default to enabled when the variable is unset.
    include_score = os.getenv('SHOW_SIMILARITY_SCORES', 'true').lower() == 'true'
    include_date = os.getenv('SHOW_PUBLISH_DATE', 'true').lower() == 'true'
    include_url = os.getenv('SHOW_URL', 'true').lower() == 'true'

    articles_payload = []
    for item in ranked_articles:
        entry = {
            "rank": item['rank'],
            "title": item['title'],
            "source": item['source']
        }

        if include_score:
            entry["similarity_score"] = round(item['similarity_score'], 4)

        if include_url:
            entry["url"] = item['url']

        if include_date:
            entry["published_at"] = item['published_at']

        articles_payload.append(entry)

    return {
        "status": "success",
        "message": f"✅ Found {len(ranked_articles)} relevant articles for: '{query}'",
        "articles": articles_payload,
        "footer": "If the news matches these reliable sources, it's likely true. If it contradicts them or no sources are found, it might be fake."
    }
|
|
|
|
def remove_duplicates(articles):
    """
    Drop articles whose URL was already seen, keeping the first occurrence.

    Args:
        articles (list): List of article dictionaries

    Returns:
        list: List with duplicate articles removed
    """
    seen_urls = set()
    deduped = []

    for entry in articles:
        url = entry['url']
        if url in seen_urls:
            continue  # later duplicate of an already-kept article
        seen_urls.add(url)
        deduped.append(entry)

    return deduped
|
|
|
|
| |
| |
|
|
|
|
def main():
    """Run the fake news detection pipeline as a Flask API server.

    Loads environment config, builds the Flask app, registers the UI,
    detection, and health-check routes, then serves on the PORT env var
    (default 5000) bound to all interfaces.
    """

    load_dotenv()

    app = Flask(__name__, static_folder='static')
    CORS(app)

    @app.route('/static/')
    def index():
        """Serve the main page."""
        # NOTE(review): the UI entry point is exposed at /static/ rather
        # than / — confirm this matches how the front end is linked.
        return app.send_static_file('front.html')

    @app.route('/api/detect', methods=['POST'])
    def detect_fake_news():
        """API endpoint to check if news is potentially fake.

        Expects a JSON body {"query": "<news statement>"}; returns a JSON
        result containing a generated summary and bias reasoning, or an
        error payload.
        """
        # Tolerate a missing or non-JSON request body: get_json(silent=True)
        # returns None instead of raising, and we map that to {} so the
        # empty-query branch below produces the friendly error message.
        data = request.get_json(silent=True) or {}
        query = data.get('query', '')

        if not query:
            return jsonify({
                "status": "error",
                "message": "Please provide a news statement to verify."
            })

        # Expand the user's statement into several search-query variations.
        query_variations = generate_query(query)

        # generate_query signals disallowed topics with a sentinel list.
        if query_variations == ["INAPPROPRIATE_QUERY"]:
            return jsonify({
                "status": "error",
                "message": "I cannot provide information on this topic as it appears to contain sensitive or inappropriate content."
            })

        # Fetch candidate articles from GDELT for every query variation.
        all_articles = []
        for query_var in query_variations:
            articles = fetch_articles_from_gdelt(query_var)
            if articles:
                all_articles.extend(articles)

        # The same URL can match several variations; keep one copy each.
        unique_articles = remove_duplicates(all_articles)

        # Optionally restrict results to whitelisted domains.
        use_whitelist_only = os.getenv('USE_WHITELIST_ONLY', 'false').lower() == 'true'
        if use_whitelist_only:
            print("Filtering articles to only include whitelisted domains...")
            unique_articles = filter_by_whitelisted_domains(unique_articles)
            print(f"After whitelist filtering: {len(unique_articles)} articles remain")

        normalized_articles = normalize_gdelt_articles(unique_articles)

        if not normalized_articles:
            return jsonify(format_results(query, []))

        # Reuse the preloaded ranker unless the env selects another model.
        model_name = os.getenv('SIMILARITY_MODEL', 'intfloat/multilingual-e5-base')

        if global_ranker.model_name == model_name:
            ranker = global_ranker
        else:
            ranker = ArticleRanker(model_name)

        TOP_K_ARTICLES = int(os.getenv('TOP_K_ARTICLES', 250))
        min_threshold = float(os.getenv('MIN_SIMILARITY_THRESHOLD', 0.1))

        # Embed "title + description" per article and rank by similarity to
        # the query. The `or ''` guards against a None description.
        article_texts = [f"{article['title']} {article['description'] or ''}" for article in normalized_articles]

        query_embedding, article_embeddings = ranker.create_embeddings(query, article_texts)
        similarities = ranker.calculate_similarities(query_embedding, article_embeddings)

        top_indices = ranker.get_top_articles(similarities, normalized_articles, TOP_K_ARTICLES, min_threshold)
        top_articles = ranker.format_results(top_indices, similarities, normalized_articles)

        # Classify the outlets behind the top articles by bias.
        outlet_names = [article['source'] for article in top_articles]
        unique_outlets = list(set(outlet_names))
        print(f"Analyzing {len(unique_outlets)} unique news outlets for bias...")

        bias_analysis = bias_analyzer.analyze_bias(query, unique_outlets, GEMINI_MODEL)

        print("\n" + "=" * 80)
        print("EMBEDDING VECTORS BY BIAS CATEGORY")
        print("=" * 80)

        # Re-rank articles within each bias category. The returned mapping
        # also carries non-article metadata keys ("descriptions",
        # "reasoning") that must be skipped when slicing article lists.
        category_rankings = bias_analyzer.categorize_and_rank_by_bias(
            query, normalized_articles, bias_analysis, ranker, min_threshold
        )

        TOP_N_PER_CATEGORY = int(os.getenv('TOP_N_PER_CATEGORY', 5))

        # Keep only the top N articles per bias category, logging each kept
        # article for debugging.
        filtered_category_rankings = {}
        for category, articles in category_rankings.items():
            if category in ["descriptions", "reasoning"]:
                continue

            filtered_category_rankings[category] = articles[:TOP_N_PER_CATEGORY]

            if len(filtered_category_rankings[category]) > 0:
                print(f"\n===== Top {len(filtered_category_rankings[category])} articles from {category} category =====")

                for i, article in enumerate(filtered_category_rankings[category], 1):
                    print(f"Article #{i}:")
                    print(f" Title: {article['title']}")
                    print(f" Source: {article['source']}")
                    print(f" Similarity Score: {article['similarity_score']:.4f}")
                    print(f" Rank: {article['rank']}")
                    print(f" URL: {article['url']}")
                    print(f" Published: {article['published_at']}")
                    print("-" * 50)

        print("\nGenerating factual summary using top articles from all categories...")

        # Pass the bias reasoning along with the per-category article picks
        # so the summary generator can ground its output.
        filtered_category_rankings["reasoning"] = bias_analysis.get("reasoning", "No reasoning provided")

        summary = bias_analyzer.generate_summary(
            query,
            normalized_articles,
            filtered_category_rankings,
            GEMINI_MODEL
        )

        print(summary)

        result = {
            "query": query,
            "summary": summary,
            "reasoning": bias_analysis.get("reasoning", "No reasoning provided")
        }

        return jsonify(result)

    @app.route('/api/health', methods=['GET'])
    def health_check():
        """API endpoint to check if the server is running."""
        return jsonify({
            "status": "ok",
            "message": "Fake News Detection API is running"
        })

    port = int(os.getenv('PORT', 5000))
    debug = os.getenv('DEBUG', 'false').lower() == 'true'

    print(f"Starting Fake News Detection API server on port {port}...")

    # Bind on all interfaces so the UI can reach the API from other hosts.
    app.run(host='0.0.0.0', port=port, debug=debug)
|
|
|
|
# Script entry point: build the Flask app and start serving.
if __name__ == "__main__":
    main()
|
|