"""Vector store maintenance: load/save the FAISS index and chunk cache, and ingest new files.

The ingestion entry point is ``process_new_data_and_update_vector_store``: it scans
``NEW_DATA`` for PDF/Markdown files, loads and splits them, adds the chunks to the
FAISS store under ``VECTOR_STORE_DIR``, refreshes the pickle cache at ``CHUNKS_PATH``
and finally removes the source files that were ingested.
"""
import pickle
import logging
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List, Optional, Iterable

from langchain.schema import Document
from langchain_community.vectorstores import FAISS

from .config import get_embedding_model, VECTOR_STORE_DIR, CHUNKS_PATH, NEW_DATA
from .text_processors import markdown_splitter, recursive_splitter
from . import data_loaders

logger = logging.getLogger(__name__)

# Thread-pool size for loading/splitting: CPU count clamped to the range [2, 8].
MAX_WORKERS = max(2, min(8, (os.cpu_count() or 4)))

# Lower-cased file suffixes that are picked up from NEW_DATA.
_SUPPORTED_SUFFIXES = {'.pdf', '.md'}


def load_company_vector_store() -> Optional[FAISS]:
    """Load the persisted FAISS vector store, if one exists.

    The load is only attempted when both files written by
    ``FAISS.save_local`` (``index.faiss`` and ``index.pkl``) are present.

    Returns:
        The loaded store, or ``None`` when the store has not been created yet
        or loading failed (the failure is logged).
    """
    try:
        store_dir = Path(VECTOR_STORE_DIR)
        index_file = store_dir / "index.faiss"
        meta_file = store_dir / "index.pkl"  # created by LangChain FAISS.save_local

        # If the directory exists but files are missing, do not attempt a load.
        if not (index_file.exists() and meta_file.exists()):
            logger.info("Vector store not initialized yet; index files not found. Skipping load.")
            return None

        # NOTE(security): index.pkl is unpickled here; only load stores this
        # application wrote itself.
        vector_store = FAISS.load_local(
            str(VECTOR_STORE_DIR),
            get_embedding_model(),
            allow_dangerous_deserialization=True,
        )
        logger.info("Successfully loaded existing vector store")
        return vector_store
    except Exception as e:
        logger.error(f"Failed to load vector store: {e}")
        return None


def load_chunks() -> Optional[List[Document]]:
    """Load the cached, pre-processed chunks from ``CHUNKS_PATH``.

    Returns:
        The unpickled chunk list, or ``None`` when no cache file exists or it
        could not be read (the failure is logged).
    """
    try:
        # EAFP: open directly instead of exists()-then-open, which is racy.
        # NOTE(security): pickle.load executes arbitrary code from the file;
        # CHUNKS_PATH must only ever contain data written by save_chunks().
        with open(CHUNKS_PATH, 'rb') as f:
            company_chunks = pickle.load(f)
        logger.info(f"Successfully loaded {len(company_chunks)} chunks from cache")
        return company_chunks
    except FileNotFoundError:
        logger.info("No cached chunks found")
        return None
    except Exception as e:
        logger.error(f"Failed to load chunks: {e}")
        return None


def save_chunks(chunks: List[Document]) -> bool:
    """Pickle ``chunks`` to ``CHUNKS_PATH``, creating parent directories as needed.

    The data is written to a sibling temporary file and then moved into place
    with ``os.replace`` so a failure mid-write cannot leave a truncated cache.

    Returns:
        ``True`` on success, ``False`` if anything failed (the failure is logged).
    """
    tmp_path: Optional[Path] = None
    try:
        target = Path(CHUNKS_PATH)
        target.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = target.with_name(target.name + ".tmp")
        with open(tmp_path, 'wb') as f:
            pickle.dump(chunks, f)
        os.replace(tmp_path, target)
        logger.info(f"Successfully saved {len(chunks)} chunks to {CHUNKS_PATH}")
        return True
    except Exception as e:
        logger.error(f"Failed to save chunks: {e}")
        if tmp_path is not None:
            try:
                tmp_path.unlink()
            except OSError:
                # Best-effort cleanup; the temp file may never have been created.
                pass
        return False


# --------------------------------------------------------------------------------------
# New functionality: scan new_data, load, split, and update vector store
# --------------------------------------------------------------------------------------

