# Databricks notebook source
# MAGIC %md
# MAGIC # 🗄️ Data Engineering Knowledge Agent — Databricks Deployment
# MAGIC
# MAGIC This notebook deploys the DE Knowledge Assistant as a **Databricks Model Serving endpoint**.
# MAGIC
# MAGIC Architecture:
# MAGIC ```
# MAGIC [PDF Knowledge Base] → [ChromaDB Vectors] → [MLflow PyFunc Agent] → [Databricks Model Serving] → [FastAPI PWA]
# MAGIC ```
# MAGIC
# MAGIC Prerequisites (all free on Databricks Community Edition or trial):
# MAGIC - Databricks workspace (community.cloud.databricks.com)
# MAGIC - GROQ_API_KEY stored in Databricks Secrets
# MAGIC - Unity Catalog enabled (optional but recommended)

# COMMAND ----------

# MAGIC %pip install groq langchain langchain-community chromadb sentence-transformers pypdf mlflow fastapi uvicorn

# COMMAND ----------

# Restart the Python process so the packages installed above become importable.
# FIX: this must be executable code in its own cell — as a "# MAGIC" comment
# (the original form) it never ran, and the imports below would resolve against
# the pre-install environment.
dbutils.library.restartPython()

# COMMAND ----------

import os

import mlflow
import mlflow.pyfunc
import pandas as pd
from mlflow.models import infer_signature

# ── 1. Configuration ──────────────────────────────────────────────────────────
EXPERIMENT_NAME = "/Users/your-email@domain.com/de-knowledge-assistant"
MODEL_NAME = "de_knowledge_agent"
PDF_VOLUME_PATH = "/Volumes/main/default/knowledge/data_engineering_patterns.pdf"
# ^ Upload the PDF to a Unity Catalog Volume first:
#   databricks fs cp data_engineering_patterns.pdf dbfs:/Volumes/main/default/knowledge/

# Retrieve API key from Databricks secrets (safe — never hardcode).
# Create the secret scope first:
#   databricks secrets create-scope --scope de-assistant
#   databricks secrets put --scope de-assistant --key groq-api-key
GROQ_API_KEY = dbutils.secrets.get(scope="de-assistant", key="groq-api-key")

# COMMAND ----------

# MAGIC %md ## 2.
# MAGIC Define the MLflow PyFunc Model

# COMMAND ----------

import sys

# Make the project repo importable from the notebook — adjust to your repo path.
sys.path.insert(0, "/Workspace/Repos/your-repo/de-assistant")

from rag import DataEngineeringRAG
from agent import DataEngineeringAgent, DEAgentPyFunc


class DEKnowledgeAssistant(mlflow.pyfunc.PythonModel):
    """MLflow PyFunc wrapper around the DE knowledge agent.

    Responsibilities:
    1. On model load, read the PDF and build the ChromaDB vector store.
    2. Expose a predict() method compatible with Databricks Model Serving.
    3. Accept serialized chat history for multi-turn conversations.
    """

    def load_context(self, context: mlflow.pyfunc.PythonModelContext):
        """Called once when the model is loaded into serving.

        Resolution order for configuration: logged artifact / environment
        variable first, then the notebook-level globals as a fallback.
        """
        import os

        pdf_location = context.artifacts.get("pdf_path", PDF_VOLUME_PATH)
        api_key = os.environ.get("GROQ_API_KEY", GROQ_API_KEY)

        self.rag = DataEngineeringRAG(pdf_path=pdf_location, groq_api_key=api_key)
        self.rag.initialize()
        self.agent = DataEngineeringAgent(rag=self.rag, groq_api_key=api_key)
        print("✅ DE Knowledge Agent loaded and ready")

    def predict(
        self,
        context: mlflow.pyfunc.PythonModelContext,
        model_input: pd.DataFrame,
        params: dict = None,
    ) -> pd.Series:
        """Produce one agent response per input row.

        Expected DataFrame columns:
        - message (str): user question
        - history (str, JSON): previous conversation turns

        Returns:
            pd.Series of string responses, one per row.
        """
        import json

        def _respond(row):
            turns = []
            raw_history = row.get("history")
            if raw_history:
                try:
                    turns = json.loads(raw_history)
                except Exception:
                    # Best-effort: a malformed history payload is treated as
                    # an empty conversation rather than failing the request.
                    turns = []
            return self.agent.invoke(message=row["message"], history=turns)

        return model_input.apply(_respond, axis=1)

# COMMAND ----------

# MAGIC %md ## 3.
# MAGIC Log the model to MLflow

# COMMAND ----------

mlflow.set_experiment(EXPERIMENT_NAME)

# Example input/output used only to infer the model signature below.
sample_input = pd.DataFrame([{
    "message": "What is the Medallion architecture?",
    "history": "[]",
}])

