Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import librosa | |
| import numpy as np | |
| import torch | |
| from transformers import WhisperProcessor, WhisperForConditionalGeneration | |
| from simple_salesforce import Salesforce | |
| import os | |
| from datetime import datetime | |
| import logging | |
| import webrtcvad | |
| import google.generativeai as genai | |
| from gtts import gTTS | |
| import tempfile | |
| import base64 | |
| import re | |
| import subprocess | |
| from cryptography.fernet import Fernet | |
# Set up logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# In-process usage counters; they live only as long as this process.
usage_metrics = {"total_assessments": 0, "assessments_by_language": {}}

# Environment variables
# Secrets (password, security token, API key) deliberately have NO in-source
# fallback: credentials committed to the repository are public and must be
# supplied through the environment instead. Non-secret settings keep their
# previous defaults.
SF_USERNAME = os.getenv("SF_USERNAME", "smartvoicebot@voice.com")
SF_PASSWORD = os.getenv("SF_PASSWORD")
SF_SECURITY_TOKEN = os.getenv("SF_SECURITY_TOKEN")
SF_INSTANCE_URL = os.getenv("SF_INSTANCE_URL", "https://swe42.sfdc-cehfhs.salesforce.com")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
DEFAULT_EMAIL = os.getenv("SALESFORCE_USER_EMAIL", "default@mindcare.com")

for _secret_name, _secret_value in (
    ("SF_PASSWORD", SF_PASSWORD),
    ("SF_SECURITY_TOKEN", SF_SECURITY_TOKEN),
    ("GEMINI_API_KEY", GEMINI_API_KEY),
):
    if not _secret_value:
        logger.warning("%s is not set; the dependent integration will be unavailable", _secret_name)

ENCRYPTION_KEY = os.getenv("ENCRYPTION_KEY")
if not ENCRYPTION_KEY:
    # A freshly generated key only lives for this process, so anything
    # encrypted with it cannot be decrypted after a restart.
    ENCRYPTION_KEY = Fernet.generate_key().decode()
    logger.warning(
        "ENCRYPTION_KEY is not set; using an ephemeral key. "
        "Data encrypted in this session cannot be decrypted after a restart."
    )

# Initialize encryption
cipher = Fernet(ENCRYPTION_KEY)
# Initialize Salesforce
# `sf` is left as None when the login fails so callers can test `if not sf`.
try:
    sf = Salesforce(
        username=SF_USERNAME,
        password=SF_PASSWORD,
        security_token=SF_SECURITY_TOKEN,
        instance_url=SF_INSTANCE_URL,
    )
except Exception as e:
    logger.error(f"Salesforce connection failed: {str(e)}")
    sf = None
else:
    logger.info(f"Connected to Salesforce at {SF_INSTANCE_URL}")

# Initialize Google Gemini
# `chat` is left as None when setup fails; get_chatbot_response checks for that.
try:
    genai.configure(api_key=GEMINI_API_KEY)
    gemini_model = genai.GenerativeModel('gemini-1.5-flash')
    chat = gemini_model.start_chat(history=[])
except Exception as e:
    logger.error(f"Google Gemini initialization failed: {str(e)}")
    chat = None
else:
    logger.info("Connected to Google Gemini")
# Load Whisper model
# UI language code -> language name handed to the Whisper processor.
SUPPORTED_LANGUAGES = dict(en="english", es="spanish", hi="hindi", zh="mandarin")
# UI language code -> label written to the Salesforce Language__c fields.
SALESFORCE_LANGUAGE_MAP = dict(en="English", es="Spanish", hi="Hindi", zh="Mandarin")

_WHISPER_CHECKPOINT = "openai/whisper-small"
whisper_processor = WhisperProcessor.from_pretrained(_WHISPER_CHECKPOINT)
whisper_model = WhisperForConditionalGeneration.from_pretrained(_WHISPER_CHECKPOINT)

# Voice-activity detector used by extract_health_features to keep voiced frames.
vad = webrtcvad.Vad(mode=2)

