nullai-deepseek-r1-32b / src /judge_correction_flow.py
kofdai's picture
Upload src/judge_correction_flow.py with huggingface_hub
deb3a5d verified
raw
history blame
5.84 kB
from datetime import datetime
class JudgeCorrectionFlow:
"""
β-Lobeの検証結果に基づき、回答を「承認」「自動修正」「再生成」の
いずれのアクションに振り分け、実行を制御します。
"""
def __init__(self, alpha_lobe, beta_lobe):
self.alpha_lobe = alpha_lobe
self.beta_lobe = beta_lobe
def _summarize_and_decide_action(self, validation_result: dict) -> str:
"""検証結果の深刻度に基づき、次のアクションを決定する。"""
severity = validation_result.get("severity", "none")
if severity == "critical":
return "regenerate"
if severity == "moderate":
# 中程度の問題が1つでもあれば再生成を試みる(より安全な方針)
return "regenerate"
# 軽微な問題や問題なしの場合は承認
return "approve"
def _construct_regeneration_feedback(self, validation_result: dict) -> str:
"""再生成を指示するためのフィードバック文を構築する。"""
feedback_parts = []
for check_name, check_result in validation_result.get("checks", {}).items():
issues = check_result.get("contradictions", []) + check_result.get("logical_errors", []) + check_result.get("issues", [])
for issue in issues[:2]: # 各カテゴリから最大2件
issue_type = issue.get("type", "issue")
detail = issue.get("fact", issue.get("message", "詳細不明"))
feedback_parts.append(f"✗ {issue_type}: {detail}")
return "\n".join(feedback_parts)
async def _auto_correct_response(self, original_response: str, validation_result: dict) -> str:
"""軽微な問題を自動修正する。"""
corrected_response = original_response
recommendations = validation_result.get("recommendations", [])
for rec in recommendations:
if rec['type'] == 'fact_correction':
if rec['current_statement'] in corrected_response:
corrected_response = corrected_response.replace(rec['current_statement'], rec['correct_statement'])
return corrected_response
async def process_and_correct(self, question: str, db_context: dict, session_context=None, web_results=None, max_regenerations: int = 1, domain_id: str = "medical"):
"""
質問を処理し、生成、検証、修正/再生成の完全なフローを実行する。
ドメイン対応版。
"""
regeneration_count = 0
print(f" -> JudgeCorrectionFlow開始 (domain={domain_id})")
# α-Lobeで初回回答生成(ドメイン対応)
alpha_response = await self.alpha_lobe.generate_response(
question, db_context, session_context, domain_id=domain_id
)
while regeneration_count <= max_regenerations:
# β-Lobeで検証(ドメイン対応)
validation = await self.beta_lobe.validate_response(
question, alpha_response, db_context, web_results, session_context, domain=domain_id
)
# アクションを決定
action = self._summarize_and_decide_action(validation)
print(f" -> 検証結果: severity={validation.get('severity')}, action={action}")
if action == "approve":
return {
"status": "approved",
"response": alpha_response["main_response"],
"structured": alpha_response.get("structured", {}),
"confidence": alpha_response.get("confidence", 0.0),
"validation": validation,
"domain": domain_id
}
if action == "auto_correct":
corrected_text = await self._auto_correct_response(alpha_response["main_response"], validation)
alpha_response["main_response"] = corrected_text
second_validation = await self.beta_lobe.validate_response(
question, alpha_response, db_context, web_results, session_context, domain=domain_id
)
return {
"status": "corrected",
"response": corrected_text,
"original_validation": validation,
"final_validation": second_validation,
"domain": domain_id
}
if action == "regenerate":
if regeneration_count < max_regenerations:
regeneration_count += 1
print(f" -> 再生成試行 {regeneration_count}/{max_regenerations}")
feedback = self._construct_regeneration_feedback(validation)
regeneration_prompt = f"前回の回答に以下の問題がありました:\n{feedback}\n\n元の質問: {question}\n\nこれらの点を修正して、再度回答してください。"
# α-Lobeにフィードバックを与えて再生成(ドメイン対応)
alpha_response = await self.alpha_lobe.generate_response(
regeneration_prompt, db_context, session_context, domain_id=domain_id
)
continue
else:
return {
"status": "unable_to_answer",
"reason": "再生成の上限に達しましたが、問題が解決しませんでした。",
"final_validation": validation,
"domain": domain_id
}
return {"status": "error", "message": "予期せぬエラーが発生しました。", "domain": domain_id}