import os import shutil import psutil from datetime import datetime, timedelta from typing import Dict, Any, Optional from sqlalchemy import select, delete, update, func from ..db.database import db from ..utils.logger import logger from ..core.config import settings from ..services.websocket import create_and_broadcast_notification from ..db.models import User, Order, Notification, Session from ..utils.cache import cache from sqlalchemy import text import redis.asyncio as redis import asyncio from ..utils.retry import retry_with_backoff, circuit_breaker class MaintenanceService: def __init__(self): self._last_sync_time = None self._background_tasks = {} self.last_health_check = None self.last_maintenance = None def get_timestamp(self) -> str: """Get current timestamp in ISO format""" return datetime.utcnow().isoformat() def get_last_sync_time(self) -> str: """Get last sync timestamp""" return self._last_sync_time or "Never" def get_next_sync_time(self) -> str: """Calculate next scheduled sync time""" if not self._last_sync_time: return "Not scheduled" last_sync = datetime.fromisoformat(self._last_sync_time) next_sync = last_sync + timedelta(minutes=15) # 15-minute sync interval return next_sync.isoformat() @retry_with_backoff(max_retries=3, initial_delay=1.0) async def check_database(self) -> Dict[str, Any]: """Check database connectivity and performance metrics""" try: async with db() as session: # Check basic connectivity await session.execute(select(1)) # Check database stats result = await session.execute( text(""" SELECT (SELECT count(*) FROM pg_stat_activity) as active_connections, (SELECT count(*) FROM pg_stat_activity WHERE state = 'active') as active_queries, (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle') as idle_connections, (SELECT round(sum(blks_hit)*100/sum(blks_hit + blks_read), 2) FROM pg_stat_database) as cache_hit_ratio """) ) stats = result.mappings().one() return { "status": "healthy", "active_connections": stats["active_connections"], "active_queries": stats["active_queries"], "idle_connections": stats["idle_connections"], "cache_hit_ratio": stats["cache_hit_ratio"] } except Exception as e: logger.error(f"Database health check error: {str(e)}") return { "status": "unhealthy", "error": str(e) } @retry_with_backoff(max_retries=2, initial_delay=0.5) async def check_redis(self) -> Dict[str, Any]: """Check Redis connectivity and performance""" try: # Test Redis connection await cache.redis.ping() # Get Redis info info = await cache.redis.info() return { "status": "healthy", "connected_clients": info["connected_clients"], "used_memory_human": info["used_memory_human"], "total_connections_received": info["total_connections_received"], "uptime_in_seconds": info["uptime_in_seconds"] } except Exception as e: logger.error(f"Redis health check error: {str(e)}") return { "status": "unhealthy", "error": str(e) } @circuit_breaker(failure_threshold=5, reset_timeout=60) async def check_background_tasks(self) -> Dict[str, Any]: """Check status of background tasks""" active_tasks = [] completed_tasks = [] failed_tasks = [] for task_id, task_info in self._background_tasks.items(): status = { "id": task_id, "name": task_info["name"], "started_at": task_info["started_at"].isoformat(), "last_updated": task_info["last_updated"].isoformat() } if task_info["task"].done(): if task_info["task"].exception(): failed_tasks.append({ **status, "error": str(task_info["task"].exception()) }) else: completed_tasks.append(status) else: active_tasks.append(status) return { "status": "healthy" if not failed_tasks else "warning", "active_tasks": active_tasks, "completed_tasks": completed_tasks, "failed_tasks": failed_tasks } async def register_background_task( self, task: asyncio.Task, name: str ) -> str: """Register a new background task for monitoring""" task_id = f"{name}_{datetime.utcnow().timestamp()}" self._background_tasks[task_id] = { "task": task, "name": name, "started_at": datetime.utcnow(), "last_updated": datetime.utcnow() } return task_id async def update_task_status(self, task_id: str): """Update last_updated timestamp for a task""" if task_id in self._background_tasks: self._background_tasks[task_id]["last_updated"] = datetime.utcnow() async def cleanup_completed_tasks(self): """Remove completed tasks older than 24 hours""" current_time = datetime.utcnow() to_remove = [] for task_id, task_info in self._background_tasks.items(): if task_info["task"].done(): age = current_time - task_info["last_updated"] if age > timedelta(hours=24): to_remove.append(task_id) for task_id in to_remove: del self._background_tasks[task_id] async def perform_maintenance(self) -> Dict[str, Any]: """Perform system maintenance tasks""" async with self.circuit_breaker: try: # Clean up old background tasks await self.cleanup_completed_tasks() # Clean up expired cache entries await cache.cleanup_expired() # Run database maintenance async with db() as session: # Clean up old sessions await session.execute( text("DELETE FROM sessions WHERE last_activity < NOW() - INTERVAL '7 days'") ) # Clean up old notifications await session.execute( text(""" DELETE FROM notifications WHERE created_at < NOW() - INTERVAL '30 days' AND read = true """) ) # Analyze tables for query optimization await session.execute(text("ANALYZE")) await session.commit() return { "status": "success", "message": "Maintenance completed successfully", "timestamp": self.get_timestamp() } except Exception as e: logger.error(f"Maintenance error: {str(e)}") return { "status": "error", "message": f"Maintenance failed: {str(e)}", "timestamp": self.get_timestamp() } @retry_with_backoff(max_retries=2) async def cleanup_expired_sessions(self) -> int: """Clean up expired sessions""" try: cutoff = datetime.utcnow() - timedelta(days=7) async with db.session() as session: stmt = delete(Session).where(Session.last_activity < cutoff) result = await session.execute(stmt) await session.commit() return result.rowcount except Exception as e: logger.error(f"Error cleaning up sessions: {str(e)}") return 0 async def archive_old_data(self) -> Dict[str, int]: """Archive old data""" try: cutoff = datetime.utcnow() - timedelta(days=365) archived = {} async with db.session() as session: # Archive old orders order_stmt = update(Order).where( Order.created_at < cutoff, Order.status.in_(["completed", "cancelled"]) ).values(archived=True) order_result = await session.execute(order_stmt) archived["orders"] = order_result.rowcount # Archive old notifications notif_stmt = update(Notification).where( Notification.created_at < cutoff, Notification.read == True ).values(archived=True) notif_result = await session.execute(notif_stmt) archived["notifications"] = notif_result.rowcount await session.commit() return archived except Exception as e: logger.error(f"Error archiving old data: {str(e)}") return None async def check_system_health(self) -> Dict[str, Any]: """Check system health""" try: async with db.session() as session: # Check database connection by running a simple query await session.execute(select(func.now())) # Get database size (using psutil for disk stats) disk = psutil.disk_usage('/') total_space = disk.total / (1024 * 1024 * 1024) # GB free_space = disk.free / (1024 * 1024 * 1024) # GB health_data = { "status": "healthy", "database": { "connected": True }, "disk": { "total_gb": total_space, "free_gb": free_space, "usage_percent": disk.percent }, "timestamp": datetime.utcnow() } # Send alert if disk space is low if free_space < 5: # Less than 5GB free await create_and_broadcast_notification( user_id="admin", title="Low Disk Space Alert", message=f"Server is running low on disk space. Only {free_space:.2f}GB remaining.", notification_type="system_alert", data={"free_space_gb": free_space} ) return health_data except Exception as e: logger.error(f"Health check error: {str(e)}") return {"status": "unhealthy", "error": str(e)} @retry_with_backoff(max_retries=2, initial_delay=1.0) async def monitor_system_resources(self) -> Dict[str, Any]: """Monitor system resources""" try: async with db.session() as session: # Get current active connections (using psutil for process stats) process = psutil.Process() open_files = process.open_files() connections = len([f for f in open_files if 'socket' in str(f.path)]) resources = { "database": { "connections": connections, }, "system": { "cpu_percent": psutil.cpu_percent(), "memory_percent": psutil.virtual_memory().percent }, "timestamp": datetime.utcnow() } # Alert if too many connections or high resource usage if connections > settings.MAX_DB_CONNECTIONS * 0.9: await create_and_broadcast_notification( user_id="admin", title="High Database Connections", message=f"Database has {connections} active connections", notification_type="system_alert", data={"connections": connections} ) return resources except Exception as e: logger.error(f"Resource monitoring error: {str(e)}") return {"error": str(e)} async def perform_database_maintenance(self) -> Dict[str, Any]: """Perform database maintenance tasks""" try: async with db.session() as session: # Run ANALYZE on major tables for table in [User, Order, Notification]: await session.execute(f"ANALYZE {table.__tablename__}") # Clean up any orphaned records # For example, delete notifications for non-existent users stmt = delete(Notification).where( ~Notification.user_id.in_( select(User.id) ) ) await session.execute(stmt) await session.commit() return {"status": "success"} except Exception as e: logger.error(f"Database maintenance error: {str(e)}") return {"error": str(e)} async def rotate_log_files(self) -> None: """Rotate log files""" log_dir = "logs" max_log_size = 10 * 1024 * 1024 # 10MB try: for filename in os.listdir(log_dir): filepath = os.path.join(log_dir, filename) if os.path.getsize(filepath) > max_log_size: # Archive old log archive_name = f"{filename}.{datetime.now().strftime('%Y%m%d')}" shutil.move(filepath, os.path.join(log_dir, archive_name)) # Create new log file open(filepath, 'a').close() logger.info(f"Rotated log file: {filename}") except Exception as e: logger.error(f"Log rotation error: {str(e)}") @retry_with_backoff(max_retries=3) async def manage_storage_quotas(self) -> Dict[str, Any]: """Manage storage quotas and cleanup""" try: results = { "warnings": [], "cleaned": 0 } # Check and clean upload directories upload_dirs = ["uploads/documents", "uploads/images"] for directory in upload_dirs: if os.path.exists(directory): total_size = sum( os.path.getsize(os.path.join(directory, f)) for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f)) ) / (1024 * 1024) # Convert to MB if total_size > settings.MAX_UPLOAD_DIR_SIZE_MB: results["warnings"].append( f"Upload directory {directory} exceeds size limit" ) # Clean up temporary files temp_dirs = ["uploads/temp", "backups/temp"] for directory in temp_dirs: if os.path.exists(directory): # Remove files older than 24 hours cutoff = datetime.now() - timedelta(days=1) for filename in os.listdir(directory): filepath = os.path.join(directory, filename) if os.path.getctime(filepath) < cutoff.timestamp(): os.remove(filepath) results["cleaned"] += 1 return results except Exception as e: logger.error(f"Storage quota management error: {str(e)}") return {"error": str(e)} def _calculate_storage_usage(self, storage_type: str) -> Dict[str, Any]: """Helper method to calculate storage usage""" base_path = settings.UPLOAD_DIR / storage_type total_size = sum(f.stat().st_size for f in base_path.glob('**/*') if f.is_file()) quota = getattr(settings, f"{storage_type.upper()}_QUOTA_BYTES") usage_percent = (total_size / quota) * 100 if quota > 0 else 0 return { "current_size": total_size, "quota": quota, "usage_percent": round(usage_percent, 2) } maintenance_service = MaintenanceService()