File size: 11,443 Bytes
8838cce |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 |
#!/usr/bin/env python
"""
Difficulty Gate for ContinuumAgent Project
Smart routing system to determine whether to use patches based on query complexity
"""
import os
import json
from typing import Dict, Any, List, Optional, Tuple
import numpy as np
from llama_cpp import Llama
class DifficultyGate:
    """
    Smart routing system to determine whether to use patches based on query complexity.

    Decisions come either from a small LLM that is asked a YES/NO question or
    from a weighted sum of simple lexical features. Decisions are cached in
    memory and persisted to a JSON file inside ``cache_dir``.
    """

    # Phrases that suggest a factual lookup question.
    _FACTUAL_INDICATORS = (
        "what is", "when did", "where is", "who is",
        "which", "how many", "list the", "tell me about",
        "explain", "define",
    )
    # Phrases that suggest the answer depends on recent information.
    # The current calendar year is appended at call time.
    _TIME_INDICATORS = (
        "recent", "latest", "current", "today", "now",
        "this week", "this month", "this year",
        "2023", "2024", "2025",
    )
    # Phrases that suggest a reasoning-heavy question.
    _COMPLEX_INDICATORS = (
        "why", "how does", "explain", "compare", "contrast",
        "what if", "analyze", "evaluate", "synthesize",
    )
    # Per-feature weights used by the heuristic score.
    _FEATURE_WEIGHTS = {
        "length": 0.1,
        "has_factual": 0.3,
        "has_time": 0.4,  # Highest weight for time indicators
        "entity_ratio": 0.1,
        "complexity": -0.1,  # Complex reasoning queries may not need patches
    }
    # Fixed confidence reported for model decisions; it is NOT derived from
    # log-probabilities, the model only yields a YES/NO verdict here.
    _MODEL_CONFIDENCE = 0.7

    def __init__(self,
                 model_path: str,
                 gate_threshold: float = 0.7,
                 cache_dir: str = "models/gates",
                 n_gpu_layers: int = 0):
        """
        Initialize the difficulty gate.

        Args:
            model_path: Path to GGUF model file
            gate_threshold: Threshold for routing to patched model (0.0-1.0)
            cache_dir: Directory for caching gate decisions
            n_gpu_layers: Number of layers to offload to GPU
        """
        self.model_path = model_path
        self.gate_threshold = gate_threshold
        self.cache_dir = cache_dir
        self.n_gpu_layers = n_gpu_layers

        # Create cache directory if it doesn't exist
        os.makedirs(cache_dir, exist_ok=True)
        self.cache_file = os.path.join(cache_dir, "gate_cache.json")
        self.decision_cache = self._load_cache()

        # Initialize gate model (small context for efficiency)
        self._init_gate_model()

    def _init_gate_model(self) -> None:
        """Load the gate model; on any failure set ``gate_model`` to None."""
        try:
            print(f"Loading gate model from {self.model_path}...")
            self.gate_model = Llama(
                model_path=self.model_path,
                n_gpu_layers=self.n_gpu_layers,
                n_ctx=512  # Small context for efficiency
            )
        except Exception as e:
            # Best effort: without a model the gate falls back to heuristics.
            print(f"Error loading gate model: {e}")
            self.gate_model = None

    def _load_cache(self) -> Dict[str, Any]:
        """
        Load decision cache from file.

        Returns:
            The cache dictionary read from ``self.cache_file``, or an empty
            cache (``{"queries": {}}``) when the file is missing, unreadable,
            not valid JSON, or does not have the expected structure.
        """
        try:
            with open(self.cache_file, "r", encoding="utf-8") as f:
                cache = json.load(f)
        except FileNotFoundError:
            return {"queries": {}}
        except (OSError, ValueError) as e:
            print(f"Error loading cache: {e}")
            return {"queries": {}}

        # Reject files whose shape would break later lookups and inserts.
        if not isinstance(cache, dict) or not isinstance(cache.get("queries", {}), dict):
            print("Error loading cache: unexpected cache file structure")
            return {"queries": {}}

        print(f"Loaded {len(cache.get('queries', []))} cached gate decisions")
        return cache

    def _save_cache(self) -> None:
        """Save decision cache to file; failures are reported, not raised."""
        tmp_path = self.cache_file + ".tmp"
        try:
            # Write to a sibling file first so a failed dump cannot leave a
            # truncated cache file behind.
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(self.decision_cache, f, indent=2)
            os.replace(tmp_path, self.cache_file)
        except (OSError, TypeError, ValueError) as e:
            print(f"Error saving cache: {e}")

    def _query_hash(self, query: str) -> str:
        """
        Create simple hash for query caching.

        Args:
            query: Query string

        Returns:
            MD5 hex digest of the stripped, lower-cased query (used only as a
            cache key, not for security).
        """
        import hashlib
        return hashlib.md5(query.strip().lower().encode()).hexdigest()

    @staticmethod
    def _count_indicators(text: str, indicators: Tuple[str, ...]) -> int:
        """
        Count how many indicator phrases occur in ``text``.

        A phrase only counts when it starts at a word boundary, so "now" does
        not fire on "know" or "snow", while inflected forms such as
        "recently" or "explains" still match.
        """
        import re
        return sum(
            1 for indicator in indicators
            if re.search(r"\b" + re.escape(indicator), text)
        )

    def _heuristic_features(self, query: str) -> Dict[str, float]:
        """
        Extract heuristic features from query.

        Args:
            query: Query string

        Returns:
            Dictionary with the keys ``length``, ``has_factual``, ``has_time``,
            ``entity_ratio`` and ``complexity``; every value lies in 0.0-1.0.
        """
        from datetime import date

        query_lower = query.lower()

        # Feature 1: query length, normalized to 0-1 (capped at 200 chars)
        norm_length = min(1.0, len(query) / 200.0)

        # Feature 2: presence of factual question indicators
        has_factual = self._count_indicators(query_lower, self._FACTUAL_INDICATORS) > 0

        # Feature 3: presence of time indicators (recency); include the
        # current year so the list does not go stale.
        time_indicators = self._TIME_INDICATORS + (str(date.today().year),)
        has_time = self._count_indicators(query_lower, time_indicators) > 0

        # Feature 4: share of capitalized words as a crude named-entity signal
        words = query.split()
        capitalized_words = [w for w in words if w[0:1].isupper()]
        entity_ratio = len(capitalized_words) / max(1, len(words))

        # Feature 5: complexity, saturating at three matching indicators
        complexity_hits = self._count_indicators(query_lower, self._COMPLEX_INDICATORS)
        complexity_score = min(1.0, complexity_hits / 3.0)

        return {
            "length": norm_length,
            "has_factual": float(has_factual),
            "has_time": float(has_time),
            "entity_ratio": entity_ratio,
            "complexity": complexity_score
        }

    def _heuristic_decision(self, features: Dict[str, float]) -> Tuple[bool, float]:
        """
        Make decision based on heuristic features.

        Args:
            features: Feature dictionary as produced by ``_heuristic_features``

        Returns:
            Tuple of (needs_patches, confidence), where confidence is the
            weighted feature score clamped to 0.0-1.0.

        Raises:
            KeyError: If ``features`` contains a name without a weight.
        """
        score = sum(
            value * self._FEATURE_WEIGHTS[name]
            for name, value in features.items()
        )
        # Clamp to 0-1 range
        score = max(0.0, min(1.0, score))
        needs_patches = score >= self.gate_threshold
        return needs_patches, score

    def _ask_model(self, query: str) -> Optional[bool]:
        """
        Ask the gate model whether the query needs up-to-date knowledge.

        Returns:
            True/False for the model's verdict ("YES" found in the reply or
            not), or None when no model is loaded or the model call failed.
        """
        if not self.gate_model:
            return None

        prompt = (
            "<s>[INST] Please analyze this question and determine if it requires the most up-to-date knowledge to answer correctly.\n"
            "Respond with only a single word: 'YES' if up-to-date knowledge is needed, or 'NO' if it can be answered with general knowledge.\n"
            f'Question: "{query}"\n'
            "Requires up-to-date knowledge? [/INST]"
        )
        try:
            completion = self.gate_model.create_completion(
                prompt=prompt,
                max_tokens=5,
                temperature=0.1,  # Low temperature for consistent results
                stop=["</s>", "\n"]
            )
        except Exception as e:
            # A failing model call (for instance a prompt that does not fit
            # the small context window) must not take the router down.
            print(f"Error querying gate model: {e}")
            return None

        # Tolerate a missing or empty "choices" list.
        choices = completion.get("choices") or [{}]
        response_text = (choices[0].get("text") or "").strip().upper()
        return "YES" in response_text

    def _model_decision(self, query: str) -> Tuple[bool, float]:
        """
        Ask the model to decide if the query needs up-to-date knowledge.

        Falls back to the heuristic decision when the model is unavailable or
        the model call fails.

        Args:
            query: Query string

        Returns:
            Tuple of (needs_patches, confidence)
        """
        verdict = self._ask_model(query)
        if verdict is None:
            return self._heuristic_decision(self._heuristic_features(query))
        return verdict, self._MODEL_CONFIDENCE

    def should_use_patches(self, query: str, use_model: bool = True) -> Dict[str, Any]:
        """
        Determine if the query requires up-to-date knowledge patches.

        Args:
            query: Query string
            use_model: Whether to use model for decision (vs pure heuristics)

        Returns:
            Decision dictionary with keys:
                - needs_patches: Boolean decision
                - confidence: Confidence score (0.0-1.0)
                - method: Decision method used ("model" or "heuristic")
                - features: Heuristic feature values
                - from_cache: Whether the decision was served from the cache
        """
        query_hash = self._query_hash(query)
        queries = self.decision_cache.setdefault("queries", {})

        # A cached entry is only reused when it was produced by the method the
        # caller asked for, or by the method that would actually run now.
        requested = "model" if use_model else "heuristic"
        effective = "model" if (use_model and self.gate_model) else "heuristic"
        cached = queries.get(query_hash)
        if isinstance(cached, dict) and cached.get("method") in (requested, effective):
            # Return a copy so the stored entry (and any dict handed out
            # earlier) is not modified by flagging this result as cached.
            hit = dict(cached)
            hit["from_cache"] = True
            return hit

        features = self._heuristic_features(query)

        method = "heuristic"
        needs_patches, confidence = False, 0.0
        if effective == "model":
            verdict = self._ask_model(query)
            if verdict is not None:
                needs_patches, confidence = verdict, self._MODEL_CONFIDENCE
                method = "model"
        if method == "heuristic":
            needs_patches, confidence = self._heuristic_decision(features)

        decision = {
            "needs_patches": needs_patches,
            "confidence": confidence,
            "method": method,
            "features": features,
            "from_cache": False
        }

        # Store an independent copy so later changes to the returned dict do
        # not leak into the cache.
        stored = dict(decision)
        stored["features"] = dict(features)
        queries[query_hash] = stored
        self._save_cache()
        return decision
