File size: 8,349 Bytes
2e93420
 
 
b70d82f
9cbbfac
2e93420
9fb3deb
c1b1880
 
 
 
 
 
 
b70d82f
c1b1880
 
 
 
 
 
b70d82f
9fb3deb
 
 
 
 
 
c1b1880
 
b70d82f
 
c1b1880
 
 
 
 
 
 
b70d82f
c1b1880
b70d82f
c1b1880
 
b70d82f
 
 
 
 
 
 
 
 
 
 
 
 
 
2e93420
 
b70d82f
2e93420
 
 
b70d82f
 
 
2e93420
b70d82f
2e93420
b70d82f
 
 
 
 
2e93420
b70d82f
 
 
 
 
 
 
 
 
 
 
2e93420
b70d82f
2e93420
b70d82f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2e93420
b70d82f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2e93420
b70d82f
 
 
 
 
2e93420
b70d82f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9fb3deb
b70d82f
 
9fb3deb
 
 
 
 
b70d82f
9fb3deb
b70d82f
9fb3deb
 
 
 
 
 
 
 
2e93420
b70d82f
2e93420
 
9fb3deb
 
 
2e93420
b70d82f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
import os
import json
from pathlib import Path
from collections import defaultdict
from rdflib import Graph, URIRef, BNode, RDF, RDFS, OWL

# --- NAMESPACE MAP ---
# Maps a namespace IRI to the prefix used when building QNames in
# uri_to_qname() and when emitting the @prefix header of the SHACL file.
# Insertion order is the order in which the @prefix lines are written.
ARCO_NAMESPACES = {
    # Namespaces under w3id.org/arco/ontology
    "https://w3id.org/arco/ontology/arco/": "arco",
    "https://w3id.org/arco/ontology/core/": "core",
    "https://w3id.org/arco/ontology/location/": "a-loc",
    "https://w3id.org/arco/ontology/context-description/": "a-cd",
    "https://w3id.org/arco/ontology/denotative-description/": "a-dd",
    "https://w3id.org/arco/ontology/cultural-event/": "a-ce",
    "https://w3id.org/arco/ontology/catalogue/": "a-cat",
    "http://dati.beniculturali.it/cis/": "cis",
    # Namespaces under w3id.org/italia/onto
    "https://w3id.org/italia/onto/l0/": "l0",
    "https://w3id.org/italia/onto/CLV/": "clv",
    "https://w3id.org/italia/onto/TI/": "ti",
    "https://w3id.org/italia/onto/RO/": "ro",
    "https://w3id.org/italia/onto/SM/": "sm",
    "https://w3id.org/italia/onto/MU/": "mu",
    "http://www.cidoc-crm.org/cidoc-crm/": "crm", 
    # Namespaces under www.w3.org (hash-terminated)
    "http://www.w3.org/2002/07/owl#": "owl",
    "http://www.w3.org/2000/01/rdf-schema#": "rdfs",
    "http://www.w3.org/1999/02/22-rdf-syntax-ns#": "rdf",
    "http://www.w3.org/2001/XMLSchema#": "xsd",
    "http://www.w3.org/2004/02/skos/core#": "skos"
}

def uri_to_qname(uri: URIRef) -> str:
    """Shorten *uri* to a ``prefix:local`` QName using ``ARCO_NAMESPACES``.

    The longest registered namespace that prefixes the URI wins, and any
    leading ``#`` left on the local part is stripped. When no namespace
    matches, the bare local name is returned instead: the text after the
    last ``#`` if the URI has one, otherwise the text after the last ``/``.

    Returns ``None`` for a falsy value or a blank node.
    """
    if isinstance(uri, BNode) or not uri:
        return None

    text = str(uri)

    # Longest-prefix match over the registered namespaces ("" means no match).
    namespace = max(
        (ns for ns in ARCO_NAMESPACES if text.startswith(ns)),
        key=len,
        default="",
    )
    if namespace:
        prefix = ARCO_NAMESPACES[namespace]
        local_name = text[len(namespace):].lstrip('#')
        return f"{prefix}:{local_name}"

    # Unknown namespace: fall back to the fragment or the last path segment.
    separator = '#' if '#' in text else '/'
    return text.rpartition(separator)[2]

