# financial-rag-chatbot/app/metacognitive_agent.py
# Commit f6b05db (unverified) — "Add complete Financial RAG system with Metacognitive Agent"
"""
๋ฉ”ํƒ€์ธ์ง€ ์—์ด์ „ํŠธ (Metacognitive Agent)
์ด ์—์ด์ „ํŠธ๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™์€ ๋ฉ”ํƒ€์ธ์ง€ ์ „๋žต์„ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค:
1. Planning (๊ณ„ํš): ๋‹ต๋ณ€ ์ „๋žต ์ˆ˜๋ฆฝ
2. Monitoring (๊ฐ์‹œ): ๋‹ต๋ณ€ ๊ณผ์ • ๋ชจ๋‹ˆํ„ฐ๋ง
3. Evaluation (ํ‰๊ฐ€): ๋‹ต๋ณ€ ํ’ˆ์งˆ ํ‰๊ฐ€
4. Revision (์ˆ˜์ •): ํ•„์š”์‹œ ๋‹ต๋ณ€ ๊ฐœ์„ 
"""
import json
from typing import List, Dict, Optional

from anthropic import Anthropic, AsyncAnthropic
from loguru import logger
class MetaCognitiveAgent:
    """AI agent with metacognitive abilities.

    Wraps the Anthropic Messages API in a plan -> generate -> monitor ->
    evaluate -> revise loop to improve answer quality for financial/economic
    questions. All prompt and log text is intentionally Korean (user-facing).
    """

    def __init__(self, api_key: str):
        """
        Args:
            api_key: Anthropic API key.
        """
        # Async client: the coroutine methods below would otherwise issue
        # blocking network calls and stall the event loop.
        self.client = AsyncAnthropic(api_key=api_key)
        # Ordered trace of every metacognitive step of the last run.
        self.thinking_history: List[Dict] = []
        self.model = "claude-3-5-sonnet-20241022"

        # Metacognitive prompt templates (runtime strings — kept in Korean).
        # Double braces escape literal JSON braces for str.format().
        self.reflection_prompts = {
            "planning": """
당신은 금융/경제 분야의 전문가입니다. 다음 질문에 답하기 위한 전략을 수립하세요.
질문: {query}
검색된 관련 문서:
{context}
다음 사항을 고려하여 답변 계획을 세우세요:
1. 질문이 요구하는 핵심 정보는 무엇인가?
2. 제공된 문서들이 질문에 답하기에 충분한가?
3. 어떤 정보를 우선적으로 사용해야 하는가?
4. 주의해야 할 점이나 한계는 무엇인가?
계획을 JSON 형식으로 작성하세요:
{{
"key_information": "질문의 핵심 정보",
"context_adequacy": "문서의 충분성 (충분/부족/불확실)",
"strategy": "답변 전략",
"limitations": "주의사항 및 한계"
}}
""",
            "monitoring": """
현재 생성 중인 답변을 검토하세요.
질문: {query}
현재 답변: {response}
다음을 확인하세요:
1. 답변이 질문에 직접적으로 대답하고 있는가?
2. 제공된 문서의 정보를 정확히 사용하고 있는가?
3. 추론이 논리적으로 타당한가?
4. Hallucination(근거 없는 정보)이 포함되어 있지 않은가?
평가를 JSON 형식으로 작성하세요:
{{
"relevance": "질문과의 관련성 (높음/중간/낮음)",
"accuracy": "정확성 (높음/중간/낮음)",
"logic": "논리성 (타당함/보통/문제있음)",
"hallucination_risk": "Hallucination 위험도 (낮음/중간/높음)",
"issues": ["발견된 문제점들"]
}}
""",
            "evaluation": """
최종 답변을 평가하세요.
질문: {query}
답변: {response}
사용된 출처: {sources}
다음 기준으로 평가하세요:
1. 완전성: 질문에 완전히 답했는가?
2. 정확성: 정보가 정확한가?
3. 명확성: 답변이 명확하고 이해하기 쉬운가?
4. 신뢰성: 출처가 명확하고 신뢰할 수 있는가?
평가를 JSON 형식으로 작성하세요:
{{
"completeness": "완전성 점수 (1-10)",
"accuracy": "정확성 점수 (1-10)",
"clarity": "명확성 점수 (1-10)",
"reliability": "신뢰성 점수 (1-10)",
"overall_score": "전체 점수 (1-10)",
"feedback": "개선이 필요한 부분"
}}
""",
            "revision": """
답변을 개선하세요.
원본 답변: {response}
평가 피드백: {feedback}
피드백을 바탕으로 답변을 개선하세요. 특히:
1. 부정확한 정보 수정
2. 불완전한 부분 보완
3. 불명확한 표현 개선
4. 근거 없는 주장 제거
개선된 답변만 제공하세요.
"""
        }

    async def think_and_reflect(
        self,
        query: str,
        context_documents: List[Dict],
        max_iterations: int = 2
    ) -> Dict:
        """Generate an answer through the full metacognitive process.

        Args:
            query: User question.
            context_documents: Retrieved relevant documents.
            max_iterations: Maximum number of evaluate/revise iterations.

        Returns:
            Dict with the query, final answer, full thinking trace, the
            context documents, and the number of revisions performed.
        """
        self.thinking_history = []

        # Flatten retrieved documents into a single prompt context string.
        context_text = self._format_context(context_documents)

        # Step 1: Planning — decide the answering strategy.
        logger.info("1️⃣ Planning: 답변 전략 수립 중...")
        plan = await self._plan(query, context_text)
        self.thinking_history.append({"step": "planning", "content": plan})

        # Step 2: Draft the initial answer from the plan.
        logger.info("2️⃣ Generating: 초기 답변 생성 중...")
        initial_response = await self._generate_response(query, context_text, plan)
        self.thinking_history.append({"step": "initial_response", "content": initial_response})

        # Step 3: Monitoring — self-review the draft.
        logger.info("3️⃣ Monitoring: 답변 검토 중...")
        monitoring_result = await self._monitor(query, initial_response)
        self.thinking_history.append({"step": "monitoring", "content": monitoring_result})

        current_response = initial_response

        # Source names may live either at the top level of a document dict or
        # inside its 'metadata' — accept both (same lookup as _format_context).
        sources = [
            doc.get('source_filename')
            or doc.get('metadata', {}).get('source_filename', 'unknown')
            for doc in context_documents
        ]

        # Step 4: Iterative evaluate/revise loop.
        for iteration in range(max_iterations):
            # Evaluation
            logger.info(f"4️⃣ Evaluation [{iteration + 1}/{max_iterations}]: 답변 평가 중...")
            evaluation = await self._evaluate(query, current_response, sources)
            self.thinking_history.append({"step": f"evaluation_{iteration}", "content": evaluation})

            # Stop early once the model rates its own answer highly enough.
            overall_score = self._extract_overall_score(evaluation)
            if overall_score is not None and overall_score >= 8.0:
                logger.info(f"✅ 충분한 품질 달성 (점수: {overall_score}/10)")
                break

            # Revision
            logger.info(f"5️⃣ Revision [{iteration + 1}/{max_iterations}]: 답변 개선 중...")
            current_response = await self._revise(current_response, evaluation)
            self.thinking_history.append({"step": f"revision_{iteration}", "content": current_response})

        return {
            "query": query,
            "final_response": current_response,
            "thinking_history": self.thinking_history,
            "context_documents": context_documents,
            "iterations": len([h for h in self.thinking_history if "revision" in h["step"]])
        }

    @staticmethod
    def _extract_overall_score(evaluation: str) -> Optional[float]:
        """Parse the model's evaluation text and return its overall_score.

        The model often wraps its JSON in prose, so the outermost {...} span
        is extracted before parsing. Returns None when no score can be read.
        """
        start = evaluation.find('{')
        end = evaluation.rfind('}')
        if start == -1 or end <= start:
            return None
        try:
            data = json.loads(evaluation[start:end + 1])
            return float(data.get('overall_score', 0))
        except (json.JSONDecodeError, ValueError, TypeError):
            # Malformed JSON or a non-numeric score — treat as "no score".
            return None

    async def _plan(self, query: str, context: str) -> str:
        """Draft an answering strategy (Planning step)."""
        prompt = self.reflection_prompts["planning"].format(
            query=query,
            context=context
        )
        message = await self.client.messages.create(
            model=self.model,
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}]
        )
        return message.content[0].text

    async def _generate_response(self, query: str, context: str, plan: str) -> str:
        """Generate the initial answer from the plan and context."""
        prompt = f"""
당신은 금융/경제 분야의 전문가입니다.
답변 계획:
{plan}
질문: {query}
참고 문서:
{context}
위 계획을 바탕으로 질문에 답변하세요. 반드시:
1. 제공된 문서의 정보만 사용하세요
2. 확실하지 않은 정보는 추측하지 마세요
3. 출처를 명확히 밝히세요
4. 한국어로 답변하세요
"""
        message = await self.client.messages.create(
            model=self.model,
            max_tokens=2048,
            messages=[{"role": "user", "content": prompt}]
        )
        return message.content[0].text

    async def _monitor(self, query: str, response: str) -> str:
        """Self-review the draft answer (Monitoring step)."""
        prompt = self.reflection_prompts["monitoring"].format(
            query=query,
            response=response
        )
        message = await self.client.messages.create(
            model=self.model,
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}]
        )
        return message.content[0].text

    async def _evaluate(self, query: str, response: str, sources: List[str]) -> str:
        """Score the answer against completeness/accuracy/clarity/reliability."""
        prompt = self.reflection_prompts["evaluation"].format(
            query=query,
            response=response,
            sources=", ".join(sources)
        )
        message = await self.client.messages.create(
            model=self.model,
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}]
        )
        return message.content[0].text

    async def _revise(self, response: str, feedback: str) -> str:
        """Improve the answer based on evaluation feedback (Revision step)."""
        prompt = self.reflection_prompts["revision"].format(
            response=response,
            feedback=feedback
        )
        message = await self.client.messages.create(
            model=self.model,
            max_tokens=2048,
            messages=[{"role": "user", "content": prompt}]
        )
        return message.content[0].text

    def _format_context(self, documents: List[Dict]) -> str:
        """Format retrieved documents into a numbered context string.

        Each document contributes its text (from 'text' or 'document') and
        its source filename (from metadata, 'Unknown' when absent).
        """
        formatted = []
        for i, doc in enumerate(documents, 1):
            text = doc.get('text', doc.get('document', ''))
            metadata = doc.get('metadata', {})
            source = metadata.get('source_filename', 'Unknown')
            formatted.append(f"[문서 {i}] {source}\n{text}\n")
        return "\n".join(formatted)