# corpusdb/app/api.py
# mrsavage1's picture
# Upload 62 files
# d6b0096 verified
import json
import logging
import re
import tempfile
from pathlib import Path
from typing import Optional

from fastapi import APIRouter, UploadFile, File, Form, Depends, Response, Cookie, Request, WebSocket, WebSocketDisconnect, Header, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel

from app.metadata import metadata
from app.table_manager import table_manager
from app.row_manager import row_manager
from app.schema_manager import schema_manager
from app.import_export import import_export
from app.relation_manager import relation_manager
from app.privilege_manager import privilege_manager
from app.audit_manager import audit
from app.backup_manager import backup_manager
from app.sample_data import init_sample_data
from app.user_manager import user_manager
from app.auth import get_current_user, require_auth, get_user_store
from app.admin import require_admin
from app.broadcaster import broadcaster
from app.query_engine import create_query_engine
from app.corpusdb_api_manager import corpusdb_api_manager
from app.db_api_key_manager import db_api_key_manager
from app.database_analytics import database_analytics
from app.data_visualizer import data_visualizer
from app.database_templates import database_templates
from app.advanced_import_export import advanced_import_export
from app.migration_manager import migration_manager
from app.query_builder import query_builder
from app.data_validator import data_validator
from app.job_scheduler import job_scheduler
from app.bookmark_manager import bookmark_manager
from app.global_search import global_search
from app.table_maintenance import table_maintenance
from app.blob_handler import blob_handler
from app.views_manager import views_manager
from app.privilege_manager_v2 import privilege_manager_v2
from app.stored_procedure_manager import stored_procedure_manager
from app.triggers_manager import triggers_manager
from app.process_manager import process_manager
from app.workspace_share_manager import workspace_share_manager
from app.query_complexity import query_complexity_analyzer
from app.project_manager import project_manager
from app.visual_query_builder import visual_query_builder
from app.data_visualization_engine import data_visualization_engine
from app.api_doc_generator import api_doc_generator
from app.database_template_manager import database_template_manager
from app.performance_monitor import performance_monitor
from app.ai_assistant import ai_assistant
from app.configuration_analyzer import configuration_analyzer
from app.security_logger import security_logger

logger = logging.getLogger('corpusdb.api')
router = APIRouter()
class LoginRequest(BaseModel):
    """Credentials posted to ``/auth/login``."""

    username: str
    password: str
class SignupRequest(BaseModel):
    """Body accepted by ``/auth/signup`` and its ``/auth/register`` alias."""

    username: str
    email: str
    password: str
    # Display name; optional, defaults to an empty string.
    full_name: str = ''
class CreateDatabaseRequest(BaseModel):
    """Request body carrying the name of a database."""

    name: str
class CreateTableRequest(BaseModel):
    """Request body naming a database, a table and the table's columns."""

    database: str
    table: str
    # Column definitions; element shape is not constrained by this model.
    columns: list
class SQLQueryRequest(BaseModel):
    """Request body wrapping a single SQL statement (used by ``/cdb/query``)."""

    sql: str
class UserSettingsRequest(BaseModel):
    """Partial settings update accepted by ``PUT /user/settings``.

    Every field is optional.  The handler drops fields whose value is
    ``None`` before applying the update, so the annotations must admit
    ``None`` explicitly: with a bare ``str`` annotation an explicit JSON
    ``null`` fails validation even though the default is ``None``.
    """

    email: Optional[str] = None
    full_name: Optional[str] = None
    hf_token: Optional[str] = None
    hf_repo: Optional[str] = None
# ── AUTHENTICATION ──
@router.post("/auth/signup")
def signup(payload: SignupRequest, response: Response, request: Request):
    """Create a new account and, when that succeeds, log the user in.

    Steps: rate-limit check, validation of username / email / password,
    sanitising of the display name, account creation, then an automatic
    login that sets the ``session_token`` cookie (7-day ``max_age``).

    Returns:
        The ``user_manager.authenticate`` result when auto-login succeeds,
        otherwise the ``user_manager.create_user`` result.  Validation
        failures return ``{'ok': False, 'error': ...}``.  A rate-limit hit
        produces a 429 ``JSONResponse`` and any unexpected exception a 500.
    """
    from app.rate_limiter import rate_limiter
    from app.input_validator import input_validator
    from app.security_logger import security_logger

    client_ip = request.client.host if request.client else 'unknown'
    logger.info(f"SIGNUP REQUEST: username='{payload.username}', email='{payload.email}', ip={client_ip}")
    try:
        # Rate limiting
        try:
            rate_limiter.check_rate_limit(request, limit_type='auth')
        except HTTPException:
            logger.warning(f"SIGNUP RATE LIMITED: username='{payload.username}', ip={client_ip}")
            security_logger.log_rate_limit_exceeded(
                payload.username,
                client_ip,
                'auth'
            )
            return JSONResponse(
                status_code=429,
                content={'ok': False, 'error': 'Too many requests. Please try again later.'}
            )

        # Validate inputs
        is_valid, error = input_validator.validate_username(payload.username)
        if not is_valid:
            logger.warning(f"SIGNUP VALIDATION FAILED (username): {error}, username='{payload.username}'")
            return {'ok': False, 'error': error}
        is_valid, error = input_validator.validate_email(payload.email)
        if not is_valid:
            logger.warning(f"SIGNUP VALIDATION FAILED (email): {error}, email='{payload.email}'")
            return {'ok': False, 'error': error}
        is_valid, error = input_validator.validate_password(payload.password)
        if not is_valid:
            logger.warning(f"SIGNUP VALIDATION FAILED (password): {error}")
            return {'ok': False, 'error': error}

        # Sanitize full name
        full_name = input_validator.sanitize_string(payload.full_name, max_length=100)
        result = user_manager.create_user(
            username=payload.username,
            email=payload.email,
            password=payload.password,
            full_name=full_name
        )
        if result['ok']:
            logger.info(f"SIGNUP SUCCESS: username='{payload.username}', email='{payload.email}', ip={client_ip}")
            security_logger.log_event(
                'SIGNUP',
                payload.username,
                client_ip
            )
            # Auto-login after signup
            auth_result = user_manager.authenticate(payload.username, payload.password)
            if auth_result['ok']:
                # The session token is a bearer credential: keep every part
                # of it out of the logs.
                logger.info(f"SIGNUP AUTO-LOGIN SUCCESS: username='{payload.username}', ip={client_ip}")
                response.set_cookie(
                    key="session_token",
                    value=auth_result['token'],
                    httponly=True,
                    max_age=7*24*60*60,
                    samesite="none",
                    secure=True
                )
                return auth_result
            logger.warning(f"SIGNUP AUTO-LOGIN FAILED: username='{payload.username}', reason='{auth_result.get('error')}'")
        return result
    except Exception as e:
        # exc_info=True already records the traceback through the logger, so
        # no separate traceback.print_exc() is needed.
        # NOTE(review): the exception text is echoed to the client; consider
        # a generic message once clients no longer rely on it.
        logger.error(f"SIGNUP ERROR: username='{payload.username}', exception={e}", exc_info=True)
        return JSONResponse(
            status_code=500,
            content={'ok': False, 'error': f'Server error: {str(e)}'}
        )
@router.post("/auth/register")
def register(payload: SignupRequest, response: Response, request: Request):
    """Backward-compatible alias that delegates to the /auth/signup handler."""
    outcome = signup(payload, response, request)
    return outcome
@router.get("/auth/admin-totp-setup")
def admin_totp_setup():
    """Return the admin TOTP secret and a QR-code image URL for enrolment.

    Reads ``ADMIN_TOTP_SECRET`` from the environment.

    Returns:
        ``{'ok': True, 'secret': ..., 'qr_url': ...}`` when the secret is
        configured, otherwise ``{'ok': False, 'error': ...}``.
    """
    import os
    from urllib.parse import quote

    # NOTE(review): this route declares no auth dependency, yet it returns
    # the very secret that /auth/admin-totp accepts as the sole admin
    # credential, and it places that secret in a URL pointing at a
    # third-party QR service.  Behaviour is left unchanged here so the
    # existing setup flow keeps working; gate it or render the QR locally.
    admin_secret = os.getenv('ADMIN_TOTP_SECRET')
    if not admin_secret:
        return {'ok': False, 'error': 'ADMIN_TOTP_SECRET not configured'}

    # Generate QR code URL
    qr_data = f"otpauth://totp/CorpusDB:admin@corpusdb?secret={admin_secret}&issuer=CorpusDB"
    # The otpauth URI must be percent-encoded before it is used as a query
    # value; unencoded, its "&issuer=..." part is parsed as a separate
    # parameter of the QR service and never reaches the QR payload.
    qr_url = f"https://api.qrserver.com/v1/create-qr-code/?size=200x200&data={quote(qr_data, safe='')}"
    return {'ok': True, 'secret': admin_secret, 'qr_url': qr_url}
@router.post("/auth/admin-totp")
def admin_totp_login(payload: dict, response: Response, request: Request):
    """Log in as the ``admin`` user using only a TOTP code.

    The code in ``payload['totp_code']`` is checked against the secret in
    the ``ADMIN_TOTP_SECRET`` environment variable.  On success the
    ``admin`` user is fetched (or created with a random password), a
    7-day session row is inserted and the ``session_token`` cookie is set.

    Returns:
        ``{'ok': True, 'token': ..., 'user': {...}}`` on success,
        ``{'ok': False, 'error': ...}`` on a missing / invalid code or
        missing configuration, a 429 ``JSONResponse`` when rate limited and
        a 500 ``JSONResponse`` on unexpected errors.
    """
    from app.rate_limiter import rate_limiter
    from app.security_logger import security_logger
    import os
    import base64
    import hashlib
    import hmac
    import secrets
    import struct
    import time
    from datetime import datetime, timedelta

    def verify_totp(secret, token, valid_window=1):
        """Return True if *token* matches the 6-digit HMAC-SHA1 TOTP of
        *secret* for the current 30-second step or any step within
        *valid_window* steps on either side.  Any error yields False."""
        try:
            # Decode base32 secret
            secret = secret.upper().replace(' ', '')
            # Add padding if needed
            missing_padding = len(secret) % 8
            if missing_padding:
                secret += '=' * (8 - missing_padding)
            key = base64.b32decode(secret)
            supplied = str(token).zfill(6).encode('utf-8')
            # Get current time counter
            current_step = int(time.time() / 30)
            # Check token against current and nearby time windows
            for drift in range(-valid_window, valid_window + 1):
                msg = struct.pack('>Q', current_step + drift)
                digest = hmac.new(key, msg, hashlib.sha1).digest()
                # Dynamic truncation: low nibble of the last byte selects
                # the 4-byte slice the code is taken from.
                offset = digest[-1] & 0x0f
                code = struct.unpack('>I', digest[offset:offset + 4])[0] & 0x7fffffff
                expected = str(code % 1000000).zfill(6).encode('utf-8')
                # Constant-time comparison so response timing does not
                # reveal how much of the code was correct.
                if hmac.compare_digest(expected, supplied):
                    return True
            return False
        except Exception:
            return False

    client_ip = request.client.host if request.client else 'unknown'
    try:
        # Rate limiting
        try:
            rate_limiter.check_rate_limit(request, limit_type='auth')
        except HTTPException:
            security_logger.log_rate_limit_exceeded(
                'admin',
                client_ip,
                'auth'
            )
            return JSONResponse(
                status_code=429,
                content={'ok': False, 'error': 'Too many requests. Please try again later.'}
            )

        totp_code = payload.get('totp_code')
        if not totp_code:
            return {'ok': False, 'error': 'totp_code required'}

        # Admin TOTP secret - must be set via environment variable
        admin_secret = os.getenv('ADMIN_TOTP_SECRET')
        if not admin_secret:
            return {'ok': False, 'error': 'Admin TOTP not configured. Set ADMIN_TOTP_SECRET environment variable.'}

        if not verify_totp(admin_secret, totp_code, valid_window=1):
            security_logger.log_auth_failure(
                'admin',
                client_ip,
                'Invalid TOTP code'
            )
            return {'ok': False, 'error': 'Invalid TOTP code'}

        # TOTP verified - get or create admin user
        admin_user = user_manager.get_user('admin')
        if not admin_user:
            # Create admin user
            result = user_manager.create_user(
                username='admin',
                email='admin@corpusdb.local',
                password='admin_' + os.urandom(16).hex(),  # Random password
                full_name='Administrator'
            )
            if not result['ok']:
                return {'ok': False, 'error': 'Failed to create admin user'}
            admin_user = user_manager.get_user('admin')

        # Generate session token
        token = secrets.token_urlsafe(32)

        # Create session in database
        now = datetime.utcnow()
        expires = now + timedelta(days=7)
        conn = user_manager._get_conn()
        try:
            conn.execute("""
                INSERT INTO sessions (token, username, workspace_id, created_at, expires_at)
                VALUES (?, ?, ?, ?, ?)
            """, [token, admin_user['username'], admin_user['workspace_id'],
                  now.isoformat(), expires.isoformat()])
        finally:
            # Release the connection even when the INSERT raises.
            conn.close()

        security_logger.log_auth_success(
            'admin',
            client_ip
        )
        response.set_cookie(
            key="session_token",
            value=token,
            httponly=True,
            max_age=7*24*60*60,
            samesite="none",
            secure=True
        )
        return {
            'ok': True,
            'token': token,
            'user': {
                'username': admin_user['username'],
                'email': admin_user['email'],
                'role': 'admin',
                'workspace_id': admin_user['workspace_id']
            }
        }
    except Exception as e:
        # Route the failure through the module logger (with traceback), as
        # the other auth handlers do, instead of only printing it.
        logger.error(f"ADMIN TOTP LOGIN ERROR: exception={e}", exc_info=True)
        return JSONResponse(
            status_code=500,
            content={'ok': False, 'error': f'Server error: {str(e)}'}
        )
@router.post("/auth/login")
def login(payload: LoginRequest, response: Response, request: Request):
    """Authenticate a user and set the ``session_token`` cookie.

    Steps: rate-limit check, brute-force lockout check via
    ``security_logger.detect_brute_force``, then
    ``user_manager.authenticate``.

    Returns:
        The ``user_manager.authenticate`` result (successful or not),
        ``{'ok': False, 'error': ...}`` when the account is locked, a 429
        ``JSONResponse`` when rate limited and a 500 ``JSONResponse`` on
        unexpected errors.
    """
    from app.rate_limiter import rate_limiter
    from app.security_logger import security_logger

    client_ip = request.client.host if request.client else 'unknown'
    logger.info(f"LOGIN REQUEST: username='{payload.username}', ip={client_ip}")
    try:
        # Rate limiting
        try:
            rate_limiter.check_rate_limit(request, limit_type='auth')
        except HTTPException:
            logger.warning(f"LOGIN RATE LIMITED: username='{payload.username}', ip={client_ip}")
            security_logger.log_rate_limit_exceeded(
                payload.username,
                client_ip,
                'auth'
            )
            return JSONResponse(
                status_code=429,
                content={'ok': False, 'error': 'Too many requests. Please try again later.'}
            )

        # Check for brute force
        if security_logger.detect_brute_force(payload.username):
            logger.warning(f"LOGIN BLOCKED (brute force): username='{payload.username}', ip={client_ip}")
            security_logger.log_auth_blocked(
                payload.username,
                client_ip
            )
            return {'ok': False, 'error': 'Account temporarily locked due to too many failed attempts'}

        # Authenticate
        result = user_manager.authenticate(payload.username, payload.password)
        if result['ok']:
            # The session token is a bearer credential: keep every part of
            # it out of the logs.
            logger.info(f"LOGIN SUCCESS: username='{payload.username}', ip={client_ip}")
            security_logger.log_auth_success(
                payload.username,
                client_ip
            )
            response.set_cookie(
                key="session_token",
                value=result['token'],
                httponly=True,
                max_age=7*24*60*60,
                samesite="none",
                secure=True
            )
            return result

        logger.warning(f"LOGIN FAILED: username='{payload.username}', reason='{result.get('error')}', ip={client_ip}")
        security_logger.log_auth_failure(
            payload.username,
            client_ip,
            result.get('error')
        )
        return result
    except Exception as e:
        # exc_info=True already records the traceback through the logger, so
        # no separate traceback.print_exc() is needed.
        # NOTE(review): the exception text is echoed to the client; consider
        # a generic message once clients no longer rely on it.
        logger.error(f"LOGIN ERROR: username='{payload.username}', exception={e}", exc_info=True)
        return JSONResponse(
            status_code=500,
            content={'ok': False, 'error': f'Server error: {str(e)}'}
        )
@router.post("/auth/logout")
def logout(response: Response, session_token: str = Cookie(default=None)):
    """Invalidate the caller's session (if any) and clear the session cookie.

    Returns:
        ``{"ok": True}`` whether or not a session cookie was present.
    """
    if session_token:
        user_manager.logout(session_token)
    # The cookie is issued with HttpOnly / SameSite=None / Secure, so the
    # expiring Set-Cookie must carry the same attributes: without them it
    # defaults to SameSite=Lax and can be rejected on cross-site responses,
    # leaving the old cookie in place.
    response.delete_cookie(
        "session_token",
        httponly=True,
        samesite="none",
        secure=True
    )
    return {"ok": True}
@router.get("/auth/me")
def get_me(request: Request, user = Depends(get_current_user)):
    """Report whether the request is authenticated and, if so, as whom."""
    if user:
        return {"authenticated": True, "user": user}
    return {"authenticated": False}
@router.get("/admin/users")
def list_all_users(request: Request, user = Depends(require_admin)):
    """Admin debug listing: username and workspace id of every user, plus a count."""
    accounts = user_manager.list_users()
    summaries = []
    for account in accounts:
        summaries.append({
            "username": account['username'],
            "workspace_id": account['workspace_id'],
        })
    return {"count": len(accounts), "users": summaries}
@router.get("/admin/all-databases")
def list_all_databases(request: Request, user = Depends(require_admin)):
    """List the databases of every user (admin only).

    Each database entry returned by ``metadata.list_databases`` is tagged
    with its ``owner`` username and ``workspace_id``.  A user whose store
    cannot be read is skipped, and the failure is logged rather than
    silently ignored.

    Returns:
        ``{'databases': [...]}``.
    """
    all_databases = []
    for account in user_manager.list_users():
        owner = None
        try:
            owner = account['username']
            store = get_user_store(account)
            listing = metadata.list_databases(store)
            if listing and isinstance(listing, dict) and 'databases' in listing:
                for db in listing['databases']:
                    db['owner'] = owner
                    db['workspace_id'] = account['workspace_id']
                    all_databases.append(db)
        except Exception:
            # Best effort: one broken workspace must not hide the others,
            # but leave a trace so the failure can be diagnosed.
            logger.warning("ADMIN ALL-DATABASES: skipping user %r", owner, exc_info=True)
            continue
    return {'databases': all_databases}
