""" LlamaParse + Hierarchical Chunking — serce pipeline RAG dla GrantForge AI. FAZA 2: Zaawansowane parsowanie PDF dokumentów prawnych (regulaminy dotacji, wytyczne MFiPR, załączniki KOP) z zachowaniem struktury tabelarycznej. Architektura failover: 1. LlamaParse API (LLAMA_CLOUD_API_KEY) — najlepsza jakość, zachowa tabele i listy 2. PyPDF2 + struktura heurystyczna (pypdf) — bez klucza API 3. Unstructured — dla trudnych skanów Zgodność: FAZA 2 planu Enterprise (LlamaParse dla dokumentów prawnych). """ import os import asyncio import tempfile import logging from typing import Optional from tenacity import retry, stop_after_attempt, wait_exponential logger = logging.getLogger(__name__) # ────────────────────────────────────────────────────────────────────────────── # Downloader PDF (z retry) # ────────────────────────────────────────────────────────────────────────────── @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=8)) async def download_pdf(url: str) -> Optional[str]: """Pobiera PDF do pliku tymczasowego. Retry 3x z exponential backoff.""" import httpx try: async with httpx.AsyncClient(follow_redirects=True, timeout=45.0) as client: response = await client.get(url) response.raise_for_status() fd, temp_path = tempfile.mkstemp(suffix=".pdf") with os.fdopen(fd, "wb") as f: f.write(response.content) logger.info(f"[PDF] Pobrano: {url} ({len(response.content) / 1024:.1f} KB)") return temp_path except Exception as e: logger.error(f"[PDF] Błąd pobierania {url}: {e}") raise # ────────────────────────────────────────────────────────────────────────────── # WARSTWA 1: LlamaParse (najlepsza jakość — zachowuje tabele, paragrafy, §) # ────────────────────────────────────────────────────────────────────────────── _LLAMAPARSE_INSTRUCTION = """ Parsing a Polish-language legal document related to EU grant programs (dotacje europejskie, fundusze strukturalne). Rules: 1. Preserve ALL paragraph headers (§ 1, Art. 1, Rozdział I, etc.) 2. Preserve tables exactly (budget tables, timeline tables, criteria scoring) 3. Preserve numbered lists and bullet points with their hierarchy 4. Mark page breaks as: 5. If a section header spans multiple lines, merge them on one line 6. Do NOT skip footnotes — mark as [Przypis N]: text 7. Polish legal abbreviations must remain unchanged (MFiPR, PARP, NCBR, UE, IOB) """ def _parse_llamaparse_sync(file_path: str) -> str: """ LlamaParse z instrukcjami dla dokumentów prawnych polskich dotacji. Zwraca Markdown z zachowaną strukturą §/Art./Rozdział. """ from llama_parse import LlamaParse api_key = os.environ.get("LLAMA_CLOUD_API_KEY") if not api_key: raise EnvironmentError("LLAMA_CLOUD_API_KEY nie skonfigurowany.") logger.info("[LlamaParse] Uruchamianie parsowania PDF (warstwa 1)...") parser = LlamaParse( api_key=api_key, result_type="markdown", verbose=False, language="pl", # język polski parsing_instruction=_LLAMAPARSE_INSTRUCTION, page_separator="\n\n", skip_diagonal_text=True, # ignoruj znaki wodne / stopki invalidate_cache=False, # cache API dla tego samego PDF do_not_unroll_columns=False, # zachowaj układ kolumn → tabele ) documents = parser.load_data(file_path) result = "\n\n".join(doc.text for doc in documents) logger.info(f"[LlamaParse] Sukces — {len(documents)} stron, {len(result)} znaków.") return result # ────────────────────────────────────────────────────────────────────────────── # WARSTWA 2: PyPDF (fallback bez klucza API) # ────────────────────────────────────────────────────────────────────────────── def _parse_pypdf_sync(file_path: str) -> str: """ Fallback: PyPDF + heurystyczny ekstraktor struktury § / Art. / Rozdział. Wolniejszy i mniej precyzyjny niż LlamaParse, ale działa offline. """ try: from pypdf import PdfReader reader = PdfReader(file_path) pages_text = [] for i, page in enumerate(reader.pages): text = page.extract_text() or "" if text.strip(): pages_text.append(f"\n{text}") full_text = "\n\n".join(pages_text) logger.info( f"[PyPDF] Sparsowano {len(reader.pages)} stron, {len(full_text)} znaków." ) return full_text except ImportError: logger.warning("[PyPDF] pypdf nie zainstalowany — próba z unstructured.") raise # ────────────────────────────────────────────────────────────────────────────── # WARSTWA 3: Unstructured (fallback dla skanów) # ────────────────────────────────────────────────────────────────────────────── def _parse_unstructured_sync(file_path: str) -> str: """Ostatnia linia obrony — unstructured dla skanów i trudnych PDFów.""" # from unstructured.partition.pdf import partition_pdf logger.info("[Unstructured] Fallback parsowania wyłączony (zbyt ciężka zależność).") # elements = partition_pdf(filename=file_path) # return "\n\n".join(str(el) for el in elements) raise ImportError("Unstructured.partition is disabled for performance reasons.") # ────────────────────────────────────────────────────────────────────────────── # Orkiestrator — waterfall failover # ────────────────────────────────────────────────────────────────────────────── async def parse_pdf_from_url(url: str, **kwargs) -> dict: """ Główny orchestrator parsowania PDF: LlamaParse → PyPDF → Unstructured → "" """ try: file_path = await download_pdf(url) except Exception as e: logger.error(f"[PDF] Nie udało się pobrać PDF: {e}") return {"text": "", "parser": "failed_download"} try: # Warstwa 1: LlamaParse (najlepsza) if os.environ.get("LLAMA_CLOUD_API_KEY"): try: text = await asyncio.to_thread(_parse_llamaparse_sync, file_path) return {"text": text, "parser": "llamaparse"} except Exception as e: logger.warning(f"[LlamaParse] Nieudane ({e}) — fallback PyPDF.") # Warstwa 2: PyPDF (offline) try: text = await asyncio.to_thread(_parse_pypdf_sync, file_path) return {"text": text, "parser": "pypdf"} except Exception as e: logger.warning(f"[PyPDF] Nieudane ({e}) — fallback Unstructured.") # Warstwa 3: Unstructured (skanowane PDFy) text = await asyncio.to_thread(_parse_unstructured_sync, file_path) return {"text": text, "parser": "unstructured"} except Exception as e: logger.error(f"[PDF] Wszystkie parsery zawiodły dla {url}: {e}") return {"text": "", "parser": "error"} finally: try: os.unlink(file_path) except Exception: pass async def parse_pdf_from_file(file_path: str, **kwargs) -> dict: """ Parsuje PDF z lokalnego pliku (używany przy upload przez użytkownika). Identyczny waterfall jak parse_pdf_from_url. """ try: if os.environ.get("LLAMA_CLOUD_API_KEY"): try: text = await asyncio.to_thread(_parse_llamaparse_sync, file_path) return {"text": text, "parser": "llamaparse"} except Exception as e: logger.warning(f"[LlamaParse] Błąd upload: {e} — fallback PyPDF.") try: text = await asyncio.to_thread(_parse_pypdf_sync, file_path) return {"text": text, "parser": "pypdf"} except Exception: text = await asyncio.to_thread(_parse_unstructured_sync, file_path) return {"text": text, "parser": "unstructured"} except Exception as e: logger.error(f"[PDF] Parsowanie pliku {file_path} nieudane: {e}") return {"text": "", "parser": "error"}