Spaces:
Sleeping
Sleeping
charSLee013
feat: complete Hugging Face Spaces deployment with production-ready CognitiveKernel-Launchpad
1ea26af
| # | |
| # utils for our file-agent | |
| import re | |
| import io | |
| import os | |
| import copy | |
| import requests | |
| import base64 | |
| try: | |
| import pdf2image | |
| _HAS_PDF2IMAGE = True | |
| except Exception: | |
| _HAS_PDF2IMAGE = False | |
| pdf2image = None | |
| import base64 | |
| import math | |
| import ast | |
| from ..agents.utils import KwargsInitializable, rprint, zwarn, zlog | |
| from .mdconvert import MarkdownConverter | |
| import markdownify | |
| from ..ck_web.utils import MyMarkdownify | |
| # -- | |
| # file state | |
class FileState:
    """Mutable record of what the file agent has open and what it last did.

    All fields are ordinary instance attributes. Keyword arguments given to
    the constructor (or to :meth:`update`) overwrite the defaults and must
    name an attribute that already exists.
    """

    def __init__(self, **kwargs):
        # Built inside __init__ so every instance gets its own fresh
        # dict/list objects. Insertion order is the attribute order that
        # to_dict() and repr() expose.
        defaults = {
            # current file
            "current_file_name": None,
            "multimodal": False,  # whether to get the multimodal content of this state
            # keys: file names, values: True/False, whether the file is loaded
            "loaded_files": {},
            # per-file string describing number of pages and tokens of each page
            "file_meta_data": {},
            "current_page_id_list": [],
            # content produced by the last read
            "textual_content": "",
            "visual_content": [],
            "image_suffix": [],
            # step info
            "curr_step": 0,  # step to the root
            "total_actual_step": 0,  # [no-rev] total actual steps including reverting (can serve as ID)
            "num_revert_state": 0,  # [no-rev] number of state reversion
            # (last) action information
            "action_string": "",
            "action": None,
            "error_message": "",
            "observation": "",
        }
        for name, value in defaults.items():
            setattr(self, name, value)
        self.update(**kwargs)

    def update(self, **kwargs):
        """Overwrite existing attributes; every key is checked before any is written."""
        fields = self.__dict__
        for name, value in kwargs.items():
            assert (name in fields), f"Attribute not found for {name} <- {value}"
        fields.update(kwargs)

    def to_dict(self):
        """Return a shallow copy of the attribute dict (values are shared, not cloned)."""
        return dict(self.__dict__)

    def copy(self):
        """Return a new FileState built from :meth:`to_dict` (a shallow copy)."""
        snapshot = self.to_dict()
        return FileState(**snapshot)

    def __repr__(self):
        return f"FileState({self.__dict__})"
| # an opened file environment | |
| class FileEnv(KwargsInitializable): | |
def __init__(self, starting=True, starting_file_path_dict=None, **kwargs):
    """Create the environment and, unless ``starting`` is false, start a session.

    :param starting: when truthy, ``start(starting_file_path_dict)`` is called
        at the end of construction.
    :param starting_file_path_dict: files to register at start-up; they are
        stored in the state rather than on the environment itself.
    :param kwargs: forwarded to the base-class initializer.
    """
    # Limits applied when the agent reads a file: the token budget of one
    # read and the number of screenshots returned by one read.
    # NOTE(review): per the original comment these defaults are overwritten
    # by same-named entries in kwargs -- confirm against KwargsInitializable.
    self.max_file_read_tokens = 2000
    self.max_file_screenshots = 2
    # Per-file caches, keyed by file name and filled by the load_file action.
    self.file_text_by_page = {}
    self.file_screenshot_by_page = {}
    self.file_token_num_by_page = {}
    self.file_image_suffix_by_page = {}
    self.md_converter = MarkdownConverter()
    super().__init__(**kwargs)
    # No state exists until start() creates one.
    self.state: FileState = None
    if starting:
        self.start(starting_file_path_dict)  # start at the beginning
| # -- | |
def read_file_by_page_text(self, file_path: str):
    """Convert ``file_path`` with the markdown converter and split it into pages.

    :return: list of strings obtained by splitting the converted text on the
        form-feed character (``'\\x0c'``).
    """
    converted = self.md_converter.convert(file_path)
    page_break = '\x0c'
    return converted.text_content.split(page_break)
