Spaces:
Sleeping
Sleeping
| import asyncio | |
| import os | |
| import aiofiles | |
| import shutil | |
| from src.utils import PineconeClient | |
| from loguru import logger | |
| from dotenv import load_dotenv | |
load_dotenv()

# All paths below are relative to the current working directory.
BASE_FOLDER_PATH = "data"
# Raw files waiting to be seeded; the loaders move each file out of here once read.
INPUT_DATA_FOLDER_PATH = os.path.join(BASE_FOLDER_PATH, "rag_input_data")
# Destination for input files that have already been read.
PROCESSED_DATA_FOLDER_PATH = os.path.join(BASE_FOLDER_PATH, "rag_processed_data")
# File-name suffixes that identify the two kinds of input file.
TABLE_DATA_FILE_SUFFIX = "_table_details.txt"
COLUMNS_DETAILS_FILE_SUFFIX = "_columns_details.txt"

# exist_ok=True avoids the race between an exists() check and makedirs().
os.makedirs(PROCESSED_DATA_FOLDER_PATH, exist_ok=True)
async def _is_input_data_exist():
    """Report whether the input folder holds any file this script can seed.

    Returns:
        True if ``INPUT_DATA_FOLDER_PATH`` contains at least one entry whose
        name ends with ``TABLE_DATA_FILE_SUFFIX`` or
        ``COLUMNS_DETAILS_FILE_SUFFIX``; False if it contains none or the
        folder does not exist.
    """
    try:
        names = os.listdir(INPUT_DATA_FOLDER_PATH)
    except FileNotFoundError:
        # A missing input folder simply means there is nothing to seed.
        return False
    suffixes = (TABLE_DATA_FILE_SUFFIX, COLUMNS_DETAILS_FILE_SUFFIX)
    # Ignore unrelated entries (e.g. ".gitkeep") so they do not trigger a
    # seeding run that would find nothing to upsert.
    return any(name.endswith(suffixes) for name in names)
async def _list_files(is_table_files=True):
    """Return the names of input files of one kind.

    Args:
        is_table_files: When true, select files ending with
            ``TABLE_DATA_FILE_SUFFIX``; otherwise select files ending with
            ``COLUMNS_DETAILS_FILE_SUFFIX``.

    Returns:
        Sorted list of bare file names (not paths) found directly inside
        ``INPUT_DATA_FOLDER_PATH``.
    """
    suffix = TABLE_DATA_FILE_SUFFIX if is_table_files else COLUMNS_DETAILS_FILE_SUFFIX
    # endswith (rather than a substring test) so names such as
    # "x_table_details.txt.bak" are not picked up; sorted because
    # os.listdir() order is arbitrary and processing should be deterministic.
    return sorted(
        name for name in os.listdir(INPUT_DATA_FOLDER_PATH) if name.endswith(suffix)
    )
async def _get_table_data():
    """Read every table-details input file and turn it into upsert records.

    Each file is split on double newlines. The first chunk is a header whose
    text after the first ": " is taken as the table name; every remaining
    non-blank chunk becomes one record. Once a file has been parsed it is
    moved from ``INPUT_DATA_FOLDER_PATH`` to ``PROCESSED_DATA_FOLDER_PATH``.
    Files whose header contains no ": " are skipped with a warning and left
    in the input folder.

    Returns:
        List of ``{"text": str, "metadata": {"table_name": str,
        "column_name": None}}`` dicts, one per chunk.
    """
    files = await _list_files(is_table_files=True)
    data = []
    for file in files:
        src_path = os.path.join(INPUT_DATA_FOLDER_PATH, file)
        # Explicit encoding instead of the platform default.
        # NOTE(review): assumes input files are UTF-8 — confirm.
        async with aiofiles.open(src_path, "r", encoding="utf-8") as f:
            file_content = await f.read()

        texts = file_content.split("\n\n")
        # partition keeps any further ": " inside the name intact.
        _, sep, table_name = texts[0].partition(": ")
        if not sep:
            logger.warning(f"Skipping {file}: header has no ': ' separator: {texts[0]!r}")
            continue
        table_name = table_name.strip()

        for text in texts[1:]:
            if not text.strip():
                # Trailing or repeated blank lines yield empty chunks.
                continue
            # A fresh metadata dict per record, so an in-place mutation of one
            # record's metadata downstream cannot leak into its siblings.
            data.append(
                {"text": text, "metadata": {"table_name": table_name, "column_name": None}}
            )

        # Move only after the file handle is closed; renaming an open file
        # can fail on Windows.
        shutil.move(src_path, os.path.join(PROCESSED_DATA_FOLDER_PATH, file))
    return data