# Context for chatbot
# System-style preamble that is prepended to every chatbot request.
base_info = """
You are MindCare, an AI health assistant providing support in:
- Mental health: Emotional support, stress management
- Medical guidance: Symptom analysis, general advice
- General health: Lifestyle and wellness recommendations
Tone: Empathetic, supportive, informative. Always suggest professional consultation for medical issues.
"""
context = [base_info]
def encrypt_data(data):
    """Encrypt a string with the module-level Fernet cipher.

    Args:
        data: Text to encrypt.

    Returns:
        The encrypted token as a UTF-8 string. If encryption raises, the
        error is logged and ``data`` is returned unchanged (i.e. unencrypted).
    """
    try:
        token = cipher.encrypt(data.encode('utf-8'))
        return token.decode('utf-8')
    except Exception as e:
        logger.error(f"Encryption failed: {str(e)}")
        # Best-effort fallback: the caller receives the original plaintext.
        return data
def decrypt_data(encrypted_data):
    """Decrypt a token produced by ``encrypt_data``.

    Args:
        encrypted_data: Encrypted token as a string.

    Returns:
        The decrypted text. If decryption raises, the error is logged and
        ``encrypted_data`` is returned unchanged.
    """
    try:
        plain_bytes = cipher.decrypt(encrypted_data.encode('utf-8'))
        return plain_bytes.decode('utf-8')
    except Exception as e:
        logger.error(f"Decryption failed: {str(e)}")
        # Best-effort fallback: hand the input back untouched.
        return encrypted_data
def extract_health_features(audio, sr):
    """Compute voice metrics from the voiced portion of an audio signal.

    The signal is peak-normalised, cut into 30 ms frames, and only the frames
    the module-level VAD classifies as speech are analysed.

    Args:
        audio: 1-D array of audio samples.
        sr: Sampling rate of ``audio`` in Hz.

    Returns:
        dict with keys ``pitch`` (Hz), ``jitter`` and ``shimmer`` (percent,
        each capped at 1000), ``energy`` (mean RMS), ``mfcc_mean`` and
        ``spectral_centroid``.

    Raises:
        ValueError: If no voiced frame is detected.
        Exception: Any other failure is logged and re-raised unchanged.
    """
    try:
        peak = np.max(np.abs(audio))
        if peak != 0:
            audio = audio / peak

        frame_duration = 30  # milliseconds per VAD frame
        frame_samples = int(sr * frame_duration / 1000)

        voiced_frames = []
        # Only complete frames are considered; a trailing partial frame is dropped.
        for start in range(0, len(audio) - frame_samples + 1, frame_samples):
            frame = audio[start:start + frame_samples]
            # Clip before the int16 cast: a normalised sample of exactly +1.0
            # scales to 32768, which does not fit in int16 and would overflow.
            pcm = np.clip(frame * 32768, -32768, 32767).astype(np.int16)
            if vad.is_speech(pcm.tobytes(), sr):
                voiced_frames.append(frame)
        if not voiced_frames:
            raise ValueError("No voiced segments detected")
        voiced_audio = np.concatenate(voiced_frames)

        # Enhanced feature extraction
        pitches, magnitudes = librosa.piptrack(y=voiced_audio, sr=sr, fmin=75, fmax=300)
        candidates = pitches[magnitudes > 0]
        valid_pitches = candidates[(candidates >= 75) & (candidates <= 300)]
        has_pitch = valid_pitches.size > 0
        pitch = np.mean(valid_pitches) if has_pitch else 0
        # Relative spread of the pitch estimates, reported as "jitter".
        jitter = np.std(valid_pitches) / pitch if pitch and has_pitch else 0
        jitter = min(jitter, 10)  # Cap jitter

        # RMS is computed once and shared by shimmer and energy.
        amplitudes = librosa.feature.rms(y=voiced_audio, frame_length=2048, hop_length=512)[0]
        energy = np.mean(amplitudes)
        shimmer = np.std(amplitudes) / energy if energy else 0
        shimmer = min(shimmer, 10)  # Cap shimmer

        # Additional features
        mfcc = np.mean(librosa.feature.mfcc(y=voiced_audio, sr=sr, n_mfcc=13), axis=1)
        spectral_centroid = np.mean(librosa.feature.spectral_centroid(y=voiced_audio, sr=sr))

        return {
            "pitch": pitch,
            "jitter": jitter * 100,
            "shimmer": shimmer * 100,
            "energy": energy,
            "mfcc_mean": np.mean(mfcc),
            "spectral_centroid": spectral_centroid,
        }
    except Exception as e:
        logger.error(f"Feature extraction failed: {str(e)}")
        raise
