Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import requests | |
| import subprocess | |
| import shutil | |
| import time | |
| import re | |
| import threading | |
| from typing import Dict, List, Set, Optional, Any | |
| from huggingface_hub import HfApi, list_repo_files, CommitOperationAdd, hf_hub_download, hf_hub_url | |
| import cv2 | |
| import numpy as np | |
| from pathlib import Path | |
| import smtplib | |
| from email.message import EmailMessage | |
# ==== CONFIGURATION ====
# Credentials and repository ids; each one can be overridden via the environment.
HF_TOKEN = os.environ.get("HF_TOKEN", "")
SOURCE_REPO_ID = os.environ.get("SOURCE_REPO", "samfred2/TGFiles")
TARGET_REPO_ID = os.environ.get("TARGET_REPO", "samfred2/BG4")  # Repo that receives uploads

# Path Configuration
DOWNLOAD_FOLDER = "downloads"
EXTRACT_FOLDER = "extracted"
FRAMES_OUTPUT_FOLDER = "extracted_frames"
ZIP_OUTPUT_FOLDER = "zipped_frames"  # Zip files are written here
LOCAL_STATE_FOLDER = ".state"  # Holds the downloaded copy of the state file

# Create every working folder at import time so later code can assume they exist.
for _working_folder in (
    DOWNLOAD_FOLDER,
    EXTRACT_FOLDER,
    FRAMES_OUTPUT_FOLDER,
    ZIP_OUTPUT_FOLDER,
    LOCAL_STATE_FOLDER,
):
    os.makedirs(_working_folder, exist_ok=True)
del _working_folder

# State Files
FAILED_FILES_LOG = "failed_files.log"
HF_STATE_FILE = "processing_state2.json"  # Name of the remote state file

# Processing Parameters
CHUNK_SIZE = 2
PROCESSING_DELAY = 2  # Passed to time.sleep(), i.e. seconds
MAX_RETRIES = 3
MIN_FREE_SPACE_GB = 2  # Minimum free space in GB before processing

# Frame Extraction Parameters
DEFAULT_FPS = 3  # Default frames per second for extraction

# Initialize HF API
hf_api = HfApi(token=HF_TOKEN)
# Global State
# Mutable status record that the functions in this module read and update.
processing_status = dict(
    is_running=False,
    current_file=None,
    total_files=0,
    processed_files=0,
    failed_files=0,
    extracted_courses=0,
    extracted_videos=0,
    last_update=None,
    logs=[],
)
def log_message(message: str, level: str = "INFO"):
    """Print a timestamped log line and record it in ``processing_status``.

    The entry is appended to ``processing_status["logs"]`` (trimmed to the
    100 most recent entries) and ``processing_status["last_update"]`` is set
    to the entry's timestamp.
    """
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = "[{}] {}: {}".format(stamp, level, message)
    print(entry)

    history = processing_status["logs"]
    history.append(entry)
    processing_status["last_update"] = stamp

    # Keep the in-memory log bounded.
    if len(history) > 100:
        processing_status["logs"] = history[-100:]
def log_failed_file(filename: str, error: str):
    """Append a timestamped failure record to the persistent failure log.

    Args:
        filename: Name of the file that failed.
        error: Description of the failure.
    """
    # Write UTF-8 explicitly: file names and error texts may contain
    # non-ASCII characters that the platform default encoding cannot encode.
    with open(FAILED_FILES_LOG, "a", encoding="utf-8") as f:
        f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {filename}: {error}\n")
def get_disk_usage(path: str) -> Dict[str, float]:
    """Return disk usage statistics, in GB, for the filesystem holding ``path``.

    Args:
        path: Any path on the filesystem to inspect.

    Returns:
        A dict with ``"total"``, ``"free"`` (space available to the current
        user) and ``"used"`` (``total - free``), all in GB.

    Raises:
        OSError: If ``path`` cannot be inspected (e.g. it does not exist).
    """
    # shutil.disk_usage is the portable equivalent of os.statvfs (which only
    # exists on Unix) and reports the same total/free figures there.
    usage = shutil.disk_usage(path)
    gib = 1024**3
    total = usage.total / gib
    free = usage.free / gib
    # "used" is defined as everything that is not available to this user.
    return {"total": total, "free": free, "used": total - free}
def check_disk_space(path: str = ".") -> bool:
    """Report whether at least MIN_FREE_SPACE_GB is free at ``path``.

    Logs the current free/used figures when space is low.
    """
    usage = get_disk_usage(path)
    enough_room = not (usage["free"] < MIN_FREE_SPACE_GB)
    if not enough_room:
        log_message(f'โ ๏ธ Low disk space: {usage["free"]:.2f}GB free, {usage["used"]:.2f}GB used')
    return enough_room
