Spaces:
Running
Running
File size: 4,253 Bytes
f9ad313 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
"""
Document Processor for RAG.
Converts database rows into semantic documents for embedding.
"""
import logging
import hashlib
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Generator
import re
logger = logging.getLogger(__name__)
@dataclass
class Document:
    """A semantic unit of text extracted from a database row.

    When no ``id`` is supplied, one is derived deterministically from the
    table/column/primary-key/chunk coordinates, so re-processing the same
    row always produces the same document ids.
    """
    id: str
    content: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    table_name: str = ""
    column_name: str = ""
    primary_key_value: Optional[str] = None
    chunk_index: int = 0
    total_chunks: int = 1

    def __post_init__(self):
        # Derive a stable content-addressed id only when the caller left it blank.
        if not self.id:
            key = ":".join([
                self.table_name,
                self.column_name,
                str(self.primary_key_value),
                str(self.chunk_index),
            ])
            self.id = hashlib.md5(key.encode()).hexdigest()

    def to_context_string(self) -> str:
        """Render the content prefixed with a provenance header for LLM context."""
        if self.primary_key_value:
            header = f"[Source: {self.table_name}.{self.column_name} (id: {self.primary_key_value})]"
        else:
            header = f"[Source: {self.table_name}.{self.column_name}]"
        return f"{header}\n{self.content}"
class TextChunker:
    """Splits long text into overlapping, sentence-aligned chunks.

    Bug fix: the original implementation accepted and stored
    ``chunk_overlap`` but never used it, so chunks had zero overlap
    despite the class contract. Overlap is now implemented by carrying
    trailing sentences (up to ``chunk_overlap`` characters) from the end
    of one chunk into the start of the next. With ``chunk_overlap=0`` the
    output is identical to the previous behavior.
    """

    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        # Split on whitespace that follows sentence punctuation and
        # precedes a capital letter (keeps the punctuation with the sentence).
        self.sentence_pattern = re.compile(r'(?<=[.!?])\s+(?=[A-Z])')

    def chunk_text(self, text: str) -> List[str]:
        """Split *text* into chunks of at most ~chunk_size characters.

        Returns [] for empty input, and [text] unchanged when the text
        already fits in one chunk. A single sentence longer than
        chunk_size becomes its own (oversized) chunk.
        """
        if not text or len(text) <= self.chunk_size:
            return [text] if text else []
        sentences = self.sentence_pattern.split(text)
        chunks: List[str] = []
        current: List[str] = []
        current_len = 0
        for sentence in sentences:
            sentence = sentence.strip()
            if not sentence:
                continue
            if current_len + len(sentence) + 1 > self.chunk_size:
                if current:
                    chunks.append(' '.join(current))
                    # Seed the next chunk with overlap from this one.
                    current, current_len = self._overlap_tail(current)
                current.append(sentence)
                current_len += len(sentence) + (1 if current_len else 0)
            else:
                current.append(sentence)
                current_len += len(sentence) + 1
        if current:
            chunks.append(' '.join(current))
        return chunks if chunks else [text]

    def _overlap_tail(self, sentences: List[str]):
        """Return (tail_sentences, tail_length): the trailing sentences of
        a finished chunk that fit within chunk_overlap characters."""
        tail: List[str] = []
        tail_len = 0
        for sentence in reversed(sentences):
            added = len(sentence) + (1 if tail else 0)
            if tail_len + added > self.chunk_overlap:
                break
            tail.insert(0, sentence)
            tail_len += added
        return tail, tail_len
class DocumentProcessor:
    """Turns database rows into embedding-ready Document objects."""

    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50):
        # All text splitting is delegated to a TextChunker.
        self.chunker = TextChunker(chunk_size, chunk_overlap)

    def process_row(
        self, row: Dict[str, Any], table_name: str,
        text_columns: List[str], primary_key_column: Optional[str] = None
    ) -> List[Document]:
        """Build one Document per chunk of each non-empty text column in *row*.

        Columns that are missing, non-string, or whitespace-only are skipped.
        """
        pk_value = None
        if primary_key_column:
            pk_value = str(row.get(primary_key_column, ""))
        documents: List[Document] = []
        for column in text_columns:
            raw = row.get(column)
            if not isinstance(raw, str):
                continue
            cleaned = raw.strip()
            if not cleaned:
                continue
            pieces = self.chunker.chunk_text(cleaned)
            total = len(pieces)
            documents.extend(
                Document(
                    id="",
                    content=piece,
                    table_name=table_name,
                    column_name=column,
                    primary_key_value=pk_value,
                    chunk_index=index,
                    total_chunks=total,
                    metadata={"table": table_name, "column": column, "pk": pk_value},
                )
                for index, piece in enumerate(pieces)
            )
        return documents

    def process_rows(
        self, rows: List[Dict[str, Any]], table_name: str,
        text_columns: List[str], primary_key_column: Optional[str] = None
    ) -> Generator[Document, None, None]:
        """Lazily yield Documents for every row in *rows*."""
        for row in rows:
            yield from self.process_row(row, table_name, text_columns, primary_key_column)
def get_document_processor(chunk_size: int = 500, chunk_overlap: int = 50) -> DocumentProcessor:
    """Factory returning a DocumentProcessor configured with the given chunking parameters."""
    return DocumentProcessor(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
|