Admin-Desk / app /services /maintenance.py
Fred808's picture
Update app/services/maintenance.py
73e4339 verified
import os
import shutil
import psutil
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
from sqlalchemy import select, delete, update, func
from ..db.database import db
from ..utils.logger import logger
from ..core.config import settings
from ..services.websocket import create_and_broadcast_notification
from ..db.models import User, Order, Notification, Session
class MaintenanceService:
async def cleanup_expired_sessions(self) -> int:
"""Clean up expired sessions"""
try:
cutoff = datetime.utcnow() - timedelta(days=7)
async with db.session() as session:
stmt = delete(Session).where(Session.last_activity < cutoff)
result = await session.execute(stmt)
await session.commit()
return result.rowcount
except Exception as e:
logger.error(f"Error cleaning up sessions: {str(e)}")
return 0
async def archive_old_data(self) -> Dict[str, int]:
"""Archive old data"""
try:
cutoff = datetime.utcnow() - timedelta(days=365)
archived = {}
async with db.session() as session:
# Archive old orders
order_stmt = update(Order).where(
Order.created_at < cutoff,
Order.status.in_(["completed", "cancelled"])
).values(archived=True)
order_result = await session.execute(order_stmt)
archived["orders"] = order_result.rowcount
# Archive old notifications
notif_stmt = update(Notification).where(
Notification.created_at < cutoff,
Notification.read == True
).values(archived=True)
notif_result = await session.execute(notif_stmt)
archived["notifications"] = notif_result.rowcount
await session.commit()
return archived
except Exception as e:
logger.error(f"Error archiving old data: {str(e)}")
return None
async def check_system_health(self) -> Dict[str, Any]:
"""Check system health"""
try:
async with db.session() as session:
# Check database connection by running a simple query
await session.execute(select(func.now()))
# Get database size (using psutil for disk stats)
disk = psutil.disk_usage('/')
total_space = disk.total / (1024 * 1024 * 1024) # GB
free_space = disk.free / (1024 * 1024 * 1024) # GB
health_data = {
"status": "healthy",
"database": {
"connected": True
},
"disk": {
"total_gb": total_space,
"free_gb": free_space,
"usage_percent": disk.percent
},
"timestamp": datetime.utcnow()
}
# Send alert if disk space is low
if free_space < 5: # Less than 5GB free
await create_and_broadcast_notification(
user_id="admin",
title="Low Disk Space Alert",
message=f"Server is running low on disk space. Only {free_space:.2f}GB remaining.",
notification_type="system_alert",
data={"free_space_gb": free_space}
)
return health_data
except Exception as e:
logger.error(f"Health check error: {str(e)}")
return {"status": "unhealthy", "error": str(e)}
async def monitor_system_resources(self) -> Dict[str, Any]:
"""Monitor system resources"""
try:
async with db.session() as session:
# Get current active connections (using psutil for process stats)
process = psutil.Process()
open_files = process.open_files()
connections = len([f for f in open_files if 'socket' in str(f.path)])
resources = {
"database": {
"connections": connections,
},
"system": {
"cpu_percent": psutil.cpu_percent(),
"memory_percent": psutil.virtual_memory().percent
},
"timestamp": datetime.utcnow()
}
# Alert if too many connections or high resource usage
if connections > settings.MAX_DB_CONNECTIONS * 0.9:
await create_and_broadcast_notification(
user_id="admin",
title="High Database Connections",
message=f"Database has {connections} active connections",
notification_type="system_alert",
data={"connections": connections}
)
return resources
except Exception as e:
logger.error(f"Resource monitoring error: {str(e)}")
return {"error": str(e)}
async def perform_database_maintenance(self) -> Dict[str, Any]:
"""Perform database maintenance tasks"""
try:
async with db.session() as session:
# Run ANALYZE on major tables
for table in [User, Order, Notification]:
await session.execute(f"ANALYZE {table.__tablename__}")
# Clean up any orphaned records
# For example, delete notifications for non-existent users
stmt = delete(Notification).where(
~Notification.user_id.in_(
select(User.id)
)
)
await session.execute(stmt)
await session.commit()
return {"status": "success"}
except Exception as e:
logger.error(f"Database maintenance error: {str(e)}")
return {"error": str(e)}
async def rotate_log_files(self) -> None:
"""Rotate log files"""
log_dir = "logs"
max_log_size = 10 * 1024 * 1024 # 10MB
try:
for filename in os.listdir(log_dir):
filepath = os.path.join(log_dir, filename)
if os.path.getsize(filepath) > max_log_size:
# Archive old log
archive_name = f"{filename}.{datetime.now().strftime('%Y%m%d')}"
shutil.move(filepath, os.path.join(log_dir, archive_name))
# Create new log file
open(filepath, 'a').close()
logger.info(f"Rotated log file: {filename}")
except Exception as e:
logger.error(f"Log rotation error: {str(e)}")
async def manage_storage_quotas(self) -> Dict[str, Any]:
"""Manage storage quotas and cleanup"""
try:
results = {
"warnings": [],
"cleaned": 0
}
# Check and clean upload directories
upload_dirs = ["uploads/documents", "uploads/images"]
for directory in upload_dirs:
if os.path.exists(directory):
total_size = sum(
os.path.getsize(os.path.join(directory, f))
for f in os.listdir(directory)
if os.path.isfile(os.path.join(directory, f))
) / (1024 * 1024) # Convert to MB
if total_size > settings.MAX_UPLOAD_DIR_SIZE_MB:
results["warnings"].append(
f"Upload directory {directory} exceeds size limit"
)
# Clean up temporary files
temp_dirs = ["uploads/temp", "backups/temp"]
for directory in temp_dirs:
if os.path.exists(directory):
# Remove files older than 24 hours
cutoff = datetime.now() - timedelta(days=1)
for filename in os.listdir(directory):
filepath = os.path.join(directory, filename)
if os.path.getctime(filepath) < cutoff.timestamp():
os.remove(filepath)
results["cleaned"] += 1
return results
except Exception as e:
logger.error(f"Storage quota management error: {str(e)}")
return {"error": str(e)}
maintenance = MaintenanceService()