Spaces:
Running
Running
| import pickle | |
| import logging | |
| import os | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from pathlib import Path | |
| from typing import List, Optional, Iterable | |
| from langchain.schema import Document | |
| from langchain_community.vectorstores import FAISS | |
| from .config import get_embedding_model, VECTOR_STORE_DIR, CHUNKS_PATH, NEW_DATA | |
| from .text_processors import markdown_splitter, recursive_splitter | |
| from . import data_loaders | |
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Thread-pool size: the CPU count (4 when it cannot be determined) clamped to
# the range [2, 8]. The median of the three values is exactly that clamp.
MAX_WORKERS = sorted((2, os.cpu_count() or 4, 8))[1]
def load_company_vector_store() -> Optional[FAISS]:
    """Return the persisted FAISS vector store, or ``None`` when unavailable.

    Loading is attempted only when both ``index.faiss`` and ``index.pkl``
    (the latter is created by LangChain's ``FAISS.save_local``) exist under
    ``VECTOR_STORE_DIR``. Any exception raised while checking or loading is
    logged and turned into a ``None`` result instead of propagating.
    """
    try:
        base_dir = Path(VECTOR_STORE_DIR)
        required_files = (base_dir / "index.faiss", base_dir / "index.pkl")
        # A directory without both files is treated as "not initialized".
        if any(not required.exists() for required in required_files):
            logger.info("Vector store not initialized yet; index files not found. Skipping load.")
            return None

        # NOTE(review): the explicit opt-in below permits loading pickled
        # metadata; only use with a store directory this application wrote.
        loaded_store = FAISS.load_local(
            str(VECTOR_STORE_DIR),
            get_embedding_model(),
            allow_dangerous_deserialization=True,
        )
    except Exception as e:
        logger.error(f"Failed to load vector store: {e}")
        return None
    else:
        logger.info("Successfully loaded existing vector store")
        return loaded_store
def load_chunks() -> Optional[List[Document]]:
    """Return the cached chunk list from ``CHUNKS_PATH``.

    Returns ``None`` when the cache file does not exist or when reading it
    fails for any reason (the failure is logged).
    """
    try:
        # Guard clause: nothing to read when the cache was never written.
        if not Path(CHUNKS_PATH).exists():
            logger.info("No cached chunks found")
            return None

        # NOTE(review): pickle executes arbitrary code on load; the cache file
        # must only ever be one written by save_chunks().
        with open(CHUNKS_PATH, 'rb') as cache_file:
            cached_chunks = pickle.load(cache_file)
        logger.info(f"Successfully loaded {len(cached_chunks)} chunks from cache")
        return cached_chunks
    except Exception as e:
        logger.error(f"Failed to load chunks: {e}")
        return None
def save_chunks(chunks: List[Document]) -> bool:
    """Persist processed chunks to ``CHUNKS_PATH`` as a pickle file.

    The data is written to a sibling temporary file first and then moved into
    place with ``os.replace``. An interrupted or failed write therefore never
    leaves a truncated cache behind; the previous cache file stays intact.

    Args:
        chunks: Documents to store.

    Returns:
        ``True`` when the file was written, ``False`` when any step failed
        (the error is logged, not raised).
    """
    try:
        target = Path(CHUNKS_PATH)
        # Ensure directory exists
        target.parent.mkdir(parents=True, exist_ok=True)

        tmp_path = target.with_name(target.name + ".tmp")
        try:
            with open(tmp_path, 'wb') as f:
                pickle.dump(chunks, f)
            # Swap the finished file in; the old cache is replaced in one step.
            os.replace(tmp_path, target)
        except Exception:
            # Do not leave a partial temp file lying around on failure.
            tmp_path.unlink(missing_ok=True)
            raise

        logger.info(f"Successfully saved {len(chunks)} chunks to {CHUNKS_PATH}")
        return True
    except Exception as e:
        logger.error(f"Failed to save chunks: {e}")
        return False
| # -------------------------------------------------------------------------------------- | |
| # New functionality: scan new_data, load, split, and update vector store | |
| # -------------------------------------------------------------------------------------- | |
def _iter_files(root: Path) -> Iterable[Path]:
    """Yield PDF and Markdown files found recursively under *root*.

    Args:
        root: Directory to scan. A plain string path is accepted as well.

    Yields:
        Regular files whose suffix is ``.pdf`` or ``.md`` (case-insensitive),
        in sorted path order so repeated scans are deterministic. Nothing is
        yielded when *root* does not exist.
    """
    root = Path(root)
    if not root.exists():
        # Bare return: a value returned from a generator is discarded by
        # iteration, so returning a list here would only mislead readers.
        return
    wanted_suffixes = {'.pdf', '.md'}
    for candidate in sorted(root.rglob('*')):
        if candidate.is_file() and candidate.suffix.lower() in wanted_suffixes:
            yield candidate
