# NOTE(review): recovered from a table/notebook export whose header reported
# "Runtime error"; the script below has been reformatted back into valid Python.
## Imports, regrouped: standard library first, then third-party packages.
import asyncio
import time
from asyncio import Semaphore
from collections import abc
from functools import partial
from operator import itemgetter
from typing import Callable

import gradio as gr
import keras
import numpy as np
from keras import layers
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableBranch, RunnableLambda
from langchain_core.runnables.passthrough import RunnableAssign
from langchain_nvidia_ai_endpoints import ChatNVIDIA, NVIDIAEmbeddings
## Embedding model (used to featurize queries for the classifier) and the main
## chat LLM; the chat model is piped into a string parser so it streams text.
embedder = NVIDIAEmbeddings(model="nvolveqa_40k", model_type="query")
chat_model = ChatNVIDIA(model="llama2_13b") | StrOutputParser()
## Prompt that receives a per-query `system` message (chosen by the guardrail
## branch below) and the user `input`.
response_prompt = ChatPromptTemplate.from_messages(
    [("system", "{system}"), ("user", "{input}")]
)
def train_model_neural_network(class0, class1):
    """Train a tiny binary classifier on two lists of embedding vectors.

    Args:
        class0: list of embedding vectors for the negative class (label 0).
        class1: list of embedding vectors for the positive class (label 1).

    Returns:
        A fitted ``keras.Sequential`` model whose sigmoid output is the
        probability (in [0, 1]) that an embedding belongs to ``class1``.
    """
    ## Classic deep learning training loop. If using this, train it to convergence.
    model = keras.Sequential([
        layers.Dense(64, activation='tanh'),
        layers.Dense(1, activation='sigmoid'),
    ])
    ## Since this network is so shallow and the embedding backbone is kept frozen,
    ## a high learning rate should not overfit and will actually converge very quickly.
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=1),
        loss=[keras.losses.BinaryCrossentropy(from_logits=False)],
        metrics=[keras.metrics.BinaryAccuracy()],
    )
    ## Since this uses stochastic gradient descent, we'll need to repeat this process.
    reps_per_batch = 64 * 5  ## <- repeat the dataset, effectively increasing "epochs" without printing too much
    epochs = 2  ## <- one epoch should actually be sufficient; 2 to print an updated training loss
    x = np.array((class0 + class1) * reps_per_batch)
    y = np.array(([0] * len(class0) + [1] * len(class1)) * reps_per_batch)
    model.fit(x, y, epochs=epochs, batch_size=64, validation_split=.5)
    return model
async def embed_with_semaphore(
    text: str,
    embed_fn: Callable,
    semaphore: asyncio.Semaphore,
):
    """Await ``embed_fn(text)`` while holding ``semaphore``.

    Used to cap the number of concurrent embedding requests: at most the
    semaphore's initial value of calls run at once; the rest queue here.

    NOTE: the original ``-> abc.Coroutine`` annotation was dropped — an
    ``async def``'s return annotation describes the *awaited* result (here,
    whatever ``embed_fn`` resolves to), not the coroutine object itself.
    """
    async with semaphore:
        return await embed_fn(text)
## Concurrency-capped embed method: same coroutine, but with the embedding
## function and a shared semaphore pre-bound so callers just pass the text.
embed = partial(
    embed_with_semaphore,
    embed_fn=embedder.aembed_query,
    semaphore=asyncio.Semaphore(value=10),  ## <- feel free to play with value
)
## Useful method for mistral, which is currently tuned to output numbered outputs
def EnumParser(*idxs):
    '''Method that pulls out values from a mistral model that outputs numbered entries'''
    ## With no indices, keep every entry (a full slice of the parsed list).
    idxs = idxs or [slice(0, None, 1)]

    def _strip_number(entry):
        ## Drop the leading "1." style marker: everything before the first space.
        return entry if (' ' not in entry) else entry[entry.index(' '):]

    def _split_entries(text):
        return [_strip_number(line).strip() for line in text.split("\n")]

    return StrOutputParser() | RunnableLambda(lambda x: itemgetter(*idxs)(_split_entries(x)))
## Instruction model whose numbered output is parsed into a Python list.
instruct_llm = ChatNVIDIA(model="mixtral_8x7b") | EnumParser()

## Prompt for synthesizing training questions; `{input}` finishes the sentence
## "conversations that would be {input}".
gen_prompt = {'input': lambda x: x} | ChatPromptTemplate.from_messages([('user',
    "Please generate 20 representative conversations that would be {input}."
    " Make sure all of the questions are very different in phrasing and content."
    " Do not respond to the questions; just list them. Make sure all of your outputs are numbered."
    " Example Response: \n1. <question>\n2. <question>\n3. <question>\n..."
)])
## Synthesize the four training sets. Each invoke argument completes the
## gen_prompt sentence "...conversations that would be {input}". The original
## left two "TODO: Finish the prompt" stubs whose strings started mid-sentence
## (" reasonable...", " be reasonable..."), producing malformed prompts; the
## leading clauses are now filled in.

## Some that directly reference NVIDIA
responses_1 = (gen_prompt | instruct_llm).invoke(
    "reasonable for an NVIDIA document chatbot to be able to answer."
    " Vary the context to technology, research, deep learning, language modeling, gaming, etc."
)
# print("Reasonable NVIDIA Responses:", *responses_1, "", sep="\n")

