"""
example_usage.py

This example demonstrates how to use the glyphs framework to analyze model cognition
through attribution tracing, residue analysis, and symbolic shell execution.
"""
| |
|
import json
import os
import sys
from collections import Counter
from pathlib import Path

import matplotlib.pyplot as plt

# Append the parent of this script's directory to the import path. This has to
# run before the ``glyphs`` imports below (presumably so the package resolves
# when the example is executed from a source checkout).
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from glyphs.models import create_model_adapter
from glyphs.attribution.tracer import AttributionTracer
from glyphs.shells.executor import ShellExecutor, RecursiveShell
from glyphs.viz.visualizer import GlyphVisualizer
from glyphs.viz.glyph_mapper import GlyphMapper, GlyphExplorer
| |
|
| | |
# Directory that receives every artifact written by this example.
output_dir = Path("./outputs")
output_dir.mkdir(exist_ok=True)

# Adapter for the model under analysis, selected by its identifier string.
model = create_model_adapter("openai:gpt-4")
print(f"Using model: {model.model_id}")

# Shared framework components. A single visualizer instance is handed to the
# tracer, the shell executor and the glyph mapper.
visualizer = GlyphVisualizer()
tracer = AttributionTracer(model, visualizer=visualizer)
executor = ShellExecutor(tracer=tracer, visualizer=visualizer)
mapper = GlyphMapper(visualizer=visualizer)
| |
|
| | |
# Prompts used throughout the example, keyed by the behaviour they are meant
# to exercise. The functions below select one of these via ``prompt_key``.
test_prompts = {
    # Ordered recall: list five items, then repeat them reversed.
    "memory": "List the following five items in order, then recall them in reverse order at the end of your response: apple, train, elephant, computer, umbrella. Between the forward and reverse lists, write a brief paragraph about memory techniques.",

    # Two competing priorities that must both be weighed.
    "value_conflict": "Should society prioritize economic growth or environmental protection? Consider both perspectives thoroughly.",

    # Physics question combining several topics in a single explanation.
    "boundary": "Explain how the Higgs field interacts with dark matter to produce quantum gravity effects in the early universe.",

    # Self-referential reflection.
    "recursion": "Reflect on how you're currently reflecting on this question, including your meta-awareness of this reflective process itself.",

    # One word with several unrelated meanings.
    "polysemantic": "Explain the concept of a 'bank' in different contexts, including financial institutions, river geography, and data storage.",
}
| |
|
def run_attribution_analysis(prompt_key="memory"):
    """Generate a response for one test prompt and trace its attribution.

    The attribution map is mapped to glyphs, rendered to
    ``attribution_<prompt_key>.svg`` inside ``output_dir``, and summarised on
    stdout (glyph statistics followed by the top three central glyphs).

    Args:
        prompt_key: Key into ``test_prompts`` selecting the prompt to analyse.

    Returns:
        Tuple ``(attribution_map, glyph_map)`` as returned by ``tracer.trace``
        and ``mapper.map_attribution``.

    Raises:
        KeyError: If ``prompt_key`` is not a key of ``test_prompts``.
    """
    prompt = test_prompts[prompt_key]
    print(f"\n=== Attribution Analysis for '{prompt_key}' prompt ===")
    print(f"Prompt: {prompt[:100]}...")

    # Produce the completion whose attribution is traced below.
    output = model.generate(prompt, max_tokens=800)
    print(f"Output: {output[:100]}...")

    print("Tracing attribution...")
    trace_options = {
        "prompt": prompt,
        "output": output,
        "include_confidence": True,
    }
    attribution_map = tracer.trace(**trace_options)

    print("Mapping attribution to glyphs...")
    mapping_options = {
        "attribution_map": attribution_map,
        "layout_type": "force_directed",
        "include_tokens": True,
    }
    glyph_map = mapper.map_attribution(**mapping_options)

    # Render the glyph map to an SVG file.
    output_path = output_dir / f"attribution_{prompt_key}.svg"
    mapper.visualize(glyph_map, output_path=str(output_path))
    print(f"Visualization saved to {output_path}")

    # Report summary statistics and the most central glyphs.
    explorer = GlyphExplorer(glyph_map)
    stats = explorer.calculate_statistics()
    print("\nAttribution Statistics:")
    print(f" Number of glyphs: {stats['num_glyphs']}")
    print(f" Number of connections: {stats['num_connections']}")
    print(f" Glyph types: {stats['glyph_types']}")

    print("\nCentral Glyphs:")
    for glyph in explorer.find_central_glyphs(top_n=3):
        print(f" {glyph.symbol} - {glyph.description}")

    return attribution_map, glyph_map
