File size: 41,482 Bytes
c64d671 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 |
import os
import json
import requests
import subprocess
import shutil
import asyncio
import threading
import time
import hashlib
import zipfile
import uvicorn
from typing import Dict, List, Set, Optional
from fastapi import FastAPI, BackgroundTasks, HTTPException, Form
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware
from huggingface_hub import HfApi, list_repo_files
from huggingface_hub.utils import HfHubHTTPError
# ==== CONFIGURATION ====
# Hugging Face credentials and repositories (overridable through environment variables).
HF_TOKEN = os.getenv("HF_TOKEN", "")
SOURCE_REPO_ID = os.getenv("SOURCE_REPO", "Fred808/BG1")  # dataset holding the source RAR archives
DEST_REPO_ID_RAR = os.getenv("DEST_REPO_RAR", "")  # dataset for extracted RAR contents; "" disables that upload
# DEST_REPO_ID_VIDEO = os.getenv("DEST_REPO_VIDEO", "Fred808/BG3")  # disabled: frame zips are kept locally for download instead

# Local working directories (relative to the current working directory).
DOWNLOAD_FOLDER = "downloads"
EXTRACT_FOLDER = "extracted_tmp"
VIDEO_FRAMES_EXTRACT_FOLDER = "video_frames_tmp"
ZIPPED_FRAMES_FOLDER = "zipped_frames"  # finished frame zips, served for download rather than uploaded

# Files that persist progress between runs.
DOWNLOAD_STATE_FILE = "download_progress.json"
PROCESS_STATE_FILE = "process_progress.json"
UPLOADED_FOLDERS_FILE = "uploaded_folders.json"  # hashes of folders already uploaded to BG2
PROCESSED_VIDEO_COURSES_FILE = "processed_video_courses.json"  # course folders whose frames are done (BG3)
FAILED_FILES_LOG = "failed_files.txt"

# Tuning knobs.
CHUNK_SIZE = 3  # RAR archives fetched per download batch (kept small for the Space environment)
PROCESSING_DELAY = 2  # seconds to pause between archives
VIDEO_FRAME_FPS = 3  # frames sampled per second of video

for _work_dir in (DOWNLOAD_FOLDER, EXTRACT_FOLDER, VIDEO_FRAMES_EXTRACT_FOLDER, ZIPPED_FRAMES_FOLDER):
    os.makedirs(_work_dir, exist_ok=True)
del _work_dir

api = HfApi(token=HF_TOKEN)
# Global state shared by the background worker and the HTTP endpoints.
processing_status = dict(
    is_running=False,
    current_file=None,
    total_files=0,
    processed_files=0,
    failed_files=0,
    uploaded_rar_folders=0,
    extracted_video_courses=0,  # number of course folders whose frame zips are ready
    last_update=None,
    logs=[],  # most recent log lines; log_message keeps this list bounded
)
app = FastAPI(
    title="RAR & Video Processing Service",
    description="Automated RAR extraction and video frame extraction service with download capability",
)

# Let the dashboard and API be called from any origin.
# NOTE(review): wildcard origins combined with allow_credentials=True lets any
# site make credentialed requests; confirm credentials are actually needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
def log_message(message: str):
    """Print *message* with a timestamp and record it in the in-memory log.

    The entry is appended to ``processing_status["logs"]`` (only the newest
    100 entries are kept) and ``processing_status["last_update"]`` is set to
    the same timestamp.
    """
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{stamp}] {message}"
    print(entry)
    history = processing_status["logs"]
    history.append(entry)
    processing_status["last_update"] = stamp
    excess = len(history) - 100
    if excess > 0:
        # Drop the oldest entries so the status payload stays small.
        processing_status["logs"] = history[excess:]
def log_failed_file(filename: str, error_msg: str):
    """Append a failure record to FAILED_FILES_LOG and echo it to the live log.

    Each record is a single line of the form ``"<filename>: <error_msg>"``.

    Args:
        filename: Name (or repository path) of the file that failed.
        error_msg: Human-readable reason for the failure.
    """
    # Explicit UTF-8: archive names and tool output may contain characters the
    # platform's default encoding cannot represent.
    with open(FAILED_FILES_LOG, "a", encoding="utf-8") as f:
        f.write(f"{filename}: {error_msg}\n")
    log_message(f"❌ Failed: {filename} - {error_msg}")
def get_folder_hash(folder_name: str) -> str:
    """Return the hex MD5 digest of *folder_name* (UTF-8 encoded).

    Used as a stable identifier for a folder, not for security.
    """
    digest = hashlib.md5()
    digest.update(folder_name.encode())
    return digest.hexdigest()
def load_uploaded_folders() -> Set[str]:
    """Load the set of folder-name hashes already uploaded to the BG2 repo.

    Returns:
        The hashes stored under ``"uploaded_folder_hashes"`` in
        UPLOADED_FOLDERS_FILE, or an empty set when the file is missing, is
        not valid JSON, or does not contain a JSON object.
    """
    if not os.path.exists(UPLOADED_FOLDERS_FILE):
        return set()
    try:
        with open(UPLOADED_FOLDERS_FILE, "r") as f:
            data = json.load(f)
    except json.JSONDecodeError:
        data = None
    # Valid JSON of the wrong shape (e.g. a hand-edited list) is treated like
    # corrupt JSON instead of crashing on ``.get``.
    if not isinstance(data, dict):
        log_message(f"⚠️ Warning: Could not decode {UPLOADED_FOLDERS_FILE}. Starting with empty set.")
        return set()
    return set(data.get("uploaded_folder_hashes", []))
def save_uploaded_folders(uploaded_set: Set[str]):
    """Persist the set of folder hashes already uploaded to the BG2 repo.

    The JSON is written to a temporary sibling file and moved into place with
    ``os.replace``. An interruption mid-write therefore cannot leave a
    truncated file, which the loader would discard and so forget every upload.
    """
    tmp_path = f"{UPLOADED_FOLDERS_FILE}.tmp"
    with open(tmp_path, "w") as f:
        # Sorted for a deterministic file; the loader turns it back into a set.
        json.dump({"uploaded_folder_hashes": sorted(uploaded_set)}, f)
    os.replace(tmp_path, UPLOADED_FOLDERS_FILE)
