Spaces:
Paused
Paused
File size: 8,075 Bytes
8dafdf7 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 | from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
from bson import ObjectId
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from ..db.database import db
from ..utils.cache import cache
from ..utils.logger import logger
from ..services.calendar import calendar
from ..services.maintenance import maintenance
class SchedulerService:
def __init__(self):
self.scheduler = AsyncIOScheduler()
self._setup_maintenance_jobs()
def _setup_maintenance_jobs(self):
"""Setup all maintenance related scheduled jobs"""
# Daily database maintenance at 2 AM
self.scheduler.add_job(
maintenance.perform_database_maintenance,
CronTrigger(hour=2),
id="daily_db_maintenance",
replace_existing=True
)
# Session cleanup every 6 hours
self.scheduler.add_job(
maintenance.cleanup_expired_sessions,
CronTrigger(hour="*/6"),
id="session_cleanup",
replace_existing=True
)
# System health check every 15 minutes
self.scheduler.add_job(
maintenance.monitor_system_resources,
CronTrigger(minute="*/15"),
id="health_check",
replace_existing=True
)
# Daily backup at 1 AM
self.scheduler.add_job(
maintenance.perform_scheduled_backup,
CronTrigger(hour=1),
id="daily_backup",
replace_existing=True
)
# Daily log rotation at 3 AM
self.scheduler.add_job(
maintenance.rotate_log_files,
CronTrigger(hour=3),
id="log_rotation",
replace_existing=True
)
# Storage quota check every 2 hours
self.scheduler.add_job(
maintenance.manage_storage_quotas,
CronTrigger(hour="*/2"),
id="storage_quota_check",
replace_existing=True
)
# Monthly data archiving at 4 AM on the 1st of each month
self.scheduler.add_job(
maintenance.archive_old_data,
CronTrigger(day=1, hour=4),
id="monthly_archiving",
replace_existing=True
)
def start(self):
"""Start the scheduler"""
try:
self.scheduler.start()
logger.info("Scheduler started successfully")
except Exception as e:
logger.error(f"Failed to start scheduler: {str(e)}")
raise
def shutdown(self):
"""Shutdown the scheduler"""
try:
self.scheduler.shutdown()
logger.info("Scheduler shutdown successfully")
except Exception as e:
logger.error(f"Error during scheduler shutdown: {str(e)}")
raise
def get_jobs(self):
"""Get all scheduled jobs"""
return [
{
"id": job.id,
"name": job.name,
"next_run_time": job.next_run_time.isoformat() if job.next_run_time else None,
"trigger": str(job.trigger)
}
for job in self.scheduler.get_jobs()
]
async def create_recurring_event(
self,
user_id: str,
title: str,
description: str,
start_time: datetime,
end_time: datetime,
recurrence_pattern: str, # daily, weekly, monthly, yearly
recurrence_end_date: Optional[datetime] = None,
attendees: List[str] = None,
reminder_minutes: int = 30
) -> List[Dict[str, Any]]:
"""Create recurring events based on pattern"""
events = []
current_start = start_time
current_end = end_time
duration = end_time - start_time
while True:
if recurrence_end_date and current_start > recurrence_end_date:
break
# Create individual event instance
event = await calendar.create_event(
user_id=user_id,
title=title,
description=description,
start_time=current_start,
end_time=current_end,
attendees=attendees,
reminder_minutes=reminder_minutes
)
events.append(event)
# Calculate next occurrence
if recurrence_pattern == "daily":
current_start += timedelta(days=1)
elif recurrence_pattern == "weekly":
current_start += timedelta(weeks=1)
elif recurrence_pattern == "monthly":
# Add one month (approximately)
if current_start.month == 12:
current_start = current_start.replace(year=current_start.year + 1, month=1)
else:
current_start = current_start.replace(month=current_start.month + 1)
elif recurrence_pattern == "yearly":
current_start = current_start.replace(year=current_start.year + 1)
current_end = current_start + duration
return events
async def update_recurring_event(
self,
event_id: str,
user_id: str,
update_data: Dict[str, Any],
update_future: bool = True
) -> List[Dict[str, Any]]:
"""Update a recurring event and optionally its future occurrences"""
# Get the original event
event = await db.db["events"].find_one({
"_id": ObjectId(event_id),
"user_id": user_id
})
if not event:
return []
# Update the current event
await calendar.update_event(event_id, user_id, update_data)
updated_events = [event]
# Update future occurrences if requested
if update_future:
future_events = await db.db["events"].find({
"recurrence_group": event.get("recurrence_group"),
"start_time": {"$gt": event["start_time"]},
"user_id": user_id
}).to_list(None)
for future_event in future_events:
await calendar.update_event(
str(future_event["_id"]),
user_id,
update_data
)
updated_events.append(future_event)
return updated_events
async def delete_recurring_event(
self,
event_id: str,
user_id: str,
delete_future: bool = True
) -> bool:
"""Delete a recurring event and optionally its future occurrences"""
event = await db.db["events"].find_one({
"_id": ObjectId(event_id),
"user_id": user_id
})
if not event:
return False
# Delete the current event
await calendar.delete_event(event_id, user_id)
# Delete future occurrences if requested
if delete_future and event.get("recurrence_group"):
await db.db["events"].delete_many({
"recurrence_group": event["recurrence_group"],
"start_time": {"$gt": event["start_time"]},
"user_id": user_id
})
return True
async def get_upcoming_recurring_events(
self,
user_id: str,
days: int = 30
) -> List[Dict[str, Any]]:
"""Get upcoming recurring events for a user"""
start_date = datetime.utcnow()
end_date = start_date + timedelta(days=days)
events = await calendar.get_user_events(
user_id=user_id,
start_date=start_date,
end_date=end_date,
include_attendee_events=True
)
return sorted(events, key=lambda x: x["start_time"])
scheduler = SchedulerService() |