# Databricks notebook source
# MAGIC %md
# MAGIC # 🏗️ Data Engineering Knowledge Agent — Databricks Deployment
# MAGIC
# MAGIC This notebook deploys the DE Knowledge Assistant as a **Databricks Model Serving endpoint**.
# MAGIC
# MAGIC Architecture:
# MAGIC ```
# MAGIC [PDF Knowledge Base] → [ChromaDB Vectors] → [MLflow PyFunc Agent] → [Databricks Model Serving] → [FastAPI PWA]
# MAGIC ```
# MAGIC
# MAGIC Prerequisites:
# MAGIC - A Databricks workspace with Model Serving available. A free trial workspace works; Community Edition (community.cloud.databricks.com) does not offer Model Serving.
# MAGIC - GROQ_API_KEY stored in Databricks Secrets
# MAGIC - Unity Catalog enabled. This is required as written, because the PDF is read from a Unity Catalog Volume (`/Volumes/...`) path.
# COMMAND ----------
# MAGIC %pip install groq langchain langchain-community chromadb sentence-transformers pypdf mlflow fastapi uvicorn
# COMMAND ----------
# Restart Python so the packages installed above become importable.
# This must be its own Python cell: with a `# MAGIC` prefix the call would be
# part of the %pip magic cell above instead of being executed as Python.
dbutils.library.restartPython()
# COMMAND ----------
import os
import mlflow
import mlflow.pyfunc
from mlflow.models import infer_signature
import pandas as pd
# ── 1. Configuration ──────────────────────────────────────────────────────────
EXPERIMENT_NAME = "/Users/your-email@domain.com/de-knowledge-assistant"
MODEL_NAME = "de_knowledge_agent"
PDF_VOLUME_PATH = "/Volumes/main/default/knowledge/data_engineering_patterns.pdf"
# ^ Upload the PDF to a Unity Catalog Volume first. The CLI commands in this cell
#   use the unified Databricks CLI (v0.205+) syntax; the legacy `databricks-cli`
#   package uses different syntax.
#   databricks fs cp data_engineering_patterns.pdf dbfs:/Volumes/main/default/knowledge/
# Create the secret scope and store the Groq key before running this cell:
#   databricks secrets create-scope de-assistant
#   databricks secrets put-secret de-assistant groq-api-key
# Retrieve the API key from Databricks secrets (safe — never hardcode).
GROQ_API_KEY = dbutils.secrets.get(scope="de-assistant", key="groq-api-key")
# Publish the key under the same environment variable the serving endpoint sets
# (see section 5). A model loaded inside this notebook then finds it at load time
# without the key having to be referenced by (and pickled with) the model class.
os.environ.setdefault("GROQ_API_KEY", GROQ_API_KEY)
# COMMAND ----------
# MAGIC %md ## 2. Define the MLflow PyFunc Model
# COMMAND ----------
import sys
# Repo directory containing rag.py and agent.py.
AGENT_CODE_DIR = "/Workspace/Repos/your-repo/de-assistant"  # adjust to your repo path
# Guard the insert so re-running this cell does not stack duplicate sys.path entries.
if AGENT_CODE_DIR not in sys.path:
    sys.path.insert(0, AGENT_CODE_DIR)
from rag import DataEngineeringRAG
# NOTE(review): DEAgentPyFunc is not referenced anywhere in this notebook.
from agent import DataEngineeringAgent, DEAgentPyFunc
class DEKnowledgeAssistant(mlflow.pyfunc.PythonModel):
    """
    MLflow PyFunc wrapper that:
    1. Loads the PDF → builds ChromaDB vectors on model load
    2. Exposes a predict() method compatible with Databricks Model Serving
    3. Supports chat history for multi-turn conversations

    The Groq API key is read only from the ``GROQ_API_KEY`` environment variable
    at load time. It is deliberately not taken from a notebook global: MLflow
    serializes ``python_model`` with cloudpickle. cloudpickle pickles classes
    defined in the notebook (``__main__``) by value, together with the globals
    their methods reference, so a referenced secret would be written into the
    model artifact.
    """

    def load_context(self, context: mlflow.pyfunc.PythonModelContext):
        """Build the RAG pipeline and the agent once, when the model is loaded.

        Args:
            context: MLflow context; ``context.artifacts["pdf_path"]`` is the
                local copy of the PDF logged as an artifact.

        Raises:
            RuntimeError: if ``GROQ_API_KEY`` is not set in the environment.
        """
        import os

        # Logged artifacts resolve to local copies; fall back to the Volume path.
        pdf_path = context.artifacts.get("pdf_path", PDF_VOLUME_PATH)
        groq_key = os.environ.get("GROQ_API_KEY")
        if not groq_key:
            raise RuntimeError(
                "GROQ_API_KEY is not set. On Model Serving, add it as an endpoint "
                "environment variable ({{secrets/de-assistant/groq-api-key}}); "
                "in a notebook, set os.environ['GROQ_API_KEY'] before loading the model."
            )
        self.rag = DataEngineeringRAG(pdf_path=pdf_path, groq_api_key=groq_key)
        self.rag.initialize()
        self.agent = DataEngineeringAgent(rag=self.rag, groq_api_key=groq_key)
        print("✅ DE Knowledge Agent loaded and ready")

    def predict(
        self,
        context: mlflow.pyfunc.PythonModelContext,
        model_input: pd.DataFrame,
        params: dict = None,
    ) -> pd.Series:
        """Answer every row of ``model_input`` with the agent.

        Input DataFrame columns:
            - message (str): user question
            - history (str, JSON): previous conversation turns. Optional;
              missing, empty, malformed, or non-list values mean "no history".

        Returns:
            pd.Series of responses, one per input row, aligned with
            ``model_input.index`` (an empty Series for an empty frame).
        """
        import json

        def parse_history(raw) -> list:
            # Only non-empty JSON text can carry history; None/NaN/pd.NA/"" cannot.
            # The isinstance check runs first because `not pd.NA` raises TypeError.
            if not isinstance(raw, (str, bytes, bytearray)) or not raw:
                return []
            try:
                parsed = json.loads(raw)
            except ValueError:  # json.JSONDecodeError and UnicodeDecodeError subclass it
                return []
            return parsed if isinstance(parsed, list) else []

        records = model_input.to_dict(orient="records")
        answers = [
            self.agent.invoke(
                message=record["message"],
                history=parse_history(record.get("history")),
            )
            for record in records
        ]
        # Build the Series explicitly: DataFrame.apply(axis=1) on an empty frame
        # returns a DataFrame, which would break the declared Series output.
        return pd.Series(answers, index=model_input.index, dtype=object)
