| """ | |
| Data Logger Module for DReamMachine | |
| Handles data storage to both local JSON files and HuggingFace Datasets | |
| """ | |
| import os | |
| import json | |
| import logging | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Dict, List, Any, Optional | |
| import yaml | |
| from datasets import Dataset, DatasetDict, load_dataset | |
| from huggingface_hub import HfApi, create_repo | |
| logger = logging.getLogger(__name__) | |
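
# Example config.yaml structure (a sketch inferred from the keys read in
# DataLogger.__init__ below; the real file may carry additional settings):
#
#   logging:
#     output_format: json
#     chunk_size: 100
#     log_directory: ./logs
#     save_to_hf_dataset: true
#   huggingface:
#     dataset_name: dreammachine-logs
#     dataset_private: true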

class DataLogger:
    """Manages logging of dream sessions to local files and HuggingFace Datasets"""

    def __init__(self, config_path: str = "config.yaml", hf_token: Optional[str] = None):
        """
        Initialize Data Logger

        Args:
            config_path: Path to configuration file
            hf_token: HuggingFace API token
        """
        # Load configuration
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)

        # Logging settings
        logging_config = self.config.get('logging', {})
        self.output_format = logging_config.get('output_format', 'json')
        self.chunk_size = logging_config.get('chunk_size', 100)
        self.log_directory = Path(logging_config.get('log_directory', './logs'))
        self.save_to_hf = logging_config.get('save_to_hf_dataset', True)

        # HuggingFace settings
        hf_config = self.config.get('huggingface', {})
        self.dataset_name = hf_config.get('dataset_name', 'dreammachine-logs')
        self.dataset_private = hf_config.get('dataset_private', True)
        self.hf_token = hf_token or os.getenv('HF_TOKEN')

        # Create log directory
        self.log_directory.mkdir(parents=True, exist_ok=True)

        # Session tracking
        self.current_session_data = []
        self.session_count = 0

        # Initialize HuggingFace API
        if self.save_to_hf and self.hf_token:
            self.hf_api = HfApi(token=self.hf_token)
            self.hf_username = self.hf_api.whoami()['name']
            self.full_dataset_name = f"{self.hf_username}/{self.dataset_name}"
        else:
            self.hf_api = None
            self.full_dataset_name = None

        logger.info(f"DataLogger initialized. Logs will be saved to {self.log_directory}")

    def initialize_hf_dataset(self) -> bool:
        """
        Initialize or verify that the HuggingFace dataset exists

        Returns:
            True if successful, False otherwise
        """
        if not self.save_to_hf or not self.hf_api:
            logger.warning("HuggingFace dataset saving is disabled")
            return False

        try:
            # Check if the dataset already exists
            try:
                logger.info(f"Checking for existing dataset: {self.full_dataset_name}")
                load_dataset(self.full_dataset_name, token=self.hf_token)
                logger.info(f"Found existing dataset: {self.full_dataset_name}")
                return True
            except Exception:
                # Dataset doesn't exist, create it
                logger.info(f"Creating new dataset: {self.full_dataset_name}")

                # Create an empty initial dataset with the expected columns
                initial_data = {
                    'session_id': [],
                    'timestamp': [],
                    'life_stage': [],
                    'dream_outputs': [],
                    'pitch_narrative': [],
                    'technical_components': [],
                    'feasibility_report': [],
                    'curator_scorecard': [],
                    'reforge_flag': []
                }
                dataset = Dataset.from_dict(initial_data)

                # Push to the Hub
                dataset.push_to_hub(
                    self.full_dataset_name,
                    private=self.dataset_private,
                    token=self.hf_token
                )

                logger.info(f"Successfully created dataset: {self.full_dataset_name}")
                return True
        except Exception as e:
            logger.error(f"Failed to initialize HuggingFace dataset: {str(e)}")
            return False

    def log_session_data(self, session_data: Dict[str, Any]) -> str:
        """
        Log a complete dream session

        Args:
            session_data: Dictionary containing all session information

        Returns:
            Session ID
        """
        # Add timestamp and session ID
        session_id = f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{self.session_count}"
        session_data['session_id'] = session_id
        session_data['timestamp'] = datetime.now().isoformat()

        # Save to local JSON
        self._save_to_local_json(session_data)

        # Save to HuggingFace dataset
        if self.save_to_hf:
            self._save_to_hf_dataset(session_data)

        # Add to current session data
        self.current_session_data.append(session_data)
        self.session_count += 1

        # Write a chunk file once enough sessions have accumulated
        if len(self.current_session_data) >= self.chunk_size:
            self._save_chunk()

        logger.info(f"Logged session: {session_id}")
        return session_id
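
    # Illustrative session_data payload (an assumption based on the fields consumed
    # by _save_to_hf_dataset below; upstream modules may supply additional keys):
    #
    #   {
    #       "life_stage": "...",
    #       "dream_outputs": ["..."],
    #       "pitch_narrative": "...",
    #       "technical_components": "...",
    #       "feasibility_report": "...",
    #       "curator_scorecard": {"reforge_flag": False}
    #   }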

    def _save_to_local_json(self, session_data: Dict[str, Any]) -> None:
        """Save session data to a local JSON file"""
        try:
            session_id = session_data.get('session_id', 'unknown')
            filename = self.log_directory / f"{session_id}.json"

            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(session_data, f, indent=2, ensure_ascii=False)

            logger.debug(f"Saved session to {filename}")
        except Exception as e:
            logger.error(f"Failed to save to local JSON: {str(e)}")

    def _save_to_hf_dataset(self, session_data: Dict[str, Any]) -> None:
        """Append session data to the HuggingFace dataset"""
        if not self.hf_api:
            return

        try:
            # Load the existing dataset
            dataset = load_dataset(self.full_dataset_name, split='train', token=self.hf_token)

            # Convert session data to the dataset's row format
            new_row = {
                'session_id': [session_data.get('session_id', '')],
                'timestamp': [session_data.get('timestamp', '')],
                'life_stage': [session_data.get('life_stage', '')],
                'dream_outputs': [json.dumps(session_data.get('dream_outputs', []))],
                'pitch_narrative': [session_data.get('pitch_narrative', '')],
                'technical_components': [session_data.get('technical_components', '')],
                'feasibility_report': [session_data.get('feasibility_report', '')],
                'curator_scorecard': [json.dumps(session_data.get('curator_scorecard', {}))],
                'reforge_flag': [session_data.get('curator_scorecard', {}).get('reforge_flag', False)]
            }
            new_dataset = Dataset.from_dict(new_row)

            # Concatenate with the existing rows; if the remote dataset is still
            # empty its inferred column types may not match, so use the new row alone
            if dataset.num_rows == 0:
                updated_dataset = new_dataset
            else:
                updated_dataset = concatenate_datasets([dataset, new_dataset])

            # Push the updated dataset back to the Hub
            updated_dataset.push_to_hub(
                self.full_dataset_name,
                private=self.dataset_private,
                token=self.hf_token
            )

            logger.debug("Saved session to HuggingFace dataset")
        except Exception as e:
            logger.error(f"Failed to save to HuggingFace dataset: {str(e)}")

    def _save_chunk(self) -> None:
        """Save accumulated session data as a chunk file"""
        if not self.current_session_data:
            return

        try:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            chunk_file = self.log_directory / f"chunk_{timestamp}.json"

            with open(chunk_file, 'w', encoding='utf-8') as f:
                json.dump(self.current_session_data, f, indent=2, ensure_ascii=False)

            logger.info(f"Saved chunk with {len(self.current_session_data)} sessions to {chunk_file}")
            self.current_session_data = []
        except Exception as e:
            logger.error(f"Failed to save chunk: {str(e)}")

    def retrieve_past_data(self, session_id: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve data from a past session

        Args:
            session_id: ID of the session to retrieve

        Returns:
            Session data dictionary, or None if not found
        """
        # Try the local file first
        local_file = self.log_directory / f"{session_id}.json"
        if local_file.exists():
            try:
                with open(local_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                logger.info(f"Retrieved session {session_id} from local storage")
                return data
            except Exception as e:
                logger.error(f"Failed to load local session: {str(e)}")

        # Fall back to the HuggingFace dataset
        if self.save_to_hf and self.hf_api:
            try:
                dataset = load_dataset(self.full_dataset_name, split='train', token=self.hf_token)

                # Find the matching session
                for row in dataset:
                    if row['session_id'] == session_id:
                        logger.info(f"Retrieved session {session_id} from HuggingFace dataset")
                        return {
                            'session_id': row['session_id'],
                            'timestamp': row['timestamp'],
                            'life_stage': row['life_stage'],
                            'dream_outputs': json.loads(row['dream_outputs']),
                            'pitch_narrative': row['pitch_narrative'],
                            'technical_components': row['technical_components'],
                            'feasibility_report': row['feasibility_report'],
                            'curator_scorecard': json.loads(row['curator_scorecard'])
                        }
            except Exception as e:
                logger.error(f"Failed to retrieve from HuggingFace: {str(e)}")

        logger.warning(f"Session {session_id} not found")
        return None

    def get_all_sessions(self) -> List[Dict[str, Any]]:
        """
        Retrieve all logged sessions

        Returns:
            List of all session data
        """
        sessions = []

        # Load from local JSON files
        for json_file in self.log_directory.glob("session_*.json"):
            try:
                with open(json_file, 'r', encoding='utf-8') as f:
                    sessions.append(json.load(f))
            except Exception as e:
                logger.error(f"Failed to load {json_file}: {str(e)}")

        logger.info(f"Retrieved {len(sessions)} sessions from local storage")
        return sessions

    def get_reforge_sessions(self) -> List[Dict[str, Any]]:
        """
        Get all sessions that have reforge_flag = True

        Returns:
            List of sessions eligible for the next stage
        """
        all_sessions = self.get_all_sessions()
        reforge_sessions = [
            s for s in all_sessions
            if s.get('curator_scorecard', {}).get('reforge_flag', False)
        ]
        logger.info(f"Found {len(reforge_sessions)} reforge-eligible sessions")
        return reforge_sessions

# Convenience function
def create_logger(config_path: str = "config.yaml", hf_token: Optional[str] = None) -> DataLogger:
    """Create and return a configured DataLogger"""
    return DataLogger(config_path, hf_token)
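
# Minimal usage sketch (an assumption, not part of the original module): it expects
# a config.yaml next to this file and, optionally, an HF_TOKEN environment variable;
# the example field values below are placeholders.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    data_logger = create_logger()
    data_logger.initialize_hf_dataset()

    example_session = {
        'life_stage': 'example',
        'dream_outputs': ['example dream output'],
        'pitch_narrative': 'example pitch',
        'technical_components': 'example components',
        'feasibility_report': 'example report',
        'curator_scorecard': {'reforge_flag': False},
    }
    session_id = data_logger.log_session_data(example_session)
    print(data_logger.retrieve_past_data(session_id))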