Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """ | |
| pipeline_status.py | |
| ------------------ | |
| Shared module for pipeline status tracking via a JSON file on disk. | |
| Used by: | |
| - update_data.py: calls start_pipeline(), start_step(), complete_step(), etc. | |
| - Individual scripts: call update_progress(), log_error() | |
| - pages/Admin.py: calls PipelineStatus.load() to poll status for UI rendering | |
| """ | |
import json
import os
import tempfile
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
# Human-readable names for each pipeline script (keyed by repo-relative path);
# SCRIPT_DISPLAY_NAMES.get(script, script) falls back to the raw path.
SCRIPT_DISPLAY_NAMES = {
    "data_updating_scripts/get_data.py": "Pull Data from LegiScan",
    "data_updating_scripts/fix_pdf_bills.py": "Fix PDF Bill Texts",
    "data_updating_scripts/known_bills_status.py": "Update Bill Statuses",
    "data_updating_scripts/migrate_iapp_categories.py": "Classify IAPP Categories",
    "data_updating_scripts/mark_no_text_bills.py": "Mark No-Text Bills",
    "data_updating_scripts/generate_summaries.py": "Generate Summaries",
    "data_updating_scripts/generate_suggested_questions.py": "Generate Suggested Questions",
    "data_updating_scripts/generate_reports.py": "Generate Reports",
    "data_updating_scripts/build_bills_vectorstore.py": "Build Bills Vectorstore",
    "data_updating_scripts/eu_vectorstore.py": "Build EU AI Act Vectorstore",
    "data_updating_scripts/detect_changes.py": "Detect Changes & Save Snapshot",
    "data_updating_scripts/build_calendar.py": "Build Legislative Calendar",
    "data_updating_scripts/generate_newsletter.py": "Generate Newsletter Draft",
}

# Live status of the current (or most recent) run; polled by pages/Admin.py.
STATUS_FILE = Path("data/pipeline_status.json")
# Rolling summary of past runs; appended to on pipeline completion/failure.
HISTORY_FILE = Path("data/pipeline_run_history.json")
# Cap on stored errors per step — log_error() drops anything beyond this.
MAX_ERRORS_PER_STEP = 100
# Cap on run-history entries kept on disk (oldest entries trimmed).
MAX_HISTORY_ENTRIES = 50
| def _now_iso() -> str: | |
| return datetime.now(timezone.utc).isoformat() | |
class PipelineStatus:
    """Manages pipeline status tracking via a shared JSON file on disk."""

    def __init__(self, status_file: Optional[Path] = None):
        """Bind this tracker to *status_file*, defaulting to STATUS_FILE."""
        # Path of the JSON status file this instance reads/writes.
        chosen = status_file or STATUS_FILE
        self._status_file = chosen
        # In-memory run state; flushed to disk by _write().
        self._data: Dict[str, Any] = {}
