Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """ | |
| Batch processing and quality metrics for large-scale jurisdiction scraping. | |
| Based on LocalView patterns for handling thousands of jurisdictions | |
| with quality tracking and failure management. | |
| """ | |
| from typing import Dict, List, Optional, Iterator | |
| from dataclasses import dataclass, asdict | |
| from datetime import datetime, timedelta | |
| from enum import Enum | |
| import json | |
| from pyspark.sql import SparkSession, DataFrame | |
| from pyspark.sql.functions import col, count, sum as spark_sum, avg, max as spark_max | |
| from loguru import logger | |
| from config.settings import settings | |
class ScrapeStatus(Enum):
    """Outcome states a single scraping operation can be in."""

    # Not started yet / currently running.
    PENDING = "pending"
    IN_PROGRESS = "in_progress"

    # Terminal states.
    SUCCESS = "success"
    PARTIAL = "partial"  # only part of the data was retrieved
    FAILED = "failed"
    SKIPPED = "skipped"
class HealthStatus(Enum):
    """Health classification assigned to a jurisdiction scraper."""

    HEALTHY = "healthy"    # no recent failures
    DEGRADED = "degraded"  # some failures
    FAILED = "failed"      # multiple consecutive failures
    UNKNOWN = "unknown"    # never scraped
@dataclass
class JurisdictionQuality:
    """
    LocalView pattern: Track data quality and completeness per jurisdiction.

    The record groups identification, completeness counters, freshness
    timestamps, reliability counters and the derived 0-100 scores.
    It is a dataclass because the rest of the module constructs it with
    keyword arguments and serializes it with ``dataclasses.asdict``.
    """

    # Identification
    jurisdiction_name: str
    state_code: str
    fips_code: Optional[str]
    url: str
    platform: Optional[str]

    # Completeness metrics
    total_meetings_expected: int  # Based on typical schedule
    total_meetings_found: int
    meetings_with_agendas: int
    meetings_with_minutes: int
    meetings_with_videos: int
    meetings_with_transcripts: int

    # Freshness
    last_scraped: Optional[datetime]
    last_meeting_found: Optional[datetime]
    scraping_frequency: str  # 'daily', 'weekly', 'monthly'

    # Reliability
    consecutive_successes: int
    consecutive_failures: int
    total_scrapes: int
    successful_scrapes: int
    last_success: Optional[datetime]
    last_error: Optional[str]

    # Quality scores
    completeness_score: float  # 0-100
    reliability_score: float  # 0-100
    freshness_score: float  # 0-100
    overall_quality: float  # 0-100 (weighted average)
    health_status: str  # healthy, degraded, failed

    # Timestamps
    created_at: datetime
    updated_at: datetime

    # Names of the datetime-valued fields that are converted to/from ISO-8601
    # strings. Unannotated on purpose so the dataclass machinery does not
    # treat it as a field.
    _DATETIME_FIELDS = (
        'last_scraped',
        'last_meeting_found',
        'last_success',
        'created_at',
        'updated_at',
    )

    @classmethod
    def from_dict(cls, data: dict) -> 'JurisdictionQuality':
        """
        Create an instance from a dictionary, parsing ISO-8601 datetime strings.

        Args:
            data: Mapping of field name to value. Datetime fields may be given
                either as ``datetime`` objects or as ISO-8601 strings.

        Returns:
            A new JurisdictionQuality built from ``data``.

        Raises:
            TypeError: If ``data`` has missing or unexpected keys.
            ValueError: If a datetime string is not valid ISO-8601.
        """
        # Work on a shallow copy so the caller's dictionary is not modified.
        values = dict(data)
        for name in cls._DATETIME_FIELDS:
            raw = values.get(name)
            if raw and isinstance(raw, str):
                values[name] = datetime.fromisoformat(raw)
        return cls(**values)

    def to_dict(self) -> dict:
        """
        Convert to a dictionary with datetime fields serialized as ISO-8601.

        Returns:
            A new dict of all fields; datetime values become strings and
            unset (None) datetime fields stay None.
        """
        data = asdict(self)
        for name in self._DATETIME_FIELDS:
            value = data.get(name)
            # Only real datetimes are converted; None is passed through.
            if isinstance(value, datetime):
                data[name] = value.isoformat()
        return data
