peijun1's picture
Deploy AI Studio Proxy API to Hugging Face Spaces
a5784e9
Raw
History Blame Contribute Delete
3.77 kB
import logging
from fastapi import Depends, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from ..dependencies import get_logger
# Request body carrying a single API key; it is the body type accepted by the
# add_api_key and delete_api_key handlers in this module.
class ApiKeyRequest(BaseModel):
    # Key text as sent by the client; the handlers strip surrounding
    # whitespace before using it.
    key: str
# Request body for the test_api_key handler: the key whose validity should be
# checked.
class ApiKeyTestRequest(BaseModel):
    # Key text as sent by the client; the handler strips surrounding
    # whitespace before verifying it.
    key: str
async def get_api_keys(logger: logging.Logger = Depends(get_logger)):
    """Return all configured API keys and how many there are.

    Calls ``auth_utils.initialize_keys()`` and then snapshots
    ``auth_utils.API_KEYS`` into a list for the JSON response.

    Raises:
        HTTPException: 500 (detail is the exception text) if anything in the
            lookup or response construction fails; the error is logged first.
    """
    # Imported at call time, as in the original (presumably to sidestep a
    # circular import -- confirm before hoisting to module level).
    from .. import auth_utils

    try:
        auth_utils.initialize_keys()
        current_keys = [*auth_utils.API_KEYS]
        payload = {
            "success": True,
            "keys": current_keys,
            "total_count": len(current_keys),
        }
        return JSONResponse(content=payload)
    except Exception as exc:
        logger.error(f"Failed to get API key list: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))
async def add_api_key(
    request: ApiKeyRequest, logger: logging.Logger = Depends(get_logger)
):
    """Append a new API key to the key file and reload the key set.

    Args:
        request: Body whose ``key`` is the key to add; surrounding whitespace
            is stripped before validation.
        logger: Logger supplied by the ``get_logger`` dependency.

    Returns:
        JSONResponse with ``success``, ``message`` and the resulting
        ``key_count``.

    Raises:
        HTTPException: 400 if the key is shorter than 8 characters, contains
            a line break, or already exists; 500 (detail is the exception
            text) if writing the file or reloading the keys fails.
    """
    # Imported at call time, as in the original (presumably to sidestep a
    # circular import -- confirm before hoisting to module level).
    from .. import auth_utils

    key_value = request.key.strip()
    if len(key_value) < 8:
        raise HTTPException(status_code=400, detail="Invalid API key format.")
    # The key file holds one key per line, so a key with an embedded line
    # break would be stored as several independent keys. Reject it.
    if len(key_value.splitlines()) > 1:
        raise HTTPException(status_code=400, detail="Invalid API key format.")

    auth_utils.initialize_keys()
    if key_value in auth_utils.API_KEYS:
        raise HTTPException(status_code=400, detail="API key already exists.")

    try:
        key_file_path = auth_utils.KEY_FILE_PATH
        with open(key_file_path, "a+", encoding="utf-8") as f:
            # "a+" creates the file if needed; rewind to inspect what is
            # already there (writes still go to the end in append mode).
            f.seek(0)
            existing = f.read()
            # Only add a separator when the last line is unterminated;
            # otherwise a blank line would be left in the file.
            if existing and not existing.endswith("\n"):
                f.write("\n")
            f.write(key_value)

        auth_utils.initialize_keys()

        # Reveal at most a quarter of the key at each end (capped at 4
        # characters) so short keys are never logged in full.
        shown = min(4, len(key_value) // 4)
        masked = f"{key_value[:shown]}...{key_value[len(key_value) - shown:]}"
        logger.info(f"API key added: {masked}")

        return JSONResponse(
            content={
                "success": True,
                "message": "API key added successfully",
                "key_count": len(auth_utils.API_KEYS),
            }
        )
    except Exception as e:
        logger.error(f"Failed to add API key: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
async def test_api_key(
    request: ApiKeyTestRequest, logger: logging.Logger = Depends(get_logger)
):
    """Check whether a submitted API key is accepted.

    Args:
        request: Body whose ``key`` is the key to check; surrounding
            whitespace is stripped first.
        logger: Logger supplied by the ``get_logger`` dependency.

    Returns:
        JSONResponse with ``success`` (always True), ``valid`` (the result of
        ``auth_utils.verify_api_key``) and a human-readable ``message``.

    Raises:
        HTTPException: 400 if the key is empty after stripping.
    """
    # Imported at call time, as in the original (presumably to sidestep a
    # circular import -- confirm before hoisting to module level).
    from .. import auth_utils

    key_value = request.key.strip()
    if not key_value:
        raise HTTPException(status_code=400, detail="API key cannot be empty.")

    auth_utils.initialize_keys()
    is_valid = auth_utils.verify_api_key(key_value)

    # No minimum length is enforced here, so a fixed 4+4 preview would print
    # short keys in full. Reveal at most a quarter of the key at each end
    # (capped at 4 characters) instead.
    shown = min(4, len(key_value) // 4)
    masked = f"{key_value[:shown]}...{key_value[len(key_value) - shown:]}"
    logger.info(f"API key test: {masked} - {'Valid' if is_valid else 'Invalid'}")

    return JSONResponse(
        content={
            "success": True,
            "valid": is_valid,
            "message": "Key valid" if is_valid else "Key invalid or non-existent",
        }
    )
async def delete_api_key(
    request: ApiKeyRequest, logger: logging.Logger = Depends(get_logger)
):
    """Remove an API key from the key file and reload the key set.

    Args:
        request: Body whose ``key`` is the key to delete; surrounding
            whitespace is stripped first.
        logger: Logger supplied by the ``get_logger`` dependency.

    Returns:
        JSONResponse with ``success``, ``message`` and the resulting
        ``key_count``.

    Raises:
        HTTPException: 400 if the key is empty, 404 if it is not in
            ``auth_utils.API_KEYS``; 500 (detail is the exception text) if
            rewriting the file or reloading the keys fails.
    """
    # Imported at call time, as in the original (presumably to sidestep a
    # circular import -- confirm before hoisting to module level).
    from .. import auth_utils

    key_value = request.key.strip()
    if not key_value:
        raise HTTPException(status_code=400, detail="API key cannot be empty.")

    auth_utils.initialize_keys()
    if key_value not in auth_utils.API_KEYS:
        raise HTTPException(status_code=404, detail="API key does not exist.")

    try:
        key_file_path = auth_utils.KEY_FILE_PATH
        # Read everything first, then rewrite the file without any line whose
        # stripped text equals the key (removes every occurrence).
        with open(key_file_path, "r", encoding="utf-8") as f:
            remaining = [line for line in f if line.strip() != key_value]
        with open(key_file_path, "w", encoding="utf-8") as f:
            f.writelines(remaining)

        auth_utils.initialize_keys()

        # Existing keys may be shorter than 9 characters, in which case a
        # fixed 4+4 preview would log the whole key. Reveal at most a quarter
        # of the key at each end (capped at 4 characters) instead.
        shown = min(4, len(key_value) // 4)
        masked = f"{key_value[:shown]}...{key_value[len(key_value) - shown:]}"
        logger.info(f"API key deleted: {masked}")

        return JSONResponse(
            content={
                "success": True,
                "message": "API key deleted successfully",
                "key_count": len(auth_utils.API_KEYS),
            }
        )
    except Exception as e:
        logger.error(f"Failed to delete API key: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e