import os
import json
import re
import shutil
import smtplib
import subprocess
import threading
import time
from email.message import EmailMessage
from pathlib import Path
from typing import Dict, List, Set, Optional, Any

import cv2
import numpy as np
import requests
from huggingface_hub import (
    HfApi,
    CommitOperationAdd,
    hf_hub_download,
    hf_hub_url,
    list_repo_files,
)

# ==== CONFIGURATION ====
HF_TOKEN = os.getenv("HF_TOKEN", "")  # Using provided token as fallback
SOURCE_REPO_ID = os.getenv("SOURCE_REPO", "Fred808/BG1")
TARGET_REPO_ID = os.getenv("TARGET_REPO", "Fred808/BG3")  # New target repo for uploads

# Path Configuration
DOWNLOAD_FOLDER = "downloads"
EXTRACT_FOLDER = "extracted"
FRAMES_OUTPUT_FOLDER = "extracted_frames"
ZIP_OUTPUT_FOLDER = "zipped_frames"  # New folder for zip files
LOCAL_STATE_FOLDER = ".state"  # Folder to temporarily store the downloaded state file

os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)
os.makedirs(EXTRACT_FOLDER, exist_ok=True)
os.makedirs(FRAMES_OUTPUT_FOLDER, exist_ok=True)
os.makedirs(ZIP_OUTPUT_FOLDER, exist_ok=True)  # Create zip output folder
os.makedirs(LOCAL_STATE_FOLDER, exist_ok=True)

# State Files
FAILED_FILES_LOG = "failed_files.log"
HF_STATE_FILE = "processing_state2.json"  # New remote state file name

# Processing Parameters
CHUNK_SIZE = 2
PROCESSING_DELAY = 2
MAX_RETRIES = 3
MIN_FREE_SPACE_GB = 2  # Minimum free space in GB before processing

# Frame Extraction Parameters
DEFAULT_FPS = 3  # Default frames per second for extraction

# Initialize HF API
hf_api = HfApi(token=HF_TOKEN)

# Global State shared with any status-reporting frontend.
processing_status = {
    "is_running": False,
    "current_file": None,
    "total_files": 0,
    "processed_files": 0,
    "failed_files": 0,
    "extracted_courses": 0,
    "extracted_videos": 0,
    "last_update": None,
    "logs": [],
}


def log_message(message: str):
    """Log a message with a timestamp to stdout and the in-memory status log."""
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    log_entry = f"[{timestamp}] {message}"
    print(log_entry)
    processing_status["logs"].append(log_entry)
    processing_status["last_update"] = timestamp
    # Bound memory use: keep only the most recent 100 entries.
    if len(processing_status["logs"]) > 100:
        processing_status["logs"] = processing_status["logs"][-100:]


def log_failed_file(filename: str, error: str):
    """Append a failed file entry (timestamp, filename, error) to the persistent log.

    FIX: the original wrote a literal "(unknown)" placeholder and never used
    the `filename` argument; the actual filename is now recorded.
    """
    with open(FAILED_FILES_LOG, "a") as f:
        f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {filename}: {error}\n")


def get_disk_usage(path: str) -> Dict[str, float]:
    """Return disk usage statistics in GB: {"total", "free", "used"}.

    Uses os.statvfs, so this is POSIX-only.
    """
    statvfs = os.statvfs(path)
    total = statvfs.f_frsize * statvfs.f_blocks / (1024**3)
    free = statvfs.f_frsize * statvfs.f_bavail / (1024**3)
    used = total - free
    return {"total": total, "free": free, "used": used}


def check_disk_space(path: str = ".") -> bool:
    """Return True if free space at `path` is at least MIN_FREE_SPACE_GB."""
    disk_info = get_disk_usage(path)
    if disk_info["free"] < MIN_FREE_SPACE_GB:
        log_message(f'โš ๏ธ Low disk space: {disk_info["free"]:.2f}GB free, {disk_info["used"]:.2f}GB used')
        return False
    return True


