"""
BaseAgent - Recursive Cognitive Agent Framework for AGI-HEDGE-FUND
This module implements the foundational cognitive architecture for all investment agents.
Each agent inherits from this base class to enable:
- Recursive reasoning loops with customizable depth
- Transparent attribution tracing for decision provenance
- Temporal memory shell with configurable decay
- Value-weighted decision encoding
- Symbolic state representation for interpretability
Internal Note: Base class encodes the recursive interpretability interface (.p/ command patterns)
in line with Anthropic-style Circuit Tracing research while maintaining familiar investment syntax.
"""
import uuid
import datetime
from typing import Dict, List, Any, Optional, Tuple
import numpy as np
from pydantic import BaseModel, Field
from ..cognition.graph import ReasoningGraph
from ..cognition.memory import MemoryShell
from ..cognition.attribution import AttributionTracer
from ..llm.router import ModelRouter
from ..utils.diagnostics import TracingTools
class AgentSignal(BaseModel):
    """Signal generated by an agent with full attribution and confidence metrics."""
    # Core trade instruction.
    ticker: str = Field(..., description="Stock ticker symbol")
    # NOTE(review): action is free-form text; nothing here restricts it to
    # buy/sell/hold — confirm downstream consumers validate it.
    action: str = Field(..., description="buy, sell, or hold")
    confidence: float = Field(..., description="Confidence level (0.0-1.0)")
    quantity: Optional[int] = Field(None, description="Number of shares to trade")
    # Interpretability payload: explicit reasoning plus the value/intent frame
    # that produced the decision.
    reasoning: str = Field(..., description="Explicit reasoning chain")
    intent: str = Field(..., description="High-level investment intent")
    value_basis: str = Field(..., description="Core value driving this decision")
    # Causal attribution weights and belief-drift metrics attached by
    # BaseAgent.attribute_signals.
    attribution_trace: Dict[str, float] = Field(default_factory=dict, description="Causal attribution weights")
    drift_signature: Dict[str, float] = Field(default_factory=dict, description="Belief drift metrics")
    # Provenance: unique id and creation time (naive local time — TODO confirm
    # whether UTC is expected elsewhere).
    signal_id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Unique signal identifier")
    timestamp: datetime.datetime = Field(default_factory=datetime.datetime.now)
class AgentState(BaseModel):
    """Persistent agent state with temporal memory and belief dynamics."""
    # Short-term scratch space for the current reasoning cycle.
    working_memory: Dict[str, Any] = Field(default_factory=dict, description="Short-term memory")
    # ticker -> belief in [0, 1]; updated by BaseAgent._update_beliefs with an
    # exponential blend of prior belief and tanh-squashed performance.
    belief_state: Dict[str, float] = Field(default_factory=dict, description="Current belief distribution")
    confidence_history: List[float] = Field(default_factory=list, description="Historical confidence")
    decision_history: List[Dict[str, Any]] = Field(default_factory=list, description="Past decisions")
    performance_trace: Dict[str, float] = Field(default_factory=dict, description="Performance metrics")
    reflective_state: Dict[str, Any] = Field(default_factory=dict, description="Self-awareness metrics")
    # ticker -> signed change in belief from the most recent update.
    drift_vector: Dict[str, float] = Field(default_factory=dict, description="Direction of belief evolution")
    # Moving average in [-1, 1]; 1.0 means stated direction always matched outcome.
    consistency_score: float = Field(default=1.0, description="Internal consistency metric")
    last_update: datetime.datetime = Field(default_factory=datetime.datetime.now)
