""" AttributionTracer - Decision Provenance and Causal Tracing Framework This module implements the attribution tracing architecture that enables transparent decision provenance for all agents in the AGI-HEDGE-FUND system. Key capabilities: - Multi-level attribution across reasoning chains - Causal tracing from decision back to evidence - Confidence weighting of attribution factors - Value-weighted attribution alignment - Attribution visualization for interpretability Internal Note: The attribution tracer encodes the ECHO-ATTRIBUTION and ATTRIBUTION-REFLECT interpretability shells for causal path tracing and attribution transparency. """ import datetime import uuid import math from typing import Dict, List, Any, Optional, Tuple, Set import numpy as np from collections import defaultdict from pydantic import BaseModel, Field class AttributionEntry(BaseModel): """Single attribution entry linking a decision to a cause.""" id: str = Field(default_factory=lambda: str(uuid.uuid4())) source: str = Field(...) # Source ID (e.g., memory ID, evidence ID) source_type: str = Field(...) # Type of source (e.g., "memory", "evidence", "reasoning") target: str = Field(...) # Target ID (e.g., decision ID, reasoning step) weight: float = Field(default=1.0) # Attribution weight (0-1) confidence: float = Field(default=1.0) # Confidence in attribution (0-1) timestamp: datetime.datetime = Field(default_factory=datetime.datetime.now) description: Optional[str] = Field(default=None) # Optional attribution description value_alignment: Optional[float] = Field(default=None) # Alignment with agent values (0-1) class AttributionChain(BaseModel): """Chain of attribution entries forming a causal path.""" id: str = Field(default_factory=lambda: str(uuid.uuid4())) entries: List[AttributionEntry] = Field(default_factory=list) start_point: str = Field(...) # ID of chain origin end_point: str = Field(...) # ID of chain destination total_weight: float = Field(default=1.0) # Product of weights along chain confidence: float = Field(default=1.0) # Overall chain confidence timestamp: datetime.datetime = Field(default_factory=datetime.datetime.now) class AttributionGraph(BaseModel): """Complete attribution graph for a decision.""" id: str = Field(default_factory=lambda: str(uuid.uuid4())) decision_id: str = Field(...) # ID of the decision being attributed chains: List[AttributionChain] = Field(default_factory=list) sources: Dict[str, Dict[str, Any]] = Field(default_factory=dict) # Source metadata timestamp: datetime.datetime = Field(default_factory=datetime.datetime.now) def add_chain(self, chain: AttributionChain) -> None: """Add attribution chain to graph.""" self.chains.append(chain) def add_source(self, source_id: str, metadata: Dict[str, Any]) -> None: """Add source metadata to graph.""" self.sources[source_id] = metadata def calculate_source_contributions(self) -> Dict[str, float]: """Calculate normalized contribution of each source to decision.""" # Initialize contributions contributions = defaultdict(float) # Sum weights from all chains for chain in self.chains: for entry in chain.entries: # Add contribution weighted by chain confidence contributions[entry.source] += entry.weight * chain.confidence # Normalize contributions total = sum(contributions.values()) if total > 0: for source in contributions: contributions[source] /= total return dict(contributions) class AttributionTracer: """ Attribution tracing engine for causal decision provenance. Enables: - Tracing the causal path from decisions back to evidence - Weighting attribution factors by confidence and relevance - Aligning attribution with agent value system - Visualizing attribution patterns for interpretability """ def __init__(self): """Initialize attribution tracer.""" self.attribution_history: Dict[str, AttributionGraph] = {} self.trace_registry: Dict[str, Dict[str, Any]] = {} self.value_weights: Dict[str, float] = {} def trace_attribution(self, signal: Dict[str, Any], agent_state: Dict[str, Any], reasoning_depth: int = 3) -> Dict[str, Any]: """ Trace attribution for a decision signal. Args: signal: Decision signal agent_state: Agent's current state reasoning_depth: Depth of attribution tracing Returns: Attribution trace results """ # Generate decision ID if not present decision_id = signal.get("signal_id", str(uuid.uuid4())) # Create attribution graph attribution_graph = AttributionGraph( decision_id=decision_id, ) # Extract signal components for attribution ticker = signal.get("ticker", "") action = signal.get("action", "") confidence = signal.get("confidence", 0.5) reasoning = signal.get("reasoning", "") intent = signal.get("intent", "") value_basis = signal.get("value_basis", "") # Extract evidence sources from agent state evidence_sources = self._extract_evidence_sources(agent_state, ticker, action) # Process reasoning to extract reasoning steps reasoning_steps = self._extract_reasoning_steps(reasoning) # Generate attribution chains chains = self._generate_attribution_chains( decision_id=decision_id, evidence_sources=evidence_sources, reasoning_steps=reasoning_steps, intent=intent, value_basis=value_basis, confidence=confidence, reasoning_depth=reasoning_depth ) # Add chains to graph for chain in chains: attribution_graph.add_chain(chain) # Add source metadata for source_id, metadata in evidence_sources.items(): attribution_graph.add_source(source_id, metadata) # Calculate source contributions source_contributions = attribution_graph.calculate_source_contributions() # Store in history # Store in history self.attribution_history[decision_id] = attribution_graph # Prepare result trace_id = str(uuid.uuid4()) # Store trace in registry self.trace_registry[trace_id] = { "attribution_graph": attribution_graph, "decision_id": decision_id, "timestamp": datetime.datetime.now(), } # Create attribution trace output attribution_trace = { "trace_id": trace_id, "decision_id": decision_id, "attribution_map": source_contributions, "confidence": confidence, "top_factors": self._get_top_attribution_factors(source_contributions, 5), "value_alignment": self._calculate_value_alignment(value_basis, source_contributions), "reasoning_depth": reasoning_depth, "timestamp": datetime.datetime.now().isoformat(), } return attribution_trace def _extract_evidence_sources(self, agent_state: Dict[str, Any], ticker: str, action: str) -> Dict[str, Dict[str, Any]]: """ Extract evidence sources from agent state. Args: agent_state: Agent's current state ticker: Stock ticker action: Decision action Returns: Dictionary of evidence sources """ evidence_sources = {} # Extract from belief state belief_state = agent_state.get("belief_state", {}) if ticker in belief_state: source_id = f"belief:{ticker}" evidence_sources[source_id] = { "type": "belief", "ticker": ticker, "value": belief_state[ticker], "description": f"Belief about {ticker}", } # Extract from working memory working_memory = agent_state.get("working_memory", {}) # Check for ticker-specific data in working memory if ticker in working_memory: source_id = f"working_memory:{ticker}" evidence_sources[source_id] = { "type": "working_memory", "ticker": ticker, "data": working_memory[ticker], "description": f"Current analysis of {ticker}", } # Extract from performance trace if action is based on past performance performance_trace = agent_state.get("performance_trace", {}) if ticker in performance_trace: source_id = f"performance:{ticker}" evidence_sources[source_id] = { "type": "performance", "ticker": ticker, "performance": performance_trace[ticker], "description": f"Performance history of {ticker}", } # Extract from decision history decision_history = agent_state.get("decision_history", []) for i, decision in enumerate(decision_history): if decision.get("ticker") == ticker and decision.get("action") == action: source_id = f"past_decision:{i}:{ticker}" evidence_sources[source_id] = { "type": "past_decision", "ticker": ticker, "action": action, "decision": decision, "description": f"Past {action} decision for {ticker}", } return evidence_sources def _extract_reasoning_steps(self, reasoning: str) -> List[Dict[str, Any]]: """ Extract reasoning steps from reasoning string. Args: reasoning: Reasoning string Returns: List of reasoning steps """ # Simple implementation: split by periods or line breaks sentences = [s.strip() for s in reasoning.replace('\n', '. ').split('.') if s.strip()] reasoning_steps = [] for i, sentence in enumerate(sentences): step_id = f"step:{i}" reasoning_steps.append({ "id": step_id, "text": sentence, "position": i, "type": "reasoning_step", }) return reasoning_steps def _generate_attribution_chains(self, decision_id: str, evidence_sources: Dict[str, Dict[str, Any]], reasoning_steps: List[Dict[str, Any]], intent: str, value_basis: str, confidence: float, reasoning_depth: int) -> List[AttributionChain]: """ Generate attribution chains linking decision to evidence. Args: decision_id: Decision ID evidence_sources: Evidence sources reasoning_steps: Reasoning steps intent: Decision intent value_basis: Value basis for decision confidence: Decision confidence reasoning_depth: Depth of attribution tracing Returns: List of attribution chains """ attribution_chains = [] # Define end point (the decision itself) end_point = decision_id # Case 1: Direct evidence -> decision chains for source_id, source_data in evidence_sources.items(): # Create entry linking evidence directly to decision entry = AttributionEntry( source=source_id, source_type=source_data.get("type", "evidence"), target=decision_id, weight=self._calculate_evidence_weight(source_data, confidence), confidence=confidence, description=f"Direct influence of {source_data.get('description', source_id)} on decision", ) # Create chain chain = AttributionChain( entries=[entry], start_point=source_id, end_point=end_point, total_weight=entry.weight, confidence=entry.confidence, ) attribution_chains.append(chain) # Case 2: Evidence -> reasoning -> decision chains if reasoning_steps: # For each evidence source for source_id, source_data in evidence_sources.items(): # For relevant reasoning steps (limited by depth) for step in reasoning_steps[:reasoning_depth]: # Create entry linking evidence to reasoning step step_entry = AttributionEntry( source=source_id, source_type=source_data.get("type", "evidence"), target=step["id"], weight=self._calculate_step_relevance(source_data, step), confidence=confidence * 0.9, # Slightly lower confidence for indirect paths description=f"Influence of {source_data.get('description', source_id)} on reasoning step", ) # Create entry linking reasoning step to decision decision_entry = AttributionEntry( source=step["id"], source_type="reasoning_step", target=decision_id, weight=self._calculate_step_importance(step, len(reasoning_steps)), confidence=confidence, description=f"Influence of reasoning step on decision", ) # Create chain chain = AttributionChain( entries=[step_entry, decision_entry], start_point=source_id, end_point=end_point, total_weight=step_entry.weight * decision_entry.weight, confidence=min(step_entry.confidence, decision_entry.confidence), ) attribution_chains.append(chain) # Case 3: Intent/value -> decision chains if intent: intent_id = f"intent:{intent[:20]}" intent_entry = AttributionEntry( source=intent_id, source_type="intent", target=decision_id, weight=0.8, # High weight for intent confidence=confidence, description=f"Influence of stated intent on decision", ) intent_chain = AttributionChain( entries=[intent_entry], start_point=intent_id, end_point=end_point, total_weight=intent_entry.weight, confidence=intent_entry.confidence, ) attribution_chains.append(intent_chain) if value_basis: value_id = f"value:{value_basis[:20]}" value_entry = AttributionEntry( source=value_id, source_type="value", target=decision_id, weight=0.9, # Very high weight for value basis confidence=confidence, description=f"Influence of value basis on decision", value_alignment=1.0, # Perfect alignment with its own value ) value_chain = AttributionChain( entries=[value_entry], start_point=value_id, end_point=end_point, total_weight=value_entry.weight, confidence=value_entry.confidence, ) attribution_chains.append(value_chain) return attribution_chains def _calculate_evidence_weight(self, evidence: Dict[str, Any], base_confidence: float) -> float: """ Calculate weight of evidence. Args: evidence: Evidence data base_confidence: Base confidence level Returns: Evidence weight """ # Default weight weight = 0.5 # Adjust based on evidence type evidence_type = evidence.get("type", "") if evidence_type == "belief": # Weight based on belief strength (0.5-1.0) belief_value = evidence.get("value", 0.5) weight = 0.5 + (abs(belief_value - 0.5) * 0.5) elif evidence_type == "working_memory": # Working memory has high weight weight = 0.8 elif evidence_type == "performance": # Performance data moderately important weight = 0.7 elif evidence_type == "past_decision": # Past decisions less important weight = 0.6 # Scale by confidence weight *= base_confidence return min(1.0, weight) def _calculate_step_relevance(self, evidence: Dict[str, Any], step: Dict[str, Any]) -> float: """ Calculate relevance of evidence to reasoning step. Args: evidence: Evidence data step: Reasoning step Returns: Relevance weight """ # Basic implementation using text overlap evidence_desc = evidence.get("description", "") step_text = step.get("text", "") # Check for ticker mention ticker = evidence.get("ticker", "") if ticker and ticker in step_text: return 0.8 # Check for word overlap evidence_words = set(evidence_desc.lower().split()) step_words = set(step_text.lower().split()) overlap = len(evidence_words.intersection(step_words)) total_words = len(evidence_words.union(step_words)) if total_words > 0: overlap_ratio = overlap / total_words return min(1.0, 0.5 + overlap_ratio) return 0.5 def _calculate_step_importance(self, step: Dict[str, Any], total_steps: int) -> float: """ Calculate importance of reasoning step. Args: step: Reasoning step total_steps: Total number of steps Returns: Importance weight """ # Position-based importance (later steps slightly more important) position = step.get("position", 0) position_weight = 0.5 + (position / (2 * total_steps)) if total_steps > 0 else 0.5 # Length-based importance (longer steps slightly more important) text = step.get("text", "") length = len(text) length_weight = min(1.0, 0.5 + (length / 200)) # Cap at 1.0 # Combine weights return (position_weight * 0.7) + (length_weight * 0.3) def _get_top_attribution_factors(self, source_contributions: Dict[str, float], limit: int = 5) -> List[Dict[str, Any]]: """ Get top attribution factors. Args: source_contributions: Source contribution dictionary limit: Maximum number of factors to return Returns: List of top attribution factors """ # Sort contributions by weight (descending) sorted_contributions = sorted( source_contributions.items(), key=lambda x: x[1], reverse=True ) # Take top 'limit' contributions top_factors = [] for source, weight in sorted_contributions[:limit]: # Parse source type from ID source_type = source.split(":", 1)[0] if ":" in source else "unknown" top_factors.append({ "source": source, "type": source_type, "weight": weight, }) return top_factors def _calculate_value_alignment(self, value_basis: str, source_contributions: Dict[str, float]) -> float: """ Calculate value alignment score. Args: value_basis: Value basis string source_contributions: Source contribution dictionary Returns: Value alignment score """ # Simple implementation: check if value sources have high contribution value_alignment = 0.5 # Default neutral alignment # Find value-based sources value_sources = [source for source in source_contributions if source.startswith("value:")] if value_sources: # Calculate contribution of value sources value_contribution = sum(source_contributions[source] for source in value_sources) # Value alignment increases with value contribution value_alignment = 0.5 + (value_contribution * 0.5) return min(1.0, value_alignment) def get_trace(self, trace_id: str) -> Optional[Dict[str, Any]]: """ Get attribution trace by ID. Args: trace_id: Trace ID Returns: Attribution trace or None if not found """ if trace_id not in self.trace_registry: return None trace_data = self.trace_registry[trace_id] attribution_graph = trace_data.get("attribution_graph") if not attribution_graph: return None # Calculate source contributions source_contributions = attribution_graph.calculate_source_contributions() # Create attribution trace output attribution_trace = { "trace_id": trace_id, "decision_id": attribution_graph.decision_id, "attribution_map": source_contributions, "top_factors": self._get_top_attribution_factors(source_contributions, 5), "chains": len(attribution_graph.chains), "sources": len(attribution_graph.sources), "timestamp": trace_data.get("timestamp", datetime.datetime.now()).isoformat(), } return attribution_trace def get_decision_traces(self, decision_id: str) -> List[str]: """ Get trace IDs for a decision. Args: decision_id: Decision ID Returns: List of trace IDs """ return [trace_id for trace_id, trace_data in self.trace_registry.items() if trace_data.get("decision_id") == decision_id] def visualize_attribution(self, trace_id: str) -> Dict[str, Any]: """ Generate attribution visualization data. Args: trace_id: Trace ID Returns: Visualization data """ if trace_id not in self.trace_registry: return {"error": "Trace not found"} trace_data = self.trace_registry[trace_id] attribution_graph = trace_data.get("attribution_graph") if not attribution_graph: return {"error": "Attribution graph not found"} # Create nodes and links for visualization nodes = [] links = [] # Add decision node decision_id = attribution_graph.decision_id nodes.append({ "id": decision_id, "type": "decision", "label": "Decision", "size": 15, }) # Process all chains for chain_idx, chain in enumerate(attribution_graph.chains): # Add source node if not already added source_id = chain.start_point if not any(node["id"] == source_id for node in nodes): # Determine source type source_type = "unknown" if source_id.startswith("belief:"): source_type = "belief" elif source_id.startswith("working_memory:"): source_type = "working_memory" elif source_id.startswith("performance:"): source_type = "performance" elif source_id.startswith("past_decision:"): source_type = "past_decision" elif source_id.startswith("intent:"): source_type = "intent" elif source_id.startswith("value:"): source_type = "value" # Add source node nodes.append({ "id": source_id, "type": source_type, "label": source_id.split(":", 1)[1] if ":" in source_id else source_id, "size": 10, }) # Process chain entries prev_node_id = None for entry_idx, entry in enumerate(chain.entries): source_node_id = entry.source target_node_id = entry.target # Add intermediate nodes if not already added if entry.source_type == "reasoning_step" and not any(node["id"] == source_node_id for node in nodes): nodes.append({ "id": source_node_id, "type": "reasoning_step", "label": f"Step {source_node_id.split(':', 1)[1] if ':' in source_node_id else source_node_id}", "size": 8, }) # Add link links.append({ "source": source_node_id, "target": target_node_id, "value": entry.weight, "confidence": entry.confidence, "label": entry.description if entry.description else f"Weight: {entry.weight:.2f}", }) prev_node_id = target_node_id # Create visualization data visualization = { "nodes": nodes, "links": links, "trace_id": trace_id, "decision_id": decision_id, } return visualization def set_value_weights(self, value_weights: Dict[str, float]) -> None: """ Set weights for different values. Args: value_weights: Dictionary mapping value names to weights """ self.value_weights = value_weights.copy() def clear_history(self, before_timestamp: Optional[datetime.datetime] = None) -> int: """ Clear attribution history. Args: before_timestamp: Optional timestamp to clear history before Returns: Number of entries cleared """ if before_timestamp is None: # Clear all history count = len(self.attribution_history) self.attribution_history = {} self.trace_registry = {} return count # Clear history before timestamp to_remove_history = [] to_remove_registry = [] for decision_id, graph in self.attribution_history.items(): if graph.timestamp < before_timestamp: to_remove_history.append(decision_id) for trace_id, trace_data in self.trace_registry.items(): if trace_data.get("timestamp", datetime.datetime.now()) < before_timestamp: to_remove_registry.append(trace_id) # Remove from history for decision_id in to_remove_history: del self.attribution_history[decision_id] # Remove from registry for trace_id in to_remove_registry: del self.trace_registry[trace_id] return len(to_remove_history) + len(to_remove_registry)