File size: 12,547 Bytes
5374a2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
import ast
import json
import os
from datetime import datetime, date
from typing import Union, Type, Any, List, Dict, get_origin, get_args
from uuid import uuid4

import regex
import yaml
from pydantic import BaseModel
from pydantic_core import PydanticUndefined, ValidationError

from .logging import logger

def make_parent_folder(path: str):
    """
    Ensure the parent directory of `path` exists, creating it if necessary.

    Args:
        path (str): A file path whose parent directory should exist.
    """
    dir_folder = os.path.dirname(path)
    # A bare filename has no parent directory; nothing to create.
    if not dir_folder.strip():
        return
    # exist_ok=True already tolerates an existing directory, so the previous
    # `os.path.exists` pre-check was redundant (and race-prone).
    os.makedirs(dir_folder, exist_ok=True)

def generate_id():
    """Return a random 32-character lowercase hexadecimal identifier."""
    return str(uuid4()).replace("-", "")

def get_timestamp():
    """Return the current local time formatted as 'YYYY-MM-DD HH:MM:SS'."""
    return f"{datetime.now():%Y-%m-%d %H:%M:%S}"

def load_json(path: str, type: str="json"):
    """
    Load data from a JSON or JSON-Lines file.

    Args:
        path (str): Path of the file to read.
        type (str): File format, chosen from ["json", "jsonl"].

    Returns:
        The parsed data: a dict/list for "json", a list of parsed lines for
        "jsonl", or None when a "json" file cannot be parsed.
    """
    assert type in ["json", "jsonl"] # only support json or jsonl format
    if not os.path.exists(path):
        logger.error(f"File \"{path}\" does not exist!")

    # Bug fix: `outputs` was previously unbound (UnboundLocalError) when the
    # "json" branch failed to parse; initialize it up front.
    outputs = None

    if type == "json":
        try:
            with open(path, "r", encoding="utf-8") as file:
                # json.loads is much faster than yaml.safe_load on large files
                outputs = json.loads(file.read())
        except Exception:
            logger.error(f"File \"{path}\" is not a valid json file!")

    elif type == "jsonl":
        outputs = []
        with open(path, "r", encoding="utf-8") as fin:
            for line in fin:
                outputs.append(json.loads(line))

    return outputs

def save_json(data, path: str, type: str="json", use_indent: bool=True) -> str:
    """
    Write data to a JSON or JSON-Lines file, creating parent folders as needed.

    Args:
        data: The data to be saved. A JSON str or a serializable object when
            type == "json"; a list of JSON strs or serializable objects when
            type == "jsonl".
        path (str): Destination path of the saved file.
        type (str): File format, chosen from ["json", "jsonl"].
        use_indent: Whether to pretty-print with an indent (type == "json" only).

    Returns:
        str: The path where the data was saved.
    """
    assert type in ["json", "jsonl"] # only support json or jsonl format
    make_parent_folder(path)

    if type == "json":
        if isinstance(data, str):
            payload = data
        elif use_indent:
            payload = json.dumps(data, indent=4)
        else:
            payload = json.dumps(data)
        with open(path, "w", encoding="utf-8") as fout:
            fout.write(payload)

    elif type == "jsonl":
        with open(path, "w", encoding="utf-8") as fout:
            records = (item if isinstance(item, str) else json.dumps(item) for item in data)
            fout.writelines(f"{record}\n" for record in records)

    return path

