# Admin-Desk2/app/services/maintenance.py
import os
import shutil
import psutil
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
from sqlalchemy import select, delete, update, func
from ..db.database import db
from ..utils.logger import logger
from ..core.config import settings
from ..services.websocket import create_and_broadcast_notification
from ..db.models import User, Order, Notification, Session
from ..utils.cache import cache
from sqlalchemy import text
import redis.asyncio as redis
import asyncio
from ..utils.retry import retry_with_backoff, circuit_breaker
class MaintenanceService:
    """Periodic maintenance, health checks, and background-task bookkeeping.

    Holds only in-process state (a background-task registry and a few
    timestamps); all persistent work goes through the shared ``db`` and
    ``cache`` handles imported at module level.
    """

    def __init__(self):
        # ISO-8601 string of the last successful sync, or None if never synced.
        self._last_sync_time = None
        # task_id -> {"task": asyncio.Task, "name": str,
        #             "started_at": datetime, "last_updated": datetime}
        self._background_tasks = {}
        self.last_health_check = None
        self.last_maintenance = None

    def get_timestamp(self) -> str:
        """Return the current UTC time as an ISO-8601 string."""
        return datetime.utcnow().isoformat()

    def get_last_sync_time(self) -> str:
        """Return the last sync timestamp, or "Never" if no sync has run."""
        return self._last_sync_time or "Never"

    def get_next_sync_time(self) -> str:
        """Return the next scheduled sync time (last sync + 15 minutes)."""
        if not self._last_sync_time:
            return "Not scheduled"
        last_sync = datetime.fromisoformat(self._last_sync_time)
        next_sync = last_sync + timedelta(minutes=15)  # fixed 15-minute sync interval
        return next_sync.isoformat()

    @retry_with_backoff(max_retries=3, initial_delay=1.0)
    async def check_database(self) -> Dict[str, Any]:
        """Check database connectivity and collect PostgreSQL activity stats.

        Returns:
            ``{"status": "healthy", ...metrics}`` on success, or
            ``{"status": "unhealthy", "error": ...}`` on failure.
        """
        try:
            async with db() as session:
                # Basic connectivity probe.
                await session.execute(select(1))
                # Connection counts plus buffer-cache hit ratio.
                # FIX: NULLIF guards the division when no blocks have been
                # read yet (freshly started server) — the unguarded form
                # raised "division by zero" in PostgreSQL.
                result = await session.execute(
                    text("""
                        SELECT
                            (SELECT count(*) FROM pg_stat_activity) as active_connections,
                            (SELECT count(*) FROM pg_stat_activity WHERE state = 'active') as active_queries,
                            (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle') as idle_connections,
                            (SELECT round(sum(blks_hit)*100/NULLIF(sum(blks_hit + blks_read), 0), 2)
                             FROM pg_stat_database) as cache_hit_ratio
                    """)
                )
                stats = result.mappings().one()
                return {
                    "status": "healthy",
                    "active_connections": stats["active_connections"],
                    "active_queries": stats["active_queries"],
                    "idle_connections": stats["idle_connections"],
                    "cache_hit_ratio": stats["cache_hit_ratio"]
                }
        except Exception as e:
            logger.error(f"Database health check error: {str(e)}")
            return {
                "status": "unhealthy",
                "error": str(e)
            }

    @retry_with_backoff(max_retries=2, initial_delay=0.5)
    async def check_redis(self) -> Dict[str, Any]:
        """Check Redis connectivity and report basic server metrics.

        Returns:
            ``{"status": "healthy", ...INFO fields}`` on success, or
            ``{"status": "unhealthy", "error": ...}`` on failure.
        """
        try:
            # PING proves the connection is alive before asking for stats.
            await cache.redis.ping()
            info = await cache.redis.info()
            return {
                "status": "healthy",
                "connected_clients": info["connected_clients"],
                "used_memory_human": info["used_memory_human"],
                "total_connections_received": info["total_connections_received"],
                "uptime_in_seconds": info["uptime_in_seconds"]
            }
        except Exception as e:
            logger.error(f"Redis health check error: {str(e)}")
            return {
                "status": "unhealthy",
                "error": str(e)
            }

    @circuit_breaker(failure_threshold=5, reset_timeout=60)
    async def check_background_tasks(self) -> Dict[str, Any]:
        """Classify registered background tasks as active/completed/failed.

        Overall status is "warning" if any task raised, else "healthy".
        """
        active_tasks = []
        completed_tasks = []
        failed_tasks = []
        for task_id, task_info in self._background_tasks.items():
            status = {
                "id": task_id,
                "name": task_info["name"],
                "started_at": task_info["started_at"].isoformat(),
                "last_updated": task_info["last_updated"].isoformat()
            }
            if task_info["task"].done():
                # Task.exception() is only safe after done(); non-None means
                # the coroutine raised instead of returning.
                if task_info["task"].exception():
                    failed_tasks.append({
                        **status,
                        "error": str(task_info["task"].exception())
                    })
                else:
                    completed_tasks.append(status)
            else:
                active_tasks.append(status)
        return {
            "status": "healthy" if not failed_tasks else "warning",
            "active_tasks": active_tasks,
            "completed_tasks": completed_tasks,
            "failed_tasks": failed_tasks
        }

    async def register_background_task(
        self,
        task: asyncio.Task,
        name: str
    ) -> str:
        """Register *task* for monitoring and return its generated task id.

        The id embeds the registration timestamp so repeated registrations
        of the same name stay distinct.
        """
        task_id = f"{name}_{datetime.utcnow().timestamp()}"
        self._background_tasks[task_id] = {
            "task": task,
            "name": name,
            "started_at": datetime.utcnow(),
            "last_updated": datetime.utcnow()
        }
        return task_id

    async def update_task_status(self, task_id: str):
        """Refresh the heartbeat (``last_updated``) for a registered task."""
        if task_id in self._background_tasks:
            self._background_tasks[task_id]["last_updated"] = datetime.utcnow()

    async def cleanup_completed_tasks(self):
        """Drop finished tasks whose last heartbeat is older than 24 hours."""
        current_time = datetime.utcnow()
        # Collect first, delete after — never mutate a dict while iterating it.
        to_remove = []
        for task_id, task_info in self._background_tasks.items():
            if task_info["task"].done():
                age = current_time - task_info["last_updated"]
                if age > timedelta(hours=24):
                    to_remove.append(task_id)
        for task_id in to_remove:
            del self._background_tasks[task_id]

    async def perform_maintenance(self) -> Dict[str, Any]:
        """Run the routine maintenance pass (task/cache cleanup, DB pruning).

        Returns a status dict; failures are logged and reported rather
        than raised.
        """
        # FIX: the original wrapped this body in
        # ``async with self.circuit_breaker:`` but no such attribute is ever
        # assigned (only the decorator factory is imported), so the method
        # always raised AttributeError. The try/except below already bounds
        # failures.
        try:
            # Clean up old background tasks
            await self.cleanup_completed_tasks()
            # Clean up expired cache entries
            await cache.cleanup_expired()
            # Run database maintenance
            async with db() as session:
                # Clean up old sessions
                await session.execute(
                    text("DELETE FROM sessions WHERE last_activity < NOW() - INTERVAL '7 days'")
                )
                # Clean up old notifications (only ones already read)
                await session.execute(
                    text("""
                        DELETE FROM notifications
                        WHERE created_at < NOW() - INTERVAL '30 days'
                        AND read = true
                    """)
                )
                # Refresh planner statistics after the bulk deletes.
                await session.execute(text("ANALYZE"))
                await session.commit()
            return {
                "status": "success",
                "message": "Maintenance completed successfully",
                "timestamp": self.get_timestamp()
            }
        except Exception as e:
            logger.error(f"Maintenance error: {str(e)}")
            return {
                "status": "error",
                "message": f"Maintenance failed: {str(e)}",
                "timestamp": self.get_timestamp()
            }

    @retry_with_backoff(max_retries=2)
    async def cleanup_expired_sessions(self) -> int:
        """Delete sessions idle for more than 7 days; return rows removed.

        Returns 0 (after logging) on failure.
        """
        # NOTE(review): this file mixes ``db()`` and ``db.session()`` as the
        # session factory — confirm which one the db module actually exposes.
        try:
            cutoff = datetime.utcnow() - timedelta(days=7)
            async with db.session() as session:
                stmt = delete(Session).where(Session.last_activity < cutoff)
                result = await session.execute(stmt)
                await session.commit()
                return result.rowcount
        except Exception as e:
            logger.error(f"Error cleaning up sessions: {str(e)}")
            return 0

    async def archive_old_data(self) -> Dict[str, int]:
        """Flag year-old finished orders and read notifications as archived.

        Returns:
            Mapping of entity name -> rows archived; empty dict on failure.
        """
        try:
            cutoff = datetime.utcnow() - timedelta(days=365)
            archived = {}
            async with db.session() as session:
                # Archive old orders that reached a terminal status.
                order_stmt = update(Order).where(
                    Order.created_at < cutoff,
                    Order.status.in_(["completed", "cancelled"])
                ).values(archived=True)
                order_result = await session.execute(order_stmt)
                archived["orders"] = order_result.rowcount
                # Archive old notifications the user has already read.
                notif_stmt = update(Notification).where(
                    Notification.created_at < cutoff,
                    Notification.read == True
                ).values(archived=True)
                notif_result = await session.execute(notif_stmt)
                archived["notifications"] = notif_result.rowcount
                await session.commit()
            return archived
        except Exception as e:
            logger.error(f"Error archiving old data: {str(e)}")
            # FIX: return an empty dict instead of None to honour the
            # declared Dict[str, int] return type (and match
            # cleanup_expired_sessions, which returns a zero value on error).
            return {}

    async def check_system_health(self) -> Dict[str, Any]:
        """Check DB connectivity and disk headroom; alert admins when low.

        Sends a "system_alert" notification when less than 5 GB is free.
        """
        try:
            async with db.session() as session:
                # Check database connection by running a simple query.
                await session.execute(select(func.now()))
            # Disk stats via psutil (root filesystem).
            disk = psutil.disk_usage('/')
            total_space = disk.total / (1024 * 1024 * 1024)  # GB
            free_space = disk.free / (1024 * 1024 * 1024)  # GB
            health_data = {
                "status": "healthy",
                "database": {
                    "connected": True
                },
                "disk": {
                    "total_gb": total_space,
                    "free_gb": free_space,
                    "usage_percent": disk.percent
                },
                "timestamp": datetime.utcnow()
            }
            # Send alert if disk space is low.
            if free_space < 5:  # Less than 5GB free
                await create_and_broadcast_notification(
                    user_id="admin",
                    title="Low Disk Space Alert",
                    message=f"Server is running low on disk space. Only {free_space:.2f}GB remaining.",
                    notification_type="system_alert",
                    data={"free_space_gb": free_space}
                )
            return health_data
        except Exception as e:
            logger.error(f"Health check error: {str(e)}")
            return {"status": "unhealthy", "error": str(e)}

    @retry_with_backoff(max_retries=2, initial_delay=1.0)
    async def monitor_system_resources(self) -> Dict[str, Any]:
        """Sample process socket count plus CPU/memory usage; alert on load.

        The "connections" figure counts this process's open socket-like
        file handles — an approximation of DB connections, not an exact
        server-side count.
        """
        try:
            async with db.session() as session:
                # Approximate active connections from this process's handles.
                process = psutil.Process()
                open_files = process.open_files()
                connections = len([f for f in open_files if 'socket' in str(f.path)])
                resources = {
                    "database": {
                        "connections": connections,
                    },
                    "system": {
                        "cpu_percent": psutil.cpu_percent(),
                        "memory_percent": psutil.virtual_memory().percent
                    },
                    "timestamp": datetime.utcnow()
                }
                # Alert when within 10% of the configured connection ceiling.
                if connections > settings.MAX_DB_CONNECTIONS * 0.9:
                    await create_and_broadcast_notification(
                        user_id="admin",
                        title="High Database Connections",
                        message=f"Database has {connections} active connections",
                        notification_type="system_alert",
                        data={"connections": connections}
                    )
                return resources
        except Exception as e:
            logger.error(f"Resource monitoring error: {str(e)}")
            return {"error": str(e)}

    async def perform_database_maintenance(self) -> Dict[str, Any]:
        """ANALYZE the main tables and delete orphaned notifications."""
        try:
            async with db.session() as session:
                # Run ANALYZE on major tables.
                # FIX: SQLAlchemy 2.x async sessions reject raw strings —
                # textual SQL must be wrapped in text(). Table names come
                # from our own model metadata, not user input.
                for table in [User, Order, Notification]:
                    await session.execute(text(f"ANALYZE {table.__tablename__}"))
                # Delete notifications whose user no longer exists.
                stmt = delete(Notification).where(
                    ~Notification.user_id.in_(
                        select(User.id)
                    )
                )
                await session.execute(stmt)
                await session.commit()
                return {"status": "success"}
        except Exception as e:
            logger.error(f"Database maintenance error: {str(e)}")
            return {"error": str(e)}

    async def rotate_log_files(self) -> None:
        """Rotate any log file in ``logs/`` larger than 10 MB.

        The oversized file is renamed with a date suffix and an empty file
        is created in its place.
        """
        log_dir = "logs"
        max_log_size = 10 * 1024 * 1024  # 10MB
        try:
            for filename in os.listdir(log_dir):
                filepath = os.path.join(log_dir, filename)
                # FIX: skip subdirectories — getsize on a dir entry was
                # meaningless and move() would have relocated whole dirs.
                if not os.path.isfile(filepath):
                    continue
                if os.path.getsize(filepath) > max_log_size:
                    # FIX: the archive name and log line were mangled to the
                    # literal "(unknown)" in the source; reconstructed to use
                    # the actual file name so rotations don't overwrite each
                    # other.
                    archive_name = f"{filename}.{datetime.now().strftime('%Y%m%d')}"
                    shutil.move(filepath, os.path.join(log_dir, archive_name))
                    # Recreate an empty log file for the logger to append to.
                    open(filepath, 'a').close()
                    logger.info(f"Rotated log file: {filename}")
        except Exception as e:
            logger.error(f"Log rotation error: {str(e)}")

    @retry_with_backoff(max_retries=3)
    async def manage_storage_quotas(self) -> Dict[str, Any]:
        """Warn on oversized upload dirs and purge temp files older than 24h.

        Returns:
            ``{"warnings": [...], "cleaned": n}`` or ``{"error": ...}``.
        """
        try:
            results = {
                "warnings": [],
                "cleaned": 0
            }
            # Check and warn about oversized upload directories (top level
            # files only — subdirectories are not included in the total).
            upload_dirs = ["uploads/documents", "uploads/images"]
            for directory in upload_dirs:
                if os.path.exists(directory):
                    total_size = sum(
                        os.path.getsize(os.path.join(directory, f))
                        for f in os.listdir(directory)
                        if os.path.isfile(os.path.join(directory, f))
                    ) / (1024 * 1024)  # Convert to MB
                    if total_size > settings.MAX_UPLOAD_DIR_SIZE_MB:
                        results["warnings"].append(
                            f"Upload directory {directory} exceeds size limit"
                        )
            # Clean up temporary files older than 24 hours (by ctime).
            temp_dirs = ["uploads/temp", "backups/temp"]
            for directory in temp_dirs:
                if os.path.exists(directory):
                    cutoff = datetime.now() - timedelta(days=1)
                    for filename in os.listdir(directory):
                        filepath = os.path.join(directory, filename)
                        if os.path.getctime(filepath) < cutoff.timestamp():
                            os.remove(filepath)
                            results["cleaned"] += 1
            return results
        except Exception as e:
            logger.error(f"Storage quota management error: {str(e)}")
            return {"error": str(e)}

    def _calculate_storage_usage(self, storage_type: str) -> Dict[str, Any]:
        """Compute recursive size and quota usage for one storage subtree.

        Args:
            storage_type: Subdirectory of ``settings.UPLOAD_DIR``; also keys
                the quota lookup ``settings.<TYPE>_QUOTA_BYTES``.
        """
        base_path = settings.UPLOAD_DIR / storage_type
        total_size = sum(f.stat().st_size for f in base_path.glob('**/*') if f.is_file())
        quota = getattr(settings, f"{storage_type.upper()}_QUOTA_BYTES")
        # Guard against a zero/unset quota to avoid ZeroDivisionError.
        usage_percent = (total_size / quota) * 100 if quota > 0 else 0
        return {
            "current_size": total_size,
            "quota": quota,
            "usage_percent": round(usage_percent, 2)
        }
# Module-level singleton shared by API routes and schedulers.
maintenance_service = MaintenanceService()