# Databricks notebook source
# MAGIC %md
# MAGIC # 🗄️ Data Engineering Knowledge Agent – Databricks Deployment
# MAGIC
# MAGIC This notebook deploys the DE Knowledge Assistant as a **Databricks Model Serving endpoint**.
# MAGIC
# MAGIC Architecture:
# MAGIC ```
# MAGIC [PDF Knowledge Base] → [ChromaDB Vectors] → [MLflow PyFunc Agent] → [Databricks Model Serving] → [FastAPI PWA]
# MAGIC ```
# MAGIC
# MAGIC Prerequisites (all free on Databricks Community Edition or trial):
# MAGIC - Databricks workspace (community.cloud.databricks.com)
# MAGIC - GROQ_API_KEY stored in Databricks Secrets
# MAGIC - Unity Catalog enabled (optional but recommended)
# COMMAND ----------
# MAGIC %pip install groq langchain langchain-community chromadb sentence-transformers pypdf mlflow fastapi uvicorn

# COMMAND ----------

# Restart Python so the newly installed libraries are picked up
dbutils.library.restartPython()
# COMMAND ----------
import os
import mlflow
import mlflow.pyfunc
from mlflow.models import infer_signature
import pandas as pd
# ── 1. Configuration ──────────────────────────────────────────────────────────
EXPERIMENT_NAME = "/Users/your-email@domain.com/de-knowledge-assistant"
MODEL_NAME = "de_knowledge_agent"
PDF_VOLUME_PATH = "/Volumes/main/default/knowledge/data_engineering_patterns.pdf"
# ^ Upload the PDF to a Unity Catalog Volume first:
# databricks fs cp data_engineering_patterns.pdf dbfs:/Volumes/main/default/knowledge/
# Retrieve the API key from Databricks secrets (never hardcode keys in notebooks)
GROQ_API_KEY = dbutils.secrets.get(scope="de-assistant", key="groq-api-key")
# Create the secret scope first:
# databricks secrets create-scope --scope de-assistant
# databricks secrets put --scope de-assistant --key groq-api-key
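# COMMAND ----------

# Optional sanity check (a minimal sketch using standard dbutils APIs; assumes the
# secret scope and Volume path configured above have already been created).
assert any(s.key == "groq-api-key" for s in dbutils.secrets.list("de-assistant")), \
    "Secret 'groq-api-key' not found in scope 'de-assistant'"
dbutils.fs.ls(PDF_VOLUME_PATH)  # raises if the PDF has not been uploaded yet
print("✅ Secret scope and knowledge-base PDF are in place")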
# COMMAND ----------
# MAGIC %md ## 2. Define the MLflow PyFunc Model
# COMMAND ----------
import sys
sys.path.insert(0, "/Workspace/Repos/your-repo/de-assistant") # adjust to your repo path
from rag import DataEngineeringRAG
from agent import DataEngineeringAgent, DEAgentPyFunc
class DEKnowledgeAssistant(mlflow.pyfunc.PythonModel):
    """
    MLflow PyFunc wrapper that:
    1. Loads the PDF and builds ChromaDB vectors on model load
    2. Exposes a predict() method compatible with Databricks Model Serving
    3. Supports chat history for multi-turn conversations
    """

    def load_context(self, context: mlflow.pyfunc.PythonModelContext):
        """Called once when the model is loaded into serving."""
        import os

        pdf_path = context.artifacts.get("pdf_path", PDF_VOLUME_PATH)
        groq_key = os.environ.get("GROQ_API_KEY", GROQ_API_KEY)
        self.rag = DataEngineeringRAG(pdf_path=pdf_path, groq_api_key=groq_key)
        self.rag.initialize()
        self.agent = DataEngineeringAgent(rag=self.rag, groq_api_key=groq_key)
        print("✅ DE Knowledge Agent loaded and ready")

    def predict(
        self,
        context: mlflow.pyfunc.PythonModelContext,
        model_input: pd.DataFrame,
        params: dict = None,
    ) -> pd.Series:
        """
        Input DataFrame columns:
        - message (str): user question
        - history (str, JSON): previous conversation turns
        Returns: pd.Series of string responses
        """
        import json

        def process_row(row):
            history = []
            if row.get("history"):
                try:
                    history = json.loads(row["history"])
                except Exception:
                    history = []
            return self.agent.invoke(message=row["message"], history=history)

        return model_input.apply(process_row, axis=1)
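# COMMAND ----------

# Optional smoke test before logging (a minimal sketch; assumes DataEngineeringRAG
# and DataEngineeringAgent expose the same initialize()/invoke() interface the
# wrapper above relies on). Catches PDF-path or API-key problems early, outside serving.
_rag = DataEngineeringRAG(pdf_path=PDF_VOLUME_PATH, groq_api_key=GROQ_API_KEY)
_rag.initialize()
_agent = DataEngineeringAgent(rag=_rag, groq_api_key=GROQ_API_KEY)
print(_agent.invoke(message="What is the Medallion architecture?", history=[])[:300])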
# COMMAND ----------
# MAGIC %md ## 3. Log the model to MLflow
# COMMAND ----------
mlflow.set_experiment(EXPERIMENT_NAME)
# Example input/output for signature inference
sample_input = pd.DataFrame([{
    "message": "What is the Medallion architecture?",
    "history": "[]",
}])
with mlflow.start_run(run_name="de_knowledge_agent_v1") as run:
    # Log hyperparameters
    mlflow.log_params({
        "llm_model": "llama-3.1-8b-instant",
        "embedding_model": "all-MiniLM-L6-v2",
        "chunk_size": 800,
        "chunk_overlap": 160,
        "retrieval_strategy": "mmr",
        "top_k": 5,
    })

    # Infer signature from sample data
    model = DEKnowledgeAssistant()
    signature = infer_signature(
        model_input=sample_input,
        model_output=pd.Series(["Sample response from DE agent"]),
    )

    # Log the model. code_paths bundles rag.py and agent.py with the model so the
    # serving container can import them (adjust to your repo path; on older MLflow
    # versions the parameter is named code_path).
    mlflow.pyfunc.log_model(
        artifact_path="de_agent",
        python_model=model,
        artifacts={"pdf_path": PDF_VOLUME_PATH},
        signature=signature,
        code_paths=[
            "/Workspace/Repos/your-repo/de-assistant/rag.py",
            "/Workspace/Repos/your-repo/de-assistant/agent.py",
        ],
        pip_requirements=[
            "groq>=0.9.0",
            "langchain>=0.2.0",
            "langchain-community>=0.2.0",
            "chromadb>=0.5.0",
            "sentence-transformers>=3.0.0",
            "pypdf>=4.0.0",
            "fastapi>=0.111.0",
            "uvicorn>=0.30.0",
        ],
        registered_model_name=MODEL_NAME,
    )

print(f"✅ Model logged; Run ID: {run.info.run_id}")
# COMMAND ----------
# MAGIC %md ## 4. Register and deploy to Model Serving
# COMMAND ----------
from mlflow.tracking import MlflowClient
client = MlflowClient()
# Get the latest version
latest = client.get_latest_versions(MODEL_NAME, stages=["None"])[0]
version = latest.version
print(f"Latest model version: {version}")
# Transition to Production
client.transition_model_version_stage(
    name=MODEL_NAME,
    version=version,
    stage="Production",
    archive_existing_versions=True,
)
print(f"✅ Model v{version} promoted to Production")
# COMMAND ----------
# MAGIC %md
# MAGIC ## 5. Create a Databricks Model Serving endpoint
# MAGIC
# MAGIC Run this via the Databricks SDK or UI:
# MAGIC
# MAGIC **UI path**: Machine Learning → Serving → Create Serving Endpoint
# MAGIC - Name: `de-knowledge-assistant`
# MAGIC - Model: `de_knowledge_agent` (Production)
# MAGIC - Compute: Small (CPU), sufficient for this workload
# MAGIC - Environment variables: `GROQ_API_KEY` = your Groq key
# COMMAND ----------
# (Optional) SDK deployment
try:
    from databricks.sdk import WorkspaceClient
    from databricks.sdk.service.serving import (
        EndpointCoreConfigInput,
        ServedModelInput,
        ServedModelInputWorkloadSize,
    )

    w = WorkspaceClient()
    endpoint_config = EndpointCoreConfigInput(
        served_models=[
            ServedModelInput(
                model_name=MODEL_NAME,
                model_version=str(version),
                workload_size=ServedModelInputWorkloadSize.SMALL,
                scale_to_zero_enabled=True,  # cost-saving: scale down when idle
                environment_vars={"GROQ_API_KEY": "{{secrets/de-assistant/groq-api-key}}"},
            )
        ],
    )
    # The endpoint name goes on create(); the config object only describes the served models.
    w.serving_endpoints.create(name="de-knowledge-assistant", config=endpoint_config)
    print("✅ Serving endpoint created; check the Databricks UI for status")
except ImportError:
    print("databricks-sdk not installed; create the endpoint via the Databricks UI instead")
# COMMAND ----------
# MAGIC %md
# MAGIC ## 6. Test the endpoint
# COMMAND ----------
import requests
import json
ENDPOINT_URL = "https://<your-workspace-url>/serving-endpoints/de-knowledge-assistant/invocations"
TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
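# Optionally derive the workspace URL from the notebook context instead of editing
# the placeholder above (same entry-point API already used for TOKEN; this pattern is
# widely used but not formally documented, so treat it as a convenience).
WORKSPACE_URL = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get()
ENDPOINT_URL = f"{WORKSPACE_URL}/serving-endpoints/de-knowledge-assistant/invocations"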
test_payload = {
    "dataframe_records": [
        {
            "message": "Explain the Medallion architecture and give a PySpark example",
            "history": "[]",
        }
    ]
}
response = requests.post(
    ENDPOINT_URL,
    headers={"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"},
    data=json.dumps(test_payload),
    timeout=60,
)
print("Status:", response.status_code)
print("Response:", response.json()["predictions"][0][:500])
# COMMAND ----------
# MAGIC %md
# MAGIC ## 7. Connect the FastAPI PWA to your Databricks endpoint
# MAGIC
# MAGIC Update `app.py`: replace the Groq streaming call with the Databricks endpoint:
# MAGIC
# MAGIC ```python
# MAGIC # In agent.py, add this alternative invoke method:
# MAGIC def invoke_via_databricks(self, message: str, history: list) -> str:
# MAGIC     import os, requests, json
# MAGIC     payload = {"dataframe_records": [{"message": message, "history": json.dumps(history)}]}
# MAGIC     r = requests.post(
# MAGIC         os.environ["DATABRICKS_ENDPOINT_URL"],
# MAGIC         headers={"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"},
# MAGIC         json=payload, timeout=30,
# MAGIC     )
# MAGIC     r.raise_for_status()
# MAGIC     return r.json()["predictions"][0]
# MAGIC ```
# MAGIC
# MAGIC Set `DATABRICKS_ENDPOINT_URL` and `DATABRICKS_TOKEN` in your Hugging Face Spaces secrets.
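# MAGIC
# MAGIC A minimal sketch of a PWA-side route that forwards chat requests to the Databricks
# MAGIC endpoint (the `/chat` path, `ChatRequest` model, and response shape are illustrative
# MAGIC assumptions, not the existing `app.py` API):
# MAGIC
# MAGIC ```python
# MAGIC import json, os
# MAGIC import requests
# MAGIC from fastapi import FastAPI
# MAGIC from pydantic import BaseModel
# MAGIC
# MAGIC app = FastAPI()
# MAGIC
# MAGIC class ChatRequest(BaseModel):
# MAGIC     message: str
# MAGIC     history: list = []
# MAGIC
# MAGIC @app.post("/chat")
# MAGIC def chat(req: ChatRequest):
# MAGIC     payload = {"dataframe_records": [{"message": req.message, "history": json.dumps(req.history)}]}
# MAGIC     r = requests.post(
# MAGIC         os.environ["DATABRICKS_ENDPOINT_URL"],
# MAGIC         headers={"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"},
# MAGIC         json=payload, timeout=30,
# MAGIC     )
# MAGIC     r.raise_for_status()
# MAGIC     return {"reply": r.json()["predictions"][0]}
# MAGIC ```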