corpusdb / app /db_api_key_manager.py
mrsavage1's picture
Upload 52 files
083b511 verified
import secrets
import hashlib
from typing import Optional, Dict, Tuple
from app.utils import read_json, write_json
from pathlib import Path
class DatabaseAPIKeyManager:
"""Manage per-database API keys with secret key authentication"""
def __init__(self):
pass
def _generate_pair(self) -> Tuple[str, str, str]:
"""Generate api_key (CDB- prefix), secret (NNNN-NNNN-NNNN numeric), and secret_hash."""
api_key = f"CDB-{secrets.token_urlsafe(24)[:32]}"
secret_key = '-'.join(str(secrets.randbelow(10000)).zfill(4) for _ in range(3))
secret_hash = hashlib.sha256(secret_key.encode()).hexdigest()
return api_key, secret_key, secret_hash
def generate_db_api_key(self, workspace_id: str, database: str) -> str:
"""Legacy single-value generator — use generate_db_api_key_with_secret for new keys"""
return f"CDB-{secrets.token_urlsafe(24)[:32]}"
def store_db_api_key(self, user_store, database: str, api_key: str, secret_hash: str = '', workspace_id: str = '') -> Dict:
"""Store database API key and secret hash"""
keys_file = user_store.local("metadata", "db_api_keys.json")
keys = read_json(keys_file, {})
keys[database] = {
'api_key': api_key,
'secret_hash': secret_hash,
'workspace_id': workspace_id,
'database': database,
'created_at': self._utc_now(),
'last_used': None,
'usage_count': 0
}
write_json(keys_file, keys)
user_store.upload_file(keys_file, "metadata/db_api_keys.json", f"Create API key for {database}")
return keys[database]
def create_db_api_key(self, user_store, database: str, workspace_id: str = '') -> Dict:
"""Generate, store, and return api_key + secret_key (plaintext shown once)"""
api_key, secret_key, secret_hash = self._generate_pair()
self.store_db_api_key(user_store, database, api_key, secret_hash, workspace_id)
return {'api_key': api_key, 'secret_key': secret_key}
def get_db_api_key(self, user_store, database: str) -> Optional[str]:
"""Get API key for a database"""
keys_file = user_store.local("metadata", "db_api_keys.json")
keys = read_json(keys_file, {})
if database in keys:
return keys[database]['api_key']
return None
def verify_db_api_key(self, user_store, api_key: str, secret_key: str) -> Optional[Dict]:
"""Verify database API key + secret. Returns {database, workspace_id} or None."""
keys_file = user_store.local("metadata", "db_api_keys.json")
keys = read_json(keys_file, {})
secret_hash = hashlib.sha256(secret_key.encode()).hexdigest()
for database, key_info in keys.items():
if key_info['api_key'] != api_key:
continue
stored_hash = key_info.get('secret_hash', '')
if stored_hash and stored_hash != secret_hash:
continue
key_info['last_used'] = self._utc_now()
key_info['usage_count'] = key_info.get('usage_count', 0) + 1
write_json(keys_file, keys)
return {'database': database, 'workspace_id': key_info.get('workspace_id', '')}
return None
def regenerate_db_api_key(self, user_store, workspace_id: str, database: str) -> Dict:
"""Regenerate API key + secret for a database. Returns new api_key + secret_key (shown once)"""
api_key, secret_key, secret_hash = self._generate_pair()
self.store_db_api_key(user_store, database, api_key, secret_hash)
return {'api_key': api_key, 'secret_key': secret_key}
def revoke_db_api_key(self, user_store, database: str) -> bool:
"""Revoke API key for a database"""
keys_file = user_store.local("metadata", "db_api_keys.json")
keys = read_json(keys_file, {})
if database in keys:
del keys[database]
write_json(keys_file, keys)
user_store.upload_file(keys_file, "metadata/db_api_keys.json", f"Revoke API key for {database}")
return True
return False
def list_db_api_keys(self, user_store) -> Dict:
"""List all database API keys"""
keys_file = user_store.local("metadata", "db_api_keys.json")
return read_json(keys_file, {})
def get_connection_string(self, base_url: str, database: str, api_key: str) -> Dict:
"""Generate connection string for a database"""
return {
'database': database,
'api_endpoint': f"{base_url}/api/db/{database}",
'api_key': api_key,
'websocket': f"{base_url.replace('http', 'ws')}/ws?db={database}&key={api_key}",
'examples': {
'python': f'''import requests
BASE_URL = "{base_url}/api/db/{database}"
HEADERS = {{"X-Database-Key": "{api_key}"}}
# Query data
response = requests.get(f"{{BASE_URL}}/query",
params={{"sql": "SELECT * FROM my_table LIMIT 10"}},
headers=HEADERS)
data = response.json()
# Insert data
response = requests.post(f"{{BASE_URL}}/insert",
json={{"table": "my_table", "data": {{"id": 1, "name": "John"}}}},
headers=HEADERS)
''',
'javascript': f'''const BASE_URL = '{base_url}/api/db/{database}';
const HEADERS = {{ 'X-Database-Key': '{api_key}' }};
// Query data
fetch(`${{BASE_URL}}/query?sql=SELECT * FROM my_table LIMIT 10`, {{
headers: HEADERS
}})
.then(res => res.json())
.then(data => console.log(data));
// Insert data
fetch(`${{BASE_URL}}/insert`, {{
method: 'POST',
headers: {{ ...HEADERS, 'Content-Type': 'application/json' }},
body: JSON.stringify({{
table: 'my_table',
data: {{ id: 1, name: 'John' }}
}})
}});
''',
'curl': f'''# Query data
curl "{base_url}/api/db/{database}/query?sql=SELECT%20*%20FROM%20my_table%20LIMIT%2010" \\
-H "X-Database-Key: {api_key}"
# Insert data
curl -X POST "{base_url}/api/db/{database}/insert" \\
-H "X-Database-Key: {api_key}" \\
-H "Content-Type: application/json" \\
-d '{{"table": "my_table", "data": {{"id": 1, "name": "John"}}}}'
'''
}
}
def _utc_now(self):
from datetime import datetime
return datetime.utcnow().isoformat()
db_api_key_manager = DatabaseAPIKeyManager()