# Billavenu's picture
# adding filleeeesssss
# fb12ddc verified
# HF_Space_hipVS/ingest.py
# =========================
# Ingestion pipeline — embeds images/frames DIRECTLY with Qwen3-VL or CLIP.
# No captioning step. The vision-language model encodes images and text
# into the same vector space natively.
#
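# CAGRA is rebuilt at the end of every ingest call (optimized for query, not
# ingestion). The one exception is ingest_image_from_pil, which only appends
# and leaves the rebuild to its caller.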
import logging
import os
import shutil
import subprocess
import tempfile
import time
from pathlib import Path
from PIL import Image as PILImage
from config import (
EMBED_DIM,
FRAME_EVERY_SEC,
IMAGE_EXTENSIONS,
VIDEO_EXTENSIONS,
get_project_dir,
DEFAULT_PROJECT,
)
from embedding import embed_image, embed_image_bytes
from vector_store import get_store
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# ── Helpers ──────────────────────────────────────────────────────────────────
def fmt_time(seconds: float) -> str:
    """Format a number of seconds as a zero-padded ``MM:SS`` label.

    The fractional part is truncated. Minutes are not wrapped into hours,
    so 3600 seconds is rendered as ``60:00``.
    """
    whole_seconds = int(seconds)
    minutes = whole_seconds // 60
    remainder = whole_seconds % 60
    return "{:02d}:{:02d}".format(minutes, remainder)
def check_ffmpeg() -> bool:
    """Return True when both ``ffmpeg`` and ``ffprobe`` can be executed.

    Video ingestion needs both tools: ``get_duration`` shells out to
    ``ffprobe`` and ``extract_frame`` shells out to ``ffmpeg``. Probing only
    one of them would let ingestion start and then fail on the first call to
    the missing tool.

    Returns:
        True if ``<tool> -version`` runs and exits with status 0 for both
        tools within 5 seconds each; False otherwise.
    """
    for tool in ("ffmpeg", "ffprobe"):
        try:
            probe = subprocess.run([tool, "-version"], capture_output=True, timeout=5)
        except (OSError, subprocess.TimeoutExpired):
            # OSError covers a missing binary (FileNotFoundError) as well as
            # a binary that exists but cannot be executed (PermissionError).
            return False
        if probe.returncode != 0:
            return False
    return True
# Probed once at import time via check_ffmpeg(); the video ingestion entry
# points in this module return early when this flag is False.
HAS_FFMPEG = check_ffmpeg()
def get_duration(video_path: str) -> float:
    """Return the duration of a video in seconds, as reported by ``ffprobe``.

    Any failure (ffprobe missing, timeout after 30 seconds, output that is
    not a number) is logged as a warning and reported as ``0.0``.
    """
    probe_cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        video_path,
    ]
    try:
        probe = subprocess.run(probe_cmd, capture_output=True, text=True, timeout=30)
        seconds = float(probe.stdout.strip())
    except Exception as e:
        logger.warning(f"ffprobe error: {e}")
        seconds = 0.0
    return seconds
def extract_frame(video_path: str, timestamp_sec: float, out_path: str) -> bool:
    """Extract one frame from a video into ``out_path`` using ``ffmpeg``.

    The frame is taken at ``timestamp_sec`` and scaled to a width of 640
    pixels (``scale=640:-1``).

    Args:
        video_path: Path of the source video.
        timestamp_sec: Seek position in seconds.
        out_path: Destination image path; overwritten if it exists (``-y``).

    Returns:
        True only if ffmpeg exited with status 0 and ``out_path`` exists and
        is non-empty. False on any failure, including a missing or
        non-executable ffmpeg binary and a run exceeding the 30 second
        timeout, so that one bad frame cannot abort a whole ingestion run.
    """
    cmd = [
        "ffmpeg", "-y",
        "-ss", f"{timestamp_sec:.3f}",
        "-i", video_path,
        "-frames:v", "1",
        "-q:v", "2",
        "-vf", "scale=640:-1",
        out_path,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, timeout=30)
    except (OSError, subprocess.TimeoutExpired) as e:
        # The contract of this helper is a bool; callers invoke it outside
        # their own try blocks, so report the failure instead of raising.
        logging.getLogger(__name__).warning(
            "ffmpeg could not extract frame at %.3fs from %s: %s",
            timestamp_sec, video_path, e,
        )
        return False
    return (
        result.returncode == 0
        and os.path.exists(out_path)
        and os.path.getsize(out_path) > 0
    )
def get_image_meta(path: Path) -> dict:
    """Collect basic metadata for an image file.

    Returns a dict with ``file_path`` (resolved path as a string),
    ``file_name``, ``file_size`` (kilobytes rounded to one decimal, with a
    ``KB`` suffix) and ``resolution`` (``WIDTHxHEIGHT``, or ``"unknown"``
    when the file cannot be opened as an image).
    """
    kilobytes = round(path.stat().st_size / 1024, 1)
    file_size = f"{kilobytes}KB"

    try:
        with PILImage.open(path) as picture:
            width, height = picture.width, picture.height
            resolution = f"{width}x{height}"
    except Exception:
        resolution = "unknown"

    info = {}
    info["file_path"] = str(path.resolve())
    info["file_name"] = path.name
    info["file_size"] = file_size
    info["resolution"] = resolution
    return info
# ── Image Ingestion ─────────────────────────────────────────────────────────
def ingest_images(
    project: str = DEFAULT_PROJECT,
    progress_callback=None,
    max_images: int | None = 200,
) -> tuple[int, str]:
    """Ingest all images from a project's images/ directory.

    The image index of the project is cleared and rebuilt from the files
    found (sorted by path). Each image is embedded directly; there is no
    captioning step.

    Args:
        project: Project name, resolved through ``get_project_dir``.
        progress_callback: Optional callable invoked after every file as
            ``progress_callback(fraction_done, desc=...)``.
        max_images: Upper bound on the number of files ingested (the first
            ``max_images`` in sorted order). ``None`` disables the limit.
            Defaults to 200.

    Returns:
        ``(count, log)``: the number of images indexed and a human-readable
        multi-line log. Files that fail to embed are listed as FAILED in
        the log and do not abort the run.
    """
    import numpy as np

    proj_dir = get_project_dir(project)
    image_dir = proj_dir / "images"
    store = get_store(project, "image_index")

    # A project without an images/ directory simply has nothing to ingest.
    if not image_dir.is_dir():
        return 0, f"No images found in {image_dir}"

    candidates = sorted(
        f for f in image_dir.iterdir()
        if f.is_file() and f.suffix.lower() in IMAGE_EXTENSIONS
    )
    files = candidates if max_images is None else candidates[:max_images]
    if not files:
        return 0, f"No images found in {image_dir}"

    store.clear()
    if len(files) < len(candidates):
        # Make the truncation visible instead of silently dropping files.
        log = [f"[{project}] Found {len(candidates)} images, ingesting the first {len(files)}\n"]
    else:
        log = [f"[{project}] Found {len(files)} images\n"]

    all_vectors = []
    all_ids = []
    all_meta = []
    for i, p in enumerate(files):
        try:
            meta = get_image_meta(p)
            # Context manager so the underlying file handle is released
            # as soon as the embedding has been computed.
            with PILImage.open(p) as img:
                vec = embed_image(img)  # direct multimodal embed, no captioning
            all_vectors.append(vec)
            all_ids.append(meta["file_name"])
            all_meta.append(meta)
            log.append(f" [{i+1}/{len(files)}] {p.name} ({meta['resolution']})")
        except Exception as e:
            log.append(f" [{i+1}/{len(files)}] {p.name}: FAILED ({e})")
        if progress_callback:
            progress_callback((i + 1) / len(files), desc=f"Embedding {p.name}...")

    if all_vectors:
        vectors = np.stack(all_vectors)
        store.add(vectors, all_ids, all_meta)  # CAGRA rebuilt inside add()
    log.append(f"\n{len(all_vectors)} images indexed ({store.mode})")
    return len(all_vectors), "\n".join(log)
def ingest_single_image(file_path: str, project: str = DEFAULT_PROJECT) -> tuple[bool, str]:
    """Ingest a single uploaded image. CAGRA is rebuilt.

    The file is copied into the project's images/ directory (created if
    missing), embedded, and appended to the image index.

    Args:
        file_path: Path of the uploaded image.
        project: Project name, resolved through ``get_project_dir``.

    Returns:
        ``(ok, message)``: ``ok`` is False when opening, embedding or
        indexing the image fails; ``message`` describes the outcome.
    """
    path = Path(file_path)
    proj_dir = get_project_dir(project)
    dest = proj_dir / "images" / path.name
    dest.parent.mkdir(parents=True, exist_ok=True)
    try:
        shutil.copy2(str(path), str(dest))
    except shutil.SameFileError:
        # The upload already lives at its destination; nothing to copy.
        pass
    store = get_store(project, "image_index")
    meta = get_image_meta(dest)
    try:
        # Context manager so the file handle is closed once embedded.
        with PILImage.open(dest) as img:
            vec = embed_image(img)
        store.append_and_rebuild(vec, meta["file_name"], meta)
        return True, f"Indexed: {path.name} ({meta['resolution']})"
    except Exception as e:
        return False, f"Failed: {path.name} -- {e}"
def ingest_image_from_pil(
    image: PILImage.Image,
    file_name: str,
    extra_meta: dict | None = None,
    project: str = DEFAULT_PROJECT,
) -> tuple[bool, str]:
    """Ingest an in-memory PIL image (used by seed_data).

    The image is written to the project's images/ directory unless a file of
    that name already exists, then embedded and appended to the image index.
    The index is NOT rebuilt here; the caller rebuilds once at the end.

    Returns ``(True, file_name)`` on success and ``(False, error_text)`` when
    saving, embedding or appending raises.
    """
    target = get_project_dir(project) / "images" / file_name
    index = get_store(project, "image_index")
    try:
        already_on_disk = target.exists()
        if not already_on_disk:
            image.save(str(target))
        embedding = embed_image(image)
        # Entries in extra_meta win over the two base keys on collision.
        record = {
            "file_name": file_name,
            "file_path": str(target.resolve()),
            **(extra_meta or {}),
        }
        index.append(embedding, file_name, record)  # no rebuild -- seed_data calls rebuild at end
    except Exception as err:
        return False, str(err)
    return True, file_name
# ── Video Ingestion ─────────────────────────────────────────────────────────
def _frame_timestamps(duration: float, interval: float) -> list[float]:
    """Return the sorted, de-duplicated timestamps (seconds) to sample.

    The schedule is one frame at 0.5s, one frame every ``interval`` seconds
    strictly before ``duration``, and one frame 1s before the end (clamped
    at 0) so the tail of the video is always represented.

    Raises:
        ValueError: if ``interval`` is not positive; the sampling loop
            would otherwise never terminate.
    """
    if interval <= 0:
        raise ValueError(f"frame interval must be positive, got {interval!r}")
    stamps = {0.5, round(max(0, duration - 1.0), 2)}
    step = 1
    # Multiply rather than accumulate so float error does not build up.
    while step * interval < duration:
        stamps.add(round(step * interval, 2))
        step += 1
    return sorted(stamps)


def _index_video_frames(
    store,
    video_path: Path,
    duration: float,
    timestamps: list[float],
    frames_root: Path,
    on_progress=None,
    on_error=None,
    pause_sec: float = 0.0,
) -> int:
    """Extract, embed and append the frames of one video; return how many were indexed.

    Frames are extracted into a temporary directory. A frame is copied to
    ``frames_root`` only after it was embedded successfully, so a failed
    embedding leaves no orphaned file behind. The index is not rebuilt here.

    Args:
        store: Vector store exposing ``append(vec, id, meta)``.
        video_path: Video file to sample.
        duration: Video duration in seconds (stored in the frame metadata).
        timestamps: Seek positions in seconds.
        frames_root: Directory receiving the permanent frame images.
        on_progress: Optional ``on_progress(done, total)``, called after
            every timestamp whether or not the frame could be indexed.
        on_error: Optional ``on_error(timestamp, exception)``, called when
            processing a frame raises.
        pause_sec: Seconds to sleep after each indexed frame.
    """
    video_str = str(video_path.resolve())
    indexed = 0
    with tempfile.TemporaryDirectory() as tmp_dir:
        for idx, ts in enumerate(timestamps):
            frame_path = os.path.join(tmp_dir, f"frame_{idx:05d}.jpg")
            try:
                # extract_frame sits inside the try so that a failure on one
                # frame cannot abort the remaining frames.
                if extract_frame(video_str, ts, frame_path):
                    with open(frame_path, "rb") as f:
                        frame_data = f.read()
                    vec = embed_image_bytes(frame_data)
                    # Save frame permanently
                    perm_frame_path = frames_root / f"{video_path.name}_{ts:.2f}.jpg"
                    shutil.copy2(frame_path, str(perm_frame_path))
                    frame_meta = {
                        "video_path": video_str,
                        "video_name": video_path.name,
                        "frame_path": str(perm_frame_path.resolve()),
                        "timestamp_sec": ts,
                        "timestamp_label": fmt_time(ts),
                        "duration_total": round(duration, 2),
                    }
                    store.append(vec, f"{video_path.name}@{ts}", frame_meta)
                    indexed += 1
                    if pause_sec > 0:
                        time.sleep(pause_sec)
            except Exception as e:
                if on_error:
                    on_error(ts, e)
            if on_progress:
                on_progress(idx + 1, len(timestamps))
    return indexed


def ingest_videos(project: str = DEFAULT_PROJECT, progress_callback=None) -> tuple[int, str]:
    """Ingest all videos from a project's videos/ directory.

    The video index of the project is cleared, every video is sampled at the
    timestamps given by ``_frame_timestamps``, and the index is rebuilt and
    persisted once after all videos have been processed.

    Args:
        project: Project name, resolved through ``get_project_dir``.
        progress_callback: Optional callable invoked per frame as
            ``progress_callback(fraction_done, desc=...)``; the fraction
            restarts for every video.

    Returns:
        ``(count, log)``: the number of frames indexed and a human-readable
        multi-line log.
    """
    if not HAS_FFMPEG:
        return 0, "ffmpeg not found -- install ffmpeg for video ingestion."
    if FRAME_EVERY_SEC <= 0:
        # Checked before the index is cleared; a non-positive interval can
        # never produce a finite sampling schedule.
        return 0, f"Invalid FRAME_EVERY_SEC ({FRAME_EVERY_SEC}) -- must be positive."

    proj_dir = get_project_dir(project)
    video_dir = proj_dir / "videos"
    store = get_store(project, "video_index")
    frames_root = proj_dir / "videos" / "frames"
    frames_root.mkdir(parents=True, exist_ok=True)

    files = sorted(
        f for f in video_dir.iterdir()
        if f.is_file() and f.suffix.lower() in VIDEO_EXTENSIONS
    )
    if not files:
        return 0, f"No videos found in {video_dir}"

    store.clear()
    log = [f"[{project}] Found {len(files)} video(s) -- frame interval: {FRAME_EVERY_SEC}s\n"]
    total = 0

    def record_failure(ts, exc):
        log.append(f" ts={fmt_time(ts)}: FAILED ({exc})")

    for video_path in files:
        duration = get_duration(str(video_path.resolve()))
        if duration <= 0:
            log.append(f" Skipping {video_path.name} (duration unreadable)")
            continue

        timestamps = _frame_timestamps(duration, FRAME_EVERY_SEC)
        log.append(f" {video_path.name} ({duration:.1f}s -> {len(timestamps)} frames)")

        report = None
        if progress_callback:
            # Bind the name now; the closure is only used for this video.
            def report(done, count, name=video_path.name):
                progress_callback(done / count, desc=f"{name} frame {done}/{count}")

        indexed = _index_video_frames(
            store, video_path, duration, timestamps, frames_root,
            on_progress=report,
            on_error=record_failure,
            # Short pause after each indexed frame, kept from the original
            # batch path; its purpose is not visible here -- TODO confirm.
            pause_sec=0.05,
        )
        total += indexed
        # Report frames actually indexed, not merely attempted.
        log.append(f" Done ({indexed}/{len(timestamps)} frames)")

    # Rebuild CAGRA once for all videos
    if store.has_data():
        store.rebuild_gpu_index()
        store._persist()
    log.append(f"\n{total} video frames indexed ({store.mode})")
    return total, "\n".join(log)


def ingest_single_video(file_path: str, project: str = DEFAULT_PROJECT, progress_callback=None) -> tuple[int, str]:
    """Ingest a single uploaded video. CAGRA rebuilt at end.

    The file is copied into the project's videos/ directory (created if
    missing) and sampled with the same timestamp schedule as
    ``ingest_videos``, including the frame near the end of the video.
    Existing index entries are kept; the new frames are appended.

    Args:
        file_path: Path of the uploaded video.
        project: Project name, resolved through ``get_project_dir``.
        progress_callback: Optional callable invoked per frame as
            ``progress_callback(fraction_done)``.

    Returns:
        ``(count, message)``: the number of frames indexed and a summary or
        error message.
    """
    path = Path(file_path)
    proj_dir = get_project_dir(project)
    frames_root = proj_dir / "videos" / "frames"
    # Creating frames/ first also guarantees videos/ exists for the copy.
    frames_root.mkdir(parents=True, exist_ok=True)
    dest = proj_dir / "videos" / path.name
    try:
        shutil.copy2(str(path), str(dest))
    except shutil.SameFileError:
        # The upload already lives at its destination; nothing to copy.
        pass

    if not HAS_FFMPEG:
        return 0, "ffmpeg not found"
    if FRAME_EVERY_SEC <= 0:
        return 0, f"Invalid FRAME_EVERY_SEC ({FRAME_EVERY_SEC}) -- must be positive."

    store = get_store(project, "video_index")
    duration = get_duration(str(dest.resolve()))
    if duration <= 0:
        return 0, f"Could not read duration for {path.name}"

    timestamps = _frame_timestamps(duration, FRAME_EVERY_SEC)

    report = None
    if progress_callback:
        def report(done, count):
            progress_callback(done / count)

    count = _index_video_frames(
        store, dest, duration, timestamps, frames_root,
        on_progress=report,
        on_error=lambda ts, exc: logger.error(f"Frame embed error: {exc}"),
    )

    # Rebuild CAGRA after all frames
    if store.has_data():
        store.rebuild_gpu_index()
        store._persist()
    return count, f"{count} frames indexed for {path.name} ({duration:.1f}s)"