def transcribe_audio(audio, language="en"):
    """Transcribe audio samples with the module-level Whisper model.

    Args:
        audio: Audio samples; they are passed to the processor as 16 kHz audio.
        language: Key of ``SUPPORTED_LANGUAGES``; unknown keys fall back to
            English.

    Returns:
        The transcription string, or ``None`` if any step raises (the error
        is logged).
    """
    try:
        language_name = SUPPORTED_LANGUAGES.get(language, "english")
        prompt_ids = whisper_processor.get_decoder_prompt_ids(
            language=language_name, task="transcribe"
        )
        # The language prompt is set on the shared model config, so it stays
        # in effect on `whisper_model` until the next call replaces it.
        whisper_model.config.forced_decoder_ids = prompt_ids

        inputs = whisper_processor(audio, sampling_rate=16000, return_tensors="pt")
        with torch.no_grad():
            generated_ids = whisper_model.generate(inputs["input_features"])

        decoded = whisper_processor.batch_decode(generated_ids, skip_special_tokens=True)
        transcription = decoded[0]
        logger.info(f"Transcription (language: {language}): {transcription}")
        return transcription
    except Exception as e:
        logger.error(f"Transcription failed: {str(e)}")
        return None
def get_chatbot_response(message, language="en"):
    """Ask the Gemini chat session for a reply and synthesise it to speech.

    Args:
        message: User text. An empty message short-circuits.
        language: Language code passed to gTTS for the spoken reply.

    Returns:
        ``(text, audio_path)``. ``audio_path`` is the path of an MP3 file, or
        ``None`` when no audio could be produced. If the chat session is
        unavailable or ``message`` is empty, the text is
        "Unable to generate response."; if the chat request itself fails, it
        is "Error generating response.".
    """
    if not chat or not message:
        return "Unable to generate response.", None

    full_context = "\n".join(context) + f"\nUser: {message}\nMindCare:"
    try:
        response = chat.send_message(full_context).text
    except Exception as e:
        logger.error(f"Chatbot response failed: {str(e)}")
        return "Error generating response.", None

    # Speech synthesis is optional: a TTS failure must not discard the text
    # answer that was already generated.
    audio_path = None
    try:
        # Reserve a file name, then close the handle before gTTS writes to it
        # so the file is not opened twice at the same time.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_audio:
            audio_path = temp_audio.name
        gTTS(text=response, lang=language, slow=False).save(audio_path)
    except Exception as e:
        logger.error(f"Speech synthesis failed: {str(e)}")
        if audio_path:
            # Do not leave an empty/partial temp file behind.
            try:
                os.remove(audio_path)
            except OSError:
                pass
        audio_path = None
    return response, audio_path
