Spaces:
Running
Running
| """ | |
| Endpoints: Upload PDF + Re-ingest RAG — Sprint 7 / Sprint 9 | |
| POST /api/projects/{project_id}/documents | |
| Wgrywa plik PDF do projektu, uruchamia pipeline RAG w tle (asyncio.create_task). | |
| Pipeline: LlamaParse → (PyPDF fallback) → Hierarchical Chunking → Pinecone | |
| Limity: | |
| - Hard limit: max 10 plików per projekt (wszystkie plany) | |
| - Soft limit: Free = 3 pliki, Pro/Enterprise = 50 plików | |
| GET /api/projects/{project_id}/documents | |
| Listuje dokumenty projektu ze statusem indeksacji. | |
| DELETE /api/projects/{project_id}/documents/{doc_id} | |
| Usuwa plik dokumentu z dysku oraz jego rekord z bazy danych (wektory w Pinecone nie są tu usuwane). | |
| POST /api/projects/{project_id}/documents/{doc_id}/reingest | |
| Ponowna indeksacja dokumentu (np. po zmianie parametrów RAG). | |
| """ | |
| import os | |
| import uuid | |
| import logging | |
| import asyncio | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Optional | |
| from fastapi import APIRouter, HTTPException, UploadFile, File, Query | |
| from fastapi.responses import JSONResponse | |
# Module-level logger; its name follows this module's import path.
logger = logging.getLogger(__name__)

# Router for the document endpoints, mounted under the /api/projects prefix.
router = APIRouter(prefix="/api/projects", tags=["documents"])
# ── Configuration ─────────────────────────────────────────────────────────────
# Root directory for stored uploads; overridable via the UPLOAD_DIR env variable.
UPLOAD_DIR = Path(os.getenv("UPLOAD_DIR", "/data/uploads"))

# Largest accepted upload, in megabytes.
MAX_FILE_SIZE_MB = 20

# Content types accepted as PDF.
ALLOWED_MIME_TYPES = {"application/x-pdf", "application/pdf"}

# Per-project document caps.
# NOTE(review): the hard cap (10) is lower than the Pro/Enterprise caps (50) and
# _check_upload_limits tests the hard cap first, so the paid-plan caps look
# unreachable — confirm which value is intended.
UPLOAD_LIMIT_HARD = 10  # absolute cap per project, every plan
UPLOAD_LIMIT_FREE = 3  # cap on the Free plan
UPLOAD_LIMIT_PRO = 50  # cap on the Pro plan
UPLOAD_LIMIT_ENTERPRISE = 50  # cap on the Enterprise plan
| # ── Helpers ─────────────────────────────────────────────────────────────────── | |
def _get_namespace(user_id: str, project_id: str) -> str:
    """Build the Pinecone namespace for a user/project pair.

    The result has the form ``tenant_{user_id}_{project_id}``.
    """
    return "tenant_{}_{}".format(user_id, project_id)
def _resolve_user_id(token: Optional[str]) -> str:
    """Resolve the caller's user id from the JWT passed as ``token``.

    Args:
        token: Encoded JWT (Clerk, per the original notes), or ``None``.

    Returns:
        The token's ``sub`` claim when it is a non-empty string, otherwise
        ``"anonymous"`` (no token, undecodable token, PyJWT not importable,
        or a missing/empty/non-string ``sub``).

    SECURITY: the token signature is NOT verified (``verify_signature`` is
    False), so the returned id is only as trustworthy as the caller. The
    hard-coded ``dev_test_token`` shortcut is likewise accepted from anyone.
    Both should be reviewed before relying on this value for authorization.
    """
    if not token:
        return "anonymous"

    # Development shortcut. Checked before importing PyJWT so that it does not
    # silently degrade to "anonymous" when the library is unavailable.
    if token == "dev_test_token":
        return "test_dev_user"

    try:
        import jwt

        decoded = jwt.decode(token, options={"verify_signature": False})
    except Exception:
        # Deliberate best effort: a malformed token or a missing PyJWT install
        # downgrades the caller to the anonymous identity.
        return "anonymous"

    # Only a non-empty string is a usable id; a null or empty claim would
    # otherwise leak into namespaces as "tenant_None_..." or "tenant__...".
    subject = decoded.get("sub") if isinstance(decoded, dict) else None
    if isinstance(subject, str) and subject:
        return subject
    return "anonymous"
