WebEssentz committed on
Commit
ee7e520
·
1 Parent(s): 7af59d7

Realtime Flow

Browse files
Files changed (1) hide show
  1. src/agent_session/main.py +49 -37
src/agent_session/main.py CHANGED
@@ -4,13 +4,14 @@ Agent Session for Avurna Flow, wrapped in a FastAPI server.
4
  """
5
  import asyncio
6
  import os
7
- import json # Import the json library
8
  from fastapi import FastAPI, BackgroundTasks
9
- from fastapi.middleware.cors import CORSMiddleware
10
  from pydantic import BaseModel
11
  import uvicorn
 
12
 
13
  from livekit.agents import Agent, AgentSession, JobContext
 
14
  from livekit.agents.stt.stream_adapter import StreamAdapter
15
  from livekit.plugins.google import LLM as GoogleLLM
16
  from livekit.plugins.groq import STT
@@ -23,13 +24,12 @@ from src.utils import validate_env_vars
23
  app = FastAPI()
24
 
25
  origins = ["*"]
26
-
27
  app.add_middleware(
28
  CORSMiddleware,
29
  allow_origins=origins,
30
  allow_credentials=True,
31
- allow_methods=["*"], # Allows all methods (GET, POST, etc.)
32
- allow_headers=["*"], # Allows all headers
33
  )
34
 
35
  class JoinRoomRequest(BaseModel):
@@ -40,9 +40,7 @@ class VoiceAssistant(Agent):
40
  def __init__(self):
41
  super().__init__(instructions=SYSTEM_PROMPT)
42
 
43
- # --- KEY CHANGE: Function to send state to the frontend ---
44
  async def send_agent_state(ctx: JobContext, state: str):
45
- """Helper to notify the frontend of the agent's current state."""
46
  try:
47
  msg = json.dumps({"type": "agent_state", "state": state})
48
  await ctx.room.local_participant.publish_data(msg)
@@ -50,41 +48,55 @@ async def send_agent_state(ctx: JobContext, state: str):
50
  except Exception as e:
51
  print(f"Error publishing agent state: {e}")
52
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
53
  async def run_agent_session(room_name: str, agent_token: str):
54
  livekit_url = os.getenv("LIVEKIT_URL")
55
  ctx = JobContext(room_name=room_name, livekit_url=livekit_url, token=agent_token)
56
 
57
- await ctx.connect()
58
- await send_agent_state(ctx, "listening") # Start in listening state
59
-
60
- vad = VAD.load(min_speech_duration=0.1, min_silence_duration=0.5)
61
-
62
- # --- This is a conceptual wrapper for the LLM to inject state messages ---
63
- # We will wrap the original LLM to send messages before and after it runs.
64
- original_llm = GoogleLLM(model="gemini-1.5-flash", temperature=0.5)
65
-
66
- async def llm_wrapper_fnc(history):
67
- await send_agent_state(ctx, "thinking") # State change before LLM call
68
- result = await original_llm.chat(history)
69
- await send_agent_state(ctx, "speaking") # State change after LLM, before TTS
70
- return result
71
-
72
- session = AgentSession(
73
- vad=vad,
74
- stt=StreamAdapter(stt=STT(model="whisper-large-v3-turbo", language="en"), vad=vad),
75
- llm=llm_wrapper_fnc, # Use our wrapped function
76
- tts=TTS(voice=VoiceByName(name="Tiktok Fashion Influencer", provider=VoiceProvider.hume), instant_mode=True),
77
- )
78
-
79
- print(f"Agent starting session in room: {room_name}")
80
- await session.start(agent=VoiceAssistant(), room=ctx.room)
81
-
82
- # We'll also wrap the initial greeting
83
- await send_agent_state(ctx, "speaking")
84
- await session.generate_reply(instructions=GREETING_INSTRUCTIONS)
85
- await send_agent_state(ctx, "listening") # Return to listening after greeting
86
 
