openai-openapi-template / app /security /auth_service.py
cevheri's picture
chore: format, improve logging, remove deprecated code
bcd6d73
from app.config.security_config import get_security_config
from fastapi import HTTPException, status, Security
from fastapi.security import APIKeyHeader
from loguru import logger
import base64
import hmac
import hashlib
import json
# Security scheme that reads the API key from the "Authorization" header.
# auto_error is off so that a missing header reaches the dependency as a
# falsy value and AuthService.verify_credentials decides how to respond.
api_key_header = APIKeyHeader(
    auto_error=False,
    name="Authorization",
    scheme_name="ApiKeyAuth",
    description="API key in the format: sk-{username}-{base64_encoded_data}",
)
class AuthService:
    """Validate API keys of the form ``sk-{username}-{base64_encoded_data}``.

    The base64 segment must decode to a JSON object carrying ``username``,
    ``created_at`` and ``signature``. ``signature`` is checked against the hex
    HMAC-SHA256, keyed with the configured ``SECRET_KEY``, of the JSON dump of
    ``{"username": ..., "created_at": ...}``.
    """

    def __init__(self):
        self.api_key_header = api_key_header
        self.security_config = get_security_config()

    def _compute_signature(self, username, created_at) -> str:
        """Return the hex HMAC-SHA256 signature for a key payload."""
        # The key order and the default ``json.dumps`` formatting define the
        # exact bytes that are signed; presumably this must mirror whatever
        # issues the keys (not visible here), so it is kept unchanged.
        payload = json.dumps({"username": username, "created_at": created_at})
        return hmac.new(
            self.security_config.SECRET_KEY.encode(),
            payload.encode(),
            hashlib.sha256,
        ).hexdigest()

    def decode_api_key(self, api_key: str) -> str:
        """Decode an API key, verify its signature and return the username.

        Args:
            api_key: Raw header value; an optional ``"Bearer "`` prefix is
                stripped before parsing.

        Returns:
            The ``username`` stored in the key's signed payload.

        Raises:
            HTTPException: 401 when the key is malformed, cannot be decoded,
                lacks a required field, or carries a wrong signature.
        """
        # The raw key, the decoded payload, the secret and the expected
        # signature are credentials: none of them may reach the logs.
        logger.trace("BEGIN: decode_api_key")
        try:
            if api_key.startswith("Bearer "):
                api_key = api_key[7:]

            if not api_key.startswith("sk-"):
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Invalid API key format. Must start with 'sk-'",
                )

            # maxsplit=2 keeps any further "-" inside the encoded segment.
            parts = api_key.split("-", 2)
            if len(parts) != 3:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Invalid API key format. Must be in format: sk-{username}-{base64_encoded_data}",
                )

            try:
                data = json.loads(base64.b64decode(parts[2]).decode())
            except Exception as e:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail=f"Invalid API key data format: {str(e)}",
                ) from e

            # Missing fields raise KeyError/TypeError here and are turned
            # into a 401 by the generic handler at the bottom.
            expected_signature = self._compute_signature(
                data["username"], data["created_at"]
            )
            received_signature = data["signature"]

            # Constant-time comparison; bytes are used so that non-ASCII
            # input cannot make compare_digest raise.
            signature_ok = isinstance(received_signature, str) and hmac.compare_digest(
                received_signature.encode(), expected_signature.encode()
            )
            if not signature_ok:
                # Never echo the expected signature back to the caller:
                # doing so would let anyone forge a valid key.
                logger.trace("Signature mismatch for supplied API key")
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Invalid API key signature",
                )

            result = data["username"]
            logger.trace(f"END: result: {result}")
            return result
        except HTTPException:
            raise
        except Exception as e:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=f"Invalid API key: {str(e)}",
            ) from e

    async def verify_credentials(self, api_key: str = Security(api_key_header)) -> str:
        """Verify the request's API key and return the authenticated username.

        Args:
            api_key: Value supplied by the ``api_key_header`` security scheme;
                may be empty/None when the header is absent.

        Returns:
            The configured ``DEFAULT_USERNAME`` when security is disabled,
            otherwise the username extracted by ``decode_api_key``.

        Raises:
            HTTPException: 401 when security is enabled and the key is
                missing or invalid.
        """
        # The key itself is deliberately kept out of the log output.
        logger.trace("BEGIN: verify_credentials")

        if not self.security_config.ENABLED:
            logger.warning(
                f"Security is disabled, using default username: {self.security_config.DEFAULT_USERNAME}"
            )
            return self.security_config.DEFAULT_USERNAME

        if not api_key:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="API key is required when security is enabled",
            )

        result = self.decode_api_key(api_key)
        logger.trace(f"END: result: {result}")
        return result