Fayza38 commited on
Commit
a4279dd
·
verified ·
1 Parent(s): 963faec

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +85 -80
app.py CHANGED
@@ -6,16 +6,16 @@ import cloudinary.uploader
6
  from requests.adapters import HTTPAdapter
7
  from urllib3.util.retry import Retry
8
  from fastapi import FastAPI, Body, HTTPException, BackgroundTasks
9
- from pydantic import BaseModel, HttpUrl
10
- from typing import List, Optional
11
  from dotenv import load_dotenv
 
 
 
12
  from pipeline import run_intervision_pipeline
13
 
14
- # --- 1. Setup Retry Strategy for Robust Downloads ---
15
- # This ensures that if the video download fails momentarily, it retries 3 times.
16
  retry_strategy = Retry(
17
  total=3,
18
- backoff_factor=1,
19
  status_forcelist=[429, 500, 502, 503, 504],
20
  )
21
  adapter = HTTPAdapter(max_retries=retry_strategy)
@@ -26,71 +26,63 @@ http.mount("http://", adapter)
26
  # Load environment variables from .env file
27
  load_dotenv()
28
 
29
- app = FastAPI(
30
- title="Intervision AI Engine",
31
- description="Asynchronous AI Pipeline for Interview Analysis",
32
- version="1.1.0"
33
- )
34
 
35
- # --- 2. Cloudinary Configuration ---
36
- cloudinary.config(
37
- cloud_name=os.getenv("CLOUDINARY_CLOUD_NAME"),
38
- api_key=os.getenv("CLOUDINARY_API_KEY"),
39
- api_secret=os.getenv("CLOUDINARY_API_SECRET")
40
  )
41
 
42
- # --- 3. Directory Setup ---
43
  RESULT_DIR = "temp_data/results"
44
  UPLOAD_DIR = "temp_data/uploads"
45
  os.makedirs(RESULT_DIR, exist_ok=True)
46
  os.makedirs(UPLOAD_DIR, exist_ok=True)
47
 
48
- # --- 4. Pydantic Models (Fixes the "additionalProp1" issue) ---
49
- class AnswerDetail(BaseModel):
50
  aiQuestionId: int
51
  questionText: str
52
  expectedAnswer: str
53
  isAnswered: bool
54
  isSkipped: bool
55
- startedAt: str # Expected format "HH:MM:SS"
56
- submittedAt: str # Expected format "HH:MM:SS"
 
57
 
58
  class InterviewRequest(BaseModel):
59
  sessionId: str
60
  originalVideoUrl: HttpUrl
61
  callbackBaseUrl: HttpUrl
62
- answers: List[AnswerDetail]
 
 
 
63
 
64
- # --- 5. Helper Functions ---
65
  def time_to_seconds(t_str: str) -> int:
66
- """Converts HH:MM:SS or MM:SS timestamp format to total seconds."""
67
- if not t_str or ":" not in t_str: return 0
68
- parts = list(map(int, t_str.split(':')))
69
- if len(parts) == 3:
70
- h, m, s = parts
71
- return h * 3600 + m * 60 + s
72
- elif len(parts) == 2:
73
- m, s = parts
74
- return m * 60 + s
75
- return 0
76
-
77
- # --- 6. Background Processing Logic ---
78
- def background_processing(session_data: InterviewRequest):
79
  """
80
  Handles heavy AI processing: video download, pipeline execution,
81
  result upload, and backend notification (callback).
82
  """
83
- session_id = session_data.sessionId
84
- video_url = str(session_data.originalVideoUrl)
85
- callback_url = str(session_data.callbackBaseUrl)
86
 
87
  print(f"[LOG] Processing started for session: {session_id}")
88
 
 
89
  local_input_path = os.path.join(UPLOAD_DIR, f"{session_id}_input.mp4")
90
-
91
- # Step 1: Download the original video
92
  try:
93
  print(f"[LOG] Downloading video: {video_url}")
 
94
  response = http.get(video_url, stream=True, timeout=300)
95
  response.raise_for_status()
96
  with open(local_input_path, 'wb') as f:
@@ -98,60 +90,64 @@ def background_processing(session_data: InterviewRequest):
98
  f.write(chunk)
99
  except Exception as e:
100
  print(f"[DOWNLOAD ERROR]: {e}")
 
101
  return
