File size: 3,694 Bytes
2dfc473
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
"""Hugging Face dataset uploader for final outputs."""
import json
import logging
from pathlib import Path
from datetime import datetime
from huggingface_hub import HfApi, CommitOperationAdd
from config import settings


# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)


class HFUploader:
    """Upload pipeline results as JSON files to a Hugging Face dataset repo.

    Each upload is a single commit that adds one JSON file to the dataset
    named by ``settings.huggingface_dataset``, authenticated with
    ``settings.huggingface_token``.
    """

    def __init__(self):
        """Create the Hub client and read the token and dataset id from settings."""
        self.api = HfApi()
        self.token = settings.huggingface_token
        self.dataset_id = settings.huggingface_dataset
        logger.info(f"Initialized HF uploader for dataset: {self.dataset_id}")

    def _commit_json(self, payload: dict, path_in_repo: str, commit_message: str) -> str:
        """Serialize *payload* to JSON and commit it as one file to the dataset.

        Args:
            payload: JSON-serializable data to store.
            path_in_repo: Destination path of the file inside the dataset repo.
            commit_message: Message attached to the commit.

        Returns:
            Browser URL of the committed file.
        """
        json_content = json.dumps(payload, indent=2)

        operation = CommitOperationAdd(
            path_in_repo=path_in_repo,
            path_or_fileobj=json_content.encode("utf-8"),
        )

        self.api.create_commit(
            repo_id=self.dataset_id,
            repo_type="dataset",
            operations=[operation],
            commit_message=commit_message,
            token=self.token,
        )

        # NOTE(review): the URL hard-codes the `main` branch; no `revision` is
        # passed to create_commit, so this relies on its default target branch.
        return f"https://huggingface.co/datasets/{self.dataset_id}/blob/main/{path_in_repo}"

    def upload_final_output(self, final_data: dict, run_id: str) -> str:
        """Upload final output to Hugging Face dataset.

        The file is stored under ``outputs/`` and wraps ``final_data`` together
        with ``run_id`` and an ISO-format timestamp.

        Args:
            final_data: The final processed data from the pipeline
            run_id: Unique identifier for this pipeline run

        Returns:
            URL of the uploaded file

        Raises:
            Exception: Any error raised while serializing or committing is
                logged and re-raised unchanged.
        """
        try:
            # Capture the clock once so the timestamp inside the payload and
            # the one embedded in the filename always refer to the same instant.
            now = datetime.now()

            upload_data = {
                "run_id": run_id,
                "timestamp": now.isoformat(),
                "final_output": final_data,
            }
            filename = f"output_{run_id}_{now.strftime('%Y%m%d_%H%M%S')}.json"

            file_url = self._commit_json(
                upload_data,
                path_in_repo=f"outputs/{filename}",
                commit_message=f"Pipeline output: {run_id}",
            )
            logger.info(f"Successfully uploaded to HF: {file_url}")

            return file_url

        except Exception as e:
            # logger.exception keeps the traceback alongside the message.
            logger.exception("Error uploading to Hugging Face: %s", e)
            raise

    def upload_pipeline_metadata(self, metadata: dict) -> str:
        """Upload pipeline metadata to Hugging Face dataset.

        The file is stored under ``metadata/`` with a timestamp-based name.

        Args:
            metadata: Pipeline metadata including all agent outputs

        Returns:
            URL of the uploaded metadata file

        Raises:
            Exception: Any error raised while serializing or committing is
                logged and re-raised unchanged.
        """
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"metadata_{timestamp}.json"

            file_url = self._commit_json(
                metadata,
                path_in_repo=f"metadata/{filename}",
                commit_message=f"Pipeline metadata: {timestamp}",
            )
            logger.info(f"Successfully uploaded metadata to HF: {file_url}")

            return file_url

        except Exception as e:
            # logger.exception keeps the traceback alongside the message.
            logger.exception("Error uploading metadata to Hugging Face: %s", e)
            raise