# VOICE_TO_VOICE_AI / agent.py
# USER
# Initial clean voice agent - no venv or git history
# 2651a17
#agent.py
import asyncio
import logging
import os

from dotenv import load_dotenv
from mem0 import Memory
from mem0.configs.base import MemoryConfig
from mem0.vector_stores.configs import VectorStoreConfig
from livekit.agents import (
    Agent,
    AgentServer,
    AgentSession,
    JobContext,
    RunContext,
    function_tool,
    cli,
    WorkerOptions,
)
from livekit.plugins import openai, silero
from livekit.plugins import noise_cancellation
# Read settings from a .env file (python-dotenv) before anything else runs.
load_dotenv()

# Logger shared by every function in this module.
logger = logging.getLogger("voice-agent")

# Module-wide mem0 store using the Chroma provider, persisted under data/mem0.
# uses local storage by default; swap for mem0 cloud later
memory = Memory(
    config=MemoryConfig(
        vector_store=VectorStoreConfig(
            provider="chroma",
            config={"path": "data/mem0"},
        ),
    ),
)
def get_memory(user_id: str) -> str:
    """Return stored memories for ``user_id`` as a dash-bulleted text block.

    Runs a fixed "user preferences and history" search (at most 10 hits)
    against the module-level ``memory`` store. The search response may be
    either a dict carrying a ``"results"`` list or the list itself; both
    shapes are accepted. Hits that are not dicts or lack a ``"memory"`` key
    are skipped.

    Returns:
        One ``- <memory>`` line per usable hit joined with newlines, or an
        empty string when nothing was found.
    """
    response = memory.search(
        query="user preferences and history",
        user_id=user_id,
        limit=10,
    )

    if isinstance(response, dict):
        hits = response.get("results", [])
    else:
        hits = response

    if not hits:
        return ""

    bullet_lines = []
    for hit in hits:
        if isinstance(hit, dict) and "memory" in hit:
            bullet_lines.append(f"- {hit['memory']}")
    return "\n".join(bullet_lines)
def save_memory(user_id: str, content: str):
    """Add ``content`` to the module-level ``memory`` store under ``user_id``.

    Whatever ``memory.add`` returns is discarded; this function returns
    ``None``.
    """
    memory.add(content, user_id=user_id)
# Usage examples (console mode; adjust the device indexes to your machine):
#   ./.venv311/bin/python agent.py console --input-device 0 --output-device 1
#   python agent.py console --input-device 0 --output-device 1
# NOTE(review): the docstring below is presumably what function_tool exposes
# to the LLM as the tool description, so its wording is left untouched and
# maintainer notes live in comments instead — confirm against the LiveKit docs.
@function_tool
async def remember_this(context: RunContext, fact: str):
    """Save an important fact or user preference to long-term memory."""
    # The session's userdata dict carries the id set up in the entrypoint;
    # fall back to a shared bucket when it is missing.
    user_id = context.userdata.get("user_id", "default_user")
    # save_memory is synchronous; calling it directly here would stall the
    # event loop (and with it the audio pipeline) until the store returns,
    # so it runs in a worker thread instead.
    await asyncio.to_thread(save_memory, user_id, fact)
    return {"status": "saved", "fact": fact}
# NOTE(review): the docstring below is presumably what function_tool exposes
# to the LLM as the tool description, so its wording is left untouched and
# maintainer notes live in comments instead — confirm against the LiveKit docs.
@function_tool
async def recall_memory(context: RunContext, query: str):
    """Search long-term memory for something the user mentioned before."""
    user_id = context.userdata.get("user_id", "default_user")
    # memory.search is synchronous; run it in a worker thread so the event
    # loop is not blocked while the store is queried.
    search_response = await asyncio.to_thread(
        memory.search, query=query, user_id=user_id, limit=5
    )
    # The response may be a dict wrapping a "results" list or the list itself.
    if isinstance(search_response, dict):
        results = search_response.get("results", [])
    else:
        results = search_response
    # Keep only well-formed hits. "found" is derived from this filtered list
    # so the tool never reports found=True alongside an empty result list.
    memories = [
        hit["memory"]
        for hit in results or []
        if isinstance(hit, dict) and "memory" in hit
    ]
    return {"found": bool(memories), "results": memories}
def _history_items(history):
    """Return the conversation items of ``history`` as a list."""
    # NOTE(review): LiveKit Agents 1.x appears to expose the history as
    # `items`; `messages` is kept only as a fallback for other versions —
    # confirm against the installed release.
    items = getattr(history, "items", None)
    if items is None:
        items = getattr(history, "messages", [])
    if callable(items):
        items = items()
    return list(items or [])


