Spaces:
Runtime error
Runtime error
| from sentence_transformers import SentenceTransformer, util | |
| import json | |
| import time | |
| import pandas as pd | |
| import numpy as np | |
| import pickle | |
| import chromadb | |
| from chromadb.config import Settings | |
| from chromadb.utils import embedding_functions | |
| from chromadb.db.clickhouse import NoDatapointsException | |
def prepare_cd(conceptDescriptions):
    """Flatten AAS concept descriptions into a lookup DataFrame.

    Each concept description contributes one row with its semantic id,
    definition, preferred name, datatype and unit. English text variants
    are preferred; missing pieces are recorded as the string "NaN".

    Fixes two defects of the previous version: ``name``/``chosen_def`` could
    be undefined (NameError) or carry a stale value from the previous
    iteration when a multi-language list contained no English entry, and the
    per-row ``pd.concat`` made construction quadratic (rows are now collected
    in a list and turned into a DataFrame once).
    """

    def _pick_text(variants):
        # Prefer the (last) English variant — mirrors the old loop, which
        # kept overwriting `name` on every EN match. Fall back to the first
        # entry (the old single-entry branch ignored the language too).
        # Returns None when the list is empty.
        chosen = None
        for variant in variants:
            if variant["language"] in ("EN", "en", "EN?"):
                chosen = variant["text"]
        if chosen is None and variants:
            chosen = variants[0]["text"]
        return chosen

    rows = []
    for cd in conceptDescriptions:
        data_spec = cd["embeddedDataSpecifications"][0]["dataSpecificationContent"]
        name = _pick_text(data_spec["preferredName"])
        if name is None:
            # No preferred name at all: fall back to the short name.
            short_name = data_spec["shortName"]
            name = short_name[0]["text"] if short_name else "NaN"
        chosen_def = _pick_text(data_spec["definition"])
        if chosen_def is None:
            chosen_def = "NaN"
        rows.append(
            {
                "SemanticId": cd["identification"]["id"],
                "Definition": chosen_def,
                "PreferredName": name,
                "Datatype": data_spec["dataType"] if data_spec["dataType"] != "" else "NaN",
                "Unit": data_spec["unit"] if data_spec["unit"] != "" else "NaN",
            }
        )
    return pd.DataFrame(
        rows, columns=["SemanticId", "Definition", "PreferredName", "Datatype", "Unit"]
    )
def get_values(submodel_element):
    """Extract the core fields of one submodel element.

    Returns a 5-tuple: model type name, semantic id value, the semantic id's
    "local" flag, the idShort, and the element value wrapped in a list.
    """
    semantic_key = submodel_element["semanticId"]["keys"][0]
    return (
        submodel_element["modelType"]["name"],
        semantic_key["value"],
        semantic_key["local"],
        submodel_element["idShort"],
        [submodel_element["value"]],
    )
def get_concept_description(semantic_id, df_cd):
    """Look up the concept description row for *semantic_id* in *df_cd*.

    Returns the first matching row as a Series; when no row matches, a
    placeholder Series with "NaN" fields (and the requested semantic id)
    is returned instead.
    """
    matches = df_cd[df_cd["SemanticId"] == semantic_id]
    if not matches.empty:
        return matches.iloc[0]
    placeholder = pd.DataFrame(
        {
            "SemanticId": semantic_id,
            "Definition": "NaN",
            "PreferredName": "NaN",
            "Datatype": "NaN",
            "Unit": "NaN",
        },
        index=[0],
    )
    return placeholder.iloc[0]
def get_values_sec(
    df_cd,
    content,
    df,
    aas_id,
    aas_name,
    submodel_id,
    submodel_name,
    submodel_semantic_id,
):
    """Recursively read the children of a SubmodelElementCollection into *df*.

    ``content`` is a one-element list holding the collection element whose
    ``value`` children are traversed. For every child a row (enriched with
    its concept description) is appended to *df*; nested collections are
    descended into recursively until the lowest level is reached.

    Returns the grown DataFrame.

    The previous version duplicated the concept-description lookup and the
    row construction verbatim in both branches; they are now shared, with
    only the recursion kept conditional (behavior unchanged).
    """
    collection_values = content[0]["value"]
    for element in collection_values:
        element_content = [element]
        se_type, se_semantic_id, se_semantic_id_local, se_id_short, value = get_values(
            element
        )
        # Concept descriptions can only be resolved for local semantic ids.
        if se_semantic_id_local == True:
            cd_content = get_concept_description(se_semantic_id, df_cd)
            definition = cd_content["Definition"]
            preferred_name = cd_content["PreferredName"]
            datatype = cd_content["Datatype"]
            unit = cd_content["Unit"]
        else:
            definition = "NaN"
            preferred_name = "NaN"
            datatype = "NaN"
            unit = "NaN"
        new_row = pd.DataFrame(
            {
                "AASId": aas_id,
                "AASIdShort": aas_name,
                "SubmodelId": submodel_id,
                "SubmodelName": submodel_name,
                "SubmodelSemanticId": submodel_semantic_id,
                "SEContent": element_content,
                "SESemanticId": se_semantic_id,
                "SEModelType": se_type,
                "SEIdShort": se_id_short,
                "SEValue": value,
                "Definition": definition,
                "PreferredName": preferred_name,
                "Datatype": datatype,
                "Unit": unit,
            }
        )
        df = pd.concat([df, new_row], ignore_index=True)
        if se_type == "SubmodelElementCollection":
            # Recurse so nested collections are read down to the lowest level.
            df = get_values_sec(
                df_cd,
                element_content,
                df,
                aas_id,
                aas_name,
                submodel_id,
                submodel_name,
                submodel_semantic_id,
            )
    return df
