""" Dataset Extractor for Janus. Streams datasets from HF Hub, extracts facts and patterns, converts to knowledge entries for the knowledge base. """ import json import logging import time from typing import Dict, List, Any, Optional logger = logging.getLogger(__name__) class DatasetExtractor: """ Extracts knowledge from HF datasets and converts to knowledge entries. Works with streaming datasets to avoid full downloads. """ def __init__(self): self.extraction_stats = { "total_datasets_processed": 0, "total_entries_extracted": 0, "last_extraction": None, } def extract_facts_from_samples(self, samples: List[Dict], topic: str) -> List[Dict]: """ Extract knowledge facts from dataset samples. Args: samples: List of dataset records topic: Topic for the extracted knowledge Returns: List of knowledge entry dicts """ if not samples: return [] facts = [] for sample in samples: # Extract text fields text_fields = self._extract_text_fields(sample) for field_name, text in text_fields.items(): if len(text) > 50: # Only meaningful content facts.append( { "text": text[:1000], "source": f"dataset:{topic}", "topic": topic, "field": field_name, "timestamp": time.time(), "confidence": 0.7, # Dataset quality varies } ) self.extraction_stats["total_entries_extracted"] += len(facts) self.extraction_stats["total_datasets_processed"] += 1 self.extraction_stats["last_extraction"] = time.time() logger.info(f"Extracted {len(facts)} facts from {topic} dataset") return facts def extract_instruction_pairs(self, samples: List[Dict]) -> List[Dict]: """ Extract instruction-tuning pairs from dataset samples. Args: samples: List of dataset records Returns: List of {"instruction", "input", "output"} dicts """ pairs = [] for sample in samples: pair = self._convert_to_instruction_pair(sample) if pair: pairs.append(pair) logger.info( f"Extracted {len(pairs)} instruction pairs from {len(samples)} samples" ) return pairs def _extract_text_fields(self, sample: Dict) -> Dict[str, str]: """Extract all text fields from a sample.""" text_fields = {} for key, value in sample.items(): if isinstance(value, str) and len(value) > 50: text_fields[key] = value elif isinstance(value, list): # Join list items if they're strings text_items = [str(item) for item in value if isinstance(item, str)] if text_items: combined = " ".join(text_items) if len(combined) > 50: text_fields[key] = combined return text_fields def _convert_to_instruction_pair(self, sample: Dict) -> Optional[Dict]: """ Convert a dataset sample to instruction-tuning format. Handles common dataset formats. """ # Common field patterns for instruction tuning instruction_fields = ["instruction", "prompt", "question", "query", "input"] output_fields = ["output", "response", "answer", "completion", "target"] instruction = None output = None # Find instruction for field in instruction_fields: if ( field in sample and isinstance(sample[field], str) and len(sample[field]) > 10 ): instruction = sample[field] break # Find output for field in output_fields: if ( field in sample and isinstance(sample[field], str) and len(sample[field]) > 10 ): output = sample[field] break # Handle nested formats (e.g., conversations) if instruction is None and "conversations" in sample: convos = sample["conversations"] if isinstance(convos, list) and len(convos) >= 2: # First message is instruction, second is output if isinstance(convos[0], dict): instruction = convos[0].get("value", "") if isinstance(convos[1], dict): output = convos[1].get("value", "") # Handle Q&A format if instruction is None and "question" in sample: instruction = sample["question"] if "answer" in sample: output = sample["answer"] if instruction and output: return { "instruction": instruction[:2000], "input": "", "output": output[:2000], "source": "dataset_extract", } return None def get_extraction_stats(self) -> Dict: """Get extraction statistics.""" return self.extraction_stats.copy() dataset_extractor = DatasetExtractor()