Spaces:
Build error
Build error
| import gradio as gr | |
| import ssl | |
| from openai import OpenAI | |
| import time | |
| import os | |
| import shutil | |
| from datetime import datetime | |
| import Arcana | |
| from nylon import * | |
| import pandas as pd | |
| import json | |
| import fiber | |
| import cite_source | |
# Defaults used when settings.arcana is missing, unreadable or malformed.
foldername = 'Celsiaaa'
dbmsmode = 'Fiber'
try:
    with open('settings.arcana', mode='r') as file:
        # splitlines() tolerates a trailing newline and Windows line endings;
        # blank lines are dropped so only real values are counted.
        _settings_lines = [
            line.strip() for line in file.read().splitlines() if line.strip()
        ]
    if len(_settings_lines) < 2:
        raise ValueError('settings.arcana must hold a folder name and a DBMS mode')
    # First line is the database folder name, second line the DBMS mode.
    foldername, dbmsmode = _settings_lines[:2]
except (OSError, ValueError) as e:
    print(e)
    # Recreate the settings file from the defaults above.
    newsettings = foldername + '\n' + dbmsmode
    try:
        with open('settings.arcana', mode='w') as file:
            file.write(newsettings)
    except OSError as write_error:
        # An unwritable working directory should not stop the app from starting.
        print(write_error)
# SSL configuration to avoid verification issues.
# NOTE(review): this makes the ssl module's default HTTPS context an
# unverified one for the whole process.
if hasattr(ssl, '_create_unverified_context'):
    _create_unverified_https_context = ssl._create_unverified_context
    ssl._create_default_https_context = _create_unverified_https_context
def query_database2(query):
    """Search the configured database and return the hits as one string.

    The backend is chosen by the module-level ``dbmsmode``; the data file is
    ``foldername + '.txt'``.

    :param query: The text to search for.
    :return: The hits joined with ';'. For an unrecognised ``dbmsmode`` an
        error string is returned instead of ``None``.
    """
    print(dbmsmode)
    if dbmsmode == 'Nylon':
        db = ChatDatabase(foldername + '.txt')
        # Arguments: sender, query, number of results, cache, query tag.
        relevant_messages = db.get_relevant_messages('Arcana', query, 10, {}, None)
        print("Relevant messages:")
        for message in relevant_messages:
            print(f"Sender: {message[0]}, Time: {message[1]}, Tag: {message[3]}")
            print(f"Message: {message[2][:100]}...")
            print()
        return ';'.join(str(message) for message in relevant_messages)
    if dbmsmode == 'Fiber':
        dbms = fiber.FiberDBMS()
        # Load or create the database
        dbms.load_or_create(foldername + '.txt')
        results = dbms.query(query, 3)
        # Render each result dictionary as text and join with a separator
        return ';'.join(
            f"Name: {result['name']}\nContent: {result['content']}\nTags: {result['tags']}\nIndex: {result['index']}"
            for result in results
        )
    # Without this the caller would receive None for an unknown mode.
    return f"Error: unknown database mode {dbmsmode!r}"
def cite(style=None,author=None,title=None,publisher=None,year=None,url=None,date_accessed=None):
    """Forward the citation fields to ``cite_source.generate_citation``.

    ``date_accessed`` is passed on under the keyword ``access_date``; every
    other argument keeps its name.
    """
    citation_fields = {
        'style': style,
        'author': author,
        'title': title,
        'publisher': publisher,
        'year': year,
        'url': url,
        'access_date': date_accessed,
    }
    return cite_source.generate_citation(**citation_fields)
def list_files_indb(directory=None, query=None):
    """
    List all files in the given directory, separated by semicolons.

    :param directory: The directory to list files from. Defaults to the
        current value of the module-level ``foldername``, looked up at call
        time so a later database rename is honoured.
    :param query: Ignored. Accepted so a caller that passes a ``query``
        keyword does not fail.
    :return: A string of filenames (sorted) separated by semicolons, or an
        error message string if the directory cannot be listed.
    """
    if directory is None:
        directory = foldername
    try:
        # Keep regular files only; sub-directories are skipped
        files = sorted(
            f for f in os.listdir(directory)
            if os.path.isfile(os.path.join(directory, f))
        )
        # Join the filenames with semicolons
        return ';'.join(files)
    except (OSError, TypeError, ValueError) as e:
        return f"An error occurred: {str(e)}"
