| |
| |
| |
| |
| |
| |
| |
|
|
| import logging |
| import os |
| import shutil |
| import subprocess |
| import tempfile |
| import time |
| from pathlib import Path |
|
|
| from PIL import Image as PILImage |
|
|
| from config import ( |
| EMBED_DIM, |
| FRAME_EVERY_SEC, |
| IMAGE_EXTENSIONS, |
| VIDEO_EXTENSIONS, |
| get_project_dir, |
| DEFAULT_PROJECT, |
| ) |
| from embedding import embed_image, embed_image_bytes |
| from vector_store import get_store |
|
|
# Module-wide logger, named after this module's import name via __name__.
logger: logging.Logger = logging.getLogger(__name__)
|
|
|
|
| |
|
|
def fmt_time(seconds: float) -> str:
    """Render *seconds* as a zero-padded ``MM:SS`` label.

    Fractional seconds are dropped with ``int()`` (truncation toward zero).
    Minutes are not folded into hours, so 3725 s renders as ``"62:05"``.
    """
    whole = int(seconds)
    minutes = whole // 60
    secs = whole % 60
    return "%02d:%02d" % (minutes, secs)
|
|
|
|
| def check_ffmpeg() -> bool: |
| try: |
| subprocess.run(["ffprobe", "-version"], capture_output=True, timeout=5) |
| return True |
| except (FileNotFoundError, subprocess.TimeoutExpired): |
| return False |
|
|
|
|
| HAS_FFMPEG = check_ffmpeg() |
|
|
|
|
def get_duration(video_path: str) -> float:
    """Return the container duration of *video_path* in seconds, as reported by ffprobe.

    Best-effort: 0.0 is returned (and a warning logged) when any of these happen:
      - ffprobe cannot be launched or times out (30 s);
      - ffprobe prints something that is not a number (e.g. empty output on error, or "N/A");
      - the value is not finite.
    The non-finite guard matters because callers loop ``while t < duration``;
    an infinite duration would never terminate. Callers treat ``<= 0`` as
    "unreadable".
    """
    import math

    try:
        r = subprocess.run(
            ["ffprobe", "-v", "error",
             "-show_entries", "format=duration",
             "-of", "default=noprint_wrappers=1:nokey=1",
             video_path],
            capture_output=True, text=True, timeout=30,
        )
        duration = float(r.stdout.strip())
    except (OSError, ValueError, subprocess.SubprocessError) as e:
        logger.warning("ffprobe error: %s", e)
        return 0.0

    if not math.isfinite(duration):
        logger.warning("ffprobe returned non-finite duration for %s: %r", video_path, duration)
        return 0.0
    return duration
|
|
|
|
def extract_frame(video_path: str, timestamp_sec: float, out_path: str) -> bool:
    """Write the frame of *video_path* at *timestamp_sec* to *out_path* using ffmpeg.

    The frame is scaled to 640 px wide (``scale=640:-1``, height follows the
    aspect ratio) and encoded with ``-q:v 2``. ``-y`` overwrites an existing
    *out_path*.

    Returns:
        True only if ffmpeg exited with status 0 and *out_path* exists and is
        non-empty. If ffmpeg cannot be launched or hits the 30 s timeout, the
        failure is logged and False is returned instead of raising. Callers can
        therefore treat every failure the same way: skip the frame.
    """
    cmd = [
        "ffmpeg", "-y",
        # -ss before -i makes it an input option: ffmpeg seeks the input to this position.
        "-ss", f"{timestamp_sec:.3f}",
        "-i", video_path,
        "-frames:v", "1",
        "-q:v", "2",
        "-vf", "scale=640:-1",
        out_path,
    ]
    try:
        result = subprocess.run(
            cmd,
            # ffmpeg reads stdin for interactive commands; detach it so it can
            # neither block on nor consume the parent process's stdin.
            stdin=subprocess.DEVNULL,
            capture_output=True,
            timeout=30,
        )
    except (OSError, subprocess.SubprocessError) as e:
        logger.warning(
            "ffmpeg frame extraction failed for %s at %.3fs: %s",
            video_path, timestamp_sec, e,
        )
        return False
    return result.returncode == 0 and os.path.exists(out_path) and os.path.getsize(out_path) > 0
|
|
|
|
def get_image_meta(path: Path) -> dict:
    """Build the display metadata dict stored alongside an image's vector.

    Keys:
      - ``file_path``: absolute path;
      - ``file_name``: base name;
      - ``file_size``: size in KiB rounded to one decimal, suffixed ``"KB"``;
      - ``resolution``: ``"WxH"``, or ``"unknown"`` if PIL cannot open the file.

    ``path.stat()`` errors are not caught.
    """
    kib = round(path.stat().st_size / 1024, 1)

    try:
        with PILImage.open(path) as picture:
            width, height = picture.size
        resolution = f"{width}x{height}"
    except Exception:
        # Unreadable / non-image files still get an entry, just without dimensions.
        resolution = "unknown"

    return dict(
        file_path=str(path.resolve()),
        file_name=path.name,
        file_size=f"{kib}KB",
        resolution=resolution,
    )
