import json
import re
from typing import Optional

from loguru import logger

from esgen.config import NUM_ITEMS, PREFIXES
from esgen.parser import shexj_to_shexc, namespaces_parser_helper, shexc_to_shexj
from esgen.queries import filter_properties, query_cardinality, query_value_type
from esgen.utils import get_id, get_uri
from esgen.verbalizer import init_comment_verbaliser

# Wikidata "truthy" predicates used when wrapping or reading class constraints.
_INSTANCE_OF = "http://www.wikidata.org/prop/direct/P31"
_SUBCLASS_OF = "http://www.wikidata.org/prop/direct/P279"

# ``allowed_values`` phrases that need a list of class IDs, mapped to the
# predicate the value must be linked through (None = the value itself must be
# one of the listed entities).
_CLASS_LINK_PREDICATES = {
    "with one of a set of specific entities": None,
    "with an instance of": _INSTANCE_OF,
    "with a subclass of": _SUBCLASS_OF,
}

# Fixed cardinality phrases mapped to (min, max); -1 is used for "unbounded".
_FIXED_CARDINALITIES = {
    "has 1 matching statement": (1, 1),
    "has 0 or 1 matching statements": (0, 1),
    "has 0 or more matching statements": (0, -1),
    "has 1 or more matching statements": (1, -1),
    "has no matching statements": (0, 0),
}

# "has N matching statements" for an arbitrary exact count N.
_EXACT_CARDINALITY = re.compile(r"has ([0-9]+) matching statements")


class BaseModel:
    """Root of the model hierarchy; defines the schema-generation hook."""

    def __init__(self):
        pass

    def generate_initial_schema(self):
        """
        Generate initial draft of EntitySchema

        Subclasses must override this method.

        :raises NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError


class GenerationModel(BaseModel):
    """Intermediate base class for generation models; adds no behaviour."""

    def __init__(self):
        super().__init__()

    def generate_initial_schema(self):
        """
        Generate initial draft of EntitySchema

        :raises NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError


