jw-search / scripts /report-reprocess-backlog.py
jw-tools's picture
deploy: latest main (lazy-ML cold start, durable launcher, web-image search, scene search) + full-app data refresh
7ea1851 verified
#!/usr/bin/env python3
"""
Report which processed videos need a later reprocess pass.
This is intended for long-running library builds where we want to finish the
first full pass, then come back and selectively reprocess videos that were
indexed before a newer thumbnail or embedding recipe landed.
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
from collections import Counter
# Resolve the repository layout from this file's own location so the script
# works regardless of the current working directory.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))  # directory containing this script
REPO_ROOT = os.path.dirname(SCRIPT_DIR)  # parent of the script directory
BACKEND_DIR = os.path.join(REPO_ROOT, "backend")
# Put backend/ at the front of the import path (once) before the project
# imports that follow; presumably search_images, search and utils live there.
if BACKEND_DIR not in sys.path:
    sys.path.insert(0, BACKEND_DIR)
from search_images import ImageSearch, get_labels_version, get_thumbnail_sampling_version
from search import SubtitleSearch
from utils import atomic_write_json
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options for the backlog report.

    Args:
        argv: Argument strings to parse. ``None`` (the default) lets argparse
            read ``sys.argv[1:]``, which is what zero-argument callers get.
            Passing an explicit list makes the parser usable programmatically.

    Returns:
        A namespace with ``output`` (required path string) and
        ``sample_limit`` (int, default 100).

    Raises:
        SystemExit: Raised by argparse when ``--output`` is missing or an
            option value is invalid.
    """
    parser = argparse.ArgumentParser(description="Report image-pipeline reprocess backlog.")
    parser.add_argument(
        "--output",
        required=True,
        help="Path to write the JSON report.",
    )
    parser.add_argument(
        "--sample-limit",
        type=int,
        default=100,
        help="Maximum number of natural keys to include in each sample list.",
    )
    return parser.parse_args(argv)
def _summarize_backlog(
    backlog: list[tuple[str, list[str]]],
    sample_limit: int,
) -> tuple[dict[str, int], dict[str, list[str]]]:
    """Tally reasons over a backlog and collect capped sample keys per reason.

    Args:
        backlog: ``(natural_key, reasons)`` pairs. Annotated as lists, but the
            function only iterates, so any iterable of pairs works.
        sample_limit: Maximum number of natural keys kept per reason.

    Returns:
        ``(reason_counts, samples_by_reason)``: how many times each reason
        occurred, and up to ``sample_limit`` natural keys for each reason in
        encounter order.
    """
    reason_counts: Counter[str] = Counter()
    samples_by_reason: dict[str, list[str]] = {}
    for natural_key, reasons in backlog:
        for reason in reasons:
            reason_counts[reason] += 1
            # Register the reason before checking the cap so every reason seen
            # gets an entry, even when sample_limit is 0.
            samples = samples_by_reason.setdefault(reason, [])
            if len(samples) < sample_limit:
                samples.append(natural_key)
    return dict(reason_counts), samples_by_reason


def main() -> int:
    """Build the reprocess-backlog report, write it to disk, and echo it.

    Queries the image, subtitle and video-concept backlogs, summarizes each
    one by reason, writes the combined payload to ``--output`` via
    ``atomic_write_json``, and prints the same JSON to stdout.

    Returns:
        ``0``, used as the process exit status.
    """
    args = parse_args()

    image_search = ImageSearch()
    subtitle_search = SubtitleSearch()

    # NOTE(review): "E" is passed to every subtitle/concept query; presumably
    # a language code — confirm against SubtitleSearch.
    backlog = image_search.get_videos_needing_reprocessing()
    subtitle_backlog = subtitle_search.get_videos_needing_reembedding("E")
    concept_backlog = subtitle_search.get_videos_needing_concept_refresh("E")

    reason_counts, by_reason = _summarize_backlog(backlog, args.sample_limit)
    subtitle_reason_counts, subtitle_by_reason = _summarize_backlog(
        subtitle_backlog, args.sample_limit
    )
    concept_reason_counts, concept_by_reason = _summarize_backlog(
        concept_backlog, args.sample_limit
    )

    payload = {
        # Local wall-clock time; the format carries no timezone offset.
        "generated_at": time.strftime("%Y-%m-%dT%H:%M:%S"),
        "image_pipeline": {
            "current_labels_version": get_labels_version(),
            "current_thumbnail_sampling_version": get_thumbnail_sampling_version(),
            "processed_videos": image_search.count_processed_videos(),
            "videos_needing_reprocessing": len(backlog),
            "reason_counts": reason_counts,
            "sample_videos_by_reason": by_reason,
        },
        "subtitle_pipeline": {
            "current_recipe": subtitle_search.get_current_subtitle_embedding_recipe(),
            "processed_videos": subtitle_search.count_indexed_subtitles("E"),
            "videos_needing_reembedding": len(subtitle_backlog),
            "reason_counts": subtitle_reason_counts,
            "sample_videos_by_reason": subtitle_by_reason,
        },
        "video_concept_pipeline": {
            "current_recipe": subtitle_search.get_current_video_concept_recipe(),
            "processed_videos": subtitle_search.count_video_concepts("E"),
            "videos_needing_refresh": len(concept_backlog),
            "reason_counts": concept_reason_counts,
            "sample_videos_by_reason": concept_by_reason,
        },
    }

    atomic_write_json(args.output, payload, indent=2)
    print(json.dumps(payload, indent=2))
    return 0
# Script entry point: run main() and hand its return value to the interpreter
# as the process exit status.
if __name__ == "__main__":
    sys.exit(main())