"""
Standalone vision pipeline test script.

Captures frames every 2 seconds, runs gesture/affect/VLM scene detection,
stores results in a 30-second rolling buffer.

Press Enter to dump the buffer state, q + Enter to quit.

Usage:
    cd "j:\\My Drive\\UB\\SPRING 26\\cse635\\Term Project\\final_architecture"
    python scripts/test_vision_pipeline.py
"""

from __future__ import annotations

import base64
import datetime
import io
import os
import select
import sys
import time
from collections import deque
from dataclasses import dataclass
from typing import Optional

import cv2
import numpy as np
from PIL import Image

# ── Constants ─────────────────────────────────────────────────────────────────

BUFFER_MAXLEN = 15        # 15 × 2 s = 30 seconds
CAPTURE_INTERVAL = 2.0    # seconds between frames
SCENE_SIZE = 512          # pixels for VLM resize
CONFIG_PATH = "memorybridge/config/settings.yaml"

# Minimum confidence for the top gesture to count as a behavioural signal.
GESTURE_MIN_CONFIDENCE = 0.75
# Fixed confidence attached to an air-sign letter returned by the detector.
AIR_SIGN_CONFIDENCE = 0.85
# Token the scene prompt asks the VLM to emit when nothing is happening.
NO_SIGNAL_TOKEN = "no_signal"
# Placeholder scene description returned when the VLM call fails.
VLM_ERROR_TEXT = "(VLM error)"

SCENE_PROMPT = (
    "Look at this image and respond with ONE of the following:\n"
    "- If you see a clear smile or positive expression: 'smiling'\n"
    "- If you see a frustrated, angry, or negative expression: 'frustrated'\n"
    "- If you see a surprised expression: 'surprised'\n"
    "- If you see an object being held up or introduced (not a hand gesture): "
    "describe the object in 3 words max, e.g. 'holding phone', 'showing cat photo'\n"
    "- If you see a thumbs up, thumbs down, pointing, or other hand gesture: "
    "name it in 2 words, e.g. 'thumbs up'\n"
    "- If nothing significant: respond exactly 'no_signal'\n\n"
    "Respond with ONLY one of these options. No other text."
)


# ── Test-only data structure (NOT in production schemas) ──────────────────────

@dataclass
class TestFrame:
    """One processed camera frame: the vision snapshot plus the VLM scene text."""

    # The class name starts with "Test" and the usage line names this file
    # test_vision_pipeline.py; this flag stops pytest from trying to collect
    # the dataclass as a test class if the file is ever picked up by pytest.
    # Un-annotated, so it is a plain class attribute and not a dataclass field.
    __test__ = False

    timestamp: float
    snapshot: object  # VisionSnapshot — imported at runtime
    scene_description: str


# ── Camera helpers ────────────────────────────────────────────────────────────

def _open_camera() -> cv2.VideoCapture:
    """Open camera index 0 and request a 640x480 frame size.

    Prints an error to stderr and exits the process with status 1 when the
    device cannot be opened.
    """
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        # Release the failed handle before exiting so nothing is left dangling.
        cap.release()
        print("[ERROR] Cannot open camera (VideoCapture(0) failed).", file=sys.stderr)
        sys.exit(1)
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
    return cap


def _capture_frame(cap: cv2.VideoCapture) -> Optional[np.ndarray]:
    """Read one frame from *cap*; return None when the read fails."""
    ret, frame = cap.read()
    return frame if ret else None


# ── VLM scene description ─────────────────────────────────────────────────────

