# Verifact Telegram forwarding bot.
# NOTE(review): the original paste began with Hugging Face Spaces
# "Runtime error" banner lines, which are not Python; replaced with this header.
| import logging | |
| import os | |
| import io | |
| import html | |
| import aiohttp | |
| import socket | |
| from urllib.parse import urlparse | |
| from dotenv import load_dotenv # <--- NEW IMPORT | |
| from telegram import Update, constants | |
| from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, filters | |
# ==========================================
# ⚙️ CONFIGURATION
# ==========================================
# 1. Load environment variables from the .env file
load_dotenv()


def _clean_env(name: str) -> str:
    """Read an environment variable, trimming whitespace and stray quotes.

    Values copied into .env files often carry surrounding quote characters;
    strip them so tokens and URLs are usable verbatim.
    """
    return os.getenv(name, "").strip().replace('"', '').replace("'", "")


# 2. Retrieve values
BOT_TOKEN = _clean_env("BOT_TOKEN")
BACKEND_API_URL = _clean_env("EXTERNAL_ANALYSIS_API_URL")
# NOTE(review): API_KEY is deliberately left un-sanitized to preserve the
# original behavior — confirm whether it should go through _clean_env too.
API_KEY = os.getenv("API_KEY")

# Fail fast at startup if critical configuration is missing.
if not BOT_TOKEN or not BACKEND_API_URL:
    raise ValueError("❌ Error: BOT_TOKEN or BACKEND_API_URL is missing from .env file")
# ==========================================
# 📝 LOGGING SETUP
# ==========================================
# Single console format shared by all module loggers.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)
# ==========================================
# 🧠 REPORT FORMATTER (JSON -> HTML)
# ==========================================
def _source_display(src: str) -> str:
    """Return a compact display name for an evidence source.

    URLs are shortened to their domain; anything else is returned as-is.
    `src` is expected to be HTML-escaped already.
    """
    if not src.startswith('http'):
        return src
    try:
        return urlparse(src).netloc or "Link"
    except ValueError:
        return "Link"


def _format_evidence(title: str, items) -> str:
    """Render one evidence section (supporting or opposing) as HTML lines."""
    if not items:
        return ""
    lines = [f"<b>{title}</b>\n"]
    for ev in items:
        src = html.escape(str(ev.get('source', 'Unknown')))
        summ = html.escape(str(ev.get('summary', '')))
        lines.append(f"• {summ} <i>({_source_display(src)})</i>\n")
    return "".join(lines)


