OldKingMeister committed on
Commit
bd5aedf
·
1 Parent(s): 0fe518e

fix file read error

Browse files
Files changed (1) hide show
  1. app.py +86 -56
app.py CHANGED
@@ -8,170 +8,200 @@ from typing import Any, List, Tuple, Dict, Union
8
  import gradio as gr
9
 
10
 
11
- # -----------------------------
12
- # Safe unpickling (no imports)
13
- # -----------------------------
14
  class _Record(dict):
15
- """Placeholder for any external class; __setstate__ writes into this dict."""
 
 
 
16
  def __setstate__(self, state):
17
  if isinstance(state, dict):
18
  self.update(state)
19
  else:
20
- # keep non-dict state somewhere visible
21
  self["_state"] = state
22
 
 
23
  class _PlaceholderType(type):
 
 
 
 
24
  def __call__(cls, *args, **kwargs):
25
- # Called for REDUCE/callable cases
26
  return _Record()
27
 
 
28
  class _Placeholder(metaclass=_PlaceholderType):
 
 
 
29
  def __new__(cls, *args, **kwargs):
30
- # Used by NEWOBJ/NEWOBJ_EX
31
  return _Record()
32
 
 
33
  class _SafeUnpickler(pickle.Unpickler):
34
- """Never import external modules; map everything to a benign placeholder type."""
 
 
 
35
  def find_class(self, module, name):
36
- return _Placeholder # returning a 'type' satisfies NEWOBJ paths
 
37
 
38
 
39
  def _to_builtin(obj: Any) -> Any:
40
- """Recursively convert placeholders/bytes/sets into JSON-friendly Python builtins."""
 
 
 
41
  if isinstance(obj, _Record) or isinstance(obj, dict):
42
  return {_to_builtin(k): _to_builtin(v) for k, v in obj.items()}
43
  if isinstance(obj, (list, tuple, set)):
44
  return [_to_builtin(x) for x in obj]
45
  if isinstance(obj, bytes):
 
46
  try:
47
  return obj.decode("utf-8")
48
  except Exception:
49
- # keep as repr if not decodable
50
  return {"__bytes__": True, "len": len(obj)}
51
  return obj
52
 
53
 
54
  def safe_unpickle(data: bytes) -> Any:
55
- """Safe, single-object unpickle to builtins."""
 
 
56
  obj = _SafeUnpickler(io.BytesIO(data)).load()
57
  return _to_builtin(obj)
58
 
59
 
60
- # ---------------------------------------
61
- # Support multi-object pickle concatenation
62
- # ---------------------------------------
63
- PROTO_MARKER = re.compile(br"\x80[\x03-\x05]") # common proto 3/4/5
 
 
64
 
65
  def split_pickle_stream(data: bytes) -> List[Tuple[int, int, bytes]]:
66
  """
67
- Some tools append multiple pickles into one file. Split by protocol markers.
68
- If none found, return the whole file as a single segment.
 
69
  """
70
  idxs = [m.start() for m in PROTO_MARKER.finditer(data)]
71
  if not idxs:
 
72
  return [(0, len(data), data)]
73
  idxs.append(len(data))
74
- segs = []
75
  for i in range(len(idxs) - 1):
76
  s, e = idxs[i], idxs[i + 1]
77
- segs.append((s, e, data[s:e]))
78
- return segs
79
 
80
 
81
- # -----------------------------
82
- # Gradio logic
83
- # -----------------------------
84
  def handle_file(
85
- file: gr.File | None,
86
- pretty: bool,
87
- limit_preview: int,
88
- ) -> Tuple[str, Dict[str, Any] | List[Any] | None, str | None]:
89
  """
90
- Returns: (summary_text, json_preview, download_path)
 
 
91
  """
92
  if file is None:
93
  return "Please upload a .pkl (pickle) file.", None, None
94
 
95
- raw = file.read()
 
 
 
 
 
96
  size_kb = len(raw) / 1024.0
97
 
 
98
  segments = split_pickle_stream(raw)
99
  results: List[Any] = []
100
  errors: List[str] = []
101
-
102
  for i, (_, _, seg) in enumerate(segments, 1):
