# zextract / app.py
# Author: factorstudios — commit 0eb334a ("Update app.py"), 9.78 kB
import asyncio
import logging
import os
import re
import shutil
import tempfile
import time
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Optional

from fastapi import FastAPI
from huggingface_hub import hf_hub_download, list_repo_files, upload_file
try:
import rarfile
except ImportError:
rarfile = None
# === CONFIGURATION ===
# Hub access token, taken from the environment (None when the variable is unset).
HF_TOKEN = os.environ.get("HF_TOKEN")
# Dataset repository holding both the source archives and the extracted output.
REPO_ID = "factorstudios/Pipeline"
# Repo folder that is scanned for .rar / .rNN archives.
DATA_PATH = "Blenders"
# Repo folder that receives extracted files, one sub-folder per course.
# Note that it lives inside DATA_PATH.
EXTRACTED_PATH = "Blenders/extracted"
# Local scratch space for downloads and extraction directories.
TEMP_DIR = tempfile.gettempdir()

# === Setup Logging ===
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# NOTE: the ASGI `app` object and its routes are created once, further down,
# after `lifespan` is defined, so that the background worker is attached to it.
# === Get Course Name from Filename ===
def extract_course_name(filename: str) -> str:
    """Derive a course name from an archive filename.

    The last extension is removed first. Then a single trailing volume marker
    is stripped, case-insensitively: ``.partN``, ``.rN``, or a bare ``.N``.

    NOTE: because of the bare ``.N`` rule, a dotted version at the end of the
    name is also trimmed ("Blender 2.8.rar" -> "Blender 2").
    """
    stem, _extension = os.path.splitext(filename)
    volume_marker = re.compile(r'\.(part\d+|r\d+|\d+)$', re.IGNORECASE)
    return volume_marker.sub('', stem)
# === Download File from Dataset ===
def download_from_dataset(filename: str, repo_path: str) -> Optional[str]:
    """Download ``repo_path/filename`` from the dataset repo via ``hf_hub_download``.

    Args:
        filename: Name of the file inside ``repo_path``.
        repo_path: Folder in the dataset repo that contains the file. An empty
            string means the repo root.

    Returns:
        The local path returned by ``hf_hub_download`` (the cache lives under
        ``TEMP_DIR``), or None if the download failed. Failures are logged,
        not raised.

    NOTE(review): nothing in this function removes downloaded files from the
    cache under TEMP_DIR.
    """
    try:
        # Avoid a leading "/" (not a valid repo path) for files at the repo root.
        repo_file = f"{repo_path}/{filename}" if repo_path else filename
        logging.info(f"[*] Downloading from dataset: {repo_file}")
        local_path = hf_hub_download(
            repo_id=REPO_ID,
            filename=repo_file,
            repo_type="dataset",
            token=HF_TOKEN,
            cache_dir=TEMP_DIR,
        )
        logging.info(f"[βœ“] Downloaded: {filename}")
        return local_path
    except Exception as e:
        logging.error(f"[!] Download failed: {filename} β€” {e}")
        return None
# === Extract RAR File ===
def extract_rar(rar_path: str, extract_dir: str) -> bool:
    """Unpack every member of the archive at ``rar_path`` into ``extract_dir``.

    Returns True when extraction completes. Returns False, after logging the
    reason, when the optional ``rarfile`` package could not be imported or
    when opening/extracting the archive raises.
    """
    try:
        if rarfile is not None:
            archive_name = os.path.basename(rar_path)
            logging.info(f"[*] Extracting RAR: {archive_name}")
            with rarfile.RarFile(rar_path) as archive:
                archive.extractall(extract_dir)
            logging.info(f"[βœ“] Extracted to: {extract_dir}")
            return True
        # The import at the top of the file falls back to None when rarfile is absent.
        logging.error("[!] rarfile module not installed. Install with: pip install rarfile")
        return False
    except Exception as exc:
        logging.error(f"[!] RAR extraction failed: {rar_path} β€” {exc}")
        return False
