| |
| """Run Search-UI query diagnostics and build a reviewable HTML report.""" |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import html |
| import json |
| import re |
| import sqlite3 |
| import sys |
| import time |
| import urllib.parse |
| from collections import Counter, defaultdict |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Any |
|
|
| import requests |
| from PIL import Image, ImageDraw, ImageFont, ImageOps |
|
|
| from search_diagnostics_quality import ( |
| classify_query, |
| diagnostic_query_for_visual, |
| domain_expansions, |
| expected_hints, |
| ) |
|
|
|
|
# Matches an unchecked Markdown checklist entry ("- [ ] <text>") and captures
# the text, without trailing whitespace, as group 1.
QUERY_RE = re.compile(r"^- \[ \] (.+?)\s*$")
# Matches a numbered level-2 Markdown heading ("## <number>. <title>") and
# captures the title, without trailing whitespace, as group 1.
SECTION_RE = re.compile(r"^##\s+\d+\.\s+(.+?)\s*$")
|
|
|
|
@dataclass(frozen=True)
class QueryItem:
    """Immutable pairing of a query string with the section it was listed under."""

    # Title of the section heading the query belongs to.
    section: str
    # The query text itself.
    query: str
|
|
|
|
def parse_queries(path: Path) -> list[QueryItem]:
    """Extract checklist queries from the UTF-8 text file at *path*.

    Lines matching ``SECTION_RE`` switch the current section; lines matching
    ``QUERY_RE`` become a ``QueryItem`` tagged with that section. Queries that
    appear before any section heading are filed under ``"Unsectioned"``.
    All other lines are ignored.

    Returns:
        The parsed queries in file order.
    """
    current_section = "Unsectioned"
    parsed: list[QueryItem] = []
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        heading = SECTION_RE.match(raw_line)
        if heading is not None:
            # A heading line can never also be a query line.
            current_section = heading.group(1)
        elif (entry := QUERY_RE.match(raw_line)) is not None:
            parsed.append(QueryItem(section=current_section, query=entry.group(1)))
    return parsed
|
|
|
|
def request_json(
    session: requests.Session,
    base_url: str,
    path: str,
    params: dict[str, Any],
    timeout: int,
) -> dict[str, Any]:
    """GET ``base_url + path`` with *params* and return the decoded JSON body.

    Args:
        session: Session used to issue the request.
        base_url: Prefix of the URL; *path* is appended to it verbatim.
        path: Endpoint path appended to *base_url*.
        params: Query-string parameters.
        timeout: Timeout passed through to ``session.get``.

    Raises:
        Whatever ``raise_for_status()`` raises for an error status, plus any
        error raised by the request itself or by JSON decoding.
    """
    endpoint = f"{base_url}{path}"
    reply = session.get(endpoint, params=params, timeout=timeout)
    reply.raise_for_status()
    payload = reply.json()
    return payload
|
|
|
|
def compact_media_item(media_item: dict[str, Any] | None) -> dict[str, Any]:
    """Return a copy of *media_item* reduced to the fields kept in the summary.

    A missing or empty *media_item* yields ``{}``. Otherwise every kept field
    is present in the result, with ``None`` for fields absent from the input.
    """
    if not media_item:
        return {}
    kept_fields = (
        "title",
        "primaryCategory",
        "_category",
        "_subcategory",
        "firstPublished",
        "duration",
        "durationFormattedMinSec",
    )
    return {field: media_item.get(field) for field in kept_fields}
|
|
|
|
def compact_person(person: dict[str, Any] | None) -> dict[str, Any] | None:
    """Reduce a person record to its id, name and the two count fields.

    Returns ``None`` when *person* is missing or empty; otherwise a dict that
    always has all four keys, using ``None`` for fields absent from the input.
    """
    if person:
        kept_fields = ("id", "name", "reference_count", "appearance_count")
        return {field: person.get(field) for field in kept_fields}
    return None
|
|
|
|
def compact_person_search(person_search: dict[str, Any] | None) -> dict[str, Any] | None:
    """Reduce a ``person_search`` payload to the fields kept in the summary.

    Returns ``None`` when *person_search* is missing or empty. The nested
    ``person`` record is itself reduced through ``compact_person``.
    """
    if not person_search:
        return None
    lookup = person_search.get
    summary: dict[str, Any] = {key: lookup(key) for key in ("matched", "match_type", "reason")}
    summary["person"] = compact_person(lookup("person"))
    summary["backfilled"] = lookup("backfilled")
    summary["backfill_count"] = lookup("backfill_count")
    return summary
