File size: 12,320 Bytes
72fdc64 7803d4e 4b7e1b4 5cceeb9 7803d4e 370fc07 7803d4e e4759e7 32a47e0 7803d4e 370fc07 7803d4e 5cceeb9 f370663 5cceeb9 7803d4e 370fc07 fec8feb 7803d4e fec8feb 370fc07 7803d4e d65b565 7f642b4 d65b565 28fd032 e4759e7 32a47e0 e4759e7 7803d4e 28fd032 754db9c f53ddba 754db9c 28fd032 754db9c 28fd032 754db9c 5b9bc06 28fd032 754db9c 28fd032 754db9c 7803d4e 8ae4c35 7803d4e ffdcbc6 93d598b 61207aa 93d598b ffdcbc6 8ae4c35 6b640dc d24d9cd 6b640dc 8ae4c35 7803d4e fec8feb 7803d4e 853c8a4 8ae4c35 7803d4e 7f642b4 7803d4e 28fd032 dc02f8a 28fd032 61207aa 7803d4e 370fc07 d65b565 370fc07 fec8feb 370fc07 fec8feb 370fc07 72fdc64 370fc07 61207aa 72fdc64 370fc07 72fdc64 370fc07 72fdc64 370fc07 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 | import asyncio
import logging
import socket
import aiohttp
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, Response
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from aiogram import Bot, Dispatcher, types
from aiogram.client.telegram import TelegramAPIServer
from aiogram.client.session.aiohttp import AiohttpSession
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from sqlalchemy import select, func
# Monkeypatch socket.getaddrinfo to bypass DNS resolution blocks for api.telegram.org
original_getaddrinfo = socket.getaddrinfo


def custom_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
    """Resolve ``api.telegram.org`` to pinned addresses; delegate all other hosts.

    The signature mirrors ``socket.getaddrinfo`` (including the ``type``
    parameter name, which shadows the builtin) because this function replaces
    it and callers may pass these arguments by keyword.

    Args:
        host: Host name being resolved.
        port: Port placed into the returned socket addresses.
        family: Requested address family; ``0`` (``AF_UNSPEC``) means any.
        type, proto, flags: Forwarded unchanged for non-Telegram hosts.

    Returns:
        A list of ``(family, type, proto, canonname, sockaddr)`` tuples in the
        same shape as ``socket.getaddrinfo``.

    Raises:
        socket.gaierror: If ``family`` is neither unspecified, ``AF_INET`` nor
            ``AF_INET6`` for the pinned host.
    """
    if host != "api.telegram.org":
        return original_getaddrinfo(host, port, family, type, proto, flags)

    # Map to Telegram Bot API native IPv6 and IPv4 IPs
    pinned = [
        (socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('2001:67c:4e8:f004::9', port, 0, 0)),
        (socket.AF_INET, socket.SOCK_STREAM, 6, '', ('149.154.166.110', port)),
    ]
    if family == socket.AF_UNSPEC:
        return pinned

    # Honour an explicit family so callers that force IPv4 or IPv6 are not
    # handed an address of the other family.
    matching = [entry for entry in pinned if entry[0] == family]
    if not matching:
        raise socket.gaierror(
            socket.EAI_FAMILY, f"Address family {family!r} not supported for {host}"
        )
    return matching


socket.getaddrinfo = custom_getaddrinfo
from core.config import settings
from db.session import init_db, async_session
from db.models import User
from bot.handlers import router as bot_router
from bot.scheduler import process_daily_broadcast
from services.fx_service import fx_service
# Configure Logging
# Accept the configured level case-insensitively (e.g. "info") or as a numeric
# level. A lowercase name would otherwise resolve to a logging *function*
# (logging.info) and make basicConfig fail; unknown names fall back to INFO.
_log_level = settings.log_level
if isinstance(_log_level, str):
    _log_level = getattr(logging, _log_level.upper(), None)
if not isinstance(_log_level, int):
    _log_level = logging.INFO
logging.basicConfig(level=_log_level)
logger = logging.getLogger(__name__)
# InMemoryLogHandler to store logs for web diagnostics
class InMemoryLogHandler(logging.Handler):
    """Logging handler that keeps the most recent formatted records in memory.

    Formatted lines are stored oldest-first in ``records`` (a plain list);
    once more than ``limit`` lines are held, the oldest are discarded.
    """

    def __init__(self, limit=300):
        """Create the handler.

        Args:
            limit: Maximum number of formatted log lines to retain.
        """
        super().__init__()
        self.limit = limit
        self.records = []

    def emit(self, record):
        """Format *record*, store it, and trim the buffer to ``limit`` lines."""
        try:
            self.records.append(self.format(record))
            excess = len(self.records) - self.limit
            if excess > 0:
                # Drop from the front so the newest lines are the ones kept.
                del self.records[:excess]
        except Exception:
            # Standard logging protocol: report the failure via handleError
            # (honours logging.raiseExceptions) instead of hiding it.
            self.handleError(record)
