# nullai-knowledge-system / null_ai/model_router.py
# Deployed to Hugging Face Spaces ("Deploy NullAI Knowledge System to Spaces", commit 075a2b6).
import logging
from typing import Optional, Dict, Any, AsyncGenerator, List
import asyncio
import time
import threading # For TextIteratorStreamer
import uuid # For generating unique IDs
import os
from backend.app.config import ConfigManager, ModelConfig, ModelProvider
# SessionLocal は循環インポートを避けるため遅延インポート
from backend.app.services.knowledge_service import KnowledgeService, get_knowledge_service
from null_ai.llm_providers import HuggingFaceProvider, OllamaProvider, MLXProvider, GGUFProvider # Import all providers
from null_ai.iath_memory import DendriticMemorySpace # 樹木型空間記憶
from null_ai.coordinate_estimator import CoordinateEstimator # 座標自動推定
from null_ai.iath_writer import IathWriter, create_tile_from_ai_output # .iath保存
# Module-level logger; all ModelRouter diagnostics go through this.
logger = logging.getLogger(__name__)
class ModelRouter:
    """
    Manages routing and switching of models between inference engines.

    Holds the state of the Master and Apprentice models.
    """
    # Singleton instance holder; __new__ reuses this once created.
    _instance: Optional['ModelRouter'] = None
    # Guards against re-running __init__ on the shared singleton instance.
    _initialized = False
def __new__(cls, config_manager: ConfigManager):
if cls._instance == None:
cls._instance = super(ModelRouter, cls).__new__(cls)
return cls._instance
    def __init__(self, config_manager: ConfigManager):
        """Initialize the router's state (runs only once for the singleton).

        Loads all configured engines, restores the persisted master/apprentice
        selection, and instantiates one provider adapter per backend.
        """
        # Singleton guard: __new__ always returns the same object, so skip
        # re-initialization on subsequent constructions.
        if self._initialized:
            return
        self.config_manager = config_manager
        self.knowledge_service = get_knowledge_service()  # Instantiate KnowledgeService internally
        self.master_model: Optional[ModelConfig] = None
        self.apprentice_model: Optional[ModelConfig] = None
        self.active_domain_id: Optional[str] = None  # Active domain managed by ConfigManager
        self.managed_engines: Dict[str, Dict[str, Any]] = {}  # Tracks all engines and their status
        # Dendritic spatial memory (.iath file) initialization.
        self.dendritic_memory: Optional[DendriticMemorySpace] = None
        self._load_dendritic_memory()
        # Automatic coordinate estimator for knowledge tiles.
        self.coordinate_estimator = CoordinateEstimator()
        # Writer used to append tiles to the .iath knowledge file.
        iath_file_path = os.getenv("IATH_DB_PATH", "knowledge_base.iath")
        self.iath_writer = IathWriter(iath_file_path)
        # One adapter instance per supported LLM provider backend.
        self.providers: Dict[ModelProvider, Any] = {
            ModelProvider.HUGGINGFACE: HuggingFaceProvider(),
            ModelProvider.OLLAMA: OllamaProvider(),
            ModelProvider.MLX: MLXProvider(),
            ModelProvider.GGUF: GGUFProvider(),
        }
        # NOTE: order matters — engines must be registered before the
        # persisted master/apprentice roles are restored onto them.
        self._load_all_engines()  # Load all configured models into managed_engines
        self._load_default_models()  # This will now also try to load active master/apprentice from config
        self._initialized = True
        logger.info("ModelRouter initialized.")
