WebEssentz committed on
Commit
8e67cf0
·
1 Parent(s): 51a1cba

Realtime Flow

Browse files
Files changed (1) hide show
  1. src/agent_session/main.py +58 -75
src/agent_session/main.py CHANGED
@@ -1,4 +1,3 @@
1
- # Generated with 💚 by Avurna AI (2025)
2
  #!/usr/bin/env python3
3
 
4
  """
@@ -16,8 +15,7 @@ from pydantic import BaseModel
16
  import uvicorn
17
  from fastapi.middleware.cors import CORSMiddleware
18
  from livekit.rtc import Room
19
- from livekit.agents import Agent, AgentSession, JobContext
20
- from livekit.agents.llm import LLM
21
  from livekit.agents.stt.stream_adapter import StreamAdapter
22
  from livekit.plugins.google import LLM as GoogleLLM
23
  from livekit.plugins.groq import STT
@@ -27,7 +25,7 @@ from livekit.plugins.silero import VAD
27
  from src.agent_session.constants import SYSTEM_PROMPT, GREETING_INSTRUCTIONS
28
  from src.utils import validate_env_vars
29
 
30
- # FastAPI app setup (unchanged)
31
  @asynccontextmanager
32
  async def lifespan(app: FastAPI):
33
  async with aiohttp.ClientSession() as session:
@@ -50,93 +48,78 @@ async def send_agent_state(room: Room, state: str):
50
  except Exception as e:
51
  print(f"DEBUG: Error publishing agent state: {e}")
52
 
53
class LLMStateWrapper(LLM):
    """Decorate an LLM so the room is notified of the agent's state.

    Publishes "thinking" to the room right before inference starts and
    restores "listening" once the chat stream is finished; the actual
    model call is delegated unchanged to the wrapped LLM.
    """

    def __init__(self, llm: LLM, room: Room):
        super().__init__()
        self._wrapped = llm
        self._target_room = room

    @asynccontextmanager
    async def chat(self, **kwargs):
        # NOTE(review): this assumes the wrapped LLM.chat is usable as an
        # async context manager — confirm against the installed livekit
        # agents version.
        await send_agent_state(self._target_room, "thinking")
        try:
            async with self._wrapped.chat(**kwargs) as stream:
                yield stream
        finally:
            # Restore the idle state no matter how the caller exits.
            await send_agent_state(self._target_room, "listening")
67
-
68
- # The VoiceAssistant is now simpler, as setup is handled externally.
69
class VoiceAssistant(Agent):
    """Voice agent bound to the project-wide system prompt.

    Pipeline wiring (STT/LLM/TTS) is performed externally, so this class
    only carries the instructions and a post-connection hook.
    """

    def __init__(self):
        super().__init__(instructions=SYSTEM_PROMPT)

    async def on_connected(self, ctx: JobContext):
        """Log the successful connection and advertise the idle state."""
        print("DEBUG: VoiceAssistant agent has connected to the room.")
        await send_agent_state(ctx.room, "listening")
78
-
79
-
80
- # --- THIS FUNCTION IS NOW CORRECTED BASED ON THE HUME DOCS ---
81
async def run_agent_session(room_name: str, agent_token: str):
    """Connect to a LiveKit room and run the full voice-agent pipeline.

    Builds the VAD, STT, LLM and TTS plugins, starts an AgentSession in
    the given room and speaks the greeting. Failures are logged rather
    than propagated (this runs as a background task), and the HTTP
    session and room connection are always cleaned up on exit.

    Args:
        room_name: Room to join (used for logging; the token scopes the
            actual room).
        agent_token: LiveKit access token for the agent participant.
    """
    livekit_url = os.getenv("LIVEKIT_URL")
    room = Room()
    session = None

    try:
        # Fail fast with a clear message instead of an opaque connect error.
        if not livekit_url:
            raise RuntimeError("LIVEKIT_URL environment variable is not set")

        print(f"DEBUG: 1. Connecting to LiveKit room '{room_name}'...")
        await room.connect(livekit_url, agent_token)
        print("DEBUG: 2. Connection successful.")

        # HTTP session shared with the TTS plugin; must outlive the agent.
        session = aiohttp.ClientSession()

        # 1. Instantiate all plugins.
        vad = VAD.load(min_speech_duration=0.1, min_silence_duration=0.5)
        tts = TTS(
            voice=VoiceByName(
                name="Male English Actor",
                provider=VoiceProvider.hume,
            ),
            instant_mode=True,
            http_session=session,
        )
        stt = StreamAdapter(stt=STT(model="whisper-large-v3-turbo"), vad=vad)
        google_llm = GoogleLLM(model="gemini-2.5-flash")
        llm_wrapper = LLMStateWrapper(llm=google_llm, room=room)

        # 2. Assemble the session from the plugins.
        agent_session = AgentSession(
            stt=stt,
            llm=llm_wrapper,
            tts=tts,
            vad=vad,
        )

        # 3. Start the agent and deliver the greeting.
        agent = VoiceAssistant()
        print("DEBUG: Starting AgentSession...")
        await send_agent_state(room, "speaking")
        await agent_session.start(agent=agent, room=room)
        await agent_session.generate_reply(instructions=GREETING_INSTRUCTIONS)
        print("DEBUG: Initial greeting complete. Agent is now fully operational.")
        # NOTE(review): control falls straight through to cleanup after the
        # greeting — confirm AgentSession keeps itself alive in background
        # tasks, otherwise the agent disconnects right after speaking.
    except Exception as e:
        print(f"FATAL ERROR in agent session: {e}")
        print(traceback.format_exc())
    finally:
        print(f"DEBUG: Agent session for room {room_name} is ending.")
        # The room disconnect must happen even if closing the HTTP
        # session raises (the original skipped it in that case).
        try:
            if session:
                await session.close()
        finally:
            await room.disconnect()
134
-
135
 
136
@app.post("/join-room")
async def join_room(req: JoinRoomRequest, background_tasks: BackgroundTasks):
    """Kick off a voice-agent session for the requested room.

    The agent runs as a FastAPI background task so the HTTP response
    returns immediately.
    """
    print(f"DEBUG: Received POST request to /join-room for: {req.room_name}")
    background_tasks.add_task(run_agent_session, req.room_name, req.agent_token)
    return {"status": "agent_triggered"}
141
 
142
  @app.get("/")
 
 
1
  #!/usr/bin/env python3
2
 
3
  """
 
