|
|
import json
|
|
|
import re
|
|
|
from typing import Optional
|
|
|
|
|
|
from loguru import logger
|
|
|
|
|
|
from esgen.config import NUM_ITEMS, PREFIXES
|
|
|
from esgen.parser import shexj_to_shexc, namespaces_parser_helper, shexc_to_shexj
|
|
|
from esgen.queries import filter_properties, query_cardinality, query_value_type
|
|
|
from esgen.utils import get_id, get_uri
|
|
|
from esgen.verbalizer import init_comment_verbaliser
|
|
|
|
|
|
|
|
|
class BaseModel:
    """Root of the model hierarchy; defines the schema-generation hook."""

    def __init__(self):
        """Create the model. The base class keeps no state of its own."""

    def generate_initial_schema(self) -> [str, dict]:
        """
        Generate initial draft of EntitySchema

        Abstract hook: this base implementation always raises.

        :return: nothing here; concrete subclasses define the result.
        :raises NotImplementedError: always.
        """
        # NOTE(review): the return annotation is a list literal rather than
        # a type; it is kept untouched so the signature stays as callers and
        # tooling currently see it.
        raise NotImplementedError
|
|
|
|
|
|
|
|
|
class GenerationModel(BaseModel):
    """Intermediate base for models that generate a schema."""

    def __init__(self):
        """Initialise via the parent constructor; adds no state."""
        super().__init__()

    def generate_initial_schema(self) -> [str, dict]:
        """
        Generate the initial schema draft.

        Still abstract at this level.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError
|
|
|
|
|
|
|
|
|
class ESGenerationModel(GenerationModel):
    """
    Build and incrementally edit an EntitySchema for one class.

    The schema is held in two textual forms that are updated together:
    ShExC (``es_shexc``) and ShExJ (``es_shexj``, a JSON string).
    """

    # Predicates used to build and to recognise class-membership constraints.
    _INSTANCE_OF = "http://www.wikidata.org/prop/direct/P31"
    _SUBCLASS_OF = "http://www.wikidata.org/prop/direct/P279"

    # Fixed cardinality phrases mapped to (min, max). -1 is written as the
    # "max" of the open-ended ("or more") phrases.
    _CARDINALITIES = {
        "has 1 matching statement": (1, 1),
        "has 0 or 1 matching statements": (0, 1),
        "has 0 or more matching statements": (0, -1),
        "has 1 or more matching statements": (1, -1),
        "has no matching statements": (0, 0),
    }

    # "has <n> matching statements" -> exactly n (min == max == n).
    _EXACT_CARDINALITY = re.compile(r"has ([0-9]+) matching statements")

    def __init__(self, name, class_id="", threshold=0.5, property_types=None):
        """
        :param name: identifier used as the schema's start shape and shape id.
        :param class_id: id of the class the schema is generated for.
        :param threshold: forwarded to ``filter_properties`` as ``threshold``.
        :param property_types: property types forwarded to
            ``filter_properties``; any falsy value selects
            ``["wikibase:WikibaseItem"]``.
        """
        super().__init__()
        self.name = name
        self.class_id = class_id
        self.threshold = threshold
        self.property_types = property_types or ["wikibase:WikibaseItem"]
        # Properties that were not added to the initial schema.
        self._pending = []
        self.es_shexc: str = ""
        self.es_shexj: str = ""
        self.comments = []

    def __str__(self):
        """Return the current ShExC text of the schema."""
        return self.es_shexc

    def generate_initial_schema(self):
        """
        Generate the initial draft of the EntitySchema.

        Every property whose ``frequency_level`` is ``"frequent"`` becomes a
        triple constraint; all other properties go to the pending list. The
        results are stored in ``es_shexc``, ``es_shexj`` and ``_pending``.

        :return: None
        """
        expressions, comments, pending = [], [], []
        logger.info(f"Generating initial schema '{self.name}' for class '{self.class_id}'.")
        properties = filter_properties(
            class_id=self.class_id,
            num_instances=NUM_ITEMS,
            threshold=self.threshold,
            property_types=self.property_types,
        )
        for prop_uri, prop in properties.items():
            if prop["frequency_level"] != "frequent":
                pending.append(prop)
                continue
            prop_id = get_id(prop_uri)
            value_type = query_value_type(self.class_id, prop_id)
            cardinality = query_cardinality(self.class_id, prop_id)
            expression = {"type": "TripleConstraint", "predicate": prop_uri}
            if value_type:
                expression["valueExpr"] = value_type
            expression["min"] = cardinality[0]
            expression["max"] = cardinality[1]
            expressions.append(expression)
            comments.append(init_comment_verbaliser(prop["label"], prop["frequency"]))
        logger.info(f"{len(expressions)} properties added to initial schema, {len(pending)} properties added to the pending list.")

        shape = {"type": "Shape", "id": self.name}
        if len(expressions) == 1:
            # A single constraint is used directly, without an EachOf wrapper.
            shape["expression"] = expressions[0]
        elif expressions:
            shape["expression"] = {"type": "EachOf", "expressions": expressions}
        else:
            # No frequent property: emit a shape without an expression
            # instead of failing on expressions[0].
            logger.warning(f"No frequent properties found for class '{self.class_id}'; generating an empty shape.")
        shex_json = {"type": "Schema", "start": self.name, "shapes": [shape]}

        shexj_text = json.dumps(shex_json, indent=3)
        shexc_text = shexj_to_shexc(shexj_text, base=None, namespaces=namespaces_parser_helper(PREFIXES), comments=None)

        # Append each comment to the line of its constraint. This relies on
        # the constraints being rendered one per line, in order, directly
        # after the line that opens the shape.
        shexc_lines = shexc_text.split("\n")
        shape_marker = f"<{self.name}>"
        constraint_start_line = next(
            (idx for idx, line in enumerate(shexc_lines) if line.startswith(shape_marker)),
            0,
        )
        for offset, comment in enumerate(comments, start=1):
            shexc_lines[constraint_start_line + offset] += comment
        shexc_text = "\n".join(shexc_lines)
        self.es_shexc, self.es_shexj, self._pending = shexc_text, shexj_text, pending

    def load_es_shexc(self, shexc_text: str):
        """
        Load an existing schema from ShExC text.

        Sets ``es_shexc``, ``es_shexj`` and ``comments``, takes ``name`` from
        the schema's ``start`` (or the first shape's id when there is no
        ``start``), and derives ``class_id`` from the first value of a P31
        constraint in the first shape. ``class_id`` is left unchanged when no
        such value set exists.

        :param shexc_text: schema in ShExC syntax.
        :return: None
        """
        self.es_shexc = shexc_text
        self.es_shexj, _, _, self.comments = shexc_to_shexj(self.es_shexc)
        shex_json = json.loads(self.es_shexj)
        first_shape = shex_json["shapes"][0]
        self.name = shex_json["start"] if "start" in shex_json else first_shape["id"]

        expression = first_shape.get("expression")
        if not isinstance(expression, dict):
            # Empty shape, or a reference: nothing to inspect.
            return
        candidates = expression["expressions"] if "expressions" in expression else [expression]
        for item in candidates:
            # Nested groups and references carry no "predicate" of their own.
            if not isinstance(item, dict) or item.get("predicate") != self._INSTANCE_OF:
                continue
            value_expr = item.get("valueExpr")
            # A P31 constraint may point at another shape or have no value
            # set at all; only an explicit value set names the class.
            if isinstance(value_expr, dict) and value_expr.get("values"):
                self.class_id = get_id(value_expr["values"][0])

    @staticmethod
    def _class_uris(classes):
        """Turn the comma-separated ``classes`` string into a list of URIs."""
        # NOTE(review): the last comma-separated field is dropped, exactly as
        # before. Presumably callers pass a trailing comma ("Q5,Q6,"); confirm.
        return [get_uri(item, "wd") for item in classes.split(",")[:-1]]

    def _build_value_expr(self, allowed_values, classes, datatype):
        """Return the ``valueExpr`` for ``allowed_values``, or None if unconstrained."""
        if allowed_values == "with one of a set of specific entities":
            return {"type": "NodeConstraint", "values": self._class_uris(classes)}
        linking_predicates = {
            "with an instance of": self._INSTANCE_OF,
            "with a subclass of": self._SUBCLASS_OF,
        }
        if allowed_values in linking_predicates:
            return {
                "type": "Shape",
                "expression": {
                    "type": "TripleConstraint",
                    "predicate": linking_predicates[allowed_values],
                    "valueExpr": {
                        "type": "NodeConstraint",
                        "values": self._class_uris(classes),
                    },
                },
            }
        if allowed_values == "datatypes":
            return {"type": "NodeConstraint", "datatype": get_uri(datatype)}
        return None

    @classmethod
    def _cardinality_bounds(cls, cardinality):
        """Translate a cardinality phrase into ``(min, max)``; KeyError if unknown."""
        exact = cls._EXACT_CARDINALITY.match(cardinality)
        if exact:
            count = int(exact.group(1))
            return count, count
        return cls._CARDINALITIES[cardinality]

    def insert_triple_constraint(
            self,
            shape_id: str,
            predicate: str,
            allowed_values: str,
            classes: Optional[str],
            datatype: Optional[str],
            cardinality: str,
            comment: Optional[str]):
        """
        Add a triple constraint to the shape ``shape_id`` and re-render the schema.

        Updates ``es_shexc``, ``es_shexj`` and ``comments``.

        :param shape_id: id of the shape that receives the constraint.
        :param predicate: property id, expanded with the ``wdt`` prefix.
        :param allowed_values: phrase selecting the kind of value constraint;
            unrecognised phrases produce a constraint without ``valueExpr``.
        :param classes: comma-separated entity ids, used by the class-based
            kinds of ``allowed_values``.
        :param datatype: datatype name, used when ``allowed_values`` is
            ``"datatypes"``.
        :param cardinality: cardinality phrase, e.g.
            ``"has 0 or 1 matching statements"``.
        :param comment: optional comment to attach to the new constraint.
        :raises KeyError: if ``cardinality`` is not a recognised phrase.
        :raises NotImplementedError: if no shape has the id ``shape_id``.
        """
        logger.info(f"Insert the triple constraint with property '{predicate}'.")

        value_expr = self._build_value_expr(allowed_values, classes, datatype)
        minimum, maximum = self._cardinality_bounds(cardinality)

        constraint = {"type": "TripleConstraint", "predicate": get_uri(predicate, "wdt")}
        if value_expr:
            constraint["valueExpr"] = value_expr
        constraint["min"] = minimum
        constraint["max"] = maximum

        shex_json = json.loads(self.es_shexj)
        # Search every shape for the target; other shapes are left untouched.
        for shape in shex_json["shapes"]:
            if shape["id"] != shape_id:
                continue
            current = shape.get("expression")
            if current is None:
                # Shape without an expression yet: the constraint becomes it.
                shape["expression"] = constraint
            elif (isinstance(current, dict) and "expressions" in current
                    and current.get("type", "EachOf") == "EachOf"):
                current["expressions"].append(constraint)
            else:
                # Single constraint, OneOf group or reference: wrap it so the
                # new constraint is required in addition to what is there.
                shape["expression"] = {
                    "type": "EachOf",
                    "expressions": [current, constraint],
                }
            break
        else:
            # Creating a new shape for an unknown id is not supported.
            raise NotImplementedError(f"Shape '{shape_id}' does not exist in the schema.")
        shexj_text = json.dumps(shex_json, indent=3)

        # Re-use base, namespaces and comments of the current ShExC text.
        _, base, namespaces, comments = shexc_to_shexj(self.es_shexc)
        shexc_text = shexj_to_shexc(shexj_text, base, namespaces, comments)
        if comment:
            # Find the rendered line of the new constraint by its predicate
            # and, when one is available, its first class or its datatype.
            value = classes.split(",")[0] if classes else datatype
            for line in shexc_text.split("\n"):
                if predicate in line and (value is None or value in line):
                    comments.append({
                        "comment": comment,
                        "type": "constraint",
                        "location": line,
                    })
                    break
            shexc_text = shexj_to_shexc(shexj_text, base, namespaces, comments)
        self.es_shexc, self.es_shexj, self.comments = shexc_text, shexj_text, comments

    @property
    def pending(self):
        """Return the pending properties wrapped as ``{"pending": [...]}``."""
        return {"pending": self._pending}

    def update_pending(self, updated_pending):
        """Replace the pending list with ``updated_pending``."""
        self._pending = updated_pending

    @property
    def shapes(self) -> list[str]:
        """Return the ids of all shapes; empty while no schema is present."""
        if not self.es_shexj:
            # Nothing generated or loaded yet; "" is not parseable JSON.
            return []
        shex_json = json.loads(self.es_shexj)
        return [shape["id"] for shape in shex_json["shapes"]]
|
|
|
|