Spaces:
Sleeping
Sleeping
| """ | |
| NullAI 推論サービス | |
| ModelRouterを介して、NullAIのコア機能(RAG、ストリーミング、自己拡充など)を提供するサービス。 | |
| """ | |
| import asyncio | |
| import json | |
| from typing import AsyncGenerator, Dict, Any, Optional | |
| from fastapi import Depends | |
| from datetime import datetime | |
| import sys | |
| import os | |
| import logging | |
| import threading | |
| # プロジェクトルートをsys.pathに追加 | |
| sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../..'))) | |
| from backend.app.config import settings, app_model_router, app_db_router | |
| from backend.app.services.cache_service import CacheService, get_cache_service | |
| logger = logging.getLogger(__name__) | |
| class InferenceService: | |
| """推論サービス""" | |
| def __init__(self, cache_service: CacheService): | |
| self.cache = cache_service | |
| self.engine = app_model_router # ModelRouterのインスタンスを使用 | |
| self.db_router = app_db_router # DBRouterのインスタンスを使用 | |
| self.inference_history = None # NurseLog System (currently disabled) | |
| self._succession_manager = None | |
| async def process_question( | |
| self, | |
| question: str, | |
| user_id: str, | |
| session_id: str, | |
| domain_id: str, | |
| model_id: Optional[str] = None, | |
| temperature: Optional[float] = None, | |
| rag_mode: str = "rag" | |
| ) -> Dict[str, Any]: | |
| """ | |
| 質問を処理して回答を生成(非ストリーミング) | |
| """ | |
| cache_key = f"inference:{domain_id}:{rag_mode}:{hash(question)}" | |
| cached_result = await self.cache.get(cache_key) | |
| if cached_result: | |
| logger.info(f"Cache hit for question: {question[:50]}...") | |
| return json.loads(cached_result) | |
| logger.info(f"Processing question: {question[:50]}... (domain={domain_id}, mode={rag_mode})") | |
| try: | |
| active_model = self.engine.get_model_config(model_id) if model_id else self.engine.get_active_model() | |
| if not active_model: | |
| raise ValueError("No active model available for inference.") | |
| result = await self.engine.infer( | |
| prompt=question, | |
| domain_id=domain_id, | |
| model_config=active_model, | |
| temperature=temperature, | |
| save_to_memory=True, # 自己拡充を有効化 | |
| rag_mode=rag_mode | |
| ) | |
| response = { | |
| "status": "success", | |
| "answer": result.get("response", ""), | |
| "thinking": result.get("thinking", ""), | |
| "confidence": result.get("confidence", 0.5), | |
| "model_used": result.get("model_used", active_model.model_id), | |
| "source_type": result.get("source_type", "unknown") | |
| } | |
| if response.get("status") == "success": | |
| await self.cache.set(cache_key, json.dumps(response), ttl=3600) | |
| # DBに推論結果を保存 | |
| try: | |
| inference_data = { | |
| "question": question, | |
| "response": response["answer"], | |
| "thinking": response["thinking"], | |
| "domain_id": domain_id, | |
| "model_id": response["model_used"], | |
| "confidence": response["confidence"], | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| await self.db_router.save_inference(inference_data) | |
| logger.info(f"Saved inference to DB (domain={domain_id})") | |
| except Exception as db_error: | |
| logger.error(f"Failed to save inference to DB: {db_error}", exc_info=True) | |
| return response | |
| except Exception as e: | |
| logger.error(f"Inference error in process_question: {e}", exc_info=True) | |
| return { | |
| "status": "error", | |
| "error": str(e), | |
| "answer": "An error occurred during inference.", | |
| "confidence": 0.0 | |
| } | |
| async def stream_tokens( | |
| self, | |
| session_id: str, | |
| question: str, | |
| domain_id: str, | |
| model_id: Optional[str] = None, | |
| rag_mode: str = "rag", | |
| temperature: Optional[float] = None, | |
| ) -> AsyncGenerator[Dict[str, Any], None]: | |
| """ | |
| 回答をストリーミングで生成 | |
| """ | |
| logger.info(f"Streaming question: {question[:50]}... (domain={domain_id}, mode={rag_mode})") | |
| try: | |
| active_model = self.engine.get_model_config(model_id) if model_id else self.engine.get_active_model() | |
| if not active_model: | |
| raise ValueError("No active model available for streaming inference.") | |
| full_response_content = "" | |
| async for chunk in self.engine.infer_streaming( | |
| prompt=question, | |
| domain_id=domain_id, | |
| model_config=active_model, | |
| temperature=temperature, | |
| save_to_memory=True, | |
| rag_mode=rag_mode, | |
| ): | |
| yield chunk | |
| if chunk.get("type") == "token": | |
| full_response_content += chunk.get("content", "") | |
| # 倒木システム: ストリーミング完了後、師匠モデルの出力をトレーニングデータとして保存 | |
| logger.info(f"[TRAINING DATA] Stream completed. Checking if should save (model={active_model.model_id})") | |
| is_master = (self.engine.master_model and active_model.model_id == self.engine.master_model.model_id) | |
| if is_master and len(full_response_content) > 0: | |
| logger.info(f"[TRAINING DATA] Saving master output for domain '{domain_id}' ({len(full_response_content)} chars)") | |
| try: | |
| await self.engine._save_master_output_as_training_data( | |
| prompt=question, | |
| master_response=full_response_content, | |
| domain_id=domain_id, | |
| confidence=0.8 | |
| ) | |
| logger.info(f"[TRAINING DATA] ✓ Successfully saved master output") | |
| except Exception as save_error: | |
| logger.error(f"[TRAINING DATA] ✗ Failed to save: {save_error}", exc_info=True) | |
| else: | |
| logger.info(f"[TRAINING DATA] Not saving: is_master={is_master}, content_length={len(full_response_content)}") | |
| # DBに推論結果を保存(全ての推論を保存) | |
| if len(full_response_content) > 0: | |
| try: | |
| inference_data = { | |
| "question": question, | |
| "response": full_response_content, | |
| "thinking": "", # ストリーミング時はthinkingを分離していないため空 | |
| "domain_id": domain_id, | |
| "model_id": active_model.model_id, | |
| "confidence": 0.8, | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| await self.db_router.save_inference(inference_data) | |
| logger.info(f"[DB] ✓ Saved inference to DB (domain={domain_id}, length={len(full_response_content)})") | |
| except Exception as db_error: | |
| logger.error(f"[DB] ✗ Failed to save inference to DB: {db_error}", exc_info=True) | |
| # ストリーミング完了後にキャッシュに保存 | |
| # TODO: メタ情報もキャッシュに含める | |
| cache_key = f"inference:{domain_id}:{rag_mode}:{hash(question)}" | |
| # response = {"answer": full_response_content, ...} # メタ情報を含めた完全なレスポンスを構築 | |
| # await self.cache.set(cache_key, json.dumps(response), ttl=3600) | |
| except Exception as e: | |
| logger.error(f"Inference error in stream_tokens: {e}", exc_info=True) | |
| yield { | |
| "type": "error", | |
| "error": str(e), | |
| "content": "An error occurred during streaming inference." | |
| } | |
| # Succession-related methods (stubs for NurseLog System) | |
| async def check_succession_status(self) -> Dict[str, Any]: | |
| """ | |
| Check succession status (NurseLog System stub) | |
| """ | |
| return { | |
| "status": "disabled", | |
| "should_trigger": False, | |
| "high_quality_count": 0, | |
| "threshold": 1000, | |
| "message": "NurseLog System is currently disabled" | |
| } | |
| async def trigger_succession( | |
| self, | |
| domain_id: Optional[str] = None, | |
| min_confidence: float = 0.8, | |
| db_path: Optional[str] = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| Trigger succession (NurseLog System stub) | |
| """ | |
| return { | |
| "status": "disabled", | |
| "count": 0, | |
| "exports": {}, | |
| "db_snapshot": None, | |
| "log": None, | |
| "message": "NurseLog System is currently disabled" | |
| } | |
| def get_succession_manager(self): | |
| """ | |
| Get succession manager (NurseLog System stub) | |
| """ | |
| return self._succession_manager | |
| # 依存性注入用のファクトリ関数 | |
| def get_inference_service( | |
| cache_service: CacheService = Depends(get_cache_service) | |
| ) -> "InferenceService": | |
| """推論サービスのシングルトンインスタンスを取得""" | |
| # このファクトリはシングルトンインスタンスを返すようにしたほうが良いが、 | |
| # Uvicornのリロードと組み合わせると複雑になるため、リクエストごとに生成する | |
| return InferenceService(cache_service=cache_service) | |