Spaces:
Build error
Build error
| """ | |
| Standalone vision pipeline test script. | |
| Captures frames every 2 seconds, runs gesture/affect/VLM scene detection, | |
| stores results in a 30-second rolling buffer. Press Enter to dump the buffer | |
| state, q + Enter to quit. | |
| Usage: | |
| cd "j:\\My Drive\\UB\\SPRING 26\\cse635\\Term Project\\final_architecture" | |
| python scripts/test_vision_pipeline.py | |
| """ | |
| from __future__ import annotations | |
| import sys | |
| import time | |
| from collections import deque | |
| from dataclasses import dataclass | |
| from typing import Optional | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image | |
# ── Constants ─────────────────────────────────────────────────────────────────
BUFFER_MAXLEN = 15  # frames kept in the rolling buffer: 15 × 2 s = 30 seconds
CAPTURE_INTERVAL = 2.0  # seconds between frames
SCENE_SIZE = 512  # pixels; frames are resized to SCENE_SIZE x SCENE_SIZE for the VLM
CONFIG_PATH = "memorybridge/config/settings.yaml"

# Instruction sent to the VLM with every frame. Each tuple entry is one line of
# the prompt; the empty entry produces the blank line before the final sentence.
SCENE_PROMPT = "\n".join((
    "Look at this image and respond with ONE of the following:",
    "- If you see a clear smile or positive expression: 'smiling'",
    "- If you see a frustrated, angry, or negative expression: 'frustrated'",
    "- If you see a surprised expression: 'surprised'",
    "- If you see an object being held up or introduced (not a hand gesture): "
    "describe the object in 3 words max, e.g. 'holding phone', 'showing cat photo'",
    "- If you see a thumbs up, thumbs down, pointing, or other hand gesture: "
    "name it in 2 words, e.g. 'thumbs up'",
    "- If nothing significant: respond exactly 'no_signal'",
    "",
    "Respond with ONLY one of these options. No other text.",
))
# ── Test-only data structure (NOT in production schemas) ──────────────────────
@dataclass
class TestFrame:
    """Test-only record of one processed camera frame.

    The ``@dataclass`` decorator is required: ``_run_frame`` builds instances
    with keyword arguments (``timestamp=``, ``snapshot=``,
    ``scene_description=``), which needs the generated ``__init__``.
    """

    timestamp: float  # set from the snapshot's timestamp when the frame is built
    snapshot: object  # VisionSnapshot — imported at runtime
    scene_description: str  # text returned by the VLM scene query
# ── Camera helpers ────────────────────────────────────────────────────────────
def _open_camera() -> cv2.VideoCapture:
    """Open camera index 0 and request a 640x480 frame size.

    Returns:
        The opened ``cv2.VideoCapture``.

    If the device cannot be opened, an error is printed to stderr and the
    process exits with status 1.
    """
    camera = cv2.VideoCapture(0)
    if camera.isOpened():
        requested = (
            (cv2.CAP_PROP_FRAME_WIDTH, 640),
            (cv2.CAP_PROP_FRAME_HEIGHT, 480),
        )
        for prop, value in requested:
            camera.set(prop, value)
        return camera

    print("[ERROR] Cannot open camera (VideoCapture(0) failed).", file=sys.stderr)
    sys.exit(1)
def _capture_frame(cap: cv2.VideoCapture) -> Optional[np.ndarray]:
    """Read one frame from *cap*.

    Returns:
        The frame from ``cap.read()``, or ``None`` when the success flag
        returned alongside it is falsy.
    """
    ok, image = cap.read()
    if not ok:
        return None
    return image
