# dyspark/core/tasks.py
# Hosting-page residue preserved as a comment: "cacodex's picture",
# "Upload 35 files", "bce2d6d verified".
import asyncio
import traceback
from utils.logger import setup_logger
from utils.config import reload_config, reload_userData
from core.msg_builder import build_message
from core.browser import get_browser
# Module-level registry keyed by a user's ``unique_id``; runTasks resets the
# entry to an empty list when it schedules that user.
# NOTE(review): nothing in the visible code appends to or reads these lists,
# and the name looks like a misspelling of "completes" -- confirm before renaming.
complates = {}
# Shared logger for this module, configured at DEBUG level.
logger = setup_logger(level="DEBUG")
# Candidate selectors for the chat message box. _wait_for_chat_input tries them
# in this order, so the most specific patterns come first and the generic
# contenteditable fallbacks come last.
CHAT_INPUT_SELECTORS = [
"xpath=//div[contains(@class, 'chat-input-') and @contenteditable='true']",
"xpath=//div[contains(@class, 'chat-input-')]",
"css=div[class*='chat-input-'][contenteditable='true']",
"css=div[contenteditable='true'][role='textbox']",
"css=div[contenteditable='true']",
]
async def retry_operation(name, operation, retries=3, delay=2, *args, **kwargs):
    """Await ``operation(*args, **kwargs)``, retrying when it raises.

    Args:
        name: Label prefixed to the log messages emitted on failure.
        operation: Callable returning an awaitable; it is invoked afresh on
            every attempt.
        retries: Total number of attempts. Values below 1 are treated as 1 so
            the operation always runs at least once (previously such values
            skipped the call entirely and returned ``None``).
        delay: Seconds to sleep between a failed attempt and the next one.
        *args: Positional arguments forwarded to ``operation``.
        **kwargs: Keyword arguments forwarded to ``operation``.

    Returns:
        Whatever the first successful attempt returns.

    Raises:
        Exception: The exception raised by the final attempt, re-raised
            unchanged.
    """
    attempts = max(1, retries)
    for attempt in range(attempts):
        try:
            return await operation(*args, **kwargs)
        except Exception as exc:
            if attempt >= attempts - 1:
                # Out of attempts: report and let the caller see the error.
                logger.error(f"{name} ????????????????{exc}")
                raise
            logger.warning(f"{name} ???????? {attempt + 1} ?????{exc}")
            await asyncio.sleep(delay)