def load_processed_video_courses() -> Set[str]:
    """Load the names of course folders whose video frames are already zipped.

    Returns:
        The names stored as a JSON list in PROCESSED_VIDEO_COURSES_FILE, or an
        empty set when the file is missing, is not valid JSON, or is not a list.
    """
    if not os.path.exists(PROCESSED_VIDEO_COURSES_FILE):
        return set()
    try:
        with open(PROCESSED_VIDEO_COURSES_FILE, "r") as f:
            data = json.load(f)
    except json.JSONDecodeError:
        data = None
    # The state is written as a plain JSON list; anything else is unreadable.
    if not isinstance(data, list):
        log_message(f"⚠️ Warning: Could not decode {PROCESSED_VIDEO_COURSES_FILE}. Starting with empty set.")
        return set()
    return set(data)
def save_processed_video_courses(processed_set: set):
    """Persist the processed video course folder names as a JSON list.

    Written to a temporary sibling file and moved into place with
    ``os.replace``, so an interrupted write cannot leave a truncated file.
    """
    tmp_path = f"{PROCESSED_VIDEO_COURSES_FILE}.tmp"
    with open(tmp_path, "w") as f:
        json.dump(sorted(processed_set), f)
    os.replace(tmp_path, PROCESSED_VIDEO_COURSES_FILE)
def load_download_state() -> int:
    """Return the index of the next source RAR to download.

    Reads ``"next_download_index"`` from DOWNLOAD_STATE_FILE. Falls back to 0
    when the file is missing, is not valid JSON, is not a JSON object, or
    holds something other than a non-negative integer.
    """
    if not os.path.exists(DOWNLOAD_STATE_FILE):
        return 0
    try:
        with open(DOWNLOAD_STATE_FILE, "r") as f:
            data = json.load(f)
    except json.JSONDecodeError:
        data = None
    next_index = data.get("next_download_index", 0) if isinstance(data, dict) else None
    # bool is a subclass of int, so exclude it explicitly; a bad value here
    # would otherwise break list slicing in the download step.
    if isinstance(next_index, bool) or not isinstance(next_index, int) or next_index < 0:
        log_message(f"⚠️ Warning: Could not decode {DOWNLOAD_STATE_FILE}. Starting download from index 0.")
        return 0
    return next_index
def save_download_state(next_index: int):
    """Persist the index of the next source RAR to download.

    Written to a temporary sibling file and moved into place with
    ``os.replace``. An interrupted write cannot leave a truncated file, which
    load_download_state would treat as "start again from index 0".
    """
    tmp_path = f"{DOWNLOAD_STATE_FILE}.tmp"
    with open(tmp_path, "w") as f:
        json.dump({"next_download_index": next_index}, f)
    os.replace(tmp_path, DOWNLOAD_STATE_FILE)
def load_processed_files_state() -> set:
    """Load the names of RAR files that have been fully processed.

    Returns:
        The names stored under ``"processed_rars"`` in PROCESS_STATE_FILE, or
        an empty set when the file is missing, is not valid JSON, or does not
        contain a JSON object.
    """
    if not os.path.exists(PROCESS_STATE_FILE):
        return set()
    try:
        with open(PROCESS_STATE_FILE, "r") as f:
            data = json.load(f)
    except json.JSONDecodeError:
        data = None
    if not isinstance(data, dict):
        log_message(f"⚠️ Warning: Could not decode {PROCESS_STATE_FILE}. Starting with empty processed list.")
        return set()
    return set(data.get("processed_rars", []))
def save_processed_files_state(processed_set: set):
    """Persist the names of fully processed RAR files.

    Written to a temporary sibling file and moved into place with
    ``os.replace``, so an interrupted write cannot leave a truncated file.
    """
    tmp_path = f"{PROCESS_STATE_FILE}.tmp"
    with open(tmp_path, "w") as f:
        json.dump({"processed_rars": sorted(processed_set)}, f)
    os.replace(tmp_path, PROCESS_STATE_FILE)
def download_rar_files(start_index: int, chunk_size: int) -> tuple:
    """Download the next batch of RAR archives from the source dataset.

    Lists every ``.rar`` file in SOURCE_REPO_ID (minus one excluded archive),
    sorts the paths, and downloads the slice
    ``[start_index, start_index + chunk_size)`` into DOWNLOAD_FOLDER. Archives
    already present locally are not downloaded again. A failed download is
    logged to FAILED_FILES_LOG and skipped.

    Args:
        start_index: Position in the sorted RAR list to start from.
        chunk_size: Maximum number of archives in this batch.

    Returns:
        ``(paths, next_index)``: ``paths`` lists the local archive paths that
        are available (downloaded now or already present), and ``next_index``
        is the index to use on the next call. ``next_index == start_index``
        means nothing was left to download, or the listing failed.
    """
    from urllib.parse import quote

    excluded_path = "ZBrush/3DConceptArtist_TheUltimateZbrushGuide_DownloadPirate.com.rar"
    # (connect, read) timeouts in seconds; without them a stalled connection
    # blocks the worker forever.
    request_timeout = (30, 300)
    try:
        all_files = list_repo_files(repo_id=SOURCE_REPO_ID, repo_type="dataset", token=HF_TOKEN)
        rar_files_in_repo = sorted(f for f in all_files if f.endswith(".rar") and excluded_path not in f)
        batch = rar_files_in_repo[start_index:start_index + chunk_size]
        if not batch:
            log_message("✅ No more RAR files to download.")
            return [], start_index
        next_index = start_index + len(batch)
        log_message(f"📥 Downloading RAR files {start_index + 1} to {next_index} from {SOURCE_REPO_ID}")
        # Send credentials only when a token is configured; an empty bearer
        # token is still an (invalid) Authorization header.
        headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
        downloaded_paths = []
        for file_path_in_repo in batch:
            filename = os.path.basename(file_path_in_repo)
            # NOTE(review): archives are stored by basename only. Two repo paths
            # with the same file name in different folders map to the same local
            # file, and the second one is skipped as "already exists".
            dest_path = os.path.join(DOWNLOAD_FOLDER, filename)
            if os.path.exists(dest_path):
                log_message(f"⏩ Already exists, skipping: {filename}")
                downloaded_paths.append(dest_path)
                continue
            # Quote the repo path so spaces, '#', '?' etc. survive in the URL.
            file_url = f"https://huggingface.co/datasets/{SOURCE_REPO_ID}/resolve/main/{quote(file_path_in_repo)}"
            # Stream into a temporary name and rename only once the whole body
            # has arrived. An interrupted download is then never mistaken for a
            # complete archive by the "already exists" check above. The ".part"
            # suffix also keeps it out of the "*.rar" processing scan.
            partial_path = dest_path + ".part"
            log_message(f"🔽 Downloading: {file_path_in_repo}")
            try:
                with requests.get(file_url, headers=headers, stream=True, timeout=request_timeout) as r:
                    r.raise_for_status()
                    with open(partial_path, "wb") as f:
                        for chunk in r.iter_content(chunk_size=1024 * 1024):
                            f.write(chunk)
                os.replace(partial_path, dest_path)
                log_message(f"✅ Downloaded: {filename}")
                downloaded_paths.append(dest_path)
            except Exception as e:
                if os.path.exists(partial_path):
                    try:
                        os.remove(partial_path)
                    except OSError:
                        pass  # best effort; it will be overwritten on the next attempt
                log_message(f"❌ Failed to download {file_path_in_repo}: {e}")
                log_failed_file(file_path_in_repo, f"Download failed: {e}")
        return downloaded_paths, next_index
    except Exception as e:
        log_message(f"❌ Error in download_rar_files: {e}")
        return [], start_index
