File size: 3,321 Bytes
87461f5
 
b65e9d6
87461f5
 
 
 
b65e9d6
ae061ef
87461f5
478d03a
87461f5
 
91a7eef
 
 
 
 
87461f5
 
07104ff
 
 
 
b65e9d6
 
 
 
 
07104ff
 
b65e9d6
 
 
 
 
 
 
 
87461f5
 
 
 
 
06aae43
 
 
 
 
 
 
 
 
 
b65e9d6
87461f5
06aae43
87461f5
b65e9d6
87461f5
 
 
 
 
 
 
 
 
 
b65e9d6
91a7eef
b65e9d6
2433d67
87461f5
b65e9d6
87461f5
 
 
 
91a7eef
 
 
 
87461f5
b65e9d6
87461f5
b65e9d6
2433d67
87461f5
 
b65e9d6
91a7eef
 
87461f5
 
06aae43
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import os
import asyncio
from quart import Quart, websocket, request
from google import genai

# Quart application object that the HTTP and websocket routes attach to.
app = Quart(__name__)

# Pin the client to the v1beta API version for Live/Native Audio features.
client = genai.Client(http_options=dict(api_version='v1beta'))

# Model identifier passed to the Live API when a session is opened.
MODEL = "models/gemini-2.5-flash-native-audio-preview-12-2025"

# Voices a caller may request; each accepted name maps to itself.
VOICE_MODES = {
    voice: voice
    for voice in ('Zephyr', 'Puck', 'Charon', 'Kore', 'Fenrir')
}

@app.route('/')
async def index():
    """Return a small status document listing the service's entry points."""
    endpoints = {
        "websocket": "/stream",
        "text_trigger": "/prompt?text=Hello",
    }
    return {
        "status": "online",
        "service": "ASH-BAND Neural Link",
        "usage": endpoints,
    }

@app.route('/prompt')
async def trigger_prompt():
    """Acknowledge a ``text`` query parameter sent via a plain GET request.

    The handler only echoes the text back (defaulting to "System check.");
    it does not forward anything to a Live session. Audio is only produced
    for clients connected to the /stream websocket.
    """
    received = request.args.get("text", "System check.")
    acknowledgement = {"status": "sent_to_engine"}
    acknowledgement["text"] = received
    return acknowledgement

@app.websocket('/stream')
async def ws_stream():
    """Bridge a client websocket to a Gemini Live session.

    The optional ``voice`` query argument selects a prebuilt voice from
    VOICE_MODES; unknown or missing values fall back to "Zephyr".

    Client -> Gemini: binary frames are forwarded as 16 kHz PCM audio, text
    frames are forwarded as a complete turn.
    Gemini -> client: every inline audio payload in a model turn is sent
    back as a binary frame.

    The bridge stays up until either direction stops, at which point the
    other direction is cancelled and the Live session is closed. Failures
    are logged with print() rather than raised.
    """
    requested_voice = websocket.args.get("voice", "Zephyr")
    voice_name = VOICE_MODES.get(requested_voice, "Zephyr")

    config = {
        "response_modalities": ["AUDIO"],
        "speech_config": {
            "voice_config": {
                "prebuilt_voice_config": {"voice_name": voice_name}
            }
        },
        "tools": [{"google_search": {}}],
        "system_instruction": (
            "You are ASH-BAND, a high-fidelity AI wearable companion. "
            "Speak concisely and professionally. Responses are spoken aloud."
        )
    }

    print("Connecting to Gemini Live API...")

    try:
        async with client.aio.live.connect(model=MODEL, config=config) as session:
            print("Live session established.")

            async def client_to_gemini():
                """Forward frames from the client websocket into the session."""
                try:
                    while True:
                        data = await websocket.receive()
                        if isinstance(data, bytes):
                            await session.send(input={"data": data, "mime_type": "audio/pcm;rate=16000"})
                        elif isinstance(data, str):
                            # The 'end_of_turn' is CRITICAL for echo/text to trigger audio
                            await session.send(input=data, end_of_turn=True)
                except Exception as e:
                    print(f"Input Error: {e}")

            async def gemini_to_client():
                """Relay audio from every model turn back to the client."""
                try:
                    # Per the SDK docs, session.receive() stops iterating once
                    # the model's turn is complete, so it has to be re-entered
                    # for each turn; otherwise only the first reply is relayed.
                    while True:
                        got_message = False
                        async for message in session.receive():
                            got_message = True
                            if message.server_content and message.server_content.model_turn:
                                for part in message.server_content.model_turn.parts:
                                    if part.inline_data and part.inline_data.data:
                                        await websocket.send(part.inline_data.data)
                        if not got_message:
                            # An empty turn means the stream has ended; stop
                            # instead of spinning on a dead session.
                            break
                except Exception as e:
                    print(f"Output Error: {e}")

            # Run both directions concurrently. As soon as one stops, cancel
            # the other so a dead link cannot keep the session open forever.
            pumps = [
                asyncio.ensure_future(client_to_gemini()),
                asyncio.ensure_future(gemini_to_client()),
            ]
            try:
                await asyncio.wait(pumps, return_when=asyncio.FIRST_COMPLETED)
            finally:
                for pump in pumps:
                    pump.cancel()
                # Collect results so cancellations are not reported as
                # never-retrieved task exceptions.
                await asyncio.gather(*pumps, return_exceptions=True)

    except Exception as e:
        print(f"Link Failed: {e}")
    finally:
        print("Session closed.")

if __name__ == "__main__":
    # Start the server on port 7860, bound to all interfaces, when run directly.
    app.run(port=7860, host="0.0.0.0")