def cleanup_temp_files():
    """Delete stale archive downloads to free disk space.

    Every ``.rar``/``.zip`` file under DOWNLOAD_FOLDER (including
    sub-folders) is removed, except the one recorded in
    ``processing_status["current_file"]``. Files that cannot be removed are
    logged and skipped; a missing download folder is treated as empty.
    """
    log_message("๐งน Cleaning up temporary files...", "INFO")

    # Clean old downloads (keep only current processing file). The current
    # file is compared as a path relative to DOWNLOAD_FOLDER so that nested
    # downloads are matched as well.
    current_file = processing_status.get("current_file")
    keep = os.path.normpath(current_file) if current_file else None

    for root, _dirs, names in os.walk(DOWNLOAD_FOLDER):
        for name in names:
            if not name.endswith((".rar", ".zip")):
                continue
            full_path = os.path.join(root, name)
            relative = os.path.normpath(os.path.relpath(full_path, DOWNLOAD_FOLDER))
            if relative == keep:
                continue
            try:
                os.remove(full_path)
            except OSError as e:
                # Best effort: report the problem instead of hiding it.
                log_message(f"Could not remove old download {relative}: {e}", "WARNING")
            else:
                log_message(f"๐๏ธ Removed old download: {relative}", "INFO")
def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str, Any]:
    """Load the processing state from a JSON file, migrating old layouts.

    Args:
        file_path: Path of the JSON state file.
        default_value: State returned when the file is missing or unusable.

    Returns:
        The loaded state, guaranteed to contain a ``"file_states"`` dict and
        a ``"next_download_index"`` entry, or ``default_value`` when the file
        does not exist, is not valid JSON, or does not hold a JSON object.
    """
    if not os.path.exists(file_path):
        return default_value

    try:
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError):
        log_message(f"โ ๏ธ Corrupted state file: {file_path}", "WARNING")
        return default_value

    # Valid JSON that is not an object (e.g. a list or null) cannot be
    # migrated; treat it like a corrupted file instead of raising TypeError.
    if not isinstance(data, dict):
        log_message(f"โ ๏ธ Corrupted state file: {file_path}", "WARNING")
        return default_value

    # --- MIGRATION LOGIC ---
    # 1. Convert old "processed_rars" list to new "file_states" dictionary
    if isinstance(data.get("processed_rars"), list):
        log_message("โน๏ธ Migrating old 'processed_rars' list to new 'file_states' dictionary.", "INFO")
        data["file_states"] = {
            filename: "processed" for filename in data.pop("processed_rars")
        }

    # 2. Ensure file_states exists and is a dict
    if not isinstance(data.get("file_states"), dict):
        log_message("โน๏ธ Initializing 'file_states' dictionary.", "INFO")
        data["file_states"] = {}

    # 3. Ensure next_download_index exists
    data.setdefault("next_download_index", 0)
    return data
def save_json_state(file_path: str, data: Dict[str, Any]):
    """Write ``data`` to ``file_path`` as indented JSON, atomically.

    The JSON is first written to ``<file_path>.tmp`` and then moved over the
    target with ``os.replace``, so an interrupted or failed write never
    leaves a truncated state file behind.

    Args:
        file_path: Destination path of the JSON file.
        data: JSON-serialisable state to store.

    Raises:
        TypeError: If ``data`` is not JSON-serialisable.
        OSError: If the file cannot be written or replaced.
    """
    tmp_path = f"{file_path}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, file_path)
    finally:
        # After a successful replace the temp file is gone; after a failure
        # this removes the partial temp file.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
def download_hf_state(repo_id: str, filename: str) -> Dict[str, Any]:
    """Fetch the state file from a Hugging Face dataset repo.

    Returns the parsed (and migrated) state, or a fresh default state when
    the file is absent from the repo or anything goes wrong while fetching.
    """
    # Default state uses the 'file_states' structure.
    fallback_state = {"next_download_index": 0, "file_states": {}}
    state_path = os.path.join(LOCAL_STATE_FOLDER, filename)

    try:
        # Look the file up in the repo listing before trying to download it.
        repo_files = hf_api.list_repo_files(repo_id=repo_id, repo_type="dataset")
        if filename in repo_files:
            hf_hub_download(
                repo_id=repo_id,
                filename=filename,
                repo_type="dataset",
                local_dir=LOCAL_STATE_FOLDER,
                local_dir_use_symlinks=False,
            )
            log_message(f"โ Successfully downloaded state file from {repo_id}.", "INFO")
            # load_json_state also applies the migration logic.
            return load_json_state(state_path, fallback_state)

        log_message(f"โน๏ธ State file {filename} not found in {repo_id}. Starting from default state.", "INFO")
        return fallback_state
    except Exception as e:
        log_message(f"โ ๏ธ Failed to download state file from Hugging Face: {str(e)}. Starting from default state.", "WARNING")
        return fallback_state
