Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import torch | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import networkx as nx | |
| from PIL import Image | |
| import torchvision.transforms as T | |
| from typing import Dict, List, Tuple, Any, Union, Optional | |
| import logging | |
| import warnings | |
| warnings.filterwarnings("ignore") | |
| # Import from your existing code | |
| from ultralytics import YOLO | |
| from math import isclose | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Set random seeds for reproducibility | |
| def set_seeds(seed=42): | |
| import random | |
| random.seed(seed) | |
| np.random.seed(seed) | |
| torch.manual_seed(seed) | |
| torch.cuda.manual_seed_all(seed) | |
| torch.backends.cudnn.deterministic = True | |
| torch.backends.cudnn.benchmark = False | |
| # Call this at the start | |
| set_seeds(42) | |
| # Configuration | |
| CONFIG = { | |
| "img_size": 512, | |
| "model": { | |
| "backbone": "resnet50", | |
| "embedding_dim": 512, | |
| "hidden_dim": 256, | |
| }, | |
| "yolo": { | |
| "model": "yolov8n.pt", # Using the smallest YOLOv8 model for speed | |
| "conf": 0.25, # Default confidence threshold | |
| "iou": 0.45, # Default IoU threshold for NMS | |
| }, | |
| } | |
| # Vocabulary class | |
| class Vocabulary: | |
| """Vocabulary for objects, attributes, and relationships in scene graphs.""" | |
| def __init__(self): | |
| # Initialize dictionaries for mapping between terms and IDs | |
| self.object2id = {"<unk>": 0} | |
| self.id2object = {0: "<unk>"} | |
| self.relationship2id = {"<unk>": 0} | |
| self.id2relationship = {0: "<unk>"} | |
| self.attribute2id = {"<unk>": 0} | |
| self.id2attribute = {0: "<unk>"} | |
| def get_object_id(self, obj_name: str) -> int: | |
| return self.object2id.get(obj_name, 0) # Return <unk> ID if not found | |
| def get_relationship_id(self, rel_name: str) -> int: | |
| return self.relationship2id.get(rel_name, 0) # Return <unk> ID if not found | |
| def get_attribute_id(self, attr_name: str) -> int: | |
| return self.attribute2id.get(attr_name, 0) # Return <unk> ID if not found | |
| def get_object_name(self, obj_id: int) -> str: | |
| return self.id2object.get(obj_id, "<unk>") | |
| def get_relationship_name(self, rel_id: int) -> str: | |
| return self.id2relationship.get(rel_id, "<unk>") | |
| def get_attribute_name(self, attr_id: int) -> str: | |
| return self.id2attribute.get(attr_id, "<unk>") | |
| def load(cls, path: str) -> "Vocabulary": | |
| """Load vocabulary from a JSON file.""" | |
| vocab = cls() | |
| with open(path, "r") as f: | |
| data = json.load(f) | |
| # Load objects | |
| vocab.object2id = data["objects"] | |
| vocab.id2object = { | |
| int(k): v for k, v in {v: k for k, v in vocab.object2id.items()}.items() | |
| } | |
| # Load relationships | |
| vocab.relationship2id = data["relationships"] | |
| vocab.id2relationship = { | |
| int(k): v | |
| for k, v in {v: k for k, v in vocab.relationship2id.items()}.items() | |
| } | |
| # Load attributes | |
| vocab.attribute2id = data["attributes"] | |
| vocab.id2attribute = { | |
| int(k): v for k, v in {v: k for k, v in vocab.attribute2id.items()}.items() | |
| } | |
| return vocab | |
| # Model Architecture | |
| class VisualFeatureEncoder(torch.nn.Module): | |
| """Visual feature encoder for scene graph generation.""" | |
| def __init__( | |
| self, | |
| backbone_name: str = "resnet50", | |
| pretrained: bool = False, | |
| ): | |
| super().__init__() | |
| self.backbone_name = backbone_name | |
| self.backbone, self.out_channels = self._get_backbone(backbone_name, pretrained) | |
| def _get_backbone( | |
| self, backbone_name: str, pretrained: bool | |
| ) -> Tuple[torch.nn.Module, int]: | |
| """Get backbone network and output channels.""" | |
| if backbone_name == "resnet50": | |
| from torchvision.models import resnet50 | |
| backbone = resnet50(pretrained=pretrained) | |
| # Remove the last FC layer | |
| backbone = torch.nn.Sequential(*list(backbone.children())[:-2]) | |
| out_channels = 2048 | |
| else: | |
| raise ValueError(f"Unsupported backbone: {backbone_name}") | |
| return backbone, out_channels | |
| def forward(self, x: torch.Tensor) -> torch.Tensor: | |
| """Extract features from images.""" | |
| return self.backbone(x) | |
| class RelationshipPredictor(torch.nn.Module): | |
| """Predicts relationships between object pairs.""" | |
| def __init__( | |
| self, | |
| num_obj_classes: int, | |
| num_rel_classes: int, | |
| obj_embed_dim: int = 256, | |
| rel_embed_dim: int = 256, | |
| hidden_dim: int = 512, | |
| dropout: float = 0.2, | |
| ): | |
| super().__init__() | |
| # Object embeddings | |
| self.obj_embedding = torch.nn.Embedding(num_obj_classes, obj_embed_dim) | |
| # Spatial feature extractor | |
| self.spatial_fc = torch.nn.Sequential( | |
| torch.nn.Linear(10, 64), # 10 = 5 (subject) + 5 (object) spatial features | |
| torch.nn.ReLU(), | |
| torch.nn.Dropout(dropout), | |
| torch.nn.Linear(64, 128), | |
| torch.nn.ReLU(), | |
| ) | |
| # Visual feature fusion | |
| self.visual_fusion = torch.nn.Sequential( | |
| torch.nn.Linear(obj_embed_dim * 2 + 128, hidden_dim), | |
| torch.nn.ReLU(), | |
| torch.nn.Dropout(dropout), | |
| torch.nn.Linear(hidden_dim, hidden_dim), | |
| torch.nn.ReLU(), | |
| ) | |
| # Relationship classifier | |
| self.rel_classifier = torch.nn.Linear(hidden_dim, num_rel_classes) | |
| def forward( | |
| self, | |
| obj_features: List[torch.Tensor], | |
| obj_boxes: List[torch.Tensor], | |
| obj_pairs: List[torch.Tensor], | |
| ) -> Dict[str, List[torch.Tensor]]: | |
| """Forward pass for relationship prediction.""" | |
| results = {} | |
| all_rel_logits = [] | |
| # Process each example in the batch | |
| for i, (feats, boxes, pairs) in enumerate( | |
| zip(obj_features, obj_boxes, obj_pairs) | |
| ): | |
| if len(pairs) == 0 or boxes.size(0) == 0: | |
| # No relationships to predict | |
| all_rel_logits.append(None) | |
| continue | |
| # Extract object classes from boxes | |
| obj_classes = boxes[:, 4].long() | |
| obj_embeds = self.obj_embedding(obj_classes) | |
| # Create pairs of object features | |
| subj_idx = pairs[:, 0].long() | |
| obj_idx = pairs[:, 1].long() | |
| subj_feats = obj_embeds[subj_idx] | |
| obj_feats = obj_embeds[obj_idx] | |
| # Spatial features | |
| subj_boxes = boxes[subj_idx, :4] # [x_c, y_c, w, h] | |
| obj_boxes = boxes[obj_idx, :4] # [x_c, y_c, w, h] | |
| # Compute relative spatial features | |
| delta_x = subj_boxes[:, 0] - obj_boxes[:, 0] | |
| delta_y = subj_boxes[:, 1] - obj_boxes[:, 1] | |
| # Concatenate spatial features | |
| spatial_feats = torch.cat( | |
| [subj_boxes, obj_boxes, delta_x.unsqueeze(1), delta_y.unsqueeze(1)], | |
| dim=1, | |
| ) | |
| spatial_feats = self.spatial_fc(spatial_feats) | |
| # Concatenate subject and object features | |
| subj_obj_feats = torch.cat([subj_feats, obj_feats, spatial_feats], dim=1) | |
| # Visual fusion | |
| fused_feats = self.visual_fusion(subj_obj_feats) | |
| # Predict relationships | |
| rel_logits = self.rel_classifier(fused_feats) | |
| all_rel_logits.append(rel_logits) | |
| results["rel_logits"] = all_rel_logits | |
| return results | |
| class SceneGraphGenerationModel(torch.nn.Module): | |
| """Complete scene graph generation model.""" | |
| def __init__( | |
| self, | |
| backbone: torch.nn.Module, | |
| num_obj_classes: int, | |
| num_rel_classes: int, | |
| num_attr_classes: int, | |
| roi_size: int = 7, | |
| embedding_dim: int = 512, | |
| hidden_dim: int = 256, | |
| dropout: float = 0.0, | |
| ): | |
| super().__init__() | |
| self.backbone = backbone | |
| self.num_obj_classes = num_obj_classes | |
| self.num_rel_classes = num_rel_classes | |
| # RoI pooling for object features | |
| self.roi_size = roi_size | |
| self.roi_pool = torch.nn.AdaptiveAvgPool2d((roi_size, roi_size)) | |
| # Object feature embedding | |
| self.obj_feature_embedding = torch.nn.Sequential( | |
| torch.nn.Linear(backbone.out_channels * roi_size * roi_size, embedding_dim), | |
| torch.nn.ReLU(), | |
| torch.nn.Dropout(dropout), | |
| ) | |
| # Object classifier | |
| self.obj_classifier = torch.nn.Linear(embedding_dim, num_obj_classes) | |
| # Attribute classifier | |
| self.attr_classifier = torch.nn.Linear(embedding_dim, num_attr_classes) | |
| # Bounding box regressor | |
| self.bbox_regressor = torch.nn.Linear(embedding_dim, 4) # [x_c, y_c, w, h] | |
| # Relationship predictor | |
| self.relationship_predictor = RelationshipPredictor( | |
| num_obj_classes=num_obj_classes, | |
| num_rel_classes=num_rel_classes, | |
| obj_embed_dim=embedding_dim, | |
| hidden_dim=hidden_dim, | |
| dropout=dropout, | |
| ) | |
| def extract_roi_features( | |
| self, | |
| features: torch.Tensor, # [batch_size, channels, height, width] | |
| boxes: List[ | |
| torch.Tensor | |
| ], # List of [num_boxes, 4] tensors with normalized boxes | |
| ) -> List[torch.Tensor]: | |
| """Extract RoI features for objects.""" | |
| batch_size = features.shape[0] | |
| roi_features = [] | |
| for i in range(batch_size): | |
| if len(boxes[i]) == 0: | |
| # No objects in this image | |
| roi_features.append( | |
| torch.empty( | |
| 0, | |
| self.backbone.out_channels * self.roi_size**2, | |
| device=features.device, | |
| ) | |
| ) | |
| continue | |
| # Convert normalized [x_c, y_c, w, h] to [x1, y1, x2, y2] | |
| bbox = boxes[i][:, :4] | |
| x_c, y_c, w, h = bbox[:, 0], bbox[:, 1], bbox[:, 2], bbox[:, 3] | |
| x1 = (x_c - w / 2) * features.shape[3] | |
| y1 = (y_c - h / 2) * features.shape[2] | |
| x2 = (x_c + w / 2) * features.shape[3] | |
| y2 = (y_c + h / 2) * features.shape[2] | |
| # Ensure boxes are within image | |
| x1 = torch.clamp(x1, 0, features.shape[3] - 1) | |
| y1 = torch.clamp(y1, 0, features.shape[2] - 1) | |
| x2 = torch.clamp(x2, 0, features.shape[3] - 1) | |
| y2 = torch.clamp(y2, 0, features.shape[2] - 1) | |
| # Create RoI boxes for torchvision's RoIPool | |
| rois = torch.stack([x1, y1, x2, y2], dim=1) | |
| # Extract features for each RoI | |
| obj_features = [] | |
| for roi in rois: | |
| x1, y1, x2, y2 = map(int, roi.cpu().numpy()) | |
| # Ensure valid box dimensions | |
| if x2 <= x1 or y2 <= y1: | |
| roi_feat = torch.zeros( | |
| self.backbone.out_channels, | |
| self.roi_size, | |
| self.roi_size, | |
| device=features.device, | |
| ) | |
| else: | |
| # Extract feature for this ROI | |
| roi_feat = self.roi_pool( | |
| features[i, :, y1:y2, x1:x2].unsqueeze(0) | |
| ).squeeze(0) | |
| # Flatten the feature | |
| roi_feat = roi_feat.view(-1) | |
| obj_features.append(roi_feat) | |
| if obj_features: | |
| obj_features = torch.stack(obj_features) | |
| else: | |
| obj_features = torch.empty( | |
| 0, | |
| self.backbone.out_channels * self.roi_size**2, | |
| device=features.device, | |
| ) | |
| roi_features.append(obj_features) | |
| return roi_features | |
| def forward( | |
| self, images: torch.Tensor, boxes: List[torch.Tensor] | |
| ) -> Dict[str, Any]: | |
| """Forward pass for scene graph generation.""" | |
| batch_size = images.shape[0] | |
| # Extract features from backbone | |
| features = self.backbone(images) | |
| # Extract RoI features | |
| roi_features = self.extract_roi_features(features, boxes) | |
| # Process each example in the batch | |
| obj_logits_list = [] | |
| attr_logits_list = [] | |
| bbox_pred_list = [] | |
| obj_features_list = [] | |
| for i in range(batch_size): | |
| if roi_features[i].shape[0] == 0: | |
| # No objects in this image | |
| obj_logits_list.append( | |
| torch.empty(0, self.num_obj_classes, device=images.device) | |
| ) | |
| attr_logits_list.append( | |
| torch.empty(0, self.num_attr_classes, device=images.device) | |
| ) | |
| bbox_pred_list.append(torch.empty(0, 4, device=images.device)) | |
| obj_features_list.append( | |
| torch.empty( | |
| 0, | |
| self.obj_feature_embedding[0].out_features, | |
| device=images.device, | |
| ) | |
| ) | |
| continue | |
| # Embed RoI features | |
| obj_feats = self.obj_feature_embedding(roi_features[i]) | |
| obj_features_list.append(obj_feats) | |
| # Predict object classes | |
| obj_logits = self.obj_classifier(obj_feats) | |
| obj_logits_list.append(obj_logits) | |
| # Predict attributes | |
| attr_logits = self.attr_classifier(obj_feats) | |
| attr_logits_list.append(attr_logits) | |
| # Regress bounding box refinements | |
| bbox_pred = self.bbox_regressor(obj_feats) | |
| bbox_pred_list.append(bbox_pred) | |
| # Create object pairs for relationship prediction | |
| obj_pairs = [] | |
| for i in range(batch_size): | |
| if boxes[i].shape[0] <= 1: | |
| # Need at least 2 objects for relationships | |
| obj_pairs.append(torch.empty(0, 2, device=images.device)) | |
| continue | |
| # Create all possible object pairs | |
| num_objs = boxes[i].shape[0] | |
| subj_idx = torch.arange(num_objs, device=images.device).repeat_interleave( | |
| num_objs | |
| ) | |
| obj_idx = torch.arange(num_objs, device=images.device).repeat(num_objs) | |
| # Exclude self-relationships | |
| mask = subj_idx != obj_idx | |
| pairs = torch.stack([subj_idx[mask], obj_idx[mask]], dim=1) | |
| obj_pairs.append(pairs) | |
| # Predict relationships | |
| rel_preds = self.relationship_predictor(obj_features_list, boxes, obj_pairs) | |
| return { | |
| "obj_logits": obj_logits_list, | |
| "attr_logits": attr_logits_list, | |
| "bbox_pred": bbox_pred_list, | |
| "rel_logits": rel_preds.get("rel_logits", []), | |
| "obj_pairs": obj_pairs, | |
| } | |
| # YOLO-based object detection | |
| def detect_objects_yolo( | |
| image_path: str, | |
| vocabulary: Vocabulary, | |
| device: torch.device, | |
| use_fixed_boxes: bool = False, | |
| ) -> torch.Tensor: | |
| """ | |
| Detect objects in an image using YOLOv8. | |
| Args: | |
| image_path: Path to the input image | |
| vocabulary: Vocabulary for mapping class names | |
| device: PyTorch device | |
| use_fixed_boxes: Whether to use fixed boxes or YOLO detection | |
| Returns: | |
| Bounding boxes in format [x_c, y_c, w, h, class_id] (normalized) | |
| """ | |
| # Load YOLOv8 model - will download if not present | |
| yolo_model = YOLO("app/models/yolov8n.pt") | |
| # Run inference | |
| results = yolo_model(image_path, conf=0.45) | |
| detections = results[0] | |
| # No detections | |
| if len(detections.boxes) == 0: | |
| return torch.zeros((0, 5), device=device, dtype=torch.float32) | |
| # Process detections | |
| boxes = [] | |
| # Get image dimensions | |
| img = Image.open(image_path) | |
| img_width, img_height = img.size | |
| # YOLO class names (COCO class names) | |
| yolo_class_names = yolo_model.names | |
| # Create class name mapping from YOLO to our vocabulary | |
| class_name_map = {} | |
| for yolo_id, yolo_name in yolo_class_names.items(): | |
| # Try direct mapping first | |
| if yolo_name in vocabulary.object2id: | |
| class_name_map[yolo_id] = vocabulary.get_object_id(yolo_name) | |
| # Try lowercase | |
| elif yolo_name.lower() in vocabulary.object2id: | |
| class_name_map[yolo_id] = vocabulary.get_object_id(yolo_name.lower()) | |
| # Fallback to <unk> | |
| else: | |
| class_name_map[yolo_id] = 0 # <unk> | |
| # Process each detection | |
| for i in range(len(detections.boxes)): | |
| box = detections.boxes[i] | |
| # Get class ID and confidence | |
| cls_id = int(box.cls.item()) | |
| confidence = box.conf.item() | |
| # Skip low-confidence detections | |
| if confidence < CONFIG["yolo"]["conf"]: | |
| continue | |
| # Get bounding box in xyxy format (unnormalized) | |
| x1, y1, x2, y2 = box.xyxy[0].cpu().numpy() | |
| # Convert to xywh format and normalize | |
| x_c = ((x1 + x2) / 2) / img_width | |
| y_c = ((y1 + y2) / 2) / img_height | |
| w = (x2 - x1) / img_width | |
| h = (y2 - y1) / img_height | |
| # Map class ID to vocabulary | |
| vocab_cls_id = class_name_map.get(cls_id, 0) # Default to <unk> if not found | |
| # Add to boxes | |
| boxes.append([x_c, y_c, w, h, vocab_cls_id]) | |
| # Convert to tensor with explicit float32 dtype | |
| if boxes: | |
| return torch.tensor(boxes, device=device, dtype=torch.float32) | |
| else: | |
| return torch.zeros((0, 5), device=device, dtype=torch.float32) | |
| # Visualization functions | |
| def visualize_image_with_boxes( | |
| image: np.ndarray, objects: List[Dict[str, Any]], output_path: str | |
| ) -> None: | |
| """Visualize image with bounding boxes and labels.""" | |
| # Create figure | |
| plt.figure(figsize=(10, 8)) | |
| # Display image | |
| plt.imshow(image) | |
| # Get image dimensions | |
| img_height, img_width = image.shape[:2] | |
| # Generate colors for classes | |
| num_classes = len(objects) | |
| colors = plt.cm.hsv(np.linspace(0, 1, num_classes)) | |
| # Draw bounding boxes and labels | |
| for i, obj in enumerate(objects): | |
| # Get bounding box | |
| x_c, y_c, w, h = obj["bbox"] | |
| # Scale to image size if normalized | |
| if max(x_c, y_c, w, h) <= 1.0: | |
| x_c *= img_width | |
| y_c *= img_height | |
| w *= img_width | |
| h *= img_height | |
| # Convert to (x1, y1, x2, y2) format | |
| x1 = x_c - w / 2 | |
| y1 = y_c - h / 2 | |
| x2 = x_c + w / 2 | |
| y2 = y_c + h / 2 | |
| # Draw bounding box | |
| rect = plt.Rectangle( | |
| (x1, y1), | |
| x2 - x1, | |
| y2 - y1, | |
| linewidth=2, | |
| edgecolor=colors[i % len(colors)], | |
| facecolor="none", | |
| ) | |
| plt.gca().add_patch(rect) | |
| # Draw label | |
| plt.text( | |
| x1, | |
| y1 - 5, | |
| f"{obj['label']} ({obj['score']:.2f})", | |
| color=colors[i % len(colors)], | |
| fontsize=10, | |
| bbox=dict(facecolor="white", alpha=0.7, edgecolor="none", pad=1), | |
| ) | |
| # Add a title | |
| plt.title("Object Detection") | |
| plt.axis("off") | |
| # Save the figure | |
| plt.tight_layout() | |
| plt.savefig(output_path, dpi=300, bbox_inches="tight") | |
| plt.close() | |
| logger.info(f"Annotated image saved to {output_path}") | |
| def visualize_graph( | |
| objects: List[Dict[str, Any]], relationships: List[Dict[str, Any]], output_path: str | |
| ) -> None: | |
| """Visualize relationship graph.""" | |
| # Create figure | |
| plt.figure(figsize=(10, 8)) | |
| # Create graph | |
| G = nx.DiGraph() | |
| # Add nodes | |
| for i, obj in enumerate(objects): | |
| G.add_node(i, label=obj["label"]) | |
| # Add edges | |
| for rel in relationships: | |
| subj_idx = rel["subject_id"] | |
| obj_idx = rel["object_id"] | |
| G.add_edge(subj_idx, obj_idx, label=rel["predicate"]) | |
| # Position nodes | |
| pos = nx.spring_layout(G, seed=42) | |
| # Draw nodes | |
| nx.draw_networkx_nodes(G, pos, node_size=700, node_color="skyblue", alpha=0.8) | |
| # Draw node labels | |
| nx.draw_networkx_labels(G, pos, font_size=10, font_weight="bold") | |
| # Draw edges | |
| nx.draw_networkx_edges(G, pos, width=2, alpha=0.7, arrows=True, arrowsize=15) | |
| # Draw edge labels | |
| nx.draw_networkx_edge_labels( | |
| G, pos, edge_labels=nx.get_edge_attributes(G, "label"), font_size=8 | |
| ) | |
| # Add a title | |
| plt.title("Scene Graph") | |
| plt.axis("off") | |
| # Save the figure | |
| plt.tight_layout() | |
| plt.savefig(output_path, dpi=300, bbox_inches="tight") | |
| plt.close() | |
| logger.info(f"Graph visualization saved to {output_path}") | |
| def process_image( | |
| image_path: str, | |
| model_path: str, | |
| vocabulary_path: str, | |
| confidence_threshold: float = 0.5, | |
| use_fixed_boxes: bool = False, | |
| output_dir: str = "outputs", | |
| base_filename: str = None, | |
| ) -> Tuple[List, List, str, str]: | |
| """ | |
| Process an image to generate a scene graph. | |
| Args: | |
| image_path: Path to the input image | |
| model_path: Path to the model checkpoint | |
| vocabulary_path: Path to the vocabulary file | |
| confidence_threshold: Confidence threshold for relationships | |
| use_fixed_boxes: Whether to use fixed boxes or YOLO detection | |
| output_dir: Directory to save outputs | |
| base_filename: Optional base filename to use instead of the original image name | |
| Returns: | |
| Tuple of (objects, relationships, annotated_image_path, graph_path) | |
| """ | |
| # Check if files exist | |
| if not os.path.exists(image_path): | |
| raise FileNotFoundError(f"Image not found at {image_path}") | |
| if not os.path.exists(model_path): | |
| raise FileNotFoundError(f"Model not found at {model_path}") | |
| if not os.path.exists(vocabulary_path): | |
| raise FileNotFoundError(f"Vocabulary not found at {vocabulary_path}") | |
| # Create output directory if it doesn't exist | |
| os.makedirs(output_dir, exist_ok=True) | |
| # Load vocabulary | |
| vocabulary = Vocabulary.load(vocabulary_path) | |
| logger.info( | |
| f"Loaded vocabulary with {len(vocabulary.object2id)} objects and {len(vocabulary.relationship2id)} relationships" | |
| ) | |
| # Set device | |
| device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
| logger.info(f"Using device: {device}") | |
| # Load and preprocess image | |
| image = Image.open(image_path).convert("RGB") | |
| img_width, img_height = image.size | |
| # Use YOLO for object detection | |
| logger.info("Detecting objects with YOLO...") | |
| boxes = detect_objects_yolo(image_path, vocabulary, device, use_fixed_boxes) | |
| logger.info(f"Detected {len(boxes)} objects") | |
| if len(boxes) == 0: | |
| raise ValueError("No objects detected. Cannot generate scene graph.") | |
| # Create encoder | |
| encoder = VisualFeatureEncoder(backbone_name=CONFIG["model"]["backbone"]) | |
| # Create model | |
| model = SceneGraphGenerationModel( | |
| backbone=encoder, | |
| num_obj_classes=len(vocabulary.object2id), | |
| num_rel_classes=len(vocabulary.relationship2id), | |
| num_attr_classes=len(vocabulary.attribute2id), | |
| embedding_dim=CONFIG["model"]["embedding_dim"], | |
| hidden_dim=CONFIG["model"]["hidden_dim"], | |
| ) | |
| # Load model weights | |
| logger.info(f"Loading model from {model_path}...") | |
| checkpoint = torch.load(model_path, map_location=device) | |
| if "model_state_dict" in checkpoint: | |
| model.load_state_dict(checkpoint["model_state_dict"]) | |
| logger.info("Loaded model state dict from checkpoint") | |
| else: | |
| model.load_state_dict(checkpoint) | |
| logger.info("Loaded direct model state from checkpoint") | |
| model.to(device) | |
| model.eval() | |
| # Preprocess image for scene graph model | |
| transform = T.Compose( | |
| [ | |
| T.Resize((CONFIG["img_size"], CONFIG["img_size"])), | |
| T.ToTensor(), | |
| T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), | |
| ] | |
| ) | |
| img_tensor = transform(image).unsqueeze(0).to(device) | |
| # Run inference for scene graph generation | |
| logger.info("Generating scene graph...") | |
| with torch.no_grad(): | |
| # Forward pass | |
| outputs = model(img_tensor, [boxes]) | |
| # Process predictions | |
| obj_logits = outputs["obj_logits"][0] | |
| obj_probs = torch.softmax(obj_logits, dim=1) | |
| obj_scores, obj_labels = torch.max(obj_probs, dim=1) | |
| # Get bounding box predictions | |
| bbox_pred = outputs["bbox_pred"][0] | |
| # Create object list | |
| objects = [] | |
| for i in range(len(obj_labels)): | |
| bbox = bbox_pred[i].cpu().numpy().tolist() | |
| label_id = obj_labels[i].item() | |
| score = obj_scores[i].item() | |
| objects.append( | |
| { | |
| "label": vocabulary.get_object_name(label_id), | |
| "label_id": label_id, | |
| "score": score, | |
| "bbox": bbox, | |
| } | |
| ) | |
| # Process relationships | |
| relationships = [] | |
| if "rel_logits" in outputs and outputs["rel_logits"]: | |
| rel_logits = outputs["rel_logits"][0] | |
| obj_pairs = outputs["obj_pairs"][0] | |
| if rel_logits is not None and len(rel_logits) > 0: | |
| rel_probs = torch.softmax(rel_logits, dim=1) | |
| rel_scores, rel_labels = torch.max(rel_probs, dim=1) | |
| # Filter by confidence | |
| rel_mask = rel_scores > confidence_threshold | |
| rel_labels = rel_labels[rel_mask] | |
| rel_scores = rel_scores[rel_mask] | |
| filtered_pairs = obj_pairs[rel_mask] | |
| # Create relationship list | |
| for i in range(len(rel_labels)): | |
| subj_idx = filtered_pairs[i, 0].item() | |
| obj_idx = filtered_pairs[i, 1].item() | |
| label_id = rel_labels[i].item() | |
| score = rel_scores[i].item() | |
| # Map to filtered object indices | |
| subj_new_idx = -1 | |
| obj_new_idx = -1 | |
| for j, obj in enumerate(objects): | |
| if j == subj_idx: | |
| subj_new_idx = j | |
| if j == obj_idx: | |
| obj_new_idx = j | |
| if subj_new_idx != -1 and obj_new_idx != -1: | |
| relationships.append( | |
| { | |
| "subject_id": subj_new_idx, | |
| "object_id": obj_new_idx, | |
| "predicate": vocabulary.get_relationship_name(label_id), | |
| "predicate_id": label_id, | |
| "score": score, | |
| "subject": objects[subj_new_idx]["label"], | |
| "object": objects[obj_new_idx]["label"], | |
| } | |
| ) | |
| # Determine base filename for output files | |
| if base_filename: | |
| # Use provided base filename if specified | |
| file_prefix = base_filename | |
| else: | |
| # Otherwise use the original image name | |
| file_prefix = os.path.splitext(os.path.basename(image_path))[0] | |
| # Generate output filenames with consistent naming pattern | |
| annotated_image_path = os.path.join(output_dir, f"{file_prefix}_annotated.png") | |
| graph_path = os.path.join(output_dir, f"{file_prefix}_graph.png") | |
| # Log the paths for debugging | |
| logger.info(f"Using file prefix: {file_prefix}") | |
| logger.info(f"Saving annotated image to: {annotated_image_path}") | |
| logger.info(f"Saving graph to: {graph_path}") | |
| # Save visualizations | |
| visualize_image_with_boxes(np.array(image), objects, annotated_image_path) | |
| visualize_graph(objects, relationships, graph_path) | |
| logger.info(f"Visualization complete. Files saved to:") | |
| logger.info(f" - {annotated_image_path}") | |
| logger.info(f" - {graph_path}") | |
| # Convert objects for JSON serialization | |
| serializable_objects = [] | |
| for obj in objects: | |
| serializable_objects.append( | |
| { | |
| "label": obj["label"], | |
| "label_id": int(obj["label_id"]), | |
| "score": float(obj["score"]), | |
| "bbox": [float(val) for val in obj["bbox"]], | |
| } | |
| ) | |
| return serializable_objects, relationships, annotated_image_path, graph_path | |
| if __name__ == "__main__": | |
| # This can be used for testing the service directly | |
| image_path = "test.jpg" | |
| model_path = "app/models/model.pth" | |
| vocabulary_path = "app/models/vocabulary.json" | |
| objects, relationships, annotated_path, graph_path = process_image( | |
| image_path=image_path, | |
| model_path=model_path, | |
| vocabulary_path=vocabulary_path, | |
| confidence_threshold=0.3, | |
| output_dir="outputs", | |
| ) | |
| print(f"Processed {len(objects)} objects and {len(relationships)} relationships") | |