Spaces:
Sleeping
Sleeping
import logging
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional, List

from fastapi import APIRouter, Depends, HTTPException, status, Security
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm, APIKeyHeader
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session

from .database import get_db
from .models import Customer as CustomerModel
from .schemas import Customer as CustomerSchema, CustomerCreate
from .settings import (
    JWT_SECRET, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES,
    ADMIN_USERNAME, ADMIN_PASSWORD
)
# Router object for this module.
router = APIRouter()

# Credential extractors used as FastAPI dependencies:
#   - bearer token, with "token" as the token URL
#   - API key, read from the "api-key" header
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
api_key_header = APIKeyHeader(name="api-key")

# bcrypt hashing context shared by the password helpers below.
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
)
def verify_api_key(
    api_key: str = Security(api_key_header),
    db: Session = Depends(get_db),
) -> CustomerModel:
    """Resolve the active customer that owns the supplied API key.

    The session is injected with ``Depends(get_db)``, like the other
    handlers in this module, instead of being pulled out of the generator
    by hand with ``next(get_db())``. That way the framework drives
    ``get_db`` to completion, so whatever teardown it performs after its
    ``yield`` runs deterministically once the request is done.

    Args:
        api_key: Value extracted by ``api_key_header`` (the ``api-key``
            header).
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        The matching ``CustomerModel`` instance.

    Raises:
        HTTPException: 401 when no active customer has this API key.
    """
    customer = (
        db.query(CustomerModel)
        .filter(
            CustomerModel.api_key == api_key,
            # Column expression: `==` builds the SQL predicate, so it must
            # not be rewritten as a Python identity check.
            CustomerModel.is_active == True,  # noqa: E712
        )
        .first()
    )
    if not customer:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid API key",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return customer
def verify_password(plain_password: str, hashed_password: str):
    """Check a plaintext password against a stored hash.

    Delegates to the module-level ``pwd_context`` and returns its result.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str):
    """Return the hash of ``password`` produced by ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Create a signed JWT for admin access.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is not mutated.
        expires_delta: Token lifetime. When omitted (``None``) the token
            expires 15 minutes after creation.

    Returns:
        The encoded token returned by ``jwt.encode``.
    """
    # Test against None explicitly: a zero timedelta is falsy, and a plain
    # truthiness check would silently replace it with the 15-minute default.
    lifetime = timedelta(minutes=15) if expires_delta is None else expires_delta

    claims = dict(data)
    # Timezone-aware UTC "now"; datetime.utcnow() is deprecated.
    claims["exp"] = datetime.now(timezone.utc) + lifetime
    return jwt.encode(claims, JWT_SECRET, algorithm=ALGORITHM)
def get_current_admin(token: str = Depends(oauth2_scheme)):
    """Return the admin username carried by a valid bearer token.

    The token is decoded with ``JWT_SECRET`` / ``ALGORITHM``. A decode
    failure, a missing ``sub`` claim, or a ``sub`` that differs from
    ``ADMIN_USERNAME`` all produce the same 401 response.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = jwt.decode(token, JWT_SECRET, algorithms=[ALGORITHM])
    except JWTError:
        raise unauthorized

    subject: str = claims.get("sub")
    if subject is None:
        raise unauthorized
    if subject != ADMIN_USERNAME:
        raise unauthorized
    return subject
def _constant_time_equal(supplied: str, expected) -> bool:
    """Compare two strings without leaking where they first differ."""
    if not isinstance(supplied, str) or not isinstance(expected, str):
        # A non-string (e.g. unset) configured value can never match.
        return False
    # Compare bytes: compare_digest only accepts str when it is pure ASCII.
    return secrets.compare_digest(
        supplied.encode("utf-8"), expected.encode("utf-8")
    )


