| | """ |
| | Document processor module for Norwegian RAG chatbot. |
| | Orchestrates the document processing pipeline with remote embeddings. |
| | """ |
| |
|
| | import os |
| | import json |
| | import numpy as np |
| | from typing import List, Dict, Any, Optional, Tuple, Union |
| | from datetime import datetime |
| |
|
| | from .extractor import TextExtractor |
| | from .chunker import TextChunker |
| | from ..api.huggingface_api import HuggingFaceAPI |
| | from ..api.config import CHUNK_SIZE, CHUNK_OVERLAP |
| |
|
class DocumentProcessor:
    """
    Orchestrates the document processing pipeline:
    1. Extract text from documents
    2. Split text into chunks
    3. Generate embeddings using remote API
    4. Store processed documents and embeddings

    Processed documents are persisted as one JSON file per document under
    ``processed_dir``, plus a ``document_index.json`` mapping document IDs
    to their metadata.
    """

    def __init__(
        self,
        api_client: Optional[HuggingFaceAPI] = None,
        documents_dir: str = "/home/ubuntu/chatbot_project/data/documents",
        processed_dir: str = "/home/ubuntu/chatbot_project/data/processed",
        chunk_size: int = CHUNK_SIZE,
        chunk_overlap: int = CHUNK_OVERLAP,
        chunking_strategy: str = "paragraph"
    ):
        """
        Initialize the document processor.

        Args:
            api_client: HuggingFaceAPI client for generating embeddings.
                A default client is constructed when omitted.
            documents_dir: Directory for storing original documents
            processed_dir: Directory for storing processed documents and embeddings
            chunk_size: Maximum size of each chunk
            chunk_overlap: Overlap between consecutive chunks
            chunking_strategy: Strategy for chunking text ('fixed', 'paragraph', or 'sentence')
        """
        self.api_client = api_client or HuggingFaceAPI()
        self.documents_dir = documents_dir
        self.processed_dir = processed_dir
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.chunking_strategy = chunking_strategy

        # Both directories must exist before any document is processed.
        os.makedirs(self.documents_dir, exist_ok=True)
        os.makedirs(self.processed_dir, exist_ok=True)

        # The index maps document IDs to metadata and is kept in sync on disk.
        self.document_index_path = os.path.join(self.processed_dir, "document_index.json")
        self.document_index = self._load_document_index()

    def process_document(
        self,
        file_path: str,
        document_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Process a document file through the entire pipeline.

        Args:
            file_path: Path to the document file
            document_id: Optional custom document ID
            metadata: Optional metadata for the document (not mutated)

        Returns:
            Document ID

        Raises:
            ValueError: If no text could be extracted from the file.
        """
        if document_id is None:
            document_id = f"doc_{datetime.now().strftime('%Y%m%d%H%M%S')}_{os.path.basename(file_path)}"

        text = TextExtractor.extract_from_file(file_path)
        if not text:
            raise ValueError(f"Failed to extract text from {file_path}")

        return self._ingest(
            document_id,
            text,
            metadata,
            {"filename": os.path.basename(file_path)},
        )

    def process_text(
        self,
        text: str,
        document_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Process text directly through the pipeline.

        Args:
            text: Text content to process
            document_id: Optional custom document ID
            metadata: Optional metadata for the document (not mutated)

        Returns:
            Document ID
        """
        if document_id is None:
            document_id = f"text_{datetime.now().strftime('%Y%m%d%H%M%S')}"

        return self._ingest(document_id, text, metadata, {"source": "direct_text"})

    def _ingest(
        self,
        document_id: str,
        text: str,
        metadata: Optional[Dict[str, Any]],
        source_metadata: Dict[str, Any],
    ) -> str:
        """
        Shared pipeline tail: chunk, clean, embed, persist, and index.

        Args:
            document_id: Document ID to store under
            text: Raw text to chunk and embed
            metadata: Caller-supplied metadata, may be None; copied, never mutated
            source_metadata: Per-source fields (e.g. filename or source marker)

        Returns:
            Document ID
        """
        chunks = TextChunker.chunk_text(
            text,
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            strategy=self.chunking_strategy,
        )
        chunks = [TextChunker.clean_chunk(chunk) for chunk in chunks]

        # Remote call: one embedding vector per chunk.
        embeddings = self.api_client.generate_embeddings(chunks)

        # Merge into a fresh dict so the caller's metadata object is untouched.
        merged = dict(metadata) if metadata else {}
        merged.update(source_metadata)
        merged.update({
            "processed_date": datetime.now().isoformat(),
            "chunk_count": len(chunks),
            "chunking_strategy": self.chunking_strategy,
            "embedding_model": self.api_client.embedding_model_id,
        })

        self._save_processed_document(document_id, chunks, embeddings, merged)
        self._update_document_index(document_id, merged)
        return document_id

    def _load_processed_document(self, document_id: str) -> Dict[str, Any]:
        """
        Load a processed document's JSON payload from disk.

        Args:
            document_id: Document ID

        Returns:
            The stored document data dictionary.

        Raises:
            FileNotFoundError: If no processed file exists for the ID.
        """
        document_path = os.path.join(self.processed_dir, f"{document_id}.json")
        if not os.path.exists(document_path):
            raise FileNotFoundError(f"Document not found: {document_id}")

        with open(document_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def get_document_chunks(self, document_id: str) -> List[str]:
        """
        Get all chunks for a document.

        Args:
            document_id: Document ID

        Returns:
            List of text chunks

        Raises:
            FileNotFoundError: If the document has not been processed.
        """
        return self._load_processed_document(document_id).get("chunks", [])

    def get_document_embeddings(self, document_id: str) -> List[List[float]]:
        """
        Get all embeddings for a document.

        Args:
            document_id: Document ID

        Returns:
            List of embedding vectors

        Raises:
            FileNotFoundError: If the document has not been processed.
        """
        return self._load_processed_document(document_id).get("embeddings", [])

    def get_all_documents(self) -> Dict[str, Dict[str, Any]]:
        """
        Get all documents in the index.

        Returns:
            Dictionary of document IDs to metadata (the live internal index).
        """
        return self.document_index

    def delete_document(self, document_id: str) -> bool:
        """
        Delete a document and its processed data.

        Args:
            document_id: Document ID

        Returns:
            True if successful, False otherwise (unknown document ID)
        """
        if document_id not in self.document_index:
            return False

        # Drop from the index first, then remove the per-document file.
        del self.document_index[document_id]
        self._save_document_index()

        document_path = os.path.join(self.processed_dir, f"{document_id}.json")
        if os.path.exists(document_path):
            os.remove(document_path)

        return True

    def _save_processed_document(
        self,
        document_id: str,
        chunks: List[str],
        embeddings: List[List[float]],
        metadata: Dict[str, Any]
    ) -> None:
        """
        Save processed document data as UTF-8 JSON.

        Args:
            document_id: Document ID
            chunks: List of text chunks
            embeddings: List of embedding vectors
            metadata: Document metadata
        """
        document_data = {
            "document_id": document_id,
            "metadata": metadata,
            "chunks": chunks,
            "embeddings": embeddings,
        }

        document_path = os.path.join(self.processed_dir, f"{document_id}.json")
        with open(document_path, 'w', encoding='utf-8') as f:
            # ensure_ascii=False keeps Norwegian characters readable on disk.
            json.dump(document_data, f, ensure_ascii=False, indent=2)

    def _load_document_index(self) -> Dict[str, Dict[str, Any]]:
        """
        Load the document index from disk.

        Returns:
            Dictionary of document IDs to metadata; empty dict when the
            index file is missing or unreadable (best-effort recovery).
        """
        if os.path.exists(self.document_index_path):
            try:
                with open(self.document_index_path, 'r', encoding='utf-8') as f:
                    return json.load(f)
            # Narrowed from bare Exception: only I/O and parse failures are expected here.
            except (OSError, json.JSONDecodeError) as e:
                print(f"Error loading document index: {str(e)}")

        return {}

    def _save_document_index(self) -> None:
        """
        Save the document index to disk as UTF-8 JSON.
        """
        with open(self.document_index_path, 'w', encoding='utf-8') as f:
            json.dump(self.document_index, f, ensure_ascii=False, indent=2)

    def _update_document_index(self, document_id: str, metadata: Dict[str, Any]) -> None:
        """
        Update the document index with a new or updated document and persist it.

        Args:
            document_id: Document ID
            metadata: Document metadata
        """
        self.document_index[document_id] = metadata
        self._save_document_index()
| |
|