# grantforge-api / backend / endpoints / documents.py
# GrantForge Bot
# Deploy to Hugging Face
# afd56bc
"""
Endpoints: Upload PDF + Re-ingest RAG — Sprint 7 / Sprint 9
POST /api/projects/{project_id}/documents
Wgrywa plik PDF do projektu, uruchamia pipeline RAG w tle (asyncio.create_task).
Pipeline: LlamaParse → (PyPDF fallback) → Hierarchical Chunking → Pinecone
Limity:
- Hard limit: max 10 plików per projekt (wszystkie plany)
- Soft limit: Free = 3 pliki, Pro/Enterprise = 50 plików (limit twardy jest sprawdzany jako pierwszy, więc w praktyce obowiązuje maks. 10)
GET /api/projects/{project_id}/documents
Listuje dokumenty projektu ze statusem indeksacji.
DELETE /api/projects/{project_id}/documents/{doc_id}
    Usuwa plik z dysku i rekord z bazy danych (wektory w Pinecone nie są obecnie usuwane).
POST /api/projects/{project_id}/documents/{doc_id}/reingest
Ponowna indeksacja dokumentu (np. po zmianie parametrów RAG).
"""
import os
import uuid
import logging
import asyncio
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, HTTPException, UploadFile, File, Query
from fastapi.responses import JSONResponse
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/projects", tags=["documents"])

# ── Configuration ─────────────────────────────────────────────────────────────
# Root directory for uploaded files (override with the UPLOAD_DIR env var).
UPLOAD_DIR = Path(os.getenv("UPLOAD_DIR", "/data/uploads"))
# Largest accepted upload, in megabytes.
MAX_FILE_SIZE_MB = 20
# Content types accepted for PDF uploads.
ALLOWED_MIME_TYPES = {"application/x-pdf", "application/pdf"}

# Per-plan upload limits (documents per project).
# The hard limit is checked first for every plan, so a plan limit above it
# (Pro / Enterprise below) is effectively capped at UPLOAD_LIMIT_HARD.
UPLOAD_LIMIT_HARD = 10
UPLOAD_LIMIT_FREE = 3
UPLOAD_LIMIT_PRO = 50
UPLOAD_LIMIT_ENTERPRISE = 50
# ── Helpers ───────────────────────────────────────────────────────────────────
def _get_namespace(user_id: str, project_id: str) -> str:
"""Namespace Pinecone: tenant_{user_id}_{project_id}"""
return f"tenant_{user_id}_{project_id}"
def _resolve_user_id(token: Optional[str]) -> str:
"""Dekoduje JWT Clerk lub zwraca 'anonymous'."""
if not token:
return "anonymous"
try:
import jwt
if token == "dev_test_token":
return "test_dev_user"
decoded = jwt.decode(token, options={"verify_signature": False})
return decoded.get("sub", "anonymous")
except Exception:
return "anonymous"
def _get_plan_upload_limit(db, project_id: str) -> tuple[int, str]:
    """Return the plan-based upload limit for the owner of ``project_id``.

    Looks up the project's owner and their ``UserSubscription``, then maps the
    plan name (case-insensitive) to a limit:

        "pro" / "professional"     -> UPLOAD_LIMIT_PRO
        "enterprise" / "business"  -> UPLOAD_LIMIT_ENTERPRISE
        anything else / no plan    -> UPLOAD_LIMIT_FREE (reported as "free")

    Falls back to the Free limit in these cases, so a plan-lookup problem
    never blocks an upload outright:

    * the project does not exist;
    * the owner has no subscription;
    * the models cannot be imported;
    * the lookup raises.

    In the last case the error is logged and the session is rolled back.

    Args:
        db: Open DB session (``query`` / ``rollback`` are used).
        project_id: Project whose owner's plan is checked.

    Returns:
        ``(limit, plan_name)``. ``plan_name`` is the lower-cased plan for the
        Pro/Enterprise tiers and ``"free"`` otherwise.
    """
    try:
        from core.projects.models import Project
        from core.subscription.models import UserSubscription
    except ImportError:
        # The subscription model may be absent in some deployments — treat
        # every project as Free.
        return UPLOAD_LIMIT_FREE, "free"

    try:
        project = db.query(Project).filter(Project.id == project_id).first()
        if not project:
            return UPLOAD_LIMIT_FREE, "free"
        sub = None
        if project.user_id:
            sub = (
                db.query(UserSubscription)
                .filter(UserSubscription.user_id == project.user_id)
                .first()
            )
        plan_lower = ((sub.plan if sub else "free") or "free").lower()
    except Exception:
        logger.warning(
            f"[Upload] Nie udało się odczytać planu dla projektu {project_id}; "
            "przyjęto plan 'free'.",
            exc_info=True,
        )
        # A failed query can leave the session's transaction aborted; roll it
        # back so the caller can keep using the same session (best effort).
        try:
            db.rollback()
        except Exception:
            pass
        return UPLOAD_LIMIT_FREE, "free"

    if plan_lower in ("pro", "professional"):
        return UPLOAD_LIMIT_PRO, plan_lower
    if plan_lower in ("enterprise", "business"):
        return UPLOAD_LIMIT_ENTERPRISE, plan_lower
    return UPLOAD_LIMIT_FREE, "free"
