# Databricks Agent Bricks Implementation This directory contains the Databricks Agent Bricks (Mosaic AI Agent Framework) implementation for CommunityOne - a generic civic engagement and community data platform. ## Schema Files - **`communityone_schema.sql`** - Current comprehensive schema for all community data (jurisdictions, nonprofits, grants, meetings, observations) - **`oral_health_schema.sql`** - DEPRECATED - Legacy oral health-specific schema (use communityone_schema.sql instead) ## Architecture ``` ┌─────────────────────────────────────────────────────────────┐ │ Databricks Workspace │ │ │ │ ┌────────────────┐ ┌──────────────────┐ │ │ │ Unity Catalog │◄─────┤ MLflow Tracking │ │ │ │ - Models │ │ - Experiments │ │ │ │ - Governance │ │ - Runs │ │ │ │ - Lineage │ │ - Artifacts │ │ │ └────────┬───────┘ └──────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────┐ │ │ │ Model Serving Endpoints │ │ │ │ ┌──────────────┐ ┌─────────────────┐ │ │ │ │ │ Classifier │ │ Sentiment │ │ │ │ │ │ Agent │ │ Analyzer │ │ │ │ │ └──────────────┘ └─────────────────┘ │ │ │ │ Auto-scaling • Observability • A/B │ │ │ └────────────┬────────────────────────────┘ │ │ │ │ └───────────────┼──────────────────────────────────────────────┘ │ ▼ ┌────────────────┐ │ REST API │ │ Clients │ └────────────────┘ ``` ## Database Schema ### CommunityOne Schema (`communityone_schema.sql`) Comprehensive data warehouse schema supporting: **Dimension Tables:** - `dim_jurisdiction` - Cities, counties, states, school districts - `dim_organization` - Nonprofits, foundations, churches (IRS EO-BMF) - `dim_geography` - Geographic hierarchies and FIPS codes - `dim_date` - Time dimension for temporal analysis - `dim_measure` - Community outcome indicators (health, education, economic, social) **Fact Tables:** - `fact_communityone_observation` - Community outcome measurements (replaces fact_oral_health_observation) - `fact_grant` - **NEW** Grant transactions (990 Schedule I, 990-PF, USASpending.gov) - `fact_nonprofit_finance` - **NEW** Annual 990 filings with revenue breakdowns - `fact_jurisdiction_budget` - **NEW** Government budgets and spending - `fact_meeting` - **NEW** Government meetings and public hearings **Bridge Tables:** - `bridge_grant_program_area` - **NEW** Multi-purpose grant program areas ### Key Changes from oral_health_schema.sql ✅ **Generic community platform** (not oral health-specific) ✅ **Grant tracking system** aligned with ERD documentation ✅ **Nonprofit-government relationships** via fact_grant ✅ **Foundation giving patterns** (990-PF Schedule I data) ✅ **Complete financial transparency** for grants and budgets ## Components ### 1. MLflow Agent Base (`agents/mlflow_base.py`) - `MLflowAgentBase`: Base class for all agents with MLflow Pyfunc interface - `MLflowChainAgent`: Base for LangChain-powered agents - Automatic tracing and observability - Model Serving compatibility ### 2. Classifier Agent (`agents/mlflow_classifier.py`) - Policy topic classification - Hybrid keyword + LLM approach - Unity Catalog registered - Deployable to Model Serving ### 3. Deployment (`databricks/deployment.py`) - `AgentDeploymentManager`: Handles registration and deployment - Unity Catalog integration - Endpoint management - A/B testing support ### 4. Evaluation (`databricks/evaluation.py`) - `AgentEvaluator`: Quality metrics tracking - Automated evaluation pipelines - Regression detection - Version comparison ### 5. Notebooks (`databricks/notebooks/`) - Interactive development environment - Step-by-step deployment guide - Evaluation examples - Delta Lake integration ## Getting Started ### Option 1: Databricks Notebook (Recommended) 1. Import notebook to your workspace: ``` databricks workspace import \ databricks/notebooks/01_agent_bricks_quickstart.py \ /Users/your-email@company.com/oral-health-agents ``` 2. Attach to a cluster with: - DBR 14.3 LTS ML or higher - Unity Catalog enabled 3. Run all cells to: - Register agents - Deploy to Model Serving - Evaluate performance ### Option 2: Python Script 1. Set environment variables: ```bash export DATABRICKS_HOST="https://your-workspace.cloud.databricks.com" export DATABRICKS_TOKEN="your-token" export OPENAI_API_KEY="your-openai-key" ``` 2. Register agents: ```bash source venv/bin/activate python -m databricks.deployment ``` 3. Run evaluation: ```bash python -m databricks.evaluation ``` ## Unity Catalog Structure ``` main/ # Catalog ├── agents/ # Schema for agent models │ ├── policy_classifier # Classifier agent │ ├── sentiment_analyzer # Sentiment agent │ └── advocacy_writer # Advocacy agent └── policy_data/ # Schema for data ├── raw_documents # Scraped documents ├── classified_documents # Classified results └── advocacy_opportunities # Identified opportunities ``` ## Model Serving Endpoints ### Development Endpoints - `policy-classifier-dev`: Classifier for testing - `sentiment-analyzer-dev`: Sentiment analysis - `advocacy-writer-dev`: Content generation ### Production Endpoints - `policy-classifier-prod`: Production classifier - `multi-agent-pipeline`: Full pipeline with traffic splitting ## Deployment Workflow ``` ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ Development │────►│ Staging │────►│ Production │ │ │ │ │ │ │ │ • Local test │ │ • A/B test │ │ • Monitor │ │ • Register │ │ • Evaluate │ │ • Scale │ │ • Deploy dev │ │ • Approve │ │ • Feedback │ └──────────────┘ └──────────────┘ └──────────────┘ ``` ## API Usage ### Invoke via REST ```bash curl -X POST https://your-workspace.cloud.databricks.com/serving-endpoints/policy-classifier-prod/invocations \ -H "Authorization: Bearer $DATABRICKS_TOKEN" \ -H "Content-Type: application/json" \ -d '{ "dataframe_records": [{ "document_id": "doc_001", "title": "City Council Meeting", "content": "Discussion on water fluoridation..." }] }' ``` ### Invoke via Python SDK ```python from databricks.sdk import WorkspaceClient w = WorkspaceClient() response = w.serving_endpoints.query( name="policy-classifier-prod", dataframe_records=[{ "document_id": "doc_001", "title": "City Council Meeting", "content": "Discussion on water fluoridation..." }] ) print(response.predictions[0]) ``` ## Monitoring & Observability ### MLflow Tracing - Automatic trace capture for all agent calls - LLM request/response logging - Latency tracking - Cost estimation ### View Traces ```python import mlflow # Get traces for a run traces = mlflow.get_traces( experiment_id="your-experiment-id", filter_string="attributes.agent_role = 'classifier'" ) for trace in traces: print(f"Trace ID: {trace.request_id}") print(f"Latency: {trace.execution_time_ms}ms") print(f"Status: {trace.status}") ``` ### Endpoint Metrics - Request rate - Latency (P50, P95, P99) - Error rate - Token usage - Cost per request ## Evaluation ### Automated Evaluation ```python from databricks.evaluation import AgentEvaluator evaluator = AgentEvaluator("policy_classifier") metrics = evaluator.evaluate_classifier( model_uri="models:/main.agents.policy_classifier/1", test_documents=test_docs, ground_truth=labels ) print(f"Accuracy: {metrics.accuracy:.2%}") print(f"F1 Score: {metrics.f1_score:.2%}") ``` ### A/B Testing ```python comparison = evaluator.compare_versions( version_a="1", version_b="2", eval_data=eval_df ) # Automatically promote if v2 is better if comparison["improvements"]["accuracy"]["improvement_pct"] > 5: # Promote to production manager.deploy_agent( agent_name="policy_classifier", endpoint_name="policy-classifier-prod", version="2" ) ``` ## Best Practices 1. **Version Control**: Always register new versions to Unity Catalog 2. **Evaluate First**: Run evaluation before deploying to production 3. **Monitor Continuously**: Set up alerts for drift and errors 4. **Use Feedback**: Collect corrections and retrain regularly 5. **Scale Gradually**: Start with small workloads, scale up 6. **Cost Optimization**: Use scale-to-zero for dev/staging endpoints ## Cost Considerations | Component | Estimated Cost | |-----------|---------------| | Model Serving (Small, scale-to-zero) | ~$0.10-0.50/hour active | | MLflow Tracking | Included | | Unity Catalog | Included | | LLM API calls | $0.002-0.03 per request | **Cost Optimization Tips:** - Use keyword classification before LLM - Enable scale-to-zero for dev endpoints - Batch requests when possible - Cache frequent queries - Monitor token usage ## Troubleshooting ### Issue: Agent fails to load ```python # Check model status from mlflow.tracking import MlflowClient client = MlflowClient() versions = client.search_model_versions( filter_string="name='main.agents.policy_classifier'" ) print(versions[0].status) ``` ### Issue: Endpoint is slow - Check workload size (upgrade from Small to Medium) - Enable auto-scaling - Review LLM prompt length - Add caching layer ### Issue: High error rate - Check MLflow traces for specific errors - Verify input schema matches signature - Review LLM API rate limits - Check Unity Catalog permissions ## Next Steps 1. **Deploy More Agents**: Add sentiment analyzer and advocacy writer 2. **Create Workflows**: Use Databricks Workflows for scheduled processing 3. **Add Feedback Loop**: Store corrections in Delta Lake 4. **Set Up Alerts**: Monitor for drift and errors 5. **Scale Production**: Process thousands of documents ## Resources - [Databricks Agent Framework Docs](https://docs.databricks.com/en/generative-ai/agent-framework/index.html) - [MLflow Agent Deployment](https://mlflow.org/docs/latest/llms/deployments/index.html) - [Unity Catalog AI](https://docs.databricks.com/en/generative-ai/unity-catalog-ai.html) - [Model Serving Guide](https://docs.databricks.com/en/machine-learning/model-serving/index.html)