Fred808 commited on
Commit
9fea5e6
Β·
verified Β·
1 Parent(s): 8d428f0

Upload 3 files

Browse files
Files changed (3) hide show
  1. app.py +575 -558
  2. frame_extractor.py +55 -0
  3. processing_logic.py +398 -373
app.py CHANGED
@@ -1,559 +1,576 @@
1
- from fastapi import FastAPI, UploadFile, File, HTTPException, BackgroundTasks, Query
2
- from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
3
- from fastapi.middleware.cors import CORSMiddleware
4
- import os
5
- import json
6
- import shutil
7
- import uuid
8
- import time
9
- import threading
10
- import logging
11
- import aiofiles
12
- from pathlib import Path
13
- from typing import Dict, List, Optional
14
- from dataclasses import dataclass, asdict
15
- from datetime import datetime
16
-
17
- # Configure logging
18
- logging.basicConfig(
19
- level=logging.INFO,
20
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
21
- handlers=[
22
- logging.FileHandler('api.log'),
23
- logging.StreamHandler()
24
- ]
25
- )
26
- logger = logging.getLogger(__name__)
27
-
28
- # Import error logger
29
- from error_logger import ErrorLogger
30
- error_logger = ErrorLogger()
31
-
32
- # Import processing functions and variables
33
- from processing_logic import (
34
- processing_status,
35
- uploaded_mp4s,
36
- log_message,
37
- process_hf_files_background,
38
- UPLOAD_DIRECTORY,
39
- MP4_OUTPUT_FOLDER,
40
- hf_api,
41
- DEFAULT_RAR_LIMIT
42
- )
43
-
44
- # Middleware Configuration
45
- LOCK_TIMEOUT = 300 # 5 minutes in seconds
46
- STATE_FILE = "middleware_state.json"
47
- CHUNK_SIZE = 8192 # 8KB chunks for streaming
48
-
49
- app = FastAPI(title="Unified MP4 Processing & Distribution API")
50
-
51
- # Configure CORS
52
- app.add_middleware(
53
- CORSMiddleware,
54
- allow_origins=["*"],
55
- allow_credentials=True,
56
- allow_methods=["*"],
57
- allow_headers=["*"],
58
- )
59
-
60
- # Define folders
61
- MP4_UPLOAD_FOLDER = os.path.join(UPLOAD_DIRECTORY, "uploads")
62
- os.makedirs(MP4_UPLOAD_FOLDER, exist_ok=True)
63
- os.makedirs(MP4_OUTPUT_FOLDER, exist_ok=True)
64
-
65
- processing_thread = None
66
-
67
- # ==== MIDDLEWARE STATE MANAGEMENT ====
68
-
69
- @dataclass
70
- class FileState:
71
- path: str
72
- locked: bool
73
- lock_holder: Optional[str] = None
74
- lock_time: Optional[float] = None
75
- download_count: int = 0
76
- last_access: Optional[float] = None
77
-
78
- class MiddlewareState:
79
- def __init__(self):
80
- self.files: Dict[str, FileState] = {}
81
- self.load_state()
82
-
83
- def load_state(self):
84
- """Load state from disk"""
85
- if os.path.exists(STATE_FILE):
86
- try:
87
- with open(STATE_FILE, 'r') as f:
88
- data = json.load(f)
89
- self.files = {
90
- k: FileState(**v) for k, v in data.items()
91
- }
92
- except Exception as e:
93
- logger.error(f"Error loading state: {e}")
94
- self.files = {}
95
-
96
- def save_state(self):
97
- """Save state to disk"""
98
- try:
99
- with open(STATE_FILE, 'w') as f:
100
- json.dump({
101
- k: asdict(v) for k, v in self.files.items()
102
- }, f, indent=2)
103
- except Exception as e:
104
- logger.error(f"Error saving state: {e}")
105
-
106
- def clean_expired_locks(self):
107
- """Remove expired locks"""
108
- now = time.time()
109
- for file_id, state in self.files.items():
110
- if state.locked and (now - state.lock_time) > LOCK_TIMEOUT:
111
- state.locked = False
112
- state.lock_holder = None
113
- state.lock_time = None
114
- self.save_state()
115
-
116
- def get_next_available_file(self, requester_id: str) -> Optional[str]:
117
- """Get next unlocked file"""
118
- self.clean_expired_locks()
119
-
120
- # First try to find any file this requester has locked
121
- for file_id, state in self.files.items():
122
- if state.locked and state.lock_holder == requester_id:
123
- return file_id
124
-
125
- # Then look for any unlocked file
126
- for file_id, state in self.files.items():
127
- if not state.locked:
128
- state.locked = True
129
- state.lock_holder = requester_id
130
- state.lock_time = time.time()
131
- self.save_state()
132
- return file_id
133
-
134
- return None
135
-
136
- def release_lock(self, file_id: str, requester_id: str) -> bool:
137
- """Release a file lock"""
138
- if file_id in self.files:
139
- state = self.files[file_id]
140
- if state.lock_holder == requester_id:
141
- state.locked = False
142
- state.lock_holder = None
143
- state.lock_time = None
144
- state.last_access = time.time()
145
- state.download_count += 1
146
- self.save_state()
147
- return True
148
- return False
149
-
150
- # Global state
151
- middleware_state = MiddlewareState()
152
-
153
- # ==== HELPER FUNCTIONS ====
154
-
155
- def save_file(uploaded_file: UploadFile, save_path: str):
156
- os.makedirs(os.path.dirname(save_path), exist_ok=True)
157
- with open(save_path, "wb") as f:
158
- shutil.copyfileobj(uploaded_file.file, f)
159
-
160
- def log_request(endpoint: str, params: dict = None):
161
- """Log API requests for debugging"""
162
- logger.info(f"API Request: {endpoint} - Params: {params}")
163
-
164
- # === ERROR HANDLING ===
165
-
166
- @app.exception_handler(HTTPException)
167
- async def http_exception_handler(request, exc):
168
- """Handle HTTP exceptions with detailed logging"""
169
- error_id = error_logger.log_error(
170
- exc,
171
- request.url.path,
172
- request_info={
173
- "method": request.method,
174
- "url": str(request.url),
175
- "headers": dict(request.headers),
176
- "query_params": dict(request.query_params),
177
- },
178
- context={
179
- "status_code": exc.status_code,
180
- "detail": exc.detail
181
- }
182
- )
183
- return JSONResponse(
184
- status_code=exc.status_code,
185
- content={
186
- "error": exc.detail,
187
- "error_id": error_id,
188
- "type": "http_error",
189
- "status_code": exc.status_code
190
- }
191
- )
192
-
193
- @app.exception_handler(Exception)
194
- async def general_exception_handler(request, exc):
195
- """Handle all other exceptions with detailed logging"""
196
- error_id = error_logger.log_error(
197
- exc,
198
- request.url.path,
199
- request_info={
200
- "method": request.method,
201
- "url": str(request.url),
202
- "headers": dict(request.headers),
203
- "query_params": dict(request.query_params),
204
- }
205
- )
206
- return JSONResponse(
207
- status_code=500,
208
- content={
209
- "error": "Internal server error",
210
- "error_id": error_id,
211
- "type": "server_error",
212
- "detail": str(exc) if app.debug else "An unexpected error occurred"
213
- }
214
- )
215
-
216
- # === ROUTES ===
217
-
218
- @app.get("/")
219
- async def root():
220
- """API root endpoint"""
221
- return {
222
- "message": "Unified MP4 Processing & Distribution API",
223
- "version": "1.0.0",
224
- "status": "running",
225
- "endpoints": {
226
- "processing": {
227
- "courses": "GET /courses - List all course folders",
228
- "images": "GET /images/{course_folder:path} - List MP4s in course",
229
- "download": "GET /download?course={course}&file={file} - Download MP4 file",
230
- "debug": "GET /debug/structure - Debug file structure"
231
- },
232
- "middleware": {
233
- "status": "GET /middleware/status - Get middleware status",
234
- "register": "POST /middleware/register - Register a new file",
235
- "next": "GET /middleware/next - Get next available file",
236
- "release": "POST /middleware/release/{file_id} - Release a file lock",
237
- "stream": "GET /middleware/stream/{file_id} - Stream a file"
238
- }
239
- }
240
- }
241
-
242
- # ==== PROCESSING ENDPOINTS ====
243
-
244
- @app.get("/courses")
245
- async def get_courses():
246
- """List all top-level course folders."""
247
- try:
248
- courses = [d.name for d in Path(MP4_OUTPUT_FOLDER).iterdir() if d.is_dir()]
249
- return {"courses": courses, "total": len(courses)}
250
- except Exception as e:
251
- raise HTTPException(status_code=500, detail=f"Failed to list courses: {e}")
252
-
253
- @app.get("/images/{course_folder:path}")
254
- async def get_mp4_list(course_folder: str):
255
- """List all MP4 files within a specific course folder."""
256
- course_path = Path(MP4_OUTPUT_FOLDER) / course_folder
257
-
258
- if not course_path.is_dir():
259
- raise HTTPException(status_code=404, detail="Course folder not found")
260
-
261
- try:
262
- mp4_files = [f.name for f in course_path.iterdir() if f.is_file() and f.suffix.lower() == ".mp4"]
263
- return mp4_files
264
- except Exception as e:
265
- raise HTTPException(status_code=500, detail=f"Failed to list MP4s: {e}")
266
-
267
- @app.get("/download")
268
- async def download_mp4(course: str, file: str):
269
- """Download a specific MP4 file from a course folder."""
270
- file_path = Path(MP4_OUTPUT_FOLDER) / course / file
271
-
272
- if not file_path.is_file():
273
- raise HTTPException(status_code=404, detail="File not found")
274
-
275
- return FileResponse(path=file_path, media_type="video/mp4", filename=file)
276
-
277
- @app.get("/debug/structure")
278
- async def debug_structure():
279
- """Debug endpoint to inspect the file structure and sizes."""
280
- mp4_output_folder_path = Path(MP4_OUTPUT_FOLDER)
281
-
282
- structure = {}
283
- total_size_bytes = 0
284
- total_mp4_files = 0
285
-
286
- if not mp4_output_folder_path.exists():
287
- return JSONResponse(content={
288
- "mp4_output_folder": str(mp4_output_folder_path),
289
- "folder_exists": False,
290
- "total_mp4_files": 0,
291
- "total_size_bytes": 0,
292
- "structure": {}
293
- })
294
-
295
- for root, dirs, files in os.walk(mp4_output_folder_path):
296
- current_path = Path(root)
297
- relative_path = str(current_path.relative_to(mp4_output_folder_path))
298
- if relative_path == ".":
299
- relative_path = "/"
300
-
301
- structure[relative_path] = {
302
- "directories": [d for d in dirs],
303
- "mp4_files": [],
304
- "other_files": []
305
- }
306
-
307
- for file in files:
308
- file_full_path = current_path / file
309
- file_size = file_full_path.stat().st_size
310
- total_size_bytes += file_size
311
-
312
- if file.lower().endswith(".mp4"):
313
- structure[relative_path]["mp4_files"].append({"name": file, "size": file_size})
314
- total_mp4_files += 1
315
- else:
316
- structure[relative_path]["other_files"].append({"name": file, "size": file_size})
317
-
318
- return {
319
- "mp4_output_folder": str(mp4_output_folder_path),
320
- "folder_exists": mp4_output_folder_path.exists(),
321
- "total_mp4_files": total_mp4_files,
322
- "total_size_bytes": total_size_bytes,
323
- "structure": structure
324
- }
325
-
326
- # ==== ERROR MONITORING ENDPOINTS ====
327
-
328
- @app.get("/errors/recent")
329
- async def get_recent_errors(limit: int = Query(10, ge=1, le=100)):
330
- """Get most recent errors"""
331
- return error_logger.get_recent_errors(limit)
332
-
333
- @app.get("/errors/{error_id}")
334
- async def get_error_details(error_id: str):
335
- """Get detailed information about a specific error"""
336
- error = error_logger.get_error(error_id)
337
- if not error:
338
- raise HTTPException(status_code=404, detail="Error ID not found")
339
- return error
340
-
341
- @app.get("/errors/summary")
342
- async def get_error_summary():
343
- """Get summary of errors by type"""
344
- return error_logger.get_error_summary()
345
-
346
- # ==== MIDDLEWARE ENDPOINTS ====
347
-
348
- @app.get("/middleware/status")
349
- async def get_middleware_status():
350
- """Get middleware status"""
351
- courses = sum(1 for f in middleware_state.files.keys() if f.startswith("course:"))
352
- images = sum(1 for f in middleware_state.files.keys() if f.startswith("image:"))
353
-
354
- return {
355
- "active_locks": sum(1 for f in middleware_state.files.values() if f.locked),
356
- "total_files": len(middleware_state.files),
357
- "total_courses": courses,
358
- "total_images": images,
359
- "downloads_completed": sum(f.download_count for f in middleware_state.files.values())
360
- }
361
-
362
- @app.get("/middleware/status/course/{course_id}")
363
- async def get_course_status(course_id: str):
364
- """Get status of a specific course"""
365
- file_id = f"course:{course_id}"
366
- if file_id not in middleware_state.files:
367
- raise HTTPException(status_code=404, detail="Course not found")
368
-
369
- state = middleware_state.files[file_id]
370
- return {
371
- "course_id": course_id,
372
- "locked": state.locked,
373
- "lock_holder": state.lock_holder,
374
- "lock_time": state.lock_time,
375
- "download_count": state.download_count,
376
- "last_access": state.last_access
377
- }
378
-
379
- @app.get("/middleware/status/image/{course_folder}/{file_id}")
380
- async def get_image_status(course_folder: str, file_id: str):
381
- """Get status of a specific image"""
382
- full_id = f"image:{course_folder}/{file_id}"
383
- if full_id not in middleware_state.files:
384
- raise HTTPException(status_code=404, detail="Image not found")
385
-
386
- state = middleware_state.files[full_id]
387
- return {
388
- "file_id": file_id,
389
- "course": course_folder,
390
- "locked": state.locked,
391
- "lock_holder": state.lock_holder,
392
- "lock_time": state.lock_time,
393
- "download_count": state.download_count,
394
- "last_access": state.last_access
395
- }
396
-
397
- @app.post("/middleware/register")
398
- async def register_file(file_path: str):
399
- """Register a new file in the system"""
400
- if not os.path.exists(file_path):
401
- raise HTTPException(status_code=404, detail="File not found")
402
-
403
- file_id = os.path.basename(file_path)
404
- if file_id not in middleware_state.files:
405
- middleware_state.files[file_id] = FileState(
406
- path=file_path,
407
- locked=False
408
- )
409
- middleware_state.save_state()
410
-
411
- return {"file_id": file_id}
412
-
413
- @app.get("/middleware/next/course")
414
- async def get_next_course(requester_id: str):
415
- """Get next available course folder"""
416
- try:
417
- courses = [d.name for d in Path(MP4_OUTPUT_FOLDER).iterdir() if d.is_dir()]
418
- for course in courses:
419
- course_id = f"course:{course}"
420
- if course_id not in middleware_state.files:
421
- middleware_state.files[course_id] = FileState(
422
- path=str(Path(MP4_OUTPUT_FOLDER) / course),
423
- locked=False
424
- )
425
- middleware_state.save_state()
426
-
427
- if not middleware_state.files[course_id].locked:
428
- middleware_state.files[course_id].locked = True
429
- middleware_state.files[course_id].lock_holder = requester_id
430
- middleware_state.files[course_id].lock_time = time.time()
431
- middleware_state.save_state()
432
- return {
433
- "course_id": course,
434
- "path": str(Path(MP4_OUTPUT_FOLDER) / course),
435
- "lock_time": middleware_state.files[course_id].lock_time
436
- }
437
- except Exception as e:
438
- raise HTTPException(status_code=500, detail=f"Failed to get next course: {e}")
439
-
440
- raise HTTPException(status_code=404, detail="No courses available")
441
-
442
- @app.get("/middleware/next/image/{course_folder}")
443
- async def get_next_image(course_folder: str, requester_id: str):
444
- """Get next available image from a specific course"""
445
- course_path = Path(MP4_OUTPUT_FOLDER) / course_folder
446
-
447
- if not course_path.is_dir():
448
- raise HTTPException(status_code=404, detail="Course folder not found")
449
-
450
- try:
451
- mp4_files = [f for f in course_path.iterdir() if f.is_file() and f.suffix.lower() == ".mp4"]
452
- for mp4_file in mp4_files:
453
- file_id = f"image:{course_folder}/{mp4_file.name}"
454
- if file_id not in middleware_state.files:
455
- middleware_state.files[file_id] = FileState(
456
- path=str(mp4_file),
457
- locked=False
458
- )
459
- middleware_state.save_state()
460
-
461
- if not middleware_state.files[file_id].locked:
462
- middleware_state.files[file_id].locked = True
463
- middleware_state.files[file_id].lock_holder = requester_id
464
- middleware_state.files[file_id].lock_time = time.time()
465
- middleware_state.save_state()
466
- return {
467
- "file_id": mp4_file.name,
468
- "course": course_folder,
469
- "path": str(mp4_file),
470
- "lock_time": middleware_state.files[file_id].lock_time
471
- }
472
- except Exception as e:
473
- raise HTTPException(status_code=500, detail=f"Failed to get next image: {e}")
474
-
475
- raise HTTPException(status_code=404, detail="No images available in this course")
476
-
477
- @app.get("/middleware/next/any")
478
- async def get_next_any_file(requester_id: str):
479
- """Get next available file of any type"""
480
- file_id = middleware_state.get_next_available_file(requester_id)
481
- if not file_id:
482
- raise HTTPException(status_code=404, detail="No files available")
483
-
484
- file_state = middleware_state.files[file_id]
485
- return {
486
- "file_id": file_id,
487
- "file_path": file_state.path,
488
- "lock_time": file_state.lock_time,
489
- "type": "course" if file_id.startswith("course:") else "image"
490
- }
491
-
492
- @app.post("/middleware/release/course/{course_id}")
493
- async def release_course(course_id: str, requester_id: str):
494
- """Release a course lock"""
495
- file_id = f"course:{course_id}"
496
- if not middleware_state.release_lock(file_id, requester_id):
497
- raise HTTPException(status_code=403, detail="Not lock holder")
498
- return {"status": "ok"}
499
-
500
- @app.post("/middleware/release/image/{course_folder}/{file_id}")
501
- async def release_image(course_folder: str, file_id: str, requester_id: str):
502
- """Release an image lock"""
503
- full_id = f"image:{course_folder}/{file_id}"
504
- if not middleware_state.release_lock(full_id, requester_id):
505
- raise HTTPException(status_code=403, detail="Not lock holder")
506
- return {"status": "ok"}
507
-
508
- @app.post("/middleware/release/{file_id}")
509
- async def release_file(file_id: str, requester_id: str):
510
- """Release any file lock (backward compatibility)"""
511
- if not middleware_state.release_lock(file_id, requester_id):
512
- raise HTTPException(status_code=403, detail="Not lock holder")
513
- return {"status": "ok"}
514
-
515
- @app.get("/middleware/stream/{file_id}")
516
- async def stream_file(file_id: str, requester_id: str):
517
- """Stream a file to client"""
518
- if file_id not in middleware_state.files:
519
- raise HTTPException(status_code=404, detail="File not found")
520
-
521
- file_state = middleware_state.files[file_id]
522
- if not file_state.locked or file_state.lock_holder != requester_id:
523
- raise HTTPException(status_code=403, detail="Not lock holder")
524
-
525
- if not os.path.exists(file_state.path):
526
- raise HTTPException(status_code=404, detail="File not found on disk")
527
-
528
- async def file_stream():
529
- async with aiofiles.open(file_state.path, 'rb') as f:
530
- while chunk := await f.read(CHUNK_SIZE):
531
- yield chunk
532
- # Auto-release lock after successful transfer
533
- middleware_state.release_lock(file_id, requester_id)
534
-
535
- return StreamingResponse(
536
- file_stream(),
537
- media_type="application/octet-stream",
538
- headers={
539
- "Content-Disposition": f"attachment; filename={file_id}"
540
- }
541
- )
542
-
543
- # ==== STARTUP EVENT ====
544
-
545
- @app.on_event("startup")
546
- async def startup_event():
547
- """Run the processing loop in the background when the API starts"""
548
- global processing_thread
549
- logger.info("Starting up Unified MP4 Processing & Distribution API...")
550
-
551
- if not (processing_thread and processing_thread.is_alive()):
552
- logger.info("πŸš€ Starting background processing thread...")
553
- processing_thread = threading.Thread(target=process_hf_files_background)
554
- processing_thread.daemon = True
555
- processing_thread.start()
556
-
557
- if __name__ == "__main__":
558
- import uvicorn
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
559
  uvicorn.run(app, host="0.0.0.0", port=8000)
 
