|
|
import os |
|
|
import json |
|
|
import requests |
|
|
import subprocess |
|
|
import shutil |
|
|
import time |
|
|
import threading |
|
|
import multiprocessing |
|
|
from typing import Dict, List, Optional |
|
|
from pathlib import Path |
|
|
from huggingface_hub import HfApi |
|
|
import uuid |
|
|
import frame_extractor |
|
|
|
|
|
|
|
|
# HuggingFace access token; when empty, all HF operations below are disabled.
HF_TOKEN = os.getenv("HF_TOKEN", "")

# Dataset repository that holds the source RAR archives to process.
SOURCE_REPO_ID = os.getenv("SOURCE_REPO", "Fred808/BG1")

# Working directories for each stage of the pipeline.
UPLOAD_DIRECTORY = "./uploads"
DOWNLOAD_FOLDER = "./downloads"
EXTRACT_FOLDER = "./extracted"
MP4_OUTPUT_FOLDER = "./mp4_files"

# Make sure every working directory exists before any processing starts.
for directory in [UPLOAD_DIRECTORY, DOWNLOAD_FOLDER, EXTRACT_FOLDER, MP4_OUTPUT_FOLDER]:
    os.makedirs(directory, exist_ok=True)

# Persistent state files so an interrupted run can resume where it left off.
DOWNLOAD_STATE_FILE = "download_progress.json"
PROCESS_STATE_FILE = "process_progress.json"
FAILED_FILES_LOG = "failed_files.log"

# Tunables: download retry count, minimum free disk space (GB) required to
# keep working, and default number of RARs processed per background run.
MAX_RETRIES = 3
MIN_FREE_SPACE_GB = 2
DEFAULT_RAR_LIMIT = 1

# HF API client; None when no token is configured (checked before use).
hf_api = HfApi(token=HF_TOKEN) if HF_TOKEN else None

# In-memory status shared between the background worker and status readers.
# NOTE(review): mutated from a background thread without a lock — presumably
# acceptable for monotonic counters/log appends; confirm readers tolerate it.
processing_status = {
    "is_running": False,        # a background run is in progress
    "current_file": None,       # basename of the RAR being processed, or None
    "total_files": 0,           # RAR files found in the source repo
    "processed_files": 0,       # RARs completed (including skipped) this process
    "failed_files": 0,          # RARs that failed download or extraction
    "extracted_courses": 0,     # courses (archives) fully extracted
    "extracted_mp4s": 0,        # MP4 files copied out of archives
    "last_update": None,        # timestamp of the most recent log entry
    "logs": []                  # rolling buffer of the last 100 log lines
}

# Registry of extracted MP4s — presumably keyed by the MP4 "id"; confirm
# against the code that populates it elsewhere in the file.
uploaded_mp4s = {}
|
|
|
|
|
def log_message(message: str):
    """Print *message* with a timestamp and record it in the shared status.

    Side effects: appends the formatted line to ``processing_status["logs"]``,
    refreshes ``processing_status["last_update"]``, and caps the log buffer at
    the 100 most recent entries.
    """
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{stamp}] {message}"
    print(entry)
    logs = processing_status["logs"]
    logs.append(entry)
    processing_status["last_update"] = stamp
    # Keep only the newest 100 lines so the buffer cannot grow unbounded.
    if len(logs) > 100:
        processing_status["logs"] = logs[-100:]
|
|
|
|
|
def log_failed_file(filename: str, error: str):
    """Append a timestamped failure record for *filename* to FAILED_FILES_LOG.

    Bug fix: the previous version ignored the *filename* parameter and wrote a
    literal "(unknown)" placeholder, so the persistent failure log carried no
    information about which file failed.

    Args:
        filename: Name of the file that failed processing.
        error: Human-readable error description.
    """
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    # Append mode preserves prior failures; one line per failure keeps the
    # log greppable. utf-8 so error messages with non-ASCII text don't crash.
    with open(FAILED_FILES_LOG, "a", encoding="utf-8") as f:
        f.write(f"{timestamp} - {filename}: {error}\n")
