"""FastAPI entry point for the FlyRates Telegram bot.

Wires together the aiogram bot (webhook or long polling), the daily
broadcast scheduler, and the JSON endpoints backing the static dashboard.
"""
import asyncio
import logging
import socket
import aiohttp
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, Response
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from aiogram import Bot, Dispatcher, types
from aiogram.client.telegram import TelegramAPIServer
from aiogram.client.session.aiohttp import AiohttpSession
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from sqlalchemy import select, func

# Monkeypatch socket.getaddrinfo to bypass DNS resolution blocks for api.telegram.org
original_getaddrinfo = socket.getaddrinfo


def custom_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
    """Resolve ``api.telegram.org`` to pinned addresses; delegate everything else.

    Drop-in replacement for :func:`socket.getaddrinfo` with the same
    parameters (``type`` shadows the builtin on purpose so keyword callers
    keep working).

    Args:
        host: Host name being resolved.
        port: Port copied verbatim into the returned socket addresses.
        family: Requested address family; ``0`` (``AF_UNSPEC``) means any.
        type: Forwarded to the original resolver only.
        proto: Forwarded to the original resolver only.
        flags: Forwarded to the original resolver only.

    Returns:
        A list of ``(family, type, proto, canonname, sockaddr)`` tuples.
    """
    if host == "api.telegram.org":
        # Map to Telegram Bot API native IPv6 and IPv4 IPs
        pinned = [
            (socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('2001:67c:4e8:f004::9', port, 0, 0)),
            (socket.AF_INET, socket.SOCK_STREAM, 6, '', ('149.154.166.110', port)),
        ]
        # Honor an explicit family request so a caller that forces IPv4 (or
        # IPv6) is not handed an address of the other family.
        matching = [entry for entry in pinned if family in (socket.AF_UNSPEC, entry[0])]
        if matching:
            return matching
        # No pinned address for this family: let the real resolver answer
        # (or raise its usual error) instead of inventing a result.
    return original_getaddrinfo(host, port, family, type, proto, flags)


socket.getaddrinfo = custom_getaddrinfo

from core.config import settings
from db.session import init_db, async_session
from db.models import User
from bot.handlers import router as bot_router
from bot.scheduler import process_daily_broadcast
from services.fx_service import fx_service

# Configure Logging
# The root level is looked up by attribute name on the ``logging`` module, so
# ``settings.log_level`` has to be a level constant's name such as "INFO".
# An unknown name raises AttributeError at import time; a lowercase name like
# "info" resolves to the ``logging.info`` function rather than a level.
logging.basicConfig(level=getattr(logging, settings.log_level))
# Module-level logger used by the startup hook and the HTTP endpoints below.
logger = logging.getLogger(__name__)

# InMemoryLogHandler to store logs for web diagnostics
class InMemoryLogHandler(logging.Handler):
    """Logging handler that keeps the most recent formatted records in memory.

    ``limit`` is the maximum number of entries retained and ``records`` is the
    list of formatted log lines, oldest first. Both are plain public
    attributes so other code can read them directly.
    """

    def __init__(self, limit=300):
        """Create an empty buffer that retains at most ``limit`` formatted lines."""
        super().__init__()
        self.limit = limit
        self.records = []

    def emit(self, record):
        """Format ``record``, append it, and evict the oldest overflow entries.

        Formatting failures are handed to ``handleError`` (the standard
        ``logging.Handler`` hook) instead of being silently discarded, so a
        broken log call is reported on stderr while logging itself never
        raises into the caller.
        """
        try:
            self.records.append(self.format(record))
            overflow = len(self.records) - self.limit
            if overflow > 0:
                # Trim in place so references to ``records`` held elsewhere
                # keep pointing at the live list.
                del self.records[:overflow]
        except Exception:
            self.handleError(record)

# One shared buffer handler; its ``records`` list is what /api/logs returns.
in_memory_logs = InMemoryLogHandler()
in_memory_logs.setFormatter(
    logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
)
# Attach to the root logger (None) first, then to each named logger.
for _captured_logger in (None, "uvicorn", "uvicorn.error", "uvicorn.access", "aiogram"):
    logging.getLogger(_captured_logger).addHandler(in_memory_logs)
del _captured_logger