1
+ from fastapi import FastAPI, UploadFile, File, HTTPException, BackgroundTasks, Query
2
+ from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
3
+ from fastapi.middleware.cors import CORSMiddleware
4
+ import os
5
+ import json
6
+ import shutil
7
+ import uuid
8
+ import time
9
+ import threading
10
+ import logging
11
+ import aiofiles
12
+ from pathlib import Path
13
+ from typing import Dict, List, Optional
14
+ from dataclasses import dataclass, asdict
15
+ from datetime import datetime
16
+
17
+ # Configure logging
18
+ logging.basicConfig(
19
+ level=logging.INFO,
20
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
21
+ handlers=[
22
+ logging.FileHandler('api.log'),
23
+ logging.StreamHandler()
24
+ ]
25
+ )
26
+ logger = logging.getLogger(__name__)
27
+
28
+ # Import error logger
29
+ from error_logger import ErrorLogger
30
+ error_logger = ErrorLogger()
31
+
32
+ # Import processing functions and variables
33
+ from processing_logic import (
34
+ processing_status,
35
+ uploaded_mp4s,
36
+ log_message,
37
+ process_hf_files_background,
38
+ UPLOAD_DIRECTORY,
39
+ MP4_OUTPUT_FOLDER,
40
+ hf_api,
41
+ DEFAULT_RAR_LIMIT
42
+ )
43
+
44
+ # Middleware Configuration
45
+ LOCK_TIMEOUT = 300 # 5 minutes in seconds
46
+ STATE_FILE = "middleware_state.json"
47
+ CHUNK_SIZE = 8192 # 8KB chunks for streaming
48
+
49
+ app = FastAPI(title="Unified MP4 Processing & Distribution API")
50
+
51
+ # Configure CORS
52
+ app.add_middleware(
53
+ CORSMiddleware,
54
+ allow_origins=["*"],
55
+ allow_credentials=True,
56
+ allow_methods=["*"],
57
+ allow_headers=["*"],
58
+ )
59
+
60
+ # Define folders
61
+ MP4_UPLOAD_FOLDER = os.path.join(UPLOAD_DIRECTORY, "uploads")
62
+ os.makedirs(MP4_UPLOAD_FOLDER, exist_ok=True)
63
+ os.makedirs(MP4_OUTPUT_FOLDER, exist_ok=True)
64
+
65
+ processing_thread = None
66
+
67
+ # ==== MIDDLEWARE STATE MANAGEMENT ====
68
+
69
+ @dataclass
70
+ class FileState:
71
+ path: str
72
+ locked: bool
73
+ lock_holder: Optional[str] = None
74
+ lock_time: Optional[float] = None
75
+ download_count: int = 0
76
+ last_access: Optional[float] = None
77
+
78
+ class MiddlewareState:
79
+ def __init__(self):
80
+ self.files: Dict[str, FileState] = {}
81
+ self.load_state()
82
+
83
+ def load_state(self):
84
+ """Load state from disk"""
85
+ if os.path.exists(STATE_FILE):
86
+ try:
87
+ with open(STATE_FILE, 'r') as f:
88
+ data = json.load(f)
89
+ self.files = {
90
+ k: FileState(**v) for k, v in data.items()
91
+ }
92
+ except Exception as e:
93
+ logger.error(f"Error loading state: {e}")
94
+ self.files = {}
95
+
96
+ def save_state(self):
97
+ """Save state to disk"""
98
+ try:
99
+ with open(STATE_FILE, 'w') as f:
100
+ json.dump({
101
+ k: asdict(v) for k, v in self.files.items()
102
+ }, f, indent=2)
103
+ except Exception as e:
104
+ logger.error(f"Error saving state: {e}")
105
+
106
+ def clean_expired_locks(self):
107
+ """Remove expired locks"""
108
+ now = time.time()
109
+ for file_id, state in self.files.items():
110
+ if state.locked and (now - state.lock_time) > LOCK_TIMEOUT:
111
+ state.locked = False
112
+ state.lock_holder = None
113
+ state.lock_time = None
114
+ self.save_state()
115
+
116
+ def get_next_available_file(self, requester_id: str) -> Optional[str]:
117
+ """Get next unlocked file"""
118
+ self.clean_expired_locks()
119
+
120
+ # First try to find any file this requester has locked
121
+ for file_id, state in self.files.items():
122
+ if state.locked and state.lock_holder == requester_id:
123
+ return file_id
124
+
125
+ # Then look for any unlocked file
126
+ for file_id, state in self.files.items():
127
+ if not state.locked:
128
+ state.locked = True
129
+ state.lock_holder = requester_id
130
+ state.lock_time = time.time()
131
+ self.save_state()
132
+ return file_id
133
+
134
+ return None
135
+
136
+ def release_lock(self, file_id: str, requester_id: str) -> bool:
137
+ """Release a file lock"""
138
+ if file_id in self.files:
139
+ state = self.files[file_id]
140
+ if state.lock_holder == requester_id:
141
+ state.locked = False
142
+ state.lock_holder = None
143
+ state.lock_time = None
144
+ state.last_access = time.time()
145
+ state.download_count += 1
146
+ self.save_state()
147
+ return True
148
+ return False
149
+
150
+ # Global state
151
+ middleware_state = MiddlewareState()
152
+
153
+ # ==== HELPER FUNCTIONS ====
154
+
155
+ def save_file(uploaded_file: UploadFile, save_path: str):
156
+ os.makedirs(os.path.dirname(save_path), exist_ok=True)
157
+ with open(save_path, "wb") as f:
158
+ shutil.copyfileobj(uploaded_file.file, f)
159
+
160
+ def log_request(endpoint: str, params: dict = None):
161
+ """Log API requests for debugging"""
162
+ logger.info(f"API Request: {endpoint} - Params: {params}")
163
+
164
+ # === ERROR HANDLING ===
165
+
166
+ @app.exception_handler(HTTPException)
167
+ async def http_exception_handler(request, exc):
168
+ """Handle HTTP exceptions with detailed logging"""
169
+ error_id = error_logger.log_error(
170
+ exc,
171
+ request.url.path,
172
+ request_info={
173
+ "method": request.method,
174
+ "url": str(request.url),
175
+ "headers": dict(request.headers),
176
+ "query_params": dict(request.query_params),
177
+ },
178
+ context={
179
+ "status_code": exc.status_code,
180
+ "detail": exc.detail
181
+ }
182
+ )
183
+ return JSONResponse(
184
+ status_code=exc.status_code,
185
+ content={
186
+ "error": exc.detail,
187
+ "error_id": error_id,
188
+ "type": "http_error",
189
+ "status_code": exc.status_code
190
+ }
191
+ )
192
+
193
+ @app.exception_handler(Exception)
194
+ async def general_exception_handler(request, exc):
195
+ """Handle all other exceptions with detailed logging"""
196
+ error_id = error_logger.log_error(
197
+ exc,
198
+ request.url.path,
199
+ request_info={
200
+ "method": request.method,
201
+ "url": str(request.url),
202
+ "headers": dict(request.headers),
203
+ "query_params": dict(request.query_params),
204
+ }
205
+ )
206
+ return JSONResponse(
207
+ status_code=500,
208
+ content={
209
+ "error": "Internal server error",
210
+ "error_id": error_id,
211
+ "type": "server_error",
212
+ "detail": str(exc) if app.debug else "An unexpected error occurred"
213
+ }
214
+ )
215
+
216
+ # === ROUTES ===
217
+
218
+ @app.get("/")
219
+ async def root():
220
+ """API root endpoint"""
221
+ return {
222
+ "message": "Unified MP4 Processing & Distribution API",
223
+ "version": "1.0.0",
224
+ "status": "running",
225
+ "endpoints": {
226
+ "processing": {
227
+ "courses": "GET /courses - List all course folders",
228
+ "images": "GET /images/{course_folder:path} - List MP4s in course",
229
+ "download": "GET /download?course={course}&file={file} - Download MP4 file",
230
+ "debug": "GET /debug/structure - Debug file structure"
231
+ },
232
+ "middleware": {
233
+ "status": "GET /middleware/status - Get middleware status",
234
+ "register": "POST /middleware/register - Register a new file",
235
+ "next": "GET /middleware/next - Get next available file",
236
+ "release": "POST /middleware/release/{file_id} - Release a file lock",
237
+ "stream": "GET /middleware/stream/{file_id} - Stream a file"
238
+ }
239
+ }
240
+ }
241
+
242
+ # ==== PROCESSING ENDPOINTS ====
243
+
244
+ @app.get("/courses")
245
+ async def get_courses():
246
+ """List all top-level course folders."""
247
+ try:
248
+ courses = [d.name for d in Path(MP4_OUTPUT_FOLDER).iterdir() if d.is_dir()]
249
+ return {"courses": courses, "total": len(courses)}
250
+ except Exception as e:
251
+ raise HTTPException(status_code=500, detail=f"Failed to list courses: {e}")
252
+
253
+ @app.get("/images/{course_folder:path}")
254
+ async def get_mp4_list(course_folder: str):
255
+ """List all MP4 files within a specific course folder."""
256
+ course_path = Path(MP4_OUTPUT_FOLDER) / course_folder
257
+
258
+ if not course_path.is_dir():
259
+ raise HTTPException(status_code=404, detail="Course folder not found")
260
+
261
+ try:
262
+ mp4_files = [f.name for f in course_path.iterdir() if f.is_file() and f.suffix.lower() == ".mp4"]
263
+ return mp4_files
264
+ except Exception as e:
265
+ raise HTTPException(status_code=500, detail=f"Failed to list MP4s: {e}")
266
+
267
+ @app.get("/download")
268
+ async def download_mp4(course: str, file: str):
269
+ """Download a specific MP4 file from a course folder."""
270
+ file_path = Path(MP4_OUTPUT_FOLDER) / course / file
271
+
272
+ if not file_path.is_file():
273
+ raise HTTPException(status_code=404, detail="File not found")
274
+
275
+ return FileResponse(path=file_path, media_type="video/mp4", filename=file)
276
+
277
+ @app.get("/debug/structure")
278
+ async def debug_structure():
279
+ """Debug endpoint to inspect the file structure and sizes."""
280
+ mp4_output_folder_path = Path(MP4_OUTPUT_FOLDER)
281
+
282
+ structure = {}
283
+ total_size_bytes = 0
284
+ total_mp4_files = 0
285
+
286
+ if not mp4_output_folder_path.exists():
287
+ return JSONResponse(content={
288
+ "mp4_output_folder": str(mp4_output_folder_path),
289
+ "folder_exists": False,
290
+ "total_mp4_files": 0,
291
+ "total_size_bytes": 0,
292
+ "structure": {}
293
+ })
294
+
295
+ for root, dirs, files in os.walk(mp4_output_folder_path):
296
+ current_path = Path(root)
297
+ relative_path = str(current_path.relative_to(mp4_output_folder_path))
298
+ if relative_path == ".":
299
+ relative_path = "/"
300
+
301
+ structure[relative_path] = {
302
+ "directories": [d for d in dirs],
303
+ "mp4_files": [],
304
+ "other_files": []
305
+ }
306
+
307
+ for file in files:
308
+ file_full_path = current_path / file
309
+ file_size = file_full_path.stat().st_size
310
+ total_size_bytes += file_size
311
+
312
+ if file.lower().endswith(".mp4"):
313
+ structure[relative_path]["mp4_files"].append({"name": file, "size": file_size})
314
+ total_mp4_files += 1
315
+ else:
316
+ structure[relative_path]["other_files"].append({"name": file, "size": file_size})
317
+
318
+ return {
319
+ "mp4_output_folder": str(mp4_output_folder_path),
320
+ "folder_exists": mp4_output_folder_path.exists(),
321
+ "total_mp4_files": total_mp4_files,
322
+ "total_size_bytes": total_size_bytes,
323
+ "structure": structure
324
+ }
325
+
326
+ # ==== ERROR MONITORING ENDPOINTS ====
327
+
328
+ @app.get("/errors/recent")
329
+ async def get_recent_errors(limit: int = Query(10, ge=1, le=100)):
330
+ """Get most recent errors"""
331
+ return error_logger.get_recent_errors(limit)
332
+
333
+ @app.get("/errors/{error_id}")
334
+ async def get_error_details(error_id: str):
335
+ """Get detailed information about a specific error"""
336
+ error = error_logger.get_error(error_id)
337
+ if not error:
338
+ raise HTTPException(status_code=404, detail="Error ID not found")
339
+ return error
340
+
341
+ @app.get("/errors/summary")
342
+ async def get_error_summary():
343
+ """Get summary of errors by type"""
344
+ return error_logger.get_error_summary()
345
+
346
+ # ==== MIDDLEWARE ENDPOINTS ====
347
+
348
+ @app.get("/middleware/status")
349
+ async def get_middleware_status():
350
+ """Get middleware status"""
351
+ courses = sum(1 for f in middleware_state.files.keys() if f.startswith("course:"))
352
+ images = sum(1 for f in middleware_state.files.keys() if f.startswith("image:"))
353
+
354
+ return {
355
+ "active_locks": sum(1 for f in middleware_state.files.values() if f.locked),
356
+ "total_files": len(middleware_state.files),
357
+ "total_courses": courses,
358
+ "total_images": images,
359
+ "downloads_completed": sum(f.download_count for f in middleware_state.files.values())
360
+ }
361
+
362
+ @app.get("/middleware/status/course/{course_id}")
363
+ async def get_course_status(course_id: str):
364
+ """Get status of a specific course"""
365
+ file_id = f"course:{course_id}"
366
+ if file_id not in middleware_state.files:
367
+ raise HTTPException(status_code=404, detail="Course not found")
368
+
369
+ state = middleware_state.files[file_id]
370
+ return {
371
+ "course_id": course_id,
372
+ "locked": state.locked,
373
+ "lock_holder": state.lock_holder,
374
+ "lock_time": state.lock_time,
375
+ "download_count": state.download_count,
376
+ "last_access": state.last_access
377
+ }
378
+
379
+ @app.get("/middleware/status/image/{course_folder}/{file_id}")
380
+ async def get_image_status(course_folder: str, file_id: str):
381
+ """Get status of a specific image"""
382
+ full_id = f"image:{course_folder}/{file_id}"
383
+ if full_id not in middleware_state.files:
384
+ raise HTTPException(status_code=404, detail="Image not found")
385
+
386
+ state = middleware_state.files[full_id]
387
+ return {
388
+ "file_id": file_id,
389
+ "course": course_folder,
390
+ "locked": state.locked,
391
+ "lock_holder": state.lock_holder,
392
+ "lock_time": state.lock_time,
393
+ "download_count": state.download_count,
394
+ "last_access": state.last_access
395
+ }
396
+
397
+ @app.post("/middleware/register")
398
+ async def register_file(file_path: str):
399
+ """Register a new file in the system"""
400
+ if not os.path.exists(file_path):
401
+ raise HTTPException(status_code=404, detail="File not found")
402
+
403
+ file_id = os.path.basename(file_path)
404
+ if file_id not in middleware_state.files:
405
+ middleware_state.files[file_id] = FileState(
406
+ path=file_path,
407
+ locked=False
408
+ )
409
+ middleware_state.save_state()
410
+
411
+ return {"file_id": file_id}
412
+
413
+ @app.get("/middleware/next/course")
414
+ async def get_next_course(requester_id: str):
415
+ """Get next available course folder"""
416
+ try:
417
+ courses = [d.name for d in Path(MP4_OUTPUT_FOLDER).iterdir() if d.is_dir()]
418
+ for course in courses:
419
+ course_id = f"course:{course}"
420
+ if course_id not in middleware_state.files:
421
+ middleware_state.files[course_id] = FileState(
422
+ path=str(Path(MP4_OUTPUT_FOLDER) / course),
423
+ locked=False
424
+ )
425
+ middleware_state.save_state()
426
+
427
+ if not middleware_state.files[course_id].locked:
428
+ middleware_state.files[course_id].locked = True
429
+ middleware_state.files[course_id].lock_holder = requester_id
430
+ middleware_state.files[course_id].lock_time = time.time()
431
+ middleware_state.save_state()
432
+ return {
433
+ "course_id": course,
434
+ "path": str(Path(MP4_OUTPUT_FOLDER) / course),
435
+ "lock_time": middleware_state.files[course_id].lock_time
436
+ }
437
+ except Exception as e:
438
+ raise HTTPException(status_code=500, detail=f"Failed to get next course: {e}")
439
+
440
+ raise HTTPException(status_code=404, detail="No courses available")
441
+
442
+ @app.get("/middleware/next/image/{course_folder}")
443
+ async def get_next_image(course_folder: str, requester_id: str):
444
+ """Get next available frame image from a specific course's frames directory"""
445
+ # Look in the frames directory for the course
446
+ frames_path = Path(MP4_OUTPUT_FOLDER) / f"{course_folder}_frames"
447
+
448
+ if not frames_path.is_dir():
449
+ raise HTTPException(status_code=404, detail="Course frames directory not found")
450
+
451
+ try:
452
+ # Get all frame images (jpg files) from all video subdirectories
453
+ frame_files = []
454
+ for video_dir in frames_path.iterdir():
455
+ if video_dir.is_dir():
456
+ frame_files.extend([
457
+ (video_dir.name, f)
458
+ for f in video_dir.iterdir()
459
+ if f.suffix.lower() in ('.jpg', '.jpeg')
460
+ ])
461
+
462
+ # Try to find an unlocked frame
463
+ for video_name, frame_file in frame_files:
464
+ file_id = f"frame:{course_folder}/{video_name}/{frame_file.name}"
465
+
466
+ # Register frame if not in state
467
+ if file_id not in middleware_state.files:
468
+ middleware_state.files[file_id] = FileState(
469
+ path=str(frame_file),
470
+ locked=False
471
+ )
472
+ middleware_state.save_state()
473
+
474
+ # Check if frame is available
475
+ if not middleware_state.files[file_id].locked:
476
+ middleware_state.files[file_id].locked = True
477
+ middleware_state.files[file_id].lock_holder = requester_id
478
+ middleware_state.files[file_id].lock_time = time.time()
479
+ middleware_state.save_state()
480
+
481
+ return {
482
+ "file_id": file_id,
483
+ "course": course_folder,
484
+ "video": video_name,
485
+ "frame": frame_file.name,
486
+ "path": str(frame_file),
487
+ "lock_time": middleware_state.files[file_id].lock_time
488
+ }
489
+ except Exception as e:
490
+ raise HTTPException(status_code=500, detail=f"Failed to get next image: {e}")
491
+
492
+ raise HTTPException(status_code=404, detail="No images available in this course")
493
+
494
+ @app.get("/middleware/next/any")
495
+ async def get_next_any_file(requester_id: str):
496
+ """Get next available file of any type"""
497
+ file_id = middleware_state.get_next_available_file(requester_id)
498
+ if not file_id:
499
+ raise HTTPException(status_code=404, detail="No files available")
500
+
501
+ file_state = middleware_state.files[file_id]
502
+ return {
503
+ "file_id": file_id,
504
+ "file_path": file_state.path,
505
+ "lock_time": file_state.lock_time,
506
+ "type": "course" if file_id.startswith("course:") else "image"
507
+ }
508
+
509
+ @app.post("/middleware/release/course/{course_id}")
510
+ async def release_course(course_id: str, requester_id: str):
511
+ """Release a course lock"""
512
+ file_id = f"course:{course_id}"
513
+ if not middleware_state.release_lock(file_id, requester_id):
514
+ raise HTTPException(status_code=403, detail="Not lock holder")
515
+ return {"status": "ok"}
516
+
517
+ @app.post("/middleware/release/frame/{course_folder}/{video_name}/{frame_id}")
518
+ async def release_frame(course_folder: str, video_name: str, frame_id: str, requester_id: str):
519
+ """Release a frame image lock"""
520
+ full_id = f"frame:{course_folder}/{video_name}/{frame_id}"
521
+ if not middleware_state.release_lock(full_id, requester_id):
522
+ raise HTTPException(status_code=403, detail="Not lock holder")
523
+ return {"status": "ok"}
524
+
525
+ @app.post("/middleware/release/{file_id}")
526
+ async def release_file(file_id: str, requester_id: str):
527
+ """Release any file lock (backward compatibility)"""
528
+ if not middleware_state.release_lock(file_id, requester_id):
529
+ raise HTTPException(status_code=403, detail="Not lock holder")
530
+ return {"status": "ok"}
531
+
532
+ @app.get("/middleware/stream/{file_id}")
533
+ async def stream_file(file_id: str, requester_id: str):
534
+ """Stream a file to client"""
535
+ if file_id not in middleware_state.files:
536
+ raise HTTPException(status_code=404, detail="File not found")
537
+
538
+ file_state = middleware_state.files[file_id]
539
+ if not file_state.locked or file_state.lock_holder != requester_id:
540
+ raise HTTPException(status_code=403, detail="Not lock holder")
541
+
542
+ if not os.path.exists(file_state.path):
543
+ raise HTTPException(status_code=404, detail="File not found on disk")
544
+
545
+ async def file_stream():
546
+ async with aiofiles.open(file_state.path, 'rb') as f:
547
+ while chunk := await f.read(CHUNK_SIZE):
548
+ yield chunk
549
+ # Auto-release lock after successful transfer
550
+ middleware_state.release_lock(file_id, requester_id)
551
+
552
+ return StreamingResponse(
553
+ file_stream(),
554
+ media_type="application/octet-stream",
555
+ headers={
556
+ "Content-Disposition": f"attachment; filename={file_id}"
557
+ }
558
+ )
559
+
560
+ # ==== STARTUP EVENT ====
561
+
562
+ @app.on_event("startup")
563
+ async def startup_event():
564
+ """Run the processing loop in the background when the API starts"""
565
+ global processing_thread
566
+ logger.info("Starting up Unified MP4 Processing & Distribution API...")
567
+
568
+ if not (processing_thread and processing_thread.is_alive()):
569
+ logger.info("πŸš€ Starting background processing thread...")
570
+ processing_thread = threading.Thread(target=process_hf_files_background)
571
+ processing_thread.daemon = True
572
+ processing_thread.start()
573
+
574
+ if __name__ == "__main__":
575
+ import uvicorn
576
  uvicorn.run(app, host="0.0.0.0", port=8000)
