| import gradio as gr |
| import urllib.request |
| import requests |
| import bs4 |
| import lxml |
| import os |
| |
| from huggingface_hub import InferenceClient,HfApi |
| import random |
| import json |
| import datetime |
| |
| from prompts import ( |
| FINDER, |
| COMPRESS_HISTORY_PROMPT, |
| COMPRESS_DATA_PROMPT, |
| COMPRESS_DATA_PROMPT_SMALL, |
| LOG_PROMPT, |
| LOG_RESPONSE, |
| PREFIX, |
| TASK_PROMPT, |
| ) |
# Anonymous Hugging Face Hub API client; used by call_search to list models.
api=HfApi()


# Shared streaming inference client for all LLM calls (see run_gpt).
client = InferenceClient(
    "mistralai/Mixtral-8x7B-Instruct-v0.1"
)
|
|
def parse_action(string: str):
    """Parse a model-emitted "action:" line into (action_name, action_input).

    "action: NAME action_input=VALUE" -> ("NAME", "VALUE")
    "action: NAME"                    -> ("NAME", None)

    Raises AssertionError when `string` does not start with "action:".
    """
    print("PARSING:")
    print(string)
    assert string.startswith("action:")
    # Named lengths instead of the original magic offsets 8 and 13.
    prefix_len = len("action: ")
    marker = "action_input="
    idx = string.find(marker)
    print(idx)
    if idx == -1:
        # No explicit input: everything after the prefix is the action name.
        print("idx == -1")
        print(string[prefix_len:])
        return string[prefix_len:], None
    action_name = string[prefix_len : idx - 1]
    # Strip single or double quotes the model may wrap the value in.
    action_input = string[idx + len(marker) :].strip("'").strip('"')
    print("last return:")
    print(action_name)
    print(action_input)
    return action_name, action_input
|
|
|
|
|
|
VERBOSE = True        # echo prompts/responses to stdout (see run_gpt)
MAX_HISTORY = 100     # history line count that triggers compress_history
MAX_DATA = 20000      # data-size threshold used when compressing results
|
|
def format_prompt(message, history):
    """Render prior (user, bot) turns plus the new message in Mixtral [INST] format."""
    turns = [
        f"[INST] {user_turn} [/INST] {bot_turn}</s> "
        for user_turn, bot_turn in history
    ]
    return "<s>" + "".join(turns) + f"[INST] {message} [/INST]"
|
|
def call_search(purpose, task, history, action_input):
    """SEARCH tool: list Hugging Face Hub models matching `action_input`.

    Returns (next_action, action_input, history, task). On success the
    incoming `history` is REPLACED by the observation string rather than
    appended to (behavior preserved from the original).
    """
    return_list = []
    print(action_input)
    print("trying")
    try:
        if action_input != "" and action_input is not None:
            # BUG FIX: str.strip returns a new string; the original call
            # discarded its result, leaving action_input unchanged.
            action_input = action_input.strip('""')
            model_list = api.list_models(filter=f"{action_input}")
            this_obj = list(model_list)
            print(f'THIS_OBJ :: {this_obj[0]}')
            for model in this_obj:
                return_list.append({
                    "id": model.id,
                    "author": model.author,
                    "created_at": model.created_at,
                    "last_modified": model.last_modified,
                    "private": model.private,
                    "gated": model.gated,
                    "disabled": model.disabled,
                    "downloads": model.downloads,
                    "likes": model.likes,
                    "library_name": model.library_name,
                    "tags": model.tags,
                    "pipeline_tag": model.pipeline_tag,
                })
            rl = len(return_list)
            print(rl)
            # Rough serialized-size estimate: spaces + commas in the repr.
            c = sum(1 for ch in str(return_list) if ch in (" ", ","))
            print(c)
            # NOTE(review): compares the result COUNT (rl) against the
            # character threshold MAX_DATA; `c` may have been intended.
            if rl > MAX_DATA:
                print("compressing...")
                # BUG FIX: compress_data takes five arguments; the original
                # four-arg call raised TypeError, silenced by the except.
                return_list = compress_data(rl, purpose, task, return_list, "")
            history = "observation: the search results are:\n {}\n".format(return_list)
            return "MAIN", None, history, task
        else:
            history = "observation: I need to trigger a search using the following syntax:\naction: SEARCH action_input=URL\n"
            return "UPDATE-TASK", None, history, task
    except Exception as e:
        # Broad by design: any Hub/API failure sends the agent back to
        # re-plan instead of crashing the loop.
        print(e)
        history = "observation: I need to trigger a search using the following syntax:\naction: SEARCH action_input=URL\n"
        return "UPDATE-TASK", None, history, task
