import os import asyncio from quart import Quart, websocket, request from google import genai app = Quart(__name__) # Force v1beta for Live/Native Audio features client = genai.Client(http_options={'api_version': 'v1beta'}) MODEL = "models/gemini-2.5-flash-native-audio-preview-12-2025" VOICE_MODES = { 'Zephyr': 'Zephyr', 'Puck': 'Puck', 'Charon': 'Charon', 'Kore': 'Kore', 'Fenrir': 'Fenrir' } @app.route('/') async def index(): return { "status": "online", "service": "ASH-BAND Neural Link", "usage": { "websocket": "/stream", "text_trigger": "/prompt?text=Hello" } } @app.route('/prompt') async def trigger_prompt(): """Simple GET endpoint to test text input without websocat pipes.""" text = request.args.get("text", "System check.") # This just confirms the server received your curl. # To hear it, you still need to be connected to the /stream websocket. return {"status": "sent_to_engine", "text": text} @app.websocket('/stream') async def ws_stream(): requested_voice = websocket.args.get("voice", "Zephyr") voice_name = VOICE_MODES.get(requested_voice, "Zephyr") config = { "response_modalities": ["AUDIO"], "speech_config": { "voice_config": { "prebuilt_voice_config": {"voice_name": voice_name} } }, "tools": [{"google_search": {}}], "system_instruction": ( "You are ASH-BAND, a high-fidelity AI wearable companion. " "Speak concisely and professionally. Responses are spoken aloud." ) } print(f"Connecting to Gemini Live API...") try: async with client.aio.live.connect(model=MODEL, config=config) as session: print("Live session established.") async def client_to_gemini(): try: while True: data = await websocket.receive() if isinstance(data, bytes): await session.send(input={"data": data, "mime_type": "audio/pcm;rate=16000"}) elif isinstance(data, str): # The 'end_of_turn' is CRITICAL for echo/text to trigger audio await session.send(input=data, end_of_turn=True) except Exception as e: print(f"Input Error: {e}") async def gemini_to_client(): try: async for message in session.receive(): if message.server_content and message.server_content.model_turn: for part in message.server_content.model_turn.parts: if part.inline_data and part.inline_data.data: await websocket.send(part.inline_data.data) except Exception as e: print(f"Output Error: {e}") # Keep both directions alive await asyncio.gather(client_to_gemini(), gemini_to_client(), return_exceptions=True) except Exception as e: print(f"Link Failed: {e}") finally: print("Session closed.") if __name__ == "__main__": app.run(host="0.0.0.0", port=7860)