""" Data Logger Module for DReamMachine Handles data storage to both local JSON files and HuggingFace Datasets """ import os import json import logging from datetime import datetime from pathlib import Path from typing import Dict, List, Any, Optional import yaml from datasets import Dataset, DatasetDict, load_dataset from huggingface_hub import HfApi, create_repo logger = logging.getLogger(__name__) class DataLogger: """Manages logging of dream sessions to local files and HuggingFace Datasets""" def __init__(self, config_path: str = "config.yaml", hf_token: Optional[str] = None): """ Initialize Data Logger Args: config_path: Path to configuration file hf_token: HuggingFace API token """ # Load configuration with open(config_path, 'r') as f: self.config = yaml.safe_load(f) # Logging settings logging_config = self.config.get('logging', {}) self.output_format = logging_config.get('output_format', 'json') self.chunk_size = logging_config.get('chunk_size', 100) self.log_directory = Path(logging_config.get('log_directory', './logs')) self.save_to_hf = logging_config.get('save_to_hf_dataset', True) # HuggingFace settings hf_config = self.config.get('huggingface', {}) self.dataset_name = hf_config.get('dataset_name', 'dreammachine-logs') self.dataset_private = hf_config.get('dataset_private', True) self.hf_token = hf_token or os.getenv('HF_TOKEN') # Create log directory self.log_directory.mkdir(parents=True, exist_ok=True) # Session tracking self.current_session_data = [] self.session_count = 0 # Initialize HuggingFace API if self.save_to_hf and self.hf_token: self.hf_api = HfApi(token=self.hf_token) self.hf_username = self.hf_api.whoami()['name'] self.full_dataset_name = f"{self.hf_username}/{self.dataset_name}" else: self.hf_api = None self.full_dataset_name = None logger.info(f"DataLogger initialized. 


class DataLogger:
    """Manages logging of dream sessions to local files and HuggingFace Datasets."""

    def __init__(self, config_path: str = "config.yaml", hf_token: Optional[str] = None):
        """
        Initialize the data logger.

        Args:
            config_path: Path to configuration file
            hf_token: HuggingFace API token (falls back to the HF_TOKEN env var)
        """
        # Load configuration
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)

        # Logging settings
        logging_config = self.config.get('logging', {})
        self.output_format = logging_config.get('output_format', 'json')
        self.chunk_size = logging_config.get('chunk_size', 100)
        self.log_directory = Path(logging_config.get('log_directory', './logs'))
        self.save_to_hf = logging_config.get('save_to_hf_dataset', True)

        # HuggingFace settings
        hf_config = self.config.get('huggingface', {})
        self.dataset_name = hf_config.get('dataset_name', 'dreammachine-logs')
        self.dataset_private = hf_config.get('dataset_private', True)
        self.hf_token = hf_token or os.getenv('HF_TOKEN')

        # Create log directory
        self.log_directory.mkdir(parents=True, exist_ok=True)

        # Session tracking
        self.current_session_data = []
        self.session_count = 0

        # Initialize HuggingFace API
        if self.save_to_hf and self.hf_token:
            self.hf_api = HfApi(token=self.hf_token)
            self.hf_username = self.hf_api.whoami()['name']
            self.full_dataset_name = f"{self.hf_username}/{self.dataset_name}"
        else:
            self.hf_api = None
            self.full_dataset_name = None

        logger.info(f"DataLogger initialized. Logs will be saved to {self.log_directory}")

    def initialize_hf_dataset(self) -> bool:
        """
        Initialize or verify that the HuggingFace dataset exists.

        Returns:
            True if successful, False otherwise
        """
        if not self.save_to_hf or not self.hf_api:
            logger.warning("HuggingFace dataset saving is disabled")
            return False

        try:
            # Check whether the dataset already exists
            try:
                logger.info(f"Checking for existing dataset: {self.full_dataset_name}")
                load_dataset(self.full_dataset_name, token=self.hf_token)
                logger.info(f"Found existing dataset: {self.full_dataset_name}")
                return True
            except Exception:
                # Dataset doesn't exist, so create it
                logger.info(f"Creating new dataset: {self.full_dataset_name}")

                # Create an empty initial dataset with the expected schema
                initial_data = {
                    'session_id': [],
                    'timestamp': [],
                    'life_stage': [],
                    'dream_outputs': [],
                    'pitch_narrative': [],
                    'technical_components': [],
                    'feasibility_report': [],
                    'curator_scorecard': [],
                    'reforge_flag': []
                }
                dataset = Dataset.from_dict(initial_data)

                # Push to the Hub
                dataset.push_to_hub(
                    self.full_dataset_name,
                    private=self.dataset_private,
                    token=self.hf_token
                )
                logger.info(f"Successfully created dataset: {self.full_dataset_name}")
                return True
        except Exception as e:
            logger.error(f"Failed to initialize HuggingFace dataset: {str(e)}")
            return False

    def log_session_data(self, session_data: Dict[str, Any]) -> str:
        """
        Log a complete dream session.

        Args:
            session_data: Dictionary containing all session information

        Returns:
            Session ID
        """
        # Add timestamp and session ID
        session_id = f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{self.session_count}"
        session_data['session_id'] = session_id
        session_data['timestamp'] = datetime.now().isoformat()

        # Save to local JSON
        self._save_to_local_json(session_data)

        # Save to HuggingFace dataset
        if self.save_to_hf:
            self._save_to_hf_dataset(session_data)

        # Add to current session data
        self.current_session_data.append(session_data)
        self.session_count += 1

        # Flush a chunk file once enough sessions have accumulated
        if len(self.current_session_data) >= self.chunk_size:
            self._save_chunk()

        logger.info(f"Logged session: {session_id}")
        return session_id

    def _save_to_local_json(self, session_data: Dict[str, Any]) -> None:
        """Save session data to a local JSON file."""
        try:
            session_id = session_data.get('session_id', 'unknown')
            filename = self.log_directory / f"{session_id}.json"
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(session_data, f, indent=2, ensure_ascii=False)
            logger.debug(f"Saved session to {filename}")
        except Exception as e:
            logger.error(f"Failed to save to local JSON: {str(e)}")

    def _save_to_hf_dataset(self, session_data: Dict[str, Any]) -> None:
        """Append session data to the HuggingFace dataset.

        Note: this reloads and re-pushes the full dataset on every call,
        which is simple but costs O(dataset size) per session.
        """
        if not self.hf_api:
            return

        try:
            # Load the existing dataset
            dataset = load_dataset(self.full_dataset_name, split='train', token=self.hf_token)

            # Convert session data to the dataset's row format
            # (nested structures are serialized to JSON strings)
            new_row = {
                'session_id': [session_data.get('session_id', '')],
                'timestamp': [session_data.get('timestamp', '')],
                'life_stage': [session_data.get('life_stage', '')],
                'dream_outputs': [json.dumps(session_data.get('dream_outputs', []))],
                'pitch_narrative': [session_data.get('pitch_narrative', '')],
                'technical_components': [session_data.get('technical_components', '')],
                'feasibility_report': [session_data.get('feasibility_report', '')],
                'curator_scorecard': [json.dumps(session_data.get('curator_scorecard', {}))],
                'reforge_flag': [session_data.get('curator_scorecard', {}).get('reforge_flag', False)]
            }
            new_dataset = Dataset.from_dict(new_row)

            # Concatenate the existing dataset with the new row and push
            updated_dataset = concatenate_datasets([dataset, new_dataset])
            updated_dataset.push_to_hub(
                self.full_dataset_name,
                private=self.dataset_private,
                token=self.hf_token
            )
            logger.debug("Saved session to HuggingFace dataset")
        except Exception as e:
            logger.error(f"Failed to save to HuggingFace dataset: {str(e)}")

    def _save_chunk(self) -> None:
        """Save accumulated session data as a chunk file and reset the buffer."""
        if not self.current_session_data:
            return

        try:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            chunk_file = self.log_directory / f"chunk_{timestamp}.json"
            with open(chunk_file, 'w', encoding='utf-8') as f:
                json.dump(self.current_session_data, f, indent=2, ensure_ascii=False)
            logger.info(f"Saved chunk with {len(self.current_session_data)} sessions to {chunk_file}")
            self.current_session_data = []
        except Exception as e:
            logger.error(f"Failed to save chunk: {str(e)}")

    def retrieve_past_data(self, session_id: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve data from a past session.

        Args:
            session_id: ID of the session to retrieve

        Returns:
            Session data dictionary, or None if not found
        """
        # Try the local file first
        local_file = self.log_directory / f"{session_id}.json"
        if local_file.exists():
            try:
                with open(local_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                logger.info(f"Retrieved session {session_id} from local storage")
                return data
            except Exception as e:
                logger.error(f"Failed to load local session: {str(e)}")

        # Fall back to the HuggingFace dataset
        if self.save_to_hf and self.hf_api:
            try:
                dataset = load_dataset(self.full_dataset_name, split='train', token=self.hf_token)

                # Find the matching session
                for row in dataset:
                    if row['session_id'] == session_id:
                        logger.info(f"Retrieved session {session_id} from HuggingFace dataset")
                        return {
                            'session_id': row['session_id'],
                            'timestamp': row['timestamp'],
                            'life_stage': row['life_stage'],
                            'dream_outputs': json.loads(row['dream_outputs']),
                            'pitch_narrative': row['pitch_narrative'],
                            'technical_components': row['technical_components'],
                            'feasibility_report': row['feasibility_report'],
                            'curator_scorecard': json.loads(row['curator_scorecard'])
                        }
            except Exception as e:
                logger.error(f"Failed to retrieve from HuggingFace: {str(e)}")

        logger.warning(f"Session {session_id} not found")
        return None

    def get_all_sessions(self) -> List[Dict[str, Any]]:
        """
        Retrieve all logged sessions.

        Returns:
            List of all session data
        """
        sessions = []

        # Load from local JSON files
        for json_file in self.log_directory.glob("session_*.json"):
            try:
                with open(json_file, 'r', encoding='utf-8') as f:
                    sessions.append(json.load(f))
            except Exception as e:
                logger.error(f"Failed to load {json_file}: {str(e)}")

        logger.info(f"Retrieved {len(sessions)} sessions from local storage")
        return sessions

    def get_reforge_sessions(self) -> List[Dict[str, Any]]:
        """
        Get all sessions whose curator scorecard has reforge_flag set to True.

        Returns:
            List of sessions eligible for the next stage
        """
        all_sessions = self.get_all_sessions()
        reforge_sessions = [
            s for s in all_sessions
            if s.get('curator_scorecard', {}).get('reforge_flag', False)
        ]
        logger.info(f"Found {len(reforge_sessions)} reforge-eligible sessions")
        return reforge_sessions


# Convenience function
def create_logger(config_path: str = "config.yaml", hf_token: Optional[str] = None) -> DataLogger:
    """Create and return a configured DataLogger."""
    return DataLogger(config_path, hf_token)
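

# --- Usage sketch (illustrative, not part of the module API) ---
# A minimal end-to-end example, assuming a valid config.yaml in the working
# directory and, if HuggingFace syncing is enabled, an HF_TOKEN environment
# variable. The session payload fields below are hypothetical sample values.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    data_logger = create_logger("config.yaml")
    data_logger.initialize_hf_dataset()  # logs a warning if HF saving is disabled

    session_id = data_logger.log_session_data({
        'life_stage': 'infant',
        'dream_outputs': ['a floating garden of clock faces'],
        'pitch_narrative': 'A garden where time is tended like a plant.',
        'technical_components': 'scheduler, renderer',
        'feasibility_report': 'Feasible with existing components.',
        'curator_scorecard': {'novelty': 0.8, 'reforge_flag': True},
    })

    # Round-trip the session and count reforge-eligible ones
    print(data_logger.retrieve_past_data(session_id))
    print(f"{len(data_logger.get_reforge_sessions())} reforge-eligible sessions")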