def _item_text(item):
    """Return the plain text of one history item, or "" when it has none."""
    text = getattr(item, "text_content", None)
    if text is None:
        content = getattr(item, "content", None)
        if isinstance(content, str):
            text = content
        elif isinstance(content, (list, tuple)):
            # Content may be a list of parts; keep only the string parts.
            text = " ".join(part for part in content if isinstance(part, str))
    return text or ""


class VoiceAssistant(Agent):
    """Conversational voice agent that persists a short recap per user.

    The system prompt optionally embeds previously stored memories, and the
    agent is given the ``remember_this`` and ``recall_memory`` tools.
    """

    def __init__(self, user_id: str, past_memory: str) -> None:
        """Build the prompt and register the memory tools.

        Args:
            user_id: Key under which memories are stored on exit.
            past_memory: Pre-rendered memory text to embed in the prompt;
                an empty string omits the memory section entirely.
        """
        memory_block = (
            f"\n\nWhat you already know about this user:\n{past_memory}"
            if past_memory
            else ""
        )
        instructions = (
            "\n"
            "You are a friendly, intelligent voice assistant.\n"
            "Keep responses short and conversational — 1 to 3 sentences max.\n"
            "Never use markdown, bullet points, emojis, or asterisks.\n"
            "Speak naturally as if on a phone call.\n"
            "your name is benita.\n"
            "If the user shares something personal or useful to remember, call remember_this.\n"
            "If the user references something from before, call recall_memory.\n"
            f"{memory_block}\n"
        )
        super().__init__(
            instructions=instructions,
            tools=[remember_this, recall_memory],
        )
        self.user_id = user_id

    async def on_enter(self):
        """Speak the greeting through the session."""
        # Greeting text fixed: the original "am benita How can I help" was
        # missing its subject and sentence break, which is spoken verbatim.
        await self.session.say("Hey there! I'm Benita. How can I help you today?")

    async def on_exit(self):
        """Store a short recap of the last turns for this user.

        Only items that have both a role and text are considered; the last
        six of them are joined (each clipped to 80 characters) and saved.
        Nothing is saved when the conversation produced no text.
        """
        turns = []
        for item in _history_items(self.session.history):
            role = getattr(item, "role", None)
            text = _item_text(item)
            if role and text:
                turns.append(f"{role}: {text[:80]}")

        if not turns:
            # Avoid writing an empty "Recent conversation: " memory.
            return

        summary = " | ".join(turns[-6:])  # last 6 turns
        # save_memory is synchronous; keep it off the event loop.
        await asyncio.to_thread(
            save_memory, self.user_id, f"Recent conversation: {summary}"
        )
        logger.info(f"Saved session memory for user {self.user_id}")
server = AgentServer()


@server.rtc_session(agent_name="benita")
async def entrypoint(ctx: JobContext):
    """Run one voice session: connect, load memories, start the assistant.

    For a fake (console) job the user id is fixed to ``"console"``;
    otherwise the first participant's identity is used, falling back to
    ``"default_user"`` when it is empty.
    """
    await ctx.connect()

    if ctx.is_fake_job():
        user_id = "console"
        logger.info("Console fake job detected; skipping wait_for_participant")
    else:
        participant = await ctx.wait_for_participant()
        user_id = participant.identity or "default_user"

    logger.info(f"Session started | room={ctx.room.name} | user={user_id}")

    # get_memory is synchronous; run it in a worker thread so the event loop
    # is not blocked while the store is searched.
    past_memory = await asyncio.to_thread(get_memory, user_id)

    session = AgentSession(
        userdata={"user_id": user_id},
        # VAD detects when user starts/stops speaking
        vad=silero.VAD.load(),
        # STT
        stt=openai.STT(model="whisper-1", language="en"),
        # LLM
        llm=openai.LLM(model="gpt-4o-mini"),
        # TTS
        tts=openai.TTS(model="tts-1", voice="nova"),
        turn_detection=None,
        # Interruption settings
        allow_interruptions=True,
        min_endpointing_delay=0.4,
    )

    # NOTE(review): the original subscribed to "user_speech_committed" and
    # "agent_speech_committed", which are 0.x event names that an AgentSession
    # does not appear to emit, so those handlers would never run. Per the
    # migration guide the 1.x equivalent is "conversation_item_added" —
    # confirm against the installed release.
    @session.on("conversation_item_added")
    def on_conversation_item(event):
        item = getattr(event, "item", None)
        role = getattr(item, "role", None)
        text = getattr(item, "text_content", None)
        if not text:
            return
        if role == "user":
            logger.info(f"[{user_id}] User: {text}")
        elif role == "assistant":
            logger.info(f"[{user_id}] Agent: {text}")

    await session.start(
        agent=VoiceAssistant(user_id=user_id, past_memory=past_memory),
        room=ctx.room,
    )
# Script entry point: hand the module-level server to the LiveKit CLI runner.
if __name__ == "__main__":
    cli.run_app(server)