def escape_json_values(string: str) -> str:
    """
    Best-effort repair of a JSON-like string whose values contain unescaped
    quotes or raw newlines.

    If `string` already parses as JSON it is returned unchanged. Otherwise
    every unescaped double quote is escaped, then the quotes that delimit
    keys and values are selectively restored, normalizing raw newlines inside
    values to literal \\n. The result is returned even if it still fails to
    parse, so callers must not assume validity.
    """

    def escape_value(match):
        # Re-wrap a matched value in plain quotes, converting raw newlines
        # to literal \n escapes.
        raw_value = match.group(1)
        raw_value = raw_value.replace('\n', '\\n')
        return f'"{raw_value}"'
    
    def fix_json(match):
        # Handles the "key": \"...{...}...\" case where the value itself
        # embeds a JSON object. NOTE: this local helper shadows the
        # module-level fix_json; it is only used by pattern_nested_json below.
        raw_key = match.group(1)
        raw_value = match.group(2)
        raw_value = raw_value.replace("\n", "\\n")
        raw_value = regex.sub(r'(?<!\\)"', '\\\"', raw_value)
        return f'"{raw_key}": "{raw_value}"'
    
    # Fast path: already valid JSON, nothing to repair.
    try:
        json.loads(string)
        return string
    except json.JSONDecodeError:
        pass

    # Repair pipeline; the substitution order below is significant.
    try:
        string = regex.sub(r'(?<!\\)"', '\\\"', string) # replace " with \"
        pattern_key = r'\\"([^"]+)\\"(?=\s*:\s*)'
        string = regex.sub(pattern_key, r'"\1"', string) # replace \\"key\\" with "key"
        pattern_value = r'(?<=:\s*)\\"((?:\\.|[^"\\])*)\\"'
        string = regex.sub(pattern_value, escape_value, string, flags=regex.DOTALL) # replace \\"value\\" with "value"and change \n to \\n
        pattern_nested_json = r'"([^"]+)"\s*:\s*\\"([^"]*\{+[\S\s]*?\}+)[\r\n\\n]*"' # handle nested json in value
        string = regex.sub(pattern_nested_json, fix_json, string, flags=regex.DOTALL)
        json.loads(string)
        return string
    except json.JSONDecodeError:
        pass
    
    # Repair failed; return the partially repaired string as-is.
    return string

def fix_json_booleans(string: str) -> str:
    """
    Rewrite Python-style booleans "True"/"False" as JSON-style "true"/"false".

    A regex word boundary guarantees only whole words are matched, so
    substrings such as "True" inside "IsTrue" are left untouched.

    Args:
        string (str): The input JSON string.

    Returns:
        str: The string with isolated booleans lowercased.
    """
    for python_bool, json_bool in (("True", "true"), ("False", "false")):
        string = regex.sub(rf"\b{python_bool}\b", json_bool, string)
    return string


def fix_json(string: str) -> str:
    """Apply the best-effort JSON repairs: booleans first, then value escaping."""
    return escape_json_values(fix_json_booleans(string))


def parse_json_from_text(text: str) -> List[str]:
    """
    Extract every top-level JSON object or array embedded in free-form text.

    A recursive regex ((?R)) matches balanced {...} / [...] spans; each match
    is then run through fix_json for best-effort repair.

    Args:
        text (str): A text that may include JSON data.

    Returns:
        List[str]: The (repaired) JSON candidate strings, in order of appearance.
    """
    json_pattern = r"""(?:\{(?:[^{}]*|(?R))*\}|\[(?:[^\[\]]*|(?R))*\])"""
    compiled = regex.compile(json_pattern, regex.VERBOSE)
    return [fix_json(candidate) for candidate in compiled.findall(text)]


def parse_xml_from_text(text: str, label: str) -> List[str]:
    """
    Return the stripped contents of every <label>...</label> pair in `text`.

    Matching is non-greedy and spans newlines (DOTALL); an empty list is
    returned when no tag pair is found.
    """
    pattern = rf"<{label}>(.*?)</{label}>"
    found: List[str] = regex.findall(pattern, text, regex.DOTALL)
    if not found:
        return []
    return [segment.strip() for segment in found]

def parse_data_from_text(text: str, datatype: str):
    """
    Convert `text` to the requested primitive `datatype`.

    Args:
        text (str): The raw text to convert.
        datatype (str): One of ["str", "int", "float", "bool", "list", "dict"].
            Any other value returns the raw text unchanged.

    Returns:
        The converted value, or the raw text for unknown datatypes.

    Raises:
        ValueError: if `text` cannot be converted to an int/float or is not a
            valid Python literal for list/dict.
    """
    if datatype == "str":
        return text
    if datatype == "int":
        return int(text)
    if datatype == "float":
        return float(text)
    if datatype == "bool":
        # "True" was previously listed here too, but text.lower() can never
        # contain an uppercase letter, so that member was dead.
        return text.lower() in ("true", "yes", "1", "on")
    if datatype in ("list", "dict"):
        # Security fix: `text` typically comes from untrusted LLM output, so
        # use ast.literal_eval (literals only) instead of eval, which would
        # execute arbitrary expressions.
        return ast.literal_eval(text)
    # Unknown datatype: fall back to returning the raw text.
    return text

