import bz2
import struct
from typing import Dict, List, Tuple

import bson
import gradio as gr


# ==================== HELPERS ====================
# NOTE(review): multi-byte fields are decoded as little-endian — confirm
# against the save format specification.

def _u8(buf, off):
    """Return the unsigned byte stored at ``buf[off]``."""
    return buf[off]


def _i8(buf, off):
    """Return the signed byte stored at ``buf[off]``."""
    return struct.unpack_from('<b', buf, off)[0]


def _u16(buf, off):
    """Return the unsigned 16-bit integer starting at ``buf[off]``."""
    return struct.unpack_from('<H', buf, off)[0]


def _u32(buf, off):
    """Return the unsigned 32-bit integer starting at ``buf[off]``."""
    return struct.unpack_from('<I', buf, off)[0]


def _f32(buf, off):
    """Return the 32-bit float starting at ``buf[off]``."""
    return struct.unpack_from('<f', buf, off)[0]


# ==================== READING partsPos ====================

def read_parts_pos(partsPos: bytes, bx: int, by: int) -> List[int]:
    """Decode the per-cell particle counts.

    Every cell contributes three bytes that are combined most significant
    byte first into one integer.

    Args:
        partsPos: Raw count table, exactly ``bx * by * 3`` bytes long.
        bx: Number of cells per row.
        by: Number of rows.

    Returns:
        One count per cell, in the same order as the input.

    Raises:
        ValueError: If ``partsPos`` does not have the expected length.
    """
    expected = bx * by * 3
    if len(partsPos) != expected:
        raise ValueError('partsPos length mismatch')
    return [
        (partsPos[i] << 16) | (partsPos[i + 1] << 8) | partsPos[i + 2]
        for i in range(0, expected, 3)
    ]


# ==================== READING parts ====================

class Particle:
    """One decoded particle record; every field starts at zero."""

    __slots__ = ('type', 'x', 'y', 'temp', 'life', 'tmp', 'ctype', 'dcolour',
                 'vx', 'vy', 'tmp2', 'tmp3', 'tmp4')

    def __init__(self):
        self.type: int = 0
        self.x: float = 0.0
        self.y: float = 0.0
        self.temp: float = 0.0
        self.life: int = 0
        self.tmp: int = 0
        self.ctype: int = 0
        self.dcolour: int = 0
        self.vx: float = 0.0
        self.vy: float = 0.0
        self.tmp2: int = 0
        self.tmp3: int = 0
        self.tmp4: int = 0

    def __repr__(self):
        return f'Particle(type={self.type} @{self.x:.1f},{self.y:.1f})'


def _read_particle(partsData: bytes, off: int) -> Tuple[Particle, int]:
    """Decode one particle record starting at ``off``.

    The record begins with a type byte and a 16-bit field descriptor whose
    bits select which optional fields follow. The position is not stored
    in the record, so ``x``/``y`` are left at their defaults.

    Returns:
        The particle and the offset of the first byte after the record.

    Raises:
        ValueError: If fewer than four bytes remain at ``off``.
        IndexError, struct.error: If an optional field runs past the end
            of ``partsData`` (translated to ``ValueError`` by the caller).
    """
    data_len = len(partsData)
    # Smallest record: type byte + two descriptor bytes + one temperature
    # byte, so at least four bytes must remain.
    if off >= data_len - 3:
        raise ValueError('truncated partsData')

    ty = _u8(partsData, off)
    fd = _u16(partsData, off + 1)
    off += 3

    # Bit 15: a third descriptor byte follows and supplies bits 16-23.
    if fd & 0x8000:
        if off >= data_len:
            raise ValueError('no fd3 byte')
        fd |= _u8(partsData, off) << 16
        off += 1

    # Bit 14: the type has a second (high) byte.
    if fd & (1 << 14):
        ty |= _u8(partsData, off) << 8
        off += 1

    p = Particle()
    p.type = ty

    # Bit 0: temperature is a full 16-bit value, otherwise a signed
    # one-byte offset from 294.15.
    if fd & 1:
        p.temp = float(_u16(partsData, off))
        off += 2
    else:
        p.temp = 294.15 + _i8(partsData, off)
        off += 1

    # Bits 1/2: life, low byte and optional high byte.
    if fd & (1 << 1):
        life = _u8(partsData, off)
        off += 1
        if fd & (1 << 2):
            life |= _u8(partsData, off) << 8
            off += 1
        p.life = life

    # Bits 3/4/12: tmp, one byte, optional second byte, then optional
    # bytes 4 and 3 (stored in that order).
    if fd & (1 << 3):
        tmp = _u8(partsData, off)
        off += 1
        if fd & (1 << 4):
            tmp |= _u8(partsData, off) << 8
            off += 1
            if fd & (1 << 12):
                tmp |= _u8(partsData, off) << 24
                tmp |= _u8(partsData, off + 1) << 16
                off += 2
        p.tmp = tmp

    # Bits 5/9: ctype, one byte, optionally followed by the three upper
    # bytes, most significant first.
    if fd & (1 << 5):
        ct = _u8(partsData, off)
        off += 1
        if fd & (1 << 9):
            ct |= _u8(partsData, off) << 24
            ct |= _u8(partsData, off + 1) << 16
            ct |= _u8(partsData, off + 2) << 8
            off += 3
        p.ctype = ct

    # Bit 6: dcolour, four bytes, most significant first.
    if fd & (1 << 6):
        p.dcolour = ((_u8(partsData, off) << 24)
                     | (_u8(partsData, off + 1) << 16)
                     | (_u8(partsData, off + 2) << 8)
                     | _u8(partsData, off + 3))
        off += 4

    # Bits 7/8: velocity components, one byte each, centred on 127 and
    # scaled by 1/16.
    if fd & (1 << 7):
        p.vx = (_u8(partsData, off) - 127.0) / 16.0
        off += 1
    if fd & (1 << 8):
        p.vy = (_u8(partsData, off) - 127.0) / 16.0
        off += 1

    # Bits 10/11: tmp2, low byte and optional high byte.
    if fd & (1 << 10):
        t2 = _u8(partsData, off)
        off += 1
        if fd & (1 << 11):
            t2 |= _u8(partsData, off) << 8
            off += 1
        p.tmp2 = t2

    # Bits 13/16: tmp3 and tmp4, 16 bits each, with optional upper halves.
    if fd & (1 << 13):
        p.tmp3 = _u16(partsData, off)
        p.tmp4 = _u16(partsData, off + 2)
        off += 4
        if fd & (1 << 16):
            p.tmp3 |= _u16(partsData, off) << 16
            p.tmp4 |= _u16(partsData, off + 2) << 16
            off += 4

    return p, off


