Spaces:
Paused
Paused
| from typing import Optional | |
| from fastapi import APIRouter, Depends, HTTPException, Query | |
| from sqlalchemy.orm import Session | |
| from app.modules.users.schemas import UserResponse | |
| from app.modules.users.service import UserService | |
| from app.services.infrastructure.auth_service import auth_service | |
| from app.services.infrastructure.storage.database_service import ( | |
| db_service, | |
| ) # Legacy fallback for bulk ops if not yet migrated | |
| from core.api_models import ( | |
| BulkOperationRequest, | |
| BulkOperationResponse, | |
| PaginationResponse, | |
| ) | |
| from core.database import get_db | |
| router = APIRouter() | |
| def get_user_service(db: Session = Depends(get_db)) -> UserService: | |
| return UserService(db) | |
| async def get_current_user_profile( | |
| current_user: dict = Depends(auth_service.get_current_user), | |
| service: UserService = Depends(get_user_service), | |
| ): | |
| """Get current authenticated user profile""" | |
| user = service.get_user(current_user["id"]) | |
| return user | |
| async def update_user_preferences( | |
| preferences: dict, | |
| current_user: dict = Depends(auth_service.get_current_user), | |
| user_service: UserService = Depends(get_user_service), | |
| ): | |
| """Update current user preferences""" | |
| try: | |
| user_id = current_user.get("id") | |
| success = user_service.update_user_preferences(user_id, preferences) | |
| if success: | |
| return {"status": "success", "preferences": preferences} | |
| else: | |
| raise HTTPException(status_code=400, detail="Failed to update preferences") | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| raise HTTPException( | |
| status_code=500, detail=f"Failed to update preferences: {str(e)}" | |
| ) | |
| async def get_user( | |
| user_id: str, | |
| current_user: dict = Depends(auth_service.get_current_user), | |
| service: UserService = Depends(get_user_service), | |
| ): | |
| """Get user by ID""" | |
| return service.get_user(user_id) | |
| async def get_users( | |
| page: int = Query(1, ge=1), | |
| page_size: int = Query(20, ge=1, le=100), | |
| q: Optional[str] = None, | |
| role: Optional[str] = None, | |
| department: Optional[str] = None, | |
| sort_by: Optional[str] = None, | |
| sort_order: str = "asc", | |
| status: Optional[str] = None, | |
| current_user: dict = Depends(auth_service.get_current_user), | |
| service: UserService = Depends(get_user_service), | |
| ): | |
| """Get users with pagination""" | |
| filters = { | |
| "q": q, | |
| "role": role, | |
| "department": department, | |
| "status": status, | |
| "sort_by": sort_by, | |
| "sort_order": sort_order, | |
| } | |
| offset = (page - 1) * page_size | |
| users, total = service.get_users_paginated( | |
| offset=offset, limit=page_size, filters=filters | |
| ) | |
| users_data = [UserResponse.model_validate(u) for u in users] | |
| pagination_response = PaginationResponse.create( | |
| page=page, page_size=page_size, total_items=total | |
| ) | |
| return {"users": users_data, "pagination": pagination_response.dict()} | |
| async def bulk_user_operations( | |
| request: BulkOperationRequest, | |
| current_user: dict = Depends(auth_service.get_current_user), | |
| ): | |
| """ | |
| Perform bulk operations. | |
| Delegating to legacy db_service for now as bulk logic is complex and not yet in UserService. | |
| """ | |
| # Reuse the exact logic from old router via db_service to ensure 100% compatibility | |
| # Copy-pasting logic or calling db_service directly is better than rewriting for now. | |
| # We will invoke the same logic as the old router. | |
| # ... (Implementing the same calls to db_service as existing router) | |
| # Since I cannot easily import the function from the other router, I will replicate the calls to db_service. | |
| successful = 0 | |
| errors = [] | |
| # Simplified replication: | |
| op = request.operation | |
| for uid in request.ids: | |
| try: | |
| success = False | |
| if op == "delete": | |
| success = db_service.delete_user(uid) | |
| elif op == "update": | |
| success = db_service.update_user(uid, request.data) | |
| elif op == "activate": | |
| success = db_service.update_user(uid, {"is_active": True}) | |
| elif op == "deactivate": | |
| success = db_service.update_user(uid, {"is_active": False}) | |
| else: | |
| raise HTTPException( | |
| status_code=400, detail=f"Unsupported operation: {op}" | |
| ) | |
| if success: | |
| successful += 1 | |
| else: | |
| errors.append({"user_id": uid, "error": "Operation failed"}) | |
| except Exception as e: | |
| errors.append({"user_id": uid, "error": str(e)}) | |
| return BulkOperationResponse( | |
| operation=request.operation, | |
| total_requested=len(request.ids), | |
| successful=successful, | |
| failed=len(errors), | |
| errors=errors if errors else None, | |
| ) | |