Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| # Databricks notebook source | |
| # MAGIC %md | |
| # MAGIC # Oral Health Policy Pulse - Example Analysis | |
| # MAGIC | |
| # MAGIC This notebook demonstrates how to use the Oral Health Policy Pulse system | |
| # MAGIC to analyze local government meeting minutes and identify advocacy opportunities. | |
| # COMMAND ---------- | |
| # MAGIC %md | |
| # MAGIC ## Setup | |
| # COMMAND ---------- | |
| from agents.orchestrator import OrchestratorAgent | |
| from agents.scraper import ScraperAgent | |
| from agents.parser import ParserAgent | |
| from agents.classifier import ClassifierAgent | |
| from agents.sentiment import SentimentAnalyzerAgent | |
| from agents.advocacy import AdvocacyWriterAgent | |
| from pipeline.delta_lake import DeltaLakePipeline | |
| from visualization.heatmap import AdvocacyHeatmap | |
| import pandas as pd | |
| # COMMAND ---------- | |
| # MAGIC %md | |
| # MAGIC ## Initialize Components | |
| # COMMAND ---------- | |
| # Initialize pipeline | |
| pipeline = DeltaLakePipeline() | |
| # Initialize orchestrator | |
| orchestrator = OrchestratorAgent() | |
| # Register agents | |
| orchestrator.register_agent(ScraperAgent()) | |
| orchestrator.register_agent(ParserAgent()) | |
| orchestrator.register_agent(ClassifierAgent()) | |
| orchestrator.register_agent(SentimentAnalyzerAgent()) | |
| orchestrator.register_agent(AdvocacyWriterAgent()) | |
| # COMMAND ---------- | |
| # MAGIC %md | |
| # MAGIC ## Define Scraping Targets | |
| # COMMAND ---------- | |
| # Example targets across multiple cities | |
| targets = [ | |
| { | |
| "url": "https://example-city.legistar.com/Calendar.aspx", | |
| "municipality": "Example City", | |
| "state": "CA", | |
| "platform": "legistar" | |
| }, | |
| { | |
| "url": "https://another-city.gov/meetings", | |
| "municipality": "Another City", | |
| "state": "NY", | |
| "platform": "generic" | |
| } | |
| ] | |
| # COMMAND ---------- | |
| # MAGIC %md | |
| # MAGIC ## Run Analysis Pipeline | |
| # COMMAND ---------- | |
| # Execute pipeline | |
| results = await orchestrator.execute_pipeline( | |
| scrape_targets=targets, | |
| date_range={ | |
| "start": "2024-01-01", | |
| "end": "2024-12-31" | |
| } | |
| ) | |
| print(f"Pipeline Status: {results['success']}") | |
| # COMMAND ---------- | |
| # MAGIC %md | |
| # MAGIC ## Query Results from Delta Lake | |
| # COMMAND ---------- | |
| # Query advocacy opportunities | |
| opportunities_df = spark.sql(""" | |
| SELECT | |
| municipality, | |
| state, | |
| topic, | |
| stance, | |
| urgency, | |
| meeting_date, | |
| recommended_action | |
| FROM oral_health.policy_analysis.advocacy_opportunities | |
| WHERE urgency IN ('critical', 'high') | |
| ORDER BY meeting_date DESC | |
| """) | |
| display(opportunities_df) | |
| # COMMAND ---------- | |
| # MAGIC %md | |
| # MAGIC ## Analyze by State | |
| # COMMAND ---------- | |
| # State-level aggregation | |
| state_summary = spark.sql(""" | |
| SELECT | |
| state, | |
| COUNT(DISTINCT opportunity_id) as total_opportunities, | |
| COUNT(DISTINCT CASE WHEN urgency = 'critical' THEN opportunity_id END) as critical, | |
| COUNT(DISTINCT CASE WHEN urgency = 'high' THEN opportunity_id END) as high, | |
| COUNT(DISTINCT municipality) as municipalities | |
| FROM oral_health.policy_analysis.advocacy_opportunities | |
| GROUP BY state | |
| ORDER BY total_opportunities DESC | |
| """) | |
| display(state_summary) | |
| # COMMAND ---------- | |
| # MAGIC %md | |
| # MAGIC ## Topic Analysis | |
| # COMMAND ---------- | |
| # Topic distribution | |
| topic_analysis = spark.sql(""" | |
| SELECT | |
| topic, | |
| COUNT(*) as count, | |
| AVG(CASE | |
| WHEN urgency = 'critical' THEN 4 | |
| WHEN urgency = 'high' THEN 3 | |
| WHEN urgency = 'medium' THEN 2 | |
| WHEN urgency = 'low' THEN 1 | |
| ELSE 0 | |
| END) as avg_urgency_score | |
| FROM oral_health.policy_analysis.advocacy_opportunities | |
| GROUP BY topic | |
| ORDER BY count DESC | |
| """) | |
| display(topic_analysis) | |
| # COMMAND ---------- | |
| # MAGIC %md | |
| # MAGIC ## Generate Visualizations | |
| # COMMAND ---------- | |
| # Convert to pandas for visualization | |
| opportunities_pd = opportunities_df.toPandas() | |
| # Create heatmap | |
| heatmap_gen = AdvocacyHeatmap() | |
| m = heatmap_gen.create_folium_map(opportunities_pd.to_dict('records')) | |
| # Display map | |
| displayHTML(m._repr_html_()) | |
| # COMMAND ---------- | |
| # MAGIC %md | |
| # MAGIC ## Example: Retrieve Advocacy Materials | |
| # COMMAND ---------- | |
| # Query generated advocacy materials | |
| materials = spark.sql(""" | |
| SELECT | |
| opportunity_id, | |
| municipality, | |
| state, | |
| topic, | |
| email_subject, | |
| email_body, | |
| talking_points | |
| FROM oral_health.policy_analysis.advocacy_materials | |
| WHERE state = 'CA' | |
| LIMIT 5 | |
| """) | |
| display(materials) | |
| # COMMAND ---------- | |
| # MAGIC %md | |
| # MAGIC ## Export Results | |
| # COMMAND ---------- | |
| # Export critical opportunities to CSV | |
| critical_opps = opportunities_df.filter("urgency = 'critical'") | |
| critical_opps.write.mode("overwrite").csv("/dbfs/oral-health-exports/critical-opportunities.csv") | |
| print("Critical opportunities exported successfully") | |