search_mode = 0  # 0 = always search, 1 = automatic (see handle_search_mode)
# OpenAI client setup
# The API key is read from the OPENAI_API_KEY environment variable instead of
# being committed to the source. The endpoint can be overridden with
# OPENAI_BASE_URL and otherwise keeps the proxy URL used so far.
# NOTE(review): the key that used to be hard-coded here has been published
# and should be revoked.
client = OpenAI(
    base_url=os.environ.get('OPENAI_BASE_URL', 'https://api.openai-proxy.org/v1'),
    api_key=os.environ.get('OPENAI_API_KEY'),
)
# Function list for OpenAI API
# Each entry describes one callable; the model's arguments are passed as
# keyword arguments to the matching handler, so the declared properties must
# match the handler's parameters.
function_list = [
    {
        "name": "search_database",
        "description": "Query the database and return a list of results as strings",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "The query to execute against the database"
                },
            },
            "required": ["query"]
        }
    },
    {
        "name": "list_database_files",
        "description": "Check what files are present in the database. Gives a list of semicolon separated file names in the database",
        # The handler needs no arguments, so none are advertised; a declared
        # "query" property would be forwarded as an unexpected keyword.
        "parameters": {
            "type": "object",
            "properties": {},
        }
    }
]
# Lookup table from the function names advertised to the model to the Python
# callables that implement them.
function_map = dict(
    search_database=query_database2,
    list_database_files=list_files_indb,
)
def execute_function(function_name, function_args):
    """Call the handler registered under ``function_name``.

    :param function_name: Key to look up in ``function_map``.
    :param function_args: Mapping expanded into keyword arguments.
    :return: The handler's return value, or an error string when no handler
        is registered under that name.
    """
    if function_name not in function_map:
        return f"Error: Function {function_name} not found"
    handler = function_map[function_name]
    return handler(**function_args)
# Search-mode labels in list form.
mapsearchmode = 'always auto none'.split()
def openai_api_call(messages, retries=3, delay=5):
    """Send the conversation to the chat model and resolve its function calls.

    When ``search_mode`` is 0 the trailing user message is prefixed with a
    search instruction. The prefix is applied once, up front, so retries and
    follow-up requests never add it again or touch a function-result message.

    Each request is attempted up to ``retries`` times with ``delay`` seconds
    between attempts. If the model asks for a function, it is executed, both
    the request and the result are appended to ``messages`` (the list is
    modified in place), and the model is queried again. The number of such
    rounds is capped so a model that keeps requesting functions cannot loop
    forever.

    :param messages: Chat messages as a list of dicts.
    :param retries: Attempts per request.
    :param delay: Seconds to wait between failed attempts.
    :return: The assistant's reply text, or an apology/failure string.
    """
    if retries <= 0:
        return "Failed to get a response after multiple attempts."

    search_prefix = "[System: SEARCH when the user ASKED A QUESTION & remember to CITE(the source is the first tag). Otherwise do not search. The User's question:];"
    if search_mode == 0 and messages and messages[-1].get('role') == 'user':
        messages[-1]['content'] = search_prefix + messages[-1]['content']

    max_function_rounds = 5
    for _ in range(max_function_rounds):
        response_message = None
        function_name = None
        function_payload = None
        for attempt in range(retries):
            try:
                completion = client.chat.completions.create(
                    model="gpt-4o",
                    messages=messages,
                    functions=function_list,
                    function_call='auto',
                    timeout=10
                )
                candidate = completion.choices[0].message
                # Check if the model wants to call a function
                if candidate.function_call:
                    function_name = candidate.function_call.name
                    function_args = json.loads(candidate.function_call.arguments or '{}')
                    # Serialise before touching ``messages`` so a failure
                    # here cannot leave a half-appended exchange behind.
                    function_payload = json.dumps(execute_function(function_name, function_args))
                response_message = candidate
                break
            except Exception as e:
                print(f"Attempt {attempt + 1} failed: {e}")
                if attempt < retries - 1:
                    time.sleep(delay)
        if response_message is None:
            return "Sorry, I am having trouble connecting to the server. Please try again later."
        if not response_message.function_call:
            return response_message.content
        # Add the model's request and the function response to the conversation
        messages.append(response_message.model_dump())
        messages.append({
            "role": "function",
            "name": function_name,
            "content": function_payload
        })
    return "Failed to get a response after multiple attempts."
