jw-search / scripts /search_diagnostics.py
jw-tools's picture
deploy: latest main (lazy-ML cold start, durable launcher, web-image search, scene search) + full-app data refresh
7ea1851 verified
#!/usr/bin/env python3
"""Run Search-UI query diagnostics and build a reviewable HTML report."""
from __future__ import annotations
import argparse
import html
import json
import re
import sqlite3
import sys
import time
import urllib.parse
from collections import Counter, defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import requests
from PIL import Image, ImageDraw, ImageFont, ImageOps
from search_diagnostics_quality import (
classify_query,
diagnostic_query_for_visual,
domain_expansions,
expected_hints,
)
# Matches an unchecked Markdown task-list line ("- [ ] <text>") and captures
# the text after the checkbox, without trailing whitespace.
QUERY_RE = re.compile(r"^- \[ \] (.+?)\s*$")
# Matches a numbered level-2 Markdown heading ("## <digits>. <title>") and
# captures the title, without trailing whitespace.
SECTION_RE = re.compile(r"^##\s+\d+\.\s+(.+?)\s*$")
@dataclass(frozen=True)
class QueryItem:
    """One query parsed from the queries file, tagged with its section."""

    # Title of the heading the query was listed under.
    section: str
    # The query text exactly as captured from the checklist line.
    query: str
def parse_queries(path: Path) -> list[QueryItem]:
    """Read the checklist file at *path* and return its queries in file order.

    A line matching ``SECTION_RE`` changes the current section; a line
    matching ``QUERY_RE`` yields a ``QueryItem`` tagged with the current
    section. Queries that appear before any heading are filed under
    ``"Unsectioned"``. All other lines are ignored.
    """
    current_section = "Unsectioned"
    parsed: list[QueryItem] = []
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        heading = SECTION_RE.match(raw_line)
        if heading is not None:
            # Heading lines only switch the section; they are never queries.
            current_section = heading.group(1)
        elif (entry := QUERY_RE.match(raw_line)) is not None:
            parsed.append(QueryItem(section=current_section, query=entry.group(1)))
    return parsed
def request_json(
    session: requests.Session,
    base_url: str,
    path: str,
    params: dict[str, Any],
    timeout: int,
) -> dict[str, Any]:
    """GET ``base_url + path`` with *params* and return the decoded JSON body.

    The URL is formed by plain string concatenation. Any exception raised by
    ``session.get``, ``raise_for_status()`` or ``json()`` propagates to the
    caller.
    """
    endpoint = f"{base_url}{path}"
    reply = session.get(endpoint, params=params, timeout=timeout)
    reply.raise_for_status()
    payload = reply.json()
    return payload
def compact_media_item(media_item: dict[str, Any] | None) -> dict[str, Any]:
if not media_item:
return {}
return {
"title": media_item.get("title"),
"primaryCategory": media_item.get("primaryCategory"),
"_category": media_item.get("_category"),
"_subcategory": media_item.get("_subcategory"),
"firstPublished": media_item.get("firstPublished"),
"duration": media_item.get("duration"),
"durationFormattedMinSec": media_item.get("durationFormattedMinSec"),
}
def compact_person(person: dict[str, Any] | None) -> dict[str, Any] | None:
if not person:
return None
return {
"id": person.get("id"),
"name": person.get("name"),
"reference_count": person.get("reference_count"),
"appearance_count": person.get("appearance_count"),
}
def compact_person_search(person_search: dict[str, Any] | None) -> dict[str, Any] | None:
if not person_search:
return None
return {
"matched": person_search.get("matched"),
"match_type": person_search.get("match_type"),
"reason": person_search.get("reason"),
"person": compact_person(person_search.get("person")),
"backfilled": person_search.get("backfilled"),
"backfill_count": person_search.get("backfill_count"),
}
def compact_result(result: dict[str, Any], rank: int) -> dict[str, Any]:
    """Reduce one search result to the fields kept in the diagnostics summary.

    The snippet has every ``<...>`` span removed, its whitespace collapsed to
    single spaces, and is cut to 320 characters. ``category_scores`` keeps at
    most its first six entries, ``matched_categories`` defaults to ``[]``, and
    the nested person / media item are reduced by their own helpers.
    """
    tag_free = re.sub(r"<[^>]+>", "", result.get("snippet") or "")
    one_line = " ".join(tag_free.split())

    compact: dict[str, Any] = {"rank": rank}
    for key in ("natural_key", "frame_number", "score", "source_method"):
        compact[key] = result.get(key)
    compact["matched_person"] = compact_person(result.get("matched_person"))
    compact["thumbnail"] = result.get("thumbnail")
    compact["timestamp"] = result.get("timestamp")
    compact["matched_categories"] = result.get("matched_categories") or []
    compact["category_scores"] = (result.get("category_scores") or [])[:6]
    compact["mediaItem"] = compact_media_item(result.get("mediaItem"))
    compact["snippet"] = one_line[:320]
    return compact
