""" Predictive Engine — anticipates agent's next steps and pre-computes. Ultra-lightweight strategy prediction using pattern matching and past execution history. No ML framework needed — uses compressed execution signatures and similarity matching. """ import os import json import time import hashlib from typing import Optional from dataclasses import dataclass, field _MAX_SIGNATURES = int(os.getenv("ADAM_PREDICTIVE_SIGNATURES", "200")) @dataclass class StrategyPrediction: strategy: str confidence: float execution_path: list[str] = field(default_factory=list) class PredictiveEngine: """ Predicts optimal strategies for goals based on past execution patterns. Maintains a compressed signature store of past goal→strategy mappings. Uses fast cosine similarity over hashed n-gram features. """ def __init__(self, llm_call_fn=None): self._llm = llm_call_fn self._signatures: dict[str, dict] = {} self._precomputed: dict[str, str] = {} async def predict_strategy(self, goal: str) -> Optional[StrategyPrediction]: """Predict the best strategy for a goal based on past executions.""" goal_sig = self._compute_signature(goal) # Fast exact match if goal_sig in self._signatures: entry = self._signatures[goal_sig] return StrategyPrediction( strategy=entry["strategy"], confidence=0.95, execution_path=entry.get("path", []), ) # Similarity match best_sim = 0.0 best_entry = None for sig, entry in self._signatures.items(): sim = self._jaccard_similarity(goal_sig, sig) if sim > best_sim: best_sim = sim best_entry = entry if best_sim > 0.6 and best_entry: return StrategyPrediction( strategy=best_entry["strategy"], confidence=best_sim, execution_path=best_entry.get("path", []), ) # LLM-based prediction for novel goals if self._llm and self._is_complex_goal(goal): return await self._llm_predict(goal) return None async def _llm_predict(self, goal: str) -> Optional[StrategyPrediction]: """Use LLM to predict strategy for novel goals.""" prompt = f"""Predict the optimal execution strategy for this goal. Choose ONE: code_forge, web_forge, api_forge, knowledge_forge, meta_forge, direct_reply Goal: {goal} Context: {self._get_recent_strategies()} Return JSON: {{"strategy": "...", "confidence": 0.0-1.0, "reasoning": "..."}} """ try: raw = await self._llm(prompt, model_hint="fast", max_tokens=200) import re, json as j match = re.search(r'\{[^{}]*\}', raw, re.DOTALL) if match: data = j.loads(match.group(0)) return StrategyPrediction( strategy=data.get("strategy", "code_forge"), confidence=float(data.get("confidence", 0.5)), ) except Exception: pass return None async def precompute_next(self, goal: str, completed_ids: set, node_results: dict): """Background: precompute likely next nodes while user waits.""" cache_key = f"{hashlib.md5(goal.encode()).hexdigest()}:{len(completed_ids)}" if cache_key in self._precomputed: return if self._llm and len(completed_ids) > 0: prompt = f"""Goal: {goal} Completed steps: {len(completed_ids)} What is the most likely next action needed? Respond with a single short phrase. """ try: result = await self._llm(prompt, model_hint="fast", max_tokens=50) self._precomputed[cache_key] = result except Exception: pass def record_strategy(self, goal: str, strategy: str, success: bool, latency_ms: int): """Record a strategy execution for future predictions.""" goal_sig = self._compute_signature(goal) if goal_sig not in self._signatures: if len(self._signatures) >= _MAX_SIGNATURES: # Evict oldest oldest = min(self._signatures.keys(), key=lambda k: self._signatures[k].get("ts", 0)) del self._signatures[oldest] self._signatures[goal_sig] = { "strategy": strategy, "success_count": 0, "fail_count": 0, "avg_latency": 0.0, "path": [], "ts": time.time(), } entry = self._signatures[goal_sig] entry["success_count"] += 1 if success else 0 entry["fail_count"] += 0 if success else 1 entry["avg_latency"] = (entry["avg_latency"] * 0.7 + latency_ms * 0.3) entry["ts"] = time.time() def _compute_signature(self, text: str) -> str: """Compute a compressed n-gram signature of the text.""" words = text.lower().split()[:20] ngrams = set() for i in range(len(words) - 1): ngrams.add(f"{words[i]}_{words[i+1]}") return "|".join(sorted(ngrams)) if ngrams else text[:50] def _jaccard_similarity(self, sig1: str, sig2: str) -> float: """Compute Jaccard similarity between two signatures.""" set1 = set(sig1.split("|")) set2 = set(sig2.split("|")) if not set1 and not set2: return 0.0 intersection = len(set1 & set2) union = len(set1 | set2) return intersection / union if union > 0 else 0.0 def _is_complex_goal(self, goal: str) -> bool: """Determine if a goal is complex enough to warrant LLM prediction.""" return len(goal) > 30 or any(c in goal for c in ["?", "!", ".", "\n"]) def _get_recent_strategies(self) -> str: """Get recently successful strategies for context.""" recent = sorted(self._signatures.values(), key=lambda e: e["ts"], reverse=True)[:5] return ", ".join(e["strategy"] for e in recent if e["success_count"] > e["fail_count"])