def set_up_metadata(metalabel, df):
    """Derive unit/datatype categories and a combined metalabel per row.

    Adds the helper columns ``unit_lowercase``/``unit_categ`` and
    ``datatype_lowercase``/``datatype_categ``, fills unmapped values with the
    string "NaN", copies the index into an ``index`` column and assigns each
    row the key of the ``metalabel`` entry whose value equals the
    ``(unit_categ, datatype_categ)`` string tuple. Returns the resulting
    DataFrame (like before, *df* itself gains the first four helper columns;
    the fillna step produces the copy that is returned).

    Fixes: the chained assignment ``dataset["Metalabel"][i] = ...`` (which
    triggers SettingWithCopyWarning and silently stops working under pandas
    copy-on-write) is replaced by a single column assignment, and the
    per-row scan of ``metalabel`` is replaced by a reverse mapping built
    once (first key wins, matching the old ``keys[0]`` choice). An
    unmappable combination now raises KeyError instead of IndexError.
    """
    datatype_mapping = {
        "boolean": "BOOLEAN",
        "string": "STRING",
        "string_translatable": "STRING",
        "translatable_string": "STRING",
        "non_translatable_string": "STRING",
        "date": "DATE",
        "data_time": "DATE",
        "uri": "URI",
        "int": "INT",
        "int_measure": "INT",
        "int_currency": "INT",
        "integer": "INT",
        "real": "REAL",
        "real_measure": "REAL",
        "real_currency": "REAL",
        "enum_code": "ENUM_CODE",
        "enum_int": "ENUM_CODE",
        "ENUM_REAL": "ENUM_CODE",
        "ENUM_RATIONAL": "ENUM_CODE",
        "ENUM_BOOLEAN": "ENUM_CODE",
        "ENUM_STRING": "ENUM_CODE",
        "enum_reference": "ENUM_CODE",
        "enum_instance": "ENUM_CODE",
        "set(b1,b2)": "SET",
        "constrained_set(b1,b2,cmn,cmx)": "SET",
        "set [0,?]": "SET",
        "set [1,?]": "SET",
        "set [1, ?]": "SET",
        "nan": "NaN",
        "media_type": "LARGE_OBJECT_TYPE",
    }
    unit_mapping = {
        "nan": "NaN",
        "hertz": "FREQUENCY",
        "hz": "FREQUENCY",
        "pa": "PRESSURE",
        "pascal": "PRESSURE",
        "n/m²": "PRESSURE",
        "bar": "PRESSURE",
        "%": "SCALARS_PERC",
        "w": "POWER",
        "watt": "POWER",
        "kw": "POWER",
        "kg/m³": "CHEMISTRY",
        "m²/s": "CHEMISTRY",
        "pa*s": "CHEMISTRY",
        "v": "ELECTRICAL",
        "volt": "ELECTRICAL",
        "db": "ACOUSTICS",
        "db(a)": "ACOUSTICS",
        "k": "TEMPERATURE",
        "°c": "TEMPERATURE",
        "n": "MECHANICS",
        "newton": "MECHANICS",
        "kg/s": "FLOW",
        "kg/h": "FLOW",
        "m³/s": "FLOW",
        "m³/h": "FLOW",
        "l/s": "FLOW",
        "l/h": "FLOW",
        "µm": "LENGTH",
        "mm": "LENGTH",
        "cm": "LENGTH",
        "dm": "LENGTH",
        "m": "LENGTH",
        "meter": "LENGTH",
        "m/s": "SPEED",
        "km/h": "SPEED",
        "s^(-1)": "FREQUENCY",
        "1/s": "FREQUENCY",
        "s": "TIME",
        "h": "TIME",
        "min": "TIME",
        "d": "TIME",
        "hours": "TIME",
        "a": "ELECTRICAL",
        "m³": "VOLUME",
        "m²": "AREA",
        "rpm": "FLOW",
        "nm": "MECHANICS",
        "m/m": "MECHANICS",
        "m³/m²s": "MECHANICS",
        "w(m²*K)": "HEAT_TRANSFER",
        "kwh": "ELECTRICAL",
        "kg/(s*m²)": "FLOW",
        "kg": "MASS",
        "w/(m*k)": "HEAT_TRANSFER",
        "m²*k/w": "HEAT_TRANSFER",
        "j/s": "POWER",
    }
    dataset = df
    dataset["unit_lowercase"] = dataset["Unit"].str.lower()
    dataset["unit_categ"] = dataset["unit_lowercase"].map(unit_mapping)
    dataset["datatype_lowercase"] = dataset["Datatype"].str.lower()
    dataset["datatype_categ"] = dataset["datatype_lowercase"].map(datatype_mapping)
    # fillna returns a copy — from here on we work on (and return) that copy,
    # exactly as the original code did.
    dataset = dataset.fillna("NaN")
    dataset["index"] = dataset.index
    # Invert the metalabel mapping once; setdefault keeps the FIRST key for a
    # duplicated value, matching the old "keys[0]" selection.
    label_by_categ = {}
    for key, categ in metalabel.items():
        label_by_categ.setdefault(categ, key)
    labels = [
        label_by_categ[(str(unit_c), str(dt_c))]
        for unit_c, dt_c in zip(dataset["unit_categ"], dataset["datatype_categ"])
    ]
    dataset["Metalabel"] = labels
    unique_label = list(set(labels))
    print(unique_label)
    return dataset