@router.get("/admin/all-tables")
def list_all_tables(request: Request, user = Depends(require_admin)):
    """List the tables of every user (admin only).

    Each table entry returned by ``table_manager.list`` is tagged with its
    ``owner`` username and ``workspace_id``.  A user whose store cannot be
    read is skipped, and the failure is logged rather than silently
    ignored.

    Returns:
        A flat list of table entries.
    """
    all_tables = []
    for account in user_manager.list_users():
        owner = None
        try:
            owner = account['username']
            store = get_user_store(account)
            tables = table_manager.list(store)
            if tables and isinstance(tables, list):
                for table in tables:
                    table['owner'] = owner
                    table['workspace_id'] = account['workspace_id']
                    all_tables.append(table)
        except Exception:
            # Best effort: one broken workspace must not hide the others,
            # but leave a trace so the failure can be diagnosed.
            logger.warning("ADMIN ALL-TABLES: skipping user %r", owner, exc_info=True)
            continue
    return all_tables
# ── SITE MODE CONTROLS ──
# Module-level site-mode flags, read and mutated in place by the
# /admin/site-mode handlers. Both flags start out disabled.
_site_modes = {'maintenance': False, 'coming_soon': False}
@router.get("/admin/site-mode")
def get_site_mode(request: Request, user = Depends(require_admin)):
    """Return the current maintenance / coming-soon flags (admin only)."""
    snapshot = {'ok': True}
    for flag in ('maintenance', 'coming_soon'):
        snapshot[flag] = _site_modes[flag]
    return snapshot
@router.post("/admin/site-mode")
def set_site_mode(request: Request, payload: dict, user = Depends(require_admin)):
    """Update whichever of the maintenance / coming-soon flags the payload carries.

    Supplied values are coerced with ``bool()``; flags absent from the
    payload keep their current value.  The resulting state is returned.
    """
    for flag in ('maintenance', 'coming_soon'):
        if flag in payload:
            _site_modes[flag] = bool(payload[flag])
    state = {'ok': True}
    state['maintenance'] = _site_modes['maintenance']
    state['coming_soon'] = _site_modes['coming_soon']
    state['message'] = 'Site mode updated'
    return state
@router.get("/admin/system-info")
def system_info(request: Request, user = Depends(require_admin)):
    """Collect interpreter, platform, user, table and storage statistics (admin only).

    Storage is summed over files in each user's ``data`` directory and in
    its immediate sub-directories (two levels, no deeper).  Memory and disk
    figures are included only when ``psutil`` can be imported.
    """
    import platform
    import os

    try:
        import psutil
        mem = psutil.virtual_memory()
        disk = psutil.disk_usage("/")
        has_psutil = True
    except ImportError:
        has_psutil = False

    users = user_manager.list_users()
    total_storage = 0
    total_tables = 0
    for account in users:
        try:
            store = get_user_store(account)
            data_dir = store.local("data")
            if data_dir.exists():
                # os.scandir keeps the traversal cheap; only two levels are visited.
                for top in os.scandir(data_dir):
                    if top.is_file():
                        total_storage += top.stat().st_size
                        continue
                    if not top.is_dir():
                        continue
                    for child in os.scandir(top.path):
                        if child.is_file():
                            total_storage += child.stat().st_size
            tables = metadata.list_tables(store)
            if isinstance(tables, list):
                total_tables += len(tables)
        except Exception:
            # Per-user failures are ignored; totals accumulated so far are kept.
            pass

    report = {
        'ok': True,
        'python_version': platform.python_version(),
        'platform': platform.system(),
        'total_users': len(users),
        'total_tables': total_tables,
        'total_storage_bytes': total_storage,
        'total_storage_human': _format_bytes(total_storage),
    }
    if not has_psutil:
        report['note'] = 'Install psutil for memory/disk stats: pip install psutil'
        return report

    report.update(
        memory_percent=mem.percent,
        memory_total=mem.total,
        memory_available=mem.available,
        disk_total=disk.total,
        disk_used=disk.used,
        disk_free=disk.free,
        disk_percent=disk.percent,
    )
    return report
def _format_bytes(b):
    """Render a byte count with one decimal place and a 1024-based unit.

    Units run from ``B`` to ``TB``; anything still >= 1024 after ``TB`` is
    reported in ``PB``.
    """
    units = ('B', 'KB', 'MB', 'GB', 'TB')
    size = b
    position = 0
    while position < len(units):
        if size < 1024:
            return f"{size:.1f} {units[position]}"
        size = size / 1024
        position += 1
    return f"{size:.1f} PB"
@router.post("/admin/test-password")
def test_password(request: Request, payload: dict, user = Depends(require_admin)):
    """Admin debug helper: check a username/password pair against the stored hash.

    The response includes the first 30 characters of the stored hash.
    """
    username, password = payload.get('username'), payload.get('password')
    if not (username and password):
        return {'ok': False, 'error': 'username and password required'}

    account = user_manager.get_user(username)
    if not account:
        return {'ok': False, 'error': 'User not found'}

    stored = account['password']
    matches = user_manager.verify_password(password, stored)
    return {
        'ok': True,
        'username': username,
        'password_valid': matches,
        'stored_hash': stored[:30] + '...',
        'is_active': account.get('is_active', True)
    }
# ── USER MANAGEMENT (authenticated) ──
class CreateUserRequest(BaseModel):
    """Body accepted by ``POST /users``."""

    username: str
    email: str
    password: str
    # Optional profile fields; all default to an empty string.
    full_name: str = ''
    department: str = ''
    phone: str = ''
    # New accounts default to the 'viewer' role.
    role: str = 'viewer'
class UpdateUserRequest(BaseModel):
    """Partial user update accepted by ``PUT /users/{username}``.

    Every field is optional.  The handler drops fields whose value is
    ``None`` before applying the update, so the annotations must admit
    ``None`` explicitly: with a bare ``str`` / ``bool`` annotation an
    explicit JSON ``null`` fails validation even though the default is
    ``None``.
    """

    email: Optional[str] = None
    full_name: Optional[str] = None
    department: Optional[str] = None
    phone: Optional[str] = None
    role: Optional[str] = None
    is_active: Optional[bool] = None
    password: Optional[str] = None
@router.get("/users")
def list_users(request: Request, user = Depends(require_auth)):
    """Return a sanitised view of every user (password and secrets omitted).

    Currently available to any authenticated caller.
    """
    def _text_or_none(value):
        # Falsy timestamps become None; everything else is stringified.
        return str(value) if value else None

    return [
        {
            'username': u.get('username'),
            'email': u.get('email'),
            'full_name': u.get('full_name') or '',
            'role': u.get('role', 'viewer'),
            'department': u.get('department') or '',
            'phone': u.get('phone') or '',
            'last_login': _text_or_none(u.get('last_login')),
            'is_active': bool(u.get('is_active', True)),
            'workspace_id': u.get('workspace_id'),
            'created_at': _text_or_none(u.get('created_at')),
        }
        for u in user_manager.list_users()
    ]
@router.get("/users/{username}")
def get_user(request: Request, username: str, user = Depends(require_auth)):
    """Return the public profile of one user; 404 when the user does not exist."""
    record = user_manager.get_user(username)
    if not record:
        raise HTTPException(status_code=404, detail="User not found")

    profile = {
        'username': record.get('username'),
        'email': record.get('email'),
    }
    profile['full_name'] = record.get('full_name') or ''
    profile['role'] = record.get('role', 'viewer')
    profile['is_active'] = bool(record.get('is_active', True))
    profile['workspace_id'] = record.get('workspace_id')
    return profile
@router.post("/users")
def create_user_admin(request: Request, payload: CreateUserRequest, user = Depends(require_auth)):
    """Validate the submitted username, email and password, then create the user.

    The first failing validation is reported as ``{'ok': False, 'error': ...}``.
    Only username, email, password and full_name are forwarded to
    ``user_manager.create_user``.
    """
    from app.input_validator import input_validator

    checks = (
        (input_validator.validate_username, payload.username),
        (input_validator.validate_email, payload.email),
        (input_validator.validate_password, payload.password),
    )
    for validate, value in checks:
        passed, problem = validate(value)
        if not passed:
            return {'ok': False, 'error': problem}

    return user_manager.create_user(
        username=payload.username,
        email=payload.email,
        password=payload.password,
        full_name=payload.full_name
    )
@router.put("/users/{username}")
def update_user_admin(request: Request, username: str, payload: UpdateUserRequest, user = Depends(require_auth), admin = Depends(require_admin)):
    """Update another user's details (admin only).

    This route can change any account's password, role and active flag, so
    it is gated by ``require_admin``; with ``require_auth`` alone every
    logged-in user could take over any other account.  Self-service
    changes go through the ``/user/...`` routes instead.

    Only fields that are not ``None`` in the payload are applied.

    Returns:
        The result of ``user_manager.update_user``.
    """
    updates = {k: v for k, v in payload.model_dump().items() if v is not None}
    return user_manager.update_user(username, updates)
@router.delete("/users/{username}")
def delete_user_admin(request: Request, username: str, user = Depends(require_auth), admin = Depends(require_admin)):
    """Delete a user account (admin only).

    Gated by ``require_admin``: with ``require_auth`` alone any logged-in
    user could delete any other account.  Callers cannot delete their own
    account through this route.

    Returns:
        ``{'ok': False, 'error': ...}`` for a self-delete attempt,
        otherwise the result of ``user_manager.delete_user``.
    """
    if username == user.get('username'):
        return {'ok': False, 'error': 'Cannot delete yourself'}
    return user_manager.delete_user(username)
@router.post("/users/{username}/regenerate-api-key")
def regen_user_api_key(request: Request, username: str, user = Depends(require_auth), admin = Depends(require_admin)):
    """Regenerate the API key of an arbitrary user (admin only).

    Gated by ``require_admin``: with ``require_auth`` alone any logged-in
    user could rotate another user's key.  Users rotate their own key via
    ``/user/regenerate-key``.

    Returns:
        The result of ``user_manager.regenerate_api_key``.
    """
    return user_manager.regenerate_api_key(username)
# ── USER SETTINGS ──
@router.put("/user/settings")
def update_settings(request: Request, payload: UserSettingsRequest, user = Depends(require_auth)):
    """Apply the non-``None`` settings fields to the calling user's account."""
    changes = {}
    for field, value in payload.model_dump().items():
        if value is not None:
            changes[field] = value
    return user_manager.update_user(user['username'], changes)
@router.put("/user/profile")
def update_profile(request: Request, payload: dict, user = Depends(require_auth)):
    """Update the caller's email and/or full name; every other key is ignored."""
    permitted = ('email', 'full_name')
    changes = {}
    for key, value in payload.items():
        if key in permitted and value is not None:
            changes[key] = value
    if changes:
        return user_manager.update_user(user['username'], changes)
    return {'ok': False, 'error': 'No valid fields to update'}
@router.put("/user/password")
def change_password(request: Request, payload: dict, user = Depends(require_auth)):
    """Change the caller's password after verifying the current one.

    The new password must be at least 8 characters long.
    """
    current_password = payload.get('current_password', '')
    new_password = payload.get('new_password', '')

    if not (current_password and new_password):
        return {'ok': False, 'error': 'Current and new password required'}
    if len(new_password) < 8:
        return {'ok': False, 'error': 'Password must be at least 8 characters'}

    account = user_manager.get_user(user['username'])
    if not account:
        return {'ok': False, 'error': 'User not found'}

    stored_hash = account.get('password', '')
    if not user_manager.verify_password(current_password, stored_hash):
        return {'ok': False, 'error': 'Current password is incorrect'}

    outcome = user_manager.update_user(user['username'], {
        'password': new_password
    })
    if not outcome.get('ok'):
        return outcome
    return {'ok': True, 'message': 'Password changed successfully'}
@router.post("/user/regenerate-key")
def regenerate_key(request: Request, user = Depends(require_auth)):
    """Regenerate the calling user's own API key."""
    own_username = user['username']
    return user_manager.regenerate_api_key(own_username)
# ── CORPUSDB API KEY MANAGEMENT ──
@router.post("/corpusdb/generate-api-key")
def generate_corpusdb_api_key(request: Request, user = Depends(require_auth)):
    """Issue a new CorpusDB API key / secret pair for the calling user.

    The secret is returned in this response together with a reminder to
    save it.
    """
    created = corpusdb_api_manager.generate_api_key(
        user_id=user['user_id'],
        username=user['username'],
        workspace_id=user['workspace_id']
    )
    body = {'ok': True}
    for field in ('api_key', 'secret_key', 'created_at'):
        body[field] = created[field]
    body['message'] = 'IMPORTANT: Save your secret key now. It will not be shown again!'
    return body
@router.get("/corpusdb/my-keys")
def get_my_corpusdb_keys(request: Request, user = Depends(require_auth)):
    """Return every CorpusDB API key registered to the calling user."""
    return {
        'ok': True,
        'keys': corpusdb_api_manager.get_user_keys(user['user_id']),
    }
@router.post("/corpusdb/revoke-key")
def revoke_corpusdb_key(request: Request, payload: dict, user = Depends(require_auth)):
    """Revoke one of the caller's CorpusDB API keys."""
    api_key = payload.get('api_key')
    if not api_key:
        return {'ok': False, 'error': 'api_key required'}

    # Ownership check: the key must appear among the caller's own keys.
    owned = False
    for entry in corpusdb_api_manager.get_user_keys(user['user_id']):
        if entry['api_key'] == api_key:
            owned = True
            break
    if not owned:
        return {'ok': False, 'error': 'Key not found or does not belong to you'}

    return {'ok': corpusdb_api_manager.revoke_key(api_key)}
@router.post("/corpusdb/delete-key")
def delete_corpusdb_key(request: Request, payload: dict, user = Depends(require_auth)):
    """Permanently remove one of the caller's CorpusDB API keys."""
    api_key = payload.get('api_key')
    if not api_key:
        return {'ok': False, 'error': 'api_key required'}

    # Ownership check: the key must appear among the caller's own keys.
    owned = False
    for entry in corpusdb_api_manager.get_user_keys(user['user_id']):
        if entry['api_key'] == api_key:
            owned = True
            break
    if not owned:
        return {'ok': False, 'error': 'Key not found or does not belong to you'}

    return {'ok': corpusdb_api_manager.delete_key(api_key)}
@router.post("/corpusdb/regenerate-secret")
def regenerate_corpusdb_secret(request: Request, payload: dict, user = Depends(require_auth)):
    """Issue a fresh secret for one of the caller's existing API keys."""
    api_key = payload.get('api_key')
    if not api_key:
        return {'ok': False, 'error': 'api_key required'}

    # Ownership check: the key must appear among the caller's own keys.
    owned = False
    for entry in corpusdb_api_manager.get_user_keys(user['user_id']):
        if entry['api_key'] == api_key:
            owned = True
            break
    if not owned:
        return {'ok': False, 'error': 'Key not found or does not belong to you'}

    fresh_secret = corpusdb_api_manager.regenerate_secret(api_key)
    if not fresh_secret:
        return {'ok': False, 'error': 'Failed to regenerate secret'}
    return {
        'ok': True,
        'secret_key': fresh_secret,
        'message': 'IMPORTANT: Save your new secret key now. It will not be shown again!'
    }
# ── CORPUSDB API AUTHENTICATION (for external apps) ──
def verify_corpusdb_api_key(x_api_key: str = Header(None), x_secret_key: str = Header(None)):
    """Authenticate an external request from its API-key / secret headers.

    The key prefix selects the scheme: ``cDb_`` is a user key with access
    to all of that user's databases, ``CDB-`` is a database key locked to
    a single database.

    Returns:
        ``{'user': ..., 'user_store': ..., 'locked_database': ...}`` where
        ``locked_database`` is ``None`` for user keys.

    Raises:
        HTTPException: 401 for missing headers, an unknown prefix, or
            credentials that do not verify.
    """
    if not (x_api_key and x_secret_key):
        raise HTTPException(status_code=401, detail="API key and secret required")

    if x_api_key.startswith('cDb_'):
        # User key — full access to all databases
        key_info = corpusdb_api_manager.verify_api_key(x_api_key, x_secret_key)
        if not key_info:
            raise HTTPException(status_code=401, detail="Invalid API key or secret")
        account = user_manager.get_user_by_id(key_info['user_id'])
        if not account:
            raise HTTPException(status_code=401, detail="User not found")
        return {'user': account, 'user_store': get_user_store(account), 'locked_database': None}

    if not x_api_key.startswith('CDB-'):
        raise HTTPException(status_code=401, detail="Invalid API key format")

    # Database key - locked to one database. Each user's store is probed in
    # turn; when the matching key record carries a workspace_id, the owner of
    # that workspace is preferred over the user whose store matched.
    for candidate in user_manager.list_users():
        try:
            candidate_store = get_user_store(candidate)
            match = db_api_key_manager.verify_db_api_key(candidate_store, x_api_key, x_secret_key)
            if not match:
                continue
            workspace_id = match.get('workspace_id')
            if workspace_id:
                owner = user_manager.get_user_by_workspace_id(workspace_id)
                if owner:
                    return {'user': owner, 'user_store': get_user_store(owner), 'locked_database': match['database']}
            return {'user': candidate, 'user_store': candidate_store, 'locked_database': match['database']}
        except Exception:
            # Any failure while probing one user's store moves on to the next user.
            continue
    raise HTTPException(status_code=401, detail="Invalid database API key or secret")
def _check_db_access(auth: dict, database: str):
    """Reject (403) a database-locked key that targets a different database.

    Keys with no ``locked_database`` value may access any database.
    """
    locked = auth.get('locked_database')
    if not locked or locked == database:
        return
    raise HTTPException(status_code=403, detail=f"This API key is locked to database '{locked}'")
@router.get("/cdb/databases")
def corpusdb_list_databases(auth = Depends(verify_corpusdb_api_key)):
    """List databases visible to the presented API key.

    A database-locked key sees only the database it is locked to.
    """
    locked = auth.get('locked_database')
    if not locked:
        return metadata.list_databases(auth['user_store'])
    return {'ok': True, 'databases': [{'name': locked}]}
@router.get("/cdb/tables")
def corpusdb_list_tables(auth = Depends(verify_corpusdb_api_key)):
    """List tables visible to the presented API key.

    A database-locked key is restricted to tables of its own database.
    """
    locked = auth.get('locked_database')
    store = auth['user_store']
    if not locked:
        return table_manager.list(store)
    return table_manager.list(store, database=locked)
