"""Admin authentication and customer-management routes.

Exposes:
  * ``POST /token``                    - admin login, returns a JWT bearer token
  * ``POST /customers/``               - create a customer (admin only)
  * ``GET /customers/``                - list customers (admin only)
  * ``GET /customers/{customer_id}``   - fetch one customer (admin only)
  * ``DELETE /customers/{customer_id}``- delete a customer (admin only)

plus the ``verify_api_key`` dependency for customer-facing endpoints.
"""
import logging
import secrets
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, Depends, HTTPException, status, Security
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm, APIKeyHeader
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session

from .settings import (
    JWT_SECRET,
    ALGORITHM,
    ACCESS_TOKEN_EXPIRE_MINUTES,
    ADMIN_USERNAME,
    ADMIN_PASSWORD,
)
from .database import get_db
from .models import Customer as CustomerModel
from .schemas import Customer as CustomerSchema, CustomerCreate

logger = logging.getLogger(__name__)

# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# OAuth2 scheme
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
api_key_header = APIKeyHeader(name="api-key")

router = APIRouter()


def _constant_time_equals(supplied: Any, expected: Any) -> bool:
    """Return True when both values are equal strings, compared in constant time.

    Non-string values never match. Strings are encoded to UTF-8 first because
    ``secrets.compare_digest`` rejects non-ASCII ``str`` arguments.
    """
    if not isinstance(supplied, str) or not isinstance(expected, str):
        return False
    return secrets.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))


def _customer_to_dict(customer: CustomerModel) -> Dict[str, Any]:
    """Build the plain-dict representation of a customer used in responses."""
    return {
        "id": customer.id,
        "name": customer.name,
        "company_name": customer.company_name,
        "email": customer.email,
        "api_key": customer.api_key,
        "is_active": customer.is_active,
        "created_at": customer.created_at,
        "updated_at": customer.updated_at,
    }


def verify_api_key(
    api_key: str = Security(api_key_header),
    db: Session = Depends(get_db),
) -> CustomerModel:
    """Verify API key and return customer.

    Intended to be used as a FastAPI dependency. The session is injected via
    ``Depends(get_db)`` instead of calling ``next(get_db())`` by hand, so the
    framework finalises the ``get_db`` generator once the request is done
    rather than leaving it suspended forever.

    Args:
        api_key: Value of the ``api-key`` request header.
        db: Database session supplied by ``get_db``.

    Returns:
        The active customer that owns ``api_key``.

    Raises:
        HTTPException: 401 when no active customer has this key.
    """
    customer = (
        db.query(CustomerModel)
        .filter(
            CustomerModel.api_key == api_key,
            CustomerModel.is_active == True,  # noqa: E712 - SQL expression, not a Python bool test
        )
        .first()
    )
    if not customer:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid API key",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return customer


def verify_password(plain_password: str, hashed_password: str):
    """Verify password"""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str):
    """Hash password"""
    return pwd_context.hash(password)


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Create JWT token for admin access.

    Args:
        data: Claims to embed in the token; copied, not mutated.
        expires_delta: Token lifetime. Falls back to 15 minutes when omitted
            (or falsy).

    Returns:
        The token encoded with ``JWT_SECRET`` and ``ALGORITHM``.
    """
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, JWT_SECRET, algorithm=ALGORITHM)
    return encoded_jwt


def get_current_admin(token: str = Depends(oauth2_scheme)):
    """Get current admin user.

    Decodes the bearer token and checks that its ``sub`` claim is the
    configured admin username.

    Returns:
        The admin username taken from the token.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no ``sub``
            claim, or names a different user.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, JWT_SECRET, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception

    if username != ADMIN_USERNAME:
        raise credentials_exception
    return username


