# RAR Processing Service — Hugging Face Space application.
# (Removed non-Python scrape residue "Spaces: / Paused / Paused" from the Space status page.)
| import os | |
| import json | |
| import requests | |
| import subprocess | |
| import shutil | |
| import asyncio | |
| import threading | |
| import time | |
| import hashlib | |
| from typing import Dict, List, Set, Optional | |
| from fastapi import FastAPI, BackgroundTasks, HTTPException, Form | |
| from fastapi.responses import HTMLResponse, JSONResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from huggingface_hub import HfApi, list_repo_files | |
| import uvicorn | |
# ==== CONFIGURATION ====
# Credentials and repo endpoints; every value can be overridden via environment
# variables so the Space can be repointed without code changes.
HF_TOKEN = os.getenv("HF_TOKEN", "")
SOURCE_REPO_ID = os.getenv("SOURCE_REPO", "Fred808/BG1")  # dataset holding the RAR archives
DEST_REPO_ID = os.getenv("DEST_REPO", "Fred808/BG3")  # dataset receiving extracted folders
DOWNLOAD_FOLDER = "downloads"  # local cache of fetched .rar files
EXTRACT_FOLDER = "extracted_tmp"  # scratch space for extraction
DOWNLOAD_STATE_FILE = "download_progress.json"  # persists the next download index
PROCESS_STATE_FILE = "process_progress.json"  # persists names of processed archives
UPLOADED_FOLDERS_FILE = "uploaded_folders.json"  # New: Track uploaded folder hashes
FAILED_FILES_LOG = "failed_files.txt"  # plain-text log of failures for later review
CHUNK_SIZE = 1  # Smaller chunks for Space environment
PROCESSING_DELAY = 2  # Delay between processing files (seconds)
os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)
os.makedirs(EXTRACT_FOLDER, exist_ok=True)
api = HfApi(token=HF_TOKEN)
# Global state
# Mutable module-level status dict: read by the status endpoint, updated by the
# background worker thread/task.
processing_status = {
    "is_running": False,   # worker loop flag; setting False requests a stop
    "current_file": None,  # archive currently being processed
    "total_files": 0,
    "processed_files": 0,
    "failed_files": 0,
    "uploaded_folders": 0,
    "last_update": None,   # timestamp of the most recent log entry
    "logs": []             # rolling buffer of the last 100 log lines
}
app = FastAPI(title="RAR Processing Service", description="Automated RAR extraction and upload service")
# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
def log_message(message: str) -> None:
    """Print *message* and append it, timestamped, to the in-memory log buffer."""
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{stamp}] {message}"
    print(entry)
    processing_status["logs"].append(entry)
    processing_status["last_update"] = stamp
    # Cap the buffer at the 100 most recent entries so it never grows unbounded.
    if len(processing_status["logs"]) > 100:
        processing_status["logs"] = processing_status["logs"][-100:]
def log_failed_file(filename: str, error_msg: str):
    """Record a failed file in FAILED_FILES_LOG and surface it in the live log.

    Args:
        filename: Name of the file that failed.
        error_msg: Human-readable description of the failure.
    """
    # Fix: the messages previously contained a literal "(unknown)" instead of
    # the filename, making the failure log useless for later review.
    # encoding is pinned so non-ASCII filenames don't crash on narrow locales.
    with open(FAILED_FILES_LOG, "a", encoding="utf-8") as f:
        f.write(f"{filename}: {error_msg}\n")
    log_message(f"❌ Failed: {filename} - {error_msg}")
def get_folder_hash(folder_name: str) -> str:
    """Return a stable MD5 hex digest used as a unique id for *folder_name*."""
    digest = hashlib.md5(folder_name.encode())
    return digest.hexdigest()
def load_uploaded_folders() -> Set[str]:
    """Return the persisted set of uploaded folder hashes (empty when absent/corrupt)."""
    if not os.path.exists(UPLOADED_FOLDERS_FILE):
        return set()
    try:
        with open(UPLOADED_FOLDERS_FILE, "r") as fh:
            payload = json.load(fh)
    except json.JSONDecodeError:
        # A corrupt state file is not fatal; start over with an empty set.
        log_message(f"⚠️ Warning: Could not decode {UPLOADED_FOLDERS_FILE}. Starting with empty set.")
        return set()
    return set(payload.get("uploaded_folder_hashes", []))
