WebEssentz commited on
Commit
4e316af
·
1 Parent(s): e67faa0

Realtime Flow

Browse files
Files changed (2) hide show
  1. requirements.txt +2 -1
  2. src/agent_session/main.py +43 -37
requirements.txt CHANGED
@@ -5,4 +5,5 @@ livekit-plugins-hume
5
  livekit-plugins-silero
6
  fastapi
7
  uvicorn
8
- livekit
 
 
5
  livekit-plugins-silero
6
  fastapi
7
  uvicorn
8
+ livekit
9
+ aiohttp
src/agent_session/main.py CHANGED
@@ -1,15 +1,16 @@
1
  #!/usr/bin/env python3
2
  """
3
- Agent Session for Avurna Flow (Final Corrected Version)
4
  """
5
  import os
6
  import json
7
  import traceback
8
- from fastapi import FastAPI, BackgroundTasks
 
 
9
  from pydantic import BaseModel
10
  import uvicorn
11
  from fastapi.middleware.cors import CORSMiddleware
12
- from contextlib import asynccontextmanager # --- KEY: Import the required decorator ---
13
 
14
  from livekit.rtc import Room
15
  from livekit.agents import Agent, AgentSession
@@ -23,7 +24,16 @@ from livekit.plugins.silero import VAD
23
  from src.agent_session.constants import SYSTEM_PROMPT, GREETING_INSTRUCTIONS
24
  from src.utils import validate_env_vars
25
 
26
- app = FastAPI()
 
 
 
 
 
 
 
 
 
27
  origins = ["*"]