@router.post("/cdb/query")
def corpusdb_query(payload: SQLQueryRequest, request: Request, auth = Depends(verify_corpusdb_api_key)):
    """Run a SQL statement on behalf of an API-key client.

    The request is rate limited, then the statement is validated (writes
    allowed) and checked for complexity before being executed against the
    key owner's store.
    """
    from app.query_parser import query_parser
    from app.rate_limiter import rate_limiter

    # NOTE(review): auth['locked_database'] is not consulted here, so a
    # database-locked key does not appear to be confined to its database on
    # this route — confirm whether the query engine enforces that.
    rate_limiter.check_rate_limit(request, limit_type='api')

    statement = payload.sql
    accepted, problem = query_parser.validate_query(statement, allow_write=True)
    if not accepted:
        return {'ok': False, 'error': problem}

    permitted, reason, _score = query_complexity_analyzer.analyze_complexity(statement)
    if not permitted:
        return {'ok': False, 'error': reason}

    engine = create_query_engine(auth['user_store'])
    try:
        outcome = engine.execute_sql(statement)
    finally:
        engine.close()
    return outcome
@router.get("/cdb/rows/{database}/{table}")
def corpusdb_get_rows(database: str, table: str, limit: int = 100, offset: int = 0, auth = Depends(verify_corpusdb_api_key)):
    """Read a page of rows from a table on behalf of an API-key client."""
    _check_db_access(auth, database)
    store = auth['user_store']
    return row_manager.read(database, table, store, limit=limit, offset=offset)
@router.post("/cdb/rows/{database}/{table}")
def corpusdb_insert_row(database: str, table: str, payload: dict, auth = Depends(verify_corpusdb_api_key)):
    """Insert one row on behalf of an API-key client.

    After a successful insert the tables are persisted on a best-effort
    basis: a persistence failure does not change the response, but it is
    logged instead of being silently discarded.

    Returns:
        The result of ``row_manager.insert``.
    """
    _check_db_access(auth, database)
    result = row_manager.insert(database, table, payload, auth['user']['username'], auth['user_store'], auth['user']['workspace_id'])
    if result.get('ok'):
        try:
            engine = create_query_engine(auth['user_store'])
            try:
                _persist_all_tables(engine, auth['user_store'])
            finally:
                engine.close()
        except Exception:
            # Best effort: the row is already inserted, so keep the success
            # response, but leave a trace of the failed persistence step.
            logger.warning("CDB INSERT: persisting tables failed for %s.%s", database, table, exc_info=True)
    return result
# -- DATABASE ANALYTICS --
@router.get("/analytics/table/{database}/{table}")
def get_table_analytics(request: Request, database: str, table: str, user = Depends(require_auth)):
    """Return the statistics computed for one table in the caller's store."""
    return database_analytics.get_table_stats(get_user_store(user), database, table)
@router.get("/analytics/database/{database}")
def get_database_analytics(request: Request, database: str, user = Depends(require_auth)):
    """Return the summary computed for one database in the caller's store."""
    return database_analytics.get_database_summary(get_user_store(user), database)
@router.get("/analytics/suggestions/{database}/{table}")
def get_query_suggestions(request: Request, database: str, table: str, user = Depends(require_auth)):
    """Return query suggestions for one table in the caller's store."""
    store = get_user_store(user)
    return {
        'ok': True,
        'suggestions': database_analytics.get_query_suggestions(store, database, table),
    }
# -- DATA VISUALIZATION --
@router.post("/visualize/chart")
def create_chart_data(request: Request, payload: dict, user = Depends(require_auth)):
    """Shape client-supplied rows into chart data.

    ``data`` defaults to an empty list and ``chart_type`` to ``'bar'``.
    """
    return data_visualizer.prepare_chart_data(
        payload.get('data', []),
        payload.get('x_column'),
        payload.get('y_column'),
        payload.get('chart_type', 'bar'),
    )
@router.post("/visualize/analyze-column")
def analyze_column(request: Request, payload: dict, user = Depends(require_auth)):
    """Return statistics for one column of client-supplied rows."""
    rows = payload.get('data', [])
    return data_visualizer.analyze_column(rows, payload.get('column'))
@router.post("/visualize/distribution")
def get_distribution(request: Request, payload: dict, user = Depends(require_auth)):
    """Return a value distribution for one column; ``bins`` defaults to 10."""
    return data_visualizer.get_distribution(
        payload.get('data', []),
        payload.get('column'),
        payload.get('bins', 10),
    )
# -- DATABASE TEMPLATES --
@router.get("/templates")
def list_templates(request: Request, user = Depends(require_auth)):
    """Return every database template on offer."""
    return {'ok': True, 'templates': database_templates.list_templates()}
@router.get("/templates/{template_id}")
def get_template(request: Request, template_id: str, user = Depends(require_auth)):
    """Return the details of a single database template."""
    details = database_templates.get_template(template_id)
    return details
@router.post("/templates/{template_id}/create")
def create_from_template(request: Request, template_id: str, payload: dict, user = Depends(require_auth)):
    """Instantiate a template as a new database in the caller's store.

    ``database_name`` is required in the payload.
    """
    database_name = payload.get('database_name')
    if database_name:
        return database_templates.create_from_template(
            template_id,
            database_name,
            get_user_store(user),
            user['username'],
            user['workspace_id']
        )
    return {'ok': False, 'error': 'database_name required'}
# -- ADVANCED IMPORT/EXPORT --
@router.post("/export/database/sql")
def export_database_sql(request: Request, payload: dict, user = Depends(require_auth)):
    """Dump every table of one database as a single SQL script.

    Payload options ``include_structure``, ``include_data`` and
    ``add_drop_table`` all default to ``True``.  At most 100000 rows are
    read per table.
    """
    database = payload.get('database')
    if not database:
        return {'ok': False, 'error': 'database required'}

    include_structure = payload.get('include_structure', True)
    include_data = payload.get('include_data', True)
    add_drop_table = payload.get('add_drop_table', True)

    # Narrow the caller's full table list down to the requested database.
    user_store = get_user_store(user)
    db_tables = []
    for entry in table_manager.list(user_store):
        if entry['database'] == database:
            db_tables.append(entry)
    if not db_tables:
        return {'ok': False, 'error': 'No tables found in database'}

    # Fetch the rows of each table; a falsy read result becomes an empty list.
    tables_data = {}
    for entry in db_tables:
        name = entry['table']
        rows = row_manager.read(database, name, user_store, limit=100000)
        tables_data[name] = rows or []

    # Generate SQL dump
    sql = advanced_import_export.export_database_to_sql(
        database,
        tables_data,
        include_structure,
        include_data,
        add_drop_table
    )
    response_body = {'ok': True, 'sql': sql, 'format': 'sql'}
    response_body['database'] = database
    response_body['tables'] = len(tables_data)
    response_body['filename'] = f"{database}_export.sql"
    return response_body
@router.post("/export/database/json")
def export_database_json(request: Request, payload: dict, user = Depends(require_auth)):
    """Export every table of one database as JSON.

    Payload keys: ``database`` (required) and ``pretty`` (default True).
    At most 100000 rows are requested per table; a falsy read result is
    exported as an empty list.
    """
    database = payload.get('database')
    pretty = payload.get('pretty', True)
    if not database:
        return {'ok': False, 'error': 'database required'}

    store = get_user_store(user)
    matching = [entry for entry in table_manager.list(store) if entry['database'] == database]
    tables_data = {
        entry['table']: row_manager.read(database, entry['table'], store, limit=100000) or []
        for entry in matching
    }

    serialized = advanced_import_export.export_database_to_json(database, tables_data, pretty)
    return {
        'ok': True,
        'json': serialized,
        'format': 'json',
        'database': database,
        'tables': len(tables_data),
        'filename': f"{database}_export.json"
    }
@router.post("/export/database/csv")
def export_database_csv(request: Request, payload: dict, user = Depends(require_auth)):
    """Export every table of one database to CSV (one file per table).

    Requires ``payload['database']``. At most 100000 rows are requested per
    table. The ``tables`` count in the response is the number of CSV files
    produced by ``advanced_import_export.export_database_to_csv``.
    """
    database = payload.get('database')
    if not database:
        return {'ok': False, 'error': 'database required'}

    store = get_user_store(user)
    in_database = [entry for entry in table_manager.list(store) if entry['database'] == database]

    tables_data = {}
    for entry in in_database:
        name = entry['table']
        # A falsy read result is exported as an empty table.
        tables_data[name] = row_manager.read(database, name, store, limit=100000) or []

    csv_files = advanced_import_export.export_database_to_csv(tables_data)
    return {
        'ok': True,
        'csv_files': csv_files,
        'format': 'csv',
        'database': database,
        'tables': len(csv_files)
    }
@router.post("/export/database/xml")
def export_database_xml(request: Request, payload: dict, user = Depends(require_auth)):
    """Export every table of one database as a single XML document.

    Requires ``payload['database']``. At most 100000 rows are requested per
    table; a falsy read result is exported as an empty list.
    """
    database = payload.get('database')
    if not database:
        return {'ok': False, 'error': 'database required'}

    store = get_user_store(user)
    table_names = [entry['table'] for entry in table_manager.list(store) if entry['database'] == database]
    tables_data = {
        name: row_manager.read(database, name, store, limit=100000) or []
        for name in table_names
    }

    document = advanced_import_export.export_database_to_xml(database, tables_data)
    return {
        'ok': True,
        'xml': document,
        'format': 'xml',
        'database': database,
        'tables': len(tables_data),
        'filename': f"{database}_export.xml"
    }
@router.post("/export/sql")
def export_sql(request: Request, payload: dict, user = Depends(require_auth)):
    """Render caller-supplied rows as SQL.

    Uses ``database``, ``table`` and ``data`` (default: empty list) from
    *payload*; the rendered text is returned under the ``sql`` key.
    """
    rendered = advanced_import_export.export_to_sql(
        payload.get('database'),
        payload.get('table'),
        payload.get('data', []),
    )
    return {'ok': True, 'sql': rendered, 'format': 'sql'}
@router.post("/export/csv")
def export_csv(request: Request, payload: dict, user = Depends(require_auth)):
    """Render ``payload['data']`` (default: empty list) as CSV text."""
    rendered = advanced_import_export.export_to_csv(payload.get('data', []))
    return {'ok': True, 'csv': rendered, 'format': 'csv'}
@router.post("/export/json")
def export_json_data(request: Request, payload: dict, user = Depends(require_auth)):
    """Render ``payload['data']`` as JSON text.

    ``pretty`` (default True) is passed through to
    ``advanced_import_export.export_to_json``.
    """
    rendered = advanced_import_export.export_to_json(
        payload.get('data', []),
        payload.get('pretty', True),
    )
    return {'ok': True, 'json': rendered, 'format': 'json'}
@router.post("/export/xml")
def export_xml(request: Request, payload: dict, user = Depends(require_auth)):
    """Render caller-supplied rows as XML.

    Uses ``database``, ``table`` and ``data`` (default: empty list) from
    *payload*; the rendered text is returned under the ``xml`` key.
    """
    rendered = advanced_import_export.export_to_xml(
        payload.get('database'),
        payload.get('table'),
        payload.get('data', []),
    )
    return {'ok': True, 'xml': rendered, 'format': 'xml'}
@router.post("/import/sql")
def import_sql(request: Request, payload: dict, user = Depends(require_auth)):
    """Import a SQL file (phpMyAdmin format), optionally executing it.

    Payload keys: ``sql`` (required), ``execute`` (default True) and
    ``database`` (target database for execution).

    With ``execute`` falsy the SQL is only parsed. Otherwise it is run on a
    per-user query engine, all tables are persisted, a ``DATA_IMPORT``
    security event is logged, and the importer's result is returned. The
    engine is always closed.
    """
    sql_content = payload.get('sql')
    execute = payload.get('execute', True)
    database = payload.get('database')
    if not sql_content:
        return {'ok': False, 'error': 'sql content required'}

    user_store = get_user_store(user)
    if not execute:
        # Parse-only mode: nothing is executed or persisted.
        return advanced_import_export.import_from_sql(sql_content)

    engine = create_query_engine(user_store)
    try:
        outcome = advanced_import_export.import_from_sql(
            sql_content, query_engine=engine, target_database=database
        )
        # Persistence runs regardless of the importer's 'ok' flag.
        _persist_all_tables(engine, user_store)

        level = 'INFO' if outcome.get('ok', False) else 'ERROR'
        summary = {'database': database}
        for counter in ('total_statements', 'executed', 'failed', 'tables_created', 'rows_inserted'):
            summary[counter] = outcome.get(counter, 0)
        security_logger.log_event(
            'DATA_IMPORT',
            username=user['username'],
            ip_address=request.client.host if request.client else 'unknown',
            details=summary,
            severity=level,
        )
        return outcome
    finally:
        engine.close()
def _infer_csv_column_type(value):
    """Guess a column type ('INTEGER', 'DOUBLE' or 'TEXT') from one sample CSV cell."""
    if not value:
        return 'TEXT'
    for caster, type_name in ((int, 'INTEGER'), (float, 'DOUBLE')):
        try:
            caster(value)
        except (ValueError, TypeError):
            # ValueError: not numeric text; TypeError: non-string cell
            # (e.g. the list DictReader stores for surplus fields).
            continue
        return type_name
    return 'TEXT'


@router.post("/import/csv-to-table")
async def import_csv_to_table(request: Request, database: str = Form(...), table: str = Form(...),
                              file: UploadFile = File(...), user = Depends(require_auth)):
    """Import an uploaded CSV file into ``database.table``.

    If the table is not in the user's table listing it is created first,
    with column types inferred from the first data row. Rows are then
    inserted one at a time; a failing row is counted and reported but does
    not abort the import.

    Returns:
        ``{'ok': False, 'error': ...}`` when the upload is not valid UTF-8
        or contains no data rows, the failed ``table_manager.create`` result
        when table creation fails, otherwise a summary dict with
        ``success_count``, ``error_count`` and at most the first 10 errors.
    """
    import csv
    import io

    user_store = get_user_store(user)

    raw = await file.read()
    try:
        # utf-8-sig decodes plain UTF-8 too, and strips a leading BOM that
        # would otherwise become part of the first column name.
        csv_content = raw.decode('utf-8-sig')
    except UnicodeDecodeError as e:
        return {'ok': False, 'error': f'CSV file is not valid UTF-8: {e}'}

    # newline='' leaves line-ending handling to the csv module.
    rows = list(csv.DictReader(io.StringIO(csv_content, newline='')))
    if not rows:
        return {'ok': False, 'error': 'CSV file is empty'}

    # Create the table from the CSV header when it does not exist yet.
    all_tables = table_manager.list(user_store)
    table_exists = any(t['database'] == database and t['table'] == table for t in all_tables)
    if not table_exists:
        columns = [
            {'name': col_name, 'type': _infer_csv_column_type(sample)}
            for col_name, sample in rows[0].items()
        ]
        create_result = table_manager.create(database, table, columns, user_store)
        if not create_result.get('ok'):
            return create_result

    success_count = 0
    error_count = 0
    errors = []
    for row in rows:
        # Best-effort per row: one bad row must not abort the import.
        try:
            result = row_manager.insert(database, table, row, user['username'], user_store, user['workspace_id'])
        except Exception as e:
            error_count += 1
            errors.append(str(e))
            continue
        if result.get('ok'):
            success_count += 1
        else:
            error_count += 1
            errors.append(result.get('error', 'Unknown error'))

    return {
        'ok': True,
        'message': f'Imported {success_count} rows successfully',
        'success_count': success_count,
        'error_count': error_count,
        'errors': errors[:10]  # Return first 10 errors
    }
@router.post("/import/sql-dump")
def import_sql_dump(request: Request, payload: dict, user = Depends(require_auth)):
    """Import a phpMyAdmin-style SQL dump into ``payload['database']``.

    The dump text (``payload['sql']``) is split into statements. Each one is
    converted from MySQL to DuckDB syntax, has the first table name after
    CREATE TABLE / INSERT INTO / ALTER TABLE prefixed with the sanitized
    database name, and is executed on a per-user query engine. A failing
    statement is counted and reported but does not stop the import. When at
    least one statement succeeds, all tables are persisted.

    Returns:
        ``{'ok': False, 'error': ...}`` when ``database`` or ``sql`` is
        missing; otherwise ``{'ok': True, ...}`` with success, error and
        skipped counts, the names of created tables and at most the first
        10 error messages.
    """
    from app.query_parser import query_parser
    from app.mysql_to_duckdb_converter import mysql_to_duckdb_converter

    database = payload.get('database')
    sql_text = payload.get('sql', '')
    if not database or not sql_text:
        return {'ok': False, 'error': 'database and sql required'}

    safe_db = query_parser.sanitize_identifier(database)
    user_store = get_user_store(user)

    # NOTE(review): the prefix is applied unconditionally; a table name that is
    # already schema-qualified looks like it would be prefixed twice — confirm.
    qualify_patterns = (
        r'(CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?)["`]?(\w+)["`]?',
        r'(INSERT\s+INTO\s+)["`]?(\w+)["`]?',
        r'(ALTER\s+TABLE\s+)["`]?(\w+)["`]?',
    )

    def qualify(match):
        return f'{match.group(1)}"{safe_db}"."{match.group(2)}"'

    success_count = 0
    error_count = 0
    skipped_count = 0
    errors = []
    tables_created = []

    engine = create_query_engine(user_store)
    # Everything that uses the engine runs inside try/finally so it is closed
    # even if statement splitting or schema creation fails.
    try:
        # Ensure schema exists (best effort; later statements report real errors).
        try:
            engine.conn.execute(f'CREATE SCHEMA IF NOT EXISTS "{safe_db}"')
        except Exception as e:
            logger.warning("Could not ensure schema %s exists: %s", safe_db, e)

        for stmt in query_parser.split_sql_statements(sql_text):
            stmt = stmt.strip()
            if not stmt:
                continue

            # Convert MySQL syntax to DuckDB
            converted, should_skip = mysql_to_duckdb_converter.convert_statement(stmt)
            if should_skip or not converted:
                skipped_count += 1
                continue

            # Prefix the first table reference of each kind with the database.
            for pattern in qualify_patterns:
                converted = re.sub(pattern, qualify, converted, count=1, flags=re.IGNORECASE)

            try:
                result = engine.execute_sql(converted)
            except Exception as e:
                error_count += 1
                errors.append(str(e))
                continue

            if not result.get('ok'):
                error_count += 1
                errors.append(result.get('error', 'Unknown error'))
                continue

            success_count += 1
            # Track created tables
            create_match = re.search(
                r'CREATE\s+TABLE\s+["`]?(\w+)["`]?\.["`]?(\w+)["`]?', converted, re.IGNORECASE
            )
            if create_match:
                tables_created.append(create_match.group(2))

        # Persist all tables to Parquet and update metadata
        if success_count > 0:
            try:
                _persist_all_tables(engine, user_store)
            except Exception as e:
                logger.warning("Failed to persist import results: %s", e)
    finally:
        engine.close()

    return {
        'ok': True,
        'success_count': success_count,
        'error_count': error_count,
        'skipped_count': skipped_count,
        'tables_created': tables_created,
        'errors': errors[:10]
    }
