samelias1 commited on
Commit
5d2f975
·
verified ·
1 Parent(s): 0c46e3f

Update vision_analyzer.py

Browse files
Files changed (1) hide show
  1. vision_analyzer.py +757 -757
vision_analyzer.py CHANGED
@@ -1,757 +1,757 @@
1
- import os
2
- import json
3
- import requests
4
- import subprocess
5
- import shutil
6
- import time
7
- import re
8
- import threading
9
- from typing import Dict, List, Set, Optional, Any
10
- from huggingface_hub import HfApi, list_repo_files, CommitOperationAdd, hf_hub_download, hf_hub_url
11
-
12
- import cv2
13
- import numpy as np
14
- from pathlib import Path
15
- import smtplib
16
- from email.message import EmailMessage
17
-
18
# ==== CONFIGURATION ====
# Credentials and repo coordinates are environment-driven so deployments can override them.
HF_TOKEN = os.getenv("HF_TOKEN", "")
SOURCE_REPO_ID = os.getenv("SOURCE_REPO", "Fred808/BG1")
TARGET_REPO_ID = os.getenv("TARGET_REPO", "Fred808/BG3")  # New target repo for uploads

# Path Configuration
DOWNLOAD_FOLDER = "downloads"              # raw .rar/.zip archives land here
EXTRACT_FOLDER = "extracted"               # archives are unpacked here
FRAMES_OUTPUT_FOLDER = "extracted_frames"  # per-video JPEG frames
ZIP_OUTPUT_FOLDER = "zipped_frames"  # New folder for zip files
LOCAL_STATE_FOLDER = ".state"  # Folder to temporarily store the downloaded state file

# Create every working directory up front (idempotent).
os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)
os.makedirs(EXTRACT_FOLDER, exist_ok=True)
os.makedirs(FRAMES_OUTPUT_FOLDER, exist_ok=True)
os.makedirs(ZIP_OUTPUT_FOLDER, exist_ok=True)  # Create zip output folder
os.makedirs(LOCAL_STATE_FOLDER, exist_ok=True)

# State Files
FAILED_FILES_LOG = "failed_files.log"     # local append-only failure log
HF_STATE_FILE = "processing_state2.json"  # New remote state file name

# Processing Parameters
CHUNK_SIZE = 2        # NOTE(review): declared but not referenced anywhere in this file
PROCESSING_DELAY = 2  # seconds slept between retries / loop iterations
MAX_RETRIES = 3       # NOTE(review): declared but download_with_retry uses its own default of 3
MIN_FREE_SPACE_GB = 2  # Minimum free space in GB before processing

# Frame Extraction Parameters
DEFAULT_FPS = 3  # Default frames per second for extraction


# Initialize HF API
hf_api = HfApi(token=HF_TOKEN)

# Global State
# In-process status snapshot: mutated by the worker loop, read by whatever
# frontend imports this module (exported via __all__).
processing_status = {
    "is_running": False,     # guards against starting the loop twice
    "current_file": None,    # basename of the archive currently being processed
    "total_files": 0,        # count of archives seen in the source repo
    "processed_files": 0,
    "failed_files": 0,
    "extracted_courses": 0,  # archives that yielded at least one uploaded zip
    "extracted_videos": 0,   # videos whose frame-zip upload succeeded
    "last_update": None,     # timestamp of the most recent log entry
    "logs": []               # rolling window of the last 100 log lines
}
65
-
66
def log_message(message: str, level: str = "INFO"):
    """Print *message* with a timestamp and record it in the status log."""
    now = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{now}] {level}: {message}"
    print(entry)
    processing_status["logs"].append(entry)
    processing_status["last_update"] = now
    # Bound memory use: keep only the most recent 100 entries.
    if len(processing_status["logs"]) > 100:
        processing_status["logs"] = processing_status["logs"][-100:]
75
-
76
def log_failed_file(filename: str, error: str):
    """Append a timestamped failure record for *filename* to FAILED_FILES_LOG.

    Bug fix: the record previously contained the literal placeholder
    "(unknown)" and never used the *filename* argument, so the log could not
    identify which file failed.
    """
    with open(FAILED_FILES_LOG, "a") as f:
        f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {filename}: {error}\n")
80
-
81
def get_disk_usage(path: str) -> Dict[str, float]:
    """Return disk-space statistics for *path* in gigabytes.

    Keys: "total", "free" (available to non-root), "used" (total - free).
    """
    vfs = os.statvfs(path)
    gib = 1024 ** 3
    total_gb = vfs.f_frsize * vfs.f_blocks / gib
    free_gb = vfs.f_frsize * vfs.f_bavail / gib
    return {"total": total_gb, "free": free_gb, "used": total_gb - free_gb}
88
-
89
def check_disk_space(path: str = ".") -> bool:
    """Return True when *path* has at least MIN_FREE_SPACE_GB free; log otherwise."""
    usage = get_disk_usage(path)
    if usage["free"] >= MIN_FREE_SPACE_GB:
        return True
    log_message(f'⚠️ Low disk space: {usage["free"]:.2f}GB free, {usage["used"]:.2f}GB used')
    return False
96
-
97
def cleanup_temp_files():
    """Delete stale archives from DOWNLOAD_FOLDER to free disk space.

    The file currently being processed (processing_status["current_file"])
    is kept; every other .rar/.zip is removed best-effort.
    """
    log_message("🧹 Cleaning up temporary files...", "INFO")

    # Clean old downloads (keep only current processing file)
    current_file = processing_status.get("current_file")
    for file in os.listdir(DOWNLOAD_FOLDER):
        if file != current_file and file.endswith((".rar", ".zip")):
            try:
                os.remove(os.path.join(DOWNLOAD_FOLDER, file))
                log_message(f"🗑️ Removed old download: {file}", "INFO")
            except OSError:
                # Bug fix: was a bare `except:` which also swallowed
                # KeyboardInterrupt/SystemExit. Cleanup stays best-effort:
                # a vanished or locked file is not fatal.
                pass
110
-
111
def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str, Any]:
    """Load persisted state from *file_path*, migrating legacy layouts.

    Returns *default_value* when the file is missing or not valid JSON.
    """
    if not os.path.exists(file_path):
        return default_value
    try:
        with open(file_path, "r") as fh:
            data = json.load(fh)
    except json.JSONDecodeError:
        log_message(f"⚠️ Corrupted state file: {file_path}", "WARNING")
        return default_value

    # --- MIGRATION LOGIC ---
    # 1. Convert old "processed_rars" list to new "file_states" dictionary
    if isinstance(data.get("processed_rars"), list):
        log_message("ℹ️ Migrating old 'processed_rars' list to new 'file_states' dictionary.", "INFO")
        data["file_states"] = {
            name: "processed" for name in data.pop("processed_rars")
        }

    # 2. Ensure file_states exists and is a dict
    if not isinstance(data.get("file_states"), dict):
        log_message("ℹ️ Initializing 'file_states' dictionary.", "INFO")
        data["file_states"] = {}

    # 3. Ensure next_download_index exists
    data.setdefault("next_download_index", 0)

    return data
139
-
140
def save_json_state(file_path: str, data: Dict[str, Any]):
    """Serialize *data* as pretty-printed JSON (indent=2) to *file_path*."""
    with open(file_path, "w") as fh:
        json.dump(data, fh, indent=2)
144
-
145
def download_hf_state(repo_id: str, filename: str) -> Dict[str, Any]:
    """Download the processing-state JSON from a Hugging Face dataset repo.

    Returns the parsed (and migrated) state, or a fresh default state when
    the file does not exist in the repo or any step of the download fails.
    """
    local_path = os.path.join(LOCAL_STATE_FOLDER, filename)
    # Default state uses 'file_states' for the new structure.
    default_state = {"next_download_index": 0, "file_states": {}}

    try:
        # Check if the file exists in the repo first
        files = hf_api.list_repo_files(repo_id=repo_id, repo_type="dataset")
        if filename not in files:
            # Bug fix: this message previously printed the literal
            # placeholder "(unknown)" instead of the state-file name.
            log_message(f"ℹ️ State file {filename} not found in {repo_id}. Starting from default state.", "INFO")
            return default_state

        # Download the file
        hf_hub_download(
            repo_id=repo_id,
            filename=filename,
            repo_type="dataset",
            local_dir=LOCAL_STATE_FOLDER,
            local_dir_use_symlinks=False
        )

        log_message(f"✅ Successfully downloaded state file from {repo_id}.", "INFO")
        # load_json_state handles migration of legacy layouts.
        return load_json_state(local_path, default_state)

    except Exception as e:
        log_message(f"⚠️ Failed to download state file from Hugging Face: {str(e)}. Starting from default state.", "WARNING")
        return default_state