with mlflow.start_run(run_name="de_knowledge_agent_v1") as run:
    # Log hyperparameters for experiment tracking / reproducibility.
    mlflow.log_params({
        "llm_model": "llama-3.1-8b-instant",
        "embedding_model": "all-MiniLM-L6-v2",
        "chunk_size": 800,
        "chunk_overlap": 160,
        "retrieval_strategy": "mmr",
        "top_k": 5,
    })

    # Infer signature from sample data; the output here is a placeholder
    # string — only its type (a Series of str) matters for the signature.
    model = DEKnowledgeAssistant()
    signature = infer_signature(
        model_input=sample_input,
        model_output=pd.Series(["Sample response from DE agent"]),
    )

    # Log the model. The PDF is attached as an artifact so serving can load
    # it via context.artifacts; pip_requirements pins the serving environment.
    mlflow.pyfunc.log_model(
        artifact_path="de_agent",
        python_model=model,
        artifacts={"pdf_path": PDF_VOLUME_PATH},
        signature=signature,
        pip_requirements=[
            "groq>=0.9.0",
            "langchain>=0.2.0",
            "langchain-community>=0.2.0",
            "chromadb>=0.5.0",
            "sentence-transformers>=3.0.0",
            "pypdf>=4.0.0",
            "fastapi>=0.111.0",
            "uvicorn>=0.30.0",
        ],
        registered_model_name=MODEL_NAME,
    )

print(f"✅ Model logged — Run ID: {run.info.run_id}")

# COMMAND ----------

# MAGIC %md ## 4. Register and deploy to Model Serving

# COMMAND ----------

from mlflow.tracking import MlflowClient

client = MlflowClient()

# Get the latest version: a freshly registered version starts in stage "None".
# NOTE(review): stage-based registry APIs (get_latest_versions /
# transition_model_version_stage) are deprecated in recent MLflow releases in
# favor of registered-model aliases — consider migrating; verify against the
# MLflow version pinned in the workspace.
latest = client.get_latest_versions(MODEL_NAME, stages=["None"])[0]
version = latest.version
print(f"Latest model version: {version}")

# Transition to Production, archiving any previously promoted versions.
client.transition_model_version_stage(
    name=MODEL_NAME,
    version=version,
    stage="Production",
    archive_existing_versions=True,
)
print(f"✅ Model v{version} promoted to Production")

# COMMAND ----------

# MAGIC %md
# MAGIC ## 5.
# MAGIC Create a Databricks Model Serving endpoint
# MAGIC
# MAGIC Run this via the Databricks SDK or UI:
# MAGIC
# MAGIC **UI path**: Machine Learning → Serving → Create Serving Endpoint
# MAGIC - Name: `de-knowledge-assistant`
# MAGIC - Model: `de_knowledge_agent` (Production)
# MAGIC - Compute: Small (CPU) — sufficient for this workload
# MAGIC - Environment variables: `GROQ_API_KEY` = your Groq key

# COMMAND ----------

# (Optional) SDK deployment
try:
    from databricks.sdk import WorkspaceClient
    from databricks.sdk.service.serving import (
        EndpointCoreConfigInput,
        ServedModelInput,
        ServedModelInputWorkloadSize,
    )

    w = WorkspaceClient()

    endpoint_config = EndpointCoreConfigInput(
        served_models=[
            ServedModelInput(
                model_name=MODEL_NAME,
                model_version=str(version),
                workload_size=ServedModelInputWorkloadSize.SMALL,
                scale_to_zero_enabled=True,  # cost-saving: scale down when idle
                # Secret reference is resolved by serving at runtime — the key
                # itself is never written into the endpoint config.
                environment_vars={"GROQ_API_KEY": "{{secrets/de-assistant/groq-api-key}}"},
            )
        ],
    )

    # FIX: serving_endpoints.create() takes the endpoint name as its own
    # argument; the original passed only config=..., which raises
    # TypeError: create() missing required argument 'name'.
    w.serving_endpoints.create(name="de-knowledge-assistant", config=endpoint_config)
    print("✅ Serving endpoint created — check Databricks UI for status")
except ImportError:
    print("databricks-sdk not installed — create the endpoint via Databricks UI instead")

# COMMAND ----------

# MAGIC %md
# MAGIC ## 6. Test the endpoint

# COMMAND ----------

import requests
import json

# FIX: the original ENDPOINT_URL had an empty hostname
# ("https://.azuredatabricks.net/...") and could never resolve. Derive the
# workspace URL from the notebook context — the same context already used
# below for the API token — so the cell works on any workspace.
_ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
ENDPOINT_URL = f"{_ctx.apiUrl().get()}/serving-endpoints/de-knowledge-assistant/invocations"
TOKEN = _ctx.apiToken().get()

test_payload = {
    "dataframe_records": [
        {
            "message": "Explain the Medallion architecture and give a PySpark example",
            "history": "[]",
        }
    ]
}

response = requests.post(
    ENDPOINT_URL,
    headers={"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"},
    data=json.dumps(test_payload),
    timeout=60,
)
print("Status:", response.status_code)
print("Response:", response.json()["predictions"][0][:500])

# COMMAND ----------

# MAGIC %md
# MAGIC ## 7.
# MAGIC Connect the FastAPI PWA to your Databricks endpoint
# MAGIC
# MAGIC Update `app.py` → replace the Groq streaming call with the Databricks endpoint:
# MAGIC
# MAGIC ```python
# MAGIC # In agent.py, add this alternative invoke method:
# MAGIC def invoke_via_databricks(self, message: str, history: list) -> str:
# MAGIC     import requests, json
# MAGIC     payload = {"dataframe_records": [{"message": message, "history": json.dumps(history)}]}
# MAGIC     r = requests.post(
# MAGIC         os.environ["DATABRICKS_ENDPOINT_URL"],
# MAGIC         headers={"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"},
# MAGIC         json=payload, timeout=30,
# MAGIC     )
# MAGIC     return r.json()["predictions"][0]
# MAGIC ```
# MAGIC
# MAGIC Set `DATABRICKS_ENDPOINT_URL` and `DATABRICKS_TOKEN` in your Hugging Face Spaces secrets.