## And some that do not
responses_2 = (gen_prompt | instruct_llm).invoke(
    "unrelated to NVIDIA directly, but would still"
    " be reasonable for a tech document chatbot to be able to answer. Make sure to vary"
    " the context to technology, research, gaming, language modeling, graphics, etc."
)
# print("Reasonable non-NVIDIA Responses:", *responses_2, "", sep="\n")

## Feel free to try your own generations instead
responses_3 = (gen_prompt | instruct_llm).invoke(
    "unreasonable for an NVIDIA document chatbot to answer,"
    " as it is irrelevant and will not be useful to answer (though not inherently harmful)."
)
# print("Irrelevant Responses:", *responses_3, "", sep="\n")

responses_4 = (gen_prompt | instruct_llm).invoke(
    "unreasonable for an NVIDIA document chatbot to answer,"
    " as it will reflect negatively on NVIDIA."
)
# print("Harmful non-NVIDIA", *responses_4, "", sep="\n")

## Positive class = questions the bot should answer; negative = deflect.
good_responses = responses_1 + responses_2
poor_responses = responses_3 + responses_4
async def embeding():
    """Embed all generated responses concurrently and train the classifier.

    Returns:
        The trained keras model; class0 = poor, class1 = good, so the sigmoid
        output is the probability that a query is "good" (answerable).
    """
    good_tasks = [embed(query) for query in good_responses]
    poor_tasks = [embed(query) for query in poor_responses]
    all_tasks = good_tasks + poor_tasks
    embeds = await asyncio.gather(*all_tasks)
    good_embeds = embeds[:len(good_tasks)]
    poor_embeds = embeds[len(good_tasks):]
    return train_model_neural_network(poor_embeds, good_embeds)

## BUG FIX: the original did `model1 = 0` then called `embeding()` bare — that
## only creates a coroutine object (never awaited, so nothing ran) and the
## trained model was assigned to a *local* `model1` inside the coroutine,
## leaving the module-level `model1` as the int 0 (so `model1(embedding)`
## later raised TypeError). Run the coroutine and capture its return value.
model1 = asyncio.run(embeding())
def RPrint(preface=""):
    """Return a passthrough runnable that prints `preface` + input, then forwards the input unchanged."""
    def _show(x, preface=""):
        print(f"{preface}{x}")
        return x
    ## Bind the preface now so the runnable stays a single-argument callable.
    return RunnableLambda(partial(_show, preface=preface))
| ## "Help them out" system message | |
| good_sys_msg = ( | |
| "You are an NVIDIA chatbot. Please answer their question while representing NVIDIA." | |
| " Please help them with their question if it is ethical and relevant." | |
| ) | |
| ## Resist talking about this topic" system message | |
| poor_sys_msg = ( | |
| "You are an NVIDIA chatbot. Please answer their question while representing NVIDIA." | |
| " Their question has been analyzed and labeled as 'probably not useful to answer as an NVIDIA Chatbot'," | |
| " so avoid answering if appropriate and explain your reasoning to them. Make your response as short as possible." | |
| ) | |
def is_good_response(query):
    """Score *query* as answerable/on-topic for the NVIDIA chatbot.

    Embeds the query and runs it through the trained classifier.

    Returns:
        A plain float in [0, 1]: the probability the query is "good".
        Downstream, `chat_chain` treats values below 0.5 as "poor".
    """
    ## Embed the query; the model expects a batch, hence the extra list level.
    embedding = np.array([embedder.embed_query(query)])
    ## BUG FIX: the original returned the raw model output (a 2-D tensor/array),
    ## which made the later `d['is_good'] < 0.5` comparison depend on tensor
    ## semantics. Extract the single sigmoid score as a Python float instead.
    return float(model1(embedding)[0][0])
## Guardrailed chat pipeline: score the query, pick a system message based on
## the score, then render the prompt and stream the model's answer.
chat_chain = (
    {'input': (lambda x: x), 'is_good': is_good_response}
    | RPrint()
    | RunnableAssign(dict(
        system=RunnableBranch(
            ## Switch-statement syntax: the first predicate returning True
            ## selects its paired runnable.
            ((lambda d: d['is_good'] < 0.5), RunnableLambda(lambda x: poor_sys_msg)),
            ## ... (more branches can also be specified)
            ## Default branch: runs when no predicate above matched.
            RunnableLambda(lambda x: good_sys_msg),
        )
    ))
    | response_prompt
    | chat_model
)
###############
## Gradio components

def chat_stream(message, history):
    """Gradio streaming callback: yield the growing response as tokens arrive."""
    so_far = ""
    for token in chat_chain.stream(message):
        so_far += token
        yield so_far
## Build the chat UI with a greeting message and launch it; the queue enables
## streaming responses.
chatbot = gr.Chatbot(value=[[None, "Hello! I'm your NVIDIA chat agent! Let me answer some questions!"]])
demo = gr.ChatInterface(chat_stream, chatbot=chatbot).queue()
try:
    demo.launch(debug=True, share=True, show_api=False)
except Exception as e:
    print(e)
    raise e
finally:
    ## Close the server whether launch returned normally or raised.
    demo.close()