# swiftops-backend/src/app/services/password_reset_service.py
# Last commit: dacae32 (kamau1) - feat: manage user profiles with permissions
"""
Password Reset Service
"""
import os
import logging
from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, Any
from sqlalchemy.orm import Session
from fastapi import HTTPException, status
from app.models.user import User
from app.models.invitation import UserInvitation
from app.services.token_service import TokenService
from app.services.notification_service import NotificationService
from app.core.supabase_auth import supabase_auth
# Module-level logger, named after this module, used by PasswordResetService below.
logger = logging.getLogger(__name__)
class PasswordResetService:
    """Service for password reset functionality.

    Reset tokens are persisted as ``UserInvitation`` rows. A row is recognised
    as a password-reset token (rather than a regular invitation) by
    ``invitation_metadata['type'] == 'password_reset'``.
    """

    # Returned for every outcome of a reset *request* (unknown email, inactive
    # user, internal error, success) so the response never reveals whether an
    # account exists for the given email.
    _GENERIC_REQUEST_MESSAGE = (
        'If an account exists with this email, you will receive a password reset link.'
    )

    def __init__(self):
        """Create collaborating services and read the token lifetime.

        The lifetime (in hours) comes from the ``PASSWORD_RESET_TOKEN_EXPIRY_HOURS``
        environment variable and defaults to 1. A non-integer value raises
        ``ValueError`` here, i.e. at construction time.
        """
        self.token_service = TokenService()
        self.notification_service = NotificationService()
        self.reset_token_expiry_hours = int(os.getenv('PASSWORD_RESET_TOKEN_EXPIRY_HOURS', '1'))

    def _generic_request_response(self) -> Dict[str, Any]:
        """Build the uniform response for ``request_password_reset``.

        A fresh dict is returned on every call so callers may mutate it freely.
        """
        return {
            'success': True,
            'message': self._GENERIC_REQUEST_MESSAGE
        }

    @staticmethod
    def _safe_rollback(db: Session) -> None:
        """Roll back ``db`` without letting a rollback failure escape.

        Used from error handlers: after a failed flush/commit the session must
        be rolled back before it can be used again, but a secondary failure
        here must not mask the error that is already being handled.
        """
        try:
            db.rollback()
        except Exception as rollback_error:
            logger.error(f"Session rollback failed: {str(rollback_error)}")

    async def request_password_reset(
        self,
        email: str,
        db: Session
    ) -> Dict[str, Any]:
        """
        Generate password reset token and send email

        Args:
            email: User's email address
            db: Database session

        Returns:
            Dict with ``'success'`` and ``'message'`` keys

        Note: Always returns the same success response (unknown email,
        inactive user, internal error, or real success) to prevent email
        enumeration attacks. This method does not raise.
        """
        try:
            # Check if user exists (soft-deleted users are ignored)
            user = db.query(User).filter(
                User.email == email,
                User.deleted_at.is_(None)
            ).first()

            if not user:
                # Don't reveal that user doesn't exist (security)
                logger.warning(f"Password reset requested for non-existent email: {email}")
                return self._generic_request_response()

            # Check if user is active
            if not user.is_active:
                logger.warning(f"Password reset requested for inactive user: {email}")
                return self._generic_request_response()

            # Generate secure token
            token = self.token_service.generate_token()

            # Expiry is configurable via PASSWORD_RESET_TOKEN_EXPIRY_HOURS
            # (defaults to 1 hour). One timestamp is shared by invited_at and
            # the expiry calculation so the two are exactly consistent.
            now = datetime.now(timezone.utc)
            expires_at = now + timedelta(hours=self.reset_token_expiry_hours)

            # Store token in user_invitations table (reuse infrastructure).
            # invitation_metadata['type'] == 'password_reset' is what
            # reset_password() checks to tell these rows apart from invitations.
            reset_record = UserInvitation(
                email=email,
                phone=user.phone,
                invited_role=user.role,
                client_id=user.client_id,
                contractor_id=user.contractor_id,
                token=token,
                status='pending',
                invitation_method='email',
                invited_by_user_id=user.id,  # Self-initiated
                invited_at=now,
                expires_at=expires_at,
                invitation_metadata={'type': 'password_reset', 'user_id': str(user.id)}
            )
            db.add(reset_record)
            db.commit()

            # Send password reset email (failures are logged inside the helper)
            await self._send_reset_email(
                email=email,
                name=user.first_name or email.split('@')[0].title(),
                token=token
            )

            logger.info(f"Password reset requested for: {email}")
            return self._generic_request_response()

        except Exception as e:
            logger.error(f"Password reset request error: {str(e)}")
            # Leave the caller's session usable if add/commit failed part-way.
            self._safe_rollback(db)
            # Don't reveal errors to user (security)
            return self._generic_request_response()

    async def reset_password(
        self,
        token: str,
        new_password: str,
        db: Session
    ) -> Dict[str, Any]:
        """
        Reset password using token

        Args:
            token: Password reset token
            new_password: New password
            db: Database session

        Returns:
            Dict with ``'success'`` and ``'message'`` keys

        Raises:
            HTTPException: 400 if the token is unknown, not a password-reset
                token, or expired; 404 if the user no longer exists; 500 if
                updating the password or recording token use fails.
        """
        # Validate token: only rows still in 'pending' state are usable
        reset_record = db.query(UserInvitation).filter(
            UserInvitation.token == token,
            UserInvitation.status == 'pending'
        ).first()

        if not reset_record:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Invalid or expired reset token"
            )

        # Verify this is a password reset token BEFORE touching the row, so a
        # regular invitation token sent to this endpoint is never mutated.
        # The table is shared with invitations, so metadata may be absent.
        metadata = reset_record.invitation_metadata or {}
        if metadata.get('type') != 'password_reset':
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Invalid reset token"
            )

        # Check if expired. A naive timestamp is assumed to be UTC (rows are
        # written with UTC-aware values above) so the comparison cannot raise
        # TypeError; a missing expiry fails closed and is treated as expired.
        expires_at = reset_record.expires_at
        if expires_at is not None and expires_at.tzinfo is None:
            expires_at = expires_at.replace(tzinfo=timezone.utc)

        if expires_at is None or expires_at < datetime.now(timezone.utc):
            reset_record.status = 'expired'
            db.commit()
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Reset token has expired. Please request a new one."
            )

        # Get user (soft-deleted users are ignored)
        user = db.query(User).filter(
            User.email == reset_record.email,
            User.deleted_at.is_(None)
        ).first()

        if not user:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="User not found"
            )

        try:
            # Update password in Supabase Auth
            # Note: This requires admin privileges
            await supabase_auth.update_user_password(
                user_id=str(user.id),
                new_password=new_password
            )

            # Mark token as used so it cannot be replayed
            reset_record.status = 'accepted'
            reset_record.accepted_at = datetime.now(timezone.utc)
            db.commit()

            logger.info(f"Password reset successful for: {user.email}")
            return {
                'success': True,
                'message': 'Password reset successful. You can now login with your new password.'
            }

        except Exception as e:
            logger.error(f"Password reset error: {str(e)}")
            # Discard the pending status change and keep the session usable.
            self._safe_rollback(db)
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to reset password. Please try again."
            ) from e

    async def _send_reset_email(
        self,
        email: str,
        name: str,
        token: str
    ) -> None:
        """Send password reset email.

        Best-effort: a failed or raising send is logged and swallowed. An
        error from ``construct_url`` is not caught here and propagates to
        the caller.

        Args:
            email: Recipient address
            name: Display name passed to the email template
            token: Reset token embedded in the reset URL's query string
        """
        reset_url = self.notification_service.construct_url(f'reset-password?token={token}')

        try:
            # Send email through the notification service
            result = await self.notification_service.send_email(
                to_email=email,
                subject='Reset Your SwiftOps Password',
                template_name='password_reset',
                template_data={
                    'name': name,
                    'reset_url': reset_url,
                    'expiry_hours': self.reset_token_expiry_hours
                }
            )

            # .get() so a result without a 'success' key is reported as a
            # send failure rather than as a KeyError.
            if not result.get('success'):
                logger.error(f"Failed to send reset email: {result.get('error')}")

        except Exception as e:
            logger.error(f"Error sending reset email: {str(e)}")