| """ |
MINDI 1.5 Vision-Coder – Complete Model
| |
| Combines MINDIArchitecture (Qwen2.5-Coder + LoRA), VisionEncoder (CLIP ViT-L/14), |
| and VisionLanguageFusion into a single MINDI15 class with forward(), generate(), |
| parse_output(), save(), and load() methods. |
| |
| Uses the MINDI custom tokenizer (data/tokenizer/mindi_tokenizer/) with 22 special |
| tokens for agentic code generation capabilities. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import re |
| from pathlib import Path |
| from typing import Optional |
|
|
| import torch |
| import torch.nn as nn |
| from PIL import Image |
| from transformers import AutoTokenizer, PreTrainedTokenizerFast |
|
|
| from src.model.architecture import MINDIArchitecture |
| from src.model.fusion_layer import VisionLanguageFusion |
| from src.model.vision_encoder import VisionEncoder |
|
|
| |
| MINDI_SECTION_TOKENS: dict[str, tuple[str, str]] = { |
| "thinking": ("<|think_start|>", "<|think_end|>"), |
| "file": ("<|file_start|>", "<|file_end|>"), |
| "code": ("<|code_start|>", "<|code_end|>"), |
| "critique": ("<|critique_start|>", "<|critique_end|>"), |
| "suggest": ("<|suggest_start|>", "<|suggest_end|>"), |
| "search": ("<|search_start|>", "<|search_end|>"), |
| "error": ("<|error_start|>", "<|error_end|>"), |
| "fix": ("<|fix_start|>", "<|fix_end|>"), |
| } |
|
|
| |
| PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent |
| DEFAULT_TOKENIZER_PATH = PROJECT_ROOT / "data" / "tokenizer" / "mindi_tokenizer" |
|
|
|
|
| class MINDI15(nn.Module): |
| """ |
    MINDI 1.5 Vision-Coder – complete multimodal coding model.
| |
| Components: |
| - architecture: Qwen2.5-Coder-7B-Instruct + LoRA |
    - vision_encoder: CLIP ViT-L/14 (frozen) → 256 tokens × 3584
| - fusion: Linear + LayerNorm prepend fusion |
| - tokenizer: MINDI custom tokenizer with 22 special tokens |
| """ |
|
|
| def __init__( |
| self, |
| model_name: str = "Qwen/Qwen2.5-Coder-7B-Instruct", |
| clip_model: str = "openai/clip-vit-large-patch14", |
| hidden_size: int = 3584, |
| num_visual_tokens: int = 256, |
| tokenizer_path: Optional[Path] = None, |
| device: Optional[str] = None, |
| torch_dtype: torch.dtype = torch.bfloat16, |
| cache_dir: Optional[Path] = None, |
| ) -> None: |
| """ |
| Initialize MINDI 1.5 with all components. |
| |
| Args: |
| model_name: HuggingFace base LLM identifier. |
| clip_model: HuggingFace CLIP vision model identifier. |
| hidden_size: LLM hidden dimension (must match Qwen config). |
| num_visual_tokens: Number of visual tokens from CLIP (256). |
| tokenizer_path: Path to MINDI custom tokenizer directory. |
| device: Target device ('cuda', 'cpu', or None for auto). |
| torch_dtype: Data type for model weights. |
| cache_dir: Base directory for model weight caches. |
| """ |
| super().__init__() |
| self.device = device or ("cuda" if torch.cuda.is_available() else "cpu") |
| self.hidden_size = hidden_size |
| self.num_visual_tokens = num_visual_tokens |
| self.torch_dtype = torch_dtype |
|
|
| cache_base = Path(cache_dir) if cache_dir else PROJECT_ROOT / "checkpoints" |
|
|
| print("=" * 60) |
| print(" MINDI 1.5 Vision-Coder β Initializing") |
| print("=" * 60) |
|
|
| |
| tok_path = Path(tokenizer_path) if tokenizer_path else DEFAULT_TOKENIZER_PATH |
| print(f"\n[MINDI15] Loading MINDI tokenizer from {tok_path} ...") |
| self.tokenizer: PreTrainedTokenizerFast = AutoTokenizer.from_pretrained( |
| str(tok_path), |
| trust_remote_code=True, |
| ) |
| print(f" Vocab size: {len(self.tokenizer)}") |
|
|
| |
| self.architecture = MINDIArchitecture( |
| model_name=model_name, |
| device=self.device, |
| cache_dir=cache_base / "base", |
| torch_dtype=torch_dtype, |
| ) |
|
|
| |
| self.architecture.resize_embeddings(len(self.tokenizer)) |
|
|
| |
| self.architecture.apply_lora() |
|
|
| |
| self.llm = self.architecture.get_model() |
|
|
| |
| self.vision_encoder = VisionEncoder( |
| model_name=clip_model, |
| llm_hidden_size=hidden_size, |
| device=self.device, |
| cache_dir=cache_base / "vision", |
| ) |
|
|
| |
| self.fusion = VisionLanguageFusion( |
| hidden_size=hidden_size, |
| num_visual_tokens=num_visual_tokens, |
| ) |
| self.fusion.to(self.device) |
|
|
| |
| self._special_ids: dict[str, int] = {} |
| for section, (start_tok, end_tok) in MINDI_SECTION_TOKENS.items(): |
| sid = self.tokenizer.convert_tokens_to_ids(start_tok) |
| eid = self.tokenizer.convert_tokens_to_ids(end_tok) |
| self._special_ids[f"{section}_start"] = sid |
| self._special_ids[f"{section}_end"] = eid |
|
|
| self._print_summary() |
|
|
| def _print_summary(self) -> None: |
| """Print initialization summary.""" |
| llm_info = self.architecture.get_trainable_params() |
| vis_info = { |
| "trainable": sum(p.numel() for p in self.vision_encoder.parameters() if p.requires_grad), |
| "total": sum(p.numel() for p in self.vision_encoder.parameters()), |
| } |
| fus_info = self.fusion.get_trainable_params() |
|
|
| total_trainable = llm_info["trainable"] + vis_info["trainable"] + fus_info["trainable"] |
| total_all = llm_info["total"] + vis_info["total"] + fus_info["total"] |
|
|
| print() |
| print("=" * 60) |
| print(" MINDI 1.5 β Initialization Complete") |
| print("=" * 60) |
| print(f" LLM trainable (LoRA): {llm_info['trainable']:>14,}") |
| print(f" Vision trainable: {vis_info['trainable']:>14,}") |
| print(f" Fusion trainable: {fus_info['trainable']:>14,}") |
| print(f" βββββββββββββββββββββββββββββββββββββ") |
| print(f" Total trainable: {total_trainable:>14,}") |
| print(f" Total params: {total_all:>14,}") |
| print(f" Tokenizer vocab: {len(self.tokenizer):>14,}") |
| print("=" * 60) |
| print() |
|
|
| |
|
|
| def forward( |
| self, |
| input_ids: torch.Tensor, |
| attention_mask: Optional[torch.Tensor] = None, |
| labels: Optional[torch.Tensor] = None, |
| image: Optional[Image.Image] = None, |
| ) -> dict: |
| """ |
| Forward pass with optional vision input. |
| |
| Args: |
| input_ids: Token IDs (batch, seq_len). |
| attention_mask: Attention mask (batch, seq_len). |
| labels: Target token IDs for loss computation (batch, seq_len). |
| image: Optional PIL image for multimodal input. |
| |
| Returns: |
| Dict with 'loss', 'logits', and optionally 'visual_tokens'. |
| """ |
| |
| text_embeds = self.llm.get_input_embeddings()(input_ids) |
|
|
| |
| visual_tokens = None |
| if image is not None: |
| visual_tokens = self.vision_encoder.encode_image(image) |
|
|
| |
| fused_embeds, fused_mask = self.fusion(text_embeds, visual_tokens, attention_mask) |
|
|
| |
| if visual_tokens is not None and labels is not None: |
| batch_size = labels.shape[0] |
| |
| visual_labels = torch.full( |
| (batch_size, self.num_visual_tokens), |
| fill_value=-100, |
| dtype=labels.dtype, |
| device=labels.device, |
| ) |
| labels = torch.cat([visual_labels, labels], dim=1) |
|
|
| |
| outputs = self.llm( |
| inputs_embeds=fused_embeds, |
| attention_mask=fused_mask, |
| labels=labels, |
| ) |
|
|
| result = { |
| "loss": outputs.loss, |
| "logits": outputs.logits, |
| } |
| if visual_tokens is not None: |
| result["visual_tokens"] = visual_tokens |
|
|
| return result |
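
    # A minimal training-step sketch built on forward() (assumes a `model`
    # instance, an `optimizer` over the trainable parameters, and a `batch`
    # dict whose fields follow forward()'s signature):
    #
    #     out = model(input_ids=batch["input_ids"],
    #                 attention_mask=batch["attention_mask"],
    #                 labels=batch["labels"],
    #                 image=batch.get("image"))
    #     out["loss"].backward()
    #     optimizer.step()
    #     optimizer.zero_grad()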
|
|
| |
|
|
| @torch.no_grad() |
| def generate( |
| self, |
| prompt: str, |
| image: Optional[Image.Image] = None, |
| max_new_tokens: int = 2048, |
| temperature: float = 0.7, |
| top_p: float = 0.9, |
| top_k: int = 50, |
| do_sample: bool = True, |
| repetition_penalty: float = 1.1, |
| ) -> str: |
| """ |
| Generate text from a prompt, optionally conditioned on an image. |
| |
| Uses the MINDI custom tokenizer (with special tokens) for both |
| encoding the prompt and decoding the output. |
| |
| Args: |
| prompt: Input text prompt. |
| image: Optional PIL image for multimodal generation. |
| max_new_tokens: Maximum tokens to generate. |
| temperature: Sampling temperature. |
| top_p: Nucleus sampling threshold. |
| top_k: Top-k sampling threshold. |
| do_sample: Whether to sample (False = greedy). |
| repetition_penalty: Penalty for repeated tokens. |
| |
| Returns: |
| Generated text string (decoded with MINDI tokenizer). |
| """ |
| self.llm.eval() |
|
|
| |
| inputs = self.tokenizer(prompt, return_tensors="pt") |
| input_ids = inputs["input_ids"].to(self.device) |
| attention_mask = inputs["attention_mask"].to(self.device) |
|
|
| |
| if image is not None: |
| text_embeds = self.llm.get_input_embeddings()(input_ids) |
| visual_tokens = self.vision_encoder.encode_image(image) |
| fused_embeds, fused_mask = self.fusion(text_embeds, visual_tokens, attention_mask) |
|
|
| output_ids = self.llm.generate( |
| inputs_embeds=fused_embeds, |
| attention_mask=fused_mask, |
| max_new_tokens=max_new_tokens, |
| temperature=temperature, |
| top_p=top_p, |
| top_k=top_k, |
| do_sample=do_sample, |
| repetition_penalty=repetition_penalty, |
| pad_token_id=self.tokenizer.pad_token_id or self.tokenizer.eos_token_id, |
| ) |
| else: |
| |
| output_ids = self.llm.generate( |
| input_ids=input_ids, |
| attention_mask=attention_mask, |
| max_new_tokens=max_new_tokens, |
| temperature=temperature, |
| top_p=top_p, |
| top_k=top_k, |
| do_sample=do_sample, |
| repetition_penalty=repetition_penalty, |
| pad_token_id=self.tokenizer.pad_token_id or self.tokenizer.eos_token_id, |
| ) |
|
|
| |
        # With inputs_embeds (image path), generate() returns only the newly
        # generated tokens; with input_ids, the prompt is echoed in the output
        # and must be sliced off before decoding.
        if image is not None:
            generated_ids = output_ids
        else:
            generated_ids = output_ids[:, input_ids.shape[1]:]
        text = self.tokenizer.decode(generated_ids[0], skip_special_tokens=False)
| return text.strip() |
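
    # Multimodal call sketch (assumes a file `screenshot.png` exists on disk):
    #
    #     img = Image.open("screenshot.png").convert("RGB")
    #     reply = model.generate("Explain the error shown in this screenshot:",
    #                            image=img, max_new_tokens=256)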
|
|
| |
|
|
| @staticmethod |
| def parse_output(text: str) -> dict[str, list[str]]: |
| """ |
| Parse generated text and extract ALL MINDI special-token sections. |
| |
| Extracts content between each pair of special tokens: |
            <|think_start|>    ... <|think_end|>    → "thinking"
            <|file_start|>     ... <|file_end|>     → "file"
            <|code_start|>     ... <|code_end|>     → "code"
            <|critique_start|> ... <|critique_end|> → "critique"
            <|suggest_start|>  ... <|suggest_end|>  → "suggest"
            <|search_start|>   ... <|search_end|>   → "search"
            <|error_start|>    ... <|error_end|>    → "error"
            <|fix_start|>      ... <|fix_end|>      → "fix"
| |
| Each section may appear multiple times; all occurrences are captured. |
| |
| Args: |
| text: Raw generated text potentially containing special tokens. |
| |
| Returns: |
            Dict mapping section name → list of extracted content strings.
| Empty list if section not found. Also includes "raw" with full text. |
| """ |
| result: dict[str, list[str]] = {"raw": [text]} |
|
|
| for section, (start_tok, end_tok) in MINDI_SECTION_TOKENS.items(): |
| |
| pattern = re.escape(start_tok) + r"(.*?)" + re.escape(end_tok) |
| matches = re.findall(pattern, text, flags=re.DOTALL) |
| result[section] = [m.strip() for m in matches] |
|
|
| return result |
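
    # parse_output() example (mirrors the self-test at the bottom of this file):
    #
    #     parsed = MINDI15.parse_output("<|code_start|>print('hi')<|code_end|>")
    #     parsed["code"]      # ["print('hi')"]
    #     parsed["thinking"]  # [] (sections that never appear stay empty)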
|
|
| |
|
|
| def set_trainable_components( |
| self, |
| lora: bool = False, |
| vision_projection: bool = False, |
| fusion: bool = False, |
| ) -> dict[str, int]: |
| """ |
| Enable/disable training for specific components. |
| |
| Used by the trainer to implement 3-phase training: |
| Phase 1: lora=True, vision_projection=False, fusion=False |
| Phase 2: lora=False, vision_projection=True, fusion=True |
| Phase 3: lora=True, vision_projection=True, fusion=True |
| |
| Args: |
| lora: Whether LoRA adapter parameters should be trainable. |
| vision_projection: Whether the vision projection layer should train. |
| fusion: Whether the fusion layer should be trainable. |
| |
| Returns: |
| Dict with trainable param counts per component. |
| """ |
        counts: dict[str, int] = {}
|
|
| |
| peft_model = self.architecture.peft_model |
| if peft_model is not None: |
| for name, param in peft_model.named_parameters(): |
| if "lora_" in name: |
| param.requires_grad = lora |
| counts["lora"] = sum( |
| p.numel() for n, p in (peft_model or self.architecture.model).named_parameters() |
| if "lora_" in n and p.requires_grad |
| ) |
|
|
| |
| for param in self.vision_encoder.projection.parameters(): |
| param.requires_grad = vision_projection |
| counts["vision_projection"] = sum( |
| p.numel() for p in self.vision_encoder.projection.parameters() if p.requires_grad |
| ) |
|
|
| |
| for param in self.fusion.parameters(): |
| param.requires_grad = fusion |
| counts["fusion"] = sum( |
| p.numel() for p in self.fusion.parameters() if p.requires_grad |
| ) |
|
|
| counts["total_trainable"] = counts["lora"] + counts["vision_projection"] + counts["fusion"] |
|
|
| print(f"[MINDI15] Trainable: LoRA={counts['lora']:,} | " |
| f"VisionProj={counts['vision_projection']:,} | " |
| f"Fusion={counts['fusion']:,} | " |
| f"Total={counts['total_trainable']:,}") |
|
|
| return counts |
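
    # Three-phase schedule sketch (matches the plan in the docstring above):
    #
    #     model.set_trainable_components(lora=True)                            # Phase 1
    #     model.set_trainable_components(vision_projection=True, fusion=True)  # Phase 2
    #     model.set_trainable_components(lora=True, vision_projection=True,
    #                                    fusion=True)                          # Phase 3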
|
|
| |
|
|
| def save(self, save_dir: Optional[Path] = None) -> Path: |
| """ |
| Save all trainable weights (LoRA + vision projection + fusion). |
| |
| Args: |
| save_dir: Root directory for saving. Defaults to checkpoints/mindi15. |
| |
| Returns: |
| Path to save directory. |
| """ |
| save_path = Path(save_dir) if save_dir else PROJECT_ROOT / "checkpoints" / "mindi15" |
| save_path.mkdir(parents=True, exist_ok=True) |
|
|
| |
| self.architecture.save_lora(save_path / "lora") |
|
|
| |
| self.vision_encoder.save_projection(save_path / "vision") |
|
|
| |
| fusion_path = save_path / "fusion" |
| fusion_path.mkdir(parents=True, exist_ok=True) |
| torch.save(self.fusion.state_dict(), fusion_path / "fusion.pt") |
|
|
| print(f"[MINDI15] All weights saved to {save_path}") |
| return save_path |
|
|
| def load(self, load_dir: Path) -> None: |
| """ |
| Load all trainable weights (LoRA + vision projection + fusion). |
| |
| Args: |
| load_dir: Root directory containing saved weights. |
| """ |
| load_path = Path(load_dir) |
| if not load_path.exists(): |
| raise FileNotFoundError(f"Checkpoint not found: {load_path}") |
|
|
| |
| lora_path = load_path / "lora" |
| if lora_path.exists(): |
| self.architecture.load_lora(lora_path) |
|
|
| |
| vision_path = load_path / "vision" |
| if vision_path.exists(): |
| self.vision_encoder.load_projection(vision_path) |
|
|
| |
| fusion_file = load_path / "fusion" / "fusion.pt" |
| if fusion_file.exists(): |
| state_dict = torch.load(fusion_file, map_location=self.device, weights_only=True) |
| self.fusion.load_state_dict(state_dict) |
| print(f"[MINDI15] Fusion loaded from {fusion_file.parent}") |
|
|
| print(f"[MINDI15] All weights loaded from {load_path}") |
|
|
| |
|
|
| def get_all_trainable_params(self) -> dict: |
| """Get combined trainable parameter counts across all components.""" |
| llm = self.architecture.get_trainable_params() |
| vis_trainable = sum( |
| p.numel() for p in self.vision_encoder.parameters() if p.requires_grad |
| ) |
| fus = self.fusion.get_trainable_params() |
|
|
| total_trainable = llm["trainable"] + vis_trainable + fus["trainable"] |
| total_all = llm["total"] + sum(p.numel() for p in self.vision_encoder.parameters()) + fus["total"] |
|
|
| return { |
| "llm_trainable": llm["trainable"], |
| "llm_total": llm["total"], |
| "vision_trainable": vis_trainable, |
| "fusion_trainable": fus["trainable"], |
| "total_trainable": total_trainable, |
| "total_params": total_all, |
| "trainable_pct": round(100.0 * total_trainable / total_all, 4) if total_all > 0 else 0.0, |
| } |
|
|
| def print_info(self) -> None: |
| """Print complete model information.""" |
| self.architecture.print_model_info() |
| info = self.get_all_trainable_params() |
| print(" MINDI 1.5 Combined Trainable Parameters:") |
| print(f" LLM (LoRA): {info['llm_trainable']:>14,}") |
| print(f" Vision proj: {info['vision_trainable']:>14,}") |
| print(f" Fusion: {info['fusion_trainable']:>14,}") |
| print(f" Total trainable: {info['total_trainable']:>14,}") |
| print(f" Total params: {info['total_params']:>14,}") |
| print(f" Trainable %: {info['trainable_pct']:>13.2f}%") |
| print() |
|
|
|
|
| |
| if __name__ == "__main__": |
| print("=" * 60) |
| print(" MINDI 1.5 β Complete Model Test") |
| print("=" * 60) |
| print() |
|
|
| |
| print(" Test 1: parse_output()") |
| sample_output = ( |
| "<|think_start|>The user wants a Python function.<|think_end|>" |
| "<|file_start|>main.py<|file_end|>" |
| "<|code_start|>def hello():\n print('Hello MINDI!')<|code_end|>" |
| "<|critique_start|>Missing type hints and docstring.<|critique_end|>" |
| "<|suggest_start|>Add return type annotation.<|suggest_end|>" |
| "<|search_start|>python type hints best practices<|search_end|>" |
| "<|error_start|>NameError: name 'x' is not defined<|error_end|>" |
| "<|fix_start|>Add x = 0 before the loop.<|fix_end|>" |
| "<|think_start|>Let me also add error handling.<|think_end|>" |
| ) |
|
|
| parsed = MINDI15.parse_output(sample_output) |
|
|
| assert len(parsed["thinking"]) == 2, f"Expected 2 thinking sections, got {len(parsed['thinking'])}" |
| assert parsed["thinking"][0] == "The user wants a Python function." |
| assert parsed["thinking"][1] == "Let me also add error handling." |
| assert parsed["file"] == ["main.py"] |
| assert parsed["code"] == ["def hello():\n print('Hello MINDI!')"] |
| assert parsed["critique"] == ["Missing type hints and docstring."] |
| assert parsed["suggest"] == ["Add return type annotation."] |
| assert parsed["search"] == ["python type hints best practices"] |
| assert parsed["error"] == ["NameError: name 'x' is not defined"] |
| assert parsed["fix"] == ["Add x = 0 before the loop."] |
| assert "raw" in parsed |
| print(" All 8 section types extracted correctly β") |
| print(f" Sections found: {[k for k, v in parsed.items() if k != 'raw' and v]}") |
|
|
| |
| print("\n Test 2: parse_output() with partial output") |
| partial = "<|code_start|>print('hi')<|code_end|>" |
| parsed2 = MINDI15.parse_output(partial) |
| assert parsed2["code"] == ["print('hi')"] |
| assert parsed2["thinking"] == [] |
| assert parsed2["file"] == [] |
| assert parsed2["fix"] == [] |
| print(" Missing sections return empty lists β") |
|
|
| |
| print("\n Test 3: parse_output() with empty string") |
| parsed3 = MINDI15.parse_output("") |
| assert all(v == [] for k, v in parsed3.items() if k != "raw") |
| print(" Empty input returns all empty lists β") |
|
|
| |
| print("\n Test 4: Token coverage") |
| expected_sections = {"thinking", "file", "code", "critique", "suggest", "search", "error", "fix"} |
| assert set(MINDI_SECTION_TOKENS.keys()) == expected_sections |
| print(f" All 8 sections defined: {sorted(expected_sections)} β") |
|
|
| |
| if torch.cuda.is_available(): |
| print("\n Test 5: Full model initialization (GPU)") |
| model = MINDI15() |
| model.print_info() |
|
|
| |
| print("\n Test 6: Phase 1 β LoRA only") |
| counts = model.set_trainable_components(lora=True, vision_projection=False, fusion=False) |
| assert counts["lora"] > 0 |
| assert counts["vision_projection"] == 0 |
| assert counts["fusion"] == 0 |
|
|
| |
| print("\n Test 7: Phase 2 β Vision bridge only") |
| counts = model.set_trainable_components(lora=False, vision_projection=True, fusion=True) |
| assert counts["lora"] == 0 |
| assert counts["vision_projection"] > 0 |
| assert counts["fusion"] > 0 |
|
|
| |
| print("\n Test 8: Phase 3 β All trainable") |
| counts = model.set_trainable_components(lora=True, vision_projection=True, fusion=True) |
| assert counts["lora"] > 0 |
| assert counts["vision_projection"] > 0 |
| assert counts["fusion"] > 0 |
|
|
| |
| print("\n Test 9: Forward pass (text only)") |
| tokens = model.tokenizer("Hello MINDI!", return_tensors="pt") |
| input_ids = tokens["input_ids"].to(model.device) |
| attn_mask = tokens["attention_mask"].to(model.device) |
| result = model(input_ids=input_ids, attention_mask=attn_mask, labels=input_ids) |
| assert result["loss"] is not None |
| print(f" Loss: {result['loss'].item():.4f}") |
| print(f" Logits: {result['logits'].shape}") |
|
|
| |
| print("\n Test 10: Forward pass (with dummy image)") |
| dummy_img = Image.new("RGB", (224, 224), color=(100, 150, 200)) |
| result_v = model(input_ids=input_ids, attention_mask=attn_mask, labels=input_ids, image=dummy_img) |
| assert result_v["loss"] is not None |
| assert "visual_tokens" in result_v |
| print(f" Loss: {result_v['loss'].item():.4f}") |
| print(f" Visual tokens: {result_v['visual_tokens'].shape}") |
|
|
| |
| print("\n Test 11: Generate (text only, short)") |
| output = model.generate("Write a hello world in Python:", max_new_tokens=50) |
| print(f" Output: {output[:100]}...") |
|
|
| print("\n Test 12: Save/load round-trip") |
| import tempfile |
| with tempfile.TemporaryDirectory() as tmp: |
| model.save(Path(tmp)) |
| |
| assert (Path(tmp) / "lora").exists() |
| assert (Path(tmp) / "vision" / "projection.pt").exists() |
| assert (Path(tmp) / "fusion" / "fusion.pt").exists() |
| print(" Save β") |
| else: |
| print("\n [SKIP] GPU tests (no CUDA available)") |
| print(" Tests 5-12 require GPU with ~20GB VRAM") |
|
|
| print("\n β All MINDI 1.5 model tests passed!") |
| print("=" * 60) |
|
|