WebEssentz committed on
Commit
01db28c
·
1 Parent(s): d155731

Realtime Flow

Browse files
Files changed (1) hide show
  1. src/agent_session/main.py +158 -56
src/agent_session/main.py CHANGED
@@ -1,8 +1,5 @@
1
  #!/usr/bin/env python3
2
-
3
- """
4
- Agent Session for Avurna Flow (Corrected based on Hume Docs)
5
- """
6
 
7
  import asyncio
8
  import os
@@ -21,26 +18,56 @@ from livekit.plugins.google import LLM as GoogleLLM
21
  from livekit.plugins.groq import STT
22
  from livekit.plugins.hume import TTS, VoiceByName, VoiceProvider
23
  from livekit.plugins.silero import VAD
24
-
25
  from src.agent_session.constants import SYSTEM_PROMPT, GREETING_INSTRUCTIONS
26
  from src.utils import validate_env_vars
27
 
28
- # FastAPI app setup
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
29
  @asynccontextmanager
30
  async def lifespan(app: FastAPI):
31
- async with aiohttp.ClientSession() as session:
32
- app.state.http_session = session
33
- yield
 
 
 
 
 
 
34
 
35
  app = FastAPI(lifespan=lifespan)
 
36
  origins = ["*"]
37
- app.add_middleware(CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
 
 
 
 
 
 
38
 
39
  class JoinRoomRequest(BaseModel):
40
  room_name: str
41
  agent_token: str
42
 
43
  async def send_agent_state(room: Room, state: str):
 
44
  try:
45
  msg = json.dumps({"type": "agent_state", "state": state})
46
  await room.local_participant.publish_data(msg)
@@ -52,70 +79,120 @@ class VoiceAssistant(Agent):
52
  def __init__(self):
53
  super().__init__(instructions=SYSTEM_PROMPT)
54
 
55
- async def entrypoint(ctx: JobContext) -> None:
56
- """
57
- Configure and run STT, LLM, and TTS in a LiveKit session.
58
- """
59
- await ctx.connect()
60
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
61
  # Voice-activity detection + buffering for non-streaming STT
62
  vad = VAD.load(
63
- min_speech_duration=0.1,
64
  min_silence_duration=0.5
65
  )
66
-
 
 
 
 
 
 
 
 
 
 
 
 
67
  session = AgentSession(
68
  vad=vad,
69
  stt=StreamAdapter(
70
  stt=STT(
71
- model="whisper-large-v3-turbo",
72
  language="en",
73
  ),
74
  vad=vad,
75
  ),
76
  llm=GoogleLLM(
77
- model="gemini-2.5-flash",
78
  temperature=0.5,
79
  ),
80
- tts=TTS(
81
- voice=VoiceByName(
82
- name="Male English Actor",
83
- provider=VoiceProvider.hume,
84
- ),
85
- instant_mode=True
86
- ),
87
  )
 
 
88
 
89
- # Start the session and generate greeting
90
- await session.start(agent=VoiceAssistant(), room=ctx.room)
91
- await session.generate_reply(instructions=GREETING_INSTRUCTIONS)
92
 
93
- # Keep the session alive by waiting for it to complete
94
- # The session will run until the room is disconnected
95
  try:
96
- # Wait indefinitely - the session manages its own lifecycle
97
- await asyncio.Event().wait()
98
- except asyncio.CancelledError:
99
- pass
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
100
 
101
- # Custom entrypoint for FastAPI integration
102
  async def run_agent_with_room(room_name: str, agent_token: str):
103
- """
104
- Custom entrypoint that connects to a specific room and keeps it alive
105
- """
106
- livekit_url = os.getenv("LIVEKIT_URL")
107
-
108
- # Create a mock JobContext for the specific room
109
- class MockJobContext:
110
- def __init__(self, room_name: str, agent_token: str):
111
- self.room_name = room_name
112
- self.agent_token = agent_token
113
- self.room = Room()
114
-
115
- async def connect(self):
116
- await self.room.connect(livekit_url, self.agent_token)
117
-
118
- ctx = MockJobContext(room_name, agent_token)
119
 
120
  try:
121
  await entrypoint(ctx)
@@ -123,7 +200,7 @@ async def run_agent_with_room(room_name: str, agent_token: str):
123
  print(f"FATAL ERROR in agent session: {e}")
124
  print(traceback.format_exc())
125
  finally:
126
- await ctx.room.disconnect()
127
 
128
  @app.post("/join-room")
129
  async def join_room(req: JoinRoomRequest, background_tasks: BackgroundTasks):
@@ -135,6 +212,31 @@ async def join_room(req: JoinRoomRequest, background_tasks: BackgroundTasks):
135
  async def root():
136
  return {"status": "avurna_agent_server_online"}
137
 
 
 
 
 
 
 
 
 
 
 
 
138
  if __name__ == "__main__":
139
- validate_env_vars(["HUME_API_KEY", "LIVEKIT_URL", "LIVEKIT_API_KEY", "LIVEKIT_API_SECRET", "GROQ_API_KEY", "GOOGLE_API_KEY"])
140
- uvicorn.run(app, host="0.0.0.0", port=7860)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  #!/usr/bin/env python3
2
+ """Agent Session for Avurna Flow (Fixed HTTP Session Management)"""
 
 
 
3
 
4
  import asyncio
5
  import os
 
18
  from livekit.plugins.groq import STT
19
  from livekit.plugins.hume import TTS, VoiceByName, VoiceProvider
20
  from livekit.plugins.silero import VAD
 
21
  from src.agent_session.constants import SYSTEM_PROMPT, GREETING_INSTRUCTIONS
22
  from src.utils import validate_env_vars
23
 
24
+ # Global HTTP session for the entire application
25
+ _global_http_session: aiohttp.ClientSession | None = None
26
+
27
+ async def get_http_session() -> aiohttp.ClientSession:
28
+ """Get or create the global HTTP session"""
29
+ global _global_http_session
30
+ if _global_http_session is None or _global_http_session.closed:
31
+ _global_http_session = aiohttp.ClientSession()
32
+ return _global_http_session
33
+
34
+ async def cleanup_http_session():
35
+ """Clean up the global HTTP session"""
36
+ global _global_http_session
37
+ if _global_http_session and not _global_http_session.closed:
38
+ await _global_http_session.close()
39
+ _global_http_session = None
40
+
41
+ # FastAPI app setup with proper lifecycle management
42
  @asynccontextmanager
43
  async def lifespan(app: FastAPI):
44
+ # Startup: Initialize the global HTTP session
45
+ await get_http_session()
46
+ print("HTTP session initialized")
47
+
48
+ yield
49
+
50
+ # Shutdown: Clean up the HTTP session
51
+ await cleanup_http_session()
52
+ print("HTTP session cleaned up")
53
 
54
  app = FastAPI(lifespan=lifespan)
55
+
56
  origins = ["*"]
57
+ app.add_middleware(
58
+ CORSMiddleware,
59
+ allow_origins=origins,
60
+ allow_credentials=True,
61
+ allow_methods=["*"],
62
+ allow_headers=["*"]
63
+ )
64
 
65
  class JoinRoomRequest(BaseModel):
66
  room_name: str
67
  agent_token: str
68
 
69
  async def send_agent_state(room: Room, state: str):
70
+ """Send agent state to the room"""
71
  try:
72
  msg = json.dumps({"type": "agent_state", "state": state})
73
  await room.local_participant.publish_data(msg)
 
79
  def __init__(self):
80
  super().__init__(instructions=SYSTEM_PROMPT)
81
 
82
+ class CustomJobContext:
83
+ """Custom JobContext that properly manages the room connection"""
84
+
85
+ def __init__(self, room_name: str, agent_token: str):
86
+ self.room_name = room_name
87
+ self.agent_token = agent_token
88
+ self.room = Room()
89
+ self._connected = False
90
+
91
+ async def connect(self):
92
+ """Connect to the LiveKit room"""
93
+ if not self._connected:
94
+ livekit_url = os.getenv("LIVEKIT_URL")
95
+ if not livekit_url:
96
+ raise ValueError("LIVEKIT_URL environment variable not set")
97
+
98
+ await self.room.connect(livekit_url, self.agent_token)
99
+ self._connected = True
100
+ print(f"Connected to room: {self.room_name}")
101
+
102
+ async def disconnect(self):
103
+ """Disconnect from the LiveKit room"""
104
+ if self._connected:
105
+ await self.room.disconnect()
106
+ self._connected = False
107
+ print(f"Disconnected from room: {self.room_name}")
108
+
109
+ async def create_agent_session(ctx: CustomJobContext) -> AgentSession:
110
+ """Create and configure the agent session with proper HTTP session management"""
111
+
112
+ # Get the global HTTP session
113
+ http_session = await get_http_session()
114
+
115
  # Voice-activity detection + buffering for non-streaming STT
116
  vad = VAD.load(
117
+ min_speech_duration=0.1,
118
  min_silence_duration=0.5
119
  )
120
+
121
+ # Create TTS with explicit HTTP session
122
+ tts = TTS(
123
+ voice=VoiceByName(
124
+ name="Male English Actor",
125
+ provider=VoiceProvider.hume,
126
+ ),
127
+ instant_mode=True,
128
+ # Pass the HTTP session to avoid the context error
129
+ session=http_session
130
+ )
131
+
132
+ # Create the agent session
133
  session = AgentSession(
134
  vad=vad,
135
  stt=StreamAdapter(
136
  stt=STT(
137
+ model="whisper-large-v3-turbo",
138
  language="en",
139
  ),
140
  vad=vad,
141
  ),
142
  llm=GoogleLLM(
143
+ model="gemini-2.5-flash",
144
  temperature=0.5,
145
  ),
146
+ tts=tts,
 
 
 
 
 
 
147
  )
148
+
149
+ return session
150
 
151
+ async def entrypoint(ctx: CustomJobContext) -> None:
152
+ """Configure and run STT, LLM, and TTS in a LiveKit session"""
 
153
 
 
 
154
  try:
155
+ # Connect to the room
156
+ await ctx.connect()
157
+
158
+ # Send initial state
159
+ await send_agent_state(ctx.room, "listening")
160
+
161
+ # Create the agent session
162
+ session = await create_agent_session(ctx)
163
+
164
+ # Start the session
165
+ await session.start(agent=VoiceAssistant(), room=ctx.room)
166
+
167
+ # Send greeting state
168
+ await send_agent_state(ctx.room, "thinking")
169
+
170
+ # Generate greeting
171
+ await session.generate_reply(instructions=GREETING_INSTRUCTIONS)
172
+
173
+ # Back to listening
174
+ await send_agent_state(ctx.room, "listening")
175
+
176
+ # Keep the session alive
177
+ print("Agent session started successfully, waiting for interactions...")
178
+
179
+ # Monitor room connection and keep alive
180
+ while ctx.room.connection_state.name in ['CONNECTED', 'CONNECTING']:
181
+ await asyncio.sleep(1)
182
+
183
+ print("Room disconnected, ending agent session")
184
+
185
+ except Exception as e:
186
+ print(f"Error in agent session: {e}")
187
+ print(traceback.format_exc())
188
+ await send_agent_state(ctx.room, "error")
189
+ finally:
190
+ # Clean up
191
+ await ctx.disconnect()
192
 
 
193
  async def run_agent_with_room(room_name: str, agent_token: str):
194
+ """Run the agent in a specific room"""
195
+ ctx = CustomJobContext(room_name, agent_token)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
196
 
197
  try:
198
  await entrypoint(ctx)
 
200
  print(f"FATAL ERROR in agent session: {e}")
201
  print(traceback.format_exc())
202
  finally:
203
+ await ctx.disconnect()
204
 
205
  @app.post("/join-room")
206
  async def join_room(req: JoinRoomRequest, background_tasks: BackgroundTasks):
 
212
  async def root():
213
  return {"status": "avurna_agent_server_online"}
214
 
215
+ @app.get("/health")
216
+ async def health():
217
+ """Health check endpoint"""
218
+ global _global_http_session
219
+ session_status = "healthy" if _global_http_session and not _global_http_session.closed else "needs_init"
220
+ return {
221
+ "status": "healthy",
222
+ "http_session": session_status,
223
+ "timestamp": asyncio.get_event_loop().time()
224
+ }
225
+
226
  if __name__ == "__main__":
227
+ # Validate required environment variables
228
+ required_vars = [
229
+ "HUME_API_KEY",
230
+ "LIVEKIT_URL",
231
+ "LIVEKIT_API_KEY",
232
+ "LIVEKIT_API_SECRET",
233
+ "GROQ_API_KEY",
234
+ "GOOGLE_API_KEY"
235
+ ]
236
+
237
+ validate_env_vars(required_vars)
238
+
239
+ print("Starting Avurna Agent Server...")
240
+ print("Required environment variables validated")
241
+
242
+ uvicorn.run(app, host="0.0.0.0", port=7860)