#!/usr/bin/env python
"""
GGUF LoRA Runtime for ContinuumAgent Project

Integrates LoRA patches with llama-cpp-python GGUF models.
Modified for better CPU compatibility.
"""
import os
import time
from typing import List, Dict, Any, Optional

from llama_cpp import Llama

from runtime.lora_mux import LoraMux


class GGUFLoraRuntime:
    """Runtime for applying LoRA patches to GGUF models"""

    def __init__(self,
                 model_path: str,
                 registry_dir: str = "models/registry",
                 n_gpu_layers: int = 0,   # Force CPU-only by default
                 n_ctx: int = 1024,       # Reduced context size for better memory usage
                 verbose: bool = False):
        """
        Initialize the GGUF LoRA runtime

        Args:
            model_path: Path to GGUF model file
            registry_dir: Path to LoRA registry directory
            n_gpu_layers: Number of layers to offload to GPU (0 for CPU-only)
            n_ctx: Context size
            verbose: Enable verbose output
        """
        self.model_path = model_path
        self.registry_dir = registry_dir

        # The N_GPU_LAYERS environment variable, if set, overrides the argument
        env_n_gpu_layers = os.environ.get("N_GPU_LAYERS")
        if env_n_gpu_layers is not None:
            self.n_gpu_layers = int(env_n_gpu_layers)
        else:
            self.n_gpu_layers = n_gpu_layers

        self.n_ctx = n_ctx
        self.verbose = verbose

        # Initialize LoraMux
        self.lora_mux = LoraMux(registry_dir=registry_dir)

        # Loaded adapters
        self.loaded_adapters = []

        # Model instance
        self.model = None

        # Load the base model; tolerate failure so the object can still be constructed
        try:
            self._load_base_model()
        except Exception as e:
            print(f"Error loading base model: {e}")
            print("Continuing with self.model = None; generation calls will fail until a model is loaded")

    def _load_base_model(self) -> None:
        """Load base GGUF model"""
        print(f"Loading base GGUF model from {self.model_path}...")
        try:
            # Additional parameters for better CPU performance
            self.model = Llama(
                model_path=self.model_path,
                n_gpu_layers=self.n_gpu_layers,
                n_ctx=self.n_ctx,
                verbose=self.verbose,
                seed=42,      # Set seed for reproducibility
                n_threads=4,  # Use 4 threads for CPU inference
                n_batch=512   # Moderate batch size for CPU
            )
            print("Base model loaded successfully")
        except Exception as e:
            print(f"Error loading base model: {e}")
            raise

    def load_adapters(self, date_str: Optional[str] = None) -> List[str]:
        """
        Load LoRA adapters for a specific date

        Args:
            date_str: Date string in YYYYMMDD format (defaults to today)

        Returns:
            List of loaded adapter paths
        """
        # Get patches for the requested date
        patch_paths = self.lora_mux.load_patches(date_str)
        if not patch_paths:
            print("No adapters available to load")
            return []

        # Reset loaded adapters
        self.loaded_adapters = []

        for patch_path in patch_paths:
            try:
                adapter_path = os.path.join(patch_path, "adapter_model.bin")
                # NOTE: This is a hypothetical implementation; llama-cpp-python
                # does not currently support dynamically loading LoRA adapters
                # onto an already-loaded model. A real implementation would need
                # a custom build or extension.
                # self.model.load_adapter(adapter_path)
                print(f"Registered adapter from {adapter_path}")
                self.loaded_adapters.append(patch_path)
            except Exception as e:
                print(f"Error loading adapter from {patch_path}: {e}")

        print(f"Registered {len(self.loaded_adapters)} adapters")
        return self.loaded_adapters
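
    # NOTE (added sketch, not part of the original design): recent llama-cpp-python
    # releases expose a `lora_path` argument on the Llama constructor, so a single
    # adapter can be applied at model-load time, provided the PEFT adapter
    # (adapter_model.bin) has first been converted to a llama.cpp-compatible LoRA
    # file. A rough, untested sketch of that workaround:
    #
    #     self.model = Llama(
    #         model_path=self.model_path,
    #         lora_path=converted_adapter_path,  # hypothetical converted adapter file
    #         n_gpu_layers=self.n_gpu_layers,
    #         n_ctx=self.n_ctx,
    #     )
    #
    # This reloads the base weights on every adapter switch, so it is a stopgap
    # rather than true multi-adapter muxing.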

    def complete(self,
                 prompt: str,
                 max_tokens: int = 256,
                 temperature: float = 0.7,
                 top_p: float = 0.95,
                 with_adapters: bool = True) -> Dict[str, Any]:
        """
        Generate completion with model

        Args:
            prompt: Input prompt
            max_tokens: Maximum tokens to generate
            temperature: Sampling temperature
            top_p: Top-p sampling parameter
            with_adapters: Whether to use loaded adapters

        Returns:
            Completion result
        """
        # Check if model is loaded
        if self.model is None:
            return {
                "text": "[Error: Model not loaded]",
                "elapsed_seconds": 0.0,
                "with_adapters": with_adapters,
                "adapters_used": []
            }

        # Check if adapters are loaded
        if with_adapters and not self.loaded_adapters:
            print("No adapters loaded, loading latest adapters...")
            self.load_adapters()

        # Generate completion
        start_time = time.time()
        try:
            # NOTE: In a real implementation, this would need to configure
            # the model to use (or bypass) adapters based on with_adapters.
            completion = self.model.create_completion(
                prompt=prompt,
                max_tokens=max_tokens,
                temperature=temperature,
                top_p=top_p,
                stop=["</s>"]  # Stop at end-of-sequence token
            )
            output_text = completion.get("choices", [{}])[0].get("text", "")
        except Exception as e:
            print(f"Error generating completion: {e}")
            output_text = f"[Error generating text: {str(e)}]"

        elapsed = time.time() - start_time

        # Format result
        result = {
            "text": output_text,
            "elapsed_seconds": elapsed,
            "with_adapters": with_adapters,
            "adapters_used": self.loaded_adapters if with_adapters else []
        }
        return result
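
    # For reference (added note, traced from the code above), complete() returns a
    # dict shaped like the following, with illustrative values:
    #   {"text": "...", "elapsed_seconds": 1.23,
    #    "with_adapters": True, "adapters_used": ["models/registry/<patch_dir>"]}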

    def generate(self,
                 prompt: str,
                 system_prompt: Optional[str] = None,
                 max_tokens: int = 256,
                 temperature: float = 0.7,
                 top_p: float = 0.95,
                 with_adapters: bool = True) -> Dict[str, Any]:
        """
        Generate response with Mistral chat format

        Args:
            prompt: User prompt
            system_prompt: Optional system prompt
            max_tokens: Maximum tokens to generate
            temperature: Sampling temperature
            top_p: Top-p sampling parameter
            with_adapters: Whether to use loaded adapters

        Returns:
            Generation result
        """
        # Format prompt with Mistral chat template
        if system_prompt:
            formatted_prompt = f"<s>[INST] {system_prompt} [/INST]</s>[INST] {prompt} [/INST]"
        else:
            formatted_prompt = f"<s>[INST] {prompt} [/INST]"

        # Generate completion
        result = self.complete(
            prompt=formatted_prompt,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            with_adapters=with_adapters
        )
        return result
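

# Illustrative note (not in the original file): tracing generate() shows the exact
# string it builds. For example,
#   runtime.generate("What is LoRA?", system_prompt="You are concise.")
# formats the prompt as
#   "<s>[INST] You are concise. [/INST]</s>[INST] What is LoRA? [/INST]"
# before passing it to complete(). Mistral's reference template instead places the
# system prompt inside the first [INST] block rather than in a separate turn, so
# the formatting above is a project-specific choice.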


def main():
    """Test GGUF LoRA runtime"""
    # Find model path
    model_dir = "models/slow"
    model_files = [f for f in os.listdir(model_dir) if f.endswith(".gguf")]
    if not model_files:
        print(f"No GGUF models found in {model_dir}")
        return

    model_path = os.path.join(model_dir, model_files[0])
    print(f"Using model: {model_path}")

    # Initialize runtime in forced CPU mode
    runtime = GGUFLoraRuntime(
        model_path=model_path,
        n_gpu_layers=0,  # CPU only
        n_ctx=1024       # Reduced context
    )

    # Test a simple completion
    print("Testing simple completion...")
    result = runtime.complete(
        prompt="Hello, world!",
        max_tokens=20
    )
    print(f"Completion: {result['text']}")
    print(f"Elapsed: {result['elapsed_seconds']:.2f}s")


if __name__ == "__main__":
    main()
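
# Example invocation (illustrative; the filename gguf_lora_runtime.py is an
# assumption, and a GGUF model must be present under models/slow):
#   N_GPU_LAYERS=0 python gguf_lora_runtime.py
# Setting N_GPU_LAYERS in the environment overrides the n_gpu_layers value passed
# to GGUFLoraRuntime.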