nullai-knowledge-system / iath_writer.py
kofdai's picture
Upload folder using huggingface_hub
5af8123 verified
# null_ai/iath_writer.py
"""
.iath File Writer Module
AI生成知識を.iath形式で保存するためのモジュール。
dendritic-memory-editor完全互換。
"""
import struct
import zstandard as zstd
from datetime import datetime
import json
import logging
from pathlib import Path
from typing import Dict, List, Any, Optional
import uuid
from null_ai.iath_memory import IathDecoder
logger = logging.getLogger(__name__)
class IathWriter:
"""
.iathファイルへの書き込みを管理するクラス
"""
def __init__(self, iath_file_path: str):
self.file_path = Path(iath_file_path)
self.encoder = IathTileEncoder()
def append_tile(self, tile: Dict[str, Any]) -> bool:
"""
既存の.iathファイルに新しいタイルを追記
Args:
tile: Knowledge Tile オブジェクト
Returns:
成功したかどうか
"""
try:
# 既存ファイルが存在するか確認
if not self.file_path.exists():
# 新規ファイル作成
return self._create_new_iath_file([tile])
# 既存ファイルの読み込み
decoder = IathDecoder(str(self.file_path))
existing_tiles = decoder.get_all_tiles()
# 新しいタイルを追加
existing_tiles.append(tile)
# ファイルを再構築
return self._rebuild_iath_file(existing_tiles)
except Exception as e:
logger.error(f"Failed to append tile to .iath: {e}")
return False
def append_tiles_batch(self, tiles: List[Dict[str, Any]]) -> bool:
"""
複数のタイルを一括追記
Args:
tiles: Knowledge Tile のリスト
Returns:
成功したかどうか
"""
try:
if not self.file_path.exists():
return self._create_new_iath_file(tiles)
decoder = IathDecoder(str(self.file_path))
existing_tiles = decoder.get_all_tiles()
# 新しいタイルを追加
existing_tiles.extend(tiles)
# ファイルを再構築
return self._rebuild_iath_file(existing_tiles)
except Exception as e:
logger.error(f"Failed to append tiles batch to .iath: {e}")
return False
def _create_new_iath_file(self, tiles: List[Dict[str, Any]]) -> bool:
"""
新規.iathファイルを作成
"""
try:
with open(self.file_path, 'wb') as f:
# Header作成
header = self._create_header(len(tiles))
f.write(header)
# Index作成
index_data, tiles_data = self._create_index_and_data(tiles)
index_json = json.dumps(index_data, ensure_ascii=False).encode('utf-8')
# Indexサイズを書き込み
f.write(struct.pack('<I', len(index_json)))
f.write(index_json)
# Dataセクション書き込み
f.write(tiles_data)
logger.info(f"Created new .iath file: {self.file_path}")
return True
except Exception as e:
logger.error(f"Failed to create new .iath file: {e}")
return False
def _rebuild_iath_file(self, tiles: List[Dict[str, Any]]) -> bool:
"""
.iathファイルを再構築
"""
# 一時ファイルに書き込み
temp_path = self.file_path.with_suffix('.iath.tmp')
try:
with open(temp_path, 'wb') as f:
# Header作成
header = self._create_header(len(tiles))
f.write(header)
# Index作成
index_data, tiles_data = self._create_index_and_data(tiles)
index_json = json.dumps(index_data, ensure_ascii=False).encode('utf-8')
# Indexサイズを書き込み
f.write(struct.pack('<I', len(index_json)))
f.write(index_json)
# Dataセクション書き込み
f.write(tiles_data)
# 一時ファイルを本ファイルに置き換え
temp_path.replace(self.file_path)
logger.info(f"Rebuilt .iath file with {len(tiles)} tiles: {self.file_path}")
return True
except Exception as e:
logger.error(f"Failed to rebuild .iath file: {e}")
if temp_path.exists():
temp_path.unlink()
return False
def _create_header(self, tile_count: int) -> bytes:
"""
.iathヘッダーを作成(64バイト)
"""
# マジックナンバー: "IATH" (4 bytes)
magic = b'IATH'
# バージョン: 1.0 (2 bytes)
version_major = 1
version_minor = 0
# タイル数 (4 bytes)
# 作成日時 (8 bytes, Unix timestamp)
created_at = int(datetime.now().timestamp())
# 予約領域 (46 bytes)
reserved = b'\x00' * 46
header = struct.pack(
'<4s 2B I Q 46s',
magic,
version_major,
version_minor,
tile_count,
created_at,
reserved
)
assert len(header) == 64, f"Header size must be 64 bytes, got {len(header)}"
return header
def _create_index_and_data(self, tiles: List[Dict[str, Any]]) -> tuple:
"""
インデックスとデータセクションを作成
Returns:
(index_dict, data_bytes)
"""
index = {"tiles": []}
data_buffer = bytearray()
current_offset = 0
for tile in tiles:
# タイルをエンコード
tile_bytes = self.encoder.encode_tile(tile)
# インデックスエントリ作成
index["tiles"].append({
"id": tile["metadata"]["knowledge_id"],
"offset": current_offset,
"size": len(tile_bytes)
})
# データバッファに追加
data_buffer.extend(tile_bytes)
current_offset += len(tile_bytes)
return index, bytes(data_buffer)
class IathTileEncoder:
"""
Knowledge Tileを.iath互換のバイナリ形式にエンコード
dendritic-memory-editorのIathEncoderと互換性あり
"""
def encode_tile(self, tile: Dict[str, Any]) -> bytes:
"""
単一のKnowledge Tileをエンコードし、zstdで圧縮
Args:
tile: Knowledge Tile オブジェクト
Returns:
圧縮されたバイナリデータ
"""
# 各セクションをエンコード
metadata_bin = self._encode_metadata(tile["metadata"])
coord_bin = self._encode_coordinates(tile["coordinates"])
content_bin = self._encode_content(tile["content"])
verification_bin = self._encode_verification(tile["verification"])
# 長さプレフィックスを付けて連結
uncompressed = b"".join([
struct.pack("<I", len(metadata_bin)), metadata_bin,
struct.pack("<I", len(coord_bin)), coord_bin,
struct.pack("<I", len(content_bin)), content_bin,
struct.pack("<I", len(verification_bin)), verification_bin,
])
# zstdで圧縮(レベル19 = 最高圧縮率)
cctx = zstd.ZstdCompressor(level=19)
compressed = cctx.compress(uncompressed)
return compressed
def _encode_metadata(self, metadata: Dict[str, Any]) -> bytes:
"""メタデータをバイナリ化"""
kid = self._encode_string(metadata["knowledge_id"])
topic = self._encode_string(metadata["topic"])
# 作成日時(ISO形式)
created_at_iso = metadata.get("created_at", datetime.now().isoformat())
created_at = created_at_iso.encode('ascii')[:27] # ISO format with Z
return kid + topic + created_at
def _encode_coordinates(self, coordinates: Dict[str, Any]) -> bytes:
"""
座標をバイナリ化(6つの浮動小数点数)
座標は以下のいずれかの形式を受け付ける:
1. {"medical_space": [x, y, z], "meta_space": [c, g, v]}
2. [x, y, z, c, g, v]
"""
if isinstance(coordinates, dict):
medical_space = coordinates["medical_space"]
meta_space = coordinates["meta_space"]
return struct.pack(
"<ffffff",
float(medical_space[0]), float(medical_space[1]), float(medical_space[2]),
float(meta_space[0]), float(meta_space[1]), float(meta_space[2])
)
elif isinstance(coordinates, list) and len(coordinates) == 6:
# フラットな配列形式
return struct.pack(
"<ffffff",
float(coordinates[0]), float(coordinates[1]), float(coordinates[2]),
float(coordinates[3]), float(coordinates[4]), float(coordinates[5])
)
else:
raise ValueError(f"Invalid coordinates format: {coordinates}")
def _encode_content(self, content: Dict[str, Any]) -> bytes:
"""コンテンツ(テキスト)をバイナリ化"""
thinking = content.get("thinking_process", "").encode('utf-8')
response = content.get("final_response", "").encode('utf-8')
# 各パートの長さを前に付けて連結
result = struct.pack("<I", len(thinking)) + thinking
result += struct.pack("<I", len(response)) + response
return result
def _encode_verification(self, verification: Dict[str, Any]) -> bytes:
"""検証履歴をバイナリ化"""
status_map = {
"pending_review": 0,
"partial_verified": 1,
"verified": 2,
"expert_confirmed": 3
}
status_code = status_map.get(verification.get("status", "pending_review"), 0)
initial_certainty = int(verification.get("initial_certainty", 0) * 100) # 0-100に変換
reviewer_count = len(verification.get("reviewers", []))
result = struct.pack("<BBI", status_code, initial_certainty, reviewer_count)
# レビュアー情報
for reviewer in verification.get("reviewers", []):
result += self._encode_reviewer_reference(reviewer)
return result
def _encode_reviewer_reference(self, reviewer: Dict[str, Any]) -> bytes:
"""レビュアー情報をエンコード"""
reviewer_id = reviewer.get("reviewer_id", "unknown").encode('utf-8')
return struct.pack("<36s", reviewer_id[:36]) # UUID string length
def _encode_string(self, s: str) -> bytes:
"""NULL終端のUTF-8文字列をエンコード"""
return s.encode('utf-8') + b'\0'
def create_tile_from_ai_output(
knowledge_id: str,
topic: str,
prompt: str,
response: str,
coordinates: List[float],
confidence: float,
domain_id: str,
source: str = "ai_generated"
) -> Dict[str, Any]:
"""
AI出力からKnowledge Tileオブジェクトを作成
Args:
knowledge_id: 知識ID(UUID推奨)
topic: トピック
prompt: ユーザーの質問
response: AIの回答
coordinates: 6次元座標 [x, y, z, c, g, v]
confidence: 信頼度 (0.0-1.0)
domain_id: ドメインID
source: ソース("ai_generated", "human_verified", etc.)
Returns:
Knowledge Tile オブジェクト
"""
if len(coordinates) != 6:
raise ValueError(f"Coordinates must be 6-dimensional, got {len(coordinates)}")
# 検証ステータスの決定
if confidence >= 0.9:
status = "expert_confirmed"
elif confidence >= 0.8:
status = "verified"
elif confidence >= 0.7:
status = "partial_verified"
else:
status = "pending_review"
tile = {
"metadata": {
"knowledge_id": knowledge_id,
"topic": topic,
"created_at": datetime.now().isoformat(),
"domain_id": domain_id,
"source": source
},
"coordinates": {
"medical_space": coordinates[:3],
"meta_space": coordinates[3:]
},
"content": {
"thinking_process": f"Question: {prompt}",
"final_response": response
},
"verification": {
"status": status,
"initial_certainty": confidence,
"reviewers": []
}
}
return tile
def merge_jsonl_to_iath(
jsonl_path: str,
iath_path: str,
coordinate_estimator,
llm_inference_func
) -> int:
"""
JSONLファイルの訓練データを.iath形式に変換して追記
Args:
jsonl_path: Alpaca形式のJSONLファイルパス
iath_path: 出力先.iathファイルパス
coordinate_estimator: CoordinateEstimatorインスタンス
llm_inference_func: LLM推論関数
Returns:
変換したタイル数
"""
import asyncio
writer = IathWriter(iath_path)
tiles_created = 0
with open(jsonl_path, 'r', encoding='utf-8') as f:
for line in f:
try:
example = json.loads(line.strip())
# 座標推定
coord_result = asyncio.run(coordinate_estimator.estimate_coordinates(
prompt=example["input"],
response=example["output"],
domain_id=example["metadata"]["domain_id"],
llm_inference_func=llm_inference_func
))
# Tileオブジェクト作成
tile = create_tile_from_ai_output(
knowledge_id=str(uuid.uuid4()),
topic=example["input"][:100], # 最初の100文字をトピックに
prompt=example["input"],
response=example["output"],
coordinates=coord_result["coordinates"],
confidence=example["metadata"]["confidence"],
domain_id=example["metadata"]["domain_id"],
source="master_output"
)
# .iathに追記
writer.append_tile(tile)
tiles_created += 1
except Exception as e:
logger.error(f"Failed to convert line to tile: {e}")
continue
logger.info(f"Merged {tiles_created} tiles from {jsonl_path} to {iath_path}")
return tiles_created