"""
AttributionTracer - Decision Provenance and Causal Tracing Framework
This module implements the attribution tracing architecture that enables
transparent decision provenance for all agents in the AGI-HEDGE-FUND system.
Key capabilities:
- Multi-level attribution across reasoning chains
- Causal tracing from decision back to evidence
- Confidence weighting of attribution factors
- Value-weighted attribution alignment
- Attribution visualization for interpretability
Internal Note: The attribution tracer encodes the ECHO-ATTRIBUTION and ATTRIBUTION-REFLECT
interpretability shells for causal path tracing and attribution transparency.
"""
import datetime
import uuid
import math
from typing import Dict, List, Any, Optional, Tuple, Set
import numpy as np
from collections import defaultdict
from pydantic import BaseModel, Field
class AttributionEntry(BaseModel):
    """Single attribution entry linking a decision to a cause.

    One entry is one directed edge ``source -> target`` in an attribution
    chain. ``source``, ``source_type`` and ``target`` are required; every
    other field has a default.
    """

    # Unique entry ID, generated as a UUID4 string when not supplied.
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    # Source ID (e.g., memory ID, evidence ID)
    source: str = Field(...)
    # Type of source (e.g., "memory", "evidence", "reasoning")
    source_type: str = Field(...)
    # Target ID (e.g., decision ID, reasoning step)
    target: str = Field(...)
    # Attribution weight (intended range 0-1; not validated here)
    weight: float = Field(default=1.0)
    # Confidence in attribution (intended range 0-1; not validated here)
    confidence: float = Field(default=1.0)
    # Creation time; naive local time from datetime.datetime.now().
    timestamp: datetime.datetime = Field(default_factory=datetime.datetime.now)
    # Optional attribution description
    description: Optional[str] = Field(default=None)
    # Alignment with agent values (intended range 0-1), if known
    value_alignment: Optional[float] = Field(default=None)
class AttributionChain(BaseModel):
    """Chain of attribution entries forming a causal path.

    ``start_point`` and ``end_point`` are required. ``total_weight`` and
    ``confidence`` are plain stored values supplied by whoever builds the
    chain; this model does not derive them from ``entries``.
    """

    # Unique chain ID, generated as a UUID4 string when not supplied.
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    # Ordered entries making up the path (empty by default).
    entries: List[AttributionEntry] = Field(default_factory=list)
    # ID of chain origin
    start_point: str = Field(...)
    # ID of chain destination
    end_point: str = Field(...)
    # Product of weights along chain (as computed by the caller)
    total_weight: float = Field(default=1.0)
    # Overall chain confidence
    confidence: float = Field(default=1.0)
    # Creation time; naive local time from datetime.datetime.now().
    timestamp: datetime.datetime = Field(default_factory=datetime.datetime.now)
class AttributionGraph(BaseModel):
    """Complete attribution graph for a decision.

    Holds every attribution chain recorded for ``decision_id`` together with
    free-form metadata about the sources those chains refer to.
    """

    # Unique graph ID, generated as a UUID4 string when not supplied.
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    # ID of the decision being attributed
    decision_id: str = Field(...)
    # Attribution chains, in insertion order.
    chains: List[AttributionChain] = Field(default_factory=list)
    # Source metadata, keyed by source ID
    sources: Dict[str, Dict[str, Any]] = Field(default_factory=dict)
    # Creation time; naive local time from datetime.datetime.now().
    timestamp: datetime.datetime = Field(default_factory=datetime.datetime.now)

    def add_chain(self, chain: AttributionChain) -> None:
        """Append an attribution chain to the graph."""
        self.chains.append(chain)

    def add_source(self, source_id: str, metadata: Dict[str, Any]) -> None:
        """Store metadata for a source, replacing any previous value for that ID."""
        self.sources[source_id] = metadata

    def calculate_source_contributions(self) -> Dict[str, float]:
        """Calculate normalized contribution of each source to the decision.

        Every entry of every chain adds ``entry.weight * chain.confidence`` to
        the running total of its ``source``. Because all entries are counted,
        intermediate sources (such as reasoning steps in a multi-entry chain)
        receive contribution as well as the chain's origin.

        Returns:
            Mapping of source ID to contribution. When the summed contribution
            is positive the values are scaled to sum to 1; otherwise the raw
            totals are returned unchanged. Empty when the graph has no entries.
        """
        contributions: Dict[str, float] = defaultdict(float)

        # Accumulate confidence-weighted entry weights per source.
        for chain in self.chains:
            for entry in chain.entries:
                contributions[entry.source] += entry.weight * chain.confidence

        total = sum(contributions.values())
        if total > 0:
            return {source: value / total for source, value in contributions.items()}

        # Nothing to normalize by: hand back the raw totals as a plain dict.
        return dict(contributions)
