Spaces:
Sleeping
Sleeping
File size: 3,694 Bytes
2dfc473 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 | """Hugging Face dataset uploader for final outputs."""
import json
import logging
from pathlib import Path
from datetime import datetime
from huggingface_hub import HfApi, CommitOperationAdd
from config import settings
logger = logging.getLogger(__name__)
class HFUploader:
"""Handles uploading final outputs to Hugging Face dataset."""
def __init__(self):
"""Initialize the Hugging Face uploader."""
self.api = HfApi()
self.token = settings.huggingface_token
self.dataset_id = settings.huggingface_dataset
logger.info(f"Initialized HF uploader for dataset: {self.dataset_id}")
def upload_final_output(self, final_data: dict, run_id: str) -> str:
"""Upload final output to Hugging Face dataset.
Args:
final_data: The final processed data from the pipeline
run_id: Unique identifier for this pipeline run
Returns:
URL of the uploaded file
"""
try:
# Prepare the data
upload_data = {
"run_id": run_id,
"timestamp": datetime.now().isoformat(),
"final_output": final_data,
}
# Create filename
filename = f"output_{run_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
# Convert to JSON string
json_content = json.dumps(upload_data, indent=2)
# Create commit operation
commit_operation = CommitOperationAdd(
path_in_repo=f"outputs/{filename}",
path_or_fileobj=json_content.encode("utf-8"),
)
# Upload to dataset
commit_info = self.api.create_commit(
repo_id=self.dataset_id,
repo_type="dataset",
operations=[commit_operation],
commit_message=f"Pipeline output: {run_id}",
token=self.token,
)
file_url = f"https://huggingface.co/datasets/{self.dataset_id}/blob/main/outputs/{filename}"
logger.info(f"Successfully uploaded to HF: {file_url}")
return file_url
except Exception as e:
logger.error(f"Error uploading to Hugging Face: {str(e)}")
raise
def upload_pipeline_metadata(self, metadata: dict) -> str:
"""Upload pipeline metadata to Hugging Face dataset.
Args:
metadata: Pipeline metadata including all agent outputs
Returns:
URL of the uploaded metadata file
"""
try:
# Create filename
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"metadata_{timestamp}.json"
# Convert to JSON string
json_content = json.dumps(metadata, indent=2)
# Create commit operation
commit_operation = CommitOperationAdd(
path_in_repo=f"metadata/{filename}",
path_or_fileobj=json_content.encode("utf-8"),
)
# Upload to dataset
commit_info = self.api.create_commit(
repo_id=self.dataset_id,
repo_type="dataset",
operations=[commit_operation],
commit_message=f"Pipeline metadata: {timestamp}",
token=self.token,
)
file_url = f"https://huggingface.co/datasets/{self.dataset_id}/blob/main/metadata/{filename}"
logger.info(f"Successfully uploaded metadata to HF: {file_url}")
return file_url
except Exception as e:
logger.error(f"Error uploading metadata to Hugging Face: {str(e)}")
raise
|