Spaces:
Sleeping
Sleeping
| import logging | |
| from typing import Optional, Dict, Any, AsyncGenerator, List | |
| import asyncio | |
| import time | |
| import threading # For TextIteratorStreamer | |
| import uuid # For generating unique IDs | |
| import os | |
| from backend.app.config import ConfigManager, ModelConfig, ModelProvider | |
| # SessionLocal は循環インポートを避けるため遅延インポート | |
| from backend.app.services.knowledge_service import KnowledgeService, get_knowledge_service | |
| from null_ai.llm_providers import HuggingFaceProvider, OllamaProvider, MLXProvider, GGUFProvider # Import all providers | |
| from null_ai.iath_memory import DendriticMemorySpace # 樹木型空間記憶 | |
| from null_ai.coordinate_estimator import CoordinateEstimator # 座標自動推定 | |
| from null_ai.iath_writer import IathWriter, create_tile_from_ai_output # .iath保存 | |
| logger = logging.getLogger(__name__) | |
class ModelRouter:
    """
    Manages routing and switching of models between inference engines.

    Holds the state of the "master" (teacher) model and the "apprentice"
    (student) model. Implemented as a process-wide singleton (see
    __new__/__init__).
    """
    # Singleton instance; created lazily by __new__.
    _instance: Optional['ModelRouter'] = None
    # Guards __init__ so repeated instantiation does not re-run setup.
    _initialized = False
| def __new__(cls, config_manager: ConfigManager): | |
| if cls._instance == None: | |
| cls._instance = super(ModelRouter, cls).__new__(cls) | |
| return cls._instance | |
    def __init__(self, config_manager: ConfigManager):
        """One-time setup for the singleton (guarded by ``_initialized``).

        Wires up the knowledge service, dendritic spatial memory, coordinate
        estimator, .iath writer and the LLM providers, then loads the
        configured default models.
        """
        # __init__ runs on every ModelRouter(...) call; only set up once.
        if self._initialized:
            return
        self.config_manager = config_manager
        self.knowledge_service = get_knowledge_service()  # Instantiate KnowledgeService internally
        self.master_model: Optional[ModelConfig] = None
        self.apprentice_model: Optional[ModelConfig] = None
        self.active_domain_id: Optional[str] = None  # Active domain managed by ConfigManager
        self.managed_engines: Dict[str, Dict[str, Any]] = {}  # Tracks all engines and their status
        # Initialize the dendritic spatial memory (.iath file).
        self.dendritic_memory: Optional[DendriticMemorySpace] = None
        self._load_dendritic_memory()
        # Initialize the automatic coordinate estimator.
        self.coordinate_estimator = CoordinateEstimator()
        # Initialize the .iath writer.
        iath_file_path = os.getenv("IATH_DB_PATH", "knowledge_base.iath")
        self.iath_writer = IathWriter(iath_file_path)
        # Initialize the LLM providers.
        self.providers: Dict[ModelProvider, Any] = {
            ModelProvider.HUGGINGFACE: HuggingFaceProvider(),
            ModelProvider.OLLAMA: OllamaProvider(),
            ModelProvider.MLX: MLXProvider(),
            ModelProvider.GGUF: GGUFProvider(),
        }
        # Load the default models on initial startup.
        self._load_all_engines()  # Load all configured models into managed_engines
        self._load_default_models()  # This will now also try to load active master/apprentice from config
        self._initialized = True
        logger.info("ModelRouter initialized.")
