import logging
import os
import io
import html
import aiohttp
import socket
from urllib.parse import urlparse
from dotenv import load_dotenv # <--- NEW IMPORT
from telegram import Update, constants
from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, filters
# ==========================================
# ⚙️ CONFIGURATION
# ==========================================
# 1. Load environment variables from the .env file (no-op if the file is absent)
load_dotenv()

# 2. Retrieve values.
# .strip() + .replace() scrub stray whitespace and quote characters that
# often sneak into .env values (e.g. BOT_TOKEN="abc123").
BOT_TOKEN = os.getenv("BOT_TOKEN", "").strip().replace('"', '').replace("'", "")
BACKEND_API_URL = os.getenv("EXTERNAL_ANALYSIS_API_URL", "").strip().replace('"', '').replace("'", "")
API_KEY = os.getenv("API_KEY")  # optional; sent as a Bearer token when set

# Fail fast at import time if critical vars are missing
if not BOT_TOKEN or not BACKEND_API_URL:
    raise ValueError("❌ Error: BOT_TOKEN or BACKEND_API_URL is missing from .env file")

# ==========================================
# 📝 LOGGING SETUP
# ==========================================
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO
)
logger = logging.getLogger(__name__)
# ==========================================
# 🧠 REPORT FORMATTER (JSON -> HTML)
# ==========================================
def _display_domain(source):
    """Shorten *source* for display: URLs collapse to their domain.

    Non-URL strings are returned unchanged; a URL that fails to parse
    falls back to the generic label "Link".
    """
    if not source.startswith('http'):
        return source
    try:
        return urlparse(source).netloc
    except ValueError:
        return "Link"


def _evidence_section(evidence, header):
    """Render one evidence list (supporting or opposing) as bullet lines.

    Returns "" when *evidence* is empty so the section is omitted entirely.
    """
    if not evidence:
        return ""
    section = header
    for ev in evidence:
        src = html.escape(ev.get('source', 'Unknown'))
        summ = html.escape(ev.get('summary', ''))
        section += f"• {summ} ({_display_domain(src)})\n"
    return section


def _avg_credibility_line(source_cred_list):
    """Average the numeric credibility scores into a single summary line.

    Non-dict items and non-numeric scores are skipped; returns "" when no
    usable score exists.
    """
    total_score = 0
    count = 0
    for item in source_cred_list:
        if isinstance(item, dict) and 'credibility_score' in item:
            try:
                total_score += int(item['credibility_score'])
                count += 1
            except (ValueError, TypeError):
                pass  # ignore malformed scores rather than abort the report
    if count == 0:
        return ""
    avg_score = int(total_score / count)
    if avg_score >= 80:
        cred_label = "High"
    elif avg_score >= 60:
        cred_label = "Moderate"
    else:
        cred_label = "Low"
    return f"Source Credibility: {cred_label} ({avg_score}%)\n"


