Fayza38 commited on
Commit
963faec
·
verified ·
1 Parent(s): 9570783

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +209 -281
app.py CHANGED
@@ -1,281 +1,209 @@
1
- import os
2
- import json
3
- import requests
4
- import cloudinary
5
- import cloudinary.uploader
6
- from requests.adapters import HTTPAdapter
7
- from urllib3.util.retry import Retry
8
- from fastapi import FastAPI, Body, HTTPException, BackgroundTasks
9
- from dotenv import load_dotenv
10
- from datetime import datetime
11
- from pipeline import run_intervision_pipeline
12
-
13
- # --- Setup Retry Strategy ---
14
- retry_strategy = Retry(
15
- total=3,
16
- backoff_factor=1, # Wait 1s, 2s, 4s between retries
17
- status_forcelist=[429, 500, 502, 503, 504],
18
- )
19
- adapter = HTTPAdapter(max_retries=retry_strategy)
20
- http = requests.Session()
21
- http.mount("https://", adapter)
22
- http.mount("http://", adapter)
23
-
24
- # Load environment variables from .env file
25
- load_dotenv()
26
-
27
- app = FastAPI(title="Intervision AI Engine")
28
-
29
- # Cloudinary Configuration
30
- cloudinary.config(
31
- cloud_name = os.getenv("CLOUDINARY_CLOUD_NAME"),
32
- api_key = os.getenv("CLOUDINARY_API_KEY"),
33
- api_secret = os.getenv("CLOUDINARY_API_SECRET")
34
- )
35
-
36
- # Directory Setup
37
- RESULT_DIR = "temp_data/results"
38
- UPLOAD_DIR = "temp_data/uploads"
39
- os.makedirs(RESULT_DIR, exist_ok=True)
40
- os.makedirs(UPLOAD_DIR, exist_ok=True)
41
-
42
- def time_to_seconds(t_str: str) -> int:
43
- """Converts HH:MM:SS timestamp format to total seconds."""
44
- if not t_str: return 0
45
- h, m, s = map(int, t_str.split(':'))
46
- return h * 3600 + m * 60 + s
47
-
48
- def background_processing(session_data: dict):
49
- """
50
- Handles heavy AI processing: video download, pipeline execution,
51
- result upload, and backend notification (callback).
52
- """
53
- session_id = session_data.get('sessionId')
54
- video_url = session_data.get('originalVideoUrl')
55
- callback_url = session_data.get('callbackBaseUrl')
56
-
57
- print(f"[LOG] Processing started for session: {session_id}")
58
-
59
- # 1. Download the original video from the provided URL
60
- local_input_path = os.path.join(UPLOAD_DIR, f"{session_id}_input.mp4")
61
- # 1. Download with increased timeout and Retry logic
62
- try:
63
- print(f"[LOG] Downloading video: {video_url}")
64
- # Increased timeout to 300s (5 minutes) for large files
65
- response = http.get(video_url, stream=True, timeout=300)
66
- response.raise_for_status()
67
- with open(local_input_path, 'wb') as f:
68
- for chunk in response.iter_content(chunk_size=1024*1024):
69
- f.write(chunk)
70
- except Exception as e:
71
- print(f"[DOWNLOAD ERROR]: {e}")
72
- # Notify backend that it failed due to download
73
- return
74
-
75
- # 2. Prepare question list for the AI Pipeline
76
- final_questions = []
77
- skipped_failed_reports = []
78
-
79
- for q in session_data.get('answers', []):
80
- if q.get('isAnswered'):
81
- final_questions.append({
82
- "question_id": q['aiQuestionId'],
83
- "question_text": q['questionText'],
84
- "ideal_answer": q['expectedAnswer'],
85
- "start_time": time_to_seconds(q['startedAt']),
86
- "end_time": time_to_seconds(q['submittedAt'])
87
- })
88
- else:
89
- # Handle questions that weren't answered during the session
90
- skipped_failed_reports.append({
91
- "questionId": q['aiQuestionId'],
92
- "userAnswerText": "N/A",
93
- "score": 0.0,
94
- "relevance": 0.0,
95
- "confidence": 0.0,
96
- "stress": 0.0,
97
- "clarity": 0.0,
98
- "pauses": 0.0,
99
- "toneOfVoice": "N/A",
100
- "status": "skipped" if q.get('isSkipped') else "failed"
101
- })
102
-
103
- # 3. Execute AI Pipeline (Analysis & Visualization)
104
- ai_results = []
105
- if final_questions:
106
- # run_intervision_pipeline generates Intervision_Final_Result.mp4
107
- run_intervision_pipeline(local_input_path, final_questions, RESULT_DIR)
108
- report_path = os.path.join(RESULT_DIR, "report.json")
109
- if os.path.exists(report_path):
110
- with open(report_path, "r") as f:
111
- ai_results = json.load(f).get("listOfAnswerReport", [])
112
-
113
- # 4. Upload the processed video to Cloudinary
114
- final_video_path = os.path.join(RESULT_DIR, "Intervision_Final_Result.mp4")
115
- final_video_url = None
116
- if os.path.exists(final_video_path):
117
- try:
118
- upload_res = cloudinary.uploader.upload(
119
- final_video_path,
120
- public_id=f"res_{session_id}",
121
- folder="intervision_results",
122
- resource_type="video",
123
- chunk_size=6000000
124
- )
125
- final_video_url = upload_res.get("secure_url")
126
- except Exception as e:
127
- print(f"[UPLOAD ERROR]: {e}")
128
-
129
- # 5. Construct final payload and notify Backend via Callback
130
- final_payload = {
131
- "sessionId": session_id,
132
- "finalVideoUrl": final_video_url,
133
- "report": ai_results + skipped_failed_reports
134
- }
135
-
136
- try:
137
- # Notify backend that processing is complete
138
- cb_response = requests.post(f"{callback_url}/api/ai-callback", json=final_payload, timeout=30)
139
- print(f"[LOG] Callback sent to {callback_url}. Status: {cb_response.status_code}")
140
-
141
- # 6. Local Cleanup: Remove files to save disk space
142
- if os.path.exists(local_input_path): os.remove(local_input_path)
143
- if os.path.exists(final_video_path): os.remove(final_video_path)
144
-
145
- except Exception as e:
146
- print(f"[CALLBACK ERROR]: {e}")
147
-
148
- @app.post("/process-interview/")
149
- async def process_interview(background_tasks: BackgroundTasks, data: dict = Body(...)):
150
- """Entry point to start the AI analysis asynchronously."""
151
- background_tasks.add_task(background_processing, data)
152
- return {"message": "Processing started", "sessionId": data.get('sessionId')}
153
-
154
- @app.post("/delete-video-by-url/")
155
- async def delete_video_by_url(data: dict = Body(...)):
156
- """
157
- Deletes a video from Cloudinary based on its URL.
158
- Input JSON: {"videoUrl": "https://..."}
159
- """
160
- video_url = data.get("videoUrl")
161
- if not video_url:
162
- raise HTTPException(status_code=400, detail="videoUrl is required")
163
-
164
- try:
165
- # Logic to extract the public_id from a Cloudinary URL
166
- # Example: .../folder/public_id.mp4 -> folder/public_id
167
- url_parts = video_url.split('/')
168
- filename_with_ext = url_parts[-1]
169
- filename = filename_with_ext.split('.')[0]
170
-
171
- # Check if the video is inside the results folder
172
- folder = url_parts[-2] if "intervision_results" in url_parts[-2] else ""
173
- public_id = f"{folder}/{filename}" if folder else filename
174
-
175
- # Trigger deletion from Cloudinary
176
- result = cloudinary.uploader.destroy(public_id, resource_type="video")
177
-
178
- if result.get("result") == "ok":
179
- return {"status": "success", "message": f"Deleted {public_id}"}
180
- return {"status": "failed", "details": result}
181
-
182
- except Exception as e:
183
- raise HTTPException(status_code=500, detail=str(e))
184
-
185
- if __name__ == "__main__":
186
- import uvicorn
187
- uvicorn.run(app, host="0.0.0.0", port=8000)
188
-
189
- # @app.post("/process-interview-test/")
190
- # async def process_test(data: dict = Body(...)):
191
- # try:
192
- # print(f"--- [TEST LOG] Processing Session: {data['sessionId']} ---")
193
-
194
- # # 1. Path Check
195
- # local_path = r"D:\FayzaAhmed\Graduation_project\models\MultiModal\deployment\interview_test.mp4"
196
- # if not os.path.exists(local_path):
197
- # return {"error": f"Video file not found at {local_path}"}
198
-
199
- # # 2. Prepare Data
200
- # final_questions = []
201
- # for q in data['answers']:
202
- # if q.get('isAnswered'):
203
- # final_questions.append({
204
- # "question_id": q['aiQuestionId'],
205
- # "question_text": q['questionText'],
206
- # "ideal_answer": q['expectedAnswer'],
207
- # "start_time": time_to_seconds(q['startedAt']),
208
- # "end_time": time_to_seconds(q['submittedAt'])
209
- # })
210
-
211
- # # 3. Run Pipeline
212
- # print("[LOG] Running AI Pipeline...")
213
- # run_intervision_pipeline(local_path, final_questions, RESULT_DIR)
214
-
215
- # # 4. Upload
216
- # print("[LOG] Uploading to Cloudinary...")
217
- # final_video_path = os.path.join(RESULT_DIR, "Intervision_Final_Result.mp4")
218
- # upload_res = cloudinary.uploader.upload(
219
- # final_video_path,
220
- # public_id=f"{data['sessionId']}_test",
221
- # folder="intervision_tests",
222
- # resource_type="video" # This is the important part
223
- # )
224
-
225
- # # 5. Load Report
226
- # report_path = os.path.join(RESULT_DIR, "report.json")
227
- # if not os.path.exists(report_path):
228
- # return {"error": "Pipeline finished but report.json was not created."}
229
-
230
- # with open(report_path, "r") as f:
231
- # ai_results = json.load(f)["listOfAnswerReport"]
232
-
233
- # return {
234
- # "status": "Success",
235
- # "videoUrl": upload_res.get("secure_url"),
236
- # "report": ai_results
237
- # }
238
-
239
- # except Exception as e:
240
- # print(f"[CRITICAL ERROR]: {str(e)}")
241
- # return {"error": str(e), "traceback": "Check Terminal for details"}
242
-
243
- """QUESTIONS_CONFIG =
244
- [
245
- {
246
- "question_id": 1,
247
- "question_text": "how do you describe yourself",
248
- "ideal_answer": "Being different means you have to work at belonging...",
249
- "start_time": 0,
250
- "end_time": 15,
251
- },
252
- {
253
- "question_id": 2,
254
- "question_text": "Tell us about your biggest achievement",
255
- "ideal_answer": "I am proud of accomplishing...",
256
- "start_time": 15,
257
- "end_time": 24,
258
- }
259
- ]
260
- """
261
-
262
- """
263
- {
264
- "sessionId": "test-session-123",
265
- "originalVideoUrl": "local_test_no_url",
266
- "callbackBaseUrl": "http://localhost:8000",
267
- "answers": [
268
- {
269
- "questionId": "q-1",
270
- "aiQuestionId": 1,
271
- "questionText": "How does the speaker encourage people to deal with their differences and uniqueness?",
272
- "expectedAnswer": "When you're different, you have to work at the longing. Everybody wants to feel valued and accepted, and we think it should happen spontaneously, but it doesn't. Sometimes society tells us, and we tell ourselves, we don't fit the mold. Take a piece of paper and write down what makes you different. And I want you to celebrate it today and every day. Shout it from the rooftops. What makes me different is what has made me stand out and be successful. I also encourage you to be curious and ask, what is on other people's pieces of paper? What makes them different? Let's celebrate those imperfections that make us special. I hope that it teaches you that nobody has a claim on the word normal. We are all different. We are all quirky and unique, and that is what makes us wonderful.",
273
- "isAnswered": true,
274
- "isSkipped": false,
275
- "isFailed": false,
276
- "startedAt": "00:00:00",
277
- "submittedAt": "00:00:55"
278
- }
279
- ]
280
- }
281
- """
 