def save_uploaded_folders(uploaded_set: Set[str]):
    """Persist the uploaded-folder hashes to UPLOADED_FOLDERS_FILE as JSON."""
    payload = {"uploaded_folder_hashes": list(uploaded_set)}
    with open(UPLOADED_FOLDERS_FILE, "w") as fh:
        json.dump(payload, fh)
def load_download_state() -> int:
    """Return the persisted next-download index, or 0 when missing/corrupt."""
    if not os.path.exists(DOWNLOAD_STATE_FILE):
        return 0
    try:
        with open(DOWNLOAD_STATE_FILE, "r") as fh:
            return json.load(fh).get("next_download_index", 0)
    except json.JSONDecodeError:
        log_message(f"⚠️ Warning: Could not decode {DOWNLOAD_STATE_FILE}. Starting download from index 0.")
        return 0
def save_download_state(next_index: int):
    """Persist *next_index* so downloads resume from it after a restart."""
    state = {"next_download_index": next_index}
    with open(DOWNLOAD_STATE_FILE, "w") as fh:
        json.dump(state, fh)
def load_processed_files_state() -> set:
    """Return the set of archive filenames already processed (empty when absent/corrupt)."""
    if not os.path.exists(PROCESS_STATE_FILE):
        return set()
    try:
        with open(PROCESS_STATE_FILE, "r") as fh:
            payload = json.load(fh)
    except json.JSONDecodeError:
        log_message(f"⚠️ Warning: Could not decode {PROCESS_STATE_FILE}. Starting with empty processed list.")
        return set()
    return set(payload.get("processed_rars", []))
def save_processed_files_state(processed_set: set):
    """Persist the processed-archive names to PROCESS_STATE_FILE as JSON."""
    payload = {"processed_rars": list(processed_set)}
    with open(PROCESS_STATE_FILE, "w") as fh:
        json.dump(payload, fh)
def download_rar_files(start_index: int, chunk_size: int) -> tuple:
    """Download a batch of RAR files from the source dataset.

    Args:
        start_index: Index into the sorted list of repo RAR files to start from.
        chunk_size: Maximum number of files to fetch in this batch.

    Returns:
        (downloaded_paths, next_index): local paths now present on disk, and the
        index to resume from next time. On listing failure returns ([], start_index)
        so no progress is falsely recorded.
    """
    try:
        all_files = list_repo_files(repo_id=SOURCE_REPO_ID, repo_type="dataset", token=HF_TOKEN)
        # Filter for .rar files and exclude the one known-bad archive.
        rar_files_in_repo = sorted(
            f for f in all_files
            if f.endswith(".rar")
            and "ZBrush/3DConceptArtist_TheUltimateZbrushGuide_DownloadPirate.com.rar" not in f
        )
        end_index = start_index + chunk_size
        files_to_download_metadata = rar_files_in_repo[start_index:end_index]
        if not files_to_download_metadata:
            log_message("✅ No more RAR files to download.")
            return [], start_index
        log_message(f"📥 Downloading RAR files {start_index + 1} to {end_index} from {SOURCE_REPO_ID}")
        downloaded_paths = []
        headers = {"Authorization": f"Bearer {HF_TOKEN}"}  # invariant: hoisted out of the loop
        for file_path_in_repo in files_to_download_metadata:
            filename = os.path.basename(file_path_in_repo)
            dest_path = os.path.join(DOWNLOAD_FOLDER, filename)
            file_url = f"https://huggingface.co/datasets/{SOURCE_REPO_ID}/resolve/main/{file_path_in_repo}"
            if os.path.exists(dest_path):
                # Fix: this message previously printed a literal "(unknown)".
                log_message(f"⏩ Already exists, skipping: {filename}")
                downloaded_paths.append(dest_path)
                continue
            log_message(f"🔽 Downloading: {file_path_in_repo}")
            try:
                with requests.get(file_url, headers=headers, stream=True) as r:
                    r.raise_for_status()
                    with open(dest_path, "wb") as f:
                        for chunk in r.iter_content(chunk_size=8192):
                            f.write(chunk)
                log_message(f"✅ Downloaded: {filename}")
                downloaded_paths.append(dest_path)
            except Exception as e:
                log_message(f"❌ Failed to download {file_path_in_repo}: {e}")
                log_failed_file(file_path_in_repo, f"Download failed: {e}")
                # Fix: remove a partially-written file; otherwise the exists-check
                # above treats the truncated download as complete on the next run.
                if os.path.exists(dest_path):
                    os.remove(dest_path)
        return downloaded_paths, end_index
    except Exception as e:
        log_message(f"❌ Error in download_rar_files: {e}")
        return [], start_index