def analyze_symptoms(text, features):
    """Build human-readable feedback from voice metrics and transcript keywords.

    Args:
        text: Transcribed speech; ``None`` or empty is treated as no text.
        features: Mapping with the keys ``jitter``, ``shimmer``, ``energy``,
            ``pitch`` and ``spectral_centroid``.

    Returns:
        The triggered messages joined by newlines (voice findings first, then
        text findings), or a single "no concerns" message when nothing fires.
    """
    text = text.lower() if text else ""

    # Voice-based health assessment: (threshold crossed?, message) pairs.
    voice_checks = [
        (
            features["jitter"] > 2.0,
            f"Elevated jitter ({features['jitter']:.2f}%) detected, which may indicate respiratory strain or vocal cord issues. Consult a doctor.",
        ),
        (
            features["shimmer"] > 3.0,
            f"High shimmer ({features['shimmer']:.2f}%) suggests possible emotional stress or vocal fatigue. Consider professional evaluation.",
        ),
        (
            features["energy"] < 0.01,
            f"Low vocal energy ({features['energy']:.4f}) detected, which might indicate fatigue or low mood. Rest and medical advice recommended.",
        ),
        (
            features["pitch"] < 100 or features["pitch"] > 250,
            f"Unusual pitch ({features['pitch']:.2f} Hz) may indicate vocal cord issues or emotional stress.",
        ),
        (
            features["spectral_centroid"] > 2000,
            f"High spectral centroid ({features['spectral_centroid']:.2f} Hz) suggests tense speech, possibly linked to stress or anxiety.",
        ),
    ]

    # Text-based symptom analysis: substring matches on the lower-cased text.
    text_checks = [
        (
            "cough" in text or "breath" in text,
            "Your description suggests respiratory symptoms. Possible conditions include bronchitis or asthma. Please consult a doctor.",
        ),
        (
            "stress" in text or "anxious" in text,
            "You mentioned stress or anxiety. Try deep breathing or mindfulness. Consider speaking with a mental health professional.",
        ),
        (
            "pain" in text,
            "Pain reported. For mild pain, consider Paracetamol; for inflammation, Ibuprofen may help. Consult a doctor before taking medication.",
        ),
    ]

    feedback = [message for triggered, message in voice_checks + text_checks if triggered]
    if not feedback:
        feedback.append("No specific health concerns detected from voice or text. Maintain a healthy lifestyle and consult a doctor if symptoms arise.")
    return "\n".join(feedback)
def store_user_consent(language):
    """Record voice-analysis consent for the default user in Salesforce.

    Looks up the ``HealthUser__c`` record whose email equals ``DEFAULT_EMAIL``,
    creating it if absent or updating its language/consent flag otherwise,
    then appends a ``ConsentLog__c`` entry.

    Args:
        language: UI language code; mapped through ``SALESFORCE_LANGUAGE_MAP``
            with "English" as the fallback.

    Returns:
        The Salesforce id of the user record, or ``None`` when Salesforce is
        not connected or any call fails (the failure is logged).
    """
    if not sf:
        logger.warning("Salesforce not connected; skipping consent storage")
        return None

    try:
        lookup = sf.query(f"SELECT Id FROM HealthUser__c WHERE Email__c = '{DEFAULT_EMAIL}'")
        is_new_user = lookup["totalSize"] == 0

        if is_new_user:
            created = sf.HealthUser__c.create({
                "Email__c": DEFAULT_EMAIL,
                "Language__c": SALESFORCE_LANGUAGE_MAP.get(language, "English"),
                "ConsentGiven__c": True,
            })
            user_id = created["id"]
            logger.info(f"Created new user with email: {DEFAULT_EMAIL}")
        else:
            user_id = lookup["records"][0]["Id"]
            sf.HealthUser__c.update(user_id, {
                "Language__c": SALESFORCE_LANGUAGE_MAP.get(language, "English"),
                "ConsentGiven__c": True,
            })
            logger.info(f"Updated user with email: {DEFAULT_EMAIL}")

        # A consent log entry is written on every call, for new and existing users.
        consent_entry = {
            "HealthUser__c": user_id,
            "ConsentType__c": "Voice Analysis",
            "ConsentDate__c": datetime.utcnow().isoformat(),
        }
        sf.ConsentLog__c.create(consent_entry)
        return user_id
    except Exception as e:
        logger.error(f"Consent storage failed: {str(e)}")
        return None
