Spaces:
Running
Running
| from __future__ import annotations | |
| from typing import Annotated | |
| from fastapi import APIRouter, Depends, Request, status | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from app.config import get_settings | |
| from app.core.auth.deps import get_current_user, get_db, get_temp_db_warning, require_application_id | |
| from app.core.auth.models import RefreshSession, User | |
| from app.core.auth.schemas import ( | |
| ChangePasswordSchema, | |
| ForgotPasswordSchema, | |
| LoginSchema, | |
| MessageDataResponse, | |
| ProfileResponse, | |
| RegisterSchema, | |
| ResetPasswordSchema, | |
| SchemaResponse, | |
| SessionListResponse, | |
| SessionOut, | |
| TokenRefreshSchema, | |
| TokenResponse, | |
| UpdateProfileSchema, | |
| UserSchemaField, | |
| UserSchemaResponse, | |
| DataResponse, | |
| ) | |
| from app.services.auth_service import AuthService | |
# Router that groups this module's endpoints under the "/auth" prefix and the
# "Authentication" tag.
# NOTE(review): no `@router.<method>(...)` decorators are visible on the
# handlers in this view -- confirm they are registered on this router.
router = APIRouter(
    prefix="/auth",
    tags=["Authentication"],
)

# Settings are fetched once, at import time; `_build` reads
# `application_id` from this object.
_settings = get_settings()
def _build(data, warning):
    """Wrap *data* in the response envelope used by the handlers below.

    The envelope always carries ``"success": True`` and ``"data"``.
    ``"application_id"`` is added only when the module-level settings hold a
    truthy ``application_id``, and ``"warning"`` only when *warning* is
    truthy.

    Args:
        data: Payload stored under the ``"data"`` key, unchanged.
        warning: Optional value stored under ``"warning"``; omitted when falsy.

    Returns:
        A new ``dict`` with keys inserted in the order success, data,
        application_id (optional), warning (optional).
    """
    envelope = {"success": True, "data": data}

    application_id = _settings.application_id
    if application_id:
        envelope["application_id"] = application_id

    if warning:
        envelope["warning"] = warning

    return envelope
async def register(
    schema: RegisterSchema,
    db: Annotated[AsyncSession, Depends(get_db)],
    _: Annotated[bool, Depends(require_application_id)],
):
    """Create a user through ``AuthService.register`` and return its profile.

    ``_`` only declares the ``require_application_id`` dependency; its value
    is not used in the body.

    Returns the ``_build`` envelope around the dumped profile, with the
    result of ``get_temp_db_warning()`` passed as the warning.
    """
    created_user = await AuthService.register(db, schema)
    notice = get_temp_db_warning()
    profile = AuthService.user_to_profile(created_user)
    return _build(profile.model_dump(), notice)
async def login(
    schema: LoginSchema,
    request: Request,
    db: Annotated[AsyncSession, Depends(get_db)],
    _: Annotated[bool, Depends(require_application_id)],
):
    """Log in through ``AuthService.login`` and return the resulting tokens.

    The incoming ``request`` is forwarded to the service together with the
    session and the login payload. ``_`` only declares the
    ``require_application_id`` dependency; its value is not used.

    Returns the ``_build`` envelope around the dumped token object, with the
    result of ``get_temp_db_warning()`` passed as the warning.
    """
    issued = await AuthService.login(db, request, schema)
    notice = get_temp_db_warning()
    token_payload = issued.model_dump()
    return _build(token_payload, notice)
async def refresh_token(
    schema: TokenRefreshSchema,
    db: Annotated[AsyncSession, Depends(get_db)],
    _: Annotated[bool, Depends(require_application_id)],
):
    """Exchange ``schema.refresh_token`` via ``AuthService.refresh``.

    ``_`` only declares the ``require_application_id`` dependency; its value
    is not used in the body.

    Returns the ``_build`` envelope around the dumped token object, with the
    result of ``get_temp_db_warning()`` passed as the warning.
    """
    presented_token = schema.refresh_token
    renewed = await AuthService.refresh(db, presented_token)
    notice = get_temp_db_warning()
    return _build(renewed.model_dump(), notice)
