import os
import re
from glob import glob
from functools import lru_cache
from concurrent.futures import ThreadPoolExecutor
from typing import List

from langchain.text_splitter import RecursiveCharacterTextSplitter

# Prefer the standalone langchain-huggingface package; fall back to the
# deprecated import path for older LangChain installs.
try:
    from langchain_huggingface import HuggingFaceEmbeddings
except ImportError:
    from langchain.embeddings import HuggingFaceEmbeddings

# Same pattern for the FAISS vector store.
try:
    from langchain_community.vectorstores import FAISS
except ImportError:
    from langchain.vectorstores import FAISS

from langchain.schema import Document

from config import Config

try:
    from langchain_community.document_loaders import TextLoader, PyPDFLoader
except ImportError:
    from langchain.document_loaders import TextLoader, PyPDFLoader
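
# Attributes this module reads from Config. A minimal sketch of the assumed
# config.py (names are taken from the usages below; the values are
# illustrative, not the project's actual settings):
#
#     class Config:
#         DOC_GLOB = "docs/*"          # glob pattern for source files
#         INDEX_DIR = "faiss_index"    # where the index is saved
#         CHUNK_SIZE = 1000            # characters per chunk
#         CHUNK_OVERLAP = 200          # characters shared between chunks
#         EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
#         EMBEDDING_DEVICE = "cpu"     # optional; read via getattr with a "cpu" default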


def _abs(p: str) -> str:
    """Return an absolute path with a leading ~ expanded."""
    return os.path.abspath(os.path.expanduser(p))


def clean_text(text: str) -> str:
    """Drop non-ASCII characters, then collapse runs of whitespace."""
    text = re.sub(r"[^\x00-\x7F]+", " ", text)
    return re.sub(r"\s+", " ", text).strip()


def _load_text_file(path: str) -> List[Document]:
    """Load .txt and .md files using TextLoader; return list[Document]."""
    loader = TextLoader(path, encoding="utf-8")
    docs = loader.load()
    for d in docs:
        d.page_content = clean_text(d.page_content)
        d.metadata["source"] = os.path.basename(path)
    return docs


def _load_pdf(path: str) -> List[Document]:
    """Load a PDF with PyPDFLoader, returning one cleaned Document per page."""
    loader = PyPDFLoader(path)
    pages = loader.load_and_split()
    docs = []
    for p in pages:
        p.page_content = clean_text(p.page_content)
        p.metadata["source"] = os.path.basename(path)
        docs.append(p)
    return docs


def _load_one(path: str) -> List[Document]:
    """Dispatch a single file to the loader that matches its extension."""
    ext = os.path.splitext(path)[1].lower()
    if ext in (".txt", ".md", ".markdown", ".rst"):
        return _load_text_file(path)
    if ext == ".pdf":
        return _load_pdf(path)
    # Unknown extension: try it as plain text before giving up.
    try:
        return _load_text_file(path)
    except Exception:
        print(f"Skipping unsupported file type: {path}")
        return []


def process_documents() -> List[Document]:
    """
    Read files matched by Config.DOC_GLOB and return split document chunks.
    Supports .pdf, .md, .txt. Add more extensions if needed.
    """
    files = glob(Config.DOC_GLOB)
    if not files:
        raise RuntimeError(
            f"No files found for DOC_GLOB={Config.DOC_GLOB} (cwd={os.getcwd()})"
        )

    # Load files concurrently; parsing (especially PDFs) is I/O-heavy.
    docs: List[Document] = []
    with ThreadPoolExecutor() as ex:
        for loaded in ex.map(_load_one, files):
            docs.extend(loaded)

    if not docs:
        raise RuntimeError("No documents loaded from files; check DOC_GLOB and file contents.")

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=Config.CHUNK_SIZE,
        chunk_overlap=Config.CHUNK_OVERLAP,
    )
    chunks = splitter.split_documents(docs)
    return chunks


@lru_cache(maxsize=1)
def _get_embeddings():
    """Construct the HuggingFace embedding model once and reuse it."""
    return HuggingFaceEmbeddings(
        model_name=Config.EMBEDDING_MODEL,
        model_kwargs={"device": getattr(Config, "EMBEDDING_DEVICE", "cpu")},
    )


def load_or_create_index(force_rebuild: bool = False):
    """
    Load the FAISS index from Config.INDEX_DIR, or build it from source documents.
    If loading fails, the index is rebuilt. Set force_rebuild=True to skip loading.
    """
    emb = _get_embeddings()
    index_dir = _abs(Config.INDEX_DIR)
    os.makedirs(index_dir, exist_ok=True)

    # The directory always exists after the makedirs above, so check for the
    # files FAISS.save_local actually writes (index.faiss / index.pkl) instead.
    has_saved_index = os.path.exists(os.path.join(index_dir, "index.faiss"))
    if has_saved_index and not force_rebuild:
        try:
            print(f"Attempting to load existing FAISS index from {index_dir} ...")
            return FAISS.load_local(index_dir, emb, allow_dangerous_deserialization=True)
        except Exception as e:
            print("Failed to load existing FAISS index:", e)
            try:
                print("Index dir listing:", os.listdir(index_dir))
            except Exception as e2:
                print("Could not list index dir:", e2)
            print("Will attempt to rebuild the index from source documents.")

    print("Building FAISS index from source documents...")
    chunks = process_documents()
    if not chunks:
        raise RuntimeError("No chunks to index after processing documents.")
    index = FAISS.from_documents(chunks, emb)
    index.save_local(index_dir)
    print("FAISS index built and saved to", index_dir)
    return index
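

# Minimal usage sketch. Assumption: similarity_search is the standard
# LangChain FAISS query method; the query string below is illustrative only.
if __name__ == "__main__":
    index = load_or_create_index()
    # Retrieve the four chunks most similar to a sample query.
    for doc in index.similarity_search("What do these documents cover?", k=4):
        print(doc.metadata.get("source"), "->", doc.page_content[:80])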