File size: 3,772 Bytes
a5784e9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import logging

from fastapi import Depends, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel

from ..dependencies import get_logger


class ApiKeyRequest(BaseModel):
    """Request body carrying a single API key.

    Used as the body model of ``add_api_key`` and ``delete_api_key``.
    """

    # Raw key text as submitted; the handlers strip surrounding whitespace
    # before using it.
    key: str


class ApiKeyTestRequest(BaseModel):
    """Request body carrying the API key to be checked by ``test_api_key``."""

    # Raw key text as submitted; the handler strips surrounding whitespace
    # before verifying it.
    key: str


async def get_api_keys(logger: logging.Logger = Depends(get_logger)):
    """List the API keys currently held by ``auth_utils``.

    Calls ``auth_utils.initialize_keys()`` and then snapshots
    ``auth_utils.API_KEYS`` into a list.

    Args:
        logger: Logger injected through the ``get_logger`` dependency.

    Returns:
        A ``JSONResponse`` whose body has ``success``, ``keys`` (the
        snapshot) and ``total_count`` (its length).

    Raises:
        HTTPException: Status 500, with the error text as detail, when
            refreshing, reading or serialising the keys fails.
    """
    # Imported at call time rather than module level; presumably to avoid a
    # circular import -- TODO confirm.
    from .. import auth_utils

    try:
        auth_utils.initialize_keys()
        current_keys = [*auth_utils.API_KEYS]
        payload = {
            "success": True,
            "keys": current_keys,
            "total_count": len(current_keys),
        }
        return JSONResponse(content=payload)
    except Exception as exc:
        logger.error(f"Failed to get API key list: {exc}")
        raise HTTPException(status_code=500, detail=str(exc))


async def add_api_key(
    request: ApiKeyRequest, logger: logging.Logger = Depends(get_logger)
):
    """Append a new API key to the key file and refresh the loaded keys.

    The submitted key is stripped of surrounding whitespace, validated,
    checked against ``auth_utils.API_KEYS`` for duplicates, appended to
    ``auth_utils.KEY_FILE_PATH`` on its own line, and then
    ``auth_utils.initialize_keys()`` is called again.

    Args:
        request: Body containing the key to add.
        logger: Logger injected through the ``get_logger`` dependency.

    Returns:
        A ``JSONResponse`` with ``success``, ``message`` and ``key_count``
        (the size of ``auth_utils.API_KEYS`` after the refresh).

    Raises:
        HTTPException: 400 when the key is shorter than 8 characters,
            contains a line break, or already exists; 500 (error text as
            detail) when writing the file or refreshing the keys fails.
    """
    # Imported at call time rather than module level; presumably to avoid a
    # circular import -- TODO confirm.
    from .. import auth_utils

    key_value = request.key.strip()
    # The key file holds one key per line, so a key with an embedded line
    # break would be stored as several separate keys. Reject it up front.
    if len(key_value) < 8 or "\n" in key_value or "\r" in key_value:
        raise HTTPException(status_code=400, detail="Invalid API key format.")

    auth_utils.initialize_keys()
    if key_value in auth_utils.API_KEYS:
        raise HTTPException(status_code=400, detail="API key already exists.")

    try:
        key_file_path = auth_utils.KEY_FILE_PATH
        with open(key_file_path, "a+", encoding="utf-8") as f:
            # "a+" positions at the end; rewind to inspect what is there.
            # Writes in append mode still land at the end of the file.
            f.seek(0)
            existing = f.read()
            # Only add a separator when the last line is unterminated;
            # otherwise a blank line would be inserted into the key file.
            if existing and not existing.endswith("\n"):
                f.write("\n")
            f.write(key_value)

        auth_utils.initialize_keys()
        # Reveal at most a quarter of the key at each end (max 4 chars), so
        # short keys are never written to the log in full. Keys of 16+
        # characters keep the familiar "abcd...wxyz" form.
        shown = min(4, len(key_value) // 4)
        masked_key = f"{key_value[:shown]}...{key_value[len(key_value) - shown:]}"
        logger.info(f"API key added: {masked_key}")
        return JSONResponse(
            content={
                "success": True,
                "message": "API key added successfully",
                "key_count": len(auth_utils.API_KEYS),
            }
        )
    except Exception as e:
        logger.error(f"Failed to add API key: {e}")
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=500, detail=str(e)) from e


