WebEssentz committed on
Commit
dbcd819
·
1 Parent(s): 44afed5

Realtime Flow

Browse files
Files changed (1) hide show
  1. src/agent_session/main.py +34 -46
src/agent_session/main.py CHANGED
@@ -1,17 +1,18 @@
1
  #!/usr/bin/env python3
2
  """
3
- Agent Session for Avurna Flow, wrapped in a FastAPI server. (DEBUGGING ENABLED)
4
  """
5
- import asyncio
6
  import os
7
  import json
8
- import traceback # Import traceback to print full errors
9
  from fastapi import FastAPI, BackgroundTasks
10
  from pydantic import BaseModel
11
  import uvicorn
12
  from fastapi.middleware.cors import CORSMiddleware
13
 
14
- from livekit.agents import Agent, AgentSession, JobContext
 
 
15
  from livekit.agents.llm import LLM
16
  from livekit.agents.stt.stream_adapter import StreamAdapter
17
  from livekit.plugins.google import LLM as GoogleLLM
@@ -22,16 +23,9 @@ from livekit.plugins.silero import VAD
22
  from src.agent_session.constants import SYSTEM_PROMPT, GREETING_INSTRUCTIONS
23
  from src.utils import validate_env_vars
24
 
25
- # --- FastAPI and CORS setup (unchanged) ---
26
  app = FastAPI()
27
  origins = ["*"]
28
- app.add_middleware(
29
- CORSMiddleware,
30
- allow_origins=origins,
31
- allow_credentials=True,
32
- allow_methods=["*"],
33
- allow_headers=["*"],
34
- )
35
 
36
  class JoinRoomRequest(BaseModel):
37
  room_name: str
@@ -39,50 +33,45 @@ class JoinRoomRequest(BaseModel):
39
 
40
  class VoiceAssistant(Agent):
41
  def __init__(self):
42
- super().__init__(instructions=SYSTEM_PROMPT)
43
 
44
- async def send_agent_state(ctx: JobContext, state: str):
45
  try:
46
  msg = json.dumps({"type": "agent_state", "state": state})
47
- await ctx.room.local_participant.publish_data(msg)
48
  print(f"DEBUG: Sent agent state: {state}")
49
  except Exception as e:
50
  print(f"DEBUG: Error publishing agent state: {e}")
51
 
52
- # --- KEY FIX 2: Correctly yield from the wrapped LLM chat method ---
53
  class LLMStateWrapper(LLM):
54
- def __init__(self, llm: LLM, ctx: JobContext):
55
  super().__init__()
56
  self._llm = llm
57
- self._ctx = ctx
58
 
59
  async def chat(self, history):
60
- await send_agent_state(self._ctx, "thinking")
61
- # The `yield from` is crucial for streaming responses.
62
  async for chunk in self._llm.chat(history):
63
  yield chunk
64
- await send_agent_state(self._ctx, "listening") # Change state back to listening after speaking is done
65
-
66
 
67
  async def run_agent_session(room_name: str, agent_token: str):
68
- # --- KEY FIX 1: Add aggressive "breadcrumb" logging ---
69
- print(f"DEBUG: Starting run_agent_session for room: {room_name}")
70
  livekit_url = os.getenv("LIVEKIT_URL")
71
-
72
- # --- THIS IS THE ONLY LINE THAT CHANGES ---
73
- # The 'room_name' argument is removed as it's derived from the token.
74
- ctx = JobContext(token=agent_token)
75
-
76
  try:
77
- print("DEBUG: 1. Connecting to LiveKit context...")
78
- await ctx.connect()
79
- print("DEBUG: 2. Context connected. Sending initial 'listening' state.")
80
- await send_agent_state(ctx, "listening")
 
 
81
 
82
- print("DEBUG: 3. Initializing plugins (VAD, STT, LLM, TTS)...")
83
- llm_state_wrapper = LLMStateWrapper(
84
  llm=GoogleLLM(model="gemini-1.5-flash", temperature=0.5),
85
- ctx=ctx
86
  )
87
  vad = VAD.load(min_speech_duration=0.1, min_silence_duration=0.5)
