Spaces:
Running
Running
| """ | |
| Document Processor for RAG. | |
| Converts database rows into semantic documents for embedding. | |
| """ | |
| import logging | |
| import hashlib | |
| from dataclasses import dataclass, field | |
| from typing import List, Dict, Any, Optional, Generator | |
| import re | |
| logger = logging.getLogger(__name__) | |
@dataclass
class Document:
    """Semantic document built from one text column of one database row.

    Attributes:
        id: Identifier of the document. Pass an empty string to have one
            derived automatically in ``__post_init__``.
        content: Text of the document (a single chunk of the column value).
        metadata: Free-form extra information attached to the document.
        table_name: Name of the table the text came from.
        column_name: Name of the column the text came from.
        primary_key_value: Primary key of the source row as a string, or
            ``None`` when it is not known.
        chunk_index: Position of this chunk within the column value.
        total_chunks: Number of chunks the column value was split into.
    """

    id: str
    content: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    table_name: str = ""
    column_name: str = ""
    primary_key_value: Optional[str] = None
    chunk_index: int = 0
    total_chunks: int = 1

    def __post_init__(self):
        """Derive a deterministic id when the caller did not supply one."""
        if self.id:
            return
        hash_input = (
            f"{self.table_name}:{self.column_name}:"
            f"{self.primary_key_value}:{self.chunk_index}"
        )
        if not self.primary_key_value:
            # Without a primary key the location alone is identical for every
            # row of the table, so all rows would share one id. Mixing in a
            # digest of the content keeps such documents distinguishable.
            # Ids of documents that do have a primary key are unaffected.
            content_digest = hashlib.md5(self.content.encode()).hexdigest()
            hash_input = f"{hash_input}:{content_digest}"
        # md5 is used only as a compact fingerprint, not for security.
        self.id = hashlib.md5(hash_input.encode()).hexdigest()

    def to_context_string(self) -> str:
        """Return the content prefixed with a ``[Source: table.column]`` line.

        The primary key is appended to the source tag as ``(id: ...)`` only
        when it is set to a non-empty value.
        """
        source = f"[Source: {self.table_name}.{self.column_name}"
        if self.primary_key_value:
            source += f" (id: {self.primary_key_value})"
        source += "]"
        return f"{source}\n{self.content}"
class TextChunker:
    """Splits long text into overlapping chunks of bounded length.

    Text is first split into sentences, which are packed greedily into chunks
    of at most ``chunk_size`` characters. Each chunk after the first starts
    with a word-aligned tail (at most ``chunk_overlap`` characters) of the
    previous chunk so that context is carried across chunk boundaries.
    """

    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50):
        """Configure the chunker.

        Args:
            chunk_size: Maximum length of a chunk in characters; must be >= 1.
            chunk_overlap: Maximum number of characters repeated from the end
                of one chunk at the start of the next. Values <= 0 disable
                overlap.

        Raises:
            ValueError: If ``chunk_size`` is smaller than 1.
        """
        if chunk_size < 1:
            raise ValueError("chunk_size must be a positive integer")
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        # A sentence boundary is whitespace that follows ., ! or ? and is
        # followed by an upper-case ASCII letter.
        self.sentence_pattern = re.compile(r'(?<=[.!?])\s+(?=[A-Z])')

    def chunk_text(self, text: str) -> List[str]:
        """Split ``text`` into chunks no longer than ``chunk_size``.

        Args:
            text: Text to split.

        Returns:
            ``[]`` for empty text, ``[text]`` unchanged when it already fits,
            otherwise the list of chunks in reading order.
        """
        if not text:
            return []
        size = self.chunk_size
        if len(text) <= size:
            return [text]

        # Break the text into units that individually fit into one chunk.
        units: List[str] = []
        for sentence in self.sentence_pattern.split(text):
            sentence = sentence.strip()
            if sentence:
                units.extend(self._split_oversized(sentence))

        chunks: List[str] = []
        current = ""
        for unit in units:
            if not current:
                current = unit
            elif len(current) + 1 + len(unit) <= size:
                current = f"{current} {unit}"
            else:
                chunks.append(current)
                # Only as much overlap as still leaves room for the unit and
                # the joining space, so the new chunk never exceeds ``size``.
                tail = self._overlap_tail(current, size - len(unit) - 1)
                current = f"{tail} {unit}" if tail else unit
        if current:
            chunks.append(current)
        # Whitespace-only text yields no units; fall back to the raw text.
        return chunks if chunks else [text]

    def _split_oversized(self, sentence: str) -> List[str]:
        """Break a sentence longer than ``chunk_size`` into fitting pieces.

        Pieces are built from whole words joined by single spaces; a word that
        is itself longer than ``chunk_size`` is sliced into fixed-size parts.
        """
        size = self.chunk_size
        if len(sentence) <= size:
            return [sentence]
        pieces: List[str] = []
        current = ""
        for word in sentence.split():
            while len(word) > size:
                if current:
                    pieces.append(current)
                    current = ""
                pieces.append(word[:size])
                word = word[size:]
            if not current:
                current = word
            elif len(current) + 1 + len(word) <= size:
                current = f"{current} {word}"
            else:
                pieces.append(current)
                current = word
        if current:
            pieces.append(current)
        return pieces

    def _overlap_tail(self, chunk: str, budget: int) -> str:
        """Return the trailing whole words of ``chunk`` to repeat in the next one.

        The result is at most ``min(chunk_overlap, budget)`` characters long
        and is empty when no whole word fits within that limit.
        """
        limit = min(self.chunk_overlap, budget)
        if limit <= 0:
            return ""
        if len(chunk) <= limit:
            return chunk
        start = len(chunk) - limit
        tail = chunk[start:]
        if not (tail[0].isspace() or chunk[start - 1].isspace()):
            # The cut landed inside a word: drop the partial leading word.
            parts = tail.split(None, 1)
            tail = parts[1] if len(parts) == 2 else ""
        return tail.strip()
