Spaces:
Sleeping
Sleeping
| """ | |
| Hugging Face Spaces Demo - Voice Assistant API | |
| Multi-language voice assistant with division matching and contact search | |
| """ | |
| import gradio as gr | |
| import logging | |
| from typing import Optional, Tuple | |
| import numpy as np | |
| # Import existing services | |
| from embedding_service import EmbeddingService | |
| from name_extraction_service import NameExtractor | |
| from voice_processing_service import VoiceProcessor | |
| from contact_search_service import ContactSearchService | |
# Logging is configured once at import time; every message in this module
# goes through `logger`.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Service singletons shared by the request handlers below. They stay None
# until initialize_services() assigns them.
embedding_service: Optional[EmbeddingService] = None
name_extractor: Optional[NameExtractor] = None
voice_processor: Optional[VoiceProcessor] = None
contact_search_service: Optional[ContactSearchService] = None
def initialize_services():
    """Create all AI services and publish them as module-level globals.

    The services are built in a fixed order (embedding service, name
    extractor, voice processor, contact search) because the contact search
    service is constructed from the name extractor and the embedding service.
    Progress is reported through the module logger.

    Returns:
        The stats object returned by
        ``contact_search_service.get_contact_stats()``; it is indexed here
        by ``'total_contacts'`` and ``'divisions'``.
    """
    global embedding_service, name_extractor, voice_processor, contact_search_service

    logger.info("π Initializing services...")

    # Embedding service (fast & lightweight)
    logger.info("Loading embedding model...")
    embedding_service = EmbeddingService(model_name="all-MiniLM-L6-v2")
    logger.info("β Embedding service ready!")

    # Name extractor
    logger.info("Loading name extraction model...")
    name_extractor = NameExtractor(model_name="urchade/gliner_small-v2.1")
    logger.info("β Name extractor ready!")

    # Voice processor: the "small" model size is chosen for better accuracy
    logger.info("Loading Whisper model...")
    voice_processor = VoiceProcessor(model_size="small")
    logger.info("β Voice processor ready!")

    # Contact search depends on the two services created above
    logger.info("Loading contact database...")
    contact_search_service = ContactSearchService(name_extractor, embedding_service)
    contact_stats = contact_search_service.get_contact_stats()
    total_contacts = contact_stats['total_contacts']
    division_count = contact_stats['divisions']
    logger.info(f"β Loaded {total_contacts} contacts across {division_count} divisions")

    return contact_stats
def format_division_matches(matches, names):
    """Render division matching results as Markdown text.

    Args:
        matches: Sequence of match objects exposing ``division``,
            ``confidence`` and ``department`` attributes. ``confidence`` is
            treated as a fraction (it is multiplied by 100 for display).
            Only the first three matches are rendered.
        names: Extracted names; listed on a leading line when non-empty.

    Returns:
        The Markdown string, or ``"No matches found."`` when ``matches`` is
        empty.
    """
    if not matches:
        return "No matches found."

    lines = []
    if names:
        lines.append(f"**Extracted Names:** {', '.join(names)}\n")
    lines.append("### π― Division Matches:\n")

    for rank, match in enumerate(matches[:3], 1):
        confidence_pct = match.confidence * 100
        # One bar slot per 20%. Clamp to the 5 available slots so that a
        # confidence outside 0..1 (e.g. a negative score) cannot produce a
        # bar with more or fewer than 5 symbols.
        filled_slots = max(0, min(5, int(confidence_pct / 20)))
        confidence_bar = "π’" * filled_slots + "βͺ" * (5 - filled_slots)

        lines.append(f"**{rank}. {match.division}**")
        lines.append(f" - Confidence: {confidence_pct:.1f}% {confidence_bar}")
        if match.department:
            lines.append(f" - Department: {match.department}")
        lines.append("")  # blank line between entries

    return "\n".join(lines)