def extract_frames(video_path: str, output_folder: str, fps: int) -> bool:
    """Extract still frames from a video with ffmpeg.

    Frames are written as ``frame_0001.png``, ``frame_0002.png``, ... into
    *output_folder* (created if needed), sampled at *fps* frames per second.

    Args:
        video_path: Path of the input video.
        output_folder: Directory that receives the PNG frames.
        fps: Sampling rate passed to ffmpeg's ``fps`` filter.

    Returns:
        True if ffmpeg exited successfully; False if ffmpeg is missing, could
        not be started, or failed.
    """
    os.makedirs(output_folder, exist_ok=True)
    if shutil.which("ffmpeg") is None:
        log_message("❌ ffmpeg not found. Please install ffmpeg.")
        return False
    command = [
        "ffmpeg",
        "-nostdin",  # never read stdin: an interactive prompt would hang the worker
        "-hide_banner",
        "-loglevel", "error",  # keep stderr (logged on failure) short and relevant
        "-y",  # overwrite frames left behind by an interrupted earlier run
        "-i", video_path,
        "-vf", f"fps={fps}",
        os.path.join(output_folder, "frame_%04d.png"),
    ]
    video_name = os.path.basename(video_path)
    log_message(f"🖼️ Extracting frames from {video_name} to {output_folder} at {fps} FPS...")
    try:
        # errors="replace": undecodable bytes in ffmpeg's output must not turn
        # into a UnicodeDecodeError.
        subprocess.run(command, check=True, capture_output=True, text=True, errors="replace")
    except subprocess.CalledProcessError as e:
        log_message(f"❌ Error extracting frames from {video_name}: {e.stderr}")
        return False
    except OSError as e:
        log_message(f"❌ Error extracting frames from {video_name}: {e}")
        return False
    log_message(f"✅ Successfully extracted frames from {video_name}")
    return True
