Spaces:
Paused
Paused
| import os | |
| import shutil | |
| import psutil | |
| from datetime import datetime, timedelta | |
| from typing import Dict, Any, Optional | |
| from sqlalchemy import select, delete, update, func | |
| from ..db.database import db | |
| from ..utils.logger import logger | |
| from ..core.config import settings | |
| from ..services.websocket import create_and_broadcast_notification | |
| from ..db.models import User, Order, Notification, Session | |
| from ..utils.cache import cache | |
| from sqlalchemy import text | |
| import redis.asyncio as redis | |
| import asyncio | |
| from ..utils.retry import retry_with_backoff, circuit_breaker | |
class MaintenanceService:
    """System maintenance, health checks, and background-task bookkeeping.

    Bundles periodic housekeeping (session/notification cleanup, ANALYZE,
    log rotation, storage quotas), health probes (database, Redis, disk,
    CPU/memory), and tracking of registered background asyncio tasks.
    Methods catch their own exceptions and report failure through their
    return values rather than raising, so schedulers can call them blindly.
    """

    # Interval between scheduled syncs, used to project the next sync time.
    SYNC_INTERVAL = timedelta(minutes=15)

    def __init__(self):
        # ISO-8601 string of the last completed sync, or None if never synced.
        self._last_sync_time = None
        # task_id -> {"task", "name", "started_at", "last_updated"} for tasks
        # registered via register_background_task().
        self._background_tasks = {}
        # Timestamps of the most recent successful health check / maintenance.
        self.last_health_check = None
        self.last_maintenance = None

    def get_timestamp(self) -> str:
        """Return the current UTC time as an ISO-8601 string."""
        return datetime.utcnow().isoformat()

    def get_last_sync_time(self) -> str:
        """Return the last sync timestamp, or "Never" if no sync has run."""
        return self._last_sync_time or "Never"

    def get_next_sync_time(self) -> str:
        """Return the next scheduled sync time (last sync + SYNC_INTERVAL).

        Returns "Not scheduled" when no sync has ever completed.
        """
        if not self._last_sync_time:
            return "Not scheduled"
        last_sync = datetime.fromisoformat(self._last_sync_time)
        return (last_sync + self.SYNC_INTERVAL).isoformat()

    async def check_database(self) -> Dict[str, Any]:
        """Check database connectivity and report PostgreSQL activity stats.

        Returns {"status": "healthy", ...connection counts and cache hit
        ratio...} on success, or {"status": "unhealthy", "error": ...}.
        """
        try:
            async with db() as session:
                # Basic connectivity probe.
                await session.execute(select(1))
                # PostgreSQL-specific activity statistics.  NULLIF guards the
                # ratio against division by zero on a freshly started server
                # where no blocks have been read yet.
                result = await session.execute(
                    text("""
                        SELECT
                            (SELECT count(*) FROM pg_stat_activity) as active_connections,
                            (SELECT count(*) FROM pg_stat_activity WHERE state = 'active') as active_queries,
                            (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle') as idle_connections,
                            (SELECT round(sum(blks_hit)*100/NULLIF(sum(blks_hit + blks_read), 0), 2)
                             FROM pg_stat_database) as cache_hit_ratio
                    """)
                )
                stats = result.mappings().one()
                return {
                    "status": "healthy",
                    "active_connections": stats["active_connections"],
                    "active_queries": stats["active_queries"],
                    "idle_connections": stats["idle_connections"],
                    "cache_hit_ratio": stats["cache_hit_ratio"]
                }
        except Exception as e:
            logger.error(f"Database health check error: {str(e)}")
            return {
                "status": "unhealthy",
                "error": str(e)
            }

    async def check_redis(self) -> Dict[str, Any]:
        """Check Redis connectivity and report basic server metrics."""
        try:
            # PING proves the connection; INFO supplies the metrics.
            await cache.redis.ping()
            info = await cache.redis.info()
            return {
                "status": "healthy",
                "connected_clients": info["connected_clients"],
                "used_memory_human": info["used_memory_human"],
                "total_connections_received": info["total_connections_received"],
                "uptime_in_seconds": info["uptime_in_seconds"]
            }
        except Exception as e:
            logger.error(f"Redis health check error: {str(e)}")
            return {
                "status": "unhealthy",
                "error": str(e)
            }

    async def check_background_tasks(self) -> Dict[str, Any]:
        """Summarize registered tasks as active / completed / failed.

        Overall status is "warning" when any task failed, else "healthy".
        """
        active_tasks = []
        completed_tasks = []
        failed_tasks = []
        for task_id, task_info in self._background_tasks.items():
            status = {
                "id": task_id,
                "name": task_info["name"],
                "started_at": task_info["started_at"].isoformat(),
                "last_updated": task_info["last_updated"].isoformat()
            }
            task = task_info["task"]
            if not task.done():
                active_tasks.append(status)
            elif task.cancelled():
                # BUGFIX: calling .exception() on a cancelled task raises
                # CancelledError; classify cancellations as failures instead.
                failed_tasks.append({**status, "error": "cancelled"})
            elif task.exception():
                failed_tasks.append({**status, "error": str(task.exception())})
            else:
                completed_tasks.append(status)
        return {
            "status": "healthy" if not failed_tasks else "warning",
            "active_tasks": active_tasks,
            "completed_tasks": completed_tasks,
            "failed_tasks": failed_tasks
        }

    async def register_background_task(
        self,
        task: asyncio.Task,
        name: str
    ) -> str:
        """Register *task* for monitoring and return its generated id."""
        task_id = f"{name}_{datetime.utcnow().timestamp()}"
        self._background_tasks[task_id] = {
            "task": task,
            "name": name,
            "started_at": datetime.utcnow(),
            "last_updated": datetime.utcnow()
        }
        return task_id

    async def update_task_status(self, task_id: str):
        """Touch last_updated for a registered task (no-op if unknown id)."""
        if task_id in self._background_tasks:
            self._background_tasks[task_id]["last_updated"] = datetime.utcnow()

    async def cleanup_completed_tasks(self):
        """Drop finished tasks whose last update is older than 24 hours."""
        current_time = datetime.utcnow()
        stale = [
            task_id
            for task_id, task_info in self._background_tasks.items()
            if task_info["task"].done()
            and current_time - task_info["last_updated"] > timedelta(hours=24)
        ]
        for task_id in stale:
            del self._background_tasks[task_id]

    async def perform_maintenance(self) -> Dict[str, Any]:
        """Run routine maintenance: task cleanup, cache expiry, DB cleanup.

        Returns a status dict with a timestamp; never raises.
        """
        # BUGFIX: the original did `async with self.circuit_breaker:` but
        # that attribute is never assigned anywhere, so every call raised
        # AttributeError.  Use the module-level circuit_breaker imported from
        # utils.retry.  NOTE(review): assumes that object is an async context
        # manager — confirm against utils/retry.py.
        async with circuit_breaker:
            try:
                # Prune stale background-task records.
                await self.cleanup_completed_tasks()
                # Purge expired cache entries.
                await cache.cleanup_expired()
                # Database housekeeping.
                async with db() as session:
                    # Sessions idle for more than a week.
                    await session.execute(
                        text("DELETE FROM sessions WHERE last_activity < NOW() - INTERVAL '7 days'")
                    )
                    # Read notifications older than 30 days.
                    await session.execute(
                        text("""
                            DELETE FROM notifications
                            WHERE created_at < NOW() - INTERVAL '30 days'
                            AND read = true
                        """)
                    )
                    # Refresh planner statistics after the deletions.
                    await session.execute(text("ANALYZE"))
                    await session.commit()
                # Record the successful run (field was previously never set).
                self.last_maintenance = datetime.utcnow()
                return {
                    "status": "success",
                    "message": "Maintenance completed successfully",
                    "timestamp": self.get_timestamp()
                }
            except Exception as e:
                logger.error(f"Maintenance error: {str(e)}")
                return {
                    "status": "error",
                    "message": f"Maintenance failed: {str(e)}",
                    "timestamp": self.get_timestamp()
                }

    async def cleanup_expired_sessions(self) -> int:
        """Delete sessions inactive for over 7 days; return rows removed.

        Returns 0 when the cleanup fails.
        """
        try:
            cutoff = datetime.utcnow() - timedelta(days=7)
            # NOTE(review): this method uses db.session() while
            # check_database()/perform_maintenance() use db() directly —
            # confirm which is the canonical session factory.
            async with db.session() as session:
                stmt = delete(Session).where(Session.last_activity < cutoff)
                result = await session.execute(stmt)
                await session.commit()
                return result.rowcount
        except Exception as e:
            logger.error(f"Error cleaning up sessions: {str(e)}")
            return 0

    async def archive_old_data(self) -> Optional[Dict[str, int]]:
        """Flag year-old finished orders and read notifications as archived.

        Returns per-table archived row counts, or None on failure (annotation
        widened to Optional: the original claimed Dict[str, int] but returned
        None from its error path).
        """
        try:
            cutoff = datetime.utcnow() - timedelta(days=365)
            archived = {}
            async with db.session() as session:
                # Completed/cancelled orders older than one year.
                order_stmt = update(Order).where(
                    Order.created_at < cutoff,
                    Order.status.in_(["completed", "cancelled"])
                ).values(archived=True)
                order_result = await session.execute(order_stmt)
                archived["orders"] = order_result.rowcount
                # Read notifications older than one year.
                notif_stmt = update(Notification).where(
                    Notification.created_at < cutoff,
                    Notification.read == True  # noqa: E712 — SQLAlchemy expression
                ).values(archived=True)
                notif_result = await session.execute(notif_stmt)
                archived["notifications"] = notif_result.rowcount
                await session.commit()
            return archived
        except Exception as e:
            logger.error(f"Error archiving old data: {str(e)}")
            return None

    async def check_system_health(self) -> Dict[str, Any]:
        """Probe DB connectivity and disk usage; alert admin when disk is low."""
        try:
            async with db.session() as session:
                # Connectivity probe via a trivial query.
                await session.execute(select(func.now()))
            gib = 1024 * 1024 * 1024
            disk = psutil.disk_usage('/')
            free_space = disk.free / gib  # GB
            health_data = {
                "status": "healthy",
                "database": {
                    "connected": True
                },
                "disk": {
                    "total_gb": disk.total / gib,
                    "free_gb": free_space,
                    "usage_percent": disk.percent
                },
                "timestamp": datetime.utcnow()
            }
            # Record the successful run (field was previously never set).
            self.last_health_check = health_data["timestamp"]
            if free_space < 5:  # less than 5 GB free
                await create_and_broadcast_notification(
                    user_id="admin",
                    title="Low Disk Space Alert",
                    message=f"Server is running low on disk space. Only {free_space:.2f}GB remaining.",
                    notification_type="system_alert",
                    data={"free_space_gb": free_space}
                )
            return health_data
        except Exception as e:
            logger.error(f"Health check error: {str(e)}")
            return {"status": "unhealthy", "error": str(e)}

    async def monitor_system_resources(self) -> Dict[str, Any]:
        """Snapshot process/system resource usage; alert on connection pressure."""
        try:
            # BUGFIX: the original opened a db.session() here and never used
            # it; the pointless acquisition has been dropped.
            # Approximate the DB connection count from this process's open
            # socket-like file handles.  NOTE(review): psutil's
            # Process.connections() counts sockets directly — confirm before
            # switching, since the alert threshold is tuned to this number.
            process = psutil.Process()
            open_files = process.open_files()
            connections = len([f for f in open_files if 'socket' in str(f.path)])
            resources = {
                "database": {
                    "connections": connections,
                },
                "system": {
                    "cpu_percent": psutil.cpu_percent(),
                    "memory_percent": psutil.virtual_memory().percent
                },
                "timestamp": datetime.utcnow()
            }
            # Warn when within 90% of the configured connection ceiling.
            if connections > settings.MAX_DB_CONNECTIONS * 0.9:
                await create_and_broadcast_notification(
                    user_id="admin",
                    title="High Database Connections",
                    message=f"Database has {connections} active connections",
                    notification_type="system_alert",
                    data={"connections": connections}
                )
            return resources
        except Exception as e:
            logger.error(f"Resource monitoring error: {str(e)}")
            return {"error": str(e)}

    async def perform_database_maintenance(self) -> Dict[str, Any]:
        """ANALYZE the main tables and delete orphaned notifications."""
        try:
            async with db.session() as session:
                # BUGFIX: SQLAlchemy 2.x rejects raw strings passed to
                # execute(); wrap in text().  The interpolated names come
                # from model metadata, not user input, so this is not an
                # injection surface.
                for table in [User, Order, Notification]:
                    await session.execute(text(f"ANALYZE {table.__tablename__}"))
                # Remove notifications whose user no longer exists.
                stmt = delete(Notification).where(
                    ~Notification.user_id.in_(
                        select(User.id)
                    )
                )
                await session.execute(stmt)
                await session.commit()
                return {"status": "success"}
        except Exception as e:
            logger.error(f"Database maintenance error: {str(e)}")
            return {"error": str(e)}

    async def rotate_log_files(self) -> None:
        """Rotate any file in logs/ larger than 10 MB.

        Oversized files are renamed with a YYYYMMDD suffix and replaced by an
        empty file of the same name.
        """
        log_dir = "logs"
        max_log_size = 10 * 1024 * 1024  # 10MB
        try:
            for filename in os.listdir(log_dir):
                filepath = os.path.join(log_dir, filename)
                # Skip subdirectories — getsize on a directory is meaningless.
                if not os.path.isfile(filepath):
                    continue
                if os.path.getsize(filepath) > max_log_size:
                    # BUGFIX: the original interpolated a literal "(unknown)"
                    # placeholder instead of the filename, so every rotation
                    # collided on the same archive name.
                    archive_name = f"{filename}.{datetime.now().strftime('%Y%m%d')}"
                    shutil.move(filepath, os.path.join(log_dir, archive_name))
                    # Recreate an empty file for the logger to append to.
                    open(filepath, 'a').close()
                    logger.info(f"Rotated log file: {filename}")
        except Exception as e:
            logger.error(f"Log rotation error: {str(e)}")

    async def manage_storage_quotas(self) -> Dict[str, Any]:
        """Warn on oversized upload dirs and purge temp files older than 24h.

        Returns {"warnings": [...], "cleaned": <files removed>}, or
        {"error": ...} on failure.
        """
        try:
            results = {
                "warnings": [],
                "cleaned": 0
            }
            # Flag upload directories that exceed the configured size limit.
            upload_dirs = ["uploads/documents", "uploads/images"]
            for directory in upload_dirs:
                if os.path.exists(directory):
                    total_size = sum(
                        os.path.getsize(os.path.join(directory, f))
                        for f in os.listdir(directory)
                        if os.path.isfile(os.path.join(directory, f))
                    ) / (1024 * 1024)  # MB
                    if total_size > settings.MAX_UPLOAD_DIR_SIZE_MB:
                        results["warnings"].append(
                            f"Upload directory {directory} exceeds size limit"
                        )
            # Remove temp files older than 24 hours.
            temp_dirs = ["uploads/temp", "backups/temp"]
            cutoff = (datetime.now() - timedelta(days=1)).timestamp()
            for directory in temp_dirs:
                if os.path.exists(directory):
                    for filename in os.listdir(directory):
                        filepath = os.path.join(directory, filename)
                        # NOTE(review): getctime is metadata-change time on
                        # Unix, creation time on Windows — confirm mtime
                        # isn't the intended signal.
                        if os.path.getctime(filepath) < cutoff:
                            os.remove(filepath)
                            results["cleaned"] += 1
            return results
        except Exception as e:
            logger.error(f"Storage quota management error: {str(e)}")
            return {"error": str(e)}

    def _calculate_storage_usage(self, storage_type: str) -> Dict[str, Any]:
        """Report current size, quota, and usage percent for one storage type.

        Assumes settings.UPLOAD_DIR is a pathlib.Path and that a
        {STORAGE_TYPE}_QUOTA_BYTES setting exists — raises AttributeError if
        the quota setting is missing.
        """
        base_path = settings.UPLOAD_DIR / storage_type
        total_size = sum(f.stat().st_size for f in base_path.glob('**/*') if f.is_file())
        quota = getattr(settings, f"{storage_type.upper()}_QUOTA_BYTES")
        usage_percent = (total_size / quota) * 100 if quota > 0 else 0
        return {
            "current_size": total_size,
            "quota": quota,
            "usage_percent": round(usage_percent, 2)
        }
| maintenance_service = MaintenanceService() |