def upload_large_folder(local_folder, repo_id, repo_folder=None, repo_type="dataset", api=None):
    """
    Upload an entire folder (recursively) to a Hugging Face dataset repo.

    Args:
        local_folder: path to the folder to upload
        repo_id: destination repo id
        repo_folder: subfolder in the repo (optional)
        repo_type: usually 'dataset'
        api: HfApi instance; created from HF_TOKEN when omitted
    """
    if api is None:
        from huggingface_hub import HfApi
        api = HfApi(token=HF_TOKEN)
    for root, _, files in os.walk(local_folder):
        for file in files:
            local_path = os.path.join(root, file)
            rel_path = os.path.relpath(local_path, local_folder)
            if repo_folder:
                path_in_repo = os.path.join(repo_folder, rel_path)
            else:
                path_in_repo = rel_path
            # Fix: Hub repo paths must use forward slashes; os.path.join/relpath
            # produce backslashes on Windows, which would create wrong paths.
            path_in_repo = path_in_repo.replace(os.sep, "/")
            api.upload_file(
                path_or_fileobj=local_path,
                path_in_repo=path_in_repo,
                repo_id=repo_id,
                repo_type=repo_type
            )
def extract_and_upload_rar(rar_path: str, processed_rars_set: set, uploaded_folders_set: Set[str]) -> bool:
    """Extract a single RAR file and upload its contents to the destination repo.

    Args:
        rar_path: Local path to the downloaded .rar archive.
        processed_rars_set: Mutable set of archive filenames already handled;
            updated (and persisted) when the folder content was already uploaded.
        uploaded_folders_set: Mutable set of folder-name hashes already uploaded.

    Returns:
        True when the archive is fully processed (or was already done),
        False when extraction or upload failed.
    """
    filename = os.path.basename(rar_path)
    processing_status["current_file"] = filename
    if filename in processed_rars_set:
        # Fix: message previously printed a literal "(unknown)" for the filename.
        log_message(f"⏩ {filename} already processed, skipping extraction and upload.")
        return True
    # Generate folder name and hash for tracking duplicate uploads across restarts.
    folder_name = filename.replace(".rar", "")
    folder_hash = get_folder_hash(folder_name)
    # Check if this folder content has already been uploaded
    if folder_hash in uploaded_folders_set:
        log_message(f"🔒 Folder '{folder_name}' already uploaded (hash: {folder_hash[:8]}...), skipping.")
        processed_rars_set.add(filename)
        save_processed_files_state(processed_rars_set)
        return True
    log_message(f"📦 Attempting to extract: {filename}")
    current_extract_folder = os.path.join(EXTRACT_FOLDER, f"{folder_name}_extracted")
    os.makedirs(current_extract_folder, exist_ok=True)
    try:
        if shutil.which("unrar") is None:
            raise RuntimeError("unrar command not found. Please install unrar.")
        # -o+ overwrites without prompting. The destination argument must end
        # with a path separator, otherwise unrar treats it as a file mask
        # rather than a target directory.
        unrar_command = ["unrar", "x", "-o+", rar_path, current_extract_folder + os.sep]
        log_message(f"Running command: {' '.join(unrar_command)}")
        subprocess.run(
            unrar_command,
            check=True,          # raises CalledProcessError on non-zero exit
            capture_output=True,
            text=True,
            encoding='utf-8'
        )
        extracted_contents = os.listdir(current_extract_folder)
        if not extracted_contents:
            raise Exception("Extraction completed but no files were produced in the target directory.")
        log_message(f"Successfully extracted {len(extracted_contents)} items")
        # Upload the entire folder at once
        log_message(f"⬆️ Uploading entire folder: {folder_name}")
        upload_large_folder(current_extract_folder, DEST_REPO_ID, repo_folder=folder_name, api=api)
        log_message(f"✅ Successfully uploaded all files from {filename} as a folder")
        # Mark folder as uploaded using hash so restarts don't re-upload it.
        uploaded_folders_set.add(folder_hash)
        save_uploaded_folders(uploaded_folders_set)
        processing_status["uploaded_folders"] = len(uploaded_folders_set)
        log_message(f"🔒 Folder '{folder_name}' locked in BG2 repo (hash: {folder_hash[:8]}...)")
        return True
    except subprocess.CalledProcessError as e:
        error_msg = f"RAR extraction failed (exit {e.returncode}): {e.stderr.strip()}"
        log_failed_file(filename, error_msg)
        processing_status["failed_files"] += 1
        return False
    except Exception as e:
        error_msg = f"Unexpected error during processing {filename}: {str(e)}"
        log_failed_file(filename, error_msg)
        processing_status["failed_files"] += 1
        return False
    finally:
        # Always reclaim disk space, whether extraction/upload succeeded or not.
        if os.path.exists(current_extract_folder):
            log_message(f"🧹 Cleaning up extracted files in {current_extract_folder}")
            try:
                shutil.rmtree(current_extract_folder)
                log_message(f"✅ Cleaned up extraction folder")
            except Exception as e:
                log_message(f"⚠️ Could not clean up extraction folder {current_extract_folder}: {e}")
