import os
import json
import requests
import shutil
import cloudinary
import cloudinary.uploader
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from fastapi import FastAPI, Body, HTTPException, BackgroundTasks
from dotenv import load_dotenv
from datetime import datetime
from pydantic import BaseModel, HttpUrl
from typing import List
from pipeline import run_intervision_pipeline
# --- Setup Retry Strategy ---
# Shared retry policy for outbound HTTP calls made through `http`: at most
# 3 retries in total, with the listed rate-limit / server-error statuses
# treated as retryable.
retry_strategy = Retry(
    total=3,
    backoff_factor=1, # Exponential backoff between retries; exact delays depend on the urllib3 version
    status_forcelist=[429, 500, 502, 503, 504],
)
# One module-level session carries the policy for both URL schemes.
adapter = HTTPAdapter(max_retries=retry_strategy)
http = requests.Session()
http.mount("https://", adapter)
http.mount("http://", adapter)
# Load environment variables from .env file.
# This must run before the Cloudinary configuration below, which reads its
# credentials through os.getenv().
load_dotenv()
app = FastAPI(title="Intervision AI Engine")
# Cloudinary Configuration
# os.getenv() returns None for a variable that is not set, so a missing
# credential is not detected at this point.
cloudinary.config(
    cloud_name = os.getenv("CLOUDINARY_CLOUD_NAME"),
    api_key = os.getenv("CLOUDINARY_API_KEY"),
    api_secret = os.getenv("CLOUDINARY_API_SECRET")
)
# Directory Setup
# Relative working directories, created when this module is imported:
# RESULT_DIR holds one sub-directory per session, UPLOAD_DIR holds the
# downloaded input videos.
RESULT_DIR = "temp_data/results"
UPLOAD_DIR = "temp_data/uploads"
os.makedirs(RESULT_DIR, exist_ok=True)
os.makedirs(UPLOAD_DIR, exist_ok=True)
class Answer(BaseModel):
    # One interview question plus the state of the candidate's answer.
    # (Documented with comments rather than a docstring so the generated
    # schema description is left untouched.)

    # Question identity and wording.
    aiQuestionId: int
    questionText: str

    # Reference answer; background_processing forwards it to the pipeline
    # as "ideal_answer".
    expectedAnswer: str

    # Outcome flags. background_processing analyses the answer only when
    # isAnswered is true; otherwise isSkipped selects the "skipped" status
    # over "failed".
    isAnswered: bool
    isSkipped: bool
    isFailed: bool

    # Position of the answer inside the recording; background_processing
    # converts both values with time_to_seconds.
    startedAt: str
    submittedAt: str
class InterviewRequest(BaseModel):
    # Request body of POST /process-interview.
    # (Documented with comments rather than a docstring so the generated
    # schema description is left untouched.)

    # Names the session's working directory, the downloaded input file and
    # the uploaded result video.
    sessionId: str

    # Location of the recording to download and analyse.
    originalVideoUrl: HttpUrl

    # Base URL of the caller; results are POSTed to "<base>/api/ai-callback".
    callbackBaseUrl: HttpUrl

    # Every question of the session, answered or not.
    answers: List[Answer]
class DeleteVideoRequest(BaseModel):
    # Request body of POST /delete-video-by-url.
    # (Documented with comments rather than a docstring so the generated
    # schema description is left untouched.)

    # URL of the video to delete; the endpoint derives the Cloudinary
    # public id from it.
    videoUrl: str
def time_to_seconds(t_str: str) -> int:
    """Convert a clock-style timestamp to a total number of seconds.

    Accepts ``HH:MM:SS`` and also the shorter ``MM:SS`` and ``SS`` forms.
    Surrounding whitespace is ignored.

    Args:
        t_str: The timestamp text. An empty or blank value counts as zero.

    Returns:
        The total number of seconds as an ``int``.

    Raises:
        ValueError: If a component is not an integer or there are more than
            three components.
    """
    if not t_str or not t_str.strip():
        return 0

    parts = t_str.strip().split(':')
    if len(parts) > 3:
        raise ValueError(f"Expected at most HH:MM:SS, got {t_str!r}")

    # Fold left to right so the same loop handles one, two or three
    # components: each step promotes the running total by one base-60 place.
    total = 0
    for part in parts:
        total = total * 60 + int(part)
    return total
