tlong-ds commited on
Commit
3f3b9b3
·
verified ·
1 Parent(s): a3fefcd

Upload upload_endpoints.py

Browse files
Files changed (1) hide show
  1. services/api/upload_endpoints.py +494 -0
services/api/upload_endpoints.py ADDED
@@ -0,0 +1,494 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import APIRouter, HTTPException, Depends, Request, UploadFile, File, Form
2
+ from fastapi.responses import JSONResponse
3
+ import boto3
4
+ import json
5
+ import os
6
+ import uuid
7
+ import logging
8
+ import asyncio
9
+ from typing import Dict, Optional, List
10
+ from pydantic import BaseModel
11
+ from io import BytesIO
12
+ from datetime import datetime, timedelta
13
+ from botocore.exceptions import ClientError
14
+ from services.api.db.token_utils import decode_token
15
+ from services.api.api_endpoints import connect_db
16
+ from services.utils.api_cache import invalidate_cache_pattern
17
+
18
# Configure logging for upload operations
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("upload_endpoints")

# S3 credentials/region come from the environment; REGION falls back to
# ap-southeast-1 when REGION_NAME is unset.
AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
REGION = os.getenv("REGION_NAME", "ap-southeast-1")
# NOTE(review): bucket name is hard-coded — consider reading it from the
# environment like the credentials above.
BUCKET_NAME = "tlhmaterials"

# Module-level S3 client shared by every endpoint in this router.
s3 = boto3.client('s3',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=REGION
)

# Router initialization
router = APIRouter(
    tags=["uploads"],
    responses={404: {"description": "Not found"}},
)

# In-memory storage for active multipart uploads
# Structure: {upload_id: {parts: [], course_id: str, lecture_id: str, upload_key: str, ...}}
# NOTE(review): this bookkeeping is process-local — it is lost on restart and
# not shared between workers; confirm the app runs as a single process.
active_uploads = {}
43
+
44
# Models
class InitUploadResponse(BaseModel):
    """Response schema for /upload/init-upload.

    NOTE(review): declared but unused — init_upload returns a raw
    JSONResponse instead of this model; confirm whether it should be wired
    up as the endpoint's response_model.
    """
    upload_id: str                  # S3 multipart UploadId
    presigned_urls: Dict[str, str]  # part number (as string) -> presigned PUT URL
    key: str                        # S3 object key of the final video
    expires_at: datetime            # when the in-memory upload record expires
50
+
51
class CompletedPart(BaseModel):
    """One finished part, mirroring S3's CompleteMultipartUpload part shape."""
    ETag: str        # ETag returned by S3 when the part was uploaded
    PartNumber: int  # 1-based part index
54
+
55
class CompleteUploadRequest(BaseModel):
    """Request schema for finishing an upload.

    NOTE(review): declared but unused — complete_upload reads form fields
    rather than this model; confirm whether it is dead code.
    """
    upload_id: str              # S3 multipart UploadId to complete
    parts: List[CompletedPart]  # all uploaded parts with their ETags
58
+
59
# Middleware to check instructor permissions
async def verify_instructor(request: Request, auth_token: Optional[str] = None):
    """Authenticate the caller and require the Instructor role.

    The token is taken from *auth_token* when supplied, otherwise from the
    ``Authorization: Bearer <token>`` header.

    Returns:
        The instructor's user_id from the token, falling back to the
        username when the token carries no user_id.

    Raises:
        HTTPException: 401 when no token is present or it fails to decode;
            403 when the token is valid but the role is not "Instructor".
    """
    if not auth_token:
        auth_header = request.headers.get('Authorization')
        if auth_header and auth_header.startswith('Bearer '):
            auth_token = auth_header.split(' ')[1]
        else:
            raise HTTPException(status_code=401, detail="Authentication required")

    try:
        user_data = decode_token(auth_token)
        username = user_data.get('username')
        role = user_data.get('role')
        instructor_id = user_data.get('user_id')
    except Exception as e:
        logger.error(f"Token verification error: {str(e)}")
        raise HTTPException(status_code=401, detail="Invalid authentication token")

    # The role check lives OUTSIDE the try block: previously the 403 raised
    # here was caught by the broad `except Exception` above and re-raised as
    # a misleading 401 "Invalid authentication token".
    if role != "Instructor":
        raise HTTPException(status_code=403, detail="Only instructors can upload files")

    # Return instructor ID for use in the endpoints
    return instructor_id or username