# Global diagnostic results storage: human-readable lines appended during
# application startup and returned verbatim by GET /api/diagnostics.
diagnostic_results = []

# Initialize Aiogram Bot (with custom reverse proxy API server if configured)
if settings.telegram_api_server:
    # Route Bot API calls through the configured base URL instead of the default.
    api_server = TelegramAPIServer.from_base(settings.telegram_api_server)
    session = AiohttpSession(api=api_server)
    bot = Bot(token=settings.bot_token, session=session)
    logger.info(f"Initialized Bot with custom API server: {settings.telegram_api_server}")
else:
    # No override configured: let aiogram create its default session.
    bot = Bot(token=settings.bot_token)
# Dispatcher that receives updates from either the webhook endpoint or polling.
dp = Dispatcher()
dp.include_router(bot_router)

# Initialize Scheduler (jobs are added and the scheduler started in lifespan)
scheduler = AsyncIOScheduler()

async def _run_network_diagnostics():
    """Probe DNS and HTTPS reachability, appending findings to ``diagnostic_results``.

    Every probe is best-effort: failures are recorded as text, never raised.
    """
    diagnostic_results.append("Starting resolution & connection diagnostics...")

    # Check DNS resolution
    loop = asyncio.get_running_loop()
    for host in ["api.telegram.org", "www.cbsl.gov.lk"]:
        try:
            # socket.getaddrinfo blocks; run it on the default executor so a
            # slow resolver cannot stall the event loop during startup.
            ips = await loop.run_in_executor(None, socket.getaddrinfo, host, 443)
            diagnostic_results.append(f"DNS Resolution for {host}: {ips}")
        except Exception as e:
            diagnostic_results.append(f"DNS Resolution failed for {host}: {type(e).__name__}: {e}")

    # Try different socket configurations
    configs = [
        ("Default (No family forced)", {}),
        ("Forced IPv4 (AF_INET)", {"family": socket.AF_INET}),
        ("Forced IPv6 (AF_INET6)", {"family": socket.AF_INET6}),
    ]
    # aiohttp's documented timeout type; a bare number is only accepted for
    # backward compatibility.
    probe_timeout = aiohttp.ClientTimeout(total=5)

    for name, kwargs in configs:
        try:
            connector = aiohttp.TCPConnector(**kwargs)
            async with aiohttp.ClientSession(connector=connector) as sess:
                async with sess.get(
                    f"https://api.telegram.org/bot{settings.bot_token}/getMe",
                    timeout=probe_timeout,
                ) as r:
                    text = await r.text()
                    diagnostic_results.append(f"Configuration '{name}' SUCCESS: Status {r.status}, body: {text}")
        except Exception as e:
            diagnostic_results.append(f"Configuration '{name}' FAILED: {type(e).__name__}: {e}")

    diagnostic_results.append("Diagnostics finished.")


async def _register_bot_commands():
    """Publish the bot's command menu to Telegram; log and continue on failure."""
    try:
        commands = [
            types.BotCommand(command="start", description="Start FlyRates & subscribe to daily rates"),
            types.BotCommand(command="current", description="Fetch live LKR exchange rates list instantly"),
            types.BotCommand(command="history", description="View weekly trends with visual sparkline graphs"),
            types.BotCommand(command="unsubscribe", description="Opt-out of daily LKR rates broadcast"),
            types.BotCommand(command="help", description="Show help guide and commands reference"),
        ]
        await bot.set_my_commands(commands)
        logger.info("Bot commands registered successfully.")
    except Exception as e:
        logger.error(f"Failed to register bot commands: {e}")


def _start_polling(app):
    """Launch long polling as a background task and remember it on ``app.state``."""
    app.state.polling_task = asyncio.create_task(dp.start_polling(bot))


