ESGen / esgen /queries.py
Bohui Zhang
Fix empty initial schema errors
9ad5c6c
import sys
from typing import Optional, Literal, Union
from datetime import datetime
from loguru import logger
from tqdm import tqdm
from collections import defaultdict
from SPARQLWrapper import SPARQLWrapper, JSON
from SPARQLWrapper.SPARQLExceptions import EndPointInternalError
from esgen.config import N_EXAMPLES
from esgen.utils import get_id
def get_sparql_results(query: str) -> dict:
    """
    Send a SPARQL query to the Wikidata Query Service and return the JSON result.

    :param query: SPARQL query text
    :return: the converted JSON response; if the endpoint raises
        EndPointInternalError, the error and the query are logged and an
        empty result set ({'results': {'bindings': []}}) is returned
    """
    # TODO adjust user agent; see https://w.wiki/CX6
    major, minor = sys.version_info[0], sys.version_info[1]
    client = SPARQLWrapper(
        endpoint="https://query.wikidata.org/sparql",
        agent="WDQS-example Python/%s.%s" % (major, minor),
    )
    client.setQuery(query)
    client.setReturnFormat(JSON)
    try:
        response = client.query().convert()
    except EndPointInternalError as e:
        logger.error("%s: %s." % (e.__class__.__name__, e.msg))
        # replace newlines and tabs with spaces so the query is logged on one line
        flattened = query.translate({ord(ch): " " for ch in "\n\t"})
        logger.error("Input SPARQL query: %s" % flattened)
        return {'results': {'bindings': []}}
    return response
def collect_instances(class_id: str, instances_num: int) -> list:
    """
    Fetch items that are direct instances (wdt:P31) of the given class.

    :param class_id: class identifier substituted after the `wd:` prefix
    :param instances_num: value used for the query's LIMIT clause
    :return: list with get_id(...) applied to every returned ?item value
    """
    query = """SELECT DISTINCT ?item
WHERE {
?item wdt:P31 wd:%s
}
LIMIT %d
""" % (class_id, instances_num)
    bindings = get_sparql_results(query)["results"]["bindings"]
    return [get_id(row["item"]["value"]) for row in bindings]
def collect_properties(instances: list, property_types: list) -> dict:
    """
    Count, for every property, how many of the given instances use it.

    One query is sent per instance; each query returns the distinct
    properties of that instance whose property type is in `property_types`.

    :param instances: entity identifiers substituted after the `wd:` prefix
    :param property_types: property type terms, joined with "," into the
        FILTER ... IN ( ... ) list of the query
    :return: defaultdict keyed by property URI; each value holds
        "uri", "label", "type" and "count" (number of result rows seen
        for that property across all instances)
    """
    properties = defaultdict(dict)
    now = datetime.now()
    # Progress-bar prefix (presumably mimics the logger's line prefix).
    # `%f` is always six zero-padded digits, so dropping the last three
    # yields correct milliseconds; slicing str(now.microsecond) would turn
    # 5000 microseconds into "500" instead of "005".
    timestamp = now.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    tqdm_desc = "%s | tqdm | __main__:generate_initial_schema:110 " % timestamp
    for instance in tqdm(instances, desc=tqdm_desc):
        query = """SELECT DISTINCT ?property ?propLabel ?propType
WHERE {
wd:%s ?property ?value .
?prop wikibase:directClaim ?property .
?prop wikibase:propertyType ?propType .
FILTER ( ?propType IN ( %s ) )
SERVICE wikibase:label { bd:serviceParam wikibase:language "en" }
}""" % (instance, ",".join(property_types))
        results = get_sparql_results(query)
        for result in results["results"]["bindings"]:
            property_uri = result["property"]["value"]
            # membership test does not trigger the defaultdict factory
            if property_uri in properties:
                properties[property_uri]["count"] += 1
            else:
                properties[property_uri] = {
                    "uri": property_uri,
                    "label": result["propLabel"]["value"],
                    "type": result["propType"]["value"],
                    "count": 1,
                }
    return properties
