Spaces:
Sleeping
Sleeping
charSLee013
feat: complete Hugging Face Spaces deployment with production-ready CognitiveKernel-Launchpad
1ea26af
| # | |
| import json | |
| from ..agents.agent import MultiStepAgent, register_template, ActionResult | |
| from ..agents.utils import zwarn, have_images_in_messages | |
| from ..agents.model import LLM | |
| from .utils import FileEnv | |
| from .prompts import PROMPTS as FILE_PROMPTS | |
| class FileAgent(MultiStepAgent): | |
| def __init__(self, settings=None, **kwargs): | |
| # note: this is a little tricky since things will get re-init again in super().__init__ | |
| feed_kwargs = dict( | |
| name="file_agent", | |
| description="A file agent helping to parse and process (a) file(s) to solve a specific task.", | |
| templates={"plan": "file_plan", "action": "file_action", "end": "file_end"}, # template names | |
| max_steps=16, | |
| ) | |
| feed_kwargs.update(kwargs) | |
| self.settings = settings # Store settings reference | |
| self.file_env_kwargs = {} # kwargs for file env | |
| self.check_nodiff_steps = 3 # if for 3 steps, we have the same file page, then explicitly indicating this! | |
| # Use configuration from settings instead of global state | |
| if settings and hasattr(settings, 'file'): | |
| self.max_file_read_tokens = settings.file.max_file_read_tokens | |
| self.max_file_screenshots = settings.file.max_file_screenshots | |
| else: | |
| # Fallback defaults if no settings provided | |
| self.max_file_read_tokens = 3000 | |
| self.max_file_screenshots = 2 | |
| self.file_env_kwargs['max_file_read_tokens'] = self.max_file_read_tokens | |
| self.file_env_kwargs['max_file_screenshots'] = self.max_file_screenshots | |
| # Use same model config as main model for multimodal (if provided); otherwise lazy init | |
| multimodal_kwargs = kwargs.get('model_multimodal', {}).copy() if kwargs.get('model_multimodal') else None | |
| if multimodal_kwargs: | |
| self.model_multimodal = LLM(**multimodal_kwargs) | |
| else: | |
| # Lazy/default init to avoid validation errors when not needed | |
| self.model_multimodal = LLM(_default_init=True) | |
| # -- | |
| register_template(FILE_PROMPTS) # add web prompts | |
| super().__init__(**feed_kwargs) | |
| self.file_envs = {} # session_id -> ENV | |
| self.current_session = None | |
| self.ACTIVE_FUNCTIONS.update(stop=self._my_stop, load_file=self._my_load_file, read_text=self._my_read_text, read_screenshot=self._my_read_screenshot, search=self._my_search) | |
| # -- | |
| # note: a specific stop function! | |
| def _my_search(self, file_path: str, key_word_list: list): | |
| return ActionResult(f"search({file_path}, {key_word_list})") | |
| def _my_stop(self, answer: str = None, summary: str = None, output: str = None): | |
| if output: | |
| ret = f"Final answer: [{output}] ({summary})" | |
| else: | |
| ret = f"Final answer: [{answer}] ({summary})" | |
| self.put_final_result(ret) # mark end and put final result | |
| return ActionResult("stop", ret) | |
| def _my_load_file(self, file_path: str): | |
| return ActionResult(f'load_file({file_path})') | |
| def _my_read_text(self, file_path: str, page_id_list: list): | |
| return ActionResult(f"read_text({file_path}, {page_id_list})") | |
| def _my_read_screenshot(self, file_path: str, page_id_list: list): | |
| return ActionResult(f"read_screenshot({file_path}, {page_id_list})") | |
| def get_function_definition(self, short: bool): | |
| if short: | |
| return "- def file_agent(task: str, file_path_dict: dict = None) -> Dict: # Processes and analyzes one or more files to accomplish a specified task, with support for various file types such as PDF, Excel, and images." | |
| else: | |
| return """- file_agent | |
| ```python | |
| def file_agent(task: str, file_path_dict: dict = None) -> dict: | |
| \""" Processes and analyzes one or more files to accomplish a specified task. | |
| Args: | |
| task (str): A clear description of the task to be completed. If the task requires a specific output format, specify it here. | |
| file_path_dict (dict, optional): A dictionary mapping file paths to short descriptions of each file. | |
| Example: {"./data/report.pdf": "Annual financial report for 2023."} | |
| If not provided, file information may be inferred from the task description. | |
| Returns: | |
| dict: A dictionary with the following structure: | |
| { | |
| 'output': <str> # The well-formatted answer to the task. | |
| 'log': <str> # Additional notes, processing details, or error messages. | |
| } | |
| Notes: | |
| - If the task specifies an output format, ensure the `output` field matches that format. | |
| - Supports a variety of file types, including but not limited to PDF, Excel, images, etc. | |
| - If no files are provided or if files need to be downloaded from the Internet, return control to the external planner to invoke a web agent first. | |
| Example: | |
| >>> answer = file_agent(task="Based on the files, what was the increase in total revenue from 2022 to 2023?? (Format your output as 'increase_percentage'.)", file_path_dict={"./downloadedFiles/revenue.pdf": "The financial report of the company XX."}) | |
| >>> print(answer) # directly print the full result dictionary | |
| \""" | |
| ```""" | |
| def __call__(self, task: str, file_path_dict: dict = None, **kwargs): # allow *args styled calling | |
| return super().__call__(task, file_path_dict=file_path_dict, **kwargs) | |
| def init_run(self, session): | |
| super().init_run(session) | |
| _id = session.id | |
| assert _id not in self.file_envs | |
| _kwargs = self.file_env_kwargs.copy() | |
| if session.info.get("file_path_dict"): | |
| _kwargs["starting_file_path_dict"] = session.info["file_path_dict"] | |
| self.file_envs[_id] = FileEnv(**_kwargs) | |
| self.current_session = session | |
| def end_run(self, session): | |
| ret = super().end_run(session) | |
| _id = session.id | |
| self.file_envs[_id].stop() | |
| del self.file_envs[_id] # remove web env | |
| return ret | |
| def step_prepare(self, session, state): | |
| self.current_session = session | |
| _input_kwargs, _extra_kwargs = super().step_prepare(session, state) | |
| _file_env = self.file_envs[session.id] | |
| _input_kwargs["max_file_read_tokens"] = _file_env.max_file_read_tokens | |
| _input_kwargs["max_file_screenshots"] = _file_env.max_file_screenshots | |
| page_result = self._prep_page(_file_env.get_state()) # current file content | |
| _input_kwargs["textual_content"] = page_result['textual_content'] | |
| _input_kwargs["file_meta_data"] = page_result['file_meta_data'] | |
| _input_kwargs["loaded_files"] = page_result['loaded_files'] | |
| _input_kwargs["visual_content"] = page_result['visual_content'] | |
| _input_kwargs["image_suffix"] = page_result['image_suffix'] | |
| if not page_result["error_message"] is None: | |
| _input_kwargs["textual_content"] += "Note the error message:" + page_result['error_message'] | |
| if session.num_of_steps() > 1: # has previous step | |
| _prev_step = session.get_specific_step(-2) # the step before | |
| _input_kwargs["textual_content_old"] = self._prep_page(_prev_step["action"]["file_state_before"])["textual_content"] # old web page | |
| else: | |
| _input_kwargs["textual_content_old"] = "N/A" | |
| _extra_kwargs["file_env"] = _file_env | |
| return _input_kwargs, _extra_kwargs | |
| def step_action(self, action_res, action_input_kwargs, file_env=None, **kwargs): | |
| action_res["file_state_before"] = file_env.get_state() # inplace storage of the web-state before the action | |
| _rr = super().step_action(action_res, action_input_kwargs) # get action from code execution | |
| if isinstance(_rr, ActionResult): | |
| action_str, action_result = _rr.action, _rr.result | |
| else: | |
| action_str = self.get_obs_str(None, obs=_rr, add_seq_enum=False) | |
| action_str, action_result = "nop", action_str.strip() # no-operation | |
| # -- | |
| try: # execute the action on the browser | |
| step_result = file_env.step_state(action_str) | |
| ret = action_result if action_result is not None else step_result # use action result if there are direct ones | |
| # return f"File agent step: {action_str.strip()}" | |
| except Exception as e: | |
| zwarn("file_env execution error!" + f"\nFile agent error: {e} for {_rr}") | |
| ret = f"File agent error: {e} for {_rr}" | |
| return ret | |
| def step_call(self, messages, session, model=None): | |
| _use_multimodal = session.info.get("use_multimodal", False) or have_images_in_messages(messages) | |
| if model is None: | |
| model = self.model_multimodal if _use_multimodal else self.model # use which model? | |
| response = model(messages) | |
| return response | |
| # -- | |
| # other helpers | |
| def _prep_page(self, file_state): | |
| _ss = file_state | |
| _ret = {"loaded_files": _ss["loaded_files"], | |
| "file_meta_data":_ss["file_meta_data"], | |
| "textual_content":_ss["textual_content"], | |
| "visual_content":None, | |
| "image_suffix":None, | |
| "error_message":None} | |
| if _ss["error_message"]: | |
| # _ret = _ret + "\n(Note: " + _ss["error_message"] + ")" | |
| _ret["error_message"] = _ss["error_message"] | |
| if _ss["visual_content"]: | |
| _ret["visual_content"] = _ss["visual_content"] | |
| _ret["image_suffix"] = _ss["image_suffix"] | |
| return _ret | |