| import sys
|
| from typing import Optional, Literal, Union
|
| from datetime import datetime
|
|
|
| from loguru import logger
|
| from tqdm import tqdm
|
| from collections import defaultdict
|
|
|
| from SPARQLWrapper import SPARQLWrapper, JSON
|
| from SPARQLWrapper.SPARQLExceptions import EndPointInternalError
|
|
|
| from esgen.config import N_EXAMPLES
|
| from esgen.utils import get_id
|
|
|
|
|
def get_sparql_results(query: str) -> dict:
    """
    Send a SPARQL query to the Wikidata Query Service endpoint.

    :param query: SPARQL query text
    :return: the converted JSON response, or an empty result set
        (``{'results': {'bindings': []}}``) when the endpoint raises
        ``EndPointInternalError``
    """
    major, minor = sys.version_info[0], sys.version_info[1]
    wrapper = SPARQLWrapper(
        endpoint="https://query.wikidata.org/sparql",
        agent=f"WDQS-example Python/{major}.{minor}",
    )
    wrapper.setQuery(query)
    wrapper.setReturnFormat(JSON)

    try:
        return wrapper.query().convert()
    except EndPointInternalError as error:
        # Newlines and tabs are replaced by spaces so the query is logged on one line.
        single_line_query = query.replace("\n", " ").replace("\t", " ")
        logger.error(f"{type(error).__name__}: {error.msg}.")
        logger.error(f"Input SPARQL query: {single_line_query}")
        return {"results": {"bindings": []}}
|
|
|
|
|
def collect_instances(class_id: str, instances_num: int) -> list:
    """
    Fetch distinct instances (``wdt:P31``) of a class.

    :param class_id: class identifier, interpolated after the ``wd:`` prefix
    :param instances_num: value used for the query ``LIMIT``
    :return: ``get_id()`` of every returned ``?item`` value, in result order
    """
    sparql = """SELECT DISTINCT ?item
    WHERE {
        ?item wdt:P31 wd:%s
    }
    LIMIT %d
    """ % (class_id, instances_num)

    bindings = get_sparql_results(sparql)["results"]["bindings"]
    item_uris = [row["item"]["value"] for row in bindings]
    return list(map(get_id, item_uris))
|
|
|
|
|
def collect_properties(instances: list, property_types: list) -> dict:
    """
    Tally the direct-claim properties used by the given instances.

    One query is sent per instance. Every result row either creates the
    entry for its property (with ``count`` 1) or increments the existing
    entry's ``count``.

    :param instances: entity identifiers, each interpolated after ``wd:``
    :param property_types: SPARQL terms, joined with "," into the
        ``FILTER ( ?propType IN ( ... ) )`` list
    :return: ``defaultdict(dict)`` mapping property URI to a dict with the
        keys ``uri``, ``label``, ``type`` and ``count``
    """
    properties = defaultdict(dict)

    # "%f" is always six zero-padded digits, so dropping the last three
    # leaves real milliseconds. The previous str(now.microsecond)[:3] was
    # not padded and rendered e.g. 5000 microseconds as ".500".
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    # NOTE(review): the description looks like a hand-built log-line prefix
    # (presumably to line up with the logger output) - confirm.
    tqdm_desc = "%s | tqdm | __main__:generate_initial_schema:110 " % timestamp

    # The type filter does not depend on the instance; build it once.
    property_type_filter = ",".join(property_types)

    for instance in tqdm(instances, desc=tqdm_desc):
        query = """SELECT DISTINCT ?property ?propLabel ?propType
        WHERE {
            wd:%s ?property ?value .
            ?prop wikibase:directClaim ?property .
            ?prop wikibase:propertyType ?propType .
            FILTER ( ?propType IN ( %s ) )
            SERVICE wikibase:label { bd:serviceParam wikibase:language "en" }
        }""" % (instance, property_type_filter)
        results = get_sparql_results(query)

        for result in results["results"]["bindings"]:
            property_uri = result["property"]["value"]
            # Indexing the defaultdict creates an empty entry on first sight,
            # which is recognised below by its missing/zero "count".
            entry = properties[property_uri]
            if entry.get("count", 0) == 0:
                properties[property_uri] = {
                    "uri": property_uri,
                    "label": result["propLabel"]["value"],
                    "type": result["propType"]["value"],
                    "count": 1,
                }
            else:
                entry["count"] += 1
    return properties