28
  app.add_middleware(CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
29
 
@@ -36,35 +46,18 @@ class VoiceAssistant(Agent):
36
  super().__init__(instructions=SYSTEM_PROMPT)
37
 
38
  async def send_agent_state(room: Room, state: str):
39
- try:
40
- msg = json.dumps({"type": "agent_state", "state": state})
41
- await room.local_participant.publish_data(msg)
42
- print(f"DEBUG: Sent agent state: {state}")
43
- except Exception as e:
44
- print(f"DEBUG: Error publishing agent state: {e}")
45
 
46
  class LLMStateWrapper(LLM):
47
- def __init__(self, llm: LLM, room: Room):
48
- super().__init__()
49
- self._llm = llm
50
- self._room = room
51
-
52
- # --- THE DEFINITIVE FIX ---
53
- @asynccontextmanager # 1. Decorate the method to make it a valid context manager
54
- async def chat(self, **kwargs):
55
- # 2. This code runs when the context is entered
56
- await send_agent_state(self._room, "thinking")
57
-
58
- try:
59
- # 3. Enter the original LLM's context and get its stream
60
- async with self._llm.chat(**kwargs) as stream:
61
- # 4. Yield the stream to the AgentSession
62
- yield stream
63
- finally:
64
- # 5. This code runs when the context is exited (after TTS is done)
65
- await send_agent_state(self._room, "listening")
66
-
67
- async def run_agent_session(room_name: str, agent_token: str):
68
  livekit_url = os.getenv("LIVEKIT_URL")
69
  room = Room()
70
 
@@ -75,11 +68,22 @@ async def run_agent_session(room_name: str, agent_token: str):
75
 
76
  await send_agent_state(room, "listening")
77
 
78
- print("DEBUG: 3. Initializing plugins...")
79
- llm_wrapper = LLMStateWrapper(llm=GoogleLLM(model="gemini-1.5-flash"), room=room)
 
 
 
 
80
  vad = VAD.load(min_speech_duration=0.1)
81
- stt = StreamAdapter(stt=STT(model="whisper-large-v3-turbo"), vad=vad)
82
- tts = TTS(voice=VoiceByName(name="Tiktok Fashion Influencer"), instant_mode=True)
 
 
 
 
 
 
 
83
  print("DEBUG: 4. Plugins initialized.")
84
 
85
  print("DEBUG: 5. Creating AgentSession...")
@@ -102,10 +106,12 @@ async def run_agent_session(room_name: str, agent_token: str):
102
  print(f"DEBUG: Agent session for room {room_name} is ending. Cleaning up.")
103
  await room.disconnect()
104
 
 
105
  @app.post("/join-room")
106
- async def join_room(req: JoinRoomRequest, background_tasks: BackgroundTasks):
 
107
  print(f"DEBUG: Received POST request to /join-room for: {req.room_name}")
108
- background_tasks.add_task(run_agent_session, req.room_name, req.agent_token)
109
  return {"status": "agent_triggered"}
110
 
111
  @app.get("/")
 
1
  #!/usr/bin/env python3
2
  """
3
+ Agent Session for Avurna Flow (Final Corrected Version with HTTP Session Management)
4
  """
5
  import os
6
  import json
7
  import traceback
8
+ import aiohttp # 1. Import aiohttp
9
+ from contextlib import asynccontextmanager
10
+ from fastapi import FastAPI, BackgroundTasks, Request
11
  from pydantic import BaseModel
12
  import uvicorn
13
  from fastapi.middleware.cors import CORSMiddleware
 
14
 
15
  from livekit.rtc import Room
16
  from livekit.agents import Agent, AgentSession
 
24
  from src.agent_session.constants import SYSTEM_PROMPT, GREETING_INSTRUCTIONS
25
  from src.utils import validate_env_vars
26
 
27
+ # 2. Create a lifespan manager for the http session
28
+ @asynccontextmanager
29
+ async def lifespan(app: FastAPI):
30
+ # Create a single, shared session for the entire application
31
+ async with aiohttp.ClientSession() as session:
32
+ app.state.http_session = session
33
+ yield
34
+ # The session is automatically closed when the app shuts down
35
+
36
+ app = FastAPI(lifespan=lifespan)
37
  origins = ["*"]
38
  app.add_middleware(CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
39
 
 
46
  super().__init__(instructions=SYSTEM_PROMPT)
47
 
48
  async def send_agent_state(room: Room, state: str):
49
+ # This function is correct
50
+ # ...
51
+ pass # Omitted for brevity
 
 
 
52
 
53
  class LLMStateWrapper(LLM):
54
+ # This class is correct
55
+ # ...
56
+ pass # Omitted for brevity
57
+
58
+
59
+ # 3. Modify run_agent_session to accept the http_session
60
+ async def run_agent_session(room_name: str, agent_token: str, http_session: aiohttp.ClientSession):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
61
  livekit_url = os.getenv("LIVEKIT_URL")
62
  room = Room()
63
 
 
68
 
69
  await send_agent_state(room, "listening")
70
 
71
+ print("DEBUG: 3. Initializing plugins with shared http_session...")
72
+ # 4. Pass the session to every plugin that makes an external HTTP call
73
+ llm_wrapper = LLMStateWrapper(
74
+ llm=GoogleLLM(model="gemini-1.5-flash", session=http_session),
75
+ room=room
76
+ )
77
  vad = VAD.load(min_speech_duration=0.1)
78
+ stt = StreamAdapter(
79
+ stt=STT(model="whisper-large-v3-turbo", session=http_session),
80
+ vad=vad
81
+ )
82
+ tts = TTS(
83
+ voice=VoiceByName(name="Tiktok Fashion Influencer"),
84
+ instant_mode=True,
85
+ session=http_session
86
+ )
87
  print("DEBUG: 4. Plugins initialized.")
88
 
89
  print("DEBUG: 5. Creating AgentSession...")
 
106
  print(f"DEBUG: Agent session for room {room_name} is ending. Cleaning up.")
107
  await room.disconnect()
108
 
109
+ # 5. Modify the endpoint to retrieve the session from the app state
110
  @app.post("/join-room")
111
+ async def join_room(req: JoinRoomRequest, request: Request, background_tasks: BackgroundTasks):
112
+ http_session = request.app.state.http_session
113
  print(f"DEBUG: Received POST request to /join-room for: {req.room_name}")
114
+ background_tasks.add_task(run_agent_session, req.room_name, req.agent_token, http_session)
115
  return {"status": "agent_triggered"}
116
 
117
  @app.get("/")