def handle_search_mode(mode):
    """Set the global ``search_mode`` from the selected label.

    "Always" stores 0, "Automatic" stores 1; any other value stores 0.

    :param mode: The label picked in the UI.
    :return: A status string for display.
    """
    global search_mode
    print(mode)
    known_modes = (
        ("Always", 0, "You are in Mode 1"),
        ("Automatic", 1, "You are in Mode 2"),
    )
    for label, value, reply in known_modes:
        if mode == label:
            search_mode = value
            return reply
    # Anything else falls back to mode 0.
    search_mode = 0
    return "Select a mode"
def handle_dbms_mode(mode):
    """Switch the database backend and persist the choice.

    Only "Nylon" and "Fiber" are accepted. Any other value (including
    ``None``) leaves both ``dbmsmode`` and ``settings.arcana`` untouched, so
    an invalid mode is never written to disk.

    :param mode: The label picked in the UI.
    :return: A status string.
    """
    global dbmsmode
    print(mode)
    replies = {"Nylon": "You are in Mode 1", "Fiber": "You are in Mode 2"}
    if not isinstance(mode, str) or mode not in replies:
        return "Select a mode"
    # Persist first; if the write fails the in-memory mode stays unchanged.
    with open('settings.arcana', mode='w') as file:
        file.write(foldername + '\n' + mode)
    dbmsmode = mode
    return replies[mode]
# Chatbot response function
def chatbot_response(message, history):
    """Build the message list for one chat turn and return the model's reply.

    :param message: The user's new message.
    :param history: Earlier turns, each unpacked as (user text, assistant text).
    :return: Whatever ``openai_api_call`` returns for the assembled messages.
    """
    system_prompt = '''Your name is Arcana. You are a chatbot made by Indexademics. Your goal is to solve questions. You can query the database at anytime when the user asks a question specific to their tasks and you think is unlikely to be searchable online.
If they ask anything about academics, search your database first, then use your own knowledge. Cite the file name when you use the dbms.'''
    messages = [{"role": "system", "content": system_prompt}]
    for turn in history:
        human, assistant = turn
        messages.extend((
            {"role": "user", "content": human},
            {"role": "assistant", "content": assistant},
        ))
    messages.append({"role": "user", "content": message})
    return openai_api_call(messages)
# Name of the file currently selected in the upload table. Written by
# on_select and rename_file, read by delete_file and rename_file; None until
# a selection is made.
selected = None
| from concurrent.futures import ThreadPoolExecutor | |
# Function to handle the file upload
def handle_file_upload(file):
    """Move an uploaded file into the database folder.

    :param file: The upload: either a path string or an object whose
        ``name`` attribute is the path. ``None`` means nothing was uploaded.
    :return: A status string with the new location and size, or
        "No file uploaded" when ``file`` is ``None``.
    """
    if file is None:
        return "No file uploaded"
    # Accept both plain path strings and objects carrying the path in .name
    file_path = getattr(file, 'name', file)
    # Ensure the database folder exists
    cache_dir = foldername
    os.makedirs(cache_dir, exist_ok=True)
    # Keep only the file name so the file lands directly in the folder
    new_file_path = os.path.join(cache_dir, os.path.basename(file_path))
    # Move the file into the database folder
    shutil.move(file_path, new_file_path)
    # Get the file size
    file_size = os.path.getsize(new_file_path)
    return f"File saved to {new_file_path} with size: {file_size} bytes"
