Spaces:
Running
Running
| import os | |
| import io | |
| import json | |
| import time | |
| import logging | |
| import threading | |
| import requests | |
| import redis | |
| from dotenv import load_dotenv | |
| from PIL import Image | |
| from rembg import remove, new_session | |
| from fastapi import FastAPI | |
| from .config import MODEL_NAME, PREFIX | |
| load_dotenv() | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| REDIS_URL = os.getenv("REDIS_URL") | |
| WEB_APP_URL = os.getenv("WEB_APP_URL") | |
| if not REDIS_URL: | |
| logger.error("REDIS_URL is not set. Worker will fail to start.") | |
| exit(1) | |
| if not WEB_APP_URL: | |
| logger.error("WEB_APP_URL is not set. Worker will fail to upload.") | |
| exit(1) | |
| WORKER_SECRET = os.getenv("WORKER_SECRET") | |
| r = redis.Redis.from_url(REDIS_URL, decode_responses=True) | |
| session = new_session(MODEL_NAME) | |
| app = FastAPI(title="NoBG Worker") | |
| def upload_to_uploadthing(image_bytes: bytes, filename: str) -> str: | |
| url = f"{WEB_APP_URL}/api/worker/upload" | |
| files = {"file": (filename, image_bytes, "image/png")} | |
| headers = {"Authorization": f"Bearer {WORKER_SECRET}"} if WORKER_SECRET else {} | |
| response = requests.post(url, files=files, headers=headers) | |
| if response.status_code != 200: | |
| logger.error(f"Worker Upload API error: {response.status_code} - {response.text}") | |
| response.raise_for_status() | |
| data = response.json() | |
| blob_url = data.get("url") | |
| if not blob_url: | |
| raise ValueError("Worker Upload API did not return a URL") | |
| logger.info(f"Successfully uploaded to Uploadthing: {blob_url}") | |
| return blob_url | |
| def process_job(job_data_str: str): | |
| try: | |
| job = json.loads(job_data_str) | |
| job_id = job["id"] | |
| source_url = job["url"] | |
| original_filename = job.get("filename", "image.png") | |
| logger.info(f"Processing job {job_id} from {source_url}") | |
| response = requests.get(source_url) | |
| response.raise_for_status() | |
| r.hset(f"{PREFIX}:job_status:{job_id}", mapping={"status": "processing"}) | |
| input_image = Image.open(io.BytesIO(response.content)) | |
| output_image = remove(input_image, session=session) | |
| img_byte_arr = io.BytesIO() | |
| output_image.save(img_byte_arr, format="PNG") | |
| img_bytes = img_byte_arr.getvalue() | |
| name_parts = original_filename.rsplit(".", 1) | |
| if len(name_parts) == 2: | |
| base_name, ext = name_parts | |
| filename = f"{base_name}-nobg.{ext}" | |
| else: | |
| filename = f"{original_filename}-nobg.png" | |
| result_url = upload_to_uploadthing(img_bytes, filename) | |
| r.hset( | |
| f"{PREFIX}:job_status:{job_id}", | |
| mapping={"status": "completed", "result_url": result_url}, | |
| ) | |
| r.expire(f"{PREFIX}:job_status:{job_id}", 3600) | |
| logger.info(f"Job {job_id} completed successfully. URL: {result_url}") | |
| except Exception as e: | |
| logger.error(f"Error processing job: {str(e)}") | |
| if "job_id" in locals(): | |
| r.hset(f"{PREFIX}:job_status:{job_id}", mapping={"status": "failed"}) | |
| r.expire(f"{PREFIX}:job_status:{job_id}", 3600) | |
| def worker_loop(): | |
| logger.info(f"Starting Redis worker loop... Listening on queue: {PREFIX}:job_queue") | |
| while True: | |
| try: | |
| result = r.brpop(f"{PREFIX}:job_queue", timeout=0) | |
| if result: | |
| _, job_data_str = result | |
| process_job(job_data_str) | |
| except Exception as e: | |
| logger.error(f"Redis connection/worker loop error: {str(e)}") | |
| time.sleep(5) | |
| def startup_event(): | |
| thread = threading.Thread(target=worker_loop, daemon=True) | |
| thread.start() | |
| def health_check(): | |
| return {"status": "Ok", "message": "NoBG Worker is Running"} | |
| if __name__ == "__main__": | |
| import uvicorn | |
| port = int(os.getenv("PORT", 8000)) | |
| uvicorn.run(app, host="0.0.0.0", port=port) | |