Spaces:
Runtime error
Runtime error
import itertools
import json
import math
import os
import re
from typing import Callable, List

import joblib
import pandas as pd
from loguru import logger
def parse_json_garbage(s, start="{", end="}"):
    """Extract and parse the first JSON object embedded in a noisy string.

    Trims any prose before the first `start` character, then tries to parse
    the span up to the last `end` character (so nested objects such as
    '{"a": {"b": 1}}' are not truncated), falling back to the span up to the
    first `end` character for trailing garbage. Only if plain parsing fails
    are //- or #-style comment tails stripped and parsing retried.

    Argument
        s: str, raw text containing a JSON object somewhere inside
        start: str, character(s) that may open the object (default "{")
        end: str, character(s) that may close the object (default "}")
    Return
        json_obj: dict
    Raises
        StopIteration: if `s` contains no `start`/`end` character
        json.JSONDecodeError: if every parse attempt fails
    """
    # Drop any prose before the opening character.
    s = s[next(idx for idx, c in enumerate(s) if c in start):]
    first_end = next(idx for idx, c in enumerate(s) if c in end)
    last_end = max(s.rfind(c) for c in end)
    # Prefer the longest candidate (up to the last closing char); the
    # original sliced to the *first* one, which cut nested objects in half.
    for candidate in (s[:last_end + 1], s[:first_end + 1]):
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            continue
    # Last resort: strip comment-like tails and retry. NOTE: the first regex
    # also eats '/' and '#' inside string values, hence the anchored retry.
    s = s[:first_end + 1]
    try:
        return json.loads(re.sub(r"[/#].*", "", s, flags=re.MULTILINE))
    except json.JSONDecodeError as e:
        logger.warning(f"Error parsing JSON (trying another regex...): {e}")
        return json.loads(re.sub(r"^[/#].*", "", s, flags=re.MULTILINE))
def merge_results( results: list, dataframe_columns: list, list_columns: list):
    """Merge per-worker result dicts into a single result dict.

    Argument
        results: list of dicts, each mapping a key to a DataFrame or a list
        dataframe_columns: list of keys whose values are DataFrames
            (row-concatenated with a fresh index)
        list_columns: list of keys whose values are lists (flattened in order)
    Return
        merged_results: dict mapping each key to its merged value
    """
    assert len(results) > 0, "No results to merge"
    merged_results = {}
    # One pass over the keys is enough: the original wrapped these loops in
    # `for result in results:` and recomputed every merge once per result
    # (O(N^2) work for N results, same final value).
    for key in dataframe_columns:
        merged_results[key] = pd.concat([r[key] for r in results], ignore_index=True)
    for key in list_columns:
        merged_results[key] = list(itertools.chain.from_iterable(r[key] for r in results))
    return merged_results
def split_dataframe( df: pd.DataFrame, n_processes: int = 4) -> list:
    """Split `df` into up to `n_processes` contiguous row chunks.

    Argument
        df: pd.DataFrame to split
        n_processes: int, target number of chunks (each chunk holds
            ceil(len(df) / n_processes) rows, at least 1)
    Return
        list of pd.DataFrame slices; empty when `df` has no rows
    """
    total_rows = df.shape[0]
    chunk_size = max(math.ceil(total_rows / n_processes), 1)
    chunks = []
    for offset in range(0, total_rows, chunk_size):
        chunks.append(df.iloc[offset:offset + chunk_size])
    return chunks
def combine_results( results: pd.DataFrame, combined_results_path: str, src_column: str = 'classified_category', tgt_column: str = 'category', strategy: str = 'replace'):
    """Fold classified values into `tgt_column`, caching the result on disk.

    On the first call (cache file missing) the combination is computed and
    dumped to `combined_results_path`; later calls ignore `results` and
    `strategy` entirely and return the cached frame.

    Argument
        results: dataframe holding `src_column` and `tgt_column`
        combined_results_path: str, joblib cache file path
        src_column: str, column with the newly classified values
        tgt_column: str, column to fill/overwrite
        strategy: str, 'replace' overwrites targets that are empty or differ
            from the source; 'patch' only fills empty targets
    Return
        combined_results: dataframe
    Raises
        ValueError: if `strategy` is neither 'replace' nor 'patch'
    """
    if os.path.exists(combined_results_path):
        # Cache hit: by design the incoming `results` are ignored.
        with open( combined_results_path, "rb") as f:
            return joblib.load(f)
    combined_results = results.copy()
    if strategy == 'replace':
        condition = (combined_results[tgt_column]=='') | (combined_results[src_column]!=combined_results[tgt_column])
    elif strategy == 'patch':
        condition = (combined_results[tgt_column]=='')
    else:
        raise ValueError(f"Strategy {strategy} not implemented")
    # .loc assignment replaces the original chained-indexing round-trip
    # (`df[cond][src].values`), which pandas warns against; the result is
    # identical because both sides share the same row index.
    combined_results.loc[ condition, tgt_column] = combined_results.loc[ condition, src_column]
    with open( combined_results_path, "wb") as f:
        joblib.dump( combined_results, f)
    return combined_results