def _partition_answers(answers):
    """Split answers into pipeline inputs and placeholder reports for the rest."""
    final_questions = []
    skipped_failed_reports = []
    for q in answers:
        if q.get('isAnswered'):
            final_questions.append({
                "question_id": q['aiQuestionId'],
                "question_text": q['questionText'],
                "ideal_answer": q['expectedAnswer'],
                "start_time": time_to_seconds(q['startedAt']),
                "end_time": time_to_seconds(q['submittedAt'])
            })
        else:
            skipped_failed_reports.append({
                "questionId": q['aiQuestionId'],
                "userAnswerText": "N/A",
                "score": 0.0, "relevance": 0.0, "confidence": 0.0,
                "stress": 0.0, "clarity": 0.0, "pauses": 0.0,
                "toneOfVoice": 3,
                "status": "skipped" if q.get('isSkipped') else "failed"
            })
    return final_questions, skipped_failed_reports


def background_processing(session_data: dict):
    """Process one interview session end to end and report the outcome.

    Steps: download the recording, split the answers into those to analyse
    and those that were skipped or failed, run the AI pipeline, upload the
    rendered video to Cloudinary, POST the combined report to the callback
    URL, and finally remove the session's local files.

    Args:
        session_data: Dict form of the interview request. The keys read are
            ``sessionId``, ``originalVideoUrl``, ``callbackBaseUrl`` and
            ``answers``.

    Returns:
        None. Problems are reported with ``print``. An invalid session id or
        a failed download ends the run without sending a callback.
    """
    session_id = session_data.get('sessionId')
    video_url = session_data.get('originalVideoUrl')
    callback_url = session_data.get('callbackBaseUrl')

    # The session id becomes part of file-system paths that are deleted
    # recursively during cleanup. An empty id, "." / ".." or anything with a
    # path separator would point the cleanup at RESULT_DIR itself or outside
    # it, so such ids are refused before any path is built.
    if (
        not isinstance(session_id, str)
        or session_id in ("", ".", "..")
        or os.path.basename(session_id) != session_id
    ):
        print(f"[ERROR] Invalid session id: {session_id!r}")
        return

    session_dir = os.path.join(RESULT_DIR, session_id)
    local_input_path = os.path.join(UPLOAD_DIR, f"{session_id}_input.mp4")

    try:
        os.makedirs(session_dir, exist_ok=True)
        print(f"[LOG] Processing started for session: {session_id}")

        # 1. Download the original video from Cloudinary/URL
        try:
            # The context manager releases the streamed connection even when
            # writing the file fails part-way.
            with http.get(str(video_url), stream=True, timeout=300) as response:
                response.raise_for_status()
                with open(local_input_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=1024 * 1024):
                        f.write(chunk)
            print(f"[LOG] Download complete: {local_input_path}")
        except Exception as e:
            print(f"[DOWNLOAD ERROR]: {e}")
            return

        # 2. Prepare questions for the pipeline
        final_questions, skipped_failed_reports = _partition_answers(
            session_data.get('answers') or []
        )

        ai_results = []
        final_video_url = None

        # 3. Run the AI Pipeline
        if final_questions:
            print(f"[LOG] Running pipeline for {len(final_questions)} questions...")
            report_path = os.path.join(session_dir, "report.json")
            # Ensure this filename exactly matches the one inside run_intervision_pipeline
            final_video_path = os.path.join(session_dir, "Intervision_Final_Report.mp4")

            try:
                # Capture the return message to ensure the pipeline finished
                pipeline_status = run_intervision_pipeline(
                    local_input_path, final_questions, session_dir
                )
                print(f"[LOG] Pipeline Status: {pipeline_status}")

                # Parse JSON results
                if os.path.exists(report_path):
                    with open(report_path, "r") as f:
                        ai_results = json.load(f).get("listOfAnswerReport", [])
            except Exception as e:
                # A pipeline crash must not prevent the callback: the caller
                # still receives the skipped/failed entries.
                print(f"[PIPELINE ERROR]: {e}")

            # 4. Upload the generated video to Cloudinary
            if os.path.exists(final_video_path):
                print("[LOG] Uploading final video to Cloudinary...")
                try:
                    upload_res = cloudinary.uploader.upload(
                        final_video_path,
                        public_id=f"res_{session_id}",
                        folder="intervision_results",
                        resource_type="video"
                    )
                    final_video_url = upload_res.get("secure_url")
                    print(f"[LOG] Upload successful: {final_video_url}")
                except Exception as e:
                    print(f"[UPLOAD ERROR]: {e}")
            else:
                print(f"[ERROR] Final video file not found at {final_video_path}")

        # 5. Send results back via Callback
        final_payload = {
            "sessionId": session_id,
            "finalVideoUrl": final_video_url,
            "report": ai_results + skipped_failed_reports
        }
        try:
            callback_endpoint = f"{str(callback_url).rstrip('/')}/api/ai-callback"
            # Go through the shared session so the callback uses the same
            # adapter configuration as the download.
            callback_resp = http.post(callback_endpoint, json=final_payload, timeout=30)
            print(f"[LOG] Callback sent. Status: {callback_resp.status_code}")
        except Exception as e:
            print(f"[CALLBACK/CLEANUP ERROR]: {e}")
    finally:
        # 6. Cleanup local storage -- runs on every exit path so a failed
        # download, pipeline or callback does not leave files behind.
        print(f"[LOG] Cleaning up session {session_id}...")
        try:
            if os.path.exists(local_input_path):
                os.remove(local_input_path)
        except Exception as e:
            print(f"[CALLBACK/CLEANUP ERROR]: {e}")
        try:
            if os.path.exists(session_dir):
                shutil.rmtree(session_dir)
        except Exception as e:
            print(f"[CALLBACK/CLEANUP ERROR]: {e}")
