File size: 3,898 Bytes
2489488
 
 
 
 
 
 
 
de79149
2489488
 
 
1c5e4e8
2489488
de79149
 
2489488
 
 
 
01c2d0a
2489488
 
01c2d0a
 
2489488
01c2d0a
 
 
 
 
2489488
 
 
1c5e4e8
2489488
01c2d0a
2489488
 
01c2d0a
 
 
 
 
 
 
2489488
01c2d0a
 
2489488
 
 
 
 
 
01c2d0a
2489488
01c2d0a
2489488
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
01c2d0a
2489488
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import os
import io
import json
import time
import logging
import threading
import requests
import redis
from dotenv import load_dotenv
from PIL import Image
from rembg import remove, new_session
from fastapi import FastAPI
from .config import MODEL_NAME, PREFIX

load_dotenv()

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

REDIS_URL = os.getenv("REDIS_URL")
WEB_APP_URL = os.getenv("WEB_APP_URL")

if not REDIS_URL:
    logger.error("REDIS_URL is not set. Worker will fail to start.")
    exit(1)

if not WEB_APP_URL:
    logger.error("WEB_APP_URL is not set. Worker will fail to upload.")
    exit(1)

WORKER_SECRET = os.getenv("WORKER_SECRET")

r = redis.Redis.from_url(REDIS_URL, decode_responses=True)

session = new_session(MODEL_NAME)

app = FastAPI(title="NoBG Worker")


def upload_to_uploadthing(image_bytes: bytes, filename: str) -> str:
    url = f"{WEB_APP_URL}/api/worker/upload"
    
    files = {"file": (filename, image_bytes, "image/png")}
    headers = {"Authorization": f"Bearer {WORKER_SECRET}"} if WORKER_SECRET else {}
    
    response = requests.post(url, files=files, headers=headers)

    if response.status_code != 200:
        logger.error(f"Worker Upload API error: {response.status_code} - {response.text}")
        response.raise_for_status()

    data = response.json()
    blob_url = data.get("url")

    if not blob_url:
        raise ValueError("Worker Upload API did not return a URL")

    logger.info(f"Successfully uploaded to Uploadthing: {blob_url}")
    return blob_url


def process_job(job_data_str: str):
    try:
        job = json.loads(job_data_str)
        job_id = job["id"]
        source_url = job["url"]
        original_filename = job.get("filename", "image.png")

        logger.info(f"Processing job {job_id} from {source_url}")

        response = requests.get(source_url)
        response.raise_for_status()

        r.hset(f"{PREFIX}:job_status:{job_id}", mapping={"status": "processing"})

        input_image = Image.open(io.BytesIO(response.content))
        output_image = remove(input_image, session=session)

        img_byte_arr = io.BytesIO()
        output_image.save(img_byte_arr, format="PNG")
        img_bytes = img_byte_arr.getvalue()

        name_parts = original_filename.rsplit(".", 1)
        if len(name_parts) == 2:
            base_name, ext = name_parts
            filename = f"{base_name}-nobg.{ext}"
        else:
            filename = f"{original_filename}-nobg.png"

        result_url = upload_to_uploadthing(img_bytes, filename)

        r.hset(
            f"{PREFIX}:job_status:{job_id}",
            mapping={"status": "completed", "result_url": result_url},
        )
        r.expire(f"{PREFIX}:job_status:{job_id}", 3600)

        logger.info(f"Job {job_id} completed successfully. URL: {result_url}")

    except Exception as e:
        logger.error(f"Error processing job: {str(e)}")
        if "job_id" in locals():
            r.hset(f"{PREFIX}:job_status:{job_id}", mapping={"status": "failed"})
            r.expire(f"{PREFIX}:job_status:{job_id}", 3600)


def worker_loop():
    logger.info(f"Starting Redis worker loop... Listening on queue: {PREFIX}:job_queue")
    while True:
        try:
            result = r.brpop(f"{PREFIX}:job_queue", timeout=0)
            if result:
                _, job_data_str = result
                process_job(job_data_str)
        except Exception as e:
            logger.error(f"Redis connection/worker loop error: {str(e)}")
            time.sleep(5)


@app.on_event("startup")
def startup_event():
    thread = threading.Thread(target=worker_loop, daemon=True)
    thread.start()


@app.get("/")
def health_check():
    return {"status": "Ok", "message": "NoBG Worker is Running"}


if __name__ == "__main__":
    import uvicorn

    port = int(os.getenv("PORT", 8000))
    uvicorn.run(app, host="0.0.0.0", port=port)