# NOTE(review): the three header lines ("Spaces:", "Running", "Running") were
# web-UI artifacts from the export, not part of the program.
import asyncio
import base64
import io
import time
from typing import Any, Dict, List, Optional

import cv2
import numpy as np
from loguru import logger
from PIL import Image

from pipecat.frames.frames import Frame, ImageRawFrame, TextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

# MediaPipe is optional: fall back to OpenCV's Haar cascade when absent.
try:
    import mediapipe as mp
    MEDIAPIPE_AVAILABLE = True
except ImportError:
    MEDIAPIPE_AVAILABLE = False
    logger.warning("MediaPipe not available, using OpenCV for face detection")
class VisualObserver(FrameProcessor):
    """
    Observer that waits for UserImageRequestFrame, captures the next video frame,
    analyzes it with a vision model, and injects the description back into the context.

    Also runs continuous face detection on incoming video and can render the
    annotated frames locally and/or publish detection events over WebRTC.
    """

    def __init__(
        self,
        vision_client,
        model="moondream",
        enable_display=False,
        enable_face_detection=True,
        webrtc_connection=None,
        tars_client=None
    ):
        """
        Args:
            vision_client: OpenAI-compatible async client used for image analysis.
            model: Vision model name passed to the client.
            enable_display: Show annotated frames in a local OpenCV window.
            enable_face_detection: Run face detection on incoming video frames.
            webrtc_connection: Optional connection for face-detection events.
            tars_client: Deprecated, ignored (display control moved to gRPC).
        """
        super().__init__()

        # Vision-analysis state.
        self._vision_client = vision_client
        self._model = model
        self._waiting_for_image = False
        self._current_request = None
        self._last_analysis_time = 0
        self._cooldown = 2.0  # Min seconds between analyses

        # Display / transport configuration.
        self._enable_display = enable_display
        self._enable_face_detection = enable_face_detection
        self._webrtc_connection = webrtc_connection
        self._tars_client = None  # Deprecated: Display control via gRPC in robot mode
        self._display_window_name = "TARS Visual Observer"

        # Face detection setup.
        self._face_detector = None
        if self._enable_face_detection:
            self._setup_face_detection()

        # Runtime statistics.
        self._face_count = 0
        self._frames_processed = 0
        self._last_face_time = 0