102
 
103
- # Step 2: Prepare questions for the Pipeline
104
  final_questions = []
105
  skipped_failed_reports = []
106
 
107
- for q in session_data.answers:
108
- if q.isAnswered:
109
  final_questions.append({
110
- "question_id": q.aiQuestionId,
111
- "question_text": q.questionText,
112
- "ideal_answer": q.expectedAnswer,
113
- "start_time": time_to_seconds(q.startedAt),
114
- "end_time": time_to_seconds(q.submittedAt)
115
  })
116
  else:
 
117
  skipped_failed_reports.append({
118
- "questionId": q.aiQuestionId,
119
  "userAnswerText": "N/A",
120
- "score": 0.0,
121
  "relevance": 0.0,
122
  "confidence": 0.0,
123
  "stress": 0.0,
124
  "clarity": 0.0,
125
  "pauses": 0.0,
126
- "toneOfVoice": 3, # Default to Natural/N/A
127
- "status": "skipped" if q.isSkipped else "failed"
128
  })
129
 
130
- # Step 3: Execute AI Pipeline
131
  ai_results = []
132
  if final_questions:
 
133
  run_intervision_pipeline(local_input_path, final_questions, RESULT_DIR)
134
  report_path = os.path.join(RESULT_DIR, "report.json")
135
  if os.path.exists(report_path):
136
  with open(report_path, "r") as f:
137
  ai_results = json.load(f).get("listOfAnswerReport", [])
138
 
139
- # Step 4: Upload processed video to Cloudinary
140
  final_video_path = os.path.join(RESULT_DIR, "Intervision_Final_Result.mp4")
141
  final_video_url = None
142
  if os.path.exists(final_video_path):
143
  try:
144
  upload_res = cloudinary.uploader.upload(
145
- final_video_path,
146
- public_id=f"res_{session_id}",
147
  folder="intervision_results",
148
- resource_type="video"
 
149
  )
150
  final_video_url = upload_res.get("secure_url")
151
  except Exception as e:
152
  print(f"[UPLOAD ERROR]: {e}")
153
 
154
- # Step 5: Construct final payload and notify Backend
155
  final_payload = {
156
  "sessionId": session_id,
157
  "finalVideoUrl": final_video_url,
@@ -159,48 +155,57 @@ def background_processing(session_data: InterviewRequest):
159
  }
160
 
161
  try:
 
162
  cb_response = requests.post(f"{callback_url}/api/ai-callback", json=final_payload, timeout=30)
163
- print(f"[LOG] Callback sent. Status: {cb_response.status_code}")
164
 
165
- # Cleanup local files
166
  if os.path.exists(local_input_path): os.remove(local_input_path)
167
  if os.path.exists(final_video_path): os.remove(final_video_path)
 
168
  except Exception as e:
169
  print(f"[CALLBACK ERROR]: {e}")
170
 
171
- # --- 7. API Routes ---
172
-
173
  @app.get("/")
174
  async def root():
175
- """Health check endpoint to verify the service is running."""
176
  return {
177
- "status": "online",
178
- "message": "Intervision AI Engine is running",
179
- "documentation": "/docs"
180
  }
181
 
182
- @app.post("/process-interview/")
183
  async def process_interview(background_tasks: BackgroundTasks, data: InterviewRequest):
184
- """
185
- Entry point to start the AI analysis asynchronously.
186
- Receives validated data via InterviewRequest model.
187
- """
188
- background_tasks.add_task(background_processing, data)
189
- return {"message": "Processing started", "sessionId": data.sessionId}
190
 
191
- @app.post("/delete-video-by-url/")
192
- async def delete_video_by_url(payload: dict = Body(...)):
193
- """Deletes a video from Cloudinary based on its URL."""
194
- video_url = payload.get("videoUrl")
195
  if not video_url:
196
  raise HTTPException(status_code=400, detail="videoUrl is required")
 
197
  try:
 
 
198
  url_parts = video_url.split('/')
199
- filename = url_parts[-1].split('.')[0]
200
- folder = "intervision_results" if "intervision_results" in video_url else ""
 
 
 
201
  public_id = f"{folder}/{filename}" if folder else filename
 
 
202
  result = cloudinary.uploader.destroy(public_id, resource_type="video")
203
- return {"status": result.get("result"), "public_id": public_id}
 
 
 
 
204
  except Exception as e:
