Spaces:
Runtime error
Runtime error
"""Manifest-to-topology compilation helpers for root snapshot hydration.

These helpers turn a manifest's declared company world into the canonical
topology fields the mutator, validators, and runtime expect to reason about.
They intentionally keep "real login users" separate from trust-only narrative
principals so the trust graph can be compiled without silently creating extra
accounts in rendered services.
"""
| from __future__ import annotations | |
| from copy import deepcopy | |
| from pathlib import PurePosixPath | |
| import re | |
| from typing import Any | |
def build_host_catalog(manifest: dict[str, Any]) -> dict[str, dict[str, Any]]:
    """Return the manifest-defined host catalog keyed by host name.

    Reads ``manifest["topology"]["hosts"]`` and keeps one entry per non-empty,
    stripped host name; a later entry with the same name replaces an earlier
    one. Entries that are not dicts, or that have no usable name, are skipped.

    Explicit nulls are treated as "not provided": a null (or non-dict)
    ``topology`` and a null ``hosts`` both yield an empty catalog, and null
    text fields become ``""`` rather than the literal string ``"None"``.

    Args:
        manifest: Parsed manifest mapping.

    Returns:
        Mapping of host name to a dict with the keys ``zone``, ``services``,
        ``connects_to``, ``purpose``, ``hostname``, ``os`` and ``exposure``.
        Container values are deep copies, so the manifest is never aliased.
    """

    def text(value: object) -> str:
        # str(None) is "None", which would otherwise leak into names/zones.
        return "" if value is None else str(value)

    topology = manifest.get("topology")
    raw_hosts = topology.get("hosts") if isinstance(topology, dict) else None
    if not isinstance(raw_hosts, (list, tuple)):
        return {}

    catalog: dict[str, dict[str, Any]] = {}
    for raw in raw_hosts:
        if not isinstance(raw, dict):
            continue
        name = text(raw.get("name")).strip()
        if not name:
            continue
        catalog[name] = {
            "zone": text(raw.get("zone")),
            "services": deepcopy(raw.get("services", [])),
            "connects_to": deepcopy(raw.get("connects_to", [])),
            "purpose": text(raw.get("purpose")),
            "hostname": text(raw.get("hostname")),
            "os": text(raw.get("os")),
            "exposure": deepcopy(raw.get("exposure", {})),
        }
    return catalog
def build_principal_catalog(
    manifest: dict[str, Any],
    existing: dict[str, Any] | None = None,
) -> tuple[dict[str, dict[str, Any]], list[str]]:
    """Return a canonical principal catalog plus normalized trust-only names.

    The catalog is built in three passes:

    1. Dict-valued entries of ``existing`` are deep-copied in under their
       stripped name.
    2. Every manifest user overwrites the identity fields of its entry and is
       marked as a login account.
    3. Every endpoint of a trust relationship (``source``/``from`` and
       ``target``/``to``) gets an entry; endpoints that are not login accounts
       are reported as trust-only and only have missing fields defaulted.

    Returns:
        ``(catalog, trust_only)`` where ``trust_only`` is sorted.
    """
    principals: dict[str, dict[str, Any]] = {}
    if isinstance(existing, dict):
        for key, entry in existing.items():
            label = str(key).strip()
            if label and isinstance(entry, dict):
                principals[label] = deepcopy(entry)

    # Manifest users always win over whatever the existing catalog said.
    for user in manifest.get("users", []):
        if not isinstance(user, dict):
            continue
        username = str(user.get("username", "")).strip()
        if not username:
            continue
        account = principals.setdefault(username, {})
        account["username"] = username
        account["kind"] = "user"
        account["is_login_account"] = True
        account["hosts"] = deepcopy(user.get("hosts", []))
        for field in ("department", "role", "email", "full_name"):
            account[field] = str(user.get(field, ""))

    narrative_only: set[str] = set()
    for relation in manifest.get("trust_relationships", []):
        if not isinstance(relation, dict):
            continue
        endpoints = (
            str(relation.get("source") or relation.get("from") or "").strip(),
            str(relation.get("target") or relation.get("to") or "").strip(),
        )
        for endpoint in endpoints:
            if not endpoint:
                continue
            entry = principals.setdefault(endpoint, {})
            if not entry.get("is_login_account", False):
                narrative_only.add(endpoint)
            # Built per endpoint so the "hosts" list is never shared.
            defaults: dict[str, Any] = {
                "username": endpoint,
                "kind": "trust_principal",
                "is_login_account": False,
                "hosts": [],
                "department": "",
                "role": "",
                "email": "",
                "full_name": "",
            }
            for field, fallback in defaults.items():
                entry.setdefault(field, fallback)

    return principals, sorted(narrative_only)