def get_union_classes(g: Graph, bnode: BNode):
    """Return the QNames of the named classes in the ``owl:unionOf`` of *bnode*.

    Args:
        g: Graph holding the ontology triples.
        bnode: Node expected to carry an ``owl:unionOf`` pointing at an RDF list.

    Returns:
        A list of QName strings, in list order. Members that are not
        ``URIRef`` (e.g. nested anonymous class expressions) are skipped, as
        are members for which ``uri_to_qname`` yields an empty result. The
        list is empty when *bnode* has no ``owl:unionOf``.
    """
    classes = []
    visited_cells = set()
    current = g.value(bnode, OWL.unionOf)

    # Walk the rdf:first / rdf:rest linked list. The visited set stops the
    # walk on a malformed list whose rdf:rest chain loops back on itself,
    # which would otherwise spin forever.
    while current and current != RDF.nil and current not in visited_cells:
        visited_cells.add(current)
        item = g.value(current, RDF.first)
        if isinstance(item, URIRef):
            qname = uri_to_qname(item)
            if qname:
                classes.append(qname)
        current = g.value(current, RDF.rest)

    return classes

def _ensure_parent_dir(file_path):
    """Create the parent directory of *file_path* when it has one."""
    parent = os.path.dirname(file_path)
    # A bare filename has an empty dirname, and os.makedirs("") raises.
    if parent:
        os.makedirs(parent, exist_ok=True)


def _preferred_text(g, subject, predicate, fallback):
    """Return an Italian-tagged value of *predicate*, else any value, else *fallback*."""
    any_value = g.value(subject, predicate)
    text = str(any_value) if any_value else fallback
    for candidate in g.objects(subject, predicate):
        # Objects that are not literals carry no language attribute.
        if getattr(candidate, "language", None) == 'it':
            text = str(candidate)
    return text


def _class_refs(g, node):
    """Return QNames for a domain/range node: one named class or the members of a union."""
    if isinstance(node, URIRef):
        return [uri_to_qname(node)]
    if isinstance(node, BNode):
        return get_union_classes(g, node)
    return []


def _collect_classes(g):
    """Index every named ``owl:Class`` by QName (label, description, parents, namespace)."""
    classes = {}
    for cls in g.subjects(RDF.type, OWL.Class):
        if isinstance(cls, BNode):
            continue  # anonymous class expressions are not indexed

        qname = uri_to_qname(cls)
        classes[qname] = {
            "label": _preferred_text(g, cls, RDFS.label, qname),
            "description": _preferred_text(g, cls, RDFS.comment, ""),
            # Only named superclasses are kept; blank-node parents are dropped.
            "parents": [
                uri_to_qname(parent)
                for parent in g.objects(cls, RDFS.subClassOf)
                if isinstance(parent, URIRef)
            ],
            "namespace": qname.split(":")[0] if ":" in qname else "unknown",
        }
    return classes


def _collect_properties(g):
    """List every named object/datatype property with its label, domains and ranges."""
    properties = []
    for prop_type in (OWL.ObjectProperty, OWL.DatatypeProperty):
        for prop in g.subjects(RDF.type, prop_type):
            if isinstance(prop, BNode):
                continue

            qname = uri_to_qname(prop)
            label = g.value(prop, RDFS.label)
            properties.append({
                "id": qname,
                "label": str(label) if label else qname,
                "domains": _class_refs(g, g.value(prop, RDFS.domain)),
                "ranges": _class_refs(g, g.value(prop, RDFS.range)),
            })
    return properties


def _index_by_domain(properties_list):
    """Group property summaries under each class that appears in their domain."""
    by_domain = defaultdict(list)
    for prop in properties_list:
        # NOTE(review): a property with no resolved range is reported as
        # "Mixed/Union", while a multi-class union reports only its first
        # member. Kept as-is; confirm this is the intended labelling.
        shown_range = prop["ranges"][0] if prop["ranges"] else "Mixed/Union"
        for domain in prop["domains"]:
            by_domain[domain].append({
                "id": prop["id"],
                "label": prop["label"],
                "range": shown_range,
                "inherited_from": domain,
            })
    return by_domain


def _inherited_properties(class_qname, classes, by_domain, visited):
    """Return the class's own properties followed by those of its ancestors, deduplicated by id."""
    if class_qname in visited:
        return []  # already expanded on this walk (diamond or cyclic hierarchy)
    visited.add(class_qname)

    props = list(by_domain.get(class_qname, []))
    seen_ids = {p["id"] for p in props}
    for parent in classes.get(class_qname, {}).get("parents", []):
        for inherited in _inherited_properties(parent, classes, by_domain, visited):
            # The nearest declaration wins; later duplicates are ignored.
            if inherited["id"] not in seen_ids:
                seen_ids.add(inherited["id"])
                props.append(inherited)
    return props


