|
|
import os |
|
|
import json |
|
|
import requests |
|
|
import subprocess |
|
|
import shutil |
|
|
import time |
|
|
import re |
|
|
import threading |
|
|
from typing import Dict, List, Set, Optional, Any |
|
|
from huggingface_hub import HfApi, list_repo_files, CommitOperationAdd, hf_hub_download, hf_hub_url |
|
|
|
|
|
import cv2 |
|
|
import numpy as np |
|
|
from pathlib import Path |
|
|
import smtplib |
|
|
from email.message import EmailMessage |
|
|
|
|
|
|
|
|
# --- Configuration (overridable via environment variables) ---
# Hugging Face access token; empty string means unauthenticated requests.
HF_TOKEN = os.getenv("HF_TOKEN", "")

# Dataset repo the RAR archives are downloaded from.
SOURCE_REPO_ID = os.getenv("SOURCE_REPO", "Fred808/BG1")

# Dataset repo the frame zips and the shared state file are uploaded to.
TARGET_REPO_ID = os.getenv("TARGET_REPO", "Fred808/BG3")


# --- Local working directories ---
DOWNLOAD_FOLDER = "downloads"            # raw .rar downloads
EXTRACT_FOLDER = "extracted"             # unpacked archive contents
FRAMES_OUTPUT_FOLDER = "extracted_frames"  # per-video PNG frames
ZIP_OUTPUT_FOLDER = "zipped_frames"      # zipped frame folders awaiting upload
LOCAL_STATE_FOLDER = ".state"            # local copy of the remote state file

# Create the working directories up front so later code can assume they exist.
os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)
os.makedirs(EXTRACT_FOLDER, exist_ok=True)
os.makedirs(FRAMES_OUTPUT_FOLDER, exist_ok=True)
os.makedirs(ZIP_OUTPUT_FOLDER, exist_ok=True)
os.makedirs(LOCAL_STATE_FOLDER, exist_ok=True)


# Append-only log of files that failed processing (survives restarts).
FAILED_FILES_LOG = "failed_files.log"
# Name of the shared JSON state file stored in TARGET_REPO_ID.
HF_STATE_FILE = "processing_state2.json"


# --- Tuning knobs ---
CHUNK_SIZE = 2           # NOTE(review): not referenced in this file — confirm external use
PROCESSING_DELAY = 2     # seconds to pause between files in the main loop
MAX_RETRIES = 3          # NOTE(review): not referenced here; functions take their own max_retries
MIN_FREE_SPACE_GB = 2    # refuse to download below this much free disk


# Frames sampled per second of video.
DEFAULT_FPS = 3


# Shared Hugging Face API client used for all repo operations.
hf_api = HfApi(token=HF_TOKEN)
|
|
|
|
|
|
|
|
# Mutable status shared with any UI/monitoring code importing this module.
processing_status = {
    "is_running": False,
    "current_file": None,
    "total_files": 0,
    "processed_files": 0,
    "failed_files": 0,
    "extracted_courses": 0,
    "extracted_videos": 0,
    "last_update": None,
    "logs": []
}


def log_message(message: str):
    """Print *message* with a timestamp and record it in the shared status dict.

    Only the 100 most recent entries are retained in
    ``processing_status["logs"]`` to bound memory use.
    """
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{stamp}] {message}"
    print(entry)
    processing_status["last_update"] = stamp
    processing_status["logs"].append(entry)
    if len(processing_status["logs"]) > 100:
        processing_status["logs"] = processing_status["logs"][-100:]
|
|
|
|
|
def log_failed_file(filename: str, error: str):
    """Append a failure record (timestamp, filename, error) to the persistent log.

    Args:
        filename: Name of the file that failed processing.
        error: Human-readable error description.

    Fix: the *filename* parameter was previously unused — the log line
    contained the literal placeholder "(unknown)" instead of the file name.
    """
    # Append mode so records from earlier runs are preserved.
    with open(FAILED_FILES_LOG, "a") as f:
        f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {filename}: {error}\n")