# Characters with a special meaning in LaTeX text mode and their safe forms.
# Applied with str.translate so every character is rewritten exactly once
# (the backslashes introduced by one escape are never escaped again).
_LATEX_ESCAPES = str.maketrans({
    "\\": r"\textbackslash{}",
    "&": r"\&",
    "%": r"\%",
    "$": r"\$",
    "#": r"\#",
    "_": r"\_",
    "{": r"\{",
    "}": r"\}",
    "~": r"\textasciitilde{}",
    "^": r"\textasciicircum{}",
})


def _escape_latex(text):
    """Escape LaTeX special characters and turn line breaks into paragraph breaks."""
    escaped = str(text).translate(_LATEX_ESCAPES)
    # A single newline is just a space to LaTeX; a blank line starts a new
    # paragraph, which keeps line-oriented feedback readable.
    return "\n\n".join(escaped.splitlines())


def generate_pdf_report(feedback, transcription, features, language):
    """Render the assessment as a PDF by compiling a LaTeX document with latexmk.

    Args:
        feedback: Assessment text shown under "Health Assessment".
        transcription: Transcribed speech; falsy values are shown as "None".
        features: Mapping with ``pitch``, ``jitter``, ``shimmer``, ``energy``,
            ``mfcc_mean`` and ``spectral_centroid``.
        language: UI language code; displayed via ``SALESFORCE_LANGUAGE_MAP``.

    Returns:
        Path of the generated PDF, or ``None`` if generation fails for any
        reason (the failure is logged).
    """
    tex_file_path = None
    try:
        # Sanitize inputs for LaTeX. The transcription is user-provided, so
        # every special character (including backslash and braces) is escaped.
        # NOTE(review): pdflatex with inputenc may still reject non-Latin
        # transcriptions (e.g. Hindi or Chinese) -- confirm and switch engine
        # if those languages must be supported in the report.
        feedback = _escape_latex(feedback)
        transcription = _escape_latex(transcription) if transcription else "None"
        email = _escape_latex(DEFAULT_EMAIL)
        language_display = SALESFORCE_LANGUAGE_MAP.get(language, "English")

        latex_content = (
            "\\documentclass[a4paper,12pt]{article}\n"
            "\\usepackage[utf8]{inputenc}\n"
            "\\usepackage{geometry}\n"
            "\\usepackage{parskip}\n"
            "\\usepackage{titlesec}\n"
            "\\usepackage{times}\n"
            "\\usepackage{datetime}\n"
            "\\newdateformat{isodate}{\\THEDAY{} \\shortmonthname[\\THEMONTH] \\THEYEAR}\n"
            "\\geometry{margin=1in}\n"
            "\\titleformat{\\section}{\\large\\bfseries}{\\thesection}{1em}{}\n"
            "\\titleformat{\\subsection}{\\bfseries}{\\thesubsection}{1em}{}\n"
            "\\begin{document}\n"
            "\\begin{center}\n"
            " \\textbf{\\large MindCare Health Assistant Report} \\\\\n"
            " \\vspace{0.5cm}\n"
            " Generated on \\isodate\\today\\ at \\currenttime\n"
            "\\end{center}\n"
            "\\section*{User Information}\n"
            "\\begin{itemize}\n"
            f" \\item \\textbf{{Email}}: {email}\n"
            f" \\item \\textbf{{Language}}: {language_display}\n"
            "\\end{itemize}\n"
            "\\section*{Voice Analysis Results}\n"
            "\\subsection*{Health Assessment}\n"
            f"{feedback}\n"
            "\\subsection*{Transcription}\n"
            f"{transcription}\n"
            "\\subsection*{Voice Metrics}\n"
            "\\begin{itemize}\n"
            f" \\item \\textbf{{Pitch}}: {features['pitch']:.2f} Hz\n"
            f" \\item \\textbf{{Jitter}}: {features['jitter']:.2f}\\%\n"
            f" \\item \\textbf{{Shimmer}}: {features['shimmer']:.2f}\\%\n"
            f" \\item \\textbf{{Energy}}: {features['energy']:.4f}\n"
            f" \\item \\textbf{{MFCC Mean}}: {features['mfcc_mean']:.2f}\n"
            f" \\item \\textbf{{Spectral Centroid}}: {features['spectral_centroid']:.2f} Hz\n"
            "\\end{itemize}\n"
            "\\section*{Disclaimer}\n"
            "This report is a preliminary analysis and not a medical diagnosis. Always consult a healthcare provider.\n"
            "\\end{document}\n"
        )

        with tempfile.NamedTemporaryFile(delete=False, suffix=".tex") as tex_file:
            tex_file.write(latex_content.encode('utf-8'))
            tex_file_path = tex_file.name

        # splitext only touches the real extension, unlike str.replace which
        # would also rewrite ".tex" appearing elsewhere in the path.
        pdf_path = os.path.splitext(tex_file_path)[0] + '.pdf'

        # Run latexmk inside the temp directory so the PDF and auxiliary files
        # are written next to the .tex file rather than into the app's cwd.
        result = subprocess.run(
            ['latexmk', '-pdf', '-pdflatex=pdflatex', '-interaction=nonstopmode', tex_file_path],
            capture_output=True, text=True, check=True,
            cwd=os.path.dirname(tex_file_path),
        )
        logger.info(f"PDF generation output: {result.stdout}")

        if os.path.exists(pdf_path):
            logger.info(f"Generated PDF report: {pdf_path}")
            return pdf_path
        logger.error("PDF file was not created")
        return None
    except subprocess.CalledProcessError as e:
        # Fall back to stdout when the tool reported nothing on stderr.
        logger.error(f"PDF generation failed: {e.stderr or e.stdout}")
        return None
    except Exception as e:
        logger.error(f"PDF generation failed: {str(e)}")
        return None
    finally:
        # Remove the LaTeX source and by-products on success and on failure;
        # only the PDF (if any) is kept for the caller.
        if tex_file_path:
            base_path = os.path.splitext(tex_file_path)[0]
            for ext in ['.tex', '.aux', '.log', '.out', '.fls', '.fdb_latexmk']:
                try:
                    os.remove(base_path + ext)
                except OSError:
                    pass