# Wrapper function to run the file upload in a thread
def handle_file_upload_threaded(file):
    """Run ``handle_file_upload`` on a worker thread and wait for its result.

    :param file: Passed through to ``handle_file_upload``.
    :return: The status string produced by ``handle_file_upload``.
    """
    with ThreadPoolExecutor() as pool:
        return pool.submit(handle_file_upload, file).result()
def list_uploaded_files():
    """Return the entries of the database folder as single-column rows.

    :return: One ``[name]`` list per directory entry, or an empty list when
        the folder does not exist.
    """
    if os.path.exists(foldername):
        return [[entry] for entry in os.listdir(foldername)]
    return []
def on_select(evt: gr.SelectData):
    """Remember the selected table cell and describe the file it names.

    Stores the cell value in the global ``selected``.

    :param evt: Selection event; its ``value`` and ``index`` are read.
    :return: ``(file path, status text, size text, creation-time text)``;
        the path is ``None`` and the size/time are empty when the value is
        empty.
    """
    global selected
    chosen = evt.value
    position = evt.index
    selected = chosen
    print(f"Selected value: {chosen} at index: {position}")
    if not chosen:
        return None, "No file selected", "", ""
    path = os.path.join(foldername, chosen)
    return path, f"Selected: {chosen}", get_file_size(path), get_file_creation_time(path)
def get_file_size(file_path):
    """Format a file's size for display.

    :param file_path: Path to the file; may be empty or ``None``.
    :return: The size as "N bytes", "N.NN KB" or "N.NN MB" (1024-based), or
        an empty string when the path is falsy or does not exist.
    """
    if not file_path or not os.path.exists(file_path):
        return ""
    size_bytes = os.path.getsize(file_path)
    if size_bytes >= 1024 * 1024:
        return f"{size_bytes / (1024 * 1024):.2f} MB"
    if size_bytes >= 1024:
        return f"{size_bytes / 1024:.2f} KB"
    return f"{size_bytes} bytes"
def get_file_creation_time(file_path):
    """Format the ``os.path.getctime`` timestamp of a file.

    :param file_path: Path to the file; may be empty or ``None``.
    :return: Local time as "YYYY-MM-DD HH:MM:SS", or an empty string when
        the path is falsy or does not exist.
    """
    if not (file_path and os.path.exists(file_path)):
        return ""
    stamp = datetime.fromtimestamp(os.path.getctime(file_path))
    return stamp.strftime("%Y-%m-%d %H:%M:%S")
def delete_file():
    """Delete the currently selected file from the database folder.

    :return: ``(file rows, None, status text, "", "")`` for the file table,
        file viewer, status, size and creation-time outputs.
    """
    if not selected:
        return list_uploaded_files(), None, "No file selected for deletion", "", ""
    target = os.path.join(foldername, selected)
    if not os.path.exists(target):
        return list_uploaded_files(), None, f"File {selected} not found", "", ""
    os.remove(target)
    # The listing is taken after the removal so the table reflects it.
    return list_uploaded_files(), None, f"File {selected} deleted successfully", "", ""
def refresh_files():
    """Return the current file rows; used as the Refresh button handler."""
    return list_uploaded_files()
def display_file(evt: gr.SelectData, df):
    """Resolve the selected cell to a path for the file and image viewers.

    :param evt: Selection event; its ``value`` is used as the file name.
    :param df: The table contents supplied by the event wiring (unused).
    :return: ``(file path, image path or None, status text)``; the image
        path is set only for .png/.jpg/.jpeg/.gif names (case-insensitive).
    """
    file_path = os.path.join(foldername, evt.value)
    image_suffixes = ('.png', '.jpg', '.jpeg', '.gif')
    if file_path.lower().endswith(image_suffixes):
        image_path = file_path
    else:
        image_path = None
    return file_path, image_path, f"Displaying: {evt.value}"