def find_file_name(self, file_name):
    """Resolve a model-supplied file name to a key of ``state.loaded_files``.

    Resolution order:
      1. exact match with a registered name;
      2. a registered name with the same basename (first one found);
      3. a path that exists on disk, which is then registered via
         ``add_files_to_load`` and returned unchanged.

    :param file_name: name or path as written by the model.
    :return: the name under which the file is tracked.
    :raises FileNotFoundError: if nothing matches, or if ``file_name`` is
        empty or not a string (e.g. ``None`` when action parsing found no
        file argument).
    """
    # Reject unusable names up front so callers get the documented error
    # instead of a TypeError from os.path.basename(None).
    if not isinstance(file_name, str) or not file_name:
        raise FileNotFoundError(f"FileNotFoundError for {file_name}.")
    known_files = self.state.loaded_files
    if file_name in known_files:  # directly matching
        return file_name
    wanted = os.path.basename(file_name)
    for known_path in known_files:  # allow name matching
        if os.path.basename(known_path) == wanted:
            return known_path
    if os.path.exists(file_name):
        self.add_files_to_load([file_name])  # add it!
        return file_name
    raise FileNotFoundError(f"FileNotFoundError for {file_name}.")
def read_file_by_page_screenshot(file_path: str):
    """Render a PDF or spreadsheet to base64-encoded PNG images, one per page.

    This function takes no ``self``; elsewhere in this class it is invoked
    through the class (``FileEnv.read_file_by_page_screenshot(path)``).

    * ``.pdf`` files are rasterised directly with pdf2image.
    * ``.xlsx`` / ``.xls`` / ``.csv`` files are first converted to PDF with
      headless LibreOffice (``soffice``) and then rasterised.
    * Any other extension yields an empty list.

    The intermediate PDF of a spreadsheet is written to a temporary directory
    so that an existing ``<name>.pdf`` next to the input is never overwritten
    and no stray file is left behind.

    :param file_path: path of the file to render.
    :return: list of base64 strings (UTF-8 decoded), possibly empty. Missing
        tools and conversion failures are logged with ``zwarn`` and produce
        an empty list rather than an exception.
    """
    import subprocess
    import tempfile

    def _encode_pages(images):
        # Serialise every rendered page to PNG in memory and base64-encode it.
        encoded = []
        for img in images:
            buffer = io.BytesIO()
            img.save(buffer, format="PNG")
            encoded.append(base64.b64encode(buffer.getvalue()).decode('utf-8'))
        return encoded

    screenshots_b64 = []
    suffix = os.path.splitext(file_path)[1].lower()
    if suffix == ".pdf":
        images = []
        if _HAS_PDF2IMAGE:
            try:
                images = pdf2image.convert_from_path(file_path)
            except Exception as e:
                zwarn(f"pdf2image convert_from_path failed: {e}")
        else:
            zwarn("pdf2image not available; skipping PDF screenshots")
        screenshots_b64.extend(_encode_pages(images))
    elif suffix in (".xlsx", ".xls", ".csv"):
        if not _HAS_PDF2IMAGE:
            # Without pdf2image the converted PDF could not be rasterised,
            # so do not run LibreOffice at all.
            zwarn("pdf2image not available; skipping Excel/CSV screenshots")
            return screenshots_b64
        try:
            with tempfile.TemporaryDirectory() as out_dir:
                subprocess.run([
                    "soffice", "--headless", "--convert-to", "pdf", "--outdir",
                    out_dir, file_path
                ], check=True)
                # LibreOffice names the output after the input's base name.
                stem = os.path.splitext(os.path.basename(file_path))[0]
                pdf_file = os.path.join(out_dir, stem + ".pdf")
                images = []
                try:
                    images = pdf2image.convert_from_path(pdf_file)
                except Exception as e:
                    zwarn(f"pdf2image convert_from_path failed for {pdf_file}: {e}")
                # Encode before the temporary directory is removed.
                screenshots_b64.extend(_encode_pages(images))
        except Exception as e:
            zwarn(f"LibreOffice ('soffice') not available or conversion failed: {e}")
    return screenshots_b64
def start(self, file_path_dict=None):
    """Begin a session by creating a fresh state for the given files.

    :param file_path_dict: files to register, forwarded unchanged to
        ``init_state``; may be ``None``.
    """
    # Nothing is read from disk here: page text and screenshots are produced
    # later, when the load_file action runs for a file.
    self.init_state(file_path_dict)