def _check_upload_limits(db, project_id: str) -> dict:
    """Decide whether another document may be uploaded to ``project_id``.

    Counts the project's documents whose status is not "deleted". It then
    checks, in order, the absolute ``UPLOAD_LIMIT_HARD`` and the owner's plan
    limit from ``_get_plan_upload_limit``.

    Returns:
        ``{'allowed': bool, 'current': int, 'limit': int, 'plan': str,
        'reason': str}``.

        ``limit`` is the limit that applies:

        * the hard limit when it was hit;
        * the plan limit when that was hit;
        * otherwise the effective limit ``min(plan limit, UPLOAD_LIMIT_HARD)``.
          Because the hard limit is checked first, a larger plan limit can
          never actually be reached.

        ``reason`` is a user-facing message and is empty when allowed.
    """
    from core.projects.models import ProjectDocument

    current_count = (
        db.query(ProjectDocument)
        .filter(
            ProjectDocument.project_id == project_id,
            ProjectDocument.status != "deleted",
        )
        .count()
    )

    # Hard limit (absolute — applies to every plan).
    if current_count >= UPLOAD_LIMIT_HARD:
        return {
            "allowed": False,
            "current": current_count,
            "limit": UPLOAD_LIMIT_HARD,
            "plan": "any",
            "reason": f"Przekroczono bezwzględny limit {UPLOAD_LIMIT_HARD} plików na projekt.",
        }

    # Soft limit (plan-based).
    plan_limit, plan_name = _get_plan_upload_limit(db, project_id)
    if current_count >= plan_limit:
        return {
            "allowed": False,
            "current": current_count,
            "limit": plan_limit,
            "plan": plan_name,
            "reason": (
                f"Plan '{plan_name}' pozwala na {plan_limit} pliki PDF per projekt. "
                "Usuń stare dokumenty lub przejdź na plan Pro."
            ),
        }

    return {
        "allowed": True,
        "current": current_count,
        # Report the limit the user can actually reach (shown as quota in the UI).
        "limit": min(plan_limit, UPLOAD_LIMIT_HARD),
        "plan": plan_name,
        "reason": "",
    }
async def _save_upload(
    upload: UploadFile, dest_dir: Path, doc_id: str
) -> tuple[Path, int]:
    """Stream an uploaded file to ``dest_dir/<doc_id><suffix>``.

    The suffix is taken from the client filename (".pdf" when it has none).
    Data is read in 256 KB chunks. Once the running size exceeds
    ``MAX_FILE_SIZE_MB``, the partial file is removed and 413 is raised. The
    partial file is also removed when reading or writing fails for any other
    reason, including task cancellation.

    Args:
        upload: Incoming multipart file.
        dest_dir: Target directory; created if missing.
        doc_id: Document id used as the stored file's stem.

    Returns:
        ``(path, size_bytes)`` of the written file.

    Raises:
        HTTPException: 413 when the upload exceeds ``MAX_FILE_SIZE_MB``.
    """
    dest_dir.mkdir(parents=True, exist_ok=True)
    suffix = Path(upload.filename or "doc").suffix or ".pdf"
    dest_path = dest_dir / f"{doc_id}{suffix}"
    max_bytes = MAX_FILE_SIZE_MB * 1024 * 1024
    chunk_size = 1024 * 256  # 256 KB chunks

    size = 0
    too_large = False
    try:
        with open(dest_path, "wb") as f:
            while True:
                chunk = await upload.read(chunk_size)
                if not chunk:
                    break
                size += len(chunk)
                if size > max_bytes:
                    too_large = True
                    break
                f.write(chunk)
    except BaseException:
        # Never leave a half-written file behind (I/O error, cancellation, ...).
        dest_path.unlink(missing_ok=True)
        raise

    if too_large:
        # Unlink only after the handle is closed — removing an open file
        # fails on Windows and would turn the 413 into a 500.
        dest_path.unlink(missing_ok=True)
        raise HTTPException(
            413, detail=f"Plik za duży. Limit: {MAX_FILE_SIZE_MB} MB"
        )
    return dest_path, size