174
-
175
def upload_hf_state(repo_id: str, filename: str, state: Dict[str, Any]) -> bool:
    """Persist *state* locally, then push it to the HF dataset repo.

    Returns True on success, False when saving or uploading raised.
    """
    local_path = os.path.join(LOCAL_STATE_FOLDER, filename)

    try:
        # Write the state to disk first, then ship that file to the hub.
        save_json_state(local_path, state)
        hf_api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=filename,
            repo_id=repo_id,
            repo_type="dataset",
            commit_message=f"Update processing state: next_index={state['next_download_index']}"
        )
        log_message(f"✅ Successfully uploaded updated state file to {repo_id}", "INFO")
        return True
    except Exception as e:
        log_message(f"❌ Failed to upload state file to Hugging Face: {str(e)}", "ERROR")
        return False
196
-
197
- # --- NEW LOCKING FUNCTIONS ---
198
-
199
def lock_file_for_processing(rar_filename: str, state: Dict[str, Any]) -> bool:
    """Mark *rar_filename* as 'processing' and push the state as a lock.

    Pushing the shared state file is what establishes the lock for other
    workers; on upload failure the local mutation is rolled back.
    """
    log_message(f"🔒 Attempting to lock file: {rar_filename} (Marking as 'processing')", "INFO")

    state["file_states"][rar_filename] = "processing"

    if not upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, state):
        log_message(f"❌ Failed to upload lock for file: {rar_filename}. Aborting processing.", "ERROR")
        # Revert the local state so it stays truthful after a failed upload.
        state["file_states"].pop(rar_filename, None)
        return False

    log_message(f"✅ Successfully locked file: {rar_filename}", "INFO")
    return True
216
-
217
def unlock_file_as_processed(rar_filename: str, state: Dict[str, Any], next_index: int) -> bool:
    """Mark *rar_filename* as 'processed', set the next index, push the state."""
    log_message(f"🔓 Attempting to unlock file: {rar_filename} (Marking as 'processed')", "INFO")

    state["file_states"][rar_filename] = "processed"
    state["next_download_index"] = next_index

    if not upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, state):
        log_message(f"❌ Failed to upload final state for file: {rar_filename}. The file is processed locally but state is not updated.", "ERROR")
        return False

    log_message(f"✅ Successfully unlocked and marked as processed: {rar_filename}", "INFO")
    return True
