|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
import logging
import os
import pathlib

import numpy as np
import triton_python_backend_utils as pb_utils

from nlpserving.family_history.serving.models.family_history_model import FamilyHistoryClassificationModel
|
|
|
|
class TritonPythonModel:
    """Triton Python-backend wrapper around FamilyHistoryClassificationModel.

    ``execute`` flattens the texts of all incoming requests into a single
    model call (cross-request batching) and splits the outputs back into
    one response per request, as the Triton contract requires. The
    inference batch size adapts to the average input length.
    """

    def initialize(self, args):
        """Load the model, read batching configuration, and warm up.

        Args:
            args: Triton-provided initialization dict (unused beyond the
                standard backend contract).
        """
        self.logger = logging.getLogger(__name__)

        # Model artifacts are expected one directory above this model.py.
        model_dir = os.path.join(pathlib.Path(__file__).parent.resolve(), '../')
        self.model = FamilyHistoryClassificationModel(model_dir=model_dir)

        # Batching knobs, overridable via environment variables.
        self.batch_size = int(os.environ.get('INFERENCE_BATCH_SIZE', 64))
        self.max_sequence_length = int(
            os.environ.get('MAX_SEQUENCE_LENGTH', 512)
        )

        # Best-effort warmup so the first real request does not pay
        # one-time initialization costs. Failure must not abort model
        # loading, but is logged instead of silently swallowed.
        try:
            self.model(['warmup text'], batch_size=1, top_k=1)
        except Exception:
            self.logger.warning('Model warmup inference failed', exc_info=True)

    def execute(self, requests):
        """Run inference for a batch of Triton requests.

        Args:
            requests: list of pb_utils.InferenceRequest, each carrying a
                "text" tensor of UTF-8 byte strings.

        Returns:
            One pb_utils.InferenceResponse per request, in request order.
        """
        if not requests:
            return []

        # Flatten texts across requests, remembering each request's
        # [start, end) slice so outputs can be routed back.
        all_texts = []
        request_boundaries = []
        current_idx = 0
        for request in requests:
            input_tensor = pb_utils.get_input_tensor_by_name(request, "text")
            texts = [
                item.decode('utf-8') if isinstance(item, bytes) else str(item)
                for item in input_tensor.as_numpy()
            ]
            all_texts.extend(texts)
            request_boundaries.append((current_idx, current_idx + len(texts)))
            current_idx += len(texts)

        # Triton requires exactly one response per request even when no
        # text arrived (fixed: previously returned [] here, violating the
        # one-response-per-request contract).
        if all_texts:
            all_outputs = self.model(
                all_texts,
                batch_size=self._effective_batch_size(all_texts),
                top_k=1,
            )
        else:
            all_outputs = []

        responses = []
        for start_idx, end_idx in request_boundaries:
            output = np.array(
                [
                    str(output_dict).encode('utf-8')
                    for output_dict in all_outputs[start_idx:end_idx]
                ],
                dtype=object,
            )
            responses.append(
                pb_utils.InferenceResponse(
                    output_tensors=[pb_utils.Tensor("output", output)]
                )
            )
        return responses

    def _effective_batch_size(self, texts):
        """Choose a batch size adapted to the average text length.

        Longer texts get a smaller batch (memory pressure), shorter texts
        a larger one (throughput). Clamped to at least 1 (fixed: with
        INFERENCE_BATCH_SIZE=1 the halved batch size used to become 0)
        and at most ``len(texts)``.

        Args:
            texts: non-empty list of input strings.

        Returns:
            int: batch size in [1, len(texts)].
        """
        avg_chars = sum(len(text) for text in texts) / len(texts)
        if avg_chars > 1000:
            candidate = max(1, self.batch_size // 2)
        elif avg_chars < 200:
            candidate = self.batch_size * 2
        else:
            candidate = self.batch_size
        return min(len(texts), candidate)

    def finalize(self):
        """Release the model reference at server shutdown."""
        if hasattr(self, 'model'):
            del self.model
|
|