Spaces:
Sleeping
Sleeping
| """ | |
| Password Reset Service | |
| """ | |
| import os | |
| import logging | |
| from datetime import datetime, timezone, timedelta | |
| from typing import Optional, Dict, Any | |
| from sqlalchemy.orm import Session | |
| from fastapi import HTTPException, status | |
| from app.models.user import User | |
| from app.models.invitation import UserInvitation | |
| from app.services.token_service import TokenService | |
| from app.services.notification_service import NotificationService | |
| from app.core.supabase_auth import supabase_auth | |
| logger = logging.getLogger(__name__) | |
| class PasswordResetService: | |
| """Service for password reset functionality""" | |
| def __init__(self): | |
| self.token_service = TokenService() | |
| self.notification_service = NotificationService() | |
| self.reset_token_expiry_hours = int(os.getenv('PASSWORD_RESET_TOKEN_EXPIRY_HOURS', '1')) | |
| async def request_password_reset( | |
| self, | |
| email: str, | |
| db: Session | |
| ) -> Dict[str, Any]: | |
| """ | |
| Generate password reset token and send email | |
| Args: | |
| email: User's email address | |
| db: Database session | |
| Returns: | |
| Dict with success status | |
| Note: Always returns success to prevent email enumeration attacks | |
| """ | |
| try: | |
| # Check if user exists | |
| user = db.query(User).filter( | |
| User.email == email, | |
| User.deleted_at == None | |
| ).first() | |
| if not user: | |
| # Don't reveal that user doesn't exist (security) | |
| logger.warning(f"Password reset requested for non-existent email: {email}") | |
| return { | |
| 'success': True, | |
| 'message': 'If an account exists with this email, you will receive a password reset link.' | |
| } | |
| # Check if user is active | |
| if not user.is_active: | |
| logger.warning(f"Password reset requested for inactive user: {email}") | |
| return { | |
| 'success': True, | |
| 'message': 'If an account exists with this email, you will receive a password reset link.' | |
| } | |
| # Generate secure token | |
| token = self.token_service.generate_token() | |
| # Calculate expiry (1 hour for security) | |
| expires_at = datetime.now(timezone.utc) + timedelta(hours=self.reset_token_expiry_hours) | |
| # Store token in user_invitations table (reuse infrastructure) | |
| # Use status='pending' and invitation_method='email' to distinguish from invitations | |
| reset_record = UserInvitation( | |
| email=email, | |
| phone=user.phone, | |
| invited_role=user.role, | |
| client_id=user.client_id, | |
| contractor_id=user.contractor_id, | |
| token=token, | |
| status='pending', | |
| invitation_method='email', | |
| invited_by_user_id=user.id, # Self-initiated | |
| invited_at=datetime.now(timezone.utc), | |
| expires_at=expires_at, | |
| invitation_metadata={'type': 'password_reset', 'user_id': str(user.id)} | |
| ) | |
| db.add(reset_record) | |
| db.commit() | |
| # Send password reset email | |
| await self._send_reset_email( | |
| email=email, | |
| name=user.first_name or email.split('@')[0].title(), | |
| token=token | |
| ) | |
| logger.info(f"Password reset requested for: {email}") | |
| return { | |
| 'success': True, | |
| 'message': 'If an account exists with this email, you will receive a password reset link.' | |
| } | |
| except Exception as e: | |
| logger.error(f"Password reset request error: {str(e)}") | |
| # Don't reveal errors to user (security) | |
| return { | |
| 'success': True, | |
| 'message': 'If an account exists with this email, you will receive a password reset link.' | |
| } | |
| async def reset_password( | |
| self, | |
| token: str, | |
| new_password: str, | |
| db: Session | |
| ) -> Dict[str, Any]: | |
| """ | |
| Reset password using token | |
| Args: | |
| token: Password reset token | |
| new_password: New password | |
| db: Database session | |
| Returns: | |
| Dict with success status | |
| Raises: | |
| HTTPException: If token invalid or reset fails | |
| """ | |
| # Validate token | |
| reset_record = db.query(UserInvitation).filter( | |
| UserInvitation.token == token, | |
| UserInvitation.status == 'pending' | |
| ).first() | |
| if not reset_record: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Invalid or expired reset token" | |
| ) | |
| # Check if expired | |
| if reset_record.expires_at < datetime.now(timezone.utc): | |
| reset_record.status = 'expired' | |
| db.commit() | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Reset token has expired. Please request a new one." | |
| ) | |
| # Verify this is a password reset token | |
| if reset_record.invitation_metadata.get('type') != 'password_reset': | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Invalid reset token" | |
| ) | |
| # Get user | |
| user = db.query(User).filter( | |
| User.email == reset_record.email, | |
| User.deleted_at == None | |
| ).first() | |
| if not user: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail="User not found" | |
| ) | |
| try: | |
| # Update password in Supabase Auth | |
| # Note: This requires admin privileges | |
| # Supabase provides password reset via their API | |
| await supabase_auth.update_user_password( | |
| user_id=str(user.id), | |
| new_password=new_password | |
| ) | |
| # Mark token as used | |
| reset_record.status = 'accepted' | |
| reset_record.accepted_at = datetime.now(timezone.utc) | |
| db.commit() | |
| logger.info(f"Password reset successful for: {user.email}") | |
| return { | |
| 'success': True, | |
| 'message': 'Password reset successful. You can now login with your new password.' | |
| } | |
| except Exception as e: | |
| logger.error(f"Password reset error: {str(e)}") | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="Failed to reset password. Please try again." | |
| ) | |
| async def _send_reset_email( | |
| self, | |
| email: str, | |
| name: str, | |
| token: str | |
| ) -> None: | |
| """Send password reset email""" | |
| reset_url = self.notification_service.construct_url(f'reset-password?token={token}') | |
| try: | |
| # Send email using Resend | |
| result = await self.notification_service.send_email( | |
| to_email=email, | |
| subject='Reset Your SwiftOps Password', | |
| template_name='password_reset', | |
| template_data={ | |
| 'name': name, | |
| 'reset_url': reset_url, | |
| 'expiry_hours': self.reset_token_expiry_hours | |
| } | |
| ) | |
| if not result['success']: | |
| logger.error(f"Failed to send reset email: {result.get('error')}") | |
| except Exception as e: | |
| logger.error(f"Error sending reset email: {str(e)}") | |