# Last deploy commit 98bf60c (ramanna): "Deploy: fix newsletter parsing for
# state list items and linked bills"
"""
pipeline_status.py
------------------
Shared module for pipeline status tracking via a JSON file on disk.
Used by:
- update_data.py: calls start_pipeline(), start_step(), complete_step(), etc.
- Individual scripts: call update_progress(), log_error()
- pages/Admin.py: calls PipelineStatus.load() to poll status for UI rendering
"""
import json
import os
import tempfile
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
# Human-readable names for each pipeline script, keyed by the script path the
# orchestrator invokes. Unknown scripts fall back to their raw path (see
# PipelineStatus.start_pipeline).
SCRIPT_DISPLAY_NAMES = {
    "data_updating_scripts/get_data.py": "Pull Data from LegiScan",
    "data_updating_scripts/fix_pdf_bills.py": "Fix PDF Bill Texts",
    "data_updating_scripts/known_bills_status.py": "Update Bill Statuses",
    "data_updating_scripts/migrate_iapp_categories.py": "Classify IAPP Categories",
    "data_updating_scripts/mark_no_text_bills.py": "Mark No-Text Bills",
    "data_updating_scripts/generate_summaries.py": "Generate Summaries",
    "data_updating_scripts/generate_suggested_questions.py": "Generate Suggested Questions",
    "data_updating_scripts/generate_reports.py": "Generate Reports",
    "data_updating_scripts/build_bills_vectorstore.py": "Build Bills Vectorstore",
    "data_updating_scripts/eu_vectorstore.py": "Build EU AI Act Vectorstore",
    "data_updating_scripts/detect_changes.py": "Detect Changes & Save Snapshot",
    "data_updating_scripts/build_calendar.py": "Build Legislative Calendar",
    "data_updating_scripts/generate_newsletter.py": "Generate Newsletter Draft",
}

# Shared status file, written atomically and polled by pages/Admin.py.
STATUS_FILE = Path("data/pipeline_status.json")
# Rolling summary of past runs, capped at MAX_HISTORY_ENTRIES entries.
HISTORY_FILE = Path("data/pipeline_run_history.json")
# Per-step cap on stored error records, keeps the status file bounded.
MAX_ERRORS_PER_STEP = 100
# Maximum number of run summaries retained in the history file.
MAX_HISTORY_ENTRIES = 50
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
class PipelineStatus:
    """Tracks pipeline progress through a shared JSON status file on disk."""

    def __init__(self, status_file: Optional[Path] = None):
        # Fall back to the module-level default path when none is supplied.
        self._status_file = status_file if status_file else STATUS_FILE
        # In-memory mirror of the JSON document; populated by start_pipeline()
        # or re-read from disk by the subprocess-facing methods.
        self._data: Dict[str, Any] = {}