232
-
233
- # --- Original Utility Functions ---
234
-
235
def download_with_retry(url: str, dest_path: str, max_retries: int = 3) -> bool:
    """Stream *url* to *dest_path* with retry logic and disk-space guards.

    Returns True on success; False when space cannot be freed, the destination
    directory cannot be created, or every attempt fails.
    """
    if not check_disk_space():
        cleanup_temp_files()
        if not check_disk_space():
            log_message("❌ Insufficient disk space even after cleanup", "ERROR")
            return False

    # Ensure the directory structure exists before attempting to write the file.
    # Bug fix: os.makedirs("") raises FileNotFoundError when dest_path has no
    # directory component, so only create a directory when there is one.
    dest_dir = os.path.dirname(dest_path)
    if dest_dir:
        try:
            os.makedirs(dest_dir, exist_ok=True)
        except Exception as e:
            log_message(f"❌ Failed to create directory for download path {dest_dir}: {str(e)}", "ERROR")
            return False

    headers = {"Authorization": f"Bearer {HF_TOKEN}"}
    for attempt in range(max_retries):
        try:
            with requests.get(url, headers=headers, stream=True) as r:
                r.raise_for_status()

                # Check content length if available
                content_length = r.headers.get("content-length")
                if content_length:
                    size_gb = int(content_length) / (1024**3)
                    disk_info = get_disk_usage(".")
                    # Check if there is enough space for the full download
                    if disk_info["free"] < size_gb + MIN_FREE_SPACE_GB:
                        log_message(f"⚠️ Not enough space for download ({size_gb:.2f}GB required). Freeing space...", "WARNING")
                        cleanup_temp_files()
                        disk_info = get_disk_usage(".")
                        if disk_info["free"] < size_gb + MIN_FREE_SPACE_GB:
                            log_message(f"❌ Still not enough space for download. Required: {size_gb + MIN_FREE_SPACE_GB:.2f}GB, Available: {disk_info['free']:.2f}GB", "ERROR")
                            return False

                # Download the file chunk by chunk
                with open(dest_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        if chunk:  # filter out keep-alive new chunks
                            f.write(chunk)

                log_message(f"✅ Download successful: {dest_path}", "INFO")
                return True

        except requests.exceptions.RequestException as e:
            # Transient network errors are retried after a short delay.
            log_message(f"❌ Download attempt {attempt + 1} failed for {url}: {str(e)}", "WARNING")
            time.sleep(PROCESSING_DELAY)
        except Exception as e:
            # Anything non-network (e.g. I/O error writing the file) is fatal.
            log_message(f"❌ An unexpected error occurred during download: {str(e)}", "ERROR")
            return False

    log_message(f"❌ Failed to download {url} after {max_retries} attempts.", "ERROR")
    return False
288
-
289
def extract_rar(rar_path: str, extract_path: str) -> bool:
    """Extract *rar_path* into *extract_path* using the external `unrar` tool.

    Strategy: try a full `unrar x` first; when that fails because the archive
    is a later volume of a multipart set (previous volume missing), fall back
    to extracting members one at a time and accept a partial result.

    Returns True when a full or partial extraction succeeded, False otherwise.
    Requires the `unrar` binary on PATH.
    """
    log_message(f"📦 Attempting to extract RAR: {rar_path} to {extract_path}", "INFO")

    # Helper to run a command and return (success, completed_process_or_exception)
    def _run(cmd):
        try:
            proc = subprocess.run(cmd, capture_output=True, text=True, check=True)
            return True, proc
        except subprocess.CalledProcessError as e:
            return False, e

    try:
        # Create the extraction directory if it doesn't exist
        os.makedirs(extract_path, exist_ok=True)

        # First try a normal full extraction
        # -o+ : overwrite existing files; -y : answer "yes" to all prompts.
        # NOTE(review): unrar conventionally expects the destination directory
        # argument to end with a path separator — confirm extract_path handling.
        command = ["unrar", "x", "-o+", "-y", rar_path, extract_path]
        ok, result = _run(command)
        if ok:
            # Successful full extraction
            # unrar prints "All OK" on a fully clean run; anything else is
            # surfaced as a non-fatal warning.
            if hasattr(result, 'stdout') and "All OK" not in result.stdout:
                log_message(f"⚠️ RAR extraction finished with warnings/non-fatal errors for {rar_path}: {result.stdout}", "WARNING")
            log_message(f"✅ Successfully extracted RAR: {rar_path}", "INFO")
            return True

        # If full extraction failed, inspect the error to see if it's a multipart/volume dependency
        stderr = ''
        if isinstance(result, subprocess.CalledProcessError):
            stderr = (result.stderr or '')

        # Common message when a previous volume is required
        if "start extraction from a previous volume" in stderr.lower() or "previous volume" in stderr.lower() or "you need to start extraction" in stderr.lower():
            log_message(f"⚠️ Full extraction failed due to multipart volume dependency for {rar_path}. Will attempt per-file extraction fallback.", "WARNING")

            # Attempt to list files contained in this archive volume
            # ('lb' = bare listing: one member path per line on stdout).
            list_cmd = ["unrar", "lb", rar_path]
            ok_list, list_result = _run(list_cmd)
            if not ok_list:
                log_message(f"❌ Failed to list archive contents for {rar_path}: {(list_result.stderr if isinstance(list_result, subprocess.CalledProcessError) else str(list_result))}", "ERROR")
                return False

            file_list = [ln.strip() for ln in (list_result.stdout or '').splitlines() if ln.strip()]
            if not file_list:
                log_message(f"⚠️ Archive {rar_path} appears empty or listing failed. Cannot extract.", "WARNING")
                return False

            extracted_any = False
            # Try to extract each file individually; skip files that require previous volumes
            for member in file_list:
                # Use 'unrar x <archive> <member> <dest>' to extract a specific file
                cmd = ["unrar", "x", "-o+", "-y", rar_path, member, extract_path]
                ok_member, member_result = _run(cmd)
                if ok_member:
                    extracted_any = True
                    log_message(f"✅ Extracted member {member} from {rar_path}", "INFO")
                else:
                    # If this member failed due to missing previous volume, log and continue
                    member_err = ''
                    if isinstance(member_result, subprocess.CalledProcessError):
                        member_err = (member_result.stderr or '')
                    log_message(f"⚠️ Could not extract member {member} from {rar_path}: {member_err.strip()}", "WARNING")

            if extracted_any:
                log_message(f"✅ Finished partial extraction from {rar_path} (some members extracted)", "INFO")
                return True
            else:
                log_message(f"❌ No members could be extracted from {rar_path} independently.", "ERROR")
                return False

        # Otherwise, full extraction failed for another reason
        log_message(f"❌ RAR extraction failed for {rar_path}. Error: {stderr}", "ERROR")
        return False

    except FileNotFoundError:
        # Raised by subprocess when the 'unrar' binary itself is absent.
        log_message("❌ 'unrar' command not found. Please ensure 'unrar' is installed.", "ERROR")
        return False
    except Exception as e:
        log_message(f"❌ An unexpected error occurred during RAR extraction: {str(e)}", "ERROR")
        return False
369
-
370
def extract_frames(video_path: str, output_dir: str, fps: int = DEFAULT_FPS) -> bool:
    """Extracts frames from a video file at a specified FPS.

    Frames are written to *output_dir* as JPEGs (quality 95), named by their
    source frame index (frame_000000.jpg, ...).

    Args:
        video_path: Path to the source video file.
        output_dir: Directory that receives the frames (created if absent).
        fps: Target sampling rate in frames per second.

    Returns:
        True on success; False when the video cannot be opened or reports 0 FPS.
    """
    log_message(f"🎬 Extracting frames from {video_path} at {fps} FPS to {output_dir}", "INFO")

    cap = None
    try:
        # Create output directory
        os.makedirs(output_dir, exist_ok=True)

        # Open the video file
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            log_message(f"❌ Error opening video file: {video_path}", "ERROR")
            return False

        # Get video properties
        video_fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # A reported FPS of 0 means the container metadata is unusable.
        if video_fps == 0:
            log_message(f"⚠️ Video FPS is 0, cannot extract frames: {video_path}", "WARNING")
            return False

        # Sample every `frame_interval`-th source frame to approximate the
        # requested fps; extract every frame when the source is slower.
        frame_interval = max(1, int(round(video_fps / fps)))

        log_message(f"Video FPS: {video_fps:.2f}, Total Frames: {frame_count}, Extraction Interval: {frame_interval} frames", "INFO")

        frame_number = 0
        extracted_count = 0
        while True:
            # Seek to the next sampled frame, then read it.
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
            ret, frame = cap.read()
            if not ret:
                # Read failure or end of video.
                break

            # Filename encodes the source frame index, zero-padded.
            output_filename = os.path.join(output_dir, f"frame_{frame_number:06d}.jpg")
            cv2.imwrite(output_filename, frame, [cv2.IMWRITE_JPEG_QUALITY, 95])
            extracted_count += 1

            frame_number += frame_interval
            if frame_number >= frame_count:
                break

        log_message(f"✅ Finished extracting {extracted_count} frames from {video_path}", "INFO")
        return True

    except Exception as e:
        log_message(f"❌ An unexpected error occurred during frame extraction for {video_path}: {str(e)}", "ERROR")
        return False
    finally:
        # Bug fix: the capture handle was previously leaked on the early
        # error returns and on exceptions; release it on every path.
        if cap is not None:
            cap.release()
433
-
434
def zip_directory(source_dir: str, output_zip_path: str) -> bool:
    """Compress *source_dir* (as a top-level folder) into a .zip archive."""
    log_message(f"🗜️ Zipping directory: {source_dir} to {output_zip_path}", "INFO")
    try:
        # shutil.make_archive appends the ".zip" extension itself, so it is
        # given the target path without the extension; the folder is archived
        # rooted at its parent so the zip contains one top-level directory.
        archive_base = os.path.splitext(output_zip_path)[0]
        parent_dir = os.path.dirname(source_dir)
        folder_name = os.path.basename(source_dir)

        shutil.make_archive(archive_base, 'zip', parent_dir, folder_name)

        log_message(f"✅ Successfully created zip file: {output_zip_path}", "INFO")
        return True
    except Exception as e:
        log_message(f"❌ Failed to create zip file from {source_dir}: {str(e)}", "ERROR")
        return False
451
-
452
def upload_folder_to_hf(folder_path: str, repo_id: str, path_in_repo: str, commit_message: str) -> bool:
    """Upload every file under *folder_path* to a HF dataset repo in one commit.

    The directory layout relative to *folder_path* is preserved beneath
    *path_in_repo*. An empty folder counts as a successful (no-op) upload.
    """
    log_message(f"⬆️ Uploading folder {folder_path} to {repo_id}/{path_in_repo}", "INFO")
    try:
        # Build one commit operation per file, mirroring the relative layout.
        operations = []
        for root, _, names in os.walk(folder_path):
            for name in names:
                local_path = os.path.join(root, name)
                relative_path = os.path.relpath(local_path, folder_path)
                operations.append(
                    CommitOperationAdd(
                        path_in_repo=os.path.join(path_in_repo, relative_path),
                        path_or_fileobj=local_path,
                    )
                )

        if not operations:
            log_message(f"⚠️ Folder {folder_path} is empty. Skipping upload.", "WARNING")
            return True  # Consider an empty folder upload successful

        # Ship all files in a single commit.
        hf_api.create_commit(
            repo_id=repo_id,
            operations=operations,
            commit_message=commit_message,
            repo_type="dataset"
        )

        log_message(f"✅ Successfully uploaded {len(operations)} files from {folder_path}", "INFO")
        return True

    except Exception as e:
        log_message(f"❌ Failed to upload folder {folder_path} to Hugging Face: {str(e)}", "ERROR")
        return False
487
-
488
def process_rar_file(rar_path: str) -> bool:
    """
    Main processing logic for a single RAR file:
    1. Extract RAR
    2. Find video files
    3. Extract frames from each video
    4. Zip the frames folder
    5. Upload the zip file to HF
    6. Clean up local files

    Returns True when at least one video's frame-zip was uploaded; False when
    extraction fails, no videos are found, or every video fails.
    """
    rar_filename = os.path.basename(rar_path)
    base_name = os.path.splitext(rar_filename)[0]

    # 1. Extract RAR
    extract_dir = os.path.join(EXTRACT_FOLDER, base_name)
    if not extract_rar(rar_path, extract_dir):
        log_failed_file(rar_filename, "RAR extraction failed")
        return False

    video_files = []
    # Search for common video extensions recursively
    # NOTE(review): Path.rglob is case-sensitive, so e.g. "*.MP4" would be
    # missed — confirm whether upper-case extensions occur in the archives.
    for ext in ['*.mp4', '*.mkv', '*.avi', '*.mov', '*.webm']:
        video_files.extend(Path(extract_dir).rglob(ext))

    if not video_files:
        log_message(f"⚠️ No video files found in extracted content for {rar_filename}", "WARNING")
        # Clean up the extracted folder
        shutil.rmtree(extract_dir, ignore_errors=True)
        log_message(f"🗑️ Cleaned up extracted folder: {extract_dir}", "INFO")
        log_failed_file(rar_filename, "No video files found")
        return False

    # Number of videos whose frame-zip was uploaded successfully.
    success_count = 0

    for video_path_obj in video_files:
        video_path = str(video_path_obj)
        video_filename = video_path_obj.name
        video_base_name = os.path.splitext(video_filename)[0]

        # Create a unique output folder for the frames of this video
        frames_output_dir = os.path.join(FRAMES_OUTPUT_FOLDER, base_name, video_base_name)

        # 3. Extract frames
        if extract_frames(video_path, frames_output_dir):

            # 4. Zip the frames folder
            zip_filename = f"{base_name}_{video_base_name}_frames.zip"
            zip_output_path = os.path.join(ZIP_OUTPUT_FOLDER, zip_filename)

            if zip_directory(frames_output_dir, zip_output_path):

                # 5. Upload the zip file to HF
                path_in_repo = f"frames_zips/{zip_filename}"
                commit_message = f"Add frames zip for video: {video_filename} from archive: {rar_filename}"

                # We use hf_api.upload_file for single file upload
                try:
                    hf_api.upload_file(
                        path_or_fileobj=zip_output_path,
                        path_in_repo=path_in_repo,
                        repo_id=TARGET_REPO_ID,
                        repo_type="dataset",
                        commit_message=commit_message
                    )
                    log_message(f"✅ Successfully uploaded zip: {zip_filename}", "INFO")
                    success_count += 1
                    processing_status["extracted_videos"] += 1

                    # Clean up the zip file after successful upload
                    os.remove(zip_output_path)
                    log_message(f"🗑️ Cleaned up local zip file: {zip_output_path}", "INFO")

                except Exception as e:
                    # Failed upload: the zip is left on disk; only the
                    # failure is recorded.
                    log_message(f"❌ Failed to upload zip file {zip_output_path}: {str(e)}", "ERROR")
                    log_failed_file(rar_filename, f"Failed to upload zip for {video_filename}: {str(e)}")
            else:
                log_failed_file(rar_filename, f"Failed to zip frames for {video_filename}")
        else:
            log_failed_file(rar_filename, f"Failed to extract frames from {video_filename}")

        # Clean up the frames output directory for this video
        shutil.rmtree(frames_output_dir, ignore_errors=True)
        log_message(f"🗑️ Cleaned up frames folder: {frames_output_dir}", "INFO")

    # 6. Clean up the extracted folder
    shutil.rmtree(extract_dir, ignore_errors=True)
    log_message(f"🗑️ Cleaned up extracted folder: {extract_dir}", "INFO")

    if success_count > 0:
        processing_status["extracted_courses"] += 1  # Assuming one rar is one course
        return True
    else:
        log_message(f"❌ All video processing failed for {rar_filename}", "ERROR")
        return False
582
-
583
def get_next_file_to_process(repo_id: str, state: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """
    Finds the next file to process from the source repo.

    Scans the sorted .rar/.zip files starting at state['next_download_index']
    and returns the first one that is either unseen or previously 'failed'.
    Files marked 'processing' (locked by another worker) or 'processed' are
    skipped.

    Returns: { 'filename': str, 'url': str, 'index': int } or None
    """
    log_message(f"🔍 Searching for next file to process in {repo_id}", "INFO")

    try:
        # 1. List all files in the source repository
        files_list = hf_api.list_repo_files(repo_id=repo_id, repo_type="dataset")

        # 2. Filter for .rar and .zip files
        archive_files = sorted([f for f in files_list if f.endswith(('.rar', '.zip'))])

        if not archive_files:
            log_message("ℹ️ No .rar or .zip files found in the source repository.", "INFO")
            return None

        processing_status["total_files"] = len(archive_files)

        # 3. Get the next index from the state
        start_index = state.get("next_download_index", 0)

        # 4. Iterate through files starting from the index
        for index in range(start_index, len(archive_files)):
            filename = archive_files[index]

            # Check the state of the file
            file_state = state["file_states"].get(filename)

            # Only process if the file is not in the state or is marked as 'failed'
            if file_state is None or file_state == "failed":

                # Construct the download URL
                url = hf_hub_url(repo_id=repo_id, filename=filename, repo_type="dataset", subfolder=None)

                # Bug fix: the three log messages below previously printed the
                # literal placeholder "(unknown)" instead of the file name.
                log_message(f"✅ Found next file: {filename} at index {index}", "INFO")
                return {
                    'filename': filename,
                    'url': url,
                    'index': index
                }

            elif file_state == "processing":
                log_message(f"⚠️ File {filename} is currently marked as 'processing'. Skipping for now.", "WARNING")
                # Advance the index if a file is stuck in 'processing' for too long,
                # but for now, we'll just skip it and let the loop continue.

            elif file_state == "processed":
                log_message(f"ℹ️ File {filename} already processed. Skipping.", "INFO")

        log_message("ℹ️ All files up to the current index have been processed or skipped.", "INFO")

        # If we reach the end of the list, reset the index to 0 to check for new files
        if start_index >= len(archive_files):
            log_message("ℹ️ Reached end of file list. Resetting index to 0 for next loop.", "INFO")
            state["next_download_index"] = 0
            upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, state)

        return None

    except Exception as e:
        log_message(f"❌ Failed to list files from Hugging Face: {str(e)}", "ERROR")
        return None
647
-
648
def main_processing_loop():
    """The main loop that orchestrates the download, processing, and upload cycle.

    Each iteration: pull the shared state from the target repo, pick the next
    archive, lock it, download it, process it, then persist the outcome
    ('processed' or 'failed') back to the shared state. Runs until stopped,
    interrupted, or a fatal error occurs.
    """

    if processing_status["is_running"]:
        log_message("⚠️ Processing loop is already running.", "WARNING")
        return

    processing_status["is_running"] = True

    try:
        log_message("🚀 Starting main processing loop...", "INFO")

        while processing_status["is_running"]:

            # 1. Download the current state
            current_state = download_hf_state(TARGET_REPO_ID, HF_STATE_FILE)

            # 2. Find the next file to process
            next_file_info = get_next_file_to_process(SOURCE_REPO_ID, current_state)

            if next_file_info is None:
                log_message("💤 No new files to process. Sleeping for a while...", "INFO")
                time.sleep(PROCESSING_DELAY * 5)  # Sleep longer if nothing to do
                continue

            target_file = next_file_info['filename']
            rar_url = next_file_info['url']
            target_index = next_file_info['index']

            processing_status["current_file"] = target_file
            success = False

            # 3. Lock the file for processing.
            # Bug fix: the lock attempt used to live inside the try/finally
            # below, so `continue` on a failed lock still executed the finally
            # block — marking a file we never touched as 'failed' and raising
            # NameError on the not-yet-defined local download path.
            if not lock_file_for_processing(target_file, current_state):
                log_message(f"❌ Failed to lock file {target_file}. Skipping.", "ERROR")
                time.sleep(PROCESSING_DELAY)
                continue  # Start next iteration

            # Defined before the try so the finally-block cleanup can always
            # reference it safely.
            local_rar_path = os.path.join(DOWNLOAD_FOLDER, target_file)

            try:
                # 4. Download the file
                log_message(f"⬇️ Downloading file: {target_file}", "INFO")

                if download_with_retry(rar_url, local_rar_path):

                    # 5. Process the file (extraction, frame processing, zipping, uploading results, etc.)
                    if process_rar_file(local_rar_path):
                        success = True
                        log_message(f"✅ Finished all processing steps for: {target_file}", "INFO")
                    else:
                        log_message(f"❌ Processing failed for: {target_file}", "ERROR")
                else:
                    log_message(f"❌ Download failed for: {target_file}", "ERROR")

            except Exception as e:
                log_message(f"🔥 An unhandled error occurred while processing {target_file}: {str(e)}", "ERROR")
                log_failed_file(target_file, str(e))

            finally:
                # 6. Release Lock / Update State

                # The next index to check will be the one *after* the current file, regardless of success.
                next_index_to_save = target_index + 1

                # Download the latest state again before final upload to ensure we don't overwrite
                # changes made by other workers in the meantime (e.g. if they processed a file
                # that was before this one in the queue).
                current_state = download_hf_state(TARGET_REPO_ID, HF_STATE_FILE)

                if success:
                    # Mark as 'processed' and update the next_download_index
                    unlock_file_as_processed(target_file, current_state, next_index_to_save)
                    processing_status["processed_files"] += 1
                else:
                    # If processing failed, we still want to release the 'processing' lock,
                    # but we mark it as 'failed' instead of 'processed' and still advance the index.
                    log_message(f"⚠️ Processing failed for {target_file}. Marking as 'failed' and advancing index.", "WARNING")
                    current_state["file_states"][target_file] = "failed"
                    current_state["next_download_index"] = next_index_to_save
                    upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, current_state)
                    processing_status["failed_files"] += 1

                # Clean up local files
                if os.path.exists(local_rar_path):
                    os.remove(local_rar_path)
                    log_message(f"🗑️ Cleaned up local file: {local_rar_path}", "INFO")

            # Wait a bit before checking for the next file to avoid hammering the HF API
            time.sleep(PROCESSING_DELAY)

        log_message("🎉 Processing complete!", "INFO")
        log_message(f'📊 Final stats: {processing_status["extracted_courses"]} courses extracted, {processing_status["extracted_videos"]} videos processed, frames extracted', "INFO")

    except KeyboardInterrupt:
        log_message("⏹️ Processing interrupted by user", "WARNING")
    except Exception as e:
        log_message(f"❌ Fatal error: {str(e)}", "ERROR")
    finally:
        processing_status["is_running"] = False
        cleanup_temp_files()