|
|
|
|
|
def get_disk_usage(path: str) -> Dict[str, float]:
    """Return disk usage for *path* in GiB as ``{"total", "free", "used"}``.

    Fix: uses ``shutil.disk_usage`` (portable across POSIX and Windows)
    instead of the POSIX-only ``os.statvfs``.  ``free`` is the space
    available to the caller and ``used`` is ``total - free``, matching the
    original statvfs arithmetic.
    """
    usage = shutil.disk_usage(path)
    gib = 1024 ** 3
    total = usage.total / gib
    free = usage.free / gib
    return {"total": total, "free": free, "used": total - free}
|
|
|
|
|
def check_disk_space(path: str = ".") -> bool:
    """Return True when *path* has at least MIN_FREE_SPACE_GB of free space.

    Logs a warning with the current free/used figures when space is low.
    """
    usage = get_disk_usage(path)
    if usage["free"] >= MIN_FREE_SPACE_GB:
        return True
    log_message(f'β οΈ Low disk space: {usage["free"]:.2f}GB free, {usage["used"]:.2f}GB used')
    return False
|
|
|
|
|
def cleanup_temp_files():
    """Remove stale archive downloads to free disk space.

    Deletes every ``.rar``/``.zip`` in DOWNLOAD_FOLDER except the file
    currently being processed.  Best-effort: individual deletion errors
    are skipped rather than aborting the whole cleanup pass.
    """
    log_message("π§Ή Cleaning up temporary files...")

    current_file = processing_status.get("current_file")
    for file in os.listdir(DOWNLOAD_FOLDER):
        if file != current_file and file.endswith((".rar", ".zip")):
            try:
                os.remove(os.path.join(DOWNLOAD_FOLDER, file))
                log_message(f"ποΈ Removed old download: {file}")
            except OSError:
                # Fix: previously a bare `except:` which also swallowed
                # KeyboardInterrupt/SystemExit.  Only filesystem errors
                # (file already gone, permissions) should be ignored here.
                continue
|
|
|
|
|
def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str, Any]:
    """Load state from a JSON file, migrating legacy layouts.

    Migration: an old-style ``processed_rars`` list becomes the new
    ``file_states`` ``{filename: "processed"}`` mapping.  Missing keys
    (``file_states``, ``next_download_index``) are initialized in place.

    Returns:
        The (possibly migrated) state dict, or *default_value* when the
        file is absent, unreadable, or corrupted.

    Fix: the original had no return statement for the file-not-found path
    and silently returned ``None`` instead of *default_value*.
    """
    if not os.path.exists(file_path):
        return default_value

    try:
        with open(file_path, "r") as f:
            data = json.load(f)
    except (json.JSONDecodeError, OSError):
        # OSError added: an unreadable file should also fall back, not crash.
        log_message(f"β οΈ Corrupted state file: {file_path}")
        return default_value

    # Migrate the legacy list format to the per-file state dictionary.
    if "processed_rars" in data and isinstance(data["processed_rars"], list):
        log_message("βΉοΈ Migrating old 'processed_rars' list to new 'file_states' dictionary.")
        data["file_states"] = {
            filename: "processed" for filename in data.pop("processed_rars")
        }

    if "file_states" not in data or not isinstance(data["file_states"], dict):
        log_message("βΉοΈ Initializing 'file_states' dictionary.")
        data["file_states"] = {}

    if "next_download_index" not in data:
        data["next_download_index"] = 0

    return data
|
|
|
|
|
def save_json_state(file_path: str, data: Dict[str, Any]):
    """Write *data* to *file_path* as indented JSON, overwriting any existing file."""
    serialized = json.dumps(data, indent=2)
    with open(file_path, "w") as handle:
        handle.write(serialized)
