SnowFlash383935 commited on
Commit
44aa74a
·
verified ·
1 Parent(s): 56194fd

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +66 -60
app.py CHANGED
@@ -2,19 +2,21 @@ import gradio as gr
2
  import bz2
3
  import bson
4
  import struct
5
- from typing import List, Dict, Any
6
 
7
  # ---------- низкоуровневые хелперы ----------
8
  def _u8(buf, off) -> int:
9
  return buf[off]
10
  def _u16(buf, off) -> int:
11
  return struct.unpack_from('<H', buf, off)[0]
 
 
12
  def _i8(buf, off) -> int:
13
  return struct.unpack_from('<b', buf, off)[0]
14
 
15
  # ---------- чтение partsPos ----------
16
  def read_parts_pos(partsPos: bytes, bx: int, by: int) -> List[int]:
17
- """Возвращает сколько частиц в каждой клетке (длина = bx*by)"""
18
  if len(partsPos) != bx * by * 3:
19
  raise ValueError(f'partsPos length mismatch: expected {bx*by*3}, got {len(partsPos)}')
20
  cnt_map = []
@@ -24,7 +26,7 @@ def read_parts_pos(partsPos: bytes, bx: int, by: int) -> List[int]:
24
  cnt_map.append(cnt)
25
  return cnt_map
26
 
27
- # ---------- класс Particle ----------
28
  class Particle:
29
  __slots__ = ('type', 'x', 'y', 'temp', 'life', 'tmp', 'ctype',
30
  'dcolour', 'vx', 'vy', 'tmp2', 'tmp3', 'tmp4')
@@ -44,22 +46,17 @@ class Particle:
44
  self.tmp3: int = 0
45
  self.tmp4: int = 0
46
 
47
- def to_dict(self) -> Dict[str, Any]:
48
  return {field: getattr(self, field) for field in self.__slots__}
49
 
50
  def __repr__(self):
51
  return f'Particle(type={self.type} @{self.x:.1f},{self.y:.1f})'
52
 
53
- # ---------- чтение parts ----------
54
  def read_parts(partsData: bytes, partsPosCounts: List[int],
55
- bx: int, by: int, cell_size: int = 4) -> List[Dict[str, Any]]:
56
- """
57
- partsData – байты из BSON['parts']
58
- partsPosCounts – результат read_parts_pos
59
- Возвращает список частиц как dict
60
- """
61
  px_per_cell = cell_size
62
- particles: List[Dict[str, Any]] = []
63
  off = 0
64
  data_len = len(partsData)
65
 