def _load_dendritic_memory(self):
"""樹木型空間記憶(.iathファイル)をロードする"""
try:
# TODO: ドメインごとに異なる.iathファイルを使用する
# 現在はデフォルトのパスを使用
import os
iath_file_path = os.getenv("IATH_DB_PATH", "knowledge_base.iath")
if os.path.exists(iath_file_path):
self.dendritic_memory = DendriticMemorySpace(iath_file_path)
stats = self.dendritic_memory.get_statistics()
logger.info(f"Dendritic memory loaded: {stats['total_tiles']} tiles from {iath_file_path}")
else:
logger.warning(f".iath file not found: {iath_file_path}. Starting with empty memory.")
self.dendritic_memory = None
except Exception as e:
logger.error(f"Failed to load dendritic memory: {e}")
self.dendritic_memory = None
def _get_any_available_model(self) -> Optional[ModelConfig]:
"""
利用可能なモデルを1つ取得する(座標推定などで使用)
優先順位:
1. 師匠モデル
2. 設定された最初のモデル
Returns:
ModelConfig or None
"""
if self.master_model:
return self.master_model
# 設定から最初のモデルを取得
for model_id, model_config in self.config_manager.models.items():
return model_config
return None
def _load_all_engines(self):
"""設定ファイルから全てのモデルをロードし、managed_enginesを初期化する"""
self.managed_engines = {}
for model_id, model_config in self.config_manager.models.items():
self.managed_engines[model_id] = {
"config": model_config.dict(),
"status": "available", # Default status
"unique_id": None # Only for apprentices
}
logger.info(f"Loaded {len(self.managed_engines)} engines into management.")
def _update_engine_status(self, model_id: str, status: str, unique_id: Optional[str] = None):
"""managed_engines内のエンジンのステータスを更新するヘルパー"""
if model_id in self.managed_engines:
self.managed_engines[model_id]["status"] = status
self.managed_engines[model_id]["unique_id"] = unique_id
else:
logger.warning(f"Attempted to update status for unknown engine: {model_id}")
    def _load_default_models(self):
        """Restore the default / persisted master and apprentice models from config."""
        self.active_domain_id = self.config_manager.get_active_domain_id()
        # Load the persisted active master/apprentice model ids from ConfigManager.
        persisted_master_id = self.config_manager.get_null_ai_setting("active_master_id")
        persisted_apprentice_id = self.config_manager.get_null_ai_setting("active_apprentice_id")
        # --- master model ---
        if persisted_master_id:
            master_config = self.config_manager.get_model_config(persisted_master_id)
            if master_config:
                self.master_model = master_config
                self._update_engine_status(self.master_model.model_id, "master")
                logger.info(f"Persisted master model loaded: {self.master_model.display_name}")
            else:
                logger.warning(f"Persisted master model '{persisted_master_id}' not found in configuration. Attempting to set default.")
                self._set_initial_master_from_config()  # persisted model gone — fall back to the default
        else:
            self._set_initial_master_from_config()  # no persisted master id — use the default
        # --- apprentice model ---
        if persisted_apprentice_id:
            apprentice_config = self.config_manager.get_model_config(persisted_apprentice_id)
            if apprentice_config:
                self.apprentice_model = apprentice_config
                # The apprentice's unique_id is also restored from config.
                apprentice_unique_id = self.config_manager.get_null_ai_setting(f"apprentice_unique_id_{persisted_apprentice_id}")
                self._update_engine_status(self.apprentice_model.model_id, "apprentice", apprentice_unique_id)
                logger.info(f"Persisted apprentice model loaded: {self.apprentice_model.display_name}")
            else:
                logger.warning(f"Persisted apprentice model '{persisted_apprentice_id}' not found in configuration. Clearing active apprentice.")
                self.apprentice_model = None
                self.config_manager.set_null_ai_setting("active_apprentice_id", None)
        else:
            self.apprentice_model = None
            self.config_manager.set_null_ai_setting("active_apprentice_id", None)
    def _set_initial_master_from_config(self):
        """Load the default master model from config (no or invalid persisted id)."""
        default_master_config = self.config_manager.get_default_model_config(domain_id=self.active_domain_id)
        if default_master_config:
            self.master_model = default_master_config
            self._update_engine_status(self.master_model.model_id, "master")
            self.config_manager.set_null_ai_setting("active_master_id", self.master_model.model_id)
            logger.info(f"Default master model loaded for domain '{self.active_domain_id}': {self.master_model.display_name}")
        else:
            # No default for this domain: clear both the in-memory role and
            # the persisted selection so they stay in sync.
            logger.warning(f"No default master model found for domain '{self.active_domain_id}' in configuration. Master model remains unset.")
            self.master_model = None
            self.config_manager.set_null_ai_setting("active_master_id", None)
