Spaces:
Sleeping
Sleeping
| import json | |
| from zipfile import ZipFile | |
| import gradio as gr | |
| import pandas as pd | |
def create_output_attribute(attribute_name, attribute_value, attribute_language):
    """Build one Certifai-format attribute record tagged with the NER model.

    Coordinates and probability are fixed placeholder values; only the
    entity name, value and language vary per call.
    """
    return {
        "coordinates": "None",
        "entity": attribute_name,
        "probability": 1.0,
        "value": attribute_value,
        "model": "NER",
        "language": attribute_language,
    }
def create_allergen_attribute(allergen_type_code, allergen_containment_level='None'):
    """Build one rule-based Allergen record in the Certifai format.

    The containment level defaults to the string 'None' when the source
    data does not provide one.
    """
    record = {
        "coordinates": "None",
        "probability": 1.0,
        "model": "rule-based",
        "entity": "Allergen",
        "allergenTypeCode": allergen_type_code,
        "levelOfContainmentCode": allergen_containment_level,
    }
    return record
def extract_attribute_value_from_df(df, attribute_id):
    """Return the first 'attribute_value' whose row matches attribute_id.

    Returns None when no row matches (the original reached the same result
    by catching the IndexError from .iloc[0] on an empty selection).
    """
    matching = df[df['attribute_id'] == attribute_id]
    if matching.empty:
        return None
    return matching.iloc[0]['attribute_value']
def extract_value_and_unit(value_in):
    """Split a 'value unit' string (e.g. '100 g') into its two parts.

    Returns (None, None) for None input, (value, unit) when the string has
    exactly one space, and (original_string, None) otherwise.
    """
    if value_in is None:
        return None, None
    parts = value_in.split(' ')
    if len(parts) != 2:
        # Not a simple "value unit" pair — hand back the raw string.
        return value_in, None
    return parts[0], parts[1]
def process_normal_attributes(pxm_output, certifai_mapping, desired_language):
    """Process the 'simple' (non-hierarchical) PXM attributes.

    Walks pxm_output['data'] and returns a triple:
    - output_list: mapped records for flat attributes whose id appears in
      certifai_mapping and whose locale is in desired_language;
    - hierarchy_attributes: raw rows for attributes that carry a path
      (handled later by the hierarchy processors);
    - gtin: the value of attribute 3603, or None if absent.
    """
    output_list = []
    hierarchy_attributes = []
    gtin = None
    for attribute in pxm_output['data']:
        attr_id = str(attribute['attribute_id'])
        attr_lang = attribute['locale']['value']
        attr_value = str(attribute['value'])
        path = attribute['path']
        if path is not None:
            hierarchy_attributes.append({
                'attribute_id': attr_id,
                'attribute_value': attr_value,
                'attribute_language': attr_lang,
                'path': path,
            })
        if path is None and attr_id in certifai_mapping and attr_lang in desired_language:
            mapped_name = certifai_mapping[attr_id]
            output_list.append(create_output_attribute(mapped_name, attr_value, attr_lang))
        # Attribute 3603 carries the product GTIN.
        if attr_id == '3603':
            gtin = attr_value
    return output_list, hierarchy_attributes, gtin
def extract_path_hierarchy(hierarchy_attributes_df):
    """Split the dotted 'path' column into 'path.0', 'path.1', 'path.2' columns.

    Each path has up to three dot-separated levels; missing levels become
    None. Paths deeper than three levels keep their first three components.

    Fix: the original only handled lengths 1-3 and silently skipped deeper
    paths, which left path_list shorter than the DataFrame and made the
    column assignment raise a ValueError. Padding/truncating every row to
    exactly three entries keeps the lists aligned.
    """
    path_list = []
    for parts in hierarchy_attributes_df['path'].str.split('.'):
        # Pad with None to three levels; truncate anything deeper.
        path_list.append((list(parts) + [None, None, None])[:3])
    hierarchy_attributes_df[['path.0', 'path.1', 'path.2']] = path_list
    return hierarchy_attributes_df
