Spaces:
Running
Running
import asyncio
import logging
import os
import re
import shutil
import tempfile
import time
from contextlib import asynccontextmanager
from pathlib import Path

from fastapi import FastAPI
from huggingface_hub import (
    CommitOperationAdd,
    create_commit,
    hf_hub_download,
    list_repo_files,
    upload_file,
)

try:
    import rarfile
except ImportError:
    rarfile = None
# === CONFIGURATION ===
HF_TOKEN = os.environ.get("HF_TOKEN")   # access token; None when the env var is unset
REPO_ID = "factorstudios/Pipeline"       # dataset repo holding archives and extracted output
DATA_PATH = "Blenders"                   # repo folder scanned for RAR archives
EXTRACTED_PATH = "Blenders/extracted"    # repo folder receiving one sub-folder per course
TEMP_DIR = tempfile.gettempdir()         # local scratch space (HF download cache + extraction dirs)

# === Setup Logging ===
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Initial application object; it is re-created further down once the lifespan
# handler exists.
app = FastAPI()
# === Health Check Routes ===
# NOTE(review): `root`, `health` and `app` are all defined again under
# "API Endpoints" later in this module, so these first definitions are
# superseded by the later ones.
def root():
    """Return the service status banner."""
    body = {"status": "RAR Extractor running"}
    return body


def health():
    """Return the liveness payload."""
    body = {"healthy": True}
    return body
# === Get Course Name from Filename ===
# Volume marker that can remain once the final extension is gone,
# e.g. "Course.part1" (from "Course.part1.rar").
_VOLUME_SUFFIX_RE = re.compile(r'\.(part\d+|r\d+|\d+)$', re.IGNORECASE)


def extract_course_name(filename: str) -> str:
    """Derive the course name from an archive filename.

    The last extension is dropped first, then one trailing volume marker
    (``.partN``, ``.rN`` or ``.N``, case-insensitive) is removed::

        Course.part1.rar -> Course
        Course.r00       -> Course
        Course.rar       -> Course

    NOTE(review): the numeric ``.N`` alternative is applied after the
    extension is stripped, so a name that itself ends in ".<digits>" loses
    it (``Blender 3.6.rar`` -> ``Blender 3``). The result is used as the
    remote folder name, so changing this rule may stop it matching folders
    that were already uploaded. It is kept as-is for that reason.
    """
    stem, _extension = os.path.splitext(filename)
    return _VOLUME_SUFFIX_RE.sub('', stem)
# === Download File from Dataset ===
def download_from_dataset(filename: str, repo_path: str) -> str:
    """Fetch ``{repo_path}/{filename}`` from the dataset repo into the HF cache.

    The file is stored under ``TEMP_DIR`` (passed as ``cache_dir``).

    Args:
        filename: File name inside ``repo_path``.
        repo_path: Folder inside the dataset repository.

    Returns:
        The local filesystem path of the downloaded file, or ``None`` if the
        download raised. The error is logged, not propagated, despite the
        ``str`` annotation.
    """
    remote_name = f"{repo_path}/{filename}"
    try:
        logging.info(f"[*] Downloading from dataset: {remote_name}")
        cached_path = hf_hub_download(
            repo_id=REPO_ID,
            filename=remote_name,
            repo_type="dataset",
            token=HF_TOKEN,
            cache_dir=TEMP_DIR,
        )
    except Exception as e:
        logging.error(f"[!] Download failed: {filename} β {e}")
        return None
    logging.info(f"[β] Downloaded: {filename}")
    return cached_path
# === Extract RAR File ===
def extract_rar(rar_path: str, extract_dir: str) -> bool:
    """Unpack every member of ``rar_path`` into ``extract_dir``.

    Returns True on success. Returns False, after logging, in two cases: the
    optional ``rarfile`` import failed at module load, or opening or
    extracting the archive raised.

    NOTE(review): rarfile usually needs an external unrar/unar/bsdtar
    backend. Confirm one is installed in the runtime image.
    """
    try:
        if rarfile is not None:
            logging.info(f"[*] Extracting RAR: {os.path.basename(rar_path)}")
            with rarfile.RarFile(rar_path) as archive:
                archive.extractall(extract_dir)
            logging.info(f"[β] Extracted to: {extract_dir}")
            return True
        logging.error("[!] rarfile module not installed. Install with: pip install rarfile")
        return False
    except Exception as e:
        logging.error(f"[!] RAR extraction failed: {rar_path} β {e}")
        return False