def create_company_documents() -> List[Document]:
    """Return only the Documents loaded from ``NEW_DATA``.

    Kept for backward compatibility; call
    ``create_company_documents_and_files()`` directly when the list of
    source files is needed too.
    """
    loaded_documents, _unused_files = create_company_documents_and_files()
    return loaded_documents
def _load_documents_for_file(file_path: Path) -> List[Document]:
    """Load one file into Documents, returning ``[]`` when loading fails.

    Files with a ``.pdf`` suffix (case-insensitive) go to the PDF loader;
    every other file goes to the Markdown loader. Failures are logged.
    """
    try:
        # Pick the loader first, then invoke it once.
        if file_path.suffix.lower() == '.pdf':
            loader = data_loaders.load_pdf_documents
        else:
            loader = data_loaders.load_markdown_documents
        return loader(file_path)
    except Exception as e:
        logger.error(f"Failed to load {file_path}: {e}")
        return []
def create_company_documents_and_files() -> tuple[List[Document], List[Path]]:
    """Load all files under ``NEW_DATA`` and report which ones were loaded.

    Files are loaded concurrently on a thread pool. Only files that actually
    produced at least one Document are included in the returned file list.
    ``process_new_data_and_update_vector_store`` deletes every file in that
    list, so a file that failed to load (or yielded nothing) must not appear
    in it, otherwise it would be removed without ever being indexed.

    Returns:
        ``(documents, files)``: the loaded Documents, and the source files
        they came from. Both follow the order of the directory scan. Both are
        empty when no candidate files exist.
    """
    documents: List[Document] = []
    loaded_files: List[Path] = []

    candidates = list(_iter_files(NEW_DATA))
    if not candidates:
        logger.info(f"No new files found under {NEW_DATA}")
        return documents, loaded_files

    worker_count = min(MAX_WORKERS, len(candidates))
    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        # Executor.map yields results in submission order, which keeps each
        # result paired with its file and makes the document order stable.
        results = executor.map(_load_documents_for_file, candidates)
        for file_path, file_docs in zip(candidates, results):
            if not file_docs:
                logger.warning(f"No documents loaded from {file_path}; file will be left in place")
                continue
            documents.extend(file_docs)
            loaded_files.append(file_path)

    logger.info(f"Loaded {len(documents)} Documents from {NEW_DATA}")
    return documents, loaded_files
def _segment_document(doc: Document) -> List[Document]:
    """Split a Markdown document into header-based sections.

    Args:
        doc: Document whose ``metadata["source"]`` decides whether Markdown
            segmentation applies (the source must end with ``.md``,
            case-insensitive).

    Returns:
        One Document per section, each carrying the original metadata merged
        with the section's own metadata (section keys win on conflict). For
        non-Markdown sources, or when segmentation fails, the original
        document is returned unchanged as a single-element list.
    """
    source_name = str(doc.metadata.get("source", ""))
    if not source_name.lower().endswith('.md'):
        return [doc]

    try:
        md_sections = markdown_splitter.split_text(doc.page_content)
        return [
            Document(page_content=section.page_content, metadata={**doc.metadata, **section.metadata})
            for section in md_sections
        ]
    except Exception as exc:
        # Best-effort: fall back to the unsegmented document, but leave a
        # trace so missing header metadata can be diagnosed later.
        logger.warning(f"Markdown segmentation failed for {source_name or 'unknown'}; using unsegmented document: {exc}")
        return [doc]
def _split_chunk(doc: Document) -> List[Document]:
    """Run the recursive splitter on one document.

    Returns the resulting chunks, or an empty list when splitting raises
    (the failure is logged together with the document's source).
    """
    try:
        pieces = recursive_splitter.split_documents([doc])
    except Exception as exc:
        logger.error(f"Failed to split document {doc.metadata.get('source', 'unknown')}: {exc}")
        pieces = []
    return pieces
def split_documents(documents: List[Document]) -> List[Document]:
    """Turn documents into uniform chunks in two passes.

    Pass one segments Markdown sources by header (other documents pass
    through untouched); pass two runs the recursive splitter on every
    segment. Each pass runs on a thread pool and keeps the input order.
    """
    if not documents:
        return []

    def _parallel_flatten(worker, items):
        # Apply `worker` to each item on a pool, then flatten the per-item lists.
        pool_size = min(MAX_WORKERS, len(items)) or 1
        with ThreadPoolExecutor(max_workers=pool_size) as pool:
            per_item = list(pool.map(worker, items))
        return [piece for batch in per_item for piece in batch]

    # First pass: optional markdown header segmentation for .md sources
    segmented: List[Document] = _parallel_flatten(_segment_document, documents)
    if not segmented:
        return []

    # Second pass: recursive splitting into final chunks
    chunks = _parallel_flatten(_split_chunk, segmented)
    logger.info(f"Split {len(segmented)} docs into {len(chunks)} chunks")
    return chunks
