Spaces:
Sleeping
Sleeping
| import os | |
| import asyncio | |
| from quart import Quart, websocket | |
| from google import genai | |
| from google.genai import types | |
# The Gemini client is created without explicit arguments; the hosting Space
# is expected to expose GEMINI_API_KEY via its secrets/environment variables.
client = genai.Client()

# Quart application object for this module.
app = Quart(__name__)
# Name of the live model passed to client.aio.live.connect().
# The default is the model name this file has always used. Set the
# GEMINI_LIVE_MODEL environment variable to target a different live endpoint
# (e.g. a newer preview you have access to) without editing the code.
# An unset or empty variable falls back to the default.
MODEL = os.environ.get("GEMINI_LIVE_MODEL") or "gemini-2.0-flash-exp"
# Voices a client may request through the ``voice`` URL parameter.
# Every entry maps a requested name to the identical voice name, so the table
# also works as a list of accepted values for lookups with a fallback.
VOICE_MODES = {
    voice: voice
    for voice in (
        "Zephyr",  # Default / Balanced
        "Puck",    # Energetic / Bright
        "Charon",  # Deep / Calm
        "Kore",    # Soft / Warm
        "Fenrir",  # Formal / Sharp
    )
}
def _build_live_config(voice_name: str):
    """Build the Gemini Live connection config for one prebuilt voice.

    Args:
        voice_name: Voice name handed to ``types.PrebuiltVoiceConfig``.

    Returns:
        A ``types.LiveConnectConfig`` that requests audio responses, enables
        the Google Search tool and sets the ASH-BAND system instruction.
    """
    return types.LiveConnectConfig(
        # The documented string form is used for the modality rather than an
        # enum attribute, so the config does not depend on an enum name.
        response_modalities=["AUDIO"],
        speech_config=types.SpeechConfig(
            voice_config=types.VoiceConfig(
                prebuilt_voice_config=types.PrebuiltVoiceConfig(
                    voice_name=voice_name,
                )
            )
        ),
        tools=[{"google_search": {}}],
        system_instruction=types.Content(
            parts=[
                # ``text`` must be passed by keyword: recent google-genai
                # releases declare it keyword-only.
                types.Part.from_text(
                    text=(
                        "You are ASH-BAND, a high-fidelity AI wearable companion. "
                        "Speak in a professional, concise, and helpful tone. "
                        "You have access to Google Search. Keep responses brief to minimize latency. "
                        "Your responses are spoken aloud."
                    )
                )
            ]
        ),
    )


async def _forward_client_audio(session) -> None:
    """Relay binary frames from the client WebSocket into the Gemini session."""
    try:
        while True:
            data = await websocket.receive()
            # Only binary frames are forwarded; text frames are ignored.
            if isinstance(data, bytes):
                # Frames are labelled as 16 kHz PCM; the client is expected to
                # send audio in that format.
                # NOTE(review): newer google-genai releases mark session.send()
                # as deprecated in favour of send_realtime_input(); confirm
                # against the installed SDK version before switching.
                await session.send(
                    input={"data": data, "mime_type": "audio/pcm;rate=16000"}
                )
    except Exception as e:
        # Task boundary: report the failure and let the task finish so the
        # handler can shut the session down. Cancellation is not caught here.
        print(f"Error reading from client: {e}")


async def _forward_gemini_audio(session) -> None:
    """Relay audio produced by the Gemini session back to the client WebSocket."""
    try:
        # session.receive() stops iterating once a model turn completes, so it
        # is re-entered for every turn; otherwise the relay would end (and the
        # whole connection be torn down) after the first spoken reply.
        while True:
            async for message in session.receive():
                server_content = message.server_content
                if server_content is None:
                    continue

                if server_content.interrupted:
                    print("AI Interrupted by user.")
                    # A fuller implementation would tell the client to drop
                    # any audio it still has queued.

                model_turn = server_content.model_turn
                if model_turn is None:
                    continue

                # ``parts`` may be absent on a turn; treat that as no audio.
                for part in model_turn.parts or []:
                    inline = part.inline_data
                    if inline and inline.data:
                        # Raw audio bytes are passed through unchanged
                        # (the original notes Gemini returns 24 kHz PCM).
                        await websocket.send(inline.data)
    except Exception as e:
        # Task boundary: report and finish; a closed session surfaces here.
        print(f"Error receiving from Gemini: {e}")


@app.websocket("/stream")
async def ws_stream():
    """
    WebSocket endpoint for the Termux client.
    Connect via: ws://<hf-space-url>/stream?voice=Zephyr

    Opens a Gemini Live session with the requested voice and relays audio in
    both directions until either direction stops or the handler is cancelled.
    Unknown or missing ``voice`` values fall back to "Zephyr".
    """
    requested_voice = websocket.args.get("voice", "Zephyr")
    voice_name = VOICE_MODES.get(requested_voice, "Zephyr")
    config = _build_live_config(voice_name)

    print(f"Connecting to Gemini Live API with voice: {voice_name}...")
    try:
        async with client.aio.live.connect(model=MODEL, config=config) as session:
            print("Live session established.")

            # One task per direction so that upload and playback overlap.
            tasks = [
                asyncio.create_task(_forward_client_audio(session)),
                asyncio.create_task(_forward_gemini_audio(session)),
            ]
            try:
                # Return as soon as either side of the relay stops.
                await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            finally:
                # Runs on normal exit and when this handler itself is
                # cancelled, so neither pump task outlives the session.
                for task in tasks:
                    task.cancel()
                # Await the tasks so cancellation actually completes and no
                # exception is left unretrieved.
                await asyncio.gather(*tasks, return_exceptions=True)
    except Exception as e:
        print(f"Connection failed: {e}")
# Script entry point: listen on every interface at port 7860, the standard
# port for Hugging Face Spaces.
if __name__ == "__main__":
    _bind = {"host": "0.0.0.0", "port": 7860}
    app.run(**_bind)