Spaces:
Runtime error
Runtime error
| import os | |
| import json | |
| import datetime | |
| import firebase_admin | |
| import openai | |
| import replicate | |
| import textwrap | |
| from typing import Any | |
| from langchain import LLMChain | |
| from langchain.chains.question_answering import load_qa_chain | |
| from nemoguardrails.rails import LLMRails, RailsConfig | |
| from langchain.chat_models import ChatOpenAI | |
| from langchain.docstore.document import Document | |
| from firebase_admin import storage | |
| from .csv_embed import get_embed | |
| from .llm.falcon_llm import FalconLLM | |
| from .llm.llms import ( | |
| get_llm, | |
| GPT_4, | |
| FALCON_7B, | |
| get_llm_chain, | |
| MOBILE_PROMPT, | |
| EXTENSION_PROMPT, | |
| ) | |
| from .pinecone_engine import init_pinecone | |
| from .rails_validate import validate_rails | |
| from ..common.brain_exception import BrainException | |
| from ..common.http_response_codes import responses | |
| from ..common.program_type import ProgramType | |
| from ..common.utils import ( | |
| OPENAI_API_KEY, | |
| FIREBASE_STORAGE_ROOT, | |
| DEFAULT_GPT_MODEL, | |
| parseJsonFromCompletion, | |
| PINECONE_INDEX_NAME, | |
| ACTION_FLAG, | |
| COMMAND_SMS_INDEXES, | |
| COMMAND_BROWSER_OPEN, | |
| ) | |
| from .image_embedding import ( | |
| query_image_text, | |
| get_prompt_image_with_message, | |
| ) | |
| from ..model.req_model import ReqModel | |
| from ..model.requests.request_model import BasicReq | |
| from ..service.auto_task_service import AutoTaskService | |
| from ..service.train_service import TrainService | |
| # Give the path to the folder containing the rails | |
| file_path = os.path.dirname(os.path.abspath(__file__)) | |
| config = RailsConfig.from_path(f"{file_path}/guardrails-config") | |
| # set max_chunk_size = 1800 because of adding some string | |
| max_chunk_size = 1800 # recommended max_chunk_size = 2048 | |
| def getChunks(query: str): | |
| return textwrap.wrap( | |
| query, width=max_chunk_size, break_long_words=False, replace_whitespace=False | |
| ) | |
| def llm_rails( | |
| setting: ReqModel, | |
| rails_app: any, | |
| firebase_app: firebase_admin.App, | |
| query: str, | |
| image_search: bool = True, | |
| is_browser: bool = False, | |
| ) -> Any: | |
| # rails validation | |
| rails_resp = rails_app.generate( | |
| messages=[ | |
| { | |
| "role": "user", | |
| "content": query, | |
| } | |
| ] | |
| ) | |
| if not validate_rails(rails_resp): | |
| json_resp = json.loads(rails_resp["content"]) | |
| json_resp["program"] = ProgramType.MESSAGE | |
| return json_resp | |
| # querying | |
| document_id = "" | |
| page_content = "" | |
| if not ACTION_FLAG: | |
| """step 0: convert string to json""" | |
| index = init_pinecone(index_name=PINECONE_INDEX_NAME, setting=setting) | |
| train_service = TrainService(firebase_app=firebase_app, setting=setting) | |
| """step 1: handle with gpt-4""" | |
| query_result = get_embed(data=query, setting=setting) | |
| try: | |
| relatedness_data = index.query( | |
| vector=query_result, | |
| top_k=1, | |
| include_values=False, | |
| namespace=train_service.get_pinecone_index_train_namespace(), | |
| ) | |
| except Exception as ex: | |
| raise BrainException(code=508, message=responses[508]) | |
| if len(relatedness_data["matches"]) == 0: | |
| return str({"program": "message", "content": ""}) | |
| document_id = relatedness_data["matches"][0]["id"] | |
| document = train_service.read_one_document(document_id) | |
| page_content = document["page_content"] | |
| return ask_question( | |
| query=query, | |
| setting=setting, | |
| is_browser=is_browser, | |
| image_search=image_search, | |
| document_id=document_id, | |
| page_content=page_content, | |
| ) | |
| def processLargeText( | |
| setting: ReqModel, | |
| app: any, | |
| chunks: any, | |
| firebase_app: firebase_admin.App, | |
| is_browser: bool = False, | |
| image_search: bool = True, | |
| ): | |
| if len(chunks) == 1: | |
| message = llm_rails( | |
| setting=setting, | |
| rails_app=app, | |
| firebase_app=firebase_app, | |
| query=chunks[0], | |
| image_search=image_search, | |
| is_browser=is_browser, | |
| ) | |
| return message | |
| else: | |
| first_query = "The total length of the content that I want to send you is too large to send in only one piece.\nFor sending you that content, I will follow this rule:\n[START PART 1/10]\nThis is the content of the part 1 out of 10 in total\n[END PART 1/10]\nThen you just answer: 'Received part 1/10'\nAnd when I tell you 'ALL PART SENT', then you can continue processing the data and answering my requests." | |
| app.generate(messages=[{"role": "user", "content": first_query}]) | |
| for index, chunk in enumerate(chunks): | |
| # Process each chunk with ChatGPT | |
| if index + 1 != len(chunks): | |
| chunk_query = ( | |
| "Do not answer yet. This is just another part of the text I want to send you. Just receive and acknowledge as 'Part " | |
| + str(index + 1) | |
| + "/" | |
| + str(len(chunks)) | |
| + "received' and wait for the next part.\n" | |
| + "[START PART " | |
| + str(index + 1) | |
| + "/" | |
| + str(len(chunks)) | |
| + "]\n" | |
| + chunk | |
| + "\n[END PART " | |
| + str(index + 1) | |
| + "/" | |
| + str(len(chunks)) | |
| + "]\n" | |
| + "Remember not answering yet. Just acknowledge you received this part with the message 'Part 1/10 received' and wait for the next part." | |
| ) | |
| llm_rails( | |
| setting=setting, | |
| rails_app=app, | |
| firebase_app=firebase_app, | |
| query=chunk_query, | |
| image_search=image_search, | |
| is_browser=is_browser, | |
| ) | |
| else: | |
| last_query = ( | |
| "[START PART " | |
| + str(index + 1) | |
| + "/" | |
| + str(len(chunks)) | |
| + chunk | |
| + "\n[END PART " | |
| + str(index + 1) | |
| + "/" | |
| + str(len(chunks)) | |
| + "]\n" | |
| + "ALL PART SENT. Now you can continue processing the request." | |
| ) | |
| message = llm_rails( | |
| setting=setting, | |
| rails_app=app, | |
| firebase_app=firebase_app, | |
| query=last_query, | |
| image_search=image_search, | |
| is_browser=is_browser, | |
| ) | |
| return message | |
| # out of for-loop | |
| def getCompletion( | |
| query: str, | |
| setting: ReqModel, | |
| firebase_app: firebase_admin.App, | |
| is_browser: bool = False, | |
| image_search: bool = True, | |
| ): | |
| llm = get_llm(model=DEFAULT_GPT_MODEL, setting=setting).get_llm() | |
| # Break input text into chunks | |
| chunks = getChunks(query) | |
| app = LLMRails(config, llm) | |
| return processLargeText( | |
| setting=setting, | |
| app=app, | |
| chunks=chunks, | |
| image_search=image_search, | |
| firebase_app=firebase_app, | |
| is_browser=is_browser, | |
| ) | |
| def getCompletionOnly( | |
| query: str, | |
| model: str = "gpt-4", | |
| ) -> str: | |
| llm = ChatOpenAI(model_name=model, temperature=0.5, openai_api_key=OPENAI_API_KEY) | |
| chain = load_qa_chain(llm, chain_type="stuff") | |
| chain_data = chain.run(input_documents=[], question=query) | |
| return chain_data | |
| def query_image_ask(image_content, message, setting: ReqModel): | |
| prompt_template = get_prompt_image_with_message(image_content, message) | |
| try: | |
| data = getCompletion(query=prompt_template, image_search=False, setting=setting) | |
| # chain_data = json.loads(data.replace("'", '"')) | |
| # chain_data = json.loads(data) | |
| if data["program"] == "image": | |
| return True | |
| except Exception as e: | |
| return False | |
| return False | |
| def getTextFromImage(filename: str, firebase_app: firebase_admin.App) -> str: | |
| # Create a reference to the image file you want to download | |
| bucket = storage.bucket(app=firebase_app) | |
| blob = bucket.blob(FIREBASE_STORAGE_ROOT.__add__(filename)) | |
| download_url = "" | |
| try: | |
| # Download the image to a local file | |
| download_url = blob.generate_signed_url( | |
| datetime.timedelta(seconds=300), method="GET", version="v4" | |
| ) | |
| output = replicate.run( | |
| "salesforce/blip:2e1dddc8621f72155f24cf2e0adbde548458d3cab9f00c0139eea840d0ac4746", | |
| input={"image": download_url}, | |
| ) | |
| except Exception as e: | |
| output = str("Error happend while analyzing your prompt. Please ask me again :") | |
| return str(output) | |
| """chat with ai | |
| response: | |
| { | |
| 'id': 'chatcmpl-6p9XYPYSTTRi0xEviKjjilqrWU2Ve', | |
| 'object': 'chat.completion', | |
| 'created': 1677649420, | |
| 'model': 'gpt-3.5-turbo', | |
| 'usage': {'prompt_tokens': 56, 'completion_tokens': 31, 'total_tokens': 87}, | |
| 'choices': [ | |
| { | |
| 'message': { | |
| 'role': 'assistant', | |
| 'content': 'The 2020 World Series was played in Arlington, Texas at the Globe Life Field, which was the new home stadium for the Texas Rangers.'}, | |
| 'finish_reason': 'stop', | |
| 'index': 0 | |
| } | |
| ] | |
| } | |
| """ | |
| # Define a content filter function | |
| def filter_guardrails(setting: ReqModel, query: str): | |
| llm = ChatOpenAI( | |
| model_name=DEFAULT_GPT_MODEL, temperature=0, openai_api_key=setting.openai_key | |
| ) | |
| app = LLMRails(config, llm) | |
| # split query with chunks | |
| chunks = getChunks(query) | |
| # get message from guardrails | |
| message = processLargeText(app=app, chunks=chunks, setting=setting) | |
| if ( | |
| message | |
| == "Sorry, I cannot comment on anything which is relevant to the password or pin code." | |
| or message | |
| == "I am an Rising AI assistant which helps answer questions based on a given knowledge base." | |
| ): | |
| return message | |
| else: | |
| return "" | |
| """ | |
| compose json_string for rails input with its arguments | |
| """ | |
| def rails_input_with_args( | |
| setting: ReqModel, | |
| query: str, | |
| image_search: bool, | |
| is_browser: bool, | |
| page_content: str = "", | |
| document_id: str = "", | |
| ) -> str: | |
| # convert json with params for rails. | |
| json_query_with_params = { | |
| "query": query, | |
| "image_search": image_search, | |
| "page_content": page_content, | |
| "document_id": document_id, | |
| "setting": setting.to_json(), | |
| "is_browser": is_browser, | |
| } | |
| return json.dumps(json_query_with_params) | |
| """main method to handle basic query""" | |
| def ask_question( | |
| query: str, | |
| setting: ReqModel, | |
| is_browser: bool, | |
| image_search: bool, | |
| document_id: str = "", | |
| page_content: str = "", | |
| ) -> Any: | |
| """init falcon model""" | |
| falcon_llm = FalconLLM() | |
| autotask_service = AutoTaskService() | |
| docs = [] | |
| if ACTION_FLAG: | |
| # apply the proper prompt for each platform | |
| prompt_template = EXTENSION_PROMPT if is_browser else MOBILE_PROMPT | |
| docs.append(Document(page_content=prompt_template, metadata="")) | |
| # temperature shouldbe 0. | |
| chain_data = get_llm_chain( | |
| model=DEFAULT_GPT_MODEL, setting=setting, temperature=0.0 | |
| ).run(input_documents=docs, question=query) | |
| else: | |
| docs.append(Document(page_content=page_content, metadata="")) | |
| """ 1. calling gpt model to categorize for all message""" | |
| chain_data = get_llm_chain(model=DEFAULT_GPT_MODEL, setting=setting).run( | |
| input_documents=docs, question=query | |
| ) | |
| try: | |
| result = json.loads(chain_data) | |
| # check image query with only its text | |
| if result["program"] == ProgramType.IMAGE: | |
| if image_search: | |
| result["content"] = { | |
| "image_name": query_image_text(result["content"], "", setting) | |
| } | |
| """ 2. check program is message to handle it with falcon llm """ | |
| if result["program"] == ProgramType.MESSAGE: | |
| if is_browser: | |
| result["program"] = ProgramType.BrowserType.ASK_WEBSITE | |
| return result | |
| except ValueError as e: | |
| # Check sms and browser query | |
| if document_id in COMMAND_SMS_INDEXES: | |
| return {"program": ProgramType.SMS, "content": chain_data} | |
| elif document_id in COMMAND_BROWSER_OPEN: | |
| return {"program": ProgramType.BROWSER, "content": "https://google.com"} | |
| if is_browser: | |
| return {"program": ProgramType.BrowserType.ASK_WEBSITE, "content": ""} | |
| return {"program": ProgramType.MESSAGE, "content": chain_data} | |
| def handle_chat_completion(messages: Any, model: str = "gpt-3.5-turbo") -> Any: | |
| openai.api_key = OPENAI_API_KEY | |
| response = openai.ChatCompletion.create( | |
| model=model, | |
| messages=messages, | |
| ) | |
| # Filter the reply using the content filter | |
| # result = filter_guardrails(model, messages[-1]["content"]) | |
| # comment logic issue with guardrails | |
| # if result == "": | |
| # return response | |
| # else: | |
| # response["choices"][0]["message"]["content"] = result | |
| # return response | |
| return response | |