88
  stt = StreamAdapter(stt=STT(model="whisper-large-v3-turbo", language="en"), vad=vad)
@@ -90,36 +79,35 @@ async def run_agent_session(room_name: str, agent_token: str):
90
  print("DEBUG: 4. Plugins initialized.")
91
 
92
  print("DEBUG: 5. Creating AgentSession...")
 
93
  session = AgentSession(
 
94
  vad=vad,
95
  stt=stt,
96
- llm=llm_state_wrapper,
97
  tts=tts,
98
  )
99
  print("DEBUG: 6. AgentSession created. Starting session now...")
100
- await session.start(agent=VoiceAssistant(), room=ctx.room)
101
 
102
  print("DEBUG: 7. Session started. Generating initial greeting...")
103
- await send_agent_state(ctx, "speaking")
104
  await session.generate_reply(instructions=GREETING_INSTRUCTIONS)
105
- # Note: The state is now set back to 'listening' inside the LLM wrapper
106
 
107
- print("DEBUG: 8. Initial greeting complete. Agent is now fully operational.")
108
 
109
  except Exception as e:
110
- # This will now print the FULL error to your Hugging Face logs
111
  print(f"FATAL ERROR in agent session: {e}")
112
  print(traceback.format_exc())
113
  finally:
114
  print(f"DEBUG: Agent session for room {room_name} is ending. Cleaning up.")
115
- await ctx.disconnect()
116
-
117
 
118
  @app.post("/join-room")
119
  async def join_room(req: JoinRoomRequest, background_tasks: BackgroundTasks):
120
- print(f"DEBUG: Received request for agent to join room: {req.room_name}")
121
  background_tasks.add_task(run_agent_session, req.room_name, req.agent_token)
122
- return {"status": "agent_joining"}
123
 
124
  @app.get("/")
125
  async def root():
 
1
  #!/usr/bin/env python3
2
  """
3
+ Agent Session for Avurna Flow, wrapped in a FastAPI server. (Corrected Connection Logic)
4
  """
 
5
  import os
6
  import json
7
+ import traceback
8
  from fastapi import FastAPI, BackgroundTasks
9
  from pydantic import BaseModel
10
  import uvicorn
11
  from fastapi.middleware.cors import CORSMiddleware
12
 
13
+ # --- KEY CHANGE: Import the correct connection tools ---
14
+ from livekit.rtc import Room, aio
15
+ from livekit.agents import Agent, AgentSession
16
  from livekit.agents.llm import LLM
17
  from livekit.agents.stt.stream_adapter import StreamAdapter
18
  from livekit.plugins.google import LLM as GoogleLLM
 
23
  from src.agent_session.constants import SYSTEM_PROMPT, GREETING_INSTRUCTIONS
24
  from src.utils import validate_env_vars
25
 
 
26
  app = FastAPI()
27
  origins = ["*"]