82
+
83
# Helper to generate upload key for lecture video
def get_lecture_video_key(course_id: int, lecture_id: int):
    """Build the canonical S3 object key for a lecture's video."""
    return "videos/cid{0}/lid{1}/vid_lecture.mp4".format(course_id, lecture_id)
86
+
87
# 1. Initialize multipart upload
@router.post("/upload/init-upload")
async def init_upload(
    request: Request,
    course_id: int = Form(...),
    lecture_id: int = Form(...),
    file_size: int = Form(...),
    file_type: str = Form(...),
    parts: int = Form(...),
    auth_token: Optional[str] = None
):
    """Start an S3 multipart upload for a lecture video.

    Verifies that the caller is the instructor who owns *course_id* and
    that *lecture_id* belongs to that course, then creates the multipart
    upload and returns one presigned upload-part URL per expected part.

    Raises:
        HTTPException: 400 on invalid parts/file_size, 403 when the course
            is not owned by the caller, 404 when the lecture doesn't match,
            500 when S3 initialization fails.
    """
    # Verify instructor permissions
    instructor_id = await verify_instructor(request, auth_token)

    # Validate client-supplied sizes BEFORE touching the DB or S3.
    # S3 multipart uploads allow at most 10,000 parts; an unchecked large
    # `parts` value would also make this loop generate an unbounded number
    # of presigned URLs.
    if parts < 1 or parts > 10000:
        raise HTTPException(status_code=400, detail="parts must be between 1 and 10000")
    if file_size <= 0:
        raise HTTPException(status_code=400, detail="file_size must be positive")

    # Verify this instructor owns this course
    conn = connect_db()
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                "SELECT CourseID FROM Courses WHERE CourseID = %s AND InstructorID = %s",
                (course_id, instructor_id)
            )
            if not cursor.fetchone():
                raise HTTPException(status_code=403, detail="Not authorized to modify this course")

            # Verify lecture belongs to this course
            cursor.execute(
                "SELECT LectureID FROM Lectures WHERE LectureID = %s AND CourseID = %s",
                (lecture_id, course_id)
            )
            if not cursor.fetchone():
                raise HTTPException(status_code=404, detail="Lecture not found or doesn't belong to this course")
    finally:
        conn.close()

    # Generate S3 key for the video
    key = get_lecture_video_key(course_id, lecture_id)

    logger.info(f"Initializing upload for course {course_id}, lecture {lecture_id}, size: {file_size}, parts: {parts}")

    try:
        # NOTE(review): ACL='public-read' makes the finished video world
        # readable — confirm that is intended for course material.
        multipart_upload = s3.create_multipart_upload(
            Bucket=BUCKET_NAME,
            Key=key,
            ContentType=file_type,
            ACL='public-read',
            ContentDisposition='inline'
        )

        upload_id = multipart_upload['UploadId']

        # One presigned URL per part; keys are stringified part numbers so
        # the JSON response round-trips cleanly.
        presigned_urls = {}
        for part_number in range(1, parts + 1):
            presigned_urls[str(part_number)] = s3.generate_presigned_url(
                'upload_part',
                Params={
                    'Bucket': BUCKET_NAME,
                    'Key': key,
                    'UploadId': upload_id,
                    'PartNumber': part_number
                },
                ExpiresIn=3600  # 1 hour expiry (shorter than the 24h record below)
            )

        # Track the upload in process-local memory for 24 hours.
        expires_at = datetime.now() + timedelta(hours=24)
        active_uploads[upload_id] = {
            'course_id': course_id,
            'lecture_id': lecture_id,
            'key': key,
            'file_size': file_size,
            'file_type': file_type,
            'parts_expected': parts,
            'parts_received': 0,
            'parts': [],
            'instructor_id': instructor_id,
            'status': 'initialized',
            'created_at': datetime.now(),
            'expires_at': expires_at
        }

        logger.info(f"Upload initialized with ID: {upload_id}")

        return JSONResponse({
            'upload_id': upload_id,
            'presigned_urls': presigned_urls,
            'key': key,
            'expires_at': expires_at.isoformat()
        })

    except Exception as e:
        logger.error(f"Error initializing upload: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to initialize upload: {str(e)}")
