"""RAR extractor service.

Scans a Hugging Face dataset repository for RAR archives under ``DATA_PATH``,
extracts each archive whose course name has no folder under ``EXTRACTED_PATH``
yet, and uploads the extracted files to ``EXTRACTED_PATH/<course name>``.

A background worker repeats the scan every 60 seconds; the same operations are
also exposed as HTTP endpoints (``/scan`` and ``/extract-all``).
"""
import asyncio
import logging
import os
import posixpath
import re
import shutil
import tempfile
import time  # noqa: F401  (kept: file-level import from the original module)
from contextlib import asynccontextmanager, suppress
from pathlib import Path  # noqa: F401  (kept: file-level import from the original module)
from typing import Optional

from fastapi import FastAPI
from huggingface_hub import hf_hub_download, list_repo_files, upload_file

try:
    import rarfile
except ImportError:
    rarfile = None

# === CONFIGURATION ===
HF_TOKEN = os.environ.get("HF_TOKEN")
REPO_ID = "factorstudios/Pipeline"
DATA_PATH = "Blenders"
EXTRACTED_PATH = "Blenders/extracted"
TEMP_DIR = tempfile.gettempdir()

# Trailing volume markers stripped from an archive's base name
# (".part1", ".r00", ".001", ...).
_VOLUME_SUFFIX_RE = re.compile(r'\.(part\d+|r\d+|\d+)$', re.IGNORECASE)
# Old-style continuation volumes: ".r00", ".r01", ...
_OLD_STYLE_VOLUME_RE = re.compile(r'\.r\d{2}$', re.IGNORECASE)

# === Setup Logging ===
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")


