q6 committed on
Commit
3bb33f8
·
1 Parent(s): e3cf3a2

Supposedly faster

Browse files
Files changed (1) hide show
  1. Client/hunt.py +84 -17
Client/hunt.py CHANGED
@@ -2,8 +2,10 @@ import json
2
  import os
3
  import sqlite3
4
  import threading
 
5
  from concurrent.futures import ThreadPoolExecutor, as_completed
6
  import shutil
 
7
 
8
  import requests
9
  from tqdm import tqdm
@@ -18,9 +20,14 @@ REQUEST_HEADERS = {
18
  }
19
  MAX_WORKERS = min(16, os.cpu_count() or 8)
20
  REQUEST_TIMEOUT = (120, 120)
 
 
 
 
 
21
  stop_event = threading.Event()
22
 
23
- def read_dotenv_value(path, key):
24
  try:
25
  with open(path, "r") as env_file:
26
  for line in env_file:
@@ -34,7 +41,7 @@ def read_dotenv_value(path, key):
34
  return None
35
  return None
36
 
37
- def get_phpsessid():
38
  phpsessid = os.getenv("PHPSESSID")
39
  if phpsessid:
40
  return phpsessid
@@ -53,7 +60,7 @@ images_cache = set(os.listdir("images/Stash"))
53
 
54
  DB_PATH = "db.sqlite"
55
 
56
- def open_db(path):
57
  conn = sqlite3.connect(path)
