File size: 4,156 Bytes
fb12ddc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# HF_Space_hipVS/search.py
# =========================
# Search — embed query, search project's vector store, LLM interpret.

import logging
from embedding import embed_text, llm_summarize
from vector_store import get_store
from config import DEFAULT_PROJECT

logger = logging.getLogger(__name__)


def _fmt(seconds: float) -> str:
    m, s = divmod(int(seconds), 60)
    return f"{m:02d}:{s:02d}"


def _merge_video_hits(hits: list[dict], gap: float = 10.0) -> list[dict]:
    """Merge adjacent frame-level hits into time ranges."""
    if not hits:
        return []
    by_video: dict[str, list[dict]] = {}
    for h in hits:
        by_video.setdefault(h.get("video_name", "?"), []).append(h)

    merged = []
    for video_name, frames in by_video.items():
        frames.sort(key=lambda x: x.get("timestamp_sec", 0))
        cur = {
            "video_name": video_name,
            "video_path": frames[0].get("video_path", ""),
            "start_sec":  frames[0].get("timestamp_sec", 0),
            "end_sec":    frames[0].get("timestamp_sec", 0),
            "peak_score": frames[0].get("score", 0),
            "frames":     1,
        }
        for f in frames[1:]:
            ts = f.get("timestamp_sec", 0)
            if ts <= cur["end_sec"] + gap:
                cur["end_sec"]    = ts
                cur["peak_score"] = max(cur["peak_score"], f.get("score", 0))
                cur["frames"]    += 1
            else:
                merged.append(cur)
                cur = {
                    "video_name": video_name,
                    "video_path": f.get("video_path", ""),
                    "start_sec":  ts,
                    "end_sec":    ts,
                    "peak_score": f.get("score", 0),
                    "frames":     1,
                }
        merged.append(cur)

    return sorted(merged, key=lambda x: -x["peak_score"])


def search_images(query: str, project: str = DEFAULT_PROJECT, top_k: int = 10, min_score: float = 0.15) -> dict:
    """Search a project's image index with a text query and summarize the hits.

    Args:
        query: Text passed to ``embed_text`` and echoed back in the response.
        project: Project whose ``"image_index"`` store is searched.
        top_k: Number of results requested from the store.
        min_score: Results whose ``score`` (0 when missing) is below this
            value are dropped.

    Returns:
        Dict with keys ``query``, ``results``, ``llm_summary`` and
        ``store_info``. When the store reports a count of zero, ``results`` is
        empty and ``llm_summary`` is a fixed hint; no embedding or LLM call is
        made in that case.
    """
    index = get_store(project, "image_index")

    # Nothing indexed yet: answer immediately with a hint for the user.
    if index.count == 0:
        notice = f"No images indexed in project '{project}'. Upload images first."
        return {
            "query": query,
            "results": [],
            "llm_summary": notice,
            "store_info": str(index),
        }

    candidates = index.search(embed_text(query), top_k=top_k)

    kept = []
    for hit in candidates:
        if hit.get("score", 0) >= min_score:
            kept.append(hit)

    response = {
        "query": query,
        "results": kept,
        "llm_summary": llm_summarize(query, kept, mode="image"),
    }
    response["store_info"] = str(index)
    return response


def search_videos(query: str, project: str = DEFAULT_PROJECT, top_k: int = 30, min_score: float = 0.15) -> dict:
    """Search a project's video index with a text query and report time spans.

    Frame-level hits at or above ``min_score`` are merged into spans by
    ``_merge_video_hits``; the spans are summarized by ``llm_summarize`` and
    returned as numbered matches.

    Args:
        query: Text passed to ``embed_text`` and echoed back in the response.
        project: Project whose ``"video_index"`` store is searched.
        top_k: Number of frame-level results requested from the store.
        min_score: Hits whose ``score`` (0 when missing) is below this value
            are dropped before merging.

    Returns:
        Dict with keys ``query``, ``matches``, ``llm_summary`` and
        ``store_info``. Each match carries a 1-based ``id``, ``video_name``,
        ``start``/``end`` labels from ``_fmt``, the raw ``start_seconds`` /
        ``end_seconds``, ``score`` rounded to 4 places, ``frames`` and
        ``representative_frame`` (the span's ``frame_path`` when present,
        otherwise ``""``). When the store reports a count of zero, ``matches``
        is empty and ``llm_summary`` is a fixed hint.
    """
    index = get_store(project, "video_index")

    # Nothing indexed yet: answer immediately with a hint for the user.
    if index.count == 0:
        notice = f"No videos indexed in project '{project}'. Upload videos first."
        return {
            "query": query,
            "matches": [],
            "llm_summary": notice,
            "store_info": str(index),
        }

    candidates = index.search(embed_text(query), top_k=top_k)
    kept = [hit for hit in candidates if hit.get("score", 0) >= min_score]
    spans = _merge_video_hits(kept)

    # Compact per-span rows handed to the LLM for interpretation.
    llm_rows = []
    for span in spans:
        begin_label = _fmt(span["start_sec"])
        finish_label = _fmt(span["end_sec"])
        llm_rows.append({
            "video_name": span["video_name"],
            "timestamp_sec": span["start_sec"],
            "timestamp_label": f"{begin_label} - {finish_label}",
            "score": span["peak_score"],
        })
    summary = llm_summarize(query, llm_rows, mode="video")

    # Caller-facing matches, numbered from 1 in span order.
    matches = []
    for rank, span in enumerate(spans, start=1):
        matches.append({
            "id": rank,
            "video_name": span["video_name"],
            "start": _fmt(span["start_sec"]),
            "end": _fmt(span["end_sec"]),
            "start_seconds": span["start_sec"],
            "end_seconds": span["end_sec"],
            "score": round(span["peak_score"], 4),
            "frames": span["frames"],
            "representative_frame": span.get("frame_path", ""),
        })

    return {
        "query": query,
        "matches": matches,
        "llm_summary": summary,
        "store_info": str(index),
    }