# vBot-2.1/app/auth.py
# Ajit Panday
# Fix timestamp handling in customer endpoints
# 7714b76
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional, List

from fastapi import APIRouter, Depends, HTTPException, status, Security
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm, APIKeyHeader
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session

from .database import get_db
from .models import Customer as CustomerModel
from .schemas import Customer as CustomerSchema, CustomerCreate
from .settings import (
    JWT_SECRET, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES,
    ADMIN_USERNAME, ADMIN_PASSWORD
)
# Password hashing: shared passlib context configured with the bcrypt scheme.
# Used by verify_password() and get_password_hash() below.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2 scheme: extracts the bearer token for get_current_admin(); the
# tokenUrl "token" corresponds to the POST /token login route in this module.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Reads the customer API key from the "api-key" request header for
# verify_api_key().
api_key_header = APIKeyHeader(name="api-key")
# Router on which the routes defined in this module are registered.
router = APIRouter()
def verify_api_key(
    api_key: str = Security(api_key_header),
    db: Session = Depends(get_db),
) -> CustomerModel:
    """Verify an API key and return the active customer that owns it.

    The database session is injected through ``Depends(get_db)`` instead of
    being pulled out of the dependency by hand with ``next(get_db())``. That
    way FastAPI drives the dependency's setup and teardown for the request,
    rather than the session being abandoned once this function returns.

    Args:
        api_key: Value of the ``api-key`` request header, extracted by
            ``api_key_header``.
        db: Database session supplied by the ``get_db`` dependency. Code that
            calls this function directly (outside FastAPI's dependency
            injection) must pass a session explicitly.

    Returns:
        The first ``CustomerModel`` row whose ``api_key`` equals ``api_key``
        and whose ``is_active`` flag is true.

    Raises:
        HTTPException: 401 when no active customer matches the key.
    """
    customer = (
        db.query(CustomerModel)
        .filter(
            CustomerModel.api_key == api_key,
            # SQL expression, not a Python truth test, hence ``== True``.
            CustomerModel.is_active == True,  # noqa: E712
        )
        .first()
    )
    if not customer:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid API key",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return customer
def verify_password(plain_password: str, hashed_password: str):
    """Check a plaintext password against a stored hash using ``pwd_context``."""
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str):
    """Return the ``pwd_context`` hash of the given plaintext password."""
    hashed = pwd_context.hash(password)
    return hashed
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Create a signed JWT for admin access.

    Args:
        data: Claims to embed in the token. The mapping is not mutated; a
            copy with an added ``exp`` claim is encoded.
        expires_delta: Lifetime of the token. When ``None`` the token expires
            after 15 minutes. An explicit zero-length delta is honoured
            (previously any falsy delta silently fell back to 15 minutes).

    Returns:
        The encoded JWT, signed with ``JWT_SECRET`` using ``ALGORITHM``.
    """
    if expires_delta is None:
        expires_delta = timedelta(minutes=15)
    # Timezone-aware "now" avoids the deprecated, naive datetime.utcnow().
    expire = datetime.now(timezone.utc) + expires_delta
    to_encode = {**data, "exp": expire}
    return jwt.encode(to_encode, JWT_SECRET, algorithm=ALGORITHM)
def get_current_admin(token: str = Depends(oauth2_scheme)):
    """Resolve the bearer token to the admin username.

    Decodes the JWT with ``JWT_SECRET`` / ``ALGORITHM`` and returns its
    ``sub`` claim. Raises a 401 ``HTTPException`` when the token cannot be
    decoded, carries no ``sub`` claim, or names anyone other than
    ``ADMIN_USERNAME``.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        claims = jwt.decode(token, JWT_SECRET, algorithms=[ALGORITHM])
    except JWTError:
        raise unauthorized
    subject: str = claims.get("sub")
    # Reject tokens without a subject, and tokens issued for a non-admin.
    if subject is None or subject != ADMIN_USERNAME:
        raise unauthorized
    return subject
def _constant_time_equal(supplied, expected) -> bool:
    """Compare two strings without leaking the mismatch position via timing.

    Non-string operands (e.g. an unset credential) never match.
    """
    if not isinstance(supplied, str) or not isinstance(expected, str):
        return False
    # compare_digest only accepts ASCII-only str, so compare UTF-8 bytes.
    return secrets.compare_digest(
        supplied.encode("utf-8"), expected.encode("utf-8")
    )


