| import json |
| import logging |
| import datasets |
| from datasets import load_dataset, get_dataset_config_names, get_dataset_infos |
| from huggingface_hub import HfApi |
|
|
| |
# Module-wide logging: INFO level so streaming-job progress is visible;
# logger is named after this module per stdlib convention.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
class DatasetCommandCenter:
    """Inspect Hugging Face datasets, transform them via a recipe, and push results.

    Intended workflow (mirrors the UI steps):
      1. ``get_dataset_metadata`` / ``get_splits_for_config`` — discover
         configs (subsets) and splits without downloading data.
      2. ``inspect_dataset`` — stream a few rows and detect JSON-string columns.
      3. ``preview_transform`` — dry-run a recipe on a handful of rows.
      4. ``process_and_push`` — stream the full split, apply the recipe, and
         push the resulting dataset to the Hub.

    A recipe is a dict with optional keys ``json_expansions``, ``renames``,
    ``drops`` and ``filters`` (see ``_apply_transformations`` / ``_passes_filter``).
    """

    def __init__(self, token=None):
        # Hugging Face access token forwarded to every hub call.
        # May be None for public datasets.
        self.token = token

    @staticmethod
    def _resolve_config(config):
        """Translate the UI sentinel 'default' to None for `load_dataset`."""
        return None if config == 'default' else config

    def get_dataset_metadata(self, dataset_id):
        """
        Step 1: Get available Configs (subsets) and Splits without downloading data.

        Returns ``{"status": "success", "configs": [...], "splits": [...]}``
        on success, or ``{"status": "error", "message": str}`` on failure.
        """
        try:
            try:
                configs = get_dataset_config_names(dataset_id, token=self.token)
            except Exception:
                # Some datasets expose no named configs; use a sentinel the
                # rest of the class understands (see _resolve_config).
                configs = ['default']

            # Splits are reported for the first config; the UI refreshes them
            # via get_splits_for_config when the user picks another config.
            selected_config = configs[0]

            try:
                infos = get_dataset_infos(dataset_id, token=self.token)
                if selected_config in infos:
                    splits = list(infos[selected_config].splits.keys())
                else:
                    # Config names from the two hub APIs can disagree; fall
                    # back to whichever info entry exists.
                    splits = list(next(iter(infos.values())).splits.keys())
            except Exception:
                # Metadata may be missing or malformed; offer the usual names.
                splits = ['train', 'test', 'validation']

            return {
                "status": "success",
                "configs": configs,
                "splits": list(splits),
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def get_splits_for_config(self, dataset_id, config_name):
        """
        Helper to update splits when user changes the Config dropdown.

        Best-effort by design: on any failure it still reports ``success``
        with the common default split names so the UI stays usable.
        """
        try:
            infos = get_dataset_infos(dataset_id, config_name=config_name, token=self.token)
            splits = list(infos[config_name].splits.keys())
            return {"status": "success", "splits": splits}
        except Exception as e:
            # Deliberate fallback, but log it so failures are not invisible.
            logger.warning("Could not fetch splits for %s/%s: %s", dataset_id, config_name, e)
            return {"status": "success", "splits": ['train', 'test', 'validation']}

    def inspect_dataset(self, dataset_id, config, split):
        """
        Step 2: Stream up to 5 actual rows and detect JSON-string columns.

        Returns ``{"status": "success", "samples": [...], "analysis": {...}}``
        where ``analysis`` maps column name -> ``{"type", "is_json_string"}``,
        or ``{"status": "error", "message": str}`` on failure / empty dataset.
        """
        try:
            ds_stream = load_dataset(
                dataset_id,
                name=self._resolve_config(config),
                split=split,
                streaming=True,
                token=self.token,
            )

            sample_rows = []
            for i, row in enumerate(ds_stream):
                if i >= 5:
                    break
                # Stringify values that would not survive JSON serialization
                # (e.g. images, timestamps) so samples can be shipped to a UI.
                clean_row = {
                    k: v if isinstance(v, (str, int, float, bool, list, dict, type(None))) else str(v)
                    for k, v in row.items()
                }
                sample_rows.append(clean_row)

            if not sample_rows:
                return {"status": "error", "message": "Dataset is empty."}

            # Column analysis is based on the first sampled row only.
            analysis = {}
            for k, sample_val in sample_rows[0].items():
                is_json_str = False
                if isinstance(sample_val, str):
                    # Cheap pre-check before paying for a full JSON parse.
                    s = sample_val.strip()
                    if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
                        try:
                            json.loads(s)
                            is_json_str = True
                        except json.JSONDecodeError:
                            pass
                analysis[k] = {
                    "type": type(sample_val).__name__,
                    "is_json_string": is_json_str,
                }

            return {
                "status": "success",
                "samples": sample_rows,
                "analysis": analysis,
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def _apply_transformations(self, row, recipe):
        """
        Apply Parsing, Renaming, Dropping to one row (filtering is separate).

        Recipe keys (all optional):
          json_expansions: ``[{"col": name, "keys": ["dotted.path", ...]}, ...]``
              — each key produces a new column ``{col}_{path with . -> _}``.
          renames: ``{old_name: new_name, ...}``
          drops:   ``[column_name, ...]``
        """
        new_row = row.copy()

        for item in recipe.get("json_expansions", []):
            col_name = item["col"]
            source_data = new_row.get(col_name)

            # Accept either an already-parsed object or a JSON string.
            parsed_obj = None
            if isinstance(source_data, dict):
                parsed_obj = source_data
            elif isinstance(source_data, str):
                try:
                    parsed_obj = json.loads(source_data)
                except json.JSONDecodeError:
                    pass

            # `is not None` (not truthiness): a row whose JSON is an empty
            # object must still emit every expansion column (as None),
            # otherwise rows carry different schemas and
            # Dataset.from_generator fails downstream.
            if parsed_obj is not None:
                for key in item["keys"]:
                    new_col_name = f"{col_name}_{key.replace('.', '_')}"
                    val = parsed_obj
                    try:
                        # Walk the dotted path, e.g. "a.b" -> obj["a"]["b"].
                        for part in key.split('.'):
                            val = val[part]
                        new_row[new_col_name] = val
                    except (KeyError, TypeError, IndexError):
                        # Missing/unreachable key: keep the column, value None,
                        # so the output schema stays consistent across rows.
                        new_row[new_col_name] = None

        for old, new in recipe.get("renames", {}).items():
            if old in new_row:
                new_row[new] = new_row.pop(old)

        for drop_col in recipe.get("drops", []):
            if drop_col in new_row:
                del new_row[drop_col]

        return new_row

    def _passes_filter(self, row, filters):
        """
        Return True iff the row satisfies every filter expression.

        SECURITY: filter strings are evaluated with ``eval`` against the
        row's columns. Recipes must come from trusted users only — a
        malicious expression can execute arbitrary code.
        """
        if not filters:
            return True
        context = row.copy()
        for f_str in filters:
            try:
                if not eval(f_str, {}, context):
                    return False
            except Exception:
                # An expression that errors (unknown column, type mismatch)
                # excludes the row instead of aborting the whole job.
                return False
        return True

    def process_and_push(self, source_id, config, split, target_id, recipe, max_rows=None):
        """
        Stream the source split, apply the recipe, and push the result to the Hub.

        max_rows: optional cap on *emitted* rows (int or numeric string);
        falsy values (None, 0, "") mean no limit. Returns
        ``{"status": "success", "rows_processed": int}``; re-raises on failure
        so the caller can surface the error.
        """
        logger.info("Starting job: %s (%s/%s) -> %s", source_id, config, split, target_id)

        conf = self._resolve_config(config)
        # Convert the cap once, outside the per-row loop.
        limit = int(max_rows) if max_rows else None

        def gen():
            ds_stream = load_dataset(source_id, name=conf, split=split, streaming=True, token=self.token)
            count = 0
            for row in ds_stream:
                if limit is not None and count >= limit:
                    break
                trans_row = self._apply_transformations(row, recipe)
                if self._passes_filter(trans_row, recipe.get("filters", [])):
                    yield trans_row
                    count += 1

        try:
            new_dataset = datasets.Dataset.from_generator(gen)
            new_dataset.push_to_hub(target_id, token=self.token)
            return {"status": "success", "rows_processed": len(new_dataset)}
        except Exception:
            # logger.exception records the traceback; bare `raise` preserves it.
            logger.exception("Job failed: %s -> %s", source_id, target_id)
            raise

    def preview_transform(self, dataset_id, config, split, recipe):
        """
        Return up to 5 transformed rows that pass the recipe's filters.

        NOTE: streams until 5 matching rows are found — a very selective
        filter on a large dataset may read many rows before returning.
        """
        ds_stream = load_dataset(
            dataset_id,
            name=self._resolve_config(config),
            split=split,
            streaming=True,
            token=self.token,
        )
        processed = []
        for row in ds_stream:
            if len(processed) >= 5:
                break
            trans_row = self._apply_transformations(row, recipe)
            if self._passes_filter(trans_row, recipe.get("filters", [])):
                processed.append(trans_row)
        return processed