File size: 4,800 Bytes
87461f5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import os
import asyncio
from quart import Quart, websocket
from google import genai
from google.genai import types

# Quart application object; the WebSocket route below is registered on it.
app = Quart(__name__)

# The client is built with no explicit arguments, so the API key has to come
# from the environment (set GEMINI_API_KEY in the HF Space secrets).
client = genai.Client()

# Model id passed to the Live API when a session is opened.
# Note: gemini-2.0-flash-exp is the live model this bridge was written against;
# change it here if you have access to a newer live preview endpoint.
MODEL = "gemini-2.0-flash-exp"

# Voices a client may request through the `voice` query parameter.
# Keys are the accepted request values; values are the voice names handed to
# the speech config (currently an identity mapping).
VOICE_MODES = dict(
    Zephyr="Zephyr",  # Default / Balanced
    Puck="Puck",      # Energetic / Bright
    Charon="Charon",  # Deep / Calm
    Kore="Kore",      # Soft / Warm
    Fenrir="Fenrir",  # Formal / Sharp
)

@app.websocket('/stream')
async def ws_stream():
    """
    WebSocket endpoint for the Termux client.
    Connect via: ws://<hf-space-url>/stream?voice=Zephyr

    Opens one Gemini Live session per WebSocket connection and relays audio
    in both directions until either side stops:

    * binary frames received from the client are forwarded to Gemini tagged
      as ``audio/pcm;rate=16000``; non-binary frames are ignored;
    * inline audio data found in Gemini's model turns is sent back to the
      client as binary frames.

    The ``voice`` query parameter selects an entry of ``VOICE_MODES``;
    unknown or missing values fall back to ``"Zephyr"``.

    Errors while connecting or streaming are printed rather than raised, so
    the handler ends quietly when the session fails.
    """
    # Grab the requested voice from the URL parameter, default to Zephyr
    requested_voice = websocket.args.get("voice", "Zephyr")
    voice_name = VOICE_MODES.get(requested_voice, "Zephyr")

    # Mirroring your TS configuration
    config = types.LiveConnectConfig(
        # Plain string form of the modality; the SDK config accepts it and it
        # avoids depending on a specific enum name.
        response_modalities=["AUDIO"],
        speech_config=types.SpeechConfig(
            voice_config=types.VoiceConfig(
                prebuilt_voice_config=types.PrebuiltVoiceConfig(
                    voice_name=voice_name
                )
            )
        ),
        tools=[{"google_search": {}}],
        system_instruction=types.Content(
            # Build the Part with a keyword argument: Part.from_text() takes
            # `text` as keyword-only in current SDK releases, so a positional
            # call fails with TypeError.
            parts=[types.Part(
                text=(
                    "You are ASH-BAND, a high-fidelity AI wearable companion. "
                    "Speak in a professional, concise, and helpful tone. "
                    "You have access to Google Search. Keep responses brief to minimize latency. "
                    "Your responses are spoken aloud."
                )
            )]
        )
    )

    print(f"Connecting to Gemini Live API with voice: {voice_name}...")

    try:
        async with client.aio.live.connect(model=MODEL, config=config) as session:
            print("Live session established.")

            # Task 1: Stream audio from Client (Termux) -> Gemini
            async def client_to_gemini():
                # Cancellation is deliberately not caught here: CancelledError
                # is not an Exception subclass, so it propagates and the task
                # ends as cancelled when the handler cleans up.
                try:
                    while True:
                        # Receive audio chunks from the client
                        data = await websocket.receive()
                        if isinstance(data, bytes):
                            # The TS file was downsampling to 16000Hz PCM
                            await session.send(
                                input={"data": data, "mime_type": "audio/pcm;rate=16000"}
                            )
                except Exception as e:
                    print(f"Error reading from client: {e}")

            # Task 2: Stream audio from Gemini -> Client (Termux)
            async def gemini_to_client():
                try:
                    # session.receive() yields the messages of ONE model turn
                    # and then stops, so it must be re-entered for every turn;
                    # otherwise the bridge shuts down after the first reply.
                    while True:
                        received_any = False
                        async for message in session.receive():
                            received_any = True
                            server_content = message.server_content
                            if server_content is None:
                                continue

                            # Handle Interruption
                            if server_content.interrupted:
                                print("AI Interrupted by user.")
                                # In a more complex setup, send a control message to client to clear audio queue

                            model_turn = server_content.model_turn
                            if model_turn is None:
                                continue

                            # `parts` may be absent on some messages; treat that as empty.
                            for part in model_turn.parts or []:
                                # Output raw audio back to the client
                                if part.inline_data and part.inline_data.data:
                                    # Gemini returns 24kHz PCM audio
                                    await websocket.send(part.inline_data.data)

                        if not received_any:
                            # An empty turn means the stream produced nothing;
                            # stop instead of spinning in a tight loop.
                            break
                except Exception as e:
                    print(f"Error receiving from Gemini: {e}")

            # Run both streaming directions concurrently
            tasks = [
                asyncio.create_task(client_to_gemini()),
                asyncio.create_task(gemini_to_client()),
            ]

            try:
                # Wait until one of the connections drops
                await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            finally:
                # Runs on normal completion AND when this handler itself is
                # cancelled (e.g. the client disconnects), so neither relay
                # task outlives the Gemini session.
                for task in tasks:
                    task.cancel()
                # Await the tasks so cancellation actually finishes and no
                # "exception was never retrieved" warnings are left behind.
                await asyncio.gather(*tasks, return_exceptions=True)

    except Exception as e:
        print(f"Connection failed: {e}")

# Script entry point: start the Quart server when this file is run directly.
# It listens on all interfaces, on 7860 (the standard Hugging Face Spaces port).
if __name__ == "__main__":
    app.run(port=7860, host="0.0.0.0")