File size: 8,865 Bytes
9bcadf3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# Databricks notebook source
# MAGIC %md
# MAGIC # 🗄️ Data Engineering Knowledge Agent — Databricks Deployment
# MAGIC
# MAGIC This notebook deploys the DE Knowledge Assistant as a **Databricks Model Serving endpoint**.
# MAGIC
# MAGIC Architecture:
# MAGIC ```
# MAGIC [PDF Knowledge Base] → [ChromaDB Vectors] → [MLflow PyFunc Agent] → [Databricks Model Serving] → [FastAPI PWA]
# MAGIC ```
# MAGIC
# MAGIC Prerequisites (all free on Databricks Community Edition or trial):
# MAGIC - Databricks workspace (community.cloud.databricks.com)
# MAGIC - GROQ_API_KEY stored in Databricks Secrets
# MAGIC - Unity Catalog enabled (optional but recommended)

# COMMAND ----------
# MAGIC %pip install groq langchain langchain-community chromadb sentence-transformers pypdf mlflow fastapi uvicorn
# MAGIC dbutils.library.restartPython()

# COMMAND ----------

import os
import mlflow
import mlflow.pyfunc
from mlflow.models import infer_signature
import pandas as pd

# ── 1. Configuration ──────────────────────────────────────────────────────────
# MLflow experiment path handed to mlflow.set_experiment() further down;
# replace the placeholder e-mail with your own workspace user folder.
EXPERIMENT_NAME = "/Users/your-email@domain.com/de-knowledge-assistant"
# Registered-model name: used when logging the model and again when the
# version is promoted and served.
MODEL_NAME      = "de_knowledge_agent"
# Location of the knowledge-base PDF; logged with the model as the "pdf_path"
# artifact.
PDF_VOLUME_PATH = "/Volumes/main/default/knowledge/data_engineering_patterns.pdf"
# ^ Upload the PDF to a Unity Catalog Volume first:
# databricks fs cp data_engineering_patterns.pdf dbfs:/Volumes/main/default/knowledge/

# Retrieve API key from Databricks secrets (safe — never hardcode)
GROQ_API_KEY = dbutils.secrets.get(scope="de-assistant", key="groq-api-key")
# Create the secret scope first:
# databricks secrets create-scope --scope de-assistant
# databricks secrets put --scope de-assistant --key groq-api-key

# COMMAND ----------
# MAGIC %md ## 2. Define the MLflow PyFunc Model

# COMMAND ----------

import sys
# Prepend the repo checkout to the module search path so the project-local
# `rag` and `agent` modules imported below can be resolved from this notebook.
sys.path.insert(0, "/Workspace/Repos/your-repo/de-assistant")  # adjust to your repo path

from rag import DataEngineeringRAG
# NOTE(review): DEAgentPyFunc is imported but not referenced in the visible
# cells of this notebook — confirm whether it is still needed.
from agent import DataEngineeringAgent, DEAgentPyFunc


