"""Verifact forwarding bot.

Receives text or photos on Telegram, forwards them to the Verifact
misinformation-analysis backend (Cloud Run), and renders the JSON verdict
as an HTML-formatted Telegram report.
"""

import html
import io
import logging
import os
import socket
from urllib.parse import urlparse

import aiohttp
from dotenv import load_dotenv  # <--- NEW IMPORT
from telegram import Update, constants
from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, filters

# ==========================================
# ⚙️ CONFIGURATION
# ==========================================

# 1. Load environment variables from the .env file
load_dotenv()


def _clean_env(name: str) -> str:
    """Read an env var, stripping whitespace and stray quotes (common .env mistake)."""
    return os.getenv(name, "").strip().replace('"', '').replace("'", "")


# 2. Retrieve values
BOT_TOKEN = _clean_env("BOT_TOKEN")
BACKEND_API_URL = _clean_env("EXTERNAL_ANALYSIS_API_URL")
API_KEY = os.getenv("API_KEY")

# Check if critical vars are missing
if not BOT_TOKEN or not BACKEND_API_URL:
    raise ValueError("❌ Error: BOT_TOKEN or BACKEND_API_URL is missing from .env file")

# ==========================================
# 📝 LOGGING SETUP
# ==========================================
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO
)
logger = logging.getLogger(__name__)


# ==========================================
# 🧠 REPORT FORMATTER (JSON -> HTML)
# ==========================================
def _display_source(src: str) -> str:
    """Shorten a source string for display: URLs collapse to their domain."""
    if src.startswith('http'):
        try:
            return urlparse(src).netloc or "Link"
        except ValueError:
            return "Link"
    return src


def _evidence_section(title: str, evidence: list) -> str:
    """Render one evidence list (supporting or opposing) as HTML-escaped bullets."""
    section = f"{title}\n"
    for ev in evidence:
        src = html.escape(ev.get('source', 'Unknown'))
        summ = html.escape(ev.get('summary', ''))
        section += f"• {summ} ({_display_source(src)})\n"
    return section


