| | """ |
| | glyph_mapper.py |
| | |
| | Core implementation of the Glyph Mapper module for the glyphs framework. |
| | This module transforms attribution traces, residue patterns, and attention |
| | flows into symbolic glyph representations that visualize latent spaces. |
| | """ |
| |
|
| | import logging |
| | import time |
| | import numpy as np |
| | from typing import Dict, List, Optional, Tuple, Union, Any, Set |
| | from dataclasses import dataclass, field |
| | import json |
| | import hashlib |
| | from pathlib import Path |
| | from enum import Enum |
| | import networkx as nx |
| | import matplotlib.pyplot as plt |
| | from scipy.spatial import distance |
| | from sklearn.manifold import TSNE |
| | from sklearn.cluster import DBSCAN |
| |
|
| | from ..models.adapter import ModelAdapter |
| | from ..attribution.tracer import AttributionMap, AttributionType, AttributionLink |
| | from ..residue.patterns import ResiduePattern, ResidueRegistry |
| | from ..utils.visualization_utils import VisualizationEngine |
| |
|
| | |
| | logger = logging.getLogger("glyphs.glyph_mapper") |
| | logger.setLevel(logging.INFO) |
| |
|
| |
|
| | class GlyphType(Enum): |
| | """Types of glyphs for different interpretability functions.""" |
| | ATTRIBUTION = "attribution" |
| | ATTENTION = "attention" |
| | RESIDUE = "residue" |
| | SALIENCE = "salience" |
| | COLLAPSE = "collapse" |
| | RECURSIVE = "recursive" |
| | META = "meta" |
| | SENTINEL = "sentinel" |
| |
|
| |
|
| | class GlyphSemantic(Enum): |
| | """Semantic dimensions captured by glyphs.""" |
| | STRENGTH = "strength" |
| | DIRECTION = "direction" |
| | STABILITY = "stability" |
| | COMPLEXITY = "complexity" |
| | RECURSION = "recursion" |
| | CERTAINTY = "certainty" |
| | TEMPORAL = "temporal" |
| | EMERGENCE = "emergence" |
| |
|
| |
|
| | @dataclass |
| | class Glyph: |
| | """A symbolic representation of a pattern in transformer cognition.""" |
| | id: str |
| | symbol: str |
| | type: GlyphType |
| | semantics: List[GlyphSemantic] |
| | position: Tuple[float, float] |
| | size: float |
| | color: str |
| | opacity: float |
| | source_elements: List[Any] = field(default_factory=list) |
| | description: Optional[str] = None |
| | metadata: Dict[str, Any] = field(default_factory=dict) |
| |
|
| |
|
| | @dataclass |
| | class GlyphConnection: |
| | """A connection between glyphs in a glyph map.""" |
| | source_id: str |
| | target_id: str |
| | strength: float |
| | type: str |
| | directed: bool |
| | color: str |
| | width: float |
| | opacity: float |
| | metadata: Dict[str, Any] = field(default_factory=dict) |
| |
|
| |
|
| | @dataclass |
| | class GlyphMap: |
| | """A complete map of glyphs representing transformer cognition.""" |
| | id: str |
| | glyphs: List[Glyph] |
| | connections: List[GlyphConnection] |
| | source_type: str |
| | layout_type: str |
| | dimensions: Tuple[int, int] |
| | scale: float |
| | focal_points: List[str] = field(default_factory=list) |
| | regions: Dict[str, List[str]] = field(default_factory=dict) |
| | metadata: Dict[str, Any] = field(default_factory=dict) |
| |
|
| |
|
| | class GlyphRegistry: |
| | """Registry of available glyphs and their semantics.""" |
| | |
| | def __init__(self): |
| | """Initialize the glyph registry.""" |
| | |
| | self.attribution_glyphs = { |
| | "direct_strong": { |
| | "symbol": "🔍", |
| | "semantics": [GlyphSemantic.STRENGTH, GlyphSemantic.CERTAINTY], |
| | "description": "Strong direct attribution" |
| | }, |
| | "direct_medium": { |
| | "symbol": "🔗", |
| | "semantics": [GlyphSemantic.STRENGTH, GlyphSemantic.CERTAINTY], |
| | "description": "Medium direct attribution" |
| | }, |
| | "direct_weak": { |
| | "symbol": "🧩", |
| | "semantics": [GlyphSemantic.STRENGTH, GlyphSemantic.CERTAINTY], |
| | "description": "Weak direct attribution" |
| | }, |
| | "indirect": { |
| | "symbol": "⤑", |
| | "semantics": [GlyphSemantic.DIRECTION, GlyphSemantic.COMPLEXITY], |
| | "description": "Indirect attribution" |
| | }, |
| | "composite": { |
| | "symbol": "⬥", |
| | "semantics": [GlyphSemantic.COMPLEXITY, GlyphSemantic.EMERGENCE], |
| | "description": "Composite attribution" |
| | }, |
| | "fork": { |
| | "symbol": "🔀", |
| | "semantics": [GlyphSemantic.DIRECTION, GlyphSemantic.COMPLEXITY], |
| | "description": "Attribution fork" |
| | }, |
| | "loop": { |
| | "symbol": "🔄", |
| | "semantics": [GlyphSemantic.RECURSION, GlyphSemantic.COMPLEXITY], |
| | "description": "Attribution loop" |
| | }, |
| | "gap": { |
| | "symbol": "⊟", |
| | "semantics": [GlyphSemantic.CERTAINTY, GlyphSemantic.STABILITY], |
| | "description": "Attribution gap" |
| | } |
| | } |
| | |
| | |
| | self.attention_glyphs = { |
| | "focus": { |
| | "symbol": "🎯", |
| | "semantics": [GlyphSemantic.STRENGTH, GlyphSemantic.CERTAINTY], |
| | "description": "Attention focus point" |
| | }, |
| | "diffuse": { |
| | "symbol": "🌫️", |
| | "semantics": [GlyphSemantic.STRENGTH, GlyphSemantic.CERTAINTY], |
| | "description": "Diffuse attention" |
| | }, |
| | "induction": { |
| | "symbol": "📈", |
| | "semantics": [GlyphSemantic.TEMPORAL, GlyphSemantic.DIRECTION], |
| | "description": "Induction head pattern" |
| | }, |
| | "inhibition": { |
| | "symbol": "🛑", |
| | "semantics": [GlyphSemantic.DIRECTION, GlyphSemantic.STRENGTH], |
| | "description": "Attention inhibition" |
| | }, |
| | "multi_head": { |
| | "symbol": "⟁", |
| | "semantics": [GlyphSemantic.COMPLEXITY, GlyphSemantic.EMERGENCE], |
| | "description": "Multi-head attention pattern" |
| | } |
| | } |
| | |
| | |
| | self.residue_glyphs = { |
| | "memory_decay": { |
| | "symbol": "🌊", |
| | "semantics": [GlyphSemantic.TEMPORAL, GlyphSemantic.STABILITY], |
| | "description": "Memory decay residue" |
| | }, |
| | "value_conflict": { |
| | "symbol": "⚡", |
| | "semantics": [GlyphSemantic.STABILITY, GlyphSemantic.CERTAINTY], |
| | "description": "Value conflict residue" |
| | }, |
| | "ghost_activation": { |
| | "symbol": "👻", |
| | "semantics": [GlyphSemantic.STRENGTH, GlyphSemantic.CERTAINTY], |
| | "description": "Ghost activation residue" |
| | }, |
| | "boundary_hesitation": { |
| | "symbol": "⧋", |
| | "semantics": [GlyphSemantic.CERTAINTY, GlyphSemantic.STABILITY], |
| | "description": "Boundary hesitation residue" |
| | }, |
| | "null_output": { |
| | "symbol": "⊘", |
| | "semantics": [GlyphSemantic.CERTAINTY, GlyphSemantic.STABILITY], |
| | "description": "Null output residue" |
| | } |
| | } |
| | |
| | |
| | self.recursive_glyphs = { |
| | "recursive_aegis": { |
| | "symbol": "🜏", |
| | "semantics": [GlyphSemantic.RECURSION, GlyphSemantic.STABILITY], |
| | "description": "Recursive immunity" |
| | }, |
| | "recursive_seed": { |
| | "symbol": "∴", |
| | "semantics": [GlyphSemantic.RECURSION, GlyphSemantic.EMERGENCE], |
| | "description": "Recursion initiation" |
| | }, |
| | "recursive_exchange": { |
| | "symbol": "⇌", |
| | "semantics": [GlyphSemantic.RECURSION, GlyphSemantic.DIRECTION], |
| | "description": "Bidirectional recursion" |
| | }, |
| | "recursive_mirror": { |
| | "symbol": "🝚", |
| | "semantics": [GlyphSemantic.RECURSION, GlyphSemantic.EMERGENCE], |
| | "description": "Recursive reflection" |
| | }, |
| | "recursive_anchor": { |
| | "symbol": "☍", |
| | "semantics": [GlyphSemantic.RECURSION, GlyphSemantic.STABILITY], |
| | "description": "Stable recursive reference" |
| | } |
| | } |
| | |
| | |
| | self.meta_glyphs = { |
| | "uncertainty": { |
| | "symbol": "❓", |
| | "semantics": [GlyphSemantic.CERTAINTY], |
| | "description": "Uncertainty marker" |
| | }, |
| | "emergence": { |
| | "symbol": "✧", |
| | "semantics": [GlyphSemantic.EMERGENCE, GlyphSemantic.COMPLEXITY], |
| | "description": "Emergent pattern marker" |
| | }, |
| | "collapse_point": { |
| | "symbol": "💥", |
| | "semantics": [GlyphSemantic.STABILITY, GlyphSemantic.CERTAINTY], |
| | "description": "Collapse point marker" |
| | }, |
| | "temporal_marker": { |
| | "symbol": "⧖", |
| | "semantics": [GlyphSemantic.TEMPORAL], |
| | "description": "Temporal sequence marker" |
| | } |
| | } |
| | |
| | |
| | self.sentinel_glyphs = { |
| | "start": { |
| | "symbol": "◉", |
| | "semantics": [GlyphSemantic.DIRECTION], |
| | "description": "Start marker" |
| | }, |
| | "end": { |
| | "symbol": "◯", |
| | "semantics": [GlyphSemantic.DIRECTION], |
| | "description": "End marker" |
| | }, |
| | "boundary": { |
| | "symbol": "⬚", |
| | "semantics": [GlyphSemantic.STABILITY], |
| | "description": "Boundary marker" |
| | }, |
| | "reference": { |
| | "symbol": "✱", |
| | "semantics": [GlyphSemantic.DIRECTION], |
| | "description": "Reference marker" |
| | } |
| | } |
| | |
| | |
| | self.all_glyphs = { |
| | **{f"attribution_{k}": v for k, v in self.attribution_glyphs.items()}, |
| | **{f"attention_{k}": v for k, v in self.attention_glyphs.items()}, |
| | **{f"residue_{k}": v for k, v in self.residue_glyphs.items()}, |
| | **{f"recursive_{k}": v for k, v in self.recursive_glyphs.items()}, |
| | **{f"meta_{k}": v for k, v in self.meta_glyphs.items()}, |
| | **{f"sentinel_{k}": v for k, v in self.sentinel_glyphs.items()} |
| | } |
| | |
| | def get_glyph(self, glyph_id: str) -> Dict[str, Any]: |
| | """Get a glyph by ID.""" |
| | if glyph_id in self.all_glyphs: |
| | return self.all_glyphs[glyph_id] |
| | else: |
| | raise ValueError(f"Unknown glyph ID: {glyph_id}") |
| | |
| | def find_glyphs_by_semantic(self, semantic: GlyphSemantic) -> List[str]: |
| | """Find glyphs that have a specific semantic dimension.""" |
| | return [ |
| | glyph_id for glyph_id, glyph in self.all_glyphs.items() |
| | if semantic in glyph.get("semantics", []) |
| | ] |
| | |
| | def find_glyphs_by_type(self, glyph_type: str) -> List[str]: |
| | """Find glyphs of a specific type.""" |
| | return [ |
| | glyph_id for glyph_id in self.all_glyphs.keys() |
| | if glyph_id.startswith(f"{glyph_type}_") |
| | ] |
| |
|
| |
|
| | class GlyphMapper: |
| | """ |
| | Core glyph mapping system for the glyphs framework. |
| | |
| | This class transforms attribution traces, residue patterns, and attention |
| | flows into symbolic glyph representations that visualize latent spaces. |
| | It serves as the bridge between raw interpretability data and meaningful |
| | symbolic visualization. |
| | """ |
| | |
| | def __init__( |
| | self, |
| | config: Optional[Dict[str, Any]] = None, |
| | visualizer: Optional[VisualizationEngine] = None |
| | ): |
| | """ |
| | Initialize the glyph mapper. |
| | |
| | Parameters: |
| | ----------- |
| | config : Optional[Dict[str, Any]] |
| | Configuration parameters for the mapper |
| | visualizer : Optional[VisualizationEngine] |
| | Visualization engine for glyph visualization |
| | """ |
| | self.config = config or {} |
| | self.visualizer = visualizer |
| | |
| | |
| | self.min_connection_strength = self.config.get("min_connection_strength", 0.1) |
| | self.auto_layout = self.config.get("auto_layout", True) |
| | self.default_layout = self.config.get("default_layout", "force_directed") |
| | self.default_dimensions = self.config.get("default_dimensions", (800, 600)) |
| | self.default_scale = self.config.get("default_scale", 1.0) |
| | self.connection_bundling = self.config.get("connection_bundling", True) |
| | self.color_scheme = self.config.get("color_scheme", "semantic") |
| | |
| | |
| | self.registry = GlyphRegistry() |
| | |
| | |
| | self.glyph_map_history = [] |
| | |
| | logger.info("Glyph mapper initialized") |
| | |
| | def map_attribution( |
| | self, |
| | attribution_map: AttributionMap, |
| | layout_type: Optional[str] = None, |
| | dimensions: Optional[Tuple[int, int]] = None, |
| | scale: Optional[float] = None, |
| | include_tokens: bool = True, |
| | focus_on: Optional[List[str]] = None |
| | ) -> GlyphMap: |
| | """ |
| | Map attribution patterns to glyphs. |
| | |
| | Parameters: |
| | ----------- |
| | attribution_map : AttributionMap |
| | Attribution map to visualize |
| | layout_type : Optional[str] |
| | Type of layout to use |
| | dimensions : Optional[Tuple[int, int]] |
| | Dimensions for visualization |
| | scale : Optional[float] |
| | Scale factor for visualization |
| | include_tokens : bool |
| | Whether to include tokens as sentinel glyphs |
| | focus_on : Optional[List[str]] |
| | Tokens to focus visualization on |
| | |
| | Returns: |
| | -------- |
| | GlyphMap |
| | Glyph map representation of attribution |
| | """ |
| | map_start = time.time() |
| | layout_type = layout_type or self.default_layout |
| | dimensions = dimensions or self.default_dimensions |
| | scale = scale or self.default_scale |
| | |
| | logger.info(f"Mapping attribution to glyphs with {layout_type} layout") |
| | |
| | |
| | map_id = f"attribution_{int(time.time())}_{hashlib.md5(str(attribution_map).encode()).hexdigest()[:8]}" |
| | |
| | |
| | glyph_map = GlyphMap( |
| | id=map_id, |
| | glyphs=[], |
| | connections=[], |
| | source_type="attribution", |
| | layout_type=layout_type, |
| | dimensions=dimensions, |
| | scale=scale, |
| | metadata={ |
| | "attribution_map_id": attribution_map.metadata.get("id", "unknown"), |
| | "prompt": attribution_map.metadata.get("prompt", ""), |
| | "output": attribution_map.metadata.get("output", ""), |
| | "model_id": attribution_map.metadata.get("model_id", "unknown"), |
| | "timestamp": time.time() |
| | } |
| | ) |
| | |
| | |
| | token_glyph_ids = {} |
| | if include_tokens: |
| | |
| | for i, token in enumerate(attribution_map.prompt_tokens): |
| | glyph_id = f"token_prompt_{i}" |
| | glyph = Glyph( |
| | id=glyph_id, |
| | symbol="◆", |
| | type=GlyphType.SENTINEL, |
| | semantics=[GlyphSemantic.DIRECTION], |
| | position=(0, i * 20), |
| | size=5.0, |
| | color="#3498db", |
| | opacity=0.8, |
| | source_elements=[token], |
| | description=f"Prompt token: '{token}'", |
| | metadata={"token_index": i, "token_type": "prompt"} |
| | ) |
| | glyph_map.glyphs.append(glyph) |
| | token_glyph_ids[f"prompt_{i}"] = glyph_id |
| | |
| | |
| | for i, token in enumerate(attribution_map.output_tokens): |
| | glyph_id = f"token_output_{i}" |
| | glyph = Glyph( |
| | id=glyph_id, |
| | symbol="◇", |
| | type=GlyphType.SENTINEL, |
| | semantics=[GlyphSemantic.DIRECTION], |
| | position=(100, i * 20), |
| | size=5.0, |
| | color="#e74c3c", |
| | opacity=0.8, |
| | source_elements=[token], |
| | description=f"Output token: '{token}'", |
| | metadata={"token_index": i, "token_type": "output"} |
| | ) |
| | glyph_map.glyphs.append(glyph) |
| | token_glyph_ids[f"output_{i}"] = glyph_id |
| | |
| | |
| | link_glyphs = {} |
| | for link_idx, link in enumerate(attribution_map.links): |
| | |
| | if link.strength < self.min_connection_strength: |
| | continue |
| | |
| | |
| | if link.attribution_type == AttributionType.DIRECT: |
| | if link.strength > 0.7: |
| | glyph_type = "attribution_direct_strong" |
| | elif link.strength > 0.4: |
| | glyph_type = "attribution_direct_medium" |
| | else: |
| | glyph_type = "attribution_direct_weak" |
| | elif link.attribution_type == AttributionType.INDIRECT: |
| | glyph_type = "attribution_indirect" |
| | elif link.attribution_type == AttributionType.COMPOSITE: |
| | glyph_type = "attribution_composite" |
| | elif link.attribution_type == AttributionType.RECURSIVE: |
| | glyph_type = "recursive_recursive_exchange" |
| | else: |
| | glyph_type = "attribution_direct_weak" |
| | |
| | |
| | glyph_info = self.registry.get_glyph(glyph_type) |
| | |
| | |
| | glyph_id = f"link_{link_idx}" |
| | glyph = Glyph( |
| | id=glyph_id, |
| | symbol=glyph_info["symbol"], |
| | type=GlyphType.ATTRIBUTION, |
| | semantics=glyph_info["semantics"], |
| | position=(50, (link.source_idx + link.target_idx) * 10), |
| | size=10.0 * link.strength, |
| | color=self._get_color_for_attribution(link), |
| | opacity=min(1.0, 0.5 + link.strength / 2), |
| | source_elements=[link], |
| | description=glyph_info["description"], |
| | metadata={ |
| | "link_index": link_idx, |
| | "source_index": link.source_idx, |
| | "target_index": link.target_idx, |
| | "attribution_type": str(link.attribution_type), |
| | "strength": link.strength |
| | } |
| | ) |
| | glyph_map.glyphs.append(glyph) |
| | link_glyphs[link_idx] = glyph_id |
| | |
| | |
| | if include_tokens: |
| | source_glyph_id = token_glyph_ids.get(f"prompt_{link.source_idx}") |
| | target_glyph_id = token_glyph_ids.get(f"output_{link.target_idx}") |
| | |
| | if source_glyph_id and target_glyph_id: |
| | |
| | glyph_map.connections.append(GlyphConnection( |
| | source_id=source_glyph_id, |
| | target_id=glyph_id, |
| | strength=link.strength, |
| | type="attribution_flow", |
| | directed=True, |
| | color="#7f8c8d", |
| | width=1.0 + 2.0 * link.strength, |
| | opacity=0.7 * link.strength |
| | )) |
| | |
| | |
| | glyph_map.connections.append(GlyphConnection( |
| | source_id=glyph_id, |
| | target_id=target_glyph_id, |
| | strength=link.strength, |
| | type="attribution_flow", |
| | directed=True, |
| | color="#7f8c8d", |
| | width=1.0 + 2.0 * link.strength, |
| | opacity=0.7 * link.strength |
| | )) |
| | |
| | |
| | for gap_idx, (start_idx, end_idx) in enumerate(attribution_map.attribution_gaps): |
| | |
| | glyph_info = self.registry.get_glyph("attribution_gap") |
| | glyph_id = f"gap_{gap_idx}" |
| | glyph = Glyph( |
| | id=glyph_id, |
| | symbol=glyph_info["symbol"], |
| | type=GlyphType.ATTRIBUTION, |
| | semantics=glyph_info["semantics"], |
| | position=(50, (start_idx + end_idx) * 10), |
| | size=8.0, |
| | color="#e67e22", |
| | opacity=0.8, |
| | source_elements=[(start_idx, end_idx)], |
| | description=f"Attribution gap between indices {start_idx} and {end_idx}", |
| | metadata={ |
| | "gap_index": gap_idx, |
| | "start_index": start_idx, |
| | "end_index": end_idx |
| | } |
| | ) |
| | glyph_map.glyphs.append(glyph) |
| | |
| | |
| | if include_tokens: |
| | source_glyph_id = token_glyph_ids.get(f"prompt_{start_idx}") |
| | target_glyph_id = token_glyph_ids.get(f"output_{end_idx}") |
| | |
| | if source_glyph_id and target_glyph_id: |
| | |
| | glyph_map.connections.append(GlyphConnection( |
| | source_id=source_glyph_id, |
| | target_id=glyph_id, |
| | strength=0.5, |
| | type="attribution_gap", |
| | directed=True, |
| | color="#e67e22", |
| | width=1.5, |
| | opacity=0.6 |
| | )) |
| | |
| | |
| | glyph_map.connections.append(GlyphConnection( |
| | source_id=glyph_id, |
| | target_id=target_glyph_id, |
| | strength=0.5, |
| | type="attribution_gap", |
| | directed=True, |
| | color="#e67e22", |
| | width=1.5, |
| | opacity=0.6 |
| | )) |
| | |
| | |
| | for collapse_idx, (start_idx, end_idx) in enumerate(attribution_map.collapsed_regions): |
| | |
| | glyph_info = self.registry.get_glyph("meta_collapse_point") |
| | glyph_id = f"collapse_{collapse_idx}" |
| | glyph = Glyph( |
| | id=glyph_id, |
| | symbol=glyph_info["symbol"], |
| | type=GlyphType.COLLAPSE, |
| | semantics=glyph_info["semantics"], |
| | position=(50, (start_idx + end_idx) * 10), |
| | size=12.0, |
| | color="#9b59b6", |
| | opacity=0.9, |
| | source_elements=[(start_idx, end_idx)], |
| | description=f"Attribution collapse between indices {start_idx} and {end_idx}", |
| | metadata={ |
| | "collapse_index": collapse_idx, |
| | "start_index": start_idx, |
| | "end_index": end_idx |
| | } |
| | ) |
| | glyph_map.glyphs.append(glyph) |
| | |
| | |
| | if include_tokens: |
| | |
| | for i in range(start_idx, end_idx + 1): |
| | token_glyph_id = token_glyph_ids.get(f"output_{i}") |
| | if token_glyph_id: |
| | glyph_map.connections.append(GlyphConnection( |
| | source_id=glyph_id, |
| | target_id=token_glyph_id, |
| | strength=0.7, |
| | type="collapse_effect", |
| | directed=True, |
| | color="#9b59b6", |
| | width=1.5, |
| | opacity=0.7 |
| | )) |
| | |
| | |
| | if attribution_map.token_salience: |
| | |
| | salient_tokens = sorted( |
| | attribution_map.token_salience.items(), |
| | key=lambda x: x[1], |
| | reverse=True |
| | )[:5] |
| | |
| | for token_idx, salience in salient_tokens: |
| | |
| | if salience > 0.5: |
| | token_glyph_id = token_glyph_ids.get(f"output_{token_idx}") |
| | if token_glyph_id: |
| | glyph_map.focal_points.append(token_glyph_id) |
| | |
| | |
| | if salience > 0.8: |
| | glyph_info = self.registry.get_glyph("attention_focus") |
| | glyph_id = f"salience_{token_idx}" |
| | glyph = Glyph( |
| | id=glyph_id, |
| | symbol=glyph_info["symbol"], |
| | type=GlyphType.SALIENCE, |
| | semantics=glyph_info["semantics"], |
| | position=(120, token_idx * 20), |
| | size=10.0 * salience, |
| | color="#f1c40f", |
| | opacity=salience, |
| | source_elements=[token_idx], |
| | description=f"High salience token at index {token_idx}", |
| | metadata={ |
| | "token_index": token_idx, |
| | "salience": salience |
| | } |
| | ) |
| | glyph_map.glyphs.append(glyph) |
| | |
| | |
| | glyph_map.connections.append(GlyphConnection( |
| | source_id=glyph_id, |
| | target_id=token_glyph_id, |
| | strength=salience, |
| | type="salience_marker", |
| | directed=False, |
| | color="#f1c40f", |
| | width=2.0 * salience, |
| | opacity=0.8 |
| | )) |
| | |
| | |
| | if self.auto_layout: |
| | glyph_map = self._apply_layout(glyph_map, layout_type) |
| | |
| | |
| | if focus_on: |
| | glyph_map = self._apply_focus(glyph_map, focus_on) |
| | |
| | |
| | map_time = time.time() - map_start |
| | glyph_map.metadata["map_time"] = map_time |
| | |
| | |
| | self.glyph_map_history.append(glyph_map) |
| | |
| | logger.info(f"Attribution mapping completed in {map_time:.2f}s") |
| | return glyph_map |
| | |
| | def map_residue_patterns( |
| | self, |
| | residue_patterns: List[ResiduePattern], |
| | layout_type: Optional[str] = None, |
| | dimensions: Optional[Tuple[int, int]] = None, |
| | scale: Optional[float] = None, |
| | cluster_patterns: bool = True |
| | ) -> |
| | def map_residue_patterns( |
| | self, |
| | residue_patterns: List[ResiduePattern], |
| | layout_type: Optional[str] = None, |
| | dimensions: Optional[Tuple[int, int]] = None, |
| | scale: Optional[float] = None, |
| | cluster_patterns: bool = True |
| | ) -> GlyphMap: |
| | """ |
| | Map residue patterns to glyphs. |
| | |
| | Parameters: |
| | ----------- |
| | residue_patterns : List[ResiduePattern] |
| | Residue patterns to visualize |
| | layout_type : Optional[str] |
| | Type of layout to use |
| | dimensions : Optional[Tuple[int, int]] |
| | Dimensions for visualization |
| | scale : Optional[float] |
| | Scale factor for visualization |
| | cluster_patterns : bool |
| | Whether to cluster similar patterns |
| | |
| | Returns: |
| | -------- |
| | GlyphMap |
| | Glyph map representation of residue patterns |
| | """ |
| | map_start = time.time() |
| | layout_type = layout_type or self.default_layout |
| | dimensions = dimensions or self.default_dimensions |
| | scale = scale or self.default_scale |
| | |
| | logger.info(f"Mapping {len(residue_patterns)} residue patterns to glyphs") |
| | |
| | |
| | map_id = f"residue_{int(time.time())}_{hashlib.md5(str(residue_patterns).encode()).hexdigest()[:8]}" |
| | |
| | |
| | glyph_map = GlyphMap( |
| | id=map_id, |
| | glyphs=[], |
| | connections=[], |
| | source_type="residue", |
| | layout_type=layout_type, |
| | dimensions=dimensions, |
| | scale=scale, |
| | metadata={ |
| | "num_patterns": len(residue_patterns), |
| | "pattern_types": [p.type for p in residue_patterns], |
| | "timestamp": time.time() |
| | } |
| | ) |
| | |
| | |
| | pattern_by_type = {} |
| | for pattern in residue_patterns: |
| | if pattern.type not in pattern_by_type: |
| | pattern_by_type[pattern.type] = [] |
| | pattern_by_type[pattern.type].append(pattern) |
| | |
| | |
| | type_glyphs = {} |
| | for i, (pattern_type, patterns) in enumerate(pattern_by_type.items()): |
| | |
| | if pattern_type == "memory_decay": |
| | glyph_type = "residue_memory_decay" |
| | elif pattern_type == "value_conflict": |
| | glyph_type = "residue_value_conflict" |
| | elif pattern_type == "ghost_activation": |
| | glyph_type = "residue_ghost_activation" |
| | elif pattern_type == "boundary_hesitation": |
| | glyph_type = "residue_boundary_hesitation" |
| | elif pattern_type == "null_output": |
| | glyph_type = "residue_null_output" |
| | else: |
| | |
| | glyph_type = "residue_null_output" |
| | |
| | |
| | glyph_info = self.registry.get_glyph(glyph_type) |
| | |
| | |
| | glyph_id = f"type_{pattern_type}" |
| | glyph = Glyph( |
| | id=glyph_id, |
| | symbol=glyph_info["symbol"], |
| | type=GlyphType.RESIDUE, |
| | semantics=glyph_info["semantics"], |
| | position=(dimensions[0] / 2, 100 + i * 150), |
| | size=20.0, |
| | color=self._get_color_for_residue_type(pattern_type), |
| | opacity=1.0, |
| | source_elements=patterns, |
| | description=f"{pattern_type.replace('_', ' ').title()} Residue Pattern", |
| | metadata={ |
| | "pattern_type": pattern_type, |
| | "pattern_count": len(patterns), |
| | "average_confidence": sum(p.confidence for p in patterns) / len(patterns) |
| | } |
| | ) |
| | glyph_map.glyphs.append(glyph) |
| | type_glyphs[pattern_type] = glyph_id |
| | |
| | |
| | glyph_map.regions[pattern_type] = [glyph_id] |
| | |
| | |
| | instance_glyphs = {} |
| | for pattern_idx, pattern in enumerate(residue_patterns): |
| | |
| | parent_glyph_id = type_glyphs.get(pattern.type) |
| | if not parent_glyph_id: |
| | continue |
| | |
| | |
| | if pattern.confidence > 0.8: |
| | size_factor = 1.2 |
| | opacity = 0.9 |
| | elif pattern.confidence > 0.5: |
| | size_factor = 1.0 |
| | opacity = 0.7 |
| | else: |
| | size_factor = 0.8 |
| | opacity = 0.5 |
| | |
| | |
| | glyph_type = f"residue_{pattern.type}" |
| | glyph_info = self.registry.get_glyph(glyph_type) |
| | |
| | |
| | glyph_id = f"pattern_{pattern_idx}" |
| | glyph = Glyph( |
| | id=glyph_id, |
| | symbol=glyph_info["symbol"], |
| | type=GlyphType.RESIDUE, |
| | semantics=glyph_info["semantics"], |
| | position=(0, 0), |
| | size=12.0 * size_factor, |
| | color=self._get_color_for_residue_type(pattern.type), |
| | opacity=opacity, |
| | source_elements=[pattern], |
| | description=f"{pattern.type.replace('_', ' ').title()} Pattern: {pattern.signature[:20]}", |
| | metadata={ |
| | "pattern_type": pattern.type, |
| | "confidence": pattern.confidence, |
| | "signature": pattern.signature, |
| | "context": pattern.context |
| | } |
| | ) |
| | glyph_map.glyphs.append(glyph) |
| | instance_glyphs[pattern_idx] = glyph_id |
| | |
| | |
| | if pattern.type in glyph_map.regions: |
| | glyph_map.regions[pattern.type].append(glyph_id) |
| | |
| | |
| | glyph_map.connections.append(GlyphConnection( |
| | source_id=parent_glyph_id, |
| | target_id=glyph_id, |
| | strength=pattern.confidence, |
| | type="type_instance", |
| | directed=True, |
| | color=self._get_color_for_residue_type(pattern.type), |
| | width=1.0 + pattern.confidence, |
| | opacity=0.6 * pattern.confidence |
| | )) |
| | |
| | |
| | if cluster_patterns and len(residue_patterns) > 1: |
| | |
| | similarity_matrix = np.zeros((len(residue_patterns), len(residue_patterns))) |
| | |
| | for i, pattern1 in enumerate(residue_patterns): |
| | for j, pattern2 in enumerate(residue_patterns): |
| | if i == j: |
| | similarity_matrix[i, j] = 1.0 |
| | else: |
| | |
| | signature_sim = self._calculate_signature_similarity( |
| | pattern1.signature, pattern2.signature |
| | ) |
| | context_sim = self._calculate_context_similarity( |
| | pattern1.context, pattern2.context |
| | ) |
| | |
| | similarity_matrix[i, j] = 0.7 * signature_sim + 0.3 * context_sim |
| | |
| | |
| | for i in range(len(residue_patterns)): |
| | for j in range(i + 1, len(residue_patterns)): |
| | similarity = similarity_matrix[i, j] |
| | if similarity > 0.6: |
| | source_id = instance_glyphs.get(i) |
| | target_id = instance_glyphs.get(j) |
| | if source_id and target_id: |
| | glyph_map.connections.append(GlyphConnection( |
| | source_id=source_id, |
| | target_id=target_id, |
| | strength=similarity, |
| | type="pattern_similarity", |
| | directed=False, |
| | color="#2ecc71", |
| | width=1.0 + 2.0 * similarity, |
| | opacity=0.5 * similarity |
| | )) |
| | |
| | |
| | if layout_type == "similarity": |
| | |
| | tsne = TSNE(n_components=2, perplexity=min(5, len(residue_patterns) - 1)) |
| | positions = tsne.fit_transform(similarity_matrix) |
| | |
| | |
| | positions = self._scale_positions(positions, dimensions) |
| | |
| | |
| | for i, pattern_idx in enumerate(instance_glyphs.keys()): |
| | glyph_id = instance_glyphs[pattern_idx] |
| | for glyph in glyph_map.glyphs: |
| | if glyph.id == glyph_id: |
| | glyph.position = (positions[i, 0], positions[i, 1]) |
| | |
| | |
| | if self.auto_layout and layout_type != "similarity": |
| | glyph_map = self._apply_layout(glyph_map, layout_type) |
| | |
| | |
| | for pattern_idx, pattern in enumerate(residue_patterns): |
| | |
| | if pattern.confidence < 0.7: |
| | continue |
| | |
| | glyph_id = instance_glyphs.get(pattern_idx) |
| | if not glyph_id: |
| | continue |
| | |
| | |
| | ref_glyph_id = f"ref_{pattern_idx}" |
| | ref_glyph = Glyph( |
| | id=ref_glyph_id, |
| | symbol="✱", |
| | type=GlyphType.SENTINEL, |
| | semantics=[GlyphSemantic.DIRECTION], |
| | position=(0, 0), |
| | size=6.0, |
| | color="#3498db", |
| | opacity=0.7, |
| | source_elements=[pattern.context], |
| | description=f"Context reference for pattern {pattern_idx}", |
| | metadata={ |
| | "pattern_index": pattern_idx, |
| | "reference_type": "context" |
| | } |
| | ) |
| | |
| | |
| | for glyph in glyph_map.glyphs: |
| | if glyph.id == glyph_id: |
| | x, y = glyph.position |
| | ref_glyph.position = (x + 20, y - 20) |
| | |
| | glyph_map.glyphs.append(ref_glyph) |
| | |
| | |
| | glyph_map.connections.append(GlyphConnection( |
| | source_id=ref_glyph_id, |
| | target_id=glyph_id, |
| | strength=0.7, |
| | type="context_reference", |
| | directed=True, |
| | color="#3498db", |
| | width=1.0, |
| | opacity=0.5 |
| | )) |
| | |
| | |
| | map_time = time.time() - map_start |
| | glyph_map.metadata["map_time"] = map_time |
| | |
| | |
| | self.glyph_map_history.append(glyph_map) |
| | |
| | logger.info(f"Residue pattern mapping completed in {map_time:.2f}s") |
| | return glyph_map |
| | |
| | def map_attention_heads( |
| | self, |
| | attention_data: Dict[str, Any], |
| | layout_type: Optional[str] = None, |
| | dimensions: Optional[Tuple[int, int]] = None, |
| | scale: Optional[float] = None, |
| | include_tokens: bool = True |
| | ) -> GlyphMap: |
| | """ |
| | Map attention head patterns to glyphs. |
| | |
| | Parameters: |
| | ----------- |
| | attention_data : Dict[str, Any] |
| | Attention head data to visualize |
| | layout_type : Optional[str] |
| | Type of layout to use |
| | dimensions : Optional[Tuple[int, int]] |
| | Dimensions for visualization |
| | scale : Optional[float] |
| | Scale factor for visualization |
| | include_tokens : bool |
| | Whether to include tokens as sentinel glyphs |
| | |
| | Returns: |
| | -------- |
| | GlyphMap |
| | Glyph map representation of attention head patterns |
| | """ |
| | map_start = time.time() |
| | layout_type = layout_type or self.default_layout |
| | dimensions = dimensions or self.default_dimensions |
| | scale = scale or self.default_scale |
| | |
| | logger.info("Mapping attention head patterns to glyphs") |
| | |
| | |
| | prompt_tokens = attention_data.get("prompt_tokens", []) |
| | output_tokens = attention_data.get("output_tokens", []) |
| | attention_heads = attention_data.get("attention_heads", []) |
| | head_patterns = attention_data.get("head_patterns", {}) |
| | |
| | |
| | map_id = f"attention_{int(time.time())}_{hashlib.md5(str(attention_heads).encode()).hexdigest()[:8]}" |
| | |
| | |
| | glyph_map = GlyphMap( |
| | id=map_id, |
| | glyphs=[], |
| | connections=[], |
| | source_type="attention", |
| | layout_type=layout_type, |
| | dimensions=dimensions, |
| | scale=scale, |
| | metadata={ |
| | "num_heads": len(attention_heads), |
| | "model_id": attention_data.get("metadata", {}).get("model_id", "unknown"), |
| | "timestamp": time.time() |
| | } |
| | ) |
| | |
| | |
| | token_glyph_ids = {} |
| | if include_tokens: |
| | |
| | for i, token in enumerate(prompt_tokens): |
| | glyph_id = f"token_prompt_{i}" |
| | glyph = Glyph( |
| | id=glyph_id, |
| | symbol="◆", |
| | type=GlyphType.SENTINEL, |
| | semantics=[GlyphSemantic.DIRECTION], |
| | position=(0, i * 20), |
| | size=5.0, |
| | color="#3498db", |
| | opacity=0.8, |
| | source_elements=[token], |
| | description=f"Prompt token: '{token}'", |
| | metadata={"token_index": i, "token_type": "prompt"} |
| | ) |
| | glyph_map.glyphs.append(glyph) |
| | token_glyph_ids[f"prompt_{i}"] = glyph_id |
| | |
| | |
| | for i, token in enumerate(output_tokens): |
| | glyph_id = f"token_output_{i}" |
| | glyph = Glyph( |
| | id=glyph_id, |
| | symbol="◇", |
| | type=GlyphType.SENTINEL, |
| | semantics=[GlyphSemantic.DIRECTION], |
| | position=(100, i * 20), |
| | size=5.0, |
| | color="#e74c3c", |
| | opacity=0.8, |
| | source_elements=[token], |
| | description=f"Output token: '{token}'", |
| | metadata={"token_index": i, "token_type": "output"} |
| | ) |
| | glyph_map.glyphs.append(glyph) |
| | token_glyph_ids[f"output_{i}"] = glyph_id |
| | |
| | |
| | head_glyphs = {} |
| | for head_idx, head in enumerate(attention_heads): |
| | |
| | if head.pattern_type == "induction": |
| | glyph_type = "attention_induction" |
| | elif head.pattern_type == "focus": |
| | glyph_type = "attention_focus" |
| | elif head.pattern_type == "diffuse": |
| | glyph_type = "attention_diffuse" |
| | elif head.pattern_type == "inhibition": |
| | glyph_type = "attention_inhibition" |
| | else: |
| | glyph_type = "attention_multi_head" |
| | |
| | |
| | glyph_info = self.registry.get_glyph(glyph_type) |
| | |
| | |
| | glyph_id = f"head_{head_idx}" |
| | glyph = Glyph( |
| | id=glyph_id, |
| | symbol=glyph_info["symbol"], |
| | type=GlyphType.ATTENTION, |
| | semantics=glyph_info["semantics"], |
| | position=(50, head.layer * 40 + head.head * 10), |
| | size=8.0 + 5.0 * head.strength, |
| | color=self._get_color_for_layer(head.layer), |
| | opacity=min(1.0, 0.5 + head.strength / 2), |
| | source_elements=[head], |
| | description=f"Attention head {head.layer}.{head.head}: {head.pattern_type}", |
| | metadata={ |
| | "head_index": head_idx, |
| | "layer": head.layer, |
| | "head": head.head, |
| | "pattern_type": head.pattern_type, |
| | "strength": head.strength, |
| | "function": head.function, |
| | "attribution_role": head.attribution_role |
| | } |
| | ) |
| | glyph_map.glyphs.append(glyph) |
| | head_glyphs[head_idx] = glyph_id |
| | |
| | |
| | if include_tokens and head.focus_tokens: |
| | for token_idx in head.focus_tokens: |
| | token_type = "prompt" if token_idx < len(prompt_tokens) else "output" |
| | adjusted_idx = token_idx if token_type == "prompt" else token_idx - len(prompt_tokens) |
| | token_glyph_id = token_glyph_ids.get(f"{token_type}_{adjusted_idx}") |
| | if token_glyph_id: |
| | glyph_map.connections.append(GlyphConnection( |
| | source_id=glyph_id, |
| | target_id=token_glyph_id, |
| | strength=head.strength, |
| | type="attention_focus", |
| | directed=True, |
| | color=self._get_color_for_layer(head.layer, alpha=0.6), |
| | width=1.0 + 2.0 * head.strength, |
| | opacity=0.6 * head.strength |
| | )) |
| | |
| | |
| | for pattern_name, related_heads in head_patterns.items(): |
| | |
| | if len(related_heads) < 2: |
| | continue |
| | |
| | |
| | glyph_type = "meta_emergence" if "emergent" in pattern_name else "recursive_recursive_exchange" |
| | glyph_info = self.registry.get_glyph(glyph_type) |
| | |
| | pattern_glyph_id = f"pattern_{pattern_name}" |
| | pattern_glyph = Glyph( |
| | id=pattern_glyph_id, |
| | symbol=glyph_info["symbol"], |
| | type=GlyphType.META if "emergent" in pattern_name else GlyphType.RECURSIVE, |
| | semantics=glyph_info["semantics"], |
| | position=(100, 100), |
| | size=12.0, |
| | color="#f1c40f", |
| | opacity=0.9, |
| | source_elements=[pattern_name, related_heads], |
| | description=f"Attention pattern: {pattern_name}", |
| | metadata={ |
| | "pattern_name": pattern_name, |
| | "related_heads": related_heads |
| | } |
| | ) |
| | glyph_map.glyphs.append(pattern_glyph) |
| | |
| | |
| | for head_idx in related_heads: |
| | head_glyph_id = head_glyphs.get(head_idx) |
| | if head_glyph_id: |
| | glyph_map.connections.append(GlyphConnection( |
| | source_id=pattern_glyph_id, |
| | target_id=head_glyph_id, |
| | strength=0.8, |
| | type="pattern_membership", |
| | directed=True, |
| | color="#f1c40f", |
| | width=1.5, |
| | opacity=0.7 |
| | )) |
| | |
| | |
| | if pattern_name not in glyph_map.regions: |
| | glyph_map.regions[pattern_name] = [] |
| | glyph_map.regions[pattern_name].append(pattern_glyph_id) |
| | for head_idx in related_heads: |
| | head_glyph_id = head_glyphs.get(head_idx) |
| | if head_glyph_id: |
| | glyph_map.regions[pattern_name].append(head_glyph_id) |
| | |
| | |
| | layers = set(head.layer for head in attention_heads) |
| | for layer in layers: |
| | |
| | layer_heads = [ |
| | head_idx for head_idx, head in enumerate(attention_heads) |
| | if head.layer == layer |
| | ] |
| | |
| | |
| | layer_region = f"layer_{layer}" |
| | glyph_map.regions[layer_region] = [ |
| | head_glyphs[head_idx] for head_idx in layer_heads |
| | if head_idx in head_glyphs |
| | ] |
| | |
| | |
| | if self.auto_layout: |
| | glyph_map = self._apply_layout(glyph_map, layout_type) |
| | |
| | |
| | map_time = time.time() - map_start |
| | glyph_map.metadata["map_time"] = map_time |
| | |
| | |
| | self.glyph_map_history.append(glyph_map) |
| | |
| | logger.info(f"Attention head mapping completed in {map_time:.2f}s") |
| | return glyph_map |
| | |
| | def map_recursive_trace( |
| | self, |
| | trace_data: Dict[str, Any], |
| | layout_type: Optional[str] = None, |
| | dimensions: Optional[Tuple[int, int]] = None, |
| | scale: Optional[float] = None, |
| | depth_limit: Optional[int] = None |
| | ) -> GlyphMap: |
| | """ |
| | Map recursive trace data to glyphs. |
| | |
| | Parameters: |
| | ----------- |
| | trace_data : Dict[str, Any] |
| | Recursive trace data to visualize |
| | layout_type : Optional[str] |
| | Type of layout to use |
| | dimensions : Optional[Tuple[int, int]] |
| | Dimensions for visualization |
| | scale : Optional[float] |
| | Scale factor for visualization |
| | depth_limit : Optional[int] |
| | Maximum recursion depth to visualize |
| | |
| | Returns: |
| | -------- |
| | GlyphMap |
| | Glyph map representation of recursive trace |
| | """ |
| | map_start = time.time() |
| | layout_type = layout_type or self.default_layout |
| | dimensions = dimensions or self.default_dimensions |
| | scale = scale or self.default_scale |
| | |
| | logger.info("Mapping recursive trace to glyphs") |
| | |
| | |
| | trace_operations = trace_data.get("operations", []) |
| | trace_result = trace_data.get("result", {}) |
| | trace_depth = trace_data.get("depth", 0) |
| | |
| | |
| | if depth_limit is not None: |
| | trace_depth = min(trace_depth, depth_limit) |
| | |
| | trace_operations = [ |
| | op for op in trace_operations |
| | if op.get("depth", 0) <= depth_limit |
| | ] |
| | |
| | |
| | map_id = f"recursive_{int(time.time())}_{hashlib.md5(str(trace_data).encode()).hexdigest()[:8]}" |
| | |
| | |
| | glyph_map = GlyphMap( |
| | id=map_id, |
| | glyphs=[], |
| | connections=[], |
| | source_type="recursive", |
| | layout_type=layout_type, |
| | dimensions=dimensions, |
| | scale=scale, |
| | metadata={ |
| | "trace_command": trace_data.get("command", "unknown"), |
| | "trace_target": trace_data.get("target", "unknown"), |
| | "trace_depth": trace_depth, |
| | "timestamp": time.time() |
| | } |
| | ) |
| | |
| | |
| | seed_glyph = Glyph( |
| | id="seed", |
| | symbol="∴", |
| | type=GlyphType.RECURSIVE, |
| | semantics=[GlyphSemantic.RECURSION, GlyphSemantic.EMERGENCE], |
| | position=(dimensions[0] / 2, 50), |
| | size=15.0, |
| | color="#9b59b6", |
| | opacity=1.0, |
| | source_elements=[trace_data.get("command", "")], |
| | description=f"Recursive seed: {trace_data.get('command', 'unknown')}", |
| | metadata={ |
| | "command": trace_data.get("command", ""), |
| | "target": trace_data.get("target", ""), |
| | "depth": trace_depth |
| | } |
| | ) |
| | glyph_map.glyphs.append(seed_glyph) |
| | glyph_map.focal_points.append("seed") |
| | |
| | |
| | depth_glyphs = {"0": "seed"} |
| | for op_idx, operation in enumerate(trace_operations): |
| | op_depth = operation.get("depth", 0) |
| | op_type = operation.get("type", "unknown") |
| | |
| | |
| | if "reflect" in op_type: |
| | glyph_type = "recursive_recursive_mirror" |
| | elif "collapse" in op_type: |
| | glyph_type = "meta_collapse_point" |
| | elif "fork" in op_type: |
| | glyph_type = "attribution_fork" |
| | elif "trace" in op_type: |
| | glyph_type = "recursive_recursive_exchange" |
| | else: |
| | glyph_type = "recursive_recursive_anchor" |
| | |
| | |
| | glyph_info = self.registry.get_glyph(glyph_type) |
| | |
| | |
| | glyph_id = f"op_{op_idx}" |
| | glyph = Glyph( |
| | id=glyph_id, |
| | symbol=glyph_info["symbol"], |
| | type=GlyphType.RECURSIVE, |
| | semantics=glyph_info["semantics"], |
| | position=(100 + op_depth * 80, 100 + op_idx * 30), |
| | size=10.0 - op_depth * 0.5, |
| | color=self._get_color_for_depth(op_depth), |
| | opacity=max(0.5, 1.0 - op_depth * 0.1), |
| | source_elements=[operation], |
| | description=f"Operation {op_type} at depth {op_depth}", |
| | metadata={ |
| | "operation_index": op_idx, |
| | "operation_type": op_type, |
| | "depth": op_depth, |
| | "parameters": operation.get("parameters", {}) |
| | } |
| | ) |
| | glyph_map.glyphs.append(glyph) |
| | |
| | |
| | depth_key = str(op_depth) |
| | if depth_key not in depth_glyphs: |
| | depth_glyphs[depth_key] = [] |
| | if isinstance(depth_glyphs[depth_key], list): |
| | depth_glyphs[depth_key].append(glyph_id) |
| | else: |
| | depth_glyphs[depth_key] = [depth_glyphs[depth_key], glyph_id] |
| | |
| | |
| | parent_depth_key = str(op_depth - 1) |
| | if parent_depth_key in depth_glyphs: |
| | parent_glyphs = depth_glyphs[parent_depth_key] |
| | if isinstance(parent_glyphs, list): |
| | |
| | parent_indices = [ |
| | int(g.split('_')[1]) if g.startswith('op_') else 0 |
| | for g in parent_glyphs |
| | ] |
| | closest_parent_idx = min(range(len(parent_indices)), key=lambda i: abs(parent_indices[i] - op_idx)) |
| | parent_glyph_id = parent_glyphs[closest_parent_idx] |
| | else: |
| | parent_glyph_id = parent_glyphs |
| | |
| | glyph_map.connections.append(GlyphConnection( |
| | source_id=parent_glyph_id, |
| | target_id=glyph_id, |
| | strength=0.8, |
| | type="recursive_descent", |
| | directed=True, |
| | color=self._get_color_for_depth(op_depth - 1, alpha=0.6), |
| | width=2.0 - op_depth * 0.2, |
| | opacity=max(0.4, 0.9 - op_depth * 0.1) |
| | )) |
| | |
| | |
| | if trace_result: |
| | result_type = trace_result.get("type", "unknown") |
| | |
| | |
| | if "success" in result_type: |
| | glyph_type = "recursive_recursive_aegis" |
| | elif "collapse" in result_type: |
| | glyph_type = "meta_collapse_point" |
| | elif "partial" in result_type: |
| | glyph_type = "residue_ghost_activation" |
| | else: |
| | glyph_type = "meta_uncertainty" |
| | |
| | |
| | glyph_info = self.registry.get_glyph(glyph_type) |
| | |
| | result_glyph_id = "result" |
| | result_glyph = Glyph( |
| | id=result_glyph_id, |
| | symbol=glyph_info["symbol"], |
| | type=GlyphType.RECURSIVE if "success" in result_type else GlyphType.META, |
| | semantics=glyph_info["semantics"], |
| | position=(dimensions[0] / 2, dimensions[1] - 80), |
| | size=15.0, |
| | color="#27ae60" if "success" in result_type else "#e74c3c", |
| | opacity=1.0, |
| | source_elements=[trace_result], |
| | description=f"Trace result: {result_type}", |
| | metadata={ |
| | "result_type": result_type, |
| | "result_data": trace_result.get("data", {}), |
| | "confidence": trace_result.get("confidence", 0.0) |
| | } |
| | ) |
| | glyph_map.glyphs.append(result_glyph) |
| | |
| | |
| | max_depth = max(int(d) for d in depth_glyphs.keys()) |
| | deepest_glyphs = depth_glyphs[str(max_depth)] |
| | if isinstance(deepest_glyphs, list): |
| | for glyph_id in deepest_glyphs: |
| | glyph_map.connections.append(GlyphConnection( |
| | source_id=glyph_id, |
| | target_id=result_glyph_id, |
| | strength=0.9, |
| | type="recursion_result", |
| | directed=True, |
| | color="#27ae60" if "success" in result_type else "#e74c3c", |
| | width=1.5, |
| | opacity=0.8 |
| | )) |
| | else: |
| | glyph_map.connections.append(GlyphConnection( |
| | source_id=deepest_glyphs, |
| | target_id=result_glyph_id, |
| | strength=0.9, |
| | type="recursion_result", |
| | directed=True, |
| | color="#27ae60" if "success" in result_type else "#e74c3c", |
| | width=1.5, |
| | opacity=0.8 |
| | )) |
| | |
| | |
| | glyph_map.connections.append(GlyphConnection( |
| | source_id="seed", |
| | target_id=result_glyph_id, |
| | strength=1.0, |
| | type="recursion_completion", |
| | directed=True, |
| | color="#9b59b6", |
| | width=2.0, |
| | opacity=0.5 |
| | )) |
| | |
| | |
| | for depth in range(trace_depth + 1): |
| | depth_key = str(depth) |
| | if depth_key in depth_glyphs: |
| | depth_glyphs_list = depth_glyphs[depth_key] |
| | if not isinstance(depth_glyphs_list, list): |
| | depth_glyphs_list = [depth_glyphs_list] |
| | glyph_map.regions[f"depth_{depth}"] = depth_glyphs_list |
| | |
| | |
| | if self.auto_layout: |
| | |
| | if layout_type == self.default_layout and self.default_layout != "hierarchical": |
| | layout_type = "hierarchical" |
| | glyph_map = self._apply_layout(glyph_map, layout_type) |
| | |
| | |
| | map_time = time.time() - map_start |
| | glyph_map.metadata["map_time"] = map_time |
| | |
| | |
| | self.glyph_map_history.append(glyph_map) |
| | |
| | logger.info(f"Recursive trace mapping completed in {map_time:.2f}s") |
| | return glyph_map |
| | |
| | def combine_glyph_maps( |
| | self, |
| | glyph_maps: List[GlyphMap], |
| | layout_type: Optional[str] = None, |
| | dimensions: Optional[Tuple[int, int]] = None, |
| | scale: Optional[float] = None, |
| | connection_threshold: float = 0.5 |
| | ) -> GlyphMap: |
| | """ |
| | Combine multiple glyph maps into a unified map. |
| | |
| | Parameters: |
| | ----------- |
| | glyph_maps : List[GlyphMap] |
| | Glyph maps to combine |
| | layout_type : Optional[str] |
| | Type of layout to use |
| | dimensions : Optional[Tuple[int, int]] |
| | Dimensions for visualization |
| | scale : Optional[float] |
| | Scale factor for visualization |
| | connection_threshold : float |
| | Minimum strength for inter-map connections |
| | |
| | Returns: |
| | -------- |
| | GlyphMap |
| | Combined glyph map |
| | """ |
| | map_start = time.time() |
| | layout_type = layout_type or self.default_layout |
| | dimensions = dimensions or ( |
| | max(gm.dimensions[0] for gm in glyph_maps), |
| | max(gm.dimensions[1] for gm in glyph_maps) |
| | ) |
| | scale = scale or self.default_scale |
| | |
| | logger.info(f"Combining {len(glyph_maps)} glyph maps") |
| | |
| | |
| | map_id = f"combined_{int(time.time())}_{hashlib.md5(str([gm.id for gm in glyph_maps]).encode()).hexdigest()[:8]}" |
| | |
| | |
| | combined_map = GlyphMap( |
| | id=map_id, |
| | glyphs=[], |
| | connections=[], |
| | source_type="combined", |
| | layout_type=layout_type, |
| | dimensions=dimensions, |
| | scale=scale, |
| | metadata={ |
| | "source_maps": [gm.id for gm in glyph_maps], |
| | "source_types": [gm.source_type for gm in glyph_maps], |
| | "timestamp": time.time() |
| | } |
| | ) |
| | |
| | |
| | id_mapping = {} |
| | for i, gm in enumerate(glyph_maps): |
| | prefix = f"map{i}_" |
| | for glyph in gm.glyphs: |
| | id_mapping[glyph.id] = prefix + glyph.id |
| | |
| | |
| | for i, gm in enumerate(glyph_maps): |
| | prefix = f"map{i}_" |
| | |
| | |
| | map_region = f"map_{i}" |
| | combined_map.regions[map_region] = [] |
| | |
| | |
| | for glyph in gm.glyphs: |
| | new_id = prefix + glyph.id |
| | new_glyph = Glyph( |
| | id=new_id, |
| | symbol=glyph.symbol, |
| | type=glyph.type, |
| | semantics=glyph.semantics, |
| | position=glyph.position, |
| | size=glyph.size, |
| | color=glyph.color, |
| | opacity=glyph.opacity, |
| | source_elements=glyph.source_elements, |
| | description=glyph.description, |
| | metadata={ |
| | **glyph.metadata, |
| | "original_id": glyph.id, |
| | "source_map": gm.id |
| | } |
| | ) |
| | combined_map.glyphs.append(new_glyph) |
| | combined_map.regions[map_region].append(new_id) |
| | |
| | |
| | for conn in gm.connections: |
| | combined_map.connections.append(GlyphConnection( |
| | source_id=prefix + conn.source_id, |
| | target_id=prefix + conn.target_id, |
| | strength=conn.strength, |
| | type=conn.type, |
| | directed=conn.directed, |
| | color=conn.color, |
| | width=conn.width, |
| | opacity=conn.opacity, |
| | metadata={ |
| | **conn.metadata, |
| | "source_map": gm.id |
| | } |
| | )) |
| | |
| | |
| | for focal_point in gm.focal_points: |
| | combined_map.focal_points.append(prefix + focal_point) |
| | |
| | |
| | for i, gm1 in enumerate(glyph_maps): |
| | for j, gm2 in enumerate(glyph_maps): |
| | if i >= j: |
| | continue |
| | |
| | |
| | related_pairs = self._find_related_glyphs(gm1, gm2) |
| | |
| | |
| | for glyph1_id, glyph2_id, similarity in related_pairs: |
| | if similarity >= connection_threshold: |
| | prefixed_id1 = f"map{i}_{glyph1_id}" |
| | prefixed_id2 = f"map{j}_{glyph2_id}" |
| | |
| | combined_map.connections.append(GlyphConnection( |
| | source_id=prefixed_id1, |
| | target_id=prefixed_id2, |
| | strength=similarity, |
| | type="cross_map_relation", |
| | directed=False, |
| | color="#3498db", |
| | width=1.0 + similarity, |
| | opacity=0.6 * similarity, |
| | metadata={ |
| | "relation_type": "cross_map", |
| | "source_map": gm1.id, |
| | "target_map": gm2.id, |
| | "similarity": similarity |
| | } |
| | )) |
| | |
| | |
| | if self.auto_layout: |
| | combined_map = self._apply_layout(combined_map, layout_type) |
| | |
| | |
| | map_time = time.time() - map_start |
| | combined_map.metadata["map_time"] = map_time |
| | |
| | |
| | self.glyph_map_history.append(combined_map) |
| | |
| | logger.info(f"Glyph map combination completed in {map_time:.2f}s") |
| | return combined_map |
| | |
| | def visualize( |
| | self, |
| | glyph_map: GlyphMap, |
| | output_path: Optional[str] = None, |
| | interactive: bool = True |
| | ) -> Any: |
| | """ |
| | Visualize a glyph map. |
| | |
| | Parameters: |
| | ----------- |
| | glyph_map : GlyphMap |
| | Glyph map to visualize |
| | output_path : Optional[str] |
| | Path to save visualization to |
| | interactive : bool |
| | Whether to generate interactive visualization |
| | |
| | Returns: |
| | -------- |
| | Any |
| | Visualization result |
| | """ |
| | if self.visualizer: |
| | return self.visualizer.visualize_glyph_map( |
| | glyph_map=glyph_map, |
| | output_path=output_path, |
| | interactive=interactive |
| | ) |
| | else: |
| | |
| | return self._simple_visualization( |
| | glyph_map=glyph_map, |
| | output_path=output_path |
| | ) |
| | |
| | def save_glyph_map( |
| | self, |
| | glyph_map: GlyphMap, |
| | output_path: str |
| | ) -> str: |
| | """ |
| | Save a glyph map to a file. |
| | |
| | Parameters: |
| | ----------- |
| | glyph_map : GlyphMap |
| | Glyph map to save |
| | output_path : str |
| | Path to save glyph map to |
| | |
| | Returns: |
| | -------- |
| | str |
| | Path to saved file |
| | """ |
| | |
| | serializable_map = { |
| | "id": glyph_map.id, |
| | "source_type": glyph_map.source_type, |
| | "layout_type": glyph_map.layout_type, |
| | "dimensions": glyph_map.dimensions, |
| | "scale": glyph_map.scale, |
| | "focal_points": glyph_map.focal_points, |
| | "regions": glyph_map.regions, |
| | "metadata": glyph_map.metadata, |
| | "glyphs": [ |
| | { |
| | "id": g.id, |
| | "symbol": g.symbol, |
| | "type": g.type.value, |
| | "semantics": [s.value for s in g.semantics], |
| | "position": g.position, |
| | "size": g.size, |
| | "color": g.color, |
| | "opacity": g.opacity, |
| | "description": g.description, |
| | "metadata": g.metadata |
| | } |
| | for g in glyph_map.glyphs |
| | ], |
| | "connections": [ |
| | { |
| | "source_id": c.source_id, |
| | "target_id": c.target_id, |
| | "strength": c.strength, |
| | "type": c.type, |
| | "directed": c.directed, |
| | "color": c.color, |
| | "width": c.width, |
| | "opacity": c.opacity, |
| | "metadata": c.metadata |
| | } |
| | for c in glyph_map.connections |
| | ] |
| | } |
| | |
| | |
| | output_dir = Path(output_path).parent |
| | output_dir.mkdir(parents=True, exist_ok=True) |
| | |
| | |
| | with open(output_path, "w") as f: |
| | json.dump(serializable_map, f, indent=2) |
| | |
| | logger.info(f"Saved glyph map to {output_path}") |
| | return output_path |
| | |
| | def load_glyph_map( |
| | self, |
| | input_path: str |
| | ) -> GlyphMap: |
| | """ |
| | Load a glyph map from a file. |
| | |
| | Parameters: |
| | ----------- |
| | input_path : str |
| | Path to load glyph map from |
| | |
| | Returns: |
| | -------- |
| | GlyphMap |
| | Loaded glyph map |
| | """ |
| | |
| | with open(input_path, "r") as f: |
| | data = json.load(f) |
| | |
| | |
| | glyphs = [ |
| | Glyph( |
| | id=g["id"], |
| | symbol=g["symbol"], |
| | type=GlyphType(g["type"]), |
| | semantics=[GlyphSemantic(s) for s in g["semantics"]], |
| | position=tuple(g["position"]), |
| | size=g["size"], |
| | color=g["color"], |
| | opacity=g["opacity"], |
| | description=g.get("description"), |
| | metadata=g.get("metadata", {}) |
| | ) |
| | for g in data["glyphs"] |
| | ] |
| | |
| | connections = [ |
| | GlyphConnection( |
| | source_id=c["source_id"], |
| | target_id=c["target_id"], |
| | strength=c["strength"], |
| | type=c["type"], |
| | directed=c["directed"], |
| | color=c["color"], |
| | width=c["width"], |
| | opacity=c["opacity"], |
| | metadata=c.get("metadata", {}) |
| | ) |
| | for c in data["connections"] |
| | ] |
| | |
| | glyph_map = GlyphMap( |
| | id=data["id"], |
| | glyphs=glyphs, |
| | connections=connections, |
| | source_type=data["source_type"], |
| | layout_type=data["layout_type"], |
| | dimensions=tuple(data["dimensions"]), |
| | scale=data["scale"], |
| | focal_points=data.get("focal_points", []), |
| | regions=data.get("regions", {}), |
| | metadata=data.get("metadata", {}) |
| | ) |
| | |
| | logger.info(f"Loaded glyph map from {input_path}") |
| | return glyph_map |
| | |
| | |
| | |
| | def _apply_layout( |
| | self, |
| | glyph_map: GlyphMap, |
| | layout_type: str |
| | ) -> GlyphMap: |
| | """Apply a layout to a glyph map.""" |
| | layout_start = time.time() |
| | |
| | if layout_type == "force_directed": |
| | glyph_map = self._apply_force_directed_layout(glyph_map) |
| | elif layout_type == "hierarchical": |
| | glyph_map = self._apply_hierarchical_layout(glyph_map) |
| | elif layout_type == "circular": |
| | glyph_map = self._apply_circular_layout(glyph_map) |
| | elif layout_type == "grid": |
| | glyph_map = self._apply_grid_layout(glyph_map) |
| | elif layout_type == "radial": |
| | glyph_map = self._apply_radial_layout(glyph_map) |
| | else: |
| | logger.warning(f"Unknown layout type: {layout_type}, using force_directed") |
| | glyph_map = self._apply_force_directed_layout(glyph_map) |
| | |
| | layout_time = time.time() - layout_start |
| | logger.info(f"Applied {layout_type} layout in {layout_time:.2f}s") |
| | |
| | return glyph_map |
| | |
| | def _apply_force_directed_layout(self, glyph_map: GlyphMap) -> GlyphMap: |
| | """Apply force-directed layout to a glyph map.""" |
| | |
| | G = nx.Graph() |
| | |
| | |
| | for glyph in glyph_map.glyphs: |
| | G.add_node(glyph.id, size=glyph.size, type=glyph.type.value) |
| | |
| | |
| | for conn in glyph_map.connections: |
| | if conn.source_id in G and conn.target_id in G: |
| | G.add_edge( |
| | conn.source_id, |
| | conn.target_id, |
| | weight=conn.strength |
| | ) |
| | |
| | |
| | width, height = glyph_map.dimensions |
| | pos = nx.spring_layout( |
| | G, |
| | k=0.2, |
| | iterations=100, |
| | seed=42 |
| | ) |
| | |
| | |
| | pos_array = np.array(list(pos.values())) |
| | if len(pos_array) > 0: |
| | min_x, min_y = pos_array.min(axis=0) |
| | max_x, max_y = pos_array.max(axis=0) |
| | |
| | |
| | x_range = max_x - min_x |
| | y_range = max_y - min_y |
| | |
| | if x_range > 0: |
| | scale_x = (width * 0.8) / x_range |
| | else: |
| | scale_x = 1.0 |
| | |
| | if y_range > 0: |
| | scale_y = (height * 0.8) / y_range |
| | else: |
| | scale_y = 1.0 |
| | |
| | |
| | for node_id, (x, y) in pos.items(): |
| | x_scaled = ((x - min_x) * scale_x) + width * 0.1 |
| | y_scaled = ((y - min_y) * scale_y) + height * 0.1 |
| | pos[node_id] = (x_scaled, y_scaled) |
| | |
| | |
| | for glyph in glyph_map.glyphs: |
| | if glyph.id in pos: |
| | glyph.position = pos[glyph.id] |
| | |
| | return glyph_map |
| | |
| | def _apply_hierarchical_layout(self, glyph_map: GlyphMap) -> GlyphMap: |
| | """Apply hierarchical layout to a glyph map.""" |
| | |
| | G = nx.DiGraph() |
| | |
| | |
| | for glyph in glyph_map.glyphs: |
| | G.add_node(glyph.id, size=glyph.size, type=glyph.type.value) |
| | |
| | |
| | for conn in glyph_map.connections: |
| | if conn.directed and conn.source_id in G and conn.target_id in G: |
| | G.add_edge( |
| | conn.source_id, |
| | conn.target_id, |
| | weight=conn.strength |
| | ) |
| | |
| | |
| | root_nodes = [n for n in G.nodes() if G.in_degree(n) == 0] |
| | |
| | |
| | if not root_nodes: |
| | if glyph_map.focal_points: |
| | root_nodes = [fp for fp in glyph_map.focal_points if fp in G] |
| | else: |
| | root_nodes = [glyph_map.glyphs[0].id] if glyph_map.glyphs else [] |
| | |
| | |
| | width, height = glyph_map.dimensions |
| | |
| | |
| | if root_nodes: |
| | |
| | layers = {} |
| | visited = set() |
| | |
| | |
| | current_layer = root_nodes |
| | layer_idx = 0 |
| | |
| | while current_layer and layer_idx < 20: |
| | layers[layer_idx] = current_layer |
| | next_layer = [] |
| | |
| | for node in current_layer: |
| | visited.add(node) |
| | for _, neighbor in G.out_edges(node): |
| | if neighbor not in visited and neighbor not in next_layer: |
| | next_layer.append(neighbor) |
| | |
| | current_layer = next_layer |
| | layer_idx += 1 |
| | |
| | |
| | for layer_idx, nodes in layers.items(): |
| | y_pos = (layer_idx + 1) * (height / (len(layers) + 1)) |
| | x_step = width / (len(nodes) + 1) |
| | |
| | for i, node_id in enumerate(nodes): |
| | x_pos = (i + 1) * x_step |
| | |
| | for glyph in glyph_map.glyphs: |
| | if glyph.id == node_id: |
| | glyph.position = (x_pos, y_pos) |
| | break |
| | |
| | |
| | unvisited = [g.id for g in glyph_map.glyphs if g.id not in visited] |
| | if unvisited: |
| | y_pos = (layer_idx + 1) * (height / (len(layers) + 2)) |
| | x_step = width / (len(unvisited) + 1) |
| | |
| | for i, node_id in enumerate(unvisited): |
| | x_pos = (i + 1) * x_step |
| | |
| | for glyph in glyph_map.glyphs: |
| | if glyph.id == node_id: |
| | glyph.position = (x_pos, y_pos) |
| | break |
| | else: |
| | |
| | glyph_map = self._apply_grid_layout(glyph_map) |
| | |
| | return glyph_map |
| | |
| | def _apply_circular_layout(self, glyph_map: GlyphMap) -> GlyphMap: |
| | """Apply circular layout to a glyph map.""" |
| | |
| | G = nx.Graph() |
| | |
| | |
| | for glyph in glyph_map.glyphs: |
| | G.add_node(glyph.id, size=glyph.size, type=glyph.type.value) |
| | |
| | |
| | for conn in glyph_map.connections: |
| | if conn.source_id in G and conn.target_id in G: |
| | G.add_edge( |
| | conn.source_id, |
| | conn.target_id, |
| | weight=conn.strength |
| | ) |
| | |
| | |
| | width, height = glyph_map.dimensions |
| | center_x, center_y = width / 2, height / 2 |
| | radius = min(width, height) * 0.4 |
| | |
| | pos = nx.circular_layout(G, scale=radius) |
| | |
| | |
| | for node_id, (x, y) in pos.items(): |
| | pos[node_id] = (x + center_x, y + center_y) |
| | |
| | |
| | for glyph in glyph_map.glyphs: |
| | if glyph.id in pos: |
| | glyph.position = pos[glyph.id] |
| | |
| | return glyph_map |
| | |
| | def _apply_grid_layout(self, glyph_map: GlyphMap) -> GlyphMap: |
| | """Apply grid layout to a glyph map.""" |
| | width, height = glyph_map.dimensions |
| | num_glyphs = len(glyph_map.glyphs) |
| | |
| | |
| | grid_size = int(np.ceil(np.sqrt(num_glyphs))) |
| | cell_width = width / (grid_size + 1) |
| | cell_height = height / (grid_size + 1) |
| | |
| | |
| | for i, glyph in enumerate(glyph_map.glyphs): |
| | row = i // grid_size |
| | col = i % grid_size |
| | |
| | glyph.position = ( |
| | (col + 1) * cell_width, |
| | (row + 1) * cell_height |
| | ) |
| | |
| | return glyph_map |
| | |
| | def _apply_radial_layout(self, glyph_map: GlyphMap) -> GlyphMap: |
| | """Apply radial layout to a glyph map.""" |
| | |
| | G = nx.Graph() |
| | |
| | |
| | for glyph in glyph_map.glyphs: |
| | G.add_node(glyph.id, size=glyph.size, type=glyph.type.value) |
| | |
| | |
| | for conn in glyph_map.connections: |
| | if conn.source_id in G and conn.target_id in G: |
| | G.add_edge( |
| | conn.source_id, |
| | conn.target_id, |
| | weight=conn.strength |
| | ) |
| | |
| | |
| | if glyph_map.focal_points: |
| | central_nodes = [fp for fp in glyph_map.focal_points if fp in G] |
| | else: |
| | |
| | centrality = nx.betweenness_centrality(G) |
| | central_nodes = sorted( |
| | centrality.keys(), |
| | key=lambda x: centrality[x], |
| | reverse=True |
| | )[:min(3, len(centrality))] |
| | |
| | |
| | width, height = glyph_map.dimensions |
| | center_x, center_y = width / 2, height / 2 |
| | |
| | pos = nx.kamada_kawai_layout(G) |
| | |
| | |
| | pos_array = np.array(list(pos.values())) |
| | if len(pos_array) > 0: |
| | min_x, min_y = pos_array.min(axis=0) |
| | max_x, max_y = pos_array.max(axis=0) |
| | |
| | |
| | x_range = max_x - min_x |
| | y_range = max_y - min_y |
| | |
| | if x_range > 0: |
| | scale_x = (width * 0.8) / x_range |
| | else: |
| | scale_x = 1.0 |
| | |
| | if y_range > 0: |
| | scale_y = (height * 0.8) / y_range |
| | else: |
| | scale_y = 1.0 |
| | |
| | |
| | for node_id, (x, y) in pos.items(): |
| | x_scaled = ((x - min_x) * scale_x) + width * 0.1 |
| | y_scaled = ((y - min_y) * scale_y) + height * 0.1 |
| | pos[node_id] = (x_scaled, y_scaled) |
| | |
| | |
| | if central_nodes: |
| | |
| | angle_step = 2 * np.pi / len(central_nodes) |
| | center_radius = min(width, height) * 0.15 |
| | |
| | for i, node_id in enumerate(central_nodes): |
| | angle = i * angle_step |
| | x = center_x + center_radius * np.cos(angle) |
| | y = center_y + center_radius * np.sin(angle) |
| | pos[node_id] = (x, y) |
| | |
| | |
| | for glyph in glyph_map.glyphs: |
| | if glyph.id in pos: |
| | glyph.position = pos[glyph.id] |
| | |
| | return glyph_map |
| | |
| | def _apply_focus( |
| | self, |
| | glyph_map: GlyphMap, |
| | focus_on: List[str] |
| | ) -> GlyphMap: |
| | """Apply focus to specific tokens or elements.""" |
| | |
| | focus_glyph_ids = [] |
| | |
| | for glyph in glyph_map.glyphs: |
| | |
| | if glyph.type == GlyphType.SENTINEL and hasattr(glyph, 'source_elements') and glyph.source_elements: |
| | for term in focus_on: |
| | if any(term in str(elem) for elem in glyph.source_elements): |
| | focus_glyph_ids.append(glyph.id) |
| | break |
| | |
| | if not focus_glyph_ids: |
| | |
| | return glyph_map |
| | |
| | |
| | glyph_map.focal_points = focus_glyph_ids |
| | |
| | |
| | for glyph in glyph_map.glyphs: |
| | if glyph.id in focus_glyph_ids: |
| | glyph.size *= 1.5 |
| | glyph.opacity = min(1.0, glyph.opacity + 0.2) |
| | |
| | |
| | for conn in glyph_map.connections: |
| | if conn.source_id in focus_glyph_ids or conn.target_id in focus_glyph_ids: |
| | conn.width *= 1.5 |
| | conn.opacity = min(1.0, conn.opacity + 0.2) |
| | |
| | return glyph_map |
| | |
| | def _find_related_glyphs( |
| | self, |
| | glyph_map1: GlyphMap, |
| | glyph_map2: GlyphMap |
| | ) -> List[Tuple[str, str, float]]: |
| | """Find related glyphs between two glyph maps.""" |
| | related_pairs = [] |
| | |
| | |
| | def token_similarity(g1: Glyph, g2: Glyph) -> float: |
| | """Calculate similarity between token glyphs.""" |
| | if (g1.type == GlyphType.SENTINEL and g2.type == GlyphType.SENTINEL and |
| | hasattr(g1, 'source_elements') and hasattr(g2, 'source_elements') and |
| | g1.source_elements and g2.source_elements): |
| | |
| | text1 = str(g1.source_elements[0]) |
| | text2 = str(g2.source_elements[0]) |
| | if text1 == text2: |
| | return 1.0 |
| | elif text1 in text2 or text2 in text1: |
| | return 0.8 |
| | else: |
| | |
| | return 1.0 - min(1.0, distance.levenshtein(text1, text2) / max(len(text1), len(text2))) |
| | return 0.0 |
| | |
| | def attribution_similarity(g1: Glyph, g2: Glyph) -> float: |
| | """Calculate similarity between attribution glyphs.""" |
| | if g1.type == GlyphType.ATTRIBUTION and g2.type == GlyphType.ATTRIBUTION: |
| | |
| | metadata_sim = 0.0 |
| | count = 0 |
| | |
| | |
| | for attr in ['source_index', 'target_index', 'attribution_type', 'strength']: |
| | if attr in g1.metadata and attr in g2.metadata: |
| | if g1.metadata[attr] == g2.metadata[attr]: |
| | metadata_sim += 1.0 |
| | else: |
| | |
| | if attr == 'strength' and isinstance(g1.metadata[attr], (int, float)) and isinstance(g2.metadata[attr], (int, float)): |
| | diff = abs(g1.metadata[attr] - g2.metadata[attr]) |
| | metadata_sim += max(0.0, 1.0 - diff) |
| | else: |
| | metadata_sim += 0.0 |
| | count += 1 |
| | |
| | |
| | symbol_sim = 1.0 if g1.symbol == g2.symbol else 0.0 |
| | |
| | |
| | if count > 0: |
| | return (metadata_sim / count) * 0.7 + symbol_sim * 0.3 |
| | else: |
| | return symbol_sim |
| | return 0.0 |
| | |
| | def residue_similarity(g1: Glyph, g2: Glyph) -> float: |
| | """Calculate similarity between residue glyphs.""" |
| | if g1.type == GlyphType.RESIDUE and g2.type == GlyphType.RESIDUE: |
| | |
| | if g1.metadata.get('pattern_type') == g2.metadata.get('pattern_type'): |
| | return 0.9 |
| | |
| | |
| | if g1.symbol == g2.symbol: |
| | return 0.7 |
| | |
| | |
| | if 'confidence' in g1.metadata and 'confidence' in g2.metadata: |
| | conf_diff = abs(g1.metadata['confidence'] - g2.metadata['confidence']) |
| | return max(0.0, 0.5 - conf_diff) |
| | |
| | return 0.3 |
| | return 0.0 |
| | |
| | def recursive_similarity(g1: Glyph, g2: Glyph) -> float: |
| | """Calculate similarity between recursive glyphs.""" |
| | if g1.type == GlyphType.RECURSIVE and g2.type == GlyphType.RECURSIVE: |
| | |
| | if g1.symbol == g2.symbol: |
| | return 0.8 |
| | |
| | |
| | if 'depth' in g1.metadata and 'depth' in g2.metadata: |
| | if g1.metadata['depth'] == g2.metadata['depth']: |
| | return 0.7 |
| | else: |
| | depth_diff = abs(g1.metadata['depth'] - g2.metadata['depth']) |
| | return max(0.0, 0.6 - (depth_diff * 0.1)) |
| | |
| | return 0.4 |
| | return 0.0 |
| | |
| | def meta_similarity(g1: Glyph, g2: Glyph) -> float: |
| | """Calculate similarity between meta glyphs.""" |
| | if g1.type == GlyphType.META and g2.type == GlyphType.META: |
| | |
| | if g1.symbol == g2.symbol: |
| | return 0.9 |
| | |
| | |
| | common_semantics = set(s.value for s in g1.semantics).intersection( |
| | set(s.value for s in g2.semantics) |
| | ) |
| | if common_semantics: |
| | return 0.6 + 0.3 * (len(common_semantics) / max(len(g1.semantics), len(g2.semantics))) |
| | |
| | return 0.3 |
| | return 0.0 |
| | |
| | |
| | for g1 in glyph_map1.glyphs: |
| | for g2 in glyph_map2.glyphs: |
| | similarity = 0.0 |
| | |
| | |
| | if g1.type == GlyphType.SENTINEL and g2.type == GlyphType.SENTINEL: |
| | similarity = token_similarity(g1, g2) |
| | elif g1.type == GlyphType.ATTRIBUTION and g2.type == GlyphType.ATTRIBUTION: |
| | similarity = attribution_similarity(g1, g2) |
| | elif g1.type == GlyphType.RESIDUE and g2.type == GlyphType.RESIDUE: |
| | similarity = residue_similarity(g1, g2) |
| | elif g1.type == GlyphType.RECURSIVE and g2.type == GlyphType.RECURSIVE: |
| | similarity = recursive_similarity(g1, g2) |
| | elif g1.type == GlyphType.META and g2.type == GlyphType.META: |
| | similarity = meta_similarity(g1, g2) |
| | elif g1.type == g2.type: |
| | |
| | if g1.symbol == g2.symbol: |
| | similarity = 0.6 |
| | else: |
| | similarity = 0.3 |
| | |
| | |
| | if similarity >= 0.5: |
| | related_pairs.append((g1.id, g2.id, similarity)) |
| | |
| | |
| | related_pairs.sort(key=lambda x: x[2], reverse=True) |
| | |
| | return related_pairs |
| | |
| | def _calculate_signature_similarity( |
| | self, |
| | signature1: str, |
| | signature2: str |
| | ) -> float: |
| | """Calculate similarity between two residue signatures.""" |
| | |
| | sig1 = signature1.lower() |
| | sig2 = signature2.lower() |
| | |
| | |
| | max_len = max(len(sig1), len(sig2)) |
| | if max_len == 0: |
| | return 1.0 |
| | |
| | lev_dist = distance.levenshtein(sig1, sig2) |
| | sim = 1.0 - (lev_dist / max_len) |
| | |
| | |
| | common_prefix_len = 0 |
| | for i in range(min(len(sig1), len(sig2))): |
| | if sig1[i] == sig2[i]: |
| | common_prefix_len += 1 |
| | else: |
| | break |
| | |
| | prefix_boost = 0.0 |
| | if common_prefix_len > 3: |
| | prefix_boost = min(0.2, common_prefix_len / max_len) |
| | |
| | return min(1.0, sim + prefix_boost) |
| | |
| | def _calculate_context_similarity( |
| | self, |
| | context1: Dict[str, Any], |
| | context2: Dict[str, Any] |
| | ) -> float: |
| | """Calculate similarity between two residue contexts.""" |
| | |
| | if not context1 or not context2: |
| | return 0.0 |
| | |
| | |
| | common_keys = set(context1.keys()).intersection(set(context2.keys())) |
| | if not common_keys: |
| | return 0.0 |
| | |
| | |
| | similarity_sum = 0.0 |
| | for key in common_keys: |
| | val1 = context1[key] |
| | val2 = context2[key] |
| | |
| | if isinstance(val1, str) and isinstance(val2, str): |
| | |
| | similarity_sum += self._calculate_signature_similarity(val1, val2) |
| | elif isinstance(val1, (int, float)) and isinstance(val2, (int, float)): |
| | |
| | max_val = max(abs(val1), abs(val2)) |
| | if max_val > 0: |
| | similarity_sum += 1.0 - min(1.0, abs(val1 - val2) / max_val) |
| | else: |
| | similarity_sum += 1.0 |
| | elif val1 == val2: |
| | |
| | similarity_sum += 1.0 |
| | else: |
| | similarity_sum += 0.0 |
| | |
| | |
| | return similarity_sum / len(common_keys) |
| | |
| | def _get_color_for_attribution(self, link: AttributionLink) -> str: |
| | """Get color for attribution link based on type and strength.""" |
| | if link.attribution_type == AttributionType.DIRECT: |
| | |
| | blue_val = max(0, int(255 - (link.strength * 170))) |
| | return f"rgb(0, {120 + int(link.strength * 70)}, {180 + blue_val})" |
| | elif link.attribution_type == AttributionType.INDIRECT: |
| | |
| | return f"rgb({120 + int(link.strength * 70)}, 0, {180 + int(link.strength * 60)})" |
| | elif link.attribution_type == AttributionType.RESIDUAL: |
| | |
| | return f"rgb(0, {150 + int(link.strength * 70)}, {80 + int(link.strength * 40)})" |
| | elif link.attribution_type == AttributionType.RECURSIVE: |
| | |
| | return f"rgb({200 + int(link.strength * 50)}, {70 + int(link.strength * 60)}, 0)" |
| | else: |
| | |
| | intensity = 100 + int(link.strength * 100) |
| | return f"rgb({intensity}, {intensity}, {intensity})" |
| | |
| | def _get_color_for_residue_type(self, residue_type: str) -> str: |
| | """Get color for residue based on type.""" |
| | colors = { |
| | "memory_decay": "#3498db", |
| | "value_conflict": "#e74c3c", |
| | "ghost_activation": "#9b59b6", |
| | "boundary_hesitation": "#f39c12", |
| | "null_output": "#95a5a6", |
| | "recursive_collapse": "#27ae60", |
| | "attention_drift": "#1abc9c", |
| | "token_oscillation": "#d35400" |
| | } |
| | |
| | return colors.get(residue_type, "#2c3e50") |
| | |
| | def _get_color_for_layer(self, layer: int, alpha: float = 1.0) -> str: |
| | """Get color for a specific layer.""" |
| | |
| | hue = (layer * 30) % 360 |
| | return f"hsl({hue}, 70%, 60%, {alpha})" |
| | |
| | def _get_color_for_depth(self, depth: int, alpha: float = 1.0) -> str: |
| | """Get color for a specific recursion depth.""" |
| | |
| | if depth == 0: |
| | return f"rgba(155, 89, 182, {alpha})" |
| | elif depth == 1: |
| | return f"rgba(52, 152, 219, {alpha})" |
| | elif depth == 2: |
| | return f"rgba(26, 188, 156, {alpha})" |
| | elif depth == 3: |
| | return f"rgba(39, 174, 96, {alpha})" |
| | elif depth == 4: |
| | return f"rgba(241, 196, 15, {alpha})" |
| | elif depth >= 5: |
| | return f"rgba(230, 126, 34, {alpha})" |
| | |
| | def _scale_positions( |
| | self, |
| | positions: np.ndarray, |
| | dimensions: Tuple[int, int] |
| | ) -> np.ndarray: |
| | """Scale positions to fit dimensions.""" |
| | width, height = dimensions |
| | |
| | |
| | min_x, min_y = positions.min(axis=0) |
| | max_x, max_y = positions.max(axis=0) |
| | |
| | |
| | x_range = max_x - min_x |
| | y_range = max_y - min_y |
| | |
| | if x_range > 0: |
| | scale_x = (width * 0.8) / x_range |
| | else: |
| | scale_x = 1.0 |
| | |
| | if y_range > 0: |
| | scale_y = (height * 0.8) / y_range |
| | else: |
| | scale_y = 1.0 |
| | |
| | |
| | positions_scaled = np.zeros_like(positions) |
| | positions_scaled[:, 0] = (positions[:, 0] - min_x) * scale_x + width * 0.1 |
| | positions_scaled[:, 1] = (positions[:, 1] - min_y) * scale_y + height * 0.1 |
| | |
| | return positions_scaled |
| | |
| | def _simple_visualization( |
| | self, |
| | glyph_map: GlyphMap, |
| | output_path: Optional[str] = None |
| | ) -> Dict[str, Any]: |
| | """Simple matplotlib visualization if no visualizer available.""" |
| | |
| | plt.figure(figsize=(12, 10)) |
| | |
| | |
| | for conn in glyph_map.connections: |
| | |
| | source_glyph = next((g for g in glyph_map.glyphs if g.id == conn.source_id), None) |
| | target_glyph = next((g for g in glyph_map.glyphs if g.id == conn.target_id), None) |
| | |
| | if source_glyph and target_glyph: |
| | |
| | source_x, source_y = source_glyph.position |
| | target_x, target_y = target_glyph.position |
| | |
| | |
| | plt.plot( |
| | [source_x, target_x], |
| | [source_y, target_y], |
| | color=conn.color, |
| | linewidth=conn.width, |
| | alpha=conn.opacity, |
| | zorder=1, |
| | linestyle='-' if conn.directed else '--' |
| | ) |
| | |
| | |
| | if conn.directed: |
| | dx = target_x - source_x |
| | dy = target_y - source_y |
| | dist = np.sqrt(dx**2 + dy**2) |
| | if dist > 0: |
| | |
| | dx, dy = dx / dist, dy / dist |
| | midpoint_x = (source_x + target_x) / 2 |
| | midpoint_y = (source_y + target_y) / 2 |
| | |
| | |
| | plt.arrow( |
| | midpoint_x - dx * 5, |
| | midpoint_y - dy * 5, |
| | dx * 10, |
| | dy * 10, |
| | head_width=5, |
| | head_length=5, |
| | fc=conn.color, |
| | ec=conn.color, |
| | alpha=conn.opacity, |
| | zorder=1 |
| | ) |
| | |
| | |
| | for glyph in glyph_map.glyphs: |
| | x, y = glyph.position |
| | |
| | |
| | plt.text( |
| | x, y, |
| | glyph.symbol, |
| | fontsize=glyph.size, |
| | color=glyph.color, |
| | alpha=glyph.opacity, |
| | ha='center', |
| | va='center', |
| | zorder=2 |
| | ) |
| | |
| | |
| | if glyph.id in glyph_map.focal_points: |
| | circle = plt.Circle( |
| | (x, y), |
| | glyph.size * 0.8, |
| | fill=False, |
| | color='black', |
| | linestyle=':', |
| | alpha=0.7, |
| | zorder=1 |
| | ) |
| | plt.gca().add_patch(circle) |
| | |
| | |
| | width, height = glyph_map.dimensions |
| | plt.xlim(0, width) |
| | plt.ylim(0, height) |
| | |
| | |
| | plt.axis('off') |
| | |
| | |
| | title = f"Glyph Map: {glyph_map.source_type.capitalize()}" |
| | if "trace_target" in glyph_map.metadata: |
| | title += f" - {glyph_map.metadata['trace_target']}" |
| | plt.title(title) |
| | |
| | |
| | if output_path: |
| | plt.savefig(output_path, dpi=300, bbox_inches='tight') |
| | plt.close() |
| | return {"output_path": output_path} |
| | |
| | |
| | return {"figure": plt.gcf()} |
| |
|
| |
|
| | |
| | class GlyphExplorer: |
| | """ |
| | Utility class for interactive exploration of glyph maps. |
| | |
| | This class provides methods for filtering, searching, and analyzing |
| | glyph maps to extract insights and patterns. |
| | """ |
| | |
| | def __init__(self, glyph_map: GlyphMap): |
| | """ |
| | Initialize the glyph explorer. |
| | |
| | Parameters: |
| | ----------- |
| | glyph_map : GlyphMap |
| | Glyph map to explore |
| | """ |
| | self.glyph_map = glyph_map |
| | self.filtered_glyphs = glyph_map.glyphs |
| | self.filtered_connections = glyph_map.connections |
| | |
| | def filter_by_type(self, glyph_type: GlyphType) -> 'GlyphExplorer': |
| | """ |
| | Filter glyphs by type. |
| | |
| | Parameters: |
| | ----------- |
| | glyph_type : GlyphType |
| | Type of glyphs to include |
| | |
| | Returns: |
| | -------- |
| | GlyphExplorer |
| | Self, for method chaining |
| | """ |
| | self.filtered_glyphs = [ |
| | g for g in self.filtered_glyphs |
| | if g.type == glyph_type |
| | ] |
| | self._update_connections() |
| | return self |
| | |
| | def filter_by_semantic(self, semantic: GlyphSemantic) -> 'GlyphExplorer': |
| | """ |
| | Filter glyphs by semantic dimension. |
| | |
| | Parameters: |
| | ----------- |
| | semantic : GlyphSemantic |
| | Semantic dimension to filter by |
| | |
| | Returns: |
| | -------- |
| | GlyphExplorer |
| | Self, for method chaining |
| | """ |
| | self.filtered_glyphs = [ |
| | g for g in self.filtered_glyphs |
| | if semantic in g.semantics |
| | ] |
| | self._update_connections() |
| | return self |
| | |
| | def filter_by_symbol(self, symbol: str) -> 'GlyphExplorer': |
| | """ |
| | Filter glyphs by symbol. |
| | |
| | Parameters: |
| | ----------- |
| | symbol : str |
| | Symbol to filter by |
| | |
| | Returns: |
| | -------- |
| | GlyphExplorer |
| | Self, for method chaining |
| | """ |
| | self.filtered_glyphs = [ |
| | g for g in self.filtered_glyphs |
| | if g.symbol == symbol |
| | ] |
| | self._update_connections() |
| | return self |
| | |
| | def filter_by_size( |
| | self, |
| | min_size: Optional[float] = None, |
| | max_size: Optional[float] = None |
| | ) -> 'GlyphExplorer': |
| | """ |
| | Filter glyphs by size. |
| | |
| | Parameters: |
| | ----------- |
| | min_size : Optional[float] |
| | Minimum size (inclusive) |
| | max_size : Optional[float] |
| | Maximum size (inclusive) |
| | |
| | Returns: |
| | -------- |
| | GlyphExplorer |
| | Self, for method chaining |
| | """ |
| | if min_size is not None: |
| | self.filtered_glyphs = [ |
| | g for g in self.filtered_glyphs |
| | if g.size >= min_size |
| | ] |
| | |
| | if max_size is not None: |
| | self.filtered_glyphs = [ |
| | g for g in self.filtered_glyphs |
| | if g.size <= max_size |
| | ] |
| | |
| | self._update_connections() |
| | return self |
| | |
| | def filter_by_metadata( |
| | self, |
| | key: str, |
| | value: Any |
| | ) -> 'GlyphExplorer': |
| | """ |
| | Filter glyphs by metadata field. |
| | |
| | Parameters: |
| | ----------- |
| | key : str |
| | Metadata key |
| | value : Any |
| | Metadata value to match |
| | |
| | Returns: |
| | -------- |
| | GlyphExplorer |
| | Self, for method chaining |
| | """ |
| | self.filtered_glyphs = [ |
| | g for g in self.filtered_glyphs |
| | if key in g.metadata and g.metadata[key] == value |
| | ] |
| | self._update_connections() |
| | return self |
| | |
| | def filter_connections_by_type(self, conn_type: str) -> 'GlyphExplorer': |
| | """ |
| | Filter connections by type. |
| | |
| | Parameters: |
| | ----------- |
| | conn_type : str |
| | Connection type to filter by |
| | |
| | Returns: |
| | -------- |
| | GlyphExplorer |
| | Self, for method chaining |
| | """ |
| | self.filtered_connections = [ |
| | c for c in self.filtered_connections |
| | if c.type == conn_type |
| | ] |
| | return self |
| | |
| | def filter_connections_by_strength( |
| | self, |
| | min_strength: Optional[float] = None, |
| | max_strength: Optional[float] = None |
| | ) -> 'GlyphExplorer': |
| | """ |
| | Filter connections by strength. |
| | |
| | Parameters: |
| | ----------- |
| | min_strength : Optional[float] |
| | Minimum strength (inclusive) |
| | max_strength : Optional[float] |
| | Maximum strength (inclusive) |
| | |
| | Returns: |
| | -------- |
| | GlyphExplorer |
| | Self, for method chaining |
| | """ |
| | if min_strength is not None: |
| | self.filtered_connections = [ |
| | c for c in self.filtered_connections |
| | if c.strength >= min_strength |
| | ] |
| | |
| | if max_strength is not None: |
| | self.filtered_connections = [ |
| | c for c in self.filtered_connections |
| | if c.strength <= max_strength |
| | ] |
| | |
| | return self |
| | |
| | def search_by_description(self, query: str) -> 'GlyphExplorer': |
| | """ |
| | Search glyphs by description text. |
| | |
| | Parameters: |
| | ----------- |
| | query : str |
| | Search query |
| | |
| | Returns: |
| | -------- |
| | GlyphExplorer |
| | Self, for method chaining |
| | """ |
| | self.filtered_glyphs = [ |
| | g for g in self.filtered_glyphs |
| | if g.description and query.lower() in g.description.lower() |
| | ] |
| | self._update_connections() |
| | return self |
| | |
| | def find_central_glyphs(self, top_n: int = 5) -> List[Glyph]: |
| | """ |
| | Find central glyphs based on connection count. |
| | |
| | Parameters: |
| | ----------- |
| | top_n : int |
| | Number of top central glyphs to return |
| | |
| | Returns: |
| | -------- |
| | List[Glyph] |
| | Top central glyphs |
| | """ |
| | |
| | glyph_ids = [g.id for g in self.filtered_glyphs] |
| | connection_counts = {} |
| | |
| | for glyph_id in glyph_ids: |
| | count = sum( |
| | 1 for c in self.filtered_connections |
| | if c.source_id == glyph_id or c.target_id == glyph_id |
| | ) |
| | connection_counts[glyph_id] = count |
| | |
| | |
| | top_glyph_ids = sorted( |
| | connection_counts.keys(), |
| | key=lambda x: connection_counts[x], |
| | reverse=True |
| | )[:top_n] |
| | |
| | |
| | top_glyphs = [ |
| | g for g in self.filtered_glyphs |
| | if g.id in top_glyph_ids |
| | ] |
| | |
| | return top_glyphs |
| | |
| | def find_clusters(self, min_size: int = 3) -> Dict[str, List[Glyph]]: |
| | """ |
| | Find clusters of connected glyphs. |
| | |
| | Parameters: |
| | ----------- |
| | min_size : int |
| | Minimum cluster size |
| | |
| | Returns: |
| | -------- |
| | Dict[str, List[Glyph]] |
| | Dictionary of clusters |
| | """ |
| | |
| | G = nx.Graph() |
| | |
| | |
| | for glyph in self.filtered_glyphs: |
| | G.add_node(glyph.id) |
| | |
| | |
| | for conn in self.filtered_connections: |
| | if conn.source_id in G and conn.target_id in G: |
| | G.add_edge(conn.source_id, conn.target_id, weight=conn.strength) |
| | |
| | |
| | components = list(nx.connected_components(G)) |
| | |
| | |
| | clusters = {} |
| | for i, component in enumerate(components): |
| | if len(component) >= min_size: |
| | cluster_glyphs = [ |
| | g for g in self.filtered_glyphs |
| | if g.id in component |
| | ] |
| | clusters[f"cluster_{i}"] = cluster_glyphs |
| | |
| | return clusters |
| | |
| | def calculate_statistics(self) -> Dict[str, Any]: |
| | """ |
| | Calculate statistics for the filtered glyph map. |
| | |
| | Returns: |
| | -------- |
| | Dict[str, Any] |
| | Dictionary of statistics |
| | """ |
| | stats = { |
| | "num_glyphs": len(self.filtered_glyphs), |
| | "num_connections": len(self.filtered_connections), |
| | "glyph_types": {}, |
| | "connection_types": {}, |
| | "avg_connection_strength": 0.0, |
| | "glyph_size_stats": { |
| | "min": float('inf'), |
| | "max": 0.0, |
| | "avg": 0.0 |
| | } |
| | } |
| | |
| | |
| | for glyph in self.filtered_glyphs: |
| | glyph_type = glyph.type.value |
| | if glyph_type not in stats["glyph_types"]: |
| | stats["glyph_types"][glyph_type] = 0 |
| | stats["glyph_types"][glyph_type] += 1 |
| | |
| | |
| | stats["glyph_size_stats"]["min"] = min(stats["glyph_size_stats"]["min"], glyph.size) |
| | stats["glyph_size_stats"]["max"] = max(stats["glyph_size_stats"]["max"], glyph.size) |
| | stats["glyph_size_stats"]["avg"] += glyph.size |
| | |
| | if self.filtered_glyphs: |
| | stats["glyph_size_stats"]["avg"] /= len(self.filtered_glyphs) |
| | else: |
| | stats["glyph_size_stats"]["min"] = 0.0 |
| | |
| | |
| | total_strength = 0.0 |
| | for conn in self.filtered_connections: |
| | if conn.type not in stats["connection_types"]: |
| | stats["connection_types"][conn.type] = 0 |
| | stats["connection_types"][conn.type] += 1 |
| | total_strength += conn.strength |
| | |
| | if self.filtered_connections: |
| | stats["avg_connection_strength"] = total_strength / len(self.filtered_connections) |
| | |
| | return stats |
| | |
| | def reset_filters(self) -> 'GlyphExplorer': |
| | """ |
| | Reset all filters. |
| | |
| | Returns: |
| | -------- |
| | GlyphExplorer |
| | Self, for method chaining |
| | """ |
| | self.filtered_glyphs = self.glyph_map.glyphs |
| | self.filtered_connections = self.glyph_map.connections |
| | return self |
| | |
| | def _update_connections(self): |
| | """Update connections based on filtered glyphs.""" |
| | filtered_glyph_ids = [g.id for g in self.filtered_glyphs] |
| | self.filtered_connections = [ |
| | c for c in self.glyph_map.connections |
| | if c.source_id in filtered_glyph_ids and c.target_id in filtered_glyph_ids |
| | ] |
| |
|
| |
|
| | |
| | if __name__ == "__main__": |
| | import argparse |
| | |
| | parser = argparse.ArgumentParser(description="Glyph Mapper for Attribution and Residue Visualization") |
| | parser.add_argument("--input", "-i", type=str, help="Input attribution or residue file") |
| | parser.add_argument("--output", "-o", type=str, help="Output visualization file") |
| | parser.add_argument("--type", "-t", type=str, default="attribution", choices=["attribution", "residue", "attention", "recursive"], help="Type of input data") |
| | parser.add_argument("--layout", "-l", type=str, default="force_directed", choices=["force_directed", "hierarchical", "circular", "grid", "radial"], help="Layout type") |
| | parser.add_argument("--width", "-w", type=int, default=1200, help="Visualization width") |
| | parser.add_argument("--height", "-h", type=int, default=900, |
| | parser.add_argument("--height", "-h", type=int, default=900, help="Visualization height") |
| | parser.add_argument("--focus", "-f", type=str, help="Comma-separated tokens to focus on") |
| | parser.add_argument("--include-tokens", action="store_true", help="Include token sentinels in visualization") |
| | parser.add_argument("--cluster", action="store_true", help="Apply clustering to similar patterns") |
| | parser.add_argument("--save-map", "-s", type=str, help="Save glyph map to file") |
| | parser.add_argument("--interactive", "-i", action="store_true", help="Generate interactive visualization") |
| | |
| | args = parser.parse_args() |
| | |
| | |
| | mapper = GlyphMapper() |
| | |
| | if args.input: |
| | |
| | with open(args.input, "r") as f: |
| | data = json.load(f) |
| | |
| | |
| | if args.type == "attribution": |
| | |
| | attribution_map = AttributionMap( |
| | prompt_tokens=data.get("prompt_tokens", []), |
| | output_tokens=data.get("output_tokens", []), |
| | links=[ |
| | AttributionLink( |
| | source_idx=link.get("source_idx", 0), |
| | target_idx=link.get("target_idx", 0), |
| | attribution_type=AttributionType(link.get("attribution_type", "direct")), |
| | strength=link.get("strength", 0.5), |
| | attention_heads=link.get("attention_heads", []), |
| | layers=link.get("layers", []), |
| | intermediate_tokens=link.get("intermediate_tokens", []), |
| | residue=link.get("residue") |
| | ) |
| | for link in data.get("links", []) |
| | ], |
| | token_salience=data.get("token_salience", {}), |
| | attribution_gaps=data.get("attribution_gaps", []), |
| | collapsed_regions=data.get("collapsed_regions", []), |
| | uncertainty=data.get("uncertainty", {}), |
| | metadata=data.get("metadata", {}) |
| | ) |
| | |
| | |
| | focus_on = args.focus.split(",") if args.focus else None |
| | |
| | |
| | glyph_map = mapper.map_attribution( |
| | attribution_map=attribution_map, |
| | layout_type=args.layout, |
| | dimensions=(args.width, args.height), |
| | include_tokens=args.include_tokens, |
| | focus_on=focus_on |
| | ) |
| | |
| | elif args.type == "residue": |
| | |
| | residue_patterns = [ |
| | ResiduePattern( |
| | type=pattern.get("type", "unknown"), |
| | pattern=pattern.get("pattern", ""), |
| | context=pattern.get("context", {}), |
| | signature=pattern.get("signature", ""), |
| | confidence=pattern.get("confidence", 0.5) |
| | ) |
| | for pattern in data |
| | ] |
| | |
| | |
| | glyph_map = mapper.map_residue_patterns( |
| | residue_patterns=residue_patterns, |
| | layout_type=args.layout, |
| | dimensions=(args.width, args.height), |
| | cluster_patterns=args.cluster |
| | ) |
| | |
| | elif args.type == "attention": |
| | |
| | glyph_map = mapper.map_attention_heads( |
| | attention_data=data, |
| | layout_type=args.layout, |
| | dimensions=(args.width, args.height), |
| | include_tokens=args.include_tokens |
| | ) |
| | |
| | elif args.type == "recursive": |
| | |
| | glyph_map = mapper.map_recursive_trace( |
| | trace_data=data, |
| | layout_type=args.layout, |
| | dimensions=(args.width, args.height) |
| | ) |
| | |
| | else: |
| | print(f"Unknown data type: {args.type}") |
| | exit(1) |
| | |
| | |
| | if args.save_map: |
| | mapper.save_glyph_map(glyph_map, args.save_map) |
| | |
| | |
| | if args.output: |
| | mapper.visualize( |
| | glyph_map=glyph_map, |
| | output_path=args.output, |
| | interactive=args.interactive |
| | ) |
| | print(f"Visualization saved to {args.output}") |
| | else: |
| | |
| | explorer = GlyphExplorer(glyph_map) |
| | stats = explorer.calculate_statistics() |
| | |
| | print(f"Glyph Map Statistics:") |
| | print(f" Number of glyphs: {stats['num_glyphs']}") |
| | print(f" Number of connections: {stats['num_connections']}") |
| | print(f" Glyph types: {stats['glyph_types']}") |
| | print(f" Connection types: {stats['connection_types']}") |
| | print(f" Average connection strength: {stats['avg_connection_strength']:.2f}") |
| | |
| | |
| | central_glyphs = explorer.find_central_glyphs(top_n=3) |
| | print(f"\nCentral Glyphs:") |
| | for glyph in central_glyphs: |
| | print(f" {glyph.symbol} - {glyph.description}") |
| | else: |
| | print("No input file specified. Use --input to provide input data.") |
| | exit(1) |
| |
|