| import os |
| import json |
| import requests |
| import shutil |
| import cloudinary |
| import cloudinary.uploader |
| from requests.adapters import HTTPAdapter |
| from urllib3.util.retry import Retry |
| from fastapi import FastAPI, Body, HTTPException, BackgroundTasks |
| from dotenv import load_dotenv |
| from datetime import datetime |
| from pydantic import BaseModel, HttpUrl |
| from typing import List |
| from pipeline import run_intervision_pipeline |
|
|
| |
# Shared outbound HTTP session. Both URL schemes use one adapter configured
# with the retry policy below (total=3, backoff_factor=1, and a forced retry
# on the listed HTTP status codes).
retry_strategy = Retry(total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
adapter = HTTPAdapter(max_retries=retry_strategy)

http = requests.Session()
http.mount("https://", adapter)
http.mount("http://", adapter)
|
|
| |
# load_dotenv() runs first so that anything it places in the environment is
# visible to the credential lookups further down.
load_dotenv()

# ASGI application object; the route decorators in this module attach to it.
app = FastAPI(title="Intervision AI Engine")

# Cloudinary credentials come from the process environment. A missing
# variable yields None here rather than raising.
cloudinary.config(
    cloud_name=os.environ.get("CLOUDINARY_CLOUD_NAME"),
    api_key=os.environ.get("CLOUDINARY_API_KEY"),
    api_secret=os.environ.get("CLOUDINARY_API_SECRET"),
)
|
|
| |
# Working directories (relative to the process's current directory):
# RESULT_DIR holds one sub-directory of pipeline output per session,
# UPLOAD_DIR holds the downloaded input videos. Both are created at import
# time if they do not already exist.
RESULT_DIR = "temp_data/results"
UPLOAD_DIR = "temp_data/uploads"
for _working_dir in (RESULT_DIR, UPLOAD_DIR):
    os.makedirs(_working_dir, exist_ok=True)
|
|
class Answer(BaseModel):
    """State of a single interview question as submitted by the caller.

    Answered questions (``isAnswered`` true) are forwarded to the analysis
    pipeline; every other entry is reported back with zero scores and a
    status of ``"skipped"`` or ``"failed"``.
    """

    # Identifier echoed back in the per-question report.
    aiQuestionId: int
    # Question text and reference answer handed to the pipeline.
    questionText: str
    expectedAnswer: str
    # Outcome flags for this question.
    isAnswered: bool
    isSkipped: bool
    isFailed: bool
    # Timestamps as strings; for answered questions they are converted with
    # time_to_seconds() to locate the answer inside the video.
    startedAt: str
    submittedAt: str
|
|
class InterviewRequest(BaseModel):
    """Request body accepted by ``POST /process-interview``."""

    # Names the session's working files and the uploaded result video.
    sessionId: str
    # Location the recorded interview video is downloaded from.
    originalVideoUrl: HttpUrl
    # Base URL of the service that receives the results; the path
    # "/api/ai-callback" is appended to it.
    callbackBaseUrl: HttpUrl
    # One entry per interview question.
    answers: List[Answer]
| |
class DeleteVideoRequest(BaseModel):
    """Request body accepted by ``POST /delete-video-by-url``."""

    # URL of the hosted video to delete; the Cloudinary public id is derived
    # from its trailing path segments.
    videoUrl: str
|
|
def time_to_seconds(t_str: str) -> int:
    """Convert a clock-style timestamp into a whole number of seconds.

    ``HH:MM:SS`` is the primary format. The shorter ``MM:SS`` and ``SS``
    forms are accepted as well, and a fractional seconds component
    (``00:01:05.9``) is truncated toward zero.

    Args:
        t_str: Timestamp text. ``None``, an empty string, or a string made
            only of whitespace means "no timestamp".

    Returns:
        Total seconds as an ``int``; ``0`` when no timestamp was given.

    Raises:
        ValueError: If the text has more than three ``:``-separated fields
            or any field is not numeric.
    """
    if not t_str:
        return 0

    text = t_str.strip()
    if not text:
        return 0

    fields = text.split(':')
    if len(fields) > 3:
        raise ValueError(
            f"Invalid timestamp {t_str!r}: expected HH:MM:SS, MM:SS or SS"
        )

    *leading, seconds_field = fields
    try:
        total = 0
        # Each step shifts the running total up by one base-60 unit, so the
        # same loop serves one, two or three fields.
        for field in leading:
            total = total * 60 + int(field)
        # float() tolerates fractional seconds; int() then truncates them.
        total = total * 60 + int(float(seconds_field))
    except (ValueError, OverflowError):
        raise ValueError(
            f"Invalid timestamp {t_str!r}: expected HH:MM:SS, MM:SS or SS"
        ) from None
    return total
|
|
def _is_safe_session_id(session_id) -> bool:
    """Return True when *session_id* is usable as a single path component."""
    if not isinstance(session_id, str) or session_id in ("", ".", ".."):
        return False
    # A separator (either style) or NUL would let the id point outside the
    # working directories that are later removed with shutil.rmtree.
    return not any(ch in session_id for ch in ("/", "\\", "\0"))


def _download_video(video_url: str, destination: str) -> bool:
    """Stream *video_url* into *destination*; log and return False on failure."""
    try:
        # The context manager releases the connection even if writing fails.
        with http.get(video_url, stream=True, timeout=300) as response:
            response.raise_for_status()
            with open(destination, 'wb') as f:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    if chunk:
                        f.write(chunk)
    except Exception as e:
        print(f"[DOWNLOAD ERROR]: {e}")
        return False
    print(f"[LOG] Download complete: {destination}")
    return True


def _partition_answers(answers):
    """Split answers into pipeline inputs and zero-score placeholder reports."""
    pipeline_questions = []
    placeholder_reports = []
    for q in answers:
        if q.get('isAnswered'):
            pipeline_questions.append({
                "question_id": q['aiQuestionId'],
                "question_text": q['questionText'],
                "ideal_answer": q['expectedAnswer'],
                "start_time": time_to_seconds(q['startedAt']),
                "end_time": time_to_seconds(q['submittedAt']),
            })
        else:
            placeholder_reports.append({
                "questionId": q['aiQuestionId'],
                "userAnswerText": "N/A",
                "score": 0.0, "relevance": 0.0, "confidence": 0.0,
                "stress": 0.0, "clarity": 0.0, "pauses": 0.0,
                "toneOfVoice": 3,
                "status": "skipped" if q.get('isSkipped') else "failed",
            })
    return pipeline_questions, placeholder_reports


def _upload_final_video(final_video_path: str, session_id: str):
    """Upload the rendered video to Cloudinary; return its URL or None."""
    if not os.path.exists(final_video_path):
        print(f"[ERROR] Final video file not found at {final_video_path}")
        return None

    print("[LOG] Uploading final video to Cloudinary...")
    try:
        upload_res = cloudinary.uploader.upload(
            final_video_path,
            public_id=f"res_{session_id}",
            folder="intervision_results",
            resource_type="video",
        )
        final_video_url = upload_res.get("secure_url")
    except Exception as e:
        print(f"[UPLOAD ERROR]: {e}")
        return None
    print(f"[LOG] Upload successful: {final_video_url}")
    return final_video_url


def _send_callback(callback_url, payload: dict) -> None:
    """POST *payload* to the caller's callback endpoint; log any failure."""
    callback_endpoint = f"{str(callback_url).rstrip('/')}/api/ai-callback"
    try:
        # Sent through the shared session, like the download.
        # NOTE(review): whether a failed POST is actually retried depends on
        # the Retry policy's method rules; confirm against urllib3's docs.
        callback_resp = http.post(callback_endpoint, json=payload, timeout=30)
        print(f"[LOG] Callback sent. Status: {callback_resp.status_code}")
    except Exception as e:
        print(f"[CALLBACK/CLEANUP ERROR]: {e}")


def _cleanup_session(session_id: str, local_input_path: str, session_dir: str) -> None:
    """Remove the downloaded input video and the session's result directory."""
    print(f"[LOG] Cleaning up session {session_id}...")
    try:
        if os.path.exists(local_input_path):
            os.remove(local_input_path)
    except OSError as e:
        print(f"[CALLBACK/CLEANUP ERROR]: {e}")
    try:
        if os.path.exists(session_dir):
            shutil.rmtree(session_dir)
    except OSError as e:
        print(f"[CALLBACK/CLEANUP ERROR]: {e}")


def background_processing(session_data: dict):
    """Run the full analysis for one interview session.

    Steps: download the source video, run the pipeline on the answered
    questions, upload the rendered video to Cloudinary, POST the combined
    report to ``<callbackBaseUrl>/api/ai-callback``, and delete the
    session's temporary files.

    Args:
        session_data: Mapping with the keys ``sessionId``,
            ``originalVideoUrl``, ``callbackBaseUrl`` and ``answers`` (a
            list of per-question mappings).

    Returns:
        None. Progress and failures are reported on stdout.

    Notes:
        * A ``sessionId`` that is not a plain name (empty, ``.``/``..``, or
          containing a path separator) is rejected before any file is
          touched, because the id is joined into paths that are deleted
          recursively at the end.
        * If the download fails no callback is sent.
        * Temporary files are removed on every exit path, including when
          the pipeline or the callback raises; an exception raised by the
          pipeline still propagates to the caller afterwards.
    """
    session_id = session_data.get('sessionId')
    video_url = session_data.get('originalVideoUrl')
    callback_url = session_data.get('callbackBaseUrl')

    if not _is_safe_session_id(session_id):
        print(f"[ERROR] Rejected unsafe session id: {session_id!r}")
        return

    session_dir = os.path.join(RESULT_DIR, session_id)
    local_input_path = os.path.join(UPLOAD_DIR, f"{session_id}_input.mp4")
    os.makedirs(session_dir, exist_ok=True)

    print(f"[LOG] Processing started for session: {session_id}")

    try:
        if not _download_video(str(video_url), local_input_path):
            return

        final_questions, skipped_failed_reports = _partition_answers(
            session_data.get('answers') or []
        )

        ai_results = []
        final_video_url = None

        if final_questions:
            print(f"[LOG] Running pipeline for {len(final_questions)} questions...")
            pipeline_status = run_intervision_pipeline(
                local_input_path, final_questions, session_dir
            )
            print(f"[LOG] Pipeline Status: {pipeline_status}")

            # The pipeline's outputs are picked up from these fixed file
            # names inside the session directory.
            report_path = os.path.join(session_dir, "report.json")
            final_video_path = os.path.join(session_dir, "Intervision_Final_Report.mp4")

            if os.path.exists(report_path):
                with open(report_path, "r") as f:
                    ai_results = json.load(f).get("listOfAnswerReport", [])

            final_video_url = _upload_final_video(final_video_path, session_id)

        final_payload = {
            "sessionId": session_id,
            "finalVideoUrl": final_video_url,
            "report": ai_results + skipped_failed_reports,
        }
        _send_callback(callback_url, final_payload)
    finally:
        _cleanup_session(session_id, local_input_path, session_dir)
|
|
@app.get("/")
async def root():
    """Return a fixed status payload showing that the API is reachable."""
    return dict(
        status="Intervision AI Engine Running",
        message="API is working successfully",
    )
|
|
@app.post("/process-interview")
async def process_interview(background_tasks: BackgroundTasks, data: InterviewRequest):
    """Schedule analysis of an interview and acknowledge right away.

    The validated request is converted to a plain dict and handed to
    ``background_processing`` as a background task; the response only
    confirms that the work was queued.
    """
    session_payload = data.dict()
    background_tasks.add_task(background_processing, session_payload)

    acknowledgement = {"message": "Processing started", "sessionId": data.sessionId}
    return acknowledgement
|
|
def _public_id_from_url(video_url: str) -> str:
    """Derive the Cloudinary public id from the tail of a video URL.

    The id is the last path segment without its file extension, prefixed by
    the parent segment when that segment contains ``intervision_results``.

    Raises:
        ValueError: If the URL cannot be parsed or has no path segment.
    """
    from urllib.parse import urlsplit

    # Working on the parsed path keeps any query string or fragment out of
    # the id; empty segments (doubled or trailing slashes) are ignored.
    segments = [part for part in urlsplit(video_url).path.split('/') if part]
    if not segments:
        raise ValueError("videoUrl does not contain a file name")

    # splitext drops only the final extension, so ids that themselves
    # contain dots are preserved.
    filename = os.path.splitext(segments[-1])[0]
    parent = segments[-2] if len(segments) > 1 else ""
    folder = parent if "intervision_results" in parent else ""
    return f"{folder}/{filename}" if folder else filename


@app.post("/delete-video-by-url")
async def delete_video_by_url(data: DeleteVideoRequest):
    """Delete a hosted video from Cloudinary, identified by its URL.

    Returns:
        ``{"status": "success", ...}`` when Cloudinary reports ``"ok"``,
        otherwise ``{"status": "failed", "details": <Cloudinary response>}``.

    Raises:
        HTTPException: 400 when ``videoUrl`` is empty or no public id can
            be derived from it; 500 when the Cloudinary call fails.
    """
    video_url = data.videoUrl
    if not video_url:
        raise HTTPException(status_code=400, detail="videoUrl is required")

    # Parsed outside the Cloudinary try-block so that a malformed URL is
    # reported as a client error rather than a server error.
    try:
        public_id = _public_id_from_url(video_url)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    try:
        # NOTE(review): this looks like a synchronous call made from an
        # async handler; confirm it is acceptable to run on the event loop.
        result = cloudinary.uploader.destroy(public_id, resource_type="video")

        if result.get("result") == "ok":
            return {"status": "success", "message": f"Deleted {public_id}"}
        return {"status": "failed", "details": result}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
if __name__ == "__main__":
    # Direct-execution entry point: serve the app on every interface, port
    # 8000. uvicorn is imported here, so it is only needed when the module
    # is run as a script.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **server_options)