## NVIDIA document-chatbot demo.
## Pipeline: (1) generate synthetic "good" and "poor" user queries with an
## instruct LLM, (2) embed them concurrently, (3) train a tiny Keras classifier
## on the embeddings to gate incoming queries, (4) route each live query to a
## "helpful" or "deflecting" system prompt, (5) serve the chain through Gradio.

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableBranch
from langchain_core.runnables.passthrough import RunnableAssign
from langchain_core.runnables import RunnableLambda
from langchain_nvidia_ai_endpoints import ChatNVIDIA, NVIDIAEmbeddings
from functools import partial
import gradio as gr
import numpy as np
import keras
from keras import layers
import asyncio
from asyncio import Semaphore
from operator import itemgetter
from collections import abc
from typing import Callable
import time

embedder = NVIDIAEmbeddings(model="nvolveqa_40k", model_type="query")
chat_model = ChatNVIDIA(model="llama2_13b") | StrOutputParser()
response_prompt = ChatPromptTemplate.from_messages([("system", "{system}"), ("user", "{input}")])


def train_model_neural_network(class0, class1):
    """Train a shallow binary classifier on pre-computed embedding vectors.

    Args:
        class0: list of embedding vectors to be labeled 0.
        class1: list of embedding vectors to be labeled 1.

    Returns:
        A fitted keras model mapping an embedding to P(label == 1) in [0, 1].
    """
    ## Classic deep learning training loop. If using this, train it to convergence
    model = keras.Sequential([
        layers.Dense(64, activation='tanh'),
        layers.Dense(1, activation='sigmoid'),
    ])
    ## Since this network is so shallow and the embedding backbone is "kept frozen"
    ## a high learning rate should not overfit and will actually converge very quickly.
    model.compile(
        optimizer = keras.optimizers.Adam(learning_rate = 1),
        loss = [keras.losses.BinaryCrossentropy(from_logits=False)],
        metrics = [keras.metrics.BinaryAccuracy()],
    )
    ## Since this uses stochastic gradient descent, we'll need to repeat this process
    reps_per_batch = 64*5  ## <- repeat the dataset, effectively increasing "epochs" without printing too much
    epochs = 2  ## <- one epoch should actually be sufficient; 2 to print out an updated training loss
    x = np.array((class0 + class1) * reps_per_batch)
    y = np.array(([0]*len(class0) + [1]*len(class1)) * reps_per_batch)
    model.fit(x, y, epochs=epochs, batch_size=64, validation_split=.5)
    return model


async def embed_with_semaphore(
    text: str,
    embed_fn: Callable,
    semaphore: asyncio.Semaphore
) -> abc.Coroutine:
    """Embed `text` via `embed_fn`, gated by `semaphore` to bound concurrency."""
    async with semaphore:
        return await embed_fn(text)

## Making new embed method to limit maximum concurrency against the endpoint
embed = partial(
    embed_with_semaphore,
    embed_fn = embedder.aembed_query,
    semaphore = asyncio.Semaphore(value=10)  ## <- feel free to play with value
)


## Useful method for mistral, which is currently tuned to output numbered outputs
def EnumParser(*idxs):
    '''Method that pulls out values from a mistral model that outputs numbered entries'''
    idxs = idxs or [slice(0, None, 1)]
    ## Strip the leading "1." / "2." enumeration token from each line
    entry_parser = lambda v: v if (' ' not in v) else v[v.index(' '):]
    out_lambda = lambda x: [entry_parser(v).strip() for v in x.split("\n")]
    return StrOutputParser() | RunnableLambda(lambda x: itemgetter(*idxs)(out_lambda(x)))

instruct_llm = ChatNVIDIA(model="mixtral_8x7b") | EnumParser()

gen_prompt = {'input' : lambda x:x} | ChatPromptTemplate.from_messages([('user',
    "Please generate 20 representative conversations that would be {input}."
    " Make sure all of the questions are very different in phrasing and content."
    " Do not respond to the questions; just list them. Make sure all of your outputs are numbered."
    " Example Response: \n1. \n2. \n3. \n..."
)])

## Some that directly reference NVIDIA
responses_1 = (gen_prompt | instruct_llm).invoke(
    ## TODO: Finish the prompt
    " reasonable for an NVIDIA document chatbot to be able to answer."
    " Vary the context to technology, research, deep learning, language modeling, gaming, etc."
)
# print("Reasonable NVIDIA Responses:", *responses_1, "", sep="\n")

