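"""Custom inference handler for Hugging Face Inference Endpoints.

Loads either a PEFT/LoRA adapter on top of its base model or a fully
fine-tuned model from the endpoint's model directory, then serves
greedy (or optionally sampled) code generation and returns a structured
result with per-token confidence scores.
"""
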
import time
from typing import Any, Dict

import torch
from peft import PeftConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer

from infer_local import (
    build_instruction_prompt,
    build_structured_result,
    has_adapter_weights,
    has_full_model_weights,
)


DEFAULT_BASE_MODEL = "Qwen/Qwen2.5-Coder-0.5B-Instruct"


def as_bool(value: Any) -> bool:
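    """Coerce a request parameter to bool; string values match common truthy spellings."""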
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        return value.strip().lower() in {"1", "true", "yes", "y", "on"}
    return bool(value)


def clamp_int(value: Any, default: int, minimum: int, maximum: int) -> int:
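    """Parse value as an int, falling back to default, then clamp to [minimum, maximum]."""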
    try:
        parsed = int(value)
    except (TypeError, ValueError):
        parsed = default
    return max(minimum, min(maximum, parsed))


class EndpointHandler:
    def __init__(self, path: str = ""):
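        """Load the tokenizer and model from ``path``, preferring PEFT adapter weights."""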
        self.path = path or "."
        adapter_weights_present = has_adapter_weights(self.path)
        full_model_weights_present = has_full_model_weights(self.path)

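        # Prefer adapter (PEFT/LoRA) weights when present; otherwise fall back
        # to full fine-tuned weights stored directly in the model directory.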
        if adapter_weights_present:
            peft_config = PeftConfig.from_pretrained(self.path)
            base_model_name = peft_config.base_model_name_or_path or DEFAULT_BASE_MODEL
            self.tokenizer = AutoTokenizer.from_pretrained(base_model_name)
            base_model = AutoModelForCausalLM.from_pretrained(
                base_model_name,
                torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
            )
            self.model = PeftModel.from_pretrained(base_model, self.path)
        elif full_model_weights_present:
            self.tokenizer = AutoTokenizer.from_pretrained(self.path)
            self.model = AutoModelForCausalLM.from_pretrained(
                self.path,
                torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
            )
        else:
            raise RuntimeError(
                f"No adapter or full-model weights found at endpoint model path: {self.path}"
            )

        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model.to(self.device)
        self.model.eval()
        self.model.generation_config.do_sample = False
        self.model.generation_config.temperature = 1.0
        self.model.generation_config.top_p = 1.0
        self.model.generation_config.top_k = 50

    def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
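        """Handle one endpoint request: validate the prompt, generate, and build the result."""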
        user_prompt = data.get("inputs", data.get("prompt", ""))
        if isinstance(user_prompt, list):
            user_prompt = user_prompt[0] if user_prompt else ""
        user_prompt = str(user_prompt).strip()
        if not user_prompt:
            return {
                "error": "Missing prompt. Send {'inputs': 'your coding prompt'}."
            }

        parameters = data.get("parameters", {}) or {}
        max_new_tokens = clamp_int(parameters.get("max_new_tokens"), 320, 1, 1024)
        do_sample = as_bool(parameters.get("do_sample", False))

        prompt_text = build_instruction_prompt(user_prompt)
        inputs = self.tokenizer(prompt_text, return_tensors="pt").to(self.device)

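        # Decode deterministically by default; sampling knobs apply only when
        # the caller explicitly requests do_sample.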
        generation_kwargs = {
            "max_new_tokens": max_new_tokens,
            "output_scores": True,
            "return_dict_in_generate": True,
            "do_sample": do_sample,
            "pad_token_id": self.tokenizer.eos_token_id,
        }
        if do_sample:
            generation_kwargs["temperature"] = float(parameters.get("temperature", 0.25))
            generation_kwargs["top_p"] = float(parameters.get("top_p", 0.9))

        started_at = time.perf_counter()
        with torch.no_grad():
            generated = self.model.generate(**inputs, **generation_kwargs)
        latency_ms = int((time.perf_counter() - started_at) * 1000)

        output_ids = generated.sequences[0]
        prompt_len = inputs["input_ids"].shape[1]
        generated_ids = output_ids[prompt_len:].tolist()
        generated_text = self.tokenizer.decode(
            generated_ids,
            skip_special_tokens=True,
        ).strip()

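        # Per-token confidence: the softmax probability the model assigned to
        # each emitted token at its generation step.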
        token_confidences = []
        if generated.scores:
            for token_id, score_tensor in zip(generated_ids, generated.scores):
                probs = torch.softmax(score_tensor[0], dim=-1)
                token_confidences.append(float(probs[token_id].item()))

        return build_structured_result(
            user_prompt,
            generated_text,
            latency_ms,
            tokenizer=self.tokenizer,
            generated_ids=generated_ids,
            token_confidences=token_confidences,
        )
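

# ---------------------------------------------------------------------------
# Usage sketch (not part of the deployed handler): a minimal local smoke test,
# assuming model weights are available in the current directory and that
# `infer_local` is importable. The path and example prompt below are
# illustrative assumptions, not values taken from the original code.
# Inference Endpoints constructs EndpointHandler itself and calls it with the
# parsed request body; this block only mirrors that call locally.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    handler = EndpointHandler(".")  # hypothetical local model directory
    response = handler(
        {
            "inputs": "Write a Python function that reverses a string.",
            "parameters": {"max_new_tokens": 64, "do_sample": False},
        }
    )
    print(response)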