| from __future__ import annotations |
|
|
| import typer |
| from sqlalchemy import select |
|
|
| from pots_shutdown_tracker.application import create_app |
| from pots_shutdown_tracker.jobs.scheduler import run_connector_job |
| from pots_shutdown_tracker.models.entities import RawNotice |
| from pots_shutdown_tracker.services.bulk_lookup import cleanup_expired_jobs |
| from pots_shutdown_tracker.services.crawler import ( |
| refresh_active_notice_window, |
| refresh_embeddings_for_all, |
| sanitize_existing_notice_dates, |
| ) |
| from pots_shutdown_tracker.services.parser import reparse_failed_notices, reparse_notices_for_source |
| from pots_shutdown_tracker.services.storage import ( |
| HfStorageVisibilityError, |
| ensure_hf_storage_repo, |
| get_storage_backend, |
| normalize_blob_reference, |
| upload_storage_folder_to_hf_dataset, |
| ) |
| from pots_shutdown_tracker.utils.locks import MaintenanceLockError, maintenance_lock |
|
|
# Root Typer application; each @cli.command below registers one maintenance subcommand.
cli = typer.Typer(
    help="POTS shutdown tracker maintenance commands.",
)
|
|
|
|
@cli.command("backfill")
def backfill(
    source: str | None = typer.Option(
        default=None,
        help="Optional source code to run, e.g. att, verizon, frontier, fcc.",
    ),
) -> None:
    # Run the connector job for all sources, or only for `source` when one is given.
    # A MaintenanceLockError raised by run_connector_job is printed and turned into
    # exit status 1 (presumably another maintenance task holds the lock — confirm).
    application = create_app()
    try:
        with application.state.SessionLocal() as db_session:
            # An empty/omitted source means "no filter" (source_codes=None).
            selected_sources = None if not source else [source]
            run_connector_job(application, session=db_session, source_codes=selected_sources)
    except MaintenanceLockError as lock_error:
        typer.echo(str(lock_error))
        raise typer.Exit(code=1) from lock_error
    typer.echo("Backfill completed.")
|
|
|
|
@cli.command("reparse-failures")
def reparse_failures(
    source: str | None = typer.Option(default=None, help="Optional source code to fully reparse, e.g. lumen or fcc."),
) -> None:
    # Reparse notices under a maintenance lock, then commit.
    #   * With `source`: reparse every notice for that source in batches of 10,
    #     committing each batch and printing progress.
    #   * Without `source`: reparse only previously failed notices.
    # The lock name is per-source (lowercased) so different sources do not block
    # each other. A MaintenanceLockError is printed and turned into exit status 1.
    application = create_app()
    last_reported = 0

    def report_progress(processed: int, total: int) -> None:
        nonlocal last_reported
        # Ignore empty runs and repeated callbacks for an unchanged processed count.
        if total > 0 and processed != last_reported:
            last_reported = processed
            typer.echo(f"Reparsed {processed}/{total} {source or 'candidate'} notices...")

    if source:
        lock_name = f"reparse-{source.lower()}"
    else:
        lock_name = "reparse-failures"

    try:
        with maintenance_lock(application.state.settings.storage_path, lock_name), \
                application.state.SessionLocal() as db_session:
            if source:
                refreshed = reparse_notices_for_source(
                    db_session,
                    application.state.settings,
                    source,
                    batch_size=10,
                    commit_each_batch=True,
                    progress_callback=report_progress,
                )
            else:
                refreshed = reparse_failed_notices(db_session, application.state.settings)
            db_session.commit()
    except MaintenanceLockError as lock_error:
        typer.echo(str(lock_error))
        raise typer.Exit(code=1) from lock_error
    typer.echo(f"Reparse completed. {refreshed} notices refreshed.")
|
|
|
|
@cli.command("refresh-embeddings")
def refresh_embeddings() -> None:
    # Recompute embeddings via refresh_embeddings_for_all and commit in one transaction.
    # Any RuntimeError from the refresh or the commit is printed and turned into
    # exit status 1.
    application = create_app()
    try:
        with application.state.SessionLocal() as db_session:
            refresh_embeddings_for_all(db_session, application.state.settings)
            db_session.commit()
    except RuntimeError as refresh_error:
        typer.echo(str(refresh_error))
        raise typer.Exit(code=1) from refresh_error
    else:
        typer.echo("Embedding refresh completed.")
