Spaces:
Sleeping
Sleeping
File size: 5,854 Bytes
075a2b6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# runner_engine.py
import asyncio

from hot_cache import LRUCache
from mock_objects import MockDBInterface, MockLLMClient, mock_web_search_api
from web_search_autonomy import WebSearchAutonomySystem
# NOTE: The mock objects previously in this file have been moved to `mock_objects.py`
# for centralized test management. The main `RunnerEngine` class below is the
# actual implementation.
class RunnerEngine:
    """Inference execution engine (Layer 3).

    Orchestrates three concerns for a single question:
      1. Fetching knowledge tiles for DB coordinates through an LRU hot cache.
      2. Streaming token generation from the LLM client.
      3. Deciding (up-front and dynamically, mid-generation) whether to
         launch a concurrent web search.
    """

    def __init__(self, llm_client, db_interface, web_search_system,
                 db_timeout: float = 0.5, web_timeout: float = 2.0,
                 max_coords: int = 5):
        self.llm = llm_client
        self.db = db_interface
        self.web_search_system = web_search_system
        self.hot_cache = LRUCache(max_size=20)
        # Previously hard-coded magic numbers, now tunable per instance.
        self.db_timeout = db_timeout    # seconds to wait for DB fetches
        self.web_timeout = web_timeout  # seconds to wait for the web search task
        self.max_coords = max_coords    # cap on coordinates fetched per query

    async def _fetch_db_coordinates(self, db_coordinates: list) -> dict:
        """Fetch knowledge tiles for DB coordinates, consulting the hot cache.

        Only the first ``self.max_coords`` coordinates are considered, to
        bound latency. Falsy tiles returned by the DB are skipped (and not
        cached). Returns a dict mapping coordinate -> tile.
        """
        results = {}
        for coord in db_coordinates[:self.max_coords]:
            if coord in self.hot_cache:
                print(f"Cache: Hit for {coord}")
                results[coord] = self.hot_cache[coord]
                continue
            print(f"Cache: Miss for {coord}")
            tile = await self.db.fetch_async(coord)
            if tile:
                self.hot_cache[coord] = tile
                results[coord] = tile
        return results

    def _build_context(self, question: str, db_results: dict, session_context) -> str:
        """Build the context string fed into the LLM prompt.

        Session history (if any) comes first, followed by one line per DB
        tile. Assumes each tile is a dict with 'certainty' and 'content'
        keys (see the tiles produced by ``_fetch_db_coordinates``).
        """
        context_parts = []
        if session_context:  # unused in this example, but supported
            context_parts.append(f"セッション履歴: {session_context}")
        if db_results:
            for coord, tile in db_results.items():
                context_parts.append(f"【確実性{tile['certainty']}%】{tile['content']}")
        return "\n\n".join(context_parts)

    def _format_prompt(self, question: str, context: str) -> str:
        """Assemble the final prompt from context and question."""
        return f"情報: {context}\n\n質問: {question}\n\n指示: 提供された情報に基づき回答してください。"

    async def generate_response_streaming(self, question: str, db_coordinates: list, session_context=None):
        """Stream answer generation with dynamic web-search escalation.

        Yields dict events in order:
          - ``response_token`` events relayed from the LLM stream;
          - optionally one ``web_results`` event (results or timeout error);
          - a terminal ``final_structured_response`` event carrying the full
            text plus metadata for the Judge layer.
        """
        from types import SimpleNamespace  # local: avoids touching module imports

        web_decision = self.web_search_system.should_search(question)
        db_task = asyncio.create_task(self._fetch_db_coordinates(db_coordinates))
        # NOTE(review): mock_web_search_api lives in mock_objects.py per the
        # module-level NOTE; it is not defined in this file.
        web_task = (asyncio.create_task(mock_web_search_api(question))
                    if web_decision["should_search"] else None)
        try:
            db_results = await asyncio.wait_for(db_task, timeout=self.db_timeout)
        except asyncio.TimeoutError:
            # Best-effort: answer without DB knowledge rather than failing.
            db_results = {}
        context = self._build_context(question, db_results, session_context)
        prompt = self._format_prompt(question, context)
        partial_response = ""
        final_metadata = {}
        # Re-evaluate the web-search decision roughly every 20 generated
        # characters. The previous `len(...) % 20 == 0` test could be skipped
        # forever when tokens are longer than one character (length jumps
        # over the exact multiple); a monotonic threshold is robust.
        next_check_len = 20
        async for result in self.llm.generate_streaming(prompt):
            if result['type'] == 'response_token':
                partial_response += result['token']
                yield result  # relay the token unchanged
                # Dynamic mid-inference web-search check (only while no
                # web search task is already running).
                if web_task is None and len(partial_response) >= next_check_len:
                    next_check_len += 20
                    inference_state = SimpleNamespace(partial_response=partial_response)
                    dynamic_decision = self.web_search_system.should_search(
                        question, inference_state=inference_state)
                    if dynamic_decision["should_search"]:
                        print("\n*** Dynamic Web Search Triggered! ***\n")
                        web_task = asyncio.create_task(mock_web_search_api(question))
            elif result['type'] == 'completion':
                # Structured metadata consumed downstream by the Judge layer.
                final_metadata = result['metadata']
        web_results_content = []
        if web_task:
            try:
                web_results_content = await asyncio.wait_for(web_task, timeout=self.web_timeout)
                yield {"type": "web_results", "results": web_results_content}
            except asyncio.TimeoutError:
                yield {"type": "web_results", "results": [], "error": "timeout"}
        # Emit the terminal structured event and finish.
        final_metadata["referenced_coords"] = db_coordinates
        final_metadata["web_results"] = web_results_content
        yield {
            "type": "final_structured_response",
            "is_complete": True,
            "main_response": partial_response,
            **final_metadata  # expands thinking_process, key_points, etc.
        }
# --- Execution example ---
async def main():
    """Drive one end-to-end streaming run against mock components."""
    import json

    # Wire up the mocked pipeline.
    llm = MockLLMClient()
    db = MockDBInterface()
    web_search = WebSearchAutonomySystem()
    runner = RunnerEngine(llm, db, web_search)

    question = "最新の心筋梗塞の診断について"
    # Coordinates assumed to have been extracted by Layer 1.
    db_coordinates = [(28, 35, 15)]

    print(f"--- Running pipeline for question: '{question}' ---")

    final_response = {}
    async for event in runner.generate_response_streaming(question, db_coordinates):
        kind = event['type']
        if kind == 'response_token':
            print(event['token'], end='', flush=True)
        elif kind == 'web_results':
            print(f"\n\n--- Web Results Received ---")
            print(event['results'])
        elif kind == 'final_structured_response':
            final_response = event

    print("\n\n--- Final Structured Response (for Judge Layer) ---")
    print(json.dumps(final_response, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    asyncio.run(main())
|