|
|
|
|
|
def filter_properties(class_id: str, num_instances: int, threshold: float, property_types: list) -> dict:
    """
    Label each collected property as frequent or infrequent.

    :param class_id: class whose instances are sampled
    :param num_instances: maximum number of instances to sample
    :param threshold: minimum frequency, in percent of the instances
        actually returned, for a property to be labelled "frequent"
    :param property_types: property types passed on to collect_properties
    :return: the mapping from collect_properties, with ``frequency`` and
        ``frequency_level`` added to every entry
    """
    instances = collect_instances(class_id, num_instances)
    properties = collect_properties(instances, property_types)

    # Percentages are relative to the instances actually retrieved,
    # not to the number requested.
    sampled = len(instances)
    for stats in properties.values():
        frequency = stats["count"] / sampled * 100
        stats["frequency"] = frequency
        stats["frequency_level"] = "frequent" if frequency >= threshold else "infrequent"
    return properties
|
|
|
|
|
def collect_examples(class_id: str, property_id: str) -> list:
    """
    Fetch example statements of a property on instances of a class.

    :param class_id: class identifier, interpolated after ``wd:``
    :param property_id: property identifier, interpolated after ``wdt:``
    :return: one dict per result row with the keys ``subject``,
        ``subjectLabel``, ``property``, ``propertyLabel``, ``object`` and
        ``objectLabel``; the query is limited to ``N_EXAMPLES`` rows
    """
    sparql = """SELECT DISTINCT ?subject ?subjectLabel ?property ?propertyLabel ?object ?objectLabel
    WHERE {
        ?subject wdt:P31 wd:%s .
        ?subject wdt:%s ?object .
        ?property wikibase:directClaim wdt:%s .
        SERVICE wikibase:label { bd:serviceParam wikibase:language "en" }
    }
    LIMIT %d""" % (class_id, property_id, property_id, N_EXAMPLES)

    bindings = get_sparql_results(sparql)["results"]["bindings"]
    return [
        {
            "subject": get_id(row["subject"]["value"]),
            "subjectLabel": row["subjectLabel"]["value"],
            "property": get_id(row["property"]["value"]),
            "propertyLabel": row["propertyLabel"]["value"],
            "object": get_id(row["object"]["value"]),
            "objectLabel": row["objectLabel"]["value"],
        }
        for row in bindings
    ]
|
|
|
|
|
def query_value_type(
        class_id: str,
        property_id: str,
        return_format: Literal["dict", "str"] = "dict") -> Optional[Union[dict, str]]:
    """
    Determine the most common kind of value a property takes on a class.

    The query samples up to 200 objects of the property on instances of the
    class, groups them by "IRI" or literal datatype, and uses the group with
    the highest count.

    :param class_id: class identifier, interpolated after ``wd:``
    :param property_id: property identifier, interpolated after ``wdt:``
    :param return_format: "dict" for a NodeConstraint dict; any other value
        yields the plain string ("IRI" or the datatype URI)
    :return: the value type in the requested format, or None when the query
        returns no rows
    """
    wants_dict = return_format == "dict"

    if property_id == "P31":
        # P31 is answered without querying. The shortcut used to return the
        # dict even when a string was requested; it now honours return_format.
        if wants_dict:
            return {"type": "NodeConstraint", "values": [f"http://www.wikidata.org/entity/{class_id}"]}
        return "IRI"

    query = """SELECT DISTINCT ?objectType (COUNT (?objectType) AS ?count)
    WHERE {
        {
            SELECT ?object WHERE {
                ?subject wdt:P31 wd:%s .
                ?subject wdt:%s ?object .
            }
            LIMIT 200
        }
        BIND(IF(isIRI(?object), "IRI", datatype(?object)) AS ?objectType) .
    }
    GROUP BY ?objectType
    ORDER BY DESC(?count)
    """ % (class_id, property_id)
    bindings = get_sparql_results(query)["results"]["bindings"]
    if not bindings:
        return None

    # Rows are ordered by descending count, so the first one is the dominant type.
    top_value_type = bindings[0]["objectType"]["value"]
    if not wants_dict:
        return top_value_type
    if top_value_type == "IRI":
        return {"type": "NodeConstraint", "nodeKind": "iri"}
    return {"type": "NodeConstraint", "datatype": top_value_type}
