File size: 3,589 Bytes
7803d4e
fec8feb
 
7803d4e
fec8feb
7803d4e
fec8feb
7803d4e
 
 
 
 
fec8feb
7803d4e
fec8feb
 
 
 
 
 
 
 
 
 
 
 
7803d4e
fec8feb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7803d4e
fec8feb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7803d4e
fec8feb
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import logging
import asyncio
from datetime import datetime
from aiogram import Bot
from aiogram.exceptions import TelegramForbiddenError, TelegramAPIError
from db.session import async_session
from db.models import User
from sqlalchemy import select
from services.fx_service import fx_service

logger = logging.getLogger(__name__)

CURRENCIES = ["USD", "EUR", "GBP", "AUD", "JPY", "AED", "SAR", "INR", "CNY", "QAR"]

FLAGS = {
    "USD": "๐Ÿ‡บ๐Ÿ‡ธ",
    "EUR": "๐Ÿ‡ช๐Ÿ‡บ",
    "GBP": "๐Ÿ‡ฌ๐Ÿ‡ง",
    "AUD": "๐Ÿ‡ฆ๐Ÿ‡บ",
    "JPY": "๐Ÿ‡ฏ๐Ÿ‡ต",
    "AED": "๐Ÿ‡ฆ๐Ÿ‡ช",
    "SAR": "๐Ÿ‡ธ๐Ÿ‡ฆ",
    "INR": "๐Ÿ‡ฎ๐Ÿ‡ณ",
    "CNY": "๐Ÿ‡จ๐Ÿ‡ณ",
    "QAR": "๐Ÿ‡ถ๐Ÿ‡ฆ"
}

def format_rates_list(rates: dict) -> str:
    """Formats the rates dictionary into a beautifully styled HTML message."""
    lines = [
        "๐ŸŒ <b>Daily LKR Exchange Rates</b> ๐ŸŒ",
        f"๐Ÿ“… <i>Date: {datetime.now().strftime('%Y-%m-%d')}</i>",
        ""
    ]
    for cur in CURRENCIES:
        rate = rates.get(cur)
        flag = FLAGS.get(cur, "๐Ÿ’ฑ")
        if rate:
            lines.append(f"{flag} <b>1 {cur}</b> = {rate:.2f} LKR")
        else:
            lines.append(f"{flag} <b>1 {cur}</b> = <i>Scraper Offline</i>")
            
    lines.extend([
        "",
        "๐Ÿ”„ <i>Rates are sourced directly from the Central Bank of Sri Lanka (CBSL).</i>",
        "โŒ Type /unsubscribe to opt-out of these daily updates."
    ])
    return "\n".join(lines)

async def process_daily_broadcast(bot: Bot):
    """Processes the scheduled daily broadcast to all subscribed users."""
    logger.info("Starting daily exchange rate broadcast...")
    
    # 1. Fetch all exchange rates concurrently
    tasks = [fx_service.get_rate(cur, "LKR") for cur in CURRENCIES]
    rates_values = await asyncio.gather(*tasks)
    rates = dict(zip(CURRENCIES, rates_values))
    
    # 2. Format the message
    broadcast_message = format_rates_list(rates)
    
    # 3. Retrieve all active subscribers
    async with async_session() as session:
        result = await session.execute(select(User.chat_id).where(User.is_subscribed == True))
        subscribers = result.scalars().all()
        
        if not subscribers:
            logger.info("No active subscribers found for daily broadcast.")
            return
            
        logger.info(f"Broadcasting to {len(subscribers)} active subscribers...")
        
        # 4. Deliver updates
        for chat_id in subscribers:
            try:
                await bot.send_message(
                    chat_id=chat_id,
                    text=broadcast_message,
                    parse_mode="HTML"
                )
                logger.info(f"Daily broadcast sent successfully to chat_id: {chat_id}")
            except TelegramForbiddenError:
                # User blocked the bot - deactivate subscription to save future API limits
                logger.warning(f"User {chat_id} has blocked the bot. Opting them out of daily updates...")
                user_res = await session.execute(select(User).where(User.chat_id == chat_id))
                user_obj = user_res.scalar_one_or_none()
                if user_obj:
                    user_obj.is_subscribed = False
                    await session.commit()
            except TelegramAPIError as e:
                logger.error(f"Telegram API error when broadcasting to {chat_id}: {e}")
            except Exception as e:
                logger.error(f"Unexpected error when broadcasting to {chat_id}: {e}")
                
    logger.info("Daily exchange rate broadcast completed successfully.")