def average_hash(path: Path, hash_size: int = 8) -> int | None:
    """Compute an average-hash fingerprint of the image stored at *path*.

    The image is converted to grayscale and resized to ``hash_size`` x
    ``hash_size``; each pixel contributes one bit (1 when the pixel is at or
    above the mean brightness), first pixel in the most significant position.

    Returns ``None`` when opening or resizing the image raises ``OSError`` or
    ``ValueError``.
    """
    try:
        with Image.open(path) as image:
            shrunk = ImageOps.grayscale(image).resize(
                (hash_size, hash_size), Image.Resampling.LANCZOS
            )
            pixels = list(shrunk.getdata())
    except (OSError, ValueError):
        return None
    mean = sum(pixels) / len(pixels)
    # Build the bit pattern as text, then parse it as a base-2 integer.
    bit_text = "".join("1" if pixel >= mean else "0" for pixel in pixels)
    return int(bit_text, 2)
def hamming(a: int, b: int) -> int:
    """Return the number of bit positions in which *a* and *b* differ."""
    differing_bits = a ^ b
    return bin(differing_bits).count("1")
def safe_slug(text: str, max_length: int = 72) -> str:
    """Turn *text* into a short, filename-friendly slug.

    The text is lower-cased, every run of characters outside ``a-z0-9`` is
    replaced by a single hyphen, and leading/trailing hyphens are removed.
    The result is cut to *max_length* characters; a hyphen left dangling at
    the end by that cut is removed as well. Returns ``"query"`` when nothing
    usable remains.
    """
    slug = re.sub(r"[^a-z0-9]+", "-", text.lower()).strip("-")
    # Truncation can land right after a separator ("ab-cd"[:3] == "ab-"), so
    # trim again after slicing.
    return slug[:max_length].rstrip("-") or "query"
def download_thumbnail(
    session: requests.Session,
    base_url: str,
    thumbnail: str,
    target: Path,
    timeout: int,
) -> bool:
    """Ensure the thumbnail is present at *target*, downloading it if needed.

    A non-empty file already at *target* is treated as cached and no request
    is made. Otherwise *thumbnail* is resolved against *base_url*, fetched,
    and the response body is written to *target*.

    Returns ``True`` when the file is available, ``False`` when the request
    fails with a ``requests.RequestException``.
    """
    already_cached = target.exists() and target.stat().st_size > 0
    if already_cached:
        return True
    source_url = urllib.parse.urljoin(base_url, thumbnail)
    try:
        reply = session.get(source_url, timeout=timeout)
        reply.raise_for_status()
        target.write_bytes(reply.content)
    except requests.RequestException:
        return False
    return True