def cleanup_temp_files():
    """Best-effort removal of stale archive downloads to free disk space.

    Keeps the file currently being processed (if any); everything else with a
    .rar/.zip extension in DOWNLOAD_FOLDER is deleted.
    """
    log_message("๐Ÿงน Cleaning up temporary files...")
    # Clean old downloads (keep only current processing file)
    current_file = processing_status.get("current_file")
    for file in os.listdir(DOWNLOAD_FOLDER):
        if file != current_file and file.endswith((".rar", ".zip")):
            try:
                os.remove(os.path.join(DOWNLOAD_FOLDER, file))
                log_message(f"๐Ÿ—‘๏ธ Removed old download: {file}")
            except OSError:
                # Deliberately best-effort: a file we cannot delete is not fatal.
                pass


def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str, Any]:
    """Load state from a JSON file, migrating old structures to the new one.

    Returns `default_value` when the file is missing or corrupted.
    FIX: the original returned None (implicit) when the file did not exist.
    """
    if os.path.exists(file_path):
        try:
            with open(file_path, "r") as f:
                data = json.load(f)

            # --- MIGRATION LOGIC ---
            # 1. Convert old "processed_rars" list to new "file_states" dictionary
            if "processed_rars" in data and isinstance(data["processed_rars"], list):
                log_message("โ„น๏ธ Migrating old 'processed_rars' list to new 'file_states' dictionary.")
                data["file_states"] = {
                    filename: "processed" for filename in data.pop("processed_rars")
                }

            # 2. Ensure file_states exists and is a dict
            if "file_states" not in data or not isinstance(data["file_states"], dict):
                log_message("โ„น๏ธ Initializing 'file_states' dictionary.")
                data["file_states"] = {}

            # 3. Ensure next_download_index exists
            if "next_download_index" not in data:
                data["next_download_index"] = 0

            return data
        except json.JSONDecodeError:
            log_message(f"โš ๏ธ Corrupted state file: {file_path}")
    return default_value


def save_json_state(file_path: str, data: Dict[str, Any]):
    """Save state to a JSON file (pretty-printed)."""
    with open(file_path, "w") as f:
        json.dump(data, f, indent=2)


def download_hf_state(repo_id: str, filename: str) -> Dict[str, Any]:
    """Download the state file from Hugging Face, or return a default state.

    Falls back to the default state if the file is absent from the repo or
    if any step of the download fails.
    """
    local_path = os.path.join(LOCAL_STATE_FOLDER, filename)
    # Changed default state to use 'file_states' for the new structure
    default_state = {"next_download_index": 0, "file_states": {}}
    try:
        # Check if the file exists in the repo first
        files = hf_api.list_repo_files(repo_id=repo_id, repo_type="dataset")
        if filename not in files:
            # FIX: log the actual filename (was a "(unknown)" placeholder).
            log_message(f"โ„น๏ธ State file {filename} not found in {repo_id}. Starting from default state.")
            return default_state

        # Download the file
        hf_hub_download(
            repo_id=repo_id,
            filename=filename,
            repo_type="dataset",
            local_dir=LOCAL_STATE_FOLDER,
            local_dir_use_symlinks=False,
        )
        log_message(f"โœ… Successfully downloaded state file from {repo_id}.")
        # Use the modified load_json_state which handles migration
        return load_json_state(local_path, default_state)
    except Exception as e:
        log_message(f"โš ๏ธ Failed to download state file from Hugging Face: {str(e)}. Starting from default state.")
        return default_state


