| from fastapi import FastAPI, UploadFile, File, Form , HTTPException |
| from fastapi.responses import JSONResponse |
| from fastapi.middleware.cors import CORSMiddleware |
| import sys |
| import os |
| import shutil |
| import uuid |
|
|
| |
| |
|
|
| from fluency.fluency_api import main as analyze_fluency_main |
| from tone_modulation.tone_api import main as analyze_tone_main |
| from vcs.vcs_api import main as analyze_vcs_main |
| from vers.vers_api import main as analyze_vers_main |
| from voice_confidence_score.voice_confidence_api import main as analyze_voice_confidence_main |
| from vps.vps_api import main as analyze_vps_main |
| from ves.ves import calc_voice_engagement_score |
| from transcribe import transcribe_audio |
| from filler_count.filler_score import analyze_fillers |
| from emotion.emo_predict import predict_emotion |
|
|
# Single application object; every route in this module registers on it.
app = FastAPI()

# CORS policy applied to all routes: any origin, method and header is
# accepted, and credentials are enabled.
# NOTE(review): FastAPI's CORS documentation advises against combining
# wildcard values with allow_credentials=True -- confirm this permissive
# setup is intended outside of development.
_cors_policy = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_policy)
|
|
@app.post("/analyze_fluency/")
async def analyze_fluency(file: UploadFile):
    """Run fluency analysis on an uploaded audio file.

    The upload is saved under ``temp_uploads`` with a unique name, handed to
    ``analyze_fluency_main`` (``model_size="base"``) and deleted again once
    the request finishes.

    Args:
        file: Uploaded audio. Its filename must end in .wav, .mp3, .m4a,
            .mp4 or .flac (matched case-insensitively).

    Returns:
        A JSONResponse carrying the value returned by
        ``analyze_fluency_main``.

    Raises:
        HTTPException: 400 when the filename is missing or has an
            unsupported extension; 500 when saving the upload or the
            analysis fails.
    """
    allowed_extensions = ('.wav', '.mp3', '.m4a', '.mp4', '.flac')
    # The filename may be absent; fall back to "" so the extension check
    # rejects it with a 400 instead of raising AttributeError.
    original_name = file.filename or ""
    if not original_name.lower().endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Supported extensions: .wav, .mp3, .m4a, .mp4, .flac.",
        )

    temp_dir = "temp_uploads"
    # Lower-case the suffix so an accepted "CLIP.WAV" is stored as ".wav".
    extension = os.path.splitext(original_name)[1].lower()
    temp_filepath = os.path.join(temp_dir, f"temp_{uuid.uuid4()}{extension}")
    os.makedirs(temp_dir, exist_ok=True)

    try:
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        result = analyze_fluency_main(temp_filepath, model_size="base")
        return JSONResponse(content=result)
    except Exception as e:
        # Chain the cause so the original exception stays attached.
        raise HTTPException(status_code=500, detail=f"Fluency analysis failed: {str(e)}") from e
    finally:
        # The file does not exist if open() itself failed.
        if os.path.exists(temp_filepath):
            os.remove(temp_filepath)
| |
@app.post('/analyze_tone/')
async def analyze_tone(file: UploadFile):
    """Run tone analysis on an uploaded audio file.

    The upload is saved under ``temp_uploads`` with a unique name, handed to
    ``analyze_tone_main`` and deleted again once the request finishes.

    Args:
        file: Uploaded audio. Its filename must end in .wav, .mp3, .m4a,
            .mp4 or .flac (matched case-insensitively).

    Returns:
        A JSONResponse carrying the value returned by ``analyze_tone_main``.

    Raises:
        HTTPException: 400 when the filename is missing or has an
            unsupported extension; 500 when saving the upload or the
            analysis fails.
    """
    allowed_extensions = ('.wav', '.mp3', '.m4a', '.mp4', '.flac')
    # The filename may be absent; fall back to "" so the extension check
    # rejects it with a 400 instead of raising AttributeError.
    original_name = file.filename or ""
    if not original_name.lower().endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Supported extensions: .wav, .mp3, .m4a, .mp4, .flac.",
        )

    temp_dir = "temp_uploads"
    # Lower-case the suffix so an accepted "CLIP.WAV" is stored as ".wav".
    extension = os.path.splitext(original_name)[1].lower()
    temp_filepath = os.path.join(temp_dir, f"temp_{uuid.uuid4()}{extension}")
    os.makedirs(temp_dir, exist_ok=True)

    try:
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        result = analyze_tone_main(temp_filepath)
        return JSONResponse(content=result)
    except Exception as e:
        # Chain the cause so the original exception stays attached.
        raise HTTPException(status_code=500, detail=f"Tone analysis failed: {str(e)}") from e
    finally:
        # The file does not exist if open() itself failed.
        if os.path.exists(temp_filepath):
            os.remove(temp_filepath)