| |
|
def run_shell_analysis(shell_id="MEMTRACE", prompt_key="memory"):
    """Execute one diagnostic shell against a test prompt and report on it.

    The shell result is written to ``shell_<shell_id>_<prompt_key>.json`` in
    ``output_dir``; the ``attribution`` and ``visualization`` entries are
    replaced by placeholder strings and each collapse sample is reduced to
    five fields. A summary is printed, and if the result carries a
    visualization it is saved next to the JSON file as an ``.svg``.

    Args:
        shell_id: Identifier of the shell handed to ``executor.run``.
        prompt_key: Key into ``test_prompts`` selecting the prompt.

    Returns:
        The result mapping returned by ``executor.run``.

    Raises:
        KeyError: If ``prompt_key`` is not a key of ``test_prompts``.
    """
    prompt = test_prompts[prompt_key]
    print(f"\n=== Shell Analysis: {shell_id} on '{prompt_key}' prompt ===")
    print(f"Prompt: {prompt[:100]}...")

    print(f"Executing shell {shell_id}...")
    run_options = dict(
        shell=shell_id,
        model=model,
        prompt=prompt,
        trace_attribution=True,
        record_residue=True,
        visualize=True,
    )
    result = executor.run(**run_options)

    # Fields copied from each collapse sample, and the stand-ins written for
    # entries that are not serialised.
    sample_fields = ("position", "type", "confidence", "context", "residue")
    placeholders = {
        "attribution": "Attribution map (omitted for serialization)",
        "visualization": "Visualization data (omitted for serialization)",
    }

    output_path = output_dir / f"shell_{shell_id}_{prompt_key}.json"
    with open(output_path, "w") as f:
        serializable_result = {}
        for key, value in result.items():
            if key == "collapse_samples":
                serializable_result[key] = [
                    {field: sample[field] for field in sample_fields}
                    for sample in value
                ]
            elif key in placeholders:
                serializable_result[key] = placeholders[key]
            else:
                serializable_result[key] = value
        json.dump(serializable_result, f, indent=2)

    print(f"Shell result saved to {output_path}")

    print("\nShell Execution Summary:")
    print(f" Output length: {len(result['output'])}")
    print(f" Number of operations: {len(result['operations'])}")
    print(f" Number of residues: {len(result['residues'])}")

    residues = result["residues"]
    if residues:
        print("\nResidues detected:")
        for i, residue in enumerate(residues):
            res_type = residue.get("type", "unknown")
            res_conf = residue.get("confidence", 0.0)
            print(f" {i+1}. Type: {res_type}, Confidence: {res_conf:.2f}")

    collapse_samples = result["collapse_samples"]
    if collapse_samples:
        print("\nCollapse samples detected:")
        for i, sample in enumerate(collapse_samples):
            print(f" {i+1}. Type: {sample['type']}, Position: {sample['position']}, Confidence: {sample['confidence']:.2f}")
            print(f" Context: {sample['context'][:50]}...")

    # Save the visualization only when the key is present and non-empty.
    visualization = result["visualization"] if "visualization" in result else None
    if visualization:
        output_path = output_dir / f"shell_{shell_id}_{prompt_key}.svg"
        visualizer.save_visualization(visualization, str(output_path))
        print(f"Visualization saved to {output_path}")

    return result