| def _load_dendritic_memory(self): | |
| """樹木型空間記憶(.iathファイル)をロードする""" | |
| try: | |
| # TODO: ドメインごとに異なる.iathファイルを使用する | |
| # 現在はデフォルトのパスを使用 | |
| import os | |
| iath_file_path = os.getenv("IATH_DB_PATH", "knowledge_base.iath") | |
| if os.path.exists(iath_file_path): | |
| self.dendritic_memory = DendriticMemorySpace(iath_file_path) | |
| stats = self.dendritic_memory.get_statistics() | |
| logger.info(f"Dendritic memory loaded: {stats['total_tiles']} tiles from {iath_file_path}") | |
| else: | |
| logger.warning(f".iath file not found: {iath_file_path}. Starting with empty memory.") | |
| self.dendritic_memory = None | |
| except Exception as e: | |
| logger.error(f"Failed to load dendritic memory: {e}") | |
| self.dendritic_memory = None | |
| def _get_any_available_model(self) -> Optional[ModelConfig]: | |
| """ | |
| 利用可能なモデルを1つ取得する(座標推定などで使用) | |
| 優先順位: | |
| 1. 師匠モデル | |
| 2. 設定された最初のモデル | |
| Returns: | |
| ModelConfig or None | |
| """ | |
| if self.master_model: | |
| return self.master_model | |
| # 設定から最初のモデルを取得 | |
| for model_id, model_config in self.config_manager.models.items(): | |
| return model_config | |
| return None | |
| def _load_all_engines(self): | |
| """設定ファイルから全てのモデルをロードし、managed_enginesを初期化する""" | |
| self.managed_engines = {} | |
| for model_id, model_config in self.config_manager.models.items(): | |
| self.managed_engines[model_id] = { | |
| "config": model_config.dict(), | |
| "status": "available", # Default status | |
| "unique_id": None # Only for apprentices | |
| } | |
| logger.info(f"Loaded {len(self.managed_engines)} engines into management.") | |
| def _update_engine_status(self, model_id: str, status: str, unique_id: Optional[str] = None): | |
| """managed_engines内のエンジンのステータスを更新するヘルパー""" | |
| if model_id in self.managed_engines: | |
| self.managed_engines[model_id]["status"] = status | |
| self.managed_engines[model_id]["unique_id"] = unique_id | |
| else: | |
| logger.warning(f"Attempted to update status for unknown engine: {model_id}") | |
    def _load_default_models(self):
        """Load the default master and apprentice models from configuration.

        Persisted active-model ids (stored through ConfigManager) take
        precedence; when they are missing or no longer exist in the
        configuration, the master falls back to the configured default and
        the apprentice is cleared.
        """
        self.active_domain_id = self.config_manager.get_active_domain_id()
        # Load the persisted active master/apprentice model ids from ConfigManager.
        persisted_master_id = self.config_manager.get_null_ai_setting("active_master_id")
        persisted_apprentice_id = self.config_manager.get_null_ai_setting("active_apprentice_id")
        # Master model setup.
        if persisted_master_id:
            master_config = self.config_manager.get_model_config(persisted_master_id)
            if master_config:
                self.master_model = master_config
                self._update_engine_status(self.master_model.model_id, "master")
                logger.info(f"Persisted master model loaded: {self.master_model.display_name}")
            else:
                logger.warning(f"Persisted master model '{persisted_master_id}' not found in configuration. Attempting to set default.")
                self._set_initial_master_from_config()  # persisted model is stale: fall back to the default
        else:
            self._set_initial_master_from_config()  # no persisted master id: use the default
        # Apprentice model setup.
        if persisted_apprentice_id:
            apprentice_config = self.config_manager.get_model_config(persisted_apprentice_id)
            if apprentice_config:
                self.apprentice_model = apprentice_config
                # The apprentice's unique_id is also restored from the config.
                apprentice_unique_id = self.config_manager.get_null_ai_setting(f"apprentice_unique_id_{persisted_apprentice_id}")
                self._update_engine_status(self.apprentice_model.model_id, "apprentice", apprentice_unique_id)
                logger.info(f"Persisted apprentice model loaded: {self.apprentice_model.display_name}")
            else:
                logger.warning(f"Persisted apprentice model '{persisted_apprentice_id}' not found in configuration. Clearing active apprentice.")
                self.apprentice_model = None
                self.config_manager.set_null_ai_setting("active_apprentice_id", None)
        else:
            self.apprentice_model = None
            self.config_manager.set_null_ai_setting("active_apprentice_id", None)