async def _run_blocking(func, *args):
    """Run a blocking callable in the default executor and return its result.

    The Hub calls, RAR extraction and uploads are all synchronous; running
    them directly inside a coroutine would stall the event loop (and with it
    the health-check routes) for the duration of the call.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func, *args)


# === Get Course Name from Filename ===
def extract_course_name(filename: str) -> str:
    """Extract course name from RAR filename.

    The file extension is dropped first, then one trailing volume marker
    (``.partN``, ``.rN`` or a purely numeric suffix) is removed if present.
    """
    name = os.path.splitext(filename)[0]
    # Remove common patterns like .part1, .001, etc
    return _VOLUME_SUFFIX_RE.sub('', name)


# === Download File from Dataset ===
def download_from_dataset(filename: str, repo_path: str) -> Optional[str]:
    """Download ``repo_path/filename`` from the dataset repository.

    Returns:
        The local path reported by ``hf_hub_download``, or ``None`` when the
        download fails (the error is logged, not raised).
    """
    try:
        logging.info(f"[*] Downloading from dataset: {repo_path}/{filename}")
        local_path = hf_hub_download(
            repo_id=REPO_ID,
            filename=f"{repo_path}/{filename}",
            repo_type="dataset",
            token=HF_TOKEN,
            cache_dir=TEMP_DIR,
        )
        logging.info(f"[✓] Downloaded: {filename}")
        return local_path
    except Exception as e:
        logging.error(f"[!] Download failed: {filename} — {e}")
        return None


# === Extract RAR File ===
def extract_rar(rar_path: str, extract_dir: str) -> bool:
    """Extract the archive at ``rar_path`` into ``extract_dir``.

    Returns:
        ``True`` on success; ``False`` when the optional ``rarfile`` module
        is missing or extraction raises (the error is logged).
    """
    try:
        if rarfile is None:
            logging.error("[!] rarfile module not installed. Install with: pip install rarfile")
            return False

        logging.info(f"[*] Extracting RAR: {os.path.basename(rar_path)}")
        with rarfile.RarFile(rar_path) as rf:
            rf.extractall(extract_dir)
        logging.info(f"[✓] Extracted to: {extract_dir}")
        return True
    except Exception as e:
        logging.error(f"[!] RAR extraction failed: {rar_path} — {e}")
        return False


# === Upload Directory Contents to Dataset ===
def upload_directory_to_dataset(local_dir: str, dataset_path: str) -> bool:
    """Upload every file below ``local_dir`` to ``dataset_path`` in the dataset.

    Files are uploaded one at a time, keeping their path relative to
    ``local_dir``.

    NOTE(review): a failure part-way through leaves the files uploaded so far
    in the repository, so the course folder already exists and later scans
    will treat the course as extracted — confirm whether that is acceptable.

    Returns:
        ``True`` if every upload succeeded, ``False`` on the first error.
    """
    try:
        file_count = 0
        for root, dirs, files in os.walk(local_dir):
            for file in files:
                filepath = os.path.join(root, file)
                relative_path = os.path.relpath(filepath, local_dir)
                # Repository paths always use forward slashes.
                remote_path = f"{dataset_path}/{relative_path}".replace("\\", "/")
                upload_file(
                    path_or_fileobj=filepath,
                    path_in_repo=remote_path,
                    repo_id=REPO_ID,
                    repo_type="dataset",
                    token=HF_TOKEN,
                )
                logging.info(f"[↑] Uploaded: {remote_path}")
                file_count += 1
        logging.info(f"[✓] Uploaded {file_count} files to {dataset_path}")
        return True
    except Exception as e:
        logging.error(f"[!] Upload directory failed: {local_dir} — {e}")
        return False


def _fetch_repo_file_list() -> list:
    """Return the path of every file in the dataset repository (may raise)."""
    return list(list_repo_files(repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN))


# === List RAR Files in Dataset ===
def list_rar_files_in_dataset(all_files: Optional[list] = None) -> list:
    """List RAR archives (``.rar`` / ``.rNN``) stored under ``DATA_PATH``.

    Args:
        all_files: Optional pre-fetched repository listing; when omitted the
            listing is fetched here.

    Returns:
        Repository paths of the archives, or ``[]`` if listing fails.
    """
    try:
        logging.info(f"[*] Scanning dataset for RAR files in {DATA_PATH}")
        if all_files is None:
            all_files = _fetch_repo_file_list()

        data_prefix = DATA_PATH + "/"
        extracted_prefix = EXTRACTED_PATH + "/"
        rar_files = [
            f for f in all_files
            # Require the folder itself (not e.g. "Blenders2/...") and skip
            # archives that are themselves extraction output.
            if f.startswith(data_prefix)
            and not f.startswith(extracted_prefix)
            and (f.lower().endswith(".rar") or _OLD_STYLE_VOLUME_RE.search(f))
        ]

        logging.info(f"[*] Found {len(rar_files)} RAR files")
        for rf in rar_files:
            logging.info(f" - {rf}")
        return rar_files
    except Exception as e:
        logging.error(f"[!] Failed to list files: {e}")
        return []


# === List Extracted Courses in Dataset ===
def list_extracted_courses_in_dataset(all_files: Optional[list] = None) -> set:
    """Return the names of the first-level entries under ``EXTRACTED_PATH``.

    Args:
        all_files: Optional pre-fetched repository listing; when omitted the
            listing is fetched here.

    Returns:
        Set of course names, or an empty set if listing fails.
    """
    try:
        logging.info(f"[*] Scanning dataset for extracted courses in {EXTRACTED_PATH}")
        if all_files is None:
            all_files = _fetch_repo_file_list()

        prefix = EXTRACTED_PATH + "/"
        extracted_courses = set()
        for f in all_files:
            if f.startswith(prefix):
                # The first path component after the prefix is the course name.
                course_name = f[len(prefix):].split("/", 1)[0]
                if course_name:
                    extracted_courses.add(course_name)

        logging.info(f"[*] Found {len(extracted_courses)} extracted courses")
        for ec in extracted_courses:
            logging.info(f" - {ec}")
        return extracted_courses
    except Exception as e:
        logging.error(f"[!] Failed to list extracted courses: {e}")
        return set()


def find_untouched_rar_files() -> tuple:
    """Find archives whose course has not been extracted yet.

    The repository listing is fetched once and shared by both scans, so a
    transient failure cannot make every archive look untouched (which would
    happen if only the "extracted" listing failed and came back empty).

    Returns:
        ``(rar_files, extracted_courses, untouched_rar_files)``; all three are
        empty when the repository listing cannot be fetched.
    """
    try:
        all_files = _fetch_repo_file_list()
    except Exception as e:
        logging.error(f"[!] Failed to list files: {e}")
        return [], set(), []

    rar_files = list_rar_files_in_dataset(all_files)
    extracted_courses = list_extracted_courses_in_dataset(all_files)
    untouched_rar_files = [
        rar_file for rar_file in rar_files
        if extract_course_name(os.path.basename(rar_file)) not in extracted_courses
    ]
    return rar_files, extracted_courses, untouched_rar_files


def _process_rar(rar_file_path: str) -> bool:
    """Blocking download -> extract -> upload pipeline for one archive."""
    filename = posixpath.basename(rar_file_path)
    course_name = extract_course_name(filename)
    # Download from the folder the archive actually lives in, so archives in
    # sub-folders of DATA_PATH resolve correctly.
    repo_dir = posixpath.dirname(rar_file_path) or DATA_PATH

    logging.info(f"[*] Processing: {filename} (Course: {course_name})")

    # Download RAR file
    local_rar = download_from_dataset(filename, repo_dir)
    if not local_rar:
        return False

    # mkdtemp guarantees a unique directory even for back-to-back runs.
    extract_dir = tempfile.mkdtemp(prefix="rar_extract_", dir=TEMP_DIR)
    try:
        # Extract RAR
        if not extract_rar(local_rar, extract_dir):
            return False

        # Upload to dataset under Blenders/extracted/{course_name}
        remote_path = f"{EXTRACTED_PATH}/{course_name}"
        if not upload_directory_to_dataset(extract_dir, remote_path):
            return False
    finally:
        # Cleanup on every exit path, including unexpected exceptions.
        shutil.rmtree(extract_dir, ignore_errors=True)

    logging.info(f"[✓] Completed: {course_name}")
    return True


# === Extract and Upload RAR ===
async def extract_and_upload_rar(rar_file_path: str):
    """Download, extract and upload one archive without blocking the event loop.

    Args:
        rar_file_path: Repository path of the archive (as returned by
            ``list_rar_files_in_dataset``).

    Returns:
        ``True`` on success, ``False`` on any failure (errors are logged).
    """
    try:
        return await _run_blocking(_process_rar, rar_file_path)
    except Exception as e:
        logging.error(f"[!] Error processing {rar_file_path}: {e}")
        return False


# === Background Worker ===
async def rar_processor_worker():
    """Scan for untouched archives and process them, forever.

    Each pass processes the untouched archives one by one (5 s apart) and then
    sleeps 60 s. Unexpected errors are logged and followed by a 60 s back-off;
    cancellation propagates so the lifespan handler can stop the worker.
    """
    logging.info("🚀 RAR Processor started")

    while True:
        try:
            logging.info("[*] Scanning for RAR files...")
            _, _, untouched_rar_files = await _run_blocking(find_untouched_rar_files)

            if untouched_rar_files:
                logging.info(f"[*] Found {len(untouched_rar_files)} untouched RAR files.")
                for rar_file in untouched_rar_files:
                    logging.info(f" - {rar_file}")
                    await extract_and_upload_rar(rar_file)
                    await asyncio.sleep(5)  # Delay between files
            else:
                logging.info("[*] No untouched RAR files found, waiting...")

            # Wait 60 seconds before next scan
            await asyncio.sleep(60)
        except Exception as e:
            logging.error(f"[!] Worker error: {e}")
            await asyncio.sleep(60)


# === FastAPI Lifespan ===
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the background worker on startup and stop it on shutdown."""
    logging.info("🚀 Starting RAR Extractor FastAPI server...")
    task = asyncio.create_task(rar_processor_worker())
    try:
        yield
    finally:
        task.cancel()
        # Wait for the cancellation to be delivered so the task is not left
        # pending when the event loop shuts down.
        with suppress(asyncio.CancelledError):
            await task
        logging.info("🛑 Shutting down RAR Extractor.")


# === App ===
# Single application instance; the routes below are registered on it.
app = FastAPI(lifespan=lifespan)


# === Health Check Routes ===
@app.get("/")
def root():
    """Report that the service is running."""
    return {"status": "RAR Extractor running"}


@app.get("/health")
def health():
    """Liveness probe."""
    return {"healthy": True}


# === API Endpoints ===
@app.get("/scan")
def scan_rars():
    """Manually trigger RAR file scan and identify untouched ones"""
    rar_files, extracted_courses, untouched_rar_files = find_untouched_rar_files()
    return {
        "total_rar_files": len(rar_files),
        "extracted_courses": len(extracted_courses),
        "untouched_rar_files": untouched_rar_files,
    }


@app.post("/extract-all")
async def extract_all():
    """Manually trigger extraction of all untouched RAR files"""
    _, _, untouched_rar_files = await _run_blocking(find_untouched_rar_files)

    if not untouched_rar_files:
        return {"message": "No untouched RAR files found to extract"}

    results = []
    for rar_file in untouched_rar_files:
        success = await extract_and_upload_rar(rar_file)
        results.append({"file": rar_file, "success": success})
        await asyncio.sleep(5)

    return {"processed": len(results), "results": results}