def encode(aas_df, model):
    """Create Sentence-BERT embeddings for names and definitions.

    Prefixes the "PreferredName" and "Definition" columns in place
    ("Name: ..." / "Description: ...; "), encodes both corpora with *model*
    and stores the concatenated definition+name vectors as lists in a new
    "EmbeddingNameDefinition" column. Returns the mutated DataFrame.
    """
    aas_df["PreferredName"] = "Name: " + aas_df["PreferredName"].astype(str)
    aas_df["Definition"] = "Description: " + aas_df["Definition"].astype(str) + "; "
    definition_vectors = model.encode(aas_df.loc[:, "Definition"], show_progress_bar=True)
    name_vectors = model.encode(aas_df.loc[:, "PreferredName"], show_progress_bar=True)
    # Definition vector first, then name vector — the column order of the
    # combined embedding is part of the contract with the database.
    combined = np.hstack((definition_vectors, name_vectors))
    aas_df["EmbeddingNameDefinition"] = combined.tolist()
    return aas_df
def convert_to_list(aas_df):
    """Split *aas_df* into the list shapes the vector database expects.

    Returns a 4-tuple: metadata records (one dict per row, without the
    content/value/embedding columns), the row indices as strings, the raw
    "SEContent" entries, and the "EmbeddingNameDefinition" vectors.
    """
    ids_as_str = [str(idx) for idx in aas_df.index]
    contents = list(aas_df["SEContent"])
    embeddings = list(aas_df["EmbeddingNameDefinition"])
    trimmed = aas_df.drop(columns=["EmbeddingNameDefinition", "SEContent", "SEValue"])
    metadata = trimmed.to_dict("records")
    return metadata, ids_as_str, contents, embeddings
def set_up_chroma(
    metadata, aas_index_str, se_content, se_embedding_name_definition, aas_name, client
):
    """Create (or fetch) the Chroma collection for this AAS and fill it once.

    The collection is only populated when it is still empty; an existing,
    non-empty collection is left untouched. Returns the collection.
    """
    # Chroma does not allow upper-case characters in collection names.
    aas_name = aas_name.lower()
    print(aas_name)
    emb_fn = embedding_functions.SentenceTransformerEmbeddingFunction(
        model_name="gart-labor/eng-distilBERT-se-eclass"
    )
    collection = client.get_or_create_collection(
        name=aas_name, embedding_function=emb_fn
    )
    # Serialize each submodel element content to JSON so it can be stored.
    aas_content_string = [json.dumps(element) for element in se_content]
    items = collection.count()
    print(collection)
    print("Datenbank erstellt, Anzahl Items:")
    print(items)
    if items == 0:
        # Write SE contents, embeddings and metadata into the collection.
        collection.add(
            documents=aas_content_string,
            embeddings=se_embedding_name_definition,
            metadatas=metadata,
            ids=aas_index_str,
        )
        print("------------")
        print("Datenbank befüllt, Anzahl items:")
        print(collection.count())
    else:
        print("-----------")
        print("AAS schon vorhanden")
    return collection
