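# File: ParseFeedbackFlowModule/ParseFeedbackAtomicFlow.py
# NOTE: the line below is residue from the web file viewer this source was
# copied from (avatar caption, commit message and hash, viewer links, file
# size); it is not part of the program.
#   Tachi67's picture | Upload 3 files | a6a51c7 | raw | history blame | 3.62 kB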
from typing import Dict, Any
from flows.base_flows.atomic import AtomicFlow
import time
import os
class ParseFeedbackAtomicFlow(AtomicFlow):
    """Parse a temporary code/plan file once it has been modified on disk.

    The file is expected to look like::

        <header tag>        "# Code:\\n" for code files, "Plan:\\n" for plans
        <content>
        ############
        Thoughts: <free-form feedback>

    ``run`` blocks until the file's modification time differs from the
    timestamp supplied by the caller (presumably the time the file was
    written out for a reviewer -- confirm against the writer flow), then
    returns the content section and the feedback text.
    """

    # Line of hashes separating the content section from the thoughts section.
    _SECTION_DELIMITER = "\n############\n"
    # Tag that introduces the feedback text inside the thoughts section.
    _THOUGHTS_TAG = "Thoughts:"
    # Feedback reported when the text after the thoughts tag is empty.
    _DEFAULT_FEEDBACK = 'Looks good, go on.'
    # (file type / response key, location key, written-timestamp key)
    _INPUT_SPECS = (
        ("code", "temp_code_file_location", "temp_code_file_written_timestamp"),
        ("plan", "temp_plan_file_location", "temp_plan_file_written_timestamp"),
    )

    def _read_content(self, file_location, file_type):
        """Return the stripped text between the header tag and the delimiter.

        Args:
            file_location: Path of the file to read.
            file_type: ``"code"`` or ``"plan"``; selects the header tag.

        Returns:
            The content section with surrounding whitespace removed.

        Raises:
            NotImplementedError: If ``file_type`` is not supported.
            ValueError: If the header tag is missing, or the first delimiter
                is missing or located before the header tag.
        """
        if file_type == "code":
            header_string = "# Code:\n"
        elif file_type == "plan":
            header_string = "Plan:\n"
        else:
            # `NotImplemented` is a comparison sentinel, not an exception;
            # raising it would surface as an unrelated TypeError.
            raise NotImplementedError(f"Unsupported file type: {file_type!r}")

        with open(file_location, "r") as file:
            content = file.read()

        start_index = content.find(header_string)
        if start_index == -1:
            raise ValueError(f"Format error: {header_string} tag not found.")

        # Deliberately the FIRST delimiter in the file: this is the same
        # boundary `_parse_user_thoughts` uses, so both methods agree on
        # where the content ends and the thoughts begin.
        end_index = content.find(self._SECTION_DELIMITER)
        if end_index == -1 or end_index < start_index:
            raise ValueError(f"Format error: {file_type} section is malformed.")

        return content[start_index + len(header_string):end_index].strip()

    def _parse_user_thoughts(self, file_location):
        """Return the feedback text that follows the ``Thoughts:`` tag.

        Only the section between the first and second delimiter (or the end
        of the file) is inspected.

        Args:
            file_location: Path of the file to read.

        Returns:
            The stripped feedback text, or ``'Looks good, go on.'`` when the
            text after the tag is empty.

        Raises:
            ValueError: If the file has no delimiter, or the section after
                the first delimiter has no ``Thoughts:`` tag.
        """
        with open(file_location, "r") as file:
            content = file.read()

        parts = content.split(self._SECTION_DELIMITER)
        if len(parts) < 2:
            raise ValueError("Format error: Unable to find the thoughts section.")

        thoughts_section = parts[1].strip()
        thoughts_index = thoughts_section.find(self._THOUGHTS_TAG)
        if thoughts_index == -1:
            raise ValueError("Format error: 'Thoughts:' tag not found.")

        thoughts = thoughts_section[thoughts_index + len(self._THOUGHTS_TAG):].strip()
        # An empty thoughts field is reported as the default approval text.
        return thoughts if thoughts else self._DEFAULT_FEEDBACK

    def _check_input(self, input_data: Dict[str, Any]):
        """Ensure exactly one of the code/plan file locations was supplied.

        Raises:
            AssertionError: If neither or both location keys are present.
        """
        code_file_exists = "temp_code_file_location" in input_data
        plan_file_exists = "temp_plan_file_location" in input_data
        if not code_file_exists and not plan_file_exists:
            raise AssertionError("Neither code file nor plan file is passed to ParseFeedbackAtomicFlow")
        if code_file_exists and plan_file_exists:
            raise AssertionError("Both code file and plan file are passed to ParseFeedbackAtomic, which one to parse?")

    def _wait_for_file_update(self, file_location, last_check_time, check_interval=1, timeout=None):
        """Block until the file's modification time differs from a reference.

        Args:
            file_location: Path of the file to poll.
            last_check_time: Reference modification time, compared against
                ``os.path.getmtime(file_location)``.
            check_interval: Seconds to sleep before each check.
            timeout: Optional maximum number of seconds to wait. ``None``
                (the default) waits indefinitely.

        Raises:
            TimeoutError: If ``timeout`` is given and elapses without the
                modification time changing.
            OSError: Propagated from ``os.path.getmtime`` (e.g. the file
                does not exist).
        """
        # Monotonic clock so the deadline is immune to wall-clock adjustments.
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            time.sleep(check_interval)
            if os.path.getmtime(file_location) != last_check_time:
                return
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(
                    f"{file_location} was not modified within {timeout} seconds."
                )

    def run(
            self,
            input_data: Dict[str, Any]
    ):
        """Wait for the supplied file to change, then parse it.

        Args:
            input_data: Must contain exactly one of
                ``temp_code_file_location`` / ``temp_plan_file_location``
                together with the matching
                ``temp_code_file_written_timestamp`` /
                ``temp_plan_file_written_timestamp``.

        Returns:
            ``{"code": <content>, "feedback": <thoughts>}`` for a code file,
            or ``{"plan": <content>, "feedback": <thoughts>}`` for a plan.

        Raises:
            AssertionError: If neither or both file locations are given.
            KeyError: If the matching timestamp key is missing.
            ValueError: If the file does not follow the expected format.
        """
        self._check_input(input_data)

        # `_check_input` guarantees exactly one location key is present.
        for file_type, location_key, timestamp_key in self._INPUT_SPECS:
            if location_key not in input_data:
                continue
            file_location = input_data[location_key]
            last_check_time = input_data[timestamp_key]
            self._wait_for_file_update(file_location, last_check_time)
            content = self._read_content(file_location, file_type)
            user_thoughts = self._parse_user_thoughts(file_location)
            return {file_type: content, "feedback": user_thoughts}

        return {}