Spaces:
Sleeping
Sleeping
| """Hugging Face dataset uploader for final outputs.""" | |
import json
import logging
from datetime import datetime
from pathlib import Path
from urllib.parse import quote

from huggingface_hub import HfApi, CommitOperationAdd

from config import settings
logger = logging.getLogger(__name__)


class HFUploader:
    """Upload pipeline results and metadata as JSON files to a Hugging Face dataset repo.

    The dataset id and access token are taken from ``settings`` when the
    uploader is constructed.
    """

    # Branch every commit targets. The same value is used to build the returned
    # blob URLs, so the commit location and the reported link cannot drift apart.
    # "main" is also huggingface_hub's default revision for create_commit.
    _REVISION = "main"

    def __init__(self):
        """Create the Hub API client and read the dataset id and token from settings."""
        self.api = HfApi()
        self.token = settings.huggingface_token
        self.dataset_id = settings.huggingface_dataset
        logger.info("Initialized HF uploader for dataset: %s", self.dataset_id)

    def _upload_json(self, payload: dict, path_in_repo: str, commit_message: str) -> str:
        """Serialize *payload* as indented JSON and commit it to the dataset repo.

        Args:
            payload: JSON-serializable data to upload.
            path_in_repo: Destination path of the file inside the dataset repo.
            commit_message: Message recorded for the single-file commit.

        Returns:
            Browser URL of the committed file on the ``_REVISION`` branch.

        Raises:
            TypeError: if *payload* contains values ``json.dumps`` cannot serialize.
            Any exception raised by ``HfApi.create_commit`` (auth, network, ...).
        """
        json_content = json.dumps(payload, indent=2)
        operation = CommitOperationAdd(
            path_in_repo=path_in_repo,
            path_or_fileobj=json_content.encode("utf-8"),
        )
        self.api.create_commit(
            repo_id=self.dataset_id,
            repo_type="dataset",
            operations=[operation],
            commit_message=commit_message,
            token=self.token,
            revision=self._REVISION,
        )
        # Percent-encode the in-repo path so identifiers containing spaces,
        # '#' or '?' still produce a valid link; '/' separators are kept.
        return (
            f"https://huggingface.co/datasets/{self.dataset_id}"
            f"/blob/{self._REVISION}/{quote(path_in_repo)}"
        )

    def upload_final_output(self, final_data: dict, run_id: str) -> str:
        """Upload the final output of a pipeline run to the dataset.

        The file is written to ``outputs/output_<run_id>_<YYYYmmdd_HHMMSS>.json``
        and wraps *final_data* together with the run id and an ISO-8601 timestamp.

        Args:
            final_data: The final processed data from the pipeline (must be
                JSON-serializable).
            run_id: Unique identifier for this pipeline run.

        Returns:
            URL of the uploaded file.

        Raises:
            Re-raises any serialization or Hub API error after logging it with
            its traceback.
        """
        try:
            # Take the clock once so the embedded timestamp and the timestamp in
            # the filename always describe the same instant.
            # NOTE(review): datetime.now() is naive local time — confirm that is
            # intended for persisted data rather than UTC.
            now = datetime.now()
            upload_data = {
                "run_id": run_id,
                "timestamp": now.isoformat(),
                "final_output": final_data,
            }
            filename = f"output_{run_id}_{now.strftime('%Y%m%d_%H%M%S')}.json"
            file_url = self._upload_json(
                upload_data,
                f"outputs/{filename}",
                f"Pipeline output: {run_id}",
            )
        except Exception as e:
            logger.exception("Error uploading to Hugging Face: %s", e)
            raise
        else:
            logger.info("Successfully uploaded to HF: %s", file_url)
            return file_url

    def upload_pipeline_metadata(self, metadata: dict) -> str:
        """Upload pipeline metadata to the dataset.

        The file is written to ``metadata/metadata_<YYYYmmdd_HHMMSS>.json``.

        Args:
            metadata: Pipeline metadata including all agent outputs (must be
                JSON-serializable).

        Returns:
            URL of the uploaded metadata file.

        Raises:
            Re-raises any serialization or Hub API error after logging it with
            its traceback.
        """
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            # NOTE(review): second-resolution name — two uploads within the same
            # second target the same path, and the later commit would replace the
            # earlier file. Consider adding %f or a UUID if that can happen.
            file_url = self._upload_json(
                metadata,
                f"metadata/metadata_{timestamp}.json",
                f"Pipeline metadata: {timestamp}",
            )
        except Exception as e:
            logger.exception("Error uploading metadata to Hugging Face: %s", e)
            raise
        else:
            logger.info("Successfully uploaded metadata to HF: %s", file_url)
            return file_url