#!/usr/bin/env python3 """Run Search-UI query diagnostics and build a reviewable HTML report.""" from __future__ import annotations import argparse import html import json import re import sqlite3 import sys import time import urllib.parse from collections import Counter, defaultdict from dataclasses import dataclass from pathlib import Path from typing import Any import requests from PIL import Image, ImageDraw, ImageFont, ImageOps from search_diagnostics_quality import ( classify_query, diagnostic_query_for_visual, domain_expansions, expected_hints, ) QUERY_RE = re.compile(r"^- \[ \] (.+?)\s*$") SECTION_RE = re.compile(r"^##\s+\d+\.\s+(.+?)\s*$") @dataclass(frozen=True) class QueryItem: section: str query: str def parse_queries(path: Path) -> list[QueryItem]: items: list[QueryItem] = [] section = "Unsectioned" for line in path.read_text(encoding="utf-8").splitlines(): section_match = SECTION_RE.match(line) if section_match: section = section_match.group(1) continue query_match = QUERY_RE.match(line) if query_match: items.append(QueryItem(section=section, query=query_match.group(1))) return items def request_json(session: requests.Session, base_url: str, path: str, params: dict[str, Any], timeout: int) -> dict[str, Any]: response = session.get(f"{base_url}{path}", params=params, timeout=timeout) response.raise_for_status() return response.json() def compact_media_item(media_item: dict[str, Any] | None) -> dict[str, Any]: if not media_item: return {} return { "title": media_item.get("title"), "primaryCategory": media_item.get("primaryCategory"), "_category": media_item.get("_category"), "_subcategory": media_item.get("_subcategory"), "firstPublished": media_item.get("firstPublished"), "duration": media_item.get("duration"), "durationFormattedMinSec": media_item.get("durationFormattedMinSec"), } def compact_person(person: dict[str, Any] | None) -> dict[str, Any] | None: if not person: return None return { "id": person.get("id"), "name": person.get("name"), "reference_count": person.get("reference_count"), "appearance_count": person.get("appearance_count"), } def compact_person_search(person_search: dict[str, Any] | None) -> dict[str, Any] | None: if not person_search: return None return { "matched": person_search.get("matched"), "match_type": person_search.get("match_type"), "reason": person_search.get("reason"), "person": compact_person(person_search.get("person")), "backfilled": person_search.get("backfilled"), "backfill_count": person_search.get("backfill_count"), } def compact_result(result: dict[str, Any], rank: int) -> dict[str, Any]: snippet = re.sub(r"<[^>]+>", "", result.get("snippet") or "") snippet = " ".join(snippet.split()) media_item = compact_media_item(result.get("mediaItem")) return { "rank": rank, "natural_key": result.get("natural_key"), "frame_number": result.get("frame_number"), "score": result.get("score"), "source_method": result.get("source_method"), "matched_person": compact_person(result.get("matched_person")), "thumbnail": result.get("thumbnail"), "timestamp": result.get("timestamp"), "matched_categories": result.get("matched_categories") or [], "category_scores": (result.get("category_scores") or [])[:6], "mediaItem": media_item, "snippet": snippet[:320], } def average_hash(path: Path, hash_size: int = 8) -> int | None: try: with Image.open(path) as image: gray = ImageOps.grayscale(image).resize((hash_size, hash_size), Image.Resampling.LANCZOS) pixels = list(gray.getdata()) except (OSError, ValueError): return None avg = sum(pixels) / len(pixels) value = 0 for pixel in pixels: value = (value << 1) | int(pixel >= avg) return value def hamming(a: int, b: int) -> int: return (a ^ b).bit_count() def safe_slug(text: str, max_length: int = 72) -> str: slug = re.sub(r"[^a-z0-9]+", "-", text.lower()).strip("-") return slug[:max_length] or "query" def download_thumbnail(session: requests.Session, base_url: str, thumbnail: str, target: Path, timeout: int) -> bool: if target.exists() and target.stat().st_size > 0: return True url = urllib.parse.urljoin(base_url, thumbnail) try: response = session.get(url, timeout=timeout) response.raise_for_status() target.write_bytes(response.content) return True except requests.RequestException: return False def find_duplicate_groups(items: list[dict[str, Any]]) -> list[list[int]]: hashes: list[tuple[int, int]] = [] for item in items: if item.get("ahash") is not None: hashes.append((int(item["rank"]), int(item["ahash"]))) parent = {rank: rank for rank, _hash in hashes} def find(rank: int) -> int: while parent[rank] != rank: parent[rank] = parent[parent[rank]] rank = parent[rank] return rank def union(a: int, b: int) -> None: root_a = find(a) root_b = find(b) if root_a != root_b: parent[root_b] = root_a for index, (rank_a, hash_a) in enumerate(hashes): for rank_b, hash_b in hashes[index + 1:]: if hamming(hash_a, hash_b) <= 4: union(rank_a, rank_b) groups: dict[int, list[int]] = defaultdict(list) for rank, _hash in hashes: groups[find(rank)].append(rank) return [sorted(ranks) for ranks in groups.values() if len(ranks) > 1] def draw_contact_sheet(query: str, items: list[dict[str, Any]], output_path: Path, columns: int = 4) -> None: cell_w = 320 cell_h = 230 title_h = 44 rows = max(1, (len(items) + columns - 1) // columns) sheet = Image.new("RGB", (columns * cell_w, title_h + rows * cell_h), "white") draw = ImageDraw.Draw(sheet) font = ImageFont.load_default() draw.text((12, 12), query[:140], fill=(0, 0, 0), font=font) for idx, item in enumerate(items): row = idx // columns col = idx % columns x = col * cell_w y = title_h + row * cell_h draw.rectangle((x, y, x + cell_w - 1, y + cell_h - 1), outline=(210, 210, 210)) thumb_path = item.get("thumbnail_file") if thumb_path and Path(thumb_path).exists(): try: with Image.open(thumb_path) as thumb: thumb = thumb.convert("RGB") thumb.thumbnail((cell_w, 176), Image.Resampling.LANCZOS) tx = x + (cell_w - thumb.width) // 2 sheet.paste(thumb, (tx, y + 8)) except OSError: pass media = item.get("mediaItem") or {} label = f"#{item['rank']} score {float(item.get('score') or 0):.3f}" title = (media.get("title") or item.get("natural_key") or "")[:46] frame = item.get("frame_number") if frame is not None: label += f" frame {frame}" draw.text((x + 8, y + 184), label, fill=(0, 0, 0), font=font) draw.text((x + 8, y + 202), title, fill=(40, 40, 40), font=font) sheet.save(output_path) def render_html_report( *, output_path: Path, source_queries_path: Path, generated_at: str, query_items: list[dict[str, Any]], section_summaries: dict[str, dict[str, Any]], ) -> None: status_counts = Counter(item["status"] for item in query_items) rows = [] for item in query_items: visual = item["visual"] audio = item["audio"] top = visual["results"][0] if visual["results"] else {} top_media = top.get("mediaItem") or {} screenshot_rel = item.get("contact_sheet") duplicate_text = ", ".join(str(group) for group in visual.get("duplicate_groups") or []) reasons = "".join(f"
  • {html.escape(reason)}
  • " for reason in item.get("reasons") or []) expansions = ", ".join(item.get("domain_expansions") or []) rows.append( f""" {html.escape(item['section'])} {html.escape(item['query'])}
    {html.escape(expansions)} {html.escape(item['status'])} Visual {visual['count']}
    Audio {audio['count']} {float(top.get('score') or 0):.3f}
    {html.escape(top_media.get('title') or '')} {html.escape(duplicate_text)} {f'contact sheet' if screenshot_rel else ''} """ ) section_rows = [] for section, summary in section_summaries.items(): section_rows.append( f""" {html.escape(section)} {summary['total']} {summary.get('looks_promising', 0)} {summary.get('mixed', 0)} {summary.get('needs_review', 0)} {summary.get('fail', 0)} """ ) output_path.write_text( f""" Search UI Diagnostic Report

    Search UI Diagnostic Report

    Generated {html.escape(generated_at)} from {html.escape(str(source_queries_path))}.

    Queries: {len(query_items)}
    Looks promising: {status_counts.get('looks_promising', 0)}
    Mixed: {status_counts.get('mixed', 0)}
    Needs review: {status_counts.get('needs_review', 0)}
    Fail: {status_counts.get('fail', 0)}

    Section Summary

    {''.join(section_rows)}
    SectionTotalPromisingMixedNeeds reviewFail

    Query Details

    {''.join(rows)}
    SectionQueryStatusCountsTop Visual HitDuplicate GroupsReasonsVisuals
    """, encoding="utf-8", ) def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--queries", type=Path, required=True) parser.add_argument("--output-dir", type=Path, required=True) parser.add_argument("--base-url", default="http://localhost:8001") parser.add_argument("--limit", type=int, default=40) parser.add_argument("--thumbs", type=int, default=12) parser.add_argument("--timeout", type=int, default=90) parser.add_argument("--sleep", type=float, default=0.0) parser.add_argument("--max-queries", type=int, default=0) parser.add_argument("--resume", action="store_true") args = parser.parse_args() args.output_dir.mkdir(parents=True, exist_ok=True) thumbs_dir = args.output_dir / "thumbs" sheets_dir = args.output_dir / "contact_sheets" thumbs_dir.mkdir(exist_ok=True) sheets_dir.mkdir(exist_ok=True) summary_path = args.output_dir / "summary.json" report_path = args.output_dir / "report.html" query_items = parse_queries(args.queries) if args.max_queries: query_items = query_items[: args.max_queries] if not query_items: print("No queries parsed.", file=sys.stderr) return 2 existing: dict[str, Any] = {} if args.resume and summary_path.exists(): previous = json.loads(summary_path.read_text(encoding="utf-8")) existing = {item["query"]: item for item in previous.get("queries", [])} session = requests.Session() all_items: list[dict[str, Any]] = [] started = time.time() for index, query_item in enumerate(query_items, start=1): query = query_item.query if query in existing: all_items.append(existing[query]) print(f"[{index:03d}/{len(query_items)}] resumed {query}") continue print(f"[{index:03d}/{len(query_items)}] {query}", flush=True) query_start = time.time() visual = request_json( session, args.base_url, "/api/search", {"q": query, "language": "E", "method": "image-content", "limit": args.limit}, args.timeout, ) audio = request_json( session, args.base_url, "/api/search", {"q": query, "language": "E", "method": "hybrid", "limit": min(args.limit, 30)}, args.timeout, ) title = request_json( session, args.base_url, "/api/search-title", {"q": query, "language": "E", "limit": 20}, args.timeout, ) visual_results = [compact_result(result, rank) for rank, result in enumerate(visual.get("results") or [], start=1)] audio_results = [compact_result(result, rank) for rank, result in enumerate(audio.get("results") or [], start=1)] title_results = [compact_result(result, rank) for rank, result in enumerate(title.get("results") or [], start=1)] thumb_items: list[dict[str, Any]] = [] for result in visual_results[: args.thumbs]: thumbnail = result.get("thumbnail") if not thumbnail: continue thumb_name = f"{index:03d}-{safe_slug(query)}-r{result['rank']:02d}.jpg" thumb_path = thumbs_dir / thumb_name if download_thumbnail(session, args.base_url, thumbnail, thumb_path, args.timeout): ahash = average_hash(thumb_path) result["thumbnail_file"] = str(thumb_path) result["thumbnail_rel"] = str(thumb_path.relative_to(args.output_dir)) result["ahash"] = ahash thumb_items.append(result) duplicate_groups = find_duplicate_groups(thumb_items) visual_summary = { "method": visual.get("method"), "count": len(visual_results), "results": visual_results, "duplicate_groups": duplicate_groups, "query_time_ms": round((time.time() - query_start) * 1000, 1), "visual_query": visual.get("visual_query"), "person_search": compact_person_search(visual.get("person_search")), } audio_summary = { "method": audio.get("method"), "count": len(audio_results), "results": audio_results, } title_summary = { "method": title.get("method"), "count": len(title_results), "results": title_results, } status, reasons = classify_query(query, visual_summary, audio_summary) diagnostic_query = diagnostic_query_for_visual(query, visual_summary) contact_sheet_rel = None if thumb_items: sheet_path = sheets_dir / f"{index:03d}-{safe_slug(query)}.jpg" draw_contact_sheet(query, thumb_items, sheet_path) contact_sheet_rel = str(sheet_path.relative_to(args.output_dir)) item = { "index": index, "section": query_item.section, "query": query, "diagnostic_query": diagnostic_query, "status": status, "reasons": reasons, "domain_expansions": domain_expansions(diagnostic_query), "expected_hints": expected_hints(diagnostic_query), "contact_sheet": contact_sheet_rel, "visual": visual_summary, "audio": audio_summary, "title": title_summary, } all_items.append(item) checkpoint = { "generated_at": time.strftime("%Y-%m-%d %H:%M:%S"), "source_queries": str(args.queries), "elapsed_seconds": round(time.time() - started, 1), "queries": all_items, } summary_path.write_text(json.dumps(checkpoint, indent=2), encoding="utf-8") if args.sleep: time.sleep(args.sleep) section_summaries: dict[str, dict[str, Any]] = {} for item in all_items: summary = section_summaries.setdefault( item["section"], {"total": 0, "looks_promising": 0, "mixed": 0, "needs_review": 0, "fail": 0}, ) summary["total"] += 1 summary[item["status"]] += 1 final_summary = { "generated_at": time.strftime("%Y-%m-%d %H:%M:%S"), "source_queries": str(args.queries), "elapsed_seconds": round(time.time() - started, 1), "counts": dict(Counter(item["status"] for item in all_items)), "sections": section_summaries, "queries": all_items, } summary_path.write_text(json.dumps(final_summary, indent=2), encoding="utf-8") render_html_report( output_path=report_path, source_queries_path=args.queries, generated_at=final_summary["generated_at"], query_items=all_items, section_summaries=section_summaries, ) print(f"Wrote {summary_path}") print(f"Wrote {report_path}") return 0 if __name__ == "__main__": raise SystemExit(main())