def continuous_processing(start_download_index: Optional[int] = None):
    """Main worker loop: repeatedly download a batch, extract, upload, delete.

    Runs until every remote and local archive is processed or until
    processing_status["is_running"] is cleared by the stop endpoint.

    Args:
        start_download_index: When given, overrides the persisted download
            index so processing resumes from that position.
    """
    processing_status["is_running"] = True
    log_message("🚀 Starting continuous RAR processing...")
    try:
        # Load uploaded folders tracking
        uploaded_folders = load_uploaded_folders()
        processing_status["uploaded_folders"] = len(uploaded_folders)
        if start_download_index is not None:
            log_message(f"Starting download from index: {start_download_index}")
            save_download_state(start_download_index)  # Set download state to start from this index
        else:
            log_message("Starting download from saved state or beginning.")
        while processing_status["is_running"]:
            # 1. Download a batch of RAR files
            download_start_index = load_download_state()
            downloaded_rar_paths, next_download_index = download_rar_files(download_start_index, CHUNK_SIZE)
            save_download_state(next_download_index)
            if not downloaded_rar_paths:
                # Nothing new came down; check for leftover local archives first.
                all_local_rars = sorted(os.path.join(DOWNLOAD_FOLDER, f) for f in os.listdir(DOWNLOAD_FOLDER) if f.endswith(".rar"))
                processed_rars = load_processed_files_state()
                unprocessed_rars = [rar for rar in all_local_rars if os.path.basename(rar) not in processed_rars]
                if not unprocessed_rars:
                    log_message("✅ No more RAR files to download. Stopping...")
                    break
                else:
                    log_message(f"📋 Found {len(unprocessed_rars)} unprocessed local RAR files")
            # 2. Process all available RAR files (downloaded + existing)
            all_local_rars = sorted(os.path.join(DOWNLOAD_FOLDER, f) for f in os.listdir(DOWNLOAD_FOLDER) if f.endswith(".rar"))
            processed_rars = load_processed_files_state()
            processing_status["total_files"] = len(all_local_rars)
            processing_status["processed_files"] = len(processed_rars)
            for rar_file_path in all_local_rars:
                if not processing_status["is_running"]:
                    break
                filename = os.path.basename(rar_file_path)
                if filename not in processed_rars:
                    success = extract_and_upload_rar(rar_file_path, processed_rars, uploaded_folders)
                    if success:
                        processed_rars.add(filename)
                        save_processed_files_state(processed_rars)
                        processing_status["processed_files"] += 1
                        # Delete the RAR to free the Space's limited disk quota.
                        # Fix: the two messages below previously printed "(unknown)".
                        log_message(f"🗑️ Deleting processed RAR: {filename}")
                        try:
                            os.remove(rar_file_path)
                            log_message(f"✅ Deleted RAR file: {filename}")
                        except Exception as e:
                            log_message(f"⚠️ Could not delete {rar_file_path}: {e}")
                    # Add delay between processing files
                    time.sleep(PROCESSING_DELAY)
            # If no new files were downloaded and all local files are processed, we're done
            if not downloaded_rar_paths:
                break
    except Exception as e:
        log_message(f"❌ Error in continuous processing: {e}")
    finally:
        processing_status["is_running"] = False
        processing_status["current_file"] = None
        log_message("🏁 Processing stopped")