184
+
185
# 2. Upload part status update
@router.post("/upload/upload-part")
async def upload_part_status(
    request: Request,
    upload_id: str = Form(...),
    part_number: int = Form(...),
    etag: str = Form(...),
    auth_token: Optional[str] = None
):
    """Record that the client finished uploading one part directly to S3.

    The client uploads the part bytes via its presigned URL and reports the
    ETag returned by S3 here, so complete_upload can later assemble the
    object from the collected (ETag, PartNumber) pairs.
    """
    # Verify instructor permissions
    instructor_id = await verify_instructor(request, auth_token)

    # Verify upload exists and belongs to this instructor
    if upload_id not in active_uploads:
        raise HTTPException(status_code=404, detail="Upload not found")

    upload_info = active_uploads[upload_id]
    if upload_info['instructor_id'] != instructor_id:
        raise HTTPException(status_code=403, detail="Not authorized to access this upload")

    # Check if upload is still valid
    if upload_info['expires_at'] < datetime.now():
        raise HTTPException(status_code=400, detail="Upload expired")

    # Reject out-of-range and duplicate part reports so parts_received
    # cannot drift past parts_expected and corrupt the part list that
    # complete_upload sends to S3.
    if part_number < 1 or part_number > upload_info['parts_expected']:
        raise HTTPException(status_code=400, detail=f"Invalid part number {part_number}")
    if any(p['PartNumber'] == part_number for p in upload_info['parts']):
        raise HTTPException(status_code=400, detail=f"Part {part_number} already recorded")

    # Add part info
    upload_info['parts'].append({
        'ETag': etag,
        'PartNumber': part_number
    })

    upload_info['parts_received'] += 1

    # Log progress
    logger.info(f"Upload {upload_id}: Part {part_number} received. Progress: {upload_info['parts_received']}/{upload_info['parts_expected']}")

    return JSONResponse({
        'upload_id': upload_id,
        'part_number': part_number,
        'parts_received': upload_info['parts_received'],
        'parts_expected': upload_info['parts_expected'],
        'progress': f"{int((upload_info['parts_received'] / upload_info['parts_expected']) * 100)}%"
    })