|
|
|
|
def run_gpt(
    prompt_template,
    stop_tokens,
    max_tokens,
    seed,
    purpose,
    **prompt_kwargs,
):
    """Stream a completion from the shared Mixtral client and return the full text.

    `stop_tokens` is accepted for call-site compatibility but is not
    forwarded to the client (matching the original behavior).
    """
    timestamp = datetime.datetime.now()
    print(seed)
    generate_kwargs = {
        "temperature": 0.9,
        "max_new_tokens": max_tokens,
        "top_p": 0.95,
        "repetition_penalty": 1.0,
        "do_sample": True,
        "seed": seed,
    }
    header = PREFIX.format(timestamp=timestamp, purpose=purpose)
    content = header + prompt_template.format(**prompt_kwargs)
    if VERBOSE:
        print(LOG_PROMPT.format(content))
    stream = client.text_generation(
        content, **generate_kwargs, stream=True, details=True, return_full_text=False
    )
    resp = "".join(chunk.token.text for chunk in stream)
    if VERBOSE:
        print(LOG_RESPONSE.format(resp))
    return resp
|
|
def compress_data(c, purpose, task, history, result):
    """Summarize `history` chunk-by-chunk via repeated LLM passes.

    Args:
        c: size estimate of `history` (item/character count) that decides
           how many chunks to summarize.
        purpose, task: woven into the compression prompt.
        history: sliceable data (str or list) to compress.
        result: unused; kept for call-site compatibility.

    Returns:
        "result: <final summary>\n" (only the LAST pass's summary; each
        pass feeds the previous one back in as `knowledge`).
    """
    seed = random.randint(1, 1000000000)
    print(c)
    size = int(c)
    if size <= 0:
        # BUG FIX: the original divided by zero when c == 0.
        return "result: \n"
    divr = size / MAX_DATA
    # Number of passes, rounded up when the data doesn't divide evenly.
    divi = int(divr) + 1 if divr != int(divr) else int(divr)
    chunk = int(size / divr)   # ~MAX_DATA units per chunk
    print(f'chunk:: {chunk}')
    print(f'divr:: {divr}')
    print(f'divi:: {divi}')
    out = []
    s = 0
    e = chunk
    print(f'e:: {e}')
    new_history = ""
    task = f'Compile this data to fulfill the task: {task}, and complete the purpose: {purpose}\n'
    for z in range(divi):
        print(f's:e :: {s}:{e}')
        hist = history[s:e]
        resp = run_gpt(
            COMPRESS_DATA_PROMPT,
            stop_tokens=["observation:", "task:", "action:", "thought:"],
            max_tokens=2048,
            seed=seed,
            purpose=purpose,
            task=task,
            knowledge=new_history,
            history=hist,
        )
        new_history = resp
        print(resp)
        # BUG FIX: `out += resp` extended the list one *character* at a time.
        out.append(resp)
        e = e + chunk
        s = s + chunk
    print("final" + resp)
    history = "result: {}\n".format(resp)
    return history
|
|
|
|
|
|
|
|
def compress_history(purpose, task, history):
    """Condense a long conversation history into a single observation line."""
    summary = run_gpt(
        COMPRESS_HISTORY_PROMPT,
        stop_tokens=["observation:", "task:", "action:", "thought:"],
        max_tokens=512,
        seed=random.randint(1, 1000000000),
        purpose=purpose,
        task=task,
        history=history,
    )
    return "observation: {}\n".format(summary)
