File size: 3,589 Bytes
7803d4e fec8feb 7803d4e fec8feb 7803d4e fec8feb 7803d4e fec8feb 7803d4e fec8feb 7803d4e fec8feb 7803d4e fec8feb 7803d4e fec8feb | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 | import logging
import asyncio
from datetime import datetime
from aiogram import Bot
from aiogram.exceptions import TelegramForbiddenError, TelegramAPIError
from db.session import async_session
from db.models import User
from sqlalchemy import select
from services.fx_service import fx_service
logger = logging.getLogger(__name__)
CURRENCIES = ["USD", "EUR", "GBP", "AUD", "JPY", "AED", "SAR", "INR", "CNY", "QAR"]
FLAGS = {
"USD": "๐บ๐ธ",
"EUR": "๐ช๐บ",
"GBP": "๐ฌ๐ง",
"AUD": "๐ฆ๐บ",
"JPY": "๐ฏ๐ต",
"AED": "๐ฆ๐ช",
"SAR": "๐ธ๐ฆ",
"INR": "๐ฎ๐ณ",
"CNY": "๐จ๐ณ",
"QAR": "๐ถ๐ฆ"
}
def format_rates_list(rates: dict) -> str:
"""Formats the rates dictionary into a beautifully styled HTML message."""
lines = [
"๐ <b>Daily LKR Exchange Rates</b> ๐",
f"๐
<i>Date: {datetime.now().strftime('%Y-%m-%d')}</i>",
""
]
for cur in CURRENCIES:
rate = rates.get(cur)
flag = FLAGS.get(cur, "๐ฑ")
if rate:
lines.append(f"{flag} <b>1 {cur}</b> = {rate:.2f} LKR")
else:
lines.append(f"{flag} <b>1 {cur}</b> = <i>Scraper Offline</i>")
lines.extend([
"",
"๐ <i>Rates are sourced directly from the Central Bank of Sri Lanka (CBSL).</i>",
"โ Type /unsubscribe to opt-out of these daily updates."
])
return "\n".join(lines)
async def process_daily_broadcast(bot: Bot):
"""Processes the scheduled daily broadcast to all subscribed users."""
logger.info("Starting daily exchange rate broadcast...")
# 1. Fetch all exchange rates concurrently
tasks = [fx_service.get_rate(cur, "LKR") for cur in CURRENCIES]
rates_values = await asyncio.gather(*tasks)
rates = dict(zip(CURRENCIES, rates_values))
# 2. Format the message
broadcast_message = format_rates_list(rates)
# 3. Retrieve all active subscribers
async with async_session() as session:
result = await session.execute(select(User.chat_id).where(User.is_subscribed == True))
subscribers = result.scalars().all()
if not subscribers:
logger.info("No active subscribers found for daily broadcast.")
return
logger.info(f"Broadcasting to {len(subscribers)} active subscribers...")
# 4. Deliver updates
for chat_id in subscribers:
try:
await bot.send_message(
chat_id=chat_id,
text=broadcast_message,
parse_mode="HTML"
)
logger.info(f"Daily broadcast sent successfully to chat_id: {chat_id}")
except TelegramForbiddenError:
# User blocked the bot - deactivate subscription to save future API limits
logger.warning(f"User {chat_id} has blocked the bot. Opting them out of daily updates...")
user_res = await session.execute(select(User).where(User.chat_id == chat_id))
user_obj = user_res.scalar_one_or_none()
if user_obj:
user_obj.is_subscribed = False
await session.commit()
except TelegramAPIError as e:
logger.error(f"Telegram API error when broadcasting to {chat_id}: {e}")
except Exception as e:
logger.error(f"Unexpected error when broadcasting to {chat_id}: {e}")
logger.info("Daily exchange rate broadcast completed successfully.")
|