# Desk-Back2 / app / services / scheduler.py
# Fred808's picture
# Upload 32 files
# 8dafdf7 verified
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
from bson import ObjectId
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from ..db.database import db
from ..utils.cache import cache
from ..utils.logger import logger
from ..services.calendar import calendar
from ..services.maintenance import maintenance
class SchedulerService:
    """Maintenance-job scheduler plus helpers for recurring calendar events.

    The instance owns an ``AsyncIOScheduler`` on which all maintenance jobs
    are registered at construction time; the scheduler only begins running
    once :meth:`start` is called. The ``*_recurring_event`` coroutines work
    through the ``calendar`` service and the ``events`` collection.
    """

    # Recurrence patterns understood by create_recurring_event.
    RECURRENCE_PATTERNS = ("daily", "weekly", "monthly", "yearly")

    # Number of occurrences generated when a series has neither an end date
    # nor an explicit max_occurrences. Without a bound the series would be
    # generated forever.
    DEFAULT_MAX_OCCURRENCES = 100

    def __init__(self):
        """Create the underlying scheduler and register the maintenance jobs."""
        self.scheduler = AsyncIOScheduler()
        self._setup_maintenance_jobs()

    def _setup_maintenance_jobs(self):
        """Setup all maintenance related scheduled jobs"""
        # (callable, trigger, job id) in registration order. Every job uses
        # replace_existing=True so re-registering an id overwrites the old job.
        jobs = (
            # Daily database maintenance at 2 AM
            (
                maintenance.perform_database_maintenance,
                CronTrigger(hour=2),
                "daily_db_maintenance",
            ),
            # Session cleanup every 6 hours
            (
                maintenance.cleanup_expired_sessions,
                CronTrigger(hour="*/6"),
                "session_cleanup",
            ),
            # System health check every 15 minutes
            (
                maintenance.monitor_system_resources,
                CronTrigger(minute="*/15"),
                "health_check",
            ),
            # Daily backup at 1 AM
            (
                maintenance.perform_scheduled_backup,
                CronTrigger(hour=1),
                "daily_backup",
            ),
            # Daily log rotation at 3 AM
            (
                maintenance.rotate_log_files,
                CronTrigger(hour=3),
                "log_rotation",
            ),
            # Storage quota check every 2 hours
            (
                maintenance.manage_storage_quotas,
                CronTrigger(hour="*/2"),
                "storage_quota_check",
            ),
            # Monthly data archiving at 4 AM on the 1st of each month
            (
                maintenance.archive_old_data,
                CronTrigger(day=1, hour=4),
                "monthly_archiving",
            ),
        )
        for func, trigger, job_id in jobs:
            self.scheduler.add_job(
                func,
                trigger,
                id=job_id,
                replace_existing=True
            )

    def start(self):
        """Start the scheduler.

        Raises:
            Exception: whatever the underlying scheduler raises; the error is
                logged and then re-raised unchanged.
        """
        try:
            self.scheduler.start()
            logger.info("Scheduler started successfully")
        except Exception as e:
            logger.error(f"Failed to start scheduler: {str(e)}")
            raise

    def shutdown(self):
        """Shutdown the scheduler.

        Raises:
            Exception: whatever the underlying scheduler raises; the error is
                logged and then re-raised unchanged.
        """
        try:
            self.scheduler.shutdown()
            logger.info("Scheduler shutdown successfully")
        except Exception as e:
            logger.error(f"Error during scheduler shutdown: {str(e)}")
            raise

    def get_jobs(self):
        """Get all scheduled jobs.

        Returns:
            A list of dicts with the keys ``id``, ``name``, ``next_run_time``
            (ISO-8601 string, or ``None`` when no run time is available) and
            ``trigger`` (string form of the job's trigger).
        """
        described = []
        for job in self.scheduler.get_jobs():
            # Jobs that are still pending (added before the scheduler was
            # started) may not carry a next_run_time attribute at all, so
            # read it defensively instead of assuming it exists.
            next_run = getattr(job, "next_run_time", None)
            described.append({
                "id": job.id,
                "name": job.name,
                "next_run_time": next_run.isoformat() if next_run else None,
                "trigger": str(job.trigger)
            })
        return described

    @staticmethod
    def _days_in_month(year: int, month: int) -> int:
        """Return the number of days in the given month of the given year."""
        if month == 12:
            return 31
        # Day before the first of the following month.
        return (datetime(year, month + 1, 1) - timedelta(days=1)).day

    @staticmethod
    def _nth_occurrence(start: datetime, pattern: str, n: int) -> datetime:
        """Return the start of the n-th occurrence (0-based) of a series.

        Each occurrence is derived from the series' first start rather than
        from the previous occurrence, so a day clamped in a short month
        (Jan 31 -> Feb 29) does not stick for later months (-> Mar 31).

        ``pattern`` must be one of ``RECURRENCE_PATTERNS``; callers validate
        it before calling.
        """
        if pattern == "daily":
            return start + timedelta(days=n)
        if pattern == "weekly":
            return start + timedelta(weeks=n)

        months_ahead = n if pattern == "monthly" else 12 * n
        month_index = (start.month - 1) + months_ahead
        year = start.year + month_index // 12
        month = month_index % 12 + 1
        # Clamp to the last valid day so e.g. the 31st or Feb 29 never
        # produces an invalid date.
        day = min(start.day, SchedulerService._days_in_month(year, month))
        return start.replace(year=year, month=month, day=day)

    async def create_recurring_event(
        self,
        user_id: str,
        title: str,
        description: str,
        start_time: datetime,
        end_time: datetime,
        recurrence_pattern: str,  # daily, weekly, monthly, yearly
        recurrence_end_date: Optional[datetime] = None,
        attendees: Optional[List[str]] = None,
        reminder_minutes: int = 30,
        max_occurrences: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """Create recurring events based on pattern.

        One event is created through ``calendar.create_event`` per
        occurrence; every occurrence keeps the duration of the first one.

        Args:
            user_id: Owner passed through to ``calendar.create_event``.
            title: Event title, shared by all occurrences.
            description: Event description, shared by all occurrences.
            start_time: Start of the first occurrence.
            end_time: End of the first occurrence.
            recurrence_pattern: One of ``"daily"``, ``"weekly"``,
                ``"monthly"`` or ``"yearly"``. Monthly/yearly occurrences
                that would land on a non-existent day are moved to the last
                day of that month.
            recurrence_end_date: Occurrences starting after this moment are
                not created. An occurrence starting exactly at it is.
            attendees: Passed through to ``calendar.create_event``.
            reminder_minutes: Passed through to ``calendar.create_event``.
            max_occurrences: Upper bound on the number of events created.
                When omitted and no ``recurrence_end_date`` is given,
                ``DEFAULT_MAX_OCCURRENCES`` is used so the series is finite.

        Returns:
            The values returned by ``calendar.create_event``, in
            chronological order.

        Raises:
            ValueError: If ``recurrence_pattern`` is not supported. Raised
                before any event is created.
        """
        if recurrence_pattern not in self.RECURRENCE_PATTERNS:
            # An unknown pattern would never advance the start time.
            raise ValueError(
                f"Unsupported recurrence_pattern {recurrence_pattern!r}; "
                f"expected one of {', '.join(self.RECURRENCE_PATTERNS)}"
            )

        if max_occurrences is None and recurrence_end_date is None:
            # Nothing bounds the series: fall back to the default cap.
            max_occurrences = self.DEFAULT_MAX_OCCURRENCES

        duration = end_time - start_time
        events = []
        index = 0
        while max_occurrences is None or index < max_occurrences:
            occurrence_start = self._nth_occurrence(
                start_time, recurrence_pattern, index
            )
            if (
                recurrence_end_date is not None
                and occurrence_start > recurrence_end_date
            ):
                break

            # Create individual event instance
            event = await calendar.create_event(
                user_id=user_id,
                title=title,
                description=description,
                start_time=occurrence_start,
                end_time=occurrence_start + duration,
                attendees=attendees,
                reminder_minutes=reminder_minutes
            )
            events.append(event)
            index += 1

        return events

    async def update_recurring_event(
        self,
        event_id: str,
        user_id: str,
        update_data: Dict[str, Any],
        update_future: bool = True
    ) -> List[Dict[str, Any]]:
        """Update a recurring event and optionally its future occurrences.

        Args:
            event_id: String form of the event's ``_id``.
            user_id: Owner of the event; only this user's events are touched.
            update_data: Passed unchanged to ``calendar.update_event`` for
                every affected event.
            update_future: Also update events of the same
                ``recurrence_group`` that start after this one.

        Returns:
            The affected event documents as they were read from the database
            *before* the update was applied (the selected event first), or an
            empty list when the event does not exist for this user.
        """
        # Get the original event
        event = await db.db["events"].find_one({
            "_id": ObjectId(event_id),
            "user_id": user_id
        })
        if not event:
            return []

        # Update the current event
        await calendar.update_event(event_id, user_id, update_data)
        updated_events = [event]

        # Update future occurrences if requested. An event without a
        # recurrence_group is skipped: querying for a null group would match
        # every later event of the user that has no group at all.
        recurrence_group = event.get("recurrence_group")
        if update_future and recurrence_group:
            future_events = await db.db["events"].find({
                "recurrence_group": recurrence_group,
                "start_time": {"$gt": event["start_time"]},
                "user_id": user_id
            }).to_list(None)
            for future_event in future_events:
                await calendar.update_event(
                    str(future_event["_id"]),
                    user_id,
                    update_data
                )
                updated_events.append(future_event)

        return updated_events

    async def delete_recurring_event(
        self,
        event_id: str,
        user_id: str,
        delete_future: bool = True
    ) -> bool:
        """Delete a recurring event and optionally its future occurrences.

        Args:
            event_id: String form of the event's ``_id``.
            user_id: Owner of the event; only this user's events are touched.
            delete_future: Also delete events of the same
                ``recurrence_group`` that start after this one.

        Returns:
            ``False`` when the event does not exist for this user, ``True``
            otherwise.
        """
        event = await db.db["events"].find_one({
            "_id": ObjectId(event_id),
            "user_id": user_id
        })
        if not event:
            return False

        # Delete the current event
        await calendar.delete_event(event_id, user_id)

        # Delete future occurrences if requested.
        # NOTE(review): these are removed directly from the collection, not
        # through calendar.delete_event - confirm no per-event cleanup is
        # skipped this way.
        if delete_future and event.get("recurrence_group"):
            await db.db["events"].delete_many({
                "recurrence_group": event["recurrence_group"],
                "start_time": {"$gt": event["start_time"]},
                "user_id": user_id
            })

        return True

    async def get_upcoming_recurring_events(
        self,
        user_id: str,
        days: int = 30
    ) -> List[Dict[str, Any]]:
        """Get upcoming events for a user, ordered by start time.

        The window runs from the current naive-UTC time to ``days`` days
        later and includes events where the user is an attendee.

        NOTE(review): despite the name, nothing here filters on recurrence;
        the result is whatever ``calendar.get_user_events`` returns for the
        window - confirm whether a recurrence filter is intended.
        """
        start_date = datetime.utcnow()
        end_date = start_date + timedelta(days=days)
        events = await calendar.get_user_events(
            user_id=user_id,
            start_date=start_date,
            end_date=end_date,
            include_attendee_events=True
        )
        return sorted(events, key=lambda x: x["start_time"])
# Module-level service instance. Constructing it registers the maintenance
# jobs on a new AsyncIOScheduler; the scheduler does not run until start()
# is called on this object.
scheduler = SchedulerService()