103
  try:
104
  results.append(safe_unpickle(seg))
105
  except Exception as e:
106
  errors.append(f"Segment {i}: {type(e).__name__}: {e}")
107
 
108
- # If it's a single segment, show the object directly; otherwise show a list
109
  output_obj: Union[Dict[str, Any], List[Any]]
110
  if len(results) == 1:
111
  output_obj = results[0]
112
  else:
113
  output_obj = results
114
 
115
- # Build summary text
116
  summary_lines = [
117
  f"File size: {size_kb:.1f} KB",
118
  f"Detected pickle segments: {len(segments)}",
119
  ]
120
  if errors:
121
  summary_lines.append(f"Errors: {len(errors)}")
122
- summary_lines.extend(errors[:limit_preview])
123
-
124
  summary = "\n".join(summary_lines)
125
 
126
- # Create a temporary JSON file for download
127
- tmp = tempfile.NamedTemporaryFile(prefix="pickle_view_", suffix=".json", delete=False)
128
  with open(tmp.name, "w", encoding="utf-8") as f:
129
- if pretty:
130
  json.dump(output_obj, f, ensure_ascii=False, indent=2)
131
  else:
132
  json.dump(output_obj, f, ensure_ascii=False, separators=(",", ":"))
133
  download_path = tmp.name
134
 
135
- # For the on-page JSON preview, Gradio's JSON component handles dict/list nicely
136
- # (It doesn't use the 'pretty' toggle, which only affects the downloadable file.)
137
- json_preview = output_obj
138
 
139
- return summary, json_preview, download_path
140
 
141
-
142
- with gr.Blocks(title="Pickle Viewer (Safe)") as demo:
143
  gr.Markdown(
144
  """
145
- # 🥒 Pickle Viewer (Safe)
146
- Safely deserialize Python pickle files **without executing arbitrary code**, then preview and download as JSON.
 
147
 
148
  - **Safe**: Unknown classes are mapped to harmless placeholders (no imports run).
149
- - **Multi-pickle**: Supports files that contain multiple concatenated pickle objects.
150
- - **Download**: Get the parsed JSON as a file.
151
 
152
- > Tip: This tool is for **inspection** only. If you need to *execute* real classes, do it locally and only on trusted data.
153
  """
154
  )
155
 
156
  with gr.Row():
157
- file_in = gr.File(label="Upload .pkl file", file_types=[".pkl", ".pickle"], type="binary")
 
 
 
 
158
  with gr.Column():
159
  pretty = gr.Checkbox(value=True, label="Pretty JSON (for download)")
160
- limit_preview = gr.Slider(
161
- 1, 50, value=10, step=1,
162
- label="Max error lines to show in summary"
163
- )
164
 
165
  run_btn = gr.Button("Deserialize Safely")
166
- summary_out = gr.Textbox(label="Summary", lines=6)
167
  json_out = gr.JSON(label="Parsed JSON Preview")
168
  download_out = gr.File(label="Download JSON")
169
 
170
  run_btn.click(
171
  fn=handle_file,
172
- inputs=[file_in, pretty, limit_preview],
173
  outputs=[summary_out, json_out, download_out],
174
  )
175
 
176
  if __name__ == "__main__":
 
177
  demo.launch()
 
8
  import gradio as gr
9
 
10
 
11
+ # =========================
12
+ # Safe unpickling machinery
13
+ # =========================
14
  class _Record(dict):
15
+ """
16
+ Placeholder for any external class instance encountered during unpickling.
17
+ We implement __setstate__ so state can be applied into this dict safely.
18
+ """
19
  def __setstate__(self, state):
20
  if isinstance(state, dict):
21
  self.update(state)
22
  else:
23
+ # Keep non-dict states visible rather than failing
24
  self["_state"] = state
25
 
26
+
27
  class _PlaceholderType(type):
28
+ """
29
+ A metaclass that returns _Record() when the placeholder 'type' is called.
30
+ This handles REDUCE/callable paths in pickle protocols.
31
+ """
32
  def __call__(cls, *args, **kwargs):
 
33
  return _Record()
34
 
35
+
36
  class _Placeholder(metaclass=_PlaceholderType):