|
|
|
|
|
def download_hf_state(repo_id: str, filename: str) -> Dict[str, Any]:
    """Download the processing-state JSON from a Hugging Face dataset repo.

    Falls back to a fresh default state when the file is missing or any
    step of the download fails.

    Args:
        repo_id: Dataset repository to read from.
        filename: Path of the state file inside the repo.

    Fixes: the "not found" message logged the literal "(unknown)" instead
    of the filename, and the success message's string literal was broken
    across two source lines (syntax error) — rejoined here.
    """
    local_path = os.path.join(LOCAL_STATE_FOLDER, filename)

    default_state = {"next_download_index": 0, "file_states": {}}

    try:
        # Probe the repo listing first so a missing state file is handled
        # as a normal first-run condition rather than a download error.
        files = hf_api.list_repo_files(repo_id=repo_id, repo_type="dataset")
        if filename not in files:
            log_message(f"βΉοΈ State file {filename} not found in {repo_id}. Starting from default state.")
            return default_state

        hf_hub_download(
            repo_id=repo_id,
            filename=filename,
            repo_type="dataset",
            local_dir=LOCAL_STATE_FOLDER,
            local_dir_use_symlinks=False
        )

        log_message(f"β Successfully downloaded state file from {repo_id}.")

        return load_json_state(local_path, default_state)

    except Exception as e:
        log_message(f"β οΈ Failed to download state file from Hugging Face: {str(e)}. Starting from default state.")
        return default_state
|
|
|
|
|
def upload_hf_state(repo_id: str, filename: str, state: Dict[str, Any]) -> bool:
    """Persist *state* locally, then upload it to the Hugging Face dataset repo.

    Args:
        repo_id: Dataset repository to write to.
        filename: Path of the state file inside the repo.
        state: State dict; must contain "next_download_index".

    Returns:
        True on success, False on any failure (logged, not raised).

    Fix: the success message's string literal was broken across two source
    lines (syntax error) — rejoined here.
    """
    local_path = os.path.join(LOCAL_STATE_FOLDER, filename)

    try:
        # Write locally first so the upload reads a fully flushed file.
        save_json_state(local_path, state)

        hf_api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=filename,
            repo_id=repo_id,
            repo_type="dataset",
            commit_message=f"Update processing state: next_index={state['next_download_index']}"
        )
        log_message(f"β Successfully uploaded updated state file to {repo_id}")
        return True
    except Exception as e:
        log_message(f"β Failed to upload state file to Hugging Face: {str(e)}")
        return False
|
|
|
|
|
|
|
|
|
|
|
def lock_file_for_processing(rar_filename: str, state: Dict[str, Any]) -> bool:
    """Mark *rar_filename* as 'processing' in the shared state and publish the lock.

    Args:
        rar_filename: File to lock.
        state: Mutable shared state dict (modified in place).

    Returns:
        True when the lock was uploaded to the target repo.

    Fixes: the success message's string literal was broken across two
    source lines (syntax error); and on a failed upload the original
    deleted the file's entry outright, losing any previous state — the
    rollback now restores what was there before.
    """
    log_message(f"π Attempting to lock file: {rar_filename} (Marking as 'processing')")

    # Remember the prior state so a failed upload can roll back cleanly.
    previous_state = state["file_states"].get(rar_filename)
    state["file_states"][rar_filename] = "processing"

    if upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, state):
        log_message(f"β Successfully locked file: {rar_filename}")
        return True

    log_message(f"β Failed to upload lock for file: {rar_filename}. Aborting processing.")
    # Roll back the local mutation so the in-memory state matches the remote.
    if previous_state is None:
        state["file_states"].pop(rar_filename, None)
    else:
        state["file_states"][rar_filename] = previous_state
    return False