# One shared in-memory handler, attached to the root logger and to the
# uvicorn / aiogram loggers so their output can be read back later.
in_memory_logs = InMemoryLogHandler()
in_memory_logs.setFormatter(
    logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
)
# None selects the root logger.
for _logger_name in (None, "uvicorn", "uvicorn.error", "uvicorn.access", "aiogram"):
    logging.getLogger(_logger_name).addHandler(in_memory_logs)
# Startup diagnostics are appended here and read back by the diagnostics endpoint.
diagnostic_results = []

# Build the aiogram Bot. When a custom Telegram API server (reverse proxy) is
# configured, route the bot's HTTP session through it; otherwise use defaults.
if not settings.telegram_api_server:
    bot = Bot(token=settings.bot_token)
else:
    api_server = TelegramAPIServer.from_base(settings.telegram_api_server)
    session = AiohttpSession(api=api_server)
    bot = Bot(token=settings.bot_token, session=session)
    logger.info(f"Initialized Bot with custom API server: {settings.telegram_api_server}")

# Dispatcher with the bot's handler router attached.
dp = Dispatcher()
dp.include_router(bot_router)

# Scheduler for periodic jobs (started during application startup).
scheduler = AsyncIOScheduler()
async def _run_network_diagnostics():
    """Append DNS and HTTPS reachability results to ``diagnostic_results``."""
    diagnostic_results.append("Starting resolution & connection diagnostics...")
    loop = asyncio.get_running_loop()

    # Check DNS resolution
    for host in ["api.telegram.org", "www.cbsl.gov.lk"]:
        try:
            # getaddrinfo blocks; run it in the default executor so a slow
            # resolver cannot stall the event loop during startup.
            ips = await loop.run_in_executor(None, socket.getaddrinfo, host, 443)
            diagnostic_results.append(f"DNS Resolution for {host}: {ips}")
        except Exception as e:
            diagnostic_results.append(f"DNS Resolution failed for {host}: {type(e).__name__}: {e}")

    # Try different socket configurations
    configs = [
        ("Default (No family forced)", {}),
        ("Forced IPv4 (AF_INET)", {"family": socket.AF_INET}),
        ("Forced IPv6 (AF_INET6)", {"family": socket.AF_INET6}),
    ]
    # aiohttp expects a ClientTimeout object; bare numbers are deprecated.
    probe_timeout = aiohttp.ClientTimeout(total=5)
    for name, kwargs in configs:
        try:
            connector = aiohttp.TCPConnector(**kwargs)
            async with aiohttp.ClientSession(connector=connector) as sess:
                async with sess.get(
                    f"https://api.telegram.org/bot{settings.bot_token}/getMe",
                    timeout=probe_timeout,
                ) as r:
                    text = await r.text()
                    diagnostic_results.append(f"Configuration '{name}' SUCCESS: Status {r.status}, body: {text}")
        except Exception as e:
            diagnostic_results.append(f"Configuration '{name}' FAILED: {type(e).__name__}: {e}")
    diagnostic_results.append("Diagnostics finished.")


async def _register_bot_commands():
    """Publish the bot's command menu; failures are logged, not raised."""
    try:
        commands = [
            types.BotCommand(command="start", description="Start FlyRates & subscribe to daily rates"),
            types.BotCommand(command="current", description="Fetch live LKR exchange rates list instantly"),
            types.BotCommand(command="history", description="View weekly trends with visual sparkline graphs"),
            types.BotCommand(command="unsubscribe", description="Opt-out of daily LKR rates broadcast"),
            types.BotCommand(command="help", description="Show help guide and commands reference"),
        ]
        await bot.set_my_commands(commands)
        logger.info("Bot commands registered successfully.")
    except Exception as e:
        logger.error(f"Failed to register bot commands: {e}")


def _start_polling(app):
    """Start long polling as a background task stored on ``app.state.polling_task``."""
    # handle_signals=False: polling runs inside the web server's event loop,
    # so it must not install its own SIGINT/SIGTERM handlers; the lifespan
    # shutdown below cancels this task explicitly.
    app.state.polling_task = asyncio.create_task(
        dp.start_polling(bot, handle_signals=False)
    )


