# Hugging Face Space snapshot — uploaded by moazx, "Initial commit" (2a8faae), 9.72 kB
import pickle
import logging
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List, Optional, Iterable
from langchain.schema import Document
from langchain_community.vectorstores import FAISS
from .config import get_embedding_model, VECTOR_STORE_DIR, CHUNKS_PATH, NEW_DATA
from .text_processors import markdown_splitter, recursive_splitter
from . import data_loaders
logger = logging.getLogger(__name__)
MAX_WORKERS = max(2, min(8, (os.cpu_count() or 4)))
def load_company_vector_store() -> Optional[FAISS]:
    """Load the persisted FAISS vector store, or return None when unavailable.

    A load is attempted only when both FAISS artifacts (``index.faiss`` and
    ``index.pkl``) are present on disk; any failure is logged and swallowed.
    """
    try:
        store_dir = Path(VECTOR_STORE_DIR)
        required_files = (store_dir / "index.faiss", store_dir / "index.pkl")
        # LangChain's FAISS.save_local writes both files; treat a partial
        # directory as "not initialized" rather than attempting a load.
        if not all(artifact.exists() for artifact in required_files):
            logger.info("Vector store not initialized yet; index files not found. Skipping load.")
            return None
        store = FAISS.load_local(
            str(VECTOR_STORE_DIR),
            get_embedding_model(),
            allow_dangerous_deserialization=True,
        )
        logger.info("Successfully loaded existing vector store")
        return store
    except Exception as e:
        logger.error(f"Failed to load vector store: {e}")
        return None
def load_chunks() -> Optional[List[Document]]:
    """Return cached pre-processed chunks, or None when absent or unreadable."""
    try:
        if not Path(CHUNKS_PATH).exists():
            logger.info("No cached chunks found")
            return None
        with open(CHUNKS_PATH, 'rb') as f:
            company_chunks = pickle.load(f)
        logger.info(f"Successfully loaded {len(company_chunks)} chunks from cache")
        return company_chunks
    except Exception as e:
        logger.error(f"Failed to load chunks: {e}")
        return None
def save_chunks(chunks: List[Document]) -> bool:
    """Pickle the processed chunks to CHUNKS_PATH; return True on success."""
    try:
        target = Path(CHUNKS_PATH)
        # Create the parent directory on first save.
        target.parent.mkdir(parents=True, exist_ok=True)
        with open(CHUNKS_PATH, 'wb') as f:
            pickle.dump(chunks, f)
        logger.info(f"Successfully saved {len(chunks)} chunks to {CHUNKS_PATH}")
        return True
    except Exception as e:
        logger.error(f"Failed to save chunks: {e}")
        return False
# --------------------------------------------------------------------------------------
# New functionality: scan new_data, load, split, and update vector store
# --------------------------------------------------------------------------------------
def _iter_files(root: Path) -> Iterable[Path]:
"""Yield PDF and Markdown files under the given root directory recursively."""
if not root.exists():
return []
for p in root.rglob('*'):
if p.is_file() and p.suffix.lower() in {'.pdf', '.md'}:
yield p
def create_company_documents() -> List[Document]:
    """Backward-compatible wrapper returning only the Documents from NEW_DATA.

    Prefer create_company_documents_and_files() when the file list is also
    needed.
    """
    documents, _files = create_company_documents_and_files()
    return documents
def _load_documents_for_file(file_path: Path) -> List[Document]:
    """Load one file with the matching data_loaders helper; [] on failure."""
    try:
        # Dispatch on suffix: PDFs use the PDF loader, everything else the
        # markdown loader (only .pdf/.md reach this helper via _iter_files).
        if file_path.suffix.lower() == '.pdf':
            loader = data_loaders.load_pdf_documents
        else:
            loader = data_loaders.load_markdown_documents
        return loader(file_path)
    except Exception as e:
        logger.error(f"Failed to load {file_path}: {e}")
        return []
def create_company_documents_and_files() -> tuple[List[Document], List[Path]]:
    """Create Documents list and return the exact files loaded from NEW_DATA.

    Files are loaded concurrently, but results are concatenated in the order
    produced by _iter_files, so repeated runs over the same tree yield the
    same document ordering. (The previous as_completed-based collection made
    the order depend on thread completion timing.)

    Returns:
        (documents, files)
    """
    documents: List[Document] = []
    files = list(_iter_files(NEW_DATA))
    if not files:
        logger.info(f"No new files found under {NEW_DATA}")
        return documents, []
    worker_count = min(MAX_WORKERS, len(files)) or 1
    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        # executor.map preserves input order; per-file errors are already
        # handled inside _load_documents_for_file (it returns []).
        for docs in executor.map(_load_documents_for_file, files):
            documents.extend(docs)
    logger.info(f"Loaded {len(documents)} Documents from {NEW_DATA}")
    return documents, files
def _segment_document(doc: Document) -> List[Document]:
    """Split a markdown Document by headers; other sources pass through as-is."""
    source = str(doc.metadata.get("source", "")).lower()
    if not source.endswith('.md'):
        return [doc]
    try:
        sections = markdown_splitter.split_text(doc.page_content)
        # Merge per-section header metadata on top of the parent metadata.
        return [
            Document(
                page_content=section.page_content,
                metadata={**doc.metadata, **section.metadata},
            )
            for section in sections
        ]
    except Exception:
        # Header splitting failed: fall back to the whole document.
        return [doc]