|
|
|
|
| |
|
|
def ingest_images(
    project: str = DEFAULT_PROJECT,
    progress_callback=None,
    max_files: int | None = 200,
) -> tuple[int, str]:
    """Ingest all images from a project's images/ directory.

    Clears the project's ``image_index`` store. Then embeds every regular file
    in ``<project>/images`` whose lower-cased suffix is in ``IMAGE_EXTENSIONS``,
    and adds all resulting vectors to the store in a single ``store.add`` call.

    Args:
        project: Project whose ``images/`` directory and ``image_index`` store are used.
        progress_callback: Optional callable invoked after every file (success or
            failure) as ``progress_callback(fraction_done, desc=...)``.
        max_files: Index at most this many files, taken in sorted path order.
            Defaults to 200; ``None`` disables the cap.

    Returns:
        ``(number_of_images_indexed, human_readable_log)``. A file that fails
        (metadata, decoding or embedding) is recorded in the log and skipped;
        it does not abort the run.
    """
    import numpy as np

    proj_dir = get_project_dir(project)
    image_dir = proj_dir / "images"
    store = get_store(project, "image_index")

    # Report a missing directory like an empty one instead of raising from iterdir().
    if not image_dir.is_dir():
        return 0, f"No images found in {image_dir}"

    files = sorted(
        f for f in image_dir.iterdir()
        if f.suffix.lower() in IMAGE_EXTENSIONS and f.is_file()
    )
    if max_files is not None:
        files = files[:max_files]

    if not files:
        return 0, f"No images found in {image_dir}"

    store.clear()
    n_files = len(files)
    log = [f"[{project}] Found {n_files} images\n"]

    all_vectors = []
    all_ids = []
    all_meta = []

    for i, p in enumerate(files, start=1):
        try:
            # get_image_meta calls stat(), which can fail (e.g. the file vanished
            # mid-run), so it is guarded together with the embedding.
            meta = get_image_meta(p)
            # Context manager closes the image's file handle as soon as it is embedded.
            with PILImage.open(p) as img:
                vec = embed_image(img)
        except Exception as e:
            log.append(f" [{i}/{n_files}] {p.name}: FAILED ({e})")
        else:
            all_vectors.append(vec)
            all_ids.append(meta["file_name"])
            all_meta.append(meta)
            log.append(f" [{i}/{n_files}] {p.name} ({meta['resolution']})")

        if progress_callback:
            progress_callback(i / n_files, desc=f"Embedding {p.name}...")

    if all_vectors:
        store.add(np.stack(all_vectors), all_ids, all_meta)

    log.append(f"\n{len(all_vectors)} images indexed ({store.mode})")
    return len(all_vectors), "\n".join(log)
|
|
|
|
def ingest_single_image(file_path: str, project: str = DEFAULT_PROJECT) -> tuple[bool, str]:
    """Ingest a single uploaded image. CAGRA is rebuilt.

    Copies *file_path* into ``<project>/images/``, creating the directory if
    needed. If the file already lives at the destination it is used in place.
    The image is then embedded and added via ``store.append_and_rebuild``;
    the store is not cleared.

    Returns:
        ``(True, "Indexed: ...")`` on success, or ``(False, "Failed: ...")`` if
        opening, embedding or indexing the image raises. Errors from copying
        (other than source == destination) or from ``stat()`` of the copied file
        propagate to the caller.
    """
    path = Path(file_path)
    proj_dir = get_project_dir(project)
    dest = proj_dir / "images" / path.name
    dest.parent.mkdir(parents=True, exist_ok=True)
    try:
        shutil.copy2(str(path), str(dest))
    except shutil.SameFileError:
        # The upload already is the file in images/; nothing to copy.
        pass

    store = get_store(project, "image_index")
    meta = get_image_meta(dest)

    try:
        # Context manager closes the image's file handle after embedding.
        with PILImage.open(dest) as img:
            vec = embed_image(img)
        store.append_and_rebuild(vec, meta["file_name"], meta)
    except Exception as e:
        return False, f"Failed: {path.name} -- {e}"
    return True, f"Indexed: {path.name} ({meta['resolution']})"