|
|
|
|
def compact_result(result: dict[str, Any], rank: int) -> dict[str, Any]:
    """Build the trimmed summary record for one search result.

    Args:
        result: Raw result dict from the search response.
        rank: Position assigned to this result by the caller.

    Returns:
        A dict with the rank, a fixed set of pass-through fields, the compacted
        ``matched_person`` and ``mediaItem`` records, at most the first six
        ``category_scores`` entries, and a plain-text snippet (markup tags
        removed, whitespace collapsed, capped at 320 characters).
    """
    raw_snippet = result.get("snippet") or ""
    without_tags = re.sub(r"<[^>]+>", "", raw_snippet)
    flat_snippet = " ".join(without_tags.split())

    compact: dict[str, Any] = {"rank": rank}
    for key in ("natural_key", "frame_number", "score", "source_method"):
        compact[key] = result.get(key)
    compact["matched_person"] = compact_person(result.get("matched_person"))
    for key in ("thumbnail", "timestamp"):
        compact[key] = result.get(key)
    compact["matched_categories"] = result.get("matched_categories") or []
    compact["category_scores"] = (result.get("category_scores") or [])[:6]
    compact["mediaItem"] = compact_media_item(result.get("mediaItem"))
    compact["snippet"] = flat_snippet[:320]
    return compact
|
|
|
|
def average_hash(path: Path, hash_size: int = 8) -> int | None:
    """Compute an average-hash fingerprint of the image at *path*.

    The image is converted to grayscale and resized to ``hash_size`` x
    ``hash_size``; each pixel then contributes one bit (1 when the pixel is at
    least the mean value), packed most-significant-bit first in pixel order.

    Returns:
        The fingerprint as an int, or ``None`` when opening or processing the
        image raises ``OSError`` or ``ValueError``.
    """
    try:
        with Image.open(path) as source:
            reduced = ImageOps.grayscale(source).resize(
                (hash_size, hash_size), Image.Resampling.LANCZOS
            )
            luminance = list(reduced.getdata())
    except (OSError, ValueError):
        return None

    mean = sum(luminance) / len(luminance)
    bits = 0
    for level in luminance:
        bits <<= 1
        if level >= mean:
            bits |= 1
    return bits
|
|
|
|
def hamming(a: int, b: int) -> int:
    """Return the number of bit positions in which *a* and *b* differ."""
    differing = a ^ b
    return bin(differing).count("1")
|
|
|
|
def safe_slug(text: str, max_length: int = 72) -> str:
    """Turn *text* into a lowercase, hyphen-separated slug.

    Every run of characters outside ``a-z0-9`` (after lowercasing) becomes a
    single ``-``; leading and trailing hyphens are removed and the slug is cut
    to *max_length* characters.

    Args:
        text: Arbitrary text, e.g. a search query.
        max_length: Maximum length of the returned slug.

    Returns:
        The slug, or ``"query"`` when nothing usable remains.
    """
    slug = re.sub(r"[^a-z0-9]+", "-", text.lower()).strip("-")
    # Truncation can cut right after a separator; strip again so the slug
    # never ends in a dangling hyphen.
    return slug[:max_length].rstrip("-") or "query"
|
|
|
|
def download_thumbnail(session: requests.Session, base_url: str, thumbnail: str, target: Path, timeout: int) -> bool:
    """Fetch a thumbnail into *target*, reusing an existing non-empty file.

    Args:
        session: Session used to issue the request.
        base_url: Base against which *thumbnail* is resolved with ``urljoin``.
        thumbnail: Thumbnail URL or path as reported by the search result.
        target: Destination file.
        timeout: Timeout passed through to ``session.get``.

    Returns:
        ``True`` when *target* holds a non-empty file afterwards (already
        present or freshly downloaded); ``False`` when the request fails with
        a ``requests.RequestException`` or the response body is empty.
    """
    # Cache hit: a previous run already stored a non-empty file.
    if target.exists() and target.stat().st_size > 0:
        return True
    url = urllib.parse.urljoin(base_url, thumbnail)
    try:
        response = session.get(url, timeout=timeout)
        response.raise_for_status()
        payload = response.content
    except requests.RequestException:
        return False
    if not payload:
        # An empty body would leave a zero-byte file that the cache check
        # above rejects anyway, so report it as a failed download instead.
        return False
    target.write_bytes(payload)
    return True
