"""Hugging Face dataset uploader for final outputs.""" import json import logging from pathlib import Path from datetime import datetime from huggingface_hub import HfApi, CommitOperationAdd from config import settings logger = logging.getLogger(__name__) class HFUploader: """Handles uploading final outputs to Hugging Face dataset.""" def __init__(self): """Initialize the Hugging Face uploader.""" self.api = HfApi() self.token = settings.huggingface_token self.dataset_id = settings.huggingface_dataset logger.info(f"Initialized HF uploader for dataset: {self.dataset_id}") def upload_final_output(self, final_data: dict, run_id: str) -> str: """Upload final output to Hugging Face dataset. Args: final_data: The final processed data from the pipeline run_id: Unique identifier for this pipeline run Returns: URL of the uploaded file """ try: # Prepare the data upload_data = { "run_id": run_id, "timestamp": datetime.now().isoformat(), "final_output": final_data, } # Create filename filename = f"output_{run_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" # Convert to JSON string json_content = json.dumps(upload_data, indent=2) # Create commit operation commit_operation = CommitOperationAdd( path_in_repo=f"outputs/{filename}", path_or_fileobj=json_content.encode("utf-8"), ) # Upload to dataset commit_info = self.api.create_commit( repo_id=self.dataset_id, repo_type="dataset", operations=[commit_operation], commit_message=f"Pipeline output: {run_id}", token=self.token, ) file_url = f"https://huggingface.co/datasets/{self.dataset_id}/blob/main/outputs/{filename}" logger.info(f"Successfully uploaded to HF: {file_url}") return file_url except Exception as e: logger.error(f"Error uploading to Hugging Face: {str(e)}") raise def upload_pipeline_metadata(self, metadata: dict) -> str: """Upload pipeline metadata to Hugging Face dataset. Args: metadata: Pipeline metadata including all agent outputs Returns: URL of the uploaded metadata file """ try: # Create filename timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"metadata_{timestamp}.json" # Convert to JSON string json_content = json.dumps(metadata, indent=2) # Create commit operation commit_operation = CommitOperationAdd( path_in_repo=f"metadata/{filename}", path_or_fileobj=json_content.encode("utf-8"), ) # Upload to dataset commit_info = self.api.create_commit( repo_id=self.dataset_id, repo_type="dataset", operations=[commit_operation], commit_message=f"Pipeline metadata: {timestamp}", token=self.token, ) file_url = f"https://huggingface.co/datasets/{self.dataset_id}/blob/main/metadata/{filename}" logger.info(f"Successfully uploaded metadata to HF: {file_url}") return file_url except Exception as e: logger.error(f"Error uploading metadata to Hugging Face: {str(e)}") raise