github-actions
Deploy e9638c4ddc3ed29a18779b38f43922aa3139b311
611bfd9
from __future__ import annotations
import typer
from sqlalchemy import select
from pots_shutdown_tracker.application import create_app
from pots_shutdown_tracker.jobs.scheduler import run_connector_job
from pots_shutdown_tracker.models.entities import RawNotice
from pots_shutdown_tracker.services.bulk_lookup import cleanup_expired_jobs
from pots_shutdown_tracker.services.crawler import (
refresh_active_notice_window,
refresh_embeddings_for_all,
sanitize_existing_notice_dates,
)
from pots_shutdown_tracker.services.parser import reparse_failed_notices, reparse_notices_for_source
from pots_shutdown_tracker.services.storage import (
HfStorageVisibilityError,
ensure_hf_storage_repo,
get_storage_backend,
normalize_blob_reference,
upload_storage_folder_to_hf_dataset,
)
from pots_shutdown_tracker.utils.locks import MaintenanceLockError, maintenance_lock
# Root Typer application; every maintenance command in this module registers
# itself on it through the ``@cli.command(...)`` decorator.
cli = typer.Typer(
    help="POTS shutdown tracker maintenance commands.",
)
@cli.command("backfill")
def backfill(
    source: str | None = typer.Option(
        default=None,
        help="Optional source code to run, e.g. att, verizon, frontier, fcc.",
    ),
) -> None:
    """Run the connector job, optionally restricted to a single source."""
    application = create_app()
    # A missing/empty source is passed through as None (presumably "all
    # sources" inside run_connector_job -- confirm there).
    selected_sources = None
    if source:
        selected_sources = [source]
    try:
        with application.state.SessionLocal() as db_session:
            run_connector_job(application, session=db_session, source_codes=selected_sources)
    except MaintenanceLockError as lock_error:
        # NOTE(review): assumes run_connector_job raises this when another
        # maintenance task holds the lock; surface it as a failed command.
        typer.echo(str(lock_error))
        raise typer.Exit(code=1) from lock_error
    typer.echo("Backfill completed.")
@cli.command("reparse-failures")
def reparse_failures(
    source: str | None = typer.Option(
        default=None,
        help="Optional source code to fully reparse, e.g. lumen or fcc.",
    ),
) -> None:
    """Reparse failed notices, or fully reparse one source's notices."""
    application = create_app()
    settings = application.state.settings
    last_reported = 0

    def report_progress(processed: int, total: int) -> None:
        # Only echo when there is work to report and the processed count has
        # moved since the previous callback (suppresses duplicate lines).
        nonlocal last_reported
        if total > 0 and processed != last_reported:
            last_reported = processed
            typer.echo(f"Reparsed {processed}/{total} {source or 'candidate'} notices...")

    # Per-source runs get their own lock name so different sources do not
    # share one lock; the source is lower-cased only for the lock name.
    if source:
        lock_name = f"reparse-{source.lower()}"
    else:
        lock_name = "reparse-failures"

    try:
        with maintenance_lock(settings.storage_path, lock_name), application.state.SessionLocal() as db_session:
            if source:
                # Full reparse of one source, committed in batches of 10 with
                # progress reporting.
                reparsed = reparse_notices_for_source(
                    db_session,
                    settings,
                    source,
                    batch_size=10,
                    commit_each_batch=True,
                    progress_callback=report_progress,
                )
            else:
                reparsed = reparse_failed_notices(db_session, settings)
            db_session.commit()
    except MaintenanceLockError as lock_error:
        typer.echo(str(lock_error))
        raise typer.Exit(code=1) from lock_error
    typer.echo(f"Reparse completed. {reparsed} notices refreshed.")
@cli.command("refresh-embeddings")
def refresh_embeddings() -> None:
    """Run the full embedding refresh and commit it."""
    application = create_app()
    try:
        with application.state.SessionLocal() as db_session:
            refresh_embeddings_for_all(db_session, application.state.settings)
            db_session.commit()
    except RuntimeError as failure:
        # Any RuntimeError raised while refreshing/committing is reported as a
        # plain message and a non-zero exit instead of a traceback.
        typer.echo(str(failure))
        raise typer.Exit(code=1) from failure
    else:
        typer.echo("Embedding refresh completed.")
@cli.command("reapply-active-window")
def reapply_active_window() -> None:
    """Re-apply the active corpus window and report the resulting counts."""
    application = create_app()
    with application.state.SessionLocal() as db_session:
        counts = refresh_active_notice_window(db_session, application.state.settings)
        db_session.commit()
    # `counts` is expected to expose 'updated', 'activated' and 'archived'.
    summary = (
        "Active corpus window reapplied. "
        f"{counts['updated']} notices updated, "
        f"{counts['activated']} activated, "
        f"{counts['archived']} archived."
    )
    typer.echo(summary)
