import logging from typing import Dict, List, TypedDict, Optional from langchain_core.messages import SystemMessage, HumanMessage from langgraph.graph import StateGraph, START, END # Importy z rdzenia backendu try: from core.llm_router import get_llm except ImportError: from backend.core.llm_router import get_llm try: from core.utils import extract_markdown_and_sanitize except ImportError: from backend.core.utils import extract_markdown_and_sanitize from tenacity import retry, stop_after_attempt, wait_exponential try: from rag_pipeline.vector_store import get_parent_document_retriever except ImportError: from backend.rag_pipeline.vector_store import get_parent_document_retriever try: from core.sensitive_data_guard import anonymizer except ImportError: try: from backend.core.sensitive_data_guard import anonymizer except ImportError: anonymizer = None try: from core.audit_logger import audit_log except ImportError: audit_log = None logger = logging.getLogger(__name__) class GeneratorState(TypedDict): """Stan przepływu w LangGraph dla generatora wniosków.""" project_id: str namespace: str document_type: str # Opis projektu wczytany z DB — anonymizowany przed wysłaniem do LLM project_description: Optional[str] sections_plan: List[dict] current_section_idx: int generated_sections: Dict[str, str] context: str is_completed: bool missing_data_question: Optional[str] additional_context: Optional[str] traceability_data: Optional[Dict[str, List[dict]]] from langgraph.checkpoint.memory import MemorySaver # Global checkpointer so states survive across Agent instantiations global_memory_saver = MemorySaver() class DocumentGeneratorAgent: """ Agent korzystający z maszyn stanów LangGraph do wytwarzania długich i sformalizowanych dokumentów, zabezpieczony danymi z RAG (Rząd RP/PARP). """ def __init__(self): # Używamy najlepszego modelu, docelowo task_type="critical" by odpalić np. Gemini 1.5 Pro # (albo dedykowany fine-tuned) self.llm = get_llm(task_type="critical") self.graph = self._build_graph() def _build_graph(self): workflow = StateGraph(GeneratorState) # Rejestracja Węzłów workflow.add_node("plan_document", self.plan_document) workflow.add_node("fetch_context", self.fetch_context) workflow.add_node("draft_section", self.draft_section) workflow.add_node("resolve_missing_data", self.resolve_missing_data) workflow.add_node("ask_missing_data", self.ask_missing_data) # Sterowanie przepływem (Graf) workflow.add_edge(START, "plan_document") workflow.add_edge("plan_document", "fetch_context") workflow.add_edge("fetch_context", "draft_section") # Pętla warunkowa (Czy mamy jeszcze sekcje do wygenerowania, albo czy brakuje danych?) workflow.add_conditional_edges( "draft_section", self._should_continue, { "continue": "fetch_context", "resolve": "resolve_missing_data", "end": END, }, ) # Nowy warunek: po próbie auto-rozwiązania (Tool Calling), decyduje czy nadal pytać użytkownika workflow.add_conditional_edges( "resolve_missing_data", lambda s: "pause" if s.get("missing_data_question") else "draft_section", { "pause": "ask_missing_data", "draft_section": "draft_section" } ) workflow.add_edge("ask_missing_data", "draft_section") return workflow.compile( checkpointer=global_memory_saver, interrupt_before=["ask_missing_data"] ) def resolve_missing_data(self, state: GeneratorState): """Nowy węzeł: Próbuje użyć narzędzi (Tool Calling) z KRS/GUS do zdobycia danych (np. członków zarządu), zamiast angażować użytkownika.""" logger.info("[Generator] Próba automatycznego znalezienia brakujących danych (GUS/KRS)...") question = state.get("missing_data_question") if not question: return {"additional_context": state.get("additional_context")} import re from integrations.krs_client import KRSClient from tools.company_search import fetch_regon_data # Wyciągnięcie NIP/KRS z opisu desc = state.get("project_description", "") nip_match = re.search(r"NIP:\s*(\d{10})", desc) auto_answer = "" if nip_match: nip = nip_match.group(1) try: # Najpierw REGON regon_data = fetch_regon_data(nip) if regon_data: auto_answer += f"Z bazy GUS (NIP {nip}): {regon_data}\n" # Potem KRS jeśli potrzebne (dane z odpisu aktualnego) # Często w opisach padają słowa "członek zarządu", "wspólnik", "= len(sections): logger.error(f"[Generator] IndexError in fetch_context: idx={idx}, sections_plan length={len(sections)}") return {"context": "Błąd: Brak planu sekcji."} section = sections[idx] section_name = section["title"] if isinstance(section, dict) else section section["type"] if isinstance(section, dict) else section_name namespace = state["namespace"] telemetry.log( "INFO", "GeneratorAgent", f"Przeszukuję bazę wektorową (Pinecone) dla sekcji: {section_name}", {"namespace": namespace}, ) logger.info( f"Pobieranie kontekstu RAG dla sekcji '{section_name}' w namespace '{namespace}'." ) try: retriever = get_parent_document_retriever(namespace=namespace) # Wzbogacamy zapytanie o kontekst firmy aby system szukał trafniejszych fragmentów project_context = state.get("project_description", "") # Ograniczamy długość kontekstu do kluczowych informacji aby nie rozmyć zapytania short_context = project_context[:300] if project_context else "" # Retrieval - szukamy najpierw specyfiki projektu w private_namespace query = f"Regulamin i wytyczne dla sekcji: {section_name}. Typ dokumentu: {state['document_type']}. Kontekst i informacje do uwzględnienia (Kryteria, Budżet, Kwalifikowalność): {short_context}" docs = retriever.invoke(query) def _format_temporal(doc): valid_from = doc.metadata.get("valid_from", "") valid_to = doc.metadata.get("valid_to", "") ver_id = doc.metadata.get("version_id", "") time_str = "" if valid_from or valid_to or ver_id: time_str = f" [Wersja: {ver_id}, Ważne od: {valid_from} do: {valid_to}]" return f"{doc.page_content}{time_str}" context = "\n\n".join([_format_temporal(doc) for doc in docs]) import hashlib from datetime import datetime traceability = state.get("traceability_data", {}) or {} current_traces = [] for doc in docs: content_hash = hashlib.sha256(doc.page_content.encode('utf-8')).hexdigest() current_traces.append({ "source": doc.metadata.get("source", "Własny dokument / RAG"), "url": doc.metadata.get("url", "Brak linku"), "date": doc.metadata.get("fetch_date", datetime.now().strftime("%Y-%m-%d")), "hash": content_hash[:16], "version_id": doc.metadata.get("version_id", ""), "valid_from": doc.metadata.get("valid_from", ""), "valid_to": doc.metadata.get("valid_to", "") }) traceability[section_name] = current_traces if not context.strip(): context = "Brak specyficznego kontekstu wgranej dokumentacji w RAG dla tej sekcji." telemetry.log( "INFO", "Pinecone", f"Pobrano {len(docs)} fragmentów z wektorowej bazy danych.", ) except Exception as e: logger.error(f"Błąd RAG pod względem '{namespace}': {e}") telemetry.log("ERROR", "Pinecone", f"Błąd pobierania wektorów: {str(e)}") context = ( "Brak połączenia z RAG. Generowanie oparto na wiedzy ogólnej modelu." ) traceability = state.get("traceability_data", {}) or {} return {"context": context, "traceability_data": traceability} def draft_section(self, state: GeneratorState): """Krok 3: Generowanie sekcji za pomocą LLM. Pipeline RODO (FAZA 1 Enterprise): 1. Anonymizuj opis projektu (NIP, PESEL, IBAN, nazwiska → tokeny) 2. Wyślij zanonimizowany tekst do LLM 3. Deanonymizuj wynik (tokeny → oryginalne wartości) Dzięki temu dane PII nigdy nie trafiają do zewnętrznych API LLM. """ idx = state.get("current_section_idx", 0) sections = state.get("sections_plan", []) if not sections or idx >= len(sections): logger.error(f"[Generator] IndexError in draft_section: idx={idx}, sections_plan length={len(sections)}") return { "generated_sections": state.get("generated_sections", {}), "is_completed": True, "missing_data_question": None } section = sections[idx] section_name = section["title"] if isinstance(section, dict) else section section["type"] if isinstance(section, dict) else section_name context = state.get("context", "") project_desc = state.get("project_description") or "" # ── KROK 1: Anonymizuj opis projektu przed LLM ───────────────────── anon_desc = project_desc if anonymizer and project_desc: try: anon_desc = anonymizer.anonymize_text(project_desc) if anon_desc != project_desc: logger.info( f"[Generator][PII] Opis projektu '{state['project_id']}' " f"zanonimizowany przed wysłaniem do LLM." ) if audit_log: audit_log( "GENERATOR_PII_ANON", f"Projekt: {state['project_id']} | Sekcja: {section_name}", ) except Exception as e: logger.warning( f"[Generator][PII] Anonimizacja nieudana: {e} — kontynuuję bez maskowania." ) anon_desc = project_desc from pydantic import BaseModel, Field from core.telemetry import telemetry class GeneratedSection(BaseModel): content_markdown: Optional[str] = Field( None, description="Zredagowany tekst sekcji w formacie Markdown. ZAWSZE WYPEŁNIJ to pole, nawet jeśli brakuje danych (użyj znaczników [UZUPEŁNIĆ: co brakuje]). BEZWZGLĘDNIE PISZ TYLKO W JĘZYKU POLSKIM (włączając w to nagłówki i tytuły).", ) missing_data_question: Optional[str] = Field( None, description="Jeśli brakuje Ci krytycznych danych, wpisz tu ostrzeżenie, ale i tak wygeneruj `content_markdown` ze znacznikami. BEZWZGLĘDNIE PISZ TYLKO W JĘZYKU POLSKIM.", ) system_prompt = ( "Jesteś profesjonalnym doradcą dotacyjnym (Consultant AI) specjalizującym się w funduszach UE.\n" "Odpowiadasz za najwyższą korporacyjną jakość we wnioskach dotacyjnych.\n" f"Mamy dokument typu: '{state['document_type']}'.\n" f"Obecnie przygotowujesz dokładnie i wyczerpująco sekcję: '{section_name}'.\n\n" "Wytyczne:\n" " - Wykorzystaj dostarczony Kontekst RAG (fragmenty regulaminów i wytycznych programu).\n" " - Zadbaj o analityczny, sformalizowany ton z punktami i statystykami gdzie to możliwe.\n" " - STOSUJ BOGATE FORMATOWANIE MARKDOWN: używaj profesjonalnych nagłówków (###, ####), tabel dla danych liczbowych, list punktowanych i pogrubień (bold) dla kluczowych wskaźników. Dokument ma wyglądać nieskazitelnie, czytelnie i estetycznie!\n" " - Jeżeli w zanonimizowanym opisie projektu znajduje się nazwa firmy lub jej token (np. ), BEZWZGLĘDNIE i NATURALNIE wplataj go w treść sekcji.\n" " - Jeżeli napotkasz tokeny anonimizacji typu , , UŻYJ ICH dosłownie.\n" " - Zakaz wymyślania danych (NIP, daty, kwoty, nazwy firm). Jeśli brakuje danych (np. danych z GUS/KRS takich jak dokładni wspólnicy, NIP, PKD, adres, czy szczegółowe parametry maszyny), użyj placeholderów w formacie [UZUPEŁNIĆ: Czego brakuje]. Opcję 'missing_data_question' traktuj jako OSTATECZNOŚĆ i używaj TYLKO W KRYTYCZNYCH PRZYPADKACH.\n" " - >>> BEZWZGLĘDNIE GENERUJ CAŁĄ TREŚĆ WYŁĄCZNIE W JĘZYKU POLSKIM. <<<\n" " - ZABRANIA SIĘ używania słów w języku angielskim. Wszystkie nagłówki, tabele, teksty i podsumowania MUSZĄ być po polsku.\n" " - NIGDY NIE ZWRACAJ ANGIELSKICH TYTUŁÓW SEKCJI. Masz kategoryczny nakaz użycia oryginalnego, polskiego tytułu sekcji, nad którym pracujesz.\n" " - ZADBÓJ O ZWIĘZŁOŚĆ NAGŁÓWKÓW: Ogranicz długość nagłówków/tytułów sekcji do maksymalnie 5 wyrazów.\n" " - W PRZYPADKU ODPOWIEDZI UŻYTKOWNIKA NA 'missing_data_question': Pamiętaj, aby ZACHOWAĆ dotychczasowy styl sekcji i wpleść nowe informacje spójnie.\n" ) additional_context = state.get("additional_context", "") human_content = f"Kontekst RAG:\n{context}" if anon_desc: human_content += f"\n\nOpis projektu (zanonimizowany):\n{anon_desc}" if additional_context: human_content += ( f"\n\nDodatkowe odpowiedzi od użytkownika:\n{additional_context}" ) telemetry.log( "INFO", "GeneratorAgent", "Rozpoczynam draftowanie sekcji", {"project_id": state["project_id"], "section": section_name}, ) logger.info( f"[Generator] Draftowanie sekcji: '{section_name}' (projekt: {state['project_id']})" ) # ── KROK 2: Wywołanie LLM z walidacją schematu ──────────────────── @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10), ) def _generate(): try: structured_llm = get_llm( task_type="critical", structured_output_schema=GeneratedSection ) response = structured_llm.invoke( [ SystemMessage(content=system_prompt), HumanMessage(content=human_content), ] ) if response.missing_data_question: telemetry.log( "WARN", "GeneratorAgent", "Wykryto brak danych, ale generowanie jest kontynuowane.", {"question": response.missing_data_question}, ) section_content_temp = response.content_markdown or "" if not section_content_temp and response.missing_data_question: section_content_temp = f"**Brakujące dane:** {response.missing_data_question}\n\n*Proszę uzupełnić tę sekcję ręcznie lub podać wymagane informacje w opisie projektu.*" else: section_content_temp = extract_markdown_and_sanitize( section_content_temp ) return section_content_temp, response.missing_data_question except Exception as e: logger.warning( f"[Generator] with_structured_output failed for '{section_name}': {e}. Attempting fallback without structured output." ) fallback_response = self.llm.invoke( [ SystemMessage( content=system_prompt + "\nZwróć TYLKO treść sekcji w formacie Markdown (bez pytań o brakujące dane, spróbuj sobie poradzić). BEZWZGLĘDNIE PISZ TYLKO W JĘZYKU POLSKIM. DO NOT USE ENGLISH." ), HumanMessage(content=human_content), ] ) section_content_temp = ( fallback_response.content if hasattr(fallback_response, "content") else str(fallback_response) ) return extract_markdown_and_sanitize(section_content_temp), None try: # INTEGRACJA: Faza 4 - Moduł Analityka Finansowego if section["type"] in ["budget", "finance"]: try: from backend.agents.finance_agent import finance_agent except ImportError: from agents.finance_agent import finance_agent logger.info(f"[Generator] Delegowanie sekcji '{section_name}' do Agenta Finansowego.") section_content, missing_question = finance_agent.draft_financial_section( document_type=state['document_type'], section_name=section_name, project_desc=anon_desc, context=context ) else: section_content, missing_question = _generate() except Exception as e_fallback: logger.error( f"[Generator] Fallback LLM failure for section '{section_name}': {e_fallback}" ) telemetry.log( "ERROR", "GeneratorAgent", "Błąd LLM podczas generowania", {"error": str(e_fallback)}, ) section_content = ( f"*(Wystąpił błąd API podczas generowania sekcji: {str(e_fallback)})*" ) missing_question = None # ── KROK 3: Deanonymizuj wynik (tokeny → oryginalne wartości) ──── if anonymizer and project_desc and anon_desc != project_desc: try: section_content = anonymizer.deanonymize_text(section_content) logger.info( f"[Generator][PII] Sekcja '{section_name}' deanonimizowana po LLM." ) except Exception as e: logger.warning( f"[Generator][PII] Deanonymizacja nieudana: {e} — zwracam zanonimizowaną wersję." ) # ── Aktualizacja stanu ──────────────────────────────────────────── current_sections = dict(state.get("generated_sections", {})) current_sections[section_name] = section_content if missing_question: next_idx = idx is_completed = False else: next_idx = idx + 1 is_completed = next_idx >= len(state["sections_plan"]) telemetry.log( "INFO", "GeneratorAgent", "Zakończono draftowanie sekcji", {"project_id": state["project_id"], "section": section_name}, ) return { "generated_sections": current_sections, "current_section_idx": next_idx, "is_completed": is_completed, "missing_data_question": missing_question, # store the question in state instead of clearing } def _should_continue(self, state: GeneratorState): """Sprawdzak czy istnieją kolejne sekcje do procedowania w Grafie.""" if state.get("missing_data_question"): return "resolve" return "end" if state.get("is_completed") else "continue" def provide_human_response(self, thread_id: str, response: str): """Aktualizuje stan grafu o odpowiedź użytkownika.""" config = {"configurable": {"thread_id": thread_id}} # Oczyszczamy pytanie i dodajemy kontekst do stanu wstrzymanego węzła self.graph.update_state( config, {"additional_context": response, "missing_data_question": None}, as_node="ask_missing_data" ) async def astm_stream( self, initial_state: Optional[GeneratorState], thread_id: str, resume: bool = False, ): """Uruchamia graf w trybie streamingowym zdarzeń żeby zasilić SSE (Server-Sent Events)""" config = {"configurable": {"thread_id": thread_id}, "recursion_limit": 150} # Ochrona przed błędem `Received no input for __start__` gdy MemorySaver jest wyczyszczony. state = self.graph.get_state(config) if resume and not state.values: logger.warning(f"Zażądano wznowienia dla wątku {thread_id}, ale brak stanu w checkpointerze. Resetujemy strumień z initial_state.") input_data = initial_state else: input_data = None if resume else initial_state async for event in self.graph.astream_events( input_data, version="v2", config=config ): yield event