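"""
Build a Chroma vector store from the .txt files in data/books.

Each file is loaded, split into overlapping character chunks sized by
document length, embedded with OpenAI embeddings, and persisted to
./chroma_db in a single "all_documents" collection.

Requires OPENAI_API_KEY (loaded from the environment / a .env file).
"""
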
import os
import shutil
import logging
import warnings
import time
from dotenv import load_dotenv
from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Suppress warnings
warnings.filterwarnings("ignore", category=UserWarning)
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
CHROMA_PATH = "./chroma_db"
PROCESSED_PATH = "data/books"
API_TIMEOUT = 60 # 1 minute timeout for API calls
CHUNK_SIZE = 100  # Number of chunk documents to write to Chroma in each batch
DELAY_BETWEEN_FILES = 60 # 1 minute delay between processing files

def main():
    logger.info("Starting document processing")
    generate_data_store()
    logger.info("Document processing completed")

def generate_data_store():
    files = get_files_to_process()
    all_chunks = []
    for file in files:
        chunks = process_single_file(file)
        all_chunks.extend(chunks)
        time.sleep(DELAY_BETWEEN_FILES)
    save_to_chroma(all_chunks)

def get_files_to_process():
    return [f for f in os.listdir(PROCESSED_PATH) if f.endswith('.txt')]

def process_single_file(file):
    logger.info(f"Processing file: {file}")
    file_path = os.path.join(PROCESSED_PATH, file)
    document = load_document(file_path)
    chunks = split_text([document])
    return chunks

def load_document(file_path):
    loader = TextLoader(file_path)
    return loader.load()[0]

def calculate_chunk_size(total_lines):
    # Choose a character-based chunk size that grows with the document's line count
    if total_lines < 1000:
        return 1000
    elif total_lines < 5000:
        return 2000
    elif total_lines < 10000:
        return 4000
    elif total_lines < 50000:
        return 8000
    else:
        return 16000  # For very large documents

def split_text(documents: list[Document]):
    logger.info("Starting text splitting process")
    chunks = []
    for doc in documents:
        with open(doc.metadata['source'], 'r', encoding='utf-8') as file:
            lines = file.readlines()
        total_lines = len(lines)
        chunk_size = calculate_chunk_size(total_lines)
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_size // 5,  # 20% overlap
            length_function=len,
            separators=['\n\n', '\n', ' ', ''],
            add_start_index=True,
        )
        # readlines() keeps trailing newlines, so join without a separator
        doc_chunks = text_splitter.create_documents([''.join(lines)], [doc.metadata])
        chunks.extend(doc_chunks)
        logger.info(f"Split document {doc.metadata['source']} into {len(doc_chunks)} chunks with chunk size {chunk_size} characters")
        if len(doc_chunks) > 1000:
            logger.warning(f"Document {doc.metadata['source']} has been split into a large number of chunks ({len(doc_chunks)}). Consider further preprocessing or summarization.")
    if chunks:
        sample_chunk = chunks[0]
        logger.info(f"Sample chunk: {sample_chunk.page_content[:100]}...")
        logger.info(f"Sample chunk metadata: {sample_chunk.metadata}")
    return chunks

def save_to_chroma(chunks: list[Document]):
    logger.info(f"Preparing to save chunks to Chroma at {CHROMA_PATH}")
    embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key, timeout=API_TIMEOUT)
    # Use a single collection for all documents
    collection_name = "all_documents"
    db = Chroma(persist_directory=CHROMA_PATH, embedding_function=embeddings, collection_name=collection_name)
    total_batches = (len(chunks) + CHUNK_SIZE - 1) // CHUNK_SIZE  # ceiling division
    for i in range(0, len(chunks), CHUNK_SIZE):
        chunk_batch = chunks[i:i + CHUNK_SIZE]
        try:
            start_time = time.time()
            db.add_documents(chunk_batch)
            end_time = time.time()
            logger.info(f"Processed and saved batch {i // CHUNK_SIZE + 1} of {total_batches} in {end_time - start_time:.2f} seconds")
        except Exception as e:
            logger.error(f"Error processing batch {i // CHUNK_SIZE + 1}: {str(e)}")
    db.persist()
    logger.info(f"Successfully saved {len(chunks)} chunks to {CHROMA_PATH} in collection {collection_name}")

if __name__ == "__main__":
    main()