# ── VLM scene description ─────────────────────────────────────────────────────
def _call_vlm_scene(vlm, bgr_frame: np.ndarray) -> str:
    """Ask the VLM to label the frame according to ``SCENE_PROMPT``.

    The frame is resized to ``SCENE_SIZE`` x ``SCENE_SIZE``, converted from
    BGR to RGB and wrapped in a PIL image. Two model interfaces are handled:

    * objects exposing both ``encode_image`` and ``query`` (Moondream path);
    * anything else is treated as a LangChain chat model and receives the
      prompt plus a base64 JPEG data URL via ``invoke``.

    Returns:
        The stripped answer text, or ``"(VLM error)"`` if anything in the
        process raises (the error is printed to stderr).
    """
    try:
        square = cv2.resize(bgr_frame, (SCENE_SIZE, SCENE_SIZE))
        image = Image.fromarray(cv2.cvtColor(square, cv2.COLOR_BGR2RGB))

        # Moondream path: exposes encode_image() + query()
        if hasattr(vlm, "encode_image") and hasattr(vlm, "query"):
            embedding = vlm.encode_image(image)
            reply = vlm.query(embedding, SCENE_PROMPT)
            if isinstance(reply, dict):
                text = reply.get("answer", "")
            else:
                text = str(reply)
            return text.strip()

        # LangChain cloud VLM path
        import base64
        import io

        from langchain_core.messages import HumanMessage

        jpeg = io.BytesIO()
        image.save(jpeg, format="JPEG")
        encoded = base64.b64encode(jpeg.getvalue()).decode("utf-8")
        content = [
            {"type": "text", "text": SCENE_PROMPT},
            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{encoded}"}},
        ]
        return vlm.invoke([HumanMessage(content=content)]).content.strip()
    except Exception as exc:
        # Best-effort boundary: a failed scene query must not stop the capture loop.
        print(f"[VLM] Scene description error: {exc}", file=sys.stderr)
        return "(VLM error)"
# ── Main per-frame logic ──────────────────────────────────────────────────────
def _run_frame(cap, gesture_det, affect_det, air_sign_det, vlm):
    """Capture one frame, run every detector on it and bundle the results.

    Args:
        cap: Capture device, read through ``_capture_frame``.
        gesture_det: Object providing ``detect(frame)`` and ``get_fingertip()``.
        affect_det: Object providing ``detect(frame)``.
        air_sign_det: Air-sign detector; receives the current fingertip and,
            when it reports that a trajectory should fire, is asked to
            recognise the rendered trajectory.
        vlm: Model handed to ``_call_vlm_scene`` for the scene description.

    Returns:
        A ``TestFrame`` holding a new ``VisionSnapshot`` plus the scene
        description, or ``None`` when no frame could be captured.
    """
    from memorybridge.core.schemas import AirSignChar, VisionSnapshot

    bgr = _capture_frame(cap)
    if bgr is None:
        print("[WARN] Frame capture failed — skipping.", file=sys.stderr)
        return None

    # Gesture: a falsy detector result is normalised to an empty list.
    gesture_signals = gesture_det.detect(bgr) or []

    # Affect
    affect_signal = affect_det.detect(bgr)

    # Air-sign (sync camera-loop path)
    air_sign_char: Optional[AirSignChar] = None
    tip = gesture_det.get_fingertip()
    now = time.monotonic()
    n_pts, should_fire, _ = air_sign_det.update_trajectory_from_tip(tip, now)
    if should_fire:
        # The trajectory is rendered first, then the detector is reset, and
        # only afterwards is the (blocking) VLM call made on the saved canvas.
        canvas = air_sign_det._render_trajectory()
        air_sign_det._reset()
        letter = air_sign_det.call_vlm_sync(canvas, n_pts)
        if letter is not None:
            air_sign_char = AirSignChar(
                character=letter,
                confidence=0.85,  # fixed value; the sync path supplies no score here
                timestamp=time.time(),
            )
            print(f"[AirSign] Confirmed letter: {letter}", file=sys.stderr)

    # Scene description from VLM
    scene_description = _call_vlm_scene(vlm, bgr)

    snapshot = VisionSnapshot(
        timestamp=time.time(),
        gestures=gesture_signals,
        affect=affect_signal,
        air_sign_char=air_sign_char,
    )
    return TestFrame(
        timestamp=snapshot.timestamp,
        snapshot=snapshot,
        scene_description=scene_description,
    )
# ── Deduplication ─────────────────────────────────────────────────────────────
def _is_no_signal(frame: TestFrame) -> bool:
    """Return True when the scene description contains 'no_signal' (any case)."""
    description = frame.scene_description.lower()
    return description.find("no_signal") != -1