async def _run_rag_pipeline(
    doc_id: str,
    project_id: str,
    file_path: Path,
    namespace: str,
    program_name: Optional[str] = None,
):
    """Run the RAG ingestion pipeline for one uploaded document.

    Intended to be scheduled in the background via ``asyncio.create_task()``.
    It opens and closes its own DB session.

    Steps:
      1. Mark the ProjectDocument as "processing".
      2. Parse the PDF with ``parse_pdf_from_file``. Per the original design
         notes, the parser chain is LlamaParse → PyPDF → Unstructured.
      3. Split the text with ``hierarchical_chunking`` in a worker thread.
         Per the original notes this is parent 2000 / child 400.
      4. Store the chunks with ``ingest_documents`` under ``namespace``, in a
         worker thread. Per the original notes, children go to Pinecone and
         parents to a LocalFileStore.
      5. Mark the document "indexed" and record parser/chunk metadata.

    Any ``Exception`` is logged. The document is then marked "error" with the
    message truncated to 500 chars; nothing is re-raised.

    Args:
        doc_id: ProjectDocument primary key.
        project_id: Owning project; added to chunk metadata.
        file_path: Path of the stored PDF.
        namespace: Pinecone namespace to upsert into.
        program_name: Grant programme name passed to the parser and metadata.
            A placeholder is used when None.
    """
    db = None
    try:
        from core.subscription.db import SessionLocal
        from core.projects.models import ProjectDocument

        db = SessionLocal()
        doc = db.query(ProjectDocument).filter(ProjectDocument.id == doc_id).first()
        if not doc:
            logger.error(f"[RAG Upload] Dokument {doc_id} nie znaleziony w DB.")
            return

        # ── Step 1: mark as "processing" ──────────────────────────────────
        doc.status = "processing"
        db.commit()

        # ── Step 2: parse the PDF ─────────────────────────────────────────
        # Two import paths, presumably for running from backend/ vs. repo root.
        try:
            from rag_pipeline.pdf_parser import parse_pdf_from_file
        except ImportError:
            from backend.rag_pipeline.pdf_parser import parse_pdf_from_file

        parse_result = await parse_pdf_from_file(
            str(file_path),
            document_type="regulamin_dotacyjny",
            program_name=program_name or "Nieznany Program",
        )
        raw_text = parse_result.get("text", "")
        parser_used = parse_result.get("parser", "unknown")
        if not raw_text.strip():
            raise ValueError("Parser nie wyodrębnił żadnej treści z pliku PDF.")
        logger.info(
            f"[RAG Upload] Dokument {doc_id}: sparsowano {len(raw_text)} znaków "
            f"przez '{parser_used}'."
        )

        # ── Step 3: hierarchical chunking (sync → worker thread) ──────────
        try:
            from rag_pipeline.ingest import hierarchical_chunking
        except ImportError:
            from backend.rag_pipeline.ingest import hierarchical_chunking

        parent_docs, child_docs = await asyncio.to_thread(
            hierarchical_chunking,
            text=raw_text,
            source_url=file_path.name,
            extra_metadata={
                "source": file_path.name,
                "project_id": project_id,
                "document_id": doc_id,
                "program_name": program_name or "Nieznany",
                "is_current": True,
            },
        )
        logger.info(
            f"[RAG Upload] Chunking: {len(parent_docs)} parent, "
            f"{len(child_docs)} child chunks."
        )

        # ── Step 4: upsert chunks (sync → worker thread) ──────────────────
        # NOTE(review): existing vectors of this document are not removed
        # first. Whether a re-ingest duplicates them depends on how
        # ingest_documents assigns vector ids — verify.
        try:
            from rag_pipeline.vector_store import ingest_documents
        except ImportError:
            from backend.rag_pipeline.vector_store import ingest_documents

        await asyncio.to_thread(
            ingest_documents,
            parent_docs=parent_docs,
            child_docs=child_docs,
            namespace=namespace,
        )

        # ── Step 5: update the record ─────────────────────────────────────
        doc.status = "indexed"
        doc.parser_used = parser_used
        doc.chunks_count = len(child_docs)
        doc.rag_namespace = namespace
        doc.indexed_at = datetime.now(timezone.utc)
        doc.processing_metadata = {
            "parent_chunks": len(parent_docs),
            "child_chunks": len(child_docs),
            "raw_text_length": len(raw_text),
            "program_name": program_name,
        }
        db.commit()
        logger.info(
            f"[RAG Upload] ✅ Dokument {doc_id} ('{file_path.name}') "
            f"zaindeksowany w namespace '{namespace}'."
        )

    except Exception as e:
        logger.error(f"[RAG Upload] ❌ Błąd pipeline dla {doc_id}: {e}", exc_info=True)
        if db is not None:
            try:
                # The failure may have left the session in a failed
                # transaction (e.g. an error during commit). Roll back first,
                # otherwise the status update below is rejected too.
                db.rollback()
                from core.projects.models import ProjectDocument

                doc = (
                    db.query(ProjectDocument)
                    .filter(ProjectDocument.id == doc_id)
                    .first()
                )
                if doc:
                    doc.status = "error"
                    doc.error_message = str(e)[:500]
                    db.commit()
            except Exception:
                # Best effort, but leave a trace instead of silently keeping
                # the document stuck in "processing".
                logger.error(
                    f"[RAG Upload] Nie udało się zapisać statusu 'error' dla {doc_id}.",
                    exc_info=True,
                )
    finally:
        if db is not None:
            db.close()
