Spaces:
Paused
Paused
| import asyncio | |
| from datetime import datetime, timedelta | |
| from ..db.database import db | |
| from ..services.maintenance import maintenance_service | |
| from ..utils.logger import logger | |
| from ..services.websocket import create_and_broadcast_notification | |
| from sqlalchemy import select, delete | |
| from ..db.models import Event, User, Notification | |
| from ..services.pos_analytics import pos_analytics | |
| async def check_event_reminders(): | |
| """Check and send event reminders""" | |
| try: | |
| now = datetime.utcnow() | |
| # Find events happening soon that haven't sent reminders | |
| stmt = select(Event).where( | |
| Event.start_time > now, | |
| Event.start_time <= now + timedelta(minutes=30), | |
| Event.reminder_sent == False | |
| ) | |
| async with db.session() as session: | |
| result = await session.execute(stmt) | |
| upcoming_events = result.scalars().all() | |
| for event in upcoming_events: | |
| # Get event owner | |
| user_stmt = select(User).where(User.id == event.user_id) | |
| user_result = await session.execute(user_stmt) | |
| user = user_result.scalar_one_or_none() | |
| if user: | |
| # Send reminder to event owner and attendees | |
| reminder_users = [str(user.id)] + event.attendees | |
| for user_id in reminder_users: | |
| await create_and_broadcast_notification( | |
| user_id=user_id, | |
| title=f"Event Reminder: {event.title}", | |
| message=f"Your event '{event.title}' starts in {event.reminder_minutes} minutes", | |
| notification_type="event_reminder", | |
| data={"event_id": str(event.id)} | |
| ) | |
| # Mark reminder as sent | |
| event.reminder_sent = True | |
| # Commit all changes at once | |
| await session.commit() | |
| except Exception as e: | |
| logger.error(f"Error in event reminder check: {str(e)}") | |
| async def cleanup_old_notifications(): | |
| """Clean up old notifications""" | |
| try: | |
| cutoff = datetime.utcnow() - timedelta(days=30) | |
| async with db.session() as session: | |
| # Delete notifications older than 30 days that have been read | |
| stmt = delete(Notification).where( | |
| Notification.created_at < cutoff, | |
| Notification.read == True | |
| ) | |
| result = await session.execute(stmt) | |
| await session.commit() | |
| # Get number of deleted rows | |
| deleted_count = result.rowcount | |
| logger.info(f"Cleaned up {deleted_count} old notifications") | |
| except Exception as e: | |
| logger.error(f"Error in cleanup_old_notifications: {str(e)}") | |
| async def perform_daily_maintenance(): | |
| """Perform daily system maintenance tasks""" | |
| try: | |
| # Clean up expired sessions | |
| deleted_sessions = await maintenance_service.cleanup_expired_sessions() | |
| logger.info(f"Cleaned up {deleted_sessions} expired sessions") | |
| # Archive old data | |
| archived = await maintenance_service.archive_old_data() | |
| if archived: | |
| logger.info(f"Archived data: {archived}") | |
| # Check system health | |
| health_data = await maintenance_service.check_system_health() | |
| if "error" not in health_data: | |
| logger.info("System health check completed successfully") | |
| else: | |
| logger.error(f"System health check error: {health_data['error']}") | |
| # Monitor system resources | |
| resources = await maintenance_service.monitor_system_resources() | |
| if "error" not in resources: | |
| logger.info("System resource monitoring completed successfully") | |
| else: | |
| logger.error(f"Resource monitoring error: {resources['error']}") | |
| except Exception as e: | |
| logger.error(f"Error in daily maintenance: {str(e)}") | |
| async def perform_weekly_maintenance(): | |
| """Perform weekly system maintenance tasks""" | |
| try: | |
| # Perform database maintenance | |
| await maintenance_service.perform_database_maintenance() | |
| logger.info("Database maintenance completed successfully") | |
| # Rotate log files | |
| await maintenance_service.rotate_log_files() | |
| logger.info("Log rotation completed successfully") | |
| # Manage storage quotas | |
| quota_results = await maintenance_service.manage_storage_quotas() | |
| if quota_results.get("warnings"): | |
| for warning in quota_results["warnings"]: | |
| logger.warning(warning) | |
| logger.info(f"Storage cleanup: removed {quota_results.get('cleaned', 0)} temporary files") | |
| except Exception as e: | |
| logger.error(f"Error in weekly maintenance: {str(e)}") | |
| async def sync_pos_metrics_task(): | |
| """Background task to sync POS metrics every 5 minutes""" | |
| while True: | |
| try: | |
| success = await pos_analytics.sync_all_metrics() | |
| if success: | |
| logger.info("Successfully synced POS metrics") | |
| else: | |
| logger.error("Failed to sync POS metrics") | |
| except Exception as e: | |
| logger.error(f"Error in POS metrics sync task: {str(e)}") | |
| # Wait for 5 minutes before next sync | |
| await asyncio.sleep(300) | |
| async def run_periodic_tasks(): | |
| """Run periodic maintenance tasks""" | |
| daily_maintenance_run = False | |
| weekly_maintenance_run = False | |
| while True: | |
| try: | |
| now = datetime.utcnow() | |
| # Check event reminders every minute | |
| await check_event_reminders() | |
| # Clean up old notifications daily | |
| await cleanup_old_notifications() | |
| # Run daily maintenance at 2 AM | |
| if now.hour == 2 and not daily_maintenance_run: | |
| await perform_daily_maintenance() | |
| daily_maintenance_run = True | |
| elif now.hour != 2: | |
| daily_maintenance_run = False | |
| # Run weekly maintenance on Sunday at 3 AM | |
| if now.weekday() == 6 and now.hour == 3 and not weekly_maintenance_run: | |
| await perform_weekly_maintenance() | |
| weekly_maintenance_run = True | |
| elif now.weekday() != 6 or now.hour != 3: | |
| weekly_maintenance_run = False | |
| # Wait before next check | |
| await asyncio.sleep(60) # 1 minute | |
| except Exception as e: | |
| logger.error(f"Error in periodic tasks: {str(e)}") | |
| await asyncio.sleep(60) # Wait before retrying | |