Spaces:
Sleeping
Sleeping
File size: 3,386 Bytes
e8d7414 f961e0d 411c555 f961e0d 5676359 f961e0d 9c10d0c f961e0d 411c555 f961e0d e8d7414 5676359 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 | import asyncio
import os
import aiofiles
import shutil
from src.utils import PineconeClient
from loguru import logger
from dotenv import load_dotenv
# Load variables from a local .env file into the process environment before
# the rest of the module runs (presumably credentials used by PineconeClient
# -- confirm against src.utils).
load_dotenv()
# Root folder that holds both the pending and the already-read RAG files.
BASE_FOLDER_PATH = "data"
# Files waiting to be parsed are read from this folder ...
INPUT_DATA_FOLDER_PATH = os.path.join(BASE_FOLDER_PATH, "rag_input_data")
# ... and are moved into this one after their contents have been read.
PROCESSED_DATA_FOLDER_PATH = os.path.join(BASE_FOLDER_PATH, "rag_processed_data")
# File-name suffixes that tell table-level files from column-level files.
TABLE_DATA_FILE_SUFFIX = "_table_details.txt"
COLUMNS_DETAILS_FILE_SUFFIX = "_columns_details.txt"

# exist_ok=True creates the folder only when it is missing, without the
# check-then-create race of a separate os.path.exists() test.
os.makedirs(PROCESSED_DATA_FOLDER_PATH, exist_ok=True)
async def _is_input_data_exist():
    """Return True when the input folder contains at least one entry.

    A missing input folder is treated the same as an empty one, so a setup
    without any input data reports "nothing to seed" instead of raising
    ``FileNotFoundError``.
    """
    try:
        return bool(os.listdir(INPUT_DATA_FOLDER_PATH))
    except FileNotFoundError:
        return False
async def _list_files(is_table_files=True):
    """Return the names of the input files of one kind.

    Args:
        is_table_files: When true, select the files ending in
            ``TABLE_DATA_FILE_SUFFIX``; otherwise select the files ending in
            ``COLUMNS_DETAILS_FILE_SUFFIX``.

    Returns:
        Sorted file names (not full paths) found in
        ``INPUT_DATA_FOLDER_PATH``.
    """
    suffix = TABLE_DATA_FILE_SUFFIX if is_table_files else COLUMNS_DETAILS_FILE_SUFFIX
    # Match on the end of the name: a substring test would also pick up
    # unrelated files such as "x_table_details.txt.bak".
    # os.listdir() returns entries in arbitrary order, so sort the result to
    # keep the processing order reproducible.
    return sorted(
        name for name in os.listdir(INPUT_DATA_FOLDER_PATH) if name.endswith(suffix)
    )
async def _get_table_data():
    """Read every table-level input file and build the records to upsert.

    Each file is split on blank lines (a double newline). The first section
    is a header whose text after the first ``": "`` is used as the table
    name; every following non-blank section becomes one record. A file is
    moved to ``PROCESSED_DATA_FOLDER_PATH`` once its records are collected.

    A file whose header has no ``": "`` separator (for example an empty
    file) is logged and left in the input folder instead of aborting the
    whole run with an ``IndexError``.

    Returns:
        A list of ``{"text": str, "metadata": dict}`` items. The metadata
        holds ``table_name`` and ``column_name``; the latter is always
        ``None`` for table-level records.
    """
    files = await _list_files(is_table_files=True)
    data = []
    for file in files:
        source_path = os.path.join(INPUT_DATA_FOLDER_PATH, file)
        async with aiofiles.open(source_path, "r") as f:
            file_content = await f.read()

        header, *sections = file_content.split("\n\n")
        header_parts = header.split(": ")
        if len(header_parts) < 2:
            logger.warning(f"Skipping {file}: header has no ': ' separator.")
            continue
        table_name = header_parts[1]

        for text in sections:
            # Extra blank lines in the file yield empty sections; they carry
            # nothing worth storing.
            if not text.strip():
                continue
            # Give every record its own metadata dict so that changing one
            # record later cannot silently change the others.
            data.append(
                {
                    "text": text,
                    "metadata": {"table_name": table_name, "column_name": None},
                }
            )

        # Move only after the file handle has been closed; moving a file
        # that is still open fails on some platforms.
        shutil.move(source_path, os.path.join(PROCESSED_DATA_FOLDER_PATH, file))
    return data
