Spaces:
Sleeping
Sleeping
| """ | |
| NullAI - .iath Memory System | |
| 樹木型空間記憶(Dendritic Memory Space)の実装 | |
| .iathファイル形式との完全互換性を持つ知識検索システム | |
| 6次元座標系による空間的RAG(Retrieval-Augmented Generation) | |
| """ | |
| import struct | |
| import zstandard as zstd | |
| import json | |
| import logging | |
| import numpy as np | |
| from typing import List, Dict, Any, Optional, Tuple | |
| from pathlib import Path | |
| from datetime import datetime | |
| logger = logging.getLogger(__name__) | |
| class IathDecoder: | |
| """ | |
| .iathファイル形式のデコーダー | |
| dendritic-memory-editorとの互換性を保持 | |
| """ | |
| def __init__(self, iath_file_path: str): | |
| """ | |
| Args: | |
| iath_file_path: .iathファイルのパス | |
| """ | |
| self.file_path = Path(iath_file_path) | |
| self.header = None | |
| self.index = [] | |
| self.data_section_offset = 0 | |
| if self.file_path.exists(): | |
| self._load_header_and_index() | |
| else: | |
| logger.warning(f".iath file not found: {iath_file_path}") | |
| def _load_header_and_index(self): | |
| """ヘッダーとインデックスセクションを読み込む""" | |
| try: | |
| with open(self.file_path, 'rb') as f: | |
| # ヘッダー読み込み (64 bytes) | |
| header_data = f.read(64) | |
| magic, version, domain_code, compression_type, checksum, index_offset, data_offset = struct.unpack( | |
| "<4sIBB32sQQ6x", header_data | |
| ) | |
| self.header = { | |
| "magic": magic.decode('ascii'), | |
| "version": version, | |
| "domain_code": domain_code, | |
| "compression_type": compression_type, | |
| "index_offset": index_offset, | |
| "data_offset": data_offset | |
| } | |
| logger.info(f"Loaded .iath header: magic={self.header['magic']}, domain={domain_code}") | |
| # インデックス読み込み | |
| f.seek(index_offset) | |
| index_binary = f.read(data_offset - index_offset) | |
| self.index = json.loads(index_binary.decode('utf-8')) | |
| self.data_section_offset = data_offset | |
| logger.info(f"Loaded {len(self.index)} tiles from index") | |
| except Exception as e: | |
| logger.error(f"Error loading .iath file: {e}") | |
| raise | |
| def _decode_string(self, data: bytes, offset: int) -> Tuple[str, int]: | |
| """NULL終端文字列をデコード""" | |
| end = data.find(b'\0', offset) | |
| if end == -1: | |
| return data[offset:].decode('utf-8'), len(data) | |
| return data[offset:end].decode('utf-8'), end + 1 | |
| def _decode_tile_data(self, compressed_data: bytes) -> dict: | |
| """圧縮されたタイルデータをデコード""" | |
| # zstd解凍 | |
| dctx = zstd.ZstdDecompressor() | |
| uncompressed = dctx.decompress(compressed_data) | |
| offset = 0 | |
| # メタデータ | |
| metadata_len = struct.unpack("<I", uncompressed[offset:offset+4])[0] | |
| offset += 4 | |
| metadata_bin = uncompressed[offset:offset+metadata_len] | |
| offset += metadata_len | |
| # メタデータ内の文字列をパース | |
| meta_offset = 0 | |
| knowledge_id, meta_offset = self._decode_string(metadata_bin, meta_offset) | |
| topic, meta_offset = self._decode_string(metadata_bin, meta_offset) | |
| created_at = metadata_bin[meta_offset:meta_offset+27].decode('ascii').strip('\0') | |
| metadata = { | |
| "knowledge_id": knowledge_id, | |
| "topic": topic, | |
| "created_at": created_at | |
| } | |
| # 座標 | |
| coord_len = struct.unpack("<I", uncompressed[offset:offset+4])[0] | |
| offset += 4 | |
| coord_data = uncompressed[offset:offset+coord_len] | |
| offset += coord_len | |
| x, y, z, c, g, v = struct.unpack("<ffffff", coord_data) | |
| coordinates = { | |
| "medical_space": [x, y, z], | |
| "meta_space": [c, g, v] | |
| } | |
| # コンテンツ | |
| content_len = struct.unpack("<I", uncompressed[offset:offset+4])[0] | |
| offset += 4 | |
| content_data = uncompressed[offset:offset+content_len] | |
| offset += content_len | |
| # thinking_process | |
| thinking_len = struct.unpack("<I", content_data[0:4])[0] | |
| thinking = content_data[4:4+thinking_len].decode('utf-8') | |
| # final_response | |
| response_offset = 4 + thinking_len | |
| response_len = struct.unpack("<I", content_data[response_offset:response_offset+4])[0] | |
| response = content_data[response_offset+4:response_offset+4+response_len].decode('utf-8') | |
| content = { | |
| "thinking_process": thinking, | |
| "final_response": response | |
| } | |
| # 検証(簡易実装) | |
| verification_len = struct.unpack("<I", uncompressed[offset:offset+4])[0] | |
| offset += 4 | |
| verification_data = uncompressed[offset:offset+verification_len] | |
| status_code, initial_certainty, reviewer_count = struct.unpack("<BBI", verification_data[:6]) | |
| status_map = {0: "pending_review", 1: "partial_verified", 2: "verified", 3: "expert_confirmed"} | |
| verification = { | |
| "status": status_map.get(status_code, "pending_review"), | |
| "initial_certainty": initial_certainty / 100.0, | |
| "reviewers": [] # 詳細は省略 | |
| } | |
| return { | |
| "metadata": metadata, | |
| "coordinates": coordinates, | |
| "content": content, | |
| "verification": verification | |
| } | |
| def get_tile_by_id(self, knowledge_id: str) -> Optional[dict]: | |
| """IDで特定のタイルを取得""" | |
| try: | |
| # インデックスから検索 | |
| index_entry = next((entry for entry in self.index if entry["id"] == knowledge_id), None) | |
| if not index_entry: | |
| return None | |
| # データセクションから読み込み | |
| with open(self.file_path, 'rb') as f: | |
| f.seek(self.data_section_offset + index_entry["offset"]) | |
| compressed_data = f.read(index_entry["length"]) | |
| return self._decode_tile_data(compressed_data) | |
| except Exception as e: | |
| logger.error(f"Error loading tile {knowledge_id}: {e}") | |
| return None | |
| def get_all_tiles(self) -> List[dict]: | |
| """全タイルを取得(メモリに注意)""" | |
| tiles = [] | |
| for entry in self.index: | |
| tile = self.get_tile_by_id(entry["id"]) | |
| if tile: | |
| tiles.append(tile) | |
| return tiles | |
| class DendriticMemorySpace: | |
| """ | |
| 樹木型空間記憶システム | |
| 6次元座標系による知識の空間配置と検索: | |
| - medical_space [x, y, z]: ドメイン固有の3次元空間 | |
| - meta_space [c, g, v]: Certainty, Granularity, Verification | |
| """ | |
| def __init__(self, iath_file_path: str): | |
| """ | |
| Args: | |
| iath_file_path: .iathファイルのパス | |
| """ | |
| self.decoder = IathDecoder(iath_file_path) | |
| self.tiles_cache = [] # 全タイルをメモリにキャッシュ(最適化可能) | |
| self._build_spatial_index() | |
| def _build_spatial_index(self): | |
| """空間インデックスを構築""" | |
| logger.info("Building spatial index from .iath file...") | |
| self.tiles_cache = self.decoder.get_all_tiles() | |
| # 座標行列を構築(高速検索用) | |
| if self.tiles_cache: | |
| self.coordinates_matrix = np.array([ | |
| tile["coordinates"]["medical_space"] + tile["coordinates"]["meta_space"] | |
| for tile in self.tiles_cache | |
| ]) | |
| logger.info(f"Spatial index built: {len(self.tiles_cache)} tiles") | |
| else: | |
| self.coordinates_matrix = np.array([]) | |
| logger.warning("No tiles found in .iath file") | |
| def search_by_coordinates(self, query_coords: List[float], top_k: int = 5, distance_threshold: float = None) -> List[dict]: | |
| """ | |
| 座標ベースの空間検索(樹木型空間記憶の核心機能) | |
| Args: | |
| query_coords: クエリ座標 [x, y, z, c, g, v] | |
| top_k: 返却する上位K件 | |
| distance_threshold: 距離閾値(Noneなら無制限) | |
| Returns: | |
| 関連するタイルのリスト(距離の近い順) | |
| """ | |
| if len(self.tiles_cache) == 0: | |
| return [] | |
| query_vector = np.array(query_coords) | |
| # ユークリッド距離を計算 | |
| distances = np.linalg.norm(self.coordinates_matrix - query_vector, axis=1) | |
| # 距離でソート | |
| sorted_indices = np.argsort(distances) | |
| # top_k件を取得 | |
| results = [] | |
| for idx in sorted_indices[:top_k]: | |
| distance = distances[idx] | |
| if distance_threshold is not None and distance > distance_threshold: | |
| break | |
| tile = self.tiles_cache[idx].copy() | |
| tile["spatial_distance"] = float(distance) | |
| results.append(tile) | |
| return results | |
| def search_by_text(self, query_text: str, top_k: int = 5) -> List[dict]: | |
| """ | |
| テキスト検索(簡易実装:キーワードマッチング) | |
| Args: | |
| query_text: 検索クエリテキスト | |
| top_k: 返却する上位K件 | |
| Returns: | |
| 関連するタイルのリスト | |
| """ | |
| query_lower = query_text.lower() | |
| matches = [] | |
| for tile in self.tiles_cache: | |
| topic = tile["metadata"]["topic"].lower() | |
| content = tile["content"]["final_response"].lower() | |
| # シンプルなスコアリング(含まれる回数) | |
| score = topic.count(query_lower) * 2 + content.count(query_lower) | |
| if score > 0: | |
| tile_copy = tile.copy() | |
| tile_copy["text_match_score"] = score | |
| matches.append(tile_copy) | |
| # スコアでソート | |
| matches.sort(key=lambda x: x["text_match_score"], reverse=True) | |
| return matches[:top_k] | |
| def hybrid_search(self, query_text: str, query_coords: Optional[List[float]] = None, top_k: int = 5) -> List[dict]: | |
| """ | |
| ハイブリッド検索:テキスト + 空間座標 | |
| Args: | |
| query_text: 検索クエリテキスト | |
| query_coords: クエリ座標(オプション) | |
| top_k: 返却する上位K件 | |
| Returns: | |
| 関連するタイルのリスト | |
| """ | |
| # テキスト検索 | |
| text_results = self.search_by_text(query_text, top_k=top_k*2) | |
| # 座標検索が指定されている場合 | |
| if query_coords: | |
| spatial_results = self.search_by_coordinates(query_coords, top_k=top_k*2) | |
| # 両方に出現するタイルを優先的にスコアリング | |
| combined_scores = {} | |
| for tile in text_results: | |
| tile_id = tile["metadata"]["knowledge_id"] | |
| combined_scores[tile_id] = { | |
| "tile": tile, | |
| "text_score": tile.get("text_match_score", 0), | |
| "spatial_score": 0 | |
| } | |
| for tile in spatial_results: | |
| tile_id = tile["metadata"]["knowledge_id"] | |
| if tile_id in combined_scores: | |
| combined_scores[tile_id]["spatial_score"] = 1.0 / (1.0 + tile.get("spatial_distance", 10)) | |
| else: | |
| combined_scores[tile_id] = { | |
| "tile": tile, | |
| "text_score": 0, | |
| "spatial_score": 1.0 / (1.0 + tile.get("spatial_distance", 10)) | |
| } | |
| # 複合スコアで並び替え | |
| ranked = sorted( | |
| combined_scores.values(), | |
| key=lambda x: x["text_score"] * 0.6 + x["spatial_score"] * 0.4, | |
| reverse=True | |
| ) | |
| return [item["tile"] for item in ranked[:top_k]] | |
| else: | |
| # テキスト検索のみ | |
| return text_results[:top_k] | |
| def get_statistics(self) -> dict: | |
| """メモリ空間の統計情報を取得""" | |
| if len(self.tiles_cache) == 0: | |
| return {"total_tiles": 0} | |
| coords = self.coordinates_matrix | |
| return { | |
| "total_tiles": len(self.tiles_cache), | |
| "coordinate_ranges": { | |
| "medical_x": {"min": float(coords[:, 0].min()), "max": float(coords[:, 0].max())}, | |
| "medical_y": {"min": float(coords[:, 1].min()), "max": float(coords[:, 1].max())}, | |
| "medical_z": {"min": float(coords[:, 2].min()), "max": float(coords[:, 2].max())}, | |
| "certainty": {"min": float(coords[:, 3].min()), "max": float(coords[:, 3].max())}, | |
| "granularity": {"min": float(coords[:, 4].min()), "max": float(coords[:, 4].max())}, | |
| "verification": {"min": float(coords[:, 5].min()), "max": float(coords[:, 5].max())} | |
| }, | |
| "verification_status_distribution": self._get_verification_distribution() | |
| } | |
| def _get_verification_distribution(self) -> dict: | |
| """検証ステータスの分布を取得""" | |
| distribution = {} | |
| for tile in self.tiles_cache: | |
| status = tile["verification"]["status"] | |
| distribution[status] = distribution.get(status, 0) + 1 | |
| return distribution | |