| import os | |
| import shutil | |
| import tempfile | |
| import requests | |
| from langchain.document_loaders.generic import GenericLoader | |
| from langchain.document_loaders.blob_loaders import FileSystemBlobLoader | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from langchain_community.vectorstores import FAISS | |
| from langchain.vectorstores.utils import DistanceStrategy | |
| from langchain.text_splitter import CharacterTextSplitter | |
| os.makedirs('store', exist_ok = True) | |
| def download(args: dict): | |
| if not 'dir' in args: | |
| raise ValueError('require dir') | |
| if 'zip_url' in args: | |
| res = requests.get(args['zip_url']) | |
| with tempfile.NamedTemporaryFile(suffix=".zip") as t: | |
| with open(t.name, 'wb') as f: | |
| f.write(res.content) | |
| if os.path.exists(f"store/{args['dir']}"): | |
| shutil.rmtree(f"store/{args['dir']}") | |
| shutil.unpack_archive(t.name, f"store/{args['dir']}") | |
| elif 'url' in args: | |
| os.makedirs(f"store/{args['dir']}", exist_ok=True) | |
| res = requests.get(args['url']) | |
| filepath = f"store/{args['dir']}/{os.path.basename(args['url'])}" | |
| with open(filepath, 'wb') as f: | |
| f.write(res.content) | |
| elif 'text' in args: | |
| os.makedirs(f"store/{args['dir']}", exist_ok=True) | |
| filepath = f"store/{args['dir']}/text.txt" | |
| with open(filepath, 'w', encoding='utf-8') as f: | |
| f.write(args['text']) | |
| def docs_load(args: dict): | |
| loader = GenericLoader.from_filesystem( | |
| path=f"store/{args['dir']}", | |
| glob="**/[!.]*", | |
| show_progress=True, | |
| ) | |
| docs = loader.load() | |
| return docs | |
| def chunk_split(docs, chunk_size): | |
| text_splitter = CharacterTextSplitter( | |
| separator='\n\n', | |
| chunk_size=chunk_size, | |
| chunk_overlap=0, | |
| length_function=len | |
| ) | |
| chunk_docs = text_splitter.create_documents([doc.page_content for doc in docs]) | |
| return chunk_docs | |
| def vector(docs, args: dict): | |
| embeddings = HuggingFaceEmbeddings(model_name="intfloat/multilingual-e5-large") | |
| vector_store = FAISS.from_documents(documents=docs, | |
| embedding=embeddings, | |
| distance_strategy=DistanceStrategy.MAX_INNER_PRODUCT, | |
| normalize_L2=True) | |
| return vector_store | |
| def vector_save(docs, args: dict): | |
| vector_store = vector(docs, args) | |
| folder_path = f"store/{args['dir']}/vector" | |
| vector_store.save_local(folder_path=folder_path) | |
| return vector_store | |
| def vector_load(args: dict): | |
| folder_path = f"store/{args['dir']}/vector" | |
| if not os.path.exists(folder_path): | |
| raise ValueError(f"missing store/{args['dir']}/vector") | |
| embeddings = HuggingFaceEmbeddings(model_name="intfloat/multilingual-e5-large") | |
| vector_store = FAISS.load_local(folder_path=folder_path, | |
| embeddings=embeddings, | |
| distance_strategy=DistanceStrategy.MAX_INNER_PRODUCT, | |
| normalize_L2=True) | |
| return vector_store | |
| def search(vector_store, args: dict): | |
| results = vector_store.similarity_search_with_score(query=args['query'], k=args['k']) | |
| detail = [] | |
| for r in results: | |
| detail.append([r[0].page_content, float(r[1])]) | |
| return results[0][0].page_content, detail | |
| def load_dirs(): | |
| dirs = [] | |
| for name in os.listdir('store'): | |
| dirs.append(name) | |
| return dirs | |
| def upload(dir, chunk_size, file): | |
| if not dir: | |
| raise ValueError('require dir') | |
| args = { | |
| 'dir': dir, | |
| 'chunk_size': int(chunk_size), | |
| } | |
| if os.path.exists(f"store/{args['dir']}"): | |
| shutil.rmtree(f"store/{args['dir']}") | |
| shutil.unpack_archive(file.name, f"store/{args['dir']}") | |
| docs = docs_load(args) | |
| if args['chunk_size'] > 0: | |
| docs = chunk_split(docs, int(chunk_size)) | |
| vector_save(docs, args) | |
| return f"saved store/{args['dir']}" | |