| | import os |
| | import boto3 |
| | import gradio as gr |
| | import math |
| | import json |
| | import time |
| | import re |
| | from botocore.client import Config |
| |
|
# Runtime configuration pulled from the environment; any of these may be
# None if the variable is unset (the Bedrock calls below will then fail
# at request time, not at import time).
kb_id = os.getenv('KNOWLEDGE_BASE_ID')  # Bedrock knowledge base to query
aws_access_key = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_key = os.getenv('AWS_SECRET_ACCESS_KEY')
region = os.getenv('AWS_REGION')
initial_message = os.getenv('INITIAL_MESSAGE')  # greeting seeded into a fresh chat

# Canned follow-up questions shown before the first user message.
initial_suggestions = [
    "How to edit RFID crew badge?",
    "What types of RFID tags exist?",
    "Is it possible to delete a facility?"
]

# Amazon Titan text model used for answer generation.
amazon_model_id = "amazon.titan-text-premier-v1:0"

# Long timeouts and zero retries: retrieval can be slow, and one clear
# failure is preferred over repeated slow attempts.
# NOTE(review): this Config is applied only to the agent-runtime client
# below, not to bedrock_client — confirm that is intentional.
bedrock_config = Config(connect_timeout=120, read_timeout=120, retries={'max_attempts': 0})
bedrock_client = boto3.client(
    'bedrock-runtime',
    region_name=region,
    aws_access_key_id=aws_access_key,
    aws_secret_access_key=aws_secret_key
)
bedrock_agent_client = boto3.client(
    "bedrock-agent-runtime",
    config=bedrock_config,
    region_name=region,
    aws_access_key_id=aws_access_key,
    aws_secret_access_key=aws_secret_key
)
| |
|
def retrieve(query, numberOfResults=4):
    """Run a semantic search against the knowledge base and time the call.

    Args:
        query: free-text search query.
        numberOfResults: maximum number of chunks to retrieve.

    Returns:
        Tuple of (list of retrieval results, elapsed wall-clock seconds).
    """
    started = time.time()
    response = bedrock_agent_client.retrieve(
        retrievalQuery={'text': query},
        knowledgeBaseId=kb_id,
        retrievalConfiguration={
            'vectorSearchConfiguration': {
                'numberOfResults': numberOfResults,
                'overrideSearchType': "SEMANTIC",
            }
        },
    )
    elapsed = time.time() - started
    return response['retrievalResults'], elapsed
| |
|
def get_contexts(retrievalResults):
    """Concatenate the text of each retrieval result, one per line.

    Args:
        retrievalResults: iterable of Bedrock retrieval results, each
            carrying a ['content']['text'] field.

    Returns:
        Single string with every result's text followed by a newline;
        "" for an empty input.
    """
    # str.join replaces the original quadratic `+=` accumulation.
    return "".join(result['content']['text'] + '\n' for result in retrievalResults)
| |
|
def clean_text(text):
    """Truncate *text* at the 'For more information...' links section, if any."""
    marker = "For more information, follow the links provided:"
    head, _, _ = text.partition(marker)
    return head
| |
|
def parse_suggestions(response):
    """Extract up to three follow-up suggestions from a model response.

    Finds <SG>...</SG> tags, blanks the literal "Suggestion" placeholder
    that the prompt template uses, and always returns exactly three
    entries (padded with "") so callers may safely index [0], [1], [2].

    Args:
        response: raw model output text.

    Returns:
        List of exactly three suggestion strings (possibly empty).
    """
    found = re.findall(r'<SG>(.*?)</SG>', response)
    cleaned = ["" if suggestion == "Suggestion" else suggestion for suggestion in found]
    # Pad/truncate to 3: the model may emit fewer tags (or none), which
    # previously caused an IndexError in respond().
    return (cleaned + ["", "", ""])[:3]