def _split_chunk(doc: Document) -> List[Document]:
    """Recursively split a single Document into chunks; [] when splitting fails."""
    try:
        return recursive_splitter.split_documents([doc])
    except Exception as exc:
        logger.error(f"Failed to split document {doc.metadata.get('source', 'unknown')}: {exc}")
        return []
def split_documents(documents: List[Document]) -> List[Document]:
    """Split documents using markdown headers when applicable, then recursive splitter for uniform chunks."""
    if not documents:
        return []
    # Pass 1: markdown-header segmentation for .md sources (no-op otherwise).
    workers = min(MAX_WORKERS, len(documents)) or 1
    segmented: List[Document] = []
    with ThreadPoolExecutor(max_workers=workers) as executor:
        for segments in executor.map(_segment_document, documents):
            segmented.extend(segments)
    if not segmented:
        return []
    # Pass 2: recursive character splitting into uniformly sized chunks.
    workers = min(MAX_WORKERS, len(segmented)) or 1
    chunks: List[Document] = []
    with ThreadPoolExecutor(max_workers=workers) as executor:
        for chunk_list in executor.map(_split_chunk, segmented):
            chunks.extend(chunk_list)
    logger.info(f"Split {len(segmented)} docs into {len(chunks)} chunks")
    return chunks
def create_company_vector_store(chunks: List[Document]) -> FAISS:
    """Build a FAISS vector store from chunks, persist it, and return it.

    Raises:
        ValueError: when *chunks* is empty.
    """
    if not chunks:
        raise ValueError("Cannot create vector store from empty chunks")
    store = FAISS.from_documents(chunks, get_embedding_model())
    store.save_local(str(VECTOR_STORE_DIR))
    logger.info("Vector store created and saved")
    return store
def update_vector_store_with_chunks(chunks: List[Document]) -> FAISS:
    """Load existing store if available, add new chunks, and persist. Returns the updated store.

    Fixes over the previous version: the store is loaded from disk only once
    (the empty-chunks path used to call load_company_vector_store twice), and
    presence is checked with ``is not None`` instead of object truthiness,
    which could invoke an unintended ``__len__``/``__bool__`` on the store.

    Raises:
        ValueError: when *chunks* is empty and no persisted store exists
        (propagated from create_company_vector_store), matching the old
        behavior on that path.
    """
    store = load_company_vector_store()
    if not chunks:
        if store is not None:
            return store
        # Nothing to add and nothing on disk: same ValueError the previous
        # code reached via create_company_vector_store.
        return create_company_vector_store(chunks)
    if store is None:
        store = create_company_vector_store(chunks)
    else:
        # Add to existing store and persist
        store.add_documents(chunks)
        store.save_local(str(VECTOR_STORE_DIR))
        logger.info(f"Added {len(chunks)} new chunks to existing vector store")
    return store
def _delete_paths(paths: List[Path]) -> None:
"""Delete given files, logging any failures."""
for p in paths:
try:
if p.exists() and p.is_file():
p.unlink()
logger.info(f"Deleted processed file: {p}")
except Exception as e:
logger.error(f"Failed to delete {p}: {e}")
def _cleanup_empty_dirs(root: Path) -> None:
"""Remove empty directories under root (best-effort)."""
try:
# Walk bottom-up to remove empty directories
dirs = [d for d in root.rglob('*') if d.is_dir()]
for dirpath in sorted(dirs, key=lambda x: len(str(x)), reverse=True):
try:
if not any(dirpath.iterdir()):
dirpath.rmdir()
logger.info(f"Removed empty directory: {dirpath}")
except Exception:
pass
except Exception:
pass
def process_new_data_and_update_vector_store() -> Optional[FAISS]:
    """If there are files under data/new_data, process and add to the FAISS store.

    Also update chunks cache. After successful update, delete processed files
    from new_data. Returns the updated (or freshly loaded) store, or None on
    error.
    """
    try:
        docs, files = create_company_documents_and_files()
        if not docs:
            logger.info("No new documents to process.")
            return load_company_vector_store()
        chunks = split_documents(docs)
        # Merge new chunks into the cached set so the cache stays complete.
        merged_chunks = (load_chunks() or []) + chunks
        # Persist the cache and update the store concurrently.
        with ThreadPoolExecutor(max_workers=2) as executor:
            cache_future = executor.submit(save_chunks, merged_chunks)
            store_future = executor.submit(update_vector_store_with_chunks, chunks)
            cache_ok = cache_future.result()
            store = store_future.result()
        if not cache_ok:
            logger.warning("Chunk persistence reported failure; vector store was updated but cache may be stale.")
        # Store update succeeded, so the source files are safe to remove.
        _delete_paths(files)
        _cleanup_empty_dirs(NEW_DATA)
        logger.info(
            f"Processed {len(docs)} new docs into {len(chunks)} chunks, updated vector store, and cleaned new_data."
        )
        return store
    except Exception as e:
        logger.error(f"Failed processing new_data: {e}")
        return None