frame_extractor.py ADDED
@@ -0,0 +1,55 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import cv2
2
+ import multiprocessing
3
+ from pathlib import Path
4
+ import numpy as np
5
+ from typing import List, Tuple
6
+ import os
7
+
8
+ def extract_frames_from_video(args: Tuple[str, str, int]):
9
+ """
10
+ Extract frames from a video file at specified FPS
11
+ args: (input_video_path, output_folder, target_fps)
12
+ """
13
+ video_path, output_folder, target_fps = args
14
+ video_name = Path(video_path).stem
15
+
16
+ # Create output folder
17
+ frames_dir = Path(output_folder) / video_name
18
+ frames_dir.mkdir(parents=True, exist_ok=True)
19
+
20
+ try:
21
+ # Open video file
22
+ cap = cv2.VideoCapture(str(video_path))
23
+ if not cap.isOpened():
24
+ print(f"Error: Could not open video {video_path}")
25
+ return
26
+
27
+ # Get video properties
28
+ original_fps = cap.get(cv2.CAP_PROP_FPS)
29
+ total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
30
+
31
+ # Calculate frame extraction interval
32
+ interval = int(original_fps / target_fps)
33
+
34
+ frame_count = 0
35
+ saved_count = 0
36
+
37
+ while True:
38
+ ret, frame = cap.read()
39
+ if not ret:
40
+ break
41
+
42
+ # Extract frame at target FPS
43
+ if frame_count % interval == 0:
44
+ frame_path = frames_dir / f"frame_{saved_count:06d}.jpg"
45
+ cv2.imwrite(str(frame_path), frame)
46
+ saved_count += 1
47
+
48
+ frame_count += 1
49
+
50
+ cap.release()
51
+ return saved_count
52
+
53
+ except Exception as e:
54
+ print(f"Error processing {video_path}: {str(e)}")
55
+ return 0
processing_logic.py CHANGED
@@ -1,373 +1,398 @@
1
- import os
2
- import json
3
- import requests
4
- import subprocess
5
- import shutil
6
- import time
7
- import threading
8
- from typing import Dict, List, Optional
9
- from pathlib import Path
10
- from huggingface_hub import HfApi
11
- import uuid
12
-
13
- # ==== CONFIGURATION ====
14
- HF_TOKEN = os.getenv("HF_TOKEN", "")
15
- SOURCE_REPO_ID = os.getenv("SOURCE_REPO", "Fred808/BG1")
16
-
17
- # Directory Configuration
18
- UPLOAD_DIRECTORY = "./uploads"
19
- DOWNLOAD_FOLDER = "./downloads"
20
- EXTRACT_FOLDER = "./extracted"
21
- MP4_OUTPUT_FOLDER = "./mp4_files"
22
-
23
- # Create directories
24
- for directory in [UPLOAD_DIRECTORY, DOWNLOAD_FOLDER, EXTRACT_FOLDER, MP4_OUTPUT_FOLDER]:
25
- os.makedirs(directory, exist_ok=True)
26
-
27
- # State Files
28
- DOWNLOAD_STATE_FILE = "download_progress.json"
29
- PROCESS_STATE_FILE = "process_progress.json"
30
- FAILED_FILES_LOG = "failed_files.log"
31
-
32
- # Processing Parameters
33
- MAX_RETRIES = 3
34
- MIN_FREE_SPACE_GB = 2
35
- DEFAULT_RAR_LIMIT = 5 # Default number of RAR files to process
36
-
37
- # Initialize HF API
38
- hf_api = HfApi(token=HF_TOKEN) if HF_TOKEN else None
39
-
40
- # Global State
41
- processing_status = {
42
- "is_running": False,
43
- "current_file": None,
44
- "total_files": 0,
45
- "processed_files": 0,
46
- "failed_files": 0,
47
- "extracted_courses": 0,
48
- "extracted_mp4s": 0,
49
- "last_update": None,
50
- "logs": []
51
- }
52
-
53
- # Store for uploaded MP4s with metadata (this will be managed by the API part, but needs to be accessible)
54
- uploaded_mp4s = {}
55
-
56
- def log_message(message: str):
57
- """Log messages with timestamp"""
58
- timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
59
- log_entry = f"[{timestamp}] {message}"
60
- print(log_entry)
61
- processing_status["logs"].append(log_entry)
62
- processing_status["last_update"] = timestamp
63
- if len(processing_status["logs"]) > 100:
64
- processing_status["logs"] = processing_status["logs"][-100:]
65
-
66
- def log_failed_file(filename: str, error: str):
67
- """Log failed files to persistent file"""
68
- timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
69
- with open(FAILED_FILES_LOG, "a") as f:
70
- f.write(f"{timestamp} - {filename}: {error}\n")
71
-
72
- def get_disk_usage(path: str) -> Dict[str, float]:
73
- """Get disk usage statistics in GB"""
74
- statvfs = os.statvfs(path)
75
- total = statvfs.f_frsize * statvfs.f_blocks / (1024**3)
76
- free = statvfs.f_frsize * statvfs.f_bavail / (1024**3)
77
- used = total - free
78
- return {"total": total, "free": free, "used": used}
79
-
80
- def check_disk_space(path: str = ".") -> bool:
81
- """Check if there\'s enough disk space"""
82
- disk_info = get_disk_usage(path)
83
- if disk_info["free"] < MIN_FREE_SPACE_GB:
84
- log_message(f'⚠️ Low disk space: {disk_info["free"]:.2f}GB free, {disk_info["used"]:.2f}GB used')
85
- return False
86
- return True
87
-
88
- def cleanup_temp_files():
89
- """Clean up temporary files to free space"""
90
- log_message("🧹 Cleaning up temporary files...")
91
-
92
- # Clean old downloads (keep only current processing file)
93
- current_file = processing_status.get("current_file")
94
- for file in os.listdir(DOWNLOAD_FOLDER):
95
- if file != current_file and file.endswith((".rar", ".zip")):
96
- try:
97
- os.remove(os.path.join(DOWNLOAD_FOLDER, file))
98
- log_message(f"πŸ—‘οΈ Removed old download: {file}")
99
- except:
100
- pass
101
-
102
- def load_json_state(file_path: str, default_value):
103
- """Load state from JSON file"""
104
- if os.path.exists(file_path):
105
- try:
106
- with open(file_path, "r") as f:
107
- return json.load(f)
108
- except json.JSONDecodeError:
109
- log_message(f"⚠️ Corrupted state file: {file_path}")
110
- return default_value
111
-
112
- def save_json_state(file_path: str, data):
113
- """Save state to JSON file"""
114
- with open(file_path, "w") as f:
115
- json.dump(data, f, indent=2)
116
-
117
- def download_with_retry(url: str, dest_path: str, max_retries: int = 3) -> bool:
118
- """Download file with retry logic and disk space checking"""
119
- if not check_disk_space():
120
- cleanup_temp_files()
121
- if not check_disk_space():
122
- log_message("❌ Insufficient disk space even after cleanup")
123
- return False
124
-
125
- headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
126
- for attempt in range(max_retries):
127
- try:
128
- with requests.get(url, headers=headers, stream=True) as r:
129
- r.raise_for_status()
130
-
131
- # Check content length if available
132
- content_length = r.headers.get("content-length")
133
- if content_length:
134
- size_gb = int(content_length) / (1024**3)
135
- disk_info = get_disk_usage(".")
136
- if size_gb > disk_info["free"] - 0.5: # Leave 0.5GB buffer
137
- log_message(f'❌ File too large: {size_gb:.2f}GB, only {disk_info["free"]:.2f}GB free')
138
- return False
139
-
140
- with open(dest_path, "wb") as f:
141
- for chunk in r.iter_content(chunk_size=8192):
142
- f.write(chunk)
143
- return True
144
- except Exception as e:
145
- if attempt < max_retries - 1:
146
- time.sleep(2 ** attempt)
147
- continue
148
- log_message(f"❌ Download failed after {max_retries} attempts: {e}")
149
- return False
150
- return False
151
-
152
- def is_multipart_rar(filename: str) -> bool:
153
- """Check if this is a multi-part RAR file"""
154
- return ".part" in filename.lower() and filename.lower().endswith(".rar")
155
-
156
- def get_rar_part_base(filename: str) -> str:
157
- """Get the base name for multi-part RAR files"""
158
- if ".part" in filename.lower():
159
- return filename.split(".part")[0]
160
- return filename.replace(".rar", "")
161
-
162
- def extract_with_retry(rar_path: str, output_dir: str, max_retries: int = 2) -> bool:
163
- """Extract RAR with retry and recovery, handling multi-part archives"""
164
- filename = os.path.basename(rar_path)
165
-
166
- # For multi-part RARs, we need the first part
167
- if is_multipart_rar(filename):
168
- base_name = get_rar_part_base(filename)
169
- first_part = f"{base_name}.part01.rar"
170
- first_part_path = os.path.join(os.path.dirname(rar_path), first_part)
171
-
172
- if not os.path.exists(first_part_path):
173
- log_message(f"⚠️ Multi-part RAR detected but first part not found: {first_part}")
174
- return False
175
-
176
- rar_path = first_part_path
177
- log_message(f"πŸ“¦ Processing multi-part RAR starting with: {first_part}")
178
-
179
- for attempt in range(max_retries):
180
- try:
181
- # Test RAR first
182
- test_cmd = ["unrar", "t", rar_path]
183
- test_result = subprocess.run(test_cmd, capture_output=True, text=True)
184
- if test_result.returncode != 0:
185
- log_message(f"⚠️ RAR test failed: {test_result.stderr}")
186
- if attempt == max_retries - 1:
187
- return False
188
- continue
189
-
190
- # Extract RAR
191
- cmd = ["unrar", "x", "-o+", rar_path, output_dir]
192
- if attempt > 0: # Try recovery on subsequent attempts
193
- cmd.insert(2, "-kb")
194
-
195
- result = subprocess.run(cmd, capture_output=True, text=True)
196
- if result.returncode == 0:
197
- log_message(f"βœ… Successfully extracted: {os.path.basename(rar_path)}")
198
- return True
199
- else:
200
- error_msg = result.stderr or result.stdout
201
- log_message(f"⚠️ Extraction attempt {attempt + 1} failed: {error_msg}")
202
-
203
- except Exception as e:
204
- log_message(f"❌ Extraction exception: {str(e)}")
205
- if attempt == max_retries - 1:
206
- return False
207
- time.sleep(1)
208
-
209
- return False
210
-
211
- def process_rar_file(rar_path: str) -> List[Dict]:
212
- """Process a single RAR file - extract and find MP4 files"""
213
- filename = os.path.basename(rar_path)
214
- processing_status["current_file"] = filename
215
-
216
- # Handle multi-part RAR naming
217
- if is_multipart_rar(filename):
218
- course_name = get_rar_part_base(filename)
219
- else:
220
- course_name = filename.replace(".rar", "")
221
-
222
- # Create a unique directory for this course's extracted MP4s
223
- course_mp4_output_dir = os.path.join(MP4_OUTPUT_FOLDER, course_name)
224
- os.makedirs(course_mp4_output_dir, exist_ok=True)
225
-
226
- extract_dir = os.path.join(EXTRACT_FOLDER, course_name)
227
- mp4_files = []
228
-
229
- try:
230
- log_message(f"πŸ”„ Processing: {filename}")
231
-
232
- # Clean up any existing directory
233
- if os.path.exists(extract_dir):
234
- shutil.rmtree(extract_dir, ignore_errors=True)
235
-
236
- # Extract RAR
237
- os.makedirs(extract_dir, exist_ok=True)
238
- if not extract_with_retry(rar_path, extract_dir):
239
- raise Exception("RAR extraction failed")
240
-
241
- # Find and copy MP4 files
242
- for root, dirs, files in os.walk(extract_dir):
243
- for file in files:
244
- if file.lower().endswith(".mp4"):
245
- source_path = os.path.join(root, file)
246
- # Use original filename for MP4 output within the course directory
247
- dest_path = os.path.join(course_mp4_output_dir, file)
248
-
249
- try:
250
- shutil.copy2(source_path, dest_path)
251
- mp4_files.append({
252
- "id": os.path.join(course_name, file), # Store as course_name/filename.mp4
253
- "original_name": file,
254
- "course_name": course_name,
255
- "size": os.path.getsize(dest_path),
256
- "created_at": time.strftime("%Y-%m-%d %H:%M:%S")
257
- })
258
- log_message(f"βœ… Extracted MP4: {file} -> {os.path.join(course_name, file)}")
259
- except Exception as e:
260
- log_message(f"❌ Failed to copy MP4 {file}: {e}")
261
-
262
- processing_status["extracted_courses"] += 1
263
- processing_status["extracted_mp4s"] += len(mp4_files)
264
- log_message(f"βœ… Successfully processed '{course_name}' - found {len(mp4_files)} MP4 files")
265
-
266
- return mp4_files
267
-
268
- except Exception as e:
269
- error_msg = str(e)
270
- log_message(f"❌ Processing failed: {error_msg}")
271
- log_failed_file(filename, error_msg)
272
- return []
273
-
274
- finally:
275
- processing_status["current_file"] = None
276
- # Clean up extracted directory
277
- if os.path.exists(extract_dir):
278
- shutil.rmtree(extract_dir, ignore_errors=True)
279
-
280
- def process_hf_files_background(start_index: int = 5, limit: int = DEFAULT_RAR_LIMIT):
281
- """Background task to process HuggingFace files"""
282
- if not hf_api:
283
- log_message("❌ HuggingFace API not configured (missing HF_TOKEN)")
284
- return
285
-
286
- processing_status["is_running"] = True
287
-
288
- try:
289
- # Load state
290
- processed_rars = load_json_state(PROCESS_STATE_FILE, {"processed_rars": []})["processed_rars"]
291
- download_state = load_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": 0})
292
-
293
- # Use start_index if provided, otherwise use the saved state
294
- current_index = start_index if start_index > 0 else download_state["next_download_index"]
295
-
296
- log_message(f"πŸ“Š Starting processing from index {current_index} with a limit of {limit} files.")
297
- log_message(f"πŸ“Š Previously processed: {len(processed_rars)} files")
298
-
299
- # Get file list
300
- try:
301
- files = list(hf_api.list_repo_files(repo_id=SOURCE_REPO_ID, repo_type="dataset"))
302
- rar_files = sorted([f for f in files if f.endswith(".rar")])
303
-
304
- processing_status["total_files"] = len(rar_files)
305
- log_message(f"πŸ“ Found {len(rar_files)} RAR files in repository")
306
-
307
- if current_index >= len(rar_files):
308
- log_message("βœ… All files have been processed!")
309
- return
310
-
311
- except Exception as e:
312
- log_message(f"❌ Failed to get file list: {str(e)}")
313
- return
314
-
315
- processed_count = 0
316
- while processed_count < limit and current_index < len(rar_files) and processing_status["is_running"]:
317
- rar_file = rar_files[current_index]
318
- filename = os.path.basename(rar_file)
319
-
320
- if filename in processed_rars:
321
- log_message(f"⏭️ Skipping already processed: {filename}")
322
- processing_status["processed_files"] += 1
323
- current_index += 1
324
- save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": current_index})
325
- continue
326
-
327
- log_message(f"πŸ“₯ Downloading: {filename}")
328
- dest_path = os.path.join(DOWNLOAD_FOLDER, filename)
329
-
330
- # Download file
331
- download_url = f"https://huggingface.co/datasets/{SOURCE_REPO_ID}/resolve/main/{rar_file}"
332
- if download_with_retry(download_url, dest_path):
333
- # Process file
334
- mp4_files = process_rar_file(dest_path)
335
- if mp4_files:
336
- processed_rars.append(filename)
337
- save_json_state(PROCESS_STATE_FILE, {"processed_rars": processed_rars})
338
- log_message(f"βœ… Successfully processed: {filename}")
339
- processing_status["processed_files"] += 1
340
- else:
341
- log_message(f"❌ Failed to process: {filename}")
342
- processing_status["failed_files"] += 1
343
-
344
- # Clean up downloaded file
345
- try:
346
- os.remove(dest_path)
347
- log_message(f"πŸ—‘οΈ Cleaned up download: {filename}")
348
- except:
349
- pass
350
- else:
351
- log_message(f"❌ Failed to download: {filename}")
352
- processing_status["failed_files"] += 1
353
-
354
- # Update download state for next run
355
- current_index += 1
356
- processed_count += 1
357
- save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": current_index})
358
-
359
- if current_index >= len(rar_files):
360
- log_message("πŸŽ‰ All available RAR files have been processed!")
361
- elif not processing_status["is_running"]:
362
- log_message("⏹️ Processing stopped by request.")
363
- else:
364
- log_message(f"βœ… Processed {processed_count} RAR files. Next index to process: {current_index}")
365
-
366
- except Exception as e:
367
- log_message(f"❌ Fatal error in background processing: {str(e)}")
368
- finally:
369
- processing_status["is_running"] = False
370
- cleanup_temp_files()
371
-
372
-
373
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import json
3
+ import requests
4
+ import subprocess
5
+ import shutil
6
+ import time
7
+ import threading
8
+ import multiprocessing
9
+ from typing import Dict, List, Optional
10
+ from pathlib import Path
11
+ from huggingface_hub import HfApi
12
+ import uuid
13
+ import frame_extractor # Our frame extraction module
14
+
15
+ # ==== CONFIGURATION ====
16
+ HF_TOKEN = os.getenv("HF_TOKEN", "")
17
+ SOURCE_REPO_ID = os.getenv("SOURCE_REPO", "Fred808/BG1")
18
+
19
+ # Directory Configuration
20
+ UPLOAD_DIRECTORY = "./uploads"
21
+ DOWNLOAD_FOLDER = "./downloads"
22
+ EXTRACT_FOLDER = "./extracted"
23
+ MP4_OUTPUT_FOLDER = "./mp4_files"
24
+
25
+ # Create directories
26
+ for directory in [UPLOAD_DIRECTORY, DOWNLOAD_FOLDER, EXTRACT_FOLDER, MP4_OUTPUT_FOLDER]:
27
+ os.makedirs(directory, exist_ok=True)
28
+
29
+ # State Files
30
+ DOWNLOAD_STATE_FILE = "download_progress.json"
31
+ PROCESS_STATE_FILE = "process_progress.json"
32
+ FAILED_FILES_LOG = "failed_files.log"
33
+
34
+ # Processing Parameters
35
+ MAX_RETRIES = 3
36
+ MIN_FREE_SPACE_GB = 2
37
+ DEFAULT_RAR_LIMIT = 2 # Default number of RAR files to process
38
+
39
+ # Initialize HF API
40
+ hf_api = HfApi(token=HF_TOKEN) if HF_TOKEN else None
41
+
42
+ # Global State
43
+ processing_status = {
44
+ "is_running": False,
45
+ "current_file": None,
46
+ "total_files": 0,
47
+ "processed_files": 0,
48
+ "failed_files": 0,
49
+ "extracted_courses": 0,
50
+ "extracted_mp4s": 0,
51
+ "last_update": None,
52
+ "logs": []
53
+ }
54
+
55
+ # Store for uploaded MP4s with metadata (this will be managed by the API part, but needs to be accessible)
56
+ uploaded_mp4s = {}
57
+
58
+ def log_message(message: str):
59
+ """Log messages with timestamp"""
60
+ timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
61
+ log_entry = f"[{timestamp}] {message}"
62
+ print(log_entry)
63
+ processing_status["logs"].append(log_entry)
64
+ processing_status["last_update"] = timestamp
65
+ if len(processing_status["logs"]) > 100:
66
+ processing_status["logs"] = processing_status["logs"][-100:]
67
+
68
+ def log_failed_file(filename: str, error: str):
69
+ """Log failed files to persistent file"""
70
+ timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
71
+ with open(FAILED_FILES_LOG, "a") as f:
72
+ f.write(f"{timestamp} - {filename}: {error}\n")
73
+
74
+ def get_disk_usage(path: str) -> Dict[str, float]:
75
+ """Get disk usage statistics in GB"""
76
+ statvfs = os.statvfs(path)
77
+ total = statvfs.f_frsize * statvfs.f_blocks / (1024**3)
78
+ free = statvfs.f_frsize * statvfs.f_bavail / (1024**3)
79
+ used = total - free
80
+ return {"total": total, "free": free, "used": used}
81
+
82
+ def check_disk_space(path: str = ".") -> bool:
83
+ """Check if there\'s enough disk space"""
84
+ disk_info = get_disk_usage(path)
85
+ if disk_info["free"] < MIN_FREE_SPACE_GB:
86
+ log_message(f'⚠️ Low disk space: {disk_info["free"]:.2f}GB free, {disk_info["used"]:.2f}GB used')
87
+ return False
88
+ return True
89
+
90
+ def cleanup_temp_files():
91
+ """Clean up temporary files to free space"""
92
+ log_message("🧹 Cleaning up temporary files...")
93
+
94
+ # Clean old downloads (keep only current processing file)
95
+ current_file = processing_status.get("current_file")
96
+ for file in os.listdir(DOWNLOAD_FOLDER):
97
+ if file != current_file and file.endswith((".rar", ".zip")):
98
+ try:
99
+ os.remove(os.path.join(DOWNLOAD_FOLDER, file))
100
+ log_message(f"πŸ—‘οΈ Removed old download: {file}")
101
+ except:
102
+ pass
103
+
104
+ def load_json_state(file_path: str, default_value):
105
+ """Load state from JSON file"""
106
+ if os.path.exists(file_path):
107
+ try:
108
+ with open(file_path, "r") as f:
109
+ return json.load(f)
110
+ except json.JSONDecodeError:
111
+ log_message(f"⚠️ Corrupted state file: {file_path}")
112
+ return default_value
113
+
114
+ def save_json_state(file_path: str, data):
115
+ """Save state to JSON file"""
116
+ with open(file_path, "w") as f:
117
+ json.dump(data, f, indent=2)
118
+
119
+ def download_with_retry(url: str, dest_path: str, max_retries: int = 3) -> bool:
120
+ """Download file with retry logic and disk space checking"""
121
+ if not check_disk_space():
122
+ cleanup_temp_files()
123
+ if not check_disk_space():
124
+ log_message("❌ Insufficient disk space even after cleanup")
125
+ return False
126
+
127
+ headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
128
+ for attempt in range(max_retries):
129
+ try:
130
+ with requests.get(url, headers=headers, stream=True) as r:
131
+ r.raise_for_status()
132
+
133
+ # Check content length if available
134
+ content_length = r.headers.get("content-length")
135
+ if content_length:
136
+ size_gb = int(content_length) / (1024**3)
137
+ disk_info = get_disk_usage(".")
138
+ if size_gb > disk_info["free"] - 0.5: # Leave 0.5GB buffer
139
+ log_message(f'❌ File too large: {size_gb:.2f}GB, only {disk_info["free"]:.2f}GB free')
140
+ return False
141
+
142
+ with open(dest_path, "wb") as f:
143
+ for chunk in r.iter_content(chunk_size=8192):
144
+ f.write(chunk)
145
+ return True
146
+ except Exception as e:
147
+ if attempt < max_retries - 1:
148
+ time.sleep(2 ** attempt)
149
+ continue
150
+ log_message(f"❌ Download failed after {max_retries} attempts: {e}")
151
+ return False
152
+ return False
153
+
154
+ def is_multipart_rar(filename: str) -> bool:
155
+ """Check if this is a multi-part RAR file"""
156
+ return ".part" in filename.lower() and filename.lower().endswith(".rar")
157
+
158
+ def get_rar_part_base(filename: str) -> str:
159
+ """Get the base name for multi-part RAR files"""
160
+ if ".part" in filename.lower():
161
+ return filename.split(".part")[0]
162
+ return filename.replace(".rar", "")
163
+
164
+ def extract_with_retry(rar_path: str, output_dir: str, max_retries: int = 2) -> bool:
165
+ """Extract RAR with retry and recovery, handling multi-part archives"""
166
+ filename = os.path.basename(rar_path)
167
+
168
+ # For multi-part RARs, we need the first part
169
+ if is_multipart_rar(filename):
170
+ base_name = get_rar_part_base(filename)
171
+ first_part = f"{base_name}.part01.rar"
172
+ first_part_path = os.path.join(os.path.dirname(rar_path), first_part)
173
+
174
+ if not os.path.exists(first_part_path):
175
+ log_message(f"⚠️ Multi-part RAR detected but first part not found: {first_part}")
176
+ return False
177
+
178
+ rar_path = first_part_path
179
+ log_message(f"πŸ“¦ Processing multi-part RAR starting with: {first_part}")
180
+
181
+ for attempt in range(max_retries):
182
+ try:
183
+ # Test RAR first
184
+ test_cmd = ["unrar", "t", rar_path]
185
+ test_result = subprocess.run(test_cmd, capture_output=True, text=True)
186
+ if test_result.returncode != 0:
187
+ log_message(f"⚠️ RAR test failed: {test_result.stderr}")
188
+ if attempt == max_retries - 1:
189
+ return False
190
+ continue
191
+
192
+ # Extract RAR
193
+ cmd = ["unrar", "x", "-o+", rar_path, output_dir]
194
+ if attempt > 0: # Try recovery on subsequent attempts
195
+ cmd.insert(2, "-kb")
196
+
197
+ result = subprocess.run(cmd, capture_output=True, text=True)
198
+ if result.returncode == 0:
199
+ log_message(f"βœ… Successfully extracted: {os.path.basename(rar_path)}")
200
+ return True
201
+ else:
202
+ error_msg = result.stderr or result.stdout
203
+ log_message(f"⚠️ Extraction attempt {attempt + 1} failed: {error_msg}")
204
+
205
+ except Exception as e:
206
+ log_message(f"❌ Extraction exception: {str(e)}")
207
+ if attempt == max_retries - 1:
208
+ return False
209
+ time.sleep(1)
210
+
211
+ return False
212
+
213
+ def process_rar_file(rar_path: str) -> List[Dict]:
214
+ """Process a single RAR file - extract and find MP4 files"""
215
+ filename = os.path.basename(rar_path)
216
+ processing_status["current_file"] = filename
217
+
218
+ # Handle multi-part RAR naming
219
+ if is_multipart_rar(filename):
220
+ course_name = get_rar_part_base(filename)
221
+ else:
222
+ course_name = filename.replace(".rar", "")
223
+
224
+ # Create a unique directory for this course's extracted MP4s
225
+ course_mp4_output_dir = os.path.join(MP4_OUTPUT_FOLDER, course_name)
226
+ os.makedirs(course_mp4_output_dir, exist_ok=True)
227
+
228
+ extract_dir = os.path.join(EXTRACT_FOLDER, course_name)
229
+ mp4_files = []
230
+
231
+ try:
232
+ log_message(f"πŸ”„ Processing: {filename}")
233
+
234
+ # Clean up any existing directory
235
+ if os.path.exists(extract_dir):
236
+ shutil.rmtree(extract_dir, ignore_errors=True)
237
+
238
+ # Extract RAR
239
+ os.makedirs(extract_dir, exist_ok=True)
240
+ if not extract_with_retry(rar_path, extract_dir):
241
+ raise Exception("RAR extraction failed")
242
+
243
+ # Find and copy MP4 files
244
+ for root, dirs, files in os.walk(extract_dir):
245
+ for file in files:
246
+ if file.lower().endswith(".mp4"):
247
+ source_path = os.path.join(root, file)
248
+ # Use original filename for MP4 output within the course directory
249
+ dest_path = os.path.join(course_mp4_output_dir, file)
250
+
251
+ try:
252
+ shutil.copy2(source_path, dest_path)
253
+ file_info = {
254
+ "id": os.path.join(course_name, file),
255
+ "original_name": file,
256
+ "course_name": course_name,
257
+ "size": os.path.getsize(dest_path),
258
+ "path": dest_path,
259
+ "created_at": time.strftime("%Y-%m-%d %H:%M:%S")
260
+ }
261
+ mp4_files.append(file_info)
262
+ log_message(f"βœ… Extracted MP4: {file} -> {os.path.join(course_name, file)}")
263
+ except Exception as e:
264
+ log_message(f"❌ Failed to copy MP4 {file}: {e}")
265
+
266
+ # Process frame extraction for all MP4s in parallel
267
+ if mp4_files:
268
+ log_message(f"🎞️ Starting frame extraction for {len(mp4_files)} MP4 files...")
269
+
270
+ # Create frames directory for this course
271
+ frames_dir = os.path.join(MP4_OUTPUT_FOLDER, f"{course_name}_frames")
272
+ os.makedirs(frames_dir, exist_ok=True)
273
+
274
+ # Prepare arguments for frame extraction
275
+ extraction_args = [
276
+ (mp4["path"], frames_dir, 10) # 10 FPS
277
+ for mp4 in mp4_files
278
+ ]
279
+
280
+ # Use multiprocessing for frame extraction
281
+ cpu_count = multiprocessing.cpu_count()
282
+ with multiprocessing.Pool(processes=cpu_count) as pool:
283
+ results = pool.map(frame_extractor.extract_frames_from_video, extraction_args)
284
+
285
+ # Log frame extraction results
286
+ total_frames = sum(count for count in results if count is not None)
287
+ log_message(f"🎞️ Extracted {total_frames} frames from {len(mp4_files)} videos using {cpu_count} CPU cores")
288
+
289
+ processing_status["extracted_courses"] += 1
290
+ processing_status["extracted_mp4s"] += len(mp4_files)
291
+ log_message(f"βœ… Successfully processed '{course_name}' - found {len(mp4_files)} MP4 files")
292
+
293
+ return mp4_files
294
+
295
+ except Exception as e:
296
+ error_msg = str(e)
297
+ log_message(f"❌ Processing failed: {error_msg}")
298
+ log_failed_file(filename, error_msg)
299
+ return []
300
+
301
+ finally:
302
+ processing_status["current_file"] = None
303
+ # Clean up extracted directory
304
+ if os.path.exists(extract_dir):
305
+ shutil.rmtree(extract_dir, ignore_errors=True)
306
+
307
+ def process_hf_files_background(start_index: int = 5, limit: int = DEFAULT_RAR_LIMIT):
308
+ """Background task to process HuggingFace files"""
309
+ if not hf_api:
310
+ log_message("❌ HuggingFace API not configured (missing HF_TOKEN)")
311
+ return
312
+
313
+ processing_status["is_running"] = True
314
+
315
+ try:
316
+ # Load state
317
+ processed_rars = load_json_state(PROCESS_STATE_FILE, {"processed_rars": []})["processed_rars"]
318
+ download_state = load_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": 0})
319
+
320
+ # Use start_index if provided, otherwise use the saved state
321
+ current_index = start_index if start_index > 0 else download_state["next_download_index"]
322
+
323
+ log_message(f"πŸ“Š Starting processing from index {current_index} with a limit of {limit} files.")
324
+ log_message(f"πŸ“Š Previously processed: {len(processed_rars)} files")
325
+
326
+ # Get file list
327
+ try:
328
+ files = list(hf_api.list_repo_files(repo_id=SOURCE_REPO_ID, repo_type="dataset"))
329
+ rar_files = sorted([f for f in files if f.endswith(".rar")])
330
+
331
+ processing_status["total_files"] = len(rar_files)
332
+ log_message(f"πŸ“ Found {len(rar_files)} RAR files in repository")
333
+
334
+ if current_index >= len(rar_files):
335
+ log_message("βœ… All files have been processed!")
336
+ return
337
+
338
+ except Exception as e:
339
+ log_message(f"❌ Failed to get file list: {str(e)}")
340
+ return
341
+
342
+ processed_count = 0
343
+ while processed_count < limit and current_index < len(rar_files) and processing_status["is_running"]:
344
+ rar_file = rar_files[current_index]
345
+ filename = os.path.basename(rar_file)
346
+
347
+ if filename in processed_rars:
348
+ log_message(f"⏭️ Skipping already processed: {filename}")
349
+ processing_status["processed_files"] += 1
350
+ current_index += 1
351
+ save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": current_index})
352
+ continue
353
+
354
+ log_message(f"πŸ“₯ Downloading: {filename}")
355
+ dest_path = os.path.join(DOWNLOAD_FOLDER, filename)
356
+
357
+ # Download file
358
+ download_url = f"https://huggingface.co/datasets/{SOURCE_REPO_ID}/resolve/main/{rar_file}"
359
+ if download_with_retry(download_url, dest_path):
360
+ # Process file
361
+ mp4_files = process_rar_file(dest_path)
362
+ if mp4_files:
363
+ processed_rars.append(filename)
364
+ save_json_state(PROCESS_STATE_FILE, {"processed_rars": processed_rars})
365
+ log_message(f"βœ… Successfully processed: {filename}")
366
+ processing_status["processed_files"] += 1
367
+ else:
368
+ log_message(f"❌ Failed to process: {filename}")
369
+ processing_status["failed_files"] += 1
370
+
371
+ # Clean up downloaded file
372
+ try:
373
+ os.remove(dest_path)
374
+ log_message(f"πŸ—‘οΈ Cleaned up download: {filename}")
375
+ except:
376
+ pass
377
+ else:
378
+ log_message(f"❌ Failed to download: {filename}")
379
+ processing_status["failed_files"] += 1
380
+
381
+ # Update download state for next run
382
+ current_index += 1
383
+ processed_count += 1
384
+ save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": current_index})
385
+
386
+ if current_index >= len(rar_files):
387
+ log_message("πŸŽ‰ All available RAR files have been processed!")
388
+ elif not processing_status["is_running"]:
389
+ log_message("⏹️ Processing stopped by request.")
390
+ else:
391
+ log_message(f"βœ… Processed {processed_count} RAR files. Next index to process: {current_index}")
392
+
393
+ except Exception as e:
394
+ log_message(f"❌ Fatal error in background processing: {str(e)}")
395
+ finally:
396
+ processing_status["is_running"] = False
397
+ cleanup_temp_files()
398
+