| import secrets | |
| import hashlib | |
| from typing import Optional, Dict, Tuple | |
| from app.utils import read_json, write_json | |
| from pathlib import Path | |
class DatabaseAPIKeyManager:
    """Manage per-database API keys with secret key authentication.

    Key records are kept in a JSON file whose local path comes from
    ``user_store.local("metadata", "db_api_keys.json")``. The file maps a
    database name to a record dict with the keys ``api_key``,
    ``secret_hash``, ``workspace_id``, ``database``, ``created_at``,
    ``last_used`` and ``usage_count``. Storing and revoking a key also push
    the file through ``user_store.upload_file``.
    """

    def __init__(self):
        """Create a manager; no per-instance state is kept."""

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _new_api_key() -> str:
        """Return a fresh random API key carrying the ``CDB-`` prefix."""
        return f"CDB-{secrets.token_urlsafe(24)[:32]}"

    @staticmethod
    def _hash_secret(secret_key: Optional[str]) -> str:
        """Return the SHA-256 hex digest of ``secret_key`` (``None`` hashes as '')."""
        return hashlib.sha256((secret_key or '').encode()).hexdigest()

    @staticmethod
    def _keys_file(user_store):
        """Return the local path of the key registry file for ``user_store``."""
        return user_store.local("metadata", "db_api_keys.json")

    def _generate_pair(self) -> Tuple[str, str, str]:
        """Generate api_key (CDB- prefix), secret (NNNN-NNNN-NNNN numeric), and secret_hash.

        Returns:
            ``(api_key, secret_key, secret_hash)`` where ``secret_hash`` is the
            SHA-256 hex digest of ``secret_key``.
        """
        api_key = self._new_api_key()
        # Three zero-padded 4-digit groups; the format is part of the contract.
        secret_key = '-'.join(f"{secrets.randbelow(10000):04d}" for _ in range(3))
        return api_key, secret_key, self._hash_secret(secret_key)

    # ------------------------------------------------------------------
    # Key lifecycle
    # ------------------------------------------------------------------
    def generate_db_api_key(self, workspace_id: str, database: str) -> str:
        """Legacy single-value generator — use create_db_api_key for new keys.

        ``workspace_id`` and ``database`` are accepted for compatibility only;
        they do not influence the generated value.
        """
        return self._new_api_key()

    def store_db_api_key(self, user_store, database: str, api_key: str,
                         secret_hash: str = '', workspace_id: str = '') -> Dict:
        """Store database API key and secret hash.

        Overwrites any existing record for ``database``, resets its usage
        statistics, writes the registry file and uploads it.

        Returns:
            The record dict that was stored.
        """
        keys_file = self._keys_file(user_store)
        keys = read_json(keys_file, {})
        record = {
            'api_key': api_key,
            'secret_hash': secret_hash,
            'workspace_id': workspace_id,
            'database': database,
            'created_at': self._utc_now(),
            'last_used': None,
            'usage_count': 0,
        }
        keys[database] = record
        write_json(keys_file, keys)
        user_store.upload_file(keys_file, "metadata/db_api_keys.json", f"Create API key for {database}")
        return record

    def create_db_api_key(self, user_store, database: str, workspace_id: str = '') -> Dict:
        """Generate, store, and return api_key + secret_key (plaintext shown once).

        Only the hash of the secret is persisted; the plaintext exists solely
        in the returned dict.
        """
        api_key, secret_key, secret_hash = self._generate_pair()
        self.store_db_api_key(user_store, database, api_key, secret_hash, workspace_id)
        return {'api_key': api_key, 'secret_key': secret_key}

    def get_db_api_key(self, user_store, database: str) -> Optional[str]:
        """Return the stored API key for ``database``, or ``None`` if there is none."""
        keys = read_json(self._keys_file(user_store), {})
        record = keys.get(database)
        if not record:
            return None
        return record.get('api_key')

    def verify_db_api_key(self, user_store, api_key: str, secret_key: str) -> Optional[Dict]:
        """Verify database API key + secret. Returns {database, workspace_id} or None.

        On success the record's ``last_used`` / ``usage_count`` are updated in
        the local registry file (the file is not uploaded here).

        NOTE(review): a record whose ``secret_hash`` is empty is accepted on
        the API key alone. This matches the original behaviour and looks like
        support for legacy keys stored without a secret — confirm it is still
        wanted.
        """
        import hmac

        # An empty key must never match anything, whatever the registry holds.
        if not api_key:
            return None

        keys_file = self._keys_file(user_store)
        keys = read_json(keys_file, {})
        candidate_hash = self._hash_secret(secret_key)

        for database, key_info in keys.items():
            if key_info.get('api_key') != api_key:
                continue
            stored_hash = key_info.get('secret_hash') or ''
            # Constant-time comparison so response timing does not leak how
            # much of the digest matched.
            if stored_hash and not hmac.compare_digest(
                str(stored_hash).encode(), candidate_hash.encode()
            ):
                continue
            key_info['last_used'] = self._utc_now()
            key_info['usage_count'] = key_info.get('usage_count', 0) + 1
            write_json(keys_file, keys)
            return {'database': database, 'workspace_id': key_info.get('workspace_id', '')}
        return None

    def regenerate_db_api_key(self, user_store, workspace_id: str, database: str) -> Dict:
        """Regenerate API key + secret for a database. Returns new api_key + secret_key (shown once).

        The previous record for ``database`` is replaced. ``workspace_id`` is
        stored on the new record so later verifications keep reporting it.
        """
        api_key, secret_key, secret_hash = self._generate_pair()
        self.store_db_api_key(user_store, database, api_key, secret_hash, workspace_id)
        return {'api_key': api_key, 'secret_key': secret_key}

    def revoke_db_api_key(self, user_store, database: str) -> bool:
        """Revoke API key for a database.

        Returns:
            ``True`` if a record existed and was removed (the registry is
            rewritten and uploaded), ``False`` if there was nothing to revoke.
        """
        keys_file = self._keys_file(user_store)
        keys = read_json(keys_file, {})
        if database not in keys:
            return False
        del keys[database]
        write_json(keys_file, keys)
        user_store.upload_file(keys_file, "metadata/db_api_keys.json", f"Revoke API key for {database}")
        return True

    def list_db_api_keys(self, user_store) -> Dict:
        """List all database API keys.

        Returns the registry content as stored, i.e. full records including
        ``secret_hash``.
        """
        return read_json(self._keys_file(user_store), {})

    # ------------------------------------------------------------------
    # Client-facing connection info
    # ------------------------------------------------------------------
    def get_connection_string(self, base_url: str, database: str, api_key: str) -> Dict:
        """Generate connection string for a database.

        Returns:
            A dict with ``database``, ``api_endpoint``, ``api_key``,
            ``websocket`` and ``examples`` (ready-to-paste ``python``,
            ``javascript`` and ``curl`` snippets).
        """
        # Swap only the leading scheme (http -> ws, https -> wss). A blanket
        # str.replace would also rewrite "http" inside the host or path.
        if base_url.startswith('http'):
            ws_base = 'ws' + base_url[len('http'):]
        else:
            ws_base = base_url

        python_example = f'''import requests
BASE_URL = "{base_url}/api/db/{database}"
HEADERS = {{"X-Database-Key": "{api_key}"}}
# Query data
response = requests.get(f"{{BASE_URL}}/query",
params={{"sql": "SELECT * FROM my_table LIMIT 10"}},
headers=HEADERS)
data = response.json()
# Insert data
response = requests.post(f"{{BASE_URL}}/insert",
json={{"table": "my_table", "data": {{"id": 1, "name": "John"}}}},
headers=HEADERS)
'''

        javascript_example = f'''const BASE_URL = '{base_url}/api/db/{database}';
const HEADERS = {{ 'X-Database-Key': '{api_key}' }};
// Query data
fetch(`${{BASE_URL}}/query?sql=SELECT * FROM my_table LIMIT 10`, {{
headers: HEADERS
}})
.then(res => res.json())
.then(data => console.log(data));
// Insert data
fetch(`${{BASE_URL}}/insert`, {{
method: 'POST',
headers: {{ ...HEADERS, 'Content-Type': 'application/json' }},
body: JSON.stringify({{
table: 'my_table',
data: {{ id: 1, name: 'John' }}
}})
}});
'''

        curl_example = f'''# Query data
curl "{base_url}/api/db/{database}/query?sql=SELECT%20*%20FROM%20my_table%20LIMIT%2010" \\
-H "X-Database-Key: {api_key}"
# Insert data
curl -X POST "{base_url}/api/db/{database}/insert" \\
-H "X-Database-Key: {api_key}" \\
-H "Content-Type: application/json" \\
-d '{{"table": "my_table", "data": {{"id": 1, "name": "John"}}}}'
'''

        return {
            'database': database,
            'api_endpoint': f"{base_url}/api/db/{database}",
            'api_key': api_key,
            'websocket': f"{ws_base}/ws?db={database}&key={api_key}",
            'examples': {
                'python': python_example,
                'javascript': javascript_example,
                'curl': curl_example,
            },
        }

    def _utc_now(self) -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        ``datetime.utcnow()`` is deprecated; an aware "now" is taken instead
        and its tzinfo dropped so the stored format (no UTC offset suffix)
        stays the same as before.
        """
        from datetime import datetime, timezone
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
# Shared module-level manager instance, created once at import time.
db_api_key_manager = DatabaseAPIKeyManager()