Spaces:
Runtime error
Runtime error
| """ | |
| dsp_bridge.py - SPCW Digital Signal Processing Bridge | |
| Unified pipeline: Source → Bake → Transmit → Reconstruct → Display | |
| PARALLEL WAVE ARCHITECTURE: | |
| - Each tile is a "Wave" with a designated endpoint | |
| - Waves transmit in parallel via ThreadPoolExecutor | |
| - Smaller waves = faster transmission (more parallelism) | |
| - Every wave has fractal address = endpoint for reconstruction | |
| This is the transmission backbone. No intermediate files. | |
| Direct memory-to-memory wave transport. | |
| """ | |
| import time | |
| import math | |
| import numpy as np | |
| import cv2 | |
| import struct | |
| import threading | |
| from queue import Queue | |
| from typing import Optional, Callable, Tuple, List, Dict | |
| from dataclasses import dataclass, field | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from .logos_core import ( | |
| calculate_heat_code, | |
| pack_atom, | |
| unpack_atom, | |
| prime_harmonizer, | |
| PAYLOAD_SIZE, | |
| ATOM_SIZE, | |
| META_SIZE, | |
| ) | |
| from .network import SHARED_NETWORK | |
| from .baker import BreadBaker | |
| from .fractal_engine import LogosFractalEngine | |
| # ============================================================ | |
| # ATOM STRUCTURE | |
| # ============================================================ | |
| METADATA_SIZE = 8 # [img_w:2B][img_h:2B][tile_row:1B][tile_col:1B][grid_rows:1B][grid_cols:1B] | |
| def encode_tile_metadata(width: int, height: int, tile_row: int, tile_col: int, | |
| grid_rows: int, grid_cols: int) -> bytes: | |
| """Encode tile metadata into first 8 bytes of payload""" | |
| return struct.pack('>HHBBBB', width, height, tile_row, tile_col, grid_rows, grid_cols) | |
| def decode_tile_metadata(payload: bytes) -> Tuple[int, int, int, int, int, int]: | |
| """Decode tile metadata from payload""" | |
| if len(payload) < 8: | |
| return (0, 0, 0, 0, 0, 0) | |
| return struct.unpack('>HHBBBB', payload[:8]) | |
| # ============================================================ | |
| # WAVE & TRANSMISSION STATS | |
| # ============================================================ | |
| class WaveStats: | |
| """Stats for a single wave (tile)""" | |
| wave_id: int = 0 | |
| tile_row: int = 0 | |
| tile_col: int = 0 | |
| atoms: int = 0 | |
| bytes: int = 0 | |
| tx_time_ms: float = 0.0 | |
| rx_time_ms: float = 0.0 | |
| endpoint: int = 0 # Heat code (fractal address) | |
| class TransmissionStats: | |
| """Real-time transmission statistics""" | |
| atoms_sent: int = 0 | |
| atoms_received: int = 0 | |
| bytes_transmitted: int = 0 | |
| elapsed_ms: float = 0.0 | |
| throughput_mbps: float = 0.0 | |
| ssim: float = 0.0 | |
| tiles_complete: int = 0 | |
| total_tiles: int = 0 | |
| waves: List[WaveStats] = field(default_factory=list) | |
| parallel_waves: int = 0 | |
| def progress(self) -> float: | |
| if self.total_tiles == 0: | |
| return 0.0 | |
| return self.tiles_complete / self.total_tiles | |
| def avg_wave_time_ms(self) -> float: | |
| if not self.waves: | |
| return 0.0 | |
| return sum(w.tx_time_ms for w in self.waves) / len(self.waves) | |
| # ============================================================ | |
| # DSP BRIDGE - PARALLEL WAVE ARCHITECTURE | |
| # ============================================================ | |
| class DSPBridge: | |
| """ | |
| Digital Signal Processing Bridge for SPCW Transport. | |
| AUTOMATIC WAVE ARCHITECTURE: | |
| - Image divided into 512x512 CHUNKS | |
| - Each chunk subdivided into 8x8 = 64 WAVES | |
| - Total waves = (chunks_x * 8) × (chunks_y * 8) | |
| - Example: 4096x4096 → 8x8 chunks → 64x64 = 4096 waves | |
| Pipeline: | |
| 1. Ingest source → 2. Auto-chunk → 3. Parallel encode | |
| 4. Parallel decode → 5. Verify SSIM → 6. Display | |
| """ | |
| WINDOW_NAME = "SPCW Live Transport" | |
| CHUNK_SIZE = 512 # Each chunk is 512x512 | |
| WAVES_PER_CHUNK = 8 # 8x8 = 64 waves per chunk | |
| def __init__(self, grid_size: Optional[int] = None, num_workers: int = 64, | |
| viewport_size: Tuple[int, int] = (1280, 720)): | |
| """ | |
| Initialize DSP Bridge. | |
| AES-256 Adaptive Grid Support. | |
| """ | |
| self.num_workers = num_workers | |
| self.viewport_size = viewport_size | |
| self.forced_grid_size = grid_size | |
| self.grid_size = grid_size if grid_size else 8 | |
| # Use Shared Network Instance (Optimization) | |
| self.network = SHARED_NETWORK | |
| self.baker = BreadBaker() | |
| # Wave buffers: wave_id -> list of atoms | |
| self.wave_buffers: Dict[int, List[bytes]] = {} | |
| # State | |
| self.source_image: Optional[np.ndarray] = None | |
| self.canvas: Optional[np.ndarray] = None | |
| self.canvas_width = 0 | |
| self.canvas_height = 0 | |
| self.tile_w = 0 | |
| self.tile_h = 0 | |
| # Stats | |
| self.stats = TransmissionStats() | |
| # Control | |
| self._stop_flag = False | |
| self._is_running = False | |
| # Callbacks | |
| self.on_stats_update: Optional[Callable[[TransmissionStats], None]] = None | |
| def _tile_to_quadtree_path(self, tile_row: int, tile_col: int) -> List[int]: | |
| """Convert tile position to quadtree path for heat code""" | |
| path = [] | |
| r_start, r_end = 0, self.grid_size | |
| c_start, c_end = 0, self.grid_size | |
| for _ in range(16): | |
| if r_end - r_start <= 1 and c_end - c_start <= 1: | |
| break | |
| r_mid = (r_start + r_end) // 2 | |
| c_mid = (c_start + c_end) // 2 | |
| in_bottom = tile_row >= r_mid if r_mid < r_end else False | |
| in_right = tile_col >= c_mid if c_mid < c_end else False | |
| quadrant = (int(in_bottom) << 1) | int(in_right) | |
| path.append(quadrant) | |
| if in_bottom: | |
| r_start = r_mid | |
| else: | |
| r_end = r_mid | |
| if in_right: | |
| c_start = c_mid | |
| else: | |
| c_end = c_mid | |
| return path | |
| def _encode_wave(self, wave_id: int, tile_row: int, tile_col: int, | |
| tile_data: np.ndarray) -> Tuple[int, List[bytes], WaveStats]: | |
| """ | |
| Encode a single wave (tile) into atoms. | |
| This is a PURE FUNCTION - can run in parallel. | |
| Returns: (wave_id, list of atoms, wave stats) | |
| """ | |
| start_time = time.perf_counter() | |
| # Compute fractal endpoint (heat code) from tile position | |
| path = self._tile_to_quadtree_path(tile_row, tile_col) | |
| heat_code = calculate_heat_code(path) | |
| # Validate against instantiated Prime Network | |
| # "Discrete wave transmission" aligned with topology | |
| is_manifold_aligned = self.network.validate_wave(heat_code) | |
| # Select domain based on topological alignment | |
| # Manifold aligned -> medium (standard) | |
| # Off-manifold -> small (constrained/filtered) | |
| domain_key = "medium" if is_manifold_aligned else "small" | |
| # Build metadata | |
| meta = encode_tile_metadata( | |
| self.canvas_width, self.canvas_height, | |
| tile_row, tile_col, | |
| self.grid_size, self.grid_size | |
| ) | |
| # Flatten tile pixels REMOVED - Using Fractal Compression | |
| # Encode with BreadBaker (Fractal Compression) | |
| # We pass empty prefix so atoms use LOCAL addressing (relative to tile) | |
| atom_defs = self.baker.bake(tile_data, prefix_path=[]) | |
| # Calculate payload capacity | |
| PIXEL_DATA_SIZE = PAYLOAD_SIZE - META_SIZE - METADATA_SIZE | |
| # Encode atoms | |
| atoms = [] | |
| chunk_idx = 0 | |
| for atom_def in atom_defs: | |
| local_path = atom_def['path_bits'] | |
| payload_data = atom_def['payload'] | |
| # Calculate LOCAL heat code for the atom header | |
| atom_heat_code = calculate_heat_code(local_path) | |
| # Build payload: metadata + compressed data | |
| # Note: We append tile metadata to EVERY atom currently, which is overhead | |
| # but required for stateless decoding if packets drop. | |
| # Optimization: Only first atom needs it? | |
| # But we stick to protocol: metadata first. | |
| payload = meta + payload_data | |
| # Pack atom | |
| atom = pack_atom(atom_heat_code, payload, domain_key=domain_key, gap_id=chunk_idx) | |
| atoms.append(atom) | |
| chunk_idx += 1 | |
| elapsed = (time.perf_counter() - start_time) * 1000 | |
| wave_stats = WaveStats( | |
| wave_id=wave_id, | |
| tile_row=tile_row, | |
| tile_col=tile_col, | |
| atoms=len(atoms), | |
| bytes=len(atoms) * ATOM_SIZE, | |
| tx_time_ms=elapsed, | |
| endpoint=heat_code | |
| ) | |
| return wave_id, atoms, wave_stats | |
| def _decode_wave(self, wave_id: int, atoms: List[bytes]) -> Tuple[int, int, int, np.ndarray, WaveStats]: | |
| """ | |
| Decode a wave from atoms back to tile pixels. | |
| This is a PURE FUNCTION - can run in parallel. | |
| VECTORIZED: Uses numpy reshape instead of per-pixel loop. | |
| Returns: (wave_id, tile_row, tile_col, tile_pixels, wave_stats) | |
| """ | |
| start_time = time.perf_counter() | |
| # Unpack first atom to get metadata | |
| if not atoms: | |
| # Handle empty atoms - return blank tile | |
| elapsed = (time.perf_counter() - start_time) * 1000 | |
| wave_stats = WaveStats( | |
| wave_id=wave_id, atoms=0, bytes=0, rx_time_ms=elapsed | |
| ) | |
| return wave_id, 0, 0, np.zeros((1, 1, 3), dtype=np.uint8), wave_stats | |
| first = unpack_atom(atoms[0]) | |
| heat_code, payload, _, _ = first | |
| img_w, img_h, tile_row, tile_col, grid_rows, grid_cols = decode_tile_metadata(payload) | |
| # Calculate tile dimensions | |
| tile_h = math.ceil(img_h / grid_rows) | |
| tile_w = math.ceil(img_w / grid_cols) | |
| y0 = tile_row * tile_h | |
| x0 = tile_col * tile_w | |
| actual_h = min(tile_h, img_h - y0) | |
| actual_w = min(tile_w, img_w - x0) | |
| # Initialize Fractal Engine for this tile (Local Scope) | |
| fractal_engine = LogosFractalEngine(min_bucket_size=1) # High res | |
| # Process atoms | |
| # Each atom contains local instruction for the quadrant | |
| for atom in atoms: | |
| hc, pl, _, _ = unpack_atom(atom) | |
| # Skip metadata bytes [0:8] to get Baker payload | |
| # Baker payload: [Control/Data...] | |
| baker_payload = pl[METADATA_SIZE:] | |
| # Helper: hex string of heat code | |
| hex_str = f"{hc:08x}" | |
| fractal_engine.process_atom(hex_str, baker_payload) | |
| # Draw Tile Result | |
| tile = fractal_engine.draw_viewport((actual_w, actual_h)) | |
| elapsed = (time.perf_counter() - start_time) * 1000 | |
| wave_stats = WaveStats( | |
| wave_id=wave_id, | |
| tile_row=tile_row, | |
| tile_col=tile_col, | |
| atoms=len(atoms), | |
| bytes=len(atoms) * ATOM_SIZE, | |
| rx_time_ms=elapsed, | |
| endpoint=heat_code | |
| ) | |
| return wave_id, tile_row, tile_col, tile, wave_stats | |
| def _parallel_encode(self) -> Dict[int, List[bytes]]: | |
| """ | |
| Parallel wave encoding - all waves encode simultaneously. | |
| Returns: dict mapping wave_id -> list of atoms | |
| """ | |
| h, w = self.source_image.shape[:2] | |
| self.canvas_width = w | |
| self.canvas_height = h | |
| self.tile_h = math.ceil(h / self.grid_size) | |
| self.tile_w = math.ceil(w / self.grid_size) | |
| total_waves = self.grid_size * self.grid_size | |
| self.stats.total_tiles = total_waves | |
| self.stats.parallel_waves = min(self.num_workers, total_waves) | |
| # Prepare wave tasks | |
| tasks = [] | |
| wave_id = 0 | |
| for tr in range(self.grid_size): | |
| for tc in range(self.grid_size): | |
| # Extract tile region | |
| y0 = tr * self.tile_h | |
| y1 = min(h, y0 + self.tile_h) | |
| x0 = tc * self.tile_w | |
| x1 = min(w, x0 + self.tile_w) | |
| tile = self.source_image[y0:y1, x0:x1, :].copy() | |
| tasks.append((wave_id, tr, tc, tile)) | |
| wave_id += 1 | |
| # Parallel encode all waves | |
| wave_atoms: Dict[int, List[bytes]] = {} | |
| with ThreadPoolExecutor(max_workers=self.num_workers) as executor: | |
| futures = { | |
| executor.submit(self._encode_wave, wid, tr, tc, tile): wid | |
| for wid, tr, tc, tile in tasks | |
| } | |
| for future in as_completed(futures): | |
| wid, atoms, wave_stat = future.result() | |
| wave_atoms[wid] = atoms | |
| self.stats.atoms_sent += len(atoms) | |
| self.stats.bytes_transmitted += len(atoms) * ATOM_SIZE | |
| self.stats.waves.append(wave_stat) | |
| return wave_atoms | |
| def _parallel_decode(self, wave_atoms: Dict[int, List[bytes]]): | |
| """ | |
| Parallel wave decoding - all waves decode simultaneously. | |
| """ | |
| # Initialize canvas | |
| self.canvas = np.zeros((self.canvas_height, self.canvas_width, 3), dtype=np.uint8) | |
| # Parallel decode all waves | |
| with ThreadPoolExecutor(max_workers=self.num_workers) as executor: | |
| futures = { | |
| executor.submit(self._decode_wave, wid, atoms): wid | |
| for wid, atoms in wave_atoms.items() | |
| } | |
| for future in as_completed(futures): | |
| wid, tile_row, tile_col, tile, wave_stat = future.result() | |
| # Place tile at designated endpoint | |
| y0 = tile_row * self.tile_h | |
| x0 = tile_col * self.tile_w | |
| h, w = tile.shape[:2] | |
| self.canvas[y0:y0+h, x0:x0+w] = tile | |
| self.stats.atoms_received += wave_stat.atoms | |
| self.stats.tiles_complete += 1 | |
| def _calculate_ssim(self) -> float: | |
| """Calculate SSIM between source and reconstructed""" | |
| if self.source_image is None or self.canvas is None: | |
| return 0.0 | |
| if self.source_image.shape != self.canvas.shape: | |
| return 0.0 | |
| # Exact match check | |
| if np.array_equal(self.source_image, self.canvas): | |
| return 1.0 | |
| # MSE-based approximation | |
| gray_src = cv2.cvtColor(self.source_image, cv2.COLOR_RGB2GRAY) | |
| gray_dst = cv2.cvtColor(self.canvas, cv2.COLOR_RGB2GRAY) | |
| mse = np.mean((gray_src.astype(float) - gray_dst.astype(float)) ** 2) | |
| if mse == 0: | |
| return 1.0 | |
| psnr = 10 * np.log10(255.0 ** 2 / mse) | |
| return min(1.0, psnr / 60.0) | |
| def _calculate_grid(self, width: int, height: int) -> int: | |
| """ | |
| Auto-calculate optimal grid size based on image dimensions. | |
| Strategy: | |
| - Divide image into 512x512 chunks | |
| - Each chunk gets 8x8 = 64 waves | |
| - Minimum 8x8 grid, scales up for larger images | |
| """ | |
| chunks_x = max(1, math.ceil(width / self.CHUNK_SIZE)) | |
| chunks_y = max(1, math.ceil(height / self.CHUNK_SIZE)) | |
| # Grid = chunks × waves_per_chunk_side | |
| grid_x = chunks_x * self.WAVES_PER_CHUNK | |
| grid_y = chunks_y * self.WAVES_PER_CHUNK | |
| # Use the larger dimension for square grid | |
| grid_size = max(grid_x, grid_y) | |
| # Cap at reasonable maximum for memory | |
| grid_size = min(grid_size, 256) | |
| return grid_size | |
| def transmit(self, source_path: str, show_window: bool = True) -> TransmissionStats: | |
| """ | |
| Main transmission method using AUTO WAVE ARCHITECTURE. | |
| Grid size is automatically determined: | |
| - 512x512 chunks with 64 waves each | |
| - Scales with image size | |
| Args: | |
| source_path: Path to source image | |
| show_window: Show display window (clean, no stats overlay) | |
| Returns: | |
| TransmissionStats with final metrics | |
| """ | |
| self._stop_flag = False | |
| self._is_running = True | |
| self.stats = TransmissionStats() # Reset stats | |
| # Load source | |
| img = cv2.imread(source_path) | |
| if img is None: | |
| raise ValueError(f"Could not load: {source_path}") | |
| self.source_image = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) | |
| h, w = self.source_image.shape[:2] | |
| # AUTO-CALCULATE grid based on image size (unless forced) | |
| if self.forced_grid_size: | |
| self.grid_size = self.forced_grid_size | |
| else: | |
| self.grid_size = self._calculate_grid(w, h) | |
| total_waves = self.grid_size * self.grid_size | |
| chunks_x = math.ceil(w / self.CHUNK_SIZE) | |
| chunks_y = math.ceil(h / self.CHUNK_SIZE) | |
| print(f"[DSP] Source: {w}x{h}") | |
| print(f"[DSP] Chunks: {chunks_x}x{chunks_y} ({chunks_x * chunks_y} @ 512x512)") | |
| print(f"[DSP] Waves: {self.grid_size}x{self.grid_size} = {total_waves}") | |
| print(f"[DSP] Workers: {self.num_workers} parallel") | |
| start_time = time.perf_counter() | |
| # PHASE 1: Parallel encode all waves | |
| encode_start = time.perf_counter() | |
| wave_atoms = self._parallel_encode() | |
| encode_time = (time.perf_counter() - encode_start) * 1000 | |
| # PHASE 2: Parallel decode all waves | |
| decode_start = time.perf_counter() | |
| self._parallel_decode(wave_atoms) | |
| decode_time = (time.perf_counter() - decode_start) * 1000 | |
| # Calculate final stats | |
| elapsed = time.perf_counter() - start_time | |
| self.stats.elapsed_ms = elapsed * 1000 | |
| self.stats.throughput_mbps = (self.stats.bytes_transmitted / (1024 * 1024)) / elapsed if elapsed > 0 else 0 | |
| self.stats.ssim = self._calculate_ssim() | |
| print(f"[DSP] Encode: {encode_time:.1f}ms | Decode: {decode_time:.1f}ms") | |
| print(f"[DSP] Transmitted: {self.stats.atoms_sent} atoms ({total_waves} waves)") | |
| print(f"[DSP] Time: {self.stats.elapsed_ms:.1f}ms") | |
| print(f"[DSP] Throughput: {self.stats.throughput_mbps:.2f} MB/s") | |
| print(f"[DSP] Avg wave: {self.stats.avg_wave_time_ms:.2f}ms") | |
| print(f"[DSP] SSIM: {self.stats.ssim:.6f} {'(PERFECT)' if self.stats.ssim == 1.0 else ''}") | |
| self._is_running = False | |
| # Display | |
| if show_window and self.canvas is not None: | |
| self._show_window() | |
| return self.stats | |
| def _show_window(self): | |
| """Display clean result window - no stats overlay (stats go to launcher)""" | |
| try: | |
| cv2.namedWindow(self.WINDOW_NAME, cv2.WINDOW_NORMAL) | |
| cv2.resizeWindow(self.WINDOW_NAME, self.viewport_size[0], self.viewport_size[1]) | |
| except Exception as e: | |
| print(f"[WARN] Could not create window: {e}") | |
| return | |
| print(f"\n[CONTROLS] S: Side-by-side | Q: Quit") | |
| show_comparison = False | |
| try: | |
| while not self._stop_flag: | |
| if show_comparison and self.source_image is not None: | |
| # Side-by-side: Original | Reconstructed | |
| h = max(self.source_image.shape[0], self.canvas.shape[0]) | |
| w = self.source_image.shape[1] + self.canvas.shape[1] + 4 | |
| frame = np.zeros((h, w, 3), dtype=np.uint8) | |
| frame[:self.source_image.shape[0], :self.source_image.shape[1]] = self.source_image | |
| frame[:self.canvas.shape[0], self.source_image.shape[1]+4:] = self.canvas | |
| # Scale to fit viewport | |
| scale = min(self.viewport_size[0] / w, self.viewport_size[1] / h) | |
| frame = cv2.resize(frame, (int(w * scale), int(h * scale)), | |
| interpolation=cv2.INTER_NEAREST) | |
| else: | |
| # Just reconstructed - CLEAN, no overlay | |
| scale = min(self.viewport_size[0] / self.canvas_width, | |
| self.viewport_size[1] / self.canvas_height) | |
| frame = cv2.resize(self.canvas, | |
| (int(self.canvas_width * scale), int(self.canvas_height * scale)), | |
| interpolation=cv2.INTER_NEAREST) | |
| # Convert and display - NO TEXT OVERLAY | |
| display = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) | |
| cv2.imshow(self.WINDOW_NAME, display) | |
| key = cv2.waitKey(30) | |
| # Check if window was closed | |
| try: | |
| if cv2.getWindowProperty(self.WINDOW_NAME, cv2.WND_PROP_VISIBLE) < 1: | |
| break | |
| except: | |
| break | |
| if key == ord('q') or key == 27: | |
| break | |
| elif key == ord('s'): | |
| show_comparison = not show_comparison | |
| finally: | |
| try: | |
| cv2.destroyWindow(self.WINDOW_NAME) | |
| cv2.waitKey(1) | |
| except: | |
| pass | |
| def stop(self): | |
| """Stop transmission""" | |
| self._stop_flag = True | |
| def get_canvas(self) -> Optional[np.ndarray]: | |
| """Get reconstructed image""" | |
| return self.canvas | |
| def save_output(self, path: str): | |
| """Save reconstructed image""" | |
| if self.canvas is not None: | |
| cv2.imwrite(path, cv2.cvtColor(self.canvas, cv2.COLOR_RGB2BGR)) | |
| print(f"[DSP] Saved: {path}") | |
| # ============================================================ | |
| # CLI | |
| # ============================================================ | |
| def main(): | |
| import argparse | |
| parser = argparse.ArgumentParser( | |
| description="SPCW DSP Bridge - Unified Transmission Pipeline", | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| epilog=""" | |
| Examples: | |
| python dsp_bridge.py image.png # Transmit and display | |
| python dsp_bridge.py image.png -o output.png # Save reconstruction | |
| python dsp_bridge.py image.png --grid 16 # 16x16 tile grid | |
| Controls: S: Side-by-side comparison | Q: Quit | |
| """ | |
| ) | |
| parser.add_argument("source", help="Source image path") | |
| parser.add_argument("-o", "--output", help="Save reconstructed image") | |
| parser.add_argument("--grid", type=int, default=8, help="Grid size (default: 8)") | |
| parser.add_argument("--workers", type=int, default=8, help="Parallel workers (default: 8)") | |
| parser.add_argument("--viewport", nargs=2, type=int, default=[1280, 720], | |
| metavar=("W", "H"), help="Viewport size") | |
| parser.add_argument("--no-display", action="store_true", help="No display window") | |
| args = parser.parse_args() | |
| bridge = DSPBridge( | |
| grid_size=args.grid, | |
| num_workers=args.workers, | |
| viewport_size=tuple(args.viewport) | |
| ) | |
| try: | |
| stats = bridge.transmit(args.source, show_window=not args.no_display) | |
| if args.output: | |
| bridge.save_output(args.output) | |
| # Exit code based on SSIM | |
| if stats.ssim < 1.0: | |
| print(f"[WARN] Lossy transmission: SSIM={stats.ssim:.6f}") | |
| except Exception as e: | |
| print(f"[ERROR] {e}") | |
| raise | |
| if __name__ == "__main__": | |
| main() | |