bibibi12345 commited on
Commit
9e5c54d
·
1 Parent(s): 8794f06

proxy pool race condition fixed

Browse files
Files changed (1) hide show
  1. freeplay2api.py +24 -51
freeplay2api.py CHANGED
@@ -343,63 +343,45 @@ class KeyMaintainer(threading.Thread):
343
  while True:
344
  try:
345
  logging.info("KeyMaintainer: Starting maintenance cycle.")
 
 
346
  accounts = self.manager.get_all_accounts()
347
-
348
- # 更新所有账户余额
349
  for account in accounts:
350
  balance = self.client.check_balance(account["session_id"])
351
  if balance != account.get("balance"):
352
  account["balance"] = balance
353
  self.manager.update_account(account)
354
- logging.info(
355
- f"Account {account['email']} balance updated to ${balance:.4f}"
356
- )
 
 
 
 
 
 
357
 
358
- # 检查是否需要注册新账号
359
- healthy_accounts = [
360
- acc
361
- for acc in self.manager.get_all_accounts()
362
- if acc.get("balance", 0) > self.config["LOW_BALANCE_THRESHOLD"]
363
- ]
364
- needed = self.config["ACTIVE_KEY_THRESHOLD"] - len(healthy_accounts)
365
 
366
- while needed > 0:
367
- logging.info(
368
- f"Healthy accounts ({len(healthy_accounts)}) below threshold ({self.config['ACTIVE_KEY_THRESHOLD']}). Need to register {needed} new accounts."
369
- )
370
 
371
- # Determine how many to register in this batch
372
  batch_size = min(needed, self.config["REGISTRATION_CONCURRENCY"])
373
-
374
  with concurrent.futures.ThreadPoolExecutor(max_workers=batch_size) as executor:
375
  futures = [executor.submit(self.client.register) for _ in range(batch_size)]
376
 
377
- for future in concurrent.futures.as_completed(futures):
378
- new_account = future.result()
379
- if new_account:
380
- self.manager.add_account(new_account)
381
-
382
- # Re-evaluate after the batch
383
- healthy_accounts = [
384
- acc
385
- for acc in self.manager.get_all_accounts()
386
- if acc.get("balance", 0) > self.config["LOW_BALANCE_THRESHOLD"]
387
- ]
388
- needed = self.config["ACTIVE_KEY_THRESHOLD"] - len(healthy_accounts)
389
-
390
- if needed > 0:
391
- logging.info(f"{needed} more accounts still needed, retrying...")
392
- time.sleep(5) # Wait a bit before the next batch
393
 
394
- if not needed > 0:
395
- logging.info(
396
- f"Sufficient healthy accounts ({len(healthy_accounts)}). No registration needed."
397
- )
398
 
399
  except Exception as e:
400
  logging.error(f"Error in KeyMaintainer cycle: {e}")
401
 
402
- time.sleep(self.config["CHECK_INTERVAL_SECONDS"])
 
403
 
404
 
405
  # --- FastAPI应用 ---
@@ -464,19 +446,10 @@ def initialize_app():
464
 
465
  # 5. 启动后台维护线程
466
  maintainer = KeyMaintainer(account_manager, freeplay_client, config)
467
- if not config.get("USE_PROXY_POOL") or (proxy_pool and proxy_pool.is_initialized):
468
- maintainer.start()
469
- logging.info("Key maintenance service started.")
470
- else:
471
- # If using proxy pool, delay maintainer start until pool is ready
472
- threading.Thread(target=self.wait_for_pool_and_start_maintainer, args=(maintainer, proxy_pool)).start()
473
-
474
 
475
- def wait_for_pool_and_start_maintainer(self, maintainer, pool):
476
- while not pool.is_initialized:
477
- time.sleep(2)
478
- maintainer.start()
479
- logging.info("Key maintenance service started after proxy pool initialization.")
480
 
481
  async def authenticate_client(auth: HTTPAuthorizationCredentials = Depends(security)):
482
  if not auth or auth.credentials not in valid_client_keys:
 
343
  while True:
344
  try:
345
  logging.info("KeyMaintainer: Starting maintenance cycle.")
346
+
347
+ # First, update balances of existing accounts
348
  accounts = self.manager.get_all_accounts()
 
 
349
  for account in accounts:
350
  balance = self.client.check_balance(account["session_id"])
351
  if balance != account.get("balance"):
352
  account["balance"] = balance
353
  self.manager.update_account(account)
354
+ logging.info(f"Account {account['email']} balance updated to ${balance:.4f}")
355
+
356
+ # Now, check if we need to register new accounts and loop until we have enough
357
+ while True:
358
+ healthy_accounts = [
359
+ acc for acc in self.manager.get_all_accounts()
360
+ if acc.get("balance", 0) > self.config["LOW_BALANCE_THRESHOLD"]
361
+ ]
362
+ needed = self.config["ACTIVE_KEY_THRESHOLD"] - len(healthy_accounts)
363
 
364
+ if needed <= 0:
365
+ logging.info(f"Sufficient healthy accounts ({len(healthy_accounts)}). No registration needed for now.")
366
+ break
 
 
 
 
367
 
368
+ logging.info(f"Healthy accounts ({len(healthy_accounts)}) below threshold ({self.config['ACTIVE_KEY_THRESHOLD']}). Need to register {needed} new accounts.")
 
 
 
369
 
 
370
  batch_size = min(needed, self.config["REGISTRATION_CONCURRENCY"])
 
371
  with concurrent.futures.ThreadPoolExecutor(max_workers=batch_size) as executor:
372
  futures = [executor.submit(self.client.register) for _ in range(batch_size)]
373
 
374
+ # Wait for the batch to complete
375
+ concurrent.futures.wait(futures)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
376
 
377
+ logging.info("Registration batch finished. Re-checking healthy account count.")
378
+ # The loop will now re-evaluate the 'needed' count
 
 
379
 
380
  except Exception as e:
381
  logging.error(f"Error in KeyMaintainer cycle: {e}")
382
 
383
+ logging.info(f"KeyMaintainer cycle finished. Waiting for {self.config['CHECK_INTERVAL_SECONDS']} seconds.")
384
+ time.sleep(self.config['CHECK_INTERVAL_SECONDS'])
385
 
386
 
387
  # --- FastAPI应用 ---
 
446
 
447
  # 5. 启动后台维护线程
448
  maintainer = KeyMaintainer(account_manager, freeplay_client, config)
449
+ # Always start the maintainer, it will wait for the pool internally if needed.
450
+ maintainer.start()
451
+ logging.info("Key maintenance service started.")
 
 
 
 
452
 
 
 
 
 
 
453
 
454
  async def authenticate_client(auth: HTTPAuthorizationCredentials = Depends(security)):
455
  if not auth or auth.credentials not in valid_client_keys: