Admin-Desk / app /utils /tasks.py
Fred808's picture
Update app/utils/tasks.py
f674d57 verified
from datetime import datetime, timedelta
from ..db.database import db
from ..services.maintenance import maintenance
from ..utils.logger import logger
from ..services.websocket import create_and_broadcast_notification
import asyncio
from sqlalchemy import select, delete
from ..db.models import Event, User, Notification
async def check_event_reminders():
"""Check and send event reminders"""
try:
now = datetime.utcnow()
# Find events happening soon that haven't sent reminders
stmt = select(Event).where(
Event.start_time > now,
Event.start_time <= now + timedelta(minutes=30),
Event.reminder_sent == False
)
async with db.session() as session:
result = await session.execute(stmt)
upcoming_events = result.scalars().all()
for event in upcoming_events:
# Get event owner
user_stmt = select(User).where(User.id == event.user_id)
user_result = await session.execute(user_stmt)
user = user_result.scalar_one_or_none()
if user:
# Send reminder to event owner and attendees
reminder_users = [str(user.id)] + event.attendees
for user_id in reminder_users:
await create_and_broadcast_notification(
user_id=user_id,
title=f"Event Reminder: {event.title}",
message=f"Your event '{event.title}' starts in {event.reminder_minutes} minutes",
notification_type="event_reminder",
data={"event_id": str(event.id)}
)
# Mark reminder as sent
event.reminder_sent = True
# Commit all changes at once
await session.commit()
except Exception as e:
logger.error(f"Error in event reminder check: {str(e)}")
async def cleanup_old_notifications():
"""Clean up old notifications"""
try:
cutoff = datetime.utcnow() - timedelta(days=30)
async with db.session() as session:
# Delete notifications older than 30 days that have been read
stmt = delete(Notification).where(
Notification.created_at < cutoff,
Notification.read == True
)
result = await session.execute(stmt)
await session.commit()
# Get number of deleted rows
deleted_count = result.rowcount
logger.info(f"Cleaned up {deleted_count} old notifications")
except Exception as e:
logger.error(f"Error in cleanup_old_notifications: {str(e)}")
async def perform_daily_maintenance():
"""Perform daily system maintenance tasks"""
try:
# Clean up expired sessions
deleted_sessions = await maintenance.cleanup_expired_sessions()
logger.info(f"Cleaned up {deleted_sessions} expired sessions")
# Archive old data
archived = await maintenance.archive_old_data()
if archived:
logger.info(f"Archived data: {archived}")
# Check system health
health_data = await maintenance.check_system_health()
if "error" not in health_data:
logger.info("System health check completed successfully")
else:
logger.error(f"System health check error: {health_data['error']}")
# Monitor system resources
resources = await maintenance.monitor_system_resources()
if "error" not in resources:
logger.info("System resource monitoring completed successfully")
else:
logger.error(f"Resource monitoring error: {resources['error']}")
except Exception as e:
logger.error(f"Error in daily maintenance: {str(e)}")
async def perform_weekly_maintenance():
"""Perform weekly system maintenance tasks"""
try:
# Perform database maintenance
await maintenance.perform_database_maintenance()
logger.info("Database maintenance completed successfully")
# Rotate log files
await maintenance.rotate_log_files()
logger.info("Log rotation completed successfully")
# Manage storage quotas
quota_results = await maintenance.manage_storage_quotas()
if quota_results.get("warnings"):
for warning in quota_results["warnings"]:
logger.warning(warning)
logger.info(f"Storage cleanup: removed {quota_results.get('cleaned', 0)} temporary files")
except Exception as e:
logger.error(f"Error in weekly maintenance: {str(e)}")
async def run_periodic_tasks():
"""Run periodic maintenance tasks"""
daily_maintenance_run = False
weekly_maintenance_run = False
while True:
try:
now = datetime.utcnow()
# Check event reminders every minute
await check_event_reminders()
# Clean up old notifications daily
await cleanup_old_notifications()
# Run daily maintenance at 2 AM
if now.hour == 2 and not daily_maintenance_run:
await perform_daily_maintenance()
daily_maintenance_run = True
elif now.hour != 2:
daily_maintenance_run = False
# Run weekly maintenance on Sunday at 3 AM
if now.weekday() == 6 and now.hour == 3 and not weekly_maintenance_run:
await perform_weekly_maintenance()
weekly_maintenance_run = True
elif now.weekday() != 6 or now.hour != 3:
weekly_maintenance_run = False
# Wait before next check
await asyncio.sleep(60) # 1 minute
except Exception as e:
logger.error(f"Error in periodic tasks: {str(e)}")
await asyncio.sleep(60) # Wait before retrying