1
+ import os
2
+ import json
3
+ import requests
4
+ import cloudinary
5
+ import cloudinary.uploader
6
+ from requests.adapters import HTTPAdapter
7
+ from urllib3.util.retry import Retry
8
+ from fastapi import FastAPI, Body, HTTPException, BackgroundTasks
9
+ from pydantic import BaseModel, HttpUrl
10
+ from typing import List, Optional
11
+ from dotenv import load_dotenv
12
+ from pipeline import run_intervision_pipeline
13
+
14
+ # --- 1. Setup Retry Strategy for Robust Downloads ---
15
+ # This ensures that if the video download fails momentarily, it retries 3 times.
16
+ retry_strategy = Retry(
17
+ total=3,
18
+ backoff_factor=1,
19
+ status_forcelist=[429, 500, 502, 503, 504],
20
+ )
21
+ adapter = HTTPAdapter(max_retries=retry_strategy)
22
+ http = requests.Session()
23
+ http.mount("https://", adapter)
24
+ http.mount("http://", adapter)
25
+
26
+ # Load environment variables from .env file
27
+ load_dotenv()
28
+
29
+ app = FastAPI(
30
+ title="Intervision AI Engine",
31
+ description="Asynchronous AI Pipeline for Interview Analysis",
32
+ version="1.1.0"
33
+ )
34
+
35
+ # --- 2. Cloudinary Configuration ---
36
+ cloudinary.config(
37
+ cloud_name=os.getenv("CLOUDINARY_CLOUD_NAME"),
38
+ api_key=os.getenv("CLOUDINARY_API_KEY"),
39
+ api_secret=os.getenv("CLOUDINARY_API_SECRET")
40
+ )
41
+
42
+ # --- 3. Directory Setup ---
43
+ RESULT_DIR = "temp_data/results"
44
+ UPLOAD_DIR = "temp_data/uploads"
45
+ os.makedirs(RESULT_DIR, exist_ok=True)
46
+ os.makedirs(UPLOAD_DIR, exist_ok=True)
47
+
48
+ # --- 4. Pydantic Models (Fixes the "additionalProp1" issue) ---
49
+ class AnswerDetail(BaseModel):
50
+ aiQuestionId: int
51
+ questionText: str
52
+ expectedAnswer: str
53
+ isAnswered: bool
54
+ isSkipped: bool
55
+ startedAt: str # Expected format "HH:MM:SS"
56
+ submittedAt: str # Expected format "HH:MM:SS"
57
+
58
+ class InterviewRequest(BaseModel):
59
+ sessionId: str
60
+ originalVideoUrl: HttpUrl
61
+ callbackBaseUrl: HttpUrl
62
+ answers: List[AnswerDetail]
63
+
64
+ # --- 5. Helper Functions ---
65
+ def time_to_seconds(t_str: str) -> int:
66
+ """Converts HH:MM:SS or MM:SS timestamp format to total seconds."""
67
+ if not t_str or ":" not in t_str: return 0
68
+ parts = list(map(int, t_str.split(':')))
69
+ if len(parts) == 3:
70
+ h, m, s = parts
71
+ return h * 3600 + m * 60 + s
72
+ elif len(parts) == 2:
73
+ m, s = parts
74
+ return m * 60 + s
75
+ return 0
76
+
77
+ # --- 6. Background Processing Logic ---
78
+ def background_processing(session_data: InterviewRequest):
79
+ """
80
+ Handles heavy AI processing: video download, pipeline execution,
81
+ result upload, and backend notification (callback).
82
+ """
83
+ session_id = session_data.sessionId
84
+ video_url = str(session_data.originalVideoUrl)
85
+ callback_url = str(session_data.callbackBaseUrl)
86
+
87
+ print(f"[LOG] Processing started for session: {session_id}")
88
+
89
+ local_input_path = os.path.join(UPLOAD_DIR, f"{session_id}_input.mp4")
90
+
91
+ # Step 1: Download the original video
92
+ try:
93
+ print(f"[LOG] Downloading video: {video_url}")
94
+ response = http.get(video_url, stream=True, timeout=300)
95
+ response.raise_for_status()
96
+ with open(local_input_path, 'wb') as f:
97
+ for chunk in response.iter_content(chunk_size=1024*1024):
98
+ f.write(chunk)
99
+ except Exception as e:
100
+ print(f"[DOWNLOAD ERROR]: {e}")
101
+ return
102
+
103
+ # Step 2: Prepare questions for the Pipeline
104
+ final_questions = []
105
+ skipped_failed_reports = []
106
+
107
+ for q in session_data.answers:
108
+ if q.isAnswered:
109
+ final_questions.append({
110
+ "question_id": q.aiQuestionId,
111
+ "question_text": q.questionText,
112
+ "ideal_answer": q.expectedAnswer,
113
+ "start_time": time_to_seconds(q.startedAt),
114
+ "end_time": time_to_seconds(q.submittedAt)
115
+ })
116
+ else:
117
+ skipped_failed_reports.append({
118
+ "questionId": q.aiQuestionId,
119
+ "userAnswerText": "N/A",
120
+ "score": 0.0,
121
+ "relevance": 0.0,
122
+ "confidence": 0.0,
123
+ "stress": 0.0,
124
+ "clarity": 0.0,
125
+ "pauses": 0.0,
126
+ "toneOfVoice": 3, # Default to Natural/N/A
127
+ "status": "skipped" if q.isSkipped else "failed"
128
+ })
129
+
130
+ # Step 3: Execute AI Pipeline
131
+ ai_results = []
132
+ if final_questions:
133
+ run_intervision_pipeline(local_input_path, final_questions, RESULT_DIR)
134
+ report_path = os.path.join(RESULT_DIR, "report.json")
135
+ if os.path.exists(report_path):
136
+ with open(report_path, "r") as f:
137
+ ai_results = json.load(f).get("listOfAnswerReport", [])
138
+
139
+ # Step 4: Upload processed video to Cloudinary
140
+ final_video_path = os.path.join(RESULT_DIR, "Intervision_Final_Result.mp4")
141
+ final_video_url = None
142
+ if os.path.exists(final_video_path):
143
+ try:
144
+ upload_res = cloudinary.uploader.upload(
145
+ final_video_path,
146
+ public_id=f"res_{session_id}",
147
+ folder="intervision_results",
148
+ resource_type="video"
149
+ )
150
+ final_video_url = upload_res.get("secure_url")
151
+ except Exception as e:
152
+ print(f"[UPLOAD ERROR]: {e}")
153
+
154
+ # Step 5: Construct final payload and notify Backend
155
+ final_payload = {
156
+ "sessionId": session_id,
157
+ "finalVideoUrl": final_video_url,
158
+ "report": ai_results + skipped_failed_reports
159
+ }
160
+
161
+ try:
162
+ cb_response = requests.post(f"{callback_url}/api/ai-callback", json=final_payload, timeout=30)
163
+ print(f"[LOG] Callback sent. Status: {cb_response.status_code}")
164
+
165
+ # Cleanup local files
166
+ if os.path.exists(local_input_path): os.remove(local_input_path)
167
+ if os.path.exists(final_video_path): os.remove(final_video_path)
168
+ except Exception as e:
169
+ print(f"[CALLBACK ERROR]: {e}")
170
+
171
+ # --- 7. API Routes ---
172
+
173
+ @app.get("/")
174
+ async def root():
175
+ """Health check endpoint to verify the service is running."""
176
+ return {
177
+ "status": "online",
178
+ "message": "Intervision AI Engine is running",
179
+ "documentation": "/docs"
180
+ }
181
+
182
+ @app.post("/process-interview/")
183
+ async def process_interview(background_tasks: BackgroundTasks, data: InterviewRequest):
184
+ """
185
+ Entry point to start the AI analysis asynchronously.
186
+ Receives validated data via InterviewRequest model.
187
+ """
188
+ background_tasks.add_task(background_processing, data)
189
+ return {"message": "Processing started", "sessionId": data.sessionId}
190
+
191
+ @app.post("/delete-video-by-url/")
192
+ async def delete_video_by_url(payload: dict = Body(...)):
193
+ """Deletes a video from Cloudinary based on its URL."""
194
+ video_url = payload.get("videoUrl")
195
+ if not video_url:
196
+ raise HTTPException(status_code=400, detail="videoUrl is required")
197
+ try:
198
+ url_parts = video_url.split('/')
199
+ filename = url_parts[-1].split('.')[0]
200
+ folder = "intervision_results" if "intervision_results" in video_url else ""
201
+ public_id = f"{folder}/{filename}" if folder else filename
202
+ result = cloudinary.uploader.destroy(public_id, resource_type="video")
203
+ return {"status": result.get("result"), "public_id": public_id}
204
+ except Exception as e:
205
+ raise HTTPException(status_code=500, detail=str(e))
206
+
207
+ if __name__ == "__main__":
208
+ import uvicorn
209
+ uvicorn.run(app, host="0.0.0.0", port=8000)