| def _setup_face_detection(self): | |
| """Initialize face detection based on available libraries.""" | |
| try: | |
| if MEDIAPIPE_AVAILABLE: | |
| logger.info("🎯 Initializing MediaPipe face detection") | |
| self._face_detector_type = "mediapipe" | |
| self._mp_face_detection = mp.solutions.face_detection | |
| self._mp_drawing = mp.solutions.drawing_utils | |
| self._face_detector = self._mp_face_detection.FaceDetection( | |
| model_selection=0, # 0 for short-range (< 2m), 1 for full-range | |
| min_detection_confidence=0.5 | |
| ) | |
| else: | |
| # Fallback to OpenCV Haar Cascade | |
| logger.info("🎯 Initializing OpenCV Haar Cascade face detection") | |
| self._face_detector_type = "opencv" | |
| cascade_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml' | |
| self._face_detector = cv2.CascadeClassifier(cascade_path) | |
| if self._face_detector.empty(): | |
| logger.error("Failed to load Haar Cascade classifier") | |
| self._face_detector = None | |
| except Exception as e: | |
| logger.error(f"Failed to initialize face detection: {e}") | |
| self._face_detector = None | |
| def detect_faces(self, image: np.ndarray) -> List[Dict[str, Any]]: | |
| """ | |
| Detect faces in the image. | |
| Args: | |
| image: numpy array in BGR format | |
| Returns: | |
| List of face dictionaries with bounding boxes and confidence | |
| """ | |
| if not self._face_detector: | |
| return [] | |
| faces = [] | |
| try: | |
| if self._face_detector_type == "mediapipe": | |
| # Convert BGR to RGB for MediaPipe | |
| rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) | |
| results = self._face_detector.process(rgb_image) | |
| if results.detections: | |
| h, w, _ = image.shape | |
| for detection in results.detections: | |
| bbox = detection.location_data.relative_bounding_box | |
| faces.append({ | |
| 'x': int(bbox.xmin * w), | |
| 'y': int(bbox.ymin * h), | |
| 'width': int(bbox.width * w), | |
| 'height': int(bbox.height * h), | |
| 'confidence': detection.score[0] | |
| }) | |
| else: # opencv | |
| gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) | |
| detected_faces = self._face_detector.detectMultiScale( | |
| gray, | |
| scaleFactor=1.1, | |
| minNeighbors=5, | |
| minSize=(30, 30) | |
| ) | |
| for (x, y, w, h) in detected_faces: | |
| faces.append({ | |
| 'x': x, | |
| 'y': y, | |
| 'width': w, | |
| 'height': h, | |
| 'confidence': 1.0 # OpenCV Haar doesn't provide confidence | |
| }) | |
| except Exception as e: | |
| logger.error(f"Error detecting faces: {e}") | |
| return faces | |
| def draw_faces(self, image: np.ndarray, faces: List[Dict[str, Any]]) -> np.ndarray: | |
| """ | |
| Draw bounding boxes around detected faces. | |
| Args: | |
| image: numpy array in BGR format | |
| faces: List of face dictionaries from detect_faces() | |
| Returns: | |
| Image with faces drawn | |
| """ | |
| annotated_image = image.copy() | |
| for face in faces: | |
| x, y, w, h = face['x'], face['y'], face['width'], face['height'] | |
| confidence = face['confidence'] | |
| # Draw rectangle | |
| cv2.rectangle(annotated_image, (x, y), (x + w, y + h), (0, 255, 0), 2) | |
| # Draw confidence score | |
| label = f"Face: {confidence:.2f}" | |
| cv2.putText( | |
| annotated_image, | |
| label, | |
| (x, y - 10), | |
| cv2.FONT_HERSHEY_SIMPLEX, | |
| 0.5, | |
| (0, 255, 0), | |
| 2 | |
| ) | |
| # Draw face count | |
| cv2.putText( | |
| annotated_image, | |
| f"Faces: {len(faces)}", | |
| (10, 30), | |
| cv2.FONT_HERSHEY_SIMPLEX, | |
| 1, | |
| (0, 255, 0), | |
| 2 | |
| ) | |
| return annotated_image | |
| def display_frame(self, image: np.ndarray, faces: Optional[List[Dict[str, Any]]] = None): | |
| """ | |
| Display the frame in a window with optional face annotations. | |
| Args: | |
| image: numpy array in BGR format | |
| faces: Optional list of detected faces to draw | |
| """ | |
| if not self._enable_display: | |
| return | |
| try: | |
| display_image = image.copy() | |
| if faces: | |
| display_image = self.draw_faces(display_image, faces) | |
| cv2.imshow(self._display_window_name, display_image) | |
| cv2.waitKey(1) # Required for window to update | |
| except Exception as e: | |
| logger.error(f"Error displaying frame: {e}") | |
| def send_display_event(self, faces: List[Dict[str, Any]], image_base64: Optional[str] = None): | |
| """ | |
| Send display event to WebRTC connection with face detection results. | |
| Args: | |
| faces: List of detected faces | |
| image_base64: Optional base64-encoded image | |
| """ | |
| if not self._webrtc_connection: | |
| return | |
| try: | |
| if self._webrtc_connection.is_connected(): | |
| event_data = { | |
| "type": "face_detection", | |
| "status": "detected" if faces else "no_faces", | |
| "face_count": len(faces), | |
| "faces": faces, | |
| "timestamp": time.time() | |
| } | |
| # Optionally include thumbnail | |
| if image_base64 and len(faces) > 0: | |
| event_data["thumbnail"] = image_base64 | |
| self._webrtc_connection.send_app_message(event_data) | |
| except Exception as e: | |
| logger.debug(f"Error sending display event: {e}") | |
| async def process_frame(self, frame: Frame, direction: FrameDirection): | |
| await super().process_frame(frame, direction) | |
| # 1. Handle Request from LLM (Check by class name to avoid import errors) | |
| # We check for "UserImageRequestFrame" (your custom frame) OR "VisionImageRequestFrame" | |
| if frame.__class__.__name__ in ["UserImageRequestFrame", "VisionImageRequestFrame"]: | |
| logger.info(f"👁️ Vision request received: {getattr(frame, 'context', 'No context')}") | |
| self._waiting_for_image = True | |
| self._current_request = frame | |
| # We don't yield this frame downstream; we consume it and act on it. | |
| return | |
| # 2. Handle Video Input (continuous face detection + optional vision analysis) | |
| if isinstance(frame, ImageRawFrame): | |
| self._frames_processed += 1 | |
| # Process face detection on every frame (or throttled) | |
| if self._enable_face_detection and self._frames_processed % 5 == 0: | |
| # Run face detection in background | |
| asyncio.create_task(self._process_face_detection(frame)) | |
| # Vision analysis only when requested | |
| if self._waiting_for_image: | |
| # Check cooldown | |
| if time.time() - self._last_analysis_time < self._cooldown: | |
| await self.push_frame(frame, direction) | |
| return | |
| logger.info("📸 Capturing frame for analysis...") | |
| self._waiting_for_image = False # Reset flag immediately | |
| self._last_analysis_time = time.time() | |
| # Run analysis in background to avoid blocking audio pipeline | |
| asyncio.create_task(self._analyze_and_respond(frame)) | |
| # Note: Still pass frame through for face detection | |
| # Pass all other frames through | |
| await self.push_frame(frame, direction) | |
| async def _process_face_detection(self, frame: ImageRawFrame): | |
| """Process face detection on video frame and send display events.""" | |
| try: | |
| # Convert frame to numpy array | |
| image = Image.frombytes(frame.format, frame.size, frame.image) | |
| image_np = np.array(image) | |
| # Convert RGB to BGR for OpenCV | |
| if image_np.shape[2] == 3: | |
| image_bgr = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR) | |
| else: | |
| image_bgr = image_np | |
| # Get frame dimensions | |
| frame_height, frame_width = image_bgr.shape[:2] | |
| # Detect faces | |
| faces = self.detect_faces(image_bgr) | |
| if faces: | |
| self._face_count = len(faces) | |
| current_time = time.time() | |
| # Log only periodically to avoid spam | |
| if current_time - self._last_face_time > 5.0: | |
| logger.info(f"👤 Detected {len(faces)} face(s)") | |
| self._last_face_time = current_time | |
| # Get the largest/most prominent face | |
| primary_face = max(faces, key=lambda f: f['width'] * f['height']) | |
| # Calculate face center | |
| face_center_x = primary_face['x'] + primary_face['width'] // 2 | |
| face_center_y = primary_face['y'] + primary_face['height'] // 2 | |
| # Display the frame with face annotations | |
| self.display_frame(image_bgr, faces) | |
| # Send face position event to WebRTC frontend | |
| self.send_display_event(faces) | |
| # Optionally send face position to text frame for LLM context | |
| # This can be used for "user is looking at you" type feedback | |
| # Uncomment if you want the LLM to know about face position | |
| # face_text = f"[Face Detected]: Position ({face_center_x}, {face_center_y}), Size: {primary_face['width']}x{primary_face['height']}" | |
| # await self.push_frame(TextFrame(text=face_text), FrameDirection.UPSTREAM) | |
| else: | |
| # No faces detected | |
| if self._face_count > 0: | |
| logger.debug("No faces detected") | |
| self._face_count = 0 | |
| # Send "no face" event to WebRTC | |
| self.send_display_event([]) | |
| # Display frame without annotations | |
| self.display_frame(image_bgr) | |
| except Exception as e: | |
| logger.error(f"Error in face detection: {e}") | |
| async def _analyze_and_respond(self, frame: ImageRawFrame): | |
| """Analyze image and push result text frame downstream.""" | |
| try: | |
| # Convert raw frame to base64 | |
| image = Image.frombytes(frame.format, frame.size, frame.image) | |
| buffered = io.BytesIO() | |
| image.save(buffered, format="JPEG") | |
| img_str = base64.b64encode(buffered.getvalue()).decode() | |
| prompt = "Describe this image briefly." | |
| # Try to extract prompt from the request context if available | |
| if self._current_request and hasattr(self._current_request, 'context'): | |
| # Assuming context might be the question text | |
| context = self._current_request.context | |
| if context: | |
| prompt = f"{context} (Describe the image to answer this)" | |
| logger.info(f"🔍 Sending image to vision model ({self._model})...") | |
| try: | |
| response = await asyncio.wait_for( | |
| self._vision_client.chat.completions.create( | |
| model=self._model, | |
| messages=[ | |
| { | |
| "role": "user", | |
| "content": [ | |
| {"type": "text", "text": prompt}, | |
| { | |
| "type": "image_url", | |
| "image_url": { | |
| "url": f"data:image/jpeg;base64,{img_str}" | |
| }, | |
| }, | |
| ], | |
| } | |
| ], | |
| max_tokens=100 | |
| ), | |
| timeout=8.0 # 8 second timeout to prevent hanging | |
| ) | |
| description = response.choices[0].message.content | |
| logger.info(f"✅ Vision analysis: {description}") | |
| except asyncio.TimeoutError: | |
| logger.warning("⚠️ Vision model timed out!") | |
| description = "I couldn't see clearly because the visual processing timed out." | |
| except Exception as e: | |
| logger.error(f"❌ Vision model error: {e}") | |
| description = "I had trouble processing the visual data." | |
| feedback_text = f"[Visual Observation]: {description}" | |
| # Push text frame to LLM | |
| await self.push_frame(TextFrame(text=feedback_text), FrameDirection.UPSTREAM) | |
| except Exception as e: | |
| logger.error(f"Error in vision pipeline: {e}") | |
| self._waiting_for_image = False |