import secrets import hashlib import threading from datetime import datetime, timezone from typing import Dict, Optional from pathlib import Path import json class CorpusDBAPIKeyManager: """ Manages CorpusDB API keys with format: - API Key: cDb_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx (44 chars total) - Secret Key: xxx-xxx (7 chars with dash) """ def __init__(self, storage_path: str = "app_data/api_keys"): self.storage_path = Path(storage_path) self.storage_path.mkdir(parents=True, exist_ok=True) self.keys_file = self.storage_path / "keys.json" self._lock = threading.Lock() self._load_keys() def _load_keys(self): """Load API keys from storage""" if self.keys_file.exists(): with open(self.keys_file, 'r') as f: self.keys = json.load(f) else: self.keys = {} def _save_keys(self): """Save API keys to storage (caller must hold self._lock)""" tmp = self.keys_file.with_suffix('.tmp') with open(tmp, 'w') as f: json.dump(self.keys, f, indent=2) tmp.replace(self.keys_file) def generate_api_key(self, user_id: str, username: str, workspace_id: str) -> Dict[str, str]: """ Generate new API key and secret for a user. Returns api_key (cDb_ prefix) and secret (NNNN-NNNN-NNNN format, shown once). """ random_part = secrets.token_urlsafe(30)[:40] api_key = f"cDb_{random_part}" # Secret: 4-digit numeric groups NNNN-NNNN-NNNN secret_key = '-'.join(str(secrets.randbelow(10000)).zfill(4) for _ in range(3)) # Hash the secret for storage secret_hash = hashlib.sha256(secret_key.encode()).hexdigest() # Store key info key_data = { 'api_key': api_key, 'secret_hash': secret_hash, 'user_id': user_id, 'username': username, 'workspace_id': workspace_id, 'created_at': datetime.now(timezone.utc).isoformat(), 'last_used': None, 'usage_count': 0, 'active': True } with self._lock: self.keys[api_key] = key_data self._save_keys() # Return with actual secret (only time it's shown) return { 'api_key': api_key, 'secret_key': secret_key, 'user_id': user_id, 'username': username, 'workspace_id': workspace_id, 'created_at': key_data['created_at'] } def verify_api_key(self, api_key: str, secret_key: str) -> Optional[Dict]: """ Verify API key and secret combination Returns user info if valid, None if invalid """ if not api_key or not api_key.startswith('cDb_'): return None if api_key not in self.keys: return None key_data = self.keys[api_key] if not key_data.get('active'): return None # Verify secret secret_hash = hashlib.sha256(secret_key.encode()).hexdigest() if secret_hash != key_data['secret_hash']: return None # Update usage with self._lock: key_data['last_used'] = datetime.now(timezone.utc).isoformat() key_data['usage_count'] = key_data.get('usage_count', 0) + 1 self._save_keys() return { 'user_id': key_data['user_id'], 'username': key_data['username'], 'workspace_id': key_data['workspace_id'] } def get_user_keys(self, user_id: str) -> list: """Get all API keys for a user""" user_keys = [] for api_key, data in self.keys.items(): if data['user_id'] == user_id: user_keys.append({ 'api_key': api_key, 'created_at': data['created_at'], 'last_used': data['last_used'], 'usage_count': data['usage_count'], 'active': data['active'] }) return user_keys def revoke_key(self, api_key: str) -> bool: """Revoke an API key""" with self._lock: if api_key in self.keys: self.keys[api_key]['active'] = False self._save_keys() return True return False def delete_key(self, api_key: str) -> bool: """Permanently delete an API key""" with self._lock: if api_key in self.keys: del self.keys[api_key] self._save_keys() return True return False def regenerate_secret(self, api_key: str) -> Optional[str]: """Regenerate secret key for an existing API key""" with self._lock: if api_key not in self.keys: return None secret_key = '-'.join(str(secrets.randbelow(10000)).zfill(4) for _ in range(3)) secret_hash = hashlib.sha256(secret_key.encode()).hexdigest() self.keys[api_key]['secret_hash'] = secret_hash self._save_keys() return secret_key # Global instance corpusdb_api_manager = CorpusDBAPIKeyManager()