async def _get_column_data():
    """Read every column-details input file and turn it into upsert records.

    Each file is split on double newlines. The first chunk is a header whose
    text after the first ": " is the table name. Every remaining non-blank
    chunk describes one column: the text after the first ": " on its first
    line is the column name, and the whole chunk (leading newlines removed)
    is the record text. Once a file has been parsed it is moved from
    ``INPUT_DATA_FOLDER_PATH`` to ``PROCESSED_DATA_FOLDER_PATH``.

    Files whose header contains no ": " are skipped with a warning and left
    in the input folder. Column chunks whose first line contains no ": " are
    skipped with a warning.

    Returns:
        List of ``{"text": str, "metadata": {"table_name": str,
        "column_name": str}}`` dicts, one per column chunk.
    """
    files = await _list_files(is_table_files=False)
    data = []
    for file in files:
        src_path = os.path.join(INPUT_DATA_FOLDER_PATH, file)
        # Explicit encoding instead of the platform default.
        # NOTE(review): assumes input files are UTF-8 — confirm.
        async with aiofiles.open(src_path, "r", encoding="utf-8") as f:
            file_content = await f.read()

        texts = file_content.split("\n\n")
        # partition keeps any further ": " inside the name intact.
        _, sep, table_name = texts[0].partition(": ")
        if not sep:
            logger.warning(f"Skipping {file}: header has no ': ' separator: {texts[0]!r}")
            continue
        table_name = table_name.strip()

        for text in texts[1:]:
            # Runs of three or more newlines leave leading "\n"s on a chunk.
            text = text.lstrip("\n")
            if not text.strip():
                # Trailing or repeated blank lines yield empty chunks, which
                # would otherwise crash the column-name parse below.
                continue
            first_line = text.split("\n", 1)[0]
            _, sep, column_name = first_line.partition(": ")
            if not sep:
                logger.warning(
                    f"Skipping chunk in {file}: first line has no ': ' separator: "
                    f"{first_line!r}"
                )
                continue
            metadata = {
                "table_name": table_name,
                "column_name": column_name.strip(),
            }
            data.append({"text": text, "metadata": metadata})

        # Move only after the file handle is closed; renaming an open file
        # can fail on Windows.
        shutil.move(src_path, os.path.join(PROCESSED_DATA_FOLDER_PATH, file))
    return data
async def _upsert_data(data):
    """Upsert records through the project's ``PineconeClient``.

    Args:
        data: Iterable of ``{"text": ..., "metadata": ...}`` dicts, as built
            by ``_get_table_data`` / ``_get_column_data``.

    When ``data`` is empty, returns immediately without opening a client.
    """
    # Materialize once so a generator is not exhausted by the first pass.
    records = list(data)
    if not records:
        return
    texts = [record["text"] for record in records]
    metadatas = [record["metadata"] for record in records]
    async with PineconeClient() as pinecone_client:
        await pinecone_client.upsert(texts=texts, metadatas=metadatas)
async def seed_vector_data():
    """Seed the vector store from the files in the input data folder.

    Logs and returns early when ``_is_input_data_exist`` reports no input.
    Otherwise loads and upserts the table records first, then the column
    records, logging after each stage and once more at the end.
    """
    logger.info("Seeding vector data...")
    has_input = await _is_input_data_exist()
    if not has_input:
        logger.info("No data to seed vector data.")
        return

    # NOTE(review): the loaders move each input file into the processed folder
    # while reading it, i.e. before the upsert below runs. If an upsert raises,
    # those files are no longer in the input folder for the next run.
    stages = (
        (_get_table_data, "Table data seeded successfully..."),
        (_get_column_data, "Column data seeded successfully..."),
    )
    for load_stage, done_message in stages:
        records = await load_stage()
        await _upsert_data(records)
        logger.info(done_message)

    logger.info("Vector data seeded successfully...")
async def main():
    """Script entry point: run the vector-data seeding pipeline once."""
    return await seed_vector_data()


if __name__ == "__main__":
    # When executed as a script, drive the async pipeline on a fresh event loop.
    asyncio.run(main())