tlong-ds commited on
Commit
99c043f
·
verified ·
1 Parent(s): fbbfcf2

Update services/api/upload_endpoints.py

Browse files
Files changed (1) hide show
  1. services/api/upload_endpoints.py +746 -107
services/api/upload_endpoints.py CHANGED
@@ -1,18 +1,17 @@
1
- from fastapi import APIRouter, HTTPException, Depends, Request, UploadFile, File, Form
2
  from fastapi.responses import JSONResponse
3
  import boto3
4
  import json
5
  import os
6
- import uuid
7
  import logging
8
  import asyncio
9
- from typing import Dict, Optional, List
10
  from pydantic import BaseModel
11
- from io import BytesIO
12
  from datetime import datetime, timedelta
13
  from botocore.exceptions import ClientError
14
- from services.api.db.token_utils import decode_token
15
  from services.api.api_endpoints import connect_db
 
16
  from services.utils.api_cache import invalidate_cache_pattern
17
 
18
  # Configure logging for upload operations
@@ -37,10 +36,153 @@ router = APIRouter(
37
  responses={404: {"description": "Not found"}},
38
  )
39
 
40
- # In-memory storage for active multipart uploads
41
  # Structure: {upload_id: {parts: [], course_id: str, lecture_id: str, upload_key: str, ...}}
42
  active_uploads = {}
43
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
44
  # Models
45
  class InitUploadResponse(BaseModel):
46
  upload_id: str
@@ -56,35 +198,31 @@ class CompleteUploadRequest(BaseModel):
56
  upload_id: str
57
  parts: List[CompletedPart]
58
 
59
- # Middleware to check instructor permissions
60
- async def verify_instructor(request: Request, auth_token: Optional[str] = None):
 
61
  if not auth_token:
62
  auth_header = request.headers.get('Authorization')
63
  if auth_header and auth_header.startswith('Bearer '):
64
  auth_token = auth_header.split(' ')[1]
65
  else:
66
- raise HTTPException(status_code=401, detail="Authentication required")
67
 
68
  try:
69
  user_data = decode_token(auth_token)
70
- username = user_data.get('username')
71
- role = user_data.get('role')
72
- instructor_id = user_data.get('user_id')
73
-
74
- if role != "Instructor":
75
- raise HTTPException(status_code=403, detail="Only instructors can upload files")
76
-
77
- # Return instructor ID for use in the endpoints
78
- return instructor_id or username
79
  except Exception as e:
80
- logger.error(f"Token verification error: {str(e)}")
81
  raise HTTPException(status_code=401, detail="Invalid authentication token")
82
 
83
  # Helper to generate upload key for lecture video
84
  def get_lecture_video_key(course_id: int, lecture_id: int):
85
  return f"videos/cid{course_id}/lid{lecture_id}/vid_lecture.mp4"
86
 
87
- # 1. Initialize multipart upload
88
  @router.post("/upload/init-upload")
