ai-agent / src /ai_agent /utils /full_processing.py
katospiegel's picture
Deploy develop: FastAPI+React frontend, multi-stage Docker (ai_agent serve)
07c2476 verified
Raw
History Blame Contribute Delete
11.9 kB
"""
full_processing.py
Produce one JSONL row per software entity (schema.org/SoftwareSourceCode) from a JSON-LD file.
Pipeline:
1) Load JSON-LD and build an index by @id (deep-merge duplicates).
2) Pick only roots whose @type includes SoftwareSourceCode.
3) Recursively dereference @id references (incl. blank nodes), avoiding cycles.
4) Unwrap JSON-LD value objects {"@value": ... , "@type": ...} to scalars (cast xsd types).
5) Strip JSON-LD control keys (@context/@language...) and rename @id->id, @type->type.
6) Strip known vocab prefixes from KEYS at any depth (schema.org / imaging-plaza / w3id OKN / biomedit SPHN).
7) Optionally drop keys in EXCLUDE_KEYS.
8) Write one cleaned record per software root as JSONL.
Set INPUT_FILE and OUTPUT_FILE, then run.
"""
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, Iterable, List, Tuple, Set
import json
import logging
# ---- configure here ----
# Default paths used when this module is executed as a script.
INPUT_FILE = "dataset/full_graph_new.jsonld"
OUTPUT_FILE = "dataset/filtered_dataset.jsonl"

# Property names to drop anywhere in a record (matched AFTER prefix stripping).
# Empty by default; add names such as "bodySite" to exclude them.
EXCLUDE_KEYS: Set[str] = set()
# ------------------------

# NOTE(review): the logger name does not match this module's path
# (ai_agent/utils/full_processing.py) -- confirm it is intentional.
log = logging.getLogger("ai_agent.catalog.sync")

# Vocabulary prefixes stripped from KEYS. They are tried in this order and the
# first match wins, so keep more specific prefixes ahead of more general ones.
PREFIXES: Tuple[str, ...] = (
    # schema.org
    "http://schema.org/",
    "https://schema.org/",
    # imaging-plaza
    "https://imaging-plaza.epfl.ch/ontology#",
    "http://imaging-plaza.epfl.ch/ontology#",
    # OKN
    "https://w3id.org/okn/o/sd#",
    "http://w3id.org/okn/o/sd#",
    # SPHN / biomedit
    "https://biomedit.ch/rdf/sphn-schema/sphn#",
    "http://biomedit.ch/rdf/sphn-schema/sphn#",
)

# Accepted spellings of the SoftwareSourceCode type (http/https/compact/bare).
SOFTWARE_TYPES: Set[str] = {
    "http://schema.org/SoftwareSourceCode",
    "https://schema.org/SoftwareSourceCode",
    "schema:SoftwareSourceCode",
    "SoftwareSourceCode",
}

# Known XML Schema datatypes for safe casting, in full-IRI and "xsd:" form.
_XSD_NS = "http://www.w3.org/2001/XMLSchema#"
XSD_BOOLEAN = {_XSD_NS + "boolean", "xsd:boolean"}
XSD_INTEGERS = {
    _XSD_NS + "integer",
    _XSD_NS + "long",
    _XSD_NS + "int",
    "xsd:integer",
    "xsd:int",
    "xsd:long",
}
XSD_FLOATS = {
    _XSD_NS + "float",
    _XSD_NS + "double",
    _XSD_NS + "decimal",
    "xsd:float",
    "xsd:double",
    "xsd:decimal",
}
# ---------------- utilities ----------------
def deep_merge(a: Any, b: Any) -> Any:
    """
    Combine two JSON values without losing information.

    - identical / equal values        -> `a` is returned unchanged
    - dict + dict                     -> key-wise merge, shared keys merged recursively
    - list + list                     -> items of `a`, then items of `b` not already present
    - list + non-list (either order)  -> the non-list is wrapped in a list and merged
    - anything else                   -> two-item list [a, b]

    Neither input is mutated; a new container is built when a merge happens.
    """
    if a is b or a == b:
        return a

    if isinstance(a, dict) and isinstance(b, dict):
        merged = dict(a)
        for key, incoming in b.items():
            merged[key] = deep_merge(merged[key], incoming) if key in merged else incoming
        return merged

    a_is_list = isinstance(a, list)
    b_is_list = isinstance(b, list)

    if a_is_list and b_is_list:
        union = list(a)
        for item in b:
            # Membership is checked against the growing result, so duplicates
            # inside `b` itself are collapsed as well.
            if item in union:
                continue
            union.append(item)
        return union

    # Exactly one side is a list: promote the other side and retry.
    if a_is_list:
        return deep_merge(a, [b])
    if b_is_list:
        return deep_merge([a], b)

    # Two differing non-list values (scalar/scalar or scalar/dict).
    if a != b:
        return [a, b]
    return a