@router.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Exchange the admin username/password for a bearer access token.

    Args:
        form_data: OAuth2 password-flow form carrying ``username`` and
            ``password``.

    Returns:
        A dict with ``access_token`` (JWT valid for
        ``ACCESS_TOKEN_EXPIRE_MINUTES``) and ``token_type`` ``"bearer"``.

    Raises:
        HTTPException: 401 when the credentials do not match
            ``ADMIN_USERNAME`` / ``ADMIN_PASSWORD``.
    """
    # Evaluate both checks unconditionally so the response time does not
    # reveal which of the two fields was wrong.
    username_ok = _constant_time_equal(form_data.username, ADMIN_USERNAME)
    password_ok = _constant_time_equal(form_data.password, ADMIN_PASSWORD)
    if not (username_ok and password_ok):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": form_data.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}
@router.post("/customers/", response_model=dict)
async def create_customer(
    customer_data: dict,
    db: Session = Depends(get_db),
    current_admin: str = Depends(get_current_admin)
):
    """Create a new customer and issue it a fresh API key.

    Args:
        customer_data: Request body; must contain ``name``, ``company_name``
            and ``email``.
        db: Database session from the ``get_db`` dependency.
        current_admin: Admin username; present to enforce admin
            authentication via ``get_current_admin``.

    Returns:
        A dict describing the stored customer, including the generated
        ``api_key``.

    Raises:
        HTTPException: 400 when required fields are missing or the email is
            already registered; 500 when the database operation fails.
    """
    # Validate required fields
    required_fields = ["name", "company_name", "email"]
    missing_fields = [field for field in required_fields if field not in customer_data]
    if missing_fields:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Missing required fields: {', '.join(missing_fields)}"
        )

    try:
        # Check if email already exists
        existing_customer = (
            db.query(CustomerModel)
            .filter(CustomerModel.email == customer_data["email"])
            .first()
        )
        if existing_customer:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email already registered"
            )

        # Create new customer; both timestamps share one instant.
        now = datetime.utcnow()
        customer = CustomerModel(
            name=customer_data["name"],
            company_name=customer_data["company_name"],
            email=customer_data["email"],
            api_key=secrets.token_urlsafe(32),
            created_at=now,
            updated_at=now
        )
        db.add(customer)
        db.commit()
        db.refresh(customer)
        return {
            "id": customer.id,
            "name": customer.name,
            "company_name": customer.company_name,
            "email": customer.email,
            "api_key": customer.api_key,
            "is_active": customer.is_active,
            "created_at": customer.created_at,
            "updated_at": customer.updated_at
        }
    except HTTPException:
        # Deliberate client errors pass through untouched.
        raise
    except Exception as e:
        # Discard the failed transaction so the session is not left in a
        # broken state after an insert/commit error.
        db.rollback()
        print(f"Error creating customer: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error creating customer: {str(e)}"
        ) from e
@router.get("/customers/", response_model=List[CustomerSchema])
async def list_customers(
    db: Session = Depends(get_db),
    current_admin: str = Depends(get_current_admin)
):
    """Return every customer as a ``CustomerSchema``.

    Rows whose ``created_at`` / ``updated_at`` is empty are reported with the
    current UTC time in that field. Any failure is printed and surfaced as a
    500 ``HTTPException``.
    """
    try:
        rows = db.query(CustomerModel).all()
        result = []
        for row in rows:
            # Substitute "now" for a missing timestamp.
            created = row.created_at or datetime.utcnow()
            updated = row.updated_at or datetime.utcnow()
            result.append(
                CustomerSchema(
                    id=row.id,
                    name=row.name,
                    company_name=row.company_name,
                    email=row.email,
                    api_key=row.api_key,
                    is_active=row.is_active,
                    created_at=created,
                    updated_at=updated,
                )
            )
        return result
    except Exception as e:
        print(f"Error listing customers: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error listing customers: {str(e)}"
        )
@router.get("/customers/{customer_id}", response_model=dict)
async def get_customer(
    customer_id: int,
    db: Session = Depends(get_db),
    current_admin: str = Depends(get_current_admin)
):
    """Get the details of a single customer.

    The route declares ``response_model=dict``, so the ORM row is converted
    to a plain dict (the same shape ``create_customer`` returns) instead of
    being returned as a model instance, which is not a dict.

    Args:
        customer_id: Primary key of the customer to fetch.
        db: Database session from the ``get_db`` dependency.
        current_admin: Admin username; present to enforce admin
            authentication via ``get_current_admin``.

    Returns:
        A dict with the customer's ``id``, ``name``, ``company_name``,
        ``email``, ``api_key``, ``is_active``, ``created_at`` and
        ``updated_at``.

    Raises:
        HTTPException: 404 when no customer has this id.
    """
    customer = db.query(CustomerModel).filter(CustomerModel.id == customer_id).first()
    if not customer:
        raise HTTPException(status_code=404, detail="Customer not found")
    return {
        "id": customer.id,
        "name": customer.name,
        "company_name": customer.company_name,
        "email": customer.email,
        "api_key": customer.api_key,
        "is_active": customer.is_active,
        "created_at": customer.created_at,
        "updated_at": customer.updated_at
    }
@router.delete("/customers/{customer_id}")
async def delete_customer(
    customer_id: int,
    db: Session = Depends(get_db),
    current_admin: str = Depends(get_current_admin)
):
    """Remove the customer with the given id.

    Responds with a 404 ``HTTPException`` when the id is unknown; otherwise
    deletes the row, commits, and returns a confirmation message.
    """
    lookup = db.query(CustomerModel).filter(CustomerModel.id == customer_id)
    target = lookup.first()
    if target:
        db.delete(target)
        db.commit()
        return {"message": "Customer deleted successfully"}
    raise HTTPException(status_code=404, detail="Customer not found")