15
  import uvicorn
16
  from fastapi.middleware.cors import CORSMiddleware
17
  from livekit.rtc import Room
18
+ from livekit.agents import Agent, AgentSession, JobContext, WorkerOptions, cli
 
19
  from livekit.agents.stt.stream_adapter import StreamAdapter
20
  from livekit.plugins.google import LLM as GoogleLLM
21
  from livekit.plugins.groq import STT
 
25
  from src.agent_session.constants import SYSTEM_PROMPT, GREETING_INSTRUCTIONS
26
  from src.utils import validate_env_vars
27
 
28
+ # FastAPI app setup
29
  @asynccontextmanager
30
  async def lifespan(app: FastAPI):
31
  async with aiohttp.ClientSession() as session:
 
48
  except Exception as e:
49
  print(f"DEBUG: Error publishing agent state: {e}")
50
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
51
class VoiceAssistant(Agent):
    """Voice agent that carries the project-wide system prompt.

    All pipeline configuration lives in the entrypoint; this class only
    supplies the instructions.
    """

    def __init__(self):
        super().__init__(instructions=SYSTEM_PROMPT)
54
 
55
async def entrypoint(ctx: JobContext) -> None:
    """Connect to the room and run the STT → LLM → TTS voice pipeline.

    Greets the user once the session has started.
    """
    await ctx.connect()

    # Voice-activity detection, shared between the session and the
    # StreamAdapter that buffers audio for the non-streaming STT model.
    vad = VAD.load(
        min_speech_duration=0.1,
        min_silence_duration=0.5,
    )

    speech_to_text = StreamAdapter(
        stt=STT(
            model="whisper-large-v3-turbo",
            language="en",
        ),
        vad=vad,
    )
    language_model = GoogleLLM(
        model="gemini-2.5-flash",
        temperature=0.5,
    )
    text_to_speech = TTS(
        voice=VoiceByName(
            name="Male English Actor",
            provider=VoiceProvider.hume,
        ),
        instant_mode=True,
    )

    session = AgentSession(
        vad=vad,
        stt=speech_to_text,
        llm=language_model,
        tts=text_to_speech,
    )

    await session.start(agent=VoiceAssistant(), room=ctx.room)
    await session.generate_reply(instructions=GREETING_INSTRUCTIONS)
91
+
92
+ # Custom entrypoint for FastAPI integration
93
async def run_agent_with_room(room_name: str, agent_token: str):
    """Run the agent `entrypoint` against one specific room (FastAPI path).

    Wraps the room in a minimal JobContext-shaped object so the normal
    `entrypoint` can be reused outside the livekit worker CLI. Failures
    are logged (this runs as a background task) and the room is always
    disconnected if it was connected.

    Args:
        room_name: Name of the room the agent should join.
        agent_token: LiveKit access token for the agent participant.
    """
    livekit_url = os.getenv("LIVEKIT_URL")

    # Minimal stand-in for livekit.agents.JobContext — only provides the
    # attributes `entrypoint` actually touches (`connect()` and `.room`).
    # NOTE(review): any future use of other JobContext members inside
    # `entrypoint` will break this shim.
    class MockJobContext:
        def __init__(self, room_name: str, agent_token: str):
            self.room_name = room_name
            self.agent_token = agent_token
            self.room = Room()
            # Tracks whether connect() succeeded so cleanup can be safe.
            self.connected = False

        async def connect(self):
            # Fail fast with a clear message instead of an opaque
            # connect error when the env var is missing.
            if not livekit_url:
                raise RuntimeError("LIVEKIT_URL environment variable is not set")
            await self.room.connect(livekit_url, self.agent_token)
            self.connected = True

    ctx = MockJobContext(room_name, agent_token)

    try:
        await entrypoint(ctx)
    except Exception as e:
        print(f"FATAL ERROR in agent session: {e}")
        print(traceback.format_exc())
    finally:
        # Only disconnect a room that actually connected; the original
        # unconditionally disconnected even after a failed connect.
        if ctx.connected:
            await ctx.room.disconnect()
 
 
 
 
118
 
119
@app.post("/join-room")
async def join_room(req: JoinRoomRequest, background_tasks: BackgroundTasks):
    """Trigger the voice agent to join the requested room.

    The session runs as a FastAPI background task so this endpoint
    responds immediately.
    """
    print(f"DEBUG: Received POST request to /join-room for: {req.room_name}")
    background_tasks.add_task(run_agent_with_room, req.room_name, req.agent_token)
    return {"status": "agent_triggered"}
124
 
125
  @app.get("/")