|
|
|
|
|
def unlock_file_as_processed(rar_filename: str, state: Dict[str, Any], next_index: int) -> bool:
    """Mark *rar_filename* as 'processed', advance the index, and upload the state.

    Args:
        rar_filename: File that just finished processing.
        state: Mutable shared state dict (modified in place).
        next_index: Index the next iteration should start scanning from.

    Returns:
        True when the updated state was uploaded successfully.

    Fix: the success message's string literal was broken across two source
    lines (syntax error) — rejoined here.
    """
    log_message(f"π Attempting to unlock file: {rar_filename} (Marking as 'processed')")

    state["file_states"][rar_filename] = "processed"
    state["next_download_index"] = next_index

    if upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, state):
        log_message(f"β Successfully unlocked and marked as processed: {rar_filename}")
        return True

    log_message(f"β Failed to upload final state for file: {rar_filename}. The file is processed locally but state is not updated.")
    return False
|
|
|
|
|
|
|
|
|
|
|
def download_with_retry(url: str, dest_path: str, max_retries: int = 3) -> bool:
    """Download *url* to *dest_path* with retries, disk-space checks and cleanup.

    Args:
        url: Fully resolved download URL (authenticated with HF_TOKEN).
        dest_path: Local destination path.
        max_retries: Attempts before giving up (exponential backoff between).

    Returns:
        True on success, False otherwise.

    Fixes over the original: ``requests.get`` now has a timeout so a
    stalled connection cannot hang the loop forever; a partially written
    file is removed after a failed attempt; and ``makedirs("")`` is no
    longer called when *dest_path* has no directory component.
    """
    if not check_disk_space():
        cleanup_temp_files()
        if not check_disk_space():
            log_message("β Insufficient disk space even after cleanup")
            return False

    parent_dir = os.path.dirname(dest_path)
    if parent_dir:
        try:
            os.makedirs(parent_dir, exist_ok=True)
        except Exception as e:
            log_message(f"β Failed to create directory for download path {parent_dir}: {str(e)}")
            return False

    headers = {"Authorization": f"Bearer {HF_TOKEN}"}
    for attempt in range(max_retries):
        try:
            # (connect, read) timeout prevents an indefinite hang on a dead link.
            with requests.get(url, headers=headers, stream=True, timeout=(10, 60)) as r:
                r.raise_for_status()

                # Refuse files that clearly will not fit (keep 0.5GB headroom).
                content_length = r.headers.get("content-length")
                if content_length:
                    size_gb = int(content_length) / (1024**3)
                    disk_info = get_disk_usage(".")
                    if size_gb > disk_info["free"] - 0.5:
                        log_message(f'β File too large: {size_gb:.2f}GB, only {disk_info["free"]:.2f}GB free')
                        return False

                with open(dest_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        if chunk:  # skip keep-alive chunks
                            f.write(chunk)
                return True
        except Exception as e:
            # Remove any partial file so a retry (or a later run) starts clean.
            if os.path.exists(dest_path):
                try:
                    os.remove(dest_path)
                except OSError:
                    pass
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # backoff: 1s, 2s, 4s, ...
                continue
            log_message(f"β Download failed after {max_retries} attempts: {e}")
            return False
    return False
|
|
|
|
|
def is_multipart_rar(filename: str) -> bool:
    """Return True for multi-part RAR volume names like 'archive.part1.rar'."""
    lowered = filename.lower()
    return lowered.endswith(".rar") and ".part" in lowered
|
|
|
|
|
def get_rar_part_base(filename: str) -> str:
    """Return the archive base name shared by all parts of a multi-part RAR.

    Fix: the ".part" marker is now located case-insensitively.  The
    original detected ".PART" via ``lower()`` but then split on the
    literal lowercase ``".part"``, so upper/mixed-case names were
    returned unchanged.
    """
    marker_pos = filename.lower().find(".part")
    if marker_pos != -1:
        return filename[:marker_pos]
    # Single-part archive: strip the ".rar" suffix (original behavior).
    return filename.replace(".rar", "")
|
|
|
|
|
def extract_with_retry(rar_path: str, output_dir: str, max_retries: int = 2) -> bool:
    """Extract a RAR archive into *output_dir* via the ``unrar`` CLI, with retries.

    Each part of a multi-part archive is processed independently, so a
    "Cannot find volume" error is treated as success: everything reachable
    from this part has already been extracted.

    Returns:
        True on (effective) success, False after all retries fail.

    Fixes: removed the unused ``filename`` local; rejoined two string
    literals that were broken across source lines (syntax error); made the
    CRC check case-insensitive to match the "checksum error" check.
    """
    log_message("π¦ Processing RAR file independently as requested by user.")

    for attempt in range(max_retries):
        try:
            # e = extract flat, -o+ = overwrite, -kb = keep broken extracted files
            cmd = ["unrar", "e", "-o+", "-kb", rar_path, output_dir]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                log_message(f"β Successfully extracted: {os.path.basename(rar_path)}")
                return True

            error_msg = result.stderr or result.stdout
            log_message(f"β οΈ Extraction attempt {attempt + 1} failed: {error_msg}")

            if "Cannot find volume" in error_msg:
                # Expected when extracting one part of a multi-volume set.
                log_message("β Extracted all possible files from independent part (expected volume error).")
                return True

            lowered = error_msg.lower()
            if "checksum error" in lowered or "crc failed" in lowered:
                log_message(f"β οΈ Data corruption detected, attempt {attempt + 1}")
            elif result.returncode == 10:
                log_message("β οΈ No files to extract (exit code 10)")
            elif result.returncode == 1:
                log_message("β οΈ Non-fatal error (exit code 1)")

        except Exception as e:
            log_message(f"β Extraction exception: {str(e)}")
            if attempt == max_retries - 1:
                return False
        time.sleep(1)

    return False
|
|
|
|
|
|
|
|
def ensure_dir(path):
    """Create *path* (including parents) if it does not already exist."""
    Path(path).mkdir(parents=True, exist_ok=True)
|
|
|
|
|
def extract_frames(video_path, output_dir, fps=DEFAULT_FPS):
    """Sample frames from *video_path* at roughly *fps* frames per second.

    Frames are written to *output_dir* as 0001.png, 0002.png, ...

    Args:
        video_path: Path to a video file readable by OpenCV.
        output_dir: Destination directory (created if missing).
        fps: Target sampling rate in frames per second.

    Returns:
        Number of frames written (0 when the video cannot be opened).

    Fixes: ``frame_interval`` could round to 0 when *fps* exceeds roughly
    twice the video's FPS, crashing the modulo below — clamped to >= 1.
    The capture handle is now released in a ``finally`` block.
    """
    log_message(f"[INFO] Extracting frames from {video_path} to {output_dir} at {fps} fps...")
    ensure_dir(output_dir)
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        log_message(f"[ERROR] Failed to open video file: {video_path}")
        return 0
    try:
        video_fps = cap.get(cv2.CAP_PROP_FPS)
        if not video_fps or video_fps <= 0:
            video_fps = 30  # metadata missing or bogus; assume a common rate
            log_message(f"[WARN] Using fallback FPS: {video_fps}")
        # Clamp to >= 1 so `frame_idx % frame_interval` never divides by zero.
        frame_interval = max(1, int(round(video_fps / fps)))
        frame_idx = 0
        saved_idx = 1
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        log_message(f"[DEBUG] Total frames in video: {total_frames}")
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            if frame_idx % frame_interval == 0:
                frame_name = f"{saved_idx:04d}.png"
                cv2.imwrite(str(Path(output_dir) / frame_name), frame)
                saved_idx += 1
            frame_idx += 1
    finally:
        # Always release the capture handle, even if imwrite raises.
        cap.release()
    frame_count = saved_idx - 1
    log_message(f"Extracted {frame_count} frames from {video_path} to {output_dir}")
    return frame_count
|
|
|
|
|
def zip_frames_folder(frames_dir: str, output_folder: str) -> Optional[str]:
    """Zip *frames_dir* into ``output_folder/<dirname>.zip``.

    Returns:
        Path of the created archive, or None on failure.

    Fix: the success message's string literal was broken across two source
    lines (syntax error) — rejoined here.
    """
    base_dir_name = os.path.basename(frames_dir)

    zip_base_name = os.path.join(output_folder, base_dir_name)

    log_message(f"[INFO] Zipping frames from {frames_dir} to {zip_base_name}.zip...")
    try:
        # root_dir/base_dir keep the frames folder itself as the archive root.
        archive_path = shutil.make_archive(
            base_name=zip_base_name,
            format='zip',
            root_dir=os.path.dirname(frames_dir),
            base_dir=base_dir_name
        )
        log_message(f"β Successfully created zip file: {archive_path}")
        return archive_path
    except Exception as e:
        log_message(f"β Failed to create zip archive for {frames_dir}: {str(e)}")
        return None
|
|
|
|
|
def upload_to_huggingface(file_path: str, repo_id: str, path_in_repo: str) -> bool:
    """Upload *file_path* to a Hugging Face dataset repo at *path_in_repo*.

    Returns:
        True on success; failures are logged and return False.

    Fix: ``filename`` was computed but never used — three runtime strings
    contained the literal placeholder "(unknown)" instead of the name, and
    the success message's literal was broken across two source lines.
    """
    filename = os.path.basename(file_path)
    log_message(f"[INFO] Uploading {filename} to {repo_id}/{path_in_repo}...")
    try:
        hf_api.upload_file(
            path_or_fileobj=file_path,
            path_in_repo=path_in_repo,
            repo_id=repo_id,
            repo_type="dataset",
            commit_message=f"Add extracted frames zip: {filename}"
        )
        log_message(f"β Successfully uploaded {filename} to {repo_id}")
        return True
    except Exception as e:
        log_message(f"β Failed to upload {filename} to Hugging Face: {str(e)}")
        return False
|
|
|
|
|
|
|
|
def process_rar_file(rar_path: str) -> bool:
    """Extract a RAR archive, sample frames from each video, zip and upload them.

    Pipeline per archive: extract -> find videos -> extract frames per video
    -> zip frames -> upload zip to TARGET_REPO_ID -> clean local artifacts.

    Returns:
        True when the whole pipeline ran, False on failure (logged and
        recorded via log_failed_file).

    Fixes: the zip was previously deleted even when the upload failed,
    contradicting the "Keeping file for manual inspection" message; the
    processing log line contained a "(unknown)" placeholder instead of the
    filename; and the extracted course tree was never removed, slowly
    filling the disk.
    """
    filename = os.path.basename(rar_path)
    processing_status["current_file"] = filename

    # Multi-part archives share one course name across all their parts.
    if is_multipart_rar(filename):
        course_name = get_rar_part_base(filename)
    else:
        course_name = filename.replace(".rar", "")

    extract_dir = os.path.join(EXTRACT_FOLDER, course_name)

    try:
        log_message(f"π Processing: {filename}")

        os.makedirs(extract_dir, exist_ok=True)
        if not extract_with_retry(rar_path, extract_dir):
            raise Exception("RAR extraction failed")

        # Inventory the extracted tree and collect every video file.
        file_count = 0
        video_files_found = []
        for root, dirs, files in os.walk(extract_dir):
            for file in files:
                file_count += 1
                if file.lower().endswith(('.mp4', '.avi', '.mov', '.mkv', '.webm')):
                    video_files_found.append(os.path.join(root, file))

        processing_status["extracted_courses"] += 1
        log_message(f"β Successfully extracted '{course_name}' ({file_count} files, {len(video_files_found)} videos)")

        for video_path in video_files_found:
            video_filename = Path(video_path).name

            frames_output_dir_name = f"{course_name}_{video_filename.replace('.', '_')}_frames"
            frames_output_dir = os.path.join(
                FRAMES_OUTPUT_FOLDER,
                frames_output_dir_name
            )
            ensure_dir(frames_output_dir)

            frame_count = extract_frames(video_path, frames_output_dir, fps=DEFAULT_FPS)
            processing_status["extracted_videos"] += 1

            if frame_count == 0:
                log_message(f"β οΈ No frames extracted from {video_filename}. Skipping zip/upload.")
                shutil.rmtree(frames_output_dir, ignore_errors=True)
                continue
            log_message(f"β {frame_count} frames extracted from {video_filename}")

            zip_path = zip_frames_folder(frames_output_dir, ZIP_OUTPUT_FOLDER)
            if not zip_path:
                continue

            path_in_repo = f"frames/{os.path.basename(zip_path)}"
            uploaded = upload_to_huggingface(zip_path, TARGET_REPO_ID, path_in_repo)
            if uploaded:
                log_message(f"β Upload successful for {os.path.basename(zip_path)}")
            else:
                log_message(f"β Upload failed for {os.path.basename(zip_path)}. Keeping file for manual inspection.")

            try:
                # Only delete the zip after a successful upload; a failed
                # upload keeps it for manual inspection (see log above).
                if uploaded:
                    os.remove(zip_path)
                    log_message(f"ποΈ Cleaned up local zip file: {os.path.basename(zip_path)}")
                # Loose frames are redundant once zipped; drop them either way.
                shutil.rmtree(frames_output_dir, ignore_errors=True)
                log_message(f"ποΈ Cleaned up local frames directory: {frames_output_dir_name}")
            except Exception as e:
                log_message(f"β οΈ Failed to clean up temporary files: {str(e)}")

        return True

    except Exception as e:
        error_msg = str(e)
        log_message(f"β Processing failed: {error_msg}")
        log_failed_file(filename, error_msg)
        return False

    finally:
        # The extracted tree is fully consumed above; remove it so repeated
        # runs don't exhaust the disk.
        shutil.rmtree(extract_dir, ignore_errors=True)
        processing_status["current_file"] = None
|
|
|
|
|
def get_all_rar_files(repo_id: str) -> List[str]:
    """Return the sorted list of ``.rar`` files in the source dataset repository.

    Returns an empty list (after logging) when the repo cannot be listed.
    """
    log_message(f"π Fetching all files from source repo: {repo_id}")
    try:
        repo_files = hf_api.list_repo_files(repo_id=repo_id, repo_type="dataset")
        rar_files = sorted(name for name in repo_files if name.lower().endswith(".rar"))
        log_message(f"Found {len(rar_files)} RAR files.")
        return rar_files
    except Exception as e:
        log_message(f"β Failed to list files from source repo: {str(e)}")
        return []
|
|
|
|
|
def main_processing_loop(start_index: int = 0):
    """Main processing workflow with locking mechanism.

    Repeatedly: pull the shared state from TARGET_REPO_ID, pick the first
    unclaimed RAR at or after the resume index, publish a 'processing' lock,
    download + process it, then publish the final 'processed'/'failed' state.

    Args:
        start_index: When > 0, overrides the remote "next_download_index"
            as the scan start on EVERY iteration (processed/failed/locked
            entries are skipped by the scan, so the loop still advances).
    """
    processing_status["is_running"] = True
    log_message("π Starting main processing loop with locking mechanism...")

    try:

        all_rar_files = get_all_rar_files(SOURCE_REPO_ID)
        if not all_rar_files:
            log_message("π No RAR files found in the source repository. Exiting.")
            return

        processing_status["total_files"] = len(all_rar_files)

        while True:

            # Re-fetch the shared state each iteration so locks placed by
            # concurrent workers are observed.
            current_state = download_hf_state(TARGET_REPO_ID, HF_STATE_FILE)
            file_states = current_state.get("file_states", {})

            next_download_index = start_index if start_index > 0 else current_state.get("next_download_index", 0)

            target_file = None
            target_index = -1

            if next_download_index >= len(all_rar_files):
                log_message("π All files have been processed! Exiting loop.")
                break

            # Scan forward for the first file with no terminal state and no lock.
            for i in range(next_download_index, len(all_rar_files)):
                rar_filename = all_rar_files[i]
                state = file_states.get(rar_filename)

                if state in ["processed", "failed"]:
                    continue

                if state == "processing":
                    # Locked by another worker (or a stale lock); skip it.
                    log_message(f"β οΈ File is currently 'processing' (locked): {rar_filename}. Skipping to next available file.")
                    continue

                target_file = rar_filename
                target_index = i
                break

            if target_file is None:
                log_message("π All available files have been processed or are currently locked. Exiting loop.")
                break

            log_message(f"β
Selected file for processing: {target_file} (Index: {target_index})")
            processing_status["current_file"] = target_file

            # Publish the lock before downloading; on failure, restart the
            # loop to pick up the latest remote state.
            if not lock_file_for_processing(target_file, current_state):
                log_message("β Failed to acquire lock. Retrying loop to get latest state.")
                time.sleep(PROCESSING_DELAY)
                continue

            rar_url = hf_hub_url(repo_id=SOURCE_REPO_ID, filename=target_file, repo_type="dataset")

            local_rar_path = os.path.join(DOWNLOAD_FOLDER, target_file)

            success = False
            try:

                if download_with_retry(rar_url, local_rar_path):

                    if process_rar_file(local_rar_path):
                        success = True
                        log_message(f"β
Finished all processing steps for: {target_file}")
                    else:
                        log_message(f"β Processing failed for: {target_file}")
                else:
                    log_message(f"β Download failed for: {target_file}")

            except Exception as e:
                log_message(f"π₯ An unhandled error occurred while processing {target_file}: {str(e)}")
                log_failed_file(target_file, str(e))

            finally:

                next_index_to_save = target_index + 1

                # Re-download the state before writing so updates made by
                # other workers while we processed are not clobbered.
                current_state = download_hf_state(TARGET_REPO_ID, HF_STATE_FILE)

                if success:

                    unlock_file_as_processed(target_file, current_state, next_index_to_save)
                    processing_status["processed_files"] += 1
                else:

                    # Mark failed and still advance so the loop cannot get
                    # stuck retrying the same broken file forever.
                    log_message(f"β οΈ Processing failed for {target_file}. Marking as 'failed' and advancing index.")
                    current_state["file_states"][target_file] = "failed"
                    current_state["next_download_index"] = next_index_to_save
                    upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, current_state)
                    processing_status["failed_files"] += 1

                # Drop the local archive regardless of outcome.
                if os.path.exists(local_rar_path):
                    os.remove(local_rar_path)
                    log_message(f"ποΈ Cleaned up local file: {local_rar_path}")

            time.sleep(PROCESSING_DELAY)

        log_message("π Processing complete!")
        log_message(f'π Final stats: {processing_status["extracted_courses"]} courses extracted, {processing_status["extracted_videos"]} videos processed, frames extracted')

    except KeyboardInterrupt:
        log_message("βΉοΈ Processing interrupted by user")
    except Exception as e:
        log_message(f"β Fatal error: {str(e)}")
    finally:
        processing_status["is_running"] = False
        cleanup_temp_files()
|
|
|
|
|
|
|
|
# Public API: the processing entry point, shared status/logging helpers,
# and the frame-extraction utilities reusable by other scripts.
__all__ = [
    "main_processing_loop",
    "processing_status",
    "log_message",
    "extract_frames",
    "DEFAULT_FPS",
    "ensure_dir"
]
|
|
|