def upload_hf_state(repo_id: str, filename: str, state: Dict[str, Any]) -> bool:
    """Persist ``state`` locally and push it to a Hugging Face dataset repo.

    Returns True when the upload succeeded, False (after logging) otherwise.
    """
    state_path = os.path.join(LOCAL_STATE_FOLDER, filename)
    try:
        # Step 1: write the state to the local state folder.
        save_json_state(state_path, state)

        # Step 2: push that file to the repo.
        upload_kwargs = dict(
            path_or_fileobj=state_path,
            path_in_repo=filename,
            repo_id=repo_id,
            repo_type="dataset",
            commit_message=f"Update processing state: next_index={state['next_download_index']}",
        )
        hf_api.upload_file(**upload_kwargs)

        log_message(f"โ Successfully uploaded updated state file to {repo_id}", "INFO")
        return True
    except Exception as e:
        log_message(f"โ Failed to upload state file to Hugging Face: {str(e)}", "ERROR")
        return False
| # --- NEW LOCKING FUNCTIONS --- | |
def lock_file_for_processing(rar_filename: str, state: Dict[str, Any]) -> bool:
    """Mark a file as 'processing' in the state and upload that lock.

    Args:
        rar_filename: Name of the archive to lock.
        state: State dict holding the ``"file_states"`` mapping; it is
            updated in place.

    Returns:
        True when the lock was uploaded. False when the upload failed, in
        which case the file's previous entry in ``state`` is restored.
    """
    log_message(f"๐ Attempting to lock file: {rar_filename} (Marking as 'processing')", "INFO")

    file_states = state["file_states"]
    # Remember the previous marker (e.g. "failed") so a failed upload does
    # not erase it; the sentinel distinguishes "no entry" from any value.
    missing = object()
    previous = file_states.get(rar_filename, missing)

    # Update state locally
    file_states[rar_filename] = "processing"

    # Upload the updated state file immediately to establish the lock
    if upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, state):
        log_message(f"โ Successfully locked file: {rar_filename}", "INFO")
        return True

    log_message(f"โ Failed to upload lock for file: {rar_filename}. Aborting processing.", "ERROR")
    # Revert the local state to what it was before the lock attempt.
    if previous is missing:
        file_states.pop(rar_filename, None)
    else:
        file_states[rar_filename] = previous
    return False
def unlock_file_as_processed(rar_filename: str, state: Dict[str, Any], next_index: int) -> bool:
    """Record a file as 'processed', store the next index, and upload the state.

    Returns True when the state upload succeeded, False otherwise.
    """
    log_message(f"๐ Attempting to unlock file: {rar_filename} (Marking as 'processed')", "INFO")

    # Apply both changes locally before pushing them.
    state["file_states"][rar_filename] = "processed"
    state["next_download_index"] = next_index

    uploaded = upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, state)
    if not uploaded:
        log_message(f"โ Failed to upload final state for file: {rar_filename}. The file is processed locally but state is not updated.", "ERROR")
        return False

    log_message(f"โ Successfully unlocked and marked as processed: {rar_filename}", "INFO")
    return True
