import asyncio
import logging
from datetime import datetime

from aiogram import Bot
from aiogram.exceptions import (
    TelegramAPIError,
    TelegramForbiddenError,
    TelegramRetryAfter,
)
from sqlalchemy import select

from db.models import User
from db.session import async_session
from services.fx_service import fx_service
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Flag emoji shown next to each supported currency code. The insertion order
# of this mapping is also the order in which currencies are listed.
FLAGS = {
    "USD": "🇺🇸",
    "EUR": "🇪🇺",
    "GBP": "🇬🇧",
    "AUD": "🇦🇺",
    "JPY": "🇯🇵",
    "AED": "🇦🇪",
    "SAR": "🇸🇦",
    "INR": "🇮🇳",
    "CNY": "🇨🇳",
    "QAR": "🇶🇦",
}

# Supported currency codes, in display order (same order as FLAGS).
CURRENCIES = list(FLAGS)
def format_rates_list(rates: dict) -> str:
    """Build the text of the daily LKR exchange-rate message.

    Args:
        rates: Mapping of currency code to its rate in LKR. A missing or
            falsy entry is rendered as "Scraper Offline".

    Returns:
        The message as a single newline-joined string: a title and the
        current local date, one line per entry in CURRENCIES, and a footer.
    """
    today = datetime.now().strftime('%Y-%m-%d')
    header = [
        "🌍 Daily LKR Exchange Rates 🌍",
        f"📅 Date: {today}",
        "",
    ]
    footer = [
        "",
        "🔄 Rates are sourced directly from the Central Bank of Sri Lanka (CBSL).",
        "❌ Type /unsubscribe to opt-out of these daily updates.",
    ]

    def render(code):
        # One line per currency; the generic icon covers codes without a flag.
        value = rates.get(code)
        icon = FLAGS.get(code, "💱")
        quote = f"{value:.2f} LKR" if value else "Scraper Offline"
        return f"{icon} 1 {code} = {quote}"

    body = [render(code) for code in CURRENCIES]
    return "\n".join(header + body + footer)
async def _fetch_lkr_rates() -> dict:
    """Fetch the LKR rate of every currency in CURRENCIES concurrently.

    A lookup that raises is logged and stored as ``None`` so that a single
    failing currency does not abort the whole broadcast.
    """
    outcomes = await asyncio.gather(
        *(fx_service.get_rate(cur, "LKR") for cur in CURRENCIES),
        return_exceptions=True,
    )
    rates = {}
    for cur, outcome in zip(CURRENCIES, outcomes):
        if isinstance(outcome, BaseException):
            logger.error("Failed to fetch %s/LKR rate: %r", cur, outcome)
            rates[cur] = None
        else:
            rates[cur] = outcome
    return rates


async def _send_broadcast(bot: Bot, chat_id, text: str) -> None:
    """Send ``text`` to one chat, retrying once after a flood-control delay."""
    try:
        await bot.send_message(chat_id=chat_id, text=text, parse_mode="HTML")
    except TelegramRetryAfter as exc:
        # Telegram asked us to slow down: wait as instructed, then retry once.
        # A second failure propagates to the caller's handlers.
        logger.warning(
            "Flood control hit for %s; retrying in %s seconds.",
            chat_id,
            exc.retry_after,
        )
        await asyncio.sleep(exc.retry_after)
        await bot.send_message(chat_id=chat_id, text=text, parse_mode="HTML")


async def _unsubscribe(chat_id) -> None:
    """Set ``is_subscribed = False`` for the user with ``chat_id``, if any."""
    # A short-lived session per update: a failed commit cannot poison the
    # session used for later recipients.
    async with async_session() as session:
        found = await session.execute(select(User).where(User.chat_id == chat_id))
        user = found.scalar_one_or_none()
        if user:
            user.is_subscribed = False
            await session.commit()


async def process_daily_broadcast(bot: Bot):
    """Send the daily exchange-rate message to every subscribed user.

    Steps: fetch all rates, format the message, load the chat ids of users
    with ``is_subscribed`` set, then send the message to each chat in turn.
    Users who have blocked the bot are unsubscribed. A failure for one
    recipient is logged and never stops delivery to the others.

    Args:
        bot: The aiogram bot used to send the messages.
    """
    logger.info("Starting daily exchange rate broadcast...")

    # 1. Fetch all exchange rates concurrently
    rates = await _fetch_lkr_rates()

    # 2. Format the message
    broadcast_message = format_rates_list(rates)

    # 3. Retrieve all active subscribers. The session is closed before any
    #    message is sent so it is not held open during Telegram network calls.
    async with async_session() as session:
        result = await session.execute(
            select(User.chat_id).where(User.is_subscribed == True)  # noqa: E712
        )
        subscribers = result.scalars().all()

    if not subscribers:
        logger.info("No active subscribers found for daily broadcast.")
        return

    logger.info("Broadcasting to %d active subscribers...", len(subscribers))

    # 4. Deliver updates
    sent = 0
    failed = 0
    for chat_id in subscribers:
        try:
            await _send_broadcast(bot, chat_id, broadcast_message)
        except TelegramForbiddenError:
            # User blocked the bot - deactivate subscription to save future API limits
            failed += 1
            logger.warning(
                "User %s has blocked the bot. Opting them out of daily updates...",
                chat_id,
            )
            try:
                await _unsubscribe(chat_id)
            except Exception:
                # A database failure must not stop the remaining deliveries.
                logger.exception("Could not unsubscribe blocked user %s", chat_id)
        except TelegramAPIError as e:
            failed += 1
            logger.error("Telegram API error when broadcasting to %s: %s", chat_id, e)
        except Exception as e:
            failed += 1
            logger.exception("Unexpected error when broadcasting to %s: %s", chat_id, e)
        else:
            sent += 1
            logger.info("Daily broadcast sent successfully to chat_id: %s", chat_id)

    logger.info(
        "Daily exchange rate broadcast completed: %d sent, %d failed.",
        sent,
        failed,
    )