from datasets import Dataset, load_dataset, concatenate_datasets
from huggingface_hub import HfApi, HfFolder
import logging
import os
from typing import Optional, Dict, List

from src.api.services.embedding_service import EmbeddingService
from src.api.exceptions import (
    DatasetNotFoundError,
    DatasetPushError,
    DatasetDeleteError,
)

# Set up structured logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


class HuggingFaceService:
    def __init__(self, hf_token: Optional[str] = None):
        """Initialize the HuggingFaceService with an optional token."""
        self.hf_api = HfApi()
        if hf_token:
            # Persist the token so later Hub calls are authenticated.
            # Note: HfFolder.save_token is deprecated in recent huggingface_hub
            # releases; huggingface_hub.login() is the preferred replacement.
            HfFolder.save_token(hf_token)

    async def push_to_hub(self, dataset: Dataset, dataset_name: str) -> None:
        """Push the dataset to the Hugging Face Hub."""
        try:
            logger.info(f"Creating Hugging Face Dataset: {dataset_name}...")
            # Dataset.push_to_hub is a blocking call, so it will hold up the
            # event loop for the duration of the upload.
            dataset.push_to_hub(dataset_name)
            logger.info(f"Dataset pushed to Hugging Face Hub: {dataset_name}")
        except Exception as e:
            logger.error(f"Failed to push dataset to Hugging Face Hub: {e}")
            raise DatasetPushError(f"Failed to push dataset: {e}")

    async def read_dataset(self, dataset_name: str) -> Optional[Dataset]:
        """Read a dataset from the Hugging Face Hub."""
        try:
            logger.info(f"Loading dataset from Hugging Face Hub: {dataset_name}...")
            dataset = load_dataset(
                dataset_name,
                keep_in_memory=True,
                download_mode="force_redownload",
            )
            # load_dataset returns a DatasetDict; hand back the "train" split.
            return dataset["train"]
        except Exception as e:
            logger.error(f"Failed to read dataset: {e}")
            raise DatasetNotFoundError(f"Dataset not found: {e}")

    async def update_dataset(
        self,
        dataset_name: str,
        updates: Dict[str, List],
        target_column: str,
        output_column: str = "embeddings",
    ) -> Optional[Dataset]:
        """Update a dataset on the Hugging Face Hub by generating embeddings
        for the new rows and concatenating them with the existing dataset."""
        try:
            # Step 1: Load the existing dataset from the Hugging Face Hub
            logger.info(
                f"Loading existing dataset from Hugging Face Hub: {dataset_name}..."
            )
            existing_dataset = await self.read_dataset(dataset_name)

            # Step 2: Convert the new updates into a Dataset
            logger.info("Converting updates to Dataset...")
            new_dataset = Dataset.from_dict(updates)

            # Step 3: Generate embeddings for the new data
            logger.info("Generating embeddings for the new data...")
            embedding_service = EmbeddingService(
                openai_api_key=os.getenv("OPENAI_API_KEY")
            )
            new_dataset = await embedding_service.create_embeddings(
                new_dataset, target_column, output_column
            )

            # Step 4: Concatenate the existing dataset with the new one
            logger.info("Concatenating existing dataset with new data...")
            updated_dataset = concatenate_datasets([existing_dataset, new_dataset])

            # Step 5: Push the updated dataset back to the Hugging Face Hub
            logger.info(
                f"Pushing updated dataset to Hugging Face Hub: {dataset_name}..."
            )
            await self.push_to_hub(updated_dataset, dataset_name)
            return updated_dataset
        except Exception as e:
            logger.error(f"Failed to update dataset: {e}")
            raise DatasetPushError(f"Failed to update dataset: {e}")

    async def delete_dataset(self, dataset_name: str) -> None:
        """Delete a dataset from the Hugging Face Hub."""
        try:
            logger.info(f"Deleting dataset from Hugging Face Hub: {dataset_name}...")
            self.hf_api.delete_repo(repo_id=dataset_name, repo_type="dataset")
            logger.info(f"Dataset deleted from Hugging Face Hub: {dataset_name}")
        except Exception as e:
            logger.error(f"Failed to delete dataset: {e}")
            raise DatasetDeleteError(f"Failed to delete dataset: {e}")

    async def delete_rows_from_dataset(
        self, dataset_name: str, key_column: str, keys_to_delete: List[str]
    ) -> Optional[Dataset]:
        """Load a dataset, drop every row whose value in key_column appears in
        keys_to_delete, and push the filtered dataset back to the Hub."""
        if not keys_to_delete:
            logger.info("No keys to delete; leaving dataset unchanged.")
            return None

        # Step 1: Load the existing dataset
        logger.info(f"Loading dataset {dataset_name} to delete rows.")
        dataset = await self.read_dataset(dataset_name)

        # Step 2: Filter the dataset to EXCLUDE rows whose key is in keys_to_delete
        logger.info(
            f"Filtering out rows where column {key_column} is in {keys_to_delete}"
        )
        initial_row_count = len(dataset)
        filtered_dataset = dataset.filter(
            lambda element: element[key_column] not in keys_to_delete
        )
        final_row_count = len(filtered_dataset)
        logger.info(f"{initial_row_count - final_row_count} rows deleted.")

        # Step 3: Push the filtered dataset back to the Hub
        await self.push_to_hub(filtered_dataset, dataset_name)
        return filtered_dataset
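

# Usage sketch (illustrative only): a minimal async driver showing how the
# service methods compose. The repo id "your-username/products", the column
# names, and the example rows below are hypothetical placeholders, and the
# sketch assumes HF_TOKEN and OPENAI_API_KEY are set in the environment.
if __name__ == "__main__":
    import asyncio

    async def main():
        service = HuggingFaceService(hf_token=os.getenv("HF_TOKEN"))

        # Append two rows, embedding the "description" column into "embeddings".
        await service.update_dataset(
            dataset_name="your-username/products",  # hypothetical repo id
            updates={
                "product_type": ["mug", "poster"],
                "description": ["Ceramic coffee mug", "A2 wall poster"],
            },
            target_column="description",
        )

        # Remove the rows that were just appended, keyed on "product_type".
        await service.delete_rows_from_dataset(
            dataset_name="your-username/products",
            key_column="product_type",
            keys_to_delete=["mug", "poster"],
        )

    asyncio.run(main())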