File size: 12,547 Bytes
5374a2d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 |
import os
import yaml
import json
import regex
from uuid import uuid4
from datetime import datetime, date
from pydantic import BaseModel
from pydantic_core import PydanticUndefined, ValidationError
from typing import Union, Type, Any, List, Dict, get_origin, get_args
from .logging import logger
def make_parent_folder(path: str):
    """
    Create the directory that would contain `path` when it is missing.

    Args:
        path (str): A file path. Only its directory component is used.

    Nothing happens when `path` has no directory component (for example a
    bare file name) or when that directory component already exists.
    """
    parent = os.path.dirname(path)
    # A bare file name (or a whitespace-only directory part) has no parent to create.
    if not parent.strip():
        return
    if os.path.exists(parent):
        return
    os.makedirs(parent, exist_ok=True)
def generate_id():
    """Return a freshly generated random UUID (version 4) as its 32-character hex string."""
    new_uuid = uuid4()
    return new_uuid.hex
def get_timestamp():
    """Return the current time from `datetime.now()` formatted as "YYYY-MM-DD HH:MM:SS"."""
    now = datetime.now()
    return f"{now:%Y-%m-%d %H:%M:%S}"
def load_json(path: str, type: str="json"):
    """
    Load data from a JSON or JSON Lines file.

    Args:
        path (str): Path of the file to read (decoded as UTF-8).
        type (str): File format, either "json" (the whole file is a single
            JSON document) or "jsonl" (one JSON document per line).

    Returns:
        The parsed document for "json", or a list holding one parsed entry
        per non-blank line for "jsonl".

    Raises:
        FileNotFoundError: If `path` does not exist.
        Exception: For "json", any error raised while reading or parsing the
            file is logged and then re-raised unchanged. For "jsonl", read
            and parse errors propagate directly.
    """
    assert type in ["json", "jsonl"] # only support json or jsonl format
    if not os.path.exists(path):
        logger.error(f"File \"{path}\" does not exist!")
        # Fail here with a clear error instead of an unrelated failure further down.
        raise FileNotFoundError(f"File \"{path}\" does not exist!")

    if type == "json":
        try:
            with open(path, "r", encoding="utf-8") as file:
                # json is used rather than yaml.safe_load, which is very slow on large files.
                return json.load(file)
        except Exception:
            logger.error(f"File \"{path}\" is not a valid json file!")
            # There is no parsed value to return after a failure, so propagate it.
            raise

    if type == "jsonl":
        with open(path, "r", encoding="utf-8") as fin:
            # Blank lines (e.g. a trailing empty line) are not JSON documents; skip them.
            return [json.loads(line) for line in fin if line.strip()]

    # Only reachable when the assertion above is disabled (python -O).
    return []
def save_json(data, path: str, type: str="json", use_indent: bool=True) -> str:
    """
    Write data to a JSON or JSON Lines file, creating the parent folder if needed.

    Args:
        data: For type "json", a JSON string or a serializable object. For
            type "jsonl", an iterable of JSON strings or serializable objects.
            Strings are written as-is; everything else goes through `json.dumps`.
        path (str): Destination file path (written as UTF-8, overwriting any existing file).
        type (str): Output format, "json" or "jsonl".
        use_indent (bool): For type "json" only, serialize objects with an
            indent of 4 when True. It has no effect on strings or on "jsonl".

    Returns:
        str: The path the data was written to.
    """
    assert type in ["json", "jsonl"] # only support json or jsonl format
    make_parent_folder(path)

    def _as_text(obj, **dump_kwargs):
        # Strings are treated as already-serialized JSON and are not re-encoded.
        return obj if isinstance(obj, str) else json.dumps(obj, **dump_kwargs)

    if type == "jsonl":
        with open(path, "w", encoding="utf-8") as fout:
            for item in data:
                fout.write(_as_text(item) + "\n")
    elif type == "json":
        dump_kwargs = {"indent": 4} if use_indent else {}
        with open(path, "w", encoding="utf-8") as fout:
            fout.write(_as_text(data, **dump_kwargs))
    return path
