| """
|
| full_processing.py
|
|
|
| Produce one JSONL row per software entity (schema.org/SoftwareSourceCode) from a JSON-LD file.
|
|
|
| Pipeline:
|
| 1) Load JSON-LD and build an index by @id (deep-merge duplicates).
|
| 2) Pick only roots whose @type includes SoftwareSourceCode.
|
| 3) Recursively dereference @id references (incl. blank nodes), avoiding cycles.
|
| 4) Unwrap JSON-LD value objects {"@value": ... , "@type": ...} to scalars (cast xsd types).
|
| 5) Strip JSON-LD control keys (@context/@language...) and rename @id->id, @type->type.
|
| 6) Strip known vocab prefixes from KEYS at any depth (schema.org / imaging-plaza / w3id OKN / biomedit SPHN).
|
| 7) Optionally drop keys in EXCLUDE_KEYS.
|
| 8) Write one cleaned record per software root as JSONL.
|
|
|
| Set INPUT_FILE and OUTPUT_FILE, then run.
|
| """
|
| from __future__ import annotations
|
|
|
| from pathlib import Path
|
| from typing import Any, Dict, Iterable, List, Tuple, Set
|
| import json
|
| import logging
|
|
|
|
|
# Default locations used when this module is executed as a script.
INPUT_FILE = "dataset/full_graph_new.jsonld"
OUTPUT_FILE = "dataset/filtered_dataset.jsonl"

# Keys (compared after vocabulary-prefix stripping) that are removed from every
# record at any depth. Empty by default, i.e. nothing is excluded.
EXCLUDE_KEYS: Set[str] = set()

log = logging.getLogger("ai_agent.catalog.sync")

# Vocabulary prefixes removed from dictionary keys. Each vocabulary is listed
# in both its http and https spelling.
PREFIXES: Tuple[str, ...] = (
    # schema.org
    "http://schema.org/",
    "https://schema.org/",
    # imaging-plaza ontology
    "https://imaging-plaza.epfl.ch/ontology#",
    "http://imaging-plaza.epfl.ch/ontology#",
    # w3id OKN software description
    "https://w3id.org/okn/o/sd#",
    "http://w3id.org/okn/o/sd#",
    # biomedit SPHN
    "https://biomedit.ch/rdf/sphn-schema/sphn#",
    "http://biomedit.ch/rdf/sphn-schema/sphn#",
)

# Spellings of the SoftwareSourceCode type that mark a node as a software root.
SOFTWARE_TYPES: Set[str] = {
    "SoftwareSourceCode",
    "schema:SoftwareSourceCode",
    "http://schema.org/SoftwareSourceCode",
    "https://schema.org/SoftwareSourceCode",
}

# XSD datatype identifiers (full IRI and compact form) grouped by the Python
# scalar type their literals are cast to.
XSD_BOOLEAN = {"xsd:boolean", "http://www.w3.org/2001/XMLSchema#boolean"}
XSD_INTEGERS = {
    "xsd:integer",
    "xsd:int",
    "xsd:long",
    "http://www.w3.org/2001/XMLSchema#integer",
    "http://www.w3.org/2001/XMLSchema#int",
    "http://www.w3.org/2001/XMLSchema#long",
}
XSD_FLOATS = {
    "xsd:float",
    "xsd:double",
    "xsd:decimal",
    "http://www.w3.org/2001/XMLSchema#float",
    "http://www.w3.org/2001/XMLSchema#double",
    "http://www.w3.org/2001/XMLSchema#decimal",
}
|
|
|
|
|
|
|
def deep_merge(a: Any, b: Any) -> Any:
    """Combine two JSON-like values without mutating either input.

    - Identical or equal values collapse to ``a``.
    - Two dicts are merged key by key, recursing on shared keys.
    - If either side is a list, the other side is treated as a one-item list
      and the result is ``a``'s items followed by the items of ``b`` that are
      not already present.
    - Any other pair of differing values becomes the two-item list ``[a, b]``.
    """
    if a is b or a == b:
        return a

    if isinstance(a, dict) and isinstance(b, dict):
        merged = dict(a)
        for key, incoming in b.items():
            merged[key] = deep_merge(merged[key], incoming) if key in merged else incoming
        return merged

    a_is_list = isinstance(a, list)
    b_is_list = isinstance(b, list)
    if a_is_list or b_is_list:
        left = a if a_is_list else [a]
        right = b if b_is_list else [b]
        # Wrapping a scalar may make both sides equal; keep the left one then.
        if left == right:
            return left
        union = list(left)
        for item in right:
            if item not in union:
                union.append(item)
        return union

    if a != b:
        return [a, b]
    return a