def process_allergens(hierarchy_attributes_df):
    """Build the 'allergens' attribute from the hierarchy rows.

    Rows with attribute_id 5184 (allergen type) and 5191 (containment level)
    are restricted to a single language (whichever appears first), grouped
    by path, and each complete 5184/5191 pair yields one allergen record.

    Fix: the original called .iloc[0] on both per-group selections, which
    raised IndexError whenever a path group contained only one of the two
    ids — and this function is NOT wrapped in a try in process_file.
    Incomplete groups are now skipped.
    """
    allergens_df = hierarchy_attributes_df[
        hierarchy_attributes_df['attribute_id'].isin(['5184', '5191'])]
    allergens_list = []
    if len(allergens_df) > 0:
        first_language = allergens_df.iloc[0]['attribute_language']
        allergens_df = allergens_df[allergens_df['attribute_language'] == first_language]
        for path, group in allergens_df.groupby('path'):
            type_values = group.loc[group['attribute_id'] == '5184', 'attribute_value']
            level_values = group.loc[group['attribute_id'] == '5191', 'attribute_value']
            if type_values.empty or level_values.empty:
                continue  # incomplete pair for this path — nothing to emit
            allergens_list.append(
                create_allergen_attribute(type_values.iloc[0], level_values.iloc[0]))
    allergen_attribute = {'entity': 'allergens', 'values': allergens_list, 'model': 'rule-based'}
    return allergen_attribute
| # def process_communication_channels(hierarchy_attributes_df): | |
| # # Communication Channels | |
| # communication_channels_df = hierarchy_attributes_df[ | |
| # (hierarchy_attributes_df['attribute_id'].isin(['2400', '2401']))] | |
| # communication_channels_df = communication_channels_df[ | |
| # communication_channels_df['attribute_language'] == communication_channels_df.iloc[0]['attribute_language']] | |
| # communication_channels_list = [] | |
| # for path_1, group in communication_channels_df.groupby('path'): | |
| # communication_channel_dict = { | |
| # "coordinates": "None", | |
| # "probability": 1.0, | |
| # "model": "rule-based", | |
| # "entity": "CommunicationChannel", | |
| # "communicationChannelCode": extract_attribute_value_from_df(group, '2400'), | |
| # "communicationValue": extract_attribute_value_from_df(group, '2401') | |
| # } | |
| # communication_channels_list.append(communication_channel_dict) | |
| # communication_channel_attribute = {'entity': 'communicationChannels', 'values': communication_channels_list} | |
| # return communication_channel_attribute | |
def process_communication_channels(hierarchy_attributes_df):
    """Extract CommunicationChannel records from the hierarchy rows.

    Rows for the contact/channel attribute ids are restricted to a single
    language (whichever appears first), then grouped by path; a record is
    emitted only when both the channel code (4900) and the channel value
    (4901) are present. Each record carries its top-level path segment so
    it can later be matched to a contact.
    """
    channels_df = hierarchy_attributes_df[
        hierarchy_attributes_df['attribute_id'].isin(['4896', '4897', "4898", "4900", "4901"])]
    first_language = channels_df.iloc[0]['attribute_language']
    channels_df = channels_df[channels_df['attribute_language'] == first_language]
    channels = []
    for _path, group in channels_df.groupby('path'):
        code = extract_attribute_value_from_df(group, '4900')
        value = extract_attribute_value_from_df(group, '4901')
        if not (code and value):
            continue
        channels.append({
            "path": group["path.0"].iloc[0],
            "coordinates": "None",
            "probability": 1.0,
            "model": "rule-based",
            "entity": "CommunicationChannel",
            "communicationChannelCode": code,
            "communicationValue": value,
        })
    return channels
