File size: 2,776 Bytes
f775c99
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import secrets
import hashlib
import asyncio
from fastapi import APIRouter, Depends, HTTPException, Path
from pydantic import BaseModel, Field
from typing import List, Dict, Any, cast
from app.core.auth import get_current_user
from app.core.database import db

router = APIRouter()

# --- 1. STRICT SCHEMAS ---
class CreateKeyRequest(BaseModel):
    # Firewall: Prevent DB Overflow by capping the name length
    name: str = Field(..., min_length=1, max_length=64, description="Name identifier for the API key")

def generate_secure_key():
    """
    Generates a production-grade API key.
    Returns: (raw_key_for_user, hashed_key_for_db, key_hint_for_ui)
    """
    raw_secret = secrets.token_hex(24) 
    full_key = f"axm_live_{raw_secret}"
    key_hash = hashlib.sha256(full_key.encode()).hexdigest()
    key_hint = f"{full_key[:12]}...{full_key[-4:]}"
    return full_key, key_hash, key_hint

# --- 2. ASYNC ENDPOINTS ---

@router.post("/")
async def create_api_key(req: CreateKeyRequest, user_id: str = Depends(get_current_user)):
    """Generates a new API key. The raw key is returned ONLY ONCE."""
    if not db: raise HTTPException(503, "DB Offline")
    
    full_key, key_hash, key_hint = generate_secure_key()
    
    # Non-blocking DB Insert
    await asyncio.to_thread(
        lambda: db.table("api_keys").insert({
            "user_id": user_id,
            "name": req.name,
            "key_value": key_hash, 
            "key_hint": key_hint   
        }).execute()
    )
    
    return {
        "status": "success", 
        "name": req.name, 
        "key_value": full_key, 
        "message": "Please copy this key now. You will not be able to see it again."
    }

@router.get("/")
async def list_api_keys(user_id: str = Depends(get_current_user)):
    """Lists all active API keys using their secure hints asynchronously."""
    if not db: raise HTTPException(503, "DB Offline")
    
    res = await asyncio.to_thread(
        lambda: db.table("api_keys")
        .select("id, name, created_at, last_used_at, is_active, key_hint")
        .eq("user_id", user_id)
        .order("created_at", desc=True)
        .execute()
    )
    
    return {"keys": cast(List[Dict[str, Any]], res.data)}

@router.delete("/{key_id}")
async def revoke_api_key(
    key_id: str = Path(..., description="ID of the key to revoke"), 
    user_id: str = Depends(get_current_user)
):
    """Instantly revokes an API key (Sets is_active to False) without freezing the UI."""
    if not db: raise HTTPException(503, "DB Offline")
    
    await asyncio.to_thread(
        lambda: db.table("api_keys")
        .update({"is_active": False})
        .eq("id", key_id)
        .eq("user_id", user_id)
        .execute()
    )
    return {"status": "revoked", "id": key_id}