205
  raise HTTPException(status_code=500, detail=str(e))
206
 
 
6
  from requests.adapters import HTTPAdapter
7
  from urllib3.util.retry import Retry
8
  from fastapi import FastAPI, Body, HTTPException, BackgroundTasks
 
 
9
  from dotenv import load_dotenv
10
+ from datetime import datetime
11
+ from pydantic import BaseModel, HttpUrl
12
+ from typing import List
13
  from pipeline import run_intervision_pipeline
14
 
15
+ # --- Setup Retry Strategy ---
 
16
  retry_strategy = Retry(
17
  total=3,
18
+ backoff_factor=1, # Wait 1s, 2s, 4s between retries
19
  status_forcelist=[429, 500, 502, 503, 504],
20
  )
21
  adapter = HTTPAdapter(max_retries=retry_strategy)
 
26
  # Load environment variables from .env file
27
  load_dotenv()
28
 
29
+ app = FastAPI(title="Intervision AI Engine")
 
 
 
 
30
 
31
+ # Cloudinary Configuration
32
+ cloudinary.config(
33
+ cloud_name = os.getenv("CLOUDINARY_CLOUD_NAME"),
34
+ api_key = os.getenv("CLOUDINARY_API_KEY"),
35
+ api_secret = os.getenv("CLOUDINARY_API_SECRET")
36
  )
37
 
38
+ # Directory Setup
39
  RESULT_DIR = "temp_data/results"
40
  UPLOAD_DIR = "temp_data/uploads"
41
  os.makedirs(RESULT_DIR, exist_ok=True)
42
  os.makedirs(UPLOAD_DIR, exist_ok=True)
43
 
44
+ class Answer(BaseModel):
 
45
  aiQuestionId: int
46
  questionText: str
47
  expectedAnswer: str
48
  isAnswered: bool
49
  isSkipped: bool
50
+ isFailed: bool
51
+ startedAt: str
52
+ submittedAt: str
53
 
54
  class InterviewRequest(BaseModel):
55
  sessionId: str
56
  originalVideoUrl: HttpUrl
57
  callbackBaseUrl: HttpUrl
58
+ answers: List[Answer]
59
+
60
+ class DeleteVideoRequest(BaseModel):
61
+ videoUrl: str
62
 
 
63
  def time_to_seconds(t_str: str) -> int:
64
+ """Converts HH:MM:SS timestamp format to total seconds."""
65
+ if not t_str: return 0
66
+ h, m, s = map(int, t_str.split(':'))
67
+ return h * 3600 + m * 60 + s
68
+
69
+ def background_processing(session_data: dict):
 
 
 
 
 
 
 
70
  """
71
  Handles heavy AI processing: video download, pipeline execution,
72
  result upload, and backend notification (callback).
73
  """
74
+ session_id = session_data.get('sessionId')
75
+ video_url = session_data.get('originalVideoUrl')
76
+ callback_url = session_data.get('callbackBaseUrl')
77
 
78
  print(f"[LOG] Processing started for session: {session_id}")
79
 
80
+ # 1. Download the original video from the provided URL
81
  local_input_path = os.path.join(UPLOAD_DIR, f"{session_id}_input.mp4")
82
+ # 1. Download with increased timeout and Retry logic
 
83
  try:
84
  print(f"[LOG] Downloading video: {video_url}")
85
+ # Increased timeout to 300s (5 minutes) for large files
86
  response = http.get(video_url, stream=True, timeout=300)
87
  response.raise_for_status()
88
  with open(local_input_path, 'wb') as f:
 
90
  f.write(chunk)
91
  except Exception as e:
92
  print(f"[DOWNLOAD ERROR]: {e}")
93
+ # Notify backend that it failed due to download
94
  return
95
 
96
+ # 2. Prepare question list for the AI Pipeline
97
  final_questions = []
98
  skipped_failed_reports = []
99
 
100
+ for q in session_data.get('answers', []):
101
+ if q.get('isAnswered'):
102
  final_questions.append({
103
+ "question_id": q['aiQuestionId'],
104
+ "question_text": q['questionText'],
105
+ "ideal_answer": q['expectedAnswer'],
106
+ "start_time": time_to_seconds(q['startedAt']),
107
+ "end_time": time_to_seconds(q['submittedAt'])
108
  })
