"""Download-and-upload microservice.

On startup a background task walks ``DOWNLOAD_URLS``: each file is downloaded
into ``OUTPUT_DIR``, ``.zip`` archives are extracted, and the result is
uploaded to the Hugging Face dataset repo ``REPO_ID`` under ``DATA_PATH``.
A FastAPI app exposes ``/`` and ``/health`` while the worker runs.
"""

import asyncio
import logging
import os
import shutil
import time
import zipfile
from contextlib import asynccontextmanager, suppress
from urllib.parse import urlparse

import requests
from fastapi import FastAPI
from huggingface_hub import upload_file

# === CONFIGURATION ===
HF_TOKEN = os.environ.get("HF_TOKEN")
REPO_ID = "factorstudios/Pipeline"
DATA_PATH = "Blenders"
OUTPUT_DIR = "batch_downloads"
DOWNLOAD_URLS = [
    "https://ww3.zeroupload.xyz/9af645304686f205f714cdb6ff5a22e9/Yansculpts__SculptingInBlenderForBeginners_DownloadPirate.com.rar?download_token=0125595e259d499ff61ed576411c06db3e00571210d33569a11fc7a49a78817b"
]
DELAY_BETWEEN_DOWNLOADS = 12  # seconds
# (connect, read) timeouts in seconds; without them a stalled server would
# hang the worker forever.
REQUEST_TIMEOUT = (10, 300)
DOWNLOAD_CHUNK_SIZE = 8192  # bytes per streamed chunk

# === Setup Logging ===
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

# === Prepare output folder ===
os.makedirs(OUTPUT_DIR, exist_ok=True)


# === Upload Function ===
def upload_to_dataset(filepath: str) -> None:
    """Upload a single file to ``REPO_ID`` under ``DATA_PATH``.

    The file keeps its base name in the repo. Failures are logged, not raised,
    so one bad upload does not stop the worker.
    """
    try:
        upload_file(
            path_or_fileobj=filepath,
            path_in_repo=f"{DATA_PATH}/{os.path.basename(filepath)}",
            repo_id=REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN,
        )
        logging.info(f"[↑] Uploaded: {filepath}")
    except Exception as e:
        logging.error(f"[!] Upload failed: {filepath} — {e}")


# === Upload Directory Contents ===
def upload_directory_contents(directory: str) -> None:
    """Upload every file below ``directory``, preserving its relative layout.

    The first failing upload aborts the remaining ones; the error is logged,
    not raised.
    """
    try:
        for root, _dirs, files in os.walk(directory):
            for file in files:
                filepath = os.path.join(root, file)
                relative_path = os.path.relpath(filepath, directory)
                # Repo paths always use "/", whatever the local separator is.
                repo_relative = relative_path.replace(os.sep, "/")
                upload_file(
                    path_or_fileobj=filepath,
                    path_in_repo=f"{DATA_PATH}/{repo_relative}",
                    repo_id=REPO_ID,
                    repo_type="dataset",
                    token=HF_TOKEN,
                )
                logging.info(f"[↑] Uploaded: {relative_path}")
    except Exception as e:
        logging.error(f"[!] Upload directory failed: {directory} — {e}")


def _download_file(url: str, local_path: str) -> None:
    """Stream ``url`` to ``local_path``; raises on HTTP errors or timeouts."""
    with requests.get(url, stream=True, timeout=REQUEST_TIMEOUT) as r:
        r.raise_for_status()
        with open(local_path, "wb") as f:
            for chunk in r.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
                if chunk:  # skip empty keep-alive chunks
                    f.write(chunk)


def _process_url(direct_download_link: str) -> None:
    """Download one URL, upload its contents, and remove the local copies.

    Blocking; meant to run in a worker thread. Download and filesystem errors
    propagate to the caller. Local files are removed whether or not the
    pipeline succeeds.
    """
    logging.info(f"[*] Downloading from: {direct_download_link}")

    filename = os.path.basename(urlparse(direct_download_link).path)
    if not filename or "." not in filename:
        filename = "downloaded_file_" + str(int(time.time()))
    local_path = os.path.join(OUTPUT_DIR, filename)
    extract_dir = None

    try:
        logging.info(f"[*] Saving to: {local_path}")
        _download_file(direct_download_link, local_path)
        logging.info(f"[✓] Downloaded: {filename}")

        if not filename.lower().endswith(".zip"):
            # Not a zip: upload the file as-is.
            upload_to_dataset(local_path)
            return

        logging.info(f"[*] Extracting zip file: {filename}")
        extract_dir = os.path.join(OUTPUT_DIR, os.path.splitext(filename)[0])
        os.makedirs(extract_dir, exist_ok=True)
        try:
            with zipfile.ZipFile(local_path, "r") as zip_ref:
                zip_ref.extractall(extract_dir)
        except zipfile.BadZipFile:
            logging.error(f"[!] Invalid zip file: {filename}")
            return
        logging.info(f"[✓] Extracted to: {extract_dir}")

        upload_directory_contents(extract_dir)

        # Explicit cleanup on the success path so removal errors surface.
        shutil.rmtree(extract_dir)
        os.remove(local_path)
        logging.info(f"[✓] Cleaned up extracted files and zip")
    finally:
        # Best-effort cleanup for every exit path, so partial downloads and
        # half-extracted archives do not pile up in OUTPUT_DIR.
        if extract_dir is not None:
            shutil.rmtree(extract_dir, ignore_errors=True)
        with suppress(FileNotFoundError):
            os.remove(local_path)


# === Background Worker ===
async def downloader_worker() -> None:
    """Process every entry of ``DOWNLOAD_URLS`` sequentially.

    Sleeps ``DELAY_BETWEEN_DOWNLOADS`` seconds before each download. The
    blocking download/extract/upload pipeline runs in the default executor so
    the event loop stays free to serve HTTP requests. A failure on one URL is
    logged and the loop moves on to the next.
    """
    loop = asyncio.get_running_loop()
    for direct_download_link in DOWNLOAD_URLS:
        logging.info("[*] Waiting before next download...")
        await asyncio.sleep(DELAY_BETWEEN_DOWNLOADS)
        try:
            await loop.run_in_executor(None, _process_url, direct_download_link)
        except Exception as e:
            logging.error(f"[!] Error with {direct_download_link}: {e}")

    logging.info("✅ All files processed.")


# === FastAPI Lifespan ===
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the download worker on startup and cancel it on shutdown."""
    logging.info("🚀 Starting FastAPI download-uploader microservice...")
    task = asyncio.create_task(downloader_worker())
    try:
        yield
    finally:
        task.cancel()
        # Await the task so the cancellation is actually delivered. A transfer
        # already running in the executor thread cannot be interrupted and
        # finishes on its own.
        with suppress(asyncio.CancelledError):
            await task
        logging.info("🛑 Shutting down microservice.")


# === FastAPI App ===
# Create the app exactly once, before any route is registered: rebinding `app`
# to a new FastAPI instance later would discard the routes added so far.
app = FastAPI(lifespan=lifespan)


# === DUMMY ROUTE TO KEEP SERVER HEALTHY ===
@app.get("/")
def keep_alive():
    """Root endpoint reporting that the service is running."""
    return {"status": "running"}


def stay_alive():
    """Alternative liveness payload.

    Kept as a plain function, not a route: "/" is already served by
    ``keep_alive`` and a second handler on the same path would never be
    reached.
    """
    return {"msg": "Running"}


@app.get("/health")
def healthcheck():
    """Health-check endpoint."""
    return {"healthy": True}