def stop(self):
    """End the current session, if any, and release its cached content.

    Safe to call repeatedly, and safe to call on an instance whose
    construction failed before ``state`` was ever assigned (this happens when
    ``__del__`` runs after an exception in ``__init__``).
    """
    if getattr(self, "state", None) is None:
        return
    try:
        self.end_state()
    finally:
        # Clear the state even if cleanup raised, so a later stop() does not
        # attempt the same cleanup again.
        self.state = None
def __del__(self):
    """Best-effort cleanup when the environment is garbage-collected."""
    try:
        self.stop()
    except Exception:
        # A finalizer can run on a half-constructed object or while the
        # interpreter is shutting down; a cleanup failure at that point has
        # no caller that could handle it, so it is deliberately ignored.
        pass
| # note: return a copy! | |
def get_state(self, export_to_dict=True, return_copy=True):
    """Return the current state in the requested form.

    :param export_to_dict: when truthy, return ``state.to_dict()``; this takes
        precedence over ``return_copy``.
    :param return_copy: when ``export_to_dict`` is falsy, return
        ``state.copy()`` if truthy, otherwise the live state object itself.
    :raises AssertionError: if no state exists yet.
    """
    current = self.state
    assert current is not None, "Current state is None, should first start it!"
    if export_to_dict:
        return current.to_dict()
    return current.copy() if return_copy else current
| # -- | |
| # helpers | |
def parse_action_string(self, action_string, state):
    """Parse one textual action emitted by the model into an action dict.

    Recognised forms (matched case-insensitively at the start of the string):
    ``load_file(...)``, ``read_text(...)``, ``read_screenshot(...)``,
    ``search(...)``, ``stop...`` and ``nop...``.

    :param action_string: raw action text; may be empty.
    :param state: current state (not used by the parser).
    :return: a dict with keys ``action_name``, ``target_file``,
        ``page_id_list`` and ``key_word_list`` (plus ``action_value`` for
        stop/nop). ``action_name`` is ``""`` when nothing matched, and the
        dict is just ``{"action_name": None}`` when a read action lacks a
        file or page argument. ``page_id_list`` is returned as the raw
        argument text. ``key_word_list`` is a list, or an ``###Error`` string
        when no valid keywords were found.
    """
    patterns = {
        "load_file": r'load_file\((.*)\)',
        "read_text": r'read_text\((.*)\)',
        "read_screenshot": r'read_screenshot\((.*)\)',
        "search": r'search\((.*)\)',
        "stop": r"stop(.*)",
        "nop": r"nop(.*)",
    }
    action = {"action_name": "", "target_file": None, "page_id_list": None, "key_word_list": None}  # assuming these fields
    if not action_string:
        return action
    # One argument value: double-quoted, single-quoted, or a bare token.
    arg_value = r'(?:"([^"]*)"|\'([^\']*)\'|([^,\s=\[\]"\']+))'
    for key, pat in patterns.items():
        m = re.match(pat, action_string, flags=(re.IGNORECASE | re.DOTALL))  # ignore case and allow \n
        if not m:
            continue
        action["action_name"] = key
        if key in ("read_text", "read_screenshot"):
            args_str = m.group(1)
            m_file = re.search(r'file_name\s*=\s*(".*?"|\'.*?\'|\[.*?\]|\d+)', args_str)
            m_page = re.search(r'page_id_list\s*=\s*(".*?"|\'.*?\'|\[.*?\]|\d+)', args_str)
            file_name = m_file.group(1) if m_file else None
            page_id_list = m_page.group(1) if m_page else None
            # If not named, try positional.
            if file_name is None or page_id_list is None:
                # Split by comma not inside brackets.
                # This is a simple split, not perfect for all edge cases.
                parts = re.split(r',(?![^\[\]]*\])', args_str)
                if len(parts) >= 2:
                    if file_name is None:
                        file_name = parts[0]
                    if page_id_list is None:
                        page_id_list = parts[1]
            if file_name is None or page_id_list is None:
                zwarn(f"Failed to parse action string: {action_string}")
                return {"action_name": None}
            # Drop surrounding whitespace before the quotes, otherwise a
            # leading space would leave one quote character in the name.
            action["target_file"] = file_name.strip().strip('\'"')
            action["page_id_list"] = page_id_list.strip()
        elif key == "search":
            # search("filename.pdf", ["xxx", "yyy"])
            # search("filename.pdf", ['xxx', 'yyy'])
            # search("filename.pdf", ["xxx", 'yyy'])
            # search("filename.pdf", "xxx")
            # search(file_name.pdf, "xxx")
            # search(file_name="filename.pdf", ["xxx", 'yyy'])
            # search(file_name="filename.pdf", key_word_list=["xxx", 'yyy'])
            s = m.group(1)
            # File name: an explicit file_name=..., else the first positional
            # argument (any extension, not only .pdf).
            filename_match = (
                re.search(r'file_name\s*=\s*' + arg_value, s)
                or re.match(r'\s*' + arg_value + r'\s*(?:,|$)', s)
            )
            filename = None
            if filename_match:
                filename = next((g for g in filename_match.groups() if g is not None), None)
            else:
                # Fallback: the first token anywhere that looks like "*.pdf".
                pdf_match = re.search(r'(?:["\']([\w\-.]+\.pdf)["\']|([\w\-.]+\.pdf))', s)
                if pdf_match:
                    filename = pdf_match.group(1) or pdf_match.group(2)
            # Match keywords: list or string, positional or keyword argument
            keyword_match = re.search(
                r'(?:key_word_list\s*=\s*|,\s*)('
                r'\[[^\]]+\]|'  # a list: [ ... ]
                r'["\'][^"\']+["\']'  # or a single quoted string
                r')', s)
            keywords = None
            if keyword_match:
                kw_str = keyword_match.group(1)
                try:
                    keywords = ast.literal_eval(kw_str)
                    if isinstance(keywords, str):
                        keywords = [keywords]
                except Exception as e:
                    zwarn(f"搜索关键词解析失败 {kw_str}: {e}")
                    keywords = [kw_str.strip('"\'')]
            action["target_file"] = filename
            if isinstance(keywords, list):
                action["key_word_list"] = keywords
            else:
                action["key_word_list"] = "###Error: the generated key_word_list is not valid. Please retry!"
        else:
            raw_target = m.group(1).strip()
            if key == "load_file":
                # Accept the keyword form load_file(file_name="...").
                raw_target = re.sub(r'^file_name\s*=\s*', '', raw_target)
            action["target_file"] = raw_target.strip('"').strip("'")
            if key in ("stop", "nop"):
                action["action_value"] = m.groups()[-1].strip()  # target value
        break
    return action
