# Zeup / main.py
# Last commit: "Update main.py" by factorstudios (88d0066, verified)
# File size: 5.17 kB
import os
import time
import requests
import zipfile
import shutil
from urllib.parse import urlparse
from huggingface_hub import upload_file
from fastapi import FastAPI
from contextlib import asynccontextmanager
import asyncio
import logging
# === CONFIGURATION ===
# Access token for the upload calls, read from the environment (None when unset).
HF_TOKEN = os.getenv("HF_TOKEN")
# Dataset repository that receives the uploads, and the folder inside it.
REPO_ID = "factorstudios/Pipeline"
DATA_PATH = "Blenders"
# Local scratch directory for downloaded files and extracted archives.
OUTPUT_DIR = "batch_downloads"
# Direct links handled one after another by the background worker.
# NOTE(review): the link carries a download_token query parameter; presumably
# it expires, so this list likely needs refreshing before each run -- confirm.
DOWNLOAD_URLS = [
    "https://ww3.zeroupload.xyz/9af645304686f205f714cdb6ff5a22e9/Yansculpts__SculptingInBlenderForBeginners_DownloadPirate.com.rar?download_token=0125595e259d499ff61ed576411c06db3e00571210d33569a11fc7a49a78817b",
]
# Pause, in seconds, taken before every download.
DELAY_BETWEEN_DOWNLOADS = 12

# === Setup Logging ===
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)

# === Prepare output folder ===
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Application object that the route decorators in this file register on.
app = FastAPI()
# === DUMMY ROUTE TO KEEP SERVER HEALTHY ===
@app.get("/")
def keep_alive():
    # Fixed payload; the handler inspects no state, it only proves the
    # server answers requests.
    payload = {"status": "running"}
    return payload
# === Upload Function ===
def upload_to_dataset(filepath):
    """Upload one local file into the ``DATA_PATH`` folder of the dataset repo.

    The file is stored under its base name; the local directory part of
    *filepath* is not reproduced in the repository. Errors are logged instead
    of raised, so a failed upload never interrupts the caller.

    Args:
        filepath: Path of the local file to upload.

    Returns:
        True when the upload succeeded, False when it failed. Callers that
        ignore the return value behave exactly as before.
    """
    try:
        upload_file(
            path_or_fileobj=filepath,
            path_in_repo=f"{DATA_PATH}/{os.path.basename(filepath)}",
            repo_id=REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN,
        )
    except Exception as e:
        # Best-effort by design: report the failure and let the caller decide.
        logging.error(f"[!] Upload failed: {filepath} — {e}")
        return False
    logging.info(f"[↑] Uploaded: {filepath}")
    return True
# === Upload Directory Contents ===
def upload_directory_contents(directory):
    """Upload every file below *directory* into ``DATA_PATH`` of the dataset repo.

    Each file keeps its path relative to *directory*. Files are handled
    independently: a failed upload is logged and the walk continues, so one
    bad file no longer causes all remaining files to be skipped.

    Args:
        directory: Root of the local tree to upload.

    Returns:
        True when every file was uploaded (also for an empty or missing
        directory), False when at least one upload failed.
    """
    all_uploaded = True
    for root, _dirs, files in os.walk(directory):
        for file in files:
            filepath = os.path.join(root, file)
            # relpath uses the OS separator; normalise to "/" so a Windows run
            # does not produce backslashes inside the repository path.
            relative_path = os.path.relpath(filepath, directory).replace(os.sep, "/")
            try:
                upload_file(
                    path_or_fileobj=filepath,
                    path_in_repo=f"{DATA_PATH}/{relative_path}",
                    repo_id=REPO_ID,
                    repo_type="dataset",
                    token=HF_TOKEN,
                )
            except Exception as e:
                # Best-effort by design: record the failure, keep going.
                all_uploaded = False
                logging.error(f"[!] Upload failed: {relative_path} — {e}")
            else:
                logging.info(f"[↑] Uploaded: {relative_path}")
    return all_uploaded
# === Background Worker ===
# (connect, read) timeouts in seconds for the download request, so a server
# that stops responding cannot hang the worker forever.
_REQUEST_TIMEOUT = (10, 60)


def _filename_from_url(url):
    """Return the last path segment of *url*, or a timestamped fallback name."""
    filename = os.path.basename(urlparse(url).path)
    if not filename or "." not in filename:
        filename = "downloaded_file_" + str(int(time.time()))
    return filename


