import datetime
import os
import shutil
import tempfile
import time
import uuid
from contextlib import contextmanager

from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

from fluency.fluency_api import main as analyze_fluency_main
from tone_modulation.tone_api import main as analyze_tone_main
from vcs.vcs_api import main as analyze_vcs_main
from vers.vers_api import main as analyze_vers_main
from voice_confidence_score.voice_confidence_api import main as analyze_voice_confidence_main
from vps.vps_api import main as analyze_vps_main
from ves.ves import calc_voice_engagement_score
from transcribe import transcribe_audio
from filler_count.filler_score import analyze_fillers
from emotion.emo_predict import predict_emotion


# Application instance that every route below is registered on.
app = FastAPI()

# CORS is fully open: any origin, method and header, with credentials enabled.
# NOTE(review): FastAPI's CORS documentation states that allow_origins,
# allow_methods and allow_headers cannot be ["*"] when allow_credentials=True.
# Confirm whether credentialed requests are really needed; if they are, list
# the allowed origins explicitly, otherwise set allow_credentials=False.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Upload extensions accepted by validate_file_extension: lower-case, with the
# leading dot. Frozen so the shared module-level constant cannot be mutated
# by accident at runtime.
ALLOWED_EXTENSIONS = frozenset({".wav", ".mp3", ".m4a", ".mp4", ".flac"})


@contextmanager
def temp_file_handler(upload_file: UploadFile):
    """Copy an upload to a uniquely named temporary file and delete it afterwards.

    The upload's bytes are written into the ``temp_uploads`` directory (created
    relative to the current working directory when missing) under a random
    UUID-based name that keeps the extension of the client-supplied filename.

    Args:
        upload_file: The uploaded file. ``filename`` supplies the extension and
            ``file`` supplies the bytes to copy.

    Yields:
        The path of the temporary copy. The file is removed when the ``with``
        block exits, whether normally or through an exception.
    """
    temp_dir = "temp_uploads"
    os.makedirs(temp_dir, exist_ok=True)

    # A missing filename is treated as "no extension" instead of letting
    # os.path.splitext(None) fail with TypeError.
    extension = os.path.splitext(upload_file.filename or "")[1]
    temp_filepath = os.path.join(temp_dir, f"temp_{uuid.uuid4()}{extension}")

    try:
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(upload_file.file, buffer)
        yield temp_filepath
    finally:
        # EAFP: the file may never have been created (open() failed) or may
        # already be gone; an exists()-then-remove() pair would be racy.
        try:
            os.remove(temp_filepath)
        except FileNotFoundError:
            pass


def validate_file_extension(filename: str):
    """Reject file names whose extension is not in ``ALLOWED_EXTENSIONS``.

    The extension is lower-cased before the lookup, so the check is
    case-insensitive.

    Args:
        filename: Client-supplied file name. ``None`` or an empty string is
            treated as having no extension and is therefore rejected.

    Raises:
        HTTPException: With status 400 when the extension is missing or not
            one of the supported audio/video types.
    """
    # ``or ""`` turns a missing filename into a clean 400 instead of a
    # TypeError from os.path.splitext.
    extension = os.path.splitext(filename or "")[1].lower()
    if extension not in ALLOWED_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Only .wav, .mp3, .m4a, .mp4, and .flac files are supported.",
        )


async def process_audio_file(upload_file: UploadFile, analysis_func, **kwargs):
    """Validate an upload, run one analysis function on it and return JSON.

    The upload is written to a temporary file for the duration of the call;
    ``analysis_func`` receives that file's path as its first argument.

    Args:
        upload_file: The uploaded audio file.
        analysis_func: Callable invoked as ``analysis_func(path, **kwargs)``.
            Its return value is used as the JSON response content.
        **kwargs: Extra keyword arguments forwarded to ``analysis_func``.

    Returns:
        A ``JSONResponse`` wrapping the analysis result.

    Raises:
        HTTPException: 400 for an unsupported file extension; 500 when the
            analysis (or building the JSON response) raises any other error.
    """
    validate_file_extension(upload_file.filename)

    with temp_file_handler(upload_file) as temp_filepath:
        try:
            # NOTE: analysis_func is called directly (not awaited), so it runs
            # synchronously inside this coroutine.
            result = analysis_func(temp_filepath, **kwargs)
            return JSONResponse(content=result)
        except HTTPException:
            # Keep deliberate HTTP errors intact rather than masking them as 500.
            raise
        except Exception as exc:
            raise HTTPException(status_code=500, detail=f"Analysis failed: {str(exc)}") from exc


