import json import os import ast from pathlib import Path from typing import List import langchain.llms as LLM from app.custom_langchain import ModAPIChain, APIResponse, FlexAPIChain, FlexAPIChainPayload from app.utils import find_value, custom_api_prompt from app.custom_api_prompts import API_REQUEST_PROMPT, \ API_REQUEST_PROMPT2, API_RESPONSE_PROMPT, API_RESPONSE_PROMPT2, PERMISSION_PROMPT PATH = Path(os.path.abspath(os.path.dirname(__file__))) # Open the YAML file api_doc_path = PATH / 'dbr_api_docs' def prepare_api_docs(): """ Add environment variables to the individual API docs. """ updated_api_docs = {} for filename in os.listdir(api_doc_path): with open(os.path.join(api_doc_path, filename)) as f: updated_api_docs[filename] = f.read() updated_api_docs[filename] = updated_api_docs[filename].replace( "{DATABRICKS_HOST}", os.getenv('DATABRICKS_HOST')) return updated_api_docs def get_run(llm: LLM, runID: str, query: str, api_text: str, headers: dict) -> str: """ Get infos from an MLflow run. Args: llm (LLM): Large language model runID (str): Run ID query (str): Query api_text (str): API documentation for a particular endpoint headers (dict): Authorization for the Databricks API Return: (str): the response of the language model """ mlflow_run_get_chain = APIResponse.from_llm_and_api_docs(llm, api_text, headers=headers, verbose=True) run_infos = json.loads(mlflow_run_get_chain.run( f"""Please give me the infos about the run '{runID}'""")) # Please return the f1 score return llm(f"""{query} of the metrics : {find_value(run_infos, 'metrics')}""") def trans_model(llm: LLM, query: str, api_text: str, headers: dict): """ Transition a MLflow model to another stage. Args: llm (LLM): Large language model query (str): Query for the model api_text (str): API documentation for a particular endpoint headers (dict): Authorization for the Databricks API """ mlflow_model_transition_chain = FlexAPIChainPayload.from_llm_and_api_docs( llm, api_text, headers=headers, api_url_prompt=API_REQUEST_PROMPT2, api_response_prompt=API_RESPONSE_PROMPT2, verbose=True ) mlflow_model_transition_chain.run(query) def get_model(llm: LLM, model_query: str, api_text: str, headers: dict) -> str: """ Get all infos about a model and its particular stages Args: llm (LLM): Large language model model_query (str): Query api_text (str): API documentation for a particular endpoint headers (dict): Authorization for the Databricks API Return: (str): response of the language model """ mlflow_model_get_chain = APIResponse.from_llm_and_api_docs(llm, api_text, headers=headers, verbose=True) api_result = mlflow_model_get_chain.run(model_query) return llm( f"""What is the runID and their version of the latest version with stage 'None' \ and stage 'Production' respectively, given the following info: {api_result}""" ) def batch_mod_permission( logger, llm: LLM, updated_api_docs: dict, api_text: str, api_name: str, headers: dict, permission_mod: str, jobs: List[str] ): """ Get or modify permissions for a batch of jobs. Args: logger (Logger): Logger object llm (LLM): Large language model updated_api_docs (dict): Updated API docs api_text (str): API documentation for a particular endpoint api_name (str): API name headers (dict): Authorization for the Databricks API permission_mod (str): Permission modification jobs (list): List of jobs """ get_permission_txt = updated_api_docs['get_permissions.txt'] for job in jobs: logger.info(f"Permission for job: {job}") mod_permissions( logger, llm, permission_mod, job, headers, get_permission_txt, api_text, api_name ) def get_permissions(logger, llm: LLM, api_text: str, headers: dict, jobID: str) -> dict: """ Get the permissions of a particular job. Args: logger (Logger): Logger object llm (LLM): Large language model api_text (str): API documentation for a particular endpoint headers (dict): Authorization for the Databricks API jobID (str): ID of the Databricks job Return: (dict): Access control list """ init_query = f"""Get the jobs permissions for jobID {jobID}""" permission_get_chain = APIResponse.from_llm_and_api_docs(llm, api_text, headers=headers, verbose=True) acc_control_list = {'access_control_list': json.loads(permission_get_chain.run(init_query))['access_control_list']} logger.info(acc_control_list) return acc_control_list def mod_permissions( logger, llm: LLM, permission_mod: str, jobID: str, headers: dict, get_permission_txt: str, api_text: str, api_name: str ): """ Modify permissions for a job. Args: logger (Logger): Logger object llm (LLM): Large language model permission_mod (str): Permission modification jobID (str): ID of the Databricks job headers (dict): Authorization for the Databricks API get_permission_txt (str): API text for getting permissions api_text (str): API documentation for a particular endpoint api_name (str): file name of the API documentation """ acc_control_list = get_permissions(logger, llm, get_permission_txt, headers, jobID) # Update the permission, if the query demands it if api_name == 'update_permissions.txt': permission_prompt= PERMISSION_PROMPT.format( acc_control_list=acc_control_list, permission_mod=permission_mod) output_acc_list = llm(permission_prompt) patch_payload = ast.literal_eval(output_acc_list.rstrip().lstrip()) # # Reformat the general permission response into the form for the patch payload for idx, _ in enumerate(patch_payload['access_control_list']): patch_payload['access_control_list'][idx]['permission_level'] = patch_payload['access_control_list'][idx]['all_permissions'][0]['permission_level'] patch_payload['access_control_list'][idx].pop('all_permissions') permission_update_chain = FlexAPIChain.from_llm_and_api_docs( llm, api_text, headers=headers, verbose=True, api_url_prompt=API_REQUEST_PROMPT, api_response_prompt=API_RESPONSE_PROMPT ) # use updated API Chain which can do post, put and patch also.... permission_update_chain.body = patch_payload logger.info(f"Payload for permission update: {patch_payload}") permission_update_chain(f"""Update the jobs permissions of jobID {jobID}""") def get_job_infos( llm: LLM, query: str, response_query: str, api_text: str, headers: dict ) -> str: """ Get particular infos about the list of existing jobs. Args: llm (LLM): Large language model query (str): Query to get the list of jobs response_query (str): Query to get the response api_text (str): API documentation for a particular endpoint headers (dict): Authorization for the Databricks API Return: (str): response from the large language model """ job_chain = ModAPIChain.from_llm_and_api_docs( llm, api_text, headers=headers, verbose=True ) custom_resp = f"This response contains a list of databricks jobs. {response_query}" jobs_result = custom_api_prompt(llm, job_chain, query, custom_resp) return jobs_result