227
+
228
# 3. Complete multipart upload
@router.post("/upload/complete-upload")
async def complete_upload(
    request: Request,
    upload_id: str = Form(...),
    auth_token: Optional[str] = None
):
    """Finish the multipart upload on S3 and return the public video URL.

    Expired uploads are aborted (best effort) and dropped. When S3's
    complete call fails, the upload is aborted so S3 does not keep billing
    for orphaned parts. Post-completion bookkeeping (cache invalidation)
    is best-effort and never fails the request.
    """
    # Verify instructor permissions
    instructor_id = await verify_instructor(request, auth_token)

    # Verify upload exists and belongs to this instructor
    if upload_id not in active_uploads:
        raise HTTPException(status_code=404, detail="Upload not found")

    upload_info = active_uploads[upload_id]
    if upload_info['instructor_id'] != instructor_id:
        raise HTTPException(status_code=403, detail="Not authorized to access this upload")

    # Check if upload is still valid
    if upload_info['expires_at'] < datetime.now():
        # Try to abort the upload on S3; failure is logged, not fatal.
        try:
            s3.abort_multipart_upload(
                Bucket=BUCKET_NAME,
                Key=upload_info['key'],
                UploadId=upload_id
            )
        except Exception as e:
            logger.error(f"Error aborting expired upload {upload_id}: {str(e)}")

        del active_uploads[upload_id]
        raise HTTPException(status_code=400, detail="Upload expired")

    # Verify all parts are received
    if upload_info['parts_received'] != upload_info['parts_expected']:
        raise HTTPException(status_code=400, detail=f"Not all parts received. Expected {upload_info['parts_expected']}, got {upload_info['parts_received']}")

    try:
        # S3 requires the part list sorted by PartNumber.
        parts = sorted(upload_info['parts'], key=lambda x: x['PartNumber'])

        s3.complete_multipart_upload(
            Bucket=BUCKET_NAME,
            Key=upload_info['key'],
            UploadId=upload_id,
            MultipartUpload={
                'Parts': parts
            }
        )
    except Exception as e:
        logger.error(f"Error completing upload {upload_id}: {str(e)}")

        # Abort so S3 doesn't retain orphaned parts (best effort).
        try:
            s3.abort_multipart_upload(
                Bucket=BUCKET_NAME,
                Key=upload_info['key'],
                UploadId=upload_id
            )
        except Exception as abort_error:
            logger.error(f"Error aborting failed upload {upload_id}: {str(abort_error)}")

        if upload_id in active_uploads:
            del active_uploads[upload_id]

        raise HTTPException(status_code=500, detail=f"Failed to complete upload: {str(e)}")

    # From here on the object exists on S3 — nothing below may abort the
    # upload or surface a 500. Capture the fields we still need before
    # dropping the bookkeeping entry.
    video_url = f"https://{BUCKET_NAME}.s3-{REGION}.amazonaws.com/{upload_info['key']}"
    course_id = upload_info['course_id']
    lecture_id = upload_info['lecture_id']
    key = upload_info['key']
    del active_uploads[upload_id]

    # Cache invalidation is best-effort: previously a cache error here was
    # caught by the S3 error handler, which aborted the already-completed
    # upload and returned 500 for a successful upload.
    try:
        invalidate_cache_pattern(f"lectures:id:{lecture_id}:*")
        invalidate_cache_pattern(f"courses:id:{course_id}:*")
    except Exception as e:
        logger.error(f"Cache invalidation failed after upload {upload_id}: {str(e)}")

    logger.info(f"Upload {upload_id} completed successfully for course {course_id}, lecture {lecture_id}")

    return JSONResponse({
        'message': 'Upload completed successfully',
        'video_url': video_url,
        'course_id': course_id,
        'lecture_id': lecture_id,
        'key': key
    })
319
+
320
# 4. Abort multipart upload
@router.post("/upload/abort-upload")
async def abort_upload(
    request: Request,
    upload_id: str = Form(...),
    auth_token: Optional[str] = None
):
    """Cancel an in-progress multipart upload and discard its bookkeeping."""
    # Only the instructor who started the upload may abort it.
    instructor_id = await verify_instructor(request, auth_token)

    upload_info = active_uploads.get(upload_id)
    if upload_info is None:
        raise HTTPException(status_code=404, detail="Upload not found")
    if upload_info['instructor_id'] != instructor_id:
        raise HTTPException(status_code=403, detail="Not authorized to access this upload")

    try:
        # Tell S3 to discard any parts uploaded so far.
        s3.abort_multipart_upload(
            Bucket=BUCKET_NAME,
            Key=upload_info['key'],
            UploadId=upload_id
        )

        # Drop the in-memory record.
        course_id = upload_info['course_id']
        lecture_id = upload_info['lecture_id']
        del active_uploads[upload_id]

        logger.info(f"Upload {upload_id} aborted for course {course_id}, lecture {lecture_id}")

        return JSONResponse({
            'message': 'Upload aborted successfully',
            'course_id': course_id,
            'lecture_id': lecture_id
        })

    except Exception as e:
        logger.error(f"Error aborting upload {upload_id}: {str(e)}")
        # Drop the record even when the S3 abort failed.
        active_uploads.pop(upload_id, None)
        raise HTTPException(status_code=500, detail=f"Failed to abort upload: {str(e)}")
367
+
368
# 5. Get upload status
@router.get("/upload/status/{upload_id}")
async def get_upload_status(
    request: Request,
    upload_id: str,
    auth_token: Optional[str] = None
):
    """Report progress and metadata for one of the caller's active uploads."""
    # Authentication + ownership checks.
    instructor_id = await verify_instructor(request, auth_token)

    if upload_id not in active_uploads:
        raise HTTPException(status_code=404, detail="Upload not found")

    info = active_uploads[upload_id]
    if info['instructor_id'] != instructor_id:
        raise HTTPException(status_code=403, detail="Not authorized to access this upload")

    # Whole-percent progress, truncated toward zero.
    progress = int((info['parts_received'] / info['parts_expected']) * 100)

    payload = {
        'upload_id': upload_id,
        'status': info['status'],
        'course_id': info['course_id'],
        'lecture_id': info['lecture_id'],
        'parts_received': info['parts_received'],
        'parts_expected': info['parts_expected'],
        'progress': f"{progress}%",
        'progress_value': progress,
        'created_at': info['created_at'].isoformat(),
        'expires_at': info['expires_at'].isoformat()
    }
    return JSONResponse(payload)
