# ESGen / esgen / model.py
# Bohui Zhang
# Update the second version
# 41bef2b
import json
import re
from typing import Optional
from loguru import logger
from esgen.config import NUM_ITEMS, PREFIXES
from esgen.parser import shexj_to_shexc, namespaces_parser_helper, shexc_to_shexj
from esgen.queries import filter_properties, query_cardinality, query_value_type
from esgen.utils import get_id, get_uri
from esgen.verbalizer import init_comment_verbaliser
class BaseModel:
    """Root of the model hierarchy; declares the schema-generation hook."""

    def __init__(self):
        """Create the model. The base class keeps no state of its own."""

    def generate_initial_schema(self) -> [str, dict]:
        """
        Produce a first draft of an EntitySchema.

        The base class provides no implementation.

        NOTE(review): the return annotation is a list literal, not a type;
        presumably a ``(str, dict)`` pair was intended -- confirm before
        tightening it.

        :return: nothing; this implementation always raises
        :raises NotImplementedError: always
        """
        raise NotImplementedError()
class GenerationModel(BaseModel):
    """Intermediate base for generation models; adds no behaviour itself."""

    def __init__(self):
        """Defer entirely to the parent initialiser."""
        super().__init__()

    def generate_initial_schema(self) -> [str, dict]:
        """
        Produce a first draft of an EntitySchema.

        No implementation is provided at this level.

        :return: nothing; this implementation always raises
        :raises NotImplementedError: always
        """
        raise NotImplementedError()