async def scroll_and_select_user(page, username, targets):
    """Select each target in the friends list and yield its name.

    Async generator. It clicks the friends tab and the first list entry, then
    repeatedly scans the rendered list entries. When an entry's displayed name
    is one of ``targets`` the entry is clicked and the name is yielded, so the
    consumer can act on the now-selected conversation before the scan resumes.
    When a scan selects nothing, the list container is scrolled down to load
    more entries.

    The generator finishes when every target has been yielded, when the
    "no more" marker is present, when ``max_empty_scrolls`` unproductive
    scrolls have accumulated, or when the scroll container cannot be resolved.

    Args:
        page: Playwright page already showing the chat view.
        username: Account label used only in log messages.
        targets: Iterable of display names to select.

    Yields:
        str: The display name of each target right after it has been clicked.
    """
    friends_tab_selector = 'xpath=//*[@id="sub-app"]/div/div/div[1]/div[2]'
    target_selector = 'xpath=//*[@id="sub-app"]/div/div[1]/div[2]/div[2]//div[contains(@class, "semi-list-item-body semi-list-item-body-flex-start")]'
    scrollable_friends_selector = 'xpath=//*[@id="sub-app"]/div/div[1]/div[2]/div[2]/div/div/div[3]/div/div/div/ul/div'
    no_more_selector = 'xpath=//div[contains(@class, "no-more-tip-")]'
    loading_selector = 'xpath=//div[contains(@class, "semi-spin")]'
    logger.debug(f"?? {username} ??????????")
    logger.debug(f"?? {username} ??????: {targets}")
    await page.wait_for_selector(friends_tab_selector)
    await page.locator(friends_tab_selector).click()
    logger.debug(f"?? {username} ????????")
    first_friend_selector = 'xpath=//*[@id="sub-app"]/div/div/div[2]/div[2]/div/div/div[1]/div/div/div/ul/div/div/div[1]/li/div'
    await page.wait_for_selector(first_friend_selector)
    await page.locator(first_friend_selector).click()
    logger.debug(f"?? {username} ??????????????????")
    await asyncio.sleep(2)

    found_usernames = set()
    remaining_targets = set(targets)
    empty_scroll_count = 0
    max_empty_scrolls = 10

    if not remaining_targets:
        # Nothing to select: skip scrolling through the whole list.
        return

    while True:
        target_elements = await page.locator(target_selector).all()
        prev_found_count = len(found_usernames)
        for element in target_elements:
            try:
                span = element.locator("xpath=.//span[contains(@class, 'item-header-name-')]")
                target_name = (await span.inner_text()).strip()
                if target_name in found_usernames:
                    continue
                found_usernames.add(target_name)
                logger.debug(f"?? {username} ???? {target_name}")
                # Each name is handled at most once (see found_usernames), so a
                # wanted name is always still in remaining_targets here.
                if target_name in remaining_targets:
                    await element.click()
                    logger.info(f"?? {username} ?????? {target_name}???????")
                    yield target_name
                    remaining_targets.discard(target_name)
                    if not remaining_targets:
                        logger.info(f"?? {username} ???????????????")
                        return
                    # Re-query the list before continuing instead of reusing
                    # the element handles collected before the click.
                    break
            except Exception:
                # Keep scanning the other entries; route the traceback through
                # the module logger rather than printing it to stderr.
                logger.exception(f"Account {username} failed while processing a friends list entry")
        else:
            # Reached only when the scan above selected nothing (no break).
            new_found = len(found_usernames) > prev_found_count
            if new_found:
                empty_scroll_count = 0
            else:
                empty_scroll_count += 1
            if await page.locator(no_more_selector).count() > 0:
                logger.info(f"?? {username} ??????????????????")
                if remaining_targets:
                    logger.warning(f"?? {username} ??????????????: {remaining_targets}")
                break
            if empty_scroll_count >= max_empty_scrolls:
                logger.warning(
                    f"?? {username} ?? {max_empty_scrolls} ?????????????????"
                )
                if remaining_targets:
                    logger.warning(f"?? {username} ??????????????: {remaining_targets}")
                break
            if await page.locator(loading_selector).count() > 0:
                logger.debug(f"?? {username} ??????? (Loading)...")
                await asyncio.sleep(1.5)
            scrollable_element = await page.locator(scrollable_friends_selector).element_handle()
            if not scrollable_element:
                logger.error(f"?? {username} ??????????")
                break
            scroll_top_before = await page.evaluate("(element) => element.scrollTop", scrollable_element)
            await page.evaluate("(element) => element.scrollTop += 800", scrollable_element)
            await asyncio.sleep(0.3)
            scroll_top_after = await page.evaluate("(element) => element.scrollTop", scrollable_element)
            if scroll_top_before == scroll_top_after:
                # The container did not move: count this double so a stuck
                # list reaches max_empty_scrolls sooner.
                empty_scroll_count += 2
                logger.debug(
                    f"?? {username} scrollTop ??? ({scroll_top_before})?????? "
                    f"(?????: {empty_scroll_count}/{max_empty_scrolls})"
                )
            else:
                logger.debug(
                    f"?? {username} ????????????? "
                    f"(scrollTop: {scroll_top_before} -> {scroll_top_after})"
                )
            await asyncio.sleep(1.5)
async def _wait_for_chat_input(page):
    """Locate the visible chat input box on ``page``.

    Each entry of ``CHAT_INPUT_SELECTORS`` is tried in order; for every one the
    first matching element is given up to 30 seconds to become visible.

    Returns:
        tuple: ``(locator, selector)`` for the first selector that succeeded.

    Raises:
        RuntimeError: If no selector produced a visible element. The message
            includes the selector list and the last error seen.
    """
    last_error = None
    for candidate in CHAT_INPUT_SELECTORS:
        input_box = page.locator(candidate).first
        try:
            await input_box.wait_for(state="visible", timeout=30000)
        except Exception as failure:
            # Remember the most recent failure and move on to the next selector.
            last_error = failure
            continue
        return input_box, candidate
    raise RuntimeError(
        "?????????"
        f" ??????: {CHAT_INPUT_SELECTORS}; ????: {last_error}"
    )
