"""WebSocket bridge between a Termux audio client and the Gemini Live API.

The client connects to ``/stream``, sends raw PCM audio as binary frames and
receives the model's spoken reply as binary frames on the same socket.
"""

import asyncio
import os

from google import genai
from google.genai import types
from quart import Quart, websocket

app = Quart(__name__)

# Ensure your HF Space has GEMINI_API_KEY set in its secrets/environment variables
client = genai.Client()

# Note: The official live model name is currently gemini-2.0-flash-exp.
# Update this if you have specific access to a 3.1 live preview endpoint.
MODEL = "gemini-2.0-flash-exp"

VOICE_MODES = {
    'Zephyr': 'Zephyr',  # Default / Balanced
    'Puck': 'Puck',      # Energetic / Bright
    'Charon': 'Charon',  # Deep / Calm
    'Kore': 'Kore',      # Soft / Warm
    'Fenrir': 'Fenrir',  # Formal / Sharp
}

# Voice used when the client omits ``voice`` or asks for an unknown one.
_DEFAULT_VOICE = "Zephyr"

# The TS file was downsampling to 16000Hz PCM, so that is what we declare.
_INPUT_MIME_TYPE = "audio/pcm;rate=16000"

_SYSTEM_PROMPT = (
    "You are ASH-BAND, a high-fidelity AI wearable companion. "
    "Speak in a professional, concise, and helpful tone. "
    "You have access to Google Search. Keep responses brief to minimize latency. "
    "Your responses are spoken aloud."
)


def _build_live_config(voice_name):
    """Return the Live API configuration (audio out, search tool, persona)."""
    # Mirroring your TS configuration
    return types.LiveConnectConfig(
        # The plain string form is the one shown in the SDK documentation.
        response_modalities=["AUDIO"],
        speech_config=types.SpeechConfig(
            voice_config=types.VoiceConfig(
                prebuilt_voice_config=types.PrebuiltVoiceConfig(
                    voice_name=voice_name,
                )
            )
        ),
        tools=[{"google_search": {}}],
        system_instruction=types.Content(
            # Build the part by keyword: ``Part.from_text`` does not accept a
            # positional argument on SDK versions where ``text`` is
            # keyword-only.
            parts=[types.Part(text=_SYSTEM_PROMPT)],
        ),
    )


async def _forward_client_audio(session):
    """Stream audio from the client (Termux) to Gemini until the socket fails."""
    try:
        while True:
            # Receive audio chunks from the client; text frames are ignored.
            data = await websocket.receive()
            if isinstance(data, bytes):
                # NOTE(review): ``session.send`` is the legacy entry point;
                # newer SDK releases also provide ``send_realtime_input`` --
                # confirm against the installed google-genai version.
                await session.send(
                    input={"data": data, "mime_type": _INPUT_MIME_TYPE}
                )
    except Exception as e:
        print(f"Error reading from client: {e}")


async def _forward_model_audio(session):
    """Stream audio from Gemini back to the client (Termux)."""
    try:
        # ``session.receive()`` stops iterating once a model turn completes,
        # so it has to be re-entered for every turn to keep the conversation
        # alive beyond the first reply.
        while True:
            received_any = False
            async for message in session.receive():
                received_any = True
                server_content = message.server_content
                if server_content is None:
                    continue

                # Handle Interruption
                if server_content.interrupted:
                    print("AI Interrupted by user.")
                    # In a more complex setup, send a control message to the
                    # client so it can clear its audio queue.

                model_turn = server_content.model_turn
                if model_turn is None:
                    continue

                # ``parts`` may be absent on a turn, hence the ``or []``.
                for part in model_turn.parts or []:
                    # Output raw audio back to the client
                    if part.inline_data and part.inline_data.data:
                        # Gemini returns 24kHz PCM audio
                        await websocket.send(part.inline_data.data)

            if not received_any:
                # An iterator that yields nothing is treated as the upstream
                # stream having ended; stop rather than spin.
                break
    except Exception as e:
        print(f"Error receiving from Gemini: {e}")


@app.websocket('/stream')
async def ws_stream():
    """
    WebSocket endpoint for the Termux client.

    Connect via: ws://<host>/stream?voice=Zephyr

    The optional ``voice`` query parameter selects one of ``VOICE_MODES``;
    unknown or missing values fall back to Zephyr. The handler returns as
    soon as either streaming direction stops.
    """
    # Grab the requested voice from the URL parameter, default to Zephyr
    requested_voice = websocket.args.get("voice", _DEFAULT_VOICE)
    voice_name = VOICE_MODES.get(requested_voice, _DEFAULT_VOICE)
    config = _build_live_config(voice_name)

    print(f"Connecting to Gemini Live API with voice: {voice_name}...")

    try:
        async with client.aio.live.connect(model=MODEL, config=config) as session:
            print("Live session established.")

            # Run both streaming directions concurrently
            tasks = [
                asyncio.create_task(_forward_client_audio(session)),
                asyncio.create_task(_forward_model_audio(session)),
            ]
            try:
                # Wait until one of the connections drops
                await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            finally:
                # Runs on normal exit and when this handler is itself
                # cancelled, so neither task outlives the session. Cancelling
                # an already-finished task is a no-op.
                for task in tasks:
                    task.cancel()
                # Await the cancellations so the session is not closed while
                # a task is still using it.
                await asyncio.gather(*tasks, return_exceptions=True)
    except Exception as e:
        print(f"Connection failed: {e}")


# Hugging Face Spaces standard port is 7860
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)