class AttributionTracer:
    """
    Attribution tracing engine for causal decision provenance.

    Enables:
    - Tracing the causal path from decisions back to evidence
    - Weighting attribution factors by confidence and relevance
    - Aligning attribution with agent value system
    - Visualizing attribution patterns for interpretability

    Instance state:
    - attribution_history: decision ID -> most recent AttributionGraph
    - trace_registry: trace ID -> {"attribution_graph", "decision_id", "timestamp"}
    - value_weights: value name -> weight (stored by set_value_weights; no
      method of this class reads it)
    """

    # Source-ID prefixes that map one-to-one onto a visualization node type.
    _KNOWN_SOURCE_PREFIXES = (
        "belief",
        "working_memory",
        "performance",
        "past_decision",
        "intent",
        "value",
    )

    def __init__(self):
        """Initialize attribution tracer with empty history and registry."""
        self.attribution_history: Dict[str, AttributionGraph] = {}
        self.trace_registry: Dict[str, Dict[str, Any]] = {}
        self.value_weights: Dict[str, float] = {}

    def trace_attribution(self, signal: Dict[str, Any], agent_state: Dict[str, Any],
                          reasoning_depth: int = 3) -> Dict[str, Any]:
        """
        Trace attribution for a decision signal.

        Builds an attribution graph for the signal, stores it in
        ``attribution_history`` (keyed by decision ID) and ``trace_registry``
        (keyed by a fresh trace ID), and returns a summary dictionary.

        Args:
            signal: Decision signal. Keys read: "signal_id", "ticker",
                "action", "confidence" (default 0.5), "reasoning", "intent",
                "value_basis".
            agent_state: Agent's current state
            reasoning_depth: Depth of attribution tracing (number of leading
                reasoning steps linked to each evidence source)

        Returns:
            Attribution trace results with keys "trace_id", "decision_id",
            "attribution_map", "confidence", "top_factors", "value_alignment",
            "reasoning_depth" and "timestamp" (ISO string).
        """
        # Generate a decision ID when the signal carries none (missing, None or empty).
        decision_id = signal.get("signal_id") or str(uuid.uuid4())

        attribution_graph = AttributionGraph(decision_id=decision_id)

        # Extract signal components for attribution
        ticker = signal.get("ticker", "")
        action = signal.get("action", "")
        confidence = signal.get("confidence", 0.5)
        reasoning = signal.get("reasoning", "")
        intent = signal.get("intent", "")
        value_basis = signal.get("value_basis", "")

        evidence_sources = self._extract_evidence_sources(agent_state, ticker, action)
        reasoning_steps = self._extract_reasoning_steps(reasoning)

        chains = self._generate_attribution_chains(
            decision_id=decision_id,
            evidence_sources=evidence_sources,
            reasoning_steps=reasoning_steps,
            intent=intent,
            value_basis=value_basis,
            confidence=confidence,
            reasoning_depth=reasoning_depth,
        )
        for chain in chains:
            attribution_graph.add_chain(chain)

        for source_id, metadata in evidence_sources.items():
            attribution_graph.add_source(source_id, metadata)

        source_contributions = attribution_graph.calculate_source_contributions()

        # Store in history (a later trace for the same decision replaces this one)
        self.attribution_history[decision_id] = attribution_graph

        # Store trace in registry under a fresh trace ID
        trace_id = str(uuid.uuid4())
        self.trace_registry[trace_id] = {
            "attribution_graph": attribution_graph,
            "decision_id": decision_id,
            "timestamp": datetime.datetime.now(),
        }

        return {
            "trace_id": trace_id,
            "decision_id": decision_id,
            "attribution_map": source_contributions,
            "confidence": confidence,
            "top_factors": self._get_top_attribution_factors(source_contributions, 5),
            "value_alignment": self._calculate_value_alignment(value_basis, source_contributions),
            "reasoning_depth": reasoning_depth,
            "timestamp": datetime.datetime.now().isoformat(),
        }

    def _extract_evidence_sources(self, agent_state: Dict[str, Any],
                                  ticker: str, action: str) -> Dict[str, Dict[str, Any]]:
        """
        Extract evidence sources from agent state.

        Args:
            agent_state: Agent's current state. Keys read: "belief_state",
                "working_memory", "performance_trace" (each looked up by
                ticker) and "decision_history" (a sequence of dicts).
            ticker: Stock ticker
            action: Decision action

        Returns:
            Dictionary of evidence sources keyed by source ID
            ("belief:<ticker>", "working_memory:<ticker>",
            "performance:<ticker>", "past_decision:<index>:<ticker>").
        """
        evidence_sources: Dict[str, Dict[str, Any]] = {}

        # (agent_state key, source type / ID prefix, payload key, description)
        ticker_lookups = (
            ("belief_state", "belief", "value", f"Belief about {ticker}"),
            ("working_memory", "working_memory", "data", f"Current analysis of {ticker}"),
            ("performance_trace", "performance", "performance", f"Performance history of {ticker}"),
        )
        for state_key, source_type, payload_key, description in ticker_lookups:
            store = agent_state.get(state_key, {})
            if ticker in store:
                evidence_sources[f"{source_type}:{ticker}"] = {
                    "type": source_type,
                    "ticker": ticker,
                    payload_key: store[ticker],
                    "description": description,
                }

        # Past decisions count as evidence only when both ticker and action match.
        for index, decision in enumerate(agent_state.get("decision_history", [])):
            if decision.get("ticker") == ticker and decision.get("action") == action:
                evidence_sources[f"past_decision:{index}:{ticker}"] = {
                    "type": "past_decision",
                    "ticker": ticker,
                    "action": action,
                    "decision": decision,
                    "description": f"Past {action} decision for {ticker}",
                }

        return evidence_sources

    def _extract_reasoning_steps(self, reasoning: str) -> List[Dict[str, Any]]:
        """
        Extract reasoning steps from reasoning string.

        Args:
            reasoning: Reasoning string (None or empty yields no steps)

        Returns:
            List of reasoning steps, each a dict with "id" ("step:<n>"),
            "text", "position" and "type".
        """
        if not reasoning:
            return []

        # Simple implementation: split by periods or line breaks. Note this
        # also splits on periods inside numbers such as "0.5".
        sentences = [part.strip() for part in reasoning.replace('\n', '. ').split('.')]

        return [
            {
                "id": f"step:{position}",
                "text": sentence,
                "position": position,
                "type": "reasoning_step",
            }
            for position, sentence in enumerate(s for s in sentences if s)
        ]

    def _generate_attribution_chains(self, decision_id: str, evidence_sources: Dict[str, Dict[str, Any]],
                                     reasoning_steps: List[Dict[str, Any]], intent: str, value_basis: str,
                                     confidence: float, reasoning_depth: int) -> "List[AttributionChain]":
        """
        Generate attribution chains linking decision to evidence.

        Three kinds of chain are produced, in this order:
        1. evidence -> decision (one per evidence source)
        2. evidence -> reasoning step -> decision (one per source and per
           step among the first ``reasoning_depth`` steps)
        3. intent -> decision and value basis -> decision (when non-empty)

        Args:
            decision_id: Decision ID
            evidence_sources: Evidence sources
            reasoning_steps: Reasoning steps
            intent: Decision intent
            value_basis: Value basis for decision
            confidence: Decision confidence
            reasoning_depth: Depth of attribution tracing

        Returns:
            List of attribution chains
        """
        attribution_chains = []
        # Every chain ends at the decision itself.
        end_point = decision_id

        # Case 1: Direct evidence -> decision chains
        for source_id, source_data in evidence_sources.items():
            entry = AttributionEntry(
                source=source_id,
                source_type=source_data.get("type", "evidence"),
                target=decision_id,
                weight=self._calculate_evidence_weight(source_data, confidence),
                confidence=confidence,
                description=f"Direct influence of {source_data.get('description', source_id)} on decision",
            )
            attribution_chains.append(AttributionChain(
                entries=[entry],
                start_point=source_id,
                end_point=end_point,
                total_weight=entry.weight,
                confidence=entry.confidence,
            ))

        # Case 2: Evidence -> reasoning -> decision chains
        if reasoning_steps:
            total_steps = len(reasoning_steps)
            for source_id, source_data in evidence_sources.items():
                # Only the first `reasoning_depth` steps are linked.
                for step in reasoning_steps[:reasoning_depth]:
                    step_entry = AttributionEntry(
                        source=source_id,
                        source_type=source_data.get("type", "evidence"),
                        target=step["id"],
                        weight=self._calculate_step_relevance(source_data, step),
                        confidence=confidence * 0.9,  # Slightly lower confidence for indirect paths
                        description=f"Influence of {source_data.get('description', source_id)} on reasoning step",
                    )
                    # A fresh entry per chain so each chain owns entries with distinct IDs.
                    decision_entry = AttributionEntry(
                        source=step["id"],
                        source_type="reasoning_step",
                        target=decision_id,
                        weight=self._calculate_step_importance(step, total_steps),
                        confidence=confidence,
                        description="Influence of reasoning step on decision",
                    )
                    attribution_chains.append(AttributionChain(
                        entries=[step_entry, decision_entry],
                        start_point=source_id,
                        end_point=end_point,
                        total_weight=step_entry.weight * decision_entry.weight,
                        confidence=min(step_entry.confidence, decision_entry.confidence),
                    ))

        # Case 3: Intent/value -> decision chains
        if intent:
            # IDs are built from the first 20 characters of the text.
            intent_id = f"intent:{intent[:20]}"
            intent_entry = AttributionEntry(
                source=intent_id,
                source_type="intent",
                target=decision_id,
                weight=0.8,  # High weight for intent
                confidence=confidence,
                description="Influence of stated intent on decision",
            )
            attribution_chains.append(AttributionChain(
                entries=[intent_entry],
                start_point=intent_id,
                end_point=end_point,
                total_weight=intent_entry.weight,
                confidence=intent_entry.confidence,
            ))

        if value_basis:
            value_id = f"value:{value_basis[:20]}"
            value_entry = AttributionEntry(
                source=value_id,
                source_type="value",
                target=decision_id,
                weight=0.9,  # Very high weight for value basis
                confidence=confidence,
                description="Influence of value basis on decision",
                value_alignment=1.0,  # Perfect alignment with its own value
            )
            attribution_chains.append(AttributionChain(
                entries=[value_entry],
                start_point=value_id,
                end_point=end_point,
                total_weight=value_entry.weight,
                confidence=value_entry.confidence,
            ))

        return attribution_chains

    def _calculate_evidence_weight(self, evidence: Dict[str, Any], base_confidence: float) -> float:
        """
        Calculate weight of evidence.

        Args:
            evidence: Evidence data (its "type" selects the base weight)
            base_confidence: Base confidence level

        Returns:
            Evidence weight: the type-specific base weight multiplied by
            ``base_confidence``, capped at 1.0.
        """
        # Default weight for unknown evidence types
        weight = 0.5

        evidence_type = evidence.get("type", "")
        if evidence_type == "belief":
            # Weight grows with distance of the belief from neutral (0.5);
            # for beliefs in [0, 1] this yields 0.5-0.75.
            belief_value = evidence.get("value", 0.5)
            if isinstance(belief_value, (int, float)):
                weight = 0.5 + (abs(belief_value - 0.5) * 0.5)
            # Non-numeric beliefs keep the default weight instead of raising.
        elif evidence_type == "working_memory":
            # Working memory has high weight
            weight = 0.8
        elif evidence_type == "performance":
            # Performance data moderately important
            weight = 0.7
        elif evidence_type == "past_decision":
            # Past decisions less important
            weight = 0.6

        # Scale by confidence
        weight *= base_confidence
        return min(1.0, weight)

    def _calculate_step_relevance(self, evidence: Dict[str, Any], step: Dict[str, Any]) -> float:
        """
        Calculate relevance of evidence to reasoning step.

        Args:
            evidence: Evidence data
            step: Reasoning step

        Returns:
            Relevance weight: 0.8 when the evidence ticker occurs in the step
            text (substring match); otherwise 0.5 plus the Jaccard overlap of
            the lower-cased description and step words, capped at 1.0.
        """
        evidence_desc = evidence.get("description", "")
        step_text = step.get("text", "")

        # Check for ticker mention
        ticker = evidence.get("ticker", "")
        if ticker and ticker in step_text:
            return 0.8

        # Check for word overlap
        evidence_words = set(evidence_desc.lower().split())
        step_words = set(step_text.lower().split())
        all_words = evidence_words | step_words
        if not all_words:
            return 0.5

        overlap_ratio = len(evidence_words & step_words) / len(all_words)
        return min(1.0, 0.5 + overlap_ratio)

    def _calculate_step_importance(self, step: Dict[str, Any], total_steps: int) -> float:
        """
        Calculate importance of reasoning step.

        Args:
            step: Reasoning step
            total_steps: Total number of steps

        Returns:
            Importance weight: 70% position-based, 30% length-based.
        """
        # Position-based importance (later steps slightly more important)
        position = step.get("position", 0)
        if total_steps > 0:
            position_weight = 0.5 + (position / (2 * total_steps))
        else:
            position_weight = 0.5

        # Length-based importance (longer steps slightly more important)
        length = len(step.get("text", ""))
        length_weight = min(1.0, 0.5 + (length / 200))  # Cap at 1.0

        # Combine weights
        return (position_weight * 0.7) + (length_weight * 0.3)

    def _get_top_attribution_factors(self, source_contributions: Dict[str, float], limit: int = 5) -> List[Dict[str, Any]]:
        """
        Get top attribution factors.

        Args:
            source_contributions: Source contribution dictionary
            limit: Maximum number of factors to return

        Returns:
            List of top attribution factors, highest weight first. Each item
            has "source", "type" (the text before the first ":" of the source
            ID, or "unknown" when there is no ":") and "weight".
        """
        ranked = sorted(
            source_contributions.items(),
            key=lambda item: item[1],
            reverse=True,
        )
        return [
            {
                "source": source,
                "type": source.split(":", 1)[0] if ":" in source else "unknown",
                "weight": weight,
            }
            for source, weight in ranked[:limit]
        ]

    def _calculate_value_alignment(self, value_basis: str, source_contributions: Dict[str, float]) -> float:
        """
        Calculate value alignment score.

        Args:
            value_basis: Value basis string (accepted for interface
                compatibility; the score depends only on the contributions)
            source_contributions: Source contribution dictionary

        Returns:
            Value alignment score: 0.5 (neutral) when no source ID starts with
            "value:", otherwise 0.5 plus half the summed contribution of those
            sources, capped at 1.0.
        """
        value_sources = [source for source in source_contributions if source.startswith("value:")]
        if not value_sources:
            return 0.5  # Default neutral alignment

        value_contribution = sum(source_contributions[source] for source in value_sources)
        # Value alignment increases with value contribution
        return min(1.0, 0.5 + (value_contribution * 0.5))

    def get_trace(self, trace_id: str) -> Optional[Dict[str, Any]]:
        """
        Get attribution trace by ID.

        Args:
            trace_id: Trace ID

        Returns:
            Attribution trace summary, or None if the trace ID is unknown or
            the registry entry holds no attribution graph.
        """
        trace_data = self.trace_registry.get(trace_id)
        if trace_data is None:
            return None

        attribution_graph = trace_data.get("attribution_graph")
        if attribution_graph is None:
            return None

        # Contributions are recomputed from the stored graph on every call.
        source_contributions = attribution_graph.calculate_source_contributions()

        return {
            "trace_id": trace_id,
            "decision_id": attribution_graph.decision_id,
            "attribution_map": source_contributions,
            "top_factors": self._get_top_attribution_factors(source_contributions, 5),
            "chains": len(attribution_graph.chains),
            "sources": len(attribution_graph.sources),
            "timestamp": trace_data.get("timestamp", datetime.datetime.now()).isoformat(),
        }

    def get_decision_traces(self, decision_id: str) -> List[str]:
        """
        Get trace IDs for a decision.

        Args:
            decision_id: Decision ID

        Returns:
            List of trace IDs whose registry entry records this decision ID
        """
        return [trace_id for trace_id, trace_data in self.trace_registry.items()
                if trace_data.get("decision_id") == decision_id]

    def visualize_attribution(self, trace_id: str) -> Dict[str, Any]:
        """
        Generate attribution visualization data.

        Args:
            trace_id: Trace ID

        Returns:
            Visualization data with "nodes", "links", "trace_id" and
            "decision_id", or {"error": ...} when the trace or its graph is
            missing. Each node appears once; every chain entry yields one
            link, so an edge shared by several chains is repeated.
        """
        trace_data = self.trace_registry.get(trace_id)
        if trace_data is None:
            return {"error": "Trace not found"}

        attribution_graph = trace_data.get("attribution_graph")
        if attribution_graph is None:
            return {"error": "Attribution graph not found"}

        decision_id = attribution_graph.decision_id
        nodes = [{
            "id": decision_id,
            "type": "decision",
            "label": "Decision",
            "size": 15,
        }]
        links = []
        # IDs already emitted as nodes; keeps de-duplication O(1) per lookup.
        seen_ids = {decision_id}

        for chain in attribution_graph.chains:
            # Add the chain's origin node once.
            source_id = chain.start_point
            if source_id not in seen_ids:
                prefix, separator, remainder = source_id.partition(":")
                known = separator and prefix in self._KNOWN_SOURCE_PREFIXES
                nodes.append({
                    "id": source_id,
                    "type": prefix if known else "unknown",
                    "label": remainder if separator else source_id,
                    "size": 10,
                })
                seen_ids.add(source_id)

            for entry in chain.entries:
                source_node_id = entry.source
                # Add intermediate reasoning-step nodes once.
                if entry.source_type == "reasoning_step" and source_node_id not in seen_ids:
                    step_label = source_node_id.split(":", 1)[1] if ":" in source_node_id else source_node_id
                    nodes.append({
                        "id": source_node_id,
                        "type": "reasoning_step",
                        "label": f"Step {step_label}",
                        "size": 8,
                    })
                    seen_ids.add(source_node_id)

                links.append({
                    "source": source_node_id,
                    "target": entry.target,
                    "value": entry.weight,
                    "confidence": entry.confidence,
                    "label": entry.description if entry.description else f"Weight: {entry.weight:.2f}",
                })

        return {
            "nodes": nodes,
            "links": links,
            "trace_id": trace_id,
            "decision_id": decision_id,
        }

    def set_value_weights(self, value_weights: Dict[str, float]) -> None:
        """
        Set weights for different values.

        Args:
            value_weights: Dictionary mapping value names to weights (a
                shallow copy is stored, so later changes to the argument do
                not affect the tracer)
        """
        self.value_weights = value_weights.copy()

    def clear_history(self, before_timestamp: Optional[datetime.datetime] = None) -> int:
        """
        Clear attribution history.

        Args:
            before_timestamp: Optional timestamp to clear history before.
                When omitted, everything is cleared.

        Returns:
            Number of entries cleared, counting both attribution-history
            graphs and trace-registry entries (in both modes).
        """
        if before_timestamp is None:
            cleared = len(self.attribution_history) + len(self.trace_registry)
            self.attribution_history = {}
            self.trace_registry = {}
            return cleared

        # Collect keys first so the dicts are not mutated while iterating.
        stale_decisions = [
            decision_id
            for decision_id, graph in self.attribution_history.items()
            if graph.timestamp < before_timestamp
        ]
        # Registry entries without a timestamp are treated as created "now".
        stale_traces = [
            trace_id
            for trace_id, trace_data in self.trace_registry.items()
            if trace_data.get("timestamp", datetime.datetime.now()) < before_timestamp
        ]

        for decision_id in stale_decisions:
            del self.attribution_history[decision_id]
        for trace_id in stale_traces:
            del self.trace_registry[trace_id]

        return len(stale_decisions) + len(stale_traces)