async def _get_column_data():
    """Read every column-level input file and build the records to upsert.

    Each file is split on blank lines (a double newline). The first section
    is a header whose text after the first ``": "`` is used as the table
    name. Every following non-blank section describes one column: the text
    after the first ``": "`` on its first line is used as the column name.
    A file is moved to ``PROCESSED_DATA_FOLDER_PATH`` once its records are
    collected.

    A file with a header or column section that lacks the ``": "``
    separator is logged and left in the input folder with none of its
    records returned, instead of aborting the whole run with an
    ``IndexError``. Blank sections (for example from a trailing blank line)
    are ignored.

    Returns:
        A list of ``{"text": str, "metadata": dict}`` items. The metadata
        holds ``table_name`` and ``column_name``.
    """
    files = await _list_files(is_table_files=False)
    data = []
    for file in files:
        source_path = os.path.join(INPUT_DATA_FOLDER_PATH, file)
        async with aiofiles.open(source_path, "r") as f:
            file_content = await f.read()

        header, *sections = file_content.split("\n\n")
        header_parts = header.split(": ")
        if len(header_parts) < 2:
            logger.warning(f"Skipping {file}: header has no ': ' separator.")
            continue
        table_name = header_parts[1]

        # Collect per file first so a malformed file contributes nothing.
        file_records = []
        is_well_formed = True
        for section in sections:
            text = section.lstrip("\n")
            if not text.strip():
                continue
            column_parts = text.split("\n")[0].split(": ")
            if len(column_parts) < 2:
                logger.warning(
                    f"Skipping {file}: a column section has no ': ' separator "
                    "on its first line."
                )
                is_well_formed = False
                break
            file_records.append(
                {
                    "text": text,
                    "metadata": {
                        "table_name": table_name,
                        "column_name": column_parts[1],
                    },
                }
            )
        if not is_well_formed:
            continue

        data.extend(file_records)
        # Move only after the file handle has been closed; moving a file
        # that is still open fails on some platforms.
        shutil.move(source_path, os.path.join(PROCESSED_DATA_FOLDER_PATH, file))
    return data
async def _upsert_data(data):
    """Upsert parsed records through ``PineconeClient``.

    Args:
        data: Items shaped like ``{"text": ..., "metadata": ...}``. When it
            is empty the function returns without creating a client.
    """
    if not data:
        # Nothing to send: skip opening a client and issuing an empty upsert.
        return
    texts = [item["text"] for item in data]
    metadatas = [item["metadata"] for item in data]
    async with PineconeClient() as pinecone_client:
        await pinecone_client.upsert(texts=texts, metadatas=metadatas)
async def seed_vector_data():
    """Seed the vector store: table-level records first, then column-level.

    Logs and returns early when the input folder holds nothing to process.
    """
    logger.info("Seeding vector data...")
    has_input = await _is_input_data_exist()
    if has_input:
        stages = (
            (_get_table_data, "Table data seeded successfully..."),
            (_get_column_data, "Column data seeded successfully..."),
        )
        # Each stage loads its records, upserts them, then reports success.
        for load_records, done_message in stages:
            records = await load_records()
            await _upsert_data(records)
            logger.info(done_message)
        logger.info("Vector data seeded successfully...")
    else:
        logger.info("No data to seed vector data.")
async def main():
    """Script entry coroutine: run the vector-data seeding once."""
    await seed_vector_data()
# Run the seeding job only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
|