def find_duplicate_groups(items: list[dict[str, Any]]) -> list[list[int]]:
    """Group result ranks whose thumbnail hashes are near-identical.

    Only items with a non-``None`` ``"ahash"`` take part. Two items are linked
    when the ``hamming`` distance between their hashes is at most 4, and links
    are transitive. Returns one ascending list of ranks per group containing
    more than one item.
    """
    hashed = [
        (int(entry["rank"]), int(entry["ahash"]))
        for entry in items
        if entry.get("ahash") is not None
    ]
    # Disjoint-set forest keyed by rank; every rank starts as its own root.
    root_of = {rank: rank for rank, _ in hashed}

    def resolve(node: int) -> int:
        # Walk up to the root, re-pointing each visited node at its
        # grandparent along the way.
        while root_of[node] != node:
            root_of[node] = root_of[root_of[node]]
            node = root_of[node]
        return node

    for position, (left_rank, left_hash) in enumerate(hashed, start=1):
        for right_rank, right_hash in hashed[position:]:
            if hamming(left_hash, right_hash) > 4:
                continue
            left_root = resolve(left_rank)
            right_root = resolve(right_rank)
            if left_root != right_root:
                root_of[right_root] = left_root

    clusters: dict[int, list[int]] = {}
    for rank, _ in hashed:
        clusters.setdefault(resolve(rank), []).append(rank)
    return [sorted(members) for members in clusters.values() if len(members) > 1]