def _call_vlm_scene(vlm, bgr_frame: np.ndarray) -> str:
    """Ask the VLM to classify *bgr_frame* using SCENE_PROMPT.

    The frame is resized to SCENE_SIZE x SCENE_SIZE and converted from BGR to
    RGB before being handed to the model. Two model interfaces are supported:
    objects exposing ``encode_image`` + ``query``, and anything else, which is
    called through ``invoke`` with a LangChain ``HumanMessage``.

    Returns the stripped model answer, or VLM_ERROR_TEXT if anything raises.
    """
    # Deliberately broad: any failure in resizing, encoding or the model call
    # is reported on stderr and must not stop the capture loop.
    try:
        resized = cv2.resize(bgr_frame, (SCENE_SIZE, SCENE_SIZE))
        rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
        pil_img = Image.fromarray(rgb)

        # Moondream path: exposes encode_image() + query()
        if hasattr(vlm, "encode_image") and hasattr(vlm, "query"):
            encoded = vlm.encode_image(pil_img)
            answer = vlm.query(encoded, SCENE_PROMPT)
            if isinstance(answer, dict):
                return answer.get("answer", "").strip()
            return str(answer).strip()

        # LangChain cloud VLM path
        from langchain_core.messages import HumanMessage

        buf = io.BytesIO()
        pil_img.save(buf, format="JPEG")
        b64 = base64.b64encode(buf.getvalue()).decode("utf-8")
        msg = HumanMessage(content=[
            {"type": "text", "text": SCENE_PROMPT},
            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}},
        ])
        return vlm.invoke([msg]).content.strip()
    except Exception as exc:
        print(f"[VLM] Scene description error: {exc}", file=sys.stderr)
        return VLM_ERROR_TEXT


# ── Main per-frame logic ──────────────────────────────────────────────────────

def _run_frame(cap, gesture_det, affect_det, air_sign_det, vlm) -> Optional[TestFrame]:
    """Capture one frame and run every detector on it.

    Runs gesture detection, affect detection, the air-sign trajectory update
    and the VLM scene description, then bundles the results.

    Returns a TestFrame, or None when the camera read fails.
    """
    from memorybridge.core.schemas import AirSignChar, VisionSnapshot

    bgr = _capture_frame(cap)
    if bgr is None:
        print("[WARN] Frame capture failed — skipping.", file=sys.stderr)
        return None

    # Gesture
    gesture_signals = gesture_det.detect(bgr) or []

    # Affect
    affect_signal = affect_det.detect(bgr)

    # Air-sign (sync camera-loop path)
    air_sign_char: Optional[AirSignChar] = None
    tip = gesture_det.get_fingertip()
    now = time.monotonic()
    n_pts, should_fire, _ = air_sign_det.update_trajectory_from_tip(tip, now)
    if should_fire:
        # NOTE(review): relies on the detector's private _render_trajectory()
        # and _reset(); the canvas is rendered before the reset and the reset
        # happens before the VLM call.
        canvas = air_sign_det._render_trajectory()
        air_sign_det._reset()
        letter = air_sign_det.call_vlm_sync(canvas, n_pts)
        if letter is not None:
            air_sign_char = AirSignChar(
                character=letter,
                confidence=AIR_SIGN_CONFIDENCE,
                timestamp=time.time(),
            )
            print(f"[AirSign] Confirmed letter: {letter}", file=sys.stderr)

    # Scene description from VLM
    scene_description = _call_vlm_scene(vlm, bgr)

    snapshot = VisionSnapshot(
        timestamp=time.time(),
        gestures=gesture_signals,
        affect=affect_signal,
        air_sign_char=air_sign_char,
    )
    return TestFrame(
        timestamp=snapshot.timestamp,
        snapshot=snapshot,
        scene_description=scene_description,
    )


# ── Signal classification & deduplication ─────────────────────────────────────

def _is_no_signal(frame: TestFrame) -> bool:
    """Return True when the scene description carries no usable VLM signal.

    That is the case when the description contains NO_SIGNAL_TOKEN, is empty,
    or is the VLM_ERROR_TEXT placeholder. The last two matter because a failed
    or empty VLM answer must not be stored in the buffer as if it were a
    detected behaviour.
    """
    text = frame.scene_description.strip().lower()
    if not text or text == VLM_ERROR_TEXT.lower():
        return True
    return NO_SIGNAL_TOKEN in text


def _top_gesture_class(frame: TestFrame) -> str:
    """Return the class of the first gesture in *frame*, or "neutral" if none."""
    gestures = frame.snapshot.gestures
    return gestures[0].gesture_class if gestures else "neutral"