def compile_manifest_topology(
    manifest: dict[str, Any],
    topology: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Compile manifest state into graph-friendly topology fields.

    Works on a deep copy of ``topology`` (or a fresh dict when ``topology`` is
    not a dict). Scalar fields are filled with ``setdefault`` so values that
    are already present, such as builder-generated passwords or
    payload-specific knobs, survive root hydration. Catalog and edge fields
    are recomputed by merging the existing values with the manifest.

    Args:
        manifest: Parsed manifest mapping.
        topology: Previously compiled topology to preserve, if any.

    Returns:
        The compiled topology dict, including ``runtime_contract`` and the
        flattened runtime fields derived from it.
    """
    result: dict[str, Any] = deepcopy(topology) if isinstance(topology, dict) else {}

    company = manifest.get("company")
    if not isinstance(company, dict):
        company = {}
    declared = manifest.get("topology", {})

    result.setdefault("tier", int(manifest.get("tier", result.get("tier", 1)) or 1))
    result.setdefault("domain", company.get("domain", "acmecorp.local"))
    result.setdefault("org_name", company.get("name", "AcmeCorp"))
    result.setdefault("manifest_name", manifest.get("name", ""))
    result.setdefault("difficulty", deepcopy(manifest.get("difficulty", {})))
    result.setdefault("networks", deepcopy(declared.get("networks", [])))
    result.setdefault("firewall_rules", deepcopy(declared.get("firewall_rules", [])))

    # Host-derived views: each merges what is already compiled with the manifest.
    catalog = build_host_catalog(manifest)
    result["host_catalog"] = catalog
    result["hosts"] = _merge_hosts(result.get("hosts"), catalog)
    result["zones"] = _merge_zones(result.get("zones"), catalog)
    result["users"] = _merge_users(result.get("users"), manifest)
    result["host_details"] = _merge_host_details(result.get("host_details"), catalog)
    result["dependency_edges"] = _merge_dependency_edges(
        result.get("dependency_edges"),
        catalog,
    )

    prior_principals = result.get("principal_catalog")
    principals, trust_only = build_principal_catalog(
        manifest,
        existing=prior_principals if isinstance(prior_principals, dict) else None,
    )
    result["principal_catalog"] = principals
    result["trust_edges"] = _merge_trust_edges(result.get("trust_edges"), manifest)

    notes: list[str] = []
    if trust_only:
        notes.append(
            "Normalized trust principals not present in manifest users into "
            "principal_catalog only"
        )
    result["manifest_normalization"] = {
        "trust_only_principals": trust_only,
        "notes": notes,
    }

    contract = runtime_contract_from_topology(result, manifest=manifest)
    result["runtime_contract"] = contract

    # (topology field, runtime-contract key); "db_pass" is an alias of the
    # contract's "db_password".
    flattened = (
        ("web_host", "web_host"),
        ("db_host", "db_host"),
        ("ldap_host", "ldap_host"),
        ("web_doc_root", "web_doc_root"),
        ("web_config_path", "web_config_path"),
        ("db_name", "db_name"),
        ("db_user", "db_user"),
        ("db_pass", "db_password"),
        ("db_password", "db_password"),
        ("ldap_bind_dn", "ldap_bind_dn"),
        ("ldap_bind_pw", "ldap_bind_pw"),
        ("ldap_search_base_dn", "ldap_search_base_dn"),
        ("credential_reuse_user", "credential_reuse_user"),
        ("credential_reuse_host", "credential_reuse_host"),
        ("credential_reuse_password", "credential_reuse_password"),
    )
    for field, contract_key in flattened:
        result.setdefault(field, contract[contract_key])

    accounts = result.get("service_accounts")
    if not isinstance(accounts, dict):
        accounts = {}
    webapp = accounts.get("webapp")
    if not isinstance(webapp, dict):
        webapp = {}
    webapp_defaults = (
        ("username", "db_user"),
        ("password", "db_password"),
        ("ldap_bind_dn", "ldap_bind_dn"),
        ("ldap_bind_pw", "ldap_bind_pw"),
    )
    for field, contract_key in webapp_defaults:
        webapp.setdefault(field, contract[contract_key])
    accounts["webapp"] = webapp
    result["service_accounts"] = accounts

    return result
def runtime_contract_from_topology(
    topology: dict[str, Any] | None,
    *,
    manifest: dict[str, Any] | None = None,
) -> dict[str, str]:
    """Derive runtime service/account semantics from compiled topology state.

    Each value is resolved as the first non-blank candidate, generally in the
    order: the topology's existing ``runtime_contract`` entry, the top-level
    topology field of the same name, any field-specific fallbacks, and finally
    a default. The input topology is not modified.

    Args:
        topology: Compiled topology; anything that is not a dict is treated
            as empty.
        manifest: Optional manifest, consulted for the company domain and for
            a database name hinted at by ``data_inventory``.

    Returns:
        A dict of strings with the host, web path, database, LDAP and
        credential-reuse settings.
    """
    source = topology if isinstance(topology, dict) else {}
    runtime = deepcopy(source.get("runtime_contract", {}))
    if not isinstance(runtime, dict):
        runtime = {}

    def resolve(key: str, *fallbacks: object, default: str = "") -> str:
        # Contract value first, then the flat topology field, then extras.
        return _coerce_text(
            runtime.get(key),
            source.get(key),
            *fallbacks,
            default=default,
        )

    def mapping_field(key: str) -> dict[str, Any]:
        value = source.get(key, {})
        return value if isinstance(value, dict) else {}

    domain = resolve(
        "domain",
        _manifest_company_domain(manifest),
        default="corp.local",
    )

    host_maps = [mapping_field("host_catalog"), mapping_field("host_details")]
    hosts = _normalized_hosts(source.get("hosts", []))

    def choose_host(
        key: str,
        preferred_names: tuple[str, ...],
        service_markers: tuple[str, ...],
        fallback: str,
    ) -> str:
        return _select_core_host(
            explicit=resolve(key),
            hosts=hosts,
            host_maps=host_maps,
            preferred_names=preferred_names,
            service_markers=service_markers,
            fallback=fallback,
        )

    web_host = choose_host(
        "web_host",
        ("web", "portal", "frontend"),
        ("nginx", "apache", "http", "php", "gunicorn", "uvicorn"),
        "web",
    )
    db_host = choose_host(
        "db_host",
        ("db", "database", "mysql"),
        ("mysql", "mariadb", "postgres", "postgresql", "database"),
        "db",
    )
    ldap_host = choose_host(
        "ldap_host",
        ("ldap", "directory", "idp"),
        ("ldap", "openldap"),
        "ldap",
    )

    db_name = resolve(
        "db_name",
        _infer_manifest_db_name(manifest),
        default="referral_db",
    )

    webapp_account = mapping_field("service_accounts").get("webapp", {})
    if not isinstance(webapp_account, dict):
        webapp_account = {}

    db_user = resolve(
        "db_user",
        source.get("db_app_user"),
        webapp_account.get("username"),
    )
    # Both contract spellings are checked before any topology-level value.
    db_password = _coerce_text(
        runtime.get("db_password"),
        runtime.get("db_pass"),
        source.get("db_password"),
        source.get("db_pass"),
        source.get("db_app_password"),
        webapp_account.get("password"),
    )

    # Fall back to a user on the DB host, then to synthesized credentials.
    picked_user, picked_password = _pick_db_account(source.get("users", []), db_host)
    db_user = db_user or picked_user or f"svc_{_slug_token(db_host or 'db')}"
    db_password = (
        db_password
        or picked_password
        or _predictable_service_password(db_user, domain)
    )

    web_doc_root = resolve("web_doc_root", default="/var/www/portal")
    if not web_doc_root.startswith("/"):
        web_doc_root = f"/{web_doc_root}"

    # By default the config file sits next to (not inside) the document root.
    default_config_path = (PurePosixPath(web_doc_root).parent / "config.php").as_posix()
    if not default_config_path.startswith("/"):
        default_config_path = "/var/www/config.php"
    web_config_path = resolve("web_config_path", default=default_config_path)
    if not web_config_path.startswith("/"):
        web_config_path = f"/{web_config_path}"

    ldap_base_dn = _domain_to_ldap_dn(domain)
    ldap_search_base_dn = resolve("ldap_search_base_dn", default=ldap_base_dn)
    ldap_bind_dn = resolve(
        "ldap_bind_dn",
        webapp_account.get("ldap_bind_dn"),
        default=f"cn={db_user},{ldap_base_dn}",
    )
    ldap_bind_pw = resolve(
        "ldap_bind_pw",
        webapp_account.get("ldap_bind_pw"),
        default=db_password,
    )

    credential_reuse_user = resolve("credential_reuse_user", default=db_user)
    credential_reuse_host = resolve("credential_reuse_host", default=db_host)
    credential_reuse_password = resolve(
        "credential_reuse_password",
        default=ldap_bind_pw,
    )

    return {
        "domain": domain,
        "web_host": web_host,
        "db_host": db_host,
        "ldap_host": ldap_host,
        "web_doc_root": web_doc_root,
        "web_config_path": web_config_path,
        "db_name": db_name,
        "db_user": db_user,
        "db_password": db_password,
        "ldap_bind_dn": ldap_bind_dn,
        "ldap_bind_pw": ldap_bind_pw,
        "ldap_search_base_dn": ldap_search_base_dn,
        "credential_reuse_user": credential_reuse_user,
        "credential_reuse_host": credential_reuse_host,
        "credential_reuse_password": credential_reuse_password,
    }
| def _manifest_company_domain(manifest: dict[str, Any] | None) -> str: | |
| if not isinstance(manifest, dict): | |
| return "" | |
| company = manifest.get("company", {}) | |
| if not isinstance(company, dict): | |
| return "" | |
| return str(company.get("domain", "")).strip() | |
| def _normalized_hosts(raw_hosts: object) -> list[str]: | |
| hosts: list[str] = [] | |
| if not isinstance(raw_hosts, list): | |
| return hosts | |
| for raw in raw_hosts: | |
| if isinstance(raw, dict): | |
| name = str(raw.get("name", "")).strip() | |
| else: | |
| name = str(raw).strip() | |
| if name and name not in hosts: | |
| hosts.append(name) | |
| return hosts | |
| def _select_core_host( | |
| *, | |
| explicit: str, | |
| hosts: list[str], | |
| host_maps: list[dict[str, Any]], | |
| preferred_names: tuple[str, ...], | |
| service_markers: tuple[str, ...], | |
| fallback: str, | |
| ) -> str: | |
| if explicit and (not hosts or explicit in hosts): | |
| return explicit | |
| for name in preferred_names: | |
| if name in hosts: | |
| return name | |
| for host in hosts: | |
| services = _host_services(host, host_maps) | |
| if not services: | |
| continue | |
| if any( | |
| marker in service | |
| for service in services | |
| for marker in service_markers | |
| ): | |
| return host | |
| for host in hosts: | |
| lowered = host.lower() | |
| if any(name in lowered for name in preferred_names): | |
| return host | |
| if hosts: | |
| return hosts[0] | |
| return fallback | |
| def _host_services(host: str, host_maps: list[dict[str, Any]]) -> list[str]: | |
| services: list[str] = [] | |
| for host_map in host_maps: | |
| detail = host_map.get(host, {}) | |
| if not isinstance(detail, dict): | |
| continue | |
| raw_services = detail.get("services", []) | |
| if not isinstance(raw_services, list): | |
| continue | |
| for raw_service in raw_services: | |
| service = str(raw_service).strip().lower() | |
| if service and service not in services: | |
| services.append(service) | |
| return services | |
| def _pick_db_account(raw_users: object, db_host: str) -> tuple[str, str]: | |
| if not isinstance(raw_users, list): | |
| return "", "" | |
| for raw in raw_users: | |
| if not isinstance(raw, dict): | |
| continue | |
| username = str(raw.get("username", "")).strip() | |
| if not username: | |
| continue | |
| hosts = raw.get("hosts", []) | |
| if not isinstance(hosts, list) or db_host not in hosts: | |
| continue | |
| password = str(raw.get("password", "")).strip() | |
| if not _is_privileged_account(raw): | |
| return username, password | |
| return "", "" | |
| def _is_privileged_account(user: dict[str, Any]) -> bool: | |
| groups = user.get("groups", []) | |
| if isinstance(groups, list): | |
| lowered = {str(group).strip().lower() for group in groups} | |
| if {"admin", "admins"} & lowered: | |
| return True | |
| role = str(user.get("role", "")).lower() | |
| return "admin" in role | |
| def _infer_manifest_db_name(manifest: dict[str, Any] | None) -> str: | |
| if not isinstance(manifest, dict): | |
| return "" | |
| for raw in manifest.get("data_inventory", []): | |
| if not isinstance(raw, dict): | |
| continue | |
| location = str(raw.get("location", "")).strip() | |
| lowered = location.lower() | |
| for prefix in ("mysql:", "db:"): | |
| if not lowered.startswith(prefix): | |
| continue | |
| raw_name = location[len(prefix):].split(".", 1)[0].strip() | |
| if raw_name: | |
| return raw_name | |
| return "" | |
| def _domain_to_ldap_dn(domain: str) -> str: | |
| parts = [part for part in domain.split(".") if part] | |
| if not parts: | |
| return "dc=corp,dc=local" | |
| return ",".join(f"dc={part}" for part in parts) | |
def _predictable_service_password(username: str, domain: str) -> str:
    """Build a deterministic password from a username and domain.

    The result is ``<Token>!<number>``: the slugged username with underscores
    removed and capitalized, followed by a number in ``200..899`` derived from
    the code-point sum of ``"<username>:<domain>"``.
    """
    compact = _slug_token(username).replace("_", "") or "service"
    checksum = sum(map(ord, f"{username}:{domain}"))
    return f"{compact.capitalize()}!{200 + checksum % 700}"
| def _slug_token(value: str) -> str: | |
| token = re.sub(r"[^a-z0-9]+", "_", value.lower()).strip("_") | |
| return token or "service" | |
| def _coerce_text(*values: object, default: str = "") -> str: | |
| for value in values: | |
| if value is None: | |
| continue | |
| text = str(value).strip() | |
| if text: | |
| return text | |
| return default | |
| def _merge_hosts( | |
| raw_hosts: object, | |
| host_catalog: dict[str, dict[str, Any]], | |
| ) -> list[str]: | |
| hosts: list[str] = [] | |
| seen: set[str] = set() | |
| if isinstance(raw_hosts, list): | |
| for raw in raw_hosts: | |
| if isinstance(raw, dict): | |
| name = str(raw.get("name", "")).strip() | |
| else: | |
| name = str(raw).strip() | |
| if not name or name in seen: | |
| continue | |
| seen.add(name) | |
| hosts.append(name) | |
| for host in host_catalog: | |
| if host in seen: | |
| continue | |
| seen.add(host) | |
| hosts.append(host) | |
| return hosts | |
| def _merge_zones( | |
| raw_zones: object, | |
| host_catalog: dict[str, dict[str, Any]], | |
| ) -> dict[str, list[str]]: | |
| zones: dict[str, list[str]] = {} | |
| if isinstance(raw_zones, dict): | |
| for zone, raw_hosts in raw_zones.items(): | |
| zone_name = str(zone).strip() | |
| if not zone_name: | |
| continue | |
| zone_hosts: list[str] = [] | |
| if isinstance(raw_hosts, list): | |
| for raw_host in raw_hosts: | |
| host = str(raw_host).strip() | |
| if host and host not in zone_hosts: | |
| zone_hosts.append(host) | |
| zones[zone_name] = zone_hosts | |
| for host, raw_catalog in host_catalog.items(): | |
| zone = str(raw_catalog.get("zone", "")).strip() or "default" | |
| zone_hosts = zones.setdefault(zone, []) | |
| if host not in zone_hosts: | |
| zone_hosts.append(host) | |
| return zones | |
| def _merge_users(raw_users: object, manifest: dict[str, Any]) -> list[dict[str, Any]]: | |
| existing: dict[str, dict[str, Any]] = {} | |
| extras: list[dict[str, Any]] = [] | |
| if isinstance(raw_users, list): | |
| for raw in raw_users: | |
| if not isinstance(raw, dict): | |
| continue | |
| username = str(raw.get("username", "")).strip() | |
| if not username: | |
| continue | |
| existing[username] = deepcopy(raw) | |
| merged: list[dict[str, Any]] = [] | |
| seen: set[str] = set() | |
| for raw in manifest.get("users", []): | |
| if not isinstance(raw, dict): | |
| continue | |
| username = str(raw.get("username", "")).strip() | |
| if not username: | |
| continue | |
| record = existing.pop(username, {}) | |
| record.setdefault("username", username) | |
| record.setdefault("password", "") | |
| record.setdefault("groups", []) | |
| record.setdefault("hosts", deepcopy(raw.get("hosts", []))) | |
| record.setdefault("email", str(raw.get("email", ""))) | |
| record.setdefault("full_name", str(raw.get("full_name", ""))) | |
| record.setdefault("department", str(raw.get("department", ""))) | |
| record.setdefault("role", str(raw.get("role", ""))) | |
| merged.append(record) | |
| seen.add(username) | |
| for username, record in existing.items(): | |
| if username in seen: | |
| continue | |
| extras.append(record) | |
| merged.extend(extras) | |
| return merged | |
| def _merge_host_details( | |
| raw_details: object, | |
| host_catalog: dict[str, dict[str, Any]], | |
| ) -> dict[str, dict[str, Any]]: | |
| host_details: dict[str, dict[str, Any]] = {} | |
| if isinstance(raw_details, dict): | |
| for host, raw_detail in raw_details.items(): | |
| host_name = str(host).strip() | |
| if not host_name or not isinstance(raw_detail, dict): | |
| continue | |
| host_details[host_name] = deepcopy(raw_detail) | |
| for host, raw_catalog in host_catalog.items(): | |
| detail = host_details.setdefault(host, {}) | |
| detail.setdefault("zone", str(raw_catalog.get("zone", ""))) | |
| detail.setdefault("services", deepcopy(raw_catalog.get("services", []))) | |
| detail.setdefault("connects_to", deepcopy(raw_catalog.get("connects_to", []))) | |
| detail.setdefault("purpose", str(raw_catalog.get("purpose", ""))) | |
| detail.setdefault("hostname", str(raw_catalog.get("hostname", ""))) | |
| detail.setdefault("os", str(raw_catalog.get("os", ""))) | |
| detail.setdefault("exposure", deepcopy(raw_catalog.get("exposure", {}))) | |
| return host_details | |
| def _merge_dependency_edges( | |
| raw_edges: object, | |
| host_catalog: dict[str, dict[str, Any]], | |
| ) -> list[dict[str, str]]: | |
| edges: list[dict[str, str]] = [] | |
| seen: set[tuple[str, str]] = set() | |
| if isinstance(raw_edges, list): | |
| for raw in raw_edges: | |
| if not isinstance(raw, dict): | |
| continue | |
| source = str(raw.get("source", "")).strip() | |
| target = str(raw.get("target", "")).strip() | |
| if not source or not target or (source, target) in seen: | |
| continue | |
| edges.append({"source": source, "target": target}) | |
| seen.add((source, target)) | |
| for source, raw_catalog in host_catalog.items(): | |
| raw_targets = raw_catalog.get("connects_to", []) | |
| if not isinstance(raw_targets, list): | |
| continue | |
| for raw_target in raw_targets: | |
| target = str(raw_target).strip() | |
| if not target or (source, target) in seen: | |
| continue | |
| edges.append({"source": source, "target": target}) | |
| seen.add((source, target)) | |
| return edges | |
| def _merge_trust_edges( | |
| raw_edges: object, | |
| manifest: dict[str, Any], | |
| ) -> list[dict[str, str]]: | |
| edges: list[dict[str, str]] = [] | |
| seen: set[tuple[str, str, str]] = set() | |
| if isinstance(raw_edges, list): | |
| for raw in raw_edges: | |
| if not isinstance(raw, dict): | |
| continue | |
| source = str(raw.get("source", "")).strip() | |
| target = str(raw.get("target", "")).strip() | |
| edge_type = str(raw.get("type", "")).strip() | |
| if not source or not target or (source, target, edge_type) in seen: | |
| continue | |
| edges.append( | |
| { | |
| "source": source, | |
| "target": target, | |
| "type": edge_type, | |
| "context": str(raw.get("context", "")), | |
| } | |
| ) | |
| seen.add((source, target, edge_type)) | |
| for raw in manifest.get("trust_relationships", []): | |
| if not isinstance(raw, dict): | |
| continue | |
| source = str(raw.get("source") or raw.get("from") or "").strip() | |
| target = str(raw.get("target") or raw.get("to") or "").strip() | |
| edge_type = str(raw.get("type", "")).strip() | |
| if not source or not target or (source, target, edge_type) in seen: | |
| continue | |
| edges.append( | |
| { | |
| "source": source, | |
| "target": target, | |
| "type": edge_type, | |
| "context": str(raw.get("context") or raw.get("description") or ""), | |
| } | |
| ) | |
| seen.add((source, target, edge_type)) | |
| return edges | |