Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3
"""
STT GPU Service for HuggingFace Spaces - Pure Gradio Implementation
GPU-accelerated Speech-to-Text microservice eliminating Streamlit iframe barriers
Version: 1.2.6
"""
# Standard library imports (alphabetical).
import base64
import logging
import os
import tempfile
import time
from typing import Optional, Dict, Any

# Fix OpenMP threading issue.
# NOTE: this must be set BEFORE torch is imported, so it sits between the
# stdlib and third-party import groups on purpose.
os.environ['OMP_NUM_THREADS'] = '1'

# Third-party imports.
import torch
import whisper
import gradio as gr
import numpy as np
from pydub import AudioSegment
# Debug support for HuggingFace Dev Mode with Cursor/VSCode.
DEBUG_MODE = os.getenv("DEBUG_MODE", "false").lower() == "true"

if DEBUG_MODE:
    try:
        import debugpy

        # Listen on all interfaces, port 5679 (different from voiceCal to avoid conflicts)
        debugpy.listen(("0.0.0.0", 5679))
        for _msg in (
            "π STT DEBUGPY: Waiting for debugger to attach on port 5679...",
            "π STT DEBUGPY: Connect from Cursor/VSCode using 'Python: Remote Attach'",
            "π STT DEBUGPY: Host: <stt-space-url>, Port: 5679",
        ):
            print(_msg)
        # Uncomment the next line if you want to wait for debugger before continuing
        # debugpy.wait_for_client()
        print("π STT DEBUGPY: Debug server started (not waiting for attach)")
    except ImportError:
        print("β οΈ STT DEBUGPY: debugpy not installed, skipping debug setup")
    except Exception as e:
        print(f"β οΈ STT DEBUGPY: Failed to setup debug server: {e}")
