| """ |
| Ingestion Statistics Tracking |
| Monitors ingestion performance, duplicate rates, and error rates over time |
| """ |
|
|
| from datetime import datetime |
| from typing import Dict, List, Optional |
| import logging |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class IngestionMetrics: |
| """Track ingestion metrics over time""" |
| |
| def __init__(self): |
| self.runs: List[Dict] = [] |
| self.total_fetched = 0 |
| self.total_saved = 0 |
| self.total_duplicates = 0 |
| self.total_errors = 0 |
| self.last_run_timestamp: Optional[datetime] = None |
| |
| def record_run( |
| self, |
| fetched: int, |
| saved: int, |
| duplicates: int, |
| errors: int, |
| categories_processed: int |
| ): |
| """Record metrics from an ingestion run""" |
| duplicate_rate = (duplicates / fetched * 100) if fetched > 0 else 0 |
| error_rate = (errors / fetched * 100) if fetched > 0 else 0 |
| |
| run_data = { |
| "timestamp": datetime.now().isoformat(), |
| "fetched": fetched, |
| "saved": saved, |
| "duplicates": duplicates, |
| "errors": errors, |
| "duplicate_rate": round(duplicate_rate, 2), |
| "error_rate": round(error_rate, 2), |
| "categories_processed": categories_processed |
| } |
| |
| self.runs.append(run_data) |
| |
| |
| if len(self.runs) > 100: |
| self.runs = self.runs[-100:] |
| |
| |
| self.total_fetched += fetched |
| self.total_saved += saved |
| self.total_duplicates += duplicates |
| self.total_errors += errors |
| self.last_run_timestamp = datetime.now() |
| |
| logger.info(f"📊 Ingestion run recorded: {saved}/{fetched} saved ({duplicate_rate:.1f}% duplicates)") |
| |
| def get_stats(self) -> Dict: |
| """Get current ingestion statistics""" |
| avg_duplicate_rate = 0.0 |
| avg_error_rate = 0.0 |
| |
| if len(self.runs) > 0: |
| avg_duplicate_rate = sum(r["duplicate_rate"] for r in self.runs) / len(self.runs) |
| avg_error_rate = sum(r["error_rate"] for r in self.runs) / len(self.runs) |
| |
| return { |
| "total_runs": len(self.runs), |
| "last_run": self.last_run_timestamp.isoformat() if self.last_run_timestamp else None, |
| "lifetime_totals": { |
| "fetched": self.total_fetched, |
| "saved": self.total_saved, |
| "duplicates": self.total_duplicates, |
| "errors": self.total_errors |
| }, |
| "averages": { |
| "duplicate_rate": round(avg_duplicate_rate, 2), |
| "error_rate": round(avg_error_rate, 2) |
| }, |
| "recent_runs": self.runs[-10:] |
| } |
| |
| def check_alerts(self) -> List[Dict]: |
| """Check if any metrics exceed thresholds""" |
| alerts = [] |
| |
| if len(self.runs) == 0: |
| return alerts |
| |
| latest_run = self.runs[-1] |
| |
| |
| if latest_run["duplicate_rate"] > 90: |
| alerts.append({ |
| "severity": "warning", |
| "type": "high_duplicate_rate", |
| "message": f"Duplicate rate is {latest_run['duplicate_rate']}% (threshold: 90%)", |
| "value": latest_run["duplicate_rate"] |
| }) |
| |
| |
| if latest_run["error_rate"] > 20: |
| alerts.append({ |
| "severity": "error", |
| "type": "high_error_rate", |
| "message": f"Error rate is {latest_run['error_rate']}% (threshold: 20%)", |
| "value": latest_run["error_rate"] |
| }) |
| |
| |
| if latest_run["saved"] == 0 and latest_run["fetched"] > 0: |
| alerts.append({ |
| "severity": "critical", |
| "type": "no_articles_saved", |
| "message": f"No articles saved despite {latest_run['fetched']} fetched", |
| "value": 0 |
| }) |
| |
| return alerts |
|
|
|
|
| |
| _ingestion_metrics: Optional[IngestionMetrics] = None |
|
|
|
|
| def get_ingestion_metrics() -> IngestionMetrics: |
| """Get or create global ingestion metrics instance""" |
| global _ingestion_metrics |
| |
| if _ingestion_metrics is None: |
| _ingestion_metrics = IngestionMetrics() |
| |
| return _ingestion_metrics |
|
|