""" Dataset Loader para MGC v5.0 Sistema automático para descobrir estrutura e carregar múltiplos datasets do Hugging Face """ import requests import json from typing import Dict, List, Optional, Any from datasets import load_dataset from collections import defaultdict class DatasetStructureDiscover: """Descobre automaticamente a estrutura de qualquer dataset do Hugging Face""" def __init__(self): self.api_base = "https://datasets-server.huggingface.co" def get_splits(self, dataset_name: str) -> Dict: """Obtém splits e configurações do dataset""" url = f"{self.api_base}/splits?dataset={dataset_name}" try: response = requests.get(url, timeout=30) response.raise_for_status() data = response.json() return data except Exception as e: return {"error": str(e), "splits": []} def get_first_rows(self, dataset_name: str, split: str = "train", config: str = None, limit: int = 5) -> Dict: """Obtém as primeiras linhas para entender a estrutura""" if config: url = f"{self.api_base}/first-rows?dataset={dataset_name}&config={config}&split={split}" else: url = f"{self.api_base}/first-rows?dataset={dataset_name}&split={split}" try: response = requests.get(url, timeout=30) response.raise_for_status() data = response.json() return data except Exception as e: return {"error": str(e)} def discover_schema(self, dataset_name: str) -> Dict: """ Descobre o schema completo do dataset: - Colunas disponíveis - Tipos de dados - Candidatos a texto - Candidatos a label """ print(f"\n🔍 Descobrindo estrutura do dataset: {dataset_name}") print("-" * 50) # 1. Obtém splits splits_info = self.get_splits(dataset_name) if "error" in splits_info: return {"error": splits_info["error"], "name": dataset_name} splits = splits_info.get("splits", []) if not splits: return {"error": "No splits found", "name": dataset_name} # 2. Usa o primeiro split disponível para inspecionar first_split = splits[0] config_name = first_split.get("config", "default") split_name = first_split.get("split", "train") # 3. Obtém primeiras linhas rows_info = self.get_first_rows(dataset_name, split_name, config_name, 10) if "error" in rows_info: return {"error": rows_info["error"], "name": dataset_name} # 4. Analisa as colunas - CORREÇÃO AQUI features = rows_info.get("features", {}) # Se features for uma lista, converte para dicionário if isinstance(features, list): features_dict = {} for feat in features: if isinstance(feat, dict): feat_name = feat.get("name", "unknown") features_dict[feat_name] = feat features = features_dict rows = rows_info.get("rows", []) # 5. Identifica colunas de texto e label text_candidates = [] label_candidates = [] if isinstance(features, dict): for col_name, col_info in features.items(): col_type = col_info.get("type", "unknown") if isinstance(col_info, dict) else "unknown" # Colunas de texto comuns if any(keyword in col_name.lower() for keyword in ["text", "sentence", "content", "body", "instruction", "input", "question", "premise", "hypothesis"]): text_candidates.append({"name": col_name, "type": col_type, "reason": "nome sugestivo"}) elif col_type in ["string", "text", "large_string"]: text_candidates.append({"name": col_name, "type": col_type, "reason": "tipo string"}) # Colunas de label comuns if any(keyword in col_name.lower() for keyword in ["label", "class", "category", "sentiment", "output", "response", "target"]): label_candidates.append({"name": col_name, "type": col_type, "reason": "nome sugestivo"}) # 6. Amostra dos dados sample_rows = [] for row in rows[:3]: row_data = row.get("row", {}) sample_rows.append(row_data) schema = { "name": dataset_name, "splits": [{"config": s.get("config"), "split": s.get("split")} for s in splits[:5]], "features": features, "text_candidates": text_candidates, "label_candidates": label_candidates, "sample_rows": sample_rows, "total_splits": len(splits) } print(f" ✅ {len(splits)} splits encontrados") print(f" 📝 Colunas: {list(features.keys()) if isinstance(features, dict) else 'N/A'}") print(f" 📖 Candidatos a texto: {[c['name'] for c in text_candidates]}") print(f" 🏷️ Candidatos a label: {[c['name'] for c in label_candidates]}") return schema class MGCDataLoader: """ Carrega múltiplos datasets do Hugging Face para a MGC Estratégia: 1 dataset base grande + datasets complementares """ def __init__(self): self.discover = DatasetStructureDiscover() self.datasets_info = {} self.loaded_data = [] def get_dataset_structure(self, dataset_name: str) -> Dict: """Interface pública para descobrir estrutura""" return self.discover.discover_schema(dataset_name) def load_dataset_with_auto_detection(self, dataset_name: str, limite: int = None) -> List[Dict]: """ Carrega dataset automaticamente detectando as colunas corretas """ print(f"\n📥 Carregando dataset: {dataset_name}") # Descobre estrutura schema = self.discover.discover_schema(dataset_name) if "error" in schema: print(f" ❌ Erro: {schema['error']}") return [] # Tenta carregar o dataset try: # Pega o primeiro split disponível if schema["splits"]: first_split = schema["splits"][0] config = first_split.get("config") split = first_split.get("split", "train") if config and config != "default": dataset = load_dataset(dataset_name, config, split=split, trust_remote_code=True) else: dataset = load_dataset(dataset_name, split=split, trust_remote_code=True) else: dataset = load_dataset(dataset_name, split="train", trust_remote_code=True) # Limita tamanho if limite and len(dataset) > limite: dataset = dataset.select(range(limite)) # Detecta colunas corretas text_col = None label_col = None column_names = dataset.column_names # Prioridade para colunas de texto for col in schema["text_candidates"]: if col["name"] in column_names: text_col = col["name"] break # Se não achou, pega primeira coluna string if not text_col: for col in column_names: try: if dataset[col][0] and isinstance(dataset[col][0], str): text_col = col break except: pass # Detecta coluna de label for col in schema["label_candidates"]: if col["name"] in column_names: label_col = col["name"] break # Se não achou label, tenta coluna 'label' if not label_col and 'label' in column_names: label_col = 'label' # Converte para formato MGC data_list = [] for item in dataset: texto = "" label = "" if text_col: texto = str(item[text_col])[:500] if label_col: label = str(item[label_col])[:100] else: # Se não tem label, usa o nome do dataset como categoria label = dataset_name.split("/")[-1] if texto and len(texto) > 10: data_list.append({ "entrada": texto, "saida": label, "fonte": dataset_name }) print(f" ✅ Carregado {len(data_list)} exemplos") print(f" 📝 Coluna de texto: {text_col}") print(f" 🏷️ Coluna de label: {label_col}") return data_list except Exception as e: print(f" ❌ Erro ao carregar: {str(e)}") return [] def carregar_dataset_base(self, limite: int = 5000) -> List[Dict]: """Carrega o dataset base (Canarim - 300k+ instruções em PT-BR)""" print("\n" + "="*60) print("🚀 CARREGANDO DATASET BASE (MILHÕES DE TOKENS)") print("="*60) return self.load_dataset_with_auto_detection( "dominguesm/Canarim-Instruct-PTBR-Dataset", limite=limite ) def carregar_datasets_complementares(self, limites: Dict[str, int] = None) -> List[Dict]: """Carrega datasets complementares pequenos""" print("\n" + "="*60) print("📚 CARREGANDO DATASETS COMPLEMENTARES") print("="*60) if limites is None: limites = { "assin2": 3000, "hatebr": 3000, "fake_br": 3000, "portuguese_hate_speech": 3000 } datasets_complementares = [ {"name": "assin2", "desc": "Similaridade Semântica"}, {"name": "hatebr", "desc": "Discurso de Ódio"}, {"name": "fake_br", "desc": "Fake News"}, {"name": "portuguese_hate_speech", "desc": "Discurso de Ódio (Tweets)"} ] all_data = [] for ds in datasets_complementares: nome = ds["name"] desc = ds["desc"] limite = limites.get(nome, 3000) print(f"\n📖 {desc} ({nome})") dados = self.load_dataset_with_auto_detection(nome, limite=limite) all_data.extend(dados) return all_data def carregar_dataset_ingles(self, limite: int = 5000) -> List[Dict]: """Carrega dataset em inglês como complemento (opcional)""" print("\n" + "="*60) print("🇬🇧 CARREGANDO DATASET EM INGLÊS") print("="*60) return self.load_dataset_with_auto_detection("agnews", limite=limite) def carregar_tudo(self, limite_base: int = 5000, carregar_complementares: bool = True, carregar_ingles: bool = False) -> List[Dict]: """Carrega todos os datasets configurados""" todos_dados = [] # 1. Dataset base (Canarim - milhões de tokens) dados_base = self.carregar_dataset_base(limite=limite_base) todos_dados.extend(dados_base) # 2. Datasets complementares em português if carregar_complementares: dados_comp = self.carregar_datasets_complementares() todos_dados.extend(dados_comp) # 3. Dataset em inglês (opcional) if carregar_ingles: dados_eng = self.carregar_dataset_ingles(limite=3000) todos_dados.extend(dados_eng) print("\n" + "="*60) print(f"✅ TOTAL CARREGADO: {len(todos_dados)} exemplos") print("="*60) # Estatísticas por fonte fontes = defaultdict(int) for item in todos_dados: fontes[item["fonte"]] += 1 print("\n📊 Distribuição por dataset:") for fonte, qtd in fontes.items(): print(f" - {fonte}: {qtd}") return todos_dados # ============================================================ # FUNÇÃO PARA INSPEÇÃO MANUAL DE DATASET # ============================================================ def inspecionar_dataset(nome_dataset: str) -> Dict: """Inspeciona a estrutura de qualquer dataset do Hugging Face""" discover = DatasetStructureDiscover() schema = discover.discover_schema(nome_dataset) print("\n" + "="*60) print(f"📋 RELATÓRIO DO DATASET: {nome_dataset}") print("="*60) if "error" in schema: print(f"❌ Erro: {schema['error']}") return schema print(f"\n📂 Splits disponíveis: {schema['total_splits']}") for s in schema['splits'][:5]: print(f" - Config: {s.get('config')}, Split: {s.get('split')}") print(f"\n📝 Colunas: {list(schema.get('features', {}).keys())}") print(f"\n📖 Candidatos para texto:") for c in schema.get('text_candidates', []): print(f" - {c['name']} (tipo: {c['type']}) - {c['reason']}") print(f"\n🏷️ Candidatos para label:") for c in schema.get('label_candidates', []): print(f" - {c['name']} (tipo: {c['type']}) - {c['reason']}") print(f"\n📄 Amostra dos dados:") for i, row in enumerate(schema.get('sample_rows', [])[:2]): print(f"\n Exemplo {i+1}:") for key, value in list(row.items())[:3]: valor_str = str(value)[:100] + "..." if len(str(value)) > 100 else str(value) print(f" {key}: {valor_str}") return schema # ============================================================ # TESTE # ============================================================ if __name__ == "__main__": print("="*60) print("🧠 INSPEÇÃO DE DATASETS PARA MGC v5.0") print("="*60) # Testar inspeção inspecionar_dataset("assin2")