import contextlib, io, base64, torch from PIL import Image import open_clip from reparam import reparameterize_model class EndpointHandler: def __init__(self, path: str = ""): self.device = "cuda" if torch.cuda.is_available() else "cpu" # Fix 1: Load weights directly from the web, just like local script # This guarantees the weights are identical. model, _, self.preprocess = open_clip.create_model_and_transforms( "MobileCLIP-B", pretrained='datacompdr' ) model.eval() self.model = reparameterize_model(model) # fuse branches self.tokenizer = open_clip.get_tokenizer("MobileCLIP-B") self.model.to(self.device) # Fix 2: Explicitly set model to half-precision if on CUDA # This matches the behavior of torch.set_default_dtype(torch.float16) if self.device == "cuda": self.model.to(torch.float16) def __call__(self, data): payload = data.get("inputs", data) img_b64 = payload["image"] labels = payload.get("candidate_labels", []) if not labels: return {"error": "candidate_labels list is empty"} # ---------------- decode inputs ---------------- image = Image.open(io.BytesIO(base64.b64decode(img_b64))).convert("RGB") img_tensor = self.preprocess(image).unsqueeze(0).to(self.device) # The preprocessor might output float32, so ensure tensor matches model dtype if self.device == "cuda": img_tensor = img_tensor.to(torch.float16) text_tokens = self.tokenizer(labels).to(self.device) # ---------------- forward pass ----------------- # No need for autocast if everything is already float16 with torch.no_grad(): img_feat = self.model.encode_image(img_tensor) txt_feat = self.model.encode_text(text_tokens) img_feat /= img_feat.norm(dim=-1, keepdim=True) txt_feat /= txt_feat.norm(dim=-1, keepdim=True) probs = (100 * img_feat @ txt_feat.T).softmax(dim=-1)[0].cpu().tolist() return [ {"label": l, "score": float(p)} for l, p in sorted(zip(labels, probs), key=lambda x: x[1], reverse=True) ] # import contextlib, io, base64, torch # from PIL import Image # import open_clip # from reparam import reparameterize_model # class EndpointHandler: # def __init__(self, path: str = ""): # # You can also pass pretrained='datacompdr' to let OpenCLIP download # weights = f"{path}/mobileclip_b.pt" # self.model, _, self.preprocess = open_clip.create_model_and_transforms( # "MobileCLIP-B", pretrained=weights # ) # self.model.eval() # self.model = reparameterize_model(self.model) # *** fuse branches *** # self.device = "cuda" if torch.cuda.is_available() else "cpu" # self.model.to(self.device) # self.tokenizer = open_clip.get_tokenizer("MobileCLIP-B") # def __call__(self, data): # payload = data.get("inputs", data) # img_b64 = payload["image"] # labels = payload.get("candidate_labels", []) # if not labels: # return {"error": "candidate_labels list is empty"} # # ---------------- decode inputs ---------------- # image = Image.open(io.BytesIO(base64.b64decode(img_b64))).convert("RGB") # img_tensor = self.preprocess(image).unsqueeze(0).to(self.device) # text_tokens = self.tokenizer(labels).to(self.device) # # ---------------- forward pass ----------------- # autocast_ctx = ( # torch.cuda.amp.autocast if self.device.startswith("cuda") else contextlib.nullcontext # ) # with torch.no_grad(), autocast_ctx(): # img_feat = self.model.encode_image(img_tensor) # txt_feat = self.model.encode_text(text_tokens) # img_feat /= img_feat.norm(dim=-1, keepdim=True) # txt_feat /= txt_feat.norm(dim=-1, keepdim=True) # probs = (100 * img_feat @ txt_feat.T).softmax(dim=-1)[0].tolist() # return [ # {"label": l, "score": float(p)} # for l, p in sorted(zip(labels, probs), key=lambda x: x[1], reverse=True) # ]