def _iter_files(root: Path) -> Iterable[Path]:
    """Yield PDF and Markdown files under ``root``, recursively.

    Yields nothing when ``root`` does not exist.
    """
    root = Path(root)  # tolerate a str coming from config
    if not root.exists():
        return
    for p in root.rglob('*'):
        if p.is_file() and p.suffix.lower() in _SUPPORTED_SUFFIXES:
            yield p


def create_company_documents() -> List[Document]:
    """Backward-compatible wrapper to load documents from NEW_DATA.

    Prefer using create_company_documents_and_files() if you need the file list.
    """
    docs, _ = create_company_documents_and_files()
    return docs


def _load_documents_for_file(file_path: Path) -> List[Document]:
    """Load one file with the PDF loader or, for any other suffix, the Markdown loader.

    Returns an empty list (and logs the error) if the loader raises.
    """
    try:
        if file_path.suffix.lower() == '.pdf':
            return data_loaders.load_pdf_documents(file_path)
        return data_loaders.load_markdown_documents(file_path)
    except Exception as e:
        logger.error(f"Failed to load {file_path}: {e}")
        return []


def create_company_documents_and_files() -> tuple[List[Document], List[Path]]:
    """Load every supported file under NEW_DATA in parallel.

    Returns:
        ``(documents, files)`` where ``files`` lists only the files that
        actually produced at least one Document. Files whose load failed or
        yielded nothing are excluded so that callers which delete ``files``
        after ingestion do not destroy data that was never indexed.
        Both lists follow the sorted file order, independent of thread timing.
    """
    documents: List[Document] = []
    candidates = sorted(_iter_files(Path(NEW_DATA)))
    if not candidates:
        logger.info(f"No new files found under {NEW_DATA}")
        return documents, []

    worker_count = min(MAX_WORKERS, len(candidates))
    loaded = {}
    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        futures = {
            executor.submit(_load_documents_for_file, file_path): file_path
            for file_path in candidates
        }
        for future in as_completed(futures):
            loaded[futures[future]] = future.result()

    # Assemble in file order so the output is deterministic.
    files: List[Path] = []
    for file_path in candidates:
        file_docs = loaded[file_path]
        if not file_docs:
            logger.warning(f"No documents loaded from {file_path}; leaving file in place")
            continue
        documents.extend(file_docs)
        files.append(file_path)

    logger.info(f"Loaded {len(documents)} Documents from {NEW_DATA}")
    return documents, files


def _segment_document(doc: Document) -> List[Document]:
    """Split a Markdown-sourced document by headers; pass other documents through.

    A document counts as Markdown when its ``source`` metadata ends with ``.md``
    (case-insensitive). Section metadata is merged over the document metadata.
    If header splitting raises, the original document is returned unsplit.
    """
    source_name = str(doc.metadata.get("source", "")).lower()
    if not source_name.endswith('.md'):
        return [doc]
    try:
        md_sections = markdown_splitter.split_text(doc.page_content)
        return [
            Document(
                page_content=section.page_content,
                metadata={**doc.metadata, **section.metadata},
            )
            for section in md_sections
        ]
    except Exception as exc:
        logger.warning(f"Markdown header split failed for {source_name}; using whole document: {exc}")
        return [doc]


def _split_chunk(doc: Document) -> List[Document]:
    """Run the recursive splitter on one document; return ``[]`` (logged) on failure."""
    try:
        return recursive_splitter.split_documents([doc])
    except Exception as exc:
        logger.error(f"Failed to split document {doc.metadata.get('source', 'unknown')}: {exc}")
        return []


def split_documents(documents: List[Document]) -> List[Document]:
    """Split documents using markdown headers when applicable, then the
    recursive splitter for uniform chunks.

    Returns an empty list for empty input. Output order follows input order.
    """
    if not documents:
        return []

    # First pass: optional markdown header segmentation for .md sources.
    with ThreadPoolExecutor(max_workers=min(MAX_WORKERS, len(documents))) as executor:
        segmented_lists = list(executor.map(_segment_document, documents))
    segmented: List[Document] = [seg for sublist in segmented_lists for seg in sublist]
    if not segmented:
        return []

    # Second pass: uniform chunking of every segment.
    with ThreadPoolExecutor(max_workers=min(MAX_WORKERS, len(segmented))) as executor:
        chunk_lists = list(executor.map(_split_chunk, segmented))
    chunks = [chunk for chunk_list in chunk_lists for chunk in chunk_list]

    logger.info(f"Split {len(segmented)} docs into {len(chunks)} chunks")
    return chunks


