# NOTE: the original capture of this file began with paste residue
# ("Spaces:" plus two "Runtime error" lines). Converted to a comment so the
# module parses as Python; the runtime errors are addressed in the code below.
| #agent.py | |
| import logging | |
| import os | |
| from dotenv import load_dotenv | |
| from mem0 import Memory | |
| from mem0.configs.base import MemoryConfig | |
| from mem0.vector_stores.configs import VectorStoreConfig | |
| from livekit.agents import ( | |
| Agent, | |
| AgentServer, | |
| AgentSession, | |
| JobContext, | |
| RunContext, | |
| function_tool, | |
| cli, | |
| WorkerOptions, | |
| ) | |
| from livekit.plugins import openai, silero | |
| from livekit.plugins import noise_cancellation | |
# Pull environment variables (OpenAI / LiveKit keys, etc.) from a local .env.
load_dotenv()

logger = logging.getLogger("voice-agent")

# Long-term memory backed by a local Chroma vector store on disk.
# Uses local storage by default; swap for mem0 cloud later.
_vector_store = VectorStoreConfig(provider='chroma', config={'path': 'data/mem0'})
memory = Memory(config=MemoryConfig(vector_store=_vector_store))
def get_memory(user_id: str) -> str:
    """Return stored memories for *user_id* as a newline-separated bullet list.

    Queries the mem0 store for general user context; returns "" when nothing
    usable is stored yet.
    """
    response = memory.search(query="user preferences and history", user_id=user_id, limit=10)
    # mem0 may return either a bare list or a {"results": [...]} envelope.
    if isinstance(response, dict):
        hits = response.get("results", [])
    else:
        hits = response
    if not hits:
        return ""
    lines = []
    for hit in hits:
        # Skip malformed entries rather than crashing on a missing key.
        if isinstance(hit, dict) and "memory" in hit:
            lines.append(f"- {hit['memory']}")
    return "\n".join(lines)
def save_memory(user_id: str, content: str) -> None:
    """Persist *content* as a long-term memory entry keyed by *user_id*."""
    memory.add(content, user_id=user_id)
| #./.venv311/bin/python agent.py console --input-device 0 --output-device 1 | |
| #python agent.py console --input-device 0 --output-device 1 | |
@function_tool
async def remember_this(context: RunContext, fact: str):
    """Save an important fact or user preference to long-term memory.

    BUG FIX: the @function_tool decorator was missing. `function_tool` is
    imported at the top of the file but was never applied, and
    Agent(tools=[...]) requires decorated tools — passing a bare coroutine
    fails at runtime.
    """
    # Fall back to a default identity when no user id was attached to the session.
    user_id = context.userdata.get("user_id", "default_user")
    save_memory(user_id, fact)
    return {"status": "saved", "fact": fact}
@function_tool
async def recall_memory(context: RunContext, query: str):
    """Search long-term memory for something the user mentioned before.

    BUG FIX: the @function_tool decorator was missing (same defect as
    remember_this) — Agent(tools=[...]) requires decorated tools.
    """
    user_id = context.userdata.get("user_id", "default_user")
    search_response = memory.search(query=query, user_id=user_id, limit=5)
    # mem0 may return either a bare list or a {"results": [...]} envelope.
    results = search_response.get("results", []) if isinstance(search_response, dict) else search_response
    if not results:
        return {"found": False, "results": []}
    return {"found": True, "results": [r["memory"] for r in results if isinstance(r, dict) and "memory" in r]}
