| import logging |
| import os |
| from fastapi import FastAPI, HTTPException |
| from fastapi.responses import FileResponse |
| from pydantic import BaseModel, HttpUrl |
| from typing import Dict |
| from process_interview import process_interview |
|
|
# Configure the root logger once, at import time: INFO and above, with each
# record prefixed by timestamp, logger name and severity.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
# The ASGI application object; the title, description and version below are
# the metadata passed to FastAPI for this service.
app = FastAPI(
    title="EvalBot API",
    description="API to analyze a single audio interview URL.",
    version="1.0.0",
)
|
|
| |
# Directory that /outputs/{file_name} serves files from. The path is relative,
# so it resolves against the process's working directory.
OUTPUT_DIR = "./processed_audio"

# Create the directory at import time; exist_ok makes repeated imports and
# restarts harmless.
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
|
class AudioItem(BaseModel):
    # Request body accepted by the /analyze endpoint.
    # Documented with comments rather than a docstring on purpose: pydantic
    # publishes a model's docstring as its schema description.

    # Location of the audio to analyze; validated by pydantic as an HTTP(S) URL.
    url: HttpUrl
    # Caller-supplied identifier; echoed back in log lines and in the response.
    user_id: str
|
|
def process_item_wrapper(item: AudioItem) -> Dict:
    """Run the interview analysis for one item and wrap the outcome in a dict.

    Exceptions raised while processing are caught, logged with their
    traceback, and reported through an ``"error"`` status instead of being
    propagated to the caller.

    Args:
        item: Validated request carrying the audio URL and the user id.

    Returns:
        On success: ``user_id``, ``url``, ``status="success"`` and ``result``
        (the value returned by ``process_interview``, with ``pdf_filename`` /
        ``json_filename`` added when the matching ``*_path`` entry is present
        and non-empty).
        On failure: ``user_id``, ``url``, ``status="error"`` and ``detail``
        (the exception message).
    """
    # (path key, file-name key) pairs, handled in this order.
    path_to_name_keys = (
        ("pdf_path", "pdf_filename"),
        ("json_path", "json_filename"),
    )

    try:
        logger.info(f"Starting analysis for user '{item.user_id}' with URL: {item.url}")
        analysis = process_interview(str(item.url))

        # Publish the bare file name next to each path -- presumably so that
        # clients can request it from /outputs/{file_name}.
        for path_key, name_key in path_to_name_keys:
            if analysis and path_key in analysis and analysis[path_key]:
                analysis[name_key] = os.path.basename(analysis[path_key])

        return {
            "user_id": item.user_id,
            "url": str(item.url),
            "status": "success",
            "result": analysis,
        }
    except Exception as exc:
        # logger.exception logs at ERROR level and attaches the traceback.
        logger.exception(f"Failed processing for user '{item.user_id}': {exc}")
        return {
            "user_id": item.user_id,
            "url": str(item.url),
            "status": "error",
            "detail": str(exc),
        }
|
|
@app.get("/health", summary="Check API Health")
def health_check():
    # Health endpoint: returns a constant payload and does no other work.
    # No docstring on purpose -- FastAPI would publish it as the endpoint
    # description.
    payload = {"status": "ok"}
    return payload
|
|
@app.post("/analyze", summary="Analyze a single audio URL")
def analyze_audio(request: AudioItem):
    """
    Analyze a single audio URL and return analysis results.
    """
    # The docstring above is kept verbatim: FastAPI publishes it as the
    # endpoint description.
    user_id = request.user_id
    logger.info(f"Received request to analyze audio for user '{user_id}' with URL: {request.url}")

    # process_item_wrapper catches processing errors itself, so a failure
    # comes back inside the payload (status "error") rather than as an
    # exception here.
    outcome = process_item_wrapper(request)

    logger.info(f"Finished processing audio for user '{user_id}'")
    return {"analysis_result": outcome}
|
|
@app.get("/outputs/{file_name}", summary="Download an output file")
def get_output_file(file_name: str):
    """
    Downloads a generated report (PDF or JSON) by its filename.
    \f
    Everything after the form feed is for maintainers only: FastAPI cuts the
    published OpenAPI description at that character.

    Args:
        file_name: Name of a file expected to live inside OUTPUT_DIR.

    Returns:
        A FileResponse streaming the file as ``application/octet-stream``.

    Raises:
        HTTPException: 403 when the resolved path falls outside OUTPUT_DIR;
            404 when the path does not name an existing regular file.
    """
    base_dir = os.path.abspath(OUTPUT_DIR)
    file_path = os.path.abspath(os.path.join(base_dir, file_name))

    # Containment must be decided on whole path components. A raw string
    # prefix test would also accept sibling directories such as
    # "<OUTPUT_DIR>_old".
    try:
        is_inside = os.path.commonpath([base_dir, file_path]) == base_dir
    except ValueError:
        # commonpath refuses paths on different drives (Windows); such a path
        # is outside the output directory by definition.
        is_inside = False
    if not is_inside:
        raise HTTPException(status_code=403, detail="Forbidden: Access is denied.")

    # isfile rather than exists: a name such as "." resolves to the output
    # directory itself, which exists but cannot be served as a file.
    if not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="File not found.")

    return FileResponse(path=file_path, media_type='application/octet-stream', filename=file_name)