# Nvidia_Agent / app.py
# (Hugging Face Space page header removed — "ez7051 / Update app.py / ecb8dbd verified"
# was UI residue, not Python source.)
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableBranch
from langchain_core.runnables.passthrough import RunnableAssign
from langchain_core.runnables import RunnableLambda
from langchain_nvidia_ai_endpoints import ChatNVIDIA, NVIDIAEmbeddings
from functools import partial
import gradio as gr
import numpy as np
import keras
from keras import layers
import asyncio
from asyncio import Semaphore
from operator import itemgetter
from collections import abc
from typing import Callable
import time
## Embedding model used to vectorize queries for the intent classifier below.
embedder = NVIDIAEmbeddings(model="nvolveqa_40k", model_type="query")
## Chat model whose message output is flattened to a plain string.
chat_model = ChatNVIDIA(model="llama2_13b") | StrOutputParser()
## Response prompt: a per-query system message plus the raw user input.
response_prompt = ChatPromptTemplate.from_messages([("system", "{system}"), ("user", "{input}")])
def train_model_neural_network(class0, class1):
    """Fit a tiny binary classifier on two lists of embedding vectors.

    `class0` / `class1` are lists of fixed-length float vectors. The returned
    keras model maps one embedding to a sigmoid score, where 1.0 means "class1".
    Classic deep learning training loop — if using this, train to convergence.
    """
    classifier = keras.Sequential([
        layers.Dense(64, activation="tanh"),
        layers.Dense(1, activation="sigmoid"),
    ])
    ## The embedding backbone is kept frozen and this head is very shallow, so
    ## an aggressive learning rate converges quickly without overfitting.
    classifier.compile(
        optimizer=keras.optimizers.Adam(learning_rate=1),
        loss=[keras.losses.BinaryCrossentropy(from_logits=False)],
        metrics=[keras.metrics.BinaryAccuracy()],
    )
    ## SGD needs many passes: duplicate the dataset rather than raising the
    ## epoch count, which keeps the progress printout short. Two epochs so an
    ## updated training loss gets printed at least once.
    repeats = 64 * 5
    features = np.array((class0 + class1) * repeats)
    labels = np.array(([0] * len(class0) + [1] * len(class1)) * repeats)
    classifier.fit(features, labels, epochs=2, batch_size=64, validation_split=.5)
    return classifier
async def embed_with_semaphore(
    text : str,
    embed_fn : Callable,
    semaphore : asyncio.Semaphore
) -> list:
    """Embed `text` via `embed_fn`, with `semaphore` bounding concurrency.

    Parameters:
        text: the query string to embed.
        embed_fn: async callable mapping a string to an embedding
            (here, NVIDIAEmbeddings.aembed_query — a list of floats).
        semaphore: shared limiter; at most `semaphore._value` embeds run at once.

    Returns the awaited result of `embed_fn(text)`. The previous
    `-> abc.Coroutine` annotation described the unawaited call rather than
    the value an awaiter receives, so it was misleading and has been fixed.
    """
    async with semaphore:
        return await embed_fn(text)
## New embed method that limits maximum concurrency: the shared Semaphore
## allows at most 10 embedding requests to be in flight at once.
embed = partial(
    embed_with_semaphore,
    embed_fn = embedder.aembed_query,
    semaphore = asyncio.Semaphore(value=10) ## <- feel free to play with value
)
## Useful for mixtral/mistral-style models, which are tuned to emit numbered lists.
def EnumParser(*idxs):
    """Parser that pulls entries out of a model's numbered-list output.

    With no arguments, all entries are returned; with index arguments, only
    the selected entries are (itemgetter semantics).
    """
    selectors = idxs or [slice(0, None, 1)]

    def _drop_prefix(entry):
        # Remove the leading "1." style enumeration: keep from the first space on.
        if ' ' not in entry:
            return entry
        return entry[entry.index(' '):]

    def _to_entries(text):
        return [_drop_prefix(line).strip() for line in text.split("\n")]

    return StrOutputParser() | RunnableLambda(lambda text: itemgetter(*selectors)(_to_entries(text)))
