grantforge-api / backend /agents /panel_nodes.py
GrantForge Bot
Deploy to Hugging Face
2754b82
from typing import Dict, Any
from langchain_core.messages import HumanMessage, ToolMessage, SystemMessage, AIMessage
from core.llm_router import get_llm
from core.search.regulation_engine import regulation_engine, kruczkowski_trap_agent
from core.trust.trust_scorer import compute_grant_trust_score
from agents.auditor import (
GlobalAuditOutput,
_PerspectiveResult,
_ROLE_PROMPTS,
_SHARED_INSTRUCTIONS,
)
from agents.tools.legal_retriever_tool import search_legal_documents
from agents.tools.krs_graph_tool import analyze_company_network
from agents.tools.neo4j_cypher_tool import query_neo4j_graph
from agents.tools.budget_rules_tool import search_budget_rules
from agents.tools.technology_retriever_tool import search_technology_trends
from agents.panel_state import AuditorPanelState
import logging
logger = logging.getLogger(__name__)
# --- PRAWNIK NODE (Dynamic Query Routing) ---
def prawnik_node(state: AuditorPanelState) -> Dict[str, Any]:
"""Agent Prawny z obsługą poszukiwań w RAG oraz RAG Grafowym (KRS)."""
llm_with_tools = get_llm(
task_type="legal_audit",
tools=[search_legal_documents, analyze_company_network, query_neo4j_graph],
)
# Inicjalizacja wiadomości, jeśli pierwsze wywołanie
messages = state.get("messages", [])
initial_messages_added = []
if not messages:
ext_prompt = (
"Zewnętrzny Rewizor: Weryfikujesz cudzy, gotowy wniosek (z biura konsultingowego) przesłany do nas w celu tzw. Reverse-Audit. Nastaw się na bezlitosną weryfikację błędów."
if state.get("is_external_audit", False)
else ""
)
sys_prompt = f"{_ROLE_PROMPTS['prawnik']}\n{ext_prompt}\n{_SHARED_INSTRUCTIONS}\n\nProgram: {state['program_name']}\nZanim ocenisz, zawsze skorzystaj z narzędzia search_legal_documents, aby sprawdzić wymogi dla perspektywy {state['program_name']}. Jeśli nie znajdziesz nic lub perspektywa nie będzie się zgadzać, PONÓW WYSZUKIWANIE z innym zapytaniem. Jak jesteś gotowy wydać ocenę wywołaj narzędzie submit_evaluation, NIE generuj go jako plain text."
initial_messages_added.append(SystemMessage(content=sys_prompt))
initial_messages_added.append(
HumanMessage(content=f"TREŚĆ WNIOSKU:\n{state['content'][:150000]}")
)
messages = initial_messages_added
# Wywołanie modelu
try:
response = llm_with_tools.invoke(messages)
except Exception as e:
logger.error(f"[PRAWNIK] Błąd wywołania modelu: {e}")
response = AIMessage(
content=f"Wystąpił błąd podczas wywołania LLM: {e}. Przechodzę do podsumowania."
)
return {
"messages": initial_messages_added + [response],
"legal_attempts": state.get("legal_attempts", 0),
}
def prawnik_tools_node(state: AuditorPanelState) -> Dict[str, Any]:
"""Uruchamia narzędzie wyszukiwania dla Prawnika."""
last_message = state["messages"][-1]
tool_messages = []
for tool_call in last_message.tool_calls:
if tool_call["name"] in [
"search_legal_documents",
"analyze_company_network",
"query_neo4j_graph",
]:
logger.info(
f"[PRAWNIK] Wykorzystanie narzędzia {tool_call['name']}: {tool_call['args']}"
)
# Bezpieczne wykonanie narzędzia
try:
if tool_call["name"] == "search_legal_documents":
result = search_legal_documents.invoke(tool_call["args"])
elif tool_call["name"] == "analyze_company_network":
result = analyze_company_network.invoke(tool_call["args"])
else:
result = query_neo4j_graph.invoke(tool_call["args"])
except Exception as e:
result = f"Błąd wykonania narzędzia: {e}"
tool_messages.append(
ToolMessage(content=result, tool_call_id=tool_call["id"])
)
return {
"messages": tool_messages,
"legal_attempts": state.get("legal_attempts", 0) + 1,
"legal_queries": [str(tc["args"]) for tc in last_message.tool_calls],
}
def prawnik_evaluator_node(state: AuditorPanelState) -> Dict[str, Any]:
"""Generuje ostateczny Pydantic output Prawnika po zebraniu wiedzy z RAG."""
# Ekstrakcja do schematu
llm = get_llm(task_type="legal_audit", structured_output_schema=_PerspectiveResult)
# Przebieg całej konwersacji prawnika
conversation_text = "\n".join(
[m.content for m in state["messages"] if isinstance(m.content, str)]
)
# Aktywne użycie silnika w Prawniku (Faza 3) - nie tylko kontekst, ale weryfikacja
engine_rules = ""
engine_compliance_check = ""
trust_context = ""
try:
rules = regulation_engine.get_structured_rules_for_program(state['program_name'] or "")
if rules:
engine_rules = "\n\n--- STRUCTURED RULES FROM REGULATION ENGINE (traktuj jako źródło prawdy) ---\n"
engine_rules += "\n".join(rules.get("key_rules", [])[:7])
# Aktywna weryfikacja + wpływ na wynik (Faza 3)
compliance = regulation_engine.check_cost_eligibility(state['program_name'] or "", state['content'][:3000])
if compliance.get("status") == "evaluated":
engine_compliance_check = f"\n\n--- REGULATION ENGINE COMPLIANCE CHECK ---\n{compliance}"
if compliance.get("severity") == "critical":
engine_compliance_check += "\n[CRITICAL ISSUE - RECOMMEND BLOCKING EXPORT OR HUMAN REVIEW]"
# Dodajemy issue bezpośrednio do wyniku (zostanie scalone w ewaluatorze)
# v5.0: Kruczkowski Compliance & Trap + Citation Verifier integration (Faza 2/3)
try:
if kruczkowski_trap_agent:
trap_result = kruczkowski_trap_agent.detect_traps(
document_text=state['content'][:4500],
program=state['program_name'] or "",
msp_context=state.get("msp_analysis") or state.get("external_context", {}).get("msp_analysis")
)
if trap_result.get("overall_trap_risk") in ("high", "critical"):
engine_compliance_check += f"\n\n--- KRUCZKOWSKI COMPLIANCE & TRAP v5.0 ---\nRisk: {trap_result['overall_trap_risk']} | Traps: {trap_result['num_traps']} | Blocks export: {trap_result.get('blocks_export_recommendation')}\nCitation score: {trap_result.get('citation_verification', {}).get('overall_citation_score')}\n[STRICT MODE: Zalecane dodatkowe sprawdzenie przez człowieka]"
# Zawsze wstrzykujemy citation grounding info
cit = trap_result.get("citation_verification", {})
if cit.get("overall_citation_score"):
engine_compliance_check += f"\n[CITATION GROUNDING: {cit.get('overall_citation_score')} ({cit.get('citation_quality')}) — {cit.get('recommendation', '')[:120]}]"
except Exception as _trap_e:
logger.debug(f"[Prawnik v5.0] Trap agent skipped: {_trap_e}")
# Trust Score injection (Cycle 10)
trust_score = compute_grant_trust_score({"program": state.get('program_name')})
trust_context = f"\n\n[Trust Score dla programu: {trust_score}/100 — im niższy, tym ostrożniej podchodzić do wniosków i rekomendacji]"
except Exception:
pass
prompt = f"""
Na podstawie zebranych dotychczas informacji i analizy (patrz historia, upewnij się, że opierasz się na zweryfikowanym prawie z narzędzia):
{conversation_text}
{engine_rules}
{engine_compliance_check}
{trust_context}
Wygeneruj ostateczny wynik audytu prawnego dla wniosku ({state['program_name']}) wg struktury.
Oceń projekt. Role: prawnik.
TREŚĆ:
{state['content'][:150000]}
"""
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=2, max=10),
reraise=True,
)
def invoke_eval():
result: _PerspectiveResult = llm.invoke(prompt)
if not result.summary or len(result.summary.strip()) < 10:
raise ValueError("Błąd sanity check: Puste podsumowanie audytu prawnego.")
for issue in result.issues:
issue.perspective = "prawnik"
# Jeśli silnik wykrył critical problem — obniżamy score
if "CRITICAL ISSUE" in engine_compliance_check:
result.partial_score = max(0, result.partial_score - 30)
return {
"issues": result.issues,
"perspectives_summary": {"prawnik": result.summary},
"perspective_scores": [result.partial_score],
"prawnik_done": True,
}
try:
return invoke_eval()
except Exception as e:
logger.error(f"[PRAWNIK] Ostateczny błąd ewaluatora: {e}")
return {
"prawnik_done": True,
"perspectives_summary": {
"prawnik": f"Błąd audytu prawnego po 5 próbach: {e}"
},
}
def prawnik_routing(state: AuditorPanelState) -> str:
"""Decyduje czy prawnik musi szukać dalej, oceniać czy przekroczył limit."""
last_message = state["messages"][-1]
if last_message.tool_calls:
if state["legal_attempts"] >= 3:
logger.warning(
"[PRAWNIK] Przekroczono limit wyszukiwań, wymuszam ewaluację."
)
return "evaluate"
return "tools"
return "evaluate"
# --- FINANSISTA NODE (Dynamic Query Routing) ---
def finansista_node(state: AuditorPanelState) -> Dict[str, Any]:
"""Agent Finansowy z obsługą poszukiwań w RAG (regulaminy finansowe)."""
llm_with_tools = get_llm(
task_type="legal_audit",
tools=[search_budget_rules, analyze_company_network, query_neo4j_graph],
)
# Inicjalizacja wiadomości, jeśli pierwsze wywołanie
messages = state.get("finansista_messages", [])
initial_messages_added = []
if not messages:
ext_prompt = (
"Zewnętrzny Rewizor: Weryfikujesz cudzy, gotowy wniosek (z biura konsultingowego) przesłany do nas w celu tzw. Reverse-Audit."
if state.get("is_external_audit", False)
else ""
)
sys_prompt = f"{_ROLE_PROMPTS['finansista']}\n{ext_prompt}\n{_SHARED_INSTRUCTIONS}\n\nProgram: {state['program_name']}\nZanim ocenisz wniosek, używaj narzędzia search_budget_rules aby sprawdzić zasady z budżetu programu. Aby zweryfikować MŚP z perspektywy finansowej na podstawie NIP/KRS uzyj analyze_company_network. Gdy będziesz gotowy zwrócić ocenę bez korzystania z narzędzia, powróć i wykonaj finalną ocenę strukturyzowaną."
initial_messages_added.append(SystemMessage(content=sys_prompt))
initial_messages_added.append(
HumanMessage(content=f"TREŚĆ WNIOSKU:\n{state['content'][:150000]}")
)
messages = initial_messages_added
try:
response = llm_with_tools.invoke(messages)
except Exception as e:
logger.error(f"[FINANSISTA] Błąd wywołania modelu: {e}")
response = AIMessage(
content=f"Wystąpił błąd podczas wywołania LLM: {e}. Przechodzę do podsumowania."
)
return {
"finansista_messages": initial_messages_added + [response],
"finansista_attempts": state.get("finansista_attempts", 0),
}
def finansista_tools_node(state: AuditorPanelState) -> Dict[str, Any]:
"""Uruchamia narzędzie wyszukiwania dla Finansisty."""
last_message = state["finansista_messages"][-1]
tool_messages = []
for tool_call in last_message.tool_calls:
if tool_call["name"] in [
"search_budget_rules",
"analyze_company_network",
"query_neo4j_graph",
]:
logger.info(
f"[FINANSISTA] Wykorzystanie narzędzia {tool_call['name']}: {tool_call['args']}"
)
try:
if tool_call["name"] == "search_budget_rules":
result = search_budget_rules.invoke(tool_call["args"])
elif tool_call["name"] == "analyze_company_network":
result = analyze_company_network.invoke(tool_call["args"])
else:
result = query_neo4j_graph.invoke(tool_call["args"])
except Exception as e:
result = f"Błąd wykonania narzędzia: {e}"
tool_messages.append(
ToolMessage(content=result, tool_call_id=tool_call["id"])
)
return {
"finansista_messages": tool_messages,
"finansista_attempts": state.get("finansista_attempts", 0) + 1,
"finansista_queries": [str(tc["args"]) for tc in last_message.tool_calls],
}
def finansista_evaluator_node(state: AuditorPanelState) -> Dict[str, Any]:
"""Generuje ostateczny Pydantic output Finansisty po zebraniu wiedzy z RAG."""
llm = get_llm(task_type="legal_audit", structured_output_schema=_PerspectiveResult)
conversation_text = "\n".join(
[
m.content
for m in state.get("finansista_messages", [])
if isinstance(m.content, str)
]
)
# Aktywne użycie RegulationEngine przy analizie budżetowej (Faza 3)
engine_context = ""
trust_context = ""
try:
eligibility = regulation_engine.check_cost_eligibility(state.get('program_name') or "", state.get('content', '')[:2000])
if eligibility.get("status") == "evaluated":
engine_context = f"\n\nREGULATION ENGINE BUDGET CHECK (źródło prawdy):\n eligible={eligibility.get('eligible')} severity={eligibility.get('severity')}\n justification: {eligibility.get('justification','')}\n reference: {eligibility.get('regulation_reference','')}\n rec: {eligibility.get('recommendation','')}\n"
# v5.0 Citation + Kruczkowski for finansista (budget traps)
try:
if kruczkowski_trap_agent:
t = kruczkowski_trap_agent.detect_traps(state.get('content', '')[:3000], state.get('program_name') or "")
if t.get("citation_verification"):
c = t["citation_verification"]
engine_context += f"\n[CITATION GROUNDING (fin): {c.get('overall_citation_score')} / {c.get('citation_quality')}]"
if t.get("overall_trap_risk") in ("high", "critical"):
engine_context += f"\n[KRUCZKOWSKI BUDGET TRAPS: {t.get('num_traps')}{t.get('overall_trap_risk')}]"
except Exception:
pass
# Trust Score injection (Cycle 10) — teraz z citation boost z v5.0
trust_score = compute_grant_trust_score({
"program": state.get('program_name'),
"citation_verification_score": (t.get("citation_verification", {}).get("overall_citation_score") if 't' in locals() else None)
})
trust_context = f"\n\n[Trust Score dla programu: {trust_score}/100 — niski score = wyższe ryzyko w rekomendacjach finansowych]"
except Exception:
pass
prompt = f"""
Na podstawie zebranych dotychczas informacji i analizy finansowej/budżetowej (patrz historia):
{conversation_text}
{engine_context}
{trust_context}
Wygeneruj ostateczny wynik audytu finansowego dla wniosku ({state['program_name']}) wg struktury.
Oceń projekt. Role: finansista.
TREŚĆ:
{state['content'][:150000]}
"""
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=2, max=10),
reraise=True,
)
def invoke_eval():
result: _PerspectiveResult = llm.invoke(prompt)
if not result.summary or len(result.summary.strip()) < 10:
raise ValueError(
"Błąd sanity check: Puste podsumowanie audytu finansowego."
)
for issue in result.issues:
issue.perspective = "finansista"
return {
"issues": result.issues,
"perspectives_summary": {"finansista": result.summary},
"perspective_scores": [result.partial_score],
"finansista_done": True,
}
try:
return invoke_eval()
except Exception as e:
logger.error(f"[FINANSISTA] Ostateczny błąd ewaluatora: {e}")
return {
"finansista_done": True,
"perspectives_summary": {
"finansista": f"Błąd audytu finansowego po 5 próbach: {e}"
},
}
def finansista_routing(state: AuditorPanelState) -> str:
"""Decyduje czy finansista musi szukać dalej, czy oceniać."""
last_message = state["finansista_messages"][-1]
if last_message.tool_calls:
if state.get("finansista_attempts", 0) >= 3:
logger.warning(
"[FINANSISTA] Przekroczono limit wyszukiwań, wymuszam ewaluację."
)
return "evaluate"
return "tools"
return "evaluate"
# --- INNOWATOR NODE (Dynamic Query Routing) ---
def innowator_node(state: AuditorPanelState) -> Dict[str, Any]:
"""Agent Technologiczny (Innowator) z obsługą poszukiwań w RAG (trendy, KIS, B+R)."""
llm_with_tools = get_llm(task_type="legal_audit", tools=[search_technology_trends])
messages = state.get("innowator_messages", [])
initial_messages_added = []
if not messages:
ext_prompt = (
"Zewnętrzny Rewizor: Weryfikujesz cudzy, gotowy wniosek (z biura konsultingowego) przesłany do nas w celu tzw. Reverse-Audit."
if state.get("is_external_audit", False)
else ""
)
sys_prompt = f"{_ROLE_PROMPTS['innowator']}\n{ext_prompt}\n{_SHARED_INSTRUCTIONS}\n\nProgram: {state['program_name']}\nZanim dokonasz oceny innowacyjności, użyj narzędzia search_technology_trends, aby zweryfikować czy technologia, poziom TRL lub KIS są poprawne dla tego programu. Kiedy będziesz gotowy zwrócić ocenę, powróć i wykonaj finalną ocenę strukturyzowaną."
initial_messages_added.append(SystemMessage(content=sys_prompt))
initial_messages_added.append(
HumanMessage(content=f"TREŚĆ WNIOSKU:\n{state['content'][:150000]}")
)
messages = initial_messages_added
try:
response = llm_with_tools.invoke(messages)
except Exception as e:
logger.error(f"[INNOWATOR] Błąd wywołania modelu: {e}")
response = AIMessage(
content=f"Wystąpił błąd podczas wywołania LLM: {e}. Przechodzę do podsumowania."
)
return {
"innowator_messages": initial_messages_added + [response],
"innowator_attempts": state.get("innowator_attempts", 0),
}
def innowator_tools_node(state: AuditorPanelState) -> Dict[str, Any]:
"""Uruchamia narzędzie wyszukiwania dla Innowatora."""
last_message = state["innowator_messages"][-1]
tool_messages = []
for tool_call in last_message.tool_calls:
if tool_call["name"] == "search_technology_trends":
logger.info(
f"[INNOWATOR] Wykorzystanie narzędzia {tool_call['name']}: {tool_call['args']}"
)
try:
result = search_technology_trends.invoke(tool_call["args"])
except Exception as e:
result = f"Błąd wykonania narzędzia: {e}"
tool_messages.append(
ToolMessage(content=result, tool_call_id=tool_call["id"])
)
return {
"innowator_messages": tool_messages,
"innowator_attempts": state.get("innowator_attempts", 0) + 1,
"innowator_queries": [str(tc["args"]) for tc in last_message.tool_calls],
}
def innowator_evaluator_node(state: AuditorPanelState) -> Dict[str, Any]:
"""Generuje ostateczny Pydantic output Innowatora po zebraniu wiedzy z RAG."""
llm = get_llm(task_type="legal_audit", structured_output_schema=_PerspectiveResult)
conversation_text = "\n".join(
[
m.content
for m in state.get("innowator_messages", [])
if isinstance(m.content, str)
]
)
# Aktywne użycie RegulationEngine również w perspektywie innowacyjnej (Faza 3)
engine_context = ""
trust_context = ""
try:
rules = regulation_engine.get_structured_rules_for_program(state.get('program_name') or "")
if rules and (rules.get("key_rules") or rules.get("scoring_criteria")):
engine_context = "\n\n--- STRUCTURED PROGRAM RULES (Regulation Engine) — upewnij się, że innowacyjność jest zgodna z celami i kryteriami programu:\n"
engine_context += "KEY RULES: " + "; ".join((rules.get("key_rules") or [])[:4])
engine_context += "\nSCORING: " + "; ".join((rules.get("scoring_criteria") or [])[:3])
# Trust Score injection (Cycle 10)
trust_score = compute_grant_trust_score({"program": state.get('program_name')})
trust_context = f"\n\n[Trust Score dla programu: {trust_score}/100 — niski score sugeruje większą ostrożność przy ocenie innowacyjności]"
except Exception:
pass
prompt = f"""
Na podstawie zebranych dotychczas informacji i analizy innowacyjnej/technologicznej (patrz historia):
{conversation_text}
{engine_context}
{trust_context}
Wygeneruj ostateczny wynik audytu innowacyjnego dla wniosku ({state['program_name']}) wg struktury.
Oceń projekt. Role: innowator.
TREŚĆ:
{state['content'][:150000]}
"""
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=2, max=10),
reraise=True,
)
def invoke_eval():
result: _PerspectiveResult = llm.invoke(prompt)
if not result.summary or len(result.summary.strip()) < 10:
raise ValueError(
"Błąd sanity check: Puste podsumowanie audytu innowacyjnego."
)
for issue in result.issues:
issue.perspective = "innowator"
return {
"issues": result.issues,
"perspectives_summary": {"innowator": result.summary},
"perspective_scores": [result.partial_score],
"innowator_done": True,
}
try:
return invoke_eval()
except Exception as e:
logger.error(f"[INNOWATOR] Ostateczny błąd ewaluatora: {e}")
return {
"innowator_done": True,
"perspectives_summary": {
"innowator": f"Błąd audytu innowacyjnego po 5 próbach: {e}"
},
}
def innowator_routing(state: AuditorPanelState) -> str:
"""Decyduje czy Innowator musi szukać dalej, czy oceniać."""
last_message = state["innowator_messages"][-1]
if last_message.tool_calls:
if state.get("innowator_attempts", 0) >= 3:
logger.warning(
"[INNOWATOR] Przekroczono limit wyszukiwań, wymuszam ewaluację."
)
return "evaluate"
return "tools"
return "evaluate"
# --- ZARZĄDZAJĄCY NODE ---
def zarzadzajacy_node(state: AuditorPanelState) -> Dict[str, Any]:
"""Reduktor zbierający wszystkie dane i tworzący GlobalAuditOutput."""
scores = state.get("perspective_scores", [])
issues = state.get("issues", [])
has_critical = any(i.severity == "critical" for i in issues)
if not scores:
overall_score = 0
else:
base = int(sum(scores) / len(scores))
overall_score = max(0, base - 20) if has_critical else base
export_status = "ok"
if has_critical:
export_status = "blocked"
elif any(i.severity == "high" for i in issues):
export_status = "warning"
final = GlobalAuditOutput(
is_approved=not has_critical,
export_status=export_status,
overall_score=overall_score,
confidence_score=0.9, # LangGraph gives high confidence theoretically
human_review_required=has_critical or overall_score < 60,
issues=issues,
perspectives_summary=state.get("perspectives_summary", {}),
)
return {"final_output": final}