# === Upload Directory Contents to Dataset ===
def upload_directory_to_dataset(local_dir: str, dataset_path: str) -> bool:
    """Upload every file below ``local_dir`` to ``dataset_path`` in the dataset repo.

    All files are pushed in ONE commit via ``upload_folder``, so a course is
    uploaded either completely or not at all.

    Why this matters: ``list_extracted_courses_in_dataset`` treats any file
    under a course folder as "already extracted". A half-finished file-by-file
    upload would therefore never be retried. One commit per file also made
    large courses very slow.

    Args:
        local_dir: Local directory whose contents are uploaded (recursively).
        dataset_path: Destination folder inside the dataset repo.

    Returns:
        True on success, including when ``local_dir`` holds no files, in which
        case nothing is uploaded. False if the upload raised; the error is
        logged.
    """
    try:
        # Repo paths always use "/", whatever the local OS separator is.
        relative_paths = [
            os.path.relpath(os.path.join(root, name), local_dir).replace("\\", "/")
            for root, _dirs, files in os.walk(local_dir)
            for name in files
        ]
        if not relative_paths:
            # Nothing to push; skip the Hub call instead of creating an empty commit.
            logging.info(f"[βœ“] Uploaded 0 files to {dataset_path}")
            return True

        # Local import keeps this block self-contained; huggingface_hub is
        # already a dependency of this module.
        from huggingface_hub import upload_folder

        # NOTE(review): upload_folder may skip VCS/cache folders such as `.git/`
        # by default; confirm this is acceptable for extracted course content.
        upload_folder(
            folder_path=local_dir,
            path_in_repo=dataset_path,
            repo_id=REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN,
            commit_message=f"Upload extracted files to {dataset_path}",
        )
        for relative_path in relative_paths:
            logging.info(f"[↑] Uploaded: {dataset_path}/{relative_path}")
        logging.info(f"[βœ“] Uploaded {len(relative_paths)} files to {dataset_path}")
        return True
    except Exception as e:
        logging.error(f"[!] Upload directory failed: {local_dir} β€” {e}")
        return False
# === List RAR Files in Dataset ===
# Old-style split-archive volumes: name.r00, name.r01, ...
_RAR_VOLUME_RE = re.compile(r'\.r\d{2}$', re.IGNORECASE)


def is_source_rar_path(repo_path: str, data_path: str, extracted_path: str) -> bool:
    """Return True if ``repo_path`` is an input RAR archive or volume.

    Three conditions must all hold:
    - The path is below ``data_path``, matched as a whole folder, so a sibling
      such as ``BlendersOld/`` or a root file ``Blenders.rar`` does not match.
    - The path is not below ``extracted_path``, which is the extractor's own
      output.
    - The path ends in ``.rar`` or an old-style ``.rNN`` volume suffix,
      case-insensitively.
    """
    if not repo_path.startswith(data_path.rstrip('/') + '/'):
        return False
    if repo_path.startswith(extracted_path.rstrip('/') + '/'):
        return False
    return repo_path.lower().endswith(".rar") or _RAR_VOLUME_RE.search(repo_path) is not None


def list_rar_files_in_dataset() -> list:
    """List repo paths of the RAR archives/volumes under DATA_PATH awaiting processing.

    Files under EXTRACTED_PATH are excluded. That folder is nested inside
    DATA_PATH and holds this service's own output, not its input.

    Returns:
        The matching repo paths, in the order the Hub listing returned them.
        An empty list if the listing failed; the error is logged.
    """
    try:
        logging.info(f"[*] Scanning dataset for RAR files in {DATA_PATH}")
        all_files = list_repo_files(
            repo_id=REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN,
        )
        rar_files = [
            f for f in all_files
            if is_source_rar_path(f, DATA_PATH, EXTRACTED_PATH)
        ]
        logging.info(f"[*] Found {len(rar_files)} RAR files")
        for rf in rar_files:
            logging.info(f" - {rf}")
        return rar_files
    except Exception as e:
        logging.error(f"[!] Failed to list files: {e}")
        return []
