| | """ |
| | AttributionTracer - Decision Provenance and Causal Tracing Framework |
| | |
| | This module implements the attribution tracing architecture that enables |
| | transparent decision provenance for all agents in the AGI-HEDGE-FUND system. |
| | |
| | Key capabilities: |
| | - Multi-level attribution across reasoning chains |
| | - Causal tracing from decision back to evidence |
| | - Confidence weighting of attribution factors |
| | - Value-weighted attribution alignment |
| | - Attribution visualization for interpretability |
| | |
| | Internal Note: The attribution tracer encodes the ECHO-ATTRIBUTION and ATTRIBUTION-REFLECT |
| | interpretability shells for causal path tracing and attribution transparency. |
| | """ |
| |
|
| | import datetime |
| | import uuid |
| | import math |
| | from typing import Dict, List, Any, Optional, Tuple, Set |
| | import numpy as np |
| | from collections import defaultdict |
| |
|
| | from pydantic import BaseModel, Field |
| |
|
| |
|
class AttributionEntry(BaseModel):
    """One weighted link from a source to a target within an attribution trace."""

    # Unique identifier; a random UUID4 string unless the caller supplies one.
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    # Identifier of the originating node (required).
    source: str
    # Category label of the source node (required).
    source_type: str
    # Identifier of the node this entry points at (required).
    target: str
    # Strength of the link; defaults to 1.0.
    weight: float = 1.0
    # Confidence attached to the link; defaults to 1.0.
    confidence: float = 1.0
    # Creation time, taken from datetime.datetime.now() at instantiation.
    timestamp: datetime.datetime = Field(default_factory=datetime.datetime.now)
    # Optional human-readable explanation of the link.
    description: Optional[str] = None
    # Optional value-alignment score for the link.
    value_alignment: Optional[float] = None
| |
|
| |
|
class AttributionChain(BaseModel):
    """Ordered list of attribution entries describing one causal path."""

    # Unique identifier; a random UUID4 string unless the caller supplies one.
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    # Entries that make up the path, in order; empty by default.
    entries: List[AttributionEntry] = Field(default_factory=list)
    # Identifier of the node where the path begins (required).
    start_point: str
    # Identifier of the node where the path ends (required).
    end_point: str
    # Aggregate weight of the path; defaults to 1.0.
    total_weight: float = 1.0
    # Confidence of the path as a whole; defaults to 1.0.
    confidence: float = 1.0
    # Creation time, taken from datetime.datetime.now() at instantiation.
    timestamp: datetime.datetime = Field(default_factory=datetime.datetime.now)
| |
|
| |
|
class AttributionGraph(BaseModel):
    """All attribution chains and source metadata recorded for one decision."""

    # Unique identifier; a random UUID4 string unless the caller supplies one.
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    # Identifier of the decision this graph explains (required).
    decision_id: str
    # Attribution chains attached to the decision; empty by default.
    chains: List[AttributionChain] = Field(default_factory=list)
    # Metadata for each source, keyed by source id; empty by default.
    sources: Dict[str, Dict[str, Any]] = Field(default_factory=dict)
    # Creation time, taken from datetime.datetime.now() at instantiation.
    timestamp: datetime.datetime = Field(default_factory=datetime.datetime.now)

    def add_chain(self, chain: AttributionChain) -> None:
        """Append ``chain`` to the graph's list of chains."""
        self.chains.append(chain)

    def add_source(self, source_id: str, metadata: Dict[str, Any]) -> None:
        """Store ``metadata`` under ``source_id``, replacing any previous value."""
        self.sources[source_id] = metadata

    def calculate_source_contributions(self) -> Dict[str, float]:
        """
        Compute each source's share of the total attribution.

        Every entry adds ``entry.weight * chain.confidence`` to the running
        total of its source. The totals are divided by their sum when that
        sum is positive; otherwise they are returned unnormalized.

        Returns:
            Dictionary mapping source id to its contribution
        """
        raw_totals: Dict[str, float] = {}
        for chain in self.chains:
            for entry in chain.entries:
                previous = raw_totals.get(entry.source, 0.0)
                raw_totals[entry.source] = previous + entry.weight * chain.confidence

        grand_total = sum(raw_totals.values())
        if not grand_total > 0:
            # Nothing to normalize against; hand back the raw totals.
            return raw_totals

        return {source: amount / grand_total for source, amount in raw_totals.items()}
