Spaces:
Paused
Paused
File size: 5,103 Bytes
4ae946d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 | from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from app.modules.users.schemas import UserResponse
from app.modules.users.service import UserService
from app.services.infrastructure.auth_service import auth_service
from app.services.infrastructure.storage.database_service import (
db_service,
) # Legacy fallback for bulk ops if not yet migrated
from core.api_models import (
BulkOperationRequest,
BulkOperationResponse,
PaginationResponse,
)
from core.database import get_db
router = APIRouter()
def get_user_service(db: Session = Depends(get_db)) -> UserService:
return UserService(db)
@router.get("/me", response_model=UserResponse)
async def get_current_user_profile(
current_user: dict = Depends(auth_service.get_current_user),
service: UserService = Depends(get_user_service),
):
"""Get current authenticated user profile"""
user = service.get_user(current_user["id"])
return user
@router.put("/users/me/preferences")
async def update_user_preferences(
preferences: dict,
current_user: dict = Depends(auth_service.get_current_user),
user_service: UserService = Depends(get_user_service),
):
"""Update current user preferences"""
try:
user_id = current_user.get("id")
success = user_service.update_user_preferences(user_id, preferences)
if success:
return {"status": "success", "preferences": preferences}
else:
raise HTTPException(status_code=400, detail="Failed to update preferences")
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to update preferences: {str(e)}"
)
@router.get("/users/{user_id}", response_model=UserResponse)
async def get_user(
user_id: str,
current_user: dict = Depends(auth_service.get_current_user),
service: UserService = Depends(get_user_service),
):
"""Get user by ID"""
return service.get_user(user_id)
@router.get("/users", response_model=dict)
async def get_users(
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
q: Optional[str] = None,
role: Optional[str] = None,
department: Optional[str] = None,
sort_by: Optional[str] = None,
sort_order: str = "asc",
status: Optional[str] = None,
current_user: dict = Depends(auth_service.get_current_user),
service: UserService = Depends(get_user_service),
):
"""Get users with pagination"""
filters = {
"q": q,
"role": role,
"department": department,
"status": status,
"sort_by": sort_by,
"sort_order": sort_order,
}
offset = (page - 1) * page_size
users, total = service.get_users_paginated(
offset=offset, limit=page_size, filters=filters
)
users_data = [UserResponse.model_validate(u) for u in users]
pagination_response = PaginationResponse.create(
page=page, page_size=page_size, total_items=total
)
return {"users": users_data, "pagination": pagination_response.dict()}
@router.post("/users/bulk", response_model=BulkOperationResponse)
async def bulk_user_operations(
request: BulkOperationRequest,
current_user: dict = Depends(auth_service.get_current_user),
):
"""
Perform bulk operations.
Delegating to legacy db_service for now as bulk logic is complex and not yet in UserService.
"""
# Reuse the exact logic from old router via db_service to ensure 100% compatibility
# Copy-pasting logic or calling db_service directly is better than rewriting for now.
# We will invoke the same logic as the old router.
# ... (Implementing the same calls to db_service as existing router)
# Since I cannot easily import the function from the other router, I will replicate the calls to db_service.
successful = 0
errors = []
# Simplified replication:
op = request.operation
for uid in request.ids:
try:
success = False
if op == "delete":
success = db_service.delete_user(uid)
elif op == "update":
success = db_service.update_user(uid, request.data)
elif op == "activate":
success = db_service.update_user(uid, {"is_active": True})
elif op == "deactivate":
success = db_service.update_user(uid, {"is_active": False})
else:
raise HTTPException(
status_code=400, detail=f"Unsupported operation: {op}"
)
if success:
successful += 1
else:
errors.append({"user_id": uid, "error": "Operation failed"})
except Exception as e:
errors.append({"user_id": uid, "error": str(e)})
return BulkOperationResponse(
operation=request.operation,
total_requested=len(request.ids),
successful=successful,
failed=len(errors),
errors=errors if errors else None,
)
|