class ESGenerationModel(GenerationModel):
    """
    Holds an EntitySchema in both ShExC (``es_shexc``) and ShExJ
    (``es_shexj``) form and offers operations to generate, load and extend it.
    """

    def __init__(self, name, class_id="", threshold=0.5, property_types=None):
        """
        :param name: identifier used as the schema's start shape and shape id
        :param class_id: class identifier passed to the property queries
        :param threshold: threshold forwarded to ``filter_properties``
        :param property_types: property types forwarded to
            ``filter_properties``; defaults to ``["wikibase:WikibaseItem"]``
            when omitted or empty
        """
        super().__init__()
        self.name = name
        self.class_id = class_id
        self.threshold = threshold
        self.property_types = property_types if property_types else ["wikibase:WikibaseItem"]
        self._pending = []
        self.es_shexc: str = ""
        self.es_shexj: str = ""
        self.comments = []

    def __str__(self):
        return self.es_shexc

    def generate_initial_schema(self) -> None:
        """
        Build the initial schema from the properties returned by
        ``filter_properties`` and store it on the instance.

        Properties whose ``frequency_level`` is ``"frequent"`` become triple
        constraints; all others are collected in the pending list. The results
        are written to ``es_shexc``, ``es_shexj`` and ``_pending``; nothing is
        returned.
        """
        expressions, comments, pending = [], [], []
        logger.info(f"Generating initial schema '{self.name}' for class '{self.class_id}'.")
        properties = filter_properties(
            class_id=self.class_id,
            num_instances=NUM_ITEMS,
            threshold=self.threshold,
            property_types=self.property_types,
        )
        for prop_uri, prop in properties.items():
            if prop["frequency_level"] != "frequent":
                pending.append(prop)
                continue
            prop_id = get_id(prop_uri)
            value_type = query_value_type(self.class_id, prop_id)
            cardinality = query_cardinality(self.class_id, prop_id)
            # Key order is kept stable (type, predicate, valueExpr, min, max)
            # because it determines the serialized JSON text.
            expression = {"type": "TripleConstraint", "predicate": prop_uri}
            if value_type:
                expression["valueExpr"] = value_type
            expression["min"] = cardinality[0]
            expression["max"] = cardinality[1]
            expressions.append(expression)
            comments.append(init_comment_verbaliser(prop["label"], prop["frequency"]))
        logger.info(
            f"{len(expressions)} properties added to initial schema, "
            f"{len(pending)} properties added to the pending list."
        )

        # A shape's expression is optional: with no frequent property the
        # shape is emitted empty instead of failing on expressions[0].
        shape = {"type": "Shape", "id": self.name}
        if len(expressions) == 1:
            shape["expression"] = expressions[0]
        elif expressions:
            shape["expression"] = {
                "type": "EachOf",
                "expressions": expressions,
            }
        shex_json = {"type": "Schema", "start": self.name, "shapes": [shape]}

        shexj_text = json.dumps(shex_json, indent=3)
        shexc_text = shexj_to_shexc(
            shexj_text,
            base=None,
            namespaces=namespaces_parser_helper(PREFIXES),
            comments=None,
        )

        # insert comments
        # NOTE(review): assumes the converter prints one constraint per line
        # directly below the "<name>" line, in the same order as `comments`
        # -- confirm against shexj_to_shexc.
        shexc_lines = shexc_text.split("\n")
        shape_marker = f"<{self.name}>"
        constraint_start_line = next(
            (idx for idx, line in enumerate(shexc_lines) if line.startswith(shape_marker)),
            0,
        )
        for offset, comment in enumerate(comments, start=1):
            shexc_lines[constraint_start_line + offset] += comment
        shexc_text = "\n".join(shexc_lines)

        self.es_shexc, self.es_shexj, self._pending = shexc_text, shexj_text, pending

    def load_es_shexc(self, shexc_text: str) -> None:
        """
        Load an existing schema given as ShExC text.

        Sets ``es_shexc``, ``es_shexj`` and ``comments`` from the conversion
        result, takes ``name`` from the schema's ``start`` (falling back to
        the first shape's id) and, when the first shape carries a P31
        constraint with a value set, derives ``class_id`` from its first
        value.

        :param shexc_text: the schema in ShExC syntax
        :return: None
        """
        self.es_shexc = shexc_text
        self.es_shexj, _base, _namespaces, self.comments = shexc_to_shexj(self.es_shexc)
        shex_json = json.loads(self.es_shexj)
        try:
            self.name = shex_json["start"]
        except KeyError:
            self.name = shex_json["shapes"][0]["id"]

        # load class_id
        expression = shex_json["shapes"][0].get("expression")
        if not isinstance(expression, dict):
            return
        candidates = expression["expressions"] if "expressions" in expression else [expression]
        for item in candidates:
            class_uri = self._class_uri_from_constraint(item)
            if class_uri is not None:
                # No break: as before, the last matching constraint wins.
                self.class_id = get_id(class_uri)

    @staticmethod
    def _class_uri_from_constraint(constraint) -> Optional[str]:
        """Return the first value of a P31 constraint's value set, else None."""
        if not isinstance(constraint, dict) or constraint.get("predicate") != _INSTANCE_OF:
            return None
        value_expr = constraint.get("valueExpr")
        if not isinstance(value_expr, dict):
            return None
        values = value_expr.get("values")
        if not values or not isinstance(values[0], str):
            return None
        return values[0]

    @staticmethod
    def _parse_class_ids(classes: Optional[str]) -> list[str]:
        """Split a comma-separated ID string, dropping blanks and whitespace."""
        if not classes:
            return []
        return [part.strip() for part in classes.split(",") if part.strip()]

    @classmethod
    def _build_value_expr(cls, allowed_values: str, classes: Optional[str], datatype: Optional[str]):
        """Build the ShExJ value expression for a constraint, or None for any value."""
        if allowed_values == "datatypes":
            if not datatype:
                raise ValueError("A datatype is required when allowed_values is 'datatypes'.")
            return {
                "type": "NodeConstraint",
                "datatype": get_uri(datatype),
            }
        if allowed_values not in _CLASS_LINK_PREDICATES:
            # with any value
            return None

        class_ids = cls._parse_class_ids(classes)
        if not class_ids:
            raise ValueError(f"At least one class is required when allowed_values is '{allowed_values}'.")
        node_constraint = {
            "type": "NodeConstraint",
            "values": [get_uri(class_id, "wd") for class_id in class_ids],
        }
        link_predicate = _CLASS_LINK_PREDICATES[allowed_values]
        if link_predicate is None:
            return node_constraint
        # The value must itself be linked to one of the classes.
        return {
            "type": "Shape",
            "expression": {
                "type": "TripleConstraint",
                "predicate": link_predicate,
                "valueExpr": node_constraint,
            },
        }

    @staticmethod
    def _resolve_cardinality(cardinality: str) -> tuple[int, int]:
        """Translate a cardinality phrase into its (min, max) bounds."""
        bounds = _FIXED_CARDINALITIES.get(cardinality)
        if bounds is not None:
            return bounds
        exact = _EXACT_CARDINALITY.match(cardinality)
        if exact:
            count = int(exact.group(1))
            return count, count
        raise KeyError(f"Unsupported cardinality phrase: {cardinality!r}")

    @staticmethod
    def _append_constraint(shape: dict, constraint: dict) -> None:
        """Add a constraint to a shape, wrapping existing content in EachOf if needed."""
        current = shape.get("expression")
        if current is None:
            shape["expression"] = constraint
        elif isinstance(current, dict) and current.get("type") == "EachOf":
            current["expressions"].append(constraint)
        else:
            # A single constraint or another group (e.g. OneOf): keep it
            # intact and require the new constraint alongside it.
            shape["expression"] = {
                "type": "EachOf",
                "expressions": [current, constraint],
            }

    def insert_triple_constraint(
            self,
            shape_id: str,
            predicate: str,
            allowed_values: str,
            classes: Optional[str],
            datatype: Optional[str],
            cardinality: str,
            comment: Optional[str]):
        """
        Append a triple constraint to an existing shape and refresh
        ``es_shexc``, ``es_shexj`` and ``comments``.

        :param shape_id: id of the shape to extend
        :param predicate: property identifier, expanded with the ``wdt`` prefix
        :param allowed_values: phrase selecting the kind of value constraint;
            unrecognised phrases mean "any value"
        :param classes: comma-separated class/entity IDs (a trailing comma is
            tolerated); required for the class-based phrases
        :param datatype: datatype identifier; required for ``"datatypes"``
        :param cardinality: cardinality phrase, e.g.
            ``"has 0 or 1 matching statements"`` or
            ``"has 3 matching statements"``
        :param comment: optional comment to attach to the new constraint
        :raises ValueError: if the classes or datatype needed are missing
        :raises KeyError: if the cardinality phrase is not recognised
        :raises NotImplementedError: if no shape with ``shape_id`` exists
        """
        logger.info(f"Insert the triple constraint with property '{predicate}'.")

        # values
        value_expr = self._build_value_expr(allowed_values, classes, datatype)

        # cardinality
        min_count, max_count = self._resolve_cardinality(cardinality)

        # constraints
        constraint = {
            "type": "TripleConstraint",
            "predicate": get_uri(predicate, "wdt"),
        }
        if value_expr:
            constraint["valueExpr"] = value_expr
        constraint["min"] = min_count
        constraint["max"] = max_count

        # insert into shexj
        shex_json = json.loads(self.es_shexj)
        for shape in shex_json["shapes"]:
            if shape["id"] == shape_id:
                self._append_constraint(shape, constraint)
                break
        else:
            # TODO: add a new shape constraint
            raise NotImplementedError(
                f"Shape '{shape_id}' does not exist; adding a new shape is not supported yet."
            )
        shexj_text = json.dumps(shex_json, indent=3)

        # insert comment
        _, base, namespaces, comments = shexc_to_shexj(self.es_shexc)
        shexc_text = shexj_to_shexc(shexj_text, base, namespaces, comments)
        if comment:
            # Text expected on the rendered line besides the predicate; None
            # for an "any value" constraint, where only the predicate is used.
            if allowed_values == "datatypes":
                value = datatype
            elif value_expr is not None:
                value = self._parse_class_ids(classes)[0]
            else:
                value = None
            for line in shexc_text.split("\n"):
                if predicate in line and (not value or value in line):
                    comments.append({
                        "comment": comment,
                        "type": "constraint",
                        "location": line,
                    })
                    break
            # Render again so the output carries the newly attached comment.
            shexc_text = shexj_to_shexc(shexj_text, base, namespaces, comments)

        self.es_shexc, self.es_shexj, self.comments = shexc_text, shexj_text, comments

    @property
    def pending(self):
        """The pending properties wrapped in a dict under the key ``"pending"``."""
        return {"pending": self._pending}

    def update_pending(self, updated_pending):
        """Replace the stored list of pending properties."""
        self._pending = updated_pending

    @property
    def shapes(self) -> list[str]:
        """Ids of all shapes in the current ShExJ schema."""
        shex_json = json.loads(self.es_shexj)
        return [shape["id"] for shape in shex_json["shapes"]]