File size: 15,894 Bytes
de5424a 8dbc51c de5424a 03e22a7 de5424a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 |
import os
import json
import requests
import subprocess
import shutil
import time
import threading
import multiprocessing
from typing import Dict, List, Optional
from pathlib import Path
from huggingface_hub import HfApi
import uuid
import frame_extractor # Our frame extraction module
# ==== CONFIGURATION ====
HF_TOKEN = os.getenv("HF_TOKEN", "")
SOURCE_REPO_ID = os.getenv("SOURCE_REPO", "Fred808/BG1")
# Directory Configuration
UPLOAD_DIRECTORY = "./uploads"
DOWNLOAD_FOLDER = "./downloads"
EXTRACT_FOLDER = "./extracted"
MP4_OUTPUT_FOLDER = "./mp4_files"
# Create directories
for directory in [UPLOAD_DIRECTORY, DOWNLOAD_FOLDER, EXTRACT_FOLDER, MP4_OUTPUT_FOLDER]:
os.makedirs(directory, exist_ok=True)
# State Files
DOWNLOAD_STATE_FILE = "download_progress.json"
PROCESS_STATE_FILE = "process_progress.json"
FAILED_FILES_LOG = "failed_files.log"
# Processing Parameters
MAX_RETRIES = 3
MIN_FREE_SPACE_GB = 2
DEFAULT_RAR_LIMIT = 1 # Default number of RAR files to process
# Initialize HF API
hf_api = HfApi(token=HF_TOKEN) if HF_TOKEN else None
# Global State
processing_status = {
"is_running": False,
"current_file": None,
"total_files": 0,
"processed_files": 0,
"failed_files": 0,
"extracted_courses": 0,
"extracted_mp4s": 0,
"last_update": None,
"logs": []
}
# Store for uploaded MP4s with metadata (this will be managed by the API part, but needs to be accessible)
uploaded_mp4s = {}
def log_message(message: str):
"""Log messages with timestamp"""
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"[{timestamp}] {message}"
print(log_entry)
processing_status["logs"].append(log_entry)
processing_status["last_update"] = timestamp
if len(processing_status["logs"]) > 100:
processing_status["logs"] = processing_status["logs"][-100:]
def log_failed_file(filename: str, error: str):
"""Log failed files to persistent file"""
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
with open(FAILED_FILES_LOG, "a") as f:
f.write(f"{timestamp} - {filename}: {error}\n")
def get_disk_usage(path: str) -> Dict[str, float]:
"""Get disk usage statistics in GB"""
statvfs = os.statvfs(path)
total = statvfs.f_frsize * statvfs.f_blocks / (1024**3)
free = statvfs.f_frsize * statvfs.f_bavail / (1024**3)
used = total - free
return {"total": total, "free": free, "used": used}
def check_disk_space(path: str = ".") -> bool:
"""Check if there\'s enough disk space"""
disk_info = get_disk_usage(path)
if disk_info["free"] < MIN_FREE_SPACE_GB:
log_message(f'β οΈ Low disk space: {disk_info["free"]:.2f}GB free, {disk_info["used"]:.2f}GB used')
return False
return True
def cleanup_temp_files():
"""Clean up temporary files to free space"""
log_message("π§Ή Cleaning up temporary files...")
# Clean old downloads (keep only current processing file)
current_file = processing_status.get("current_file")
for file in os.listdir(DOWNLOAD_FOLDER):
if file != current_file and file.endswith((".rar", ".zip")):
try:
os.remove(os.path.join(DOWNLOAD_FOLDER, file))
log_message(f"ποΈ Removed old download: {file}")
except:
pass
def load_json_state(file_path: str, default_value):
"""Load state from JSON file"""
if os.path.exists(file_path):
try:
with open(file_path, "r") as f:
return json.load(f)
except json.JSONDecodeError:
log_message(f"β οΈ Corrupted state file: {file_path}")
return default_value
def save_json_state(file_path: str, data):
"""Save state to JSON file"""
with open(file_path, "w") as f:
json.dump(data, f, indent=2)
def download_with_retry(url: str, dest_path: str, max_retries: int = 3) -> bool:
"""Download file with retry logic and disk space checking"""
if not check_disk_space():
cleanup_temp_files()
if not check_disk_space():
log_message("β Insufficient disk space even after cleanup")
return False
headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
for attempt in range(max_retries):
try:
with requests.get(url, headers=headers, stream=True) as r:
r.raise_for_status()
# Check content length if available
content_length = r.headers.get("content-length")
if content_length:
size_gb = int(content_length) / (1024**3)
disk_info = get_disk_usage(".")
if size_gb > disk_info["free"] - 0.5: # Leave 0.5GB buffer
log_message(f'β File too large: {size_gb:.2f}GB, only {disk_info["free"]:.2f}GB free')
return False
with open(dest_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
return True
except Exception as e:
if attempt < max_retries - 1:
time.sleep(2 ** attempt)
continue
log_message(f"β Download failed after {max_retries} attempts: {e}")
return False
return False
def is_multipart_rar(filename: str) -> bool:
"""Check if this is a multi-part RAR file"""
return ".part" in filename.lower() and filename.lower().endswith(".rar")
def get_rar_part_base(filename: str) -> str:
"""Get the base name for multi-part RAR files"""
if ".part" in filename.lower():
return filename.split(".part")[0]
return filename.replace(".rar", "")
def extract_with_retry(rar_path: str, output_dir: str, max_retries: int = 2) -> bool:
"""Extract RAR with retry and recovery, handling multi-part archives"""
filename = os.path.basename(rar_path)
# For multi-part RARs, we need the first part
if is_multipart_rar(filename):
base_name = get_rar_part_base(filename)
first_part = f"{base_name}.part01.rar"
first_part_path = os.path.join(os.path.dirname(rar_path), first_part)
if not os.path.exists(first_part_path):
log_message(f"β οΈ Multi-part RAR detected but first part not found: {first_part}")
return False
rar_path = first_part_path
log_message(f"π¦ Processing multi-part RAR starting with: {first_part}")
for attempt in range(max_retries):
try:
# Test RAR first
test_cmd = ["unrar", "t", rar_path]
test_result = subprocess.run(test_cmd, capture_output=True, text=True)
if test_result.returncode != 0:
log_message(f"β οΈ RAR test failed: {test_result.stderr}")
if attempt == max_retries - 1:
return False
continue
# Extract RAR
cmd = ["unrar", "x", "-o+", rar_path, output_dir]
if attempt > 0: # Try recovery on subsequent attempts
cmd.insert(2, "-kb")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
log_message(f"β
Successfully extracted: {os.path.basename(rar_path)}")
return True
else:
error_msg = result.stderr or result.stdout
log_message(f"β οΈ Extraction attempt {attempt + 1} failed: {error_msg}")
except Exception as e:
log_message(f"β Extraction exception: {str(e)}")
if attempt == max_retries - 1:
return False
time.sleep(1)
return False
def process_rar_file(rar_path: str) -> List[Dict]:
"""Process a single RAR file - extract and find MP4 files"""
filename = os.path.basename(rar_path)
processing_status["current_file"] = filename
# Handle multi-part RAR naming
if is_multipart_rar(filename):
course_name = get_rar_part_base(filename)
else:
course_name = filename.replace(".rar", "")
# Create a unique directory for this course's extracted MP4s
course_mp4_output_dir = os.path.join(MP4_OUTPUT_FOLDER, course_name)
os.makedirs(course_mp4_output_dir, exist_ok=True)
extract_dir = os.path.join(EXTRACT_FOLDER, course_name)
mp4_files = []
try:
log_message(f"π Processing: {filename}")
# Clean up any existing directory
if os.path.exists(extract_dir):
shutil.rmtree(extract_dir, ignore_errors=True)
# Extract RAR
os.makedirs(extract_dir, exist_ok=True)
if not extract_with_retry(rar_path, extract_dir):
raise Exception("RAR extraction failed")
# Find and copy MP4 files
for root, dirs, files in os.walk(extract_dir):
for file in files:
if file.lower().endswith(".mp4"):
source_path = os.path.join(root, file)
# Use original filename for MP4 output within the course directory
dest_path = os.path.join(course_mp4_output_dir, file)
try:
shutil.copy2(source_path, dest_path)
file_info = {
"id": os.path.join(course_name, file),
"original_name": file,
"course_name": course_name,
"size": os.path.getsize(dest_path),
"path": dest_path,
"created_at": time.strftime("%Y-%m-%d %H:%M:%S")
}
mp4_files.append(file_info)
log_message(f"β
Extracted MP4: {file} -> {os.path.join(course_name, file)}")
except Exception as e:
log_message(f"β Failed to copy MP4 {file}: {e}")
# Process frame extraction for all MP4s in parallel
if mp4_files:
log_message(f"ποΈ Starting frame extraction for {len(mp4_files)} MP4 files...")
# Create frames directory for this course
frames_dir = os.path.join(MP4_OUTPUT_FOLDER, f"{course_name}_frames")
os.makedirs(frames_dir, exist_ok=True)
# Prepare arguments for frame extraction
extraction_args = [
(mp4["path"], frames_dir, 10) # 10 FPS
for mp4 in mp4_files
]
# Use multiprocessing for frame extraction
cpu_count = multiprocessing.cpu_count()
with multiprocessing.Pool(processes=cpu_count) as pool:
results = pool.map(frame_extractor.extract_frames_from_video, extraction_args)
# Log frame extraction results
total_frames = sum(count for count in results if count is not None)
log_message(f"ποΈ Extracted {total_frames} frames from {len(mp4_files)} videos using {cpu_count} CPU cores")
processing_status["extracted_courses"] += 1
processing_status["extracted_mp4s"] += len(mp4_files)
log_message(f"β
Successfully processed '{course_name}' - found {len(mp4_files)} MP4 files")
return mp4_files
except Exception as e:
error_msg = str(e)
log_message(f"β Processing failed: {error_msg}")
log_failed_file(filename, error_msg)
return []
finally:
processing_status["current_file"] = None
# Clean up extracted directory
if os.path.exists(extract_dir):
shutil.rmtree(extract_dir, ignore_errors=True)
def process_hf_files_background(start_index: int = 9, limit: int = DEFAULT_RAR_LIMIT):
"""Background task to process HuggingFace files"""
if not hf_api:
log_message("β HuggingFace API not configured (missing HF_TOKEN)")
return
processing_status["is_running"] = True
try:
# Load state
processed_rars = load_json_state(PROCESS_STATE_FILE, {"processed_rars": []})["processed_rars"]
download_state = load_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": 9})
# Use start_index if provided, otherwise use the saved state
current_index = start_index if start_index > 0 else download_state["next_download_index"]
log_message(f"π Starting processing from index {current_index} with a limit of {limit} files.")
log_message(f"π Previously processed: {len(processed_rars)} files")
# Get file list
try:
files = list(hf_api.list_repo_files(repo_id=SOURCE_REPO_ID, repo_type="dataset"))
rar_files = sorted([f for f in files if f.endswith(".rar")])
processing_status["total_files"] = len(rar_files)
log_message(f"π Found {len(rar_files)} RAR files in repository")
if current_index >= len(rar_files):
log_message("β
All files have been processed!")
return
except Exception as e:
log_message(f"β Failed to get file list: {str(e)}")
return
processed_count = 0
while processed_count < limit and current_index < len(rar_files) and processing_status["is_running"]:
rar_file = rar_files[current_index]
filename = os.path.basename(rar_file)
if filename in processed_rars:
log_message(f"βοΈ Skipping already processed: {filename}")
processing_status["processed_files"] += 1
current_index += 1
save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": current_index})
continue
log_message(f"π₯ Downloading: {filename}")
dest_path = os.path.join(DOWNLOAD_FOLDER, filename)
# Download file
download_url = f"https://huggingface.co/datasets/{SOURCE_REPO_ID}/resolve/main/{rar_file}"
if download_with_retry(download_url, dest_path):
# Process file
mp4_files = process_rar_file(dest_path)
if mp4_files:
processed_rars.append(filename)
save_json_state(PROCESS_STATE_FILE, {"processed_rars": processed_rars})
log_message(f"β
Successfully processed: {filename}")
processing_status["processed_files"] += 1
else:
log_message(f"β Failed to process: {filename}")
processing_status["failed_files"] += 1
# Clean up downloaded file
try:
os.remove(dest_path)
log_message(f"ποΈ Cleaned up download: {filename}")
except:
pass
else:
log_message(f"β Failed to download: {filename}")
processing_status["failed_files"] += 1
# Update download state for next run
current_index += 1
processed_count += 1
save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": current_index})
if current_index >= len(rar_files):
log_message("π All available RAR files have been processed!")
elif not processing_status["is_running"]:
log_message("βΉοΈ Processing stopped by request.")
else:
log_message(f"β
Processed {processed_count} RAR files. Next index to process: {current_index}")
except Exception as e:
log_message(f"β Fatal error in background processing: {str(e)}")
|