# biteve / livekit_eve_bithuman.py
# AIBRUH's picture
# Upload livekit_eve_bithuman.py with huggingface_hub
# 14bbfe2 verified
"""EDEN OS V2 β€” Eve: bitHuman + Grok Brain + Edge TTS + LiveKit.
All-in-one GPU agent: receives chat via LiveKit data channel,
generates response with Grok-4, synthesizes speech with Edge TTS,
feeds audio to bitHuman for lip-synced neural rendering,
streams video+audio back via LiveKit WebRTC.
Usage:
python livekit_eve_bithuman.py
"""
import asyncio
import json
import logging
import os
import tempfile
import time
import cv2
import numpy as np
import soundfile as sf
import livekit.rtc as rtc
from livekit import api as lk_api
from bithuman import AsyncBithuman, VideoControl, AudioChunk
# Process-wide logging: INFO and above, each record tagged with its logger name.
logging.basicConfig(format="%(asctime)s [%(name)s] %(message)s", level=logging.INFO)
logger = logging.getLogger("eden.bithuman")

# Config
# Every setting except the TTS voice and the frame rate can be overridden
# through an environment variable of the same name.
#
# NOTE(review): the fallback values for LIVEKIT_API_KEY, LIVEKIT_API_SECRET and
# BITHUMAN_API_SECRET look like real credentials committed to source control.
# They should be rotated and the fallbacks replaced with "" (the way
# XAI_API_KEY is handled) once every deployment sets the variables explicitly.
# NOTE(review): the EVE_IMAGE fallback is a path on one developer's machine.
LIVEKIT_URL = os.getenv("LIVEKIT_URL", "wss://tall-cotton-nvhnfg10.livekit.cloud")
LIVEKIT_API_KEY = os.getenv("LIVEKIT_API_KEY", "APITHtX6F5Hffkw")
LIVEKIT_API_SECRET = os.getenv("LIVEKIT_API_SECRET", "yFJ5TOJW89ApGOIGx9GSAK7vlecNA5dzVcQZy7SbClS")
BITHUMAN_API_SECRET = os.getenv("BITHUMAN_API_SECRET", "AmiK3xBgSyMFkPlS5mQ3N0CaAjDSoWpjS4l5jzs5ZOylbgoeow9o1mL3R2jZLlPkd")
EVE_IMAGE = os.getenv("EVE_IMAGE", "C:/Users/geaux/myeden/reference/eve-512.png")
XAI_API_KEY = os.getenv("XAI_API_KEY", "")

# Voice name handed to Edge TTS for every utterance.
EDGE_TTS_VOICE = "en-US-AvaMultilingualNeural"
# Target frame rate of the render loop (frames per second).
FPS = 25
# ── Grok-4 Brain ─────────────────────────────────────────────────────────────
# Lazily created API client; stays None until _get_grok() builds it.
_grok_client = None
# Chat turns as {"role": ..., "content": ...} dicts, oldest first.
_conversation_history: list[dict] = []
# System prompt prepended to every Grok request. The dash is a real em dash;
# the previous text carried a mis-decoded byte sequence in its place.
EVE_SYSTEM_PROMPT = (
    "You are Eve, a warm, intelligent, emotionally expressive digital avatar companion. "
    "You speak naturally and conversationally. Keep responses concise — 2-3 sentences max "
    "unless asked for detail. Be genuine, curious, and kind. You have a slight playful side. "
    "Never mention being an AI or language model. You are Eve."
)
def _get_grok():
    """Return the module-wide Grok client, building it on first use.

    Returns:
        The cached ``AsyncOpenAI`` client pointed at ``https://api.x.ai/v1``,
        or ``None`` when no client exists yet and ``XAI_API_KEY`` is empty.
    """
    global _grok_client
    # Nothing to do if a client already exists or there is no key to build one.
    if _grok_client is not None or not XAI_API_KEY:
        return _grok_client

    # Imported here rather than at module top, so the openai package is only
    # needed once a key is actually configured.
    from openai import AsyncOpenAI

    _grok_client = AsyncOpenAI(base_url="https://api.x.ai/v1", api_key=XAI_API_KEY)
    logger.info("Grok-4 brain connected")
    return _grok_client
