| import os |
| import json |
| import datetime |
|
|
| import firebase_admin |
| import openai |
| import replicate |
| import textwrap |
|
|
| from typing import Any |
|
|
| from langchain import LLMChain |
| from langchain.chains.question_answering import load_qa_chain |
| from nemoguardrails.rails import LLMRails, RailsConfig |
|
|
| from langchain.chat_models import ChatOpenAI |
| from langchain.docstore.document import Document |
| from firebase_admin import storage |
|
|
| from .csv_embed import get_embed |
| from .llm.falcon_llm import FalconLLM |
| from .llm.llms import ( |
| get_llm, |
| GPT_4, |
| FALCON_7B, |
| get_llm_chain, |
| MOBILE_PROMPT, |
| EXTENSION_PROMPT, |
| ) |
| from .pinecone_engine import init_pinecone |
| from .rails_validate import validate_rails |
| from ..common.brain_exception import BrainException |
| from ..common.http_response_codes import responses |
| from ..common.program_type import ProgramType |
| from ..common.utils import ( |
| OPENAI_API_KEY, |
| FIREBASE_STORAGE_ROOT, |
| DEFAULT_GPT_MODEL, |
| parseJsonFromCompletion, |
| PINECONE_INDEX_NAME, |
| ACTION_FLAG, |
| COMMAND_SMS_INDEXES, |
| COMMAND_BROWSER_OPEN, |
| ) |
| from .image_embedding import ( |
| query_image_text, |
| get_prompt_image_with_message, |
| ) |
| from ..model.req_model import ReqModel |
| from ..model.requests.request_model import BasicReq |
| from ..service.auto_task_service import AutoTaskService |
| from ..service.train_service import TrainService |
|
|
| |
| file_path = os.path.dirname(os.path.abspath(__file__)) |
| config = RailsConfig.from_path(f"{file_path}/guardrails-config") |
|
|
| |
| max_chunk_size = 1800 |
|
|
|
|
| def getChunks(query: str): |
| return textwrap.wrap( |
| query, width=max_chunk_size, break_long_words=False, replace_whitespace=False |
| ) |
|
|
|
|
| def llm_rails( |
| setting: ReqModel, |
| rails_app: any, |
| firebase_app: firebase_admin.App, |
| query: str, |
| image_search: bool = True, |
| is_browser: bool = False, |
| ) -> Any: |
| |
| rails_resp = rails_app.generate( |
| messages=[ |
| { |
| "role": "user", |
| "content": query, |
| } |
| ] |
| ) |
| if not validate_rails(rails_resp): |
| json_resp = json.loads(rails_resp["content"]) |
| json_resp["program"] = ProgramType.MESSAGE |
| return json_resp |
|
|
| |
| document_id = "" |
| page_content = "" |
| if not ACTION_FLAG: |
| """step 0: convert string to json""" |
| index = init_pinecone(index_name=PINECONE_INDEX_NAME, setting=setting) |
| train_service = TrainService(firebase_app=firebase_app, setting=setting) |
|
|
| """step 1: handle with gpt-4""" |
|
|
| query_result = get_embed(data=query, setting=setting) |
| try: |
| relatedness_data = index.query( |
| vector=query_result, |
| top_k=1, |
| include_values=False, |
| namespace=train_service.get_pinecone_index_train_namespace(), |
| ) |
| except Exception as ex: |
| raise BrainException(code=508, message=responses[508]) |
| if len(relatedness_data["matches"]) == 0: |
| return str({"program": "message", "content": ""}) |
| document_id = relatedness_data["matches"][0]["id"] |
|
|
| document = train_service.read_one_document(document_id) |
| page_content = document["page_content"] |
|
|
| return ask_question( |
| query=query, |
| setting=setting, |
| is_browser=is_browser, |
| image_search=image_search, |
| document_id=document_id, |
| page_content=page_content, |
| ) |
|
|
|
|
| def processLargeText( |
| setting: ReqModel, |
| app: any, |
| chunks: any, |
| firebase_app: firebase_admin.App, |
| is_browser: bool = False, |
| image_search: bool = True, |
| ): |
| if len(chunks) == 1: |
| message = llm_rails( |
| setting=setting, |
| rails_app=app, |
| firebase_app=firebase_app, |
| query=chunks[0], |
| image_search=image_search, |
| is_browser=is_browser, |
| ) |
| return message |
| else: |
| first_query = "The total length of the content that I want to send you is too large to send in only one piece.\nFor sending you that content, I will follow this rule:\n[START PART 1/10]\nThis is the content of the part 1 out of 10 in total\n[END PART 1/10]\nThen you just answer: 'Received part 1/10'\nAnd when I tell you 'ALL PART SENT', then you can continue processing the data and answering my requests." |
| app.generate(messages=[{"role": "user", "content": first_query}]) |
| for index, chunk in enumerate(chunks): |
| |
| if index + 1 != len(chunks): |
| chunk_query = ( |
| "Do not answer yet. This is just another part of the text I want to send you. Just receive and acknowledge as 'Part " |
| + str(index + 1) |
| + "/" |
| + str(len(chunks)) |
| + "received' and wait for the next part.\n" |
| + "[START PART " |
| + str(index + 1) |
| + "/" |
| + str(len(chunks)) |
| + "]\n" |
| + chunk |
| + "\n[END PART " |
| + str(index + 1) |
| + "/" |
| + str(len(chunks)) |
| + "]\n" |
| + "Remember not answering yet. Just acknowledge you received this part with the message 'Part 1/10 received' and wait for the next part." |
| ) |
| llm_rails( |
| setting=setting, |
| rails_app=app, |
| firebase_app=firebase_app, |
| query=chunk_query, |
| image_search=image_search, |
| is_browser=is_browser, |
| ) |
| else: |
| last_query = ( |
| "[START PART " |
| + str(index + 1) |
| + "/" |
| + str(len(chunks)) |
| + chunk |
| + "\n[END PART " |
| + str(index + 1) |
| + "/" |
| + str(len(chunks)) |
| + "]\n" |
| + "ALL PART SENT. Now you can continue processing the request." |
| ) |
| message = llm_rails( |
| setting=setting, |
| rails_app=app, |
| firebase_app=firebase_app, |
| query=last_query, |
| image_search=image_search, |
| is_browser=is_browser, |
| ) |
| return message |
| |
|
|
|
|
| def getCompletion( |
| query: str, |
| setting: ReqModel, |
| firebase_app: firebase_admin.App, |
| is_browser: bool = False, |
| image_search: bool = True, |
| ): |
| llm = get_llm(model=DEFAULT_GPT_MODEL, setting=setting).get_llm() |
|
|
| |
| chunks = getChunks(query) |
|
|
| app = LLMRails(config, llm) |
|
|
| return processLargeText( |
| setting=setting, |
| app=app, |
| chunks=chunks, |
| image_search=image_search, |
| firebase_app=firebase_app, |
| is_browser=is_browser, |
| ) |
|
|
|
|
| def getCompletionOnly( |
| query: str, |
| model: str = "gpt-4", |
| ) -> str: |
| llm = ChatOpenAI(model_name=model, temperature=0.5, openai_api_key=OPENAI_API_KEY) |
| chain = load_qa_chain(llm, chain_type="stuff") |
| chain_data = chain.run(input_documents=[], question=query) |
| return chain_data |
|
|
|
|
| def query_image_ask(image_content, message, setting: ReqModel): |
| prompt_template = get_prompt_image_with_message(image_content, message) |
| try: |
| data = getCompletion(query=prompt_template, image_search=False, setting=setting) |
| |
| |
| if data["program"] == "image": |
| return True |
| except Exception as e: |
| return False |
| return False |
|
|
|
|
| def getTextFromImage(filename: str, firebase_app: firebase_admin.App) -> str: |
| |
| bucket = storage.bucket(app=firebase_app) |
| blob = bucket.blob(FIREBASE_STORAGE_ROOT.__add__(filename)) |
| download_url = "" |
|
|
| try: |
| |
| download_url = blob.generate_signed_url( |
| datetime.timedelta(seconds=300), method="GET", version="v4" |
| ) |
|
|
| output = replicate.run( |
| "salesforce/blip:2e1dddc8621f72155f24cf2e0adbde548458d3cab9f00c0139eea840d0ac4746", |
| input={"image": download_url}, |
| ) |
|
|
| except Exception as e: |
| output = str("Error happend while analyzing your prompt. Please ask me again :") |
|
|
| return str(output) |
|
|
|
|
| """chat with ai |
| response: |
| { |
| 'id': 'chatcmpl-6p9XYPYSTTRi0xEviKjjilqrWU2Ve', |
| 'object': 'chat.completion', |
| 'created': 1677649420, |
| 'model': 'gpt-3.5-turbo', |
| 'usage': {'prompt_tokens': 56, 'completion_tokens': 31, 'total_tokens': 87}, |
| 'choices': [ |
| { |
| 'message': { |
| 'role': 'assistant', |
| 'content': 'The 2020 World Series was played in Arlington, Texas at the Globe Life Field, which was the new home stadium for the Texas Rangers.'}, |
| 'finish_reason': 'stop', |
| 'index': 0 |
| } |
| ] |
| } |
| """ |
|
|
|
|
| |
| def filter_guardrails(setting: ReqModel, query: str): |
| llm = ChatOpenAI( |
| model_name=DEFAULT_GPT_MODEL, temperature=0, openai_api_key=setting.openai_key |
| ) |
| app = LLMRails(config, llm) |
|
|
| |
| chunks = getChunks(query) |
|
|
| |
| message = processLargeText(app=app, chunks=chunks, setting=setting) |
|
|
| if ( |
| message |
| == "Sorry, I cannot comment on anything which is relevant to the password or pin code." |
| or message |
| == "I am an Rising AI assistant which helps answer questions based on a given knowledge base." |
| ): |
| return message |
| else: |
| return "" |
|
|
|
|
| """ |
| compose json_string for rails input with its arguments |
| """ |
|
|
|
|
| def rails_input_with_args( |
| setting: ReqModel, |
| query: str, |
| image_search: bool, |
| is_browser: bool, |
| page_content: str = "", |
| document_id: str = "", |
| ) -> str: |
| |
| json_query_with_params = { |
| "query": query, |
| "image_search": image_search, |
| "page_content": page_content, |
| "document_id": document_id, |
| "setting": setting.to_json(), |
| "is_browser": is_browser, |
| } |
| return json.dumps(json_query_with_params) |
|
|
|
|
| """main method to handle basic query""" |
|
|
|
|
| def ask_question( |
| query: str, |
| setting: ReqModel, |
| is_browser: bool, |
| image_search: bool, |
| document_id: str = "", |
| page_content: str = "", |
| ) -> Any: |
| """init falcon model""" |
| falcon_llm = FalconLLM() |
| autotask_service = AutoTaskService() |
| docs = [] |
|
|
| if ACTION_FLAG: |
| |
| prompt_template = EXTENSION_PROMPT if is_browser else MOBILE_PROMPT |
| docs.append(Document(page_content=prompt_template, metadata="")) |
| |
| chain_data = get_llm_chain( |
| model=DEFAULT_GPT_MODEL, setting=setting, temperature=0.0 |
| ).run(input_documents=docs, question=query) |
| else: |
| docs.append(Document(page_content=page_content, metadata="")) |
| """ 1. calling gpt model to categorize for all message""" |
| chain_data = get_llm_chain(model=DEFAULT_GPT_MODEL, setting=setting).run( |
| input_documents=docs, question=query |
| ) |
| try: |
| result = json.loads(chain_data) |
| |
| if result["program"] == ProgramType.IMAGE: |
| if image_search: |
| result["content"] = { |
| "image_name": query_image_text(result["content"], "", setting) |
| } |
| """ 2. check program is message to handle it with falcon llm """ |
| if result["program"] == ProgramType.MESSAGE: |
| if is_browser: |
| result["program"] = ProgramType.BrowserType.ASK_WEBSITE |
| return result |
| except ValueError as e: |
| |
| if document_id in COMMAND_SMS_INDEXES: |
| return {"program": ProgramType.SMS, "content": chain_data} |
| elif document_id in COMMAND_BROWSER_OPEN: |
| return {"program": ProgramType.BROWSER, "content": "https://google.com"} |
|
|
| if is_browser: |
| return {"program": ProgramType.BrowserType.ASK_WEBSITE, "content": ""} |
| return {"program": ProgramType.MESSAGE, "content": chain_data} |
|
|
|
|
| def handle_chat_completion(messages: Any, model: str = "gpt-3.5-turbo") -> Any: |
| openai.api_key = OPENAI_API_KEY |
|
|
| response = openai.ChatCompletion.create( |
| model=model, |
| messages=messages, |
| ) |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| return response |
|
|