|
|
|
def normalize_types(t: Any) -> List[str]:
    """Coerce a JSON-LD ``@type`` value into a list of strings.

    ``None`` yields an empty list, a list is stringified element by element,
    and any other value becomes a single-item list.
    """
    if t is None:
        return []
    items = t if isinstance(t, list) else [t]
    return [str(item) for item in items]
|
|
|
def is_software(node: Dict[str, Any]) -> bool:
    """Tell whether a node is typed as SoftwareSourceCode.

    A type matches when it is one of the spellings in ``SOFTWARE_TYPES`` or
    when the local name of its IRI (the part after the last ``#``, ``/`` or
    compact-IRI ``:``) is exactly ``SoftwareSourceCode``. This accepts
    http/https/compact forms from any namespace.

    Comparing the local name instead of using a plain suffix match avoids
    false positives on unrelated types that merely end with the same text,
    e.g. ``http://example.org/NotSoftwareSourceCode``.

    Args:
        node: A JSON-LD node; its ``@type`` may be missing, a string or a list.

    Returns:
        True if any of the node's types denotes SoftwareSourceCode.
    """
    for type_name in normalize_types(node.get("@type")):
        if type_name in SOFTWARE_TYPES:
            return True
        # Reduce a full or compact IRI to its local name.
        local_name = type_name
        for separator in ("#", "/", ":"):
            local_name = local_name.rsplit(separator, 1)[-1]
        if local_name == "SoftwareSourceCode":
            return True
    return False
|
|
|
def strip_key_prefix(key: Any) -> Any:
    """Remove the first matching entry of ``PREFIXES`` from the start of a key.

    Non-string keys and keys that start with none of the known prefixes are
    returned unchanged.
    """
    if isinstance(key, str):
        matched = next((prefix for prefix in PREFIXES if key.startswith(prefix)), None)
        if matched is not None:
            return key[len(matched):]
    return key
|
|
|
def cast_typed_value(value: Any, vtype: str) -> Any:
    """Cast a JSON-LD typed literal to a Python scalar when that is safe.

    Args:
        value: The literal taken from ``@value``. Only strings are cast; any
            other value is returned unchanged.
        vtype: The datatype taken from ``@type``. Anything that is not a
            string (for example a list) is treated as "unknown type" instead
            of raising on the set-membership tests below.

    Returns:
        - ``True``/``False`` for boolean types when the text is
          ``true``/``1``/``false``/``0`` (case-insensitive);
        - an ``int`` for integer types;
        - a finite ``float`` for float/double/decimal types;
        - the original ``value`` whenever the type is unknown or the text
          cannot be converted.
    """
    import math  # local import keeps this block independent of the file header

    if not isinstance(value, str) or not isinstance(vtype, str):
        return value

    text = value.strip()

    if vtype in XSD_BOOLEAN:
        lowered = text.lower()
        if lowered in ("true", "1"):
            return True
        if lowered in ("false", "0"):
            return False
        return value

    if vtype in XSD_INTEGERS:
        try:
            return int(text)
        except ValueError:
            return value

    if vtype in XSD_FLOATS:
        try:
            number = float(text)
        except ValueError:
            return value
        # "INF"/"NaN" are legal xsd lexical forms but would be written as
        # Infinity/NaN by json.dumps, which is not valid JSON. Keep the text.
        return number if math.isfinite(number) else value

    return value
|
|
|
def unwrap_value_object(obj: Dict[str, Any]) -> Any:
    """Reduce a JSON-LD value object to its bare literal.

    When the object carries a truthy ``@type`` the literal is passed through
    ``cast_typed_value``; otherwise ``@value`` is returned as is. Examples:

        {"@value": "10", "@type": "xsd:integer"}    -> 10
        {"@value": "true", "@type": "xsd:boolean"}  -> True
        {"@value": "2023-01-01"}                    -> "2023-01-01"
    """
    literal = obj.get("@value")
    declared_type = obj.get("@type")
    return cast_typed_value(literal, declared_type) if declared_type else literal
