from calendar import monthrange
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional

from bson import ObjectId
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

from ..db.database import db
from ..utils.cache import cache
from ..utils.logger import logger
from ..services.calendar import calendar
from ..services.maintenance import maintenance

# Recurrence patterns accepted by SchedulerService.create_recurring_event.
_SUPPORTED_RECURRENCE_PATTERNS = ("daily", "weekly", "monthly", "yearly")

# How far past the first occurrence a series extends when the caller gives no
# end date. Without a bound the generation loop would never terminate.
_DEFAULT_RECURRENCE_HORIZON = timedelta(days=365)


def _shift_months(moment: datetime, months: int) -> datetime:
    """Return *moment* moved forward by *months* calendar months.

    The day of month is clamped to the last valid day of the target month, so
    Jan 31 + 1 month yields Feb 28/29 instead of raising ``ValueError``.
    """
    month_index = moment.month - 1 + months
    year = moment.year + month_index // 12
    month = month_index % 12 + 1
    day = min(moment.day, monthrange(year, month)[1])
    return moment.replace(year=year, month=month, day=day)


def _occurrence_start(first_start: datetime, pattern: str, index: int) -> datetime:
    """Return the start of occurrence number *index* (0-based) of a series.

    Every occurrence is derived from *first_start* rather than from the
    previous occurrence, so a clamped day (e.g. Feb 28 for a series anchored
    on the 31st) does not stick for the rest of the series.

    Raises:
        ValueError: if *pattern* is not a supported recurrence pattern.
    """
    if pattern == "daily":
        return first_start + timedelta(days=index)
    if pattern == "weekly":
        return first_start + timedelta(weeks=index)
    if pattern == "monthly":
        return _shift_months(first_start, index)
    if pattern == "yearly":
        return _shift_months(first_start, 12 * index)
    raise ValueError(f"Unsupported recurrence pattern: {pattern!r}")