async def _configure_update_delivery(app):
    """Set the webhook when a URL is configured, otherwise fall back to polling.

    Any failure is logged and swallowed so the web application still starts.
    """
    try:
        if settings.webhook_url:
            # Strip a trailing slash so the target never becomes ".../​/webhook".
            target = f"{str(settings.webhook_url).rstrip('/')}/webhook"
            webhook_info = await bot.get_webhook_info()
            if webhook_info.url != target:
                await bot.set_webhook(url=target)
                logger.info(f"Webhook set to {target}")
            return

        logger.info("WEBHOOK_URL not set. Running in local mode. Checking active webhook...")
        webhook_info = await bot.get_webhook_info()
        if not webhook_info.url:
            logger.info("No active webhook. Starting Bot in Long Polling mode...")
            _start_polling(app)
        elif settings.delete_webhook_on_local:
            logger.warning(f"Active webhook detected at '{webhook_info.url}'. Deleting it as delete_webhook_on_local is enabled...")
            await bot.delete_webhook(drop_pending_updates=False)
            _start_polling(app)
        else:
            logger.warning(
                f"Active webhook detected at '{webhook_info.url}'. "
                "To protect the production bot, local long polling will NOT delete it, and polling is skipped. "
                "If you wish to test locally and delete the webhook, set DELETE_WEBHOOK_ON_LOCAL=True in your local .env file."
            )
    except Exception as e:
        logger.error(f"Failed to setup Telegram interaction during startup: {e}")


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run startup tasks before the app serves requests and cleanup afterwards.

    Startup: network diagnostics, database initialisation, bot command
    registration, webhook/polling setup, and the daily broadcast job.
    Shutdown: stop the scheduler, cancel polling if it was started, and close
    the bot's HTTP session. Startup failures of the individual Telegram and
    database steps are logged and do not prevent the app from starting.
    """
    await _run_network_diagnostics()

    # 1. Initialize Database
    try:
        await init_db()
        logger.info("Database initialized successfully.")
    except Exception as e:
        logger.error(f"CRITICAL: Failed to initialize database on startup. The database may be paused or unreachable: {e}")
        diagnostic_results.append(f"DB Initialization FAILED: {type(e).__name__}: {e}")

    # 2. Register Bot Menu Commands
    await _register_bot_commands()

    # 3. Setup Webhook (if URL is provided) or Fallback to Polling
    await _configure_update_delivery(app)

    # 4. Setup Scheduled Jobs (exactly one clean daily broadcast at 8:00 AM)
    scheduler.add_job(process_daily_broadcast, 'cron', hour=8, args=[bot])
    scheduler.start()

    try:
        yield
    finally:
        # Shutdown logic runs even if the application exits with an error.
        scheduler.shutdown()
        polling_task = getattr(app.state, "polling_task", None)
        if polling_task is not None:
            polling_task.cancel()
            try:
                await polling_task
            except asyncio.CancelledError:
                pass
            except Exception as e:
                # A polling task that already failed must not block the
                # remaining cleanup.
                logger.error(f"Polling task ended with an error: {e}")
        await bot.session.close()
# Application instance; `lifespan` supplies the startup and shutdown logic.
app = FastAPI(lifespan=lifespan)
@app.post("/webhook")
async def telegram_webhook(request: Request):
    """Endpoint for Telegram to send updates to.

    Parses the JSON body into an aiogram ``Update`` and feeds it to the
    dispatcher.

    Returns:
        An empty HTTP 200 response once the update has been processed.

    Raises:
        Exception: Any parsing or handler error is logged with its traceback
            and re-raised.
    """
    try:
        payload = await request.json()
        # Keep message contents out of INFO logs: the root logger feeds the
        # in-memory buffer that this application serves over HTTP.
        update_id = payload.get("update_id") if isinstance(payload, dict) else None
        logger.info("Incoming webhook update_id=%s", update_id)
        logger.debug("Incoming webhook payload: %s", payload)
        update = types.Update.model_validate(payload, context={"bot": bot})
        await dp.feed_update(bot, update)
    except Exception:
        logger.exception("CRITICAL: Error processing webhook update:")
        raise
    return Response(status_code=200)
@app.get("/api/diagnostics")
async def get_diagnostics():
    """Return the recorded startup diagnostics plus a summary of key env vars.

    For DATABASE_URL, BOT_TOKEN and FX_API_KEY only a set/unset flag is
    reported; WEBHOOK_URL and TELEGRAM_API_SERVER are returned by value.
    """
    import os

    environ = os.environ
    return {
        "diagnostics": diagnostic_results,
        "env_info": {
            "DATABASE_URL_set": bool(environ.get("DATABASE_URL")),
            "WEBHOOK_URL_val": environ.get("WEBHOOK_URL"),
            "TELEGRAM_API_SERVER_val": environ.get("TELEGRAM_API_SERVER"),
            "BOT_TOKEN_set": bool(environ.get("BOT_TOKEN")),
            "FX_API_KEY_set": bool(environ.get("FX_API_KEY")),
        },
    }
@app.get("/api/history")
async def get_rates_history(days: int = 30):
    """Retrieve historical exchange rates for all currencies over the past N days.

    Args:
        days: Size of the look-back window in days. Values are clamped to the
            range 0..3650 so extreme inputs cannot overflow the date
            arithmetic.

    Returns:
        A mapping of currency code to a chronologically ordered list of
        ``{"date": "YYYY-MM-DD", "rate": <rate_to_lkr>}`` entries, or
        ``{"error": ...}`` if the query fails.
    """
    from db.models import ExchangeRateHistory
    from db.session import async_session
    from datetime import datetime, timezone, timedelta

    max_history_days = 3650
    days = max(0, min(days, max_history_days))

    # The timestamp column is compared against a naive UTC value.
    cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).replace(tzinfo=None)

    async with async_session() as session:
        try:
            # Query history since cutoff, sorted by timestamp ascending
            result = await session.execute(
                select(ExchangeRateHistory)
                .where(ExchangeRateHistory.timestamp >= cutoff)
                .order_by(ExchangeRateHistory.timestamp.asc())
            )
            records = result.scalars().all()

            # Format as: { "USD": [{"date": "2026-05-18", "rate": 300.5}, ...], "EUR": [...] }
            history = {}
            for rec in records:
                history.setdefault(rec.currency, []).append({
                    "date": rec.timestamp.strftime("%Y-%m-%d"),
                    "rate": rec.rate_to_lkr,
                })
            return history
        except Exception as e:
            logger.error(f"Error fetching rate history from database: {e}")
            return {"error": "Failed to fetch rate history"}
@app.get("/health")
async def health_check():
    """Liveness probe returning a constant OK payload.

    Per the original author's note, Hugging Face Spaces requires this endpoint.
    """
    payload = {"status": "ok"}
    return payload
@app.get("/api/logs")
async def get_logs():
    """Return the log lines currently held by the in-memory handler."""
    captured_lines = in_memory_logs.records
    return {"logs": captured_lines}
@app.get("/api/stats")
async def get_system_stats():
    """Report subscriber metrics from the database and live rates to LKR.

    A database failure is logged and reported as zeroed counters with
    ``db_status`` set to ``"error"``; a failed rate lookup is logged and
    reported as ``0.0`` for that currency.
    """
    try:
        async with async_session() as db:
            # Count users that are currently subscribed.
            subscriber_total = await db.scalar(
                select(func.count(User.chat_id)).where(User.is_subscribed == True)
            )
    except Exception as e:
        logger.error(f"Error fetching stats from DB: {e}")
        stats = {
            "subscribers": 0,
            "active_subscriptions": 0,
            "active_thresholds": 0,
            "db_status": "error",
        }
    else:
        stats = {
            "subscribers": subscriber_total or 0,
            "active_subscriptions": 10,  # 10 core LKR currencies tracked
            "active_thresholds": 1,  # Unified active broadcast schedule
            "db_status": "connected",
        }

    async def rate_or_zero(code: str):
        """Return ``(code, rate)``, substituting 0.0 when the lookup fails or is empty."""
        try:
            quoted = await fx_service.get_rate(code, "LKR")
        except Exception as e:
            logger.error(f"Error fetching live rate for {code}: {e}")
            return code, 0.0
        return code, quoted or 0.0

    # Look up every tracked currency concurrently.
    tracked = ["USD", "EUR", "GBP", "AUD", "JPY", "AED", "SAR", "INR", "CNY", "QAR"]
    pairs = await asyncio.gather(*map(rate_or_zero, tracked))

    stats["rates"] = dict(pairs)
    stats["system_status"] = "online"
    return stats
# Serve static dashboard assets: files in the local "static" directory are
# exposed under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
@app.get("/")
async def get_dashboard():
    """Serve the dashboard's index page from the static directory."""
    index_page = "static/index.html"
    return FileResponse(index_page)
|