def escape_json_values(string: str) -> str:
    """
    Best-effort repair of quoting/escaping problems in a JSON-like string.

    If `string` already parses with `json.loads` it is returned unchanged.
    Otherwise a fixed sequence of regex rewrites is applied and the rewritten
    text is returned, whether or not it parses afterwards.

    Args:
        string (str): Candidate JSON text.

    Returns:
        str: The original text when it is valid JSON, otherwise the rewritten text.
    """

    def escape_value(match):
        # Substitution callback for `pattern_value`: wrap the captured value in plain
        # double quotes and turn real newline characters into the two characters "\n".
        raw_value = match.group(1)
        raw_value = raw_value.replace('\n', '\\n')
        return f'"{raw_value}"'

    def fix_json(match):
        # Substitution callback for `pattern_nested_json`. This local helper shadows the
        # module-level function of the same name inside this function only.
        raw_key = match.group(1)
        raw_value = match.group(2)
        # Newlines become the two characters "\n"; quotes not already preceded by a
        # backslash get one, so the value can be embedded in a JSON string.
        raw_value = raw_value.replace("\n", "\\n")
        raw_value = regex.sub(r'(?<!\\)"', '\\\"', raw_value)
        return f'"{raw_key}": "{raw_value}"'

    # Fast path: nothing to repair when the text is already valid JSON.
    try:
        json.loads(string)
        return string
    except json.JSONDecodeError:
        pass

    try:
        # Step 1: put a backslash in front of every double quote that is not already
        # preceded by one (replace " with \").
        string = regex.sub(r'(?<!\\)"', '\\\"', string) # replace " with \"
        # Step 2: restore plain quotes around keys, i.e. an escaped-quoted token that is
        # followed by optional whitespace and a colon.
        pattern_key = r'\\"([^"]+)\\"(?=\s*:\s*)'
        string = regex.sub(pattern_key, r'"\1"', string) # replace \\"key\\" with "key"
        # Step 3: restore plain quotes around values that follow a colon. The
        # variable-length lookbehind is a feature of the third-party `regex` module.
        pattern_value = r'(?<=:\s*)\\"((?:\\.|[^"\\])*)\\"'
        string = regex.sub(pattern_value, escape_value, string, flags=regex.DOTALL) # replace \\"value\\" with "value" and change \n to \\n
        # Step 4: handle values that contain brace-delimited (nested JSON-like) text.
        # NOTE(review): the character class [\r\n\\n] matches CR, LF, a backslash and the
        # letter "n" individually, not the two-character sequence "\n" - confirm intended.
        pattern_nested_json = r'"([^"]+)"\s*:\s*\\"([^"]*\{+[\S\s]*?\}+)[\r\n\\n]*"' # handle nested json in value
        string = regex.sub(pattern_nested_json, fix_json, string, flags=regex.DOTALL)
        # Validation only: the outcome does not change what is returned below.
        json.loads(string)
        return string
    except json.JSONDecodeError:
        pass

    # The rewritten text is returned even though it still does not parse.
    return string
def fix_json_booleans(string: str) -> str:
    """
    Lowercase Python-style boolean literals so they read as JSON booleans.

    Every standalone word "True" becomes "true" and every standalone "False"
    becomes "false". Word-boundary anchors keep longer identifiers such as
    "IsTrue" untouched. The replacement is purely textual, so standalone
    occurrences inside quoted string values are rewritten as well.

    Args:
        string (str): The input JSON-like text.

    Returns:
        str: The text with standalone True/False lowercased.
    """
    replacements = (
        (r'\bTrue\b', 'true'),
        (r'\bFalse\b', 'false'),
    )
    for boolean_pattern, json_literal in replacements:
        string = regex.sub(boolean_pattern, json_literal, string)
    return string
def fix_json(string: str) -> str:
    """
    Run the JSON repair helpers over `string` and return the result.

    `fix_json_booleans` is applied first, and its output is then passed to
    `escape_json_values`.
    """
    return escape_json_values(fix_json_booleans(string))
