Fred808 commited on
Commit
fcbaea0
·
verified ·
1 Parent(s): 20bb311

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +400 -14
app.py CHANGED
@@ -1,19 +1,34 @@
1
  from fastapi import FastAPI, UploadFile, File, HTTPException, BackgroundTasks, Query
2
- from fastapi.responses import FileResponse, JSONResponse
3
  from fastapi.middleware.cors import CORSMiddleware
4
  import os
 
5
  import shutil
6
  import uuid
7
  import time
8
- from typing import Dict, List, Optional
9
  import threading
10
  import logging
 
11
  from pathlib import Path
 
 
 
12
 
13
  # Configure logging
14
- logging.basicConfig(level=logging.INFO)
 
 
 
 
 
 
 
15
  logger = logging.getLogger(__name__)
16
 
 
 
 
 
17
  # Import processing functions and variables
18
  from processing_logic import (
19
  processing_status,
@@ -26,7 +41,12 @@ from processing_logic import (
26
  DEFAULT_RAR_LIMIT
27
  )
28
 
29
- app = FastAPI(title="MP4 Processing API", description="API for uploading, processing, and downloading MP4 files")
 
 
 
 
 
30
 
31
  # Configure CORS
32
  app.add_middleware(
@@ -37,13 +57,99 @@ app.add_middleware(
37
  allow_headers=["*"],
38
  )
39
 
40
- # Define MP4_UPLOAD_FOLDER if not imported from processing_logic
41
- MP4_UPLOAD_FOLDER = os.path.join(UPLOAD_DIRECTORY, "uploads") if 'UPLOAD_DIRECTORY' in globals() else "uploads"
42
  os.makedirs(MP4_UPLOAD_FOLDER, exist_ok=True)
43
  os.makedirs(MP4_OUTPUT_FOLDER, exist_ok=True)
44
 
45
  processing_thread = None
46
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
47
  # ==== HELPER FUNCTIONS ====
48
 
49
  def save_file(uploaded_file: UploadFile, save_path: str):
@@ -55,25 +161,86 @@ def log_request(endpoint: str, params: dict = None):
55
  """Log API requests for debugging"""
56
  logger.info(f"API Request: {endpoint} - Params: {params}")
57
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
58
  # === ROUTES ===
59
 
60
  @app.get("/")
61
  async def root():
62
  """API root endpoint"""
63
  return {
64
- "message": "MP4 Processing API",
65
  "version": "1.0.0",
66
  "status": "running",
67
  "endpoints": {
68
- "upload": "POST /upload - Upload MP4 file",
69
- "download": "GET /download?course={course}&file={file} - Download MP4 file",
70
- "list": "GET /list - List uploaded files",
71
- "courses": "GET /courses - List all course folders",
72
- "images": "GET /images/{course_folder:path} - List MP4s in course",
73
- "debug": "GET /debug/structure - Debug file structure"
 
 
 
 
 
 
 
74
  }
75
  }
76
 
 
 
77
  @app.get("/courses")
78
  async def get_courses():
79
  """List all top-level course folders."""
@@ -156,11 +323,230 @@ async def debug_structure():
156
  "structure": structure
157
  }
158
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
159
  @app.on_event("startup")
160
  async def startup_event():
161
  """Run the processing loop in the background when the API starts"""
162
  global processing_thread
163
- logger.info("Starting up MP4 Processing API...")
164
 
165
  if not (processing_thread and processing_thread.is_alive()):
166
  logger.info("🚀 Starting background processing thread...")
 
1
  from fastapi import FastAPI, UploadFile, File, HTTPException, BackgroundTasks, Query
2
+ from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
3
  from fastapi.middleware.cors import CORSMiddleware
4
  import os
5
+ import json
6
  import shutil
7
  import uuid
8
  import time
 
9
  import threading
10
  import logging
11
+ import aiofiles
12
  from pathlib import Path
13
+ from typing import Dict, List, Optional
14
+ from dataclasses import dataclass, asdict
15
+ from datetime import datetime
16
 
17
  # Configure logging
18
+ logging.basicConfig(
19
+ level=logging.INFO,
20
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
21
+ handlers=[
22
+ logging.FileHandler('api.log'),
23
+ logging.StreamHandler()
24
+ ]
25
+ )
26
  logger = logging.getLogger(__name__)
27
 
28
+ # Import error logger
29
+ from error_logger import ErrorLogger
30
+ error_logger = ErrorLogger()
31
+
32
  # Import processing functions and variables
33
  from processing_logic import (
34
  processing_status,
 
41
  DEFAULT_RAR_LIMIT
42
  )
43
 
44
+ # Middleware Configuration
45
+ LOCK_TIMEOUT = 300 # 5 minutes in seconds
46
+ STATE_FILE = "middleware_state.json"
47
+ CHUNK_SIZE = 8192 # 8KB chunks for streaming
48
+
49
+ app = FastAPI(title="Unified MP4 Processing & Distribution API")
50
 
51
  # Configure CORS
52
  app.add_middleware(
 
57
  allow_headers=["*"],
58
  )
59
 
60
+ # Define folders
61
+ MP4_UPLOAD_FOLDER = os.path.join(UPLOAD_DIRECTORY, "uploads")
62
  os.makedirs(MP4_UPLOAD_FOLDER, exist_ok=True)
63
  os.makedirs(MP4_OUTPUT_FOLDER, exist_ok=True)
64
 
65
  processing_thread = None
66
 
67
+ # ==== MIDDLEWARE STATE MANAGEMENT ====
68
+
69
+ @dataclass
70
+ class FileState:
71
+ path: str
72
+ locked: bool
73
+ lock_holder: Optional[str] = None
74
+ lock_time: Optional[float] = None
75
+ download_count: int = 0
76
+ last_access: Optional[float] = None
77
+
78
+ class MiddlewareState:
79
+ def __init__(self):
80
+ self.files: Dict[str, FileState] = {}
81
+ self.load_state()
82
+
83
+ def load_state(self):
84
+ """Load state from disk"""
85
+ if os.path.exists(STATE_FILE):
86
+ try:
87
+ with open(STATE_FILE, 'r') as f:
88
+ data = json.load(f)
89
+ self.files = {
90
+ k: FileState(**v) for k, v in data.items()
91
+ }
92
+ except Exception as e:
93
+ logger.error(f"Error loading state: {e}")
94
+ self.files = {}
95
+
96
+ def save_state(self):
97
+ """Save state to disk"""
98
+ try:
99
+ with open(STATE_FILE, 'w') as f:
100
+ json.dump({
101
+ k: asdict(v) for k, v in self.files.items()
102
+ }, f, indent=2)
103
+ except Exception as e:
104
+ logger.error(f"Error saving state: {e}")
105
+
106
+ def clean_expired_locks(self):
107
+ """Remove expired locks"""
108
+ now = time.time()
109
+ for file_id, state in self.files.items():
110
+ if state.locked and (now - state.lock_time) > LOCK_TIMEOUT:
111
+ state.locked = False
112
+ state.lock_holder = None
113
+ state.lock_time = None
114
+ self.save_state()
115
+
116
+ def get_next_available_file(self, requester_id: str) -> Optional[str]:
117
+ """Get next unlocked file"""
118
+ self.clean_expired_locks()
119
+
120
+ # First try to find any file this requester has locked
121
+ for file_id, state in self.files.items():
122
+ if state.locked and state.lock_holder == requester_id:
123
+ return file_id
124
+
125
+ # Then look for any unlocked file
126
+ for file_id, state in self.files.items():
127
+ if not state.locked:
128
+ state.locked = True
129
+ state.lock_holder = requester_id
130
+ state.lock_time = time.time()
131
+ self.save_state()
132
+ return file_id
133
+
134
+ return None
135
+
136
+ def release_lock(self, file_id: str, requester_id: str) -> bool:
137
+ """Release a file lock"""
138
+ if file_id in self.files:
139
+ state = self.files[file_id]
140
+ if state.lock_holder == requester_id:
141
+ state.locked = False
142
+ state.lock_holder = None
143
+ state.lock_time = None
144
+ state.last_access = time.time()
145
+ state.download_count += 1
146
+ self.save_state()
147
+ return True
148
+ return False
149
+
150
+ # Global state
151
+ middleware_state = MiddlewareState()
152
+
153
  # ==== HELPER FUNCTIONS ====
154
 
155
  def save_file(uploaded_file: UploadFile, save_path: str):
 
161
  """Log API requests for debugging"""
162
  logger.info(f"API Request: {endpoint} - Params: {params}")
163
 
164
+ # === ERROR HANDLING ===
165
+
166
+ @app.exception_handler(HTTPException)
167
+ async def http_exception_handler(request, exc):
168
+ """Handle HTTP exceptions with detailed logging"""
169
+ error_id = error_logger.log_error(
170
+ exc,
171
+ request.url.path,
172
+ request_info={
173
+ "method": request.method,
174
+ "url": str(request.url),
175
+ "headers": dict(request.headers),
176
+ "query_params": dict(request.query_params),
177
+ },
178
+ context={
179
+ "status_code": exc.status_code,
180
+ "detail": exc.detail
181
+ }
182
+ )
183
+ return JSONResponse(
184
+ status_code=exc.status_code,
185
+ content={
186
+ "error": exc.detail,
187
+ "error_id": error_id,
188
+ "type": "http_error",
189
+ "status_code": exc.status_code
190
+ }
191
+ )
192
+
193
+ @app.exception_handler(Exception)
194
+ async def general_exception_handler(request, exc):
195
+ """Handle all other exceptions with detailed logging"""
196
+ error_id = error_logger.log_error(
197
+ exc,
198
+ request.url.path,
199
+ request_info={
200
+ "method": request.method,
201
+ "url": str(request.url),
202
+ "headers": dict(request.headers),
203
+ "query_params": dict(request.query_params),
204
+ }
205
+ )
206
+ return JSONResponse(
207
+ status_code=500,
208
+ content={
209
+ "error": "Internal server error",
210
+ "error_id": error_id,
211
+ "type": "server_error",
212
+ "detail": str(exc) if app.debug else "An unexpected error occurred"
213
+ }
214
+ )
215
+
216
  # === ROUTES ===
217
 
218
  @app.get("/")
219
  async def root():
220
  """API root endpoint"""
221
  return {
222
+ "message": "Unified MP4 Processing & Distribution API",
223
  "version": "1.0.0",
224
  "status": "running",
225
  "endpoints": {
226
+ "processing": {
227
+ "courses": "GET /courses - List all course folders",
228
+ "images": "GET /images/{course_folder:path} - List MP4s in course",
229
+ "download": "GET /download?course={course}&file={file} - Download MP4 file",
230
+ "debug": "GET /debug/structure - Debug file structure"
231
+ },
232
+ "middleware": {
233
+ "status": "GET /middleware/status - Get middleware status",
234
+ "register": "POST /middleware/register - Register a new file",
235
+ "next": "GET /middleware/next - Get next available file",
236
+ "release": "POST /middleware/release/{file_id} - Release a file lock",
237
+ "stream": "GET /middleware/stream/{file_id} - Stream a file"
238
+ }
239
  }
240
  }
241
 
242
+ # ==== PROCESSING ENDPOINTS ====
243
+
244
  @app.get("/courses")
245
  async def get_courses():
246
  """List all top-level course folders."""
 
323
  "structure": structure
324
  }