class SchedulerService:
    """Registers periodic maintenance jobs and manages recurring calendar events."""

    def __init__(self):
        """Create the asyncio scheduler and register the maintenance jobs.

        The scheduler is not started here; call :meth:`start` for that.
        """
        self.scheduler = AsyncIOScheduler()
        self._setup_maintenance_jobs()

    def _setup_maintenance_jobs(self):
        """Setup all maintenance related scheduled jobs"""
        job_table = (
            # Daily database maintenance at 2 AM
            (maintenance.perform_database_maintenance, CronTrigger(hour=2), "daily_db_maintenance"),
            # Session cleanup every 6 hours
            (maintenance.cleanup_expired_sessions, CronTrigger(hour="*/6"), "session_cleanup"),
            # System health check every 15 minutes
            (maintenance.monitor_system_resources, CronTrigger(minute="*/15"), "health_check"),
            # Daily backup at 1 AM
            (maintenance.perform_scheduled_backup, CronTrigger(hour=1), "daily_backup"),
            # Daily log rotation at 3 AM
            (maintenance.rotate_log_files, CronTrigger(hour=3), "log_rotation"),
            # Storage quota check every 2 hours
            (maintenance.manage_storage_quotas, CronTrigger(hour="*/2"), "storage_quota_check"),
            # Monthly data archiving at 4 AM on the 1st of each month
            (maintenance.archive_old_data, CronTrigger(day=1, hour=4), "monthly_archiving"),
        )
        for job_func, trigger, job_id in job_table:
            # replace_existing keeps registration idempotent for a given job id.
            self.scheduler.add_job(
                job_func,
                trigger,
                id=job_id,
                replace_existing=True,
            )

    def start(self):
        """Start the scheduler.

        Any exception raised by the underlying scheduler is logged and
        re-raised.
        """
        try:
            self.scheduler.start()
            logger.info("Scheduler started successfully")
        except Exception as e:
            logger.error(f"Failed to start scheduler: {str(e)}")
            raise

    def shutdown(self):
        """Shutdown the scheduler.

        Any exception raised by the underlying scheduler is logged and
        re-raised.
        """
        try:
            self.scheduler.shutdown()
            logger.info("Scheduler shutdown successfully")
        except Exception as e:
            logger.error(f"Error during scheduler shutdown: {str(e)}")
            raise

    def get_jobs(self):
        """Get all scheduled jobs.

        Returns:
            One dict per job with the keys ``id``, ``name``, ``next_run_time``
            (ISO-8601 string, or ``None`` when the job has no next run time)
            and ``trigger`` (the trigger's string form).
        """
        return [
            {
                "id": job.id,
                "name": job.name,
                "next_run_time": job.next_run_time.isoformat() if job.next_run_time else None,
                "trigger": str(job.trigger),
            }
            for job in self.scheduler.get_jobs()
        ]

    async def create_recurring_event(
        self,
        user_id: str,
        title: str,
        description: str,
        start_time: datetime,
        end_time: datetime,
        recurrence_pattern: str,  # daily, weekly, monthly, yearly
        recurrence_end_date: Optional[datetime] = None,
        attendees: Optional[List[str]] = None,
        reminder_minutes: int = 30,
    ) -> List[Dict[str, Any]]:
        """Create recurring events based on pattern.

        One event is created through ``calendar.create_event`` for every
        occurrence whose start is not later than the series end. Each
        occurrence keeps the duration of the first one.

        Args:
            user_id: Owner passed through to ``calendar.create_event``.
            title: Event title, identical for every occurrence.
            description: Event description, identical for every occurrence.
            start_time: Start of the first occurrence.
            end_time: End of the first occurrence.
            recurrence_pattern: One of ``daily``, ``weekly``, ``monthly`` or
                ``yearly``.
            recurrence_end_date: Latest allowed occurrence start (inclusive).
                When omitted, the series is bounded to one year after
                ``start_time`` so that generation always terminates.
            attendees: Passed through to ``calendar.create_event``.
            reminder_minutes: Passed through to ``calendar.create_event``.

        Returns:
            The values returned by ``calendar.create_event``, in chronological
            order.

        Raises:
            ValueError: if ``recurrence_pattern`` is not supported. This is
                checked before any event is created.
        """
        if recurrence_pattern not in _SUPPORTED_RECURRENCE_PATTERNS:
            raise ValueError(
                f"Unsupported recurrence pattern: {recurrence_pattern!r}; "
                f"expected one of {', '.join(_SUPPORTED_RECURRENCE_PATTERNS)}"
            )

        duration = end_time - start_time
        if recurrence_end_date is not None:
            last_allowed_start = recurrence_end_date
        else:
            last_allowed_start = start_time + _DEFAULT_RECURRENCE_HORIZON

        # NOTE(review): nothing visible here tags the created events with a
        # "recurrence_group", which update_recurring_event and
        # delete_recurring_event rely on to find sibling occurrences. Confirm
        # that calendar.create_event (or a caller) sets it.
        events: List[Dict[str, Any]] = []
        occurrence_index = 0
        while True:
            current_start = _occurrence_start(start_time, recurrence_pattern, occurrence_index)
            if current_start > last_allowed_start:
                break

            # Create individual event instance
            event = await calendar.create_event(
                user_id=user_id,
                title=title,
                description=description,
                start_time=current_start,
                end_time=current_start + duration,
                attendees=attendees,
                reminder_minutes=reminder_minutes,
            )
            events.append(event)
            occurrence_index += 1

        return events

    async def update_recurring_event(
        self,
        event_id: str,
        user_id: str,
        update_data: Dict[str, Any],
        update_future: bool = True,
    ) -> List[Dict[str, Any]]:
        """Update a recurring event and optionally its future occurrences.

        Args:
            event_id: String form of the event's ``_id``; it is converted with
                ``ObjectId`` and an invalid value propagates that error.
            user_id: The event must belong to this user.
            update_data: Passed unchanged to ``calendar.update_event``.
            update_future: Also update later events in the same
                ``recurrence_group``.

        Returns:
            The affected event documents as they were read *before* the update
            was applied, or an empty list when the event was not found.
        """
        # Get the original event
        event = await db.db["events"].find_one({
            "_id": ObjectId(event_id),
            "user_id": user_id,
        })
        if not event:
            return []

        # Update the current event
        await calendar.update_event(event_id, user_id, update_data)
        updated_events = [event]

        # Update future occurrences if requested. The group check matters:
        # filtering on a missing group would match every ungrouped event of
        # this user and update unrelated events.
        recurrence_group = event.get("recurrence_group")
        if update_future and recurrence_group:
            future_events = await db.db["events"].find({
                "recurrence_group": recurrence_group,
                "start_time": {"$gt": event["start_time"]},
                "user_id": user_id,
            }).to_list(None)

            for future_event in future_events:
                await calendar.update_event(
                    str(future_event["_id"]),
                    user_id,
                    update_data,
                )
                updated_events.append(future_event)

        return updated_events

    async def delete_recurring_event(
        self,
        event_id: str,
        user_id: str,
        delete_future: bool = True,
    ) -> bool:
        """Delete a recurring event and optionally its future occurrences.

        Args:
            event_id: String form of the event's ``_id``; it is converted with
                ``ObjectId`` and an invalid value propagates that error.
            user_id: The event must belong to this user.
            delete_future: Also delete later events in the same
                ``recurrence_group``.

        Returns:
            ``False`` when the event was not found, ``True`` otherwise.
        """
        event = await db.db["events"].find_one({
            "_id": ObjectId(event_id),
            "user_id": user_id,
        })
        if not event:
            return False

        # Delete the current event
        await calendar.delete_event(event_id, user_id)

        # Delete future occurrences if requested; skipped for events without a
        # recurrence group so unrelated events are never matched.
        if delete_future and event.get("recurrence_group"):
            await db.db["events"].delete_many({
                "recurrence_group": event["recurrence_group"],
                "start_time": {"$gt": event["start_time"]},
                "user_id": user_id,
            })

        return True

    async def get_upcoming_recurring_events(
        self,
        user_id: str,
        days: int = 30,
    ) -> List[Dict[str, Any]]:
        """Get upcoming events for a user within the next *days* days.

        The window starts at the current naive UTC time. Whatever
        ``calendar.get_user_events`` returns for that window (attendee events
        included) is returned sorted by ``start_time``.
        """
        start_date = datetime.utcnow()
        end_date = start_date + timedelta(days=days)

        events = await calendar.get_user_events(
            user_id=user_id,
            start_date=start_date,
            end_date=end_date,
            include_attendee_events=True,
        )

        return sorted(events, key=lambda x: x["start_time"])


scheduler = SchedulerService()