# Autopilot-qwen3-vl / autopilot_inference.py
# Uploaded to the Hugging Face Hub by Aleton (revision 9db646a, verified).
import os
import json
import torch
import torch.nn as nn
from PIL import Image
from typing import Dict
from huggingface_hub import hf_hub_download
from transformers import AutoProcessor, Qwen3VLForConditionalGeneration
from peft import LoraConfig, get_peft_model
class RegressionHead(nn.Module):
    """MLP head mapping pooled VLM hidden states to two regression targets.

    The backbone stacks ``num_layers`` Linear layers; every layer except the
    last is followed by LayerNorm -> GELU -> Dropout. Two independent
    single-unit heads then produce the speed and steering outputs, which are
    concatenated on the last dimension.

    Args:
        input_dim: Size of the incoming hidden states (VLM hidden size).
        hidden_dim: Width of every backbone layer and of the head inputs.
        num_layers: Number of Linear layers in the backbone.
        dropout: Dropout probability between backbone layers.
    """

    def __init__(self, input_dim: int, hidden_dim: int = 512,
                 num_layers: int = 3, dropout: float = 0.15):
        super().__init__()
        layers: list = []
        current_dim = input_dim
        for i in range(num_layers):
            layers.append(nn.Linear(current_dim, hidden_dim))
            # No norm/activation/dropout after the final backbone layer.
            if i < num_layers - 1:
                layers.append(nn.LayerNorm(hidden_dim))
                layers.append(nn.GELU())
                layers.append(nn.Dropout(dropout))
            current_dim = hidden_dim
        self.backbone = nn.Sequential(*layers)
        self.speed_head = nn.Linear(hidden_dim, 1)
        self.steer_head = nn.Linear(hidden_dim, 1)

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """Return a ``(batch, 2)`` tensor: ``[:, 0]`` speed, ``[:, 1]`` steer.

        Mean-pools over dim 1 — assumes ``(batch, seq, dim)`` input.
        """
        pooled = hidden_states.mean(dim=1)
        features = self.backbone(pooled)
        speed = self.speed_head(features)
        steer = self.steer_head(features)
        return torch.cat([speed, steer], dim=-1)
class AutopilotModel(nn.Module):
    """Wraps a (LoRA-adapted) VLM and attaches a regression head.

    The VLM is run with ``output_hidden_states=True``; the last hidden layer
    is pooled and regressed into (speed, steer) by :class:`RegressionHead`.

    Args:
        vlm_model: The (optionally PEFT-wrapped) vision-language model.
        config: Dict with optional keys ``hidden_dim``,
            ``num_regression_layers`` and ``dropout`` for the head.
    """

    def __init__(self, vlm_model, config: dict):
        super().__init__()
        self.vlm = vlm_model
        self.config = config
        hidden_size = self._get_hidden_size(vlm_model)
        self.regression_head = RegressionHead(
            input_dim=hidden_size,
            hidden_dim=config.get("hidden_dim", 512),
            num_layers=config.get("num_regression_layers", 3),
            dropout=config.get("dropout", 0.15),
        )

    @staticmethod
    def _get_hidden_size(model) -> int:
        """Best-effort lookup of the VLM's text hidden size.

        Checks ``config.hidden_size``, then ``config.text_config.hidden_size``
        (the layout used by multimodal configs), and finally falls back to
        2048 — presumably the Qwen3-VL-2B hidden size; TODO confirm.
        """
        cfg = model.config
        if hasattr(cfg, "hidden_size"):
            return cfg.hidden_size
        if hasattr(cfg, "text_config") and hasattr(cfg.text_config, "hidden_size"):
            return cfg.text_config.hidden_size
        return 2048

    def forward(self, **kwargs):
        """Run the VLM and regress predictions from its last hidden layer.

        Returns:
            ``(batch, 2)`` tensor of normalized (speed, steer) predictions.
        """
        outputs = self.vlm(
            **kwargs,
            output_hidden_states=True,
            return_dict=True,
        )
        last_hidden = outputs.hidden_states[-1]
        # Head always computes in fp32, regardless of the VLM compute dtype.
        predictions = self.regression_head(last_hidden.float())
        return predictions