def set_active_domain_id(self, domain_id: str):
"""アクティブなドメインIDを設定し、それに応じてモデルを再ロードする"""
if self.active_domain_id != domain_id:
self.active_domain_id = domain_id
self._load_default_models() # アクティブドメイン変更時は師匠・弟子モデルも再設定
logger.info(f"ModelRouter active domain set to {domain_id} and models reloaded.")
def set_master_model(self, model_id: str) -> bool:
"""師匠モデルを設定する"""
model = self.config_manager.get_model_config(model_id)
if model:
# 古い師匠を'retired'に戻す (ただし、それがまさに今昇格している弟子ではない場合)
if self.master_model and self.master_model.model_id != model_id:
# 昇格の場合、古い師匠はretiredにする
self._update_engine_status(self.master_model.model_id, "retired")
logger.info(f"Old master model '{self.master_model.display_name}' set to 'retired'.")
self.master_model = model
self._update_engine_status(model_id, "master")
self.config_manager.set_null_ai_setting("active_master_id", model_id)
logger.info(f"Master model set to: {model.display_name}")
return True
logger.error(f"Model with ID '{model_id}' not found for master setting.")
return False
def set_apprentice_model(self, model_id: Optional[str]) -> bool:
"""弟子モデルを設定する (Noneでクリア)"""
# 古い弟子を'available'に戻す
if self.apprentice_model:
self._update_engine_status(self.apprentice_model.model_id, "available")
if model_id is None or model_id == 'none':
self.apprentice_model = None
self.config_manager.set_null_ai_setting("active_apprentice_id", None)
logger.info("Apprentice model cleared.")
return True
model = self.config_manager.get_model_config(model_id)
if model:
self.apprentice_model = model
# For apprentices, we need to ensure unique_id is tracked if this is a named apprentice
apprentice_unique_id = self.managed_engines[model_id].get("unique_id") # Get existing unique_id
self._update_engine_status(model_id, "apprentice", apprentice_unique_id)
self.config_manager.set_null_ai_setting("active_apprentice_id", model_id)
logger.info(f"Apprentice model set to: {model.display_name}")
return True
logger.error(f"Model with ID '{model_id}' not found for apprentice setting.")
return False
def get_all_managed_engines(self) -> List[Dict[str, Any]]:
"""管理している全てのエンジンとそのステータス、ユニークIDを含むリストを返す"""
return list(self.managed_engines.values())
    def get_master_model(self) -> Optional[ModelConfig]:
        """Return the currently assigned master model, or None if unset."""
        return self.master_model
    def get_apprentice_model(self) -> Optional[ModelConfig]:
        """Return the currently assigned apprentice model, or None if unset."""
        return self.apprentice_model
def get_model_config(self, model_id: str) -> Optional[ModelConfig]:
"""指定されたmodel_idのモデル設定を取得する"""
return self.config_manager.models.get(model_id)
def swap_engines(self, apprentice_model_id: str) -> bool:
"""師匠と指定した弟子を入れ替える"""
if not self.master_model:
logger.error("Cannot swap: No master model is currently set.")
return False
apprentice_candidate = self.config_manager.get_model_config(apprentice_model_id)
if not apprentice_candidate:
logger.error(f"Cannot swap: Apprentice model '{apprentice_model_id}' not found.")
return False
# 現在の師匠と弟子の情報を取得
old_master_model = self.master_model
# 新しい師匠は指定された弟子
new_master_model = apprentice_candidate
# 新しい弟子は古い師匠 (ただし、古い師匠が現在のアプレンティス候補でなければ)
new_apprentice_model = old_master_model if old_master_model.model_id != apprentice_model_id else None
# ロールを入れ替える
self.master_model = new_master_model
self.apprentice_model = new_apprentice_model
# ステータスと永続化を更新
self._update_engine_status(new_master_model.model_id, "master", self.managed_engines[new_master_model.model_id].get("unique_id"))
self.config_manager.set_null_ai_setting("active_master_id", new_master_model.model_id)
if new_apprentice_model:
self._update_engine_status(new_apprentice_model.model_id, "apprentice", self.managed_engines[new_apprentice_model.model_id].get("unique_id"))
self.config_manager.set_null_ai_setting("active_apprentice_id", new_apprentice_model.model_id)
else:
self.config_manager.set_null_ai_setting("active_apprentice_id", None)
# 古いアプレンティスが指定されたアプレンティスとは異なる場合、そのステータスをavailableに戻す
if old_master_model and old_master_model.model_id != new_master_model.model_id:
self._update_engine_status(old_master_model.model_id, "available") # Old master becomes available if not the new apprentice
logger.info(f"Engines swapped: New Master is {new_master_model.display_name}, New Apprentice is {new_apprentice_model.display_name if new_apprentice_model else 'None'}")
return True
    def promote_apprentice(self, apprentice_model_id: str) -> bool:
        """Promote the given apprentice to master and retire the current master.

        Returns False when the apprentice id is unknown.
        """
        apprentice_to_promote = self.config_manager.get_model_config(apprentice_model_id)
        if not apprentice_to_promote:
            logger.error(f"Cannot promote: Apprentice model '{apprentice_model_id}' not found.")
            return False
        # Retire the current master, if any.
        if self.master_model:
            self._update_engine_status(self.master_model.model_id, "retired")
            logger.info(f"Current master model '{self.master_model.display_name}' retired.")
        # Promote the apprentice to master (carrying over its unique_id).
        self.master_model = apprentice_to_promote
        self._update_engine_status(apprentice_to_promote.model_id, "master", self.managed_engines[apprentice_to_promote.model_id].get("unique_id"))
        self.config_manager.set_null_ai_setting("active_master_id", apprentice_to_promote.model_id)
        # Clear the apprentice role if the promoted model held it.
        if self.apprentice_model and self.apprentice_model.model_id == apprentice_model_id:
            self.apprentice_model = None
            self.config_manager.set_null_ai_setting("active_apprentice_id", None)
            logger.info(f"Apprentice model '{apprentice_to_promote.display_name}' promoted to master and apprentice role cleared.")
        else:
            logger.warning(f"Apprentice '{apprentice_model_id}' was promoted, but was not the currently active apprentice.")
        logger.info(f"Apprentice '{apprentice_to_promote.display_name}' promoted to Master.")
        return True
    def create_new_apprentice(self) -> Optional[Dict[str, Any]]:
        """Create and register a new "blank apprentice" inference engine.

        Returns the managed-engine record of the new apprentice, or None
        when registration fails.
        """
        new_apprentice_unique_id = str(uuid.uuid4())
        new_apprentice_model_id = f"apprentice-{new_apprentice_unique_id[:8]}"
        new_apprentice_display_name = f"Apprentice ({new_apprentice_unique_id[:4]})"
        # The user specified "a model with literally no training data".
        # Either a generic base model is assumed, or a specific provider/model
        # name is designated. For now a lightweight Ollama model serves as the
        # template — alternatively only the config entry is created and the
        # actual model file is loaded later at training time.
        # NOTE: what this "empty" model concretely refers to must be pinned
        # down by the subsequent fine-tuning logic; here it is just added as
        # one configuration entry.
        # TODO: make the base "empty" model configurable (e.g. via null_ai_config.json).
        base_apprentice_provider = ModelProvider.OLLAMA  # provisional default provider
        base_apprentice_model_name = "mistral:latest"  # provisional default model name
        # Configuration for the new apprentice
        new_model_config_data = {
            "model_id": new_apprentice_model_id,
            "display_name": new_apprentice_display_name,
            "provider": base_apprentice_provider.value,
            "model_name": base_apprentice_model_name,
            "max_tokens": 4096,
            "temperature": 0.7,
            "timeout": 120,
            "is_default": False,
            "supported_domains": ["general"],  # New apprentices start with general knowledge
            "description": f"Newly generated empty apprentice model with unique ID {new_apprentice_unique_id}."
        }
        try:
            new_model = self.config_manager.add_model(new_model_config_data)
            if not new_model:
                raise Exception("Failed to add new model via config_manager.")
            self.managed_engines[new_model.model_id] = {
                "config": new_model.dict(),
                "status": "available",
                "unique_id": new_apprentice_unique_id
            }
            # Persist the unique_id as well, keyed by the model id.
            self.config_manager.set_null_ai_setting(f"apprentice_unique_id_{new_model.model_id}", new_apprentice_unique_id)
            logger.info(f"New empty apprentice '{new_apprentice_display_name}' created with ID: {new_apprentice_model_id}")
            return self.managed_engines[new_model.model_id]
        except Exception as e:
            logger.error(f"Failed to create new apprentice: {e}")
            return None
