File size: 24,578 Bytes
7fb83e4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
"""Bot pipeline setup and execution."""

import sys
from pathlib import Path

# Add src directory to Python path for imports
src_dir = Path(__file__).parent
sys.path.insert(0, str(src_dir))

import asyncio
import json
import os
import logging
import uuid
import httpx

from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.frames.frames import (
    LLMRunFrame,
    TranscriptionFrame,
    InterimTranscriptionFrame,
    Frame,
    TranscriptionMessage,
    TranslationFrame,
    UserImageRawFrame,
    UserAudioRawFrame,
    UserImageRequestFrame,
)
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask, PipelineParams
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
    LLMContextAggregatorPair,
    LLMUserAggregatorParams
)
from pipecat.observers.turn_tracking_observer import TurnTrackingObserver
from pipecat.observers.loggers.user_bot_latency_log_observer import UserBotLatencyLogObserver
from pipecat.services.moondream.vision import MoondreamService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.llm_service import FunctionCallParams
from services.memory_hybrid import HybridMemoryService
from pipecat.transcriptions.language import Language
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport

from loguru import logger

from config import (
    SPEECHMATICS_API_KEY,
    DEEPGRAM_API_KEY,
    ELEVENLABS_API_KEY,
    ELEVENLABS_VOICE_ID,
    DEEPINFRA_API_KEY,
    DEEPINFRA_BASE_URL,
    MEM0_API_KEY,
    get_fresh_config,
)
from services.factories import create_stt_service, create_tts_service
from processors import (
    SilenceFilter,
    InputAudioFilter,
    InterventionGating,
    VisualObserver,
    EmotionalStateMonitor,
)
from observers import (
    MetricsObserver,
    TranscriptionObserver,
    AssistantResponseObserver,
    TTSStateObserver,
    VisionObserver,
    DebugObserver,
    DisplayEventsObserver,
)
from character.prompts import (
    load_persona_ini,
    load_tars_json,
    build_tars_system_prompt,
    get_introduction_instruction,
)
from tools import (
    fetch_user_image,
    adjust_persona_parameter,
    execute_movement,
    capture_camera_view,
    create_fetch_image_schema,
    create_adjust_persona_schema,
    create_identity_schema,
    create_movement_schema,
    create_camera_capture_schema,
    get_persona_storage,
    get_crossword_hint,
    create_crossword_hint_schema,
)
from shared_state import metrics_store


# ============================================================================
# CUSTOM FRAME PROCESSORS
# ============================================================================

