Spaces:
Sleeping
Sleeping
File size: 7,979 Bytes
e53fda1 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 | import json
import re
from typing import Optional, Tuple, Dict
from collections import OrderedDict
from collections.abc import Mapping, Iterable
from datetime import datetime
# import torch
# from cradle import constants
from utils.string_utils import contains_punctuation, is_numbered_bullet_list_item
def load_json(file_path):
    """Load and decode the JSON document stored at *file_path*.

    The file is opened in text mode and decoded as UTF-8.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        The decoded Python object (whatever the top-level JSON value maps to).
    """
    with open(file_path, mode='r', encoding='utf8') as json_file:
        raw_text = json_file.read()
    return json.loads(raw_text)
# def serialize_data(item):
# """Recursively convert non-serializable items in the dictionary."""
# if isinstance(item, (str, int, float, bool)):
# return item
# elif isinstance(item, torch.Tensor):
# # Check if the tensor is 0-d (a scalar)
# if item.dim() == 0:
# # Convert scalar tensor to a Python number
# return item.item()
# else:
# # Check if tensor is on a GPU, move to CPU first
# if item.is_cuda:
# item = item.cpu()
# # Convert tensor to a list
# return item.numpy().tolist()
# elif isinstance(item, datetime):
# return item.isoformat()
# if isinstance(item, Mapping):
# return {key: serialize_data(value) for key, value in item.items()}
# elif isinstance(item, Iterable):
# return [serialize_data(element) for element in item]
# elif isinstance(item, JsonFrameStructure): # Assuming JSONStructure needs to be handled
# return item.to_dict() # Assuming JSONStructure objects have a to_dict method or similar
# return item
# def save_json(file_path, json_dict, indent=-1):
# processed_data = serialize_data(json_dict)
# with open(file_path, mode='w', encoding='utf8') as fp:
# if indent == -1:
# json.dump(processed_data, fp, ensure_ascii=False)
# else:
# json.dump(processed_data, fp, ensure_ascii=False, indent=indent)
def check_json(json_string):
    """Return True if *json_string* can be decoded as JSON, False otherwise.

    Only decoding failures are treated as "not JSON": malformed text
    (``ValueError``, which includes ``json.JSONDecodeError``), a non-string
    argument (``TypeError``) and pathologically deep nesting
    (``RecursionError``). Interpreter-level signals such as
    ``KeyboardInterrupt`` are no longer swallowed.
    """
    try:
        json.loads(json_string)
    except (ValueError, TypeError, RecursionError):
        return False
    return True


def refine_json(json_string):
    """Strip common wrappers that surround a JSON payload.

    Each wrapper pattern is tried, in order, against the original text; the
    first extracted payload that is valid JSON is returned.

    Args:
        json_string: Raw text that may wrap a JSON document.

    Returns:
        The first extracted payload that parses as JSON. If none parses, the
        first payload that could be extracted is returned as a best effort,
        and if no wrapper matched at all the input is returned unchanged.
    """
    patterns = (
        r"^`+json(.*?)`+",     # ```json content```, ```json content``, ...
        r"^json(.*)",          # json content
        r"^json(.*)\.\s*$",    # json content.
    )

    best_effort = None
    for pattern in patterns:
        # Always match against the untouched input so an earlier, failed
        # extraction cannot hide the prefix a later pattern relies on.
        match = re.search(pattern, json_string, re.DOTALL)
        if not match:
            continue

        candidate = match.group(1)
        if check_json(candidate):
            return candidate

        if best_effort is None:
            best_effort = candidate

    return json_string if best_effort is None else best_effort
def parse_semi_formatted_json(json_string):
    """Decode a JSON document that may be wrapped in extra text.

    The input is first passed through ``refine_json`` and the result is then
    decoded with ``json.loads``.

    Args:
        json_string: Raw text expected to contain a JSON document.

    Returns:
        The decoded Python object.

    Raises:
        ValueError: If refining or decoding fails for any reason; the
            original exception is chained as the cause and the original
            input is included in the message.
    """
    try:
        return json.loads(refine_json(json_string))
    except Exception as e:
        raise ValueError(f"Error in processing json: {e}. Object was: {json_string}.") from e
def _is_line_key_candidate(line: str) -> Tuple[bool, Optional[str]]:
    """Decide whether *line* looks like a ``Key:`` header line.

    Args:
        line: A single line of text with trailing whitespace already removed.

    Returns:
        A ``(is_key, key_text)`` pair. For lines that do not end with ``:``
        the pair is ``(False, None)``. Otherwise ``key_text`` is the line
        without its trailing colon (and without the leading numbered-bullet
        prefix, when one is detected), and ``is_key`` is True only if
        ``contains_punctuation`` reports no punctuation in that text.
    """
    if not line.endswith(':'):
        return False, None

    # A value > -1 is used as the offset where the key text starts.
    # NOTE(review): presumably the index just past the "1." style marker,
    # with -1 meaning "not a numbered bullet" - confirm in utils.string_utils.
    bullet_offset = is_numbered_bullet_list_item(line)
    key_start = bullet_offset if bullet_offset > -1 else 0

    candidate = line[key_start:-1].strip()
    return (not contains_punctuation(candidate)), candidate