def normalize_types(t: Any) -> List[str]:
    """
    Coerce a JSON-LD @type value into a list of strings.

    None gives an empty list, a list is stringified item by item, and any
    other value becomes a one-element list holding its string form.
    """
    if t is None:
        return []
    candidates = t if isinstance(t, list) else [t]
    return [str(entry) for entry in candidates]
def is_software(node: Dict[str, Any]) -> bool:
    """
    Tell whether a node is typed as SoftwareSourceCode.

    A node qualifies when one of its @type entries is listed in SOFTWARE_TYPES,
    or when any entry merely ends with "SoftwareSourceCode" (which also covers
    IRIs from namespaces not listed explicitly).
    """
    declared = set(normalize_types(node.get("@type")))
    if not SOFTWARE_TYPES.isdisjoint(declared):
        return True
    return any(name.endswith("SoftwareSourceCode") for name in declared)
def strip_key_prefix(key: Any) -> Any:
    """
    Remove the first matching entry of PREFIXES from the front of a string key.

    Non-string keys and keys that match no prefix are returned unchanged.
    """
    if isinstance(key, str):
        # PREFIXES is scanned in declaration order; the first hit wins.
        matched = next((prefix for prefix in PREFIXES if key.startswith(prefix)), None)
        if matched is not None:
            return key[len(matched):]
    return key
def cast_typed_value(value: Any, vtype: str) -> Any:
    """
    Cast a JSON-LD typed literal to a Python scalar when it is safe to do so.

    Args:
        value: The literal's "@value". Non-string values (already numeric or
            boolean) are returned untouched.
        vtype: The literal's "@type" datatype, as a full XSD IRI or in
            "xsd:..." compact form.

    Returns:
        - bool for XSD booleans whose text is true/false/1/0 (case-insensitive,
          surrounding whitespace ignored);
        - int for XSD integer types when the text parses as an integer;
        - float for XSD float/double/decimal when the text parses to a FINITE
          number;
        - otherwise the original value, unchanged (dates, unknown datatypes,
          unparsable text, malformed datatype).
    """
    import math

    if not isinstance(value, str):
        # value might already be numeric/bool
        return value
    if not isinstance(vtype, str):
        # A malformed datatype (e.g. a list) is unhashable and would raise on
        # the set lookups below; treat it like an unknown type instead.
        return value

    if vtype in XSD_BOOLEAN:
        token = value.strip().lower()
        if token in ("true", "1"):
            return True
        if token in ("false", "0"):
            return False
        return value

    if vtype in XSD_INTEGERS:
        try:
            return int(value)
        except ValueError:
            return value

    if vtype in XSD_FLOATS:
        try:
            number = float(value)
        except ValueError:
            return value
        # "NaN", "INF" or overflowing literals parse to non-finite floats,
        # which json.dumps would emit as NaN/Infinity -- tokens that are not
        # valid JSON. Keep the original text so the JSONL output stays valid.
        if not math.isfinite(number):
            return value
        return number

    # For dates and unknown types, leave as string
    return value
def unwrap_value_object(obj: Dict[str, Any]) -> Any:
    """
    Reduce a JSON-LD value object to its plain value.

    Examples:
        {"@value": "10", "@type": "xsd:integer"}   -> 10
        {"@value": "true", "@type": "xsd:boolean"} -> True
        {"@value": "2023-01-01"}                   -> "2023-01-01"

    When a truthy "@type" is present the value is passed through
    cast_typed_value; otherwise "@value" is returned as is (None if missing).
    """
    raw = obj.get("@value")
    datatype = obj.get("@type")
    return cast_typed_value(raw, datatype) if datatype else raw
