# Source: nullai-knowledge-system / db_enrichment.py
# Uploaded by kofdai ("Upload folder using huggingface_hub"), commit 5af8123 (verified).
# null_ai/db_enrichment.py
"""
Database Enrichment Module

Automatically enriches the knowledge base using AI:
DeepSeek generates questions -> the master model answers them -> the results
are saved to .iath. The module also provides WebEnrichmentEngine, which builds
knowledge tiles from web search results.
"""
import asyncio
import json
import logging
import re
import uuid
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional
# Module-wide logger, named after this module so records show their origin.
logger: logging.Logger = logging.getLogger(name=__name__)
class AIEnrichmentEngine:
    """
    AI-driven knowledge-base enrichment engine.

    Workflow:
    1. DeepSeek (the permanent mentor model) generates enrichment questions.
    2. The master model answers each question.
    3. Knowledge coordinates are estimated automatically.
    4. Each answer is saved as a tile in the .iath store.
    """

    # Leading list markers stripped by the fallback parser: bullets ("-", "*",
    # "•", possibly repeated as in "**") and enumerators ("1.", "12)").
    # "(?!\d)" keeps decimals such as "1.5" intact, and a digit that is not
    # followed by "." or ")" (e.g. "3D printing") is never treated as a marker.
    _LIST_MARKER_RE = re.compile(r"^(?:[-*•]+\s*|\d+[.)](?!\d)\s*)+")

    # A comma directly before "]" or "}" -- invalid in strict JSON, but LLMs
    # frequently emit it.
    _TRAILING_COMMA_RE = re.compile(r",\s*([\]}])")

    # Language tag that may follow an opening ``` fence ("json", "JSON", ...).
    _FENCE_TAG_RE = re.compile(r"[A-Za-z0-9_+-]*")

    # Characters that mark a line as a question (ASCII and full-width).
    _QUESTION_MARKS = ("?", "？")

    def __init__(
        self,
        prompt_generator_func,  # LLM callable used to generate prompts/questions
        answer_generator_func,  # LLM callable used to generate answers
        coordinate_estimator,   # CoordinateEstimator
        iath_writer             # IathWriter
    ):
        """
        Args:
            prompt_generator_func: ``async def(prompt) -> str`` (DeepSeek). Also
                reused as the LLM for coordinate estimation.
            answer_generator_func: ``async def(prompt) -> dict`` (master model).
                The dict must contain ``"response"`` and may contain
                ``"confidence"`` (defaults to 0.7).
            coordinate_estimator: CoordinateEstimator instance.
            iath_writer: IathWriter instance.
        """
        self.prompt_generator = prompt_generator_func
        self.answer_generator = answer_generator_func
        self.coordinate_estimator = coordinate_estimator
        self.iath_writer = iath_writer
        # Shared, mutable progress state; read via get_enrichment_status() and
        # passed as-is to progress callbacks.
        self.enrichment_state = {
            "is_running": False,
            "progress": 0.0,
            "current_question": 0,
            "total_questions": 0,
            "generated_tiles": 0,
            "start_time": None,
            "domain_id": None
        }

    async def generate_enrichment_questions(
        self,
        domain_id: str,
        num_questions: int = 10,
        focus_areas: Optional[List[str]] = None
    ) -> List[str]:
        """
        Generate enrichment questions with DeepSeek.

        Args:
            domain_id: Domain ID.
            num_questions: Maximum number of questions to return. Values <= 0
                return an empty list without calling the LLM.
            focus_areas: Areas to emphasize (e.g. ["fundamental theory",
                "applications", "recent research"]).

        Returns:
            Unique questions in generation order, at most ``num_questions``.
            An empty list on any failure (errors are logged, not raised).
        """
        if num_questions <= 0:
            # A negative value would otherwise make the final slice drop
            # items from the end instead of limiting the count.
            logger.warning(f"num_questions must be positive, got {num_questions}")
            return []

        logger.info(f"Generating {num_questions} enrichment questions for domain '{domain_id}'")

        focus_text = ""
        if focus_areas:
            focus_text = f"\nFocus on these specific areas: {', '.join(focus_areas)}"

        generation_prompt = (
            f"You are an expert knowledge curator in the domain of {domain_id}.\n"
            f"Your task is to generate {num_questions} diverse, high-quality questions "
            "that would enrich a knowledge base in this domain.\n"
            "Guidelines:\n"
            "1. Cover a wide range of topics within the domain\n"
            "2. Include questions at different complexity levels (basic, intermediate, advanced)\n"
            "3. Focus on practical applications and edge cases\n"
            "4. Avoid overly broad or trivial questions\n"
            f"5. Each question should elicit detailed, informative answers{focus_text}\n"
            "Output format:\n"
            "Return ONLY a JSON array of questions, like this:\n"
            '["Question 1?", "Question 2?", ...]\n'
            f"Domain: {domain_id}\n"
            f"Number of questions: {num_questions}\n"
            "Generate the questions now:"
        )

        try:
            logger.info(f"Calling prompt generator with prompt length: {len(generation_prompt)}")
            response = await self.prompt_generator(generation_prompt)
            logger.info(f"Received response from LLM, length: {len(response) if response else 0}")
            if not response:
                logger.error("LLM returned empty response!")
                return []
            logger.debug(f"Response preview (first 200 chars): {response[:200]}")

            questions = self._extract_questions_from_response(response)
            # dict.fromkeys de-duplicates while preserving first-seen order.
            questions = list(dict.fromkeys(questions))
            logger.info(f"Generated {len(questions)} unique questions")
            return questions[:num_questions]
        except Exception as e:
            logger.error(f"Failed to generate enrichment questions: {e}", exc_info=True)
            return []

    def _extract_questions_from_response(self, response: str) -> List[str]:
        """
        Extract a list of questions from an LLM response.

        The JSON array is looked up by ``_extract_json_candidate``. If it cannot
        be parsed (even after removing trailing commas), falls back to
        line-based parsing via ``_fallback_parse``.

        Returns:
            Non-blank question strings, whitespace-stripped. Non-string array
            items are dropped; a JSON value that is not an array yields [].
        """
        if not response or not response.strip():
            logger.error(f"Empty response from LLM. Response: '{response}'")
            return []

        json_str = self._extract_json_candidate(response)
        if not json_str:
            logger.error(f"Could not extract JSON from response. Full response: '{response[:500]}'")
            return self._fallback_parse(response)

        try:
            parsed = json.loads(json_str)
        except json.JSONDecodeError as first_error:
            # Retry once after removing trailing commas ("[..., ]").
            repaired = self._TRAILING_COMMA_RE.sub(r"\1", json_str)
            try:
                parsed = json.loads(repaired)
            except json.JSONDecodeError:
                logger.error(f"JSON parse error: {first_error}. Attempted to parse: '{json_str[:200]}'")
                logger.error(f"Full response (first 500 chars): '{response[:500]}'")
                return self._fallback_parse(response)

        if not isinstance(parsed, list):
            logger.error(f"Expected list, got {type(parsed)}")
            return []
        return [q.strip() for q in parsed if isinstance(q, str) and q.strip()]

    @classmethod
    def _extract_json_candidate(cls, response: str) -> str:
        """
        Return the substring of ``response`` most likely to hold the JSON array.

        Priority: a ```json fence (case-insensitive), then any ``` fence, then
        the span from the first "[" to the last "]", then the whole stripped
        response. A language tag after the opening fence is skipped, and an
        unclosed fence extends to the end of the response.
        """
        fence = re.search(r"```json", response, re.IGNORECASE) or re.search(r"```", response)
        if fence:
            body_start = cls._FENCE_TAG_RE.match(response, fence.start() + 3).end()
            body_end = response.find("```", body_start)
            if body_end == -1:
                # Unclosed fence: slicing to -1 would drop the final character.
                body_end = len(response)
            return response[body_start:body_end].strip()

        start = response.find("[")
        end = response.rfind("]")
        if start != -1 and end != -1:
            return response[start:end + 1]
        return response.strip()

    def _fallback_parse(self, response: str) -> List[str]:
        """
        Fallback used when JSON parsing fails: extract questions line by line.

        A line is kept when, after stripping whitespace and any leading list
        marker ("1.", "2)", "-", "*", "•"), it contains a question mark ("?" or
        the full-width "？"). Lines of 5 characters or fewer are ignored.
        """
        logger.info("Attempting fallback parsing of response...")
        questions = []
        for raw_line in response.splitlines():
            line = raw_line.strip()
            if len(line) <= 5:
                continue
            cleaned = self._LIST_MARKER_RE.sub("", line, count=1).strip()
            if any(mark in cleaned for mark in self._QUESTION_MARKS):
                questions.append(cleaned)
        logger.info(f"Fallback parsing extracted {len(questions)} questions")
        return questions

    async def _process_question(self, question: str, domain_id: str, coord_inference) -> bool:
        """
        Answer one question, estimate its coordinates and append it as a tile.

        Returns:
            True if a tile was written; False if the answer model returned no
            ``"response"`` or the writer reported failure. Other errors propagate.
        """
        answer_result = await self.answer_generator(question)
        if not answer_result or "response" not in answer_result:
            logger.warning(f"No response for question: {question}")
            return False

        response = answer_result["response"]
        confidence = answer_result.get("confidence", 0.7)

        coord_result = await self.coordinate_estimator.estimate_coordinates(
            prompt=question,
            response=response,
            domain_id=domain_id,
            llm_inference_func=coord_inference,
            use_reasoning=False
        )

        # Imported lazily as in the original placement; presumably to avoid an
        # import cycle -- TODO confirm.
        from null_ai.iath_writer import create_tile_from_ai_output
        tile = create_tile_from_ai_output(
            knowledge_id=f"ai_enrich_{uuid.uuid4().hex}",
            topic=question[:100],
            prompt=question,
            response=response,
            coordinates=coord_result["coordinates"],
            confidence=confidence,
            domain_id=domain_id,
            source="ai_enrichment"
        )

        if self.iath_writer.append_tile(tile):
            return True
        logger.warning(f"Failed to save tile for: {question}")
        return False

    async def enrich_with_ai(
        self,
        domain_id: str,
        num_questions: int = 10,
        focus_areas: Optional[List[str]] = None,
        progress_callback: Optional[Callable] = None
    ) -> Dict[str, Any]:
        """
        Enrich the knowledge base using AI-generated questions and answers.

        Args:
            domain_id: Domain ID.
            num_questions: Number of questions to generate.
            focus_areas: Areas to emphasize.
            progress_callback: ``async def(state)``; awaited before each question
                and once at the end, with the shared state dict.

        Returns:
            On success: ``{"success": True, "questions_generated", "tiles_created",
            "domain_id", "stopped"}``, where ``stopped`` is True when
            ``stop_enrichment()`` interrupted the run. On failure:
            ``{"success": False, "error": <message>}``.

        Errors on individual questions are logged and skipped. ``is_running``
        is always reset to False when this method returns.
        """
        state = self.enrichment_state
        state.update({
            "is_running": True,
            "progress": 0.0,
            "current_question": 0,
            "total_questions": num_questions,
            "generated_tiles": 0,
            "start_time": datetime.utcnow().isoformat(),
            "domain_id": domain_id
        })

        # Coordinate estimation reuses the prompt-generator LLM (DeepSeek).
        async def coord_inference(coord_prompt):
            return await self.prompt_generator(coord_prompt)

        try:
            # Step 1: question generation
            logger.info(f"Step 1: Generating {num_questions} questions...")
            questions = await self.generate_enrichment_questions(
                domain_id, num_questions, focus_areas
            )
            if not questions:
                return {
                    "success": False,
                    "error": "Failed to generate questions"
                }

            total = len(questions)
            state["total_questions"] = total

            # Step 2: answer + save each question
            tiles_created = 0
            stopped = False
            for i, question in enumerate(questions):
                # stop_enrichment() clears this flag; honour it between questions.
                if not state["is_running"]:
                    logger.info(f"Enrichment stopped after {i}/{total} questions")
                    stopped = True
                    break

                logger.info(f"Processing question {i+1}/{total}: {question[:50]}...")
                state["current_question"] = i + 1
                state["progress"] = (i / total) * 100
                if progress_callback:
                    await progress_callback(state)

                try:
                    saved = await self._process_question(question, domain_id, coord_inference)
                except Exception as e:
                    logger.error(f"Error processing question '{question}': {e}", exc_info=True)
                    continue

                if saved:
                    tiles_created += 1
                    state["generated_tiles"] = tiles_created
                    logger.info(f"Saved enrichment tile {tiles_created}: {question[:50]}...")

            # Completion (a stopped run keeps its partial progress values)
            if not stopped:
                state.update({
                    "progress": 100.0,
                    "current_question": total
                })
            state["is_running"] = False
            if progress_callback:
                await progress_callback(state)

            return {
                "success": True,
                "questions_generated": total,
                "tiles_created": tiles_created,
                "domain_id": domain_id,
                "stopped": stopped
            }
        except Exception as e:
            logger.error(f"AI enrichment failed: {e}", exc_info=True)
            return {
                "success": False,
                "error": str(e)
            }
        finally:
            # Reset on every exit path, including the early "no questions" return.
            state["is_running"] = False

    def get_enrichment_status(self) -> Dict[str, Any]:
        """Return a shallow copy of the current enrichment state."""
        return self.enrichment_state.copy()

    def stop_enrichment(self):
        """
        Request that a running ``enrich_with_ai`` stop.

        The flag is checked before each question, so the question currently
        being processed finishes first.
        """
        self.enrichment_state["is_running"] = False
        logger.info("Enrichment stop requested")
