Spaces:
Paused
Paused
File size: 6,072 Bytes
f674d57 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
from datetime import datetime, timedelta
from ..db.database import db
from ..services.maintenance import maintenance
from ..utils.logger import logger
from ..services.websocket import create_and_broadcast_notification
import asyncio
from sqlalchemy import select, delete
from ..db.models import Event, User, Notification
async def check_event_reminders():
    """Send reminder notifications for events starting within 30 minutes.

    Selects every ``Event`` whose ``start_time`` lies in the window
    ``(now, now + 30 minutes]`` and whose ``reminder_sent`` flag is still
    false, notifies the event owner and each attendee once, flags the event
    as reminded and commits all flag changes in a single transaction.

    Events whose owner row cannot be found are skipped and stay unflagged.

    Returns:
        None. Any exception is logged and swallowed so the calling loop
        keeps running; in that case nothing is committed.
    """
    try:
        now = datetime.utcnow()
        # Find events happening soon that haven't sent reminders
        stmt = select(Event).where(
            Event.start_time > now,
            Event.start_time <= now + timedelta(minutes=30),
            Event.reminder_sent == False
        )
        async with db.session() as session:
            result = await session.execute(stmt)
            upcoming_events = result.scalars().all()
            for event in upcoming_events:
                # Get event owner
                user_stmt = select(User).where(User.id == event.user_id)
                user_result = await session.execute(user_stmt)
                user = user_result.scalar_one_or_none()
                if not user:
                    continue

                # Owner first, then attendees. A missing attendee list is
                # treated as empty (previously it raised TypeError and
                # aborted the whole batch), and ids already queued are
                # skipped so nobody is notified twice for the same event.
                # NOTE(review): assumes attendees holds user ids in the same
                # string form as str(user.id) - confirm against the model.
                reminder_users = [str(user.id)]
                for attendee_id in event.attendees or []:
                    if attendee_id not in reminder_users:
                        reminder_users.append(attendee_id)

                # NOTE(review): the text quotes event.reminder_minutes while
                # the query window above is a fixed 30 minutes - confirm the
                # two are meant to agree.
                for user_id in reminder_users:
                    await create_and_broadcast_notification(
                        user_id=user_id,
                        title=f"Event Reminder: {event.title}",
                        message=f"Your event '{event.title}' starts in {event.reminder_minutes} minutes",
                        notification_type="event_reminder",
                        data={"event_id": str(event.id)}
                    )
                # Mark reminder as sent
                event.reminder_sent = True
            # Commit all changes at once
            await session.commit()
    except Exception as e:
        logger.error(f"Error in event reminder check: {str(e)}")
async def cleanup_old_notifications():
    """Delete read notifications that were created more than 30 days ago.

    Issues a single bulk DELETE, commits it, and logs the affected row
    count. Any exception is logged and swallowed.
    """
    try:
        threshold = datetime.utcnow() - timedelta(days=30)
        # Only notifications that are both older than the threshold and
        # already read are removed.
        purge_stmt = delete(Notification).where(
            Notification.created_at < threshold,
            Notification.read == True
        )
        async with db.session() as session:
            outcome = await session.execute(purge_stmt)
            await session.commit()
            # Row count reported by the executed DELETE
            removed = outcome.rowcount
            logger.info(f"Cleaned up {removed} old notifications")
    except Exception as e:
        logger.error(f"Error in cleanup_old_notifications: {str(e)}")
async def perform_daily_maintenance():
    """Run the daily maintenance sequence and log the outcome of each step.

    Calls, in order, ``maintenance.cleanup_expired_sessions``,
    ``maintenance.archive_old_data``, ``maintenance.check_system_health``
    and ``maintenance.monitor_system_resources``. An exception in any step
    skips the remaining steps and is logged; nothing is re-raised.
    """
    try:
        # Step 1: expired sessions
        session_count = await maintenance.cleanup_expired_sessions()
        logger.info(f"Cleaned up {session_count} expired sessions")

        # Step 2: archiving (only logged when something truthy comes back)
        archive_summary = await maintenance.archive_old_data()
        if archive_summary:
            logger.info(f"Archived data: {archive_summary}")

        # Step 3: health check - an "error" key marks failure
        health_report = await maintenance.check_system_health()
        if "error" in health_report:
            logger.error(f"System health check error: {health_report['error']}")
        else:
            logger.info("System health check completed successfully")

        # Step 4: resource monitoring - same "error" key convention
        resource_report = await maintenance.monitor_system_resources()
        if "error" in resource_report:
            logger.error(f"Resource monitoring error: {resource_report['error']}")
        else:
            logger.info("System resource monitoring completed successfully")
    except Exception as e:
        logger.error(f"Error in daily maintenance: {str(e)}")
async def perform_weekly_maintenance():
    """Run the weekly maintenance sequence and log the outcome of each step.

    Calls, in order, ``maintenance.perform_database_maintenance``,
    ``maintenance.rotate_log_files`` and ``maintenance.manage_storage_quotas``.
    An exception in any step skips the remaining steps and is logged;
    nothing is re-raised.
    """
    try:
        # Step 1: database maintenance
        await maintenance.perform_database_maintenance()
        logger.info("Database maintenance completed successfully")

        # Step 2: log rotation
        await maintenance.rotate_log_files()
        logger.info("Log rotation completed successfully")

        # Step 3: storage quotas - relay each reported warning, then the
        # count under "cleaned" (0 when the key is absent)
        quota_report = await maintenance.manage_storage_quotas()
        quota_warnings = quota_report.get("warnings")
        if quota_warnings:
            for message in quota_warnings:
                logger.warning(message)
        logger.info(f"Storage cleanup: removed {quota_report.get('cleaned', 0)} temporary files")
    except Exception as e:
        logger.error(f"Error in weekly maintenance: {str(e)}")
async def run_periodic_tasks():
    """Run the background scheduler loop; never returns normally.

    Each iteration, about every 60 seconds, all times in UTC:

    * event reminders are checked every iteration;
    * old notifications are cleaned up once per calendar day (the first
      iteration after start-up, then the first iteration of each new day);
    * daily maintenance runs once during the 02:00 hour;
    * weekly maintenance runs once during the 03:00 hour on Sunday.

    An exception in an iteration is logged and the loop retries after 60
    seconds.
    """
    daily_maintenance_run = False
    weekly_maintenance_run = False
    # Date of the last notification cleanup; None forces a run on start-up.
    last_cleanup_date = None
    while True:
        try:
            now = datetime.utcnow()
            # Check event reminders every minute
            await check_event_reminders()
            # Clean up old notifications daily. Previously this ran on every
            # iteration, issuing a bulk DELETE once a minute.
            today = now.date()
            if last_cleanup_date != today:
                await cleanup_old_notifications()
                last_cleanup_date = today
            # Run daily maintenance at 2 AM; the flag prevents repeat runs
            # within the same hour and resets once the hour has passed.
            if now.hour == 2 and not daily_maintenance_run:
                await perform_daily_maintenance()
                daily_maintenance_run = True
            elif now.hour != 2:
                daily_maintenance_run = False
            # Run weekly maintenance on Sunday (weekday() == 6) at 3 AM
            if now.weekday() == 6 and now.hour == 3 and not weekly_maintenance_run:
                await perform_weekly_maintenance()
                weekly_maintenance_run = True
            elif now.weekday() != 6 or now.hour != 3:
                weekly_maintenance_run = False
            # Wait before next check
            await asyncio.sleep(60)  # 1 minute
        except Exception as e:
            logger.error(f"Error in periodic tasks: {str(e)}")
            await asyncio.sleep(60)  # Wait before retrying
|