def get_active_model(self, for_inference: bool = True) -> Optional[ModelConfig]:
"""
推論に使用するアクティブなモデルを取得する。
for_inferenceがTrueの場合、師匠モデルを返す。
倒木システムにおける「学習」などの用途で弟子モデルが必要な場合は、
直接get_apprentice_modelを使用する。
"""
if for_inference:
return self.master_model
# 将来的に「成長した」弟子モデルへの自動切り替えロジックが入る可能性
return self.master_model # デフォルトでは師匠モデルを使用
    async def infer(self, prompt: str, domain_id: str, model_config: ModelConfig, temperature: Optional[float] = None, save_to_memory: bool = False, rag_mode: str = "rag") -> Dict[str, Any]:
        """
        Run a (non-streaming) inference with the given model.

        RAG: when the knowledge base holds relevant content it is injected
        into the prompt; otherwise the model answers from its internal
        weights and the result may be stored back into the DB
        (self-enrichment). rag_mode switches behavior: 'direct' returns DB
        content verbatim, 'rag' performs augmented generation.

        Returns a dict with keys: response, thinking, confidence,
        model_used, latency_ms, source_type.
        """
        # --- rag_mode == 'direct': answer straight from the knowledge base ---
        if rag_mode == "direct":
            relevant_knowledge = self._retrieve_relevant_knowledge(domain_id, prompt, top_k=1)
            if relevant_knowledge:
                # Return the best DB match directly (no LLM call).
                best_match = relevant_knowledge[0]
                logger.info(f"Direct mode: DB knowledge found. Returning direct answer from tile {best_match['id']}.")
                return {
                    "response": best_match['content'],
                    "thinking": f"Directly retrieved from knowledge base. Tile ID: {best_match['id']}.",
                    "confidence": best_match['confidence_score'],
                    "model_used": "database_direct",
                    "latency_ms": 50,  # nominal value — DB lookup is fast
                    "source_type": "db_direct"
                }
            else:
                # Nothing in the DB: report that the information was not found.
                logger.info("Direct mode: No DB knowledge found. Returning 'Not found'.")
                return {
                    "response": "ご指定の情報はナレッジベース内に見つかりませんでした。",
                    "thinking": "Directly searched knowledge base, but no relevant information was found.",
                    "confidence": 0.9,  # a "not found" answer is given high confidence
                    "model_used": "database_direct",
                    "latency_ms": 50,
                    "source_type": "db_direct"
                }
        # --- rag_mode == 'rag': existing augmented-generation logic ---
        has_db_knowledge = self._check_db_knowledge(domain_id, prompt)
        source_type = "db_augmented" if has_db_knowledge else "ai_internal_weights"
        thinking_process = ""
        augmented_prompt = prompt
        if has_db_knowledge:
            # Fetch related knowledge tiles from the DB.
            relevant_knowledge = self._retrieve_relevant_knowledge(domain_id, prompt, top_k=3)
            logger.info(f"RAG mode: DB knowledge found for domain '{domain_id}'. Retrieved {len(relevant_knowledge)} relevant tiles.")
            # RAG: merge the retrieved knowledge into the prompt.
            knowledge_context = "\n\n".join([
                f"[Knowledge {i+1} - {tile['verification_type']} verification, confidence: {tile['confidence_score']}]\n"
                f"Topic: {tile['topic']}\n"
                f"Content: {tile['content']}"
                for i, tile in enumerate(relevant_knowledge)
            ])
            augmented_prompt = f"""Based on the following verified knowledge from the database:
{knowledge_context}
Now, please answer the following question accurately:
{prompt}"""
            response = await self._perform_llm_inference(model_config, augmented_prompt, temperature or model_config.temperature)
            thinking_process = f"Accessed knowledge base. Retrieved {len(relevant_knowledge)} relevant knowledge tiles. " + response.get("thinking", "")
        else:
            # No DB knowledge: infer from the model's internal weights.
            logger.info(f"RAG mode: No DB knowledge found for domain '{domain_id}'. Using AI internal weights.")
            response = await self._perform_llm_inference(model_config, prompt, temperature or model_config.temperature)
            thinking_process = "No specific DB knowledge found. Inferred from AI internal weights. " + response.get("thinking", "")
        # Self-enrichment: store sufficiently confident results in the DB.
        if save_to_memory and response.get("confidence", 0) >= 0.7:
            await self._save_inference_to_db(
                domain_id=domain_id,
                prompt=prompt,
                response=response.get("response", ""),
                confidence=response.get("confidence", 0.7),
                source_type="ai",
                model_id=model_config.model_id
            )
        # Fallen-tree system: persist the master's output as training data.
        is_master = (self.master_model and model_config.model_id == self.master_model.model_id)
        logger.info(f"[Training Data Check] master_model={self.master_model.model_id if self.master_model else None}, "
                    f"current_model={model_config.model_id}, is_master={is_master}, "
                    f"confidence={response.get('confidence', 0)}, save_to_memory={save_to_memory}")
        if is_master and response.get("confidence", 0) >= 0.8:
            logger.info(f"[Training Data] Saving master output for domain '{domain_id}'...")
            await self._save_master_output_as_training_data(
                prompt=prompt,
                master_response=response.get("response", ""),
                domain_id=domain_id,
                confidence=response.get("confidence", 0.8)
            )
        else:
            logger.info(f"[Training Data] NOT saving: is_master={is_master}, confidence={response.get('confidence', 0)}")
        return {
            "response": response.get("response", ""),
            "thinking": thinking_process,
            "confidence": response.get("confidence", 0.7),
            "model_used": model_config.model_id,
            "latency_ms": response.get("latency_ms", 0),
            "source_type": source_type
        }
