| """ | |
| ## Documentation | |
| Quickstart: https://github.com/google-gemini/cookbook/blob/main/quickstarts/Get_started_LiveAPI.py | |
| ## Setup | |
| To install the dependencies for this script, run: | |
| ``` | |
| pip install google-genai opencv-python pyaudio pillow mss | |
| ``` | |
| """ | |
| import os | |
| import asyncio | |
| import base64 | |
| import io | |
| import traceback | |
| import cv2 | |
| import pyaudio | |
| import PIL.Image | |
| import mss | |
| import argparse | |
| from google import genai | |
| from google.genai import types | |
# Audio parameters: 16-bit, single-channel PCM.
FORMAT = pyaudio.paInt16
CHANNELS = 1
SEND_SAMPLE_RATE = 16000  # sample rate used when opening the microphone stream
RECEIVE_SAMPLE_RATE = 24000  # sample rate used when opening the playback stream
CHUNK_SIZE = 1024  # frames requested per microphone read

MODEL = "models/gemini-2.5-flash-native-audio-preview-09-2025"

# Video source used when --mode is not given on the command line.
DEFAULT_MODE = "camera"

# The API key is read from the environment; os.environ.get returns None when
# GEMINI_API_KEY is unset, in which case the client receives api_key=None.
client = genai.Client(
    http_options={"api_version": "v1beta"},
    api_key=os.environ.get("GEMINI_API_KEY"),
)

# Live session configuration: audio-only responses, the "Zephyr" prebuilt
# voice, sliding-window context compression (trigger_tokens=25600,
# target_tokens=12800) and a fixed system instruction.
CONFIG = types.LiveConnectConfig(
    response_modalities=[
        "AUDIO",
    ],
    media_resolution="MEDIA_RESOLUTION_MEDIUM",
    speech_config=types.SpeechConfig(
        voice_config=types.VoiceConfig(
            prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Zephyr")
        )
    ),
    context_window_compression=types.ContextWindowCompressionConfig(
        trigger_tokens=25600,
        sliding_window=types.SlidingWindow(target_tokens=12800),
    ),
    system_instruction=types.Content(
        # Typo fix: the prompt previously read "stromg of mind".
        parts=[types.Part.from_text(text="You are a strong of mind AI who says it as it is")],
        role="user",
    ),
)

