import json
import logging

import datasets
from datasets import load_dataset, get_dataset_config_names, get_dataset_infos
from huggingface_hub import HfApi, DatasetCard, DatasetCardData

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DatasetCommandCenter:
    """ETL helper around the Hugging Face Hub.

    Inspect a source dataset, apply a column-projection "recipe"
    (dot-path mapping, list search, or Python expressions), and push the
    transformed dataset — plus an auto-generated dataset card — to a new
    Hub repository.
    """

    def __init__(self, token=None):
        # Hub auth token; None means anonymous / locally cached credentials.
        self.token = token

    def get_dataset_metadata(self, dataset_id):
        """Return config names and a best-effort license for *dataset_id*.

        Returns:
            {"status": "success", "configs": [...], "license_detected": str}
            on success, or {"status": "error", "message": str} on an
            unexpected failure.
        """
        try:
            try:
                configs = get_dataset_config_names(dataset_id, token=self.token)
            except Exception:
                # Some datasets expose no named configs; fall back to the
                # single pseudo-config the rest of this class understands.
                configs = ['default']

            license_name = "unknown"
            try:
                infos = get_dataset_infos(dataset_id, token=self.token)
                # Best effort: read the license off the first config's info.
                first = next(iter(infos.values()))
                license_name = first.license or "unknown"
            except Exception:
                pass  # license stays "unknown"

            return {
                "status": "success",
                "configs": configs,
                # Splits are resolved later, per config, via
                # get_splits_for_config(); only configs + license hint here.
                "license_detected": license_name,
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def get_splits_for_config(self, dataset_id, config_name):
        """Return the split names for one config, with a generic fallback."""
        try:
            infos = get_dataset_infos(dataset_id, config_name=config_name,
                                      token=self.token)
            splits = list(infos[config_name].splits.keys())
            return {"status": "success", "splits": splits}
        except Exception:
            # Keep the UI usable even when split info is unavailable.
            return {"status": "success", "splits": ['train', 'test', 'validation']}

    # --- HELPER: Recursive JSON/Dot Notation Getter ---
    def _get_value_by_path(self, obj, path):
        """Resolve dot-notation *path* inside *obj*.

        JSON-encoded strings encountered along the way are transparently
        parsed. Returns None as soon as any hop cannot be resolved; an
        empty path returns *obj* unchanged.
        """
        if not path:
            return obj
        current = obj
        for key in path.split('.'):
            # Auto-parse a JSON string if encountered mid-path.
            if isinstance(current, str):
                s = current.strip()
                if (s.startswith('{') and s.endswith('}')) or \
                        (s.startswith('[') and s.endswith(']')):
                    try:
                        current = json.loads(s)
                    except ValueError:
                        pass  # not valid JSON after all; lookup below fails
            if isinstance(current, dict) and key in current:
                current = current[key]
            else:
                return None
        return current

    # --- HELPER: List Search Logic ---
    def _extract_from_list_logic(self, row, source_col, filter_key,
                                 filter_val, target_path):
        """Logic: Look inside row[source_col] (which is a list).

        Find the first item where item[filter_key] == filter_val (compared
        as strings) and return that item resolved through *target_path*
        (dot notation). Returns None when the column is missing, is not a
        list, or no item matches.
        """
        # 1. Get the list (handling a JSON string if needed)
        data = row.get(source_col)
        if isinstance(data, str):
            try:
                data = json.loads(data)
            except ValueError:
                return None
        if not isinstance(data, list):
            return None

        # 2. Search the list.
        for item in data:
            # Guard non-dict items; compare as strings so 1 matches "1"
            # across JSON round-trips.
            if isinstance(item, dict) and \
                    str(item.get(filter_key, '')) == str(filter_val):
                # 3. Extract the target (supports nested JSON parsing via
                # dot notation, e.g. target_path = "content.analysis").
                return self._get_value_by_path(item, target_path)
        return None

    def _flatten_schema(self, obj, parent='', visited=None):
        """Recursively enumerate (dotted_key, type_name) pairs for *obj*.

        JSON-encoded strings are parsed first; lists are sampled by their
        first element only. *visited* (object ids) guards against
        self-referential structures.
        """
        if visited is None:
            visited = set()
        # Avoid infinite recursion on cyclic structures.
        if id(obj) in visited:
            return []
        visited.add(id(obj))

        # Handle JSON strings.
        if isinstance(obj, str):
            s = obj.strip()
            if (s.startswith('{') and s.endswith('}')) or \
                    (s.startswith('[') and s.endswith(']')):
                try:
                    obj = json.loads(s)
                except ValueError:
                    pass

        items = []
        if isinstance(obj, dict):
            for k, v in obj.items():
                full_key = f"{parent}.{k}" if parent else k
                items.append((full_key, type(v).__name__))
                items.extend(self._flatten_schema(v, full_key, visited))
        elif isinstance(obj, list) and obj:
            # For lists, peek at the first item only to guess the schema.
            full_key = f"{parent}[]" if parent else "[]"
            items.append((parent, "List"))  # mark the parent as a List
            items.extend(self._flatten_schema(obj[0], full_key, visited))
        return items

    def inspect_dataset(self, dataset_id, config, split):
        """Stream the first 10 rows of a split and infer a shallow schema.

        Returns {"status": "success", "samples": [...], "schema": {...},
        "dataset_id": str} or {"status": "error", "message": str}.
        """
        try:
            conf = config if config != 'default' else None
            ds_stream = load_dataset(dataset_id, name=conf, split=split,
                                     streaming=True, token=self.token)

            sample_rows = []
            schema_map = {}  # { "col_name": {"is_list": bool, "keys": set} }
            for i, row in enumerate(ds_stream):
                if i >= 10:
                    break

                # Clean sample for the UI: stringify anything that is not
                # JSON-serializable (images, tensors, ...).
                clean_row = {}
                for k, v in row.items():
                    if isinstance(v, (str, int, float, bool, list, dict,
                                      type(None))):
                        clean_row[k] = v
                    else:
                        clean_row[k] = str(v)
                sample_rows.append(clean_row)

                # Analyze schema.
                for k, v in row.items():
                    if k not in schema_map:
                        schema_map[k] = {"is_list": False, "keys": set()}
                    # A JSON-encoded string counts as its decoded structure.
                    val = v
                    if isinstance(val, str):
                        try:
                            val = json.loads(val)
                        except ValueError:
                            pass
                    if isinstance(val, list):
                        schema_map[k]["is_list"] = True
                        if val and isinstance(val[0], dict):
                            schema_map[k]["keys"].update(val[0].keys())
                    elif isinstance(val, dict):
                        schema_map[k]["keys"].update(val.keys())

            # Format schema for the UI (sets are not JSON-serializable).
            formatted_schema = {
                k: {
                    "type": "List" if info["is_list"] else "Object",
                    "keys": list(info["keys"]),
                }
                for k, info in schema_map.items()
            }

            return {
                "status": "success",
                "samples": sample_rows,
                "schema": formatted_schema,
                "dataset_id": dataset_id,
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def _apply_projection(self, row, recipe):
        """Build one output row from *row* according to recipe['columns'].

        Supported column types: 'simple' (dot-path mapping), 'list_search'
        (GET x WHERE y == z inside a list column) and 'python'
        (user-supplied expression).
        """
        new_row = {}
        for col_def in recipe['columns']:
            t_type = col_def.get('type', 'simple')
            if t_type == 'simple':
                # Standard dot notation.
                new_row[col_def['name']] = self._get_value_by_path(
                    row, col_def['source'])
            elif t_type == 'list_search':
                # GET x WHERE y == z
                new_row[col_def['name']] = self._extract_from_list_logic(
                    row,
                    col_def['source'],
                    col_def['filter_key'],
                    col_def['filter_val'],
                    col_def['target_key'],
                )
            elif t_type == 'python':
                # SECURITY: eval() executes arbitrary code from the recipe.
                # Only accept recipes from trusted users, or sandbox this
                # process.
                try:
                    context = row.copy()
                    # Inject the json module for user scripts.
                    context['json'] = json
                    new_row[col_def['name']] = eval(
                        col_def['expression'], {}, context)
                except Exception:
                    new_row[col_def['name']] = None
        return new_row

    def _passes_filter(self, row, filter_str):
        """Return True if *row* passes the user-supplied filter expression.

        An empty/None filter admits every row; an evaluation error rejects
        the row. SECURITY: eval() on the filter string — trusted recipes
        only.
        """
        if not filter_str:
            return True
        try:
            return eval(filter_str, {}, row.copy())
        except Exception:
            return False

    def _generate_card(self, source_id, target_id, recipe, license_name):
        """
        Generates a README.md with YAML metadata and a report of operations.
        """
        # 1. YAML metadata.
        # NOTE(review): `base_model` is a model-card field repurposed here to
        # link the source dataset — confirm it renders as intended on the Hub.
        card_data = DatasetCardData(
            language="en",
            license=license_name,
            tags=["dataset-command-center", "etl", "generated-dataset"],
            base_model=source_id,  # Linking source
        )

        # 2. Description & recipe table.
        content = f"""
# {target_id.split('/')[-1]}

This dataset is a transformation of [{source_id}](https://huggingface.co/datasets/{source_id}).
It was generated using the **Hugging Face Dataset Command Center**.

## Transformation Recipe

The following operations were applied to the source data:

| Target Column | Source | Type | Logic / Filter |
|---------------|--------|------|----------------|
"""
        for col in recipe['columns']:
            c_type = col.get('type', 'simple')
            c_name = col['name']
            c_src = col.get('source', '-')
            if c_type == 'simple':
                logic = "Direct Mapping"
            elif c_type == 'list_search':
                logic = f"Get `{col['target_key']}` where `{col['filter_key']} == {col['filter_val']}`"
            elif c_type == 'python':
                logic = f"`{col.get('expression')}`"
            else:
                logic = "-"
            content += f"| **{c_name}** | `{c_src}` | {c_type} | {logic} |\n"

        if recipe.get('filter_rule'):
            content += f"\n### Row Filtering\n**Filter Applied:** `{recipe['filter_rule']}`\n"

        content += f"\n## Original License\nThis dataset inherits the license: `{license_name}` from the source."

        card = DatasetCard.from_template(card_data, content=content)
        return card

    def process_and_push(self, source_id, config, split, target_id, recipe,
                         max_rows=None):
        """Stream the source split, apply *recipe*, and push to *target_id*.

        Also generates and pushes a dataset card; card failures are
        non-fatal (the data is already on the Hub by then).

        Returns {"status": "success", "rows_processed": int} or
        {"status": "error", "message": str}.
        """
        logger.info("Job started: %s", source_id)
        conf = config if config != 'default' else None

        def gen():
            ds_stream = load_dataset(source_id, name=conf, split=split,
                                     streaming=True, token=self.token)
            count = 0
            for row in ds_stream:
                if max_rows and count >= int(max_rows):
                    break
                if self._passes_filter(row, recipe.get('filter_rule')):
                    yield self._apply_projection(row, recipe)
                    # Only rows that pass the filter count toward max_rows.
                    count += 1

        try:
            new_dataset = datasets.Dataset.from_generator(gen)
            new_dataset.push_to_hub(target_id, token=self.token)

            # 2. GENERATE & PUSH CARD
            try:
                # BUGFIX: the license was previously read from an undefined
                # name (`new_license`), so the card push always failed.
                # Look the license up from the source dataset instead.
                meta = self.get_dataset_metadata(source_id)
                license_name = meta.get("license_detected") or "unknown"
                card = self._generate_card(source_id, target_id, recipe,
                                           license_name)
                card.push_to_hub(target_id, token=self.token)
            except Exception as e:
                logger.warning(f"Could not push dataset card: {e}")

            return {"status": "success", "rows_processed": len(new_dataset)}
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def preview_transform(self, dataset_id, config, split, recipe):
        """Apply *recipe* to the stream and return the first 5 output rows.

        Unlike process_and_push, errors propagate to the caller.
        """
        conf = config if config != 'default' else None
        ds_stream = load_dataset(dataset_id, name=conf, split=split,
                                 streaming=True, token=self.token)
        processed = []
        for row in ds_stream:
            if len(processed) >= 5:
                break
            if self._passes_filter(row, recipe.get('filter_rule')):
                processed.append(self._apply_projection(row, recipe))
        return processed