Spaces:
Configuration error
Configuration error
| """ | |
| HuggingFace Hub Integration | |
| Push and pull dataset provenance to/from HuggingFace Hub. | |
| Exports complete W3C PROV-O accountability bundle: | |
| - cascade_provenance.json (CASCADE native format) | |
| - prov_o.jsonld (W3C PROV-O JSON-LD - interoperable) | |
| - prov_n.txt (W3C PROV-N notation - human readable) | |
| - activities.jsonl (Activity log for audit) | |
| - agents.json (Agent attributions) | |
| - croissant.json (MLCommons Croissant) | |
| """ | |
| import json | |
| import time | |
| from datetime import datetime, timezone | |
| from typing import Any, Dict, List, Optional | |
| from .provenance import ProvenanceGraph | |
| from .croissant import CroissantExporter | |
| class AccountabilityBundle: | |
| """ | |
| Complete W3C PROV-O accountability package. | |
| When a dataset is extracted, this bundle provides full audit trail: | |
| - Who created/modified it (agents) | |
| - What transformations occurred (activities) | |
| - Where it came from (entity lineage) | |
| - When everything happened (timestamps) | |
| - How to verify integrity (hashes) | |
| """ | |
| def __init__(self, graph: ProvenanceGraph): | |
| self.graph = graph | |
| self.created_at = datetime.now(timezone.utc).isoformat() | |
| def to_prov_o_jsonld(self) -> Dict[str, Any]: | |
| """Export W3C PROV-O JSON-LD (interoperable standard).""" | |
| return self.graph.to_prov_jsonld() | |
| def to_prov_n(self) -> str: | |
| """Export W3C PROV-N notation (human readable).""" | |
| return self.graph.to_prov_n() | |
| def to_activity_log(self) -> List[Dict[str, Any]]: | |
| """Export activity log for audit (JSONL format).""" | |
| activities = [] | |
| for activity in self.graph.list_activities(): | |
| activities.append({ | |
| "id": activity.id, | |
| "name": activity.name, | |
| "type": activity.activity_type.value, | |
| "started_at": datetime.fromtimestamp(activity.started_at).isoformat() if activity.started_at else None, | |
| "ended_at": datetime.fromtimestamp(activity.ended_at).isoformat() if activity.ended_at else None, | |
| "duration_seconds": activity.duration, | |
| "inputs": activity.inputs, | |
| "outputs": activity.outputs, | |
| "parameters": activity.parameters, | |
| "attributes": activity.attributes, | |
| }) | |
| return activities | |
| def to_agent_attributions(self) -> Dict[str, Any]: | |
| """Export agent attributions for accountability.""" | |
| agents = {} | |
| for agent in self.graph.list_agents(): | |
| agents[agent.id] = { | |
| "name": agent.name, | |
| "type": agent.agent_type.value, | |
| "version": agent.version, | |
| "identifier": agent.identifier, | |
| "attributes": agent.attributes, | |
| } | |
| # Build attribution matrix: which agent did what | |
| attributions = [] | |
| for rel in self.graph.list_relationships(): | |
| if rel.relation_type.value == "wasAssociatedWith": | |
| activity = self.graph.get_activity(rel.source_id) | |
| agent = self.graph.get_agent(rel.target_id) | |
| if activity and agent: | |
| attributions.append({ | |
| "activity_id": activity.id, | |
| "activity_name": activity.name, | |
| "agent_id": agent.id, | |
| "agent_name": agent.name, | |
| "timestamp": datetime.fromtimestamp(activity.started_at).isoformat() if activity.started_at else None, | |
| }) | |
| return { | |
| "agents": agents, | |
| "attributions": attributions, | |
| "total_agents": len(agents), | |
| "total_attributions": len(attributions), | |
| } | |
| def to_integrity_manifest(self) -> Dict[str, Any]: | |
| """Export integrity manifest for verification.""" | |
| is_valid, invalid_ids = self.graph.verify_integrity() | |
| return { | |
| "root_hash": self.graph.root_hash, | |
| "created_at": self.created_at, | |
| "is_valid": is_valid, | |
| "invalid_entity_ids": invalid_ids, | |
| "entity_hashes": { | |
| entity.id: { | |
| "content_hash": entity.content_hash, | |
| "schema_hash": entity.schema_hash, | |
| } | |
| for entity in self.graph.list_entities() | |
| }, | |
| "verification_note": ( | |
| "To verify: recompute content hashes and compare against this manifest. " | |
| "Any mismatch indicates data tampering." | |
| ), | |
| } | |
| def export(self, output_dir: str): | |
| """Export all accountability artifacts to a directory.""" | |
| import os | |
| os.makedirs(output_dir, exist_ok=True) | |
| # 1. CASCADE provenance JSON | |
| with open(os.path.join(output_dir, "cascade_provenance.json"), "w") as f: | |
| json.dump(self.graph.to_dict(), f, indent=2, default=str) | |
| # 2. W3C PROV-O JSON-LD | |
| with open(os.path.join(output_dir, "prov_o.jsonld"), "w") as f: | |
| json.dump(self.to_prov_o_jsonld(), f, indent=2, default=str) | |
| # 3. W3C PROV-N notation | |
| with open(os.path.join(output_dir, "prov_n.txt"), "w") as f: | |
| f.write(self.to_prov_n()) | |
| # 4. Activity log | |
| with open(os.path.join(output_dir, "activities.jsonl"), "w") as f: | |
| for activity in self.to_activity_log(): | |
| f.write(json.dumps(activity, default=str) + "\n") | |
| # 5. Agent attributions | |
| with open(os.path.join(output_dir, "agents.json"), "w") as f: | |
| json.dump(self.to_agent_attributions(), f, indent=2, default=str) | |
| # 6. Integrity manifest | |
| with open(os.path.join(output_dir, "integrity_manifest.json"), "w") as f: | |
| json.dump(self.to_integrity_manifest(), f, indent=2, default=str) | |
| # 7. Croissant metadata | |
| exporter = CroissantExporter(self.graph) | |
| croissant_content = exporter.to_json(name="dataset", url="local://") | |
| with open(os.path.join(output_dir, "croissant.json"), "w") as f: | |
| f.write(croissant_content) | |
| def summary(self) -> Dict[str, Any]: | |
| """Summary of the accountability bundle.""" | |
| stats = self.graph.stats | |
| return { | |
| "bundle_created_at": self.created_at, | |
| "graph_name": self.graph.name, | |
| "root_hash": self.graph.root_hash, | |
| "entities": stats["entities"], | |
| "activities": stats["activities"], | |
| "agents": stats["agents"], | |
| "relationships": stats["relationships"], | |
| "files_included": [ | |
| "cascade_provenance.json", | |
| "prov_o.jsonld", | |
| "prov_n.txt", | |
| "activities.jsonl", | |
| "agents.json", | |
| "integrity_manifest.json", | |
| "croissant.json", | |
| ], | |
| } | |
| class HubIntegration: | |
| """ | |
| Integration with HuggingFace Hub for dataset provenance. | |
| Stores complete accountability bundle: | |
| 1. cascade_provenance.json - CASCADE native format | |
| 2. prov_o.jsonld - W3C PROV-O JSON-LD (interoperable) | |
| 3. prov_n.txt - W3C PROV-N notation (human readable) | |
| 4. activities.jsonl - Activity log for audit | |
| 5. agents.json - Agent attributions | |
| 6. integrity_manifest.json - Hash verification | |
| 7. croissant.json - MLCommons Croissant | |
| 8. README.md - Human-readable provenance section | |
| """ | |
| PROVENANCE_FILENAME = "cascade_provenance.json" | |
| PROV_O_FILENAME = "prov_o.jsonld" | |
| PROV_N_FILENAME = "prov_n.txt" | |
| ACTIVITIES_FILENAME = "activities.jsonl" | |
| AGENTS_FILENAME = "agents.json" | |
| INTEGRITY_FILENAME = "integrity_manifest.json" | |
| CROISSANT_FILENAME = "croissant.json" | |
| def __init__(self, token: str = None): | |
| """ | |
| Initialize Hub integration. | |
| Args: | |
| token: HuggingFace API token (optional, uses cached token if not provided) | |
| """ | |
| self.token = token | |
| def push_provenance( | |
| self, | |
| graph: ProvenanceGraph, | |
| repo_id: str, | |
| commit_message: str = "Update provenance", | |
| private: bool = False, | |
| include_croissant: bool = True, | |
| full_accountability: bool = True, | |
| ) -> str: | |
| """ | |
| Push complete accountability bundle to HuggingFace Hub. | |
| Args: | |
| graph: The provenance graph to push | |
| repo_id: HuggingFace repo ID (e.g., "username/dataset-name") | |
| commit_message: Commit message | |
| private: Whether the repo should be private | |
| include_croissant: Whether to include Croissant JSON-LD | |
| full_accountability: Whether to include full W3C PROV-O bundle | |
| Returns: | |
| URL of the pushed provenance | |
| """ | |
| from huggingface_hub import HfApi, CommitOperationAdd | |
| api = HfApi(token=self.token) | |
| # Ensure repo exists | |
| api.create_repo( | |
| repo_id=repo_id, | |
| repo_type="dataset", | |
| private=private, | |
| exist_ok=True, | |
| ) | |
| operations = [] | |
| bundle = AccountabilityBundle(graph) | |
| # 1. CASCADE provenance JSON (native format) | |
| provenance_content = json.dumps(graph.to_dict(), indent=2, default=str) | |
| operations.append(CommitOperationAdd( | |
| path_in_repo=self.PROVENANCE_FILENAME, | |
| path_or_fileobj=provenance_content.encode("utf-8"), | |
| )) | |
| if full_accountability: | |
| # 2. W3C PROV-O JSON-LD (interoperable standard) | |
| prov_o_content = json.dumps(bundle.to_prov_o_jsonld(), indent=2, default=str) | |
| operations.append(CommitOperationAdd( | |
| path_in_repo=self.PROV_O_FILENAME, | |
| path_or_fileobj=prov_o_content.encode("utf-8"), | |
| )) | |
| # 3. W3C PROV-N notation (human readable) | |
| prov_n_content = bundle.to_prov_n() | |
| operations.append(CommitOperationAdd( | |
| path_in_repo=self.PROV_N_FILENAME, | |
| path_or_fileobj=prov_n_content.encode("utf-8"), | |
| )) | |
| # 4. Activity log (JSONL for easy grep/audit) | |
| activities = bundle.to_activity_log() | |
| activities_content = "\n".join(json.dumps(a, default=str) for a in activities) | |
| operations.append(CommitOperationAdd( | |
| path_in_repo=self.ACTIVITIES_FILENAME, | |
| path_or_fileobj=activities_content.encode("utf-8"), | |
| )) | |
| # 5. Agent attributions | |
| agents_content = json.dumps(bundle.to_agent_attributions(), indent=2, default=str) | |
| operations.append(CommitOperationAdd( | |
| path_in_repo=self.AGENTS_FILENAME, | |
| path_or_fileobj=agents_content.encode("utf-8"), | |
| )) | |
| # 6. Integrity manifest (for verification) | |
| integrity_content = json.dumps(bundle.to_integrity_manifest(), indent=2, default=str) | |
| operations.append(CommitOperationAdd( | |
| path_in_repo=self.INTEGRITY_FILENAME, | |
| path_or_fileobj=integrity_content.encode("utf-8"), | |
| )) | |
| # 7. Croissant JSON-LD (MLCommons standard) | |
| if include_croissant: | |
| exporter = CroissantExporter(graph) | |
| croissant_content = exporter.to_json( | |
| name=repo_id.split("/")[-1], | |
| url=f"https://huggingface.co/datasets/{repo_id}", | |
| ) | |
| operations.append(CommitOperationAdd( | |
| path_in_repo=self.CROISSANT_FILENAME, | |
| path_or_fileobj=croissant_content.encode("utf-8"), | |
| )) | |
| # Commit all accountability artifacts | |
| api.create_commit( | |
| repo_id=repo_id, | |
| repo_type="dataset", | |
| operations=operations, | |
| commit_message=commit_message, | |
| ) | |
| return f"https://huggingface.co/datasets/{repo_id}" | |
| def pull_provenance(self, repo_id: str) -> Optional[ProvenanceGraph]: | |
| """ | |
| Pull provenance from HuggingFace Hub. | |
| Args: | |
| repo_id: HuggingFace repo ID | |
| Returns: | |
| ProvenanceGraph if found, None otherwise | |
| """ | |
| from huggingface_hub import hf_hub_download | |
| try: | |
| # Download provenance file | |
| local_path = hf_hub_download( | |
| repo_id=repo_id, | |
| filename=self.PROVENANCE_FILENAME, | |
| repo_type="dataset", | |
| token=self.token, | |
| ) | |
| with open(local_path, "r", encoding="utf-8") as f: | |
| data = json.load(f) | |
| return ProvenanceGraph.from_dict(data) | |
| except Exception as e: | |
| print(f"Could not pull provenance from {repo_id}: {e}") | |
| return None | |
| def get_dataset_provenance_url(self, repo_id: str) -> str: | |
| """Get URL to provenance file in Hub.""" | |
| return f"https://huggingface.co/datasets/{repo_id}/blob/main/{self.PROVENANCE_FILENAME}" | |
| def update_dataset_card( | |
| self, | |
| repo_id: str, | |
| graph: ProvenanceGraph, | |
| ) -> str: | |
| """ | |
| Update dataset card with provenance summary. | |
| Adds/updates YAML front-matter with: | |
| - Lineage information | |
| - Root hash | |
| - Entity/activity counts | |
| Args: | |
| repo_id: HuggingFace repo ID | |
| graph: Provenance graph | |
| Returns: | |
| URL of the updated dataset | |
| """ | |
| from huggingface_hub import HfApi, hf_hub_download | |
| api = HfApi(token=self.token) | |
| # Build provenance section for README | |
| provenance_section = self._build_readme_section(graph) | |
| # Get current README | |
| try: | |
| readme_path = hf_hub_download( | |
| repo_id=repo_id, | |
| filename="README.md", | |
| repo_type="dataset", | |
| token=self.token, | |
| ) | |
| with open(readme_path, "r", encoding="utf-8") as f: | |
| current_readme = f.read() | |
| except: | |
| current_readme = f"# {repo_id.split('/')[-1]}\n\n" | |
| # Update or append provenance section | |
| marker_start = "<!-- CASCADE_PROVENANCE_START -->" | |
| marker_end = "<!-- CASCADE_PROVENANCE_END -->" | |
| if marker_start in current_readme: | |
| # Replace existing section | |
| import re | |
| pattern = re.escape(marker_start) + r".*?" + re.escape(marker_end) | |
| new_readme = re.sub( | |
| pattern, | |
| f"{marker_start}\n{provenance_section}\n{marker_end}", | |
| current_readme, | |
| flags=re.DOTALL, | |
| ) | |
| else: | |
| # Append section | |
| new_readme = current_readme.rstrip() + f"\n\n{marker_start}\n{provenance_section}\n{marker_end}\n" | |
| # Push updated README | |
| api.upload_file( | |
| path_or_fileobj=new_readme.encode("utf-8"), | |
| path_in_repo="README.md", | |
| repo_id=repo_id, | |
| repo_type="dataset", | |
| commit_message="Update provenance in README", | |
| ) | |
| return f"https://huggingface.co/datasets/{repo_id}" | |
| def _build_readme_section(self, graph: ProvenanceGraph) -> str: | |
| """Build provenance section for README.""" | |
| stats = graph.stats | |
| bundle = AccountabilityBundle(graph) | |
| lines = [ | |
| "## 🔗 Provenance & Accountability", | |
| "", | |
| "This dataset has CASCADE provenance tracking enabled with full W3C PROV-O compliance.", | |
| "", | |
| "### Integrity", | |
| "", | |
| f"| Metric | Value |", | |
| f"|--------|-------|", | |
| f"| Root Hash | `{graph.root_hash[:16]}...` |", | |
| f"| Entities | {stats['entities']} |", | |
| f"| Activities | {stats['activities']} |", | |
| f"| Agents | {stats['agents']} |", | |
| f"| Relationships | {stats['relationships']} |", | |
| "", | |
| ] | |
| # Add lineage summary | |
| entities = graph.list_entities() | |
| if entities: | |
| lines.append("### Lineage") | |
| lines.append("") | |
| for entity in entities[:5]: # Show first 5 | |
| upstream = graph.get_lineage(entity.id, "upstream") | |
| if upstream: | |
| lines.append(f"- **{entity.name}** derived from: {', '.join(upstream[:3])}") | |
| else: | |
| lines.append(f"- **{entity.name}** (source)") | |
| if len(entities) > 5: | |
| lines.append(f"- ... and {len(entities) - 5} more entities") | |
| lines.append("") | |
| # Add activities summary | |
| activities = graph.list_activities() | |
| if activities: | |
| lines.append("### Activities") | |
| lines.append("") | |
| for activity in activities[:5]: | |
| duration = f" ({activity.duration:.2f}s)" if activity.duration else "" | |
| lines.append(f"- **{activity.name}** [{activity.activity_type.value}]{duration}") | |
| if len(activities) > 5: | |
| lines.append(f"- ... and {len(activities) - 5} more activities") | |
| lines.append("") | |
| # Add agents summary | |
| agents = graph.list_agents() | |
| if agents: | |
| lines.append("### Agents (Accountability)") | |
| lines.append("") | |
| for agent in agents[:5]: | |
| lines.append(f"- **{agent.name}** [{agent.agent_type.value}]") | |
| if len(agents) > 5: | |
| lines.append(f"- ... and {len(agents) - 5} more agents") | |
| lines.append("") | |
| # Accountability bundle files | |
| lines.extend([ | |
| "### Accountability Bundle", | |
| "", | |
| "| File | Standard | Description |", | |
| "|------|----------|-------------|", | |
| f"| [{self.PROVENANCE_FILENAME}]({self.PROVENANCE_FILENAME}) | CASCADE | Native provenance format |", | |
| f"| [{self.PROV_O_FILENAME}]({self.PROV_O_FILENAME}) | W3C PROV-O | Interoperable JSON-LD |", | |
| f"| [{self.PROV_N_FILENAME}]({self.PROV_N_FILENAME}) | W3C PROV-N | Human-readable notation |", | |
| f"| [{self.ACTIVITIES_FILENAME}]({self.ACTIVITIES_FILENAME}) | JSONL | Activity audit log |", | |
| f"| [{self.AGENTS_FILENAME}]({self.AGENTS_FILENAME}) | JSON | Agent attributions |", | |
| f"| [{self.INTEGRITY_FILENAME}]({self.INTEGRITY_FILENAME}) | JSON | Hash verification manifest |", | |
| f"| [{self.CROISSANT_FILENAME}]({self.CROISSANT_FILENAME}) | MLCommons | Croissant metadata |", | |
| "", | |
| ]) | |
| return "\n".join(lines) | |
| def push_to_hub( | |
| graph: ProvenanceGraph, | |
| repo_id: str, | |
| token: str = None, | |
| private: bool = False, | |
| ) -> str: | |
| """ | |
| Convenience function to push provenance to Hub. | |
| Args: | |
| graph: Provenance graph to push | |
| repo_id: HuggingFace repo ID | |
| token: HF token (optional) | |
| private: Whether repo should be private | |
| Returns: | |
| URL of the pushed provenance | |
| """ | |
| hub = HubIntegration(token=token) | |
| return hub.push_provenance(graph, repo_id, private=private) | |
| def pull_from_hub(repo_id: str, token: str = None) -> Optional[ProvenanceGraph]: | |
| """ | |
| Convenience function to pull provenance from Hub. | |
| Args: | |
| repo_id: HuggingFace repo ID | |
| token: HF token (optional) | |
| Returns: | |
| ProvenanceGraph if found | |
| """ | |
| hub = HubIntegration(token=token) | |
| return hub.pull_provenance(repo_id) | |