# Configure logging
logging.basicConfig(level=logging.INFO)
# Module-level logger shared by STTService and the Gradio callbacks below.
logger = logging.getLogger(__name__)
class STTService:
    """GPU-accelerated Speech-to-Text service.

    Wraps an OpenAI Whisper model and exposes three entry points:
    - transcribe_audio:      bytes -> result dict (core path)
    - transcribe_audio_file: file path -> formatted string (Gradio file tab)
    - transcribe_base64:     base64 string -> formatted string (WebRTC path)
    """

    def __init__(self):
        # Model is loaded eagerly so the first request pays no cold-start cost.
        self.model = None
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model_size = os.getenv("WHISPER_MODEL_SIZE", "base")
        self.language = os.getenv("DEFAULT_LANGUAGE", "en")
        logger.info(f"π€ Initializing STT Service on device: {self.device}")
        self.load_model()

    def load_model(self):
        """Load Whisper model with GPU acceleration.

        Raises:
            Exception: re-raised from whisper.load_model so callers
                (e.g. the model-switch UI callback) can report failures.
        """
        try:
            logger.info(f"Loading Whisper model: {self.model_size}")
            self.model = whisper.load_model(self.model_size, device=self.device)
            logger.info(f"β Whisper model loaded successfully on {self.device}")
        except Exception as e:
            logger.error(f"Failed to load Whisper model: {e}")
            raise

    def transcribe_audio_file(self, audio_file_path: str, language: Optional[str] = None) -> str:
        """Transcribe an audio file; returns a formatted string for Gradio.

        Args:
            audio_file_path: path to the uploaded audio file (any pydub-readable format).
            language: ISO language code; falls back to the service default when None.

        Returns:
            A human-readable status string (never raises).
        """
        try:
            if not audio_file_path:
                return "β No audio file provided"
            with open(audio_file_path, 'rb') as f:
                audio_data = f.read()
            result = self.transcribe_audio(audio_data, language)
            if result["success"]:
                return f"β Transcription ({result['processing_time']:.2f}s on {result['device']}): {result['transcription']}"
            else:
                return f"β Error: {result['error']}"
        except Exception as e:
            return f"β File processing error: {str(e)}"

    def transcribe_audio(self, audio_data: bytes, language: Optional[str] = None) -> Dict[str, Any]:
        """Core transcription method.

        Writes the raw bytes to a temp .webm file, converts to .wav via pydub,
        then runs Whisper on the wav.

        Returns:
            On success: {"success": True, "transcription", "language",
            "processing_time", "device", "model_size"}.
            On failure: {"success": False, "error", "processing_time", "device"}.
        """
        start_time = time.time()
        temp_path: Optional[str] = None
        wav_path: Optional[str] = None
        try:
            lang = language or self.language
            # Create temporary file for audio processing.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.webm') as temp_file:
                temp_file.write(audio_data)
                temp_path = temp_file.name
            # Convert audio using pydub (ffmpeg under the hood).
            audio_segment = AudioSegment.from_file(temp_path)
            wav_path = temp_path.replace('.webm', '.wav')
            audio_segment.export(wav_path, format="wav")
            # Transcribe with Whisper.
            logger.info(f"Transcribing: {len(audio_data)} bytes, language: {lang}")
            result = self.model.transcribe(
                wav_path,
                language=lang,
                fp16=torch.cuda.is_available(),
                verbose=False
            )
            processing_time = time.time() - start_time
            transcription = result.get("text", "").strip()
            logger.info(f"β Transcribed in {processing_time:.2f}s: '{transcription}'")
            return {
                "success": True,
                "transcription": transcription,
                "language": lang,
                "processing_time": processing_time,
                "device": self.device,
                "model_size": self.model_size
            }
        except Exception as e:
            processing_time = time.time() - start_time
            logger.error(f"β Transcription failed: {e}")
            return {
                "success": False,
                "error": str(e),
                "processing_time": processing_time,
                "device": self.device
            }
        finally:
            # Always remove scratch files, on success AND failure. The previous
            # version duplicated the unlink calls in both paths and leaked the
            # .wav file whenever unlinking the .webm raised first.
            for path in (temp_path, wav_path):
                if path and os.path.exists(path):
                    try:
                        os.unlink(path)
                    except OSError:
                        # Best-effort cleanup; never mask the real result.
                        pass

    def transcribe_base64(self, audio_base64: str, language: Optional[str] = None) -> str:
        """Transcribe base64 audio - optimized for WebRTC.

        Accepts either a bare base64 string or a full data URL
        ("data:audio/...;base64,DATA"). Strips whitespace, validates the
        alphabet, re-pads to a multiple of 4, decodes, and delegates to
        transcribe_audio. Always returns a formatted string (never raises).
        """
        try:
            if not audio_base64:
                return "β No audio data provided"
            # Log original data for debugging.
            logger.info(f"π STT BASE64 DEBUG - Original length: {len(audio_base64)}")
            logger.info(f"π STT BASE64 DEBUG - Starts with data URL: {audio_base64.startswith('data:audio')}")
            logger.info(f"π STT BASE64 DEBUG - First 50 chars: {audio_base64[:50]}")
            # Keep the original around for error diagnostics.
            original_base64 = audio_base64
            # Strip a data-URL prefix if present.
            if audio_base64.startswith('data:audio'):
                parts = audio_base64.split(',')
                if len(parts) != 2:
                    return f"β Invalid data URL format: expected 'data:audio/...;base64,DATA' but got {len(parts)} parts"
                audio_base64 = parts[1]
                logger.info(f"π STT BASE64 DEBUG - Extracted base64 part: {len(audio_base64)} chars")
            # Clean up base64 string - remove whitespace and newlines.
            clean_base64 = audio_base64.strip().replace('\n', '').replace('\r', '').replace(' ', '')
            if len(clean_base64) != len(audio_base64):
                logger.info(f"π STT BASE64 DEBUG - Removed whitespace: {len(audio_base64)} -> {len(clean_base64)}")
                audio_base64 = clean_base64
            # Validate base64 characters before padding.
            import string
            valid_chars = set(string.ascii_letters + string.digits + '+/=')
            invalid_chars = [c for c in audio_base64 if c not in valid_chars]
            if invalid_chars:
                logger.error(f"π STT BASE64 DEBUG - Invalid characters found: {set(invalid_chars)}")
                logger.error(f"π STT BASE64 DEBUG - Sample invalid chars at positions: {[(i, c) for i, c in enumerate(audio_base64) if c not in valid_chars][:10]}")
                return f"β Invalid base64 characters found: {set(invalid_chars)} - check audio encoding"
            # Add padding if needed (base64 must be a multiple of 4 characters).
            padding_needed = -len(audio_base64) % 4
            if padding_needed:
                audio_base64 += '=' * padding_needed
                logger.info(f"π§ Added {padding_needed} padding characters to base64 data")
            # Final validation before decode.
            logger.info(f"π STT BASE64 DEBUG - Final base64 length: {len(audio_base64)}")
            logger.info(f"π STT BASE64 DEBUG - Final last 20 chars: {audio_base64[-20:]}")
            try:
                audio_data = base64.b64decode(audio_base64, validate=True)
                logger.info(f"π STT BASE64 DEBUG - Decode successful: {len(audio_data)} bytes")
            except Exception as e:
                logger.error(f"π STT BASE64 DEBUG - Decode failed: {str(e)}")
                logger.error(f"π STT BASE64 DEBUG - Original data (first 200): {original_base64[:200]}")
                logger.error(f"π STT BASE64 DEBUG - Final data (first 200): {audio_base64[:200]}")
                return f"β Invalid base64 data: {str(e)} - check voiceCal audio conversion"
            # Check if data looks like valid audio (but allow small test data for demo).
            if len(audio_data) < 50:
                return "β Audio data too small"
            # Handle test/demo data gracefully.
            if len(audio_data) < 1000:
                return f"β Demo transcription: 'Test audio data received ({len(audio_data)} bytes)'"
            result = self.transcribe_audio(audio_data, language)
            if result["success"]:
                return f"β {result['transcription']}"
            else:
                return f"β Audio processing error: {result['error']}"
        except Exception as e:
            return f"β Base64 processing error: {str(e)}"