def parse_json_from_text(text: str) -> List[str]:
    """
    Extract JSON-looking substrings from free text.

    A recursive pattern matches balanced `{...}` and `[...]` spans, and each
    match is passed through `fix_json` before being returned.

    Args:
        text (str): Text that may contain JSON data.

    Returns:
        List[str]: The matched spans, in order of appearance, after `fix_json`.
    """
    # (?R) recurses into the whole pattern so nested braces/brackets stay balanced;
    # this requires the third-party `regex` module.
    json_pattern = r"""(?:\{(?:[^{}]*|(?R))*\}|\[(?:[^\[\]]*|(?R))*\])"""
    matcher = regex.compile(json_pattern, regex.VERBOSE)
    return [fix_json(candidate) for candidate in matcher.findall(text)]
def parse_xml_from_text(text: str, label: str) -> List[str]:
    """
    Extract the contents of every `<label>...</label>` pair found in `text`.

    Args:
        text (str): Text to search. Tag contents may span multiple lines.
        label (str): Tag name, matched literally.

    Returns:
        List[str]: The whitespace-stripped contents of each match, in order
        of appearance. Empty when the tag does not occur.
    """
    # Escape the label so characters such as "." or "+" are matched literally
    # instead of being interpreted as regex syntax.
    tag = regex.escape(label)
    pattern = rf"<{tag}>(.*?)</{tag}>"
    matches: List[str] = regex.findall(pattern, text, regex.DOTALL)
    return [match.strip() for match in matches]
def parse_data_from_text(text: str, datatype: str):
    """
    Convert `text` into the Python value named by `datatype`.

    Args:
        text (str): The raw text to convert.
        datatype (str): One of "str", "int", "float", "bool", "list" or
            "dict". Any other value makes the function return `text` as-is.

    Returns:
        The converted value. For "bool" the result is True when the text,
        ignoring case and surrounding whitespace, is one of "true", "yes",
        "1" or "on", and False otherwise.

    Raises:
        ValueError: If `text` cannot be converted to "int" or "float".
        Exception: Whatever evaluating `text` raises for "list" / "dict".
    """
    if datatype == "str":
        return text
    if datatype == "int":
        return int(text)
    if datatype == "float":
        return float(text)
    if datatype == "bool":
        # Strip before comparing so padded input such as "true\n" is accepted,
        # in line with int()/float(), which tolerate surrounding whitespace.
        return text.strip().lower() in ("true", "yes", "1", "on")
    if datatype in ("list", "dict"):
        # SECURITY NOTE(review): eval() executes arbitrary Python expressions. If `text`
        # can come from an untrusted source (e.g. model output), consider switching to
        # ast.literal_eval once callers are confirmed to pass only literals.
        return eval(text)
    # Unknown datatype: fall back to returning the raw text.
    return text
def parse_json_from_llm_output(text: str) -> dict:
    """
    Extract the first JSON snippet from LLM output and parse it.

    Candidate snippets come from `parse_json_from_text`; only the first one
    is used, and it is parsed with `yaml.safe_load`.

    Args:
        text (str): Raw text generated by the model.

    Returns:
        The value `yaml.safe_load` produces for the first snippet.

    Raises:
        ValueError: If no JSON snippet is found, or if the first snippet
            cannot be parsed. In the latter case the underlying parser error
            is attached as the cause.
    """
    json_list = parse_json_from_text(text=text)
    if not json_list:
        raise ValueError(f"The following generated text does not contain JSON string!\n{text}")

    json_text = json_list[0]
    try:
        return yaml.safe_load(json_text)
    except Exception as exc:
        # Chain the original error so the parser's diagnostics are not lost.
        raise ValueError(f"The following generated text is not a valid JSON string!\n{json_text}") from exc
def extract_code_blocks(text: str, return_type: bool = False) -> Union[List[str], List[tuple]]:
    """
    Collect the code found between triple-backtick fences in `text`.

    Args:
        text (str): Text that may contain fenced code blocks.
        return_type (bool): When True, each result is a `(language, code)`
            tuple, otherwise just the code string.

    Returns:
        Union[List[str], List[tuple]]: One entry per fenced block, stripped of
        surrounding whitespace. The language is None when the fence has no
        alphabetic tag. If no fenced block is found, the whole stripped text
        is returned as a single entry (with language None).
    """
    # Group 1: optional alphabetic language tag; group 2: the fenced content.
    fence_pattern = r"```((?:[a-zA-Z]*)?)\n*(.*?)\n*```"
    found = regex.findall(fence_pattern, text, regex.DOTALL)

    if found:
        pairs = [(language.strip() or None, body.strip()) for language, body in found]
    else:
        # No fence at all: treat the entire text as one block with no language.
        pairs = [(None, text.strip())]

    if return_type:
        return pairs
    return [body for _, body in pairs]
