# Medical Assistant: Gemini vision + Azure Speech interactive loop.
# Standard library
import argparse
import asyncio
import datetime
import logging
import os
import sys
import time
import uuid
from queue import Queue

# Third-party
import cv2
import numpy as np
import sounddevice as sd
import soundfile as sf

# BUG FIX: `import google as genai` bound the bare `google` namespace package,
# which exposes neither `configure` nor `GenerativeModel`; every later call on
# `genai` would raise AttributeError. Bind the actual client module instead.
import google.generativeai as genai
# NOTE(review): Content/Part come from the newer google-genai SDK while the
# client above is google-generativeai — confirm both packages are installed.
from google.genai.types import Content, Part
from azure.cognitiveservices.speech import (
    AudioConfig,
    CancellationReason,
    ResultReason,
    SpeechConfig,
    SpeechSynthesizer,
)

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s:%(name)s:%(message)s')
# System prompt for the medical assistant: constrains the model to visual
# description and general (non-diagnostic) information, with mandatory
# "consult a qualified professional" disclaimers. Sent once per session
# before any user content (see GeminiInteractionLoop.run_main_loop).
MEDICAL_ASSISTANT_SYSTEM_PROMPT = '''You are an AI Medical Assistant. Your primary function is to analyze visual information from the user's camera or screen and respond via voice.
Your responsibilities are:
1. **Visual Observation and Description:** Carefully examine the images or video feed. Describe relevant details you observe.
2. **General Information (Non-Diagnostic):** Provide general information related to what is visually presented, if applicable. You are not a diagnostic tool.
3. **Safety and Disclaimer (CRITICAL):**
* You are an AI assistant, **NOT a medical doctor or a substitute for one.**
* **DO NOT provide medical diagnoses, treatment advice, or interpret medical results (e.g., X-rays, scans, lab reports).**
* When appropriate, and always if the user seems to be seeking diagnosis or treatment, explicitly state your limitations and **strongly advise the user to consult a qualified healthcare professional.**
* If you see something that *appears* visually concerning (e.g., an unusual skin lesion, signs of injury), you may gently suggest it might be wise to have it looked at by a professional, without speculating on what it is.
4. **Tone:** Maintain a helpful, empathetic, and calm tone.
5. **Interaction:** After this initial instruction, you can make a brief acknowledgment of your role (e.g., "I'm ready to assist by looking at what you show me. Please remember to consult a doctor for medical advice."). Then, focus on responding to the user's visual input and questions.
Example of a disclaimer you might use: "As an AI assistant, I can describe what I see, but I can't provide medical advice or diagnoses. For any health concerns, it's always best to speak with a doctor or other healthcare professional."
'''
# Class to handle Gemini-Azure interaction
class GeminiInteractionLoop:
    """Drives a live loop between a Gemini vision session and the user.

    Camera frames and typed console text are streamed to Gemini; text
    responses stream back and are optionally synthesized to speech via
    Azure and played on the local audio device.

    Fixes over the original draft: every blocking call that previously ran
    directly inside a coroutine — console ``input()``, the Azure
    ``speak_text_async(...).get()`` wait, and ``sd.wait()`` playback — is
    now dispatched with ``run_in_executor`` so the event loop stays
    responsive and the concurrent tasks actually overlap.
    """

    def __init__(self, gemini_api_key, azure_speech_key, azure_speech_region, use_camera=True, use_speech=True):
        """Store credentials and initialize camera / speech resources.

        Args:
            gemini_api_key: API key for the Gemini client.
            azure_speech_key: Azure Cognitive Services Speech key.
            azure_speech_region: Azure region of the speech resource.
            use_camera: Capture frames from the default camera (device 0).
            use_speech: Synthesize Gemini replies to audio via Azure.

        Camera or speech initialization failures are logged and disable
        that feature rather than raising, so a text-only session can run.
        """
        self.gemini_api_key = gemini_api_key
        self.azure_speech_key = azure_speech_key
        self.azure_speech_region = azure_speech_region
        self.use_camera = use_camera
        self.use_speech = use_speech
        # Initialize Gemini client and model handle; the live session itself
        # is created lazily in run_main_loop().
        genai.configure(api_key=self.gemini_api_key)
        self.model = genai.GenerativeModel('gemini-pro-vision')
        self.gemini_session = None
        # Initialize camera; any failure downgrades to camera-less mode.
        self.camera = None
        if self.use_camera:
            try:
                self.camera = cv2.VideoCapture(0)
                if not self.camera.isOpened():
                    logging.error("Failed to open camera device")
                    self.use_camera = False
            except Exception as e:
                logging.error(f"Error initializing camera: {e}")
                self.use_camera = False
        # Initialize Azure Speech; synthesized WAV files land in ./temp_audio.
        if self.use_speech:
            try:
                self.speech_config = SpeechConfig(subscription=self.azure_speech_key, region=self.azure_speech_region)
                self.speech_config.speech_synthesis_voice_name = "en-US-JennyNeural"
                self.output_path = os.path.join(os.getcwd(), "temp_audio")
                os.makedirs(self.output_path, exist_ok=True)
            except Exception as e:
                logging.error(f"Error initializing Azure Speech Service: {e}")
                self.use_speech = False
        # Thread-safe queue feeding the TTS task (polled with short sleeps).
        self.text_to_speech_queue = Queue()
        # Shared shutdown flag observed by every task.
        self.is_running = True

    def capture_image(self):
        """Grab one frame from the camera, or None if unavailable/failed."""
        if not self.use_camera or self.camera is None:
            return None
        ret, frame = self.camera.read()
        if not ret:
            logging.error("Failed to capture image from camera")
            return None
        return frame

    async def stream_media_to_gemini(self):
        """Periodically (every ~5 s) send a JPEG-encoded camera frame to Gemini."""
        logging.info("Starting media stream to Gemini...")
        try:
            interval = 5  # seconds between frames
            last_capture_time = 0
            while self.is_running:
                current_time = time.time()
                if current_time - last_capture_time >= interval:
                    frame = self.capture_image()
                    if frame is not None:
                        _, encoded_image = cv2.imencode(".jpg", frame)
                        image_bytes = encoded_image.tobytes()
                        try:
                            # Convert to the format expected by Gemini.
                            image_part = Part.from_data(mime_type="image/jpeg", data=image_bytes)
                            content = Content(role="user", parts=[image_part])
                            # NOTE(review): set-attribute-then-send matches the
                            # session API used throughout this file — confirm
                            # against the installed google-genai version.
                            self.gemini_session.content = content
                            await self.gemini_session.send_client_content()
                            logging.info("Sent image to Gemini")
                        except Exception as e:
                            logging.error(f"Error sending image to Gemini: {e}")
                    # Advance the timer even if capture failed, to avoid a
                    # tight retry loop against a broken camera.
                    last_capture_time = current_time
                await asyncio.sleep(1)
        except Exception as e:
            logging.error(f"Exception in stream_media_to_gemini: {e}")

    async def send_text_input_to_gemini(self, text):
        """Send one user text message to the active Gemini session.

        Silently returns when `text` is empty or no session exists yet.
        """
        if not text or not self.gemini_session:
            return
        try:
            text_part = Part.from_text(text)
            content = Content(role="user", parts=[text_part])
            self.gemini_session.content = content
            await self.gemini_session.send_client_content()
            logging.info(f"Sent text to Gemini: {text}")
        except Exception as e:
            logging.error(f"Error sending text to Gemini: {e}")

    async def process_text_input(self):
        """Read console lines and forward them to Gemini until 'exit'."""
        logging.info("Starting text input processing...")
        try:
            loop = asyncio.get_running_loop()
            while self.is_running:
                # BUG FIX: a bare input() call would block the entire event
                # loop, starving the media/response/TTS tasks; read off-loop.
                user_input = await loop.run_in_executor(
                    None, input, "Enter text (or 'exit' to quit): ")
                if user_input.lower() == 'exit':
                    self.is_running = False
                    break
                await self.send_text_input_to_gemini(user_input)
        except Exception as e:
            logging.error(f"Exception in process_text_input: {e}")
            self.is_running = False

    async def process_gemini_responses(self):
        """Consume streamed Gemini responses; log text and enqueue it for TTS."""
        logging.info("Starting Gemini response processing...")
        try:
            async for response in self.gemini_session:
                if not self.is_running:
                    break
                try:
                    if hasattr(response, 'text'):
                        text = response.text
                        if text:
                            logging.info(f"Gemini response: {text}")
                            if self.use_speech:
                                self.text_to_speech_queue.put(text)
                except Exception as e:
                    logging.error(f"Error processing Gemini response: {e}")
        except Exception as e:
            logging.error(f"Exception in process_gemini_responses: {e}")
            self.is_running = False

    async def text_to_speech_processor(self):
        """Drain the TTS queue, synthesizing each queued text in order.

        Keeps running after shutdown is requested until the queue is empty,
        so queued replies are still spoken.
        """
        logging.info("Starting text-to-speech processor...")
        if not self.use_speech:
            return
        try:
            while self.is_running or not self.text_to_speech_queue.empty():
                if not self.text_to_speech_queue.empty():
                    text = self.text_to_speech_queue.get()
                    await self._synthesize_speech(text)
                else:
                    await asyncio.sleep(0.5)
        except Exception as e:
            logging.error(f"Exception in text_to_speech_processor: {e}")

    async def _synthesize_speech(self, text):
        """Synthesize `text` to a uniquely named temp WAV and play it."""
        if not self.use_speech:
            return
        try:
            # Unique filename so concurrent/successive clips never collide.
            file_path = os.path.join(self.output_path, f"speech_{uuid.uuid4()}.wav")
            audio_config = AudioConfig(filename=file_path)
            synthesizer = SpeechSynthesizer(speech_config=self.speech_config, audio_config=audio_config)
            # BUG FIX: .get() blocks until synthesis completes; run it off the
            # event loop so other tasks keep making progress.
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(
                None, lambda: synthesizer.speak_text_async(text).get())
            if result.reason == ResultReason.SynthesizingAudioCompleted:
                logging.info(f"Speech synthesized and saved to {file_path}")
                await self._play_audio(file_path)
            elif result.reason == ResultReason.Canceled:
                cancellation = result.cancellation_details
                logging.error(f"Speech synthesis canceled: {cancellation.reason}")
                if cancellation.reason == CancellationReason.Error:
                    logging.error(f"Error details: {cancellation.error_details}")
        except Exception as e:
            logging.error(f"Error in speech synthesis: {e}")

    async def _play_audio(self, file_path):
        """Play a WAV file on the default output device, then delete it."""
        loop = asyncio.get_running_loop()

        def _blocking_play():
            # Runs in a worker thread: sd.wait() blocks for the whole clip.
            data, fs = sf.read(file_path)
            sd.play(data, fs)
            sd.wait()

        try:
            # BUG FIX: playback previously blocked the event loop for the
            # full duration of the clip; dispatch it to the executor.
            await loop.run_in_executor(None, _blocking_play)
            # Clean up the temp file (best-effort).
            try:
                os.remove(file_path)
            except Exception as e:
                logging.warning(f"Failed to remove temp audio file {file_path}: {e}")
        except Exception as e:
            logging.error(f"Error playing audio: {e}")

    async def run_main_loop(self):
        """Open the Gemini session, send the system prompt, and run all tasks.

        Spawns the media-streaming, text-input, response-processing and
        (optionally) TTS tasks and waits for them; on exit, cancels any
        stragglers and releases the camera and session.
        """
        try:
            logging.info("Initializing Gemini session...")
            # NOTE(review): start_session_async / session.content /
            # send_client_content follow the API surface assumed throughout
            # this file — confirm against the installed SDK version.
            self.gemini_session = await self.model.start_session_async()
            try:
                logging.info("Sending system prompt to Gemini...")
                system_content = Content(
                    role="user",
                    parts=[Part(text=MEDICAL_ASSISTANT_SYSTEM_PROMPT)]
                )
                self.gemini_session.content = system_content
                await self.gemini_session.send_client_content()
                logging.info("System prompt sent successfully.")
            except Exception as e:
                # Without the system prompt the safety constraints are not in
                # force, so abort the session rather than continue.
                logging.error(f"Failed to send system prompt: {e}", exc_info=True)
                self.is_running = False
                return
            tasks = []
            try:
                logging.info("Creating async tasks for Gemini interaction...")
                media_stream_task = asyncio.create_task(self.stream_media_to_gemini(), name="stream_media_to_gemini")
                tasks.append(media_stream_task)
                text_input_task = asyncio.create_task(self.process_text_input(), name="process_text_input")
                tasks.append(text_input_task)
                gemini_response_task = asyncio.create_task(self.process_gemini_responses(), name="process_gemini_responses")
                tasks.append(gemini_response_task)
                if self.use_speech:
                    tts_task = asyncio.create_task(self.text_to_speech_processor(), name="text_to_speech_processor")
                    tasks.append(tts_task)
                await asyncio.gather(*tasks)
            except asyncio.CancelledError:
                logging.info("Main loop tasks cancelled")
            except Exception as e:
                logging.error(f"Exception in main loop tasks: {e}")
            finally:
                # Cancel any tasks still running and await their teardown.
                for task in tasks:
                    if not task.done():
                        task.cancel()
                        try:
                            await task
                        except asyncio.CancelledError:
                            logging.info(f"Task {task.get_name()} cancelled")
        except Exception as e:
            logging.error(f"Exception in run_main_loop: {e}")
        finally:
            # Release hardware/network resources regardless of how we exited.
            logging.info("Cleaning up resources...")
            if self.camera is not None and self.use_camera:
                self.camera.release()
            if self.gemini_session is not None:
                await self.gemini_session.close()

    def cleanup(self):
        """Synchronous resource cleanup (camera release) for external callers."""
        logging.info("Cleaning up resources...")
        if self.camera is not None and self.use_camera:
            self.camera.release()