| |
|
| |
|
class AttributionTracer:
    """
    Attribution tracing engine for causal decision provenance.

    Builds an AttributionGraph for each decision signal, stores it in
    ``attribution_history`` (keyed by decision id) and ``trace_registry``
    (keyed by trace id), and exposes summaries and visualization data for
    stored traces.
    """

    # Source-id prefixes that visualize_attribution maps to a node type;
    # any other prefix is reported as "unknown".
    _KNOWN_SOURCE_PREFIXES = frozenset({
        "belief",
        "working_memory",
        "performance",
        "past_decision",
        "intent",
        "value",
    })

    # Base weight per non-belief evidence type; unlisted types get 0.5.
    _EVIDENCE_BASE_WEIGHTS = {
        "working_memory": 0.8,
        "performance": 0.7,
        "past_decision": 0.6,
    }

    # (agent_state key, source type / id prefix, payload key, description template)
    _STATE_SECTIONS = (
        ("belief_state", "belief", "value", "Belief about {}"),
        ("working_memory", "working_memory", "data", "Current analysis of {}"),
        ("performance_trace", "performance", "performance", "Performance history of {}"),
    )

    def __init__(self):
        """Initialize attribution tracer with empty history, registry and value weights."""
        self.attribution_history: Dict[str, AttributionGraph] = {}
        self.trace_registry: Dict[str, Dict[str, Any]] = {}
        self.value_weights: Dict[str, float] = {}

    def trace_attribution(self, signal: Dict[str, Any], agent_state: Dict[str, Any],
                          reasoning_depth: int = 3) -> Dict[str, Any]:
        """
        Trace attribution for a decision signal.

        Args:
            signal: Decision signal; the keys read are "signal_id", "ticker",
                "action", "confidence", "reasoning", "intent" and "value_basis"
            agent_state: Agent's current state
            reasoning_depth: Maximum number of reasoning steps to trace

        Returns:
            Attribution trace results
        """
        # A missing or explicitly-None id gets a fresh UUID; the graph model
        # requires a string decision id.
        decision_id = signal.get("signal_id")
        if decision_id is None:
            decision_id = str(uuid.uuid4())

        # Treat an explicit None confidence like a missing one so the weight
        # arithmetic below does not fail.
        confidence = signal.get("confidence")
        if confidence is None:
            confidence = 0.5

        ticker = signal.get("ticker", "")
        action = signal.get("action", "")
        intent = signal.get("intent", "")
        value_basis = signal.get("value_basis", "")

        evidence_sources = self._extract_evidence_sources(agent_state, ticker, action)
        reasoning_steps = self._extract_reasoning_steps(signal.get("reasoning", ""))

        attribution_graph = AttributionGraph(decision_id=decision_id)
        for chain in self._generate_attribution_chains(
            decision_id=decision_id,
            evidence_sources=evidence_sources,
            reasoning_steps=reasoning_steps,
            intent=intent,
            value_basis=value_basis,
            confidence=confidence,
            reasoning_depth=reasoning_depth,
        ):
            attribution_graph.add_chain(chain)
        for source_id, metadata in evidence_sources.items():
            attribution_graph.add_source(source_id, metadata)

        source_contributions = attribution_graph.calculate_source_contributions()

        # Latest graph per decision; every call also gets its own trace id.
        self.attribution_history[decision_id] = attribution_graph
        trace_id = str(uuid.uuid4())
        recorded_at = datetime.datetime.now()
        self.trace_registry[trace_id] = {
            "attribution_graph": attribution_graph,
            "decision_id": decision_id,
            "timestamp": recorded_at,
        }

        return {
            "trace_id": trace_id,
            "decision_id": decision_id,
            "attribution_map": source_contributions,
            "confidence": confidence,
            "top_factors": self._get_top_attribution_factors(source_contributions, 5),
            "value_alignment": self._calculate_value_alignment(value_basis, source_contributions),
            "reasoning_depth": reasoning_depth,
            "timestamp": recorded_at.isoformat(),
        }

    def _extract_evidence_sources(self, agent_state: Dict[str, Any],
                                  ticker: str, action: str) -> Dict[str, Dict[str, Any]]:
        """
        Extract evidence sources from agent state.

        Looks the ticker up in the "belief_state", "working_memory" and
        "performance_trace" sections, then scans "decision_history" for
        past decisions with the same ticker and action. Sections that are
        missing or None are skipped.

        Args:
            agent_state: Agent's current state
            ticker: Stock ticker
            action: Decision action

        Returns:
            Dictionary of evidence sources keyed by source id
        """
        evidence_sources: Dict[str, Dict[str, Any]] = {}

        for state_key, source_type, payload_key, template in self._STATE_SECTIONS:
            section = agent_state.get(state_key) or {}
            if ticker in section:
                evidence_sources[f"{source_type}:{ticker}"] = {
                    "type": source_type,
                    "ticker": ticker,
                    payload_key: section[ticker],
                    "description": template.format(ticker),
                }

        for index, decision in enumerate(agent_state.get("decision_history") or []):
            if decision.get("ticker") == ticker and decision.get("action") == action:
                evidence_sources[f"past_decision:{index}:{ticker}"] = {
                    "type": "past_decision",
                    "ticker": ticker,
                    "action": action,
                    "decision": decision,
                    "description": f"Past {action} decision for {ticker}",
                }

        return evidence_sources

    def _extract_reasoning_steps(self, reasoning: str) -> List[Dict[str, Any]]:
        """
        Extract reasoning steps from reasoning string.

        The text is split on newlines and on periods, except periods that
        sit between two digits, so a decimal such as "12.5" stays inside
        one step. Empty fragments are dropped.

        Args:
            reasoning: Reasoning string; None, empty or non-string input
                yields no steps

        Returns:
            List of reasoning steps
        """
        import re  # local import: only this method needs it

        if not reasoning or not isinstance(reasoning, str):
            return []

        fragments = re.split(r"\n|(?<!\d)\.|\.(?!\d)", reasoning)
        sentences = [fragment.strip() for fragment in fragments if fragment.strip()]

        return [
            {
                "id": f"step:{position}",
                "text": sentence,
                "position": position,
                "type": "reasoning_step",
            }
            for position, sentence in enumerate(sentences)
        ]

    def _single_link_chain(self, source: str, source_type: str, target: str, weight: float,
                           confidence: float, description: str,
                           value_alignment: Optional[float] = None) -> "AttributionChain":
        """Build a one-entry chain that links ``source`` directly to ``target``."""
        entry = AttributionEntry(
            source=source,
            source_type=source_type,
            target=target,
            weight=weight,
            confidence=confidence,
            description=description,
            value_alignment=value_alignment,
        )
        return AttributionChain(
            entries=[entry],
            start_point=source,
            end_point=target,
            total_weight=entry.weight,
            confidence=entry.confidence,
        )

    def _generate_attribution_chains(self, decision_id: str, evidence_sources: Dict[str, Dict[str, Any]],
                                     reasoning_steps: List[Dict[str, Any]], intent: str, value_basis: str,
                                     confidence: float, reasoning_depth: int) -> "List[AttributionChain]":
        """
        Generate attribution chains linking decision to evidence.

        Chains are produced in this order: one direct chain per evidence
        source, one two-hop chain (source -> step -> decision) for every
        source and traced reasoning step, then one chain for the intent and
        one for the value basis when those are non-empty.

        Args:
            decision_id: Decision ID
            evidence_sources: Evidence sources
            reasoning_steps: Reasoning steps
            intent: Decision intent
            value_basis: Value basis for decision
            confidence: Decision confidence
            reasoning_depth: Maximum number of reasoning steps to trace;
                values below zero trace no steps

        Returns:
            List of attribution chains
        """
        attribution_chains = []

        # Direct evidence -> decision chains.
        for source_id, source_data in evidence_sources.items():
            label = source_data.get("description", source_id)
            attribution_chains.append(self._single_link_chain(
                source=source_id,
                source_type=source_data.get("type", "evidence"),
                target=decision_id,
                weight=self._calculate_evidence_weight(source_data, confidence),
                confidence=confidence,
                description=f"Direct influence of {label} on decision",
            ))

        # Evidence -> reasoning step -> decision chains. Clamp the depth so a
        # negative value does not turn into a "drop the last N" slice.
        traced_steps = reasoning_steps[:max(0, reasoning_depth)]
        total_steps = len(reasoning_steps)
        for source_id, source_data in evidence_sources.items():
            source_type = source_data.get("type", "evidence")
            label = source_data.get("description", source_id)
            for step in traced_steps:
                step_entry = AttributionEntry(
                    source=source_id,
                    source_type=source_type,
                    target=step["id"],
                    weight=self._calculate_step_relevance(source_data, step),
                    # The evidence -> step hop is discounted relative to the decision confidence.
                    confidence=confidence * 0.9,
                    description=f"Influence of {label} on reasoning step",
                )
                decision_entry = AttributionEntry(
                    source=step["id"],
                    source_type="reasoning_step",
                    target=decision_id,
                    weight=self._calculate_step_importance(step, total_steps),
                    confidence=confidence,
                    description="Influence of reasoning step on decision",
                )
                attribution_chains.append(AttributionChain(
                    entries=[step_entry, decision_entry],
                    start_point=source_id,
                    end_point=decision_id,
                    total_weight=step_entry.weight * decision_entry.weight,
                    confidence=min(step_entry.confidence, decision_entry.confidence),
                ))

        if intent:
            attribution_chains.append(self._single_link_chain(
                source=f"intent:{intent[:20]}",
                source_type="intent",
                target=decision_id,
                weight=0.8,
                confidence=confidence,
                description="Influence of stated intent on decision",
            ))

        if value_basis:
            attribution_chains.append(self._single_link_chain(
                source=f"value:{value_basis[:20]}",
                source_type="value",
                target=decision_id,
                weight=0.9,
                confidence=confidence,
                description="Influence of value basis on decision",
                value_alignment=1.0,
            ))

        return attribution_chains

    def _calculate_evidence_weight(self, evidence: Dict[str, Any], base_confidence: float) -> float:
        """
        Calculate weight of evidence.

        Beliefs are weighted by how far their value lies from 0.5; other
        types use a fixed base weight. The result is scaled by
        ``base_confidence`` and capped at 1.0.

        Args:
            evidence: Evidence data
            base_confidence: Base confidence level

        Returns:
            Evidence weight
        """
        evidence_type = evidence.get("type", "")

        if evidence_type == "belief":
            try:
                deviation = abs(evidence.get("value", 0.5) - 0.5)
            except TypeError:
                # Non-numeric belief payload: treat it as a neutral belief.
                deviation = 0.0
            weight = 0.5 + deviation * 0.5
        else:
            weight = self._EVIDENCE_BASE_WEIGHTS.get(evidence_type, 0.5)

        return min(1.0, weight * base_confidence)

    def _calculate_step_relevance(self, evidence: Dict[str, Any], step: Dict[str, Any]) -> float:
        """
        Calculate relevance of evidence to reasoning step.

        Returns 0.8 when the evidence ticker appears in the step text;
        otherwise 0.5 plus the word-overlap ratio between the evidence
        description and the step text, capped at 1.0.

        Args:
            evidence: Evidence data
            step: Reasoning step

        Returns:
            Relevance weight
        """
        step_text = step.get("text", "")

        ticker = evidence.get("ticker", "")
        if ticker and ticker in step_text:
            return 0.8

        evidence_words = set(evidence.get("description", "").lower().split())
        step_words = set(step_text.lower().split())
        all_words = evidence_words | step_words
        if not all_words:
            return 0.5

        overlap_ratio = len(evidence_words & step_words) / len(all_words)
        return min(1.0, 0.5 + overlap_ratio)

    def _calculate_step_importance(self, step: Dict[str, Any], total_steps: int) -> float:
        """
        Calculate importance of reasoning step.

        Combines a position component (70%), which grows with the step's
        position, and a text-length component (30%), capped at 1.0.

        Args:
            step: Reasoning step
            total_steps: Total number of steps

        Returns:
            Importance weight
        """
        position_weight = 0.5
        if total_steps > 0:
            position_weight += step.get("position", 0) / (2 * total_steps)

        length_weight = min(1.0, 0.5 + len(step.get("text", "")) / 200)

        return (position_weight * 0.7) + (length_weight * 0.3)

    def _get_top_attribution_factors(self, source_contributions: Dict[str, float], limit: int = 5) -> List[Dict[str, Any]]:
        """
        Get top attribution factors.

        Args:
            source_contributions: Source contribution dictionary
            limit: Maximum number of factors to return; values below zero
                return no factors

        Returns:
            List of top attribution factors, largest contribution first
        """
        ranked = sorted(source_contributions.items(), key=lambda item: item[1], reverse=True)

        top_factors = []
        for source, weight in ranked[:max(0, limit)]:
            # The type is the text before the first colon of the source id.
            prefix, separator, _ = source.partition(":")
            top_factors.append({
                "source": source,
                "type": prefix if separator else "unknown",
                "weight": weight,
            })

        return top_factors

    def _calculate_value_alignment(self, value_basis: str, source_contributions: Dict[str, float]) -> float:
        """
        Calculate value alignment score.

        The score is 0.5 plus half of the summed contribution of all
        sources whose id starts with "value:", capped at 1.0.

        Args:
            value_basis: Value basis string (not used by the calculation)
            source_contributions: Source contribution dictionary

        Returns:
            Value alignment score
        """
        value_contribution = sum(
            share for source, share in source_contributions.items()
            if source.startswith("value:")
        )
        return min(1.0, 0.5 + (value_contribution * 0.5))

    def get_trace(self, trace_id: str) -> Optional[Dict[str, Any]]:
        """
        Get attribution trace by ID.

        Args:
            trace_id: Trace ID

        Returns:
            Attribution trace or None if not found
        """
        trace_data = self.trace_registry.get(trace_id)
        if trace_data is None:
            return None

        attribution_graph = trace_data.get("attribution_graph")
        if not attribution_graph:
            return None

        source_contributions = attribution_graph.calculate_source_contributions()

        return {
            "trace_id": trace_id,
            "decision_id": attribution_graph.decision_id,
            "attribution_map": source_contributions,
            "top_factors": self._get_top_attribution_factors(source_contributions, 5),
            "chains": len(attribution_graph.chains),
            "sources": len(attribution_graph.sources),
            "timestamp": trace_data.get("timestamp", datetime.datetime.now()).isoformat(),
        }

    def get_decision_traces(self, decision_id: str) -> List[str]:
        """
        Get trace IDs for a decision.

        Args:
            decision_id: Decision ID

        Returns:
            List of trace IDs
        """
        return [
            trace_id
            for trace_id, trace_data in self.trace_registry.items()
            if trace_data.get("decision_id") == decision_id
        ]

    def visualize_attribution(self, trace_id: str) -> Dict[str, Any]:
        """
        Generate attribution visualization data.

        Args:
            trace_id: Trace ID

        Returns:
            Dictionary with "nodes", "links", "trace_id" and "decision_id",
            or a dictionary with an "error" message when the trace or its
            graph is missing
        """
        trace_data = self.trace_registry.get(trace_id)
        if trace_data is None:
            return {"error": "Trace not found"}

        attribution_graph = trace_data.get("attribution_graph")
        if not attribution_graph:
            return {"error": "Attribution graph not found"}

        decision_id = attribution_graph.decision_id
        nodes = [{
            "id": decision_id,
            "type": "decision",
            "label": "Decision",
            "size": 15,
        }]
        links = []
        # Ids already emitted as nodes, so each node is added only once.
        seen_ids = {decision_id}

        for chain in attribution_graph.chains:
            source_id = chain.start_point
            if source_id not in seen_ids:
                prefix, separator, remainder = source_id.partition(":")
                recognized = bool(separator) and prefix in self._KNOWN_SOURCE_PREFIXES
                nodes.append({
                    "id": source_id,
                    "type": prefix if recognized else "unknown",
                    "label": remainder if separator else source_id,
                    "size": 10,
                })
                seen_ids.add(source_id)

            for entry in chain.entries:
                step_id = entry.source
                if entry.source_type == "reasoning_step" and step_id not in seen_ids:
                    _, separator, remainder = step_id.partition(":")
                    nodes.append({
                        "id": step_id,
                        "type": "reasoning_step",
                        "label": f"Step {remainder if separator else step_id}",
                        "size": 8,
                    })
                    seen_ids.add(step_id)

                links.append({
                    "source": entry.source,
                    "target": entry.target,
                    "value": entry.weight,
                    "confidence": entry.confidence,
                    "label": entry.description or f"Weight: {entry.weight:.2f}",
                })

        return {
            "nodes": nodes,
            "links": links,
            "trace_id": trace_id,
            "decision_id": decision_id,
        }

    def set_value_weights(self, value_weights: Dict[str, float]) -> None:
        """
        Set weights for different values.

        Args:
            value_weights: Dictionary mapping value names to weights; a
                shallow copy is stored
        """
        self.value_weights = value_weights.copy()

    def clear_history(self, before_timestamp: Optional[datetime.datetime] = None) -> int:
        """
        Clear attribution history.

        Args:
            before_timestamp: Optional timestamp to clear history before

        Returns:
            Number of entries cleared. When ``before_timestamp`` is None,
            everything is cleared and the count covers only the graphs in
            ``attribution_history``; otherwise the count is the number of
            removed graphs plus the number of removed registry entries.
        """
        if before_timestamp is None:
            cleared = len(self.attribution_history)
            self.attribution_history = {}
            self.trace_registry = {}
            return cleared

        # Collect keys first; the dicts must not change size while iterated.
        stale_decisions = [
            decision_id
            for decision_id, graph in self.attribution_history.items()
            if graph.timestamp < before_timestamp
        ]
        stale_traces = [
            trace_id
            for trace_id, trace_data in self.trace_registry.items()
            if trace_data.get("timestamp", datetime.datetime.now()) < before_timestamp
        ]

        for decision_id in stale_decisions:
            del self.attribution_history[decision_id]
        for trace_id in stale_traces:
            del self.trace_registry[trace_id]

        return len(stale_decisions) + len(stale_traces)
| |
|