401
+
402
# 6. List active uploads
@router.get("/upload/active-uploads")
async def list_active_uploads(
    request: Request,
    auth_token: Optional[str] = None
):
    """List the caller's in-flight uploads with non-sensitive fields only."""
    instructor_id = await verify_instructor(request, auth_token)

    def _summary(info):
        # Expose only progress/bookkeeping fields, never keys or tokens.
        return {
            'course_id': info['course_id'],
            'lecture_id': info['lecture_id'],
            'status': info['status'],
            'progress': f"{int((info['parts_received'] / info['parts_expected']) * 100)}%",
            'created_at': info['created_at'].isoformat(),
            'expires_at': info['expires_at'].isoformat()
        }

    # Keep only uploads owned by this instructor.
    instructor_uploads = {
        uid: _summary(info)
        for uid, info in active_uploads.items()
        if info['instructor_id'] == instructor_id
    }

    return JSONResponse({
        'active_uploads': instructor_uploads,
        'count': len(instructor_uploads)
    })
429
+
430
# Clean up expired uploads
@router.post("/upload/cleanup")
async def cleanup_uploads(
    request: Request,
    auth_token: Optional[str] = None
):
    """Abort (on S3) and drop every expired upload record; admin-only."""
    # Verify instructor permissions
    instructor_id = await verify_instructor(request, auth_token)

    # Only allow system admin to perform cleanup
    # NOTE(review): hard-coded admin check. verify_instructor may return a
    # username string when the token has no user_id, in which case this
    # comparison always fails — confirm the admin's user_id is the integer 1.
    if instructor_id != 1: # Assuming instructor ID 1 is the admin
        raise HTTPException(status_code=403, detail="Only system admin can perform this operation")

    now = datetime.now()
    expired_uploads = []

    # Iterate over a snapshot so entries can be deleted while looping.
    for upload_id, info in list(active_uploads.items()):
        if info['expires_at'] < now:
            try:
                # Abort the upload in S3 (best effort; failures are logged
                # and the record is removed regardless).
                s3.abort_multipart_upload(
                    Bucket=BUCKET_NAME,
                    Key=info['key'],
                    UploadId=upload_id
                )
            except Exception as e:
                logger.error(f"Error aborting expired upload {upload_id}: {str(e)}")

            # Remove from active uploads
            del active_uploads[upload_id]
            expired_uploads.append(upload_id)

    return JSONResponse({
        'cleaned_uploads': expired_uploads,
        'count': len(expired_uploads)
    })
466
+
467
# Periodically clean up expired uploads
async def background_cleanup():
    """Hourly reaper task: abort and forget any upload past its expiry."""
    while True:
        now = datetime.now()
        # Iterate a snapshot so entries can be removed mid-loop.
        for upload_id, info in list(active_uploads.items()):
            if info['expires_at'] >= now:
                continue
            try:
                # Best-effort abort on S3; the record is dropped either way.
                s3.abort_multipart_upload(
                    Bucket=BUCKET_NAME,
                    Key=info['key'],
                    UploadId=upload_id
                )
                logger.info(f"Automatically cleaned up expired upload {upload_id}")
            except Exception as e:
                logger.error(f"Error cleaning up expired upload {upload_id}: {str(e)}")

            del active_uploads[upload_id]

        # Wait for 1 hour before next cleanup
        await asyncio.sleep(3600)
490
+
491
# Start the background cleanup task when the application starts up
# (the original comment said "when the module is imported", but the
# startup hook only fires when the app including this router boots).
# NOTE(review): on_event("startup") is deprecated in newer FastAPI in
# favor of lifespan handlers — confirm the installed version supports it.
@router.on_event("startup")
async def start_background_tasks():
    """Launch the hourly expired-upload reaper as a fire-and-forget task."""
    asyncio.create_task(background_cleanup())