# === Upload Directory Contents to Dataset ===
def upload_directory_to_dataset(local_dir: str, dataset_path: str) -> bool:
    """Upload every file under ``local_dir`` to ``dataset_path`` in one commit.

    Each file keeps its path relative to ``local_dir``, and Windows separators
    are normalised to ``/``.

    All files go into a single commit, so the upload is all-or-nothing. With
    one commit per file, a failure part-way would leave a partially populated
    course folder. ``list_extracted_courses_in_dataset`` counts a course as
    extracted as soon as any file exists under its folder, so such a course
    would never be retried.

    Args:
        local_dir: Local directory to upload (walked recursively).
        dataset_path: Destination folder inside the dataset repository.

    Returns:
        True when the commit succeeds, or when there is nothing to upload.
        False, after logging, if collecting the files or committing raised.
    """
    try:
        pending = []  # (local file, path in repo)
        for dirpath, _dirnames, filenames in os.walk(local_dir):
            for name in filenames:
                filepath = os.path.join(dirpath, name)
                relative_path = os.path.relpath(filepath, local_dir)
                remote_path = f"{dataset_path}/{relative_path}".replace("\\", "/")
                pending.append((filepath, remote_path))

        if not pending:
            # Nothing to commit; an empty commit would be pointless.
            logging.info(f"[β] Uploaded 0 files to {dataset_path}")
            return True

        operations = [
            CommitOperationAdd(path_in_repo=remote, path_or_fileobj=local)
            for local, remote in pending
        ]
        create_commit(
            repo_id=REPO_ID,
            repo_type="dataset",
            operations=operations,
            commit_message=f"Upload extracted files to {dataset_path}",
            token=HF_TOKEN,
        )
        for _local, remote in pending:
            logging.info(f"[β] Uploaded: {remote}")
        logging.info(f"[β] Uploaded {len(pending)} files to {dataset_path}")
        return True
    except Exception as e:
        logging.error(f"[!] Upload directory failed: {local_dir} β {e}")
        return False
# === List RAR Files in Dataset ===
# Matches old-style numbered RAR volumes such as ".r00" / ".r01".
_RAR_PART_RE = re.compile(r'\.r\d{2}$', re.IGNORECASE)


def list_rar_files_in_dataset() -> list:
    """Return repo paths of RAR archives stored under ``DATA_PATH``.

    A repo file is selected when all of the following hold:

    * It lives inside the ``DATA_PATH`` folder. The trailing "/" in the
      prefix check stops sibling folders such as ``BlendersOld/`` from
      matching.
    * It is not under ``EXTRACTED_PATH``, which is where this service
      uploads its own output. Otherwise archives contained in an extracted
      course would be picked up as new input on every scan.
    * It ends in ``.rar`` or ``.rNN`` (case-insensitive).

    NOTE(review): every volume of a multi-volume set is returned as its own
    entry.

    Returns:
        Matching repo paths in listing order, or an empty list (after
        logging) if the repo listing fails.
    """
    try:
        logging.info(f"[*] Scanning dataset for RAR files in {DATA_PATH}")
        source_prefix = DATA_PATH.rstrip("/") + "/"
        output_prefix = EXTRACTED_PATH.rstrip("/") + "/"
        all_files = list_repo_files(
            repo_id=REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN,
        )
        rar_files = [
            f for f in all_files
            if f.startswith(source_prefix)
            and not f.startswith(output_prefix)
            and (f.lower().endswith(".rar") or _RAR_PART_RE.search(f))
        ]
        logging.info(f"[*] Found {len(rar_files)} RAR files")
        for rf in rar_files:
            logging.info(f" - {rf}")
        return rar_files
    except Exception as e:
        logging.error(f"[!] Failed to list files: {e}")
        return []
