import asyncio import logging import socket import aiohttp from contextlib import asynccontextmanager from fastapi import FastAPI, Request, Response from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse from aiogram import Bot, Dispatcher, types from aiogram.client.telegram import TelegramAPIServer from aiogram.client.session.aiohttp import AiohttpSession from apscheduler.schedulers.asyncio import AsyncIOScheduler from sqlalchemy import select, func # Monkeypatch socket.getaddrinfo to bypass DNS resolution blocks for api.telegram.org original_getaddrinfo = socket.getaddrinfo def custom_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0): if host == "api.telegram.org": # Map to Telegram Bot API native IPv6 and IPv4 IPs return [ (socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('2001:67c:4e8:f004::9', port, 0, 0)), (socket.AF_INET, socket.SOCK_STREAM, 6, '', ('149.154.166.110', port)) ] return original_getaddrinfo(host, port, family, type, proto, flags) socket.getaddrinfo = custom_getaddrinfo from core.config import settings from db.session import init_db, async_session from db.models import User from bot.handlers import router as bot_router from bot.scheduler import process_daily_broadcast from services.fx_service import fx_service # Configure Logging logging.basicConfig(level=getattr(logging, settings.log_level)) logger = logging.getLogger(__name__) # InMemoryLogHandler to store logs for web diagnostics class InMemoryLogHandler(logging.Handler): def __init__(self, limit=300): super().__init__() self.limit = limit self.records = [] def emit(self, record): try: self.records.append(self.format(record)) if len(self.records) > self.limit: self.records.pop(0) except Exception: pass in_memory_logs = InMemoryLogHandler() in_memory_logs.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) logging.getLogger().addHandler(in_memory_logs) logging.getLogger("uvicorn").addHandler(in_memory_logs) logging.getLogger("uvicorn.error").addHandler(in_memory_logs) logging.getLogger("uvicorn.access").addHandler(in_memory_logs) logging.getLogger("aiogram").addHandler(in_memory_logs) # Global diagnostic results storage diagnostic_results = [] # Initialize Aiogram Bot (with custom reverse proxy API server if configured) if settings.telegram_api_server: api_server = TelegramAPIServer.from_base(settings.telegram_api_server) session = AiohttpSession(api=api_server) bot = Bot(token=settings.bot_token, session=session) logger.info(f"Initialized Bot with custom API server: {settings.telegram_api_server}") else: bot = Bot(token=settings.bot_token) dp = Dispatcher() dp.include_router(bot_router) # Initialize Scheduler scheduler = AsyncIOScheduler() @asynccontextmanager async def lifespan(app: FastAPI): global diagnostic_results diagnostic_results.append("Starting resolution & connection diagnostics...") # Check DNS resolution for host in ["api.telegram.org", "www.cbsl.gov.lk"]: try: ips = socket.getaddrinfo(host, 443) diagnostic_results.append(f"DNS Resolution for {host}: {ips}") except Exception as e: diagnostic_results.append(f"DNS Resolution failed for {host}: {type(e).__name__}: {e}") # Try different socket configurations configs = [ ("Default (No family forced)", {}), ("Forced IPv4 (AF_INET)", {"family": socket.AF_INET}), ("Forced IPv6 (AF_INET6)", {"family": socket.AF_INET6}) ] for name, kwargs in configs: try: connector = aiohttp.TCPConnector(**kwargs) async with aiohttp.ClientSession(connector=connector) as sess: async with sess.get(f"https://api.telegram.org/bot{settings.bot_token}/getMe", timeout=5) as r: text = await r.text() diagnostic_results.append(f"Configuration '{name}' SUCCESS: Status {r.status}, body: {text}") except Exception as e: diagnostic_results.append(f"Configuration '{name}' FAILED: {type(e).__name__}: {e}") diagnostic_results.append("Diagnostics finished.") # 1. Initialize Database try: await init_db() logger.info("Database initialized successfully.") except Exception as e: logger.error(f"CRITICAL: Failed to initialize database on startup. The database may be paused or unreachable: {e}") diagnostic_results.append(f"DB Initialization FAILED: {type(e).__name__}: {e}") # Register Bot Menu Commands try: commands = [ types.BotCommand(command="start", description="Start FlyRates & subscribe to daily rates"), types.BotCommand(command="current", description="Fetch live LKR exchange rates list instantly"), types.BotCommand(command="history", description="View weekly trends with visual sparkline graphs"), types.BotCommand(command="unsubscribe", description="Opt-out of daily LKR rates broadcast"), types.BotCommand(command="help", description="Show help guide and commands reference") ] await bot.set_my_commands(commands) logger.info("Bot commands registered successfully.") except Exception as e: logger.error(f"Failed to register bot commands: {e}") # 3. Setup Webhook (if URL is provided) or Fallback to Polling try: if settings.webhook_url: webhook_info = await bot.get_webhook_info() if webhook_info.url != f"{settings.webhook_url}/webhook": await bot.set_webhook(url=f"{settings.webhook_url}/webhook") logger.info(f"Webhook set to {settings.webhook_url}/webhook") else: logger.info("WEBHOOK_URL not set. Running in local mode. Checking active webhook...") webhook_info = await bot.get_webhook_info() if webhook_info.url: if settings.delete_webhook_on_local: logger.warning(f"Active webhook detected at '{webhook_info.url}'. Deleting it as delete_webhook_on_local is enabled...") await bot.delete_webhook(drop_pending_updates=False) polling_task = asyncio.create_task(dp.start_polling(bot)) app.state.polling_task = polling_task else: logger.warning( f"Active webhook detected at '{webhook_info.url}'. " "To protect the production bot, local long polling will NOT delete it, and polling is skipped. " "If you wish to test locally and delete the webhook, set DELETE_WEBHOOK_ON_LOCAL=True in your local .env file." ) else: logger.info("No active webhook. Starting Bot in Long Polling mode...") polling_task = asyncio.create_task(dp.start_polling(bot)) app.state.polling_task = polling_task except Exception as e: logger.error(f"Failed to setup Telegram interaction during startup: {e}") # 3. Setup Scheduled Jobs (exactly one clean daily broadcast at 8:00 AM) scheduler.add_job(process_daily_broadcast, 'cron', hour=8, args=[bot]) scheduler.start() yield # Shutdown logic scheduler.shutdown() if not settings.webhook_url: if hasattr(app.state, "polling_task"): app.state.polling_task.cancel() try: await app.state.polling_task except asyncio.CancelledError: pass await bot.session.close() app = FastAPI(lifespan=lifespan) @app.post("/webhook") async def telegram_webhook(request: Request): """Endpoint for Telegram to send updates to.""" try: payload = await request.json() logger.info(f"Incoming webhook payload: {payload}") update = types.Update.model_validate(payload, context={"bot": bot}) await dp.feed_update(bot, update) except Exception as e: logger.exception("CRITICAL: Error processing webhook update:") raise e return Response(status_code=200) @app.get("/api/diagnostics") async def get_diagnostics(): """Retrieve connection and network resolution diagnostic logs.""" import os env_info = { "DATABASE_URL_set": bool(os.getenv("DATABASE_URL")), "WEBHOOK_URL_val": os.getenv("WEBHOOK_URL"), "TELEGRAM_API_SERVER_val": os.getenv("TELEGRAM_API_SERVER"), "BOT_TOKEN_set": bool(os.getenv("BOT_TOKEN")), "FX_API_KEY_set": bool(os.getenv("FX_API_KEY")) } return { "diagnostics": diagnostic_results, "env_info": env_info } @app.get("/api/history") async def get_rates_history(days: int = 30): """Retrieve historical exchange rates for all currencies over the past N days.""" from db.models import ExchangeRateHistory from db.session import async_session from datetime import datetime, timezone, timedelta cutoff = datetime.now(timezone.utc) - timedelta(days=days) async with async_session() as session: try: # Query history since cutoff, sorted by timestamp ascending result = await session.execute( select(ExchangeRateHistory) .where(ExchangeRateHistory.timestamp >= cutoff.replace(tzinfo=None)) .order_by(ExchangeRateHistory.timestamp.asc()) ) records = result.scalars().all() # Format as: { "USD": [{"date": "2026-05-18", "rate": 300.5}, ...], "EUR": [...] } history = {} for rec in records: cur = rec.currency if cur not in history: history[cur] = [] history[cur].append({ "date": rec.timestamp.strftime("%Y-%m-%d"), "rate": rec.rate_to_lkr }) return history except Exception as e: logger.error(f"Error fetching rate history from database: {e}") return {"error": "Failed to fetch rate history"} @app.get("/health") async def health_check(): """Health check endpoint required by Hugging Face Spaces.""" return {"status": "ok"} @app.get("/api/logs") async def get_logs(): """Retrieve in-memory logs for diagnostics.""" return {"logs": in_memory_logs.records} @app.get("/api/stats") async def get_system_stats(): """Retrieve database metrics and live LKR exchange rates.""" stats = {} try: async with async_session() as session: # Gather active subscribers count users_count = await session.scalar(select(func.count(User.chat_id)).where(User.is_subscribed == True)) stats["subscribers"] = users_count or 0 stats["active_subscriptions"] = 10 # 10 core LKR currencies tracked stats["active_thresholds"] = 1 # Unified active broadcast schedule stats["db_status"] = "connected" except Exception as e: logger.error(f"Error fetching stats from DB: {e}") stats["subscribers"] = 0 stats["active_subscriptions"] = 0 stats["active_thresholds"] = 0 stats["db_status"] = "error" # Fetch live exchange rates to LKR concurrently rates = {} currencies = ["USD", "EUR", "GBP", "AUD", "JPY", "AED", "SAR", "INR", "CNY", "QAR"] async def fetch_rate_safe(cur: str): try: val = await fx_service.get_rate(cur, "LKR") return cur, val or 0.0 except Exception as e: logger.error(f"Error fetching live rate for {cur}: {e}") return cur, 0.0 results = await asyncio.gather(*(fetch_rate_safe(cur) for cur in currencies)) for cur, rate in results: rates[cur] = rate stats["rates"] = rates stats["system_status"] = "online" return stats # Serve static dashboard app.mount("/static", StaticFiles(directory="static"), name="static") @app.get("/") async def get_dashboard(): """Serves the premium dashboard index file.""" return FileResponse("static/index.html")