async def test_api_key(
    request: ApiKeyTestRequest, logger: logging.Logger = Depends(get_logger)
):
    """Report whether the submitted API key passes verification.

    The key is stripped of surrounding whitespace, the key set is refreshed
    with ``auth_utils.initialize_keys()``, and the verdict comes from
    ``auth_utils.verify_api_key``.

    Args:
        request: Body containing the key to check.
        logger: Logger injected through the ``get_logger`` dependency.

    Returns:
        A ``JSONResponse`` with ``success`` (always ``True``), ``valid``
        (the verification result) and a human-readable ``message``.

    Raises:
        HTTPException: 400 when the key is empty after stripping.
    """
    # Imported at call time rather than module level; presumably to avoid a
    # circular import -- TODO confirm.
    from .. import auth_utils

    key_value = request.key.strip()
    if not key_value:
        raise HTTPException(status_code=400, detail="API key cannot be empty.")

    auth_utils.initialize_keys()
    is_valid = auth_utils.verify_api_key(key_value)
    # Any non-empty key is accepted here, so a fixed "first 4 ... last 4"
    # mask would log short keys in full. Reveal at most a quarter of the key
    # at each end (max 4 chars); 16+ character keys look as before.
    shown = min(4, len(key_value) // 4)
    masked_key = f"{key_value[:shown]}...{key_value[len(key_value) - shown:]}"
    logger.info(
        f"API key test: {masked_key} - {'Valid' if is_valid else 'Invalid'}"
    )
    return JSONResponse(
        content={
            "success": True,
            "valid": is_valid,
            "message": "Key valid" if is_valid else "Key invalid or non-existent",
        }
    )


async def delete_api_key(
    request: ApiKeyRequest, logger: logging.Logger = Depends(get_logger)
):
    """Remove an API key from the key file and refresh the loaded keys.

    The key is stripped of surrounding whitespace and must be present in
    ``auth_utils.API_KEYS``. Every line of ``auth_utils.KEY_FILE_PATH``
    whose stripped text equals the key is dropped, the file is rewritten,
    and ``auth_utils.initialize_keys()`` is called again.

    Args:
        request: Body containing the key to delete.
        logger: Logger injected through the ``get_logger`` dependency.

    Returns:
        A ``JSONResponse`` with ``success``, ``message`` and ``key_count``
        (the size of ``auth_utils.API_KEYS`` after the refresh).

    Raises:
        HTTPException: 400 when the key is empty after stripping; 404 when
            it is not a known key; 500 (error text as detail) when reading
            or rewriting the file, or refreshing the keys, fails.
    """
    # Imported at call time rather than module level; presumably to avoid a
    # circular import -- TODO confirm.
    from .. import auth_utils

    key_value = request.key.strip()
    if not key_value:
        raise HTTPException(status_code=400, detail="API key cannot be empty.")

    auth_utils.initialize_keys()
    if key_value not in auth_utils.API_KEYS:
        raise HTTPException(status_code=404, detail="API key does not exist.")

    try:
        key_file_path = auth_utils.KEY_FILE_PATH
        with open(key_file_path, "r", encoding="utf-8") as f:
            lines = f.readlines()

        # Rewritten in place (not via a temp file + rename) so the key file
        # keeps its identity on disk.
        with open(key_file_path, "w", encoding="utf-8") as f:
            f.writelines(line for line in lines if line.strip() != key_value)

        auth_utils.initialize_keys()
        # Reveal at most a quarter of the key at each end (max 4 chars), so
        # short keys are never written to the log in full. Keys of 16+
        # characters keep the familiar "abcd...wxyz" form.
        shown = min(4, len(key_value) // 4)
        masked_key = f"{key_value[:shown]}...{key_value[len(key_value) - shown:]}"
        logger.info(f"API key deleted: {masked_key}")
        return JSONResponse(
            content={
                "success": True,
                "message": "API key deleted successfully",
                "key_count": len(auth_utils.API_KEYS),
            }
        )
    except Exception as e:
        logger.error(f"Failed to delete API key: {e}")
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=500, detail=str(e)) from e