async def logout(
    schema: TokenRefreshSchema,
    current_user: Annotated[User, Depends(get_current_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: Annotated[bool, Depends(require_application_id)],
):
    """Call ``AuthService.logout`` for the current user and refresh token.

    ``current_user`` is supplied by the ``get_current_user`` dependency.
    ``_`` only declares the ``require_application_id`` dependency; its value
    is not used in the body.

    Returns the ``_build`` envelope around a fixed confirmation message, with
    the result of ``get_temp_db_warning()`` passed as the warning.
    """
    await AuthService.logout(db, current_user, schema.refresh_token)
    notice = get_temp_db_warning()
    confirmation = {"message": "Logged out successfully"}
    return _build(confirmation, notice)
async def logout_all(
    current_user: Annotated[User, Depends(get_current_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: Annotated[bool, Depends(require_application_id)],
):
    """Call ``AuthService.logout_all`` for the current user.

    ``current_user`` is supplied by the ``get_current_user`` dependency.
    ``_`` only declares the ``require_application_id`` dependency; its value
    is not used in the body.

    Returns the ``_build`` envelope around a fixed confirmation message, with
    the result of ``get_temp_db_warning()`` passed as the warning.
    """
    await AuthService.logout_all(db, current_user)
    notice = get_temp_db_warning()
    confirmation = {"message": "All sessions revoked"}
    return _build(confirmation, notice)
async def forgot_password(
    schema: ForgotPasswordSchema,
    db: Annotated[AsyncSession, Depends(get_db)],
    _: Annotated[bool, Depends(require_application_id)],
):
    """Hand the request to ``AuthService.forgot_password``.

    The response message is the same fixed text on every successful return;
    it does not depend on anything the service call produces. ``_`` only
    declares the ``require_application_id`` dependency; its value is not
    used in the body.

    Returns the ``_build`` envelope around that message, with the result of
    ``get_temp_db_warning()`` passed as the warning.
    """
    await AuthService.forgot_password(db, schema)
    notice = get_temp_db_warning()
    confirmation = {
        "message": "If that email exists, a password reset link has been sent."
    }
    return _build(confirmation, notice)
async def reset_password(
    schema: ResetPasswordSchema,
    db: Annotated[AsyncSession, Depends(get_db)],
    _: Annotated[bool, Depends(require_application_id)],
):
    """Hand the reset payload to ``AuthService.reset_password``.

    ``_`` only declares the ``require_application_id`` dependency; its value
    is not used in the body.

    Returns the ``_build`` envelope around a fixed confirmation message, with
    the result of ``get_temp_db_warning()`` passed as the warning.
    """
    await AuthService.reset_password(db, schema)
    notice = get_temp_db_warning()
    confirmation = {"message": "Password reset successfully"}
    return _build(confirmation, notice)
async def change_password(
    schema: ChangePasswordSchema,
    current_user: Annotated[User, Depends(get_current_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: Annotated[bool, Depends(require_application_id)],
):
    """Call ``AuthService.change_password`` for the current user.

    ``current_user`` is supplied by the ``get_current_user`` dependency.
    ``_`` only declares the ``require_application_id`` dependency; its value
    is not used in the body.

    Returns the ``_build`` envelope around a fixed confirmation message, with
    the result of ``get_temp_db_warning()`` passed as the warning.
    """
    await AuthService.change_password(db, current_user, schema)
    notice = get_temp_db_warning()
    confirmation = {"message": "Password changed successfully"}
    return _build(confirmation, notice)
async def get_me(
    current_user: Annotated[User, Depends(get_current_user)],
    _: Annotated[bool, Depends(require_application_id)],
):
    """Return the profile of the user resolved by ``get_current_user``.

    No database session is requested by this handler. ``_`` only declares
    the ``require_application_id`` dependency; its value is not used.

    Returns the ``_build`` envelope around the dumped profile, with the
    result of ``get_temp_db_warning()`` passed as the warning.
    """
    notice = get_temp_db_warning()
    profile = AuthService.user_to_profile(current_user)
    return _build(profile.model_dump(), notice)
async def update_me(
    schema: UpdateProfileSchema,
    current_user: Annotated[User, Depends(get_current_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: Annotated[bool, Depends(require_application_id)],
):
    """Apply *schema* via ``AuthService.update_profile`` and return the profile.

    The profile in the response is built from the user object the service
    returns, not from the ``current_user`` argument. ``_`` only declares the
    ``require_application_id`` dependency; its value is not used.

    Returns the ``_build`` envelope around the dumped profile, with the
    result of ``get_temp_db_warning()`` passed as the warning.
    """
    updated_user = await AuthService.update_profile(db, current_user, schema)
    notice = get_temp_db_warning()
    profile = AuthService.user_to_profile(updated_user)
    return _build(profile.model_dump(), notice)
async def delete_me(
    current_user: Annotated[User, Depends(get_current_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: Annotated[bool, Depends(require_application_id)],
):
    """Call ``AuthService.soft_delete`` for the current user.

    ``current_user`` is supplied by the ``get_current_user`` dependency.
    ``_`` only declares the ``require_application_id`` dependency; its value
    is not used in the body.

    Returns the ``_build`` envelope around a fixed confirmation message, with
    the result of ``get_temp_db_warning()`` passed as the warning.
    """
    await AuthService.soft_delete(db, current_user)
    notice = get_temp_db_warning()
    confirmation = {"message": "Account deleted successfully"}
    return _build(confirmation, notice)
async def get_user_schema(
    _: Annotated[bool, Depends(require_application_id)],
):
    """Return a hard-coded description of the ``users`` table columns.

    The column list is static data defined in this function; nothing is read
    from the database here. ``_`` only declares the
    ``require_application_id`` dependency; its value is not used.

    Returns the ``_build`` envelope around a dumped ``UserSchemaResponse``
    with ``table_name="users"``, with the result of
    ``get_temp_db_warning()`` passed as the warning.
    """
    # One row per column: (field, type, required, description, constraints).
    field_specs = (
        ("id", "string (UUID)", True, "Unique user identifier", "Auto-generated"),
        ("email", "string", True, "User email address", "Unique, max 255 chars"),
        ("username", "string", False, "Unique username", "Unique, 2-50 chars, alphanumeric + underscore"),
        ("full_name", "string", False, "Display name", "Max 100 chars"),
        ("password", "string", True, "User password (write-only)", "Min 8 chars, uppercase, lowercase, digit"),
        ("password_hash", "string", True, "Argon2id password hash (internal)", "Auto-generated"),
        ("is_active", "boolean", False, "Whether the user account is active", "Default: true"),
        ("is_verified", "boolean", False, "Whether the email is verified", "Default: false"),
        ("failed_login_attempts", "integer", False, "Consecutive failed login count", "Default: 0"),
        ("locked_until", "datetime (ISO 8601)", False, "Account lock expiry", "Nullable"),
        ("last_login", "datetime (ISO 8601)", False, "Last successful login timestamp", "Nullable"),
        ("password_changed_at", "datetime (ISO 8601)", False, "Last password change", "Nullable"),
        ("created_at", "datetime (ISO 8601)", False, "Account creation timestamp", "Auto-set"),
        ("updated_at", "datetime (ISO 8601)", False, "Last update timestamp", "Auto-updated"),
        ("deleted_at", "datetime (ISO 8601)", False, "Soft delete timestamp", "Nullable"),
        ("roles", "array[string]", False, "Assigned role names", "Via user_roles association table"),
    )
    columns = [
        UserSchemaField(
            field=name,
            type=type_label,
            required=is_required,
            description=summary,
            constraints=rules,
        )
        for name, type_label, is_required, summary, rules in field_specs
    ]

    notice = get_temp_db_warning()
    table = UserSchemaResponse(table_name="users", columns=columns)
    return _build(table.model_dump(), notice)
async def list_sessions(
    current_user: Annotated[User, Depends(get_current_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: Annotated[bool, Depends(require_application_id)],
):
    """Return the sessions ``AuthService.list_sessions`` yields for the user.

    Each item from the service is validated into ``SessionOut`` and dumped
    to a plain structure, keeping the service's ordering. ``_`` only declares
    the ``require_application_id`` dependency; its value is not used.

    Returns the ``_build`` envelope around that list, with the result of
    ``get_temp_db_warning()`` passed as the warning.
    """
    records = await AuthService.list_sessions(db, current_user)
    notice = get_temp_db_warning()
    serialized = [
        SessionOut.model_validate(record).model_dump()
        for record in records
    ]
    return _build(serialized, notice)
async def revoke_session(
    session_id: str,
    current_user: Annotated[User, Depends(get_current_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: Annotated[bool, Depends(require_application_id)],
):
    """Call ``AuthService.revoke_session`` for *session_id* and the current user.

    ``session_id`` is passed to the service as received, alongside the user
    supplied by ``get_current_user``. ``_`` only declares the
    ``require_application_id`` dependency; its value is not used.

    Returns the ``_build`` envelope around a fixed confirmation message, with
    the result of ``get_temp_db_warning()`` passed as the warning.
    """
    await AuthService.revoke_session(db, current_user, session_id)
    notice = get_temp_db_warning()
    confirmation = {"message": "Session revoked successfully"}
    return _build(confirmation, notice)