def create_company_vector_store(chunks: List[Document]) -> FAISS:
    """Create a FAISS vector store from chunks and persist it.

    Raises:
        ValueError: if ``chunks`` is empty.
    """
    if not chunks:
        raise ValueError("Cannot create vector store from empty chunks")
    vector_store = FAISS.from_documents(chunks, get_embedding_model())
    vector_store.save_local(str(VECTOR_STORE_DIR))
    logger.info("Vector store created and saved")
    return vector_store


def update_vector_store_with_chunks(chunks: List[Document]) -> FAISS:
    """Load the existing store if available, add new chunks, and persist.

    Returns:
        The updated store. With empty ``chunks`` the existing store is
        returned unchanged.

    Raises:
        ValueError: if ``chunks`` is empty and no store exists yet.
    """
    store = load_company_vector_store()  # load once, reuse for every branch
    if store is None:
        # Raises ValueError for empty chunks, matching the documented contract.
        return create_company_vector_store(chunks)
    if not chunks:
        return store

    # Add to existing store and persist.
    store.add_documents(chunks)
    store.save_local(str(VECTOR_STORE_DIR))
    logger.info(f"Added {len(chunks)} new chunks to existing vector store")
    return store


def _delete_paths(paths: List[Path]) -> None:
    """Delete given files, logging any failures."""
    for p in paths:
        try:
            if p.is_file():
                p.unlink()
                logger.info(f"Deleted processed file: {p}")
        except Exception as e:
            logger.error(f"Failed to delete {p}: {e}")


def _cleanup_empty_dirs(root: Path) -> None:
    """Remove empty directories under root (best-effort; ``root`` itself is kept)."""
    try:
        root = Path(root)  # tolerate a str coming from config
        dirs = [d for d in root.rglob('*') if d.is_dir()]
        # Deepest first, so a parent emptied by removing its children is removed too.
        for dirpath in sorted(dirs, key=lambda d: len(d.parts), reverse=True):
            try:
                if not any(dirpath.iterdir()):
                    dirpath.rmdir()
                    logger.info(f"Removed empty directory: {dirpath}")
            except Exception as exc:
                logger.debug(f"Could not remove directory {dirpath}: {exc}")
    except Exception as exc:
        logger.debug(f"Empty-directory cleanup skipped for {root}: {exc}")


def process_new_data_and_update_vector_store() -> Optional[FAISS]:
    """Process files under NEW_DATA and add them to the FAISS store.

    Steps: load documents, split into chunks, update and persist the vector
    store, then merge the new chunks into the chunk cache. Source files are
    deleted only after the store update succeeded, and only those files that
    actually produced documents.

    Returns:
        The updated store; the existing store (or ``None``) when there was
        nothing to ingest; ``None`` if processing failed (logged).
    """
    try:
        docs, files = create_company_documents_and_files()
        if not docs:
            logger.info("No new documents to process.")
            return load_company_vector_store()

        chunks = split_documents(docs)
        if not chunks:
            # Nothing was indexed, so the source files must not be deleted.
            logger.error("Splitting produced no chunks; leaving new_data untouched.")
            return load_company_vector_store()

        # Update the store before touching the cache: if this raises, nothing
        # has been persisted and a retry will not duplicate cached chunks.
        store = update_vector_store_with_chunks(chunks)

        existing_chunks = load_chunks() or []
        if not save_chunks(existing_chunks + chunks):
            logger.warning(
                "Chunk persistence reported failure; vector store was updated but cache may be stale."
            )

        # If we reached here, the store update succeeded; delete processed source files.
        _delete_paths(files)
        _cleanup_empty_dirs(NEW_DATA)
        logger.info(
            f"Processed {len(docs)} new docs into {len(chunks)} chunks, updated vector store, and cleaned new_data."
        )
        return store
    except Exception as e:
        logger.error(f"Failed processing new_data: {e}")
        return None