@router.post("/import/json-rows")
def import_json_rows(request: Request, payload: dict, user = Depends(require_auth)):
    """Insert an array of JSON rows into ``database.table``.

    Payload keys: ``database``, ``table`` and ``rows`` (a non-empty list).
    Rows are inserted one at a time through ``row_manager.insert``; a
    failing row is counted and reported but does not abort the import.
    When at least one row was inserted, all tables are persisted; a
    persistence failure is logged and does not fail the request.

    Returns:
        An ``{'ok': False, 'error': ...}`` dict on invalid input, otherwise
        ``success_count``, ``error_count`` and at most the first 10 errors.
    """
    database = payload.get('database')
    table = payload.get('table')
    rows = payload.get('rows', [])
    if not database or not table or not rows:
        return {'ok': False, 'error': 'database, table, and rows required'}
    # Emptiness was rejected above, so only the type still needs checking.
    if not isinstance(rows, list):
        return {'ok': False, 'error': 'rows must be a non-empty array'}

    user_store = get_user_store(user)
    success_count = 0
    error_count = 0
    errors = []
    for row in rows:
        # Best-effort per row: one bad row must not abort the import.
        try:
            result = row_manager.insert(database, table, row, user['username'], user_store, user['workspace_id'])
        except Exception as e:
            error_count += 1
            errors.append(str(e))
            continue
        if result.get('ok'):
            success_count += 1
        else:
            error_count += 1
            errors.append(result.get('error', 'Unknown error'))

    if success_count > 0:
        # Persistence stays best-effort, but a failure is now logged instead
        # of being silently discarded.
        try:
            engine = create_query_engine(user_store)
            try:
                _persist_all_tables(engine, user_store)
            finally:
                engine.close()
        except Exception as e:
            logger.warning("Failed to persist imported JSON rows for %s.%s: %s", database, table, e)

    return {
        'ok': True,
        'success_count': success_count,
        'error_count': error_count,
        'errors': errors[:10]
    }
@router.post("/import/csv-parse")
def import_csv_parse(request: Request, payload: dict, user = Depends(require_auth)):
    """Parse the CSV text in ``payload['csv']`` and return the parser's result."""
    return advanced_import_export.import_from_csv(payload.get('csv'))
@router.post("/import/json-parse")
def import_json_parse(request: Request, payload: dict, user = Depends(require_auth)):
    """Parse the JSON text in ``payload['json']`` and return the parser's result."""
    return advanced_import_export.import_from_json(payload.get('json'))
# -- MIGRATIONS --
@router.post("/migrations/create")
def create_migration(request: Request, payload: dict, user = Depends(require_auth)):
    """Register a new migration in the caller's store.

    ``database``, ``name``, ``up_sql`` and ``down_sql`` are read from
    *payload* (each None when absent) and handed to
    ``migration_manager.create_migration``.
    """
    return migration_manager.create_migration(
        get_user_store(user),
        payload.get('database'),
        payload.get('name'),
        payload.get('up_sql'),
        payload.get('down_sql'),
    )
@router.get("/migrations/{database}")
def list_migrations(request: Request, database: str, user = Depends(require_auth)):
    """Return the migrations recorded for *database* under the ``migrations`` key."""
    found = migration_manager.list_migrations(get_user_store(user), database)
    return {'ok': True, 'migrations': found}
@router.post("/migrations/{database}/{migration_id}/apply")
def apply_migration(request: Request, database: str, migration_id: str, user = Depends(require_auth)):
    """Apply one migration using a per-request query engine.

    The engine is closed whether or not the migration succeeds.
    """
    store = get_user_store(user)
    engine = create_query_engine(store)
    try:
        outcome = migration_manager.apply_migration(store, database, migration_id, engine)
    finally:
        engine.close()
    return outcome
@router.post("/migrations/{database}/{migration_id}/rollback")
def rollback_migration(request: Request, database: str, migration_id: str, user = Depends(require_auth)):
    """Roll back one migration using a per-request query engine.

    The engine is closed whether or not the rollback succeeds.
    """
    store = get_user_store(user)
    engine = create_query_engine(store)
    try:
        outcome = migration_manager.rollback_migration(store, database, migration_id, engine)
    finally:
        engine.close()
    return outcome
# -- QUERY BUILDER --
@router.post("/query-builder/select")
def build_select_query(request: Request, payload: dict, user = Depends(require_auth)):
    """Build a SELECT statement.

    Every key of *payload* is passed as a keyword argument to
    ``query_builder.build_select``.
    """
    return {'ok': True, 'sql': query_builder.build_select(**payload)}
@router.post("/query-builder/insert")
def build_insert_query(request: Request, payload: dict, user = Depends(require_auth)):
    """Build an INSERT statement.

    Every key of *payload* is passed as a keyword argument to
    ``query_builder.build_insert``.
    """
    return {'ok': True, 'sql': query_builder.build_insert(**payload)}
@router.post("/query-builder/update")
def build_update_query(request: Request, payload: dict, user = Depends(require_auth)):
    """Build an UPDATE statement.

    Every key of *payload* is passed as a keyword argument to
    ``query_builder.build_update``.
    """
    return {'ok': True, 'sql': query_builder.build_update(**payload)}
@router.post("/query-builder/delete")
def build_delete_query(request: Request, payload: dict, user = Depends(require_auth)):
    """Build a DELETE statement.

    Every key of *payload* is passed as a keyword argument to
    ``query_builder.build_delete``.
    """
    return {'ok': True, 'sql': query_builder.build_delete(**payload)}
@router.post("/query-builder/aggregate")
def build_aggregate_query(request: Request, payload: dict, user = Depends(require_auth)):
    """Build an aggregate query.

    Every key of *payload* is passed as a keyword argument to
    ``query_builder.build_aggregate``.
    """
    return {'ok': True, 'sql': query_builder.build_aggregate(**payload)}
# -- DATA VALIDATION --
@router.post("/validate/row")
def validate_row(request: Request, payload: dict, user = Depends(require_auth)):
    """Validate ``payload['data']`` against ``payload['schema']``.

    Returns the result of ``data_validator.validate_row`` unchanged.
    """
    return data_validator.validate_row(payload.get('data'), payload.get('schema'))
@router.post("/validate/email")
def validate_email(request: Request, payload: dict, user = Depends(require_auth)):
    """Report under ``valid`` whether ``payload['email']`` passes validation."""
    verdict = data_validator.validate_email(payload.get('email'))
    return {'ok': True, 'valid': verdict}
@router.post("/validate/phone")
def validate_phone(request: Request, payload: dict, user = Depends(require_auth)):
    """Report under ``valid`` whether ``payload['phone']`` passes validation."""
    verdict = data_validator.validate_phone(payload.get('phone'))
    return {'ok': True, 'valid': verdict}
# -- JOB SCHEDULER --
@router.post("/jobs/create")
def create_job(request: Request, payload: dict, user = Depends(require_auth)):
    """Create a scheduled job in the caller's store.

    ``name``, ``type``, ``config`` and ``schedule`` are read from *payload*
    (each None when absent) and handed to ``job_scheduler.create_job``.
    """
    return job_scheduler.create_job(
        get_user_store(user),
        payload.get('name'),
        payload.get('type'),
        payload.get('config'),
        payload.get('schedule'),
    )
@router.get("/jobs")
def list_jobs(request: Request, user = Depends(require_auth)):
    """Return all jobs in the caller's store under the ``jobs`` key."""
    return {'ok': True, 'jobs': job_scheduler.list_jobs(get_user_store(user))}
@router.post("/jobs/{job_id}/run")
def run_job(request: Request, job_id: str, user = Depends(require_auth)):
    """Run a job immediately on a per-request query engine.

    The engine is closed whether or not the job succeeds.
    """
    store = get_user_store(user)
    engine = create_query_engine(store)
    try:
        outcome = job_scheduler.run_job_now(store, job_id, engine)
    finally:
        engine.close()
    return outcome
@router.post("/jobs/{job_id}/enable")
def enable_job(request: Request, job_id: str, user = Depends(require_auth)):
    """Enable the job *job_id* in the caller's store."""
    return job_scheduler.enable_job(get_user_store(user), job_id)
@router.post("/jobs/{job_id}/disable")
def disable_job(request: Request, job_id: str, user = Depends(require_auth)):
    """Disable the job *job_id* in the caller's store."""
    return job_scheduler.disable_job(get_user_store(user), job_id)
@router.delete("/jobs/{job_id}")
def delete_job(request: Request, job_id: str, user = Depends(require_auth)):
    """Delete the job *job_id* from the caller's store."""
    return job_scheduler.delete_job(get_user_store(user), job_id)
# -- QUERY BOOKMARKS --
@router.post("/bookmarks")
def create_bookmark(request: Request, payload: dict, user = Depends(require_auth)):
    """Save a query bookmark.

    ``name`` and ``sql`` are required; ``database``, ``description``
    (default ``''``) and ``tags`` (default empty list) are optional.
    """
    name = payload.get('name')
    sql = payload.get('sql')
    if not (name and sql):
        return {'ok': False, 'error': 'name and sql required'}
    return bookmark_manager.save_bookmark(
        get_user_store(user),
        name,
        sql,
        payload.get('database'),
        payload.get('description', ''),
        payload.get('tags', []),
    )
@router.get("/bookmarks")
def list_bookmarks(request: Request, database: str = None, tags: str = None, user = Depends(require_auth)):
    """List bookmarks, optionally filtered by database and tags.

    *tags* is a comma-separated string; it is split on ``,`` without
    trimming. A missing or empty *tags* means no tag filter (None).
    """
    store = get_user_store(user)
    if tags:
        tag_filter = tags.split(',')
    else:
        tag_filter = None
    return {'ok': True, 'bookmarks': bookmark_manager.list_bookmarks(store, database, tag_filter)}
@router.get("/bookmarks/{bookmark_id}")
def get_bookmark(request: Request, bookmark_id: str, user = Depends(require_auth)):
    """Fetch one bookmark, or an error dict when the lookup returns nothing."""
    found = bookmark_manager.get_bookmark(get_user_store(user), bookmark_id)
    if found:
        return {'ok': True, 'bookmark': found}
    return {'ok': False, 'error': 'Bookmark not found'}
@router.put("/bookmarks/{bookmark_id}")
def update_bookmark(request: Request, bookmark_id: str, payload: dict, user = Depends(require_auth)):
    """Update a bookmark, passing the whole *payload* to the bookmark manager."""
    return bookmark_manager.update_bookmark(get_user_store(user), bookmark_id, payload)
@router.delete("/bookmarks/{bookmark_id}")
def delete_bookmark(request: Request, bookmark_id: str, user = Depends(require_auth)):
    """Delete the bookmark *bookmark_id* from the caller's store."""
    return bookmark_manager.delete_bookmark(get_user_store(user), bookmark_id)
@router.post("/bookmarks/{bookmark_id}/execute")
def execute_bookmark(request: Request, bookmark_id: str, user = Depends(require_auth)):
    """Run a bookmarked query on a per-request query engine.

    The engine is closed whether or not execution succeeds.
    """
    store = get_user_store(user)
    engine = create_query_engine(store)
    try:
        return bookmark_manager.execute_bookmark(store, bookmark_id, engine)
    finally:
        engine.close()
@router.get("/bookmarks/search/{search_term}")
def search_bookmarks(request: Request, search_term: str, user = Depends(require_auth)):
    """Return the bookmarks matching *search_term* under the ``bookmarks`` key."""
    matches = bookmark_manager.search_bookmarks(get_user_store(user), search_term)
    return {'ok': True, 'bookmarks': matches}
@router.get("/bookmarks/popular/list")
def get_popular_bookmarks(request: Request, limit: int = 10, user = Depends(require_auth)):
    """Return the most popular bookmarks; *limit* (default 10) is passed through."""
    top = bookmark_manager.get_popular_bookmarks(get_user_store(user), limit)
    return {'ok': True, 'bookmarks': top}
@router.get("/bookmarks/recent/list")
def get_recent_bookmarks(request: Request, limit: int = 10, user = Depends(require_auth)):
    """Return recently executed bookmarks; *limit* (default 10) is passed through."""
    latest = bookmark_manager.get_recent_bookmarks(get_user_store(user), limit)
    return {'ok': True, 'bookmarks': latest}
@router.get("/bookmarks/export/json")
def export_bookmarks(request: Request, user = Depends(require_auth)):
    """Export all bookmarks as an ``application/json`` response body."""
    body = bookmark_manager.export_bookmarks(get_user_store(user))
    return Response(content=body, media_type="application/json")
@router.post("/bookmarks/import/json")
def import_bookmarks(request: Request, payload: dict, user = Depends(require_auth)):
    """Import bookmarks from ``payload['json']``.

    ``merge`` (default True) is passed through to
    ``bookmark_manager.import_bookmarks``. A missing or empty ``json``
    yields an error dict.
    """
    incoming = payload.get('json')
    merge = payload.get('merge', True)
    if incoming:
        return bookmark_manager.import_bookmarks(get_user_store(user), incoming, merge)
    return {'ok': False, 'error': 'json data required'}
# -- GLOBAL SEARCH --
@router.post("/search/global")
def search_database_global(request: Request, payload: dict, user = Depends(require_auth)):
    """Search for a term across the tables of one database.

    Required payload keys: ``database`` and ``search_term``. Optional:
    ``tables`` (restrict to specific tables), ``case_sensitive`` (False),
    ``regex`` (False) and ``limit_per_table`` (100).
    """
    database = payload.get('database')
    needle = payload.get('search_term')
    if not (database and needle):
        return {'ok': False, 'error': 'database and search_term required'}
    return global_search.search_database(
        get_user_store(user),
        database,
        needle,
        payload.get('tables'),
        payload.get('case_sensitive', False),
        payload.get('regex', False),
        payload.get('limit_per_table', 100),
    )
@router.post("/search/column")
def search_column(request: Request, payload: dict, user = Depends(require_auth)):
    """Search for a term inside a single column.

    Required payload keys: ``database``, ``table``, ``column`` and
    ``search_term``. Optional: ``case_sensitive`` (False), ``regex``
    (False) and ``limit`` (100).
    """
    database = payload.get('database')
    table = payload.get('table')
    column = payload.get('column')
    needle = payload.get('search_term')
    if not (database and table and column and needle):
        return {'ok': False, 'error': 'database, table, column, and search_term required'}
    return global_search.search_column(
        get_user_store(user),
        database,
        table,
        column,
        needle,
        payload.get('case_sensitive', False),
        payload.get('regex', False),
        payload.get('limit', 100),
    )
@router.post("/search/duplicates")
def find_duplicates(request: Request, payload: dict, user = Depends(require_auth)):
    """Find duplicate rows in a table, compared on the given columns.

    Required payload keys: ``database``, ``table`` and a non-empty
    ``columns`` list. ``limit`` defaults to 100.
    """
    database = payload.get('database')
    table = payload.get('table')
    key_columns = payload.get('columns', [])
    max_results = payload.get('limit', 100)
    if not (database and table and key_columns):
        return {'ok': False, 'error': 'database, table, and columns required'}
    return global_search.find_duplicates(get_user_store(user), database, table, key_columns, max_results)
@router.get("/search/nulls/{database}/{table}")
def find_null_values(request: Request, database: str, table: str, user = Depends(require_auth)):
    """Report NULL values in *table*, as returned by ``global_search.find_null_values``."""
    return global_search.find_null_values(get_user_store(user), database, table)
# -- TABLE MAINTENANCE --
@router.post("/maintenance/check/{database}/{table}")
def check_table(request: Request, database: str, table: str, user = Depends(require_auth)):
    """Run the table check and return ``table_maintenance.check_table``'s result."""
    return table_maintenance.check_table(get_user_store(user), database, table)
@router.post("/maintenance/optimize/{database}/{table}")
def optimize_table(request: Request, database: str, table: str, user = Depends(require_auth)):
    """Optimize a table and return ``table_maintenance.optimize_table``'s result."""
    return table_maintenance.optimize_table(get_user_store(user), database, table)
@router.post("/maintenance/analyze/{database}/{table}")
def analyze_table(request: Request, database: str, table: str, user = Depends(require_auth)):
    """Analyze a table and return ``table_maintenance.analyze_table``'s result."""
    return table_maintenance.analyze_table(get_user_store(user), database, table)
@router.post("/maintenance/repair/{database}/{table}")
def repair_table(request: Request, database: str, table: str, user = Depends(require_auth)):
    """Attempt a table repair and return ``table_maintenance.repair_table``'s result."""
    return table_maintenance.repair_table(get_user_store(user), database, table)
@router.get("/maintenance/status/{database}/{table}")
def get_table_status(request: Request, database: str, table: str, user = Depends(require_auth)):
    """Return table status, as produced by ``table_maintenance.get_table_status``."""
    return table_maintenance.get_table_status(get_user_store(user), database, table)
# -- BLOB HANDLING --
@router.get("/blob/info/{database}/{table}/{row_id_field}/{row_id_value}/{blob_column}")
def get_blob_info(request: Request, database: str, table: str, row_id_field: str,
                  row_id_value: str, blob_column: str, user = Depends(require_auth)):
    """Return BLOB information for one cell.

    The row is addressed by *row_id_field* / *row_id_value* and the cell by
    *blob_column*; the lookup is delegated to ``blob_handler.get_blob_info``.
    """
    store = get_user_store(user)
    return blob_handler.get_blob_info(
        store, database, table, row_id_field, row_id_value, blob_column
    )
@router.post("/blob/analyze")
def analyze_blob(request: Request, payload: dict, user = Depends(require_auth)):
    """Analyze base64-encoded BLOB data.

    ``payload['data']`` must hold the base64 text; ``payload['filename']``
    is optional. On success the analysis dict is returned with ``ok`` set
    to True; any exception while decoding or analysing becomes an error
    dict.
    """
    import base64

    encoded = payload.get('data')
    filename = payload.get('filename')
    if not encoded:
        return {'ok': False, 'error': 'data required (base64 encoded)'}

    try:
        report = blob_handler.analyze_blob(base64.b64decode(encoded), filename)
        report['ok'] = True
    except Exception as e:
        return {'ok': False, 'error': f'Failed to analyze BLOB: {str(e)}'}
    return report