def _write_shacl(output_shacl, properties_list):
    """Write domain/range node shapes to *output_shacl* and return how many were written."""
    _ensure_parent_dir(output_shacl)
    shape_count = 0
    with open(output_shacl, 'w', encoding='utf-8') as f:
        f.write("@prefix sh: <http://www.w3.org/ns/shacl#> .\n")
        f.write("@prefix ex: <http://activadigital.it/ontology/> .\n")
        for ns_uri, prefix in ARCO_NAMESPACES.items():
            f.write(f"@prefix {prefix}: <{ns_uri}> .\n")
        f.write("\n")

        for prop in properties_list:
            prop_id = prop["id"]
            # Without a known prefix the id cannot be written as a Turtle QName.
            if ":" not in prop_id:
                continue
            safe_id = prop_id.replace(":", "_").replace("-", "_")

            # Shapes are only generated for unambiguous (single-class) domains.
            if len(prop["domains"]) == 1 and ":" in prop["domains"][0]:
                dom = prop["domains"][0]
                f.write(f"ex:{safe_id}_DomainShape a sh:NodeShape ;\n")
                f.write(f"    sh:targetSubjectsOf {prop_id} ;\n")
                f.write(f"    sh:class {dom} .\n\n")
                shape_count += 1

            # Same rule for ranges: exactly one resolvable class or datatype.
            if len(prop["ranges"]) == 1 and ":" in prop["ranges"][0]:
                rng = prop["ranges"][0]
                if rng == "rdfs:Literal":
                    # No literal carries rdfs:Literal as its datatype, so
                    # sh:datatype would reject every value; constrain the
                    # node kind instead.
                    constraint = "sh:nodeKind sh:Literal"
                elif rng.startswith("xsd:"):
                    constraint = f"sh:datatype {rng}"
                else:
                    constraint = f"sh:class {rng}"
                f.write(f"ex:{safe_id}_RangeShape a sh:NodeShape ;\n")
                f.write(f"    sh:targetObjectsOf {prop_id} ;\n")
                f.write(f"    {constraint} .\n\n")
                shape_count += 1
    return shape_count


def build_domain_index_and_shacl(ontology_dir: str, output_json: str, output_shacl: str):
    """Build a JSON domain index and a SHACL constraints file from OWL ontologies.

    Every ``*.owl`` file found recursively under *ontology_dir* is parsed as
    RDF/XML into a single graph. From that graph the function derives:

    * a class index (label, description, named parents, namespace prefix);
    * for each class, the properties whose domain is that class or one of
      its ancestors;
    * a ``"label - description"`` text for each class that has a description.

    These are written as JSON to *output_json*. A Turtle file with one
    ``sh:NodeShape`` per single-class domain and per single-class/datatype
    range is written to *output_shacl*.

    Args:
        ontology_dir: Directory searched recursively for ``*.owl`` files.
        output_json: Destination path of the JSON index.
        output_shacl: Destination path of the generated SHACL Turtle file.

    Parent directories of both outputs are created when missing. Files that
    fail to parse are reported on stdout and skipped.
    """
    print(f"⏳ Inizializzazione Graph e caricamento da {ontology_dir}...")
    g = Graph()

    for file_path in Path(ontology_dir).glob('**/*.owl'):
        try:
            g.parse(file_path, format="xml")
            print(f"  -> Caricato: {file_path.name}")
        except Exception as e:
            # Best effort: one unreadable ontology must not abort the build.
            print(f"  ⚠️ Errore parsing {file_path.name}: {e}")

    print("✅ Ontologie caricate in memoria. Compilazione indici in corso...")

    classes_dict = _collect_classes(g)
    properties_list = _collect_properties(g)
    properties_by_domain = _index_by_domain(properties_list)

    # Classes with no direct or inherited properties are left out of the index.
    final_properties_by_domain = {}
    for cls in classes_dict:
        all_props = _inherited_properties(cls, classes_dict, properties_by_domain, set())
        if all_props:
            final_properties_by_domain[cls] = all_props

    class_embeddings_texts = {
        k: f"{v['label']} - {v['description']}" for k, v in classes_dict.items() if v['description']
    }

    domain_index = {
        "classes": classes_dict,
        "properties_by_domain": final_properties_by_domain,
        "class_embeddings_texts": class_embeddings_texts
    }

    _ensure_parent_dir(output_json)
    with open(output_json, 'w', encoding='utf-8') as f:
        json.dump(domain_index, f, ensure_ascii=False, indent=2)
    print(f"💾 Salvato Indice di Dominio in: {output_json}")

    shape_count = _write_shacl(output_shacl, properties_list)

    print(f"🛡️ Generato SHACL auto_constraints.ttl con {shape_count} regole rigorose in: {output_shacl}")

if __name__ == "__main__":
    # Script entry point: read the ontologies under ./ontology/ and write the
    # JSON index and the SHACL shapes file below that same folder.
    build_domain_index_and_shacl(
        ontology_dir="./ontology/",
        output_json="./ontology/domain_index.json",
        output_shacl="./ontology/shapes/auto_constraints.ttl",
    )