File size: 4,253 Bytes
f9ad313
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""

Document Processor for RAG.



Converts database rows into semantic documents for embedding.

"""

import logging
import hashlib
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Generator
import re

logger = logging.getLogger(__name__)


@dataclass
class Document:
    """A semantic unit of text extracted from a database row.

    One source value may be split across several Documents; ``chunk_index``
    and ``total_chunks`` record the position within that split.
    """
    id: str
    content: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    table_name: str = ""
    column_name: str = ""
    primary_key_value: Optional[str] = None
    chunk_index: int = 0
    total_chunks: int = 1

    def __post_init__(self):
        # Derive a deterministic id from the chunk's provenance when the
        # caller did not supply one, so re-processing yields stable ids.
        if not self.id:
            parts = (self.table_name, self.column_name,
                     str(self.primary_key_value), str(self.chunk_index))
            self.id = hashlib.md5(":".join(parts).encode()).hexdigest()

    def to_context_string(self) -> str:
        """Render the content prefixed with a provenance header line."""
        location = f"{self.table_name}.{self.column_name}"
        id_part = f" (id: {self.primary_key_value})" if self.primary_key_value else ""
        return f"[Source: {location}{id_part}]\n{self.content}"


class TextChunker:
    """Splits long text into sentence-aligned chunks with optional overlap.

    Chunks target at most ``chunk_size`` characters; consecutive chunks
    share up to ``chunk_overlap`` trailing characters of context so that
    sentences near a boundary are retrievable from either chunk.
    """

    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        # Sentence boundary: terminal punctuation, whitespace, then a capital.
        self.sentence_pattern = re.compile(r'(?<=[.!?])\s+(?=[A-Z])')

    def chunk_text(self, text: str) -> List[str]:
        """Split *text* into chunks of at most ``chunk_size`` characters.

        Returns [] for empty input and [text] when it already fits. A single
        sentence longer than ``chunk_size`` is kept whole rather than cut.
        """
        if not text or len(text) <= self.chunk_size:
            return [text] if text else []

        sentences = [s.strip() for s in self.sentence_pattern.split(text) if s.strip()]
        chunks: List[str] = []
        current: List[str] = []
        current_length = 0

        for sentence in sentences:
            # Flush the running chunk when adding this sentence would overflow.
            if current and current_length + len(sentence) + 1 > self.chunk_size:
                chunks.append(' '.join(current))
                # BUG FIX: chunk_overlap was previously stored but never used;
                # seed the next chunk with trailing sentences for continuity.
                current = self._overlap_tail(current)
                current_length = sum(len(s) + 1 for s in current)
            current.append(sentence)
            current_length += len(sentence) + 1

        if current:
            chunks.append(' '.join(current))
        # Defensive: all-whitespace oversized input yields no sentences.
        return chunks if chunks else [text]

    def _overlap_tail(self, sentences: List[str]) -> List[str]:
        """Trailing sentences of *sentences* totalling <= chunk_overlap chars."""
        tail: List[str] = []
        used = 0
        for sentence in reversed(sentences):
            if used + len(sentence) + 1 > self.chunk_overlap:
                break
            tail.insert(0, sentence)
            used += len(sentence) + 1
        return tail


class DocumentProcessor:
    """Turns database rows into embeddable :class:`Document` objects.

    Each configured text column of a row is stripped, chunked, and wrapped
    in one Document per chunk, carrying provenance metadata.
    """

    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50):
        self.chunker = TextChunker(chunk_size, chunk_overlap)

    def process_row(
        self, row: Dict[str, Any], table_name: str,
        text_columns: List[str], primary_key_column: Optional[str] = None
    ) -> List[Document]:
        """Build Documents for every non-empty text column of a single row."""
        pk_value = str(row.get(primary_key_column, "")) if primary_key_column else None
        documents: List[Document] = []

        for column_name in text_columns:
            raw = row.get(column_name)
            # Skip missing, non-string, and whitespace-only values.
            if not isinstance(raw, str):
                continue
            text = raw.strip()
            if not text:
                continue

            chunks = self.chunker.chunk_text(text)
            total = len(chunks)
            for index, chunk in enumerate(chunks):
                documents.append(Document(
                    id="",
                    content=chunk,
                    table_name=table_name,
                    column_name=column_name,
                    primary_key_value=pk_value,
                    chunk_index=index,
                    total_chunks=total,
                    metadata={"table": table_name, "column": column_name, "pk": pk_value},
                ))

        return documents

    def process_rows(
        self, rows: List[Dict[str, Any]], table_name: str,
        text_columns: List[str], primary_key_column: Optional[str] = None
    ) -> Generator[Document, None, None]:
        """Lazily yield Documents for every row in *rows*."""
        for row in rows:
            yield from self.process_row(row, table_name, text_columns, primary_key_column)


def get_document_processor(chunk_size: int = 500, chunk_overlap: int = 50) -> DocumentProcessor:
    """Factory returning a :class:`DocumentProcessor` with the given chunking parameters."""
    return DocumentProcessor(chunk_size=chunk_size, chunk_overlap=chunk_overlap)