def read_parts(partsData: bytes, partsPosCounts: List[int], bx: int, by: int,
               cell_size: int = 4) -> List[Particle]:
    """Decode every particle record in ``partsData``.

    Cells are visited row by row; ``partsPosCounts[y * bx + x]`` gives the
    number of consecutive records that belong to cell ``(x, y)``. Each
    particle is placed at ``cell * cell_size + cell_size // 2`` on both
    axes.

    Args:
        partsData: Concatenated particle records.
        partsPosCounts: Particle count for each cell.
        bx: Number of cells per row.
        by: Number of rows.
        cell_size: Edge length of one cell.

    Returns:
        The decoded particles in storage order.

    Raises:
        ValueError: If ``partsData`` ends before all announced records
            have been read.
    """
    parts: List[Particle] = []
    off = 0
    half_cell = cell_size // 2
    for y in range(by):
        pos_y = float(y * cell_size + half_cell)
        for x in range(bx):
            pos_x = float(x * cell_size + half_cell)
            for _ in range(partsPosCounts[y * bx + x]):
                try:
                    p, off = _read_particle(partsData, off)
                except (IndexError, struct.error) as err:
                    # An optional field ran past the end of the buffer;
                    # report it the same way as a short record header.
                    raise ValueError('truncated partsData') from err
                p.x = pos_x
                p.y = pos_y
                parts.append(p)
    return parts


# ==================== OTHER MAPS ====================

def read_wall_map(wallMap: bytes, bx: int, by: int) -> List[int]:
    """Return the wall map as a list of byte values, one per cell.

    Raises:
        ValueError: If ``wallMap`` is not exactly ``bx * by`` bytes long.
    """
    if len(wallMap) != bx * by:
        raise ValueError('wallMap length mismatch')
    return list(wallMap)


def read_fan_map(fanMap: bytes, bx: int, by: int) -> List[Tuple[float, float]]:
    """Decode the fan map into one ``(vx, vy)`` pair per cell.

    Each component is one byte, centred on 127 and scaled by 1/64.

    Raises:
        ValueError: If ``fanMap`` is not exactly ``bx * by * 2`` bytes long.
    """
    expected = bx * by * 2
    if len(fanMap) != expected:
        raise ValueError('fanMap length mismatch')
    return [
        ((_u8(fanMap, i) - 127.0) / 64.0, (_u8(fanMap, i + 1) - 127.0) / 64.0)
        for i in range(0, expected, 2)
    ]