@cli.command("sanitize-notice-dates")
def sanitize_notice_dates() -> None:
    """Sanitize date fields on existing notices and report the resulting counts."""
    application = create_app()
    with application.state.SessionLocal() as db_session:
        counts = sanitize_existing_notice_dates(db_session, application.state.settings)
        db_session.commit()
    # `counts` is expected to expose 'updated', 'cleared_date_fields',
    # 'activated' and 'archived'.
    summary = (
        "Notice date sanitization completed. "
        f"{counts['updated']} notices updated, "
        f"{counts['cleared_date_fields']} date fields cleared, "
        f"{counts['activated']} activated, "
        f"{counts['archived']} archived."
    )
    typer.echo(summary)
@cli.command("cleanup-bulk-lookup-jobs")
def cleanup_bulk_lookup_jobs() -> None:
    """Sweep expired bulk-lookup jobs and delete their stored blobs."""
    application = create_app()
    settings = application.state.settings
    with application.state.SessionLocal() as db_session:
        storage_backend = get_storage_backend(settings)
        counts = cleanup_expired_jobs(db_session, settings, storage_backend)
        db_session.commit()
    jobs_expired = counts["swept"]
    blobs_removed = counts["blobs_deleted"]
    typer.echo(
        "Bulk lookup cleanup completed. "
        f"{jobs_expired} jobs expired, {blobs_removed} blobs deleted."
    )
def _normalize_notice_blob_references(session, settings) -> tuple[int, int, int]:
    """Rewrite each stored ``RawNotice.raw_blob_path`` to its normalized form.

    Only notices with a non-null ``raw_blob_path`` are visited, in id order.
    Each path is passed through ``normalize_blob_reference`` with
    ``settings.hf_storage_path_prefix``.

    Returns a ``(resolved, rewritten, unresolved)`` tuple:
    ``resolved`` counts notices whose reference normalized to a truthy value,
    ``rewritten`` counts the subset whose stored value actually changed, and
    ``unresolved`` counts notices whose reference normalized to a falsy value.

    Does not commit; the caller owns the transaction.
    """
    resolved = rewritten = unresolved = 0
    query = (
        select(RawNotice)
        .where(RawNotice.raw_blob_path.is_not(None))
        .order_by(RawNotice.id)
    )
    # Materialize the rows before mutating them inside the loop.
    for notice in session.scalars(query).all():
        normalized_ref = normalize_blob_reference(
            notice.raw_blob_path,
            prefix=settings.hf_storage_path_prefix,
        )
        if not normalized_ref:
            unresolved += 1
            continue
        resolved += 1
        if notice.raw_blob_path != normalized_ref:
            notice.raw_blob_path = normalized_ref
            rewritten += 1
    return resolved, rewritten, unresolved


@cli.command("sync-storage-to-hf-dataset")
def sync_storage_to_hf_dataset(
    private_repo: bool = typer.Option(True, help="Create the dataset repo as private if it does not exist."),
) -> None:
    """Upload local storage to the HF dataset repo and normalize stored blob references."""
    application = create_app()
    settings = application.state.settings

    # Fail fast with a configuration hint; the repo id is only checked once
    # the HF backend is known to be enabled.
    config_problem = None
    if not settings.hf_storage_enabled:
        config_problem = "Set POTS_TRACKER_STORAGE_BACKEND=huggingface_dataset before syncing storage."
    elif not settings.hf_storage_repo_id:
        config_problem = "Set POTS_TRACKER_HF_STORAGE_REPO_ID before syncing storage."
    if config_problem is not None:
        typer.echo(config_problem)
        raise typer.Exit(code=1)

    try:
        with maintenance_lock(settings.storage_path, "sync-storage"):
            ensure_hf_storage_repo(settings, private=private_repo)
            upload_storage_folder_to_hf_dataset(settings, commit_message="Sync local storage archive")
            with application.state.SessionLocal() as db_session:
                uploaded, normalized, missing = _normalize_notice_blob_references(db_session, settings)
                db_session.commit()
    except (HfStorageVisibilityError, MaintenanceLockError) as exc:
        # Both failures get the same treatment: print the message, exit 1.
        typer.echo(str(exc))
        raise typer.Exit(code=1) from exc

    # NOTE(review): "uploaded" counts notices with a resolvable reference,
    # not files actually pushed by upload_storage_folder_to_hf_dataset.
    typer.echo(
        "HF dataset storage sync completed. "
        f"{uploaded} blobs uploaded or refreshed, "
        f"{normalized} blob references normalized, "
        f"{missing} missing."
    )
# Entry point when this module is executed directly rather than imported.
if __name__ == "__main__":
    cli()