|
|
|
|
|
def get_disk_usage(path: str) -> Dict[str, float]:
    """Return disk usage for the filesystem containing *path*, in GB.

    Returns:
        Dict with keys ``total``, ``free`` (space available to this process)
        and ``used`` (total minus free).
    """
    stats = os.statvfs(path)
    gib = 1024 ** 3
    total_gb = stats.f_frsize * stats.f_blocks / gib
    # f_bavail = blocks available to unprivileged users (not f_bfree).
    free_gb = stats.f_frsize * stats.f_bavail / gib
    return {"total": total_gb, "free": free_gb, "used": total_gb - free_gb}
|
|
|
|
|
def check_disk_space(path: str = ".") -> bool:
    """Return True when *path* has at least MIN_FREE_SPACE_GB of free space.

    Logs a warning with the current usage figures when space is low.
    """
    usage = get_disk_usage(path)
    if usage["free"] >= MIN_FREE_SPACE_GB:
        return True
    log_message(f'β οΈ Low disk space: {usage["free"]:.2f}GB free, {usage["used"]:.2f}GB used')
    return False
|
|
|
|
|
def cleanup_temp_files():
    """Delete stale archive downloads from DOWNLOAD_FOLDER to free disk space.

    Keeps the archive currently being processed (tracked in
    ``processing_status["current_file"]``); everything else ending in
    ``.rar``/``.zip`` is removed best-effort.

    Bug fix: the bare ``except:`` around ``os.remove`` also swallowed
    SystemExit/KeyboardInterrupt; it now catches only ``OSError``, which is
    what a failed file removal actually raises.
    """
    log_message("π§Ή Cleaning up temporary files...")

    current_file = processing_status.get("current_file")
    for file in os.listdir(DOWNLOAD_FOLDER):
        if file != current_file and file.endswith((".rar", ".zip")):
            try:
                os.remove(os.path.join(DOWNLOAD_FOLDER, file))
                log_message(f"ποΈ Removed old download: {file}")
            except OSError:
                # Best-effort cleanup: a vanished or locked file is not fatal.
                pass
|
|
|
|
|
def load_json_state(file_path: str, default_value):
    """Load persisted JSON state from *file_path*.

    Returns *default_value* when the file does not exist or contains invalid
    JSON (a warning is logged in the corrupted case).
    """
    if not os.path.exists(file_path):
        return default_value
    try:
        with open(file_path, "r") as f:
            return json.load(f)
    except json.JSONDecodeError:
        # Corrupted state is recoverable: fall back to the caller's default.
        log_message(f"β οΈ Corrupted state file: {file_path}")
        return default_value
|
|
|
|
|
def save_json_state(file_path: str, data):
    """Persist *data* to *file_path* as indented JSON, overwriting any
    existing file."""
    serialized = json.dumps(data, indent=2)
    with open(file_path, "w") as f:
        f.write(serialized)
