grantforge-api / backend /agents /helpers.py
GrantForge Bot
Deploy to Hugging Face
afd56bc
from schemas import AgentState, CompanyProfile, CriticFeedback
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.prompts import PromptTemplate
from core.llm_router import get_llm
from rag_pipeline import get_hybrid_retriever, rerank_documents
from agents.wizard import wizard_node
from agents.critic import critic_node
import json
import os
from langsmith import traceable
from langchain_core.tracers.langchain import LangChainTracer
# Włącz tracing LangSmith
os.environ["LANGCHAIN_TRACING_V2"] = "false"
os.environ["LANGCHAIN_PROJECT"] = "grantforge-production"
# Opcjonalnie – jeśli chcesz zobaczyć dokładne nazwy runów
tracer = LangChainTracer(project_name="grantforge-production")
ANTI_HALLUCINATION_PROMPT = """
BEZWZGLĘDNA ZASADA (ANTI-HALLUCINATION / GROUNDED GENERATION):
Jesteś surowym audytorem dotacyjnym. Masz bezwzględny zakaz korzystania z jakiejkolwiek wiedzy spoza dostarczonego kontekstu (baza wiedzy RAG / pliki projektu).
Jeśli informacja nie wynika wprost z podanych dokumentów lub metadanych – odpowiedz dokładnie: "Brak wystarczających informacji w aktualnych zasobach". Nie wolno Ci zgadywać kwot, terminów ani warunków kwalifikowalności.
"""
@traceable(
run_type="chain",
name="generate_section",
tags=["rag_pipeline", "faithfulness_target"],
)
def generate_section(
project_id: str,
section_type: str,
context: str,
external_context: dict = None,
program_name: str = None,
user_id: str = "",
) -> str:
"""
Wywołuje logikę Wizard z RAG tylko dla pojedynczej sekcji, bez przelotu przez cały Graf.
"""
from core.telemetry import telemetry
telemetry.log(
"INFO",
"Helpers",
f"Rozpoczynamy generowanie sekcji: {section_type}",
{"project_id": project_id},
)
company_data_str = ""
if external_context:
if (
"project_description" in external_context
and external_context["project_description"]
):
company_data_str += f"INFORMACJE OGÓLNE O PROJEKCIE (wpisane przez użytkownika):\n{external_context['project_description']}\n\n"
if (
"current_section_content" in external_context
and external_context["current_section_content"]
):
company_data_str += f"OBECNA TREŚĆ SEKCJI (Zastosuj ewentualne poprawki do tego tekstu, zachowując jego spójność):\n{external_context['current_section_content']}\n\n"
if "company_data" in external_context:
company_data_str += f"Dane z GUS (kontekst o firmie wnioskodawcy):\n{json.dumps(external_context['company_data'], indent=2, ensure_ascii=False)}\n\n"
if "resources" in external_context and external_context["resources"]:
company_data_str += "Zasoby projektu (dostarczone pliki):\n"
for res in external_context["resources"]:
# Limitujemy tekst wyciągnięty z pliku żeby nie wysadzić okna kontekstowego dla gigantycznych plików (opcjonalnie, ale dobra praktyka)
text_clip = res.get("extracted_text") or ""
if len(text_clip) > 5000:
text_clip = text_clip[:5000] + "... [UKRÓCONO]"
company_data_str += (
f"--- PLIK: {res.get('filename')} ---\n{text_clip}\n\n"
)
if company_data_str:
company_data_str += "INSTRUKCJA PRIORYTETU: Dane z sekcji company_data oraz lista resources mają najwyższy priorytet. Używaj ich zawsze jako głównego źródła informacji o firmie. Wiedza ogólna z RAG jest tylko pomocnicza.\n\n"
program_context = (
f"\n\nWAŻNE! PROJEKT DOTYCZY PROGRAMU:\n{program_name}\nBezwzględnie dostosuj narrację, słownictwo oraz rozłożenie akcentów we wniosku do specyfiki, wytycznych i głównego celu tego konkretnego programu. Unikaj żargonu z innych typów dotacji, chyba że wprost tu pasuje.\n"
if program_name
else ""
)
initial_prompt = f"Wygeneruj merytoryczną treść dla sekcji '{section_type}'. Kontekst szczegółowy: {context}\n\n{company_data_str}{program_context}\nWAŻNE: PISZ ZAWSZE W JĘZYKU POLSKIM."
tenant_ns = f"tenant_{user_id}_{project_id}" if user_id and project_id else ""
state = AgentState(
messages=[HumanMessage(content=initial_prompt)],
user_id=user_id,
tenant_id=tenant_ns,
profile=CompanyProfile(
nip=external_context.get("company_data", {}).get("nip", "0000000000") if external_context else "0000000000",
voivodeship=external_context.get("company_data", {}).get("voivodeship", "Mazowieckie") if external_context else "Mazowieckie"
),
)
from core.utils import extract_markdown_and_sanitize
from core.circuit_breaker import with_llm_retry, llm_circuit_breaker
import logging
logger = logging.getLogger(__name__)
@llm_circuit_breaker
@with_llm_retry
def invoke_with_watchdog():
result = wizard_node(state)
new_messages = result.get("messages", [])
if new_messages and len(new_messages) > 0:
last_msg = new_messages[-1]
if isinstance(last_msg, dict) and "content" in last_msg:
raw_c = last_msg["content"]
elif hasattr(last_msg, "content"):
raw_c = last_msg.content
else:
raw_c = ""
# Weryfikujemy i wyciągamy Markdown
sanitized = extract_markdown_and_sanitize(raw_c)
# Sanity Checks: Blokada pustych odpowiedzi i typowych odmów
if not sanitized or len(sanitized.strip()) < 20:
logger.warning(
f"Watchdog: Otrzymano zbyt krótką/pustą odpowiedź (długość: {len(sanitized)}). Wymuszam ponowienie."
)
telemetry.log(
"WARN",
"Watchdog",
"Zbyt krótka odpowiedź. Wymuszam ponowienie.",
{"project_id": project_id},
)
raise ValueError(
"Błąd sanity check: Pusta lub zbyt krótka odpowiedź z modelu."
)
lower_c = sanitized.lower()
refusals = [
"nie potrafię",
"nie jestem w stanie",
"nie mogę",
"as an ai",
"jako model językowy",
]
if any(r in lower_c for r in refusals) and len(sanitized) < 200:
logger.warning(
"Watchdog: Wykryto typową odmowę LLM. Wymuszam ponowienie."
)
raise ValueError(
"Błąd sanity check: Model odmówił wygenerowania odpowiedzi."
)
return sanitized
raise ValueError("Brak odpowiedzi tekstowej w strukturze wizarda")
try:
return invoke_with_watchdog()
except Exception as e:
logger.error(f"Nie powiodła się generacja sekcji: {e}")
telemetry.log(
"ERROR", "Helpers", f"Błąd generacji: {str(e)}", {"project_id": project_id}
)
return "Nie powiodła się generacja sekcji po 5 próbach. Przepraszamy za utrudnienia, spróbuj ponownie."
@traceable(
run_type="chain", name="review_section", tags=["rag_pipeline", "faithfulness_eval"]
)
def review_section(project_id: str, section_id: str, content: str) -> CriticFeedback:
"""
Wywołuje logikę recenzenta Critic w celu ewaluacji dostarczonego tekstu wniosku.
"""
from core.telemetry import telemetry
telemetry.log(
"INFO",
"Helpers",
f"Rozpoczynamy recenzję sekcji: {section_id}",
{"project_id": project_id},
)
# Critic expects the text to be in the last AI message
state = AgentState(
messages=[AIMessage(content=content)],
user_id="",
tenant_id="",
critic_iterations=0,
)
from core.circuit_breaker import with_llm_retry, llm_circuit_breaker
import logging
logger = logging.getLogger(__name__)
@llm_circuit_breaker
@with_llm_retry
def invoke_critic():
result = critic_node(state)
critic_eval = result.get("critic_evaluation")
if not critic_eval:
raise ValueError("Brak feedbacku od krytyka")
# Sanity check
if (
critic_eval.feedback
and len(critic_eval.feedback.strip()) < 10
and not critic_eval.is_approved
):
logger.warning(
"Watchdog: Pusty feedback mimo odrzucenia. Wymuszam ponowienie."
)
telemetry.log(
"WARN",
"Watchdog",
"Pusty feedback. Wymuszam ponowienie.",
{"project_id": project_id},
)
raise ValueError(
"Błąd sanity check: Krytyk odrzucił tekst, ale nie podał powodu."
)
return critic_eval
try:
return invoke_critic()
except Exception as e:
logger.error(f"Krytyk zawiódł po 5 próbach: {e}")
return CriticFeedback(
is_approved=True,
feedback="Brak feedbacku - problem techniczny. Zatwierdzono warunkowo.",
severity="low",
)
@traceable(run_type="chain", name="project_qa_agent")
def project_qa_agent(
project_id: str,
question: str,
program_name: str,
context: str,
external_context: dict = None,
) -> dict:
"""
Weryfikator - odpowiada na pytania związane z projektem na bazie dokumentacji konkursowej i regulaminów (RAG)
oraz kontekstu samego projektu (zdefiniowane sekcje wniosku).
Zwraca ustrukturyzowaną odpowiedź w formacie słownika.
"""
hard_filter = {"program_name": program_name} if program_name else None
retriever = get_hybrid_retriever(metadata_filter=hard_filter)
# Rozszerzamy zapytanie do wektorów o program i dotacje by zwiększyć precyzję wyszukiwania
search_query = f"{program_name} {question}" if program_name else question
rag_context = ""
sources_used = []
if retriever:
try:
# Wyszukanie odpowiednich dokumentów w RAG
docs = retriever.invoke(search_query)
reranked_docs = rerank_documents(search_query, docs, top_n=4)
def _format_temporal(d):
valid_from = d.metadata.get("valid_from", "")
valid_to = d.metadata.get("valid_to", "")
ver_id = d.metadata.get("version_id", "")
time_str = ""
if valid_from or valid_to or ver_id:
time_str = f" [Wersja: {ver_id}, Ważne od: {valid_from} do: {valid_to}]"
return f"SOURCE ({d.metadata.get('source', 'Unknown')} | {d.metadata.get('program_name', 'System')}){time_str}: {d.page_content}"
rag_context = "\n\n".join([_format_temporal(d) for d in reranked_docs])
# Zbudowanie listy unikalnych źrodeł
unique_sources = set()
for d in reranked_docs:
src_name = d.metadata.get("source", "Nieznane źródło")
if src_name:
unique_sources.add(src_name)
sources_used = list(unique_sources)
except Exception as e:
rag_context = f"[Brak wyników z bazy wiedzy. Błąd RAG: {str(e)}]"
else:
rag_context = "[Baza wektorowa niedostępna]"
from schemas import ProjectQAResponse
# Używamy modelu o wysokiej precyzji do analityki
llm = get_llm(task_type="critical", structured_output_schema=ProjectQAResponse)
company_info = ""
if external_context:
if (
"project_description" in external_context
and external_context["project_description"]
):
company_info += f"INFORMACJE OGÓLNE O PROJEKCIE (wpisane przez użytkownika):\n{external_context['project_description']}\n\n"
if "company_data" in external_context:
company_info += f"DANE FIRMY WNIOSKODAWCY (Z GUS):\n{json.dumps(external_context['company_data'], indent=2, ensure_ascii=False)}\n\n"
if "resources" in external_context and external_context["resources"]:
company_info += "ZASOBY PROJEKTU (dostarczone pliki wg użytkownika):\n"
for res in external_context["resources"]:
text_clip = res.get("extracted_text") or ""
if len(text_clip) > 3000:
text_clip = text_clip[:3000] + "... [UKRÓCONO]"
company_info += f"--- PLIK: {res.get('filename')} ---\n{text_clip}\n\n"
if company_info:
company_info += "INSTRUKCJA PRIORYTETU: Dane z sekcji company_data oraz lista resources mają najwyższy priorytet. Używaj ich zawsze jako głównego źródła informacji o firmie. Wiedza ogólna z RAG jest tylko pomocnicza.\n\n"
template = (
ANTI_HALLUCINATION_PROMPT
+ """
Jesteś ekspertowym doradcą ds. dotacji unijnych, realizacji oraz rozliczania projektów R&D. Twoim zadaniem jest odpowiedź na Pytanie w odniesieniu do projektu klienta.
WAŻNE ZASADY:
1. Używaj tylko najnowszych, aktualnych regulaminów z bazy wiedzy. Jeśli dostarczone dokumenty RAG wydają się przestarzałe, zachowaj ostrożność.
2. Zawsze cytuj konkretne paragrafy, punkty i nazwy dokumentów w polu "sources".
3. Jeśli nie jesteś w 100% pewien odpowiedzi na podstawie przepisów, ZAWSZE o tym napisz (nie zgaduj).
4. Dostosowuj odpowiedź do etapu projektu (czy to etap przygotowania wniosku, czy już realizacja i rozliczanie wydatków).
5. Traktuj dane w KONTEKST PROJEKTU jako źródło nadrzędne. Nie czepiaj się brakujących danych identyfikacyjnych (KRS, NIP, adres itp.), odpowiedz na pytanie merytorycznie.
6. BEZWZGLĘDNIE ODPOWIADAJ WYŁĄCZNIE W JĘZYKU POLSKIM I TYLKO NA TEMAT. Jeśli pytanie odbiega od funduszy, projektu lub dotacji, grzecznie wskaż, że jesteś specjalistą tylko od dotacji i odmów innej dyskusji.
KONTEKST PROJEKTU (dane i treść wniosku):
-------------------
{company_info}{project_context}
-------------------
BIEŻĄCE WYTYCZNE Z BAZY WIEDZY RAG (przepisy):
-------------------
{rag_context}
-------------------
Pytanie od użytkownika:
{question}
"""
)
from core.circuit_breaker import with_llm_retry, llm_circuit_breaker
structured_llm = llm
prompt = PromptTemplate.from_template(template)
chain = prompt | structured_llm
@llm_circuit_breaker
@with_llm_retry
def invoke_qa():
response: ProjectQAResponse = chain.invoke(
{
"company_info": company_info,
"project_context": context,
"rag_context": rag_context,
"question": question,
}
)
if hasattr(response, "model_dump"):
parsed_out = response.model_dump()
elif hasattr(response, "dict"):
parsed_out = response.dict()
else:
parsed_out = dict(response)
# Sanity check
answer_text = parsed_out.get("answer", "")
if not answer_text or len(answer_text.strip()) < 10:
raise ValueError(
"Błąd sanity check: Odpowiedź Q&A jest pusta lub zbyt krótka."
)
# Jeśli źródła RAG coś znalazły, ale LLM nic nie podał, sklei to
if not parsed_out.get("sources") and sources_used:
parsed_out["sources"] = sources_used
return parsed_out
try:
return invoke_qa()
except Exception as e:
import traceback
traceback.print_exc()
print(
f"Wystąpił błąd structured_output w project_qa_agent: {e}, próba fallbacku..."
)
try:
# Fallback bez with_structured_output
fallback_chain = prompt | llm
raw_response = fallback_chain.invoke(
{
"company_info": company_info,
"project_context": context,
"rag_context": rag_context,
"question": question,
}
)
return {
"answer": raw_response.content
if hasattr(raw_response, "content")
else str(raw_response),
"sources": sources_used,
"confidence": 0.5,
"recommendation": "Odpowiedź wygenerowana w trybie awaryjnym (fallback). Mogą brakować szczegółowych źródeł wygenerowanych przez AI.",
}
except Exception:
traceback.print_exc()
# Ostateczny awaryjny powrót
return {
"answer": f"Awaria strukturalnego formatowania odpowiedzi modelu i trybu awaryjnego: {str(e)}",
"sources": sources_used,
"confidence": 0.0,
"recommendation": "Spróbuj sformułować pytanie w prostszy sposób. (Sprawdzanie strukturalne zabezpieczyło przed błędem)",
}