| # --- Original Utility Functions --- | |
def _remove_partial_download(path: str) -> None:
    """Delete an incomplete download, logging anything other than 'not found'."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass  # Nothing was written yet.
    except OSError as e:
        log_message(f"Could not remove partial download {path}: {e}", "WARNING")


def download_with_retry(url: str, dest_path: str, max_retries: int = 3) -> bool:
    """Stream ``url`` to ``dest_path`` with disk-space checks and retries.

    Args:
        url: Address to download from.
        dest_path: Local file path to write; missing parent folders are
            created.
        max_retries: Maximum number of download attempts.

    Returns:
        True when the file was written completely. False when disk space is
        insufficient, the destination folder cannot be created, a
        non-network error occurs, or every attempt fails. Partially written
        files are removed on failure.
    """
    if not check_disk_space():
        cleanup_temp_files()
        if not check_disk_space():
            log_message("โ Insufficient disk space even after cleanup", "ERROR")
            return False

    # A bare file name has an empty dirname and os.makedirs("") raises, so
    # only create the parent folder when there is one.
    dest_dir = os.path.dirname(dest_path)
    if dest_dir:
        try:
            os.makedirs(dest_dir, exist_ok=True)
        except OSError as e:
            log_message(f"โ Failed to create directory for download path {dest_dir}: {str(e)}", "ERROR")
            return False

    # Send credentials only when a token is configured; an empty token would
    # otherwise produce a malformed "Bearer " header.
    headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
    # (connect, read) timeouts in seconds so a stalled server cannot hang the
    # loop forever; timeouts surface as RequestException and are retried.
    timeout = (10, 120)

    for attempt in range(max_retries):
        started_writing = False
        try:
            with requests.get(url, headers=headers, stream=True, timeout=timeout) as r:
                r.raise_for_status()

                # Check content length if available
                content_length = r.headers.get("content-length")
                if content_length:
                    size_gb = int(content_length) / (1024**3)
                    required_gb = size_gb + MIN_FREE_SPACE_GB
                    # Check if there is enough space for the full download
                    if get_disk_usage(".")["free"] < required_gb:
                        log_message(f"โ ๏ธ Not enough space for download ({size_gb:.2f}GB required). Freeing space...", "WARNING")
                        cleanup_temp_files()
                        disk_info = get_disk_usage(".")
                        if disk_info["free"] < required_gb:
                            log_message(f"โ Still not enough space for download. Required: {required_gb:.2f}GB, Available: {disk_info['free']:.2f}GB", "ERROR")
                            return False

                # Download the file chunk by chunk
                started_writing = True
                with open(dest_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        if chunk:  # filter out keep-alive new chunks
                            f.write(chunk)

            log_message(f"โ Download successful: {dest_path}", "INFO")
            return True
        except requests.exceptions.RequestException as e:
            log_message(f"โ Download attempt {attempt + 1} failed for {url}: {str(e)}", "WARNING")
            if started_writing:
                _remove_partial_download(dest_path)
            # No point in waiting after the final attempt.
            if attempt + 1 < max_retries:
                time.sleep(PROCESSING_DELAY)
        except Exception as e:
            log_message(f"โ An unexpected error occurred during download: {str(e)}", "ERROR")
            if started_writing:
                _remove_partial_download(dest_path)
            return False

    log_message(f"โ Failed to download {url} after {max_retries} attempts.", "ERROR")
    return False
def extract_rar(rar_path: str, extract_path: str) -> bool:
    """Extract an archive into ``extract_path``.

    ZIP archives are unpacked with the standard-library ``zipfile`` module,
    since ``unrar`` cannot read them. Everything else is handed to the
    external ``unrar`` tool: first a full extraction, then, if that fails
    because the archive is a later volume of a multipart set, a
    member-by-member fallback that keeps whatever can be extracted alone.

    Args:
        rar_path: Path of the archive to extract.
        extract_path: Destination folder; created when missing.

    Returns:
        True when the archive (or, in the fallback, at least one member) was
        extracted; False otherwise, including when ``unrar`` is not
        installed.
    """
    import zipfile  # Local import keeps this block self-contained.

    log_message(f"๐ฆ Attempting to extract RAR: {rar_path} to {extract_path}", "INFO")

    def _run(cmd):
        """Run ``cmd``; return (True, CompletedProcess) or (False, CalledProcessError)."""
        try:
            return True, subprocess.run(cmd, capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as e:
            return False, e

    try:
        # Create the extraction directory if it doesn't exist
        os.makedirs(extract_path, exist_ok=True)

        # ZIP archives are detected by content and never reach unrar.
        if zipfile.is_zipfile(rar_path):
            try:
                with zipfile.ZipFile(rar_path) as archive:
                    archive.extractall(extract_path)
            except (zipfile.BadZipFile, OSError, RuntimeError, NotImplementedError) as e:
                log_message(f"ZIP extraction failed for {rar_path}. Error: {e}", "ERROR")
                return False
            log_message(f"Successfully extracted ZIP: {rar_path}", "INFO")
            return True

        # First try a normal full extraction
        ok, result = _run(["unrar", "x", "-o+", "-y", rar_path, extract_path])
        if ok:
            if "All OK" not in (result.stdout or ""):
                log_message(f"โ ๏ธ RAR extraction finished with warnings/non-fatal errors for {rar_path}: {result.stdout}", "WARNING")
            log_message(f"โ Successfully extracted RAR: {rar_path}", "INFO")
            return True

        # Full extraction failed: check whether a previous volume is needed.
        stderr = result.stderr or ''
        lowered = stderr.lower()
        needs_previous_volume = (
            "previous volume" in lowered
            or "you need to start extraction" in lowered
        )
        if not needs_previous_volume:
            log_message(f"โ RAR extraction failed for {rar_path}. Error: {stderr}", "ERROR")
            return False

        log_message(f"โ ๏ธ Full extraction failed due to multipart volume dependency for {rar_path}. Will attempt per-file extraction fallback.", "WARNING")

        # List the members contained in this archive volume.
        ok_list, list_result = _run(["unrar", "lb", rar_path])
        if not ok_list:
            log_message(f"โ Failed to list archive contents for {rar_path}: {list_result.stderr}", "ERROR")
            return False

        file_list = [ln.strip() for ln in (list_result.stdout or '').splitlines() if ln.strip()]
        if not file_list:
            log_message(f"โ ๏ธ Archive {rar_path} appears empty or listing failed. Cannot extract.", "WARNING")
            return False

        # Extract each member on its own; members that depend on an earlier
        # volume fail individually and are skipped.
        extracted_any = False
        for member in file_list:
            ok_member, member_result = _run(["unrar", "x", "-o+", "-y", rar_path, member, extract_path])
            if ok_member:
                extracted_any = True
                log_message(f"โ Extracted member {member} from {rar_path}", "INFO")
            else:
                member_err = member_result.stderr or ''
                log_message(f"โ ๏ธ Could not extract member {member} from {rar_path}: {member_err.strip()}", "WARNING")

        if extracted_any:
            log_message(f"โ Finished partial extraction from {rar_path} (some members extracted)", "INFO")
            return True

        log_message(f"โ No members could be extracted from {rar_path} independently.", "ERROR")
        return False
    except FileNotFoundError:
        # subprocess.run raises this when the unrar executable is missing.
        log_message("โ 'unrar' command not found. Please ensure 'unrar' is installed.", "ERROR")
        return False
    except Exception as e:
        log_message(f"โ An unexpected error occurred during RAR extraction: {str(e)}", "ERROR")
        return False
def extract_frames(video_path: str, output_dir: str, fps: int = DEFAULT_FPS) -> bool:
    """Save frames of a video as JPEG files at roughly ``fps`` frames per second.

    Frames are written to ``output_dir`` as ``frame_<index>.jpg``, where the
    index is the frame's position in the source video.

    Args:
        video_path: Path of the video file to read.
        output_dir: Folder that receives the JPEG files; created when missing.
        fps: Desired number of extracted frames per second of video.

    Returns:
        True when at least one frame was written; False when ``fps`` is not
        positive, the video cannot be opened, its FPS is unknown, no frame
        could be written, or an unexpected error occurs.
    """
    log_message(f"๐ฌ Extracting frames from {video_path} at {fps} FPS to {output_dir}", "INFO")
    cap = None
    try:
        if fps <= 0:
            log_message(f"Invalid target FPS {fps}, cannot extract frames: {video_path}", "ERROR")
            return False

        # Create output directory
        os.makedirs(output_dir, exist_ok=True)

        # Open the video file
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            log_message(f"โ Error opening video file: {video_path}", "ERROR")
            return False

        # Get video properties
        video_fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # "not > 0" also rejects negative and NaN values.
        if not video_fps > 0:
            log_message(f"โ ๏ธ Video FPS is 0, cannot extract frames: {video_path}", "WARNING")
            return False

        # Step between extracted frames; every frame is taken when the video
        # FPS is lower than the desired FPS.
        frame_interval = max(1, int(round(video_fps / fps)))
        log_message(f"Video FPS: {video_fps:.2f}, Total Frames: {frame_count}, Extraction Interval: {frame_interval} frames", "INFO")

        frame_number = 0
        extracted_count = 0
        while True:
            # Seek to the wanted frame and read it.
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
            ret, frame = cap.read()
            # Stop when reading fails or the end of the video is reached.
            if not ret:
                break

            output_filename = os.path.join(output_dir, f"frame_{frame_number:06d}.jpg")
            if cv2.imwrite(output_filename, frame, [cv2.IMWRITE_JPEG_QUALITY, 95]):
                extracted_count += 1
            else:
                log_message(f"Could not write frame file: {output_filename}", "WARNING")

            frame_number += frame_interval
            # Use the reported frame count as a stop condition only when it
            # is known; otherwise keep going until a read fails.
            if 0 < frame_count <= frame_number:
                break

        if extracted_count == 0:
            log_message(f"No frames could be extracted from {video_path}", "WARNING")
            return False

        log_message(f"โ Finished extracting {extracted_count} frames from {video_path}", "INFO")
        return True
    except Exception as e:
        log_message(f"โ An unexpected error occurred during frame extraction for {video_path}: {str(e)}", "ERROR")
        return False
    finally:
        # Release the capture on every exit path, including early returns.
        if cap is not None:
            cap.release()
def zip_directory(source_dir: str, output_zip_path: str) -> bool:
    """Compress a directory into a zip file at ``output_zip_path``.

    The archive contains ``source_dir`` itself as its top-level folder.

    Args:
        source_dir: Directory to compress.
        output_zip_path: Path of the zip file to create.

    Returns:
        True when the archive exists at ``output_zip_path`` afterwards;
        False when ``source_dir`` is not a directory or archiving fails.
    """
    log_message(f"๐๏ธ Zipping directory: {source_dir} to {output_zip_path}", "INFO")
    try:
        # Work on an absolute, normalised path so a bare folder name or a
        # trailing slash still yields a usable root_dir/base_dir pair.
        source_abs = os.path.abspath(source_dir)
        if not os.path.isdir(source_abs):
            log_message(f"โ Failed to create zip file from {source_dir}: not a directory", "ERROR")
            return False

        # shutil.make_archive appends ".zip" itself, so it gets the target
        # path without its extension.
        base_name = os.path.splitext(output_zip_path)[0]
        created = shutil.make_archive(
            base_name,
            'zip',
            os.path.dirname(source_abs),
            os.path.basename(source_abs),
        )

        # When the requested name does not end in ".zip", make_archive wrote
        # to a different file; move it to the path the caller asked for.
        if os.path.abspath(created) != os.path.abspath(output_zip_path):
            os.replace(created, output_zip_path)

        log_message(f"โ Successfully created zip file: {output_zip_path}", "INFO")
        return True
    except Exception as e:
        log_message(f"โ Failed to create zip file from {source_dir}: {str(e)}", "ERROR")
        return False
def upload_folder_to_hf(folder_path: str, repo_id: str, path_in_repo: str, commit_message: str) -> bool:
    """Upload every file under a folder to a Hugging Face dataset repo in one commit.

    Args:
        folder_path: Local folder whose files are uploaded (recursively).
        repo_id: Target dataset repository.
        path_in_repo: Folder inside the repo that receives the files.
        commit_message: Message for the created commit.

    Returns:
        True when the commit was created or the folder held no files;
        False when the upload failed.
    """
    log_message(f"โฌ๏ธ Uploading folder {folder_path} to {repo_id}/{path_in_repo}", "INFO")
    try:
        # Repo paths always use forward slashes, whatever the local OS uses.
        repo_prefix = path_in_repo.replace(os.sep, "/").rstrip("/")

        # Collect all files to be uploaded
        operations = []
        for root, _, files in os.walk(folder_path):
            for file_name in files:
                local_path = os.path.join(root, file_name)
                relative_path = os.path.relpath(local_path, folder_path).replace(os.sep, "/")
                repo_path = f"{repo_prefix}/{relative_path}" if repo_prefix else relative_path
                operations.append(
                    CommitOperationAdd(path_in_repo=repo_path, path_or_fileobj=local_path)
                )

        if not operations:
            log_message(f"โ ๏ธ Folder {folder_path} is empty. Skipping upload.", "WARNING")
            return True  # Consider an empty folder upload successful

        # Perform the upload
        hf_api.create_commit(
            repo_id=repo_id,
            operations=operations,
            commit_message=commit_message,
            repo_type="dataset",
        )
        log_message(f"โ Successfully uploaded {len(operations)} files from {folder_path}", "INFO")
        return True
    except Exception as e:
        log_message(f"โ Failed to upload folder {folder_path} to Hugging Face: {str(e)}", "ERROR")
        return False
def process_rar_file(rar_path: str) -> bool:
    """Run the full pipeline for one downloaded archive.

    Steps:
        1. Extract the archive.
        2. Find video files (matched by extension, case-insensitively).
        3. Extract frames from each video.
        4. Zip each video's frames folder.
        5. Upload the zip file to the target repo.
        6. Clean up local files.

    Args:
        rar_path: Path of the downloaded archive.

    Returns:
        True when at least one video's frames were uploaded; False when
        extraction fails, no video is found, or every video fails. Each
        failure is also recorded through ``log_failed_file``.
    """
    rar_filename = os.path.basename(rar_path)
    base_name = os.path.splitext(rar_filename)[0]

    # 1. Extract the archive
    extract_dir = os.path.join(EXTRACT_FOLDER, base_name)
    if not extract_rar(rar_path, extract_dir):
        log_failed_file(rar_filename, "RAR extraction failed")
        return False

    # 2. Find video files recursively. Suffixes are compared in lower case
    # so that e.g. "CLIP.MP4" is found too; sorting keeps the order stable.
    video_suffixes = {'.mp4', '.mkv', '.avi', '.mov', '.webm'}
    video_files = sorted(
        p for p in Path(extract_dir).rglob('*')
        if p.is_file() and p.suffix.lower() in video_suffixes
    )

    if not video_files:
        log_message(f"โ ๏ธ No video files found in extracted content for {rar_filename}", "WARNING")
        # Clean up the extracted folder
        shutil.rmtree(extract_dir, ignore_errors=True)
        log_message(f"๐๏ธ Cleaned up extracted folder: {extract_dir}", "INFO")
        log_failed_file(rar_filename, "No video files found")
        return False

    success_count = 0
    for video_path_obj in video_files:
        video_filename = video_path_obj.name
        video_base_name = os.path.splitext(video_filename)[0]

        # NOTE(review): output names are derived from the video's base name
        # only, so two videos with the same name in different sub-folders of
        # one archive would share a frames folder and zip name - confirm
        # whether that can occur in practice.
        frames_output_dir = os.path.join(FRAMES_OUTPUT_FOLDER, base_name, video_base_name)
        zip_filename = f"{base_name}_{video_base_name}_frames.zip"
        zip_output_path = os.path.join(ZIP_OUTPUT_FOLDER, zip_filename)

        # 3. Extract frames, 4. zip them, 5. upload the zip.
        if not extract_frames(str(video_path_obj), frames_output_dir):
            log_failed_file(rar_filename, f"Failed to extract frames from {video_filename}")
        elif not zip_directory(frames_output_dir, zip_output_path):
            log_failed_file(rar_filename, f"Failed to zip frames for {video_filename}")
        else:
            try:
                hf_api.upload_file(
                    path_or_fileobj=zip_output_path,
                    path_in_repo=f"frames_zips/{zip_filename}",
                    repo_id=TARGET_REPO_ID,
                    repo_type="dataset",
                    commit_message=f"Add frames zip for video: {video_filename} from archive: {rar_filename}",
                )
            except Exception as e:
                log_message(f"โ Failed to upload zip file {zip_output_path}: {str(e)}", "ERROR")
                log_failed_file(rar_filename, f"Failed to upload zip for {video_filename}: {str(e)}")
            else:
                log_message(f"โ Successfully uploaded zip: {zip_filename}", "INFO")
                success_count += 1
                processing_status["extracted_videos"] += 1

        # Remove the zip whether or not the upload worked, so failed uploads
        # do not eat disk space; a failed delete is not an upload failure.
        if os.path.exists(zip_output_path):
            try:
                os.remove(zip_output_path)
            except OSError as e:
                log_message(f"Could not remove local zip file {zip_output_path}: {e}", "WARNING")
            else:
                log_message(f"๐๏ธ Cleaned up local zip file: {zip_output_path}", "INFO")

        # Clean up the frames output directory for this video
        shutil.rmtree(frames_output_dir, ignore_errors=True)
        log_message(f"๐๏ธ Cleaned up frames folder: {frames_output_dir}", "INFO")

    # 6. Clean up the extracted folder
    shutil.rmtree(extract_dir, ignore_errors=True)
    log_message(f"๐๏ธ Cleaned up extracted folder: {extract_dir}", "INFO")

    if success_count > 0:
        processing_status["extracted_courses"] += 1  # Assuming one rar is one course
        return True

    log_message(f"โ All video processing failed for {rar_filename}", "ERROR")
    return False
def get_next_file_to_process(repo_id: str, state: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Pick the next archive in the source repo that still needs processing.

    Archives (.rar/.zip) are considered in sorted order, starting at the
    state's ``next_download_index``. A file qualifies when it has no recorded
    state or is marked 'failed'.

    Returns:
        ``{'filename': str, 'url': str, 'index': int}`` for the chosen file,
        or None when nothing qualifies or the repo listing fails.
    """
    log_message(f"๐ Searching for next file to process in {repo_id}", "INFO")
    try:
        # List the repo and keep only archives, in sorted order.
        repo_listing = hf_api.list_repo_files(repo_id=repo_id, repo_type="dataset")
        archive_files = [name for name in repo_listing if name.endswith(('.rar', '.zip'))]
        archive_files.sort()

        if not archive_files:
            log_message("โน๏ธ No .rar or .zip files found in the source repository.", "INFO")
            return None

        archive_total = len(archive_files)
        processing_status["total_files"] = archive_total

        # Resume from the index stored in the state.
        start_index = state.get("next_download_index", 0)

        for index in range(start_index, archive_total):
            filename = archive_files[index]
            file_state = state["file_states"].get(filename)

            if file_state == "processing":
                # Files stuck in 'processing' are simply skipped for now.
                log_message(f"โ ๏ธ File {filename} is currently marked as 'processing'. Skipping for now.", "WARNING")
                continue
            if file_state == "processed":
                log_message(f"โน๏ธ File {filename} already processed. Skipping.", "INFO")
                continue
            if file_state is not None and file_state != "failed":
                # Any other recorded state is skipped silently.
                continue

            # Unseen or previously failed file: build its download URL.
            url = hf_hub_url(repo_id=repo_id, filename=filename, repo_type="dataset", subfolder=None)
            log_message(f"โ Found next file: {filename} at index {index}", "INFO")
            return {
                'filename': filename,
                'url': url,
                'index': index,
            }

        log_message("โน๏ธ All files up to the current index have been processed or skipped.", "INFO")

        # Past the end of the list: restart from 0 so new files get picked up.
        if start_index >= archive_total:
            log_message("โน๏ธ Reached end of file list. Resetting index to 0 for next loop.", "INFO")
            state["next_download_index"] = 0
            upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, state)

        return None
    except Exception as e:
        log_message(f"โ Failed to list files from Hugging Face: {str(e)}", "ERROR")
        return None
