Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, File, UploadFile, HTTPException, Form | |
| from fastapi.responses import JSONResponse | |
| from pydantic import BaseModel | |
| import soundfile as sf | |
| import numpy as np | |
| import tempfile | |
| import os | |
| import warnings | |
| from pydub import AudioSegment | |
| import time | |
| warnings.filterwarnings("ignore") | |
| app = FastAPI() | |
| def convert_mp3_to_wav(mp3_path): | |
| sound = AudioSegment.from_mp3(mp3_path) | |
| wav_path = mp3_path.replace(".mp3", ".wav") | |
| sound.export(wav_path, format="wav") | |
| return wav_path | |
| def extract_audio_features(audio_file_path): | |
| waveform, sample_rate = sf.read(audio_file_path) | |
| if waveform.ndim > 1: | |
| waveform = waveform.mean(axis=1) | |
| energy = np.mean(waveform ** 2) | |
| mfccs = np.mean(np.abs(np.fft.fft(waveform)[:13]), axis=0) | |
| speech_rate = 4.0 | |
| f0 = np.mean(np.abs(np.diff(waveform))) * sample_rate / (2 * np.pi) | |
| return f0, energy, speech_rate, mfccs, waveform, sample_rate | |
| def analyze_voice_stress(audio_file_path): | |
| f0, energy, speech_rate, mfccs, waveform, sample_rate = extract_audio_features(audio_file_path) | |
| mean_f0 = f0 | |
| mean_energy = energy | |
| gender = 'male' if mean_f0 < 165 else 'female' | |
| norm_mean_f0 = 110 if gender == 'male' else 220 | |
| norm_std_f0 = 20 | |
| norm_mean_energy = 0.02 | |
| norm_std_energy = 0.005 | |
| norm_speech_rate = 4.4 | |
| norm_std_speech_rate = 0.5 | |
| z_f0 = (mean_f0 - norm_mean_f0) / norm_std_f0 | |
| z_energy = (mean_energy - norm_mean_energy) / norm_std_energy | |
| z_speech_rate = (speech_rate - norm_speech_rate) / norm_std_speech_rate | |
| stress_score = (0.4 * z_f0) + (0.4 * z_speech_rate) + (0.2 * z_energy) | |
| stress_level = round(float(1 / (1 + np.exp(-stress_score)) * 100), 2) | |
| categories = ["Very Low Stress", "Low Stress", "Moderate Stress", "High Stress", "Very High Stress"] | |
| category_idx = min(int(stress_level / 20), 4) | |
| stress_category = categories[category_idx] | |
| return {"stress_level": stress_level, "category": stress_category, "gender": gender} | |
| def analyze_text_stress(text: str): | |
| stress_keywords = ["anxious", "nervous", "stress", "panic", "tense"] | |
| stress_score = sum([1 for word in stress_keywords if word in text.lower()]) | |
| stress_level = min(stress_score * 20, 100) | |
| categories = ["Very Low Stress", "Low Stress", "Moderate Stress", "High Stress", "Very High Stress"] | |
| category_idx = min(int(stress_level / 20), 4) | |
| stress_category = categories[category_idx] | |
| return {"stress_level": stress_level, "category": stress_category} | |
| class StressResponse(BaseModel): | |
| stress_level: float | |
| category: str | |
| gender: str = None | |
| status: str | |
| time: str | |
| size: str | |
| async def analyze_stress( | |
| file: UploadFile = File(None), | |
| file_path: str = Form(None), | |
| text: str = Form(None) | |
| ): | |
| if file is None and file_path is None and text is None: | |
| raise HTTPException(status_code=400, detail="Either a file, file path, or text input is required.") | |
| start_time = time.time() | |
| if file or file_path: | |
| if file: | |
| if not (file.filename.endswith(".wav") or file.filename.endswith(".mp3")): | |
| raise HTTPException(status_code=400, detail="Only .wav and .mp3 files are supported.") | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(file.filename)[-1]) as temp_file: | |
| temp_file.write(await file.read()) | |
| temp_audio_path = temp_file.name | |
| file_size = os.path.getsize(temp_audio_path) | |
| else: | |
| if not (file_path.endswith(".wav") or file_path.endswith(".mp3")): | |
| raise HTTPException(status_code=400, detail="Only .wav and .mp3 files are supported.") | |
| if not os.path.exists(file_path): | |
| raise HTTPException(status_code=400, detail="File path does not exist.") | |
| temp_audio_path = file_path | |
| file_size = os.path.getsize(file_path) | |
| if temp_audio_path.endswith(".mp3"): | |
| temp_audio_path = convert_mp3_to_wav(temp_audio_path) | |
| try: | |
| result = analyze_voice_stress(temp_audio_path) | |
| processing_time_ms = int((time.time() - start_time) * 1000) | |
| result.update({ | |
| "status": "200 (OK)", | |
| "time": f"{processing_time_ms} ms", | |
| "size": f"{round(file_size / 1024, 2)} KB" | |
| }) | |
| return JSONResponse(content=result, status_code=200) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| finally: | |
| if file: | |
| os.remove(temp_audio_path) | |
| elif text: | |
| result = analyze_text_stress(text) | |
| processing_time_ms = int((time.time() - start_time) * 1000) | |
| result.update({ | |
| "status": "200 (OK)", | |
| "time": f"{processing_time_ms} ms", | |
| "size": "N/A" | |
| }) | |
| return JSONResponse(content=result, status_code=200) | |
| if __name__ == "__main__": | |
| import uvicorn | |
| port = int(os.getenv("PORT", 7860)) | |
| uvicorn.run("app:app", host="0.0.0.0", port=port, reload=True) | |