748
-
749
- # Expose necessary functions and variables
750
- __all__ = [
751
- "main_processing_loop",
752
- "processing_status",
753
- "log_message",
754
- "extract_frames",
755
- "DEFAULT_FPS",
756
- "ensure_dir"
757
- ]
 
1
+ import os
2
+ import json
3
+ import requests
4
+ import subprocess
5
+ import shutil
6
+ import time
7
+ import re
8
+ import threading
9
+ from typing import Dict, List, Set, Optional, Any
10
+ from huggingface_hub import HfApi, list_repo_files, CommitOperationAdd, hf_hub_download, hf_hub_url
11
+
12
+ import cv2
13
+ import numpy as np
14
+ from pathlib import Path
15
+ import smtplib
16
+ from email.message import EmailMessage
17
+
18
# ==== CONFIGURATION ====
# Auth token and repo endpoints; all overridable via environment variables.
HF_TOKEN = os.getenv("HF_TOKEN", "")
SOURCE_REPO_ID = os.getenv("SOURCE_REPO", "samfred2/TGFiles")  # dataset holding the input archives
TARGET_REPO_ID = os.getenv("TARGET_REPO", "samfred2/BG4")  # New target repo for uploads

# Path Configuration
DOWNLOAD_FOLDER = "downloads"              # raw .rar/.zip downloads land here
EXTRACT_FOLDER = "extracted"               # archives are unpacked here
FRAMES_OUTPUT_FOLDER = "extracted_frames"  # per-video frame dumps (temporary)
ZIP_OUTPUT_FOLDER = "zipped_frames"  # New folder for zip files
LOCAL_STATE_FOLDER = ".state"  # Folder to temporarily store the downloaded state file