def render_to_database():
    """Hand the database folder name to ``Arcana.main``.

    Wired to the "Render All Files to Database" button.
    NOTE(review): what ``Arcana.main`` does with the folder is not visible
    here; presumably it builds the ``foldername + '.txt'`` file the query
    functions read. Confirm.
    """
    Arcana.main(foldername)
def change_theme(theme):
    """Store the chosen theme name on ``gr.Interface``.

    NOTE(review): this only sets a class attribute on ``gr.Interface``;
    whether the already-built Blocks app picks it up (even after Relaunch)
    is not established by this file. Confirm.
    """
    gr.Interface.theme = theme
def rename_file(new_name):
    """Rename the selected file, keeping its extension.

    The new name is reduced to its final path component so the file cannot
    be moved out of the database folder, and an existing file is never
    overwritten. On success the global ``selected`` is updated to the full
    new file name (including extension) so later rename/delete actions still
    find the file.

    :param new_name: New base name, without extension.
    :return: ``(file rows, status text, new path or None, size text,
        creation-time text)``.
    """
    global selected
    safe_name = os.path.basename(new_name.strip()) if new_name else ""
    if not (selected and safe_name):
        return list_uploaded_files(), "No file selected or new name not provided", None, "", ""
    old_path = os.path.join(foldername, selected)
    if not os.path.exists(old_path):
        return list_uploaded_files(), f"File {selected} not found", None, "", ""
    # splitext() yields '' for names without an extension, so no stray
    # suffix is invented for them.
    new_filename = safe_name + os.path.splitext(selected)[1]
    new_path = os.path.join(foldername, new_filename)
    if new_filename != selected and os.path.exists(new_path):
        return list_uploaded_files(), f"File {new_filename} already exists", None, "", ""
    os.rename(old_path, new_path)
    selected = new_filename
    return list_uploaded_files(), f"File renamed to {new_name}", new_path, get_file_size(new_path), get_file_creation_time(new_path)
def query_database(query):
    """Run a test query against the Nylon database and tabulate the hits.

    :param query: The text to search for.
    :return: A DataFrame with one "Nylon Returned Query" row per hit, each
        holding the hit converted to a string.
    """
    db = ChatDatabase(foldername + '.txt')
    # Arguments: sender, query, number of results, cache, query tag.
    hits = db.get_relevant_messages('Arcana', query, 10, {}, None)
    print("Relevant messages:")
    for hit in hits:
        print(f"Sender: {hit[0]}, Time: {hit[1]}, Tag: {hit[3]}")
        print(f"Message: {hit[2][:100]}...")
        print()
    rows = [{"Nylon Returned Query": str(hit)} for hit in hits]
    return pd.DataFrame(rows)
def query_database_fiber(query):
    """Run a test query against the Fiber database and tabulate the hits.

    :param query: The text to search for.
    :return: A DataFrame with the columns name, content, tags and index (in
        that order). An empty result yields an empty frame with those
        columns instead of raising ``KeyError``.
    """
    dbms = fiber.FiberDBMS()
    # Load or create the database
    dbms.load_or_create(foldername + '.txt')
    results = dbms.query(query, 10)
    # Passing the columns to the constructor selects and orders them in one
    # step and also works when there are no rows.
    columns_order = ['name', 'content', 'tags', 'index']
    return pd.DataFrame(results, columns=columns_order)
def setdbname(name):
    """Change the database folder name and persist it.

    Empty or whitespace-only names are ignored, so an empty folder name is
    never stored in memory or written to ``settings.arcana``.

    :param name: The new database name.
    :return: ``None``.
    """
    global foldername
    cleaned = (name or '').strip()
    if not cleaned:
        return None
    foldername = cleaned
    with open('settings.arcana', mode='w') as file:
        file.write(foldername + '\n' + dbmsmode)
    return None