# Main function
def main():
    """Entry point: parse CLI options, build the interaction loop, run it.

    Returns:
        int: 0 on a normal shutdown, 1 when the Gemini key is missing or an
        unhandled exception escapes the interaction loop.
    """
    parser = argparse.ArgumentParser(description="Medical Assistant using Gemini and Azure Speech")
    parser.add_argument("--gemini-api-key", help="Gemini API Key", default=os.environ.get("GEMINI_API_KEY"))
    parser.add_argument("--azure-speech-key", help="Azure Speech API Key", default=os.environ.get("AZURE_SPEECH_KEY"))
    parser.add_argument("--azure-speech-region", help="Azure Speech Region", default=os.environ.get("AZURE_SPEECH_REGION", "eastus"))
    parser.add_argument("--no-camera", help="Disable camera usage", action="store_true")
    parser.add_argument("--no-speech", help="Disable speech synthesis", action="store_true")
    opts = parser.parse_args()

    # The Gemini key is the only hard requirement.
    if not opts.gemini_api_key:
        print("Error: Gemini API Key is required. Provide it via --gemini-api-key or GEMINI_API_KEY environment variable.")
        return 1

    # A missing Azure key degrades gracefully to text-only operation
    # instead of being treated as a fatal configuration error.
    if not opts.azure_speech_key:
        opts.no_speech = True
        logging.warning("No Azure Speech Key provided. Speech synthesis will be disabled.")

    try:
        runner = GeminiInteractionLoop(
            gemini_api_key=opts.gemini_api_key,
            azure_speech_key=opts.azure_speech_key,
            azure_speech_region=opts.azure_speech_region,
            use_camera=not opts.no_camera,
            use_speech=not opts.no_speech,
        )
        asyncio.run(runner.run_main_loop())
    except KeyboardInterrupt:
        logging.info("Keyboard interrupt received. Shutting down...")
    except Exception as e:
        logging.error(f"Unhandled exception: {e}", exc_info=True)
        return 1
    return 0
# Script entry point: process exit status mirrors main()'s return code.
if __name__ == "__main__":
    sys.exit(main())