# agent.py
# Captured from a Spaces file view (status: Runtime error; file size: 5,116 bytes; revision 2651a17).
import asyncio
import logging
import os

from dotenv import load_dotenv
from mem0 import Memory
from mem0.configs.base import MemoryConfig
from mem0.vector_stores.configs import VectorStoreConfig
from livekit.agents import (
    Agent,
    AgentServer,
    AgentSession,
    JobContext,
    RunContext,
    function_tool,
    cli,
    WorkerOptions,
)
from livekit.plugins import openai, silero
from livekit.plugins import noise_cancellation
# Read settings from a local .env file before any client below is constructed.
load_dotenv()

logger = logging.getLogger("voice-agent")

# Process-wide mem0 store shared by every helper and tool in this module.
# It is backed by a Chroma vector store configured with the path data/mem0.
# uses local storage by default; swap for mem0 cloud later
memory = Memory(
    config=MemoryConfig(
        vector_store=VectorStoreConfig(
            provider="chroma",
            config={"path": "data/mem0"},
        ),
    ),
)
def get_memory(user_id: str) -> str:
    """Return the stored memories for *user_id* as a dash-prefixed text list.

    Runs a fixed "user preferences and history" search (at most 10 hits)
    against the module-level ``memory`` store. The search response may be
    either a dict carrying a ``"results"`` list or the list itself; both
    shapes are accepted. Returns an empty string when nothing is found.
    """
    response = memory.search(
        query="user preferences and history",
        user_id=user_id,
        limit=10,
    )

    if isinstance(response, dict):
        hits = response.get("results", [])
    else:
        hits = response

    if not hits:
        return ""

    lines = []
    for hit in hits:
        # Ignore entries that are not dicts or carry no "memory" text.
        if isinstance(hit, dict) and "memory" in hit:
            lines.append(f"- {hit['memory']}")
    return "\n".join(lines)
def save_memory(user_id: str, content: str):
    """Add *content* to the module-level ``memory`` store under *user_id*."""
    memory.add(content, user_id=user_id)
# Run locally in console mode (adjust the device indices to your machine):
#   ./.venv311/bin/python agent.py console --input-device 0 --output-device 1
#   python agent.py console --input-device 0 --output-device 1
@function_tool
async def remember_this(context: RunContext, fact: str):
    """Save an important fact or user preference to long-term memory."""
    # NOTE(review): the docstring above is kept verbatim on purpose; the
    # function_tool decorator presumably exposes it to the LLM as the tool
    # description, so rewording it would change runtime behavior.
    #
    # Returns a small status dict echoing the stored fact.

    # userdata is the dict handed to AgentSession; fall back to a shared
    # bucket when no user id was recorded.
    user_id = context.userdata.get("user_id", "default_user")

    # save_memory is a plain synchronous call; run it in a worker thread so
    # the event loop driving the live audio session is not stalled.
    await asyncio.to_thread(save_memory, user_id, fact)
    return {"status": "saved", "fact": fact}
@function_tool
async def recall_memory(context: RunContext, query: str):
    """Search long-term memory for something the user mentioned before."""
    # NOTE(review): the docstring above is kept verbatim on purpose; the
    # function_tool decorator presumably exposes it to the LLM as the tool
    # description, so rewording it would change runtime behavior.
    #
    # Returns {"found": bool, "results": [memory text, ...]} with at most
    # five entries; "found" is True only when at least one usable memory
    # text was extracted.
    user_id = context.userdata.get("user_id", "default_user")

    # memory.search is synchronous; run it in a worker thread so the event
    # loop driving the live audio session is not stalled.
    response = await asyncio.to_thread(
        memory.search, query=query, user_id=user_id, limit=5
    )

    # The response is either a dict with a "results" list or the list itself.
    hits = response.get("results", []) if isinstance(response, dict) else response

    memories = [
        hit["memory"]
        for hit in hits or []
        if isinstance(hit, dict) and "memory" in hit
    ]
    # Derive "found" from what is actually returned, so the caller never
    # sees found=True next to an empty result list.
    return {"found": bool(memories), "results": memories}