@app.post("/analyze_fluency/")
async def analyze_fluency(file: UploadFile):
    """Run ``analyze_fluency_main`` (``model_size="base"``) on the uploaded audio.

    Validation, temporary-file handling and error mapping are delegated to
    ``process_audio_file``; its JSON response is returned unchanged.
    """
    response = await process_audio_file(file, analyze_fluency_main, model_size="base")
    return response


@app.post("/analyze_tone/")
async def analyze_tone(file: UploadFile):
    """Run ``analyze_tone_main`` on the uploaded audio and return its result as JSON.

    Validation, temporary-file handling and error mapping are delegated to
    ``process_audio_file``.
    """
    response = await process_audio_file(file, analyze_tone_main)
    return response


@app.post("/analyze_vcs/")
async def analyze_vcs(file: UploadFile):
    """Run ``analyze_vcs_main`` on the uploaded audio and return its result as JSON.

    Validation, temporary-file handling and error mapping are delegated to
    ``process_audio_file``.
    """
    response = await process_audio_file(file, analyze_vcs_main)
    return response


@app.post("/analyze_vers/")
async def analyze_vers(file: UploadFile):
    """Run ``analyze_vers_main`` on the uploaded audio and return its result as JSON.

    Validation, temporary-file handling and error mapping are delegated to
    ``process_audio_file``.
    """
    response = await process_audio_file(file, analyze_vers_main)
    return response


@app.post("/voice_confidence/")
async def analyze_voice_confidence(file: UploadFile):
    """Run ``analyze_voice_confidence_main`` on the uploaded audio and return JSON.

    Validation, temporary-file handling and error mapping are delegated to
    ``process_audio_file``.
    """
    response = await process_audio_file(file, analyze_voice_confidence_main)
    return response


@app.post("/analyze_vps/")
async def analyze_vps(file: UploadFile):
    """Run ``analyze_vps_main`` on the uploaded audio and return its result as JSON.

    Validation, temporary-file handling and error mapping are delegated to
    ``process_audio_file``.
    """
    response = await process_audio_file(file, analyze_vps_main)
    return response


@app.post("/voice_engagement_score/")
async def analyze_voice_engagement_score(file: UploadFile):
    """Run ``calc_voice_engagement_score`` on the uploaded audio and return JSON.

    Validation, temporary-file handling and error mapping are delegated to
    ``process_audio_file``.
    """
    response = await process_audio_file(file, calc_voice_engagement_score)
    return response


@app.post("/analyze_fillers/")
async def analyze_fillers_count(file: UploadFile):
    """Run ``analyze_fillers`` on the uploaded audio and return its result as JSON.

    Validation, temporary-file handling and error mapping are delegated to
    ``process_audio_file``.
    """
    response = await process_audio_file(file, analyze_fillers)
    return response


@app.post("/transcribe/")
async def transcribe(file: UploadFile):
    """Transcribe the uploaded audio file and report the detected language.

    Args:
        file: The uploaded audio file.

    Returns:
        A ``JSONResponse`` with ``transcription``, ``transcription_time``
        (elapsed seconds) and ``language``.

    Raises:
        HTTPException: 400 for an unsupported file extension; 500 when
            transcription or response building fails.
    """
    validate_file_extension(file.filename)

    # perf_counter() is monotonic, so the interval cannot be skewed by
    # wall-clock adjustments. The timer starts before the upload is copied to
    # disk, so the reported time includes that copy.
    start_time = time.perf_counter()
    with temp_file_handler(file) as temp_filepath:
        try:
            # The third element returned by transcribe_audio is not used here.
            transcript, language, _ = transcribe_audio(temp_filepath, model_size="base")
            response = {
                "transcription": transcript,
                "transcription_time": time.perf_counter() - start_time,
                "language": language,
            }
            return JSONResponse(content=response)
        except HTTPException:
            # Keep deliberate HTTP errors intact rather than masking them as 500.
            raise
        except Exception as exc:
            raise HTTPException(status_code=500, detail=f"Transcription failed: {str(exc)}") from exc


