daniel-was-taken committed on
Commit
25d31a4
·
1 Parent(s): 90e2962

Add batch processing

Browse files
Files changed (1) hide show
  1. populate_db.py +70 -13
populate_db.py CHANGED
@@ -34,6 +34,47 @@ def emb_text(text):
34
  return embedding_model.embed_query(text)
35
  # return embedding_model.encode([text], normalize_embeddings=True).tolist()[0]
36
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
37
  def create_collection():
38
  """Create collection if it doesn't exist."""
39
  if milvus_client.has_collection(collection_name):
@@ -79,10 +120,11 @@ def main():
79
 
80
  docs = unstructured_document_loader()
81
 
82
- # Prepare data for insertion
83
- data_to_insert = []
 
84
 
85
- print(f"Processing {len(docs)} documents for insertion...")
86
 
87
  for i, doc in enumerate(docs):
88
  # Check text length and truncate if necessary
@@ -91,22 +133,37 @@ def main():
91
  text_content = text_content[:65000]
92
  print(f"Document {i+1} truncated from {len(doc.page_content)} to {len(text_content)} characters")
93
 
94
- # Generate embedding for the document content
95
- embedding = emb_text(text_content)
96
-
97
- # Prepare the data entry
98
- data_entry = {
99
  "id": i,
100
- "vector": embedding,
101
  "text": text_content,
102
  "metadata": doc.metadata if doc.metadata else {}
103
- }
104
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
105
  data_to_insert.append(data_entry)
106
 
107
- # Print progress every 100 documents
108
- if (i + 1) % 100 == 0:
109
- print(f"Processed {i + 1}/{len(docs)} documents")
110
 
111
  print(f"Inserting {len(data_to_insert)} documents into Milvus...")
112
 
 
34
  return embedding_model.embed_query(text)
35
  # return embedding_model.encode([text], normalize_embeddings=True).tolist()[0]
36
 
37
def emb_text_batch(texts):
    """Embed several texts with a single model call.

    One batched request is cheaper than calling emb_text once per
    document, which matters when thousands of chunks are loaded.
    """
    batch_embeddings = embedding_model.embed_documents(texts)
    return batch_embeddings
40
+
41
def process_embeddings_in_batches(texts_to_embed, batch_size=50, embedding_dim=4096,
                                  batch_delay=1.5, retry_delay=2.0,
                                  embed_batch=None, embed_single=None):
    """Generate embeddings for ``texts_to_embed`` in batches, with fallbacks.

    Each chunk of ``batch_size`` texts is embedded with one batched call.
    If a batch fails, every text in that batch is retried individually;
    if an individual call also fails, a zero vector is substituted so the
    output stays aligned one-to-one with the input.

    Args:
        texts_to_embed: List of strings to embed.
        batch_size: Number of texts per batched call.
        embedding_dim: Length of the zero-vector fallback. Must match the
            model's output dimension (default keeps the original 4096).
        batch_delay: Seconds to sleep between batches, to be respectful to
            the API. Set to 0 to disable.
        retry_delay: Seconds to sleep between individual fallback calls.
        embed_batch: Callable mapping a list of texts to a list of vectors;
            defaults to the module-level ``emb_text_batch``.
        embed_single: Callable mapping one text to a vector; defaults to
            the module-level ``emb_text``.

    Returns:
        List of embedding vectors, one per input text, in input order.
    """
    # Resolve defaults lazily so the module-level helpers are only required
    # when the caller does not inject replacements (also aids testing).
    if embed_batch is None:
        embed_batch = emb_text_batch
    if embed_single is None:
        embed_single = emb_text

    all_embeddings = []
    total = len(texts_to_embed)
    num_batches = (total + batch_size - 1) // batch_size  # ceiling division

    print(f"Generating embeddings in batches of {batch_size}...")

    for start in range(0, total, batch_size):
        batch_texts = texts_to_embed[start:start + batch_size]
        batch_num = start // batch_size + 1
        batch_end = min(start + batch_size, total)

        print(f"Processing embedding batch {batch_num}/{num_batches} (documents {start+1}-{batch_end})")

        try:
            all_embeddings.extend(embed_batch(batch_texts))
        except Exception as e:
            print(f"Error processing batch {batch_num}: {e}")
            print("Falling back to individual processing for this batch...")

            for offset, text in enumerate(batch_texts):
                try:
                    all_embeddings.append(embed_single(text))
                    print(f" Individual embedding {start+offset+1} completed")
                except Exception as individual_error:
                    print(f" Failed to process document {start+offset+1}: {individual_error}")
                    # Zero vector keeps the output aligned with the input.
                    all_embeddings.append([0.0] * embedding_dim)
                # Longer delay for individual requests.
                if retry_delay:
                    time.sleep(retry_delay)

        # Pause between batches, but skip the pointless sleep after the last one.
        if batch_delay and batch_end < total:
            time.sleep(batch_delay)

    return all_embeddings
77
+
78
  def create_collection():
79
  """Create collection if it doesn't exist."""
80
  if milvus_client.has_collection(collection_name):
 
120
 
121
  docs = unstructured_document_loader()
122
 
123
+ # Prepare texts for batch processing
124
+ texts_to_embed = []
125
+ doc_data = []
126
 
127
+ print(f"Preparing {len(docs)} documents for batch processing...")
128
 
129
  for i, doc in enumerate(docs):
130
  # Check text length and truncate if necessary
 
133
  text_content = text_content[:65000]
134
  print(f"Document {i+1} truncated from {len(doc.page_content)} to {len(text_content)} characters")
135
 
136
+ texts_to_embed.append(text_content)
137
+ doc_data.append({
 
 
 
138
  "id": i,
 
139
  "text": text_content,
140
  "metadata": doc.metadata if doc.metadata else {}
141
+ })
142
 
143
+ # Print progress every 500 documents
144
+ if (i + 1) % 500 == 0:
145
+ print(f"Prepared {i + 1}/{len(docs)} documents")
146
+
147
+ # Process embeddings in batches
148
+ all_embeddings = process_embeddings_in_batches(texts_to_embed, batch_size=25) # Smaller batch size for better reliability
149
+
150
+ # Prepare data for insertion
151
+ data_to_insert = []
152
+
153
+ print(f"Preparing {len(doc_data)} documents for Milvus insertion...")
154
+
155
+ for i, (doc_info, embedding) in enumerate(zip(doc_data, all_embeddings)):
156
+ data_entry = {
157
+ "id": doc_info["id"],
158
+ "vector": embedding,
159
+ "text": doc_info["text"],
160
+ "metadata": doc_info["metadata"]
161
+ }
162
  data_to_insert.append(data_entry)
163
 
164
+ # Print progress every 500 documents
165
+ if (i + 1) % 500 == 0:
166
+ print(f"Prepared {i + 1}/{len(doc_data)} entries for insertion")
167
 
168
  print(f"Inserting {len(data_to_insert)} documents into Milvus...")
169