def _is_duplicate(last: Optional[TestFrame], new: TestFrame) -> bool:
    """Tell whether *new* repeats the last frame that was stored.

    Two frames count as duplicates when their first gesture class (or
    ``"neutral"`` if a frame has no gestures) and their scene description
    both match. With no previous frame (``last is None``) nothing is a
    duplicate.
    """
    if last is None:
        return False

    def signature(frame: TestFrame) -> tuple:
        gestures = frame.snapshot.gestures
        leading = gestures[0].gesture_class if gestures else "neutral"
        return (leading, frame.scene_description)

    return signature(last) == signature(new)
# ── Display helpers ───────────────────────────────────────────────────────────
def _fmt_time(ts: float) -> str:
    """Format a POSIX timestamp as local time ``HH:MM:SS.mmm``.

    The fractional part is truncated (not rounded) to milliseconds.
    """
    from datetime import datetime as _dt

    local_moment = _dt.fromtimestamp(ts)
    return local_moment.time().isoformat(timespec="milliseconds")
def _print_frame(frame: TestFrame, index: int) -> None:
    """Print a five-line summary of one frame to stdout.

    Shows the frame number and time, the first gesture, the affect signal,
    the air-sign character (each as ``None`` when absent) and the scene
    description.
    """
    snapshot = frame.snapshot

    if snapshot.gestures:
        lead = snapshot.gestures[0]
        gesture_text = f"{lead.gesture_class} (conf={lead.confidence:.2f})"
    else:
        gesture_text = "None"

    if snapshot.affect is not None:
        affect = snapshot.affect
        affect_text = f"{affect.affect_class} (conf={affect.confidence:.2f})"
    else:
        affect_text = "None"

    if snapshot.air_sign_char is not None:
        air_text = f"'{snapshot.air_sign_char.character}' (conf={snapshot.air_sign_char.confidence:.2f})"
    else:
        air_text = "None"

    print(
        f"\n[Frame {index} | {_fmt_time(frame.timestamp)}]",
        f" Gesture : {gesture_text}",
        f" Affect : {affect_text}",
        f" Air-Sign : {air_text}",
        f" Scene : {frame.scene_description}",
        sep="\n",
    )
def _print_buffer_state(test_buffer: deque, signal_buffer) -> None:
    """Dump the stored frames and the aggregated signal state to stdout.

    Args:
        test_buffer: Frames to list; each is printed with ``_print_frame``
            using a 1-based position.
        signal_buffer: Object whose ``get_state_sync()`` returns a state with
            ``aggregated_gesture``, ``aggregated_affect`` and
            ``air_sign_sequence`` attributes.
    """
    rule = "=" * 60
    print("\n" + rule)
    print(f" BUFFER STATE ({len(test_buffer)} frames stored)")
    print(rule)
    for position, frame in enumerate(test_buffer, start=1):
        _print_frame(frame, position)

    print("\n── Aggregated signals ──")
    state = signal_buffer.get_state_sync()

    gesture = state.aggregated_gesture
    if gesture:
        print(f" Gesture : {gesture.gesture_class} (conf={gesture.confidence:.2f})")
    else:
        print(" Gesture : None (all neutral)")

    affect = state.aggregated_affect
    if affect:
        print(f" Affect : {affect.affect_class} (conf={affect.confidence:.2f})")
    else:
        print(" Affect : None (no dominant class)")

    if state.air_sign_sequence:
        letters = "".join(c.character for c in state.air_sign_sequence)
        print(f" Air-Sign : confirmed sequence = '{letters}'")
    else:
        print(" Air-Sign : (none confirmed)")
    print(rule)
# ── Keyboard input (Windows — msvcrt) ─────────────────────────────────────────
def _check_keypress() -> Optional[str]:
    """Return one pending key as text without blocking, or ``None``.

    Relies on ``msvcrt``; when that module cannot be imported the function
    always returns ``None``. Bytes that are not valid UTF-8 are dropped, so
    the result may be an empty string.
    """
    try:
        import msvcrt
    except ImportError:
        return None

    if not msvcrt.kbhit():
        return None
    return msvcrt.getch().decode("utf-8", errors="ignore")
