#agent.py
"""Voice assistant ("benita") built on LiveKit Agents with mem0 long-term memory.

Run locally with, for example:

    python agent.py console --input-device 0 --output-device 1
    ./.venv311/bin/python agent.py console --input-device 0 --output-device 1
"""
import asyncio
import logging
import os

from dotenv import load_dotenv
from mem0 import Memory
from mem0.configs.base import MemoryConfig
from mem0.vector_stores.configs import VectorStoreConfig
from livekit.agents import (
    Agent,
    AgentServer,
    AgentSession,
    JobContext,
    RunContext,
    function_tool,
    cli,
    WorkerOptions,
)
from livekit.plugins import openai, silero
from livekit.plugins import noise_cancellation

load_dotenv()

logger = logging.getLogger("voice-agent")

# uses local storage by default; swap for mem0 cloud later
memory = Memory(
    config=MemoryConfig(
        vector_store=VectorStoreConfig(
            provider='chroma',
            config={'path': 'data/mem0'},
        )
    )
)

# Maps chat roles to the labels used in the transcript log lines.
_SPEAKER_LABELS = {"user": "User", "assistant": "Agent"}

# Fixed part of the system prompt; per-user memory is appended to it.
_BASE_INSTRUCTIONS = (
    "\n"
    "You are a friendly, intelligent voice assistant.\n"
    "Keep responses short and conversational — 1 to 3 sentences max.\n"
    "Never use markdown, bullet points, emojis, or asterisks.\n"
    "Speak naturally as if on a phone call.\n"
    "your name is benita.\n"
    "If the user shares something personal or useful to remember, "
    "call remember_this.\n"
    "If the user references something from before, call recall_memory.\n"
)


def _extract_memories(search_response: object) -> list[str]:
    """Return the ``memory`` field of every well-formed search result.

    ``memory.search`` is handled both when it returns a dict carrying a
    ``"results"`` list and when it returns the list directly. Entries that
    are not dicts or lack a ``"memory"`` key are skipped.
    """
    if isinstance(search_response, dict):
        results = search_response.get("results", [])
    else:
        results = search_response
    return [
        r["memory"]
        for r in (results or [])
        if isinstance(r, dict) and "memory" in r
    ]


def _message_text(message: object) -> str:
    """Best-effort plain text of a chat message, or ``""`` if it has none.

    Prefers a string ``text_content`` attribute, then falls back to
    ``content`` as either a plain string or a list whose string parts are
    joined (non-string parts such as images/audio are ignored).
    NOTE(review): attribute names follow livekit-agents 1.x ChatMessage —
    confirm against the installed version.
    """
    text = getattr(message, "text_content", None)
    if isinstance(text, str):
        return text
    content = getattr(message, "content", None)
    if isinstance(content, str):
        return content
    if isinstance(content, (list, tuple)):
        return "\n".join(part for part in content if isinstance(part, str))
    return ""


def get_memory(user_id: str) -> str:
    """Return stored memories for ``user_id`` as a ``- item`` bulleted string.

    Returns an empty string when nothing usable is found. This call is
    synchronous; async callers should run it via ``asyncio.to_thread``.
    """
    search_response = memory.search(
        query="user preferences and history",
        user_id=user_id,
        limit=10,
    )
    return "\n".join(f"- {item}" for item in _extract_memories(search_response))


def save_memory(user_id: str, content: str):
    """Persist ``content`` to long-term memory for ``user_id`` (synchronous)."""
    memory.add(content, user_id=user_id)


@function_tool
async def remember_this(context: RunContext, fact: str):
    """Save an important fact or user preference to long-term memory."""
    user_id = context.userdata.get("user_id", "default_user")
    # memory.add is a blocking call; keep it off the event loop so audio
    # handling is not stalled while it runs.
    await asyncio.to_thread(save_memory, user_id, fact)
    return {"status": "saved", "fact": fact}


