# latishab's picture
# Update TARS Conversation App with TarsApp framework
# e8ed0e1 verified
"""Intervention Gating: Traffic Controller for Bot Responses."""
import json
import time
import aiohttp
import asyncio
from loguru import logger
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from pipecat.frames.frames import LLMMessagesFrame, Frame
from character.prompts import build_gating_system_prompt
class InterventionGating(FrameProcessor):
    """
    Traffic Controller: Decides if TARS should reply based on Audio + Vision + Emotions.

    Uses OpenAI-compatible API (DeepInfra). For every downstream
    ``LLMMessagesFrame`` a small, fast model is asked whether the bot should
    answer; when the answer is "no" the frame is dropped. Every failure of the
    gating request itself (HTTP error, timeout, malformed answer) fails open,
    i.e. the frame is let through.
    """

    # Visual context older than this many seconds is ignored (treated as
    # "not looking").
    VISUAL_CONTEXT_MAX_AGE_S = 5.0
    # Total time budget for the gating HTTP request, in seconds.
    GATING_TIMEOUT_S = 1.5
    # Number of trailing conversation messages sent to the gating model.
    HISTORY_WINDOW = 3
    # String values of the "reply" field that are interpreted as "do not reply".
    _NEGATIVE_REPLY_STRINGS = frozenset({"false", "no", "0", ""})

    def __init__(
        self,
        api_key: str,
        base_url: str = "https://api.deepinfra.com/v1/openai",
        model: str = "meta-llama/Llama-3.2-3B-Instruct",
        visual_observer=None,
        emotional_monitor=None
    ):
        """
        Args:
            api_key: Bearer token sent in the ``Authorization`` header.
            base_url: Root of the OpenAI-compatible API; ``/chat/completions``
                is appended to it.
            model: Model identifier placed in the request payload.
            visual_observer: Optional object exposing a ``visual_context``
                mapping with the keys ``is_looking_at_robot`` and
                ``last_updated`` (compared against ``time.time()``).
            emotional_monitor: Optional object exposing ``get_current_state()``;
                the returned state must provide ``needs_intervention()``.
        """
        super().__init__()
        self.api_key = api_key
        self.base_url = base_url
        self.model = model
        self.visual_observer = visual_observer
        self.emotional_monitor = emotional_monitor
        # Strip a trailing slash so the endpoint never contains "//".
        self.api_url = f"{base_url.rstrip('/')}/chat/completions"

    @staticmethod
    def _format_history(messages: list) -> str:
        """Render messages as ``role: content`` lines without raising on odd shapes.

        Missing keys are tolerated, ``None`` content becomes an empty string,
        and list-style content (OpenAI-style multimodal parts) is reduced to
        its text parts so non-text payloads are not dumped into the prompt.
        """
        lines = []
        for message in messages:
            content = message.get("content")
            if isinstance(content, list):
                content = " ".join(
                    str(part["text"])
                    for part in content
                    if isinstance(part, dict) and "text" in part
                )
            elif content is None:
                content = ""
            lines.append(f"{message.get('role', 'unknown')}: {content}")
        return "\n".join(lines)

    @classmethod
    def _parse_reply_decision(cls, raw_content: str) -> bool:
        """Turn the gating model's raw answer into a strict boolean.

        Raises:
            ValueError: If the answer is not valid JSON (``json.JSONDecodeError``
                is a ``ValueError``) or is not a JSON object. The caller treats
                any exception as "fail open".
        """
        # Some models wrap the JSON in a Markdown code fence; strip it.
        cleaned = raw_content.replace("```json", "").replace("```", "").strip()
        data = json.loads(cleaned)
        if not isinstance(data, dict):
            raise ValueError(f"expected a JSON object, got {type(data).__name__}")

        decision = data.get("reply", False)
        if isinstance(decision, str):
            # A quoted "false" is a non-empty string and would otherwise be truthy.
            return decision.strip().lower() not in cls._NEGATIVE_REPLY_STRINGS
        return bool(decision)

    async def _check_should_reply(self, messages: list) -> bool:
        """Asks the fast LLM if we should reply (Audio + Vision + Emotions).

        Args:
            messages: Conversation history as a list of dicts with ``role``
                and ``content`` entries.

        Returns:
            ``False`` for an empty history or when the model decides against
            replying; ``True`` when the last message is not from the user,
            when the emotional state asks for intervention, when the model
            decides to reply, or when the gating request fails in any way.
        """
        if not messages:
            return False

        # Only user turns are gated; anything else passes straight through.
        if messages[-1].get("role") != "user":
            return True

        # 1. READ EMOTIONAL STATE (Highest Priority)
        emotional_state = None
        if self.emotional_monitor:
            emotional_state = self.emotional_monitor.get_current_state()
            if emotional_state and emotional_state.needs_intervention():
                # User is confused/hesitant/frustrated - ALWAYS respond
                logger.info(
                    f"🧠 Gating: User shows {emotional_state} - BYPASSING gating, offering help"
                )
                return True

        # 2. READ VISUAL CONTEXT (no network call, just the latest snapshot)
        is_looking = False
        if self.visual_observer:
            visual_context = self.visual_observer.visual_context
            # A missing or None timestamp counts as infinitely old.
            last_update = visual_context.get("last_updated") or 0
            if time.time() - last_update <= self.VISUAL_CONTEXT_MAX_AGE_S:
                is_looking = visual_context.get("is_looking_at_robot", False)

        # 3. ANALYZE CONTEXT
        history_text = self._format_history(messages[-self.HISTORY_WINDOW:])
        # Build enriched system prompt with emotional context
        system_prompt = build_gating_system_prompt(is_looking, emotional_state)
        payload = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": f"Context:\n{history_text}"},
            ],
            "response_format": {"type": "json_object"},
            "max_tokens": 50,
        }

        # Set strict timeout so we don't silence the bot if API is slow
        timeout = aiohttp.ClientTimeout(total=self.GATING_TIMEOUT_S)
        try:
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(
                    self.api_url,
                    headers={"Authorization": f"Bearer {self.api_key}"},
                    json=payload,
                ) as resp:
                    if resp.status != 200:
                        logger.warning(f"Gating check failed: {resp.status}")
                        return True  # Fail open (reply if check fails)

                    result = await resp.json()
                    content_response = result["choices"][0]["message"]["content"]
                    should_reply = self._parse_reply_decision(content_response)
                    logger.debug(f"Gating decision: {should_reply} (Looking: {is_looking})")
                    return should_reply
        except asyncio.TimeoutError:
            logger.warning("🚦 Gating: Timed out! Defaulting to REPLY.")
            return True
        except Exception as e:
            # Boundary handler: any malformed response or transport error
            # must not silence the bot.
            logger.error(f"Gating error: {e}")
            return True

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """
        Intercepts LLMMessagesFrame.
        If 'should_reply' is False, we DROP the frame, effectively silencing the bot.
        All other frames (and upstream LLMMessagesFrames) are forwarded unchanged.
        """
        await super().process_frame(frame, direction)

        if isinstance(frame, LLMMessagesFrame) and direction == FrameDirection.DOWNSTREAM:
            # Check if we should reply
            should_reply = await self._check_should_reply(frame.messages)
            if not should_reply:
                logger.info("🚦 Gating: BLOCKING response.")
                return  # DROP THE FRAME
            logger.info("🟢 Gating: PASSING through.")

        await self.push_frame(frame, direction)