|
|
|
|
|
def download_with_retry(url: str, dest_path: str, max_retries: int = 3) -> bool:
    """Download *url* to *dest_path* with retries, backoff and space checks.

    Before starting, verifies free disk space (running one cleanup pass if
    low) and aborts when the server-advertised Content-Length would not fit
    with a 0.5GB safety margin. Retries up to *max_retries* times with
    exponential backoff (1s, 2s, 4s, ...).

    Fixes vs. the previous version:
    - ``requests.get`` now has a (connect, read) timeout so a stalled server
      cannot hang the worker forever.
    - A partially written file is removed after a failed attempt so retries
      (and later size accounting) start from a clean slate.

    Returns:
        True on a completed download, False otherwise.
    """
    if not check_disk_space():
        cleanup_temp_files()
        if not check_disk_space():
            log_message("β Insufficient disk space even after cleanup")
            return False

    headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
    for attempt in range(max_retries):
        try:
            # stream=True avoids loading multi-GB archives into memory.
            with requests.get(url, headers=headers, stream=True, timeout=(10, 60)) as r:
                r.raise_for_status()

                # Abort early when the file clearly will not fit on disk.
                content_length = r.headers.get("content-length")
                if content_length:
                    size_gb = int(content_length) / (1024**3)
                    disk_info = get_disk_usage(".")
                    if size_gb > disk_info["free"] - 0.5:
                        log_message(f'β File too large: {size_gb:.2f}GB, only {disk_info["free"]:.2f}GB free')
                        return False

                with open(dest_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        f.write(chunk)
            return True
        except Exception as e:
            # Drop any partial download so the next attempt starts fresh.
            try:
                os.remove(dest_path)
            except OSError:
                pass
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
                continue
            log_message(f"β Download failed after {max_retries} attempts: {e}")
            return False
    return False
|
|
|
|
|
def is_multipart_rar(filename: str) -> bool:
    """Return True when *filename* looks like one part of a multi-part RAR
    set (e.g. ``course.part01.rar``). The check is case-insensitive."""
    lowered = filename.lower()
    return lowered.endswith(".rar") and ".part" in lowered
|
|
|
|
|
def get_rar_part_base(filename: str) -> str:
    """Return the archive base name, stripping any ``.partNN...`` suffix or a
    trailing ``.rar`` extension.

    Bug fix: the ``.part`` marker was detected case-insensitively but the
    split was performed on the original-case string, so a name like
    ``COURSE.PART01.RAR`` was returned unchanged. The marker position is now
    found on the lowercased copy and the slice is taken from the original,
    preserving the caller-visible casing of the base name.
    """
    marker = filename.lower().find(".part")
    if marker != -1:
        return filename[:marker]
    # Single-part archive: drop the extension (case-sensitive, as before).
    return filename.replace(".rar", "")
|
|
|
|
|
def extract_with_retry(rar_path: str, output_dir: str, max_retries: int = 2) -> bool:
    """Extract RAR with retry and recovery, handling multi-part archives.

    Runs ``unrar t`` to verify the archive, then ``unrar x`` to extract into
    *output_dir*, retrying up to *max_retries* times. Returns True on
    success, False otherwise. Requires the ``unrar`` binary on PATH.
    """
    filename = os.path.basename(rar_path)

    # Multi-part sets must be extracted starting from their first part;
    # unrar then consumes the remaining parts automatically.
    if is_multipart_rar(filename):
        base_name = get_rar_part_base(filename)
        # NOTE(review): assumes two-digit part numbering (".part01.rar");
        # a set named ".part1.rar" would not be found — confirm source naming.
        first_part = f"{base_name}.part01.rar"
        first_part_path = os.path.join(os.path.dirname(rar_path), first_part)

        if not os.path.exists(first_part_path):
            log_message(f"β οΈ Multi-part RAR detected but first part not found: {first_part}")
            return False

        rar_path = first_part_path
        log_message(f"π¦ Processing multi-part RAR starting with: {first_part}")

    for attempt in range(max_retries):
        try:
            # Integrity test first: "unrar t" reads the whole archive.
            test_cmd = ["unrar", "t", rar_path]
            test_result = subprocess.run(test_cmd, capture_output=True, text=True)
            if test_result.returncode != 0:
                log_message(f"β οΈ RAR test failed: {test_result.stderr}")
                if attempt == max_retries - 1:
                    return False
                # Retry immediately; note this skips the sleep at loop end.
                continue

            # "x" keeps directory structure; "-o+" overwrites without prompt.
            cmd = ["unrar", "x", "-o+", rar_path, output_dir]
            if attempt > 0:
                # On retries, "-kb" keeps broken extracted files so whatever
                # unrar can salvage from a damaged archive is preserved.
                cmd.insert(2, "-kb")

            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                log_message(f"β Successfully extracted: {os.path.basename(rar_path)}")
                return True
            else:
                # unrar reports errors on stderr; some builds use stdout.
                error_msg = result.stderr or result.stdout
                log_message(f"β οΈ Extraction attempt {attempt + 1} failed: {error_msg}")

        except Exception as e:
            log_message(f"β Extraction exception: {str(e)}")
            if attempt == max_retries - 1:
                return False
        # Brief pause before the next attempt.
        time.sleep(1)

    return False
|
|
|
|
|
def process_rar_file(rar_path: str) -> List[Dict]:
    """Process a single RAR file - extract and find MP4 files.

    Extracts the archive into a temporary per-course directory, copies every
    ``.mp4`` found into MP4_OUTPUT_FOLDER/<course>/, then extracts preview
    frames from the copied videos in parallel. Updates the shared
    ``processing_status`` counters and always cleans up the extraction
    directory, even on failure.

    Returns:
        A list of metadata dicts (one per copied MP4); empty list on failure.
    """
    filename = os.path.basename(rar_path)
    processing_status["current_file"] = filename

    # Course name comes from the archive name; multi-part sets share one
    # base name across all parts.
    if is_multipart_rar(filename):
        course_name = get_rar_part_base(filename)
    else:
        course_name = filename.replace(".rar", "")

    # Destination folder for the MP4s of this course.
    course_mp4_output_dir = os.path.join(MP4_OUTPUT_FOLDER, course_name)
    os.makedirs(course_mp4_output_dir, exist_ok=True)

    extract_dir = os.path.join(EXTRACT_FOLDER, course_name)
    mp4_files = []

    try:
        # NOTE(review): "(unknown)" looks like a lost placeholder for the
        # filename — confirm intended log text.
        log_message(f"π Processing: (unknown)")

        # Start from a clean extraction directory in case a previous run
        # left partial contents behind.
        if os.path.exists(extract_dir):
            shutil.rmtree(extract_dir, ignore_errors=True)

        os.makedirs(extract_dir, exist_ok=True)
        if not extract_with_retry(rar_path, extract_dir):
            raise Exception("RAR extraction failed")

        # Walk the extracted tree and copy every MP4 into the course folder.
        for root, dirs, files in os.walk(extract_dir):
            for file in files:
                if file.lower().endswith(".mp4"):
                    source_path = os.path.join(root, file)

                    # NOTE(review): same-named files in different subfolders
                    # overwrite each other here — confirm this is intended.
                    dest_path = os.path.join(course_mp4_output_dir, file)

                    try:
                        shutil.copy2(source_path, dest_path)
                        file_info = {
                            "id": os.path.join(course_name, file),
                            "original_name": file,
                            "course_name": course_name,
                            "size": os.path.getsize(dest_path),
                            "path": dest_path,
                            "created_at": time.strftime("%Y-%m-%d %H:%M:%S")
                        }
                        mp4_files.append(file_info)
                        log_message(f"β Extracted MP4: {file} -> {os.path.join(course_name, file)}")
                    except Exception as e:
                        log_message(f"β Failed to copy MP4 {file}: {e}")

        # Frame extraction is CPU-bound, so fan it out across all cores.
        if mp4_files:
            log_message(f"ποΈ Starting frame extraction for {len(mp4_files)} MP4 files...")

            frames_dir = os.path.join(MP4_OUTPUT_FOLDER, f"{course_name}_frames")
            os.makedirs(frames_dir, exist_ok=True)

            # One (video_path, output_dir, 10) tuple per video.
            # NOTE(review): the meaning of the constant 10 is defined by
            # frame_extractor.extract_frames_from_video — confirm (interval?
            # frame count?).
            extraction_args = [
                (mp4["path"], frames_dir, 10)
                for mp4 in mp4_files
            ]

            cpu_count = multiprocessing.cpu_count()
            with multiprocessing.Pool(processes=cpu_count) as pool:
                results = pool.map(frame_extractor.extract_frames_from_video, extraction_args)

            # Workers may return None on failure; count only real results.
            total_frames = sum(count for count in results if count is not None)
            log_message(f"ποΈ Extracted {total_frames} frames from {len(mp4_files)} videos using {cpu_count} CPU cores")

        processing_status["extracted_courses"] += 1
        processing_status["extracted_mp4s"] += len(mp4_files)
        log_message(f"β Successfully processed '{course_name}' - found {len(mp4_files)} MP4 files")

        return mp4_files

    except Exception as e:
        error_msg = str(e)
        log_message(f"β Processing failed: {error_msg}")
        log_failed_file(filename, error_msg)
        return []

    finally:
        # Always clear the current-file marker and drop the (possibly
        # partial) extraction directory to reclaim disk space.
        processing_status["current_file"] = None

        if os.path.exists(extract_dir):
            shutil.rmtree(extract_dir, ignore_errors=True)
|
|
|
|
|
def process_hf_files_background(start_index: int = 9, limit: int = DEFAULT_RAR_LIMIT):
    """Background task to process HuggingFace files.

    Downloads up to *limit* unprocessed RAR archives from SOURCE_REPO_ID
    (starting at *start_index*, or at the persisted index when
    ``start_index <= 0``), runs each through ``process_rar_file``, and
    persists progress after every file so an interrupted run resumes where it
    left off.

    Fixes vs. the previous version:
    - ``processing_status["is_running"]`` is now reset in a ``finally``
      block; previously it stayed True after completion, early returns and
      fatal errors, leaving the status stuck reporting a phantom run.
    - Log messages include the actual filename instead of the "(unknown)"
      placeholder.
    - The bare ``except:`` around the download cleanup now catches only
      ``OSError``.
    """
    if not hf_api:
        log_message("β HuggingFace API not configured (missing HF_TOKEN)")
        return

    processing_status["is_running"] = True

    try:
        # Resume state: which RARs are already done, and where to continue.
        processed_rars = load_json_state(PROCESS_STATE_FILE, {"processed_rars": []})["processed_rars"]
        download_state = load_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": 9})

        # An explicit positive start_index overrides the persisted position.
        current_index = start_index if start_index > 0 else download_state["next_download_index"]

        log_message(f"π Starting processing from index {current_index} with a limit of {limit} files.")
        log_message(f"π Previously processed: {len(processed_rars)} files")

        # Enumerate RARs in the source repo; sorted for a stable ordering so
        # the persisted index stays meaningful across runs.
        try:
            files = list(hf_api.list_repo_files(repo_id=SOURCE_REPO_ID, repo_type="dataset"))
            rar_files = sorted([f for f in files if f.endswith(".rar")])

            processing_status["total_files"] = len(rar_files)
            log_message(f"π Found {len(rar_files)} RAR files in repository")

            if current_index >= len(rar_files):
                log_message("β All files have been processed!")
                return

        except Exception as e:
            log_message(f"β Failed to get file list: {str(e)}")
            return

        processed_count = 0
        # is_running doubles as a stop flag an external caller can clear.
        while processed_count < limit and current_index < len(rar_files) and processing_status["is_running"]:
            rar_file = rar_files[current_index]
            filename = os.path.basename(rar_file)

            # Skip files recorded as done by a previous run; advancing the
            # persisted index keeps restarts from re-checking them.
            if filename in processed_rars:
                log_message(f"βοΈ Skipping already processed: {filename}")
                processing_status["processed_files"] += 1
                current_index += 1
                save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": current_index})
                continue

            log_message(f"π₯ Downloading: {filename}")
            dest_path = os.path.join(DOWNLOAD_FOLDER, filename)

            download_url = f"https://huggingface.co/datasets/{SOURCE_REPO_ID}/resolve/main/{rar_file}"
            if download_with_retry(download_url, dest_path):
                mp4_files = process_rar_file(dest_path)
                if mp4_files:
                    # Persist success immediately so a crash cannot redo it.
                    processed_rars.append(filename)
                    save_json_state(PROCESS_STATE_FILE, {"processed_rars": processed_rars})
                    log_message(f"β Successfully processed: {filename}")
                    processing_status["processed_files"] += 1
                else:
                    log_message(f"β Failed to process: {filename}")
                    processing_status["failed_files"] += 1

                # Remove the downloaded archive to free disk space.
                try:
                    os.remove(dest_path)
                    log_message(f"ποΈ Cleaned up download: {filename}")
                except OSError:
                    pass
            else:
                log_message(f"β Failed to download: {filename}")
                processing_status["failed_files"] += 1

            current_index += 1
            processed_count += 1
            save_json_state(DOWNLOAD_STATE_FILE, {"next_download_index": current_index})

        if current_index >= len(rar_files):
            log_message("π All available RAR files have been processed!")
        elif not processing_status["is_running"]:
            log_message("βΉοΈ Processing stopped by request.")
        else:
            log_message(f"β Processed {processed_count} RAR files. Next index to process: {current_index}")

    except Exception as e:
        log_message(f"β Fatal error in background processing: {str(e)}")

    finally:
        # Always clear the running flag so status readers and subsequent
        # runs do not see a job that is no longer executing.
        processing_status["is_running"] = False
|