def zip_folder(folder_path: str, output_zip_path: str) -> bool:
    """Compress every file under *folder_path* into *output_zip_path*.

    Archive member names are relative to *folder_path*. If zipping fails, any
    partially written archive is removed so it is not offered for download.

    Returns:
        True on success, False if any error occurred.
    """
    log_message(f"📦 Compressing {folder_path} to {output_zip_path}...")
    try:
        with zipfile.ZipFile(output_zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
            for root, _, files in os.walk(folder_path):
                for file in files:
                    file_path = os.path.join(root, file)
                    zipf.write(file_path, os.path.relpath(file_path, folder_path))
    except Exception as e:
        log_message(f"❌ Error zipping {folder_path}: {e}")
        if os.path.exists(output_zip_path):
            try:
                os.remove(output_zip_path)
            except OSError:
                pass  # best effort; the failure is already reported
        return False
    log_message(f"✅ Successfully zipped {folder_path}")
    return True
def upload_file_to_hf(local_path: str, path_in_repo: str, repo_id: str, max_retries: int = 5, initial_delay: int = 5) -> bool:
    """Upload one file to a Hugging Face dataset repo, retrying on rate limits.

    HTTP 429 responses are retried up to *max_retries* attempts in total, with
    exponential backoff of ``initial_delay * 2**attempt`` seconds. Any other
    error fails immediately.

    Args:
        local_path: File to upload.
        path_in_repo: Destination path inside the repository.
        repo_id: Target dataset repository.
        max_retries: Maximum number of upload attempts.
        initial_delay: Backoff before the first retry, in seconds.

    Returns:
        True on success, False otherwise.
    """
    name = os.path.basename(local_path)
    log_message(f"⬆️ Uploading {name} to {repo_id}/{path_in_repo}")
    for attempt in range(max_retries):
        try:
            api.upload_file(
                path_or_fileobj=local_path,
                path_in_repo=path_in_repo,
                repo_id=repo_id,
                repo_type="dataset",
            )
            log_message(f"✅ Uploaded: {name}")
            return True
        except HfHubHTTPError as e:
            # ``e.response`` may be None. Reading ``.status_code`` from it would
            # raise inside this handler and escape the function.
            status_code = getattr(e.response, "status_code", None)
            if status_code == 429 and attempt < max_retries - 1:  # Too Many Requests
                delay = initial_delay * (2 ** attempt)
                log_message(f"⚠️ Rate limit hit. Retrying in {delay} seconds... (Attempt {attempt + 1}/{max_retries})")
                time.sleep(delay)
            else:
                log_message(f"❌ Hugging Face Hub error uploading {name}: {e}")
                return False
        except Exception as e:
            log_message(f"❌ Error uploading {name}: {e}")
            return False
    # Only reached when max_retries <= 0.
    log_message(f"❌ Failed to upload {name} after {max_retries} attempts.")
    return False
def process_video_frames_for_download(extracted_rar_folder: str, processed_video_courses_set: Set[str]) -> bool:
    """Turn the videos inside an extracted RAR folder into a downloadable frame zip.

    The course name is the basename of *extracted_rar_folder*. Steps:
      1. Extract frames for each video into
         ``VIDEO_FRAMES_EXTRACT_FOLDER/<course>/<video name>/``.
      2. Zip them to ``ZIPPED_FRAMES_FOLDER/<course>_frames.zip``.
      3. Delete the temporary frame directory in every case.
    On success the course name is added to *processed_video_courses_set* and
    persisted.

    Returns:
        True if the course was already processed or its zip was written.
        False if no videos were found, no frames could be extracted, or
        zipping failed.
    """
    course_folder_name = os.path.basename(extracted_rar_folder)
    if course_folder_name in processed_video_courses_set:
        log_message(f"⏩ Video frames for course '{course_folder_name}' already processed. Skipping.")
        return True

    log_message(f"🎬 Processing videos in extracted RAR folder: {course_folder_name}")
    video_extensions = ('.mp4', '.avi', '.mov', '.mkv', '.flv', '.wmv')
    # Sorted so the layout of the resulting zip is deterministic.
    video_files_found = sorted(
        os.path.join(root, file)
        for root, _, files in os.walk(extracted_rar_folder)
        for file in files
        if file.lower().endswith(video_extensions)
    )
    if not video_files_found:
        log_message(f"⚠️ No video files found in {course_folder_name}. Skipping video frame extraction.")
        return False  # Indicate no video processing was done

    course_video_extract_dir = os.path.join(VIDEO_FRAMES_EXTRACT_FOLDER, course_folder_name)
    os.makedirs(course_video_extract_dir, exist_ok=True)
    try:
        frames_extracted_count = 0
        used_names: Set[str] = set()
        for video_path in video_files_found:
            video_basename = os.path.splitext(os.path.basename(video_path))[0]
            # Videos in different sub-folders often share a name (e.g.
            # "01 Intro.mp4"). Suffix later ones so they don't overwrite an
            # earlier video's frames, or get deleted together with them.
            output_name = video_basename
            suffix = 2
            while output_name in used_names:
                output_name = f"{video_basename}_{suffix}"
                suffix += 1
            used_names.add(output_name)
            video_output_folder = os.path.join(course_video_extract_dir, output_name)
            if extract_frames(video_path, video_output_folder, VIDEO_FRAME_FPS):
                frames_extracted_count += 1
            else:
                log_message(f"❌ Failed to extract frames from {os.path.basename(video_path)}. Continuing with other videos.")
                # Drop partially extracted frames for this video only.
                if os.path.exists(video_output_folder):
                    shutil.rmtree(video_output_folder)

        if frames_extracted_count == 0:
            log_message(f"⚠️ No frames extracted for any video in {course_folder_name}. Skipping zipping.")
            return False

        course_zip_path = os.path.join(ZIPPED_FRAMES_FOLDER, f"{course_folder_name}_frames.zip")
        if not zip_folder(course_video_extract_dir, course_zip_path):
            log_message(f"❌ Failed to zip video frames for {course_folder_name}")
            return False

        log_message(f"✅ Successfully processed video frames and saved {course_folder_name}_frames.zip for download")
        processed_video_courses_set.add(course_folder_name)  # Mark as processed
        save_processed_video_courses(processed_video_courses_set)  # Save state
        return True
    finally:
        # The zip (if any) is kept for download; the raw frames are always temporary.
        if os.path.exists(course_video_extract_dir):
            log_message(f"🧹 Cleaning up temporary video frame files for {course_folder_name}")
            shutil.rmtree(course_video_extract_dir, ignore_errors=True)
def _run_unrar(rar_path: str, dest_folder: str) -> int:
    """Extract *rar_path* into *dest_folder* with ``unrar``; return the number of top-level entries produced.

    Raises:
        RuntimeError: ``unrar`` is not installed, or it produced no files.
        subprocess.CalledProcessError: ``unrar`` exited with a non-zero status.
    """
    if shutil.which("unrar") is None:
        raise RuntimeError("unrar command not found. Please install unrar.")
    os.makedirs(dest_folder, exist_ok=True)
    # x    extract with full paths
    # -o+  overwrite existing files without prompting
    # -p-  never ask for a password: an encrypted archive fails instead of
    #      blocking on stdin
    # The trailing path separator marks the last argument unambiguously as the
    # destination directory.
    unrar_command = ["unrar", "x", "-o+", "-p-", rar_path, os.path.join(dest_folder, "")]
    log_message(f"Running command: {' '.join(unrar_command)}")
    subprocess.run(
        unrar_command,
        check=True,
        capture_output=True,
        stdin=subprocess.DEVNULL,
        text=True,
        encoding="utf-8",
        # unrar may print names in a legacy code page; a decode error must not
        # turn a successful extraction into a failure.
        errors="replace",
    )
    extracted_contents = os.listdir(dest_folder)
    if not extracted_contents:
        raise RuntimeError("Extraction completed but no files were produced in the target directory.")
    return len(extracted_contents)


def _upload_extracted_to_bg2(extract_folder: str, folder_name: str) -> int:
    """Upload every file under *extract_folder* to DEST_REPO_ID_RAR below ``<folder_name>/``.

    Individual failures are logged and skipped. Returns the number of files
    uploaded successfully.
    """
    upload_count = 0
    for root, _, files in os.walk(extract_folder):
        for file in files:
            local_path = os.path.join(root, file)
            # Hub paths always use "/", whatever the local OS separator is.
            rel_path = os.path.relpath(local_path, extract_folder).replace(os.sep, "/")
            path_in_repo = f"{folder_name}/{rel_path}"
            log_message(f"⬆️ Uploading to BG2: {path_in_repo}")
            try:
                uploaded = upload_file_to_hf(
                    local_path=local_path,
                    path_in_repo=path_in_repo,
                    repo_id=DEST_REPO_ID_RAR,
                )
            except Exception as upload_error:
                log_message(f"❌ Failed to upload {path_in_repo} to BG2: {upload_error}")
                continue
            if uploaded:
                upload_count += 1
            else:
                log_message(f"❌ Failed to upload {path_in_repo} to BG2. Continuing with the remaining files.")
    return upload_count


def extract_and_upload_rar(rar_path: str, processed_rars_set: set, uploaded_folders_set: Set[str], processed_video_courses_set: Set[str]) -> bool:
    """Extract one RAR archive, upload its contents to BG2, and build its frame zip.

    ``<name>`` below is the archive file name with ".rar" removed.

      1. Extract *rar_path* into ``EXTRACT_FOLDER/<name>_extracted``. A
         leftover extraction is reused when the BG2 upload is already done.
      2. If DEST_REPO_ID_RAR is set and the folder's hash is not yet in
         *uploaded_folders_set*, upload every extracted file under ``<name>/``.
      3. Extract video frames for download via
         process_video_frames_for_download.
    The extraction folder is always removed afterwards. The three sets are
    updated in place and persisted through the ``save_*`` helpers.

    Returns:
        True when the archive is fully processed: BG2 upload done (or disabled)
        and video frames processed. False otherwise.
    """
    filename = os.path.basename(rar_path)
    processing_status["current_file"] = filename
    folder_name = filename.replace(".rar", "")
    folder_hash = get_folder_hash(folder_name)
    current_extract_folder = os.path.join(EXTRACT_FOLDER, f"{folder_name}_extracted")
    # process_video_frames_for_download() records the basename of the folder it
    # scans, i.e. "<folder_name>_extracted". That is the key to look up, not
    # folder_name.
    video_course_name = os.path.basename(current_extract_folder)

    def mark_processed() -> None:
        """Record the archive as fully processed and refresh the status counters."""
        processed_rars_set.add(filename)
        save_processed_files_state(processed_rars_set)
        processing_status["processed_files"] = len(processed_rars_set)
        processing_status["extracted_video_courses"] = len(processed_video_courses_set)

    bg2_done = (not DEST_REPO_ID_RAR) or (folder_hash in uploaded_folders_set)
    bg3_done = video_course_name in processed_video_courses_set
    if bg2_done and bg3_done:
        if filename in processed_rars_set:
            log_message(f"⏩ {filename} already fully processed, skipping.")
        else:
            # Both outputs exist, e.g. a previous run stopped before recording the archive.
            log_message(f"⏩ Outputs for {filename} already exist; marking it as processed.")
            mark_processed()
        return True

    already_uploaded = bool(DEST_REPO_ID_RAR) and folder_hash in uploaded_folders_set
    if already_uploaded:
        log_message(f"🔒 Folder '{folder_name}' already uploaded to BG2 (hash: {folder_hash[:8]}...), skipping RAR upload.")

    try:
        if already_uploaded and os.path.isdir(current_extract_folder) and os.listdir(current_extract_folder):
            # A leftover extraction from an interrupted run: only video processing remains.
            log_message(f"Continuing with video processing for {filename}.")
        else:
            if already_uploaded:
                log_message(f"⚠️ Extracted folder {current_extract_folder} not found for video processing. Attempting re-extraction for video processing.")
            log_message(f"📦 Attempting to extract: {filename}")
            extracted_count = _run_unrar(rar_path, current_extract_folder)
            log_message(f"Successfully extracted {extracted_count} items")

            if not DEST_REPO_ID_RAR:
                log_message("Skipping upload to BG2 as DEST_REPO_ID_RAR is not set.")
            elif not already_uploaded:
                upload_count = _upload_extracted_to_bg2(current_extract_folder, folder_name)
                # A folder counts as uploaded once at least one file made it;
                # partially uploaded folders are not retried.
                if upload_count > 0:
                    log_message(f"✅ Successfully uploaded {upload_count} files from {filename} to BG2")
                    uploaded_folders_set.add(folder_hash)
                    save_uploaded_folders(uploaded_folders_set)
                    processing_status["uploaded_rar_folders"] = len(uploaded_folders_set)
                    log_message(f"🔒 Folder '{folder_name}' locked in BG2 repo (hash: {folder_hash[:8]}...)")
                else:
                    log_message(f"⚠️ No files were successfully uploaded from {filename} to BG2.")

        video_processed = process_video_frames_for_download(current_extract_folder, processed_video_courses_set)
        bg2_done = (not DEST_REPO_ID_RAR) or (folder_hash in uploaded_folders_set)
        if bg2_done and video_processed:
            mark_processed()
            return True
        if not bg2_done:
            log_message(f"❌ RAR processing failed for {filename}: BG2 upload was not successful.")
        else:
            log_message(f"❌ RAR processing failed for {filename}: Video frame processing was not successful.")
        return False
    except subprocess.CalledProcessError as e:
        stderr_text = (e.stderr or "").strip()
        error_msg = f"RAR extraction failed (exit {e.returncode}): {stderr_text}"
        log_failed_file(filename, error_msg)
        processing_status["failed_files"] += 1
        return False
    except Exception as e:
        error_msg = f"Unexpected error during processing {filename}: {str(e)}"
        log_failed_file(filename, error_msg)
        processing_status["failed_files"] += 1
        return False
    finally:
        # Always remove the extraction folder, on success or failure.
        if os.path.exists(current_extract_folder):
            log_message(f"🧹 Cleaning up extracted RAR files in {current_extract_folder}")
            try:
                shutil.rmtree(current_extract_folder)
                log_message("✅ Cleaned up RAR extraction folder")
            except Exception as e:
                log_message(f"⚠️ Could not clean up RAR extraction folder {current_extract_folder}: {e}")
def continuous_processing(start_download_index: Optional[int] = None):
    """Run the download -> extract -> upload -> frame-zip loop until done or stopped.

    Each iteration:
      1. Downloads the next CHUNK_SIZE archives, resuming from the index saved
         in DOWNLOAD_STATE_FILE.
      2. Processes every local ``.rar`` in DOWNLOAD_FOLDER that is not yet
         fully processed and has not already failed during this run.
      3. Deletes each archive once it succeeds.
    The loop ends when ``processing_status["is_running"]`` is cleared, or when
    the source listing is exhausted and no local archive is left to try.

    Args:
        start_download_index: If given, overwrites the saved download index
            before starting.
    """
    processing_status["is_running"] = True
    log_message("🚀 Starting continuous RAR and Video processing...")
    # Archives that failed during this run are not retried until the next run.
    # Otherwise one bad archive (corrupt, encrypted, no videos) stays in the
    # work list forever and the loop never terminates.
    failed_this_run: Set[str] = set()
    try:
        uploaded_folders = load_uploaded_folders()
        processing_status["uploaded_rar_folders"] = len(uploaded_folders)
        processed_video_courses = load_processed_video_courses()
        processing_status["extracted_video_courses"] = len(processed_video_courses)

        if start_download_index is not None:
            log_message(f"Starting download from index: {start_download_index}")
            save_download_state(start_download_index)
        else:
            log_message("Starting download from saved state or beginning.")

        while processing_status["is_running"]:
            # 1. Download the next batch of RAR files.
            download_start_index = load_download_state()
            _, next_download_index = download_rar_files(download_start_index, CHUNK_SIZE)
            save_download_state(next_download_index)
            # download_rar_files() returns the start index unchanged when the
            # listing is exhausted (or could not be fetched). Test that rather
            # than "nothing was downloaded": a batch whose downloads all failed
            # still leaves later batches to fetch.
            source_exhausted = next_download_index == download_start_index

            # 2. Collect local RARs that still need work.
            all_local_rars = sorted(
                os.path.join(DOWNLOAD_FOLDER, f) for f in os.listdir(DOWNLOAD_FOLDER) if f.endswith(".rar")
            )
            processed_rars = load_processed_files_state()
            processing_status["total_files"] = len(all_local_rars)
            processing_status["processed_files"] = len(processed_rars)

            rars_to_process = []
            for rar_file_path in all_local_rars:
                filename = os.path.basename(rar_file_path)
                if filename in failed_this_run:
                    continue
                folder_name = filename.replace(".rar", "")
                is_bg2_processed = (not DEST_REPO_ID_RAR) or (get_folder_hash(folder_name) in uploaded_folders)
                # Video courses are recorded under the extraction folder's
                # basename, "<folder_name>_extracted", not under folder_name.
                is_bg3_processed = f"{folder_name}_extracted" in processed_video_courses
                if not (filename in processed_rars and is_bg2_processed and is_bg3_processed):
                    rars_to_process.append(rar_file_path)

            if source_exhausted and not rars_to_process:
                log_message("✅ No more RAR files to download or process. Stopping...")
                break

            # 3. Process the pending archives one by one.
            for rar_file_path in rars_to_process:
                if not processing_status["is_running"]:
                    break
                filename = os.path.basename(rar_file_path)
                if extract_and_upload_rar(rar_file_path, processed_rars, uploaded_folders, processed_video_courses):
                    log_message(f"🗑️ Deleting processed RAR: {filename}")
                    try:
                        os.remove(rar_file_path)
                        log_message(f"✅ Deleted RAR file: {filename}")
                    except Exception as e:
                        log_message(f"⚠️ Could not delete {rar_file_path}: {e}")
                else:
                    failed_this_run.add(filename)
                time.sleep(PROCESSING_DELAY)
    except Exception as e:
        log_message(f"❌ Error in continuous processing: {e}")
    finally:
        processing_status["is_running"] = False
        processing_status["current_file"] = None
        log_message("🛑 Processing stopped")
@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve the single-page HTML dashboard.

    The page polls ``/status`` every 5 s and ``/downloads`` every 10 s, and
    posts to ``/start``, ``/start_from_index`` and ``/stop``. Log lines and
    download names are HTML-escaped before being inserted into the page,
    because they contain archive names and tool output from the source
    dataset. Download links are URL-encoded.
    """
    html_content = """
<!DOCTYPE html>
<html>
<head>
  <title>RAR & Video Processing Service</title>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <style>
    body { font-family: Arial, sans-serif; margin: 20px; background-color: #f5f5f5; }
    .container { max-width: 1200px; margin: 0 auto; background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
    .status-card { background: #e3f2fd; padding: 15px; border-radius: 5px; margin: 10px 0; }
    .logs { background: #f5f5f5; padding: 15px; border-radius: 5px; height: 400px; overflow-y: auto; font-family: monospace; font-size: 12px; }
    .button { background: #2196F3; color: white; padding: 10px 20px; border: none; border-radius: 5px; cursor: pointer; margin: 5px; }
    .button:hover { background: #1976D2; }
    .button:disabled { background: #ccc; cursor: not-allowed; }
    .stop-button { background: #f44336; }
    .stop-button:hover { background: #d32f2f; }
    .download-button { background: #4CAF50; }
    .download-button:hover { background: #45a049; }
    .stats { display: flex; gap: 20px; margin: 20px 0; }
    .stat-item { background: #f0f0f0; padding: 10px; border-radius: 5px; text-align: center; flex: 1; }
    .start-form { margin-top: 20px; padding: 15px; border: 1px solid #ddd; border-radius: 5px; background: #f9f9f9; }
    .start-form input[type="number"] { width: calc(100% - 120px); padding: 8px; margin-right: 10px; border: 1px solid #ccc; border-radius: 4px; }
    .start-form button { padding: 8px 15px; background: #4CAF50; color: white; border: none; border-radius: 4px; cursor: pointer; }
    .start-form button:hover { background: #45a049; }
    .downloads-section { margin-top: 30px; padding: 20px; border: 1px solid #ddd; border-radius: 5px; background: #f9f9f9; }
    .download-list { max-height: 300px; overflow-y: auto; }
    .download-item { display: flex; justify-content: space-between; align-items: center; padding: 10px; border-bottom: 1px solid #eee; }
    .download-item:last-child { border-bottom: none; }
  </style>
</head>
<body>
  <div class="container">
    <h1>🚀 RAR & Video Processing Service</h1>
    <p>Automated extraction and upload of RAR files from BG1 to BG2 dataset, and video frame extraction for download</p>
    <div class="status-card">
      <h3>Status: <span id="status">Stopped</span></h3>
      <p>Current File: <span id="current-file">None</span></p>
      <p>Last Update: <span id="last-update">Never</span></p>
    </div>
    <div class="stats">
      <div class="stat-item">
        <h4>Total Files (RARs)</h4>
        <span id="total-files">0</span>
      </div>
      <div class="stat-item">
        <h4>Processed (RARs)</h4>
        <span id="processed-files">0</span>
      </div>
      <div class="stat-item">
        <h4>Uploaded Folders (BG2)</h4>
        <span id="uploaded-rar-folders">0</span>
      </div>
      <div class="stat-item">
        <h4>Extracted Video Courses</h4>
        <span id="extracted-video-courses">0</span>
      </div>
      <div class="stat-item">
        <h4>Failed</h4>
        <span id="failed-files">0</span>
      </div>
    </div>
    <div class="start-form">
      <h3>Start Processing from Specific Download Index</h3>
      <input type="number" id="start-index-input" placeholder="Enter start index (e.g., 0)" value="0">
      <button onclick="startProcessingWithIndex()">Start from Index</button>
    </div>
    <div>
      <button class="button" onclick="startProcessing()" id="start-btn">Start Processing (from last saved index)</button>
      <button class="button stop-button" onclick="stopProcessing()" id="stop-btn" disabled>Stop Processing</button>
      <button class="button" onclick="refreshStatus()">Refresh Status</button>
      <button class="button download-button" onclick="refreshDownloads()">Refresh Downloads</button>
    </div>
    <div class="downloads-section">
      <h3>Available Downloads</h3>
      <div class="download-list" id="download-list">
        <p>Loading...</p>
      </div>
    </div>
    <h3>Logs</h3>
    <div class="logs" id="logs">Loading...</div>
  </div>
  <script>
    // Log lines and file names come from archive contents and tool output,
    // so they must be escaped before being placed into innerHTML.
    function escapeHtml(value) {
      return String(value)
        .replace(/&/g, "&amp;")
        .replace(/</g, "&lt;")
        .replace(/>/g, "&gt;")
        .replace(/"/g, "&quot;")
        .replace(/'/g, "&#39;");
    }
    async function startProcessing() {
      try {
        const response = await fetch("/start", { method: "POST" });
        const result = await response.json();
        alert(result.message);
        refreshStatus();
      } catch (error) {
        alert("Error starting processing: " + error.message);
      }
    }
    async function startProcessingWithIndex() {
      const index = document.getElementById("start-index-input").value;
      if (index === "" || isNaN(index)) {
        alert("Please enter a valid number for the start index.");
        return;
      }
      try {
        const response = await fetch("/start_from_index", {
          method: "POST",
          headers: { "Content-Type": "application/x-www-form-urlencoded" },
          body: `start_index=${parseInt(index)}`
        });
        const result = await response.json();
        alert(result.message);
        refreshStatus();
      } catch (error) {
        alert("Error starting processing from index: " + error.message);
      }
    }
    async function stopProcessing() {
      try {
        const response = await fetch("/stop", { method: "POST" });
        const result = await response.json();
        alert(result.message);
        refreshStatus();
      } catch (error) {
        alert("Error stopping processing: " + error.message);
      }
    }
    async function refreshStatus() {
      try {
        const response = await fetch("/status");
        const status = await response.json();
        document.getElementById("status").textContent = status.is_running ? "Running" : "Stopped";
        document.getElementById("current-file").textContent = status.current_file || "None";
        document.getElementById("last-update").textContent = status.last_update || "Never";
        document.getElementById("total-files").textContent = status.total_files;
        document.getElementById("processed-files").textContent = status.processed_files;
        document.getElementById("uploaded-rar-folders").textContent = status.uploaded_rar_folders;
        document.getElementById("extracted-video-courses").textContent = status.extracted_video_courses;
        document.getElementById("failed-files").textContent = status.failed_files;
        document.getElementById("start-btn").disabled = status.is_running;
        document.getElementById("stop-btn").disabled = !status.is_running;
        const logsDiv = document.getElementById("logs");
        logsDiv.innerHTML = status.logs.map(escapeHtml).join("<br>");
        logsDiv.scrollTop = logsDiv.scrollHeight;
      } catch (error) {
        console.error("Error refreshing status:", error);
      }
    }
    async function refreshDownloads() {
      try {
        const response = await fetch("/downloads");
        const downloads = await response.json();
        const downloadList = document.getElementById("download-list");
        if (downloads.files.length === 0) {
          downloadList.innerHTML = "<p>No downloads available yet.</p>";
        } else {
          downloadList.innerHTML = downloads.files.map(file =>
            `<div class="download-item">
              <span>${escapeHtml(file.name)} (${escapeHtml(file.size)})</span>
              <a href="/download/${encodeURIComponent(file.name)}" class="button download-button" download>Download</a>
            </div>`
          ).join("");
        }
      } catch (error) {
        console.error("Error refreshing downloads:", error);
        document.getElementById("download-list").innerHTML = "<p>Error loading downloads.</p>";
      }
    }
    // Auto-refresh every 5 seconds
    setInterval(refreshStatus, 5000);
    setInterval(refreshDownloads, 10000);
    // Initial load
    refreshStatus();
    refreshDownloads();
  </script>
</body>
</html>
"""
    return HTMLResponse(content=html_content)
@app.get("/status")
async def get_status():
    """Return the shared ``processing_status`` dict serialized as a JSON response."""
    payload = processing_status
    return JSONResponse(content=payload)
@app.post("/start")
async def start_processing(background_tasks: BackgroundTasks):
    """Queue ``continuous_processing`` as a background task unless a run is active.

    Returns:
        A ``{"message": ...}`` dict describing whether processing was started.
    """
    # NOTE(review): this handler does not set ``is_running`` itself; presumably
    # ``continuous_processing`` does once it begins — confirm. If so, two requests
    # arriving before the task starts could both queue a run.
    already_running = processing_status["is_running"]
    if not already_running:
        background_tasks.add_task(continuous_processing)
        return {"message": "Processing started"}
    return {"message": "Processing is already running"}
@app.post("/start_from_index")
async def start_processing_from_index(background_tasks: BackgroundTasks, start_index: int = Form(...)):
    """Queue ``continuous_processing`` beginning at download index ``start_index``.

    Args:
        background_tasks: FastAPI task queue the run is added to.
        start_index: Form field; must be non-negative.

    Returns:
        A ``{"message": ...}`` dict. A running job or a negative index is
        reported in the message rather than as an HTTP error status.
    """
    if processing_status["is_running"]:
        reply = "Processing is already running"
    elif start_index < 0:
        reply = "Start index cannot be negative."
    else:
        background_tasks.add_task(continuous_processing, start_download_index=start_index)
        reply = f"Processing started from download index: {start_index}"
    return {"message": reply}
@app.post("/stop")
async def stop_processing():
    """Request a stop by clearing the shared ``is_running`` flag.

    Only the flag is cleared here. Presumably the background loop polls it and
    exits on its own — confirm in ``continuous_processing``.

    Returns:
        A ``{"message": ...}`` dict describing the outcome.
    """
    if processing_status["is_running"]:
        processing_status["is_running"] = False
        return {"message": "Processing stop requested"}
    return {"message": "Processing is not running"}
@app.get("/logs")
async def get_logs():
    """Return the accumulated log lines from ``processing_status`` under ``"logs"``."""
    log_lines = processing_status["logs"]
    return {"logs": log_lines}
@app.get("/uploaded-folders")
async def get_uploaded_folders():
    """Report how many folder hashes are recorded as uploaded for BG2, and list them."""
    folders = load_uploaded_folders()
    count = len(folders)
    return {"uploaded_folder_count": count, "folder_hashes": list(folders)}
@app.get("/processed-video-courses")
async def get_processed_video_courses():
    """Report how many video course folders have been processed, and list their names."""
    courses = load_processed_video_courses()
    count = len(courses)
    return {"processed_video_course_count": count, "course_names": list(courses)}
def _human_readable_size(num_bytes: int) -> str:
    """Format a byte count as B / KB / MB / GB using 1024-based steps and one decimal."""
    if num_bytes < 1024:
        return f"{num_bytes} B"
    elif num_bytes < 1024 * 1024:
        return f"{num_bytes / 1024:.1f} KB"
    elif num_bytes < 1024 * 1024 * 1024:
        return f"{num_bytes / (1024 * 1024):.1f} MB"
    return f"{num_bytes / (1024 * 1024 * 1024):.1f} GB"
@app.get("/downloads")
async def list_downloads():
    """List the ``.zip`` archives available in ``ZIPPED_FRAMES_FOLDER``.

    Returns:
        ``{"files": [...]}``, sorted by name. Each entry has ``name``, a
        human-readable ``size`` and the server-side ``path``. If the folder does
        not exist, the list is empty. If an unexpected error occurs, the list is
        empty and an ``"error"`` key carries the exception text.
    """
    try:
        if not os.path.exists(ZIPPED_FRAMES_FOLDER):
            return {"files": []}
        files = []
        for filename in os.listdir(ZIPPED_FRAMES_FOLDER):
            if not filename.endswith('.zip'):
                continue
            file_path = os.path.join(ZIPPED_FRAMES_FOLDER, filename)
            # Skip directories and other non-regular entries that merely end in
            # ".zip"; the /download/{filename} endpoint cannot serve them.
            if not os.path.isfile(file_path):
                continue
            try:
                file_size = os.path.getsize(file_path)
            except OSError:
                # The archive vanished or became unreadable between listdir() and
                # stat(). Skip it rather than failing the whole listing.
                continue
            files.append({
                "name": filename,
                "size": _human_readable_size(file_size),
                "path": file_path,
            })
        files.sort(key=lambda entry: entry["name"])
        return {"files": files}
    except Exception as e:
        log_message(f"β Error listing downloads: {e}")
        return {"files": [], "error": str(e)}
@app.get("/download/{filename}")
async def download_file(filename: str):
    """Serve one ``.zip`` archive from ``ZIPPED_FRAMES_FOLDER``.

    Args:
        filename: Requested archive name. Only its basename is used.

    Raises:
        HTTPException(400): the name does not end in ``.zip``.
        HTTPException(404): no regular file with that name exists.
        HTTPException(500): any other unexpected error (logged).
    """
    try:
        # Keep only the final path component to prevent directory traversal.
        safe_filename = os.path.basename(filename)
        # Validate the extension before touching the filesystem, so the endpoint
        # does not reveal whether arbitrary non-zip names exist in the folder.
        if not safe_filename.endswith('.zip'):
            raise HTTPException(status_code=400, detail="Only zip files can be downloaded")
        file_path = os.path.join(ZIPPED_FRAMES_FOLDER, safe_filename)
        # Use isfile rather than exists. A directory named "*.zip" would otherwise
        # pass, and FileResponse would fail later while streaming, outside this
        # try block, producing an unlogged 500.
        if not os.path.isfile(file_path):
            raise HTTPException(status_code=404, detail="File not found")
        log_message(f"π₯ Serving download: {safe_filename}")
        return FileResponse(
            path=file_path,
            filename=safe_filename,
            media_type='application/zip'
        )
    except HTTPException:
        raise
    except Exception as e:
        log_message(f"β Error serving download {filename}: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")
if __name__ == "__main__":
    # Listen on every network interface (0.0.0.0) at port 7860 when run directly.
    server_host, server_port = "0.0.0.0", 7860
    uvicorn.run(app, host=server_host, port=server_port)
|