Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import bz2 | |
| import bson | |
| import struct | |
| from typing import List, Dict, Tuple | |
| # ==================== ХЕЛПЕРЫ ==================== | |
| def _u8(buf, off): return buf[off] | |
| def _u16(buf, off): return struct.unpack_from('<H', buf, off)[0] | |
| def _u32(buf, off): return struct.unpack_from('<I', buf, off)[0] | |
| def _i8(buf, off): return struct.unpack_from('<b', buf, off)[0] | |
| def _f32(buf, off): return struct.unpack_from('<f', buf, off)[0] | |
| # ==================== ЧТЕНИЕ partsPos ==================== | |
| def read_parts_pos(partsPos: bytes, bx: int, by: int) -> List[int]: | |
| if len(partsPos) != bx * by * 3: | |
| raise ValueError('partsPos length mismatch') | |
| cnt_map = [] | |
| for i in range(bx * by): | |
| base = i * 3 | |
| cnt = (partsPos[base] << 16) | (partsPos[base + 1] << 8) | partsPos[base + 2] | |
| cnt_map.append(cnt) | |
| return cnt_map | |
| # ==================== ЧТЕНИЕ parts ==================== | |
| class Particle: | |
| __slots__ = ('type', 'x', 'y', 'temp', 'life', 'tmp', 'ctype', | |
| 'dcolour', 'vx', 'vy', 'tmp2', 'tmp3', 'tmp4') | |
| def __init__(self): | |
| self.type: int = 0 | |
| self.x: float = 0.0 | |
| self.y: float = 0.0 | |
| self.temp: float = 0.0 | |
| self.life: int = 0 | |
| self.tmp: int = 0 | |
| self.ctype: int = 0 | |
| self.dcolour: int = 0 | |
| self.vx: float = 0.0 | |
| self.vy: float = 0.0 | |
| self.tmp2: int = 0 | |
| self.tmp3: int = 0 | |
| self.tmp4: int = 0 | |
| def __repr__(self): | |
| return f'Particle(type={self.type} @{self.x:.1f},{self.y:.1f})' | |
| def read_parts(partsData: bytes, partsPosCounts: List[int], | |
| bx: int, by: int, cell_size: int = 4) -> List[Particle]: | |
| px_per_cell = cell_size | |
| parts: List[Particle] = [] | |
| off = 0 | |
| data_len = len(partsData) | |
| for y in range(by): | |
| for x in range(bx): | |
| cnt = partsPosCounts[y * bx + x] | |
| base_x, base_y = x * px_per_cell, y * px_per_cell | |
| for _ in range(cnt): | |
| if off >= data_len - 3: | |
| raise ValueError('truncated partsData') | |
| ty = _u8(partsData, off) | |
| off += 1 | |
| fd = _u16(partsData, off) | |
| off += 2 | |
| has_fd3 = bool(fd & 0x8000) | |
| if has_fd3: | |
| if off >= data_len: | |
| raise ValueError('no fd3 byte') | |
| fd3 = _u8(partsData, off) | |
| off += 1 | |
| fd |= fd3 << 16 | |
| if fd & (1 << 14): | |
| ty |= _u8(partsData, off) << 8 | |
| off += 1 | |
| p = Particle() | |
| p.type = ty | |
| p.x = float(base_x + px_per_cell // 2) | |
| p.y = float(base_y + px_per_cell // 2) | |
| if fd & 1: | |
| p.temp = float(_u16(partsData, off)) | |
| off += 2 | |
| else: | |
| delta = _i8(partsData, off) | |
| off += 1 | |
| p.temp = 294.15 + delta | |
| if fd & (1 << 1): | |
| life = _u8(partsData, off) | |
| off += 1 | |
| if fd & (1 << 2): | |
| life |= _u8(partsData, off) << 8 | |
| off += 1 | |
| p.life = life | |
| if fd & (1 << 3): | |
| tmp = _u8(partsData, off) | |
| off += 1 | |
| if fd & (1 << 4): | |
| tmp |= _u8(partsData, off) << 8 | |
| off += 1 | |
| if fd & (1 << 12): | |
| tmp |= _u8(partsData, off) << 24 | |
| off += 1 | |
| tmp |= _u8(partsData, off) << 16 | |
| off += 1 | |
| p.tmp = tmp | |
| if fd & (1 << 5): | |
| ct = _u8(partsData, off) | |
| off += 1 | |
| if fd & (1 << 9): | |
| ct |= _u8(partsData, off) << 24 | |
| off += 1 | |
| ct |= _u8(partsData, off) << 16 | |
| off += 1 | |
| ct |= _u8(partsData, off) << 8 | |
| off += 1 | |
| p.ctype = ct | |
| if fd & (1 << 6): | |
| p.dcolour = (_u8(partsData, off) << 24) | \ | |
| (_u8(partsData, off + 1) << 16) | \ | |
| (_u8(partsData, off + 2) << 8) | \ | |
| _u8(partsData, off + 3) | |
| off += 4 | |
| if fd & (1 << 7): | |
| p.vx = (_u8(partsData, off) - 127.0) / 16.0 | |
| off += 1 | |
| if fd & (1 << 8): | |
| p.vy = (_u8(partsData, off) - 127.0) / 16.0 | |
| off += 1 | |
| if fd & (1 << 10): | |
| t2 = _u8(partsData, off) | |
| off += 1 | |
| if fd & (1 << 11): | |
| t2 |= _u8(partsData, off) << 8 | |
| off += 1 | |
| p.tmp2 = t2 | |
| if fd & (1 << 13): | |
| p.tmp3 = _u16(partsData, off) | |
| p.tmp4 = _u16(partsData, off + 2) | |
| off += 4 | |
| if fd & (1 << 16): | |
| p.tmp3 |= _u16(partsData, off) << 16 | |
| p.tmp4 |= _u16(partsData, off + 2) << 16 | |
| off += 4 | |
| parts.append(p) | |
| return parts | |
| # ==================== ОСТАЛЬНЫЕ MAPS ==================== | |
| def read_wall_map(wallMap: bytes, bx: int, by: int) -> List[int]: | |
| if len(wallMap) != bx * by: | |
| raise ValueError('wallMap length mismatch') | |
| return list(wallMap) | |
| def read_fan_map(fanMap: bytes, bx: int, by: int) -> List[Tuple[float, float]]: | |
| if len(fanMap) != bx * by * 2: | |
| raise ValueError('fanMap length mismatch') | |
| fan_velocities = [] | |
| for i in range(bx * by): | |
| base = i * 2 | |
| vx = (_u8(fanMap, base) - 127.0) / 64.0 | |
| vy = (_u8(fanMap, base + 1) - 127.0) / 64.0 | |
| fan_velocities.append((vx, vy)) | |
| return fan_velocities | |
| def read_press_map(pressMap: bytes, bx: int, by: int) -> List[float]: | |
| if len(pressMap) != bx * by * 2: | |
| raise ValueError('pressMap length mismatch') | |
| pressures = [] | |
| for i in range(bx * by): | |
| base = i * 2 | |
| pressure = ((_u8(pressMap, base) + (_u8(pressMap, base + 1) << 8)) / 128.0) - 256 | |
| pressures.append(pressure) | |
| return pressures | |
| def read_velocity_map(velocityMap: bytes, bx: int, by: int) -> List[float]: | |
| if len(velocityMap) != bx * by * 2: | |
| raise ValueError('velocityMap length mismatch') | |
| velocities = [] | |
| for i in range(bx * by): | |
| base = i * 2 | |
| velocity = ((_u8(velocityMap, base) + (_u8(velocityMap, base + 1) << 8)) / 128.0) - 256 | |
| velocities.append(velocity) | |
| return velocities | |
| def read_ambient_map(ambientMap: bytes, bx: int, by: int) -> List[float]: | |
| if len(ambientMap) != bx * by * 2: | |
| raise ValueError('ambientMap length mismatch') | |
| ambient_temps = [] | |
| for i in range(bx * by): | |
| base = i * 2 | |
| temp = _u16(ambientMap, base) | |
| ambient_temps.append(float(temp)) | |
| return ambient_temps | |
| def read_block_air(blockAir: bytes, bx: int, by: int) -> List[Tuple[int, int]]: | |
| if len(blockAir) != bx * by * 2: | |
| raise ValueError('blockAir length mismatch') | |
| block_air_values = [] | |
| for i in range(bx * by): | |
| base = i * 2 | |
| air = _u8(blockAir, base) | |
| airh = _u8(blockAir, base + 1) | |
| block_air_values.append((air, airh)) | |
| return block_air_values | |
| def read_gravity(gravity: bytes, bx: int, by: int) -> List[Tuple[float, float, float, float]]: | |
| if len(gravity) != bx * by * 4 * 4: | |
| raise ValueError('gravity length mismatch') | |
| gravity_values = [] | |
| for i in range(bx * by): | |
| base = i * 4 * 4 | |
| mass = _f32(gravity, base) | |
| mask = _u32(gravity, base + 4) | |
| fx = _f32(gravity, base + 8) | |
| fy = _f32(gravity, base + 12) | |
| gravity_values.append((mass, mask, fx, fy)) | |
| return gravity_values | |
| # ==================== SIGNS ==================== | |
| class Sign: | |
| __slots__ = ('text', 'x', 'y', 'justification') | |
| def __init__(self): | |
| self.text: str = "" | |
| self.x: int = 0 | |
| self.y: int = 0 | |
| self.justification: int = 0 | |
| def __repr__(self): | |
| return f'Sign(text="{self.text}", x={self.x}, y={self.y}, justification={self.justification})' | |
| def read_signs(signs: List[Dict]) -> List[Sign]: | |
| result = [] | |
| for sign_data in signs: | |
| sign = Sign() | |
| sign.text = sign_data.get('text', "") | |
| sign.x = sign_data.get('x', 0) | |
| sign.y = sign_data.get('y', 0) | |
| sign.justification = sign_data.get('justification', 0) | |
| result.append(sign) | |
| return result | |
| # ==================== MAIN PARSER ==================== | |
| def parse_save(doc: dict) -> dict: | |
| # Попробуем достать blockSize | |
| bx, by = 0, 0 | |
| if 'blockSizeX' in doc and 'blockSizeY' in doc: | |
| bx, by = doc['blockSizeX'], doc['blockSizeY'] | |
| elif 'blockSize' in doc: | |
| bs = doc['blockSize'] | |
| if isinstance(bs, dict): | |
| bx, by = bs.get('X', 0), bs.get('Y', 0) | |
| elif isinstance(bs, list) and len(bs) == 2: | |
| bx, by = bs[0], bs[1] | |
| if not bx or not by: | |
| raise ValueError("Missing blockSize") | |
| result = {"blockSize": (bx, by)} | |
| # parts | |
| try: | |
| parts_raw = bytes(doc.get('parts', [])) | |
| parts_pos_raw = bytes(doc.get('partsPos', [])) | |
| counts = read_parts_pos(parts_pos_raw, bx, by) | |
| result['particles'] = read_parts(parts_raw, counts, bx, by) | |
| except Exception as e: | |
| result['particles'] = f"[Ошибка parts]: {e}" | |
| # maps | |
| try: | |
| result['wall_map'] = read_wall_map(bytes(doc.get('wallMap', [])), bx, by) | |
| result['fan_map'] = read_fan_map(bytes(doc.get('fanMap', [])), bx, by) | |
| result['press_map'] = read_press_map(bytes(doc.get('pressMap', [])), bx, by) | |
| result['vx_map'] = read_velocity_map(bytes(doc.get('vxMap', [])), bx, by) | |
| result['vy_map'] = read_velocity_map(bytes(doc.get('vyMap', [])), bx, by) | |
| result['ambient_map'] = read_ambient_map(bytes(doc.get('ambientMap', [])), bx, by) | |
| result['block_air'] = read_block_air(bytes(doc.get('blockAir', [])), bx, by) | |
| result['gravity'] = read_gravity(bytes(doc.get('gravity', [])), bx, by) | |
| except Exception as e: | |
| result['maps_error'] = str(e) | |
| # signs | |
| try: | |
| result['signs'] = read_signs(doc.get('signs', [])) | |
| except Exception as e: | |
| result['signs'] = f"[Ошибка signs]: {e}" | |
| # meta | |
| result['meta'] = { | |
| k: doc[k] for k in ['saveInfo', 'palette', 'authors', 'origin'] | |
| if k in doc | |
| } | |
| return result | |
| # ==================== GRADIO ==================== | |
| def analyze_cps(file): | |
| if file is None: | |
| return "", "", "", "" | |
| try: | |
| with open(file.name, "rb") as f: | |
| raw_data = f.read() | |
| hex_orig = ' '.join(f'{b:02x}' for b in raw_data[:256]) | |
| hex_original = f"Оригинал (первые 256 байт):\n{hex_orig}" | |
| if raw_data[:4] != b'OPS1': | |
| return hex_original, "[Ошибка] Неверная сигнатура файла.", "", "" | |
| compressed = raw_data[12:] | |
| try: | |
| decompressed = bz2.decompress(compressed) | |
| except Exception as e: | |
| return hex_original, f"[Ошибка bzip2]: {e}", "", "" | |
| hex_decomp = ' '.join(f'{b:02x}' for b in decompressed[:512]) | |
| hex_decomp_view = f"Распакованные данные (первые 512 байт):\n{hex_decomp}" | |
| try: | |
| doc = bson.decode(decompressed) | |
| except Exception as e: | |
| return hex_original, hex_decomp_view, f"[Ошибка BSON]: {e}", "" | |
| try: | |
| parsed = parse_save(doc) | |
| except Exception as e: | |
| return hex_original, hex_decomp_view, f"[Ошибка парсинга]: {e}", "" | |
| return hex_original, hex_decomp_view, str(parsed) | |
| except Exception as e: | |
| return f"[Ошибка]: {e}", "", "", "" | |
| with gr.Blocks() as demo: | |
| gr.Markdown("## Распаковка .cps с полным парсером") | |
| file_input = gr.File(label="Загрузите .cps файл", file_types=[".cps"]) | |
| hex_orig_out = gr.Textbox(label="HEX оригинала", lines=5) | |
| hex_decomp_out = gr.Textbox(label="HEX распакованных данных", lines=10) | |
| parsed_output = gr.Textbox(label="Полный парсинг", lines=30) | |
| file_input.change( | |
| fn=analyze_cps, | |
| inputs=file_input, | |
| outputs=[hex_orig_out, hex_decomp_out, parsed_output] | |
| ) | |
| file_input.clear( | |
| fn=lambda: ("", "", ""), | |
| outputs=[hex_orig_out, hex_decomp_out, parsed_output] | |
| ) | |
| demo.launch() | |