async def infer_streaming(self, prompt: str, domain_id: str, model_config: ModelConfig, temperature: Optional[float] = None, save_to_memory: bool = False, rag_mode: str = "rag") -> AsyncGenerator[Dict[str, Any], None]:
print(f"DEBUG: infer_streaming called - model={model_config.model_id if model_config else 'None'}")
logger.error(f"[CRITICAL DEBUG] infer_streaming ENTRY POINT - model={model_config.model_id if model_config else 'None'}, save_to_memory={save_to_memory}")
"""
指定されたモデルで推論をストリーミングで実行する。
RAG: DBに知識があればそれを使用、なければAI内部知識で推論してDBに蓄積。
rag_modeに応じて動作を切り替える。
"""
logger.info(f"[INFER_STREAMING] Called with model={model_config.model_id}, domain={domain_id}, save_to_memory={save_to_memory}")
# 師匠の応答を蓄積するための変数(finallyブロックで使用)
generated_response = ""
try:
# rag_mode == 'direct' の場合の処理
if rag_mode == "direct":
relevant_knowledge = self._retrieve_relevant_knowledge(domain_id, prompt, top_k=1)
if relevant_knowledge:
best_match = relevant_knowledge[0]
logger.info(f"Direct mode (streaming): DB knowledge found. Yielding direct answer from tile {best_match['id']}.")
yield {"type": "token", "content": best_match['content']}
yield {"type": "meta", "source_type": "db_direct", "thinking": f"Directly retrieved from knowledge base. Tile ID: {best_match['id']}.", "confidence": best_match['confidence_score'], "model_used": "database_direct"}
yield {"type": "complete", "content": best_match['content']}
return
else:
logger.info("Direct mode (streaming): No DB knowledge found. Yielding 'Not found'.")
response_text = "ご指定の情報はナレッジベース内に見つかりませんでした。"
yield {"type": "token", "content": response_text}
yield {"type": "meta", "source_type": "db_direct", "thinking": "Directly searched knowledge base, but no relevant information was found.", "confidence": 0.9, "model_used": "database_direct"}
yield {"type": "complete", "content": response_text}
return
# rag_mode == 'rag' の場合の処理 (既存のロジック)
has_db_knowledge = self._check_db_knowledge(domain_id, prompt)
source_type = "db_augmented" if has_db_knowledge else "ai_internal_weights"
augmented_prompt = prompt
yield {"type": "thinking", "content": f"Checking DB knowledge for domain '{domain_id}'..."}
await asyncio.sleep(0.1)
if has_db_knowledge:
# DBから関連知識を取得
relevant_knowledge = self._retrieve_relevant_knowledge(domain_id, prompt, top_k=3)
yield {"type": "thinking", "content": f"Relevant DB knowledge found. Retrieved {len(relevant_knowledge)} tiles. Augmenting prompt..."}
# RAG: プロンプトに知識を統合
knowledge_context = "\n\n".join([
f"[Knowledge {i+1} - {tile['verification_type']} verification, confidence: {tile['confidence_score']}]\n"
f"Topic: {tile['topic']}\n"
f"Content: {tile['content']}"
for i, tile in enumerate(relevant_knowledge)
])
augmented_prompt = f"""Based on the following verified knowledge from the database:
{knowledge_context}
Now, please answer the following question accurately:
{prompt}"""
# ストリーミング推論実行
async for chunk in self._perform_llm_streaming_inference(model_config, augmented_prompt, temperature or model_config.temperature):
if chunk.get("type") == "token":
generated_response += chunk.get("content", "")
yield chunk
yield {"type": "meta", "source_type": source_type, "thinking": f"Accessed knowledge base. Retrieved {len(relevant_knowledge)} relevant knowledge tiles."}
# 倒木システム: 師匠の出力を教師データとして保存 (DB知識使用時も保存)
logger.error(f"[CRITICAL DEBUG] Reached training data save section (with DB knowledge)! generated_response length={len(generated_response)}")
is_master = (self.master_model and model_config.model_id == self.master_model.model_id)
logger.info(f"[Training Data Check - DB Augmented] master_model={self.master_model.model_id if self.master_model else None}, "
f"current_model={model_config.model_id}, is_master={is_master}, "
f"response_length={len(generated_response)}, save_to_memory={save_to_memory}")
if is_master and len(generated_response) > 0:
logger.info(f"[Training Data - DB Augmented] Saving master output for domain '{domain_id}'...")
await self._save_master_output_as_training_data(
prompt=prompt,
master_response=generated_response,
domain_id=domain_id,
confidence=0.8
)
else:
logger.info(f"[Training Data - DB Augmented] NOT saving: is_master={is_master}, response_length={len(generated_response)}")
else:
yield {"type": "thinking", "content": "No specific DB knowledge found. Using AI internal weights."}
# ストリーミング推論実行
async for chunk in self._perform_llm_streaming_inference(model_config, prompt, temperature or model_config.temperature):
if chunk.get("type") == "token":
generated_response += chunk.get("content", "")
yield chunk
yield {"type": "meta", "source_type": source_type, "thinking": "Inferred from AI internal weights."}
# 倒木システム: 師匠の出力を教師データとして保存
logger.error(f"[CRITICAL DEBUG] Reached training data save section! generated_response length={len(generated_response)}")
is_master = (self.master_model and model_config.model_id == self.master_model.model_id)
logger.info(f"[Training Data Check - Streaming] master_model={self.master_model.model_id if self.master_model else None}, "
f"current_model={model_config.model_id}, is_master={is_master}, "
f"response_length={len(generated_response)}, save_to_memory={save_to_memory}")
if is_master and len(generated_response) > 0:
logger.info(f"[Training Data - Streaming] Saving master output for domain '{domain_id}'...")
await self._save_master_output_as_training_data(
prompt=prompt,
master_response=generated_response,
domain_id=domain_id,
confidence=0.8
)
else:
logger.info(f"[Training Data - Streaming] NOT saving: is_master={is_master}, response_length={len(generated_response)}")
# 自己拡充: 推論結果をDBに保存(信頼度が十分な場合)
if save_to_memory and len(generated_response) > 0:
await self._save_inference_to_db(
domain_id=domain_id,
prompt=prompt,
response=generated_response,
confidence=0.75, # ストリーミングなので固定値
source_type="ai",
model_id=model_config.model_id
)
except Exception as e:
logger.error(f"Error during streaming inference: {e}")
yield {"type": "error", "content": str(e)}
def _check_db_knowledge(self, domain_id: str, prompt: str) -> bool:
"""
指定されたドメインとプロンプトに関連する知識があるかチェックする。
樹木型空間記憶(.iath)を使用。
"""
try:
if self.dendritic_memory is None:
return False
# テキスト検索でマッチがあるかチェック
results = self.dendritic_memory.search_by_text(prompt[:200], top_k=1)
return len(results) > 0
except Exception as e:
logger.error(f"Error checking dendritic memory: {e}")
return False
    def _retrieve_relevant_knowledge(self, domain_id: str, prompt: str, top_k: int = 3) -> list:
        """
        Retrieve knowledge relevant to the given domain and prompt.

        1. Search the SQL database first.
        2. Fall back to the dendritic spatial memory (.iath) when SQL
           yields nothing or fails.

        Returns a list of dicts in a unified tile format (id, topic, content,
        thinking_process, confidence_score, verification_type, coordinates,
        text_match_score, spatial_distance); empty list when nothing matches.
        """
        results = []
        # 1. SQL database search.
        try:
            from backend.app.database.session import SessionLocal
            db = SessionLocal()
            try:
                # Keyword search for related knowledge tiles in the SQL database.
                tiles_orm, total_count = self.knowledge_service.list_tiles(
                    db=db, page=1, page_size=top_k,
                    domain_id=domain_id, search=prompt
                )
                if tiles_orm:
                    logger.info(f"Found {len(tiles_orm)} tiles from SQL database for query: '{prompt[:50]}...'")
                    # Convert SQL rows to the unified tile format.
                    for tile in tiles_orm:
                        results.append({
                            "id": tile.id,
                            "topic": tile.topic,
                            "content": tile.content,
                            "thinking_process": "",  # thinking_process is not stored separately in SQL
                            "confidence_score": tile.confidence_score if tile.confidence_score else 0.5,
                            "verification_type": tile.verification_type or "community",
                            "coordinates": tile.coordinates if tile.coordinates else [],
                            "text_match_score": 0.8,  # fixed score for SQL keyword matches
                            "spatial_distance": None
                        })
            finally:
                db.close()
            if results:
                return results[:top_k]
        except Exception as e:
            logger.warning(f"SQL database search failed: {e}. Falling back to .iath memory.")
        # 2. Fall back to the .iath memory when SQL found nothing.
        try:
            if self.dendritic_memory is None:
                logger.warning("No dendritic memory available and SQL search failed")
                return []
            # Hybrid search (text + spatial coordinates).
            iath_results = self.dendritic_memory.hybrid_search(prompt, query_coords=None, top_k=top_k)
            # Convert .iath tiles to the unified format.
            results = [
                {
                    "id": tile["metadata"]["knowledge_id"],
                    "topic": tile["metadata"]["topic"],
                    "content": tile["content"]["final_response"],
                    "thinking_process": tile["content"]["thinking_process"],
                    "confidence_score": tile["verification"]["initial_certainty"],
                    "verification_type": tile["verification"]["status"],
                    "coordinates": tile["coordinates"],
                    "text_match_score": tile.get("text_match_score", 0),
                    "spatial_distance": tile.get("spatial_distance", None)
                }
                for tile in iath_results
            ]
            if results:
                logger.info(f"Found {len(results)} tiles from .iath memory for query: '{prompt[:50]}...'")
            return results
        except Exception as e:
            logger.error(f"Error retrieving relevant knowledge from both SQL and .iath: {e}", exc_info=True)
            return []
