| import os
|
| import json
|
| import torch
|
| import torch.nn as nn
|
| from PIL import Image
|
| from typing import Dict
|
| from huggingface_hub import hf_hub_download
|
| from transformers import AutoProcessor, Qwen3VLForConditionalGeneration
|
| from peft import LoraConfig, get_peft_model
|
|
|
|
|
class RegressionHead(nn.Module):
    """MLP head mapping pooled token features to two scalar targets.

    The backbone is a stack of ``nn.Linear`` layers; every layer except
    the last is followed by LayerNorm + GELU + Dropout.  Two independent
    single-unit heads then predict speed and steering, which are
    concatenated into one ``(batch, 2)`` tensor.
    """

    def __init__(self, input_dim: int, hidden_dim: int = 512,
                 num_layers: int = 3, dropout: float = 0.15):
        super().__init__()

        modules = []
        in_features = input_dim
        for layer_idx in range(num_layers):
            modules.append(nn.Linear(in_features, hidden_dim))
            # No norm/activation/dropout after the final linear layer.
            if layer_idx != num_layers - 1:
                modules.extend(
                    (nn.LayerNorm(hidden_dim), nn.GELU(), nn.Dropout(dropout))
                )
            in_features = hidden_dim

        self.backbone = nn.Sequential(*modules)
        # One scalar output per target; concatenated in forward().
        self.speed_head = nn.Linear(hidden_dim, 1)
        self.steer_head = nn.Linear(hidden_dim, 1)

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """Pool over the sequence axis and regress (speed, steer).

        Args:
            hidden_states: tensor of shape ``(batch, seq_len, input_dim)``.

        Returns:
            Tensor of shape ``(batch, 2)``: column 0 = speed, column 1 = steer.
        """
        pooled = hidden_states.mean(dim=1)   # (batch, input_dim)
        feats = self.backbone(pooled)
        return torch.cat(
            [self.speed_head(feats), self.steer_head(feats)], dim=-1
        )
|
|
|
|
|
class AutopilotModel(nn.Module):
    """Wraps a (LoRA-adapted) VLM with a regression head for control outputs."""

    def __init__(self, vlm_model, config: dict):
        """Build the wrapper.

        Args:
            vlm_model: HuggingFace-style causal VLM exposing ``.config``
                and returning ``hidden_states`` when asked.
            config: dict with optional "hidden_dim",
                "num_regression_layers" and "dropout" keys.
        """
        super().__init__()
        self.vlm = vlm_model
        self.config = config

        self.regression_head = RegressionHead(
            input_dim=self._get_hidden_size(vlm_model),
            hidden_dim=config.get("hidden_dim", 512),
            num_layers=config.get("num_regression_layers", 3),
            dropout=config.get("dropout", 0.15),
        )

    @staticmethod
    def _get_hidden_size(model) -> int:
        """Best-effort lookup of the LM hidden size from the HF config."""
        cfg = model.config
        if hasattr(cfg, "hidden_size"):
            return cfg.hidden_size
        text_cfg = getattr(cfg, "text_config", None)
        if text_cfg is not None and hasattr(text_cfg, "hidden_size"):
            return text_cfg.hidden_size
        # Neither attribute present; fall back to a common default.
        return 2048

    def forward(self, **kwargs):
        """Run the VLM and regress (speed, steer) from its final hidden state."""
        vlm_out = self.vlm(
            **kwargs,
            output_hidden_states=True,
            return_dict=True,
        )
        final_hidden = vlm_out.hidden_states[-1]
        # Head always computes in fp32, regardless of the VLM compute dtype.
        return self.regression_head(final_hidden.float())
