Spaces:
Running
Running
ADAM v2.0: Advanced Agentic Mesh — DAG orchestrator, cognition, knowledge web, forge tools, runtime optimization
ba2ada2 | """ | |
| Predictive Engine — anticipates agent's next steps and pre-computes. | |
| Ultra-lightweight strategy prediction using pattern matching and | |
| past execution history. No ML framework needed — uses compressed | |
| execution signatures and similarity matching. | |
| """ | |
| import os | |
| import json | |
| import time | |
| import hashlib | |
| from typing import Optional | |
| from dataclasses import dataclass, field | |
| _MAX_SIGNATURES = int(os.getenv("ADAM_PREDICTIVE_SIGNATURES", "200")) | |
| class StrategyPrediction: | |
| strategy: str | |
| confidence: float | |
| execution_path: list[str] = field(default_factory=list) | |
| class PredictiveEngine: | |
| """ | |
| Predicts optimal strategies for goals based on past execution patterns. | |
| Maintains a compressed signature store of past goal→strategy mappings. | |
| Uses fast cosine similarity over hashed n-gram features. | |
| """ | |
| def __init__(self, llm_call_fn=None): | |
| self._llm = llm_call_fn | |
| self._signatures: dict[str, dict] = {} | |
| self._precomputed: dict[str, str] = {} | |
| async def predict_strategy(self, goal: str) -> Optional[StrategyPrediction]: | |
| """Predict the best strategy for a goal based on past executions.""" | |
| goal_sig = self._compute_signature(goal) | |
| # Fast exact match | |
| if goal_sig in self._signatures: | |
| entry = self._signatures[goal_sig] | |
| return StrategyPrediction( | |
| strategy=entry["strategy"], | |
| confidence=0.95, | |
| execution_path=entry.get("path", []), | |
| ) | |
| # Similarity match | |
| best_sim = 0.0 | |
| best_entry = None | |
| for sig, entry in self._signatures.items(): | |
| sim = self._jaccard_similarity(goal_sig, sig) | |
| if sim > best_sim: | |
| best_sim = sim | |
| best_entry = entry | |
| if best_sim > 0.6 and best_entry: | |
| return StrategyPrediction( | |
| strategy=best_entry["strategy"], | |
| confidence=best_sim, | |
| execution_path=best_entry.get("path", []), | |
| ) | |
| # LLM-based prediction for novel goals | |
| if self._llm and self._is_complex_goal(goal): | |
| return await self._llm_predict(goal) | |
| return None | |
| async def _llm_predict(self, goal: str) -> Optional[StrategyPrediction]: | |
| """Use LLM to predict strategy for novel goals.""" | |
| prompt = f"""Predict the optimal execution strategy for this goal. | |
| Choose ONE: code_forge, web_forge, api_forge, knowledge_forge, meta_forge, direct_reply | |
| Goal: {goal} | |
| Context: {self._get_recent_strategies()} | |
| Return JSON: {{"strategy": "...", "confidence": 0.0-1.0, "reasoning": "..."}} | |
| """ | |
| try: | |
| raw = await self._llm(prompt, model_hint="fast", max_tokens=200) | |
| import re, json as j | |
| match = re.search(r'\{[^{}]*\}', raw, re.DOTALL) | |
| if match: | |
| data = j.loads(match.group(0)) | |
| return StrategyPrediction( | |
| strategy=data.get("strategy", "code_forge"), | |
| confidence=float(data.get("confidence", 0.5)), | |
| ) | |
| except Exception: | |
| pass | |
| return None | |
| async def precompute_next(self, goal: str, completed_ids: set, node_results: dict): | |
| """Background: precompute likely next nodes while user waits.""" | |
| cache_key = f"{hashlib.md5(goal.encode()).hexdigest()}:{len(completed_ids)}" | |
| if cache_key in self._precomputed: | |
| return | |
| if self._llm and len(completed_ids) > 0: | |
| prompt = f"""Goal: {goal} | |
| Completed steps: {len(completed_ids)} | |
| What is the most likely next action needed? | |
| Respond with a single short phrase. | |
| """ | |
| try: | |
| result = await self._llm(prompt, model_hint="fast", max_tokens=50) | |
| self._precomputed[cache_key] = result | |
| except Exception: | |
| pass | |
| def record_strategy(self, goal: str, strategy: str, success: bool, latency_ms: int): | |
| """Record a strategy execution for future predictions.""" | |
| goal_sig = self._compute_signature(goal) | |
| if goal_sig not in self._signatures: | |
| if len(self._signatures) >= _MAX_SIGNATURES: | |
| # Evict oldest | |
| oldest = min(self._signatures.keys(), key=lambda k: self._signatures[k].get("ts", 0)) | |
| del self._signatures[oldest] | |
| self._signatures[goal_sig] = { | |
| "strategy": strategy, | |
| "success_count": 0, | |
| "fail_count": 0, | |
| "avg_latency": 0.0, | |
| "path": [], | |
| "ts": time.time(), | |
| } | |
| entry = self._signatures[goal_sig] | |
| entry["success_count"] += 1 if success else 0 | |
| entry["fail_count"] += 0 if success else 1 | |
| entry["avg_latency"] = (entry["avg_latency"] * 0.7 + latency_ms * 0.3) | |
| entry["ts"] = time.time() | |
| def _compute_signature(self, text: str) -> str: | |
| """Compute a compressed n-gram signature of the text.""" | |
| words = text.lower().split()[:20] | |
| ngrams = set() | |
| for i in range(len(words) - 1): | |
| ngrams.add(f"{words[i]}_{words[i+1]}") | |
| return "|".join(sorted(ngrams)) if ngrams else text[:50] | |
| def _jaccard_similarity(self, sig1: str, sig2: str) -> float: | |
| """Compute Jaccard similarity between two signatures.""" | |
| set1 = set(sig1.split("|")) | |
| set2 = set(sig2.split("|")) | |
| if not set1 and not set2: | |
| return 0.0 | |
| intersection = len(set1 & set2) | |
| union = len(set1 | set2) | |
| return intersection / union if union > 0 else 0.0 | |
| def _is_complex_goal(self, goal: str) -> bool: | |
| """Determine if a goal is complex enough to warrant LLM prediction.""" | |
| return len(goal) > 30 or any(c in goal for c in ["?", "!", ".", "\n"]) | |
| def _get_recent_strategies(self) -> str: | |
| """Get recently successful strategies for context.""" | |
| recent = sorted(self._signatures.values(), key=lambda e: e["ts"], reverse=True)[:5] | |
| return ", ".join(e["strategy"] for e in recent if e["success_count"] > e["fail_count"]) | |