import os
import shutil
import psutil
from datetime import datetime, timedelta
from typing import Dict, Any, Optional

from sqlalchemy import select, delete, update, func, text

from ..db.database import db
from ..utils.logger import logger
from ..core.config import settings
from ..services.websocket import create_and_broadcast_notification
from ..db.models import User, Order, Notification, Session


class MaintenanceService:
    """Background maintenance tasks for the application.

    Covers session cleanup, data archiving, system health checks,
    resource monitoring, database upkeep, log rotation, and storage
    quota management. Every method is best-effort: failures are logged
    and a safe fallback value is returned rather than raising.
    """

    async def cleanup_expired_sessions(self) -> int:
        """Delete sessions whose last activity is older than 7 days.

        Returns:
            Number of rows deleted, or 0 if the cleanup failed.
        """
        try:
            cutoff = datetime.utcnow() - timedelta(days=7)
            async with db.session() as session:
                stmt = delete(Session).where(Session.last_activity < cutoff)
                result = await session.execute(stmt)
                await session.commit()
                return result.rowcount
        except Exception as e:
            logger.error(f"Error cleaning up sessions: {str(e)}")
            return 0

    async def archive_old_data(self) -> Dict[str, int]:
        """Flag year-old finished orders and read notifications as archived.

        Returns:
            Mapping of entity name ("orders", "notifications") to the
            number of rows archived. Empty dict on failure.
        """
        try:
            cutoff = datetime.utcnow() - timedelta(days=365)
            archived: Dict[str, int] = {}
            async with db.session() as session:
                # Archive old orders that reached a terminal status.
                order_stmt = update(Order).where(
                    Order.created_at < cutoff,
                    Order.status.in_(["completed", "cancelled"])
                ).values(archived=True)
                order_result = await session.execute(order_stmt)
                archived["orders"] = order_result.rowcount

                # Archive old notifications that were already read.
                notif_stmt = update(Notification).where(
                    Notification.created_at < cutoff,
                    Notification.read == True
                ).values(archived=True)
                notif_result = await session.execute(notif_stmt)
                archived["notifications"] = notif_result.rowcount

                await session.commit()
                return archived
        except Exception as e:
            logger.error(f"Error archiving old data: {str(e)}")
            # BUGFIX: was `return None`, contradicting the declared
            # Dict[str, int] return type; an empty dict is still falsy
            # for callers but safe to iterate/index.
            return {}

    async def check_system_health(self) -> Dict[str, Any]:
        """Verify database connectivity and disk capacity.

        Broadcasts an admin alert when free disk space drops below 5 GB.

        Returns:
            Health report dict; on failure, ``{"status": "unhealthy", ...}``.
        """
        try:
            async with db.session() as session:
                # Check database connection by running a simple query.
                await session.execute(select(func.now()))

                # Disk stats for the root filesystem, converted to GB.
                disk = psutil.disk_usage('/')
                total_space = disk.total / (1024 * 1024 * 1024)  # GB
                free_space = disk.free / (1024 * 1024 * 1024)  # GB

                health_data = {
                    "status": "healthy",
                    "database": {
                        "connected": True
                    },
                    "disk": {
                        "total_gb": total_space,
                        "free_gb": free_space,
                        "usage_percent": disk.percent
                    },
                    "timestamp": datetime.utcnow()
                }

                # Send alert if disk space is low.
                if free_space < 5:  # Less than 5GB free
                    await create_and_broadcast_notification(
                        user_id="admin",
                        title="Low Disk Space Alert",
                        message=f"Server is running low on disk space. Only {free_space:.2f}GB remaining.",
                        notification_type="system_alert",
                        data={"free_space_gb": free_space}
                    )

                return health_data
        except Exception as e:
            logger.error(f"Health check error: {str(e)}")
            return {"status": "unhealthy", "error": str(e)}

    async def monitor_system_resources(self) -> Dict[str, Any]:
        """Sample process/system resource usage and alert on saturation.

        NOTE(review): the "connections" figure counts open files whose
        path contains 'socket' — an approximation of DB connections, not
        an exact count; confirm against the pool's own metrics if exact
        numbers are needed.

        Returns:
            Resource snapshot dict, or ``{"error": ...}`` on failure.
        """
        try:
            async with db.session() as session:
                # Get current active connections (using psutil for process stats).
                process = psutil.Process()
                open_files = process.open_files()
                connections = len([f for f in open_files if 'socket' in str(f.path)])

                resources = {
                    "database": {
                        "connections": connections,
                    },
                    "system": {
                        "cpu_percent": psutil.cpu_percent(),
                        "memory_percent": psutil.virtual_memory().percent
                    },
                    "timestamp": datetime.utcnow()
                }

                # Alert when the connection count nears the configured cap.
                if connections > settings.MAX_DB_CONNECTIONS * 0.9:
                    await create_and_broadcast_notification(
                        user_id="admin",
                        title="High Database Connections",
                        message=f"Database has {connections} active connections",
                        notification_type="system_alert",
                        data={"connections": connections}
                    )

                return resources
        except Exception as e:
            logger.error(f"Resource monitoring error: {str(e)}")
            return {"error": str(e)}

    async def perform_database_maintenance(self) -> Dict[str, Any]:
        """Run ANALYZE on the major tables and purge orphaned records.

        Returns:
            ``{"status": "success"}`` on success, ``{"error": ...}`` on failure.
        """
        try:
            async with db.session() as session:
                # Run ANALYZE on major tables. BUGFIX: SQLAlchemy 1.4+
                # requires textual SQL to be wrapped in text(); a bare
                # f-string raises ArgumentError. Table names come from
                # our own ORM models, not user input.
                for table in [User, Order, Notification]:
                    await session.execute(text(f"ANALYZE {table.__tablename__}"))

                # Clean up orphaned records: delete notifications whose
                # user no longer exists.
                stmt = delete(Notification).where(
                    ~Notification.user_id.in_(
                        select(User.id)
                    )
                )
                await session.execute(stmt)

                await session.commit()
                return {"status": "success"}
        except Exception as e:
            logger.error(f"Database maintenance error: {str(e)}")
            return {"error": str(e)}

    async def rotate_log_files(self) -> None:
        """Rotate any log file in ``logs/`` that exceeds 10 MB.

        Oversized files are renamed with a date suffix and an empty
        replacement file is created in their place.
        """
        log_dir = "logs"
        max_log_size = 10 * 1024 * 1024  # 10MB

        try:
            for filename in os.listdir(log_dir):
                filepath = os.path.join(log_dir, filename)
                # BUGFIX: skip subdirectories/symlinks — getsize/move on
                # a directory entry would misbehave.
                if not os.path.isfile(filepath):
                    continue
                if os.path.getsize(filepath) > max_log_size:
                    # BUGFIX: archive name and log message previously used
                    # a literal placeholder string instead of the rotated
                    # file's name, so every rotation collided on one path.
                    archive_name = f"{filename}.{datetime.now().strftime('%Y%m%d')}"
                    shutil.move(filepath, os.path.join(log_dir, archive_name))

                    # Create new empty log file.
                    open(filepath, 'a').close()

                    logger.info(f"Rotated log file: {filename}")
        except Exception as e:
            logger.error(f"Log rotation error: {str(e)}")

    async def manage_storage_quotas(self) -> Dict[str, Any]:
        """Check upload directories against size limits and purge stale temp files.

        Returns:
            ``{"warnings": [...], "cleaned": n}`` where ``cleaned`` counts
            removed temp files; ``{"error": ...}`` on failure.
        """
        try:
            results: Dict[str, Any] = {
                "warnings": [],
                "cleaned": 0
            }

            # Check upload directories against the configured MB limit.
            upload_dirs = ["uploads/documents", "uploads/images"]
            for directory in upload_dirs:
                if os.path.exists(directory):
                    total_size = sum(
                        os.path.getsize(os.path.join(directory, f))
                        for f in os.listdir(directory)
                        if os.path.isfile(os.path.join(directory, f))
                    ) / (1024 * 1024)  # Convert to MB

                    if total_size > settings.MAX_UPLOAD_DIR_SIZE_MB:
                        results["warnings"].append(
                            f"Upload directory {directory} exceeds size limit"
                        )

            # Clean up temporary files older than 24 hours.
            temp_dirs = ["uploads/temp", "backups/temp"]
            for directory in temp_dirs:
                if os.path.exists(directory):
                    cutoff = datetime.now() - timedelta(days=1)
                    for filename in os.listdir(directory):
                        filepath = os.path.join(directory, filename)
                        # BUGFIX: guard with isfile — os.remove() raises
                        # on directories, aborting the whole sweep.
                        if (os.path.isfile(filepath)
                                and os.path.getctime(filepath) < cutoff.timestamp()):
                            os.remove(filepath)
                            results["cleaned"] += 1

            return results
        except Exception as e:
            logger.error(f"Storage quota management error: {str(e)}")
            return {"error": str(e)}


# Module-level singleton used by schedulers/routes.
maintenance = MaintenanceService()