firermsdata-agent / seed_vector_data.py
Aryan Jain
migrate to pinecone and show graph color
411c555
import asyncio
import os
import aiofiles
import shutil
from src.utils import PineconeClient
from loguru import logger
from dotenv import load_dotenv
load_dotenv()
BASE_FOLDER_PATH = "data"
INPUT_DATA_FOLDER_PATH = os.path.join(BASE_FOLDER_PATH, "rag_input_data")
PROCESSED_DATA_FOLDER_PATH = os.path.join(BASE_FOLDER_PATH, "rag_processed_data")
TABLE_DATA_FILE_SUFFIX = "_table_details.txt"
COLUMNS_DETAILS_FILE_SUFFIX = "_columns_details.txt"
if not os.path.exists(PROCESSED_DATA_FOLDER_PATH):
os.makedirs(PROCESSED_DATA_FOLDER_PATH)
async def _is_input_data_exist():
files = os.listdir(INPUT_DATA_FOLDER_PATH)
return len(files) > 0
async def _list_files(is_table_files=True):
files = os.listdir(INPUT_DATA_FOLDER_PATH)
if is_table_files:
files = [file for file in files if TABLE_DATA_FILE_SUFFIX in file]
else:
files = [file for file in files if COLUMNS_DETAILS_FILE_SUFFIX in file]
return files
async def _get_table_data():
files = await _list_files(is_table_files=True)
data = []
for file in files:
async with aiofiles.open(os.path.join(INPUT_DATA_FOLDER_PATH, file), "r") as f:
file_content = await f.read()
texts = file_content.split("\n\n")
table_name = texts[0].split(": ")[1]
metadata = {
"table_name": table_name,
"column_name": None,
}
for text in texts[1:]:
data.append({"text": text, "metadata": metadata})
shutil.move(
os.path.join(INPUT_DATA_FOLDER_PATH, file),
os.path.join(PROCESSED_DATA_FOLDER_PATH, file),
)
return data
async def _get_column_data():
files = await _list_files(is_table_files=False)
data = []
for file in files:
async with aiofiles.open(os.path.join(INPUT_DATA_FOLDER_PATH, file), "r") as f:
file_content = await f.read()
texts = file_content.split("\n\n")
table_name = texts[0].split(": ")[1]
for text in texts[1:]:
text = text.lstrip("\n")
column_name = text.split("\n")[0].split(": ")[1]
metadata = {
"table_name": table_name,
"column_name": column_name,
}
data.append({"text": text, "metadata": metadata})
shutil.move(
os.path.join(INPUT_DATA_FOLDER_PATH, file),
os.path.join(PROCESSED_DATA_FOLDER_PATH, file),
)
return data
async def _upsert_data(data):
texts = [item["text"] for item in data]
metadatas = [item["metadata"] for item in data]
async with PineconeClient() as pinecone_client:
await pinecone_client.upsert(texts=texts, metadatas=metadatas)
async def seed_vector_data():
logger.info("Seeding vector data...")
if not await _is_input_data_exist():
logger.info("No data to seed vector data.")
return
table_data = await _get_table_data()
await _upsert_data(table_data)
logger.info("Table data seeded successfully...")
column_data = await _get_column_data()
await _upsert_data(column_data)
logger.info("Column data seeded successfully...")
logger.info("Vector data seeded successfully...")
async def main():
await seed_vector_data()
if __name__ == "__main__":
asyncio.run(main())