NoBG-Worker / src /main.py
github-actions[bot]
Deploy from GitHub Actions: ce809e15cbbd7c8c521c04ac27bb32f105625b4e
de79149
import os
import io
import json
import time
import logging
import threading
import requests
import redis
from dotenv import load_dotenv
from PIL import Image
from rembg import remove, new_session
from fastapi import FastAPI
from .config import MODEL_NAME, PREFIX
load_dotenv()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
REDIS_URL = os.getenv("REDIS_URL")
WEB_APP_URL = os.getenv("WEB_APP_URL")
if not REDIS_URL:
logger.error("REDIS_URL is not set. Worker will fail to start.")
exit(1)
if not WEB_APP_URL:
logger.error("WEB_APP_URL is not set. Worker will fail to upload.")
exit(1)
WORKER_SECRET = os.getenv("WORKER_SECRET")
r = redis.Redis.from_url(REDIS_URL, decode_responses=True)
session = new_session(MODEL_NAME)
app = FastAPI(title="NoBG Worker")
def upload_to_uploadthing(image_bytes: bytes, filename: str) -> str:
url = f"{WEB_APP_URL}/api/worker/upload"
files = {"file": (filename, image_bytes, "image/png")}
headers = {"Authorization": f"Bearer {WORKER_SECRET}"} if WORKER_SECRET else {}
response = requests.post(url, files=files, headers=headers)
if response.status_code != 200:
logger.error(f"Worker Upload API error: {response.status_code} - {response.text}")
response.raise_for_status()
data = response.json()
blob_url = data.get("url")
if not blob_url:
raise ValueError("Worker Upload API did not return a URL")
logger.info(f"Successfully uploaded to Uploadthing: {blob_url}")
return blob_url
def process_job(job_data_str: str):
try:
job = json.loads(job_data_str)
job_id = job["id"]
source_url = job["url"]
original_filename = job.get("filename", "image.png")
logger.info(f"Processing job {job_id} from {source_url}")
response = requests.get(source_url)
response.raise_for_status()
r.hset(f"{PREFIX}:job_status:{job_id}", mapping={"status": "processing"})
input_image = Image.open(io.BytesIO(response.content))
output_image = remove(input_image, session=session)
img_byte_arr = io.BytesIO()
output_image.save(img_byte_arr, format="PNG")
img_bytes = img_byte_arr.getvalue()
name_parts = original_filename.rsplit(".", 1)
if len(name_parts) == 2:
base_name, ext = name_parts
filename = f"{base_name}-nobg.{ext}"
else:
filename = f"{original_filename}-nobg.png"
result_url = upload_to_uploadthing(img_bytes, filename)
r.hset(
f"{PREFIX}:job_status:{job_id}",
mapping={"status": "completed", "result_url": result_url},
)
r.expire(f"{PREFIX}:job_status:{job_id}", 3600)
logger.info(f"Job {job_id} completed successfully. URL: {result_url}")
except Exception as e:
logger.error(f"Error processing job: {str(e)}")
if "job_id" in locals():
r.hset(f"{PREFIX}:job_status:{job_id}", mapping={"status": "failed"})
r.expire(f"{PREFIX}:job_status:{job_id}", 3600)
def worker_loop():
logger.info(f"Starting Redis worker loop... Listening on queue: {PREFIX}:job_queue")
while True:
try:
result = r.brpop(f"{PREFIX}:job_queue", timeout=0)
if result:
_, job_data_str = result
process_job(job_data_str)
except Exception as e:
logger.error(f"Redis connection/worker loop error: {str(e)}")
time.sleep(5)
@app.on_event("startup")
def startup_event():
thread = threading.Thread(target=worker_loop, daemon=True)
thread.start()
@app.get("/")
def health_check():
return {"status": "Ok", "message": "NoBG Worker is Running"}
if __name__ == "__main__":
import uvicorn
port = int(os.getenv("PORT", 8000))
uvicorn.run(app, host="0.0.0.0", port=port)