37
+ """
38
+ A 'type' whose __new__ also returns _Record(), satisfying NEWOBJ/NEWOBJ_EX.
39
+ """
40
  def __new__(cls, *args, **kwargs):
 
41
  return _Record()
42
 
43
+
44
  class _SafeUnpickler(pickle.Unpickler):
45
+ """
46
+ Unpickler that never imports external modules/classes.
47
+ Every GLOBAL reference becomes _Placeholder, which yields _Record objects.
48
+ """
49
  def find_class(self, module, name):
50
+ # Never import arbitrary classes; map to our safe placeholder 'type'
51
+ return _Placeholder
52
 
53
 
54
  def _to_builtin(obj: Any) -> Any:
55
+ """
56
+ Recursively convert placeholders and non-JSON-friendly types into
57
+ plain Python builtins (dict/list/str/ints), suitable for JSON dumping.
58
+ """
59
  if isinstance(obj, _Record) or isinstance(obj, dict):
60
  return {_to_builtin(k): _to_builtin(v) for k, v in obj.items()}
61
  if isinstance(obj, (list, tuple, set)):
62
  return [_to_builtin(x) for x in obj]
63
  if isinstance(obj, bytes):
64
+ # Try decoding as UTF-8; otherwise keep a small descriptor
65
  try:
66
  return obj.decode("utf-8")
67
  except Exception:
 
68
  return {"__bytes__": True, "len": len(obj)}
69
  return obj
70
 
71
 
72
  def safe_unpickle(data: bytes) -> Any:
73
+ """
74
+ Safely unpickle a single pickle object from bytes without importing modules.
75
+ """
76
  obj = _SafeUnpickler(io.BytesIO(data)).load()
77
  return _to_builtin(obj)
78
 
79
 
80
+ # =========================
81
+ # Multi-pickle file support
82
+ # =========================
83
# Protocol header pattern: 0x80 followed by 0x03/0x04/0x05. Kept as a
# module-level name for backward compatibility; split_pickle_stream does not
# use it, because these two bytes can also occur inside pickle payload data.
PROTO_MARKER = re.compile(br"\x80[\x03-\x05]")


def split_pickle_stream(data: bytes) -> List[Tuple[int, int, bytes]]:
    """Split a byte string holding one or more concatenated pickles.

    Boundaries are found by walking the pickle opcodes with
    ``pickletools.genops``, which parses but never executes a pickle, until
    each STOP opcode. This works for every protocol and is not confused by
    payload bytes that merely look like a protocol header.

    Args:
        data: Raw file content.

    Returns:
        A list of ``(start, end, segment_bytes)`` tuples. ASCII whitespace
        between or after pickles is skipped. If parsing fails part-way, the
        unparsed remainder is returned as a final segment so the caller can
        report it. Empty or whitespace-only input yields the single segment
        ``(0, len(data), data)``.
    """
    import pickletools  # function-scope: the file's import block is not edited

    total = len(data)
    stream = io.BytesIO(data)
    segments: List[Tuple[int, int, bytes]] = []
    start = 0
    while start < total:
        # Whitespace bytes are not pickle opcodes; tolerate them as padding.
        if data[start:start + 1].isspace():
            start += 1
            continue
        stream.seek(start)
        try:
            for _ in pickletools.genops(stream):
                pass
        except (ValueError, OverflowError, EOFError):
            # Truncated or non-pickle data: hand over the rest untouched.
            segments.append((start, total, data[start:]))
            break
        end = stream.tell()
        segments.append((start, end, data[start:end]))
        start = end
    if not segments:
        return [(0, total, data)]
    return segments
103
 
104
 
105
+ # =========================
106
+ # Gradio handlers / UI
107
+ # =========================
108
  def handle_file(
109
+ file: Union[bytes, io.BufferedReader, None],
110
+ pretty_json: bool,
111
+ max_error_lines: int,
112
+ ) -> Tuple[str, Union[Dict[str, Any], List[Any], None], Union[str, None]]:
113
  """
114
+ Gradio callback:
115
+ - 'file' can be bytes (when File(type="binary")) or a file-like object.
116
+ - Returns (summary_text, json_preview_object, download_json_path).
117
  """