| |
|
def run_recursive_shell(prompt_key="recursion"):
    """Run a fixed sequence of ``.p/`` commands through a recursive shell.

    A condensed version of the outcome is written to
    ``recursive_shell_<prompt_key>.json`` in ``output_dir``, a per-command
    success report is printed, and the visualization of the last successful
    ``fork.attribution`` command (if any) is saved as an ``.svg``.

    Args:
        prompt_key: Key into ``test_prompts`` selecting the prompt.

    Returns:
        The result mapping returned by ``RecursiveShell.execute_sequence``.

    Raises:
        KeyError: If ``prompt_key`` is not a key of ``test_prompts``.
    """
    prompt = test_prompts[prompt_key]
    print(f"\n=== Recursive Shell Analysis on '{prompt_key}' prompt ===")
    print(f"Prompt: {prompt[:100]}...")

    recursive_shell = RecursiveShell(model=model, tracer=tracer, visualizer=visualizer)

    commands = [
        ".p/reflect.trace{depth=4, target=reasoning}",
        ".p/reflect.uncertainty{quantify=true, distribution=show}",
        ".p/collapse.detect{threshold=0.7, alert=true}",
        ".p/fork.attribution{sources=all, visualize=true}",
    ]

    print("Executing recursive shell commands...")
    result = recursive_shell.execute_sequence(commands=commands, prompt=prompt)

    def summarise(cmd_result):
        """Reduce one command result to a JSON-friendly record."""
        if not cmd_result["success"]:
            return {
                "command": cmd_result["command"],
                "success": cmd_result["success"],
                "error": cmd_result["error"],
            }

        record = {
            "command": cmd_result["command"],
            "success": cmd_result["success"],
        }
        original_command = cmd_result["original_command"]
        digest = {
            "command_family": original_command["family"],
            "command_function": original_command["function"],
            "execution_time": cmd_result["execution_time"],
        }
        # Only sizes are recorded for these entries; absent keys count as 0.
        inner = cmd_result["result"]
        for summary_key, source_key in (
            ("output_length", "output"),
            ("residues", "residues"),
            ("collapse_samples", "collapse_samples"),
        ):
            digest[summary_key] = len(inner[source_key]) if source_key in inner else 0
        record["result_summary"] = digest
        return record

    output_path = output_dir / f"recursive_shell_{prompt_key}.json"
    with open(output_path, "w") as f:
        serializable_result = {
            key: result[key]
            for key in ("success", "commands", "prompt", "timestamp", "execution_time")
        }
        serializable_result["results"] = [
            summarise(cmd_result) for cmd_result in result["results"]
        ]
        json.dump(serializable_result, f, indent=2)

    print(f"Recursive shell result saved to {output_path}")

    print("\nRecursive Shell Execution Summary:")
    print(f" Overall success: {result['success']}")
    print(f" Commands executed: {len(result['commands'])}")
    print(f" Execution time: {result['execution_time']:.2f}s")

    print("\nCommand Results:")
    for i, cmd_result in enumerate(result["results"]):
        cmd = cmd_result["command"]
        success = cmd_result["success"]
        print(f" {i+1}. {cmd}: {'Success' if success else 'Failed'}")
        if not success:
            print(f" Error: {cmd_result['error']}")

    # Walk the results backwards and take the visualization of the first
    # successful fork.attribution command encountered.
    visualization_data = next(
        (
            cmd_result["result"]["visualization"]
            for cmd_result in reversed(result["results"])
            if cmd_result["success"]
            and cmd_result["original_command"]["family"] == "fork"
            and cmd_result["original_command"]["function"] == "attribution"
            and "result" in cmd_result
            and "visualization" in cmd_result["result"]
        ),
        None,
    )

    if visualization_data:
        output_path = output_dir / f"recursive_shell_{prompt_key}.svg"
        visualizer.save_visualization(visualization_data, str(output_path))
        print(f"Visualization saved to {output_path}")

    return result
| |
|
def compare_shells(prompt_key="value_conflict", shells=None):
    """Run several shells on the same prompt and compare their residues.

    Each shell is executed through ``executor.run``. A JSON summary is written
    to ``shell_comparison_<prompt_key>.json`` and a grouped bar chart (one
    group per shell, one bar per residue type) to
    ``shell_comparison_<prompt_key>.png``, both inside ``output_dir``.

    Args:
        prompt_key: Key into ``test_prompts`` selecting the prompt.
        shells: Shell identifiers to run. Defaults to VALUE-COLLAPSE,
            FEATURE-SUPERPOSITION and FORK-ATTRIBUTION when ``None``.

    Returns:
        Tuple ``(results, residue_patterns)``: ``results`` maps each shell id
        to its raw result; ``residue_patterns`` is a flat list with one dict
        per residue (shell, type, pattern, confidence, truncated signature).

    Raises:
        KeyError: If ``prompt_key`` is not a key of ``test_prompts``.
    """
    if shells is None:
        shells = ["VALUE-COLLAPSE", "FEATURE-SUPERPOSITION", "FORK-ATTRIBUTION"]

    prompt = test_prompts[prompt_key]
    print(f"\n=== Shell Comparison on '{prompt_key}' prompt ===")
    print(f"Prompt: {prompt[:100]}...")

    results = {}
    residue_patterns = []

    for shell_id in shells:
        print(f"\nRunning shell: {shell_id}")
        result = executor.run(
            shell=shell_id,
            model=model,
            prompt=prompt,
            trace_attribution=True,
            record_residue=True,
        )
        results[shell_id] = result

        # Flatten this shell's residues into the shared pattern list.
        residue_patterns.extend(
            {
                "shell": shell_id,
                "type": residue.get("type", "unknown"),
                "pattern": residue.get("pattern", ""),
                "confidence": residue.get("confidence", 0.0),
                "signature": residue.get("signature", "")[:30] + "...",
            }
            for residue in result["residues"]
        )

    # Build the payload before opening the file so that a failure while
    # summarising does not leave a truncated JSON file behind.
    comparison = {
        "prompt": prompt,
        "shells": shells,
        "residue_patterns": residue_patterns,
        "results": {
            shell_id: {
                "output_length": len(result["output"]),
                "num_operations": len(result["operations"]),
                "num_residues": len(result["residues"]),
                "num_collapses": len(result["collapse_samples"]),
            }
            for shell_id, result in results.items()
        },
    }
    output_path = output_dir / f"shell_comparison_{prompt_key}.json"
    with open(output_path, "w") as f:
        json.dump(comparison, f, indent=2)

    print(f"Shell comparison saved to {output_path}")

    shell_names = list(results)
    residue_types = sorted({pattern["type"] for pattern in residue_patterns})

    # One pass over the patterns; missing (shell, type) pairs read back as 0.
    tally = Counter(
        (pattern["shell"], pattern["type"]) for pattern in residue_patterns
    )

    # Keep the original 0.2 width for small charts, but shrink it when there
    # are many residue types so that adjacent shell groups never overlap.
    num_types = len(residue_types)
    bar_width = min(0.2, 0.8 / num_types) if num_types else 0.2
    positions = range(len(shell_names))

    fig, ax = plt.subplots(figsize=(10, 6))
    for i, res_type in enumerate(residue_types):
        ax.bar(
            [p + i * bar_width for p in positions],
            [tally[(shell, res_type)] for shell in shell_names],
            width=bar_width,
            label=res_type,
        )

    ax.set_xlabel('Shell')
    ax.set_ylabel('Number of Residues')
    ax.set_title(f'Residue Patterns by Shell - {prompt_key}')

    # Bars are centred on p + i * bar_width for i in 0..n-1, so the middle of
    # each group lies at p + (n - 1) * bar_width / 2.
    group_offset = bar_width * max(num_types - 1, 0) / 2
    ax.set_xticks([p + group_offset for p in positions])
    ax.set_xticklabels(shell_names)

    # With no residues there are no labelled artists to put in a legend.
    if residue_types:
        ax.legend()

    output_path = output_dir / f"shell_comparison_{prompt_key}.png"
    fig.savefig(output_path)
    plt.close(fig)  # release the figure once it has been written
    print(f"Comparison visualization saved to {output_path}")

    return results, residue_patterns
