"""Vector-store maintenance for company documents.

Loads, builds, and incrementally updates a FAISS index from PDF and Markdown
files dropped under NEW_DATA, caching the processed chunks on disk.
"""

import logging
import os
import pickle
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Iterable, List, Optional, Tuple

from langchain.schema import Document
from langchain_community.vectorstores import FAISS

from .config import get_embedding_model, VECTOR_STORE_DIR, CHUNKS_PATH, NEW_DATA
from .text_processors import markdown_splitter, recursive_splitter
from . import data_loaders

logger = logging.getLogger(__name__)

# Bound the thread-pool size: at least 2 workers, at most 8, scaled to CPU count.
MAX_WORKERS = max(2, min(8, (os.cpu_count() or 4)))


def load_company_vector_store() -> Optional[FAISS]:
    """Load the existing vector store, returning None if it cannot be loaded.

    Only attempts a load when the required FAISS index files are present.
    """
    try:
        store_dir = Path(VECTOR_STORE_DIR)
        index_file = store_dir / "index.faiss"
        meta_file = store_dir / "index.pkl"

        if not (index_file.exists() and meta_file.exists()):
            logger.info("Vector store not initialized yet; index files not found. Skipping load.")
            return None

        vector_store = FAISS.load_local(
            str(VECTOR_STORE_DIR),
            get_embedding_model(),
            allow_dangerous_deserialization=True,
        )
        logger.info("Successfully loaded existing vector store")
        return vector_store
    except Exception as e:
        logger.error(f"Failed to load vector store: {e}")
        return None


def load_chunks() -> Optional[List[Document]]:
    """Load pre-processed chunks from the on-disk cache, or None if unavailable."""
    try:
        if Path(CHUNKS_PATH).exists():
            with open(CHUNKS_PATH, 'rb') as f:
                company_chunks = pickle.load(f)
            logger.info(f"Successfully loaded {len(company_chunks)} chunks from cache")
            return company_chunks
        logger.info("No cached chunks found")
        return None
    except Exception as e:
        logger.error(f"Failed to load chunks: {e}")
        return None


def save_chunks(chunks: List[Document]) -> bool:
    """Persist processed chunks to the cache file; returns True on success."""
    try:
        Path(CHUNKS_PATH).parent.mkdir(parents=True, exist_ok=True)
        with open(CHUNKS_PATH, 'wb') as f:
            pickle.dump(chunks, f)
        logger.info(f"Successfully saved {len(chunks)} chunks to {CHUNKS_PATH}")
        return True
    except Exception as e:
        logger.error(f"Failed to save chunks: {e}")
        return False


def _iter_files(root: Path) -> Iterable[Path]:
    """Yield PDF and Markdown files under the given root directory, recursively."""
    if not root.exists():
        return  # bare return: `return []` in a generator silently discards the value
    for p in root.rglob('*'):
        if p.is_file() and p.suffix.lower() in {'.pdf', '.md'}:
            yield p


def create_company_documents() -> List[Document]:
    """Backward-compatible wrapper that loads documents from NEW_DATA.

    Prefer create_company_documents_and_files() if you also need the file list.
    """
    docs, _ = create_company_documents_and_files()
    return docs


def _load_documents_for_file(file_path: Path) -> List[Document]:
    """Dispatch a single file to the matching loader; returns [] on failure."""
    try:
        if file_path.suffix.lower() == '.pdf':
            return data_loaders.load_pdf_documents(file_path)
        return data_loaders.load_markdown_documents(file_path)
    except Exception as e:
        logger.error(f"Failed to load {file_path}: {e}")
        return []


def create_company_documents_and_files() -> Tuple[List[Document], List[Path]]:
    """Load Documents from NEW_DATA and return exactly which files were read.

    Returns:
        (documents, files)
    """
    documents: List[Document] = []
    files = list(_iter_files(NEW_DATA))
    if not files:
        logger.info(f"No new files found under {NEW_DATA}")
        return documents, []

    worker_count = min(MAX_WORKERS, len(files)) or 1
    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        futures = {
            executor.submit(_load_documents_for_file, file_path): file_path
            for file_path in files
        }
        for future in as_completed(futures):
            documents.extend(future.result())
    logger.info(f"Loaded {len(documents)} Documents from {NEW_DATA}")
    return documents, files


def _segment_document(doc: Document) -> List[Document]:
    """Split a Markdown document on headers, preserving metadata; pass others through."""
    source_name = str(doc.metadata.get("source", "")).lower()
    if source_name.endswith('.md'):
        try:
            md_sections = markdown_splitter.split_text(doc.page_content)
            return [
                Document(page_content=section.page_content, metadata={**doc.metadata, **section.metadata})
                for section in md_sections
            ]
        except Exception:
            return [doc]
    return [doc]


def _split_chunk(doc: Document) -> List[Document]:
    """Apply the recursive splitter to one document; returns [] on failure."""
    try:
        return recursive_splitter.split_documents([doc])
    except Exception as exc:
        logger.error(f"Failed to split document {doc.metadata.get('source', 'unknown')}: {exc}")
        return []


def split_documents(documents: List[Document]) -> List[Document]:
    """Split documents on Markdown headers where applicable, then recursively into uniform chunks."""
    if not documents:
        return []

    worker_count = min(MAX_WORKERS, len(documents)) or 1
    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        segmented_lists = list(executor.map(_segment_document, documents))
    segmented: List[Document] = [seg for sublist in segmented_lists for seg in sublist]

    if not segmented:
        return []

    split_worker_count = min(MAX_WORKERS, len(segmented)) or 1
    with ThreadPoolExecutor(max_workers=split_worker_count) as executor:
        chunk_lists = list(executor.map(_split_chunk, segmented))

    chunks = [chunk for chunk_list in chunk_lists for chunk in chunk_list]
    logger.info(f"Split {len(segmented)} docs into {len(chunks)} chunks")
    return chunks


def create_company_vector_store(chunks: List[Document]) -> FAISS:
    """Create a FAISS vector store from chunks and persist it to disk."""
    if not chunks:
        raise ValueError("Cannot create vector store from empty chunks")
    vector_store = FAISS.from_documents(chunks, get_embedding_model())
    vector_store.save_local(str(VECTOR_STORE_DIR))
    logger.info("Vector store created and saved")
    return vector_store


def update_vector_store_with_chunks(chunks: List[Document]) -> FAISS:
    """Load the existing store if available, add new chunks, and persist the result."""
    if not chunks:
        existing = load_company_vector_store()
        if existing is not None:
            return existing
        raise ValueError("No chunks to add and no existing vector store to return")

    store = load_company_vector_store()
    if store is None:
        store = create_company_vector_store(chunks)
    else:
        store.add_documents(chunks)
        store.save_local(str(VECTOR_STORE_DIR))
        logger.info(f"Added {len(chunks)} new chunks to existing vector store")
    return store


def _delete_paths(paths: List[Path]) -> None:
    """Delete the given files, logging any failures."""
    for p in paths:
        try:
            if p.exists() and p.is_file():
                p.unlink()
                logger.info(f"Deleted processed file: {p}")
        except Exception as e:
            logger.error(f"Failed to delete {p}: {e}")


def _cleanup_empty_dirs(root: Path) -> None:
    """Remove empty directories under root (best-effort)."""
    try:
        dirs = [d for d in root.rglob('*') if d.is_dir()]
        # Sort deepest paths first so child directories are removed before parents.
        for dirpath in sorted(dirs, key=lambda x: len(str(x)), reverse=True):
            try:
                if not any(dirpath.iterdir()):
                    dirpath.rmdir()
                    logger.info(f"Removed empty directory: {dirpath}")
            except Exception:
                pass
    except Exception:
        pass


def process_new_data_and_update_vector_store() -> Optional[FAISS]:
    """If there are files under data/new_data, process them and add them to the FAISS store.

    Also updates the chunk cache. After a successful update, the processed
    files are deleted from new_data.
    """
    try:
        docs, files = create_company_documents_and_files()
        if not docs:
            logger.info("No new documents to process.")
            return load_company_vector_store()

        chunks = split_documents(docs)

        existing_chunks = load_chunks() or []
        merged_chunks = existing_chunks + chunks

        # Persist the chunk cache and update the vector store concurrently.
        with ThreadPoolExecutor(max_workers=2) as executor:
            save_future = executor.submit(save_chunks, merged_chunks)
            store_future = executor.submit(update_vector_store_with_chunks, chunks)
            save_success = save_future.result()
            store = store_future.result()

        if not save_success:
            logger.warning("Chunk persistence reported failure; vector store was updated but cache may be stale.")

        _delete_paths(files)
        _cleanup_empty_dirs(NEW_DATA)

        logger.info(
            f"Processed {len(docs)} new docs into {len(chunks)} chunks, updated vector store, and cleaned new_data."
        )
        return store
    except Exception as e:
        logger.error(f"Failed processing new_data: {e}")
        return None
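

# A minimal driver sketch, not part of the module's public API: it assumes this
# file lives in a package (run it with `python -m <package>.<module>` so the
# relative imports above resolve) and that .config points at real data
# directories. The query string below is purely illustrative.
if __name__ == "__main__":
    import sys

    logging.basicConfig(level=logging.INFO)
    store = process_new_data_and_update_vector_store()
    if store is None:
        sys.exit("No vector store available; see the logs above for details.")
    # similarity_search is the standard LangChain FAISS query entry point.
    for hit in store.similarity_search("example query", k=3):
        print(hit.metadata.get("source", "unknown"), "->", hit.page_content[:100])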