def main_processing_loop():
    """Run the download -> process -> upload cycle until stopped.

    Each iteration downloads the shared state, picks the next file, locks
    it, downloads and processes it, then records the outcome ('processed' or
    'failed') and advances the stored index. The loop runs while
    ``processing_status["is_running"]`` is true and returns immediately if
    it is already set on entry. On exit the flag is cleared and temporary
    downloads are cleaned up.
    """
    if processing_status["is_running"]:
        log_message("โ ๏ธ Processing loop is already running.", "WARNING")
        return

    processing_status["is_running"] = True
    try:
        log_message("๐ Starting main processing loop...", "INFO")
        while processing_status["is_running"]:
            # 1. Download the current state
            current_state = download_hf_state(TARGET_REPO_ID, HF_STATE_FILE)

            # 2. Find the next file to process
            next_file_info = get_next_file_to_process(SOURCE_REPO_ID, current_state)
            if next_file_info is None:
                log_message("๐ค No new files to process. Sleeping for a while...", "INFO")
                time.sleep(PROCESSING_DELAY * 5)  # Sleep longer if nothing to do
                continue

            target_file = next_file_info['filename']
            rar_url = next_file_info['url']
            target_index = next_file_info['index']
            processing_status["current_file"] = target_file

            # Computed before the try block so the cleanup in ``finally``
            # can always refer to it.
            local_rar_path = os.path.join(DOWNLOAD_FOLDER, target_file)
            locked = False
            success = False
            try:
                # 3. Lock the file for processing
                locked = lock_file_for_processing(target_file, current_state)
                if not locked:
                    log_message(f"โ Failed to lock file {target_file}. Skipping.", "ERROR")
                else:
                    # 4. Download the file
                    log_message(f"โฌ๏ธ Downloading file: {target_file}", "INFO")
                    if not download_with_retry(rar_url, local_rar_path):
                        log_message(f"โ Download failed for: {target_file}", "ERROR")
                    # 5. Process the file (extraction, frames, zipping, upload)
                    elif process_rar_file(local_rar_path):
                        success = True
                        log_message(f"โ Finished all processing steps for: {target_file}", "INFO")
                    else:
                        log_message(f"โ Processing failed for: {target_file}", "ERROR")
            except Exception as e:
                log_message(f"๐ฅ An unhandled error occurred while processing {target_file}: {str(e)}", "ERROR")
                log_failed_file(target_file, str(e))
            finally:
                # 6. Release Lock / Update State - only for a file this run
                # actually locked; an unlocked file must not be marked
                # 'failed' or advance the index.
                if locked:
                    # The next index to check is the one after the current
                    # file, regardless of success.
                    next_index_to_save = target_index + 1

                    # Re-download the state so changes made by other workers
                    # in the meantime are not overwritten.
                    current_state = download_hf_state(TARGET_REPO_ID, HF_STATE_FILE)
                    if success:
                        unlock_file_as_processed(target_file, current_state, next_index_to_save)
                        processing_status["processed_files"] += 1
                    else:
                        # Release the 'processing' lock by marking the file
                        # 'failed', and still advance the index.
                        log_message(f"โ ๏ธ Processing failed for {target_file}. Marking as 'failed' and advancing index.", "WARNING")
                        current_state["file_states"][target_file] = "failed"
                        current_state["next_download_index"] = next_index_to_save
                        upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, current_state)
                        processing_status["failed_files"] += 1

                # Clean up the local download; a failed delete must not end
                # the whole loop.
                if os.path.exists(local_rar_path):
                    try:
                        os.remove(local_rar_path)
                    except OSError as e:
                        log_message(f"Could not remove local file {local_rar_path}: {e}", "WARNING")
                    else:
                        log_message(f"๐๏ธ Cleaned up local file: {local_rar_path}", "INFO")

            # Wait a bit before checking for the next file to avoid hammering the HF API
            time.sleep(PROCESSING_DELAY)

        log_message("๐ Processing complete!", "INFO")
        log_message(f'๐ Final stats: {processing_status["extracted_courses"]} courses extracted, {processing_status["extracted_videos"]} videos processed, frames extracted', "INFO")
    except KeyboardInterrupt:
        log_message("โน๏ธ Processing interrupted by user", "WARNING")
    except Exception as e:
        log_message(f"โ Fatal error: {str(e)}", "ERROR")
    finally:
        processing_status["is_running"] = False
        cleanup_temp_files()
| # Expose necessary functions and variables | |
# NOTE(review): "ensure_dir" used to be listed here, but no such name is
# defined in the visible part of this module, and a name in __all__ that does
# not exist makes `from <module> import *` raise AttributeError. Re-add it if
# ensure_dir is defined elsewhere in the file.
__all__ = [
    "main_processing_loop",
    "processing_status",
    "log_message",
    "extract_frames",
    "DEFAULT_FPS",
]