def store_in_salesforce(user_id, audio_file, feedback, respiratory_score, mental_health_score, features, transcription, language):
    """Upload the recording and store the assessment record in Salesforce.

    The audio file is uploaded as a ``ContentVersion``, a
    ``VoiceAssessment__c`` record is created with the encrypted feedback and
    the voice metrics, and the two are linked via ``ContentDocumentLink``.

    Args:
        user_id: Id of the ``HealthUser__c`` record the assessment belongs to.
        audio_file: Path of the recording to upload.
        feedback: Assessment text; stored encrypted.
        respiratory_score: Numeric score stored in ``RespiratoryScore__c``.
        mental_health_score: Numeric score stored in ``MentalHealthScore__c``.
        features: Mapping with ``pitch``, ``jitter``, ``shimmer``, ``energy``.
        transcription: Transcribed speech; falsy values are stored as "None".
        language: UI language code, mapped via ``SALESFORCE_LANGUAGE_MAP``.

    Returns:
        None. Does nothing (besides a warning) when Salesforce is not connected.

    Raises:
        Exception: Any failure is logged and re-raised to the caller.
    """
    if not sf:
        logger.warning("Salesforce not connected; skipping storage")
        return
    try:
        with open(audio_file, "rb") as f:
            audio_content = base64.b64encode(f.read()).decode()
        content_version = sf.ContentVersion.create({
            "Title": f"Voice_Assessment_{datetime.utcnow().isoformat()}",
            "PathOnClient": os.path.basename(audio_file),
            "VersionData": audio_content,
            "IsMajorVersion": True
        })
        content_document_id = sf.query(f"SELECT ContentDocumentId FROM ContentVersion WHERE Id = '{content_version['id']}'")["records"][0]["ContentDocumentId"]
        file_url = f"{SF_INSTANCE_URL}/lightning/r/ContentDocument/{content_document_id}/view"

        # Keep the stored value within the field limit by shortening the
        # PLAINTEXT and re-encrypting. Cutting the encrypted token itself
        # would leave a value that can no longer be decrypted.
        max_result_chars = 131072
        plain_feedback = feedback
        encrypted_feedback = encrypt_data(plain_feedback)
        while len(encrypted_feedback) > max_result_chars and plain_feedback:
            # Scale the plaintext by the overshoot ratio; the result is always
            # strictly shorter, so the loop terminates.
            keep = len(plain_feedback) * max_result_chars // len(encrypted_feedback)
            plain_feedback = plain_feedback[:keep]
            encrypted_feedback = encrypt_data(plain_feedback)

        assessment = sf.VoiceAssessment__c.create({
            "HealthUser__c": user_id,
            "VoiceRecording__c": file_url,
            "AssessmentResult__c": encrypted_feedback,
            "AssessmentDate__c": datetime.utcnow().isoformat(),
            "ConfidenceScore__c": 95.0,
            "RespiratoryScore__c": float(respiratory_score),
            "MentalHealthScore__c": float(mental_health_score),
            "Pitch__c": float(features["pitch"]),
            "Jitter__c": float(features["jitter"]),
            "Shimmer__c": float(features["shimmer"]),
            "Energy__c": float(features["energy"]),
            "Transcription__c": transcription or "None",
            "Language__c": SALESFORCE_LANGUAGE_MAP.get(language, "English")
        })
        sf.ContentDocumentLink.create({
            "ContentDocumentId": content_document_id,
            "LinkedEntityId": assessment["id"],
            "ShareType": "V"
        })
        logger.info(f"Stored assessment in Salesforce: {assessment['id']}")
    except Exception as e:
        logger.error(f"Salesforce storage failed: {str(e)}")
        raise