def upload_hf_state(repo_id: str, filename: str, state: Dict[str, Any]) -> bool:
    """Persist `state` locally and upload it to the Hugging Face repo.

    Returns True on success, False on any failure (logged, not raised).
    """
    local_path = os.path.join(LOCAL_STATE_FOLDER, filename)
    try:
        # 1. Save the updated state locally
        save_json_state(local_path, state)
        # 2. Upload the file
        hf_api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=filename,
            repo_id=repo_id,
            repo_type="dataset",
            commit_message=f"Update processing state: next_index={state['next_download_index']}",
        )
        log_message(f"โœ… Successfully uploaded updated state file to {repo_id}")
        return True
    except Exception as e:
        log_message(f"โŒ Failed to upload state file to Hugging Face: {str(e)}")
        return False


# --- NEW LOCKING FUNCTIONS ---
def lock_file_for_processing(rar_filename: str, state: Dict[str, Any]) -> bool:
    """Mark a file as 'processing' in the state file and upload the lock.

    Returns True if the lock was uploaded; on failure the local state change
    is reverted so callers do not see a half-applied lock.
    """
    log_message(f"๐Ÿ”’ Attempting to lock file: {rar_filename} (Marking as 'processing')")
    # Update state locally
    state["file_states"][rar_filename] = "processing"

    # Upload the updated state file immediately to establish the lock
    if upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, state):
        log_message(f"โœ… Successfully locked file: {rar_filename}")
        return True
    else:
        log_message(f"โŒ Failed to upload lock for file: {rar_filename}. Aborting processing.")
        # Revert local state to avoid confusion if upload failed
        if rar_filename in state["file_states"]:
            del state["file_states"][rar_filename]
        return False


def unlock_file_as_processed(rar_filename: str, state: Dict[str, Any], next_index: int) -> bool:
    """Mark a file as 'processed', update the index, and upload the state."""
    log_message(f"๐Ÿ”“ Attempting to unlock file: {rar_filename} (Marking as 'processed')")
    # Update state locally
    state["file_states"][rar_filename] = "processed"
    state["next_download_index"] = next_index

    # Upload the updated state file
    if upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, state):
        log_message(f"โœ… Successfully unlocked and marked as processed: {rar_filename}")
        return True
    else:
        log_message(f"โŒ Failed to upload final state for file: {rar_filename}. The file is processed locally but state is not updated.")
        return False