class DEKnowledgeAssistant(mlflow.pyfunc.PythonModel):
    """
    MLflow PyFunc wrapper around the Data Engineering RAG agent.

    1. ``load_context`` builds the retrieval pipeline from the PDF (via
       ``DataEngineeringRAG.initialize()``) and creates the agent.
    2. ``predict`` answers one question per input row, in the DataFrame-in /
       Series-out shape used by Databricks Model Serving.
    3. Chat history is passed per row as a JSON string, which allows
       multi-turn conversations.
    """

    def load_context(self, context: mlflow.pyfunc.PythonModelContext):
        """Build the RAG pipeline and the agent; called once on model load.

        Args:
            context: MLflow model context; its ``artifacts`` mapping may
                contain ``"pdf_path"`` (falls back to ``PDF_VOLUME_PATH``).

        Raises:
            RuntimeError: if no Groq API key is available.
        """
        import os

        pdf_path = context.artifacts.get("pdf_path", PDF_VOLUME_PATH)

        # Prefer the environment variable configured on the serving endpoint.
        # The notebook-level GROQ_API_KEY is looked up by *string* rather than
        # referenced by name: a direct name reference makes cloudpickle
        # serialise the secret's value into the logged model artifact.
        groq_key = os.environ.get("GROQ_API_KEY") or globals().get("GROQ_API_KEY")
        if not groq_key:
            raise RuntimeError(
                "GROQ_API_KEY is not set; configure it as an environment "
                "variable on the serving endpoint."
            )

        self.rag = DataEngineeringRAG(pdf_path=pdf_path, groq_api_key=groq_key)
        self.rag.initialize()
        self.agent = DataEngineeringAgent(rag=self.rag, groq_api_key=groq_key)

        print("✅ DE Knowledge Agent loaded and ready")

    @staticmethod
    def _parse_history(raw) -> list:
        """Return the conversation history as a list; ``[]`` if absent or invalid."""
        import json

        # Already-decoded history (e.g. from an in-process caller) is used as is.
        if isinstance(raw, list):
            return raw
        # Missing column, None, NaN and blank strings all mean "no history".
        if not isinstance(raw, (str, bytes, bytearray)) or not raw.strip():
            return []
        try:
            parsed = json.loads(raw)
        except ValueError:  # includes json.JSONDecodeError and bad byte encodings
            return []
        # Anything other than a JSON array is treated as "no history".
        return parsed if isinstance(parsed, list) else []

    def predict(
        self,
        context: mlflow.pyfunc.PythonModelContext,
        model_input: pd.DataFrame,
        params: dict = None,
    ) -> pd.Series:
        """
        Answer each row of ``model_input`` with the agent.

        Input DataFrame columns:
          - message (str): user question
          - history (str, JSON): previous conversation turns (optional)

        Returns: pd.Series of responses, aligned with the input index. An
        empty input yields an empty Series.

        Raises:
            KeyError: if a row has no ``message`` column.
        """
        answers = [
            self.agent.invoke(
                message=record["message"],
                history=self._parse_history(record.get("history")),
            )
            for record in model_input.to_dict(orient="records")
        ]
        # Build the Series explicitly: DataFrame.apply(axis=1) would return a
        # DataFrame instead of a Series for a zero-row input.
        return pd.Series(answers, index=model_input.index, dtype=object)


# COMMAND ----------
# MAGIC %md ## 3. Log the model to MLflow

# COMMAND ----------

mlflow.set_experiment(EXPERIMENT_NAME)

# Example input used only for signature inference; the columns mirror what
# DEKnowledgeAssistant.predict() reads ("message" plus a JSON "history" string).
sample_input = pd.DataFrame([{
    "message": "What is the Medallion architecture?",
    "history": "[]",
}])

with mlflow.start_run(run_name="de_knowledge_agent_v1") as run:

    # Log hyperparameters.
    # NOTE(review): these values are literals in this notebook — confirm they
    # match what the rag/agent modules actually use.
    mlflow.log_params({
        "llm_model": "llama-3.1-8b-instant",
        "embedding_model": "all-MiniLM-L6-v2",
        "chunk_size": 800,
        "chunk_overlap": 160,
        "retrieval_strategy": "mmr",
        "top_k": 5,
    })

    # Instance that gets serialised into the logged model; load_context() is
    # not invoked here, so no PDF is parsed at logging time.
    model = DEKnowledgeAssistant()

    # Infer the signature from the sample input and a placeholder output.
    signature = infer_signature(
        model_input=sample_input,
        model_output=pd.Series(["Sample response from DE agent"]),
    )

    # Log and register the model in one step.
    # NOTE(review): the class depends on the project-local `rag` and `agent`
    # modules; presumably they must be packaged with the model (e.g. through
    # log_model's code-path option) for the serving container to import
    # them — confirm before deploying.
    mlflow.pyfunc.log_model(
        artifact_path="de_agent",
        python_model=model,
        artifacts={"pdf_path": PDF_VOLUME_PATH},
        signature=signature,
        pip_requirements=[
            "groq>=0.9.0",
            "langchain>=0.2.0",
            "langchain-community>=0.2.0",
            "chromadb>=0.5.0",
            "sentence-transformers>=3.0.0",
            "pypdf>=4.0.0",
            "fastapi>=0.111.0",
            "uvicorn>=0.30.0",
        ],
        registered_model_name=MODEL_NAME,
    )

    print(f"✅ Model logged — Run ID: {run.info.run_id}")

# COMMAND ----------
# MAGIC %md ## 4. Register and deploy to Model Serving

# COMMAND ----------

from mlflow.tracking import MlflowClient

client = MlflowClient()

# Get the latest version that has not been assigned a stage yet.
# NOTE(review): stage-based registry calls are reported as deprecated in
# recent MLflow releases and may not apply to Unity Catalog models — confirm
# against the workspace's MLflow version.
latest = next(iter(client.get_latest_versions(MODEL_NAME, stages=["None"])), None)
if latest is None:
    # Fail with an actionable message instead of an IndexError on `[0]`.
    raise RuntimeError(
        f"No unstaged version of registered model '{MODEL_NAME}' found; "
        "run the model-logging cell first."
    )
