Spaces:
Paused
Paused
| import os | |
| import shutil | |
| import psutil | |
| from datetime import datetime, timedelta | |
| from typing import Dict, Any, Optional | |
| from sqlalchemy import select, delete, update, func | |
| from ..db.database import db | |
| from ..utils.logger import logger | |
| from ..core.config import settings | |
| from ..services.websocket import create_and_broadcast_notification | |
| from ..db.models import User, Order, Notification, Session | |
| class MaintenanceService: | |
| async def cleanup_expired_sessions(self) -> int: | |
| """Clean up expired sessions""" | |
| try: | |
| cutoff = datetime.utcnow() - timedelta(days=7) | |
| async with db.session() as session: | |
| stmt = delete(Session).where(Session.last_activity < cutoff) | |
| result = await session.execute(stmt) | |
| await session.commit() | |
| return result.rowcount | |
| except Exception as e: | |
| logger.error(f"Error cleaning up sessions: {str(e)}") | |
| return 0 | |
| async def archive_old_data(self) -> Dict[str, int]: | |
| """Archive old data""" | |
| try: | |
| cutoff = datetime.utcnow() - timedelta(days=365) | |
| archived = {} | |
| async with db.session() as session: | |
| # Archive old orders | |
| order_stmt = update(Order).where( | |
| Order.created_at < cutoff, | |
| Order.status.in_(["completed", "cancelled"]) | |
| ).values(archived=True) | |
| order_result = await session.execute(order_stmt) | |
| archived["orders"] = order_result.rowcount | |
| # Archive old notifications | |
| notif_stmt = update(Notification).where( | |
| Notification.created_at < cutoff, | |
| Notification.read == True | |
| ).values(archived=True) | |
| notif_result = await session.execute(notif_stmt) | |
| archived["notifications"] = notif_result.rowcount | |
| await session.commit() | |
| return archived | |
| except Exception as e: | |
| logger.error(f"Error archiving old data: {str(e)}") | |
| return None | |
| async def check_system_health(self) -> Dict[str, Any]: | |
| """Check system health""" | |
| try: | |
| async with db.session() as session: | |
| # Check database connection by running a simple query | |
| await session.execute(select(func.now())) | |
| # Get database size (using psutil for disk stats) | |
| disk = psutil.disk_usage('/') | |
| total_space = disk.total / (1024 * 1024 * 1024) # GB | |
| free_space = disk.free / (1024 * 1024 * 1024) # GB | |
| health_data = { | |
| "status": "healthy", | |
| "database": { | |
| "connected": True | |
| }, | |
| "disk": { | |
| "total_gb": total_space, | |
| "free_gb": free_space, | |
| "usage_percent": disk.percent | |
| }, | |
| "timestamp": datetime.utcnow() | |
| } | |
| # Send alert if disk space is low | |
| if free_space < 5: # Less than 5GB free | |
| await create_and_broadcast_notification( | |
| user_id="admin", | |
| title="Low Disk Space Alert", | |
| message=f"Server is running low on disk space. Only {free_space:.2f}GB remaining.", | |
| notification_type="system_alert", | |
| data={"free_space_gb": free_space} | |
| ) | |
| return health_data | |
| except Exception as e: | |
| logger.error(f"Health check error: {str(e)}") | |
| return {"status": "unhealthy", "error": str(e)} | |
| async def monitor_system_resources(self) -> Dict[str, Any]: | |
| """Monitor system resources""" | |
| try: | |
| async with db.session() as session: | |
| # Get current active connections (using psutil for process stats) | |
| process = psutil.Process() | |
| open_files = process.open_files() | |
| connections = len([f for f in open_files if 'socket' in str(f.path)]) | |
| resources = { | |
| "database": { | |
| "connections": connections, | |
| }, | |
| "system": { | |
| "cpu_percent": psutil.cpu_percent(), | |
| "memory_percent": psutil.virtual_memory().percent | |
| }, | |
| "timestamp": datetime.utcnow() | |
| } | |
| # Alert if too many connections or high resource usage | |
| if connections > settings.MAX_DB_CONNECTIONS * 0.9: | |
| await create_and_broadcast_notification( | |
| user_id="admin", | |
| title="High Database Connections", | |
| message=f"Database has {connections} active connections", | |
| notification_type="system_alert", | |
| data={"connections": connections} | |
| ) | |
| return resources | |
| except Exception as e: | |
| logger.error(f"Resource monitoring error: {str(e)}") | |
| return {"error": str(e)} | |
| async def perform_database_maintenance(self) -> Dict[str, Any]: | |
| """Perform database maintenance tasks""" | |
| try: | |
| async with db.session() as session: | |
| # Run ANALYZE on major tables | |
| for table in [User, Order, Notification]: | |
| await session.execute(f"ANALYZE {table.__tablename__}") | |
| # Clean up any orphaned records | |
| # For example, delete notifications for non-existent users | |
| stmt = delete(Notification).where( | |
| ~Notification.user_id.in_( | |
| select(User.id) | |
| ) | |
| ) | |
| await session.execute(stmt) | |
| await session.commit() | |
| return {"status": "success"} | |
| except Exception as e: | |
| logger.error(f"Database maintenance error: {str(e)}") | |
| return {"error": str(e)} | |
| async def rotate_log_files(self) -> None: | |
| """Rotate log files""" | |
| log_dir = "logs" | |
| max_log_size = 10 * 1024 * 1024 # 10MB | |
| try: | |
| for filename in os.listdir(log_dir): | |
| filepath = os.path.join(log_dir, filename) | |
| if os.path.getsize(filepath) > max_log_size: | |
| # Archive old log | |
| archive_name = f"{filename}.{datetime.now().strftime('%Y%m%d')}" | |
| shutil.move(filepath, os.path.join(log_dir, archive_name)) | |
| # Create new log file | |
| open(filepath, 'a').close() | |
| logger.info(f"Rotated log file: {filename}") | |
| except Exception as e: | |
| logger.error(f"Log rotation error: {str(e)}") | |
| async def manage_storage_quotas(self) -> Dict[str, Any]: | |
| """Manage storage quotas and cleanup""" | |
| try: | |
| results = { | |
| "warnings": [], | |
| "cleaned": 0 | |
| } | |
| # Check and clean upload directories | |
| upload_dirs = ["uploads/documents", "uploads/images"] | |
| for directory in upload_dirs: | |
| if os.path.exists(directory): | |
| total_size = sum( | |
| os.path.getsize(os.path.join(directory, f)) | |
| for f in os.listdir(directory) | |
| if os.path.isfile(os.path.join(directory, f)) | |
| ) / (1024 * 1024) # Convert to MB | |
| if total_size > settings.MAX_UPLOAD_DIR_SIZE_MB: | |
| results["warnings"].append( | |
| f"Upload directory {directory} exceeds size limit" | |
| ) | |
| # Clean up temporary files | |
| temp_dirs = ["uploads/temp", "backups/temp"] | |
| for directory in temp_dirs: | |
| if os.path.exists(directory): | |
| # Remove files older than 24 hours | |
| cutoff = datetime.now() - timedelta(days=1) | |
| for filename in os.listdir(directory): | |
| filepath = os.path.join(directory, filename) | |
| if os.path.getctime(filepath) < cutoff.timestamp(): | |
| os.remove(filepath) | |
| results["cleaned"] += 1 | |
| return results | |
| except Exception as e: | |
| logger.error(f"Storage quota management error: {str(e)}") | |
| return {"error": str(e)} | |
| maintenance = MaintenanceService() |