|
|
|
|
|
def query_value_type_constraint(property_id: str) -> Optional[dict]:
    """
    Build a nested shape from a property's value-type constraint statements.

    TODO: change nesting shapes to value shapes

    :param property_id: property identifier, interpolated after ``wd:``
    :return: valueExpr value for ShExJ: a Shape whose expression is a single
        TripleConstraint when only one relation is found, otherwise an
        EachOf with one TripleConstraint per relation; None when the query
        returns no rows
    """
    sparql = """SELECT ?valueType ?valueTypeLabel ?relation ?relationLabel
    WHERE {
        wd:%s p:P2302 ?statement .
        ?statement ps:P2302 wd:Q21510865 ; # value-type constraint
                   pq:P2308 ?valueType ; # class
                   pq:P2309 ?r . # relation
        ?r wdt:P1687 ?relation . # Wikidata property
        SERVICE wikibase:label { bd:serviceParam wikibase:language "en". }
    }
    """ % property_id

    bindings = get_sparql_results(sparql)["results"]["bindings"]
    if not bindings:
        return None

    # Group the allowed classes by the relation property they must be reached through.
    classes_by_relation = defaultdict(list)
    for row in bindings:
        classes_by_relation[get_id(row["relation"]["value"])].append(row["valueType"]["value"])

    triple_constraints = [
        {
            "type": "TripleConstraint",
            "predicate": f"http://www.wikidata.org/prop/direct/{relation_id}",
            "valueExpr": {
                "type": "NodeConstraint",
                "values": allowed_classes,
            },
        }
        for relation_id, allowed_classes in classes_by_relation.items()
    ]

    # A lone constraint is used directly; several are combined with EachOf.
    if len(triple_constraints) == 1:
        expression = triple_constraints[0]
    else:
        expression = {"type": "EachOf", "expressions": triple_constraints}

    return {"type": "Shape", "expression": expression}
|
|
|
|
|
def query_cardinality(
        class_id: str,
        property_id: str,
        return_format: Literal["list", "str"] = "list") -> Union[list[int], str]:
    """
    Estimate how many statements of a property an instance of a class has.

    The query samples up to 200 instances of the class and builds a
    histogram of "number of values per subject" over the subjects that have
    the property at all, most frequent first. The result is a ``[min, max]``
    pair, where -1 as max means unbounded, or a descriptive phrase:

    * ``[0, 0]``   - no sampled subject has the property
    * ``[0, 1]``   - every subject that has the property has exactly one value
    * ``[n, n]``   - at least 190 subjects share the same value count n
    * ``[1, -1]``  - all 200 sampled subjects have the property
    * ``[0, -1]``  - anything else

    :param class_id: class identifier, interpolated after ``wd:``
    :param property_id: property identifier, interpolated after ``wdt:``
    :param return_format: "list" for the ``[min, max]`` pair; any other
        value yields the descriptive phrase
    :return: the cardinality in the requested format
    """
    as_list = return_format == "list"
    sample_limit = 200       # subjects sampled by the inner sub-query
    dominant_subjects = 190  # subjects that must share one value count

    if property_id == "P31" or property_id == "P279":
        # These two properties are answered without querying. The shortcut
        # used to return the list even when a string was requested; it now
        # honours return_format.
        return [1, -1] if as_list else "has 1 or more matching statements"

    query = """
    SELECT ?objectCount (COUNT(?objectCount) AS ?objectCountCount)
    {
        SELECT ?subject (COUNT(?object) AS ?objectCount)
        WHERE {
            {
                SELECT ?subject WHERE {
                    ?subject wdt:P31 wd:%s .
                }
                LIMIT %d
            }
            ?subject wdt:%s ?object .
        }
        GROUP BY ?subject
    }
    GROUP BY ?objectCount
    ORDER BY DESC(?objectCountCount)
    """ % (class_id, sample_limit, property_id)
    bindings = get_sparql_results(query)["results"]["bindings"]
    if not bindings:
        return [0, 0] if as_list else "has no matching statements"

    # Maps "values per subject" -> "number of subjects"; insertion order
    # follows the query's descending frequency order.
    object_count = {
        int(row["objectCount"]["value"]): int(row["objectCountCount"]["value"])
        for row in bindings
    }
    top_count, top_subjects = next(iter(object_count.items()))
    subjects_with_property = sum(object_count.values())

    # The "<= sample_limit" test always holds because the sub-query caps the
    # subjects at sample_limit; it is kept to leave the decision rule unchanged.
    if len(object_count) == 1 and top_count == 1 and subjects_with_property <= sample_limit:
        return [0, 1] if as_list else "has 0 or 1 matching statements"
    if top_subjects >= dominant_subjects:
        return [top_count, top_count] if as_list else f"has {top_count} matching statements"
    if subjects_with_property == sample_limit:
        return [1, -1] if as_list else "has 1 or more matching statements"
    return [0, -1] if as_list else "has 0 or more matching statements"
|
|
|