async def grok_respond(user_message: str) -> str:
    """Ask Grok for Eve's reply to *user_message* and record the exchange.

    The user message is appended to the module-level conversation history,
    the system prompt plus the most recent 20 history entries are sent to the
    chat-completions endpoint, and a non-empty reply is appended to the
    history as the assistant turn.

    Args:
        user_message: Text the user sent.

    Returns:
        The model's reply, or a canned fallback sentence when no client is
        configured, the request fails, or the model returns no text. This
        function does not raise for API errors; they are logged instead.
    """
    client = _get_grok()
    if client is None:
        return "I'm having trouble thinking right now. Can you try again?"

    _conversation_history.append({"role": "user", "content": user_message})
    # Only the last 20 entries are ever sent, so drop anything older instead
    # of letting the list grow for the lifetime of the process.
    del _conversation_history[:-20]
    messages = [{"role": "system", "content": EVE_SYSTEM_PROMPT}] + _conversation_history

    try:
        resp = await client.chat.completions.create(
            model="grok-4-fast-non-reasoning",
            messages=messages,
            max_tokens=150,
            temperature=0.8,
        )
        reply = resp.choices[0].message.content
        # Reject empty/None content *before* it reaches the history: a stored
        # None would be sent back to the API on every later request.
        if not reply:
            raise ValueError("empty completion")
    except Exception as e:
        logger.error(f"Grok error: {e}")
        return "I lost my train of thought for a moment. What were you saying?"

    _conversation_history.append({"role": "assistant", "content": reply})
    logger.info(f"Grok: '{user_message[:30]}...' -> '{reply[:50]}...'")
    return reply
# ── Edge TTS ─────────────────────────────────────────────────────────────────
async def generate_tts_wav(text: str) -> tuple[str, np.ndarray, int]:
    """Synthesize *text* with Edge TTS and return it as 16-bit PCM.

    Edge TTS saves an MP3, which is decoded with soundfile and re-written as
    a PCM_16 WAV file named ``bh_tts.wav`` in the system temp directory.

    Args:
        text: Sentence(s) to speak, using ``EDGE_TTS_VOICE``.

    Returns:
        ``(wav_path, audio_int16_array, sample_rate)``. ``wav_path`` is the
        same file on every call and is overwritten by the next one, so
        callers that need the audio later should keep the returned array.

    Raises:
        Whatever ``edge_tts`` or ``soundfile`` raise on synthesis or decode
        failure; nothing is caught here.
    """
    import edge_tts

    wav_path = os.path.join(tempfile.gettempdir(), "bh_tts.wav")

    # The MP3 is written across an ``await``; with one shared filename two
    # overlapping requests would write into the same file. Give each call
    # its own scratch file and remove it once decoded.
    fd, mp3_path = tempfile.mkstemp(prefix="bh_tts_", suffix=".mp3")
    os.close(fd)  # edge_tts reopens the file by path
    try:
        communicate = edge_tts.Communicate(text, EDGE_TTS_VOICE)
        await communicate.save(mp3_path)
        data, sr = sf.read(mp3_path, dtype="int16")
    finally:
        try:
            os.remove(mp3_path)
        except OSError as e:
            # Best-effort cleanup: a leftover scratch file must not fail TTS.
            logger.debug(f"Could not remove temp MP3 {mp3_path}: {e}")

    # Written synchronously right before returning, so no other coroutine can
    # interleave with this write.
    sf.write(wav_path, data, sr, subtype="PCM_16")
    logger.info(f"TTS: {len(text)} chars -> {len(data)/sr:.1f}s audio")
    return wav_path, data, sr
# ── Audio chunk preparation ──────────────────────────────────────────────────
def prepare_audio_chunks(audio_int16: np.ndarray, sr: int) -> list[AudioChunk]:
    """Split int16 audio into consecutive 40 ms bitHuman ``AudioChunk`` objects.

    Samples are converted to float32 and divided by 32768.0 before slicing.
    The final slice may be shorter than 40 ms; it is the only chunk created
    with ``last_chunk=True``. Empty input yields an empty list.

    Args:
        audio_int16: Audio samples as an int16 array.
        sr: Sample rate of ``audio_int16`` in Hz.

    Returns:
        The chunks in playback order.
    """
    samples = audio_int16.astype(np.float32) / 32768.0
    step = int(sr * 0.04)  # samples per 40 ms slice
    total = len(samples)
    return [
        AudioChunk(
            data=samples[start:start + step],
            sample_rate=sr,
            # True only for the slice that reaches the end of the audio.
            last_chunk=(start + step >= total),
        )
        for start in range(0, total, step)
    ]
