# webook / monitor.py
# Mohammed Foud
# Add application file
# f959aff
import asyncio
from datetime import datetime
import logging
from typing import Dict, List
import json
logger = logging.getLogger(__name__)
class EventMonitor:
    """Poll the bot's event feed per user and notify users about new events.

    One asyncio task is kept per monitored user.  Each task repeatedly builds
    a request with ``bot._get_events_payload(**filters)``, awaits
    ``bot._make_request(payload)``, and awaits ``bot.send_message(user_id,
    text)`` for every event whose id has not been reported to that user yet.
    """

    # Seconds between two polls when no explicit interval is given.
    DEFAULT_POLL_INTERVAL = 60

    def __init__(self, bot, poll_interval: float = DEFAULT_POLL_INTERVAL):
        """Create a monitor bound to *bot*.

        Args:
            bot: Object providing ``_get_events_payload``, ``_make_request``
                and ``send_message`` as used by the polling loop.
            poll_interval: Seconds to wait between two polls (also used as the
                back-off delay after a failed poll).
        """
        self.bot = bot
        self.poll_interval = poll_interval
        self.monitoring_tasks: Dict[int, asyncio.Task] = {}  # user_id -> task
        self.user_filters: Dict[int, dict] = {}  # user_id -> filters
        self.last_events: Dict[int, List[str]] = {}  # user_id -> list of event IDs

    async def start_monitoring(self, user_id: int, filters: dict):
        """Start (or restart) monitoring for *user_id* with *filters*.

        Any task already running for the user is cancelled first.  The list of
        known event ids is reset, so every event returned by the first poll is
        reported as new.

        Returns:
            Always ``True``.
        """
        previous_task = self.monitoring_tasks.pop(user_id, None)
        if previous_task is not None:
            previous_task.cancel()

        self.user_filters[user_id] = filters
        self.last_events[user_id] = []

        self.monitoring_tasks[user_id] = asyncio.create_task(
            self._monitor_loop(user_id)
        )
        return True

    async def stop_monitoring(self, user_id: int):
        """Stop monitoring for *user_id* and drop its stored state.

        Returns:
            ``True`` if a monitoring task existed for the user, else ``False``.
        """
        task = self.monitoring_tasks.pop(user_id, None)
        if task is None:
            return False

        task.cancel()
        # pop() rather than del: tolerate state that is already partly gone.
        self.user_filters.pop(user_id, None)
        self.last_events.pop(user_id, None)
        return True

    async def _monitor_loop(self, user_id: int):
        """Poll forever for *user_id*, sleeping ``poll_interval`` between polls.

        Errors raised by a poll are logged and the loop carries on after the
        usual delay.  Cancellation is never swallowed: it is re-raised so the
        task ends in the cancelled state wherever the cancel request lands.
        """
        while True:
            try:
                await self._poll_once(user_id)
            except asyncio.CancelledError:
                # Explicit clause: on Python 3.7 CancelledError is still an
                # Exception subclass and must not reach the handler below.
                raise
            except Exception as exc:
                logger.exception(
                    "Error in monitor loop for user %s: %s", user_id, exc
                )
            await asyncio.sleep(self.poll_interval)

    async def _poll_once(self, user_id: int) -> None:
        """Fetch the current events once and notify the user about new ones."""
        filters = self.user_filters[user_id]
        payload = self.bot._get_events_payload(**filters)
        response = await self.bot._make_request(payload)
        events = response["data"]["eventCollection"]["items"]
        current_event_ids = [event['id'] for event in events]

        notified_ids = self.last_events[user_id]
        already_seen = set(notified_ids)  # O(1) membership tests
        for event in events:
            event_id = event['id']
            if event_id in already_seen:
                continue
            await self.bot.send_message(
                user_id, self._format_event_message(event)
            )
            # Record each id right after its message went out, so a failure on
            # a later event does not cause this one to be sent again.
            notified_ids.append(event_id)
            already_seen.add(event_id)

        # Keep only the ids present in the latest response.
        self.last_events[user_id] = current_event_ids

    def _format_event_message(self, event: dict) -> str:
        """Render *event* as a multi-line notification message.

        Keys that are missing or ``None`` fall back to ``'N/A'`` (title,
        location, price, currency) or are left out (subtitle, start time,
        ticket link).

        Args:
            event: Event mapping as found in the feed's ``items`` list.

        Returns:
            The message lines joined with newlines.
        """
        title = event.get('title') or 'N/A'
        message_parts = [
            "🎉 *New Event Alert!* 🎉",
            f"🏆 *{title}*",
        ]

        subtitle = event.get('subtitle')
        if subtitle:
            message_parts.append(f"_{subtitle}_")

        # ``or {}`` guards against a key that is present with a None value.
        location = (event.get('location') or {}).get('title') or 'N/A'
        message_parts.append(f"📍 Location: {location}")

        price = event.get('startingPrice')
        if price is None:  # a price of 0 is a real value, so no truthiness test
            price = 'N/A'
        currency = event.get('currencyCode') or 'N/A'
        message_parts.append(f"💰 Starting Price: {price} {currency}")

        opens_at = (event.get('schedule') or {}).get('openDateTime')
        if opens_at:
            message_parts.append(f"🕒 Starts: {opens_at}")

        slug = event.get('ticketingUrlSlug')
        if slug:
            message_parts.append(
                f"[More Info/Tickets](https://webook.com/ar/events/{slug})"
            )

        return "\n".join(message_parts)