Spaces:
Runtime error
Runtime error
| import os | |
| import asyncio | |
| import time | |
| import numpy as np | |
| import cv2 | |
| import gradio as gr | |
| from fastrtc import Stream, AsyncAudioVideoStreamHandler, get_cloudflare_turn_credentials_async, ReplyOnPause | |
| from google import genai | |
| from google.genai import types | |
| # Environment variable for API key | |
| API_KEY = os.getenv("GEMINI_API_KEY", "") | |
| class EnhancedScreenAssistantHandler(AsyncAudioVideoStreamHandler): | |
| """Enhanced real-time screen assistant with voice activity detection""" | |
| def __init__(self): | |
| super().__init__(input_audio_type="mono", output_sample_rate=24000, input_sample_rate=16000) | |
| self.session = None | |
| self.last_frame_time = 0 | |
| self.audio_queue = asyncio.Queue() | |
| self.text_queue = asyncio.Queue() | |
| self.connected = False | |
| self.frame_interval = 1.0 # Send one frame per second | |
| self.conversation_history = [] | |
| async def start_up(self): | |
| """Initialize Google GenAI Live session with enhanced configuration""" | |
| try: | |
| if not API_KEY: | |
| print("β No GEMINI_API_KEY found in environment") | |
| return | |
| # Initialize Google GenAI client with alpha API access | |
| client = genai.Client(api_key=API_KEY, http_options={"api_version": "v1alpha"}) | |
| # Enhanced configuration for live session | |
| config = { | |
| "response_modalities": ["AUDIO", "TEXT"], | |
| "input_audio_transcription": {"model": "latest"}, | |
| "output_audio_transcription": {"model": "latest"}, | |
| "system_instruction": ( | |
| "You are an expert real-time screen assistant. You can see the user's screen " | |
| "and hear their voice. Provide clear, actionable guidance based on what you observe. " | |
| "Be proactive - if you see the user struggling or notice something important, " | |
| "offer helpful suggestions even without being asked. Keep responses concise but thorough. " | |
| "When giving instructions, be specific about what to click, where to look, " | |
| "and what to expect next." | |
| ), | |
| "generation_config": {"response_mime_type": "text/plain", "temperature": 0.7, "max_output_tokens": 512}, | |
| } | |
| # Connect to Live API | |
| self.session = await client.aio.live.connect(model="gemini-2.0-flash-live-preview", config=config) | |
| self.connected = True | |
| print("β Connected to Google GenAI Live API with enhanced configuration") | |
| # Start background tasks with proper management | |
| self.background_tasks = set() | |
| response_task = asyncio.create_task(self._handle_responses()) | |
| context_task = asyncio.create_task(self._periodic_context_update()) | |
| self.background_tasks.add(response_task) | |
| self.background_tasks.add(context_task) | |
| response_task.add_done_callback(self.background_tasks.discard) | |
| context_task.add_done_callback(self.background_tasks.discard) | |
| except Exception as e: | |
| print(f"β Failed to connect to GenAI: {e}") | |
| self.connected = False | |
| async def _handle_responses(self): | |
| """Handle incoming responses from AI with enhanced processing""" | |
| try: | |
| current_text = "" | |
| async for msg in self.session.receive(): | |
| if msg.data: # Audio response from AI | |
| # Convert raw PCM bytes to numpy array for FastRTC | |
| audio_array = np.frombuffer(msg.data, dtype=np.int16) | |
| if len(audio_array) > 0: | |
| audio_array = audio_array.reshape(1, -1) # Shape: (1, N) | |
| await self.audio_queue.put(audio_array) | |
| if msg.text: # Text response from AI | |
| current_text += msg.text | |
| print(f"π€ AI: {msg.text}") | |
| # Add to conversation history when response is complete | |
| if msg.text.endswith((".", "!", "?", "\n")): | |
| self.conversation_history.append({"role": "assistant", "content": current_text.strip(), "timestamp": time.time()}) | |
| current_text = "" | |
| # Keep conversation history manageable | |
| if len(self.conversation_history) > 20: | |
| self.conversation_history = self.conversation_history[-15:] | |
| await self.text_queue.put(msg.text) | |
| except Exception as e: | |
| print(f"β Error handling AI responses: {e}") | |
| async def _periodic_context_update(self): | |
| """Periodically send context updates to maintain session state""" | |
| while self.connected: | |
| await asyncio.sleep(30) # Update every 30 seconds | |
| if self.session and len(self.conversation_history) > 0: | |
| try: | |
| # Send a subtle context maintenance message | |
| context_msg = "Continue monitoring and providing assistance as needed." | |
| await self.session.send_realtime_input(text=context_msg) | |
| except Exception as e: | |
| print(f"β οΈ Context update failed: {e}") | |
| async def receive(self, frame: tuple[int, np.ndarray]): | |
| """Handle incoming audio with voice activity detection""" | |
| if not self.connected or not self.session: | |
| return | |
| try: | |
| _, audio_np = frame | |
| # Basic voice activity detection | |
| audio_level = np.abs(audio_np).mean() | |
| if audio_level > 0.01: # Threshold for voice activity | |
| audio_bytes = audio_np.tobytes() | |
| # Send audio to Google GenAI Live API | |
| await self.session.send_realtime_input(media=types.Blob(data=audio_bytes, mime_type="audio/pcm;rate=16000")) | |
| except Exception as e: | |
| print(f"β Error processing audio: {e}") | |
| async def video_receive(self, frame: np.ndarray): | |
| """Handle incoming video frames with intelligent frame selection""" | |
| if not self.connected or not self.session: | |
| return | |
| try: | |
| current_time = time.time() | |
| # Adaptive frame rate based on activity | |
| # Send frames more frequently if there's likely activity | |
| frame_diff_threshold = 0.1 | |
| if hasattr(self, "last_frame"): | |
| frame_diff = np.abs(frame.astype(float) - self.last_frame.astype(float)).mean() | |
| if frame_diff > frame_diff_threshold: | |
| # More activity detected, reduce interval | |
| effective_interval = self.frame_interval * 0.5 | |
| else: | |
| effective_interval = self.frame_interval | |
| else: | |
| effective_interval = self.frame_interval | |
| if current_time - self.last_frame_time < effective_interval: | |
| return | |
| self.last_frame_time = current_time | |
| self.last_frame = frame.copy() | |
| # Resize frame for efficiency while maintaining quality | |
| height, width = frame.shape[:2] | |
| if width > 1280: | |
| scale = 1280 / width | |
| new_width = 1280 | |
| new_height = int(height * scale) | |
| frame = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_AREA) | |
| # Encode frame as JPEG with optimized quality | |
| success, jpg_bytes = cv2.imencode( | |
| ".jpg", | |
| frame, | |
| [cv2.IMWRITE_JPEG_QUALITY, 75], # Balanced quality/size | |
| ) | |
| if not success: | |
| return | |
| # Send frame to Google GenAI | |
| await self.session.send_realtime_input(media=types.Blob(data=jpg_bytes.tobytes(), mime_type="image/jpeg")) | |
| print(f"πΈ Sent frame ({frame.shape[1]}x{frame.shape[0]}, {len(jpg_bytes)} bytes)") | |
| except Exception as e: | |
| print(f"β Error processing video frame: {e}") | |
| async def emit(self): | |
| """Provide audio output back to user with queue management""" | |
| try: | |
| audio_chunk = self.audio_queue.get_nowait() | |
| return (24000, audio_chunk) | |
| except asyncio.QueueEmpty: | |
| return None | |
| async def get_latest_text(self): | |
| """Get latest text response for UI updates""" | |
| try: | |
| text = self.text_queue.get_nowait() | |
| return text | |
| except asyncio.QueueEmpty: | |
| return None | |
| async def shutdown(self): | |
| """Enhanced cleanup with proper resource management""" | |
| self.connected = False | |
| if self.session: | |
| try: | |
| # Send goodbye message | |
| await self.session.send_realtime_input(text="Session ending. Thank you!") | |
| await asyncio.sleep(0.5) # Brief delay for message to send | |
| await self.session.close() | |
| print("π΄ Cleanly disconnected from GenAI Live API") | |
| except Exception as e: | |
| print(f"β οΈ Error during shutdown: {e}") | |
| # Cancel all background tasks properly | |
| if hasattr(self, "background_tasks"): | |
| for task in self.background_tasks.copy(): | |
| if not task.done(): | |
| task.cancel() | |
| # Wait for all tasks to complete or be cancelled | |
| if self.background_tasks: | |
| await asyncio.gather(*self.background_tasks, return_exceptions=True) | |
| self.background_tasks.clear() | |
| # Clear queues | |
| while not self.audio_queue.empty(): | |
| try: | |
| self.audio_queue.get_nowait() | |
| except asyncio.QueueEmpty: | |
| break | |
| while not self.text_queue.empty(): | |
| try: | |
| self.text_queue.get_nowait() | |
| except asyncio.QueueEmpty: | |
| break | |
| self.session = None | |
| self.conversation_history = [] | |
| # Global state management | |
| app_state = {"stream": None, "handler": None, "connected": False, "screen_sharing": False} | |
| def initialize_stream(): | |
| """Initialize the FastRTC stream with enhanced configuration""" | |
| try: | |
| # Create enhanced handler | |
| handler = EnhancedScreenAssistantHandler() | |
| app_state["handler"] = handler | |
| # Create stream with optimized settings for HF Spaces | |
| stream = Stream( | |
| handler=ReplyOnPause(handler), # Add voice activity detection | |
| modality="audio-video", | |
| mode="send-receive", | |
| rtc_configuration=get_cloudflare_turn_credentials_async, | |
| time_limit=600, # 10 minute session limit | |
| ui_args={ | |
| "audio_controls": True, | |
| "video_controls": True, | |
| }, | |
| ) | |
| app_state["stream"] = stream | |
| return stream | |
| except Exception as e: | |
| print(f"β Error initializing stream: {e}") | |
| return None | |
| def handle_connect(): | |
| """Enhanced connection handler""" | |
| if not API_KEY: | |
| return "β Please set GEMINI_API_KEY environment variable" | |
| if app_state["connected"]: | |
| return "β Already connected - session is active" | |
| if app_state["handler"]: | |
| app_state["connected"] = True | |
| return "β Connecting to AI... Please allow microphone and camera permissions" | |
| return "β Stream not initialized - please refresh the page" | |
| def handle_screen_share(): | |
| """Handle screen sharing toggle""" | |
| app_state["screen_sharing"] = not app_state["screen_sharing"] | |
| if app_state["screen_sharing"]: | |
| return "π₯οΈ Screen sharing started - AI can now see your screen" | |
| else: | |
| return "π± Switched back to camera view" | |
| async def handle_disconnect_async(): | |
| """Async enhanced disconnection handler""" | |
| if app_state["handler"] and app_state["connected"]: | |
| try: | |
| await app_state["handler"].shutdown() | |
| app_state["connected"] = False | |
| app_state["screen_sharing"] = False | |
| app_state["handler"] = None | |
| return "π΄ Disconnected from AI assistant" | |
| except Exception as e: | |
| return f"β οΈ Disconnect error: {e}" | |
| return "Already disconnected" | |
| def handle_disconnect(): | |
| """Sync wrapper for enhanced disconnection handler""" | |
| # Create task and store reference for proper cleanup | |
| if not hasattr(app_state, "disconnect_task") or app_state.get("disconnect_task", {}).done(): | |
| import asyncio | |
| app_state["disconnect_task"] = asyncio.create_task(handle_disconnect_async()) | |
| app_state["connected"] = False # Immediately mark as disconnected | |
| app_state["screen_sharing"] = False | |
| return "π Disconnecting... Please wait..." | |
| # Enhanced JavaScript for screen sharing | |
| enhanced_screen_share_js = """ | |
| async function toggleScreenShare() { | |
| try { | |
| const videoElements = document.querySelectorAll('video'); | |
| const webrtcVideo = Array.from(videoElements).find(video => | |
| video.srcObject && video.srcObject.getVideoTracks().length > 0 | |
| ); | |
| if (!webrtcVideo) { | |
| return "β Could not find video element"; | |
| } | |
| const currentTrack = webrtcVideo.srcObject.getVideoTracks()[0]; | |
| const isScreenShare = currentTrack && currentTrack.label.includes('screen'); | |
| if (isScreenShare) { | |
| // Switch back to camera | |
| const cameraStream = await navigator.mediaDevices.getUserMedia({ | |
| video: { width: 640, height: 480 }, | |
| audio: false | |
| }); | |
| const videoTrack = cameraStream.getVideoTracks()[0]; | |
| webrtcVideo.srcObject.removeTrack(currentTrack); | |
| webrtcVideo.srcObject.addTrack(videoTrack); | |
| currentTrack.stop(); | |
| return "π± Switched to camera view"; | |
| } else { | |
| // Switch to screen share | |
| const screenStream = await navigator.mediaDevices.getDisplayMedia({ | |
| video: { | |
| mediaSource: 'screen', | |
| width: { ideal: 1280, max: 1920 }, | |
| height: { ideal: 720, max: 1080 }, | |
| frameRate: { ideal: 2, max: 5 } // Low frame rate for efficiency | |
| }, | |
| audio: false | |
| }); | |
| const videoTrack = screenStream.getVideoTracks()[0]; | |
| webrtcVideo.srcObject.removeTrack(currentTrack); | |
| webrtcVideo.srcObject.addTrack(videoTrack); | |
| // Handle when screen sharing ends | |
| videoTrack.onended = () => { | |
| console.log('Screen sharing ended by user'); | |
| // Automatically switch back to camera | |
| navigator.mediaDevices.getUserMedia({video: true, audio: false}) | |
| .then(cameraStream => { | |
| const cameraTrack = cameraStream.getVideoTracks()[0]; | |
| webrtcVideo.srcObject.addTrack(cameraTrack); | |
| }); | |
| }; | |
| currentTrack.stop(); | |
| return "π₯οΈ Screen sharing active"; | |
| } | |
| } catch (error) { | |
| console.error('Screen sharing error:', error); | |
| if (error.name === 'NotAllowedError') { | |
| return "β Screen sharing permission denied"; | |
| } else if (error.name === 'NotFoundError') { | |
| return "β No screen available to share"; | |
| } else { | |
| return `β Error: ${error.message}`; | |
| } | |
| } | |
| } | |
| return toggleScreenShare(); | |
| """ | |
| def create_main_interface(): | |
| """Create the enhanced main interface""" | |
| # Initialize stream | |
| stream = initialize_stream() | |
| with gr.Blocks( | |
| title="Enhanced Real-Time Screen Assistant", | |
| theme=gr.themes.Soft(), | |
| css=""" | |
| .status-connected { background: linear-gradient(90deg, #4CAF50, #45a049); color: white; } | |
| .status-disconnected { background: linear-gradient(90deg, #f44336, #da190b); color: white; } | |
| .status-warning { background: linear-gradient(90deg, #ff9800, #f57c00); color: white; } | |
| .control-row { margin: 10px 0; } | |
| .stream-container { border: 2px solid #ddd; border-radius: 10px; padding: 20px; margin: 20px 0; } | |
| """, | |
| ) as demo: | |
| gr.Markdown("# π₯οΈ Enhanced Real-Time Screen Assistant") | |
| gr.Markdown(""" | |
| **Advanced AI assistant with live screen sharing, voice interaction, and real-time guidance** | |
| Powered by Google's Gemini Live API and FastRTC for ultra-low latency communication. | |
| """) | |
| # Status display | |
| status_display = gr.Textbox( | |
| label="π Status", | |
| value="Ready to connect - Click Connect to start your AI session", | |
| interactive=False, | |
| elem_classes=["status-disconnected"], | |
| ) | |
| # Control buttons | |
| with gr.Row(elem_classes=["control-row"]): | |
| connect_btn = gr.Button("π Connect to AI", variant="primary", size="lg") | |
| screen_btn = gr.Button("π₯οΈ Toggle Screen Share", variant="secondary", size="lg") | |
| disconnect_btn = gr.Button("π΄ Disconnect", variant="stop", size="lg") | |
| # Stream container | |
| if stream and stream.ui: | |
| with gr.Group(elem_classes=["stream-container"]): | |
| gr.Markdown("### π‘ Live Stream") | |
| stream_interface = stream.ui | |
| else: | |
| stream_interface = gr.HTML("<div>β οΈ Stream initialization failed - check console for errors</div>") | |
| # Usage instructions | |
| with gr.Accordion("π How to Use This Assistant", open=True): | |
| gr.Markdown(""" | |
| **Getting Started:** | |
| 1. **Connect**: Click "Connect to AI" to establish the AI session | |
| 2. **Permissions**: Allow microphone and camera access in your browser | |
| 3. **Screen Share**: Click "Toggle Screen Share" to let the AI see your screen | |
| 4. **Interact**: Simply speak naturally - the AI will respond with voice and can see your screen | |
| **What the AI can help with:** | |
| - π₯οΈ **Software tutorials**: "Show me how to use this feature" | |
| - π§ **Troubleshooting**: "Why isn't this working?" | |
| - π **Data analysis**: "Help me understand this chart" | |
| - π¨ **Design feedback**: "How can I improve this layout?" | |
| - π **Writing assistance**: "Help me edit this document" | |
| - π **Web navigation**: "Guide me through this website" | |
| **Voice Commands:** | |
| - "What am I looking at?" | |
| - "What should I do next?" | |
| - "Explain this to me" | |
| - "Help me fix this error" | |
| - "Is this the right approach?" | |
| """) | |
| # Advanced features | |
| with gr.Accordion("βοΈ Advanced Features", open=False): | |
| gr.Markdown(""" | |
| **Technical Capabilities:** | |
| - ποΈ **Voice Activity Detection**: AI responds when you finish speaking | |
| - πΈ **Intelligent Frame Sampling**: Optimized screen capture (1-2 FPS) | |
| - π§ **Context Awareness**: AI remembers your conversation history | |
| - π **Adaptive Quality**: Automatically adjusts based on connection | |
| - β‘ **Ultra-Low Latency**: Typical response time under 500ms | |
| **Privacy & Security:** | |
| - π All data encrypted in transit (WebRTC + TLS) | |
| - π Processing by Google's secure AI infrastructure | |
| - π« No permanent storage of your screen or voice data | |
| - π€ Each session is completely isolated and private | |
| **Optimization for Hugging Face Spaces:** | |
| - βοΈ Cloudflare TURN servers for reliable connectivity | |
| - π§ Automatic resource management and cleanup | |
| - β±οΈ Session timeout prot""") | |
| # Wire up the interface | |
| connect_btn.click(fn=handle_connect, outputs=[status_display]) | |
| screen_btn.click(fn=handle_screen_share, outputs=[status_display], _js=enhanced_screen_share_js) | |
| disconnect_btn.click(fn=handle_disconnect, outputs=[status_display]) | |
| return demo | |
| # Main execution | |
| if __name__ == "__main__": | |
| print("π₯οΈ Enhanced Real-Time Screen Assistant") | |
| print("=" * 55) | |
| if not API_KEY: | |
| print("β οΈ CRITICAL: No GEMINI_API_KEY environment variable found!") | |
| print("Please set your Google AI API key:") | |
| print("export GEMINI_API_KEY='your-api-key-here'") | |
| print("\nGet your API key at: https://makersuite.google.com/app/apikey") | |
| else: | |
| print(f"β API key configured (length: {len(API_KEY)})") | |
| print("\nπ§ Initializing enhanced components...") | |
| print("- FastRTC with voice activity detection") | |
| print("- Google GenAI Live API integration") | |
| print("- Cloudflare TURN server configuration") | |
| print("- Enhanced screen sharing capabilities") | |
| try: | |
| demo = create_main_interface() | |
| print("\nπ Launching enhanced interface...") | |
| demo.launch(server_name="0.0.0.0", server_port=7860, share=False, show_error=True, enable_queue=True) | |
| except Exception as e: | |
| print(f"\nβ Failed to launch: {e}") | |
| print("Check that all dependencies are installed:") | |
| print("pip install -r requirements.txt") |