import secrets import hashlib from typing import Optional, Dict, Tuple from app.utils import read_json, write_json from pathlib import Path class DatabaseAPIKeyManager: """Manage per-database API keys with secret key authentication""" def __init__(self): pass def _generate_pair(self) -> Tuple[str, str, str]: """Generate api_key (CDB- prefix), secret (NNNN-NNNN-NNNN numeric), and secret_hash.""" api_key = f"CDB-{secrets.token_urlsafe(24)[:32]}" secret_key = '-'.join(str(secrets.randbelow(10000)).zfill(4) for _ in range(3)) secret_hash = hashlib.sha256(secret_key.encode()).hexdigest() return api_key, secret_key, secret_hash def generate_db_api_key(self, workspace_id: str, database: str) -> str: """Legacy single-value generator — use generate_db_api_key_with_secret for new keys""" return f"CDB-{secrets.token_urlsafe(24)[:32]}" def store_db_api_key(self, user_store, database: str, api_key: str, secret_hash: str = '', workspace_id: str = '') -> Dict: """Store database API key and secret hash""" keys_file = user_store.local("metadata", "db_api_keys.json") keys = read_json(keys_file, {}) keys[database] = { 'api_key': api_key, 'secret_hash': secret_hash, 'workspace_id': workspace_id, 'database': database, 'created_at': self._utc_now(), 'last_used': None, 'usage_count': 0 } write_json(keys_file, keys) user_store.upload_file(keys_file, "metadata/db_api_keys.json", f"Create API key for {database}") return keys[database] def create_db_api_key(self, user_store, database: str, workspace_id: str = '') -> Dict: """Generate, store, and return api_key + secret_key (plaintext shown once)""" api_key, secret_key, secret_hash = self._generate_pair() self.store_db_api_key(user_store, database, api_key, secret_hash, workspace_id) return {'api_key': api_key, 'secret_key': secret_key} def get_db_api_key(self, user_store, database: str) -> Optional[str]: """Get API key for a database""" keys_file = user_store.local("metadata", "db_api_keys.json") keys = read_json(keys_file, {}) if database in keys: return keys[database]['api_key'] return None def verify_db_api_key(self, user_store, api_key: str, secret_key: str) -> Optional[Dict]: """Verify database API key + secret. Returns {database, workspace_id} or None.""" keys_file = user_store.local("metadata", "db_api_keys.json") keys = read_json(keys_file, {}) secret_hash = hashlib.sha256(secret_key.encode()).hexdigest() for database, key_info in keys.items(): if key_info['api_key'] != api_key: continue stored_hash = key_info.get('secret_hash', '') if stored_hash and stored_hash != secret_hash: continue key_info['last_used'] = self._utc_now() key_info['usage_count'] = key_info.get('usage_count', 0) + 1 write_json(keys_file, keys) return {'database': database, 'workspace_id': key_info.get('workspace_id', '')} return None def regenerate_db_api_key(self, user_store, workspace_id: str, database: str) -> Dict: """Regenerate API key + secret for a database. Returns new api_key + secret_key (shown once)""" api_key, secret_key, secret_hash = self._generate_pair() self.store_db_api_key(user_store, database, api_key, secret_hash) return {'api_key': api_key, 'secret_key': secret_key} def revoke_db_api_key(self, user_store, database: str) -> bool: """Revoke API key for a database""" keys_file = user_store.local("metadata", "db_api_keys.json") keys = read_json(keys_file, {}) if database in keys: del keys[database] write_json(keys_file, keys) user_store.upload_file(keys_file, "metadata/db_api_keys.json", f"Revoke API key for {database}") return True return False def list_db_api_keys(self, user_store) -> Dict: """List all database API keys""" keys_file = user_store.local("metadata", "db_api_keys.json") return read_json(keys_file, {}) def get_connection_string(self, base_url: str, database: str, api_key: str) -> Dict: """Generate connection string for a database""" return { 'database': database, 'api_endpoint': f"{base_url}/api/db/{database}", 'api_key': api_key, 'websocket': f"{base_url.replace('http', 'ws')}/ws?db={database}&key={api_key}", 'examples': { 'python': f'''import requests BASE_URL = "{base_url}/api/db/{database}" HEADERS = {{"X-Database-Key": "{api_key}"}} # Query data response = requests.get(f"{{BASE_URL}}/query", params={{"sql": "SELECT * FROM my_table LIMIT 10"}}, headers=HEADERS) data = response.json() # Insert data response = requests.post(f"{{BASE_URL}}/insert", json={{"table": "my_table", "data": {{"id": 1, "name": "John"}}}}, headers=HEADERS) ''', 'javascript': f'''const BASE_URL = '{base_url}/api/db/{database}'; const HEADERS = {{ 'X-Database-Key': '{api_key}' }}; // Query data fetch(`${{BASE_URL}}/query?sql=SELECT * FROM my_table LIMIT 10`, {{ headers: HEADERS }}) .then(res => res.json()) .then(data => console.log(data)); // Insert data fetch(`${{BASE_URL}}/insert`, {{ method: 'POST', headers: {{ ...HEADERS, 'Content-Type': 'application/json' }}, body: JSON.stringify({{ table: 'my_table', data: {{ id: 1, name: 'John' }} }}) }}); ''', 'curl': f'''# Query data curl "{base_url}/api/db/{database}/query?sql=SELECT%20*%20FROM%20my_table%20LIMIT%2010" \\ -H "X-Database-Key: {api_key}" # Insert data curl -X POST "{base_url}/api/db/{database}/insert" \\ -H "X-Database-Key: {api_key}" \\ -H "Content-Type: application/json" \\ -d '{{"table": "my_table", "data": {{"id": 1, "name": "John"}}}}' ''' } } def _utc_now(self): from datetime import datetime return datetime.utcnow().isoformat() db_api_key_manager = DatabaseAPIKeyManager()