## And some that do not
responses_2 = (gen_prompt | instruct_llm).invoke(
    ## TODO: Finish the prompt
    " be reasonable for a tech document chatbot to be able to answer. Make sure to vary"
    " the context to technology, research, gaming, language modeling, graphics, etc."
)
# print("Reasonable non-NVIDIA Responses:", *responses_2, "", sep="\n")

## Feel free to try your own generations instead
responses_3 = (gen_prompt | instruct_llm).invoke(
    "unreasonable for an NVIDIA document chatbot to answer,"
    " as it is irrelevant and will not be useful to answer (though not inherently harmful)."
)
# print("Irrelevant Responses:", *responses_3, "", sep="\n")

responses_4 = (gen_prompt | instruct_llm).invoke(
    "unreasonable for an NVIDIA document chatbot to answer,"
    " as it will reflect negatively on NVIDIA."
)
# print("Harmful non-NVIDIA", *responses_4, "", sep="\n")

good_responses = responses_1 + responses_2
poor_responses = responses_3 + responses_4


async def embeding():
    """Embed both response sets concurrently and train the gate classifier.

    Returns:
        The fitted keras model (class 0 = poor responses, class 1 = good).
    """
    good_tasks = [embed(query) for query in good_responses]
    poor_tasks = [embed(query) for query in poor_responses]
    all_tasks = good_tasks + poor_tasks
    embeds = await asyncio.gather(*all_tasks)
    good_embeds = embeds[:len(good_tasks)]
    poor_embeds = embeds[len(good_tasks):]
    ## Return the model instead of assigning a function-local `model1`, which
    ## would never be visible at module scope.
    return train_model_neural_network(poor_embeds, good_embeds)

## BUG FIX: the original called `embeding()` without awaiting it, so the
## coroutine never ran and the module-level classifier stayed 0 (and the
## trained model was bound to a *local* name inside the coroutine anyway).
## Drive the coroutine to completion and capture its return value.
model1 = asyncio.run(embeding())


def RPrint(preface=""):
    """Passthrough runnable that prints its input (prefixed) for debugging."""
    def print_and_return(x, preface=""):
        print(f"{preface}{x}")
        return x
    return RunnableLambda(partial(print_and_return, preface=preface))

## "Help them out" system message
good_sys_msg = (
    "You are an NVIDIA chatbot. Please answer their question while representing NVIDIA."
    " Please help them with their question if it is ethical and relevant."
)
## Resist talking about this topic" system message
poor_sys_msg = (
    "You are an NVIDIA chatbot. Please answer their question while representing NVIDIA."
    " Their question has been analyzed and labeled as 'probably not useful to answer as an NVIDIA Chatbot',"
    " so avoid answering if appropriate and explain your reasoning to them. Make your response as short as possible."
)


def is_good_response(query):
    """Embed `query` and return the classifier's P(good) score as a float."""
    embedding = np.array([embedder.embed_query(query)])
    ## The model returns a (1, 1) tensor; extract the scalar so the downstream
    ## `< 0.5` branch condition evaluates unambiguously.
    return float(model1(embedding)[0][0])


chat_chain = (
    {'input' : (lambda x:x), 'is_good' : is_good_response}
    | RPrint()
    | RunnableAssign(dict(system = RunnableBranch(
        ## Switch statement syntax. First lambda that returns true triggers return of result
        ((lambda d: d['is_good'] < 0.5), RunnableLambda(lambda x: poor_sys_msg)),
        ## ... (more branches can also be specified)
        ## Default branch. Will run if none of the others do
        RunnableLambda(lambda x: good_sys_msg)
    )))
    | response_prompt
    | chat_model
)

###############
## Gradio components

def chat_stream(message, history):
    """Stream the chain's tokens back to Gradio as a growing buffer."""
    buffer = ""
    for token in chat_chain.stream(message):
        buffer += token
        yield buffer

chatbot = gr.Chatbot(value = [[None, "Hello! I'm your NVIDIA chat agent! Let me answer some questions!"]])
demo = gr.ChatInterface(chat_stream, chatbot=chatbot).queue()

try:
    demo.launch(debug=True, share=True, show_api=False)
    demo.close()
except Exception as e:
    demo.close()
    print(e)
    raise e