os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)
os.makedirs(EXTRACT_FOLDER, exist_ok=True)
os.makedirs(FRAMES_OUTPUT_FOLDER, exist_ok=True)
os.makedirs(ZIP_OUTPUT_FOLDER, exist_ok=True)  # Create zip output folder
os.makedirs(LOCAL_STATE_FOLDER, exist_ok=True)

# State Files
FAILED_FILES_LOG = "failed_files.log"     # append-only local failure log
HF_STATE_FILE = "processing_state2.json"  # New remote state file name

# Processing Parameters
CHUNK_SIZE = 2        # NOTE(review): defined but not referenced anywhere in this file
PROCESSING_DELAY = 2  # seconds to sleep between loop iterations / retries
MAX_RETRIES = 3       # NOTE(review): download_with_retry uses its own default, not this
MIN_FREE_SPACE_GB = 2  # Minimum free space in GB before processing

# Frame Extraction Parameters
DEFAULT_FPS = 3  # Default frames per second for extraction


# Initialize HF API
hf_api = HfApi(token=HF_TOKEN)
52
+
53
# Global State
# Shared mutable status dict: mutated by the processing loop and log_message(),
# intended to be read by an external status/UI layer.
processing_status = {
    "is_running": False,    # True while main_processing_loop is active
    "current_file": None,   # archive currently being processed (basename)
    "total_files": 0,       # archives discovered in the source repo
    "processed_files": 0,   # archives fully processed this session
    "failed_files": 0,      # archives that failed this session
    "extracted_courses": 0, # archives with at least one successfully processed video
    "extracted_videos": 0,  # videos whose frames were zipped and uploaded
    "last_update": None,    # timestamp of the most recent log entry
    "logs": []              # rolling window of the last 100 log lines
}
65
+
66
def log_message(message: str, level: str = "INFO"):
    """Record *message* to stdout and the in-memory status log.

    Also refreshes processing_status["last_update"] and keeps only the
    most recent 100 entries in processing_status["logs"].
    """
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{stamp}] {level}: {message}"
    print(entry)
    logs = processing_status["logs"]
    logs.append(entry)
    processing_status["last_update"] = stamp
    # Trim the rolling log window so memory stays bounded.
    if len(logs) > 100:
        processing_status["logs"] = logs[-100:]
75
+
76
def log_failed_file(filename: str, error: str):
    """Append a timestamped failure record for *filename* to FAILED_FILES_LOG.

    Bug fix: the original wrote a literal placeholder instead of the
    *filename* argument, which was otherwise never used.
    """
    with open(FAILED_FILES_LOG, "a") as f:
        f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {filename}: {error}\n")
80
+
81
def get_disk_usage(path: str) -> Dict[str, float]:
    """Return total/free/used space (in GB) for the filesystem holding *path*."""
    stats = os.statvfs(path)
    gib = 1024 ** 3
    total_gb = stats.f_frsize * stats.f_blocks / gib
    # f_bavail = blocks available to unprivileged users.
    free_gb = stats.f_frsize * stats.f_bavail / gib
    return {"total": total_gb, "free": free_gb, "used": total_gb - free_gb}
88
+
89
def check_disk_space(path: str = ".") -> bool:
    """Return True when at least MIN_FREE_SPACE_GB of space is free on *path*."""
    info = get_disk_usage(path)
    if info["free"] >= MIN_FREE_SPACE_GB:
        return True
    log_message(f'⚠️ Low disk space: {info["free"]:.2f}GB free, {info["used"]:.2f}GB used')
    return False
96
+
97
def cleanup_temp_files():
    """Delete stale archive downloads, keeping the file currently in flight.

    Only .rar/.zip files in DOWNLOAD_FOLDER other than
    processing_status["current_file"] are removed.
    """
    log_message("🧹 Cleaning up temporary files...", "INFO")

    # Clean old downloads (keep only current processing file)
    current_file = processing_status.get("current_file")
    for file in os.listdir(DOWNLOAD_FOLDER):
        if file != current_file and file.endswith((".rar", ".zip")):
            try:
                os.remove(os.path.join(DOWNLOAD_FOLDER, file))
                log_message(f"πŸ—‘οΈ Removed old download: {file}", "INFO")
            except OSError:
                # Fix: narrowed from a bare `except` — cleanup stays
                # best-effort for vanished/locked files, but no longer
                # swallows unrelated exceptions (e.g. KeyboardInterrupt).
                pass
110
+
111
def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str, Any]:
    """Load a state dict from *file_path*, migrating legacy layouts.

    Falls back to *default_value* when the file is missing or corrupt.
    Guarantees the result has a 'file_states' dict and a
    'next_download_index' key.
    """
    if not os.path.exists(file_path):
        return default_value

    try:
        with open(file_path, "r") as f:
            data = json.load(f)
    except json.JSONDecodeError:
        log_message(f"⚠️ Corrupted state file: {file_path}", "WARNING")
        return default_value

    # --- MIGRATION LOGIC ---
    # 1. Convert the legacy "processed_rars" list into the "file_states" map.
    legacy = data.get("processed_rars")
    if isinstance(legacy, list):
        log_message("ℹ️ Migrating old 'processed_rars' list to new 'file_states' dictionary.", "INFO")
        del data["processed_rars"]
        data["file_states"] = {name: "processed" for name in legacy}

    # 2. Guarantee 'file_states' exists and is a dictionary.
    if not isinstance(data.get("file_states"), dict):
        log_message("ℹ️ Initializing 'file_states' dictionary.", "INFO")
        data["file_states"] = {}

    # 3. Guarantee the download cursor exists.
    data.setdefault("next_download_index", 0)

    return data
139
+
140
def save_json_state(file_path: str, data: Dict[str, Any]):
    """Serialize *data* to *file_path* as pretty-printed JSON (overwrites)."""
    with open(file_path, "w") as handle:
        json.dump(data, handle, indent=2)
144
+
145
def download_hf_state(repo_id: str, filename: str) -> Dict[str, Any]:
    """Fetch the JSON state file *filename* from the *repo_id* dataset repo.

    Returns a default empty state (new 'file_states' layout) when the file
    does not exist in the repo or any step of the download fails.
    """
    local_path = os.path.join(LOCAL_STATE_FOLDER, filename)
    # Changed default state to use 'file_states' for the new structure
    default_state = {"next_download_index": 0, "file_states": {}}

    try:
        # Check if the file exists in the repo first
        files = hf_api.list_repo_files(repo_id=repo_id, repo_type="dataset")
        if filename not in files:
            # Fix: interpolate the actual filename instead of a literal placeholder.
            log_message(f"ℹ️ State file {filename} not found in {repo_id}. Starting from default state.", "INFO")
            return default_state

        # Download the file
        hf_hub_download(
            repo_id=repo_id,
            filename=filename,
            repo_type="dataset",
            local_dir=LOCAL_STATE_FOLDER,
            local_dir_use_symlinks=False
        )

        log_message(f"βœ… Successfully downloaded state file from {repo_id}.", "INFO")
        # load_json_state migrates legacy layouts and fills missing keys.
        return load_json_state(local_path, default_state)

    except Exception as e:
        log_message(f"⚠️ Failed to download state file from Hugging Face: {str(e)}. Starting from default state.", "WARNING")
        return default_state
