grantforge-api / backend /agents /generator_agent.py
GrantForge Bot
Deploy to Hugging Face
afd56bc
import logging
from typing import Dict, List, TypedDict, Optional
from langchain_core.messages import SystemMessage, HumanMessage
from langgraph.graph import StateGraph, START, END
# Importy z rdzenia backendu
try:
from core.llm_router import get_llm
except ImportError:
from backend.core.llm_router import get_llm
try:
from core.utils import extract_markdown_and_sanitize
except ImportError:
from backend.core.utils import extract_markdown_and_sanitize
from tenacity import retry, stop_after_attempt, wait_exponential
try:
from rag_pipeline.vector_store import get_parent_document_retriever
except ImportError:
from backend.rag_pipeline.vector_store import get_parent_document_retriever
try:
from core.sensitive_data_guard import anonymizer
except ImportError:
try:
from backend.core.sensitive_data_guard import anonymizer
except ImportError:
anonymizer = None
try:
from core.audit_logger import audit_log
except ImportError:
audit_log = None
logger = logging.getLogger(__name__)
class GeneratorState(TypedDict):
"""Stan przepływu w LangGraph dla generatora wniosków."""
project_id: str
namespace: str
document_type: str
# Opis projektu wczytany z DB — anonymizowany przed wysłaniem do LLM
project_description: Optional[str]
sections_plan: List[dict]
current_section_idx: int
generated_sections: Dict[str, str]
context: str
is_completed: bool
missing_data_question: Optional[str]
additional_context: Optional[str]
traceability_data: Optional[Dict[str, List[dict]]]
from langgraph.checkpoint.memory import MemorySaver
# Global checkpointer so states survive across Agent instantiations
global_memory_saver = MemorySaver()
class DocumentGeneratorAgent:
"""
Agent korzystający z maszyn stanów LangGraph do wytwarzania długich
i sformalizowanych dokumentów, zabezpieczony danymi z RAG (Rząd RP/PARP).
"""
def __init__(self):
# Używamy najlepszego modelu, docelowo task_type="critical" by odpalić np. Gemini 1.5 Pro
# (albo dedykowany fine-tuned)
self.llm = get_llm(task_type="critical")
self.graph = self._build_graph()
def _build_graph(self):
workflow = StateGraph(GeneratorState)
# Rejestracja Węzłów
workflow.add_node("plan_document", self.plan_document)
workflow.add_node("fetch_context", self.fetch_context)
workflow.add_node("draft_section", self.draft_section)
workflow.add_node("resolve_missing_data", self.resolve_missing_data)
workflow.add_node("ask_missing_data", self.ask_missing_data)
# Sterowanie przepływem (Graf)
workflow.add_edge(START, "plan_document")
workflow.add_edge("plan_document", "fetch_context")
workflow.add_edge("fetch_context", "draft_section")
# Pętla warunkowa (Czy mamy jeszcze sekcje do wygenerowania, albo czy brakuje danych?)
workflow.add_conditional_edges(
"draft_section",
self._should_continue,
{
"continue": "fetch_context",
"resolve": "resolve_missing_data",
"end": END,
},
)
# Nowy warunek: po próbie auto-rozwiązania (Tool Calling), decyduje czy nadal pytać użytkownika
workflow.add_conditional_edges(
"resolve_missing_data",
lambda s: "pause" if s.get("missing_data_question") else "draft_section",
{
"pause": "ask_missing_data",
"draft_section": "draft_section"
}
)
workflow.add_edge("ask_missing_data", "draft_section")
return workflow.compile(
checkpointer=global_memory_saver, interrupt_before=["ask_missing_data"]
)
def resolve_missing_data(self, state: GeneratorState):
"""Nowy węzeł: Próbuje użyć narzędzi (Tool Calling) z KRS/GUS do zdobycia danych (np. członków zarządu), zamiast angażować użytkownika."""
logger.info("[Generator] Próba automatycznego znalezienia brakujących danych (GUS/KRS)...")
question = state.get("missing_data_question")
if not question:
return {"additional_context": state.get("additional_context")}
import re
from integrations.krs_client import KRSClient
from tools.company_search import fetch_regon_data
# Wyciągnięcie NIP/KRS z opisu
desc = state.get("project_description", "")
nip_match = re.search(r"NIP:\s*(\d{10})", desc)
auto_answer = ""
if nip_match:
nip = nip_match.group(1)
try:
# Najpierw REGON
regon_data = fetch_regon_data(nip)
if regon_data:
auto_answer += f"Z bazy GUS (NIP {nip}): {regon_data}\n"
# Potem KRS jeśli potrzebne (dane z odpisu aktualnego)
# Często w opisach padają słowa "członek zarządu", "wspólnik", "<OSOBA"
if "zarząd" in question.lower() or "osoba" in question.lower() or "wspólnik" in question.lower() or "reprezent" in question.lower():
# Krok 1: Próba znalezienia numeru KRS w opisie
krs_match = re.search(r"KRS:\s*(\d{10})", desc)
if krs_match:
krs = krs_match.group(1)
odpis = KRSClient.get_odpis_aktualny(krs)
if odpis:
relations = KRSClient.extract_graph_relations(odpis)
auto_answer += f"Z bazy KRS (KRS {krs}): Zarząd: {relations.get('zarzad')}, Wspólnicy: {relations.get('wspolnicy')}\n"
except Exception as e:
logger.warning(f"[Generator] Błąd auto-resolve z GUS/KRS: {e}")
if auto_answer:
logger.info(f"[Generator] Znalazłem dane automatycznie: {auto_answer}")
old_ctx = state.get("additional_context") or ""
new_ctx = old_ctx + f"\n\n[ZAUTOMATYZOWANA ODPOWIEDŹ NARZĘDZIA KRS/GUS na Twoje wątpliwości '{question}']:\n{auto_answer}"
return {
"missing_data_question": None, # usunięto pytanie, system kontynuuje bez użytkownika
"additional_context": new_ctx
}
# Jeśli narzędzia nie pomogły, przepuszczamy do zapytania użytkownika
return {"missing_data_question": question}
def ask_missing_data(self, state: GeneratorState):
"""Węzeł pauzujący działanie grafu przed wykonaniem (interrupt_before)."""
logger.info(
f"Pauza HIL. Oczekiwanie na dane: {state.get('missing_data_question')}"
)
# Po wznowieniu przez użytkownika (update state'u o 'additional_context'),
# czyścimy to zapytanie, żeby nie wpaść w pętlę.
return {"missing_data_question": None}
def plan_document(self, state: GeneratorState):
"""Krok 1: Inicjalizuje sekcje w zależności od zadanego wzorca."""
logger.info(f"Planowanie dokumentu: {state['document_type']}")
from core.telemetry import telemetry
telemetry.log(
"INFO",
"GeneratorAgent",
f"Tworzę plan dokumentu typu {state['document_type']}",
{"project_id": state["project_id"]},
)
plan = state.get("sections_plan", [])
if not plan:
doc_type = state["document_type"].lower()
if "wniosek" in doc_type and "smart" in doc_type:
plan = [
{"type": "project_summary", "title": "Opis projektu i jego celów"},
{"type": "innovation", "title": "Uzasadnienie innowacyjności"},
{"type": "applicant", "title": "Potencjał Wnioskodawcy (doświadczenie i zasoby)"},
{"type": "budget", "title": "Budżet i harmonogram"},
]
elif "biznesplan" in doc_type:
plan = [
{
"type": "executive_summary",
"title": "Streszczenie projektu",
},
{"type": "product", "title": "Opis produktu/usługi"},
{"type": "market", "title": "Analiza rynku i konkurencji"},
{"type": "finance", "title": "Plan finansowy"},
]
else:
plan = [
{"type": "intro", "title": "Wprowadzenie"},
{"type": "body", "title": "Rozwinięcie merytoryczne"},
{"type": "outro", "title": "Zakończenie i podsumowanie"},
]
return {
"sections_plan": plan,
"current_section_idx": state.get("current_section_idx", 0),
"generated_sections": state.get("generated_sections", {}),
"is_completed": False,
"missing_data_question": None,
"traceability_data": state.get("traceability_data", {}),
}
def fetch_context(self, state: GeneratorState):
"""Krok 2: Pobiera wycinki wiedzy z wejściowego RAG lub bazy aktów ISAP/PARP"""
from core.telemetry import telemetry
idx = state.get("current_section_idx", 0)
sections = state.get("sections_plan", [])
if not sections or idx >= len(sections):
logger.error(f"[Generator] IndexError in fetch_context: idx={idx}, sections_plan length={len(sections)}")
return {"context": "Błąd: Brak planu sekcji."}
section = sections[idx]
section_name = section["title"] if isinstance(section, dict) else section
section["type"] if isinstance(section, dict) else section_name
namespace = state["namespace"]
telemetry.log(
"INFO",
"GeneratorAgent",
f"Przeszukuję bazę wektorową (Pinecone) dla sekcji: {section_name}",
{"namespace": namespace},
)
logger.info(
f"Pobieranie kontekstu RAG dla sekcji '{section_name}' w namespace '{namespace}'."
)
try:
retriever = get_parent_document_retriever(namespace=namespace)
# Wzbogacamy zapytanie o kontekst firmy aby system szukał trafniejszych fragmentów
project_context = state.get("project_description", "")
# Ograniczamy długość kontekstu do kluczowych informacji aby nie rozmyć zapytania
short_context = project_context[:300] if project_context else ""
# Retrieval - szukamy najpierw specyfiki projektu w private_namespace
query = f"Regulamin i wytyczne dla sekcji: {section_name}. Typ dokumentu: {state['document_type']}. Kontekst i informacje do uwzględnienia (Kryteria, Budżet, Kwalifikowalność): {short_context}"
docs = retriever.invoke(query)
def _format_temporal(doc):
valid_from = doc.metadata.get("valid_from", "")
valid_to = doc.metadata.get("valid_to", "")
ver_id = doc.metadata.get("version_id", "")
time_str = ""
if valid_from or valid_to or ver_id:
time_str = f" [Wersja: {ver_id}, Ważne od: {valid_from} do: {valid_to}]"
return f"{doc.page_content}{time_str}"
context = "\n\n".join([_format_temporal(doc) for doc in docs])
import hashlib
from datetime import datetime
traceability = state.get("traceability_data", {}) or {}
current_traces = []
for doc in docs:
content_hash = hashlib.sha256(doc.page_content.encode('utf-8')).hexdigest()
current_traces.append({
"source": doc.metadata.get("source", "Własny dokument / RAG"),
"url": doc.metadata.get("url", "Brak linku"),
"date": doc.metadata.get("fetch_date", datetime.now().strftime("%Y-%m-%d")),
"hash": content_hash[:16],
"version_id": doc.metadata.get("version_id", ""),
"valid_from": doc.metadata.get("valid_from", ""),
"valid_to": doc.metadata.get("valid_to", "")
})
traceability[section_name] = current_traces
if not context.strip():
context = "Brak specyficznego kontekstu wgranej dokumentacji w RAG dla tej sekcji."
telemetry.log(
"INFO",
"Pinecone",
f"Pobrano {len(docs)} fragmentów z wektorowej bazy danych.",
)
except Exception as e:
logger.error(f"Błąd RAG pod względem '{namespace}': {e}")
telemetry.log("ERROR", "Pinecone", f"Błąd pobierania wektorów: {str(e)}")
context = (
"Brak połączenia z RAG. Generowanie oparto na wiedzy ogólnej modelu."
)
traceability = state.get("traceability_data", {}) or {}
return {"context": context, "traceability_data": traceability}
def draft_section(self, state: GeneratorState):
"""Krok 3: Generowanie sekcji za pomocą LLM.
Pipeline RODO (FAZA 1 Enterprise):
1. Anonymizuj opis projektu (NIP, PESEL, IBAN, nazwiska → tokeny)
2. Wyślij zanonimizowany tekst do LLM
3. Deanonymizuj wynik (tokeny → oryginalne wartości)
Dzięki temu dane PII nigdy nie trafiają do zewnętrznych API LLM.
"""
idx = state.get("current_section_idx", 0)
sections = state.get("sections_plan", [])
if not sections or idx >= len(sections):
logger.error(f"[Generator] IndexError in draft_section: idx={idx}, sections_plan length={len(sections)}")
return {
"generated_sections": state.get("generated_sections", {}),
"is_completed": True,
"missing_data_question": None
}
section = sections[idx]
section_name = section["title"] if isinstance(section, dict) else section
section["type"] if isinstance(section, dict) else section_name
context = state.get("context", "")
project_desc = state.get("project_description") or ""
# ── KROK 1: Anonymizuj opis projektu przed LLM ─────────────────────
anon_desc = project_desc
if anonymizer and project_desc:
try:
anon_desc = anonymizer.anonymize_text(project_desc)
if anon_desc != project_desc:
logger.info(
f"[Generator][PII] Opis projektu '{state['project_id']}' "
f"zanonimizowany przed wysłaniem do LLM."
)
if audit_log:
audit_log(
"GENERATOR_PII_ANON",
f"Projekt: {state['project_id']} | Sekcja: {section_name}",
)
except Exception as e:
logger.warning(
f"[Generator][PII] Anonimizacja nieudana: {e} — kontynuuję bez maskowania."
)
anon_desc = project_desc
from pydantic import BaseModel, Field
from core.telemetry import telemetry
class GeneratedSection(BaseModel):
content_markdown: Optional[str] = Field(
None,
description="Zredagowany tekst sekcji w formacie Markdown. ZAWSZE WYPEŁNIJ to pole, nawet jeśli brakuje danych (użyj znaczników [UZUPEŁNIĆ: co brakuje]). BEZWZGLĘDNIE PISZ TYLKO W JĘZYKU POLSKIM (włączając w to nagłówki i tytuły).",
)
missing_data_question: Optional[str] = Field(
None,
description="Jeśli brakuje Ci krytycznych danych, wpisz tu ostrzeżenie, ale i tak wygeneruj `content_markdown` ze znacznikami. BEZWZGLĘDNIE PISZ TYLKO W JĘZYKU POLSKIM.",
)
system_prompt = (
"Jesteś profesjonalnym doradcą dotacyjnym (Consultant AI) specjalizującym się w funduszach UE.\n"
"Odpowiadasz za najwyższą korporacyjną jakość we wnioskach dotacyjnych.\n"
f"Mamy dokument typu: '{state['document_type']}'.\n"
f"Obecnie przygotowujesz dokładnie i wyczerpująco sekcję: '{section_name}'.\n\n"
"Wytyczne:\n"
" - Wykorzystaj dostarczony Kontekst RAG (fragmenty regulaminów i wytycznych programu).\n"
" - Zadbaj o analityczny, sformalizowany ton z punktami i statystykami gdzie to możliwe.\n"
" - STOSUJ BOGATE FORMATOWANIE MARKDOWN: używaj profesjonalnych nagłówków (###, ####), tabel dla danych liczbowych, list punktowanych i pogrubień (bold) dla kluczowych wskaźników. Dokument ma wyglądać nieskazitelnie, czytelnie i estetycznie!\n"
" - Jeżeli w zanonimizowanym opisie projektu znajduje się nazwa firmy lub jej token (np. <FIRMA_1>), BEZWZGLĘDNIE i NATURALNIE wplataj go w treść sekcji.\n"
" - Jeżeli napotkasz tokeny anonimizacji typu <NIP_1>, <OSOBA_1>, UŻYJ ICH dosłownie.\n"
" - Zakaz wymyślania danych (NIP, daty, kwoty, nazwy firm). Jeśli brakuje danych (np. danych z GUS/KRS takich jak dokładni wspólnicy, NIP, PKD, adres, czy szczegółowe parametry maszyny), użyj placeholderów w formacie [UZUPEŁNIĆ: Czego brakuje]. Opcję 'missing_data_question' traktuj jako OSTATECZNOŚĆ i używaj TYLKO W KRYTYCZNYCH PRZYPADKACH.\n"
" - >>> BEZWZGLĘDNIE GENERUJ CAŁĄ TREŚĆ WYŁĄCZNIE W JĘZYKU POLSKIM. <<<\n"
" - ZABRANIA SIĘ używania słów w języku angielskim. Wszystkie nagłówki, tabele, teksty i podsumowania MUSZĄ być po polsku.\n"
" - NIGDY NIE ZWRACAJ ANGIELSKICH TYTUŁÓW SEKCJI. Masz kategoryczny nakaz użycia oryginalnego, polskiego tytułu sekcji, nad którym pracujesz.\n"
" - ZADBÓJ O ZWIĘZŁOŚĆ NAGŁÓWKÓW: Ogranicz długość nagłówków/tytułów sekcji do maksymalnie 5 wyrazów.\n"
" - W PRZYPADKU ODPOWIEDZI UŻYTKOWNIKA NA 'missing_data_question': Pamiętaj, aby ZACHOWAĆ dotychczasowy styl sekcji i wpleść nowe informacje spójnie.\n"
)
additional_context = state.get("additional_context", "")
human_content = f"Kontekst RAG:\n{context}"
if anon_desc:
human_content += f"\n\nOpis projektu (zanonimizowany):\n{anon_desc}"
if additional_context:
human_content += (
f"\n\nDodatkowe odpowiedzi od użytkownika:\n{additional_context}"
)
telemetry.log(
"INFO",
"GeneratorAgent",
"Rozpoczynam draftowanie sekcji",
{"project_id": state["project_id"], "section": section_name},
)
logger.info(
f"[Generator] Draftowanie sekcji: '{section_name}' (projekt: {state['project_id']})"
)
# ── KROK 2: Wywołanie LLM z walidacją schematu ────────────────────
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
)
def _generate():
try:
structured_llm = get_llm(
task_type="critical", structured_output_schema=GeneratedSection
)
response = structured_llm.invoke(
[
SystemMessage(content=system_prompt),
HumanMessage(content=human_content),
]
)
if response.missing_data_question:
telemetry.log(
"WARN",
"GeneratorAgent",
"Wykryto brak danych, ale generowanie jest kontynuowane.",
{"question": response.missing_data_question},
)
section_content_temp = response.content_markdown or ""
if not section_content_temp and response.missing_data_question:
section_content_temp = f"**Brakujące dane:** {response.missing_data_question}\n\n*Proszę uzupełnić tę sekcję ręcznie lub podać wymagane informacje w opisie projektu.*"
else:
section_content_temp = extract_markdown_and_sanitize(
section_content_temp
)
return section_content_temp, response.missing_data_question
except Exception as e:
logger.warning(
f"[Generator] with_structured_output failed for '{section_name}': {e}. Attempting fallback without structured output."
)
fallback_response = self.llm.invoke(
[
SystemMessage(
content=system_prompt
+ "\nZwróć TYLKO treść sekcji w formacie Markdown (bez pytań o brakujące dane, spróbuj sobie poradzić). BEZWZGLĘDNIE PISZ TYLKO W JĘZYKU POLSKIM. DO NOT USE ENGLISH."
),
HumanMessage(content=human_content),
]
)
section_content_temp = (
fallback_response.content
if hasattr(fallback_response, "content")
else str(fallback_response)
)
return extract_markdown_and_sanitize(section_content_temp), None
try:
# INTEGRACJA: Faza 4 - Moduł Analityka Finansowego
if section["type"] in ["budget", "finance"]:
try:
from backend.agents.finance_agent import finance_agent
except ImportError:
from agents.finance_agent import finance_agent
logger.info(f"[Generator] Delegowanie sekcji '{section_name}' do Agenta Finansowego.")
section_content, missing_question = finance_agent.draft_financial_section(
document_type=state['document_type'],
section_name=section_name,
project_desc=anon_desc,
context=context
)
else:
section_content, missing_question = _generate()
except Exception as e_fallback:
logger.error(
f"[Generator] Fallback LLM failure for section '{section_name}': {e_fallback}"
)
telemetry.log(
"ERROR",
"GeneratorAgent",
"Błąd LLM podczas generowania",
{"error": str(e_fallback)},
)
section_content = (
f"*(Wystąpił błąd API podczas generowania sekcji: {str(e_fallback)})*"
)
missing_question = None
# ── KROK 3: Deanonymizuj wynik (tokeny → oryginalne wartości) ────
if anonymizer and project_desc and anon_desc != project_desc:
try:
section_content = anonymizer.deanonymize_text(section_content)
logger.info(
f"[Generator][PII] Sekcja '{section_name}' deanonimizowana po LLM."
)
except Exception as e:
logger.warning(
f"[Generator][PII] Deanonymizacja nieudana: {e} — zwracam zanonimizowaną wersję."
)
# ── Aktualizacja stanu ────────────────────────────────────────────
current_sections = dict(state.get("generated_sections", {}))
current_sections[section_name] = section_content
if missing_question:
next_idx = idx
is_completed = False
else:
next_idx = idx + 1
is_completed = next_idx >= len(state["sections_plan"])
telemetry.log(
"INFO",
"GeneratorAgent",
"Zakończono draftowanie sekcji",
{"project_id": state["project_id"], "section": section_name},
)
return {
"generated_sections": current_sections,
"current_section_idx": next_idx,
"is_completed": is_completed,
"missing_data_question": missing_question, # store the question in state instead of clearing
}
def _should_continue(self, state: GeneratorState):
"""Sprawdzak czy istnieją kolejne sekcje do procedowania w Grafie."""
if state.get("missing_data_question"):
return "resolve"
return "end" if state.get("is_completed") else "continue"
def provide_human_response(self, thread_id: str, response: str):
"""Aktualizuje stan grafu o odpowiedź użytkownika."""
config = {"configurable": {"thread_id": thread_id}}
# Oczyszczamy pytanie i dodajemy kontekst do stanu wstrzymanego węzła
self.graph.update_state(
config,
{"additional_context": response, "missing_data_question": None},
as_node="ask_missing_data"
)
async def astm_stream(
self,
initial_state: Optional[GeneratorState],
thread_id: str,
resume: bool = False,
):
"""Uruchamia graf w trybie streamingowym zdarzeń żeby zasilić SSE (Server-Sent Events)"""
config = {"configurable": {"thread_id": thread_id}, "recursion_limit": 150}
# Ochrona przed błędem `Received no input for __start__` gdy MemorySaver jest wyczyszczony.
state = self.graph.get_state(config)
if resume and not state.values:
logger.warning(f"Zażądano wznowienia dla wątku {thread_id}, ale brak stanu w checkpointerze. Resetujemy strumień z initial_state.")
input_data = initial_state
else:
input_data = None if resume else initial_state
async for event in self.graph.astream_events(
input_data, version="v2", config=config
):
yield event