@function_tool
async def recall_memory(context: RunContext, query: str):
    """Search long-term memory for something the user mentioned before."""
    user_id = context.userdata.get("user_id", "default_user")
    # Blocking search runs in a worker thread for the same reason as above.
    search_response = await asyncio.to_thread(
        memory.search, query=query, user_id=user_id, limit=5
    )
    memories = _extract_memories(search_response)
    # "found" reflects usable results, not merely a non-empty raw response.
    return {"found": bool(memories), "results": memories}


class VoiceAssistant(Agent):
    """Conversational agent that greets the user and persists a session recap."""

    def __init__(self, user_id: str, past_memory: str) -> None:
        """Build the agent prompt.

        Args:
            user_id: Identity under which memories are stored.
            past_memory: Previously stored memories, already formatted as
                text; appended to the instructions when non-empty.
        """
        memory_block = (
            f"\n\nWhat you already know about this user:\n{past_memory}"
            if past_memory
            else ""
        )
        super().__init__(
            instructions=f"{_BASE_INSTRUCTIONS}{memory_block}\n",
            tools=[remember_this, recall_memory],
        )
        self.user_id = user_id

    async def on_enter(self):
        """Greet the user when the agent becomes active."""
        await self.session.say("Hey there! am benita How can I help you today?")

    async def on_exit(self):
        """Save a short summary of the most recent messages to memory."""
        history = self.session.history
        # NOTE(review): livekit-agents 1.x exposes the history as `.items`;
        # `.messages` is kept as a fallback for other versions — confirm.
        entries = getattr(history, "items", None)
        if entries is None:
            entries = getattr(history, "messages", [])
        if callable(entries):
            entries = entries()

        # Keep only entries that carry a role and some text (skips tool
        # calls and other non-message items), then take the last 6 turns.
        spoken = [
            (entry.role, text)
            for entry in (entries or [])
            if hasattr(entry, "role") and (text := _message_text(entry))
        ]
        if not spoken:
            return

        summary = " | ".join(f"{role}: {text[:80]}" for role, text in spoken[-6:])
        try:
            await asyncio.to_thread(
                save_memory, self.user_id, f"Recent conversation: {summary}"
            )
        except Exception:
            # Best-effort: failing to persist the recap must not break
            # session teardown, but it should be visible in the logs.
            logger.exception(
                "Failed to save session memory for user %s", self.user_id
            )
            return
        logger.info("Saved session memory for user %s", self.user_id)


server = AgentServer()


@server.rtc_session(agent_name="benita")
async def entrypoint(ctx: JobContext):
    """Connect to the room, load the user's memory and start the session."""
    await ctx.connect()

    if ctx.is_fake_job():
        user_id = "console"
        logger.info("Console fake job detected; skipping wait_for_participant")
    else:
        participant = await ctx.wait_for_participant()
        user_id = participant.identity or "default_user"

    logger.info("Session started | room=%s | user=%s", ctx.room.name, user_id)

    # Blocking lookup; run in a worker thread so the event loop stays free.
    past_memory = await asyncio.to_thread(get_memory, user_id)

    session = AgentSession(
        userdata={"user_id": user_id},
        # VAD detects when user starts/stops speaking
        vad=silero.VAD.load(),
        # STT
        stt=openai.STT(model="whisper-1", language='en'),
        # LLM
        llm=openai.LLM(model="gpt-4o-mini"),
        # TTS
        tts=openai.TTS(model="tts-1", voice="nova"),
        turn_detection=None,
        # Interruption settings
        allow_interruptions=True,
        min_endpointing_delay=0.4,
    )

    # NOTE(review): AgentSession reports committed user and agent turns via
    # "conversation_item_added" in livekit-agents 1.x — confirm for the
    # installed version.
    @session.on("conversation_item_added")
    def on_conversation_item(event) -> None:
        item = getattr(event, "item", None)
        speaker = _SPEAKER_LABELS.get(getattr(item, "role", None))
        if speaker is None:
            return
        logger.info("[%s] %s: %s", user_id, speaker, _message_text(item))

    await session.start(
        agent=VoiceAssistant(user_id=user_id, past_memory=past_memory),
        room=ctx.room,
    )


if __name__ == "__main__":
    cli.run_app(server)