|
|
@app.post('/analyze_vcs/')
async def analyze_vcs(file: UploadFile):
    """Run voice clarity analysis on an uploaded audio file.

    The upload is saved under ``temp_uploads`` with a unique name, handed to
    ``analyze_vcs_main`` and deleted again once the request finishes.

    Args:
        file: Uploaded audio. Its filename must end in .wav, .mp3, .m4a,
            .mp4 or .flac (matched case-insensitively).

    Returns:
        A JSONResponse carrying the value returned by ``analyze_vcs_main``.

    Raises:
        HTTPException: 400 when the filename is missing or has an
            unsupported extension; 500 when saving the upload or the
            analysis fails.
    """
    allowed_extensions = ('.wav', '.mp3', '.m4a', '.mp4', '.flac')
    # The filename may be absent; fall back to "" so the extension check
    # rejects it with a 400 instead of raising AttributeError.
    original_name = file.filename or ""
    if not original_name.lower().endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Supported extensions: .wav, .mp3, .m4a, .mp4, .flac.",
        )

    temp_dir = "temp_uploads"
    # Lower-case the suffix so an accepted "CLIP.WAV" is stored as ".wav".
    extension = os.path.splitext(original_name)[1].lower()
    temp_filepath = os.path.join(temp_dir, f"temp_{uuid.uuid4()}{extension}")
    os.makedirs(temp_dir, exist_ok=True)

    try:
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        result = analyze_vcs_main(temp_filepath)
        return JSONResponse(content=result)
    except Exception as e:
        # Chain the cause so the original exception stays attached.
        raise HTTPException(status_code=500, detail=f"Voice clarity analysis failed: {str(e)}") from e
    finally:
        # The file does not exist if open() itself failed.
        if os.path.exists(temp_filepath):
            os.remove(temp_filepath)
|
|
@app.post('/analyze_vers/')
async def analyze_vers(file: UploadFile):
    """Run VERS analysis on an uploaded audio file.

    The upload is saved under ``temp_uploads`` with a unique name, handed to
    ``analyze_vers_main`` and deleted again once the request finishes.

    Args:
        file: Uploaded audio. Its filename must end in .wav, .mp3, .m4a,
            .mp4 or .flac (matched case-insensitively).

    Returns:
        A JSONResponse carrying the value returned by ``analyze_vers_main``.

    Raises:
        HTTPException: 400 when the filename is missing or has an
            unsupported extension; 500 when saving the upload or the
            analysis fails.
    """
    allowed_extensions = ('.wav', '.mp3', '.m4a', '.mp4', '.flac')
    # The filename may be absent; fall back to "" so the extension check
    # rejects it with a 400 instead of raising AttributeError.
    original_name = file.filename or ""
    if not original_name.lower().endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Supported extensions: .wav, .mp3, .m4a, .mp4, .flac.",
        )

    temp_dir = "temp_uploads"
    # Lower-case the suffix so an accepted "CLIP.WAV" is stored as ".wav".
    extension = os.path.splitext(original_name)[1].lower()
    temp_filepath = os.path.join(temp_dir, f"temp_{uuid.uuid4()}{extension}")
    os.makedirs(temp_dir, exist_ok=True)

    try:
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        result = analyze_vers_main(temp_filepath)
        return JSONResponse(content=result)
    except Exception as e:
        # Chain the cause so the original exception stays attached.
        raise HTTPException(status_code=500, detail=f"VERS analysis failed: {str(e)}") from e
    finally:
        # The file does not exist if open() itself failed.
        if os.path.exists(temp_filepath):
            os.remove(temp_filepath)