@app.get("/")
async def root():
    """Serve the main HTML dashboard.

    Fix: the route decorator was missing, so the UI was never registered with
    the FastAPI app; the JS below polls /status and posts to /start, /stop and
    /start_from_index.
    """
    html_content = """
<!DOCTYPE html>
<html>
<head>
    <title>RAR Processing Service</title>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <style>
        body { font-family: Arial, sans-serif; margin: 20px; background-color: #f5f5f5; }
        .container { max-width: 1200px; margin: 0 auto; background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
        .status-card { background: #e3f2fd; padding: 15px; border-radius: 5px; margin: 10px 0; }
        .logs { background: #f5f5f5; padding: 15px; border-radius: 5px; height: 400px; overflow-y: auto; font-family: monospace; font-size: 12px; }
        .button { background: #2196F3; color: white; padding: 10px 20px; border: none; border-radius: 5px; cursor: pointer; margin: 5px; }
        .button:hover { background: #1976D2; }
        .button:disabled { background: #ccc; cursor: not-allowed; }
        .stop-button { background: #f44336; }
        .stop-button:hover { background: #d32f2f; }
        .stats { display: flex; gap: 20px; margin: 20px 0; }
        .stat-item { background: #f0f0f0; padding: 10px; border-radius: 5px; text-align: center; flex: 1; }
        .start-form { margin-top: 20px; padding: 15px; border: 1px solid #ddd; border-radius: 5px; background: #f9f9f9; }
        .start-form input[type="number"] { width: calc(100% - 120px); padding: 8px; margin-right: 10px; border: 1px solid #ccc; border-radius: 4px; }
        .start-form button { padding: 8px 15px; background: #4CAF50; color: white; border: none; border-radius: 4px; cursor: pointer; }
        .start-form button:hover { background: #45a049; }
    </style>
</head>
<body>
    <div class="container">
        <h1>🔄 RAR Processing Service</h1>
        <p>Automated extraction and upload of RAR files from BG1 to BG2 dataset</p>
        <div class="status-card">
            <h3>Status: <span id="status">Stopped</span></h3>
            <p>Current File: <span id="current-file">None</span></p>
            <p>Last Update: <span id="last-update">Never</span></p>
        </div>
        <div class="stats">
            <div class="stat-item">
                <h4>Total Files</h4>
                <span id="total-files">0</span>
            </div>
            <div class="stat-item">
                <h4>Processed</h4>
                <span id="processed-files">0</span>
            </div>
            <div class="stat-item">
                <h4>Uploaded Folders</h4>
                <span id="uploaded-folders">0</span>
            </div>
            <div class="stat-item">
                <h4>Failed</h4>
                <span id="failed-files">0</span>
            </div>
        </div>
        <div class="start-form">
            <h3>Start Processing from Specific Download Index</h3>
            <input type="number" id="start-index-input" placeholder="Enter start index (e.g., 0)" value="0">
            <button onclick="startProcessingWithIndex()">Start from Index</button>
        </div>
        <div>
            <button class="button" onclick="startProcessing()" id="start-btn">Start Processing (from last saved index)</button>
            <button class="button stop-button" onclick="stopProcessing()" id="stop-btn" disabled>Stop Processing</button>
            <button class="button" onclick="refreshStatus()">Refresh Status</button>
        </div>
        <h3>Logs</h3>
        <div class="logs" id="logs">Loading...</div>
    </div>
    <script>
        async function startProcessing() {
            try {
                const response = await fetch("/start", { method: "POST" });
                const result = await response.json();
                alert(result.message);
                refreshStatus();
            } catch (error) {
                alert("Error starting processing: " + error.message);
            }
        }
        async function startProcessingWithIndex() {
            const index = document.getElementById("start-index-input").value;
            if (index === "" || isNaN(index)) {
                alert("Please enter a valid number for the start index.");
                return;
            }
            try {
                const response = await fetch("/start_from_index", {
                    method: "POST",
                    headers: { "Content-Type": "application/x-www-form-urlencoded" },
                    body: `start_index=${parseInt(index)}`
                });
                const result = await response.json();
                alert(result.message);
                refreshStatus();
            } catch (error) {
                alert("Error starting processing from index: " + error.message);
            }
        }
        async function stopProcessing() {
            try {
                const response = await fetch("/stop", { method: "POST" });
                const result = await response.json();
                alert(result.message);
                refreshStatus();
            } catch (error) {
                alert("Error stopping processing: " + error.message);
            }
        }
        async function refreshStatus() {
            try {
                const response = await fetch("/status");
                const status = await response.json();
                document.getElementById("status").textContent = status.is_running ? "Running" : "Stopped";
                document.getElementById("current-file").textContent = status.current_file || "None";
                document.getElementById("last-update").textContent = status.last_update || "Never";
                document.getElementById("total-files").textContent = status.total_files;
                document.getElementById("processed-files").textContent = status.processed_files;
                document.getElementById("uploaded-folders").textContent = status.uploaded_folders;
                document.getElementById("failed-files").textContent = status.failed_files;
                document.getElementById("start-btn").disabled = status.is_running;
                document.getElementById("stop-btn").disabled = !status.is_running;
                const logsDiv = document.getElementById("logs");
                logsDiv.innerHTML = status.logs.join("<br>");
                logsDiv.scrollTop = logsDiv.scrollHeight;
            } catch (error) {
                console.error("Error refreshing status:", error);
            }
        }
        // Auto-refresh every 5 seconds
        setInterval(refreshStatus, 5000);
        // Initial load
        refreshStatus();
    </script>
</body>
</html>
"""
    return HTMLResponse(content=html_content)
