| """ |
| Predictive Engine — anticipates agent's next steps and pre-computes. |
| |
| Ultra-lightweight strategy prediction using pattern matching and |
| past execution history. No ML framework needed — uses compressed |
| execution signatures and similarity matching. |
| """ |
| import os |
| import json |
| import time |
| import hashlib |
| from typing import Optional |
| from dataclasses import dataclass, field |
|
|
| _MAX_SIGNATURES = int(os.getenv("ADAM_PREDICTIVE_SIGNATURES", "200")) |
|
|
|
|
| @dataclass |
| class StrategyPrediction: |
| strategy: str |
| confidence: float |
| execution_path: list[str] = field(default_factory=list) |
|
|
|
|
| class PredictiveEngine: |
| """ |
| Predicts optimal strategies for goals based on past execution patterns. |
| |
| Maintains a compressed signature store of past goal→strategy mappings. |
| Uses fast cosine similarity over hashed n-gram features. |
| """ |
|
|
| def __init__(self, llm_call_fn=None): |
| self._llm = llm_call_fn |
| self._signatures: dict[str, dict] = {} |
| self._precomputed: dict[str, str] = {} |
|
|
| async def predict_strategy(self, goal: str) -> Optional[StrategyPrediction]: |
| """Predict the best strategy for a goal based on past executions.""" |
| goal_sig = self._compute_signature(goal) |
|
|
| |
| if goal_sig in self._signatures: |
| entry = self._signatures[goal_sig] |
| return StrategyPrediction( |
| strategy=entry["strategy"], |
| confidence=0.95, |
| execution_path=entry.get("path", []), |
| ) |
|
|
| |
| best_sim = 0.0 |
| best_entry = None |
| for sig, entry in self._signatures.items(): |
| sim = self._jaccard_similarity(goal_sig, sig) |
| if sim > best_sim: |
| best_sim = sim |
| best_entry = entry |
|
|
| if best_sim > 0.6 and best_entry: |
| return StrategyPrediction( |
| strategy=best_entry["strategy"], |
| confidence=best_sim, |
| execution_path=best_entry.get("path", []), |
| ) |
|
|
| |
| if self._llm and self._is_complex_goal(goal): |
| return await self._llm_predict(goal) |
|
|
| return None |
|
|
| async def _llm_predict(self, goal: str) -> Optional[StrategyPrediction]: |
| """Use LLM to predict strategy for novel goals.""" |
| prompt = f"""Predict the optimal execution strategy for this goal. |
| Choose ONE: code_forge, web_forge, api_forge, knowledge_forge, meta_forge, direct_reply |
| |
| Goal: {goal} |
| Context: {self._get_recent_strategies()} |
| |
| Return JSON: {{"strategy": "...", "confidence": 0.0-1.0, "reasoning": "..."}} |
| """ |
| try: |
| raw = await self._llm(prompt, model_hint="fast", max_tokens=200) |
| import re, json as j |
| match = re.search(r'\{[^{}]*\}', raw, re.DOTALL) |
| if match: |
| data = j.loads(match.group(0)) |
| return StrategyPrediction( |
| strategy=data.get("strategy", "code_forge"), |
| confidence=float(data.get("confidence", 0.5)), |
| ) |
| except Exception: |
| pass |
| return None |
|
|
| async def precompute_next(self, goal: str, completed_ids: set, node_results: dict): |
| """Background: precompute likely next nodes while user waits.""" |
| cache_key = f"{hashlib.md5(goal.encode()).hexdigest()}:{len(completed_ids)}" |
| if cache_key in self._precomputed: |
| return |
|
|
| if self._llm and len(completed_ids) > 0: |
| prompt = f"""Goal: {goal} |
| Completed steps: {len(completed_ids)} |
| |
| What is the most likely next action needed? |
| Respond with a single short phrase. |
| """ |
| try: |
| result = await self._llm(prompt, model_hint="fast", max_tokens=50) |
| self._precomputed[cache_key] = result |
| except Exception: |
| pass |
|
|
| def record_strategy(self, goal: str, strategy: str, success: bool, latency_ms: int): |
| """Record a strategy execution for future predictions.""" |
| goal_sig = self._compute_signature(goal) |
|
|
| if goal_sig not in self._signatures: |
| if len(self._signatures) >= _MAX_SIGNATURES: |
| |
| oldest = min(self._signatures.keys(), key=lambda k: self._signatures[k].get("ts", 0)) |
| del self._signatures[oldest] |
|
|
| self._signatures[goal_sig] = { |
| "strategy": strategy, |
| "success_count": 0, |
| "fail_count": 0, |
| "avg_latency": 0.0, |
| "path": [], |
| "ts": time.time(), |
| } |
|
|
| entry = self._signatures[goal_sig] |
| entry["success_count"] += 1 if success else 0 |
| entry["fail_count"] += 0 if success else 1 |
| entry["avg_latency"] = (entry["avg_latency"] * 0.7 + latency_ms * 0.3) |
| entry["ts"] = time.time() |
|
|
| def _compute_signature(self, text: str) -> str: |
| """Compute a compressed n-gram signature of the text.""" |
| words = text.lower().split()[:20] |
| ngrams = set() |
| for i in range(len(words) - 1): |
| ngrams.add(f"{words[i]}_{words[i+1]}") |
| return "|".join(sorted(ngrams)) if ngrams else text[:50] |
|
|
| def _jaccard_similarity(self, sig1: str, sig2: str) -> float: |
| """Compute Jaccard similarity between two signatures.""" |
| set1 = set(sig1.split("|")) |
| set2 = set(sig2.split("|")) |
| if not set1 and not set2: |
| return 0.0 |
| intersection = len(set1 & set2) |
| union = len(set1 | set2) |
| return intersection / union if union > 0 else 0.0 |
|
|
| def _is_complex_goal(self, goal: str) -> bool: |
| """Determine if a goal is complex enough to warrant LLM prediction.""" |
| return len(goal) > 30 or any(c in goal for c in ["?", "!", ".", "\n"]) |
|
|
| def _get_recent_strategies(self) -> str: |
| """Get recently successful strategies for context.""" |
| recent = sorted(self._signatures.values(), key=lambda e: e["ts"], reverse=True)[:5] |
| return ", ".join(e["strategy"] for e in recent if e["success_count"] > e["fail_count"]) |
|
|