async def _configure_update_delivery(app):
    """Point Telegram at the webhook, or fall back to long polling in local mode.

    With ``settings.webhook_url`` set, the webhook is (re)registered only when
    Telegram reports a different URL. Without it, polling starts unless a
    webhook is active and ``settings.delete_webhook_on_local`` is falsy.
    """
    if settings.webhook_url:
        webhook_info = await bot.get_webhook_info()
        if webhook_info.url != f"{settings.webhook_url}/webhook":
            await bot.set_webhook(url=f"{settings.webhook_url}/webhook")
            logger.info(f"Webhook set to {settings.webhook_url}/webhook")
        return

    logger.info("WEBHOOK_URL not set. Running in local mode. Checking active webhook...")
    webhook_info = await bot.get_webhook_info()
    if not webhook_info.url:
        logger.info("No active webhook. Starting Bot in Long Polling mode...")
        _start_polling(app)
        return

    if not settings.delete_webhook_on_local:
        logger.warning(
            f"Active webhook detected at '{webhook_info.url}'. "
            "To protect the production bot, local long polling will NOT delete it, and polling is skipped. "
            "If you wish to test locally and delete the webhook, set DELETE_WEBHOOK_ON_LOCAL=True in your local .env file."
        )
        return

    logger.warning(f"Active webhook detected at '{webhook_info.url}'. Deleting it as delete_webhook_on_local is enabled...")
    await bot.delete_webhook(drop_pending_updates=False)
    _start_polling(app)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run startup and shutdown work around the application's lifetime.

    Startup: network diagnostics, database initialisation, bot command
    registration, webhook/polling setup, and the daily broadcast job. Each of
    the first four steps logs its own failure and lets startup continue.

    Shutdown: stop the scheduler, cancel the polling task if one was started
    in local mode, and always close the bot's HTTP session.

    Args:
        app: The FastAPI application; ``app.state.polling_task`` is set when
            long polling is started.
    """
    await _run_network_diagnostics()

    # 1. Initialize Database
    try:
        await init_db()
        logger.info("Database initialized successfully.")
    except Exception as e:
        logger.error(f"CRITICAL: Failed to initialize database on startup. The database may be paused or unreachable: {e}")
        diagnostic_results.append(f"DB Initialization FAILED: {type(e).__name__}: {e}")

    # 2. Register Bot Menu Commands
    await _register_bot_commands()

    # 3. Setup Webhook (if URL is provided) or Fallback to Polling
    try:
        await _configure_update_delivery(app)
    except Exception as e:
        logger.error(f"Failed to setup Telegram interaction during startup: {e}")

    # 4. Setup Scheduled Jobs (exactly one clean daily broadcast at 8:00 AM)
    scheduler.add_job(process_daily_broadcast, 'cron', hour=8, args=[bot])
    scheduler.start()

    try:
        yield
    finally:
        # Shutdown logic
        try:
            scheduler.shutdown()
            if not settings.webhook_url and hasattr(app.state, "polling_task"):
                app.state.polling_task.cancel()
                try:
                    await app.state.polling_task
                except asyncio.CancelledError:
                    pass
        finally:
            # Close the HTTP session even if a step above raised, so the
            # underlying connections are not leaked.
            await bot.session.close()


app = FastAPI(lifespan=lifespan)

@app.post("/webhook")
async def telegram_webhook(request: Request):
    """Endpoint for Telegram to send updates to.

    Parses the JSON body into an aiogram ``Update`` and feeds it to the
    dispatcher.

    Returns:
        An empty 200 response once the update has been processed.

    Raises:
        Exception: Any parsing, validation, or handler error is logged with
            its traceback and re-raised unchanged.
    """
    try:
        payload = await request.json()
        # NOTE(review): the full update body is logged at INFO, and the
        # in-memory handler on the root logger makes log lines readable via
        # /api/logs - confirm exposing message content this way is intended.
        logger.info("Incoming webhook payload: %s", payload)
        update = types.Update.model_validate(payload, context={"bot": bot})
        await dp.feed_update(bot, update)
    except Exception:
        logger.exception("CRITICAL: Error processing webhook update:")
        # Bare raise re-raises the active exception with its original traceback.
        raise
    return Response(status_code=200)

@app.get("/api/diagnostics")
async def get_diagnostics():
    """Return the startup diagnostic lines plus a summary of key environment variables.

    Secrets are reported only as booleans (set / not set); the webhook URL and
    custom API server are reported by value.
    """
    import os

    def _is_set(variable):
        # True when the variable exists and is non-empty.
        return bool(os.getenv(variable))

    env_info = dict(
        DATABASE_URL_set=_is_set("DATABASE_URL"),
        WEBHOOK_URL_val=os.getenv("WEBHOOK_URL"),
        TELEGRAM_API_SERVER_val=os.getenv("TELEGRAM_API_SERVER"),
        BOT_TOKEN_set=_is_set("BOT_TOKEN"),
        FX_API_KEY_set=_is_set("FX_API_KEY"),
    )
    return dict(diagnostics=diagnostic_results, env_info=env_info)

@app.get("/api/history")
async def get_rates_history(days: int = 30):
    """Retrieve historical exchange rates for all currencies over the past N days.

    Args:
        days: Size of the look-back window in days. Values beyond +/- 100
            years are clamped so the cutoff date can always be computed.

    Returns:
        A mapping of currency code to a chronologically ordered list of
        ``{"date": "YYYY-MM-DD", "rate": ...}`` entries, or
        ``{"error": "Failed to fetch rate history"}`` if the query fails.
    """
    from db.models import ExchangeRateHistory
    from db.session import async_session
    from datetime import datetime, timezone, timedelta

    # An unbounded query parameter would overflow timedelta/datetime and
    # surface as an unhandled 500; clamping keeps every request answerable.
    max_window_days = 36500
    days = max(-max_window_days, min(days, max_window_days))
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)

    async with async_session() as session:
        try:
            # Query history since cutoff, sorted by timestamp ascending.
            # The cutoff is made naive before comparing with the stored timestamps.
            result = await session.execute(
                select(ExchangeRateHistory)
                .where(ExchangeRateHistory.timestamp >= cutoff.replace(tzinfo=None))
                .order_by(ExchangeRateHistory.timestamp.asc())
            )
            records = result.scalars().all()

            # Format as: { "USD": [{"date": "2026-05-18", "rate": 300.5}, ...], "EUR": [...] }
            history = {}
            for rec in records:
                history.setdefault(rec.currency, []).append({
                    "date": rec.timestamp.strftime("%Y-%m-%d"),
                    "rate": rec.rate_to_lkr,
                })
            return history
        except Exception as e:
            logger.error(f"Error fetching rate history from database: {e}")
            return {"error": "Failed to fetch rate history"}

@app.get("/health")
async def health_check():
    """Liveness probe (required by Hugging Face Spaces); always reports ok."""
    return dict(status="ok")

@app.get("/api/logs")
async def get_logs():
    """Return the formatted log lines currently held by the in-memory handler."""
    buffered_lines = in_memory_logs.records
    return dict(logs=buffered_lines)

@app.get("/api/stats")
async def get_system_stats():
    """Report subscriber metrics from the database together with live LKR rates.

    A database failure is logged and reported as ``db_status: "error"`` with
    zeroed counters; a failed rate lookup is logged and reported as ``0.0``.
    """
    stats = {}
    try:
        async with async_session() as session:
            # Count only users that are currently subscribed.
            subscribed_count_query = select(func.count(User.chat_id)).where(User.is_subscribed == True)
            users_count = await session.scalar(subscribed_count_query)

            stats.update(
                subscribers=users_count or 0,
                active_subscriptions=10,  # 10 core LKR currencies tracked
                active_thresholds=1,      # Unified active broadcast schedule
                db_status="connected",
            )
    except Exception as e:
        logger.error(f"Error fetching stats from DB: {e}")
        stats.update(
            subscribers=0,
            active_subscriptions=0,
            active_thresholds=0,
            db_status="error",
        )

    async def _rate_or_zero(code: str):
        # Never let a single currency failure break the whole response.
        try:
            quote = await fx_service.get_rate(code, "LKR")
            return code, quote or 0.0
        except Exception as e:
            logger.error(f"Error fetching live rate for {code}: {e}")
            return code, 0.0

    # Fetch live exchange rates to LKR concurrently
    tracked_currencies = ("USD", "EUR", "GBP", "AUD", "JPY", "AED", "SAR", "INR", "CNY", "QAR")
    rate_pairs = await asyncio.gather(*map(_rate_or_zero, tracked_currencies))

    stats["rates"] = dict(rate_pairs)
    stats["system_status"] = "online"
    return stats

# Serve static dashboard
# Files under the local "static" directory (a relative path, so it resolves
# against the process working directory) are exposed under the /static prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")

@app.get("/")
async def get_dashboard():
    """Respond to the site root with the dashboard's index page."""
    index_page = "static/index.html"
    return FileResponse(index_page)