58
  conn.execute(
59
  """
@@ -66,11 +73,11 @@ def open_db(path):
66
  conn.commit()
67
  return conn
68
 
69
- def chunked(seq, size):
70
  for i in range(0, len(seq), size):
71
  yield seq[i:i + size]
72
 
73
- def fetch_cached_urls(conn, post_ids):
74
  post_ids_dict = {post_id: None for post_id in post_ids}
75
  if not post_ids:
76
  return post_ids_dict
@@ -83,7 +90,7 @@ def fetch_cached_urls(conn, post_ids):
83
 
84
  return post_ids_dict
85
 
86
- def upsert_urls(conn, rows):
87
  if not rows:
88
  return
89
  conn.executemany(
@@ -95,24 +102,42 @@ def upsert_urls(conn, rows):
95
  rows,
96
  )
97
 
98
- def stream_pixif_updates(post_ids, phpsessid, conn, post_ids_dict, desc, stop_event=stop_event):
99
- if not post_ids:
100
- return
101
- payload = {"post_ids": post_ids, "phpsessid": phpsessid}
 
 
 
 
 
 
 
 
 
102
  try:
103
  with requests.post(
104
  f"{endpoint}/pixif_stream",
105
  json=payload,
106
  stream=True,
107
- timeout=REQUEST_TIMEOUT,
108
  ) as response:
109
  response.raise_for_status()
110
- with tqdm(total=len(post_ids), unit="post", desc=desc) as pbar:
 
 
 
111
  while not stop_event.is_set() and pbar.n < pbar.total:
 
 
112
  try:
113
  for line in response.iter_lines(decode_unicode=True):
114
  if stop_event.is_set():
115
  break
 
 
 
 
116
  if not line:
117
  continue
118
  try:
@@ -133,12 +158,21 @@ def stream_pixif_updates(post_ids, phpsessid, conn, post_ids_dict, desc, stop_ev
133
  upsert_urls(conn, [(post_id, url)])
134
  post_ids_dict[post_id] = url
135
  pbar.update(1)
 
 
 
 
 
136
  break
137
  except requests.exceptions.ReadTimeout:
 
 
 
138
  continue
139
  except KeyboardInterrupt:
140
  stop_event.set()
141
  raise
 
142
 
143
  conn = open_db(DB_PATH)
144
 
@@ -155,12 +189,18 @@ for inp in inputs:
155
  elif inp.isdigit():
156
  indexs.append(int(inp) - 1)
157
 
158
- def build_image_url(url):
159
  if url.startswith("http"):
160
  return url
161
  return IMG_BASE + url
162
 
163
- def download_one_image(post_id, url, dest_dir, phpsessid, stop_event=stop_event):
 
 
 
 
 
 
164
  dest_path = os.path.join(dest_dir, f"{post_id}.png")
165
  if os.path.exists(dest_path):
166
  return post_id, "exists", None
@@ -201,7 +241,13 @@ def download_one_image(post_id, url, dest_dir, phpsessid, stop_event=stop_event)
201
  pass
202
  return post_id, "error", str(exc)
203
 
204
- def download_images(to_download, dest_dir, phpsessid, max_workers=MAX_WORKERS, stop_event=stop_event):
 
 
 
 
 
 
205
  if not to_download:
206
  return
207
 
@@ -234,11 +280,32 @@ def download_images(to_download, dest_dir, phpsessid, max_workers=MAX_WORKERS, s
234
  if not interrupted:
235
  executor.shutdown(wait=True)
236
 
237
- def decode_if_binary(val):
238
  if type(val) is bytes:
239
  return val.decode()
240
  return val
241
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
242
  try:
243
  for index in indexs:
244
  group_name = valid[index].rsplit(".", 1)[0]
@@ -260,7 +327,7 @@ try:
260
  if filtered:
261
  if dry_run:
262
  print("Dry run outputs (post_id -> page):")
263
- stream_pixif_updates(filtered, phpsessid, conn, post_ids_dict, "Scanning", stop_event)
264
  if dry_run:
265
  continue
266
  to_download = {post_id: decode_if_binary(url) for post_id, url in post_ids_dict.items() if url and f"{post_id}.png" not in images_cache}
 
2
  import os
3
  import sqlite3
4
  import threading
5
+ import time
6
  from concurrent.futures import ThreadPoolExecutor, as_completed
7
  import shutil
8
+ from typing import Dict, Iterator, List, Optional, Sequence, Tuple, Union
9
 
10
  import requests
11
  from tqdm import tqdm
 
20
  }
21
  MAX_WORKERS = min(16, os.cpu_count() or 8)
22
  REQUEST_TIMEOUT = (120, 120)
23
+ STREAM_REQUEST_TIMEOUT = (120, 30)
24
+ STREAM_IDLE_TIMEOUT_SECONDS = 45
25
+ STREAM_MAX_READ_TIMEOUTS = 3
26
+ STREAM_MAX_RETRIES = 3
27
+ STREAM_RETRY_DELAY_SECONDS = 2
28
  stop_event = threading.Event()
29
 
30
+ def read_dotenv_value(path: str, key: str) -> Optional[str]:
31
  try:
32
  with open(path, "r") as env_file:
33
  for line in env_file:
 
41
  return None
42
  return None
43
 
44
+ def get_phpsessid() -> str:
45
  phpsessid = os.getenv("PHPSESSID")
46
  if phpsessid:
47
  return phpsessid
 
60
 
61
  DB_PATH = "db.sqlite"
62
 
63
+ def open_db(path: str) -> sqlite3.Connection:
64
  conn = sqlite3.connect(path)
65
  conn.execute(
66
  """
 
73
  conn.commit()
74
  return conn
75
 
76
def chunked(seq: Sequence[str], size: int) -> Iterator[List[str]]:
    """Yield consecutive slices of ``seq`` holding at most ``size`` items each.

    The final slice is shorter when ``len(seq)`` is not a multiple of
    ``size``; an empty ``seq`` yields nothing.

    Args:
        seq: Sliceable sequence to split.
        size: Maximum number of items per slice; must be at least 1.

    Raises:
        ValueError: If ``size`` is less than 1. Because this is a generator,
            the error surfaces when iteration starts, not at call time.
    """
    # A negative step makes range() empty, which would silently drop every
    # item instead of signalling the bad argument; zero already raised
    # ValueError from range(), so this keeps that exception type.
    if size < 1:
        raise ValueError(f"chunk size must be a positive integer, got {size!r}")
    for start in range(0, len(seq), size):
        yield seq[start:start + size]
79
 
80
+ def fetch_cached_urls(conn: sqlite3.Connection, post_ids: Sequence[str]) -> Dict[str, Optional[str]]:
81
  post_ids_dict = {post_id: None for post_id in post_ids}
82
  if not post_ids:
83
  return post_ids_dict
 
90
 
91
  return post_ids_dict
92
 
93
+ def upsert_urls(conn: sqlite3.Connection, rows: Sequence[Tuple[str, str]]) -> None:
94
  if not rows:
95
  return
96
  conn.executemany(
 
102
  rows,
103
  )
104
 
105
+ def stream_pixif_updates(
106
+ post_ids: Sequence[str],
107
+ phpsessid: str,
108
+ conn: sqlite3.Connection,
109
+ post_ids_dict: Dict[str, Optional[str]],
110
+ desc: str,
111
+ stop_event: threading.Event = stop_event,
112
+ ) -> int:
113
+ post_ids_list = list(post_ids)
114
+ if not post_ids_list:
115
+ return 0
116
+ payload = {"post_ids": post_ids_list, "phpsessid": phpsessid}
117
+ processed = 0
118
  try:
119
  with requests.post(
120
  f"{endpoint}/pixif_stream",
121
  json=payload,
122
  stream=True,
123
+ timeout=STREAM_REQUEST_TIMEOUT,
124
  ) as response:
125
  response.raise_for_status()
126
+ with tqdm(total=len(post_ids_list), unit="post", desc=desc) as pbar:
127
+ last_progress = time.monotonic()
128
+ consecutive_timeouts = 0
129
+ idle_break = False
130
  while not stop_event.is_set() and pbar.n < pbar.total:
131
+ if time.monotonic() - last_progress > STREAM_IDLE_TIMEOUT_SECONDS:
132
+ break
133
  try:
134
  for line in response.iter_lines(decode_unicode=True):
135
  if stop_event.is_set():
136
  break
137
+ now = time.monotonic()
138
+ if now - last_progress > STREAM_IDLE_TIMEOUT_SECONDS:
139
+ idle_break = True
140
+ break
141
  if not line:
142
  continue
143
  try:
 
158
  upsert_urls(conn, [(post_id, url)])
159
  post_ids_dict[post_id] = url
160
  pbar.update(1)
161
+ processed += 1
162
+ last_progress = now
163
+ consecutive_timeouts = 0
164
+ if idle_break:
165
+ break
166
  break
167
  except requests.exceptions.ReadTimeout:
168
+ consecutive_timeouts += 1
169
+ if consecutive_timeouts >= STREAM_MAX_READ_TIMEOUTS:
170
+ break
171
  continue
172
  except KeyboardInterrupt:
173
  stop_event.set()
174
  raise
175
+ return processed
176
 
177
  conn = open_db(DB_PATH)
178
 
 
189
  elif inp.isdigit():
190
  indexs.append(int(inp) - 1)
191
 
192
def build_image_url(url: str) -> str:
    """Return a fetchable image URL for ``url``.

    A value that already starts with "http" is returned unchanged; anything
    else is appended to the module-level ``IMG_BASE`` prefix.
    """
    already_absolute = url.startswith("http")
    return url if already_absolute else IMG_BASE + url
196
 
197
+ def download_one_image(
198
+ post_id: str,
199
+ url: str,
200
+ dest_dir: str,
201
+ phpsessid: str,
202
+ stop_event: threading.Event = stop_event,
203
+ ) -> Tuple[str, str, Optional[str]]:
204
  dest_path = os.path.join(dest_dir, f"{post_id}.png")
205
  if os.path.exists(dest_path):
206
  return post_id, "exists", None
 
241
  pass
242
  return post_id, "error", str(exc)
243
 
244
+ def download_images(
245
+ to_download: Dict[str, str],
246
+ dest_dir: str,
247
+ phpsessid: str,
248
+ max_workers: int = MAX_WORKERS,
249
+ stop_event: threading.Event = stop_event,
250
+ ) -> None:
251
  if not to_download:
252
  return
253
 
 
280
  if not interrupted:
281
  executor.shutdown(wait=True)
282
 
283
def decode_if_binary(val: Union[str, bytes]) -> str:
    """Return ``val`` as text, decoding it first when it is a byte string.

    Args:
        val: A ``str``, or a ``bytes``/``bytearray`` value (including
            subclasses) to decode with the default UTF-8 codec.

    Returns:
        The decoded text for binary input; any other value is returned
        unchanged.

    Raises:
        UnicodeDecodeError: If binary input is not valid UTF-8.
    """
    # isinstance() instead of an exact type() comparison so bytes subclasses
    # and bytearray are decoded too rather than leaking through as binary.
    if isinstance(val, (bytes, bytearray)):
        return val.decode()
    return val
287
 
288
def scan_with_retries(
    post_ids: Sequence[str],
    phpsessid: str,
    conn: sqlite3.Connection,
    post_ids_dict: Dict[str, Optional[str]],
    desc: str,
    stop_event: threading.Event = stop_event,
) -> None:
    """Run ``stream_pixif_updates`` repeatedly until every post id has a URL.

    After each pass, the ids whose entry in ``post_ids_dict`` is still
    ``None`` are sent again, for at most ``STREAM_MAX_RETRIES`` passes.

    Args:
        post_ids: Post ids to scan; nothing happens when it is empty.
        phpsessid: Session id forwarded to ``stream_pixif_updates``.
        conn: SQLite connection forwarded to ``stream_pixif_updates``.
        post_ids_dict: Mapping of post id to URL; read here to decide which
            ids still need another pass.
        desc: Progress-bar label forwarded to ``stream_pixif_updates``.
        stop_event: When set, no further passes are started and a pending
            retry delay ends early.
    """
    remaining = list(post_ids)
    attempts = 0
    while remaining and attempts < STREAM_MAX_RETRIES and not stop_event.is_set():
        stream_pixif_updates(remaining, phpsessid, conn, post_ids_dict, desc, stop_event)
        remaining = [post_id for post_id in remaining if post_ids_dict.get(post_id) is None]
        attempts += 1
        # Leave before the delay when there is nothing left to retry or no
        # attempt left to make; pausing after the last pass only wastes time.
        if not remaining or attempts >= STREAM_MAX_RETRIES:
            break
        # Event.wait() pauses like sleep() but returns True as soon as the
        # event is set, so a stop request is honoured without finishing the delay.
        if STREAM_RETRY_DELAY_SECONDS and stop_event.wait(STREAM_RETRY_DELAY_SECONDS):
            break
308
+
309
  try:
310
  for index in indexs:
311
  group_name = valid[index].rsplit(".", 1)[0]
 
327
  if filtered:
328
  if dry_run:
329
  print("Dry run outputs (post_id -> page):")
330
+ scan_with_retries(filtered, phpsessid, conn, post_ids_dict, "Scanning", stop_event)
331
  if dry_run:
332
  continue
333
  to_download = {post_id: decode_if_binary(url) for post_id, url in post_ids_dict.items() if url and f"{post_id}.png" not in images_cache}