def _get_plan_upload_limit(db, project_id: str) -> tuple[int, str]:
    """Return the upload cap implied by the project owner's subscription plan.

    Args:
        db: Open database session used for the lookups.
        project_id: Id of the project whose owner's plan is inspected.

    Returns:
        ``(limit, plan_name)``. ``"pro"``/``"professional"`` map to
        ``UPLOAD_LIMIT_PRO`` and ``"enterprise"``/``"business"`` to
        ``UPLOAD_LIMIT_ENTERPRISE`` (the lower-cased stored plan name is
        returned). Every other case — unknown project, project without an
        owner, no subscription row, unrecognised plan, or a lookup error —
        yields ``(UPLOAD_LIMIT_FREE, "free")``.
    """
    try:
        from core.projects.models import Project
        from core.subscription.models import UserSubscription

        project = db.query(Project).filter(Project.id == project_id).first()
        if not project or not project.user_id:
            return UPLOAD_LIMIT_FREE, "free"

        sub = (
            db.query(UserSubscription)
            .filter(UserSubscription.user_id == project.user_id)
            .first()
        )
        plan_lower = ((sub.plan if sub else None) or "free").lower()

        if plan_lower in ("pro", "professional"):
            return UPLOAD_LIMIT_PRO, plan_lower
        if plan_lower in ("enterprise", "business"):
            return UPLOAD_LIMIT_ENTERPRISE, plan_lower
        return UPLOAD_LIMIT_FREE, "free"
    except Exception:
        # Fall back to the Free cap rather than failing the request, but leave
        # a trace: a broken subscription lookup would otherwise silently
        # downgrade paying users.
        logger.warning(
            "[Upload] Nie udało się odczytać planu dla projektu %s — "
            "stosuję limit planu Free.",
            project_id,
            exc_info=True,
        )
        return UPLOAD_LIMIT_FREE, "free"
def _check_upload_limits(db, project_id: str) -> dict:
    """Decide whether the project may receive one more document.

    Counts the project's documents whose status is not ``"deleted"`` and
    compares the count first with the absolute cap, then with the plan cap.

    Returns:
        A dict with the keys ``allowed`` (bool), ``current`` (int), ``limit``
        (int), ``plan`` (str) and ``reason`` (str, empty when allowed).

    NOTE(review): when the upload is allowed, ``limit`` is the plan cap even if
    it exceeds the absolute cap that is enforced first — confirm this is the
    number the UI should show.
    """
    from core.projects.models import ProjectDocument

    active_documents = db.query(ProjectDocument).filter(
        ProjectDocument.project_id == project_id,
        ProjectDocument.status != "deleted",
    )
    current_count = active_documents.count()

    def _verdict(allowed: bool, limit: int, plan: str, reason: str) -> dict:
        # Single place that fixes the shape of the result.
        return {
            "allowed": allowed,
            "current": current_count,
            "limit": limit,
            "plan": plan,
            "reason": reason,
        }

    # Absolute cap: applies regardless of the plan.
    if current_count >= UPLOAD_LIMIT_HARD:
        return _verdict(
            False,
            UPLOAD_LIMIT_HARD,
            "any",
            f"Przekroczono bezwzględny limit {UPLOAD_LIMIT_HARD} plików na projekt.",
        )

    # Plan-dependent cap.
    plan_limit, plan_name = _get_plan_upload_limit(db, project_id)
    if current_count < plan_limit:
        return _verdict(True, plan_limit, plan_name, "")

    return _verdict(
        False,
        plan_limit,
        plan_name,
        (
            f"Plan '{plan_name}' pozwala na {plan_limit} pliki PDF per projekt. "
            "Usuń stare dokumenty lub przejdź na plan Pro."
        ),
    )
