""" Unity Catalog Chatbot API Server Flask API to handle natural language requests and execute Unity Catalog operations """ from flask import Flask, request, jsonify, send_from_directory from flask_cors import CORS import os import re from typing import Dict, List, Optional import anthropic from unity_catalog_service import UnityCatalogService app = Flask(__name__, static_folder='.', static_url_path='') CORS(app) def _no_cache(response): """Disable caching for SPA assets to avoid stale UI (304s).""" response.headers["Cache-Control"] = "no-store" response.headers["Pragma"] = "no-cache" response.headers["Expires"] = "0" return response # Initialize services (lazy to allow mocking in tests) uc_service = None claude_client = None def _init_services(): """Lazy initialize services.""" global uc_service, claude_client if uc_service is None: uc_service = UnityCatalogService() if claude_client is None: claude_client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY")) return uc_service, claude_client def validate_databricks_connection(host: str, token: str, workspace_id: str = None) -> Dict: """Validate connection to Databricks workspace.""" try: from databricks.sdk import WorkspaceClient # Create client with provided credentials client = WorkspaceClient( host=host, token=token ) # Try to get workspace info workspace_info = client.workspace.get_status(path="/") return { "success": True, "message": "Successfully connected to Databricks workspace", "workspace_path": workspace_info.path } except Exception as e: return { "success": False, "message": f"Connection failed: {str(e)}" } # System prompt for Claude to parse Unity Catalog requests SYSTEM_PROMPT = """You are an expert Unity Catalog assistant. Your role is to: 1. Parse natural language requests about Databricks Unity Catalog operations 2. Extract the intent and parameters from user messages 3. Return structured JSON responses Available intents: - createCatalog: Create a new catalog - createSchema: Create a new schema - createTable: Create a new table - grantPermission: Grant permissions to users/groups - revokePermission: Revoke permissions from users/groups - listCatalogs: List all catalogs - listSchemas: List schemas in a catalog - listTables: List tables in a schema - showPermissions: Show permissions for an object - setOwner: Set the owner of an object - getTableDetails: Get detailed information about a table - help: Provide help information - complex: Multi-step operation requiring clarification For each request, analyze the user's intent and return a JSON object with: { "intent": "string", "params": { "catalog": "string (optional)", "schema": "string (optional)", "table": "string (optional)", "principal": "string (optional)", "privilege": "string (optional)", "comment": "string (optional)", "columns": [{"name": "string", "type_name": "string"}] (optional) }, "explanation": "Brief explanation of what will be done" } Examples: User: "Create a catalog called sales_data" Response: {"intent": "createCatalog", "params": {"catalog": "sales_data"}, "explanation": "Will create a new catalog named sales_data"} User: "Grant SELECT permission on sales.customers to data_analysts group" Response: {"intent": "grantPermission", "params": {"privilege": "SELECT", "object": "sales.customers", "principal": "data_analysts"}, "explanation": "Will grant SELECT privileges on sales.customers table to data_analysts group"} Always return valid JSON only, no additional text.""" def parse_with_claude(user_message: str) -> Dict: """Use Claude to parse complex natural language requests""" try: _, client = _init_services() # Lazy init message = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=1000, system=SYSTEM_PROMPT, messages=[{ "role": "user", "content": user_message }] ) # Extract JSON from response response_text = message.content[0].text # Try to find JSON in the response import json # Remove markdown code blocks if present response_text = re.sub(r'```json\s*|\s*```', '', response_text) parsed = json.loads(response_text.strip()) return parsed except Exception as e: print(f"Error parsing with Claude: {e}") return { "intent": "help", "params": {}, "explanation": "I couldn't understand that request. Please rephrase." } def execute_intent(intent_data: Dict) -> Dict: """Execute the parsed intent using Unity Catalog service""" uc, _ = _init_services() # Lazy init intent = intent_data.get("intent") params = intent_data.get("params", {}) try: if intent == "createCatalog": return uc.create_catalog( name=params.get("catalog"), comment=params.get("comment") ) elif intent == "createSchema": catalog, schema = params.get("catalog"), params.get("schema") # If full path provided (e.g., "catalog.schema") if not schema and catalog and '.' in catalog: catalog, schema = catalog.split('.', 1) return uc.create_schema( catalog=catalog, schema=schema, comment=params.get("comment") ) elif intent == "createTable": catalog = params.get("catalog") schema = params.get("schema") table = params.get("table") # Parse full table path if needed if table and '.' in table: parts = table.split('.') if len(parts) == 3: catalog, schema, table = parts elif len(parts) == 2 and catalog: schema, table = parts return uc.create_table( catalog=catalog, schema=schema, table=table, columns=params.get("columns"), comment=params.get("comment") ) elif intent == "grantPermission": # Determine securable type from object path obj = params.get("object", "") parts = obj.split('.') if len(parts) == 1: securable_type = "CATALOG" elif len(parts) == 2: securable_type = "SCHEMA" else: securable_type = "TABLE" return uc.grant_permission( principal=params.get("principal"), privilege=params.get("privilege"), securable_type=securable_type, securable_name=obj ) elif intent == "revokePermission": obj = params.get("object", "") parts = obj.split('.') if len(parts) == 1: securable_type = "CATALOG" elif len(parts) == 2: securable_type = "SCHEMA" else: securable_type = "TABLE" return uc.revoke_permission( principal=params.get("principal"), privilege=params.get("privilege"), securable_type=securable_type, securable_name=obj ) elif intent == "listCatalogs": return uc.list_catalogs() elif intent == "listSchemas": return uc.list_schemas(params.get("catalog")) elif intent == "listTables": return uc.list_tables( params.get("catalog"), params.get("schema") ) elif intent == "showPermissions": obj = params.get("object", "") parts = obj.split('.') if len(parts) == 1: securable_type = "CATALOG" elif len(parts) == 2: securable_type = "SCHEMA" else: securable_type = "TABLE" return uc.show_grants(securable_type, obj) elif intent == "setOwner": obj = params.get("object", "") parts = obj.split('.') if len(parts) == 1: securable_type = "CATALOG" elif len(parts) == 2: securable_type = "SCHEMA" else: securable_type = "TABLE" return uc.set_owner( securable_type=securable_type, securable_name=obj, owner=params.get("owner") ) elif intent == "getTableDetails": parts = params.get("table", "").split('.') if len(parts) == 3: return uc.get_table(parts[0], parts[1], parts[2]) else: return { "success": False, "message": "Invalid table path. Use format: catalog.schema.table" } elif intent == "help": return { "success": True, "message": """I can help you with Unity Catalog operations: **Creating Objects:** • Create a catalog: "Create a catalog named sales_catalog" • Create a schema: "Create schema analytics in sales_catalog" • Create a table: "Create table sales_catalog.analytics.customers" **Managing Permissions:** • Grant access: "Grant SELECT on sales_catalog.analytics to data_analysts" • Revoke access: "Revoke MODIFY on sales_catalog.analytics.customers from john_doe" • Show permissions: "Show permissions for sales_catalog.analytics" • Set owner: "Set owner of sales_catalog to admin_user" **Listing Objects:** • List catalogs: "List all catalogs" or "Show catalogs" • List schemas: "List schemas in sales_catalog" • List tables: "Show tables in sales_catalog.analytics" **Table Details:** • Get table info: "Show details for sales_catalog.analytics.customers" Just describe what you want to do in natural language!""", "sql": None } else: return { "success": False, "message": f"Unknown intent: {intent}. Type 'help' for available commands." } except Exception as e: return { "success": False, "message": f"Error executing operation: {str(e)}" } @app.route('/', methods=['GET']) def index(): """Serve the React UI""" return _no_cache(send_from_directory('.', 'index.html')) @app.route('/', methods=['GET']) def serve_static(path): """Serve static files""" if path and os.path.exists(path): return _no_cache(send_from_directory('.', path)) return _no_cache(send_from_directory('.', 'index.html')) @app.route('/api/chat', methods=['POST']) def chat(): """Main chat endpoint""" try: data = request.json user_message = data.get('message', '') if not user_message: return jsonify({ 'error': 'No message provided' }), 400 # Parse intent with Claude intent_data = parse_with_claude(user_message) # Execute the operation result = execute_intent(intent_data) # Add explanation to response result['explanation'] = intent_data.get('explanation', '') result['intent'] = intent_data.get('intent') return jsonify(result) except Exception as e: return jsonify({ 'success': False, 'message': f'Server error: {str(e)}' }), 500 @app.route('/api/health', methods=['GET']) def health(): """Health check endpoint""" return jsonify({ 'status': 'healthy', 'service': 'Unity Catalog Chatbot API' }) @app.route('/api/catalogs', methods=['GET']) def get_catalogs(): """Get all catalogs""" uc, _ = _init_services() result = uc.list_catalogs() return jsonify(result) @app.route('/api/schemas/', methods=['GET']) def get_schemas(catalog): """Get schemas in a catalog""" uc, _ = _init_services() result = uc.list_schemas(catalog) return jsonify(result) @app.route('/api/tables//', methods=['GET']) def get_tables(catalog, schema): """Get tables in a schema""" uc, _ = _init_services() result = uc.list_tables(catalog, schema) return jsonify(result) @app.route('/api/execute', methods=['POST']) def execute_sql(): """Execute a SQL statement""" try: data = request.json sql = data.get('sql', '') warehouse_id = data.get('warehouse_id') uc, _ = _init_services() result = uc.execute_sql(sql, warehouse_id) return jsonify(result) except Exception as e: return jsonify({ 'success': False, 'message': f'Error: {str(e)}' }), 500 @app.route('/api/validate-connection', methods=['POST']) def validate_connection(): """Validate Databricks connection with provided credentials""" try: data = request.json host = data.get('host', '').strip() token = data.get('token', '').strip() workspace_id = data.get('workspaceId', '').strip() if not host or not token: return jsonify({ 'success': False, 'message': 'Host and token are required' }), 400 # Ensure host starts with https:// if not host.startswith('https://'): if host.startswith('http://'): host = 'https://' + host[7:] else: host = 'https://' + host result = validate_databricks_connection(host, token, workspace_id) if result['success']: return jsonify(result), 200 else: return jsonify(result), 401 except Exception as e: return jsonify({ 'success': False, 'message': f'Validation error: {str(e)}' }), 500 if __name__ == '__main__': # Get port from environment variable (HF Spaces uses 7860) port = int(os.getenv('PORT', 7860)) host = os.getenv('HOST', '0.0.0.0') # Development server app.run( host=host, port=port, debug=os.getenv('FLASK_ENV') == 'development' )