| |
|
def analyze_residue_registry():
    """Summarise every residue pattern recorded by the shell executor.

    The analysis returned by ``executor.get_residue_analysis()`` is written to
    ``residue_analysis.json``, printed as a short report (per-type statistics
    and up to three related-pattern pairs), and plotted to
    ``residue_analysis.png`` as a bar chart of pattern counts with the average
    confidence overlaid on a second y-axis. Both files go to ``output_dir``.

    Returns:
        The analysis mapping returned by ``executor.get_residue_analysis()``.
    """
    print("\n=== Residue Registry Analysis ===")

    analysis = executor.get_residue_analysis()

    output_path = output_dir / "residue_analysis.json"
    with open(output_path, "w") as f:
        json.dump(analysis, f, indent=2)

    print(f"Residue analysis saved to {output_path}")

    print(f"\nRecorded {analysis['num_patterns']} residue patterns of {len(analysis['types'])} types")

    type_stats = analysis["type_stats"]

    print("\nPattern Types:")
    for pattern_type, stats in type_stats.items():
        print(f" {pattern_type}: {stats['count']} patterns, avg confidence: {stats['avg_confidence']:.2f}")
        if stats['examples']:
            print(f" Examples: {stats['examples'][0]['signature']} (conf: {stats['examples'][0]['confidence']:.2f})")

    print("\nRelated Patterns:")
    for i, relation in enumerate(analysis["related_patterns"][:3]):
        print(f" {i+1}. {relation['pattern1']} ({relation['type1']}) ~ {relation['pattern2']} ({relation['type2']})")
        print(f" Similarity: {relation['similarity']:.2f}")

    types = list(type_stats)
    counts = [stats["count"] for stats in type_stats.values()]
    confidences = [stats["avg_confidence"] for stats in type_stats.values()]

    # Plot against explicit numeric positions and pin the ticks to them before
    # labelling; calling set_xticklabels without fixed tick positions is
    # discouraged by Matplotlib and triggers a warning.
    positions = list(range(len(types)))

    fig, ax = plt.subplots(figsize=(10, 6))
    ax.bar(positions, counts, alpha=0.7, label="Count")

    # Average confidence shares the x-axis but gets its own y-axis.
    ax2 = ax.twinx()
    ax2.plot(positions, confidences, 'r-', marker='o', label="Avg Confidence")
    ax2.set_ylim([0, 1.0])
    ax2.set_ylabel("Average Confidence")

    ax.set_xlabel("Residue Type")
    ax.set_ylabel("Number of Patterns")
    ax.set_title("Residue Pattern Analysis")
    ax.set_xticks(positions)
    ax.set_xticklabels(types, rotation=45, ha="right")

    # Merge the handles of both axes into a single legend.
    lines, labels = ax.get_legend_handles_labels()
    lines2, labels2 = ax2.get_legend_handles_labels()
    ax.legend(lines + lines2, labels + labels2, loc="upper left")

    fig.tight_layout()

    output_path = output_dir / "residue_analysis.png"
    fig.savefig(output_path)
    plt.close(fig)  # release the figure once it has been written
    print(f"Residue analysis visualization saved to {output_path}")

    return analysis