def remove_repr_quotes(json_string):
    """
    Strip the double quotes around values that look like `Name(...)`.

    A quoted token qualifies when it starts with an identifier, is followed
    by an opening parenthesis, and ends with a closing parenthesis right
    before the closing quote. Matching is done per line because "." does not
    cross newlines here.
    """
    repr_in_quotes = r'"([A-Za-z_]\w*\(.*\))"'
    return regex.sub(repr_in_quotes, lambda found: found.group(1), json_string)
def custom_serializer(obj: Any):
    """
    Fallback converter for objects that `json.dumps` cannot serialize natively.

    Intended to be passed as the `default=` argument of `json.dumps`.

    Args:
        obj (Any): The object to convert.

    Returns:
        A JSON-serializable stand-in for `obj`:
        - bytes / bytearray: decoded text (undecodable bytes are replaced);
        - datetime / date: a "YYYY-MM-DD HH:MM:SS" string;
        - set: a list of its elements;
        - objects with both `read` and `name` attributes: "<FileObject name=...>";
        - callables: their `__name__`, or their repr when they have none;
        - anything else: its repr.
    """
    if isinstance(obj, (bytes, bytearray)):
        # errors="replace" keeps serialization best-effort for non-UTF-8 payloads
        # instead of raising from inside json.dumps.
        return obj.decode(errors="replace")
    if isinstance(obj, (datetime, date)):
        return obj.strftime("%Y-%m-%d %H:%M:%S")
    if isinstance(obj, set):
        return list(obj)
    if hasattr(obj, "read") and hasattr(obj, "name"):
        return f"<FileObject name={getattr(obj, 'name', 'unknown')}>"
    if callable(obj):
        # Not every callable has __name__ (e.g. functools.partial objects).
        try:
            return obj.__name__
        except AttributeError:
            return repr(obj)
    # Every object has a repr, so this is the universal fallback.
    return repr(obj)
# def get_type_name(type):
# """
# return the name of a type.
# """
# origin = get_origin(type)
# args = get_args(type)
# if origin:
# type_name = f"{origin.__name__}[{', '.join(arg.__name__ for arg in args)}]"
# else:
# type_name = getattr(type, "__name__", str(type))
# return type_name
def get_type_name(typ):
    """
    Return a readable name for a type annotation.

    Args:
        typ: A class or a typing construct (e.g. `List[int]`, `Optional[str]`).

    Returns:
        str: A display name, built as follows:
        - plain classes: their `__name__` (or `str(typ)` when there is none);
        - unions (`Union[...]`, `Optional[...]`, `X | Y`): members joined by " | ";
        - `Type[X]`: "Type[X]", or "Type[Any]" when unparameterized;
        - list / tuple / dict generics: "list[int]", "dict[str, int]", ..., or
          just the container name when unparameterized;
        - any other generic: `str()` of its origin.
    """
    import types  # local import: only needed to recognise `X | Y` unions

    origin = get_origin(typ)
    if origin is None:
        return getattr(typ, "__name__", str(typ))

    # Resolve the arguments once so every branch below can rely on them.
    args = get_args(typ)

    # `X | Y` unions (Python 3.10+) report types.UnionType as their origin.
    if origin is Union or origin is getattr(types, "UnionType", None):
        return " | ".join(get_type_name(arg) for arg in args)
    if origin is type:
        return f"Type[{get_type_name(args[0])}]" if args else "Type[Any]"
    if origin in (list, tuple, dict):
        if not args:
            # Bare `List` / `Tuple` / `Dict` carry no parameters to render.
            return origin.__name__
        return f"{origin.__name__}[{', '.join(get_type_name(arg) for arg in args)}]"
    return str(origin)
