Spaces:
Paused
Paused
| from datetime import datetime, timedelta | |
| from typing import List, Dict, Any, Optional | |
| from bson import ObjectId | |
| from apscheduler.schedulers.asyncio import AsyncIOScheduler | |
| from apscheduler.triggers.cron import CronTrigger | |
| from ..db.database import db | |
| from ..utils.cache import cache | |
| from ..utils.logger import logger, log_health_check, log_maintenance_activity | |
| from ..services.calendar import calendar | |
| from ..services.maintenance import maintenance_service | |
| from ..services.pos_analytics import pos_analytics | |
| from ..core.config import settings | |
class SchedulerService:
    """Periodic background jobs plus helpers for recurring calendar events.

    On construction an ``AsyncIOScheduler`` is created and four cron jobs are
    registered (health checks, daily maintenance, POS metrics sync, resource
    monitoring).  Nothing runs until :meth:`start` is called.

    The remaining public coroutines create, update, delete and list calendar
    events through the project's ``calendar`` service and the ``events``
    collection.
    """

    # Recurrence patterns understood by ``create_recurring_event``.
    RECURRENCE_PATTERNS = ("daily", "weekly", "monthly", "yearly")

    # Safety cap applied when a caller gives neither an end date nor an
    # explicit ``max_occurrences``; without it the series would be unbounded.
    DEFAULT_MAX_OCCURRENCES = 365

    def __init__(self):
        """Create the scheduler and register the maintenance jobs."""
        self.scheduler = AsyncIOScheduler()
        self._setup_maintenance_jobs()

    def _setup_maintenance_jobs(self):
        """Set up scheduled maintenance and health check jobs."""
        job_specs = (
            # Health checks every 5 minutes
            ("health_checks", self._run_health_checks, {"minute": "*/5"}),
            # System maintenance daily at 2 AM (minute spelled out for clarity;
            # CronTrigger already defaults less significant fields to their minimum)
            ("daily_maintenance", self._run_maintenance, {"hour": 2, "minute": 0}),
            # POS metrics sync every 15 minutes
            ("pos_sync", self._sync_pos_metrics, {"minute": "*/15"}),
            # Resource monitoring every minute
            ("resource_monitoring", self._monitor_resources, {"minute": "*"}),
        )
        for job_id, func, cron_fields in job_specs:
            self.scheduler.add_job(
                func,
                trigger=CronTrigger(**cron_fields),
                id=job_id,
                replace_existing=True,
            )

    async def _run_health_checks(self):
        """Run every health check and log each result.

        Each check is isolated: a failure in one component is logged (as an
        ``"error"`` health entry) and the remaining components are still
        checked, so one broken dependency cannot hide the state of the others.
        """
        checks = (
            ("database", maintenance_service.check_database),
            ("redis", maintenance_service.check_redis),
            ("pos_integration", pos_analytics.health_check),
            ("background_tasks", maintenance_service.check_background_tasks),
        )
        for component, check in checks:
            try:
                result = await check()
                log_health_check(component, result["status"], result)
            except Exception as exc:
                logger.error(
                    f"Health check failed for {component}: {str(exc)}",
                    exc_info=True,
                )
                log_health_check(component, "error", {"error": str(exc)})

    async def _run_maintenance(self):
        """Run the daily maintenance tasks, one after another.

        Tasks are independent of each other here, so each runs in its own
        ``try`` block: a failing task is logged as ``"failed"`` and the
        following tasks still execute.
        """

        async def system():
            result = await maintenance_service.perform_maintenance()
            log_maintenance_activity("system", result["status"], result)

        async def session_cleanup():
            cleaned = await maintenance_service.cleanup_expired_sessions()
            log_maintenance_activity("session_cleanup", "completed", {"cleaned": cleaned})

        async def data_archiving():
            archive_result = await maintenance_service.archive_old_data()
            # Only logged when the archive step reports something.
            if archive_result:
                log_maintenance_activity("data_archiving", "completed", archive_result)

        async def log_rotation():
            await maintenance_service.rotate_log_files()
            log_maintenance_activity("log_rotation", "completed")

        async def storage_quotas():
            quota_result = await maintenance_service.manage_storage_quotas()
            log_maintenance_activity("storage_quotas", "completed", quota_result)

        steps = (
            ("system", system),
            ("session_cleanup", session_cleanup),
            ("data_archiving", data_archiving),
            ("log_rotation", log_rotation),
            ("storage_quotas", storage_quotas),
        )
        for label, step in steps:
            try:
                await step()
            except Exception as exc:
                logger.error(
                    f"Maintenance task '{label}' failed: {str(exc)}",
                    exc_info=True,
                )
                log_maintenance_activity(label, "failed", {"error": str(exc)})

    async def _sync_pos_metrics(self):
        """Sync POS metrics data and log the outcome."""
        try:
            success = await pos_analytics.sync_all_metrics()
            status = "success" if success else "partial_failure"
            log_maintenance_activity("pos_sync", status)
        except Exception as e:
            logger.error(f"POS metrics sync failed: {str(e)}", exc_info=True)
            log_maintenance_activity("pos_sync", "failed", {"error": str(e)})

    async def _monitor_resources(self):
        """Collect system resource figures and log them as a health entry.

        A result containing an ``"error"`` key is logged as ``"warning"``;
        an exception is logged as ``"error"``.
        """
        try:
            resources = await maintenance_service.monitor_system_resources()
            status = "warning" if "error" in resources else "healthy"
            log_health_check("resources", status, resources)
        except Exception as e:
            logger.error(f"Resource monitoring failed: {str(e)}", exc_info=True)
            log_health_check("resources", "error", {"error": str(e)})

    def start(self):
        """Start the scheduler if it is not already running."""
        if not self.scheduler.running:
            self.scheduler.start()
            logger.info("Scheduler started - Maintenance and health check tasks initialized")

    def shutdown(self):
        """Shut the scheduler down if it is running."""
        if self.scheduler.running:
            self.scheduler.shutdown()
            logger.info("Scheduler shutdown - Maintenance and health check tasks stopped")

    def get_jobs(self):
        """Return id, name, next run time (ISO string or None) and trigger of every job."""
        return [
            {
                "id": job.id,
                "name": job.name,
                "next_run_time": job.next_run_time.isoformat() if job.next_run_time else None,
                "trigger": str(job.trigger),
            }
            for job in self.scheduler.get_jobs()
        ]

    @staticmethod
    def _nth_occurrence(anchor: datetime, pattern: str, index: int) -> datetime:
        """Return the start of occurrence ``index`` (0-based) of a series.

        Occurrences are always derived from ``anchor`` rather than from the
        previous occurrence, so a monthly series anchored on the 31st yields
        Feb 29/28, Mar 31, Apr 30, ... instead of drifting to an earlier day.
        Days that do not exist in the target month are clamped to that
        month's last day.

        Raises:
            ValueError: if ``pattern`` is not one of ``RECURRENCE_PATTERNS``.
        """
        # Local import: the module-level name ``calendar`` is the project's
        # calendar service, not the standard-library module.
        from calendar import monthrange

        if pattern == "daily":
            return anchor + timedelta(days=index)
        if pattern == "weekly":
            return anchor + timedelta(weeks=index)
        if pattern == "monthly":
            month_offset = index
        elif pattern == "yearly":
            month_offset = 12 * index
        else:
            raise ValueError(f"Unsupported recurrence pattern: {pattern!r}")

        # Work in "months since year 0" so year roll-over falls out of divmod.
        total_months = anchor.year * 12 + (anchor.month - 1) + month_offset
        year, month_index = divmod(total_months, 12)
        month = month_index + 1
        day = min(anchor.day, monthrange(year, month)[1])
        return anchor.replace(year=year, month=month, day=day)

    async def create_recurring_event(
        self,
        user_id: str,
        title: str,
        description: str,
        start_time: datetime,
        end_time: datetime,
        recurrence_pattern: str,  # daily, weekly, monthly, yearly
        recurrence_end_date: Optional[datetime] = None,
        attendees: Optional[List[str]] = None,
        reminder_minutes: int = 30,
        max_occurrences: Optional[int] = None,
    ) -> List[Dict[str, Any]]:
        """Create one calendar event per occurrence of a recurring series.

        Args:
            user_id: Owner passed through to ``calendar.create_event``.
            title, description, attendees, reminder_minutes: Passed through
                unchanged for every occurrence.
            start_time, end_time: First occurrence; every later occurrence
                keeps the same duration.
            recurrence_pattern: One of ``RECURRENCE_PATTERNS``.
            recurrence_end_date: Occurrences starting after this instant are
                not created.
            max_occurrences: Hard limit on the number of events created.  When
                both this and ``recurrence_end_date`` are omitted,
                ``DEFAULT_MAX_OCCURRENCES`` is used so the series is finite.

        Returns:
            The values returned by ``calendar.create_event``, in order.

        Raises:
            ValueError: if ``recurrence_pattern`` is not supported.

        NOTE(review): the created events are not tagged with a
        ``recurrence_group`` here, although the update/delete helpers look
        that field up -- confirm whether ``calendar.create_event`` sets it.
        """
        # Fail fast: an unknown pattern would otherwise never advance.
        if recurrence_pattern not in self.RECURRENCE_PATTERNS:
            raise ValueError(f"Unsupported recurrence pattern: {recurrence_pattern!r}")

        if max_occurrences is None and recurrence_end_date is None:
            max_occurrences = self.DEFAULT_MAX_OCCURRENCES

        duration = end_time - start_time
        events: List[Dict[str, Any]] = []
        index = 0
        while max_occurrences is None or index < max_occurrences:
            current_start = self._nth_occurrence(start_time, recurrence_pattern, index)
            if recurrence_end_date is not None and current_start > recurrence_end_date:
                break

            # Create individual event instance
            event = await calendar.create_event(
                user_id=user_id,
                title=title,
                description=description,
                start_time=current_start,
                end_time=current_start + duration,
                attendees=attendees,
                reminder_minutes=reminder_minutes,
            )
            events.append(event)
            index += 1

        return events

    async def update_recurring_event(
        self,
        event_id: str,
        user_id: str,
        update_data: Dict[str, Any],
        update_future: bool = True,
    ) -> List[Dict[str, Any]]:
        """Update an event and, optionally, later events of the same series.

        Returns the affected event documents as they were read from the
        database *before* the update was applied, or ``[]`` when the event
        does not exist for this user.
        """
        # Get the original event
        event = await db.db["events"].find_one({
            "_id": ObjectId(event_id),
            "user_id": user_id,
        })
        if not event:
            return []

        # Update the current event
        await calendar.update_event(event_id, user_id, update_data)
        updated_events = [event]

        # Only fan out when the event really belongs to a series: querying
        # with a missing/None group would match unrelated events that simply
        # have no ``recurrence_group`` field.
        recurrence_group = event.get("recurrence_group")
        if update_future and recurrence_group:
            future_events = await db.db["events"].find({
                "recurrence_group": recurrence_group,
                "start_time": {"$gt": event["start_time"]},
                "user_id": user_id,
            }).to_list(None)
            for future_event in future_events:
                await calendar.update_event(
                    str(future_event["_id"]),
                    user_id,
                    update_data,
                )
                updated_events.append(future_event)

        return updated_events

    async def delete_recurring_event(
        self,
        event_id: str,
        user_id: str,
        delete_future: bool = True,
    ) -> bool:
        """Delete an event and, optionally, later events of the same series.

        Returns ``False`` when the event does not exist for this user,
        ``True`` otherwise.
        """
        event = await db.db["events"].find_one({
            "_id": ObjectId(event_id),
            "user_id": user_id,
        })
        if not event:
            return False

        # Delete the current event
        await calendar.delete_event(event_id, user_id)

        # Delete future occurrences if requested
        recurrence_group = event.get("recurrence_group")
        if delete_future and recurrence_group:
            await db.db["events"].delete_many({
                "recurrence_group": recurrence_group,
                "start_time": {"$gt": event["start_time"]},
                "user_id": user_id,
            })
        return True

    async def get_upcoming_recurring_events(
        self,
        user_id: str,
        days: int = 30,
    ) -> List[Dict[str, Any]]:
        """Return the user's events in the next ``days`` days, sorted by start time.

        The window starts at the current UTC time.  No recurrence filter is
        applied in this method; whatever ``calendar.get_user_events`` returns
        for the window (including attendee events) is sorted and returned.
        """
        start_date = datetime.utcnow()
        end_date = start_date + timedelta(days=days)
        events = await calendar.get_user_events(
            user_id=user_id,
            start_date=start_date,
            end_date=end_date,
            include_attendee_events=True,
        )
        return sorted(events, key=lambda x: x["start_time"])
# Shared module-level instance. Constructing it only registers the cron jobs;
# nothing is executed until ``start()`` is called on it.
scheduler_service = SchedulerService()