118
  if file is None:
119
  return "Please upload a .pkl (pickle) file.", None, None
120
 
121
+ # Accept both raw bytes and file-like objects
122
+ if isinstance(file, (bytes, bytearray)):
123
+ raw = bytes(file)
124
+ else:
125
+ raw = file.read()
126
+
127
  size_kb = len(raw) / 1024.0
128
 
129
+ # Split and safely unpickle each segment
130
  segments = split_pickle_stream(raw)
131
  results: List[Any] = []
132
  errors: List[str] = []
 
133
  for i, (_, _, seg) in enumerate(segments, 1):
134
  try:
135
  results.append(safe_unpickle(seg))
136
  except Exception as e:
137
  errors.append(f"Segment {i}: {type(e).__name__}: {e}")
138
 
139
+ # Single segment -> show the object; multiple -> show a list
140
  output_obj: Union[Dict[str, Any], List[Any]]
141
  if len(results) == 1:
142
  output_obj = results[0]
143
  else:
144
  output_obj = results
145
 
146
+ # Build summary
147
  summary_lines = [
148
  f"File size: {size_kb:.1f} KB",
149
  f"Detected pickle segments: {len(segments)}",
150
  ]
151
  if errors:
152
  summary_lines.append(f"Errors: {len(errors)}")
153
+ summary_lines.extend(errors[:max_error_lines])
 
154
  summary = "\n".join(summary_lines)
155
 
156
+ # Create a temporary JSON for download
157
+ tmp = tempfile.NamedTemporaryFile(prefix="depickle_", suffix=".json", delete=False)
158
  with open(tmp.name, "w", encoding="utf-8") as f:
159
+ if pretty_json:
160
  json.dump(output_obj, f, ensure_ascii=False, indent=2)
161
  else:
162
  json.dump(output_obj, f, ensure_ascii=False, separators=(",", ":"))
163
  download_path = tmp.name
164
 
165
+ # For the on-page preview, Gradio's JSON component accepts dict/list directly
166
+ return summary, output_obj, download_path
 
167
 
 
168
 
169
# Build the Gradio UI. Components are created in display order inside the
# Blocks context, and `demo` is the module-level app object launched below.
# NOTE(review): block nesting was reconstructed from a flattened listing —
# confirm the Row/Column structure against the real file.
with gr.Blocks(title="DePickle — Safe Pickle Viewer") as demo:
    # Introductory text shown at the top of the page.
    gr.Markdown(
        """
        # 🥒 DePickle — Safe Pickle Viewer

        Safely deserialize Python pickle files **without executing arbitrary code** and preview as JSON.

        - **Safe**: Unknown classes are mapped to harmless placeholders (no imports run).
        - **Multi-pickle**: Supports files containing multiple concatenated pickle objects.
        - **Download**: Export the parsed content as a JSON file.

        > Note: This tool is for **inspection** only. If you need to execute real classes, do it locally and only on trusted data.
        """
    )

    # Inputs: the uploaded file next to a column of options.
    with gr.Row():
        file_in = gr.File(
            label="Upload .pkl / .pickle",
            file_types=[".pkl", ".pickle"],
            type="binary",  # returns bytes; our handler supports bytes/file
        )
        with gr.Column():
            pretty = gr.Checkbox(value=True, label="Pretty JSON (for download)")
            max_lines = gr.Slider(1, 50, value=10, step=1, label="Max error lines in summary")

    # Trigger button and the three outputs that handle_file fills.
    run_btn = gr.Button("Deserialize Safely")
    summary_out = gr.Textbox(label="Summary", lines=8)
    json_out = gr.JSON(label="Parsed JSON Preview")
    download_out = gr.File(label="Download JSON")

    # Inputs and outputs are passed positionally, so their order here must
    # match handle_file's parameters and its returned 3-tuple.
    run_btn.click(
        fn=handle_file,
        inputs=[file_in, pretty, max_lines],
        outputs=[summary_out, json_out, download_out],
    )

if __name__ == "__main__":
    # In Spaces, Gradio will be launched by the runtime; locally you can run it yourself.
    demo.launch()