Spaces:
Sleeping
Sleeping
| import os | |
| import subprocess | |
| import random | |
| from huggingface_hub import InferenceClient | |
| import gradio as gr | |
| from safe_search import safe_search | |
| from i_search import google | |
| from i_search import i_search as i_s | |
| from agent import ( | |
| ACTION_PROMPT, | |
| ADD_PROMPT, | |
| COMPRESS_HISTORY_PROMPT, | |
| LOG_PROMPT, | |
| LOG_RESPONSE, | |
| MODIFY_PROMPT, | |
| PREFIX, | |
| SEARCH_QUERY, | |
| READ_PROMPT, | |
| TASK_PROMPT, | |
| UNDERSTAND_TEST_RESULTS_PROMPT, | |
| ) | |
| from utils import parse_action, parse_file_content, read_python_module_structure | |
| from datetime import datetime | |
# Timestamp captured once, when the module is imported.
now = datetime.now()
date_time_str = now.strftime("%Y-%m-%d %H:%M:%S")

# Shared inference client; every generation request in this file goes through it.
client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1")

############################################

# When True, the assembled prompts and the raw model responses are printed.
VERBOSE = True
# The history is compressed once it spans more than this many lines.
MAX_HISTORY = 100
#MODEL = "gpt-3.5-turbo" # "gpt-4"
def format_prompt(message, history):
    """Build an ``[INST]``-tagged prompt from past turns plus a new message.

    Args:
        message: The new user message, placed in the final ``[INST]`` block.
        history: Iterable of ``(user_prompt, bot_response)`` pairs.

    Returns:
        The full prompt string, starting with ``<s>``.
    """
    past_turns = "".join(
        f"[INST] {user_prompt} [/INST] {bot_response}</s> "
        for user_prompt, bot_response in history
    )
    return f"<s>{past_turns}[INST] {message} [/INST]"
def run_gpt(
    prompt_template,
    stop_tokens,
    max_tokens,
    purpose,
    **prompt_kwargs,
):
    """Render a prompt, stream a completion from the model and return its text.

    The prompt is ``PREFIX`` (filled with the current timestamp, ``purpose``
    and ``safe_search``) followed by ``prompt_template`` formatted with
    ``prompt_kwargs``.

    Args:
        prompt_template: Format string for the task-specific part of the prompt.
        stop_tokens: Accepted for interface compatibility; it is NOT forwarded
            to the inference client.
        max_tokens: Accepted for interface compatibility; it is NOT used --
            generation is always capped at 1048 new tokens.
        purpose: Substituted into ``PREFIX``.
        **prompt_kwargs: Values substituted into ``prompt_template``.

    Returns:
        The concatenated text of every streamed token.
    """
    seed = random.randint(1, 1111111111111111)
    print(seed)
    generate_kwargs = dict(
        temperature=1.0,
        max_new_tokens=1048,
        top_p=0.99,
        repetition_penalty=1.0,
        do_sample=True,
        seed=seed,
    )
    # Stamp the prompt with the time of THIS call. A timestamp frozen at import
    # time goes stale while the app keeps running, which misleads the model
    # about the current date.
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    content = PREFIX.format(
        date_time_str=timestamp,
        purpose=purpose,
        safe_search=safe_search,
    ) + prompt_template.format(**prompt_kwargs)
    if VERBOSE:
        print(LOG_PROMPT.format(content))
    stream = client.text_generation(
        content,
        **generate_kwargs,
        stream=True,
        details=True,
        return_full_text=False,
    )
    resp = "".join(chunk.token.text for chunk in stream)
    if VERBOSE:
        print(LOG_RESPONSE.format(resp))
    return resp
def compress_history(purpose, task, history, directory):
    """Replace a long history with a single model-written observation.

    Sends ``history`` through ``COMPRESS_HISTORY_PROMPT`` via ``run_gpt`` and
    wraps the reply as one ``observation:`` line. ``directory`` is accepted
    but not used here.

    Returns:
        The new history string: ``"observation: <reply>\\n"``.
    """
    summary = run_gpt(
        COMPRESS_HISTORY_PROMPT,
        stop_tokens=["observation:", "task:", "action:", "thought:"],
        max_tokens=1024,
        purpose=purpose,
        task=task,
        history=history,
    )
    return f"observation: {summary}\n"
