Admin-Desk2 / app /services /scheduler.py
Fred808's picture
Upload 82 files
20adca1 verified
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
from bson import ObjectId
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from ..db.database import db
from ..utils.cache import cache
from ..utils.logger import logger, log_health_check, log_maintenance_activity
from ..services.calendar import calendar
from ..services.maintenance import maintenance_service
from ..services.pos_analytics import pos_analytics
from ..core.config import settings
class SchedulerService:
def __init__(self):
self.scheduler = AsyncIOScheduler()
self._setup_maintenance_jobs()
def _setup_maintenance_jobs(self):
"""Set up scheduled maintenance and health check jobs"""
# Health checks every 5 minutes
self.scheduler.add_job(
self._run_health_checks,
trigger=CronTrigger(minute='*/5'),
id='health_checks',
replace_existing=True
)
# System maintenance daily at 2 AM
self.scheduler.add_job(
self._run_maintenance,
trigger=CronTrigger(hour=2),
id='daily_maintenance',
replace_existing=True
)
# POS metrics sync every 15 minutes
self.scheduler.add_job(
self._sync_pos_metrics,
trigger=CronTrigger(minute='*/15'),
id='pos_sync',
replace_existing=True
)
# Resource monitoring every minute
self.scheduler.add_job(
self._monitor_resources,
trigger=CronTrigger(minute='*'),
id='resource_monitoring',
replace_existing=True
)
async def _run_health_checks(self):
"""Run comprehensive system health checks"""
try:
# Check database health
db_health = await maintenance_service.check_database()
log_health_check("database", db_health["status"], db_health)
# Check Redis health
redis_health = await maintenance_service.check_redis()
log_health_check("redis", redis_health["status"], redis_health)
# Check POS integration
pos_health = await pos_analytics.health_check()
log_health_check("pos_integration", pos_health["status"], pos_health)
# Check background tasks
tasks_health = await maintenance_service.check_background_tasks()
log_health_check("background_tasks", tasks_health["status"], tasks_health)
except Exception as e:
logger.error(f"Health check failed: {str(e)}", exc_info=True)
async def _run_maintenance(self):
"""Run daily system maintenance tasks"""
try:
# Perform system maintenance
maintenance_result = await maintenance_service.perform_maintenance()
log_maintenance_activity("system", maintenance_result["status"], maintenance_result)
# Clean up expired sessions
sessions_cleaned = await maintenance_service.cleanup_expired_sessions()
log_maintenance_activity("session_cleanup", "completed", {"cleaned": sessions_cleaned})
# Archive old data
archive_result = await maintenance_service.archive_old_data()
if archive_result:
log_maintenance_activity("data_archiving", "completed", archive_result)
# Rotate log files
await maintenance_service.rotate_log_files()
log_maintenance_activity("log_rotation", "completed")
# Manage storage quotas
quota_result = await maintenance_service.manage_storage_quotas()
log_maintenance_activity("storage_quotas", "completed", quota_result)
except Exception as e:
logger.error(f"Maintenance tasks failed: {str(e)}", exc_info=True)
async def _sync_pos_metrics(self):
"""Sync POS metrics data"""
try:
success = await pos_analytics.sync_all_metrics()
status = "success" if success else "partial_failure"
log_maintenance_activity("pos_sync", status)
except Exception as e:
logger.error(f"POS metrics sync failed: {str(e)}", exc_info=True)
log_maintenance_activity("pos_sync", "failed", {"error": str(e)})
async def _monitor_resources(self):
"""Monitor system resources"""
try:
resources = await maintenance_service.monitor_system_resources()
if "error" not in resources:
log_health_check("resources", "healthy", resources)
else:
log_health_check("resources", "warning", resources)
except Exception as e:
logger.error(f"Resource monitoring failed: {str(e)}", exc_info=True)
log_health_check("resources", "error", {"error": str(e)})
def start(self):
"""Start the scheduler"""
if not self.scheduler.running:
self.scheduler.start()
logger.info("Scheduler started - Maintenance and health check tasks initialized")
def shutdown(self):
"""Shutdown the scheduler"""
if self.scheduler.running:
self.scheduler.shutdown()
logger.info("Scheduler shutdown - Maintenance and health check tasks stopped")
def get_jobs(self):
"""Get all scheduled jobs"""
return [
{
"id": job.id,
"name": job.name,
"next_run_time": job.next_run_time.isoformat() if job.next_run_time else None,
"trigger": str(job.trigger)
}
for job in self.scheduler.get_jobs()
]
async def create_recurring_event(
self,
user_id: str,
title: str,
description: str,
start_time: datetime,
end_time: datetime,
recurrence_pattern: str, # daily, weekly, monthly, yearly
recurrence_end_date: Optional[datetime] = None,
attendees: List[str] = None,
reminder_minutes: int = 30
) -> List[Dict[str, Any]]:
"""Create recurring events based on pattern"""
events = []
current_start = start_time
current_end = end_time
duration = end_time - start_time
while True:
if recurrence_end_date and current_start > recurrence_end_date:
break
# Create individual event instance
event = await calendar.create_event(
user_id=user_id,
title=title,
description=description,
start_time=current_start,
end_time=current_end,
attendees=attendees,
reminder_minutes=reminder_minutes
)
events.append(event)
# Calculate next occurrence
if recurrence_pattern == "daily":
current_start += timedelta(days=1)
elif recurrence_pattern == "weekly":
current_start += timedelta(weeks=1)
elif recurrence_pattern == "monthly":
# Add one month (approximately)
if current_start.month == 12:
current_start = current_start.replace(year=current_start.year + 1, month=1)
else:
current_start = current_start.replace(month=current_start.month + 1)
elif recurrence_pattern == "yearly":
current_start = current_start.replace(year=current_start.year + 1)
current_end = current_start + duration
return events
async def update_recurring_event(
self,
event_id: str,
user_id: str,
update_data: Dict[str, Any],
update_future: bool = True
) -> List[Dict[str, Any]]:
"""Update a recurring event and optionally its future occurrences"""
# Get the original event
event = await db.db["events"].find_one({
"_id": ObjectId(event_id),
"user_id": user_id
})
if not event:
return []
# Update the current event
await calendar.update_event(event_id, user_id, update_data)
updated_events = [event]
# Update future occurrences if requested
if update_future:
future_events = await db.db["events"].find({
"recurrence_group": event.get("recurrence_group"),
"start_time": {"$gt": event["start_time"]},
"user_id": user_id
}).to_list(None)
for future_event in future_events:
await calendar.update_event(
str(future_event["_id"]),
user_id,
update_data
)
updated_events.append(future_event)
return updated_events
async def delete_recurring_event(
self,
event_id: str,
user_id: str,
delete_future: bool = True
) -> bool:
"""Delete a recurring event and optionally its future occurrences"""
event = await db.db["events"].find_one({
"_id": ObjectId(event_id),
"user_id": user_id
})
if not event:
return False
# Delete the current event
await calendar.delete_event(event_id, user_id)
# Delete future occurrences if requested
if delete_future and event.get("recurrence_group"):
await db.db["events"].delete_many({
"recurrence_group": event["recurrence_group"],
"start_time": {"$gt": event["start_time"]},
"user_id": user_id
})
return True
async def get_upcoming_recurring_events(
self,
user_id: str,
days: int = 30
) -> List[Dict[str, Any]]:
"""Get upcoming recurring events for a user"""
start_date = datetime.utcnow()
end_date = start_date + timedelta(days=days)
events = await calendar.get_user_events(
user_id=user_id,
start_date=start_date,
end_date=end_date,
include_attendee_events=True
)
return sorted(events, key=lambda x: x["start_time"])
scheduler_service = SchedulerService()