# === List Extracted Courses in Dataset ===
def list_extracted_courses_in_dataset() -> set:
    """Collect the names of course folders that already exist under EXTRACTED_PATH.

    Each repo file whose path starts with ``EXTRACTED_PATH/`` contributes the
    path component right after that prefix. Any error, such as the Hub
    listing failing, is logged and yields an empty set.
    """
    try:
        logging.info(f"[*] Scanning dataset for extracted courses in {EXTRACTED_PATH}")
        repo_files = list_repo_files(repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN)
        prefix = EXTRACTED_PATH + '/'
        # The text after the prefix, up to the next "/", is the course folder name.
        course_names = {
            path[len(prefix):].partition('/')[0]
            for path in repo_files
            if path.startswith(prefix)
        }
        logging.info(f"[*] Found {len(course_names)} extracted courses")
        for course in course_names:
            logging.info(f" - {course}")
        return course_names
    except Exception as exc:
        logging.error(f"[!] Failed to list extracted courses: {exc}")
        return set()
# === Extract and Upload RAR ===
async def extract_and_upload_rar(rar_file_path: str):
    """Download one RAR archive, extract it, and upload the contents to the dataset.

    The extracted files go to ``{EXTRACTED_PATH}/{course_name}``, where the
    course name comes from ``extract_course_name`` applied to the archive's
    filename.

    Args:
        rar_file_path: Path of the archive inside the dataset repo, as
            produced by ``list_rar_files_in_dataset`` (e.g.
            ``Blenders/course.rar``). A bare filename is looked up directly
            under ``DATA_PATH``.

    Returns:
        True if download, extraction and upload all succeeded, otherwise
        False. Errors are logged, not raised.
    """
    extract_dir = None
    try:
        filename = os.path.basename(rar_file_path)
        course_name = extract_course_name(filename)
        # Download from the folder the archive really lives in (it may be
        # nested below DATA_PATH), not always DATA_PATH/<basename>.
        repo_dir = rar_file_path.rpartition("/")[0] or DATA_PATH
        # mkdtemp gives a unique directory. A name derived from the current
        # second would collide when the worker and /extract-all start together.
        extract_dir = tempfile.mkdtemp(prefix="rar_extract_", dir=TEMP_DIR)
        logging.info(f"[*] Processing: {filename} (Course: {course_name})")

        # The helpers below do blocking network/disk I/O. Running them in the
        # default executor keeps the event loop (and the HTTP endpoints) responsive.
        loop = asyncio.get_running_loop()

        local_rar = await loop.run_in_executor(None, download_from_dataset, filename, repo_dir)
        if not local_rar:
            return False

        if not await loop.run_in_executor(None, extract_rar, local_rar, extract_dir):
            return False

        # Upload to dataset under <EXTRACTED_PATH>/<course_name>
        remote_path = f"{EXTRACTED_PATH}/{course_name}"
        if not await loop.run_in_executor(None, upload_directory_to_dataset, extract_dir, remote_path):
            return False

        logging.info(f"[βœ“] Completed: {course_name}")
        return True
    except Exception as e:
        logging.error(f"[!] Error processing {rar_file_path}: {e}")
        return False
    finally:
        # Always remove the scratch directory, including when the download
        # fails (the original code leaked it in that case).
        if extract_dir is not None:
            shutil.rmtree(extract_dir, ignore_errors=True)
