| import secrets |
| import hashlib |
| import threading |
| from datetime import datetime, timezone |
| from typing import Dict, Optional |
| from pathlib import Path |
| import json |
|
|
class CorpusDBAPIKeyManager:
    """
    Manages CorpusDB API keys, persisted as JSON in ``<storage_path>/keys.json``.

    Credential formats:
    - API Key: ``cDb_`` followed by 40 URL-safe characters (44 chars total)
    - Secret Key: ``NNNN-NNNN-NNNN`` (three zero-padded 4-digit groups,
      14 chars including dashes). Only its SHA-256 hex digest is stored.

    After construction, every read or write of ``self.keys`` made by this
    class happens while holding ``self._lock`` (a non-reentrant
    ``threading.Lock``), so private helpers that expect the lock must never
    try to acquire it themselves.
    """

    _API_KEY_PREFIX = "cDb_"
    _SECRET_GROUPS = 3
    _SECRET_GROUP_DIGITS = 4

    def __init__(self, storage_path: str = "app_data/api_keys"):
        """Create the storage directory if needed and load any saved keys."""
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)
        self.keys_file = self.storage_path / "keys.json"
        self._lock = threading.Lock()
        self._load_keys()

    def _load_keys(self):
        """
        Load API keys from storage into ``self.keys``.

        A missing file yields an empty mapping. A malformed file raises
        (``json.JSONDecodeError``) rather than being silently replaced, so a
        later save cannot overwrite keys that merely failed to parse.
        """
        if self.keys_file.exists():
            with open(self.keys_file, "r", encoding="utf-8") as f:
                self.keys = json.load(f)
        else:
            self.keys = {}

    def _save_keys(self):
        """Save API keys to storage (caller must hold self._lock)"""
        # Write to a sibling temp file, then rename over the real file so a
        # reader never observes a half-written keys.json.
        tmp = self.keys_file.with_suffix(".tmp")
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(self.keys, f, indent=2)
        tmp.replace(self.keys_file)

    @classmethod
    def _new_secret(cls) -> str:
        """Return a fresh random secret in ``NNNN-NNNN-NNNN`` format."""
        modulus = 10 ** cls._SECRET_GROUP_DIGITS
        return "-".join(
            str(secrets.randbelow(modulus)).zfill(cls._SECRET_GROUP_DIGITS)
            for _ in range(cls._SECRET_GROUPS)
        )

    @staticmethod
    def _hash_secret(secret_key: str) -> str:
        """Return the SHA-256 hex digest under which a secret is stored."""
        # NOTE(review): unsalted SHA-256 over a 10**12 secret space can be
        # brute-forced offline if keys.json leaks. Switching to a salted KDF
        # would invalidate every stored hash, so the scheme is left as-is.
        return hashlib.sha256(secret_key.encode()).hexdigest()

    def generate_api_key(self, user_id: str, username: str, workspace_id: str) -> Dict[str, str]:
        """
        Generate new API key and secret for a user.

        Returns a dict with ``api_key`` (cDb_ prefix), ``secret_key``
        (NNNN-NNNN-NNNN format, shown once: only its hash is persisted),
        ``user_id``, ``username``, ``workspace_id`` and ``created_at``
        (UTC ISO-8601 timestamp).
        """
        # 30 random bytes encode to exactly 40 URL-safe base64 characters;
        # the slice pins the documented length.
        random_part = secrets.token_urlsafe(30)[:40]
        api_key = f"{self._API_KEY_PREFIX}{random_part}"
        secret_key = self._new_secret()

        key_data = {
            'api_key': api_key,
            'secret_hash': self._hash_secret(secret_key),
            'user_id': user_id,
            'username': username,
            'workspace_id': workspace_id,
            'created_at': datetime.now(timezone.utc).isoformat(),
            'last_used': None,
            'usage_count': 0,
            'active': True,
        }

        with self._lock:
            self.keys[api_key] = key_data
            self._save_keys()

        return {
            'api_key': api_key,
            'secret_key': secret_key,
            'user_id': user_id,
            'username': username,
            'workspace_id': workspace_id,
            'created_at': key_data['created_at'],
        }

    def verify_api_key(self, api_key: str, secret_key: str) -> Optional[Dict]:
        """
        Verify API key and secret combination.

        On success, records ``last_used`` / ``usage_count`` and persists them.

        Returns user info (``user_id``, ``username``, ``workspace_id``) if
        valid; None if the key is malformed, unknown, inactive, or the secret
        does not match (including a missing / non-string secret).
        """
        if not isinstance(api_key, str) or not api_key.startswith(self._API_KEY_PREFIX):
            return None
        if not isinstance(secret_key, str):
            return None

        # Hash outside the lock; only the lookup/update needs serialising.
        candidate = self._hash_secret(secret_key).encode("ascii")

        # Lookup, checks and the usage update share one critical section so a
        # concurrent revoke/delete cannot slip in between check and update.
        with self._lock:
            key_data = self.keys.get(api_key)
            if key_data is None or not key_data.get('active'):
                return None

            stored = str(key_data.get('secret_hash', '')).encode("utf-8")
            # Constant-time comparison avoids leaking digest prefixes via timing.
            if not secrets.compare_digest(candidate, stored):
                return None

            key_data['last_used'] = datetime.now(timezone.utc).isoformat()
            key_data['usage_count'] = key_data.get('usage_count', 0) + 1
            self._save_keys()

            return {
                'user_id': key_data['user_id'],
                'username': key_data['username'],
                'workspace_id': key_data['workspace_id'],
            }

    def get_user_keys(self, user_id: str) -> list:
        """
        Get all API keys for a user.

        Returns a list of dicts with ``api_key``, ``created_at``,
        ``last_used``, ``usage_count`` and ``active``; secret hashes are
        never included.
        """
        # Snapshot under the lock: iterating while another thread inserts or
        # deletes a key would raise RuntimeError.
        with self._lock:
            return [
                {
                    'api_key': api_key,
                    'created_at': data['created_at'],
                    'last_used': data['last_used'],
                    'usage_count': data['usage_count'],
                    'active': data['active'],
                }
                for api_key, data in self.keys.items()
                if data['user_id'] == user_id
            ]

    def revoke_key(self, api_key: str) -> bool:
        """Revoke an API key (mark inactive). Returns False if it is unknown."""
        with self._lock:
            if api_key not in self.keys:
                return False
            self.keys[api_key]['active'] = False
            self._save_keys()
            return True

    def delete_key(self, api_key: str) -> bool:
        """Permanently delete an API key. Returns False if it is unknown."""
        with self._lock:
            if api_key not in self.keys:
                return False
            del self.keys[api_key]
            self._save_keys()
            return True

    def regenerate_secret(self, api_key: str) -> Optional[str]:
        """
        Regenerate secret key for an existing API key.

        Returns the new plaintext secret (the previous one stops verifying),
        or None if the API key is unknown. The key's ``active`` flag is not
        changed.
        """
        with self._lock:
            if api_key not in self.keys:
                return None
            secret_key = self._new_secret()
            self.keys[api_key]['secret_hash'] = self._hash_secret(secret_key)
            self._save_keys()
            return secret_key
|
|
| |
# Shared module-level manager instance, built with the default storage path
# ("app_data/api_keys") when this module is imported.
corpusdb_api_manager: CorpusDBAPIKeyManager = CorpusDBAPIKeyManager()
|
|