def main():
    """
    Test difficulty gate.

    Picks the first ``.gguf`` file (in sorted order) from ``models/slow``,
    builds a ``DifficultyGate`` with it and prints the heuristic decision,
    and the model decision when a model was loaded, for a few sample queries.
    Returns early with a message when no model file can be found.
    """
    model_dir = "models/slow"
    try:
        # Sort so the chosen model does not depend on directory listing order.
        model_files = sorted(f for f in os.listdir(model_dir) if f.endswith(".gguf"))
    except OSError as e:
        # Missing or unreadable directory: report instead of crashing.
        print(f"Cannot read model directory {model_dir}: {e}")
        return

    if not model_files:
        print(f"No GGUF models found in {model_dir}")
        return

    model_path = os.path.join(model_dir, model_files[0])
    print(f"Using model: {model_path}")

    # Initialize gate
    gate = DifficultyGate(model_path=model_path)

    # Test queries
    test_queries = [
        "What is the capital of France?",
        "Who is the current president of the United States?",
        "Explain the theory of relativity",
        "What are the latest developments in the conflict in Ukraine?",
        "Who won the most recent Super Bowl?",
        "How do I write a for loop in Python?",
    ]

    for query in test_queries:
        # Test heuristic decision
        decision = gate.should_use_patches(query, use_model=False)
        print(f"\nQuery: {query}")
        print(f"Heuristic Decision: {decision['needs_patches']} (Confidence: {decision['confidence']:.2f})")
        print(f"Features: {decision['features']}")

        # Test model decision if model is available
        if gate.gate_model:
            decision = gate.should_use_patches(query, use_model=True)
            print(f"Model Decision: {decision['needs_patches']} (Confidence: {decision['confidence']:.2f})")
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()