def analyze_voice(audio_file=None, language="en"):
    """Run the full voice assessment pipeline for one recording.

    Steps: validate and load the audio, record consent, extract voice
    features, transcribe, build the feedback text, store the result in
    Salesforce, generate the PDF report, and delete the audio file.

    Args:
        audio_file: Path of the recorded/uploaded audio file.
        language: UI language code used for transcription and storage.

    Returns:
        ``(feedback_text, pdf_path)``. On any failure the first element is an
        "Error: ..." message and the second is ``None``. ``pdf_path`` is also
        ``None`` when only report generation fails.
    """
    # Every call is counted, including ones that later fail validation.
    usage_metrics["total_assessments"] += 1
    usage_metrics["assessments_by_language"][language] = usage_metrics["assessments_by_language"].get(language, 0) + 1
    try:
        if not audio_file or not os.path.exists(audio_file):
            raise ValueError("No valid audio file provided")
        audio, sr = librosa.load(audio_file, sr=16000)
        if len(audio) < sr:
            raise ValueError("Audio too short (minimum 1 second)")

        # NOTE(review): store_user_consent returns None when Salesforce is not
        # connected, so the analysis cannot run offline -- confirm this
        # hard requirement is intended.
        user_id = store_user_consent(language)
        if not user_id:
            return "Error: Failed to store user consent.", None

        features = extract_health_features(audio, sr)
        transcription = transcribe_audio(audio, language)
        feedback = analyze_symptoms(transcription, features)
        respiratory_score = features["jitter"]
        mental_health_score = features["shimmer"]

        details = [
            "\n\n**Voice Analysis Details**:",
            f"- Pitch: {features['pitch']:.2f} Hz",
            f"- Jitter: {features['jitter']:.2f}% (voice stability)",
            f"- Shimmer: {features['shimmer']:.2f}% (amplitude variation)",
            f"- Energy: {features['energy']:.4f} (vocal intensity)",
            f"- MFCC Mean: {features['mfcc_mean']:.2f} (timbre quality)",
            f"- Spectral Centroid: {features['spectral_centroid']:.2f} Hz (voice brightness)",
            f"- Transcription: {transcription if transcription else 'None'}",
        ]
        feedback += "\n".join(details) + "\n"
        feedback += "\n**Disclaimer**: This is a preliminary analysis. Consult a healthcare provider for professional evaluation."

        if sf:
            store_in_salesforce(user_id, audio_file, feedback, respiratory_score, mental_health_score, features, transcription, language)
        pdf_path = generate_pdf_report(feedback, transcription, features, language)

        # The recording is removed once it has been processed and stored;
        # failing to delete it is logged but does not fail the assessment.
        try:
            os.remove(audio_file)
            logger.info(f"Deleted audio file: {audio_file}")
        except Exception as e:
            logger.error(f"Failed to delete audio file: {str(e)}")
        return feedback, pdf_path
    except Exception as e:
        logger.error(f"Audio processing failed: {str(e)}")
        return f"Error: {str(e)}", None
