Spaces:
Paused
Paused
| from fastapi import APIRouter, Depends, HTTPException | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy import select, delete, func | |
| from typing import Dict, Any, List | |
| from datetime import datetime, timedelta | |
| from ..core.dependencies import get_current_active_user, get_db | |
| from ..db.database import get_db | |
| from ..db.models import User, Order, Notification, Event | |
| from ..utils.logger import logger | |
| from ..services.maintenance import maintenance_service | |
| from ..services.pos_analytics import pos_analytics | |
| import asyncio | |
| router = APIRouter() | |
async def cleanup_sessions(
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> Dict[str, int]:
    """Manually trigger session cleanup (administrators only).

    Deletes every ``Event`` row whose ``created_at`` is more than seven days
    before the current naive-UTC time, then commits.

    Returns:
        ``{"deleted_sessions": n}`` where ``n`` is the rowcount reported for
        the DELETE statement.

    Raises:
        HTTPException: 403 when ``"admin"`` is not in ``current_user.roles``.
    """
    is_admin = "admin" in current_user.roles
    if not is_admin:
        raise HTTPException(
            status_code=403,
            detail="Only administrators can perform maintenance operations",
        )

    # Rows created before this instant fall outside the retention window.
    retention_window = timedelta(days=7)
    threshold = datetime.utcnow() - retention_window

    purge = delete(Event).where(Event.created_at < threshold)
    outcome = await db.execute(purge)
    await db.commit()

    return {"deleted_sessions": outcome.rowcount}
async def archive_data(
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> Dict[str, int]:
    """Manually trigger data archiving (administrators only).

    Despite the name, the visible code does not copy rows anywhere: it issues
    DELETE statements for rows older than 365 days and commits them together.

    * ``Order`` rows whose status is ``"delivered"`` or ``"cancelled"``.
    * ``Notification`` rows that are marked as read.

    Returns:
        ``{"orders": n, "notifications": m}`` with the rowcount of each DELETE.

    Raises:
        HTTPException: 403 when ``"admin"`` is not in ``current_user.roles``.
    """
    is_admin = "admin" in current_user.roles
    if not is_admin:
        raise HTTPException(
            status_code=403,
            detail="Only administrators can perform maintenance operations",
        )

    cutoff = datetime.utcnow() - timedelta(days=365)

    # Only orders in a terminal state are eligible for removal.
    stale_orders = delete(Order).where(
        Order.created_at < cutoff,
        Order.status.in_(["delivered", "cancelled"]),
    )
    # ``== True`` builds a SQL expression here; it is not a Python comparison.
    read_notifications = delete(Notification).where(
        Notification.created_at < cutoff,
        Notification.read == True,
    )

    orders_outcome = await db.execute(stale_orders)
    deleted_orders = orders_outcome.rowcount

    notifications_outcome = await db.execute(read_notifications)
    deleted_notifications = notifications_outcome.rowcount

    # Both deletions are committed as one unit.
    await db.commit()

    return {"orders": deleted_orders, "notifications": deleted_notifications}
async def check_health(
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> Dict[str, Any]:
    """Check system health metrics (administrators only).

    Runs a trivial ``SELECT 1`` against the database and then counts the rows
    of ``User``, ``Order`` and ``Notification``.

    Returns:
        On success, a ``"healthy"`` payload with a timestamp and a
        ``"database"`` section holding the three counts. If any step raises,
        the error is logged and an ``"unhealthy"`` payload carrying the error
        text is returned instead of propagating the exception.

    Raises:
        HTTPException: 403 when ``"admin"`` is not in ``current_user.roles``.
    """
    is_admin = "admin" in current_user.roles
    if not is_admin:
        raise HTTPException(
            status_code=403,
            detail="Only administrators can view system health",
        )

    try:
        # Connectivity probe.
        await db.execute(select(1))

        # Row counts, gathered in the order they appear in the response.
        database_section = {"connected": True}
        for key, model in (
            ("total_users", User),
            ("total_orders", Order),
            ("total_notifications", Notification),
        ):
            count_query = select(func.count()).select_from(model)
            database_section[key] = await db.scalar(count_query)

        return {
            "status": "healthy",
            "timestamp": datetime.utcnow(),
            "database": database_section,
        }
    except Exception as e:
        logger.error(f"Health check error: {str(e)}")
        failure = {"status": "unhealthy", "error": str(e)}
        failure["timestamp"] = datetime.utcnow()
        return failure
async def perform_db_maintenance(
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> Dict[str, Any]:
    """Manually trigger database maintenance (administrators only).

    Runs ``cleanup_sessions`` and then issues ``VACUUM ANALYZE`` on the
    session's database.

    Returns:
        ``{"status": "success", "message": ...}`` when both steps complete.

    Raises:
        HTTPException: 403 when ``"admin"`` is not in ``current_user.roles``;
            500 (with the underlying error text) when a maintenance step
            fails. An ``HTTPException`` raised by ``cleanup_sessions`` is
            propagated unchanged.
    """
    # Local import: ``text`` is not among this module's top-level imports.
    from sqlalchemy import text

    if "admin" not in current_user.roles:
        raise HTTPException(
            status_code=403,
            detail="Only administrators can perform maintenance operations"
        )

    try:
        # Cleanup expired sessions. This commits, so the session holds no
        # open transaction (and no connection) when it returns.
        await cleanup_sessions(current_user, db)

        # VACUUM cannot run inside a transaction block (PostgreSQL rejects
        # it), so procure the session's next connection in AUTOCOMMIT mode.
        # NOTE(review): assumes a PostgreSQL backend - confirm.
        await db.connection(execution_options={"isolation_level": "AUTOCOMMIT"})

        # Raw SQL must be wrapped in text(); 2.0-style sessions reject a
        # bare string.
        await db.execute(text("VACUUM ANALYZE;"))

        # Close the session-level transaction so the connection is released
        # and its isolation level is reset.
        await db.commit()
    except HTTPException:
        # Do not rewrap HTTP errors from the cleanup step as a 500.
        raise
    except Exception as e:
        logger.error(f"Database maintenance error: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Database maintenance failed: {str(e)}"
        ) from e

    return {
        "status": "success",
        "message": "Database maintenance completed successfully"
    }
async def health_check() -> Dict[str, Any]:
    """Public health check endpoint that doesn't require authentication.

    Returns:
        A payload whose ``"status"`` is always ``"healthy"`` and whose
        ``"timestamp"`` is whatever ``maintenance_service.get_timestamp()``
        returns.
    """
    payload: Dict[str, Any] = {"status": "healthy"}
    payload["timestamp"] = maintenance_service.get_timestamp()
    return payload
async def get_system_status(
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> Dict[str, Any]:
    """Get detailed system status including POS integration health.

    Queries, one after another, the database, POS integration, Redis and
    background-task checks and reports each result under ``"components"``.

    Returns:
        ``{"status": "healthy", "timestamp": ..., "components": {...}}``.

    Raises:
        HTTPException: 403 when ``current_user.is_superuser`` is falsy.
    """
    # NOTE(review): ``db`` is accepted but never used in this function, and the
    # top-level "status" is always "healthy" regardless of component results -
    # confirm both are intentional.
    if not current_user.is_superuser:
        raise HTTPException(
            status_code=403,
            detail="Only superusers can access system status",
        )

    # The checks run sequentially, in this order.
    database_health = await maintenance_service.check_database()
    pos_health = await pos_analytics.health_check()
    redis_health = await maintenance_service.check_redis()
    task_health = await maintenance_service.check_background_tasks()

    report: Dict[str, Any] = {"status": "healthy"}
    report["timestamp"] = maintenance_service.get_timestamp()
    report["components"] = {
        "database": database_health,
        "redis": redis_health,
        "background_tasks": task_health,
        "pos_integration": pos_health,
    }
    return report
async def trigger_pos_sync(
    current_user: User = Depends(get_current_active_user)
) -> Dict[str, Any]:
    """Manually trigger POS metrics synchronization (superusers only).

    Returns:
        ``{"status": "success", ...}`` when ``pos_analytics.sync_all_metrics()``
        returns a truthy value, otherwise ``{"status": "partial_failure", ...}``.

    Raises:
        HTTPException: 403 when ``current_user.is_superuser`` is falsy;
            500 (with the underlying error text) when the sync call raises.
    """
    if not current_user.is_superuser:
        raise HTTPException(
            status_code=403,
            detail="Only superusers can trigger manual sync"
        )

    try:
        success = await pos_analytics.sync_all_metrics()
    except Exception as e:
        # Record the failure server-side, as the other maintenance handlers
        # do, and keep the original exception chained for tracebacks.
        logger.error(f"POS metrics sync error: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to sync POS metrics: {str(e)}"
        ) from e

    return {
        "status": "success" if success else "partial_failure",
        "message": "POS metrics sync completed successfully" if success else "Some metrics failed to sync"
    }
async def get_sync_status(
    current_user: User = Depends(get_current_active_user)
) -> Dict[str, Any]:
    """Get status of background sync tasks (superusers only).

    Returns:
        A dict with the background-task check result, the POS integration
        health-check result, and the last/next sync times reported by
        ``maintenance_service``.

    Raises:
        HTTPException: 403 when ``current_user.is_superuser`` is falsy.
    """
    if not current_user.is_superuser:
        raise HTTPException(
            status_code=403,
            detail="Only superusers can view sync status",
        )

    summary: Dict[str, Any] = {}
    summary["background_tasks"] = await maintenance_service.check_background_tasks()
    summary["pos_integration"] = await pos_analytics.health_check()
    summary["last_sync_timestamp"] = maintenance_service.get_last_sync_time()
    summary["next_sync_due"] = maintenance_service.get_next_sync_time()
    return summary