# import os
# import glob
import asyncio
from ast import literal_eval
# import streamlit as st
import re
# def get_filename():
# # Return the current filename
# return os.path.realpath('__').split('/')[-1]
# def set_page_meta(path):
# # Set title
# title = path[path.rfind('/')+1:path.rfind('.py')].title().replace('_','')
# # Remove emoji from the title
# title = re.sub(r'[^a-zA-Z]','',title)
# # Generate the Page properties
# return st.Page(path,title=title)
# # Filter out unavailable framework (i.e., smolagents or llamaindex)
# def filter_framework():
# framework_filter = []
# try:
# import smolagents
# except:
# framework_filter.append('smolagents')
# try:
# import llama_index
# except:
# framework_filter.append('llamaindex')
# finally:
# try:
# import llama_index
# except:
# framework_filter.append('llamaindex')
# return framework_filter
# def get_paths(root:str = 'pages'):
# # Get the path structure (dir/file)
# paths = glob.glob(f'{root}/**/*.py',recursive=True)
# # Available framework only
# framework_filter = filter_framework()
# paths = [path for path in paths for framework in framework_filter if path.find(framework)==-1]
# # Prepare key for each navigation
# keys = [path[path.rfind('/')+1:path.rfind('.py')] if '/' not in path[path.find('/')+1:] else path[path.find('/')+1:path.rfind('/')] for path in paths]
# # Replace emoji in the Category
# keys = [re.sub(r'[^a-zA-Z]','',key) for key in keys]
# # Initialize dictionary
# page_dict = {key:[] for key in keys}
# # Populate the dictionary
# for key, path in zip(keys, paths):
# page_dict.get(key).append(set_page_meta(path))
# # Convert key to title()
# pages = {key.title():value for key, value in page_dict.items()}
# return pages
# # Validate if the tool/agent (namely, work) has been created
# def validate_agent(framework:str,work_type=None,task_type=None) -> bool:
# status = False
# if work_type and task_type:
# if 'agent' in st.session_state and \
# st.session_state.agent['framework'] == framework and \
# st.session_state.agent['work_type'] == work_type and \
# st.session_state.agent['task_type'] == task_type and \
# st.session_state.agent['agent']:
# status = True
# # task_type (tools) not specified
# elif work_type:
# if 'agent' in st.session_state and \
# st.session_state.agent['framework'] == framework and \
# st.session_state.agent['work_type'] == work_type and \
# st.session_state.agent['agent']:
# status = True
# else:
# if 'agent' in st.session_state and \
# st.session_state.agent['framework'] == framework and \
# st.session_state.agent['agent']:
# status = True
# return status
# Safely convert a string holding a Python literal into the value it represents
def check_value(input):
    """Evaluate a Python literal and return the resulting object.

    The argument is handed straight to ``ast.literal_eval``, which only
    accepts literal syntax (numbers, strings, tuples, lists, dicts, sets,
    booleans, ``None``) rather than arbitrary expressions.

    Args:
        input: Text containing a Python literal, e.g. ``"[1, 2]"``.

    Returns:
        The Python object described by ``input``.

    Raises:
        ValueError, SyntaxError: Propagated from ``literal_eval`` when the
            text is not a valid literal (other error types are possible for
            pathological input).
    """
    parsed = literal_eval(input)
    return parsed
# Silence the torch error
def init_async():
    """Reset ``torch.classes.__path__`` to an empty list.

    torch is imported inside the function, so the dependency is only needed
    when this helper is actually called. The assignment is a workaround
    meant to silence a torch-related error (per the original note); the
    exact error is not visible from this file.

    Returns:
        None
    """
    import torch

    torch_classes = torch.classes
    # Manually set the path list to empty.
    torch_classes.__path__ = []
def run_async_task(async_func, *args):
    """
    Run an asynchronous function to completion in a new event loop.

    The call is attempted once. If it fails with an ordinary exception, it is
    attempted one more time in another fresh loop. The retry invokes
    ``async_func(*args)`` again from the start, so any side effects that
    happened before the failure will happen a second time.

    Args:
        async_func (callable): Coroutine function to execute; it is called
            as ``async_func(*args)`` to obtain the coroutine.
        *args: Positional arguments to pass to the asynchronous function.

    Returns:
        None. The coroutine's result is discarded.

    Raises:
        Exception: Whatever the second attempt raises if it fails as well.
        BaseException: ``KeyboardInterrupt``, ``SystemExit`` and other
            non-``Exception`` errors propagate immediately, without a retry.
    """

    def _run_once():
        # Each attempt owns its event loop and closes it no matter what.
        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(async_func(*args))
        finally:
            loop.close()

    try:
        _run_once()
    except Exception:
        # Retry ordinary failures only: a bare ``except`` would also swallow
        # Ctrl-C / interpreter shutdown and re-run the task instead of
        # stopping.
        _run_once()