| |
@app.post('/voice_confidence/')
async def analyze_voice_confidence(file: UploadFile):
    """Run voice confidence analysis on an uploaded audio file.

    The upload is saved under ``temp_uploads`` with a unique name, handed to
    ``analyze_voice_confidence_main`` and deleted again once the request
    finishes.

    Args:
        file: Uploaded audio. Its filename must end in .wav, .mp3, .m4a,
            .mp4 or .flac (matched case-insensitively).

    Returns:
        A JSONResponse carrying the value returned by
        ``analyze_voice_confidence_main``.

    Raises:
        HTTPException: 400 when the filename is missing or has an
            unsupported extension; 500 when saving the upload or the
            analysis fails.
    """
    allowed_extensions = ('.wav', '.mp3', '.m4a', '.mp4', '.flac')
    # The filename may be absent; fall back to "" so the extension check
    # rejects it with a 400 instead of raising AttributeError.
    original_name = file.filename or ""
    if not original_name.lower().endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Supported extensions: .wav, .mp3, .m4a, .mp4, .flac.",
        )

    temp_dir = "temp_uploads"
    # Lower-case the suffix so an accepted "CLIP.WAV" is stored as ".wav".
    extension = os.path.splitext(original_name)[1].lower()
    temp_filepath = os.path.join(temp_dir, f"temp_{uuid.uuid4()}{extension}")
    os.makedirs(temp_dir, exist_ok=True)

    try:
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        result = analyze_voice_confidence_main(temp_filepath)
        return JSONResponse(content=result)
    except Exception as e:
        # Chain the cause so the original exception stays attached.
        raise HTTPException(status_code=500, detail=f"Voice confidence analysis failed: {str(e)}") from e
    finally:
        # The file does not exist if open() itself failed.
        if os.path.exists(temp_filepath):
            os.remove(temp_filepath)
|
|
@app.post('/analyze_vps/')
async def analyze_vps(file: UploadFile):
    """Run voice pacing score analysis on an uploaded audio file.

    The upload is saved under ``temp_uploads`` with a unique name, handed to
    ``analyze_vps_main`` and deleted again once the request finishes.

    Args:
        file: Uploaded audio. Its filename must end in .wav, .mp3, .m4a,
            .mp4 or .flac (matched case-insensitively).

    Returns:
        A JSONResponse carrying the value returned by ``analyze_vps_main``.

    Raises:
        HTTPException: 400 when the filename is missing or has an
            unsupported extension; 500 when saving the upload or the
            analysis fails.
    """
    allowed_extensions = ('.wav', '.mp3', '.m4a', '.mp4', '.flac')
    # The filename may be absent; fall back to "" so the extension check
    # rejects it with a 400 instead of raising AttributeError.
    original_name = file.filename or ""
    if not original_name.lower().endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Supported extensions: .wav, .mp3, .m4a, .mp4, .flac.",
        )

    temp_dir = "temp_uploads"
    # Lower-case the suffix so an accepted "CLIP.WAV" is stored as ".wav".
    extension = os.path.splitext(original_name)[1].lower()
    temp_filepath = os.path.join(temp_dir, f"temp_{uuid.uuid4()}{extension}")
    os.makedirs(temp_dir, exist_ok=True)

    try:
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        result = analyze_vps_main(temp_filepath)
        return JSONResponse(content=result)
    except Exception as e:
        # Chain the cause so the original exception stays attached.
        raise HTTPException(status_code=500, detail=f"Voice pacing score analysis failed: {str(e)}") from e
    finally:
        # The file does not exist if open() itself failed.
        if os.path.exists(temp_filepath):
            os.remove(temp_filepath)
|
|
@app.post('/voice_engagement_score/')
async def analyze_voice_engagement_score(file: UploadFile):
    """Compute the voice engagement score of an uploaded audio file.

    The upload is saved under ``temp_uploads`` with a unique name, handed to
    ``calc_voice_engagement_score`` and deleted again once the request
    finishes.

    Args:
        file: Uploaded audio. Its filename must end in .wav, .mp3, .m4a,
            .mp4 or .flac (matched case-insensitively).

    Returns:
        A JSONResponse carrying the value returned by
        ``calc_voice_engagement_score``.

    Raises:
        HTTPException: 400 when the filename is missing or has an
            unsupported extension; 500 when saving the upload or the
            analysis fails.
    """
    allowed_extensions = ('.wav', '.mp3', '.m4a', '.mp4', '.flac')
    # The filename may be absent; fall back to "" so the extension check
    # rejects it with a 400 instead of raising AttributeError.
    original_name = file.filename or ""
    if not original_name.lower().endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Supported extensions: .wav, .mp3, .m4a, .mp4, .flac.",
        )

    temp_dir = "temp_uploads"
    # Lower-case the suffix so an accepted "CLIP.WAV" is stored as ".wav".
    extension = os.path.splitext(original_name)[1].lower()
    temp_filepath = os.path.join(temp_dir, f"temp_{uuid.uuid4()}{extension}")
    os.makedirs(temp_dir, exist_ok=True)

    try:
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        result = calc_voice_engagement_score(temp_filepath)
        return JSONResponse(content=result)
    except Exception as e:
        # Chain the cause so the original exception stays attached.
        raise HTTPException(status_code=500, detail=f"Voice engagement score analysis failed: {str(e)}") from e
    finally:
        # The file does not exist if open() itself failed.
        if os.path.exists(temp_filepath):
            os.remove(temp_filepath)
