Spaces:
Running
Running
| import asyncio | |
| import time | |
| from database import db | |
| from rag_system import rag_pipeline | |
def build_content_string(doc: dict) -> str:
    """Flatten a document into a single ". "-separated text string.

    The title, product description and category come first under fixed
    labels, each included only when its value is truthy. They are followed
    by every other top-level field whose value is a ``str``, ``int`` or
    ``float`` (``bool`` qualifies as an ``int`` subclass), rendered as
    ``"key: value"`` in the document's own key order. The ``_id`` and
    ``embedding`` fields are never included.

    Args:
        doc: Mapping of field names to values.

    Returns:
        The joined text, or an empty string when no field qualifies.
    """
    labelled_fields = (
        ("Title", "title"),
        ("Description", "product_description"),
        ("Category", "category"),
    )
    # Fields that are either emitted above under a label or never emitted.
    skipped = {"_id", "embedding", "title", "product_description", "category"}

    segments = [
        f"{label}: {doc[field]}"
        for label, field in labelled_fields
        if doc.get(field)
    ]
    segments.extend(
        f"{name}: {val}"
        for name, val in doc.items()
        if name not in skipped and isinstance(val, (str, int, float))
    )
    return ". ".join(segment for segment in segments if segment)
async def generate_and_store_embeddings():
    """Generate and store an embedding for each document that lacks one.

    Connects through ``db``, iterates the documents matching
    ``{"embedding": {"$exists": False}}``, builds a text string for each one
    with ``build_content_string``, requests its embedding from
    ``rag_pipeline.get_embeddings`` and writes the first returned vector back
    to the document with ``$set``. Documents whose text is empty or
    whitespace-only are skipped and not counted.

    An exception raised while processing a single document is printed and
    the loop moves on to the next document; errors raised by the connection
    or by cursor iteration itself are not caught here.
    """
    await db.connect()
    cursor = db.collection.find({"embedding": {"$exists": False}})
    updated_count = 0

    async for doc in cursor:
        try:
            content = build_content_string(doc)
            if content.strip():
                # get_embeddings is given a one-item batch; the first entry of
                # its result is stored as this document's vector.
                embedding = await rag_pipeline.get_embeddings([content])
                await db.collection.update_one(
                    {"_id": doc["_id"]},
                    {"$set": {"embedding": embedding[0]}},
                )
                updated_count += 1
                if updated_count % 10 == 0:
                    print(f"β Processed {updated_count} documents...")
        except Exception as e:
            # Deliberate best-effort: report the failing document and keep
            # going. As in the original, a failed document skips the delay.
            print(f"β Error processing {doc.get('_id')}: {e}")
            continue
        # Small delay to avoid overload. asyncio.sleep suspends only this
        # coroutine; time.sleep would block the entire event loop.
        await asyncio.sleep(0.2)

    print(f"π Embedding generation completed! {updated_count} documents updated.")
if __name__ == "__main__":
    # Script entry point: run the embedding backfill to completion on a
    # fresh event loop. Nothing runs when the module is merely imported.
    backfill = generate_and_store_embeddings()
    asyncio.run(backfill)