app / src /knowledge_bases /pipeline.py
lemdaddy's picture
Add new coingecko data endpoints
2f3a7c5
raw
history blame
2.39 kB
from chainlit import make_async
from src.data_sources.cryptocompare import CryptoCompare
from src.data_sources.coin_gecko import CoinGecko
from src.knowledge_bases.json import json_knowledge_base
coin_gecko = CoinGecko()
crypto_compare = CryptoCompare()
store_json_data_in_s3 = make_async(json_knowledge_base.store_json_data_in_s3)
async def fetch_and_load_crypto_compare_json_data():
get_all_coins = make_async(crypto_compare.get_all_coins)
get_news_categories = make_async(crypto_compare.get_news_categories)
get_top_tier_exchanges_list = make_async(crypto_compare.get_top_tier_exchanges_list)
get_all_wallets_general_info = make_async(crypto_compare.get_all_wallets_general_info)
get_all_exchanges_general_info = make_async(crypto_compare.get_all_exchanges_general_info)
all_coins = await get_all_coins()
news_categories = await get_news_categories()
top_tier_exchanges_list = await get_top_tier_exchanges_list()
all_wallets_general_info = await get_all_wallets_general_info()
all_exchanges_general_info = await get_all_exchanges_general_info()
await store_json_data_in_s3(all_coins, "crypto_compare_all_coins.json")
await store_json_data_in_s3(news_categories, "crypto_compare_news_categories.json")
await store_json_data_in_s3(top_tier_exchanges_list, "crypto_compare_top_tier_exchanges_list.json")
await store_json_data_in_s3(all_wallets_general_info, "crypto_compare_all_wallets_general_info.json")
await store_json_data_in_s3(all_exchanges_general_info, "crypto_compare_all_exchanges_general_info.json")
async def fetch_and_load_coin_gecko_json_data():
get_all_coins = make_async(coin_gecko.get_exchanges_list)
get_asset_platforms_list = make_async(coin_gecko.get_asset_platforms_list)
all_exchange_data = await get_all_coins()
asset_platforms_list = await get_asset_platforms_list()
await store_json_data_in_s3(all_exchange_data, "coin_gecko_all_exchange_data")
await store_json_data_in_s3(asset_platforms_list, "coin_gecko_asset_platforms_list")
async def load_json_data_into_knowledge_base():
load_knowledge_base = make_async(json_knowledge_base.load_knowledge_base)
await load_knowledge_base(recreate=True)
async def main():
await fetch_and_load_coin_gecko_json_data()
await fetch_and_load_crypto_compare_json_data()
await load_json_data_into_knowledge_base()