87
- print(f"Agent session ended for room: {room_name}")
 
 
 
 
88
 
89
 
90
  @app.post("/join-room")
 
4
  """
5
  import asyncio
6
  import os
7
+ import json
8
  from fastapi import FastAPI, BackgroundTasks
 
9
  from pydantic import BaseModel
10
  import uvicorn
11
+ from fastapi.middleware.cors import CORSMiddleware
12
 
13
  from livekit.agents import Agent, AgentSession, JobContext
14
+ from livekit.agents.llm import LLM
15
  from livekit.agents.stt.stream_adapter import StreamAdapter
16
  from livekit.plugins.google import LLM as GoogleLLM
17
  from livekit.plugins.groq import STT
 
24
  app = FastAPI()
25
 
26
  origins = ["*"]
 
27
  app.add_middleware(
28
  CORSMiddleware,
29
  allow_origins=origins,
30
  allow_credentials=True,
31
+ allow_methods=["*"],
32
+ allow_headers=["*"],
33
  )
34
 
35
  class JoinRoomRequest(BaseModel):
 
40
  def __init__(self):
41
  super().__init__(instructions=SYSTEM_PROMPT)
42
 
 
43
  async def send_agent_state(ctx: JobContext, state: str):
 
44
  try:
45
  msg = json.dumps({"type": "agent_state", "state": state})
46
  await ctx.room.local_participant.publish_data(msg)
 
48
  except Exception as e:
49
  print(f"Error publishing agent state: {e}")
50
 
51
+ # --- KEY FIX: Create a proper wrapper CLASS for the LLM ---
52
+ # This class conforms to the interface that AgentSession expects.
53
+ class LLMStateWrapper(LLM):
54
+ def __init__(self, llm: LLM, ctx: JobContext):
55
+ super().__init__()
56
+ self._llm = llm
57
+ self._ctx = ctx
58
+
59
+ async def chat(self, history):
60
+ await send_agent_state(self._ctx, "thinking")
61
+ res_stream = self._llm.chat(history)
62
+ await send_agent_state(self._ctx, "speaking")
63
+ return res_stream
64
+
65
+ # --- Main agent session logic ---
66
  async def run_agent_session(room_name: str, agent_token: str):
67
  livekit_url = os.getenv("LIVEKIT_URL")
68
  ctx = JobContext(room_name=room_name, livekit_url=livekit_url, token=agent_token)
69
 
70
+ try:
71
+ await ctx.connect()
72
+ await send_agent_state(ctx, "listening")
73
+
74
+ # --- KEY FIX: Instantiate the wrapper class correctly ---
75
+ llm_state_wrapper = LLMStateWrapper(
76
+ llm=GoogleLLM(model="gemini-1.5-flash", temperature=0.5),
77
+ ctx=ctx
78
+ )
79
+
80
+ vad = VAD.load(min_speech_duration=0.1, min_silence_duration=0.5)
81
+ session = AgentSession(
82
+ vad=vad,
83
+ stt=StreamAdapter(stt=STT(model="whisper-large-v3-turbo", language="en"), vad=vad),
84
+ llm=llm_state_wrapper, # Use the class instance here
85
+ tts=TTS(voice=VoiceByName(name="Tiktok Fashion Influencer", provider=VoiceProvider.hume), instant_mode=True),
86
+ )
87
+
88
+ print(f"Agent starting session in room: {room_name}")
89
+ await session.start(agent=VoiceAssistant(), room=ctx.room)
90
+
91
+ await send_agent_state(ctx, "speaking")
92
+ await session.generate_reply(instructions=GREETING_INSTRUCTIONS)
93
+ await send_agent_state(ctx, "listening")
 
 
 
 
 
94
 
95
+ except Exception as e:
96
+ print(f"An error occurred during the agent session: {e}")
97
+ finally:
98
+ print(f"Agent session ended for room: {room_name}. Cleaning up.")
99
+ await ctx.disconnect()
100
 
101
 
102
  @app.post("/join-room")