Spaces:
Running
Running
| import os | |
| import sys | |
| import asyncio | |
| import logging | |
| from pathlib import Path | |
| # Fix python path for backend modules | |
| sys.path.append(str(Path(__file__).parent.parent)) | |
| # Hack for deprecated LangChain dependencies: expose langchain_text_splitters under the legacy 'langchain.text_splitter' module path | |
| import langchain_text_splitters | |
| sys.modules['langchain.text_splitter'] = langchain_text_splitters | |
| from langchain_core.documents import Document | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from core.parp_client import parp_client | |
| from core.ncbr_client import ncbr_client | |
| from core.zus_client import zus_client | |
| from core.urzad_pracy_client import up_client | |
| from rag_pipeline.vector_store import ingest_documents, delete_grant_documents, delete_namespace | |
| logger = logging.getLogger(__name__) | |
| logging.basicConfig(level=logging.INFO) | |
def _join_field(values):
    """Render an optional list-valued grant field as a comma-separated string.

    Tolerates a missing/``None`` value, a bare string and non-string items,
    so a single malformed record cannot abort the whole ingestion run.
    """
    if not values:
        return ""
    if isinstance(values, str):
        return values
    return ", ".join(str(item) for item in values)


def _build_summary_text(nabor, title, grant_id):
    """Build the metadata summary that forms the head of a grant's document."""
    return (
        f"Nazwa Programu: {nabor.get('program')}\n"
        f"Nazwa Naboru: {title}\n"
        f"ID Naboru: {grant_id}\n"
        f"Typ Naboru: {nabor.get('type', 'Brak danych')}\n"
        f"Opis Naboru: {nabor.get('description', 'Brak opisu.')}\n"
        f"Status: {nabor.get('status')}\n"
        f"Termin (Deadline): {nabor.get('deadline', 'Brak danych')}\n"
        f"Dofinansowanie: od {nabor.get('min_dofinansowanie_pln', 0)} PLN "
        f"do {nabor.get('max_dofinansowanie_pln', 0)} PLN "
        f"(do {nabor.get('dofinansowanie_pct_max', 0)}%)\n"
        f"Kwalifikowalne regiony: {_join_field(nabor.get('eligible_regions'))}\n"
        f"Wielkość firm (MŚP): {_join_field(nabor.get('eligible_company_sizes'))}\n"
        f"Kwalifikowalne PKD: {_join_field(nabor.get('eligible_pkd'))}\n"
        f"Link oficjalny: {nabor.get('url', 'Brak linku')}\n"
    )


def _download_pdf_text(grant_url, timeout=60):
    """Download the PDF at ``grant_url`` and return the text of all its pages.

    The file is written to a temporary path that is always removed, even
    when the download or the parsing fails.

    Raises:
        ImportError: if the PDF loader (or one of its dependencies) is missing.
        Exception: any download or parsing error is propagated to the caller.
    """
    import shutil
    import tempfile
    import urllib.request

    from langchain_community.document_loaders import PyPDFLoader

    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
            tmp_path = tmp_file.name
            # urlopen (unlike urlretrieve) accepts a timeout, so a stalled
            # server cannot hang the whole run.
            with urllib.request.urlopen(grant_url, timeout=timeout) as response:
                shutil.copyfileobj(response, tmp_file)
        # The handle is closed (and flushed) before the loader reopens the
        # file by path.
        pages = PyPDFLoader(tmp_path).load()
    finally:
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError as cleanup_err:
                logger.debug(f"Could not remove temporary file {tmp_path}: {cleanup_err}")
    return "\n".join(page.page_content for page in pages)


def _fetch_firecrawl_markdown(grant_url, api_key, timeout=60):
    """Scrape ``grant_url`` through the Firecrawl API and return its markdown.

    Returns an empty string (after logging a warning) when the API answers
    with a non-200 status or without markdown content.
    """
    import requests

    logger.info(f"Pobieranie pełnej treści HTML/Markdown przez Firecrawl: {grant_url}")
    resp = requests.post(
        "https://api.firecrawl.dev/v1/scrape",
        headers={"Authorization": f"Bearer {api_key}"},
        json={"url": grant_url, "formats": ["markdown"]},
        timeout=timeout,
    )
    if resp.status_code != 200:
        logger.warning(f"Błąd pobierania przez Firecrawl dla {grant_url}: {resp.status_code}")
        return ""

    data = resp.json()
    # "data" may be present but null, hence `or {}` instead of a .get default.
    markdown = (data.get("data") or {}).get("markdown")
    if data.get("success") and markdown:
        return markdown
    logger.warning(f"Firecrawl nie zwrócił treści markdown dla {grant_url}")
    return ""