def call_search(purpose, task, history, directory, action_input):
    """Run the SEARCH action: fetch ``action_input`` as a URL and log the result.

    Args:
        purpose: Unused here; part of the common action-handler signature.
        task: Current task, returned unchanged.
        history: Agent history; an ``observation:`` line is appended to it.
        directory: Unused here; part of the common action-handler signature.
        action_input: The URL to look up. May be wrapped in ``<...>`` and/or
            padded with whitespace; may be ``None``.

    Returns:
        ``("MAIN", None, history, task)`` -- control always returns to MAIN.
    """
    print("CALLING SEARCH")
    # Normalise the input: a missing or non-string value becomes "", and
    # surrounding whitespace and angle brackets are removed in any order.
    if isinstance(action_input, str):
        url = action_input.strip().strip("<>").strip()
    else:
        url = ""
    if "http" not in url:
        history += "observation: I need to provide a valid URL to 'action: SEARCH action_input=https://URL'\n"
        return "MAIN", None, history, task
    try:
        response = i_s(url)
        #response = google(search_return)
        print(response)
        history += "observation: search result is: {}\n".format(response)
    except Exception as e:
        # Best effort: report the failure to the agent instead of crashing the loop.
        history += "observation: {}\n".format(e)
    return "MAIN", None, history, task
def call_main(purpose, task, history, directory, action_input):
    """Ask the model for its next step and extract the first action from the reply.

    The reply is read line by line. Every non-empty line is appended to
    ``history``. The first line starting with ``"action: "`` is parsed with
    ``parse_action`` and returned immediately; if its name or input contains
    ``"COMPLETE"`` the task is set to ``"END"``.

    Returns:
        ``(action_name, action_input, history, task)`` for the first action
        line, or ``("MAIN", None, history, task)`` when the reply contains none.
    """
    reply = run_gpt(
        ACTION_PROMPT,
        stop_tokens=["observation:", "task:", "action:","thought:"],
        max_tokens=1048,
        purpose=purpose,
        task=task,
        history=history,
    )
    for entry in reply.strip().split("\n"):
        if not entry:
            continue
        if not entry.startswith("action: "):
            # Thoughts and any other text are recorded verbatim.
            history += f"{entry}\n"
            continue
        action_name, action_input = parse_action(entry)
        print (f'ACTION_NAME :: {action_name}')
        print (f'ACTION_INPUT :: {action_input}')
        history += f"{entry}\n"
        if "COMPLETE" in action_name or "COMPLETE" in action_input:
            task = "END"
        return action_name, action_input, history, task
    return "MAIN", None, history, task
def call_set_task(purpose, task, history, directory, action_input):
    """Have the model restate the current task and record the update.

    ``directory`` and ``action_input`` are accepted but not used here.

    Returns:
        ``("MAIN", None, history, task)`` where ``task`` is the model reply
        with leading/trailing newlines removed and ``history`` has gained an
        observation line announcing it.
    """
    raw_task = run_gpt(
        TASK_PROMPT,
        stop_tokens=[],
        max_tokens=64,
        purpose=purpose,
        task=task,
        history=history,
    )
    task = raw_task.strip("\n")
    history += f"observation: task has been updated to: {task}\n"
    return "MAIN", None, history, task
def end_fn(purpose, task, history, directory, action_input):
    """Handle the COMPLETE action: leave ``history`` as is and mark the task ended.

    Returns:
        ``("COMPLETE", "COMPLETE", history, "END")``.
    """
    return "COMPLETE", "COMPLETE", history, "END"
def call_store_mem(purpose, task, history, directory, action_input):
    """Handle the STORE_MEMORY action (placeholder: nothing is persisted).

    Logs ``action_input`` and appends a confirmation observation to
    ``history``.

    Returns:
        ``("UPDATE-TASK", None, history, task)``.
    """
    print(f"STORE MEMORY CALLED :: {action_input}")
    # History entries are newline-terminated; without the newline the next
    # entry would be glued onto this line.
    history += "observation: Memory Stored\n"
    return "UPDATE-TASK", None, history, task
def call_recall_mem(purpose, task, history, directory, action_input):
    """Handle the RECALL_MEMORY action (placeholder: nothing is looked up).

    Logs ``action_input`` and appends an observation telling the agent that
    no memory was recalled.

    Returns:
        ``("UPDATE-TASK", None, history, task)``.
    """
    print(f"RECALL MEMORY CALLED :: {action_input}")
    # History entries are newline-terminated; without the newline the next
    # entry would be glued onto this line.
    history += "observation: no memory recalled, try a search engine\n"
    return "UPDATE-TASK", None, history, task