class IdentityUnifier(FrameProcessor):
    """
    Applies a single user ID ONLY to specific user input frames.
    Leaves other frames untouched.

    Every frame, modified or not, is pushed on in the direction it arrived.
    ``target_user_id`` is a plain attribute and can be reassigned at runtime
    to change the ID stamped onto subsequent frames.
    """
    # Define the frame types that should have user_id set
    TARGET_FRAME_TYPES = (
        TranscriptionFrame,
        TranscriptionMessage,
        TranslationFrame,
        InterimTranscriptionFrame,
        UserImageRawFrame,
        UserAudioRawFrame,
        UserImageRequestFrame,
    )

    def __init__(self, target_user_id):
        """Store the user ID that will be written onto matching frames.

        Args:
            target_user_id: Value assigned to ``frame.user_id`` for every
                frame whose type is in ``TARGET_FRAME_TYPES``.
        """
        super().__init__()
        self.target_user_id = target_user_id
        # Frame types whose user_id assignment has already failed; each one is
        # reported a single time so a high-rate frame type cannot flood the log.
        self._unassignable_types = set()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Stamp ``target_user_id`` onto matching frames, then forward the frame.

        Args:
            frame: The frame travelling through the pipeline.
            direction: The direction the frame is travelling; it is forwarded
                unchanged.
        """
        # 1. Handle internal state
        await super().process_frame(frame, direction)

        # 2. Only modify specific frame types
        if isinstance(frame, self.TARGET_FRAME_TYPES):
            try:
                frame.user_id = self.target_user_id
            except Exception as exc:
                # Best effort: a frame that rejects the attribute is still
                # forwarded, but the failure is made visible once per type.
                frame_type = type(frame)
                if frame_type not in self._unassignable_types:
                    self._unassignable_types.add(frame_type)
                    logger.debug(
                        f"IdentityUnifier could not set user_id on {frame_type.__name__}: {exc}"
                    )

        # 3. Push downstream
        await self.push_frame(frame, direction)


# ============================================================================
# HELPER FUNCTIONS
# ============================================================================

async def _cleanup_services(service_refs: dict):
    """Close the STT and TTS services held in ``service_refs``, at most once each.

    The function is safe to call repeatedly or from overlapping callers: each
    reference is cleared before its ``close()`` is awaited, so a second call
    finds nothing left to close. A failing ``close()`` is logged and does not
    prevent the other service from being closed.

    Args:
        service_refs: Mutable mapping with optional ``"stt"`` and ``"tts"``
            entries. Entries that were closed (or whose close failed) are
            set to ``None``.
    """
    for key, label in (("stt", "STT"), ("tts", "TTS")):
        service = service_refs.get(key)
        if not service:
            continue

        # Drop the reference before awaiting so a concurrent or later call
        # does not close the same service again.
        service_refs[key] = None
        try:
            await service.close()
        except Exception as exc:
            logger.warning(f"Failed to clean up {label} service: {exc}")
        else:
            logger.info(f"βœ“ {label} service cleaned up")


# ============================================================================
# MAIN BOT PIPELINE
# ============================================================================

async def run_bot(webrtc_connection):
    """Initialize and run the TARS bot pipeline for one WebRTC connection.

    Loads a fresh runtime configuration, builds the transport, STT, TTS, LLM
    (with its tools), vision, gating and memory components, assembles the
    pipeline and runs it until the pipeline runner returns.

    A failure while initializing STT, TTS, the LLM or Moondream is logged and
    ends the session early. A failure while initializing the memory service is
    logged and the pipeline is built without it. Any other exception is logged
    instead of being propagated. The STT/TTS services are cleaned up on every
    exit path.

    Args:
        webrtc_connection: Connection object handed to the WebRTC transport
            and the observers; also used to send app messages to the frontend.
    """
    logger.info("Starting bot pipeline for WebRTC connection...")

    # Load fresh configuration for this connection (allows runtime config updates)
    runtime_config = get_fresh_config()
    DEEPINFRA_MODEL = runtime_config['DEEPINFRA_MODEL']
    DEEPINFRA_GATING_MODEL = runtime_config['DEEPINFRA_GATING_MODEL']
    STT_PROVIDER = runtime_config['STT_PROVIDER']
    TTS_PROVIDER = runtime_config['TTS_PROVIDER']
    QWEN3_TTS_MODEL = runtime_config['QWEN3_TTS_MODEL']
    QWEN3_TTS_DEVICE = runtime_config['QWEN3_TTS_DEVICE']
    QWEN3_TTS_REF_AUDIO = runtime_config['QWEN3_TTS_REF_AUDIO']
    EMOTIONAL_MONITORING_ENABLED = runtime_config['EMOTIONAL_MONITORING_ENABLED']
    EMOTIONAL_SAMPLING_INTERVAL = runtime_config['EMOTIONAL_SAMPLING_INTERVAL']
    EMOTIONAL_INTERVENTION_THRESHOLD = runtime_config['EMOTIONAL_INTERVENTION_THRESHOLD']
    TARS_DISPLAY_URL = runtime_config['TARS_DISPLAY_URL']
    TARS_DISPLAY_ENABLED = runtime_config['TARS_DISPLAY_ENABLED']

    logger.info(f"πŸ“‹ Runtime config loaded - STT: {STT_PROVIDER}, LLM: {DEEPINFRA_MODEL}, TTS: {TTS_PROVIDER}, Emotional: {EMOTIONAL_MONITORING_ENABLED}")

    # Session initialization
    session_id = str(uuid.uuid4())[:8]
    client_id = f"guest_{session_id}"
    client_state = {"client_id": client_id}
    logger.info(f"Session started: {client_id}")

    service_refs = {"stt": None, "tts": None}

    # Bound up front because the set_user_identity closure below reads it;
    # the real service (if any) is assigned in the MEMORY SERVICE section.
    memory_service = None

    try:
        # ====================================================================
        # TRANSPORT INITIALIZATION
        # ====================================================================
        # Note: STT providers handle their own turn detection:
        # - Speechmatics: SMART_TURN mode
        # - Deepgram: endpointing parameter (300ms silence detection)
        # - Deepgram Flux: built-in turn detection with ExternalUserTurnStrategies (deprecated)

        logger.info(f"Initializing transport with {STT_PROVIDER} turn detection...")

        transport_params = TransportParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            video_in_enabled=False,
            video_out_enabled=False,
            video_out_is_live=False,
        )

        pipecat_transport = SmallWebRTCTransport(
            webrtc_connection=webrtc_connection,
            params=transport_params,
        )

        logger.info("βœ“ Transport initialized")

        # ====================================================================
        # SPEECH-TO-TEXT SERVICE
        # ====================================================================

        logger.info(f"Initializing {STT_PROVIDER} STT...")
        stt = None
        try:
            stt = create_stt_service(
                provider=STT_PROVIDER,
                speechmatics_api_key=SPEECHMATICS_API_KEY,
                deepgram_api_key=DEEPGRAM_API_KEY,
                language=Language.EN,
                enable_diarization=False,
            )
            service_refs["stt"] = stt

            # Log additional info for Deepgram
            if STT_PROVIDER == "deepgram":
                logger.info("βœ“ Deepgram: 300ms endpointing for turn detection")
                logger.info("βœ“ Deepgram: VAD events enabled for speech detection")

        except Exception as e:
            # logger.exception attaches the traceback; loguru has no exc_info kwarg
            logger.exception(f"Failed to initialize {STT_PROVIDER} STT: {e}")
            return

        # ====================================================================
        # TEXT-TO-SPEECH SERVICE
        # ====================================================================

        try:
            tts = create_tts_service(
                provider=TTS_PROVIDER,
                elevenlabs_api_key=ELEVENLABS_API_KEY,
                elevenlabs_voice_id=ELEVENLABS_VOICE_ID,
                qwen_model=QWEN3_TTS_MODEL,
                qwen_device=QWEN3_TTS_DEVICE,
                qwen_ref_audio=QWEN3_TTS_REF_AUDIO,
            )
            service_refs["tts"] = tts
        except Exception as e:
            logger.exception(f"Failed to initialize TTS service: {e}")
            return

        # ====================================================================
        # LLM SERVICE & TOOLS
        # ====================================================================

        logger.info("Initializing LLM via DeepInfra...")
        llm = None
        try:
            llm = OpenAILLMService(
                api_key=DEEPINFRA_API_KEY,
                base_url=DEEPINFRA_BASE_URL,
                model=DEEPINFRA_MODEL
            )

            character_dir = os.path.join(os.path.dirname(__file__), "character")
            persona_params = load_persona_ini(os.path.join(character_dir, "persona.ini"))
            tars_data = load_tars_json(os.path.join(character_dir, "TARS.json"))
            system_prompt = build_tars_system_prompt(persona_params, tars_data)

            # Create tool schemas (these return FunctionSchema objects)
            fetch_image_tool = create_fetch_image_schema()
            persona_tool = create_adjust_persona_schema()
            identity_tool = create_identity_schema()
            crossword_hint_tool = create_crossword_hint_schema()
            movement_tool = create_movement_schema()
            camera_capture_tool = create_camera_capture_schema()

            # Pass FunctionSchema objects directly to standard_tools
            tools = ToolsSchema(
                standard_tools=[
                    fetch_image_tool,
                    persona_tool,
                    identity_tool,
                    crossword_hint_tool,
                    movement_tool,
                    camera_capture_tool,
                ]
            )
            messages = [system_prompt]
            context = LLMContext(messages, tools)

            llm.register_function("fetch_user_image", fetch_user_image)
            llm.register_function("adjust_persona_parameter", adjust_persona_parameter)
            llm.register_function("get_crossword_hint", get_crossword_hint)
            llm.register_function("execute_movement", execute_movement)
            llm.register_function("capture_camera_view", capture_camera_view)

            pipeline_unifier = IdentityUnifier(client_id)

            async def wrapped_set_identity(params: FunctionCallParams):
                # The arguments come from the LLM, so tolerate a missing or blank name
                raw_name = params.arguments.get("name")
                name = raw_name.strip() if isinstance(raw_name, str) else ""
                if not name:
                    logger.warning("set_user_identity was called without a usable name")
                    await params.result_callback("No name was provided, so the identity was not changed.")
                    return

                logger.info(f"πŸ‘€ Identity discovered: {name}")

                old_id = client_state["client_id"]
                new_id = f"user_{name.lower().replace(' ', '_')}"

                if old_id != new_id:
                    logger.info(f"πŸ”„ Switching User ID: {old_id} -> {new_id}")
                    client_state["client_id"] = new_id

                    # Update the pipeline unifier to use new identity
                    pipeline_unifier.target_user_id = new_id
                    logger.info(f"βœ“ Updated pipeline unifier with new ID: {new_id}")

                    # Update memory service with new user_id
                    if memory_service:
                        memory_service.user_id = new_id
                        logger.info(f"βœ“ Updated memory service user_id to: {new_id}")

                    # Notify frontend of identity change
                    try:
                        if webrtc_connection and webrtc_connection.is_connected():
                            webrtc_connection.send_app_message({
                                "type": "identity_update",
                                "old_id": old_id,
                                "new_id": new_id,
                                "name": name
                            })
                            logger.info(f"πŸ“€ Sent identity update to frontend: {new_id}")
                    except Exception as e:
                        logger.warning(f"Failed to send identity update to frontend: {e}")

                await params.result_callback(f"Identity updated to {name}.")

            llm.register_function("set_user_identity", wrapped_set_identity)
            logger.info(f"βœ“ LLM initialized with model: {DEEPINFRA_MODEL}")

        except Exception as e:
            logger.exception(f"Failed to initialize LLM: {e}")
            return

        # ====================================================================
        # VISION & GATING SERVICES
        # ====================================================================

        logger.info("Initializing Moondream vision service...")
        moondream = None
        try:
            moondream = MoondreamService(model="vikhyatk/moondream2", revision="2025-01-09")
            logger.info("βœ“ Moondream vision service initialized")
        except Exception as e:
            logger.error(f"Failed to initialize Moondream: {e}")
            return

        # ====================================================================
        # TARS DISPLAY - Note: Display control via gRPC in robot mode only
        # ====================================================================

        logger.info("TARS Display features available in robot mode (tars_bot.py)")
        tars_client = None

        logger.info("Initializing Visual Observer...")
        visual_observer = VisualObserver(
            vision_client=moondream,
            enable_face_detection=True,
            tars_client=tars_client
        )
        logger.info("βœ“ Visual Observer initialized")

        logger.info("Initializing Emotional State Monitor...")
        emotional_monitor = EmotionalStateMonitor(
            vision_client=moondream,
            model="vikhyatk/moondream2",
            sampling_interval=EMOTIONAL_SAMPLING_INTERVAL,
            intervention_threshold=EMOTIONAL_INTERVENTION_THRESHOLD,
            enabled=EMOTIONAL_MONITORING_ENABLED,
            auto_intervene=False,  # Let gating layer handle intervention decisions
        )
        logger.info(f"βœ“ Emotional State Monitor initialized (enabled: {EMOTIONAL_MONITORING_ENABLED})")
        logger.info("   Mode: Integrated with gating layer for smarter decisions")

        # NOTE: the gating layer is constructed here but is currently commented
        # out of the pipeline below.
        logger.info("Initializing Gating Layer...")
        gating_layer = InterventionGating(
            api_key=DEEPINFRA_API_KEY,
            base_url=DEEPINFRA_BASE_URL,
            model=DEEPINFRA_GATING_MODEL,
            visual_observer=visual_observer,
            emotional_monitor=emotional_monitor
        )
        logger.info("βœ“ Gating Layer initialized with emotional state integration")

        # ====================================================================
        # MEMORY SERVICE
        # ====================================================================

        # Memory service: Hybrid search combining vector similarity (70%) and BM25 keyword matching (30%)
        # Optimized for voice AI with <50ms latency target
        logger.info("Initializing hybrid memory service...")
        try:
            memory_service = HybridMemoryService(
                user_id=client_id,
                db_path="./memory_data/memory.sqlite",
                search_limit=3,
                search_timeout_ms=100,  # Hybrid search needs ~60-80ms, allow buffer
                vector_weight=0.7,      # 70% semantic similarity
                bm25_weight=0.3,        # 30% keyword matching
                system_prompt_prefix="From our conversations:\n",
            )
            logger.info(f"βœ“ Hybrid memory service initialized for {client_id}")
        except Exception as e:
            logger.error(f"Failed to initialize hybrid memory service: {e}")
            logger.info("  Continuing without memory service...")
            memory_service = None  # Continue without memory if it fails

        # ====================================================================
        # CONTEXT AGGREGATOR & PERSONA STORAGE
        # ====================================================================

        # Configure user turn aggregation
        # STT services (Speechmatics, Deepgram) handle turn detection internally
        user_params = LLMUserAggregatorParams(
            user_turn_stop_timeout=1.5
        )

        context_aggregator = LLMContextAggregatorPair(
            context,
            user_params=user_params
        )

        persona_storage = get_persona_storage()
        persona_storage["persona_params"] = persona_params
        persona_storage["tars_data"] = tars_data
        persona_storage["context_aggregator"] = context_aggregator

        # ====================================================================
        # LOGGING PROCESSORS
        # ====================================================================

        transcription_observer = TranscriptionObserver(
            webrtc_connection=webrtc_connection,
            client_state=client_state
        )
        assistant_observer = AssistantResponseObserver(webrtc_connection=webrtc_connection)
        tts_state_observer = TTSStateObserver(webrtc_connection=webrtc_connection)
        vision_observer = VisionObserver(webrtc_connection=webrtc_connection)
        display_events_observer = DisplayEventsObserver(tars_client=tars_client)

        # Create MetricsObserver (non-intrusive monitoring outside pipeline)
        metrics_observer = MetricsObserver(
            webrtc_connection=webrtc_connection,
            stt_service=stt
        )

        # Turn tracking observer (for debugging turn detection)
        turn_observer = TurnTrackingObserver()

        @turn_observer.event_handler("on_turn_started")
        async def on_turn_started(*args, **kwargs):
            turn_number = args[1] if len(args) > 1 else kwargs.get('turn_number', 0)
            logger.info(f"πŸ—£οΈ  [TurnObserver] Turn STARTED: {turn_number}")
            # Notify metrics observer of new turn
            metrics_observer.start_turn(turn_number)

        @turn_observer.event_handler("on_turn_ended")
        async def on_turn_ended(*args, **kwargs):
            turn_number = args[1] if len(args) > 1 else kwargs.get('turn_number', 0)
            logger.info(f"πŸ—£οΈ  [TurnObserver] Turn ENDED: {turn_number}")

        # ====================================================================
        # PIPELINE ASSEMBLY
        # ====================================================================

        logger.info("Creating audio/video pipeline...")

        processors = [
            pipecat_transport.input(),
            # emotional_monitor,  # Real-time emotional state monitoring
            stt,
            pipeline_unifier,
            context_aggregator.user(),
        ]
        # The memory service is optional: when its initialization failed it is
        # None and must not be placed in the processor list.
        if memory_service is not None:
            # Hybrid memory (70% vector + 30% BM25) for automatic recall/storage
            processors.append(memory_service)
        processors.extend([
            # gating_layer,  # AI decision system (with emotional state integration)
            llm,
            SilenceFilter(),
            tts,
            pipecat_transport.output(),
            context_aggregator.assistant(),
        ])

        pipeline = Pipeline(processors)

        # ====================================================================
        # EVENT HANDLERS
        # ====================================================================

        # Filled in once the PipelineTask exists so the handlers below can reach it
        task_ref = {"task": None}

        @pipecat_transport.event_handler("on_client_connected")
        async def on_client_connected(transport, client):
            logger.info("Pipecat Client connected")
            try:
                if webrtc_connection.is_connected():
                    webrtc_connection.send_app_message({"type": "system", "message": "Connection established"})

                    # Send service configuration info with provider and model details
                    llm_display = DEEPINFRA_MODEL.split('/')[-1] if '/' in DEEPINFRA_MODEL else DEEPINFRA_MODEL

                    if TTS_PROVIDER == "elevenlabs":
                        tts_display = "ElevenLabs: eleven_flash_v2_5"
                    else:
                        tts_model = QWEN3_TTS_MODEL.split('/')[-1] if '/' in QWEN3_TTS_MODEL else QWEN3_TTS_MODEL
                        tts_display = f"Qwen3-TTS: {tts_model}"

                    # Format STT provider name for display
                    stt_display = {
                        "speechmatics": "Speechmatics",
                        "deepgram": "Deepgram Nova-2"
                    }.get(STT_PROVIDER, STT_PROVIDER.capitalize())

                    service_info = {
                        "stt": stt_display,
                        "memory": "Hybrid Search (SQLite)",
                        "llm": f"DeepInfra: {llm_display}",
                        "tts": tts_display
                    }

                    # Store in shared state for Gradio UI
                    metrics_store.set_service_info(service_info)

                    # Send via WebRTC
                    webrtc_connection.send_app_message({
                        "type": "service_info",
                        **service_info
                    })
                    logger.info(f"πŸ“Š Sent service info to frontend: STT={stt_display}, LLM={llm_display}, TTS={tts_display}")
            except Exception as e:
                logger.error(f"❌ Error sending service info: {e}")

            if task_ref["task"]:
                verbosity = persona_params.get("verbosity", 10) if persona_params else 10
                intro_instruction = get_introduction_instruction(client_state['client_id'], verbosity)

                if context and hasattr(context, "messages"):
                    context.messages.append(intro_instruction)

                logger.info("Waiting for pipeline to warm up...")
                await asyncio.sleep(2.0)

                logger.info("Queueing initial LLM greeting...")
                await task_ref["task"].queue_frames([LLMRunFrame()])

        @pipecat_transport.event_handler("on_client_disconnected")
        async def on_client_disconnected(transport, client):
            logger.info("Pipecat Client disconnected")
            if task_ref["task"]:
                await task_ref["task"].cancel()
            await _cleanup_services(service_refs)

        # ====================================================================
        # PIPELINE EXECUTION
        # ====================================================================

        # Enable built-in Pipecat metrics for latency tracking
        user_bot_latency_observer = UserBotLatencyLogObserver()

        task = PipelineTask(
            pipeline,
            params=PipelineParams(
                enable_metrics=True,              # Enable performance metrics (TTFB, latency)
                enable_usage_metrics=True,        # Enable LLM/TTS usage metrics
                report_only_initial_ttfb=False,   # Report all TTFB measurements
            ),
            observers=[
                turn_observer,
                metrics_observer,
                transcription_observer,
                assistant_observer,
                tts_state_observer,
                vision_observer,
                display_events_observer,          # Send events to TARS display
                user_bot_latency_observer,        # Measures total user→bot response time
            ],  # Non-intrusive monitoring
        )
        task_ref["task"] = task
        runner = PipelineRunner(handle_sigint=False)

        logger.info("Starting pipeline runner...")

        # Cleanup is handled by the enclosing finally, so no extra
        # try/finally is needed around the run itself.
        await runner.run(task)

    except Exception as e:
        logger.exception(f"Error in bot pipeline: {e}")
    finally:
        await _cleanup_services(service_refs)