import os
import subprocess
import webbrowser
from typing import Dict, Any

from flows.base_flows.atomic import AtomicFlow


class PlanFileEditAtomicFlow(AtomicFlow):
    """Write a plan to a temporary file and open it for the user to edit.

    The flow takes a plan (``plan`` or ``new_plan``) and the location of the
    plan file, writes the plan together with editing instructions to
    ``temp_plan.txt`` in the same directory as the plan file, and then tries
    to open that temporary file in an editor.
    """

    def _generate_content(self, plan_file_location, plan_str) -> str:
        """Build the text written to the temporary file.

        Args:
            plan_file_location: Path of the plan file, shown to the user.
            plan_str: The plan text to embed between the separator lines.

        Returns:
            Instructions, the plan framed by ``#`` separator lines, and a
            trailing ``Thoughts:`` header.
        """
        content = (
            "The below plan will be written to " + plan_file_location + "\n"
            "Edit the plan directly or provide your thoughts down below if you have any suggestions.\n"
            "Please do provide your feedback in the thoughts section so that JARVIS knows what you "
            "are doing.\n"
            "###########\n"
            "Plan:\n" + plan_str + "\n############\n"
            "Thoughts:"
        )
        return content

    def _generate_temp_file_location(self, plan_file_location):
        """Return the path of ``temp_plan.txt`` next to the plan file."""
        directory = os.path.dirname(plan_file_location)
        return os.path.join(directory, 'temp_plan.txt')

    def _write_plan_content_to_file(self, file_location, content: str):
        """Write ``content`` to ``file_location``, overwriting any existing file.

        Returns:
            A 4-tuple ``(success, message, file_location, timestamp)``. On
            success ``message`` describes the write and ``timestamp`` is the
            file's modification time; on any failure ``success`` is ``False``,
            ``message`` is the exception text and ``timestamp`` is ``0``.
        """
        try:
            with open(file_location, "w") as file:
                file.write(content)
            file_written_timestamp = os.path.getmtime(file_location)
            return True, f"Plan written to {file_location}", file_location, file_written_timestamp
        except Exception as e:
            # Deliberately best-effort: the failure is reported to the caller
            # through the returned tuple instead of being raised.
            return False, str(e), file_location, 0

    def _check_input(self, input_data: Dict[str, Any]):
        """Validate that ``input_data`` carries a plan and an existing plan file.

        Raises:
            AssertionError: If neither ``plan`` nor ``new_plan`` is present,
                if ``plan_file_location`` is missing, or if that location
                does not exist or is not a regular file.
        """
        # NOTE: these checks are `assert` statements, so they are skipped when
        # Python runs with -O; they are kept so callers see the same exception.
        assert any(item in input_data for item in ["plan", "new_plan"]), \
            "plan or new_plan is not passed to PlanFileEditAtomicFlow"
        assert "plan_file_location" in input_data, \
            "plan_file_location not passed to PlanFileEditAtomicFlow"
        plan_file_loc = input_data["plan_file_location"]
        assert os.path.exists(plan_file_loc), f"{plan_file_loc} does not exist"
        assert os.path.isfile(plan_file_loc), f"{plan_file_loc} is not a file"

    def _generate_input_to_writer(self, input_data: Dict[str, Any]):
        """Derive the temp-file content and temp-file path from ``input_data``.

        ``plan`` takes precedence over ``new_plan`` when both are present.

        Returns:
            A tuple ``(content_to_write, file_location_to_write)``.
        """
        plan_str = input_data['plan'] if "plan" in input_data else input_data['new_plan']
        plan_file_location = input_data["plan_file_location"]
        content_to_write = self._generate_content(plan_file_location, plan_str)
        file_location_to_write = self._generate_temp_file_location(plan_file_location)
        return content_to_write, file_location_to_write

    def run(
            self,
            input_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Write the plan to a temporary file and open it for editing.

        Args:
            input_data: Must contain ``plan_file_location`` and either
                ``plan`` or ``new_plan``.

        Returns:
            A dict with ``plan_editor_output`` (success message or error
            text), ``temp_plan_file_location`` and
            ``temp_plan_file_written_timestamp`` (``0`` if the write failed).

        Raises:
            AssertionError: If ``input_data`` fails validation.
        """
        self._check_input(input_data)

        # ~~~ Getting input data to the file editor ~~~
        content_to_write, file_location_to_write = self._generate_input_to_writer(input_data)

        # ~~~ Calling the writer function ~~~
        result, plan_editor_output, temp_file_location, file_written_timestamp = \
            self._write_plan_content_to_file(file_location_to_write, content_to_write)

        # ~~~ Opening up the file for the user to see ~~~
        if result:
            try:
                # check=True so a non-zero exit from `code` actually raises
                # CalledProcessError and triggers the fallback below.
                subprocess.run(["code", temp_file_location], timeout=10, check=True)
            except (OSError, subprocess.CalledProcessError, subprocess.TimeoutExpired):
                # OSError covers the `code` executable being missing or not
                # launchable (FileNotFoundError / PermissionError); without it
                # the fallback was never reached on machines without VS Code.
                webbrowser.open(temp_file_location)

        # ~~~ Generating return variables ~~~
        response = {
            "plan_editor_output": plan_editor_output,
            "temp_plan_file_location": temp_file_location,
            "temp_plan_file_written_timestamp": file_written_timestamp,
        }
        return response