Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """ | |
| Main entry point for running the Oral Health Policy Pulse system. | |
| """ | |
| import asyncio | |
| import sys | |
| from pathlib import Path | |
| from typing import List, Optional | |
| import click | |
| from loguru import logger | |
| # Add project root to path | |
| sys.path.insert(0, str(Path(__file__).parent)) | |
| from agents.orchestrator import OrchestratorAgent | |
| from agents.scraper import ScraperAgent | |
| from agents.parser import ParserAgent | |
| from agents.classifier import ClassifierAgent | |
| from agents.sentiment import SentimentAnalyzerAgent | |
| from agents.advocacy import AdvocacyWriterAgent | |
| from pipeline.delta_lake import DeltaLakePipeline | |
| # Lazy import for visualization to avoid requiring folium in local mode | |
| # from visualization.heatmap import AdvocacyHeatmap | |
| from config import settings | |
| # Configure logging | |
| logger.remove() | |
| logger.add( | |
| sys.stderr, | |
| format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan> - <level>{message}</level>", | |
| level=settings.log_level | |
| ) | |
| logger.add( | |
| settings.log_file, | |
| rotation="500 MB", | |
| retention="10 days", | |
| level=settings.log_level | |
| ) | |
| def cli(): | |
| """Oral Health Policy Pulse - Multi-agent policy analysis system.""" | |
| pass | |
| def serve(host: str, port: int, reload: bool): | |
| """Start the API server.""" | |
| import uvicorn | |
| from api.main import app | |
| logger.info(f"Starting API server on {host}:{port}") | |
| uvicorn.run( | |
| "api.main:app", | |
| host=host, | |
| port=port, | |
| reload=reload, | |
| workers=1 if reload else settings.api_workers | |
| ) | |
| def scrape(state: str, municipality: str, url: str, platform: str, max_events: int, start_year: int, | |
| include_social: bool, output: str): | |
| """Scrape meeting minutes from a single source.""" | |
| import json | |
| from datetime import datetime | |
| logger.info(f"Scraping {url} for {municipality}, {state}") | |
| async def run_scrape(): | |
| scraper = ScraperAgent() | |
| async with scraper: | |
| targets = [{ | |
| "url": url, | |
| "municipality": municipality, | |
| "state": state, | |
| "platform": platform, | |
| "max_events": max_events, | |
| "start_year": start_year, | |
| }] | |
| documents = await scraper._scrape_targets(targets, {}) | |
| if include_social: | |
| social_docs = await scraper.scrape_social_sources( | |
| municipality=municipality, | |
| state=state, | |
| seed_url=url, | |
| ) | |
| documents.extend(social_docs) | |
| logger.info(f"Scraped {len(social_docs)} social media documents") | |
| logger.info(f"Scraped {len(documents)} documents") | |
| # Save to pipeline | |
| pipeline = DeltaLakePipeline() | |
| pipeline.write_raw_documents(documents) | |
| # Persist to JSON | |
| out_path = output | |
| if not out_path: | |
| safe_name = (municipality or "unknown").replace(" ", "_").lower() | |
| out_dir = Path("output") / safe_name | |
| out_dir.mkdir(parents=True, exist_ok=True) | |
| timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S") | |
| out_path = str(out_dir / f"{platform}_{timestamp}.json") | |
| # Make metadata JSON-serializable | |
| serializable = [] | |
| for doc in documents: | |
| d = dict(doc) | |
| if "metadata" in d and isinstance(d["metadata"], dict): | |
| d["metadata"] = {k: (str(v) if not isinstance(v, (str, int, float, bool, type(None))) else v) | |
| for k, v in d["metadata"].items()} | |
| serializable.append(d) | |
| Path(out_path).write_text(json.dumps(serializable, indent=2, default=str)) | |
| logger.info(f"Saved {len(serializable)} documents to {out_path}") | |
| asyncio.run(run_scrape()) | |
| def analyze(targets_file: str): | |
| """Run full analysis pipeline on targets.""" | |
| import json | |
| logger.info(f"Starting analysis pipeline with targets from {targets_file}") | |
| # Load targets | |
| with open(targets_file, 'r') as f: | |
| targets = json.load(f) | |
| async def run_pipeline(): | |
| # Initialize orchestrator and agents | |
| orchestrator = OrchestratorAgent() | |
| orchestrator.register_agent(ScraperAgent()) | |
| orchestrator.register_agent(ParserAgent()) | |
| orchestrator.register_agent(ClassifierAgent()) | |
| orchestrator.register_agent(SentimentAnalyzerAgent()) | |
| orchestrator.register_agent(AdvocacyWriterAgent()) | |
| # Execute pipeline | |
| results = await orchestrator.execute_pipeline(targets) | |
| logger.info(f"Pipeline completed: {results}") | |
| asyncio.run(run_pipeline()) | |
| def generate_heatmap(output: str, urgency: Optional[str]): | |
| """Generate advocacy heatmap visualization.""" | |
| logger.info(f"Generating heatmap (urgency={urgency})") | |
| # Lazy import - only load when generating heatmap | |
| try: | |
| from visualization.heatmap import AdvocacyHeatmap | |
| except ImportError: | |
| click.echo("β Visualization dependencies not installed!") | |
| click.echo(" Install with: pip install folium plotly") | |
| return | |
| # Query opportunities | |
| pipeline = DeltaLakePipeline() | |
| opportunities = pipeline.query_opportunities_by_state(None, urgency) | |
| # Generate map | |
| heatmap = AdvocacyHeatmap() | |
| m = heatmap.create_folium_map(opportunities) | |
| # Save | |
| heatmap.export_map_html(m, output) | |
| logger.info(f"Heatmap saved to {output}") | |
| def init(): | |
| """Initialize the system (create database tables, etc.).""" | |
| logger.info("Initializing Oral Health Policy Pulse system") | |
| try: | |
| pipeline = DeltaLakePipeline() | |
| pipeline.initialize_tables() | |
| logger.info("System initialized successfully") | |
| except Exception as e: | |
| logger.error(f"Initialization failed: {e}") | |
| sys.exit(1) | |
| def status(): | |
| """Show system status.""" | |
| logger.info("Checking system status") | |
| # Check configuration | |
| click.echo(f"Catalog: {settings.catalog_name}") | |
| click.echo(f"Schema: {settings.schema_name}") | |
| click.echo(f"Delta Lake Path: {settings.delta_lake_path}") | |
| click.echo(f"Log Level: {settings.log_level}") | |
| # Check connections | |
| click.echo("\nSystem Status: OK") | |
| def discover_jurisdictions(limit: Optional[int], state: Optional[str], jurisdiction_type: Optional[str]): | |
| """Run jurisdiction discovery pipeline to identify government websites.""" | |
| logger.info(f"Starting jurisdiction discovery (limit={limit}, state={state}, type={jurisdiction_type})") | |
| # Check for PySpark | |
| try: | |
| from discovery.discovery_pipeline import PYSPARK_AVAILABLE | |
| if not PYSPARK_AVAILABLE: | |
| click.echo("β PySpark not installed!") | |
| click.echo(" For full discovery with data storage, install:") | |
| click.echo(" pip install pyspark delta-spark") | |
| click.echo("") | |
| click.echo(" PySpark can run locally without Databricks for data processing.") | |
| return | |
| except ImportError: | |
| click.echo("β Discovery modules not available!") | |
| return | |
| async def run_discovery(): | |
| from discovery.discovery_pipeline import DiscoveryPipeline | |
| pipeline = DiscoveryPipeline() | |
| # Run full pipeline with optional filters | |
| results = await pipeline.run_full_pipeline( | |
| discovery_limit=limit, | |
| state_filter=state, | |
| type_filter=jurisdiction_type | |
| ) | |
| logger.info(f"Discovery completed: {results}") | |
| click.echo(f"\nβ Discovery Complete!") | |
| click.echo(f" Bronze records: {results.get('bronze_records', 0)}") | |
| click.echo(f" URLs discovered: {results.get('urls_discovered', 0)}") | |
| click.echo(f" Scraping targets: {results.get('scraping_targets', 0)}") | |
| asyncio.run(run_discovery()) | |
| def discovery_stats(): | |
| """Show jurisdiction discovery statistics.""" | |
| logger.info("Fetching discovery statistics") | |
| from pyspark.sql import SparkSession | |
| from pyspark.sql.functions import col, count, avg, when | |
| spark = SparkSession.builder.getOrCreate() | |
| click.echo("\nπ Jurisdiction Discovery Statistics\n") | |
| try: | |
| # Bronze layer stats | |
| bronze_df = spark.read.format("delta").load(f"{settings.delta_lake_path}/bronze/jurisdictions/unified") | |
| total_jurisdictions = bronze_df.count() | |
| click.echo(f"Bronze Layer (Raw Data):") | |
| click.echo(f" Total jurisdictions: {total_jurisdictions:,}") | |
| by_type = bronze_df.groupBy("jurisdiction_type").count().collect() | |
| for row in by_type: | |
| click.echo(f" - {row['jurisdiction_type']}: {row['count']:,}") | |
| # Silver layer stats | |
| silver_df = spark.read.format("delta").load(f"{settings.delta_lake_path}/silver/discovered_urls") | |
| urls_discovered = silver_df.count() | |
| homepages_found = silver_df.filter(col("homepage_url").isNotNull()).count() | |
| minutes_found = silver_df.filter(col("minutes_url").isNotNull()).count() | |
| avg_confidence = silver_df.select(avg("confidence_score")).collect()[0][0] | |
| click.echo(f"\nSilver Layer (Discovered URLs):") | |
| click.echo(f" Total discoveries: {urls_discovered:,}") | |
| click.echo(f" Homepages found: {homepages_found:,} ({homepages_found/urls_discovered*100:.1f}%)") | |
| click.echo(f" Minutes URLs found: {minutes_found:,} ({minutes_found/urls_discovered*100:.1f}%)") | |
| click.echo(f" Avg confidence: {avg_confidence:.2f}") | |
| # Gold layer stats | |
| gold_df = spark.read.format("delta").load(f"{settings.delta_lake_path}/gold/scraping_targets") | |
| scraping_targets = gold_df.count() | |
| high_priority = gold_df.filter(col("priority_score") > 150).count() | |
| click.echo(f"\nGold Layer (Scraping Targets):") | |
| click.echo(f" Total targets: {scraping_targets:,}") | |
| click.echo(f" High priority: {high_priority:,}") | |
| by_status = gold_df.groupBy("scraping_status").count().collect() | |
| for row in by_status: | |
| click.echo(f" - {row['scraping_status']}: {row['count']:,}") | |
| except Exception as e: | |
| click.echo(f"β Error: {e}") | |
| click.echo("(Run 'python main.py discover-jurisdictions' first)") | |
| def scrape_batch(source: str, limit: int, priority: int): | |
| """Scrape multiple sites in batch mode.""" | |
| logger.info(f"Starting batch scrape (source={source}, limit={limit}, priority={priority})") | |
| async def run_batch_scrape(): | |
| from pyspark.sql import SparkSession | |
| from pyspark.sql.functions import col | |
| spark = SparkSession.builder.getOrCreate() | |
| if source == 'discovered': | |
| # Load from gold scraping targets | |
| targets_df = spark.read.format("delta").load( | |
| f"{settings.delta_lake_path}/gold/scraping_targets" | |
| ).filter( | |
| (col("priority_score") >= priority) & | |
| (col("scraping_status") == "pending") | |
| ).limit(limit) | |
| targets = [ | |
| { | |
| "url": row.minutes_url, | |
| "municipality": row.jurisdiction_name, | |
| "state": row.state, | |
| "platform": row.cms_platform or "generic", | |
| "jurisdiction_id": row.jurisdiction_id | |
| } | |
| for row in targets_df.collect() | |
| ] | |
| else: | |
| click.echo("Manual source not yet implemented") | |
| return | |
| click.echo(f"Scraping {len(targets)} targets...") | |
| scraper = ScraperAgent() | |
| async with scraper: | |
| documents = await scraper._scrape_targets(targets, {}) | |
| logger.info(f"Scraped {len(documents)} documents") | |
| click.echo(f"β Scraped {len(documents)} documents") | |
| # Save to pipeline | |
| pipeline = DeltaLakePipeline() | |
| pipeline.write_raw_documents(documents) | |
| click.echo(f"β Documents saved to Delta Lake") | |
| asyncio.run(run_batch_scrape()) | |
| def publish_to_hf(dataset: str, private: bool, sample: bool): | |
| """ | |
| Publish datasets to HuggingFace Hub for sharing. | |
| Examples: | |
| python main.py publish-to-hf --dataset all | |
| python main.py publish-to-hf --dataset discovered-urls --private | |
| python main.py publish-to-hf --dataset census --sample | |
| """ | |
| from pipeline.huggingface_publisher import HuggingFacePublisher, HF_AVAILABLE | |
| if not HF_AVAILABLE: | |
| click.echo("β HuggingFace libraries not installed!") | |
| click.echo(" Install with: pip install datasets huggingface-hub") | |
| return | |
| click.echo("π Publishing datasets to HuggingFace Hub...") | |
| try: | |
| publisher = HuggingFacePublisher() | |
| if dataset == 'all': | |
| results = publisher.publish_all(private=private, sample_census=sample) | |
| click.echo("\nπ Published Datasets:") | |
| for name, info in results.items(): | |
| if "url" in info: | |
| click.echo(f" β {name}: {info['url']}") | |
| else: | |
| click.echo(f" β {name}: {info.get('error', 'Unknown error')}") | |
| elif dataset == 'census': | |
| result = publisher.publish_census_data(private=private, sample_size=1000 if sample else None) | |
| click.echo(f"β Published to: {result['url']}") | |
| elif dataset == 'gov-domains': | |
| result = publisher.publish_gov_domains(private=private) | |
| click.echo(f"β Published {result['records']:,} domains to: {result['url']}") | |
| elif dataset == 'nces-schools': | |
| result = publisher.publish_nces_schools(private=private) | |
| click.echo(f"β Published {result['records']:,} schools to: {result['url']}") | |
| elif dataset == 'discovered-urls': | |
| result = publisher.publish_discovered_urls(private=private) | |
| click.echo(f"β Published {result['records']:,} URLs to: {result['url']}") | |
| elif dataset == 'scraping-targets': | |
| result = publisher.publish_scraping_targets(private=private) | |
| click.echo(f"β Published {result['records']:,} targets to: {result['url']}") | |
| click.echo("\nπ Publishing complete!") | |
| except ValueError as e: | |
| click.echo(f"β Configuration error: {e}") | |
| click.echo(" Set HUGGINGFACE_TOKEN in .env file") | |
| except Exception as e: | |
| click.echo(f"β Publishing failed: {e}") | |
| logger.exception("HuggingFace publishing error") | |
| if __name__ == "__main__": | |
| cli() | |