174
+
175
def upload_hf_state(repo_id: str, filename: str, state: Dict[str, Any]) -> bool:
    """Persist *state* locally and push it to the *repo_id* dataset repo.

    Returns True on success, False when saving or uploading fails.
    """
    local_path = os.path.join(LOCAL_STATE_FOLDER, filename)

    try:
        # Write the state to disk first, then push that exact file.
        save_json_state(local_path, state)
        hf_api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=filename,
            repo_id=repo_id,
            repo_type="dataset",
            commit_message=f"Update processing state: next_index={state['next_download_index']}"
        )
    except Exception as e:
        log_message(f"❌ Failed to upload state file to Hugging Face: {str(e)}", "ERROR")
        return False

    log_message(f"βœ… Successfully uploaded updated state file to {repo_id}", "INFO")
    return True
196
+
197
+ # --- NEW LOCKING FUNCTIONS ---
198
+
199
def lock_file_for_processing(rar_filename: str, state: Dict[str, Any]) -> bool:
    """Mark *rar_filename* as 'processing' in *state* and push it as a lock.

    The lock only takes effect once the remote state reflects it; on upload
    failure the local marker is rolled back and False is returned.
    """
    log_message(f"πŸ”’ Attempting to lock file: {rar_filename} (Marking as 'processing')", "INFO")

    state["file_states"][rar_filename] = "processing"

    if not upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, state):
        log_message(f"❌ Failed to upload lock for file: {rar_filename}. Aborting processing.", "ERROR")
        # Roll back the local marker so local state matches the remote one.
        state["file_states"].pop(rar_filename, None)
        return False

    log_message(f"βœ… Successfully locked file: {rar_filename}", "INFO")
    return True
216
+
217
def unlock_file_as_processed(rar_filename: str, state: Dict[str, Any], next_index: int) -> bool:
    """Mark *rar_filename* as 'processed', advance the cursor, push the state.

    Returns True when the remote state was updated; False when the upload
    failed (the file is processed locally but the remote state is stale).
    """
    log_message(f"πŸ”“ Attempting to unlock file: {rar_filename} (Marking as 'processed')", "INFO")

    state["file_states"][rar_filename] = "processed"
    state["next_download_index"] = next_index

    if upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, state):
        log_message(f"βœ… Successfully unlocked and marked as processed: {rar_filename}", "INFO")
        return True

    log_message(f"❌ Failed to upload final state for file: {rar_filename}. The file is processed locally but state is not updated.", "ERROR")
    return False
232
+
233
+ # --- Original Utility Functions ---
234
+
235
def download_with_retry(url: str, dest_path: str, max_retries: int = 3) -> bool:
    """Stream-download *url* to *dest_path* with retries and disk-space checks.

    Verifies free space (running cleanup_temp_files() when low) before and
    during the download, retries transient HTTP errors up to *max_retries*
    times with a short delay, and returns True only after a complete write.
    """
    if not check_disk_space():
        cleanup_temp_files()
        if not check_disk_space():
            log_message("❌ Insufficient disk space even after cleanup", "ERROR")
            return False

    # Ensure the directory structure exists before attempting to write the file
    try:
        os.makedirs(os.path.dirname(dest_path), exist_ok=True)
    except Exception as e:
        log_message(f"❌ Failed to create directory for download path {os.path.dirname(dest_path)}: {str(e)}", "ERROR")
        return False

    headers = {"Authorization": f"Bearer {HF_TOKEN}"}
    for attempt in range(max_retries):
        try:
            # Fix: add a timeout so a stalled connection cannot hang the
            # worker forever (10s to connect, 60s max silence between chunks).
            # A timeout raises requests.exceptions.RequestException and is
            # retried like any other transient network error.
            with requests.get(url, headers=headers, stream=True, timeout=(10, 60)) as r:
                r.raise_for_status()

                # Check content length if available
                content_length = r.headers.get("content-length")
                if content_length:
                    size_gb = int(content_length) / (1024**3)
                    disk_info = get_disk_usage(".")
                    # Check if there is enough space for the full download
                    if disk_info["free"] < size_gb + MIN_FREE_SPACE_GB:
                        log_message(f"⚠️ Not enough space for download ({size_gb:.2f}GB required). Freeing space...", "WARNING")
                        cleanup_temp_files()
                        disk_info = get_disk_usage(".")
                        if disk_info["free"] < size_gb + MIN_FREE_SPACE_GB:
                            log_message(f"❌ Still not enough space for download. Required: {size_gb + MIN_FREE_SPACE_GB:.2f}GB, Available: {disk_info['free']:.2f}GB", "ERROR")
                            return False

                # Download the file chunk by chunk
                with open(dest_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        if chunk:  # filter out keep-alive new chunks
                            f.write(chunk)

                log_message(f"βœ… Download successful: {dest_path}", "INFO")
                return True

        except requests.exceptions.RequestException as e:
            log_message(f"❌ Download attempt {attempt + 1} failed for {url}: {str(e)}", "WARNING")
            time.sleep(PROCESSING_DELAY)
        except Exception as e:
            log_message(f"❌ An unexpected error occurred during download: {str(e)}", "ERROR")
            return False

    log_message(f"❌ Failed to download {url} after {max_retries} attempts.", "ERROR")
    return False
288
+
289
def extract_rar(rar_path: str, extract_path: str) -> bool:
    """Extracts a RAR file using unrar (requires unrar to be installed).

    Tries a full extraction first. When that fails because the archive is a
    middle volume of a multipart set, falls back to extracting the members
    listed in this volume one by one, skipping members that still need a
    previous volume.

    Returns True when the full extraction succeeded or at least one member
    was extracted in the fallback path; False otherwise.
    """
    log_message(f"πŸ“¦ Attempting to extract RAR: {rar_path} to {extract_path}", "INFO")

    # Helper to run a command and return (success, completed_process_or_exception)
    def _run(cmd):
        try:
            proc = subprocess.run(cmd, capture_output=True, text=True, check=True)
            return True, proc
        except subprocess.CalledProcessError as e:
            return False, e

    try:
        # Create the extraction directory if it doesn't exist
        os.makedirs(extract_path, exist_ok=True)

        # First try a normal full extraction (-o+ overwrite, -y assume yes)
        command = ["unrar", "x", "-o+", "-y", rar_path, extract_path]
        ok, result = _run(command)
        if ok:
            # Successful full extraction; unrar prints "All OK" on clean runs
            if hasattr(result, 'stdout') and "All OK" not in result.stdout:
                log_message(f"⚠️ RAR extraction finished with warnings/non-fatal errors for {rar_path}: {result.stdout}", "WARNING")
            log_message(f"βœ… Successfully extracted RAR: {rar_path}", "INFO")
            return True

        # If full extraction failed, inspect the error to see if it's a multipart/volume dependency
        stderr = ''
        if isinstance(result, subprocess.CalledProcessError):
            stderr = (result.stderr or '')

        # Common unrar messages emitted when a previous volume is required
        if "start extraction from a previous volume" in stderr.lower() or "previous volume" in stderr.lower() or "you need to start extraction" in stderr.lower():
            log_message(f"⚠️ Full extraction failed due to multipart volume dependency for {rar_path}. Will attempt per-file extraction fallback.", "WARNING")

            # Attempt to list files contained in this archive volume
            # ('lb' = bare listing, one member path per line)
            list_cmd = ["unrar", "lb", rar_path]
            ok_list, list_result = _run(list_cmd)
            if not ok_list:
                log_message(f"❌ Failed to list archive contents for {rar_path}: {(list_result.stderr if isinstance(list_result, subprocess.CalledProcessError) else str(list_result))}", "ERROR")
                return False

            file_list = [ln.strip() for ln in (list_result.stdout or '').splitlines() if ln.strip()]
            if not file_list:
                log_message(f"⚠️ Archive {rar_path} appears empty or listing failed. Cannot extract.", "WARNING")
                return False

            extracted_any = False
            # Try to extract each file individually; skip files that require previous volumes
            for member in file_list:
                # Use 'unrar x <archive> <member> <dest>' to extract a specific file
                cmd = ["unrar", "x", "-o+", "-y", rar_path, member, extract_path]
                ok_member, member_result = _run(cmd)
                if ok_member:
                    extracted_any = True
                    log_message(f"βœ… Extracted member {member} from {rar_path}", "INFO")
                else:
                    # If this member failed due to missing previous volume, log and continue
                    member_err = ''
                    if isinstance(member_result, subprocess.CalledProcessError):
                        member_err = (member_result.stderr or '')
                    log_message(f"⚠️ Could not extract member {member} from {rar_path}: {member_err.strip()}", "WARNING")

            if extracted_any:
                log_message(f"βœ… Finished partial extraction from {rar_path} (some members extracted)", "INFO")
                return True
            else:
                log_message(f"❌ No members could be extracted from {rar_path} independently.", "ERROR")
                return False

        # Otherwise, full extraction failed for another reason
        log_message(f"❌ RAR extraction failed for {rar_path}. Error: {stderr}", "ERROR")
        return False

    except FileNotFoundError:
        # Raised by subprocess.run when the unrar binary is missing from PATH
        log_message("❌ 'unrar' command not found. Please ensure 'unrar' is installed.", "ERROR")
        return False
    except Exception as e:
        log_message(f"❌ An unexpected error occurred during RAR extraction: {str(e)}", "ERROR")
        return False
369
+
370
def extract_frames(video_path: str, output_dir: str, fps: int = DEFAULT_FPS) -> bool:
    """Extracts frames from a video file at a specified FPS.

    Frames are written to *output_dir* as JPEGs (quality 95) named
    frame_<N>.jpg where <N> is the source frame index — always a multiple
    of the computed extraction interval.

    Returns True on success; False when the video cannot be opened or its
    reported FPS is 0.
    """
    log_message(f"🎬 Extracting frames from {video_path} at {fps} FPS to {output_dir}", "INFO")

    try:
        # Create output directory
        os.makedirs(output_dir, exist_ok=True)

        # Open the video file
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            log_message(f"❌ Error opening video file: {video_path}", "ERROR")
            return False

        # Get video properties
        video_fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Calculate the frame interval for the desired FPS
        if video_fps == 0:
            # Some containers/codecs report 0 FPS; we cannot derive an interval
            log_message(f"⚠️ Video FPS is 0, cannot extract frames: {video_path}", "WARNING")
            return False

        frame_interval = int(round(video_fps / fps))
        if frame_interval < 1:
            frame_interval = 1  # Extract every frame if video FPS is lower than desired FPS

        log_message(f"Video FPS: {video_fps:.2f}, Total Frames: {frame_count}, Extraction Interval: {frame_interval} frames", "INFO")

        frame_number = 0
        extracted_count = 0
        while True:
            # Seek to the target frame before each read.
            # NOTE(review): per-frame seeking via CAP_PROP_POS_FRAMES can be
            # slow and codec-dependent; a sequential read that skips frames
            # may be faster — confirm before changing, since the output
            # filenames (source frame indices) must stay stable.
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)

            # Read the frame
            ret, frame = cap.read()

            # Break the loop if reading failed or end of video
            if not ret:
                break

            # Construct the output filename
            output_filename = os.path.join(output_dir, f"frame_{frame_number:06d}.jpg")

            # Save the frame
            cv2.imwrite(output_filename, frame, [cv2.IMWRITE_JPEG_QUALITY, 95])
            extracted_count += 1

            # Advance the frame number by the interval
            frame_number += frame_interval

            # Break if we've gone past the total frame count
            if frame_number >= frame_count:
                break

        cap.release()
        log_message(f"βœ… Finished extracting {extracted_count} frames from {video_path}", "INFO")
        return True

    except Exception as e:
        log_message(f"❌ An unexpected error occurred during frame extraction for {video_path}: {str(e)}", "ERROR")
        return False