|
|
|
def strip_jsonld_control(obj: Any) -> Any:
    """Remove JSON-LD control keys at any depth, keeping the data they wrap.

    Applied recursively to dicts and lists; scalars are returned unchanged.

    - Value objects ``{"@value": ..., "@type": ...}`` become a scalar (cast
      according to the declared type).
    - List and set objects ``{"@list": [...]}`` / ``{"@set": [...]}`` are
      replaced by their cleaned contents. Dropping them like other ``@`` keys
      would silently lose every member.
    - ``@id`` is renamed to ``id``.
    - ``@type`` is renamed to ``type`` with IRIs reduced to their last
      segment: a single type becomes a string, several become a list, and an
      empty type list stays ``[]``.
    - Any other key starting with ``@`` is dropped.

    Args:
        obj: Any JSON-like value (dict, list or scalar).

    Returns:
        A new structure without ``@``-prefixed keys.
    """
    if isinstance(obj, list):
        return [strip_jsonld_control(item) for item in obj]
    if not isinstance(obj, dict):
        return obj

    # Unwrap before touching keys, otherwise "@value" would be discarded.
    if "@value" in obj:
        return strip_jsonld_control(unwrap_value_object(obj))
    for container_key in ("@list", "@set"):
        if container_key in obj:
            return strip_jsonld_control(obj[container_key])

    out: Dict[str, Any] = {}
    for key, value in obj.items():
        if key == "@id":
            out["id"] = strip_jsonld_control(value)
        elif key == "@type":
            names = [localize_iri(t) for t in normalize_types(value)]
            out["type"] = names[0] if len(names) == 1 else names
        elif isinstance(key, str) and key.startswith("@"):
            # Remaining control keys (@context, @language, ...) carry no data.
            continue
        else:
            out[key] = strip_jsonld_control(value)
    return out
|
|
|
def localize_iri(s: Any) -> Any:
    """Shorten an IRI to the text after its last ``#`` or, failing that, ``/``.

    A ``#`` anywhere in the string takes precedence over ``/``. Trailing
    slashes are ignored when splitting on ``/``. Non-strings and strings
    containing neither separator are returned unchanged.
    """
    if not isinstance(s, str):
        return s
    if "#" in s:
        _, _, fragment = s.rpartition("#")
        return fragment
    if "/" in s:
        _, _, tail = s.rstrip("/").rpartition("/")
        return tail
    return s
|
|
|
def strip_prefixes_and_merge(obj: Any) -> Any:
    """Strip vocabulary prefixes from dict keys at every depth.

    Keys that become identical after stripping have their values combined with
    ``deep_merge``; keys listed in ``EXCLUDE_KEYS`` (checked after stripping)
    are omitted. Lists are processed element by element and scalars are
    returned untouched.

    Intended to run after ``strip_jsonld_control`` so that no ``@...`` keys
    remain.
    """
    if isinstance(obj, list):
        return [strip_prefixes_and_merge(item) for item in obj]
    if not isinstance(obj, dict):
        return obj

    result: Dict[str, Any] = {}
    for raw_key, raw_value in obj.items():
        short_key = strip_key_prefix(raw_key)
        cleaned_value = strip_prefixes_and_merge(raw_value)
        if short_key in EXCLUDE_KEYS:
            continue
        if short_key in result:
            cleaned_value = deep_merge(result[short_key], cleaned_value)
        result[short_key] = cleaned_value
    return result