def format_contact_results(contacts, extracted_names, matched_divisions):
    """Render contact search results as Markdown text.

    Args:
        contacts: Sequence of contact mappings. Each rendered entry is read
            through the keys ``full_name_en``, ``full_name_ar``,
            ``title_en``, ``division``, ``department``, ``phone``,
            ``email``, ``confidence`` and ``match_reason``. Only the first
            ten contacts are rendered, while the heading reports the full
            count.
        extracted_names: Names to list above the results; skipped if empty.
        matched_divisions: Division names to list above the results; at most
            the first three are shown, skipped if empty.

    Returns:
        The Markdown string, or ``"No contacts found."`` when ``contacts``
        is empty.

    Raises:
        KeyError: If a rendered contact lacks one of the keys listed above.
    """
    if not contacts:
        return "No contacts found."

    lines = []
    if extracted_names:
        lines.append(f"**Extracted Names:** {', '.join(extracted_names)}\n")
    if matched_divisions:
        lines.append(f"**Matched Divisions:** {', '.join(matched_divisions[:3])}\n")
    lines.append(f"### π₯ Found {len(contacts)} Contact(s):\n")

    for rank, contact in enumerate(contacts[:10], 1):
        confidence_pct = contact['confidence'] * 100
        # One bar slot per 20%. Clamp to the 5 available slots so that a
        # confidence outside 0..1 (e.g. a negative score) cannot produce a
        # bar with more or fewer than 5 symbols.
        filled_slots = max(0, min(5, int(confidence_pct / 20)))
        confidence_bar = "π’" * filled_slots + "βͺ" * (5 - filled_slots)

        # English name is the primary display name; Arabic name follows in parentheses
        lines.append(f"**{rank}. {contact['full_name_en']}** ({contact['full_name_ar']})")
        lines.append(f" - Title: {contact['title_en']}")
        lines.append(f" - Division: {contact['division']}")
        lines.append(f" - Department: {contact['department']}")
        lines.append(f" - Phone: {contact['phone']}")
        lines.append(f" - Email: {contact['email']}")
        lines.append(f" - Confidence: {confidence_pct:.1f}% {confidence_bar}")
        lines.append(f" - Match Reason: {contact['match_reason']}")
        lines.append("")  # blank line between entries

    return "\n".join(lines)
def search_divisions_text(query: str) -> str:
    """Handle a typed division query and return Markdown for display.

    Blank or missing input yields a prompt message. Otherwise names are
    extracted from the query, the top three divisions are looked up, and
    both are formatted. Any exception is logged and reported back as an
    ``"Error: ..."`` string instead of being raised.
    """
    if not (query and query.strip()):
        return "Please enter a query."

    try:
        extracted_names = name_extractor.extract_names(query)
        top_divisions = embedding_service.find_division(query, top_k=3)
        return format_division_matches(top_divisions, extracted_names)
    except Exception as e:
        logger.error("Error in division search: %s", e)
        return f"Error: {e!s}"
def search_divisions_voice(audio: Optional[Tuple[int, np.ndarray]]) -> str:
    """Handle a recorded division query and return Markdown for display.

    Args:
        audio: ``(sample_rate, samples)`` pair from the recorder, or ``None``
            when nothing has been recorded.

    Returns:
        Markdown containing the transcription, the detected language, the
        original text when the query was translated, and the formatted
        division matches. Returns a prompt message when ``audio`` is
        ``None`` and an ``"Error: ..."`` string when processing fails.
    """
    if audio is None:
        return "Please record audio first."

    temp_path = None
    try:
        # The voice processor works on a file, so persist the samples first
        sample_rate, audio_data = audio
        temp_path = voice_processor.save_audio_array(audio_data, sample_rate)

        voice_result = voice_processor.process_voice_query(temp_path)
        query = voice_result['query']

        names = name_extractor.extract_names(query)
        matches = embedding_service.find_division(query, top_k=3)

        output = [
            f"**π€ Transcribed Text:** {query}",
            f"**π Language:** {voice_result['language_name']}",
        ]
        if voice_result['was_translated']:
            output.append(f"**π Original:** {voice_result['original_text']}")
        output.append("")
        output.append(format_division_matches(matches, names))
        return "\n".join(output)
    except Exception as e:
        logger.error(f"Error in voice division search: {e}")
        return f"Error: {str(e)}"
    finally:
        # Always remove the temporary file, including when a step above
        # raised; otherwise every failed request would leave a file behind.
        if temp_path is not None:
            try:
                voice_processor.cleanup_temp_file(temp_path)
            except Exception as cleanup_error:
                # Cleanup is best-effort: do not discard a computed result
                # (or mask the real error) because deletion failed.
                logger.warning(f"Could not remove temporary audio file {temp_path}: {cleanup_error}")
