# Hugging Face Spaces file-viewer metadata (not part of the program):
#   Space status: Sleeping
#   File size: 16,933 bytes
#   Commits referenced: eb80f12, 3648def
import os
import json
import requests
import subprocess
import shutil
import time
import re
import threading
from typing import Dict, List, Set, Optional
from huggingface_hub import HfApi, list_repo_files
import cv2
import numpy as np
from pathlib import Path
import smtplib
from email.message import EmailMessage
# ==== CONFIGURATION ====
# Hugging Face access token and source dataset repository. Both can be
# overridden through the environment (HF_TOKEN, SOURCE_REPO).
HF_TOKEN = os.environ.get("HF_TOKEN", "")
SOURCE_REPO_ID = os.environ.get("SOURCE_REPO", "Fred808/BG1")

# Working directories, relative to the current working directory.
DOWNLOAD_FOLDER = "downloads"
EXTRACT_FOLDER = "extracted"
FRAMES_OUTPUT_FOLDER = "extracted_frames"

# Create the working directories at import time so the functions below can
# assume they exist.
for _folder in (DOWNLOAD_FOLDER, EXTRACT_FOLDER, FRAMES_OUTPUT_FOLDER):
    os.makedirs(_folder, exist_ok=True)
del _folder

# Files that persist progress and failures between runs.
DOWNLOAD_STATE_FILE = "download_progress.json"
PROCESS_STATE_FILE = "process_progress.json"
FAILED_FILES_LOG = "failed_files.log"

# Processing parameters.
# NOTE(review): CHUNK_SIZE, PROCESSING_DELAY and MAX_RETRIES are not referenced
# in the visible part of this file; they may be used elsewhere.
CHUNK_SIZE = 1
PROCESSING_DELAY = 2
MAX_RETRIES = 3
MIN_FREE_SPACE_GB = 2  # free space (GB) required before a download starts

# Frame extraction parameters.
DEFAULT_FPS = 3  # frames saved per second of video unless a caller overrides it

# Shared Hugging Face Hub client, authenticated with HF_TOKEN.
hf_api = HfApi(token=HF_TOKEN)
# Global State
# Mutable status record shared by the functions in this module. It is updated
# in place by log_message, process_rar_file and main_processing_loop.
processing_status = dict(
    is_running=False,      # True while main_processing_loop is executing
    current_file=None,     # name of the archive currently being processed
    total_files=0,         # number of .rar files found in the source repo
    processed_files=0,     # archives processed (or skipped as already done)
    failed_files=0,        # archives whose download or processing failed
    extracted_courses=0,   # archives successfully extracted
    extracted_videos=0,    # videos passed through frame extraction
    last_update=None,      # timestamp string of the most recent log entry
    logs=[],               # most recent log lines (trimmed by log_message)
)
def log_message(message: str):
    """Print a timestamped log line and record it in ``processing_status``.

    The line is appended to ``processing_status["logs"]``, which is trimmed to
    its 100 most recent entries, and ``processing_status["last_update"]`` is
    set to the timestamp of this message.
    """
    now = time.strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{now}] {message}"
    print(line)

    history = processing_status["logs"]
    history.append(line)
    processing_status["last_update"] = now

    # Keep the in-memory log bounded: only the latest 100 lines are retained.
    if len(history) > 100:
        processing_status["logs"] = history[-100:]
def log_failed_file(filename: str, error: str):
    """Append a timestamped failure record for ``filename`` to ``FAILED_FILES_LOG``."""
    stamp = time.strftime('%Y-%m-%d %H:%M:%S')
    record = f"{stamp} - {filename}: {error}\n"
    with open(FAILED_FILES_LOG, "a") as failures:
        failures.write(record)