def format_analysis_report(data):
    """Convert the backend's analysis JSON into a readable HTML Telegram message.

    Args:
        data: Decoded JSON dict from the Verifact pipeline. All keys are
            optional; missing sections are simply omitted from the report.

    Returns:
        An HTML-safe report string, truncated to Telegram's message-length
        limit. Never raises: any formatting failure is logged and a generic
        error string is returned instead.
    """
    try:
        # --- HEADER ---
        tag = data.get("tag", "Analysis")
        overall_summary = data.get("overall_summary", "No summary provided.")
        source_cred_list = data.get("source_credibility_summary", [])

        # Pick the verdict icon from keywords found in the tag.
        tag_lower = tag.lower()
        if "true" in tag_lower or "verified" in tag_lower:
            icon = "🟢"
        elif "false" in tag_lower or "misinfo" in tag_lower or "fake" in tag_lower:
            icon = "🔴"
        else:
            icon = "⚠️"

        # Start building the message
        message = "🚨 VERIFACT ANALYSIS REPORT\n"
        message += "━━━━━━━━━━━━━━━━━━━\n"
        message += f"Result: {icon} {html.escape(tag.upper())}\n"

        # Source Credibility Summary (average over all numerically-scored items)
        if source_cred_list:
            scores = []
            for item in source_cred_list:
                if isinstance(item, dict) and 'credibility_score' in item:
                    try:
                        scores.append(int(item['credibility_score']))
                    except (ValueError, TypeError):
                        pass  # non-numeric score: skip it, don't fail the report
            if scores:
                avg_score = int(sum(scores) / len(scores))
                if avg_score >= 80:
                    cred_label = "High"
                elif avg_score >= 60:
                    cred_label = "Moderate"
                else:
                    cred_label = "Low"
                message += f"Source Credibility: {cred_label} ({avg_score}%)\n"

        message += "\n📝 Summary:\n"
        message += f"{html.escape(overall_summary)}\n\n"

        # --- CLAIMS ANALYSIS ---
        claims = data.get("analyzed_claims", [])
        if claims:
            message += "🔍 CLAIMS ANALYSIS\n"
            message += "━━━━━━━━━━━━━━━━━━━\n"
            for i, claim in enumerate(claims, 1):
                claim_text = claim.get("claim_text", "N/A")
                conclusion = claim.get("conclusion", "N/A")
                message += f"{i}️⃣ Claim: \"{html.escape(claim_text)}\"\n"
                message += f"💡 Conclusion: {html.escape(conclusion)}\n"

                supporting = claim.get("supporting_evidence", [])
                opposing = claim.get("opposing_evidence", [])
                if supporting:
                    message += _evidence_section("✅ Supporting Evidence:", supporting)
                if opposing:
                    message += _evidence_section("❌ Opposing Evidence:", opposing)
                message += "\n"

        # --- FACT CHECKS ---
        all_fact_checks = []
        for claim in claims:
            all_fact_checks.extend(claim.get("fact_checking_results", []))
        # Filter out missing URLs and the literal string "None"
        valid_fact_checks = [
            fc for fc in all_fact_checks
            if fc.get('url') and fc.get('url') != "None"
        ]
        if valid_fact_checks:
            message += "🔗 FACT CHECKS\n"
            seen_urls = set()
            for fc in valid_fact_checks:
                url = fc.get('url', '#')
                if url in seen_urls:
                    continue  # de-duplicate by URL
                seen_urls.add(url)
                # Prefer the backend's source name; fall back to the URL's domain.
                source = fc.get('source', 'Fact Check')
                if source == 'Fact Check' and url != '#':
                    try:
                        source = urlparse(url).netloc
                    except ValueError:
                        pass
                message += f"• {html.escape(source)}\n"
            message += "\n"

        # --- SOURCE CREDIBILITY DETAILS ---
        if source_cred_list:
            message += "🛡️ SOURCE CREDIBILITY\n"
            for item in source_cred_list[:5]:  # Limit to top 5 to avoid spam
                url = item.get('url', '')
                score = item.get('credibility_score', 'N/A')
                category = item.get('category', 'Unknown')
                # Extract domain for a compact display
                domain = "Unknown Source"
                if url:
                    try:
                        domain = urlparse(url).netloc
                    except ValueError:
                        domain = url
                message += f"• {domain}: {category} ({score})\n"
            message += "\n"

        # --- REVERSE IMAGE SEARCH (Optional) ---
        ris = data.get("reverse_image_search_data")
        if ris:
            ris_summary = ris.get("summary", "")
            matched = ris.get("matched_links", [])
            if ris_summary or matched:
                message += "🖼️ IMAGE ANALYSIS\n"
                if ris_summary:
                    message += f"{html.escape(ris_summary)}\n"
                for match in matched[:3]:
                    domain = html.escape(match.get('domain', 'Link'))
                    date = html.escape(match.get('date', ''))
                    message += f"• {domain} ({date})\n"

        message += "\n🤖 Analysis generated by Verifact"

        # Telegram rejects messages over 4096 chars; truncate defensively so
        # edit_message_text never fails on a long report.
        limit = constants.MessageLimit.MAX_TEXT_LENGTH
        if len(message) > limit:
            message = message[:limit - 1] + "…"
        return message

    except Exception as e:
        logger.error(f"Formatting Error: {e}")
        return "⚠️ Format Error: Data received, but could not be displayed properly."


# ==========================================
# 📡 BACKEND CONNECTOR
# ==========================================
async def query_backend_pipeline(form_data):
    """Send multipart form data (text + files) to the Cloud Run backend.

    Args:
        form_data: A prepared ``aiohttp.FormData`` payload.

    Returns:
        The decoded JSON dict on HTTP 200, or ``None`` on any failure
        (non-200 status, timeout, connection error). Failures are logged,
        never raised, so bot handlers can degrade gracefully.
    """
    headers = {}
    if API_KEY:
        headers["Authorization"] = f"Bearer {API_KEY}"

    timeout = aiohttp.ClientTimeout(total=60)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        try:
            async with session.post(BACKEND_API_URL, data=form_data, headers=headers) as response:
                if response.status == 200:
                    return await response.json()
                error_text = await response.text()
                logger.error(f"Backend Error {response.status}: {error_text}")
                return None
        except Exception as e:
            # Deliberately broad: any transport/decoding failure becomes a
            # user-visible "pipeline unreachable" message, not a crash.
            logger.error(f"Connection Error: {e}")
            return None


# ==========================================
# 🎮 BOT HANDLERS
# ==========================================
async def _replace_status(context, chat_id, message_id, text):
    """Replace the placeholder status message with the final result text."""
    await context.bot.edit_message_text(
        chat_id=chat_id,
        message_id=message_id,
        text=text,
        parse_mode='HTML',
        disable_web_page_preview=True
    )


