| """ |
| Authentication service using a more robust approach than bcrypt |
| """ |
|
|
| import hashlib |
| import secrets |
| import base64 |
| from datetime import datetime, timedelta |
| from typing import Optional, Dict, Any |
| from app.core.config import settings |
| from app.services.firebase_service import firestore_service |
|
|
| class AuthService: |
| def __init__(self): |
| self.algorithm = "sha256" |
| self.iterations = 100000 |
| |
| def _hash_password(self, password: str, salt: Optional[bytes] = None) -> tuple[str, str]: |
| """ |
| Hash a password using SHA-256 with salt |
| |
| Args: |
| password: Plain text password |
| salt: Optional salt (generated if not provided) |
| |
| Returns: |
| Tuple of (hashed_password_base64, salt_base64) |
| """ |
| if salt is None: |
| salt = secrets.token_bytes(32) |
| |
| |
| password_bytes = password.encode('utf-8') |
| |
| |
| hashed = hashlib.pbkdf2_hmac( |
| self.algorithm, |
| password_bytes, |
| salt, |
| self.iterations |
| ) |
| |
| |
| hashed_b64 = base64.b64encode(hashed).decode('utf-8') |
| salt_b64 = base64.b64encode(salt).decode('utf-8') |
| |
| return hashed_b64, salt_b64 |
| |
| def _verify_password(self, password: str, hashed_password_b64: str, salt_b64: str) -> bool: |
| """ |
| Verify a password against a hashed password |
| |
| Args: |
| password: Plain text password |
| hashed_password_b64: Base64 encoded hashed password |
| salt_b64: Base64 encoded salt |
| |
| Returns: |
| Boolean indicating if password matches |
| """ |
| try: |
| |
| salt = base64.b64decode(salt_b64.encode('utf-8')) |
| hashed_password = base64.b64decode(hashed_password_b64.encode('utf-8')) |
| |
| |
| password_bytes = password.encode('utf-8') |
| test_hash = hashlib.pbkdf2_hmac( |
| self.algorithm, |
| password_bytes, |
| salt, |
| self.iterations |
| ) |
| |
| |
| return secrets.compare_digest(test_hash, hashed_password) |
| except Exception as e: |
| print(f"Error verifying password: {e}") |
| return False |
| |
| def register_user(self, email: str, password: str, name: str, preferences: Dict[str, Any]) -> Optional[str]: |
| """ |
| Register a new user |
| |
| Args: |
| email: User's email |
| password: User's password |
| name: User's name |
| preferences: User preferences |
| |
| Returns: |
| User ID if successful, None otherwise |
| """ |
| if not firestore_service: |
| print("Firestore service not available") |
| return None |
| |
| try: |
| |
| existing_user = firestore_service.get_user_by_email(email) |
| if existing_user: |
| print("User already exists") |
| return None |
| |
| |
| hashed_password, salt = self._hash_password(password) |
| |
| |
| user_data = { |
| "email": email, |
| "name": name, |
| "hashed_password": hashed_password, |
| "salt": salt, |
| "preferences": preferences, |
| "created_at": datetime.utcnow() |
| } |
| |
| |
| user_id = firestore_service.create_user(user_data) |
| return user_id |
| |
| except Exception as e: |
| print(f"Error registering user: {e}") |
| return None |
| |
| def authenticate_user(self, email: str, password: str) -> Optional[Dict[str, Any]]: |
| """ |
| Authenticate a user |
| |
| Args: |
| email: User's email |
| password: User's password |
| |
| Returns: |
| User data if authentication successful, None otherwise |
| """ |
| if not firestore_service: |
| print("Firestore service not available") |
| return None |
| |
| try: |
| |
| user = firestore_service.get_user_by_email(email) |
| if not user: |
| print("User not found") |
| return None |
| |
| |
| hashed_password = user.get("hashed_password") |
| salt = user.get("salt") |
| |
| if not hashed_password or not salt: |
| print("User password data missing") |
| return None |
| |
| if self._verify_password(password, hashed_password, salt): |
| return user |
| else: |
| print("Password verification failed") |
| return None |
| |
| except Exception as e: |
| print(f"Error authenticating user: {e}") |
| return None |
| |
| def create_access_token(self, email: str) -> str: |
| """ |
| Create a simple access token (JWT-like but simplified) |
| |
| Args: |
| email: User's email |
| |
| Returns: |
| Access token string |
| """ |
| try: |
| |
| payload = f"{email}:{datetime.utcnow().timestamp()}" |
| token_bytes = payload.encode('utf-8') |
| |
| |
| token_hash = hashlib.sha256(token_bytes).hexdigest() |
| |
| |
| token = f"{base64.b64encode(payload.encode('utf-8')).decode('utf-8')}.{token_hash}" |
| return token |
| |
| except Exception as e: |
| print(f"Error creating access token: {e}") |
| |
| return base64.b64encode(f"{email}:{secrets.token_hex(16)}".encode('utf-8')).decode('utf-8') |
| |
| def verify_access_token(self, token: str) -> Optional[str]: |
| """ |
| Verify an access token and return the user email |
| |
| Args: |
| token: Access token |
| |
| Returns: |
| User email if token is valid, None otherwise |
| """ |
| try: |
| if '.' in token: |
| |
| parts = token.split('.') |
| if len(parts) != 2: |
| return None |
| |
| payload_b64, token_hash = parts |
| payload = base64.b64decode(payload_b64.encode('utf-8')).decode('utf-8') |
| |
| |
| expected_hash = hashlib.sha256(payload.encode('utf-8')).hexdigest() |
| if secrets.compare_digest(token_hash, expected_hash): |
| email, timestamp = payload.split(':', 1) |
| return email |
| else: |
| |
| decoded = base64.b64decode(token.encode('utf-8')).decode('utf-8') |
| email, _ = decoded.split(':', 1) |
| return email |
| |
| except Exception as e: |
| print(f"Error verifying access token: {e}") |
| return None |
|
|
| |
| auth_service = AuthService() |