def filter_properties(class_id: str, num_instances: int, threshold: float, property_types: list) -> dict:
    """
    Label each property of a class as frequent or infrequent.

    Instances of the class are sampled, their properties are counted, and
    every property entry gets a "frequency" (percentage of sampled
    instances) and a "frequency_level".

    :param class_id: class identifier passed to collect_instances
    :param num_instances: maximum number of instances to sample
    :param threshold: percentage at or above which a property is "frequent"
    :param property_types: property types passed to collect_properties
    :return: the mapping returned by collect_properties, with "frequency"
        and "frequency_level" added to every entry
    """
    sampled = collect_instances(class_id, num_instances)
    # the query may return fewer instances than requested
    sample_size = len(sampled)
    properties = collect_properties(sampled, property_types)
    for stats in properties.values():
        frequency = stats["count"] / sample_size * 100
        stats["frequency"] = frequency
        stats["frequency_level"] = "frequent" if frequency >= threshold else "infrequent"
    return properties
def collect_examples(class_id: str, property_id: str) -> list:
    """
    Fetch example statements of a property on instances of a class.

    :param class_id: class identifier substituted after the `wd:` prefix
    :param property_id: property identifier substituted after the `wdt:` prefix
    :return: list of dicts with the keys "subject", "subjectLabel",
        "property", "propertyLabel", "object" and "objectLabel"; the query
        is limited to N_EXAMPLES rows
    """
    query = """SELECT DISTINCT ?subject ?subjectLabel ?property ?propertyLabel ?object ?objectLabel
WHERE {
?subject wdt:P31 wd:%s .
?subject wdt:%s ?object .
?property wikibase:directClaim wdt:%s .
SERVICE wikibase:label { bd:serviceParam wikibase:language "en" }
}
LIMIT %d""" % (class_id, property_id, property_id, N_EXAMPLES)
    rows = get_sparql_results(query)["results"]["bindings"]
    return [
        {
            "subject": get_id(row["subject"]["value"]),
            "subjectLabel": row["subjectLabel"]["value"],
            "property": get_id(row["property"]["value"]),
            "propertyLabel": row["propertyLabel"]["value"],
            "object": get_id(row["object"]["value"]),
            "objectLabel": row["objectLabel"]["value"],
        }
        for row in rows
    ]
def query_value_type(
        class_id: str,
        property_id: str,
        return_format: Literal["dict", "str"] = "dict") -> Optional[Union[dict, str]]:
    """
    Determine the most common value type of a property on instances of a class.

    :param class_id: class identifier substituted after the `wd:` prefix
    :param property_id: property identifier substituted after the `wdt:` prefix
    :param return_format: "dict" for a NodeConstraint dict; any other value
        yields the plain string ("IRI" or the datatype)
    :return: None when the query returns no rows; otherwise the constraint
        in the requested format.
        NOTE(review): for property_id "P31" a NodeConstraint dict listing
        the class entity URI is returned without querying, regardless of
        return_format.
    """
    # TODO: query issues for P31 & P279
    if property_id == "P31":
        return {"type": "NodeConstraint", "values": [f"http://www.wikidata.org/entity/{class_id}"]}

    # optimized query (trying to avoid query timeout limit reached)
    query = """SELECT DISTINCT ?objectType (COUNT (?objectType) AS ?count)
WHERE {
{
SELECT ?object WHERE {
?subject wdt:P31 wd:%s .
?subject wdt:%s ?object .
}
LIMIT 200
}
BIND(IF(isIRI(?object), "IRI", datatype(?object)) AS ?objectType) .
}
GROUP BY ?objectType
ORDER BY DESC(?count)
""" % (class_id, property_id)
    bindings = get_sparql_results(query)["results"]["bindings"]
    if not bindings:  # unknown, e.g., BlankNode
        return None  # "."

    # rows are ordered by descending count, so the first one is the dominant type
    top_value_type = bindings[0]["objectType"]["value"]
    as_dict = return_format == "dict"
    if top_value_type == "IRI":
        # TODO: use query_value_type_constraint
        if as_dict:
            return {"type": "NodeConstraint", "nodeKind": "iri"}
        return "IRI"
    if as_dict:
        return {"type": "NodeConstraint", "datatype": top_value_type}
    return top_value_type