class DocumentProcessor:
    """Turns database rows into chunked ``Document`` objects."""

    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50):
        """Create the processor together with the chunker it delegates to."""
        self.chunker = TextChunker(chunk_size, chunk_overlap)

    def process_row(
        self,
        row: Dict[str, Any],
        table_name: str,
        text_columns: List[str],
        primary_key_column: Optional[str] = None,
    ) -> List[Document]:
        """Build the documents for every usable text column of one row.

        Args:
            row: Mapping of column name to value.
            table_name: Name of the table the row belongs to.
            text_columns: Columns whose values should become documents.
            primary_key_column: Column holding the row's primary key, if any.

        Returns:
            One ``Document`` per chunk of each column whose value is a
            non-blank string; other columns are skipped.
        """
        # The key is stringified (empty string when the column is absent from
        # the row); it stays None when no key column was named at all.
        if primary_key_column:
            pk_value = str(row.get(primary_key_column, ""))
        else:
            pk_value = None

        collected: List[Document] = []
        for column_name in text_columns:
            raw_value = row.get(column_name)
            if not (raw_value and isinstance(raw_value, str)):
                continue
            cleaned = raw_value.strip()
            if not cleaned:
                continue

            pieces = self.chunker.chunk_text(cleaned)
            piece_count = len(pieces)
            collected.extend(
                Document(
                    id="",
                    content=piece,
                    table_name=table_name,
                    column_name=column_name,
                    primary_key_value=pk_value,
                    chunk_index=position,
                    total_chunks=piece_count,
                    metadata={"table": table_name, "column": column_name, "pk": pk_value},
                )
                for position, piece in enumerate(pieces)
            )
        return collected

    def process_rows(
        self,
        rows: List[Dict[str, Any]],
        table_name: str,
        text_columns: List[str],
        primary_key_column: Optional[str] = None,
    ) -> Generator[Document, None, None]:
        """Lazily yield the documents of each row in ``rows``, in order.

        Arguments other than ``rows`` are forwarded unchanged to
        ``process_row`` for every row.
        """
        for record in rows:
            yield from self.process_row(
                record, table_name, text_columns, primary_key_column
            )
def get_document_processor(chunk_size: int = 500, chunk_overlap: int = 50) -> DocumentProcessor:
    """Build a ``DocumentProcessor`` configured with the given chunking settings."""
    processor = DocumentProcessor(chunk_size, chunk_overlap)
    return processor