# Pool of sample questions; get_random_examples draws from this list to fill
# the chat interface's example prompts.
example_database = [
    "What is Hydrogen Bonding?",
    "Tell me the difference between impulse and force.",
    "Tell me a joke that Calculus students will understand.",
    "How should I review for the AP Biology Exam?",
    "What kind of resources are available in PA and Indexademics?",
    "What is the StandardCAS™ group?",
    "Explain the concept of quantum entanglement.",
    "What are the main differences between mitosis and meiosis?",
    "How does the Doppler effect work?",
    "Explain the process of photosynthesis.",
    "What is the significance of the Pythagorean theorem?",
    "How does natural selection contribute to evolution?",
    "What is the most important chapter in AP Statistics?",
    "How should I prepare on the IB Chinese Exam?"
]
| import random | |
def get_random_examples(num_examples=5):
    """Pick distinct random questions from ``example_database``.

    :param num_examples: How many to pick; capped at the size of the pool.
    :return: A list of the chosen questions.
    """
    sample_size = min(num_examples, len(example_database))
    return random.sample(example_database, sample_size)
# Create the Gradio interface for the chatbot
# The example prompts are sampled once, when this object is built.
# NOTE(review): retry_btn / undo_btn / clear_btn may not be accepted by every
# Gradio release; confirm against the version this app is pinned to.
chatbot_interface = gr.ChatInterface(
    chatbot_response,
    chatbot=gr.Chatbot(height=400),
    textbox=gr.Textbox(placeholder="Type your message here...", container=False, scale=100),
    title="Indexademics ChatBot",
    description="Arcana v1",
    theme="default",
    examples=get_random_examples(),
    cache_examples=False,
    retry_btn=gr.Button('Retry'),
    undo_btn="Delete Previous",
    clear_btn="Clear",
)
# NOTE(review): this rebinds the module-level name chatbot_response after
# chatbot_interface was already built with the earlier two-argument function.
# Nothing in the visible file calls this one-argument placeholder.
def chatbot_response(message):
    """Placeholder that echoes the message back in a fixed format."""
    # Your chatbot response logic here
    return f"Response to: {message}"
def relaunch():
    """Close the running app and launch it again with a share link.

    Wired to the Relaunch button on the Theme settings tab.
    """
    global demo
    demo.close()
    demo.launch(share=True)