# ── Atomic file I/O ──────────────────────────────────────────────
def _write(self) -> None:
"""Write current state to disk atomically."""
self._status_file.parent.mkdir(parents=True, exist_ok=True)
fd, tmp_path = tempfile.mkstemp(
dir=str(self._status_file.parent), suffix=".tmp"
)
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(self._data, f, indent=2, ensure_ascii=False)
os.replace(tmp_path, str(self._status_file))
except Exception:
try:
os.unlink(tmp_path)
except OSError:
pass
raise
@staticmethod
def load(status_file: Optional[Path] = None) -> Optional[Dict[str, Any]]:
"""Read current status from disk (used by Admin.py polling loop)."""
path = status_file or STATUS_FILE
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return None
# ── Called by update_data.py (orchestrator) ──────────────────────
def start_pipeline(self, scripts: List[str], triggered_by: str = "cli") -> None:
"""Initialize a new pipeline run."""
now = _now_iso()
self._data = {
"run_id": now,
"started_at": now,
"finished_at": None,
"overall_status": "running",
"triggered_by": triggered_by,
"pid": os.getpid(),
"current_step_index": 0,
"total_steps": len(scripts),
"steps": [
{
"script": script,
"name": SCRIPT_DISPLAY_NAMES.get(script, script),
"status": "pending",
"started_at": None,
"finished_at": None,
"duration_seconds": None,
"progress": {"current": 0, "total": 0, "unit": "items"},
"errors": [],
"message": "",
}
for script in scripts
],
}
self._write()
def start_step(self, script: str) -> None:
"""Mark a script as running."""
step = self._find_step(script)
if step:
step["status"] = "running"
step["started_at"] = _now_iso()
step["message"] = "Starting..."
self._data["current_step_index"] = self._step_index(script)
self._write()
def complete_step(self, script: str, message: str = "") -> None:
"""Mark a script as completed."""
step = self._find_step(script)
if step:
now = _now_iso()
step["status"] = "completed"
step["finished_at"] = now
step["duration_seconds"] = self._calc_duration(step["started_at"])
step["message"] = message or "Completed successfully"
self._write()
def fail_step(self, script: str, message: str = "") -> None:
"""Mark a script as failed."""
step = self._find_step(script)
if step:
step["status"] = "error"
step["finished_at"] = _now_iso()
step["duration_seconds"] = self._calc_duration(step["started_at"])
step["message"] = message or "Failed"
self._write()
def skip_step(self, script: str, message: str = "") -> None:
"""Mark a step as skipped."""
step = self._find_step(script)
if step:
step["status"] = "skipped"
step["message"] = message or "Skipped"
self._write()
def complete_pipeline(self) -> None:
"""Mark the entire pipeline as complete and append to history."""
any_errors = any(s["status"] == "error" for s in self._data.get("steps", []))
self._data["finished_at"] = _now_iso()
self._data["overall_status"] = "completed_with_errors" if any_errors else "completed"
self._write()
self._append_to_history()
def fail_pipeline(self, message: str = "") -> None:
"""Mark the entire pipeline as failed and append to history."""
self._data["finished_at"] = _now_iso()
self._data["overall_status"] = "failed"
self._write()
self._append_to_history()
# ── Called by individual scripts (child subprocesses) ────────────
def update_progress(
self,
script: str,
current: int,
total: int,
unit: str = "bills",
message: str = "",
) -> None:
"""Update progress for the currently-running step.
Re-reads the file from disk since the script runs in a subprocess.
"""
data = self.load(self._status_file)
if data is None:
return
self._data = data
step = self._find_step(script)
if step:
step["progress"]["current"] = current
step["progress"]["total"] = total
step["progress"]["unit"] = unit
step["message"] = message or f"Processing {current}/{total} {unit}..."
self._write()
def log_error(
self,
script: str,
error: str,
bill_id: str = "",
bill_key: str = "",
) -> None:
"""Log an error for the currently-running step."""
data = self.load(self._status_file)
if data is None:
return
self._data = data
step = self._find_step(script)
if step:
if len(step["errors"]) < MAX_ERRORS_PER_STEP:
step["errors"].append({
"bill_id": str(bill_id),
"bill_key": bill_key,
"error": str(error)[:500],
"timestamp": _now_iso(),
})
self._write()
# ── Internal helpers ─────────────────────────────────────────────
def _find_step(self, script: str) -> Optional[Dict]:
for step in self._data.get("steps", []):
if step["script"] == script:
return step
return None
def _step_index(self, script: str) -> int:
for i, step in enumerate(self._data.get("steps", [])):
if step["script"] == script:
return i
return 0
@staticmethod
def _calc_duration(started_at: Optional[str]) -> Optional[float]:
if not started_at:
return None
try:
started = datetime.fromisoformat(started_at)
return round((datetime.now(timezone.utc) - started).total_seconds(), 1)
except Exception:
return None
    def _append_to_history(self) -> None:
        """Append a summary of this run to the history file.

        Called by complete_pipeline() / fail_pipeline() after the final
        status write. Loads the existing history (tolerating a missing or
        corrupt file), appends one summary record, caps the list at
        MAX_HISTORY_ENTRIES, and rewrites the file atomically.
        """
        # Load prior history; start fresh if the file is absent or corrupt.
        try:
            with open(HISTORY_FILE, "r", encoding="utf-8") as f:
                history = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            history = []
        steps = self._data.get("steps", [])
        total_errors = sum(len(s.get("errors", [])) for s in steps)
        # NOTE(review): "total bills" is the max progress total across steps,
        # i.e. a proxy for the dataset size, not a sum over steps — confirm
        # that is the intended metric.
        total_bills = max(
            (s.get("progress", {}).get("total", 0) for s in steps), default=0
        )
        started = datetime.fromisoformat(self._data["started_at"])
        # Fall back to "now" if the run record has no finish timestamp.
        finished_str = self._data.get("finished_at")
        finished = (
            datetime.fromisoformat(finished_str)
            if finished_str
            else datetime.now(timezone.utc)
        )
        history.append({
            "run_id": self._data.get("run_id"),
            "started_at": self._data.get("started_at"),
            "finished_at": self._data.get("finished_at"),
            "duration_seconds": round((finished - started).total_seconds(), 1),
            "overall_status": self._data.get("overall_status"),
            "total_steps": self._data.get("total_steps"),
            "steps_completed": sum(1 for s in steps if s["status"] == "completed"),
            "steps_failed": sum(1 for s in steps if s["status"] == "error"),
            "total_bills_processed": total_bills,
            "total_errors": total_errors,
            "triggered_by": self._data.get("triggered_by", "cli"),
        })
        # Cap history size
        history = history[-MAX_HISTORY_ENTRIES:]
        # Atomic rewrite (same temp-file-then-replace pattern as _write).
        HISTORY_FILE.parent.mkdir(parents=True, exist_ok=True)
        fd, tmp = tempfile.mkstemp(dir=str(HISTORY_FILE.parent), suffix=".tmp")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(history, f, indent=2, ensure_ascii=False)
            os.replace(tmp, str(HISTORY_FILE))
        except Exception:
            # NOTE(review): unlike _write(), the exception is not re-raised
            # here — a failed history write is silently dropped. Confirm this
            # best-effort behavior is intended.
            try:
                os.unlink(tmp)
            except OSError:
                pass