Spaces:
Running
Running
File size: 7,604 Bytes
afd56bc | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 | import os
import sys
import asyncio
import logging
from pathlib import Path
# Fix python path for backend modules
sys.path.append(str(Path(__file__).parent.parent))
# Hack na przestarza艂e zale偶no艣ci Langchaina
import langchain_text_splitters
sys.modules['langchain.text_splitter'] = langchain_text_splitters
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from core.parp_client import parp_client
from core.ncbr_client import ncbr_client
from core.zus_client import zus_client
from core.urzad_pracy_client import up_client
from rag_pipeline.vector_store import ingest_documents, delete_grant_documents, delete_namespace
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
async def fetch_and_ingest():
"""Pobiera wszystkie aktywne nabory i indeksuje je w Pinecone."""
logger.info("Rozpoczynam pobieranie nabor贸w z PARP...")
parp_nabory = await parp_client.get_active_nabory(force_refresh=True)
logger.info(f"Pobrano {len(parp_nabory)} nabor贸w z PARP.")
logger.info("Rozpoczynam pobieranie nabor贸w z NCBR...")
ncbr_nabory = await ncbr_client.get_active_nabory(force_refresh=True)
logger.info(f"Pobrano {len(ncbr_nabory)} nabor贸w z NCBR.")
logger.info("Rozpoczynam pobieranie nabor贸w z ZUS...")
zus_nabory = await zus_client.get_active_nabory(force_refresh=True)
logger.info(f"Pobrano {len(zus_nabory)} nabor贸w z ZUS.")
logger.info("Rozpoczynam pobieranie nabor贸w z Urz臋du Pracy...")
up_nabory = await up_client.get_active_nabory(force_refresh=True)
logger.info(f"Pobrano {len(up_nabory)} nabor贸w z UP.")
all_nabory = parp_nabory + ncbr_nabory + zus_nabory + up_nabory
if not all_nabory:
logger.warning("Brak aktywnych nabor贸w do przetworzenia. Zako艅czono.")
return
# Inicjalizacja splitter贸w (chunking)
# parent: du偶e porcje kodu/tekstu z zachowaniem pe艂nego kontekstu
# child: ma艂e fragmenty dla semantycznego wyszukiwania w Pinecone
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=200)
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=50)
namespace = "grants_guidelines"
logger.info(f"Czyszczenie starej bazy wektorowej dla namespace: {namespace}...")
delete_namespace(namespace)
for nabor in all_nabory:
grant_id = nabor.get("id")
title = nabor.get("name")
logger.info(f"Przetwarzanie naboru: {title} ({grant_id})")
# Na cele produkcyjne pobieramy pe艂en opis z URL lub parsowanego markdownu naboru.
# W tym skrypcie u偶ywamy metadanych jako g艂贸wnej zawarto艣ci dokumentu bazowego.
raw_text = (
f"Nazwa Programu: {nabor.get('program')}\n"
f"Nazwa Naboru: {title}\n"
f"ID Naboru: {grant_id}\n"
f"Typ Naboru: {nabor.get('type', 'Brak danych')}\n"
f"Opis Naboru: {nabor.get('description', 'Brak opisu.')}\n"
f"Status: {nabor.get('status')}\n"
f"Termin (Deadline): {nabor.get('deadline', 'Brak danych')}\n"
f"Dofinansowanie: od {nabor.get('min_dofinansowanie_pln', 0)} PLN do {nabor.get('max_dofinansowanie_pln', 0)} PLN (do {nabor.get('dofinansowanie_pct_max', 0)}%)\n"
f"Kwalifikowalne regiony: {', '.join(nabor.get('eligible_regions', []))}\n"
f"Wielko艣膰 firm (M艢P): {', '.join(nabor.get('eligible_company_sizes', []))}\n"
f"Kwalifikowalne PKD: {', '.join(nabor.get('eligible_pkd', []))}\n"
f"Link oficjalny: {nabor.get('url', 'Brak linku')}\n"
)
# Pr贸ba pobrania pe艂nej tre艣ci (np. z plik贸w PDF lub dok艂adnej strony)
grant_url = nabor.get("url")
if grant_url and grant_url != "Brak linku":
try:
# Najpierw spr贸bujmy pobra膰 zawarto艣膰 strony z Firecrawl API, je艣li mamy klucz,
# lub poprzez WebBaseLoader/PyPDFLoader w zale偶no艣ci od formatu.
if grant_url.lower().endswith(".pdf"):
from langchain_community.document_loaders import PyPDFLoader
import tempfile
import urllib.request
logger.info(f"Wykryto bezpo艣redni link do PDF: {grant_url}")
try:
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
urllib.request.urlretrieve(grant_url, tmp_file.name)
loader = PyPDFLoader(tmp_file.name)
pdf_docs = loader.load()
pdf_text = "\\n".join([doc.page_content for doc in pdf_docs])
raw_text += f"\\n\\n--- TRE艢膯 REGULAMINU (PDF) ---\\n{pdf_text}\\n"
os.unlink(tmp_file.name)
except Exception as pdf_err:
logger.warning(f"B艂膮d pobierania PDF z {grant_url}: {pdf_err}")
else:
api_key = os.getenv("FIRECRAWL_API_KEY")
if api_key:
import requests
logger.info(f"Pobieranie pe艂nej tre艣ci HTML/Markdown przez Firecrawl: {grant_url}")
resp = requests.post(
"https://api.firecrawl.dev/v1/scrape",
headers={"Authorization": f"Bearer {api_key}"},
json={"url": grant_url, "formats": ["markdown"]}
)
if resp.status_code == 200:
data = resp.json()
if data.get("success") and data.get("data", {}).get("markdown"):
raw_text += f"\\n\\n--- PE艁NY OPIS ZE STRONY ---\\n{data['data']['markdown']}\\n"
else:
logger.warning(f"B艂膮d pobierania przez Firecrawl dla {grant_url}: {resp.status_code}")
else:
logger.warning(f"Brak FIRECRAWL_API_KEY, zignorowano pobieranie {grant_url}")
except ImportError as e:
logger.warning(f"Brak biblioteki do obs艂ugi parsowania: {e}. Zignorowano pe艂ne parsowanie.")
except Exception as e:
logger.warning(f"B艂膮d podczas analizy grant_url {grant_url}: {e}")
base_doc = Document(
page_content=raw_text,
metadata={
"grant_id": grant_id,
"program": nabor.get("program"),
"title": title,
"is_current": True,
"type": "grant_guideline",
"source": nabor.get("url"),
}
)
parent_docs = parent_splitter.split_documents([base_doc])
child_docs = []
# Tworzenie mniejszych chunk贸w child z referencj膮 (parent_index)
for i, p_doc in enumerate(parent_docs):
c_docs = child_splitter.split_documents([p_doc])
for c_doc in c_docs:
c_doc.metadata["parent_index"] = i
child_docs.extend(c_docs)
# Ingest do wektorowej bazy i local file store
if parent_docs and child_docs:
ingest_documents(parent_docs, child_docs, namespace=namespace)
else:
logger.warning(f"Brak chunk贸w do wektoryzacji dla {grant_id}.")
logger.info("Zako艅czono proces wektoryzacji nabor贸w.")
if __name__ == "__main__":
asyncio.run(fetch_and_ingest())
|