@router.post("/blob/download")
def create_blob_download(request: Request, payload: dict, user = Depends(require_auth)):
    """Turn base64-encoded BLOB data into a download response.

    Payload keys: ``data`` (required base64 text), ``filename`` (default
    ``'download.bin'``) and ``mime_type`` (optional). Any exception while
    decoding or building the response becomes an error dict.
    """
    import base64

    encoded = payload.get('data')
    filename = payload.get('filename', 'download.bin')
    mime_type = payload.get('mime_type')
    if not encoded:
        return {'ok': False, 'error': 'data required (base64 encoded)'}

    try:
        raw = base64.b64decode(encoded)
        download = blob_handler.create_download_response(raw, filename, mime_type)
    except Exception as e:
        return {'ok': False, 'error': f'Failed to create download: {str(e)}'}
    return download
# -- VIEWS MANAGEMENT --
@router.post("/views")
def create_view(request: Request, payload: dict, user = Depends(require_auth)):
    """Create a view.

    Required payload keys: ``database``, ``view_name`` and ``sql_query``.
    ``replace`` (default False) is passed through to
    ``views_manager.create_view``.
    """
    database = payload.get('database')
    view_name = payload.get('view_name')
    sql_query = payload.get('sql_query')
    replace = payload.get('replace', False)
    if not (database and view_name and sql_query):
        return {'ok': False, 'error': 'database, view_name, and sql_query required'}
    return views_manager.create_view(get_user_store(user), database, view_name, sql_query, replace)
@router.get("/views")
def list_views(request: Request, database: str = None, user = Depends(require_auth)):
    """List views; the optional *database* is passed through to the views manager."""
    return {'ok': True, 'views': views_manager.list_views(get_user_store(user), database)}
@router.get("/views/{database}/{view_name}")
def get_view(request: Request, database: str, view_name: str, user = Depends(require_auth)):
    """Fetch one view, or an error dict when the lookup returns nothing."""
    found = views_manager.get_view(get_user_store(user), database, view_name)
    if found:
        return {'ok': True, 'view': found}
    return {'ok': False, 'error': 'View not found'}
@router.delete("/views/{database}/{view_name}")
def drop_view(request: Request, database: str, view_name: str, user = Depends(require_auth)):
    """Drop the view *view_name* in *database*."""
    return views_manager.drop_view(get_user_store(user), database, view_name)
@router.get("/views/{database}/{view_name}/query")
def query_view(request: Request, database: str, view_name: str,
               limit: int = 100, offset: int = 0, user = Depends(require_auth)):
    """Query a view; *limit* (default 100) and *offset* (default 0) are passed through."""
    store = get_user_store(user)
    return views_manager.query_view(store, database, view_name, limit, offset)
@router.get("/views/{database}/{view_name}/dependencies")
def get_view_dependencies(request: Request, database: str, view_name: str, user = Depends(require_auth)):
    """Return a view's dependencies, as reported by ``views_manager.get_view_dependencies``."""
    return views_manager.get_view_dependencies(get_user_store(user), database, view_name)
# -- PRIVILEGE MANAGEMENT V2 --
@router.post("/privileges/grant/database")
def grant_database_privilege(request: Request, payload: dict, user = Depends(require_auth)):
    """Grant privileges on an entire database.

    Required payload keys: ``username``, ``database`` and a non-empty
    ``privileges`` list. ``grant_option`` defaults to False.
    """
    # NOTE(review): only require_auth guards this route; whether the caller
    # may grant privileges is presumably decided in privilege_manager_v2 — confirm.
    grantee = payload.get('username')
    database = payload.get('database')
    granted = payload.get('privileges', [])
    with_grant_option = payload.get('grant_option', False)
    if not (grantee and database and granted):
        return {'ok': False, 'error': 'username, database, and privileges required'}
    return privilege_manager_v2.grant_database_privilege(
        get_user_store(user), grantee, database, granted, with_grant_option
    )
@router.post("/privileges/grant/table")
def grant_table_privilege(request: Request, payload: dict, user = Depends(require_auth)):
    """Grant privileges on a specific table.

    Required payload keys: ``username``, ``database``, ``table`` and a
    non-empty ``privileges`` list. ``grant_option`` defaults to False.
    """
    grantee = payload.get('username')
    database = payload.get('database')
    table = payload.get('table')
    granted = payload.get('privileges', [])
    with_grant_option = payload.get('grant_option', False)
    if not (grantee and database and table and granted):
        return {'ok': False, 'error': 'username, database, table, and privileges required'}
    return privilege_manager_v2.grant_table_privilege(
        get_user_store(user), grantee, database, table, granted, with_grant_option
    )
@router.post("/privileges/grant/column")
def grant_column_privilege(request: Request, payload: dict, user = Depends(require_auth)):
    """Grant privileges on specific columns of a table.

    Required payload keys: ``username``, ``database``, ``table`` and
    non-empty ``columns`` and ``privileges`` lists.
    """
    grantee = payload.get('username')
    database = payload.get('database')
    table = payload.get('table')
    target_columns = payload.get('columns', [])
    granted = payload.get('privileges', [])
    if not (grantee and database and table and target_columns and granted):
        return {'ok': False, 'error': 'username, database, table, columns, and privileges required'}
    return privilege_manager_v2.grant_column_privilege(
        get_user_store(user), grantee, database, table, target_columns, granted
    )
@router.post("/privileges/revoke/database")
def revoke_database_privilege(request: Request, payload: dict, user = Depends(require_auth)):
    """Revoke privileges on a database.

    Required payload keys: ``username``, ``database`` and a non-empty
    ``privileges`` list.
    """
    target_user = payload.get('username')
    database = payload.get('database')
    revoked = payload.get('privileges', [])
    if not (target_user and database and revoked):
        return {'ok': False, 'error': 'username, database, and privileges required'}
    return privilege_manager_v2.revoke_database_privilege(
        get_user_store(user), target_user, database, revoked
    )
@router.post("/privileges/revoke/table")
def revoke_table_privilege(request: Request, payload: dict, user = Depends(require_auth)):
    """Revoke privileges on a table.

    Required payload keys: ``username``, ``database``, ``table`` and a
    non-empty ``privileges`` list.
    """
    target_user = payload.get('username')
    database = payload.get('database')
    table = payload.get('table')
    revoked = payload.get('privileges', [])
    if not (target_user and database and table and revoked):
        return {'ok': False, 'error': 'username, database, table, and privileges required'}
    return privilege_manager_v2.revoke_table_privilege(
        get_user_store(user), target_user, database, table, revoked
    )
@router.get("/privileges/user/{username}")
def get_user_privileges(request: Request, username: str, user = Depends(require_auth)):
    """Return the privileges recorded for *username* in the caller's store."""
    return privilege_manager_v2.get_user_privileges(get_user_store(user), username)
@router.get("/privileges/check")
def check_privilege(request: Request, username: str, privilege: str,
                    database: str = None, table: str = None, column: str = None,
                    user = Depends(require_auth)):
    """Check whether *username* holds *privilege*.

    The optional *database*, *table* and *column* are passed through to
    ``privilege_manager_v2.check_privilege``; its result is returned under
    ``has_privilege``.
    """
    store = get_user_store(user)
    verdict = privilege_manager_v2.check_privilege(
        store, username, privilege, database, table, column
    )
    return {'ok': True, 'has_privilege': verdict}
@router.get("/privileges/list")
def list_all_privileges(request: Request, user=Depends(require_auth)):
    """Return the privilege manager's listing of all users and privileges."""
    return privilege_manager_v2.list_all_privileges(get_user_store(user))
@router.post("/privileges/copy")
def copy_privileges(request: Request, payload: dict, user=Depends(require_auth)):
    """Copy the privileges of ``from_user`` onto ``to_user``.

    Both names are read from the payload and must be non-empty.
    """
    from_user = payload.get('from_user')
    to_user = payload.get('to_user')
    if not from_user or not to_user:
        return {'ok': False, 'error': 'from_user and to_user required'}
    return privilege_manager_v2.copy_privileges(get_user_store(user), from_user, to_user)
@router.get("/privileges/export")
def export_privileges(request: Request, username: str = None, user=Depends(require_auth)):
    """Serve the privilege export as a ``text/plain`` response.

    The optional ``username`` query parameter is forwarded to the privilege
    manager.
    """
    store = get_user_store(user)
    return Response(
        content=privilege_manager_v2.export_privileges(store, username),
        media_type="text/plain",
    )
# -- STORED PROCEDURES AND FUNCTIONS --
@router.post("/procedures")
def create_procedure(request: Request, payload: dict, user=Depends(require_auth)):
    """Create a stored procedure from the request payload.

    ``database``, ``name`` and ``body`` are required. ``parameters`` defaults
    to an empty list and ``description`` to an empty string.
    """
    database = payload.get('database')
    name = payload.get('name')
    body = payload.get('body')
    if not (database and name and body):
        return {'ok': False, 'error': 'database, name, and body required'}
    parameters = payload.get('parameters', [])
    description = payload.get('description', '')
    store = get_user_store(user)
    return stored_procedure_manager.create_procedure(
        store, database, name, parameters, body, description
    )
@router.post("/functions")
def create_function(request: Request, payload: dict, user=Depends(require_auth)):
    """Create a function from the request payload.

    ``database``, ``name``, ``return_type`` and ``body`` are required.
    ``parameters`` defaults to an empty list and ``description`` to an empty
    string.
    """
    database = payload.get('database')
    name = payload.get('name')
    return_type = payload.get('return_type')
    body = payload.get('body')
    if not (database and name and return_type and body):
        return {'ok': False, 'error': 'database, name, return_type, and body required'}
    parameters = payload.get('parameters', [])
    description = payload.get('description', '')
    store = get_user_store(user)
    return stored_procedure_manager.create_function(
        store, database, name, parameters, return_type, body, description
    )
@router.get("/procedures")
def list_procedures(request: Request, database: str = None, user=Depends(require_auth)):
    """List procedures and functions, forwarding the optional ``database``."""
    store = get_user_store(user)
    return {
        'ok': True,
        'procedures': stored_procedure_manager.list_procedures(store, database),
    }
# NOTE: the static ".../export" route must be registered BEFORE the dynamic
# "/procedures/{database}/{name}" route. Routes are matched in registration
# order, so with the opposite order a GET on ".../export" is captured by
# get_procedure with name="export" and this handler is never reached.
@router.get("/procedures/{database}/export")
def export_procedures(request: Request, database: str, user = Depends(require_auth)):
    """Export all procedures and functions of ``database`` as plain text SQL."""
    user_store = get_user_store(user)
    sql = stored_procedure_manager.export_procedures(user_store, database)
    return Response(content=sql, media_type="text/plain")


@router.get("/procedures/{database}/{name}")
def get_procedure(request: Request, database: str, name: str, user = Depends(require_auth)):
    """Get a specific procedure or function.

    Returns ``{'ok': False, 'error': 'Procedure not found'}`` when the
    manager yields a falsy result.
    """
    user_store = get_user_store(user)
    procedure = stored_procedure_manager.get_procedure(user_store, database, name)
    if not procedure:
        return {'ok': False, 'error': 'Procedure not found'}
    return {'ok': True, 'procedure': procedure}


@router.put("/procedures/{database}/{name}")
def update_procedure(request: Request, database: str, name: str, payload: dict, user = Depends(require_auth)):
    """Update a procedure or function; the payload is passed through as-is."""
    user_store = get_user_store(user)
    return stored_procedure_manager.update_procedure(user_store, database, name, payload)


@router.delete("/procedures/{database}/{name}")
def drop_procedure(request: Request, database: str, name: str, user = Depends(require_auth)):
    """Drop a procedure or function."""
    user_store = get_user_store(user)
    return stored_procedure_manager.drop_procedure(user_store, database, name)


@router.post("/procedures/{database}/{name}/execute")
def execute_procedure(request: Request, database: str, name: str, payload: dict, user = Depends(require_auth)):
    """Execute a stored procedure with the payload's ``arguments`` list.

    A query engine is created for the call and always closed afterwards.
    """
    arguments = payload.get('arguments', [])
    user_store = get_user_store(user)
    engine = create_query_engine(user_store)
    try:
        return stored_procedure_manager.execute_procedure(
            user_store, database, name, arguments, engine
        )
    finally:
        engine.close()


@router.post("/functions/{database}/{name}/call")
def call_function(request: Request, database: str, name: str, payload: dict, user = Depends(require_auth)):
    """Call a function with the payload's ``arguments`` list.

    A query engine is created for the call and always closed afterwards.
    """
    arguments = payload.get('arguments', [])
    user_store = get_user_store(user)
    engine = create_query_engine(user_store)
    try:
        return stored_procedure_manager.call_function(
            user_store, database, name, arguments, engine
        )
    finally:
        engine.close()


@router.get("/procedures/{database}/{name}/stats")
def get_procedure_stats(request: Request, database: str, name: str, user = Depends(require_auth)):
    """Get execution statistics for a procedure/function."""
    user_store = get_user_store(user)
    return stored_procedure_manager.get_procedure_stats(user_store, database, name)
# -- TRIGGERS --
@router.post("/triggers")
def create_trigger(request: Request, payload: dict, user=Depends(require_auth)):
    """Create a trigger from the request payload.

    ``database``, ``table``, ``name``, ``timing``, ``event`` and ``body`` are
    required; ``description`` defaults to an empty string.
    """
    database = payload.get('database')
    table = payload.get('table')
    name = payload.get('name')
    timing = payload.get('timing')  # BEFORE or AFTER
    event = payload.get('event')  # INSERT, UPDATE, DELETE
    body = payload.get('body')
    mandatory = (database, table, name, timing, event, body)
    if any(not value for value in mandatory):
        return {'ok': False, 'error': 'database, table, name, timing, event, and body required'}
    description = payload.get('description', '')
    store = get_user_store(user)
    return triggers_manager.create_trigger(
        store, database, table, name, timing, event, body, description
    )
@router.get("/triggers")
def list_triggers(
    request: Request,
    database: str = None,
    table: str = None,
    user=Depends(require_auth),
):
    """List triggers, forwarding the optional ``database`` and ``table``."""
    store = get_user_store(user)
    return {'ok': True, 'triggers': triggers_manager.list_triggers(store, database, table)}
# NOTE: the static ".../export" route must be registered BEFORE the dynamic
# "/triggers/{database}/{name}" route. Routes are matched in registration
# order, so with the opposite order a GET on ".../export" is captured by
# get_trigger with name="export" and this handler is never reached.
@router.get("/triggers/{database}/export")
def export_triggers(request: Request, database: str, table: str = None, user = Depends(require_auth)):
    """Export triggers of ``database`` (optionally one ``table``) as plain text SQL."""
    user_store = get_user_store(user)
    sql = triggers_manager.export_triggers(user_store, database, table)
    return Response(content=sql, media_type="text/plain")


@router.get("/triggers/{database}/{name}")
def get_trigger(request: Request, database: str, name: str, user = Depends(require_auth)):
    """Get a specific trigger.

    Returns ``{'ok': False, 'error': 'Trigger not found'}`` when the manager
    yields a falsy result.
    """
    user_store = get_user_store(user)
    trigger = triggers_manager.get_trigger(user_store, database, name)
    if not trigger:
        return {'ok': False, 'error': 'Trigger not found'}
    return {'ok': True, 'trigger': trigger}


@router.put("/triggers/{database}/{name}")
def update_trigger(request: Request, database: str, name: str, payload: dict, user = Depends(require_auth)):
    """Update a trigger; the payload is passed through as-is."""
    user_store = get_user_store(user)
    return triggers_manager.update_trigger(user_store, database, name, payload)


@router.delete("/triggers/{database}/{name}")
def drop_trigger(request: Request, database: str, name: str, user = Depends(require_auth)):
    """Drop a trigger."""
    user_store = get_user_store(user)
    return triggers_manager.drop_trigger(user_store, database, name)


@router.post("/triggers/{database}/{name}/enable")
def enable_trigger(request: Request, database: str, name: str, user = Depends(require_auth)):
    """Enable a trigger."""
    user_store = get_user_store(user)
    return triggers_manager.enable_trigger(user_store, database, name)


@router.post("/triggers/{database}/{name}/disable")
def disable_trigger(request: Request, database: str, name: str, user = Depends(require_auth)):
    """Disable a trigger."""
    user_store = get_user_store(user)
    return triggers_manager.disable_trigger(user_store, database, name)


@router.get("/triggers/{database}/{name}/stats")
def get_trigger_stats(request: Request, database: str, name: str, user = Depends(require_auth)):
    """Get execution statistics for a trigger."""
    user_store = get_user_store(user)
    return triggers_manager.get_trigger_stats(user_store, database, name)
@router.get("/triggers/{database}/{table}/list")
def get_table_triggers(request: Request, database: str, table: str, user=Depends(require_auth)):
    """Return the trigger manager's result for one table's triggers."""
    return triggers_manager.get_table_triggers(get_user_store(user), database, table)
# -- PROCESS MANAGEMENT --
@router.get("/processes")
def list_processes(
    request: Request,
    username: str = None,
    status: str = None,
    user=Depends(require_auth),
):
    """List processes, forwarding the optional ``username`` and ``status``."""
    return {'ok': True, 'processes': process_manager.list_processes(username, status)}
@router.post("/processes/{query_id}/kill")
def kill_query(request: Request, query_id: str, user=Depends(require_auth)):
    """Ask the process manager to kill the query identified by ``query_id``."""
    outcome = process_manager.kill_query(query_id)
    return outcome
@router.get("/processes/history")
def get_query_history(
    request: Request,
    username: str = None,
    limit: int = 100,
    user=Depends(require_auth),
):
    """Return query history, forwarding ``username`` and ``limit`` (default 100)."""
    return {'ok': True, 'history': process_manager.get_query_history(username, limit)}
@router.get("/processes/slow")
def get_slow_queries(request: Request, threshold: float = 5.0, user=Depends(require_auth)):
    """Return the process manager's slow queries for ``threshold`` (default 5.0)."""
    return {'ok': True, 'slow_queries': process_manager.get_slow_queries(threshold)}
@router.post("/processes/kill-slow")
def kill_slow_queries(request: Request, payload: dict, user=Depends(require_auth)):
    """Kill slow queries using the payload's ``threshold`` (default 30.0)."""
    return process_manager.kill_slow_queries(payload.get('threshold', 30.0))
@router.get("/processes/stats")
def get_query_stats(request: Request, username: str = None, user=Depends(require_auth)):
    """Return query statistics, forwarding the optional ``username``."""
    stats = process_manager.get_query_stats(username)
    return stats
@router.get("/processes/users")
def get_active_users(request: Request, user=Depends(require_auth)):
    """Return the process manager's list of active users."""
    return {'ok': True, 'users': process_manager.get_active_users()}