def get_pydantic_field_types(model: Type[BaseModel]) -> Dict[str, Union[str, dict]]:
    """
    Map each field of a pydantic model to a readable description of its type.

    Args:
        model (Type[BaseModel]): The model class whose `model_fields` are inspected.

    Returns:
        Dict[str, Union[str, dict]]: For every field, either the name produced
        by `get_type_name` for its annotation, or a nested dict of the same
        shape when the annotation itself exposes `model_fields`.
    """
    def describe(annotation):
        # Anything exposing `model_fields` is treated as a nested model and expanded.
        if hasattr(annotation, "model_fields"):
            return get_pydantic_field_types(annotation)
        return get_type_name(annotation)

    return {
        name: describe(info.annotation)
        for name, info in model.model_fields.items()
    }
def get_pydantic_required_field_types(model: Type[BaseModel]) -> Dict[str, str]:
    """
    Map each required field of a pydantic model to its readable type name.

    A field is included only when `is_required()` is true, its `default` is
    `PydanticUndefined`, and it has no `default_factory`.

    Args:
        model (Type[BaseModel]): The model class whose `model_fields` are inspected.

    Returns:
        Dict[str, str]: Field name to the name produced by `get_type_name`
        for the field's annotation.
    """
    return {
        name: get_type_name(info.annotation)
        for name, info in model.model_fields.items()
        if info.is_required()
        and info.default is PydanticUndefined
        and info.default_factory is None
    }
def format_pydantic_field_types(field_types: Dict[str, str]) -> str:
    """
    Render a field-name to type-name mapping as a brace-wrapped, JSON-like string.

    Field names are double-quoted and type names are emitted as-is, e.g.
    `{"name": str, "tags": list[str]}`. An empty mapping yields `{}`.
    """
    entries = [
        f'"{name}": {type_name}'
        for name, type_name in field_types.items()
    ]
    body = ", ".join(entries)
    return f"{{{body}}}"
def get_error_message(errors: List[Union[ValidationError, Exception]]) -> str:
    """
    Build a report that groups validation errors and other exceptions.

    Args:
        errors: A list of errors. A single error that is not wrapped in a
            list is accepted as well.

    Returns:
        str: Up to two sections separated by a blank line. The first lists the
        `ValidationError` instances; the second lists every other error as
        "<ClassName>: <message>". Each section starts with a header showing
        its count. Empty string when there are no errors.
    """
    if not isinstance(errors, list):
        errors = [errors]

    validation_errors = [err for err in errors if isinstance(err, ValidationError)]
    exceptions = [err for err in errors if not isinstance(err, ValidationError)]

    sections = []
    if validation_errors:
        details = "\n\n".join(str(err) for err in validation_errors)
        sections.append(f" >>>>>>>> {len(validation_errors)} Validation Errors: <<<<<<<<\n\n" + details)
    if exceptions:
        details = "\n\n".join(f"{type(err).__name__}: {err!s}" for err in exceptions)
        sections.append(f">>>>>>>> {len(exceptions)} Exception Errors: <<<<<<<<\n\n" + details)

    return "\n\n".join(sections)
def get_base_module_init_error_message(cls, data: Dict[str, Any], errors: List[Union[ValidationError, Exception]]) -> str:
    """
    Build the message reported when `cls` cannot be instantiated from `data`.

    Args:
        cls: The class that failed to instantiate; its `__name__` is shown.
        data (Dict[str, Any]): The input data, dumped as indented JSON with
            `custom_serializer` as the fallback, then passed through
            `remove_repr_quotes`.
        errors: The errors raised during instantiation (a single error that is
            not wrapped in a list is accepted as well).

    Returns:
        str: A header naming the class, the formatted data, a blank line, and
        the report produced by `get_error_message`.
    """
    error_list = errors if isinstance(errors, list) else [errors]
    header = f"Can not instantiate {cls.__name__} from: "
    dumped = json.dumps(data, indent=4, default=custom_serializer)
    readable_data = remove_repr_quotes(dumped)
    return header + readable_data + "\n\n" + get_error_message(error_list)
|