|
|
|
|
def find_duplicate_groups(items: list[dict[str, Any]], *, max_distance: int = 4) -> list[list[int]]:
    """Group results whose thumbnail hashes are near-identical.

    Items whose ``"ahash"`` is missing or ``None`` are ignored. Two items are
    linked when the Hamming distance between their hashes is at most
    *max_distance*; links are merged transitively with a union-find structure,
    so a group may contain members that are not directly within the threshold
    of each other.

    Args:
        items: Result dicts carrying ``"rank"`` and, optionally, ``"ahash"``.
        max_distance: Largest Hamming distance still treated as a duplicate.

    Returns:
        One sorted list of ranks per group that has at least two members.
    """
    hashes: list[tuple[int, int]] = [
        (int(item["rank"]), int(item["ahash"]))
        for item in items
        if item.get("ahash") is not None
    ]

    # Union-find forest keyed by rank; every rank starts as its own root.
    parent = {rank: rank for rank, _hash in hashes}

    def find(rank: int) -> int:
        # Path halving: point each visited node at its grandparent.
        while parent[rank] != rank:
            parent[rank] = parent[parent[rank]]
            rank = parent[rank]
        return rank

    def union(a: int, b: int) -> None:
        root_a = find(a)
        root_b = find(b)
        if root_a != root_b:
            parent[root_b] = root_a

    # Compare every unordered pair exactly once.
    for index, (rank_a, hash_a) in enumerate(hashes):
        for rank_b, hash_b in hashes[index + 1:]:
            if hamming(hash_a, hash_b) <= max_distance:
                union(rank_a, rank_b)

    groups: dict[int, list[int]] = defaultdict(list)
    for rank, _hash in hashes:
        groups[find(rank)].append(rank)
    return [sorted(ranks) for ranks in groups.values() if len(ranks) > 1]
