# object-memory/core/processing.py
import numpy as np
import cv2
from PIL import Image
import clip
import torch
from .models import get_sam_predictor, get_clip, get_device, get_groundingdino_model, get_dinov2_large
from torchvision.ops import box_convert
from groundingdino.datasets.transforms import Compose, RandomResize, ToTensor, Normalize
from groundingdino.util.inference import predict as dino_predict
from torchvision import transforms
from transformers import pipeline
# Initialize an image-captioning pipeline once at import time.
captioner = pipeline("image-to-text", model="nlpconnect/vit-gpt2-image-captioning")

resize_transform = transforms.Compose([
    transforms.Resize(512, interpolation=transforms.InterpolationMode.BICUBIC)
])

def get_dinov2_transform():
    """Get the DINOv2 preprocessing transform (resize, center-crop, normalize)."""
    transform = transforms.Compose([
        transforms.Resize(256, interpolation=transforms.InterpolationMode.BICUBIC),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    return transform

def get_dino_boxes_from_prompt(image_np: np.ndarray, prompt: str, box_threshold=0.3, text_threshold=0.25) -> np.ndarray:
    """Run GroundingDINO on an image with a text prompt and return detected boxes in absolute xyxy pixel coordinates."""
    pil = Image.fromarray(image_np)
    h, w = pil.height, pil.width
    transform = Compose([
        RandomResize([800], max_size=1333),
        ToTensor(),
        Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
    img_t, _ = transform(pil, None)  # tensor [C, H, W]
    img_t = img_t.to(get_device())   # move to GPU if available
    # Run GroundingDINO's predict API; it tokenizes the caption, runs the forward pass,
    # and post-processes the raw outputs into boxes, scores, and matched phrases.
    boxes, scores, phrases = dino_predict(
        model=get_groundingdino_model(),
        image=img_t,
        caption=prompt,
        box_threshold=box_threshold,
        text_threshold=text_threshold,
        device=get_device()
    )
    if boxes.numel() == 0:
        return np.empty((0, 4))  # no detections
    # Convert normalized cxcywh boxes (values in [0, 1]) to absolute xyxy pixel coordinates.
    boxes_abs = boxes * torch.tensor([w, h, w, h], device=boxes.device)
    xyxy = box_convert(boxes=boxes_abs, in_fmt="cxcywh", out_fmt="xyxy")
    sam_boxes = xyxy.cpu().numpy()  # shape [N, 4] in pixel coords
    return sam_boxes
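
# Example usage (a sketch; `frame` is a hypothetical RGB uint8 array supplied by the caller):
#
#     boxes = get_dino_boxes_from_prompt(frame, "a red mug", box_threshold=0.3)
#     # boxes has shape (N, 4) in xyxy pixel coordinates, or (0, 4) if nothing matched.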

def get_sam_mask(image_np: np.ndarray, point_coords: np.ndarray, point_labels: np.ndarray, sam_boxes: np.ndarray):
    """Run SAM with the given point/box prompts and return a 2D boolean mask plus its bounding box."""
    pred = get_sam_predictor()
    pred.set_image(image_np)
    masks, scores, _ = pred.predict(
        point_coords=point_coords,
        point_labels=point_labels,
        multimask_output=False,
        box=sam_boxes
    )
    idx = int(np.argmax(scores))
    selected_mask = masks[idx]  # boolean mask
    # Derive the bounding box from the mask.
    if selected_mask is None or selected_mask.size == 0:
        bbox = {"x": 0, "y": 0, "width": 0, "height": 0}
        mask_2d = selected_mask
    else:
        # Remove singleton dimensions if present.
        mask_2d = np.squeeze(selected_mask)
        if mask_2d.ndim != 2:
            raise ValueError(f"Expected 2D mask, got shape {mask_2d.shape}")
        ys, xs = np.where(mask_2d)
        if len(xs) == 0 or len(ys) == 0:
            # Fallback in case the mask is empty.
            bbox = {"x": 0, "y": 0, "width": 0, "height": 0}
        else:
            x_min, x_max = int(xs.min()), int(xs.max())
            y_min, y_max = int(ys.min()), int(ys.max())
            bbox = {
                "x": x_min,
                "y": y_min,
                "width": x_max - x_min,
                "height": y_max - y_min
            }
    return mask_2d, bbox
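
# Example usage (a sketch; `frame` and the click location are hypothetical):
#
#     point = np.array([[320, 240]])   # one prompt point, (x, y)
#     label = np.array([1])            # 1 = foreground, 0 = background
#     mask, bbox = get_sam_mask(frame, point, label, sam_boxes=None)
#     # mask is a 2D boolean array; bbox is {"x", "y", "width", "height"} in pixels.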

def embed_image_dino_large(image_np: np.ndarray) -> np.ndarray:
    """
    Embed an image using the DINOv2 large model.

    Args:
        image_np: numpy array representing the image.

    Returns:
        np.ndarray: normalized 1024-dimensional embedding.
    """
    model = get_dinov2_large()
    transform = get_dinov2_transform()
    device = get_device()
    # Convert the numpy array to a PIL image and apply the preprocessing transform.
    pil = Image.fromarray(image_np)
    inp = transform(pil).unsqueeze(0).to(device)
    # Extract features without tracking gradients.
    with torch.no_grad():
        feats = model(inp).cpu().numpy()[0]
    # L2-normalize the features.
    norm = np.linalg.norm(feats)
    return feats / norm if norm > 0 else feats
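
# Example usage (a sketch): because the embeddings are L2-normalized, a plain dot product
# gives cosine similarity. `crop_a` and `crop_b` are hypothetical image crops.
#
#     sim = float(np.dot(embed_image_dino_large(crop_a), embed_image_dino_large(crop_b)))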

def embed_image_clip(image_np: np.ndarray) -> np.ndarray:
    """Embed an image with CLIP and return the L2-normalized feature vector."""
    model, preprocess = get_clip()
    device = get_device()
    pil = Image.fromarray(image_np)
    inp = preprocess(pil).unsqueeze(0).to(device)
    with torch.no_grad():
        feats = model.encode_image(inp).cpu().numpy()[0]
    norm = np.linalg.norm(feats)
    return feats / norm if norm > 0 else feats

def embed_text(text: str) -> np.ndarray:
    """Embed a text query with CLIP and return the raw (unnormalized) feature vector."""
    model, preprocess = get_clip()
    device = get_device()
    tokens = clip.tokenize([text]).to(device)
    with torch.no_grad():
        text_features = model.encode_text(tokens)
    return text_features[0].cpu().numpy()
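
# Example usage (a sketch; `crop` is a hypothetical image array): text-image similarity
# as an explicit cosine, so it works whether or not the vectors are pre-normalized.
#
#     t = embed_text("a blue backpack")
#     v = embed_image_clip(crop)
#     score = float(np.dot(t, v) / (np.linalg.norm(t) * np.linalg.norm(v) + 1e-8))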

def generate_description_vllm(pil_image):
    """
    Generate a default caption for the image using the captioning model.
    Accepts either a PIL image or a numpy array (converted to PIL internally).
    """
    if isinstance(pil_image, np.ndarray):
        pil_image = Image.fromarray(pil_image)
    output = captioner(pil_image)
    return output[0]['generated_text']

def expand_coords_shape(point_coords, point_labels, box_count: int):
    """
    Expands point coordinates and labels to match the number of bounding boxes.

    Parameters:
        point_coords: Array of shape (P, 2) representing point coordinates.
        point_labels: Array of shape (P,) representing point labels.
        box_count: Number of bounding boxes to tile the coordinates and labels for.

    Returns:
        A tuple (expanded_point_coords, expanded_point_labels) where:
        - expanded_point_coords has shape (B, P, 2).
        - expanded_point_labels has shape (B, P).
    """
    # Add a batch dimension.
    point_coords = point_coords[None, ...]  # (1, P, 2)
    point_labels = point_labels[None, ...]  # (1, P)
    # Tile to match the number of bounding boxes.
    point_coords = np.tile(point_coords, (box_count, 1, 1))  # (B, P, 2)
    point_labels = np.tile(point_labels, (box_count, 1))     # (B, P)
    return point_coords, point_labels
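
# Example usage (a sketch): tiling two prompt points across three detected boxes.
#
#     pts = np.array([[100, 150], [200, 250]])   # (P, 2)
#     lbls = np.array([1, 1])                    # (P,)
#     pts_b, lbls_b = expand_coords_shape(pts, lbls, box_count=3)
#     # pts_b.shape == (3, 2, 2); lbls_b.shape == (3, 2)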