| def split_dict( information: dict | List[dict], keys1: List[str], keys2: List[str]): | |
| """[ { key1: value1, key2: value2}, { key1: value1, key2: value2}] -> [ {key1: value1}, {key1: value1}], [{key2: value2, key2: value2}] | |
| Argument | |
| information: dict | List[dict], dim -> N | |
| keys1: List[str], dim -> K1 | |
| keys2: List[str], dim -> K2 | |
| Example: | |
| >> split_dict( [ {"a": 1, "b":2, "c": 3}, {"a": 1, "b":2, "c": 3}, {"a": 1, "b":2, "c": 3}], ['a','b'], ['c']) | |
| >> ( [{'a': 1, 'b': 2}, {'a': 1, 'b': 2}, {'a': 1, 'b': 2}], [{'c': 3}, {'c': 3}, {'c': 3}] ) | |
| """ | |
| assert len(keys1)>0 and len(keys2)>0 | |
| results1, results2 = [], [] | |
| if isinstance( information, dict): | |
| information = [ information] | |
| for info in information: # N | |
| split_results1 = {} # K1 | |
| for key in keys1: | |
| if key in info: | |
| split_results1[key] = info[key] | |
| else: | |
| split_results1[key] = None | |
| results1.append( split_results1) | |
| split_results2 = {} # K2 | |
| for key in keys2: | |
| if key in info: | |
| split_results2[key] = info[key] | |
| else: | |
| split_results2[key] = None | |
| results2.append( split_results2) | |
| # results.append( [ split_results1, split_results2]) | |
| assert len(results1)==len(results2) | |
| if len(results1)==1: | |
| return results1[0], results2[0] | |
| return results1, results2 | |
def format_df( df: pd.DataFrame, input_column: str = 'evidence', output_column: str = 'formatted_evidence', format_func: Callable = lambda x: x):
    """Return a copy of `df` with `format_func` applied element-wise.

    Argument
        df: dataframe containing `input_column`
        input_column: str, column to read
        output_column: str, column to write the formatted values to
        format_func: callable applied to each value of `input_column`
            (identity by default). Annotation corrected from the original,
            wrong `str`.
    Return
        formatted_df: copy of `df` with `output_column` added/overwritten;
        the input frame is left untouched
    """
    formatted_df = df.copy()
    formatted_df[output_column] = formatted_df[input_column].apply(format_func)
    return formatted_df
def clean_quotes( text: str):
    """Strip outer whitespace and delete every ideographic space (U+3000),
    carriage return, double quote, and single quote from `text`.
    """
    # One C-level pass instead of four chained .replace() calls.
    drop_table = str.maketrans("", "", "\u3000\r\"'")
    return text.strip().translate(drop_table)
def compose_query( address, name, with_index: bool = True, exclude: str = "-inurl:twincn.com -inurl:findcompany.com.tw -inurl:iyp.com.tw -inurl:twypage.com -inurl:alltwcompany.com -inurl:zhupiter.com -inurl:twinc.com.tw", use_exclude: bool = True):
    """Build a web-search query of `county/city prefix + business name`.

    Argument
        address: str, full address; only the first 3 characters (the
            county/city, e.g. 縣市) are used
        name: str, business name (營業人名稱)
        with_index: bool, kept for interface compatibility (unused here)
        exclude: str, '-inurl:' operators appended to suppress listing sites
        use_exclude: bool, whether to append `exclude` to the query
    Return
        query: str, `縣市 營業人名稱` plus optional exclusions
    """
    base_query = f"{address[:3]} {name}"
    if not use_exclude:
        return base_query
    return f"{base_query} {exclude}"
def reverse_category2supercategory(category2supercategory):
    """Invert a category -> supercategory mapping into
    supercategory -> [categories].

    Argument
        category2supercategory: dict mapping each category to one supercategory
    Return
        supercategory2category: dict mapping each supercategory to the list
        of categories that point to it (insertion order preserved)
    """
    inverted = {}
    for category, supercategory in category2supercategory.items():
        inverted.setdefault(supercategory, []).append(category)
    return inverted
def concat_df( list_df: List[pd.DataFrame], axis: int = 0):
    """Concatenate dataframes along `axis`.

    A singleton list is returned as-is (same object, no copy).

    Argument
        list_df: List[pd.DataFrame], must be non-empty
        axis: int, 0 to stack rows, 1 to stack columns
    Return
        df: pd.DataFrame
    """
    assert list_df, "Empty list of dataframes"
    first, *rest = list_df
    if not rest:
        return first
    return pd.concat( list_df, axis=axis)