@router.get("/processes/export")
def export_process_list(request: Request, user=Depends(require_auth)):
    """Serve the exported process list as a ``text/plain`` response."""
    return Response(content=process_manager.export_process_list(), media_type="text/plain")
@router.post("/processes/clear-history")
def clear_query_history(request: Request, payload: dict, user=Depends(require_auth)):
    """Clear query history using the payload's ``older_than_hours`` (default 24)."""
    return process_manager.clear_history(payload.get('older_than_hours', 24))
# -- CONFIGURATION ANALYZER --
@router.get("/config/analyze")
def analyze_configuration(request: Request, user=Depends(require_auth)):
    """Return the configuration analyzer's workspace analysis for the caller."""
    return configuration_analyzer.analyze_workspace(get_user_store(user))
@router.get("/config/analyze-table/{database}/{table}")
def analyze_table_performance(
    request: Request,
    database: str,
    table: str,
    user=Depends(require_auth),
):
    """Return the configuration analyzer's performance analysis for one table."""
    store = get_user_store(user)
    return configuration_analyzer.analyze_table_performance(store, database, table)
@router.get("/config/tips")
def get_optimization_tips(request: Request, user=Depends(require_auth)):
    """Return the configuration analyzer's optimization tips."""
    return {'ok': True, 'tips': configuration_analyzer.get_optimization_tips()}
@router.get("/config/report")
def export_configuration_report(request: Request, user=Depends(require_auth)):
    """Serve the configuration analysis report as a ``text/plain`` response."""
    store = get_user_store(user)
    return Response(
        content=configuration_analyzer.export_report(store),
        media_type="text/plain",
    )
@router.post("/init-sample-data")
def init_sample_data_route(request: Request, user = Depends(require_auth)):
    """Initialize sample data for the current user.

    Any exception raised by ``init_sample_data`` is reported to the client as
    an ``ok: False`` dict. The traceback is also written to the module logger
    so the failure can be diagnosed instead of being silently discarded.
    """
    user_store = get_user_store(user)
    try:
        return init_sample_data(user_store, user['username'])
    except Exception as e:
        logger.exception("Sample data initialization failed")
        return {'ok': False, 'error': str(e)}
# ── PROJECTS ──
class CreateProjectRequest(BaseModel):
    """Request body for creating a project."""
    # Project name (required).
    name: str
    # Free-form description; defaults to an empty string.
    description: str = ''
class UpdateProjectRequest(BaseModel):
    """Request body for updating a project; both fields default to None."""
    # New project name, or None when not supplied.
    name: str = None
    # New description, or None when not supplied.
    description: str = None
class ShareProjectRequest(BaseModel):
    """Request body for sharing a project with another user."""
    # Name of the user to share with (required).
    username: str
    # Role to assign; defaults to 'viewer'.
    role: str = 'viewer'
@router.post("/projects")
def create_project(request: Request, payload: CreateProjectRequest, user=Depends(require_auth)):
    """Create a project from the validated name and description."""
    store = get_user_store(user)
    return project_manager.create_project(store, payload.name, payload.description)
@router.get("/projects")
def list_projects(request: Request, user=Depends(require_auth)):
    """Return the project manager's project list for the caller's store."""
    return {'ok': True, 'projects': project_manager.list_projects(get_user_store(user))}
@router.get("/projects/{project_id}")
def get_project(request: Request, project_id: str, user=Depends(require_auth)):
    """Fetch one project by id.

    Returns an ``ok: False`` dict with ``'Project not found'`` when the
    manager yields a falsy result.
    """
    found = project_manager.get_project(get_user_store(user), project_id)
    if found:
        return {'ok': True, 'project': found}
    return {'ok': False, 'error': 'Project not found'}
@router.put("/projects/{project_id}")
def update_project(
    request: Request,
    project_id: str,
    payload: UpdateProjectRequest,
    user=Depends(require_auth),
):
    """Apply the non-None fields of the payload to a project."""
    store = get_user_store(user)
    changes = {}
    for field, value in payload.model_dump().items():
        # Fields left at None are treated as "not supplied" and skipped.
        if value is not None:
            changes[field] = value
    return project_manager.update_project(store, project_id, changes)
@router.delete("/projects/{project_id}")
def delete_project(request: Request, project_id: str, user=Depends(require_auth)):
    """Delete the project identified by ``project_id``."""
    return project_manager.delete_project(get_user_store(user), project_id)
@router.post("/projects/{project_id}/databases/{database_name}")
def add_database_to_project(
    request: Request,
    project_id: str,
    database_name: str,
    user=Depends(require_auth),
):
    """Link ``database_name`` to the project identified by ``project_id``."""
    store = get_user_store(user)
    return project_manager.add_database_to_project(store, project_id, database_name)
@router.delete("/projects/{project_id}/databases/{database_name}")
def remove_database_from_project(
    request: Request,
    project_id: str,
    database_name: str,
    user=Depends(require_auth),
):
    """Unlink ``database_name`` from the project identified by ``project_id``."""
    store = get_user_store(user)
    return project_manager.remove_database_from_project(store, project_id, database_name)
@router.post("/projects/{project_id}/share")
def share_project(
    request: Request,
    project_id: str,
    payload: ShareProjectRequest,
    user=Depends(require_auth),
):
    """Share a project with the payload's ``username`` under its ``role``."""
    store = get_user_store(user)
    return project_manager.share_project(store, project_id, payload.username, payload.role)
@router.delete("/projects/{project_id}/share/{username}")
def unshare_project(request: Request, project_id: str, username: str, user=Depends(require_auth)):
    """Remove ``username``'s access to the project identified by ``project_id``."""
    return project_manager.unshare_project(get_user_store(user), project_id, username)
# ── VISUAL QUERY BUILDER ──
@router.post("/query-builder/build")
def build_visual_query(request: Request, payload: dict, user=Depends(require_auth)):
    """Pass the visual configuration payload to the visual query builder."""
    built = visual_query_builder.build_query(payload)
    return built
@router.get("/query-builder/operators")
def get_query_operators(request: Request, user=Depends(require_auth)):
    """Return the operators exposed by the visual query builder."""
    operators = visual_query_builder.get_operators()
    return {'ok': True, 'operators': operators}
@router.get("/query-builder/aggregations")
def get_query_aggregations(request: Request, user=Depends(require_auth)):
    """Return the aggregations exposed by the visual query builder."""
    aggregations = visual_query_builder.get_aggregations()
    return {'ok': True, 'aggregations': aggregations}
@router.get("/query-builder/join-types")
def get_join_types(request: Request, user=Depends(require_auth)):
    """Return the join types exposed by the visual query builder."""
    join_types = visual_query_builder.get_join_types()
    return {'ok': True, 'join_types': join_types}
# ── DATA VISUALIZATION ──
@router.post("/visualize/create-chart")
def create_chart(request: Request, payload: dict, user=Depends(require_auth)):
    """Create a chart from the payload.

    ``data`` defaults to an empty list, ``chart_type`` to ``'bar'`` and
    ``config`` to an empty dict.
    """
    return data_visualization_engine.create_chart(
        payload.get('data', []),
        payload.get('chart_type', 'bar'),
        payload.get('config', {}),
    )
@router.get("/visualize/chart-types")
def get_chart_types(request: Request, user=Depends(require_auth)):
    """Return the chart types exposed by the visualization engine."""
    chart_types = data_visualization_engine.get_chart_types()
    return {'ok': True, 'chart_types': chart_types}
@router.post("/visualize/analyze-data")
def analyze_data_for_chart(request: Request, payload: dict, user=Depends(require_auth)):
    """Pass the payload's ``data`` (default empty list) to the chart analyzer."""
    return data_visualization_engine.analyze_data_for_chart(payload.get('data', []))
# ── API DOCUMENTATION ──
@router.get("/docs/database/{database}")
def get_database_api_docs(request: Request, database: str, user=Depends(require_auth)):
    """Generate API documentation for ``database``.

    The table list handed to the generator is whatever
    ``table_manager.list`` returns for the caller's store.
    """
    store = get_user_store(user)
    return api_doc_generator.generate_docs(database, table_manager.list(store), store)
@router.get("/docs/sdk-examples/{database}/{table}")
def get_sdk_examples(request: Request, database: str, table: str, user=Depends(require_auth)):
    """Return generated SDK examples for one table."""
    return {'ok': True, 'examples': api_doc_generator.generate_sdk_examples(database, table)}
# ── DATABASE TEMPLATES ──
@router.get("/templates/list")
def list_database_templates(request: Request, user=Depends(require_auth)):
    """Return the template manager's list of database templates."""
    return {'ok': True, 'templates': database_template_manager.list_templates()}
@router.get("/templates/{template_id}")
def get_database_template(request: Request, template_id: str, user=Depends(require_auth)):
    """Return the template manager's result for ``template_id``."""
    template = database_template_manager.get_template(template_id)
    return template
@router.post("/templates/{template_id}/create")
def create_database_from_template(
    request: Request,
    template_id: str,
    payload: dict,
    user=Depends(require_auth),
):
    """Create a database from a template.

    The payload must contain a non-empty ``database_name``.
    """
    database_name = payload.get('database_name')
    if database_name:
        store = get_user_store(user)
        return database_template_manager.create_from_template(
            template_id, database_name, store, user['username'], user['workspace_id']
        )
    return {'ok': False, 'error': 'database_name required'}
@router.post("/databases/clone")
def clone_database(request: Request, payload: dict, user=Depends(require_auth)):
    """Clone ``source_database`` into ``target_database``.

    Both names are read from the payload and must be non-empty.
    """
    source_db = payload.get('source_database')
    target_db = payload.get('target_database')
    if not (source_db and target_db):
        return {'ok': False, 'error': 'source_database and target_database required'}
    store = get_user_store(user)
    return database_template_manager.clone_database(
        source_db, target_db, store, user['username'], user['workspace_id']
    )
# ── PERFORMANCE MONITORING ──
@router.get("/performance/stats")
def get_performance_stats(request: Request, hours: int = 24, user=Depends(require_auth)):
    """Return the performance monitor's query stats for ``hours`` (default 24)."""
    return performance_monitor.get_query_stats(get_user_store(user), hours)
@router.get("/performance/slow-queries")
def get_slow_queries(
    request: Request,
    threshold: float = 1.0,
    limit: int = 50,
    user=Depends(require_auth),
):
    """Return the performance monitor's slow queries.

    ``threshold`` defaults to 1.0 and ``limit`` to 50.
    """
    # NOTE(review): the "/processes/slow" handler in this module is also named
    # get_slow_queries; both routes register, but the module-level name ends
    # up bound to whichever definition comes last.
    return performance_monitor.get_slow_queries(get_user_store(user), threshold, limit)
@router.get("/performance/table/{database}/{table}")
def analyze_table_perf(request: Request, database: str, table: str, user=Depends(require_auth)):
    """Return the performance monitor's analysis for one table."""
    store = get_user_store(user)
    return performance_monitor.analyze_table_performance(store, database, table)
@router.get("/performance/index-recommendations/{database}/{table}")
def get_index_recommendations(
    request: Request,
    database: str,
    table: str,
    user=Depends(require_auth),
):
    """Return the performance monitor's index recommendations for one table."""
    store = get_user_store(user)
    return performance_monitor.get_index_recommendations(store, database, table)
@router.post("/performance/clear-logs")
def clear_performance_logs(request: Request, payload: dict, user=Depends(require_auth)):
    """Clear old performance logs using the payload's ``days`` (default 30)."""
    retention_days = payload.get('days', 30)
    return performance_monitor.clear_old_logs(get_user_store(user), retention_days)
# ── AI ASSISTANT ──
@router.post("/ai/chat")
def ai_chat(request: Request, payload: dict, user=Depends(require_auth)):
    """Send the payload's ``message`` to the AI assistant.

    A missing or empty message yields an ``ok: False`` error dict.
    """
    message = payload.get('message')
    if message:
        store = get_user_store(user)
        reply = ai_assistant.process_message(message, store, user['username'])
        return {'ok': True, 'response': reply}
    return {'ok': False, 'error': 'message required'}
@router.post("/ai/execute-query")
def ai_execute_query(request: Request, payload: dict, user = Depends(require_auth)):
    """Execute AI-generated SQL and record how long it took.

    The payload must contain a non-empty ``sql`` string. When the assistant
    reports success, the statement and its duration are handed to the
    performance monitor under the payload's ``database`` (default
    ``'unknown'``).
    """
    import time
    sql = payload.get('sql')
    if not sql:
        return {'ok': False, 'error': 'sql required'}
    user_store = get_user_store(user)
    # perf_counter is monotonic, so the measured duration cannot be skewed
    # (or go negative) when the system clock is adjusted mid-query.
    start_time = time.perf_counter()
    result = ai_assistant.execute_query(sql, user_store)
    execution_time = time.perf_counter() - start_time
    # Only successful executions are logged.
    if result.get('ok'):
        database = payload.get('database', 'unknown')
        performance_monitor.log_query(user_store, sql, execution_time, database, user['username'])
    return result
@router.get("/settings/ai")
def get_ai_settings(request: Request, user=Depends(require_auth)):
    """Report the AI settings read from environment variables.

    The API key itself is never returned: a fixed mask is sent when
    ``AI_API_KEY`` is set and an empty string otherwise.
    """
    import os
    if os.getenv('AI_API_KEY'):
        masked_key = '••••••••••••'
    else:
        masked_key = ''
    return {
        'ok': True,
        'provider': os.getenv('AI_PROVIDER', 'rule_based'),
        'model': os.getenv('AI_MODEL', ''),
        'api_key': masked_key,
    }
@router.put("/settings/ai")
def update_ai_settings(request: Request, payload: dict, user=Depends(require_auth)):
    """Update the AI settings held in this process's environment variables.

    ``provider`` (default ``'rule_based'``) must be one of the supported
    providers. ``model`` and ``api_key`` are applied only when non-empty, and
    an ``api_key`` that starts with the mask characters is ignored.
    """
    # NOTE(review): os.environ is process-wide, so this presumably changes the
    # provider/key for every user while only requiring require_auth; confirm
    # whether require_admin is intended here.
    import os
    provider = payload.get('provider', 'rule_based')
    model = payload.get('model', '')
    api_key = payload.get('api_key', '')
    valid_providers = ['rule_based', 'groq', 'openai', 'anthropic']
    if provider not in valid_providers:
        return {'ok': False, 'error': f'Invalid provider. Must be one of: {valid_providers}'}
    # Environment changes last only for the lifetime of the current process.
    os.environ['AI_PROVIDER'] = provider
    if model:
        os.environ['AI_MODEL'] = model
    if api_key:
        # A value beginning with the mask is the placeholder, not a new key.
        if not api_key.startswith('••'):
            os.environ['AI_API_KEY'] = api_key
    # Re-run the assistant's initializer so it picks up the new environment.
    ai_assistant.__init__()
    return {
        'ok': True,
        'message': 'AI settings updated. Note: Settings are session-only. Set environment variables for persistence.'
    }
# ── WORKSPACE SHARING ──
@router.post("/workspace/invite/create")
def create_workspace_invite(request: Request, payload: dict, user=Depends(require_auth)):
    """Create an invite link for the caller's workspace.

    ``role`` defaults to ``'viewer'``; ``expires_hours`` and ``max_uses`` are
    forwarded as given (``None`` when absent).
    """
    invite_options = {
        'role': payload.get('role', 'viewer'),
        'expires_hours': payload.get('expires_hours'),
        'max_uses': payload.get('max_uses'),
    }
    return workspace_share_manager.create_invite_link(
        workspace_id=user['workspace_id'],
        created_by=user['username'],
        **invite_options,
    )
@router.post("/workspace/invite/verify")
def verify_workspace_invite(request: Request, payload: dict):
    """Verify an invite code and secret (public endpoint, no auth dependency).

    Responds with HTTP 400 when either ``invite_code`` or ``secret`` is
    missing; otherwise returns the share manager's verification result.
    """
    invite_code = payload.get('invite_code')
    secret = payload.get('secret')
    if invite_code and secret:
        verification = workspace_share_manager.verify_invite(invite_code, secret)
        return JSONResponse(content=verification)
    return JSONResponse(
        status_code=400,
        content={'ok': False, 'error': 'invite_code and secret required'}
    )
@router.post("/workspace/invite/accept")
def accept_workspace_invite(request: Request, payload: dict, user=Depends(require_auth)):
    """Accept an invite on behalf of the authenticated user.

    Both ``invite_code`` and ``secret`` must be present in the payload.
    """
    invite_code = payload.get('invite_code')
    secret = payload.get('secret')
    if invite_code and secret:
        return workspace_share_manager.accept_invite(invite_code, secret, user['username'])
    return {'ok': False, 'error': 'invite_code and secret required'}
@router.get("/workspace/members")
def list_workspace_members(request: Request, user=Depends(require_auth)):
    """List the members of the caller's workspace."""
    workspace_id = user['workspace_id']
    return {'ok': True, 'members': workspace_share_manager.list_workspace_members(workspace_id)}
@router.get("/workspace/invites")
def list_workspace_invites(request: Request, user=Depends(require_auth)):
    """List the invite links of the caller's workspace."""
    workspace_id = user['workspace_id']
    return {'ok': True, 'invites': workspace_share_manager.list_workspace_invites(workspace_id)}
@router.post("/workspace/invite/revoke")
def revoke_workspace_invite(request: Request, payload: dict, user=Depends(require_auth)):
    """Revoke the invite named by the payload's ``invite_code``."""
    invite_code = payload.get('invite_code')
    if invite_code:
        return workspace_share_manager.revoke_invite(invite_code, user['workspace_id'])
    return {'ok': False, 'error': 'invite_code required'}
@router.post("/workspace/member/remove")
def remove_workspace_member(request: Request, payload: dict, user=Depends(require_auth)):
    """Remove the payload's ``username`` from the caller's workspace."""
    username = payload.get('username')
    if username:
        return workspace_share_manager.remove_member(user['workspace_id'], username)
    return {'ok': False, 'error': 'username required'}
@router.post("/workspace/member/role")
def update_member_role(request: Request, payload: dict, user=Depends(require_auth)):
    """Change a member's role in the caller's workspace.

    Both ``username`` and ``role`` must be present in the payload.
    """
    username = payload.get('username')
    role = payload.get('role')
    if username and role:
        return workspace_share_manager.update_member_role(user['workspace_id'], username, role)
    return {'ok': False, 'error': 'username and role required'}
@router.get("/workspace/shared")
def list_shared_workspaces(request: Request, user=Depends(require_auth)):
    """Return the share manager's workspaces for the caller's username."""
    username = user['username']
    return {'ok': True, 'workspaces': workspace_share_manager.get_user_workspaces(username)}