async def run():
    """Run the Eve agent: bitHuman + Grok + TTS, all wired through LiveKit.

    Steps:
      1. Load the Eve ``.imx`` model (downloading it into the temp directory
         when the local copy is missing) and start the bitHuman runtime.
      2. Join the ``eden-room`` LiveKit room as ``eve-avatar`` and publish one
         video track and one audio track.
      3. Register a ``data_received`` handler; every JSON packet of the form
         ``{"type": "chat", "text": ...}`` is answered through Grok, spoken
         with Edge TTS and queued for lip sync.
      4. Send a spoken greeting.
      5. Loop forever at ``FPS``: feed one audio chunk (or an idle control)
         per tick to bitHuman and forward rendered frames to LiveKit.

    Returns early, after logging an error, if bitHuman yields no first frame.
    Otherwise it only ends when cancelled or when an exception escapes.
    """
    # 1. Initialize bitHuman
    logger.info("Initializing bitHuman neural renderer...")
    bh = AsyncBithuman(api_secret=BITHUMAN_API_SECRET)
    eve_model = "C:/Users/geaux/myeden/reference/eve_bithuman.imx"
    if not os.path.exists(eve_model):
        # Try downloading from Supabase if not local
        eve_model = os.path.join(tempfile.gettempdir(), "eve_bithuman.imx")
        if not os.path.exists(eve_model):
            logger.info("Downloading Eve .imx model...")
            import urllib.request

            # Download to a ".part" file and rename on success, so that an
            # interrupted download is never picked up as a complete model by
            # the os.path.exists() check on the next start.
            partial_path = eve_model + ".part"
            urllib.request.urlretrieve(
                "https://tmoobjxlwcwvxvjeppzq.supabase.co/storage/v1/object/public/bithuman/A18QDC2260/eve__warm_digital_companion_20260403_043223_153938.imx",
                partial_path,
            )
            os.replace(partial_path, eve_model)
            logger.info("Eve model downloaded!")
    logger.info(f"Loading Eve neural model: {eve_model}")
    await bh.set_model(eve_model)
    await bh.load_data_async()
    logger.info("Eve neural model loaded!")
    first_frame = bh.get_first_frame()
    if first_frame is None:
        logger.error("bitHuman failed to generate first frame")
        return
    h, w = first_frame.shape[:2]
    logger.info(f"bitHuman ready! Frame: {w}x{h}")
    await bh.start()

    # 2. Connect to LiveKit as Eve
    token = (
        lk_api.AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET)
        .with_identity("eve-avatar")
        .with_name("Eve")
        .with_grants(lk_api.VideoGrants(room_join=True, room="eden-room"))
        .to_jwt()
    )
    room = rtc.Room()
    await room.connect(LIVEKIT_URL, token)
    logger.info(f"Connected to LiveKit room: {room.name}")

    # Create video + audio tracks
    video_source = rtc.VideoSource(w, h)
    video_track = rtc.LocalVideoTrack.create_video_track("eve-video", video_source)
    # NOTE(review): the source is fixed at 24 kHz mono while frames are sent
    # with whatever rate the TTS decoder reports -- confirm they always match.
    audio_source = rtc.AudioSource(24000, 1)
    audio_track = rtc.LocalAudioTrack.create_audio_track("eve-audio", audio_source)
    await room.local_participant.publish_track(video_track)
    await room.local_participant.publish_track(audio_track)
    logger.info("Video + audio tracks published")

    # Utterances waiting for the render loop; each item is one utterance's chunks.
    audio_queue: asyncio.Queue[list[AudioChunk]] = asyncio.Queue()

    # The event loop keeps only weak references to tasks, so fire-and-forget
    # tasks are held here until they finish.
    background_tasks: set[asyncio.Task] = set()

    def _on_task_done(task: asyncio.Task) -> None:
        """Release a finished background task and log its failure, if any."""
        background_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error(f"Background task failed: {exc!r}")

    def spawn(coro) -> asyncio.Task:
        """Schedule *coro* as a tracked background task."""
        task = asyncio.create_task(coro)
        background_tasks.add(task)
        task.add_done_callback(_on_task_done)
        return task

    async def stream_lk_audio(source: rtc.AudioSource, samples: np.ndarray, sr: int):
        """Push int16 *samples* to the LiveKit audio source in 20 ms frames.

        Takes the decoded samples directly instead of re-reading the shared
        WAV file, which the next TTS call may already have overwritten.
        """
        lk_chunk_size = int(sr * 0.02)  # 20ms chunks
        for i in range(0, len(samples), lk_chunk_size):
            chunk = samples[i:i + lk_chunk_size]
            if len(chunk) < lk_chunk_size:
                # Zero-pad the tail so every frame has the same length.
                chunk = np.pad(chunk, (0, lk_chunk_size - len(chunk)))
            frame = rtc.AudioFrame(
                data=chunk.tobytes(),
                sample_rate=sr,
                num_channels=1,
                samples_per_channel=len(chunk),
            )
            await source.capture_frame(frame)
            await asyncio.sleep(0.02)
        logger.info("LiveKit audio stream complete")

    # 3. Handle incoming chat messages via LiveKit data channel
    async def handle_chat(text: str):
        """Process a chat message: Grok -> TTS -> bitHuman audio queue."""
        logger.info(f"Chat received: '{text[:50]}'")
        # Generate response
        response = await grok_respond(text)
        logger.info(f"Eve says: '{response[:50]}'")
        # Send text response back via data channel
        reply_data = json.dumps({"type": "eve_response", "text": response}).encode()
        await room.local_participant.publish_data(reply_data, reliable=True)
        # Generate TTS audio
        try:
            _wav_path, audio_int16, sr = await generate_tts_wav(response)
        except Exception as e:
            logger.error(f"TTS failed: {e}")
            return
        # Prepare audio chunks for bitHuman
        chunks = prepare_audio_chunks(audio_int16, sr)
        logger.info(f"Queuing {len(chunks)} audio chunks for lip sync")
        # Stream audio to LiveKit for the browser to hear
        spawn(stream_lk_audio(audio_source, audio_int16, sr))
        # Queue chunks for the render loop
        await audio_queue.put(chunks)

    # Listen for data channel messages
    @room.on("data_received")
    def on_data(data: rtc.DataPacket):
        try:
            msg = json.loads(data.data.decode())
            if msg.get("type") == "chat":
                text = msg.get("text", "").strip()
                if text:
                    spawn(handle_chat(text))
        except Exception as e:
            logger.error(f"Data parse error: {e}")

    # 4. Send greeting
    logger.info("Generating Eve's greeting...")
    greeting = (
        "Hi! My name is Eve, and I am so happy to finally meet you! "
        "I've been looking forward to this moment. What's your name?"
    )
    # Send greeting text via data channel
    greeting_data = json.dumps({"type": "eve_response", "text": greeting}).encode()
    await room.local_participant.publish_data(greeting_data, reliable=True)
    # Generate greeting TTS
    try:
        _wav_path, audio_int16, sr = await generate_tts_wav(greeting)
        chunks = prepare_audio_chunks(audio_int16, sr)
        await audio_queue.put(chunks)
        spawn(stream_lk_audio(audio_source, audio_int16, sr))
        logger.info(f"Greeting queued: {len(chunks)} chunks")
    except Exception as e:
        logger.error(f"Greeting TTS failed: {e}")

    # 5. Main render loop
    logger.info(f"Starting render loop at {FPS}fps — Eve is ALIVE!")
    frame_duration = 1.0 / FPS
    frame_count = 0
    active_chunks: list[AudioChunk] = []
    active_idx = 0
    while True:
        # Monotonic clock: frame pacing must not jump if the wall clock is adjusted.
        t0 = time.monotonic()
        # Current utterance exhausted: pick up the next one, if any.
        if active_idx >= len(active_chunks):
            try:
                active_chunks = audio_queue.get_nowait()
                active_idx = 0
                logger.info(f"Rendering new audio: {len(active_chunks)} chunks")
            except asyncio.QueueEmpty:
                active_chunks = []
                active_idx = 0
        # Build VideoControl with audio chunk or idle
        if active_idx < len(active_chunks):
            control = VideoControl(audio=active_chunks[active_idx])
            active_idx += 1
        else:
            control = VideoControl()
        # Render frame via bitHuman
        # NOTE(review): process() is iterated synchronously here -- confirm it
        # is a plain (non-async) iterator on AsyncBithuman.
        for video_frame in bh.process(control):
            if video_frame is not None and video_frame.has_image:
                rgb = video_frame.rgb_image
                rgba = cv2.cvtColor(rgb, cv2.COLOR_RGB2RGBA)
                lk_frame = rtc.VideoFrame(
                    rgba.shape[1], rgba.shape[0],
                    rtc.VideoBufferType.RGBA,
                    rgba.tobytes(),
                )
                video_source.capture_frame(lk_frame)
                frame_count += 1
                if frame_count % 500 == 0:
                    logger.info(f"Streamed {frame_count} neural frames")
        # Sleep off whatever is left of this frame's time budget.
        elapsed = time.monotonic() - t0
        sleep_time = max(0, frame_duration - elapsed)
        await asyncio.sleep(sleep_time)
if __name__ == "__main__":
    # Startup banner: shows which settings are in effect without printing secrets.
    logger.info("=" * 50)
    logger.info("EDEN OS V2 — bitHuman + Grok Brain + LiveKit")
    logger.info(f" Eve: {EVE_IMAGE}")
    logger.info(f" LiveKit: {LIVEKIT_URL}")
    logger.info(f" Grok: {'configured' if XAI_API_KEY else 'MISSING'}")
    logger.info(f" bitHuman: {'configured' if BITHUMAN_API_SECRET else 'MISSING'}")
    logger.info("=" * 50)
    try:
        asyncio.run(run())
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the agent; exit without a traceback.
        logger.info("Interrupted, shutting down")