def action(self, action):
    """Execute a parsed action and return the state fields it produces.

    Supported ``action["action_name"]`` values:

    * ``load_file``: convert the file to text pages (plus screenshots for
      PDFs, spreadsheets and images) and cache them on the environment.
    * ``read_text``: return the text of the requested pages, limited to
      ``max_file_read_tokens``.
    * ``read_screenshot``: return text and screenshots of the requested
      pages, limited to ``max_file_screenshots`` pages and the token budget.
    * ``search``: report, per keyword, the page indices whose text contains it.
    * anything else (e.g. ``stop``): no work is done.

    :param action: dict as produced by ``parse_action_string``.
    :return: ``(True, fields)`` where ``fields`` holds the values to write
        into the state (``current_file_name``, ``current_page_id_list``,
        ``loaded_files``, ``multimodal``, ``file_meta_data``,
        ``textual_content``, ``visual_content``, ``image_suffix``,
        ``error_message``, ``observation``).
    :raises FileNotFoundError: from ``find_file_name`` for unknown files.
    :raises ValueError: if ``page_id_list`` is not an int or a list of ints.
    :raises KeyError: if a page is read from a file that was never loaded.
    :raises IndexError: if a requested page index does not exist.
    """
    budget = self.max_file_read_tokens

    def _estimate_tokens(text):
        # Rough token estimate: one token per four UTF-8 bytes.
        return math.ceil(len(text.encode()) / 4)

    def _parse_page_ids(raw):
        # page_id_list comes from model output, so it is parsed as a literal
        # and never evaluated as code. For compatibility with the previous
        # eval-based parsing, "range(n)" / "list(range(n))" are still accepted.
        value = raw
        for _ in range(2):  # a second pass unwraps a quoted list such as '"[0, 1]"'
            if not isinstance(value, str):
                break
            text = value.strip()
            try:
                value = ast.literal_eval(text)
            except (ValueError, SyntaxError, TypeError):
                m = re.fullmatch(
                    r'(?:list\(\s*)?range\(\s*(\d+(?:\s*,\s*\d+){0,2})\s*\)(?:\s*\))?', text)
                if not m:
                    raise ValueError(f"Invalid page_id_list: {raw!r}")
                value = list(range(*(int(x) for x in m.group(1).split(","))))
        if isinstance(value, int) and not isinstance(value, bool):
            return [value]
        if isinstance(value, (list, tuple)) and all(
                isinstance(p, int) and not isinstance(p, bool) for p in value):
            return list(value)
        raise ValueError(f"Invalid page_id_list: {raw!r}")

    def _fit_budget(page_ids, token_nums):
        # Longest prefix of page_ids whose token total stays within budget.
        # If not even the first page fits, that page is kept alone and
        # flagged so its text can be clipped; otherwise an oversized page
        # could never be read at all.
        kept = []
        total = 0
        for pid in page_ids:
            total += token_nums[pid]
            if total > budget:
                break
            kept.append(pid)
        clipped = False
        if page_ids and not kept:
            kept = list(page_ids[:1])
            clipped = True
        return kept, list(page_ids[len(kept):]), clipped

    def _render_pages(page_ids, pages, clipped):
        # Join the selected pages; a clipped page is cut to budget * 4 bytes,
        # the inverse of the token estimate above.
        chunks = []
        for pid in page_ids:
            text = pages[pid]
            if clipped:
                text = text.encode()[:budget * 4].decode(errors="ignore")
            chunks.append(f"Page {pid}\n" + text)
        return "\n\n".join(chunks)

    def split_text_to_pages(text, max_tokens_per_page):
        """
        Split the text into pages of approximately max_tokens_per_page tokens.
        Words are never split; a single word larger than the limit gets a
        page of its own.
        :param text: The input text to be split.
        :param max_tokens_per_page: The maximum number of tokens per page.
        :return: A list of text pages.
        """
        pages = []
        current_page = []
        current_tokens = 0
        for word in text.split():
            word_tokens = _estimate_tokens(word)
            # Start a new page when this word would overflow the current one.
            # An empty page is never flushed, so an oversized first word no
            # longer produces an empty leading page.
            if current_page and current_tokens + word_tokens > max_tokens_per_page:
                pages.append(' '.join(current_page))
                current_page = [word]
                current_tokens = word_tokens
            else:
                current_page.append(word)
                current_tokens += word_tokens
        if current_page:
            pages.append(' '.join(current_page))
        return pages

    file_name = ""
    page_id_list = []
    multimodal = False
    loaded_files = copy.deepcopy(self.state.loaded_files)
    file_meta_data = copy.deepcopy(self.state.file_meta_data)
    visual_content = None
    image_suffix = None
    error_message = None
    textual_content = ""
    observation = None
    warnings = []
    action_name = action["action_name"]

    if action_name == "load_file":
        file_name = self.find_file_name(action["target_file"])
        if file_name.endswith(".pdf"):
            text_pages = self.md_converter.convert(file_name).text_content.split('\x0c')  # split by pages
            text_screenshots = FileEnv.read_file_by_page_screenshot(file_name)
            _page_token_num = [_estimate_tokens(p) for p in text_pages]
            _info = ", ".join(f"Page {i}: {n} " for i, n in enumerate(_page_token_num))
            file_meta_data[file_name] = f"Number of pages of {file_name}: {len(text_pages)}. Number of tokens of each page: {_info}"
            observation = f"load_file({file_name}) # number of pages is {len(text_pages)}"
            image_suffix = ['png' for _ in text_screenshots]
        elif file_name.endswith((".xlsx", ".xls", ".csv")):
            text_pages = self.md_converter.convert(file_name).text_content.split('\x0c')  # split by sheets
            text_screenshots = FileEnv.read_file_by_page_screenshot(file_name)
            _page_token_num = [_estimate_tokens(p) for p in text_pages]
            _info = ", ".join(f"Sheet {i}: {n} " for i, n in enumerate(_page_token_num))
            file_meta_data[file_name] = f"Number of sheets of {file_name}: {len(text_pages)}. Number of tokens of each page: {_info}. Number of screenshots of the excel file: {len(text_screenshots)}"
            observation = f"load_file({file_name}) # number of sheets is {len(text_pages)}"
            image_suffix = ['png' for _ in text_screenshots]
        elif file_name.endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.tiff', '.webp')):
            text_pages = [""]
            _page_token_num = [0]
            with open(file_name, 'rb') as f:
                img_bytes = f.read()
            # Base64-encode the bytes and decode to UTF-8 string
            img_b64 = base64.b64encode(img_bytes).decode('utf-8')
            text_screenshots = [img_b64]
            image_suffix = [file_name.split('.')[-1]]
            file_meta_data[file_name] = "This is an image."
            observation = f"load_file({file_name}) # load an image"
        else:
            # Any other type: rely on the markdown converter only.
            content = self.md_converter.convert(file_name)
            if file_name.endswith(('.htm', '.html')):
                content = MyMarkdownify().md_convert(content.text_content)
            else:
                content = content.text_content
            if '\x0c' in content:
                text_pages = content.split('\x0c')  # split by pages
            else:
                text_pages = split_text_to_pages(content, budget)
            text_screenshots = []
            _page_token_num = [_estimate_tokens(p) for p in text_pages]
            _info = ", ".join(f"Page {i}: {n} " for i, n in enumerate(_page_token_num))
            file_meta_data[file_name] = f"Number of pages of {file_name}: {len(text_pages)}. Number of tokens of each page: {_info}. Number of screenshots of the file: {len(text_screenshots)}"
            observation = f"load_file({file_name}) # number of pages is {len(text_pages)}"
        loaded_files[file_name] = True
        # save the info to the file env
        self.file_text_by_page[file_name] = text_pages
        self.file_token_num_by_page[file_name] = _page_token_num
        self.file_screenshot_by_page[file_name] = text_screenshots
        # Always cache a list so read_screenshot can index it safely.
        self.file_image_suffix_by_page[file_name] = image_suffix if image_suffix is not None else []
        page_id_list = []
        textual_content = "The file has just loaded. Please call read_text() or read_screenshot()."

    elif action_name == "read_text":
        file_name = self.find_file_name(action["target_file"])
        requested = _parse_page_ids(action["page_id_list"])
        kept, dropped, clipped = _fit_budget(requested, self.file_token_num_by_page[file_name])
        if dropped:
            warnings.append(f"The pages you selected ({requested}) exceed the maximum token limit {budget}. They have been truncated to {kept}. {dropped} has not been reviewed.")
        if clipped:
            warnings.append(f"Page {kept[0]} alone exceeds the maximum token limit {budget}, so only the beginning of its text is shown.")
        page_id_list = kept
        textual_content = _render_pages(page_id_list, self.file_text_by_page[file_name], clipped)
        visual_content = None
        multimodal = False
        observation = f"read_text({file_name}, {page_id_list}) # Read {len(page_id_list)} pages"

    elif action_name == "read_screenshot":
        file_name = self.find_file_name(action["target_file"])
        requested = _parse_page_ids(action["page_id_list"])
        # Both the screenshot count and the text token total must stay within
        # their limits.
        capped = requested[:self.max_file_screenshots]
        kept, _, clipped = _fit_budget(capped, self.file_token_num_by_page[file_name])
        remaining = sorted(set(requested) - set(kept))
        if remaining:
            warnings.append(f"The pages you selected ({requested}) exceed the maximum token limit {budget} or the maximum screenshot limit {self.max_file_screenshots}. They have been truncated to {kept}. {remaining} has not been reviewed.")
        if clipped:
            warnings.append(f"Page {kept[0]} alone exceeds the maximum token limit {budget}, so only the beginning of its text is shown.")
        page_id_list = kept
        textual_content = _render_pages(page_id_list, self.file_text_by_page[file_name], clipped)
        # Screenshots can be missing (rendering tools unavailable, or a file
        # type that is never rendered); return text only for such pages.
        shots = self.file_screenshot_by_page[file_name]
        suffixes = self.file_image_suffix_by_page[file_name] or []
        with_image = [
            pid for pid in page_id_list
            if -len(shots) <= pid < len(shots) and -len(suffixes) <= pid < len(suffixes)
        ]
        without_image = [pid for pid in page_id_list if pid not in with_image]
        if without_image:
            warnings.append(f"No screenshot is available for pages {without_image}; only their text is returned.")
        visual_content = [shots[pid] for pid in with_image]
        image_suffix = [suffixes[pid] for pid in with_image]
        multimodal = bool(visual_content)
        observation = f"read_screenshot({file_name}, {page_id_list}) # Read {len(page_id_list)} pages"

    elif action_name == "search":
        if "###Error" in action["key_word_list"]:
            # The parser could not extract valid keywords.
            warnings.append(action["key_word_list"])
        else:
            file_name = self.find_file_name(action["target_file"])
            key_word_list = action["key_word_list"]
            pages = self.file_text_by_page[file_name]
            # {keyword: [0-based indices of pages whose text contains it]}
            search_result = {
                keyword: [i for i, page_text in enumerate(pages) if keyword in page_text]
                for keyword in key_word_list
            }
            observation = f"The result of search({file_name}, {key_word_list}). The keys of the result dict are the keywords, and the values are the corresponding page indices that contains the keyword: {search_result}"

    if warnings:
        error_message = " ".join(warnings)
        if observation is None:
            # Avoid rendering the literal text "None" in front of the warning.
            observation = f"(**Warning**: {error_message})"
        else:
            observation = f"{observation} (**Warning**: {error_message})"
    return True, {"current_file_name": file_name, "current_page_id_list": page_id_list, "loaded_files": loaded_files, "multimodal": multimodal, "file_meta_data": file_meta_data, "textual_content": textual_content, "visual_content": visual_content, "image_suffix": image_suffix, "error_message": error_message, "observation": observation}
