from jinja2 import Template
from json import load
from os import listdir
from os.path import getsize
from requests import request, RequestException
from subprocess import Popen, PIPE, run
from threading import Lock
import typing as t

__all__ = [
    'LLaMaCPP',
    'LLMS',
]

# Index of installed models: maps each short model name to its metadata
# (context length, layer count, sampling defaults, chat template,
# system message, and thinking capability flags).
with open('/opt/llms/index.json', 'r') as _f:
    LLMS = load(_f)
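
# A hypothetical index entry, inferred from the fields this module reads
# (illustrative only; the real index may carry additional keys):
#
#   "example-model-7b": {
#       "context": 32768,
#       "layers": 32,
#       "thinking": false,
#       "optional_thinking": false,
#       "system_message": "",
#       "chat_template": "{% for message in messages %}...{% endfor %}",
#       "sampling": {"temperature": 0.7, "top_p": 0.9, "top_k": 40, "min_p": 0.05}
#   }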


class LLaMaCPP:
    """Manages a single llama.cpp server process and proxies requests to it."""

    def __init__(self):
        self._model_name = None
        self._process = None
        # Readers-writer lock state: concurrent readers (generate() calls and
        # health checks) are counted; writers (load_model/stop/kill) must wait
        # until the reader count drops back to zero.
        self._readers = 0
        self._read_lock = Lock()
        self._write_lock = Lock()

    def _add_reader(self):
        # The first reader in takes the write lock, blocking writers until
        # the last reader has left.
        with self._read_lock:
            self._readers += 1
            if self._readers == 1:
                self._write_lock.acquire()

    def _remove_reader(self):
        # The last reader out releases the write lock, letting writers proceed.
        with self._read_lock:
            self._readers -= 1
            if self._readers == 0:
                self._write_lock.release()
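
    # Note: this is a reader-preference readers-writer lock; a steady stream
    # of readers can delay writers (load_model/stop/kill) until the server
    # goes idle.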

    def set_model(self, model_name: str) -> None:
        # Validate against the models actually present on disk before taking
        # the write lock and committing the new name.
        if model_name not in self.list_available_models():
            raise ValueError(f"Model {model_name} not found")
        with self._write_lock:
            self._model_name = model_name

    def load_model(
        self,
        print_log: bool = False,
        seed: t.Optional[int] = None,
        threads: t.Optional[int] = None,
        kv_cache_type: t.Optional[t.Literal['f16', 'bf16', 'q8_0', 'q5_0', 'q4_0']] = None,
        context: t.Optional[int] = None,
        temperature: t.Optional[float] = None,
        top_p: t.Optional[float] = None,
        top_k: t.Optional[int] = None,
        min_p: t.Optional[float] = None,
    ) -> None:
        if self.process_is_alive():
            raise RuntimeError("A model is already loaded. Use stop() before loading a new model.")
        if self._model_name is None:
            raise RuntimeError("Model not set")
        short_name = self.short_model_name(self._model_name)
        if short_name is None:
            raise ValueError(f"Model {self._model_name} not found")
        # Fall back to fixed defaults first, then to the per-model defaults
        # from the index.
        if seed is None:
            seed = -1
        if threads is None:
            threads = 16
        if kv_cache_type is None:
            kv_cache_type = 'q8_0'
        # Never request more context than the model supports.
        context = min_none(context, LLMS[short_name]['context'])
        if temperature is None:
            temperature = LLMS[short_name]['sampling']['temperature']
        if top_p is None:
            top_p = LLMS[short_name]['sampling']['top_p']
        if top_k is None:
            top_k = LLMS[short_name]['sampling']['top_k']
        if min_p is None:
            min_p = LLMS[short_name]['sampling']['min_p']
        with self._write_lock:
            # Offload as many layers as fit into the currently free VRAM.
            offload_layers = calculate_offload_layers(self._model_name, short_name)
            print(f"Loading model {self._model_name} with {offload_layers} layers offloaded")
            command = [
                '/opt/llama.cpp/bin/llama-server',
                '--threads', str(threads),
                '--ctx-size', str(context),
                '--flash-attn',
                '--no-escape',
                '--cache-type-k', kv_cache_type,
                '--cache-type-v', kv_cache_type,
                '--batch-size', '32',
                '--ubatch-size', '16',
                '--mlock',
                '--n-gpu-layers', str(offload_layers),
                '--model', f'/opt/llms/{self._model_name}',
                '--seed', str(seed),
                '--temp', str(temperature),
                '--top-k', str(top_k),
                '--top-p', str(top_p),
                '--min-p', str(min_p),
                '--host', '127.0.0.1',
                '--port', '8432',
                '--alias', short_name,
            ]
            # Inherit stdout/stderr when log output is requested; otherwise
            # capture both pipes so the server stays quiet.
            if print_log:
                stdout = None
                stderr = None
            else:
                stdout = PIPE
                stderr = PIPE
            self._process = Popen(command, stdout=stdout, stderr=stderr, text=True)
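
    # Illustrative call (all overrides optional):
    #   llm.load_model(threads=8, context=8192, kv_cache_type='q8_0')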

    def apply_chat_template(self, conversation: t.List[t.Dict[str, str]], enable_thinking: bool = False) -> str:
        # Render the model's own Jinja chat template over the conversation.
        short_name = self.short_model_name(self._model_name)
        chat_template: str = LLMS[short_name]['chat_template']
        template = Template(chat_template)
        options: t.Dict[str, t.Any] = {
            'messages': conversation,
            'tools': [],
            'add_generation_prompt': True,
        }
        # Thinking is only honoured if the model supports it at all; when a
        # model cannot disable thinking, it is forced on.
        if LLMS[short_name]['thinking']:
            if LLMS[short_name]['optional_thinking']:
                options['enable_thinking'] = enable_thinking
            else:
                options['enable_thinking'] = True
        else:
            options['enable_thinking'] = False
        return template.render(**options)
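
    # Illustrative rendering (output depends entirely on the model's own
    # chat template stored in the index):
    #   llm.apply_chat_template([{'role': 'user', 'content': 'Hello'}])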

    def generate(
        self,
        prompt: t.Union[str, t.List[t.Dict[str, str]]],
        enable_thinking: bool = False,
        temperature: t.Optional[float] = None,
        top_k: t.Optional[int] = None,
        top_p: t.Optional[float] = None,
        min_p: t.Optional[float] = None,
        n_predict: t.Optional[int] = None,
        grammar: t.Optional[str] = None,
        seed: t.Optional[int] = None,
    ) -> str:
        # A message list is first rendered through the model's chat template.
        if isinstance(prompt, list):
            prompt = self.apply_chat_template(prompt, enable_thinking)
        # Only pass explicitly set sampling overrides; the server keeps the
        # defaults it was launched with for everything omitted.
        json_data: t.Dict[str, t.Any] = {
            'prompt': prompt,
        }
        if temperature is not None:
            json_data['temperature'] = temperature
        if top_k is not None:
            json_data['top_k'] = top_k
        if top_p is not None:
            json_data['top_p'] = top_p
        if min_p is not None:
            json_data['min_p'] = min_p
        if n_predict is not None:
            json_data['n_predict'] = n_predict
        if grammar is not None:
            json_data['grammar'] = grammar
        if seed is not None:
            json_data['seed'] = seed
        self._add_reader()
        try:
            req = request('POST', 'http://127.0.0.1:8432/completion', json=json_data)
            if req.status_code != 200:
                raise RuntimeError(req.text)
            json_return = req.json()
            return json_return['content']
        finally:
            self._remove_reader()
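
    # Illustrative constrained generation using llama.cpp's GBNF grammar
    # format (the grammar string is an example, not part of this module):
    #   llm.generate('Is water wet? ', grammar='root ::= "yes" | "no"', n_predict=4)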

    def process_is_alive(self) -> bool:
        # poll() returns None while the child process is still running.
        self._add_reader()
        try:
            if self._process is None:
                return False
            return self._process.poll() is None
        finally:
            self._remove_reader()

    def is_loading(self) -> bool:
        # llama-server answers 503 on /health while the model is loading.
        self._add_reader()
        try:
            req = request('GET', 'http://127.0.0.1:8432/health')
            return req.status_code == 503
        except RequestException:
            return False
        finally:
            self._remove_reader()

    def is_running(self) -> bool:
        # A 200 on /health means the model is loaded and ready to serve.
        self._add_reader()
        try:
            req = request('GET', 'http://127.0.0.1:8432/health')
            return req.status_code == 200
        except RequestException:
            return False
        finally:
            self._remove_reader()

    def has_error(self) -> bool:
        # Anything other than "ready" (200) or "still loading" (503),
        # including an unreachable server, counts as an error state.
        self._add_reader()
        try:
            req = request('GET', 'http://127.0.0.1:8432/health')
            return req.status_code not in (200, 503)
        except RequestException:
            return True
        finally:
            self._remove_reader()

    def stop(self) -> None:
        # Graceful shutdown (SIGTERM); waits for in-flight requests to finish.
        with self._write_lock:
            if self._process is None:
                return
            self._process.terminate()

    def kill(self) -> None:
        # Forceful shutdown (SIGKILL) for a server that ignores terminate().
        with self._write_lock:
            if self._process is None:
                return
            self._process.kill()

    def get_system_message(self) -> t.List[t.Dict[str, str]]:
        # Returns the model's default system message as a one-element
        # conversation prefix, or an empty list if the model defines none.
        short_name = self.short_model_name(self._model_name)
        system_message = LLMS[short_name]['system_message']
        if system_message == '':
            return []
        return [{'role': 'system', 'content': system_message}]
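
    # Typical use: prepend the prefix before calling generate():
    #   conversation = llm.get_system_message() + [{'role': 'user', 'content': '...'}]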

    @staticmethod
    def list_available_models() -> t.List[str]:
        # A model is available if its GGUF file is on disk and its name
        # resolves to an entry in the index.
        model_list = []
        for entry in listdir('/opt/llms/'):
            if entry.endswith('.gguf') and LLaMaCPP.short_model_name(entry) is not None:
                model_list.append(entry)
        return model_list

    @staticmethod
    def short_model_name(model_name: str) -> t.Optional[str]:
        # Try index keys longest-first so the most specific prefix wins.
        for model in sorted(LLMS.keys(), key=len, reverse=True):
            if model_name.startswith(model):
                return model
        return None
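
    # Example (hypothetical keys): with 'foo-7b' and 'foo-7b-instruct' both
    # in the index, 'foo-7b-instruct-Q4_K_M.gguf' resolves to 'foo-7b-instruct'
    # because longer keys are tried first.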


def min_none(a: t.Any, b: t.Any) -> t.Any:
    """
    Returns the minimum of two values, or the single non-None value if
    exactly one of them is None (and None if both are).

    :param a: First value
    :param b: Second value
    :return: The minimum of a and b, or whichever one is not None
    """
    if a is None:
        return b
    if b is None:
        return a
    return min(a, b)
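
# For example: min_none(4096, 32768) == 4096, min_none(None, 32768) == 32768,
# and min_none(None, None) is None.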


def calculate_offload_layers(model_name: str, short_model_name: str) -> int:
    """
    Calculates the number of model layers to offload to the GPU

    :param model_name: The file name of the model
    :param short_model_name: The short name of the model
    :return: The number of layers to offload
    """
    free_vram = check_free_vram()
    # Estimate per-layer VRAM from the GGUF file size in MiB, padded with a
    # 10% safety margin, then offload as many layers as fit (capped at the
    # model's total layer count).
    llm_size = getsize(f"/opt/llms/{model_name}") / (1024 ** 2)
    llm_size = llm_size * 1.1
    layers = LLMS[short_model_name]['layers']
    vram_per_layer = llm_size / layers
    return min(int(free_vram / vram_per_layer), layers)
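
# Worked example (numbers illustrative): a 4000 MiB file padded to 4400 MiB
# across 40 layers costs 110 MiB per layer; with 8192 MiB free,
# int(8192 / 110) = 74 exceeds the 40 layers, so all of them are offloaded.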


def check_free_vram() -> int:
    """
    Checks the amount of free VRAM on the GPU

    :return: The amount of free VRAM in MiB
    """
    # Capture stderr so it can be surfaced on failure; the bare int()
    # conversion assumes a single-GPU system (one line of output).
    nvidia_smi = run(
        ['nvidia-smi', '--query-gpu=memory.free', '--format=csv,nounits,noheader'],
        stdout=PIPE, stderr=PIPE, text=True,
    )
    if nvidia_smi.returncode != 0:
        raise RuntimeError(nvidia_smi.stderr)
    return int(nvidia_smi.stdout)
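

if __name__ == '__main__':
    # Minimal usage sketch: the model file name below is hypothetical and
    # must exist under /opt/llms/ with a matching index entry.
    from time import sleep

    llm = LLaMaCPP()
    llm.set_model('example-model-7b-Q4_K_M.gguf')
    llm.load_model()
    # Poll /health until the server reports ready; bail out if it dies.
    while not llm.is_running():
        if not llm.process_is_alive():
            raise RuntimeError('llama-server exited during startup')
        sleep(1)
    conversation = llm.get_system_message() + [
        {'role': 'user', 'content': 'Say hello.'},
    ]
    print(llm.generate(conversation, n_predict=64))
    llm.stop()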