# backend/app/services/inference_service.py
"""
NullAI 推論サービス
ModelRouterを介して、NullAIのコア機能(RAG、ストリーミング、自己拡充など)を提供するサービス。
"""
import asyncio
import json
from typing import AsyncGenerator, Dict, Any, Optional
from fastapi import Depends
from datetime import datetime
import sys
import os
import logging
import threading

# Add the project root to sys.path so that `backend.*` imports resolve
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../..')))
from backend.app.config import settings, app_model_router, app_db_router
from backend.app.services.cache_service import CacheService, get_cache_service
logger = logging.getLogger(__name__)

class InferenceService:
    """Inference service."""

    def __init__(self, cache_service: CacheService):
        self.cache = cache_service
        self.engine = app_model_router  # ModelRouter instance (from app config)
        self.db_router = app_db_router  # DBRouter instance (from app config)
        self.inference_history = None  # NurseLog System (currently disabled)
        self._succession_manager = None

    async def process_question(
        self,
        question: str,
        user_id: str,
        session_id: str,
        domain_id: str,
        model_id: Optional[str] = None,
        temperature: Optional[float] = None,
        rag_mode: str = "rag"
    ) -> Dict[str, Any]:
        """
        Process a question and generate an answer (non-streaming).
        """
        cache_key = f"inference:{domain_id}:{rag_mode}:{hash(question)}"
        cached_result = await self.cache.get(cache_key)
        if cached_result:
            logger.info(f"Cache hit for question: {question[:50]}...")
            return json.loads(cached_result)
        logger.info(f"Processing question: {question[:50]}... (domain={domain_id}, mode={rag_mode})")
        try:
            active_model = self.engine.get_model_config(model_id) if model_id else self.engine.get_active_model()
            if not active_model:
                raise ValueError("No active model available for inference.")
            result = await self.engine.infer(
                prompt=question,
                domain_id=domain_id,
                model_config=active_model,
                temperature=temperature,
                save_to_memory=True,  # enable self-enrichment
                rag_mode=rag_mode
            )
            response = {
                "status": "success",
                "answer": result.get("response", ""),
                "thinking": result.get("thinking", ""),
                "confidence": result.get("confidence", 0.5),
                "model_used": result.get("model_used", active_model.model_id),
                "source_type": result.get("source_type", "unknown")
            }
            if response.get("status") == "success":
                await self.cache.set(cache_key, json.dumps(response), ttl=3600)
            # Persist the inference result to the DB
            try:
                inference_data = {
                    "question": question,
                    "response": response["answer"],
                    "thinking": response["thinking"],
                    "domain_id": domain_id,
                    "model_id": response["model_used"],
                    "confidence": response["confidence"],
                    "timestamp": datetime.now().isoformat()
                }
                await self.db_router.save_inference(inference_data)
                logger.info(f"Saved inference to DB (domain={domain_id})")
            except Exception as db_error:
                logger.error(f"Failed to save inference to DB: {db_error}", exc_info=True)
            return response
        except Exception as e:
            logger.error(f"Inference error in process_question: {e}", exc_info=True)
            return {
                "status": "error",
                "error": str(e),
                "answer": "An error occurred during inference.",
                "confidence": 0.0
            }

    async def stream_tokens(
        self,
        session_id: str,
        question: str,
        domain_id: str,
        model_id: Optional[str] = None,
        rag_mode: str = "rag",
        temperature: Optional[float] = None,
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Generate an answer as a stream of chunks.
        """
        logger.info(f"Streaming question: {question[:50]}... (domain={domain_id}, mode={rag_mode})")
        try:
            active_model = self.engine.get_model_config(model_id) if model_id else self.engine.get_active_model()
            if not active_model:
                raise ValueError("No active model available for streaming inference.")
            full_response_content = ""
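            # Each chunk is assumed to be a dict with a "type" key ("token"
            # for generated text, "error" on failure) and a "content" payload;
            # this shape is inferred from the checks below, not verified
            # against ModelRouter.infer_streaming.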
            async for chunk in self.engine.infer_streaming(
                prompt=question,
                domain_id=domain_id,
                model_config=active_model,
                temperature=temperature,
                save_to_memory=True,
                rag_mode=rag_mode,
            ):
                yield chunk
                if chunk.get("type") == "token":
                    full_response_content += chunk.get("content", "")
            # NurseLog System: after the stream completes, save the master
            # model's output as training data
            logger.info(f"[TRAINING DATA] Stream completed. Checking if should save (model={active_model.model_id})")
            is_master = (self.engine.master_model and active_model.model_id == self.engine.master_model.model_id)
            if is_master and len(full_response_content) > 0:
                logger.info(f"[TRAINING DATA] Saving master output for domain '{domain_id}' ({len(full_response_content)} chars)")
                try:
                    await self.engine._save_master_output_as_training_data(
                        prompt=question,
                        master_response=full_response_content,
                        domain_id=domain_id,
                        confidence=0.8
                    )
                    logger.info(f"[TRAINING DATA] ✓ Successfully saved master output")
                except Exception as save_error:
                    logger.error(f"[TRAINING DATA] ✗ Failed to save: {save_error}", exc_info=True)
            else:
                logger.info(f"[TRAINING DATA] Not saving: is_master={is_master}, content_length={len(full_response_content)}")
            # Persist the inference result to the DB (all inferences are saved)
            if len(full_response_content) > 0:
                try:
                    inference_data = {
                        "question": question,
                        "response": full_response_content,
                        "thinking": "",  # thinking is not separated out during streaming, so this stays empty
                        "domain_id": domain_id,
                        "model_id": active_model.model_id,
                        "confidence": 0.8,
                        "timestamp": datetime.now().isoformat()
                    }
                    await self.db_router.save_inference(inference_data)
                    logger.info(f"[DB] ✓ Saved inference to DB (domain={domain_id}, length={len(full_response_content)})")
                except Exception as db_error:
                    logger.error(f"[DB] ✗ Failed to save inference to DB: {db_error}", exc_info=True)
            # Cache the result once streaming has finished
            # TODO: include metadata in the cached entry
            cache_key = f"inference:{domain_id}:{rag_mode}:{hash(question)}"
            # response = {"answer": full_response_content, ...}  # build the full response including metadata
            # await self.cache.set(cache_key, json.dumps(response), ttl=3600)
        except Exception as e:
            logger.error(f"Inference error in stream_tokens: {e}", exc_info=True)
            yield {
                "type": "error",
                "error": str(e),
                "content": "An error occurred during streaming inference."
            }
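
    # Consumption sketch (hypothetical; assumes an SSE endpoint built with
    # fastapi.responses.StreamingResponse):
    #
    #     async def event_stream():
    #         async for chunk in service.stream_tokens(
    #             session_id="local", question=question, domain_id="general"
    #         ):
    #             yield f"data: {json.dumps(chunk)}\n\n"
    #     return StreamingResponse(event_stream(), media_type="text/event-stream")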

    # Succession-related methods (stubs for the NurseLog System)
    async def check_succession_status(self) -> Dict[str, Any]:
        """
        Check succession status (NurseLog System stub).
        """
        return {
            "status": "disabled",
            "should_trigger": False,
            "high_quality_count": 0,
            "threshold": 1000,
            "message": "NurseLog System is currently disabled"
        }

    async def trigger_succession(
        self,
        domain_id: Optional[str] = None,
        min_confidence: float = 0.8,
        db_path: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Trigger succession (NurseLog System stub).
        """
        return {
            "status": "disabled",
            "count": 0,
            "exports": {},
            "db_snapshot": None,
            "log": None,
            "message": "NurseLog System is currently disabled"
        }

    def get_succession_manager(self):
        """
        Get the succession manager (NurseLog System stub).
        """
        return self._succession_manager

# Factory function for dependency injection
def get_inference_service(
    cache_service: CacheService = Depends(get_cache_service)
) -> "InferenceService":
    """Build an InferenceService for the current request."""
    # Ideally this factory would return a singleton instance, but that gets
    # complicated in combination with Uvicorn's reload, so a new instance is
    # created per request
    return InferenceService(cache_service=cache_service)
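
# Usage sketch (hypothetical route; assumes a FastAPI app that wires this
# service up via the factory above -- the path and argument values here are
# illustrative, not part of the actual API):
#
#     from fastapi import APIRouter
#
#     router = APIRouter()
#
#     @router.post("/ask")
#     async def ask(
#         question: str,
#         service: InferenceService = Depends(get_inference_service),
#     ):
#         return await service.process_question(
#             question=question,
#             user_id="anonymous",
#             session_id="local",
#             domain_id="general",
#         )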