import asyncio from datetime import datetime import logging from typing import Dict, List import json logger = logging.getLogger(__name__) class EventMonitor: def __init__(self, bot): self.bot = bot self.monitoring_tasks: Dict[int, asyncio.Task] = {} # user_id -> task self.user_filters: Dict[int, dict] = {} # user_id -> filters self.last_events: Dict[int, List[str]] = {} # user_id -> list of event IDs async def start_monitoring(self, user_id: int, filters: dict): """Start monitoring events for a user with given filters""" if user_id in self.monitoring_tasks: # Stop existing task if any self.monitoring_tasks[user_id].cancel() # Store user filters self.user_filters[user_id] = filters self.last_events[user_id] = [] # Create and start new monitoring task task = asyncio.create_task(self._monitor_loop(user_id)) self.monitoring_tasks[user_id] = task return True async def stop_monitoring(self, user_id: int): """Stop monitoring events for a user""" if user_id in self.monitoring_tasks: self.monitoring_tasks[user_id].cancel() del self.monitoring_tasks[user_id] del self.user_filters[user_id] del self.last_events[user_id] return True return False async def _monitor_loop(self, user_id: int): """Main monitoring loop""" while True: try: filters = self.user_filters[user_id] # Get current events using existing payload method payload = self.bot._get_events_payload(**filters) response = await self.bot._make_request(payload) events = response["data"]["eventCollection"]["items"] # Check for new events current_event_ids = [event['id'] for event in events] new_events = [event for event in events if event['id'] not in self.last_events[user_id]] if new_events: # Send notification for each new event for event in new_events: message = self._format_event_message(event) await self.bot.send_message(user_id, message) # Update last events list self.last_events[user_id] = current_event_ids # Wait for 1 minute before next check await asyncio.sleep(60) except asyncio.CancelledError: break except Exception as e: logger.error(f"Error in monitor loop for user {user_id}: {e}") await asyncio.sleep(60) # Wait before retrying def _format_event_message(self, event: dict) -> str: """Format event details into a message""" message_parts = [ "🎉 *New Event Alert!* 🎉", f"🏆 *{event.get('title', 'N/A')}*", ] if event.get('subtitle'): message_parts.append(f"_{event.get('subtitle')}_") location = event.get('location', {}).get('title', 'N/A') message_parts.append(f"📍 Location: {location}") price = event.get('startingPrice', 'N/A') currency = event.get('currencyCode', 'N/A') message_parts.append(f"💰 Starting Price: {price} {currency}") schedule = event.get('schedule', {}) if schedule.get('openDateTime'): message_parts.append(f"🕒 Starts: {schedule['openDateTime']}") if event.get('ticketingUrlSlug'): message_parts.append( f"[More Info/Tickets](https://webook.com/ar/events/{event['ticketingUrlSlug']})" ) return "\n".join(message_parts)