tpt_utils / app.py
SnowFlash383935's picture
Update app.py
d0fca70 verified
import gradio as gr
import bz2
import bson
import struct
from typing import List, Dict, Tuple
# ==================== ХЕЛПЕРЫ ====================
def _u8(buf, off): return buf[off]
def _u16(buf, off): return struct.unpack_from('<H', buf, off)[0]
def _u32(buf, off): return struct.unpack_from('<I', buf, off)[0]
def _i8(buf, off): return struct.unpack_from('<b', buf, off)[0]
def _f32(buf, off): return struct.unpack_from('<f', buf, off)[0]
# ==================== ЧТЕНИЕ partsPos ====================
def read_parts_pos(partsPos: bytes, bx: int, by: int) -> List[int]:
if len(partsPos) != bx * by * 3:
raise ValueError('partsPos length mismatch')
cnt_map = []
for i in range(bx * by):
base = i * 3
cnt = (partsPos[base] << 16) | (partsPos[base + 1] << 8) | partsPos[base + 2]
cnt_map.append(cnt)
return cnt_map
# ==================== ЧТЕНИЕ parts ====================
class Particle:
__slots__ = ('type', 'x', 'y', 'temp', 'life', 'tmp', 'ctype',
'dcolour', 'vx', 'vy', 'tmp2', 'tmp3', 'tmp4')
def __init__(self):
self.type: int = 0
self.x: float = 0.0
self.y: float = 0.0
self.temp: float = 0.0
self.life: int = 0
self.tmp: int = 0
self.ctype: int = 0
self.dcolour: int = 0
self.vx: float = 0.0
self.vy: float = 0.0
self.tmp2: int = 0
self.tmp3: int = 0
self.tmp4: int = 0
def __repr__(self):
return f'Particle(type={self.type} @{self.x:.1f},{self.y:.1f})'
def read_parts(partsData: bytes, partsPosCounts: List[int],
bx: int, by: int, cell_size: int = 4) -> List[Particle]:
px_per_cell = cell_size
parts: List[Particle] = []
off = 0
data_len = len(partsData)
for y in range(by):
for x in range(bx):
cnt = partsPosCounts[y * bx + x]
base_x, base_y = x * px_per_cell, y * px_per_cell
for _ in range(cnt):
if off >= data_len - 3:
raise ValueError('truncated partsData')
ty = _u8(partsData, off)
off += 1
fd = _u16(partsData, off)
off += 2
has_fd3 = bool(fd & 0x8000)
if has_fd3:
if off >= data_len:
raise ValueError('no fd3 byte')
fd3 = _u8(partsData, off)
off += 1
fd |= fd3 << 16
if fd & (1 << 14):
ty |= _u8(partsData, off) << 8
off += 1
p = Particle()
p.type = ty
p.x = float(base_x + px_per_cell // 2)
p.y = float(base_y + px_per_cell // 2)
if fd & 1:
p.temp = float(_u16(partsData, off))
off += 2
else:
delta = _i8(partsData, off)
off += 1
p.temp = 294.15 + delta
if fd & (1 << 1):
life = _u8(partsData, off)
off += 1
if fd & (1 << 2):
life |= _u8(partsData, off) << 8
off += 1
p.life = life
if fd & (1 << 3):
tmp = _u8(partsData, off)
off += 1
if fd & (1 << 4):
tmp |= _u8(partsData, off) << 8
off += 1
if fd & (1 << 12):
tmp |= _u8(partsData, off) << 24
off += 1
tmp |= _u8(partsData, off) << 16
off += 1
p.tmp = tmp
if fd & (1 << 5):
ct = _u8(partsData, off)
off += 1
if fd & (1 << 9):
ct |= _u8(partsData, off) << 24
off += 1
ct |= _u8(partsData, off) << 16
off += 1
ct |= _u8(partsData, off) << 8
off += 1
p.ctype = ct
if fd & (1 << 6):
p.dcolour = (_u8(partsData, off) << 24) | \
(_u8(partsData, off + 1) << 16) | \
(_u8(partsData, off + 2) << 8) | \
_u8(partsData, off + 3)
off += 4
if fd & (1 << 7):
p.vx = (_u8(partsData, off) - 127.0) / 16.0
off += 1
if fd & (1 << 8):
p.vy = (_u8(partsData, off) - 127.0) / 16.0
off += 1
if fd & (1 << 10):
t2 = _u8(partsData, off)
off += 1
if fd & (1 << 11):
t2 |= _u8(partsData, off) << 8
off += 1
p.tmp2 = t2
if fd & (1 << 13):
p.tmp3 = _u16(partsData, off)
p.tmp4 = _u16(partsData, off + 2)
off += 4
if fd & (1 << 16):
p.tmp3 |= _u16(partsData, off) << 16
p.tmp4 |= _u16(partsData, off + 2) << 16
off += 4
parts.append(p)
return parts
# ==================== ОСТАЛЬНЫЕ MAPS ====================
def read_wall_map(wallMap: bytes, bx: int, by: int) -> List[int]:
if len(wallMap) != bx * by:
raise ValueError('wallMap length mismatch')
return list(wallMap)
def read_fan_map(fanMap: bytes, bx: int, by: int) -> List[Tuple[float, float]]:
if len(fanMap) != bx * by * 2:
raise ValueError('fanMap length mismatch')
fan_velocities = []
for i in range(bx * by):
base = i * 2
vx = (_u8(fanMap, base) - 127.0) / 64.0
vy = (_u8(fanMap, base + 1) - 127.0) / 64.0
fan_velocities.append((vx, vy))
return fan_velocities
def read_press_map(pressMap: bytes, bx: int, by: int) -> List[float]:
if len(pressMap) != bx * by * 2:
raise ValueError('pressMap length mismatch')
pressures = []
for i in range(bx * by):
base = i * 2
pressure = ((_u8(pressMap, base) + (_u8(pressMap, base + 1) << 8)) / 128.0) - 256
pressures.append(pressure)
return pressures
def read_velocity_map(velocityMap: bytes, bx: int, by: int) -> List[float]:
if len(velocityMap) != bx * by * 2:
raise ValueError('velocityMap length mismatch')
velocities = []
for i in range(bx * by):
base = i * 2
velocity = ((_u8(velocityMap, base) + (_u8(velocityMap, base + 1) << 8)) / 128.0) - 256
velocities.append(velocity)
return velocities
def read_ambient_map(ambientMap: bytes, bx: int, by: int) -> List[float]:
if len(ambientMap) != bx * by * 2:
raise ValueError('ambientMap length mismatch')
ambient_temps = []
for i in range(bx * by):
base = i * 2
temp = _u16(ambientMap, base)
ambient_temps.append(float(temp))
return ambient_temps
def read_block_air(blockAir: bytes, bx: int, by: int) -> List[Tuple[int, int]]:
if len(blockAir) != bx * by * 2:
raise ValueError('blockAir length mismatch')
block_air_values = []
for i in range(bx * by):
base = i * 2
air = _u8(blockAir, base)
airh = _u8(blockAir, base + 1)
block_air_values.append((air, airh))
return block_air_values
def read_gravity(gravity: bytes, bx: int, by: int) -> List[Tuple[float, float, float, float]]:
if len(gravity) != bx * by * 4 * 4:
raise ValueError('gravity length mismatch')
gravity_values = []
for i in range(bx * by):
base = i * 4 * 4
mass = _f32(gravity, base)
mask = _u32(gravity, base + 4)
fx = _f32(gravity, base + 8)
fy = _f32(gravity, base + 12)
gravity_values.append((mass, mask, fx, fy))
return gravity_values
# ==================== SIGNS ====================
class Sign:
__slots__ = ('text', 'x', 'y', 'justification')
def __init__(self):
self.text: str = ""
self.x: int = 0
self.y: int = 0
self.justification: int = 0
def __repr__(self):
return f'Sign(text="{self.text}", x={self.x}, y={self.y}, justification={self.justification})'
def read_signs(signs: List[Dict]) -> List[Sign]:
result = []
for sign_data in signs:
sign = Sign()
sign.text = sign_data.get('text', "")
sign.x = sign_data.get('x', 0)
sign.y = sign_data.get('y', 0)
sign.justification = sign_data.get('justification', 0)
result.append(sign)
return result
# ==================== MAIN PARSER ====================
def parse_save(doc: dict) -> dict:
# Попробуем достать blockSize
bx, by = 0, 0
if 'blockSizeX' in doc and 'blockSizeY' in doc:
bx, by = doc['blockSizeX'], doc['blockSizeY']
elif 'blockSize' in doc:
bs = doc['blockSize']
if isinstance(bs, dict):
bx, by = bs.get('X', 0), bs.get('Y', 0)
elif isinstance(bs, list) and len(bs) == 2:
bx, by = bs[0], bs[1]
if not bx or not by:
raise ValueError("Missing blockSize")
result = {"blockSize": (bx, by)}
# parts
try:
parts_raw = bytes(doc.get('parts', []))
parts_pos_raw = bytes(doc.get('partsPos', []))
counts = read_parts_pos(parts_pos_raw, bx, by)
result['particles'] = read_parts(parts_raw, counts, bx, by)
except Exception as e:
result['particles'] = f"[Ошибка parts]: {e}"
# maps
try:
result['wall_map'] = read_wall_map(bytes(doc.get('wallMap', [])), bx, by)
result['fan_map'] = read_fan_map(bytes(doc.get('fanMap', [])), bx, by)
result['press_map'] = read_press_map(bytes(doc.get('pressMap', [])), bx, by)
result['vx_map'] = read_velocity_map(bytes(doc.get('vxMap', [])), bx, by)
result['vy_map'] = read_velocity_map(bytes(doc.get('vyMap', [])), bx, by)
result['ambient_map'] = read_ambient_map(bytes(doc.get('ambientMap', [])), bx, by)
result['block_air'] = read_block_air(bytes(doc.get('blockAir', [])), bx, by)
result['gravity'] = read_gravity(bytes(doc.get('gravity', [])), bx, by)
except Exception as e:
result['maps_error'] = str(e)
# signs
try:
result['signs'] = read_signs(doc.get('signs', []))
except Exception as e:
result['signs'] = f"[Ошибка signs]: {e}"
# meta
result['meta'] = {
k: doc[k] for k in ['saveInfo', 'palette', 'authors', 'origin']
if k in doc
}
return result
# ==================== GRADIO ====================
def analyze_cps(file):
if file is None:
return "", "", "", ""
try:
with open(file.name, "rb") as f:
raw_data = f.read()
hex_orig = ' '.join(f'{b:02x}' for b in raw_data[:256])
hex_original = f"Оригинал (первые 256 байт):\n{hex_orig}"
if raw_data[:4] != b'OPS1':
return hex_original, "[Ошибка] Неверная сигнатура файла.", "", ""
compressed = raw_data[12:]
try:
decompressed = bz2.decompress(compressed)
except Exception as e:
return hex_original, f"[Ошибка bzip2]: {e}", "", ""
hex_decomp = ' '.join(f'{b:02x}' for b in decompressed[:512])
hex_decomp_view = f"Распакованные данные (первые 512 байт):\n{hex_decomp}"
try:
doc = bson.decode(decompressed)
except Exception as e:
return hex_original, hex_decomp_view, f"[Ошибка BSON]: {e}", ""
try:
parsed = parse_save(doc)
except Exception as e:
return hex_original, hex_decomp_view, f"[Ошибка парсинга]: {e}", ""
return hex_original, hex_decomp_view, str(parsed)
except Exception as e:
return f"[Ошибка]: {e}", "", "", ""
with gr.Blocks() as demo:
gr.Markdown("## Распаковка .cps с полным парсером")
file_input = gr.File(label="Загрузите .cps файл", file_types=[".cps"])
hex_orig_out = gr.Textbox(label="HEX оригинала", lines=5)
hex_decomp_out = gr.Textbox(label="HEX распакованных данных", lines=10)
parsed_output = gr.Textbox(label="Полный парсинг", lines=30)
file_input.change(
fn=analyze_cps,
inputs=file_input,
outputs=[hex_orig_out, hex_decomp_out, parsed_output]
)
file_input.clear(
fn=lambda: ("", "", ""),
outputs=[hex_orig_out, hex_decomp_out, parsed_output]
)
demo.launch()