async def _save_upload(
    upload: UploadFile, dest_dir: Path, doc_id: str
) -> tuple[Path, int]:
    """Stream an uploaded file to ``dest_dir`` and return ``(path, size_bytes)``.

    The file is stored as ``{doc_id}{suffix}``, where the suffix comes from the
    uploaded filename and defaults to ``.pdf``. ``dest_dir`` is created when
    missing.

    Raises:
        HTTPException: 413 when the upload exceeds ``MAX_FILE_SIZE_MB``.

    On any failure (size limit, read/write error, cancellation) the partially
    written file is removed before the exception propagates.
    """
    dest_dir.mkdir(parents=True, exist_ok=True)
    suffix = Path(upload.filename or "doc").suffix or ".pdf"
    dest_path = dest_dir / f"{doc_id}{suffix}"

    max_bytes = MAX_FILE_SIZE_MB * 1024 * 1024
    chunk_size = 1024 * 256  # read in 256 KB chunks
    size = 0
    try:
        with open(dest_path, "wb") as f:
            while chunk := await upload.read(chunk_size):
                size += len(chunk)
                if size > max_bytes:
                    raise HTTPException(
                        413, detail=f"Plik za duży. Limit: {MAX_FILE_SIZE_MB} MB"
                    )
                f.write(chunk)
    except BaseException:
        # The handle is closed by the time we get here, so the unlink is safe
        # on every platform and no half-written file is left behind.
        dest_path.unlink(missing_ok=True)
        raise
    return dest_path, size
async def _run_rag_pipeline(
    doc_id: str,
    project_id: str,
    file_path: Path,
    namespace: str,
    program_name: Optional[str] = None,
):
    """Parse, chunk and index an uploaded PDF, tracking progress on its DB row.

    Intended to run in the background (the routes start it with
    ``asyncio.create_task``). Exceptions raised by the steps are caught and
    logged, and the document row is marked ``"error"`` with a truncated message.

    Steps:
        1. Mark the document ``"processing"``.
        2. Parse the PDF with ``parse_pdf_from_file``.
        3. Split the text with ``hierarchical_chunking`` (parent/child chunks).
        4. Store the chunks with ``ingest_documents`` under ``namespace``.
        5. Mark the document ``"indexed"`` and save processing statistics.

    Args:
        doc_id: Id of the ``ProjectDocument`` row to update.
        project_id: Owning project id, attached to the chunk metadata.
        file_path: Location of the PDF on disk.
        namespace: Vector-store namespace to ingest into.
        program_name: Optional program name passed to the parser and metadata.
    """
    db = None
    try:
        from core.subscription.db import SessionLocal
        from core.projects.models import ProjectDocument

        db = SessionLocal()
        doc = db.query(ProjectDocument).filter(ProjectDocument.id == doc_id).first()
        if not doc:
            logger.error(f"[RAG Upload] Dokument {doc_id} nie znaleziony w DB.")
            return

        # ── Step 1: mark as "processing" ────────────────────────────────────
        doc.status = "processing"
        db.commit()

        # ── Step 2: parse the PDF ───────────────────────────────────────────
        # The package is importable under two roots depending on how the app
        # is launched, hence the fallback import.
        try:
            from rag_pipeline.pdf_parser import parse_pdf_from_file
        except ImportError:
            from backend.rag_pipeline.pdf_parser import parse_pdf_from_file

        parse_result = await parse_pdf_from_file(
            str(file_path),
            document_type="regulamin_dotacyjny",
            program_name=program_name or "Nieznany Program",
        )
        # Tolerate an explicit None as well as a missing "text" key.
        raw_text = parse_result.get("text") or ""
        parser_used = parse_result.get("parser", "unknown")
        if not raw_text.strip():
            raise ValueError("Parser nie wyodrębnił żadnej treści z pliku PDF.")
        logger.info(
            f"[RAG Upload] Dokument {doc_id}: sparsowano {len(raw_text)} znaków "
            f"przez '{parser_used}'."
        )

        # ── Step 3: hierarchical chunking ───────────────────────────────────
        try:
            from rag_pipeline.ingest import hierarchical_chunking
        except ImportError:
            from backend.rag_pipeline.ingest import hierarchical_chunking

        # Run in a worker thread so the event loop is not blocked.
        parent_docs, child_docs = await asyncio.to_thread(
            hierarchical_chunking,
            text=raw_text,
            source_url=file_path.name,
            extra_metadata={
                "source": file_path.name,
                "project_id": project_id,
                "document_id": doc_id,
                "program_name": program_name or "Nieznany",
                "is_current": True,
            },
        )
        logger.info(
            f"[RAG Upload] Chunking: {len(parent_docs)} parent, "
            f"{len(child_docs)} child chunks."
        )

        # ── Step 4: store the chunks ────────────────────────────────────────
        try:
            from rag_pipeline.vector_store import ingest_documents
        except ImportError:
            from backend.rag_pipeline.vector_store import ingest_documents

        await asyncio.to_thread(
            ingest_documents,
            parent_docs=parent_docs,
            child_docs=child_docs,
            namespace=namespace,
        )

        # ── Step 5: update the record ───────────────────────────────────────
        doc.status = "indexed"
        doc.parser_used = parser_used
        doc.chunks_count = len(child_docs)
        doc.rag_namespace = namespace
        doc.indexed_at = datetime.now(timezone.utc)
        doc.processing_metadata = {
            "parent_chunks": len(parent_docs),
            "child_chunks": len(child_docs),
            "raw_text_length": len(raw_text),
            "program_name": program_name,
        }
        db.commit()
        logger.info(
            f"[RAG Upload] ✅ Dokument {doc_id} ('{file_path.name}') "
            f"zaindeksowany w namespace '{namespace}'."
        )
    except Exception as e:
        logger.error(f"[RAG Upload] ❌ Błąd pipeline dla {doc_id}: {e}", exc_info=True)
        if db:
            try:
                from core.projects.models import ProjectDocument

                # A failed flush/commit leaves the session unusable until it
                # is rolled back; without this the status update below could
                # never be written.
                db.rollback()
                doc = (
                    db.query(ProjectDocument)
                    .filter(ProjectDocument.id == doc_id)
                    .first()
                )
                if doc:
                    doc.status = "error"
                    doc.error_message = str(e)[:500]
                    db.commit()
            except Exception:
                # Still best effort, but no longer silent: the row may be
                # stuck in "processing".
                logger.error(
                    "[RAG Upload] Nie udało się zapisać statusu błędu "
                    "dla dokumentu %s.",
                    doc_id,
                    exc_info=True,
                )
    finally:
        if db:
            db.close()