# Dispatch table: action name -> handler. Every handler is called with
# (purpose, task, history, directory, action_input) and returns
# (next_action_name, next_action_input, history, task).
NAME_TO_FUNC = {
    "MAIN": call_main,
    "UPDATE-TASK": call_set_task,
    "SEARCH": call_search,
    "COMPLETE": end_fn,
    "STORE_MEMORY": call_store_mem,
    "RECALL_MEMORY": call_recall_mem,
}
def run_action(purpose, task, history, directory, action_name, action_input):
    """Dispatch one agent step to the handler registered for ``action_name``.

    Args:
        purpose: Passed through to the handler.
        task: Current task; passed through to the handler.
        history: Agent history; compressed first when it exceeds
            ``MAX_HISTORY`` lines.
        directory: Passed through to the handler.
        action_name: Name of the action to run. ``None``, empty and unknown
            names fall back to ``"MAIN"``. Any name containing ``"RESPONSE"``
            or ``"COMPLETE"`` ends the run.
        action_input: Passed through to the handler.

    Returns:
        ``(next_action_name, next_action_input, history, task)``. If anything
        in the step raises, an explanatory observation is appended to
        ``history`` and the next action is ``"UPDATE-TASK"``.
    """
    print(f'action_name::{action_name}')
    try:
        # Resolve a missing name BEFORE the substring checks below; running
        # ``in`` against None would raise and skip the intended MAIN fallback.
        if not action_name:
            action_name = "MAIN"
        if "RESPONSE" in action_name or "COMPLETE" in action_name:
            return "COMPLETE", "COMPLETE", history, "END"
        # compress the history when it is long
        if len(history.split("\n")) > MAX_HISTORY:
            if VERBOSE:
                print("COMPRESSING HISTORY")
            history = compress_history(purpose, task, history, directory)
        if action_name not in NAME_TO_FUNC:
            action_name = "MAIN"
        print("RUN: ", action_name, action_input)
        handler = NAME_TO_FUNC[action_name]
        return handler(purpose, task, history, directory, action_input)
    except Exception as e:
        # Keep the agent loop alive, but do not lose the failure silently.
        print(f"run_action failed: {e!r}")
        history += "observation: the previous command did not produce any useful output, I need to check the commands syntax, or use a different command\n"
        return "UPDATE-TASK", None, history, task
def run(purpose,history):
    """Drive the agent loop, yielding the history after every step.

    Starts with the ``"UPDATE-TASK"`` action and keeps feeding each step's
    result back into ``run_action`` until the task becomes ``"END"``.

    Args:
        purpose: The user's request; passed to every step.
        history: Prior history. A truthy value is stringified and stripped of
            surrounding ``[`` / ``]`` characters; a falsy one becomes ``""``.

    Yields:
        The history string after each step.
    """
    task = None
    directory = "./"
    history = str(history).strip("[]") if history else ""
    action_name = "UPDATE-TASK" if task is None else "MAIN"
    action_input = None
    while True:
        # Debug dump of the agent state before each step.
        print("\n\n---")
        print("purpose:", purpose)
        print("task:", task)
        print("---")
        print(history)
        print("---")
        action_name, action_input, history, task = run_action(
            purpose,
            task,
            history,
            directory,
            action_name,
            action_input,
        )
        yield history
        if task == "END":
            return history
| ################################################ | |
def format_prompt(message, history):
    """Build an ``[INST]``-tagged prompt from past turns plus a new message.

    Same body as the definition earlier in this file; being later, this is
    the one that stays bound to the name.

    Args:
        message: The new user message, placed in the final ``[INST]`` block.
        history: Iterable of ``(user_prompt, bot_response)`` pairs.

    Returns:
        The full prompt string, starting with ``<s>``.
    """
    pieces = ["<s>"]
    for user_prompt, bot_response in history:
        pieces.append(f"[INST] {user_prompt} [/INST]")
        pieces.append(f" {bot_response}</s> ")
    pieces.append(f"[INST] {message} [/INST]")
    return "".join(pieces)
# Agent names offered in the "Agents" dropdown; the first entry is the default.
agents = ["WEB_DEV", "AI_SYSTEM_PROMPT", "PYTHON_CODE_DEV"]
def generate(
    prompt, history, agent_name=agents[0], sys_prompt="", temperature=0.9, max_new_tokens=256, top_p=0.95, repetition_penalty=1.0,
):
    """Stream a chat completion, using the system prompt of the chosen agent.

    Args:
        prompt: The new user message.
        history: Iterable of ``(user_prompt, bot_response)`` pairs.
        agent_name: Selects the system prompt; any value other than
            ``"AI_SYSTEM_PROMPT"`` or ``"PYTHON_CODE_DEV"`` uses ``WEB_DEV``.
        sys_prompt: Accepted but not used.
        temperature: Sampling temperature; raised to at least 0.01.
        max_new_tokens: Forwarded to the inference client.
        top_p: Forwarded to the inference client as a float.
        repetition_penalty: Forwarded to the inference client.

    Yields:
        The accumulated output text after each streamed token.
    """
    seed = random.randint(1, 1111111111111111)
    # NOTE(review): `prompts` is not imported in the visible part of this
    # file -- confirm where it is supposed to come from before using this.
    if agent_name == "AI_SYSTEM_PROMPT":
        system_prompt = prompts.AI_SYSTEM_PROMPT
    elif agent_name == "PYTHON_CODE_DEV":
        system_prompt = prompts.PYTHON_CODE_DEV
    else:
        system_prompt = prompts.WEB_DEV

    sampling = dict(
        temperature=max(float(temperature), 1e-2),
        max_new_tokens=max_new_tokens,
        top_p=float(top_p),
        repetition_penalty=repetition_penalty,
        do_sample=True,
        seed=seed,
    )
    formatted_prompt = format_prompt(f"{system_prompt}, {prompt}", history)
    output = ""
    for chunk in client.text_generation(
        formatted_prompt,
        **sampling,
        stream=True,
        details=True,
        return_full_text=False,
    ):
        output += chunk.token.text
        yield output
    return output