| |
|
def get_answer(query, history, temperature, top_p, max_token_count):
    """Answer *query* via RAG: retrieve context, prompt Titan, parse output.

    Args:
        query: the user's question.
        history: pre-formatted conversation history (see format_chat_history).
        temperature: sampling temperature for generation.
        top_p: nucleus-sampling parameter.
        max_token_count: generation token budget.

    Returns:
        Tuple of (answer text with optional article links appended,
        prompt-and-timing debug text, list of follow-up suggestions).
    """
    contexts = ""
    article_urls_text = ""
    unique_article_ids = []
    # Rough tokens->words heuristic so the prompt's word limit tracks the
    # configured token budget.
    max_words = math.floor(max_token_count * 0.75)

    retrievalResults, retrieve_execution_time = retrieve(query)
    highest_score = retrievalResults[0]['score'] if retrievalResults else 0

    # Only ground the answer in retrieved context when the best match is
    # reasonably relevant (0.45 threshold, presumably tuned empirically).
    if highest_score > 0.45:
        contexts = get_contexts(retrievalResults)

        # Collect up to three distinct article ids for the "read more"
        # links. Skip results whose metadata lacks an article_id: the
        # original appended None, producing a broken ".../articles/None" URL.
        for result in retrievalResults:
            article_id = result['metadata'].get('article_id')
            if article_id and article_id not in unique_article_ids:
                unique_article_ids.append(article_id)
                if len(unique_article_ids) == 3:
                    break

    PROMPT_TEMPLATE = f"""
System: You are an intelligent assistant helping users understand and navigate website functionalities.
Your goal is to provide clear, accurate, and contextually relevant answers based on the information provided.
Use the information enclosed in the <context> tags and refer to the conversation history in the <history> tags to answer the user's question in the <question> tags.
If you don't know the answer, just say that you don't know, don't try to make up an answer.

Your response must:
- Be in the same language that used in question.
- Be fully formed and grammatically correct without cutting off any sentences.
- Complete a logical thought or sentence before stopping, ensuring the response doesn't end mid-sentence.
- Be clear, easy to understand, and succinct, not exceeding {max_words} words.
- Refer specifically to website features or actions when relevant to the user's question.
- Avoid providing URL links or external references.
- Use a visually appealing and easy-to-read format. Structure information in short, clear paragraphs and, where applicable, use bullet points or numbered lists.

<history>
{history}
</history>
<context>
{contexts}
</context>
<question>
{query}
</question>

Provide a detailed, concise response that fully answers the user's question. Ensure your response is organized and visually readable.

If necessary, reduce the amount of detail provided to keep the response within the word limit but still complete.

Additionally, only if there is sufficient remaining token capacity, provide 2 or 3 related questions that the user might want to ask next based on the topic.
Format these suggested questions as follows, leaving the <SG> tags empty if no suggestions are generated:
<SGs><SG>Suggestion</SG><SG>Suggestion</SG><SG>Suggestion</SG></SGs>

Ensure these suggested questions are brief, relevant, and encourage further exploration on the topic.
Assistant:
"""

    body = json.dumps({
        "inputText": PROMPT_TEMPLATE,
        "textGenerationConfig": {
            "maxTokenCount": max_token_count,
            "temperature": temperature,
            "topP": top_p
        }
    })

    kwargs = {
        "modelId": amazon_model_id,
        "contentType": "application/json",
        "accept": "*/*",
        "body": body
    }

    start_time = time.time()
    response = bedrock_client.invoke_model(**kwargs)
    invoke_model_time = time.time() - start_time

    response_body = json.loads(response.get('body').read())
    response_text = response_body['results'][0]['outputText']

    suggestions = parse_suggestions(response_text)

    response_json = {
        # Everything before the <SGs> block is the user-facing answer.
        "response_text": response_text.split('<SGs>')[0].strip(),
        "suggestions": suggestions,
        "article_ids": unique_article_ids
    }

    if response_json["article_ids"]:
        article_urls_text = "\n\nFor more information, follow the links provided:\n" + "\n".join(
            f"— https://knowledge.operativeiq.com/articles/{article_id}" for article_id in unique_article_ids)

    prompt_and_time = f"""
Prompt:
{PROMPT_TEMPLATE}
Retrieve execution time: {retrieve_execution_time} seconds
Invoke model execution time: {invoke_model_time} seconds
"""
    return response_json["response_text"] + article_urls_text, prompt_and_time, response_json["suggestions"]
