File size: 3,575 Bytes
75c5414
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
"""MiniCPM4.1-8B text-only planner — lazy singleton."""
from __future__ import annotations

import logging
import os
from typing import Any, Optional, Tuple

import torch

from src import config

log = logging.getLogger(__name__)

_model: Any = None
_tokenizer: Any = None


def get_planner() -> Tuple[Optional[Any], Optional[Any]]:
    """Return (model, tokenizer).  Loads once; returns (None, None) on failure."""
    global _model, _tokenizer
    if _model is not None:
        return _model, _tokenizer

    # Prefer fine-tuned repo when available
    model_id = config.PLANNER_FINETUNED_REPO or config.PLANNER_REPO
    try:
        # MiniCPM4.1 custom code imports is_torch_fx_available, which was
        # removed in transformers 5.x. Patch it back before loading.
        import transformers.utils.import_utils as _iutils
        if not hasattr(_iutils, "is_torch_fx_available"):
            def _is_torch_fx_available():
                try:
                    import torch.fx  # noqa: F401
                    return True
                except ImportError:
                    return False
            _iutils.is_torch_fx_available = _is_torch_fx_available

        from transformers import AutoModelForCausalLM, AutoTokenizer

        device_map = "auto" if os.environ.get("SPACE_ID") else (
            "cuda" if torch.cuda.is_available() else "cpu"
        )
        log.info("Loading planner model %s (device_map=%s)...", model_id, device_map)
        _tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
        _model = AutoModelForCausalLM.from_pretrained(
            model_id,
            torch_dtype=torch.bfloat16,
            trust_remote_code=True,
            device_map=device_map,
        ).eval()
        log.info("Planner model ready.")
    except Exception as exc:
        log.error("Could not load planner model '%s': %s", model_id, exc)
        _model = None
        _tokenizer = None

    return _model, _tokenizer


def infer(prompt: str, max_new_tokens: int = 1024, temperature: float = 0.0) -> str:
    """Run text inference with the planner model.

    Returns empty string if the model is unavailable.
    """
    model, tokenizer = get_planner()
    if model is None or tokenizer is None:
        return ""

    try:
        messages = [{"role": "user", "content": prompt}]

        # return_dict=True yields a BatchEncoding (dict-like) with input_ids +
        # attention_mask. NOTE: BatchEncoding is NOT a `dict` instance, so we
        # must access it via mapping keys, never via tensor attrs like .shape.
        enc = tokenizer.apply_chat_template(
            messages,
            add_generation_prompt=True,
            tokenize=True,
            return_tensors="pt",
            return_dict=True,
        )
        input_ids = enc["input_ids"].to(model.device)
        input_len = input_ids.shape[1]

        gen_inputs = {"input_ids": input_ids}
        attn = enc.get("attention_mask")
        if attn is not None:
            gen_inputs["attention_mask"] = attn.to(model.device)

        gen_kwargs: dict = dict(max_new_tokens=max_new_tokens, do_sample=False)
        if temperature > 0:
            gen_kwargs.update(do_sample=True, temperature=temperature, top_p=0.95)

        with torch.no_grad():
            output = model.generate(**gen_inputs, **gen_kwargs)

        token_ids = output[0][input_len:]
        return tokenizer.decode(token_ids, skip_special_tokens=True)

    except Exception as exc:
        log.error("Planner inference error: %r", exc, exc_info=True)
        return ""