# === List Extracted Courses in Dataset ===
def list_extracted_courses_in_dataset() -> set:
    """Return the names of course folders that already exist under EXTRACTED_PATH.

    The course name is the first path segment after ``EXTRACTED_PATH/``; for
    ``Blenders/extracted/Course/a.blend`` it is ``Course``. A file placed
    directly under ``EXTRACTED_PATH`` contributes its own file name.

    Returns an empty set, after logging, if listing the repo fails.
    """
    try:
        logging.info(f"[*] Scanning dataset for extracted courses in {EXTRACTED_PATH}")
        prefix = EXTRACTED_PATH + '/'
        repo_paths = list_repo_files(
            repo_id=REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN,
        )
        # Everything after the prefix, up to the next "/", names the course.
        courses = {
            path[len(prefix):].split('/', 1)[0]
            for path in repo_paths
            if path.startswith(prefix)
        }
        logging.info(f"[*] Found {len(courses)} extracted courses")
        for course in courses:
            logging.info(f" - {course}")
        return courses
    except Exception as e:
        logging.error(f"[!] Failed to list extracted courses: {e}")
        return set()
# === Extract and Upload RAR ===
async def extract_and_upload_rar(rar_file_path: str):
    """Download one archive, extract it and upload the result.

    The extracted files are uploaded to ``{EXTRACTED_PATH}/{course_name}``,
    where the course name comes from ``extract_course_name``.

    The download, extraction and upload helpers are synchronous network and
    disk calls. They run in the default thread-pool executor; calling them
    directly inside this coroutine would block the event loop and stall
    every HTTP request for the whole duration.

    Args:
        rar_file_path: Repo path of the archive, e.g. ``Blenders/Course.rar``.
            The archive is downloaded from the folder it actually lives in.
            A bare filename (no "/") is looked up under ``DATA_PATH``.

    Returns:
        True when download, extraction and upload all succeed. False
        otherwise; errors are logged.
    """
    extract_dir = None
    try:
        repo_dir, _, filename = rar_file_path.rpartition("/")
        course_name = extract_course_name(filename)
        loop = asyncio.get_running_loop()

        # mkdtemp guarantees a fresh, unique directory. A timestamp-based
        # name collides when two archives start within the same second.
        extract_dir = tempfile.mkdtemp(prefix="rar_extract_", dir=TEMP_DIR)
        logging.info(f"[*] Processing: {filename} (Course: {course_name})")

        local_rar = await loop.run_in_executor(
            None, download_from_dataset, filename, repo_dir or DATA_PATH
        )
        if not local_rar:
            return False

        if not await loop.run_in_executor(None, extract_rar, local_rar, extract_dir):
            return False

        remote_path = f"{EXTRACTED_PATH}/{course_name}"
        if not await loop.run_in_executor(
            None, upload_directory_to_dataset, extract_dir, remote_path
        ):
            return False

        logging.info(f"[β] Completed: {course_name}")
        return True
    except Exception as e:
        logging.error(f"[!] Error processing {rar_file_path}: {e}")
        return False
    finally:
        # Cleanup runs on every exit path, including a failed download or an
        # unexpected exception.
        if extract_dir is not None:
            shutil.rmtree(extract_dir, ignore_errors=True)
# === Background Worker ===
async def rar_processor_worker():
    """Poll the dataset forever and extract archives whose course is missing.

    Each pass:

    1. Lists the archives and the already-extracted course folders.
    2. Processes every archive whose course name is not among those folders,
       waiting 5 s between archives.
    3. Sleeps 60 s before the next pass.

    Any exception in a pass is logged, followed by a 60 s back-off.

    The two listing helpers make synchronous repo requests, so they run in
    the default executor. That keeps the event loop free to serve HTTP
    requests while a scan is in flight.
    """
    logging.info("π RAR Processor started")
    loop = asyncio.get_running_loop()
    while True:
        try:
            logging.info("[*] Scanning for RAR files...")
            rar_files = await loop.run_in_executor(None, list_rar_files_in_dataset)
            extracted_courses = await loop.run_in_executor(
                None, list_extracted_courses_in_dataset
            )
            untouched_rar_files = [
                rar_file
                for rar_file in rar_files
                if extract_course_name(os.path.basename(rar_file)) not in extracted_courses
            ]
            if untouched_rar_files:
                logging.info(f"[*] Found {len(untouched_rar_files)} untouched RAR files.")
                for rar_file in untouched_rar_files:
                    logging.info(f" - {rar_file}")
                    await extract_and_upload_rar(rar_file)
                    await asyncio.sleep(5)  # Delay between files
            else:
                logging.info("[*] No untouched RAR files found, waiting...")
            # Wait 60 seconds before next scan
            await asyncio.sleep(60)
        except Exception as e:
            logging.error(f"[!] Worker error: {e}")
            await asyncio.sleep(60)
