"""Scheduler service.

Registers periodic maintenance / health-check jobs on an APScheduler
``AsyncIOScheduler`` and provides helpers for creating, updating and
deleting recurring calendar events.
"""
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional

from bson import ObjectId
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

from ..db.database import db
from ..utils.cache import cache
from ..utils.logger import logger, log_health_check, log_maintenance_activity
from ..services.calendar import calendar
from ..services.maintenance import maintenance_service
from ..services.pos_analytics import pos_analytics
from ..core.config import settings


class SchedulerService:
    """Owns the background scheduler and the recurring-event helpers."""

    # Recurrence patterns understood by ``create_recurring_event``.
    RECURRENCE_PATTERNS = ("daily", "weekly", "monthly", "yearly")

    # Safety cap used when a recurring series has neither an end date nor an
    # explicit ``max_occurrences``; without it the series would be unbounded.
    DEFAULT_MAX_OCCURRENCES = 365

    def __init__(self):
        """Create the scheduler and register the periodic jobs (not started)."""
        self.scheduler = AsyncIOScheduler()
        self._setup_maintenance_jobs()

    def _setup_maintenance_jobs(self):
        """Set up scheduled maintenance and health check jobs."""
        # Health checks every 5 minutes
        self.scheduler.add_job(
            self._run_health_checks,
            trigger=CronTrigger(minute='*/5'),
            id='health_checks',
            replace_existing=True
        )

        # System maintenance daily at 2 AM
        self.scheduler.add_job(
            self._run_maintenance,
            trigger=CronTrigger(hour=2),
            id='daily_maintenance',
            replace_existing=True
        )

        # POS metrics sync every 15 minutes
        self.scheduler.add_job(
            self._sync_pos_metrics,
            trigger=CronTrigger(minute='*/15'),
            id='pos_sync',
            replace_existing=True
        )

        # Resource monitoring every minute
        self.scheduler.add_job(
            self._monitor_resources,
            trigger=CronTrigger(minute='*'),
            id='resource_monitoring',
            replace_existing=True
        )

    async def _run_health_checks(self):
        """Run the system health checks and log each result.

        Every check runs in its own ``try`` so that a failure in one
        component (e.g. the database) does not hide the state of the others.
        A check that raises is logged and recorded with status ``"error"``.
        """
        checks = (
            ("database", maintenance_service.check_database),
            ("redis", maintenance_service.check_redis),
            ("pos_integration", pos_analytics.health_check),
            ("background_tasks", maintenance_service.check_background_tasks),
        )
        for name, check in checks:
            try:
                result = await check()
                log_health_check(name, result["status"], result)
            except Exception as e:
                logger.error(f"Health check failed: {str(e)}", exc_info=True)
                log_health_check(name, "error", {"error": str(e)})

    async def _run_maintenance(self):
        """Run daily system maintenance tasks.

        The steps run in order inside a single ``try``; the first step that
        raises aborts the remaining ones and the error is logged.
        """
        try:
            # Perform system maintenance
            maintenance_result = await maintenance_service.perform_maintenance()
            log_maintenance_activity("system", maintenance_result["status"], maintenance_result)

            # Clean up expired sessions
            sessions_cleaned = await maintenance_service.cleanup_expired_sessions()
            log_maintenance_activity("session_cleanup", "completed", {"cleaned": sessions_cleaned})

            # Archive old data (only logged when the service returns something truthy)
            archive_result = await maintenance_service.archive_old_data()
            if archive_result:
                log_maintenance_activity("data_archiving", "completed", archive_result)

            # Rotate log files
            await maintenance_service.rotate_log_files()
            log_maintenance_activity("log_rotation", "completed")

            # Manage storage quotas
            quota_result = await maintenance_service.manage_storage_quotas()
            log_maintenance_activity("storage_quotas", "completed", quota_result)
        except Exception as e:
            logger.error(f"Maintenance tasks failed: {str(e)}", exc_info=True)

    async def _sync_pos_metrics(self):
        """Sync POS metrics data and log the outcome."""
        try:
            success = await pos_analytics.sync_all_metrics()
            status = "success" if success else "partial_failure"
            log_maintenance_activity("pos_sync", status)
        except Exception as e:
            logger.error(f"POS metrics sync failed: {str(e)}", exc_info=True)
            log_maintenance_activity("pos_sync", "failed", {"error": str(e)})

    async def _monitor_resources(self):
        """Monitor system resources and log them as a health check.

        A result containing an ``"error"`` key is logged as ``"warning"``;
        an exception is logged as ``"error"``.
        """
        try:
            resources = await maintenance_service.monitor_system_resources()
            if "error" not in resources:
                log_health_check("resources", "healthy", resources)
            else:
                log_health_check("resources", "warning", resources)
        except Exception as e:
            logger.error(f"Resource monitoring failed: {str(e)}", exc_info=True)
            log_health_check("resources", "error", {"error": str(e)})

    def start(self):
        """Start the scheduler if it is not already running."""
        if not self.scheduler.running:
            self.scheduler.start()
            logger.info("Scheduler started - Maintenance and health check tasks initialized")

    def shutdown(self):
        """Shut the scheduler down if it is running."""
        if self.scheduler.running:
            self.scheduler.shutdown()
            logger.info("Scheduler shutdown - Maintenance and health check tasks stopped")

    def get_jobs(self):
        """Return id, name, next run time (ISO string or None) and trigger of every job."""
        return [
            {
                "id": job.id,
                "name": job.name,
                "next_run_time": job.next_run_time.isoformat() if job.next_run_time else None,
                "trigger": str(job.trigger)
            }
            for job in self.scheduler.get_jobs()
        ]

    @staticmethod
    def _days_in_month(year: int, month: int) -> int:
        """Return the number of days in ``month`` of ``year``."""
        if month == 12:
            # Avoids building a date in year + 1 (which would overflow at 9999).
            return 31
        # The day before the first of the following month is this month's last day.
        return (datetime(year, month + 1, 1) - timedelta(days=1)).day

    @classmethod
    def _nth_occurrence(cls, start: datetime, pattern: str, index: int) -> datetime:
        """Return the start of occurrence ``index`` (0-based) of a series.

        Each occurrence is derived from the original ``start`` rather than
        from the previous occurrence, so a day that had to be clamped in a
        short month (e.g. the 31st -> 28th in February) returns to the
        original day in the following months.
        """
        if pattern == "daily":
            return start + timedelta(days=index)
        if pattern == "weekly":
            return start + timedelta(weeks=index)
        if pattern == "monthly":
            months = start.month - 1 + index
            year = start.year + months // 12
            month = months % 12 + 1
        else:  # "yearly" (pattern is validated by the caller)
            year = start.year + index
            month = start.month
        # Clamp the day so that e.g. Jan 31 -> Feb 28/29 and Feb 29 -> Feb 28
        # instead of raising ValueError from datetime.replace().
        day = min(start.day, cls._days_in_month(year, month))
        return start.replace(year=year, month=month, day=day)

    async def create_recurring_event(
        self,
        user_id: str,
        title: str,
        description: str,
        start_time: datetime,
        end_time: datetime,
        recurrence_pattern: str,  # daily, weekly, monthly, yearly
        recurrence_end_date: Optional[datetime] = None,
        attendees: Optional[List[str]] = None,
        reminder_minutes: int = 30,
        max_occurrences: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """Create recurring events based on pattern.

        One event is created through ``calendar.create_event`` for every
        occurrence whose start is not after ``recurrence_end_date``.

        Args:
            user_id: Owner passed through to ``calendar.create_event``.
            title: Event title.
            description: Event description.
            start_time: Start of the first occurrence.
            end_time: End of the first occurrence; every occurrence keeps the
                same duration (``end_time - start_time``).
            recurrence_pattern: One of ``RECURRENCE_PATTERNS``.
            recurrence_end_date: Last allowed occurrence start (inclusive).
            attendees: Passed through to ``calendar.create_event``.
            reminder_minutes: Passed through to ``calendar.create_event``.
            max_occurrences: Upper bound on the number of events created.
                When both this and ``recurrence_end_date`` are omitted,
                ``DEFAULT_MAX_OCCURRENCES`` is used so the series is finite.

        Returns:
            The values returned by ``calendar.create_event``, in order.

        Raises:
            ValueError: If ``recurrence_pattern`` is not supported.
        """
        if recurrence_pattern not in self.RECURRENCE_PATTERNS:
            # Previously an unknown pattern never advanced the start time and
            # the loop ran forever; fail fast before creating anything.
            raise ValueError(
                f"Unsupported recurrence pattern: {recurrence_pattern!r}; "
                f"expected one of {', '.join(self.RECURRENCE_PATTERNS)}"
            )

        if max_occurrences is None and recurrence_end_date is None:
            # No bound at all would mean creating events forever.
            max_occurrences = self.DEFAULT_MAX_OCCURRENCES
            logger.info(
                f"Recurring event has no end date; limiting to {max_occurrences} occurrences"
            )

        # NOTE(review): no "recurrence_group" is attached to the created
        # events here, while update/delete of future occurrences filter on
        # that field. Presumably calendar.create_event (or a caller) sets
        # it -- confirm.
        events = []
        duration = end_time - start_time
        index = 0
        while max_occurrences is None or index < max_occurrences:
            occurrence_start = self._nth_occurrence(start_time, recurrence_pattern, index)
            if recurrence_end_date and occurrence_start > recurrence_end_date:
                break

            # Create individual event instance
            event = await calendar.create_event(
                user_id=user_id,
                title=title,
                description=description,
                start_time=occurrence_start,
                end_time=occurrence_start + duration,
                attendees=attendees,
                reminder_minutes=reminder_minutes
            )
            events.append(event)
            index += 1

        return events

    async def update_recurring_event(
        self,
        event_id: str,
        user_id: str,
        update_data: Dict[str, Any],
        update_future: bool = True
    ) -> List[Dict[str, Any]]:
        """Update a recurring event and optionally its future occurrences.

        Args:
            event_id: String form of the event's ObjectId.
            user_id: Owner of the event; also used to scope the queries.
            update_data: Passed through to ``calendar.update_event``.
            update_future: Also update later events of the same
                ``recurrence_group``.

        Returns:
            The affected event documents as they were read from the database
            (i.e. before ``update_data`` was applied), or ``[]`` when the
            event does not exist for this user.
        """
        # Get the original event
        event = await db.db["events"].find_one({
            "_id": ObjectId(event_id),
            "user_id": user_id
        })

        if not event:
            return []

        # Update the current event
        await calendar.update_event(event_id, user_id, update_data)
        updated_events = [event]

        # Update future occurrences if requested. The group check matters:
        # filtering on recurrence_group=None would match every later event of
        # this user that has no group, i.e. unrelated non-recurring events.
        recurrence_group = event.get("recurrence_group")
        if update_future and recurrence_group:
            future_events = await db.db["events"].find({
                "recurrence_group": recurrence_group,
                "start_time": {"$gt": event["start_time"]},
                "user_id": user_id
            }).to_list(None)

            for future_event in future_events:
                await calendar.update_event(
                    str(future_event["_id"]),
                    user_id,
                    update_data
                )
                updated_events.append(future_event)

        return updated_events

    async def delete_recurring_event(
        self,
        event_id: str,
        user_id: str,
        delete_future: bool = True
    ) -> bool:
        """Delete a recurring event and optionally its future occurrences.

        Returns:
            ``False`` when the event does not exist for this user, otherwise
            ``True`` after the deletions were issued.
        """
        event = await db.db["events"].find_one({
            "_id": ObjectId(event_id),
            "user_id": user_id
        })

        if not event:
            return False

        # Delete the current event
        await calendar.delete_event(event_id, user_id)

        # Delete future occurrences if requested (only for grouped events)
        if delete_future and event.get("recurrence_group"):
            await db.db["events"].delete_many({
                "recurrence_group": event["recurrence_group"],
                "start_time": {"$gt": event["start_time"]},
                "user_id": user_id
            })

        return True

    async def get_upcoming_recurring_events(
        self,
        user_id: str,
        days: int = 30
    ) -> List[Dict[str, Any]]:
        """Get a user's events for the next ``days`` days, sorted by start time.

        The window starts at the current UTC time. The events come from
        ``calendar.get_user_events`` with attendee events included; no
        recurrence filter is applied in this method.
        """
        # NOTE(review): utcnow() is naive -- presumably stored start times
        # are naive UTC as well; confirm before switching to aware datetimes.
        start_date = datetime.utcnow()
        end_date = start_date + timedelta(days=days)

        events = await calendar.get_user_events(
            user_id=user_id,
            start_date=start_date,
            end_date=end_date,
            include_attendee_events=True
        )

        return sorted(events, key=lambda x: x["start_time"])


scheduler_service = SchedulerService()