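"""Vector store utilities: cache and load document chunks, ingest new PDF/Markdown files
from the NEW_DATA directory, split them with the markdown and recursive splitters, and
create or update the persisted FAISS index."""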
import pickle
import logging
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Iterable, List, Optional, Tuple
from langchain.schema import Document
from langchain_community.vectorstores import FAISS

from .config import get_embedding_model, VECTOR_STORE_DIR, CHUNKS_PATH, NEW_DATA
from .text_processors import markdown_splitter, recursive_splitter
from . import data_loaders

logger = logging.getLogger(__name__)

MAX_WORKERS = max(2, min(8, (os.cpu_count() or 4)))
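# Bounded thread pool: at least 2 and at most 8 workers; os.cpu_count() can return None,
# in which case 4 is assumed.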


def load_company_vector_store() -> Optional[FAISS]:
    """Load existing vector store with proper error handling.
    Only attempt to load if required FAISS files are present.
    """
    try:
        store_dir = Path(VECTOR_STORE_DIR)
        index_file = store_dir / "index.faiss"
        meta_file = store_dir / "index.pkl"  # created by LangChain FAISS.save_local

        # If directory exists but files are missing, do not attempt load
        if not (index_file.exists() and meta_file.exists()):
            logger.info("Vector store not initialized yet; index files not found. Skipping load.")
            return None

        vector_store = FAISS.load_local(
            str(VECTOR_STORE_DIR),
            get_embedding_model(),
            allow_dangerous_deserialization=True,
        )
        logger.info("Successfully loaded existing vector store")
        return vector_store
    except Exception as e:
        logger.error(f"Failed to load vector store: {e}")
        return None

    
def load_chunks() -> Optional[List[Document]]:
    """Load pre-processed chunks with error handling"""
    try:
        if Path(CHUNKS_PATH).exists():
            with open(CHUNKS_PATH, 'rb') as f:
                company_chunks = pickle.load(f)
            logger.info(f"Successfully loaded {len(company_chunks)} chunks from cache")
            return company_chunks
        else:
            logger.info("No cached chunks found")
            return None
    except Exception as e:
        logger.error(f"Failed to load chunks: {e}")
        return None
    
def save_chunks(chunks: List[Document]) -> bool:
    """Save processed chunks to file"""
    try:
        # Ensure directory exists
        Path(CHUNKS_PATH).parent.mkdir(parents=True, exist_ok=True)
        
        with open(CHUNKS_PATH, 'wb') as f:
            pickle.dump(chunks, f)
        logger.info(f"Successfully saved {len(chunks)} chunks to {CHUNKS_PATH}")
        return True
    except Exception as e:
        logger.error(f"Failed to save chunks: {e}")
        return False


# --------------------------------------------------------------------------------------
# New functionality: scan new_data, load, split, and update vector store
# --------------------------------------------------------------------------------------
def _iter_files(root: Path) -> Iterable[Path]:
    """Yield PDF and Markdown files under the given root directory recursively."""
    if not root.exists():
        # Bare return is the idiomatic way to end a generator early (nothing to yield)
        return
    for p in root.rglob('*'):
        if p.is_file() and p.suffix.lower() in {'.pdf', '.md'}:
            yield p


def create_company_documents() -> List[Document]:
    """Backward-compatible wrapper to load documents from NEW_DATA.
    Prefer create_company_documents_and_files() if you also need the list of source files.
    """
    docs, _ = create_company_documents_and_files()
    return docs


def _load_documents_for_file(file_path: Path) -> List[Document]:
    try:
        if file_path.suffix.lower() == '.pdf':
            return data_loaders.load_pdf_documents(file_path)
        return data_loaders.load_markdown_documents(file_path)
    except Exception as e:
        logger.error(f"Failed to load {file_path}: {e}")
        return []


def create_company_documents_and_files() -> Tuple[List[Document], List[Path]]:
    """Load Documents from NEW_DATA and return them along with the exact files that were loaded.

    Returns:
        (documents, files)
    """
    documents: List[Document] = []
    files = list(_iter_files(NEW_DATA))
    if not files:
        logger.info(f"No new files found under {NEW_DATA}")
        return documents, []

    worker_count = min(MAX_WORKERS, len(files)) or 1
    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        futures = {executor.submit(_load_documents_for_file, file_path): file_path for file_path in files}
        for future in as_completed(futures):
            documents.extend(future.result())
    logger.info(f"Loaded {len(documents)} Documents from {NEW_DATA}")
    return documents, files


def _segment_document(doc: Document) -> List[Document]:
    """Split a Markdown document on headers; non-Markdown documents pass through unchanged."""
    source_name = str(doc.metadata.get("source", "")).lower()
    if source_name.endswith('.md'):
        try:
            md_sections = markdown_splitter.split_text(doc.page_content)
            return [
                Document(page_content=section.page_content, metadata={**doc.metadata, **section.metadata})
                for section in md_sections
            ]
        except Exception as exc:
            logger.warning(f"Markdown header split failed for {source_name or 'unknown source'}; keeping document whole: {exc}")
            return [doc]
    return [doc]


def _split_chunk(doc: Document) -> List[Document]:
    try:
        return recursive_splitter.split_documents([doc])
    except Exception as exc:
        logger.error(f"Failed to split document {doc.metadata.get('source', 'unknown')}: {exc}")
        return []


def split_documents(documents: List[Document]) -> List[Document]:
    """Split documents using markdown headers when applicable, then recursive splitter for uniform chunks."""
    if not documents:
        return []

    # First pass: optional markdown header segmentation for .md sources
    worker_count = min(MAX_WORKERS, len(documents)) or 1
    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        segmented_lists = list(executor.map(_segment_document, documents))
    segmented: List[Document] = [seg for sublist in segmented_lists for seg in sublist]

    if not segmented:
        return []

    split_worker_count = min(MAX_WORKERS, len(segmented)) or 1
    with ThreadPoolExecutor(max_workers=split_worker_count) as executor:
        chunk_lists = list(executor.map(_split_chunk, segmented))

    chunks = [chunk for chunk_list in chunk_lists for chunk in chunk_list]
    logger.info(f"Split {len(segmented)} docs into {len(chunks)} chunks")
    return chunks


def create_company_vector_store(chunks: List[Document]) -> FAISS:
    """Create a FAISS vector store from chunks and persist it."""
    if not chunks:
        raise ValueError("Cannot create vector store from empty chunks")
    vector_store = FAISS.from_documents(chunks, get_embedding_model())
    vector_store.save_local(str(VECTOR_STORE_DIR))
    logger.info("Vector store created and saved")
    return vector_store


def update_vector_store_with_chunks(chunks: List[Document]) -> FAISS:
    """Load existing store if available, add new chunks, and persist. Returns the updated store."""
    if not chunks:
        existing = load_company_vector_store()
        if existing:
            return existing

    store = load_company_vector_store()
    if store is None:
        store = create_company_vector_store(chunks)
    else:
        # Add to existing store and persist
        store.add_documents(chunks)
        store.save_local(str(VECTOR_STORE_DIR))
        logger.info(f"Added {len(chunks)} new chunks to existing vector store")
    return store


def _delete_paths(paths: List[Path]) -> None:
    """Delete given files, logging any failures."""
    for p in paths:
        try:
            if p.exists() and p.is_file():
                p.unlink()
                logger.info(f"Deleted processed file: {p}")
        except Exception as e:
            logger.error(f"Failed to delete {p}: {e}")


def _cleanup_empty_dirs(root: Path) -> None:
    """Remove empty directories under root (best-effort)."""
    try:
        # Walk bottom-up (children sort before their parents) so nested empty dirs go first
        dirs = [d for d in root.rglob('*') if d.is_dir()]
        for dirpath in sorted(dirs, key=lambda d: len(d.parts), reverse=True):
            try:
                if not any(dirpath.iterdir()):
                    dirpath.rmdir()
                    logger.info(f"Removed empty directory: {dirpath}")
            except Exception as exc:
                logger.debug(f"Could not remove directory {dirpath}: {exc}")
    except Exception as exc:
        logger.debug(f"Directory cleanup under {root} failed: {exc}")


def process_new_data_and_update_vector_store() -> Optional[FAISS]:
    """If there are files under data/new_data, process and add to the FAISS store.
    Also update chunks cache. After successful update, delete processed files from new_data.
    """
    try:
        docs, files = create_company_documents_and_files()
        if not docs:
            logger.info("No new documents to process.")
            return load_company_vector_store()

        chunks = split_documents(docs)

        # Merge new chunks into the cached set, then persist the cache and update the store concurrently
        existing_chunks = load_chunks() or []
        merged_chunks = existing_chunks + chunks

        with ThreadPoolExecutor(max_workers=2) as executor:
            save_future = executor.submit(save_chunks, merged_chunks)
            store_future = executor.submit(update_vector_store_with_chunks, chunks)
            save_success = save_future.result()
            store = store_future.result()

        if not save_success:
            logger.warning("Chunk persistence reported failure; vector store was updated but cache may be stale.")

        # If we reached here, store update succeeded; delete processed source files
        _delete_paths(files)
        _cleanup_empty_dirs(NEW_DATA)

        logger.info(
            f"Processed {len(docs)} new docs into {len(chunks)} chunks, updated vector store, and cleaned new_data."
        )
        return store
    except Exception as e:
        logger.error(f"Failed processing new_data: {e}")
        return None
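

if __name__ == "__main__":
    # A minimal manual-run sketch (assumption: this module lives inside a package, so it
    # should be executed as a module, e.g. `python -m <package>.vector_store`, for the
    # relative imports to resolve).
    logging.basicConfig(level=logging.INFO)
    updated_store = process_new_data_and_update_vector_store()
    if updated_store is None:
        logger.warning("No vector store available (no new data and no existing index, or an error occurred).")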