bibibi12345 commited on
Commit
7bb8ae9
·
1 Parent(s): 9e5c54d

proxy pool race condition fixed

Browse files
Files changed (1) hide show
  1. freeplay2api.py +45 -32
freeplay2api.py CHANGED
@@ -343,45 +343,49 @@ class KeyMaintainer(threading.Thread):
343
  while True:
344
  try:
345
  logging.info("KeyMaintainer: Starting maintenance cycle.")
346
-
347
- # First, update balances of existing accounts
348
  accounts = self.manager.get_all_accounts()
 
 
349
  for account in accounts:
350
  balance = self.client.check_balance(account["session_id"])
351
  if balance != account.get("balance"):
352
  account["balance"] = balance
353
  self.manager.update_account(account)
354
- logging.info(f"Account {account['email']} balance updated to ${balance:.4f}")
355
-
356
- # Now, check if we need to register new accounts and loop until we have enough
357
- while True:
358
- healthy_accounts = [
359
- acc for acc in self.manager.get_all_accounts()
360
- if acc.get("balance", 0) > self.config["LOW_BALANCE_THRESHOLD"]
361
- ]
362
- needed = self.config["ACTIVE_KEY_THRESHOLD"] - len(healthy_accounts)
363
-
364
- if needed <= 0:
365
- logging.info(f"Sufficient healthy accounts ({len(healthy_accounts)}). No registration needed for now.")
366
- break
367
-
368
- logging.info(f"Healthy accounts ({len(healthy_accounts)}) below threshold ({self.config['ACTIVE_KEY_THRESHOLD']}). Need to register {needed} new accounts.")
369
-
370
- batch_size = min(needed, self.config["REGISTRATION_CONCURRENCY"])
371
- with concurrent.futures.ThreadPoolExecutor(max_workers=batch_size) as executor:
372
- futures = [executor.submit(self.client.register) for _ in range(batch_size)]
373
-
374
- # Wait for the batch to complete
375
- concurrent.futures.wait(futures)
376
-
377
- logging.info("Registration batch finished. Re-checking healthy account count.")
378
- # The loop will now re-evaluate the 'needed' count
 
 
 
 
 
379
 
380
  except Exception as e:
381
  logging.error(f"Error in KeyMaintainer cycle: {e}")
382
 
383
- logging.info(f"KeyMaintainer cycle finished. Waiting for {self.config['CHECK_INTERVAL_SECONDS']} seconds.")
384
- time.sleep(self.config['CHECK_INTERVAL_SECONDS'])
385
 
386
 
387
  # --- FastAPI应用 ---
@@ -435,8 +439,18 @@ def initialize_app():
435
  logging.info("Initializing proxy pool...")
436
  proxy_pool_config = config.get("PROXY_POOL_CONFIG", {})
437
  proxy_pool = ProxyPool(proxy_pool_config)
438
- proxy_pool.initialize()
439
- logging.info("Proxy pool initialized.")
 
 
 
 
 
 
 
 
 
 
440
  else:
441
  logging.info("Proxy pool is disabled in config.")
442
 
@@ -446,7 +460,6 @@ def initialize_app():
446
 
447
  # 5. 启动后台维护线程
448
  maintainer = KeyMaintainer(account_manager, freeplay_client, config)
449
- # Always start the maintainer, it will wait for the pool internally if needed.
450
  maintainer.start()
451
  logging.info("Key maintenance service started.")
452
 
 
343
  while True:
344
  try:
345
  logging.info("KeyMaintainer: Starting maintenance cycle.")
 
 
346
  accounts = self.manager.get_all_accounts()
347
+
348
+ # 更新所有账户余额
349
  for account in accounts:
350
  balance = self.client.check_balance(account["session_id"])
351
  if balance != account.get("balance"):
352
  account["balance"] = balance
353
  self.manager.update_account(account)
354
+ logging.info(
355
+ f"Account {account['email']} balance updated to ${balance:.4f}"
356
+ )
357
+
358
+ # 检查是否需要注册新账号
359
+ healthy_accounts = [
360
+ acc
361
+ for acc in self.manager.get_all_accounts()
362
+ if acc.get("balance", 0) > self.config["LOW_BALANCE_THRESHOLD"]
363
+ ]
364
+ needed = self.config["ACTIVE_KEY_THRESHOLD"] - len(healthy_accounts)
365
+
366
+ if needed > 0:
367
+ logging.info(
368
+ f"Healthy accounts ({len(healthy_accounts)}) below threshold ({self.config['ACTIVE_KEY_THRESHOLD']}). Need to register {needed} new accounts."
369
+ )
370
+ with concurrent.futures.ThreadPoolExecutor(
371
+ max_workers=self.config["REGISTRATION_CONCURRENCY"]
372
+ ) as executor:
373
+ futures = [
374
+ executor.submit(self.client.register) for _ in range(needed)
375
+ ]
376
+ for future in concurrent.futures.as_completed(futures):
377
+ new_account = future.result()
378
+ if new_account:
379
+ self.manager.add_account(new_account)
380
+ else:
381
+ logging.info(
382
+ f"Sufficient healthy accounts ({len(healthy_accounts)}). No registration needed."
383
+ )
384
 
385
  except Exception as e:
386
  logging.error(f"Error in KeyMaintainer cycle: {e}")
387
 
388
+ time.sleep(self.config["CHECK_INTERVAL_SECONDS"])
 
389
 
390
 
391
  # --- FastAPI应用 ---
 
439
  logging.info("Initializing proxy pool...")
440
  proxy_pool_config = config.get("PROXY_POOL_CONFIG", {})
441
  proxy_pool = ProxyPool(proxy_pool_config)
442
+
443
+ # Start initialization in a separate thread
444
+ init_thread = threading.Thread(target=proxy_pool.initialize, daemon=True)
445
+ init_thread.start()
446
+
447
+ # Wait for the pool to be initialized
448
+ logging.info("Waiting for proxy pool to complete initial refill...")
449
+ while not proxy_pool.is_initialized:
450
+ time.sleep(1)
451
+ init_thread.join() # Ensure initialization thread is complete
452
+ logging.info("Proxy pool initialization process finished.")
453
+
454
  else:
455
  logging.info("Proxy pool is disabled in config.")
456
 
 
460
 
461
  # 5. 启动后台维护线程
462
  maintainer = KeyMaintainer(account_manager, freeplay_client, config)
 
463
  maintainer.start()
464
  logging.info("Key maintenance service started.")
465