File size: 4,087 Bytes
237937e 8431548 29d5f58 90f31d0 29d5f58 237937e 628ab4e 237937e a6a51c7 237937e 345e6fe 29d5f58 345e6fe 29d5f58 345e6fe 237937e 29d5f58 f6d515d 237937e 35abdce 237937e a6a51c7 072188f f6d515d 237937e 35abdce 237937e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
from typing import Dict, Any
from aiflows.base_flows.atomic import AtomicFlow
import time
import subprocess
import os
class ParseFeedbackAtomicFlow(AtomicFlow):
    """Collect user feedback on a temporary code or plan file.

    The flow is supposed to be called after the CodeFileEdit or PlanFileEdit
    flow. It opens the file with ``code --wait`` (VSCode), waits until that
    process exits, then parses the file and returns its content together
    with the user's feedback.

    The file is expected to contain a header (``# Code:`` for code files,
    ``Plan:`` for plan files) followed by the content, then a line made of
    ``############``, then a ``Thoughts:`` tag followed by the feedback.

    *Input Interface*:

    - `temp_code_file_location`: Path of the code file to review. When a
      plan file is parsed instead, pass `temp_plan_file_location`. Exactly
      one of the two keys must be present.

    *Output Interface*:

    - `code` or `plan`: The content of the code / plan file, depending on
      which input key was given.
    - `feedback`: The feedback from the user.
    """

    def _read_content(self, file_location, file_type) -> str:
        """Return the code/plan section of the file, stripped of whitespace.

        :param file_location: Path of the file to read.
        :param file_type: Either ``"code"`` or ``"plan"``; selects the header
            that marks the start of the section.
        :return: The text between the header and the first separator line.
        :raises NotImplementedError: If ``file_type`` is not supported.
        :raises ValueError: If the header or the separator is missing, or
            the separator appears before the header.
        """
        if file_type == "code":
            header_string = "# Code:\n"
        elif file_type == "plan":
            header_string = "Plan:\n"
        else:
            # `NotImplemented` is a comparison sentinel, not an exception;
            # raising it would fail with a TypeError instead.
            raise NotImplementedError(f"Unsupported file_type: {file_type!r}")

        with open(file_location, "r") as file:
            content = file.read()

        start_index = content.find(header_string)
        if start_index == -1:
            raise ValueError(f"Format error: {header_string} tag not found.")

        # The section ends at the first separator line in the file.
        end_index = content.find("\n############\n")
        if end_index == -1 or end_index < start_index:
            raise ValueError(f"Format error: {file_type} section is malformed.")

        return content[start_index + len(header_string):end_index].strip()

    def _parse_user_thoughts(self, file_location) -> str:
        """Return the user's feedback written after the ``Thoughts:`` tag.

        :param file_location: Path of the file to read.
        :return: The stripped feedback text, or ``'Looks good, go on.'`` when
            the user left the section empty.
        :raises ValueError: If the separator line or the ``Thoughts:`` tag is
            missing.
        """
        with open(file_location, "r") as file:
            content = file.read()

        delimiter = "\n############\n"
        parts = content.split(delimiter)
        if len(parts) < 2:
            raise ValueError("Format error: Unable to find the thoughts section.")

        # Only the part right after the first separator is inspected.
        thoughts_section = parts[1].strip()
        thoughts_index = thoughts_section.find("Thoughts:")
        if thoughts_index == -1:
            raise ValueError("Format error: 'Thoughts:' tag not found.")

        thoughts = thoughts_section[thoughts_index + len("Thoughts:"):].strip()
        # Empty feedback is treated as approval.
        if not thoughts:
            thoughts = 'Looks good, go on.'
        return thoughts

    def _check_input(self, input_data: Dict[str, Any]) -> None:
        """Ensure exactly one of the two file-location keys is present.

        :param input_data: The input dictionary passed to :meth:`run`.
        :raises AssertionError: If neither or both of
            ``temp_code_file_location`` and ``temp_plan_file_location`` are
            present.
        """
        code_file_exists = "temp_code_file_location" in input_data
        plan_file_exists = "temp_plan_file_location" in input_data
        if not code_file_exists and not plan_file_exists:
            raise AssertionError("Neither code file nor plan file is passed to ParseFeedbackAtomicFlow")
        if code_file_exists and plan_file_exists:
            raise AssertionError("Both code file and plan file are passed to ParseFeedbackAtomic, which one to parse?")

    def _open_file_and_wait_for_file_update(self, file_location, check_interval=1) -> None:
        """Open the file with ``code --wait`` and block until that process exits.

        :param file_location: Path of the file to open.
        :param check_interval: Seconds to sleep between checks of the editor
            process.
        """
        process = subprocess.Popen(["code", "--wait", file_location])
        # poll() returns None while the process is still running.
        while process.poll() is None:
            time.sleep(check_interval)

    def run(
            self,
            input_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Open the file for the user, then parse its content and feedback.

        :param input_data: Must contain exactly one of
            ``temp_code_file_location`` or ``temp_plan_file_location``.
        :return: ``{"code": ..., "feedback": ...}`` for a code file, or
            ``{"plan": ..., "feedback": ...}`` for a plan file.
        :raises AssertionError: If the input keys are invalid (see
            :meth:`_check_input`).
        """
        self._check_input(input_data)

        # _check_input guarantees exactly one of the two keys is present.
        if "temp_code_file_location" in input_data:
            file_type = "code"
            file_location = input_data["temp_code_file_location"]
        else:
            file_type = "plan"
            file_location = input_data["temp_plan_file_location"]

        self._open_file_and_wait_for_file_update(file_location)
        content = self._read_content(file_location, file_type)
        user_thoughts = self._parse_user_thoughts(file_location)
        return {file_type: content, "feedback": user_thoughts}
|