# Initialize service
# Module-level singleton shared by every Gradio callback. Constructing it
# loads the Whisper model at import time, so startup may take several seconds.
stt_service = STTService()
| # Gradio Interface Functions | |
def gradio_transcribe_file(audio_file, language="en"):
    """File upload transcription"""
    result = stt_service.transcribe_audio_file(audio_file, language)
    # Defaults correspond to "no timing information could be extracted".
    transcription = result
    timing_info = "No timing data available"
    status = "β³ Processing"
    # A successful result is shaped like "β Transcription (2.34s on cuda): text";
    # pull the timing fragment and the bare transcription back out of it.
    if result.startswith("β ") and "(" in result and "s on " in result:
        pieces = result.split("(", 1)
        if len(pieces) > 1:
            tail = pieces[1]
            timing_part = tail.split(")", 1)[0]  # e.g. "2.34s on cuda"
            transcription = tail.split("): ", 1)[1] if "): " in tail else result
            timing_info = f"Processing time: {timing_part}"
            status = "β Success"
        else:
            timing_info = "No timing data"
            status = "β Complete"
    elif result.startswith("β"):
        status = "β Error"
    return transcription, timing_info, status
def gradio_transcribe_memory(audio_base64, language="en", model_size="base"):
    """Memory transcription for WebRTC compatibility - Returns single string for voiceCal.ai compatibility"""
    # Hot-swap the Whisper model when the caller requests a different size.
    if model_size != stt_service.model_size:
        stt_service.model_size = model_size
        try:
            stt_service.load_model()
        except Exception as e:
            return f"β Model switch failed: {str(e)}"
    # transcribe_base64 already returns a fully formatted string, which is the
    # single-string contract voiceCal.ai expects.
    return stt_service.transcribe_base64(audio_base64, language)