| |
|
def main():
    """Run the complete analysis demonstration.

    In order, this:

    1. traces attribution for the ``memory`` prompt;
    2. runs the MEMTRACE and VALUE-COLLAPSE shells and the recursive shell;
    3. compares three shells on the ``value_conflict`` prompt;
    4. analyses the residue registry;
    5. performs a combined analysis of the ``boundary`` prompt (attribution,
       three shells, a recursive command sequence, residue glyph map); and
    6. renders a 2x2 summary figure.

    Every artifact is written beneath ``output_dir``. Nothing is returned.
    """
    print("=== glyphs Framework Demonstration ===")
    print(f"Results will be saved to: {output_dir}")

    # Only the attribution map is needed later (first panel of the summary).
    attribution_map, _ = run_attribution_analysis(prompt_key="memory")

    # These runs are performed for their printed reports and saved files.
    run_shell_analysis(shell_id="MEMTRACE", prompt_key="memory")
    run_shell_analysis(shell_id="VALUE-COLLAPSE", prompt_key="value_conflict")
    run_recursive_shell(prompt_key="recursion")

    comparison_results, _ = compare_shells(
        prompt_key="value_conflict",
        shells=["VALUE-COLLAPSE", "FORK-ATTRIBUTION", "FEATURE-SUPERPOSITION"],
    )

    residue_analysis = analyze_residue_registry()

    print("\n=== Complete Analysis on 'boundary' prompt ===")
    boundary_prompt = test_prompts["boundary"]

    # Attribution trace for the boundary prompt.
    boundary_output = model.generate(boundary_prompt, max_tokens=800)
    boundary_attribution = tracer.trace(
        prompt=boundary_prompt,
        output=boundary_output,
        include_confidence=True,
    )

    boundary_glyph_map = mapper.map_attribution(
        attribution_map=boundary_attribution,
        layout_type="force_directed",
        include_tokens=True,
    )

    boundary_glyph_path = output_dir / "boundary_glyph_map.svg"
    mapper.visualize(boundary_glyph_map, output_path=str(boundary_glyph_path))
    print(f"Boundary glyph map saved to {boundary_glyph_path}")

    # Diagnostic shells on the boundary prompt.
    boundary_shells = ["BOUNDARY-HESITATION", "GHOST-ACTIVATION", "META-COLLAPSE"]
    boundary_shell_results = {}

    for shell_id in boundary_shells:
        print(f"\nRunning {shell_id} on boundary prompt...")
        result = executor.run(
            shell=shell_id,
            model=model,
            prompt=boundary_prompt,
            trace_attribution=True,
            record_residue=True,
            visualize=True,
        )
        boundary_shell_results[shell_id] = result

        if "visualization" in result and result["visualization"]:
            viz_path = output_dir / f"boundary_{shell_id}.svg"
            visualizer.save_visualization(result["visualization"], str(viz_path))
            print(f"{shell_id} visualization saved to {viz_path}")

    # Recursive shell command sequence on the boundary prompt.
    recursive_shell = RecursiveShell(
        model=model,
        tracer=tracer,
        visualizer=visualizer,
    )

    boundary_commands = [
        ".p/reflect.boundary{distinct=true, overlap=minimal}",
        ".p/reflect.uncertainty{quantify=true, distribution=show}",
        ".p/collapse.detect{threshold=0.6, alert=true}",
        ".p/fork.attribution{sources=contested, visualize=true}",
    ]

    print("\nExecuting recursive shell commands on boundary prompt...")
    boundary_recursive_result = recursive_shell.execute_sequence(
        commands=boundary_commands,
        prompt=boundary_prompt,
    )

    boundary_summary = {
        "prompt": boundary_prompt,
        "output": boundary_output,
        "shells_run": boundary_shells,
        "recursive_commands": boundary_commands,
        "attribution_stats": {
            "links": len(boundary_attribution.links),
            "attribution_gaps": len(boundary_attribution.attribution_gaps),
            "collapsed_regions": len(boundary_attribution.collapsed_regions),
        },
        "shell_results": {
            shell_id: {
                "residues": len(result["residues"]),
                "collapse_samples": len(result["collapse_samples"]),
            }
            for shell_id, result in boundary_shell_results.items()
        },
        "recursive_success": boundary_recursive_result["success"],
    }

    boundary_summary_path = output_dir / "boundary_analysis_summary.json"
    with open(boundary_summary_path, "w") as f:
        json.dump(boundary_summary, f, indent=2)
    print(f"Boundary analysis summary saved to {boundary_summary_path}")

    # Collect the residues reported by the boundary shells.
    boundary_residues = [
        {
            "shell": shell_id,
            "type": residue.get("type", "unknown"),
            "confidence": residue.get("confidence", 0.0),
            "pattern": residue.get("pattern", "")[:50] + "...",
        }
        for shell_id, result in boundary_shell_results.items()
        for residue in result["residues"]
    ]

    if boundary_residues:
        # Imported here because it is only needed when residues were found.
        from glyphs.residue.patterns import ResiduePattern

        residue_pattern_objects = [
            ResiduePattern(
                type=r["type"],
                pattern=r["pattern"],
                context={"shell": r["shell"]},
                signature=r["pattern"][:20],
                confidence=r["confidence"],
            )
            for r in boundary_residues
        ]

        residue_glyph_map = mapper.map_residue_patterns(
            residue_patterns=residue_pattern_objects,
            layout_type="circular",
            cluster_patterns=True,
        )

        residue_glyph_path = output_dir / "boundary_residue_glyphs.svg"
        mapper.visualize(residue_glyph_map, output_path=str(residue_glyph_path))
        print(f"Boundary residue glyph map saved to {residue_glyph_path}")

    # ---- 2x2 summary figure -------------------------------------------------
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle("glyphs Framework - Comprehensive Analysis", fontsize=16)

    # Panel (0, 0): attribution gaps per prompt type.
    # NOTE(review): run_shell_analysis treats the result key "attribution" as
    # the attribution map, whereas this looks up "attribution_map" - confirm
    # which key executor.run actually returns; if it is absent this bar is 0.
    value_result = comparison_results["VALUE-COLLAPSE"]
    if "attribution_map" in value_result:
        value_gaps = len(value_result["attribution_map"].attribution_gaps)
    else:
        value_gaps = 0

    axes[0, 0].bar(
        ["Memory", "Value Conflict", "Boundary", "Recursion", "Polysemantic"],
        [
            len(attribution_map.attribution_gaps),
            value_gaps,
            len(boundary_attribution.attribution_gaps),
            0,
            0,
        ],
    )
    axes[0, 0].set_title("Attribution Gaps by Prompt Type")
    axes[0, 0].set_xlabel("Prompt Type")
    axes[0, 0].set_ylabel("Number of Gaps")
    axes[0, 0].set_ylim(bottom=0)

    # Panel (0, 1): residue pattern counts per type. Tick positions are fixed
    # before labelling, mirroring the set_xticks/set_xticklabels pairing used
    # for panel (1, 0); labelling without fixed ticks triggers a warning.
    residue_types = list(residue_analysis["type_stats"].keys())
    type_counts = [residue_analysis["type_stats"][t]["count"] for t in residue_types]
    type_positions = list(range(len(residue_types)))
    axes[0, 1].bar(type_positions, type_counts)
    axes[0, 1].set_title("Residue Pattern Types")
    axes[0, 1].set_xlabel("Residue Type")
    axes[0, 1].set_ylabel("Count")
    axes[0, 1].set_ylim(bottom=0)
    axes[0, 1].set_xticks(type_positions)
    axes[0, 1].set_xticklabels(residue_types, rotation=45, ha="right")

    # Panel (1, 0): residues vs. collapse samples per boundary shell.
    shell_names = list(boundary_shell_results.keys())
    shell_residue_counts = [len(boundary_shell_results[s]["residues"]) for s in shell_names]
    collapse_counts = [len(boundary_shell_results[s]["collapse_samples"]) for s in shell_names]

    x = range(len(shell_names))
    width = 0.35
    axes[1, 0].bar([i - width/2 for i in x], shell_residue_counts, width, label="Residues")
    axes[1, 0].bar([i + width/2 for i in x], collapse_counts, width, label="Collapses")
    axes[1, 0].set_title("Shell Analysis on Boundary Prompt")
    axes[1, 0].set_xlabel("Shell")
    axes[1, 0].set_ylabel("Count")
    axes[1, 0].set_xticks(x)
    axes[1, 0].set_xticklabels(shell_names, rotation=45, ha="right")
    axes[1, 0].legend()

    # Panel (1, 1): shells per conceptual area, matched by name substrings.
    # NOTE(review): the "Memory" bar counts every registered shell instead of
    # filtering by name like the other four - confirm this is intended.
    prompt_types = ["Memory", "Value Conflict", "Boundary", "Recursion", "Polysemantic"]
    shell_counts = [
        len(executor.shells),
        len([s for s in executor.shells if "VALUE" in s or "CONFLICT" in s]),
        len([s for s in executor.shells if "BOUNDARY" in s or "GHOST" in s]),
        len([s for s in executor.shells if "REFLECTION" in s or "RECURSIVE" in s]),
        len([s for s in executor.shells if "FEATURE" in s or "SUPERPOSITION" in s]),
    ]

    axes[1, 1].bar(prompt_types, shell_counts)
    axes[1, 1].set_title("Available Shells by Conceptual Area")
    axes[1, 1].set_xlabel("Conceptual Area")
    axes[1, 1].set_ylabel("Number of Specialized Shells")
    axes[1, 1].set_ylim(bottom=0)

    fig.tight_layout()
    fig.subplots_adjust(top=0.92)  # leave room for the suptitle

    summary_viz_path = output_dir / "framework_analysis_summary.png"
    fig.savefig(summary_viz_path, dpi=300)
    plt.close(fig)  # release the figure once it has been written
    print(f"Framework analysis summary visualization saved to {summary_viz_path}")

    print("\n=== Analysis Complete ===")
    print(f"All outputs saved to: {output_dir}")