|
|
|
|
def ingest_image_from_pil(
    image: PILImage.Image,
    file_name: str,
    extra_meta: dict | None = None,
    project: str = DEFAULT_PROJECT,
) -> tuple[bool, str]:
    """Ingest a PIL Image directly (used by seed_data). No CAGRA rebuild per-image.

    Saves *image* to ``<project>/images/<file_name>`` unless a file with that
    name already exists. Then embeds the in-memory image and appends the vector
    to the project's ``image_index`` store with ``store.append``. No rebuild
    happens here; presumably the seeding caller rebuilds once after the batch
    (confirm against seed_data).

    The stored metadata holds ``file_name`` and ``file_path`` (the resolved
    destination). Keys from *extra_meta* are merged on top and win on conflict.

    Returns:
        ``(True, file_name)`` on success, or ``(False, str(error))`` if saving,
        embedding or appending raised.
    """
    target = get_project_dir(project) / "images" / file_name
    store = get_store(project, "image_index")

    try:
        # Existing files with the same name are left untouched.
        if not target.exists():
            image.save(str(target))

        embedding = embed_image(image)
        base = {"file_name": file_name, "file_path": str(target.resolve())}
        record = {**base, **(extra_meta or {})}
        store.append(embedding, file_name, record)
    except Exception as exc:
        return False, str(exc)

    return True, file_name
|
|
|
|
| |
|
|
def ingest_videos(project: str = DEFAULT_PROJECT, progress_callback=None) -> tuple[int, str]:
    """Ingest all videos from a project's videos/ directory.

    Clears the project's ``video_index`` store. Each regular file in
    ``<project>/videos`` whose lower-cased suffix is in ``VIDEO_EXTENSIONS`` is
    sampled at 0.5 s, at every multiple of ``FRAME_EVERY_SEC`` below its
    duration, and about 1 s before its end. Every sampled frame is:
      1. extracted with ffmpeg,
      2. kept as a JPEG in ``videos/frames/``,
      3. embedded and appended to the store.
    ``store.rebuild_gpu_index()`` and ``store._persist()`` run once at the end
    if the store holds data.

    Args:
        project: Project whose ``videos/`` directory and ``video_index`` store are used.
        progress_callback: Optional callable invoked after every sampled frame as
            ``progress_callback(fraction_done, desc=...)``. The fraction restarts
            from 0 for each video.

    Returns:
        ``(number_of_frames_indexed, human_readable_log)``. Videos with an
        unreadable duration and frames that fail at any step are logged and
        skipped; they do not abort the run.
    """
    if not HAS_FFMPEG:
        return 0, "ffmpeg not found -- install ffmpeg for video ingestion."

    proj_dir = get_project_dir(project)
    video_dir = proj_dir / "videos"
    store = get_store(project, "video_index")

    # parents=True also guarantees video_dir exists before iterdir() below.
    frames_root = video_dir / "frames"
    frames_root.mkdir(parents=True, exist_ok=True)

    files = sorted(
        f for f in video_dir.iterdir()
        if f.suffix.lower() in VIDEO_EXTENSIONS and f.is_file()
    )
    if not files:
        return 0, f"No videos found in {video_dir}"

    store.clear()
    log = [f"[{project}] Found {len(files)} video(s) -- frame interval: {FRAME_EVERY_SEC}s\n"]
    total = 0

    for video_path in files:
        video_str = str(video_path.resolve())
        duration = get_duration(video_str)
        if duration <= 0:
            log.append(f" Skipping {video_path.name} (duration unreadable)")
            continue

        timestamps = [0.5]
        t = float(FRAME_EVERY_SEC)
        while t < duration:
            timestamps.append(round(t, 2))
            t += FRAME_EVERY_SEC
        # Always sample near the end; set() drops it if it duplicates an existing sample.
        timestamps.append(round(max(0, duration - 1.0), 2))
        timestamps = sorted(set(timestamps))
        n_ts = len(timestamps)

        log.append(f" {video_path.name} ({duration:.1f}s -> {n_ts} frames)")
        indexed = 0

        with tempfile.TemporaryDirectory() as tmp_dir:
            for idx, ts in enumerate(timestamps):
                frame_path = os.path.join(tmp_dir, f"frame_{idx:05d}.jpg")
                try:
                    # extract_frame is inside the try so that a subprocess error or
                    # timeout on one frame is logged instead of aborting the run.
                    if extract_frame(video_str, ts, frame_path):
                        with open(frame_path, "rb") as f:
                            frame_data = f.read()

                        perm_frame_path = frames_root / f"{video_path.name}_{ts:.2f}.jpg"
                        shutil.copy2(frame_path, str(perm_frame_path))

                        vec = embed_image_bytes(frame_data)
                        frame_meta = {
                            "video_path": video_str,
                            "video_name": video_path.name,
                            "frame_path": str(perm_frame_path.resolve()),
                            "timestamp_sec": ts,
                            "timestamp_label": fmt_time(ts),
                            "duration_total": round(duration, 2),
                        }
                        store.append(vec, f"{video_path.name}@{ts}", frame_meta)
                        indexed += 1
                        # NOTE(review): fixed pause after each indexed frame; its purpose
                        # (rate limiting the embedder?) is not evident here -- confirm.
                        time.sleep(0.05)
                except Exception as e:
                    log.append(f" ts={fmt_time(ts)}: FAILED ({e})")

                # Reported for every sample, including ones whose extraction failed.
                if progress_callback:
                    progress_callback(
                        (idx + 1) / n_ts,
                        desc=f"{video_path.name} frame {idx+1}/{n_ts}",
                    )

        total += indexed
        # Report frames actually indexed, not frames planned.
        log.append(f" Done ({indexed} frames)")

    if store.has_data():
        store.rebuild_gpu_index()
        store._persist()

    log.append(f"\n{total} video frames indexed ({store.mode})")
    return total, "\n".join(log)