# === Background Worker ===
async def rar_processor_worker():
    """Poll the dataset forever and extract every archive whose course is not extracted yet.

    Each pass works as follows:
    - List the archives under DATA_PATH and the course folders under
      EXTRACTED_PATH.
    - Process every archive whose course name has no folder yet, pausing 5 s
      between files.
    - Sleep 60 s before the next pass.

    Errors are logged and the loop resumes after a 60 s back-off. Task
    cancellation is always propagated so application shutdown can stop the
    worker.
    """
    logging.info("πŸš€ RAR Processor started")
    loop = asyncio.get_running_loop()
    while True:
        try:
            logging.info("[*] Scanning for RAR files...")
            # Both listings make blocking Hub API calls; keep them off the event loop.
            rar_files = await loop.run_in_executor(None, list_rar_files_in_dataset)
            extracted_courses = await loop.run_in_executor(None, list_extracted_courses_in_dataset)

            untouched_rar_files = [
                rar_file for rar_file in rar_files
                if extract_course_name(os.path.basename(rar_file)) not in extracted_courses
            ]

            if untouched_rar_files:
                logging.info(f"[*] Found {len(untouched_rar_files)} untouched RAR files.")
                for rar_file in untouched_rar_files:
                    logging.info(f" - {rar_file}")
                    await extract_and_upload_rar(rar_file)
                    await asyncio.sleep(5)  # Delay between files
            else:
                logging.info("[*] No untouched RAR files found, waiting...")

            # Wait 60 seconds before next scan
            await asyncio.sleep(60)
        except asyncio.CancelledError:
            # Explicit so shutdown works on every Python version (CancelledError
            # was an Exception subclass before 3.8 and would have been swallowed).
            raise
        except Exception as e:
            logging.error(f"[!] Worker error: {e}")
            await asyncio.sleep(60)
# === FastAPI Lifespan ===
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the background RAR worker for as long as the application is up.

    On startup the worker is scheduled as an asyncio task. On shutdown, even
    if shutdown is triggered by an error, the task is cancelled and then
    awaited, so it has finished before the event loop closes.
    """
    logging.info("πŸš€ Starting RAR Extractor FastAPI server...")
    task = asyncio.create_task(rar_processor_worker())
    try:
        yield
    finally:
        task.cancel()
        try:
            # Wait for the cancellation to be processed instead of leaving a
            # pending task behind when the loop shuts down.
            await task
        except asyncio.CancelledError:
            pass
        logging.info("πŸ›‘ Shutting down RAR Extractor.")
# === Application ===
# The application object served by the ASGI server. `lifespan` starts the
# background RAR worker at startup and cancels it at shutdown.
app = FastAPI(lifespan=lifespan)


# === API Endpoints ===
@app.get("/")
def root():
    """Return a constant status banner."""
    banner = "RAR Extractor running"
    return {"status": banner}


@app.get("/health")
def health():
    """Return a constant healthy flag for health probes."""
    return dict(healthy=True)
@app.get("/scan")
def scan_rars():
    """Report RAR/course counts and list the RAR files that still need extraction.

    This only lists files. Nothing is downloaded, extracted or uploaded.
    """
    rar_files = list_rar_files_in_dataset()
    extracted_courses = list_extracted_courses_in_dataset()
    # A RAR is pending when its derived course name has no extracted folder yet.
    pending = [
        path for path in rar_files
        if extract_course_name(os.path.basename(path)) not in extracted_courses
    ]
    return {
        "total_rar_files": len(rar_files),
        "extracted_courses": len(extracted_courses),
        "untouched_rar_files": pending,
    }
@app.post("/extract-all")
async def extract_all():
    """Manually trigger extraction of all untouched RAR files"""
    # Returns {"message": ...} when nothing is pending; otherwise
    # {"processed": n, "results": [{"file": path, "success": bool}, ...]}.
    # NOTE(review): nothing coordinates this endpoint with the background
    # worker, so the same archive can be processed twice at the same time.
    loop = asyncio.get_running_loop()
    # The listing helpers make blocking Hub API calls. Running them inline in
    # this async endpoint would stall the whole event loop.
    rar_files = await loop.run_in_executor(None, list_rar_files_in_dataset)
    extracted_courses = await loop.run_in_executor(None, list_extracted_courses_in_dataset)

    untouched_rar_files = [
        rar_file for rar_file in rar_files
        if extract_course_name(os.path.basename(rar_file)) not in extracted_courses
    ]
    if not untouched_rar_files:
        return {"message": "No untouched RAR files found to extract"}

    results = []
    for rar_file in untouched_rar_files:
        success = await extract_and_upload_rar(rar_file)
        results.append({"file": rar_file, "success": success})
        await asyncio.sleep(5)  # Delay between files
    return {"processed": len(results), "results": results}