|
|
|
|
def draw_contact_sheet(query: str, items: list[dict[str, Any]], output_path: Path, columns: int = 4) -> None:
    """Render a grid of result thumbnails with captions and save it.

    The sheet has a title strip showing the first 140 characters of *query*,
    followed by one fixed-size cell per item laid out row by row in *columns*
    columns. Each cell shows the thumbnail (when its file exists and can be
    opened), a caption with rank, score and optional frame number, and the
    media title (or natural key) cut to 46 characters.

    Args:
        query: Query text used as the sheet title.
        items: Result dicts; ``"rank"`` is required, the rest is optional.
        output_path: Where the image is saved.
        columns: Number of cells per row.
    """
    cell_w, cell_h, title_h = 320, 230, 44
    row_count = max(1, (len(items) + columns - 1) // columns)
    canvas = Image.new("RGB", (columns * cell_w, title_h + row_count * cell_h), "white")
    pen = ImageDraw.Draw(canvas)
    font = ImageFont.load_default()
    pen.text((12, 12), query[:140], fill=(0, 0, 0), font=font)

    for position, entry in enumerate(items):
        grid_row, grid_col = divmod(position, columns)
        left = grid_col * cell_w
        top = title_h + grid_row * cell_h
        pen.rectangle((left, top, left + cell_w - 1, top + cell_h - 1), outline=(210, 210, 210))

        thumb_file = entry.get("thumbnail_file")
        if thumb_file and Path(thumb_file).exists():
            try:
                with Image.open(thumb_file) as opened:
                    picture = opened.convert("RGB")
                    picture.thumbnail((cell_w, 176), Image.Resampling.LANCZOS)
                    # Centre the scaled thumbnail horizontally inside the cell.
                    offset_x = left + (cell_w - picture.width) // 2
                    canvas.paste(picture, (offset_x, top + 8))
            except OSError:
                # Best effort: an unreadable thumbnail leaves the cell blank.
                pass

        media = entry.get("mediaItem") or {}
        caption = f"#{entry['rank']} score {float(entry.get('score') or 0):.3f}"
        heading = (media.get("title") or entry.get("natural_key") or "")[:46]
        frame = entry.get("frame_number")
        if frame is not None:
            caption = caption + f" frame {frame}"
        pen.text((left + 8, top + 184), caption, fill=(0, 0, 0), font=font)
        pen.text((left + 8, top + 202), heading, fill=(40, 40, 40), font=font)

    canvas.save(output_path)
|
|
|
|
def render_html_report(
    *,
    output_path: Path,
    source_queries_path: Path,
    generated_at: str,
    query_items: list[dict[str, Any]],
    section_summaries: dict[str, dict[str, Any]],
) -> None:
    """Write the diagnostic report as a standalone HTML file.

    The page contains overall status pills, a per-section summary table and a
    per-query detail table. Free-text values (section, query, status, titles,
    reasons, link targets) are passed through ``html.escape``.

    Args:
        output_path: Destination of the HTML file (written as UTF-8).
        source_queries_path: Path of the queries file, shown in the header.
        generated_at: Timestamp text shown in the header.
        query_items: Per-query records; each needs ``section``, ``query``,
            ``status``, ``visual`` (with ``count`` and ``results``) and
            ``audio`` (with ``count``).
        section_summaries: Per-section counters; each needs ``total``.
    """
    tally = Counter(entry["status"] for entry in query_items)

    def detail_row(entry: dict[str, Any]) -> str:
        # One <tr> of the "Query Details" table.
        visual = entry["visual"]
        audio = entry["audio"]
        hits = visual["results"]
        best = hits[0] if hits else {}
        best_media = best.get("mediaItem") or {}
        sheet_rel = entry.get("contact_sheet")
        duplicates = ", ".join(map(str, visual.get("duplicate_groups") or []))
        reason_items = "".join(f"<li>{html.escape(reason)}</li>" for reason in entry.get("reasons") or [])
        expansion_text = ", ".join(entry.get("domain_expansions") or [])
        sheet_link = f'<a href="{html.escape(sheet_rel)}">contact sheet</a>' if sheet_rel else ""
        status = html.escape(entry["status"])
        return f"""
<tr class="status-{status}">
<td>{html.escape(entry['section'])}</td>
<td><strong>{html.escape(entry['query'])}</strong><br><span>{html.escape(expansion_text)}</span></td>
<td>{status}</td>
<td>Visual {visual['count']}<br>Audio {audio['count']}</td>
<td>{float(best.get('score') or 0):.3f}<br>{html.escape(best_media.get('title') or '')}</td>
<td>{html.escape(duplicates)}</td>
<td><ul>{reason_items}</ul></td>
<td>{sheet_link}</td>
</tr>
"""

    def section_row(name: str, counters: dict[str, Any]) -> str:
        # One <tr> of the "Section Summary" table.
        return f"""
<tr>
<td>{html.escape(name)}</td>
<td>{counters['total']}</td>
<td>{counters.get('looks_promising', 0)}</td>
<td>{counters.get('mixed', 0)}</td>
<td>{counters.get('needs_review', 0)}</td>
<td>{counters.get('fail', 0)}</td>
</tr>
"""

    detail_html = "".join([detail_row(entry) for entry in query_items])
    section_html = "".join([section_row(name, counters) for name, counters in section_summaries.items()])

    document = f"""<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Search UI Diagnostic Report</title>
<style>
body {{ font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif; margin: 24px; color: #1f2933; }}
h1, h2 {{ margin-bottom: 8px; }}
.summary {{ display: flex; gap: 12px; flex-wrap: wrap; margin: 16px 0 24px; }}
.pill {{ padding: 8px 12px; border-radius: 8px; background: #eef2f7; }}
table {{ border-collapse: collapse; width: 100%; font-size: 13px; }}
th, td {{ border: 1px solid #d8dee7; padding: 8px; vertical-align: top; }}
th {{ background: #f3f6fa; position: sticky; top: 0; }}
tr.status-looks_promising {{ background: #f4fbf6; }}
tr.status-mixed {{ background: #fffaf0; }}
tr.status-needs_review {{ background: #fff4f0; }}
tr.status-fail {{ background: #fdecec; }}
td span {{ color: #5f6b7a; font-size: 12px; }}
ul {{ margin: 0; padding-left: 18px; }}
a {{ color: #0b65c2; }}
</style>
</head>
<body>
<h1>Search UI Diagnostic Report</h1>
<p>Generated {html.escape(generated_at)} from {html.escape(str(source_queries_path))}.</p>
<div class="summary">
<div class="pill">Queries: {len(query_items)}</div>
<div class="pill">Looks promising: {tally.get('looks_promising', 0)}</div>
<div class="pill">Mixed: {tally.get('mixed', 0)}</div>
<div class="pill">Needs review: {tally.get('needs_review', 0)}</div>
<div class="pill">Fail: {tally.get('fail', 0)}</div>
</div>
<h2>Section Summary</h2>
<table>
<thead><tr><th>Section</th><th>Total</th><th>Promising</th><th>Mixed</th><th>Needs review</th><th>Fail</th></tr></thead>
<tbody>{section_html}</tbody>
</table>
<h2>Query Details</h2>
<table>
<thead>
<tr>
<th>Section</th><th>Query</th><th>Status</th><th>Counts</th><th>Top Visual Hit</th><th>Duplicate Groups</th><th>Reasons</th><th>Visuals</th>
</tr>
</thead>
<tbody>{detail_html}</tbody>
</table>
</body>
</html>
"""
    output_path.write_text(document, encoding="utf-8")
|
|
|
|
def main() -> int:
    """Run every query against the search API and write the diagnostics output.

    For each parsed query this issues three searches (``image-content``,
    ``hybrid`` and ``/api/search-title``), downloads thumbnails for the top
    visual hits, detects near-duplicate thumbnails, classifies the query and
    draws a contact sheet. ``summary.json`` is rewritten after every query so
    an interrupted run can be continued with ``--resume``; ``report.html`` is
    written once at the end.

    Returns:
        ``0`` on success, ``2`` when no queries could be parsed.

    Raises:
        Any exception raised by ``request_json`` propagates and aborts the
        run; the checkpoint written after the last completed query remains.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--queries", type=Path, required=True)
    parser.add_argument("--output-dir", type=Path, required=True)
    parser.add_argument("--base-url", default="http://localhost:8001")
    parser.add_argument("--limit", type=int, default=40)
    parser.add_argument("--thumbs", type=int, default=12)
    parser.add_argument("--timeout", type=int, default=90)
    parser.add_argument("--sleep", type=float, default=0.0)
    parser.add_argument("--max-queries", type=int, default=0)
    parser.add_argument("--resume", action="store_true")
    args = parser.parse_args()

    args.output_dir.mkdir(parents=True, exist_ok=True)
    thumbs_dir = args.output_dir / "thumbs"
    sheets_dir = args.output_dir / "contact_sheets"
    thumbs_dir.mkdir(exist_ok=True)
    sheets_dir.mkdir(exist_ok=True)
    summary_path = args.output_dir / "summary.json"
    report_path = args.output_dir / "report.html"

    query_items = parse_queries(args.queries)
    if args.max_queries:
        query_items = query_items[: args.max_queries]
    if not query_items:
        print("No queries parsed.", file=sys.stderr)
        return 2

    # With --resume, reuse the stored record of any query already present in
    # a previous summary.json instead of querying the API again.
    existing: dict[str, Any] = {}
    if args.resume and summary_path.exists():
        previous = json.loads(summary_path.read_text(encoding="utf-8"))
        existing = {item["query"]: item for item in previous.get("queries", [])}

    session = requests.Session()
    all_items: list[dict[str, Any]] = []
    started = time.time()

    for index, query_item in enumerate(query_items, start=1):
        query = query_item.query
        if query in existing:
            all_items.append(existing[query])
            print(f"[{index:03d}/{len(query_items)}] resumed {query}")
            continue

        print(f"[{index:03d}/{len(query_items)}] {query}", flush=True)
        query_start = time.time()
        visual = request_json(
            session,
            args.base_url,
            "/api/search",
            {"q": query, "language": "E", "method": "image-content", "limit": args.limit},
            args.timeout,
        )
        audio = request_json(
            session,
            args.base_url,
            "/api/search",
            {"q": query, "language": "E", "method": "hybrid", "limit": min(args.limit, 30)},
            args.timeout,
        )
        title = request_json(
            session,
            args.base_url,
            "/api/search-title",
            {"q": query, "language": "E", "limit": 20},
            args.timeout,
        )

        visual_results = [compact_result(result, rank) for rank, result in enumerate(visual.get("results") or [], start=1)]
        audio_results = [compact_result(result, rank) for rank, result in enumerate(audio.get("results") or [], start=1)]
        title_results = [compact_result(result, rank) for rank, result in enumerate(title.get("results") or [], start=1)]

        # Download thumbnails for the top visual hits and fingerprint them.
        thumb_items: list[dict[str, Any]] = []
        for result in visual_results[: args.thumbs]:
            thumbnail = result.get("thumbnail")
            if not thumbnail:
                continue
            thumb_name = f"{index:03d}-{safe_slug(query)}-r{result['rank']:02d}.jpg"
            thumb_path = thumbs_dir / thumb_name
            if download_thumbnail(session, args.base_url, thumbnail, thumb_path, args.timeout):
                ahash = average_hash(thumb_path)
                result["thumbnail_file"] = str(thumb_path)
                # Stored with forward slashes on every platform.
                result["thumbnail_rel"] = thumb_path.relative_to(args.output_dir).as_posix()
                result["ahash"] = ahash
                thumb_items.append(result)

        duplicate_groups = find_duplicate_groups(thumb_items)
        visual_summary = {
            "method": visual.get("method"),
            "count": len(visual_results),
            "results": visual_results,
            "duplicate_groups": duplicate_groups,
            # Measured from before the first request, so this spans all three
            # searches and the thumbnail downloads, not the visual search alone.
            "query_time_ms": round((time.time() - query_start) * 1000, 1),
            "visual_query": visual.get("visual_query"),
            "person_search": compact_person_search(visual.get("person_search")),
        }
        audio_summary = {
            "method": audio.get("method"),
            "count": len(audio_results),
            "results": audio_results,
        }
        title_summary = {
            "method": title.get("method"),
            "count": len(title_results),
            "results": title_results,
        }

        status, reasons = classify_query(query, visual_summary, audio_summary)
        diagnostic_query = diagnostic_query_for_visual(query, visual_summary)
        contact_sheet_rel = None
        if thumb_items:
            sheet_path = sheets_dir / f"{index:03d}-{safe_slug(query)}.jpg"
            draw_contact_sheet(query, thumb_items, sheet_path)
            # This value becomes an href in report.html, so it must use
            # forward slashes even where the native separator is a backslash.
            contact_sheet_rel = sheet_path.relative_to(args.output_dir).as_posix()

        item = {
            "index": index,
            "section": query_item.section,
            "query": query,
            "diagnostic_query": diagnostic_query,
            "status": status,
            "reasons": reasons,
            "domain_expansions": domain_expansions(diagnostic_query),
            "expected_hints": expected_hints(diagnostic_query),
            "contact_sheet": contact_sheet_rel,
            "visual": visual_summary,
            "audio": audio_summary,
            "title": title_summary,
        }
        all_items.append(item)
        # Checkpoint after every query so --resume can pick up from here.
        checkpoint = {
            "generated_at": time.strftime("%Y-%m-%d %H:%M:%S"),
            "source_queries": str(args.queries),
            "elapsed_seconds": round(time.time() - started, 1),
            "queries": all_items,
        }
        summary_path.write_text(json.dumps(checkpoint, indent=2), encoding="utf-8")
        if args.sleep:
            time.sleep(args.sleep)

    section_summaries: dict[str, dict[str, Any]] = {}
    for item in all_items:
        summary = section_summaries.setdefault(
            item["section"],
            {"total": 0, "looks_promising": 0, "mixed": 0, "needs_review": 0, "fail": 0},
        )
        summary["total"] += 1
        # The status comes from classify_query or from a resumed checkpoint;
        # count one outside the four pre-seeded keys instead of raising
        # KeyError after all the queries have already been run.
        status_key = item["status"]
        summary[status_key] = summary.get(status_key, 0) + 1

    final_summary = {
        "generated_at": time.strftime("%Y-%m-%d %H:%M:%S"),
        "source_queries": str(args.queries),
        "elapsed_seconds": round(time.time() - started, 1),
        "counts": dict(Counter(item["status"] for item in all_items)),
        "sections": section_summaries,
        "queries": all_items,
    }
    summary_path.write_text(json.dumps(final_summary, indent=2), encoding="utf-8")
    render_html_report(
        output_path=report_path,
        source_queries_path=args.queries,
        generated_at=final_summary["generated_at"],
        query_items=all_items,
        section_summaries=section_summaries,
    )
    print(f"Wrote {summary_path}")
    print(f"Wrote {report_path}")
    return 0
|
|
|
|
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|
|