File size: 6,138 Bytes
043a495 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 | """
Predictive Engine — anticipates agent's next steps and pre-computes.
Ultra-lightweight strategy prediction using pattern matching and
past execution history. No ML framework needed — uses compressed
execution signatures and similarity matching.
"""
import os
import json
import time
import hashlib
from typing import Optional
from dataclasses import dataclass, field
_MAX_SIGNATURES = int(os.getenv("ADAM_PREDICTIVE_SIGNATURES", "200"))
@dataclass
class StrategyPrediction:
strategy: str
confidence: float
execution_path: list[str] = field(default_factory=list)
class PredictiveEngine:
"""
Predicts optimal strategies for goals based on past execution patterns.
Maintains a compressed signature store of past goal→strategy mappings.
Uses fast cosine similarity over hashed n-gram features.
"""
def __init__(self, llm_call_fn=None):
self._llm = llm_call_fn
self._signatures: dict[str, dict] = {}
self._precomputed: dict[str, str] = {}
async def predict_strategy(self, goal: str) -> Optional[StrategyPrediction]:
"""Predict the best strategy for a goal based on past executions."""
goal_sig = self._compute_signature(goal)
# Fast exact match
if goal_sig in self._signatures:
entry = self._signatures[goal_sig]
return StrategyPrediction(
strategy=entry["strategy"],
confidence=0.95,
execution_path=entry.get("path", []),
)
# Similarity match
best_sim = 0.0
best_entry = None
for sig, entry in self._signatures.items():
sim = self._jaccard_similarity(goal_sig, sig)
if sim > best_sim:
best_sim = sim
best_entry = entry
if best_sim > 0.6 and best_entry:
return StrategyPrediction(
strategy=best_entry["strategy"],
confidence=best_sim,
execution_path=best_entry.get("path", []),
)
# LLM-based prediction for novel goals
if self._llm and self._is_complex_goal(goal):
return await self._llm_predict(goal)
return None
async def _llm_predict(self, goal: str) -> Optional[StrategyPrediction]:
"""Use LLM to predict strategy for novel goals."""
prompt = f"""Predict the optimal execution strategy for this goal.
Choose ONE: code_forge, web_forge, api_forge, knowledge_forge, meta_forge, direct_reply
Goal: {goal}
Context: {self._get_recent_strategies()}
Return JSON: {{"strategy": "...", "confidence": 0.0-1.0, "reasoning": "..."}}
"""
try:
raw = await self._llm(prompt, model_hint="fast", max_tokens=200)
import re, json as j
match = re.search(r'\{[^{}]*\}', raw, re.DOTALL)
if match:
data = j.loads(match.group(0))
return StrategyPrediction(
strategy=data.get("strategy", "code_forge"),
confidence=float(data.get("confidence", 0.5)),
)
except Exception:
pass
return None
async def precompute_next(self, goal: str, completed_ids: set, node_results: dict):
"""Background: precompute likely next nodes while user waits."""
cache_key = f"{hashlib.md5(goal.encode()).hexdigest()}:{len(completed_ids)}"
if cache_key in self._precomputed:
return
if self._llm and len(completed_ids) > 0:
prompt = f"""Goal: {goal}
Completed steps: {len(completed_ids)}
What is the most likely next action needed?
Respond with a single short phrase.
"""
try:
result = await self._llm(prompt, model_hint="fast", max_tokens=50)
self._precomputed[cache_key] = result
except Exception:
pass
def record_strategy(self, goal: str, strategy: str, success: bool, latency_ms: int):
"""Record a strategy execution for future predictions."""
goal_sig = self._compute_signature(goal)
if goal_sig not in self._signatures:
if len(self._signatures) >= _MAX_SIGNATURES:
# Evict oldest
oldest = min(self._signatures.keys(), key=lambda k: self._signatures[k].get("ts", 0))
del self._signatures[oldest]
self._signatures[goal_sig] = {
"strategy": strategy,
"success_count": 0,
"fail_count": 0,
"avg_latency": 0.0,
"path": [],
"ts": time.time(),
}
entry = self._signatures[goal_sig]
entry["success_count"] += 1 if success else 0
entry["fail_count"] += 0 if success else 1
entry["avg_latency"] = (entry["avg_latency"] * 0.7 + latency_ms * 0.3)
entry["ts"] = time.time()
def _compute_signature(self, text: str) -> str:
"""Compute a compressed n-gram signature of the text."""
words = text.lower().split()[:20]
ngrams = set()
for i in range(len(words) - 1):
ngrams.add(f"{words[i]}_{words[i+1]}")
return "|".join(sorted(ngrams)) if ngrams else text[:50]
def _jaccard_similarity(self, sig1: str, sig2: str) -> float:
"""Compute Jaccard similarity between two signatures."""
set1 = set(sig1.split("|"))
set2 = set(sig2.split("|"))
if not set1 and not set2:
return 0.0
intersection = len(set1 & set2)
union = len(set1 | set2)
return intersection / union if union > 0 else 0.0
def _is_complex_goal(self, goal: str) -> bool:
"""Determine if a goal is complex enough to warrant LLM prediction."""
return len(goal) > 30 or any(c in goal for c in ["?", "!", ".", "\n"])
def _get_recent_strategies(self) -> str:
"""Get recently successful strategies for context."""
recent = sorted(self._signatures.values(), key=lambda e: e["ts"], reverse=True)[:5]
return ", ".join(e["strategy"] for e in recent if e["success_count"] > e["fail_count"])
|