import asyncio
import json
import logging
from datetime import datetime
from typing import Any, Dict, List


# Module-level logger, named after this module so that log records can be
# filtered/configured per module by the application's logging setup.
logger = logging.getLogger(__name__)


class EventMonitor:
    """Run one background polling task per user and alert them about new events.

    Every monitored user gets an ``asyncio.Task`` that repeatedly asks the bot
    for the events matching that user's filters and sends one message for each
    event id that was not present in the previous poll.

    The ``bot`` object must provide ``_get_events_payload(**filters)``, an
    awaitable ``_make_request(payload)`` and an awaitable
    ``send_message(user_id, message)``.
    """

    def __init__(self, bot, poll_interval: float = 60):
        """Create a monitor bound to ``bot``.

        Args:
            bot: Object used to build requests, fetch events and send messages.
            poll_interval: Seconds to wait between two polls of the same user.
                Defaults to 60, the interval that used to be hard-coded.
        """
        self.bot = bot
        self.poll_interval = poll_interval
        # user_id -> background task running ``_monitor_loop`` for that user
        self.monitoring_tasks: Dict[int, asyncio.Task] = {}
        # user_id -> keyword arguments forwarded to ``bot._get_events_payload``
        self.user_filters: Dict[int, dict] = {}
        # user_id -> event ids already handled (ids of the latest poll)
        self.last_events: Dict[int, List[str]] = {}

    async def start_monitoring(self, user_id: int, filters: dict) -> bool:
        """Start (or restart) monitoring events for a user with given filters.

        An already running task for ``user_id`` is cancelled and replaced. The
        list of seen ids is reset, so the first poll reports every event that
        matches ``filters`` as new.

        Args:
            user_id: User to notify; also the key of all per-user state.
            filters: Keyword arguments passed to ``bot._get_events_payload``.

        Returns:
            Always ``True``.
        """
        previous_task = self.monitoring_tasks.get(user_id)
        if previous_task is not None:
            previous_task.cancel()

        self.user_filters[user_id] = filters
        self.last_events[user_id] = []

        self.monitoring_tasks[user_id] = asyncio.create_task(
            self._monitor_loop(user_id)
        )
        return True

    async def stop_monitoring(self, user_id: int) -> bool:
        """Stop monitoring events for a user and drop the user's state.

        Args:
            user_id: User whose monitoring task should be cancelled.

        Returns:
            ``True`` if a task was registered for ``user_id`` (and has been
            cancelled), ``False`` if the user was not being monitored.
        """
        task = self.monitoring_tasks.pop(user_id, None)
        if task is None:
            return False

        task.cancel()
        # ``pop`` with a default keeps this from raising if the per-user
        # dictionaries ever get out of sync with ``monitoring_tasks``.
        self.user_filters.pop(user_id, None)
        self.last_events.pop(user_id, None)
        return True

    async def _monitor_loop(self, user_id: int) -> None:
        """Poll for ``user_id`` forever, sleeping ``poll_interval`` between polls.

        Errors raised by a poll are logged with their traceback and the loop
        carries on after the usual sleep. Cancellation ends the loop quietly,
        whether it arrives during a poll or during a sleep.
        """
        try:
            while True:
                try:
                    await self._poll_once(user_id)
                except asyncio.CancelledError:
                    # Listed before ``Exception`` on purpose: on Python 3.7
                    # CancelledError is still an Exception subclass and must
                    # not be treated as an ordinary polling error.
                    raise
                except Exception as exc:
                    logger.exception(
                        "Error in monitor loop for user %s: %s", user_id, exc
                    )
                await asyncio.sleep(self.poll_interval)
        except asyncio.CancelledError:
            return

    async def _poll_once(self, user_id: int) -> None:
        """Fetch the current events for ``user_id`` and alert about unseen ones.

        An event id is recorded as seen right after its message has been sent,
        so a failure part-way through a batch does not cause the alerts that
        were already delivered to be repeated on the next poll. An id that
        occurs several times in one response is announced once.

        Raises:
            KeyError: If no filters/seen-list are registered for ``user_id`` or
                the response lacks ``data.eventCollection.items``.
            Exception: Anything raised by the bot while fetching or sending.
        """
        filters = self.user_filters[user_id]
        payload = self.bot._get_events_payload(**filters)
        response = await self.bot._make_request(payload)
        events = response["data"]["eventCollection"]["items"]

        seen_ids = self.last_events[user_id]
        known = set(seen_ids)  # O(1) membership instead of scanning the list

        for event in events:
            event_id = event['id']
            if event_id in known:
                continue
            await self.bot.send_message(
                user_id, self._format_event_message(event)
            )
            # Only reached when the send succeeded (or was not cancelled).
            seen_ids.append(event_id)
            known.add(event_id)

        # Keep only the ids of the latest poll, as before: ids that vanished
        # from the feed are forgotten and would be announced again if they
        # reappear.
        self.last_events[user_id] = [event['id'] for event in events]

    @staticmethod
    def _or_na(value: Any) -> Any:
        """Return ``'N/A'`` for a missing (``None``) value, else the value itself."""
        return 'N/A' if value is None else value

    def _format_event_message(self, event: dict) -> str:
        """Format event details into a message.

        Missing keys and keys whose value is ``None`` are rendered as ``N/A``;
        ``location`` and ``schedule`` may be absent or ``None``. Falsy but real
        values such as a starting price of ``0`` are kept.

        Args:
            event: Event mapping; the keys read are ``title``, ``subtitle``,
                ``location.title``, ``startingPrice``, ``currencyCode``,
                ``schedule.openDateTime`` and ``ticketingUrlSlug``.

        Returns:
            The message lines joined with newlines.
        """
        # NOTE(review): the leading "π" / "π°" glyphs look like mis-decoded
        # emoji; they are kept byte-for-byte here - confirm the intended
        # characters before changing them.
        message_parts = [
            "π *New Event Alert!* π",
            f"π *{self._or_na(event.get('title'))}*",
        ]

        subtitle = event.get('subtitle')
        if subtitle:
            message_parts.append(f"_{subtitle}_")

        # ``or {}`` also covers an explicit ``None`` value, which a plain
        # ``event.get('location', {})`` would pass through and crash on.
        location = event.get('location') or {}
        message_parts.append(
            f"π Location: {self._or_na(location.get('title'))}"
        )

        price = self._or_na(event.get('startingPrice'))
        currency = self._or_na(event.get('currencyCode'))
        message_parts.append(f"π° Starting Price: {price} {currency}")

        schedule = event.get('schedule') or {}
        opens_at = schedule.get('openDateTime')
        if opens_at:
            message_parts.append(f"π Starts: {opens_at}")

        slug = event.get('ticketingUrlSlug')
        if slug:
            message_parts.append(
                f"[More Info/Tickets](https://webook.com/ar/events/{slug})"
            )

        return "\n".join(message_parts)