from schemas import AgentState, CompanyProfile, CriticFeedback from langchain_core.messages import HumanMessage, AIMessage from langchain_core.prompts import PromptTemplate from core.llm_router import get_llm from rag_pipeline import get_hybrid_retriever, rerank_documents from core.search.regulation_engine import regulation_engine from core.trust.trust_scorer import compute_grant_trust_score from agents.wizard import wizard_node from agents.critic import critic_node import json import os from langsmith import traceable from langchain_core.tracers.langchain import LangChainTracer # Włącz tracing LangSmith os.environ["LANGCHAIN_TRACING_V2"] = "false" os.environ["LANGCHAIN_PROJECT"] = "grantforge-production" # Opcjonalnie – jeśli chcesz zobaczyć dokładne nazwy runów tracer = LangChainTracer(project_name="grantforge-production") ANTI_HALLUCINATION_PROMPT = """ BEZWZGLĘDNA ZASADA (ANTI-HALLUCINATION / GROUNDED GENERATION): Jesteś surowym audytorem dotacyjnym. Masz bezwzględny zakaz korzystania z jakiejkolwiek wiedzy spoza dostarczonego kontekstu (baza wiedzy RAG / pliki projektu). Jeśli informacja nie wynika wprost z podanych dokumentów lub metadanych – odpowiedz dokładnie: "Brak wystarczających informacji w aktualnych zasobach". Nie wolno Ci zgadywać kwot, terminów ani warunków kwalifikowalności. """ @traceable( run_type="chain", name="generate_section", tags=["rag_pipeline", "faithfulness_target"], ) def generate_section_light( section_type: str, context: str, external_context: dict = None, program_name: str = None, ) -> str: """ Lekka, stabilna ścieżka generowania sekcji. Używana domyślnie w okienku asystenta projektu i jako fallback. """ from core.llm_router import get_llm from langchain_core.messages import HumanMessage llm = get_llm(task_type="fast") company = external_context.get("company_data", {}) if external_context else {} prompt = f"""Jesteś ekspertem przygotowującym wnioski o dofinansowanie. Napisz konkretną, profesjonalną treść do sekcji: **{section_type}** Kontekst projektu: {context[:4000] if context else "Brak szczegółowego opisu"} Dane wnioskodawcy: Nazwa: {company.get('name', 'Wnioskodawca')} NIP: {company.get('nip', '')} Województwo: {company.get('voivodeship', '')} PKD: {', '.join(company.get('pkd', [])[:3]) if company.get('pkd') else ''} Program: {program_name or 'wniosek dotacyjny'} Pisz po polsku, merytorycznie, w stylu urzędowego wniosku. Bądź konkretny.""" try: resp = llm.invoke([HumanMessage(content=prompt)]) content = resp.content if hasattr(resp, "content") else str(resp) return content except Exception as e: return f"Nie udało się wygenerować sekcji w trybie lekkim. Błąd: {str(e)}" def generate_section( project_id: str, section_type: str, context: str, external_context: dict = None, program_name: str = None, user_id: str = "", ) -> str: """ Wywołuje logikę Wizard z RAG tylko dla pojedynczej sekcji, bez przelotu przez cały Graf. """ from core.telemetry import telemetry telemetry.log( "INFO", "Helpers", f"Rozpoczynamy generowanie sekcji: {section_type}", {"project_id": project_id}, ) company_data_str = "" if external_context: if ( "project_description" in external_context and external_context["project_description"] ): company_data_str += f"INFORMACJE OGÓLNE O PROJEKCIE (wpisane przez użytkownika):\n{external_context['project_description']}\n\n" if ( "current_section_content" in external_context and external_context["current_section_content"] ): company_data_str += f"OBECNA TREŚĆ SEKCJI (Zastosuj ewentualne poprawki do tego tekstu, zachowując jego spójność):\n{external_context['current_section_content']}\n\n" if "company_data" in external_context: company_data_str += f"Dane z GUS (kontekst o firmie wnioskodawcy):\n{json.dumps(external_context['company_data'], indent=2, ensure_ascii=False)}\n\n" if "resources" in external_context and external_context["resources"]: company_data_str += "Zasoby projektu (dostarczone pliki):\n" for res in external_context["resources"]: # Limitujemy tekst wyciągnięty z pliku żeby nie wysadzić okna kontekstowego dla gigantycznych plików (opcjonalnie, ale dobra praktyka) text_clip = res.get("extracted_text") or "" if len(text_clip) > 5000: text_clip = text_clip[:5000] + "... [UKRÓCONO]" company_data_str += ( f"--- PLIK: {res.get('filename')} ---\n{text_clip}\n\n" ) if company_data_str: company_data_str += "INSTRUKCJA PRIORYTETU: Dane z sekcji company_data oraz lista resources mają najwyższy priorytet. Używaj ich zawsze jako głównego źródła informacji o firmie. Wiedza ogólna z RAG jest tylko pomocnicza.\n\n" program_context = ( f"\n\nWAŻNE! PROJEKT DOTYCZY PROGRAMU:\n{program_name}\nBezwzględnie dostosuj narrację, słownictwo oraz rozłożenie akcentów we wniosku do specyfiki, wytycznych i głównego celu tego konkretnego programu. Unikaj żargonu z innych typów dotacji, chyba że wprost tu pasuje.\n" if program_name else "" ) # === Faza 1 / Punkt 2 + Faza 3: Regulation Context + Structured Engine Rules === regulation_section_context = "" if program_name: try: from core.search.regulation_context_provider import get_regulation_context_for_candidates, format_regulation_context_for_prompt fake_candidates = [{"id": "current_section", "program": program_name, "name": section_type}] reg_ctx = get_regulation_context_for_candidates( fake_candidates, project_description=context, k_per_grant=3 ) regulation_section_context = "\n\n" + format_regulation_context_for_prompt(reg_ctx, "current_section") # Nowe: Structured rules + eligibility checker z Engine (szczególnie ważne przy budżecie) rules = regulation_engine.get_structured_rules_for_program(program_name) if rules and rules.get("key_rules"): regulation_section_context += "\n\nStrukturalne reguły z Regulation Engine:\n" + "\n".join(rules["key_rules"][:5]) # Trust Score context (Cycle 9) try: ext = external_context or {} ts = compute_grant_trust_score({ "regulation_link_quality": ext.get("regulation_link_quality", "medium"), "precise_regulation_url": ext.get("precise_regulation_url") }) regulation_section_context += f"\n\n[Trust Score dla tego programu: {ts}/100 — im wyższy, tym bezpieczniej cytować regulamin]" except Exception: pass # Bezpośrednie użycie checkera kosztów przy sekcjach budżetowych (aktywna integracja Faza 3) if section_type in ["budget", "budget_details", "koszty", "montaż finansowy"] and context: try: eligibility = regulation_engine.check_cost_eligibility(program_name or "", context[:1500]) if eligibility.get("status") == "evaluated": reg_text = eligibility.get('raw_response', '') regulation_section_context += f"\n\nWeryfikacja kwalifikowalności kosztów (Regulation Engine):\n{reg_text}" # Aktywne zachowanie: jeśli silnik widzi ryzyko — dajemy silną instrukcję generatorowi if "Niekwalifikowalny" in reg_text or "Wymaga weryfikacji" in reg_text: regulation_section_context += ( "\n\nWAŻNA INSTRUKCJA: Powyższa weryfikacja Regulation Engine wskazuje na potencjalne ryzyko niekwalifikowalności. " "Bądź bardzo konserwatywny w opisie kosztów. Oznacz elementy wymagające potwierdzenia przez użytkownika. " "Nie twierdź kategorycznie, że koszty są kwalifikowalne jeśli silnik ma wątpliwości." ) except Exception: pass except Exception as e: import logging logging.getLogger(__name__).warning(f"Nie udało się pobrać regulation context dla sekcji: {e}") # === Faza 3 final (cykl automatyczny): Regulation Grounding Certificate (najwyższa wiarygodność) === grounding_certificate = "" try: from core.search.regulation_snapshot import regulation_snapshot_store snap = regulation_snapshot_store.get_latest_for_program(program_name or "") if program_name else None cert_lines = [] if snap: cert_lines.append("REGULATION GROUNDING CERTIFICATE v1") cert_lines.append(f"Program: {program_name}") cert_lines.append(f"Snapshot ID: {snap.id}") cert_lines.append(f"Version Hash: {snap.version_hash}") cert_lines.append(f"Fetched: {snap.fetched_at}") cert_lines.append(f"Source: {snap.source_url[:80]}") if snap.effective_date: cert_lines.append(f"Effective Date: {snap.effective_date}") if snap.document_version: cert_lines.append(f"Document Version: {snap.document_version}") if snap.source_institution: cert_lines.append(f"Institution: {snap.source_institution}") cert_lines.append(f"Key Rules Cited: {len(snap.key_rules or [])}") cert_lines.append(f"Exclusions Known: {len(snap.exclusions or [])}") # Dodaj wynik aktywnego checku silnika (jeśli dotyczy budżetu lub kwalifikowalności) if section_type in ["budget", "budget_details", "koszty", "montaż finansowy", "kwalifikowalność"] and program_name: try: el = regulation_engine.check_cost_eligibility(program_name, context[:1200] if context else "") if el.get("status") == "evaluated": cert_lines.append(f"Engine Cost Check: eligible={el.get('eligible')} severity={el.get('severity')}") cert_lines.append(f"Engine Ref: {el.get('regulation_reference', '')[:120]}") except Exception: pass if cert_lines: grounding_certificate = "\n\n" + "\n".join(cert_lines) + "\n--- END CERTIFICATE ---\n" # Dodajemy też do kontekstu promptu, żeby model wiedział, że musi to zachować regulation_section_context += grounding_certificate except Exception as cert_e: import logging logging.getLogger(__name__).debug(f"Grounding certificate generation skipped: {cert_e}") initial_prompt = f"Wygeneruj merytoryczną treść dla sekcji '{section_type}'. Kontekst szczegółowy: {context}\n\n{company_data_str}{program_context}{regulation_section_context}\nWAŻNE: PISZ ZAWSZE W JĘZYKU POLSKIM. Na końcu wygenerowanej sekcji ZAWSZE dołącz (lub zachowaj) Regulation Grounding Certificate jeśli był podany w kontekście." tenant_ns = f"tenant_{user_id}_{project_id}" if user_id and project_id else "" state = AgentState( messages=[HumanMessage(content=initial_prompt)], user_id=user_id, tenant_id=tenant_ns, profile=CompanyProfile( nip=external_context.get("company_data", {}).get("nip", "0000000000") if external_context else "0000000000", voivodeship=external_context.get("company_data", {}).get("voivodeship", "Mazowieckie") if external_context else "Mazowieckie" ), ) from core.utils import extract_markdown_and_sanitize from core.circuit_breaker import with_llm_retry, llm_circuit_breaker import logging logger = logging.getLogger(__name__) @llm_circuit_breaker @with_llm_retry def invoke_with_watchdog(): result = wizard_node(state) new_messages = result.get("messages", []) if new_messages and len(new_messages) > 0: last_msg = new_messages[-1] if isinstance(last_msg, dict) and "content" in last_msg: raw_c = last_msg["content"] elif hasattr(last_msg, "content"): raw_c = last_msg.content else: raw_c = "" # Weryfikujemy i wyciągamy Markdown sanitized = extract_markdown_and_sanitize(raw_c) # Sanity Checks: Blokada pustych odpowiedzi i typowych odmów if not sanitized or len(sanitized.strip()) < 20: logger.warning( f"Watchdog: Otrzymano zbyt krótką/pustą odpowiedź (długość: {len(sanitized)}). Wymuszam ponowienie." ) telemetry.log( "WARN", "Watchdog", "Zbyt krótka odpowiedź. Wymuszam ponowienie.", {"project_id": project_id}, ) raise ValueError( "Błąd sanity check: Pusta lub zbyt krótka odpowiedź z modelu." ) lower_c = sanitized.lower() refusals = [ "nie potrafię", "nie jestem w stanie", "nie mogę", "as an ai", "jako model językowy", ] if any(r in lower_c for r in refusals) and len(sanitized) < 200: logger.warning( "Watchdog: Wykryto typową odmowę LLM. Wymuszam ponowienie." ) raise ValueError( "Błąd sanity check: Model odmówił wygenerowania odpowiedzi." ) # === Faza 3: Wymuszony Regulation Grounding Certificate na wyjściu (nawet jeśli LLM pominął) === if grounding_certificate and "--- END CERTIFICATE ---" not in sanitized: sanitized = sanitized.rstrip() + "\n\n" + grounding_certificate.strip() return sanitized raise ValueError("Brak odpowiedzi tekstowej w strukturze wizarda") try: return invoke_with_watchdog() except Exception as e: logger.error(f"Nie powiodła się generacja sekcji (wizard_node + RAG): {e}", exc_info=True) telemetry.log( "ERROR", "Helpers", f"Błąd generacji sekcji: {str(e)}", {"project_id": project_id, "section": section_type} ) # Fallback: prostsza generacja bezpośrednia LLM bez ciężkiego RAG/wizard_node try: from core.llm_router import get_llm from langchain_core.messages import HumanMessage as FallbackHumanMessage llm = get_llm(task_type="fast") simple_prompt = f"""Napisz profesjonalną treść do sekcji '{section_type}' wniosku dotacyjnego. Kontekst projektu: {context[:3000] if context else "Brak szczegółowego kontekstu"} Dane firmy: {json.dumps(external_context.get("company_data", {}), ensure_ascii=False) if external_context else "Brak danych firmy"} Program: {program_name or "Ogólny"} Napisz po polsku, konkretnie i merytorycznie. Jeśli to możliwe, uwzględnij podstawowe wymogi kwalifikowalności.""" response = llm.invoke([FallbackHumanMessage(content=simple_prompt)]) content = response.content if hasattr(response, "content") else str(response) if grounding_certificate: content += "\n\n" + grounding_certificate logger.warning(f"[Helpers] Użyto fallbackowej generacji sekcji {section_type} po awarii głównej ścieżki.") return content + "\n\n---\n*Wygenerowano w trybie awaryjnym (uproszczonym). Zalecana weryfikacja.*" except Exception as fallback_e: logger.error(f"Fallback generation also failed: {fallback_e}") # Ostateczny komunikat if grounding_certificate: return f"Generacja sekcji nie powiodła się po wyczerpaniu prób (główna ścieżka + fallback).\n\nBłąd: {str(e)}\n\nDane użyte w próbie:\n{grounding_certificate}" return "Nie powiodła się generacja sekcji po wyczerpaniu prób. Przepraszamy za utrudnienia. Spróbuj ponownie później lub z prostszym promptem." @traceable( run_type="chain", name="review_section", tags=["rag_pipeline", "faithfulness_eval"] ) def review_section(project_id: str, section_id: str, content: str) -> CriticFeedback: """ Wywołuje logikę recenzenta Critic w celu ewaluacji dostarczonego tekstu wniosku. """ from core.telemetry import telemetry telemetry.log( "INFO", "Helpers", f"Rozpoczynamy recenzję sekcji: {section_id}", {"project_id": project_id}, ) # Critic expects the text to be in the last AI message state = AgentState( messages=[AIMessage(content=content)], user_id="", tenant_id="", critic_iterations=0, ) from core.circuit_breaker import with_llm_retry, llm_circuit_breaker import logging logger = logging.getLogger(__name__) @llm_circuit_breaker @with_llm_retry def invoke_critic(): result = critic_node(state) critic_eval = result.get("critic_evaluation") if not critic_eval: raise ValueError("Brak feedbacku od krytyka") # Sanity check if ( critic_eval.feedback and len(critic_eval.feedback.strip()) < 10 and not critic_eval.is_approved ): logger.warning( "Watchdog: Pusty feedback mimo odrzucenia. Wymuszam ponowienie." ) telemetry.log( "WARN", "Watchdog", "Pusty feedback. Wymuszam ponowienie.", {"project_id": project_id}, ) raise ValueError( "Błąd sanity check: Krytyk odrzucił tekst, ale nie podał powodu." ) return critic_eval try: return invoke_critic() except Exception as e: logger.error(f"Krytyk zawiódł po 5 próbach: {e}") return CriticFeedback( is_approved=True, feedback="Brak feedbacku - problem techniczny. Zatwierdzono warunkowo.", severity="low", ) @traceable(run_type="chain", name="project_qa_agent") def project_qa_agent( project_id: str, question: str, program_name: str, context: str, external_context: dict = None, ) -> dict: """ Weryfikator - odpowiada na pytania związane z projektem na bazie dokumentacji konkursowej i regulaminów (RAG) oraz kontekstu samego projektu (zdefiniowane sekcje wniosku). Zwraca ustrukturyzowaną odpowiedź w formacie słownika. """ hard_filter = {"program_name": program_name} if program_name else None retriever = get_hybrid_retriever(metadata_filter=hard_filter) # Rozszerzamy zapytanie do wektorów o program i dotacje by zwiększyć precyzję wyszukiwania search_query = f"{program_name} {question}" if program_name else question rag_context = "" sources_used = [] if retriever: try: # Wyszukanie odpowiednich dokumentów w RAG docs = retriever.invoke(search_query) reranked_docs = rerank_documents(search_query, docs, top_n=4) def _format_temporal(d): valid_from = d.metadata.get("valid_from", "") valid_to = d.metadata.get("valid_to", "") ver_id = d.metadata.get("version_id", "") time_str = "" if valid_from or valid_to or ver_id: time_str = f" [Wersja: {ver_id}, Ważne od: {valid_from} do: {valid_to}]" return f"SOURCE ({d.metadata.get('source', 'Unknown')} | {d.metadata.get('program_name', 'System')}){time_str}: {d.page_content}" rag_context = "\n\n".join([_format_temporal(d) for d in reranked_docs]) # Zbudowanie listy unikalnych źrodeł unique_sources = set() for d in reranked_docs: src_name = d.metadata.get("source", "Nieznane źródło") if src_name: unique_sources.add(src_name) sources_used = list(unique_sources) except Exception as e: rag_context = f"[Brak wyników z bazy wiedzy. Błąd RAG: {str(e)}]" else: rag_context = "[Baza wektorowa niedostępna]" from schemas import ProjectQAResponse # Używamy modelu o wysokiej precyzji do analityki llm = get_llm(task_type="critical", structured_output_schema=ProjectQAResponse) company_info = "" if external_context: if ( "project_description" in external_context and external_context["project_description"] ): company_info += f"INFORMACJE OGÓLNE O PROJEKCIE (wpisane przez użytkownika):\n{external_context['project_description']}\n\n" if "company_data" in external_context: company_info += f"DANE FIRMY WNIOSKODAWCY (Z GUS):\n{json.dumps(external_context['company_data'], indent=2, ensure_ascii=False)}\n\n" if "resources" in external_context and external_context["resources"]: company_info += "ZASOBY PROJEKTU (dostarczone pliki wg użytkownika):\n" for res in external_context["resources"]: text_clip = res.get("extracted_text") or "" if len(text_clip) > 3000: text_clip = text_clip[:3000] + "... [UKRÓCONO]" company_info += f"--- PLIK: {res.get('filename')} ---\n{text_clip}\n\n" if company_info: company_info += ( "INSTRUKCJA PRIORYTETU (WAŻNE DLA AUTOPILOTU):\n" "- Dane z sekcji company_data, project_description oraz wszystkie informacje wpisane przez użytkownika na początku mają ABSOLUTNY priorytet.\n" "- Jeśli jakaś informacja (adres, forma prawna, wspólnicy, budżet, opis projektu itp.) już istnieje w powyższym kontekście — NIE pytaj użytkownika ponownie. Używaj jej bezpośrednio.\n" "- Wiedza ogólna z RAG jest tylko pomocnicza.\n\n" ) template = ( ANTI_HALLUCINATION_PROMPT + """ Jesteś ekspertowym doradcą ds. dotacji unijnych, realizacji oraz rozliczania projektów R&D. Twoim zadaniem jest odpowiedź na Pytanie w odniesieniu do projektu klienta. WAŻNE ZASADY: 1. Używaj tylko najnowszych, aktualnych regulaminów z bazy wiedzy. Jeśli dostarczone dokumenty RAG wydają się przestarzałe, zachowaj ostrożność. 2. Zawsze cytuj konkretne paragrafy, punkty i nazwy dokumentów w polu "sources". 3. Jeśli nie jesteś w 100% pewien odpowiedzi na podstawie przepisów, ZAWSZE o tym napisz (nie zgaduj). 4. Dostosowuj odpowiedź do etapu projektu (czy to etap przygotowania wniosku, czy już realizacja i rozliczanie wydatków). 5. Traktuj dane w KONTEKST PROJEKTU jako źródło nadrzędne. Nie czepiaj się brakujących danych identyfikacyjnych (KRS, NIP, adres itp.), odpowiedz na pytanie merytorycznie. 6. BEZWZGLĘDNIE ODPOWIADAJ WYŁĄCZNIE W JĘZYKU POLSKIM I TYLKO NA TEMAT. Jeśli pytanie odbiega od funduszy, projektu lub dotacji, grzecznie wskaż, że jesteś specjalistą tylko od dotacji i odmów innej dyskusji. KONTEKST PROJEKTU (dane i treść wniosku): ------------------- {company_info}{project_context} ------------------- BIEŻĄCE WYTYCZNE Z BAZY WIEDZY RAG (przepisy): ------------------- {rag_context} ------------------- Pytanie od użytkownika: {question} """ ) from core.circuit_breaker import with_llm_retry, llm_circuit_breaker structured_llm = llm prompt = PromptTemplate.from_template(template) chain = prompt | structured_llm @llm_circuit_breaker @with_llm_retry def invoke_qa(): response: ProjectQAResponse = chain.invoke( { "company_info": company_info, "project_context": context, "rag_context": rag_context, "question": question, } ) if hasattr(response, "model_dump"): parsed_out = response.model_dump() elif hasattr(response, "dict"): parsed_out = response.dict() else: parsed_out = dict(response) # Sanity check answer_text = parsed_out.get("answer", "") if not answer_text or len(answer_text.strip()) < 10: raise ValueError( "Błąd sanity check: Odpowiedź Q&A jest pusta lub zbyt krótka." ) # Jeśli źródła RAG coś znalazły, ale LLM nic nie podał, sklei to if not parsed_out.get("sources") and sources_used: parsed_out["sources"] = sources_used return parsed_out try: return invoke_qa() except Exception as e: import traceback traceback.print_exc() print( f"Wystąpił błąd structured_output w project_qa_agent: {e}, próba fallbacku..." ) try: # Fallback bez with_structured_output fallback_chain = prompt | llm raw_response = fallback_chain.invoke( { "company_info": company_info, "project_context": context, "rag_context": rag_context, "question": question, } ) return { "answer": raw_response.content if hasattr(raw_response, "content") else str(raw_response), "sources": sources_used, "confidence": 0.5, "recommendation": "Odpowiedź wygenerowana w trybie awaryjnym (fallback). Mogą brakować szczegółowych źródeł wygenerowanych przez AI.", } except Exception: traceback.print_exc() # Ostateczny awaryjny powrót return { "answer": f"Awaria strukturalnego formatowania odpowiedzi modelu i trybu awaryjnego: {str(e)}", "sources": sources_used, "confidence": 0.0, "recommendation": "Spróbuj sformułować pytanie w prostszy sposób. (Sprawdzanie strukturalne zabezpieczyło przed błędem)", }