# runner_engine.py
import asyncio
import json
from types import SimpleNamespace

from hot_cache import LRUCache
from web_search_autonomy import WebSearchAutonomySystem
from mock_objects import MockLLMClient, MockDBInterface, mock_web_search_api

# NOTE: The mock objects previously in this file have been moved to `mock_objects.py`
# for centralized test management; they are imported above so this module runs as-is.
# (A hedged sketch of that module's assumed contents is appended at the end of this
# listing.) The main `RunnerEngine` class below is the actual implementation.
class RunnerEngine:
    """Inference execution engine (Layer 3)."""

    def __init__(self, llm_client, db_interface, web_search_system):
        self.llm = llm_client
        self.db = db_interface
        self.web_search_system = web_search_system
        # Assumed dict-like LRU interface; see the illustrative sketch after this class.
        self.hot_cache = LRUCache(max_size=20)

    async def _fetch_db_coordinates(self, db_coordinates: list) -> dict:
        """Fetch knowledge tiles for the given DB coordinates, using the hot cache."""
        results = {}
        for coord in db_coordinates[:5]:  # cap lookups at the first five coordinates
            if coord in self.hot_cache:
                print(f"Cache: Hit for {coord}")
                results[coord] = self.hot_cache[coord]
                continue
            print(f"Cache: Miss for {coord}")
            tile = await self.db.fetch_async(coord)
            if tile:
                self.hot_cache[coord] = tile
                results[coord] = tile
        return results

    def _build_context(self, question: str, db_results: dict, session_context) -> str:
        """Build the context string for the LLM prompt."""
        context_parts = []
        if session_context:  # unused in this example
            context_parts.append(f"Session history: {session_context}")
        if db_results:
            for coord, tile in db_results.items():
                context_parts.append(f"[Certainty {tile['certainty']}%] {tile['content']}")
        return "\n\n".join(context_parts)

    def _format_prompt(self, question: str, context: str) -> str:
        return f"Information: {context}\n\nQuestion: {question}\n\nInstruction: Answer based on the provided information."

    async def generate_response_streaming(self, question: str, db_coordinates: list, session_context=None):
        """Generate a streaming answer while dynamically deciding whether to run a web search."""
        web_decision = self.web_search_system.should_search(question)
        db_task = asyncio.create_task(self._fetch_db_coordinates(db_coordinates))
        web_task = asyncio.create_task(mock_web_search_api(question)) if web_decision["should_search"] else None

        try:
            db_results = await asyncio.wait_for(db_task, timeout=0.5)
        except asyncio.TimeoutError:
            db_results = {}  # fall back to an empty context if the DB lookup is too slow

        context = self._build_context(question, db_results, session_context)
        prompt = self._format_prompt(question, context)

        partial_response = ""
        final_metadata = {}

        async for result in self.llm.generate_streaming(prompt):
            if result['type'] == 'response_token':
                token = result['token']
                partial_response += token
                yield result  # relay the token as-is

                # Dynamic web-search decision made mid-inference
                if len(partial_response) > 5 and len(partial_response) % 20 == 0 and not web_task:
                    inference_state = SimpleNamespace(partial_response=partial_response)
                    dynamic_decision = self.web_search_system.should_search(question, inference_state=inference_state)
                    if dynamic_decision["should_search"]:
                        print("\n*** Dynamic Web Search Triggered! ***\n")
                        web_task = asyncio.create_task(mock_web_search_api(question))
            elif result['type'] == 'completion':
                # Keep the structured metadata required by the Judge layer
                final_metadata = result['metadata']

        web_results_content = []
        if web_task:
            try:
                web_results_content = await asyncio.wait_for(web_task, timeout=2.0)
                yield {"type": "web_results", "results": web_results_content}
            except asyncio.TimeoutError:
                yield {"type": "web_results", "results": [], "error": "timeout"}

        # Emit the final structured metadata and finish
        final_metadata["referenced_coords"] = db_coordinates
        final_metadata["web_results"] = web_results_content
        yield {
            "type": "final_structured_response",
            "is_complete": True,
            "main_response": partial_response,
            **final_metadata  # expands thinking_process, key_points, etc.
        }
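
# The hot cache above assumes `hot_cache.LRUCache` behaves like a dict with a
# bounded size: it is constructed with `max_size=`, supports `in`, item reads,
# and item writes that evict the least-recently-used entry when full. The real
# implementation lives in `hot_cache.py` and is not shown here; the class below
# is only an illustrative sketch of that assumed interface.
from collections import OrderedDict


class LRUCacheSketch:
    """Minimal LRU cache sketch (illustrative only, not the real hot_cache.LRUCache)."""

    def __init__(self, max_size: int = 20):
        self.max_size = max_size
        self._store = OrderedDict()

    def __contains__(self, key) -> bool:
        return key in self._store

    def __getitem__(self, key):
        self._store.move_to_end(key)  # mark as most recently used
        return self._store[key]

    def __setitem__(self, key, value):
        self._store[key] = value
        self._store.move_to_end(key)
        if len(self._store) > self.max_size:
            self._store.popitem(last=False)  # evict the least recently used entry
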
# --- Example run ---
async def main():
    # Initialize the mock components
    llm = MockLLMClient()
    db = MockDBInterface()
    web_search = WebSearchAutonomySystem()
    runner = RunnerEngine(llm, db, web_search)

    question = "About the latest diagnosis of myocardial infarction"
    # Coordinates assumed to have been extracted by Layer 1
    db_coordinates = [(28, 35, 15)]

    print(f"--- Running pipeline for question: '{question}' ---")
    final_response = {}
    async for event in runner.generate_response_streaming(question, db_coordinates):
        if event['type'] == 'response_token':
            print(event['token'], end='', flush=True)
        elif event['type'] == 'web_results':
            print("\n\n--- Web Results Received ---")
            print(event['results'])
        elif event['type'] == 'final_structured_response':
            final_response = event
            print("\n\n--- Final Structured Response (for Judge Layer) ---")
            print(json.dumps(final_response, indent=2, ensure_ascii=False))

if __name__ == "__main__":
    asyncio.run(main())
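
# --- mock_objects.py (hedged sketch) ---
# The real `mock_objects.py` imported at the top of runner_engine.py is not shown
# on this page. The sketch below belongs in that separate module and is inferred
# purely from how the mocks are used above: the names come from the imports, but
# the event shapes, metadata keys, and return values are assumptions for
# illustration, not the actual test fixtures.
import asyncio


async def mock_web_search_api(question: str) -> list:
    """Pretend web search: return a small list of result dicts after a short delay."""
    await asyncio.sleep(0.1)
    return [{"title": f"Result for: {question}", "snippet": "(mock search snippet)"}]


class MockDBInterface:
    async def fetch_async(self, coord) -> dict:
        """Return a knowledge tile with the 'certainty' and 'content' keys read above."""
        await asyncio.sleep(0.05)
        return {"certainty": 90, "content": f"Mock knowledge tile for coordinate {coord}"}


class MockLLMClient:
    async def generate_streaming(self, prompt: str):
        """Yield token events followed by a completion event, matching the streaming loop above."""
        for token in ["This ", "is ", "a ", "mock ", "streamed ", "answer. "]:
            await asyncio.sleep(0.01)
            yield {"type": "response_token", "token": token}
        yield {
            "type": "completion",
            "metadata": {"thinking_process": "(mock)", "key_points": ["(mock)"]},
        }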