import asyncio from datetime import datetime, timedelta from ..db.database import db from ..services.maintenance import maintenance_service from ..utils.logger import logger from ..services.websocket import create_and_broadcast_notification from sqlalchemy import select, delete from ..db.models import Event, User, Notification from ..services.pos_analytics import pos_analytics async def check_event_reminders(): """Check and send event reminders""" try: now = datetime.utcnow() # Find events happening soon that haven't sent reminders stmt = select(Event).where( Event.start_time > now, Event.start_time <= now + timedelta(minutes=30), Event.reminder_sent == False ) async with db.session() as session: result = await session.execute(stmt) upcoming_events = result.scalars().all() for event in upcoming_events: # Get event owner user_stmt = select(User).where(User.id == event.user_id) user_result = await session.execute(user_stmt) user = user_result.scalar_one_or_none() if user: # Send reminder to event owner and attendees reminder_users = [str(user.id)] + event.attendees for user_id in reminder_users: await create_and_broadcast_notification( user_id=user_id, title=f"Event Reminder: {event.title}", message=f"Your event '{event.title}' starts in {event.reminder_minutes} minutes", notification_type="event_reminder", data={"event_id": str(event.id)} ) # Mark reminder as sent event.reminder_sent = True # Commit all changes at once await session.commit() except Exception as e: logger.error(f"Error in event reminder check: {str(e)}") async def cleanup_old_notifications(): """Clean up old notifications""" try: cutoff = datetime.utcnow() - timedelta(days=30) async with db.session() as session: # Delete notifications older than 30 days that have been read stmt = delete(Notification).where( Notification.created_at < cutoff, Notification.read == True ) result = await session.execute(stmt) await session.commit() # Get number of deleted rows deleted_count = result.rowcount logger.info(f"Cleaned up {deleted_count} old notifications") except Exception as e: logger.error(f"Error in cleanup_old_notifications: {str(e)}") async def perform_daily_maintenance(): """Perform daily system maintenance tasks""" try: # Clean up expired sessions deleted_sessions = await maintenance_service.cleanup_expired_sessions() logger.info(f"Cleaned up {deleted_sessions} expired sessions") # Archive old data archived = await maintenance_service.archive_old_data() if archived: logger.info(f"Archived data: {archived}") # Check system health health_data = await maintenance_service.check_system_health() if "error" not in health_data: logger.info("System health check completed successfully") else: logger.error(f"System health check error: {health_data['error']}") # Monitor system resources resources = await maintenance_service.monitor_system_resources() if "error" not in resources: logger.info("System resource monitoring completed successfully") else: logger.error(f"Resource monitoring error: {resources['error']}") except Exception as e: logger.error(f"Error in daily maintenance: {str(e)}") async def perform_weekly_maintenance(): """Perform weekly system maintenance tasks""" try: # Perform database maintenance await maintenance_service.perform_database_maintenance() logger.info("Database maintenance completed successfully") # Rotate log files await maintenance_service.rotate_log_files() logger.info("Log rotation completed successfully") # Manage storage quotas quota_results = await maintenance_service.manage_storage_quotas() if quota_results.get("warnings"): for warning in quota_results["warnings"]: logger.warning(warning) logger.info(f"Storage cleanup: removed {quota_results.get('cleaned', 0)} temporary files") except Exception as e: logger.error(f"Error in weekly maintenance: {str(e)}") async def sync_pos_metrics_task(): """Background task to sync POS metrics every 5 minutes""" while True: try: success = await pos_analytics.sync_all_metrics() if success: logger.info("Successfully synced POS metrics") else: logger.error("Failed to sync POS metrics") except Exception as e: logger.error(f"Error in POS metrics sync task: {str(e)}") # Wait for 5 minutes before next sync await asyncio.sleep(300) async def run_periodic_tasks(): """Run periodic maintenance tasks""" daily_maintenance_run = False weekly_maintenance_run = False while True: try: now = datetime.utcnow() # Check event reminders every minute await check_event_reminders() # Clean up old notifications daily await cleanup_old_notifications() # Run daily maintenance at 2 AM if now.hour == 2 and not daily_maintenance_run: await perform_daily_maintenance() daily_maintenance_run = True elif now.hour != 2: daily_maintenance_run = False # Run weekly maintenance on Sunday at 3 AM if now.weekday() == 6 and now.hour == 3 and not weekly_maintenance_run: await perform_weekly_maintenance() weekly_maintenance_run = True elif now.weekday() != 6 or now.hour != 3: weekly_maintenance_run = False # Wait before next check await asyncio.sleep(60) # 1 minute except Exception as e: logger.error(f"Error in periodic tasks: {str(e)}") await asyncio.sleep(60) # Wait before retrying