def parse_json_from_llm_output(text: str) -> dict:
    """
    Extract the first JSON string from LLM output and convert it to a dict.

    Args:
        text (str): Raw LLM output that should contain a JSON object.

    Returns:
        dict: The parsed JSON data.

    Raises:
        ValueError: if `text` contains no JSON string, or the extracted
            candidate cannot be parsed.
    """
    json_list = parse_json_from_text(text=text)
    if not json_list:
        # Typo fix: message previously read "follwoing".
        raise ValueError(f"The following generated text does not contain JSON string!\n{text}")
    json_text = json_list[0]
    try:
        # yaml.safe_load accepts a superset of JSON (e.g. single quotes),
        # making it more forgiving for LLM output than json.loads.
        data = yaml.safe_load(json_text)
    except Exception as err:
        raise ValueError(f"The following generated text is not a valid JSON string!\n{json_text}") from err
    return data

def extract_code_blocks(text: str, return_type: bool = False) -> Union[List[str], List[tuple]]:
    """
    Extract code blocks fenced by triple backticks from `text`.

    Args:
        text (str): The text containing code blocks.
        return_type (bool): If True, return (language, code) tuples, where
            the language is None when no tag follows the opening fence.

    Returns:
        Union[List[str], List[tuple]]: Code blocks, or (language, code)
        tuples. When no fenced block is found, the whole stripped text is
        returned as a single (untagged) block.
    """
    # Optional language tag after the opening fence, then a lazily-matched body.
    code_block_pattern = r"```((?:[a-zA-Z]*)?)\n*(.*?)\n*```"
    found = regex.findall(code_block_pattern, text, regex.DOTALL)

    if not found:
        stripped = text.strip()
        return [(None, stripped)] if return_type else [stripped]

    if return_type:
        return [(language.strip() or None, body.strip()) for language, body in found]
    return [body.strip() for _, body in found]

def remove_repr_quotes(json_string):
    """
    Strip the surrounding double quotes from repr-style values such as
    "ClassName(...)" so they read as constructor calls rather than strings.
    """
    repr_pattern = r'"([A-Za-z_]\w*\(.*\))"'
    return regex.sub(repr_pattern, r'\1', json_string)

def custom_serializer(obj: Any):
    """
    `default=` hook for json.dumps covering objects json cannot serialize.

    Handles bytes, datetimes/dates, sets, file-like objects and callables;
    anything else falls back to its repr().
    """
    if isinstance(obj, (bytes, bytearray)):
        return obj.decode()
    if isinstance(obj, (datetime, date)):
        return obj.strftime("%Y-%m-%d %H:%M:%S")
    if isinstance(obj, set):
        # Sets are not JSON-serializable; order follows set iteration order.
        return list(obj)
    if hasattr(obj, "read") and hasattr(obj, "name"):
        # Duck-typed file object: summarize rather than dumping contents.
        return f"<FileObject name={getattr(obj, 'name', 'unknown')}>"
    if callable(obj):
        # Bug fix: callables without __name__ (e.g. functools.partial
        # instances) previously raised AttributeError here.
        return getattr(obj, "__name__", repr(obj))
    # Bug fix: the old `hasattr(obj, "__class__")` guard was always true, so
    # the trailing TypeError was unreachable; make the repr() fallback explicit.
    return repr(obj)

# def get_type_name(type):
#     """
#     return the name of a type.
#     """
#     origin = get_origin(type)
#     args = get_args(type)
#     if origin:
#         type_name = f"{origin.__name__}[{', '.join(arg.__name__ for arg in args)}]"
#     else:
#         type_name = getattr(type, "__name__", str(type))