@router.get("/workspace/accessible")
def list_accessible_workspaces(request: Request, user=Depends(require_auth)):
    """Return the result of ``get_accessible_workspaces`` for the caller."""
    from app.auth import get_accessible_workspaces
    return {'ok': True, 'workspaces': get_accessible_workspaces(user)}
@router.get("/workspace/debug/members")
def debug_all_members(request: Request, user = Depends(require_admin)):
    """Debug: list ALL rows of ``workspace_members`` (admin only).

    Opens the shares database under ``settings.data_root`` directly and
    returns the rows, newest ``added_at`` first, together with the caller's
    username and workspace id.
    """
    import duckdb
    from pathlib import Path
    from app.config import settings
    db_path = Path(settings.data_root) / "system" / "workspace_shares.duckdb"
    conn = duckdb.connect(str(db_path))
    try:
        result = conn.execute("SELECT * FROM workspace_members ORDER BY added_at DESC").fetchdf()
    finally:
        # Close even when the query fails so the connection is not leaked.
        conn.close()
    return {
        'ok': True,
        'total_members': len(result),
        'members': result.to_dict('records'),
        'current_user': user['username'],
        'current_workspace': user['workspace_id']
    }
@router.get("/workspace/debug/invites")
def debug_all_invites(request: Request, user = Depends(require_admin)):
    """Debug: list ALL rows of ``invite_links`` (admin only).

    Opens the shares database under ``settings.data_root`` directly and
    returns the rows, newest ``created_at`` first.
    """
    import duckdb
    from pathlib import Path
    from app.config import settings
    db_path = Path(settings.data_root) / "system" / "workspace_shares.duckdb"
    conn = duckdb.connect(str(db_path))
    try:
        result = conn.execute("SELECT * FROM invite_links ORDER BY created_at DESC").fetchdf()
    finally:
        # Close even when the query fails so the connection is not leaked.
        conn.close()
    return {
        'ok': True,
        'total_invites': len(result),
        'invites': result.to_dict('records')
    }
@router.get("/workspace/{workspace_id}/databases")
def get_workspace_databases(request: Request, workspace_id: str, user=Depends(require_auth)):
    """List databases using the store resolved for ``workspace_id``.

    The store comes from ``get_user_store_for_workspace(user, workspace_id)``.
    """
    from app.auth import get_user_store_for_workspace
    workspace_store = get_user_store_for_workspace(user, workspace_id)
    return metadata.list_databases(workspace_store)
@router.get("/workspace/{workspace_id}/tables")
def get_workspace_tables(request: Request, workspace_id: str, user=Depends(require_auth)):
    """List tables using the store resolved for ``workspace_id``.

    The store comes from ``get_user_store_for_workspace(user, workspace_id)``.
    """
    from app.auth import get_user_store_for_workspace
    workspace_store = get_user_store_for_workspace(user, workspace_id)
    return table_manager.list(workspace_store)
# ── DATABASES ──
@router.get("/databases")
def get_databases(request: Request, user=Depends(require_auth)):
    """List the databases in the authenticated user's store."""
    return metadata.list_databases(get_user_store(user))
@router.post("/databases")
def create_database(request: Request, payload: CreateDatabaseRequest, user=Depends(require_auth)):
    """Create a database and, on success, issue an API key pair for it.

    The rate limiter is consulted first, then the name is validated. When
    creation succeeds the response is extended with ``api_key``,
    ``secret_key`` and a reminder ``message``.
    """
    from app.input_validator import input_validator
    from app.rate_limiter import rate_limiter
    rate_limiter.check_rate_limit(request, user, 'authenticated')
    name_ok, name_error = input_validator.validate_database_name(payload.name)
    if not name_ok:
        return {'ok': False, 'error': name_error}
    store = get_user_store(user)
    outcome = metadata.create_database(payload.name, store)
    if not outcome.get('ok'):
        return outcome
    keys = db_api_key_manager.create_db_api_key(store, payload.name, user['workspace_id'])
    outcome['api_key'] = keys['api_key']
    outcome['secret_key'] = keys['secret_key']
    outcome['message'] = 'IMPORTANT: Save your secret key now. It will not be shown again!'
    return outcome
@router.delete("/databases/{name}")
def delete_database(request: Request, name: str, user=Depends(require_auth)):
    """Delete the database ``name`` from the authenticated user's store."""
    return metadata.delete_database(name, get_user_store(user))
@router.post("/databases/cleanup")
def cleanup_databases(request: Request, user = Depends(require_auth)):
    """Force cleanup - remove orphaned tables and stray DuckDB schemas.

    A table is orphaned when its ``database`` is not among the databases
    listed for the user's store. Both phases are best-effort: failures are
    logged and the endpoint still answers ``ok: True``.
    ``orphaned_tables_removed`` counts only the drops that did not raise.
    """
    user_store = get_user_store(user)
    # Names of the databases that currently exist.
    dbs = metadata.list_databases(user_store)
    db_names = [db['name'] if isinstance(db, dict) else db for db in dbs]
    # Tables referencing a database that is no longer listed.
    tables = table_manager.list(user_store)
    orphaned = [t for t in tables if t['database'] not in db_names]
    removed = 0
    for t in orphaned:
        try:
            table_manager.drop(t['database'], t['table'], user_store)
        except Exception:
            # Keep going with the remaining tables, but leave a trace.
            logger.warning(
                "Failed to drop orphaned table %s.%s",
                t['database'], t['table'], exc_info=True,
            )
        else:
            removed += 1
    # Drop every non-system DuckDB schema that has no matching database.
    system_schemas = ('main', 'temp', 'information_schema', 'pg_catalog')
    try:
        engine = create_query_engine(user_store)
        try:
            schemas = engine.conn.execute("SELECT schema_name FROM information_schema.schemata").fetchall()
            for (schema_name,) in schemas:
                if schema_name in system_schemas or schema_name in db_names:
                    continue
                # Double embedded quotes so the name stays one quoted identifier.
                quoted = schema_name.replace('"', '""')
                engine.conn.execute(f'DROP SCHEMA IF EXISTS "{quoted}" CASCADE')
        finally:
            # Release the engine even when a statement fails.
            engine.close()
    except Exception as e:
        logger.warning("Schema cleanup error: %s", e)
    return {
        'ok': True,
        'databases': db_names,
        'orphaned_tables_removed': removed,
        'message': f'Cleanup complete. {removed} orphaned tables removed.'
    }
# ── TABLES ──
@router.get("/tables")
def get_tables(request: Request, user=Depends(require_auth)):
    """List the tables in the authenticated user's store."""
    return table_manager.list(get_user_store(user))
@router.post("/tables")
def create_table(request: Request, payload: CreateTableRequest, user=Depends(require_auth)):
    """Create a table from the payload's database, table name and columns."""
    store = get_user_store(user)
    return table_manager.create(payload.database, payload.table, payload.columns, store)
@router.get("/schema/{database}/{table}")
def get_schema(request: Request, database: str, table: str, user=Depends(require_auth)):
    """Return the schema manager's result for ``database``/``table``."""
    return schema_manager.get(database, table, get_user_store(user))
# ── ROWS ──
@router.get("/rows/{database}/{table}")
def get_rows(request: Request, database: str, table: str, limit: int = 100, offset: int = 0, user = Depends(require_auth)):
    """Read a page of rows from ``database.table``.

    ``limit`` and ``offset`` are passed unchanged to ``row_manager.read``.
    """
    store = get_user_store(user)
    page = row_manager.read(database, table, store, limit=limit, offset=offset)
    return page
@router.post("/rows/{database}/{table}")
def insert_row(request: Request, database: str, table: str, payload: dict, user = Depends(require_auth)):
    """Insert one row into ``database.table`` and persist tables afterwards.

    Args:
        database: Target database name (path parameter).
        table: Target table name (path parameter).
        payload: Row data forwarded to ``row_manager.insert``.

    Returns:
        The result of ``row_manager.insert``. If the insert itself raises, a
        500 ``JSONResponse`` with ``{'ok': False, 'error': ...}`` is returned.
    """
    user_store = get_user_store(user)
    try:
        result = row_manager.insert(database, table, payload, user['username'], user_store, user['workspace_id'])
        if result.get('ok'):
            # Persistence is best-effort: the row is already inserted, so a
            # failure here is logged rather than turned into an error reply.
            try:
                engine = create_query_engine(user_store)
                try:
                    _persist_all_tables(engine, user_store)
                finally:
                    engine.close()
            except Exception:
                logger.warning(
                    "Persistence after insert into %s.%s failed",
                    database, table, exc_info=True,
                )
        return result
    except Exception as e:
        # logger.exception records the traceback that was previously only
        # printed to stderr.
        logger.exception("insert_row failed for %s.%s", database, table)
        return JSONResponse(status_code=500, content={'ok': False, 'error': str(e)})
@router.put("/rows/{database}/{table}/{row_id_field}/{row_id_value}")
def update_row(request: Request, database: str, table: str, row_id_field: str, row_id_value: str, payload: dict, user = Depends(require_auth)):
    """Update the row of ``database.table`` where ``row_id_field`` equals ``row_id_value``.

    Args:
        row_id_field: Name of the column used to locate the row.
        row_id_value: Value of that column, as received in the URL (a string).
        payload: New field values forwarded to ``row_manager.update``.

    Returns:
        The result of ``row_manager.update``.
    """
    user_store = get_user_store(user)
    result = row_manager.update(database, table, row_id_field, row_id_value, payload, user['username'], user_store, user['workspace_id'])
    if result.get('ok'):
        # Persistence is best-effort: the update already succeeded, so a
        # failure here is logged instead of failing the request.
        try:
            engine = create_query_engine(user_store)
            try:
                _persist_all_tables(engine, user_store)
            finally:
                engine.close()
        except Exception:
            logger.warning(
                "Persistence after update of %s.%s failed",
                database, table, exc_info=True,
            )
    return result
@router.delete("/rows/{database}/{table}/{row_id_field}/{row_id_value}")
def delete_row(request: Request, database: str, table: str, row_id_field: str, row_id_value: str, user = Depends(require_auth)):
    """Delete the row of ``database.table`` where ``row_id_field`` equals ``row_id_value``.

    Args:
        row_id_field: Name of the column used to locate the row.
        row_id_value: Value of that column, as received in the URL (a string).

    Returns:
        The result of ``row_manager.delete``.
    """
    user_store = get_user_store(user)
    result = row_manager.delete(database, table, row_id_field, row_id_value, user['username'], user_store, user['workspace_id'])
    if result.get('ok'):
        # Persistence is best-effort: the delete already succeeded, so a
        # failure here is logged instead of failing the request.
        try:
            engine = create_query_engine(user_store)
            try:
                _persist_all_tables(engine, user_store)
            finally:
                engine.close()
        except Exception:
            logger.warning(
                "Persistence after delete from %s.%s failed",
                database, table, exc_info=True,
            )
    return result
# ── TABLE MANAGEMENT ──
@router.post("/tables/alter")
def alter_table(request: Request, payload: dict, user = Depends(require_auth)):
    """Apply a list of structural operations to a table.

    The body must carry non-empty ``database``, ``table`` and ``operations``
    entries; otherwise an ``ok: False`` dict is returned and nothing is
    changed. Valid requests are delegated to ``table_manager.alter_table``.
    """
    db_name = payload.get('database')
    tbl_name = payload.get('table')
    ops = payload.get('operations', [])
    if not (db_name and tbl_name and ops):
        return {'ok': False, 'error': 'database, table, and operations required'}
    store = get_user_store(user)
    return table_manager.alter_table(db_name, tbl_name, ops, store)
@router.post("/tables/rename")
def rename_table(request: Request, payload: dict, user = Depends(require_auth)):
    """Rename a table inside one database.

    Requires non-empty ``database``, ``old_name`` and ``new_name`` in the
    body; otherwise an ``ok: False`` dict is returned. Valid requests are
    delegated to ``table_manager.rename``.
    """
    db_name = payload.get('database')
    current = payload.get('old_name')
    target = payload.get('new_name')
    if not (db_name and current and target):
        return {'ok': False, 'error': 'database, old_name, and new_name required'}
    store = get_user_store(user)
    return table_manager.rename(db_name, current, target, store)
@router.post("/tables/copy")
def copy_table(request: Request, payload: dict, user = Depends(require_auth)):
    """Copy table ``source`` to ``destination`` within ``database``.

    All three body fields must be non-empty; otherwise an ``ok: False`` dict
    is returned. Valid requests are delegated to ``table_manager.copy``.
    """
    db_name = payload.get('database')
    src = payload.get('source')
    dst = payload.get('destination')
    if not (db_name and src and dst):
        return {'ok': False, 'error': 'database, source, and destination required'}
    store = get_user_store(user)
    return table_manager.copy(db_name, src, dst, store)
@router.post("/tables/truncate")
def truncate_table(request: Request, payload: dict, user = Depends(require_auth)):
    """Truncate a table via ``table_manager.truncate``.

    ``database`` and ``table`` must both be present and non-empty in the
    body; otherwise an ``ok: False`` dict is returned.
    """
    db_name = payload.get('database')
    tbl_name = payload.get('table')
    if not (db_name and tbl_name):
        return {'ok': False, 'error': 'database and table required'}
    store = get_user_store(user)
    return table_manager.truncate(db_name, tbl_name, store)
@router.get("/tables/stats/{database}/{table}")
def get_table_stats(request: Request, database: str, table: str, user = Depends(require_auth)):
    """Return ``table_manager.get_stats`` for ``database.table``."""
    store = get_user_store(user)
    table_stats = table_manager.get_stats(database, table, store)
    return table_stats
# ── SQL QUERY ENGINE ──
@router.post("/sql")
def execute_sql(request: Request, payload: SQLQueryRequest, user = Depends(require_auth)):
    """Execute raw SQL queries using DuckDB with validation and persistence.

    Steps, in order:

    1. ``CREATE TABLE`` statements go through the MySQL-to-DuckDB converter.
    2. The query is validated (writes allowed) and complexity-checked.
    3. ``SHOW TABLES`` is answered from metadata; ``DESCRIBE`` / ``DESC`` /
       ``SHOW COLUMNS FROM|IN`` is answered from ``information_schema``.
    4. Multi-statement input is validated and executed statement by
       statement, after which all tables are persisted.
    5. A single statement is executed; a successful ``DROP TABLE`` removes
       metadata and files, a successful ``CREATE TABLE`` registers and
       persists the table, and a successful INSERT/UPDATE/DELETE/ALTER
       persists every ``db.table`` reference found in the statement.

    Returns:
        A dict with ``ok`` plus either the query result, the per-statement
        ``results`` list, or an ``error`` message.
    """
    from app.query_parser import query_parser
    from app.table_manager import table_manager
    from app.utils import write_json

    sql = payload.sql.strip()
    if not sql:
        return {'ok': False, 'error': 'Empty query'}

    # Convert MySQL syntax to DuckDB-compatible SQL
    from app.mysql_to_duckdb_converter import mysql_to_duckdb_converter
    if re.match(r'CREATE\s+TABLE', sql, re.IGNORECASE):
        converted, _ = mysql_to_duckdb_converter.convert_statement(sql)
        if converted:
            sql = converted

    # Validate query with write operations allowed
    is_valid, error = query_parser.validate_query(sql, allow_write=True)
    if not is_valid:
        return {'ok': False, 'error': f"Query validation failed: {error}"}

    # Check query complexity
    is_allowed, reason, score = query_complexity_analyzer.analyze_complexity(sql)
    if not is_allowed:
        return {'ok': False, 'error': f"Query complexity check failed: {reason}"}

    user_store = get_user_store(user)
    engine = create_query_engine(user_store)
    try:
        # Handle SHOW TABLES
        if re.match(r'SHOW\s+TABLES', sql, re.IGNORECASE):
            tables = metadata.list_tables(user_store)
            return {
                'ok': True,
                'data': [{'Table': f"{t['database']}.{t['table']}"} for t in tables],
                'columns': ['Table'],
                'rows': len(tables)
            }

        # Handle DESCRIBE / DESC / SHOW COLUMNS
        desc_match = re.match(r'(?:DESCRIBE|DESC|SHOW\s+COLUMNS\s+(?:FROM|IN))\s+["`]?(\w+(?:\.\w+)?)["`]?', sql, re.IGNORECASE)
        if desc_match:
            table_ref = desc_match.group(1)
            if '.' in table_ref:
                db, tbl = table_ref.split('.', 1)
            else:
                # Unqualified name: use the first table with that name in any
                # database (case-insensitive).
                all_tables = metadata.list_tables(user_store)
                matches = [t for t in all_tables if t['table'].lower() == table_ref.lower()]
                if matches:
                    db, tbl = matches[0]['database'], matches[0]['table']
                else:
                    return {'ok': False, 'error': f'Table {table_ref} not found'}
            try:
                # Registration is inside the try so a failure is reported as
                # an error dict like every other failure in this branch.
                engine._register_table(db, tbl)
                safe_db = query_parser.sanitize_identifier(db)
                safe_tbl = query_parser.sanitize_identifier(tbl)
                cols = engine.conn.execute(f"""
                    SELECT column_name as Field, data_type as Type, 'YES' as "Null", '' as "Key", NULL as "Default", '' as "Extra"
                    FROM information_schema.columns
                    WHERE table_schema = '{safe_db}' AND table_name = '{safe_tbl}'
                    ORDER BY ordinal_position
                """).fetchdf()
                return {
                    'ok': True,
                    'data': cols.to_dict('records'),
                    'columns': list(cols.columns),
                    'rows': len(cols)
                }
            except Exception as e:
                return {'ok': False, 'error': str(e)}

        # Handle multi-statement SQL
        statements = query_parser.split_sql_statements(sql)
        is_multi = len(statements) > 1
        if is_multi:
            results = []
            for stmt in statements:
                stmt = stmt.strip().rstrip(';').strip()
                if not stmt:
                    continue
                stmt_valid, stmt_err = query_parser.validate_query(stmt, allow_write=True)
                if not stmt_valid:
                    # An invalid statement is recorded and skipped; the
                    # remaining statements still run.
                    results.append({'ok': False, 'error': stmt_err, 'sql': stmt[:100]})
                    continue
                stmt_result = engine.execute_sql(stmt, allow_write=True)
                results.append(stmt_result)
            # After multi-statement execution, persist all affected tables
            _persist_all_tables(engine, user_store)
            audit.log("sql_query_multi", actor=user['username'], target="sql_engine", after={"statements": len(results)})
            return {'ok': True, 'results': results, 'total': len(results)}

        # Single statement execution
        drop_match = re.match(r'DROP\s+TABLE\s+(?:IF\s+EXISTS\s+)?"?(\w+)"?\."?(\w+)"?', sql, re.IGNORECASE)
        create_match = re.match(r'CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?"?(\w+)"?\."?(\w+)"?', sql, re.IGNORECASE)
        write_match = re.match(r'(INSERT|UPDATE|DELETE|ALTER)\b', sql, re.IGNORECASE)
        result = engine.execute_sql(sql)

        # If DROP TABLE succeeded, clean up metadata and files
        if drop_match and result.get('ok'):
            database = drop_match.group(1)
            table = drop_match.group(2)
            try:
                existing_tables = table_manager.list(user_store)
                updated_tables = [t for t in existing_tables if not (t['database'] == database and t['table'] == table)]
                if len(updated_tables) < len(existing_tables):
                    tables_path = table_manager.tables_path(user_store)
                    write_json(tables_path, updated_tables)
                    user_store.upload_file(tables_path, "metadata/tables.json", f"Dropped table {database}.{table}")
                parquet_path = user_store.local("data", database, f"{table}.parquet")
                if parquet_path.exists():
                    parquet_path.unlink()
                try:
                    user_store.delete_file(f"data/{database}/{table}.parquet", f"Drop {database}.{table}")
                except Exception:
                    # Remote deletion is best-effort.
                    logger.debug(
                        "Remote delete of data/%s/%s.parquet failed",
                        database, table, exc_info=True,
                    )
                schema_path = user_store.local("metadata", "schemas", f"{database}.{table}.json")
                if schema_path.exists():
                    schema_path.unlink()
                engine._registered_tables.discard(f"{database}.{table}")
            except Exception as e:
                logger.warning("Failed to clean up dropped table %s.%s: %s", database, table, e)

        # If CREATE TABLE succeeded, register and persist
        if create_match and result.get('ok'):
            database = create_match.group(1)
            table = create_match.group(2)
            try:
                engine._register_table(database, table)
                _persist_table(engine, user_store, database, table)
            except Exception as e:
                logger.warning("Failed to persist created table %s.%s: %s", database, table, e)

        # If INSERT/UPDATE/DELETE/ALTER succeeded, persist affected tables
        if write_match and result.get('ok'):
            affected = query_parser.extract_tables(sql)
            for table_ref in affected:
                # Only schema-qualified references are persisted here.
                if '.' in table_ref:
                    db, tbl = table_ref.split('.', 1)
                    try:
                        _persist_table(engine, user_store, db, tbl)
                    except Exception as e:
                        logger.warning("Failed to persist %s.%s: %s", db, tbl, e)

        audit.log("sql_query", actor=user['username'], target="sql_engine", after={"sql": sql[:100], "score": score})
        return result
    finally:
        engine.close()
