WebEssentz committed
Commit decb45c · 1 Parent(s): 6f9c53c

Realtime Flow

Files changed (1)
  1. src/agent_session/main.py +77 -13
src/agent_session/main.py CHANGED
@@ -1,6 +1,11 @@
 #!/usr/bin/env python3
 """
 Agent Session for Avurna Flow (Final Corrected Version)
+
+This script sets up a FastAPI web server to manage a voice-based AI agent
+that connects to a LiveKit room. The core issue of the TypeError is resolved
+by implementing the LLMStateWrapper, which correctly handles the asynchronous
+generator returned by the LLM's chat method.
 """
 import os
 import json
@@ -10,63 +15,92 @@ from pydantic import BaseModel
 import uvicorn
 from fastapi.middleware.cors import CORSMiddleware

+# Import LiveKit and plugin components
 from livekit.rtc import Room
 from livekit.agents import Agent, AgentSession
 from livekit.agents.llm import LLM
 from livekit.agents.stt.stream_adapter import StreamAdapter
 from livekit.plugins.google import LLM as GoogleLLM
 from livekit.plugins.groq import STT
-from livekit.plugins.hume import TTS, VoiceByName, VoiceProvider
+from livekit.plugins.hume import TTS, VoiceByName
 from livekit.plugins.silero import VAD

 from src.agent_session.constants import SYSTEM_PROMPT, GREETING_INSTRUCTIONS
 from src.utils import validate_env_vars

+# --- FastAPI Application Setup ---
 app = FastAPI()
-origins = ["*"]
-app.add_middleware(CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"])

+# Configure CORS (Cross-Origin Resource Sharing) to allow all origins
+origins = ["*"]
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=origins,
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"]
+)
+
+# --- Pydantic Model for API Request ---
 class JoinRoomRequest(BaseModel):
+    """Defines the expected data structure for a /join-room request."""
     room_name: str
     agent_token: str

+# --- Custom Agent Definition ---
 class VoiceAssistant(Agent):
+    """A simple voice assistant agent with a predefined system prompt."""
     def __init__(self):
         super().__init__(instructions=SYSTEM_PROMPT)

+# --- Utility Function for State Publishing ---
 async def send_agent_state(room: Room, state: str):
+    """Publishes the agent's current state to the room via data channel."""
     try:
+        # The message is structured as a JSON object for easy parsing by clients
         msg = json.dumps({"type": "agent_state", "state": state})
         await room.local_participant.publish_data(msg)
         print(f"DEBUG: Sent agent state: {state}")
     except Exception as e:
         print(f"DEBUG: Error publishing agent state: {e}")

-# --- THE DEFINITIVE FIX IS HERE ---
+# --- THE DEFINITIVE FIX: LLM Wrapper ---
 class LLMStateWrapper(LLM):
+    """
+    Wraps an LLM instance to correctly handle its async generator `chat` method
+    and to inject agent state updates ("thinking", "listening") into the process.
+    This class solves the `TypeError: 'async_generator' object does not support
+    the asynchronous context manager protocol`.
+    """
     def __init__(self, llm: LLM, room: Room):
         super().__init__()
         self._llm = llm
         self._room = room

-    # This signature accepts ANY named arguments the library might send.
     async def chat(self, **kwargs):
+        """
+        This method is called by the AgentSession. It intercepts the call to the LLM,
+        sends the 'thinking' state, properly iterates over the LLM's async generator,
+        and then sends the 'listening' state upon completion.
+        """
        await send_agent_state(self._room, "thinking")

-        # We find and extract 'history' from the arguments.
-        # This makes our code resilient to the library's internal calling convention.
+        # Extract 'history' and pass all arguments along to the underlying LLM.
+        # This makes the wrapper resilient to future library updates.
        history = kwargs.pop('history', [])
-
-        # We pass the extracted history and all other arguments along.
+
+        # Use `async for` to correctly consume the asynchronous generator.
        async for chunk in self._llm.chat(history=history, **kwargs):
            yield chunk

        await send_agent_state(self._room, "listening")

-# The rest of the file is correct and can remain unchanged.
-# For absolute certainty, here is the full correct file again.
-
+# --- Main Agent Session Logic ---
 async def run_agent_session(room_name: str, agent_token: str):
+    """
+    This function contains the main logic for connecting the agent to a
+    LiveKit room and running the session.
+    """
    livekit_url = os.getenv("LIVEKIT_URL")
    room = Room()

@@ -75,13 +109,23 @@ async def run_agent_session(room_name: str, agent_token: str):
        await room.connect(livekit_url, agent_token)
        print("DEBUG: 2. Connection successful.")

+        # Agent is ready to receive input
        await send_agent_state(room, "listening")

        print("DEBUG: 3. Initializing plugins...")
+
+        # Wrap the Google LLM with our custom state-aware wrapper
        llm_wrapper = LLMStateWrapper(llm=GoogleLLM(model="gemini-1.5-flash"), room=room)
+
+        # Configure Voice Activity Detection (VAD)
        vad = VAD.load(min_speech_duration=0.1)
+
+        # Configure Speech-to-Text (STT) with the VAD adapter
        stt = StreamAdapter(stt=STT(model="whisper-large-v3-turbo"), vad=vad)
+
+        # Configure Text-to-Speech (TTS)
        tts = TTS(voice=VoiceByName(name="Tiktok Fashion Influencer"), instant_mode=True)
+
        print("DEBUG: 4. Plugins initialized.")

        print("DEBUG: 5. Creating AgentSession...")
@@ -89,9 +133,11 @@ async def run_agent_session(room_name: str, agent_token: str):
        print("DEBUG: 6. AgentSession created.")

        print("DEBUG: 7. Starting session and passing room to .start()...")
+        # The .start() method begins the main processing loop for the agent
        await session.start(agent=VoiceAssistant(), room=room)

        print("DEBUG: 8. Session started. Generating initial greeting...")
+        # Proactively generate a greeting to start the conversation
        await send_agent_state(room, "speaking")
        await session.generate_reply(instructions=GREETING_INSTRUCTIONS)

@@ -101,19 +147,37 @@ async def run_agent_session(room_name: str, agent_token: str):
        print(f"FATAL ERROR in agent session: {e}")
        print(traceback.format_exc())
    finally:
+        # Ensure cleanup and disconnection on exit or error
        print(f"DEBUG: Agent session for room {room_name} is ending. Cleaning up.")
        await room.disconnect()

+# --- FastAPI API Endpoints ---
 @app.post("/join-room")
 async def join_room(req: JoinRoomRequest, background_tasks: BackgroundTasks):
+    """
+    API endpoint to trigger an agent to join a room.
+    It runs the agent session as a background task to not block the HTTP response.
+    """
    print(f"DEBUG: Received POST request to /join-room for: {req.room_name}")
    background_tasks.add_task(run_agent_session, req.room_name, req.agent_token)
    return {"status": "agent_triggered"}

 @app.get("/")
 async def root():
+    """A simple health check endpoint."""
    return {"status": "avurna_agent_server_online"}

+# --- Main Execution Block ---
 if __name__ == "__main__":
-    validate_env_vars(["HUME_API_KEY", "LIVEKIT_URL", "LIVEKIT_API_KEY", "LIVEKIT_API_SECRET", "GROQ_API_KEY", "GOOGLE_API_KEY"])
+    # Validate that all required environment variables are set before starting
+    validate_env_vars([
+        "HUME_API_KEY",
+        "LIVEKIT_URL",
+        "LIVEKIT_API_KEY",
+        "LIVEKIT_API_SECRET",
+        "GROQ_API_KEY",
+        "GOOGLE_API_KEY"
+    ])
+
+    # Run the FastAPI server using Uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)
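For context, a minimal client-side sketch (not part of this commit) of how the /join-room endpoint added here could be exercised once the server is running. The host, port, room name, and token value below are assumptions; agent_token must be a real LiveKit access token issued by whatever mints tokens in this project.

# Hypothetical client for the /join-room endpoint (assumes a local dev server on port 7860)
import requests

AGENT_SERVER_URL = "http://localhost:7860"  # assumption: server started via `python src/agent_session/main.py`

payload = {
    "room_name": "demo-room",        # name of an existing LiveKit room
    "agent_token": "<livekit-jwt>",  # placeholder: a real LiveKit token for the agent identity
}

resp = requests.post(f"{AGENT_SERVER_URL}/join-room", json=payload, timeout=10)
resp.raise_for_status()
print(resp.json())  # expected response from the server: {"status": "agent_triggered"}

The endpoint returns immediately because run_agent_session is scheduled as a FastAPI background task; the agent then connects to the room asynchronously.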
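Below is a standalone sketch of the control flow LLMStateWrapper relies on, using plain asyncio instead of the LiveKit classes: publish a state, consume the inner async generator with `async for`, then publish another state once the stream is exhausted. Every name here (fake_llm_chat, wrapped_chat, notify) is hypothetical and only illustrates the pattern.

# Illustration of wrapping an async generator with state notifications (not from the commit)
import asyncio

async def fake_llm_chat(history):
    # Stand-in for the real LLM stream: yields a few tokens
    for token in ["Hello", " ", "world"]:
        await asyncio.sleep(0)
        yield token

async def wrapped_chat(history, notify):
    # Signal that generation is starting
    await notify("thinking")
    # Forward every chunk from the inner generator unchanged
    async for chunk in fake_llm_chat(history):
        yield chunk
    # Signal that generation is finished and the agent is listening again
    await notify("listening")

async def main():
    async def notify(state):
        print(f"state -> {state}")
    async for chunk in wrapped_chat([], notify):
        print(f"chunk: {chunk!r}")

asyncio.run(main())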