def _read_scaled_u16_map(data: bytes, bx: int, by: int, label: str) -> List[float]:
    """Decode a map of two-byte cells as ``(low + (high << 8)) / 128 - 256``."""
    expected = bx * by * 2
    if len(data) != expected:
        raise ValueError(f'{label} length mismatch')
    return [
        ((_u8(data, i) + (_u8(data, i + 1) << 8)) / 128.0) - 256
        for i in range(0, expected, 2)
    ]


def read_press_map(pressMap: bytes, bx: int, by: int) -> List[float]:
    """Decode the pressure map into one float per cell.

    Raises:
        ValueError: If ``pressMap`` is not exactly ``bx * by * 2`` bytes long.
    """
    return _read_scaled_u16_map(pressMap, bx, by, 'pressMap')


def read_velocity_map(velocityMap: bytes, bx: int, by: int) -> List[float]:
    """Decode one velocity-component map into one float per cell.

    Raises:
        ValueError: If ``velocityMap`` is not exactly ``bx * by * 2`` bytes
            long.
    """
    return _read_scaled_u16_map(velocityMap, bx, by, 'velocityMap')


def read_ambient_map(ambientMap: bytes, bx: int, by: int) -> List[float]:
    """Decode the ambient map: one unsigned 16-bit value per cell, as float.

    Raises:
        ValueError: If ``ambientMap`` is not exactly ``bx * by * 2`` bytes
            long.
    """
    expected = bx * by * 2
    if len(ambientMap) != expected:
        raise ValueError('ambientMap length mismatch')
    return [float(_u16(ambientMap, i)) for i in range(0, expected, 2)]


def read_block_air(blockAir: bytes, bx: int, by: int) -> List[Tuple[int, int]]:
    """Decode the block-air map into one ``(air, airh)`` byte pair per cell.

    Raises:
        ValueError: If ``blockAir`` is not exactly ``bx * by * 2`` bytes long.
    """
    expected = bx * by * 2
    if len(blockAir) != expected:
        raise ValueError('blockAir length mismatch')
    return [
        (_u8(blockAir, i), _u8(blockAir, i + 1))
        for i in range(0, expected, 2)
    ]


def read_gravity(gravity: bytes, bx: int, by: int) -> List[Tuple[float, int, float, float]]:
    """Decode the gravity map into ``(mass, mask, fx, fy)`` per cell.

    Each cell occupies 16 bytes: a float, an unsigned 32-bit integer and
    two more floats.

    Raises:
        ValueError: If ``gravity`` is not exactly ``bx * by * 16`` bytes long.
    """
    record_size = 4 * 4
    expected = bx * by * record_size
    if len(gravity) != expected:
        raise ValueError('gravity length mismatch')
    return [
        (
            _f32(gravity, base),
            _u32(gravity, base + 4),
            _f32(gravity, base + 8),
            _f32(gravity, base + 12),
        )
        for base in range(0, expected, record_size)
    ]


# ==================== SIGNS ====================

class Sign:
    """A sign entry: its text, position and justification."""

    __slots__ = ('text', 'x', 'y', 'justification')

    def __init__(self):
        self.text: str = ""
        self.x: int = 0
        self.y: int = 0
        self.justification: int = 0

    def __repr__(self):
        return f'Sign(text="{self.text}", x={self.x}, y={self.y}, justification={self.justification})'


def read_signs(signs: List[Dict]) -> List[Sign]:
    """Convert sign dictionaries into ``Sign`` objects.

    Missing keys fall back to an empty text and zero for the numeric
    fields.
    """
    result = []
    for entry in signs:
        sign = Sign()
        sign.text = entry.get('text', "")
        sign.x = entry.get('x', 0)
        sign.y = entry.get('y', 0)
        sign.justification = entry.get('justification', 0)
        result.append(sign)
    return result


# ==================== MAIN PARSER ====================

# (result key, document key, reader) for every per-cell map.
_MAP_READERS = (
    ('wall_map', 'wallMap', read_wall_map),
    ('fan_map', 'fanMap', read_fan_map),
    ('press_map', 'pressMap', read_press_map),
    ('vx_map', 'vxMap', read_velocity_map),
    ('vy_map', 'vyMap', read_velocity_map),
    ('ambient_map', 'ambientMap', read_ambient_map),
    ('block_air', 'blockAir', read_block_air),
    ('gravity', 'gravity', read_gravity),
)


