| """ |
| ## Documentation |
| Quickstart: https://github.com/google-gemini/cookbook/blob/main/quickstarts/Get_started_LiveAPI.py |
|
|
| ## Setup |
|
|
| To install the dependencies for this script, run: |
|
|
| ``` |
| pip install google-genai opencv-python pyaudio pillow mss |
| ``` |
| """ |
|
|
| import os |
| import asyncio |
| import base64 |
| import io |
| import traceback |
|
|
| import cv2 |
| import pyaudio |
| import PIL.Image |
| import mss |
|
|
| import argparse |
|
|
| from google import genai |
| from google.genai import types |
|
|
# Audio stream parameters shared by the microphone and playback streams.
FORMAT = pyaudio.paInt16
CHANNELS = 1
# Sample rate (Hz) used when opening the microphone input stream.
SEND_SAMPLE_RATE = 16000
# Sample rate (Hz) used when opening the playback output stream.
RECEIVE_SAMPLE_RATE = 24000
# Frames per buffer for the microphone stream, and frames read per chunk.
CHUNK_SIZE = 1024

# Model name passed to the Live API connection.
MODEL = "models/gemini-2.5-flash-native-audio-preview-09-2025"

# Video source used when --mode is not given on the command line.
DEFAULT_MODE = "camera"

# The API key is read from the GEMINI_API_KEY environment variable; when the
# variable is unset, None is passed through to the client.
client = genai.Client(
    http_options={"api_version": "v1beta"},
    api_key=os.environ.get("GEMINI_API_KEY"),
)

# Session configuration: audio-only responses, a prebuilt voice, sliding-window
# context compression, and the system instruction for the model.
CONFIG = types.LiveConnectConfig(
    response_modalities=[
        "AUDIO",
    ],
    media_resolution="MEDIA_RESOLUTION_MEDIUM",
    speech_config=types.SpeechConfig(
        voice_config=types.VoiceConfig(
            prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Zephyr")
        )
    ),
    context_window_compression=types.ContextWindowCompressionConfig(
        trigger_tokens=25600,
        sliding_window=types.SlidingWindow(target_tokens=12800),
    ),
    system_instruction=types.Content(
        # Typo fix: the instruction previously read "stromg of mind".
        parts=[types.Part.from_text(text="You are a strong of mind AI who says it as it is")],
        role="user",
    ),
)