def strip_jsonld_control(obj: Any) -> Any:
    """
    Recursively turn JSON-LD structures into plain JSON.

    Rules, applied at every depth:
    - A dict containing "@value" is a value object: it is unwrapped to its
      (possibly cast) value BEFORE anything else, so the value is never lost.
    - "@id" is renamed to "id".
    - "@type" is renamed to "type" with each IRI shortened to its last
      segment: a list when there are several types, a single string when
      there is exactly one, an empty list when there are none.
    - Every other key starting with "@" (@context, @language, ...) is dropped.
    - Lists are processed element-wise; other values pass through unchanged.
    """
    if isinstance(obj, list):
        return [strip_jsonld_control(element) for element in obj]
    if not isinstance(obj, dict):
        return obj

    # Value object handling must come first.
    if "@value" in obj:
        return strip_jsonld_control(unwrap_value_object(obj))

    cleaned: Dict[str, Any] = {}
    for key, value in obj.items():
        if key == "@id":
            cleaned["id"] = strip_jsonld_control(value)
            continue
        if key == "@type":
            names = normalize_types(value)
            if len(names) > 1:
                cleaned["type"] = [localize_iri(name) for name in names]
            elif names:
                cleaned["type"] = localize_iri(names[0])
            else:
                cleaned["type"] = names
            continue
        if isinstance(key, str) and key.startswith("@"):
            # drop @context, @language, etc.
            continue
        cleaned[key] = strip_jsonld_control(value)
    return cleaned
def localize_iri(s: Any) -> Any:
    """
    Shorten an IRI to its local name.

    The part after the last '#' is used when the string contains one;
    otherwise trailing slashes are removed and the part after the last '/'
    is used. Strings with neither separator, and non-strings, are returned
    unchanged.
    """
    if not isinstance(s, str):
        return s
    if "#" in s:
        _, _, fragment = s.rpartition("#")
        return fragment
    if "/" in s:
        _, _, tail = s.rstrip("/").rpartition("/")
        return tail
    return s
def strip_prefixes_and_merge(obj: Any) -> Any:
    """
    Shorten vocabulary-prefixed dict keys at every depth.

    Keys listed in EXCLUDE_KEYS (compared after stripping) are omitted. When
    two different original keys collapse to the same short key, their values
    are combined with deep_merge instead of one overwriting the other.
    Intended to run AFTER strip_jsonld_control, so no '@...' keys remain.
    """
    if isinstance(obj, list):
        return [strip_prefixes_and_merge(element) for element in obj]
    if not isinstance(obj, dict):
        return obj

    result: Dict[str, Any] = {}
    for raw_key, raw_value in obj.items():
        short_key = strip_key_prefix(raw_key)
        if short_key in EXCLUDE_KEYS:
            continue
        processed = strip_prefixes_and_merge(raw_value)
        if short_key in result:
            processed = deep_merge(result[short_key], processed)
        result[short_key] = processed
    return result