def _persist_table(engine, user_store, database, table):
    """Export a table to Parquet and register in metadata.

    Args:
        engine: Query engine whose ``conn`` holds the table.
        user_store: Store used for local paths and remote uploads.
        database: Schema name; sanitised before use.
        table: Table name; sanitised before use.

    The Parquet export itself may raise. The remote upload of the Parquet
    file is best-effort and only logged on failure. ``metadata/tables.json``
    is rewritten and uploaded only when the table was not registered yet.
    """
    from app.table_manager import table_manager
    from app.utils import write_json
    from app.query_parser import query_parser

    safe_db = query_parser.sanitize_identifier(database)
    safe_tbl = query_parser.sanitize_identifier(table)

    # Export to Parquet
    parquet_path = user_store.local("data", safe_db, f"{safe_tbl}.parquet")
    parquet_path.parent.mkdir(parents=True, exist_ok=True)
    # Double single quotes so the path is a valid SQL string literal.
    safe_path = str(parquet_path).replace("'", "''")
    engine.conn.execute(f"""
        COPY "{safe_db}"."{safe_tbl}" TO '{safe_path}' (FORMAT PARQUET)
    """)

    # Upload parquet file to HF
    try:
        user_store.upload_file(parquet_path, f"data/{safe_db}/{safe_tbl}.parquet", f"Persist {safe_db}.{safe_tbl}")
    except Exception:
        logger.warning("Upload of %s.%s parquet failed", safe_db, safe_tbl, exc_info=True)

    # Update metadata
    existing_tables = table_manager.list(user_store)
    if not any(t['database'] == safe_db and t['table'] == safe_tbl for t in existing_tables):
        try:
            cols = engine.conn.execute(f"""
                SELECT column_name, data_type FROM information_schema.columns
                WHERE table_schema = '{safe_db}' AND table_name = '{safe_tbl}'
                ORDER BY ordinal_position
            """).fetchall()
            existing_tables.append({
                "database": safe_db,
                "table": safe_tbl,
                "columns": [{"name": c[0], "type": c[1]} for c in cols]
            })
        except Exception:
            # Column lookup failed: register the table without column info.
            logger.warning("Column lookup for %s.%s failed", safe_db, safe_tbl, exc_info=True)
            existing_tables.append({"database": safe_db, "table": safe_tbl, "columns": []})
        tables_path = table_manager.tables_path(user_store)
        write_json(tables_path, existing_tables)
        user_store.upload_file(tables_path, "metadata/tables.json", f"SQL: registered {safe_db}.{safe_tbl}")
def _persist_all_tables(engine, user_store):
    """Persist all user tables to Parquet and update metadata.

    Every table listed in ``information_schema.tables`` outside the
    ``information_schema``, ``pg_catalog`` and ``system`` schemas is exported
    to a local Parquet file and uploaded. Tables missing from the metadata
    list are added to it, and ``metadata/tables.json`` is rewritten and
    uploaded once at the end.

    The function never raises: a failure for one table is logged and the loop
    continues; a failure outside the loop is logged and the function returns.
    """
    from app.table_manager import table_manager
    from app.utils import write_json
    from app.query_parser import query_parser

    try:
        all_tables = engine.conn.execute("""
            SELECT table_schema, table_name FROM information_schema.tables
            WHERE table_schema NOT IN ('information_schema', 'pg_catalog', 'system')
        """).fetchall()
        existing_tables = table_manager.list(user_store)

        for db, tbl in all_tables:
            try:
                safe_db = query_parser.sanitize_identifier(db)
                safe_tbl = query_parser.sanitize_identifier(tbl)
                parquet_path = user_store.local("data", safe_db, f"{safe_tbl}.parquet")
                parquet_path.parent.mkdir(parents=True, exist_ok=True)
                # Double single quotes so the path is a valid SQL string literal.
                safe_path = str(parquet_path).replace("'", "''")
                engine.conn.execute(f"""
                    COPY "{safe_db}"."{safe_tbl}" TO '{safe_path}' (FORMAT PARQUET)
                """)

                # Upload parquet file to HF
                try:
                    user_store.upload_file(parquet_path, f"data/{safe_db}/{safe_tbl}.parquet", f"Persist {safe_db}.{safe_tbl}")
                except Exception:
                    logger.warning("Upload of %s.%s parquet failed", safe_db, safe_tbl, exc_info=True)

                if not any(t['database'] == safe_db and t['table'] == safe_tbl for t in existing_tables):
                    try:
                        cols = engine.conn.execute(f"""
                            SELECT column_name, data_type FROM information_schema.columns
                            WHERE table_schema = '{safe_db}' AND table_name = '{safe_tbl}'
                            ORDER BY ordinal_position
                        """).fetchall()
                        existing_tables.append({
                            "database": safe_db,
                            "table": safe_tbl,
                            "columns": [{"name": c[0], "type": c[1]} for c in cols]
                        })
                    except Exception:
                        # Column lookup failed: register without column info.
                        logger.warning("Column lookup for %s.%s failed", safe_db, safe_tbl, exc_info=True)
                        existing_tables.append({"database": safe_db, "table": safe_tbl, "columns": []})
            except Exception:
                # One table failing must not stop the others.
                logger.warning("Persisting table %s.%s failed", db, tbl, exc_info=True)

        tables_path = table_manager.tables_path(user_store)
        write_json(tables_path, existing_tables)
        user_store.upload_file(tables_path, "metadata/tables.json", "SQL: persisted all tables")
    except Exception:
        logger.warning("Persisting all tables failed", exc_info=True)
@router.post("/sql/explain")
def explain_sql(request: Request, payload: SQLQueryRequest, user = Depends(require_auth)):
    """Get query execution plan (EXPLAIN).

    The submitted SQL is placed after an ``EXPLAIN`` keyword, so it is first
    put through the same checks ``/sql`` applies: it must be non-empty, must
    be a single statement, and must pass ``query_parser.validate_query``.

    Returns:
        ``{'ok': True, 'plan': [...], 'columns': [...]}`` on success, or
        ``{'ok': False, 'error': ...}`` when the query is rejected or fails.
    """
    from app.query_parser import query_parser

    sql = payload.sql.strip()
    if not sql:
        return {'ok': False, 'error': 'Empty query'}

    # Anything after the first statement would run as ordinary SQL rather
    # than being explained, so allow exactly one statement.
    statements = [s for s in query_parser.split_sql_statements(sql) if s.strip().rstrip(';').strip()]
    if len(statements) > 1:
        return {'ok': False, 'error': 'EXPLAIN accepts a single statement'}

    is_valid, error = query_parser.validate_query(sql, allow_write=True)
    if not is_valid:
        return {'ok': False, 'error': f"Query validation failed: {error}"}

    user_store = get_user_store(user)
    engine = create_query_engine(user_store)
    try:
        try:
            result = engine.conn.execute(f"EXPLAIN {sql}").fetchdf()
            return {'ok': True, 'plan': result.to_dict('records'), 'columns': list(result.columns)}
        except Exception as e:
            return {'ok': False, 'error': str(e)}
    finally:
        engine.close()
# ── DATABASE API KEY MANAGEMENT ──
@router.get("/databases/{database}/api-key")
def get_database_api_key(request: Request, database: str, user = Depends(require_auth)):
    """Return the API key of ``database``, creating a key pair if none exists.

    When a key already exists only ``api_key`` is returned. When a new pair
    is created the reply also carries ``secret_key`` and ``new: True``.
    """
    store = get_user_store(user)
    current_key = db_api_key_manager.get_db_api_key(store, database)
    if current_key:
        return {
            'ok': True,
            'database': database,
            'api_key': current_key
        }

    # No key yet: create a pair for this workspace.
    created = db_api_key_manager.create_db_api_key(store, database, user['workspace_id'])
    return {
        'ok': True,
        'database': database,
        'api_key': created['api_key'],
        'secret_key': created['secret_key'],
        'new': True
    }
@router.post("/databases/{database}/regenerate-api-key")
def regenerate_database_api_key(request: Request, database: str, user = Depends(require_auth)):
    """Issue a fresh API key and secret for ``database``.

    The reply contains both new values plus a reminder message.
    """
    store = get_user_store(user)
    fresh = db_api_key_manager.regenerate_db_api_key(store, user['workspace_id'], database)
    response = {
        'ok': True,
        'database': database,
        'api_key': fresh['api_key'],
        'secret_key': fresh['secret_key'],
        'message': 'IMPORTANT: Save your new secret key now. It will not be shown again!'
    }
    return response
@router.get("/databases/{database}/connection")
def get_database_connection(request: Request, database: str, user = Depends(require_auth)):
    """Return the API key of ``database`` together with usage hints.

    Responds with ``ok: False`` when the database has no API key.
    """
    store = get_user_store(user)
    key = db_api_key_manager.get_db_api_key(store, database)
    if not key:
        return {'ok': False, 'error': 'No API key found for this database'}

    # Only the key and usage examples are returned, not a full URL.
    usage_hints = {
        'description': 'Use this API key with CorpusDB SDK or HTTP client',
        'sdk_example': f'corpusdb.connect(api_key="{key}", database="{database}")',
        'http_example': 'Use /api/cdb/* endpoints with X-Api-Key and X-Secret-Key headers'
    }
    return {
        'ok': True,
        'database': database,
        'api_key': key,
        'usage': usage_hints
    }
# ── UTILITIES ──
@router.get("/stats")
def get_stats(request: Request, user = Depends(require_auth)):
    """Return summary statistics for the authenticated user's workspace.

    The reply contains database, table and row counts, per-database details,
    the number of unexpired sessions of the user, and the on-disk size of the
    workspace directory. Row counts are summed over at most the first 10
    tables of each database; a table whose stats cannot be read counts as 0.
    """
    user_store = get_user_store(user)
    tables = table_manager.list(user_store)
    dbs_raw = metadata.list_databases(user_store)
    db_list = dbs_raw if isinstance(dbs_raw, list) else dbs_raw.get('databases', [])

    total_rows = 0
    db_stats = []
    for db in db_list:
        db_name = db['name'] if isinstance(db, dict) else db
        db_tables = [t for t in tables if t['database'] == db_name]
        db_rows = 0
        for t in db_tables[:10]:
            try:
                engine = create_query_engine(user_store)
                try:
                    stats = engine.get_table_stats(db_name, t['table'])
                finally:
                    # Close the engine after each table instead of leaking it.
                    engine.close()
                if stats['ok'] and stats['data']:
                    db_rows += stats['data'][0].get('row_count', 0)
            except Exception:
                logger.debug("Row count for %s.%s unavailable", db_name, t.get('table'), exc_info=True)
        total_rows += db_rows
        db_stats.append({
            'name': db_name,
            'tables': len(db_tables),
            'rows': db_rows,
            'created': db.get('created_at', '') if isinstance(db, dict) else '',
        })

    # Active sessions for this user
    user_manager._init()
    conn = user_manager._get_conn()
    try:
        from datetime import datetime as _dt
        now = _dt.utcnow().isoformat()
        session_count = conn.execute(
            "SELECT COUNT(*) FROM sessions WHERE username = ? AND expires_at > ?",
            [user['username'], now]
        ).fetchone()[0]
    finally:
        # Close the connection even if the session query fails.
        conn.close()

    # Disk usage of workspace
    workspace_dir = user_store.local('.')
    workspace_size = sum(
        f.stat().st_size for f in workspace_dir.rglob('*') if f.is_file()
    ) if workspace_dir.exists() else 0

    return {
        "databases": len(db_list),
        "tables": len(tables),
        "rows": total_rows,
        "queries": 0,
        "workspace_id": user['workspace_id'],
        "active_sessions": session_count,
        "storage_bytes": workspace_size,
        "storage_mb": round(workspace_size / 1024 / 1024, 2),
        "db_stats": db_stats,
    }
@router.get("/audit")
def get_audit(request: Request, user = Depends(require_auth)):
    """Return the entries produced by ``audit.list()``.

    NOTE(review): the call takes no user or workspace argument, so the
    result does not look scoped to the caller - confirm this is intended.
    """
    entries = audit.list()
    return entries
@router.post("/backup")
def create_backup(request: Request, user = Depends(require_auth)):
    """Create a snapshot named ``<username>_backup`` and log the attempt.

    A ``DATA_EXPORT`` security event is recorded with severity ``INFO`` when
    the snapshot result is ok and ``ERROR`` otherwise. The snapshot result is
    returned unchanged.
    """
    store = get_user_store(user)
    backup_name = f"{user['username']}_backup"
    result = backup_manager.create_snapshot(backup_name, store)

    succeeded = result.get('ok')
    client_ip = request.client.host if request.client else 'unknown'
    security_logger.log_event(
        'DATA_EXPORT',
        username=user['username'],
        ip_address=client_ip,
        details={
            'type': 'backup',
            'name': backup_name,
            'status': 'success' if succeeded else 'failed',
        },
        severity='INFO' if succeeded else 'ERROR',
    )
    return result
@router.post("/import/preview")
async def import_preview(request: Request, file: UploadFile = File(...), user = Depends(require_auth)):
    """Preview an uploaded CSV file without importing it.

    The upload is written to a temporary file, handed to
    ``import_export.preview_csv``, and the temporary file is removed again
    before the response is returned.
    """
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            tmp_path = Path(tmp.name)
            tmp.write(await file.read())
        # The file is closed (and flushed) before it is read back.
        return import_export.preview_csv(tmp_path)
    finally:
        # delete=False leaves the file behind, so remove it explicitly.
        if tmp_path is not None:
            try:
                tmp_path.unlink()
            except OSError:
                logger.debug("Could not remove temporary upload %s", tmp_path, exc_info=True)
@router.post("/import/commit")
async def import_commit(request: Request, database: str = Form(...), table: str = Form(...), file: UploadFile = File(...), user = Depends(require_auth)):
    """Import an uploaded CSV file into ``database.table``.

    The upload is written to a temporary file, handed to
    ``import_export.commit_csv``, and the temporary file is removed again
    before the response is returned.
    """
    user_store = get_user_store(user)
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            tmp_path = Path(tmp.name)
            tmp.write(await file.read())
        # The file is closed (and flushed) before it is read back.
        return import_export.commit_csv(database, table, tmp_path, user_store)
    finally:
        # delete=False leaves the file behind, so remove it explicitly.
        # NOTE(review): assumes commit_csv is done with the file once it
        # returns - confirm it keeps no reference to the path.
        if tmp_path is not None:
            try:
                tmp_path.unlink()
            except OSError:
                logger.debug("Could not remove temporary upload %s", tmp_path, exc_info=True)
@router.post("/ai/query")
def ai_query(request: Request, payload: dict, user = Depends(require_auth)):
    """Answer a free-text question with simple keyword rules.

    * A question containing "table" gets the names of up to 10 tables.
    * Otherwise, a question containing "select" or "query" whose text starts
      with ``SELECT`` is executed as SQL and up to 10 rows are returned.
    * Anything else gets a generic help message.

    Returns:
        dict with ``response`` and ``sql``, plus ``data`` for a successful
        query.
    """
    user_store = get_user_store(user)

    # A missing, null or non-string question is treated as text rather than
    # crashing on .lower().
    question = payload.get("question") or ""
    if not isinstance(question, str):
        question = str(question)
    lowered = question.lower()

    if "table" in lowered:
        tables = table_manager.list(user_store)
        table_names = [f"{t['database']}.{t['table']}" for t in tables[:10]]
        return {
            "response": f"Your tables: {', '.join(table_names)}",
            "sql": None
        }
    elif "select" in lowered or "query" in lowered:
        # Try to extract and execute SQL
        sql = question.strip()
        if sql.upper().startswith('SELECT'):
            engine = create_query_engine(user_store)
            try:
                result = engine.execute_sql(sql)
            finally:
                # Close the engine even if execution raises.
                engine.close()
            if result['ok']:
                return {
                    "response": f"Query executed successfully. Found {result['rows']} rows.",
                    "sql": sql,
                    "data": result['data'][:10]  # Return first 10 rows
                }
            else:
                return {
                    "response": f"Query error: {result['error']}",
                    "sql": sql
                }

    return {
        "response": "I can help with SQL queries, table info, and data analysis. Try asking about your tables or write a SELECT query!",
        "sql": None
    }