# Single PyAudio instance used to open both the input and output streams.
pya = pyaudio.PyAudio()
|
|
|
|
class AudioLoop:
    """Run an interactive Gemini Live API session from the terminal.

    While ``run()`` is active the loop concurrently:

    * reads typed messages from stdin and sends them as completed turns,
    * captures microphone audio and, depending on ``video_mode``, camera
      frames or screenshots, and forwards them through ``out_queue``,
    * receives the model's replies, printing text and queueing audio,
    * plays the queued audio on an output stream.
    """

    def __init__(self, video_mode=DEFAULT_MODE):
        """Store the video mode and initialise per-session state to ``None``.

        Args:
            video_mode: ``"camera"`` streams webcam frames, ``"screen"``
                streams screenshots; any other value sends no video.
        """
        self.video_mode = video_mode

        # Both queues are created in run(), inside the running event loop.
        self.audio_in_queue = None
        self.out_queue = None

        self.session = None

        # Microphone stream, opened by listen_audio(). Initialised here so
        # the error handler in run() can tell whether it was ever opened.
        self.audio_stream = None

        self.send_text_task = None
        self.receive_audio_task = None
        self.play_audio_task = None

    async def send_text(self):
        """Prompt for text on stdin and send each line as a completed turn.

        Returns when the user enters ``q`` (case-insensitive); ``run()`` treats
        that as the request to shut down. An empty line is sent as ``"."``.
        """
        # NOTE(review): newer google-genai releases reportedly deprecate
        # session.send() in favour of send_client_content() /
        # send_realtime_input() -- confirm before upgrading the SDK.
        while True:
            # input() blocks, so it runs in a worker thread to keep the
            # event loop free for the audio/video tasks.
            text = await asyncio.to_thread(
                input,
                "message > ",
            )
            if text.lower() == "q":
                break
            await self.session.send(input=text or ".", end_of_turn=True)

    def _get_frame(self, cap):
        """Read one frame from ``cap`` and return it as a base64 JPEG payload.

        Args:
            cap: an opened ``cv2.VideoCapture``.

        Returns:
            ``None`` when no frame could be read; otherwise a dict with
            ``mime_type`` (``"image/jpeg"``) and base64-encoded ``data``.
        """
        # Read the frame
        ret, frame = cap.read()
        # Check if the frame was read successfully
        if not ret:
            return None
        # OpenCV captures in BGR but PIL expects RGB; without this
        # conversion the video feed has a blue tint.
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img = PIL.Image.fromarray(frame_rgb)
        # Shrink in place so neither side exceeds 1024 px.
        img.thumbnail([1024, 1024])

        image_io = io.BytesIO()
        img.save(image_io, format="jpeg")

        mime_type = "image/jpeg"
        image_bytes = image_io.getvalue()
        return {"mime_type": mime_type, "data": base64.b64encode(image_bytes).decode()}

    async def get_frames(self):
        """Capture a camera frame roughly once per second and queue it for sending.

        Stops, and releases the capture device, when a frame cannot be read.
        """
        # This takes about a second, and will block the whole program
        # causing the audio pipeline to overflow if you don't to_thread it.
        cap = await asyncio.to_thread(
            cv2.VideoCapture, 0
        )  # 0 represents the default camera

        while True:
            frame = await asyncio.to_thread(self._get_frame, cap)
            if frame is None:
                break

            await asyncio.sleep(1.0)

            await self.out_queue.put(frame)

        # Release the VideoCapture object. This is intentionally not done on
        # cancellation: a worker thread may still be inside cap.read().
        cap.release()

    def _get_screen(self):
        """Grab a screenshot and return it as a base64 JPEG payload.

        Returns:
            A dict with ``mime_type`` (``"image/jpeg"``) and base64-encoded
            ``data``.
        """
        # Close the mss instance after every grab; creating one per call
        # without closing it leaks a handle per screenshot.
        with mss.mss() as sct:
            # monitors[0] is the first entry mss reports (per the mss docs,
            # the combined area of all monitors).
            monitor = sct.monitors[0]
            shot = sct.grab(monitor)

        # Build the PIL image straight from the raw RGB bytes instead of
        # encoding to PNG and decoding it again.
        img = PIL.Image.frombytes("RGB", shot.size, shot.rgb)

        image_io = io.BytesIO()
        img.save(image_io, format="jpeg")

        mime_type = "image/jpeg"
        image_bytes = image_io.getvalue()
        return {"mime_type": mime_type, "data": base64.b64encode(image_bytes).decode()}

    async def get_screen(self):
        """Capture a screenshot roughly once per second and queue it for sending."""
        while True:
            frame = await asyncio.to_thread(self._get_screen)
            if frame is None:
                break

            await asyncio.sleep(1.0)

            await self.out_queue.put(frame)

    async def send_realtime(self):
        """Forward every queued audio chunk or image payload to the session."""
        while True:
            msg = await self.out_queue.get()
            await self.session.send(input=msg)

    async def listen_audio(self):
        """Open the default microphone and queue PCM chunks for sending."""
        mic_info = pya.get_default_input_device_info()
        self.audio_stream = await asyncio.to_thread(
            pya.open,
            format=FORMAT,
            channels=CHANNELS,
            rate=SEND_SAMPLE_RATE,
            input=True,
            input_device_index=mic_info["index"],
            frames_per_buffer=CHUNK_SIZE,
        )
        # Outside optimised (-O) runs, do not raise when the input buffer
        # overflows.
        if __debug__:
            kwargs = {"exception_on_overflow": False}
        else:
            kwargs = {}
        while True:
            data = await asyncio.to_thread(self.audio_stream.read, CHUNK_SIZE, **kwargs)
            await self.out_queue.put({"data": data, "mime_type": "audio/pcm"})

    async def receive_audio(self):
        """Read responses from the session and queue their audio for playback.

        Audio data goes to ``audio_in_queue``; text is printed to stdout.
        """
        while True:
            turn = self.session.receive()
            async for response in turn:
                if data := response.data:
                    self.audio_in_queue.put_nowait(data)
                    continue
                if text := response.text:
                    print(text, end="")

            # If you interrupt the model, it sends a turn_complete.
            # For interruptions to work, we need to stop playback.
            # So empty out the audio queue because it may have loaded
            # much more audio than has played yet.
            while not self.audio_in_queue.empty():
                self.audio_in_queue.get_nowait()

    async def play_audio(self):
        """Open an output stream and play queued audio chunks as they arrive."""
        stream = await asyncio.to_thread(
            pya.open,
            format=FORMAT,
            channels=CHANNELS,
            rate=RECEIVE_SAMPLE_RATE,
            output=True,
        )
        while True:
            bytestream = await self.audio_in_queue.get()
            # stream.write blocks, so keep it off the event loop.
            await asyncio.to_thread(stream.write, bytestream)

    async def run(self):
        """Connect to the Live API and run all tasks until the user quits.

        Entering ``q`` at the prompt ends ``send_text``; the remaining tasks
        are then cancelled. Errors raised by any task are printed with their
        traceback rather than propagated.
        """
        try:
            async with (
                client.aio.live.connect(model=MODEL, config=CONFIG) as session,
                asyncio.TaskGroup() as tg,
            ):
                self.session = session

                self.audio_in_queue = asyncio.Queue()
                # Bounded, so capture tasks wait instead of piling up stale
                # audio and frames when sending falls behind.
                self.out_queue = asyncio.Queue(maxsize=5)

                send_text_task = tg.create_task(self.send_text())
                tg.create_task(self.send_realtime())
                tg.create_task(self.listen_audio())
                if self.video_mode == "camera":
                    tg.create_task(self.get_frames())
                elif self.video_mode == "screen":
                    tg.create_task(self.get_screen())

                tg.create_task(self.receive_audio())
                tg.create_task(self.play_audio())

                await send_text_task
                # Raising inside the TaskGroup cancels the remaining tasks.
                raise asyncio.CancelledError("User requested exit")

        except asyncio.CancelledError:
            pass
        except ExceptionGroup as EG:
            # The microphone may never have been opened (for example when
            # listen_audio itself failed); closing unconditionally would raise
            # AttributeError here and hide the real error.
            if self.audio_stream is not None:
                self.audio_stream.close()
            traceback.print_exception(EG)
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: choose the video source, then run the loop
    # until the user quits.
    cli = argparse.ArgumentParser()
    mode_options = {
        "type": str,
        "default": DEFAULT_MODE,
        "help": "pixels to stream from",
        "choices": ["camera", "screen", "none"],
    }
    cli.add_argument("--mode", **mode_options)
    selected = cli.parse_args()

    loop = AudioLoop(video_mode=selected.mode)
    asyncio.run(loop.run())
# Stray HTML badge tag appended by the hosting page; not part of this script:
# <script src="https://huggingface.co/deepsite/deepsite-badge.js"></script>