|
|
|
|
@cli.command("reapply-active-window")
def reapply_active_window() -> None:
    # Re-run refresh_active_notice_window, commit, and print the returned
    # 'updated' / 'activated' / 'archived' counters.
    application = create_app()
    with application.state.SessionLocal() as db_session:
        counts = refresh_active_notice_window(db_session, application.state.settings)
        db_session.commit()
    summary = (
        "Active corpus window reapplied. "
        f"{counts['updated']} notices updated, "
        f"{counts['activated']} activated, "
        f"{counts['archived']} archived."
    )
    typer.echo(summary)
|
|
|
|
@cli.command("sanitize-notice-dates")
def sanitize_notice_dates() -> None:
    # Run sanitize_existing_notice_dates over stored notices, commit, and print the
    # returned 'updated' / 'cleared_date_fields' / 'activated' / 'archived' counters.
    application = create_app()
    with application.state.SessionLocal() as db_session:
        counts = sanitize_existing_notice_dates(db_session, application.state.settings)
        db_session.commit()
    summary = (
        "Notice date sanitization completed. "
        f"{counts['updated']} notices updated, "
        f"{counts['cleared_date_fields']} date fields cleared, "
        f"{counts['activated']} activated, "
        f"{counts['archived']} archived."
    )
    typer.echo(summary)
|
|
|
|
@cli.command("cleanup-bulk-lookup-jobs")
def cleanup_bulk_lookup_jobs() -> None:
    # Expire stale bulk-lookup jobs via cleanup_expired_jobs, passing it the
    # configured storage backend, then commit and print the
    # 'swept' / 'blobs_deleted' counters.
    application = create_app()
    with application.state.SessionLocal() as db_session:
        settings = application.state.settings
        outcome = cleanup_expired_jobs(db_session, settings, get_storage_backend(settings))
        db_session.commit()
    swept, deleted = outcome["swept"], outcome["blobs_deleted"]
    typer.echo(f"Bulk lookup cleanup completed. {swept} jobs expired, {deleted} blobs deleted.")
|
|
|
|
@cli.command("sync-storage-to-hf-dataset")
def sync_storage_to_hf_dataset(
    private_repo: bool = typer.Option(True, help="Create the dataset repo as private if it does not exist."),
) -> None:
    # Push the local storage folder to the configured Hugging Face dataset repo,
    # then rewrite each RawNotice.raw_blob_path to the value returned by
    # normalize_blob_reference (using the configured HF path prefix).
    #
    # Reported counters:
    #   uploaded   - notices whose blob path normalized to a truthy reference
    #   normalized - the subset whose stored path actually changed
    #   missing    - notices whose blob path normalized to a falsy value
    # NOTE(review): `uploaded` counts references, not per-file upload results;
    # the whole folder is uploaded in a single call above the loop.
    #
    # Exits with status 1 when HF storage is disabled or has no repo id, or when
    # HfStorageVisibilityError / MaintenanceLockError is raised.
    application = create_app()
    settings = application.state.settings

    if not settings.hf_storage_enabled:
        typer.echo("Set POTS_TRACKER_STORAGE_BACKEND=huggingface_dataset before syncing storage.")
        raise typer.Exit(code=1)
    if not settings.hf_storage_repo_id:
        typer.echo("Set POTS_TRACKER_HF_STORAGE_REPO_ID before syncing storage.")
        raise typer.Exit(code=1)

    uploaded = normalized = missing = 0

    try:
        with maintenance_lock(settings.storage_path, "sync-storage"):
            ensure_hf_storage_repo(settings, private=private_repo)
            upload_storage_folder_to_hf_dataset(settings, commit_message="Sync local storage archive")
            with application.state.SessionLocal() as db_session:
                query = (
                    select(RawNotice)
                    .where(RawNotice.raw_blob_path.is_not(None))
                    .order_by(RawNotice.id)
                )
                prefix = settings.hf_storage_path_prefix
                for notice in db_session.scalars(query).all():
                    canonical = normalize_blob_reference(notice.raw_blob_path, prefix=prefix)
                    if canonical:
                        uploaded += 1
                        # Only write back when the stored reference differs.
                        if notice.raw_blob_path != canonical:
                            notice.raw_blob_path = canonical
                            normalized += 1
                    else:
                        missing += 1
                db_session.commit()
    except (HfStorageVisibilityError, MaintenanceLockError) as sync_error:
        typer.echo(str(sync_error))
        raise typer.Exit(code=1) from sync_error

    typer.echo(
        "HF dataset storage sync completed. "
        f"{uploaded} blobs uploaded or refreshed, "
        f"{normalized} blob references normalized, "
        f"{missing} missing."
    )
|
|
|
|
if __name__ == "__main__":
    # Executing this file directly dispatches argv to the Typer application.
    cli()
|
|