corpusdb / app /corpusdb_api_manager.py
mrsavage1's picture
Upload 52 files
083b511 verified
import secrets
import hashlib
import threading
from datetime import datetime, timezone
from typing import Dict, Optional
from pathlib import Path
import json
class CorpusDBAPIKeyManager:
"""
Manages CorpusDB API keys with format:
- API Key: cDb_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx (44 chars total)
- Secret Key: xxx-xxx (7 chars with dash)
"""
def __init__(self, storage_path: str = "app_data/api_keys"):
self.storage_path = Path(storage_path)
self.storage_path.mkdir(parents=True, exist_ok=True)
self.keys_file = self.storage_path / "keys.json"
self._lock = threading.Lock()
self._load_keys()
def _load_keys(self):
"""Load API keys from storage"""
if self.keys_file.exists():
with open(self.keys_file, 'r') as f:
self.keys = json.load(f)
else:
self.keys = {}
def _save_keys(self):
"""Save API keys to storage (caller must hold self._lock)"""
tmp = self.keys_file.with_suffix('.tmp')
with open(tmp, 'w') as f:
json.dump(self.keys, f, indent=2)
tmp.replace(self.keys_file)
def generate_api_key(self, user_id: str, username: str, workspace_id: str) -> Dict[str, str]:
"""
Generate new API key and secret for a user.
Returns api_key (cDb_ prefix) and secret (NNNN-NNNN-NNNN format, shown once).
"""
random_part = secrets.token_urlsafe(30)[:40]
api_key = f"cDb_{random_part}"
# Secret: 4-digit numeric groups NNNN-NNNN-NNNN
secret_key = '-'.join(str(secrets.randbelow(10000)).zfill(4) for _ in range(3))
# Hash the secret for storage
secret_hash = hashlib.sha256(secret_key.encode()).hexdigest()
# Store key info
key_data = {
'api_key': api_key,
'secret_hash': secret_hash,
'user_id': user_id,
'username': username,
'workspace_id': workspace_id,
'created_at': datetime.now(timezone.utc).isoformat(),
'last_used': None,
'usage_count': 0,
'active': True
}
with self._lock:
self.keys[api_key] = key_data
self._save_keys()
# Return with actual secret (only time it's shown)
return {
'api_key': api_key,
'secret_key': secret_key,
'user_id': user_id,
'username': username,
'workspace_id': workspace_id,
'created_at': key_data['created_at']
}
def verify_api_key(self, api_key: str, secret_key: str) -> Optional[Dict]:
"""
Verify API key and secret combination
Returns user info if valid, None if invalid
"""
if not api_key or not api_key.startswith('cDb_'):
return None
if api_key not in self.keys:
return None
key_data = self.keys[api_key]
if not key_data.get('active'):
return None
# Verify secret
secret_hash = hashlib.sha256(secret_key.encode()).hexdigest()
if secret_hash != key_data['secret_hash']:
return None
# Update usage
with self._lock:
key_data['last_used'] = datetime.now(timezone.utc).isoformat()
key_data['usage_count'] = key_data.get('usage_count', 0) + 1
self._save_keys()
return {
'user_id': key_data['user_id'],
'username': key_data['username'],
'workspace_id': key_data['workspace_id']
}
def get_user_keys(self, user_id: str) -> list:
"""Get all API keys for a user"""
user_keys = []
for api_key, data in self.keys.items():
if data['user_id'] == user_id:
user_keys.append({
'api_key': api_key,
'created_at': data['created_at'],
'last_used': data['last_used'],
'usage_count': data['usage_count'],
'active': data['active']
})
return user_keys
def revoke_key(self, api_key: str) -> bool:
"""Revoke an API key"""
with self._lock:
if api_key in self.keys:
self.keys[api_key]['active'] = False
self._save_keys()
return True
return False
def delete_key(self, api_key: str) -> bool:
"""Permanently delete an API key"""
with self._lock:
if api_key in self.keys:
del self.keys[api_key]
self._save_keys()
return True
return False
def regenerate_secret(self, api_key: str) -> Optional[str]:
"""Regenerate secret key for an existing API key"""
with self._lock:
if api_key not in self.keys:
return None
secret_key = '-'.join(str(secrets.randbelow(10000)).zfill(4) for _ in range(3))
secret_hash = hashlib.sha256(secret_key.encode()).hexdigest()
self.keys[api_key]['secret_hash'] = secret_hash
self._save_keys()
return secret_key
# Global instance
corpusdb_api_manager = CorpusDBAPIKeyManager()