Databricks Agent Bricks Implementation
This directory contains the Databricks Agent Bricks (Mosaic AI Agent Framework) implementation for CommunityOne - a generic civic engagement and community data platform.
Schema Files
- communityone_schema.sql: Current comprehensive schema for all community data (jurisdictions, nonprofits, grants, meetings, observations)
- oral_health_schema.sql: DEPRECATED. Legacy oral health-specific schema (use communityone_schema.sql instead)
Architecture
┌─────────────────────────────────────────────────────────────┐
│                    Databricks Workspace                     │
│                                                             │
│  ┌────────────────┐      ┌──────────────────┐               │
│  │ Unity Catalog  │◄─────┤ MLflow Tracking  │               │
│  │ - Models       │      │ - Experiments    │               │
│  │ - Governance   │      │ - Runs           │               │
│  │ - Lineage      │      │ - Artifacts      │               │
│  └────────┬───────┘      └──────────────────┘               │
│           │                                                 │
│           ▼                                                 │
│  ┌─────────────────────────────────────────┐                │
│  │         Model Serving Endpoints         │                │
│  │  ┌──────────────┐  ┌─────────────────┐  │                │
│  │  │  Classifier  │  │    Sentiment    │  │                │
│  │  │    Agent     │  │    Analyzer     │  │                │
│  │  └──────────────┘  └─────────────────┘  │                │
│  │   Auto-scaling • Observability • A/B    │                │
│  └────────────┬────────────────────────────┘                │
│               │                                             │
└───────────────┼─────────────────────────────────────────────┘
                │
                ▼
        ┌────────────────┐
        │    REST API    │
        │    Clients     │
        └────────────────┘
Database Schema
CommunityOne Schema (communityone_schema.sql)
Comprehensive data warehouse schema supporting:
Dimension Tables:
- dim_jurisdiction: Cities, counties, states, school districts
- dim_organization: Nonprofits, foundations, churches (IRS EO-BMF)
- dim_geography: Geographic hierarchies and FIPS codes
- dim_date: Time dimension for temporal analysis
- dim_measure: Community outcome indicators (health, education, economic, social)
Fact Tables:
- fact_communityone_observation: Community outcome measurements (replaces fact_oral_health_observation)
- fact_grant: NEW. Grant transactions (990 Schedule I, 990-PF, USASpending.gov)
- fact_nonprofit_finance: NEW. Annual 990 filings with revenue breakdowns
- fact_jurisdiction_budget: NEW. Government budgets and spending
- fact_meeting: NEW. Government meetings and public hearings
Bridge Tables:
- bridge_grant_program_area: NEW. Multi-purpose grant program areas
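To make the dimension/fact layout above concrete, here is a minimal sketch that uses Python's built-in sqlite3 as a stand-in for the warehouse. The table names come from the lists above; every column name (org_key, jurisdiction_key, amount_usd, and so on) and every row is a hypothetical placeholder, not the actual definition in communityone_schema.sql.

```python
import sqlite3

# In-memory SQLite stands in for the warehouse; all columns are hypothetical.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dim_organization (org_key INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE dim_jurisdiction (jurisdiction_key INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE fact_grant (
    grant_key INTEGER PRIMARY KEY,
    grantor_org_key INTEGER REFERENCES dim_organization(org_key),
    jurisdiction_key INTEGER REFERENCES dim_jurisdiction(jurisdiction_key),
    amount_usd REAL
);
INSERT INTO dim_organization VALUES (1, 'Example Foundation');
INSERT INTO dim_jurisdiction VALUES (10, 'Example City');
INSERT INTO fact_grant VALUES (100, 1, 10, 25000.0), (101, 1, 10, 5000.0);
""")

# Fact rows join to their dimensions by key, then aggregate per pair.
grant_totals = conn.execute("""
    SELECT o.name, j.name, SUM(g.amount_usd)
    FROM fact_grant g
    JOIN dim_organization o ON o.org_key = g.grantor_org_key
    JOIN dim_jurisdiction j ON j.jurisdiction_key = g.jurisdiction_key
    GROUP BY o.name, j.name
""").fetchall()
print(grant_totals)
```

The same join pattern is how fact_grant can link nonprofits to jurisdictions, which is the relationship the schema notes describe.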
Key Changes from oral_health_schema.sql
- Generic community platform (not oral health-specific)
- Grant tracking system aligned with ERD documentation
- Nonprofit-government relationships via fact_grant
- Foundation giving patterns (990-PF Schedule I data)
- Complete financial transparency for grants and budgets
Components
1. MLflow Agent Base (agents/mlflow_base.py)
- MLflowAgentBase: Base class for all agents with MLflow Pyfunc interface
- MLflowChainAgent: Base for LangChain-powered agents
- Automatic tracing and observability
- Model Serving compatibility
2. Classifier Agent (agents/mlflow_classifier.py)
- Policy topic classification
- Hybrid keyword + LLM approach
- Unity Catalog registered
- Deployable to Model Serving
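The hybrid keyword + LLM approach listed above could look roughly like the sketch below: try cheap keyword matching first and call the LLM only when no keyword matches. This is an illustration under assumptions, not the code in agents/mlflow_classifier.py; the KEYWORDS map, the topic names, and the fake_llm stub are all hypothetical.

```python
from typing import Callable, Optional

# Hypothetical keyword map; the real topics and keywords live in the agent code.
KEYWORDS = {
    "water_fluoridation": ("fluoridation", "fluoride"),
    "school_health": ("school nurse", "school-based"),
}

def keyword_classify(text: str) -> Optional[str]:
    """Return the first topic whose keyword appears in the text, else None."""
    lowered = text.lower()
    for topic, words in KEYWORDS.items():
        if any(word in lowered for word in words):
            return topic
    return None

def hybrid_classify(text: str, llm_classify: Callable[[str], str]) -> dict:
    """Use the keyword match when there is one; otherwise fall back to the LLM."""
    topic = keyword_classify(text)
    if topic is not None:
        return {"topic": topic, "method": "keyword"}
    return {"topic": llm_classify(text), "method": "llm"}

def fake_llm(text: str) -> str:
    """Stub standing in for a real LLM call."""
    return "other"

print(hybrid_classify("Discussion on water fluoridation...", fake_llm))
print(hybrid_classify("Zoning variance request", fake_llm))
```

Recording which path produced the label (the "method" field here) makes it easy to measure how many requests avoid an LLM call.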
3. Deployment (databricks/deployment.py)
- AgentDeploymentManager: Handles registration and deployment
- Unity Catalog integration
- Endpoint management
- A/B testing support
4. Evaluation (databricks/evaluation.py)
- AgentEvaluator: Quality metrics tracking
- Automated evaluation pipelines
- Regression detection
- Version comparison
5. Notebooks (databricks/notebooks/)
- Interactive development environment
- Step-by-step deployment guide
- Evaluation examples
- Delta Lake integration
Getting Started
Option 1: Databricks Notebook (Recommended)
Import notebook to your workspace:
databricks workspace import \
  databricks/notebooks/01_agent_bricks_quickstart.py \
  /Users/your-email@company.com/oral-health-agents
Attach to a cluster with:
- DBR 14.3 LTS ML or higher
- Unity Catalog enabled
Run all cells to:
- Register agents
- Deploy to Model Serving
- Evaluate performance
Option 2: Python Script
Set environment variables:
export DATABRICKS_HOST="https://your-workspace.cloud.databricks.com"
export DATABRICKS_TOKEN="your-token"
export OPENAI_API_KEY="your-openai-key"
Register agents:
source venv/bin/activate
python -m databricks.deployment
Run evaluation:
python -m databricks.evaluation
Unity Catalog Structure
main/                              # Catalog
├── agents/                        # Schema for agent models
│   ├── policy_classifier          # Classifier agent
│   ├── sentiment_analyzer         # Sentiment agent
│   └── advocacy_writer            # Advocacy agent
└── policy_data/                   # Schema for data
    ├── raw_documents              # Scraped documents
    ├── classified_documents       # Classified results
    └── advocacy_opportunities     # Identified opportunities
Model Serving Endpoints
Development Endpoints
- policy-classifier-dev: Classifier for testing
- sentiment-analyzer-dev: Sentiment analysis
- advocacy-writer-dev: Content generation
Production Endpoints
- policy-classifier-prod: Production classifier
- multi-agent-pipeline: Full pipeline with traffic splitting
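Traffic splitting between model versions is normally expressed as an endpoint configuration with one served entity per version and a list of routes whose percentages add up to 100. The sketch below only builds such a payload as a plain dictionary; the field names follow the Databricks serving-endpoints REST API as best understood here and should be checked against the current API reference. The helper name build_traffic_config, the 90/10 split, and the served-entity naming scheme are hypothetical.

```python
def build_traffic_config(model_name: str, split: dict) -> dict:
    """Build an endpoint config that splits traffic across model versions.

    split maps a model version (str) to an integer traffic percentage.
    """
    if sum(split.values()) != 100:
        raise ValueError("traffic percentages must sum to 100")
    entities, routes = [], []
    for version, pct in split.items():
        # Hypothetical naming scheme: last part of the model name plus version.
        served_name = f"{model_name.split('.')[-1]}-{version}"
        entities.append({
            "name": served_name,
            "entity_name": model_name,
            "entity_version": version,
            "workload_size": "Small",
            "scale_to_zero_enabled": True,
        })
        routes.append({"served_model_name": served_name, "traffic_percentage": pct})
    return {"served_entities": entities, "traffic_config": {"routes": routes}}

config = build_traffic_config("main.agents.policy_classifier", {"1": 90, "2": 10})
print(config["traffic_config"])
```

Validating the percentages locally catches a bad split before any request reaches the workspace.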
Deployment Workflow
ββββββββββββββββ ββββββββββββββββ ββββββββββββββββ
β Development ββββββΊβ Staging ββββββΊβ Production β
β β β β β β
β β’ Local test β β β’ A/B test β β β’ Monitor β
β β’ Register β β β’ Evaluate β β β’ Scale β
β β’ Deploy dev β β β’ Approve β β β’ Feedback β
ββββββββββββββββ ββββββββββββββββ ββββββββββββββββ
API Usage
Invoke via REST
curl -X POST https://your-workspace.cloud.databricks.com/serving-endpoints/policy-classifier-prod/invocations \
  -H "Authorization: Bearer $DATABRICKS_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "dataframe_records": [{
      "document_id": "doc_001",
      "title": "City Council Meeting",
      "content": "Discussion on water fluoridation..."
    }]
  }'
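The same request can be assembled with only the Python standard library, which is handy where neither curl nor the SDK is available. This is a minimal sketch: build_invocation_request is a hypothetical helper, and it builds the request without sending it so it can be inspected first.

```python
import json
import os
import urllib.request

def build_invocation_request(host: str, endpoint: str, records: list) -> urllib.request.Request:
    """Build (but do not send) the POST request for a serving endpoint."""
    token = os.environ.get("DATABRICKS_TOKEN", "")
    return urllib.request.Request(
        url=f"{host}/serving-endpoints/{endpoint}/invocations",
        data=json.dumps({"dataframe_records": records}).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )

request = build_invocation_request(
    "https://your-workspace.cloud.databricks.com",
    "policy-classifier-prod",
    [{"document_id": "doc_001", "title": "City Council Meeting",
      "content": "Discussion on water fluoridation..."}],
)
print(request.get_method(), request.full_url)
# To send it: urllib.request.urlopen(request, timeout=30)
```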
Invoke via Python SDK
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
response = w.serving_endpoints.query(
    name="policy-classifier-prod",
    dataframe_records=[{
        "document_id": "doc_001",
        "title": "City Council Meeting",
        "content": "Discussion on water fluoridation..."
    }]
)
print(response.predictions[0])
Monitoring & Observability
MLflow Tracing
- Automatic trace capture for all agent calls
- LLM request/response logging
- Latency tracking
- Cost estimation
View Traces
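from mlflow.tracking import MlflowClient

client = MlflowClient()

# Search traces in an experiment, filtered by a custom tag.
# Assumes the agents set an "agent_role" tag on each trace.
traces = client.search_traces(
    experiment_ids=["your-experiment-id"],
    filter_string="tags.agent_role = 'classifier'",
)

# Field names follow the MLflow 2.x TraceInfo object.
for trace in traces:
    info = trace.info
    print(f"Trace ID: {info.request_id}")
    print(f"Latency: {info.execution_time_ms}ms")
    print(f"Status: {info.status}")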
Endpoint Metrics
- Request rate
- Latency (P50, P95, P99)
- Error rate
- Token usage
- Cost per request
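When these metrics need to be checked outside the serving UI, the latency percentiles and error rate can be derived from raw request logs. The sketch below uses the nearest-rank percentile definition, which is one of several common conventions and may differ slightly from what the endpoint dashboard reports; the sample latencies and error count are made up.

```python
import math

def percentile(samples: list, pct: int) -> float:
    """Nearest-rank percentile: smallest sample with at least pct% of values at or below it."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]

def summarize(latencies_ms: list, error_count: int) -> dict:
    """Summarize one window of requests into the metrics listed above."""
    return {
        "p50_ms": percentile(latencies_ms, 50),
        "p95_ms": percentile(latencies_ms, 95),
        "p99_ms": percentile(latencies_ms, 99),
        "error_rate": error_count / len(latencies_ms),
    }

latencies = list(range(1, 101))  # hypothetical: 100 requests taking 1..100 ms
summary = summarize(latencies, error_count=2)
print(summary)
```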
Evaluation
Automated Evaluation
from databricks.evaluation import AgentEvaluator

evaluator = AgentEvaluator("policy_classifier")
metrics = evaluator.evaluate_classifier(
    model_uri="models:/main.agents.policy_classifier/1",
    test_documents=test_docs,
    ground_truth=labels
)
print(f"Accuracy: {metrics.accuracy:.2%}")
print(f"F1 Score: {metrics.f1_score:.2%}")
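For reference, accuracy and F1 can be computed from predictions and ground truth as in the sketch below. It uses macro-averaged F1, which is an assumption: the README does not say which averaging AgentEvaluator applies. The labels and predictions are made-up examples.

```python
def accuracy(y_true: list, y_pred: list) -> float:
    """Fraction of predictions that match the ground truth."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)

def macro_f1(y_true: list, y_pred: list) -> float:
    """Unweighted mean of per-label F1 scores."""
    scores = []
    for label in sorted(set(y_true) | set(y_pred)):
        tp = sum(t == label and p == label for t, p in zip(y_true, y_pred))
        fp = sum(t != label and p == label for t, p in zip(y_true, y_pred))
        fn = sum(t == label and p != label for t, p in zip(y_true, y_pred))
        denom = 2 * tp + fp + fn
        scores.append(2 * tp / denom if denom else 0.0)
    return sum(scores) / len(scores)

# Hypothetical ground truth and predictions for four documents.
truth = ["health", "health", "budget", "budget"]
preds = ["health", "budget", "budget", "budget"]
print(f"Accuracy: {accuracy(truth, preds):.2%}")
print(f"Macro F1: {macro_f1(truth, preds):.2%}")
```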
A/B Testing
# manager is an AgentDeploymentManager instance (databricks/deployment.py)
comparison = evaluator.compare_versions(
    version_a="1",
    version_b="2",
    eval_data=eval_df
)

# Promote v2 automatically if accuracy improves by more than 5%
if comparison["improvements"]["accuracy"]["improvement_pct"] > 5:
    manager.deploy_agent(
        agent_name="policy_classifier",
        endpoint_name="policy-classifier-prod",
        version="2"
    )
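The promotion rule above depends on how improvement_pct is defined. One plausible definition is the relative change of the candidate over the baseline, sketched below; this is an assumption about compare_versions, not its confirmed behavior, and should_promote is a hypothetical helper.

```python
def improvement_pct(baseline: float, candidate: float) -> float:
    """Relative change of candidate over baseline, in percent."""
    if baseline == 0:
        raise ValueError("baseline must be non-zero")
    return (candidate - baseline) / baseline * 100

def should_promote(metrics_a: dict, metrics_b: dict, threshold_pct: float = 5.0) -> bool:
    """Promote version B only if its accuracy beats version A by more than the threshold."""
    return improvement_pct(metrics_a["accuracy"], metrics_b["accuracy"]) > threshold_pct

print(should_promote({"accuracy": 0.80}, {"accuracy": 0.88}))
print(should_promote({"accuracy": 0.80}, {"accuracy": 0.82}))
```

Under this definition a negative value signals a regression, so the same function can back a regression check as well as a promotion gate.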
Best Practices
- Version Control: Always register new versions to Unity Catalog
- Evaluate First: Run evaluation before deploying to production
- Monitor Continuously: Set up alerts for drift and errors
- Use Feedback: Collect corrections and retrain regularly
- Scale Gradually: Start with small workloads, scale up
- Cost Optimization: Use scale-to-zero for dev/staging endpoints
Cost Considerations
| Component | Estimated Cost |
|---|---|
| Model Serving (Small, scale-to-zero) | ~$0.10-0.50/hour active |
| MLflow Tracking | Included |
| Unity Catalog | Included |
| LLM API calls | $0.002-0.03 per request |
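As a worked example of how the table's ranges combine, the sketch below estimates a monthly cost range from active serving hours and request volume. The rates are taken from the table; the workload (100 active hours, 10,000 requests) is hypothetical, and real bills depend on workload size, region, and LLM pricing.

```python
def monthly_cost_range(active_hours: float, requests: int) -> tuple:
    """Return (low, high) estimated monthly cost in dollars."""
    serving_low, serving_high = 0.10, 0.50  # $/hour active, from the table
    llm_low, llm_high = 0.002, 0.03         # $/request, from the table
    low = active_hours * serving_low + requests * llm_low
    high = active_hours * serving_high + requests * llm_high
    return round(low, 2), round(high, 2)

# Hypothetical workload: 100 active hours and 10,000 LLM-backed requests.
low, high = monthly_cost_range(active_hours=100, requests=10_000)
print(f"Estimated monthly cost: ${low:.2f} to ${high:.2f}")
```

With these inputs the LLM calls dominate the upper bound, which is why the tips that follow focus on avoiding or reducing LLM requests.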
Cost Optimization Tips:
- Use keyword classification before LLM
- Enable scale-to-zero for dev endpoints
- Batch requests when possible
- Cache frequent queries
- Monitor token usage
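The caching tip can be illustrated with functools.lru_cache from the standard library. In this sketch expensive_classify is a stub standing in for a billable endpoint or LLM call, and the counter exists only to show that repeated identical inputs trigger a single underlying call. An in-process cache like this does not survive restarts or span multiple replicas.

```python
from functools import lru_cache

call_count = 0

def expensive_classify(content: str) -> str:
    """Stub for a billable endpoint or LLM call (hypothetical)."""
    global call_count
    call_count += 1
    return "water_fluoridation" if "fluoridation" in content.lower() else "other"

@lru_cache(maxsize=1024)
def cached_classify(content: str) -> str:
    """Return the cached label for content seen before; otherwise call through."""
    return expensive_classify(content)

for _ in range(3):
    label = cached_classify("Discussion on water fluoridation...")

print(label, call_count, cached_classify.cache_info().hits)
```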
Troubleshooting
Issue: Agent fails to load
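# Check the registration status of each model version
from mlflow.tracking import MlflowClient

# Three-level model names live in Unity Catalog, so point the client at it
client = MlflowClient(registry_uri="databricks-uc")
versions = client.search_model_versions(
    filter_string="name='main.agents.policy_classifier'"
)
for v in versions:
    print(v.version, v.status)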
Issue: Endpoint is slow
- Check workload size (upgrade from Small to Medium)
- Enable auto-scaling
- Review LLM prompt length
- Add caching layer
Issue: High error rate
- Check MLflow traces for specific errors
- Verify input schema matches signature
- Review LLM API rate limits
- Check Unity Catalog permissions
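Schema mismatches are often the quickest of these to rule out, because the payload can be checked before it is sent. The sketch below validates records against the three fields used in the request examples in this README; treating them as the full required signature is an assumption, and validate_records is a hypothetical helper.

```python
# Fields taken from the request examples in this README; assumed to be required.
REQUIRED_FIELDS = {"document_id": str, "title": str, "content": str}

def validate_records(records: list) -> list:
    """Return a list of human-readable problems; an empty list means no issues found."""
    problems = []
    for i, record in enumerate(records):
        for field, expected in REQUIRED_FIELDS.items():
            if field not in record:
                problems.append(f"record {i}: missing '{field}'")
            elif not isinstance(record[field], expected):
                problems.append(f"record {i}: '{field}' should be {expected.__name__}")
    return problems

good = [{"document_id": "doc_001", "title": "City Council Meeting",
         "content": "Discussion on water fluoridation..."}]
bad = [{"document_id": 1, "title": "City Council Meeting"}]
print(validate_records(good))
print(validate_records(bad))
```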
Next Steps
- Deploy More Agents: Add sentiment analyzer and advocacy writer
- Create Workflows: Use Databricks Workflows for scheduled processing
- Add Feedback Loop: Store corrections in Delta Lake
- Set Up Alerts: Monitor for drift and errors
- Scale Production: Process thousands of documents