def get_system_status():
    """System information"""
    cuda_ok = torch.cuda.is_available()
    gpu_info = "β GPU Available" if cuda_ok else "β CPU Only"
    if cuda_ok:
        # Report the first visible GPU's name and total VRAM in GiB.
        props = torch.cuda.get_device_properties(0)
        gpu_info += f" ({torch.cuda.get_device_name(0)}, {props.total_memory / (1024**3):.1f}GB)"
    return f"""
### π€ STT GPU Service Status
- **Device**: {stt_service.device.upper()}
- **Model**: Whisper {stt_service.model_size}
- **GPU**: {gpu_info}
- **Status**: β Ready for WebRTC integration
- **Purpose**: Eliminate Streamlit iframe communication barriers
"""
# Create Gradio Interface
# Three tabs: file upload, base64/WebRTC memory path, and integration docs.
with gr.Blocks(
    title="STT GPU Service - WebRTC Speech-to-Text",
    theme=gr.themes.Base(),
    css="""
    .gradio-container {max-width: 1200px !important}
    .gr-button-primary {background: linear-gradient(45deg, #FF6B6B, #4ECDC4) !important}
    """
) as demo:
    gr.Markdown("""
    # π€ STT GPU Service - WebRTC Speech-to-Text
    **Pure microservice eliminating Streamlit iframe barriers for VoiceCalendar integration**
    This service provides GPU-accelerated speech-to-text transcription with direct API endpoints,
    removing the complex iframe communication issues from the previous Streamlit approach.
    """)
    # System status (refreshable via the button at the bottom of the page).
    status_md = gr.Markdown(get_system_status())
    with gr.Tab("π΅ File Upload Transcription"):
        gr.Markdown("### Upload and transcribe audio files")
        with gr.Row():
            with gr.Column(scale=2):
                audio_input = gr.Audio(
                    label="Audio File",
                    # "filepath" hands the callback a temp-file path, which
                    # transcribe_audio_file reads as raw bytes.
                    type="filepath",
                    format="wav"
                )
            with gr.Column(scale=1):
                language_dropdown = gr.Dropdown(
                    choices=["en", "es", "fr", "de", "it", "pt", "ru", "ja", "ko", "zh", "auto"],
                    value="en",
                    label="Language",
                    info="Select target language or 'auto' for detection"
                )
        transcribe_file_btn = gr.Button("π€ Transcribe File", variant="primary", size="lg")
        file_result = gr.Textbox(
            label="Transcription Result",
            lines=4,
            placeholder="Transcription will appear here..."
        )
        file_timing = gr.Textbox(
            label="Processing Time",
            lines=1,
            placeholder="Processing timing will appear here..."
        )
        file_status = gr.Textbox(
            label="Status",
            lines=1,
            placeholder="Status will appear here..."
        )
        # gradio_transcribe_file returns (transcription, timing, status).
        transcribe_file_btn.click(
            fn=gradio_transcribe_file,
            inputs=[audio_input, language_dropdown],
            outputs=[file_result, file_timing, file_status]
        )
    with gr.Tab("π WebRTC Memory Transcription"):
        gr.Markdown("""
        ### In-Memory Audio Processing (WebRTC Compatible)
        This interface simulates the WebRTC audio processing pipeline that VoiceCalendar will use.
        Paste base64 encoded audio data to test the transcription service.
        """)
        with gr.Row():
            audio_base64_input = gr.Textbox(
                label="Base64 Audio Data",
                placeholder="Paste base64 encoded WebM/Opus audio data here...\nExample: data:audio/webm;codecs=opus;base64,GkXf...",
                lines=5,
                max_lines=10
            )
            with gr.Column():
                memory_language = gr.Dropdown(
                    choices=["en", "es", "fr", "de", "it", "pt", "ru", "ja", "ko", "zh"],
                    value="en",
                    label="Language"
                )
                model_selector = gr.Dropdown(
                    choices=["tiny", "base", "small", "medium", "large"],
                    value="base",
                    label="Whisper Model",
                    info="Larger models = better accuracy but slower"
                )
        transcribe_memory_btn = gr.Button("π Process WebRTC Audio", variant="primary", size="lg")
        memory_result = gr.Textbox(
            label="WebRTC Transcription Result",
            lines=4,
            placeholder="WebRTC transcription result will appear here..."
        )
        memory_timing = gr.Textbox(
            label="Processing Time",
            lines=1,
            placeholder="Processing timing will appear here..."
        )
        memory_status = gr.Textbox(
            label="Status",
            lines=1,
            placeholder="Status will appear here..."
        )
        # NOTE(review): only memory_result is wired up; gradio_transcribe_memory
        # returns a single string (voiceCal.ai contract), so memory_timing and
        # memory_status are never populated — presumably intentional, confirm.
        transcribe_memory_btn.click(
            fn=gradio_transcribe_memory,
            inputs=[audio_base64_input, memory_language, model_selector],
            outputs=[memory_result]
        )
        # Example data for testing
        gr.Markdown("""
        **Test with sample base64 data:** *(This would be actual WebM audio in production)*
        ```
        data:audio/webm;codecs=opus;base64,GkXfo0OBA...
        ```
        """)
    with gr.Tab("π API Integration"):
        # Static documentation tab — no callbacks, markdown only.
        gr.Markdown("""
        ## VoiceCalendar Integration Guide
        This STT service eliminates the iframe communication barriers by providing direct HTTP endpoints.
        ### Key Advantages:
        β **No iframe/postMessage complexity**
        β **Direct WebRTC β STT data flow**
        β **GPU-accelerated processing**
        β **Scalable microservice architecture**
        β **Native unmute.sh methodology support**
        ### API Endpoints:
        **Health Check:**
        ```bash
        GET /api/health
        # Returns service status and GPU info
        ```
        **Transcribe Audio:**
        ```bash
        POST /api/transcribe
        Content-Type: application/json
        {
            "audio_base64": "base64_encoded_webm_audio",
            "language": "en",
            "model_size": "base"
        }
        ```
        ### JavaScript WebRTC Integration:
        ```javascript
        // Direct STT API call - no iframe complexity!
        async function transcribeWebRTCAudio(audioBlob) {
            const arrayBuffer = await audioBlob.arrayBuffer();
            const audioArray = new Uint8Array(arrayBuffer);
            const audioBase64 = btoa(String.fromCharCode(...audioArray));
            const response = await fetch('/api/transcribe', {
                method: 'POST',
                headers: { 'Content-Type': 'application/json' },
                body: JSON.stringify({
                    audio_base64: audioBase64,
                    language: 'en',
                    model_size: 'base'
                })
            });
            const result = await response.json();
            return result.transcription;
        }
        ```
        ### Python Integration:
        ```python
        import requests
        import base64
        def transcribe_audio_chunk(audio_data, language='en'):
            audio_base64 = base64.b64encode(audio_data).decode('utf-8')
            response = requests.post('/api/transcribe', json={
                'audio_base64': audio_base64,
                'language': language
            })
            return response.json()['transcription']
        ```
        **This approach completely eliminates the Streamlit iframe communication issues!**
        """)
    # Refresh status button
    refresh_btn = gr.Button("π Refresh System Status", variant="secondary")
    refresh_btn.click(fn=lambda: get_system_status(), outputs=status_md)
# Launch interface
if __name__ == "__main__":
    demo.launch(
        server_name="0.0.0.0",  # bind all interfaces (required inside the Space container)
        server_port=7860,       # HuggingFace Spaces' exposed port
        share=False,
        debug=False,
        show_error=True         # surface Python exceptions in the web UI
    )