memorybridge / scripts /test_vision_pipeline.py
kimandrew927's picture
Initial Space deployment
1004967
"""
Standalone vision pipeline test script.
Captures frames every 2 seconds, runs gesture/affect/VLM scene detection,
stores results in a 30-second rolling buffer. Press Enter to dump the buffer
state, q + Enter to quit.
Usage:
cd "j:\\My Drive\\UB\\SPRING 26\\cse635\\Term Project\\final_architecture"
python scripts/test_vision_pipeline.py
"""
from __future__ import annotations
import sys
import time
from collections import deque
from dataclasses import dataclass
from typing import Optional
import cv2
import numpy as np
from PIL import Image
# ── Constants ─────────────────────────────────────────────────────────────────
# Rolling-buffer capacity: 15 frames × 2 s capture interval = 30 seconds.
BUFFER_MAXLEN = 15
# Target seconds between consecutive frame captures in the main loop.
CAPTURE_INTERVAL = 2.0 # seconds between frames
# Frames are resized to SCENE_SIZE x SCENE_SIZE before being sent to the VLM.
SCENE_SIZE = 512 # pixels for VLM resize
# Settings file handed to ModelRegistry when the script starts.
CONFIG_PATH = "memorybridge/config/settings.yaml"
# Prompt sent with every frame. It asks the VLM for exactly one short label;
# the literal reply 'no_signal' is what the script treats as "nothing to store".
SCENE_PROMPT = (
    "Look at this image and respond with ONE of the following:\n"
    "- If you see a clear smile or positive expression: 'smiling'\n"
    "- If you see a frustrated, angry, or negative expression: 'frustrated'\n"
    "- If you see a surprised expression: 'surprised'\n"
    "- If you see an object being held up or introduced (not a hand gesture): "
    "describe the object in 3 words max, e.g. 'holding phone', 'showing cat photo'\n"
    "- If you see a thumbs up, thumbs down, pointing, or other hand gesture: "
    "name it in 2 words, e.g. 'thumbs up'\n"
    "- If nothing significant: respond exactly 'no_signal'\n\n"
    "Respond with ONLY one of these options. No other text."
)
# ── Test-only data structure (NOT in production schemas) ──────────────────────
@dataclass
class TestFrame:
    """One processed camera frame as kept in the test-side rolling buffer.

    This is a display/debug record for this script only; it pairs the
    detector snapshot with the VLM's scene label for the same frame.
    """

    # The class name starts with "Test" and this file is named test_*.py, so
    # pytest would otherwise try to collect it as a test class and emit a
    # collection warning. This is a plain class attribute, not a dataclass field.
    __test__ = False

    # Wall-clock time (time.time()) copied from the snapshot.
    timestamp: float
    # VisionSnapshot — the type is imported at runtime inside _run_frame.
    snapshot: object
    # Stripped VLM reply to SCENE_PROMPT, or "(VLM error)" when the call failed.
    scene_description: str
# ── Camera helpers ────────────────────────────────────────────────────────────
def _open_camera() -> cv2.VideoCapture:
    """Open capture device 0 and request a 640x480 frame size.

    Returns:
        The opened ``cv2.VideoCapture`` handle.

    Exits the process with status 1 (after printing an error to stderr) when
    the device cannot be opened.
    """
    camera = cv2.VideoCapture(0)
    if camera.isOpened():
        # Requested size only; the value is passed to the driver via set().
        camera.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
        return camera

    print("[ERROR] Cannot open camera (VideoCapture(0) failed).", file=sys.stderr)
    sys.exit(1)