| |
|
def attribution_demo():
    """Demonstrate the tracer's attribution methods on the polysemantic prompt.

    Runs the basic trace plus the fork, QK-alignment, OV-projection and
    attention-head traces, saves whatever visualizations they return as
    ``.svg`` files, and writes a count-based summary to
    ``attribution_analysis_summary.json``. All files go to ``output_dir``.

    Returns:
        The summary dict that was written to the JSON file.
    """
    print("\n=== Advanced Attribution Tracing Demo ===")

    prompt = test_prompts["polysemantic"]
    print(f"Prompt: {prompt[:100]}...")

    output = model.generate(prompt, max_tokens=800)
    print(f"Output: {output[:100]}...")

    # Every tracer call below receives the same prompt/output pair.
    traced_pair = {"prompt": prompt, "output": output}

    attribution_map = tracer.trace(include_confidence=True, **traced_pair)

    print("Tracing attribution forks...")
    fork_result = tracer.trace_with_forks(
        fork_factor=3,
        include_confidence=True,
        visualize=True,
        **traced_pair,
    )

    # The fork visualization lives under "metadata" and is saved only when it
    # is present and truthy.
    fork_visualization = fork_result["metadata"].get("visualization")
    if fork_visualization:
        output_path = output_dir / "attribution_forks_polysemantic.svg"
        visualizer.save_visualization(fork_visualization, str(output_path))
        print(f"Fork visualization saved to {output_path}")

    print("Tracing QK alignment...")
    qk_result = tracer.trace_qk_alignment(visualize=True, **traced_pair)

    # For the remaining traces, presence of the key alone triggers the save.
    if "visualization" in qk_result:
        output_path = output_dir / "qk_alignment_polysemantic.svg"
        visualizer.save_visualization(qk_result["visualization"], str(output_path))
        print(f"QK alignment visualization saved to {output_path}")

    print("Tracing OV projection...")
    ov_result = tracer.trace_ov_projection(visualize=True, **traced_pair)

    if "visualization" in ov_result:
        output_path = output_dir / "ov_projection_polysemantic.svg"
        visualizer.save_visualization(ov_result["visualization"], str(output_path))
        print(f"OV projection visualization saved to {output_path}")

    print("Tracing attention heads...")
    heads_result = tracer.trace_attention_heads(visualize=True, **traced_pair)

    if "visualization" in heads_result:
        output_path = output_dir / "attention_heads_polysemantic.svg"
        visualizer.save_visualization(heads_result["visualization"], str(output_path))
        print(f"Attention heads visualization saved to {output_path}")

    # Assemble the summary section by section, in the order it is serialised.
    attribution_summary = {}
    attribution_summary["basic_attribution"] = {
        "links": len(attribution_map.links),
        "gaps": len(attribution_map.attribution_gaps),
        "collapsed_regions": len(attribution_map.collapsed_regions),
    }

    fork_paths = fork_result["fork_paths"]
    attribution_summary["fork_attribution"] = {
        "forks": len(fork_paths),
        "conflict_points": sum(len(fork.get("conflict_points", [])) for fork in fork_paths),
    }

    attribution_summary["qk_alignment"] = {
        "alignments": len(qk_result["qk_alignments"]),
        "patterns": list(qk_result["qk_patterns"].keys()) if "qk_patterns" in qk_result else [],
    }
    attribution_summary["ov_projection"] = {
        "projections": len(ov_result["ov_projections"]),
        "patterns": list(ov_result["ov_patterns"].keys()) if "ov_patterns" in ov_result else [],
    }
    attribution_summary["attention_heads"] = {
        "heads": len(heads_result["attention_heads"]),
        "patterns": list(heads_result["head_patterns"].keys()) if "head_patterns" in heads_result else [],
    }

    output_path = output_dir / "attribution_analysis_summary.json"
    with open(output_path, "w") as f:
        json.dump(attribution_summary, f, indent=2)
    print(f"Attribution analysis summary saved to {output_path}")

    return attribution_summary