# Combine the interfaces using Tabs
# The js snippet wraps window.fetch: when a request carries an abort signal it
# is replaced by a fresh controller that aborts after 3600000 ms.
# NOTE(review): the inline JS comment says "5 minutes" but 3600000 ms is one
# hour, and replacing options.signal discards the caller's own signal. Confirm
# both are intended.
with gr.Blocks(js="""
async () => {
const originalFetch = window.fetch;
window.fetch = (url, options) => {
if (options && options.signal) {
const controller = new AbortController();
options.signal = controller.signal;
setTimeout(() => controller.abort(), 3600000); // 300000 ms = 5 minutes
}
return originalFetch(url, options);
};
}
""") as demo:
    gr.Markdown("# ArcanaV1")
    with gr.Tabs():
        with gr.TabItem("Welcome Page"):
            # The welcome text is read from introduction.txt when the UI is built.
            with open('introduction.txt',mode='r') as file:
                intro_content = file.read()
            gr.Markdown(intro_content)
        with gr.TabItem("Chatbot"):
            chatbot_interface.render()
        # File uploading interface
        with gr.TabItem('Upload'):
            gr.Markdown('# Upload and View Files')
            with gr.Row():
                # Left column: File list and buttons
                with gr.Column(scale=1):
                    gr.Markdown("## Upload File")
                    # NOTE(review): Gradio documents file_types entries as
                    # extensions with a leading dot (".pdf") or type names;
                    # confirm these bare names filter as intended. "png" is
                    # absent although display_file treats .png as an image.
                    file_input = gr.File(label="Upload your file here", file_types=["pdf", "jpeg", "jpg", "gif", "docx", "pptx"])
                    file_input.change(handle_file_upload_threaded, inputs=file_input)
                    uploaded_files_list = gr.DataFrame(headers=["Uploaded Files"], datatype="str", interactive=False)
                    with gr.Row():
                        refresh_button = gr.Button('Refresh')
                        delete_button = gr.Button('Delete Selected File')
                # Right column: File viewer and Image viewer
                with gr.Column(scale=1):
                    with gr.Tab("File Viewer"):
                        file_viewer = gr.File(label="File Restore")
                        file_status = gr.Textbox(label="File Status", interactive=False)
                        file_size = gr.Textbox(label="File Size", interactive=False)
                        file_creation_time = gr.Textbox(label="File Creation Time", interactive=False)
                        with gr.Row():
                            new_file_name = gr.Textbox(label="New File Name", placeholder="Enter new file name")
                            rename_button = gr.Button("Rename File")
                    with gr.Tab("Image Viewer"):
                        image_viewer = gr.Image(label="Image Viewer", type="filepath")
            # Event handlers
            refresh_button.click(fn=refresh_files, outputs=uploaded_files_list)
            delete_button.click(fn=delete_file, outputs=[uploaded_files_list, file_viewer, file_status, file_size, file_creation_time])
            # Two handlers run on every selection: display_file fills the
            # viewers, on_select records the selection and fills size/time.
            uploaded_files_list.select(fn=display_file, inputs=uploaded_files_list, outputs=[file_viewer, image_viewer, file_status])
            uploaded_files_list.select(fn=on_select, outputs=[file_viewer, file_status, file_size, file_creation_time])
            rename_button.click(fn=rename_file,
                                inputs=new_file_name,
                                outputs=[uploaded_files_list, file_status, file_viewer, file_size, file_creation_time])
            render_button = gr.Button("Render All Files to Database")
            render_button.click(fn=render_to_database)
        with gr.TabItem('Settings'):
            # NOTE(review): these TabItems are nested directly in a TabItem
            # with no gr.Tabs() container of their own; confirm this renders
            # as intended.
            with gr.TabItem('Database'):
                gr.Markdown('Settings')
                test_nylon = gr.Textbox(label='Test Nylon', placeholder='Query')
                uploaded_files_list2 = gr.DataFrame(headers=["Nylon Returned Query"], datatype="str", interactive=False)
                query_button2 = gr.Button('Query')
                query_button2.click(fn=query_database, inputs=test_nylon, outputs=uploaded_files_list2)
                test_fiber = gr.Textbox(label='Test Fiber', placeholder='Query')
                uploaded_files_list3 = gr.DataFrame(headers=["Fiber Returned Query"], datatype="str", interactive=False)
                query_button3 = gr.Button('Query')
                query_button3.click(fn=query_database_fiber, inputs=test_fiber, outputs=uploaded_files_list3)
                gr.Markdown('Nylon 2.1 will be deprecated in text-text selections, as it is built for image-text selections.\nDefault model is Fiber.')
                dbmsmode_selector = gr.Radio(["Nylon", "Fiber"], label="Select Model")
                dbmsmode_selector.change(handle_dbms_mode, dbmsmode_selector)
                database_name = gr.Textbox(label='Database Name', placeholder='cache')
                set_dbname = gr.Button('Set Database Name')
                set_dbname.click(fn=setdbname, inputs=database_name)
            with gr.TabItem('Theme'):
                gr.Markdown('Change Theme')
                theme_dropdown = gr.Dropdown(choices=['default', 'compact', 'huggingface', 'soft', 'dark'], label='Choose Theme')
                theme_button = gr.Button('Apply Theme')
                theme_button.click(fn=change_theme, inputs=theme_dropdown)
                relaunch_button = gr.Button('Relaunch')
                relaunch_button.click(fn=relaunch)
            with gr.TabItem('Search'):
                gr.Markdown('Set Search Modes')
                searchmode_selector = gr.Radio(["Always", "Automatic"], label="Select Mode")
                output = gr.Textbox(label="Output")
                searchmode_selector.change(handle_search_mode, searchmode_selector, output)
# Launch the interface
demo.launch(share=True)