import os import json import requests import subprocess import shutil import time import threading import multiprocessing from typing import Dict, List, Optional from pathlib import Path from huggingface_hub import HfApi import uuid import frame_extractor # Our frame extraction module # ==== CONFIGURATION ==== HF_TOKEN = os.getenv("HF_TOKEN", "") SOURCE_REPO_ID = os.getenv("SOURCE_REPO", "Fred808/BG1") # Directory Configuration UPLOAD_DIRECTORY = "./uploads" DOWNLOAD_FOLDER = "./downloads" EXTRACT_FOLDER = "./extracted" MP4_OUTPUT_FOLDER = "./mp4_files" # Create directories for directory in [UPLOAD_DIRECTORY, DOWNLOAD_FOLDER, EXTRACT_FOLDER, MP4_OUTPUT_FOLDER]: os.makedirs(directory, exist_ok=True) # State Files DOWNLOAD_STATE_FILE = "download_progress.json" PROCESS_STATE_FILE = "process_progress.json" FAILED_FILES_LOG = "failed_files.log" # Processing Parameters MAX_RETRIES = 3 MIN_FREE_SPACE_GB = 2 DEFAULT_RAR_LIMIT = 1 # Default number of RAR files to process # Initialize HF API hf_api = HfApi(token=HF_TOKEN) if HF_TOKEN else None # Global State processing_status = { "is_running": False, "current_file": None, "total_files": 0, "processed_files": 0, "failed_files": 0, "extracted_courses": 0, "extracted_mp4s": 0, "last_update": None, "logs": [] } # Store for uploaded MP4s with metadata (this will be managed by the API part, but needs to be accessible) uploaded_mp4s = {} def log_message(message: str): """Log messages with timestamp""" timestamp = time.strftime("%Y-%m-%d %H:%M:%S") log_entry = f"[{timestamp}] {message}" print(log_entry) processing_status["logs"].append(log_entry) processing_status["last_update"] = timestamp if len(processing_status["logs"]) > 100: processing_status["logs"] = processing_status["logs"][-100:] def log_failed_file(filename: str, error: str): """Log failed files to persistent file""" timestamp = time.strftime("%Y-%m-%d %H:%M:%S") with open(FAILED_FILES_LOG, "a") as f: f.write(f"{timestamp} - {filename}: {error}\n") def get_disk_usage(path: str) -> Dict[str, float]: """Get disk usage statistics in GB""" statvfs = os.statvfs(path) total = statvfs.f_frsize * statvfs.f_blocks / (1024**3) free = statvfs.f_frsize * statvfs.f_bavail / (1024**3) used = total - free return {"total": total, "free": free, "used": used} def check_disk_space(path: str = ".") -> bool: """Check if there\'s enough disk space""" disk_info = get_disk_usage(path) if disk_info["free"] < MIN_FREE_SPACE_GB: log_message(f'โš ๏ธ Low disk space: {disk_info["free"]:.2f}GB free, {disk_info["used"]:.2f}GB used') return False return True def cleanup_temp_files(): """Clean up temporary files to free space""" log_message("๐Ÿงน Cleaning up temporary files...") # Clean old downloads (keep only current processing file) current_file = processing_status.get("current_file") for file in os.listdir(DOWNLOAD_FOLDER): if file != current_file and file.endswith((".rar", ".zip")): try: os.remove(os.path.join(DOWNLOAD_FOLDER, file)) log_message(f"๐Ÿ—‘๏ธ Removed old download: {file}") except: pass def load_json_state(file_path: str, default_value): """Load state from JSON file""" if os.path.exists(file_path): try: with open(file_path, "r") as f: return json.load(f) except json.JSONDecodeError: log_message(f"โš ๏ธ Corrupted state file: {file_path}") return default_value def save_json_state(file_path: str, data): """Save state to JSON file""" with open(file_path, "w") as f: json.dump(data, f, indent=2) def download_with_retry(url: str, dest_path: str, max_retries: int = 3) -> bool: """Download file with retry logic and disk space checking""" if not check_disk_space(): cleanup_temp_files() if not check_disk_space(): log_message("โŒ Insufficient disk space even after cleanup") return False headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {} for attempt in range(max_retries): try: with requests.get(url, headers=headers, stream=True) as r: r.raise_for_status() # Check content length if available content_length = r.headers.get("content-length") if content_length: size_gb = int(content_length) / (1024**3) disk_info = get_disk_usage(".") if size_gb > disk_info["free"] - 0.5: # Leave 0.5GB buffer log_message(f'โŒ File too large: {size_gb:.2f}GB, only {disk_info["free"]:.2f}GB free') return False with open(dest_path, "wb") as f: for chunk in r.iter_content(chunk_size=8192): f.write(chunk) return True except Exception as e: if attempt < max_retries - 1: time.sleep(2 ** attempt) continue log_message(f"โŒ Download failed after {max_retries} attempts: {e}") return False return False def is_multipart_rar(filename: str) -> bool: """Check if this is a multi-part RAR file""" return ".part" in filename.lower() and filename.lower().endswith(".rar") def get_rar_part_base(filename: str) -> str: """Get the base name for multi-part RAR files""" if ".part" in filename.lower(): return filename.split(".part")[0] return filename.replace(".rar", "") def extract_with_retry(rar_path: str, output_dir: str, max_retries: int = 2) -> bool: """Extract RAR with retry and recovery, handling multi-part archives""" filename = os.path.basename(rar_path) # For multi-part RARs, we need the first part if is_multipart_rar(filename): base_name = get_rar_part_base(filename) first_part = f"{base_name}.part01.rar" first_part_path = os.path.join(os.path.dirname(rar_path), first_part) if not os.path.exists(first_part_path): log_message(f"โš ๏ธ Multi-part RAR detected but first part not found: {first_part}") return False rar_path = first_part_path log_message(f"๐Ÿ“ฆ Processing multi-part RAR starting with: {first_part}") for attempt in range(max_retries): try: # Test RAR first test_cmd = ["unrar", "t", rar_path] test_result = subprocess.run(test_cmd, capture_output=True, text=True) if test_result.returncode != 0: log_message(f"โš ๏ธ RAR test failed: {test_result.stderr}") if attempt == max_retries - 1: return False continue # Extract RAR cmd = ["unrar", "x", "-o+", rar_path, output_dir] if attempt > 0: # Try recovery on subsequent attempts cmd.insert(2, "-kb") result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode == 0: log_message(f"โœ… Successfully extracted: {os.path.basename(rar_path)}") return True else: error_msg = result.stderr or result.stdout log_message(f"โš ๏ธ Extraction attempt {attempt + 1} failed: {error_msg}") except Exception as e: log_message(f"โŒ Extraction exception: {str(e)}") if attempt == max_retries - 1: return False time.sleep(1) return False def process_rar_file(rar_path: str) -> List[Dict]: """Process a single RAR file - extract and find MP4 files""" filename = os.path.basename(rar_path) processing_status["current_file"] = filename # Handle multi-part RAR naming if is_multipart_rar(filename): course_name = get_rar_part_base(filename) else: course_name = filename.replace(".rar", "") # Create a unique directory for this course's extracted MP4s course_mp4_output_dir = os.path.join(MP4_OUTPUT_FOLDER, course_name) os.makedirs(course_mp4_output_dir, exist_ok=True) extract_dir = os.path.join(EXTRACT_FOLDER, course_name) mp4_files = [] try: log_message(f"๐Ÿ”„ Processing: {filename}") # Clean up any existing directory if os.path.exists(extract_dir): shutil.rmtree(extract_dir, ignore_errors=True) # Extract RAR os.makedirs(extract_dir, exist_ok=True) if not extract_with_retry(rar_path, extract_dir): raise Exception("RAR extraction failed") # Find and copy MP4 files for root, dirs, files in os.walk(extract_dir): for file in files: if file.lower().endswith(".mp4"): source_path = os.path.join(root, file) # Use original filename for MP4 output within the course directory dest_path = os.path.join(course_mp4_output_dir, file) try: shutil.copy2(source_path, dest_path) file_info = { "id": os.path.join(course_name, file), "original_name": file, "course_name": course_name, "size": os.path.getsize(dest_path), "path": dest_path, "created_at": time.strftime("%Y-%m-%d %H:%M:%S") } mp4_files.append(file_info) log_message(f"โœ… Extracted MP4: {file} -> {os.path.join(course_name, file)}") except Exception as e: log_message(f"โŒ Failed to copy MP4 {file}: {e}") # Process frame extraction for all MP4s in parallel if mp4_files: log_message(f"๐ŸŽž๏ธ Starting frame extraction for {len(mp4_files)} MP4 files...") # Create frames directory for this course frames_dir = os.path.join(MP4_OUTPUT_FOLDER, f"{course_name}_frames") os.makedirs(frames_dir, exist_ok=True) # Prepare arguments for frame extraction extraction_args = [ (mp4["path"], frames_dir, 10) # 10 FPS for mp4 in mp4_files ] # Use multiprocessing for frame extraction cpu_count = multiprocessing.cpu_count() with multiprocessing.Pool(processes=cpu_count) as pool: results = pool.map(frame_extractor.extract_frames_from_video, extraction_args) # Log frame extraction results total_frames = sum(count for count in results if count is not None) log_message(f"๐ŸŽž๏ธ Extracted {total_frames} frames from {len(mp4_files)} videos using {cpu_count} CPU cores") processing_status["extracted_courses"] += 1 processing_status["extracted_mp4s"] += len(mp4_files) log_message(f"โœ… Successfully processed '{course_name}' - found {len(mp4_files)} MP4 files") return mp4_files except Exception as e: error_msg = str(e) log_message(f"โŒ Processing failed: {error_msg}") log_failed_file(filename, error_msg) return [] finally: processing_status["current_file"] = None # Clean up extracted directory if os.path.exists(extract_dir): shutil.rmtree(extract_dir, ignore_errors=True) def process_hf_files_background(start_index: int = 9, limit: int = DEFAULT_RAR_LIMIT): """Background task to process HuggingFace files""" if not hf_api: log_message("โŒ HuggingFace API not configured (missing HF_TOKEN)") return processing_status["is_running"] = True try: # Load state processed_rars = load_json_state(PROCESS_STATE_FILE, {"processed_rars": []})["processed_rars"] download_state = load_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": 9}) # Use start_index if provided, otherwise use the saved state current_index = start_index if start_index > 0 else download_state["next_download_index"] log_message(f"๐Ÿ“Š Starting processing from index {current_index} with a limit of {limit} files.") log_message(f"๐Ÿ“Š Previously processed: {len(processed_rars)} files") # Get file list try: files = list(hf_api.list_repo_files(repo_id=SOURCE_REPO_ID, repo_type="dataset")) rar_files = sorted([f for f in files if f.endswith(".rar")]) processing_status["total_files"] = len(rar_files) log_message(f"๐Ÿ“ Found {len(rar_files)} RAR files in repository") if current_index >= len(rar_files): log_message("โœ… All files have been processed!") return except Exception as e: log_message(f"โŒ Failed to get file list: {str(e)}") return processed_count = 0 while processed_count < limit and current_index < len(rar_files) and processing_status["is_running"]: rar_file = rar_files[current_index] filename = os.path.basename(rar_file) if filename in processed_rars: log_message(f"โญ๏ธ Skipping already processed: {filename}") processing_status["processed_files"] += 1 current_index += 1 save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": current_index}) continue log_message(f"๐Ÿ“ฅ Downloading: {filename}") dest_path = os.path.join(DOWNLOAD_FOLDER, filename) # Download file download_url = f"https://huggingface.co/datasets/{SOURCE_REPO_ID}/resolve/main/{rar_file}" if download_with_retry(download_url, dest_path): # Process file mp4_files = process_rar_file(dest_path) if mp4_files: processed_rars.append(filename) save_json_state(PROCESS_STATE_FILE, {"processed_rars": processed_rars}) log_message(f"โœ… Successfully processed: {filename}") processing_status["processed_files"] += 1 else: log_message(f"โŒ Failed to process: {filename}") processing_status["failed_files"] += 1 # Clean up downloaded file try: os.remove(dest_path) log_message(f"๐Ÿ—‘๏ธ Cleaned up download: {filename}") except: pass else: log_message(f"โŒ Failed to download: {filename}") processing_status["failed_files"] += 1 # Update download state for next run current_index += 1 processed_count += 1 save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": current_index}) if current_index >= len(rar_files): log_message("๐ŸŽ‰ All available RAR files have been processed!") elif not processing_status["is_running"]: log_message("โน๏ธ Processing stopped by request.") else: log_message(f"โœ… Processed {processed_count} RAR files. Next index to process: {current_index}") except Exception as e: log_message(f"โŒ Fatal error in background processing: {str(e)}")