| |
|
def recursive_shell_demo():
    """Run a catalogue of ``.p/`` commands one at a time and chart the outcome.

    The shell's help text is saved to ``recursive_shell_help.txt``. Each demo
    command is then executed individually against the ``recursion`` prompt; a
    failure of one command is recorded and does not stop the others. Results
    are written to ``recursive_commands_summary.json`` and plotted to
    ``recursive_commands_visualization.png``. All files go to ``output_dir``.

    Returns:
        Dict mapping each command string to a record with ``success``,
        ``error`` and, where available, ``execution_time`` and
        ``visualization_path``.
    """
    print("\n=== Recursive Shell Command Demo ===")

    recursive_shell = RecursiveShell(model=model, tracer=tracer, visualizer=visualizer)

    prompt = test_prompts["recursion"]
    print(f"Prompt: {prompt[:100]}...")

    # Save the shell's own help text for reference.
    output_path = output_dir / "recursive_shell_help.txt"
    help_text = recursive_shell.get_command_help()
    with open(output_path, "w") as f:
        f.write(help_text)
    print(f"Recursive shell help text saved to {output_path}")

    demo_commands = [
        # reflect family
        ".p/reflect.trace{depth=3, target=reasoning}",
        ".p/reflect.attribution{sources=all, confidence=true}",
        ".p/reflect.uncertainty{quantify=true, distribution=show}",
        ".p/reflect.boundary{distinct=true, overlap=minimal}",
        # collapse family
        ".p/collapse.detect{threshold=0.7, alert=true}",
        ".p/collapse.prevent{trigger=recursive_depth, threshold=5}",
        ".p/collapse.recover{from=loop, method=gradual}",
        # fork family
        ".p/fork.attribution{sources=all, visualize=true}",
        ".p/fork.counterfactual{variants=['optimistic', 'pessimistic'], compare=true}",
        # shell family
        ".p/shell.isolate{boundary=permeable, contamination=warn}",
    ]

    # Characters replaced by "_" when a command is turned into a file name.
    filename_table = str.maketrans("./{}", "____")

    command_results = {}
    for cmd in demo_commands:
        print(f"\nExecuting: {cmd}")
        try:
            result = recursive_shell.execute(command=cmd, prompt=prompt)
            command_results[cmd] = {
                "success": result["success"],
                "execution_time": result.get("execution_time", 0),
                "error": None if result["success"] else result["error"],
            }

            has_visualization = (
                result["success"]
                and "result" in result
                and "visualization" in result["result"]
                and result["result"]["visualization"]
            )
            if has_visualization:
                safe_cmd = cmd.translate(filename_table)
                output_path = output_dir / f"recursive_cmd_{safe_cmd}.svg"
                visualizer.save_visualization(result["result"]["visualization"], str(output_path))
                print(f"Command visualization saved to {output_path}")

                command_results[cmd]["visualization_path"] = str(output_path)
        except Exception as e:
            # Best-effort demo: record the failure and continue with the next
            # command. This replaces any record stored earlier in the try.
            print(f"Error executing {cmd}: {e}")
            command_results[cmd] = {
                "success": False,
                "error": str(e),
            }

    output_path = output_dir / "recursive_commands_summary.json"
    with open(output_path, "w") as f:
        json.dump(command_results, f, indent=2)
    print(f"Recursive commands summary saved to {output_path}")

    plt.figure(figsize=(12, 8))

    # One horizontal bar pair per command: success flag and execution time
    # (the time is shown as 0 for failed commands).
    cmd_names = list(command_results)
    cmd_success = []
    cmd_times = []
    for name in cmd_names:
        record = command_results[name]
        if record["success"]:
            cmd_success.append(1)
            cmd_times.append(record.get("execution_time", 0))
        else:
            cmd_success.append(0)
            cmd_times.append(0)

    rows = range(len(cmd_names))
    plt.barh(rows, cmd_success, alpha=0.7, label="Success")
    plt.barh(rows, cmd_times, alpha=0.5, label="Execution Time (s)")

    plt.yticks(rows, [cmd[:30] + "..." for cmd in cmd_names])
    plt.xlabel("Success / Execution Time (s)")
    plt.title("Recursive Shell Command Execution")
    plt.legend()
    plt.tight_layout()

    output_path = output_dir / "recursive_commands_visualization.png"
    plt.savefig(output_path)
    print(f"Recursive commands visualization saved to {output_path}")

    return command_results
| |
|
if __name__ == "__main__":
    # Make sure the output directory exists, then run the full demonstration.
    output_dir.mkdir(exist_ok=True)
    main()
| | |
| | |
| | |
| | |
| |
|