|
|
|
|
|
class AutopilotInference:
    """End-to-end inference wrapper: driving image -> (speed_kmh, steering_N).

    Holds the trained autopilot model, its processor, and the
    normalization statistics needed to map the model's standardized
    outputs back to physical units.
    """

    def __init__(self, model, processor, config: dict, device: str):
        """Store components and switch the model to eval mode.

        Args:
            model: trained regression model, called as ``model(**inputs)``.
            processor: HF processor producing tokenized text + pixel values.
            config: run configuration; may carry "stats" and "image_size".
            device: "cuda" or "cpu" — where input tensors are moved.
        """
        self.model = model
        self.processor = processor
        self.config = config
        self.device = device

        # Denormalization stats; defaults presumably match the training
        # dataset's z-score statistics — verify against the training run.
        self.stats = config.get("stats", {
            "speed_mean": 77.0, "speed_std": 31.5,
            "steer_mean": 0.22, "steer_std": 1.26,
        })
        self.image_size = config.get("image_size", 384)
        self.model.eval()

    @classmethod
    def from_pretrained(
        cls,
        repo_id: str,
        device: str = "cuda" if torch.cuda.is_available() else "cpu",
        dtype: torch.dtype = torch.bfloat16 if torch.cuda.is_available() else torch.float32,
    ) -> "AutopilotInference":
        """Download config + weights from the HF Hub and rebuild the model.

        NOTE: the ``device``/``dtype`` defaults are evaluated once at import
        time; pass them explicitly if CUDA availability may change.

        Args:
            repo_id: HF Hub repository containing "config.json" and "model.pt".
            device: target device for the assembled model.
            dtype: compute dtype for the base VLM weights.

        Returns:
            Ready-to-use AutopilotInference instance.
        """
        print("Загрузка конфигурации")
        config_path = hf_hub_download(repo_id, "config.json")
        model_path = hf_hub_download(repo_id, "model.pt")

        with open(config_path) as f:
            config = json.load(f)

        base_model = config.get("base_model", "Qwen/Qwen3-VL-2B-Instruct")

        print("Инициализация процессора и базовой модели")
        processor = AutoProcessor.from_pretrained(base_model)
        vlm = Qwen3VLForConditionalGeneration.from_pretrained(
            base_model,
            torch_dtype=dtype,
            device_map="auto" if device == "cuda" else None,
        )

        # Re-create the LoRA adapters so the checkpoint's adapter weights
        # find matching parameter names in load_state_dict below.
        lora_config = LoraConfig(
            r=config.get("lora_r", 8),
            lora_alpha=config.get("lora_alpha", 16),
            lora_dropout=config.get("lora_dropout", 0.05),
            target_modules=["q_proj", "k_proj", "v_proj", "o_proj"],
            task_type="CAUSAL_LM",
        )
        vlm = get_peft_model(vlm, lora_config)

        autopilot = AutopilotModel(vlm, config).to(device)
        # SECURITY: weights_only=False deserializes arbitrary pickle code —
        # only load checkpoints from repositories you trust.
        checkpoint = torch.load(model_path, map_location=device, weights_only=False)

        # Checkpoint may be either {"model_state": ...} or a bare state dict.
        state_dict = checkpoint.get("model_state", checkpoint)
        autopilot.load_state_dict(state_dict)

        print("Модель готова ")
        return cls(autopilot, processor, config, device)

    @torch.no_grad()
    def predict(self, image: Image.Image) -> Dict[str, float]:
        """Predict speed (km/h) and steering (N) from a single frame.

        Args:
            image: PIL image of the driving scene (any mode/size).

        Returns:
            {"speed_kmh": float, "steering_N": float} in physical units.
        """
        image = image.convert("RGB").resize(
            (self.image_size, self.image_size),
            Image.LANCZOS,
        )

        prompt = "Analyze the driving scene."
        messages = [{"role": "user", "content": [
            {"type": "image", "image": image},
            {"type": "text", "text": prompt},
        ]}]

        text = self.processor.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=False
        )

        inputs = self.processor(
            text=[text],
            images=[image],
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=256,
        )
        inputs = {k: v.to(self.device) for k, v in inputs.items()}

        # BUGFIX: previously a CUDA autocast context was entered even on
        # CPU, with torch.float32 as dtype — CUDA autocast supports only
        # fp16/bf16, so this warned or failed off-GPU.  Autocast is now
        # enabled only when actually running on CUDA.
        use_amp = self.device == "cuda"
        with torch.amp.autocast("cuda", dtype=torch.bfloat16, enabled=use_amp):
            predictions = self.model(**inputs)

        preds = predictions.cpu().float().numpy()[0]

        # Undo the z-score normalization applied during training.
        speed = float(preds[0] * self.stats["speed_std"] + self.stats["speed_mean"])
        steering = float(preds[1] * self.stats["steer_std"] + self.stats["steer_mean"])

        return {
            "speed_kmh": round(speed, 1),
            "steering_N": round(steering, 3),
        }