| # ββ Atomic file I/O ββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _write(self) -> None: | |
| """Write current state to disk atomically.""" | |
| self._status_file.parent.mkdir(parents=True, exist_ok=True) | |
| fd, tmp_path = tempfile.mkstemp( | |
| dir=str(self._status_file.parent), suffix=".tmp" | |
| ) | |
| try: | |
| with os.fdopen(fd, "w", encoding="utf-8") as f: | |
| json.dump(self._data, f, indent=2, ensure_ascii=False) | |
| os.replace(tmp_path, str(self._status_file)) | |
| except Exception: | |
| try: | |
| os.unlink(tmp_path) | |
| except OSError: | |
| pass | |
| raise | |
| def load(status_file: Optional[Path] = None) -> Optional[Dict[str, Any]]: | |
| """Read current status from disk (used by Admin.py polling loop).""" | |
| path = status_file or STATUS_FILE | |
| try: | |
| with open(path, "r", encoding="utf-8") as f: | |
| return json.load(f) | |
| except (FileNotFoundError, json.JSONDecodeError): | |
| return None | |
| # ββ Called by update_data.py (orchestrator) ββββββββββββββββββββββ | |
| def start_pipeline(self, scripts: List[str], triggered_by: str = "cli") -> None: | |
| """Initialize a new pipeline run.""" | |
| now = _now_iso() | |
| self._data = { | |
| "run_id": now, | |
| "started_at": now, | |
| "finished_at": None, | |
| "overall_status": "running", | |
| "triggered_by": triggered_by, | |
| "pid": os.getpid(), | |
| "current_step_index": 0, | |
| "total_steps": len(scripts), | |
| "steps": [ | |
| { | |
| "script": script, | |
| "name": SCRIPT_DISPLAY_NAMES.get(script, script), | |
| "status": "pending", | |
| "started_at": None, | |
| "finished_at": None, | |
| "duration_seconds": None, | |
| "progress": {"current": 0, "total": 0, "unit": "items"}, | |
| "errors": [], | |
| "message": "", | |
| } | |
| for script in scripts | |
| ], | |
| } | |
| self._write() | |
| def start_step(self, script: str) -> None: | |
| """Mark a script as running.""" | |
| step = self._find_step(script) | |
| if step: | |
| step["status"] = "running" | |
| step["started_at"] = _now_iso() | |
| step["message"] = "Starting..." | |
| self._data["current_step_index"] = self._step_index(script) | |
| self._write() | |
| def complete_step(self, script: str, message: str = "") -> None: | |
| """Mark a script as completed.""" | |
| step = self._find_step(script) | |
| if step: | |
| now = _now_iso() | |
| step["status"] = "completed" | |
| step["finished_at"] = now | |
| step["duration_seconds"] = self._calc_duration(step["started_at"]) | |
| step["message"] = message or "Completed successfully" | |
| self._write() | |
| def fail_step(self, script: str, message: str = "") -> None: | |
| """Mark a script as failed.""" | |
| step = self._find_step(script) | |
| if step: | |
| step["status"] = "error" | |
| step["finished_at"] = _now_iso() | |
| step["duration_seconds"] = self._calc_duration(step["started_at"]) | |
| step["message"] = message or "Failed" | |
| self._write() | |
| def skip_step(self, script: str, message: str = "") -> None: | |
| """Mark a step as skipped.""" | |
| step = self._find_step(script) | |
| if step: | |
| step["status"] = "skipped" | |
| step["message"] = message or "Skipped" | |
| self._write() | |
| def complete_pipeline(self) -> None: | |
| """Mark the entire pipeline as complete and append to history.""" | |
| any_errors = any(s["status"] == "error" for s in self._data.get("steps", [])) | |
| self._data["finished_at"] = _now_iso() | |
| self._data["overall_status"] = "completed_with_errors" if any_errors else "completed" | |
| self._write() | |
| self._append_to_history() | |
| def fail_pipeline(self, message: str = "") -> None: | |
| """Mark the entire pipeline as failed and append to history.""" | |
| self._data["finished_at"] = _now_iso() | |
| self._data["overall_status"] = "failed" | |
| self._write() | |
| self._append_to_history() | |
| # ββ Called by individual scripts (child subprocesses) ββββββββββββ | |
| def update_progress( | |
| self, | |
| script: str, | |
| current: int, | |
| total: int, | |
| unit: str = "bills", | |
| message: str = "", | |
| ) -> None: | |
| """Update progress for the currently-running step. | |
| Re-reads the file from disk since the script runs in a subprocess. | |
| """ | |
| data = self.load(self._status_file) | |
| if data is None: | |
| return | |
| self._data = data | |
| step = self._find_step(script) | |
| if step: | |
| step["progress"]["current"] = current | |
| step["progress"]["total"] = total | |
| step["progress"]["unit"] = unit | |
| step["message"] = message or f"Processing {current}/{total} {unit}..." | |
| self._write() | |
| def log_error( | |
| self, | |
| script: str, | |
| error: str, | |
| bill_id: str = "", | |
| bill_key: str = "", | |
| ) -> None: | |
| """Log an error for the currently-running step.""" | |
| data = self.load(self._status_file) | |
| if data is None: | |
| return | |
| self._data = data | |
| step = self._find_step(script) | |
| if step: | |
| if len(step["errors"]) < MAX_ERRORS_PER_STEP: | |
| step["errors"].append({ | |
| "bill_id": str(bill_id), | |
| "bill_key": bill_key, | |
| "error": str(error)[:500], | |
| "timestamp": _now_iso(), | |
| }) | |
| self._write() | |
| # ββ Internal helpers βββββββββββββββββββββββββββββββββββββββββββββ | |
| def _find_step(self, script: str) -> Optional[Dict]: | |
| for step in self._data.get("steps", []): | |
| if step["script"] == script: | |
| return step | |
| return None | |
| def _step_index(self, script: str) -> int: | |
| for i, step in enumerate(self._data.get("steps", [])): | |
| if step["script"] == script: | |
| return i | |
| return 0 | |
| def _calc_duration(started_at: Optional[str]) -> Optional[float]: | |
| if not started_at: | |
| return None | |
| try: | |
| started = datetime.fromisoformat(started_at) | |
| return round((datetime.now(timezone.utc) - started).total_seconds(), 1) | |
| except Exception: | |
| return None | |
    def _append_to_history(self) -> None:
        """Append a summary of this run to the history file.

        Reads the existing history, appends one summary entry for the current
        run, trims to MAX_HISTORY_ENTRIES, and writes the file back atomically
        (temp file + os.replace, mirroring _write()).

        NOTE: writes to the module-level HISTORY_FILE, not a path derived
        from self._status_file — a custom status_file does not redirect
        history.
        """
        # Best-effort read of existing history; start fresh if missing/corrupt.
        try:
            with open(HISTORY_FILE, "r", encoding="utf-8") as f:
                history = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            history = []
        steps = self._data.get("steps", [])
        total_errors = sum(len(s.get("errors", [])) for s in steps)
        # "Bills processed" = the largest progress total any step reported.
        total_bills = max(
            (s.get("progress", {}).get("total", 0) for s in steps), default=0
        )
        started = datetime.fromisoformat(self._data["started_at"])
        finished_str = self._data.get("finished_at")
        # Fall back to "now" if the run record has no finish timestamp.
        finished = (
            datetime.fromisoformat(finished_str)
            if finished_str
            else datetime.now(timezone.utc)
        )
        history.append({
            "run_id": self._data.get("run_id"),
            "started_at": self._data.get("started_at"),
            "finished_at": self._data.get("finished_at"),
            "duration_seconds": round((finished - started).total_seconds(), 1),
            "overall_status": self._data.get("overall_status"),
            "total_steps": self._data.get("total_steps"),
            "steps_completed": sum(1 for s in steps if s["status"] == "completed"),
            "steps_failed": sum(1 for s in steps if s["status"] == "error"),
            "total_bills_processed": total_bills,
            "total_errors": total_errors,
            "triggered_by": self._data.get("triggered_by", "cli"),
        })
        # Cap history size
        history = history[-MAX_HISTORY_ENTRIES:]
        HISTORY_FILE.parent.mkdir(parents=True, exist_ok=True)
        # Atomic write: dump to a temp file, then replace the real file.
        fd, tmp = tempfile.mkstemp(dir=str(HISTORY_FILE.parent), suffix=".tmp")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(history, f, indent=2, ensure_ascii=False)
            os.replace(tmp, str(HISTORY_FILE))
        except Exception:
            try:
                os.unlink(tmp)
            except OSError:
                pass
            # NOTE(review): unlike _write(), no re-raise is visible after the
            # cleanup here (the file may be truncated at this point in the
            # view) — as shown, a failed history write is silently swallowed.
            # Confirm whether a trailing `raise` was intended.