| |
|
def format_chat_history(chat_history):
    """Render chat turns as "User: ... / Assistant: ..." prompt lines.

    Args:
        chat_history: iterable of (user_message, bot_message) pairs; either
            element may be None (the seeded first turn is [None, greeting]).

    Returns:
        One string with a "User:"/"Assistant:" line pair per turn.
    """
    lines = []
    for user_message, bot_message in chat_history:
        # Guard None: previously clean_text(None) raised TypeError and a
        # None user message was interpolated as the literal string "None".
        cleaned_message = clean_text(bot_message) if bot_message else ""
        lines.append(f"User: {user_message or ''}\nAssistant: {cleaned_message}\n")
    # join avoids the quadratic repeated-concatenation of the original.
    return "".join(lines)
| |
|
def respond(message, chat_history, temperature=0.9, top_p=0.6, max_token_count=512):
    """Gradio handler: answer *message* and pseudo-stream it into the chat.

    Yields tuples matching the UI outputs wiring: (textbox value, chat
    history, prompt/timing text, three suggestion-button updates).
    """
    # Only the last 4 turns are sent to the model, bounding prompt size.
    formatted_history = format_chat_history(chat_history[-4:])
    chat_history.append([message, ""])

    stream, prompt_and_time, suggestions = get_answer(
        message, formatted_history, temperature, top_p, max_token_count)

    # Pad to exactly three entries: the model may produce fewer suggestions,
    # and unguarded suggestions[0..2] indexing raised IndexError. Empty
    # suggestions hide their button.
    suggestions = (list(suggestions) + ["", "", ""])[:3]
    suggestion_updates = [
        gr.update(visible=bool(text), value=text) for text in suggestions
    ]

    # get_answer returns the full answer string; iterating it character by
    # character fakes token streaming in the UI.
    for idx, text_token in enumerate(stream):
        if idx == 0 and text_token.startswith(" "):
            text_token = text_token[1:]

        chat_history[-1][1] += text_token
        yield (
            "", chat_history, prompt_and_time,
            suggestion_updates[0], suggestion_updates[1], suggestion_updates[2]
        )
| | |
def clear_chat_history():
    """Reset the UI: empty textbox, seeded chat, and default suggestions."""
    suggestion_updates = tuple(
        gr.update(visible=True, value=text) for text in initial_suggestions
    )
    return ('', [[None, initial_message]], '') + suggestion_updates
| |
|
def main():
    """Build and launch the Gradio chat UI and wire up all event handlers."""
    with gr.Blocks(css="""
    .suggestion-button {
        font-size: 14.5px;
    }
    """) as demo:
        chatbot = gr.Chatbot([[None, initial_message]], height=600)

        with gr.Row():
            # One button per canned suggestion; all three share identical
            # wiring, done once in the loop below instead of three copies.
            suggestion_buttons = [
                gr.Button(text, elem_classes="suggestion-button", visible=True)
                for text in initial_suggestions
            ]
        suggestion1, suggestion2, suggestion3 = suggestion_buttons

        msg = gr.Textbox(label="Question")

        with gr.Accordion(label="Advanced options", open=False):
            temperature = gr.Slider(label="Temperature", minimum=0.1, maximum=1, value=0.6, step=0.1)
            top_p = gr.Slider(label="Top P", minimum=0.1, maximum=1, value=0.5, step=0.1)
            max_token_count = gr.Slider(label="Max token count", minimum=1, maximum=1024, value=400, step=10)
            prompt_and_time = gr.Textbox(label="Prompt and Time", interactive=False)

        btn = gr.Button("Submit")
        clear = gr.Button("Clear history")

        inputs = [
            msg, chatbot, temperature, top_p, max_token_count
        ]
        outputs = [
            msg, chatbot, prompt_and_time,
            suggestion1, suggestion2, suggestion3
        ]

        btn.click(respond, inputs=inputs, outputs=outputs)

        # Clicking a suggestion copies its label into the textbox, then runs
        # respond as if the user had submitted it.
        for suggestion_button in suggestion_buttons:
            suggestion_button.click(
                lambda s: s, inputs=suggestion_button, outputs=msg
            ).then(
                respond,
                inputs=inputs,
                outputs=outputs
            )

        msg.submit(respond, inputs=inputs, outputs=outputs)

        clear.click(clear_chat_history, outputs=[
            msg, chatbot, prompt_and_time,
            suggestion1, suggestion2, suggestion3
        ])

    demo.queue().launch()
| |
|
| | if __name__ == "__main__": |
| | main() |