def _has_any_signal(frame: TestFrame) -> bool:
    """Return True when *frame* has a VLM, gesture or air-sign signal.

    A gesture only counts when the first gesture is non-neutral and its
    confidence is at least GESTURE_MIN_CONFIDENCE.
    """
    gestures = frame.snapshot.gestures
    has_gesture_signal = bool(
        gestures
        and gestures[0].gesture_class != "neutral"
        and gestures[0].confidence >= GESTURE_MIN_CONFIDENCE
    )
    has_air_sign = frame.snapshot.air_sign_char is not None
    has_vlm_signal = not _is_no_signal(frame)
    return has_vlm_signal or has_gesture_signal or has_air_sign


def _is_duplicate(last: Optional[TestFrame], new: TestFrame) -> bool:
    """Compare only against the last frame that had a real behavioral signal.

    *new* is a duplicate when its top gesture class and scene description both
    match *last*. A frame carrying an air-sign letter is never a duplicate:
    the letter is a signal of its own, and dropping the frame would lose it
    whenever gesture and scene happen to be unchanged.
    """
    if last is None:
        return False
    if new.snapshot.air_sign_char is not None:
        return False
    same_gesture = _top_gesture_class(last) == _top_gesture_class(new)
    same_scene = last.scene_description == new.scene_description
    return same_gesture and same_scene


# ── Display helpers ───────────────────────────────────────────────────────────

def _fmt_time(ts: float) -> str:
    """Format epoch seconds as local HH:MM:SS.mmm."""
    # %f yields microseconds; dropping the last three digits leaves milliseconds.
    return datetime.datetime.fromtimestamp(ts).strftime("%H:%M:%S.%f")[:-3]


def _print_frame(frame: TestFrame, index: int) -> None:
    """Print a summary of *frame* under the 1-based label *index*."""
    snap = frame.snapshot

    top_g = "None"
    if snap.gestures:
        g = snap.gestures[0]
        top_g = f"{g.gesture_class} (conf={g.confidence:.2f})"

    aff = "None"
    if snap.affect is not None:
        a = snap.affect
        aff = f"{a.affect_class} (conf={a.confidence:.2f})"

    air = "None"
    if snap.air_sign_char is not None:
        air = f"'{snap.air_sign_char.character}' (conf={snap.air_sign_char.confidence:.2f})"

    print(f"\n[Frame {index} | {_fmt_time(frame.timestamp)}]")
    print(f" Gesture : {top_g}")
    print(f" Affect : {aff}")
    print(f" Air-Sign : {air}")
    print(f" Scene : {frame.scene_description}")


def _print_buffer_state(test_buffer: deque, signal_buffer) -> None:
    """Print every stored frame, then the aggregated state of *signal_buffer*."""
    print("\n" + "=" * 60)
    print(f" BUFFER STATE ({len(test_buffer)} frames stored)")
    print("=" * 60)

    for position, frame in enumerate(test_buffer, start=1):
        _print_frame(frame, position)

    print("\n── Aggregated signals ──")
    state = signal_buffer.get_state_sync()

    if state.aggregated_gesture:
        g = state.aggregated_gesture
        print(f" Gesture : {g.gesture_class} (conf={g.confidence:.2f})")
    else:
        print(" Gesture : None (all neutral)")

    if state.aggregated_affect:
        a = state.aggregated_affect
        print(f" Affect : {a.affect_class} (conf={a.confidence:.2f})")
    else:
        print(" Affect : None (no dominant class)")

    if state.air_sign_sequence:
        letters = "".join(c.character for c in state.air_sign_sequence)
        print(f" Air-Sign : confirmed sequence = '{letters}'")
    else:
        print(" Air-Sign : (none confirmed)")

    print("=" * 60)


# ── Keyboard input (msvcrt on Windows, select() elsewhere) ───────────────────

def _check_keypress() -> Optional[str]:
    """Return one pending input character without blocking, or None.

    Uses ``msvcrt`` where it can be imported. Elsewhere it polls stdin with a
    zero-timeout ``select`` and reads a single byte straight from the file
    descriptor, so nothing is left behind in Python's own stdin buffer where
    ``select`` could not see it.
    """
    try:
        import msvcrt
    except ImportError:
        msvcrt = None

    if msvcrt is not None:
        if msvcrt.kbhit():
            return msvcrt.getch().decode("utf-8", errors="ignore")
        return None

    try:
        fd = sys.stdin.fileno()
        readable, _, _ = select.select([fd], [], [], 0)
        if not readable:
            return None
        data = os.read(fd, 1)
    except (AttributeError, OSError, ValueError):
        # stdin missing, closed, or not backed by a real descriptor.
        return None

    # An empty read means EOF; report "no key" so callers never spin on it.
    return data.decode("utf-8", errors="ignore") if data else None