def get_disk_usage(path: str) -> Dict[str, float]:
    """Return ``total``, ``free`` and ``used`` space, in GB, for the filesystem holding ``path``.

    ``free`` is based on ``f_bavail`` (blocks available to the calling user),
    and ``used`` is simply ``total - free``. Relies on ``os.statvfs``.
    """
    fs = os.statvfs(path)
    bytes_per_gb = 1024 ** 3
    total_gb = fs.f_frsize * fs.f_blocks / bytes_per_gb
    free_gb = fs.f_frsize * fs.f_bavail / bytes_per_gb
    return {"total": total_gb, "free": free_gb, "used": total_gb - free_gb}
def check_disk_space(path: str = ".") -> bool:
    """Return True when the filesystem holding ``path`` has at least ``MIN_FREE_SPACE_GB`` free.

    A warning with the free and used figures is logged when space is low.
    """
    usage = get_disk_usage(path)
    free_gb = usage["free"]
    used_gb = usage["used"]

    running_low = free_gb < MIN_FREE_SPACE_GB
    if running_low:
        log_message(f'β οΈ Low disk space: {free_gb:.2f}GB free, {used_gb:.2f}GB used')
    return not running_low
def cleanup_temp_files():
    """Delete stale archives from ``DOWNLOAD_FOLDER`` to free disk space.

    Every ``.rar``/``.zip`` file in the download folder is removed except the
    one named by ``processing_status["current_file"]``. The cleanup is
    best-effort: filesystem errors are logged and never raised, so it is safe
    to call from a ``finally`` block.
    """
    log_message("π§Ή Cleaning up temporary files...")

    # Keep the archive that is being processed right now, if any.
    current_file = processing_status.get("current_file")

    try:
        candidates = os.listdir(DOWNLOAD_FOLDER)
    except OSError as e:
        log_message(f"[WARN] Could not list {DOWNLOAD_FOLDER}: {e}")
        return

    for file in candidates:
        if file == current_file or not file.endswith((".rar", ".zip")):
            continue
        try:
            os.remove(os.path.join(DOWNLOAD_FOLDER, file))
        except OSError as e:
            # Report the failure instead of hiding it, then keep going.
            log_message(f"[WARN] Could not remove {file}: {e}")
        else:
            log_message(f"ποΈ Removed old download: {file}")
def load_json_state(file_path: str, default_value):
    """Load persisted state from a JSON file.

    Args:
        file_path: Path of the JSON state file.
        default_value: Value returned when the file is missing or unreadable
            as JSON.

    Returns:
        The decoded JSON content, or ``default_value`` when the file does not
        exist or is corrupted (invalid JSON or undecodable bytes). Corruption
        is logged.
    """
    try:
        # Open directly instead of checking existence first, so a file that
        # disappears between the check and the read cannot cause a crash.
        with open(file_path, "r") as f:
            return json.load(f)
    except FileNotFoundError:
        return default_value
    except (json.JSONDecodeError, UnicodeDecodeError):
        # UnicodeDecodeError covers a state file truncated or overwritten with
        # bytes that are not valid text, which is not a JSONDecodeError.
        log_message(f"β οΈ Corrupted state file: {file_path}")
        return default_value
def save_json_state(file_path: str, data):
    """Save state to a JSON file without risking a half-written result.

    The data is serialised to ``<file_path>.tmp`` and then moved over
    ``file_path`` with ``os.replace``. If serialisation or writing fails, the
    previous state file is left untouched and the temporary file is removed.

    Args:
        file_path: Destination path of the JSON state file.
        data: JSON-serialisable object to persist.

    Raises:
        TypeError: If ``data`` is not JSON-serialisable.
        OSError: If the file cannot be written or replaced.
    """
    tmp_path = f"{file_path}.tmp"
    try:
        with open(tmp_path, "w") as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, file_path)
    finally:
        # After a successful replace the temp file is gone; this only fires
        # when something failed part-way through.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
