chatbot / app /services /auth_service.py
Tahasaif3's picture
'changes'
efb660b
"""
Authentication service that hashes passwords with salted PBKDF2-HMAC
from the standard library instead of bcrypt
"""
import base64
import hashlib
import hmac
import secrets
import time
from datetime import datetime, timedelta
from typing import Optional, Dict, Any

from app.core.config import settings
from app.services.firebase_service import firestore_service
class AuthService:
    """
    Password hashing, user registration/login and access-token handling.

    Passwords are stored as base64-encoded, salted PBKDF2-HMAC digests.
    Access tokens have the form ``base64("email:issued_at").hex_signature``
    where the signature is an HMAC-SHA256 over the payload, keyed with a
    server-side secret, so a client cannot mint or alter a token.
    """

    def __init__(self, token_secret: Optional[bytes] = None,
                 token_ttl: Optional[timedelta] = None):
        """
        Configure hashing parameters and token signing.

        Args:
            token_secret: Key used to sign access tokens. When omitted, a
                random key is generated for this instance.
            token_ttl: Maximum accepted token age. ``None`` (the default)
                disables expiry checks.
        """
        self.algorithm = "sha256"
        # NOTE: the iteration count is not stored next to each hash, so
        # changing it would invalidate every password hash already stored.
        self.iterations = 100000
        # NOTE(review): with the random fallback, tokens stop validating after
        # a restart and are not shared between worker processes. Pass a stable
        # secret from configuration when that matters.
        self._token_secret = token_secret or secrets.token_bytes(32)
        self._token_ttl = token_ttl

    def _derive_key(self, password: str, salt: bytes) -> bytes:
        """Return the raw PBKDF2-HMAC digest of ``password`` for ``salt``."""
        return hashlib.pbkdf2_hmac(
            self.algorithm,
            password.encode('utf-8'),
            salt,
            self.iterations,
        )

    def _sign(self, payload: bytes) -> str:
        """Return the hex HMAC-SHA256 signature of a token payload."""
        return hmac.new(self._token_secret, payload, hashlib.sha256).hexdigest()

    def _hash_password(self, password: str, salt: Optional[bytes] = None) -> tuple[str, str]:
        """
        Hash a password using salted PBKDF2-HMAC.

        Args:
            password: Plain text password
            salt: Optional salt (32 random bytes are generated if not provided)

        Returns:
            Tuple of (hashed_password_base64, salt_base64)
        """
        if salt is None:
            salt = secrets.token_bytes(32)
        digest = self._derive_key(password, salt)
        # Base64 keeps both values storable as plain strings.
        return (
            base64.b64encode(digest).decode('utf-8'),
            base64.b64encode(salt).decode('utf-8'),
        )

    def _verify_password(self, password: str, hashed_password_b64: str, salt_b64: str) -> bool:
        """
        Verify a password against a stored hash.

        Args:
            password: Plain text password
            hashed_password_b64: Base64 encoded hashed password
            salt_b64: Base64 encoded salt

        Returns:
            True if the password matches; False on mismatch or when the
            stored values cannot be decoded
        """
        try:
            salt = base64.b64decode(salt_b64.encode('utf-8'))
            expected = base64.b64decode(hashed_password_b64.encode('utf-8'))
            candidate = self._derive_key(password, salt)
        except (ValueError, TypeError, AttributeError) as e:
            # Malformed base64 or non-string input counts as a failed check.
            print(f"Error verifying password: {e}")
            return False
        # Constant-time comparison avoids leaking how many bytes matched.
        return secrets.compare_digest(candidate, expected)

    def register_user(self, email: str, password: str, name: str, preferences: Dict[str, Any]) -> Optional[str]:
        """
        Register a new user.

        Args:
            email: User's email
            password: User's password
            name: User's name
            preferences: User preferences

        Returns:
            User ID if successful, None if the store is unavailable, the
            email is already registered, or the store raised an error
        """
        if not firestore_service:
            print("Firestore service not available")
            return None
        try:
            if firestore_service.get_user_by_email(email):
                print("User already exists")
                return None
            hashed_password, salt = self._hash_password(password)
            user_data = {
                "email": email,
                "name": name,
                "hashed_password": hashed_password,
                "salt": salt,
                "preferences": preferences,
                "created_at": datetime.utcnow(),
            }
            return firestore_service.create_user(user_data)
        except Exception as e:
            # Service boundary: any storage failure is reported as "not registered".
            print(f"Error registering user: {e}")
            return None

    def authenticate_user(self, email: str, password: str) -> Optional[Dict[str, Any]]:
        """
        Authenticate a user by email and password.

        Args:
            email: User's email
            password: User's password

        Returns:
            The stored user data if authentication succeeds, None otherwise
        """
        if not firestore_service:
            print("Firestore service not available")
            return None
        try:
            user = firestore_service.get_user_by_email(email)
            if not user:
                print("User not found")
                return None
            hashed_password = user.get("hashed_password")
            salt = user.get("salt")
            if not hashed_password or not salt:
                print("User password data missing")
                return None
            if not self._verify_password(password, hashed_password, salt):
                print("Password verification failed")
                return None
            return user
        except Exception as e:
            # Service boundary: any storage failure is reported as "not authenticated".
            print(f"Error authenticating user: {e}")
            return None

    def create_access_token(self, email: str) -> str:
        """
        Create a signed access token for ``email``.

        Args:
            email: User's email

        Returns:
            Token string ``base64("email:issued_at").hex_hmac_sha256``
        """
        # time.time() is a true UTC epoch value; datetime.utcnow().timestamp()
        # would be skewed by the host's local UTC offset.
        payload = f"{email}:{time.time()}".encode('utf-8')
        return f"{base64.b64encode(payload).decode('ascii')}.{self._sign(payload)}"

    def verify_access_token(self, token: str) -> Optional[str]:
        """
        Verify an access token and return the user email.

        Tokens that are unsigned, signed with a different key, malformed, or
        older than the configured ``token_ttl`` are rejected.

        Args:
            token: Access token

        Returns:
            User email if token is valid, None otherwise
        """
        if not isinstance(token, str) or not token:
            return None
        payload_b64, separator, signature = token.partition('.')
        # Exactly one '.' is required: unsigned tokens are never accepted.
        if not separator or '.' in signature:
            return None
        try:
            payload = base64.b64decode(payload_b64.encode('utf-8'), validate=True)
            expected = self._sign(payload)
            if not hmac.compare_digest(signature.encode('utf-8'), expected.encode('utf-8')):
                return None
            # Split on the last ':' so an email containing ':' stays intact.
            email, issued_at = payload.decode('utf-8').rsplit(':', 1)
            if self._token_ttl is not None:
                age_seconds = time.time() - float(issued_at)
                if age_seconds > self._token_ttl.total_seconds():
                    return None
            return email
        except (ValueError, TypeError) as e:
            # Bad base64, bad UTF-8 or a malformed payload: treat as invalid.
            print(f"Error verifying access token: {e}")
            return None
# Module-level AuthService instance, created once when this module is imported
auth_service = AuthService()