"""
.iath File Writer Module

Module for persisting AI-generated knowledge in the .iath format.
Fully compatible with dendritic-memory-editor.
"""
|
|
| import struct |
| import zstandard as zstd |
| from datetime import datetime |
| import json |
| import logging |
| from pathlib import Path |
| from typing import Dict, List, Any, Optional |
| import uuid |
|
|
| from null_ai.iath_memory import IathDecoder |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
class IathWriter:
    """
    Manages writes to an .iath file.

    On-disk layout: a fixed 64-byte header, a little-endian uint32 giving
    the byte length of a UTF-8 JSON index, the index itself, then the
    concatenated encoded tiles that the index offsets point into.
    """

    def __init__(self, iath_file_path: str):
        # Target file; the format has no in-place append, so the file is
        # rebuilt in full on every write.
        self.file_path = Path(iath_file_path)
        self.encoder = IathTileEncoder()

    def append_tile(self, tile: Dict[str, Any]) -> bool:
        """
        Append a single new tile to the existing .iath file.

        Args:
            tile: Knowledge Tile object.

        Returns:
            True on success, False on failure.
        """
        try:
            return self._append_tiles([tile])
        except Exception as e:
            logger.error(f"Failed to append tile to .iath: {e}")
            return False

    def append_tiles_batch(self, tiles: List[Dict[str, Any]]) -> bool:
        """
        Append several tiles in one pass (one read + one rewrite total).

        Args:
            tiles: list of Knowledge Tile objects.

        Returns:
            True on success, False on failure.
        """
        try:
            return self._append_tiles(tiles)
        except Exception as e:
            logger.error(f"Failed to append tiles batch to .iath: {e}")
            return False

    def _append_tiles(self, tiles: List[Dict[str, Any]]) -> bool:
        """Shared append path: create the file, or decode-extend-rebuild it."""
        if not self.file_path.exists():
            return self._create_new_iath_file(tiles)

        # Read every existing tile back, extend, and rewrite the whole file.
        decoder = IathDecoder(str(self.file_path))
        existing_tiles = decoder.get_all_tiles()
        existing_tiles.extend(tiles)
        return self._rebuild_iath_file(existing_tiles)

    def _create_new_iath_file(self, tiles: List[Dict[str, Any]]) -> bool:
        """Create a brand-new .iath file containing ``tiles``."""
        try:
            # Ensure the destination directory exists before opening.
            self.file_path.parent.mkdir(parents=True, exist_ok=True)
            with open(self.file_path, 'wb') as f:
                self._write_sections(f, tiles)
            logger.info(f"Created new .iath file: {self.file_path}")
            return True
        except Exception as e:
            logger.error(f"Failed to create new .iath file: {e}")
            return False

    def _rebuild_iath_file(self, tiles: List[Dict[str, Any]]) -> bool:
        """Rewrite the .iath file atomically via a temp file + rename."""
        temp_path = self.file_path.with_suffix('.iath.tmp')
        try:
            with open(temp_path, 'wb') as f:
                self._write_sections(f, tiles)

            # Path.replace is atomic on POSIX: readers observe either the
            # old file or the new one, never a partial write.
            temp_path.replace(self.file_path)

            logger.info(f"Rebuilt .iath file with {len(tiles)} tiles: {self.file_path}")
            return True
        except Exception as e:
            logger.error(f"Failed to rebuild .iath file: {e}")
            if temp_path.exists():
                temp_path.unlink()
            return False

    def _write_sections(self, f, tiles: List[Dict[str, Any]]) -> None:
        """Write header, length-prefixed JSON index, then tile data to ``f``."""
        f.write(self._create_header(len(tiles)))

        index_data, tiles_data = self._create_index_and_data(tiles)
        index_json = json.dumps(index_data, ensure_ascii=False).encode('utf-8')

        f.write(struct.pack('<I', len(index_json)))
        f.write(index_json)
        f.write(tiles_data)

    def _create_header(self, tile_count: int) -> bytes:
        """
        Build the fixed 64-byte .iath header.

        Layout (little-endian): 4-byte magic b'IATH', major/minor version
        bytes, uint32 tile count, uint64 creation time (epoch seconds),
        46 reserved zero bytes.
        """
        magic = b'IATH'
        version_major = 1
        version_minor = 0
        created_at = int(datetime.now().timestamp())
        reserved = b'\x00' * 46

        header = struct.pack(
            '<4s 2B I Q 46s',
            magic,
            version_major,
            version_minor,
            tile_count,
            created_at,
            reserved
        )

        assert len(header) == 64, f"Header size must be 64 bytes, got {len(header)}"
        return header

    def _create_index_and_data(self, tiles: List[Dict[str, Any]]) -> tuple:
        """
        Build the JSON index and concatenated tile-data section.

        Each index entry records a tile's id plus its byte offset and size
        within the data section.

        Returns:
            (index_dict, data_bytes)
        """
        index = {"tiles": []}
        data_buffer = bytearray()
        current_offset = 0

        for tile in tiles:
            tile_bytes = self.encoder.encode_tile(tile)

            index["tiles"].append({
                "id": tile["metadata"]["knowledge_id"],
                "offset": current_offset,
                "size": len(tile_bytes)
            })

            data_buffer.extend(tile_bytes)
            current_offset += len(tile_bytes)

        return index, bytes(data_buffer)