|
|
|
|
def ingest_single_video(file_path: str, project: str = DEFAULT_PROJECT, progress_callback=None) -> tuple[int, str]:
    """Ingest a single uploaded video. CAGRA rebuilt at end.

    Copies *file_path* into ``<project>/videos/``, creating the directory if
    needed. If the file already lives at the destination it is used in place.

    Frames are sampled at 0.5 s, at every multiple of ``FRAME_EVERY_SEC`` below
    the duration, and about 1 s before the end. This is the same schedule
    ``ingest_videos`` uses, so an upload is indexed the same way as a bulk
    re-ingest. Each frame is:
      1. kept as a JPEG in ``videos/frames/``,
      2. embedded and appended to the ``video_index`` store (which is not cleared).
    ``store.rebuild_gpu_index()`` and ``store._persist()`` run once at the end
    if the store holds data.

    Args:
        file_path: Path of the uploaded video file.
        project: Target project.
        progress_callback: Optional callable invoked after every sampled frame as
            ``progress_callback(fraction_done)``.

    Returns:
        ``(frames_indexed, summary_message)``. Returns 0 early when ffmpeg is
        unavailable (the file has still been copied) or the duration is unreadable.
    """
    path = Path(file_path)
    proj_dir = get_project_dir(project)
    dest = proj_dir / "videos" / path.name
    dest.parent.mkdir(parents=True, exist_ok=True)
    try:
        shutil.copy2(str(path), str(dest))
    except shutil.SameFileError:
        # The upload already is the file in videos/; nothing to copy.
        pass

    if not HAS_FFMPEG:
        return 0, "ffmpeg not found"

    store = get_store(project, "video_index")
    video_str = str(dest.resolve())
    duration = get_duration(video_str)
    if duration <= 0:
        return 0, f"Could not read duration for {path.name}"

    frames_root = proj_dir / "videos" / "frames"
    frames_root.mkdir(parents=True, exist_ok=True)

    timestamps = [0.5]
    t = float(FRAME_EVERY_SEC)
    while t < duration:
        timestamps.append(round(t, 2))
        t += FRAME_EVERY_SEC
    # Near-end sample, matching ingest_videos; set() drops it if it is a duplicate.
    timestamps.append(round(max(0, duration - 1.0), 2))
    timestamps = sorted(set(timestamps))
    n_ts = len(timestamps)
    count = 0

    with tempfile.TemporaryDirectory() as tmp_dir:
        for idx, ts in enumerate(timestamps):
            frame_path = os.path.join(tmp_dir, f"frame_{idx:05d}.jpg")
            try:
                # extract_frame is inside the try so a subprocess error or timeout
                # on one frame does not abort the whole upload.
                if extract_frame(video_str, ts, frame_path):
                    with open(frame_path, "rb") as f:
                        frame_data = f.read()

                    perm_frame_path = frames_root / f"{path.name}_{ts:.2f}.jpg"
                    shutil.copy2(frame_path, str(perm_frame_path))

                    vec = embed_image_bytes(frame_data)
                    frame_meta = {
                        "video_path": video_str,
                        "video_name": path.name,
                        "frame_path": str(perm_frame_path.resolve()),
                        "timestamp_sec": ts,
                        "timestamp_label": fmt_time(ts),
                        "duration_total": round(duration, 2),
                    }
                    store.append(vec, f"{path.name}@{ts}", frame_meta)
                    count += 1
            except Exception:
                logger.exception("Frame embed error for %s at ts=%s", path.name, ts)

            # Reported for every sample, including ones whose extraction failed.
            if progress_callback:
                progress_callback((idx + 1) / n_ts)

    if store.has_data():
        store.rebuild_gpu_index()
        store._persist()

    return count, f"{count} frames indexed for {path.name} ({duration:.1f}s)"
|
|