@dataclass
class BatchResult:
    """
    Result of processing a batch of jurisdictions.

    Declared as a dataclass because callers build it with keyword arguments
    and then update its counters in place while a batch is running.
    """

    batch_number: int
    batch_size: int
    jurisdictions_processed: int
    jurisdictions_succeeded: int
    jurisdictions_failed: int
    meetings_found: int
    agendas_found: int
    minutes_found: int
    errors: List[dict]
    start_time: datetime
    end_time: Optional[datetime] = None
    duration_seconds: float = 0.0

    @property
    def success_rate(self) -> float:
        """
        Percentage of jurisdictions successfully scraped.

        Exposed as a property because callers read it as an attribute
        (for example ``f"{result.success_rate:.1f}"``).

        Returns:
            A value in the range 0-100; 0.0 when nothing was processed.
        """
        if self.jurisdictions_processed == 0:
            return 0.0
        return (self.jurisdictions_succeeded / self.jurisdictions_processed) * 100
class BatchProcessor:
    """
    LocalView pattern: Process large numbers of jurisdictions in batches.

    Features:
    - Batch processing with configurable size
    - Quality metrics per jurisdiction
    - Failure tracking and retry logic
    - Progress monitoring
    - Resume from interruption

    Example:
        >>> processor = BatchProcessor(batch_size=100)
        >>> for batch_result in processor.process_all_jurisdictions():
        ...     print(f"Batch {batch_result.batch_number}: "
        ...           f"{batch_result.success_rate:.1f}% success")
    """

    def __init__(
        self,
        spark: Optional[SparkSession] = None,
        batch_size: int = 100,
        max_failures: int = 3,
        retry_delay_hours: int = 24
    ):
        """
        Initialize batch processor.

        Args:
            spark: SparkSession (creates new if None)
            batch_size: Number of jurisdictions per batch
            max_failures: Max consecutive failures before marking as failed
            retry_delay_hours: Hours to wait before retrying failed jurisdictions

        Raises:
            ValueError: If ``batch_size`` is not a positive integer.
        """
        # Reject sizes that would otherwise surface later as a
        # ZeroDivisionError (or an empty range) inside the batch loop.
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")

        self.spark = spark or self._create_spark_session()
        self.batch_size = batch_size
        self.max_failures = max_failures
        self.retry_delay_hours = retry_delay_hours
        self.quality_metrics_path = f"{settings.delta_lake_path}/quality/jurisdiction_metrics"
        self.batch_results_path = f"{settings.delta_lake_path}/quality/batch_results"

    def _create_spark_session(self) -> SparkSession:
        """Create a Delta-enabled SparkSession when none was provided."""
        # Imported lazily so the delta package is only needed on this path.
        from delta import configure_spark_with_delta_pip

        builder = SparkSession.builder \
            .appName("BatchProcessor") \
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        return configure_spark_with_delta_pip(builder).getOrCreate()

    def process_all_jurisdictions(
        self,
        priority_filter: str = "high",
        resume_from_batch: Optional[int] = None
    ) -> Iterator[BatchResult]:
        """
        Process all jurisdictions in batches.

        Args:
            priority_filter: Priority tier to process ('high', 'medium', 'low', 'all')
            resume_from_batch: Zero-based index of the batch to resume from
                (for interruptions). Note that the ``batch_number`` reported
                on each result is this index plus one.

        Yields:
            BatchResult for each processed batch
        """
        logger.info(f"Starting batch processing (batch_size={self.batch_size})")

        # Load targets from Gold layer
        targets_df = self.spark.read.format("delta").load(
            f"{settings.delta_lake_path}/gold/scraping_targets"
        )

        # Filter by priority
        if priority_filter != "all":
            targets_df = targets_df.filter(col("priority_tier") == priority_filter)

        # Filter out recently failed jurisdictions
        quality_df = self._load_quality_metrics()
        if quality_df is not None:
            # Skip jurisdictions that failed recently and are within retry delay
            retry_cutoff = datetime.utcnow() - timedelta(hours=self.retry_delay_hours)
            retry_cutoff_str = retry_cutoff.isoformat()

            # Left join keeps targets that have no quality row yet; a target
            # is kept when ANY of the three conditions below holds.
            targets_df = targets_df.join(
                quality_df.select("url", "consecutive_failures", "last_scraped", "health_status"),
                on="url",
                how="left"
            ).filter(
                (col("consecutive_failures").isNull()) |  # Never scraped
                (col("consecutive_failures") < self.max_failures) |  # Not max failures
                (col("last_scraped") < retry_cutoff_str)  # Retry delay passed
            )

        # Order by priority score
        targets_df = targets_df.orderBy(col("priority_score").desc())
        total_targets = targets_df.count()
        logger.info(f"Processing {total_targets} jurisdictions")

        # Calculate starting batch
        start_batch = resume_from_batch or 0

        # Process in batches. The "+ 1" covers a trailing partial batch; when
        # the total is an exact multiple the extra batch is empty and the
        # loop stops on the emptiness check below.
        for batch_num in range(start_batch, (total_targets // self.batch_size) + 1):
            offset = batch_num * self.batch_size

            # Get batch
            batch_df = targets_df.offset(offset).limit(self.batch_size)
            batch_data = batch_df.collect()
            if not batch_data:
                break

            logger.info(f"Processing batch {batch_num + 1} ({len(batch_data)} jurisdictions)")

            # Process batch
            batch_result = self._process_batch(batch_num + 1, batch_data)

            # Save batch result
            self._save_batch_result(batch_result)

            # Update quality metrics
            # (In real implementation, this would be called after actual scraping)
            yield batch_result

    def _process_batch(self, batch_num: int, batch_data: List) -> BatchResult:
        """
        Process a single batch of jurisdictions.

        Note: This is a skeleton. Actual scraping logic would go here.

        Args:
            batch_num: One-based batch number recorded on the result.
            batch_data: Rows for this batch; each must provide
                'jurisdiction_name' and 'url', and may provide 'platform'.
                Spark ``Row`` objects and plain dicts are both accepted.

        Returns:
            BatchResult with counters, collected errors and timing filled in.
        """
        result = BatchResult(
            batch_number=batch_num,
            batch_size=len(batch_data),
            jurisdictions_processed=0,
            jurisdictions_succeeded=0,
            jurisdictions_failed=0,
            meetings_found=0,
            agendas_found=0,
            minutes_found=0,
            errors=[],
            start_time=datetime.utcnow()
        )

        for row in batch_data:
            # A Spark Row has no ``.get``; convert it to a dict so the
            # optional 'platform' column can be read safely.
            record = row.asDict() if hasattr(row, "asDict") else row
            jurisdiction = record['jurisdiction_name']
            url = record['url']
            platform = record.get('platform')  # reserved for scraper dispatch

            try:
                # TODO: Replace with actual scraping logic
                # For now, simulate scraping
                logger.info(f"Processing {jurisdiction}: {url}")

                # Placeholder: Would call appropriate scraper here
                # meetings = scrape_jurisdiction(url, platform)

                # Simulate success
                result.jurisdictions_processed += 1
                result.jurisdictions_succeeded += 1
                result.meetings_found += 5  # Placeholder
                result.agendas_found += 5
                result.minutes_found += 3
            except Exception as e:
                # One failing jurisdiction must not abort the whole batch.
                logger.error(f"Error processing {jurisdiction}: {e}")
                result.jurisdictions_processed += 1
                result.jurisdictions_failed += 1
                result.errors.append({
                    'jurisdiction': jurisdiction,
                    'url': url,
                    'error': str(e)
                })

        result.end_time = datetime.utcnow()
        result.duration_seconds = (result.end_time - result.start_time).total_seconds()
        return result

    def calculate_quality_metrics(self, jurisdiction_url: str) -> JurisdictionQuality:
        """
        Calculate quality metrics for a jurisdiction.

        Scores are derived from the previously stored metrics (if any). The
        returned object has ``last_scraped``/``updated_at`` set to now and
        ``total_scrapes`` incremented by one; it is not persisted here.

        Args:
            jurisdiction_url: URL of the jurisdiction

        Returns:
            JurisdictionQuality object with all scores
        """
        # Load existing metrics
        existing = self._get_existing_metrics(jurisdiction_url)

        # Load scraped data for this jurisdiction
        # (In production, query from silver/gold layers)
        # For now, create placeholder metrics
        now = datetime.utcnow()

        # Calculate completeness score
        if existing:
            total_expected = existing.total_meetings_expected or 12  # Assume monthly meetings
            total_found = existing.total_meetings_found or 0
            with_agendas = existing.meetings_with_agendas or 0
            with_minutes = existing.meetings_with_minutes or 0

            found_rate = min(total_found / total_expected, 1.0) if total_expected > 0 else 0
            agenda_rate = with_agendas / total_found if total_found > 0 else 0
            minutes_rate = with_minutes / total_found if total_found > 0 else 0

            completeness_score = (
                found_rate * 40 +    # 40%: Finding meetings
                agenda_rate * 30 +   # 30%: Having agendas
                minutes_rate * 30    # 30%: Having minutes
            )
        else:
            completeness_score = 0.0

        # Calculate reliability score
        if existing:
            total_scrapes = existing.total_scrapes or 0
            successful = existing.successful_scrapes or 0
            reliability_score = (successful / total_scrapes * 100) if total_scrapes > 0 else 0.0
        else:
            total_scrapes = 0
            reliability_score = 0.0

        # Calculate freshness score (kept as floats to match the field type)
        if existing and existing.last_scraped:
            days_since = (now - existing.last_scraped).days
            if days_since <= 1:
                freshness_score = 100.0
            elif days_since <= 7:
                freshness_score = 80.0
            elif days_since <= 30:
                freshness_score = 60.0
            else:
                freshness_score = 40.0
        else:
            freshness_score = 0.0

        # Overall quality (weighted average)
        overall_quality = (
            completeness_score * 0.5 +
            reliability_score * 0.3 +
            freshness_score * 0.2
        )

        # Determine health status. A stored null counter is treated as 0 so
        # the comparisons below cannot fail on None.
        consecutive_failures = (existing.consecutive_failures or 0) if existing else 0
        if consecutive_failures >= self.max_failures:
            health_status = HealthStatus.FAILED
        elif consecutive_failures >= 2:
            health_status = HealthStatus.DEGRADED
        elif reliability_score >= 70:
            health_status = HealthStatus.HEALTHY
        else:
            health_status = HealthStatus.UNKNOWN

        # Create metrics object
        metrics = JurisdictionQuality(
            jurisdiction_name=existing.jurisdiction_name if existing else "Unknown",
            state_code=existing.state_code if existing else "XX",
            fips_code=existing.fips_code if existing else None,
            url=jurisdiction_url,
            platform=existing.platform if existing else None,
            total_meetings_expected=existing.total_meetings_expected if existing else 12,
            total_meetings_found=existing.total_meetings_found if existing else 0,
            meetings_with_agendas=existing.meetings_with_agendas if existing else 0,
            meetings_with_minutes=existing.meetings_with_minutes if existing else 0,
            meetings_with_videos=existing.meetings_with_videos if existing else 0,
            meetings_with_transcripts=existing.meetings_with_transcripts if existing else 0,
            last_scraped=now,
            last_meeting_found=existing.last_meeting_found if existing else None,
            scraping_frequency=existing.scraping_frequency if existing else "monthly",
            consecutive_successes=existing.consecutive_successes if existing else 0,
            consecutive_failures=consecutive_failures,
            # total_scrapes was normalised above (None -> 0), so this is 1
            # for a jurisdiction without stored metrics.
            total_scrapes=total_scrapes + 1,
            successful_scrapes=existing.successful_scrapes if existing else 0,
            last_success=existing.last_success if existing else None,
            last_error=existing.last_error if existing else None,
            completeness_score=round(completeness_score, 2),
            reliability_score=round(reliability_score, 2),
            freshness_score=round(freshness_score, 2),
            overall_quality=round(overall_quality, 2),
            health_status=health_status.value,
            created_at=existing.created_at if existing else now,
            updated_at=now
        )
        return metrics

    def _get_existing_metrics(self, url: str) -> Optional[JurisdictionQuality]:
        """
        Load existing metrics for a jurisdiction.

        Best-effort: any failure to read or parse the stored row is logged
        and treated as "no existing metrics".

        Args:
            url: Jurisdiction URL used as the lookup key.

        Returns:
            The stored JurisdictionQuality, or None if unavailable.
        """
        try:
            df = self.spark.read.format("delta").load(self.quality_metrics_path)
            result = df.filter(col("url") == url).first()
            if result:
                return JurisdictionQuality.from_dict(result.asDict())
        except Exception as e:
            # Still best-effort, but leave a trace instead of hiding the cause.
            logger.debug(f"Could not load existing metrics for {url}: {e}")
        return None

    def _load_quality_metrics(self) -> Optional[DataFrame]:
        """
        Load all quality metrics.

        Returns:
            The metrics DataFrame, or None when the table cannot be read.
        """
        try:
            return self.spark.read.format("delta").load(self.quality_metrics_path)
        except Exception as e:
            logger.debug(f"Quality metrics not available at {self.quality_metrics_path}: {e}")
            return None

    def _save_batch_result(self, result: BatchResult):
        """
        Save batch result to Delta Lake.

        Args:
            result: Finished batch result; appended as one row to
                ``self.batch_results_path``.
        """
        # Convert to DataFrame
        # NOTE(review): the schema is inferred from this single row — confirm
        # that a None end_time cannot reach this point.
        data = [{
            'batch_number': result.batch_number,
            'batch_size': result.batch_size,
            'jurisdictions_processed': result.jurisdictions_processed,
            'jurisdictions_succeeded': result.jurisdictions_succeeded,
            'jurisdictions_failed': result.jurisdictions_failed,
            'meetings_found': result.meetings_found,
            'agendas_found': result.agendas_found,
            'minutes_found': result.minutes_found,
            'success_rate': result.success_rate,
            'duration_seconds': result.duration_seconds,
            'start_time': result.start_time.isoformat(),
            'end_time': result.end_time.isoformat() if result.end_time else None,
            'errors': json.dumps(result.errors)
        }]
        df = self.spark.createDataFrame(data)

        # Write to Delta Lake
        df.write \
            .format("delta") \
            .mode("append") \
            .save(self.batch_results_path)
        logger.info(f"Saved batch result {result.batch_number} to Delta Lake")

    def get_system_health_report(self) -> dict:
        """
        Generate overall system health report.

        Returns:
            Dictionary with aggregate statistics, or a ``status: no_data``
            dictionary when the metrics table is missing or empty.
        """
        no_data = {
            'status': 'no_data',
            'message': 'No quality metrics available yet'
        }

        quality_df = self._load_quality_metrics()
        if quality_df is None:
            return no_data

        # Aggregate statistics
        stats = quality_df.agg(
            count("*").alias("total_jurisdictions"),
            avg("overall_quality").alias("avg_quality"),
            avg("completeness_score").alias("avg_completeness"),
            avg("reliability_score").alias("avg_reliability"),
            spark_sum((col("health_status") == "healthy").cast("int")).alias("healthy_count"),
            spark_sum((col("health_status") == "degraded").cast("int")).alias("degraded_count"),
            spark_sum((col("health_status") == "failed").cast("int")).alias("failed_count")
        ).first()

        total = stats['total_jurisdictions'] or 0
        if total == 0:
            # avg()/sum() yield NULL over zero rows, so there is nothing to report.
            return no_data

        def _avg(name: str) -> float:
            # An all-null column also aggregates to None; report it as 0.0.
            value = stats[name]
            return round(value, 2) if value is not None else 0.0

        healthy = stats['healthy_count'] or 0
        return {
            'total_jurisdictions': total,
            'average_quality': _avg('avg_quality'),
            'average_completeness': _avg('avg_completeness'),
            'average_reliability': _avg('avg_reliability'),
            'healthy_count': healthy,
            'degraded_count': stats['degraded_count'] or 0,
            'failed_count': stats['failed_count'] or 0,
            'health_percentage': round((healthy / total) * 100, 1)
        }
if __name__ == "__main__":
    # Demo entry point. Building the processor creates (or reuses) a Spark
    # session; the batches printed below are hand-made sample results.
    processor = BatchProcessor(batch_size=10)

    for banner_line in (
        "🔄 Batch Processing Demo",
        "=" * 70,
        "\nThis would process jurisdictions in batches with quality tracking.",
        "\nExample batch results:\n",
    ):
        print(banner_line)

    # Simulate processing (would normally call process_all_jurisdictions)
    for batch_no in range(1, 4):
        result = BatchResult(
            batch_number=batch_no,
            batch_size=10,
            jurisdictions_processed=10,
            jurisdictions_succeeded=8,
            jurisdictions_failed=2,
            meetings_found=45,
            agendas_found=40,
            minutes_found=30,
            errors=[],
            start_time=datetime.utcnow(),
            end_time=datetime.utcnow() + timedelta(minutes=5),
            duration_seconds=300,
        )
        summary_lines = [
            f"Batch {result.batch_number}:",
            f" Processed: {result.jurisdictions_processed}",
            f" Success rate: {result.success_rate:.1f}%",
            f" Meetings found: {result.meetings_found}",
            f" Duration: {result.duration_seconds:.0f}s",
            "",
        ]
        for summary_line in summary_lines:
            print(summary_line)

    for footer_line in (
        "📊 System health tracking:",
        " • Quality scores per jurisdiction",
        " • Completeness, reliability, freshness metrics",
        " • Health status: healthy, degraded, failed",
        " • Automatic retry with exponential backoff",
    ):
        print(footer_line)