|
|
|
|
class IathTileEncoder:
    """
    Encodes a Knowledge Tile into the .iath-compatible binary format.

    Compatible with dendritic-memory-editor's IathEncoder.
    """

    def encode_tile(self, tile: Dict[str, Any]) -> bytes:
        """
        Encode a single Knowledge Tile and compress it with zstd.

        The uncompressed payload is four sections (metadata, coordinates,
        content, verification), each preceded by a little-endian uint32
        byte count.

        Args:
            tile: Knowledge Tile object.

        Returns:
            zstd-compressed binary data.
        """
        metadata_bin = self._encode_metadata(tile["metadata"])
        coord_bin = self._encode_coordinates(tile["coordinates"])
        content_bin = self._encode_content(tile["content"])
        verification_bin = self._encode_verification(tile["verification"])

        sections = (metadata_bin, coord_bin, content_bin, verification_bin)
        uncompressed = b"".join(
            struct.pack("<I", len(section)) + section for section in sections
        )

        # Level 19 trades speed for ratio; tiles are written far less
        # often than they are read.
        cctx = zstd.ZstdCompressor(level=19)
        return cctx.compress(uncompressed)

    def _encode_metadata(self, metadata: Dict[str, Any]) -> bytes:
        """Serialize metadata: two NUL-terminated strings + ISO timestamp."""
        kid = self._encode_string(metadata["knowledge_id"])
        topic = self._encode_string(metadata["topic"])

        # NOTE(review): the timestamp is capped at 27 bytes but has no
        # length prefix or padding, so a shorter ISO string yields a
        # shorter field — presumably the decoder tolerates this; confirm
        # against dendritic-memory-editor before changing.
        created_at_iso = metadata.get("created_at", datetime.now().isoformat())
        created_at = created_at_iso.encode('ascii')[:27]

        return kid + topic + created_at

    def _encode_coordinates(self, coordinates) -> bytes:
        """
        Serialize coordinates as six little-endian float32 values.

        Accepted forms:
          1. {"medical_space": [x, y, z], "meta_space": [c, g, v]}
          2. any 6-element list or tuple [x, y, z, c, g, v]
             (tuples were previously rejected despite the documented
             flat-sequence form)
        """
        if isinstance(coordinates, dict):
            medical_space = coordinates["medical_space"]
            meta_space = coordinates["meta_space"]
            return struct.pack(
                "<ffffff",
                float(medical_space[0]), float(medical_space[1]), float(medical_space[2]),
                float(meta_space[0]), float(meta_space[1]), float(meta_space[2])
            )
        if isinstance(coordinates, (list, tuple)) and len(coordinates) == 6:
            return struct.pack("<ffffff", *(float(v) for v in coordinates))
        raise ValueError(f"Invalid coordinates format: {coordinates}")

    def _encode_content(self, content: Dict[str, Any]) -> bytes:
        """Serialize content: two length-prefixed UTF-8 text fields."""
        thinking = content.get("thinking_process", "").encode('utf-8')
        response = content.get("final_response", "").encode('utf-8')

        result = struct.pack("<I", len(thinking)) + thinking
        result += struct.pack("<I", len(response)) + response
        return result

    def _encode_verification(self, verification: Dict[str, Any]) -> bytes:
        """Serialize verification status, certainty and reviewer list."""
        status_map = {
            "pending_review": 0,
            "partial_verified": 1,
            "verified": 2,
            "expert_confirmed": 3
        }
        status_code = status_map.get(verification.get("status", "pending_review"), 0)

        # round() instead of int(): int() truncates toward zero, so e.g.
        # 0.29 * 100 == 28.999... would be stored as 28. Clamp to one byte
        # so an out-of-range certainty cannot make struct.pack raise.
        certainty = verification.get("initial_certainty", 0)
        initial_certainty = max(0, min(255, round(certainty * 100)))

        reviewers = verification.get("reviewers", [])
        result = struct.pack("<BBI", status_code, initial_certainty, len(reviewers))

        for reviewer in reviewers:
            result += self._encode_reviewer_reference(reviewer)

        return result

    def _encode_reviewer_reference(self, reviewer: Dict[str, Any]) -> bytes:
        """Encode a reviewer id as a fixed 36-byte NUL-padded field."""
        reviewer_id = reviewer.get("reviewer_id", "unknown").encode('utf-8')
        return struct.pack("<36s", reviewer_id[:36])

    def _encode_string(self, s: str) -> bytes:
        """Encode a NUL-terminated UTF-8 string."""
        return s.encode('utf-8') + b'\0'