def read_aas(aas, submodels, assets, conceptDescriptions, submodels_ids, metalabel):
    """Read one Asset Administration Shell environment into a flat DataFrame.

    Walks every submodel that the AAS actually references, extracts each
    submodel element (descending recursively into collections via
    ``get_values_sec``) and enriches it with its concept description; the
    result is finally labelled through ``set_up_metadata``.

    ``assets`` is accepted for call compatibility but is not used here.

    Returns ``(df, aas_name)``.

    The redundant ``else: continue`` of the previous version is replaced by
    a guard clause; behavior is unchanged.
    """
    df = pd.DataFrame(
        columns=[
            "AASId",
            "AASIdShort",
            "SubmodelId",
            "SubmodelName",
            "SubmodelSemanticId",
            "SEContent",
            "SESemanticId",
            "SEModelType",
            "SEIdShort",
            "SEValue",
            "Definition",
            "PreferredName",
            "Datatype",
            "Unit",
        ]
    )
    aas_id = aas[0]["identification"]["id"]
    aas_name = aas[0]["idShort"]
    # Prepare all concept descriptions as a DataFrame for cheap lookups.
    df_cd = prepare_cd(conceptDescriptions)
    for submodel in submodels:
        submodel_name = submodel["idShort"]
        submodel_id = submodel["identification"]["id"]
        # The environment may contain submodels the AAS itself does not
        # reference — skip those (counts do not always match).
        if submodel_id not in submodels_ids:
            continue
        semantic_id_keys = submodel["semanticId"]["keys"]
        if not semantic_id_keys:
            submodel_semantic_id = "Not defined"
        else:
            submodel_semantic_id = semantic_id_keys[0]["value"]
        for submodel_element in submodel["submodelElements"]:
            content = [submodel_element]
            (
                se_type,
                se_semantic_id,
                se_semantic_id_local,
                se_id_short,
                value,
            ) = get_values(submodel_element)
            # Concept descriptions can only be resolved for local ids.
            if se_semantic_id_local == True:
                cd_content = get_concept_description(se_semantic_id, df_cd)
                definition = cd_content["Definition"]
                preferred_name = cd_content["PreferredName"]
                datatype = cd_content["Datatype"]
                unit = cd_content["Unit"]
            else:
                definition = "NaN"
                preferred_name = "NaN"
                datatype = "NaN"
                unit = "NaN"
            new_row = pd.DataFrame(
                {
                    "AASId": aas_id,
                    "AASIdShort": aas_name,
                    "SubmodelId": submodel_id,
                    "SubmodelName": submodel_name,
                    "SubmodelSemanticId": submodel_semantic_id,
                    "SEContent": content,
                    "SESemanticId": se_semantic_id,
                    "SEModelType": se_type,
                    "SEIdShort": se_id_short,
                    "SEValue": value,
                    "Definition": definition,
                    "PreferredName": preferred_name,
                    "Datatype": datatype,
                    "Unit": unit,
                }
            )
            df = pd.concat([df, new_row], ignore_index=True)
            # Collections are read recursively down to the lowest level.
            if se_type == "SubmodelElementCollection":
                df = get_values_sec(
                    df_cd,
                    content,
                    df,
                    aas_id,
                    aas_name,
                    submodel_id,
                    submodel_name,
                    submodel_semantic_id,
                )
    df = set_up_metadata(metalabel, df)
    return df, aas_name
def index_corpus(data, model, metalabel, client_chroma):
    """Entry point: index one AAS JSON environment into Chroma.

    Reads the AAS into a DataFrame, embeds names/definitions, converts the
    result into database-friendly lists and writes them to a Chroma
    collection, which is returned.
    """
    aas = data["assetAdministrationShells"]
    # Ids of the submodels the AAS itself references.
    submodels_ids = [ref["keys"][0]["value"] for ref in aas[0]["submodels"]]
    aas_df, aas_name = read_aas(
        aas,
        data["submodels"],
        data["assets"],
        data["conceptDescriptions"],
        submodels_ids,
        metalabel,
    )
    aas_df = encode(aas_df, model)
    metadata, aas_index_str, se_content, se_embedding_name_definition = convert_to_list(
        aas_df
    )
    return set_up_chroma(
        metadata,
        aas_index_str,
        se_content,
        se_embedding_name_definition,
        aas_name,
        client_chroma,
    )
| # if __name__ == '__main__': | |
| # create_database = index_corpus(aas = 'festo_switch.json') | |