def format_analysis_report(data):
    """
    Convert the backend analysis JSON into a readable HTML Telegram message.

    Parameters
    ----------
    data : dict
        Backend response; recognized keys include "tag", "overall_summary",
        "source_credibility_summary", "analyzed_claims" and
        "reverse_image_search_data". Missing keys are tolerated.

    Returns
    -------
    str
        HTML-formatted report text, or a generic error message if the data
        cannot be formatted.
    """
    try:
        # --- HEADER ---
        tag = str(data.get("tag", "Analysis"))
        overall_summary = str(data.get("overall_summary", "No summary provided."))
        source_cred_list = data.get("source_credibility_summary", [])

        # Verdict icon: green for verified/true, red for misinfo, amber otherwise.
        tag_lower = tag.lower()
        if "true" in tag_lower or "verified" in tag_lower:
            icon = "🟢"
        elif "false" in tag_lower or "misinfo" in tag_lower or "fake" in tag_lower:
            icon = "🔴"
        else:
            icon = "⚠️"

        message = "<b>🚨 VERIFACT ANALYSIS REPORT</b>\n"
        message += "━━━━━━━━━━━━━━━━━━━\n"
        message += f"<b>Result:</b> {icon} <b>{html.escape(tag.upper())}</b>\n"

        # Average source credibility across all numerically-scored sources.
        scores = []
        for item in source_cred_list:
            if isinstance(item, dict) and 'credibility_score' in item:
                try:
                    scores.append(int(item['credibility_score']))
                except (ValueError, TypeError):
                    pass  # unscored / malformed entries are simply skipped
        if scores:
            avg_score = int(sum(scores) / len(scores))
            if avg_score >= 80:
                cred_label = "High"
            elif avg_score >= 60:
                cred_label = "Moderate"
            else:
                cred_label = "Low"
            message += f"<b>Source Credibility:</b> {cred_label} ({avg_score}%)\n"

        message += "\n<b>📝 Summary:</b>\n"
        message += f"<i>{html.escape(overall_summary)}</i>\n\n"

        # --- CLAIMS ANALYSIS ---
        claims = data.get("analyzed_claims", [])
        if claims:
            message += "<b>🔍 CLAIMS ANALYSIS</b>\n"
            message += "━━━━━━━━━━━━━━━━━━━\n"
            for i, claim in enumerate(claims, 1):
                claim_text = str(claim.get("claim_text", "N/A"))
                conclusion = str(claim.get("conclusion", "N/A"))
                message += f"<b>{i}️⃣ Claim:</b> \"{html.escape(claim_text)}\"\n"
                message += f"<b>💡 Conclusion:</b> {html.escape(conclusion)}\n"
                message += _format_evidence("✅ Supporting Evidence:",
                                            claim.get("supporting_evidence", []))
                message += _format_evidence("❌ Opposing Evidence:",
                                            claim.get("opposing_evidence", []))
                message += "\n"

        # --- FACT CHECKS (deduplicated by URL) ---
        all_fact_checks = []
        for claim in claims:
            all_fact_checks.extend(claim.get("fact_checking_results", []))
        # Filter out "None" URLs or empty results.
        valid_fact_checks = [fc for fc in all_fact_checks
                             if fc.get('url') and fc.get('url') != "None"]
        if valid_fact_checks:
            message += "<b>🔗 FACT CHECKS</b>\n"
            seen_urls = set()
            for fc in valid_fact_checks:
                url = fc.get('url', '#')
                if url in seen_urls:
                    continue
                seen_urls.add(url)
                # Prefer the declared source name; fall back to the URL's domain.
                source = fc.get('source', 'Fact Check')
                if source == 'Fact Check' and url != '#':
                    try:
                        source = urlparse(url).netloc or source
                    except ValueError:
                        pass
                # Escape the URL as an attribute: a quote inside it would
                # otherwise break out of the <a> tag.
                message += (f"• <a href=\"{html.escape(url, quote=True)}\">"
                            f"{html.escape(source)}</a>\n")
            message += "\n"

        # --- SOURCE CREDIBILITY DETAILS ---
        if source_cred_list:
            message += "<b>🛡️ SOURCE CREDIBILITY</b>\n"
            for item in source_cred_list[:5]:  # limit to top 5 to avoid spam
                if not isinstance(item, dict):
                    continue
                url = item.get('url', '')
                score = item.get('credibility_score', 'N/A')
                category = item.get('category', 'Unknown')
                domain = "Unknown Source"
                if url:
                    try:
                        domain = urlparse(url).netloc or url
                    except ValueError:
                        domain = url
                message += (f"• <b>{html.escape(str(domain))}</b>: "
                            f"{html.escape(str(category))} ({score})\n")
            message += "\n"

        # --- REVERSE IMAGE SEARCH (optional section) ---
        ris = data.get("reverse_image_search_data")
        if ris:
            ris_summary = ris.get("summary", "")
            matched = ris.get("matched_links", [])
            if ris_summary or matched:
                message += "<b>🖼️ IMAGE ANALYSIS</b>\n"
                if ris_summary:
                    message += f"{html.escape(ris_summary)}\n"
                for match in matched[:3]:
                    domain = html.escape(str(match.get('domain', 'Link')))
                    url = match.get('url', '#')
                    date = html.escape(str(match.get('date', '')))
                    message += (f"• <a href=\"{html.escape(url, quote=True)}\">"
                                f"{domain}</a> ({date})\n")

        message += "\n<i>🤖 Analysis generated by Verifact</i>"
        return message
    except Exception as e:
        logger.error(f"Formatting Error: {e}")
        return "⚠️ <b>Format Error:</b> Data received, but could not be displayed properly."
# ==========================================
# 📡 BACKEND CONNECTOR
# ==========================================
async def query_backend_pipeline(form_data):
    """
    POST multipart form data (text + optional files) to the Cloud Run backend.

    Returns the decoded JSON body on HTTP 200; returns None for any other
    status, a timeout, or a connection failure (all are logged).
    """
    headers = {"Authorization": f"Bearer {API_KEY}"} if API_KEY else {}
    timeout = aiohttp.ClientTimeout(total=60)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        try:
            async with session.post(BACKEND_API_URL, data=form_data, headers=headers) as response:
                if response.status != 200:
                    error_text = await response.text()
                    logger.error(f"Backend Error {response.status}: {error_text}")
                    return None
                return await response.json()
        except Exception as e:
            logger.error(f"Connection Error: {e}")
            return None