def download_with_retry(url: str, dest_path: str, max_retries: int = 3) -> bool:
    """Download ``url`` to ``dest_path`` with retries and disk-space checks.

    Before downloading, free space is checked (with one cleanup pass if it is
    low). When the server reports a content length, the download is refused if
    it would not fit with a 0.5 GB margin. Failed attempts are retried with
    exponential back-off, and any partially written file is removed.

    Args:
        url: URL to fetch.
        dest_path: Local path the payload is written to (overwritten).
        max_retries: Maximum number of download attempts.

    Returns:
        True when the file was fully written, False otherwise.
    """
    if not check_disk_space():
        cleanup_temp_files()
        if not check_disk_space():
            log_message("β Insufficient disk space even after cleanup")
            return False

    # Only send credentials when a token is configured; an empty
    # "Bearer " header is not a valid credential.
    headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}

    for attempt in range(max_retries):
        try:
            # (connect, read) timeouts so a stalled connection raises and gets
            # retried instead of blocking forever.
            with requests.get(url, headers=headers, stream=True, timeout=(10, 60)) as r:
                r.raise_for_status()

                # Check content length if available
                content_length = r.headers.get("content-length")
                if content_length:
                    size_gb = int(content_length) / (1024**3)
                    disk_info = get_disk_usage(".")
                    if size_gb > disk_info["free"] - 0.5:  # Leave 0.5GB buffer
                        log_message(f'β File too large: {size_gb:.2f}GB, only {disk_info["free"]:.2f}GB free')
                        return False

                with open(dest_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        f.write(chunk)
            return True
        except Exception as e:
            # Drop whatever was written so a truncated archive is never
            # mistaken for a complete one.
            try:
                os.remove(dest_path)
            except OSError:
                pass  # nothing had been written yet

            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
                continue
            log_message(f"β Download failed after {max_retries} attempts: {e}")
            return False
    return False
def is_multipart_rar(filename: str) -> bool:
    """Return True when ``filename`` is a volume of a multi-part RAR set.

    A volume is recognised by a name ending in ``.part<digits>.rar``
    (case-insensitive), e.g. ``Course.part01.rar``. Names that merely contain
    ``.part`` somewhere, such as ``Course.partial.rar``, are not volumes.
    """
    return re.search(r"\.part\d+\.rar\Z", filename, re.IGNORECASE) is not None
def get_rar_part_base(filename: str) -> str:
    """Return the archive base name with its RAR suffix removed.

    Strips a trailing ``.part<digits>.rar`` or plain ``.rar`` suffix,
    case-insensitively. Only the suffix is removed, so ``.rar`` or ``.part``
    appearing earlier in the name is kept. Names without a ``.rar`` suffix are
    returned unchanged.
    """
    return re.sub(r"(\.part\d+)?\.rar\Z", "", filename, count=1, flags=re.IGNORECASE)
def extract_with_retry(rar_path: str, output_dir: str, max_retries: int = 2) -> bool:
    """Test and extract a RAR archive with ``unrar``, retrying on failure.

    For a multi-part archive the extraction is redirected to the first volume,
    which must sit next to ``rar_path``. Each attempt first runs ``unrar t``;
    only if that passes is ``unrar x -o+`` run. Attempts after the first add
    ``-kb`` to the extraction command.

    Args:
        rar_path: Path of the archive (any volume of a multi-part set).
        output_dir: Directory the contents are extracted into.
        max_retries: Maximum number of test-and-extract attempts.

    Returns:
        True when extraction exits with code 0. False when the first volume is
        missing, ``unrar`` reports exit code 10, or all attempts fail.
    """
    filename = os.path.basename(rar_path)

    # For multi-part RARs, we need the first part
    if is_multipart_rar(filename):
        part_number = re.search(r"\.part(\d+)\.rar\Z", filename, re.IGNORECASE)
        if part_number:
            # Reuse the set's own zero-padding (.part1 / .part01 / .part001)
            # rather than assuming two digits.
            first_number = "1".zfill(len(part_number.group(1)))
            first_part = (
                filename[:part_number.start(1)]
                + first_number
                + filename[part_number.end(1):]
            )
        else:
            # Name does not follow ".part<N>.rar"; fall back to the old guess.
            first_part = f"{get_rar_part_base(filename)}.part01.rar"

        first_part_path = os.path.join(os.path.dirname(rar_path), first_part)
        if not os.path.exists(first_part_path):
            log_message(f"β οΈ Multi-part RAR detected but first part not found: {first_part}")
            return False
        rar_path = first_part_path
        log_message(f"π¦ Processing multi-part RAR starting with: {first_part}")

    # A trailing separator marks the argument as a destination directory
    # for unrar rather than a file mask.
    dest_dir = os.path.join(output_dir, "")

    for attempt in range(max_retries):
        if attempt > 0:
            # Pause before every retry, including one caused by a failed test.
            time.sleep(1)
        try:
            # Test RAR first
            test_result = subprocess.run(
                ["unrar", "t", rar_path], capture_output=True, text=True
            )
            if test_result.returncode != 0:
                log_message(f"β οΈ RAR test failed: {test_result.stderr}")
                continue

            # Extract RAR
            cmd = ["unrar", "x", "-o+", rar_path, dest_dir]
            if attempt > 0:  # Try recovery on subsequent attempts
                cmd.insert(2, "-kb")
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                log_message(f"✅ Successfully extracted: {os.path.basename(rar_path)}")
                return True

            error_msg = result.stderr or result.stdout
            log_message(f"β οΈ Extraction attempt {attempt + 1} failed: {error_msg}")
            lowered = error_msg.lower()
            if "checksum error" in lowered or "crc failed" in lowered:
                log_message(f"β οΈ Data corruption detected, attempt {attempt + 1}")
            elif result.returncode == 10:
                # Retrying cannot help when the archive has nothing to extract.
                log_message("β οΈ No files to extract (exit code 10)")
                return False
            elif result.returncode == 1:
                log_message("β οΈ Non-fatal error (exit code 1)")
        except Exception as e:
            # E.g. the unrar binary is missing; log it and try again.
            log_message(f"β Extraction exception: {str(e)}")
    return False
# --- Frame Extraction Utilities ---
def ensure_dir(path):
    """Create directory ``path`` with any missing parents; do nothing if it already exists."""
    os.makedirs(name=path, exist_ok=True)
def extract_frames(video_path, output_dir, fps=DEFAULT_FPS):
    """Extract frames from a video at roughly ``fps`` frames per second.

    Every ``round(video_fps / fps)``-th frame (at least every frame) is written
    to ``output_dir`` as ``0001.png``, ``0002.png``, ... If the video does not
    report a usable frame rate, 30 fps is assumed.

    Args:
        video_path: Path of the video file to read.
        output_dir: Directory the PNG frames are written to (created if needed).
        fps: Target number of saved frames per second of video; must be > 0.

    Returns:
        The number of frames actually written. 0 if ``fps`` is invalid or the
        video cannot be opened.
    """
    log_message(f"[INFO] Extracting frames from {video_path} to {output_dir} at {fps} fps...")
    if not fps or fps <= 0:
        log_message(f"[ERROR] Invalid extraction rate: {fps} fps")
        return 0

    ensure_dir(output_dir)
    cap = cv2.VideoCapture(str(video_path))
    saved_count = 0
    try:
        if not cap.isOpened():
            log_message(f"[ERROR] Failed to open video file: {video_path}")
            return 0

        video_fps = cap.get(cv2.CAP_PROP_FPS)
        if not video_fps or not video_fps > 0:  # also rejects NaN
            video_fps = 30  # fallback if FPS is not available
            log_message(f"[WARN] Using fallback FPS: {video_fps}")

        # Clamp to 1 so a target rate above the video's own rate keeps every
        # frame instead of producing a zero interval.
        frame_interval = max(1, int(round(video_fps / fps)))

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        log_message(f"[DEBUG] Total frames in video: {total_frames}")

        frames_dir = Path(output_dir)
        frame_idx = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            if frame_idx % frame_interval == 0:
                frame_name = f"{saved_count + 1:04d}.png"
                if not cv2.imwrite(str(frames_dir / frame_name), frame):
                    # Stop at the first failed write (e.g. disk full) rather
                    # than logging the same failure for every frame.
                    log_message(f"[ERROR] Failed to write frame {frame_name} to {output_dir}")
                    break
                saved_count += 1
            frame_idx += 1
    finally:
        # Release the capture on every exit path, including errors.
        cap.release()

    log_message(f"Extracted {saved_count} frames from {video_path} to {output_dir}")
    return saved_count
def process_rar_file(rar_path: str) -> bool:
    """Extract one RAR archive and dump frames from every video it contains.

    The archive is extracted into ``EXTRACT_FOLDER/<course_name>``, replacing
    any existing directory. Each ``.mp4``/``.avi``/``.mov``/``.mkv`` file found
    is passed to ``extract_frames``, writing into
    ``FRAMES_OUTPUT_FOLDER/<course_name>_<video file name with dots as underscores>_frames``.
    ``processing_status`` counters are updated along the way.

    Args:
        rar_path: Path of the downloaded archive.

    Returns:
        True on success. False if extraction or frame processing raised; the
        failure is logged and recorded through ``log_failed_file``.
    """
    filename = os.path.basename(rar_path)
    processing_status["current_file"] = filename

    # Handle multi-part RAR naming
    if is_multipart_rar(filename):
        course_name = get_rar_part_base(filename)
    elif filename.lower().endswith(".rar"):
        # Strip only the extension; ".rar" elsewhere in the name is kept.
        course_name = filename[:-len(".rar")]
    else:
        course_name = filename
    extract_dir = os.path.join(EXTRACT_FOLDER, course_name)

    try:
        log_message(f"π Processing: {filename}")

        # Clean up any existing directory
        if os.path.exists(extract_dir):
            shutil.rmtree(extract_dir, ignore_errors=True)

        # Extract RAR
        os.makedirs(extract_dir, exist_ok=True)
        if not extract_with_retry(rar_path, extract_dir):
            raise Exception("RAR extraction failed")

        # Count extracted files and collect the videos among them
        video_exts = (".mp4", ".avi", ".mov", ".mkv")
        file_count = 0
        video_files_found = []
        for root, _dirs, files in os.walk(extract_dir):
            file_count += len(files)
            video_files_found.extend(
                os.path.join(root, name)
                for name in files
                if name.lower().endswith(video_exts)
            )

        processing_status["extracted_courses"] += 1
        log_message(f"✅ Successfully extracted '{course_name}' ({file_count} files, {len(video_files_found)} videos)")

        # Process video files for frame extraction
        for video_path in video_files_found:
            video_filename = Path(video_path).name

            # Output directory for this video's frames.
            # NOTE(review): videos with the same file name in different
            # sub-folders of one archive share this directory - confirm.
            frames_output_dir = os.path.join(
                FRAMES_OUTPUT_FOLDER,
                f"{course_name}_{video_filename.replace('.', '_')}_frames"
            )
            ensure_dir(frames_output_dir)

            frame_count = extract_frames(video_path, frames_output_dir, fps=DEFAULT_FPS)
            processing_status["extracted_videos"] += 1
            if frame_count == 0:
                log_message(f"β οΈ No frames extracted from {video_filename}")
            else:
                log_message(f"✅ {frame_count} frames extracted from {video_filename}")
        return True
    except Exception as e:
        error_msg = str(e)
        log_message(f"β Processing failed: {error_msg}")
        log_failed_file(filename, error_msg)
        return False
    finally:
        processing_status["current_file"] = None
def main_processing_loop(start_index: int = 0):
    """Process the next RAR archive from the source repo: download, extract, dump frames.

    Despite the name, each call handles at most ONE archive and then returns;
    progress is persisted so the next call continues with the following file.
    The index is advanced whether or not the archive succeeded, so a failed
    archive is not retried automatically (it is recorded in the failure log).

    Args:
        start_index: Index into the sorted list of ``.rar`` files to process.
            Values ``<= 0`` mean "use the saved ``next_download_index``".

    Side effects:
        Updates ``processing_status``, writes ``DOWNLOAD_STATE_FILE`` and
        ``PROCESS_STATE_FILE``, deletes the downloaded archive afterwards, and
        always finishes with ``cleanup_temp_files()``.
    """
    processing_status["is_running"] = True
    try:
        # Load state
        processed_rars = load_json_state(PROCESS_STATE_FILE, {"processed_rars": []})["processed_rars"]
        # NOTE(review): with no saved state the run starts at index 5, not 0 -
        # confirm that skipping the first five archives is intentional.
        download_state = load_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": 5})

        # Use start_index if provided, otherwise use the saved state
        next_index = start_index if start_index > 0 else download_state["next_download_index"]
        log_message(f"π Starting from index {next_index}")
        log_message(f"π Previously processed: {len(processed_rars)} files")

        # Get file list
        try:
            files = hf_api.list_repo_files(repo_id=SOURCE_REPO_ID, repo_type="dataset")
            rar_files = sorted(f for f in files if f.endswith(".rar"))
        except Exception as e:
            log_message(f"β Failed to get file list: {str(e)}")
            return

        processing_status["total_files"] = len(rar_files)
        log_message(f"π Found {len(rar_files)} RAR files in repository")

        if next_index >= len(rar_files):
            log_message("✅ All files have been processed!")
            return

        # Process only one file per run
        rar_file = rar_files[next_index]
        filename = os.path.basename(rar_file)

        if filename in processed_rars:
            log_message(f"βοΈ Skipping already processed: {filename}")
            processing_status["processed_files"] += 1
            # Move to next file
            next_index += 1
            save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": next_index})
            log_message(f"π Moving to next file. Progress: {next_index}/{len(rar_files)}")
            return

        # NOTE(review): only this one file is downloaded, so a multi-part
        # archive will not have its sibling volumes available - confirm.
        log_message(f"π₯ Downloading: {filename}")
        dest_path = os.path.join(DOWNLOAD_FOLDER, filename)

        # Download file
        download_url = f"https://huggingface.co/datasets/{SOURCE_REPO_ID}/resolve/main/{rar_file}"
        if download_with_retry(download_url, dest_path):
            # Process file
            if process_rar_file(dest_path):
                processed_rars.append(filename)
                save_json_state(PROCESS_STATE_FILE, {"processed_rars": processed_rars})
                log_message(f"✅ Successfully processed: {filename}")
                processing_status["processed_files"] += 1
            else:
                log_message(f"β Failed to process: {filename}")
                processing_status["failed_files"] += 1

            # Clean up downloaded file
            try:
                os.remove(dest_path)
            except OSError as e:
                # Not fatal: cleanup_temp_files() in the finally block retries.
                log_message(f"[WARN] Could not remove download {filename}: {e}")
            else:
                log_message(f"ποΈ Cleaned up download: {filename}")
        else:
            log_message(f"β Failed to download: {filename}")
            processing_status["failed_files"] += 1

        # Update download state for next run (advances even after a failure)
        next_index += 1
        save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": next_index})

        if next_index < len(rar_files):
            log_message(f"π Run the script again to process the next file: {os.path.basename(rar_files[next_index])}")
        else:
            log_message("π All files have been processed!")

        log_message("π Processing complete!")
        log_message(f'π Final stats: {processing_status["extracted_courses"]} courses extracted, {processing_status["extracted_videos"]} videos processed, frames extracted')
    except KeyboardInterrupt:
        log_message("βΉοΈ Processing interrupted by user")
    except Exception as e:
        log_message(f"β Fatal error: {str(e)}")
    finally:
        processing_status["is_running"] = False
        cleanup_temp_files()
# Public names of this module; download_api.py relies on them.
__all__ = [
    "main_processing_loop",
    "processing_status",
    "log_message",
    "extract_frames",
    "DEFAULT_FPS",
    "ensure_dir",
]
|