def format_analysis_report(data):
    """
    Convert the backend analysis JSON into a readable, HTML-escaped
    Telegram message.

    Args:
        data: dict decoded from the backend response. Every key is
            optional; missing sections are simply omitted.

    Returns:
        str: the formatted report, or a short fallback string if the
        payload cannot be rendered (the error is logged).
    """
    try:
        # --- HEADER ---
        tag = data.get("tag", "Analysis")
        overall_summary = data.get("overall_summary", "No summary provided.")
        source_cred_list = data.get("source_credibility_summary", [])

        # Verdict icon keyed off keywords in the tag
        tag_lower = tag.lower()
        if "true" in tag_lower or "verified" in tag_lower:
            icon = "🟢"
        elif "false" in tag_lower or "misinfo" in tag_lower or "fake" in tag_lower:
            icon = "🔴"
        else:
            icon = "⚠️"

        message = "🚨 VERIFACT ANALYSIS REPORT\n"
        message += "━━━━━━━━━━━━━━━━━━━\n"
        message += f"Result: {icon} {html.escape(tag.upper())}\n"
        message += _avg_credibility_line(source_cred_list)
        message += "\n📝 Summary:\n"
        message += f"{html.escape(overall_summary)}\n\n"

        # --- CLAIMS ANALYSIS ---
        claims = data.get("analyzed_claims", [])
        if claims:
            message += "🔍 CLAIMS ANALYSIS\n"
            message += "━━━━━━━━━━━━━━━━━━━\n"
            for i, claim in enumerate(claims, 1):
                claim_text = claim.get("claim_text", "N/A")
                conclusion = claim.get("conclusion", "N/A")
                message += f"{i}️⃣ Claim: \"{html.escape(claim_text)}\"\n"
                message += f"💡 Conclusion: {html.escape(conclusion)}\n"
                message += _evidence_section(claim.get("supporting_evidence", []),
                                             "✅ Supporting Evidence:\n")
                message += _evidence_section(claim.get("opposing_evidence", []),
                                             "❌ Opposing Evidence:\n")
                message += "\n"

        # --- FACT CHECKS ---
        all_fact_checks = []
        for claim in claims:
            all_fact_checks.extend(claim.get("fact_checking_results", []))
        # Filter out "None" URLs or empty results
        valid_fact_checks = [fc for fc in all_fact_checks
                             if fc.get('url') and fc.get('url') != "None"]
        if valid_fact_checks:
            message += "🔗 FACT CHECKS\n"
            seen_urls = set()
            for fc in valid_fact_checks:
                url = fc.get('url', '#')
                if url in seen_urls:
                    continue  # de-duplicate by URL
                seen_urls.add(url)
                # Use the source name if available, else fall back to the domain
                source = fc.get('source', 'Fact Check')
                if source == 'Fact Check' and url != '#':
                    try:
                        source = urlparse(url).netloc
                    except ValueError:
                        pass
                message += f"• {html.escape(source)}\n"
            message += "\n"

        # --- SOURCE CREDIBILITY DETAILS ---
        if source_cred_list:
            message += "🛡️ SOURCE CREDIBILITY\n"
            for item in source_cred_list[:5]:  # cap at 5 to avoid spam
                url = item.get('url', '')
                score = item.get('credibility_score', 'N/A')
                category = item.get('category', 'Unknown')
                domain = "Unknown Source"
                if url:
                    try:
                        domain = urlparse(url).netloc
                    except ValueError:
                        domain = url
                # Escape backend-supplied values: the message is sent with
                # parse_mode='HTML', so raw <, >, & would break the whole edit.
                message += (f"• {html.escape(str(domain))}: "
                            f"{html.escape(str(category))} ({html.escape(str(score))})\n")
            message += "\n"

        # --- REVERSE IMAGE SEARCH (optional) ---
        ris = data.get("reverse_image_search_data")
        if ris:
            ris_summary = ris.get("summary", "")
            matched = ris.get("matched_links", [])
            if ris_summary or matched:
                message += "🖼️ IMAGE ANALYSIS\n"
                if ris_summary:
                    message += f"{html.escape(ris_summary)}\n"
                for match in matched[:3]:  # top 3 matches only
                    domain = html.escape(match.get('domain', 'Link'))
                    date = html.escape(match.get('date', ''))
                    message += f"• {domain} ({date})\n"

        message += "\n🤖 Analysis generated by Verifact"
        return message
    except Exception as e:
        # Formatter is a rendering boundary: never let a malformed payload
        # crash the handler — log and degrade to a generic notice.
        logger.error("Formatting Error: %s", e)
        return "⚠️ Format Error: Data received, but could not be displayed properly."
# ==========================================
# 📡 BACKEND CONNECTOR
# ==========================================
async def query_backend_pipeline(form_data):
    """POST multipart form data (text and/or files) to the backend API.

    Returns the decoded JSON payload on HTTP 200, or None on any failure
    (non-200 status, timeout, connection error); failures are logged.
    """
    headers = {"Authorization": f"Bearer {API_KEY}"} if API_KEY else {}
    timeout = aiohttp.ClientTimeout(total=60)

    async with aiohttp.ClientSession(timeout=timeout) as session:
        try:
            async with session.post(BACKEND_API_URL, data=form_data, headers=headers) as response:
                if response.status != 200:
                    error_text = await response.text()
                    logger.error(f"Backend Error {response.status}: {error_text}")
                    return None
                return await response.json()
        except Exception as e:
            logger.error(f"Connection Error: {e}")
            return None
# ==========================================
# 🎮 BOT HANDLERS
# ==========================================
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply to /start with a short usage message."""
    await context.bot.send_message(
        chat_id=update.effective_chat.id,
        text=(
            "👋 Verifact Forwarding Bot\n\n"
            "I am connected to the misinformation analysis pipeline.\n"
            "Forward me any Text or Image to verify it."
        ),
        parse_mode='HTML',
    )
async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Forward an incoming text message to the backend and report the result.

    Sends a placeholder status message immediately, then edits it in place
    with either the formatted analysis report or an error notice.
    """
    user_text = update.message.text
    await context.bot.send_chat_action(chat_id=update.effective_chat.id, action=constants.ChatAction.TYPING)
    status_msg = await context.bot.send_message(
        chat_id=update.effective_chat.id,
        text="📡 Verifact is analyzing text...",
        parse_mode='HTML'
    )

    # Backend expects multipart form data, mirroring the photo handler.
    data = aiohttp.FormData()
    data.add_field('text', user_text)
    data.add_field('source', 'Telegram')

    json_response = await query_backend_pipeline(data)
    if json_response:
        report = format_analysis_report(json_response)
        # Telegram rejects messages longer than MAX_TEXT_LENGTH (4096 chars);
        # truncate oversized reports so the edit doesn't fail with BadRequest.
        limit = constants.MessageLimit.MAX_TEXT_LENGTH
        if len(report) > limit:
            report = report[:limit - 1] + "…"
        await context.bot.edit_message_text(
            chat_id=update.effective_chat.id,
            message_id=status_msg.message_id,
            text=report,
            parse_mode='HTML',
            disable_web_page_preview=True
        )
    else:
        await context.bot.edit_message_text(
            chat_id=update.effective_chat.id,
            message_id=status_msg.message_id,
            text="⚠️ System Error: The pipeline is currently unreachable or timed out.",
            parse_mode='HTML'
        )
