""" Unity Catalog Chatbot Backend Service Handles authentication and execution of Unity Catalog operations """ from databricks.sdk import WorkspaceClient from databricks.sdk.service.catalog import * from typing import Dict, List, Optional, Any import os import re from datetime import datetime import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class UnityCatalogService: """Service for managing Unity Catalog operations through natural language""" def __init__(self, workspace_url: str = None, token: str = None): """ Initialize Databricks workspace client Args: workspace_url: Databricks workspace URL token: Personal access token """ self.workspace_url = workspace_url or os.getenv("DATABRICKS_HOST") self.token = token or os.getenv("DATABRICKS_TOKEN") self.client = WorkspaceClient( host=self.workspace_url, token=self.token ) # Cache for frequently accessed data self._catalog_cache = {} self._schema_cache = {} def parse_object_path(self, path: str) -> Dict[str, str]: """Parse a Unity Catalog object path into components""" parts = path.split('.') if len(parts) == 1: return {'catalog': parts[0]} elif len(parts) == 2: return {'catalog': parts[0], 'schema': parts[1]} elif len(parts) == 3: return {'catalog': parts[0], 'schema': parts[1], 'table': parts[2]} else: raise ValueError(f"Invalid object path: {path}") # ==================== CATALOG OPERATIONS ==================== def create_catalog(self, name: str, comment: str = None, properties: Dict = None) -> Dict: """Create a new catalog""" try: catalog = self.client.catalogs.create( name=name, comment=comment or f"Catalog created via chatbot on {datetime.now().isoformat()}", properties=properties or {} ) logger.info(f"Created catalog: {name}") return { 'success': True, 'message': f"Successfully created catalog '{name}'", 'catalog': { 'name': catalog.name, 'owner': catalog.owner, 'created_at': catalog.created_at }, 'sql': f"CREATE CATALOG IF NOT EXISTS {name}" } except Exception as e: logger.error(f"Error creating catalog {name}: {e}") return { 'success': False, 'message': f"Failed to create catalog: {str(e)}" } def list_catalogs(self) -> Dict: """List all available catalogs""" try: catalogs = list(self.client.catalogs.list()) return { 'success': True, 'message': f"Found {len(catalogs)} catalog(s)", 'catalogs': [ { 'name': cat.name, 'owner': cat.owner, 'comment': cat.comment } for cat in catalogs ], 'sql': "SHOW CATALOGS" } except Exception as e: logger.error(f"Error listing catalogs: {e}") return { 'success': False, 'message': f"Failed to list catalogs: {str(e)}" } def get_catalog(self, name: str) -> Dict: """Get catalog details""" try: catalog = self.client.catalogs.get(name) return { 'success': True, 'catalog': { 'name': catalog.name, 'owner': catalog.owner, 'comment': catalog.comment, 'created_at': catalog.created_at, 'updated_at': catalog.updated_at, 'properties': catalog.properties } } except Exception as e: return { 'success': False, 'message': f"Catalog not found: {str(e)}" } def delete_catalog(self, name: str, force: bool = False) -> Dict: """Delete a catalog""" try: self.client.catalogs.delete(name, force=force) return { 'success': True, 'message': f"Successfully deleted catalog '{name}'", 'sql': f"DROP CATALOG {name} {'CASCADE' if force else ''}" } except Exception as e: return { 'success': False, 'message': f"Failed to delete catalog: {str(e)}" } # ==================== SCHEMA OPERATIONS ==================== def create_schema(self, catalog: str, schema: str, comment: str = None) -> Dict: """Create a new schema""" try: full_name = f"{catalog}.{schema}" schema_obj = self.client.schemas.create( name=schema, catalog_name=catalog, comment=comment or f"Schema created via chatbot on {datetime.now().isoformat()}" ) logger.info(f"Created schema: {full_name}") return { 'success': True, 'message': f"Successfully created schema '{full_name}'", 'schema': { 'name': schema_obj.name, 'catalog': schema_obj.catalog_name, 'owner': schema_obj.owner }, 'sql': f"CREATE SCHEMA IF NOT EXISTS {full_name}" } except Exception as e: logger.error(f"Error creating schema {catalog}.{schema}: {e}") return { 'success': False, 'message': f"Failed to create schema: {str(e)}" } def list_schemas(self, catalog: str) -> Dict: """List all schemas in a catalog""" try: schemas = list(self.client.schemas.list(catalog_name=catalog)) return { 'success': True, 'message': f"Found {len(schemas)} schema(s) in catalog '{catalog}'", 'schemas': [ { 'name': sch.name, 'full_name': sch.full_name, 'owner': sch.owner, 'comment': sch.comment } for sch in schemas ], 'sql': f"SHOW SCHEMAS IN {catalog}" } except Exception as e: return { 'success': False, 'message': f"Failed to list schemas: {str(e)}" } def delete_schema(self, catalog: str, schema: str, force: bool = False) -> Dict: """Delete a schema""" try: full_name = f"{catalog}.{schema}" self.client.schemas.delete(full_name) return { 'success': True, 'message': f"Successfully deleted schema '{full_name}'", 'sql': f"DROP SCHEMA {full_name} {'CASCADE' if force else ''}" } except Exception as e: return { 'success': False, 'message': f"Failed to delete schema: {str(e)}" } # ==================== TABLE OPERATIONS ==================== def create_table( self, catalog: str, schema: str, table: str, columns: List[Dict[str, str]] = None, comment: str = None, table_type: str = "MANAGED" ) -> Dict: """Create a new table""" try: full_name = f"{catalog}.{schema}.{table}" # Default columns if none provided if not columns: columns = [ {"name": "id", "type_name": "BIGINT", "comment": "Primary key"}, {"name": "created_at", "type_name": "TIMESTAMP", "comment": "Creation timestamp"}, {"name": "data", "type_name": "STRING", "comment": "Data field"} ] # Build column definitions column_defs = [ ColumnInfo( name=col['name'], type_name=ColumnTypeName[col.get('type_name', 'STRING')], comment=col.get('comment') ) for col in columns ] table_obj = self.client.tables.create( name=table, catalog_name=catalog, schema_name=schema, columns=column_defs, table_type=TableType[table_type], data_source_format=DataSourceFormat.DELTA, comment=comment or f"Table created via chatbot on {datetime.now().isoformat()}" ) logger.info(f"Created table: {full_name}") # Generate SQL col_sql = ",\n ".join([ f"{col['name']} {col.get('type_name', 'STRING')}" for col in columns ]) return { 'success': True, 'message': f"Successfully created table '{full_name}'", 'table': { 'name': table_obj.name, 'full_name': full_name, 'owner': table_obj.owner, 'table_type': str(table_obj.table_type) }, 'sql': f"CREATE TABLE IF NOT EXISTS {full_name} (\n {col_sql}\n) USING DELTA" } except Exception as e: logger.error(f"Error creating table {catalog}.{schema}.{table}: {e}") return { 'success': False, 'message': f"Failed to create table: {str(e)}" } def list_tables(self, catalog: str, schema: str) -> Dict: """List all tables in a schema""" try: tables = list(self.client.tables.list( catalog_name=catalog, schema_name=schema )) return { 'success': True, 'message': f"Found {len(tables)} table(s) in {catalog}.{schema}", 'tables': [ { 'name': tbl.name, 'full_name': tbl.full_name, 'owner': tbl.owner, 'table_type': str(tbl.table_type), 'data_source_format': str(tbl.data_source_format) } for tbl in tables ], 'sql': f"SHOW TABLES IN {catalog}.{schema}" } except Exception as e: return { 'success': False, 'message': f"Failed to list tables: {str(e)}" } def get_table(self, catalog: str, schema: str, table: str) -> Dict: """Get table details including columns""" try: full_name = f"{catalog}.{schema}.{table}" table_obj = self.client.tables.get(full_name) return { 'success': True, 'table': { 'name': table_obj.name, 'full_name': full_name, 'owner': table_obj.owner, 'table_type': str(table_obj.table_type), 'data_source_format': str(table_obj.data_source_format), 'columns': [ { 'name': col.name, 'type': str(col.type_name), 'comment': col.comment } for col in (table_obj.columns or []) ], 'comment': table_obj.comment } } except Exception as e: return { 'success': False, 'message': f"Failed to get table details: {str(e)}" } # ==================== PERMISSION OPERATIONS ==================== def grant_permission( self, principal: str, privilege: str, securable_type: str, securable_name: str ) -> Dict: """Grant permission to a user or group""" try: # Map privilege strings to enum privilege_map = { 'SELECT': Privilege.SELECT, 'MODIFY': Privilege.MODIFY, 'CREATE': Privilege.CREATE, 'USAGE': Privilege.USAGE, 'READ_METADATA': Privilege.READ_METADATA, 'CREATE_TABLE': Privilege.CREATE_TABLE, 'CREATE_SCHEMA': Privilege.CREATE_SCHEMA, 'USE_CATALOG': Privilege.USE_CATALOG, 'USE_SCHEMA': Privilege.USE_SCHEMA, 'ALL_PRIVILEGES': Privilege.ALL_PRIVILEGES } privilege_enum = privilege_map.get(privilege.upper()) if not privilege_enum: return { 'success': False, 'message': f"Invalid privilege: {privilege}" } # Map securable type securable_type_map = { 'CATALOG': SecurableType.CATALOG, 'SCHEMA': SecurableType.SCHEMA, 'TABLE': SecurableType.TABLE, 'VOLUME': SecurableType.VOLUME, 'FUNCTION': SecurableType.FUNCTION } securable_enum = securable_type_map.get(securable_type.upper()) self.client.grants.update( securable_type=securable_enum, full_name=securable_name, changes=[ PermissionsChange( add=[privilege_enum], principal=principal ) ] ) logger.info(f"Granted {privilege} on {securable_name} to {principal}") return { 'success': True, 'message': f"Granted {privilege} on '{securable_name}' to '{principal}'", 'sql': f"GRANT {privilege.upper()} ON {securable_name} TO `{principal}`" } except Exception as e: logger.error(f"Error granting permission: {e}") return { 'success': False, 'message': f"Failed to grant permission: {str(e)}" } def revoke_permission( self, principal: str, privilege: str, securable_type: str, securable_name: str ) -> Dict: """Revoke permission from a user or group""" try: privilege_map = { 'SELECT': Privilege.SELECT, 'MODIFY': Privilege.MODIFY, 'CREATE': Privilege.CREATE, 'USAGE': Privilege.USAGE, 'ALL_PRIVILEGES': Privilege.ALL_PRIVILEGES } privilege_enum = privilege_map.get(privilege.upper()) securable_type_map = { 'CATALOG': SecurableType.CATALOG, 'SCHEMA': SecurableType.SCHEMA, 'TABLE': SecurableType.TABLE } securable_enum = securable_type_map.get(securable_type.upper()) self.client.grants.update( securable_type=securable_enum, full_name=securable_name, changes=[ PermissionsChange( remove=[privilege_enum], principal=principal ) ] ) logger.info(f"Revoked {privilege} on {securable_name} from {principal}") return { 'success': True, 'message': f"Revoked {privilege} on '{securable_name}' from '{principal}'", 'sql': f"REVOKE {privilege.upper()} ON {securable_name} FROM `{principal}`" } except Exception as e: return { 'success': False, 'message': f"Failed to revoke permission: {str(e)}" } def show_grants(self, securable_type: str, securable_name: str) -> Dict: """Show all grants on a securable object""" try: securable_type_map = { 'CATALOG': SecurableType.CATALOG, 'SCHEMA': SecurableType.SCHEMA, 'TABLE': SecurableType.TABLE } securable_enum = securable_type_map.get(securable_type.upper()) grants = self.client.grants.get( securable_type=securable_enum, full_name=securable_name ) permissions = [] if grants.privilege_assignments: for assignment in grants.privilege_assignments: permissions.append({ 'principal': assignment.principal, 'privileges': [str(p) for p in (assignment.privileges or [])] }) return { 'success': True, 'message': f"Permissions for '{securable_name}'", 'permissions': permissions, 'sql': f"SHOW GRANTS ON {securable_name}" } except Exception as e: return { 'success': False, 'message': f"Failed to show grants: {str(e)}" } def set_owner(self, securable_type: str, securable_name: str, owner: str) -> Dict: """Set the owner of a securable object""" try: if securable_type.upper() == 'CATALOG': self.client.catalogs.update(securable_name, owner=owner) elif securable_type.upper() == 'SCHEMA': self.client.schemas.update(securable_name, owner=owner) elif securable_type.upper() == 'TABLE': self.client.tables.update(securable_name, owner=owner) else: return { 'success': False, 'message': f"Invalid securable type: {securable_type}" } return { 'success': True, 'message': f"Set owner of '{securable_name}' to '{owner}'", 'sql': f"ALTER {securable_type.upper()} {securable_name} OWNER TO `{owner}`" } except Exception as e: return { 'success': False, 'message': f"Failed to set owner: {str(e)}" } # ==================== HELPER METHODS ==================== def execute_sql(self, sql: str, warehouse_id: str = None) -> Dict: """Execute a SQL statement""" try: # This would require SQL execution API # For now, return the SQL that should be executed return { 'success': True, 'message': "SQL statement prepared", 'sql': sql, 'note': "Execute this SQL in a Databricks SQL Warehouse or notebook" } except Exception as e: return { 'success': False, 'message': f"Failed to execute SQL: {str(e)}" } def validate_name(self, name: str) -> bool: """Validate a catalog/schema/table name""" # Unity Catalog naming rules pattern = r'^[a-zA-Z0-9_]+$' return bool(re.match(pattern, name)) # ==================== USAGE EXAMPLES ==================== if __name__ == "__main__": # Initialize service service = UnityCatalogService() # Example operations print("=== Unity Catalog Chatbot Service ===\n") # Create catalog result = service.create_catalog( name="demo_catalog", comment="Demo catalog created by chatbot" ) print(f"Create Catalog: {result}\n") # Create schema result = service.create_schema( catalog="demo_catalog", schema="analytics", comment="Analytics schema" ) print(f"Create Schema: {result}\n") # Create table result = service.create_table( catalog="demo_catalog", schema="analytics", table="customers", columns=[ {"name": "customer_id", "type_name": "BIGINT"}, {"name": "name", "type_name": "STRING"}, {"name": "email", "type_name": "STRING"}, {"name": "created_at", "type_name": "TIMESTAMP"} ] ) print(f"Create Table: {result}\n") # Grant permission result = service.grant_permission( principal="data_analysts", privilege="SELECT", securable_type="TABLE", securable_name="demo_catalog.analytics.customers" ) print(f"Grant Permission: {result}\n") # Show grants result = service.show_grants( securable_type="TABLE", securable_name="demo_catalog.analytics.customers" ) print(f"Show Grants: {result}\n")