89
  async def init_upload(
90
  request: Request,
@@ -95,38 +233,62 @@ async def init_upload(
95
  parts: int = Form(...),
96
  auth_token: Optional[str] = None
97
  ):
98
- # Verify instructor permissions
99
- instructor_id = await verify_instructor(request, auth_token)
 
100
 
101
- # Verify this instructor owns this course
102
- conn = connect_db()
103
- try:
104
- with conn.cursor() as cursor:
105
- cursor.execute(
106
- "SELECT CourseID FROM Courses WHERE CourseID = %s AND InstructorID = %s",
107
- (course_id, instructor_id)
108
- )
109
- if not cursor.fetchone():
110
- raise HTTPException(status_code=403, detail="Not authorized to modify this course")
111
-
112
- # Verify lecture belongs to this course
113
- cursor.execute(
114
- "SELECT LectureID FROM Lectures WHERE LectureID = %s AND CourseID = %s",
115
- (lecture_id, course_id)
116
- )
117
- if not cursor.fetchone():
118
- raise HTTPException(status_code=404, detail="Lecture not found or doesn't belong to this course")
119
- finally:
120
- conn.close()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
121
 
122
  # Generate S3 key for the video
123
  key = get_lecture_video_key(course_id, lecture_id)
124
 
125
  # Log upload initialization
126
  logger.info(f"Initializing upload for course {course_id}, lecture {lecture_id}, size: {file_size}, parts: {parts}")
 
127
 
128
  try:
129
  # Create multipart upload
 
130
  multipart_upload = s3.create_multipart_upload(
131
  Bucket=BUCKET_NAME,
132
  Key=key,
@@ -136,9 +298,11 @@ async def init_upload(
136
  )
137
 
138
  upload_id = multipart_upload['UploadId']
 
139
 
140
  # Generate presigned URLs for each part
141
  presigned_urls = {}
 
142
  for part_number in range(1, parts + 1):
143
  presigned_url = s3.generate_presigned_url(
144
  'upload_part',
@@ -151,10 +315,13 @@ async def init_upload(
151
  ExpiresIn=3600 # 1 hour expiry
152
  )
153
  presigned_urls[str(part_number)] = presigned_url
 
 
 
154
 
155
- # Store upload info in memory
156
  expires_at = datetime.now() + timedelta(hours=24)
157
- active_uploads[upload_id] = {
158
  'course_id': course_id,
159
  'lecture_id': lecture_id,
160
  'key': key,
@@ -169,6 +336,12 @@ async def init_upload(
169
  'expires_at': expires_at
170
  }
171
 
 
 
 
 
 
 
172
  logger.info(f"Upload initialized with ID: {upload_id}")
173
 
174
  return JSONResponse({
@@ -182,7 +355,6 @@ async def init_upload(
182
  logger.error(f"Error initializing upload: {str(e)}")
183
  raise HTTPException(status_code=500, detail=f"Failed to initialize upload: {str(e)}")
184
 
185
- # 2. Upload part status update
186
  @router.post("/upload/upload-part")
187
  async def upload_part_status(
188
  request: Request,
@@ -191,31 +363,58 @@ async def upload_part_status(
191
  etag: str = Form(...),
192
  auth_token: Optional[str] = None
193
  ):
194
- # Verify instructor permissions
195
- instructor_id = await verify_instructor(request, auth_token)
 
196
 
197
  # Verify upload exists and belongs to this instructor
198
- if upload_id not in active_uploads:
 
199
  raise HTTPException(status_code=404, detail="Upload not found")
200
 
201
- upload_info = active_uploads[upload_id]
202
  if upload_info['instructor_id'] != instructor_id:
203
  raise HTTPException(status_code=403, detail="Not authorized to access this upload")
204
-
205
- # Check if upload is still valid
206
  if upload_info['expires_at'] < datetime.now():
207
  raise HTTPException(status_code=400, detail="Upload expired")
208
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
209
  # Add part info
210
- upload_info['parts'].append({
211
  'ETag': etag,
212
  'PartNumber': part_number
213
- })
 
214
 
215
  upload_info['parts_received'] += 1
216
 
217
- # Log progress
218
- logger.info(f"Upload {upload_id}: Part {part_number} received. Progress: {upload_info['parts_received']}/{upload_info['parts_expected']}")
 
 
 
 
 
 
 
219
 
220
  return JSONResponse({
221
  'upload_id': upload_id,
@@ -225,21 +424,21 @@ async def upload_part_status(
225
  'progress': f"{int((upload_info['parts_received'] / upload_info['parts_expected']) * 100)}%"
226
  })
227
 
228
- # 3. Complete multipart upload
229
  @router.post("/upload/complete-upload")
230
  async def complete_upload(
231
  request: Request,
232
  upload_id: str = Form(...),
233
  auth_token: Optional[str] = None
234
  ):
235
- # Verify instructor permissions
236
- instructor_id = await verify_instructor(request, auth_token)
 
237
 
238
  # Verify upload exists and belongs to this instructor
239
- if upload_id not in active_uploads:
 
240
  raise HTTPException(status_code=404, detail="Upload not found")
241
 
242
- upload_info = active_uploads[upload_id]
243
  if upload_info['instructor_id'] != instructor_id:
244
  raise HTTPException(status_code=403, detail="Not authorized to access this upload")
245
 
@@ -266,23 +465,42 @@ async def complete_upload(
266
  # Sort parts by part number
267
  parts = sorted(upload_info['parts'], key=lambda x: x['PartNumber'])
268
 
 
 
 
 
 
269
  # Complete multipart upload
270
- response = s3.complete_multipart_upload(
271
- Bucket=BUCKET_NAME,
272
- Key=upload_info['key'],
273
- UploadId=upload_id,
274
- MultipartUpload={
275
- 'Parts': parts
276
- }
277
- )
 
 
 
 
 
 
 
 
278
 
279
  # Get the full video URL
280
  video_url = f"https://{BUCKET_NAME}.s3-{REGION}.amazonaws.com/{upload_info['key']}"
281
 
282
- # Clean up the upload from memory
283
  course_id = upload_info['course_id']
284
  lecture_id = upload_info['lecture_id']
285
- del active_uploads[upload_id]
 
 
 
 
 
 
286
 
287
  # Invalidate cache for this course
288
  invalidate_cache_pattern(f"lectures:id:{lecture_id}:*")
@@ -317,21 +535,21 @@ async def complete_upload(
317
 
318
  raise HTTPException(status_code=500, detail=f"Failed to complete upload: {str(e)}")
319
 
320
- # 4. Abort multipart upload
321
  @router.post("/upload/abort-upload")
322
  async def abort_upload(
323
  request: Request,
324
  upload_id: str = Form(...),
325
  auth_token: Optional[str] = None
326
  ):
327
- # Verify instructor permissions
328
- instructor_id = await verify_instructor(request, auth_token)
 
329
 
330
  # Verify upload exists and belongs to this instructor
331
- if upload_id not in active_uploads:
 
332
  raise HTTPException(status_code=404, detail="Upload not found")
333
 
334
- upload_info = active_uploads[upload_id]
335
  if upload_info['instructor_id'] != instructor_id:
336
  raise HTTPException(status_code=403, detail="Not authorized to access this upload")
337
 
@@ -343,10 +561,13 @@ async def abort_upload(
343
  UploadId=upload_id
344
  )
345
 
346
- # Clean up
347
  course_id = upload_info['course_id']
348
  lecture_id = upload_info['lecture_id']
349
- del active_uploads[upload_id]
 
 
 
350
 
351
  logger.info(f"Upload {upload_id} aborted for course {course_id}, lecture {lecture_id}")
352
 
@@ -365,21 +586,21 @@ async def abort_upload(
365
 
366
  raise HTTPException(status_code=500, detail=f"Failed to abort upload: {str(e)}")
367
 
368
- # 5. Get upload status
369
  @router.get("/upload/status/{upload_id}")
370
  async def get_upload_status(
371
  request: Request,
372
  upload_id: str,
373
  auth_token: Optional[str] = None
374
  ):
375
- # Verify instructor permissions
376
- instructor_id = await verify_instructor(request, auth_token)
 
377
 
378
  # Verify upload exists and belongs to this instructor
379
- if upload_id not in active_uploads:
 
380
  raise HTTPException(status_code=404, detail="Upload not found")
381
 
382
- upload_info = active_uploads[upload_id]
383
  if upload_info['instructor_id'] != instructor_id:
384
  raise HTTPException(status_code=403, detail="Not authorized to access this upload")
385
 
@@ -399,16 +620,18 @@ async def get_upload_status(
399
  'expires_at': upload_info['expires_at'].isoformat()
400
  })
401
 
402
- # 6. List active uploads
403
  @router.get("/upload/active-uploads")
404
  async def list_active_uploads(
405
  request: Request,
406
  auth_token: Optional[str] = None
407
  ):
408
- # Verify instructor permissions
409
- instructor_id = await verify_instructor(request, auth_token)
 
410
 
411
- # Filter uploads for this instructor
 
 
412
  instructor_uploads = {}
413
  for upload_id, info in active_uploads.items():
414
  if info['instructor_id'] == instructor_id:
@@ -427,36 +650,35 @@ async def list_active_uploads(
427
  'count': len(instructor_uploads)
428
  })
429
 
430
- # Clean up expired uploads
431
  @router.post("/upload/cleanup")
432
  async def cleanup_uploads(
433
  request: Request,
434
  auth_token: Optional[str] = None
435
  ):
436
- # Verify instructor permissions
437
- instructor_id = await verify_instructor(request, auth_token)
438
-
439
- # Only allow system admin to perform cleanup
440
- if instructor_id != 1: # Assuming instructor ID 1 is the admin
441
- raise HTTPException(status_code=403, detail="Only system admin can perform this operation")
442
 
443
  now = datetime.now()
444
  expired_uploads = []
445
 
446
- for upload_id, info in list(active_uploads.items()):
447
- if info['expires_at'] < now:
 
 
448
  try:
449
  # Abort the upload in S3
450
  s3.abort_multipart_upload(
451
  Bucket=BUCKET_NAME,
452
- Key=info['key'],
453
  UploadId=upload_id
454
  )
455
  except Exception as e:
456
  logger.error(f"Error aborting expired upload {upload_id}: {str(e)}")
457
 
458
- # Remove from active uploads
459
- del active_uploads[upload_id]
 
 
460
  expired_uploads.append(upload_id)
461
 
462
  return JSONResponse({
@@ -469,26 +691,443 @@ async def background_cleanup():
469
  """Periodically clean up expired uploads"""
470
  while True:
471
  now = datetime.now()
472
- for upload_id, info in list(active_uploads.items()):
473
- if info['expires_at'] < now:
 
 
 
 
474
  try:
475
  # Abort the upload in S3
476
  s3.abort_multipart_upload(
477
  Bucket=BUCKET_NAME,
478
- Key=info['key'],
479
  UploadId=upload_id
480
  )
481
  logger.info(f"Automatically cleaned up expired upload {upload_id}")
482
  except Exception as e:
483
  logger.error(f"Error cleaning up expired upload {upload_id}: {str(e)}")
484
 
485
- # Remove from active uploads
486
- del active_uploads[upload_id]
 
 
487
 
488
  # Wait for 1 hour before next cleanup
489
  await asyncio.sleep(3600)
490
 
491
- # Start the background cleanup task when the module is imported
492
- @router.on_event("startup")
493
- async def start_background_tasks():
494
- asyncio.create_task(background_cleanup())
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import APIRouter, HTTPException, UploadFile, File, Form, Request
2
  from fastapi.responses import JSONResponse
3
  import boto3
4
  import json
5
  import os
6
+ import time
7
  import logging
8
  import asyncio
9
+ from typing import Dict, List, Optional
10
  from pydantic import BaseModel
 
11
  from datetime import datetime, timedelta
12
  from botocore.exceptions import ClientError
 
13
  from services.api.api_endpoints import connect_db
14
+ from services.api.db.token_utils import decode_token
15
  from services.utils.api_cache import invalidate_cache_pattern
16
 
17
  # Configure logging for upload operations
 
36
  responses={404: {"description": "Not found"}},
37
  )
38
 
39
+ # In-memory storage for active multipart uploads (for quick access)
40
  # Structure: {upload_id: {parts: [], course_id: str, lecture_id: str, upload_key: str, ...}}
41
  active_uploads = {}
42
 
43
# Cache persistence functions for upload state
def save_upload_to_cache(upload_id, upload_info):
    """Persist multipart-upload state to the cache for durability.

    Datetime fields are serialized to ISO strings for JSON. The cache TTL
    is derived from the upload's own ``expires_at`` when available so the
    cached record cannot outlive the upload it describes; otherwise it
    falls back to 24 hours. Failures are logged and swallowed — the
    in-memory copy stays authoritative when the cache is unavailable.

    Args:
        upload_id: S3 multipart upload identifier (cache key suffix).
        upload_info: dict of upload state (parts, course/lecture ids, etc.).
    """
    try:
        from services.config.valkey_config import get_redis_client, is_connection_available

        if not is_connection_available():
            logger.warning("Cache not available, upload data will only be stored in memory")
            return

        redis_client = get_redis_client()
        cache_key = f"multipart_upload:{upload_id}"

        # Convert datetime objects to ISO strings for JSON serialization
        cache_data = upload_info.copy()
        if isinstance(cache_data.get('created_at'), datetime):
            cache_data['created_at'] = cache_data['created_at'].isoformat()
        if isinstance(cache_data.get('expires_at'), datetime):
            cache_data['expires_at'] = cache_data['expires_at'].isoformat()

        # TTL tracks the upload's actual remaining lifetime (default 24h)
        # so the cache entry and the upload expire together.
        ttl = 24 * 3600
        expires_at = upload_info.get('expires_at')
        if isinstance(expires_at, datetime):
            remaining = int((expires_at - datetime.now()).total_seconds())
            if remaining > 0:
                ttl = remaining

        redis_client.setex(cache_key, ttl, json.dumps(cache_data))
        logger.info(f"Saved upload {upload_id} to cache with TTL {ttl}s")

    except Exception as e:
        logger.error(f"Error saving upload to cache: {e}")
70
+
71
def save_upload_part_to_cache(upload_id, part_number, etag):
    """Persist one uploaded part (number, ETag, timestamp) to the cache.

    Best-effort: any cache failure is logged and ignored so in-memory
    part tracking is never interrupted.
    """
    try:
        from services.config.valkey_config import get_redis_client, is_connection_available

        # Silently skip persistence when the cache backend is down.
        if not is_connection_available():
            return

        client = get_redis_client()
        record = json.dumps({
            'part_number': part_number,
            'etag': etag,
            'uploaded_at': datetime.now().isoformat(),
        })

        # Parts share the 24-hour lifetime of the parent upload record.
        client.setex(f"multipart_upload_part:{upload_id}:{part_number}", 24 * 3600, record)
        logger.debug(f"Saved part {part_number} for upload {upload_id} to cache")

    except Exception as e:
        logger.error(f"Error saving upload part to cache: {e}")
95
+
96
def load_upload_from_cache(upload_id):
    """Load upload state (and its parts) back from the cache.

    Returns the reconstructed upload dict, or ``None`` when the cache is
    unavailable, the entry is missing, or loading fails. ISO timestamp
    strings are converted back to datetime objects, and the parts list is
    rebuilt (sorted by part number) from the per-part cache entries.

    Args:
        upload_id: S3 multipart upload identifier.
    """
    try:
        from services.config.valkey_config import get_redis_client, is_connection_available

        if not is_connection_available():
            return None

        redis_client = get_redis_client()
        cached_data = redis_client.get(f"multipart_upload:{upload_id}")
        if not cached_data:
            return None

        upload_info = json.loads(cached_data)

        # Convert ISO strings back to datetime objects
        for field in ('created_at', 'expires_at'):
            if isinstance(upload_info.get(field), str):
                upload_info[field] = datetime.fromisoformat(upload_info[field])

        # Rebuild the parts list from the per-part cache entries. Use SCAN
        # (incremental, non-blocking) instead of KEYS, which blocks the
        # server while it walks the entire keyspace.
        parts = []
        for part_key in redis_client.scan_iter(match=f"multipart_upload_part:{upload_id}:*"):
            try:
                part_data = json.loads(redis_client.get(part_key))
                parts.append({
                    'PartNumber': part_data['part_number'],
                    'ETag': part_data['etag'],
                })
            except Exception as e:
                logger.error(f"Error loading part data from {part_key}: {e}")

        # S3 requires parts in ascending PartNumber order at completion time.
        parts.sort(key=lambda p: p['PartNumber'])
        upload_info['parts'] = parts
        upload_info['parts_received'] = len(parts)

        logger.info(f"Loaded upload {upload_id} from cache with {len(parts)} parts")
        return upload_info

    except Exception as e:
        logger.error(f"Error loading upload from cache: {e}")
        return None
145
+
146
def remove_upload_from_cache(upload_id):
    """Remove an upload record and all of its part records from the cache.

    Best-effort cleanup: failures are logged, never raised.

    Args:
        upload_id: S3 multipart upload identifier.
    """
    try:
        from services.config.valkey_config import get_redis_client, is_connection_available

        if not is_connection_available():
            return

        redis_client = get_redis_client()

        # Remove main upload data
        redis_client.delete(f"multipart_upload:{upload_id}")

        # Remove all per-part records. SCAN instead of KEYS: KEYS is a
        # blocking full-keyspace walk and is unsafe on a busy server.
        parts_keys = list(redis_client.scan_iter(match=f"multipart_upload_part:{upload_id}:*"))
        if parts_keys:
            redis_client.delete(*parts_keys)

        logger.info(f"Removed upload {upload_id} and {len(parts_keys)} parts from cache")

    except Exception as e:
        logger.error(f"Error removing upload from cache: {e}")
170
+
171
def get_upload_info(upload_id):
    """Return upload state for ``upload_id``, preferring memory over cache.

    Cache hits are promoted back into ``active_uploads`` so subsequent
    lookups are served from memory. Returns ``None`` for unknown uploads.
    """
    # Fast path: the upload is already tracked in this process.
    try:
        return active_uploads[upload_id]
    except KeyError:
        pass

    # Slow path: fall back to the persistent cache (e.g. after a restart).
    info = load_upload_from_cache(upload_id)
    if info:
        # Promote into memory for faster subsequent access.
        active_uploads[upload_id] = info
        return info

    return None
185
+
186
  # Models
187
  class InitUploadResponse(BaseModel):
188
  upload_id: str
 
198
  upload_id: str
199
  parts: List[CompletedPart]
200
 
201
# Helper functions for user info extraction
def get_user_info_from_token(request: Request, auth_token: Optional[str] = None):
    """Extract user information from token without additional verification"""
    # Fall back to the Authorization header when no explicit token is given.
    if not auth_token:
        header = request.headers.get('Authorization')
        if not (header and header.startswith('Bearer ')):
            raise HTTPException(status_code=401, detail="Authentication token required")
        auth_token = header.split(' ')[1]

    try:
        claims = decode_token(auth_token)
        # Expose only the claims the upload endpoints rely on.
        return {key: claims.get(key) for key in ('username', 'role', 'user_id')}
    except Exception as e:
        logger.error(f"Token decode error: {str(e)}")
        raise HTTPException(status_code=401, detail="Invalid authentication token")
221
 
222
# Helper to generate upload key for lecture video
def get_lecture_video_key(course_id: int, lecture_id: int):
    """Build the canonical S3 object key for a lecture's video file."""
    return "videos/cid{cid}/lid{lid}/vid_lecture.mp4".format(cid=course_id, lid=lecture_id)
225
 
 
226
  @router.post("/upload/init-upload")
227
  async def init_upload(
228
  request: Request,
 
233
  parts: int = Form(...),
234
  auth_token: Optional[str] = None
235
  ):
236
+ # Get user info from token
237
+ user_info = get_user_info_from_token(request, auth_token)
238
+ instructor_id = user_info['user_id']
239
 
240
+ # Verify this instructor owns this course and lecture exists (with retry for newly created lectures)
241
+ max_retries = 3
242
+ retry_delay = 0.5 # 500ms delay between retries
243
+
244
+ for attempt in range(max_retries):
245
+ conn = connect_db()
246
+ try:
247
+ with conn.cursor() as cursor:
248
+ cursor.execute(
249
+ "SELECT CourseID FROM Courses WHERE CourseID = %s AND InstructorID = %s",
250
+ (course_id, instructor_id)
251
+ )
252
+ if not cursor.fetchone():
253
+ raise HTTPException(status_code=403, detail="Not authorized to modify this course")
254
+
255
+ # Verify lecture belongs to this course (with retry for newly created lectures)
256
+ cursor.execute(
257
+ "SELECT LectureID FROM Lectures WHERE LectureID = %s AND CourseID = %s",
258
+ (lecture_id, course_id)
259
+ )
260
+ lecture_result = cursor.fetchone()
261
+
262
+ if lecture_result:
263
+ # Lecture found, break out of retry loop
264
+ break
265
+ elif attempt < max_retries - 1:
266
+ # Lecture not found, but we have more retries
267
+ logger.info(f"Lecture {lecture_id} not found on attempt {attempt + 1}, retrying in {retry_delay}s...")
268
+ time.sleep(retry_delay)
269
+ retry_delay *= 1.5 # Exponential backoff
270
+ else:
271
+ # Final attempt failed
272
+ raise HTTPException(status_code=404, detail="Lecture not found or doesn't belong to this course")
273
+ finally:
274
+ conn.close()
275
+
276
+ # If we reached here and haven't broken out, we need to retry
277
+ if attempt < max_retries - 1:
278
+ continue
279
+ else:
280
+ break
281
 
282
  # Generate S3 key for the video
283
  key = get_lecture_video_key(course_id, lecture_id)
284
 
285
  # Log upload initialization
286
  logger.info(f"Initializing upload for course {course_id}, lecture {lecture_id}, size: {file_size}, parts: {parts}")
287
+ logger.info(f"Instructor ID: {instructor_id}, File type: {file_type}")
288
 
289
  try:
290
  # Create multipart upload
291
+ logger.info(f"Creating multipart upload for key: {key}")
292
  multipart_upload = s3.create_multipart_upload(
293
  Bucket=BUCKET_NAME,
294
  Key=key,
 
298
  )
299
 
300
  upload_id = multipart_upload['UploadId']
301
+ logger.info(f"Multipart upload created with ID: {upload_id}")
302
 
303
  # Generate presigned URLs for each part
304
  presigned_urls = {}
305
+ logger.info(f"Generating {parts} presigned URLs...")
306
  for part_number in range(1, parts + 1):
307
  presigned_url = s3.generate_presigned_url(
308
  'upload_part',
 
315
  ExpiresIn=3600 # 1 hour expiry
316
  )
317
  presigned_urls[str(part_number)] = presigned_url
318
+ logger.debug(f"Generated presigned URL for part {part_number}")
319
+
320
+ logger.info(f"Generated {len(presigned_urls)} presigned URLs")
321
 
322
+ # Store upload info in both memory and cache
323
  expires_at = datetime.now() + timedelta(hours=24)
324
+ upload_info = {
325
  'course_id': course_id,
326
  'lecture_id': lecture_id,
327
  'key': key,
 
336
  'expires_at': expires_at
337
  }
338
 
339
+ # Save to cache for persistence across server restarts
340
+ save_upload_to_cache(upload_id, upload_info)
341
+
342
+ # Store in memory for faster access
343
+ active_uploads[upload_id] = upload_info
344
+
345
  logger.info(f"Upload initialized with ID: {upload_id}")
346
 
347
  return JSONResponse({
 
355
  logger.error(f"Error initializing upload: {str(e)}")
356
  raise HTTPException(status_code=500, detail=f"Failed to initialize upload: {str(e)}")
357
 
 
358
  @router.post("/upload/upload-part")
359
  async def upload_part_status(
360
  request: Request,
 
363
  etag: str = Form(...),
364
  auth_token: Optional[str] = None
365
  ):
366
+ # Get user info from token
367
+ user_info = get_user_info_from_token(request, auth_token)
368
+ instructor_id = user_info['user_id']
369
 
370
  # Verify upload exists and belongs to this instructor
371
+ upload_info = get_upload_info(upload_id)
372
+ if not upload_info:
373
  raise HTTPException(status_code=404, detail="Upload not found")
374
 
 
375
  if upload_info['instructor_id'] != instructor_id:
376
  raise HTTPException(status_code=403, detail="Not authorized to access this upload")
377
+ # Check if upload is still valid
 
378
  if upload_info['expires_at'] < datetime.now():
379
  raise HTTPException(status_code=400, detail="Upload expired")
380
 
381
# Validate and clean up ETag format
def normalize_etag(etag_value):
    """Normalize ETag format to ensure proper S3 compatibility"""
    # Empty / missing ETags are invalid and signalled with None.
    if not etag_value:
        return None

    # Strip surrounding whitespace and any existing quotes, then wrap in
    # exactly one pair of double quotes as S3 expects.
    return '"' + etag_value.strip().strip('"') + '"'
393
+
394
+ etag = normalize_etag(etag)
395
+ if not etag:
396
+ raise HTTPException(status_code=400, detail=f"Invalid ETag provided for part {part_number}")
397
+
398
+ logger.info(f"Normalized ETag for part {part_number}: {etag}")
399
+
400
  # Add part info
401
+ part_info = {
402
  'ETag': etag,
403
  'PartNumber': part_number
404
+ }
405
+ upload_info['parts'].append(part_info)
406
 
407
  upload_info['parts_received'] += 1
408
 
409
+ # Save part to cache for persistence
410
+ save_upload_part_to_cache(upload_id, part_number, etag)
411
+
412
+ # Update the upload info in cache
413
+ save_upload_to_cache(upload_id, upload_info)
414
+
415
+ # Log progress with detailed part info
416
+ logger.info(f"Upload {upload_id}: Part {part_number} received with ETag {etag}. Progress: {upload_info['parts_received']}/{upload_info['parts_expected']}")
417
+ logger.info(f"Current parts for upload {upload_id}: {[p['PartNumber'] for p in upload_info['parts']]}")
418
 
419
  return JSONResponse({
420
  'upload_id': upload_id,
 
424
  'progress': f"{int((upload_info['parts_received'] / upload_info['parts_expected']) * 100)}%"
425
  })
426
 
 
427
  @router.post("/upload/complete-upload")
428
  async def complete_upload(
429
  request: Request,
430
  upload_id: str = Form(...),
431
  auth_token: Optional[str] = None
432
  ):
433
+ # Get user info from token
434
+ user_info = get_user_info_from_token(request, auth_token)
435
+ instructor_id = user_info['user_id']
436
 
437
  # Verify upload exists and belongs to this instructor
438
+ upload_info = get_upload_info(upload_id)
439
+ if not upload_info:
440
  raise HTTPException(status_code=404, detail="Upload not found")
441
 
 
442
  if upload_info['instructor_id'] != instructor_id:
443
  raise HTTPException(status_code=403, detail="Not authorized to access this upload")
444
 
 
465
  # Sort parts by part number
466
  parts = sorted(upload_info['parts'], key=lambda x: x['PartNumber'])
467
 
468
+ # Log the parts being sent to S3
469
+ logger.info(f"Completing upload {upload_id} with {len(parts)} parts:")
470
+ for i, part in enumerate(parts):
471
+ logger.info(f" Part {i+1}: PartNumber={part['PartNumber']}, ETag={part['ETag']}")
472
+
473
  # Complete multipart upload
474
+ try:
475
+ response = s3.complete_multipart_upload(
476
+ Bucket=BUCKET_NAME,
477
+ Key=upload_info['key'],
478
+ UploadId=upload_id,
479
+ MultipartUpload={
480
+ 'Parts': parts
481
+ }
482
+ )
483
+ logger.info(f"S3 complete_multipart_upload response: {response}")
484
+ except ClientError as s3_error:
485
+ logger.error(f"S3 ClientError during completion: {s3_error}")
486
+ raise s3_error
487
+ except Exception as s3_error:
488
+ logger.error(f"S3 Error during completion: {s3_error}")
489
+ raise s3_error
490
 
491
  # Get the full video URL
492
  video_url = f"https://{BUCKET_NAME}.s3-{REGION}.amazonaws.com/{upload_info['key']}"
493
 
494
+ # Clean up the upload from memory and cache
495
  course_id = upload_info['course_id']
496
  lecture_id = upload_info['lecture_id']
497
+
498
+ # Remove from cache
499
+ remove_upload_from_cache(upload_id)
500
+
501
+ # Remove from memory
502
+ if upload_id in active_uploads:
503
+ del active_uploads[upload_id]
504
 
505
  # Invalidate cache for this course
506
  invalidate_cache_pattern(f"lectures:id:{lecture_id}:*")
 
535
 
536
  raise HTTPException(status_code=500, detail=f"Failed to complete upload: {str(e)}")
537
 
 
538
  @router.post("/upload/abort-upload")
539
  async def abort_upload(
540
  request: Request,
541
  upload_id: str = Form(...),
542
  auth_token: Optional[str] = None
543
  ):
544
+ # Get user info from token
545
+ user_info = get_user_info_from_token(request, auth_token)
546
+ instructor_id = user_info['user_id']
547
 
548
  # Verify upload exists and belongs to this instructor
549
+ upload_info = get_upload_info(upload_id)
550
+ if not upload_info:
551
  raise HTTPException(status_code=404, detail="Upload not found")
552
 
 
553
  if upload_info['instructor_id'] != instructor_id:
554
  raise HTTPException(status_code=403, detail="Not authorized to access this upload")
555
 
 
561
  UploadId=upload_id
562
  )
563
 
564
+ # Clean up from cache and memory
565
  course_id = upload_info['course_id']
566
  lecture_id = upload_info['lecture_id']
567
+
568
+ remove_upload_from_cache(upload_id)
569
+ if upload_id in active_uploads:
570
+ del active_uploads[upload_id]
571
 
572
  logger.info(f"Upload {upload_id} aborted for course {course_id}, lecture {lecture_id}")
573
 
 
586
 
587
  raise HTTPException(status_code=500, detail=f"Failed to abort upload: {str(e)}")
588
 
 
589
  @router.get("/upload/status/{upload_id}")
590
  async def get_upload_status(
591
  request: Request,
592
  upload_id: str,
593
  auth_token: Optional[str] = None
594
  ):
595
+ # Get user info from token
596
+ user_info = get_user_info_from_token(request, auth_token)
597
+ instructor_id = user_info['user_id']
598
 
599
  # Verify upload exists and belongs to this instructor
600
+ upload_info = get_upload_info(upload_id)
601
+ if not upload_info:
602
  raise HTTPException(status_code=404, detail="Upload not found")
603
 
 
604
  if upload_info['instructor_id'] != instructor_id:
605
  raise HTTPException(status_code=403, detail="Not authorized to access this upload")
606
 
 
620
  'expires_at': upload_info['expires_at'].isoformat()
621
  })
622
 
 
623
  @router.get("/upload/active-uploads")
624
  async def list_active_uploads(
625
  request: Request,
626
  auth_token: Optional[str] = None
627
  ):
628
+ # Get user info from token
629
+ user_info = get_user_info_from_token(request, auth_token)
630
+ instructor_id = user_info['user_id']
631
 
632
+ # Filter uploads for this instructor from in-memory storage
633
+ # Note: This only shows uploads that are currently in memory
634
+ # For complete persistence, you'd need to also query cache patterns
635
  instructor_uploads = {}
636
  for upload_id, info in active_uploads.items():
637
  if info['instructor_id'] == instructor_id:
 
650
  'count': len(instructor_uploads)
651
  })
652
 
 
653
  @router.post("/upload/cleanup")
654
  async def cleanup_uploads(
655
  request: Request,
656
  auth_token: Optional[str] = None
657
  ):
658
+ # Get user info from token (but don't restrict to instructors for cleanup)
659
+ user_info = get_user_info_from_token(request, auth_token)
 
 
 
 
660
 
661
  now = datetime.now()
662
  expired_uploads = []
663
 
664
+ # Clean up from memory and also check cache for other expired uploads
665
+ for upload_id in list(active_uploads.keys()):
666
+ upload_info = get_upload_info(upload_id)
667
+ if upload_info and upload_info['expires_at'] < now:
668
  try:
669
  # Abort the upload in S3
670
  s3.abort_multipart_upload(
671
  Bucket=BUCKET_NAME,
672
+ Key=upload_info['key'],
673
  UploadId=upload_id
674
  )
675
  except Exception as e:
676
  logger.error(f"Error aborting expired upload {upload_id}: {str(e)}")
677
 
678
+ # Remove from cache and memory
679
+ remove_upload_from_cache(upload_id)
680
+ if upload_id in active_uploads:
681
+ del active_uploads[upload_id]
682
  expired_uploads.append(upload_id)
683
 
684
  return JSONResponse({
 
691
  """Periodically clean up expired uploads"""
692
  while True:
693
  now = datetime.now()
694
+ # Create a copy of the keys to avoid dictionary size change during iteration
695
+ upload_ids = list(active_uploads.keys())
696
+
697
+ for upload_id in upload_ids:
698
+ upload_info = get_upload_info(upload_id)
699
+ if upload_info and upload_info['expires_at'] < now:
700
  try:
701
  # Abort the upload in S3
702
  s3.abort_multipart_upload(
703
  Bucket=BUCKET_NAME,
704
+ Key=upload_info['key'],
705
  UploadId=upload_id
706
  )
707
  logger.info(f"Automatically cleaned up expired upload {upload_id}")
708
  except Exception as e:
709
  logger.error(f"Error cleaning up expired upload {upload_id}: {str(e)}")
710
 
711
+ # Remove from cache and memory
712
+ remove_upload_from_cache(upload_id)
713
+ if upload_id in active_uploads:
714
+ del active_uploads[upload_id]
715
 
716
  # Wait for 1 hour before next cleanup
717
  await asyncio.sleep(3600)
718
 
719
# Handle to the singleton background cleanup task (None until first started).
cleanup_task = None

# Function to start background cleanup task
def start_cleanup_task():
    """Ensure the background cleanup task is running and return its handle.

    A new asyncio task is created only when no task exists yet or the
    previous one has already finished; otherwise the existing task is
    reused, so repeated calls are safe.
    """
    global cleanup_task
    needs_start = cleanup_task is None or cleanup_task.done()
    if needs_start:
        cleanup_task = asyncio.create_task(background_cleanup())
        logger.info("Background cleanup task started")
    return cleanup_task
730
+
731
@router.post("/courses/{course_id}/lectures/{lecture_id}/upload-video")
async def upload_video_standard(
    request: Request,
    course_id: int,
    lecture_id: int,
    video: UploadFile = File(...),
    auth_token: Optional[str] = None
):
    """Upload a lecture video to S3 in a single (non-chunked) request.

    Intended for files up to 100MB; larger files must use the chunked
    upload endpoints. Verifies that the requesting instructor owns the
    course and that the lecture belongs to it before uploading.

    Raises:
        HTTPException 403: instructor does not own the course.
        HTTPException 404: lecture not found after all retries.
        HTTPException 400: non-video content type or file over 100MB.
        HTTPException 500: S3 upload failure.
    """
    # Get user info from token
    user_info = get_user_info_from_token(request, auth_token)
    instructor_id = user_info['user_id']

    # Verify ownership. Retry a few times because the lecture row may have
    # been created by an immediately preceding request and not be visible
    # yet (read-after-write lag).
    max_retries = 3
    retry_delay = 0.5  # 500ms initial delay between retries
    for attempt in range(max_retries):
        conn = connect_db()
        try:
            with conn.cursor() as cursor:
                cursor.execute(
                    "SELECT CourseID FROM Courses WHERE CourseID = %s AND InstructorID = %s",
                    (course_id, instructor_id)
                )
                if not cursor.fetchone():
                    raise HTTPException(status_code=403, detail="Not authorized to modify this course")

                # Verify lecture belongs to this course
                cursor.execute(
                    "SELECT LectureID FROM Lectures WHERE LectureID = %s AND CourseID = %s",
                    (lecture_id, course_id)
                )
                if cursor.fetchone():
                    break  # Lecture found; stop retrying
                if attempt == max_retries - 1:
                    # Final attempt failed
                    raise HTTPException(status_code=404, detail="Lecture not found or doesn't belong to this course")
                logger.info(f"Lecture {lecture_id} not found on attempt {attempt + 1}, retrying in {retry_delay}s...")
        finally:
            conn.close()
        # Non-blocking backoff: the original used time.sleep(), which both
        # blocks the event loop inside an async handler and referenced the
        # never-imported `time` module (NameError on the retry path).
        await asyncio.sleep(retry_delay)
        retry_delay *= 1.5  # Exponential backoff

    try:
        # File validation
        if not video.content_type or not video.content_type.startswith('video/'):
            raise HTTPException(status_code=400, detail="Invalid file type. Please upload a video file")

        # Determine file size by seeking the spooled upload file.
        video.file.seek(0, 2)  # Seek to end
        file_size = video.file.tell()
        video.file.seek(0)  # Reset to beginning

        if file_size > 100 * 1024 * 1024:  # 100MB limit for standard upload
            raise HTTPException(status_code=400, detail="File too large for standard upload. Use chunked upload for files over 100MB")

        # Generate S3 key for the video
        key = get_lecture_video_key(course_id, lecture_id)

        logger.info(f"Starting standard upload for course {course_id}, lecture {lecture_id}, size: {file_size}")

        # Upload to S3
        s3.upload_fileobj(
            video.file,
            BUCKET_NAME,
            key,
            ExtraArgs={
                'ContentType': video.content_type,
                'ACL': 'public-read',
                'ContentDisposition': 'inline'
            }
        )

        # Publicly accessible URL of the uploaded object
        video_url = f"https://{BUCKET_NAME}.s3-{REGION}.amazonaws.com/{key}"

        # Invalidate cache for this lecture and course
        invalidate_cache_pattern(f"lectures:id:{lecture_id}:*")
        invalidate_cache_pattern(f"courses:id:{course_id}:*")

        logger.info(f"Standard upload completed successfully for course {course_id}, lecture {lecture_id}")

        return JSONResponse({
            'message': 'Video uploaded successfully',
            'video_url': video_url,
            'course_id': course_id,
            'lecture_id': lecture_id,
            'key': key,
            'file_size': file_size
        })

    except HTTPException:
        # Preserve intended status codes (e.g. the 400 validation errors
        # above) instead of collapsing them into a generic 500.
        raise
    except Exception as e:
        logger.error(f"Error in standard upload for course {course_id}, lecture {lecture_id}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to upload video: {str(e)}")
836
+
837
+ # Backend-Proxied Chunked Upload Endpoints (to avoid CORS issues)
838
+
839
@router.post("/upload/proxy/init-upload")
async def init_proxy_upload(
    request: Request,
    course_id: int = Form(...),
    lecture_id: int = Form(...),
    file_size: int = Form(...),
    file_type: str = Form(...),
    parts: int = Form(...),
    auth_token: Optional[str] = None
):
    """Initialize a backend-proxied chunked upload.

    Creates an S3 multipart upload whose parts will be streamed through
    this backend (no presigned URLs), avoiding browser CORS issues.
    Upload bookkeeping is stored in memory and in the cache.

    Raises:
        HTTPException 404: lecture not found / not owned by instructor.
        HTTPException 500: S3 multipart-upload creation failure.
    """
    # Get user info from token
    user_info = get_user_info_from_token(request, auth_token)
    instructor_id = user_info['user_id']

    # Validate course and lecture ownership, retrying briefly in case the
    # lecture row was created an instant ago and is not yet visible.
    max_retries = 3
    retry_delay = 1.0  # Start with 1 second

    for attempt in range(max_retries):
        conn = None  # guard: connect_db() itself may raise before assignment
        result = None
        try:
            conn = connect_db()
            cursor = conn.cursor()
            # Check if lecture exists and belongs to instructor
            cursor.execute("""
                SELECT l.LectureID, c.InstructorID
                FROM Lectures l
                JOIN Courses c ON l.CourseID = c.CourseID
                WHERE l.LectureID = %s AND l.CourseID = %s
            """, (lecture_id, course_id))
            result = cursor.fetchone()
        except Exception as e:
            # DB hiccups are treated like "not found yet" and retried.
            logger.error(f"Error validating lecture ownership (attempt {attempt + 1}): {str(e)}")
            result = None
        finally:
            # Close exactly once, in one place (the original closed the
            # connection in several branches and then again in finally,
            # and could hit NameError on `conn` if connect_db() raised).
            if conn:
                conn.close()

        if result and result[1] == instructor_id:
            break  # ownership confirmed
        if attempt == max_retries - 1:
            raise HTTPException(status_code=404, detail="Lecture not found or doesn't belong to this course")
        await asyncio.sleep(retry_delay)
        retry_delay *= 1.5  # exponential backoff

    # Generate S3 key for the video
    key = get_lecture_video_key(course_id, lecture_id)

    logger.info(f"Initializing backend-proxied upload for course {course_id}, lecture {lecture_id}, size: {file_size}, parts: {parts}")

    try:
        # Create multipart upload
        multipart_upload = s3.create_multipart_upload(
            Bucket=BUCKET_NAME,
            Key=key,
            ContentType=file_type,
            ACL='public-read',
            ContentDisposition='inline'
        )

        upload_id = multipart_upload['UploadId']
        logger.info(f"Backend-proxied multipart upload created with ID: {upload_id}")

        # Store upload info in both memory and cache (no presigned URLs needed)
        expires_at = datetime.now() + timedelta(hours=24)
        upload_info = {
            'course_id': course_id,
            'lecture_id': lecture_id,
            'instructor_id': instructor_id,
            'key': key,
            'file_size': file_size,
            'file_type': file_type,
            'parts_expected': parts,
            'parts_received': 0,
            'parts': [],
            'created_at': datetime.now(),
            'expires_at': expires_at,
            'upload_type': 'proxy'  # Mark as backend-proxied
        }

        active_uploads[upload_id] = upload_info
        save_upload_to_cache(upload_id, upload_info)

        logger.info(f"Backend-proxied upload {upload_id} initialized successfully")

        return JSONResponse({
            'upload_id': upload_id,
            'parts_expected': parts,
            'message': 'Backend-proxied upload initialized successfully'
        })

    except Exception as e:
        logger.error(f"Error initializing backend-proxied upload: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to initialize upload: {str(e)}")
950
+
951
+
952
@router.post("/upload/proxy/upload-part")
async def upload_part_proxy(
    request: Request,
    upload_id: str = Form(...),
    part_number: int = Form(...),
    chunk: UploadFile = File(...),
    auth_token: Optional[str] = None
):
    """Upload a single part through backend proxy.

    Streams one chunk of a backend-proxied multipart upload to S3 and
    records its ETag so the upload can later be completed.

    Raises:
        HTTPException 404: upload id unknown.
        HTTPException 403: upload belongs to another instructor.
        HTTPException 400: upload expired or not a proxy upload.
        HTTPException 500: S3 part upload failure.
    """
    # Get user info from token
    user_info = get_user_info_from_token(request, auth_token)
    instructor_id = user_info['user_id']

    # Verify upload exists and belongs to this instructor
    upload_info = get_upload_info(upload_id)
    if not upload_info:
        raise HTTPException(status_code=404, detail="Upload not found")

    if upload_info['instructor_id'] != instructor_id:
        raise HTTPException(status_code=403, detail="Not authorized to access this upload")

    # Check if upload is still valid
    if upload_info['expires_at'] < datetime.now():
        raise HTTPException(status_code=400, detail="Upload expired")

    # Verify this is a backend-proxied upload
    if upload_info.get('upload_type') != 'proxy':
        raise HTTPException(status_code=400, detail="This upload is not configured for backend proxy")

    try:
        # Read the chunk data
        chunk_data = await chunk.read()

        logger.info(f"Uploading part {part_number} for upload {upload_id} via backend proxy (size: {len(chunk_data)} bytes)")

        # Upload the part directly to S3 via backend
        response = s3.upload_part(
            Bucket=BUCKET_NAME,
            Key=upload_info['key'],
            PartNumber=part_number,
            UploadId=upload_id,
            Body=chunk_data
        )

        etag = response['ETag']
        logger.info(f"Part {part_number} uploaded successfully via backend proxy, ETag: {etag}")

        # Record the part. If the client retried a part number, replace the
        # stored ETag instead of double-counting it: the original appended
        # unconditionally, so a retry both inflated parts_received and
        # produced duplicate PartNumbers that S3 rejects on complete.
        existing = next(
            (p for p in upload_info['parts'] if p['PartNumber'] == part_number),
            None
        )
        if existing is not None:
            existing['ETag'] = etag
        else:
            upload_info['parts'].append({
                'ETag': etag,
                'PartNumber': part_number
            })
            upload_info['parts_received'] += 1

        # Save part to cache for persistence
        save_upload_part_to_cache(upload_id, part_number, etag)

        # Update the upload info in cache
        save_upload_to_cache(upload_id, upload_info)

        logger.info(f"Backend-proxied upload {upload_id}: Part {part_number} received. Progress: {upload_info['parts_received']}/{upload_info['parts_expected']}")

        return JSONResponse({
            'upload_id': upload_id,
            'part_number': part_number,
            'etag': etag,
            'parts_received': upload_info['parts_received'],
            'parts_expected': upload_info['parts_expected'],
            'progress': f"{int((upload_info['parts_received'] / upload_info['parts_expected']) * 100)}%"
        })

    except Exception as e:
        logger.error(f"Error uploading part {part_number} via backend proxy for upload {upload_id}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to upload part: {str(e)}")
1027
+
1028
+
1029
@router.post("/upload/proxy/complete-upload")
async def complete_proxy_upload(
    request: Request,
    upload_id: str = Form(...),
    auth_token: Optional[str] = None
):
    """Complete a backend-proxied chunked upload.

    Verifies ownership and that every expected part was received, then
    asks S3 to assemble the parts into the final object. On success the
    upload bookkeeping is removed and related caches are invalidated; on
    failure (or expiry) the multipart upload is aborted and bookkeeping
    cleaned up.

    Raises:
        HTTPException 404: upload id unknown.
        HTTPException 403: upload belongs to another instructor.
        HTTPException 400: expired, not a proxy upload, or missing parts.
        HTTPException 500: S3 completion failure.
    """
    # Get user info from token
    user_info = get_user_info_from_token(request, auth_token)
    instructor_id = user_info['user_id']

    # Verify upload exists and belongs to this instructor
    upload_info = get_upload_info(upload_id)
    if not upload_info:
        raise HTTPException(status_code=404, detail="Upload not found")

    if upload_info['instructor_id'] != instructor_id:
        raise HTTPException(status_code=403, detail="Not authorized to access this upload")

    # Check if upload is still valid
    if upload_info['expires_at'] < datetime.now():
        # Try to abort the upload so S3 does not keep orphaned parts
        try:
            s3.abort_multipart_upload(
                Bucket=BUCKET_NAME,
                Key=upload_info['key'],
                UploadId=upload_id
            )
        except Exception as e:
            logger.error(f"Error aborting expired backend-proxied upload {upload_id}: {str(e)}")

        # Clean up both stores (the original deleted only the in-memory
        # entry here, leaking the cache entry)
        remove_upload_from_cache(upload_id)
        if upload_id in active_uploads:
            del active_uploads[upload_id]
        raise HTTPException(status_code=400, detail="Upload expired")

    # Verify this is a backend-proxied upload
    if upload_info.get('upload_type') != 'proxy':
        raise HTTPException(status_code=400, detail="This upload is not configured for backend proxy")

    # Verify all parts are received
    if upload_info['parts_received'] != upload_info['parts_expected']:
        raise HTTPException(status_code=400, detail=f"Not all parts received. Expected {upload_info['parts_expected']}, got {upload_info['parts_received']}")

    try:
        # S3 requires the parts list ordered by part number
        parts = sorted(upload_info['parts'], key=lambda x: x['PartNumber'])

        logger.info(f"Completing backend-proxied upload {upload_id} with {len(parts)} parts")

        # Complete multipart upload
        s3.complete_multipart_upload(
            Bucket=BUCKET_NAME,
            Key=upload_info['key'],
            UploadId=upload_id,
            MultipartUpload={
                'Parts': parts
            }
        )

        # Publicly accessible URL of the assembled object
        video_url = f"https://{BUCKET_NAME}.s3-{REGION}.amazonaws.com/{upload_info['key']}"

        # Clean up the upload from memory and cache
        course_id = upload_info['course_id']
        lecture_id = upload_info['lecture_id']

        remove_upload_from_cache(upload_id)
        if upload_id in active_uploads:
            del active_uploads[upload_id]

        # Invalidate cache for this course
        invalidate_cache_pattern(f"lectures:id:{lecture_id}:*")
        invalidate_cache_pattern(f"courses:id:{course_id}:*")

        logger.info(f"Backend-proxied upload {upload_id} completed successfully for course {course_id}, lecture {lecture_id}")

        return JSONResponse({
            'message': 'Backend-proxied upload completed successfully',
            'video_url': video_url,
            'course_id': course_id,
            'lecture_id': lecture_id,
            'key': upload_info['key']
        })

    except Exception as e:
        logger.error(f"Error completing backend-proxied upload {upload_id}: {str(e)}")

        # Try to abort the upload so the partial parts are discarded
        try:
            s3.abort_multipart_upload(
                Bucket=BUCKET_NAME,
                Key=upload_info['key'],
                UploadId=upload_id
            )
        except Exception as abort_error:
            logger.error(f"Error aborting failed backend-proxied upload {upload_id}: {str(abort_error)}")

        # Clean up both stores (the original deleted only the in-memory
        # entry here, leaking the cache entry)
        remove_upload_from_cache(upload_id)
        if upload_id in active_uploads:
            del active_uploads[upload_id]

        raise HTTPException(status_code=500, detail=f"Failed to complete upload: {str(e)}")