# COMMAND ----------
# MAGIC %md ## 3. Log the model to MLflow
# COMMAND ----------
mlflow.set_experiment(EXPERIMENT_NAME)
# Local modules the model class needs at load time. They live in the repo added
# to sys.path in section 2, which does not exist inside Model Serving. They are
# pickled by reference, so they must ship with the model via `code_paths`, or
# loading fails with ModuleNotFoundError.
# NOTE(review): add any further local modules that rag.py / agent.py import.
AGENT_CODE_PATHS = [
    "/Workspace/Repos/your-repo/de-assistant/rag.py",  # adjust to your repo path
    "/Workspace/Repos/your-repo/de-assistant/agent.py",
]
# Example input/output for signature inference
sample_input = pd.DataFrame([{
    "message": "What is the Medallion architecture?",
    "history": "[]",
}])
with mlflow.start_run(run_name="de_knowledge_agent_v1") as run:
    # Log hyperparameters
    mlflow.log_params({
        "llm_model": "llama-3.1-8b-instant",
        "embedding_model": "all-MiniLM-L6-v2",
        "chunk_size": 800,
        "chunk_overlap": 160,
        "retrieval_strategy": "mmr",
        "top_k": 5,
    })
    # Infer signature from sample data
    model = DEKnowledgeAssistant()
    signature = infer_signature(
        model_input=sample_input,
        model_output=pd.Series(["Sample response from DE agent"]),
    )
    # Log the model; the PDF is copied into the model as the "pdf_path" artifact.
    mlflow.pyfunc.log_model(
        artifact_path="de_agent",
        python_model=model,
        artifacts={"pdf_path": PDF_VOLUME_PATH},
        code_paths=AGENT_CODE_PATHS,
        signature=signature,
        pip_requirements=[
            "groq>=0.9.0",
            "langchain>=0.2.0",
            "langchain-community>=0.2.0",
            "chromadb>=0.5.0",
            "sentence-transformers>=3.0.0",
            "pypdf>=4.0.0",
            "fastapi>=0.111.0",
            "uvicorn>=0.30.0",
        ],
        registered_model_name=MODEL_NAME,
    )
print(f"✅ Model logged — Run ID: {run.info.run_id}")
# COMMAND ----------
# MAGIC %md ## 4. Register and deploy to Model Serving
# COMMAND ----------
from mlflow.tracking import MlflowClient

client = MlflowClient()
# Select the version registered by the run from section 3 (when that cell ran in
# this session), otherwise the highest registered version. Selecting by run id
# pins exactly the model just logged. Selecting "latest in stage None" relies on
# the deprecated get_latest_versions API, raises IndexError when no version is in
# that stage, and can pick a different version.
_logged_run = globals().get("run")
_logged_run_id = _logged_run.info.run_id if _logged_run is not None else None
all_versions = client.search_model_versions(f"name='{MODEL_NAME}'")
if not all_versions:
    raise RuntimeError(f"No versions of '{MODEL_NAME}' are registered — run section 3 first.")
