#!/usr/bin/env python3 """Run Search-UI query diagnostics and build a reviewable HTML report.""" from __future__ import annotations import argparse import html import json import re import sqlite3 import sys import time import urllib.parse from collections import Counter, defaultdict from dataclasses import dataclass from pathlib import Path from typing import Any import requests from PIL import Image, ImageDraw, ImageFont, ImageOps from search_diagnostics_quality import ( classify_query, diagnostic_query_for_visual, domain_expansions, expected_hints, ) QUERY_RE = re.compile(r"^- \[ \] (.+?)\s*$") SECTION_RE = re.compile(r"^##\s+\d+\.\s+(.+?)\s*$") @dataclass(frozen=True) class QueryItem: section: str query: str def parse_queries(path: Path) -> list[QueryItem]: items: list[QueryItem] = [] section = "Unsectioned" for line in path.read_text(encoding="utf-8").splitlines(): section_match = SECTION_RE.match(line) if section_match: section = section_match.group(1) continue query_match = QUERY_RE.match(line) if query_match: items.append(QueryItem(section=section, query=query_match.group(1))) return items def request_json(session: requests.Session, base_url: str, path: str, params: dict[str, Any], timeout: int) -> dict[str, Any]: response = session.get(f"{base_url}{path}", params=params, timeout=timeout) response.raise_for_status() return response.json() def compact_media_item(media_item: dict[str, Any] | None) -> dict[str, Any]: if not media_item: return {} return { "title": media_item.get("title"), "primaryCategory": media_item.get("primaryCategory"), "_category": media_item.get("_category"), "_subcategory": media_item.get("_subcategory"), "firstPublished": media_item.get("firstPublished"), "duration": media_item.get("duration"), "durationFormattedMinSec": media_item.get("durationFormattedMinSec"), } def compact_person(person: dict[str, Any] | None) -> dict[str, Any] | None: if not person: return None return { "id": person.get("id"), "name": person.get("name"), "reference_count": person.get("reference_count"), "appearance_count": person.get("appearance_count"), } def compact_person_search(person_search: dict[str, Any] | None) -> dict[str, Any] | None: if not person_search: return None return { "matched": person_search.get("matched"), "match_type": person_search.get("match_type"), "reason": person_search.get("reason"), "person": compact_person(person_search.get("person")), "backfilled": person_search.get("backfilled"), "backfill_count": person_search.get("backfill_count"), } def compact_result(result: dict[str, Any], rank: int) -> dict[str, Any]: snippet = re.sub(r"<[^>]+>", "", result.get("snippet") or "") snippet = " ".join(snippet.split()) media_item = compact_media_item(result.get("mediaItem")) return { "rank": rank, "natural_key": result.get("natural_key"), "frame_number": result.get("frame_number"), "score": result.get("score"), "source_method": result.get("source_method"), "matched_person": compact_person(result.get("matched_person")), "thumbnail": result.get("thumbnail"), "timestamp": result.get("timestamp"), "matched_categories": result.get("matched_categories") or [], "category_scores": (result.get("category_scores") or [])[:6], "mediaItem": media_item, "snippet": snippet[:320], } def average_hash(path: Path, hash_size: int = 8) -> int | None: try: with Image.open(path) as image: gray = ImageOps.grayscale(image).resize((hash_size, hash_size), Image.Resampling.LANCZOS) pixels = list(gray.getdata()) except (OSError, ValueError): return None avg = sum(pixels) / len(pixels) value = 0 for pixel in pixels: value = (value << 1) | int(pixel >= avg) return value def hamming(a: int, b: int) -> int: return (a ^ b).bit_count() def safe_slug(text: str, max_length: int = 72) -> str: slug = re.sub(r"[^a-z0-9]+", "-", text.lower()).strip("-") return slug[:max_length] or "query" def download_thumbnail(session: requests.Session, base_url: str, thumbnail: str, target: Path, timeout: int) -> bool: if target.exists() and target.stat().st_size > 0: return True url = urllib.parse.urljoin(base_url, thumbnail) try: response = session.get(url, timeout=timeout) response.raise_for_status() target.write_bytes(response.content) return True except requests.RequestException: return False def find_duplicate_groups(items: list[dict[str, Any]]) -> list[list[int]]: hashes: list[tuple[int, int]] = [] for item in items: if item.get("ahash") is not None: hashes.append((int(item["rank"]), int(item["ahash"]))) parent = {rank: rank for rank, _hash in hashes} def find(rank: int) -> int: while parent[rank] != rank: parent[rank] = parent[parent[rank]] rank = parent[rank] return rank def union(a: int, b: int) -> None: root_a = find(a) root_b = find(b) if root_a != root_b: parent[root_b] = root_a for index, (rank_a, hash_a) in enumerate(hashes): for rank_b, hash_b in hashes[index + 1:]: if hamming(hash_a, hash_b) <= 4: union(rank_a, rank_b) groups: dict[int, list[int]] = defaultdict(list) for rank, _hash in hashes: groups[find(rank)].append(rank) return [sorted(ranks) for ranks in groups.values() if len(ranks) > 1] def draw_contact_sheet(query: str, items: list[dict[str, Any]], output_path: Path, columns: int = 4) -> None: cell_w = 320 cell_h = 230 title_h = 44 rows = max(1, (len(items) + columns - 1) // columns) sheet = Image.new("RGB", (columns * cell_w, title_h + rows * cell_h), "white") draw = ImageDraw.Draw(sheet) font = ImageFont.load_default() draw.text((12, 12), query[:140], fill=(0, 0, 0), font=font) for idx, item in enumerate(items): row = idx // columns col = idx % columns x = col * cell_w y = title_h + row * cell_h draw.rectangle((x, y, x + cell_w - 1, y + cell_h - 1), outline=(210, 210, 210)) thumb_path = item.get("thumbnail_file") if thumb_path and Path(thumb_path).exists(): try: with Image.open(thumb_path) as thumb: thumb = thumb.convert("RGB") thumb.thumbnail((cell_w, 176), Image.Resampling.LANCZOS) tx = x + (cell_w - thumb.width) // 2 sheet.paste(thumb, (tx, y + 8)) except OSError: pass media = item.get("mediaItem") or {} label = f"#{item['rank']} score {float(item.get('score') or 0):.3f}" title = (media.get("title") or item.get("natural_key") or "")[:46] frame = item.get("frame_number") if frame is not None: label += f" frame {frame}" draw.text((x + 8, y + 184), label, fill=(0, 0, 0), font=font) draw.text((x + 8, y + 202), title, fill=(40, 40, 40), font=font) sheet.save(output_path) def render_html_report( *, output_path: Path, source_queries_path: Path, generated_at: str, query_items: list[dict[str, Any]], section_summaries: dict[str, dict[str, Any]], ) -> None: status_counts = Counter(item["status"] for item in query_items) rows = [] for item in query_items: visual = item["visual"] audio = item["audio"] top = visual["results"][0] if visual["results"] else {} top_media = top.get("mediaItem") or {} screenshot_rel = item.get("contact_sheet") duplicate_text = ", ".join(str(group) for group in visual.get("duplicate_groups") or []) reasons = "".join(f"
Generated {html.escape(generated_at)} from {html.escape(str(source_queries_path))}.
| Section | Total | Promising | Mixed | Needs review | Fail |
|---|
| Section | Query | Status | Counts | Top Visual Hit | Duplicate Groups | Reasons | Visuals |
|---|