final_assessment / utils.py
staedi's picture
Create utils.py
dcac90b verified
# import os
# import glob
import asyncio
from ast import literal_eval
# import streamlit as st
import re
# def get_filename():
# # Return the current filename
# return os.path.realpath('__').split('/')[-1]
# def set_page_meta(path):
# # Set title
# title = path[path.rfind('/')+1:path.rfind('.py')].title().replace('_','')
# # Remove emoji from the title
# title = re.sub(r'[^a-zA-Z]','',title)
# # Generate the Page properties
# return st.Page(path,title=title)
# # Filter out unavailable framework (i.e., smolagents or llamaindex)
# def filter_framework():
# framework_filter = []
# try:
# import smolagents
# except:
# framework_filter.append('smolagents')
# try:
# import llama_index
# except:
# framework_filter.append('llamaindex')
# finally:
# try:
# import llama_index
# except:
# framework_filter.append('llamaindex')
# return framework_filter
# def get_paths(root:str = 'pages'):
# # Get the path structure (dir/file)
# paths = glob.glob(f'{root}/**/*.py',recursive=True)
# # Available framework only
# framework_filter = filter_framework()
# paths = [path for path in paths for framework in framework_filter if path.find(framework)==-1]
# # Prepare key for each navigation
# keys = [path[path.rfind('/')+1:path.rfind('.py')] if '/' not in path[path.find('/')+1:] else path[path.find('/')+1:path.rfind('/')] for path in paths]
# # Replace emoji in the Category
# keys = [re.sub(r'[^a-zA-Z]','',key) for key in keys]
# # Initialize dictionary
# page_dict = {key:[] for key in keys}
# # Populate the dictionary
# for key, path in zip(keys, paths):
# page_dict.get(key).append(set_page_meta(path))
# # Convert key to title()
# pages = {key.title():value for key, value in page_dict.items()}
# return pages
# # Validate if the tool/agent (namely, work) has been created
# def validate_agent(framework:str,work_type=None,task_type=None) -> bool:
# status = False
# if work_type and task_type:
# if 'agent' in st.session_state and \
# st.session_state.agent['framework'] == framework and \
# st.session_state.agent['work_type'] == work_type and \
# st.session_state.agent['task_type'] == task_type and \
# st.session_state.agent['agent']:
# status = True
# # task_type (tools) not specified
# elif work_type:
# if 'agent' in st.session_state and \
# st.session_state.agent['framework'] == framework and \
# st.session_state.agent['work_type'] == work_type and \
# st.session_state.agent['agent']:
# status = True
# else:
# if 'agent' in st.session_state and \
# st.session_state.agent['framework'] == framework and \
# st.session_state.agent['agent']:
# status = True
# return status
# Safely evaluate a string containing a Python literal and return its value
def check_value(input):
    """
    Evaluate a Python literal expression and return the resulting object.

    Args:
        input: The value handed straight to ``ast.literal_eval``.

    Returns:
        The object that ``ast.literal_eval`` builds from ``input``.

    Raises:
        Any exception raised by ``ast.literal_eval`` propagates unchanged.
    """
    parsed = literal_eval(input)
    return parsed
# Silence the torch error
def init_async():
    """
    Replace ``torch.classes.__path__`` with an empty list.

    torch is imported inside the function, so the import only happens
    when this helper is actually called.

    Returns:
        None
    """
    from torch import classes as torch_classes

    # Manually set the attribute to an empty list.
    torch_classes.__path__ = []
def run_async_task(async_func, *args):
    """
    Run an asynchronous function to completion in a new event loop.

    The call is attempted once. If that attempt raises an ``Exception``,
    the loop is closed and ``async_func`` is invoked one more time on a
    fresh loop. Whichever loop is current at exit is always closed.

    Args:
        async_func (callable): Coroutine function; called as
            ``async_func(*args)`` to obtain the coroutine to run.
        *args: Positional arguments forwarded to ``async_func``.

    Returns:
        The value returned by the coroutine.

    Raises:
        Exception: Whatever the retry raises if the second attempt also
            fails.
        BaseException: Non-``Exception`` errors such as
            ``KeyboardInterrupt`` or ``SystemExit`` propagate immediately
            and are never retried.
    """
    loop = None
    try:
        loop = asyncio.new_event_loop()
        return loop.run_until_complete(async_func(*args))
    except Exception:
        # Only ordinary errors trigger the retry; a bare ``except`` would
        # also swallow KeyboardInterrupt/SystemExit and re-run the task.
        if loop is not None:
            loop.close()
        # Create a new loop for the single retry.
        loop = asyncio.new_event_loop()
        return loop.run_until_complete(async_func(*args))
    finally:
        # Runs after either return path and on error; close() is safe to
        # call on a loop that has already been closed.
        if loop is not None:
            loop.close()