|
|
@app.post('/analyze_fillers/')
async def analyze_fillers_count(file: UploadFile):
    """Run filler-word analysis on an uploaded audio file.

    The upload is saved under ``temp_uploads`` with a unique name, handed to
    ``analyze_fillers`` and deleted again once the request finishes.

    Args:
        file: Uploaded audio. Its filename must end in .wav, .mp3, .m4a,
            .mp4 or .flac (matched case-insensitively).

    Returns:
        A JSONResponse carrying the value returned by ``analyze_fillers``.

    Raises:
        HTTPException: 400 when the filename is missing or has an
            unsupported extension; 500 when saving the upload or the
            analysis fails.
    """
    allowed_extensions = ('.wav', '.mp3', '.mp4', '.m4a', '.flac')
    # The filename may be absent; fall back to "" so the extension check
    # rejects it with a 400 instead of raising AttributeError.
    original_name = file.filename or ""
    if not original_name.lower().endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Supported extensions: .wav, .mp3, .m4a, .mp4, .flac.",
        )

    temp_dir = "temp_uploads"
    # Lower-case the suffix so an accepted "CLIP.WAV" is stored as ".wav".
    extension = os.path.splitext(original_name)[1].lower()
    temp_filepath = os.path.join(temp_dir, f"temp_{uuid.uuid4()}{extension}")
    os.makedirs(temp_dir, exist_ok=True)

    try:
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        result = analyze_fillers(temp_filepath)
        return JSONResponse(content=result)
    except Exception as e:
        # Chain the cause so the original exception stays attached.
        raise HTTPException(status_code=500, detail=f"Filler analysis failed: {str(e)}") from e
    finally:
        # The file does not exist if open() itself failed.
        if os.path.exists(temp_filepath):
            os.remove(temp_filepath)
|
|
|
|
| import time |
|
|
|
|
|
|
@app.post('/transcribe/')
async def transcribe(file: UploadFile, language: str = Form(...)):
    """Transcribe an uploaded audio file.

    The upload is saved under ``temp_uploads`` with a unique name, handed to
    ``transcribe_audio`` (``model_size="base"``) and deleted again once the
    request finishes.

    Args:
        file: Uploaded audio. Its filename must end in .wav, .mp3, .m4a,
            .mp4 or .flac (matched case-insensitively).
        language: Required form field, forwarded unchanged to
            ``transcribe_audio`` as its ``language`` argument.

    Returns:
        A JSONResponse with two keys: ``transcription`` (the value returned
        by ``transcribe_audio``) and ``transcription_time`` (seconds elapsed
        since the handler started, so it includes validating and saving the
        upload as well as the transcription itself).

    Raises:
        HTTPException: 400 when the filename is missing or has an
            unsupported extension; 500 when saving the upload or the
            transcription fails.
    """
    # perf_counter is monotonic, so the duration cannot be skewed by
    # wall-clock adjustments.
    start_time = time.perf_counter()

    # Every entry carries its leading dot; a bare "mp4" would also match
    # names such as "notes.xmp4".
    allowed_extensions = ('.wav', '.mp3', '.mp4', '.m4a', '.flac')
    # The filename may be absent; fall back to "" so the extension check
    # rejects it with a 400 instead of raising AttributeError.
    original_name = file.filename or ""
    if not original_name.lower().endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Supported extensions: .wav, .mp3, .m4a, .mp4, .flac.",
        )

    temp_dir = "temp_uploads"
    # Lower-case the suffix so an accepted "CLIP.WAV" is stored as ".wav".
    extension = os.path.splitext(original_name)[1].lower()
    temp_filepath = os.path.join(temp_dir, f"temp_{uuid.uuid4()}{extension}")
    os.makedirs(temp_dir, exist_ok=True)

    try:
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        result = transcribe_audio(temp_filepath, language=language, model_size="base")
        transcription_time = time.perf_counter() - start_time
        response = {
            "transcription": result,
            "transcription_time": transcription_time,
        }
        return JSONResponse(content=response)
    except Exception as e:
        # Chain the cause so the original exception stays attached.
        raise HTTPException(status_code=500, detail=f"Transcription failed: {str(e)}") from e
    finally:
        # The file does not exist if open() itself failed.
        if os.path.exists(temp_filepath):
            os.remove(temp_filepath)
