# nullai-knowledge-system / runner_engine.py
# Author: kofdai — "Deploy NullAI Knowledge System to Spaces" (commit 075a2b6)
# runner_engine.py
import asyncio
from types import SimpleNamespace

from hot_cache import LRUCache
from mock_objects import MockDBInterface, MockLLMClient, mock_web_search_api
from web_search_autonomy import WebSearchAutonomySystem
# NOTE: The mock objects previously in this file have been moved to `mock_objects.py`
# for centralized test management. The main `RunnerEngine` class below is the
# actual implementation.
class RunnerEngine:
    """Inference execution engine (Layer 3).

    Streams tokens from the LLM while concurrently fetching knowledge tiles
    from the DB (backed by a small LRU hot cache) and, when the web-search
    autonomy system decides it is worthwhile, running a web search in
    parallel. Emits a stream of event dicts consumed by the Judge layer.
    """

    # Tunables (previously hard-coded inline) — kept as class attributes so
    # subclasses/tests can override without changing call sites.
    MAX_COORDS = 5          # max DB coordinates fetched per request
    DB_TIMEOUT = 0.5        # seconds to wait for the DB fetch task
    WEB_TIMEOUT = 2.0       # seconds to wait for the web search task
    CACHE_SIZE = 20         # hot-cache capacity (entries)
    DYNAMIC_MIN_LEN = 5     # min partial-response length before dynamic checks
    DYNAMIC_CHECK_STEP = 20 # re-check cadence (characters of partial response)

    def __init__(self, llm_client, db_interface, web_search_system):
        """Wire up the LLM client, DB interface and web-search system."""
        self.llm = llm_client
        self.db = db_interface
        self.web_search_system = web_search_system
        self.hot_cache = LRUCache(max_size=self.CACHE_SIZE)

    async def _fetch_db_coordinates(self, db_coordinates: list) -> dict:
        """Fetch knowledge tiles for up to MAX_COORDS coordinates.

        Hits are served from the hot cache; misses are fetched from the DB
        and inserted into the cache. Coordinates whose fetch returns a falsy
        tile are silently skipped.
        """
        results = {}
        for coord in db_coordinates[:self.MAX_COORDS]:
            if coord in self.hot_cache:
                print(f"Cache: Hit for {coord}")
                results[coord] = self.hot_cache[coord]
                continue
            print(f"Cache: Miss for {coord}")
            tile = await self.db.fetch_async(coord)
            if tile:
                self.hot_cache[coord] = tile
                results[coord] = tile
        return results

    def _build_context(self, question: str, db_results: dict, session_context) -> str:
        """Build the context string for the LLM prompt.

        `question` is accepted for interface symmetry but is not used here.
        Each tile is assumed to carry 'certainty' and 'content' keys — a
        KeyError here indicates a malformed tile (TODO confirm schema against
        mock_objects.py / the real DB interface).
        """
        context_parts = []
        if session_context:  # unused in the current example pipeline
            context_parts.append(f"セッション履歴: {session_context}")
        if db_results:
            for coord, tile in db_results.items():
                context_parts.append(f"【確実性{tile['certainty']}%】{tile['content']}")
        return "\n\n".join(context_parts)

    def _format_prompt(self, question: str, context: str) -> str:
        """Assemble the final LLM prompt from context and question."""
        return f"情報: {context}\n\n質問: {question}\n\n指示: 提供された情報に基づき回答してください。"

    async def generate_response_streaming(self, question: str, db_coordinates: list, session_context=None):
        """Stream the answer token-by-token with dynamic web-search triggering.

        Yields event dicts:
          - 'response_token'            — relayed LLM tokens
          - 'web_results'               — web search results (if a search ran)
          - 'final_structured_response' — final payload for the Judge layer
        """
        web_decision = self.web_search_system.should_search(question)
        db_task = asyncio.create_task(self._fetch_db_coordinates(db_coordinates))
        # NOTE(review): mock_web_search_api comes from mock_objects.py —
        # swap in the real search client when one is available.
        web_task = asyncio.create_task(mock_web_search_api(question)) if web_decision["should_search"] else None
        try:
            db_results = await asyncio.wait_for(db_task, timeout=self.DB_TIMEOUT)
        except asyncio.TimeoutError:
            # Fix: cancel the orphaned fetch so it doesn't keep running (and
            # mutating the cache / printing) in the background after we gave up.
            db_task.cancel()
            db_results = {}
        context = self._build_context(question, db_results, session_context)
        prompt = self._format_prompt(question, context)
        partial_response = ""
        final_metadata = {}
        async for result in self.llm.generate_streaming(prompt):
            if result['type'] == 'response_token':
                partial_response += result['token']
                yield result  # relay the token unchanged
                # Dynamic web-search decision while inference is in flight.
                # NOTE: the % check only fires when the accumulated length
                # lands exactly on a multiple of DYNAMIC_CHECK_STEP, so
                # multi-character tokens can skip a check point.
                if (len(partial_response) > self.DYNAMIC_MIN_LEN
                        and len(partial_response) % self.DYNAMIC_CHECK_STEP == 0
                        and not web_task):
                    # Lightweight stand-in for the inference-state object the
                    # autonomy system inspects (replaces the old inline class).
                    inference_state = SimpleNamespace(partial_response=partial_response)
                    dynamic_decision = self.web_search_system.should_search(
                        question, inference_state=inference_state)
                    if dynamic_decision["should_search"]:
                        print("\n*** Dynamic Web Search Triggered! ***\n")
                        web_task = asyncio.create_task(mock_web_search_api(question))
            elif result['type'] == 'completion':
                # Structured metadata required downstream by the Judge layer.
                final_metadata = result['metadata']
        web_results_content = []
        if web_task:
            try:
                web_results_content = await asyncio.wait_for(web_task, timeout=self.WEB_TIMEOUT)
                yield {"type": "web_results", "results": web_results_content}
            except asyncio.TimeoutError:
                # Fix: cancel the abandoned search task instead of leaking it.
                web_task.cancel()
                yield {"type": "web_results", "results": [], "error": "timeout"}
        # Final structured payload for the Judge layer.
        final_metadata["referenced_coords"] = db_coordinates
        final_metadata["web_results"] = web_results_content
        yield {
            "type": "final_structured_response",
            "is_complete": True,
            "main_response": partial_response,
            **final_metadata,  # expands thinking_process, key_points, etc.
        }
# --- 実行例 ---
async def main():
    """Demo driver: wire mock components together and run one question."""
    import json

    # Assemble the Layer-3 engine from mocked components.
    engine = RunnerEngine(MockLLMClient(), MockDBInterface(), WebSearchAutonomySystem())

    question = "最新の心筋梗塞の診断について"
    # Coordinates assumed to have been extracted by Layer 1.
    coords = [(28, 35, 15)]

    print(f"--- Running pipeline for question: '{question}' ---")
    structured = {}
    async for event in engine.generate_response_streaming(question, coords):
        kind = event['type']
        if kind == 'response_token':
            print(event['token'], end='', flush=True)
        elif kind == 'web_results':
            print("\n\n--- Web Results Received ---")
            print(event['results'])
        elif kind == 'final_structured_response':
            structured = event
    print("\n\n--- Final Structured Response (for Judge Layer) ---")
    print(json.dumps(structured, indent=2, ensure_ascii=False))
# Script entry point: run the demo pipeline once on the event loop.
if __name__ == "__main__":
    asyncio.run(main())