async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Greet the user and explain what the bot does (/start)."""
    welcome_text = (
        "👋 Verifact Forwarding Bot\n\n"
        "I am connected to the misinformation analysis pipeline.\n"
        "Forward me any Text or Image to verify it."
    )
    await context.bot.send_message(chat_id=update.effective_chat.id, text=welcome_text, parse_mode='HTML')


async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Forward a plain-text message to the backend and post the verdict."""
    user_text = update.message.text
    chat_id = update.effective_chat.id

    await context.bot.send_chat_action(chat_id=chat_id, action=constants.ChatAction.TYPING)
    status_msg = await context.bot.send_message(
        chat_id=chat_id,
        text="📡 Verifact is analyzing text...",
        parse_mode='HTML'
    )

    data = aiohttp.FormData()
    data.add_field('text', user_text)
    data.add_field('source', 'Telegram')

    json_response = await query_backend_pipeline(data)
    if json_response:
        result_text = format_analysis_report(json_response)
    else:
        result_text = "⚠️ System Error: The pipeline is currently unreachable or timed out."
    await _replace_status(context, chat_id, status_msg.message_id, result_text)


async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Download the highest-resolution photo, forward it, and post the verdict."""
    chat_id = update.effective_chat.id
    await context.bot.send_chat_action(chat_id=chat_id, action=constants.ChatAction.UPLOAD_PHOTO)
    status_msg = await context.bot.send_message(
        chat_id=chat_id,
        text="📡 Downloading media & analyzing...",
        parse_mode='HTML'
    )

    try:
        # Telegram orders photo sizes ascending; [-1] is the largest.
        photo = update.message.photo[-1]
        file_obj = await context.bot.get_file(photo.file_id)

        # Keep the file in memory — never touches disk.
        f_memory = io.BytesIO()
        await file_obj.download_to_memory(out=f_memory)
        f_memory.seek(0)

        data = aiohttp.FormData()
        caption_text = update.message.caption if update.message.caption else "Image analysis request"
        data.add_field('text', caption_text)
        data.add_field('source', 'Telegram')
        data.add_field('file', f_memory, filename='telegram_image.jpg', content_type='image/jpeg')

        json_response = await query_backend_pipeline(data)
        if json_response:
            result_text = format_analysis_report(json_response)
        else:
            result_text = "⚠️ Error: Analysis failed or timed out."
        await _replace_status(context, chat_id, status_msg.message_id, result_text)

    except Exception as e:
        logger.error(f"Image Handler Error: {e}")
        await _replace_status(
            context, chat_id, status_msg.message_id,
            "❌ Error: Could not process the image file."
        )


# ==========================================
# 🛠️ DIAGNOSTICS
# ==========================================
def check_network():
    """Check DNS resolution for critical services; results go to the log."""
    logger.info("--- NETWORK DIAGNOSTICS ---")
    targets = [
        ("Telegram API", "api.telegram.org"),
        ("Google", "google.com"),
    ]

    # Add Backend Host if parseable
    if BACKEND_API_URL:
        try:
            targets.append(("Backend API", urlparse(BACKEND_API_URL).netloc))
        except ValueError:
            pass

    for name, host in targets:
        try:
            ip = socket.gethostbyname(host)
            logger.info(f"✅ {name} ({host}) resolved to {ip}")
        except socket.gaierror as e:
            logger.error(f"❌ {name} ({host}) DNS FAILURE: {e}")
        except Exception as e:
            logger.error(f"❌ {name} ({host}) Unexpected Error: {e}")
    logger.info("---------------------------")


# ==========================================
# 🚀 MAIN RUNNER
# ==========================================
if __name__ == '__main__':
    check_network()

    from telegram.request import HTTPXRequest

    # Use a robust request object with longer timeouts
    trequest = HTTPXRequest(
        connection_pool_size=8,
        read_timeout=20.0,
        write_timeout=20.0,
        connect_timeout=20.0,
    )

    application = ApplicationBuilder().token(BOT_TOKEN).request(trequest).build()

    application.add_handler(MessageHandler(filters.COMMAND & filters.Regex(r'^/start$'), start))
    application.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle_text))
    application.add_handler(MessageHandler(filters.PHOTO, handle_photo))

    print("✅ Bot is running.")
    print(f"🔗 Connected to Backend: {BACKEND_API_URL}")
    application.run_polling()