109
  else:
110
+ # Handle questions that weren't answered during the session
111
  skipped_failed_reports.append({
112
+ "questionId": q['aiQuestionId'],
113
  "userAnswerText": "N/A",
114
+ "score": 0.0,
115
  "relevance": 0.0,
116
  "confidence": 0.0,
117
  "stress": 0.0,
118
  "clarity": 0.0,
119
  "pauses": 0.0,
120
+ "toneOfVoice": "N/A",
121
+ "status": "skipped" if q.get('isSkipped') else "failed"
122
  })
123
 
124
+ # 3. Execute AI Pipeline (Analysis & Visualization)
125
  ai_results = []
126
  if final_questions:
127
+ # run_intervision_pipeline generates Intervision_Final_Result.mp4
128
  run_intervision_pipeline(local_input_path, final_questions, RESULT_DIR)
129
  report_path = os.path.join(RESULT_DIR, "report.json")
130
  if os.path.exists(report_path):
131
  with open(report_path, "r") as f:
132
  ai_results = json.load(f).get("listOfAnswerReport", [])
133
 
134
+ # 4. Upload the processed video to Cloudinary
135
  final_video_path = os.path.join(RESULT_DIR, "Intervision_Final_Result.mp4")
136
  final_video_url = None
137
  if os.path.exists(final_video_path):
138
  try:
139
  upload_res = cloudinary.uploader.upload(
140
+ final_video_path,
141
+ public_id=f"res_{session_id}",
142
  folder="intervision_results",
143
+ resource_type="video",
144
+ chunk_size=6000000
145
  )
146
  final_video_url = upload_res.get("secure_url")
147
  except Exception as e:
148
  print(f"[UPLOAD ERROR]: {e}")
149
 
150
+ # 5. Construct final payload and notify Backend via Callback
151
  final_payload = {
152
  "sessionId": session_id,
153
  "finalVideoUrl": final_video_url,
 
155
  }
156
 
157
  try:
158
+ # Notify backend that processing is complete
159
  cb_response = requests.post(f"{callback_url}/api/ai-callback", json=final_payload, timeout=30)
160
+ print(f"[LOG] Callback sent to {callback_url}. Status: {cb_response.status_code}")
161
 
162
+ # 6. Local Cleanup: Remove files to save disk space
163
  if os.path.exists(local_input_path): os.remove(local_input_path)
164
  if os.path.exists(final_video_path): os.remove(final_video_path)
165
+
166
  except Exception as e:
167
  print(f"[CALLBACK ERROR]: {e}")
168
 
 
 
169
  @app.get("/")
170
  async def root():
 
171
  return {
172
+ "status": "Intervision AI Engine Running",
173
+ "message": "API is working successfully"
 
174
  }
175
 
176
+ @app.post("/process-interview")
177
  async def process_interview(background_tasks: BackgroundTasks, data: InterviewRequest):
178
+ background_tasks.add_task(background_processing, data.dict())
179
+ return {
180
+ "message": "Processing started",
181
+ "sessionId": data.sessionId
182
+ }
 
183
 
184
+ @app.post("/delete-video-by-url")
185
+ async def delete_video_by_url(data: DeleteVideoRequest):
186
+
187
+ video_url = data.videoUrl
188
  if not video_url:
189
  raise HTTPException(status_code=400, detail="videoUrl is required")
190
+
191
  try:
192
+ # Logic to extract the public_id from a Cloudinary URL
193
+ # Example: .../folder/public_id.mp4 -> folder/public_id
194
  url_parts = video_url.split('/')
195
+ filename_with_ext = url_parts[-1]
196
+ filename = filename_with_ext.split('.')[0]
197
+
198
+ # Check if the video is inside the results folder
199
+ folder = url_parts[-2] if "intervision_results" in url_parts[-2] else ""
200
  public_id = f"{folder}/{filename}" if folder else filename
201
+
202
+ # Trigger deletion from Cloudinary
203
  result = cloudinary.uploader.destroy(public_id, resource_type="video")
204
+
205
+ if result.get("result") == "ok":
206
+ return {"status": "success", "message": f"Deleted {public_id}"}
207
+ return {"status": "failed", "details": result}
208
+
209
  except Exception as e:
210
  raise HTTPException(status_code=500, detail=str(e))
211