|
|
""" |
|
|
fractal_json/encoder.py |
|
|
Recursive Pattern Detection and Fractal Encoding Engine |
|
|
""" |
|
|
|
|
|
import hashlib
import json
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
|
|
|
|
|
class FractalEncoder:
    """
    Encodes standard JSON into fractal.json format using recursive pattern detection.

    The encoder walks a JSON-compatible structure, wraps every dict (and every
    list containing repeats) in a "fractal node" tagged with symbolic marker
    keys (depth, pattern id), and — once the identical dict has been fully
    encoded twice — collapses further repeats into a compact anchor reference
    plus a scalar "seed" instead of re-emitting the whole subtree.
    """

    # Unicode markers used as key prefixes in the encoded output.
    SYMBOLIC_MARKERS = {
        'root': '🜏',
        'seed': '∴',
        'bidirectional': '⇌',
        'compression': '⧖',
        'anchor': '☍'
    }

    def __init__(self, compression_threshold: float = 0.8):
        """
        Args:
            compression_threshold: self-similarity score (0.0-1.0) above which
                a dict's pattern id is classified as "fractal" rather than
                "standard".
        """
        self.compression_threshold = compression_threshold
        # pattern_id -> {canonical JSON of a dict: number of times fully encoded}
        self.pattern_cache = defaultdict(lambda: defaultdict(int))
        # Reserved for decoder-facing metadata; currently only surfaced in stats.
        self.symbolic_residue = {}
        # Multiplicative telemetry estimate; shrinks each time a node compresses.
        self.compression_ratio = 1.0

    def encode(self, data: Any, depth: int = 0) -> Any:
        """
        Convert standard JSON data to fractal format.

        Scalars (str/int/float/bool/None) pass through unchanged; dicts and
        lists are encoded recursively. ``depth`` records the nesting level
        stamped into each emitted fractal node.
        """
        # Leaves are emitted verbatim.
        if isinstance(data, (str, int, float, bool)) or data is None:
            return data
        if isinstance(data, dict):
            return self._encode_dict(data, depth)
        if isinstance(data, list):
            return self._encode_list(data, depth)
        # Non-JSON types fall through untouched.
        return data

    def _encode_dict(self, data: Dict, depth: int) -> Dict:
        """
        Encode a dictionary with fractal pattern detection.

        The first two encodings of an identical dict emit the full subtree
        (and bump its cache count); from the third occurrence on, the dict is
        compressed to an anchor reference + seed (lossy: children omitted).
        """
        pattern_id = self._detect_pattern(data)
        fractal_node = {
            f"{self.SYMBOLIC_MARKERS['compression']}depth": depth,
            f"{self.SYMBOLIC_MARKERS['root']}pattern": pattern_id
        }

        # Compress only when this exact dict has been fully encoded >= 2 times.
        if pattern_id in self.pattern_cache:
            similar_patterns = self.pattern_cache[pattern_id]
            if self._can_compress(data, similar_patterns):
                fractal_node[f"{self.SYMBOLIC_MARKERS['anchor']}anchor"] = self._create_anchor(pattern_id)
                fractal_node[f"{self.SYMBOLIC_MARKERS['seed']}seed"] = self._extract_seed(data)
                self.compression_ratio *= 0.85  # heuristic telemetry update
                return fractal_node

        # Full (uncompressed) encoding: recurse into every value, prefixing keys.
        children = {}
        for key, value in data.items():
            encoded_key = f"{self.SYMBOLIC_MARKERS['bidirectional']}{key}"
            children[encoded_key] = self.encode(value, depth + 1)

        if children:
            fractal_node[f"{self.SYMBOLIC_MARKERS['bidirectional']}children"] = children

        # Record this exact dict so later repeats can be compressed.
        # (Requires JSON-serializable, string-keyed input.)
        self.pattern_cache[pattern_id][json.dumps(data, sort_keys=True)] += 1

        return fractal_node

    def _encode_list(self, data: List, depth: int) -> Any:
        """
        Encode a list with fractal pattern detection.

        Lists containing repeating runs become a ``list_fractal`` node carrying
        a seed plus the fully encoded items; lists without repeats are encoded
        element-wise and returned as a plain list.
        """
        pattern_groups = self._detect_list_patterns(data)

        if pattern_groups:
            return {
                f"{self.SYMBOLIC_MARKERS['compression']}depth": depth,
                f"{self.SYMBOLIC_MARKERS['root']}pattern": "list_fractal",
                f"{self.SYMBOLIC_MARKERS['seed']}seed": self._extract_list_seed(pattern_groups),
                f"{self.SYMBOLIC_MARKERS['bidirectional']}expansions": [
                    self.encode(item, depth + 1) for item in data
                ]
            }
        else:
            return [self.encode(item, depth + 1) for item in data]

    def _detect_pattern(self, data: Dict) -> str:
        """
        Derive a deterministic pattern id from the dict's key->value-type structure.

        Uses an MD5 digest of the canonical structure rather than ``hash()``:
        string hashing is randomized per process (PYTHONHASHSEED), which made
        pattern ids differ between runs and broke reproducible output.
        """
        structure = {k: type(v).__name__ for k, v in data.items()}
        canonical = json.dumps(structure, sort_keys=True)
        structure_hash = hashlib.md5(canonical.encode("utf-8")).hexdigest()[:16]

        # Highly self-similar dicts are tagged as fractal candidates.
        similarity_score = self._calculate_self_similarity(data)
        prefix = "fractal" if similarity_score > self.compression_threshold else "standard"
        return f"{prefix}_{structure_hash}"

    def _calculate_self_similarity(self, data: Any, parent_structure: Optional[Dict] = None) -> float:
        """
        Calculate a self-similarity score in [0.0, 1.0], recursively.

        At the root call (no ``parent_structure``) this averages how similar
        each dict-valued child is to this node; in child calls it is the
        fraction of shared keys whose value types match the parent's.
        Non-dicts score 0.0.
        """
        if not isinstance(data, dict):
            return 0.0

        current_structure = {k: type(v).__name__ for k, v in data.items()}

        if parent_structure is None:
            child_scores = [
                self._calculate_self_similarity(value, current_structure)
                for value in data.values()
                if isinstance(value, dict)
            ]
            if not child_scores:
                return 0.0
            return sum(child_scores) / len(child_scores)

        common_keys = set(current_structure) & set(parent_structure)
        if not common_keys:
            return 0.0

        matching_types = sum(1 for k in common_keys if current_structure[k] == parent_structure[k])
        return matching_types / len(common_keys)

    def _detect_list_patterns(self, data: List) -> List[List[Any]]:
        """
        Detect repeating sub-sequences in a list.

        Scans every candidate pattern length/offset (roughly cubic in the list
        length; fine for the modest lists this encoder targets), counts how
        many consecutive non-overlapping occurrences follow each start, and
        returns up to the 3 best-covering patterns (length * occurrences),
        or [] when nothing repeats.
        """
        if len(data) < 2:
            return []

        patterns = []
        for pattern_length in range(1, len(data) // 2 + 1):
            for i in range(len(data) - pattern_length + 1):
                pattern = data[i:i + pattern_length]

                occurrences = 0
                # Step in pattern-sized strides from the candidate's start.
                for j in range(i, len(data) - pattern_length + 1, pattern_length):
                    if data[j:j + pattern_length] == pattern:
                        occurrences += 1

                if occurrences >= 2:
                    patterns.append((pattern, occurrences))

        if patterns:
            # Rank by total coverage; stable sort keeps earlier finds on ties.
            patterns.sort(key=lambda x: len(x[0]) * x[1], reverse=True)
            return [p[0] for p in patterns[:3]]

        return []

    def _can_compress(self, data: Dict, similar_patterns: Dict) -> bool:
        """
        Return True when this exact dict has already been encoded at least twice.
        """
        data_str = json.dumps(data, sort_keys=True)
        return similar_patterns.get(data_str, 0) >= 2

    def _create_anchor(self, pattern_id: str) -> str:
        """
        Build a JSON-pointer-style anchor reference for a compressed pattern.
        """
        return f"#/patterns/{pattern_id}"

    def _extract_seed(self, data: Dict) -> Dict:
        """
        Extract the minimal seed for a compressed dict: scalar values are kept
        verbatim, nested containers are replaced by an "expand" placeholder.
        """
        seed = {}
        for key, value in data.items():
            if isinstance(value, (str, int, float, bool)) or value is None:
                seed[key] = value
            else:
                seed[key] = f"{self.SYMBOLIC_MARKERS['bidirectional']}expand"
        return seed

    def _extract_list_seed(self, pattern_groups: List[List[Any]]) -> Dict:
        """
        Extract the seed for a list fractal node from its detected patterns.

        NOTE(review): "repetitions" is the number of retained pattern groups
        (capped at 3 by _detect_list_patterns), not the occurrence count of
        the pattern — value preserved for decoder compatibility; confirm
        against the decoder before renaming or changing it.
        """
        return {
            "pattern": pattern_groups[0],
            "repetitions": len(pattern_groups)
        }

    def get_compression_stats(self) -> Dict:
        """
        Return compression statistics accumulated across encode() calls.
        """
        return {
            "compression_ratio": self.compression_ratio,
            "pattern_count": len(self.pattern_cache),
            "symbolic_residue": self.symbolic_residue
        }
|
|
|