@app.get("/")
async def root():
    # Liveness probe: always answers with the same fixed status document.
    # (A comment is used instead of a docstring so the generated API
    # description is left untouched.)
    return dict(
        status="Intervision AI Engine Running",
        message="API is working successfully",
    )
@app.post("/process-interview")
async def process_interview(background_tasks: BackgroundTasks, data: InterviewRequest):
    # Queue the session for background processing and acknowledge at once;
    # the analysis result is delivered later by background_processing.
    # (A comment is used instead of a docstring so the generated API
    # description is left untouched.)
    session_payload = data.dict()
    background_tasks.add_task(background_processing, session_payload)
    return dict(message="Processing started", sessionId=data.sessionId)
def _public_id_from_url(video_url: str) -> str:
    """Derive the Cloudinary public id from a video URL.

    Example: ``.../intervision_results/res_42.mp4`` ->
    ``intervision_results/res_42``.

    Raises:
        ValueError: If the URL cannot be parsed or has no file name.
    """
    from urllib.parse import urlparse

    # Work on the path only, so a query string or fragment never ends up in
    # the file name and the host name is never mistaken for the folder.
    segments = urlparse(video_url).path.split('/')

    # Strip only the final extension: the public id itself may contain dots.
    filename = segments[-1].rsplit('.', 1)[0]
    if not filename:
        raise ValueError("URL does not end in a file name")

    # Check if the video is inside the results folder
    parent = segments[-2] if len(segments) > 1 else ""
    folder = parent if "intervision_results" in parent else ""
    return f"{folder}/{filename}" if folder else filename


@app.post("/delete-video-by-url")
async def delete_video_by_url(data: DeleteVideoRequest):
    """Delete a Cloudinary video identified by its URL.

    Returns ``{"status": "success", ...}`` when Cloudinary reports ``ok`` and
    ``{"status": "failed", "details": ...}`` otherwise.

    Raises:
        HTTPException: 400 when ``videoUrl`` is missing or no public id can
            be derived from it; 500 when the Cloudinary call fails.
    """
    video_url = data.videoUrl
    if not video_url:
        raise HTTPException(status_code=400, detail="videoUrl is required")

    # A URL that cannot be turned into a public id is a client error, so it
    # is reported as 400 rather than falling into the 500 handler below.
    try:
        public_id = _public_id_from_url(video_url)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=f"Invalid videoUrl: {e}") from e

    try:
        # Trigger deletion from Cloudinary
        result = cloudinary.uploader.destroy(public_id, resource_type="video")
        if result.get("result") == "ok":
            return {"status": "success", "message": f"Deleted {public_id}"}
        return {"status": "failed", "details": result}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
if __name__ == "__main__":
    # Running this file directly starts the server on all interfaces,
    # port 8000. uvicorn is imported here so that importing this module
    # elsewhere does not require it.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)