jw-search / scripts /rematch-face-database.py
jw-tools's picture
deploy: latest main (lazy-ML cold start, durable launcher, web-image search, scene search) + full-app data refresh
7ea1851 verified
#!/usr/bin/env python3
"""
Re-evaluate all stored face detections against the current person reference set.
This is the backfill step that makes newly curated references pay off across
already processed videos without rescanning the media files.
"""
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from typing import Any
import numpy as np
ROOT_DIR = Path(__file__).resolve().parents[1]
BACKEND_DIR = ROOT_DIR / "backend"
if str(BACKEND_DIR) not in sys.path:
sys.path.insert(0, str(BACKEND_DIR))
from face_search import MATCH_THRESHOLD, compare_embeddings, get_face_search_instance # noqa: E402
from utils import log_message # noqa: E402
def parse_embedding(raw_value: Any) -> np.ndarray | None:
if raw_value is None:
return None
if isinstance(raw_value, np.ndarray):
return raw_value
if isinstance(raw_value, memoryview):
raw_value = raw_value.tobytes()
if isinstance(raw_value, (bytes, bytearray)):
return np.frombuffer(raw_value, dtype=np.float32)
if isinstance(raw_value, str):
return np.array(json.loads(raw_value), dtype=np.float32)
if isinstance(raw_value, list):
return np.array(raw_value, dtype=np.float32)
return None
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Rematch stored face detections against current person references.")
parser.add_argument("--threshold", type=float, default=MATCH_THRESHOLD)
parser.add_argument("--dry-run", action="store_true")
parser.add_argument(
"--source-type",
dest="source_types",
action="append",
help="Limit person reference averages to these source types. Repeat to allow multiple.",
)
parser.add_argument(
"--report",
type=Path,
default=ROOT_DIR / "docs" / "reports" / "face-rematch-report-v1.json",
)
return parser
def get_filtered_person_average_embedding(face_search, person_id: int, source_types: list[str] | None) -> np.ndarray | None:
conn = face_search._get_db_connection()
try:
if source_types:
placeholders = ",".join("?" for _ in source_types)
rows = conn.execute(
f"""
SELECT embedding_json
FROM person_embeddings
WHERE person_id = ? AND source_type IN ({placeholders})
""",
[person_id, *source_types],
).fetchall()
else:
rows = conn.execute(
"""
SELECT embedding_json
FROM person_embeddings
WHERE person_id = ?
""",
(person_id,),
).fetchall()
finally:
conn.close()
embeddings = [parse_embedding(row[0]) for row in rows]
embeddings = [embedding for embedding in embeddings if embedding is not None]
if not embeddings:
return None
avg_embedding = np.mean(embeddings, axis=0)
norm = np.linalg.norm(avg_embedding)
if norm == 0:
return avg_embedding
return avg_embedding / norm
def main() -> int:
args = build_parser().parse_args()
face_search = get_face_search_instance()
people = face_search.list_persons()
person_averages: dict[int, np.ndarray] = {}
for person in people:
avg = get_filtered_person_average_embedding(face_search, person["id"], args.source_types)
if avg is not None:
person_averages[person["id"]] = avg
conn = face_search._get_db_connection()
rows = conn.execute(
"""
SELECT natural_key, frame_number, face_index, person_id, embedding
FROM face_embeddings
ORDER BY natural_key, frame_number, face_index
"""
).fetchall()
conn.close()
updates: list[dict[str, Any]] = []
unchanged = 0
for natural_key, frame_number, face_index, current_person_id, embedding_raw in rows:
embedding = parse_embedding(embedding_raw)
if embedding is None:
unchanged += 1
continue
best_person_id = 0
best_similarity = 0.0
for person_id, avg in person_averages.items():
similarity = compare_embeddings(embedding, avg)
if similarity > best_similarity and similarity >= args.threshold:
best_similarity = similarity
best_person_id = person_id
current_person_value = int(current_person_id or 0)
if best_person_id == current_person_value:
unchanged += 1
continue
update = {
"natural_key": natural_key,
"frame_number": int(frame_number),
"face_index": int(face_index),
"old_person_id": current_person_value,
"new_person_id": best_person_id,
"similarity": round(float(best_similarity), 4),
}
updates.append(update)
if not args.dry_run:
face_search.reassign_face(natural_key, int(frame_number), int(face_index), best_person_id or None)
report = {
"threshold": args.threshold,
"dry_run": args.dry_run,
"source_types": args.source_types or [],
"persons_considered": len(person_averages),
"total_faces": len(rows),
"unchanged": unchanged,
"updated": len(updates),
"to_identified": sum(1 for item in updates if item["new_person_id"] > 0),
"to_unidentified": sum(1 for item in updates if item["new_person_id"] == 0),
"updates": updates,
}
args.report.parent.mkdir(parents=True, exist_ok=True)
args.report.write_text(json.dumps(report, indent=2, ensure_ascii=False), encoding="utf-8")
log_message(f"Rematch finished: {len(updates)} updates, {unchanged} unchanged")
print(f"Wrote report to {args.report}")
return 0
if __name__ == "__main__":
raise SystemExit(main())