"""HTTP entry points for the Intervision AI engine.

The service accepts an interview session, downloads the recorded video,
runs ``run_intervision_pipeline`` on the answered questions, uploads the
generated report video to Cloudinary and posts the combined report to the
caller's callback URL.
"""
import json
import os
import re
import shutil
from datetime import datetime
from typing import List
from urllib.parse import unquote, urlparse

import cloudinary
import cloudinary.uploader
import requests
from dotenv import load_dotenv
from fastapi import FastAPI, Body, HTTPException, BackgroundTasks
from pydantic import BaseModel, HttpUrl
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from pipeline import run_intervision_pipeline

# --- Setup Retry Strategy ---
retry_strategy = Retry(
    total=3,
    # Exponential backoff between retries; the exact delays depend on the
    # installed urllib3 version.
    backoff_factor=1,
    status_forcelist=[429, 500, 502, 503, 504],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
http = requests.Session()
http.mount("https://", adapter)
http.mount("http://", adapter)

# Load environment variables from .env file
load_dotenv()

app = FastAPI(title="Intervision AI Engine")

# Cloudinary Configuration
cloudinary.config(
    cloud_name=os.getenv("CLOUDINARY_CLOUD_NAME"),
    api_key=os.getenv("CLOUDINARY_API_KEY"),
    api_secret=os.getenv("CLOUDINARY_API_SECRET"),
)

# Directory Setup
RESULT_DIR = "temp_data/results"
UPLOAD_DIR = "temp_data/uploads"
os.makedirs(RESULT_DIR, exist_ok=True)
os.makedirs(UPLOAD_DIR, exist_ok=True)

# Characters outside this set are replaced before a session id is used as a
# file or directory name.
_UNSAFE_PATH_CHARS = re.compile(r"[^A-Za-z0-9_.-]")


class Answer(BaseModel):
    """One interview question together with its answer state and timing."""

    aiQuestionId: int
    questionText: str
    expectedAnswer: str
    isAnswered: bool
    isSkipped: bool
    isFailed: bool
    startedAt: str
    submittedAt: str


class InterviewRequest(BaseModel):
    """Request body of ``POST /process-interview``."""

    sessionId: str
    originalVideoUrl: HttpUrl
    callbackBaseUrl: HttpUrl
    answers: List[Answer]


class DeleteVideoRequest(BaseModel):
    """Request body of ``POST /delete-video-by-url``."""

    videoUrl: str


def time_to_seconds(t_str: str) -> int:
    """Convert a colon-separated timestamp to total seconds.

    Accepts ``HH:MM:SS`` as well as the shorter ``MM:SS`` and ``SS`` forms.

    Args:
        t_str: The timestamp text; an empty or ``None`` value yields 0.

    Returns:
        The total number of seconds.

    Raises:
        ValueError: If a field is not an integer or there are more than
            three fields.
    """
    if not t_str:
        return 0
    fields = [int(part) for part in t_str.split(':')]
    if len(fields) > 3:
        raise ValueError(f"Unsupported timestamp format: {t_str!r}")
    total = 0
    for value in fields:
        # Base-60 accumulation: identical to h*3600 + m*60 + s for 3 fields.
        total = total * 60 + value
    return total


def _safe_session_name(session_id) -> str:
    """Return a single path component derived from *session_id*.

    The session id comes from the request body, so it must never be able to
    point outside the working directories (the resulting paths are later
    removed recursively). Ids made only of letters, digits, ``_``, ``-`` and
    ``.`` are returned unchanged.
    """
    name = _UNSAFE_PATH_CHARS.sub("_", str(session_id))
    # "", "." and ".." are not usable as a directory name of our own.
    return name if name.strip(".") else "session"


def _download_video(video_url, dest_path: str) -> bool:
    """Stream *video_url* into *dest_path*; return True on success."""
    try:
        # The context manager releases the streamed connection even when the
        # download is interrupted half-way.
        with http.get(str(video_url), stream=True, timeout=300) as response:
            response.raise_for_status()
            with open(dest_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    f.write(chunk)
    except Exception as e:  # background-task boundary: log and give up
        print(f"[DOWNLOAD ERROR]: {e}")
        return False
    print(f"[LOG] Download complete: {dest_path}")
    return True


def _split_answers(answers):
    """Split answers into pipeline inputs and placeholder reports.

    Returns:
        A ``(final_questions, skipped_failed_reports)`` tuple: answered
        questions in the shape passed to the pipeline, and zero-score report
        entries for everything that was not answered.
    """
    final_questions = []
    skipped_failed_reports = []
    for q in answers:
        if q.get('isAnswered'):
            final_questions.append({
                "question_id": q['aiQuestionId'],
                "question_text": q['questionText'],
                "ideal_answer": q['expectedAnswer'],
                "start_time": time_to_seconds(q['startedAt']),
                "end_time": time_to_seconds(q['submittedAt']),
            })
        else:
            skipped_failed_reports.append({
                "questionId": q['aiQuestionId'],
                "userAnswerText": "N/A",
                "score": 0.0,
                "relevance": 0.0,
                "confidence": 0.0,
                "stress": 0.0,
                "clarity": 0.0,
                "pauses": 0.0,
                "toneOfVoice": 3,
                "status": "skipped" if q.get('isSkipped') else "failed",
            })
    return final_questions, skipped_failed_reports


def _upload_final_video(final_video_path: str, session_id):
    """Upload the generated video to Cloudinary; return its URL or None."""
    if not os.path.exists(final_video_path):
        print(f"[ERROR] Final video file not found at {final_video_path}")
        return None
    print("[LOG] Uploading final video to Cloudinary...")
    try:
        upload_res = cloudinary.uploader.upload(
            final_video_path,
            public_id=f"res_{session_id}",
            folder="intervision_results",
            resource_type="video",
        )
    except Exception as e:  # upload is best-effort; the report is still sent
        print(f"[UPLOAD ERROR]: {e}")
        return None
    final_video_url = upload_res.get("secure_url")
    print(f"[LOG] Upload successful: {final_video_url}")
    return final_video_url


def _send_callback(callback_url, payload: dict) -> None:
    """POST *payload* to ``<callback_url>/api/ai-callback``; log failures."""
    try:
        callback_endpoint = f"{str(callback_url).rstrip('/')}/api/ai-callback"
        # Use the shared session so the callback goes through the same
        # retry-mounted adapter as the download.
        callback_resp = http.post(callback_endpoint, json=payload, timeout=30)
        print(f"[LOG] Callback sent. Status: {callback_resp.status_code}")
    except Exception as e:  # background-task boundary: log only
        print(f"[CALLBACK/CLEANUP ERROR]: {e}")


def _cleanup_session(session_id, local_input_path: str, session_dir: str) -> None:
    """Remove the downloaded video and the session's result directory."""
    print(f"[LOG] Cleaning up session {session_id}...")
    try:
        if os.path.exists(local_input_path):
            os.remove(local_input_path)
    except OSError as e:
        print(f"[CALLBACK/CLEANUP ERROR]: {e}")
    try:
        if os.path.exists(session_dir):
            shutil.rmtree(session_dir)
    except OSError as e:
        print(f"[CALLBACK/CLEANUP ERROR]: {e}")


def background_processing(session_data: dict):
    """Process one interview session end to end.

    Downloads the original video, runs the pipeline on the answered
    questions, uploads the generated video, posts the combined report to the
    callback URL and finally removes the local working files.

    Args:
        session_data: The ``InterviewRequest`` payload as a dict
            (``sessionId``, ``originalVideoUrl``, ``callbackBaseUrl``,
            ``answers``).

    If the download fails the function returns without sending a callback.
    Local files are removed in every case, including when the pipeline or the
    callback raises.
    """
    session_id = session_data.get('sessionId')
    video_url = session_data.get('originalVideoUrl')
    callback_url = session_data.get('callbackBaseUrl')

    # Only the sanitized name touches the filesystem; the original id is
    # still used for logging, the Cloudinary public id and the callback.
    safe_name = _safe_session_name(session_id)
    session_dir = os.path.join(RESULT_DIR, safe_name)
    local_input_path = os.path.join(UPLOAD_DIR, f"{safe_name}_input.mp4")
    os.makedirs(session_dir, exist_ok=True)
    print(f"[LOG] Processing started for session: {session_id}")

    try:
        # 1. Download the original video from Cloudinary/URL
        if not _download_video(video_url, local_input_path):
            return

        # 2. Prepare questions for the pipeline
        final_questions, skipped_failed_reports = _split_answers(
            session_data.get('answers') or []
        )

        ai_results = []
        final_video_url = None

        # 3. Run the AI Pipeline
        if final_questions:
            print(f"[LOG] Running pipeline for {len(final_questions)} questions...")
            # Capture the return message to ensure the pipeline finished
            pipeline_status = run_intervision_pipeline(
                local_input_path, final_questions, session_dir
            )
            print(f"[LOG] Pipeline Status: {pipeline_status}")

            report_path = os.path.join(session_dir, "report.json")
            # Ensure this filename exactly matches the one inside
            # run_intervision_pipeline
            final_video_path = os.path.join(
                session_dir, "Intervision_Final_Report.mp4"
            )

            # Parse JSON results
            # NOTE(review): read with the platform default encoding, as
            # before — confirm it matches how the pipeline writes the file.
            if os.path.exists(report_path):
                with open(report_path, "r") as f:
                    ai_results = json.load(f).get("listOfAnswerReport", [])

            # 4. Upload the generated video to Cloudinary
            final_video_url = _upload_final_video(final_video_path, session_id)

        # 5. Send results back via Callback
        final_payload = {
            "sessionId": session_id,
            "finalVideoUrl": final_video_url,
            "report": ai_results + skipped_failed_reports,
        }
        _send_callback(callback_url, final_payload)
    finally:
        # 6. Cleanup local storage — always, so failed runs do not pile up
        # videos on disk.
        _cleanup_session(session_id, local_input_path, session_dir)


@app.get("/")
async def root():
    """Health-check endpoint."""
    return {
        "status": "Intervision AI Engine Running",
        "message": "API is working successfully"
    }


@app.post("/process-interview")
async def process_interview(background_tasks: BackgroundTasks, data: InterviewRequest):
    """Queue an interview session for background processing."""
    background_tasks.add_task(background_processing, data.dict())
    return {
        "message": "Processing started",
        "sessionId": data.sessionId
    }


def _public_id_from_url(video_url: str) -> str:
    """Extract the Cloudinary public id from a delivery URL.

    Example: ``.../intervision_results/res_1.mp4`` ->
    ``intervision_results/res_1``. The folder is kept only when the parent
    path segment contains ``intervision_results``.

    Raises:
        ValueError: If the URL path has no usable file name.
    """
    # Parse the path only, so a query string or fragment cannot leak into
    # the id, and undo percent-encoding of the file name.
    path = unquote(urlparse(video_url).path)
    segments = [segment for segment in path.split('/') if segment]
    if not segments:
        raise ValueError("videoUrl does not contain a file name")

    # Strip only the final extension; ids may themselves contain dots.
    filename = segments[-1].rsplit('.', 1)[0]
    if not filename:
        raise ValueError("videoUrl does not contain a file name")

    # Check if the video is inside the results folder
    in_results_folder = len(segments) >= 2 and "intervision_results" in segments[-2]
    folder = segments[-2] if in_results_folder else ""
    return f"{folder}/{filename}" if folder else filename


@app.post("/delete-video-by-url")
async def delete_video_by_url(data: DeleteVideoRequest):
    """Delete a previously uploaded video from Cloudinary by its URL.

    Raises:
        HTTPException: 400 when ``videoUrl`` is empty or has no file name,
            500 when the Cloudinary call fails.
    """
    video_url = data.videoUrl
    if not video_url:
        raise HTTPException(status_code=400, detail="videoUrl is required")

    try:
        public_id = _public_id_from_url(video_url)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    try:
        # Trigger deletion from Cloudinary
        result = cloudinary.uploader.destroy(public_id, resource_type="video")
        if result.get("result") == "ok":
            return {"status": "success", "message": f"Deleted {public_id}"}
        return {"status": "failed", "details": result}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)