|
|
|
|
|
"""
|
|
|
GGUF LoRA Runtime for ContinuumAgent Project
|
|
|
Integrates LoRA patches with llama-cpp-python GGUF models
|
|
|
Modified for better CPU compatibility
|
|
|
"""
|
|
|
|
|
|
import os
|
|
|
import json
|
|
|
import time
|
|
|
from typing import List, Dict, Any, Optional, Union
|
|
|
from llama_cpp import Llama
|
|
|
from runtime.lora_mux import LoraMux
|
|
|
|
|
|
class GGUFLoraRuntime:
    """Runtime for applying LoRA patches to GGUF models.

    Wraps a llama-cpp-python ``Llama`` instance plus a ``LoraMux`` adapter
    registry. Construction is best-effort: if the base model fails to load,
    ``self.model`` stays ``None`` and generation calls return an error result
    instead of raising.
    """

    def __init__(self,
                 model_path: str,
                 registry_dir: str = "models/registry",
                 n_gpu_layers: int = 0,
                 n_ctx: int = 1024,
                 verbose: bool = False):
        """
        Initialize the GGUF LoRA runtime

        Args:
            model_path: Path to GGUF model file
            registry_dir: Path to LoRA registry directory
            n_gpu_layers: Number of layers to offload to GPU (0 for CPU-only)
            n_ctx: Context size
            verbose: Enable verbose output
        """
        self.model_path = model_path
        self.registry_dir = registry_dir

        # The N_GPU_LAYERS environment variable, when set, overrides the
        # constructor argument (useful for deployment-time tuning).
        env_n_gpu_layers = os.environ.get("N_GPU_LAYERS")
        if env_n_gpu_layers is not None:
            self.n_gpu_layers = int(env_n_gpu_layers)
        else:
            self.n_gpu_layers = n_gpu_layers

        self.n_ctx = n_ctx
        self.verbose = verbose

        # Adapter registry multiplexer (resolves patch directories by date).
        self.lora_mux = LoraMux(registry_dir=registry_dir)

        # Patch directories recorded as loaded by load_adapters().
        self.loaded_adapters: List[str] = []

        # Base Llama model; stays None if loading fails.
        self.model = None

        # Best-effort load: swallow the error so the runtime object can still
        # be constructed; complete() reports the missing model explicitly.
        try:
            self._load_base_model()
        except Exception as e:
            print(f"Error loading base model: {e}")
            print("Continuing with model as None - this will cause failures later but allows initialization")

    def _load_base_model(self) -> None:
        """Load base GGUF model.

        Raises:
            Exception: re-raised from llama-cpp-python if loading fails.
        """
        print(f"Loading base GGUF model from {self.model_path}...")

        try:
            # seed is fixed for reproducibility; n_threads/n_batch are tuned
            # for CPU-only operation.
            self.model = Llama(
                model_path=self.model_path,
                n_gpu_layers=self.n_gpu_layers,
                n_ctx=self.n_ctx,
                verbose=self.verbose,
                seed=42,
                n_threads=4,
                n_batch=512
            )
            print("Base model loaded successfully")
        except Exception as e:
            print(f"Error loading base model: {e}")
            raise

    def load_adapters(self, date_str: Optional[str] = None) -> List[str]:
        """
        Load LoRA adapters for a specific date

        Args:
            date_str: Date string in YYYYMMDD format (defaults to today)

        Returns:
            List of loaded adapter paths
        """
        patch_paths = self.lora_mux.load_patches(date_str)

        if not patch_paths:
            print("No adapters available to load")
            return []

        # Reset on every call so the result reflects only this date's patches.
        self.loaded_adapters = []

        for patch_path in patch_paths:
            try:
                adapter_path = os.path.join(patch_path, "adapter_model.bin")

                # BUG FIX: previously every patch directory was recorded as
                # "loaded" without verifying that the adapter weights exist.
                if not os.path.exists(adapter_path):
                    print(f"Adapter weights missing at {adapter_path}, skipping")
                    continue

                # NOTE(review): the adapter is tracked here but not yet
                # applied to the llama.cpp model — wiring this into
                # llama-cpp-python's LoRA support (lora_path) is still TODO.
                print(f"Loaded adapter from {adapter_path}")
                self.loaded_adapters.append(patch_path)

            except Exception as e:
                print(f"Error loading adapter from {patch_path}: {e}")

        print(f"Loaded {len(self.loaded_adapters)} adapters")
        return self.loaded_adapters

    def complete(self,
                 prompt: str,
                 max_tokens: int = 256,
                 temperature: float = 0.7,
                 top_p: float = 0.95,
                 with_adapters: bool = True) -> Dict[str, Any]:
        """
        Generate completion with model

        Args:
            prompt: Input prompt
            max_tokens: Maximum tokens to generate
            temperature: Sampling temperature
            top_p: Top-p sampling parameter
            with_adapters: Whether to use loaded adapters

        Returns:
            Completion result dict with keys "text", "elapsed_seconds",
            "with_adapters", and "adapters_used". Never raises: generation
            errors are reported in the "text" field.
        """
        # Degrade gracefully when the base model failed to load.
        if self.model is None:
            return {
                "text": "[Error: Model not loaded]",
                "elapsed_seconds": 0.0,
                "with_adapters": with_adapters,
                "adapters_used": []
            }

        # Lazily pull in the latest adapters on first adapter-enabled call.
        if with_adapters and not self.loaded_adapters:
            print("No adapters loaded, loading latest adapters...")
            self.load_adapters()

        start_time = time.time()

        try:
            completion = self.model.create_completion(
                prompt=prompt,
                max_tokens=max_tokens,
                temperature=temperature,
                top_p=top_p,
                stop=["</s>"]
            )
            # Defensive extraction: tolerate an empty/odd completion payload.
            output_text = completion.get("choices", [{}])[0].get("text", "")
        except Exception as e:
            print(f"Error generating completion: {e}")
            output_text = f"[Error generating text: {str(e)}]"

        elapsed = time.time() - start_time

        result = {
            "text": output_text,
            "elapsed_seconds": elapsed,
            "with_adapters": with_adapters,
            "adapters_used": self.loaded_adapters if with_adapters else []
        }

        return result

    def generate(self,
                 prompt: str,
                 system_prompt: Optional[str] = None,
                 max_tokens: int = 256,
                 temperature: float = 0.7,
                 top_p: float = 0.95,
                 with_adapters: bool = True) -> Dict[str, Any]:
        """
        Generate response with Mistral chat format

        Args:
            prompt: User prompt
            system_prompt: Optional system prompt
            max_tokens: Maximum tokens to generate
            temperature: Sampling temperature
            top_p: Top-p sampling parameter
            with_adapters: Whether to use loaded adapters

        Returns:
            Generation result (same shape as complete())
        """
        # Wrap the prompt in Mistral [INST] markers before delegating.
        if system_prompt:
            formatted_prompt = f"<s>[INST] {system_prompt} [/INST]</s>[INST] {prompt} [/INST]"
        else:
            formatted_prompt = f"<s>[INST] {prompt} [/INST]"

        result = self.complete(
            prompt=formatted_prompt,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            with_adapters=with_adapters
        )

        return result
|
|
|
|
|
|
|
|
|
def main():
    """Smoke-test the GGUF LoRA runtime against the first model found.

    Picks the first ``.gguf`` file in ``models/slow`` and runs a short
    completion, printing the result and timing.
    """
    model_dir = "models/slow"

    # BUG FIX: os.listdir raises FileNotFoundError on a missing directory;
    # report it and bail out instead of crashing.
    if not os.path.isdir(model_dir):
        print(f"Model directory {model_dir} does not exist")
        return

    model_files = [f for f in os.listdir(model_dir) if f.endswith(".gguf")]

    if not model_files:
        print(f"No GGUF models found in {model_dir}")
        return

    model_path = os.path.join(model_dir, model_files[0])
    print(f"Using model: {model_path}")

    # CPU-only configuration for the smoke test.
    runtime = GGUFLoraRuntime(
        model_path=model_path,
        n_gpu_layers=0,
        n_ctx=1024
    )

    print("Testing simple completion...")
    result = runtime.complete(
        prompt="Hello, world!",
        max_tokens=20
    )

    print(f"Completion: {result['text']}")
    print(f"Elapsed: {result['elapsed_seconds']:.2f}s")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
main() |