Spaces:
Running
Running
| """Intervention Gating: Traffic Controller for Bot Responses.""" | |
| import json | |
| import time | |
| import aiohttp | |
| import asyncio | |
| from loguru import logger | |
| from pipecat.processors.frame_processor import FrameProcessor, FrameDirection | |
| from pipecat.frames.frames import LLMMessagesFrame, Frame | |
| from character.prompts import build_gating_system_prompt | |
class InterventionGating(FrameProcessor):
    """
    Traffic Controller: Decides if TARS should reply based on Audio + Vision + Emotions.
    Uses OpenAI-compatible API (DeepInfra).

    Intercepts downstream ``LLMMessagesFrame``s and asks a small, fast LLM
    whether the bot should answer the latest user turn. If the answer is
    "no", the frame is dropped. Every failure path (non-200 HTTP status,
    timeout, unparsable answer, unexpected exception) fails *open*: the
    frame is let through, so a gating problem never silences the bot.
    """

    # Visual context older than this many seconds is treated as "not looking".
    VISUAL_STALENESS_S = 5.0
    # Hard cap on the gating round-trip so a slow API cannot stall the reply.
    REQUEST_TIMEOUT_S = 1.5
    # Number of trailing conversation messages shown to the gating model.
    HISTORY_WINDOW = 3

    def __init__(
        self,
        api_key: str,
        base_url: str = "https://api.deepinfra.com/v1/openai",
        model: str = "meta-llama/Llama-3.2-3B-Instruct",
        visual_observer=None,
        emotional_monitor=None,
        **kwargs,
    ):
        """Configure the gating processor.

        Args:
            api_key: Sent as ``Authorization: Bearer <api_key>`` on each request.
            base_url: Root of an OpenAI-compatible API; ``/chat/completions``
                is appended (a trailing slash is tolerated).
            model: Model id placed in the chat-completions payload.
            visual_observer: Optional. Must expose a ``visual_context`` dict
                with ``is_looking_at_robot`` and ``last_updated`` (compared
                against ``time.time()``, so presumably epoch seconds).
            emotional_monitor: Optional. Must expose ``get_current_state()``
                returning ``None`` or an object with ``needs_intervention()``.
            **kwargs: Forwarded unchanged to ``FrameProcessor.__init__``.
        """
        super().__init__(**kwargs)
        self.api_key = api_key
        self.base_url = base_url
        self.model = model
        self.visual_observer = visual_observer
        self.emotional_monitor = emotional_monitor
        # rstrip avoids a "//chat/completions" path when base_url ends in "/".
        self.api_url = f"{base_url.rstrip('/')}/chat/completions"

    async def _check_should_reply(self, messages: list) -> bool:
        """Asks the fast LLM if we should reply (Audio + Vision + Emotions).

        Args:
            messages: Conversation in chat format (dicts with ``role`` and
                ``content``); the last entry is the turn being gated.

        Returns:
            ``False`` for an empty conversation. ``True`` when the last
            message is not a user message. Otherwise, the gating model's
            decision. Any error along the way yields ``True`` (fail open).
        """
        if not messages:
            return False

        # Only user turns are gated; anything else passes straight through.
        last_msg = messages[-1]
        if not isinstance(last_msg, dict) or last_msg.get("role") != "user":
            return True

        # Everything that can fail lives inside this try so the fail-open
        # guarantee also covers context gathering, not just the HTTP call.
        try:
            return await self._ask_gating_model(messages)
        except asyncio.TimeoutError:
            logger.warning("🚦 Gating: Timed out! Defaulting to REPLY.")
            return True
        except Exception as e:
            logger.error(f"Gating error: {e}")
            return True

    async def _ask_gating_model(self, messages: list) -> bool:
        """Gather emotional/visual context and query the gating LLM.

        May raise (network errors, timeouts, malformed responses). The
        caller is responsible for turning exceptions into a fail-open reply.
        """
        # 1. READ EMOTIONAL STATE (Highest Priority)
        emotional_state = None
        if self.emotional_monitor:
            emotional_state = self.emotional_monitor.get_current_state()
            if emotional_state and emotional_state.needs_intervention():
                # User is confused/hesitant/frustrated - ALWAYS respond
                logger.info(
                    f"🧠 Gating: User shows {emotional_state} - BYPASSING gating, offering help"
                )
                return True

        # 2. READ VISUAL CONTEXT (0ms Latency)
        is_looking = self._read_is_looking()

        # 3. ANALYZE CONTEXT
        history_text = self._format_history(messages, self.HISTORY_WINDOW)
        # Build enriched system prompt with emotional context
        system_prompt = build_gating_system_prompt(is_looking, emotional_state)
        payload = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": f"Context:\n{history_text}"},
            ],
            "response_format": {"type": "json_object"},
            "max_tokens": 50,
        }

        # Set strict timeout so we don't silence the bot if API is slow
        timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT_S)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(
                self.api_url,
                headers={"Authorization": f"Bearer {self.api_key}"},
                json=payload,
            ) as resp:
                if resp.status != 200:
                    logger.warning(f"Gating check failed: {resp.status}")
                    return True  # Fail open (reply if check fails)
                result = await resp.json()

        content_response = result["choices"][0]["message"]["content"]
        should_reply = self._parse_reply_flag(content_response)
        logger.debug(f"Gating decision: {should_reply} (Looking: {is_looking})")
        return should_reply

    def _read_is_looking(self):
        """Return the observer's "looking at robot" flag, or False if absent/stale."""
        if not self.visual_observer:
            return False
        # Read the variable updated by the background task
        context = self.visual_observer.visual_context
        # "or 0" guards against a None timestamp, which would break the subtraction.
        last_update = context.get("last_updated", 0) or 0
        # Ignore if data is too old
        if time.time() - last_update > self.VISUAL_STALENESS_S:
            return False
        return context.get("is_looking_at_robot", False)

    @staticmethod
    def _format_history(messages: list, window: int) -> str:
        """Render the last ``window`` messages as ``"role: content"`` lines.

        Uses ``.get`` so messages lacking ``role``/``content`` (e.g. tool
        call entries) do not raise; non-dict entries are skipped.
        """
        tail = messages[max(len(messages) - window, 0):]
        return "\n".join(
            f"{m.get('role', '')}: {m.get('content', '')}"
            for m in tail
            if isinstance(m, dict)
        )

    @staticmethod
    def _parse_reply_flag(content) -> bool:
        """Extract the boolean ``reply`` field from the gating model's answer.

        Markdown code fences around the JSON are stripped first. A missing
        ``reply`` key means "do not reply". String values are interpreted
        case-insensitively ("true"/"yes" -> True, anything else -> False)
        instead of relying on truthiness, where "false" would count as True.

        Raises:
            ValueError (including json.JSONDecodeError): the answer is not a
                JSON object.
            AttributeError: ``content`` is not a string (e.g. ``None``).
        """
        text = content.replace("```json", "").replace("```", "").strip()
        data = json.loads(text)
        if not isinstance(data, dict):
            raise ValueError(f"expected a JSON object, got {type(data).__name__}")
        reply = data.get("reply", False)
        if isinstance(reply, str):
            return reply.strip().lower() in ("true", "yes")
        return bool(reply)

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """
        Intercepts LLMMessagesFrame.
        If 'should_reply' is False, we DROP the frame, effectively silencing the bot.
        All other frames, and frames travelling upstream, are forwarded unchanged.
        """
        await super().process_frame(frame, direction)

        if isinstance(frame, LLMMessagesFrame) and direction == FrameDirection.DOWNSTREAM:
            should_reply = await self._check_should_reply(frame.messages)
            if not should_reply:
                logger.info("🚦 Gating: BLOCKING response.")
                return  # DROP THE FRAME
            logger.info("🟢 Gating: PASSING through.")

        await self.push_frame(frame, direction)