async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Authenticate the admin and issue a bearer token.

    Args:
        form_data: Submitted OAuth2 password form (username and password).

    Returns:
        A dict with ``access_token`` and ``token_type`` set to ``"bearer"``.
        The token's ``sub`` claim is the submitted username and it is valid
        for ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Raises:
        HTTPException: 401 when the username or password does not match
            ``ADMIN_USERNAME`` / ``ADMIN_PASSWORD``.
    """
    # Evaluate both checks unconditionally so the response time does not
    # reveal which of the two fields was wrong.
    username_ok = _constant_time_equal(form_data.username, ADMIN_USERNAME)
    password_ok = _constant_time_equal(form_data.password, ADMIN_PASSWORD)
    if not (username_ok and password_ok):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": form_data.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}
async def create_customer(
    customer_data: dict,
    db: Session = Depends(get_db),
    current_admin: str = Depends(get_current_admin)
):
    """Create a new customer and generate its API key.

    Args:
        customer_data: Payload that must contain non-empty ``name``,
            ``company_name`` and ``email`` entries.
        db: Database session provided by ``get_db``.
        current_admin: Result of ``get_current_admin``; unused in the body,
            it is declared so the dependency runs for this call.

    Returns:
        A dict describing the stored customer, including the generated
        ``api_key``.

    Raises:
        HTTPException: 400 when required fields are missing or empty, or
            when the email is already registered; 500 when any other error
            occurs while storing the customer.
    """
    # A key that is present but None/empty is as unusable as an absent one,
    # so report it as a 400 instead of letting it fail later as a 500.
    required_fields = ("name", "company_name", "email")
    missing_fields = [
        field for field in required_fields
        if customer_data.get(field) in (None, "")
    ]
    if missing_fields:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Missing required fields: {', '.join(missing_fields)}"
        )

    try:
        existing_customer = (
            db.query(CustomerModel)
            .filter(CustomerModel.email == customer_data["email"])
            .first()
        )
        if existing_customer:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email already registered"
            )

        # One timestamp for both columns so they are equal on creation.
        now = datetime.utcnow()
        customer = CustomerModel(
            name=customer_data["name"],
            company_name=customer_data["company_name"],
            email=customer_data["email"],
            api_key=secrets.token_urlsafe(32),
            created_at=now,
            updated_at=now,
        )
        db.add(customer)
        db.commit()
        db.refresh(customer)
    except HTTPException:
        # Deliberate client-facing errors pass through unchanged.
        raise
    except Exception as exc:
        # Discard the failed transaction so the session is not left in a
        # broken state, then log with traceback instead of printing.
        db.rollback()
        logging.getLogger(__name__).exception("Error creating customer")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error creating customer: {str(exc)}"
        ) from exc

    return {
        "id": customer.id,
        "name": customer.name,
        "company_name": customer.company_name,
        "email": customer.email,
        "api_key": customer.api_key,
        "is_active": customer.is_active,
        "created_at": customer.created_at,
        "updated_at": customer.updated_at,
    }
async def list_customers(
    db: Session = Depends(get_db),
    current_admin: str = Depends(get_current_admin)
):
    """List all customers.

    Args:
        db: Database session provided by ``get_db``.
        current_admin: Result of ``get_current_admin``; unused in the body,
            it is declared so the dependency runs for this call.

    Returns:
        A list of ``CustomerSchema`` objects, one per stored customer. A
        customer whose ``created_at`` or ``updated_at`` is empty gets the
        current UTC time in that field.

    Raises:
        HTTPException: 500 when querying or building the result fails.
    """
    try:
        customers = db.query(CustomerModel).all()
        # Computed once so every row missing a timestamp gets the same
        # fallback value rather than a slightly different one per field.
        fallback_time = datetime.utcnow()
        return [
            CustomerSchema(
                id=customer.id,
                name=customer.name,
                company_name=customer.company_name,
                email=customer.email,
                api_key=customer.api_key,
                is_active=customer.is_active,
                created_at=customer.created_at or fallback_time,
                updated_at=customer.updated_at or fallback_time,
            )
            for customer in customers
        ]
    except Exception as exc:
        # Log with traceback instead of printing to stdout.
        logging.getLogger(__name__).exception("Error listing customers")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error listing customers: {str(exc)}"
        ) from exc
async def get_customer(
    customer_id: int,
    db: Session = Depends(get_db),
    current_admin: str = Depends(get_current_admin)
):
    """Return the customer whose id is ``customer_id``, or respond 404."""
    by_id = db.query(CustomerModel).filter(CustomerModel.id == customer_id)
    found = by_id.first()
    if found:
        return found
    raise HTTPException(status_code=404, detail="Customer not found")
async def delete_customer(
    customer_id: int,
    db: Session = Depends(get_db),
    current_admin: str = Depends(get_current_admin)
):
    """Remove the customer with the given id and commit the change.

    Responds with 404 when no such customer exists; otherwise returns a
    confirmation message.
    """
    by_id = db.query(CustomerModel).filter(CustomerModel.id == customer_id)
    target = by_id.first()
    if not target:
        raise HTTPException(status_code=404, detail="Customer not found")

    db.delete(target)
    db.commit()

    confirmation = {"message": "Customer deleted successfully"}
    return confirmation