#     return type_name

def get_type_name(typ):
    """
    Return a readable name for a (possibly generic) type annotation.

    Plain classes map to their __name__; Unions render as "A | B"; Type[X],
    list/tuple and dict generics render with recursively-named arguments.
    Any other parameterized generic falls back to str(origin).
    """
    origin = get_origin(typ)
    if origin is None:
        # Not a parameterized generic: a plain class or special form.
        return getattr(typ, "__name__", str(typ))

    # Bug fix: `args` was only bound inside the Union branch, so the
    # `origin is type` branch raised NameError for Type[X] annotations.
    args = get_args(typ)

    if origin is Union:
        return " | ".join(get_type_name(arg) for arg in args)

    if origin is type:
        return f"Type[{get_type_name(args[0])}]" if args else "Type[Any]"

    if origin in (list, tuple):
        return f"{origin.__name__}[{', '.join(get_type_name(arg) for arg in args)}]"

    if origin is dict:
        key_type, value_type = args
        return f"dict[{get_type_name(key_type)}, {get_type_name(value_type)}]"

    return str(origin)

def get_pydantic_field_types(model: Type[BaseModel]) -> Dict[str, Union[str, dict]]:
    """
    Map each field of a pydantic model to a readable type name, recursing
    into nested pydantic models (which become nested dicts of type names).
    """
    result: Dict[str, Union[str, dict]] = {}
    for name, info in model.model_fields.items():
        annotation = info.annotation
        if hasattr(annotation, "model_fields"):
            # Nested pydantic model: describe its fields recursively.
            result[name] = get_pydantic_field_types(annotation)
        else:
            result[name] = get_type_name(annotation)
    return result

def get_pydantic_required_field_types(model: Type[BaseModel]) -> Dict[str, str]:
    """
    Map only the required fields of a pydantic model (no default value and
    no default factory) to their readable type names.
    """
    required: Dict[str, str] = {}
    for name, info in model.model_fields.items():
        if not info.is_required():
            continue
        has_default = info.default is not PydanticUndefined or info.default_factory is not None
        if has_default:
            continue
        required[name] = get_type_name(info.annotation)
    return required

def format_pydantic_field_types(field_types: Dict[str, str]) -> str:
    """
    Render a field-name -> type-name mapping as a JSON-like schema string,
    e.g. {"name": str, "age": int}.
    """
    parts = [f'"{name}": {type_name}' for name, type_name in field_types.items()]
    return "{" + ", ".join(parts) + "}"

def get_error_message(errors: List[Union[ValidationError, Exception]]) -> str:
    """
    Format a list (or single instance) of errors into a readable report,
    grouping pydantic ValidationErrors separately from other exceptions.
    """
    if not isinstance(errors, list):
        errors = [errors]

    validation_errors = [e for e in errors if isinstance(e, ValidationError)]
    exceptions = [e for e in errors if not isinstance(e, ValidationError)]

    sections = []
    if validation_errors:
        header = f" >>>>>>>> {len(validation_errors)} Validation Errors: <<<<<<<<\n\n"
        sections.append(header + "\n\n".join(str(e) for e in validation_errors))
    if exceptions:
        header = f">>>>>>>> {len(exceptions)} Exception Errors: <<<<<<<<\n\n"
        sections.append(header + "\n\n".join(f"{type(e).__name__}: {e}" for e in exceptions))
    return "\n\n".join(sections)

def get_base_module_init_error_message(cls, data: Dict[str, Any], errors: List[Union[ValidationError, Exception]]) -> str:
    """
    Build a detailed diagnostic explaining why `cls` could not be
    instantiated from `data`, including the formatted input and all errors.
    """
    if not isinstance(errors, list):
        errors = [errors]

    formatted_data = remove_repr_quotes(json.dumps(data, indent=4, default=custom_serializer))
    return (
        f"Can not instantiate {cls.__name__} from: "
        + formatted_data
        + "\n\n"
        + get_error_message(errors)
    )