unityCatalog-ChatBot / unity_catalog_service.py
Jagjeet Singh Makhija
Add files via upload
0cc7766 unverified
"""
Unity Catalog Chatbot Backend Service
Handles authentication and execution of Unity Catalog operations
"""
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import *
from typing import Dict, List, Optional, Any
import os
import re
from datetime import datetime
import logging
# Configure the root logger at import time with an INFO threshold.
# NOTE(review): this is a module-level side effect; any application importing
# this module gets this configuration unless it configured logging earlier.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module, used by UnityCatalogService.
logger = logging.getLogger(__name__)
class UnityCatalogService:
    """Service for managing Unity Catalog operations through natural language.

    Methods that call the Databricks workspace never raise: they return a
    plain ``dict`` whose ``'success'`` key tells the caller whether the
    operation worked, with a human-readable ``'message'`` and, where it makes
    sense, the equivalent ``'sql'`` statement for display.
    ``parse_object_path`` (raises ``ValueError``) and ``validate_name``
    (returns ``bool``) are pure helpers and follow normal Python conventions.
    """

    # Privilege names accepted by grant_permission / revoke_permission. They
    # are kept as strings and resolved against the SDK ``Privilege`` enum at
    # call time, so one name missing from the installed SDK only affects
    # requests for that privilege.
    _SUPPORTED_PRIVILEGES = (
        'SELECT',
        'MODIFY',
        'CREATE',
        'USAGE',
        'READ_METADATA',
        'CREATE_TABLE',
        'CREATE_SCHEMA',
        'USE_CATALOG',
        'USE_SCHEMA',
        'ALL_PRIVILEGES',
    )

    # Securable kinds accepted by the grant / revoke / show-grants methods.
    _SUPPORTED_SECURABLE_TYPES = ('CATALOG', 'SCHEMA', 'TABLE', 'VOLUME', 'FUNCTION')

    # Characters allowed in a catalog / schema / table name by validate_name.
    _NAME_PATTERN = r'[a-zA-Z0-9_]+'

    def __init__(self, workspace_url: Optional[str] = None, token: Optional[str] = None):
        """Initialize the Databricks workspace client.

        Args:
            workspace_url: Databricks workspace URL. Falls back to the
                ``DATABRICKS_HOST`` environment variable when not given.
            token: Personal access token. Falls back to the
                ``DATABRICKS_TOKEN`` environment variable when not given.
        """
        self.workspace_url = workspace_url or os.getenv("DATABRICKS_HOST")
        self.token = token or os.getenv("DATABRICKS_TOKEN")
        self.client = WorkspaceClient(
            host=self.workspace_url,
            token=self.token
        )
        # Cache for frequently accessed data.
        # NOTE(review): neither cache is read or written by any method of
        # this class as shown here.
        self._catalog_cache = {}
        self._schema_cache = {}

    # ==================== INTERNAL HELPERS ====================

    @staticmethod
    def _failure(summary: str, error: Exception, target: Optional[str] = None) -> Dict:
        """Log a failed operation and build the standard failure result.

        Args:
            summary: Short description such as ``"Failed to create catalog"``.
            error: The exception that was caught.
            target: Optional name of the object involved; only used in the log.

        Returns:
            ``{'success': False, 'message': "<summary>: <error>"}``.
        """
        if target:
            logger.error("%s [%s]: %s", summary, target, error)
        else:
            logger.error("%s: %s", summary, error)
        return {
            'success': False,
            'message': f"{summary}: {error}"
        }

    def _normalize_privilege(self, privilege: Any) -> Optional[str]:
        """Return the canonical privilege name, or ``None`` if unsupported.

        Case and surrounding whitespace are ignored and inner spaces are
        treated as underscores, so ``"all privileges"`` is accepted.
        """
        name = str(privilege or '').strip().upper().replace(' ', '_')
        return name if name in self._SUPPORTED_PRIVILEGES else None

    def _normalize_securable_type(self, securable_type: Any) -> Optional[str]:
        """Return the canonical securable type name, or ``None`` if unsupported."""
        name = str(securable_type or '').strip().upper()
        return name if name in self._SUPPORTED_SECURABLE_TYPES else None

    def _check_permission_request(self, privilege: Any, securable_type: Any):
        """Validate the privilege and securable type of a grant/revoke call.

        Returns:
            A ``(privilege_name, type_name, error)`` tuple. ``error`` is a
            ready-to-return failure dict when either value is unsupported
            (the names are then ``None``), otherwise ``None``.
        """
        privilege_name = self._normalize_privilege(privilege)
        if privilege_name is None:
            return None, None, {
                'success': False,
                'message': f"Invalid privilege: {privilege}"
            }
        type_name = self._normalize_securable_type(securable_type)
        if type_name is None:
            return None, None, {
                'success': False,
                'message': f"Invalid securable type: {securable_type}"
            }
        return privilege_name, type_name, None

    def parse_object_path(self, path: str) -> Dict[str, str]:
        """Parse a Unity Catalog object path into components.

        Args:
            path: Dot-separated path with one to three components:
                ``catalog``, ``catalog.schema`` or ``catalog.schema.table``.
                Whitespace around each component is ignored.

        Returns:
            A dict with the keys ``'catalog'``, ``'schema'`` and ``'table'``
            for as many components as the path contains.

        Raises:
            ValueError: If ``path`` is not a string, has more than three
                components, or contains an empty component.
        """
        if not isinstance(path, str):
            raise ValueError(f"Invalid object path: {path}")
        parts = [part.strip() for part in path.split('.')]
        # An empty component ("", "a..b", "a.") can never name a real object.
        if not 1 <= len(parts) <= 3 or not all(parts):
            raise ValueError(f"Invalid object path: {path}")
        return dict(zip(('catalog', 'schema', 'table'), parts))

    # ==================== CATALOG OPERATIONS ====================

    def create_catalog(self, name: str, comment: str = None, properties: Dict = None) -> Dict:
        """Create a new catalog.

        Args:
            name: Name of the catalog to create.
            comment: Optional description; a timestamped default is used
                when omitted.
            properties: Optional key/value properties; defaults to ``{}``.

        Returns:
            Result dict with ``'catalog'`` (name, owner, created_at) and the
            equivalent ``'sql'`` on success, or a failure dict.
        """
        try:
            catalog = self.client.catalogs.create(
                name=name,
                comment=comment or f"Catalog created via chatbot on {datetime.now().isoformat()}",
                properties=properties or {}
            )
            logger.info("Created catalog: %s", name)
            return {
                'success': True,
                'message': f"Successfully created catalog '{name}'",
                'catalog': {
                    'name': catalog.name,
                    'owner': catalog.owner,
                    'created_at': catalog.created_at
                },
                'sql': f"CREATE CATALOG IF NOT EXISTS {name}"
            }
        except Exception as e:
            return self._failure("Failed to create catalog", e, target=name)

    def list_catalogs(self) -> Dict:
        """List all available catalogs.

        Returns:
            Result dict with a ``'catalogs'`` list (name, owner, comment per
            entry) on success, or a failure dict.
        """
        try:
            catalogs = list(self.client.catalogs.list())
            return {
                'success': True,
                'message': f"Found {len(catalogs)} catalog(s)",
                'catalogs': [
                    {
                        'name': cat.name,
                        'owner': cat.owner,
                        'comment': cat.comment
                    }
                    for cat in catalogs
                ],
                'sql': "SHOW CATALOGS"
            }
        except Exception as e:
            return self._failure("Failed to list catalogs", e)

    def get_catalog(self, name: str) -> Dict:
        """Get catalog details.

        Args:
            name: Name of the catalog to look up.

        Returns:
            Result dict with a ``'catalog'`` entry on success. Any error from
            the client is reported as ``"Catalog not found: ..."``.
        """
        try:
            catalog = self.client.catalogs.get(name)
            return {
                'success': True,
                'catalog': {
                    'name': catalog.name,
                    'owner': catalog.owner,
                    'comment': catalog.comment,
                    'created_at': catalog.created_at,
                    'updated_at': catalog.updated_at,
                    'properties': catalog.properties
                }
            }
        except Exception as e:
            return self._failure("Catalog not found", e, target=name)

    def delete_catalog(self, name: str, force: bool = False) -> Dict:
        """Delete a catalog.

        Args:
            name: Name of the catalog to delete.
            force: Passed through to the client's ``force`` argument; the
                reported SQL gets ``CASCADE`` when true.

        Returns:
            Result dict with the equivalent ``'sql'``, or a failure dict.
        """
        try:
            self.client.catalogs.delete(name, force=force)
            return {
                'success': True,
                'message': f"Successfully deleted catalog '{name}'",
                # Built by concatenation so there is no trailing space when
                # CASCADE is absent.
                'sql': f"DROP CATALOG {name}" + (" CASCADE" if force else "")
            }
        except Exception as e:
            return self._failure("Failed to delete catalog", e, target=name)

    # ==================== SCHEMA OPERATIONS ====================

    def create_schema(self, catalog: str, schema: str, comment: str = None) -> Dict:
        """Create a new schema.

        Args:
            catalog: Name of the parent catalog.
            schema: Name of the schema to create.
            comment: Optional description; a timestamped default is used
                when omitted.

        Returns:
            Result dict with ``'schema'`` (name, catalog, owner) and the
            equivalent ``'sql'`` on success, or a failure dict.
        """
        full_name = f"{catalog}.{schema}"
        try:
            schema_obj = self.client.schemas.create(
                name=schema,
                catalog_name=catalog,
                comment=comment or f"Schema created via chatbot on {datetime.now().isoformat()}"
            )
            logger.info("Created schema: %s", full_name)
            return {
                'success': True,
                'message': f"Successfully created schema '{full_name}'",
                'schema': {
                    'name': schema_obj.name,
                    'catalog': schema_obj.catalog_name,
                    'owner': schema_obj.owner
                },
                'sql': f"CREATE SCHEMA IF NOT EXISTS {full_name}"
            }
        except Exception as e:
            return self._failure("Failed to create schema", e, target=full_name)

    def list_schemas(self, catalog: str) -> Dict:
        """List all schemas in a catalog.

        Args:
            catalog: Name of the catalog to inspect.

        Returns:
            Result dict with a ``'schemas'`` list (name, full_name, owner,
            comment per entry) on success, or a failure dict.
        """
        try:
            schemas = list(self.client.schemas.list(catalog_name=catalog))
            return {
                'success': True,
                'message': f"Found {len(schemas)} schema(s) in catalog '{catalog}'",
                'schemas': [
                    {
                        'name': sch.name,
                        'full_name': sch.full_name,
                        'owner': sch.owner,
                        'comment': sch.comment
                    }
                    for sch in schemas
                ],
                'sql': f"SHOW SCHEMAS IN {catalog}"
            }
        except Exception as e:
            return self._failure("Failed to list schemas", e, target=catalog)

    def delete_schema(self, catalog: str, schema: str, force: bool = False) -> Dict:
        """Delete a schema.

        Args:
            catalog: Name of the parent catalog.
            schema: Name of the schema to delete.
            force: When true, the client is asked to force the deletion and
                the reported SQL gets ``CASCADE``.

        Returns:
            Result dict with the equivalent ``'sql'``, or a failure dict.
        """
        full_name = f"{catalog}.{schema}"
        try:
            if force:
                # Previously ``force`` was accepted but never forwarded, so a
                # "CASCADE" delete silently behaved like a plain one.
                # NOTE(review): assumes the installed SDK's schemas.delete
                # accepts a ``force`` keyword - confirm against its version.
                self.client.schemas.delete(full_name, force=True)
            else:
                self.client.schemas.delete(full_name)
            return {
                'success': True,
                'message': f"Successfully deleted schema '{full_name}'",
                'sql': f"DROP SCHEMA {full_name}" + (" CASCADE" if force else "")
            }
        except Exception as e:
            return self._failure("Failed to delete schema", e, target=full_name)

    # ==================== TABLE OPERATIONS ====================

    def create_table(
        self,
        catalog: str,
        schema: str,
        table: str,
        columns: List[Dict[str, str]] = None,
        comment: str = None,
        table_type: str = "MANAGED"
    ) -> Dict:
        """Create a new table.

        Args:
            catalog: Name of the parent catalog.
            schema: Name of the parent schema.
            table: Name of the table to create.
            columns: Column specs as dicts with ``'name'`` and optional
                ``'type_name'`` (default ``STRING``) and ``'comment'``. When
                empty or omitted, a default ``id`` / ``created_at`` / ``data``
                layout is used.
            comment: Optional description; a timestamped default is used
                when omitted.
            table_type: Member name of the SDK ``TableType`` enum
                (case-insensitive).

        Returns:
            Result dict with ``'table'`` details and the equivalent ``'sql'``
            on success, or a failure dict (including for unknown column or
            table types).
        """
        full_name = f"{catalog}.{schema}.{table}"
        try:
            # Default columns if none provided
            if not columns:
                columns = [
                    {"name": "id", "type_name": "BIGINT", "comment": "Primary key"},
                    {"name": "created_at", "type_name": "TIMESTAMP", "comment": "Creation timestamp"},
                    {"name": "data", "type_name": "STRING", "comment": "Data field"}
                ]
            # Enum members are looked up by upper-case name, so normalize once
            # and reuse the same spelling for the API call and the SQL text.
            type_names = [
                str(col.get('type_name') or 'STRING').strip().upper()
                for col in columns
            ]
            # Build column definitions
            column_defs = [
                ColumnInfo(
                    name=col['name'],
                    type_name=ColumnTypeName[type_name],
                    comment=col.get('comment')
                )
                for col, type_name in zip(columns, type_names)
            ]
            table_obj = self.client.tables.create(
                name=table,
                catalog_name=catalog,
                schema_name=schema,
                columns=column_defs,
                table_type=TableType[table_type.strip().upper()],
                data_source_format=DataSourceFormat.DELTA,
                comment=comment or f"Table created via chatbot on {datetime.now().isoformat()}"
            )
            logger.info("Created table: %s", full_name)
            # Generate SQL
            col_sql = ",\n ".join(
                f"{col['name']} {type_name}"
                for col, type_name in zip(columns, type_names)
            )
            return {
                'success': True,
                'message': f"Successfully created table '{full_name}'",
                'table': {
                    'name': table_obj.name,
                    'full_name': full_name,
                    'owner': table_obj.owner,
                    'table_type': str(table_obj.table_type)
                },
                'sql': f"CREATE TABLE IF NOT EXISTS {full_name} (\n {col_sql}\n) USING DELTA"
            }
        except Exception as e:
            return self._failure("Failed to create table", e, target=full_name)

    def list_tables(self, catalog: str, schema: str) -> Dict:
        """List all tables in a schema.

        Args:
            catalog: Name of the parent catalog.
            schema: Name of the schema to inspect.

        Returns:
            Result dict with a ``'tables'`` list (name, full_name, owner,
            table_type, data_source_format per entry), or a failure dict.
        """
        try:
            tables = list(self.client.tables.list(
                catalog_name=catalog,
                schema_name=schema
            ))
            return {
                'success': True,
                'message': f"Found {len(tables)} table(s) in {catalog}.{schema}",
                'tables': [
                    {
                        'name': tbl.name,
                        'full_name': tbl.full_name,
                        'owner': tbl.owner,
                        'table_type': str(tbl.table_type),
                        'data_source_format': str(tbl.data_source_format)
                    }
                    for tbl in tables
                ],
                'sql': f"SHOW TABLES IN {catalog}.{schema}"
            }
        except Exception as e:
            return self._failure("Failed to list tables", e, target=f"{catalog}.{schema}")

    def get_table(self, catalog: str, schema: str, table: str) -> Dict:
        """Get table details including columns.

        Args:
            catalog: Name of the parent catalog.
            schema: Name of the parent schema.
            table: Name of the table to look up.

        Returns:
            Result dict with a ``'table'`` entry (including a ``'columns'``
            list, empty when the client reports none), or a failure dict.
        """
        full_name = f"{catalog}.{schema}.{table}"
        try:
            table_obj = self.client.tables.get(full_name)
            return {
                'success': True,
                'table': {
                    'name': table_obj.name,
                    'full_name': full_name,
                    'owner': table_obj.owner,
                    'table_type': str(table_obj.table_type),
                    'data_source_format': str(table_obj.data_source_format),
                    'columns': [
                        {
                            'name': col.name,
                            'type': str(col.type_name),
                            'comment': col.comment
                        }
                        for col in (table_obj.columns or [])
                    ],
                    'comment': table_obj.comment
                }
            }
        except Exception as e:
            return self._failure("Failed to get table details", e, target=full_name)

    # ==================== PERMISSION OPERATIONS ====================

    def grant_permission(
        self,
        principal: str,
        privilege: str,
        securable_type: str,
        securable_name: str
    ) -> Dict:
        """Grant permission to a user or group.

        Args:
            principal: User or group receiving the privilege.
            privilege: One of the supported privilege names
                (case-insensitive, e.g. ``"select"``).
            securable_type: ``CATALOG``, ``SCHEMA``, ``TABLE``, ``VOLUME`` or
                ``FUNCTION`` (case-insensitive).
            securable_name: Full name of the object the grant applies to.

        Returns:
            Result dict with the equivalent ``'sql'`` on success. An
            unsupported privilege or securable type is rejected before any
            API call is made.
        """
        privilege_name, type_name, error = self._check_permission_request(
            privilege, securable_type
        )
        if error is not None:
            return error
        try:
            self.client.grants.update(
                securable_type=SecurableType[type_name],
                full_name=securable_name,
                changes=[
                    PermissionsChange(
                        add=[Privilege[privilege_name]],
                        principal=principal
                    )
                ]
            )
            logger.info("Granted %s on %s to %s", privilege, securable_name, principal)
            return {
                'success': True,
                'message': f"Granted {privilege} on '{securable_name}' to '{principal}'",
                # SQL spells multi-word privileges with spaces and names the
                # securable kind before the object.
                'sql': (
                    f"GRANT {privilege_name.replace('_', ' ')} "
                    f"ON {type_name} {securable_name} TO `{principal}`"
                )
            }
        except Exception as e:
            return self._failure("Failed to grant permission", e, target=securable_name)

    def revoke_permission(
        self,
        principal: str,
        privilege: str,
        securable_type: str,
        securable_name: str
    ) -> Dict:
        """Revoke permission from a user or group.

        Accepts the same privilege names and securable types as
        ``grant_permission``, so anything that can be granted can also be
        revoked.

        Args:
            principal: User or group losing the privilege.
            privilege: Supported privilege name (case-insensitive).
            securable_type: Supported securable type (case-insensitive).
            securable_name: Full name of the object the revoke applies to.

        Returns:
            Result dict with the equivalent ``'sql'`` on success. An
            unsupported privilege or securable type is rejected before any
            API call is made.
        """
        privilege_name, type_name, error = self._check_permission_request(
            privilege, securable_type
        )
        if error is not None:
            return error
        try:
            self.client.grants.update(
                securable_type=SecurableType[type_name],
                full_name=securable_name,
                changes=[
                    PermissionsChange(
                        remove=[Privilege[privilege_name]],
                        principal=principal
                    )
                ]
            )
            logger.info("Revoked %s on %s from %s", privilege, securable_name, principal)
            return {
                'success': True,
                'message': f"Revoked {privilege} on '{securable_name}' from '{principal}'",
                'sql': (
                    f"REVOKE {privilege_name.replace('_', ' ')} "
                    f"ON {type_name} {securable_name} FROM `{principal}`"
                )
            }
        except Exception as e:
            return self._failure("Failed to revoke permission", e, target=securable_name)

    def show_grants(self, securable_type: str, securable_name: str) -> Dict:
        """Show all grants on a securable object.

        Args:
            securable_type: Supported securable type (case-insensitive).
            securable_name: Full name of the object to inspect.

        Returns:
            Result dict with a ``'permissions'`` list of
            ``{'principal', 'privileges'}`` entries, or a failure dict. An
            unsupported securable type is rejected before any API call.
        """
        type_name = self._normalize_securable_type(securable_type)
        if type_name is None:
            return {
                'success': False,
                'message': f"Invalid securable type: {securable_type}"
            }
        try:
            grants = self.client.grants.get(
                securable_type=SecurableType[type_name],
                full_name=securable_name
            )
            permissions = [
                {
                    'principal': assignment.principal,
                    'privileges': [str(p) for p in (assignment.privileges or [])]
                }
                for assignment in (grants.privilege_assignments or [])
            ]
            return {
                'success': True,
                'message': f"Permissions for '{securable_name}'",
                'permissions': permissions,
                'sql': f"SHOW GRANTS ON {type_name} {securable_name}"
            }
        except Exception as e:
            return self._failure("Failed to show grants", e, target=securable_name)

    def set_owner(self, securable_type: str, securable_name: str, owner: str) -> Dict:
        """Set the owner of a securable object.

        Args:
            securable_type: ``CATALOG``, ``SCHEMA`` or ``TABLE``
                (case-insensitive).
            securable_name: Full name of the object to update.
            owner: Principal that becomes the new owner.

        Returns:
            Result dict with the equivalent ``'sql'``, or a failure dict
            (including for any other securable type).
        """
        type_name = str(securable_type or '').strip().upper()
        # Name of the client sub-API that owns each securable kind.
        api_name = {
            'CATALOG': 'catalogs',
            'SCHEMA': 'schemas',
            'TABLE': 'tables',
        }.get(type_name)
        if api_name is None:
            return {
                'success': False,
                'message': f"Invalid securable type: {securable_type}"
            }
        try:
            getattr(self.client, api_name).update(securable_name, owner=owner)
            return {
                'success': True,
                'message': f"Set owner of '{securable_name}' to '{owner}'",
                'sql': f"ALTER {type_name} {securable_name} OWNER TO `{owner}`"
            }
        except Exception as e:
            return self._failure("Failed to set owner", e, target=securable_name)

    # ==================== HELPER METHODS ====================

    def execute_sql(self, sql: str, warehouse_id: str = None) -> Dict:
        """Prepare a SQL statement for execution.

        Nothing is executed: the statement is echoed back together with a
        note telling the user where to run it.

        Args:
            sql: The SQL statement to hand back.
            warehouse_id: Currently unused; kept for a future SQL execution
                API integration.

        Returns:
            A success dict containing the unchanged ``'sql'`` and a ``'note'``.
        """
        # This would require SQL execution API
        # For now, return the SQL that should be executed
        return {
            'success': True,
            'message': "SQL statement prepared",
            'sql': sql,
            'note': "Execute this SQL in a Databricks SQL Warehouse or notebook"
        }

    def validate_name(self, name: str) -> bool:
        """Validate a catalog/schema/table name.

        Args:
            name: Candidate identifier.

        Returns:
            ``True`` only when ``name`` is a non-empty string made up
            entirely of ASCII letters, digits and underscores.
        """
        if not isinstance(name, str):
            return False
        # fullmatch rather than match + '$': '$' also matches just before a
        # trailing newline, which would let "name\n" through.
        return re.fullmatch(self._NAME_PATTERN, name) is not None
