d-commerce / generate_embeddings.py
Crcs1225
hey
c135be2
import asyncio
import time
from database import db
from rag_system import rag_pipeline
def build_content_string(doc: dict) -> str:
    """Flatten a document into a single ". "-separated text string.

    The labelled fields (title, product description, category) come first,
    in that order, and only when their value is truthy. Every remaining
    key whose value is a ``str``, ``int`` or ``float`` follows as
    ``"key: value"`` in the document's own key order. ``_id`` and
    ``embedding`` are never included.

    Args:
        doc: Mapping of field names to values.

    Returns:
        The joined text, or an empty string when nothing qualifies.
    """
    labelled_fields = (
        ("title", "Title"),
        ("product_description", "Description"),
        ("category", "Category"),
    )
    # Keys that are either already emitted above or must never be emitted.
    skipped = {"_id", "embedding"} | {field for field, _ in labelled_fields}

    segments = [
        f"{label}: {doc[field]}"
        for field, label in labelled_fields
        if doc.get(field)
    ]
    segments.extend(
        f"{name}: {val}"
        for name, val in doc.items()
        if name not in skipped and isinstance(val, (str, int, float))
    )
    return ". ".join(segment for segment in segments if segment)
async def generate_and_store_embeddings():
    """Generate and store an embedding for every document that lacks one.

    Calls ``db.connect()``, then iterates over the documents in
    ``db.collection`` that have no ``embedding`` field. For each one it
    builds a text representation with ``build_content_string``, requests
    an embedding from ``rag_pipeline.get_embeddings`` and writes the first
    element of the result back to the document with ``$set``. Documents
    whose content string is blank are left untouched.

    A failure on a single document is printed and the run moves on to the
    next document. Progress is printed every 10 updated documents and a
    summary is printed at the end.
    """
    await db.connect()
    cursor = db.collection.find({"embedding": {"$exists": False}})
    updated_count = 0

    async for doc in cursor:
        try:
            content = build_content_string(doc)
            if content.strip():
                # A single-item list is sent, so the embedding for this
                # document is the first element of the result.
                embedding = await rag_pipeline.get_embeddings([content])
                await db.collection.update_one(
                    {"_id": doc["_id"]},
                    {"$set": {"embedding": embedding[0]}}
                )
                updated_count += 1
                if updated_count % 10 == 0:
                    print(f"βœ… Processed {updated_count} documents...")
        except Exception as e:
            # Best-effort run: report the failing document and keep going
            # so one bad record does not abort the whole pass.
            print(f"❌ Error processing {doc.get('_id')}: {e}")
            continue

        # Small delay to avoid overload. asyncio.sleep yields to the event
        # loop; time.sleep would block every other task on the loop
        # (including the database driver's own I/O) for the duration.
        await asyncio.sleep(0.2)

    print(f"πŸŽ‰ Embedding generation completed! {updated_count} documents updated.")
# Script entry point: run the embedding job on a fresh event loop only when
# this file is executed directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(generate_and_store_embeddings())