from_this_run = [mv for mv in all_versions if mv.run_id == _logged_run_id]
latest = max(from_this_run or all_versions, key=lambda mv: int(mv.version))
version = latest.version
print(f"Latest model version: {version}")
# Transition to Production.
# NOTE: model stages are deprecated in recent MLflow releases in favour of
# aliases, and are not supported for Unity Catalog-registered models.
client.transition_model_version_stage(
    name=MODEL_NAME,
    version=version,
    stage="Production",
    archive_existing_versions=True,
)
print(f"✅ Model v{version} promoted to Production")
# COMMAND ----------
# MAGIC %md
# MAGIC ## 5. Create a Databricks Model Serving endpoint
# MAGIC
# MAGIC Create it either with the Databricks SDK (next cell) or in the UI:
# MAGIC
# MAGIC **UI path**: Machine Learning → Serving → Create Serving Endpoint
# MAGIC - Name: `de-knowledge-assistant`
# MAGIC - Model: `de_knowledge_agent` (Production)
# MAGIC - Compute: Small (CPU) — sufficient for this workload
# MAGIC - Environment variables: `GROQ_API_KEY` = `{{secrets/de-assistant/groq-api-key}}`. Use this secret reference rather than pasting the raw key, so the key never appears in the endpoint configuration.
# COMMAND ----------
# (Optional) SDK deployment. This is a plain comment: a `# MAGIC` line does not
# belong in a Python cell.
# The try covers only the imports, so unrelated errors are not misreported as a
# missing SDK.
try:
    from databricks.sdk import WorkspaceClient
    from databricks.sdk.service.serving import (
        EndpointCoreConfigInput,
        ServedModelInput,
        ServedModelInputWorkloadSize,
    )
except ImportError:
    WorkspaceClient = None
    print("databricks-sdk not installed — create the endpoint via Databricks UI instead")

if WorkspaceClient is not None:
    ENDPOINT_NAME = "de-knowledge-assistant"
    w = WorkspaceClient()
    endpoint_config = EndpointCoreConfigInput(
        name=ENDPOINT_NAME,
        served_models=[
            ServedModelInput(
                model_name=MODEL_NAME,
                model_version=str(version),  # `version` is set in section 4
                workload_size=ServedModelInputWorkloadSize.SMALL,
                scale_to_zero_enabled=True,  # cost-saving: scale down when idle
                # Secret reference resolved by Model Serving; the raw key never
                # appears in the endpoint configuration.
                environment_vars={"GROQ_API_KEY": "{{secrets/de-assistant/groq-api-key}}"},
            )
        ],
    )
    # create() takes the endpoint name as its own required argument.
    # NOTE: create() fails if the endpoint already exists. To roll out a new model
    # version, use w.serving_endpoints.update_config(name=..., served_models=[...]).
    w.serving_endpoints.create(name=ENDPOINT_NAME, config=endpoint_config)
    print("✅ Serving endpoint created — check Databricks UI for status")
# COMMAND ----------
# MAGIC %md
# MAGIC ## 6. Test the endpoint
# COMMAND ----------
import requests
import json
# Replace the host with your workspace URL (on AWS/GCP the domain is not azuredatabricks.net).
ENDPOINT_URL = "https://<your-workspace>.azuredatabricks.net/serving-endpoints/de-knowledge-assistant/invocations"
TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
test_payload = {
    "dataframe_records": [
        {
            "message": "Explain the Medallion architecture and give a PySpark example",
            "history": "[]",
        }
    ]
}
response = requests.post(
    ENDPOINT_URL,
    headers={"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"},
    data=json.dumps(test_payload),
    timeout=60,
)
print("Status:", response.status_code)
if not response.ok:
    # Show the serving error body (e.g. a model load failure) instead of failing
    # later with a KeyError on "predictions".
    print("Error body:", response.text[:2000])
    response.raise_for_status()
print("Response:", response.json()["predictions"][0][:500])
# COMMAND ----------
# MAGIC %md
# MAGIC ## 7. Connect the FastAPI PWA to your Databricks endpoint
# MAGIC
# MAGIC Route the PWA through the Databricks endpoint instead of calling Groq directly. Add the method below to `agent.py`, then have `app.py` call it in place of the Groq streaming call:
# MAGIC
# MAGIC ```python
# MAGIC # In agent.py, add this alternative invoke method:
# MAGIC def invoke_via_databricks(self, message: str, history: list) -> str:
# MAGIC     import os, requests, json
# MAGIC     payload = {"dataframe_records": [{"message": message, "history": json.dumps(history)}]}
# MAGIC     r = requests.post(
# MAGIC         os.environ["DATABRICKS_ENDPOINT_URL"],
# MAGIC         headers={"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"},
# MAGIC         json=payload, timeout=60,
# MAGIC     )
# MAGIC     r.raise_for_status()
# MAGIC     return r.json()["predictions"][0]
# MAGIC ```
# MAGIC
# MAGIC Set `DATABRICKS_ENDPOINT_URL` and `DATABRICKS_TOKEN` in your Hugging Face Spaces secrets. With scale-to-zero enabled, the first request after an idle period can be slow, so keep the timeout generous.