chatgpt2api / services /api_key_service.py
tx1538's picture
Upload 179 files
9d7ddb9 verified
Raw
History Blame
7.63 kB
from __future__ import annotations
import hashlib
import json
import uuid
from datetime import datetime, timezone
from pathlib import Path
from threading import Lock
from typing import Any, Callable
from services.config import DATA_DIR
OPENAI_KEYS_FILE = DATA_DIR / "openai_keys.json"
def _now() -> str:
return datetime.now(timezone.utc).isoformat()
def _clean(value: object) -> str:
return str(value or "").strip()
def _key_hash(secret: str) -> str:
return hashlib.sha256(secret.encode("utf-8")).hexdigest()
def _key_hint(secret: str) -> str:
value = _clean(secret)
if len(value) <= 12:
return f"{value[:4]}..."
return f"{value[:7]}...{value[-4:]}"
def check_openai_key_with_models(secret: str) -> dict[str, Any]:
"""Read-only official OpenAI API key check via /v1/models."""
from curl_cffi import requests
from services.proxy_service import proxy_settings
session = requests.Session(**proxy_settings.build_session_kwargs(verify=True))
try:
response = session.get(
"https://api.openai.com/v1/models",
headers={"Authorization": f"Bearer {secret}", "Accept": "application/json"},
timeout=30,
)
finally:
session.close()
status_code = int(response.status_code)
try:
payload = response.json()
except Exception:
payload = {}
if status_code == 200:
models = payload.get("data") if isinstance(payload, dict) else []
model_ids = [
_clean(item.get("id"))
for item in models
if isinstance(item, dict) and _clean(item.get("id"))
]
return {
"status": "ok",
"http_status": status_code,
"models_count": len(model_ids),
"sample_models": model_ids[:8],
"last_error": None,
}
message = ""
if isinstance(payload, dict):
error = payload.get("error")
if isinstance(error, dict):
message = _clean(error.get("message") or error.get("code") or error.get("type"))
else:
message = _clean(payload.get("message"))
if not message:
message = response.text[:300]
if status_code == 401:
status = "invalid"
elif status_code == 429:
status = "rate_limited"
elif status_code == 403:
status = "forbidden"
else:
status = "error"
return {
"status": status,
"http_status": status_code,
"models_count": 0,
"sample_models": [],
"last_error": message or f"HTTP {status_code}",
}
class OpenAIKeyService:
def __init__(
self,
store_file: Path = OPENAI_KEYS_FILE,
checker: Callable[[str], dict[str, Any]] = check_openai_key_with_models,
) -> None:
self.store_file = store_file
self.checker = checker
self._lock = Lock()
self._items = self._load()
def _load(self) -> list[dict[str, Any]]:
try:
raw = json.loads(self.store_file.read_text(encoding="utf-8"))
except Exception:
return []
if not isinstance(raw, list):
return []
items = []
for item in raw:
if isinstance(item, dict) and _clean(item.get("key")):
normalized = self._normalize(item)
if normalized is not None:
items.append(normalized)
return items
def _save(self) -> None:
self.store_file.parent.mkdir(parents=True, exist_ok=True)
self.store_file.write_text(json.dumps(self._items, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
def _normalize(self, item: dict[str, Any]) -> dict[str, Any] | None:
secret = _clean(item.get("key"))
if not secret:
return None
created_at = _clean(item.get("created_at")) or _now()
return {
"id": _clean(item.get("id")) or uuid.uuid4().hex,
"name": _clean(item.get("name")) or "OpenAI API Key",
"key": secret,
"key_hash": _clean(item.get("key_hash")) or _key_hash(secret),
"key_hint": _key_hint(secret),
"status": _clean(item.get("status")) or "unchecked",
"http_status": item.get("http_status"),
"models_count": int(item.get("models_count") or 0),
"sample_models": item.get("sample_models") if isinstance(item.get("sample_models"), list) else [],
"last_error": item.get("last_error"),
"last_checked_at": _clean(item.get("last_checked_at")),
"created_at": created_at,
"updated_at": _clean(item.get("updated_at")) or created_at,
}
@staticmethod
def _sanitize(item: dict[str, Any]) -> dict[str, Any]:
return {
key: value
for key, value in item.items()
if key not in {"key", "key_hash"}
}
def list_keys(self) -> list[dict[str, Any]]:
with self._lock:
return [self._sanitize(dict(item)) for item in self._items]
def add_key(self, name: str, secret: str, *, check: bool = False) -> dict[str, Any]:
secret = _clean(secret)
if not secret:
raise ValueError("api key is required")
if not secret.startswith("sk-"):
raise ValueError("api key must start with sk-")
digest = _key_hash(secret)
with self._lock:
if any(item.get("key_hash") == digest for item in self._items):
raise ValueError("api key already exists")
item = self._normalize({"name": name, "key": secret})
if item is None:
raise ValueError("api key is required")
self._items.append(item)
self._save()
key_id = item["id"]
if check:
return self.check_key(key_id)
return self._sanitize(item)
def delete_key(self, key_id: str) -> bool:
key_id = _clean(key_id)
with self._lock:
before = len(self._items)
self._items = [item for item in self._items if item.get("id") != key_id]
removed = len(self._items) != before
if removed:
self._save()
return removed
def check_key(self, key_id: str) -> dict[str, Any]:
key_id = _clean(key_id)
with self._lock:
item = next((item for item in self._items if item.get("id") == key_id), None)
if item is None:
raise KeyError(key_id)
secret = str(item["key"])
try:
result = self.checker(secret)
except Exception as exc:
result = {"status": "error", "http_status": None, "models_count": 0, "sample_models": [], "last_error": str(exc)}
with self._lock:
item = next((item for item in self._items if item.get("id") == key_id), None)
if item is None:
raise KeyError(key_id)
item.update(
{
"status": _clean(result.get("status")) or "error",
"http_status": result.get("http_status"),
"models_count": int(result.get("models_count") or 0),
"sample_models": result.get("sample_models") if isinstance(result.get("sample_models"), list) else [],
"last_error": result.get("last_error"),
"last_checked_at": _now(),
"updated_at": _now(),
}
)
self._save()
return self._sanitize(dict(item))
openai_key_service = OpenAIKeyService()