grantforge-api / backend /scripts /ingest_grants.py
GrantForge Bot
Deploy to Hugging Face
afd56bc
import os
import sys
import asyncio
import logging
from pathlib import Path
# Fix python path for backend modules
sys.path.append(str(Path(__file__).parent.parent))
# Hack na przestarza艂e zale偶no艣ci Langchaina
import langchain_text_splitters
sys.modules['langchain.text_splitter'] = langchain_text_splitters
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from core.parp_client import parp_client
from core.ncbr_client import ncbr_client
from core.zus_client import zus_client
from core.urzad_pracy_client import up_client
from rag_pipeline.vector_store import ingest_documents, delete_grant_documents, delete_namespace
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
async def fetch_and_ingest():
"""Pobiera wszystkie aktywne nabory i indeksuje je w Pinecone."""
logger.info("Rozpoczynam pobieranie nabor贸w z PARP...")
parp_nabory = await parp_client.get_active_nabory(force_refresh=True)
logger.info(f"Pobrano {len(parp_nabory)} nabor贸w z PARP.")
logger.info("Rozpoczynam pobieranie nabor贸w z NCBR...")
ncbr_nabory = await ncbr_client.get_active_nabory(force_refresh=True)
logger.info(f"Pobrano {len(ncbr_nabory)} nabor贸w z NCBR.")
logger.info("Rozpoczynam pobieranie nabor贸w z ZUS...")
zus_nabory = await zus_client.get_active_nabory(force_refresh=True)
logger.info(f"Pobrano {len(zus_nabory)} nabor贸w z ZUS.")
logger.info("Rozpoczynam pobieranie nabor贸w z Urz臋du Pracy...")
up_nabory = await up_client.get_active_nabory(force_refresh=True)
logger.info(f"Pobrano {len(up_nabory)} nabor贸w z UP.")
all_nabory = parp_nabory + ncbr_nabory + zus_nabory + up_nabory
if not all_nabory:
logger.warning("Brak aktywnych nabor贸w do przetworzenia. Zako艅czono.")
return
# Inicjalizacja splitter贸w (chunking)
# parent: du偶e porcje kodu/tekstu z zachowaniem pe艂nego kontekstu
# child: ma艂e fragmenty dla semantycznego wyszukiwania w Pinecone
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=200)
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=50)
namespace = "grants_guidelines"
logger.info(f"Czyszczenie starej bazy wektorowej dla namespace: {namespace}...")
delete_namespace(namespace)
for nabor in all_nabory:
grant_id = nabor.get("id")
title = nabor.get("name")
logger.info(f"Przetwarzanie naboru: {title} ({grant_id})")
# Na cele produkcyjne pobieramy pe艂en opis z URL lub parsowanego markdownu naboru.
# W tym skrypcie u偶ywamy metadanych jako g艂贸wnej zawarto艣ci dokumentu bazowego.
raw_text = (
f"Nazwa Programu: {nabor.get('program')}\n"
f"Nazwa Naboru: {title}\n"
f"ID Naboru: {grant_id}\n"
f"Typ Naboru: {nabor.get('type', 'Brak danych')}\n"
f"Opis Naboru: {nabor.get('description', 'Brak opisu.')}\n"
f"Status: {nabor.get('status')}\n"
f"Termin (Deadline): {nabor.get('deadline', 'Brak danych')}\n"
f"Dofinansowanie: od {nabor.get('min_dofinansowanie_pln', 0)} PLN do {nabor.get('max_dofinansowanie_pln', 0)} PLN (do {nabor.get('dofinansowanie_pct_max', 0)}%)\n"
f"Kwalifikowalne regiony: {', '.join(nabor.get('eligible_regions', []))}\n"
f"Wielko艣膰 firm (M艢P): {', '.join(nabor.get('eligible_company_sizes', []))}\n"
f"Kwalifikowalne PKD: {', '.join(nabor.get('eligible_pkd', []))}\n"
f"Link oficjalny: {nabor.get('url', 'Brak linku')}\n"
)
# Pr贸ba pobrania pe艂nej tre艣ci (np. z plik贸w PDF lub dok艂adnej strony)
grant_url = nabor.get("url")
if grant_url and grant_url != "Brak linku":
try:
# Najpierw spr贸bujmy pobra膰 zawarto艣膰 strony z Firecrawl API, je艣li mamy klucz,
# lub poprzez WebBaseLoader/PyPDFLoader w zale偶no艣ci od formatu.
if grant_url.lower().endswith(".pdf"):
from langchain_community.document_loaders import PyPDFLoader
import tempfile
import urllib.request
logger.info(f"Wykryto bezpo艣redni link do PDF: {grant_url}")
try:
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
urllib.request.urlretrieve(grant_url, tmp_file.name)
loader = PyPDFLoader(tmp_file.name)
pdf_docs = loader.load()
pdf_text = "\\n".join([doc.page_content for doc in pdf_docs])
raw_text += f"\\n\\n--- TRE艢膯 REGULAMINU (PDF) ---\\n{pdf_text}\\n"
os.unlink(tmp_file.name)
except Exception as pdf_err:
logger.warning(f"B艂膮d pobierania PDF z {grant_url}: {pdf_err}")
else:
api_key = os.getenv("FIRECRAWL_API_KEY")
if api_key:
import requests
logger.info(f"Pobieranie pe艂nej tre艣ci HTML/Markdown przez Firecrawl: {grant_url}")
resp = requests.post(
"https://api.firecrawl.dev/v1/scrape",
headers={"Authorization": f"Bearer {api_key}"},
json={"url": grant_url, "formats": ["markdown"]}
)
if resp.status_code == 200:
data = resp.json()
if data.get("success") and data.get("data", {}).get("markdown"):
raw_text += f"\\n\\n--- PE艁NY OPIS ZE STRONY ---\\n{data['data']['markdown']}\\n"
else:
logger.warning(f"B艂膮d pobierania przez Firecrawl dla {grant_url}: {resp.status_code}")
else:
logger.warning(f"Brak FIRECRAWL_API_KEY, zignorowano pobieranie {grant_url}")
except ImportError as e:
logger.warning(f"Brak biblioteki do obs艂ugi parsowania: {e}. Zignorowano pe艂ne parsowanie.")
except Exception as e:
logger.warning(f"B艂膮d podczas analizy grant_url {grant_url}: {e}")
base_doc = Document(
page_content=raw_text,
metadata={
"grant_id": grant_id,
"program": nabor.get("program"),
"title": title,
"is_current": True,
"type": "grant_guideline",
"source": nabor.get("url"),
}
)
parent_docs = parent_splitter.split_documents([base_doc])
child_docs = []
# Tworzenie mniejszych chunk贸w child z referencj膮 (parent_index)
for i, p_doc in enumerate(parent_docs):
c_docs = child_splitter.split_documents([p_doc])
for c_doc in c_docs:
c_doc.metadata["parent_index"] = i
child_docs.extend(c_docs)
# Ingest do wektorowej bazy i local file store
if parent_docs and child_docs:
ingest_documents(parent_docs, child_docs, namespace=namespace)
else:
logger.warning(f"Brak chunk贸w do wektoryzacji dla {grant_id}.")
logger.info("Zako艅czono proces wektoryzacji nabor贸w.")
if __name__ == "__main__":
asyncio.run(fetch_and_ingest())