class ESGenerationModel(GenerationModel):
    """
    Hold one EntitySchema as ShExC text and ShExJ text and edit it.

    The schema is either drafted from the properties reported by
    ``filter_properties`` (``generate_initial_schema``) or loaded from existing
    ShExC (``load_es_shexc``); triple constraints can then be added with
    ``insert_triple_constraint``.
    """

    # Predicates used to express "instance of" / "subclass of" restrictions.
    _INSTANCE_OF = "http://www.wikidata.org/prop/direct/P31"
    _SUBCLASS_OF = "http://www.wikidata.org/prop/direct/P279"

    # ``allowed_values`` phrases that wrap the classes in a nested shape,
    # mapped to the predicate of that nested shape.
    _VALUE_PREDICATES = {
        "with an instance of": _INSTANCE_OF,
        "with a subclass of": _SUBCLASS_OF,
    }

    # Fixed cardinality phrases mapped to (min, max); -1 is used as the
    # "unbounded" maximum.
    _CARDINALITIES = {
        "has 1 matching statement": (1, 1),
        "has 0 or 1 matching statements": (0, 1),
        "has 0 or more matching statements": (0, -1),
        "has 1 or more matching statements": (1, -1),
        "has no matching statements": (0, 0),
    }

    def __init__(self, name, class_id="", threshold=0.5, property_types=None):
        """
        :param name: shape name; used as the schema ``start`` and the shape id
        :param class_id: id of the class the schema is generated for
        :param threshold: forwarded to ``filter_properties``
        :param property_types: property types forwarded to
            ``filter_properties``; defaults to ``["wikibase:WikibaseItem"]``
            when empty or ``None``
        """
        super().__init__()
        self.name = name
        self.class_id = class_id
        self.threshold = threshold
        self.property_types = property_types or ["wikibase:WikibaseItem"]
        self._pending = []
        self.es_shexc: str = ""
        self.es_shexj: str = ""
        self.comments = []

    def __str__(self):
        """Return the ShExC text of the schema."""
        return self.es_shexc

    @staticmethod
    def _triple_constraint(predicate, value_expr, minimum, maximum):
        """Build a ShExJ TripleConstraint dict, omitting ``valueExpr`` when falsy."""
        # Key order (type, predicate, valueExpr, min, max) is kept stable so
        # the serialised JSON text does not change.
        constraint = {"type": "TripleConstraint", "predicate": predicate}
        if value_expr:
            constraint["valueExpr"] = value_expr
        constraint["min"] = minimum
        constraint["max"] = maximum
        return constraint

    @staticmethod
    def _class_uris(classes):
        """Turn a comma-separated id string into a list of ``wd`` URIs."""
        if classes is None:
            raise ValueError(
                "'classes' is required when the allowed values are restricted to entities."
            )
        # Empty fragments are skipped, so "Q5,Q6," and "Q5,Q6" both yield two
        # entries (the former is what the previous `[:-1]` slicing assumed).
        return [
            get_uri(fragment.strip(), "wd")
            for fragment in classes.split(",")
            if fragment.strip()
        ]

    def _build_value_expr(self, allowed_values, classes, datatype):
        """Translate the ``allowed_values`` phrase into a ShExJ value expression."""
        if allowed_values == "with one of a set of specific entities":
            return {"type": "NodeConstraint", "values": self._class_uris(classes)}
        if allowed_values in self._VALUE_PREDICATES:
            return {
                "type": "Shape",
                "expression": {
                    "type": "TripleConstraint",
                    "predicate": self._VALUE_PREDICATES[allowed_values],
                    "valueExpr": {
                        "type": "NodeConstraint",
                        "values": self._class_uris(classes),
                    },
                },
            }
        if allowed_values == "datatypes":
            return {"type": "NodeConstraint", "datatype": get_uri(datatype)}
        # Any other phrase ("with any value") places no restriction on values.
        return None

    @classmethod
    def _parse_cardinality(cls, cardinality):
        """Map a cardinality phrase to ``(min, max)``; ``KeyError`` if unknown."""
        if cardinality in cls._CARDINALITIES:
            return cls._CARDINALITIES[cardinality]
        exact = re.match(r"has ([0-9]+) matching statements", cardinality)
        if exact:
            count = int(exact.group(1))
            return count, count
        raise KeyError(f"Unsupported cardinality: {cardinality!r}")

    def generate_initial_schema(self):
        """
        Draft the schema from the properties returned by ``filter_properties``.

        Properties whose ``frequency_level`` is ``"frequent"`` become triple
        constraints (with a verbalised comment appended to their ShExC line);
        all others are kept in the pending list. The results are stored on
        ``es_shexc``, ``es_shexj`` and ``_pending``; nothing is returned.

        When no property is frequent the shape is emitted without an
        ``expression`` instead of failing on an empty list.
        """
        expressions, comments, pending = [], [], []
        logger.info(f"Generating initial schema '{self.name}' for class '{self.class_id}'.")
        properties = filter_properties(
            class_id=self.class_id,
            num_instances=NUM_ITEMS,
            threshold=self.threshold,
            property_types=self.property_types,
        )
        for prop_uri, prop in properties.items():
            if prop["frequency_level"] != "frequent":
                pending.append(prop)
                continue
            prop_id = get_id(prop_uri)
            value_type = query_value_type(self.class_id, prop_id)
            cardinality = query_cardinality(self.class_id, prop_id)
            expressions.append(
                self._triple_constraint(prop_uri, value_type, cardinality[0], cardinality[1])
            )
            comments.append(init_comment_verbaliser(prop["label"], prop["frequency"]))
        logger.info(f"{len(expressions)} properties added to initial schema, {len(pending)} properties added to the pending list.")

        shape = {"type": "Shape", "id": self.name}
        if len(expressions) == 1:
            # A single constraint is used directly, without an EachOf wrapper.
            shape["expression"] = expressions[0]
        elif expressions:
            shape["expression"] = {"type": "EachOf", "expressions": expressions}
        else:
            logger.warning(f"No frequent property found for class '{self.class_id}'; the shape is left empty.")
        shex_json = {"type": "Schema", "start": self.name, "shapes": [shape]}

        shexj_text = json.dumps(shex_json, indent=3)
        shexc_text = shexj_to_shexc(
            shexj_text,
            base=None,
            namespaces=namespaces_parser_helper(PREFIXES),
            comments=None,
        )

        # Insert comments. NOTE(review): this assumes the i-th constraint is
        # rendered on the i-th line after the shape header -- confirm against
        # the shexj_to_shexc output format.
        shexc_lines = shexc_text.split("\n")
        header = f"<{self.name}>"
        constraint_start_line = next(
            (idx for idx, line in enumerate(shexc_lines) if line.startswith(header)),
            0,
        )
        for offset, comment in enumerate(comments, start=1):
            shexc_lines[constraint_start_line + offset] += comment
        shexc_text = "\n".join(shexc_lines)

        self.es_shexc, self.es_shexj, self._pending = shexc_text, shexj_text, pending

    def load_es_shexc(self, shexc_text: str) -> None:
        """
        Load an existing schema from ShExC text.

        Sets ``es_shexc``, ``es_shexj`` and ``comments``, takes ``name`` from
        the schema ``start`` (or the first shape id when there is no start),
        and takes ``class_id`` from the first value of a P31 constraint in the
        first shape, if there is one. ``class_id`` is left unchanged otherwise.

        :param shexc_text: schema in ShExC syntax
        :return: None
        """
        self.es_shexc = shexc_text
        self.es_shexj, _base, _namespaces, self.comments = shexc_to_shexj(self.es_shexc)
        shex_json = json.loads(self.es_shexj)
        first_shape = shex_json["shapes"][0]
        self.name = shex_json["start"] if "start" in shex_json else first_shape["id"]

        # load class_id
        expression = first_shape.get("expression")
        if not isinstance(expression, dict):
            # Shape without a (dict) triple expression: nothing to inspect.
            return
        for item in expression.get("expressions", [expression]):
            class_iri = self._instance_of_value(item)
            if class_iri is not None:
                # No break: when several P31 constraints exist the last wins,
                # as before.
                self.class_id = get_id(class_iri)

    @classmethod
    def _instance_of_value(cls, item):
        """Return the first value IRI of a P31 triple constraint, else ``None``."""
        if not isinstance(item, dict) or item.get("predicate") != cls._INSTANCE_OF:
            return None
        value_expr = item.get("valueExpr")
        if not isinstance(value_expr, dict):
            return None
        values = value_expr.get("values")
        if not values or not isinstance(values[0], str):
            return None
        return values[0]

    def insert_triple_constraint(
            self,
            shape_id: str,
            predicate: str,
            allowed_values: str,
            classes: Optional[str],
            datatype: Optional[str],
            cardinality: str,
            comment: Optional[str]) -> None:
        """
        Add a triple constraint to the shape ``shape_id`` and re-render the schema.

        :param shape_id: id of the shape to extend
        :param predicate: property id, expanded with ``get_uri(predicate, "wdt")``
        :param allowed_values: phrase selecting the kind of value restriction;
            unrecognised phrases mean "any value"
        :param classes: comma-separated ids used by the entity-based restrictions
        :param datatype: datatype used when ``allowed_values`` is ``"datatypes"``
        :param cardinality: one of the fixed cardinality phrases, or
            ``"has N matching statements"``
        :param comment: optional comment attached to the first rendered line
            that mentions the predicate (and the first class / the datatype,
            when one was given)
        :raises NotImplementedError: if no shape has id ``shape_id``
        :raises KeyError: if ``cardinality`` is not recognised
        :raises ValueError: if an entity-based restriction is requested
            without ``classes``
        """
        logger.info(f"Insert the triple constraint with property '{predicate}'.")

        value_expr = self._build_value_expr(allowed_values, classes, datatype)
        minimum, maximum = self._parse_cardinality(cardinality)
        constraint = self._triple_constraint(
            get_uri(predicate, "wdt"), value_expr, minimum, maximum
        )

        # insert into shexj
        shex_json = json.loads(self.es_shexj)
        target = next(
            (shape for shape in shex_json["shapes"] if shape["id"] == shape_id),
            None,
        )
        if target is None:
            # TODO: add a new shape constraint
            raise NotImplementedError
        existing = target.get("expression")
        if existing is None:
            target["expression"] = constraint
        elif isinstance(existing, dict) and existing.get("type") == "EachOf":
            existing["expressions"].append(constraint)
        else:
            # A lone constraint, a OneOf group or a reference: wrap it so the
            # new constraint is required in addition to what was there.
            target["expression"] = {
                "type": "EachOf",
                "expressions": [existing, constraint],
            }
        shexj_text = json.dumps(shex_json, indent=3)

        # insert comment
        _, base, namespaces, comments = shexc_to_shexj(self.es_shexc)
        shexc_text = shexj_to_shexc(shexj_text, base, namespaces, comments)
        if comment:
            # Text expected on the new constraint's line; None means there is
            # no class/datatype to look for, so the predicate alone decides.
            value = classes.split(",")[0].strip() if classes else datatype
            for line in shexc_text.split("\n"):
                if predicate in line and (value is None or value in line):
                    comments.append({
                        "comment": comment,
                        "type": "constraint",
                        "location": line
                    })
                    break
            shexc_text = shexj_to_shexc(shexj_text, base, namespaces, comments)

        self.es_shexc, self.es_shexj, self.comments = shexc_text, shexj_text, comments

    @property
    def pending(self):
        """Return the pending properties wrapped as ``{"pending": [...]}``."""
        return {"pending": self._pending}

    def update_pending(self, updated_pending):
        """Replace the pending list with ``updated_pending``."""
        self._pending = updated_pending

    @property
    def shapes(self) -> list[str]:
        """Return the ids of all shapes in the current ShExJ schema."""
        shex_json = json.loads(self.es_shexj)
        return [shape["id"] for shape in shex_json["shapes"]]