File size: 6,091 Bytes
5374a2d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
"""
Security components for authentication and authorization.
"""
import jwt
from datetime import datetime, timedelta
from typing import Optional, Dict, Any # , List
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel, ValidationError
from pymongo.errors import DuplicateKeyError
from bson import ObjectId
from evoagentx.app.config import settings
from evoagentx.app.db import Database
from evoagentx.app.schemas import TokenPayload, UserCreate, UserResponse
# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2 scheme for token authentication
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_PREFIX}/auth/login")
# User model for database
class UserInDB(BaseModel):
_id: Optional[ObjectId] = None
email: str
hashed_password: str
full_name: Optional[str] = None
is_active: bool = True
is_admin: bool = False
created_at: datetime = datetime.utcnow()
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against a hash."""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Hash a password for storing."""
return pwd_context.hash(password)
async def get_user_by_email(email: str) -> Optional[Dict[str, Any]]:
"""Get a user by email."""
return await Database.db.users.find_one({"email": email})
async def authenticate_user(email: str, password: str) -> Optional[Dict[str, Any]]:
"""Authenticate a user by email and password."""
user = await get_user_by_email(email)
if not user:
return None
if not verify_password(password, user["hashed_password"]):
return None
if not user.get("is_active", True):
return None
return user
async def create_user(user_create: UserCreate) -> UserResponse:
"""Create a new user."""
# Check if user already exists
existing_user = await get_user_by_email(user_create.email)
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
# Create new user
user_dict = user_create.dict()
hashed_password = get_password_hash(user_dict.pop("password"))
new_user = {
"email": user_dict["email"],
"hashed_password": hashed_password,
"full_name": user_dict.get("full_name"),
"is_active": True,
"is_admin": False,
"created_at": datetime.utcnow()
}
try:
insert_result = await Database.db.users.insert_one(new_user)
new_user["_id"] = insert_result.inserted_id
return UserResponse(**new_user)
except DuplicateKeyError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
def create_access_token(subject: str, expires_delta: Optional[timedelta] = None) -> str:
"""Create a new JWT access token."""
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode = {"exp": expire, "sub": subject}
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
async def get_current_user(token: str = Depends(oauth2_scheme)) -> Dict[str, Any]:
"""Get the current user from a JWT token."""
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
token_data = TokenPayload(**payload)
if datetime.fromtimestamp(token_data.exp) < datetime.utcnow():
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token expired",
headers={"WWW-Authenticate": "Bearer"},
)
except (jwt.PyJWTError, ValidationError):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
user = await get_user_by_email(token_data.sub)
if user is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return user
async def get_current_active_user(current_user: Dict[str, Any] = Depends(get_current_user)) -> Dict[str, Any]:
"""Get the current active user."""
if not current_user.get("is_active", True):
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
async def get_current_admin_user(current_user: Dict[str, Any] = Depends(get_current_user)) -> Dict[str, Any]:
"""Get the current admin user."""
if not current_user.get("is_admin", False):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
return current_user
# Initialize the users collection
async def init_users_collection():
"""Initialize the users collection with indexes."""
await Database.db.users.create_index("email", unique=True)
# Create admin user if it doesn't exist
admin_email = "admin@clayx.ai"
admin = await get_user_by_email(admin_email)
if not admin:
admin_user = UserCreate(
email=admin_email,
password="adminpassword", # Change this in production!
full_name="Admin User"
)
user_dict = admin_user.dict()
hashed_password = get_password_hash(user_dict["password"])
new_admin = {
"email": admin_email,
"hashed_password": hashed_password,
"full_name": "Admin User",
"is_active": True,
"is_admin": True,
"created_at": datetime.utcnow()
}
await Database.db.users.insert_one(new_admin) |