# ── Main ──────────────────────────────────────────────────────────────────────

def main() -> None:
    """Run the capture loop until the user quits or interrupts it.

    Each cycle captures and analyses one frame, stores it when it carries a
    new signal, handles pending key presses, and then sleeps out the rest of
    CAPTURE_INTERVAL.
    """
    print("Initialising MemoryBridge vision pipeline test…")
    # Computed from the float interval so a fractional CAPTURE_INTERVAL is not
    # truncated in the reported window length.
    window_s = BUFFER_MAXLEN * CAPTURE_INTERVAL
    print(f" Buffer: {BUFFER_MAXLEN} frames × {CAPTURE_INTERVAL:g}s = {window_s:g}s window")

    # Load registry + VLM
    from memorybridge.core.models import ModelRegistry

    registry = ModelRegistry(CONFIG_PATH)
    print(" Loading VLM (may take a moment on first run)…", end="", flush=True)
    vlm = registry.get_vlm()
    print(" done.")

    # Vision detectors
    from memorybridge.vision_path.gesture_detector import GestureDetector
    from memorybridge.vision_path.affect_detector import AffectDetector
    from memorybridge.vision_path.air_sign_detector import AirSignDetector
    from memorybridge.vision_path.signal_buffer import SignalBuffer

    cap = None
    detectors = []  # every detector built so far, in the order it must be closed
    try:
        gesture_det = GestureDetector()
        detectors.append(gesture_det)
        affect_det = AffectDetector()
        detectors.append(affect_det)
        air_sign_det = AirSignDetector(registry)
        detectors.append(air_sign_det)

        signal_buffer = SignalBuffer(
            buffer_size=BUFFER_MAXLEN,
            air_sign_confirmation_windows=2,
        )

        # Test-side deque (mirrors signal_buffer for display)
        test_buffer: deque[TestFrame] = deque(maxlen=BUFFER_MAXLEN)

        # Open camera. Done inside the try so that the sys.exit() raised on
        # failure still runs the detector cleanup below.
        cap = _open_camera()

        print("\nStarting — press Enter to dump buffer, q + Enter to quit.\n")
        print("-" * 60)

        frame_index = 0
        last_frame: Optional[TestFrame] = None
        pending_quit = False
        running = True

        while running:
            loop_start = time.monotonic()

            new_frame = _run_frame(cap, gesture_det, affect_det, air_sign_det, vlm)
            if new_frame is not None:
                frame_index += 1
                _print_frame(new_frame, frame_index)

                if not _has_any_signal(new_frame):
                    print(" (no_signal — buffer unchanged)")
                elif _is_duplicate(last_frame, new_frame):
                    print(" (no change — buffer unchanged)")
                else:
                    test_buffer.append(new_frame)
                    # Direct append is GIL-atomic and safe in a single-threaded script
                    signal_buffer._buffer.append(new_frame.snapshot)
                    last_frame = new_frame

            # Keyboard input: drain everything typed since the last cycle so
            # that "q" followed by Enter is handled in one pass instead of
            # one key per capture interval.
            while True:
                ch = _check_keypress()
                if ch is None:
                    break
                if ch == "q":
                    pending_quit = True
                    print(" [Press Enter to confirm quit]")
                elif ch in ("\r", "\n"):
                    if pending_quit:
                        running = False
                        break
                    _print_buffer_state(test_buffer, signal_buffer)

            if not running:
                # Quit confirmed: no point sleeping out the rest of the cycle.
                break

            elapsed = time.monotonic() - loop_start
            time.sleep(max(0.0, CAPTURE_INTERVAL - elapsed))
    except KeyboardInterrupt:
        print("\n[Interrupted]")
    finally:
        if cap is not None:
            cap.release()
        for detector in detectors:
            detector.close()
        if cap is not None:
            print("\nCamera released. Done.")


if __name__ == "__main__":
    main()