# ==========================================
# 🎮 BOT HANDLERS
# ==========================================
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply to /start with a short HTML welcome message."""
    chat_id = update.effective_chat.id
    welcome_text = (
        "👋 <b>Verifact Forwarding Bot</b>\n\n"
        "I am connected to the misinformation analysis pipeline.\n"
        "Forward me any <b>Text</b> or <b>Image</b> to verify it."
    )
    await context.bot.send_message(chat_id=chat_id, text=welcome_text, parse_mode='HTML')
async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Forward an incoming text message to the backend and edit in the verdict."""
    chat_id = update.effective_chat.id
    await context.bot.send_chat_action(chat_id=chat_id, action=constants.ChatAction.TYPING)
    status_msg = await context.bot.send_message(
        chat_id=chat_id,
        text="📡 <i>Verifact is analyzing text...</i>",
        parse_mode='HTML'
    )

    payload = aiohttp.FormData()
    payload.add_field('text', update.message.text)
    payload.add_field('source', 'Telegram')

    json_response = await query_backend_pipeline(payload)
    if not json_response:
        # Backend unreachable / non-200 / timeout: replace the status message.
        await context.bot.edit_message_text(
            chat_id=chat_id,
            message_id=status_msg.message_id,
            text="⚠️ <b>System Error:</b> The pipeline is currently unreachable or timed out.",
            parse_mode='HTML'
        )
        return

    await context.bot.edit_message_text(
        chat_id=chat_id,
        message_id=status_msg.message_id,
        text=format_analysis_report(json_response),
        parse_mode='HTML',
        disable_web_page_preview=True
    )
async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Download the incoming photo, forward it to the backend, report the result."""
    chat_id = update.effective_chat.id
    await context.bot.send_chat_action(chat_id=chat_id, action=constants.ChatAction.UPLOAD_PHOTO)
    status_msg = await context.bot.send_message(
        chat_id=chat_id,
        text="📡 <i>Downloading media & analyzing...</i>",
        parse_mode='HTML'
    )
    try:
        # Telegram lists photo sizes smallest-first; take the largest.
        largest = update.message.photo[-1]
        tg_file = await context.bot.get_file(largest.file_id)
        buffer = io.BytesIO()
        await tg_file.download_to_memory(out=buffer)
        buffer.seek(0)

        payload = aiohttp.FormData()
        caption_text = update.message.caption if update.message.caption else "Image analysis request"
        payload.add_field('text', caption_text)
        payload.add_field('source', 'Telegram')
        payload.add_field('file', buffer, filename='telegram_image.jpg', content_type='image/jpeg')

        json_response = await query_backend_pipeline(payload)
        if json_response:
            await context.bot.edit_message_text(
                chat_id=chat_id,
                message_id=status_msg.message_id,
                text=format_analysis_report(json_response),
                parse_mode='HTML',
                disable_web_page_preview=True
            )
        else:
            await context.bot.edit_message_text(
                chat_id=chat_id,
                message_id=status_msg.message_id,
                text="⚠️ <b>Error:</b> Analysis failed or timed out.",
                parse_mode='HTML'
            )
    except Exception as e:
        logger.error(f"Image Handler Error: {e}")
        await context.bot.edit_message_text(
            chat_id=chat_id,
            message_id=status_msg.message_id,
            text="❌ <b>Error:</b> Could not process the image file.",
            parse_mode='HTML'
        )
# ==========================================
# 🛠️ DIAGNOSTICS
# ==========================================
def check_network():
    """Log DNS resolution results for the services the bot depends on.

    Purely diagnostic: every failure is logged, never raised, so startup
    proceeds even when a host does not resolve.
    """
    logger.info("--- NETWORK DIAGNOSTICS ---")
    targets = [
        ("Telegram API", "api.telegram.org"),
        ("Google", "google.com"),
    ]
    # Add the backend host when the configured URL yields a hostname.
    # (was a bare `except:`, which would also swallow KeyboardInterrupt)
    try:
        if BACKEND_API_URL:
            backend_host = urlparse(BACKEND_API_URL).netloc
            if backend_host:  # skip empty netloc: gethostbyname("") always fails
                targets.append(("Backend API", backend_host))
    except ValueError:
        pass  # malformed URL: skip the backend check rather than crash
    for name, host in targets:
        try:
            ip = socket.gethostbyname(host)
            logger.info(f"✅ {name} ({host}) resolved to {ip}")
        except socket.gaierror as e:
            logger.error(f"❌ {name} ({host}) DNS FAILURE: {e}")
        except Exception as e:
            logger.error(f"❌ {name} ({host}) Unexpected Error: {e}")
    logger.info("---------------------------")
# ==========================================
# 🚀 MAIN RUNNER
# ==========================================
if __name__ == '__main__':
    check_network()

    from telegram.request import HTTPXRequest
    # Use a robust request object with longer timeouts for flaky networks.
    trequest = HTTPXRequest(connection_pool_size=8, read_timeout=20.0,
                            write_timeout=20.0, connect_timeout=20.0)
    application = ApplicationBuilder().token(BOT_TOKEN).request(trequest).build()

    # /start is the only command handled; plain text and photos are analyzed.
    application.add_handler(MessageHandler(filters.COMMAND & filters.Regex(r'^/start$'), start))
    application.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle_text))
    application.add_handler(MessageHandler(filters.PHOTO, handle_photo))

    # (f-string prefixes with no placeholders removed)
    print("✅ Bot is running.")
    print(f"🔗 Connected to Backend: {BACKEND_API_URL}")
    application.run_polling()