def _run_timed(label, func, *args, **kwargs):
    """Call ``func(*args, **kwargs)``, print its elapsed time, and return its result."""
    started = time.perf_counter()
    result = func(*args, **kwargs)
    print(f"{label} analysis time: {time.perf_counter() - started} seconds")
    return result


@app.post("/analyze_all/")
async def analyze_all(file: UploadFile):
    """Run every analysis on one uploaded audio file and return a combined report.

    The file is transcribed once here, then passed through the filler,
    fluency, tone, VCS, VERS, voice-confidence, VPS, VES and emotion analyses
    in that order. The filler count feeds the fluency, VERS and
    voice-confidence analyses, and the fluency score feeds voice confidence.
    NOTE(review): fluency, VERS and voice confidence also receive
    ``model_size="base"``; whether they transcribe again internally cannot be
    seen from this file.

    Args:
        file: The uploaded audio file.

    Returns:
        A ``JSONResponse`` holding each analysis result, the transcript, the
        detected language, the emotion, the mean of the seven numeric scores
        (``sank_score``) and ``analysis_time`` in seconds.

    Raises:
        HTTPException: 400 for an unsupported file extension; 500 when any
            analysis step or response building fails.
    """
    print(f"Received request at {datetime.datetime.now()} for file: {file.filename}")
    validate_file_extension(file.filename)

    with temp_file_handler(file) as temp_filepath:
        try:
            transcript, language, _ = transcribe_audio(temp_filepath, model_size="base")

            # The overall timer starts after transcription, so "analysis_time"
            # excludes it. perf_counter() is monotonic, unlike time.time().
            analyze_all_start = time.perf_counter()

            filler_count = _run_timed("Filler", analyze_fillers, temp_filepath)
            filler_count_number = filler_count.get("total_fillers", 0)

            fluency_result = _run_timed(
                "Fluency",
                analyze_fluency_main,
                temp_filepath,
                model_size="base",
                filler_count=filler_count_number,
            )
            fluency_score = fluency_result["fluency_score"]

            tone_result = _run_timed("Tone", analyze_tone_main, temp_filepath)
            vcs_result = _run_timed("VCS", analyze_vcs_main, temp_filepath)
            vers_result = _run_timed(
                "VERS",
                analyze_vers_main,
                temp_filepath,
                model_size="base",
                filler_count=filler_count_number,
            )

            voice_confidence_result = _run_timed(
                "Voice confidence",
                analyze_voice_confidence_main,
                temp_filepath,
                model_size="base",
                filler_count=filler_count_number,
                fluency_score=fluency_score,
            )
            print("voice_confidence_result:", voice_confidence_result)

            vps_result = _run_timed("VPS", analyze_vps_main, temp_filepath)
            ves_result = _run_timed("VES", calc_voice_engagement_score, temp_filepath)
            emotion = _run_timed("Emotion", predict_emotion, temp_filepath)

            component_scores = [
                fluency_score,
                tone_result["speech_dynamism_score"],
                # NOTE(review): "Sore" looks like a typo for "Score", but the
                # key must match what analyze_vcs_main returns; confirm
                # against that module before renaming.
                vcs_result["Voice Clarity Sore"],
                vers_result["VERS Score"],
                voice_confidence_result["voice_confidence_score"],
                vps_result["VPS"],
                ves_result["ves"],
            ]
            # Divide by the actual number of scores instead of a hard-coded 7.
            avg_score = sum(component_scores) / len(component_scores)

            analyze_all_end = time.perf_counter()

            combined_result = {
                "fluency": fluency_result,
                "tone": tone_result,
                "vcs": vcs_result,
                "vers": vers_result,
                "voice_confidence": voice_confidence_result,
                "vps": vps_result,
                "ves": ves_result,
                "filler_words": filler_count,
                "transcript": transcript,
                "Detected Language": language,
                "emotion": emotion,
                "sank_score": avg_score,
                "analysis_time": analyze_all_end - analyze_all_start,
            }

            return JSONResponse(content=combined_result)

        except HTTPException:
            # Keep deliberate HTTP errors intact rather than masking them as 500.
            raise
        except Exception as exc:
            raise HTTPException(status_code=500, detail=f"Analysis failed: {str(exc)}") from exc