async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Download an incoming photo, forward it to the backend, report the result.

    The photo (plus its caption, if any) is sent as multipart form data;
    the placeholder status message is edited in place with the outcome.
    """
    await context.bot.send_chat_action(chat_id=update.effective_chat.id, action=constants.ChatAction.UPLOAD_PHOTO)
    status_msg = await context.bot.send_message(
        chat_id=update.effective_chat.id,
        text="📡 Downloading media & analyzing...",
        parse_mode='HTML'
    )
    try:
        # Telegram lists renditions smallest-first; take the highest resolution.
        photo = update.message.photo[-1]
        file_obj = await context.bot.get_file(photo.file_id)
        f_memory = io.BytesIO()
        await file_obj.download_to_memory(out=f_memory)
        f_memory.seek(0)

        data = aiohttp.FormData()
        caption_text = update.message.caption if update.message.caption else "Image analysis request"
        data.add_field('text', caption_text)
        data.add_field('source', 'Telegram')
        data.add_field('file', f_memory, filename='telegram_image.jpg', content_type='image/jpeg')

        json_response = await query_backend_pipeline(data)
        if json_response:
            report = format_analysis_report(json_response)
            # Telegram rejects messages longer than MAX_TEXT_LENGTH (4096 chars);
            # truncate oversized reports so the edit doesn't fail with BadRequest.
            limit = constants.MessageLimit.MAX_TEXT_LENGTH
            if len(report) > limit:
                report = report[:limit - 1] + "…"
            await context.bot.edit_message_text(
                chat_id=update.effective_chat.id,
                message_id=status_msg.message_id,
                text=report,
                parse_mode='HTML',
                disable_web_page_preview=True
            )
        else:
            await context.bot.edit_message_text(
                chat_id=update.effective_chat.id,
                message_id=status_msg.message_id,
                text="⚠️ Error: Analysis failed or timed out.",
                parse_mode='HTML'
            )
    except Exception as e:
        logger.error(f"Image Handler Error: {e}")
        await context.bot.edit_message_text(
            chat_id=update.effective_chat.id,
            message_id=status_msg.message_id,
            text="❌ Error: Could not process the image file.",
            parse_mode='HTML'
        )
# ==========================================
# 🛠️ DIAGNOSTICS
# ==========================================
def check_network():
    """Log whether the hostnames this bot depends on resolve via DNS.

    Purely diagnostic: every failure is logged, nothing is raised, so a
    broken network never prevents startup from continuing.
    """
    logger.info("--- NETWORK DIAGNOSTICS ---")
    targets = [
        ("Telegram API", "api.telegram.org"),
        ("Google", "google.com"),
    ]
    # Add the backend host if it can be extracted from the configured URL.
    # Narrow except (was a bare `except:` that also swallowed SystemExit /
    # KeyboardInterrupt); skip empty netloc so we don't probe "".
    if BACKEND_API_URL:
        try:
            backend_host = urlparse(BACKEND_API_URL).netloc
        except ValueError:
            backend_host = ""
        if backend_host:
            targets.append(("Backend API", backend_host))
    for name, host in targets:
        try:
            ip = socket.gethostbyname(host)
            logger.info(f"✅ {name} ({host}) resolved to {ip}")
        except socket.gaierror as e:
            logger.error(f"❌ {name} ({host}) DNS FAILURE: {e}")
        except Exception as e:
            logger.error(f"❌ {name} ({host}) Unexpected Error: {e}")
    logger.info("---------------------------")
# ==========================================
# 🚀 MAIN RUNNER
# ==========================================
if __name__ == '__main__':
    # Surface DNS/connectivity problems in the logs before polling starts.
    check_network()

    from telegram.request import HTTPXRequest

    # Generous timeouts keep the bot resilient on slow connections.
    request_cfg = HTTPXRequest(
        connection_pool_size=8,
        read_timeout=20.0,
        write_timeout=20.0,
        connect_timeout=20.0,
    )
    application = ApplicationBuilder().token(BOT_TOKEN).request(request_cfg).build()

    application.add_handler(MessageHandler(filters.COMMAND & filters.Regex(r'^/start$'), start))
    application.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle_text))
    application.add_handler(MessageHandler(filters.PHOTO, handle_photo))

    print("✅ Bot is running.")
    print(f"🔗 Connected to Backend: {BACKEND_API_URL}")
    application.run_polling()