433
+
434
def zip_directory(source_dir: str, output_zip_path: str) -> bool:
    """Compress *source_dir* (as a top-level folder) into a .zip archive."""
    log_message(f"πŸ—œοΈ Zipping directory: {source_dir} to {output_zip_path}", "INFO")
    try:
        # shutil.make_archive wants the target name WITHOUT the .zip suffix,
        # and archives base_dir relative to root_dir so the folder name is
        # preserved inside the archive.
        archive_base, _ = os.path.splitext(output_zip_path)
        shutil.make_archive(
            archive_base,
            'zip',
            os.path.dirname(source_dir),
            os.path.basename(source_dir),
        )
    except Exception as e:
        log_message(f"❌ Failed to create zip file from {source_dir}: {str(e)}", "ERROR")
        return False

    log_message(f"βœ… Successfully created zip file: {output_zip_path}", "INFO")
    return True
451
+
452
def upload_folder_to_hf(folder_path: str, repo_id: str, path_in_repo: str, commit_message: str) -> bool:
    """Uploads an entire folder's contents to a Hugging Face repository.

    Every file under *folder_path* is committed in a single commit, mirrored
    under *path_in_repo*. Returns True on success (an empty folder counts
    as success), False on any error.
    """
    import posixpath  # HF repo paths are always '/'-separated

    log_message(f"⬆️ Uploading folder {folder_path} to {repo_id}/{path_in_repo}", "INFO")
    try:
        # Collect all files to be uploaded
        operations = []
        for root, _, files in os.walk(folder_path):
            for file in files:
                local_path = os.path.join(root, file)
                # Calculate the path inside the repository.
                # Fix: build the repo path with POSIX separators. The original
                # used os.path.join/relpath directly, which produces
                # backslashes on Windows — invalid as HF repo paths.
                relative_path = os.path.relpath(local_path, folder_path).replace(os.sep, "/")
                repo_path = posixpath.join(path_in_repo, relative_path)

                operations.append(
                    CommitOperationAdd(path_in_repo=repo_path, path_or_fileobj=local_path)
                )

        if not operations:
            log_message(f"⚠️ Folder {folder_path} is empty. Skipping upload.", "WARNING")
            return True  # Consider an empty folder upload successful

        # Perform the upload as one atomic commit
        hf_api.create_commit(
            repo_id=repo_id,
            operations=operations,
            commit_message=commit_message,
            repo_type="dataset"
        )

        log_message(f"βœ… Successfully uploaded {len(operations)} files from {folder_path}", "INFO")
        return True

    except Exception as e:
        log_message(f"❌ Failed to upload folder {folder_path} to Hugging Face: {str(e)}", "ERROR")
        return False
487
+
488
def process_rar_file(rar_path: str) -> bool:
    """
    Run the full pipeline for one downloaded archive:
    1. Extract RAR into EXTRACT_FOLDER/<archive-name>/
    2. Find video files (mp4/mkv/avi/mov/webm, recursive)
    3. Extract frames from each video
    4. Zip each video's frames folder
    5. Upload the zip file to TARGET_REPO_ID under frames_zips/
    6. Clean up local files

    Returns True when at least one video was processed end-to-end;
    failures are recorded via log_failed_file().
    """
    rar_filename = os.path.basename(rar_path)
    base_name = os.path.splitext(rar_filename)[0]

    # 1. Extract RAR
    extract_dir = os.path.join(EXTRACT_FOLDER, base_name)
    if not extract_rar(rar_path, extract_dir):
        log_failed_file(rar_filename, "RAR extraction failed")
        return False

    video_files = []
    # Search for common video extensions recursively
    for ext in ['*.mp4', '*.mkv', '*.avi', '*.mov', '*.webm']:
        video_files.extend(Path(extract_dir).rglob(ext))

    if not video_files:
        log_message(f"⚠️ No video files found in extracted content for {rar_filename}", "WARNING")
        # Clean up the extracted folder
        shutil.rmtree(extract_dir, ignore_errors=True)
        log_message(f"πŸ—‘οΈ Cleaned up extracted folder: {extract_dir}", "INFO")
        log_failed_file(rar_filename, "No video files found")
        return False

    success_count = 0  # videos that made it through frames -> zip -> upload

    for video_path_obj in video_files:
        video_path = str(video_path_obj)
        video_filename = video_path_obj.name
        video_base_name = os.path.splitext(video_filename)[0]

        # Create a unique output folder for the frames of this video
        frames_output_dir = os.path.join(FRAMES_OUTPUT_FOLDER, base_name, video_base_name)

        # 3. Extract frames
        if extract_frames(video_path, frames_output_dir):

            # 4. Zip the frames folder
            zip_filename = f"{base_name}_{video_base_name}_frames.zip"
            zip_output_path = os.path.join(ZIP_OUTPUT_FOLDER, zip_filename)

            if zip_directory(frames_output_dir, zip_output_path):

                # 5. Upload the zip file to HF
                path_in_repo = f"frames_zips/{zip_filename}"
                commit_message = f"Add frames zip for video: {video_filename} from archive: {rar_filename}"

                # We use hf_api.upload_file for single file upload
                try:
                    hf_api.upload_file(
                        path_or_fileobj=zip_output_path,
                        path_in_repo=path_in_repo,
                        repo_id=TARGET_REPO_ID,
                        repo_type="dataset",
                        commit_message=commit_message
                    )
                    log_message(f"βœ… Successfully uploaded zip: {zip_filename}", "INFO")
                    success_count += 1
                    processing_status["extracted_videos"] += 1

                    # Clean up the zip file after successful upload
                    # (on upload failure the zip is intentionally left on disk)
                    os.remove(zip_output_path)
                    log_message(f"πŸ—‘οΈ Cleaned up local zip file: {zip_output_path}", "INFO")

                except Exception as e:
                    log_message(f"❌ Failed to upload zip file {zip_output_path}: {str(e)}", "ERROR")
                    log_failed_file(rar_filename, f"Failed to upload zip for {video_filename}: {str(e)}")
            else:
                log_failed_file(rar_filename, f"Failed to zip frames for {video_filename}")
        else:
            log_failed_file(rar_filename, f"Failed to extract frames from {video_filename}")

        # Clean up the frames output directory for this video
        shutil.rmtree(frames_output_dir, ignore_errors=True)
        log_message(f"πŸ—‘οΈ Cleaned up frames folder: {frames_output_dir}", "INFO")

    # 6. Clean up the extracted folder
    shutil.rmtree(extract_dir, ignore_errors=True)
    log_message(f"πŸ—‘οΈ Cleaned up extracted folder: {extract_dir}", "INFO")

    if success_count > 0:
        processing_status["extracted_courses"] += 1  # Assuming one rar is one course
        return True
    else:
        log_message(f"❌ All video processing failed for {rar_filename}", "ERROR")
        return False
