# Spaces:
# Sleeping
# Sleeping
# File size: 4,818 Bytes
# 20256e7
import uuid
import logging
from ..core.celery_app import celery_app
from sqlalchemy.orm import Session
from qdrant_client import models
from ..schemas.knowledgebase import EmbeddingModelConfig
from app.db.session import SessionLocal
from app.services import document_service, kb_service, minio_service, qdrant_service, summary_service
from app.rag.chunking.methods import get_chunker
from app.rag.embedding.models import get_embedder
from app.rag.parsing import extract_text_from_file
logger = logging.getLogger(__name__)
# Number of chunks sent to the embedder (and upserted to Qdrant) per round trip.
_EMBED_BATCH_SIZE = 32

# Named vectors an embedder result may carry; only non-None ones are stored.
_VECTOR_NAMES = ("dense", "sparse", "multi_vector")


def _to_embedding_config(raw_config):
    """Return *raw_config* as an embedding config object.

    A plain ``dict`` is expanded into ``EmbeddingModelConfig``; any other
    value is passed through unchanged.
    """
    if isinstance(raw_config, dict):
        return EmbeddingModelConfig(**raw_config)
    return raw_config


def _build_points(batch_chunks, batch_embeddings, *, kb, doc, first_chunk_num):
    """Build the Qdrant points for one batch of chunks.

    Args:
        batch_chunks: Chunk contents of this batch, in document order.
        batch_embeddings: One mapping per chunk, as returned by the embedder;
            each may hold any of the names in ``_VECTOR_NAMES``.
        kb: Knowledge base the document belongs to (``id`` is read).
        doc: Document being processed (``id``, ``name``, ``user_id`` are read).
        first_chunk_num: 1-based chunk number of the first chunk in the batch.

    Returns:
        A list of ``models.PointStruct``. Chunks whose embedding carries no
        usable vector are skipped.
    """
    points = []
    numbered = enumerate(zip(batch_chunks, batch_embeddings), start=first_chunk_num)
    for chunk_num, (chunk_text, embedding) in numbered:
        vectors = {
            name: embedding[name]
            for name in _VECTOR_NAMES
            if embedding.get(name) is not None
        }
        if not vectors:
            continue
        payload = {
            "kb_id": str(kb.id),
            "doc_id": str(doc.id),
            "doc_name": doc.name,
            "user_id": str(doc.user_id),
            "chunk_num": chunk_num,
            "chunk_content": chunk_text,
        }
        points.append(
            models.PointStruct(id=str(uuid.uuid4()), vector=vectors, payload=payload)
        )
    return points


@celery_app.task
def process_document_task(doc_id_str: str) -> None:
    """Parse, chunk, embed and index one document, then summarise it.

    Pipeline, in order:
      1. Load the document row and mark it ``PROCESSING``.
      2. Delete any points/summary already stored for it (reprocessing).
      3. Download the file from MinIO and extract its text.
      4. Chunk the text with the knowledge base's chunking strategy.
      5. Embed the chunks in batches and upsert them into Qdrant.
      6. Store the chunk count, then generate the summary (best effort).
      7. Mark the document ``COMPLETED``.

    Args:
        doc_id_str: The document's UUID in string form.

    Raises:
        Exception: Any error outside summary generation is logged, the
            document is marked ``FAILED`` (when it was loaded), and the
            original exception is re-raised so the task is reported as failed.
    """
    db: Session = SessionLocal()
    doc = None
    try:
        doc_id = uuid.UUID(doc_id_str)
        doc = document_service.get_doc_by_id_internal(db, doc_id=doc_id)
        if not doc:
            logger.error("Task failed: Document with id %s not found.", doc_id)
            return

        kb = doc.kb
        doc.processing_status = "PROCESSING"
        db.commit()
        logger.info("Processing doc: %s (%s) for KB: %s", doc.name, doc.id, kb.name)

        # Clean up existing chunks and summary (handles reprocessing).
        vector_store = qdrant_service.qdrant_service
        vector_store.delete_points_by_doc_id(doc_id=str(doc.id))
        vector_store.delete_summary_by_doc_id(doc_id=str(doc.id))

        # 1. Download from MinIO and parse.
        file_data = minio_service.minio_client.get_object_file(doc.file_path_in_minio)
        full_text = extract_text_from_file(file_data, doc.name)

        # 2. Chunk the text.
        chunks = get_chunker(kb.chunking_strategy).chunk(full_text)

        # 3. Resolve the embedding configuration and build the embedder.
        logger.info("Embedding with config: %s", kb.embedding_model)
        embedding_config = _to_embedding_config(kb.embedding_model)
        embedder = get_embedder(config=embedding_config)

        # 4. Embed and upsert in batches.
        for start in range(0, len(chunks), _EMBED_BATCH_SIZE):
            batch_chunks = chunks[start:start + _EMBED_BATCH_SIZE]
            batch_embeddings = embedder.embed(batch_chunks)
            points = _build_points(
                batch_chunks,
                batch_embeddings,
                kb=kb,
                doc=doc,
                first_chunk_num=start + 1,
            )
            if points:
                vector_store.upsert_points(points=points)

        # 5. Persist the chunk count before summary generation, so it is
        #    saved even if the summary step fails.
        doc.num_chunks = len(chunks)
        db.commit()

        # 6. Generate and store the document summary.
        logger.info("Starting summary generation for '%s'", doc.name)
        try:
            summary_service.generate_and_store_summary(
                full_text=full_text,
                doc=doc,
                kb=kb,
                embedding_config=embedding_config,
            )
            logger.info("Summary generation completed for '%s'", doc.name)
        except Exception as summary_err:
            # Summary failure should NOT fail the entire pipeline: the
            # document's chunks are already stored and usable.
            logger.error(
                "Summary generation failed for '%s': %s. "
                "Document will be marked COMPLETED without a summary.",
                doc.name,
                summary_err,
            )

        # 7. Update status to COMPLETED.
        doc.processing_status = "COMPLETED"
        db.commit()
        logger.info("Successfully processed document: %s", doc.name)
    except Exception:
        logger.exception("Error processing document %s", doc_id_str)
        if doc:
            try:
                # A failed flush/commit leaves the session unusable until it
                # is rolled back, so reset it before recording the failure.
                db.rollback()
                doc.processing_status = "FAILED"
                db.commit()
            except Exception:
                # Never let a secondary DB error mask the original one.
                logger.exception(
                    "Could not persist FAILED status for document %s", doc_id_str
                )
        raise
    finally:
        # Always release the session, whatever happened above.
        db.close()