|
|
|
|
def create_tile_from_ai_output(
    knowledge_id: str,
    topic: str,
    prompt: str,
    response: str,
    coordinates: List[float],
    confidence: float,
    domain_id: str,
    source: str = "ai_generated"
) -> Dict[str, Any]:
    """
    Build a Knowledge Tile object from an AI interaction.

    Args:
        knowledge_id: knowledge identifier (UUID recommended)
        topic: topic string
        prompt: the user's question
        response: the AI's answer
        coordinates: 6-dimensional coordinates [x, y, z, c, g, v]
        confidence: confidence score (0.0-1.0)
        domain_id: domain identifier
        source: origin tag ("ai_generated", "human_verified", etc.)

    Returns:
        Knowledge Tile object.

    Raises:
        ValueError: if ``coordinates`` does not have exactly 6 elements.
    """
    if len(coordinates) != 6:
        raise ValueError(f"Coordinates must be 6-dimensional, got {len(coordinates)}")

    # Map confidence onto a verification status via descending thresholds.
    status_thresholds = (
        (0.9, "expert_confirmed"),
        (0.8, "verified"),
        (0.7, "partial_verified"),
    )
    status = next(
        (label for bound, label in status_thresholds if confidence >= bound),
        "pending_review",
    )

    return {
        "metadata": {
            "knowledge_id": knowledge_id,
            "topic": topic,
            "created_at": datetime.now().isoformat(),
            "domain_id": domain_id,
            "source": source
        },
        "coordinates": {
            # First three axes are the domain space, last three the meta space.
            "medical_space": coordinates[:3],
            "meta_space": coordinates[3:]
        },
        "content": {
            "thinking_process": f"Question: {prompt}",
            "final_response": response
        },
        "verification": {
            "status": status,
            "initial_certainty": confidence,
            "reviewers": []
        }
    }
|
|
|
|
def merge_jsonl_to_iath(
    jsonl_path: str,
    iath_path: str,
    coordinate_estimator,
    llm_inference_func
) -> int:
    """
    Convert Alpaca-style JSONL training data into tiles and append them
    to an .iath file.

    Args:
        jsonl_path: path to the Alpaca-format JSONL file
        iath_path: destination .iath file path
        coordinate_estimator: CoordinateEstimator instance
        llm_inference_func: LLM inference function

    Returns:
        Number of tiles actually written (0 if the final write fails).
    """
    import asyncio

    writer = IathWriter(iath_path)
    tiles: List[Dict[str, Any]] = []

    with open(jsonl_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                # Blank lines are not data; skip instead of logging a
                # spurious JSON decode error.
                continue
            try:
                example = json.loads(line)

                coord_result = asyncio.run(coordinate_estimator.estimate_coordinates(
                    prompt=example["input"],
                    response=example["output"],
                    domain_id=example["metadata"]["domain_id"],
                    llm_inference_func=llm_inference_func
                ))

                tiles.append(create_tile_from_ai_output(
                    knowledge_id=str(uuid.uuid4()),
                    topic=example["input"][:100],
                    prompt=example["input"],
                    response=example["output"],
                    coordinates=coord_result["coordinates"],
                    confidence=example["metadata"]["confidence"],
                    domain_id=example["metadata"]["domain_id"],
                    source="master_output"
                ))
            except Exception as e:
                logger.error(f"Failed to convert line to tile: {e}")
                continue

    # Write once at the end: the previous per-line append_tile re-read and
    # rebuilt the whole .iath file for every line (O(n^2) I/O), and counted
    # a tile as created even when the append returned False.
    tiles_created = 0
    if tiles and writer.append_tiles_batch(tiles):
        tiles_created = len(tiles)

    logger.info(f"Merged {tiles_created} tiles from {jsonl_path} to {iath_path}")
    return tiles_created
|