version = latest.version
print(f"Latest model version: {version}")

# Transition to Production, archiving whatever was in Production before.
client.transition_model_version_stage(
    name=MODEL_NAME,
    version=version,
    stage="Production",
    archive_existing_versions=True,
)
print(f"✅ Model v{version} promoted to Production")

# COMMAND ----------
# MAGIC %md
# MAGIC ## 5. Create a Databricks Model Serving endpoint
# MAGIC
# MAGIC Run this via the Databricks SDK or UI:
# MAGIC
# MAGIC **UI path**: Machine Learning → Serving → Create Serving Endpoint
# MAGIC   - Name: `de-knowledge-assistant`
# MAGIC   - Model: `de_knowledge_agent` (Production)
# MAGIC   - Compute: Small (CPU) — sufficient for this workload
# MAGIC   - Environment variables: `GROQ_API_KEY` = your Groq key

# COMMAND ----------
# MAGIC # (Optional) SDK deployment

# Only the optional SDK import is guarded; errors raised while creating the
# endpoint (permissions, name already taken, ...) are left to propagate.
try:
    from databricks.sdk import WorkspaceClient
    from databricks.sdk.service.serving import (
        EndpointCoreConfigInput,
        ServedModelInput,
        ServedModelInputWorkloadSize,
    )
except ImportError:
    print("databricks-sdk not installed — create the endpoint via Databricks UI instead")
else:
    w = WorkspaceClient()

    endpoint_name = "de-knowledge-assistant"

    endpoint_config = EndpointCoreConfigInput(
        name=endpoint_name,
        served_models=[
            ServedModelInput(
                model_name=MODEL_NAME,
                model_version=str(version),
                workload_size=ServedModelInputWorkloadSize.SMALL,
                scale_to_zero_enabled=True,   # cost-saving: scale down when idle
                # Secret reference rather than the raw key, so the value never
                # appears in the endpoint configuration.
                environment_vars={"GROQ_API_KEY": "{{secrets/de-assistant/groq-api-key}}"},
            )
        ],
    )

    # create() takes the endpoint name as its own required argument; passing
    # only `config` fails with a missing-argument TypeError.
    w.serving_endpoints.create(name=endpoint_name, config=endpoint_config)
    print("✅ Serving endpoint created — check Databricks UI for status")

# COMMAND ----------
# MAGIC %md
# MAGIC ## 6. Test the endpoint

# COMMAND ----------

import requests
import json

# Invocation URL of the serving endpoint; replace <your-workspace> with your
# workspace host.
ENDPOINT_URL = "https://<your-workspace>.azuredatabricks.net/serving-endpoints/de-knowledge-assistant/invocations"
# API token of the current notebook session, taken from the notebook context.
TOKEN        = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()

# One record with the same columns the model's predict() reads.
test_payload = {
    "dataframe_records": [
        {
            "message": "Explain the Medallion architecture and give a PySpark example",
            "history": "[]",
        }
    ]
}

response = requests.post(
    ENDPOINT_URL,
    headers={"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"},
    data=json.dumps(test_payload),
    timeout=60,
)

print("Status:", response.status_code)
# Surface HTTP failures (bad token, endpoint not ready, ...) as an explicit
# HTTPError instead of an opaque KeyError / JSON decoding error below.
response.raise_for_status()
print("Response:", response.json()["predictions"][0][:500])

# COMMAND ----------
# MAGIC %md
# MAGIC ## 7. Connect the FastAPI PWA to your Databricks endpoint
# MAGIC
# MAGIC Update `app.py` → replace the Groq streaming call with the Databricks endpoint:
# MAGIC
# MAGIC ```python
# MAGIC # In agent.py, add this alternative invoke method:
# MAGIC def invoke_via_databricks(self, message: str, history: list) -> str:
# MAGIC     import requests, json
# MAGIC     payload = {"dataframe_records": [{"message": message, "history": json.dumps(history)}]}
# MAGIC     r = requests.post(
# MAGIC         os.environ["DATABRICKS_ENDPOINT_URL"],
# MAGIC         headers={"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"},
# MAGIC         json=payload, timeout=30,
# MAGIC     )
# MAGIC     return r.json()["predictions"][0]
# MAGIC ```
# MAGIC
# MAGIC Set `DATABRICKS_ENDPOINT_URL` and `DATABRICKS_TOKEN` in your Hugging Face Spaces secrets.