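"""Audio-extraction pipeline for Hugging Face dataset repositories.

This module downloads .rar/.zip archives from a source dataset repo
(SOURCE_REPO, default "Fred808/BG1"), extracts the video files they contain,
converts each video's audio track to WAV with moviepy, and uploads the WAV
files to a target dataset repo (TARGET_REPO, default "Samfredoly/BG_Vid").
Progress is coordinated through a JSON state file stored in the target repo,
so interrupted runs (or additional workers) can resume where processing
stopped.

Required environment: HF_TOKEN (a token with write access to the target
repo), plus optional SOURCE_REPO / TARGET_REPO overrides. External tools:
the `unrar` binary must be on PATH, and moviepy needs ffmpeg available.
"""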
import os
import json
import re
import shutil
import smtplib
import subprocess
import threading
import time
from email.message import EmailMessage
from pathlib import Path
from typing import Dict, List, Set, Optional, Any

import cv2
import numpy as np
import requests
from huggingface_hub import HfApi, list_repo_files, CommitOperationAdd, hf_hub_download, hf_hub_url
from moviepy import VideoFileClip


HF_TOKEN = os.getenv("HF_TOKEN", "")

SOURCE_REPO_ID = os.getenv("SOURCE_REPO", "Fred808/BG1")
TARGET_REPO_ID = os.getenv("TARGET_REPO", "Samfredoly/BG_Vid")

DOWNLOAD_FOLDER = "downloads"
EXTRACT_FOLDER = "extracted"
LOCAL_STATE_FOLDER = ".state"
FRAMES_OUTPUT_FOLDER = "frames"

os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)
os.makedirs(EXTRACT_FOLDER, exist_ok=True)
os.makedirs(LOCAL_STATE_FOLDER, exist_ok=True)

FAILED_FILES_LOG = "failed_files.log"
HF_STATE_FILE = "processing_state2.json"

CHUNK_SIZE = 2
PROCESSING_DELAY = 2
MAX_RETRIES = 3
MIN_FREE_SPACE_GB = 2

hf_api = HfApi(token=HF_TOKEN)


processing_status = {
    "is_running": False,
    "current_file": None,
    "total_files": 0,
    "processed_files": 0,
    "failed_files": 0,
    "extracted_courses": 0,
    "extracted_videos": 0,
    "last_update": None,
    "logs": []
}


def log_message(message: str, level: str = "INFO"):
    """Log messages with timestamp"""
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    log_entry = f"[{timestamp}] {level}: {message}"
    print(log_entry)
    processing_status["logs"].append(log_entry)
    processing_status["last_update"] = timestamp
    if len(processing_status["logs"]) > 100:
        processing_status["logs"] = processing_status["logs"][-100:]


def log_failed_file(filename: str, error: str):
    """Log failed files to persistent file"""
    with open(FAILED_FILES_LOG, "a") as f:
        f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {filename}: {error}\n")


def get_disk_usage(path: str) -> Dict[str, float]:
    """Get disk usage statistics in GB"""
    statvfs = os.statvfs(path)
    total = statvfs.f_frsize * statvfs.f_blocks / (1024**3)
    free = statvfs.f_frsize * statvfs.f_bavail / (1024**3)
    used = total - free
    return {"total": total, "free": free, "used": used}


def check_disk_space(path: str = ".") -> bool:
    """Check if there's enough disk space"""
    disk_info = get_disk_usage(path)
    if disk_info["free"] < MIN_FREE_SPACE_GB:
        log_message(f'⚠️ Low disk space: {disk_info["free"]:.2f}GB free, {disk_info["used"]:.2f}GB used')
        return False
    return True


def cleanup_temp_files():
    """Clean up temporary files to free space"""
    log_message("🧹 Cleaning up temporary files...", "INFO")

    current_file = processing_status.get("current_file")
    for file in os.listdir(DOWNLOAD_FOLDER):
        if file != current_file and file.endswith((".rar", ".zip")):
            try:
                os.remove(os.path.join(DOWNLOAD_FOLDER, file))
                log_message(f"🗑️ Removed old download: {file}", "INFO")
            except OSError:
                pass


def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str, Any]:
    """Load state from JSON file with migration logic for new structure."""
    if os.path.exists(file_path):
        try:
            with open(file_path, "r") as f:
                data = json.load(f)

            # Migrate the legacy "processed_rars" list into the newer
            # "file_states" mapping of filename -> state.
            if "processed_rars" in data and isinstance(data["processed_rars"], list):
                log_message("ℹ️ Migrating old 'processed_rars' list to new 'file_states' dictionary.", "INFO")
                data["file_states"] = {
                    filename: "processed" for filename in data.pop("processed_rars")
                }

            if "file_states" not in data or not isinstance(data["file_states"], dict):
                log_message("ℹ️ Initializing 'file_states' dictionary.", "INFO")
                data["file_states"] = {}

            if "next_download_index" not in data:
                data["next_download_index"] = 0

            return data
        except json.JSONDecodeError:
            log_message(f"⚠️ Corrupted state file: {file_path}", "WARNING")
            return default_value
    # No state file on disk yet: fall back to the provided default.
    return default_value


def save_json_state(file_path: str, data: Dict[str, Any]):
    """Save state to JSON file"""
    with open(file_path, "w") as f:
        json.dump(data, f, indent=2)


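
# The shared state file (HF_STATE_FILE) kept in the target repo is a small
# JSON document, roughly (filenames and index shown here are illustrative):
#
#     {
#         "next_download_index": 3,
#         "file_states": {
#             "course_01.rar": "processed",
#             "course_02.rar": "processing"
#         }
#     }
#
# The helpers below download, update, and re-upload it; the per-file states
# used elsewhere in this module are "processing", "processed", and "failed".
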
def download_hf_state(repo_id: str, filename: str) -> Dict[str, Any]:
    """Downloads the state file from Hugging Face or returns a default state."""
    local_path = os.path.join(LOCAL_STATE_FOLDER, filename)

    default_state = {"next_download_index": 0, "file_states": {}}

    try:
        files = hf_api.list_repo_files(repo_id=repo_id, repo_type="dataset")
        if filename not in files:
            log_message(f"ℹ️ State file {filename} not found in {repo_id}. Starting from default state.", "INFO")
            return default_state

        hf_hub_download(
            repo_id=repo_id,
            filename=filename,
            repo_type="dataset",
            local_dir=LOCAL_STATE_FOLDER,
            local_dir_use_symlinks=False
        )

        log_message(f"✅ Successfully downloaded state file from {repo_id}.", "INFO")

        return load_json_state(local_path, default_state)

    except Exception as e:
        log_message(f"⚠️ Failed to download state file from Hugging Face: {str(e)}. Starting from default state.", "WARNING")
        return default_state


def upload_hf_state(repo_id: str, filename: str, state: Dict[str, Any]) -> bool:
    """Uploads the state file to Hugging Face."""
    local_path = os.path.join(LOCAL_STATE_FOLDER, filename)

    try:
        save_json_state(local_path, state)

        hf_api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=filename,
            repo_id=repo_id,
            repo_type="dataset",
            commit_message=f"Update processing state: next_index={state['next_download_index']}"
        )
        log_message(f"✅ Successfully uploaded updated state file to {repo_id}", "INFO")
        return True
    except Exception as e:
        log_message(f"❌ Failed to upload state file to Hugging Face: {str(e)}", "ERROR")
        return False


def lock_file_for_processing(rar_filename: str, state: Dict[str, Any]) -> bool:
    """Marks a file as 'processing' in the state file and uploads the lock."""
    log_message(f"🔒 Attempting to lock file: {rar_filename} (Marking as 'processing')", "INFO")

    state["file_states"][rar_filename] = "processing"

    if upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, state):
        log_message(f"✅ Successfully locked file: {rar_filename}", "INFO")
        return True
    else:
        log_message(f"❌ Failed to upload lock for file: {rar_filename}. Aborting processing.", "ERROR")
        # Roll back the local change so the in-memory state stays consistent.
        if rar_filename in state["file_states"]:
            del state["file_states"][rar_filename]
        return False


def unlock_file_as_processed(rar_filename: str, state: Dict[str, Any], next_index: int) -> bool:
    """Marks a file as 'processed', updates the index, and uploads the state."""
    log_message(f"🔓 Attempting to unlock file: {rar_filename} (Marking as 'processed')", "INFO")

    state["file_states"][rar_filename] = "processed"
    state["next_download_index"] = next_index

    if upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, state):
        log_message(f"✅ Successfully unlocked and marked as processed: {rar_filename}", "INFO")
        return True
    else:
        log_message(f"❌ Failed to upload final state for file: {rar_filename}. The file is processed locally but state is not updated.", "ERROR")
        return False


def download_with_retry(url: str, dest_path: str, max_retries: int = 3) -> bool:
    """Download file with retry logic and disk space checking"""
    if not check_disk_space():
        cleanup_temp_files()
        if not check_disk_space():
            log_message("❌ Insufficient disk space even after cleanup", "ERROR")
            return False

    try:
        os.makedirs(os.path.dirname(dest_path), exist_ok=True)
    except Exception as e:
        log_message(f"❌ Failed to create directory for download path {os.path.dirname(dest_path)}: {str(e)}", "ERROR")
        return False

    headers = {"Authorization": f"Bearer {HF_TOKEN}"}
    for attempt in range(max_retries):
        try:
            with requests.get(url, headers=headers, stream=True) as r:
                r.raise_for_status()

                content_length = r.headers.get("content-length")
                if content_length:
                    size_gb = int(content_length) / (1024**3)
                    disk_info = get_disk_usage(".")

                    if disk_info["free"] < size_gb + MIN_FREE_SPACE_GB:
                        log_message(f"⚠️ Not enough space for download ({size_gb:.2f}GB required). Freeing space...", "WARNING")
                        cleanup_temp_files()
                        disk_info = get_disk_usage(".")
                        if disk_info["free"] < size_gb + MIN_FREE_SPACE_GB:
                            log_message(f"❌ Still not enough space for download. Required: {size_gb + MIN_FREE_SPACE_GB:.2f}GB, Available: {disk_info['free']:.2f}GB", "ERROR")
                            return False

                with open(dest_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)

            log_message(f"✅ Download successful: {dest_path}", "INFO")
            return True

        except requests.exceptions.RequestException as e:
            log_message(f"❌ Download attempt {attempt + 1} failed for {url}: {str(e)}", "WARNING")
            time.sleep(PROCESSING_DELAY)
        except Exception as e:
            log_message(f"❌ An unexpected error occurred during download: {str(e)}", "ERROR")
            return False

    log_message(f"❌ Failed to download {url} after {max_retries} attempts.", "ERROR")
    return False


def extract_rar(rar_path: str, extract_path: str) -> bool:
    """Extracts a RAR file using unrar (requires unrar to be installed)."""
    log_message(f"📦 Attempting to extract RAR: {rar_path} to {extract_path}", "INFO")

    def _run(cmd):
        try:
            proc = subprocess.run(cmd, capture_output=True, text=True, check=True)
            return True, proc
        except subprocess.CalledProcessError as e:
            return False, e

    try:
        os.makedirs(extract_path, exist_ok=True)

        # unrar recognizes the final argument as the destination directory when
        # it ends with a path separator.
        dest_dir = extract_path if extract_path.endswith(os.sep) else extract_path + os.sep
        command = ["unrar", "x", "-o+", "-y", rar_path, dest_dir]
        ok, result = _run(command)
        if ok:
            if hasattr(result, 'stdout') and "All OK" not in result.stdout:
                log_message(f"⚠️ RAR extraction finished with warnings/non-fatal errors for {rar_path}: {result.stdout}", "WARNING")
            log_message(f"✅ Successfully extracted RAR: {rar_path}", "INFO")
            return True

        stderr = ''
        if isinstance(result, subprocess.CalledProcessError):
            stderr = (result.stderr or '')

        # Multipart archives fail when earlier volumes are missing; fall back to
        # extracting whichever members can be pulled from this volume alone.
        if "start extraction from a previous volume" in stderr.lower() or "previous volume" in stderr.lower() or "you need to start extraction" in stderr.lower():
            log_message(f"⚠️ Full extraction failed due to multipart volume dependency for {rar_path}. Will attempt per-file extraction fallback.", "WARNING")

            list_cmd = ["unrar", "lb", rar_path]
            ok_list, list_result = _run(list_cmd)
            if not ok_list:
                log_message(f"❌ Failed to list archive contents for {rar_path}: {(list_result.stderr if isinstance(list_result, subprocess.CalledProcessError) else str(list_result))}", "ERROR")
                return False

            file_list = [ln.strip() for ln in (list_result.stdout or '').splitlines() if ln.strip()]
            if not file_list:
                log_message(f"⚠️ Archive {rar_path} appears empty or listing failed. Cannot extract.", "WARNING")
                return False

            extracted_any = False

            for member in file_list:
                cmd = ["unrar", "x", "-o+", "-y", rar_path, member, dest_dir]
                ok_member, member_result = _run(cmd)
                if ok_member:
                    extracted_any = True
                    log_message(f"✅ Extracted member {member} from {rar_path}", "INFO")
                else:
                    member_err = ''
                    if isinstance(member_result, subprocess.CalledProcessError):
                        member_err = (member_result.stderr or '')
                    log_message(f"⚠️ Could not extract member {member} from {rar_path}: {member_err.strip()}", "WARNING")

            if extracted_any:
                log_message(f"✅ Finished partial extraction from {rar_path} (some members extracted)", "INFO")
                return True
            else:
                log_message(f"❌ No members could be extracted from {rar_path} independently.", "ERROR")
                return False

        log_message(f"❌ RAR extraction failed for {rar_path}. Error: {stderr}", "ERROR")
        return False

    except FileNotFoundError:
        log_message("❌ 'unrar' command not found. Please ensure 'unrar' is installed.", "ERROR")
        return False
    except Exception as e:
        log_message(f"❌ An unexpected error occurred during RAR extraction: {str(e)}", "ERROR")
        return False


def extract_audio_from_video(video_path: str, output_wav_path: str) -> bool:
    """Extracts audio from a video file and saves it as WAV format using moviepy."""
    log_message(f"🎵 Extracting audio from {video_path} to {output_wav_path}", "INFO")

    try:
        os.makedirs(os.path.dirname(output_wav_path), exist_ok=True)

        video = VideoFileClip(video_path)

        if video.audio is None:
            log_message(f"⚠️ No audio track found in video: {video_path}", "WARNING")
            video.close()
            return False

        video.audio.write_audiofile(output_wav_path, logger=None)
        video.close()

        if os.path.exists(output_wav_path) and os.path.getsize(output_wav_path) > 0:
            log_message(f"✅ Successfully extracted audio to WAV: {output_wav_path}", "INFO")
            return True
        else:
            log_message(f"❌ Audio extraction produced empty or missing file: {output_wav_path}", "ERROR")
            return False

    except Exception as e:
        log_message(f"❌ An error occurred during audio extraction from {video_path}: {str(e)}", "ERROR")
        return False


def upload_folder_to_hf(folder_path: str, repo_id: str, path_in_repo: str, commit_message: str) -> bool:
    """Uploads an entire folder's contents to a Hugging Face repository."""
    log_message(f"⬆️ Uploading folder {folder_path} to {repo_id}/{path_in_repo}", "INFO")
    try:
        operations = []
        for root, _, files in os.walk(folder_path):
            for file in files:
                local_path = os.path.join(root, file)
                relative_path = os.path.relpath(local_path, folder_path)
                # Repo paths always use forward slashes, regardless of the local OS.
                repo_path = os.path.join(path_in_repo, relative_path).replace(os.sep, "/")

                operations.append(
                    CommitOperationAdd(path_in_repo=repo_path, path_or_fileobj=local_path)
                )

        if not operations:
            log_message(f"⚠️ Folder {folder_path} is empty. Skipping upload.", "WARNING")
            return True

        hf_api.create_commit(
            repo_id=repo_id,
            operations=operations,
            commit_message=commit_message,
            repo_type="dataset"
        )

        log_message(f"✅ Successfully uploaded {len(operations)} files from {folder_path}", "INFO")
        return True

    except Exception as e:
        log_message(f"❌ Failed to upload folder {folder_path} to Hugging Face: {str(e)}", "ERROR")
        return False


def process_rar_file(rar_path: str) -> bool:
    """
    Main processing logic for a single RAR file:
    1. Extract RAR
    2. Find video files (MP4s)
    3. Extract audio from each video and convert to WAV
    4. Upload each WAV file to HF one by one
    5. Clean up local files
    """
    rar_filename = os.path.basename(rar_path)
    base_name = os.path.splitext(rar_filename)[0]

    extract_dir = os.path.join(EXTRACT_FOLDER, base_name)
    if not extract_rar(rar_path, extract_dir):
        log_failed_file(rar_filename, "RAR extraction failed")
        return False

    video_files = []

    for ext in ['*.mp4', '*.mkv', '*.avi', '*.mov', '*.webm']:
        video_files.extend(Path(extract_dir).rglob(ext))

    if not video_files:
        log_message(f"⚠️ No video files found in extracted content for {rar_filename}", "WARNING")

        shutil.rmtree(extract_dir, ignore_errors=True)
        log_message(f"🗑️ Cleaned up extracted folder: {extract_dir}", "INFO")
        log_failed_file(rar_filename, "No video files found")
        return False

    success_count = 0

    for video_path_obj in video_files:
        video_path = str(video_path_obj)
        video_filename = video_path_obj.name
        video_base_name = os.path.splitext(video_filename)[0]

        wav_filename = f"{video_base_name}.wav"
        wav_output_path = os.path.join(EXTRACT_FOLDER, wav_filename)

        if not extract_audio_from_video(video_path, wav_output_path):
            log_failed_file(rar_filename, f"Failed to extract audio from {video_filename}")
            continue

        path_in_repo = f"audio/{wav_filename}"
        commit_message = f"Add audio: {wav_filename} extracted from {video_filename} in archive {rar_filename}"

        try:
            log_message(f"⬆️ Uploading audio: {wav_filename}", "INFO")
            hf_api.upload_file(
                path_or_fileobj=wav_output_path,
                path_in_repo=path_in_repo,
                repo_id=TARGET_REPO_ID,
                repo_type="dataset",
                commit_message=commit_message
            )
            log_message(f"✅ Successfully uploaded audio: {wav_filename}", "INFO")
            success_count += 1
            processing_status["extracted_videos"] += 1

            log_message("⏳ Waiting 60 seconds before next upload...", "INFO")
            time.sleep(60)

        except Exception as e:
            log_message(f"❌ Failed to upload audio {wav_filename}: {str(e)}", "ERROR")
            log_failed_file(rar_filename, f"Failed to upload audio {wav_filename}: {str(e)}")

        finally:
            if os.path.exists(wav_output_path):
                os.remove(wav_output_path)
                log_message(f"🗑️ Cleaned up WAV file: {wav_output_path}", "INFO")

    shutil.rmtree(extract_dir, ignore_errors=True)
    log_message(f"🗑️ Cleaned up extracted folder: {extract_dir}", "INFO")

    if success_count > 0:
        processing_status["extracted_courses"] += 1
        return True
    else:
        log_message(f"❌ All audio extraction/upload failed for {rar_filename}", "ERROR")
        return False


def get_next_file_to_process(repo_id: str, state: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """
    Finds the next file to process from the source repo.
    Returns: { 'filename': str, 'url': str, 'index': int } or None
    """
    log_message(f"🔍 Searching for next file to process in {repo_id}", "INFO")

    try:
        files_list = hf_api.list_repo_files(repo_id=repo_id, repo_type="dataset")

        archive_files = sorted([f for f in files_list if f.endswith(('.rar', '.zip'))])

        if not archive_files:
            log_message("ℹ️ No .rar or .zip files found in the source repository.", "INFO")
            return None

        processing_status["total_files"] = len(archive_files)

        start_index = state.get("next_download_index", 0)

        for index in range(start_index, len(archive_files)):
            filename = archive_files[index]

            file_state = state["file_states"].get(filename)

            if file_state is None or file_state == "failed":
                url = hf_hub_url(repo_id=repo_id, filename=filename, repo_type="dataset", subfolder=None)

                log_message(f"✅ Found next file: {filename} at index {index}", "INFO")
                return {
                    'filename': filename,
                    'url': url,
                    'index': index
                }

            elif file_state == "processing":
                log_message(f"⚠️ File {filename} is currently marked as 'processing'. Skipping for now.", "WARNING")

            elif file_state == "processed":
                log_message(f"ℹ️ File {filename} already processed. Skipping.", "INFO")

        log_message("ℹ️ All files up to the current index have been processed or skipped.", "INFO")

        if start_index >= len(archive_files):
            log_message("ℹ️ Reached end of file list. Resetting index to 0 for next loop.", "INFO")
            state["next_download_index"] = 0
            upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, state)

        return None

    except Exception as e:
        log_message(f"❌ Failed to list files from Hugging Face: {str(e)}", "ERROR")
        return None


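
# Per-archive lifecycle inside the loop below:
#   1. pull the shared state file and pick the next unprocessed (or failed) archive
#   2. lock it by marking it "processing" and uploading the state
#   3. download the archive, extract its videos, convert and upload the audio
#   4. mark it "processed" (or "failed") with an advanced index, then clean up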
def main_processing_loop():
    """The main loop that orchestrates the download, processing, and upload cycle."""
    if processing_status["is_running"]:
        log_message("⚠️ Processing loop is already running.", "WARNING")
        return

    processing_status["is_running"] = True

    try:
        log_message("🚀 Starting main processing loop...", "INFO")

        while processing_status["is_running"]:
            # Always work against the freshest copy of the shared state.
            current_state = download_hf_state(TARGET_REPO_ID, HF_STATE_FILE)

            next_file_info = get_next_file_to_process(SOURCE_REPO_ID, current_state)

            if next_file_info is None:
                log_message("💤 No new files to process. Sleeping for a while...", "INFO")
                time.sleep(PROCESSING_DELAY * 5)
                continue

            target_file = next_file_info['filename']
            rar_url = next_file_info['url']
            target_index = next_file_info['index']

            processing_status["current_file"] = target_file
            success = False
            # Define the local path up front so the cleanup in `finally` can
            # reference it even when processing bails out early.
            local_rar_path = os.path.join(DOWNLOAD_FOLDER, target_file)

            try:
                if not lock_file_for_processing(target_file, current_state):
                    log_message(f"❌ Failed to lock file {target_file}. Skipping.", "ERROR")
                    time.sleep(PROCESSING_DELAY)
                    continue

                log_message(f"⬇️ Downloading file: {target_file}", "INFO")

                if download_with_retry(rar_url, local_rar_path):
                    if process_rar_file(local_rar_path):
                        success = True
                        log_message(f"✅ Finished all processing steps for: {target_file}", "INFO")
                    else:
                        log_message(f"❌ Processing failed for: {target_file}", "ERROR")
                else:
                    log_message(f"❌ Download failed for: {target_file}", "ERROR")

            except Exception as e:
                log_message(f"💥 An unhandled error occurred while processing {target_file}: {str(e)}", "ERROR")
                log_failed_file(target_file, str(e))

            finally:
                next_index_to_save = target_index + 1

                # Re-download the state in case it changed while this archive
                # was being processed.
                current_state = download_hf_state(TARGET_REPO_ID, HF_STATE_FILE)

                if success:
                    unlock_file_as_processed(target_file, current_state, next_index_to_save)
                    processing_status["processed_files"] += 1
                else:
                    log_message(f"⚠️ Processing failed for {target_file}. Marking as 'failed' and advancing index.", "WARNING")
                    current_state["file_states"][target_file] = "failed"
                    current_state["next_download_index"] = next_index_to_save
                    upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, current_state)
                    processing_status["failed_files"] += 1

                if os.path.exists(local_rar_path):
                    os.remove(local_rar_path)
                    log_message(f"🗑️ Cleaned up local file: {local_rar_path}", "INFO")

            time.sleep(PROCESSING_DELAY)

        log_message("🎉 Processing complete!", "INFO")
        log_message(f'📊 Final stats: {processing_status["extracted_courses"]} courses extracted, {processing_status["extracted_videos"]} audio tracks uploaded', "INFO")

    except KeyboardInterrupt:
        log_message("⏹️ Processing interrupted by user", "WARNING")
    except Exception as e:
        log_message(f"❌ Fatal error: {str(e)}", "ERROR")
    finally:
        processing_status["is_running"] = False
        cleanup_temp_files()


__all__ = [
    "main_processing_loop",
    "processing_status",
    "log_message",
]
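

if __name__ == "__main__":
    # Minimal example entry point: run the pipeline in a background thread
    # (using the `threading` module imported above) and keep the main thread
    # responsive to Ctrl+C. Assumes HF_TOKEN (and optionally SOURCE_REPO /
    # TARGET_REPO) are set in the environment before launching.
    worker = threading.Thread(target=main_processing_loop, daemon=True)
    worker.start()
    try:
        while worker.is_alive():
            time.sleep(5)
    except KeyboardInterrupt:
        # Ask the loop to stop at its next iteration, then wait for it.
        processing_status["is_running"] = False
        worker.join()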