|
|
|
|
@app.post('/analyze_all/')
async def analyze_all(file: UploadFile, language: str = Form(...)):
    """Run every analysis on one uploaded audio file and combine the results.

    The upload is saved once under ``temp_uploads`` with a unique name; each
    analyzer is then called in turn on that file, and the file is deleted
    again once the request finishes.

    Args:
        file: Uploaded audio. Its filename must end in .wav, .mp3, .m4a,
            .mp4 or .flac (matched case-insensitively).
        language: Required form field, forwarded to ``transcribe_audio``.

    Returns:
        A JSONResponse with the keys ``fluency``, ``tone``, ``vcs``,
        ``vers``, ``voice_confidence``, ``vps``, ``ves``, ``filler_words``,
        ``transcript``, ``emotion`` and ``sank_score``. ``sank_score`` is the
        arithmetic mean of the seven individual scores; the other keys hold
        each analyzer's return value unchanged.

    Raises:
        HTTPException: 400 when the filename is missing or has an
            unsupported extension; 500 when saving the upload or any
            analysis step fails (including a missing score key).
    """
    allowed_extensions = ('.wav', '.mp3', '.m4a', '.mp4', '.flac')
    # The filename may be absent; fall back to "" so the extension check
    # rejects it with a 400 instead of raising AttributeError.
    original_name = file.filename or ""
    if not original_name.lower().endswith(allowed_extensions):
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Supported extensions: .wav, .mp3, .m4a, .mp4, .flac.",
        )

    temp_dir = "temp_uploads"
    # Lower-case the suffix so an accepted "CLIP.WAV" is stored as ".wav".
    extension = os.path.splitext(original_name)[1].lower()
    temp_filepath = os.path.join(temp_dir, f"temp_{uuid.uuid4()}{extension}")
    os.makedirs(temp_dir, exist_ok=True)

    try:
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        fluency_result = analyze_fluency_main(temp_filepath, model_size="base")
        tone_result = analyze_tone_main(temp_filepath)
        vcs_result = analyze_vcs_main(temp_filepath)
        vers_result = analyze_vers_main(temp_filepath)
        voice_confidence_result = analyze_voice_confidence_main(temp_filepath)
        vps_result = analyze_vps_main(temp_filepath)
        ves_result = calc_voice_engagement_score(temp_filepath)
        filler_count = analyze_fillers(temp_filepath)
        transcript = transcribe_audio(temp_filepath, language, "base")
        emotion = predict_emotion(temp_filepath)

        # NOTE(review): 'Voice Clarity Sore' looks like a typo for "Score",
        # but it has to match the key produced by vcs_api -- confirm there
        # before renaming.
        component_scores = (
            fluency_result['fluency_score'],
            tone_result['speech_dynamism_score'],
            vcs_result['Voice Clarity Sore'],
            vers_result['VERS Score'],
            voice_confidence_result['voice_confidence_score'],
            vps_result['VPS'],
            ves_result['ves'],
        )
        avg_score = sum(component_scores) / len(component_scores)

        combined_result = {
            "fluency": fluency_result,
            "tone": tone_result,
            "vcs": vcs_result,
            "vers": vers_result,
            "voice_confidence": voice_confidence_result,
            "vps": vps_result,
            "ves": ves_result,
            "filler_words": filler_count,
            "transcript": transcript,
            "emotion": emotion,
            "sank_score": avg_score,
        }
        return JSONResponse(content=combined_result)
    except Exception as e:
        # Chain the cause so the original exception stays attached.
        raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}") from e
    finally:
        # The file does not exist if open() itself failed.
        if os.path.exists(temp_filepath):
            os.remove(temp_filepath)
|
|