# === FastAPI Lifespan ===
# FastAPI's `lifespan=` expects an async context manager factory. The
# decorator turns this generator into one; without it Starlette only accepts
# the bare generator through a deprecated fallback.
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the background RAR worker for the lifetime of the application.

    The worker task is created on startup and cancelled on shutdown.
    """
    logging.info("π Starting RAR Extractor FastAPI server...")
    task = asyncio.create_task(rar_processor_worker())
    try:
        yield
    finally:
        task.cancel()
        # Give the cancellation a chance to be delivered before the loop
        # closes, but bound the wait so shutdown cannot hang on the worker.
        await asyncio.wait({task}, timeout=10)
        logging.info("π Shutting down RAR Extractor.")
# === Update App with Lifespan ===
app = FastAPI(lifespan=lifespan)


# === API Endpoints ===
# Without route decorators these functions are plain module-level callables
# and the app serves no routes at all.
# NOTE(review): "/health" is a chosen path; confirm it against any probe
# or client that calls it.
@app.get("/")
def root():
    """Return the service status banner."""
    return {"status": "RAR Extractor running"}


@app.get("/health")
def health():
    """Return the liveness payload."""
    return {"healthy": True}
# NOTE(review): route path chosen to mirror the function name; confirm it
# against existing clients.
@app.get("/scan_rars")
def scan_rars():
    """Manually trigger RAR file scan and identify untouched ones.

    Declared as a plain ``def``, so FastAPI runs it in its worker thread pool
    rather than on the event loop. The two listing calls below are
    synchronous repo requests.

    Returns:
        A dict with the total number of archives found, the number of
        extracted course folders, and the repo paths of archives whose course
        folder does not exist yet.
    """
    rar_files = list_rar_files_in_dataset()
    extracted_courses = list_extracted_courses_in_dataset()
    untouched_rar_files = [
        rar_file
        for rar_file in rar_files
        if extract_course_name(os.path.basename(rar_file)) not in extracted_courses
    ]
    return {
        "total_rar_files": len(rar_files),
        "extracted_courses": len(extracted_courses),
        "untouched_rar_files": untouched_rar_files,
    }
# NOTE(review): path and POST method chosen here (the endpoint triggers work);
# confirm against existing clients.
@app.post("/extract_all")
async def extract_all():
    """Manually trigger extraction of all untouched RAR files.

    The listing helpers make synchronous repo requests, so they run in the
    default executor instead of blocking the event loop.

    NOTE(review): the request stays open until every archive has been
    processed, with 5 s between archives. Consider a background task if
    clients time out.

    Returns:
        ``{"message": ...}`` when nothing needs extracting. Otherwise
        ``{"processed": N, "results": [{"file": path, "success": bool}, ...]}``.
    """
    loop = asyncio.get_running_loop()
    rar_files = await loop.run_in_executor(None, list_rar_files_in_dataset)
    extracted_courses = await loop.run_in_executor(None, list_extracted_courses_in_dataset)
    untouched_rar_files = [
        rar_file
        for rar_file in rar_files
        if extract_course_name(os.path.basename(rar_file)) not in extracted_courses
    ]
    if not untouched_rar_files:
        return {"message": "No untouched RAR files found to extract"}

    results = []
    for rar_file in untouched_rar_files:
        success = await extract_and_upload_rar(rar_file)
        results.append({"file": rar_file, "success": success})
        await asyncio.sleep(5)
    return {"processed": len(results), "results": results}