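"""Build a local Chroma vector store from plain-text documents.

Each .txt file in data/books is loaded, split into overlapping character
chunks sized to the document's length, embedded with OpenAI embeddings, and
persisted to a single Chroma collection. Requires OPENAI_API_KEY in the
environment (loaded from a .env file).
"""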
import os
import logging
import warnings
import time
from dotenv import load_dotenv
from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Suppress warnings
warnings.filterwarnings("ignore", category=UserWarning)

load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
CHROMA_PATH = "./chroma_db"
PROCESSED_PATH = "data/books"
API_TIMEOUT = 60  # 1 minute timeout for API calls
CHUNK_SIZE = 100  # Number of chunks to embed and write to Chroma per batch
DELAY_BETWEEN_FILES = 60  # 1 minute delay between processing files

def main():
    logger.info("Starting document processing")
    generate_data_store()
    logger.info("Document processing completed")

def generate_data_store():
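    """Load every text file, split it into chunks, and save all chunks to Chroma."""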
    files = get_files_to_process()
    all_chunks = []
    for i, file in enumerate(files):
        chunks = process_single_file(file)
        all_chunks.extend(chunks)
        if i < len(files) - 1:
            # Wait between files (skip the delay after the last one)
            time.sleep(DELAY_BETWEEN_FILES)
    save_to_chroma(all_chunks)

def get_files_to_process():
    return [f for f in os.listdir(PROCESSED_PATH) if f.endswith('.txt')]

def process_single_file(file):
    logger.info(f"Processing file: {file}")
    file_path = os.path.join(PROCESSED_PATH, file)
    document = load_document(file_path)
    chunks = split_text([document])
    return chunks

def load_document(file_path):
    loader = TextLoader(file_path)
    return loader.load()[0]

def calculate_chunk_size(total_lines):
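    """Map a document's length in lines to a splitter chunk size in characters."""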
    if total_lines < 1000:
        return 1000
    elif total_lines < 5000:
        return 2000
    elif total_lines < 10000:
        return 4000
    elif total_lines < 50000:
        return 8000
    else:
        return 16000 # For very large documents

def split_text(documents: list[Document]):
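    """Split each document into overlapping character chunks sized by document length."""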
    logger.info("Starting text splitting process")
    chunks = []
    for doc in documents:
        # Size chunks (in characters) according to the document's length in lines
        total_lines = doc.page_content.count('\n') + 1
        chunk_size = calculate_chunk_size(total_lines)
        
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_size // 5,  # 20% overlap
            length_function=len,
            separators=['\n\n', '\n', ' ', ''],
            add_start_index=True,
        )
        
        doc_chunks = text_splitter.create_documents([doc.page_content], [doc.metadata])
        chunks.extend(doc_chunks)
        
        logger.info(f"Split document {doc.metadata['source']} into {len(doc_chunks)} chunks with chunk size {chunk_size} lines")
        
        if len(doc_chunks) > 1000:
            logger.warning(f"Document {doc.metadata['source']} has been split into a large number of chunks ({len(doc_chunks)}). Consider further preprocessing or summarization.")

    if chunks:
        sample_chunk = chunks[0]
        logger.info(f"Sample chunk: {sample_chunk.page_content[:100]}...")
        logger.info(f"Sample chunk metadata: {sample_chunk.metadata}")

    return chunks

def save_to_chroma(chunks: list[Document]):
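    """Embed chunks in batches and persist them to the Chroma database."""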
    logger.info(f"Preparing to save chunks to Chroma at {CHROMA_PATH}")
    
    embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key, timeout=API_TIMEOUT)
    
    # Use a single collection for all documents
    collection_name = "all_documents"
    db = Chroma(persist_directory=CHROMA_PATH, embedding_function=embeddings, collection_name=collection_name)

    total_batches = (len(chunks) + CHUNK_SIZE - 1) // CHUNK_SIZE  # Round up
    for i in range(0, len(chunks), CHUNK_SIZE):
        chunk_batch = chunks[i:i + CHUNK_SIZE]

        try:
            start_time = time.time()
            db.add_documents(chunk_batch)
            end_time = time.time()
            logger.info(f"Processed and saved batch {i//CHUNK_SIZE + 1} of {total_batches} in {end_time - start_time:.2f} seconds")
        except Exception as e:
            logger.error(f"Error processing batch {i//CHUNK_SIZE + 1}: {str(e)}")
    
    db.persist()
    logger.info(f"Successfully saved {len(chunks)} chunks to {CHROMA_PATH} in collection {collection_name}")

if __name__ == "__main__":
    main()