Spaces:
Runtime error
Runtime error
| """ | |
| LOGOS Playback Interpreter - Main Integration (SPCW Cake/Bake Protocol) | |
| State-based reconstruction engine with persistent canvas state | |
| """ | |
| import sys | |
| import os | |
| import logging | |
| # Optional UI dependencies (PyQt5). If missing, print guidance and exit gracefully. | |
| try: | |
| from PyQt5.QtWidgets import QApplication, QFileDialog, QPushButton, QHBoxLayout | |
| from PyQt5.QtCore import QTimer | |
| except ImportError as e: | |
| print("PyQt5 is required for main.py (legacy PyQt viewer).") | |
| print("Install with: pip install PyQt5") | |
| print("Alternatively, use the Tk launcher: python logos_launcher.py") | |
| sys.exit(1) | |
| from stream_interpreter import StreamInterpreter, ChunkType | |
| from playback_window import PlaybackWindow, StreamHarmonizer | |
| from display_interpreter import LogosDisplayInterpreter, Mode | |
| # Configure logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.FileHandler('logos_interpreter.log'), | |
| logging.StreamHandler(sys.stdout) | |
| ] | |
| ) | |
| logger = logging.getLogger('Main') | |
| def binary_stream_generator(file_path, chunk_size=512): | |
| """ | |
| Generator that yields 512-byte chunks from binary file | |
| Args: | |
| file_path: Path to binary file | |
| chunk_size: Size of each chunk (must be 512 for SPCW) | |
| Yields: | |
| bytes: 512-byte chunks | |
| """ | |
| if chunk_size != 512: | |
| raise ValueError("SPCW protocol requires 512-byte chunks") | |
| with open(file_path, 'rb') as f: | |
| while True: | |
| chunk = f.read(chunk_size) | |
| if not chunk: | |
| break | |
| # Pad incomplete chunks to 512 bytes | |
| if len(chunk) < chunk_size: | |
| chunk = chunk + b'\x00' * (chunk_size - len(chunk)) | |
| yield chunk | |
| def create_sample_binary_file(output_path, num_atoms=100): | |
| """ | |
| Create a sample binary file for testing | |
| Generates varied patterns including harmonized and non-harmonized heat codes | |
| Args: | |
| output_path: Path to output file | |
| num_atoms: Number of 512-byte atoms to generate | |
| """ | |
| import numpy as np | |
| from stream_interpreter import GLOBAL_SCALAR_PRIME | |
| logger.info(f"Creating sample binary file: {output_path} ({num_atoms} atoms)") | |
| with open(output_path, 'wb') as f: | |
| for i in range(num_atoms): | |
| # Create 512-byte atom | |
| atom = bytearray(512) | |
| # Set heat code (first 4 bytes) | |
| if i % 3 == 0: | |
| # Create harmonized heat code (residue == 0) | |
| # Find a value that mod GLOBAL_SCALAR_PRIME == 0 | |
| harmonized_value = (i * GLOBAL_SCALAR_PRIME) % (2**32) | |
| atom[0:4] = harmonized_value.to_bytes(4, byteorder='big') | |
| else: | |
| # Create non-harmonized heat code (residue != 0) | |
| non_harmonized_value = (i * 12345 + 1) % (2**32) | |
| atom[0:4] = non_harmonized_value.to_bytes(4, byteorder='big') | |
| # Fill wave payload (remaining 508 bytes) with patterns | |
| if i % 3 == 0: | |
| # META: Structured pattern (geometric) | |
| pattern = np.arange(508, dtype=np.uint8) | |
| atom[4:] = pattern.tobytes() | |
| else: | |
| # DELTA: Chaotic pattern (heat/noise) | |
| pattern = np.random.randint(0, 256, size=508, dtype=np.uint8) | |
| atom[4:] = pattern.tobytes() | |
| f.write(bytes(atom)) | |
| logger.info(f"Sample file created: {output_path}") | |
| def process_stream(file_path=None, mode=Mode.STREAMING): | |
| """Main stream processing function with state-based reconstruction""" | |
| # Initialize stream interpreter (for classification) | |
| stream_interpreter = StreamInterpreter( | |
| min_fidelity=256, | |
| max_fidelity=1024, | |
| global_scalar_prime=9973 | |
| ) | |
| # Initialize display interpreter (state reconstruction engine) | |
| display_interpreter = LogosDisplayInterpreter(mode=mode) | |
| harmonizer = StreamHarmonizer() | |
| # Setup PyQt application | |
| app = QApplication(sys.argv) | |
| # Create playback window with display interpreter reference | |
| window = PlaybackWindow( | |
| display_interpreter=display_interpreter, | |
| window_width=None, | |
| window_height=None | |
| ) | |
| window.show() | |
| # Get file path | |
| if file_path is None: | |
| # Check if sample file exists | |
| sample_path = 'sample_logos_stream.bin' | |
| if os.path.exists(sample_path): | |
| file_path = sample_path | |
| logger.info(f"Using existing sample file: {file_path}") | |
| else: | |
| # Create sample file | |
| create_sample_binary_file(sample_path, num_atoms=500) | |
| file_path = sample_path | |
| logger.info(f"Created sample file: {file_path}") | |
| if not file_path or not os.path.exists(file_path): | |
| logger.error("No valid file path provided") | |
| window.status_label.setText("Error: No file selected") | |
| return | |
| # Process stream chunks | |
| stream_gen = binary_stream_generator(file_path, chunk_size=512) | |
| chunk_count = 0 | |
| def process_next_chunk(): | |
| """Process next chunk from stream""" | |
| nonlocal chunk_count | |
| try: | |
| chunk = next(stream_gen) | |
| chunk_count += 1 | |
| # Step 1: Process through stream interpreter (classification) | |
| stream_result = stream_interpreter.process_chunk(chunk) | |
| # Step 2: Feed to display interpreter (state reconstruction) | |
| display_interpreter.process_atom( | |
| stream_result['atom_data'], | |
| stream_result['chunk_type'] | |
| ) | |
| # Step 3: Update viewport display | |
| window.update_display() | |
| # Step 4: Handle synchronization | |
| meta_markers = stream_interpreter.get_synchronization_markers() | |
| if meta_markers and stream_result['chunk_type'].value == "META": | |
| harmonizer.register_meta_marker(meta_markers[-1]) | |
| sync_result = harmonizer.synchronize_buffers( | |
| audio_chunk=chunk if stream_result['chunk_type'].value == "META" else None, | |
| video_chunk=chunk, | |
| data_chunk=chunk, | |
| meta_markers=meta_markers | |
| ) | |
| # Schedule next chunk processing | |
| # In streaming mode: 10 FPS (100ms delay) | |
| # In download mode: as fast as possible (1ms delay) | |
| delay = 100 if mode == Mode.STREAMING else 1 | |
| QTimer.singleShot(delay, process_next_chunk) | |
| except StopIteration: | |
| logger.info(f"Stream processing complete. Processed {chunk_count} atoms.") | |
| # In download mode, export full fidelity frame | |
| if mode == Mode.DOWNLOAD: | |
| try: | |
| full_frame = display_interpreter.get_full_fidelity_frame() | |
| export_path = file_path.replace('.bin', '_export.png') | |
| full_frame.save(export_path) | |
| logger.info(f"Full fidelity frame exported to: {export_path}") | |
| window.status_label.setText( | |
| f"Processing complete! Exported to {export_path}" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Export failed: {e}") | |
| window.status_label.setText(f"Stream processing complete ({chunk_count} atoms)") | |
| else: | |
| window.status_label.setText(f"Stream processing complete ({chunk_count} atoms)") | |
| # Start processing | |
| mode_str = "STREAMING" if mode == Mode.STREAMING else "DOWNLOAD" | |
| logger.info(f"Starting LOGOS Playback Interpreter - {mode_str} Mode") | |
| logger.info(f"Processing file: {file_path}") | |
| QTimer.singleShot(100, process_next_chunk) | |
| # Run application | |
| sys.exit(app.exec_()) | |
| if __name__ == "__main__": | |
| import sys | |
| file_path = sys.argv[1] if len(sys.argv) > 1 else None | |
| # Check for mode argument | |
| mode = Mode.STREAMING # Default | |
| if len(sys.argv) > 2: | |
| if sys.argv[2].lower() == 'download': | |
| mode = Mode.DOWNLOAD | |
| logger.info("Running in DOWNLOAD mode (full fidelity export)") | |
| else: | |
| logger.info("Running in STREAMING mode (real-time viewport)") | |
| process_stream(file_path, mode=mode) | |