| # -- | |
| # other helpers | |
| # -- | |
| # main step | |
def init_state(self, file_path_dict: dict):
    """Replace the current state with a fresh one and register the given files.

    :param file_path_dict: files to register through ``add_files_to_load``;
        skipped when empty or ``None``.
    """
    self.state = FileState()  # set the new state!
    if not file_path_dict:
        return
    self.add_files_to_load(file_path_dict)
def end_state(self):
    """Drop all cached per-file content so its memory can be reclaimed.

    The caches are reset to empty dicts rather than deleted, so the
    environment stays usable if a new session is started afterwards. All
    four per-file caches are cleared together so none of them keeps entries
    for content that is gone.
    """
    self.file_text_by_page = {}
    self.file_screenshot_by_page = {}
    self.file_token_num_by_page = {}
    self.file_image_suffix_by_page = {}
    import gc
    gc.collect()
def add_files_to_load(self, files):
    """Register ``files`` in the state as known but not yet loaded.

    :param files: iterable of file names (iterating a dict yields its keys).
        Each name is mapped to ``False`` in ``state.loaded_files``, including
        names that were already present.
    """
    not_loaded = dict.fromkeys(files, False)
    self.state.loaded_files.update(not_loaded)
def step_state(self, action_string: str):
    """Parse and execute one action string, updating the state in place.

    :param action_string: the action text; surrounding whitespace is removed.
    :return: a human-readable result string: the error message for an
        unparseable or failed action, otherwise ``"File agent step: ..."``
        followed by the observation (or the action string for stop/nop).
    """
    current = self.state
    action_string = action_string.strip()
    # parse action
    parsed = self.parse_action_string(action_string, current)
    zlog(f"[CallFile:{current.curr_step}:{current.total_actual_step}] ACTION={parsed} ACTION_STR={action_string}", timed=True)
    # Step counters advance for every attempt, including malformed ones.
    current.curr_step += 1
    current.total_actual_step += 1
    current.update(action=parsed, action_string=action_string, error_message="")  # first update some of the things

    action_name = parsed["action_name"]
    if not action_name:  # UNK action
        current.error_message = f"The action you previously choose is not well-formatted: {action_string}. Please double-check if you have selected the correct element or used correct action format."
        return current.error_message
    if action_name in ("stop", "nop"):  # ok, nothing to do
        return f"File agent step: {action_string}"

    # actually perform action
    action_succeed, results = self.action(parsed)
    if action_succeed:
        current.update(**results)  # write the new fields into the state
        return f"File agent step: {results.get('observation', action_string)}"
    current.error_message = f"The action you have chosen cannot be executed: {action_string}. Please double-check if you have selected the correct element or used correct action format."
    return current.error_message
| # -- | |