28
+ app.add_middleware(CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
 
 
 
 
 
 
29
 
30
  class JoinRoomRequest(BaseModel):
31
  room_name: str
 
33
 
34
class VoiceAssistant(Agent):
    """Voice agent configured with the application's system prompt."""

    def __init__(self):
        # FIX: restore the system prompt. This revision called
        # super().__init__() with no arguments, silently dropping
        # SYSTEM_PROMPT — the previous revision passed it explicitly,
        # and without it the agent runs with no instructions at all.
        super().__init__(instructions=SYSTEM_PROMPT)
37
 
38
async def send_agent_state(room: Room, state: str):
    """Broadcast the agent's current state to the room, best-effort.

    Publishes a JSON data message of the form
    ``{"type": "agent_state", "state": <state>}`` to all participants.
    Any publish failure is logged and swallowed so that a state update
    can never take down the session.
    """
    payload = json.dumps({"type": "agent_state", "state": state})
    try:
        await room.local_participant.publish_data(payload)
        print(f"DEBUG: Sent agent state: {state}")
    except Exception as exc:
        # Deliberately broad: state broadcasts are advisory only.
        print(f"DEBUG: Error publishing agent state: {exc}")
45
 
 
46
class LLMStateWrapper(LLM):
    """Decorator around an LLM that broadcasts agent-state transitions.

    Publishes "thinking" before delegating to the wrapped model, streams
    the model's chunks through unchanged, then publishes "listening" once
    the response stream is exhausted.
    """

    def __init__(self, llm: LLM, room: Room):
        super().__init__()
        self._llm = llm
        self._room = room

    async def chat(self, history):
        # Tell the room the model is working on a reply.
        await send_agent_state(self._room, "thinking")
        # Pass every streamed chunk through untouched.
        async for piece in self._llm.chat(history):
            yield piece
        # NOTE(review): this fires when the LLM stream ends, which is before
        # TTS playback necessarily finishes — "listening" may arrive while
        # the agent is still audibly speaking; confirm against the pipeline.
        await send_agent_state(self._room, "listening")
58
 
59
async def run_agent_session(room_name: str, agent_token: str):
    """Connect to a LiveKit room and run the voice-assistant session.

    Runs as a FastAPI background task. Every failure is logged with a full
    traceback to stdout, and the room connection is always torn down in
    ``finally``.

    Args:
        room_name: Human-readable room name (used for logging only; the
            actual room is derived from the token).
        agent_token: LiveKit access token granting the agent entry.
    """
    livekit_url = os.getenv("LIVEKIT_URL")
    room = Room()  # Create a Room object

    try:
        print(f"DEBUG: 1. Connecting to LiveKit room '{room_name}' at {livekit_url}...")
        # FIX: connect via the documented Room.connect(url, token) API.
        # The previous call, `aio.connect(livekit_url, agent_token, room=room)`,
        # does not match the livekit.rtc public API — confirm against the
        # installed SDK version.
        await room.connect(livekit_url, agent_token)
        print("DEBUG: 2. Connection successful. Agent is in the room.")

        await send_agent_state(room, "listening")

        print("DEBUG: 3. Initializing plugins...")
        llm_wrapper = LLMStateWrapper(
            llm=GoogleLLM(model="gemini-1.5-flash", temperature=0.5),
            room=room,
        )
        vad = VAD.load(min_speech_duration=0.1, min_silence_duration=0.5)
        stt = StreamAdapter(stt=STT(model="whisper-large-v3-turbo", language="en"), vad=vad)
        # NOTE(review): `tts` is constructed in lines elided from this diff
        # view; it must be defined here, before the AgentSession below.

        print("DEBUG: 4. Plugins initialized.")

        print("DEBUG: 5. Creating AgentSession...")
        session = AgentSession(
            vad=vad,
            stt=stt,
            llm=llm_wrapper,
            tts=tts,
        )
        print("DEBUG: 6. AgentSession created. Starting session now...")
        # FIX: AgentSession does not take `room` in its constructor; the room
        # is supplied to start(), exactly as the previous revision did with
        # `session.start(agent=..., room=ctx.room)`.
        await session.start(agent=VoiceAssistant(), room=room)

        print("DEBUG: 7. Session started. Generating initial greeting...")
        await send_agent_state(room, "speaking")
        await session.generate_reply(instructions=GREETING_INSTRUCTIONS)
        # State returns to "listening" inside LLMStateWrapper.chat().

        print("DEBUG: 8. Initial greeting complete. Agent is now fully operational and listening.")

    except Exception as e:
        # Surface the FULL error in the host logs.
        print(f"FATAL ERROR in agent session: {e}")
        print(traceback.format_exc())
    finally:
        print(f"DEBUG: Agent session for room {room_name} is ending. Cleaning up.")
        await room.disconnect()
 
105
 
106
  @app.post("/join-room")
107
  async def join_room(req: JoinRoomRequest, background_tasks: BackgroundTasks):
108
+ print(f"DEBUG: Received POST request to /join-room for: {req.room_name}")
109
  background_tasks.add_task(run_agent_session, req.room_name, req.agent_token)
110
+ return {"status": "agent_triggered"}
111
 
112
  @app.get("/")
113
  async def root():