"""
Database Enrichment Module

Automatically expands the knowledge base using AI:
DeepSeek generates questions -> the mentor model answers -> results saved to .iath.
"""
|
|
import logging
import asyncio
from typing import List, Dict, Any, Optional, Callable
from datetime import datetime
import uuid
import json


logger = logging.getLogger(__name__)
|
|
|
|
class AIEnrichmentEngine:
    """
    AI-driven knowledge-base enrichment engine.

    Flow:
        1. DeepSeek (the "permanent mentor") generates enrichment questions
        2. The mentor model answers each question
        3. Coordinates are estimated automatically
        4. The result is appended to the .iath store
    """

    def __init__(
        self,
        prompt_generator_func,
        answer_generator_func,
        coordinate_estimator,
        iath_writer
    ):
        """
        Args:
            prompt_generator_func: async def(prompt) -> str (uses DeepSeek)
            answer_generator_func: async def(prompt) -> dict (uses the mentor model)
            coordinate_estimator: CoordinateEstimator instance
            iath_writer: IathWriter instance
        """
        self.prompt_generator = prompt_generator_func
        self.answer_generator = answer_generator_func
        self.coordinate_estimator = coordinate_estimator
        self.iath_writer = iath_writer

        # Mutable progress snapshot, read via get_enrichment_status().
        # "is_running" doubles as a cooperative cancellation flag: it is set
        # False by stop_enrichment() and checked inside the enrichment loop.
        self.enrichment_state = {
            "is_running": False,
            "progress": 0.0,
            "current_question": 0,
            "total_questions": 0,
            "generated_tiles": 0,
            "start_time": None,
            "domain_id": None
        }

    async def generate_enrichment_questions(
        self,
        domain_id: str,
        num_questions: int = 10,
        focus_areas: Optional[List[str]] = None
    ) -> List[str]:
        """
        Generate enrichment questions with DeepSeek.

        Args:
            domain_id: domain ID
            num_questions: number of questions to generate
            focus_areas: focus areas (e.g. ["fundamentals", "applications"])

        Returns:
            List of generated questions (at most num_questions, deduplicated,
            original order preserved). Empty list on any failure.
        """
        logger.info(f"Generating {num_questions} enrichment questions for domain '{domain_id}'")

        focus_text = ""
        if focus_areas:
            focus_text = f"\nFocus on these specific areas: {', '.join(focus_areas)}"

        generation_prompt = f"""You are an expert knowledge curator in the domain of {domain_id}.

Your task is to generate {num_questions} diverse, high-quality questions that would enrich a knowledge base in this domain.

Guidelines:
1. Cover a wide range of topics within the domain
2. Include questions at different complexity levels (basic, intermediate, advanced)
3. Focus on practical applications and edge cases
4. Avoid overly broad or trivial questions
5. Each question should elicit detailed, informative answers{focus_text}

Output format:
Return ONLY a JSON array of questions, like this:
["Question 1?", "Question 2?", ...]

Domain: {domain_id}
Number of questions: {num_questions}

Generate the questions now:"""

        try:
            logger.info(f"Calling prompt generator with prompt length: {len(generation_prompt)}")
            response = await self.prompt_generator(generation_prompt)

            logger.info(f"Received response from LLM, length: {len(response) if response else 0}")
            if not response:
                logger.error("LLM returned empty response!")
                return []

            logger.debug(f"Response preview (first 200 chars): {response[:200]}")

            questions = self._extract_questions_from_response(response)

            # Deduplicate while preserving order.
            questions = list(dict.fromkeys(questions))

            logger.info(f"Generated {len(questions)} unique questions")
            return questions[:num_questions]

        except Exception as e:
            logger.error(f"Failed to generate enrichment questions: {e}", exc_info=True)
            return []

    def _extract_questions_from_response(self, response: str) -> List[str]:
        """
        Extract the question list from an LLM response.

        Accepts a fenced ```json block, a bare ``` block, a raw JSON array
        embedded in prose, or — as a last resort — plain text handed to
        _fallback_parse(). Non-string array elements are dropped.
        """
        if not response or not response.strip():
            logger.error(f"Empty response from LLM. Response: '{response}'")
            return []

        json_str = ""
        try:
            if "```json" in response:
                json_start = response.find("```json") + 7
                json_end = response.find("```", json_start)
                # Fix: a missing closing fence made find() return -1, and the
                # slice [start:-1] silently dropped the last character.
                if json_end == -1:
                    json_end = len(response)
                json_str = response[json_start:json_end].strip()
            elif "```" in response:
                json_start = response.find("```") + 3
                json_end = response.find("```", json_start)
                if json_end == -1:
                    json_end = len(response)
                json_str = response[json_start:json_end].strip()
            elif "[" in response and "]" in response:
                # Raw JSON array embedded in surrounding prose.
                json_start = response.find("[")
                json_end = response.rfind("]") + 1
                json_str = response[json_start:json_end]
            else:
                json_str = response.strip()

            if not json_str:
                logger.error(f"Could not extract JSON from response. Full response: '{response[:500]}'")
                return self._fallback_parse(response)

            questions = json.loads(json_str)

            if isinstance(questions, list):
                # Keep only string entries; LLMs sometimes emit mixed arrays.
                return [q for q in questions if isinstance(q, str)]
            else:
                logger.error(f"Expected list, got {type(questions)}")
                return []

        except json.JSONDecodeError as e:
            logger.error(f"JSON parse error: {e}. Attempted to parse: '{json_str[:200]}'")
            logger.error(f"Full response (first 500 chars): '{response[:500]}'")
            return self._fallback_parse(response)

    def _fallback_parse(self, response: str) -> List[str]:
        """
        Fallback when JSON parsing fails: extract questions from
        newline-separated text (numbered or bulleted lists, or bare lines
        containing a question mark).
        """
        logger.info("Attempting fallback parsing of response...")
        lines = response.split('\n')
        questions = []
        for line in lines:
            line = line.strip()
            # Ignore blank lines and fragments too short to be a question.
            if line and len(line) > 5:
                if line[0].isdigit() or line.startswith('-') or line.startswith('*'):
                    # Strip list markers ("1.", "-", "*") before checking.
                    cleaned = line.lstrip('0123456789.-* ').strip()
                    if cleaned and '?' in cleaned:
                        questions.append(cleaned)
                elif '?' in line:
                    # Bare line that already looks like a question.
                    questions.append(line)

        logger.info(f"Fallback parsing extracted {len(questions)} questions")
        return questions

    async def enrich_with_ai(
        self,
        domain_id: str,
        num_questions: int = 10,
        focus_areas: Optional[List[str]] = None,
        progress_callback: Optional[Callable] = None
    ) -> Dict[str, Any]:
        """
        Enrich the knowledge base using AI.

        Args:
            domain_id: domain ID
            num_questions: number of questions to generate
            focus_areas: focus areas
            progress_callback: progress callback, async def(state)

        Returns:
            Result dict; on success includes "questions_generated",
            "tiles_created" and "domain_id".
        """
        self.enrichment_state.update({
            "is_running": True,
            "progress": 0.0,
            "current_question": 0,
            "total_questions": num_questions,
            "generated_tiles": 0,
            # NOTE(review): datetime.utcnow() is deprecated since Python 3.12;
            # kept as-is because consumers may depend on the naive ISO format.
            "start_time": datetime.utcnow().isoformat(),
            "domain_id": domain_id
        })

        try:
            # Step 1: have DeepSeek produce the question set.
            logger.info(f"Step 1: Generating {num_questions} questions...")
            questions = await self.generate_enrichment_questions(
                domain_id, num_questions, focus_areas
            )

            if not questions:
                return {
                    "success": False,
                    "error": "Failed to generate questions"
                }

            self.enrichment_state["total_questions"] = len(questions)

            # Hoisted out of the loop: the import was previously re-executed
            # for every question. An import failure now aborts the batch with
            # an explicit error instead of silently skipping every question.
            from null_ai.iath_writer import create_tile_from_ai_output

            # Coordinate estimation delegates its LLM call back to DeepSeek.
            # Defined once: the closure carries no per-question state.
            async def coord_inference(coord_prompt):
                return await self.prompt_generator(coord_prompt)

            tiles_created = 0

            for i, question in enumerate(questions):
                # Fix: honor stop_enrichment(). The flag was previously set
                # but never checked, so "stop" had no effect on a running job.
                if not self.enrichment_state["is_running"]:
                    logger.info("Enrichment stopped by request")
                    break

                logger.info(f"Processing question {i+1}/{len(questions)}: {question[:50]}...")

                self.enrichment_state["current_question"] = i + 1
                self.enrichment_state["progress"] = (i / len(questions)) * 100

                if progress_callback:
                    await progress_callback(self.enrichment_state)

                try:
                    # Step 2: mentor model answers the question.
                    answer_result = await self.answer_generator(question)

                    if not answer_result or "response" not in answer_result:
                        logger.warning(f"No response for question: {question}")
                        continue

                    response = answer_result["response"]
                    confidence = answer_result.get("confidence", 0.7)

                    # Step 3: estimate coordinates for the Q/A pair.
                    coord_result = await self.coordinate_estimator.estimate_coordinates(
                        prompt=question,
                        response=response,
                        domain_id=domain_id,
                        llm_inference_func=coord_inference,
                        use_reasoning=False
                    )

                    # Step 4: persist as a knowledge tile.
                    tile = create_tile_from_ai_output(
                        knowledge_id=f"ai_enrich_{uuid.uuid4().hex}",
                        topic=question[:100],
                        prompt=question,
                        response=response,
                        coordinates=coord_result["coordinates"],
                        confidence=confidence,
                        domain_id=domain_id,
                        source="ai_enrichment"
                    )

                    success = self.iath_writer.append_tile(tile)

                    if success:
                        tiles_created += 1
                        self.enrichment_state["generated_tiles"] = tiles_created
                        logger.info(f"Saved enrichment tile {tiles_created}: {question[:50]}...")
                    else:
                        logger.warning(f"Failed to save tile for: {question}")

                except Exception as e:
                    # Best-effort: one bad question must not abort the batch.
                    logger.error(f"Error processing question '{question}': {e}")
                    continue

            self.enrichment_state.update({
                "is_running": False,
                "progress": 100.0,
                "current_question": len(questions)
            })

            if progress_callback:
                await progress_callback(self.enrichment_state)

            return {
                "success": True,
                "questions_generated": len(questions),
                "tiles_created": tiles_created,
                "domain_id": domain_id
            }

        except Exception as e:
            logger.error(f"AI enrichment failed: {e}")
            self.enrichment_state["is_running"] = False
            return {
                "success": False,
                "error": str(e)
            }

    def get_enrichment_status(self) -> Dict[str, Any]:
        """Return a snapshot copy of the current enrichment state."""
        return self.enrichment_state.copy()

    def stop_enrichment(self):
        """Request a cooperative stop; the enrichment loop exits at the next question."""
        self.enrichment_state["is_running"] = False
        logger.info("Enrichment stop requested")
