Spaces:
Sleeping
Sleeping
File size: 7,295 Bytes
1de8ce6 c6d0d33 260918e c6d0d33 96c5d66 1de8ce6 62d9331 30d5632 c6d0d33 1de8ce6 c6d0d33 62d9331 1de8ce6 62d9331 1de8ce6 c6d0d33 1de8ce6 c6d0d33 1de8ce6 c6d0d33 96c5d66 c6d0d33 1de8ce6 c6d0d33 96c5d66 c6d0d33 1de8ce6 c6d0d33 ddecf13 c6d0d33 ddecf13 1445330 a46907b 1445330 62d9331 1445330 7714b76 62d9331 1445330 7714b76 73e6ba6 1445330 7714b76 1445330 73e6ba6 1445330 73e6ba6 c6d0d33 62d9331 c6d0d33 ddecf13 c6d0d33 ddecf13 093b53f 62d9331 974ed50 62d9331 974ed50 7714b76 974ed50 093b53f c6d0d33 ddecf13 c6d0d33 ddecf13 62d9331 c6d0d33 ddecf13 c6d0d33 ddecf13 62d9331 c6d0d33 ddecf13 c6d0d33 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 |
from fastapi import APIRouter, Depends, HTTPException, status, Security
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm, APIKeyHeader
from sqlalchemy.orm import Session
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext
from typing import Optional, List
import secrets
from .settings import (
JWT_SECRET, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES,
ADMIN_USERNAME, ADMIN_PASSWORD
)
from .database import get_db
from .models import Customer as CustomerModel
from .schemas import Customer as CustomerSchema, CustomerCreate
# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2 scheme
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
api_key_header = APIKeyHeader(name="api-key")
router = APIRouter()
def verify_api_key(api_key: str = Security(api_key_header)) -> CustomerModel:
"""Verify API key and return customer"""
db = next(get_db())
customer = db.query(CustomerModel).filter(
CustomerModel.api_key == api_key,
CustomerModel.is_active == True
).first()
if not customer:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid API key",
headers={"WWW-Authenticate": "Bearer"},
)
return customer
def verify_password(plain_password: str, hashed_password: str):
"""Verify password"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str):
"""Hash password"""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
"""Create JWT token for admin access"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, JWT_SECRET, algorithm=ALGORITHM)
return encoded_jwt
def get_current_admin(token: str = Depends(oauth2_scheme)):
"""Get current admin user"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
if username != ADMIN_USERNAME:
raise credentials_exception
return username
@router.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
if form_data.username != ADMIN_USERNAME or form_data.password != ADMIN_PASSWORD:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": form_data.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@router.post("/customers/", response_model=dict)
async def create_customer(
customer_data: dict,
db: Session = Depends(get_db),
current_admin: str = Depends(get_current_admin)
):
"""Create a new customer"""
try:
# Validate required fields
required_fields = ["name", "company_name", "email"]
missing_fields = [field for field in required_fields if field not in customer_data]
if missing_fields:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Missing required fields: {', '.join(missing_fields)}"
)
# Check if email already exists
existing_customer = db.query(CustomerModel).filter(CustomerModel.email == customer_data["email"]).first()
if existing_customer:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
# Create new customer
now = datetime.utcnow()
customer = CustomerModel(
name=customer_data["name"],
company_name=customer_data["company_name"],
email=customer_data["email"],
api_key=str(secrets.token_urlsafe(32)),
created_at=now,
updated_at=now
)
db.add(customer)
db.commit()
db.refresh(customer)
return {
"id": customer.id,
"name": customer.name,
"company_name": customer.company_name,
"email": customer.email,
"api_key": customer.api_key,
"is_active": customer.is_active,
"created_at": customer.created_at,
"updated_at": customer.updated_at
}
except HTTPException as he:
raise he
except Exception as e:
print(f"Error creating customer: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error creating customer: {str(e)}"
)
@router.get("/customers/", response_model=List[CustomerSchema])
async def list_customers(
db: Session = Depends(get_db),
current_admin: str = Depends(get_current_admin)
):
"""List all customers"""
try:
customers = db.query(CustomerModel).all()
return [
CustomerSchema(
id=customer.id,
name=customer.name,
company_name=customer.company_name,
email=customer.email,
api_key=customer.api_key,
is_active=customer.is_active,
created_at=customer.created_at or datetime.utcnow(),
updated_at=customer.updated_at or datetime.utcnow()
) for customer in customers
]
except Exception as e:
print(f"Error listing customers: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error listing customers: {str(e)}"
)
@router.get("/customers/{customer_id}", response_model=dict)
async def get_customer(
customer_id: int,
db: Session = Depends(get_db),
current_admin: str = Depends(get_current_admin)
):
"""Get customer details"""
customer = db.query(CustomerModel).filter(CustomerModel.id == customer_id).first()
if not customer:
raise HTTPException(status_code=404, detail="Customer not found")
return customer
@router.delete("/customers/{customer_id}")
async def delete_customer(
customer_id: int,
db: Session = Depends(get_db),
current_admin: str = Depends(get_current_admin)
):
"""Delete a customer"""
customer = db.query(CustomerModel).filter(CustomerModel.id == customer_id).first()
if not customer:
raise HTTPException(status_code=404, detail="Customer not found")
db.delete(customer)
db.commit()
return {"message": "Customer deleted successfully"} |