|
|
|
|
def call_main(purpose, task, history, action_input, result):
    """MAIN step: ask the model for its next move and dispatch on "action:" lines.

    Scans the model's response line by line; the first "action: " line ends
    the step and its parsed (name, input) becomes the next action. All other
    non-empty lines are appended to `history`.

    Returns (next_action, action_input, history, task, result).
    """
    resp = run_gpt(
        FINDER,
        stop_tokens=[],
        max_tokens=2096,
        seed=random.randint(1, 1000000000),
        purpose=purpose,
        task=task,
        history=history,
    )
    # .strip() already removes surrounding newlines; the original's extra
    # .strip("\n") was redundant.
    lines = resp.strip().split("\n")
    for line in lines:
        if line == "":
            continue
        if line.startswith("action: "):
            action_name, action_input = parse_action(line)
            print(f'ACTION::{action_name} -- INPUT :: {action_input}')
            return action_name, action_input, history, task, result
        # BUG FIX: "thought:" lines were appended to history TWICE — once by
        # a dedicated `if line.startswith("thought: ")` and again by the
        # else-branch of the action check.
        history += "{}\n".format(line)
    result = history
    # BUG FIX: the original tested the always-truthy literal `if "VERBOSE":`.
    if VERBOSE:
        print(history)
    return "MAIN", None, history, task, result
|
|
|
|
def call_set_task(purpose, task, history, action_input, result):
    """UPDATE-TASK step: have the model rewrite the current task, then resume MAIN."""
    refreshed = run_gpt(
        TASK_PROMPT,
        stop_tokens=[],
        max_tokens=1024,
        seed=random.randint(1, 1000000000),
        purpose=purpose,
        task=task,
        history=history,
    )
    task = refreshed.strip("\n")
    history += "observation: task has been updated to: {}\n".format(task)
    return "MAIN", None, history, task, result
|
|
|
|
|
|
| |
def search_all(url):
    """Placeholder scraper: always returns an empty page source."""
    return ""
|
|
|
|
|
|
def find_all(purpose, task, history, url, result):
    """SCRAPE_WEBSITE tool: fetch `url`, collect raw text plus all <a> links.

    Large pages are summarized via compress_data before being appended to
    `history`. Returns (next_action, action_input, history, task, result).
    """
    print(url)
    print(f"trying URL:: {url}")
    try:
        if url != "" and url is not None:
            out = []
            source = requests.get(url)
            soup = bs4.BeautifulSoup(source.content, 'lxml')
            print(soup.title)
            # ROBUSTNESS FIX: pages without a <title> used to raise
            # AttributeError here and fall into the error path.
            if soup.title is not None:
                print(soup.title.name)
            rawp = f'RAW TEXT RETURNED: {soup.text}'
            out.append(rawp)
            out.append("HTML fragments: ")
            for anchor in soup.find_all("a"):
                out.append([{
                    "LINK TITLE": anchor.get('title'),
                    "URL": anchor.get('href'),
                    "STRING": anchor.string,
                }])
            out = str(out)
            # Replaces the original O(n) char-by-char counting loop.
            c = len(out)
            print(f'rl:: {c}')
            print(f'c:: {c}')
            # NOTE(review): compares a CHARACTER count against MAX_HISTORY
            # (a line-count threshold, 100) — virtually every page triggers
            # compression; MAX_DATA may have been intended. Behavior kept.
            if c > MAX_HISTORY:
                print("compressing...")
                rawp = compress_data(c, purpose, task, out, result)
            else:
                rawp = out
            result += rawp
            print(rawp)
            print(f'out:: {out}')
            history += "observation: the search results are:\n {}\n".format(rawp)
            task = "compile report"
            return "UPDATE-TASK", None, history, task, result
        else:
            history += "observation: An Error occured\nI need to trigger a search using the following syntax:\naction: SCRAPE_WEBSITE action_input=URL\n"
            return "MAIN", None, history, task, result
    except Exception as e:
        # Best-effort: any fetch/parse failure sends the agent back to MAIN.
        print(e)
        history += "observation: I need to trigger a search using the following syntax:\naction: SCRAPE_WEBSITE action_input=URL\n"
        return "MAIN", None, history, task, result