|
|
|
|
|
|
|
def build_index(graph_nodes: Iterable[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """Map each string ``@id`` to its node, deep-merging repeated ids.

    Entries that are not dicts, or whose ``@id`` is missing or not a string,
    are skipped. The first occurrence of an id is stored as a shallow copy;
    later occurrences are folded in with ``deep_merge``.
    """
    index: Dict[str, Dict[str, Any]] = {}
    for candidate in graph_nodes:
        if not isinstance(candidate, dict):
            continue
        node_id = candidate.get("@id")
        if not isinstance(node_id, str):
            continue
        if node_id in index:
            index[node_id] = deep_merge(index[node_id], candidate)
        else:
            index[node_id] = dict(candidate)
    return index
|
|
|
def deref(node: Any, idx: Dict[str, Dict[str, Any]], seen: Set[str] | None = None) -> Any:
    """Expand ``@id`` references into the full nodes stored in ``idx``.

    A dict whose ``@id`` is a key of ``idx`` is replaced by the indexed node
    deep-merged with the dict's own inline properties, and every value of the
    result is expanded in turn. Other dicts and lists are walked recursively;
    scalars are returned unchanged.

    ``seen`` holds the ids already expanded on the current path. Meeting one
    of them again yields a bare ``{"@id": ...}`` stub instead of recursing,
    which breaks cycles. Each child receives its own copy of the set, so
    sibling branches do not affect one another.
    """
    if isinstance(node, list):
        return [
            deref(item, idx, seen=None if seen is None else set(seen))
            for item in node
        ]
    if not isinstance(node, dict):
        return node

    node_id = node.get("@id")
    if not (isinstance(node_id, str) and node_id in idx):
        # Not resolvable: keep the dict and only expand its values.
        return {
            key: deref(value, idx, seen=None if seen is None else set(seen))
            for key, value in node.items()
        }

    if seen is None:
        seen = set()
    if node_id in seen:
        return {"@id": node_id}
    seen.add(node_id)

    merged = deep_merge(idx[node_id], dict(node))
    return {key: deref(value, idx, seen=set(seen)) for key, value in merged.items()}
|
|
|
def extract_graph(doc: Any) -> List[Dict[str, Any]]:
    """Return the node objects of a JSON-LD document regardless of its shape.

    Supported shapes:
    - a dict with ``@graph`` holding a list of nodes (non-dict items skipped);
    - a dict with ``@graph`` holding a single node object, returned as a
      one-item list (iterating that dict directly would walk its keys and
      yield nothing);
    - a top-level list of nodes (non-dict items skipped);
    - a single top-level node dict.

    Args:
        doc: The parsed JSON-LD document.

    Returns:
        The list of node dicts. It is empty when the document, or its
        ``@graph`` value, has none of the shapes above (e.g. ``null``).
    """
    if isinstance(doc, dict) and "@graph" in doc:
        graph = doc["@graph"]
        if isinstance(graph, dict):
            return [graph]
        if isinstance(graph, list):
            return [item for item in graph if isinstance(item, dict)]
        return []
    if isinstance(doc, list):
        return [item for item in doc if isinstance(item, dict)]
    if isinstance(doc, dict):
        return [doc]
    return []
|
|
|
def drop_empties(obj: Any) -> Any:
    """Recursively prune empty values from dicts and lists.

    A value is empty when it is ``None``, ``""``, ``[]`` or ``{}``. Children
    are pruned first, so containers that end up empty are removed as well.
    ``0`` and ``False`` are kept.
    """
    if isinstance(obj, dict):
        pruned = ((key, drop_empties(value)) for key, value in obj.items())
        return {key: value for key, value in pruned if not _is_empty(value)}
    if isinstance(obj, list):
        return [item for item in map(drop_empties, obj) if not _is_empty(item)]
    return obj


def _is_empty(v: Any) -> bool:
    """Return True for ``None`` and for a zero-length str, dict or list."""
    if v is None:
        return True
    return isinstance(v, (str, dict, list)) and len(v) == 0
|
|
|
def full_processing(input_file, output_file) -> None:
    """Convert a JSON-LD file into JSONL, one line per software entity.

    Steps: read and parse ``input_file`` (UTF-8), index its nodes by ``@id``,
    select the ids for which ``is_software`` is true, then for each of them
    dereference, strip JSON-LD control keys, strip vocabulary prefixes, drop
    empty values and write the result as one JSON line to ``output_file``
    (UTF-8, non-ASCII characters kept as is). The number of records written
    is logged at INFO level.
    """
    document = json.loads(Path(input_file).read_text(encoding="utf-8"))

    index = build_index(extract_graph(document))
    software_ids: List[str] = [
        node_id for node_id, node in index.items() if is_software(node)
    ]

    count = 0
    with Path(output_file).open("w", encoding="utf-8") as sink:
        for sid in software_ids:
            record = deref(index[sid], index)
            # Order matters: value objects must be unwrapped before keys are
            # renamed, and prefixes are stripped once no "@" keys remain.
            for stage in (strip_jsonld_control, strip_prefixes_and_merge, drop_empties):
                record = stage(record)
            sink.write(json.dumps(record, ensure_ascii=False) + "\n")
            count += 1

    log.info("Wrote %d software records to %s", count, output_file)
|
|
|
|
|
if __name__ == "__main__":
    # Configure logging so the final INFO summary emitted by full_processing
    # is shown; with no handler set up, only WARNING and above is printed.
    logging.basicConfig(level=logging.INFO)
    full_processing(INPUT_FILE, OUTPUT_FILE)
|
|
|