# Viewer metadata (not part of the module): Spaces: Paused; File size: 8,915 Bytes; 73e4339 a3b84cc
import os
import shutil
import psutil
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
from sqlalchemy import select, delete, update, func
from ..db.database import db
from ..utils.logger import logger
from ..core.config import settings
from ..services.websocket import create_and_broadcast_notification
from ..db.models import User, Order, Notification, Session
class MaintenanceService:
    """Housekeeping tasks for the application.

    Covers session cleanup, data archiving, health checks, resource
    monitoring, database maintenance, log rotation and storage cleanup.

    Every public coroutine wraps its work in ``try/except Exception``, logs
    the failure through the module ``logger`` and returns a fallback value
    instead of raising.
    """

    # Sessions idle for longer than this are deleted.
    SESSION_MAX_AGE = timedelta(days=7)
    # Orders / notifications older than this are flagged as archived.
    ARCHIVE_AFTER = timedelta(days=365)
    # A low-disk alert is sent when free space drops below this many GB.
    LOW_DISK_THRESHOLD_GB = 5
    # Log rotation settings.
    LOG_DIR = "logs"
    MAX_LOG_SIZE_BYTES = 10 * 1024 * 1024  # 10MB
    # Directories checked against settings.MAX_UPLOAD_DIR_SIZE_MB.
    UPLOAD_DIRS = ("uploads/documents", "uploads/images")
    # Directories whose old files are deleted.
    TEMP_DIRS = ("uploads/temp", "backups/temp")
    TEMP_FILE_MAX_AGE = timedelta(days=1)

    async def cleanup_expired_sessions(self) -> int:
        """Delete sessions whose ``last_activity`` is older than 7 days.

        Returns:
            The row count reported by the DELETE statement, or 0 if the
            cleanup failed (the error is logged).
        """
        try:
            cutoff = datetime.utcnow() - self.SESSION_MAX_AGE
            async with db.session() as session:
                stmt = delete(Session).where(Session.last_activity < cutoff)
                result = await session.execute(stmt)
                await session.commit()
                return result.rowcount
        except Exception as e:
            logger.error(f"Error cleaning up sessions: {str(e)}")
            return 0

    async def archive_old_data(self) -> Optional[Dict[str, int]]:
        """Flag orders and notifications older than 365 days as archived.

        Only orders with status "completed" or "cancelled" and notifications
        that have been read are touched. Both updates are committed together.

        Returns:
            ``{"orders": <rowcount>, "notifications": <rowcount>}`` on
            success, or ``None`` if archiving failed (the error is logged).
        """
        try:
            cutoff = datetime.utcnow() - self.ARCHIVE_AFTER
            archived: Dict[str, int] = {}
            async with db.session() as session:
                # Archive old orders
                order_stmt = update(Order).where(
                    Order.created_at < cutoff,
                    Order.status.in_(["completed", "cancelled"]),
                ).values(archived=True)
                order_result = await session.execute(order_stmt)
                archived["orders"] = order_result.rowcount

                # Archive old notifications
                notif_stmt = update(Notification).where(
                    Notification.created_at < cutoff,
                    # SQL expression, not a Python comparison.
                    Notification.read == True,  # noqa: E712
                ).values(archived=True)
                notif_result = await session.execute(notif_stmt)
                archived["notifications"] = notif_result.rowcount

                await session.commit()
            return archived
        except Exception as e:
            logger.error(f"Error archiving old data: {str(e)}")
            return None

    async def check_system_health(self) -> Dict[str, Any]:
        """Ping the database and report disk usage of ``/``.

        Sends a "Low Disk Space Alert" notification to ``user_id="admin"``
        when less than 5 GB is free.

        Returns:
            On success a dict with ``status`` ("healthy"), ``database``,
            ``disk`` (``total_gb``, ``free_gb``, ``usage_percent``) and
            ``timestamp`` (naive ``datetime.utcnow()``). On any failure,
            ``{"status": "unhealthy", "error": <message>}``.
        """
        try:
            # Check database connection by running a simple query. The session
            # is only needed for the ping, so it is released before the disk
            # probe and the (possibly slow) notification broadcast.
            async with db.session() as session:
                await session.execute(select(func.now()))

            bytes_per_gb = 1024 * 1024 * 1024
            disk = psutil.disk_usage('/')
            total_space = disk.total / bytes_per_gb
            free_space = disk.free / bytes_per_gb

            health_data = {
                "status": "healthy",
                "database": {
                    "connected": True
                },
                "disk": {
                    "total_gb": total_space,
                    "free_gb": free_space,
                    "usage_percent": disk.percent
                },
                "timestamp": datetime.utcnow()
            }

            # Send alert if disk space is low
            if free_space < self.LOW_DISK_THRESHOLD_GB:
                await create_and_broadcast_notification(
                    user_id="admin",
                    title="Low Disk Space Alert",
                    message=f"Server is running low on disk space. Only {free_space:.2f}GB remaining.",
                    notification_type="system_alert",
                    data={"free_space_gb": free_space}
                )
            return health_data
        except Exception as e:
            logger.error(f"Health check error: {str(e)}")
            return {"status": "unhealthy", "error": str(e)}

    async def monitor_system_resources(self) -> Dict[str, Any]:
        """Collect connection, CPU and memory figures for this process/host.

        Sends a "High Database Connections" notification to
        ``user_id="admin"`` when the connection figure exceeds 90% of
        ``settings.MAX_DB_CONNECTIONS``.

        Returns:
            A dict with ``database.connections``, ``system.cpu_percent``,
            ``system.memory_percent`` and ``timestamp``; or
            ``{"error": <message>}`` on failure.
        """
        try:
            # NOTE(review): the session is never used below; it is kept only to
            # preserve existing behaviour - confirm whether it can be dropped.
            async with db.session():
                # NOTE(review): psutil documents Process.open_files() as
                # returning *regular files* only, so this counts open files
                # whose path contains "socket", which is very unlikely to be
                # the number of database connections. The proper source is
                # probably the engine's connection pool - confirm and replace.
                process = psutil.Process()
                open_files = process.open_files()
                connections = len([f for f in open_files if 'socket' in str(f.path)])

                resources = {
                    "database": {
                        "connections": connections,
                    },
                    "system": {
                        "cpu_percent": psutil.cpu_percent(),
                        "memory_percent": psutil.virtual_memory().percent
                    },
                    "timestamp": datetime.utcnow()
                }

                # Alert if too many connections or high resource usage
                if connections > settings.MAX_DB_CONNECTIONS * 0.9:
                    await create_and_broadcast_notification(
                        user_id="admin",
                        title="High Database Connections",
                        message=f"Database has {connections} active connections",
                        notification_type="system_alert",
                        data={"connections": connections}
                    )
                return resources
        except Exception as e:
            logger.error(f"Resource monitoring error: {str(e)}")
            return {"error": str(e)}

    async def perform_database_maintenance(self) -> Dict[str, Any]:
        """Run ANALYZE on the main tables and delete orphaned notifications.

        A notification is treated as orphaned when its ``user_id`` is not
        among the existing ``User.id`` values.

        Returns:
            ``{"status": "success"}`` or ``{"error": <message>}``.
        """
        try:
            # Imported here so the method is self-contained; the module-level
            # sqlalchemy import does not bring ``text`` into scope.
            from sqlalchemy import text

            async with db.session() as session:
                # Run ANALYZE on major tables. SQLAlchemy 2.0-style sessions
                # reject raw strings, so the statement must be wrapped in
                # text(). Table names come from the ORM models, not from
                # user input.
                for table in (User, Order, Notification):
                    await session.execute(text(f"ANALYZE {table.__tablename__}"))

                # Clean up any orphaned records
                # For example, delete notifications for non-existent users
                stmt = delete(Notification).where(
                    ~Notification.user_id.in_(
                        select(User.id)
                    )
                )
                await session.execute(stmt)
                await session.commit()
            return {"status": "success"}
        except Exception as e:
            logger.error(f"Database maintenance error: {str(e)}")
            return {"error": str(e)}

    @staticmethod
    def _is_rotated_archive(filename: str) -> bool:
        """Return True if *filename* ends in ``.YYYYMMDD`` or ``.YYYYMMDD-N``."""
        _, dot, suffix = filename.rpartition(".")
        if not dot:
            return False
        stamp, _, counter = suffix.partition("-")
        return (
            len(stamp) == 8
            and stamp.isdigit()
            and (not counter or counter.isdigit())
        )

    @staticmethod
    def _unique_archive_path(log_dir: str, filename: str) -> str:
        """Return an unused ``<filename>.<YYYYMMDD>[-N]`` path inside *log_dir*."""
        stamp = datetime.now().strftime('%Y%m%d')
        candidate = os.path.join(log_dir, f"{filename}.{stamp}")
        counter = 0
        while os.path.exists(candidate):
            counter += 1
            candidate = os.path.join(log_dir, f"{filename}.{stamp}-{counter}")
        return candidate

    async def rotate_log_files(self) -> None:
        """Rotate every file in ``logs`` that is larger than 10MB.

        An oversized file is moved to ``<name>.<YYYYMMDD>`` (with a ``-N``
        counter if that name is taken, so a second rotation on the same day
        never overwrites the first) and an empty file is created in its
        place. Files that already carry a rotation suffix and non-file
        entries are skipped. Errors are logged, never raised.
        """
        log_dir = self.LOG_DIR
        try:
            for filename in os.listdir(log_dir):
                filepath = os.path.join(log_dir, filename)
                if not os.path.isfile(filepath):
                    continue
                # Without this guard an oversized archive would be rotated
                # again on every run, leaving an empty stub behind each time.
                if self._is_rotated_archive(filename):
                    continue
                if os.path.getsize(filepath) <= self.MAX_LOG_SIZE_BYTES:
                    continue

                # Archive old log
                shutil.move(filepath, self._unique_archive_path(log_dir, filename))
                # Create new log file
                # NOTE(review): a log handler that still holds the old file
                # open may keep writing to the moved file - confirm that the
                # project's logger reopens its file after rotation.
                with open(filepath, 'a'):
                    pass
                logger.info(f"Rotated log file: {filename}")
        except Exception as e:
            logger.error(f"Log rotation error: {str(e)}")

    @staticmethod
    def _directory_size_mb(directory: str) -> float:
        """Return the combined size in MB of the files directly in *directory*."""
        total_bytes = 0
        with os.scandir(directory) as entries:
            for entry in entries:
                try:
                    if entry.is_file():
                        total_bytes += entry.stat().st_size
                except OSError:
                    # The entry vanished or became unreadable mid-scan.
                    continue
        return total_bytes / (1024 * 1024)

    @staticmethod
    def _remove_stale_files(directory: str, cutoff_ts: float) -> int:
        """Delete regular files in *directory* with ctime before *cutoff_ts*.

        Returns the number of files removed. Subdirectories are left alone
        and a failure on one file does not stop the sweep.
        """
        removed = 0
        for filename in os.listdir(directory):
            filepath = os.path.join(directory, filename)
            try:
                if not os.path.isfile(filepath):
                    continue
                if os.path.getctime(filepath) < cutoff_ts:
                    os.remove(filepath)
                    removed += 1
            except FileNotFoundError:
                # Someone else removed it first; nothing left to do.
                continue
            except OSError as e:
                logger.error(f"Could not remove temp file {filepath}: {str(e)}")
        return removed

    async def manage_storage_quotas(self) -> Dict[str, Any]:
        """Warn about oversized upload directories and purge old temp files.

        Upload directories (top-level files only) are compared against
        ``settings.MAX_UPLOAD_DIR_SIZE_MB``. Files in the temp directories
        whose ctime is more than 24 hours old are deleted.

        Returns:
            ``{"warnings": [<message>, ...], "cleaned": <files removed>}``,
            or ``{"error": <message>}`` on an unexpected failure.
        """
        try:
            results: Dict[str, Any] = {
                "warnings": [],
                "cleaned": 0
            }

            # Check and clean upload directories
            for directory in self.UPLOAD_DIRS:
                if not os.path.isdir(directory):
                    continue
                if self._directory_size_mb(directory) > settings.MAX_UPLOAD_DIR_SIZE_MB:
                    results["warnings"].append(
                        f"Upload directory {directory} exceeds size limit"
                    )

            # Clean up temporary files: remove files older than 24 hours
            cutoff_ts = (datetime.now() - self.TEMP_FILE_MAX_AGE).timestamp()
            for directory in self.TEMP_DIRS:
                if os.path.isdir(directory):
                    results["cleaned"] += self._remove_stale_files(directory, cutoff_ts)

            return results
        except Exception as e:
            logger.error(f"Storage quota management error: {str(e)}")
            return {"error": str(e)}
# Module-level MaintenanceService instance for importers of this module.
maintenance = MaintenanceService()