async def _perform_llm_inference(self, model_config: ModelConfig, prompt: str, temperature: float) -> Dict[str, Any]:
"""
指定されたモデル設定でLLM推論を実行する。
各プロバイダーの実装を使用。
"""
provider_type = model_config.provider
if provider_type not in self.providers:
raise ValueError(f"Unsupported provider: {provider_type}")
provider = self.providers[provider_type]
try:
result = await provider.infer(model_config, prompt, temperature)
return result
except Exception as e:
logger.error(f"Error during LLM inference with provider '{provider_type}': {e}")
raise
async def _perform_llm_streaming_inference(self, model_config: ModelConfig, prompt: str, temperature: float) -> AsyncGenerator[Dict[str, Any], None]:
"""
指定されたモデル設定でLLMストリーミング推論を実行する。
各プロバイダーの実装を使用。
"""
provider_type = model_config.provider
if provider_type not in self.providers:
raise ValueError(f"Unsupported provider: {provider_type}")
provider = self.providers[provider_type]
try:
async for chunk in provider.infer_streaming(model_config, prompt, temperature):
yield chunk
except Exception as e:
logger.error(f"Error during LLM streaming inference with provider '{provider_type}': {e}")
yield {"type": "error", "content": str(e)}
async def _save_inference_to_db(self, domain_id: str, prompt: str, response: str, confidence: float, source_type: str, model_id: str) -> bool:
"""
推論結果をKnowledge TileとしてDBに保存する(自己拡充)。
Priority 2実装: 座標自動推定 + .iath保存
"""
try:
from backend.app.database.models import KnowledgeTile
from backend.app.database.session import SessionLocal # 遅延インポート
db = SessionLocal()
tile_id = f"ai_ktile_{uuid.uuid4().hex}"
# AI生成の知識タイルを作成(SQLite)
new_tile = KnowledgeTile(
id=tile_id,
workspace_id="default_workspace", # ローカル版なのでデフォルトワークスペース
domain_id=domain_id,
topic=prompt[:200], # プロンプトをトピックとして使用
content=response,
confidence_score=confidence,
verification_type="ai", # AI生成を示す
verification_count=1,
contributor_id=None, # AIなのでcontributorなし
last_verified_by_id=None
)
db.add(new_tile)
db.commit()
db.close()
logger.info(f"Saved AI inference to SQLite: {new_tile.id}")
# Priority 2: 座標自動推定 + .iath保存
try:
# 座標推定用のLLM推論関数を作成
async def llm_inference_for_coords(coord_prompt):
# 師匠モデル(またはDeepSeek)を使って座標推定
estimation_model = self.master_model if self.master_model else self._get_any_available_model()
if estimation_model:
result = await self._perform_llm_inference(
estimation_model,
coord_prompt,
temperature=0.3 # 低温度で一貫性を保つ
)
return result.get("response", "")
return "{\"coordinates\": [0.5, 0.5, 0.5, 0.5, 0.5, 0.5], \"confidence\": 0.3}"
# 座標を推定
coord_result = await self.coordinate_estimator.estimate_coordinates(
prompt=prompt,
response=response,
domain_id=domain_id,
llm_inference_func=llm_inference_for_coords,
use_reasoning=False # 高速化のため推論過程は省略
)
# .iath Tileオブジェクトを作成
iath_tile = create_tile_from_ai_output(
knowledge_id=tile_id,
topic=prompt[:100],
prompt=prompt,
response=response,
coordinates=coord_result["coordinates"],
confidence=confidence,
domain_id=domain_id,
source="ai_generated"
)
# .iathファイルに保存
success = self.iath_writer.append_tile(iath_tile)
if success:
logger.info(f"Saved AI inference to .iath with coordinates: {coord_result['coordinates']}")
# メモリをリロード(新しいタイルを検索可能にする)
self._load_dendritic_memory()
else:
logger.warning(f"Failed to save to .iath, but SQLite save succeeded")
except Exception as iath_error:
logger.error(f"Error saving to .iath (SQLite save succeeded): {iath_error}")
# .iath保存失敗してもSQLite保存は成功しているのでTrueを返す
return True
except Exception as e:
logger.error(f"Error saving inference to DB: {e}")
return False
async def _save_master_output_as_training_data(self, prompt: str, master_response: str, domain_id: str, confidence: float) -> bool:
"""
師匠の出力を弟子のファインチューニング用教師データとして保存する。
倒木システムの核心機能。
"""
try:
import json
import os
from datetime import datetime
logger.info(f"[Save Training Data] Starting to save master output (domain={domain_id}, confidence={confidence})")
# トレーニングデータ保存ディレクトリ
training_data_dir = "training_data/master_outputs"
logger.info(f"[Save Training Data] Creating directory: {training_data_dir}")
os.makedirs(training_data_dir, exist_ok=True)
# Alpaca形式で保存
training_example = {
"instruction": f"You are an expert in {domain_id}. Provide accurate information based on verified knowledge.",
"input": prompt,
"output": master_response,
"metadata": {
"domain_id": domain_id,
"confidence": confidence,
"master_model_id": self.master_model.model_id if self.master_model else "unknown",
"timestamp": datetime.utcnow().isoformat(),
"source": "master_output"
}
}
# JSONLファイルに追記
output_file = os.path.join(training_data_dir, f"master_outputs_{domain_id}.jsonl")
logger.info(f"[Save Training Data] Writing to file: {output_file}")
with open(output_file, 'a', encoding='utf-8') as f:
f.write(json.dumps(training_example, ensure_ascii=False) + '\n')
logger.info(f"✓ Successfully saved master output as training data: {output_file}")
return True
except Exception as e:
logger.error(f"✗ Error saving master output as training data: {e}", exc_info=True)
return False