# Single shared PyAudio instance used for both capture and playback streams.
pya = pyaudio.PyAudio()
class AudioLoop:
    """Drive a Gemini Live session with microphone audio, text and optional video.

    ``run`` opens the live session and starts one task per concern: reading
    text from the console, capturing microphone audio, optionally capturing
    camera or screen frames, forwarding everything queued in ``out_queue`` to
    the session, receiving model output, and playing received audio.

    Args:
        video_mode: ``"camera"`` streams webcam frames, ``"screen"`` streams
            screenshots; any other value sends no video.
    """

    def __init__(self, video_mode=DEFAULT_MODE):
        self.video_mode = video_mode

        # Both queues are created in run(), once an event loop is running.
        self.audio_in_queue = None
        self.out_queue = None

        self.session = None
        # Set by listen_audio(); initialised here so run() can safely check
        # whether the microphone stream was ever opened before closing it.
        self.audio_stream = None

        self.send_text_task = None
        self.receive_audio_task = None
        self.play_audio_task = None

    async def send_text(self):
        """Read console lines and send each as a complete turn; "q" quits."""
        while True:
            # input() blocks, so run it in a worker thread.
            text = await asyncio.to_thread(
                input,
                "message > ",
            )
            if text.lower() == "q":
                break
            # NOTE(review): newer google-genai releases reportedly deprecate
            # `session.send` in favour of more specific send methods -
            # confirm against the installed SDK version.
            await self.session.send(input=text or ".", end_of_turn=True)

    def _get_frame(self, cap):
        """Grab one frame from ``cap`` and return it as a base64 JPEG payload.

        Returns:
            ``{"mime_type": "image/jpeg", "data": <base64 str>}``, or ``None``
            when the capture device did not deliver a frame.
        """
        # Read the frame
        ret, frame = cap.read()
        # Check if the frame was read successfully
        if not ret:
            return None
        # OpenCV captures in BGR but PIL expects RGB; converting here
        # prevents the blue tint in the video feed.
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img = PIL.Image.fromarray(frame_rgb)
        # Shrink in place so neither side exceeds 1024 pixels.
        img.thumbnail([1024, 1024])

        image_io = io.BytesIO()
        img.save(image_io, format="jpeg")
        image_io.seek(0)

        mime_type = "image/jpeg"
        image_bytes = image_io.read()
        return {"mime_type": mime_type, "data": base64.b64encode(image_bytes).decode()}

    async def get_frames(self):
        """Capture a camera frame about once per second and queue it for sending."""
        # This takes about a second, and will block the whole program
        # causing the audio pipeline to overflow if you don't to_thread it.
        cap = await asyncio.to_thread(
            cv2.VideoCapture, 0
        )  # 0 represents the default camera

        try:
            while True:
                frame = await asyncio.to_thread(self._get_frame, cap)
                if frame is None:
                    break

                await asyncio.sleep(1.0)

                await self.out_queue.put(frame)
        finally:
            # Release the VideoCapture object even when this task is
            # cancelled (the normal shutdown path), not only on read failure.
            cap.release()

    def _get_screen(self):
        """Take a screenshot and return it as a base64 JPEG payload."""
        # Use mss as a context manager so the handle it opens is closed after
        # every grab instead of accumulating one open instance per frame.
        with mss.mss() as sct:
            monitor = sct.monitors[0]
            shot = sct.grab(monitor)
            image_bytes = mss.tools.to_png(shot.rgb, shot.size)

        mime_type = "image/jpeg"
        # Re-encode the PNG bytes as JPEG via PIL.
        img = PIL.Image.open(io.BytesIO(image_bytes))

        image_io = io.BytesIO()
        img.save(image_io, format="jpeg")
        image_io.seek(0)

        image_bytes = image_io.read()
        return {"mime_type": mime_type, "data": base64.b64encode(image_bytes).decode()}

    async def get_screen(self):
        """Capture a screenshot about once per second and queue it for sending."""
        while True:
            frame = await asyncio.to_thread(self._get_screen)
            if frame is None:
                break

            await asyncio.sleep(1.0)

            await self.out_queue.put(frame)

    async def send_realtime(self):
        """Forward every queued audio/video payload to the live session."""
        while True:
            msg = await self.out_queue.get()
            await self.session.send(input=msg)

    async def listen_audio(self):
        """Open the default microphone and queue PCM chunks for sending."""
        mic_info = pya.get_default_input_device_info()
        self.audio_stream = await asyncio.to_thread(
            pya.open,
            format=FORMAT,
            channels=CHANNELS,
            rate=SEND_SAMPLE_RATE,
            input=True,
            input_device_index=mic_info["index"],
            frames_per_buffer=CHUNK_SIZE,
        )
        # In normal (non -O) runs, ask the stream not to raise on input overflow.
        if __debug__:
            kwargs = {"exception_on_overflow": False}
        else:
            kwargs = {}
        while True:
            data = await asyncio.to_thread(self.audio_stream.read, CHUNK_SIZE, **kwargs)
            await self.out_queue.put({"data": data, "mime_type": "audio/pcm"})

    async def receive_audio(self):
        """Background task that reads from the session and queues PCM chunks for playback."""
        while True:
            turn = self.session.receive()
            async for response in turn:
                if data := response.data:
                    self.audio_in_queue.put_nowait(data)
                    continue
                if text := response.text:
                    print(text, end="")

            # If you interrupt the model, it sends a turn_complete.
            # For interruptions to work, we need to stop playback.
            # So empty out the audio queue because it may have loaded
            # much more audio than has played yet.
            while not self.audio_in_queue.empty():
                self.audio_in_queue.get_nowait()

    async def play_audio(self):
        """Open an output stream and play queued PCM chunks as they arrive."""
        stream = await asyncio.to_thread(
            pya.open,
            format=FORMAT,
            channels=CHANNELS,
            rate=RECEIVE_SAMPLE_RATE,
            output=True,
        )
        while True:
            bytestream = await self.audio_in_queue.get()
            # stream.write blocks, so run it in a worker thread.
            await asyncio.to_thread(stream.write, bytestream)

    async def run(self):
        """Connect to the live session and run all tasks until the user quits.

        Exits quietly when the user types "q". If any task fails, the
        microphone stream (when it was opened) is closed and the grouped
        exception is printed.
        """
        try:
            async with (
                client.aio.live.connect(model=MODEL, config=CONFIG) as session,
                asyncio.TaskGroup() as tg,
            ):
                self.session = session

                self.audio_in_queue = asyncio.Queue()
                # Small bound on the outgoing queue: producers wait when it is full.
                self.out_queue = asyncio.Queue(maxsize=5)

                send_text_task = tg.create_task(self.send_text())
                tg.create_task(self.send_realtime())
                tg.create_task(self.listen_audio())
                if self.video_mode == "camera":
                    tg.create_task(self.get_frames())
                elif self.video_mode == "screen":
                    tg.create_task(self.get_screen())

                tg.create_task(self.receive_audio())
                tg.create_task(self.play_audio())

                # send_text returns only when the user asks to quit; raising
                # here leaves the task group, which cancels the other tasks.
                await send_text_task
                raise asyncio.CancelledError("User requested exit")

        except asyncio.CancelledError:
            pass
        except ExceptionGroup as eg:
            # The failure may have happened before listen_audio() opened the
            # microphone, in which case there is nothing to close.
            if self.audio_stream is not None:
                self.audio_stream.close()
            traceback.print_exception(eg)
if __name__ == "__main__":
    # Command-line entry point: pick the visual source (if any), then run
    # the audio loop until the user quits.
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--mode",
        type=str,
        default=DEFAULT_MODE,
        help="pixels to stream from",
        choices=["camera", "screen", "none"],
    )
    options = cli.parse_args()

    audio_loop = AudioLoop(video_mode=options.mode)
    asyncio.run(audio_loop.run())
| <script src="https://huggingface.co/deepsite/deepsite-badge.js"></script> |