# Audio / app.py
# Last change: "Update app.py" by norhan12 (commit a9a21fe, verified), 2.9 kB.
import logging
import os
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse
from pydantic import BaseModel, HttpUrl
from typing import Dict
from process_interview import process_interview
# Configure logging once, at import time: INFO level and above, with each
# record formatted as "<timestamp> - <logger name> - <level> - <message>".
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# The FastAPI application object; the route decorators in this file
# register their handlers on it.
app = FastAPI(
    title="EvalBot API",
    description="API to analyze a single audio interview URL.",
    version="1.0.0"
)

# Directory where output files are stored.
# The path is relative, so it resolves against the process's working directory.
OUTPUT_DIR = "./processed_audio"
# Create the directory at import time; exist_ok=True makes this a no-op when
# it is already present.
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Request body for the /analyze endpoint: one audio URL plus the ID of the
# user it belongs to.
class AudioItem(BaseModel):
    # Location of the audio to analyze; validated by pydantic as an HTTP(S) URL.
    url: HttpUrl
    # Caller-supplied identifier; it is only logged and echoed back in the
    # response by the code in this file.
    user_id: str
def process_item_wrapper(item: AudioItem) -> Dict:
    """
    Run ``process_interview`` on one item and package the outcome as a dict.

    Any exception raised while processing is logged with its traceback and
    reported in the returned payload instead of propagating to the caller.

    Args:
        item: The request item carrying the audio ``url`` and ``user_id``.

    Returns:
        On success: ``user_id``, ``url``, ``status="success"`` and ``result``
        (the value returned by ``process_interview``, with ``pdf_filename`` /
        ``json_filename`` added when the matching ``*_path`` entry is set).
        On failure: ``user_id``, ``url``, ``status="error"`` and ``detail``
        (the exception message).
    """
    # (path key, filename key) pairs: callers get the bare filename rather
    # than the full path.
    filename_keys = (
        ("pdf_path", "pdf_filename"),
        ("json_path", "json_filename"),
    )
    try:
        logger.info(f"Starting analysis for user '{item.user_id}' with URL: {item.url}")
        result = process_interview(str(item.url))
        if result:
            for path_key, name_key in filename_keys:
                if path_key in result and result[path_key]:
                    result[name_key] = os.path.basename(result[path_key])
        response = {
            "user_id": item.user_id,
            "url": str(item.url),
            "status": "success",
            "result": result,
        }
        return response
    except Exception as e:
        # logger.exception logs at ERROR level and attaches the traceback.
        logger.exception(f"Failed processing for user '{item.user_id}': {e}")
        failure = {
            "user_id": item.user_id,
            "url": str(item.url),
            "status": "error",
            "detail": str(e),
        }
        return failure
@app.get("/health", summary="Check API Health")
def health_check():
    """
    Report that the service is up.

    Returns a constant payload and performs no other work.
    """
    status_payload = {"status": "ok"}
    return status_payload
@app.post("/analyze", summary="Analyze a single audio URL")
def analyze_audio(request: AudioItem):
    """
    Analyze one audio URL and return the outcome.

    The work is delegated to ``process_item_wrapper``; whatever it returns
    is placed under the ``analysis_result`` key of the response.
    """
    logger.info(
        f"Received request to analyze audio for user '{request.user_id}' with URL: {request.url}"
    )
    outcome = process_item_wrapper(request)
    logger.info(f"Finished processing audio for user '{request.user_id}'")
    response_body = {"analysis_result": outcome}
    return response_body
@app.get("/outputs/{file_name}", summary="Download an output file")
def get_output_file(file_name: str):
    """
    Download a generated report (PDF or JSON) by its filename.

    Args:
        file_name: Name of a file located inside ``OUTPUT_DIR``.

    Returns:
        A ``FileResponse`` that sends the file as an
        ``application/octet-stream`` download named ``file_name``.

    Raises:
        HTTPException: 403 when the requested path resolves outside
            ``OUTPUT_DIR``; 404 when no regular file exists at that path.
    """
    output_root = os.path.abspath(OUTPUT_DIR)
    file_path = os.path.abspath(os.path.join(output_root, file_name))

    # Security check to prevent accessing files outside the output directory.
    # commonpath compares whole path components, so a sibling directory that
    # merely shares the prefix (e.g. "processed_audio_backup") is rejected;
    # a plain string startswith() test would let it through.
    try:
        is_inside = os.path.commonpath([output_root, file_path]) == output_root
    except ValueError:
        # Raised when the two paths cannot be compared (e.g. different drives
        # on Windows); such a path is certainly not inside the output directory.
        is_inside = False
    if not is_inside:
        raise HTTPException(status_code=403, detail="Forbidden: Access is denied.")

    # isfile() rather than exists(): a directory (e.g. file_name == ".")
    # must yield 404 instead of being handed to FileResponse.
    if not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="File not found.")

    return FileResponse(path=file_path, media_type='application/octet-stream', filename=file_name)