# ── Main ──────────────────────────────────────────────────────────────────────
def _frame_has_signal(frame: TestFrame, min_gesture_conf: float = 0.75) -> bool:
    """Return True when the frame carries a VLM, gesture or air-sign signal."""
    gestures = frame.snapshot.gestures
    has_gesture_signal = bool(
        gestures
        and gestures[0].gesture_class != "neutral"
        and gestures[0].confidence >= min_gesture_conf
    )
    has_vlm_signal = not _is_no_signal(frame)
    has_air_sign = frame.snapshot.air_sign_char is not None
    return has_vlm_signal or has_gesture_signal or has_air_sign


def main() -> None:
    """Run the interactive vision-pipeline test loop.

    Loads the model registry and VLM, builds the detectors and the signal
    buffer, opens the camera, then processes one frame per
    ``CAPTURE_INTERVAL`` seconds. A frame is stored only when it carries a
    signal and is not a duplicate of the previously stored frame.

    Keyboard: Enter dumps the buffer state; ``q`` followed by Enter quits.
    All keys pending at the end of an iteration are handled in that
    iteration. Ctrl+C also stops the loop. The camera and detectors are
    released on the way out.
    """
    print("Initialising MemoryBridge vision pipeline test…")
    # Multiply before formatting so a fractional interval is not truncated.
    window_s = BUFFER_MAXLEN * CAPTURE_INTERVAL
    print(f" Buffer: {BUFFER_MAXLEN} frames × {CAPTURE_INTERVAL:g}s = {window_s:g}s window")

    # Load registry + VLM
    from memorybridge.core.models import ModelRegistry

    registry = ModelRegistry(CONFIG_PATH)
    print(" Loading VLM (may take a moment on first run)…", end="", flush=True)
    vlm = registry.get_vlm()
    print(" done.")

    # Vision detectors
    from memorybridge.vision_path.gesture_detector import GestureDetector
    from memorybridge.vision_path.affect_detector import AffectDetector
    from memorybridge.vision_path.air_sign_detector import AirSignDetector
    from memorybridge.vision_path.signal_buffer import SignalBuffer

    gesture_det = GestureDetector()
    affect_det = AffectDetector()
    air_sign_det = AirSignDetector(registry)
    signal_buffer = SignalBuffer(
        buffer_size=BUFFER_MAXLEN,
        air_sign_confirmation_windows=2,
    )

    # Test-side deque (mirrors signal_buffer for display)
    test_buffer: deque[TestFrame] = deque(maxlen=BUFFER_MAXLEN)

    # Open camera
    cap = _open_camera()
    print("\nStarting — press Enter to dump buffer, q + Enter to quit.\n")
    print("-" * 60)

    frame_index = 0
    last_frame: Optional[TestFrame] = None
    pending_quit = False

    try:
        while True:
            loop_start = time.monotonic()

            new_frame = _run_frame(cap, gesture_det, affect_det, air_sign_det, vlm)
            if new_frame is not None:
                frame_index += 1
                _print_frame(new_frame, frame_index)

                if not _frame_has_signal(new_frame):
                    print(" (no_signal — buffer unchanged)")
                elif _is_duplicate(last_frame, new_frame):
                    print(" (no change — buffer unchanged)")
                else:
                    test_buffer.append(new_frame)
                    # Direct append is GIL-atomic and safe in a single-threaded script
                    signal_buffer._buffer.append(new_frame.snapshot)
                    last_frame = new_frame

            # Keyboard input: drain every pending key so "q" + Enter typed
            # together is handled in one iteration instead of two.
            quit_confirmed = False
            for ch in iter(_check_keypress, None):
                if ch == "q":
                    pending_quit = True
                    print(" [Press Enter to confirm quit]")
                elif ch in ("\r", "\n"):
                    if pending_quit:
                        quit_confirmed = True
                        break
                    _print_buffer_state(test_buffer, signal_buffer)
            if quit_confirmed:
                break  # leave immediately rather than sleeping out the interval

            elapsed = time.monotonic() - loop_start
            time.sleep(max(0.0, CAPTURE_INTERVAL - elapsed))
    except KeyboardInterrupt:
        print("\n[Interrupted]")
    finally:
        cap.release()
        gesture_det.close()
        affect_det.close()
        air_sign_det.close()
        print("\nCamera released. Done.")
| if __name__ == "__main__": | |
| main() | |