kamau1's picture
fix(auth): properly update user password and use valid audit enum values
d9b6bad
"""
Authentication Endpoints - Supabase Auth Integration
"""
from fastapi import APIRouter, Depends, HTTPException, status, Request, Response
from sqlalchemy.orm import Session
from app.api.deps import get_db, get_current_active_user
from app.core.rate_limit import limiter
from app.schemas.auth import (
LoginRequest, TokenResponse, PasswordChange,
RefreshTokenRequest, ForgotPasswordRequest, ResetPasswordRequest, MessageResponse
)
from app.schemas.user import (
UserCreate, UserResponse, UserUpdate, UserProfile,
AdminOTPRequest, AdminRegistrationRequest
)
from app.schemas.user_preferences import (
UserPreferencesUpdate, UserPreferencesResponse,
DEFAULT_DASHBOARD_WIDGETS
)
from app.config.apps import (
get_available_apps_for_role,
get_available_app_codes_for_role,
get_default_favorites_for_role,
get_meta_apps_for_role,
validate_apps_for_role,
get_app_by_code,
APPS
)
from app.models.user_preference import UserPreference
from app.models.user import User
from app.core.supabase_auth import supabase_auth
from app.core.supabase_client import supabase_admin
from app.services.audit_service import AuditService
from app.services.password_reset_service import PasswordResetService
import logging
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/auth", tags=["Authentication"])
@router.post("/send-admin-otp", response_model=MessageResponse, status_code=status.HTTP_200_OK)
@limiter.limit("3/hour") # 3 OTP requests per hour per IP
async def send_admin_registration_otp(
request: Request,
response: Response,
otp_request: AdminOTPRequest,
db: Session = Depends(get_db)
):
"""
🔒 Step 1: Send OTP for Platform Admin Registration
**USER-FRIENDLY FLOW:**
User provides basic info (name, email, phone) → Admin receives OTP with details
**PROCESS:**
1. User submits: name, email, phone (NO PASSWORD YET)
2. System sends OTP to configured admin email
3. OTP email includes: registrant's name, email, phone
4. Admin verifies identity offline and shares OTP with user
5. User proceeds to /auth/register with OTP + password
**SECURITY MODEL:**
- Self-registration DISABLED for regular users (must use invitation flow)
- Platform admin creation requires OTP verification
- Admin email configured via PLATFORM_ADMIN_EMAIL environment variable
- Password is NOT sent or stored at this stage
**Request Body:**
```json
{
"email": "admin@example.com",
"first_name": "John",
"last_name": "Doe",
"phone": "+1234567890"
}
```
**Note:** Regular users (field agents, managers, etc.) must be invited by existing admins.
"""
from app.config import settings
from app.services.otp_service import OTPService
PLATFORM_ADMIN_EMAIL = settings.PLATFORM_ADMIN_EMAIL
try:
# Check if user already exists
existing_user = db.query(User).filter(User.email == otp_request.email).first()
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
# Get OTP service
otp_service = OTPService.get_instance()
# Store basic registration data temporarily (30 minutes TTL)
# NOTE: Password is NOT stored here - will be provided in step 2
full_name = f"{otp_request.first_name} {otp_request.last_name}"
# Capture request metadata for security audit
from datetime import datetime
ip_address = request.client.host if request.client else "Unknown"
timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC")
await otp_service.store_registration_data(
identifier=otp_request.email,
data={
"email": otp_request.email,
"name": full_name,
"first_name": otp_request.first_name,
"last_name": otp_request.last_name,
"phone": otp_request.phone,
"role": "platform_admin",
"registration_type": "platform_admin_otp",
"ip_address": ip_address,
"timestamp": timestamp
},
ttl=1800 # 30 minutes
)
# Generate and send OTP to admin email with registration context
otp_result = await otp_service.send_otp(
identifier=otp_request.email, # Storage key
channel="email",
recipient=PLATFORM_ADMIN_EMAIL, # Send to configured admin email
purpose="Platform Admin Registration",
db=db,
additional_metadata={
"admin_registration": True,
"registrant_name": full_name,
"registrant_email": otp_request.email,
"registrant_phone": otp_request.phone or "Not provided",
"ip_address": ip_address,
"timestamp": timestamp
}
)
# Audit log
AuditService.log_auth_event(
db=db,
action='register',
user_email=otp_request.email,
success=False, # Not completed yet
request=request,
reason=f"OTP sent to {PLATFORM_ADMIN_EMAIL}"
)
logger.info(f"Platform admin registration OTP sent for: {otp_request.email}")
return {
"message": f"✅ Registration request received! An OTP code has been sent to {PLATFORM_ADMIN_EMAIL} with your details (name, email, phone). "
f"Once the admin verifies your identity, they will share the OTP code with you. Then use /auth/register with the OTP and your password to complete registration."
}
except HTTPException:
raise
except Exception as e:
logger.error(f"OTP send error: {str(e)}")
AuditService.log_auth_event(
db=db,
action='register',
user_email=otp_request.email,
success=False,
request=request,
reason=f"OTP send failed: {str(e)}"
)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Failed to send OTP: {str(e)}"
)
@router.post("/register", response_model=TokenResponse, status_code=status.HTTP_201_CREATED)
@limiter.limit("5/hour") # 5 registration attempts per hour per IP
async def register(
request: Request,
response: Response,
registration_data: AdminRegistrationRequest,
db: Session = Depends(get_db)
):
"""
🔒 Step 2: Complete Platform Admin Registration with OTP and Password
**USER-FRIENDLY FLOW:**
After receiving OTP from admin, complete registration with password.
**PROCESS:**
1. User previously called /auth/send-admin-otp with (name, email, phone)
2. Admin received OTP email with user details
3. Admin verified identity and shared OTP with user
4. User now submits: name, email, phone, password, OTP code
5. System verifies OTP and creates account
6. User is logged in immediately
**REQUEST BODY:**
```json
{
"email": "admin@example.com",
"first_name": "John",
"last_name": "Doe",
"phone": "+1234567890",
"password": "SecurePass123!",
"otp_code": "123456"
}
```
**PASSWORD SECURITY:**
- Send password as plain text over HTTPS (standard practice)
- Backend hashes password using Supabase Auth (bcrypt)
- Password must be 8+ chars with uppercase and digit
**Returns:** Access token for immediate login
"""
from app.services.otp_service import OTPService
try:
# Get OTP service
otp_service = OTPService.get_instance()
# Verify OTP
verification_result = otp_service.verify_otp(
email=registration_data.email,
phone=None,
code=registration_data.otp_code,
purpose="Platform Admin Registration",
db=db
)
if not verification_result['verified']:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"❌ Invalid or expired OTP. {verification_result.get('attempts_remaining', 0)} attempts remaining."
)
# Retrieve stored registration data from step 1
stored_data = await otp_service.get_registration_data(registration_data.email)
if not stored_data:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="❌ Registration data not found or expired. Please start the registration process again via /auth/send-admin-otp."
)
# Verify registration type
if stored_data.get('registration_type') != 'platform_admin_otp':
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid registration type"
)
# Verify that submitted data matches stored data from step 1
stored_email = stored_data.get('email')
stored_name = stored_data.get('name')
submitted_name = f"{registration_data.first_name} {registration_data.last_name}"
if stored_email != registration_data.email or stored_name != submitted_name:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="❌ Registration data mismatch. Please ensure you're using the same details from step 1."
)
# Check if user already exists (safety check)
existing_user = db.query(User).filter(User.email == registration_data.email).first()
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
# Create Supabase auth user with password from current request
full_name = f"{registration_data.first_name} {registration_data.last_name}"
auth_response = await supabase_auth.sign_up(
email=registration_data.email,
password=registration_data.password, # Password provided in this step
user_metadata={
"first_name": registration_data.first_name,
"last_name": registration_data.last_name,
"phone": registration_data.phone,
"full_name": full_name
}
)
auth_user = auth_response["user"]
session = auth_response["session"]
# Create user record in our database
new_user = User(
id=auth_user.id,
email=registration_data.email,
name=full_name,
phone=registration_data.phone,
is_active=True,
role="platform_admin",
status="active" # Active after OTP verification
)
db.add(new_user)
db.commit()
db.refresh(new_user)
# Clean up stored registration data
await otp_service.delete_registration_data(registration_data.email)
# Audit log
AuditService.log_action(
db=db,
action='create',
entity_type='user',
entity_id=str(new_user.id),
description=f"Platform admin account created (OTP verified): {registration_data.email}",
user=new_user,
request=request,
additional_metadata={
'role': new_user.role,
'verification_method': 'otp_email'
}
)
logger.info(f"✅ Platform admin account created successfully: {registration_data.email}")
return {
"access_token": session.access_token,
"token_type": "bearer",
"user": {
"id": str(new_user.id),
"email": new_user.email,
"name": new_user.name,
"full_name": new_user.full_name,
"is_active": new_user.is_active,
"role": new_user.role
}
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Registration error: {str(e)}")
db.rollback()
AuditService.log_auth_event(
db=db,
action='register',
user_email=registration_data.email,
success=False,
request=request,
reason=f"Registration failed: {str(e)}"
)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to complete registration: {str(e)}"
)
@router.post("/login", response_model=TokenResponse)
@limiter.limit("10/minute") # 10 login attempts per minute per IP
async def login(request: Request, response: Response, credentials: LoginRequest, db: Session = Depends(get_db)):
"""
Login with email and password via Supabase Auth
- **email**: User's email address
- **password**: User's password
Returns access token for authenticated requests
"""
try:
# Authenticate with Supabase Auth
auth_response = await supabase_auth.sign_in(
email=credentials.email,
password=credentials.password
)
auth_user = auth_response["user"]
access_token = auth_response["access_token"]
# PERFORMANCE: Query user with joinedload to get all data in ONE query
# Use .options() instead of separate queries
from sqlalchemy.orm import joinedload
user = db.query(User).filter(
User.id == auth_user.id,
User.deleted_at == None
).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User profile not found"
)
# Check if user is active
if not user.is_active:
# Audit failed login
AuditService.log_auth_event(
db=db,
action='login',
user_email=credentials.email,
success=False,
request=request,
reason='Account inactive'
)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Account is inactive. Please contact support."
)
# Audit successful login
AuditService.log_auth_event(
db=db,
action='login',
user_email=credentials.email,
success=True,
request=request,
user=user # Pass authenticated user for proper audit trail
)
logger.info(f"User logged in successfully: {credentials.email}")
# Get refresh token from auth response session object
session = auth_response.get("session")
refresh_token = session.refresh_token if session else None
expires_in = session.expires_in if session else 3600
return {
"access_token": access_token,
"refresh_token": refresh_token,
"expires_in": expires_in,
"token_type": "bearer",
"user": {
"id": str(user.id),
"email": user.email,
"first_name": user.first_name,
"last_name": user.last_name,
"full_name": user.full_name,
"role": user.role,
"is_active": user.is_active
}
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Login error: {str(e)}")
# Audit failed login
AuditService.log_auth_event(
db=db,
action='login',
user_email=credentials.email,
success=False,
request=request,
reason='Invalid credentials'
)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password"
)
@router.post("/login-full", response_model=dict)
@limiter.limit("10/minute")
async def login_full(
request: Request,
response: Response,
credentials: LoginRequest,
db: Session = Depends(get_db)
):
"""
🚀 OPTIMIZED LOGIN - Returns everything in ONE request
This endpoint combines:
- /auth/login
- /auth/me
- /auth/me/preferences
- /auth/me/preferences/available-apps
**Reduces 4 API calls to 1 = 75% faster login**
Use this instead of separate endpoints to reduce network latency.
Especially important for high-latency connections (cross-region, mobile, etc.)
"""
try:
# Step 1: Authenticate with Supabase
auth_response = await supabase_auth.sign_in(
email=credentials.email,
password=credentials.password
)
auth_user = auth_response["user"]
access_token = auth_response["access_token"]
session = auth_response.get("session")
# Step 2: Get user from database
user = db.query(User).filter(
User.id == auth_user.id,
User.deleted_at == None
).first()
if not user or not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Account is inactive or not found"
)
# Step 3: Get preferences
user_prefs = db.query(UserPreference).filter(
UserPreference.user_id == user.id,
UserPreference.deleted_at.is_(None)
).first()
preferences = {
"last_active_project_id": str(user_prefs.last_active_project_id) if user_prefs and user_prefs.last_active_project_id else None,
"dashboard_widgets": user_prefs.dashboard_widgets if user_prefs else DEFAULT_DASHBOARD_WIDGETS,
"favorite_apps": user_prefs.favorite_apps if user_prefs else get_default_favorites_for_role(user.role)
}
# Step 4: Get available apps
available_apps = get_available_apps_for_role(user.role)
# Step 4b: Get meta apps (apps available outside project context)
meta_apps = get_meta_apps_for_role(user.role)
# Step 5: Audit log (async, doesn't block response)
AuditService.log_auth_event(
db=db,
action='login',
user_email=credentials.email,
success=True,
request=request,
user=user
)
logger.info(f"User logged in successfully (full): {credentials.email}")
# Return everything in one response
return {
"access_token": access_token,
"refresh_token": session.refresh_token if session else None,
"expires_in": session.expires_in if session else 3600,
"token_type": "bearer",
"user": {
"id": str(user.id),
"email": user.email,
"first_name": user.first_name,
"last_name": user.last_name,
"full_name": user.full_name,
"role": user.role,
"is_active": user.is_active,
"client_id": str(user.client_id) if user.client_id else None,
"contractor_id": str(user.contractor_id) if user.contractor_id else None,
},
"preferences": preferences,
"available_apps": available_apps,
"meta_apps": meta_apps
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Login error: {str(e)}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password"
)
@router.post("/refresh-token", response_model=TokenResponse)
async def refresh_access_token(
request_data: RefreshTokenRequest,
db: Session = Depends(get_db)
):
"""
Refresh access token using refresh token
**Purpose:** Get a new access token without re-logging in
**When to use:**
- Access token expired (401 error)
- Before token expires (recommended: 5 min before)
- On app startup to check session validity
**Request Body:**
```json
{
"refresh_token": "your-refresh-token-here"
}
```
**Response:** New access_token and refresh_token (rotated for security)
**Supabase Behavior:**
- Refresh tokens are automatically rotated on each use
- Old refresh token becomes invalid after rotation
- Refresh tokens last 30 days by default (Supabase Free tier)
**Error Handling:**
- 401: Invalid or expired refresh token → redirect to login
"""
try:
# Refresh session with Supabase
auth_response = await supabase_auth.refresh_session(request_data.refresh_token)
if not auth_response or not auth_response.get("session"):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Refresh token expired. Please log in again."
)
session = auth_response["session"]
auth_user = auth_response["user"]
# Get user from database to verify they still exist and are active
user = db.query(User).filter(
User.id == auth_user.id,
User.deleted_at == None
).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User account no longer exists"
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Account is inactive. Contact support."
)
logger.info(f"✅ Token refreshed successfully for: {user.email}")
return {
"access_token": session.access_token,
"refresh_token": session.refresh_token, # New rotated token (Supabase rotates on each use)
"expires_in": session.expires_in,
"token_type": "bearer",
"user": {
"id": str(user.id),
"email": user.email,
"first_name": user.first_name,
"last_name": user.last_name,
"full_name": user.full_name,
"role": user.role,
"is_active": user.is_active
}
}
except HTTPException:
raise
except Exception as e:
error_msg = str(e).lower()
logger.error(f"❌ Token refresh error: {str(e)}")
# Parse Supabase error messages for better UX
if "expired" in error_msg or "invalid" in error_msg:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Refresh token expired. Please log in again."
)
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Token refresh failed: {str(e)}"
)
@router.get("/me", response_model=UserProfile)
async def get_current_user_profile(
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""
Get current user's profile with project context
Returns:
- User profile data
- primary_project: First assigned project (for single-project users)
- assigned_projects: All projects user is a member of
- last_active_project_id: User's last selected project (from user_preferences)
Requires authentication token in header:
Authorization: Bearer <token>
"""
from app.models.project import Project
from app.models.project_team import ProjectTeam
from app.models.user_preference import UserPreference
from app.schemas.user import ProjectContext
# Platform admins don't have project assignments
# For platform_admin role, return empty project context
if current_user.role == 'platform_admin':
# Get profile photo URL for platform admin
from app.models.user_document_link import UserDocumentLink
from app.models.document import Document
profile_photo_url = None
doc_link = db.query(UserDocumentLink).filter(
UserDocumentLink.user_id == current_user.id,
UserDocumentLink.document_link_type == 'profile_photo'
).first()
if doc_link:
document = db.query(Document).filter(
Document.id == doc_link.document_id,
Document.deleted_at.is_(None)
).first()
if document:
profile_photo_url = document.file_url
# Generate fresh signed URL for Supabase files
if document.storage_provider == 'supabase' and document.file_url.startswith('supabase://'):
from app.integrations.supabase import SupabaseStorageService
bucket = document.additional_metadata.get('bucket')
path = document.additional_metadata.get('path')
if bucket and path:
profile_photo_url = SupabaseStorageService.get_signed_url(bucket, path, 3600)
profile_data = UserProfile(
id=current_user.id,
email=current_user.email,
name=current_user.name,
phone=current_user.phone,
phone_alternate=current_user.phone_alternate,
role=current_user.role,
status=current_user.status,
is_active=current_user.is_active,
client_id=current_user.client_id,
contractor_id=current_user.contractor_id,
created_at=current_user.created_at,
updated_at=current_user.updated_at,
primary_project=None,
assigned_projects=[],
last_active_project_id=None,
profile_photo_url=profile_photo_url
)
return profile_data
# Get user preferences
user_prefs = db.query(UserPreference).filter(
UserPreference.user_id == current_user.id,
UserPreference.deleted_at.is_(None)
).first()
last_active_project_id = user_prefs.last_active_project_id if user_prefs else None
# Get all projects user is assigned to via project_team
project_memberships = db.query(ProjectTeam).filter(
ProjectTeam.user_id == current_user.id,
ProjectTeam.deleted_at.is_(None),
ProjectTeam.removed_at.is_(None)
).all()
# Get project details
project_ids = [pm.project_id for pm in project_memberships]
projects = db.query(Project).filter(
Project.id.in_(project_ids),
Project.deleted_at.is_(None)
).all() if project_ids else []
# Build project context list
assigned_projects = [
ProjectContext(id=p.id, title=p.title)
for p in projects
]
# Determine primary project
primary_project = None
if assigned_projects:
# Priority: last_active_project_id > primary_manager_id > first project
if last_active_project_id:
primary_project = next(
(p for p in assigned_projects if p.id == last_active_project_id),
assigned_projects[0]
)
else:
# Check if user is primary manager of any project
managed_project = next(
(p for p in projects if p.primary_manager_id == current_user.id),
None
)
if managed_project:
primary_project = ProjectContext(id=managed_project.id, title=managed_project.title)
else:
primary_project = assigned_projects[0]
# Get profile photo URL
from app.models.user_document_link import UserDocumentLink
from app.models.document import Document
profile_photo_url = None
doc_link = db.query(UserDocumentLink).filter(
UserDocumentLink.user_id == current_user.id,
UserDocumentLink.document_link_type == 'profile_photo'
).first()
if doc_link:
document = db.query(Document).filter(
Document.id == doc_link.document_id,
Document.deleted_at.is_(None)
).first()
if document:
profile_photo_url = document.file_url
# Generate fresh signed URL for Supabase files
if document.storage_provider == 'supabase' and document.file_url.startswith('supabase://'):
from app.integrations.supabase import SupabaseStorageService
bucket = document.additional_metadata.get('bucket')
path = document.additional_metadata.get('path')
if bucket and path:
profile_photo_url = SupabaseStorageService.get_signed_url(bucket, path, 3600)
# Build response with Pydantic model
profile_data = UserProfile(
id=current_user.id,
email=current_user.email,
name=current_user.name,
phone=current_user.phone,
phone_alternate=current_user.phone_alternate,
role=current_user.role,
status=current_user.status,
is_active=current_user.is_active,
client_id=current_user.client_id,
contractor_id=current_user.contractor_id,
created_at=current_user.created_at,
updated_at=current_user.updated_at,
primary_project=primary_project,
assigned_projects=assigned_projects,
last_active_project_id=last_active_project_id,
profile_photo_url=profile_photo_url
)
return profile_data
@router.put("/me", response_model=UserProfile)
async def update_profile(
profile_data: UserUpdate,
request: Request,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""
Update current user's profile
- **first_name**: Update first name
- **last_name**: Update last name
- **phone**: Update phone number
Requires authentication token
"""
# Track changes for audit
changes = {'old': {}, 'new': {}}
# Update name if first_name or last_name provided
if profile_data.first_name is not None or profile_data.last_name is not None:
# Get current name parts
current_first = current_user.first_name
current_last = current_user.last_name
# Use new values or keep current
new_first = profile_data.first_name if profile_data.first_name is not None else current_first
new_last = profile_data.last_name if profile_data.last_name is not None else current_last
# Update full name
old_name = current_user.name
current_user.name = f"{new_first} {new_last}".strip()
changes['old']['name'] = old_name
changes['new']['name'] = current_user.name
if profile_data.phone is not None:
# Check if phone is already taken by another user
existing_phone = db.query(User).filter(
User.phone == profile_data.phone,
User.id != current_user.id
).first()
if existing_phone:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Phone number already in use"
)
changes['old']['phone'] = current_user.phone
current_user.phone = profile_data.phone
changes['new']['phone'] = profile_data.phone
db.commit()
db.refresh(current_user)
# Audit profile update
if changes['old']: # Only log if there were actual changes
AuditService.log_action(
db=db,
action='update',
entity_type='user',
entity_id=str(current_user.id),
description=f"User updated profile: {current_user.email}",
user=current_user,
request=request,
changes=changes
)
return current_user
@router.post("/change-password", response_model=MessageResponse, status_code=status.HTTP_200_OK)
@limiter.limit("5/hour") # 5 password changes per hour per user
async def change_password(
request: Request,
response: Response,
password_data: PasswordChange,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""
Change user's password via Supabase Auth
- **current_password**: Current password for verification
- **new_password**: New password (min 8 chars, 1 digit, 1 uppercase)
Requires authentication token
Note: This endpoint requires re-authentication with current password
"""
try:
# Verify current password by attempting to sign in
await supabase_auth.sign_in(
email=current_user.email,
password=password_data.current_password
)
# Password is correct, now update to new password using Supabase Admin API
supabase_admin.auth.admin.update_user_by_id(
str(current_user.id),
{"password": password_data.new_password}
)
# Audit password change
AuditService.log_action(
db=db,
action='update', # Using 'update' as password_change is not in AuditAction enum
entity_type='user',
entity_id=str(current_user.id),
description=f"User changed password: {current_user.email}",
user=current_user,
request=request
)
logger.info(f"Password changed for user: {current_user.email}")
return {"message": "Password changed successfully. Please login again with your new password."}
except Exception as e:
logger.error(f"Password change error: {str(e)}")
# Audit failed password change
AuditService.log_action(
db=db,
action='update', # Using 'update' as password_change_failed is not in AuditAction enum
entity_type='user',
entity_id=str(current_user.id),
description=f"Failed password change attempt: {current_user.email}",
user=current_user,
request=request,
additional_metadata={'reason': str(e), 'status': 'failed'}
)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Current password is incorrect or password change failed"
)
@router.post("/forgot-password", response_model=MessageResponse, status_code=status.HTTP_200_OK)
@limiter.limit("3/hour") # 3 password reset requests per hour per IP
async def forgot_password(
request: Request,
response: Response,
request_data: ForgotPasswordRequest,
db: Session = Depends(get_db)
):
"""
Request password reset email
- **email**: User's email address
Sends password reset link to email if account exists.
Always returns success to prevent email enumeration attacks.
"""
password_reset_service = PasswordResetService()
result = await password_reset_service.request_password_reset(
email=request_data.email,
db=db
)
# Audit password reset request
AuditService.log_action(
db=db,
action='create', # Valid AuditAction enum value (creating a reset request)
entity_type='auth',
description=f"Password reset requested for: {request_data.email}",
request=request,
additional_metadata={'email': request_data.email}
)
return MessageResponse(message=result['message'])
@router.post("/reset-password", response_model=MessageResponse, status_code=status.HTTP_200_OK)
@limiter.limit("5/hour") # 5 password reset attempts per hour per IP
async def reset_password(
request: Request,
response: Response,
reset_data: ResetPasswordRequest,
db: Session = Depends(get_db)
):
"""
Reset password using token from email
- **token**: Password reset token from email
- **new_password**: New password (min 8 chars, 1 digit, 1 uppercase)
Resets password if token is valid and not expired.
"""
password_reset_service = PasswordResetService()
try:
result = await password_reset_service.reset_password(
token=reset_data.token,
new_password=reset_data.new_password,
db=db
)
# Audit successful password reset
AuditService.log_action(
db=db,
action='update', # Valid AuditAction enum value (updating password)
entity_type='auth',
description=f"Password reset completed using token",
request=request,
additional_metadata={'token_prefix': reset_data.token[:8]}
)
return MessageResponse(message=result['message'])
except HTTPException as e:
# Audit failed password reset
AuditService.log_action(
db=db,
action='update', # Valid AuditAction enum value (failed update attempt)
entity_type='auth',
description=f"Password reset failed: {e.detail}",
request=request,
additional_metadata={
'token_prefix': reset_data.token[:8],
'reason': e.detail
}
)
raise
@router.post("/logout", response_model=MessageResponse, status_code=status.HTTP_200_OK)
async def logout(
request: Request,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""
Logout current user
Requires authentication token.
Note: With JWT tokens, actual logout is handled client-side by removing the token.
This endpoint is for audit logging purposes.
"""
# Audit logout
AuditService.log_auth_event(
db=db,
action='logout',
user_email=current_user.email,
success=True,
request=request
)
logger.info(f"User logged out: {current_user.email}")
return MessageResponse(message="Logged out successfully")
# ============================================
# USER PREFERENCES ENDPOINTS
# ============================================
@router.get("/me/preferences", response_model=UserPreferencesResponse)
async def get_my_preferences(
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""
Get current user's preferences from user_preferences table
Returns user preferences with role-based defaults if preferences don't exist yet.
Automatically creates preferences record if it doesn't exist (via database trigger).
"""
# Get or create preferences
preferences = db.query(UserPreference).filter(
UserPreference.user_id == current_user.id,
UserPreference.deleted_at == None
).first()
# If no preferences exist, create with role-based defaults
if not preferences:
default_favorites = get_default_favorites_for_role(current_user.role)
preferences = UserPreference(
user_id=current_user.id,
favorite_apps=default_favorites,
dashboard_widgets=DEFAULT_DASHBOARD_WIDGETS.get(current_user.role, ['recent_tickets', 'team_performance', 'sla_metrics']),
theme='light',
language='en'
)
db.add(preferences)
db.commit()
db.refresh(preferences)
logger.info(f"✅ Created default preferences for user: {current_user.email} (role: {current_user.role})")
return UserPreferencesResponse.from_orm(preferences)
@router.put("/me/preferences", response_model=UserPreferencesResponse)
async def update_my_preferences(
preferences_data: UserPreferencesUpdate,
request: Request,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""
Update current user's preferences in user_preferences table
Updates user preferences including favorite apps, theme, language, and notification settings.
Favorite apps are validated against role-specific available apps (max 6).
"""
# Get or create preferences
preferences = db.query(UserPreference).filter(
UserPreference.user_id == current_user.id,
UserPreference.deleted_at == None
).first()
if not preferences:
# Create new preferences record with role-based defaults
default_favorites = get_default_favorites_for_role(current_user.role)
preferences = UserPreference(
user_id=current_user.id,
favorite_apps=default_favorites,
dashboard_widgets=DEFAULT_DASHBOARD_WIDGETS.get(current_user.role, [])
)
db.add(preferences)
db.flush()
# Track changes for audit
changes = {'old': {}, 'new': {}}
# Update only provided fields
update_data = preferences_data.dict(exclude_unset=True)
# Validate favorite apps if provided
if 'favorite_apps' in update_data and update_data['favorite_apps'] is not None:
# Check max limit (6 apps)
if len(update_data['favorite_apps']) > 6:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Maximum 6 favorite apps allowed"
)
# Validate against role-specific available apps
is_valid, invalid_apps = validate_apps_for_role(update_data['favorite_apps'], current_user.role)
if not is_valid:
available_apps = get_available_app_codes_for_role(current_user.role)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid apps for {current_user.role}: {', '.join(invalid_apps)}. "
f"Available apps: {', '.join(available_apps)}"
)
changes['old']['favorite_apps'] = preferences.favorite_apps
changes['new']['favorite_apps'] = update_data['favorite_apps']
preferences.favorite_apps = update_data['favorite_apps']
# Validate last_active_project_id if provided
if 'last_active_project_id' in update_data:
from app.models.project import Project
from app.models.project_team import ProjectTeam
new_project_id = update_data['last_active_project_id']
# Platform admins don't have project context
if current_user.role == 'platform_admin' and new_project_id is not None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Platform admins cannot set active project (no project assignments)"
)
if new_project_id is not None:
# Verify user is actually assigned to this project
is_member = db.query(ProjectTeam).filter(
ProjectTeam.user_id == current_user.id,
ProjectTeam.project_id == new_project_id,
ProjectTeam.deleted_at.is_(None),
ProjectTeam.removed_at.is_(None)
).first()
if not is_member:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You are not assigned to this project"
)
# Verify project exists and is active
project = db.query(Project).filter(
Project.id == new_project_id,
Project.deleted_at.is_(None)
).first()
if not project:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Project not found"
)
changes['old']['last_active_project_id'] = str(preferences.last_active_project_id) if preferences.last_active_project_id else None
changes['new']['last_active_project_id'] = str(new_project_id) if new_project_id else None
preferences.last_active_project_id = new_project_id
# Update other fields
for field, value in update_data.items():
if field not in ['favorite_apps', 'last_active_project_id'] and value is not None:
old_value = getattr(preferences, field, None)
if old_value != value:
changes['old'][field] = old_value
changes['new'][field] = value
setattr(preferences, field, value)
db.commit()
db.refresh(preferences)
# Audit log
if changes['old']:
AuditService.log_action(
db=db,
action='update',
entity_type='user_preferences',
entity_id=str(preferences.id),
description=f"User updated preferences: {current_user.email}",
user=current_user,
request=request,
changes=changes
)
logger.info(f"Preferences updated for user: {current_user.email}")
return UserPreferencesResponse.from_orm(preferences)
@router.get("/me/preferences/available-apps", response_model=dict)
async def get_available_apps_for_user(
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""
Get list of apps available for user to favorite based on their role
**Returns:**
- Current favorite app codes
- All available apps with full metadata (name, icon, route, etc.)
- Default favorites for the role
- Maximum favorites allowed (6)
**Use Cases:**
- Populate app picker in settings UI
- Show which apps can be added/removed from favorites
- Display role-appropriate app options with icons and descriptions
"""
# Get current preferences
preferences = db.query(UserPreference).filter(
UserPreference.user_id == current_user.id,
UserPreference.deleted_at == None
).first()
# Get current favorites or defaults
current_favorites = preferences.favorite_apps if preferences else get_default_favorites_for_role(current_user.role)
# Get available apps with full metadata
available_app_objects = get_available_apps_for_role(current_user.role)
available_apps_detail = [app.to_dict() for app in available_app_objects]
# Get default favorites
default_favorites = get_default_favorites_for_role(current_user.role)
# Get meta apps (apps available outside project context)
meta_apps = get_meta_apps_for_role(current_user.role)
return {
"role": current_user.role,
"current_favorites": current_favorites,
"available_apps": available_apps_detail, # Full app metadata
"default_favorites": default_favorites,
"meta_apps": meta_apps, # Apps available outside project context
"max_favorites": 6
}
@router.get("/apps", response_model=dict)
async def get_all_apps_for_user(
current_user: User = Depends(get_current_active_user)
):
"""
Get ALL apps in the system with role-based access information
**Returns:**
- All apps with full metadata (name, description, icon, route, category)
- User's role and which apps they can access
- Categorized app groupings (Core, Operations, Sales, Team, Finance, Settings)
**Use Cases:**
- Render main navigation menu with all accessible apps
- Show app launcher/drawer with categories
- Display app directory or marketplace
- Generate sitemap for user's accessible routes
**Frontend Integration:**
- Filter apps by `has_access: true` to show only accessible apps
- Group apps by `category` for organized navigation
- Use `icon` and `route` to render navigation items
- Show disabled state for apps where `has_access: false`
"""
from app.config.apps import get_all_apps, AppCategory
# Get all apps in the system
all_apps = get_all_apps()
# Get apps user has access to
accessible_app_codes = set(get_available_app_codes_for_role(current_user.role))
# Build response with access information
apps_with_access = []
for app in all_apps:
app_dict = app.to_dict()
app_dict['has_access'] = app.code in accessible_app_codes
apps_with_access.append(app_dict)
# Group by category
apps_by_category = {}
for category in AppCategory:
apps_by_category[category.value] = [
app for app in apps_with_access
if app['category'] == category.value
]
return {
"user_role": current_user.role,
"total_apps": len(all_apps),
"accessible_apps": len(accessible_app_codes),
"apps": apps_with_access,
"apps_by_category": apps_by_category,
"categories": [cat.value for cat in AppCategory]
}