import os import sys import asyncio import logging from pathlib import Path # Fix python path for backend modules sys.path.append(str(Path(__file__).parent.parent)) # Hack na przestarzałe zależności Langchaina import langchain_text_splitters sys.modules['langchain.text_splitter'] = langchain_text_splitters from langchain_core.documents import Document from langchain_text_splitters import RecursiveCharacterTextSplitter from core.parp_client import parp_client from core.ncbr_client import ncbr_client from core.zus_client import zus_client from core.urzad_pracy_client import up_client from rag_pipeline.vector_store import ingest_documents, delete_grant_documents, delete_namespace logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) async def fetch_and_ingest(): """Pobiera wszystkie aktywne nabory i indeksuje je w Pinecone.""" logger.info("Rozpoczynam pobieranie naborów z PARP...") parp_nabory = await parp_client.get_active_nabory(force_refresh=True) logger.info(f"Pobrano {len(parp_nabory)} naborów z PARP.") logger.info("Rozpoczynam pobieranie naborów z NCBR...") ncbr_nabory = await ncbr_client.get_active_nabory(force_refresh=True) logger.info(f"Pobrano {len(ncbr_nabory)} naborów z NCBR.") logger.info("Rozpoczynam pobieranie naborów z ZUS...") zus_nabory = await zus_client.get_active_nabory(force_refresh=True) logger.info(f"Pobrano {len(zus_nabory)} naborów z ZUS.") logger.info("Rozpoczynam pobieranie naborów z Urzędu Pracy...") up_nabory = await up_client.get_active_nabory(force_refresh=True) logger.info(f"Pobrano {len(up_nabory)} naborów z UP.") all_nabory = parp_nabory + ncbr_nabory + zus_nabory + up_nabory if not all_nabory: logger.warning("Brak aktywnych naborów do przetworzenia. Zakończono.") return # Inicjalizacja splitterów (chunking) # parent: duże porcje kodu/tekstu z zachowaniem pełnego kontekstu # child: małe fragmenty dla semantycznego wyszukiwania w Pinecone parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=200) child_splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=50) namespace = "grants_guidelines" logger.info(f"Czyszczenie starej bazy wektorowej dla namespace: {namespace}...") delete_namespace(namespace) for nabor in all_nabory: grant_id = nabor.get("id") title = nabor.get("name") logger.info(f"Przetwarzanie naboru: {title} ({grant_id})") # Na cele produkcyjne pobieramy pełen opis z URL lub parsowanego markdownu naboru. # W tym skrypcie używamy metadanych jako głównej zawartości dokumentu bazowego. raw_text = ( f"Nazwa Programu: {nabor.get('program')}\n" f"Nazwa Naboru: {title}\n" f"ID Naboru: {grant_id}\n" f"Typ Naboru: {nabor.get('type', 'Brak danych')}\n" f"Opis Naboru: {nabor.get('description', 'Brak opisu.')}\n" f"Status: {nabor.get('status')}\n" f"Termin (Deadline): {nabor.get('deadline', 'Brak danych')}\n" f"Dofinansowanie: od {nabor.get('min_dofinansowanie_pln', 0)} PLN do {nabor.get('max_dofinansowanie_pln', 0)} PLN (do {nabor.get('dofinansowanie_pct_max', 0)}%)\n" f"Kwalifikowalne regiony: {', '.join(nabor.get('eligible_regions', []))}\n" f"Wielkość firm (MŚP): {', '.join(nabor.get('eligible_company_sizes', []))}\n" f"Kwalifikowalne PKD: {', '.join(nabor.get('eligible_pkd', []))}\n" f"Link oficjalny: {nabor.get('url', 'Brak linku')}\n" ) # Próba pobrania pełnej treści (np. z plików PDF lub dokładnej strony) grant_url = nabor.get("url") if grant_url and grant_url != "Brak linku": try: # Najpierw spróbujmy pobrać zawartość strony z Firecrawl API, jeśli mamy klucz, # lub poprzez WebBaseLoader/PyPDFLoader w zależności od formatu. if grant_url.lower().endswith(".pdf"): from langchain_community.document_loaders import PyPDFLoader import tempfile import urllib.request logger.info(f"Wykryto bezpośredni link do PDF: {grant_url}") try: with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file: urllib.request.urlretrieve(grant_url, tmp_file.name) loader = PyPDFLoader(tmp_file.name) pdf_docs = loader.load() pdf_text = "\\n".join([doc.page_content for doc in pdf_docs]) raw_text += f"\\n\\n--- TREŚĆ REGULAMINU (PDF) ---\\n{pdf_text}\\n" os.unlink(tmp_file.name) except Exception as pdf_err: logger.warning(f"Błąd pobierania PDF z {grant_url}: {pdf_err}") else: api_key = os.getenv("FIRECRAWL_API_KEY") if api_key: import requests logger.info(f"Pobieranie pełnej treści HTML/Markdown przez Firecrawl: {grant_url}") resp = requests.post( "https://api.firecrawl.dev/v1/scrape", headers={"Authorization": f"Bearer {api_key}"}, json={"url": grant_url, "formats": ["markdown"]} ) if resp.status_code == 200: data = resp.json() if data.get("success") and data.get("data", {}).get("markdown"): raw_text += f"\\n\\n--- PEŁNY OPIS ZE STRONY ---\\n{data['data']['markdown']}\\n" else: logger.warning(f"Błąd pobierania przez Firecrawl dla {grant_url}: {resp.status_code}") else: logger.warning(f"Brak FIRECRAWL_API_KEY, zignorowano pobieranie {grant_url}") except ImportError as e: logger.warning(f"Brak biblioteki do obsługi parsowania: {e}. Zignorowano pełne parsowanie.") except Exception as e: logger.warning(f"Błąd podczas analizy grant_url {grant_url}: {e}") base_doc = Document( page_content=raw_text, metadata={ "grant_id": grant_id, "program": nabor.get("program"), "title": title, "is_current": True, "type": "grant_guideline", "source": nabor.get("url"), } ) parent_docs = parent_splitter.split_documents([base_doc]) child_docs = [] # Tworzenie mniejszych chunków child z referencją (parent_index) for i, p_doc in enumerate(parent_docs): c_docs = child_splitter.split_documents([p_doc]) for c_doc in c_docs: c_doc.metadata["parent_index"] = i child_docs.extend(c_docs) # Ingest do wektorowej bazy i local file store if parent_docs and child_docs: ingest_documents(parent_docs, child_docs, namespace=namespace) else: logger.warning(f"Brak chunków do wektoryzacji dla {grant_id}.") logger.info("Zakończono proces wektoryzacji naborów.") if __name__ == "__main__": asyncio.run(fetch_and_ingest())