def _capture_frame(cap: cv2.VideoCapture) -> Optional[np.ndarray]:
ret, frame = cap.read()
return frame if ret else None
# ── VLM scene description ─────────────────────────────────────────────────────
def _call_vlm_scene(vlm, bgr_frame: np.ndarray) -> str:
try:
resized = cv2.resize(bgr_frame, (SCENE_SIZE, SCENE_SIZE))
rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
pil_img = Image.fromarray(rgb)
# Moondream path: exposes encode_image() + query()
if hasattr(vlm, "encode_image") and hasattr(vlm, "query"):
encoded = vlm.encode_image(pil_img)
answer = vlm.query(encoded, SCENE_PROMPT)
if isinstance(answer, dict):
return answer.get("answer", "").strip()
return str(answer).strip()
# LangChain cloud VLM path
import base64, io
from langchain_core.messages import HumanMessage
buf = io.BytesIO()
pil_img.save(buf, format="JPEG")
b64 = base64.b64encode(buf.getvalue()).decode("utf-8")
msg = HumanMessage(content=[
{"type": "text", "text": SCENE_PROMPT},
{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}},
])
return vlm.invoke([msg]).content.strip()
except Exception as exc:
print(f"[VLM] Scene description error: {exc}", file=sys.stderr)
return "(VLM error)"
# ── Main per-frame logic ──────────────────────────────────────────────────────
def _run_frame(cap, gesture_det, affect_det, air_sign_det, vlm):
    """Capture one frame and run every detector on it.

    Steps, in order: gesture detection, affect detection, air-sign trajectory
    update (with a synchronous VLM letter read when the detector says the
    trajectory should fire), then the VLM scene description.

    Args:
        cap: Opened camera handle passed to ``_capture_frame``.
        gesture_det: Gesture detector; provides ``detect`` and ``get_fingertip``.
        affect_det: Affect detector; provides ``detect``.
        air_sign_det: Air-sign detector whose trajectory is updated here.
        vlm: VLM client forwarded to ``_call_vlm_scene``.

    Returns:
        A ``TestFrame`` for the captured image, or ``None`` when the camera
        read failed (a warning is printed to stderr).
    """
    from memorybridge.core.schemas import AirSignChar, VisionSnapshot

    bgr = _capture_frame(cap)
    if bgr is None:
        # The em dash in this message was mojibake ("β€”") in the original.
        print("[WARN] Frame capture failed — skipping.", file=sys.stderr)
        return None

    # Gesture: a falsy detector result is normalised to an empty list.
    gesture_signals = gesture_det.detect(bgr) or []

    # Affect
    affect_signal = affect_det.detect(bgr)

    # Air-sign (sync camera-loop path). get_fingertip() is read after
    # detect() has run on this frame; keep that order.
    air_sign_char: Optional[AirSignChar] = None
    tip = gesture_det.get_fingertip()
    now = time.monotonic()
    n_pts, should_fire, _ = air_sign_det.update_trajectory_from_tip(tip, now)
    if should_fire:
        # Render first, then reset, then classify the rendered canvas.
        # NOTE(review): relies on the detector's private helpers.
        canvas = air_sign_det._render_trajectory()
        air_sign_det._reset()
        letter = air_sign_det.call_vlm_sync(canvas, n_pts)
        if letter is not None:
            air_sign_char = AirSignChar(
                character=letter,
                confidence=0.85,  # fixed confidence assigned to every confirmed letter
                timestamp=time.time(),
            )
            print(f"[AirSign] Confirmed letter: {letter}", file=sys.stderr)

    # Scene description from VLM
    scene_description = _call_vlm_scene(vlm, bgr)

    snapshot = VisionSnapshot(
        timestamp=time.time(),
        gestures=gesture_signals,
        affect=affect_signal,
        air_sign_char=air_sign_char,
    )
    return TestFrame(
        timestamp=snapshot.timestamp,
        snapshot=snapshot,
        scene_description=scene_description,
    )
# ── Deduplication ─────────────────────────────────────────────────────────────
def _is_no_signal(frame: TestFrame) -> bool:
return "no_signal" in frame.scene_description.lower()
def _is_duplicate(last: Optional[TestFrame], new: TestFrame) -> bool:
"""Compare only against the last frame that had a real behavioral signal."""
if last is None:
return False
def top_gesture(frame: TestFrame) -> str:
if frame.snapshot.gestures:
return frame.snapshot.gestures[0].gesture_class
return "neutral"
same_gesture = top_gesture(last) == top_gesture(new)
same_scene = last.scene_description == new.scene_description
return same_gesture and same_scene
# ── Display helpers ───────────────────────────────────────────────────────────
def _fmt_time(ts: float) -> str:
import datetime
return datetime.datetime.fromtimestamp(ts).strftime("%H:%M:%S.%f")[:-3]
def _print_frame(frame: TestFrame, index: int) -> None:
    """Print a five-line summary of one frame to stdout.

    Shows the frame number and time, the first gesture, the affect, the
    air-sign character (each with confidence, or "None" when absent) and the
    scene description.
    """
    snapshot = frame.snapshot

    if snapshot.gestures:
        lead = snapshot.gestures[0]
        gesture_text = f"{lead.gesture_class} (conf={lead.confidence:.2f})"
    else:
        gesture_text = "None"

    affect = snapshot.affect
    if affect is None:
        affect_text = "None"
    else:
        affect_text = f"{affect.affect_class} (conf={affect.confidence:.2f})"

    sign = snapshot.air_sign_char
    if sign is None:
        sign_text = "None"
    else:
        sign_text = f"'{sign.character}' (conf={sign.confidence:.2f})"

    report = (
        f"\n[Frame {index} | {_fmt_time(frame.timestamp)}]",
        f" Gesture : {gesture_text}",
        f" Affect : {affect_text}",
        f" Air-Sign : {sign_text}",
        f" Scene : {frame.scene_description}",
    )
    for line in report:
        print(line)
