q6 commited on
Commit
0cde756
·
1 Parent(s): 7f1899d

test-quick-stealth

Browse files
Files changed (1) hide show
  1. Client/Scripts/test-short.py +340 -0
Client/Scripts/test-short.py ADDED
@@ -0,0 +1,340 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import zlib
3
+
4
+ import requests
5
+
6
+
7
+ MAGIC = b"stealth_pngcomp"
8
+ PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"
9
+ USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0"
10
+ REQUEST_TIMEOUT = 45
11
+ POST_IDS = [
12
+ "116370378",
13
+ "137989718",
14
+ "127319128",
15
+ "134714737",
16
+ "121968369",
17
+ ]
18
+
19
+
20
+ def read_dotenv_value(path, key):
21
+ try:
22
+ with open(path, "r") as env_file:
23
+ for line in env_file:
24
+ line = line.strip()
25
+ if not line or line.startswith("#") or "=" not in line:
26
+ continue
27
+ k, v = line.split("=", 1)
28
+ if k == key:
29
+ return v
30
+ except FileNotFoundError:
31
+ return None
32
+ return None
33
+
34
+
35
+ def get_phpsessid():
36
+ phpsessid = os.getenv("PHPSESSID")
37
+ if phpsessid:
38
+ return phpsessid
39
+ script_dir = os.path.dirname(os.path.abspath(__file__))
40
+ env_candidates = [
41
+ os.path.join(script_dir, "..", ".env"),
42
+ os.path.join(script_dir, "..", "..", ".env"),
43
+ ]
44
+ for env_path in env_candidates:
45
+ phpsessid = read_dotenv_value(os.path.abspath(env_path), "PHPSESSID")
46
+ if phpsessid:
47
+ return phpsessid
48
+ raise RuntimeError("PHPSESSID is not set in the environment or .env")
49
+
50
+
51
+ def build_session(phpsessid):
52
+ session = requests.Session()
53
+ session.headers.update({"User-Agent": USER_AGENT, "Referer": "https://www.pixiv.net/"})
54
+ session.cookies.update({"PHPSESSID": phpsessid})
55
+ return session
56
+
57
+
58
+ def fetch_post_pages(session, post_id):
59
+ url = f"https://www.pixiv.net/ajax/illust/{post_id}/pages"
60
+ headers = {"Referer": f"https://www.pixiv.net/artworks/{post_id}"}
61
+ response = session.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
62
+ response.raise_for_status()
63
+ data = response.json()
64
+ return data.get("body") or []
65
+
66
+
67
+ def read_exact(stream, size):
68
+ chunks = []
69
+ remaining = size
70
+ while remaining > 0:
71
+ data = stream.read(remaining)
72
+ if not data:
73
+ raise EOFError("Unexpected end of stream.")
74
+ chunks.append(data)
75
+ remaining -= len(data)
76
+ return b"".join(chunks)
77
+
78
+
79
+ def paeth_predictor(a, b, c):
80
+ p = a + b - c
81
+ pa = abs(p - a)
82
+ pb = abs(p - b)
83
+ pc = abs(p - c)
84
+ if pa <= pb and pa <= pc:
85
+ return a
86
+ if pb <= pc:
87
+ return b
88
+ return c
89
+
90
+
91
+ def unfilter_row(filter_type, raw, prev, bpp):
92
+ raw = memoryview(raw)
93
+ row = bytearray(len(raw))
94
+ if filter_type == 0:
95
+ row[:] = raw
96
+ return row
97
+ if filter_type == 1:
98
+ for i in range(len(raw)):
99
+ left = row[i - bpp] if i >= bpp else 0
100
+ row[i] = (raw[i] + left) & 0xFF
101
+ return row
102
+ if filter_type == 2:
103
+ for i in range(len(raw)):
104
+ row[i] = (raw[i] + prev[i]) & 0xFF
105
+ return row
106
+ if filter_type == 3:
107
+ for i in range(len(raw)):
108
+ left = row[i - bpp] if i >= bpp else 0
109
+ up = prev[i]
110
+ row[i] = (raw[i] + ((left + up) // 2)) & 0xFF
111
+ return row
112
+ if filter_type == 4:
113
+ for i in range(len(raw)):
114
+ left = row[i - bpp] if i >= bpp else 0
115
+ up = prev[i]
116
+ up_left = prev[i - bpp] if i >= bpp else 0
117
+ row[i] = (raw[i] + paeth_predictor(left, up, up_left)) & 0xFF
118
+ return row
119
+ raise ValueError(f"Unsupported PNG filter type: {filter_type}")
120
+
121
+
122
+ def parse_ihdr(data):
123
+ if len(data) != 13:
124
+ raise ValueError("Invalid IHDR chunk length.")
125
+ width = int.from_bytes(data[0:4], "big")
126
+ height = int.from_bytes(data[4:8], "big")
127
+ bit_depth = data[8]
128
+ color_type = data[9]
129
+ compression = data[10]
130
+ filter_method = data[11]
131
+ interlace = data[12]
132
+ return width, height, bit_depth, color_type, compression, filter_method, interlace
133
+
134
+
135
+ def stream_magic_and_length(session, url, referer):
136
+ headers = {
137
+ "Referer": referer,
138
+ "User-Agent": USER_AGENT,
139
+ "Accept-Encoding": "identity",
140
+ }
141
+ required_bytes = len(MAGIC) + 4
142
+ required_bits = required_bytes * 8
143
+ with session.get(url, headers=headers, stream=True, timeout=REQUEST_TIMEOUT) as response:
144
+ response.raise_for_status()
145
+ response.raw.decode_content = False
146
+ stream = response.raw
147
+
148
+ signature = read_exact(stream, len(PNG_SIGNATURE))
149
+ if signature != PNG_SIGNATURE:
150
+ raise ValueError("Not a PNG file.")
151
+
152
+ width = height = None
153
+ row_bytes = None
154
+ bpp = None
155
+ rows_needed = None
156
+ rows_processed = 0
157
+ prev_row = None
158
+ scanline_buf = bytearray()
159
+ out_bytes = bytearray()
160
+ current_byte = 0
161
+ bits_in_current = 0
162
+ decompressor = zlib.decompressobj()
163
+
164
+ while True:
165
+ length_bytes = read_exact(stream, 4)
166
+ chunk_length = int.from_bytes(length_bytes, "big")
167
+ chunk_type = read_exact(stream, 4)
168
+
169
+ if chunk_type == b"IHDR":
170
+ ihdr = read_exact(stream, chunk_length)
171
+ width, height, bit_depth, color_type, compression, filter_method, interlace = parse_ihdr(ihdr)
172
+ if compression != 0 or filter_method != 0 or interlace != 0:
173
+ raise ValueError("Unsupported PNG compression/filter/interlace.")
174
+ if bit_depth != 8:
175
+ raise ValueError("Unsupported PNG bit depth.")
176
+ if color_type not in (4, 6):
177
+ raise ValueError("PNG does not have an alpha channel.")
178
+ if width <= 0 or height <= 0:
179
+ raise ValueError("Invalid PNG dimensions.")
180
+ channels = 2 if color_type == 4 else 4
181
+ bpp = channels
182
+ row_bytes = width * bpp
183
+ rows_needed = (required_bits + width - 1) // width
184
+ if rows_needed > height:
185
+ raise ValueError("PNG too short to read header bits.")
186
+ prev_row = bytearray(row_bytes)
187
+ read_exact(stream, 4)
188
+ continue
189
+
190
+ if chunk_type == b"IDAT":
191
+ if row_bytes is None:
192
+ raise ValueError("IDAT before IHDR.")
193
+ remaining = chunk_length
194
+ while remaining > 0:
195
+ chunk = read_exact(stream, min(16384, remaining))
196
+ remaining -= len(chunk)
197
+ output = decompressor.decompress(chunk)
198
+ if not output:
199
+ continue
200
+ scanline_buf.extend(output)
201
+
202
+ while rows_processed < rows_needed and len(scanline_buf) >= row_bytes + 1:
203
+ filter_type = scanline_buf[0]
204
+ raw_row = scanline_buf[1:1 + row_bytes]
205
+ del scanline_buf[:1 + row_bytes]
206
+ row = unfilter_row(filter_type, raw_row, prev_row, bpp)
207
+ prev_row = row
208
+ rows_processed += 1
209
+
210
+ alpha_offset = bpp - 1
211
+ for i in range(alpha_offset, row_bytes, bpp):
212
+ bit = row[i] & 1
213
+ current_byte = (current_byte << 1) | bit
214
+ bits_in_current += 1
215
+ if bits_in_current == 8:
216
+ out_bytes.append(current_byte)
217
+ bits_in_current = 0
218
+ current_byte = 0
219
+ if len(out_bytes) >= required_bytes:
220
+ return {
221
+ "magic": bytes(out_bytes[: len(MAGIC)]),
222
+ "length_bits": int.from_bytes(
223
+ out_bytes[len(MAGIC):required_bytes],
224
+ "big",
225
+ ),
226
+ "width": width,
227
+ "height": height,
228
+ "rows_needed": rows_needed,
229
+ "rows_processed": rows_processed,
230
+ }
231
+ if rows_processed >= rows_needed and len(out_bytes) >= required_bytes:
232
+ return {
233
+ "magic": bytes(out_bytes[: len(MAGIC)]),
234
+ "length_bits": int.from_bytes(
235
+ out_bytes[len(MAGIC):required_bytes],
236
+ "big",
237
+ ),
238
+ "width": width,
239
+ "height": height,
240
+ "rows_needed": rows_needed,
241
+ "rows_processed": rows_processed,
242
+ }
243
+
244
+ read_exact(stream, 4)
245
+ continue
246
+
247
+ read_exact(stream, chunk_length)
248
+ read_exact(stream, 4)
249
+ if chunk_type == b"IEND":
250
+ break
251
+
252
+ raise ValueError("Not enough PNG data to read header bits.")
253
+
254
+
255
+ def read_post_ids(path):
256
+ with open(path, "r") as handle:
257
+ return [line.strip() for line in handle if line.strip()]
258
+
259
+
260
+ def load_post_ids(args):
261
+ if args:
262
+ if len(args) == 1 and os.path.isfile(args[0]):
263
+ return read_post_ids(args[0])
264
+ return [arg for arg in args if arg.isdigit()]
265
+ default_path = os.path.join(
266
+ os.path.dirname(os.path.abspath(__file__)),
267
+ "..",
268
+ "txt logs",
269
+ "test.txt",
270
+ )
271
+ if os.path.exists(default_path):
272
+ return read_post_ids(default_path)
273
+ return []
274
+
275
+
276
+ def main() -> int:
277
+ try:
278
+ phpsessid = get_phpsessid()
279
+ except Exception as exc:
280
+ print(f"Failed to load PHPSESSID: {exc}")
281
+ return 1
282
+
283
+ post_ids = list(POST_IDS)
284
+ if not post_ids:
285
+ print("No post IDs provided.")
286
+ return 1
287
+
288
+ session = build_session(phpsessid)
289
+
290
+ for post_id in post_ids:
291
+ print(f"Post {post_id}")
292
+ try:
293
+ pages = fetch_post_pages(session, post_id)
294
+ except Exception as exc:
295
+ print(f" Failed to fetch pages: {exc}")
296
+ continue
297
+
298
+ png_pages = []
299
+ for idx, page in enumerate(pages):
300
+ original = page.get("urls", {}).get("original")
301
+ if original and original.lower().endswith(".png"):
302
+ png_pages.append((idx + 1, original))
303
+
304
+ if not png_pages:
305
+ print(" No PNG pages found.")
306
+ continue
307
+
308
+ for page_index, url in png_pages:
309
+ try:
310
+ info = stream_magic_and_length(
311
+ session,
312
+ url,
313
+ f"https://www.pixiv.net/artworks/{post_id}",
314
+ )
315
+ except Exception as exc:
316
+ print(f" Page {page_index}: error {exc}")
317
+ continue
318
+
319
+ magic = info["magic"]
320
+ if magic == MAGIC:
321
+ length_bits = info["length_bits"]
322
+ length_bytes = length_bits // 8 if length_bits is not None else None
323
+ print(
324
+ f" Page {page_index}: magic ok, "
325
+ f"len_bits={length_bits}, len_bytes={length_bytes}, "
326
+ f"width={info['width']}, rows={info['rows_needed']}"
327
+ )
328
+ else:
329
+ magic_ascii = magic.decode("ascii", errors="replace")
330
+ print(
331
+ f" Page {page_index}: magic mismatch "
332
+ f"ascii={magic_ascii!r} hex={magic.hex()}, "
333
+ f"width={info['width']}, rows={info['rows_needed']}"
334
+ )
335
+
336
+ return 0
337
+
338
+
339
+ if __name__ == "__main__":
340
+ raise SystemExit(main())