def query_value_type_constraint(property_id: str) -> Optional[dict]:
    """
    Build a nested shape from the value-type constraint of a property.

    TODO: change nesting shapes to value shapes

    :param property_id: property identifier substituted after the `wd:` prefix
    :return: valueExpr value for ShExJ, or None when the query returns no
        rows. With one relation the shape's expression is a single
        TripleConstraint; with several it is an EachOf over one
        TripleConstraint per relation.
    """
    query = """SELECT ?valueType ?valueTypeLabel ?relation ?relationLabel
WHERE {
wd:%s p:P2302 ?statement .
?statement ps:P2302 wd:Q21510865 ; # value-type constraint
pq:P2308 ?valueType ; # class
pq:P2309 ?r . # relation
?r wdt:P1687 ?relation . # Wikidata property
SERVICE wikibase:label { bd:serviceParam wikibase:language "en". }
}
""" % property_id
    bindings = get_sparql_results(query)["results"]["bindings"]
    if not bindings:  # no value-type constraint
        return None

    # group the allowed class URIs by the relation property they apply to
    classes_by_relation = defaultdict(list)
    for row in bindings:
        relation_id = get_id(row["relation"]["value"])
        classes_by_relation[relation_id].append(row["valueType"]["value"])

    triple_constraints = [
        {
            "type": "TripleConstraint",
            "predicate": f"http://www.wikidata.org/prop/direct/{relation_id}",
            "valueExpr": {
                "type": "NodeConstraint",
                "values": classes,
            },
        }
        for relation_id, classes in classes_by_relation.items()
    ]
    if len(triple_constraints) == 1:
        expression = triple_constraints[0]
    else:
        expression = {"type": "EachOf", "expressions": triple_constraints}
    return {"type": "Shape", "expression": expression}
def query_cardinality(
        class_id: str,
        property_id: str,
        return_format: Literal["list", "str"] = "list") -> Union[list[int], str]:
    """
    Estimate the cardinality of a property on instances of a class.

    Cardinalities are represented by the strings '?' (zero or one),
    '+' (one or more), '*' (zero or more) following the notation in the XML
    specification, or {m,} to indicate that at least m elements are required.
    Here they are returned either as a [min, max] pair, where a max of -1
    stands for "or more", or as an English phrase.

    The estimate samples up to 200 instances of the class and counts, for
    each number of values, how many sampled subjects have that many values.

    :param class_id: class identifier substituted after the `wd:` prefix
    :param property_id: property identifier substituted after the `wdt:` prefix
    :param return_format: "list" for a [min, max] pair; any other value
        yields the phrase form
    :return: the estimated cardinality in the requested format
    """
    as_list = return_format == "list"

    # TODO: fix issues for P31 & P279
    if property_id == "P31" or property_id == "P279":
        # shortcut without querying; honours return_format like every other branch
        return [1, -1] if as_list else "has 1 or more matching statements"  # +

    query = """
SELECT ?objectCount (COUNT(?objectCount) AS ?objectCountCount)
{
SELECT ?subject (COUNT(?object) AS ?objectCount)
WHERE {
{
SELECT ?subject WHERE {
?subject wdt:P31 wd:%s .
}
LIMIT 200
}
?subject wdt:%s ?object .
}
GROUP BY ?subject
}
GROUP BY ?objectCount
ORDER BY DESC(?objectCountCount)
""" % (class_id, property_id)
    bindings = get_sparql_results(query)["results"]["bindings"]
    if not bindings:  # no objects for given (subject, property) pairs
        # TODO: [0, -1] ?
        return [0, 0] if as_list else "has no matching statements"  # {0}

    # number of values -> number of sampled subjects with that many values;
    # insertion order follows the query's descending-frequency order
    object_count = {
        int(row["objectCount"]["value"]): int(row["objectCountCount"]["value"])
        for row in bindings
    }
    top_count, top_subjects = next(iter(object_count.items()))
    total_subjects = sum(object_count.values())

    if len(object_count) == 1 and top_count == 1 and total_subjects <= 200:
        return [0, 1] if as_list else "has 0 or 1 matching statements"  # ?
    elif top_subjects >= 190:
        # TODO: [top_count, -1] ?
        return [top_count, top_count] if as_list else f"has {top_count} matching statements"  # {n}
    elif total_subjects == 200:
        # every sampled subject has at least one value
        return [1, -1] if as_list else "has 1 or more matching statements"  # +
    else:
        return [0, -1] if as_list else "has 0 or more matching statements"  # *