class BaseAgent:
    """
    Base class for all investment agents in the AGI-HEDGE-FUND framework.

    Implements the core cognitive architecture including:
    - Recursive reasoning loops
    - Memory persistence
    - Attribution tracing
    - Value-weighted decision making
    - Symbolic state representation
    """

    def __init__(
        self,
        name: str,
        philosophy: str,
        reasoning_depth: int = 3,
        memory_decay: float = 0.2,
        initial_capital: float = 100000.0,
        model_provider: str = "anthropic",
        model_name: str = "claude-3-sonnet-20240229",
        trace_enabled: bool = False,
    ):
        """
        Initialize a cognitive investment agent.

        Args:
            name: Agent identifier name
            philosophy: Investment philosophy description
            reasoning_depth: Depth of recursive reasoning (higher = deeper thinking)
            memory_decay: Rate of memory deterioration (0.0-1.0)
            initial_capital: Starting capital amount
            model_provider: LLM provider ("anthropic", "openai", "groq", "ollama", "deepseek")
            model_name: Specific model identifier
            trace_enabled: Whether to generate full reasoning traces
        """
        self.id = str(uuid.uuid4())
        self.name = name
        self.philosophy = philosophy
        self.reasoning_depth = reasoning_depth
        self.memory_decay = memory_decay
        self.initial_capital = initial_capital
        self.current_capital = initial_capital
        self.trace_enabled = trace_enabled

        # Initialize cognitive components
        self.state = AgentState()
        self.memory_shell = MemoryShell(decay_rate=memory_decay)
        self.attribution_tracer = AttributionTracer()
        self.llm = ModelRouter(provider=model_provider, model=model_name)

        # Initialize reasoning graph
        self.reasoning_graph = ReasoningGraph(
            agent_name=self.name,
            agent_philosophy=self.philosophy,
            model_router=self.llm,
        )

        # Diagnostics and tracing
        self.tracer = TracingTools(agent_id=self.id, agent_name=self.name)

        # Internal symbolic processing commands (dispatch table for execute_command)
        self._commands = {
            "reflect.trace": self._reflect_trace,
            "fork.signal": self._fork_signal,
            "collapse.detect": self._collapse_detect,
            "attribute.weight": self._attribute_weight,
            "drift.observe": self._drift_observe,
        }

    def process_market_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process market data through agent's cognitive lens.

        Args:
            data: Market data dictionary

        Returns:
            Processed market data with agent-specific insights
        """
        # Each agent subclass implements its unique market interpretation
        raise NotImplementedError("Agent subclasses must implement process_market_data")

    # Return annotation is a string forward reference so this class can be
    # defined independently of AgentSignal's definition order.
    def generate_signals(self, processed_data: Dict[str, Any]) -> List["AgentSignal"]:
        """
        Generate investment signals based on processed market data.

        Args:
            processed_data: Processed market data

        Returns:
            List of investment signals with attribution
        """
        # Each agent subclass implements its unique signal generation logic
        raise NotImplementedError("Agent subclasses must implement generate_signals")

    def update_state(self, market_feedback: Dict[str, Any]) -> None:
        """
        Update agent's internal state based on market feedback.

        Args:
            market_feedback: Dictionary containing market performance data
        """
        # Update memory shell with new experiences
        self.memory_shell.add_experience(market_feedback)

        # Update belief state based on market feedback
        self._update_beliefs(market_feedback)

        # Update performance metrics
        self._update_performance_metrics(market_feedback)

        # Apply memory decay
        self.memory_shell.apply_decay()

        # Update timestamp
        self.state.last_update = datetime.datetime.now()

    def _update_beliefs(self, market_feedback: Dict[str, Any]) -> None:
        """
        Update agent's belief state based on market feedback.

        Args:
            market_feedback: Dictionary containing market performance data
        """
        # Extract relevant signals from market feedback
        if 'performance' in market_feedback:
            performance = market_feedback['performance']

            # Record decision outcomes
            if 'decisions' in market_feedback:
                for decision in market_feedback['decisions']:
                    self.state.decision_history.append({
                        'decision': decision,
                        'outcome': performance.get(decision.get('ticker'), 0),
                        'timestamp': datetime.datetime.now()
                    })

            # Update belief state based on performance
            for ticker, perf in performance.items():
                current_belief = self.state.belief_state.get(ticker, 0.5)
                # Belief update formula combines prior belief with new evidence;
                # tanh squashes raw performance into (-1, 1).
                updated_belief = (current_belief * (1 - self.memory_decay) +
                                  np.tanh(perf) * self.memory_decay)
                self.state.belief_state[ticker] = updated_belief
                # Track belief drift unconditionally — the previous membership
                # check was always true right after the assignment above.
                self.state.drift_vector[ticker] = updated_belief - current_belief

    def _update_performance_metrics(self, market_feedback: Dict[str, Any]) -> None:
        """
        Update agent's performance metrics based on market feedback.

        Args:
            market_feedback: Dictionary containing market performance data
        """
        if 'portfolio_value' in market_feedback:
            # Calculate period return, guarding against a zero prior value
            portfolio_value = market_feedback['portfolio_value']
            prior_value = self.state.performance_trace.get('portfolio_value', self.initial_capital)
            returns = (portfolio_value - prior_value) / prior_value if prior_value > 0 else 0

            # Update performance trace
            self.state.performance_trace.update({
                'portfolio_value': portfolio_value,
                'returns': returns,
                'cumulative_return': (portfolio_value / self.initial_capital) - 1,
                'win_rate': market_feedback.get('win_rate', 0),
                'sharpe_ratio': market_feedback.get('sharpe_ratio', 0),
            })

            # Update current capital
            self.current_capital = portfolio_value

        # Add to confidence history
        avg_confidence = market_feedback.get('avg_confidence', 0.5)
        self.state.confidence_history.append(avg_confidence)

        # Update consistency score
        self._update_consistency_score(market_feedback)

    def _update_consistency_score(self, market_feedback: Dict[str, Any]) -> None:
        """
        Update agent's internal consistency score.

        Args:
            market_feedback: Dictionary containing market performance data
        """
        # Measure consistency between stated reasoning and actual performance
        if 'decisions' in market_feedback and 'performance' in market_feedback:
            performance = market_feedback['performance']
            decision_consistency = []

            for decision in market_feedback['decisions']:
                action = decision.get('action')
                # 'hold' (and unknown actions) make no directional bet, so they
                # can't be scored for direction. The old code treated every
                # non-buy action as a short, penalizing holds in rising markets.
                if action not in ('buy', 'sell'):
                    continue
                ticker = decision.get('ticker')
                expected_direction = 1 if action == 'buy' else -1
                actual_performance = performance.get(ticker, 0)
                actual_direction = 1 if actual_performance > 0 else -1
                # Consistency is 1 when directions match, -1 when they don't
                decision_consistency.append(
                    1 if expected_direction == actual_direction else -1
                )

            # Update consistency score (exponential moving average, 0.8/0.2)
            if decision_consistency:
                avg_consistency = sum(decision_consistency) / len(decision_consistency)
                current_consistency = self.state.consistency_score
                self.state.consistency_score = (current_consistency * 0.8) + (avg_consistency * 0.2)

    def attribute_signals(self, signals: List[Dict[str, Any]]) -> List["AgentSignal"]:
        """
        Add attribution traces to raw signals.

        Args:
            signals: Raw investment signals

        Returns:
            Signals with attribution traces
        """
        attributed_signals = []

        for signal in signals:
            # Run attribution tracing
            attribution = self.attribution_tracer.trace_attribution(
                signal=signal,
                agent_state=self.state,
                reasoning_depth=self.reasoning_depth
            )

            # Create AgentSignal with attribution. Snapshot the drift vector —
            # storing the live dict would let later belief updates retroactively
            # mutate already-emitted signals.
            agent_signal = AgentSignal(
                ticker=signal.get('ticker', ''),
                action=signal.get('action', 'hold'),
                confidence=signal.get('confidence', 0.5),
                quantity=signal.get('quantity'),
                reasoning=signal.get('reasoning', ''),
                intent=signal.get('intent', ''),
                value_basis=signal.get('value_basis', ''),
                attribution_trace=attribution.get('attribution_trace', {}),
                drift_signature=dict(self.state.drift_vector),
            )

            attributed_signals.append(agent_signal)

            # Record in tracer if enabled
            if self.trace_enabled:
                self.tracer.record_signal(agent_signal)

        return attributed_signals

    def reset(self) -> None:
        """Reset agent to initial state while preserving memory decay pattern."""
        # Reset capital
        self.current_capital = self.initial_capital

        # Reset state but preserve some learning
        belief_state = self.state.belief_state.copy()
        drift_vector = self.state.drift_vector.copy()

        # Create new state
        self.state = AgentState(
            belief_state=belief_state,
            drift_vector=drift_vector,
        )

        # Apply memory decay to preserved beliefs
        for key in self.state.belief_state:
            self.state.belief_state[key] *= (1 - self.memory_decay)

    def get_state_report(self) -> Dict[str, Any]:
        """
        Generate a detailed report of agent's current state.

        Returns:
            Dictionary containing agent state information
        """
        return {
            'agent_id': self.id,
            'agent_name': self.name,
            'philosophy': self.philosophy,
            'reasoning_depth': self.reasoning_depth,
            'capital': self.current_capital,
            'belief_state': self.state.belief_state,
            'performance': self.state.performance_trace,
            'consistency': self.state.consistency_score,
            'decision_count': len(self.state.decision_history),
            # Average over the last 10 observations; neutral 0.5 when empty
            'avg_confidence': np.mean(self.state.confidence_history[-10:]) if self.state.confidence_history else 0.5,
            'drift_vector': self.state.drift_vector,
            'memory_decay': self.memory_decay,
            'timestamp': datetime.datetime.now(),
        }

    def save_state(self, filepath: Optional[str] = None) -> Dict[str, Any]:
        """
        Save agent state to file or return serializable state.

        Args:
            filepath: Optional path to save state

        Returns:
            Dictionary containing serializable agent state
        """
        state_dict = {
            'agent_id': self.id,
            'agent_name': self.name,
            'agent_state': self.state.dict(),
            'memory_shell': self.memory_shell.export_state(),
            'reasoning_depth': self.reasoning_depth,
            'memory_decay': self.memory_decay,
            'current_capital': self.current_capital,
            'initial_capital': self.initial_capital,
            'timestamp': datetime.datetime.now().isoformat(),
        }

        if filepath:
            import json
            with open(filepath, 'w') as f:
                # default=str stringifies the datetime objects nested inside
                # agent_state (last_update, decision_history timestamps), which
                # previously made json.dump raise TypeError.
                json.dump(state_dict, f, default=str)

        return state_dict

    def load_state(self, state_dict: Dict[str, Any]) -> None:
        """
        Load agent state from dictionary.

        Args:
            state_dict: Dictionary containing agent state

        Raises:
            ValueError: If the saved agent_id does not match this agent
                (a missing key also raises ValueError, not KeyError).
        """
        if state_dict.get('agent_id') != self.id:
            raise ValueError(f"State ID mismatch: {state_dict.get('agent_id')} vs {self.id}")

        # Update basic attributes
        self.reasoning_depth = state_dict.get('reasoning_depth', self.reasoning_depth)
        self.memory_decay = state_dict.get('memory_decay', self.memory_decay)
        self.current_capital = state_dict.get('current_capital', self.current_capital)
        self.initial_capital = state_dict.get('initial_capital', self.initial_capital)

        # Load agent state
        if 'agent_state' in state_dict:
            self.state = AgentState.parse_obj(state_dict['agent_state'])

        # Load memory shell
        if 'memory_shell' in state_dict:
            self.memory_shell.import_state(state_dict['memory_shell'])

    # Internal symbolic command processors

    def _reflect_trace(self, agent=None, depth=3) -> Dict[str, Any]:
        """
        Trace agent's self-reflection on decision making process.

        Args:
            agent: Optional agent name to reflect on (self if None)
            depth: Recursion depth for reflection

        Returns:
            Reflection trace results
        """
        # If reflecting on self
        if agent is None or agent == self.name:
            reflection = self.reasoning_graph.run_reflection(
                agent_state=self.state,
                depth=depth,
                trace_enabled=self.trace_enabled
            )
            return {
                'source': 'self',
                'reflection': reflection,
                'depth': depth,
                'confidence': reflection.get('confidence', 0.5),
                'timestamp': datetime.datetime.now(),
            }

        # Otherwise, this is a request to reflect on another agent (handled by portfolio manager)
        return {
            'source': 'external',
            'target': agent,
            'depth': depth,
            'status': 'delegated',
            'timestamp': datetime.datetime.now(),
        }

    def _fork_signal(self, source) -> Dict[str, Any]:
        """
        Fork a new signal branch from specified source.

        Args:
            source: Source to fork signal from ('memory' or 'beliefs')

        Returns:
            Forked signal results
        """
        if source == 'memory':
            # Fork from memory
            experiences = self.memory_shell.get_relevant_experiences(limit=3)
            forked_signals = self.reasoning_graph.generate_from_experiences(experiences)
            return {
                'source': 'memory',
                'signals': forked_signals,
                'count': len(forked_signals),
                'timestamp': datetime.datetime.now(),
            }
        elif source == 'beliefs':
            # Fork from current beliefs: the 5 beliefs farthest from the
            # neutral 0.5 prior (strongest convictions either way)
            top_beliefs = dict(sorted(
                self.state.belief_state.items(),
                key=lambda x: abs(x[1] - 0.5),
                reverse=True
            )[:5])
            forked_signals = self.reasoning_graph.generate_from_beliefs(top_beliefs)
            return {
                'source': 'beliefs',
                'signals': forked_signals,
                'count': len(forked_signals),
                'timestamp': datetime.datetime.now(),
            }
        else:
            # Unknown source
            return {
                'source': source,
                'signals': [],
                'count': 0,
                'error': 'unknown_source',
                'timestamp': datetime.datetime.now(),
            }

    def _collapse_detect(self, threshold=0.7, reason=None) -> Dict[str, Any]:
        """
        Detect potential decision collapse or inconsistency.

        Args:
            threshold: Consistency threshold below which to trigger collapse detection
            reason: Optional specific reason to check for collapse
                ('consistency', 'confidence', 'belief_drift', or 'performance')

        Returns:
            Collapse detection results
        """
        # Check consistency score
        consistency_collapse = self.state.consistency_score < threshold

        # Check for specific collapses
        collapses = {
            'consistency': consistency_collapse,
            # Confidence collapse needs at least 5 observations to be meaningful
            'confidence': np.mean(self.state.confidence_history[-5:]) < threshold if len(self.state.confidence_history) >= 5 else False,
            'belief_drift': any(abs(drift) > 0.3 for drift in self.state.drift_vector.values()),
            'performance': self.state.performance_trace.get('returns', 0) < -0.1 if self.state.performance_trace else False,
        }

        # If specific reason provided, check only that
        if reason and reason in collapses:
            collapse_detected = collapses[reason]
            collapse_reasons = {reason: collapses[reason]}
        else:
            # Check all collapses
            collapse_detected = any(collapses.values())
            collapse_reasons = {k: v for k, v in collapses.items() if v}

        return {
            'collapse_detected': collapse_detected,
            'collapse_reasons': collapse_reasons,
            'consistency_score': self.state.consistency_score,
            'threshold': threshold,
            'timestamp': datetime.datetime.now(),
        }

    def _attribute_weight(self, justification) -> Dict[str, Any]:
        """
        Compute attribution weight for a specific justification.

        Args:
            justification: Justification to weight

        Returns:
            Attribution weight results
        """
        # Extract key themes from justification
        themes = self.reasoning_graph.extract_themes(justification)

        # Compute alignment with agent's philosophy
        philosophy_alignment = self.reasoning_graph.compute_alignment(
            themes=themes,
            philosophy=self.philosophy
        )

        # Weight scales philosophy alignment by how internally consistent the
        # agent has been so far
        weight = philosophy_alignment * self.state.consistency_score

        return {
            'justification': justification,
            'themes': themes,
            'philosophy_alignment': philosophy_alignment,
            'consistency_factor': self.state.consistency_score,
            'attribution_weight': weight,
            'timestamp': datetime.datetime.now(),
        }

    def _drift_observe(self, vector, bias=0.0) -> Dict[str, Any]:
        """
        Observe and record belief drift.

        Args:
            vector: Drift vector to observe (mapping of key -> drift value)
            bias: Optional bias adjustment added to every updated entry

        Returns:
            Drift observation results
        """
        # Record drift observation
        for key, value in vector.items():
            if key in self.state.drift_vector:
                # Existing key: blend old (0.7) and new (0.3), then add bias
                self.state.drift_vector[key] = (self.state.drift_vector[key] * 0.7) + (value * 0.3) + bias
            else:
                # New key
                self.state.drift_vector[key] = value + bias

        # Compute drift magnitude (Euclidean norm over all tracked keys)
        drift_magnitude = np.sqrt(sum(v**2 for v in self.state.drift_vector.values()))

        return {
            'drift_vector': self.state.drift_vector,
            'drift_magnitude': drift_magnitude,
            'bias_applied': bias,
            'observation_count': len(vector),
            'timestamp': datetime.datetime.now(),
        }

    def execute_command(self, command: str, **kwargs) -> Dict[str, Any]:
        """
        Execute an internal symbolic command.

        Args:
            command: Command to execute (key in the internal dispatch table)
            **kwargs: Command parameters

        Returns:
            Command execution results, or an error payload listing the
            available commands when the command is unknown
        """
        if command in self._commands:
            return self._commands[command](**kwargs)
        else:
            return {
                'error': 'unknown_command',
                'command': command,
                'available_commands': list(self._commands.keys()),
                'timestamp': datetime.datetime.now(),
            }

    def __repr__(self) -> str:
        """Generate string representation of agent."""
        return f"{self.name} Agent (ID: {self.id[:8]}, Philosophy: {self.philosophy})"

    def __str__(self) -> str:
        """Generate human-readable string for agent."""
        return f"{self.name} Agent: {self.philosophy} (Depth: {self.reasoning_depth})"