def _print_buffer_state(test_buffer: deque, signal_buffer) -> None:
print("\n" + "=" * 60)
print(f" BUFFER STATE ({len(test_buffer)} frames stored)")
print("=" * 60)
for i, frame in enumerate(test_buffer):
_print_frame(frame, i + 1)
print("\n── Aggregated signals ──")
state = signal_buffer.get_state_sync()
if state.aggregated_gesture:
g = state.aggregated_gesture
print(f" Gesture : {g.gesture_class} (conf={g.confidence:.2f})")
else:
print(" Gesture : None (all neutral)")
if state.aggregated_affect:
a = state.aggregated_affect
print(f" Affect : {a.affect_class} (conf={a.confidence:.2f})")
else:
print(" Affect : None (no dominant class)")
if state.air_sign_sequence:
letters = "".join(c.character for c in state.air_sign_sequence)
print(f" Air-Sign : confirmed sequence = '{letters}'")
else:
print(" Air-Sign : (none confirmed)")
print("=" * 60)
# ── Keyboard input (Windows — msvcrt) ────────────────────────────────────────
def _check_keypress() -> Optional[str]:
try:
import msvcrt
if msvcrt.kbhit():
return msvcrt.getch().decode("utf-8", errors="ignore")
except ImportError:
pass
return None
# ── Main ──────────────────────────────────────────────────────────────────────
def main() -> None:
    """Run the interactive vision-pipeline test until the user quits.

    Loads the model registry and VLM, builds the gesture/affect/air-sign
    detectors and the signal buffer, opens the camera, then processes one
    frame per CAPTURE_INTERVAL. A frame is stored only when it carries a
    signal (VLM label, confident non-neutral gesture, or air-sign letter)
    and is not a duplicate of the last stored frame.

    Controls: Enter dumps the buffer; "q" followed by Enter quits; Ctrl+C
    also stops the loop. Detectors are closed on every exit path, including
    a failed camera open.
    """
    print("Initialising MemoryBridge vision pipeline test…")
    # Multiply before formatting so a fractional interval is not truncated.
    print(
        f" Buffer: {BUFFER_MAXLEN} frames × {CAPTURE_INTERVAL:.0f}s"
        f" = {BUFFER_MAXLEN * CAPTURE_INTERVAL:.0f}s window"
    )

    # Load registry + VLM
    from memorybridge.core.models import ModelRegistry

    registry = ModelRegistry(CONFIG_PATH)
    print(" Loading VLM (may take a moment on first run)…", end="", flush=True)
    vlm = registry.get_vlm()
    print(" done.")

    # Vision detectors
    from memorybridge.vision_path.gesture_detector import GestureDetector
    from memorybridge.vision_path.affect_detector import AffectDetector
    from memorybridge.vision_path.air_sign_detector import AirSignDetector
    from memorybridge.vision_path.signal_buffer import SignalBuffer

    gesture_det = GestureDetector()
    affect_det = AffectDetector()
    air_sign_det = AirSignDetector(registry)
    signal_buffer = SignalBuffer(
        buffer_size=BUFFER_MAXLEN,
        air_sign_confirmation_windows=2,
    )

    # Test-side deque (mirrors signal_buffer for display)
    test_buffer: deque[TestFrame] = deque(maxlen=BUFFER_MAXLEN)

    # A gesture only counts as a signal at or above this confidence.
    min_gesture_confidence = 0.75

    frame_index = 0
    last_frame: Optional[TestFrame] = None
    pending_quit = False
    running = True
    cap = None

    try:
        # Opened inside the try so the detectors are still closed when
        # _open_camera() exits the process.
        cap = _open_camera()

        print("\nStarting — press Enter to dump buffer, q + Enter to quit.\n")
        print("-" * 60)

        while running:
            loop_start = time.monotonic()

            new_frame = _run_frame(cap, gesture_det, affect_det, air_sign_det, vlm)
            if new_frame is not None:
                frame_index += 1
                _print_frame(new_frame, frame_index)

                gestures = new_frame.snapshot.gestures
                has_vlm_signal = not _is_no_signal(new_frame)
                has_gesture_signal = bool(
                    gestures
                    and gestures[0].gesture_class != "neutral"
                    and gestures[0].confidence >= min_gesture_confidence
                )
                has_air_sign = new_frame.snapshot.air_sign_char is not None
                has_any_signal = has_vlm_signal or has_gesture_signal or has_air_sign

                if not has_any_signal:
                    print(" (no_signal — buffer unchanged)")
                elif _is_duplicate(last_frame, new_frame):
                    print(" (no change — buffer unchanged)")
                else:
                    test_buffer.append(new_frame)
                    # Direct append is GIL-atomic and safe in a single-threaded script
                    signal_buffer._buffer.append(new_frame.snapshot)
                    # Only stored frames become the de-duplication reference.
                    last_frame = new_frame

            # Keyboard input: drain everything typed since the last iteration
            # so "q" + Enter is honoured in one pass instead of two cycles.
            ch = _check_keypress()
            while ch is not None:
                if ch == "q":
                    pending_quit = True
                    print(" [Press Enter to confirm quit]")
                elif ch in ("\r", "\n"):
                    if pending_quit:
                        running = False
                        break
                    _print_buffer_state(test_buffer, signal_buffer)
                ch = _check_keypress()

            if not running:
                break

            # Sleep only for what is left of the capture interval.
            elapsed = time.monotonic() - loop_start
            time.sleep(max(0.0, CAPTURE_INTERVAL - elapsed))
    except KeyboardInterrupt:
        print("\n[Interrupted]")
    finally:
        if cap is not None:
            cap.release()
        gesture_det.close()
        affect_det.close()
        air_sign_det.close()
        if cap is not None:
            print("\nCamera released. Done.")
if __name__ == "__main__":
main()