async def _run_external_grant_pipeline(
    doc_id: str,
    project_id: str,
    file_path: Path,
    program_name: Optional[str] = None,
):
    """Parse an external grant application and store its text on the project.

    Unlike ``_run_rag_pipeline`` this skips chunking and Pinecone. The parsed
    text goes into ``Project.foreign_grant_extract_text``: it is appended
    after a separator when text is already present, otherwise it replaces the
    empty value. Intended for background scheduling; it opens and closes its
    own DB session.

    Any ``Exception`` is logged, including an empty parse result or a missing
    project. The document is then marked "error" with the message truncated
    to 500 chars; nothing is re-raised.

    Args:
        doc_id: ProjectDocument primary key.
        project_id: Project that receives the extracted text.
        file_path: Path of the stored PDF.
        program_name: Grant programme name passed to the parser. A placeholder
            is used when None.
    """
    db = None
    try:
        from core.subscription.db import SessionLocal
        from core.projects.models import ProjectDocument, Project

        db = SessionLocal()
        doc = db.query(ProjectDocument).filter(ProjectDocument.id == doc_id).first()
        if not doc:
            logger.error(f"[External Grant] Dokument {doc_id} nie znaleziony w DB.")
            return

        doc.status = "processing"
        db.commit()

        try:
            from rag_pipeline.pdf_parser import parse_pdf_from_file
        except ImportError:
            from backend.rag_pipeline.pdf_parser import parse_pdf_from_file

        parse_result = await parse_pdf_from_file(
            str(file_path),
            document_type="wniosek_zewnetrzny",
            program_name=program_name or "Nieznany Program",
        )
        raw_text = parse_result.get("text", "")
        parser_used = parse_result.get("parser", "unknown")
        if not raw_text.strip():
            raise ValueError(
                "Parser nie wyodrębnił żadnej treści ze wskazanego wniosku."
            )

        project = db.query(Project).filter(Project.id == project_id).first()
        if not project:
            # Without a project there is nowhere to store the text. Fail
            # explicitly instead of marking the document "indexed" while
            # dropping its content.
            raise ValueError(f"Projekt {project_id} nie istnieje.")
        if project.foreign_grant_extract_text:
            project.foreign_grant_extract_text += (
                "\n\n---Kolejny dokument---\n\n" + raw_text
            )
        else:
            project.foreign_grant_extract_text = raw_text

        doc.status = "indexed"
        doc.parser_used = parser_used
        doc.chunks_count = 0
        doc.indexed_at = datetime.now(timezone.utc)
        doc.processing_metadata = {
            "raw_text_length": len(raw_text),
            "parser": parser_used,
            "type": "external_grant",
        }
        db.commit()
        logger.info(
            f"[External Grant] ✅ Wniosek zewnętrzny {doc_id} przetworzony dla projektu {project_id}."
        )
    except Exception as e:
        logger.error(
            f"[External Grant] ❌ Błąd pipeline dla {doc_id}: {e}", exc_info=True
        )
        if db is not None:
            try:
                # Roll back a possibly failed transaction before recording the
                # error, otherwise this update is rejected as well.
                db.rollback()
                from core.projects.models import ProjectDocument

                doc = (
                    db.query(ProjectDocument)
                    .filter(ProjectDocument.id == doc_id)
                    .first()
                )
                if doc:
                    doc.status = "error"
                    doc.error_message = str(e)[:500]
                    db.commit()
            except Exception:
                logger.error(
                    f"[External Grant] Nie udało się zapisać statusu 'error' dla {doc_id}.",
                    exc_info=True,
                )
    finally:
        if db is not None:
            db.close()
