File size: 3,986 Bytes
f959aff |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
import asyncio
from datetime import datetime
import logging
from typing import Dict, List
import json
logger = logging.getLogger(__name__)
class EventMonitor:
def __init__(self, bot):
self.bot = bot
self.monitoring_tasks: Dict[int, asyncio.Task] = {} # user_id -> task
self.user_filters: Dict[int, dict] = {} # user_id -> filters
self.last_events: Dict[int, List[str]] = {} # user_id -> list of event IDs
async def start_monitoring(self, user_id: int, filters: dict):
"""Start monitoring events for a user with given filters"""
if user_id in self.monitoring_tasks:
# Stop existing task if any
self.monitoring_tasks[user_id].cancel()
# Store user filters
self.user_filters[user_id] = filters
self.last_events[user_id] = []
# Create and start new monitoring task
task = asyncio.create_task(self._monitor_loop(user_id))
self.monitoring_tasks[user_id] = task
return True
async def stop_monitoring(self, user_id: int):
"""Stop monitoring events for a user"""
if user_id in self.monitoring_tasks:
self.monitoring_tasks[user_id].cancel()
del self.monitoring_tasks[user_id]
del self.user_filters[user_id]
del self.last_events[user_id]
return True
return False
async def _monitor_loop(self, user_id: int):
"""Main monitoring loop"""
while True:
try:
filters = self.user_filters[user_id]
# Get current events using existing payload method
payload = self.bot._get_events_payload(**filters)
response = await self.bot._make_request(payload)
events = response["data"]["eventCollection"]["items"]
# Check for new events
current_event_ids = [event['id'] for event in events]
new_events = [event for event in events
if event['id'] not in self.last_events[user_id]]
if new_events:
# Send notification for each new event
for event in new_events:
message = self._format_event_message(event)
await self.bot.send_message(user_id, message)
# Update last events list
self.last_events[user_id] = current_event_ids
# Wait for 1 minute before next check
await asyncio.sleep(60)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in monitor loop for user {user_id}: {e}")
await asyncio.sleep(60) # Wait before retrying
def _format_event_message(self, event: dict) -> str:
"""Format event details into a message"""
message_parts = [
"π *New Event Alert!* π",
f"π *{event.get('title', 'N/A')}*",
]
if event.get('subtitle'):
message_parts.append(f"_{event.get('subtitle')}_")
location = event.get('location', {}).get('title', 'N/A')
message_parts.append(f"π Location: {location}")
price = event.get('startingPrice', 'N/A')
currency = event.get('currencyCode', 'N/A')
message_parts.append(f"π° Starting Price: {price} {currency}")
schedule = event.get('schedule', {})
if schedule.get('openDateTime'):
message_parts.append(f"π Starts: {schedule['openDateTime']}")
if event.get('ticketingUrlSlug'):
message_parts.append(
f"[More Info/Tickets](https://webook.com/ar/events/{event['ticketingUrlSlug']})"
)
return "\n".join(message_parts) |