def parse_save(doc: dict) -> dict:
    """Parse a decoded save document into plain Python structures.

    The grid size is taken from ``blockSizeX``/``blockSizeY`` or, failing
    that, from ``blockSize`` given as a dict with ``X``/``Y`` keys or as a
    two-element list.

    Returns:
        A dict with ``blockSize``, ``particles``, one entry per map that
        decoded successfully, ``signs`` and ``meta``. When parts or signs
        fail to decode, their entry holds an error string instead. Map
        failures are collected under ``maps_error``.

    Raises:
        ValueError: If no non-zero grid size can be determined.
    """
    # Try to obtain blockSize
    bx, by = 0, 0
    if 'blockSizeX' in doc and 'blockSizeY' in doc:
        bx, by = doc['blockSizeX'], doc['blockSizeY']
    elif 'blockSize' in doc:
        bs = doc['blockSize']
        if isinstance(bs, dict):
            bx, by = bs.get('X', 0), bs.get('Y', 0)
        elif isinstance(bs, list) and len(bs) == 2:
            bx, by = bs[0], bs[1]
    if not bx or not by:
        raise ValueError("Missing blockSize")

    result = {"blockSize": (bx, by)}

    # parts
    try:
        parts_raw = bytes(doc.get('parts', []))
        parts_pos_raw = bytes(doc.get('partsPos', []))
        counts = read_parts_pos(parts_pos_raw, bx, by)
        result['particles'] = read_parts(parts_raw, counts, bx, by)
    except Exception as e:
        result['particles'] = f"[Ошибка parts]: {e}"

    # maps: each one is decoded on its own so that one bad map does not
    # discard the maps that follow it.
    map_errors = []
    for out_key, doc_key, reader in _MAP_READERS:
        try:
            result[out_key] = reader(bytes(doc.get(doc_key, [])), bx, by)
        except Exception as e:
            # Prefix with the document key: vxMap and vyMap share a reader
            # and would otherwise produce identical messages.
            map_errors.append(f"{doc_key}: {e}")
    if map_errors:
        result['maps_error'] = "; ".join(map_errors)

    # signs
    try:
        result['signs'] = read_signs(doc.get('signs', []))
    except Exception as e:
        result['signs'] = f"[Ошибка signs]: {e}"

    # meta
    result['meta'] = {
        k: doc[k]
        for k in ['saveInfo', 'palette', 'authors', 'origin']
        if k in doc
    }
    return result


# ==================== GRADIO ====================

def analyze_cps(file):
    """Gradio callback: dump and parse an uploaded ``.cps`` file.

    Args:
        file: The uploaded file, either an object exposing the path as
            ``.name`` or the path itself; ``None`` when nothing is
            selected.

    Returns:
        Three strings matching the three output textboxes: a hex dump of
        the first 256 raw bytes, a hex dump of the first 512 decompressed
        bytes (or an error message), and the parsed result (or an error
        message).
    """
    if file is None:
        return "", "", ""
    try:
        # Accept both a file wrapper with .name and a plain path string.
        path = getattr(file, "name", file)
        with open(path, "rb") as f:
            raw_data = f.read()

        hex_orig = ' '.join(f'{b:02x}' for b in raw_data[:256])
        hex_original = f"Оригинал (первые 256 байт):\n{hex_orig}"

        if raw_data[:4] != b'OPS1':
            return hex_original, "[Ошибка] Неверная сигнатура файла.", ""

        # The compressed payload starts after a 12-byte header.
        compressed = raw_data[12:]
        try:
            decompressed = bz2.decompress(compressed)
        except Exception as e:
            return hex_original, f"[Ошибка bzip2]: {e}", ""

        hex_decomp = ' '.join(f'{b:02x}' for b in decompressed[:512])
        hex_decomp_view = f"Распакованные данные (первые 512 байт):\n{hex_decomp}"

        try:
            doc = bson.decode(decompressed)
        except Exception as e:
            return hex_original, hex_decomp_view, f"[Ошибка BSON]: {e}"

        try:
            parsed = parse_save(doc)
        except Exception as e:
            return hex_original, hex_decomp_view, f"[Ошибка парсинга]: {e}"

        return hex_original, hex_decomp_view, str(parsed)
    except Exception as e:
        return f"[Ошибка]: {e}", "", ""


with gr.Blocks() as demo:
    gr.Markdown("## Распаковка .cps с полным парсером")
    file_input = gr.File(label="Загрузите .cps файл", file_types=[".cps"])
    hex_orig_out = gr.Textbox(label="HEX оригинала", lines=5)
    hex_decomp_out = gr.Textbox(label="HEX распакованных данных", lines=10)
    parsed_output = gr.Textbox(label="Полный парсинг", lines=30)

    file_input.change(
        fn=analyze_cps,
        inputs=file_input,
        outputs=[hex_orig_out, hex_decomp_out, parsed_output]
    )
    file_input.clear(
        fn=lambda: ("", "", ""),
        outputs=[hex_orig_out, hex_decomp_out, parsed_output]
    )

# Only start the server when run as a script, so the module can be imported
# without side effects.
if __name__ == "__main__":
    demo.launch()