# granite-code-3b/shared/agent/core/predictive_engine.py
# AjinkyaPagare's picture
# ADAM v2.0: Advanced Agentic Mesh — DAG orchestrator, cognition, knowledge web, forge tools, runtime optimization
# ba2ada2
"""
Predictive Engine — anticipates agent's next steps and pre-computes.
Ultra-lightweight strategy prediction using pattern matching and
past execution history. No ML framework needed — uses compressed
execution signatures and similarity matching.
"""
import hashlib
import json
import logging
import os
import re
import time
from dataclasses import dataclass, field
from typing import Optional
def _env_positive_int(name: str, default: int) -> int:
    """Read a positive integer from environment variable *name*.

    Args:
        name: Environment variable to read.
        default: Value returned when the variable is unset or is not a
            valid integer.

    Returns:
        The parsed value clamped to a minimum of 1, or *default*.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    try:
        value = int(raw)
    except ValueError:
        # A malformed setting must not crash the import of this module.
        logging.getLogger(__name__).warning(
            "Ignoring non-integer %s=%r; using default %d", name, raw, default
        )
        return default
    # A cap below 1 would leave nothing to evict when the store is full.
    return max(1, value)


# Maximum number of goal signatures kept before the oldest one is evicted.
_MAX_SIGNATURES = _env_positive_int("ADAM_PREDICTIVE_SIGNATURES", 200)
@dataclass
class StrategyPrediction:
    """A predicted strategy for a goal, with its confidence score.

    Attributes:
        strategy: Name of the predicted strategy.
        confidence: Score attached to the prediction by whoever built it.
        execution_path: Ordered list of strings describing the execution
            path; each instance gets its own empty list by default.
    """

    # Required fields, in positional order.
    strategy: str
    confidence: float

    # Optional; default_factory avoids sharing one list between instances.
    execution_path: list[str] = field(default_factory=list)
class PredictiveEngine:
    """
    Predicts optimal strategies for goals based on past execution patterns.

    Maintains an in-memory store of goal-signature -> strategy records, where
    a signature is the sorted set of lowercase word bigrams of the goal.
    New goals are matched by exact signature first, then by Jaccard
    similarity between bigram sets, and finally (optionally) by an LLM call.
    """

    # Strategy names offered to the LLM in the ``_llm_predict`` prompt; any
    # other answer is treated as a failed prediction.
    _KNOWN_STRATEGIES = frozenset(
        {
            "code_forge",
            "web_forge",
            "api_forge",
            "knowledge_forge",
            "meta_forge",
            "direct_reply",
        }
    )
    # Confidence reported when a goal's signature matches a record exactly.
    _EXACT_MATCH_CONFIDENCE = 0.95
    # A fuzzy match must have a Jaccard similarity strictly above this.
    _SIMILARITY_THRESHOLD = 0.6
    # Upper bound on cached precompute results (oldest inserted goes first).
    _MAX_PRECOMPUTED = 256

    _log = logging.getLogger(__name__)

    def __init__(self, llm_call_fn=None):
        """Create an engine with an empty history.

        Args:
            llm_call_fn: Optional async callable, awaited as
                ``llm_call_fn(prompt, model_hint=..., max_tokens=...)``.
                When ``None``, no LLM-based prediction or precomputation
                is attempted.
        """
        self._llm = llm_call_fn
        self._signatures: dict[str, dict] = {}
        self._precomputed: dict[str, str] = {}

    async def predict_strategy(self, goal: str) -> Optional["StrategyPrediction"]:
        """Predict the best strategy for a goal based on past executions.

        Records whose failures outnumber their successes are never used as
        a prediction.

        Args:
            goal: Free-text goal description.

        Returns:
            A prediction from an exact signature match, else from the most
            similar record above the similarity threshold, else from the
            LLM (only if one is configured and the goal looks complex),
            else ``None``.
        """
        goal_sig = self._compute_signature(goal)

        # Fast exact match
        exact = self._signatures.get(goal_sig)
        if exact is not None and self._is_viable(exact):
            return self._prediction_from(exact, self._EXACT_MATCH_CONFIDENCE)

        # Similarity match: keep the first record with the highest score.
        best_sim = 0.0
        best_entry = None
        for sig, entry in self._signatures.items():
            if not self._is_viable(entry):
                continue
            sim = self._jaccard_similarity(goal_sig, sig)
            if sim > best_sim:
                best_sim = sim
                best_entry = entry

        if best_entry is not None and best_sim > self._SIMILARITY_THRESHOLD:
            return self._prediction_from(best_entry, best_sim)

        # LLM-based prediction for novel goals
        if self._llm and self._is_complex_goal(goal):
            return await self._llm_predict(goal)
        return None

    @staticmethod
    def _is_viable(entry: dict) -> bool:
        """Return True unless the record has more failures than successes."""
        return entry.get("fail_count", 0) <= entry.get("success_count", 0)

    @staticmethod
    def _prediction_from(entry: dict, confidence: float) -> "StrategyPrediction":
        """Build a prediction from a stored record."""
        return StrategyPrediction(
            strategy=entry["strategy"],
            confidence=confidence,
            # Copy so callers cannot mutate the stored history through it.
            execution_path=list(entry.get("path") or []),
        )

    async def _llm_predict(self, goal: str) -> Optional["StrategyPrediction"]:
        """Use LLM to predict strategy for novel goals.

        Args:
            goal: Free-text goal description.

        Returns:
            A prediction with a known strategy name and a confidence
            clamped to ``[0.0, 1.0]``, or ``None`` when the call fails, the
            reply holds no parsable JSON object, or the strategy is not one
            of the names offered in the prompt.
        """
        prompt = (
            "Predict the optimal execution strategy for this goal.\n"
            "Choose ONE: code_forge, web_forge, api_forge, knowledge_forge, "
            "meta_forge, direct_reply\n"
            f"Goal: {goal}\n"
            f"Context: {self._get_recent_strategies()}\n"
            'Return JSON: {"strategy": "...", "confidence": 0.0-1.0, '
            '"reasoning": "..."}\n'
        )
        try:
            raw = await self._llm(prompt, model_hint="fast", max_tokens=200)
            # First flat (non-nested) JSON object in the reply.
            match = re.search(r'\{[^{}]*\}', raw, re.DOTALL)
            if match is None:
                return None
            data = json.loads(match.group(0))
            strategy = str(data.get("strategy", "code_forge")).strip().lower()
            confidence = float(data.get("confidence", 0.5))
        except Exception:
            # Prediction is best-effort: any failure in the LLM call or in
            # parsing its reply means "no prediction", but leave a trace.
            self._log.debug("LLM strategy prediction failed", exc_info=True)
            return None

        if strategy not in self._KNOWN_STRATEGIES:
            self._log.debug("LLM returned unknown strategy %r", strategy)
            return None

        # json.loads accepts NaN/Infinity; NaN != NaN, so map it to 0.0
        # explicitly before clamping into the advertised range.
        if confidence != confidence:
            confidence = 0.0
        confidence = min(1.0, max(0.0, confidence))

        return StrategyPrediction(strategy=strategy, confidence=confidence)

    async def precompute_next(self, goal: str, completed_ids: set, node_results: dict):
        """Background: precompute likely next nodes while user waits.

        Asks the LLM for the most likely next action and caches the reply
        under a key derived from the goal text and the number of completed
        steps. Does nothing when the key is already cached, no LLM is
        configured, or no step has completed yet.

        Args:
            goal: Free-text goal description.
            completed_ids: Identifiers of completed steps; only their count
                is used.
            node_results: Currently unused.
        """
        # MD5 is only a compact cache key here, not a security measure.
        digest = hashlib.md5(goal.encode(), usedforsecurity=False).hexdigest()
        cache_key = f"{digest}:{len(completed_ids)}"
        if cache_key in self._precomputed:
            return
        if not self._llm or len(completed_ids) == 0:
            return

        prompt = (
            f"Goal: {goal}\n"
            f"Completed steps: {len(completed_ids)}\n"
            "What is the most likely next action needed?\n"
            "Respond with a single short phrase.\n"
        )
        try:
            result = await self._llm(prompt, model_hint="fast", max_tokens=50)
        except Exception:
            # Precomputation is optional; a failed call only costs the hint.
            self._log.debug("Precompute LLM call failed", exc_info=True)
            return

        # Keep the cache bounded: dicts iterate in insertion order, so the
        # first key is the oldest one.
        while len(self._precomputed) >= self._MAX_PRECOMPUTED:
            self._precomputed.pop(next(iter(self._precomputed)))
        self._precomputed[cache_key] = result

    def record_strategy(self, goal: str, strategy: str, success: bool, latency_ms: int):
        """Record a strategy execution for future predictions.

        Each goal signature stores one strategy. A success with a different
        strategy replaces the stored one and restarts its statistics; a
        failure of a different strategy is ignored so it is not blamed on
        the stored strategy.

        Args:
            goal: Free-text goal description.
            strategy: Name of the strategy that was executed.
            success: Whether the execution succeeded.
            latency_ms: Execution latency in milliseconds.
        """
        goal_sig = self._compute_signature(goal)
        now = time.time()
        entry = self._signatures.get(goal_sig)

        if entry is None:
            # The emptiness check keeps eviction safe even with a cap <= 0.
            if self._signatures and len(self._signatures) >= _MAX_SIGNATURES:
                # Evict oldest
                oldest = min(
                    self._signatures,
                    key=lambda k: self._signatures[k].get("ts", 0),
                )
                del self._signatures[oldest]
            entry = self._fresh_entry(strategy, now)
            self._signatures[goal_sig] = entry
        elif entry["strategy"] != strategy:
            if not success:
                return
            # Another strategy worked for this goal: adopt it from scratch.
            entry.update(self._fresh_entry(strategy, now))

        samples = entry["success_count"] + entry["fail_count"]
        if samples == 0:
            # Seed the moving average with the first real measurement
            # instead of blending it with the 0.0 placeholder.
            entry["avg_latency"] = float(latency_ms)
        else:
            entry["avg_latency"] = entry["avg_latency"] * 0.7 + latency_ms * 0.3

        if success:
            entry["success_count"] += 1
        else:
            entry["fail_count"] += 1
        entry["ts"] = now

    @staticmethod
    def _fresh_entry(strategy: str, ts: float) -> dict:
        """Return a new history record for *strategy* with zeroed statistics."""
        return {
            "strategy": strategy,
            "success_count": 0,
            "fail_count": 0,
            "avg_latency": 0.0,
            "path": [],
            "ts": ts,
        }

    def _compute_signature(self, text: str) -> str:
        """Compute a compressed n-gram signature of the text.

        Args:
            text: Goal text; only its first 20 whitespace-separated words
                are considered, lowercased.

        Returns:
            The sorted, ``|``-joined set of adjacent-word bigrams
            (``"w1_w2"``). With fewer than two words, the normalized text
            itself (at most 50 characters), so the fallback is
            case- and whitespace-insensitive like the bigram form.
        """
        words = text.lower().split()[:20]
        bigrams = {f"{first}_{second}" for first, second in zip(words, words[1:])}
        if bigrams:
            return "|".join(sorted(bigrams))
        return " ".join(words)[:50]

    def _jaccard_similarity(self, sig1: str, sig2: str) -> float:
        """Compute Jaccard similarity between two signatures.

        Args:
            sig1: ``|``-joined signature.
            sig2: ``|``-joined signature.

        Returns:
            ``|A ∩ B| / |A ∪ B|`` over the signatures' tokens, or ``0.0``
            when both signatures are empty.
        """
        # "".split("|") yields [""], so drop empty tokens explicitly.
        tokens1 = {tok for tok in sig1.split("|") if tok}
        tokens2 = {tok for tok in sig2.split("|") if tok}
        union = tokens1 | tokens2
        if not union:
            return 0.0
        return len(tokens1 & tokens2) / len(union)

    def _is_complex_goal(self, goal: str) -> bool:
        """Determine if a goal is complex enough to warrant LLM prediction.

        A goal counts as complex when it is longer than 30 characters or
        contains ``?``, ``!``, ``.`` or a newline.
        """
        return len(goal) > 30 or any(c in goal for c in ["?", "!", ".", "\n"])

    def _get_recent_strategies(self) -> str:
        """Get recently successful strategies for context.

        Returns:
            Comma-separated strategy names of up to five most recently
            updated records whose successes outnumber their failures
            (empty string when there are none).
        """
        # Filter before truncating so recent failures cannot crowd out
        # older successes.
        successful = [
            entry
            for entry in self._signatures.values()
            if entry.get("success_count", 0) > entry.get("fail_count", 0)
        ]
        successful.sort(key=lambda entry: entry.get("ts", 0), reverse=True)
        return ", ".join(entry["strategy"] for entry in successful[:5])