325
 
326
+ # ==== ERROR MONITORING ENDPOINTS ====
327
+
328
+ @app.get("/errors/recent")
329
+ async def get_recent_errors(limit: int = Query(10, ge=1, le=100)):
330
+ """Get most recent errors"""
331
+ return error_logger.get_recent_errors(limit)
332
+
333
+ @app.get("/errors/{error_id}")
334
+ async def get_error_details(error_id: str):
335
+ """Get detailed information about a specific error"""
336
+ error = error_logger.get_error(error_id)
337
+ if not error:
338
+ raise HTTPException(status_code=404, detail="Error ID not found")
339
+ return error
340
+
341
+ @app.get("/errors/summary")
342
+ async def get_error_summary():
343
+ """Get summary of errors by type"""
344
+ return error_logger.get_error_summary()
345
+
346
+ # ==== MIDDLEWARE ENDPOINTS ====
347
+
348
+ @app.get("/middleware/status")
349
+ async def get_middleware_status():
350
+ """Get middleware status"""
351
+ courses = sum(1 for f in middleware_state.files.keys() if f.startswith("course:"))
352
+ images = sum(1 for f in middleware_state.files.keys() if f.startswith("image:"))
353
+
354
+ return {
355
+ "active_locks": sum(1 for f in middleware_state.files.values() if f.locked),
356
+ "total_files": len(middleware_state.files),
357
+ "total_courses": courses,
358
+ "total_images": images,
359
+ "downloads_completed": sum(f.download_count for f in middleware_state.files.values())
360
+ }
361
+
362
+ @app.get("/middleware/status/course/{course_id}")
363
+ async def get_course_status(course_id: str):
364
+ """Get status of a specific course"""
365
+ file_id = f"course:{course_id}"
366
+ if file_id not in middleware_state.files:
367
+ raise HTTPException(status_code=404, detail="Course not found")
368
+
369
+ state = middleware_state.files[file_id]
370
+ return {
371
+ "course_id": course_id,
372
+ "locked": state.locked,
373
+ "lock_holder": state.lock_holder,
374
+ "lock_time": state.lock_time,
375
+ "download_count": state.download_count,
376
+ "last_access": state.last_access
377
+ }
378
+
379
+ @app.get("/middleware/status/image/{course_folder}/{file_id}")
380
+ async def get_image_status(course_folder: str, file_id: str):
381
+ """Get status of a specific image"""
382
+ full_id = f"image:{course_folder}/{file_id}"
383
+ if full_id not in middleware_state.files:
384
+ raise HTTPException(status_code=404, detail="Image not found")
385
+
386
+ state = middleware_state.files[full_id]
387
+ return {
388
+ "file_id": file_id,
389
+ "course": course_folder,
390
+ "locked": state.locked,
391
+ "lock_holder": state.lock_holder,
392
+ "lock_time": state.lock_time,
393
+ "download_count": state.download_count,
394
+ "last_access": state.last_access
395
+ }
396
+
397
+ @app.post("/middleware/register")
398
+ async def register_file(file_path: str):
399
+ """Register a new file in the system"""
400
+ if not os.path.exists(file_path):
401
+ raise HTTPException(status_code=404, detail="File not found")
402
+
403
+ file_id = os.path.basename(file_path)
404
+ if file_id not in middleware_state.files:
405
+ middleware_state.files[file_id] = FileState(
406
+ path=file_path,
407
+ locked=False
408
+ )
409
+ middleware_state.save_state()
410
+
411
+ return {"file_id": file_id}
412
+
413
+ @app.get("/middleware/next/course")
414
+ async def get_next_course(requester_id: str):
415
+ """Get next available course folder"""
416
+ try:
417
+ courses = [d.name for d in Path(MP4_OUTPUT_FOLDER).iterdir() if d.is_dir()]
418
+ for course in courses:
419
+ course_id = f"course:{course}"
420
+ if course_id not in middleware_state.files:
421
+ middleware_state.files[course_id] = FileState(
422
+ path=str(Path(MP4_OUTPUT_FOLDER) / course),
423
+ locked=False
424
+ )
425
+ middleware_state.save_state()
426
+
427
+ if not middleware_state.files[course_id].locked:
428
+ middleware_state.files[course_id].locked = True
429
+ middleware_state.files[course_id].lock_holder = requester_id
430
+ middleware_state.files[course_id].lock_time = time.time()
431
+ middleware_state.save_state()
432
+ return {
433
+ "course_id": course,
434
+ "path": str(Path(MP4_OUTPUT_FOLDER) / course),
435
+ "lock_time": middleware_state.files[course_id].lock_time
436
+ }
437
+ except Exception as e:
438
+ raise HTTPException(status_code=500, detail=f"Failed to get next course: {e}")
439
+
440
+ raise HTTPException(status_code=404, detail="No courses available")
441
+
442
+ @app.get("/middleware/next/image/{course_folder}")
443
+ async def get_next_image(course_folder: str, requester_id: str):
444
+ """Get next available image from a specific course"""
445
+ course_path = Path(MP4_OUTPUT_FOLDER) / course_folder
446
+
447
+ if not course_path.is_dir():
448
+ raise HTTPException(status_code=404, detail="Course folder not found")
449
+
450
+ try:
451
+ mp4_files = [f for f in course_path.iterdir() if f.is_file() and f.suffix.lower() == ".mp4"]
452
+ for mp4_file in mp4_files:
453
+ file_id = f"image:{course_folder}/{mp4_file.name}"
454
+ if file_id not in middleware_state.files:
455
+ middleware_state.files[file_id] = FileState(
456
+ path=str(mp4_file),
457
+ locked=False
458
+ )
459
+ middleware_state.save_state()
460
+
461
+ if not middleware_state.files[file_id].locked:
462
+ middleware_state.files[file_id].locked = True
463
+ middleware_state.files[file_id].lock_holder = requester_id
464
+ middleware_state.files[file_id].lock_time = time.time()
465
+ middleware_state.save_state()
466
+ return {
467
+ "file_id": mp4_file.name,
468
+ "course": course_folder,
469
+ "path": str(mp4_file),
470
+ "lock_time": middleware_state.files[file_id].lock_time
471
+ }
472
+ except Exception as e:
473
+ raise HTTPException(status_code=500, detail=f"Failed to get next image: {e}")
474
+
475
+ raise HTTPException(status_code=404, detail="No images available in this course")
476
+
477
+ @app.get("/middleware/next/any")
478
+ async def get_next_any_file(requester_id: str):
479
+ """Get next available file of any type"""
480
+ file_id = middleware_state.get_next_available_file(requester_id)
481
+ if not file_id:
482
+ raise HTTPException(status_code=404, detail="No files available")
483
+
484
+ file_state = middleware_state.files[file_id]
485
+ return {
486
+ "file_id": file_id,
487
+ "file_path": file_state.path,
488
+ "lock_time": file_state.lock_time,
489
+ "type": "course" if file_id.startswith("course:") else "image"
490
+ }
491
+
492
+ @app.post("/middleware/release/course/{course_id}")
493
+ async def release_course(course_id: str, requester_id: str):
494
+ """Release a course lock"""
495
+ file_id = f"course:{course_id}"
496
+ if not middleware_state.release_lock(file_id, requester_id):
497
+ raise HTTPException(status_code=403, detail="Not lock holder")
498
+ return {"status": "ok"}
499
+
500
+ @app.post("/middleware/release/image/{course_folder}/{file_id}")
501
+ async def release_image(course_folder: str, file_id: str, requester_id: str):
502
+ """Release an image lock"""
503
+ full_id = f"image:{course_folder}/{file_id}"
504
+ if not middleware_state.release_lock(full_id, requester_id):
505
+ raise HTTPException(status_code=403, detail="Not lock holder")
506
+ return {"status": "ok"}
507
+
508
+ @app.post("/middleware/release/{file_id}")
509
+ async def release_file(file_id: str, requester_id: str):
510
+ """Release any file lock (backward compatibility)"""
511
+ if not middleware_state.release_lock(file_id, requester_id):
512
+ raise HTTPException(status_code=403, detail="Not lock holder")
513
+ return {"status": "ok"}
514
+
515
+ @app.get("/middleware/stream/{file_id}")
516
+ async def stream_file(file_id: str, requester_id: str):
517
+ """Stream a file to client"""
518
+ if file_id not in middleware_state.files:
519
+ raise HTTPException(status_code=404, detail="File not found")
520
+
521
+ file_state = middleware_state.files[file_id]
522
+ if not file_state.locked or file_state.lock_holder != requester_id:
523
+ raise HTTPException(status_code=403, detail="Not lock holder")
524
+
525
+ if not os.path.exists(file_state.path):
526
+ raise HTTPException(status_code=404, detail="File not found on disk")
527
+
528
+ async def file_stream():
529
+ async with aiofiles.open(file_state.path, 'rb') as f:
530
+ while chunk := await f.read(CHUNK_SIZE):
531
+ yield chunk
532
+ # Auto-release lock after successful transfer
533
+ middleware_state.release_lock(file_id, requester_id)
534
+
535
+ return StreamingResponse(
536
+ file_stream(),
537
+ media_type="application/octet-stream",
538
+ headers={
539
+ "Content-Disposition": f"attachment; filename={file_id}"
540
+ }
541
+ )
542
+
543
+ # ==== STARTUP EVENT ====
544
+
545
  @app.on_event("startup")
546
  async def startup_event():
547
  """Run the processing loop in the background when the API starts"""
548
  global processing_thread
549
+ logger.info("Starting up Unified MP4 Processing & Distribution API...")
550
 
551
  if not (processing_thread and processing_thread.is_alive()):
552
  logger.info("🚀 Starting background processing thread...")