class WebEnrichmentEngine:
    """
    Knowledge-base enrichment engine driven by web search.

    Workflow:
    1. Run a web search (Brave Search; Google Custom Search is not implemented yet).
    2. Turn each result's title, URL and snippet into knowledge text.
    3. Convert that text into a Knowledge Tile.
    4. Append the tile to the .iath store.
    """

    # Brave Web Search returns at most 20 results per request ("count").
    BRAVE_MAX_COUNT = 20
    BRAVE_ENDPOINT = "https://api.search.brave.com/res/v1/web/search"

    # Placeholder coordinates for web-derived tiles; no coordinate estimation
    # is done for web results yet.
    # TODO: more advanced coordinate estimation.
    DEFAULT_WEB_COORDINATES = (0.5, 0.5, 0.5, 0.6, 0.5, 0.7)
    # Web search results are given medium confidence.
    WEB_CONFIDENCE = 0.6

    def __init__(
        self,
        search_api_key: Optional[str] = None,
        search_engine: str = "brave",  # "brave" or "google"
        coordinate_estimator=None,
        iath_writer=None,
        request_timeout: float = 30.0
    ):
        """
        Args:
            search_api_key: API key for the search service.
            search_engine: Search backend to use ("brave" or "google").
            coordinate_estimator: CoordinateEstimator instance (stored; not yet
                used by this class).
            iath_writer: IathWriter instance; required by ``enrich_from_web``.
            request_timeout: Total timeout in seconds for one search HTTP
                request (aiohttp's own default is 5 minutes).
        """
        self.api_key = search_api_key
        self.search_engine = search_engine
        self.coordinate_estimator = coordinate_estimator
        self.iath_writer = iath_writer
        self.request_timeout = request_timeout

    async def search_web(self, query: str, max_results: int = 5) -> List[Dict[str, str]]:
        """
        Run a web search with the configured backend.

        Args:
            query: Search query.
            max_results: Maximum number of results.

        Returns:
            ``[{"title": "...", "url": "...", "snippet": "..."}, ...]``

        Raises:
            ValueError: if ``search_engine`` is neither "brave" nor "google".
        """
        if self.search_engine == "brave":
            return await self._search_brave(query, max_results)
        elif self.search_engine == "google":
            return await self._search_google(query, max_results)
        else:
            raise ValueError(f"Unknown search engine: {self.search_engine}")

    async def _search_brave(self, query: str, max_results: int) -> List[Dict[str, str]]:
        """
        Search with the Brave Search API.

        Returns at most ``min(max_results, BRAVE_MAX_COUNT)`` results. Returns an
        empty list (after logging where relevant) when ``max_results`` is not
        positive, the API key is missing, the API answers with a non-200
        status, or any error occurs (timeout, aiohttp not installed, ...).
        """
        if max_results <= 0:
            return []
        if not self.api_key:
            logger.error("Brave Search API key not configured")
            return []
        count = min(max_results, self.BRAVE_MAX_COUNT)

        try:
            import aiohttp  # optional dependency, only needed for Brave

            headers = {
                "Accept": "application/json",
                "X-Subscription-Token": self.api_key
            }
            params = {
                "q": query,
                "count": count
            }
            timeout = aiohttp.ClientTimeout(total=self.request_timeout)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(self.BRAVE_ENDPOINT, headers=headers, params=params) as response:
                    if response.status != 200:
                        logger.error(f"Brave Search API error: {response.status}")
                        return []
                    data = await response.json()

            items = data.get("web", {}).get("results", [])
            # Cap defensively at the requested count.
            return [
                {
                    "title": item.get("title", ""),
                    "url": item.get("url", ""),
                    "snippet": item.get("description", "")
                }
                for item in items[:count]
            ]
        except Exception as e:
            logger.error(f"Brave Search failed: {e}")
            return []

    async def _search_google(self, query: str, max_results: int) -> List[Dict[str, str]]:
        """Search with the Google Custom Search API (not implemented; returns [])."""
        # TODO: implement Google Custom Search API
        logger.warning("Google Custom Search not implemented yet")
        return []

    def _save_result_as_tile(self, result: Dict[str, str], query: str, domain_id: str) -> bool:
        """
        Convert one search result into a Knowledge Tile and append it to .iath.

        Returns:
            The truthiness of ``iath_writer.append_tile``. Errors propagate.
        """
        tile_content = (
            f"Title: {result['title']}\n"
            f"Source: {result['url']}\n"
            "Summary:\n"
            f"{result['snippet']}\n"
        )

        # Imported lazily as in the original; presumably to avoid an import
        # cycle -- TODO confirm.
        from null_ai.iath_writer import create_tile_from_ai_output
        tile = create_tile_from_ai_output(
            knowledge_id=f"web_{uuid.uuid4().hex}",
            topic=result['title'],
            prompt=query,
            response=tile_content,
            # Fresh list per tile so callers never share one mutable object.
            coordinates=list(self.DEFAULT_WEB_COORDINATES),
            confidence=self.WEB_CONFIDENCE,
            domain_id=domain_id,
            source="web_search"
        )
        return bool(self.iath_writer.append_tile(tile))

    async def enrich_from_web(
        self,
        query: str,
        domain_id: str,
        max_results: int = 5
    ) -> Dict[str, Any]:
        """
        Enrich the knowledge base from web search results.

        Args:
            query: Search query.
            domain_id: Domain ID.
            max_results: Maximum number of results.

        Returns:
            On success ``{"success": True, "search_results", "tiles_created",
            "query", "domain_id"}``; otherwise ``{"success": False, "error"}``.
            Failures on individual results are logged and skipped.
        """
        if self.iath_writer is None:
            # Without a writer every result would fail individually while the
            # run still reported success.
            logger.error("Web enrichment requested without an iath_writer")
            return {
                "success": False,
                "error": "iath_writer is not configured"
            }

        try:
            search_results = await self.search_web(query, max_results)
            if not search_results:
                return {
                    "success": False,
                    "error": "No search results found"
                }

            tiles_created = 0
            for result in search_results:
                try:
                    saved = self._save_result_as_tile(result, query, domain_id)
                except Exception as e:
                    logger.error(f"Error creating tile from search result: {e}", exc_info=True)
                    continue
                if saved:
                    tiles_created += 1
                    logger.info(f"Saved web tile: {result['title']}")
                else:
                    logger.warning(f"Failed to save web tile: {result['title']}")

            return {
                "success": True,
                "search_results": len(search_results),
                "tiles_created": tiles_created,
                "query": query,
                "domain_id": domain_id
            }
        except Exception as e:
            logger.error(f"Web enrichment failed: {e}", exc_info=True)
            return {
                "success": False,
                "error": str(e)
            }