def launch():
    """Build the two-column Gradio interface and start the web server.

    The left column runs the voice assessment (``analyze_voice``); the right
    column sends typed messages to the chatbot (``get_chatbot_response``).
    The server listens on 0.0.0.0:7860.
    """

    def _reset_voice_panel():
        # Order matches the outputs list: audio, language, results, PDF.
        return (
            gr.update(value=None),
            gr.update(value="en"),
            gr.update(value=""),
            gr.update(value=None),
        )

    def _reset_suggestion_panel():
        # Order matches the outputs list: message, response text, response audio.
        return (
            gr.update(value=""),
            gr.update(value=""),
            gr.update(value=None),
        )

    page_css = ".gradio-container {max-width: 1200px; margin: auto; font-family: Arial, sans-serif;}"
    language_codes = list(SUPPORTED_LANGUAGES.keys())

    with gr.Blocks(title="MindCare Health Assistant", css=page_css) as demo:
        gr.Markdown("# MindCare Health Assistant")
        gr.Markdown("Record your voice or type a message for health assessments and suggestions.")

        with gr.Row():
            # Left column: voice recording and assessment results.
            with gr.Column():
                gr.Markdown("### Voice Analysis")
                gr.Markdown("Record or upload voice (1+ sec) describing symptoms (e.g., 'I have a cough' or 'I feel stressed').")
                language_input = gr.Dropdown(choices=language_codes, label="Select Language", value="en")
                # Rendered pre-checked and read-only; its value is not passed to any handler.
                consent_input = gr.Checkbox(label="I consent to data storage and voice analysis", value=True, interactive=False)
                audio_input = gr.Audio(type="filepath", label="Record or Upload Voice (WAV, MP3, FLAC)", format="wav")
                voice_output = gr.Textbox(label="Health Assessment Results", elem_id="health-results")
                pdf_output = gr.File(label="Download Assessment Report (PDF)")
                submit_btn = gr.Button("Submit")
                clear_btn = gr.Button("Clear")

            # Right column: free-text chatbot with spoken reply.
            with gr.Column():
                gr.Markdown("### Health Suggestions")
                gr.Markdown("Enter a message for personalized health advice.")
                text_input = gr.Textbox(label="Enter your message")
                text_output = gr.Textbox(label="Response")
                audio_output = gr.Audio(label="Response Audio")
                suggest_submit_btn = gr.Button("Submit")
                suggest_clear_btn = gr.Button("Clear")

        submit_btn.click(
            fn=analyze_voice,
            inputs=[audio_input, language_input],
            outputs=[voice_output, pdf_output],
        )
        clear_btn.click(
            fn=_reset_voice_panel,
            inputs=None,
            outputs=[audio_input, language_input, voice_output, pdf_output],
        )
        suggest_submit_btn.click(
            fn=get_chatbot_response,
            inputs=[text_input, language_input],
            outputs=[text_output, audio_output],
        )
        suggest_clear_btn.click(
            fn=_reset_suggestion_panel,
            inputs=None,
            outputs=[text_input, text_output, audio_output],
        )

    demo.launch(server_name="0.0.0.0", server_port=7860)


if __name__ == "__main__":
    launch()