def draw_contact_sheet(query: str, items: list[dict[str, Any]], output_path: Path, columns: int = 4) -> None:
    """Render a grid image of result thumbnails and save it to *output_path*.

    The sheet has a title strip showing the first 140 characters of *query*,
    followed by one outlined cell per item, *columns* cells per row. Each cell
    shows the thumbnail (when ``thumbnail_file`` exists and can be opened), a
    caption with rank, score and optional frame number, and the media title
    (or ``natural_key``) cut to 46 characters.
    """
    cell_w, cell_h, title_h = 320, 230, 44
    row_count = max(1, (len(items) + columns - 1) // columns)
    sheet = Image.new("RGB", (columns * cell_w, title_h + row_count * cell_h), "white")
    canvas = ImageDraw.Draw(sheet)
    font = ImageFont.load_default()
    canvas.text((12, 12), query[:140], fill=(0, 0, 0), font=font)

    for position, item in enumerate(items):
        grid_row, grid_col = divmod(position, columns)
        left = grid_col * cell_w
        top = title_h + grid_row * cell_h
        canvas.rectangle(
            (left, top, left + cell_w - 1, top + cell_h - 1),
            outline=(210, 210, 210),
        )

        thumb_file = item.get("thumbnail_file")
        if thumb_file and Path(thumb_file).exists():
            try:
                with Image.open(thumb_file) as opened:
                    tile = opened.convert("RGB")
                    tile.thumbnail((cell_w, 176), Image.Resampling.LANCZOS)
                    # Centre horizontally; keep a fixed 8 px gap above.
                    sheet.paste(tile, (left + (cell_w - tile.width) // 2, top + 8))
            except OSError:
                # Best effort: an unreadable thumbnail leaves the cell blank
                # but the captions below are still drawn.
                pass

        caption = f"#{item['rank']} score {float(item.get('score') or 0):.3f}"
        frame = item.get("frame_number")
        if frame is not None:
            caption += f" frame {frame}"
        media = item.get("mediaItem") or {}
        heading = (media.get("title") or item.get("natural_key") or "")[:46]
        canvas.text((left + 8, top + 184), caption, fill=(0, 0, 0), font=font)
        canvas.text((left + 8, top + 202), heading, fill=(40, 40, 40), font=font)

    sheet.save(output_path)
def render_html_report(
    *,
    output_path: Path,
    source_queries_path: Path,
    generated_at: str,
    query_items: list[dict[str, Any]],
    section_summaries: dict[str, dict[str, Any]],
) -> None:
    """Write the diagnostics report as a standalone HTML page.

    The page contains overall status counts, a per-section summary table built
    from *section_summaries*, and one detail row per entry in *query_items*
    (section, query with its domain expansions, status, visual/audio result
    counts, top visual hit, duplicate groups, reasons, and a link to the
    contact sheet when one exists). Text values are HTML-escaped before being
    inserted. The file is written to *output_path* as UTF-8.
    """
    esc = html.escape
    tally = Counter(entry["status"] for entry in query_items)

    detail_rows: list[str] = []
    for entry in query_items:
        visual = entry["visual"]
        audio = entry["audio"]
        hits = visual["results"]
        best = hits[0] if hits else {}
        best_media = best.get("mediaItem") or {}
        sheet_href = entry.get("contact_sheet")
        duplicates = ", ".join(str(group) for group in visual.get("duplicate_groups") or [])
        reason_items = "".join(f"<li>{esc(reason)}</li>" for reason in entry.get("reasons") or [])
        expansions = ", ".join(entry.get("domain_expansions") or [])
        sheet_link = f'<a href="{esc(sheet_href)}">contact sheet</a>' if sheet_href else ""
        # The empty first/last elements give each row a leading and trailing
        # newline once the lines are joined.
        detail_rows.append(
            "\n".join(
                (
                    "",
                    f'<tr class="status-{esc(entry["status"])}">',
                    f"<td>{esc(entry['section'])}</td>",
                    f"<td><strong>{esc(entry['query'])}</strong><br><span>{esc(expansions)}</span></td>",
                    f"<td>{esc(entry['status'])}</td>",
                    f"<td>Visual {visual['count']}<br>Audio {audio['count']}</td>",
                    f"<td>{float(best.get('score') or 0):.3f}<br>{esc(best_media.get('title') or '')}</td>",
                    f"<td>{esc(duplicates)}</td>",
                    f"<td><ul>{reason_items}</ul></td>",
                    f"<td>{sheet_link}</td>",
                    "</tr>",
                    "",
                )
            )
        )

    summary_rows = [
        "\n".join(
            (
                "",
                "<tr>",
                f"<td>{esc(section)}</td>",
                f"<td>{summary['total']}</td>",
                f"<td>{summary.get('looks_promising', 0)}</td>",
                f"<td>{summary.get('mixed', 0)}</td>",
                f"<td>{summary.get('needs_review', 0)}</td>",
                f"<td>{summary.get('fail', 0)}</td>",
                "</tr>",
                "",
            )
        )
        for section, summary in section_summaries.items()
    ]

    # The template lines sit at column 0 on purpose: leading whitespace here
    # would become part of the emitted document.
    document = f"""<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Search UI Diagnostic Report</title>
<style>
body {{ font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif; margin: 24px; color: #1f2933; }}
h1, h2 {{ margin-bottom: 8px; }}
.summary {{ display: flex; gap: 12px; flex-wrap: wrap; margin: 16px 0 24px; }}
.pill {{ padding: 8px 12px; border-radius: 8px; background: #eef2f7; }}
table {{ border-collapse: collapse; width: 100%; font-size: 13px; }}
th, td {{ border: 1px solid #d8dee7; padding: 8px; vertical-align: top; }}
th {{ background: #f3f6fa; position: sticky; top: 0; }}
tr.status-looks_promising {{ background: #f4fbf6; }}
tr.status-mixed {{ background: #fffaf0; }}
tr.status-needs_review {{ background: #fff4f0; }}
tr.status-fail {{ background: #fdecec; }}
td span {{ color: #5f6b7a; font-size: 12px; }}
ul {{ margin: 0; padding-left: 18px; }}
a {{ color: #0b65c2; }}
</style>
</head>
<body>
<h1>Search UI Diagnostic Report</h1>
<p>Generated {esc(generated_at)} from {esc(str(source_queries_path))}.</p>
<div class="summary">
<div class="pill">Queries: {len(query_items)}</div>
<div class="pill">Looks promising: {tally.get('looks_promising', 0)}</div>
<div class="pill">Mixed: {tally.get('mixed', 0)}</div>
<div class="pill">Needs review: {tally.get('needs_review', 0)}</div>
<div class="pill">Fail: {tally.get('fail', 0)}</div>
</div>
<h2>Section Summary</h2>
<table>
<thead><tr><th>Section</th><th>Total</th><th>Promising</th><th>Mixed</th><th>Needs review</th><th>Fail</th></tr></thead>
<tbody>{''.join(summary_rows)}</tbody>
</table>
<h2>Query Details</h2>
<table>
<thead>
<tr>
<th>Section</th><th>Query</th><th>Status</th><th>Counts</th><th>Top Visual Hit</th><th>Duplicate Groups</th><th>Reasons</th><th>Visuals</th>
</tr>
</thead>
<tbody>{''.join(detail_rows)}</tbody>
</table>
</body>
</html>
"""
    output_path.write_text(document, encoding="utf-8")
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the diagnostics run."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--queries", type=Path, required=True)
    parser.add_argument("--output-dir", type=Path, required=True)
    parser.add_argument("--base-url", default="http://localhost:8001")
    parser.add_argument("--limit", type=int, default=40)
    parser.add_argument("--thumbs", type=int, default=12)
    parser.add_argument("--timeout", type=int, default=90)
    parser.add_argument("--sleep", type=float, default=0.0)
    parser.add_argument("--max-queries", type=int, default=0)
    parser.add_argument("--resume", action="store_true")
    return parser


def _run_query(
    session: requests.Session,
    args: argparse.Namespace,
    thumbs_dir: Path,
    sheets_dir: Path,
    index: int,
    query_item: QueryItem,
) -> dict[str, Any]:
    """Run the three searches for one query and build its summary entry."""
    query = query_item.query
    query_start = time.time()
    visual = request_json(
        session,
        args.base_url,
        "/api/search",
        {"q": query, "language": "E", "method": "image-content", "limit": args.limit},
        args.timeout,
    )
    audio = request_json(
        session,
        args.base_url,
        "/api/search",
        {"q": query, "language": "E", "method": "hybrid", "limit": min(args.limit, 30)},
        args.timeout,
    )
    title = request_json(
        session,
        args.base_url,
        "/api/search-title",
        {"q": query, "language": "E", "limit": 20},
        args.timeout,
    )
    visual_results = [compact_result(result, rank) for rank, result in enumerate(visual.get("results") or [], start=1)]
    audio_results = [compact_result(result, rank) for rank, result in enumerate(audio.get("results") or [], start=1)]
    title_results = [compact_result(result, rank) for rank, result in enumerate(title.get("results") or [], start=1)]

    # Download thumbnails for the top visual hits. The result dicts are
    # annotated in place, so the extra keys also appear in visual_results.
    thumb_items: list[dict[str, Any]] = []
    for result in visual_results[: args.thumbs]:
        thumbnail = result.get("thumbnail")
        if not thumbnail:
            continue
        thumb_name = f"{index:03d}-{safe_slug(query)}-r{result['rank']:02d}.jpg"
        thumb_path = thumbs_dir / thumb_name
        if download_thumbnail(session, args.base_url, thumbnail, thumb_path, args.timeout):
            ahash = average_hash(thumb_path)
            result["thumbnail_file"] = str(thumb_path)
            result["thumbnail_rel"] = str(thumb_path.relative_to(args.output_dir))
            result["ahash"] = ahash
            thumb_items.append(result)
    duplicate_groups = find_duplicate_groups(thumb_items)

    visual_summary = {
        "method": visual.get("method"),
        "count": len(visual_results),
        "results": visual_results,
        "duplicate_groups": duplicate_groups,
        # NOTE(review): measured from before the visual request until after
        # the audio/title requests and thumbnail downloads, so this is the
        # time for the whole query, not the visual search alone. Kept as-is
        # in case downstream consumers rely on it; confirm the intent.
        "query_time_ms": round((time.time() - query_start) * 1000, 1),
        "visual_query": visual.get("visual_query"),
        "person_search": compact_person_search(visual.get("person_search")),
    }
    audio_summary = {
        "method": audio.get("method"),
        "count": len(audio_results),
        "results": audio_results,
    }
    title_summary = {
        "method": title.get("method"),
        "count": len(title_results),
        "results": title_results,
    }
    status, reasons = classify_query(query, visual_summary, audio_summary)
    diagnostic_query = diagnostic_query_for_visual(query, visual_summary)

    contact_sheet_rel = None
    if thumb_items:
        sheet_path = sheets_dir / f"{index:03d}-{safe_slug(query)}.jpg"
        draw_contact_sheet(query, thumb_items, sheet_path)
        contact_sheet_rel = str(sheet_path.relative_to(args.output_dir))

    return {
        "index": index,
        "section": query_item.section,
        "query": query,
        "diagnostic_query": diagnostic_query,
        "status": status,
        "reasons": reasons,
        "domain_expansions": domain_expansions(diagnostic_query),
        "expected_hints": expected_hints(diagnostic_query),
        "contact_sheet": contact_sheet_rel,
        "visual": visual_summary,
        "audio": audio_summary,
        "title": title_summary,
    }


def _summarize_sections(items: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
    """Count queries per section, in total and per status."""
    sections: dict[str, dict[str, Any]] = {}
    for item in items:
        summary = sections.setdefault(
            item["section"],
            {"total": 0, "looks_promising": 0, "mixed": 0, "needs_review": 0, "fail": 0},
        )
        summary["total"] += 1
        # Count statuses outside the four pre-seeded ones as well, rather
        # than failing with KeyError once every query has already been run.
        status = item["status"]
        summary[status] = summary.get(status, 0) + 1
    return sections


def main() -> int:
    """Run every parsed query against the search API and write the reports.

    Writes ``summary.json`` (re-written after each newly processed query as a
    checkpoint, then once more with final counts) and ``report.html`` into
    ``--output-dir``, plus thumbnails and contact sheets in sub-directories.
    With ``--resume``, entries found in an existing ``summary.json`` are
    reused by query text instead of being re-run.

    Returns:
        0 on success, 2 when no queries could be parsed from ``--queries``.
    """
    args = _build_arg_parser().parse_args()
    args.output_dir.mkdir(parents=True, exist_ok=True)
    thumbs_dir = args.output_dir / "thumbs"
    sheets_dir = args.output_dir / "contact_sheets"
    thumbs_dir.mkdir(exist_ok=True)
    sheets_dir.mkdir(exist_ok=True)
    summary_path = args.output_dir / "summary.json"
    report_path = args.output_dir / "report.html"

    query_items = parse_queries(args.queries)
    if args.max_queries:
        query_items = query_items[: args.max_queries]
    if not query_items:
        print("No queries parsed.", file=sys.stderr)
        return 2

    existing: dict[str, Any] = {}
    if args.resume and summary_path.exists():
        previous = json.loads(summary_path.read_text(encoding="utf-8"))
        existing = {item["query"]: item for item in previous.get("queries", [])}

    all_items: list[dict[str, Any]] = []
    started = time.time()
    # The context manager closes the session (and its pooled connections)
    # even when a request raises part-way through the run.
    with requests.Session() as session:
        for index, query_item in enumerate(query_items, start=1):
            query = query_item.query
            if query in existing:
                all_items.append(existing[query])
                print(f"[{index:03d}/{len(query_items)}] resumed {query}")
                continue
            print(f"[{index:03d}/{len(query_items)}] {query}", flush=True)
            all_items.append(_run_query(session, args, thumbs_dir, sheets_dir, index, query_item))

            # Checkpoint after every new query so an interrupted run can be
            # continued with --resume.
            checkpoint = {
                "generated_at": time.strftime("%Y-%m-%d %H:%M:%S"),
                "source_queries": str(args.queries),
                "elapsed_seconds": round(time.time() - started, 1),
                "queries": all_items,
            }
            summary_path.write_text(json.dumps(checkpoint, indent=2), encoding="utf-8")
            if args.sleep:
                time.sleep(args.sleep)

    section_summaries = _summarize_sections(all_items)
    final_summary = {
        "generated_at": time.strftime("%Y-%m-%d %H:%M:%S"),
        "source_queries": str(args.queries),
        "elapsed_seconds": round(time.time() - started, 1),
        "counts": dict(Counter(item["status"] for item in all_items)),
        "sections": section_summaries,
        "queries": all_items,
    }
    summary_path.write_text(json.dumps(final_summary, indent=2), encoding="utf-8")
    render_html_report(
        output_path=report_path,
        source_queries_path=args.queries,
        generated_at=final_summary["generated_at"],
        query_items=all_items,
        section_summaries=section_summaries,
    )
    print(f"Wrote {summary_path}")
    print(f"Wrote {report_path}")
    return 0
# Run the CLI only when executed as a script; main()'s return value is passed
# to SystemExit and so becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())