def search_contacts_text(query: str) -> str:
    """Handle a typed contact query and return Markdown for display.

    Blank or missing input yields a prompt message. Otherwise contacts are
    searched (top 10, minimum confidence 0.3), names and the top three
    divisions are derived from the same query, and everything is formatted.
    Any exception is logged and reported back as an ``"Error: ..."`` string
    instead of being raised.
    """
    if not (query and query.strip()):
        return "Please enter a query."

    try:
        found_contacts = contact_search_service.search_contacts(
            query, top_k=10, min_confidence=0.3
        )

        # Names and divisions are shown above the contact list
        extracted_names = name_extractor.extract_names(query)
        division_names = [
            hit.division for hit in embedding_service.find_division(query, top_k=3)
        ]

        return format_contact_results(found_contacts, extracted_names, division_names)
    except Exception as e:
        logger.error("Error in contact search: %s", e)
        return f"Error: {e!s}"
def search_contacts_voice(audio: Optional[Tuple[int, np.ndarray]]) -> str:
    """Handle a recorded contact query and return Markdown for display.

    Args:
        audio: ``(sample_rate, samples)`` pair from the recorder, or ``None``
            when nothing has been recorded.

    Returns:
        Markdown containing the transcription, the detected language, the
        original text when the query was translated, and the formatted
        contact results. Returns a prompt message when ``audio`` is ``None``
        and an ``"Error: ..."`` string when processing fails.
    """
    if audio is None:
        return "Please record audio first."

    temp_path = None
    try:
        # The voice processor works on a file, so persist the samples first
        sample_rate, audio_data = audio
        temp_path = voice_processor.save_audio_array(audio_data, sample_rate)

        voice_result = voice_processor.process_voice_query(temp_path)
        query = voice_result['query']

        contacts = contact_search_service.search_contacts(query, top_k=10, min_confidence=0.3)

        output = [
            f"**π€ Transcribed Text:** {query}",
            f"**π Language:** {voice_result['language_name']}",
        ]
        if voice_result['was_translated']:
            output.append(f"**π Original:** {voice_result['original_text']}")
        output.append("")
        # Names and divisions are intentionally passed empty here, so the
        # "Extracted Names" / "Matched Divisions" lines are omitted for voice
        # queries (per the original author's note, search_contacts already
        # does that matching internally).
        output.append(format_contact_results(contacts, [], []))
        return "\n".join(output)
    except Exception as e:
        logger.error(f"Error in voice contact search: {e}")
        return f"Error: {str(e)}"
    finally:
        # Always remove the temporary file, including when a step above
        # raised; otherwise every failed request would leave a file behind.
        if temp_path is not None:
            try:
                voice_processor.cleanup_temp_file(temp_path)
            except Exception as cleanup_error:
                # Cleanup is best-effort: do not discard a computed result
                # (or mask the real error) because deletion failed.
                logger.warning(f"Could not remove temporary audio file {temp_path}: {cleanup_error}")
