GrantForge Bot
Deploy to Hugging Face
afd56bc
"""
LlamaParse + Hierarchical Chunking β€” serce pipeline RAG dla GrantForge AI.
FAZA 2: Zaawansowane parsowanie PDF dokumentΓ³w prawnych (regulaminy dotacji,
wytyczne MFiPR, zaΕ‚Δ…czniki KOP) z zachowaniem struktury tabelarycznej.
Architektura failover:
1. LlamaParse API (LLAMA_CLOUD_API_KEY) β€” najlepsza jakoΕ›Δ‡, zachowa tabele i listy
2. PyPDF2 + struktura heurystyczna (pypdf) β€” bez klucza API
3. Unstructured β€” dla trudnych skanΓ³w
ZgodnoΕ›Δ‡: FAZA 2 planu Enterprise (LlamaParse dla dokumentΓ³w prawnych).
"""
import os
import asyncio
import tempfile
import logging
from typing import Optional
from tenacity import retry, stop_after_attempt, wait_exponential
logger = logging.getLogger(__name__)
# ──────────────────────────────────────────────────────────────────────────────
# Downloader PDF (z retry)
# ──────────────────────────────────────────────────────────────────────────────
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=8))
async def download_pdf(url: str) -> Optional[str]:
"""Pobiera PDF do pliku tymczasowego. Retry 3x z exponential backoff."""
import httpx
try:
async with httpx.AsyncClient(follow_redirects=True, timeout=45.0) as client:
response = await client.get(url)
response.raise_for_status()
fd, temp_path = tempfile.mkstemp(suffix=".pdf")
with os.fdopen(fd, "wb") as f:
f.write(response.content)
logger.info(f"[PDF] Pobrano: {url} ({len(response.content) / 1024:.1f} KB)")
return temp_path
except Exception as e:
logger.error(f"[PDF] BΕ‚Δ…d pobierania {url}: {e}")
raise
# ──────────────────────────────────────────────────────────────────────────────
# WARSTWA 1: LlamaParse (najlepsza jakoΕ›Δ‡ β€” zachowuje tabele, paragrafy, Β§)
# ──────────────────────────────────────────────────────────────────────────────
_LLAMAPARSE_INSTRUCTION = """
Parsing a Polish-language legal document related to EU grant programs
(dotacje europejskie, fundusze strukturalne).
Rules:
1. Preserve ALL paragraph headers (Β§ 1, Art. 1, RozdziaΕ‚ I, etc.)
2. Preserve tables exactly (budget tables, timeline tables, criteria scoring)
3. Preserve numbered lists and bullet points with their hierarchy
4. Mark page breaks as: <!-- PAGE_BREAK -->
5. If a section header spans multiple lines, merge them on one line
6. Do NOT skip footnotes β€” mark as [Przypis N]: text
7. Polish legal abbreviations must remain unchanged (MFiPR, PARP, NCBR, UE, IOB)
"""
def _parse_llamaparse_sync(file_path: str) -> str:
"""
LlamaParse z instrukcjami dla dokumentΓ³w prawnych polskich dotacji.
Zwraca Markdown z zachowanΔ… strukturΔ… Β§/Art./RozdziaΕ‚.
"""
from llama_parse import LlamaParse
api_key = os.environ.get("LLAMA_CLOUD_API_KEY")
if not api_key:
raise EnvironmentError("LLAMA_CLOUD_API_KEY nie skonfigurowany.")
logger.info("[LlamaParse] Uruchamianie parsowania PDF (warstwa 1)...")
parser = LlamaParse(
api_key=api_key,
result_type="markdown",
verbose=False,
language="pl", # jΔ™zyk polski
parsing_instruction=_LLAMAPARSE_INSTRUCTION,
page_separator="\n<!-- PAGE_BREAK -->\n",
skip_diagonal_text=True, # ignoruj znaki wodne / stopki
invalidate_cache=False, # cache API dla tego samego PDF
do_not_unroll_columns=False, # zachowaj ukΕ‚ad kolumn β†’ tabele
)
documents = parser.load_data(file_path)
result = "\n\n".join(doc.text for doc in documents)
logger.info(f"[LlamaParse] Sukces β€” {len(documents)} stron, {len(result)} znakΓ³w.")
return result
# ──────────────────────────────────────────────────────────────────────────────
# WARSTWA 2: PyPDF (fallback bez klucza API)
# ──────────────────────────────────────────────────────────────────────────────
def _parse_pypdf_sync(file_path: str) -> str:
"""
Fallback: PyPDF + heurystyczny ekstraktor struktury Β§ / Art. / RozdziaΕ‚.
Wolniejszy i mniej precyzyjny niΕΌ LlamaParse, ale dziaΕ‚a offline.
"""
try:
from pypdf import PdfReader
reader = PdfReader(file_path)
pages_text = []
for i, page in enumerate(reader.pages):
text = page.extract_text() or ""
if text.strip():
pages_text.append(f"<!-- PAGE {i+1} -->\n{text}")
full_text = "\n\n".join(pages_text)
logger.info(
f"[PyPDF] Sparsowano {len(reader.pages)} stron, {len(full_text)} znakΓ³w."
)
return full_text
except ImportError:
logger.warning("[PyPDF] pypdf nie zainstalowany β€” prΓ³ba z unstructured.")
raise
# ──────────────────────────────────────────────────────────────────────────────
# WARSTWA 3: Unstructured (fallback dla skanΓ³w)
# ──────────────────────────────────────────────────────────────────────────────
def _parse_unstructured_sync(file_path: str) -> str:
"""Ostatnia linia obrony β€” unstructured dla skanΓ³w i trudnych PDFΓ³w."""
# from unstructured.partition.pdf import partition_pdf
logger.info("[Unstructured] Fallback parsowania wyΕ‚Δ…czony (zbyt ciΔ™ΕΌka zaleΕΌnoΕ›Δ‡).")
# elements = partition_pdf(filename=file_path)
# return "\n\n".join(str(el) for el in elements)
raise ImportError("Unstructured.partition is disabled for performance reasons.")
# ──────────────────────────────────────────────────────────────────────────────
# Orkiestrator β€” waterfall failover
# ──────────────────────────────────────────────────────────────────────────────
async def parse_pdf_from_url(url: str, **kwargs) -> dict:
"""
GΕ‚Γ³wny orchestrator parsowania PDF:
LlamaParse β†’ PyPDF β†’ Unstructured β†’ ""
"""
try:
file_path = await download_pdf(url)
except Exception as e:
logger.error(f"[PDF] Nie udało się pobrać PDF: {e}")
return {"text": "", "parser": "failed_download"}
try:
# Warstwa 1: LlamaParse (najlepsza)
if os.environ.get("LLAMA_CLOUD_API_KEY"):
try:
text = await asyncio.to_thread(_parse_llamaparse_sync, file_path)
return {"text": text, "parser": "llamaparse"}
except Exception as e:
logger.warning(f"[LlamaParse] Nieudane ({e}) β€” fallback PyPDF.")
# Warstwa 2: PyPDF (offline)
try:
text = await asyncio.to_thread(_parse_pypdf_sync, file_path)
return {"text": text, "parser": "pypdf"}
except Exception as e:
logger.warning(f"[PyPDF] Nieudane ({e}) β€” fallback Unstructured.")
# Warstwa 3: Unstructured (skanowane PDFy)
text = await asyncio.to_thread(_parse_unstructured_sync, file_path)
return {"text": text, "parser": "unstructured"}
except Exception as e:
logger.error(f"[PDF] Wszystkie parsery zawiodΕ‚y dla {url}: {e}")
return {"text": "", "parser": "error"}
finally:
try:
os.unlink(file_path)
except Exception:
pass
async def parse_pdf_from_file(file_path: str, **kwargs) -> dict:
"""
Parsuje PDF z lokalnego pliku (uΕΌywany przy upload przez uΕΌytkownika).
Identyczny waterfall jak parse_pdf_from_url.
"""
try:
if os.environ.get("LLAMA_CLOUD_API_KEY"):
try:
text = await asyncio.to_thread(_parse_llamaparse_sync, file_path)
return {"text": text, "parser": "llamaparse"}
except Exception as e:
logger.warning(f"[LlamaParse] BΕ‚Δ…d upload: {e} β€” fallback PyPDF.")
try:
text = await asyncio.to_thread(_parse_pypdf_sync, file_path)
return {"text": text, "parser": "pypdf"}
except Exception:
text = await asyncio.to_thread(_parse_unstructured_sync, file_path)
return {"text": text, "parser": "unstructured"}
except Exception as e:
logger.error(f"[PDF] Parsowanie pliku {file_path} nieudane: {e}")
return {"text": "", "parser": "error"}