class AutopilotInference:
    """End-to-end inference wrapper: checkpoint loading, preprocessing, prediction.

    Holds an :class:`AutopilotModel`, its processor, and the normalization
    statistics needed to turn the model's z-score outputs into absolute
    speed / steering values.
    """

    def __init__(self, model, processor, config: dict, device: str):
        self.model = model
        self.processor = processor
        self.config = config
        self.device = device
        # De-normalization statistics. Fallback values presumably mirror the
        # training data distribution — TODO confirm against the training run
        # whenever config.json omits "stats".
        self.stats = config.get("stats", {
            "speed_mean": 77.0, "speed_std": 31.5,
            "steer_mean": 0.22, "steer_std": 1.26,
        })
        self.image_size = config.get("image_size", 384)
        self.model.eval()

    @classmethod
    def from_pretrained(
        cls,
        repo_id: str,
        device: str = "cuda" if torch.cuda.is_available() else "cpu",
        dtype: torch.dtype = torch.bfloat16 if torch.cuda.is_available() else torch.float32,
    ) -> "AutopilotInference":
        """Download config + weights from ``repo_id`` and build a ready model.

        NOTE: the ``device``/``dtype`` defaults are evaluated once at class
        definition time; pass them explicitly to override per call.

        Raises:
            Whatever ``hf_hub_download`` / ``from_pretrained`` raise on
            network or repo errors.
        """
        print("Загрузка конфигурации")  # "Loading configuration"
        config_path = hf_hub_download(repo_id, "config.json")
        model_path = hf_hub_download(repo_id, "model.pt")
        with open(config_path, encoding="utf-8") as f:
            config = json.load(f)

        base_model = config.get("base_model", "Qwen/Qwen3-VL-2B-Instruct")
        print("Инициализация процессора и базовой модели")  # "Initializing processor and base model"
        processor = AutoProcessor.from_pretrained(base_model)
        vlm = Qwen3VLForConditionalGeneration.from_pretrained(
            base_model,
            torch_dtype=dtype,
            device_map="auto" if device == "cuda" else None,
        )

        # Recreate the LoRA structure used at training time so the saved
        # state_dict keys line up with the PEFT-wrapped module names.
        lora_config = LoraConfig(
            r=config.get("lora_r", 8),
            lora_alpha=config.get("lora_alpha", 16),
            lora_dropout=config.get("lora_dropout", 0.05),
            target_modules=["q_proj", "k_proj", "v_proj", "o_proj"],
            task_type="CAUSAL_LM",
        )
        vlm = get_peft_model(vlm, lora_config)

        autopilot = AutopilotModel(vlm, config).to(device)
        # SECURITY NOTE: weights_only=False unpickles arbitrary objects —
        # only load checkpoints from a trusted repo_id.
        checkpoint = torch.load(model_path, map_location=device, weights_only=False)
        state_dict = checkpoint["model_state"] if "model_state" in checkpoint else checkpoint
        autopilot.load_state_dict(state_dict)
        print("Модель готова ")  # "Model ready"
        return cls(autopilot, processor, config, device)

    @torch.no_grad()
    def predict(self, image: "Image.Image") -> Dict[str, float]:
        """Predict absolute speed (km/h) and steering for one driving frame.

        Args:
            image: Input frame; converted to RGB and resized to
                ``image_size`` x ``image_size`` before preprocessing.

        Returns:
            ``{"speed_kmh": float, "steering_N": float}`` with the model's
            z-score outputs de-normalized via ``self.stats``.
        """
        image = image.convert("RGB").resize(
            (self.image_size, self.image_size),
            Image.LANCZOS,
        )
        prompt = "Analyze the driving scene."
        messages = [{"role": "user", "content": [
            {"type": "image", "image": image},
            {"type": "text", "text": prompt},
        ]}]
        text = self.processor.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=False
        )
        inputs = self.processor(
            text=[text],
            images=[image],
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=256,
        )
        # Move only tensors — some processors also return non-tensor entries.
        inputs = {
            k: v.to(self.device) if torch.is_tensor(v) else v
            for k, v in inputs.items()
        }

        # BUG FIX: the original always opened a CUDA autocast context, even on
        # CPU, and requested float32 — not a valid CUDA autocast dtype.
        # Autocast is now enabled only when actually running on CUDA.
        with torch.amp.autocast("cuda", dtype=torch.bfloat16,
                                enabled=self.device == "cuda"):
            predictions = self.model(**inputs)

        preds = predictions.cpu().float().numpy()[0]
        # De-standardize: outputs are z-scores w.r.t. the training statistics.
        speed = float(preds[0] * self.stats["speed_std"] + self.stats["speed_mean"])
        steering = float(preds[1] * self.stats["steer_std"] + self.stats["steer_mean"])
        return {
            "speed_kmh": round(speed, 1),
            "steering_N": round(steering, 3),
        }