async def _type_message(chat_input, message: str):
await chat_input.click()
try:
await chat_input.press("Control+A")
await chat_input.press("Backspace")
except Exception:
pass
lines = message.split("\n") if message else [""]
for index, line in enumerate(lines):
if line:
await chat_input.type(line)
if index < len(lines) - 1:
await chat_input.press("Shift+Enter")
async def do_user_task(browser, username, cookies, targets, semaphore, config):
    """Send one generated message to each target on behalf of one account.

    While holding ``semaphore``, a fresh browser context is created with
    120-second default timeouts. The site root is loaded, ``cookies`` are
    added to the context, and the chat page is loaded (both navigations go
    through ``retry_operation``). For every name yielded by
    ``scroll_and_select_user`` a message is built with ``build_message``,
    typed into the chat input and submitted with Enter. The context is closed
    on exit, whether or not an error occurred.

    Args:
        browser: Playwright browser used to create the context.
        username: Account label used in log messages.
        cookies: Cookies passed to ``context.add_cookies``.
        targets: Display names to message.
        semaphore: Async context manager limiting concurrent account tasks.
        config: Settings forwarded to ``build_message``.
    """
    home_url = "https://creator.douyin.com/"
    chat_url = "https://creator.douyin.com/creator-micro/data/following/chat"
    navigation_retry = {"retries": 3, "delay": 5}
    timeout_ms = 120000

    async with semaphore:
        session = await browser.new_context()
        session.set_default_navigation_timeout(timeout_ms)
        session.set_default_timeout(timeout_ms)
        try:
            tab = await session.new_page()
            # Visit the site once before adding cookies, then open the chat view.
            await retry_operation("?????????", tab.goto, url=home_url, **navigation_retry)
            await session.add_cookies(cookies)
            await retry_operation("???????", tab.goto, url=chat_url, **navigation_retry)
            logger.info(f"Account {username} start sending messages")

            selected_targets = scroll_and_select_user(tab, username, targets)
            async for target_name in selected_targets:
                logger.info(f"Account {username} selected target {target_name}, preparing message")
                await asyncio.sleep(0.5)
                resolved = await _wait_for_chat_input(tab)
                input_box, selector = resolved
                logger.debug(f"Account {username} resolved chat input selector={selector}")
                message = build_message(config=config)
                await _type_message(input_box, message)
                logger.debug(f"Account {username} ready to send message to {target_name}:\n\t{message}")
                await input_box.press("Enter")
                logger.info(f"Account {username} sent message to {target_name}")
                await asyncio.sleep(2)
        finally:
            await session.close()
async def runTasks(config=None, userData=None):
    """Run the messaging task for every runnable account.

    Args:
        config: Settings mapping. When ``None`` it is loaded with
            ``reload_config()``. Keys read here: ``multiTask``, ``taskCount``,
            ``messageTemplate``, ``hitokotoTypes``.
        userData: Iterable of per-account mappings. When ``None`` it is loaded
            with ``reload_userData()``. Keys read here: ``username``,
            ``cookies``, ``targets``, ``unique_id``.

    Accounts without cookies or without targets are skipped with a warning.
    Concurrency is capped by ``taskCount`` when ``multiTask`` is truthy and is
    1 otherwise.

    Raises:
        BaseException: If any account task failed, the first failure in
            scheduling order is re-raised, but only after every account task
            has finished, so the shared browser is not closed under tasks that
            are still running.
    """
    active_config = config if config is not None else reload_config()
    active_user_data = userData if userData is not None else reload_userData()
    playwright, browser = await get_browser()
    try:
        logger.info("Task run started with current config")
        multi_task = bool(active_config.get("multiTask", True))
        # Clamp to at least one slot: asyncio.Semaphore rejects negative values.
        task_count = max(1, int(active_config.get("taskCount", 1) or 1))
        logger.info(f"multiTask={multi_task}, taskCount={task_count}")
        logger.info(f"messageTemplate={active_config.get('messageTemplate', '')}")
        logger.info(f"hitokotoTypes={active_config.get('hitokotoTypes', [])}")
        for user in active_user_data:
            logger.info(
                f"user={user.get('username', 'unknown')}, targets={user.get('targets', [])}"
            )
        semaphore = asyncio.Semaphore(task_count if multi_task else 1)
        tasks = []
        task_owners = []  # usernames aligned index-for-index with ``tasks``
        for user in active_user_data:
            cookies = user.get("cookies", [])
            targets = user.get("targets", [])
            unique_id = user.get("unique_id", "")
            username = user.get("username", "????")
            if not cookies:
                logger.warning(f"User {username} has no cookies, skipping")
                continue
            if not targets:
                logger.warning(f"User {username} has no targets, skipping")
                continue
            complates[unique_id] = []
            tasks.append(do_user_task(browser, username, cookies, targets, semaphore, active_config))
            task_owners.append(username)
        if tasks:
            # Collect failures instead of aborting on the first one; otherwise
            # the ``finally`` below would close the browser while the other
            # accounts are still using it.
            results = await asyncio.gather(*tasks, return_exceptions=True)
            failures = [
                (owner, outcome)
                for owner, outcome in zip(task_owners, results)
                if isinstance(outcome, BaseException)
            ]
            for owner, error in failures:
                logger.error(f"Account {owner} task failed: {error!r}")
            if failures:
                raise failures[0][1]
        else:
            logger.warning("No runnable tasks found: empty data, missing cookies, or no targets")
    finally:
        # Stop Playwright even if closing the browser raises.
        try:
            await browser.close()
        finally:
            await playwright.stop()