def process_contact_information(hierarchy_attributes_df, communication_channels_list):
    """Build the contact_information attribute, attaching matching channels.

    Contact rows (ids 4896 name, 4897 address, 4898 type) are restricted to
    a single language and grouped by path; incomplete groups are skipped.
    Channels are attached by comparing their 'path' field to the group's
    ordinal position as a string — NOTE(review): this assumes the top-level
    path segment equals the group position; confirm against the PXM export.
    """
    contacts_df = hierarchy_attributes_df[
        hierarchy_attributes_df['attribute_id'].isin(['4896', '4897', "4898"])]
    first_language = contacts_df.iloc[0]['attribute_language']
    contacts_df = contacts_df[contacts_df['attribute_language'] == first_language]
    contacts = []
    for index, (_path, group) in enumerate(contacts_df.groupby('path')):
        name = extract_attribute_value_from_df(group, '4896')
        address = extract_attribute_value_from_df(group, '4897')
        type_code = extract_attribute_value_from_df(group, '4898')
        if not (name and address and type_code):
            continue
        channels = [ch for ch in communication_channels_list if ch["path"] == str(index)]
        contacts.append({
            "contactInformation": str(index),
            "contactName": name,
            "contact_address": address,
            "contactTypeCode": type_code,
            "communicationChannels": channels,
        })
    return {'entity': 'contact_information', 'values': contacts}
def process_preparation_instructions(hierarchy_attributes_df):
    """Build the preparationInstructions attribute from the hierarchy rows.

    Rows with ids 5206 (preparation type) and 5207 (instructions text) are
    restricted to a single language, grouped by path, and each group yields
    one rule-based PreparationInstruction record.
    """
    prep_df = hierarchy_attributes_df[
        hierarchy_attributes_df['attribute_id'].isin(['5206', '5207'])]
    first_language = prep_df.iloc[0]['attribute_language']
    prep_df = prep_df[prep_df['attribute_language'] == first_language]
    instructions = []
    for _path, group in prep_df.groupby('path'):
        instructions.append({
            "coordinates": "None",
            "probability": 1.0,
            "model": "rule-based",
            "entity": "PreparationInstruction",
            "preparationTypeCode": extract_attribute_value_from_df(group, '5206'),
            "preparationInstructions": extract_attribute_value_from_df(group, '5207'),
        })
    return {'entity': 'preparationInstructions', 'values': instructions}
def process_diet_information(hierarchy_attributes_df):
    """Build the dietInformation attribute from the hierarchy rows.

    Rows with ids 5203 (diet type code) and 5204 (marked-on-package flag)
    are restricted to a single language, grouped by path, and each group
    yields one rule-based DietInformation record.
    """
    diet_df = hierarchy_attributes_df[
        hierarchy_attributes_df['attribute_id'].isin(['5203', '5204'])]
    first_language = diet_df.iloc[0]['attribute_language']
    diet_df = diet_df[diet_df['attribute_language'] == first_language]
    diet_records = []
    for _path, group in diet_df.groupby('path'):
        diet_records.append({
            "coordinates": "None",
            "probability": 1.0,
            "model": "rule-based",
            "entity": "DietInformation",
            "dietTypeCode": extract_attribute_value_from_df(group, '5203'),
            "isDietTypeMarkedOnPackage": extract_attribute_value_from_df(group, '5204'),
        })
    return {'entity': 'dietInformation', 'values': diet_records}
def process_claim_element_information(hierarchy_attributes_df):
    """Build the claimElementInformation attribute from the hierarchy rows.

    Rows with ids 5199 (claim element), 5200 (claim type) and 5201
    (marked-on-package flag) are restricted to a single language, grouped
    by path, and each group yields one rule-based record.
    """
    claims_df = hierarchy_attributes_df[
        hierarchy_attributes_df['attribute_id'].isin(['5199', '5200', '5201'])]
    first_language = claims_df.iloc[0]['attribute_language']
    claims_df = claims_df[claims_df['attribute_language'] == first_language]
    claim_records = []
    for _path, group in claims_df.groupby('path'):
        claim_records.append({
            "coordinates": "None",
            "probability": 1.0,
            "model": "rule-based",
            "entity": "ClaimElementInformation",
            "claimElementCode": extract_attribute_value_from_df(group, '5199'),
            "claimTypeCode": extract_attribute_value_from_df(group, '5200'),
            "claimMarkedOnPackage": extract_attribute_value_from_df(group, '5201'),
        })
    return {'entity': 'claimElementInformation', 'values': claim_records}