@router.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Authenticate the admin and issue a bearer token.

    Raises:
        HTTPException: 401 when the username or password does not match the
            configured admin credentials.
    """
    # Evaluate both comparisons unconditionally, and in constant time, so the
    # response timing does not reveal which field was wrong or how much of it
    # matched.
    username_ok = _constant_time_equals(form_data.username, ADMIN_USERNAME)
    password_ok = _constant_time_equals(form_data.password, ADMIN_PASSWORD)
    if not (username_ok and password_ok):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": form_data.username},
        expires_delta=access_token_expires,
    )
    return {"access_token": access_token, "token_type": "bearer"}


@router.post("/customers/", response_model=dict)
async def create_customer(
    customer_data: dict,
    db: Session = Depends(get_db),
    current_admin: str = Depends(get_current_admin),
):
    """Create a new customer.

    Requires ``name``, ``company_name`` and ``email`` keys in the request
    body and generates a fresh API key for the new record.

    Raises:
        HTTPException: 400 when a required field is missing or the email is
            already registered; 500 on any other failure.
    """
    try:
        # Validate required fields
        required_fields = ["name", "company_name", "email"]
        missing_fields = [field for field in required_fields if field not in customer_data]
        if missing_fields:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Missing required fields: {', '.join(missing_fields)}",
            )

        # Check if email already exists
        existing_customer = (
            db.query(CustomerModel)
            .filter(CustomerModel.email == customer_data["email"])
            .first()
        )
        if existing_customer:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email already registered",
            )

        # Create new customer
        now = datetime.utcnow()
        customer = CustomerModel(
            name=customer_data["name"],
            company_name=customer_data["company_name"],
            email=customer_data["email"],
            api_key=str(secrets.token_urlsafe(32)),
            created_at=now,
            updated_at=now,
        )

        db.add(customer)
        db.commit()
        db.refresh(customer)

        return _customer_to_dict(customer)
    except HTTPException:
        # Deliberate client errors raised above pass through untouched.
        raise
    except Exception as e:
        # Undo the failed transaction so the session is usable again.
        db.rollback()
        logger.exception("Error creating customer")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error creating customer: {str(e)}",
        )


@router.get("/customers/", response_model=List[CustomerSchema])
async def list_customers(
    db: Session = Depends(get_db),
    current_admin: str = Depends(get_current_admin),
):
    """List all customers.

    Rows with a missing ``created_at``/``updated_at`` are reported with the
    current UTC time in that field.

    Raises:
        HTTPException: 500 when the query or serialisation fails.
    """
    try:
        customers = db.query(CustomerModel).all()
        return [
            CustomerSchema(
                id=customer.id,
                name=customer.name,
                company_name=customer.company_name,
                email=customer.email,
                api_key=customer.api_key,
                is_active=customer.is_active,
                created_at=customer.created_at or datetime.utcnow(),
                updated_at=customer.updated_at or datetime.utcnow(),
            )
            for customer in customers
        ]
    except Exception as e:
        logger.exception("Error listing customers")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error listing customers: {str(e)}",
        )


@router.get("/customers/{customer_id}", response_model=dict)
async def get_customer(
    customer_id: int,
    db: Session = Depends(get_db),
    current_admin: str = Depends(get_current_admin),
):
    """Get customer details.

    Raises:
        HTTPException: 404 when no customer has ``customer_id``.
    """
    customer = db.query(CustomerModel).filter(CustomerModel.id == customer_id).first()
    if not customer:
        raise HTTPException(status_code=404, detail="Customer not found")
    # The route declares ``response_model=dict``; an ORM instance is not a
    # dict, so convert explicitly (same shape as the create endpoint returns).
    return _customer_to_dict(customer)


@router.delete("/customers/{customer_id}")
async def delete_customer(
    customer_id: int,
    db: Session = Depends(get_db),
    current_admin: str = Depends(get_current_admin),
):
    """Delete a customer.

    Raises:
        HTTPException: 404 when no customer has ``customer_id``.
    """
    customer = db.query(CustomerModel).filter(CustomerModel.id == customer_id).first()
    if not customer:
        raise HTTPException(status_code=404, detail="Customer not found")

    db.delete(customer)
    db.commit()
    return {"message": "Customer deleted successfully"}