async def _run_external_grant_pipeline(
    doc_id: str,
    project_id: str,
    file_path: Path,
    program_name: Optional[str] = None,
):
    """Parse an external grant application and store its text on the project.

    Unlike the RAG pipeline, nothing is sent to the vector store: the parsed
    text is written (or appended, behind a separator) to
    ``Project.foreign_grant_extract_text``. Exceptions raised on the way are
    caught and logged, and the document row is marked ``"error"``.

    Args:
        doc_id: Id of the ``ProjectDocument`` row to update.
        project_id: Id of the project that receives the extracted text.
        file_path: Location of the PDF on disk.
        program_name: Optional program name passed to the parser.
    """
    db = None
    try:
        from core.subscription.db import SessionLocal
        from core.projects.models import ProjectDocument, Project

        db = SessionLocal()
        doc = db.query(ProjectDocument).filter(ProjectDocument.id == doc_id).first()
        if not doc:
            # Logged for parity with the RAG pipeline; previously silent.
            logger.error(f"[External Grant] Dokument {doc_id} nie znaleziony w DB.")
            return

        doc.status = "processing"
        db.commit()

        # The package is importable under two roots depending on how the app
        # is launched, hence the fallback import.
        try:
            from rag_pipeline.pdf_parser import parse_pdf_from_file
        except ImportError:
            from backend.rag_pipeline.pdf_parser import parse_pdf_from_file

        parse_result = await parse_pdf_from_file(
            str(file_path),
            document_type="wniosek_zewnetrzny",
            program_name=program_name or "Nieznany Program",
        )
        # Tolerate an explicit None as well as a missing "text" key.
        raw_text = parse_result.get("text") or ""
        parser_used = parse_result.get("parser", "unknown")
        if not raw_text.strip():
            raise ValueError(
                "Parser nie wyodrębnił żadnej treści ze wskazanego wniosku."
            )

        project = db.query(Project).filter(Project.id == project_id).first()
        if project:
            if project.foreign_grant_extract_text:
                # Append after the text of previously processed documents.
                project.foreign_grant_extract_text += (
                    "\n\n---Kolejny dokument---\n\n" + raw_text
                )
            else:
                project.foreign_grant_extract_text = raw_text

        doc.status = "indexed"
        doc.parser_used = parser_used
        doc.chunks_count = 0  # nothing is chunked for external grants
        doc.indexed_at = datetime.now(timezone.utc)
        doc.processing_metadata = {
            "raw_text_length": len(raw_text),
            "parser": parser_used,
            "type": "external_grant",
        }
        db.commit()
        logger.info(
            f"[External Grant] ✅ Wniosek zewnętrzny {doc_id} przetworzony dla projektu {project_id}."
        )
    except Exception as e:
        logger.error(
            f"[External Grant] ❌ Błąd pipeline dla {doc_id}: {e}", exc_info=True
        )
        if db:
            try:
                from core.projects.models import ProjectDocument

                # Discard the failed/partial transaction (including any
                # uncommitted change to the project text) so the session can
                # be used to record the error.
                db.rollback()
                doc = (
                    db.query(ProjectDocument)
                    .filter(ProjectDocument.id == doc_id)
                    .first()
                )
                if doc:
                    doc.status = "error"
                    doc.error_message = str(e)[:500]
                    db.commit()
            except Exception:
                # Still best effort, but no longer silent: the row may be
                # stuck in "processing".
                logger.error(
                    "[External Grant] Nie udało się zapisać statusu błędu "
                    "dla dokumentu %s.",
                    doc_id,
                    exc_info=True,
                )
    finally:
        if db:
            db.close()