def process_nutrient_table(hierarchy_attributes_df):
    """Assemble the nested 'nutrients' attribute from the hierarchy rows.

    The outer level (grouped by 'path.0') carries the basis quantity (5211,
    split into value and unit) and the preparation state (5212). The inner
    level (grouped by sorted 'path.1') carries one nutrient each: type code
    (5215), quantity with unit (5219), precision code (5216) and the
    daily-intake percent (5217).

    Fix: removed the dead local `preferred_language = 'en-GB'` — it was
    never read; the language filter uses whichever language appears first.
    """
    nutrient_table_df = hierarchy_attributes_df[
        hierarchy_attributes_df['attribute_id'].isin(['5211', '5212', '5219', '5215', '5216', '5217'])]
    nutrient_table_list = []
    if len(nutrient_table_df) > 0:
        # Restrict to a single language: whichever appears first.
        nutrient_table_df = nutrient_table_df[
            nutrient_table_df['attribute_language'] == nutrient_table_df.iloc[0]['attribute_language']]
        for path_0, group in nutrient_table_df.groupby('path.0'):
            content = extract_attribute_value_from_df(group, '5211')
            basis_value, basis_unit = extract_value_and_unit(content)
            nutrient_basis_quantity_dict = {
                "nutrientBasisQuantityValue": basis_value,
                "nutrientBasisQuantityMeasurementUnitCode": basis_unit,
                "preparationStateCode": extract_attribute_value_from_df(group, '5212')}
            nutrient_values_list = []
            for path_1, sub_group in group.sort_values(by='path.1').groupby('path.1'):
                content = extract_attribute_value_from_df(sub_group, '5219')
                nutrient_value, nutrient_unit = extract_value_and_unit(content)
                nutrient_values_list.append({
                    "coordinates": "",
                    "probability": 1.0,
                    "nutrientTypeCode": extract_attribute_value_from_df(sub_group, '5215'),
                    "quantityContained": {
                        "measurementUnitCode": nutrient_unit,
                        "value": nutrient_value,
                        "precisionCode": extract_attribute_value_from_df(sub_group, '5216')
                    },
                    "dailyValueIntakePercent": {
                        'value': extract_attribute_value_from_df(sub_group, '5217'),
                        "precisionCode": "APPROXIMATELY"
                    }
                })
            nutrient_basis_quantity_dict['values'] = nutrient_values_list
            nutrient_table_list.append(nutrient_basis_quantity_dict)
    nutrient_attribute = {"coordinates": "None",
                          "entity": "nutrients",
                          "probability": 1.0,
                          "value": nutrient_table_list,
                          "model": "table-rule-based"}
    return nutrient_attribute
def pad_gtin(gtin, desired_length=14):
    """Left-pad a GTIN string with '0' characters to desired_length.

    Strings already at or beyond desired_length are returned unchanged.
    Replaces the original character-by-character prepend loop (quadratic
    string building) with str.rjust, which is exactly equivalent.
    """
    return gtin.rjust(desired_length, '0')
# Load in the attribute name mappings.
# Maps PXM attribute ids to Certifai attribute names; used as a module-level
# global by process_file. NOTE(review): this runs at import time and assumes
# field_mapping.json sits in the working directory — confirm deployment layout.
with open("field_mapping.json") as f:
    certifai_mapping = json.load(f)