# ==================== USAGE EXAMPLES ====================
if __name__ == "__main__":
    # Demo run: build a small catalog -> schema -> table hierarchy, grant a
    # privilege on the table, then read the grants back. Each step prints the
    # result dict returned by the service.
    service = UnityCatalogService()
    print("=== Unity Catalog Chatbot Service ===\n")

    demo_table = "demo_catalog.analytics.customers"
    customer_columns = [
        {"name": "customer_id", "type_name": "BIGINT"},
        {"name": "name", "type_name": "STRING"},
        {"name": "email", "type_name": "STRING"},
        {"name": "created_at", "type_name": "TIMESTAMP"}
    ]

    # (label, action) pairs, executed strictly in this order because each
    # step relies on the object created by the previous one.
    demo_steps = [
        ("Create Catalog", lambda: service.create_catalog(
            name="demo_catalog",
            comment="Demo catalog created by chatbot"
        )),
        ("Create Schema", lambda: service.create_schema(
            catalog="demo_catalog",
            schema="analytics",
            comment="Analytics schema"
        )),
        ("Create Table", lambda: service.create_table(
            catalog="demo_catalog",
            schema="analytics",
            table="customers",
            columns=customer_columns
        )),
        ("Grant Permission", lambda: service.grant_permission(
            principal="data_analysts",
            privilege="SELECT",
            securable_type="TABLE",
            securable_name=demo_table
        )),
        ("Show Grants", lambda: service.show_grants(
            securable_type="TABLE",
            securable_name=demo_table
        )),
    ]
    for label, run_step in demo_steps:
        print(f"{label}: {run_step()}\n")