File size: 4,818 Bytes
20256e7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import uuid
import logging
from ..core.celery_app import celery_app
from sqlalchemy.orm import Session
from qdrant_client import models
from ..schemas.knowledgebase import EmbeddingModelConfig
from app.db.session import SessionLocal
from app.services import document_service, kb_service, minio_service, qdrant_service, summary_service
from app.rag.chunking.methods import get_chunker
from app.rag.embedding.models import get_embedder
from app.rag.parsing import extract_text_from_file

logger = logging.getLogger(__name__)

@celery_app.task
def process_document_task(doc_id_str: str):
    db: Session = SessionLocal()
    doc = None
    try:
        doc_id = uuid.UUID(doc_id_str)
        doc = document_service.get_doc_by_id_internal(db, doc_id=doc_id) 
        if not doc:
            print(f"Task failed: Document with id {doc_id} not found.")
            return

        kb = doc.kb
        doc.processing_status = "PROCESSING"
        db.commit()
        print(f"Processing doc: {doc.name} ({doc.id}) for KB: {kb.name}")

        # Clean up existing chunks and summary (handles reprocessing)
        qdrant_service.qdrant_service.delete_points_by_doc_id(doc_id=str(doc.id))
        qdrant_service.qdrant_service.delete_summary_by_doc_id(doc_id=str(doc.id))

        # 1. Download from Minio and Parse
        file_data = minio_service.minio_client.get_object_file(doc.file_path_in_minio)
        full_text = extract_text_from_file(file_data, doc.name)

        # 2. Chunk the text
        chunker = get_chunker(kb.chunking_strategy)
        chunks = chunker.chunk(full_text)

        # 3. Embed the chunks
        print(f"Embedding with config: {kb.embedding_model}")
        if isinstance(kb.embedding_model, dict):
            embedding_config = EmbeddingModelConfig(**kb.embedding_model)
        else:
            embedding_config = kb.embedding_model
        
        embedder = get_embedder(config=embedding_config)

        # 4. Embed and Upsert in Batches
        batch_size = 32  
        total_chunks = len(chunks)
        
        for i in range(0, total_chunks, batch_size):
            batch_chunks = chunks[i:i + batch_size]
            batch_embeddings = embedder.embed(batch_chunks)
            
            points_to_upsert = []
            for j, (chunk_text, embedding_dict) in enumerate(zip(batch_chunks, batch_embeddings)):
                point_id = str(uuid.uuid4())
                payload = {
                    "kb_id": str(kb.id),
                    "doc_id": str(doc.id),
                    "doc_name": doc.name,
                    "user_id": str(doc.user_id),
                    "chunk_num": i + j + 1,
                    "chunk_content": chunk_text,
                }
                
                vector_payload = {}
                if embedding_dict.get('dense') is not None:
                    vector_payload['dense'] = embedding_dict['dense']
                if embedding_dict.get('sparse') is not None:
                    vector_payload['sparse'] = embedding_dict['sparse']
                if embedding_dict.get('multi_vector') is not None:
                    vector_payload['multi_vector'] = embedding_dict['multi_vector']
                
                if vector_payload: 
                    points_to_upsert.append(
                        models.PointStruct(id=point_id, vector=vector_payload, payload=payload)
                    )
            
            if points_to_upsert:
                qdrant_service.qdrant_service.upsert_points(points=points_to_upsert)

        # 5. Update chunk count before summary generation
        doc.num_chunks = len(chunks)
        db.commit()

        # 6. Generate and store document summary
        logger.info(f"Starting summary generation for '{doc.name}'")
        try:
            summary_service.generate_and_store_summary(
                full_text=full_text,
                doc=doc,
                kb=kb,
                embedding_config=embedding_config
            )
            logger.info(f"Summary generation completed for '{doc.name}'")
        except Exception as summary_err:
            # Summary failure should NOT fail the entire document processing pipeline.
            # The document's chunks are already stored and usable.
            logger.error(
                f"Summary generation failed for '{doc.name}': {summary_err}. "
                f"Document will be marked COMPLETED without a summary."
            )

        # 7. Update status to COMPLETED
        doc.processing_status = "COMPLETED"
        print(f"Successfully processed document: {doc.name}")

    except Exception as e:
        print(f"Error processing document {doc_id_str}: {e}")
        if doc:
            doc.processing_status = "FAILED"
        raise
    finally:
        if doc:
            db.commit()
        db.close()