from fastapi import APIRouter, UploadFile, File, Form, Depends, Response, Cookie, Request, WebSocket, WebSocketDisconnect, Header, HTTPException from fastapi.responses import JSONResponse from pydantic import BaseModel from app.metadata import metadata import logging logger = logging.getLogger('corpusdb.api') from app.table_manager import table_manager from app.row_manager import row_manager from app.schema_manager import schema_manager from app.import_export import import_export from app.relation_manager import relation_manager from app.privilege_manager import privilege_manager from app.audit_manager import audit from app.backup_manager import backup_manager from app.sample_data import init_sample_data from app.user_manager import user_manager from app.auth import get_current_user, require_auth, get_user_store from app.admin import require_admin from app.broadcaster import broadcaster from app.query_engine import create_query_engine from app.corpusdb_api_manager import corpusdb_api_manager from app.db_api_key_manager import db_api_key_manager from app.database_analytics import database_analytics from app.data_visualizer import data_visualizer from app.database_templates import database_templates from app.advanced_import_export import advanced_import_export from app.migration_manager import migration_manager from app.query_builder import query_builder from app.data_validator import data_validator from app.job_scheduler import job_scheduler from app.bookmark_manager import bookmark_manager from app.global_search import global_search from app.table_maintenance import table_maintenance from app.blob_handler import blob_handler from app.views_manager import views_manager from app.privilege_manager_v2 import privilege_manager_v2 from app.stored_procedure_manager import stored_procedure_manager from app.triggers_manager import triggers_manager from app.process_manager import process_manager from app.workspace_share_manager import workspace_share_manager from app.query_complexity import query_complexity_analyzer from app.project_manager import project_manager from app.visual_query_builder import visual_query_builder from app.data_visualization_engine import data_visualization_engine from app.api_doc_generator import api_doc_generator from app.database_template_manager import database_template_manager from app.performance_monitor import performance_monitor from app.ai_assistant import ai_assistant from app.configuration_analyzer import configuration_analyzer from app.security_logger import security_logger import tempfile from pathlib import Path import json import re router = APIRouter() class LoginRequest(BaseModel): username: str password: str class SignupRequest(BaseModel): username: str email: str password: str full_name: str = '' class CreateDatabaseRequest(BaseModel): name: str class CreateTableRequest(BaseModel): database: str table: str columns: list class SQLQueryRequest(BaseModel): sql: str class UserSettingsRequest(BaseModel): email: str = None full_name: str = None hf_token: str = None hf_repo: str = None # ── AUTHENTICATION ── @router.post("/auth/signup") def signup(payload: SignupRequest, response: Response, request: Request): from app.rate_limiter import rate_limiter from app.input_validator import input_validator from app.security_logger import security_logger client_ip = request.client.host if request.client else 'unknown' logger.info(f"SIGNUP REQUEST: username='{payload.username}', email='{payload.email}', ip={client_ip}") try: # Rate limiting try: rate_limiter.check_rate_limit(request, limit_type='auth') except HTTPException as e: logger.warning(f"SIGNUP RATE LIMITED: username='{payload.username}', ip={client_ip}") security_logger.log_rate_limit_exceeded( payload.username, client_ip, 'auth' ) return JSONResponse( status_code=429, content={'ok': False, 'error': 'Too many requests. Please try again later.'} ) # Validate inputs is_valid, error = input_validator.validate_username(payload.username) if not is_valid: logger.warning(f"SIGNUP VALIDATION FAILED (username): {error}, username='{payload.username}'") return {'ok': False, 'error': error} is_valid, error = input_validator.validate_email(payload.email) if not is_valid: logger.warning(f"SIGNUP VALIDATION FAILED (email): {error}, email='{payload.email}'") return {'ok': False, 'error': error} is_valid, error = input_validator.validate_password(payload.password) if not is_valid: logger.warning(f"SIGNUP VALIDATION FAILED (password): {error}") return {'ok': False, 'error': error} # Sanitize full name full_name = input_validator.sanitize_string(payload.full_name, max_length=100) result = user_manager.create_user( username=payload.username, email=payload.email, password=payload.password, full_name=full_name ) if result['ok']: logger.info(f"SIGNUP SUCCESS: username='{payload.username}', email='{payload.email}', ip={client_ip}") security_logger.log_event( 'SIGNUP', payload.username, client_ip ) # Auto-login after signup auth_result = user_manager.authenticate(payload.username, payload.password) if auth_result['ok']: logger.info(f"SIGNUP AUTO-LOGIN SUCCESS: username='{payload.username}', token={auth_result['token'][:12]}..., ip={client_ip}") response.set_cookie( key="session_token", value=auth_result['token'], httponly=True, max_age=7*24*60*60, samesite="none", secure=True ) return auth_result else: logger.warning(f"SIGNUP AUTO-LOGIN FAILED: username='{payload.username}', reason='{auth_result.get('error')}'") return result except Exception as e: logger.error(f"SIGNUP ERROR: username='{payload.username}', exception={e}", exc_info=True) import traceback traceback.print_exc() return JSONResponse( status_code=500, content={'ok': False, 'error': f'Server error: {str(e)}'} ) @router.post("/auth/register") def register(payload: SignupRequest, response: Response, request: Request): """Alias for /auth/signup for backward compatibility""" return signup(payload, response, request) @router.get("/auth/admin-totp-setup") def admin_totp_setup(): """Get TOTP secret and QR code URL for admin setup""" import os admin_secret = os.getenv('ADMIN_TOTP_SECRET') if not admin_secret: return {'ok': False, 'error': 'ADMIN_TOTP_SECRET not configured'} # Generate QR code URL qr_data = f"otpauth://totp/CorpusDB:admin@corpusdb?secret={admin_secret}&issuer=CorpusDB" qr_url = f"https://api.qrserver.com/v1/create-qr-code/?size=200x200&data={qr_data}" return {'ok': True, 'secret': admin_secret, 'qr_url': qr_url} @router.post("/auth/admin-totp") def admin_totp_login(payload: dict, response: Response, request: Request): """Admin login using only TOTP code""" from app.rate_limiter import rate_limiter from app.security_logger import security_logger import os import base64 import hashlib import hmac import struct import time def verify_totp(secret, token, valid_window=1): """Verify TOTP token""" try: # Decode base32 secret secret = secret.upper().replace(' ', '') # Add padding if needed missing_padding = len(secret) % 8 if missing_padding: secret += '=' * (8 - missing_padding) key = base64.b32decode(secret) # Get current time counter current_time = int(time.time() / 30) # Check token against current and nearby time windows for i in range(-valid_window, valid_window + 1): time_counter = current_time + i # Generate TOTP msg = struct.pack('>Q', time_counter) hmac_hash = hmac.new(key, msg, hashlib.sha1).digest() offset = hmac_hash[-1] & 0x0f code = struct.unpack('>I', hmac_hash[offset:offset+4])[0] & 0x7fffffff code = code % 1000000 if str(code).zfill(6) == str(token).zfill(6): return True return False except Exception: return False try: # Rate limiting try: rate_limiter.check_rate_limit(request, limit_type='auth') except HTTPException as e: security_logger.log_rate_limit_exceeded( 'admin', request.client.host if request.client else 'unknown', 'auth' ) return JSONResponse( status_code=429, content={'ok': False, 'error': 'Too many requests. Please try again later.'} ) totp_code = payload.get('totp_code') if not totp_code: return {'ok': False, 'error': 'totp_code required'} # Admin TOTP secret - must be set via environment variable admin_secret = os.getenv('ADMIN_TOTP_SECRET') if not admin_secret: return {'ok': False, 'error': 'Admin TOTP not configured. Set ADMIN_TOTP_SECRET environment variable.'} if not verify_totp(admin_secret, totp_code, valid_window=1): security_logger.log_auth_failure( 'admin', request.client.host if request.client else 'unknown', 'Invalid TOTP code' ) return {'ok': False, 'error': 'Invalid TOTP code'} # TOTP verified - get or create admin user admin_user = user_manager.get_user('admin') if not admin_user: # Create admin user result = user_manager.create_user( username='admin', email='admin@corpusdb.local', password='admin_' + os.urandom(16).hex(), # Random password full_name='Administrator' ) if not result['ok']: return {'ok': False, 'error': 'Failed to create admin user'} admin_user = user_manager.get_user('admin') # Generate session token import secrets token = secrets.token_urlsafe(32) # Create session in database from datetime import datetime, timedelta conn = user_manager._get_conn() now = datetime.utcnow() expires = now + timedelta(days=7) conn.execute(""" INSERT INTO sessions (token, username, workspace_id, created_at, expires_at) VALUES (?, ?, ?, ?, ?) """, [token, admin_user['username'], admin_user['workspace_id'], now.isoformat(), expires.isoformat()]) conn.close() security_logger.log_auth_success( 'admin', request.client.host if request.client else 'unknown' ) response.set_cookie( key="session_token", value=token, httponly=True, max_age=7*24*60*60, samesite="none", secure=True ) return { 'ok': True, 'token': token, 'user': { 'username': admin_user['username'], 'email': admin_user['email'], 'role': 'admin', 'workspace_id': admin_user['workspace_id'] } } except Exception as e: import traceback traceback.print_exc() return JSONResponse( status_code=500, content={'ok': False, 'error': f'Server error: {str(e)}'} ) @router.post("/auth/login") def login(payload: LoginRequest, response: Response, request: Request): from app.rate_limiter import rate_limiter from app.security_logger import security_logger client_ip = request.client.host if request.client else 'unknown' logger.info(f"LOGIN REQUEST: username='{payload.username}', ip={client_ip}") try: # Rate limiting try: rate_limiter.check_rate_limit(request, limit_type='auth') except HTTPException as e: logger.warning(f"LOGIN RATE LIMITED: username='{payload.username}', ip={client_ip}") security_logger.log_rate_limit_exceeded( payload.username, client_ip, 'auth' ) return JSONResponse( status_code=429, content={'ok': False, 'error': 'Too many requests. Please try again later.'} ) # Check for brute force if security_logger.detect_brute_force(payload.username): logger.warning(f"LOGIN BLOCKED (brute force): username='{payload.username}', ip={client_ip}") security_logger.log_auth_blocked( payload.username, client_ip ) return {'ok': False, 'error': 'Account temporarily locked due to too many failed attempts'} # Authenticate result = user_manager.authenticate(payload.username, payload.password) if result['ok']: logger.info(f"LOGIN SUCCESS: username='{payload.username}', token={result['token'][:12]}..., ip={client_ip}") security_logger.log_auth_success( payload.username, client_ip ) response.set_cookie( key="session_token", value=result['token'], httponly=True, max_age=7*24*60*60, samesite="none", secure=True ) return result else: logger.warning(f"LOGIN FAILED: username='{payload.username}', reason='{result.get('error')}', ip={client_ip}") security_logger.log_auth_failure( payload.username, client_ip, result.get('error') ) return result except Exception as e: logger.error(f"LOGIN ERROR: username='{payload.username}', exception={e}", exc_info=True) import traceback traceback.print_exc() return JSONResponse( status_code=500, content={'ok': False, 'error': f'Server error: {str(e)}'} ) @router.post("/auth/logout") def logout(response: Response, session_token: str = Cookie(default=None)): if session_token: user_manager.logout(session_token) response.delete_cookie("session_token") return {"ok": True} @router.get("/auth/me") def get_me(request: Request, user = Depends(get_current_user)): if not user: return {"authenticated": False} return {"authenticated": True, "user": user} @router.get("/admin/users") def list_all_users(request: Request, user = Depends(require_admin)): """Debug endpoint to list all users""" users = user_manager.list_users() return {"count": len(users), "users": [{"username": u['username'], "workspace_id": u['workspace_id']} for u in users]} @router.get("/admin/all-databases") def list_all_databases(request: Request, user = Depends(require_admin)): """Admin endpoint to list ALL databases from ALL users""" from pathlib import Path from app.config import settings all_databases = [] users = user_manager.list_users() for u in users: try: u_store = get_user_store(u) result = metadata.list_databases(u_store) if result and isinstance(result, dict) and 'databases' in result: for db in result['databases']: db['owner'] = u['username'] db['workspace_id'] = u['workspace_id'] all_databases.append(db) except Exception: continue return {'databases': all_databases} @router.get("/admin/all-tables") def list_all_tables(request: Request, user = Depends(require_admin)): """Admin endpoint to list ALL tables from ALL users""" all_tables = [] users = user_manager.list_users() for u in users: try: u_store = get_user_store(u) tables = table_manager.list(u_store) if tables and isinstance(tables, list): for table in tables: table['owner'] = u['username'] table['workspace_id'] = u['workspace_id'] all_tables.append(table) except Exception: continue return all_tables # ── SITE MODE CONTROLS ── _site_modes = {'maintenance': False, 'coming_soon': False} @router.get("/admin/site-mode") def get_site_mode(request: Request, user = Depends(require_admin)): """Get current site mode settings""" return { 'ok': True, 'maintenance': _site_modes['maintenance'], 'coming_soon': _site_modes['coming_soon'] } @router.post("/admin/site-mode") def set_site_mode(request: Request, payload: dict, user = Depends(require_admin)): """Set site mode (maintenance/coming_soon)""" global _site_modes if 'maintenance' in payload: _site_modes['maintenance'] = bool(payload['maintenance']) if 'coming_soon' in payload: _site_modes['coming_soon'] = bool(payload['coming_soon']) return { 'ok': True, 'maintenance': _site_modes['maintenance'], 'coming_soon': _site_modes['coming_soon'], 'message': 'Site mode updated' } @router.get("/admin/system-info") def system_info(request: Request, user = Depends(require_admin)): """Get system information""" import platform import os try: import psutil mem = psutil.virtual_memory() disk = psutil.disk_usage("/") has_psutil = True except ImportError: has_psutil = False users = user_manager.list_users() total_storage = 0 total_tables = 0 for u in users: try: u_store = get_user_store(u) data_path = u_store.local("data") if data_path.exists(): # Use os.scandir for faster filesystem traversal for entry in os.scandir(data_path): if entry.is_file(): total_storage += entry.stat().st_size elif entry.is_dir(): for sub in os.scandir(entry.path): if sub.is_file(): total_storage += sub.stat().st_size tables = metadata.list_tables(u_store) total_tables += len(tables) if isinstance(tables, list) else 0 except Exception: pass result = { 'ok': True, 'python_version': platform.python_version(), 'platform': platform.system(), 'total_users': len(users), 'total_tables': total_tables, 'total_storage_bytes': total_storage, 'total_storage_human': _format_bytes(total_storage), } if has_psutil: result['memory_percent'] = mem.percent result['memory_total'] = mem.total result['memory_available'] = mem.available result['disk_total'] = disk.total result['disk_used'] = disk.used result['disk_free'] = disk.free result['disk_percent'] = disk.percent else: result['note'] = 'Install psutil for memory/disk stats: pip install psutil' return result def _format_bytes(b): for unit in ['B', 'KB', 'MB', 'GB', 'TB']: if b < 1024: return f"{b:.1f} {unit}" b /= 1024 return f"{b:.1f} PB" @router.post("/admin/test-password") def test_password(request: Request, payload: dict, user = Depends(require_admin)): """Debug endpoint to test password verification""" username = payload.get('username') password = payload.get('password') if not username or not password: return {'ok': False, 'error': 'username and password required'} user = user_manager.get_user(username) if not user: return {'ok': False, 'error': 'User not found'} is_valid = user_manager.verify_password(password, user['password']) return { 'ok': True, 'username': username, 'password_valid': is_valid, 'stored_hash': user['password'][:30] + '...', 'is_active': user.get('is_active', True) } # ── USER MANAGEMENT (authenticated) ── class CreateUserRequest(BaseModel): username: str email: str password: str full_name: str = '' department: str = '' phone: str = '' role: str = 'viewer' class UpdateUserRequest(BaseModel): email: str = None full_name: str = None department: str = None phone: str = None role: str = None is_active: bool = None password: str = None @router.get("/users") def list_users(request: Request, user = Depends(require_auth)): """List all users (admin-only in production, open for now)""" users = user_manager.list_users() safe = [] for u in users: safe.append({ 'username': u.get('username'), 'email': u.get('email'), 'full_name': u.get('full_name') or '', 'role': u.get('role', 'viewer'), 'department': u.get('department') or '', 'phone': u.get('phone') or '', 'last_login': str(u.get('last_login')) if u.get('last_login') else None, 'is_active': bool(u.get('is_active', True)), 'workspace_id': u.get('workspace_id'), 'created_at': str(u.get('created_at')) if u.get('created_at') else None, }) return safe @router.get("/users/{username}") def get_user(request: Request, username: str, user = Depends(require_auth)): """Get specific user info""" u = user_manager.get_user(username) if not u: raise HTTPException(status_code=404, detail="User not found") return { 'username': u.get('username'), 'email': u.get('email'), 'full_name': u.get('full_name') or '', 'role': u.get('role', 'viewer'), 'is_active': bool(u.get('is_active', True)), 'workspace_id': u.get('workspace_id'), } @router.post("/users") def create_user_admin(request: Request, payload: CreateUserRequest, user = Depends(require_auth)): """Create a new user (admin action)""" from app.input_validator import input_validator is_valid, error = input_validator.validate_username(payload.username) if not is_valid: return {'ok': False, 'error': error} is_valid, error = input_validator.validate_email(payload.email) if not is_valid: return {'ok': False, 'error': error} is_valid, error = input_validator.validate_password(payload.password) if not is_valid: return {'ok': False, 'error': error} return user_manager.create_user( username=payload.username, email=payload.email, password=payload.password, full_name=payload.full_name ) @router.put("/users/{username}") def update_user_admin(request: Request, username: str, payload: UpdateUserRequest, user = Depends(require_auth)): """Update user details""" updates = {k: v for k, v in payload.model_dump().items() if v is not None} return user_manager.update_user(username, updates) @router.delete("/users/{username}") def delete_user_admin(request: Request, username: str, user = Depends(require_auth)): """Delete user""" if username == user.get('username'): return {'ok': False, 'error': 'Cannot delete yourself'} return user_manager.delete_user(username) @router.post("/users/{username}/regenerate-api-key") def regen_user_api_key(request: Request, username: str, user = Depends(require_auth)): """Regenerate user API key""" return user_manager.regenerate_api_key(username) # ── USER SETTINGS ── @router.put("/user/settings") def update_settings(request: Request, payload: UserSettingsRequest, user = Depends(require_auth)): updates = {k: v for k, v in payload.model_dump().items() if v is not None} return user_manager.update_user(user['username'], updates) @router.put("/user/profile") def update_profile(request: Request, payload: dict, user = Depends(require_auth)): """Update user profile (email, full_name)""" allowed = ['email', 'full_name'] updates = {k: v for k, v in payload.items() if k in allowed and v is not None} if not updates: return {'ok': False, 'error': 'No valid fields to update'} return user_manager.update_user(user['username'], updates) @router.put("/user/password") def change_password(request: Request, payload: dict, user = Depends(require_auth)): """Change user password""" current_password = payload.get('current_password', '') new_password = payload.get('new_password', '') if not current_password or not new_password: return {'ok': False, 'error': 'Current and new password required'} if len(new_password) < 8: return {'ok': False, 'error': 'Password must be at least 8 characters'} user_data = user_manager.get_user(user['username']) if not user_data: return {'ok': False, 'error': 'User not found'} if not user_manager.verify_password(current_password, user_data.get('password', '')): return {'ok': False, 'error': 'Current password is incorrect'} result = user_manager.update_user(user['username'], { 'password': new_password }) if result.get('ok'): return {'ok': True, 'message': 'Password changed successfully'} return result @router.post("/user/regenerate-key") def regenerate_key(request: Request, user = Depends(require_auth)): return user_manager.regenerate_api_key(user['username']) # ── CORPUSDB API KEY MANAGEMENT ── @router.post("/corpusdb/generate-api-key") def generate_corpusdb_api_key(request: Request, user = Depends(require_auth)): """Generate new CorpusDB API key with format cDb_xxx and secret xxx-xxx""" result = corpusdb_api_manager.generate_api_key( user_id=user['user_id'], username=user['username'], workspace_id=user['workspace_id'] ) return { 'ok': True, 'api_key': result['api_key'], 'secret_key': result['secret_key'], 'created_at': result['created_at'], 'message': 'IMPORTANT: Save your secret key now. It will not be shown again!' } @router.get("/corpusdb/my-keys") def get_my_corpusdb_keys(request: Request, user = Depends(require_auth)): """Get all CorpusDB API keys for current user""" keys = corpusdb_api_manager.get_user_keys(user['user_id']) return {'ok': True, 'keys': keys} @router.post("/corpusdb/revoke-key") def revoke_corpusdb_key(request: Request, payload: dict, user = Depends(require_auth)): """Revoke a CorpusDB API key""" api_key = payload.get('api_key') if not api_key: return {'ok': False, 'error': 'api_key required'} # Verify key belongs to user keys = corpusdb_api_manager.get_user_keys(user['user_id']) if not any(k['api_key'] == api_key for k in keys): return {'ok': False, 'error': 'Key not found or does not belong to you'} success = corpusdb_api_manager.revoke_key(api_key) return {'ok': success} @router.post("/corpusdb/delete-key") def delete_corpusdb_key(request: Request, payload: dict, user = Depends(require_auth)): """Permanently delete a CorpusDB API key""" api_key = payload.get('api_key') if not api_key: return {'ok': False, 'error': 'api_key required'} # Verify key belongs to user keys = corpusdb_api_manager.get_user_keys(user['user_id']) if not any(k['api_key'] == api_key for k in keys): return {'ok': False, 'error': 'Key not found or does not belong to you'} success = corpusdb_api_manager.delete_key(api_key) return {'ok': success} @router.post("/corpusdb/regenerate-secret") def regenerate_corpusdb_secret(request: Request, payload: dict, user = Depends(require_auth)): """Regenerate secret key for existing API key""" api_key = payload.get('api_key') if not api_key: return {'ok': False, 'error': 'api_key required'} # Verify key belongs to user keys = corpusdb_api_manager.get_user_keys(user['user_id']) if not any(k['api_key'] == api_key for k in keys): return {'ok': False, 'error': 'Key not found or does not belong to you'} new_secret = corpusdb_api_manager.regenerate_secret(api_key) if new_secret: return { 'ok': True, 'secret_key': new_secret, 'message': 'IMPORTANT: Save your new secret key now. It will not be shown again!' } return {'ok': False, 'error': 'Failed to regenerate secret'} # ── CORPUSDB API AUTHENTICATION (for external apps) ── def verify_corpusdb_api_key(x_api_key: str = Header(None), x_secret_key: str = Header(None)): """Verify API key — routes to user key (cDb_) or database key (CDB-) based on prefix""" if not x_api_key or not x_secret_key: raise HTTPException(status_code=401, detail="API key and secret required") if x_api_key.startswith('cDb_'): # User key — full access to all databases user_info = corpusdb_api_manager.verify_api_key(x_api_key, x_secret_key) if not user_info: raise HTTPException(status_code=401, detail="Invalid API key or secret") user = user_manager.get_user_by_id(user_info['user_id']) if not user: raise HTTPException(status_code=401, detail="User not found") user_store = get_user_store(user) return {'user': user, 'user_store': user_store, 'locked_database': None} elif x_api_key.startswith('CDB-'): # Database key - locked to one database # workspace_id is stored in the key record, use it to find the owner directly all_users = user_manager.list_users() for u in all_users: try: u_store = get_user_store(u) result = db_api_key_manager.verify_db_api_key(u_store, x_api_key, x_secret_key) if result: workspace_id = result.get('workspace_id') if workspace_id: owner = user_manager.get_user_by_workspace_id(workspace_id) if owner: return {'user': owner, 'user_store': get_user_store(owner), 'locked_database': result['database']} return {'user': u, 'user_store': u_store, 'locked_database': result['database']} except Exception: continue raise HTTPException(status_code=401, detail="Invalid database API key or secret") else: raise HTTPException(status_code=401, detail="Invalid API key format") def _check_db_access(auth: dict, database: str): """Raise 403 if a DB key tries to access a database it is not locked to""" locked = auth.get('locked_database') if locked and locked != database: raise HTTPException(status_code=403, detail=f"This API key is locked to database '{locked}'") @router.get("/cdb/databases") def corpusdb_list_databases(auth = Depends(verify_corpusdb_api_key)): """List databases using CorpusDB API key""" locked = auth.get('locked_database') if locked: return {'ok': True, 'databases': [{'name': locked}]} return metadata.list_databases(auth['user_store']) @router.get("/cdb/tables") def corpusdb_list_tables(auth = Depends(verify_corpusdb_api_key)): """List tables using CorpusDB API key""" locked = auth.get('locked_database') if locked: return table_manager.list(auth['user_store'], database=locked) return table_manager.list(auth['user_store']) @router.post("/cdb/query") def corpusdb_query(payload: SQLQueryRequest, request: Request, auth = Depends(verify_corpusdb_api_key)): """Execute SQL query using CorpusDB API key""" from app.query_parser import query_parser from app.rate_limiter import rate_limiter rate_limiter.check_rate_limit(request, limit_type='api') is_valid, error = query_parser.validate_query(payload.sql, allow_write=True) if not is_valid: return {'ok': False, 'error': error} is_allowed, reason, score = query_complexity_analyzer.analyze_complexity(payload.sql) if not is_allowed: return {'ok': False, 'error': reason} engine = create_query_engine(auth['user_store']) try: return engine.execute_sql(payload.sql) finally: engine.close() @router.get("/cdb/rows/{database}/{table}") def corpusdb_get_rows(database: str, table: str, limit: int = 100, offset: int = 0, auth = Depends(verify_corpusdb_api_key)): """Get rows using CorpusDB API key""" _check_db_access(auth, database) return row_manager.read(database, table, auth['user_store'], limit=limit, offset=offset) @router.post("/cdb/rows/{database}/{table}") def corpusdb_insert_row(database: str, table: str, payload: dict, auth = Depends(verify_corpusdb_api_key)): """Insert row using CorpusDB API key""" _check_db_access(auth, database) result = row_manager.insert(database, table, payload, auth['user']['username'], auth['user_store'], auth['user']['workspace_id']) if result.get('ok'): try: engine = create_query_engine(auth['user_store']) try: _persist_all_tables(engine, auth['user_store']) finally: engine.close() except Exception: pass return result # -- DATABASE ANALYTICS -- @router.get("/analytics/table/{database}/{table}") def get_table_analytics(request: Request, database: str, table: str, user = Depends(require_auth)): """Get detailed analytics for a table""" user_store = get_user_store(user) return database_analytics.get_table_stats(user_store, database, table) @router.get("/analytics/database/{database}") def get_database_analytics(request: Request, database: str, user = Depends(require_auth)): """Get summary analytics for entire database""" user_store = get_user_store(user) return database_analytics.get_database_summary(user_store, database) @router.get("/analytics/suggestions/{database}/{table}") def get_query_suggestions(request: Request, database: str, table: str, user = Depends(require_auth)): """Get helpful query suggestions""" user_store = get_user_store(user) suggestions = database_analytics.get_query_suggestions(user_store, database, table) return {'ok': True, 'suggestions': suggestions} # -- DATA VISUALIZATION -- @router.post("/visualize/chart") def create_chart_data(request: Request, payload: dict, user = Depends(require_auth)): """Prepare data for chart visualization""" data = payload.get('data', []) x_column = payload.get('x_column') y_column = payload.get('y_column') chart_type = payload.get('chart_type', 'bar') return data_visualizer.prepare_chart_data(data, x_column, y_column, chart_type) @router.post("/visualize/analyze-column") def analyze_column(request: Request, payload: dict, user = Depends(require_auth)): """Analyze a column and return statistics""" data = payload.get('data', []) column = payload.get('column') return data_visualizer.analyze_column(data, column) @router.post("/visualize/distribution") def get_distribution(request: Request, payload: dict, user = Depends(require_auth)): """Get distribution of values for histogram""" data = payload.get('data', []) column = payload.get('column') bins = payload.get('bins', 10) return data_visualizer.get_distribution(data, column, bins) # -- DATABASE TEMPLATES -- @router.get("/templates") def list_templates(request: Request, user = Depends(require_auth)): """List all available database templates""" templates = database_templates.list_templates() return {'ok': True, 'templates': templates} @router.get("/templates/{template_id}") def get_template(request: Request, template_id: str, user = Depends(require_auth)): """Get template details""" return database_templates.get_template(template_id) @router.post("/templates/{template_id}/create") def create_from_template(request: Request, template_id: str, payload: dict, user = Depends(require_auth)): """Create database from template""" database_name = payload.get('database_name') if not database_name: return {'ok': False, 'error': 'database_name required'} user_store = get_user_store(user) return database_templates.create_from_template( template_id, database_name, user_store, user['username'], user['workspace_id'] ) # -- ADVANCED IMPORT/EXPORT -- @router.post("/export/database/sql") def export_database_sql(request: Request, payload: dict, user = Depends(require_auth)): """Export entire database to SQL file (phpMyAdmin style)""" database = payload.get('database') include_structure = payload.get('include_structure', True) include_data = payload.get('include_data', True) add_drop_table = payload.get('add_drop_table', True) if not database: return {'ok': False, 'error': 'database required'} # Get all tables in database user_store = get_user_store(user) all_tables = table_manager.list(user_store) db_tables = [t for t in all_tables if t['database'] == database] if not db_tables: return {'ok': False, 'error': 'No tables found in database'} # Fetch data for each table tables_data = {} for table_info in db_tables: table_name = table_info['table'] rows_result = row_manager.read(database, table_name, user_store, limit=100000) if rows_result: tables_data[table_name] = rows_result else: tables_data[table_name] = [] # Generate SQL dump sql = advanced_import_export.export_database_to_sql( database, tables_data, include_structure, include_data, add_drop_table ) return { 'ok': True, 'sql': sql, 'format': 'sql', 'database': database, 'tables': len(tables_data), 'filename': f"{database}_export.sql" } @router.post("/export/database/json") def export_database_json(request: Request, payload: dict, user = Depends(require_auth)): """Export entire database to JSON format""" database = payload.get('database') pretty = payload.get('pretty', True) if not database: return {'ok': False, 'error': 'database required'} user_store = get_user_store(user) all_tables = table_manager.list(user_store) db_tables = [t for t in all_tables if t['database'] == database] tables_data = {} for table_info in db_tables: table_name = table_info['table'] rows_result = row_manager.read(database, table_name, user_store, limit=100000) tables_data[table_name] = rows_result if rows_result else [] json_data = advanced_import_export.export_database_to_json(database, tables_data, pretty) return { 'ok': True, 'json': json_data, 'format': 'json', 'database': database, 'tables': len(tables_data), 'filename': f"{database}_export.json" } @router.post("/export/database/csv") def export_database_csv(request: Request, payload: dict, user = Depends(require_auth)): """Export entire database to CSV (one file per table)""" database = payload.get('database') if not database: return {'ok': False, 'error': 'database required'} user_store = get_user_store(user) all_tables = table_manager.list(user_store) db_tables = [t for t in all_tables if t['database'] == database] tables_data = {} for table_info in db_tables: table_name = table_info['table'] rows_result = row_manager.read(database, table_name, user_store, limit=100000) tables_data[table_name] = rows_result if rows_result else [] csv_files = advanced_import_export.export_database_to_csv(tables_data) return { 'ok': True, 'csv_files': csv_files, 'format': 'csv', 'database': database, 'tables': len(csv_files) } @router.post("/export/database/xml") def export_database_xml(request: Request, payload: dict, user = Depends(require_auth)): """Export entire database to XML format""" database = payload.get('database') if not database: return {'ok': False, 'error': 'database required'} user_store = get_user_store(user) all_tables = table_manager.list(user_store) db_tables = [t for t in all_tables if t['database'] == database] tables_data = {} for table_info in db_tables: table_name = table_info['table'] rows_result = row_manager.read(database, table_name, user_store, limit=100000) tables_data[table_name] = rows_result if rows_result else [] xml = advanced_import_export.export_database_to_xml(database, tables_data) return { 'ok': True, 'xml': xml, 'format': 'xml', 'database': database, 'tables': len(tables_data), 'filename': f"{database}_export.xml" } @router.post("/export/sql") def export_sql(request: Request, payload: dict, user = Depends(require_auth)): """Export data to SQL format""" database = payload.get('database') table = payload.get('table') data = payload.get('data', []) sql = advanced_import_export.export_to_sql(database, table, data) return {'ok': True, 'sql': sql, 'format': 'sql'} @router.post("/export/csv") def export_csv(request: Request, payload: dict, user = Depends(require_auth)): """Export data to CSV format""" data = payload.get('data', []) csv = advanced_import_export.export_to_csv(data) return {'ok': True, 'csv': csv, 'format': 'csv'} @router.post("/export/json") def export_json_data(request: Request, payload: dict, user = Depends(require_auth)): """Export data to JSON format""" data = payload.get('data', []) pretty = payload.get('pretty', True) json_data = advanced_import_export.export_to_json(data, pretty) return {'ok': True, 'json': json_data, 'format': 'json'} @router.post("/export/xml") def export_xml(request: Request, payload: dict, user = Depends(require_auth)): """Export data to XML format""" database = payload.get('database') table = payload.get('table') data = payload.get('data', []) xml = advanced_import_export.export_to_xml(database, table, data) return {'ok': True, 'xml': xml, 'format': 'xml'} @router.post("/import/sql") def import_sql(request: Request, payload: dict, user = Depends(require_auth)): """Import and execute SQL file (phpMyAdmin format)""" sql_content = payload.get('sql') execute = payload.get('execute', True) database = payload.get('database') if not sql_content: return {'ok': False, 'error': 'sql content required'} user_store = get_user_store(user) if execute: engine = create_query_engine(user_store) try: result = advanced_import_export.import_from_sql(sql_content, query_engine=engine, target_database=database) # Persist all tables to Parquet and upload to HF _persist_all_tables(engine, user_store) # Log import result to security log (synced to HF Space) is_ok = result.get('ok', False) severity = 'INFO' if is_ok else 'ERROR' details = { 'database': database, 'total_statements': result.get('total_statements', 0), 'executed': result.get('executed', 0), 'failed': result.get('failed', 0), 'tables_created': result.get('tables_created', 0), 'rows_inserted': result.get('rows_inserted', 0), } security_logger.log_event( 'DATA_IMPORT', username=user['username'], ip_address=request.client.host if request.client else 'unknown', details=details, severity=severity ) return result finally: engine.close() else: # Just parse, don't execute return advanced_import_export.import_from_sql(sql_content) @router.post("/import/csv-to-table") async def import_csv_to_table(request: Request, database: str = Form(...), table: str = Form(...), file: UploadFile = File(...), user = Depends(require_auth)): """Import CSV file directly to a table""" user_store = get_user_store(user) # Read CSV content content = await file.read() csv_content = content.decode('utf-8') # Parse CSV import csv import io csv_reader = csv.DictReader(io.StringIO(csv_content)) rows = list(csv_reader) if not rows: return {'ok': False, 'error': 'CSV file is empty'} # Check if table exists all_tables = table_manager.list(user_store) table_exists = any(t['database'] == database and t['table'] == table for t in all_tables) if not table_exists: # Create table from CSV columns columns = [] for col_name in rows[0].keys(): # Infer type from first row value = rows[0][col_name] col_type = 'TEXT' if value: try: int(value) col_type = 'INTEGER' except: try: float(value) col_type = 'DOUBLE' except: col_type = 'TEXT' columns.append({'name': col_name, 'type': col_type}) # Create table create_result = table_manager.create(database, table, columns, user_store) if not create_result.get('ok'): return create_result # Insert rows success_count = 0 error_count = 0 errors = [] for row in rows: try: result = row_manager.insert(database, table, row, user['username'], user_store, user['workspace_id']) if result.get('ok'): success_count += 1 else: error_count += 1 errors.append(result.get('error', 'Unknown error')) except Exception as e: error_count += 1 errors.append(str(e)) return { 'ok': True, 'message': f'Imported {success_count} rows successfully', 'success_count': success_count, 'error_count': error_count, 'errors': errors[:10] if errors else [] # Return first 10 errors } @router.post("/import/sql-dump") def import_sql_dump(request: Request, payload: dict, user = Depends(require_auth)): """Import a phpMyAdmin-style SQL dump — converts MySQL syntax to DuckDB and executes""" from app.query_parser import query_parser from app.mysql_to_duckdb_converter import mysql_to_duckdb_converter from app.table_manager import table_manager from app.utils import write_json database = payload.get('database') sql_text = payload.get('sql', '') if not database or not sql_text: return {'ok': False, 'error': 'database and sql required'} safe_db = query_parser.sanitize_identifier(database) user_store = get_user_store(user) engine = create_query_engine(user_store) # Ensure schema exists try: engine.conn.execute(f'CREATE SCHEMA IF NOT EXISTS "{safe_db}"') except: pass statements = query_parser.split_sql_statements(sql_text) success_count = 0 error_count = 0 skipped_count = 0 errors = [] tables_created = [] try: for stmt in statements: stmt = stmt.strip() if not stmt: continue # Convert MySQL syntax to DuckDB converted, should_skip = mysql_to_duckdb_converter.convert_statement(stmt) if should_skip or not converted: skipped_count += 1 continue # Prefix table references with database if unqualified if database: converted = re.sub( r'(CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?)["`]?(\w+)["`]?', lambda m, db=safe_db: f'{m.group(1)}"{db}"."{m.group(2)}"', converted, count=1, flags=re.IGNORECASE ) converted = re.sub( r'(INSERT\s+INTO\s+)["`]?(\w+)["`]?', lambda m, db=safe_db: f'{m.group(1)}"{db}"."{m.group(2)}"', converted, count=1, flags=re.IGNORECASE ) converted = re.sub( r'(ALTER\s+TABLE\s+)["`]?(\w+)["`]?', lambda m, db=safe_db: f'{m.group(1)}"{db}"."{m.group(2)}"', converted, count=1, flags=re.IGNORECASE ) try: result = engine.execute_sql(converted) if result.get('ok'): success_count += 1 # Track created tables create_match = re.search(r'CREATE\s+TABLE\s+["`]?(\w+)["`]?\.["`]?(\w+)["`]?', converted, re.IGNORECASE) if create_match: tables_created.append(create_match.group(2)) else: error_count += 1 errors.append(result.get('error', 'Unknown error')) except Exception as e: error_count += 1 errors.append(str(e)) # Persist all tables to Parquet and update metadata if success_count > 0: try: _persist_all_tables(engine, user_store) except Exception as e: print(f"Warning: Failed to persist import results: {e}") finally: engine.close() return { 'ok': True, 'success_count': success_count, 'error_count': error_count, 'skipped_count': skipped_count, 'tables_created': tables_created, 'errors': errors[:10] } @router.post("/import/json-rows") def import_json_rows(request: Request, payload: dict, user = Depends(require_auth)): """Import an array of JSON rows into a table, creating it if needed""" database = payload.get('database') table = payload.get('table') rows = payload.get('rows', []) if not database or not table or not rows: return {'ok': False, 'error': 'database, table, and rows required'} if not isinstance(rows, list) or len(rows) == 0: return {'ok': False, 'error': 'rows must be a non-empty array'} user_store = get_user_store(user) success_count = 0 error_count = 0 errors = [] for row in rows: try: result = row_manager.insert(database, table, row, user['username'], user_store, user['workspace_id']) if result.get('ok'): success_count += 1 else: error_count += 1 errors.append(result.get('error', 'Unknown error')) except Exception as e: error_count += 1 errors.append(str(e)) if success_count > 0: try: engine = create_query_engine(user_store) try: _persist_all_tables(engine, user_store) finally: engine.close() except Exception: pass return { 'ok': True, 'success_count': success_count, 'error_count': error_count, 'errors': errors[:10] } @router.post("/import/csv-parse") def import_csv_parse(request: Request, payload: dict, user = Depends(require_auth)): """Parse CSV data""" csv_content = payload.get('csv') return advanced_import_export.import_from_csv(csv_content) @router.post("/import/json-parse") def import_json_parse(request: Request, payload: dict, user = Depends(require_auth)): """Parse JSON data""" json_content = payload.get('json') return advanced_import_export.import_from_json(json_content) # -- MIGRATIONS -- @router.post("/migrations/create") def create_migration(request: Request, payload: dict, user = Depends(require_auth)): """Create a new migration""" database = payload.get('database') name = payload.get('name') up_sql = payload.get('up_sql') down_sql = payload.get('down_sql') user_store = get_user_store(user) return migration_manager.create_migration(user_store, database, name, up_sql, down_sql) @router.get("/migrations/{database}") def list_migrations(request: Request, database: str, user = Depends(require_auth)): """List migrations for database""" user_store = get_user_store(user) migrations = migration_manager.list_migrations(user_store, database) return {'ok': True, 'migrations': migrations} @router.post("/migrations/{database}/{migration_id}/apply") def apply_migration(request: Request, database: str, migration_id: str, user = Depends(require_auth)): """Apply a migration""" user_store = get_user_store(user) engine = create_query_engine(user_store) try: return migration_manager.apply_migration(user_store, database, migration_id, engine) finally: engine.close() @router.post("/migrations/{database}/{migration_id}/rollback") def rollback_migration(request: Request, database: str, migration_id: str, user = Depends(require_auth)): """Rollback a migration""" user_store = get_user_store(user) engine = create_query_engine(user_store) try: return migration_manager.rollback_migration(user_store, database, migration_id, engine) finally: engine.close() # -- QUERY BUILDER -- @router.post("/query-builder/select") def build_select_query(request: Request, payload: dict, user = Depends(require_auth)): """Build SELECT query""" sql = query_builder.build_select(**payload) return {'ok': True, 'sql': sql} @router.post("/query-builder/insert") def build_insert_query(request: Request, payload: dict, user = Depends(require_auth)): """Build INSERT query""" sql = query_builder.build_insert(**payload) return {'ok': True, 'sql': sql} @router.post("/query-builder/update") def build_update_query(request: Request, payload: dict, user = Depends(require_auth)): """Build UPDATE query""" sql = query_builder.build_update(**payload) return {'ok': True, 'sql': sql} @router.post("/query-builder/delete") def build_delete_query(request: Request, payload: dict, user = Depends(require_auth)): """Build DELETE query""" sql = query_builder.build_delete(**payload) return {'ok': True, 'sql': sql} @router.post("/query-builder/aggregate") def build_aggregate_query(request: Request, payload: dict, user = Depends(require_auth)): """Build aggregate query""" sql = query_builder.build_aggregate(**payload) return {'ok': True, 'sql': sql} # -- DATA VALIDATION -- @router.post("/validate/row") def validate_row(request: Request, payload: dict, user = Depends(require_auth)): """Validate row data""" data = payload.get('data') schema = payload.get('schema') return data_validator.validate_row(data, schema) @router.post("/validate/email") def validate_email(request: Request, payload: dict, user = Depends(require_auth)): """Validate email""" email = payload.get('email') valid = data_validator.validate_email(email) return {'ok': True, 'valid': valid} @router.post("/validate/phone") def validate_phone(request: Request, payload: dict, user = Depends(require_auth)): """Validate phone number""" phone = payload.get('phone') valid = data_validator.validate_phone(phone) return {'ok': True, 'valid': valid} # -- JOB SCHEDULER -- @router.post("/jobs/create") def create_job(request: Request, payload: dict, user = Depends(require_auth)): """Create scheduled job""" name = payload.get('name') job_type = payload.get('type') config = payload.get('config') schedule = payload.get('schedule') user_store = get_user_store(user) return job_scheduler.create_job(user_store, name, job_type, config, schedule) @router.get("/jobs") def list_jobs(request: Request, user = Depends(require_auth)): """List all jobs""" user_store = get_user_store(user) jobs = job_scheduler.list_jobs(user_store) return {'ok': True, 'jobs': jobs} @router.post("/jobs/{job_id}/run") def run_job(request: Request, job_id: str, user = Depends(require_auth)): """Run job manually""" user_store = get_user_store(user) engine = create_query_engine(user_store) try: return job_scheduler.run_job_now(user_store, job_id, engine) finally: engine.close() @router.post("/jobs/{job_id}/enable") def enable_job(request: Request, job_id: str, user = Depends(require_auth)): """Enable job""" user_store = get_user_store(user) return job_scheduler.enable_job(user_store, job_id) @router.post("/jobs/{job_id}/disable") def disable_job(request: Request, job_id: str, user = Depends(require_auth)): """Disable job""" user_store = get_user_store(user) return job_scheduler.disable_job(user_store, job_id) @router.delete("/jobs/{job_id}") def delete_job(request: Request, job_id: str, user = Depends(require_auth)): """Delete job""" user_store = get_user_store(user) return job_scheduler.delete_job(user_store, job_id) # -- QUERY BOOKMARKS -- @router.post("/bookmarks") def create_bookmark(request: Request, payload: dict, user = Depends(require_auth)): """Save a new query bookmark""" name = payload.get('name') sql = payload.get('sql') database = payload.get('database') description = payload.get('description', '') tags = payload.get('tags', []) if not name or not sql: return {'ok': False, 'error': 'name and sql required'} user_store = get_user_store(user) return bookmark_manager.save_bookmark(user_store, name, sql, database, description, tags) @router.get("/bookmarks") def list_bookmarks(request: Request, database: str = None, tags: str = None, user = Depends(require_auth)): """List all bookmarks with optional filtering""" user_store = get_user_store(user) tag_list = tags.split(',') if tags else None bookmarks = bookmark_manager.list_bookmarks(user_store, database, tag_list) return {'ok': True, 'bookmarks': bookmarks} @router.get("/bookmarks/{bookmark_id}") def get_bookmark(request: Request, bookmark_id: str, user = Depends(require_auth)): """Get a specific bookmark""" user_store = get_user_store(user) bookmark = bookmark_manager.get_bookmark(user_store, bookmark_id) if not bookmark: return {'ok': False, 'error': 'Bookmark not found'} return {'ok': True, 'bookmark': bookmark} @router.put("/bookmarks/{bookmark_id}") def update_bookmark(request: Request, bookmark_id: str, payload: dict, user = Depends(require_auth)): """Update an existing bookmark""" user_store = get_user_store(user) return bookmark_manager.update_bookmark(user_store, bookmark_id, payload) @router.delete("/bookmarks/{bookmark_id}") def delete_bookmark(request: Request, bookmark_id: str, user = Depends(require_auth)): """Delete a bookmark""" user_store = get_user_store(user) return bookmark_manager.delete_bookmark(user_store, bookmark_id) @router.post("/bookmarks/{bookmark_id}/execute") def execute_bookmark(request: Request, bookmark_id: str, user = Depends(require_auth)): """Execute a bookmarked query""" user_store = get_user_store(user) engine = create_query_engine(user_store) try: result = bookmark_manager.execute_bookmark(user_store, bookmark_id, engine) return result finally: engine.close() @router.get("/bookmarks/search/{search_term}") def search_bookmarks(request: Request, search_term: str, user = Depends(require_auth)): """Search bookmarks""" user_store = get_user_store(user) results = bookmark_manager.search_bookmarks(user_store, search_term) return {'ok': True, 'bookmarks': results} @router.get("/bookmarks/popular/list") def get_popular_bookmarks(request: Request, limit: int = 10, user = Depends(require_auth)): """Get most popular bookmarks""" user_store = get_user_store(user) bookmarks = bookmark_manager.get_popular_bookmarks(user_store, limit) return {'ok': True, 'bookmarks': bookmarks} @router.get("/bookmarks/recent/list") def get_recent_bookmarks(request: Request, limit: int = 10, user = Depends(require_auth)): """Get recently executed bookmarks""" user_store = get_user_store(user) bookmarks = bookmark_manager.get_recent_bookmarks(user_store, limit) return {'ok': True, 'bookmarks': bookmarks} @router.get("/bookmarks/export/json") def export_bookmarks(request: Request, user = Depends(require_auth)): """Export all bookmarks as JSON""" user_store = get_user_store(user) json_data = bookmark_manager.export_bookmarks(user_store) return Response(content=json_data, media_type="application/json") @router.post("/bookmarks/import/json") def import_bookmarks(request: Request, payload: dict, user = Depends(require_auth)): """Import bookmarks from JSON""" json_data = payload.get('json') merge = payload.get('merge', True) if not json_data: return {'ok': False, 'error': 'json data required'} user_store = get_user_store(user) return bookmark_manager.import_bookmarks(user_store, json_data, merge) # -- GLOBAL SEARCH -- @router.post("/search/global") def search_database_global(request: Request, payload: dict, user = Depends(require_auth)): """Search across all tables in a database""" database = payload.get('database') search_term = payload.get('search_term') tables = payload.get('tables') # Optional: specific tables case_sensitive = payload.get('case_sensitive', False) regex = payload.get('regex', False) limit_per_table = payload.get('limit_per_table', 100) if not database or not search_term: return {'ok': False, 'error': 'database and search_term required'} user_store = get_user_store(user) return global_search.search_database( user_store, database, search_term, tables, case_sensitive, regex, limit_per_table ) @router.post("/search/column") def search_column(request: Request, payload: dict, user = Depends(require_auth)): """Search in a specific column""" database = payload.get('database') table = payload.get('table') column = payload.get('column') search_term = payload.get('search_term') case_sensitive = payload.get('case_sensitive', False) regex = payload.get('regex', False) limit = payload.get('limit', 100) if not all([database, table, column, search_term]): return {'ok': False, 'error': 'database, table, column, and search_term required'} user_store = get_user_store(user) return global_search.search_column( user_store, database, table, column, search_term, case_sensitive, regex, limit ) @router.post("/search/duplicates") def find_duplicates(request: Request, payload: dict, user = Depends(require_auth)): """Find duplicate rows in a table""" database = payload.get('database') table = payload.get('table') columns = payload.get('columns', []) limit = payload.get('limit', 100) if not database or not table or not columns: return {'ok': False, 'error': 'database, table, and columns required'} user_store = get_user_store(user) return global_search.find_duplicates(user_store, database, table, columns, limit) @router.get("/search/nulls/{database}/{table}") def find_null_values(request: Request, database: str, table: str, user = Depends(require_auth)): """Find NULL values in a table""" user_store = get_user_store(user) return global_search.find_null_values(user_store, database, table) # -- TABLE MAINTENANCE -- @router.post("/maintenance/check/{database}/{table}") def check_table(request: Request, database: str, table: str, user = Depends(require_auth)): """Check table for errors and integrity issues""" user_store = get_user_store(user) return table_maintenance.check_table(user_store, database, table) @router.post("/maintenance/optimize/{database}/{table}") def optimize_table(request: Request, database: str, table: str, user = Depends(require_auth)): """Optimize table storage and performance""" user_store = get_user_store(user) return table_maintenance.optimize_table(user_store, database, table) @router.post("/maintenance/analyze/{database}/{table}") def analyze_table(request: Request, database: str, table: str, user = Depends(require_auth)): """Analyze table and collect statistics""" user_store = get_user_store(user) return table_maintenance.analyze_table(user_store, database, table) @router.post("/maintenance/repair/{database}/{table}") def repair_table(request: Request, database: str, table: str, user = Depends(require_auth)): """Attempt to repair a corrupted table""" user_store = get_user_store(user) return table_maintenance.repair_table(user_store, database, table) @router.get("/maintenance/status/{database}/{table}") def get_table_status(request: Request, database: str, table: str, user = Depends(require_auth)): """Get comprehensive table status and statistics""" user_store = get_user_store(user) return table_maintenance.get_table_status(user_store, database, table) # -- BLOB HANDLING -- @router.get("/blob/info/{database}/{table}/{row_id_field}/{row_id_value}/{blob_column}") def get_blob_info(request: Request, database: str, table: str, row_id_field: str, row_id_value: str, blob_column: str, user = Depends(require_auth)): """Get BLOB information and preview""" user_store = get_user_store(user) return blob_handler.get_blob_info(user_store, database, table, row_id_field, row_id_value, blob_column) @router.post("/blob/analyze") def analyze_blob(request: Request, payload: dict, user = Depends(require_auth)): """Analyze BLOB data""" import base64 data_base64 = payload.get('data') filename = payload.get('filename') if not data_base64: return {'ok': False, 'error': 'data required (base64 encoded)'} try: data = base64.b64decode(data_base64) analysis = blob_handler.analyze_blob(data, filename) analysis['ok'] = True return analysis except Exception as e: return {'ok': False, 'error': f'Failed to analyze BLOB: {str(e)}'} @router.post("/blob/download") def create_blob_download(request: Request, payload: dict, user = Depends(require_auth)): """Create download response for BLOB""" import base64 data_base64 = payload.get('data') filename = payload.get('filename', 'download.bin') mime_type = payload.get('mime_type') if not data_base64: return {'ok': False, 'error': 'data required (base64 encoded)'} try: data = base64.b64decode(data_base64) return blob_handler.create_download_response(data, filename, mime_type) except Exception as e: return {'ok': False, 'error': f'Failed to create download: {str(e)}'} # -- VIEWS MANAGEMENT -- @router.post("/views") def create_view(request: Request, payload: dict, user = Depends(require_auth)): """Create a new view""" database = payload.get('database') view_name = payload.get('view_name') sql_query = payload.get('sql_query') replace = payload.get('replace', False) if not all([database, view_name, sql_query]): return {'ok': False, 'error': 'database, view_name, and sql_query required'} user_store = get_user_store(user) return views_manager.create_view(user_store, database, view_name, sql_query, replace) @router.get("/views") def list_views(request: Request, database: str = None, user = Depends(require_auth)): """List all views""" user_store = get_user_store(user) views = views_manager.list_views(user_store, database) return {'ok': True, 'views': views} @router.get("/views/{database}/{view_name}") def get_view(request: Request, database: str, view_name: str, user = Depends(require_auth)): """Get view information""" user_store = get_user_store(user) view = views_manager.get_view(user_store, database, view_name) if not view: return {'ok': False, 'error': 'View not found'} return {'ok': True, 'view': view} @router.delete("/views/{database}/{view_name}") def drop_view(request: Request, database: str, view_name: str, user = Depends(require_auth)): """Drop a view""" user_store = get_user_store(user) return views_manager.drop_view(user_store, database, view_name) @router.get("/views/{database}/{view_name}/query") def query_view(request: Request, database: str, view_name: str, limit: int = 100, offset: int = 0, user = Depends(require_auth)): """Query a view""" user_store = get_user_store(user) return views_manager.query_view(user_store, database, view_name, limit, offset) @router.get("/views/{database}/{view_name}/dependencies") def get_view_dependencies(request: Request, database: str, view_name: str, user = Depends(require_auth)): """Get view dependencies""" user_store = get_user_store(user) return views_manager.get_view_dependencies(user_store, database, view_name) # -- PRIVILEGE MANAGEMENT V2 -- @router.post("/privileges/grant/database") def grant_database_privilege(request: Request, payload: dict, user = Depends(require_auth)): """Grant privileges on entire database""" username = payload.get('username') database = payload.get('database') privileges = payload.get('privileges', []) grant_option = payload.get('grant_option', False) if not all([username, database, privileges]): return {'ok': False, 'error': 'username, database, and privileges required'} user_store = get_user_store(user) return privilege_manager_v2.grant_database_privilege( user_store, username, database, privileges, grant_option ) @router.post("/privileges/grant/table") def grant_table_privilege(request: Request, payload: dict, user = Depends(require_auth)): """Grant privileges on specific table""" username = payload.get('username') database = payload.get('database') table = payload.get('table') privileges = payload.get('privileges', []) grant_option = payload.get('grant_option', False) if not all([username, database, table, privileges]): return {'ok': False, 'error': 'username, database, table, and privileges required'} user_store = get_user_store(user) return privilege_manager_v2.grant_table_privilege( user_store, username, database, table, privileges, grant_option ) @router.post("/privileges/grant/column") def grant_column_privilege(request: Request, payload: dict, user = Depends(require_auth)): """Grant privileges on specific columns""" username = payload.get('username') database = payload.get('database') table = payload.get('table') columns = payload.get('columns', []) privileges = payload.get('privileges', []) if not all([username, database, table, columns, privileges]): return {'ok': False, 'error': 'username, database, table, columns, and privileges required'} user_store = get_user_store(user) return privilege_manager_v2.grant_column_privilege( user_store, username, database, table, columns, privileges ) @router.post("/privileges/revoke/database") def revoke_database_privilege(request: Request, payload: dict, user = Depends(require_auth)): """Revoke privileges on database""" username = payload.get('username') database = payload.get('database') privileges = payload.get('privileges', []) if not all([username, database, privileges]): return {'ok': False, 'error': 'username, database, and privileges required'} user_store = get_user_store(user) return privilege_manager_v2.revoke_database_privilege( user_store, username, database, privileges ) @router.post("/privileges/revoke/table") def revoke_table_privilege(request: Request, payload: dict, user = Depends(require_auth)): """Revoke privileges on table""" username = payload.get('username') database = payload.get('database') table = payload.get('table') privileges = payload.get('privileges', []) if not all([username, database, table, privileges]): return {'ok': False, 'error': 'username, database, table, and privileges required'} user_store = get_user_store(user) return privilege_manager_v2.revoke_table_privilege( user_store, username, database, table, privileges ) @router.get("/privileges/user/{username}") def get_user_privileges(request: Request, username: str, user = Depends(require_auth)): """Get all privileges for a user""" user_store = get_user_store(user) return privilege_manager_v2.get_user_privileges(user_store, username) @router.get("/privileges/check") def check_privilege(request: Request, username: str, privilege: str, database: str = None, table: str = None, column: str = None, user = Depends(require_auth)): """Check if user has specific privilege""" user_store = get_user_store(user) has_privilege = privilege_manager_v2.check_privilege( user_store, username, privilege, database, table, column ) return {'ok': True, 'has_privilege': has_privilege} @router.get("/privileges/list") def list_all_privileges(request: Request, user = Depends(require_auth)): """List all users and their privileges""" user_store = get_user_store(user) return privilege_manager_v2.list_all_privileges(user_store) @router.post("/privileges/copy") def copy_privileges(request: Request, payload: dict, user = Depends(require_auth)): """Copy privileges from one user to another""" from_user = payload.get('from_user') to_user = payload.get('to_user') if not all([from_user, to_user]): return {'ok': False, 'error': 'from_user and to_user required'} user_store = get_user_store(user) return privilege_manager_v2.copy_privileges(user_store, from_user, to_user) @router.get("/privileges/export") def export_privileges(request: Request, username: str = None, user = Depends(require_auth)): """Export privileges as SQL GRANT statements""" user_store = get_user_store(user) sql = privilege_manager_v2.export_privileges(user_store, username) return Response(content=sql, media_type="text/plain") # -- STORED PROCEDURES AND FUNCTIONS -- @router.post("/procedures") def create_procedure(request: Request, payload: dict, user = Depends(require_auth)): """Create a new stored procedure""" database = payload.get('database') name = payload.get('name') parameters = payload.get('parameters', []) body = payload.get('body') description = payload.get('description', '') if not all([database, name, body]): return {'ok': False, 'error': 'database, name, and body required'} user_store = get_user_store(user) return stored_procedure_manager.create_procedure( user_store, database, name, parameters, body, description ) @router.post("/functions") def create_function(request: Request, payload: dict, user = Depends(require_auth)): """Create a new function""" database = payload.get('database') name = payload.get('name') parameters = payload.get('parameters', []) return_type = payload.get('return_type') body = payload.get('body') description = payload.get('description', '') if not all([database, name, return_type, body]): return {'ok': False, 'error': 'database, name, return_type, and body required'} user_store = get_user_store(user) return stored_procedure_manager.create_function( user_store, database, name, parameters, return_type, body, description ) @router.get("/procedures") def list_procedures(request: Request, database: str = None, user = Depends(require_auth)): """List all procedures and functions""" user_store = get_user_store(user) procedures = stored_procedure_manager.list_procedures(user_store, database) return {'ok': True, 'procedures': procedures} @router.get("/procedures/{database}/{name}") def get_procedure(request: Request, database: str, name: str, user = Depends(require_auth)): """Get a specific procedure or function""" user_store = get_user_store(user) procedure = stored_procedure_manager.get_procedure(user_store, database, name) if not procedure: return {'ok': False, 'error': 'Procedure not found'} return {'ok': True, 'procedure': procedure} @router.put("/procedures/{database}/{name}") def update_procedure(request: Request, database: str, name: str, payload: dict, user = Depends(require_auth)): """Update a procedure or function""" user_store = get_user_store(user) return stored_procedure_manager.update_procedure(user_store, database, name, payload) @router.delete("/procedures/{database}/{name}") def drop_procedure(request: Request, database: str, name: str, user = Depends(require_auth)): """Drop a procedure or function""" user_store = get_user_store(user) return stored_procedure_manager.drop_procedure(user_store, database, name) @router.post("/procedures/{database}/{name}/execute") def execute_procedure(request: Request, database: str, name: str, payload: dict, user = Depends(require_auth)): """Execute a stored procedure""" arguments = payload.get('arguments', []) user_store = get_user_store(user) engine = create_query_engine(user_store) try: return stored_procedure_manager.execute_procedure( user_store, database, name, arguments, engine ) finally: engine.close() @router.post("/functions/{database}/{name}/call") def call_function(request: Request, database: str, name: str, payload: dict, user = Depends(require_auth)): """Call a function""" arguments = payload.get('arguments', []) user_store = get_user_store(user) engine = create_query_engine(user_store) try: return stored_procedure_manager.call_function( user_store, database, name, arguments, engine ) finally: engine.close() @router.get("/procedures/{database}/{name}/stats") def get_procedure_stats(request: Request, database: str, name: str, user = Depends(require_auth)): """Get execution statistics for a procedure/function""" user_store = get_user_store(user) return stored_procedure_manager.get_procedure_stats(user_store, database, name) @router.get("/procedures/{database}/export") def export_procedures(request: Request, database: str, user = Depends(require_auth)): """Export all procedures and functions as SQL""" user_store = get_user_store(user) sql = stored_procedure_manager.export_procedures(user_store, database) return Response(content=sql, media_type="text/plain") # -- TRIGGERS -- @router.post("/triggers") def create_trigger(request: Request, payload: dict, user = Depends(require_auth)): """Create a new trigger""" database = payload.get('database') table = payload.get('table') name = payload.get('name') timing = payload.get('timing') # BEFORE or AFTER event = payload.get('event') # INSERT, UPDATE, DELETE body = payload.get('body') description = payload.get('description', '') if not all([database, table, name, timing, event, body]): return {'ok': False, 'error': 'database, table, name, timing, event, and body required'} user_store = get_user_store(user) return triggers_manager.create_trigger( user_store, database, table, name, timing, event, body, description ) @router.get("/triggers") def list_triggers(request: Request, database: str = None, table: str = None, user = Depends(require_auth)): """List all triggers""" user_store = get_user_store(user) triggers = triggers_manager.list_triggers(user_store, database, table) return {'ok': True, 'triggers': triggers} @router.get("/triggers/{database}/{name}") def get_trigger(request: Request, database: str, name: str, user = Depends(require_auth)): """Get a specific trigger""" user_store = get_user_store(user) trigger = triggers_manager.get_trigger(user_store, database, name) if not trigger: return {'ok': False, 'error': 'Trigger not found'} return {'ok': True, 'trigger': trigger} @router.put("/triggers/{database}/{name}") def update_trigger(request: Request, database: str, name: str, payload: dict, user = Depends(require_auth)): """Update a trigger""" user_store = get_user_store(user) return triggers_manager.update_trigger(user_store, database, name, payload) @router.delete("/triggers/{database}/{name}") def drop_trigger(request: Request, database: str, name: str, user = Depends(require_auth)): """Drop a trigger""" user_store = get_user_store(user) return triggers_manager.drop_trigger(user_store, database, name) @router.post("/triggers/{database}/{name}/enable") def enable_trigger(request: Request, database: str, name: str, user = Depends(require_auth)): """Enable a trigger""" user_store = get_user_store(user) return triggers_manager.enable_trigger(user_store, database, name) @router.post("/triggers/{database}/{name}/disable") def disable_trigger(request: Request, database: str, name: str, user = Depends(require_auth)): """Disable a trigger""" user_store = get_user_store(user) return triggers_manager.disable_trigger(user_store, database, name) @router.get("/triggers/{database}/{name}/stats") def get_trigger_stats(request: Request, database: str, name: str, user = Depends(require_auth)): """Get execution statistics for a trigger""" user_store = get_user_store(user) return triggers_manager.get_trigger_stats(user_store, database, name) @router.get("/triggers/{database}/export") def export_triggers(request: Request, database: str, table: str = None, user = Depends(require_auth)): """Export triggers as SQL""" user_store = get_user_store(user) sql = triggers_manager.export_triggers(user_store, database, table) return Response(content=sql, media_type="text/plain") @router.get("/triggers/{database}/{table}/list") def get_table_triggers(request: Request, database: str, table: str, user = Depends(require_auth)): """Get all triggers for a specific table""" user_store = get_user_store(user) return triggers_manager.get_table_triggers(user_store, database, table) # -- PROCESS MANAGEMENT -- @router.get("/processes") def list_processes(request: Request, username: str = None, status: str = None, user = Depends(require_auth)): """List all active queries""" processes = process_manager.list_processes(username, status) return {'ok': True, 'processes': processes} @router.post("/processes/{query_id}/kill") def kill_query(request: Request, query_id: str, user = Depends(require_auth)): """Kill a running query""" return process_manager.kill_query(query_id) @router.get("/processes/history") def get_query_history(request: Request, username: str = None, limit: int = 100, user = Depends(require_auth)): """Get query execution history""" history = process_manager.get_query_history(username, limit) return {'ok': True, 'history': history} @router.get("/processes/slow") def get_slow_queries(request: Request, threshold: float = 5.0, user = Depends(require_auth)): """Get queries running longer than threshold""" slow_queries = process_manager.get_slow_queries(threshold) return {'ok': True, 'slow_queries': slow_queries} @router.post("/processes/kill-slow") def kill_slow_queries(request: Request, payload: dict, user = Depends(require_auth)): """Kill all slow queries""" threshold = payload.get('threshold', 30.0) return process_manager.kill_slow_queries(threshold) @router.get("/processes/stats") def get_query_stats(request: Request, username: str = None, user = Depends(require_auth)): """Get query execution statistics""" return process_manager.get_query_stats(username) @router.get("/processes/users") def get_active_users(request: Request, user = Depends(require_auth)): """Get list of users with active queries""" users = process_manager.get_active_users() return {'ok': True, 'users': users} @router.get("/processes/export") def export_process_list(request: Request, user = Depends(require_auth)): """Export current process list as text""" text = process_manager.export_process_list() return Response(content=text, media_type="text/plain") @router.post("/processes/clear-history") def clear_query_history(request: Request, payload: dict, user = Depends(require_auth)): """Clear old query history""" older_than_hours = payload.get('older_than_hours', 24) return process_manager.clear_history(older_than_hours) # -- CONFIGURATION ANALYZER -- @router.get("/config/analyze") def analyze_configuration(request: Request, user = Depends(require_auth)): """Analyze workspace configuration and get suggestions""" user_store = get_user_store(user) return configuration_analyzer.analyze_workspace(user_store) @router.get("/config/analyze-table/{database}/{table}") def analyze_table_performance(request: Request, database: str, table: str, user = Depends(require_auth)): """Analyze specific table performance""" user_store = get_user_store(user) return configuration_analyzer.analyze_table_performance(user_store, database, table) @router.get("/config/tips") def get_optimization_tips(request: Request, user = Depends(require_auth)): """Get general optimization tips""" tips = configuration_analyzer.get_optimization_tips() return {'ok': True, 'tips': tips} @router.get("/config/report") def export_configuration_report(request: Request, user = Depends(require_auth)): """Export configuration analysis as text report""" user_store = get_user_store(user) report = configuration_analyzer.export_report(user_store) return Response(content=report, media_type="text/plain") @router.post("/init-sample-data") def init_sample_data_route(request: Request, user = Depends(require_auth)): """Initialize sample data for the current user""" user_store = get_user_store(user) try: result = init_sample_data(user_store, user['username']) return result except Exception as e: return {'ok': False, 'error': str(e)} # ── PROJECTS ── class CreateProjectRequest(BaseModel): name: str description: str = '' class UpdateProjectRequest(BaseModel): name: str = None description: str = None class ShareProjectRequest(BaseModel): username: str role: str = 'viewer' @router.post("/projects") def create_project(request: Request, payload: CreateProjectRequest, user = Depends(require_auth)): """Create new project in user's workspace""" user_store = get_user_store(user) return project_manager.create_project(user_store, payload.name, payload.description) @router.get("/projects") def list_projects(request: Request, user = Depends(require_auth)): """List all projects in user's workspace""" user_store = get_user_store(user) projects = project_manager.list_projects(user_store) return {'ok': True, 'projects': projects} @router.get("/projects/{project_id}") def get_project(request: Request, project_id: str, user = Depends(require_auth)): """Get specific project""" user_store = get_user_store(user) project = project_manager.get_project(user_store, project_id) if not project: return {'ok': False, 'error': 'Project not found'} return {'ok': True, 'project': project} @router.put("/projects/{project_id}") def update_project(request: Request, project_id: str, payload: UpdateProjectRequest, user = Depends(require_auth)): """Update project details""" user_store = get_user_store(user) updates = {k: v for k, v in payload.model_dump().items() if v is not None} return project_manager.update_project(user_store, project_id, updates) @router.delete("/projects/{project_id}") def delete_project(request: Request, project_id: str, user = Depends(require_auth)): """Delete project""" user_store = get_user_store(user) return project_manager.delete_project(user_store, project_id) @router.post("/projects/{project_id}/databases/{database_name}") def add_database_to_project(request: Request, project_id: str, database_name: str, user = Depends(require_auth)): """Link database to project""" user_store = get_user_store(user) return project_manager.add_database_to_project(user_store, project_id, database_name) @router.delete("/projects/{project_id}/databases/{database_name}") def remove_database_from_project(request: Request, project_id: str, database_name: str, user = Depends(require_auth)): """Unlink database from project""" user_store = get_user_store(user) return project_manager.remove_database_from_project(user_store, project_id, database_name) @router.post("/projects/{project_id}/share") def share_project(request: Request, project_id: str, payload: ShareProjectRequest, user = Depends(require_auth)): """Share project with another user""" user_store = get_user_store(user) return project_manager.share_project(user_store, project_id, payload.username, payload.role) @router.delete("/projects/{project_id}/share/{username}") def unshare_project(request: Request, project_id: str, username: str, user = Depends(require_auth)): """Remove user access from project""" user_store = get_user_store(user) return project_manager.unshare_project(user_store, project_id, username) # ── VISUAL QUERY BUILDER ── @router.post("/query-builder/build") def build_visual_query(request: Request, payload: dict, user = Depends(require_auth)): """Build SQL query from visual configuration""" return visual_query_builder.build_query(payload) @router.get("/query-builder/operators") def get_query_operators(request: Request, user = Depends(require_auth)): """Get available filter operators""" return {'ok': True, 'operators': visual_query_builder.get_operators()} @router.get("/query-builder/aggregations") def get_query_aggregations(request: Request, user = Depends(require_auth)): """Get available aggregation functions""" return {'ok': True, 'aggregations': visual_query_builder.get_aggregations()} @router.get("/query-builder/join-types") def get_join_types(request: Request, user = Depends(require_auth)): """Get available join types""" return {'ok': True, 'join_types': visual_query_builder.get_join_types()} # ── DATA VISUALIZATION ── @router.post("/visualize/create-chart") def create_chart(request: Request, payload: dict, user = Depends(require_auth)): """Create chart from data""" data = payload.get('data', []) chart_type = payload.get('chart_type', 'bar') config = payload.get('config', {}) return data_visualization_engine.create_chart(data, chart_type, config) @router.get("/visualize/chart-types") def get_chart_types(request: Request, user = Depends(require_auth)): """Get available chart types""" return {'ok': True, 'chart_types': data_visualization_engine.get_chart_types()} @router.post("/visualize/analyze-data") def analyze_data_for_chart(request: Request, payload: dict, user = Depends(require_auth)): """Analyze data and suggest chart types""" data = payload.get('data', []) return data_visualization_engine.analyze_data_for_chart(data) # ── API DOCUMENTATION ── @router.get("/docs/database/{database}") def get_database_api_docs(request: Request, database: str, user = Depends(require_auth)): """Generate API documentation for database""" user_store = get_user_store(user) tables = table_manager.list(user_store) return api_doc_generator.generate_docs(database, tables, user_store) @router.get("/docs/sdk-examples/{database}/{table}") def get_sdk_examples(request: Request, database: str, table: str, user = Depends(require_auth)): """Get SDK examples for table""" examples = api_doc_generator.generate_sdk_examples(database, table) return {'ok': True, 'examples': examples} # ── DATABASE TEMPLATES ── @router.get("/templates/list") def list_database_templates(request: Request, user = Depends(require_auth)): """List available database templates""" templates = database_template_manager.list_templates() return {'ok': True, 'templates': templates} @router.get("/templates/{template_id}") def get_database_template(request: Request, template_id: str, user = Depends(require_auth)): """Get template details""" return database_template_manager.get_template(template_id) @router.post("/templates/{template_id}/create") def create_database_from_template(request: Request, template_id: str, payload: dict, user = Depends(require_auth)): """Create database from template""" database_name = payload.get('database_name') if not database_name: return {'ok': False, 'error': 'database_name required'} user_store = get_user_store(user) return database_template_manager.create_from_template( template_id, database_name, user_store, user['username'], user['workspace_id'] ) @router.post("/databases/clone") def clone_database(request: Request, payload: dict, user = Depends(require_auth)): """Clone existing database""" source_db = payload.get('source_database') target_db = payload.get('target_database') if not source_db or not target_db: return {'ok': False, 'error': 'source_database and target_database required'} user_store = get_user_store(user) return database_template_manager.clone_database( source_db, target_db, user_store, user['username'], user['workspace_id'] ) # ── PERFORMANCE MONITORING ── @router.get("/performance/stats") def get_performance_stats(request: Request, hours: int = 24, user = Depends(require_auth)): """Get query performance statistics""" user_store = get_user_store(user) return performance_monitor.get_query_stats(user_store, hours) @router.get("/performance/slow-queries") def get_slow_queries(request: Request, threshold: float = 1.0, limit: int = 50, user = Depends(require_auth)): """Get slow queries""" user_store = get_user_store(user) return performance_monitor.get_slow_queries(user_store, threshold, limit) @router.get("/performance/table/{database}/{table}") def analyze_table_perf(request: Request, database: str, table: str, user = Depends(require_auth)): """Analyze performance for specific table""" user_store = get_user_store(user) return performance_monitor.analyze_table_performance(user_store, database, table) @router.get("/performance/index-recommendations/{database}/{table}") def get_index_recommendations(request: Request, database: str, table: str, user = Depends(require_auth)): """Get index recommendations for table""" user_store = get_user_store(user) return performance_monitor.get_index_recommendations(user_store, database, table) @router.post("/performance/clear-logs") def clear_performance_logs(request: Request, payload: dict, user = Depends(require_auth)): """Clear old performance logs""" days = payload.get('days', 30) user_store = get_user_store(user) return performance_monitor.clear_old_logs(user_store, days) # ── AI ASSISTANT ── @router.post("/ai/chat") def ai_chat(request: Request, payload: dict, user = Depends(require_auth)): """Chat with AI assistant""" message = payload.get('message') if not message: return {'ok': False, 'error': 'message required'} user_store = get_user_store(user) response = ai_assistant.process_message(message, user_store, user['username']) return {'ok': True, 'response': response} @router.post("/ai/execute-query") def ai_execute_query(request: Request, payload: dict, user = Depends(require_auth)): """Execute AI-generated query""" sql = payload.get('sql') if not sql: return {'ok': False, 'error': 'sql required'} user_store = get_user_store(user) # Log query performance import time start_time = time.time() result = ai_assistant.execute_query(sql, user_store) execution_time = time.time() - start_time # Log performance if result.get('ok'): database = payload.get('database', 'unknown') performance_monitor.log_query(user_store, sql, execution_time, database, user['username']) return result @router.get("/settings/ai") def get_ai_settings(request: Request, user = Depends(require_auth)): """Get AI assistant settings""" import os return { 'ok': True, 'provider': os.getenv('AI_PROVIDER', 'rule_based'), 'model': os.getenv('AI_MODEL', ''), 'api_key': '••••••••••••' if os.getenv('AI_API_KEY') else '' } @router.put("/settings/ai") def update_ai_settings(request: Request, payload: dict, user = Depends(require_auth)): """Update AI assistant settings""" import os provider = payload.get('provider', 'rule_based') model = payload.get('model', '') api_key = payload.get('api_key', '') # Validate provider valid_providers = ['rule_based', 'groq', 'openai', 'anthropic'] if provider not in valid_providers: return {'ok': False, 'error': f'Invalid provider. Must be one of: {valid_providers}'} # Update environment variables (for current session) os.environ['AI_PROVIDER'] = provider if model: os.environ['AI_MODEL'] = model if api_key and not api_key.startswith('••'): os.environ['AI_API_KEY'] = api_key # Reinitialize AI assistant with new settings ai_assistant.__init__() return { 'ok': True, 'message': 'AI settings updated. Note: Settings are session-only. Set environment variables for persistence.' } # ── WORKSPACE SHARING ── @router.post("/workspace/invite/create") def create_workspace_invite(request: Request, payload: dict, user = Depends(require_auth)): """Create invite link for workspace sharing""" role = payload.get('role', 'viewer') expires_hours = payload.get('expires_hours') max_uses = payload.get('max_uses') return workspace_share_manager.create_invite_link( workspace_id=user['workspace_id'], created_by=user['username'], role=role, expires_hours=expires_hours, max_uses=max_uses ) @router.post("/workspace/invite/verify") def verify_workspace_invite(request: Request, payload: dict): """Verify invite code and secret (public endpoint)""" invite_code = payload.get('invite_code') secret = payload.get('secret') if not invite_code or not secret: return JSONResponse( status_code=400, content={'ok': False, 'error': 'invite_code and secret required'} ) result = workspace_share_manager.verify_invite(invite_code, secret) return JSONResponse(content=result) @router.post("/workspace/invite/accept") def accept_workspace_invite(request: Request, payload: dict, user = Depends(require_auth)): """Accept invite and join workspace""" invite_code = payload.get('invite_code') secret = payload.get('secret') if not invite_code or not secret: return {'ok': False, 'error': 'invite_code and secret required'} return workspace_share_manager.accept_invite(invite_code, secret, user['username']) @router.get("/workspace/members") def list_workspace_members(request: Request, user = Depends(require_auth)): """List all members of current workspace""" members = workspace_share_manager.list_workspace_members(user['workspace_id']) return {'ok': True, 'members': members} @router.get("/workspace/invites") def list_workspace_invites(request: Request, user = Depends(require_auth)): """List all invite links for current workspace""" invites = workspace_share_manager.list_workspace_invites(user['workspace_id']) return {'ok': True, 'invites': invites} @router.post("/workspace/invite/revoke") def revoke_workspace_invite(request: Request, payload: dict, user = Depends(require_auth)): """Revoke an invite link""" invite_code = payload.get('invite_code') if not invite_code: return {'ok': False, 'error': 'invite_code required'} return workspace_share_manager.revoke_invite(invite_code, user['workspace_id']) @router.post("/workspace/member/remove") def remove_workspace_member(request: Request, payload: dict, user = Depends(require_auth)): """Remove a member from workspace""" username = payload.get('username') if not username: return {'ok': False, 'error': 'username required'} return workspace_share_manager.remove_member(user['workspace_id'], username) @router.post("/workspace/member/role") def update_member_role(request: Request, payload: dict, user = Depends(require_auth)): """Update member's role""" username = payload.get('username') role = payload.get('role') if not username or not role: return {'ok': False, 'error': 'username and role required'} return workspace_share_manager.update_member_role(user['workspace_id'], username, role) @router.get("/workspace/shared") def list_shared_workspaces(request: Request, user = Depends(require_auth)): """List all workspaces user has access to (shared with them)""" workspaces = workspace_share_manager.get_user_workspaces(user['username']) return {'ok': True, 'workspaces': workspaces} @router.get("/workspace/accessible") def list_accessible_workspaces(request: Request, user = Depends(require_auth)): """List all workspaces user can access (own + shared)""" from app.auth import get_accessible_workspaces workspaces = get_accessible_workspaces(user) return {'ok': True, 'workspaces': workspaces} @router.get("/workspace/debug/members") def debug_all_members(request: Request, user = Depends(require_admin)): """Debug: List ALL workspace members in the database""" import duckdb from pathlib import Path from app.config import settings db_path = Path(settings.data_root) / "system" / "workspace_shares.duckdb" conn = duckdb.connect(str(db_path)) result = conn.execute("SELECT * FROM workspace_members ORDER BY added_at DESC").fetchdf() conn.close() return { 'ok': True, 'total_members': len(result), 'members': result.to_dict('records'), 'current_user': user['username'], 'current_workspace': user['workspace_id'] } @router.get("/workspace/debug/invites") def debug_all_invites(request: Request, user = Depends(require_admin)): """Debug: List ALL invite links in the database""" import duckdb from pathlib import Path from app.config import settings db_path = Path(settings.data_root) / "system" / "workspace_shares.duckdb" conn = duckdb.connect(str(db_path)) result = conn.execute("SELECT * FROM invite_links ORDER BY created_at DESC").fetchdf() conn.close() return { 'ok': True, 'total_invites': len(result), 'invites': result.to_dict('records') } @router.get("/workspace/{workspace_id}/databases") def get_workspace_databases(request: Request, workspace_id: str, user = Depends(require_auth)): """Get databases from a specific workspace (own or shared)""" from app.auth import get_user_store_for_workspace user_store = get_user_store_for_workspace(user, workspace_id) return metadata.list_databases(user_store) @router.get("/workspace/{workspace_id}/tables") def get_workspace_tables(request: Request, workspace_id: str, user = Depends(require_auth)): """Get tables from a specific workspace (own or shared)""" from app.auth import get_user_store_for_workspace user_store = get_user_store_for_workspace(user, workspace_id) return table_manager.list(user_store) # ── DATABASES ── @router.get("/databases") def get_databases(request: Request, user = Depends(require_auth)): user_store = get_user_store(user) # List databases from user's workspace return metadata.list_databases(user_store) @router.post("/databases") def create_database(request: Request, payload: CreateDatabaseRequest, user = Depends(require_auth)): from app.input_validator import input_validator from app.rate_limiter import rate_limiter # Rate limiting rate_limiter.check_rate_limit(request, user, 'authenticated') # Validate database name is_valid, error = input_validator.validate_database_name(payload.name) if not is_valid: return {'ok': False, 'error': error} user_store = get_user_store(user) result = metadata.create_database(payload.name, user_store) if result.get('ok'): key_pair = db_api_key_manager.create_db_api_key(user_store, payload.name, user['workspace_id']) result['api_key'] = key_pair['api_key'] result['secret_key'] = key_pair['secret_key'] result['message'] = 'IMPORTANT: Save your secret key now. It will not be shown again!' return result @router.delete("/databases/{name}") def delete_database(request: Request, name: str, user = Depends(require_auth)): user_store = get_user_store(user) result = metadata.delete_database(name, user_store) return result @router.post("/databases/cleanup") def cleanup_databases(request: Request, user = Depends(require_auth)): """Force cleanup - remove orphaned data and DuckDB schemas""" user_store = get_user_store(user) # Get current databases dbs = metadata.list_databases(user_store) db_names = [db['name'] if isinstance(db, dict) else db for db in dbs] # Get all tables tables = table_manager.list(user_store) # Find orphaned tables (tables referencing non-existent databases) orphaned = [t for t in tables if t['database'] not in db_names] # Remove orphaned tables for t in orphaned: try: table_manager.drop(t['database'], t['table'], user_store) except: pass # Drop all DuckDB schemas not in database list try: from app.query_engine import create_query_engine engine = create_query_engine(user_store) # Get all schemas schemas = engine.conn.execute("SELECT schema_name FROM information_schema.schemata").fetchall() for (schema_name,) in schemas: if schema_name not in ('main', 'temp', 'information_schema', 'pg_catalog'): if schema_name not in db_names: engine.conn.execute(f'DROP SCHEMA IF EXISTS "{schema_name}" CASCADE') engine.close() except Exception as e: print(f"Warning: Schema cleanup error: {e}") return { 'ok': True, 'databases': db_names, 'orphaned_tables_removed': len(orphaned), 'message': f'Cleanup complete. {len(orphaned)} orphaned tables removed.' } # ── TABLES ── @router.get("/tables") def get_tables(request: Request, user = Depends(require_auth)): user_store = get_user_store(user) return table_manager.list(user_store) @router.post("/tables") def create_table(request: Request, payload: CreateTableRequest, user = Depends(require_auth)): user_store = get_user_store(user) return table_manager.create(payload.database, payload.table, payload.columns, user_store) @router.get("/schema/{database}/{table}") def get_schema(request: Request, database: str, table: str, user = Depends(require_auth)): user_store = get_user_store(user) return schema_manager.get(database, table, user_store) # ── ROWS ── @router.get("/rows/{database}/{table}") def get_rows(request: Request, database: str, table: str, limit: int = 100, offset: int = 0, user = Depends(require_auth)): """Get rows with pagination support""" user_store = get_user_store(user) return row_manager.read(database, table, user_store, limit=limit, offset=offset) @router.post("/rows/{database}/{table}") def insert_row(request: Request, database: str, table: str, payload: dict, user = Depends(require_auth)): user_store = get_user_store(user) try: result = row_manager.insert(database, table, payload, user['username'], user_store, user['workspace_id']) if result.get('ok'): try: engine = create_query_engine(user_store) try: _persist_all_tables(engine, user_store) finally: engine.close() except Exception: pass return result except Exception as e: import traceback traceback.print_exc() return JSONResponse(status_code=500, content={'ok': False, 'error': str(e)}) @router.put("/rows/{database}/{table}/{row_id_field}/{row_id_value}") def update_row(request: Request, database: str, table: str, row_id_field: str, row_id_value: str, payload: dict, user = Depends(require_auth)): user_store = get_user_store(user) result = row_manager.update(database, table, row_id_field, row_id_value, payload, user['username'], user_store, user['workspace_id']) if result.get('ok'): try: engine = create_query_engine(user_store) try: _persist_all_tables(engine, user_store) finally: engine.close() except Exception: pass return result @router.delete("/rows/{database}/{table}/{row_id_field}/{row_id_value}") def delete_row(request: Request, database: str, table: str, row_id_field: str, row_id_value: str, user = Depends(require_auth)): user_store = get_user_store(user) result = row_manager.delete(database, table, row_id_field, row_id_value, user['username'], user_store, user['workspace_id']) if result.get('ok'): try: engine = create_query_engine(user_store) try: _persist_all_tables(engine, user_store) finally: engine.close() except Exception: pass return result # ── TABLE MANAGEMENT ── @router.post("/tables/alter") def alter_table(request: Request, payload: dict, user = Depends(require_auth)): """Alter table structure — add/drop/rename columns, change types""" database = payload.get('database') table = payload.get('table') operations = payload.get('operations', []) if not database or not table or not operations: return {'ok': False, 'error': 'database, table, and operations required'} user_store = get_user_store(user) return table_manager.alter_table(database, table, operations, user_store) @router.post("/tables/rename") def rename_table(request: Request, payload: dict, user = Depends(require_auth)): """Rename a table""" database = payload.get('database') old_name = payload.get('old_name') new_name = payload.get('new_name') if not database or not old_name or not new_name: return {'ok': False, 'error': 'database, old_name, and new_name required'} user_store = get_user_store(user) return table_manager.rename(database, old_name, new_name, user_store) @router.post("/tables/copy") def copy_table(request: Request, payload: dict, user = Depends(require_auth)): """Duplicate a table with all data""" database = payload.get('database') source = payload.get('source') destination = payload.get('destination') if not database or not source or not destination: return {'ok': False, 'error': 'database, source, and destination required'} user_store = get_user_store(user) return table_manager.copy(database, source, destination, user_store) @router.post("/tables/truncate") def truncate_table(request: Request, payload: dict, user = Depends(require_auth)): """Delete all rows but keep table structure""" database = payload.get('database') table = payload.get('table') if not database or not table: return {'ok': False, 'error': 'database and table required'} user_store = get_user_store(user) return table_manager.truncate(database, table, user_store) @router.get("/tables/stats/{database}/{table}") def get_table_stats(request: Request, database: str, table: str, user = Depends(require_auth)): """Get table statistics — row count, file size, column info""" user_store = get_user_store(user) return table_manager.get_stats(database, table, user_store) # ── SQL QUERY ENGINE ── @router.post("/sql") def execute_sql(request: Request, payload: SQLQueryRequest, user = Depends(require_auth)): """Execute raw SQL queries using DuckDB with validation and persistence""" from app.query_parser import query_parser from app.table_manager import table_manager from app.utils import write_json sql = payload.sql.strip() if not sql: return {'ok': False, 'error': 'Empty query'} # Convert MySQL syntax to DuckDB-compatible SQL from app.mysql_to_duckdb_converter import mysql_to_duckdb_converter if re.match(r'CREATE\s+TABLE', sql, re.IGNORECASE): converted, _ = mysql_to_duckdb_converter.convert_statement(sql) if converted: sql = converted # Validate query with write operations allowed is_valid, error = query_parser.validate_query(sql, allow_write=True) if not is_valid: return {'ok': False, 'error': f"Query validation failed: {error}"} # Check query complexity is_allowed, reason, score = query_complexity_analyzer.analyze_complexity(sql) if not is_allowed: return {'ok': False, 'error': f"Query complexity check failed: {reason}"} user_store = get_user_store(user) engine = create_query_engine(user_store) try: # Handle SHOW TABLES if re.match(r'SHOW\s+TABLES', sql, re.IGNORECASE): tables = metadata.list_tables(user_store) return { 'ok': True, 'data': [{'Table': f"{t['database']}.{t['table']}"} for t in tables], 'columns': ['Table'], 'rows': len(tables) } # Handle DESCRIBE / DESC / SHOW COLUMNS desc_match = re.match(r'(?:DESCRIBE|DESC|SHOW\s+COLUMNS\s+(?:FROM|IN))\s+["`]?(\w+(?:\.\w+)?)["`]?', sql, re.IGNORECASE) if desc_match: table_ref = desc_match.group(1) if '.' in table_ref: db, tbl = table_ref.split('.', 1) else: all_tables = metadata.list_tables(user_store) matches = [t for t in all_tables if t['table'].lower() == table_ref.lower()] if matches: db, tbl = matches[0]['database'], matches[0]['table'] else: return {'ok': False, 'error': f'Table {table_ref} not found'} engine._register_table(db, tbl) try: safe_db = query_parser.sanitize_identifier(db) safe_tbl = query_parser.sanitize_identifier(tbl) cols = engine.conn.execute(f""" SELECT column_name as Field, data_type as Type, 'YES' as "Null", '' as "Key", NULL as "Default", '' as "Extra" FROM information_schema.columns WHERE table_schema = '{safe_db}' AND table_name = '{safe_tbl}' ORDER BY ordinal_position """).fetchdf() return { 'ok': True, 'data': cols.to_dict('records'), 'columns': list(cols.columns), 'rows': len(cols) } except Exception as e: return {'ok': False, 'error': str(e)} # Handle multi-statement SQL statements = query_parser.split_sql_statements(sql) is_multi = len(statements) > 1 if is_multi: results = [] for stmt in statements: stmt = stmt.strip().rstrip(';').strip() if not stmt: continue stmt_valid, stmt_err = query_parser.validate_query(stmt, allow_write=True) if not stmt_valid: results.append({'ok': False, 'error': stmt_err, 'sql': stmt[:100]}) continue stmt_result = engine.execute_sql(stmt, allow_write=True) results.append(stmt_result) # After multi-statement execution, persist all affected tables _persist_all_tables(engine, user_store) audit.log("sql_query_multi", actor=user['username'], target="sql_engine", after={"statements": len(results)}) return {'ok': True, 'results': results, 'total': len(results)} # Single statement execution drop_match = re.match(r'DROP\s+TABLE\s+(?:IF\s+EXISTS\s+)?"?(\w+)"?\."?(\w+)"?', sql, re.IGNORECASE) create_match = re.match(r'CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?"?(\w+)"?\."?(\w+)"?', sql, re.IGNORECASE) write_match = re.match(r'(INSERT|UPDATE|DELETE|ALTER)\b', sql, re.IGNORECASE) result = engine.execute_sql(sql) # If DROP TABLE succeeded, clean up metadata and files if drop_match and result.get('ok'): database = drop_match.group(1) table = drop_match.group(2) try: existing_tables = table_manager.list(user_store) updated_tables = [t for t in existing_tables if not (t['database'] == database and t['table'] == table)] if len(updated_tables) < len(existing_tables): tables_path = table_manager.tables_path(user_store) write_json(tables_path, updated_tables) user_store.upload_file(tables_path, "metadata/tables.json", f"Dropped table {database}.{table}") parquet_path = user_store.local("data", database, f"{table}.parquet") if parquet_path.exists(): parquet_path.unlink() try: user_store.delete_file(f"data/{database}/{table}.parquet", f"Drop {database}.{table}") except Exception: pass schema_path = user_store.local("metadata", "schemas", f"{database}.{table}.json") if schema_path.exists(): schema_path.unlink() engine._registered_tables.discard(f"{database}.{table}") except Exception as e: print(f"Warning: Failed to clean up dropped table: {e}") # If CREATE TABLE succeeded, register and persist if create_match and result.get('ok'): database = create_match.group(1) table = create_match.group(2) try: engine._register_table(database, table) _persist_table(engine, user_store, database, table) except Exception as e: print(f"Warning: Failed to persist created table: {e}") # If INSERT/UPDATE/DELETE succeeded, persist affected tables if write_match and result.get('ok'): affected = query_parser.extract_tables(sql) for table_ref in affected: if '.' in table_ref: db, tbl = table_ref.split('.', 1) try: _persist_table(engine, user_store, db, tbl) except Exception as e: print(f"Warning: Failed to persist {db}.{tbl}: {e}") audit.log("sql_query", actor=user['username'], target="sql_engine", after={"sql": sql[:100], "score": score}) return result finally: engine.close() def _persist_table(engine, user_store, database, table): """Export a table to Parquet and register in metadata""" from app.table_manager import table_manager from app.utils import write_json from app.query_parser import query_parser safe_db = query_parser.sanitize_identifier(database) safe_tbl = query_parser.sanitize_identifier(table) # Export to Parquet parquet_path = user_store.local("data", safe_db, f"{safe_tbl}.parquet") parquet_path.parent.mkdir(parents=True, exist_ok=True) safe_path = str(parquet_path).replace("'", "''") engine.conn.execute(f""" COPY "{safe_db}"."{safe_tbl}" TO '{safe_path}' (FORMAT PARQUET) """) # Upload parquet file to HF try: user_store.upload_file(parquet_path, f"data/{safe_db}/{safe_tbl}.parquet", f"Persist {safe_db}.{safe_tbl}") except Exception: pass # Update metadata existing_tables = table_manager.list(user_store) if not any(t['database'] == safe_db and t['table'] == safe_tbl for t in existing_tables): try: cols = engine.conn.execute(f""" SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = '{safe_db}' AND table_name = '{safe_tbl}' ORDER BY ordinal_position """).fetchall() existing_tables.append({ "database": safe_db, "table": safe_tbl, "columns": [{"name": c[0], "type": c[1]} for c in cols] }) except: existing_tables.append({"database": safe_db, "table": safe_tbl, "columns": []}) tables_path = table_manager.tables_path(user_store) write_json(tables_path, existing_tables) user_store.upload_file(tables_path, "metadata/tables.json", f"SQL: registered {safe_db}.{safe_tbl}") def _persist_all_tables(engine, user_store): """Persist all user tables to Parquet and update metadata""" from app.table_manager import table_manager from app.utils import write_json from app.query_parser import query_parser try: all_tables = engine.conn.execute(""" SELECT table_schema, table_name FROM information_schema.tables WHERE table_schema NOT IN ('information_schema', 'pg_catalog', 'system') """).fetchall() existing_tables = table_manager.list(user_store) for db, tbl in all_tables: try: safe_db = query_parser.sanitize_identifier(db) safe_tbl = query_parser.sanitize_identifier(tbl) parquet_path = user_store.local("data", safe_db, f"{safe_tbl}.parquet") parquet_path.parent.mkdir(parents=True, exist_ok=True) safe_path = str(parquet_path).replace("'", "''") engine.conn.execute(f""" COPY "{safe_db}"."{safe_tbl}" TO '{safe_path}' (FORMAT PARQUET) """) # Upload parquet file to HF try: user_store.upload_file(parquet_path, f"data/{safe_db}/{safe_tbl}.parquet", f"Persist {safe_db}.{safe_tbl}") except Exception: pass if not any(t['database'] == safe_db and t['table'] == safe_tbl for t in existing_tables): try: cols = engine.conn.execute(f""" SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = '{safe_db}' AND table_name = '{safe_tbl}' ORDER BY ordinal_position """).fetchall() existing_tables.append({ "database": safe_db, "table": safe_tbl, "columns": [{"name": c[0], "type": c[1]} for c in cols] }) except: existing_tables.append({"database": safe_db, "table": safe_tbl, "columns": []}) except Exception: pass tables_path = table_manager.tables_path(user_store) write_json(tables_path, existing_tables) user_store.upload_file(tables_path, "metadata/tables.json", "SQL: persisted all tables") except Exception: pass @router.post("/sql/explain") def explain_sql(request: Request, payload: SQLQueryRequest, user = Depends(require_auth)): """Get query execution plan (EXPLAIN)""" user_store = get_user_store(user) engine = create_query_engine(user_store) try: try: result = engine.conn.execute(f"EXPLAIN {payload.sql}").fetchdf() return {'ok': True, 'plan': result.to_dict('records'), 'columns': list(result.columns)} except Exception as e: return {'ok': False, 'error': str(e)} finally: engine.close() # ── DATABASE API KEY MANAGEMENT ── @router.get("/databases/{database}/api-key") def get_database_api_key(request: Request, database: str, user = Depends(require_auth)): """Get API key for a specific database""" user_store = get_user_store(user) api_key = db_api_key_manager.get_db_api_key(user_store, database) if not api_key: key_pair = db_api_key_manager.create_db_api_key(user_store, database, user['workspace_id']) return { 'ok': True, 'database': database, 'api_key': key_pair['api_key'], 'secret_key': key_pair['secret_key'], 'new': True } return { 'ok': True, 'database': database, 'api_key': api_key } @router.post("/databases/{database}/regenerate-api-key") def regenerate_database_api_key(request: Request, database: str, user = Depends(require_auth)): """Regenerate API key + secret for a database""" user_store = get_user_store(user) key_pair = db_api_key_manager.regenerate_db_api_key(user_store, user['workspace_id'], database) return { 'ok': True, 'database': database, 'api_key': key_pair['api_key'], 'secret_key': key_pair['secret_key'], 'message': 'IMPORTANT: Save your new secret key now. It will not be shown again!' } @router.get("/databases/{database}/connection") def get_database_connection(request: Request, database: str, user = Depends(require_auth)): """Get connection information for a database""" user_store = get_user_store(user) api_key = db_api_key_manager.get_db_api_key(user_store, database) if not api_key: return {'ok': False, 'error': 'No API key found for this database'} # Return only the API key, not the full URL return { 'ok': True, 'database': database, 'api_key': api_key, 'usage': { 'description': 'Use this API key with CorpusDB SDK or HTTP client', 'sdk_example': f'corpusdb.connect(api_key="{api_key}", database="{database}")', 'http_example': 'Use /api/cdb/* endpoints with X-Api-Key and X-Secret-Key headers' } } # ── UTILITIES ── @router.get("/stats") def get_stats(request: Request, user = Depends(require_auth)): user_store = get_user_store(user) tables = table_manager.list(user_store) dbs_raw = metadata.list_databases(user_store) db_list = dbs_raw if isinstance(dbs_raw, list) else dbs_raw.get('databases', []) total_rows = 0 db_stats = [] for db in db_list: db_name = db['name'] if isinstance(db, dict) else db db_tables = [t for t in tables if t['database'] == db_name] db_rows = 0 for t in db_tables[:10]: try: engine = create_query_engine(user_store) stats = engine.get_table_stats(db_name, t['table']) if stats['ok'] and stats['data']: db_rows += stats['data'][0].get('row_count', 0) except Exception: pass total_rows += db_rows db_stats.append({ 'name': db_name, 'tables': len(db_tables), 'rows': db_rows, 'created': db.get('created_at', '') if isinstance(db, dict) else '', }) # Active sessions for this user user_manager._init() conn = user_manager._get_conn() from datetime import datetime as _dt now = _dt.utcnow().isoformat() session_count = conn.execute( "SELECT COUNT(*) FROM sessions WHERE username = ? AND expires_at > ?", [user['username'], now] ).fetchone()[0] conn.close() # Disk usage of workspace import os workspace_dir = user_store.local('.') workspace_size = sum( f.stat().st_size for f in workspace_dir.rglob('*') if f.is_file() ) if workspace_dir.exists() else 0 return { "databases": len(db_list), "tables": len(tables), "rows": total_rows, "queries": 0, "workspace_id": user['workspace_id'], "active_sessions": session_count, "storage_bytes": workspace_size, "storage_mb": round(workspace_size / 1024 / 1024, 2), "db_stats": db_stats, } @router.get("/audit") def get_audit(request: Request, user = Depends(require_auth)): return audit.list() @router.post("/backup") def create_backup(request: Request, user = Depends(require_auth)): user_store = get_user_store(user) result = backup_manager.create_snapshot(f"{user['username']}_backup", user_store) severity = 'INFO' if result.get('ok') else 'ERROR' security_logger.log_event( 'DATA_EXPORT', username=user['username'], ip_address=request.client.host if request.client else 'unknown', details={'type': 'backup', 'name': f"{user['username']}_backup", 'status': 'success' if result.get('ok') else 'failed'}, severity=severity ) return result @router.post("/import/preview") async def import_preview(request: Request, file: UploadFile = File(...), user = Depends(require_auth)): with tempfile.NamedTemporaryFile(delete=False) as tmp: tmp.write(await file.read()) tmp_path = Path(tmp.name) return import_export.preview_csv(tmp_path) @router.post("/import/commit") async def import_commit(request: Request, database: str = Form(...), table: str = Form(...), file: UploadFile = File(...), user = Depends(require_auth)): user_store = get_user_store(user) with tempfile.NamedTemporaryFile(delete=False) as tmp: tmp.write(await file.read()) tmp_path = Path(tmp.name) return import_export.commit_csv(database, table, tmp_path, user_store) @router.post("/ai/query") def ai_query(request: Request, payload: dict, user = Depends(require_auth)): user_store = get_user_store(user) question = payload.get("question", "") # Enhanced AI with SQL execution if "table" in question.lower(): tables = table_manager.list(user_store) table_names = [f"{t['database']}.{t['table']}" for t in tables[:10]] return { "response": f"Your tables: {', '.join(table_names)}", "sql": None } elif "select" in question.lower() or "query" in question.lower(): # Try to extract and execute SQL sql = question.strip() if sql.upper().startswith('SELECT'): engine = create_query_engine(user_store) result = engine.execute_sql(sql) engine.close() if result['ok']: return { "response": f"Query executed successfully. Found {result['rows']} rows.", "sql": sql, "data": result['data'][:10] # Return first 10 rows } else: return { "response": f"Query error: {result['error']}", "sql": sql } return { "response": "I can help with SQL queries, table info, and data analysis. Try asking about your tables or write a SELECT query!", "sql": None }