def _download_to_file(url, local_path):
    """Stream *url* into *local_path*; raises on connection or HTTP errors."""
    with requests.get(url, stream=True, timeout=_REQUEST_TIMEOUT) as r:
        r.raise_for_status()
        with open(local_path, "wb") as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)


def _process_download(url):
    """Download one URL, upload its contents, and always remove local copies.

    Zip archives are extracted and uploaded file by file; anything else is
    uploaded as-is. Blocking by design: meant to run in a worker thread.
    """
    logging.info(f"[*] Downloading from: {url}")
    filename = _filename_from_url(url)
    local_path = os.path.join(OUTPUT_DIR, filename)
    logging.info(f"[*] Saving to: {local_path}")

    extract_dir = None
    try:
        _download_to_file(url, local_path)
        logging.info(f"[✓] Downloaded: {filename}")

        if filename.lower().endswith('.zip'):
            logging.info(f"[*] Extracting zip file: {filename}")
            extract_dir = os.path.join(OUTPUT_DIR, os.path.splitext(filename)[0])
            os.makedirs(extract_dir, exist_ok=True)
            try:
                with zipfile.ZipFile(local_path, 'r') as zip_ref:
                    zip_ref.extractall(extract_dir)
            except zipfile.BadZipFile:
                logging.error(f"[!] Invalid zip file: {filename}")
                return
            logging.info(f"[✓] Extracted to: {extract_dir}")
            upload_directory_contents(extract_dir)
        else:
            # If not a zip, upload directly.
            upload_to_dataset(local_path)
    finally:
        # Runs on success and on every failure path, so partial downloads and
        # half-extracted archives never accumulate in OUTPUT_DIR.
        if extract_dir is not None:
            shutil.rmtree(extract_dir, ignore_errors=True)
        if os.path.exists(local_path):
            os.remove(local_path)
        if extract_dir is not None:
            logging.info("[✓] Cleaned up extracted files and zip")


async def downloader_worker():
    """Process every entry of ``DOWNLOAD_URLS`` in order, then return.

    Before each download the worker sleeps ``DELAY_BETWEEN_DOWNLOADS`` seconds.
    The blocking download/extract/upload work runs in the event loop's default
    executor, so the loop stays free to serve HTTP requests meanwhile. A
    failure on one URL is logged and the worker moves on to the next.
    """
    loop = asyncio.get_running_loop()
    for direct_download_link in DOWNLOAD_URLS:
        logging.info("[*] Waiting before next download...")
        await asyncio.sleep(DELAY_BETWEEN_DOWNLOADS)
        try:
            await loop.run_in_executor(None, _process_download, direct_download_link)
        except asyncio.CancelledError:
            # Let cancellation propagate rather than treating it as a failed
            # download (CancelledError is an Exception subclass on Python 3.7).
            raise
        except Exception as e:
            logging.error(f"[!] Error with {direct_download_link}: {e}")
    logging.info("✅ All files processed.")
@app.get("/")
def stay_alive():
    # NOTE(review): keep_alive earlier in this file is also registered on "/";
    # presumably only one of the two handlers is ever served -- confirm and
    # drop the duplicate.
    response = {"msg": "Running"}
    return response
@app.get("/health")
def healthcheck():
    # Health probe: always reports healthy; no state is inspected.
    return dict(healthy=True)
# === FastAPI Lifespan ===
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the download worker in the background while the app is serving.

    Startup schedules ``downloader_worker()`` as a task. Shutdown cancels the
    task and waits for the cancellation to complete, so the worker has fully
    unwound before the shutdown message is logged.

    Args:
        app: The application being started; not used by this handler.
    """
    logging.info("🚀 Starting FastAPI download-uploader microservice...")
    task = asyncio.create_task(downloader_worker())
    try:
        yield
    finally:
        # cancel() only requests cancellation; awaiting the task is what lets
        # it actually finish. A worker that already completed returns at once.
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        logging.info("🛑 Shutting down microservice.")
# === FastAPI App ===
# Attach the lifespan handler to the existing ``app`` rather than replacing it
# with a new FastAPI instance: the route decorators earlier in this file
# registered their handlers on that object, and a fresh instance would start
# with none of them ("/" and "/health" would stop being served).
app.router.lifespan_context = lifespan