@app.get("/status")
async def get_status():
    """Return the live processing status dict as JSON.

    Fix: the route decorator was missing; the dashboard JS polls GET /status.
    """
    return JSONResponse(content=processing_status)
@app.post("/start")
async def start_processing(background_tasks: BackgroundTasks):
    """Start the worker loop in the background (resumes from saved state).

    Fix: the route decorator was missing; the dashboard JS posts to /start.
    """
    if processing_status["is_running"]:
        return {"message": "Processing is already running"}
    background_tasks.add_task(continuous_processing)
    return {"message": "Processing started"}
@app.post("/start_from_index")
async def start_processing_from_index(background_tasks: BackgroundTasks, start_index: int = Form(...)):
    """Start the worker loop from a caller-supplied download index.

    Fix: the route decorator was missing; the dashboard JS posts form-encoded
    `start_index` to /start_from_index.
    """
    if processing_status["is_running"]:
        return {"message": "Processing is already running"}
    if start_index < 0:
        return {"message": "Start index cannot be negative."}
    background_tasks.add_task(continuous_processing, start_download_index=start_index)
    return {"message": f"Processing started from download index: {start_index}"}
@app.post("/stop")
async def stop_processing():
    """Request a graceful stop of the worker loop.

    Fix: the route decorator was missing; the dashboard JS posts to /stop.
    The loop observes the cleared flag at its next iteration boundary.
    """
    if not processing_status["is_running"]:
        return {"message": "Processing is not running"}
    processing_status["is_running"] = False
    return {"message": "Processing stop requested"}
@app.get("/logs")
async def get_logs():
    """Return the rolling log buffer.

    NOTE(review): the original route decorator was lost; "/logs" is inferred
    from the function name — confirm against any external consumers.
    """
    return {"logs": processing_status["logs"]}
@app.get("/uploaded_folders")
async def get_uploaded_folders():
    """Return the persisted uploaded-folder hashes and their count.

    NOTE(review): the original route decorator was lost; "/uploaded_folders"
    is inferred from the function name — confirm against any external consumers.
    """
    uploaded_folders = load_uploaded_folders()
    return {"uploaded_folder_count": len(uploaded_folders), "folder_hashes": list(uploaded_folders)}
if __name__ == "__main__":
    # Port 7860 is the port Hugging Face Spaces expects the app to listen on.
    uvicorn.run(app, host="0.0.0.0", port=7860)