# ------------- core pipeline -------------
def build_index(graph_nodes: Iterable[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """
    Map each string @id to its node.

    The first occurrence of an @id is stored as a shallow copy; later nodes
    with the same @id are deep-merged into the stored entry. Entries that are
    not dicts, or whose @id is missing or not a string, are skipped (such
    nodes are only reachable inline from their parents).
    """
    index: Dict[str, Dict[str, Any]] = {}
    for entry in graph_nodes:
        if not isinstance(entry, dict):
            continue
        key = entry.get("@id")
        if not isinstance(key, str):
            continue
        index[key] = deep_merge(index[key], entry) if key in index else dict(entry)
    return index
def deref(node: Any, idx: Dict[str, Dict[str, Any]], seen: Set[str] | None = None) -> Any:
    """
    Expand '@id' references into full nodes, recursively.

    A dict whose '@id' is a key of `idx` is replaced by the indexed node
    deep-merged with the dict's own inline properties, and every value of the
    result is expanded in turn. Dicts without a resolvable '@id' and lists are
    walked element by element; scalars are returned unchanged.

    `seen` holds the ids already expanded on the current path. Meeting one of
    them again yields the bare reference {"@id": ...} instead of recursing,
    which breaks cycles. Each child receives its own copy of `seen`, so
    sibling branches do not block each other; a set passed in by the caller
    does get the current node's id added to it.
    """
    def branch_seen() -> Set[str] | None:
        # Fresh copy per child so tracking stays local to one path.
        return set(seen) if seen is not None else None

    if isinstance(node, list):
        return [deref(element, idx, seen=branch_seen()) for element in node]
    if not isinstance(node, dict):
        return node

    ident = node.get("@id")
    if isinstance(ident, str) and ident in idx:
        if seen is None:
            seen = set()
        if ident in seen:
            return {"@id": ident}
        seen.add(ident)
        source = deep_merge(idx[ident], dict(node))
    else:
        source = node

    return {key: deref(value, idx, seen=branch_seen()) for key, value in source.items()}
def extract_graph(doc: Any) -> List[Dict[str, Any]]:
    """
    Return the list of node dicts from a JSON-LD document regardless of shape.

    Supported shapes:
    - {"@graph": [...]}   -> the dict entries of the list
    - {"@graph": {...}}   -> that single node, wrapped in a list
    - [...]               -> the dict entries of the top-level list
    - {...}               -> the document itself as the only node

    Anything else (including a null or scalar "@graph") yields an empty list.
    Non-dict entries inside lists are skipped.
    """
    if isinstance(doc, dict) and "@graph" in doc:
        graph = doc["@graph"]
        if isinstance(graph, dict):
            # "@graph" may hold one node object directly; iterating it would
            # walk its keys and silently discard the node.
            return [graph]
        if isinstance(graph, list):
            return [entry for entry in graph if isinstance(entry, dict)]
        # null / scalar "@graph": nothing usable, and nothing to iterate.
        return []
    if isinstance(doc, list):
        return [entry for entry in doc if isinstance(entry, dict)]
    if isinstance(doc, dict):
        return [doc]
    return []
def drop_empties(obj: Any) -> Any:
    """
    Recursively prune empty content from dicts and lists.

    A value counts as empty when it is None, '', [] or {} -- checked AFTER the
    value itself has been pruned, so containers that become empty disappear
    too. Falsy but meaningful values such as 0 and False are kept. A scalar
    passed at the top level is returned unchanged.
    """
    if isinstance(obj, dict):
        pruned_items = ((key, drop_empties(value)) for key, value in obj.items())
        return {key: pruned for key, pruned in pruned_items if not _is_empty(pruned)}
    if isinstance(obj, list):
        return [pruned for pruned in map(drop_empties, obj) if not _is_empty(pruned)]
    return obj


def _is_empty(v: Any) -> bool:
    """Return True for None, the empty string, an empty dict or an empty list."""
    if v is None:
        return True
    if isinstance(v, str):
        return v == ""
    if isinstance(v, (dict, list)):
        return len(v) == 0
    return False
def full_processing(input_file, output_file) -> None:
    """
    Convert a JSON-LD file into a JSONL file with one record per software node.

    Steps:
    1) read and parse `input_file` (UTF-8 JSON);
    2) index the graph nodes by @id, deep-merging duplicates;
    3) select the ids whose node is typed SoftwareSourceCode;
    4) for each selected node: dereference -> unwrap values / strip control
       keys -> strip key prefixes -> drop empties, then write it as one JSON
       line to `output_file` (UTF-8, non-ASCII characters kept as is).

    `output_file` is overwritten. The number of written records is logged.
    """
    document = json.loads(Path(input_file).read_text(encoding="utf-8"))
    index = build_index(extract_graph(document))

    # Only SoftwareSourceCode nodes become output rows.
    root_ids: List[str] = [node_id for node_id, node in index.items() if is_software(node)]

    with Path(output_file).open("w", encoding="utf-8") as sink:
        for root_id in root_ids:
            record = deref(index[root_id], index)
            # Order matters: value objects are unwrapped while '@...' keys are
            # removed/renamed, and only then are key prefixes stripped.
            record = strip_jsonld_control(record)
            record = strip_prefixes_and_merge(record)
            record = drop_empties(record)
            sink.write(json.dumps(record, ensure_ascii=False) + "\n")

    log.info("Wrote %d software records to %s", len(root_ids), output_file)
# Script entry point: when executed directly (not imported), run the pipeline
# with the module-level INPUT_FILE / OUTPUT_FILE paths.
if __name__ == "__main__":
    full_processing(INPUT_FILE, OUTPUT_FILE)