# ObsDrive

ObsDrive is a vision-language model for multimodal autonomous-driving scene understanding. It accepts camera images, LiDAR bird's-eye-view (BEV) images, and RADAR BEV images as input.

---


## 📦 Requirements

```bash
pip install torch transformers accelerate qwen-vl-utils flash-attn
```
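
Before loading the model, it can help to confirm that the packages above are importable. The snippet below is a minimal sketch: `missing_modules` is a hypothetical helper, not part of ObsDrive, and the import names (`qwen_vl_utils`, `flash_attn`) are assumed to correspond to the `qwen-vl-utils` and `flash-attn` pip packages.

```python
import importlib.util

# Import names assumed to correspond to the pip packages listed above.
REQUIRED_MODULES = ["torch", "transformers", "accelerate", "qwen_vl_utils", "flash_attn"]


def missing_modules(names=REQUIRED_MODULES):
    """Return the top-level module names that cannot be found in this environment."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_modules()
    print("Missing modules:", ", ".join(missing) if missing else "none")
```

This only checks that each module can be located; it does not verify versions or that `flash-attn` was built for the installed CUDA toolkit.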

## 🚀 Inference

```python
import torch
from transformers import AutoProcessor, Qwen2_5_VLForConditionalGeneration
from qwen_vl_utils import process_vision_info

MODEL_PATH = "russellyq/ObsDrive/XXX"

class ObsDrive:
    def __init__(self, model_path=MODEL_PATH):
        self.model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
            model_path,
            torch_dtype=torch.bfloat16,
            device_map="auto",
            attn_implementation="flash_attention_2",
        )
        self.MAX_NEW_TOKENS = 4096
        self.processor = AutoProcessor.from_pretrained(
            model_path,
        )

    def chat(self, question, image=None, system_prompt=None):
        message_content = []

        # Handle image input
        if isinstance(image, str):
            message_content.append({"type": "image", "image": f"file://{image}"})
        elif isinstance(image, list):
            for img_path in image:
                message_content.append({"type": "image", "image": f"file://{img_path}"})

        # Add text input
        text = system_prompt + "\n" + question if system_prompt is not None else question
        message_content.append({"type": "text", "text": text})

        messages = [
            {
                "role": "user",
                "content": message_content,
            }
        ]

        # Apply chat template
        text_prompt = self.processor.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True,
        )

        image_inputs, video_inputs = process_vision_info(messages)

        inputs = self.processor(
            text=[text_prompt],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt",
        )

        # Move inputs to the device the model was placed on by device_map="auto"
        inputs = inputs.to(self.model.device)

        # Generate output
        generated_ids = self.model.generate(
            **inputs,
            max_new_tokens=self.MAX_NEW_TOKENS,
            do_sample=False,
        )

        generated_ids_trimmed = [
            out_ids[len(in_ids):]
            for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
        ]

        output_text = self.processor.batch_decode(
            generated_ids_trimmed,
            skip_special_tokens=True,
            clean_up_tokenization_spaces=False,
        )[0]

        return output_text


if __name__ == "__main__":
    model = ObsDrive()

    question = "Please describe the driving scene."
    image = "/path/to/image.png"

    answer = model.chat(question, image=image)
    print(answer)
```
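
The message-building step inside `chat` can be checked on its own, without a GPU or the model weights. The sketch below mirrors that logic in a standalone function; `build_user_message` is a hypothetical name introduced here for illustration and is not part of the ObsDrive code.

```python
def build_user_message(question, image=None, system_prompt=None):
    """Build the single-turn `messages` list in the same shape `chat` constructs.

    `image` may be None, one path string, or a list of path strings.
    """
    content = []

    # Normalize to a list of paths so one loop covers both cases.
    paths = [image] if isinstance(image, str) else list(image or [])
    for path in paths:
        content.append({"type": "image", "image": f"file://{path}"})

    # As in `chat`, the system prompt is prepended to the user text
    # rather than sent as a separate system-role message.
    text = question if system_prompt is None else f"{system_prompt}\n{question}"
    content.append({"type": "text", "text": text})

    return [{"role": "user", "content": content}]


if __name__ == "__main__":
    print(build_user_message("Please describe the driving scene.", image="/path/to/image.png"))
```

Printing the result for a given input shows exactly what would be passed to `apply_chat_template` and `process_vision_info`.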

## 🖼️ Multi-image Inference

```python
# Reuses the `model` instance created in the Inference example above.
question = "Analyze the scene using all modalities."

images = [
    "/path/to/camera.png",
    "/path/to/lidar_bev.png",
    "/path/to/radar_bev.png",
]

answer = model.chat(question, image=images)
print(answer)
```
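
When some modalities are unavailable for a frame, the image list can be assembled from whatever is present. The following is a minimal sketch: `collect_modalities` is a hypothetical helper, and keeping the camera, LiDAR BEV, RADAR BEV order of the example above is an assumption, since this README does not state whether the order matters to the model.

```python
def collect_modalities(camera=None, lidar_bev=None, radar_bev=None):
    """Return the available image paths in camera, LiDAR BEV, RADAR BEV order.

    Modalities passed as None are skipped. The ordering is an assumption
    taken from the multi-image example, not a documented requirement.
    """
    ordered = [camera, lidar_bev, radar_bev]
    return [path for path in ordered if path is not None]


if __name__ == "__main__":
    images = collect_modalities(
        camera="/path/to/camera.png",
        radar_bev="/path/to/radar_bev.png",
    )
    print(images)
```

The returned list can be passed directly as `image=images` to `model.chat`.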