# --- Original Utility Functions ---
def download_with_retry(url: str, dest_path: str, max_retries: int = 3) -> bool:
    """Download a file with retry logic and disk space checking.

    Retries with exponential backoff (2**attempt seconds). Aborts early when
    disk space is insufficient or the advertised content length would not fit.
    """
    if not check_disk_space():
        cleanup_temp_files()
        if not check_disk_space():
            log_message("โŒ Insufficient disk space even after cleanup")
            return False

    # Ensure the directory structure exists before attempting to write the file.
    # FIX: guard against an empty dirname (dest_path with no directory part),
    # for which os.makedirs("") would raise.
    dest_dir = os.path.dirname(dest_path)
    if dest_dir:
        try:
            os.makedirs(dest_dir, exist_ok=True)
        except Exception as e:
            log_message(f"โŒ Failed to create directory for download path {os.path.dirname(dest_path)}: {str(e)}")
            return False

    headers = {"Authorization": f"Bearer {HF_TOKEN}"}
    for attempt in range(max_retries):
        try:
            with requests.get(url, headers=headers, stream=True) as r:
                r.raise_for_status()

                # Check content length if available
                content_length = r.headers.get("content-length")
                if content_length:
                    size_gb = int(content_length) / (1024**3)
                    disk_info = get_disk_usage(".")
                    if size_gb > disk_info["free"] - 0.5:  # Leave 0.5GB buffer
                        log_message(f'โŒ File too large: {size_gb:.2f}GB, only {disk_info["free"]:.2f}GB free')
                        return False

                with open(dest_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        f.write(chunk)
            return True
        except Exception as e:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
                continue
            log_message(f"โŒ Download failed after {max_retries} attempts: {e}")
            return False
    return False


def is_multipart_rar(filename: str) -> bool:
    """Return True if this looks like a multi-part RAR file (e.g. 'x.part1.rar')."""
    return ".part" in filename.lower() and filename.lower().endswith(".rar")


def get_rar_part_base(filename: str) -> str:
    """Return the base (course) name for a RAR file, stripping part/extension suffixes."""
    if ".part" in filename.lower():
        return filename.split(".part")[0]
    return filename.replace(".rar", "")


def extract_with_retry(rar_path: str, output_dir: str, max_retries: int = 2) -> bool:
    """Extract a RAR archive with retry and recovery, handling multi-part archives.

    Each part is extracted independently ('unrar e -o+ -kb'); a "Cannot find
    volume" error is expected for standalone parts and treated as success.
    """
    filename = os.path.basename(rar_path)
    log_message("๐Ÿ“ฆ Processing RAR file independently as requested by user.")
    for attempt in range(max_retries):
        try:
            # Use 'e' (extract) instead of 'x' (extract with full paths) and the -kb switch
            cmd = ["unrar", "e", "-o+", "-kb", rar_path, output_dir]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                log_message(f"โœ… Successfully extracted: {os.path.basename(rar_path)}")
                return True
            else:
                error_msg = result.stderr or result.stdout
                log_message(f"โš ๏ธ Extraction attempt {attempt + 1} failed: {error_msg}")
                # Check for the expected "Cannot find volume" error and treat it as a success
                if "Cannot find volume" in error_msg:
                    log_message(f"โœ… Extracted all possible files from independent part (expected volume error).")
                    return True
                if "checksum error" in error_msg.lower() or "CRC failed" in error_msg:
                    log_message(f"โš ๏ธ Data corruption detected, attempt {attempt + 1}")
                elif result.returncode == 10:
                    log_message(f"โš ๏ธ No files to extract (exit code 10)")
                elif result.returncode == 1:
                    log_message(f"โš ๏ธ Non-fatal error (exit code 1)")
        except Exception as e:
            log_message(f"โŒ Extraction exception: {str(e)}")
        if attempt == max_retries - 1:
            return False
        time.sleep(1)
    return False


# --- Frame Extraction Utilities ---
def ensure_dir(path):
    """Create `path` (and parents) if it does not already exist."""
    os.makedirs(path, exist_ok=True)


def extract_frames(video_path, output_dir, fps=DEFAULT_FPS):
    """Extract frames from a video at the specified frames per second.

    Returns the number of frames written as PNGs (0001.png, 0002.png, ...).
    Returns 0 when the video cannot be opened.
    """
    log_message(f"[INFO] Extracting frames from {video_path} to {output_dir} at {fps} fps...")
    ensure_dir(output_dir)
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        log_message(f"[ERROR] Failed to open video file: {video_path}")
        return 0

    video_fps = cap.get(cv2.CAP_PROP_FPS)
    if not video_fps or video_fps <= 0:
        video_fps = 30  # fallback if FPS is not available
        log_message(f"[WARN] Using fallback FPS: {video_fps}")

    # FIX: clamp the interval to >= 1; when the requested fps exceeds the
    # video's own fps, int(round(...)) is 0 and the modulo below would raise
    # ZeroDivisionError. With interval 1 every frame is kept.
    frame_interval = max(1, int(round(video_fps / fps)))
    frame_idx = 0
    saved_idx = 1
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    log_message(f"[DEBUG] Total frames in video: {total_frames}")

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        if frame_idx % frame_interval == 0:
            frame_name = f"{saved_idx:04d}.png"
            cv2.imwrite(str(Path(output_dir) / frame_name), frame)
            saved_idx += 1
        frame_idx += 1

    cap.release()
    frame_count = saved_idx - 1
    log_message(f"Extracted {frame_count} frames from {video_path} to {output_dir}")
    return frame_count


def zip_frames_folder(frames_dir: str, output_folder: str) -> Optional[str]:
    """Zip the extracted frames folder; return the archive path or None on failure."""
    base_dir_name = os.path.basename(frames_dir)
    # The output path for the zip file (without .zip extension)
    zip_base_name = os.path.join(output_folder, base_dir_name)

    log_message(f"[INFO] Zipping frames from {frames_dir} to {zip_base_name}.zip...")
    try:
        archive_path = shutil.make_archive(
            base_name=zip_base_name,
            format='zip',
            root_dir=os.path.dirname(frames_dir),
            base_dir=base_dir_name,
        )
        log_message(f"โœ… Successfully created zip file: {archive_path}")
        return archive_path
    except Exception as e:
        log_message(f"โŒ Failed to create zip archive for {frames_dir}: {str(e)}")
        return None


def upload_to_huggingface(file_path: str, repo_id: str, path_in_repo: str) -> bool:
    """Upload a single file to a Hugging Face dataset repository.

    FIX: log/commit messages now include the actual filename instead of the
    literal "(unknown)" placeholder.
    """
    filename = os.path.basename(file_path)
    log_message(f"[INFO] Uploading {filename} to {repo_id}/{path_in_repo}...")
    try:
        # Use HfApi.upload_file for a single file upload
        hf_api.upload_file(
            path_or_fileobj=file_path,
            path_in_repo=path_in_repo,
            repo_id=repo_id,
            repo_type="dataset",
            commit_message=f"Add extracted frames zip: {filename}",
        )
        log_message(f"โœ… Successfully uploaded {filename} to {repo_id}")
        return True
    except Exception as e:
        log_message(f"โŒ Failed to upload {filename} to Hugging Face: {str(e)}")
        return False


def process_rar_file(rar_path: str) -> bool:
    """Process a single RAR file: extract it, then extract/zip/upload video frames.

    Returns True when extraction and per-video processing completed; failures
    are logged to FAILED_FILES_LOG and return False.
    """
    filename = os.path.basename(rar_path)
    processing_status["current_file"] = filename

    # Handle multi-part RAR naming
    if is_multipart_rar(filename):
        course_name = get_rar_part_base(filename)
    else:
        course_name = filename.replace(".rar", "")

    extract_dir = os.path.join(EXTRACT_FOLDER, course_name)

    try:
        # FIX: log the actual filename (was a "(unknown)" placeholder).
        log_message(f"๐Ÿ”„ Processing: {filename}")

        # Extract RAR
        os.makedirs(extract_dir, exist_ok=True)
        if not extract_with_retry(rar_path, extract_dir):
            raise Exception("RAR extraction failed")

        # Count extracted files and collect videos
        file_count = 0
        video_files_found = []
        for root, dirs, files in os.walk(extract_dir):
            for file in files:
                file_count += 1
                if file.lower().endswith(('.mp4', '.avi', '.mov', '.mkv', '.webm')):
                    video_files_found.append(os.path.join(root, file))

        processing_status["extracted_courses"] += 1
        log_message(f"โœ… Successfully extracted '{course_name}' ({file_count} files, {len(video_files_found)} videos)")

        # Process video files for frame extraction, zipping, and uploading
        for video_path in video_files_found:
            video_filename = Path(video_path).name
            # Unique output directory for frames
            frames_output_dir_name = f"{course_name}_{video_filename.replace('.', '_')}_frames"
            frames_output_dir = os.path.join(FRAMES_OUTPUT_FOLDER, frames_output_dir_name)
            ensure_dir(frames_output_dir)

            # 1. Extract frames
            frame_count = extract_frames(video_path, frames_output_dir, fps=DEFAULT_FPS)
            processing_status["extracted_videos"] += 1

            if frame_count == 0:
                log_message(f"โš ๏ธ No frames extracted from {video_filename}. Skipping zip/upload.")
                # Clean up empty directory
                shutil.rmtree(frames_output_dir, ignore_errors=True)
                continue
            else:
                log_message(f"โœ… {frame_count} frames extracted from {video_filename}")

            # 2. Zip the extracted frames
            zip_path = zip_frames_folder(frames_output_dir, ZIP_OUTPUT_FOLDER)
            if zip_path:
                # 3. Upload the zip file to Hugging Face
                path_in_repo = f"frames/{os.path.basename(zip_path)}"
                if upload_to_huggingface(zip_path, TARGET_REPO_ID, path_in_repo):
                    log_message(f"โœ… Upload successful for {os.path.basename(zip_path)}")
                    # 4. Clean up local zip and frame files after successful upload.
                    # FIX: cleanup was previously unconditional, deleting the zip
                    # even on upload failure despite the "Keeping file" log below.
                    try:
                        os.remove(zip_path)
                        log_message(f"๐Ÿ—‘๏ธ Cleaned up local zip file: {os.path.basename(zip_path)}")
                        shutil.rmtree(frames_output_dir, ignore_errors=True)
                        log_message(f"๐Ÿ—‘๏ธ Cleaned up local frames directory: {frames_output_dir_name}")
                    except Exception as e:
                        log_message(f"โš ๏ธ Failed to clean up temporary files: {str(e)}")
                else:
                    log_message(f"โŒ Upload failed for {os.path.basename(zip_path)}. Keeping file for manual inspection.")

        return True
    except Exception as e:
        error_msg = str(e)
        log_message(f"โŒ Processing failed: {error_msg}")
        log_failed_file(filename, error_msg)
        return False
    finally:
        processing_status["current_file"] = None


def get_all_rar_files(repo_id: str) -> List[str]:
    """Fetch the sorted list of all RAR files in the source repository."""
    log_message(f"๐Ÿ” Fetching all files from source repo: {repo_id}")
    try:
        files = list(hf_api.list_repo_files(repo_id=repo_id, repo_type="dataset"))
        rar_files = sorted([f for f in files if f.lower().endswith(".rar")])
        log_message(f"Found {len(rar_files)} RAR files.")
        return rar_files
    except Exception as e:
        log_message(f"โŒ Failed to list files from source repo: {str(e)}")
        return []


def main_processing_loop(start_index: int = 0):
    """Main processing workflow with a remote-state locking mechanism.

    Repeatedly: pull the shared state, pick the next unclaimed RAR file,
    lock it ('processing'), download + process it, then mark it 'processed'
    or 'failed' and advance the shared index.
    """
    processing_status["is_running"] = True
    log_message("๐Ÿš€ Starting main processing loop with locking mechanism...")

    try:
        # 1. Get the list of all RAR files from the source repo
        all_rar_files = get_all_rar_files(SOURCE_REPO_ID)
        if not all_rar_files:
            log_message("๐Ÿ›‘ No RAR files found in the source repository. Exiting.")
            return

        processing_status["total_files"] = len(all_rar_files)

        while True:
            # 2. Download the latest state file
            current_state = download_hf_state(TARGET_REPO_ID, HF_STATE_FILE)
            file_states = current_state.get("file_states", {})

            # Use start_index if provided, otherwise use the remote state
            next_download_index = start_index if start_index > 0 else current_state.get("next_download_index", 0)
            # FIX: honour the caller-supplied start_index only on the first
            # pass; previously it overrode the remote state on EVERY
            # iteration, so the shared index never advanced.
            start_index = 0

            # 3. Find the next *available* file to process
            target_file = None
            target_index = -1

            # Check if we have processed all files
            if next_download_index >= len(all_rar_files):
                log_message("๐ŸŽ‰ All files have been processed! Exiting loop.")
                break

            for i in range(next_download_index, len(all_rar_files)):
                rar_filename = all_rar_files[i]
                state = file_states.get(rar_filename)

                if state in ["processed", "failed"]:
                    # Already done (or failed and we are skipping it), skip to the next index
                    continue

                if state == "processing":
                    # This is the lock mechanism: another worker is currently processing this file
                    log_message(f"โš ๏ธ File is currently 'processing' (locked): {rar_filename}. Skipping to next available file.")
                    continue

                # If state is None (not in file_states), it's a new, available file
                target_file = rar_filename
                target_index = i
                break

            if target_file is None:
                log_message("๐ŸŽ‰ All available files have been processed or are currently locked. Exiting loop.")
                break

            log_message(f"โœ… Selected file for processing: {target_file} (Index: {target_index})")
            processing_status["current_file"] = target_file

            # 4. Acquire Lock: Update state to 'processing' and upload immediately
            if not lock_file_for_processing(target_file, current_state):
                log_message("โŒ Failed to acquire lock. Retrying loop to get latest state.")
                time.sleep(PROCESSING_DELAY)
                continue

            # 5. Perform the actual work
            rar_url = hf_hub_url(repo_id=SOURCE_REPO_ID, filename=target_file, repo_type="dataset")
            # The download path now includes the subdirectory from the Hugging Face repo
            local_rar_path = os.path.join(DOWNLOAD_FOLDER, target_file)
            success = False

            try:
                # Download the file
                if download_with_retry(rar_url, local_rar_path):
                    # Process the file (extraction, frame processing, zipping, uploading results, etc.)
                    if process_rar_file(local_rar_path):
                        success = True
                        log_message(f"โœ… Finished all processing steps for: {target_file}")
                    else:
                        log_message(f"โŒ Processing failed for: {target_file}")
                else:
                    log_message(f"โŒ Download failed for: {target_file}")
            except Exception as e:
                log_message(f"๐Ÿ”ฅ An unhandled error occurred while processing {target_file}: {str(e)}")
                log_failed_file(target_file, str(e))
            finally:
                # 6. Release Lock / Update State
                # The next index to check will be the one *after* the current file, regardless of success.
                next_index_to_save = target_index + 1

                # Download the latest state again before final upload to ensure we don't overwrite
                # changes made by other workers in the meantime (e.g. if they processed a file
                # that was before this one in the queue).
                current_state = download_hf_state(TARGET_REPO_ID, HF_STATE_FILE)

                if success:
                    # Mark as 'processed' and update the next_download_index
                    unlock_file_as_processed(target_file, current_state, next_index_to_save)
                    processing_status["processed_files"] += 1
                else:
                    # If processing failed, we still want to release the 'processing' lock,
                    # but we mark it as 'failed' instead of 'processed' and still advance the index.
                    log_message(f"โš ๏ธ Processing failed for {target_file}. Marking as 'failed' and advancing index.")
                    current_state["file_states"][target_file] = "failed"
                    current_state["next_download_index"] = next_index_to_save
                    upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, current_state)
                    processing_status["failed_files"] += 1

                # Clean up local files
                if os.path.exists(local_rar_path):
                    os.remove(local_rar_path)
                    log_message(f"๐Ÿ—‘๏ธ Cleaned up local file: {local_rar_path}")

            # Wait a bit before checking for the next file to avoid hammering the HF API
            time.sleep(PROCESSING_DELAY)

        log_message("๐ŸŽ‰ Processing complete!")
        log_message(f'๐Ÿ“Š Final stats: {processing_status["extracted_courses"]} courses extracted, {processing_status["extracted_videos"]} videos processed, frames extracted')

    except KeyboardInterrupt:
        log_message("โน๏ธ Processing interrupted by user")
    except Exception as e:
        log_message(f"โŒ Fatal error: {str(e)}")
    finally:
        processing_status["is_running"] = False
        cleanup_temp_files()


# Expose necessary functions and variables
__all__ = [
    "main_processing_loop",
    "processing_status",
    "log_message",
    "extract_frames",
    "DEFAULT_FPS",
    "ensure_dir",
]