# Extra UI controls, in the order: agent choice, system prompt, temperature,
# max new tokens, top-p, repetition penalty.
additional_inputs = [
    gr.Dropdown(
        label="Agents",
        choices=list(agents),
        value=agents[0],
        interactive=True,
    ),
    gr.Textbox(
        label="System Prompt",
        max_lines=1,
        interactive=True,
    ),
    gr.Slider(
        label="Temperature",
        value=0.9,
        minimum=0.0,
        maximum=1.0,
        step=0.05,
        interactive=True,
        info="Higher values produce more diverse outputs",
    ),
    gr.Slider(
        label="Max new tokens",
        value=1048*10,
        minimum=0,
        maximum=1048*10,
        step=64,
        interactive=True,
        info="The maximum numbers of new tokens",
    ),
    gr.Slider(
        label="Top-p (nucleus sampling)",
        value=0.90,
        minimum=0.0,
        maximum=1,
        step=0.05,
        interactive=True,
        info="Higher values sample more low-probability tokens",
    ),
    gr.Slider(
        label="Repetition penalty",
        value=1.2,
        minimum=1.0,
        maximum=2.0,
        step=0.05,
        interactive=True,
        info="Penalize repeated tokens",
    ),
]
# Example rows for the chat UI: the prompt text followed by five empty slots.
examples = [
    [question, None, None, None, None, None]
    for question in (
        "What are the biggest news stories today?",
        "Search the Internet to find todays Breaking News Stories, return an exhaustive News Article that you write summarizing them all.",
        "When is the next full moon?",
        "I'm planning a vacation to Japan. Can you suggest a one-week itinerary including must-visit places and local cuisines to try?",
        "Can you write a short story about a time-traveling detective who solves historical mysteries?",
        "I'm trying to learn French. Can you provide some common phrases that would be useful for a beginner, along with their pronunciations?",
        "I have chicken, rice, and bell peppers in my kitchen. Can you suggest an easy recipe I can make with these ingredients?",
        "Can you explain how the QuickSort algorithm works and provide a Python implementation?",
        "What are some unique features of Rust that make it stand out compared to other systems programming languages like C++?",
    )
]
# Disabled UI experiments, kept as a bare string literal so none of it runs.
'''
gr.ChatInterface(
    fn=run,
    chatbot=gr.Chatbot(show_label=False, show_share_button=False, show_copy_button=True, likeable=True, layout="panel"),
    title="Mixtral 46.7B\nMicro-Agent\nInternet Search <br> development test",
    examples=examples,
    concurrency_limit=20,
with gr.Blocks() as ifacea:
    gr.HTML("""TEST""")
    ifacea.launch()
).launch()
with gr.Blocks() as iface:
    #chatbot=gr.Chatbot(show_label=False, show_share_button=False, show_copy_button=True, likeable=True, layout="panel"),
    chatbot=gr.Chatbot()
    msg = gr.Textbox()
    with gr.Row():
        submit_b = gr.Button()
        clear = gr.ClearButton([msg, chatbot])
    submit_b.click(run, [msg,chatbot],[msg,chatbot])
    msg.submit(run, [msg, chatbot], [msg, chatbot])
iface.launch()
'''
# Build the chat UI around the agent loop `run` and start serving it.
demo = gr.ChatInterface(
    fn=run,
    chatbot=gr.Chatbot(
        show_label=False,
        show_share_button=False,
        show_copy_button=True,
        likeable=True,
        layout="panel",
    ),
    title="Mixtral 46.7B\nMicro-Agent\nInternet Search <br> development test",
    examples=examples,
    concurrency_limit=20,
)
demo.launch(show_api=False)