import json
import os
import time
import uuid
from typing import Dict, List, Any, Optional, Union

import torch
from transformers import pipeline, AutoTokenizer, AutoConfig
|
|
class EndpointHandler:
    """Serve an OpenAI-style ``/chat/completions`` API around a local model.

    Loads a ``text-generation`` pipeline from ``path`` (falling back to the
    ``MODEL_PATH`` environment variable), flattens incoming chat messages
    into a role-tagged prompt, generates, and wraps the result in an
    OpenAI-like response envelope.
    """

    def __init__(self, path: str = ""):
        """Load the tokenizer and generation pipeline.

        Args:
            path: Filesystem path or hub id of the model. When empty, the
                ``MODEL_PATH`` environment variable is used instead.
        """
        self.model_path = path if path else os.environ.get("MODEL_PATH", "")

        # Best-effort repair of a malformed checkpoint config: some
        # checkpoints (presumably Phi-3 style -- confirm against the model
        # actually deployed) ship rope_scaling["short_factor"] with 48
        # entries where the loader expects 64. Pad with zeros and persist
        # the repaired config so subsequent loads succeed.
        try:
            config = AutoConfig.from_pretrained(self.model_path)
            rope = getattr(config, "rope_scaling", None)
            # The original `"short_factor" in config.rope_scaling` raised a
            # TypeError when rope_scaling was None; guard explicitly.
            if rope is not None and "short_factor" in rope:
                short_factor = rope["short_factor"]
                if len(short_factor) == 48:
                    print("Fixing rope_scaling short_factor length from 48 to 64")
                    rope["short_factor"] = list(short_factor) + [0.0] * (64 - len(short_factor))
                    config.save_pretrained(self.model_path)
                    print("Fixed config saved")
        except Exception as e:
            # Non-fatal: loading may still succeed with the original config.
            print(f"Warning: Could not fix RoPE scaling configuration: {str(e)}")

        self.tokenizer = AutoTokenizer.from_pretrained(self.model_path)

        self.pipe = pipeline(
            "text-generation",
            model=self.model_path,
            tokenizer=self.tokenizer,
            torch_dtype=torch.float16,
            device_map="auto",
            return_full_text=False,  # return only the completion, not prompt + completion
        )

    def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Handle one inference request in OpenAI-like format.

        Args:
            data: Request payload with at least a ``messages`` list, plus
                optional generation parameters (``max_tokens``,
                ``temperature``, ``top_p``, ``n``, ``stop``, ...).

        Returns:
            An OpenAI-style chat-completion dict, or an ``error`` envelope
            with code 400 when the request is invalid or generation fails.
        """
        try:
            inputs = self._parse_input(data)
            outputs = self._generate(inputs)
            return self._format_response(outputs, inputs)
        except Exception as e:
            return {
                "error": {
                    "message": str(e),
                    "type": "invalid_request_error",
                    "code": 400,
                }
            }

    def _parse_input(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate the request and extract prompt plus generation parameters.

        Raises:
            ValueError: If ``messages`` is missing or empty.
        """
        messages = data.get("messages", [])
        if not messages:
            raise ValueError("No messages provided")

        prompt = self._convert_messages_to_prompt(messages)

        # OpenAI allows "stop" to be either a single string or a list of
        # strings; normalize to a list so downstream code can iterate it
        # safely (the original iterated a bare string character by character).
        stop = data.get("stop", None)
        if isinstance(stop, str):
            stop = [stop]

        generation_params = {
            "max_tokens": data.get("max_tokens", 256),
            "temperature": data.get("temperature", 0.7),
            "top_p": data.get("top_p", 1.0),
            "n": data.get("n", 1),
            "stream": data.get("stream", False),  # accepted but not implemented
            "stop": stop,
            "presence_penalty": data.get("presence_penalty", 0.0),
            "frequency_penalty": data.get("frequency_penalty", 0.0),
        }

        return {
            "prompt": prompt,
            "messages": messages,
            "generation_params": generation_params,
        }

    def _convert_messages_to_prompt(self, messages: List[Dict[str, str]]) -> str:
        """Flatten chat messages into a single role-tagged prompt string."""
        prompt = ""
        for message in messages:
            role = message.get("role", "")
            content = message.get("content", "")

            if role == "system":
                prompt += f"System: {content}\n\n"
            elif role == "user":
                prompt += f"User: {content}\n\n"
            elif role == "assistant":
                prompt += f"Assistant: {content}\n\n"

        # Cue the model to answer as the assistant.
        prompt += "Assistant: "
        return prompt

    def _generate(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Run the pipeline and post-process stop sequences and token counts."""
        prompt = inputs["prompt"]
        params = inputs["generation_params"]

        input_tokens = len(self.tokenizer.encode(prompt))

        do_sample = params["temperature"] > 0
        generation_kwargs = {
            "max_new_tokens": params["max_tokens"],
            "num_return_sequences": params["n"],
            "do_sample": do_sample,
        }
        # Only pass sampling knobs when sampling: transformers rejects a
        # non-positive temperature and warns about unused sampling
        # parameters in greedy mode.
        if do_sample:
            generation_kwargs["temperature"] = params["temperature"]
            generation_kwargs["top_p"] = params["top_p"]

        # NOTE: the original forwarded raw stop strings as
        # `stopping_criteria`, but that kwarg expects StoppingCriteria
        # objects, not strings, and would fail at generation time. Stop
        # sequences are instead enforced by truncating the decoded text
        # below.
        pipeline_outputs = self.pipe(prompt, **generation_kwargs)

        generated_texts = []
        for output in pipeline_outputs:
            gen_text = output["generated_text"]

            # Truncate at the earliest occurrence of any stop sequence.
            if params["stop"]:
                cut = min(
                    (gen_text.find(s) for s in params["stop"] if s in gen_text),
                    default=-1,
                )
                if cut != -1:
                    gen_text = gen_text[:cut]

            generated_texts.append(gen_text)

        completion_tokens = [len(self.tokenizer.encode(text)) for text in generated_texts]

        return {
            "generated_texts": generated_texts,
            "prompt_tokens": input_tokens,
            "completion_tokens": completion_tokens,
        }

    def _format_response(self, outputs: Dict[str, Any], inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Assemble the OpenAI-like chat-completion response envelope."""
        generated_texts = outputs["generated_texts"]
        prompt_tokens = outputs["prompt_tokens"]
        completion_tokens = outputs["completion_tokens"]

        choices = []
        for i, text in enumerate(generated_texts):
            choices.append({
                "index": i,
                "message": {
                    "role": "assistant",
                    "content": text,
                },
                # TODO(review): report "length" when max_new_tokens was hit
                # instead of always claiming a clean stop.
                "finish_reason": "stop",
            })

        return {
            # Random, collision-resistant id. The original used
            # hash(prompt) % 10000, which varies per process due to string
            # hash randomization and collides easily.
            "id": f"cmpl-{uuid.uuid4().hex[:12]}",
            "object": "chat.completion",
            # Unix timestamp, per the OpenAI schema. The original reported
            # the CUDA device index here, which is not a timestamp.
            "created": int(time.time()),
            "model": os.path.basename(self.model_path),
            "choices": choices,
            "usage": {
                "prompt_tokens": prompt_tokens,
                "completion_tokens": sum(completion_tokens),
                "total_tokens": prompt_tokens + sum(completion_tokens),
            },
        }