|
|
|
|
class WebEnrichmentEngine:
    """
    Knowledge-base enrichment engine backed by web search.

    Flow:
        1. Run a web search (Brave Search / Google Custom Search)
        2. Extract knowledge from the results
        3. Convert it into Knowledge Tile form
        4. Append it to the .iath store
    """

    def __init__(
        self,
        search_api_key: Optional[str] = None,
        search_engine: str = "brave",
        coordinate_estimator=None,
        iath_writer=None
    ):
        """
        Args:
            search_api_key: API key for the search backend
            search_engine: which search engine to use
            coordinate_estimator: CoordinateEstimator instance
            iath_writer: IathWriter instance
        """
        self.api_key = search_api_key
        self.search_engine = search_engine
        self.coordinate_estimator = coordinate_estimator
        self.iath_writer = iath_writer

    async def search_web(self, query: str, max_results: int = 5) -> List[Dict[str, str]]:
        """
        Run a web search via the configured backend.

        Args:
            query: search query
            max_results: maximum number of results

        Returns:
            [{"title": "...", "url": "...", "snippet": "..."}, ...]

        Raises:
            ValueError: if the configured search engine is not supported.
        """
        dispatch = {
            "brave": self._search_brave,
            "google": self._search_google,
        }
        backend = dispatch.get(self.search_engine)
        if backend is None:
            raise ValueError(f"Unknown search engine: {self.search_engine}")
        return await backend(query, max_results)

    async def _search_brave(self, query: str, max_results: int) -> List[Dict[str, str]]:
        """Query the Brave Search API; returns [] on any failure."""
        try:
            import aiohttp

            if not self.api_key:
                logger.error("Brave Search API key not configured")
                return []

            endpoint = "https://api.search.brave.com/res/v1/web/search"
            request_headers = {
                "Accept": "application/json",
                "X-Subscription-Token": self.api_key
            }
            query_params = {
                "q": query,
                "count": max_results
            }

            async with aiohttp.ClientSession() as session:
                async with session.get(endpoint, headers=request_headers, params=query_params) as response:
                    if response.status != 200:
                        logger.error(f"Brave Search API error: {response.status}")
                        return []
                    payload = await response.json()
                    # Normalize Brave's result schema into title/url/snippet.
                    return [
                        {
                            "title": entry.get("title", ""),
                            "url": entry.get("url", ""),
                            "snippet": entry.get("description", "")
                        }
                        for entry in payload.get("web", {}).get("results", [])
                    ]

        except Exception as e:
            logger.error(f"Brave Search failed: {e}")
            return []

    async def _search_google(self, query: str, max_results: int) -> List[Dict[str, str]]:
        """Google Custom Search backend (stub — always returns [])."""
        # TODO: implement Google Custom Search support.
        logger.warning("Google Custom Search not implemented yet")
        return []

    async def enrich_from_web(
        self,
        query: str,
        domain_id: str,
        max_results: int = 5
    ) -> Dict[str, Any]:
        """
        Enrich the knowledge base from web search results.

        Args:
            query: search query
            domain_id: domain ID
            max_results: maximum number of results

        Returns:
            Result dict; on success includes "search_results",
            "tiles_created", "query" and "domain_id".
        """
        try:
            hits = await self.search_web(query, max_results)

            if not hits:
                return {
                    "success": False,
                    "error": "No search results found"
                }

            saved = 0

            for hit in hits:
                try:
                    # Collapse title/url/snippet into a single tile body.
                    tile_content = f"""Title: {hit['title']}

Source: {hit['url']}

Summary:
{hit['snippet']}
"""

                    # Fixed placeholder coordinates; the coordinate_estimator
                    # passed to __init__ is not consulted here.
                    coordinates = [0.5, 0.5, 0.5, 0.6, 0.5, 0.7]

                    from null_ai.iath_writer import create_tile_from_ai_output

                    tile = create_tile_from_ai_output(
                        knowledge_id=f"web_{uuid.uuid4().hex}",
                        topic=hit['title'],
                        prompt=query,
                        response=tile_content,
                        coordinates=coordinates,
                        confidence=0.6,
                        domain_id=domain_id,
                        source="web_search"
                    )

                    if self.iath_writer.append_tile(tile):
                        saved += 1
                        logger.info(f"Saved web tile: {hit['title']}")

                except Exception as e:
                    # Best-effort: a bad result must not abort the batch.
                    logger.error(f"Error creating tile from search result: {e}")
                    continue

            return {
                "success": True,
                "search_results": len(hits),
                "tiles_created": saved,
                "query": query,
                "domain_id": domain_id
            }

        except Exception as e:
            logger.error(f"Web enrichment failed: {e}")
            return {
                "success": False,
                "error": str(e)
            }
|