@@ -67,19 +64,17 @@ def read_parts(partsData: bytes, partsPosCounts: List[int],
67
  for x in range(bx):
68
  cnt = partsPosCounts[y * bx + x]
69
  base_x, base_y = x * px_per_cell, y * px_per_cell
70
-
71
  for _ in range(cnt):
72
  if off >= data_len - 3:
73
  raise ValueError('truncated partsData')
74
 
75
- # 1. type (1 байт + возможно второй)
76
  ty = _u8(partsData, off)
77
  off += 1
78
 
79
- # 2. field-descriptor 2 байта
80
  fd = _u16(partsData, off)
81
  off += 2
82
-
83
  has_fd3 = bool(fd & 0x8000)
84
  if has_fd3:
85
  if off >= data_len:
@@ -178,84 +173,95 @@ def read_parts(partsData: bytes, partsPosCounts: List[int],
178
  p.tmp4 |= _u16(partsData, off + 2) << 16
179
  off += 4
180
 
181
- particles.append(p.to_dict())
182
-
183
- return particles
 
 
 
 
 
 
 
 
 
 
 
184
 
185
  # ---------- основная функция Gradio ----------
186
  def analyze_cps(file):
187
  if file is None:
188
- return "", "", ""
189
-
190
  try:
191
  with open(file.name, "rb") as f:
192
  raw_data = f.read()
193
-
 
194
  hex_orig = ' '.join(f'{b:02x}' for b in raw_data[:256])
195
  hex_original = f"Оригинал (первые 256 байт):\n{hex_orig}"
196
-
 
197
  if raw_data[:4] != b'OPS1':
198
- return hex_original, "[Ошибка] Неверная сигнатура файла.", ""
199
-
 
200
  compressed = raw_data[12:]
201
  try:
202
  decompressed = bz2.decompress(compressed)
203
  except Exception as e:
204
- return hex_original, f"[Ошибка bzip2]: {e}", ""
205
-
 
206
  hex_decomp = ' '.join(f'{b:02x}' for b in decompressed[:512])
207
  hex_decomp_view = f"Распакованные данные (первые 512 байт):\n{hex_decomp}"
208
-
 
209
  try:
210
  doc = bson.decode(decompressed)
211
  except Exception as e:
212
- return hex_original, hex_decomp_view, f"[Ошибка BSON]: {e}"
213
-
214
- # Парсим parts
215
- if 'parts' in doc and 'partsPos' in doc and 'blockSize' in doc:
216
  try:
217
- # Получаем размеры блока
218
- block_size = doc['blockSize']
219
- if isinstance(block_size, dict):
220
- bx, by = block_size.get('x', 0), block_size.get('y', 0)
221
- elif isinstance(block_size, (list, tuple)):
222
- bx, by = block_size[0], block_size[1]
223
- else:
224
- bx = by = int(block_size)
225
-
226
- if bx <= 0 or by <= 0:
227
- raise ValueError(f"Invalid blockSize: {block_size}")
228
-
229
- # Читаем partsPos и parts
230
  parts_pos_raw = bytes(doc['partsPos'])
231
  parts_raw = bytes(doc['parts'])
232
 
 
233
  counts = read_parts_pos(parts_pos_raw, bx, by)
234
- parsed_parts = read_parts(parts_raw, counts, bx, by)
235
 
236
- # Заменяем бинарные данные на распарсенные
237
- doc['parts'] = f"[{len(parsed_parts)} particles parsed]"
238
- doc['partsPos'] = f"[{len(counts)} cells]"
239
- doc['_particles'] = parsed_parts # Сохраняем отдельно
 
 
 
240
 
241
  except Exception as e:
242
- doc['parts'] = f"[Ошибка парсинга]: {e}"
243
- doc['partsPos'] = f"[Ошибка парсинга]: {e}"
244
-
245
  return hex_original, hex_decomp_view, str(doc)
246
-
247
  except Exception as e:
248
- return f"[Ошибка]: {e}", "", ""
249
 
250
- # ---------- Gradio интерфейс ----------
251
  with gr.Blocks() as demo:
252
- gr.Markdown("## Распаковка .cps (The Powder Toy) с полным парсингом частиц")
 
253
 
254
  file_input = gr.File(label="Загрузите .cps файл", file_types=[".cps"])
255
 
256
- hex_orig_out = gr.Textbox(label="HEX оригинала", lines=5)
257
- hex_decomp_out = gr.Textbox(label="HEX распакованных данных", lines=10)
258
- bson_parsed = gr.Textbox(label="Распарсенный BSON", lines=30)
 
 
259
 
260
  file_input.change(
261
  fn=analyze_cps,
 
2
  import bz2
3
  import bson
4
  import struct
5
+ from typing import List, Dict, Tuple
6
 
7
  # ---------- низкоуровневые хелперы ----------
8
  def _u8(buf, off) -> int:
9
  return buf[off]
10
  def _u16(buf, off) -> int:
11
  return struct.unpack_from('<H', buf, off)[0]
12
+ def _u32(buf, off) -> int:
13
+ return struct.unpack_from('<I', buf, off)[0]
14
  def _i8(buf, off) -> int:
15
  return struct.unpack_from('<b', buf, off)[0]
16
 
17
  # ---------- чтение partsPos ----------
18
  def read_parts_pos(partsPos: bytes, bx: int, by: int) -> List[int]:
19
+ """Возвращает список длиной bx*by – сколько частиц в каждой клетке"""
20
  if len(partsPos) != bx * by * 3:
21
  raise ValueError(f'partsPos length mismatch: expected {bx*by*3}, got {len(partsPos)}')
22
  cnt_map = []
 
26
  cnt_map.append(cnt)
27
  return cnt_map
28
 
29
+ # ---------- чтение parts ----------
30
  class Particle:
31
  __slots__ = ('type', 'x', 'y', 'temp', 'life', 'tmp', 'ctype',
32
  'dcolour', 'vx', 'vy', 'tmp2', 'tmp3', 'tmp4')
 
46
  self.tmp3: int = 0
47
  self.tmp4: int = 0
48
 
49
+ def to_dict(self) -> Dict:
50
  return {field: getattr(self, field) for field in self.__slots__}
51
 
52
  def __repr__(self):
53
  return f'Particle(type={self.type} @{self.x:.1f},{self.y:.1f})'
54
 
 
55
  def read_parts(partsData: bytes, partsPosCounts: List[int],
56
+ bx: int, by: int, cell_size: int = 4) -> List[Particle]:
57
+ """Парсит массив частиц из бинарных данных"""
 
 
 
 
58
  px_per_cell = cell_size
59
+ parts: List[Particle] = []
60
  off = 0
61
  data_len = len(partsData)
62
 
 
64
  for x in range(bx):
65
  cnt = partsPosCounts[y * bx + x]
66
  base_x, base_y = x * px_per_cell, y * px_per_cell
 
67
  for _ in range(cnt):
68
  if off >= data_len - 3:
69
  raise ValueError('truncated partsData')
70
 
71
+ # type (1 байт + возможно второй)
72
  ty = _u8(partsData, off)
73
  off += 1
74
 
75
+ # field-descriptor 2 байта
76
  fd = _u16(partsData, off)
77
  off += 2
 
78
  has_fd3 = bool(fd & 0x8000)
79
  if has_fd3:
80
  if off >= data_len:
 
173
  p.tmp4 |= _u16(partsData, off + 2) << 16
174
  off += 4
175
 
176
+ parts.append(p)
177
+ return parts
178
+
179
+ def get_block_size(bson_dict: Dict) -> Tuple[int, int]:
180
+ """Извлекает размеры сейва в блоках из BSON-словаря"""
181
+ bx = bson_dict.get('blockSizeX', 0)
182
+ by = bson_dict.get('blockSizeY', 0)
183
+ if not bx or not by:
184
+ block_size = bson_dict.get('blockSize', [0, 0])
185
+ if isinstance(block_size, list) and len(block_size) == 2:
186
+ bx, by = block_size
187
+ else:
188
+ raise ValueError("Не удалось извлечь размеры сейва")
189
+ return bx, by
190
 
191
  # ---------- основная функция Gradio ----------
192
  def analyze_cps(file):
193
  if file is None:
194
+ return "", "", "", ""
195
+
196
  try:
197
  with open(file.name, "rb") as f:
198
  raw_data = f.read()
199
+
200
+ # HEX дамп оригинала
201
  hex_orig = ' '.join(f'{b:02x}' for b in raw_data[:256])
202
  hex_original = f"Оригинал (первые 256 байт):\n{hex_orig}"
203
+
204
+ # Проверка сигнатуры
205
  if raw_data[:4] != b'OPS1':
206
+ return hex_original, "[Ошибка] Неверная сигнатура файла.", "", ""
207
+
208
+ # Распаковка bzip2
209
  compressed = raw_data[12:]
210
  try:
211
  decompressed = bz2.decompress(compressed)
212
  except Exception as e:
213
+ return hex_original, f"[Ошибка bzip2]: {e}", "", ""
214
+
215
+ # HEX дамп распакованных данных
216
  hex_decomp = ' '.join(f'{b:02x}' for b in decompressed[:512])
217
  hex_decomp_view = f"Распакованные данные (первые 512 байт):\n{hex_decomp}"
218
+
219
+ # Парсинг BSON
220
  try:
221
  doc = bson.decode(decompressed)
222
  except Exception as e:
223
+ return hex_original, hex_decomp_view, f"[Ошибка BSON]: {e}", ""
224
+
225
+ # Парсинг parts и partsPos
226
+ if 'parts' in doc and 'partsPos' in doc:
227
  try:
228
+ bx, by = get_block_size(doc)
 
 
 
 
 
 
 
 
 
 
 
 
229
  parts_pos_raw = bytes(doc['partsPos'])
230
  parts_raw = bytes(doc['parts'])
231
 
232
+ # Читаем карту количества частиц
233
  counts = read_parts_pos(parts_pos_raw, bx, by)
234
+ total_particles = sum(counts)
235
 
236
+ # Читаем сами частицы
237
+ particles = read_parts(parts_raw, counts, bx, by)
238
+
239
+ # Заменяем бинарные данные на список словарей
240
+ doc['parts'] = [p.to_dict() for p in particles]
241
+ doc['partsPos'] = f"Карта количества частиц ({bx}x{by} = {len(counts)} клеток)"
242
+ doc['__info'] = f"Всего частиц: {total_particles}"
243
 
244
  except Exception as e:
245
+ doc['parts'] = f"[Ошибка парсинга parts]: {e}"
246
+ doc['partsPos'] = f"[Ошибка парсинга partsPos]: {e}"
247
+
248
  return hex_original, hex_decomp_view, str(doc)
249
+
250
  except Exception as e:
251
+ return f"[Ошибка]: {e}", "", "", ""
252
 
253
+ # ---------- Gradio UI ----------
254
  with gr.Blocks() as demo:
255
+ gr.Markdown("## 🔬 Распаковка и анализ .cps (The Powder Toy Save)")
256
+ gr.Markdown("Загрузите файл `.cps`, чтобы увидеть полную структуру данных, включая частицы.")
257
 
258
  file_input = gr.File(label="Загрузите .cps файл", file_types=[".cps"])
259
 
260
+ with gr.Row():
261
+ hex_orig_out = gr.Textbox(label="HEX оригинального файла", lines=5)
262
+ hex_decomp_out = gr.Textbox(label="HEX распакованных данных", lines=10)
263
+
264
+ bson_parsed = gr.Textbox(label="Распарсенный BSON (с частицами)", lines=30)
265
 
266
  file_input.change(
267
  fn=analyze_cps,