def process_file(filename, pxm_output):
    """Process one PXM JSON export and map it to the Certifai output format.

    The extra language to extract is taken from the filename, which is
    expected to look like '<name>_<language>.json'. Returns
    (gtin, output_dict) where gtin is zero-padded to 14 digits, or None
    when the export carries no GTIN attribute.
    """
    desired_language = ["en-GB"]
    # Derive the file's language from its name.
    # Fix: the original used filename.strip(".json"), which strips the
    # CHARACTER SET {., j, s, o, n} from both ends and corrupts stems that
    # begin or end with those letters (e.g. 'x_no.json' yielded language '').
    # Remove the extension as a suffix instead.
    stem = filename[:-len(".json")] if filename.endswith(".json") else filename
    name_parts = stem.split("_")
    if len(name_parts) > 1:  # guard: no '_' would have raised IndexError
        language = name_parts[1]
        if language not in desired_language:
            desired_language.append(language)
    # Process the simple (non-hierarchical) attributes
    output_list, hierarchy_attributes, gtin = process_normal_attributes(
        pxm_output, certifai_mapping, desired_language
    )
    # Process the complex (hierarchical) attributes
    hierarchy_attributes_df = pd.DataFrame(hierarchy_attributes)
    if len(hierarchy_attributes_df) > 0:
        hierarchy_attributes_df = extract_path_hierarchy(hierarchy_attributes_df)
        # Process the allergens
        allergen_attribute = process_allergens(hierarchy_attributes_df)
        output_list.append(allergen_attribute)
        # Process the communication channels (best-effort: partial data is skipped)
        try:
            communication_channels_list = process_communication_channels(hierarchy_attributes_df)
        except Exception as e:
            print(e)
            communication_channels_list = []
        # Process contact information
        try:
            contact_information_attribute = process_contact_information(
                hierarchy_attributes_df, communication_channels_list)
            output_list.append(contact_information_attribute)
        except Exception as e:
            print(e)
        # Process preparation instructions
        try:
            output_list.append(process_preparation_instructions(hierarchy_attributes_df))
        except Exception as e:
            print(e)
        # Process diet information
        try:
            output_list.append(process_diet_information(hierarchy_attributes_df))
        except Exception as e:
            print(e)
        # Process claim element information
        try:
            output_list.append(process_claim_element_information(hierarchy_attributes_df))
        except Exception as e:
            print(e)
        # Process the nutrient table
        output_list.append(process_nutrient_table(hierarchy_attributes_df))
    output_dict = {"attributes": output_list, "text": "OCR Output:"}
    # Guard: attribute 3603 (GTIN) may be absent; padding None would raise
    # a TypeError in the original.
    if gtin is not None:
        gtin = pad_gtin(gtin)
    return gtin, output_dict
def create_demo_data(files):
    """Gradio handler: convert uploaded PXM JSON files and zip the results.

    Each uploaded file is mapped to the Certifai format, written to
    '<gtin>.json', and all outputs are bundled into 'demo_data.zip',
    whose path is returned for the Gradio file output.
    """
    gtins = []
    for file in files:
        # Fix: the original leaked the input handle via json.load(open(...));
        # a context manager closes it deterministically.
        with open(file.orig_name, "r") as in_file:
            pxm_output = json.load(in_file)
        gtin, output_dict = process_file(filename=file.orig_name, pxm_output=pxm_output)
        with open(f"{gtin}.json", "w") as my_file:
            json.dump(output_dict, my_file)
        gtins.append(gtin)
    with ZipFile("demo_data.zip", "w") as zip_object:
        for gtin in gtins:
            zip_object.write(f"{gtin}.json")
    return "demo_data.zip"
# NOTE(review): 'output' is created but never passed to the Interface below —
# looks like a leftover; confirm it is unused elsewhere before removing.
output = gr.Textbox(label="Result")
# Launch the Gradio UI: accepts multiple .json uploads, returns the zip path.
gr.Interface(
    fn=create_demo_data,
    inputs=gr.File(file_count="multiple", file_types=[".json"]),
    outputs="file",
    title="Demo data creator",
).launch()