| def _set_initial_master_from_config(self): | |
| """設定からデフォルトの師匠モデルをロード(persistedがない場合や見つからない場合)""" | |
| default_master_config = self.config_manager.get_default_model_config(domain_id=self.active_domain_id) | |
| if default_master_config: | |
| self.master_model = default_master_config | |
| self._update_engine_status(self.master_model.model_id, "master") | |
| self.config_manager.set_null_ai_setting("active_master_id", self.master_model.model_id) | |
| logger.info(f"Default master model loaded for domain '{self.active_domain_id}': {self.master_model.display_name}") | |
| else: | |
| logger.warning(f"No default master model found for domain '{self.active_domain_id}' in configuration. Master model remains unset.") | |
| self.master_model = None | |
| self.config_manager.set_null_ai_setting("active_master_id", None) | |
| def set_active_domain_id(self, domain_id: str): | |
| """アクティブなドメインIDを設定し、それに応じてモデルを再ロードする""" | |
| if self.active_domain_id != domain_id: | |
| self.active_domain_id = domain_id | |
| self._load_default_models() # アクティブドメイン変更時は師匠・弟子モデルも再設定 | |
| logger.info(f"ModelRouter active domain set to {domain_id} and models reloaded.") | |
| def set_master_model(self, model_id: str) -> bool: | |
| """師匠モデルを設定する""" | |
| model = self.config_manager.get_model_config(model_id) | |
| if model: | |
| # 古い師匠を'retired'に戻す (ただし、それがまさに今昇格している弟子ではない場合) | |
| if self.master_model and self.master_model.model_id != model_id: | |
| # 昇格の場合、古い師匠はretiredにする | |
| self._update_engine_status(self.master_model.model_id, "retired") | |
| logger.info(f"Old master model '{self.master_model.display_name}' set to 'retired'.") | |
| self.master_model = model | |
| self._update_engine_status(model_id, "master") | |
| self.config_manager.set_null_ai_setting("active_master_id", model_id) | |
| logger.info(f"Master model set to: {model.display_name}") | |
| return True | |
| logger.error(f"Model with ID '{model_id}' not found for master setting.") | |
| return False | |
| def set_apprentice_model(self, model_id: Optional[str]) -> bool: | |
| """弟子モデルを設定する (Noneでクリア)""" | |
| # 古い弟子を'available'に戻す | |
| if self.apprentice_model: | |
| self._update_engine_status(self.apprentice_model.model_id, "available") | |
| if model_id is None or model_id == 'none': | |
| self.apprentice_model = None | |
| self.config_manager.set_null_ai_setting("active_apprentice_id", None) | |
| logger.info("Apprentice model cleared.") | |
| return True | |
| model = self.config_manager.get_model_config(model_id) | |
| if model: | |
| self.apprentice_model = model | |
| # For apprentices, we need to ensure unique_id is tracked if this is a named apprentice | |
| apprentice_unique_id = self.managed_engines[model_id].get("unique_id") # Get existing unique_id | |
| self._update_engine_status(model_id, "apprentice", apprentice_unique_id) | |
| self.config_manager.set_null_ai_setting("active_apprentice_id", model_id) | |
| logger.info(f"Apprentice model set to: {model.display_name}") | |
| return True | |
| logger.error(f"Model with ID '{model_id}' not found for apprentice setting.") | |
| return False | |
| def get_all_managed_engines(self) -> List[Dict[str, Any]]: | |
| """管理している全てのエンジンとそのステータス、ユニークIDを含むリストを返す""" | |
| return list(self.managed_engines.values()) | |
| def get_master_model(self) -> Optional[ModelConfig]: | |
| """現在の師匠モデルを取得する""" | |
| return self.master_model | |
| def get_apprentice_model(self) -> Optional[ModelConfig]: | |
| """現在の弟子モデルを取得する""" | |
| return self.apprentice_model | |
| def get_model_config(self, model_id: str) -> Optional[ModelConfig]: | |
| """指定されたmodel_idのモデル設定を取得する""" | |
| return self.config_manager.models.get(model_id) | |
| def swap_engines(self, apprentice_model_id: str) -> bool: | |
| """師匠と指定した弟子を入れ替える""" | |
| if not self.master_model: | |
| logger.error("Cannot swap: No master model is currently set.") | |
| return False | |
| apprentice_candidate = self.config_manager.get_model_config(apprentice_model_id) | |
| if not apprentice_candidate: | |
| logger.error(f"Cannot swap: Apprentice model '{apprentice_model_id}' not found.") | |
| return False | |
| # 現在の師匠と弟子の情報を取得 | |
| old_master_model = self.master_model | |
| # 新しい師匠は指定された弟子 | |
| new_master_model = apprentice_candidate | |
| # 新しい弟子は古い師匠 (ただし、古い師匠が現在のアプレンティス候補でなければ) | |
| new_apprentice_model = old_master_model if old_master_model.model_id != apprentice_model_id else None | |
| # ロールを入れ替える | |
| self.master_model = new_master_model | |
| self.apprentice_model = new_apprentice_model | |
| # ステータスと永続化を更新 | |
| self._update_engine_status(new_master_model.model_id, "master", self.managed_engines[new_master_model.model_id].get("unique_id")) | |
| self.config_manager.set_null_ai_setting("active_master_id", new_master_model.model_id) | |
| if new_apprentice_model: | |
| self._update_engine_status(new_apprentice_model.model_id, "apprentice", self.managed_engines[new_apprentice_model.model_id].get("unique_id")) | |
| self.config_manager.set_null_ai_setting("active_apprentice_id", new_apprentice_model.model_id) | |
| else: | |
| self.config_manager.set_null_ai_setting("active_apprentice_id", None) | |
| # 古いアプレンティスが指定されたアプレンティスとは異なる場合、そのステータスをavailableに戻す | |
| if old_master_model and old_master_model.model_id != new_master_model.model_id: | |
| self._update_engine_status(old_master_model.model_id, "available") # Old master becomes available if not the new apprentice | |
| logger.info(f"Engines swapped: New Master is {new_master_model.display_name}, New Apprentice is {new_apprentice_model.display_name if new_apprentice_model else 'None'}") | |
| return True | |
| def promote_apprentice(self, apprentice_model_id: str) -> bool: | |
| """指定した弟子を師匠に昇格させ、現在の師匠を引退させる""" | |
| apprentice_to_promote = self.config_manager.get_model_config(apprentice_model_id) | |
| if not apprentice_to_promote: | |
| logger.error(f"Cannot promote: Apprentice model '{apprentice_model_id}' not found.") | |
| return False | |
| # 現在の師匠を引退させる | |
| if self.master_model: | |
| self._update_engine_status(self.master_model.model_id, "retired") | |
| logger.info(f"Current master model '{self.master_model.display_name}' retired.") | |
| # 弟子を師匠に昇格させる | |
| self.master_model = apprentice_to_promote | |
| self._update_engine_status(apprentice_to_promote.model_id, "master", self.managed_engines[apprentice_to_promote.model_id].get("unique_id")) | |
| self.config_manager.set_null_ai_setting("active_master_id", apprentice_to_promote.model_id) | |
| # 弟子モデルのロールをクリア (昇格したため) | |
| if self.apprentice_model and self.apprentice_model.model_id == apprentice_model_id: | |
| self.apprentice_model = None | |
| self.config_manager.set_null_ai_setting("active_apprentice_id", None) | |
| logger.info(f"Apprentice model '{apprentice_to_promote.display_name}' promoted to master and apprentice role cleared.") | |
| else: | |
| logger.warning(f"Apprentice '{apprentice_model_id}' was promoted, but was not the currently active apprentice.") | |
| logger.info(f"Apprentice '{apprentice_to_promote.display_name}' promoted to Master.") | |
| return True | |
| def create_new_apprentice(self) -> Optional[Dict[str, Any]]: | |
| """新しい「空っぽの弟子」推論エンジンを生成し、登録する""" | |
| new_apprentice_unique_id = str(uuid.uuid4()) | |
| new_apprentice_model_id = f"apprentice-{new_apprentice_unique_id[:8]}" | |
| new_apprentice_display_name = f"Apprentice ({new_apprentice_unique_id[:4]})" | |
| # ユーザーは「文字通り学習データがないもの」と指定。 | |
| # ここでは、汎用的なベースモデルを想定するか、あるいは特定のプロバイダー/モデル名を指定する。 | |
| # 仮に、Ollamaの何らかの軽量モデルをテンプレートとして利用する。 | |
| # または、単にモデル設定のみを生成し、実際のモデルファイルは後で学習時にロードする。 | |
| # NOTE: この "empty" モデルが具体的に何を指すかは、 | |
| # 後続のファインチューニングのロジックによって具体化される必要がある。 | |
| # ここでは、設定上の一エントリとして追加する。 | |
| # TODO: ベースとなる「空っぽ」モデルの設定をnull_ai_config.jsonなどから取得できるようにする | |
| base_apprentice_provider = ModelProvider.OLLAMA # 仮のデフォルトプロバイダー | |
| base_apprentice_model_name = "mistral:latest" # 仮のデフォルトモデル名 | |
| # Configuration for the new apprentice | |
| new_model_config_data = { | |
| "model_id": new_apprentice_model_id, | |
| "display_name": new_apprentice_display_name, | |
| "provider": base_apprentice_provider.value, | |
| "model_name": base_apprentice_model_name, | |
| "max_tokens": 4096, | |
| "temperature": 0.7, | |
| "timeout": 120, | |
| "is_default": False, | |
| "supported_domains": ["general"], # New apprentices start with general knowledge | |
| "description": f"Newly generated empty apprentice model with unique ID {new_apprentice_unique_id}." | |
| } | |
| try: | |
| new_model = self.config_manager.add_model(new_model_config_data) | |
| if not new_model: | |
| raise Exception("Failed to add new model via config_manager.") | |
| self.managed_engines[new_model.model_id] = { | |
| "config": new_model.dict(), | |
| "status": "available", | |
| "unique_id": new_apprentice_unique_id | |
| } | |
| # unique_idも永続化しておく | |
| self.config_manager.set_null_ai_setting(f"apprentice_unique_id_{new_model.model_id}", new_apprentice_unique_id) | |
| logger.info(f"New empty apprentice '{new_apprentice_display_name}' created with ID: {new_apprentice_model_id}") | |
| return self.managed_engines[new_model.model_id] | |
| except Exception as e: | |
| logger.error(f"Failed to create new apprentice: {e}") | |
| return None | |
| def get_active_model(self, for_inference: bool = True) -> Optional[ModelConfig]: | |
| """ | |
| 推論に使用するアクティブなモデルを取得する。 | |
| for_inferenceがTrueの場合、師匠モデルを返す。 | |
| 倒木システムにおける「学習」などの用途で弟子モデルが必要な場合は、 | |
| 直接get_apprentice_modelを使用する。 | |
| """ | |
| if for_inference: | |
| return self.master_model | |
| # 将来的に「成長した」弟子モデルへの自動切り替えロジックが入る可能性 | |
| return self.master_model # デフォルトでは師匠モデルを使用 | |
    async def infer(self, prompt: str, domain_id: str, model_config: ModelConfig, temperature: Optional[float] = None, save_to_memory: bool = False, rag_mode: str = "rag") -> Dict[str, Any]:
        """
        Run inference with the specified model.
        RAG: use DB knowledge when available; otherwise infer from the AI's
        internal weights and accumulate the result into the DB.
        Behaviour switches on ``rag_mode`` ("direct" answers straight from the
        knowledge base; "rag" augments the prompt with retrieved knowledge).
        """
        # Handling for rag_mode == 'direct'
        if rag_mode == "direct":
            relevant_knowledge = self._retrieve_relevant_knowledge(domain_id, prompt, top_k=1)
            if relevant_knowledge:
                # Return the answer straight from the DB.
                best_match = relevant_knowledge[0]
                logger.info(f"Direct mode: DB knowledge found. Returning direct answer from tile {best_match['id']}.")
                return {
                    "response": best_match['content'],
                    "thinking": f"Directly retrieved from knowledge base. Tile ID: {best_match['id']}.",
                    "confidence": best_match['confidence_score'],
                    "model_used": "database_direct",
                    "latency_ms": 50,  # DB lookup only, so fast
                    "source_type": "db_direct"
                }
            else:
                # Nothing in the DB: answer "not found".
                logger.info("Direct mode: No DB knowledge found. Returning 'Not found'.")
                return {
                    "response": "ご指定の情報はナレッジベース内に見つかりませんでした。",
                    "thinking": "Directly searched knowledge base, but no relevant information was found.",
                    "confidence": 0.9,  # a "not found" answer is given high confidence
                    "model_used": "database_direct",
                    "latency_ms": 50,
                    "source_type": "db_direct"
                }
        # Handling for rag_mode == 'rag' (existing logic)
        has_db_knowledge = self._check_db_knowledge(domain_id, prompt)
        source_type = "db_augmented" if has_db_knowledge else "ai_internal_weights"
        thinking_process = ""
        augmented_prompt = prompt
        if has_db_knowledge:
            # Fetch the related knowledge from the DB.
            relevant_knowledge = self._retrieve_relevant_knowledge(domain_id, prompt, top_k=3)
            logger.info(f"RAG mode: DB knowledge found for domain '{domain_id}'. Retrieved {len(relevant_knowledge)} relevant tiles.")
            # RAG: merge the retrieved knowledge into the prompt.
            knowledge_context = "\n\n".join([
                f"[Knowledge {i+1} - {tile['verification_type']} verification, confidence: {tile['confidence_score']}]\n"
                f"Topic: {tile['topic']}\n"
                f"Content: {tile['content']}"
                for i, tile in enumerate(relevant_knowledge)
            ])
            augmented_prompt = f"""Based on the following verified knowledge from the database:
{knowledge_context}
Now, please answer the following question accurately:
{prompt}"""
            response = await self._perform_llm_inference(model_config, augmented_prompt, temperature or model_config.temperature)
            thinking_process = f"Accessed knowledge base. Retrieved {len(relevant_knowledge)} relevant knowledge tiles. " + response.get("thinking", "")
        else:
            # No DB knowledge: infer from the AI's internal weights.
            logger.info(f"RAG mode: No DB knowledge found for domain '{domain_id}'. Using AI internal weights.")
            response = await self._perform_llm_inference(model_config, prompt, temperature or model_config.temperature)
            thinking_process = "No specific DB knowledge found. Inferred from AI internal weights. " + response.get("thinking", "")
            # Self-enrichment: persist the inference result to the DB.
            # NOTE(review): the source's indentation was mangled — this block is
            # assumed to apply only to the internal-weights branch (matching
            # the self-enrichment intent); confirm against version control.
            if save_to_memory and response.get("confidence", 0) >= 0.7:
                await self._save_inference_to_db(
                    domain_id=domain_id,
                    prompt=prompt,
                    response=response.get("response", ""),
                    confidence=response.get("confidence", 0.7),
                    source_type="ai",
                    model_id=model_config.model_id
                )
        # Fallen-tree system: save the master's output as training data.
        is_master = (self.master_model and model_config.model_id == self.master_model.model_id)
        logger.info(f"[Training Data Check] master_model={self.master_model.model_id if self.master_model else None}, "
                    f"current_model={model_config.model_id}, is_master={is_master}, "
                    f"confidence={response.get('confidence', 0)}, save_to_memory={save_to_memory}")
        if is_master and response.get("confidence", 0) >= 0.8:
            logger.info(f"[Training Data] Saving master output for domain '{domain_id}'...")
            await self._save_master_output_as_training_data(
                prompt=prompt,
                master_response=response.get("response", ""),
                domain_id=domain_id,
                confidence=response.get("confidence", 0.8)
            )
        else:
            logger.info(f"[Training Data] NOT saving: is_master={is_master}, confidence={response.get('confidence', 0)}")
        return {
            "response": response.get("response", ""),
            "thinking": thinking_process,
            "confidence": response.get("confidence", 0.7),
            "model_used": model_config.model_id,
            "latency_ms": response.get("latency_ms", 0),
            "source_type": source_type
        }