| # ── Routes ──────────────────────────────────────────────────────────────────── | |
# Strong references to pipeline tasks started by upload_document. The event
# loop keeps only weak references to tasks, so a task nobody else references
# can be garbage-collected before it finishes.
_upload_background_tasks: set = set()


async def upload_document(
    project_id: str,
    file: UploadFile = File(...),
    token: Optional[str] = Query(default=None, alias="token"),
    doc_type: Optional[str] = Query(default="knowledge_base", alias="doc_type"),
):
    """Store an uploaded PDF for a project and start its processing in the background.

    Query parameters:
        token: JWT used to derive the caller's namespace; a missing or
            undecodable token maps to the anonymous user.
        doc_type: ``"external_grant"`` runs the external-grant pipeline; any
            other value runs the RAG pipeline.

    Returns:
        A 202 JSON response with ``doc_id``, ``filename``, ``file_size_bytes``,
        ``status="uploaded"``, a message and the namespace.

    Raises:
        HTTPException: 400 (not a ``.pdf`` filename), 415 (non-PDF content
            type), 404 (unknown project), 429 (upload limit reached),
            413 (file too large, raised while saving) or 500 (any other error).

    NOTE(review): no route decorator is visible on this function in this view —
    confirm that it is registered on ``router``. The caller's ownership of the
    project is not checked here either.
    """
    # ── File validation ─────────────────────────────────────────────────────
    if not file.filename or not file.filename.lower().endswith(".pdf"):
        raise HTTPException(400, detail="Obsługiwane są wyłącznie pliki PDF.")
    content_type = file.content_type or ""
    if (
        content_type
        and content_type not in ALLOWED_MIME_TYPES
        and "pdf" not in content_type
    ):
        raise HTTPException(415, detail=f"Nieprawidłowy typ pliku: {content_type}")

    user_id = _resolve_user_id(token)
    namespace = _get_namespace(user_id, project_id)
    doc_id = str(uuid.uuid4())

    db = None
    file_path = None  # set once the upload is on disk
    record_committed = False  # True once the DB row referencing the file exists
    try:
        from core.subscription.db import SessionLocal
        from core.projects.models import Project, ProjectDocument

        # ── Project check ───────────────────────────────────────────────────
        db = SessionLocal()
        project = db.query(Project).filter(Project.id == project_id).first()
        if not project:
            raise HTTPException(404, detail="Projekt nie istnieje.")

        # ── Upload limits ───────────────────────────────────────────────────
        limit_check = _check_upload_limits(db, project_id)
        if not limit_check["allowed"]:
            raise HTTPException(
                status_code=429,
                detail={
                    "error": "upload_limit_exceeded",
                    "message": limit_check["reason"],
                    "current_count": limit_check["current"],
                    "limit": limit_check["limit"],
                    "plan": limit_check["plan"],
                    "upgrade_url": "/cennik",
                },
            )
        program_name = project.program_name

        # ── Save the file to disk ───────────────────────────────────────────
        dest_dir = UPLOAD_DIR / project_id
        file_path, file_size = await _save_upload(file, dest_dir, doc_id)

        # ── Persist metadata ────────────────────────────────────────────────
        doc_record = ProjectDocument(
            id=doc_id,
            project_id=project_id,
            filename=file_path.name,
            original_filename=file.filename,
            file_size_bytes=file_size,
            mime_type=file.content_type or "application/pdf",
            storage_path=str(file_path),
            status="uploaded",
            rag_namespace=namespace if doc_type == "knowledge_base" else None,
            doc_type=doc_type,
        )
        db.add(doc_record)
        db.commit()
        record_committed = True
        db.refresh(doc_record)
        logger.info(
            f"[Upload] Plik '{file.filename}' ({file_size // 1024}KB) "
            f"zapisany jako {doc_id} dla projektu {project_id}."
        )

        # ── Start the matching pipeline in the background ───────────────────
        if doc_type == "external_grant":
            pipeline = _run_external_grant_pipeline(
                doc_id=doc_id,
                project_id=project_id,
                file_path=file_path,
                program_name=program_name,
            )
        else:
            pipeline = _run_rag_pipeline(
                doc_id=doc_id,
                project_id=project_id,
                file_path=file_path,
                namespace=namespace,
                program_name=program_name,
            )
        task = asyncio.create_task(pipeline)
        # Keep the task alive until it completes, then drop the reference.
        _upload_background_tasks.add(task)
        task.add_done_callback(_upload_background_tasks.discard)

        return JSONResponse(
            status_code=202,  # Accepted — processing continues in the background
            content={
                "doc_id": doc_id,
                "filename": file.filename,
                "file_size_bytes": file_size,
                "status": "uploaded",
                "message": (
                    "Plik przesłany pomyślnie. "
                    "Indeksacja w RAG odbywa się w tle — "
                    "sprawdź status przez GET /documents."
                ),
                "namespace": namespace,
            },
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(
            f"[Upload] Błąd uploadu dla projektu {project_id}: {e}", exc_info=True
        )
        # Without a committed row nothing references the saved file, so
        # remove it instead of leaving an orphan on disk.
        if file_path is not None and not record_committed:
            try:
                file_path.unlink(missing_ok=True)
            except OSError:
                logger.warning(
                    "[Upload] Nie udało się usunąć osieroconego pliku %s.",
                    file_path,
                    exc_info=True,
                )
        raise HTTPException(500, detail=f"Błąd wgrywania pliku: {str(e)}") from e
    finally:
        if db:
            db.close()
def _serialize_document(d) -> dict:
    """Convert a ProjectDocument row into the JSON shape returned by the list endpoint."""
    return {
        "doc_id": d.id,
        "filename": d.original_filename,
        "file_size_bytes": d.file_size_bytes,
        "status": d.status,
        # Rows lacking the attribute are reported as knowledge-base documents.
        "doc_type": getattr(d, "doc_type", "knowledge_base"),
        "parser_used": d.parser_used,
        "chunks_count": d.chunks_count,
        "error_message": d.error_message,
        "uploaded_at": d.uploaded_at.isoformat() if d.uploaded_at else None,
        "indexed_at": d.indexed_at.isoformat() if d.indexed_at else None,
    }


async def list_documents(
    project_id: str,
    token: Optional[str] = Query(default=None, alias="token"),
):
    """List a project's non-deleted documents, newest first, with quota details.

    Returns:
        A dict with ``project_id``, ``documents`` (one entry per document with
        its processing status), ``total`` and ``quota`` (current count, limit,
        plan, and whether another upload is allowed).

    Raises:
        HTTPException: 500 when the lookup fails.

    ``token`` is accepted but not used by this function.
    """
    db = None
    try:
        from core.subscription.db import SessionLocal
        from core.projects.models import ProjectDocument

        db = SessionLocal()
        docs = (
            db.query(ProjectDocument)
            .filter(
                ProjectDocument.project_id == project_id,
                ProjectDocument.status != "deleted",
            )
            .order_by(ProjectDocument.uploaded_at.desc())
            .all()
        )
        # Upload quota, so the UI can show usage and disable uploads.
        limit_check = _check_upload_limits(db, project_id)
        return {
            "project_id": project_id,
            "documents": [_serialize_document(d) for d in docs],
            "total": len(docs),
            "quota": {
                "current": limit_check["current"],
                "limit": limit_check["limit"],
                "plan": limit_check["plan"],
                "can_upload": limit_check["allowed"],
            },
        }
    except HTTPException:
        raise
    except Exception as e:
        # Log with traceback before translating to a 500; the cause was
        # previously discarded.
        logger.error(
            "[Documents] Błąd listowania dokumentów projektu %s: %s",
            project_id,
            e,
            exc_info=True,
        )
        raise HTTPException(500, detail=str(e)) from e
    finally:
        if db:
            db.close()
# Strong references to re-ingest tasks. The event loop keeps only weak
# references to tasks, so an unreferenced task can be garbage-collected
# before it finishes.
_reingest_background_tasks: set = set()


async def reingest_document(
    project_id: str,
    doc_id: str,
    token: Optional[str] = Query(default=None, alias="token"),
):
    """Reset a document's indexing state and run the RAG pipeline on it again.

    Returns:
        A dict with ``doc_id``, ``status="reingesting"`` and a message.

    Raises:
        HTTPException: 404 when the document is not found in the project,
            410 when its source file is missing on disk, 500 on other errors.

    NOTE(review): the namespace is recomputed from ``token`` rather than read
    from the stored ``rag_namespace``, ``program_name`` is not forwarded to the
    pipeline, and the RAG pipeline is used regardless of the document's
    ``doc_type`` — confirm these are intended.
    """
    user_id = _resolve_user_id(token)
    namespace = _get_namespace(user_id, project_id)

    db = None
    try:
        from core.subscription.db import SessionLocal
        from core.projects.models import ProjectDocument

        db = SessionLocal()
        doc = (
            db.query(ProjectDocument)
            .filter(
                ProjectDocument.id == doc_id,
                ProjectDocument.project_id == project_id,
            )
            .first()
        )
        if not doc:
            raise HTTPException(404, detail="Dokument nie istnieje.")

        file_path = Path(doc.storage_path) if doc.storage_path else None
        if not file_path or not file_path.exists():
            raise HTTPException(410, detail="Plik źródłowy nie istnieje na dysku.")

        # Reset the indexing state before the pipeline starts over.
        doc.status = "uploaded"
        doc.error_message = None
        doc.chunks_count = None
        doc.indexed_at = None
        db.commit()

        # RAG pipeline in the background; it opens its own DB session.
        task = asyncio.create_task(
            _run_rag_pipeline(
                doc_id=doc_id,
                project_id=project_id,
                file_path=file_path,
                namespace=namespace,
            )
        )
        # Keep the task alive until it completes, then drop the reference.
        _reingest_background_tasks.add(task)
        task.add_done_callback(_reingest_background_tasks.discard)

        return {
            "doc_id": doc_id,
            "status": "reingesting",
            "message": "Ponowna indeksacja uruchomiona w tle.",
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(
            "[Reingest] Błąd ponownej indeksacji dokumentu %s: %s",
            doc_id,
            e,
            exc_info=True,
        )
        raise HTTPException(500, detail=str(e)) from e
    finally:
        # Runs on every exit path; the session used to leak on 404/410/500.
        if db:
            db.close()
async def delete_document(
    project_id: str,
    doc_id: str,
    token: Optional[str] = Query(default=None, alias="token"),
):
    """Delete a document's database record and its file on disk.

    The record is removed and committed first, then the file is unlinked, so a
    failed commit never leaves a record pointing at a missing file. A failure
    to remove the file is logged and does not fail the request.

    Returns:
        A dict with a confirmation ``message`` and the ``doc_id``.

    Raises:
        HTTPException: 404 when the document is not found in the project,
            500 on other errors.

    NOTE(review): nothing here removes the document's vectors from Pinecone,
    although the endpoint was described as doing so.
    TODO: delete the indexed chunks once a vector-store delete helper exists.
    """
    db = None
    try:
        from core.subscription.db import SessionLocal
        from core.projects.models import ProjectDocument

        db = SessionLocal()
        doc = (
            db.query(ProjectDocument)
            .filter(
                ProjectDocument.id == doc_id,
                ProjectDocument.project_id == project_id,
            )
            .first()
        )
        if not doc:
            raise HTTPException(404, detail="Dokument nie istnieje.")

        # Capture the path before the row is deleted.
        storage_path = doc.storage_path

        # Remove the record from the DB.
        db.delete(doc)
        db.commit()

        # Remove the file from disk.
        if storage_path:
            try:
                Path(storage_path).unlink(missing_ok=True)
            except OSError:
                logger.warning(
                    "[Delete] Nie udało się usunąć pliku %s z dysku.",
                    storage_path,
                    exc_info=True,
                )

        return {"message": "Dokument usunięty.", "doc_id": doc_id}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(
            "[Delete] Błąd usuwania dokumentu %s: %s", doc_id, e, exc_info=True
        )
        raise HTTPException(500, detail=str(e)) from e
    finally:
        # Runs on every exit path; the session used to leak on 404/500.
        if db:
            db.close()