# ── Routes ────────────────────────────────────────────────────────────────────
@router.post("/{project_id}/documents")
async def upload_document(
    project_id: str,
    file: UploadFile = File(...),
    token: Optional[str] = Query(default=None, alias="token"),
    doc_type: Optional[str] = Query(default="knowledge_base", alias="doc_type"),
):
    """Upload a PDF to a project and start its processing in the background.

    Query params:
        token: Clerk JWT. It determines the Pinecone namespace
            ``tenant_{user_id}_{project_id}``; without it the user is
            "anonymous".
        doc_type: ``"external_grant"`` runs the external-grant parser (text
            is stored on the project, no Pinecone). Any other value runs the
            RAG pipeline.

    Returns:
        202 JSON with doc_id, filename, file_size_bytes, status="uploaded",
        a message and the namespace.

    Raises:
        HTTPException with one of these statuses:

        * 400 — the filename does not end in ".pdf";
        * 415 — the content type is not a PDF;
        * 404 — the project does not exist;
        * 429 — the upload limit has been reached;
        * 413 — the file is too large (raised by ``_save_upload``);
        * 500 — any other failure.

        A file already written to disk is removed again if its DB record was
        never committed.
    """
    # ── File validation ─────────────────────────────────────────────────────
    if not file.filename or not file.filename.lower().endswith(".pdf"):
        raise HTTPException(400, detail="Obsługiwane są wyłącznie pliki PDF.")
    content_type = file.content_type or ""
    if (
        content_type
        and content_type not in ALLOWED_MIME_TYPES
        and "pdf" not in content_type
    ):
        raise HTTPException(415, detail=f"Nieprawidłowy typ pliku: {content_type}")

    user_id = _resolve_user_id(token)
    namespace = _get_namespace(user_id, project_id)
    doc_id = str(uuid.uuid4())
    # Every non-external doc_type is sent to the RAG pipeline below, so record
    # the namespace for all of them (not only "knowledge_base").
    is_external_grant = doc_type == "external_grant"

    db = None
    file_path: Optional[Path] = None
    record_committed = False
    try:
        from core.subscription.db import SessionLocal
        from core.projects.models import Project, ProjectDocument

        db = SessionLocal()
        # NOTE(review): the project owner is not compared with user_id here,
        # so any caller knowing a project_id can upload to it — confirm that
        # authorization is enforced elsewhere.
        project = db.query(Project).filter(Project.id == project_id).first()
        if not project:
            raise HTTPException(404, detail="Projekt nie istnieje.")

        # ── Upload limits ───────────────────────────────────────────────────
        limit_check = _check_upload_limits(db, project_id)
        if not limit_check["allowed"]:
            raise HTTPException(
                status_code=429,
                detail={
                    "error": "upload_limit_exceeded",
                    "message": limit_check["reason"],
                    "current_count": limit_check["current"],
                    "limit": limit_check["limit"],
                    "plan": limit_check["plan"],
                    "upgrade_url": "/cennik",
                },
            )

        program_name = project.program_name

        # ── Save the file to disk ───────────────────────────────────────────
        dest_dir = UPLOAD_DIR / project_id
        file_path, file_size = await _save_upload(file, dest_dir, doc_id)

        # ── Persist metadata ────────────────────────────────────────────────
        doc_record = ProjectDocument(
            id=doc_id,
            project_id=project_id,
            filename=file_path.name,
            original_filename=file.filename,
            file_size_bytes=file_size,
            mime_type=file.content_type or "application/pdf",
            storage_path=str(file_path),
            status="uploaded",
            rag_namespace=None if is_external_grant else namespace,
            doc_type=doc_type,
        )
        db.add(doc_record)
        db.commit()
        record_committed = True
        db.refresh(doc_record)

        logger.info(
            f"[Upload] Plik '{file.filename}' ({file_size // 1024}KB) "
            f"zapisany jako {doc_id} dla projektu {project_id}."
        )

        # ── Schedule the matching background pipeline ──────────────────────
        if is_external_grant:
            asyncio.create_task(
                _run_external_grant_pipeline(
                    doc_id=doc_id,
                    project_id=project_id,
                    file_path=file_path,
                    program_name=program_name,
                )
            )
        else:
            asyncio.create_task(
                _run_rag_pipeline(
                    doc_id=doc_id,
                    project_id=project_id,
                    file_path=file_path,
                    namespace=namespace,
                    program_name=program_name,
                )
            )

        return JSONResponse(
            status_code=202,  # Accepted — processing continues in the background
            content={
                "doc_id": doc_id,
                "filename": file.filename,
                "file_size_bytes": file_size,
                "status": "uploaded",
                "message": (
                    "Plik przesłany pomyślnie. "
                    "Indeksacja w RAG odbywa się w tle — "
                    "sprawdź status przez GET /documents."
                ),
                "namespace": namespace,
            },
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.error(
            f"[Upload] Błąd uploadu dla projektu {project_id}: {e}", exc_info=True
        )
        raise HTTPException(500, detail=f"Błąd wgrywania pliku: {str(e)}")
    finally:
        # A saved file without a committed DB row would be orphaned on disk.
        if file_path is not None and not record_committed:
            try:
                file_path.unlink(missing_ok=True)
            except OSError:
                logger.warning(
                    f"[Upload] Nie udało się usunąć osieroconego pliku {file_path}.",
                    exc_info=True,
                )
        if db:
            db.close()
@router.get("/{project_id}/documents")
async def list_documents(
    project_id: str,
    token: Optional[str] = Query(default=None, alias="token"),
):
    """List a project's documents (newest first) with their RAG status and quota.

    Documents with status "deleted" are excluded. ``token`` is accepted but
    not used by this endpoint.

    Returns:
        A dict with ``project_id``, ``documents`` (per-document status fields
        and ISO timestamps), ``total`` and ``quota``. The quota values come
        from ``_check_upload_limits`` and are meant for display in the UI.

    Raises:
        HTTPException: 500 on any failure. The error is also logged.
    """
    db = None
    try:
        from core.subscription.db import SessionLocal
        from core.projects.models import ProjectDocument

        db = SessionLocal()
        docs = (
            db.query(ProjectDocument)
            .filter(
                ProjectDocument.project_id == project_id,
                ProjectDocument.status != "deleted",
            )
            .order_by(ProjectDocument.uploaded_at.desc())
            .all()
        )

        # Upload quota (for display in the UI).
        limit_check = _check_upload_limits(db, project_id)

        return {
            "project_id": project_id,
            "documents": [
                {
                    "doc_id": d.id,
                    "filename": d.original_filename,
                    "file_size_bytes": d.file_size_bytes,
                    "status": d.status,
                    "doc_type": getattr(d, "doc_type", "knowledge_base"),
                    "parser_used": d.parser_used,
                    "chunks_count": d.chunks_count,
                    "error_message": d.error_message,
                    "uploaded_at": d.uploaded_at.isoformat() if d.uploaded_at else None,
                    "indexed_at": d.indexed_at.isoformat() if d.indexed_at else None,
                }
                for d in docs
            ],
            "total": len(docs),
            # Plan limit info for the frontend.
            "quota": {
                "current": limit_check["current"],
                "limit": limit_check["limit"],
                "plan": limit_check["plan"],
                "can_upload": limit_check["allowed"],
            },
        }
    except Exception as e:
        # Log server-side like upload_document does; the 500 detail alone
        # leaves no traceback in the logs.
        logger.error(
            f"[Documents] Błąd listowania dokumentów projektu {project_id}: {e}",
            exc_info=True,
        )
        raise HTTPException(500, detail=str(e)) from e
    finally:
        if db:
            db.close()
@router.post("/{project_id}/documents/{doc_id}/reingest")
async def reingest_document(
    project_id: str,
    doc_id: str,
    token: Optional[str] = Query(default=None, alias="token"),
):
    """Re-run RAG indexing for an existing knowledge-base document.

    Useful after chunking parameters change or after a Pinecone migration.
    The endpoint resets the document's status fields and schedules
    ``_run_rag_pipeline`` in the background on the stored file.

    The namespace is the one recorded on the document (``rag_namespace``)
    when set. Otherwise it is derived from ``token``, as on upload. The
    project's ``program_name`` is passed through so chunk metadata keeps the
    programme name.

    NOTE(review): previously upserted vectors are not removed here.
    Duplicates depend on how the vector store assigns ids — verify.

    Raises:
        HTTPException with one of these statuses:

        * 404 — unknown document;
        * 400 — "external_grant" documents, which are never stored in RAG;
        * 410 — the source file is missing;
        * 500 — any other failure.
    """
    db = None
    try:
        from core.subscription.db import SessionLocal
        from core.projects.models import Project, ProjectDocument

        db = SessionLocal()
        doc = (
            db.query(ProjectDocument)
            .filter(
                ProjectDocument.id == doc_id,
                ProjectDocument.project_id == project_id,
            )
            .first()
        )
        if not doc:
            raise HTTPException(404, detail="Dokument nie istnieje.")

        # External grants bypass Pinecone on upload; pushing them through the
        # RAG pipeline would index them where they do not belong.
        if getattr(doc, "doc_type", None) == "external_grant":
            raise HTTPException(
                400,
                detail="Ponowna indeksacja RAG dotyczy wyłącznie dokumentów bazy wiedzy.",
            )

        file_path = Path(doc.storage_path) if doc.storage_path else None
        if not file_path or not file_path.exists():
            raise HTTPException(410, detail="Plik źródłowy nie istnieje na dysku.")

        # Keep vectors in the namespace the document was originally indexed into.
        namespace = doc.rag_namespace or _get_namespace(
            _resolve_user_id(token), project_id
        )
        project = db.query(Project).filter(Project.id == project_id).first()
        program_name = project.program_name if project else None

        # Reset status
        doc.status = "uploaded"
        doc.error_message = None
        doc.chunks_count = None
        doc.indexed_at = None
        db.commit()

        # RAG pipeline in the background (it opens its own session).
        asyncio.create_task(
            _run_rag_pipeline(
                doc_id=doc_id,
                project_id=project_id,
                file_path=file_path,
                namespace=namespace,
                program_name=program_name,
            )
        )
        return {
            "doc_id": doc_id,
            "status": "reingesting",
            "message": "Ponowna indeksacja uruchomiona w tle.",
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(500, detail=str(e))
    finally:
        # Previously the session leaked on every 404/410/500 path.
        if db is not None:
            db.close()
@router.delete("/{project_id}/documents/{doc_id}")
async def delete_document(
    project_id: str,
    doc_id: str,
    token: Optional[str] = Query(default=None, alias="token"),
):
    """Delete a document's DB record and its stored file.

    The DB row is deleted and committed first; the file is removed only
    after that. A failed commit therefore never leaves a record pointing at
    a missing file. A failure to remove the file is logged and does not fail
    the request.

    NOTE(review): despite the module overview, vectors already upserted to
    Pinecone are NOT removed here. Text that an "external_grant" document
    appended to ``Project.foreign_grant_extract_text`` also stays on the
    project. TODO: clean both up once a vector-store delete is available.

    Returns:
        ``{"message": ..., "doc_id": doc_id}``.

    Raises:
        HTTPException: 404 when the document does not exist in the project,
            500 on any other failure.
    """
    db = None
    try:
        from core.subscription.db import SessionLocal
        from core.projects.models import ProjectDocument

        db = SessionLocal()
        doc = (
            db.query(ProjectDocument)
            .filter(
                ProjectDocument.id == doc_id,
                ProjectDocument.project_id == project_id,
            )
            .first()
        )
        if not doc:
            raise HTTPException(404, detail="Dokument nie istnieje.")

        storage_path = doc.storage_path

        # Remove the DB record first ...
        db.delete(doc)
        db.commit()

        # ... then the file (best effort).
        if storage_path:
            try:
                Path(storage_path).unlink(missing_ok=True)
            except OSError:
                logger.warning(
                    f"[Delete] Nie udało się usunąć pliku {storage_path} dokumentu {doc_id}.",
                    exc_info=True,
                )

        return {"message": "Dokument usunięty.", "doc_id": doc_id}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(500, detail=str(e))
    finally:
        # Previously the session leaked on the 404 and 500 paths.
        if db is not None:
            db.close()