|
|
|
|
|
|
| |
|
|
# Dispatch table mapping model-emitted action names to handler functions.
# SEARCH_ENGINE and SCRAPE_WEBSITE intentionally share one implementation.
NAME_TO_FUNC = {
    "MAIN": call_main,
    "UPDATE-TASK": call_set_task,
    "SEARCH_ENGINE": find_all,
    "SCRAPE_WEBSITE": find_all,
}
|
|
|
|
def run_action(purpose, task, history, action_name, action_input, result):
    """Execute one agent step by dispatching `action_name` via NAME_TO_FUNC.

    "COMPLETE" short-circuits immediately; otherwise `history` is compressed
    first when it exceeds MAX_HISTORY lines, and unknown action names push a
    corrective observation and return to MAIN.

    Returns (next_action, action_input, history, task, result).
    """
    if action_name == "COMPLETE":
        print("Complete - Exiting")
        return "COMPLETE", None, history, task, result
    if len(history.split("\n")) > MAX_HISTORY:
        if VERBOSE:
            print("COMPRESSING HISTORY")
        history = compress_history(purpose, task, history)
    # (The original also asserted membership inside this branch — redundant.)
    if action_name in NAME_TO_FUNC:
        print(f"RUN: {action_name} ACTION_INPUT: {action_input}")
        return NAME_TO_FUNC[action_name](purpose, task, history, action_input, result)
    # BUG FIX: the hint previously advertised the non-existent tool
    # "WEBSITE_SCRAPE"; the registered name is "SCRAPE_WEBSITE".
    history += "observation: The TOOL I tried to use returned an error, I need to select a tool from: (UPDATE-TASK, SEARCH_ENGINE, SCRAPE_WEBSITE, COMPLETE)\n"
    return "MAIN", None, history, task, result
|
|
def run(purpose, history):
    """Gradio chat entry point: drive the agent loop, yielding `result` each step.

    Starts from action "MAIN" and keeps stepping via run_action until the
    agent returns "COMPLETE".
    """
    task = None
    result = ""
    history = str(history) if history else ""
    action_name, action_input = "MAIN", None
    while True:
        print("")
        print("")
        print("---")
        print("task:", task)
        print("---")
        print("---")
        action_name, action_input, history, task, result = run_action(
            purpose,
            task,
            history,
            action_name,
            action_input,
            result,
        )
        yield result
        if action_name == "COMPLETE":
            return result
|
|
|
|
|
|
|
|
# Sample prompts surfaced in the Gradio chat UI.
examples =[
    "what are todays breaking news stories?",
    "find the most popular model that I can use to generate an image by providing a text prompt",
    "return the top 10 models that I can use to identify objects in images",
    "which models have the most likes from each category?"
]
|
|
|
|
# Wire the agent generator into a Gradio chat UI and start the server.
gr.ChatInterface(
    fn=run,
    chatbot=gr.Chatbot(show_label=False, show_share_button=False, show_copy_button=True, likeable=True, layout="panel"),
    title="Mixtral 46.7B Powered <br> Search",
    examples=examples,
    concurrency_limit=20,
).launch()
# NOTE(review): dead code — an alternate gr.Blocks UI kept as a bare
# module-level string literal. It references `find_it`, which is not
# defined anywhere in this file; it would fail if re-enabled as-is.
'''
with gr.Blocks() as app:
    with gr.Row():
        with gr.Column(scale=1):
            inp = gr.Textbox()
        with gr.Column(scale=2):
            q = gr.Textbox(value="p")
        with gr.Column(scale=2):
            num = gr.Textbox()
    with gr.Row():
        all_btn = gr.Button("Load")
        find_btn = gr.Button("Find")
    with gr.Row():
        rawp = gr.JSON()
        outp = gr.JSON()
        outl = gr.Textbox()

    all_btn.click(find_all,[inp,q,num],[rawp])
    find_btn.click(find_it,[inp,q,num],[outp,outl])

app.launch()

'''