class VoiceAssistant(Agent):
    """Voice agent persona ("benita") with mem0-backed long-term memory.

    Previously stored facts about the user are injected into the system
    instructions, and the remember/recall tools are exposed to the LLM.
    """

    def __init__(self, user_id: str, past_memory: str) -> None:
        # Only mention prior knowledge in the prompt when we actually have some.
        memory_block = (
            f"\n\nWhat you already know about this user:\n{past_memory}"
            if past_memory else ""
        )
        super().__init__(
            instructions=f"""
You are a friendly, intelligent voice assistant.
Keep responses short and conversational — 1 to 3 sentences max.
Never use markdown, bullet points, emojis, or asterisks.
Speak naturally as if on a phone call.
your name is benita.
If the user shares something personal or useful to remember, call remember_this.
If the user references something from before, call recall_memory.
{memory_block}
""",
            tools=[remember_this, recall_memory],
        )
        # Retained so on_exit can persist the conversation summary under this key.
        self.user_id = user_id

    async def on_enter(self):
        # Lifecycle hook: fired when the agent joins the session; spoken greeting.
        await self.session.say("Hey there! am benita How can I help you today?")

    async def on_exit(self):
        # Lifecycle hook: persist a compact tail of the transcript to mem0.
        # NOTE(review): recent livekit-agents versions expose session history as
        # a ChatContext whose entries live under `.items`, not `.messages` —
        # confirm `.messages` exists in the pinned version, else this raises.
        messages = self.session.history.messages
        if messages:
            # NOTE(review): assumes each entry has str `.content`; in some
            # versions content can be a list of parts — confirm before slicing.
            summary = " | ".join([
                f"{m.role}: {m.content[:80]}"
                for m in messages[-6:]  # last 6 turns
                if hasattr(m, "content") and m.content
            ])
            save_memory(self.user_id, f"Recent conversation: {summary}")
            logger.info(f"Saved session memory for user {self.user_id}")
server = AgentServer()
# NOTE(review): `entrypoint` below is never registered on this server (no
# decorator / registration call), so as written the worker has no session
# handler — confirm how this version of livekit-agents expects the entrypoint
# to be wired up.


async def entrypoint(ctx: JobContext):
    """Per-job entrypoint: resolve the user, load their memory, start a session."""
    await ctx.connect()
    # Console runs ("fake" jobs) have no remote participant to wait for.
    if ctx.is_fake_job():
        user_id = "console"
        logger.info("Console fake job detected; skipping wait_for_participant")
    else:
        participant = await ctx.wait_for_participant()
        # The participant's LiveKit identity doubles as the mem0 user key.
        user_id = participant.identity or "default_user"
    logger.info(f"Session started | room={ctx.room.name} | user={user_id}")
    # Pre-load what we already know so it can be injected into the prompt.
    past_memory = get_memory(user_id)
    session = AgentSession(
        # userdata is read by the remember_this / recall_memory tools.
        userdata={"user_id": user_id},
        # VAD detects when user starts/stops speaking
        vad=silero.VAD.load(),
        # STT
        stt=openai.STT(model="whisper-1", language='en'),
        # LLM
        llm=openai.LLM(model="gpt-4o-mini"),
        # TTS
        tts=openai.TTS(model="tts-1", voice="nova"),
        # No model-based turn detector; endpointing falls back to VAD timing.
        turn_detection=None,
        # Interruption settings
        allow_interruptions=True,
        min_endpointing_delay=0.4,
    )
    # NOTE(review): these two callbacks are defined but never attached to any
    # session event, so they never fire — wire them up (e.g. via session.on)
    # or drop them.
    def on_user_spoke(msg):
        logger.info(f"[{user_id}] User: {msg.content}")

    def on_agent_spoke(msg):
        logger.info(f"[{user_id}] Agent: {msg.content}")

    await session.start(
        agent=VoiceAssistant(user_id=user_id, past_memory=past_memory),
        room=ctx.room,
        # NOTE(review): `noise_cancellation` is imported at the top of the file
        # but never used — presumably meant to be passed via room input options.
    )
if __name__ == "__main__":
    # BUG FIX: the file previously ran `cli.run_app(server)` with an
    # AgentServer on which nothing was ever registered, so the worker had no
    # session handler and `entrypoint` was dead code. Hand the entrypoint to
    # the CLI explicitly via WorkerOptions (imported above but unused), the
    # documented livekit-agents pattern.
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))