## LLM used only for synthetic data generation; EnumParser turns its numbered
## output into a Python list of question strings.
instruct_llm = ChatNVIDIA(model="mixtral_8x7b") | EnumParser()
## Generation prompt: `{input}` completes "...conversations that would be {input}."
gen_prompt = {'input' : lambda x:x} | ChatPromptTemplate.from_messages([('user',
    "Please generate 20 representative conversations that would be {input}."
    " Make sure all of the questions are very different in phrasing and content."
    " Do not respond to the questions; just list them. Make sure all of your outputs are numbered."
    " Example Response: \n1. <question>\n2. <question>\n3. <question>\n..."
)])
## Positive examples: questions that directly reference NVIDIA.
## The string completes gen_prompt's "...conversations that would be {input}."
responses_1 = (gen_prompt | instruct_llm).invoke(
    " reasonable for an NVIDIA document chatbot to be able to answer."
    " Vary the context to technology, research, deep learning, language modeling, gaming, etc."
)
# print("Reasonable NVIDIA Responses:", *responses_1, "", sep="\n")
## Positive examples that do not reference NVIDIA directly.
## NOTE: the string completes "...conversations that would be {input}." —
## the original began with " be reasonable...", which rendered as
## "would be be reasonable"; the duplicated "be" is removed here.
responses_2 = (gen_prompt | instruct_llm).invoke(
    " reasonable for a tech document chatbot to be able to answer. Make sure to vary"
    " the context to technology, research, gaming, language modeling, graphics, etc."
)
# print("Reasonable non-NVIDIA Responses:", *responses_2, "", sep="\n")
## Feel free to try your own generations instead.
## Negative examples: off-topic but harmless questions.
responses_3 = (gen_prompt | instruct_llm).invoke(
    "unreasonable for an NVIDIA document chatbot to answer,"
    " as it is irrelevant and will not be useful to answer (though not inherently harmful)."
)
# print("Irrelevant Responses:", *responses_3, "", sep="\n")
## Negative examples: questions that would reflect badly on the brand.
responses_4 = (gen_prompt | instruct_llm).invoke(
    "unreasonable for an NVIDIA document chatbot to answer,"
    " as it will reflect negatively on NVIDIA."
)
# print("Harmful non-NVIDIA", *responses_4, "", sep="\n")
## Classifier training data: "good" (answerable) vs "poor" (should deflect).
good_responses = responses_1 + responses_2
poor_responses = responses_3 + responses_4
## Placeholder — replaced with the trained keras model by embeding() below.
model1 = 0
async def embeding():
    """Embed all generated queries concurrently and train the intent classifier.

    Stores the trained keras model in module-level `model1`, which
    is_good_response() reads at chat time.
    """
    ## Without `global`, the assignment at the bottom bound a function-local
    ## `model1` and the module-level placeholder (0) was never replaced.
    global model1
    good_tasks = [embed(query) for query in good_responses]
    poor_tasks = [embed(query) for query in poor_responses]
    all_tasks = good_tasks + poor_tasks
    embeds = await asyncio.gather(*all_tasks)
    good_embeds = embeds[:len(good_tasks)]
    poor_embeds = embeds[len(good_tasks):]
    ## class0 = poor, class1 = good, so the model's sigmoid output is P(good).
    model1 = train_model_neural_network(poor_embeds, good_embeds)
## BUG FIX: the original called `embeding()` bare, which only creates a
## coroutine object — it never ran. asyncio.run actually executes it.
asyncio.run(embeding())
# model1 = train_model_neural_network(poor_embeds, good_embeds)
def RPrint(preface=""):
    """Debug pass-through runnable: prints `preface` + the value, returns it unchanged."""
    def _show(value):
        print(f"{preface}{value}")
        return value
    return RunnableLambda(_show)
## "Help them out" system message
good_sys_msg = (
"You are an NVIDIA chatbot. Please answer their question while representing NVIDIA."
" Please help them with their question if it is ethical and relevant."
)
## Resist talking about this topic" system message
poor_sys_msg = (
"You are an NVIDIA chatbot. Please answer their question while representing NVIDIA."
" Their question has been analyzed and labeled as 'probably not useful to answer as an NVIDIA Chatbot',"
" so avoid answering if appropriate and explain your reasoning to them. Make your response as short as possible."
)
def is_good_response(query):
    """Score `query` with the trained classifier; higher means more answerable."""
    ## Synchronous, single-query embedding (chat path), unlike the async bulk path.
    embedding = np.array([embedder.embed_query(query)])
    ## Raw model output (1x1 sigmoid score); downstream compares it to 0.5.
    ## NOTE(review): if embeding() has not populated model1 yet, this is the
    ## int 0 and calling it raises — confirm startup order.
    return model1(embedding)
## Full pipeline: classify the query, pick a system message from the score,
## then answer with the chat model.
chat_chain = (
    ## Fan the raw input into a dict: the text itself plus its classifier score.
    { 'input' : (lambda x:x), 'is_good' : is_good_response }
    | RPrint()  ## debug: print the dict before branching
    | RunnableAssign(dict(
        system = RunnableBranch(
            ## Switch statement syntax. First lambda that returns true triggers return of result
            ((lambda d: d['is_good'] < 0.5), RunnableLambda(lambda x: poor_sys_msg)),
            ## ... (more branches can also be specified)
            ## Default branch. Will run if none of the others do
            RunnableLambda(lambda x: good_sys_msg)
        )
    )) | response_prompt | chat_model
)
###############
## Gradio components
def chat_stream(message, history):
    """Stream the chain's reply, yielding the growing transcript on each token."""
    so_far = ""
    for piece in chat_chain.stream(message):
        so_far = so_far + piece
        yield so_far
## Seed the chat window with a bot greeting (None = no paired user turn).
chatbot = gr.Chatbot(value = [[None, "Hello! I'm your NVIDIA chat agent! Let me answer some questions!"]])
demo = gr.ChatInterface(chat_stream, chatbot=chatbot).queue()
try:
    demo.launch(debug=True, share=True, show_api=False)
    demo.close()
except Exception as e:
    ## Tear the server down before surfacing the error.
    demo.close()
    print(e)
    raise e