582
+
583
def get_next_file_to_process(repo_id: str, state: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """
    Finds the next file to process from the source repo.

    Scans the sorted .rar/.zip files starting at state["next_download_index"]
    and returns the first one that is unseen or previously 'failed'; files
    marked 'processing' or 'processed' are skipped.

    Returns: { 'filename': str, 'url': str, 'index': int } or None
    """
    log_message(f"πŸ” Searching for next file to process in {repo_id}", "INFO")

    try:
        # 1. List all files in the source repository
        files_list = hf_api.list_repo_files(repo_id=repo_id, repo_type="dataset")

        # 2. Filter for .rar and .zip files
        archive_files = sorted([f for f in files_list if f.endswith(('.rar', '.zip'))])

        if not archive_files:
            log_message("ℹ️ No .rar or .zip files found in the source repository.", "INFO")
            return None

        processing_status["total_files"] = len(archive_files)

        # 3. Get the next index from the state
        start_index = state.get("next_download_index", 0)

        # 4. Iterate through files starting from the index
        for index in range(start_index, len(archive_files)):
            filename = archive_files[index]

            # Check the state of the file
            file_state = state["file_states"].get(filename)

            # Only process if the file is not in the state or is marked as 'failed'
            if file_state is None or file_state == "failed":

                # Construct the download URL
                url = hf_hub_url(repo_id=repo_id, filename=filename, repo_type="dataset", subfolder=None)

                # Fix: these three log messages now interpolate the actual
                # filename instead of a literal placeholder.
                log_message(f"βœ… Found next file: {filename} at index {index}", "INFO")
                return {
                    'filename': filename,
                    'url': url,
                    'index': index
                }

            elif file_state == "processing":
                log_message(f"⚠️ File {filename} is currently marked as 'processing'. Skipping for now.", "WARNING")
                # Advance the index if a file is stuck in 'processing' for too long,
                # but for now, we'll just skip it and let the loop continue.

            elif file_state == "processed":
                log_message(f"ℹ️ File {filename} already processed. Skipping.", "INFO")

        log_message("ℹ️ All files up to the current index have been processed or skipped.", "INFO")

        # If we reach the end of the list, reset the index to 0 to check for new files
        if start_index >= len(archive_files):
            log_message("ℹ️ Reached end of file list. Resetting index to 0 for next loop.", "INFO")
            state["next_download_index"] = 0
            upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, state)

        return None

    except Exception as e:
        log_message(f"❌ Failed to list files from Hugging Face: {str(e)}", "ERROR")
        return None
647
+
648
def main_processing_loop():
    """The main loop that orchestrates the download, processing, and upload cycle.

    Each iteration: pull the remote state, pick the next unprocessed archive,
    lock it, download it, run process_rar_file(), then (in the finally block)
    release the lock by marking the file 'processed' or 'failed', advance the
    index, and remove the local archive.
    """

    if processing_status["is_running"]:
        log_message("⚠️ Processing loop is already running.", "WARNING")
        return

    processing_status["is_running"] = True

    try:
        log_message("πŸš€ Starting main processing loop...", "INFO")

        while processing_status["is_running"]:

            # 1. Download the current state
            current_state = download_hf_state(TARGET_REPO_ID, HF_STATE_FILE)

            # 2. Find the next file to process
            next_file_info = get_next_file_to_process(SOURCE_REPO_ID, current_state)

            if next_file_info is None:
                log_message("πŸ’€ No new files to process. Sleeping for a while...", "INFO")
                time.sleep(PROCESSING_DELAY * 5)  # Sleep longer if nothing to do
                continue

            target_file = next_file_info['filename']
            rar_url = next_file_info['url']
            target_index = next_file_info['index']

            processing_status["current_file"] = target_file
            success = False
            # Fix: compute the download path BEFORE entering the try block.
            # It was previously assigned inside the try after the lock step,
            # so the finally-block cleanup raised NameError whenever locking
            # failed (finally runs even on `continue`).
            local_rar_path = os.path.join(DOWNLOAD_FOLDER, target_file)

            try:
                # 3. Lock the file for processing
                if not lock_file_for_processing(target_file, current_state):
                    log_message(f"❌ Failed to lock file {target_file}. Skipping.", "ERROR")
                    time.sleep(PROCESSING_DELAY)
                    # NOTE: the finally block still runs here and will mark
                    # this file as 'failed' and advance the index.
                    continue

                # 4. Download the file
                log_message(f"⬇️ Downloading file: {target_file}", "INFO")

                if download_with_retry(rar_url, local_rar_path):

                    # 5. Process the file (extraction, frame processing, zipping, uploading results, etc.)
                    if process_rar_file(local_rar_path):
                        success = True
                        log_message(f"βœ… Finished all processing steps for: {target_file}", "INFO")
                    else:
                        log_message(f"❌ Processing failed for: {target_file}", "ERROR")
                else:
                    log_message(f"❌ Download failed for: {target_file}", "ERROR")

            except Exception as e:
                log_message(f"πŸ”₯ An unhandled error occurred while processing {target_file}: {str(e)}", "ERROR")
                log_failed_file(target_file, str(e))

            finally:
                # 6. Release Lock / Update State

                # The next index to check will be the one *after* the current file, regardless of success.
                next_index_to_save = target_index + 1

                # Re-download the latest state before the final upload so we
                # don't clobber changes made by other workers in the meantime.
                current_state = download_hf_state(TARGET_REPO_ID, HF_STATE_FILE)

                if success:
                    # Mark as 'processed' and update the next_download_index
                    unlock_file_as_processed(target_file, current_state, next_index_to_save)
                    processing_status["processed_files"] += 1
                else:
                    # Release the 'processing' lock as 'failed' and still advance the index.
                    log_message(f"⚠️ Processing failed for {target_file}. Marking as 'failed' and advancing index.", "WARNING")
                    current_state["file_states"][target_file] = "failed"
                    current_state["next_download_index"] = next_index_to_save
                    upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, current_state)
                    processing_status["failed_files"] += 1

                # Clean up local files
                if os.path.exists(local_rar_path):
                    os.remove(local_rar_path)
                    log_message(f"πŸ—‘οΈ Cleaned up local file: {local_rar_path}", "INFO")

            # Wait a bit before checking for the next file to avoid hammering the HF API
            time.sleep(PROCESSING_DELAY)

        log_message("πŸŽ‰ Processing complete!", "INFO")
        log_message(f'πŸ“Š Final stats: {processing_status["extracted_courses"]} courses extracted, {processing_status["extracted_videos"]} videos processed, frames extracted', "INFO")

    except KeyboardInterrupt:
        log_message("⏹️ Processing interrupted by user", "WARNING")
    except Exception as e:
        log_message(f"❌ Fatal error: {str(e)}", "ERROR")
    finally:
        processing_status["is_running"] = False
        cleanup_temp_files()
748
+
749
+ # Expose necessary functions and variables
750
+ __all__ = [
751
+ "main_processing_loop",
752
+ "processing_status",
753
+ "log_message",
754
+ "extract_frames",
755
+ "DEFAULT_FPS",
756
+ "ensure_dir"
757
+ ]