| """ |
| Collibra Exporter - Export to Collibra Data Intelligence format. |
| |
| Collibra is an enterprise data governance and catalog platform. |
| https://www.collibra.com/ |
| """ |
|
|
| from typing import Dict, Any, List |
| from datetime import datetime |
| import uuid |
| from .base import LineageExporter, LineageGraph, LineageNode, LineageEdge |
|
|
|
|
class CollibraExporter(LineageExporter):
    """Export lineage to Collibra import format.

    Every node of the graph becomes an ``Asset`` entry, followed by one
    ``Column`` asset per entry in ``node.columns``; every edge becomes a
    ``Relation`` entry.  All entries reference a single domain inside a
    single community, both identified by name.
    """

    # Fallbacks used when a node/edge type is missing or not in the tables.
    _DEFAULT_ASSET_TYPE = "Data Set"
    _DEFAULT_RELATION_ROLE = "is source of"

    # Internal node type (lower case) -> Collibra asset type name.
    # Kept at class level so the table is built once, not on every lookup.
    _ASSET_TYPE_MAPPING: Dict[str, str] = {
        "table": "Table",
        "view": "View",
        "model": "Data Set",
        "source": "Data Source",
        "destination": "Data Target",
        "column": "Column",
        "database": "Database",
        "schema": "Schema",
        "report": "Report",
        "dimension": "Dimension Table",
        "fact": "Fact Table",
        "feature_set": "Data Set",
        "semantic_model": "Business Intelligence Report",
        "external_api": "Data Source",
        "extract": "Data Set",
    }

    # Internal edge type (lower case) -> Collibra relation role.
    _RELATION_ROLE_MAPPING: Dict[str, str] = {
        "transform": "is source of",
        "reference": "references",
        "ingest": "is source of",
        "export": "is target of",
        "join": "is source of",
        "aggregate": "is source of",
        "model": "is source of",
        "publish": "is target of",
        "reverse_etl": "is target of",
    }

    def __init__(self, graph: LineageGraph, community_name: str = "Data Lineage",
                 domain_name: str = "Physical Data Dictionary"):
        """Create an exporter for *graph*.

        Args:
            graph: Lineage graph to export.
            community_name: Name of the Collibra community every emitted
                entry refers to.
            domain_name: Name of the Collibra domain every emitted entry
                refers to.
        """
        super().__init__(graph)
        self.community_name = community_name
        self.domain_name = domain_name

    @property
    def format_name(self) -> str:
        """Human-readable name of this export format."""
        return "Collibra"

    @property
    def file_extension(self) -> str:
        """File extension (including the leading dot) for exported files."""
        return ".json"

    def _domain_reference(self) -> Dict[str, Any]:
        """Return a domain/community reference for the configured names.

        A new dict is built on every call so that emitted entries never
        share mutable state with each other.
        """
        return {
            "name": self.domain_name,
            "community": {
                "name": self.community_name
            }
        }

    def _node_type_to_collibra_type(self, node_type: str) -> str:
        """Map an internal node type to a Collibra asset type name.

        The lookup is case-insensitive.  Unknown, empty or ``None`` types
        map to ``"Data Set"``.
        """
        # Guard against a missing type: calling .lower() on None would raise
        # instead of falling back to the default asset type.
        key = node_type.lower() if node_type else ""
        return self._ASSET_TYPE_MAPPING.get(key, self._DEFAULT_ASSET_TYPE)

    def _edge_type_to_collibra_relation(self, edge_type: str) -> str:
        """Map an internal edge type to a Collibra relation role.

        The lookup is case-insensitive.  Unknown, empty or ``None`` types
        map to ``"is source of"``.
        """
        key = edge_type.lower() if edge_type else ""
        return self._RELATION_ROLE_MAPPING.get(key, self._DEFAULT_RELATION_ROLE)

    def _create_asset(self, node: LineageNode) -> Dict[str, Any]:
        """Create a Collibra asset entry from a node.

        Optional node fields (description, database, schema, owner, tags,
        category) are emitted as attributes only when they are truthy.

        Args:
            node: Node to convert.

        Returns:
            The asset entry as a plain dict.
        """
        asset = {
            "resourceType": "Asset",
            "identifier": {
                "name": node.name,
                "domain": self._domain_reference()
            },
            "type": {
                "name": self._node_type_to_collibra_type(node.type)
            },
            "displayName": node.name,
            "attributes": {}
        }
        attributes = asset["attributes"]

        if node.description:
            attributes["Description"] = [{"value": node.description}]

        # NOTE(review): the database name is written under "Technical Data
        # Type", which looks like a mislabelled attribute.  Left unchanged
        # because existing import mappings may rely on this key - confirm
        # the intended attribute type before renaming.
        if node.database:
            attributes["Technical Data Type"] = [{"value": node.database}]
        if node.schema:
            attributes["Schema Name"] = [{"value": node.schema}]

        if node.owner:
            attributes["Data Owner"] = [{"value": node.owner}]

        if node.tags:
            # Tags are flattened into one comma-separated attribute value.
            attributes["Tags"] = [{"value": ", ".join(node.tags)}]

        if node.category:
            attributes["Category"] = [{"value": node.category}]

        return asset

    def _create_relation(self, edge: LineageEdge) -> Dict[str, Any]:
        """Create a Collibra relation entry from an edge.

        When an endpoint cannot be resolved through ``self.graph.get_node``
        (the lookup returns a falsy value), the raw endpoint id from the edge
        is used as the asset name and the endpoint is typed as a table.

        Args:
            edge: Edge to convert.

        Returns:
            The relation entry as a plain dict.
        """
        source_node = self.graph.get_node(edge.source)
        target_node = self.graph.get_node(edge.target)

        source_name = source_node.name if source_node else edge.source
        target_name = target_node.name if target_node else edge.target
        source_type = source_node.type if source_node else "table"
        target_type = target_node.type if target_node else "table"

        return {
            "resourceType": "Relation",
            "source": {
                "name": source_name,
                "domain": self._domain_reference()
            },
            "target": {
                "name": target_name,
                "domain": self._domain_reference()
            },
            "type": {
                "role": self._edge_type_to_collibra_relation(edge.type),
                # NOTE(review): the co-role is "has source" for every role,
                # including "is target of" and "references".  Left unchanged;
                # confirm against the relation types configured in the
                # target Collibra instance.
                "coRole": "has source",
                "sourceType": {
                    "name": self._node_type_to_collibra_type(source_type)
                },
                "targetType": {
                    "name": self._node_type_to_collibra_type(target_type)
                }
            }
        }

    def _create_column_assets(self, node: LineageNode) -> List[Dict[str, Any]]:
        """Create Collibra column assets from a node's columns.

        Each column is read with ``.get`` (so it is expected to be
        dict-like) using the keys ``name``, ``type``/``data_type`` and
        ``description``.  The column asset is named ``<node name>.<column
        name>`` and carries a "Column is part of Table" relation back to
        the node.

        Args:
            node: Node whose columns are converted.

        Returns:
            One asset entry per column; an empty list when the node has no
            columns.
        """
        if not node.columns:
            return []

        column_assets = []
        for col in node.columns:
            column_name = col.get("name")
            # Prefer "type", then "data_type"; "string" is the last resort
            # even when "data_type" is present but empty/None, so the
            # attribute value is never null.
            data_type = col.get("type") or col.get("data_type") or "string"

            column_asset = {
                "resourceType": "Asset",
                "identifier": {
                    "name": f"{node.name}.{column_name}",
                    "domain": self._domain_reference()
                },
                "type": {
                    "name": "Column"
                },
                "displayName": column_name,
                "attributes": {
                    "Technical Data Type": [{"value": data_type}]
                },
                "relations": {
                    "Column is part of Table": [{
                        "name": node.name,
                        "domain": self._domain_reference()
                    }]
                }
            }

            description = col.get("description")
            if description:
                column_asset["attributes"]["Description"] = [{"value": description}]

            column_assets.append(column_asset)

        return column_assets

    def export(self) -> str:
        """Export to Collibra JSON import format.

        Delegates to ``self.to_json`` with a two-space indent.  ``to_json``
        is not defined in this class; presumably it is inherited from
        ``LineageExporter`` and serialises ``_to_dict()`` - verify against
        the base class.
        """
        return self.to_json(indent=2)

    def _to_dict(self) -> Dict[str, Any]:
        """Convert the graph to a Collibra import dictionary.

        Returns:
            A dict with ``exportInfo``, ``community``, ``domain``,
            ``assets``, ``relations`` and ``summary`` sections.
        """
        # Each node's asset is immediately followed by its column assets.
        assets = []
        for node in self.graph.nodes:
            assets.append(self._create_asset(node))
            assets.extend(self._create_column_assets(node))

        relations = [self._create_relation(edge) for edge in self.graph.edges]

        # Derive the type list from the assets actually emitted so it agrees
        # with totalAssets (which counts column assets too), and sort it so
        # the export is reproducible: plain set iteration order of strings
        # changes between interpreter runs.
        asset_types = sorted({asset["type"]["name"] for asset in assets})

        return {
            "exportInfo": {
                "producer": "Lineage Graph Accelerator",
                "exportedAt": self.graph.generated_at,
                "sourceLineageName": self.graph.name,
                "format": "Collibra Import API",
                "version": "2.0"
            },
            "community": {
                "name": self.community_name,
                "description": f"Data lineage imported from {self.graph.name}"
            },
            "domain": {
                "name": self.domain_name,
                "type": "Physical Data Dictionary",
                "community": {
                    "name": self.community_name
                }
            },
            "assets": assets,
            "relations": relations,
            "summary": {
                "totalAssets": len(assets),
                "totalRelations": len(relations),
                "assetTypes": asset_types
            }
        }
|
|