def _add_text_search_tab(tab_title, intro_markdown, placeholder, button_label, handler):
    """Add a tab holding a query textbox, a search button and a Markdown results pane."""
    with gr.Tab(tab_title):
        gr.Markdown(intro_markdown)
        with gr.Row():
            with gr.Column():
                query_box = gr.Textbox(
                    label="Enter your query",
                    placeholder=placeholder,
                    lines=2
                )
                search_btn = gr.Button(button_label, variant="primary")
            with gr.Column():
                results = gr.Markdown(label="Results")
        search_btn.click(fn=handler, inputs=[query_box], outputs=[results])


def _add_voice_search_tab(tab_title, intro_markdown, button_label, handler):
    """Add a tab holding a microphone recorder, a search button and a Markdown results pane."""
    with gr.Tab(tab_title):
        gr.Markdown(intro_markdown)
        with gr.Row():
            with gr.Column():
                # type="numpy" matches the (sample_rate, samples) tuple the
                # voice handlers unpack.
                recorder = gr.Audio(
                    sources=["microphone"],
                    type="numpy",
                    label="Record your voice query"
                )
                search_btn = gr.Button(button_label, variant="primary")
            with gr.Column():
                results = gr.Markdown(label="Results")
        search_btn.click(fn=handler, inputs=[recorder], outputs=[results])


def create_demo():
    """Create the Gradio demo interface.

    Initializes all services first (their stats feed the page header), then
    builds a ``gr.Blocks`` app with four tabs: division matching by text and
    by voice, and contact search by text and by voice.

    Returns:
        The assembled ``gr.Blocks`` instance (not yet launched).
    """
    # Initialize services on startup
    stats = initialize_services()

    with gr.Blocks(title="Voice Assistant Demo", theme=gr.themes.Soft()) as demo:
        gr.Markdown(f"""
# ποΈ Voice Assistant Demo
### Multi-language voice assistant with division matching and contact search
**Database:** {stats['total_contacts']} contacts β’ {stats['departments']} departments β’ {stats['divisions']} divisions
**Features:**
- π£οΈ Speech-to-text in 99+ languages
- π Smart division matching
- π€ Name extraction (English & Arabic)
- π Contact search with confidence scoring
""")

        with gr.Tabs():
            # Tab 1: Division Matching (Text)
            _add_text_search_tab(
                "π Division Matching (Text)",
                """
### Search for divisions by text query
Try queries like:
- "I need help from IT Security"
- "Find someone in Finance"
- "Connect me to Human Resources"
- "Find Ahmed in App Dev"
""",
                "e.g., I need help from IT Security",
                "π Search Divisions",
                search_divisions_text,
            )

            # Tab 2: Division Matching (Voice)
            _add_voice_search_tab(
                "π€ Division Matching (Voice)",
                """
### Search for divisions by voice
Speak your query in any language - it will be automatically transcribed and translated.
""",
                "π Search Divisions",
                search_divisions_voice,
            )

            # Tab 3: Contact Search (Text)
            _add_text_search_tab(
                "π₯ Contact Search (Text)",
                """
### Search for contacts by text query
Try queries like:
- "Find Dima in Information Technology"
- "Ahmed Al-Malek"
- "I need to talk to someone in Legal"
- "Find Rashed in Finance"
""",
                "e.g., Find Dima in Information Technology",
                "π Search Contacts",
                search_contacts_text,
            )

            # Tab 4: Contact Search (Voice)
            _add_voice_search_tab(
                "ποΈ Contact Search (Voice)",
                """
### Search for contacts by voice
Speak your query in any language to find contacts.
""",
                "π Search Contacts",
                search_contacts_voice,
            )

        # Footer. The speech-to-text entry names the "small" Whisper size,
        # which is the size initialize_services() actually loads.
        gr.Markdown("""
---
**Models:**
- Embeddings: `sentence-transformers/all-MiniLM-L6-v2`
- Name Extraction: `urchade/gliner_small-v2.1`
- Speech-to-Text: `openai/whisper-small`
**Supported Languages:** 99+ languages (auto-detected)
""")

    return demo
# Script entry point: build the interface (which also initializes the
# services) and start the Gradio server.
if __name__ == "__main__":
    demo = create_demo()
    demo.launch()