"""Authentication helpers: password hashing, JWT creation, and current-user lookup."""

from datetime import datetime, timedelta, timezone
from typing import Optional
import hashlib
import hmac
import os

from fastapi import HTTPException, status, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from sqlalchemy.orm import Session

from database.models import User
from database.session import get_db

# Security configuration
# NOTE(review): the fallback secret is a hard-coded, publicly visible value.
# Any deployment that forgets to set SECRET_KEY will sign tokens with it.
SECRET_KEY = os.getenv("SECRET_KEY", "auranexus-secret-key-change-in-production")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# For demo purposes, using a simple hash function
# In production, use proper password hashing like bcrypt
security = HTTPBearer()


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return True if *plain_password* hashes to *hashed_password*.

    The candidate password is hashed with ``get_password_hash`` and compared
    to the stored digest in constant time, so the comparison itself does not
    leak how many leading characters matched.

    Args:
        plain_password: The password supplied by the user.
        hashed_password: The stored hex digest to compare against.

    Returns:
        True when the digests match, False otherwise (including when the
        stored value is missing or not a string).
    """
    # A missing/non-string stored hash can never match; hmac.compare_digest
    # would raise TypeError on it instead of returning False.
    if not isinstance(hashed_password, str):
        return False

    candidate = get_password_hash(plain_password)
    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(
        candidate.encode("utf-8"), hashed_password.encode("utf-8")
    )


def get_password_hash(password: str) -> str:
    """Return the hex-encoded SHA-256 digest of *password*.

    Simple hash for demo purposes.
    NOTE(review): this is an unsalted, fast hash and is not suitable for
    storing real passwords; switching algorithms would invalidate every
    stored digest, so it is left unchanged here.
    """
    return hashlib.sha256(password.encode()).hexdigest()


def create_access_token(
    data: dict, expires_delta: Optional[timedelta] = None
) -> str:
    """Create a signed JWT containing *data* plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is copied, not mutated.
        expires_delta: Lifetime of the token. When omitted (or falsy), the
            token expires 15 minutes from now.

    Returns:
        The encoded JWT, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    to_encode = data.copy()

    # NOTE(review): the fallback is 15 minutes, not ACCESS_TOKEN_EXPIRE_MINUTES;
    # kept as-is so existing callers do not silently get longer-lived tokens.
    lifetime = expires_delta if expires_delta else timedelta(minutes=15)

    # Timezone-aware "now" replaces the deprecated datetime.utcnow().
    expire = datetime.now(timezone.utc) + lifetime
    to_encode.update({"exp": expire})

    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
) -> User:
    """Resolve the bearer token in the request to a ``User`` row.

    Args:
        credentials: Bearer credentials extracted by ``HTTPBearer``.
        db: Database session provided by ``get_db``.

    Returns:
        The ``User`` whose ``id`` equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 if the token cannot be decoded, has no ``sub``
            claim, or no user with that id exists.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Only the decode call can raise JWTError, so keep the try body minimal.
    try:
        payload = jwt.decode(
            credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM]
        )
    except JWTError as exc:
        raise credentials_exception from exc

    user_id: Optional[str] = payload.get("sub")
    if user_id is None:
        raise credentials_exception

    user = db.query(User).filter(User.id == user_id).first()
    if user is None:
        raise credentials_exception

    return user