def create_company_vector_store(chunks: List[Document]) -> FAISS:
    """Build a new FAISS store from *chunks* and save it to ``VECTOR_STORE_DIR``.

    Raises:
        ValueError: If *chunks* is empty.
    """
    if not chunks:
        raise ValueError("Cannot create vector store from empty chunks")

    embeddings = get_embedding_model()
    new_store = FAISS.from_documents(chunks, embeddings)
    target_dir = str(VECTOR_STORE_DIR)
    new_store.save_local(target_dir)
    logger.info("Vector store created and saved")
    return new_store
def update_vector_store_with_chunks(chunks: List[Document]) -> FAISS:
    """Add *chunks* to the persisted vector store, creating it if needed.

    The existing store is loaded exactly once. Previously an empty *chunks*
    list could trigger two loads from disk before the outcome was decided.

    Args:
        chunks: New chunks to index. May be empty, in which case an existing
            store is returned unchanged.

    Returns:
        The updated (or newly created, or unchanged) store.

    Raises:
        ValueError: If *chunks* is empty and no store exists yet (raised by
            ``create_company_vector_store``).
    """
    store = load_company_vector_store()
    if store is None:
        # No usable store on disk: build one from scratch. This raises
        # ValueError when there are no chunks to build it from.
        return create_company_vector_store(chunks)

    if chunks:
        # Add to existing store and persist
        store.add_documents(chunks)
        store.save_local(str(VECTOR_STORE_DIR))
        logger.info(f"Added {len(chunks)} new chunks to existing vector store")
    return store
def _delete_paths(paths: List[Path]) -> None:
    """Remove each existing regular file in *paths*.

    Entries that are missing or are not regular files are skipped. A failure
    on one path is logged and does not stop the remaining deletions.
    """
    for target in paths:
        try:
            if not (target.exists() and target.is_file()):
                continue
            target.unlink()
            logger.info(f"Deleted processed file: {target}")
        except Exception as e:
            logger.error(f"Failed to delete {target}: {e}")
def _cleanup_empty_dirs(root: Path) -> None:
    """Best-effort removal of empty directories below *root*.

    *root* itself is never removed. All errors are deliberately ignored.
    """
    try:
        subdirs = [entry for entry in root.rglob('*') if entry.is_dir()]
        # Longest path first: a child's path is always longer than its
        # parent's, so children are handled before the parent is checked.
        subdirs.sort(key=lambda d: len(str(d)), reverse=True)
        for folder in subdirs:
            try:
                is_empty = next(folder.iterdir(), None) is None
                if is_empty:
                    folder.rmdir()
                    logger.info(f"Removed empty directory: {folder}")
            except Exception:
                pass
    except Exception:
        pass
def process_new_data_and_update_vector_store() -> Optional[FAISS]:
    """Index any files under ``NEW_DATA`` and clean them up afterwards.

    Steps, in order:
      1. Load and split the new files.
      2. Add the chunks to the FAISS store (creating it if necessary).
      3. Append the chunks to the on-disk chunk cache.
      4. Delete the processed source files and prune empty directories.

    The store is updated before the cache is written. The two used to run
    concurrently, so a failed store update could still leave the new chunks
    in the cache; a retry then appended the same chunks again. Now a failure
    in step 2 aborts before the cache or the source files are touched.

    Returns:
        The updated store; the existing store (possibly ``None``) when there
        is nothing new to process; ``None`` when processing failed.
    """
    try:
        docs, files = create_company_documents_and_files()
        if not docs:
            logger.info("No new documents to process.")
            return load_company_vector_store()

        chunks = split_documents(docs)

        # Raises on failure, which skips the cache write and file deletion.
        store = update_vector_store_with_chunks(chunks)

        # Only chunks that made it into the store are merged into the cache.
        existing_chunks = load_chunks() or []
        if not save_chunks(existing_chunks + chunks):
            logger.warning("Chunk persistence reported failure; vector store was updated but cache may be stale.")

        # If we reached here, store update succeeded; delete processed source files
        _delete_paths(files)
        _cleanup_empty_dirs(NEW_DATA)
        logger.info(
            f"Processed {len(docs)} new docs into {len(chunks)} chunks, updated vector store, and cleaned new_data."
        )
        return store
    except Exception as e:
        # logger.exception keeps the traceback for diagnosing the failure.
        logger.exception(f"Failed processing new_data: {e}")
        return None