class VoiceAssistant(Agent):
    """Voice persona "benita" wired to the long-term memory tools.

    The agent is created with a system prompt that optionally embeds what is
    already known about the user, greets the user when it becomes active, and
    stores a short summary of the latest messages when it exits.
    """

    def __init__(self, user_id: str, past_memory: str) -> None:
        """Build the prompt and register the memory tools.

        Args:
            user_id: Key under which this user's memories are stored.
            past_memory: Pre-formatted memory text appended to the prompt;
                an empty string leaves the section out.
        """
        memory_block = (
            f"\n\nWhat you already know about this user:\n{past_memory}"
            if past_memory
            else ""
        )
        # Assembled line by line so the prompt text never depends on how
        # this method happens to be indented in the source file.
        instructions = (
            "\n"
            "You are a friendly, intelligent voice assistant.\n"
            "Keep responses short and conversational — 1 to 3 sentences max.\n"
            "Never use markdown, bullet points, emojis, or asterisks.\n"
            "Speak naturally as if on a phone call.\n"
            "your name is benita.\n"
            "If the user shares something personal or useful to remember, call remember_this.\n"
            "If the user references something from before, call recall_memory.\n"
            f"{memory_block}\n"
        )
        super().__init__(
            instructions=instructions,
            tools=[remember_this, recall_memory],
        )
        self.user_id = user_id

    async def on_enter(self):
        """Greet the user as soon as the agent becomes active."""
        await self.session.say("Hey there! I'm Benita. How can I help you today?")

    async def on_exit(self):
        """Store a compact summary of the last few messages for this user.

        Nothing is saved when the history holds no message with text.
        """
        # NOTE(review): per the LiveKit Agents 1.x ChatContext API the
        # history exposes ``items``, which may include non-message entries
        # such as tool calls; only entries with a role are treated as turns.
        history_items = getattr(self.session.history, "items", None) or []
        messages = [item for item in history_items if hasattr(item, "role")]

        turns = []
        for message in messages[-6:]:  # last 6 turns
            text = self._message_text(message)
            if text:
                turns.append(f"{message.role}: {text[:80]}")
        if not turns:
            return

        summary = " | ".join(turns)
        # save_memory is synchronous; keep it off the event loop.
        await asyncio.to_thread(
            save_memory, self.user_id, f"Recent conversation: {summary}"
        )
        logger.info("Saved session memory for user %s", self.user_id)

    @staticmethod
    def _message_text(message) -> str:
        """Return the plain text of a chat message, or "" when it has none.

        Accepts a ``text_content`` string, a ``content`` string, or a
        ``content`` list whose string parts are joined with newlines.
        """
        text = getattr(message, "text_content", None)
        if isinstance(text, str):
            return text
        content = getattr(message, "content", None)
        if isinstance(content, str):
            return content
        if isinstance(content, (list, tuple)):
            return "\n".join(part for part in content if isinstance(part, str))
        return ""
# Server object the entrypoint is registered on via @server.rtc_session and that cli.run_app() receives.
server = AgentServer()
@server.rtc_session(agent_name="benita")
async def entrypoint(ctx: JobContext):
    """Run one voice session: connect, identify the user, start the agent.

    The user id is the participant identity ("console" for a fake console
    job, "default_user" when the identity is empty). Stored memories for
    that user are loaded up front and passed to the assistant's prompt.
    """
    await ctx.connect()

    if ctx.is_fake_job():
        user_id = "console"
        logger.info("Console fake job detected; skipping wait_for_participant")
    else:
        participant = await ctx.wait_for_participant()
        user_id = participant.identity or "default_user"

    logger.info("Session started | room=%s | user=%s", ctx.room.name, user_id)

    # get_memory is synchronous; run it in a worker thread so the event loop
    # stays responsive while the store is queried.
    past_memory = await asyncio.to_thread(get_memory, user_id)

    session = AgentSession(
        userdata={"user_id": user_id},
        # VAD detects when user starts/stops speaking
        vad=silero.VAD.load(),
        # STT
        stt=openai.STT(model="whisper-1", language="en"),
        # LLM
        llm=openai.LLM(model="gpt-4o-mini"),
        # TTS
        tts=openai.TTS(model="tts-1", voice="nova"),
        turn_detection=None,
        # Interruption settings
        allow_interruptions=True,
        min_endpointing_delay=0.4,
    )

    # NOTE(review): per the LiveKit Agents session-events reference,
    # AgentSession reports committed user and agent turns through
    # "conversation_item_added"; the older "*_speech_committed" names are
    # not emitted by it, so handlers bound to them would never run.
    @session.on("conversation_item_added")
    def on_conversation_item(event):
        item = event.item
        text = getattr(item, "text_content", None)
        if not text:
            return
        role = getattr(item, "role", None)
        if role == "user":
            logger.info("[%s] User: %s", user_id, text)
        elif role == "assistant":
            logger.info("[%s] Agent: %s", user_id, text)

    await session.start(
        agent=VoiceAssistant(user_id=user_id, past_memory=past_memory),
        room=ctx.room,
    )
if __name__ == "__main__":
    # Hand control to the livekit.agents CLI, which runs the server defined above.
    cli.run_app(server)