| async def infer_streaming(self, prompt: str, domain_id: str, model_config: ModelConfig, temperature: Optional[float] = None, save_to_memory: bool = False, rag_mode: str = "rag") -> AsyncGenerator[Dict[str, Any], None]: | |
| print(f"DEBUG: infer_streaming called - model={model_config.model_id if model_config else 'None'}") | |
| logger.error(f"[CRITICAL DEBUG] infer_streaming ENTRY POINT - model={model_config.model_id if model_config else 'None'}, save_to_memory={save_to_memory}") | |
| """ | |
| 指定されたモデルで推論をストリーミングで実行する。 | |
| RAG: DBに知識があればそれを使用、なければAI内部知識で推論してDBに蓄積。 | |
| rag_modeに応じて動作を切り替える。 | |
| """ | |
| logger.info(f"[INFER_STREAMING] Called with model={model_config.model_id}, domain={domain_id}, save_to_memory={save_to_memory}") | |
| # 師匠の応答を蓄積するための変数(finallyブロックで使用) | |
| generated_response = "" | |
| try: | |
| # rag_mode == 'direct' の場合の処理 | |
| if rag_mode == "direct": | |
| relevant_knowledge = self._retrieve_relevant_knowledge(domain_id, prompt, top_k=1) | |
| if relevant_knowledge: | |
| best_match = relevant_knowledge[0] | |
| logger.info(f"Direct mode (streaming): DB knowledge found. Yielding direct answer from tile {best_match['id']}.") | |
| yield {"type": "token", "content": best_match['content']} | |
| yield {"type": "meta", "source_type": "db_direct", "thinking": f"Directly retrieved from knowledge base. Tile ID: {best_match['id']}.", "confidence": best_match['confidence_score'], "model_used": "database_direct"} | |
| yield {"type": "complete", "content": best_match['content']} | |
| return | |
| else: | |
| logger.info("Direct mode (streaming): No DB knowledge found. Yielding 'Not found'.") | |
| response_text = "ご指定の情報はナレッジベース内に見つかりませんでした。" | |
| yield {"type": "token", "content": response_text} | |
| yield {"type": "meta", "source_type": "db_direct", "thinking": "Directly searched knowledge base, but no relevant information was found.", "confidence": 0.9, "model_used": "database_direct"} | |
| yield {"type": "complete", "content": response_text} | |
| return | |
| # rag_mode == 'rag' の場合の処理 (既存のロジック) | |
| has_db_knowledge = self._check_db_knowledge(domain_id, prompt) | |
| source_type = "db_augmented" if has_db_knowledge else "ai_internal_weights" | |
| augmented_prompt = prompt | |
| yield {"type": "thinking", "content": f"Checking DB knowledge for domain '{domain_id}'..."} | |
| await asyncio.sleep(0.1) | |
| if has_db_knowledge: | |
| # DBから関連知識を取得 | |
| relevant_knowledge = self._retrieve_relevant_knowledge(domain_id, prompt, top_k=3) | |
| yield {"type": "thinking", "content": f"Relevant DB knowledge found. Retrieved {len(relevant_knowledge)} tiles. Augmenting prompt..."} | |
| # RAG: プロンプトに知識を統合 | |
| knowledge_context = "\n\n".join([ | |
| f"[Knowledge {i+1} - {tile['verification_type']} verification, confidence: {tile['confidence_score']}]\n" | |
| f"Topic: {tile['topic']}\n" | |
| f"Content: {tile['content']}" | |
| for i, tile in enumerate(relevant_knowledge) | |
| ]) | |
| augmented_prompt = f"""Based on the following verified knowledge from the database: | |
| {knowledge_context} | |
| Now, please answer the following question accurately: | |
| {prompt}""" | |
| # ストリーミング推論実行 | |
| async for chunk in self._perform_llm_streaming_inference(model_config, augmented_prompt, temperature or model_config.temperature): | |
| if chunk.get("type") == "token": | |
| generated_response += chunk.get("content", "") | |
| yield chunk | |
| yield {"type": "meta", "source_type": source_type, "thinking": f"Accessed knowledge base. Retrieved {len(relevant_knowledge)} relevant knowledge tiles."} | |
| # 倒木システム: 師匠の出力を教師データとして保存 (DB知識使用時も保存) | |
| logger.error(f"[CRITICAL DEBUG] Reached training data save section (with DB knowledge)! generated_response length={len(generated_response)}") | |
| is_master = (self.master_model and model_config.model_id == self.master_model.model_id) | |
| logger.info(f"[Training Data Check - DB Augmented] master_model={self.master_model.model_id if self.master_model else None}, " | |
| f"current_model={model_config.model_id}, is_master={is_master}, " | |
| f"response_length={len(generated_response)}, save_to_memory={save_to_memory}") | |
| if is_master and len(generated_response) > 0: | |
| logger.info(f"[Training Data - DB Augmented] Saving master output for domain '{domain_id}'...") | |
| await self._save_master_output_as_training_data( | |
| prompt=prompt, | |
| master_response=generated_response, | |
| domain_id=domain_id, | |
| confidence=0.8 | |
| ) | |
| else: | |
| logger.info(f"[Training Data - DB Augmented] NOT saving: is_master={is_master}, response_length={len(generated_response)}") | |
| else: | |
| yield {"type": "thinking", "content": "No specific DB knowledge found. Using AI internal weights."} | |
| # ストリーミング推論実行 | |
| async for chunk in self._perform_llm_streaming_inference(model_config, prompt, temperature or model_config.temperature): | |
| if chunk.get("type") == "token": | |
| generated_response += chunk.get("content", "") | |
| yield chunk | |
| yield {"type": "meta", "source_type": source_type, "thinking": "Inferred from AI internal weights."} | |
| # 倒木システム: 師匠の出力を教師データとして保存 | |
| logger.error(f"[CRITICAL DEBUG] Reached training data save section! generated_response length={len(generated_response)}") | |
| is_master = (self.master_model and model_config.model_id == self.master_model.model_id) | |
| logger.info(f"[Training Data Check - Streaming] master_model={self.master_model.model_id if self.master_model else None}, " | |
| f"current_model={model_config.model_id}, is_master={is_master}, " | |
| f"response_length={len(generated_response)}, save_to_memory={save_to_memory}") | |
| if is_master and len(generated_response) > 0: | |
| logger.info(f"[Training Data - Streaming] Saving master output for domain '{domain_id}'...") | |
| await self._save_master_output_as_training_data( | |
| prompt=prompt, | |
| master_response=generated_response, | |
| domain_id=domain_id, | |
| confidence=0.8 | |
| ) | |
| else: | |
| logger.info(f"[Training Data - Streaming] NOT saving: is_master={is_master}, response_length={len(generated_response)}") | |
| # 自己拡充: 推論結果をDBに保存(信頼度が十分な場合) | |
| if save_to_memory and len(generated_response) > 0: | |
| await self._save_inference_to_db( | |
| domain_id=domain_id, | |
| prompt=prompt, | |
| response=generated_response, | |
| confidence=0.75, # ストリーミングなので固定値 | |
| source_type="ai", | |
| model_id=model_config.model_id | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error during streaming inference: {e}") | |
| yield {"type": "error", "content": str(e)} | |
| def _check_db_knowledge(self, domain_id: str, prompt: str) -> bool: | |
| """ | |
| 指定されたドメインとプロンプトに関連する知識があるかチェックする。 | |
| 樹木型空間記憶(.iath)を使用。 | |
| """ | |
| try: | |
| if self.dendritic_memory is None: | |
| return False | |
| # テキスト検索でマッチがあるかチェック | |
| results = self.dendritic_memory.search_by_text(prompt[:200], top_k=1) | |
| return len(results) > 0 | |
| except Exception as e: | |
| logger.error(f"Error checking dendritic memory: {e}") | |
| return False | |
    def _retrieve_relevant_knowledge(self, domain_id: str, prompt: str, top_k: int = 3) -> list:
        """
        Retrieve knowledge relevant to the given domain and prompt.
        1. Search the SQL database first.
        2. Fall back to the dendritic spatial memory (.iath) when nothing
           is found (or the SQL search fails).
        Returns a list of tiles in a unified dict format (possibly empty).
        """
        results = []
        # 1. Search the SQL database.
        try:
            from backend.app.database.session import SessionLocal
            db = SessionLocal()
            try:
                # Keyword search over the knowledge tiles in the SQL database.
                tiles_orm, total_count = self.knowledge_service.list_tiles(
                    db=db, page=1, page_size=top_k,
                    domain_id=domain_id, search=prompt
                )
                if tiles_orm:
                    logger.info(f"Found {len(tiles_orm)} tiles from SQL database for query: '{prompt[:50]}...'")
                    # Convert the SQL results into the unified format.
                    for tile in tiles_orm:
                        results.append({
                            "id": tile.id,
                            "topic": tile.topic,
                            "content": tile.content,
                            "thinking_process": "",  # thinking_process is not stored separately in SQL
                            "confidence_score": tile.confidence_score if tile.confidence_score else 0.5,
                            "verification_type": tile.verification_type or "community",
                            "coordinates": tile.coordinates if tile.coordinates else [],
                            "text_match_score": 0.8,  # fixed value for SQL keyword search
                            "spatial_distance": None
                        })
            finally:
                db.close()
            if results:
                return results[:top_k]
        except Exception as e:
            logger.warning(f"SQL database search failed: {e}. Falling back to .iath memory.")
        # 2. Nothing found in the SQL database: search the .iath memory.
        try:
            if self.dendritic_memory is None:
                logger.warning("No dendritic memory available and SQL search failed")
                return []
            # Hybrid search (text + spatial coordinates).
            iath_results = self.dendritic_memory.hybrid_search(prompt, query_coords=None, top_k=top_k)
            # Convert to the unified format.
            results = [
                {
                    "id": tile["metadata"]["knowledge_id"],
                    "topic": tile["metadata"]["topic"],
                    "content": tile["content"]["final_response"],
                    "thinking_process": tile["content"]["thinking_process"],
                    "confidence_score": tile["verification"]["initial_certainty"],
                    "verification_type": tile["verification"]["status"],
                    "coordinates": tile["coordinates"],
                    "text_match_score": tile.get("text_match_score", 0),
                    "spatial_distance": tile.get("spatial_distance", None)
                }
                for tile in iath_results
            ]
            if results:
                logger.info(f"Found {len(results)} tiles from .iath memory for query: '{prompt[:50]}...'")
            return results
        except Exception as e:
            logger.error(f"Error retrieving relevant knowledge from both SQL and .iath: {e}", exc_info=True)
            return []
| async def _perform_llm_inference(self, model_config: ModelConfig, prompt: str, temperature: float) -> Dict[str, Any]: | |
| """ | |
| 指定されたモデル設定でLLM推論を実行する。 | |
| 各プロバイダーの実装を使用。 | |
| """ | |
| provider_type = model_config.provider | |
| if provider_type not in self.providers: | |
| raise ValueError(f"Unsupported provider: {provider_type}") | |
| provider = self.providers[provider_type] | |
| try: | |
| result = await provider.infer(model_config, prompt, temperature) | |
| return result | |
| except Exception as e: | |
| logger.error(f"Error during LLM inference with provider '{provider_type}': {e}") | |
| raise | |
| async def _perform_llm_streaming_inference(self, model_config: ModelConfig, prompt: str, temperature: float) -> AsyncGenerator[Dict[str, Any], None]: | |
| """ | |
| 指定されたモデル設定でLLMストリーミング推論を実行する。 | |
| 各プロバイダーの実装を使用。 | |
| """ | |
| provider_type = model_config.provider | |
| if provider_type not in self.providers: | |
| raise ValueError(f"Unsupported provider: {provider_type}") | |
| provider = self.providers[provider_type] | |
| try: | |
| async for chunk in provider.infer_streaming(model_config, prompt, temperature): | |
| yield chunk | |
| except Exception as e: | |
| logger.error(f"Error during LLM streaming inference with provider '{provider_type}': {e}") | |
| yield {"type": "error", "content": str(e)} | |
| async def _save_inference_to_db(self, domain_id: str, prompt: str, response: str, confidence: float, source_type: str, model_id: str) -> bool: | |
| """ | |
| 推論結果をKnowledge TileとしてDBに保存する(自己拡充)。 | |
| Priority 2実装: 座標自動推定 + .iath保存 | |
| """ | |
| try: | |
| from backend.app.database.models import KnowledgeTile | |
| from backend.app.database.session import SessionLocal # 遅延インポート | |
| db = SessionLocal() | |
| tile_id = f"ai_ktile_{uuid.uuid4().hex}" | |
| # AI生成の知識タイルを作成(SQLite) | |
| new_tile = KnowledgeTile( | |
| id=tile_id, | |
| workspace_id="default_workspace", # ローカル版なのでデフォルトワークスペース | |
| domain_id=domain_id, | |
| topic=prompt[:200], # プロンプトをトピックとして使用 | |
| content=response, | |
| confidence_score=confidence, | |
| verification_type="ai", # AI生成を示す | |
| verification_count=1, | |
| contributor_id=None, # AIなのでcontributorなし | |
| last_verified_by_id=None | |
| ) | |
| db.add(new_tile) | |
| db.commit() | |
| db.close() | |
| logger.info(f"Saved AI inference to SQLite: {new_tile.id}") | |
| # Priority 2: 座標自動推定 + .iath保存 | |
| try: | |
| # 座標推定用のLLM推論関数を作成 | |
| async def llm_inference_for_coords(coord_prompt): | |
| # 師匠モデル(またはDeepSeek)を使って座標推定 | |
| estimation_model = self.master_model if self.master_model else self._get_any_available_model() | |
| if estimation_model: | |
| result = await self._perform_llm_inference( | |
| estimation_model, | |
| coord_prompt, | |
| temperature=0.3 # 低温度で一貫性を保つ | |
| ) | |
| return result.get("response", "") | |
| return "{\"coordinates\": [0.5, 0.5, 0.5, 0.5, 0.5, 0.5], \"confidence\": 0.3}" | |
| # 座標を推定 | |
| coord_result = await self.coordinate_estimator.estimate_coordinates( | |
| prompt=prompt, | |
| response=response, | |
| domain_id=domain_id, | |
| llm_inference_func=llm_inference_for_coords, | |
| use_reasoning=False # 高速化のため推論過程は省略 | |
| ) | |
| # .iath Tileオブジェクトを作成 | |
| iath_tile = create_tile_from_ai_output( | |
| knowledge_id=tile_id, | |
| topic=prompt[:100], | |
| prompt=prompt, | |
| response=response, | |
| coordinates=coord_result["coordinates"], | |
| confidence=confidence, | |
| domain_id=domain_id, | |
| source="ai_generated" | |
| ) | |
| # .iathファイルに保存 | |
| success = self.iath_writer.append_tile(iath_tile) | |
| if success: | |
| logger.info(f"Saved AI inference to .iath with coordinates: {coord_result['coordinates']}") | |
| # メモリをリロード(新しいタイルを検索可能にする) | |
| self._load_dendritic_memory() | |
| else: | |
| logger.warning(f"Failed to save to .iath, but SQLite save succeeded") | |
| except Exception as iath_error: | |
| logger.error(f"Error saving to .iath (SQLite save succeeded): {iath_error}") | |
| # .iath保存失敗してもSQLite保存は成功しているのでTrueを返す | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error saving inference to DB: {e}") | |
| return False | |
| async def _save_master_output_as_training_data(self, prompt: str, master_response: str, domain_id: str, confidence: float) -> bool: | |
| """ | |
| 師匠の出力を弟子のファインチューニング用教師データとして保存する。 | |
| 倒木システムの核心機能。 | |
| """ | |
| try: | |
| import json | |
| import os | |
| from datetime import datetime | |
| logger.info(f"[Save Training Data] Starting to save master output (domain={domain_id}, confidence={confidence})") | |
| # トレーニングデータ保存ディレクトリ | |
| training_data_dir = "training_data/master_outputs" | |
| logger.info(f"[Save Training Data] Creating directory: {training_data_dir}") | |
| os.makedirs(training_data_dir, exist_ok=True) | |
| # Alpaca形式で保存 | |
| training_example = { | |
| "instruction": f"You are an expert in {domain_id}. Provide accurate information based on verified knowledge.", | |
| "input": prompt, | |
| "output": master_response, | |
| "metadata": { | |
| "domain_id": domain_id, | |
| "confidence": confidence, | |
| "master_model_id": self.master_model.model_id if self.master_model else "unknown", | |
| "timestamp": datetime.utcnow().isoformat(), | |
| "source": "master_output" | |
| } | |
| } | |
| # JSONLファイルに追記 | |
| output_file = os.path.join(training_data_dir, f"master_outputs_{domain_id}.jsonl") | |
| logger.info(f"[Save Training Data] Writing to file: {output_file}") | |
| with open(output_file, 'a', encoding='utf-8') as f: | |
| f.write(json.dumps(training_example, ensure_ascii=False) + '\n') | |
| logger.info(f"✓ Successfully saved master output as training data: {output_file}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"✗ Error saving master output as training data: {e}", exc_info=True) | |
| return False | |