def _fetch_full_content(grant_url):
    """Best-effort fetch of a grant's full text; returns the text to append.

    Direct PDF links are downloaded and parsed; any other URL is scraped via
    Firecrawl when ``FIRECRAWL_API_KEY`` is set. Every failure is logged as a
    warning and yields an empty string, so enrichment never aborts ingestion.
    """
    try:
        from urllib.parse import urlparse

        url_lower = grant_url.lower()
        # Also recognise PDF links that carry a query string or fragment,
        # e.g. ".../regulamin.pdf?v=2".
        is_pdf = url_lower.endswith(".pdf") or urlparse(url_lower).path.endswith(".pdf")

        if is_pdf:
            logger.info(f"Wykryto bezpośredni link do PDF: {grant_url}")
            try:
                pdf_text = _download_pdf_text(grant_url)
            except ImportError:
                # Reported below as a missing library, not a download error.
                raise
            except Exception as pdf_err:
                logger.warning(f"Błąd pobierania PDF z {grant_url}: {pdf_err}")
                return ""
            return f"\n\n--- TREŚĆ REGULAMINU (PDF) ---\n{pdf_text}\n"

        api_key = os.getenv("FIRECRAWL_API_KEY")
        if not api_key:
            logger.warning(f"Brak FIRECRAWL_API_KEY, zignorowano pobieranie {grant_url}")
            return ""

        markdown = _fetch_firecrawl_markdown(grant_url, api_key)
        if markdown:
            return f"\n\n--- PEŁNY OPIS ZE STRONY ---\n{markdown}\n"
    except ImportError as e:
        logger.warning(f"Brak biblioteki do obsługi parsowania: {e}. Zignorowano pełne parsowanie.")
    except Exception as e:
        logger.warning(f"Błąd podczas analizy grant_url {grant_url}: {e}")
    return ""


async def fetch_and_ingest():
    """Fetch all active grant calls and index them in the vector store.

    Steps:
      1. Query the PARP, NCBR, ZUS and Urząd Pracy clients (cache bypassed
         via ``force_refresh=True``).
      2. Return early if no calls were found; otherwise clear the
         ``grants_guidelines`` namespace.
      3. For each call, build a text document from its metadata, enriched
         on a best-effort basis with the content behind its URL.
      4. Split the document into large "parent" chunks and small "child"
         chunks and pass both to ``ingest_documents``.

    NOTE: the PDF download and the Firecrawl request are synchronous calls
    and block the event loop while they run.
    """
    # (label in the "start" message, label in the "done" message, client)
    sources = (
        ("PARP", "PARP", parp_client),
        ("NCBR", "NCBR", ncbr_client),
        ("ZUS", "ZUS", zus_client),
        ("Urzędu Pracy", "UP", up_client),
    )

    all_nabory = []
    for start_label, done_label, client in sources:
        logger.info(f"Rozpoczynam pobieranie naborów z {start_label}...")
        nabory = await client.get_active_nabory(force_refresh=True)
        logger.info(f"Pobrano {len(nabory)} naborów z {done_label}.")
        all_nabory.extend(nabory)

    if not all_nabory:
        logger.warning("Brak aktywnych naborów do przetworzenia. Zakończono.")
        return

    # Splitter setup (chunking), per the original notes:
    #   parent: large portions of text that keep the full context
    #   child:  small fragments used for semantic search in the vector store
    parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=200)
    child_splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=50)

    namespace = "grants_guidelines"
    logger.info(f"Czyszczenie starej bazy wektorowej dla namespace: {namespace}...")
    delete_namespace(namespace)

    for nabor in all_nabory:
        grant_id = nabor.get("id")
        title = nabor.get("name")
        logger.info(f"Przetwarzanie naboru: {title} ({grant_id})")

        # The metadata summary is the base content of the document ...
        raw_text = _build_summary_text(nabor, title, grant_id)

        # ... optionally extended with the full text behind the grant's URL
        # (PDF file or web page).
        grant_url = nabor.get("url")
        if grant_url and grant_url != "Brak linku":
            raw_text += _fetch_full_content(grant_url)

        base_doc = Document(
            page_content=raw_text,
            metadata={
                "grant_id": grant_id,
                "program": nabor.get("program"),
                "title": title,
                "is_current": True,
                "type": "grant_guideline",
                "source": nabor.get("url"),
            },
        )

        parent_docs = parent_splitter.split_documents([base_doc])

        # Build the child chunks, each tagged with the index of the parent
        # chunk it was cut from (parent_index).
        child_docs = []
        for parent_index, parent_doc in enumerate(parent_docs):
            pieces = child_splitter.split_documents([parent_doc])
            for piece in pieces:
                piece.metadata["parent_index"] = parent_index
            child_docs.extend(pieces)

        # Hand both chunk levels over to the ingestion layer.
        if parent_docs and child_docs:
            ingest_documents(parent_docs, child_docs, namespace=namespace)
        else:
            logger.warning(f"Brak chunków do wektoryzacji dla {grant_id}.")

    logger.info("Zakończono proces wektoryzacji naborów.")
if __name__ == "__main__":
    # Script entry point (skipped on import): create the ingestion coroutine
    # and drive it to completion on a fresh event loop.
    ingestion_job = fetch_and_ingest()
    asyncio.run(ingestion_job)