### Parses the semi-formatted text from model response
def parse_semi_formatted_text(text):
    """Parse a loosely structured model response into a dictionary.

    The text is read line by line. A line accepted by
    ``_is_line_key_candidate`` starts a new section; its key is lower-cased
    and spaces are replaced by underscores. Following lines form the value
    of that section.

    Special cases:
      * ``action_guidance``: the value is a list of ``{"code": ...}``
        entries, one per block terminated by a bare fence line. While
        inside this section, key-looking lines are treated as content
        until the block closes or a ``null`` line is seen.
      * ``success``: the text value is converted to a bool
        (True only for a case-insensitive "true").

    Text that appears before the first key line is discarded; if the
    response contains no key line at all, an empty dict is returned.

    Args:
        text: The raw response text.

    Returns:
        A dict mapping normalised section keys to their parsed values.
    """
    result_dict = {}
    current_key = None
    current_value = []
    parsed_data = []
    in_code_flag = False

    # Blank lines carry no information; trailing whitespace is dropped up front.
    lines = [line.rstrip() for line in text.split('\n') if line.rstrip()]

    for line in lines:
        # Remove Markdown emphasis/heading markers so e.g. "**Key:**" reads as "Key:".
        line = line.replace("**", "").replace("###", "").replace("##", "")

        is_key, key_candidate = _is_line_key_candidate(line)

        if is_key and not in_code_flag:
            # A new section starts: flush the one collected so far.
            if current_key == 'action_guidance':
                result_dict[current_key] = parsed_data
            elif current_key:
                result_dict[current_key] = '\n'.join(current_value).strip()

            current_key = key_candidate.replace(" ", "_").lower()
            current_value = []
            parsed_data = []
        elif current_key == 'action_guidance':
            # Any content line in this section switches "code mode" on so
            # that code lines ending in ':' are not mistaken for keys.
            in_code_flag = True
            if line.strip() == '```':
                if current_value:
                    # The first collected line is skipped; presumably it is
                    # the opening fence (e.g. "```python") - TODO confirm.
                    parsed_data.append({"code": '\n'.join(current_value[1:])})
                    current_value = []
                in_code_flag = False
            else:
                current_value.append(line)
                if line.strip().lower() == 'null':
                    in_code_flag = False
        else:
            in_code_flag = False
            current_value.append(line.strip())

    # Flush the final section.
    if current_key == 'action_guidance':
        if current_value:
            # NOTE(review): here the LAST collected line is dropped, unlike
            # the in-loop path which drops the first - confirm the asymmetry.
            parsed_data.append({"code": '\n'.join(current_value[:-1]).strip()})
        result_dict[current_key] = parsed_data
    elif current_key is not None:
        # Without this guard, key-less input produced a {None: ...} entry.
        result_dict[current_key] = '\n'.join(current_value).strip()

    if "success" in result_dict:
        result_dict["success"] = result_dict["success"].lower() == "true"

    return result_dict
class JsonFrameStructure:
    """Index-ordered store of instances grouped by timestamp.

    ``data_structure`` maps an integer index to a ``{timestamp: [instance,
    ...]}`` dict, and ``end_index`` is the highest index handed out so far
    (``-1`` while the structure is empty).
    """

    def __init__(self):
        self.data_structure: dict[int, dict[str, list[dict[str, object]]]] = {}
        self.end_index: int = -1

    def add_instance(self, timestamp: str, instance: dict[str, object]) -> None:
        """Append *instance* under *timestamp*.

        If the timestamp already exists under any index the instance joins
        that list; otherwise a new index (``end_index + 1``) is allocated.
        """
        for index_data in self.data_structure.values():
            if timestamp in index_data:
                index_data[timestamp].append(instance)
                return

        # Unknown timestamp: open the next index for it.
        self.end_index += 1
        self.data_structure.setdefault(self.end_index, {}).setdefault(timestamp, []).append(instance)

    def sort_index_by_timestamp(self) -> None:
        """Renumber the entries so indices follow ascending timestamp order.

        Every ``(timestamp, instances)`` pair gets its own index, starting
        at 0. ``end_index`` is updated to the last index assigned so that a
        later ``add_instance`` call cannot reuse an occupied index.
        """
        entries = [
            (timestamp, instances)
            for index_data in self.data_structure.values()
            for timestamp, instances in index_data.items()
        ]
        entries.sort(key=lambda pair: pair[0])

        self.data_structure = OrderedDict(
            (index, {timestamp: instances})
            for index, (timestamp, instances) in enumerate(entries)
        )
        self.end_index = len(self.data_structure) - 1

    def search_type_across_all_indices(self, search_type: str) -> list[dict[str, object]]:
        """Collect every non-empty value stored under the key *search_type*.

        Indices are visited in ascending order. Values equal to ``""`` or
        ``[]`` are skipped.

        Returns:
            A list of ``{"index", "object_id", "values"}`` dicts, where
            ``object_id`` is the key of the inner mapping the instance was
            found under.
        """
        results = []
        for index, index_data in sorted(self.data_structure.items()):
            for object_id, instances in index_data.items():
                for instance in instances:
                    for field_type, values in instance.items():
                        if field_type == search_type and values != "" and values != []:
                            results.append({"index": index, "object_id": object_id, "values": values})
        return results

    def to_dict(self):
        """Return the state as a plain dict (the mapping is not copied)."""
        return {
            "data_structure": self.data_structure,
            "end_index": self.end_index
        }

    @classmethod
    def from_dict(cls, data_dict):
        """Build an instance from a dict shaped like the output of ``to_dict``.

        Missing keys fall back to an empty structure and ``end_index == -1``.
        """
        instance = cls()
        instance.data_structure = data_dict.get("data_structure", {})
        instance.end_index = data_dict.get("end_index", -1)
        return instance
|