from playwright.sync_api import sync_playwright import time from typing import Optional import asyncio from sympy import false # from win32gui import FlashWindowEx import json import requests import os def automate_cwall_login( username: str, password: str, headless: bool = False, timeout: int = 30000, keep_open: bool = False ): """ 自动化登录cwall网站的函数 参数: username (str): 用户名 password (str): 密码 headless (bool): 是否无头模式,默认显示浏览器 timeout (int): 超时时间(毫秒),默认30秒 keep_open (bool): 完成后是否保持浏览器打开 """ with sync_playwright() as p: browser = p.chromium.launch(headless=headless) context = browser.new_context() page = context.new_page() try: # 设置默认超时 page.set_default_timeout(timeout) print("正在导航到登录页面...") page.goto("https://cwall.czoffice.top/login.php", wait_until="networkidle") # 输入凭据 print("正在输入用户名和密码...") page.fill("xpath=/html/body/div[2]/form/div[1]/input", username) page.fill("xpath=/html/body/div[2]/form/div[2]/input", password) # 点击登录 print("正在提交登录表单...") page.click("xpath=/html/body/div[2]/form/input[@type='submit']") # 等待登录完成 print("等待登录完成...") try: # 尝试检测登录成功元素 page.wait_for_selector("xpath=//*[contains(text(),'欢迎') or contains(text(),'Dashboard')]", timeout=5000) print("登录成功!") except: print("未检测到欢迎信息,但可能已登录成功") if keep_open: print(f"浏览器将保持打开,按Ctrl+C终止程序...") while True: time.sleep(1) context.storage_state(path="auth.json") return True except Exception as e: print(f"自动化过程中发生错误: {str(e)}") # 出错时截图保存 page.screenshot(path="error_screenshot.png") print("已保存错误截图: error_screenshot.png") return False finally: if not keep_open: browser.close() def cwall_ww_sub( info: str, kind: str, kind_enable: bool = False, image_update_enable: bool = False, image: Optional[str] = None, headless: bool = False, timeout: int = 30000, keep_open: bool = False ) -> bool: """ Cwall WW自动发送参数 参数: info (str): 发送内容 kind_enable (bool): 是否启用分类 kind (str): 分类(只支持一个) image_update_enable (bool): 是否启用图片上传 image (str): 图片路径 headless (bool): 是否无头模式,默认显示浏览器 timeout (int): 超时时间(毫秒),默认30秒 keep_open (bool): 完成后是否保持浏览器打开 """ with sync_playwright() as p: browser = p.chromium.launch(headless=headless) context = browser.new_context(storage_state="auth.json") page = context.new_page() try: page.set_default_timeout(timeout) print("正在导航ww页面...") page.goto("https://cwall.czoffice.top/ww/", wait_until="networkidle") # 输入内容 print("正在输入ww内容...") page.fill("xpath=/html/body/div[3]/textarea", info) # 处理分类 if kind_enable: print(f"正在设置分类: {kind}") page.fill("xpath=/html/body/div[3]/input", kind) page.press("xpath=/html/body/div[3]/input", "Enter") # 处理图片上传 if image_update_enable and image: print(f"正在上传图片: {image}") # 先触发文件选择对话框的打开(如点击上传按钮) # page.click("xpath=/html/body/div[3]/button[1]") # # # 然后等待文件输入框出现并上传 # with page.expect_file_chooser() as fc: # page.click("text=打开(O)") # 触发文件选择器的元素 # file_chooser = fc.value # file_chooser.set_files(image) with page.expect_file_chooser() as fc_info: page.get_by_role("button", name="上传图片").click() file_chooser = fc_info.value file_chooser.set_files(image) # 提交表单 print("正在提交内容...") page.click("xpath=/html/body/div[3]/button[2]") time.sleep(3) # 等待提交完成 # page.wait_for_selector("text=提交成功", timeout=15000) # print("提交成功!") if keep_open: print("浏览器将保持打开,按Ctrl+C退出...") while True: time.sleep(1) return True except Exception as e: print(f"操作失败: {str(e)}") page.screenshot(path="ww_submission_error.png") print("错误截图已保存为 ww_submission_error.png") return False finally: if not keep_open: browser.close() def Get_PHPSESSID( username: str, password: str, headless: bool = False, timeout: int = 30000, keep_open: bool = False ): """ 自动化获得PHPSESSID的函数 参数: username (str): 用户名 password (str): 密码 headless (bool): 是否无头模式,默认显示浏览器 timeout (int): 超时时间(毫秒),默认30秒 keep_open (bool): 完成后是否保持浏览器打开 """ with sync_playwright() as p: browser = p.chromium.launch(headless=headless) context = browser.new_context() page = context.new_page() try: # 设置默认超时 page.set_default_timeout(timeout) print("正在导航到登录页面...") page.goto("https://cwall.czoffice.top/login.php", wait_until="networkidle") # 输入凭据 print("正在输入用户名和密码...") page.fill("xpath=/html/body/div[2]/form/div[1]/input", username) page.fill("xpath=/html/body/div[2]/form/div[2]/input", password) # 点击登录 print("正在提交登录表单...") page.click("xpath=/html/body/div[2]/form/input[@type='submit']") # 等待登录完成 print("等待登录完成...") try: # 尝试检测登录成功元素 page.wait_for_selector("xpath=//*[contains(text(),'欢迎') or contains(text(),'Dashboard')]", timeout=5000) print("登录成功!") except: print("未检测到欢迎信息,但可能已登录成功") if keep_open: print(f"浏览器将保持打开,按Ctrl+C终止程序...") while True: time.sleep(1) context.storage_state(path="auth.json") # 读取auth.json文件 with open('auth.json', 'r', encoding='utf-8') as file: data = json.load(file) # 提取PHPSESSID phpsessid = None for cookie in data['cookies']: if cookie['name'] == 'PHPSESSID': phpsessid = cookie['value'] break if phpsessid: print("提取到的PHPSESSID是:", phpsessid) else: print("未找到PHPSESSID") return True except Exception as e: print(f"自动化过程中发生错误: {str(e)}") # 出错时截图保存 page.screenshot(path="error_screenshot.png") print("已保存错误截图: error_screenshot.png") return False finally: if not keep_open: browser.close() def get_cwall_ww_info( page: str = "1" ): """ 自动化解析Cwall ww内容 参数: page (str): 页码 返回: list: 包含所有ww信息的列表,每个元素是一个字典 """ result = [] #请求Cwall API response = requests.get("https://cwall.czoffice.top/ww/fetch_contents.php?page="+page) # 将响应内容解析为 Python 字典 if response.status_code == 200: data_dict = response.json() # 检查请求是否成功 if data_dict.get('success'): # 遍历 data 列表 for item in data_dict.get('data', []): result.append({ "ID": item.get('id'), "UserID": item.get('user_id'), "用户名": item.get('username'), "内容": item.get('content'), "发布时间": item.get('publish_time'), "点赞数": item.get('like'), "图片列表": item.get('images') }) return result else: print("请求失败") return None else: print(f"请求失败,状态码: {response.status_code}") return None def clear_auth(): if os.path.exists("auth.json"): os.remove("auth.json") print("auth.json已删除") else: print("auth.json不存在") def ww_like( post_id: str, headless: bool = False, timeout: int = 30000, keep_open: bool = False ): """ 自动化ww点赞/取消 参数: Post_id(str): ww的ID headless (bool): 是否无头模式,默认显示浏览器 timeout (int): 超时时间(毫秒),默认30秒 keep_open (bool): 完成后是否保持浏览器打开 """ with sync_playwright() as p: browser = p.chromium.launch(headless=headless) context = browser.new_context(storage_state="auth.json") page = context.new_page() try: page.set_default_timeout(timeout) print("正在导航ww页面...") page.goto("https://cwall.czoffice.top/ww/detail.php?id="+post_id) #点击点赞按钮 print("正在点赞") page.click("xpath=/html/body/div/div[1]/button[1]") time.sleep(1) if keep_open: print(f"浏览器将保持打开,按Ctrl+C终止程序...") while True: time.sleep(1) return True except Exception as e: print("自动化过程出现错误") page.screenshot(path="error-like.png") print("已保存错误截图:error-like.png") return False finally: if not keep_open: browser.close() def ww_Comment( info: str, post_id: str, headless: bool = False, timeout: int = 30000, keep_open: bool = False ): """ 自动化ww评论 参数: info(str): 评论内容 Post_id(str): ww的ID headless (bool): 是否无头模式,默认显示浏览器 timeout (int): 超时时间(毫秒),默认30秒 keep_open (bool): 完成后是否保持浏览器打开 """ with sync_playwright() as p: browser = p.chromium.launch(headless=headless) context = browser.new_context(storage_state="auth.json") page = context.new_page() try: page.set_default_timeout(timeout) print("正在导航ww页面...") page.goto("https://cwall.czoffice.top/ww/detail.php?id=" + post_id) # 输入 page.fill("xpath=/html/body/div/form/textarea",info) #提交 page.click("xpath=/html/body/div/form/button") if keep_open: print(f"浏览器将保持打开,按Ctrl+C终止程序...") while True: time.sleep(1) return True except Exception as e: print("自动化过程出现错误") page.screenshot(path="error-comment.png") print("已保存错误截图:error-comment.png") return False finally: if not keep_open: browser.close() # 使用示例 if __name__ == "__main__": # # 基本用法 # automate_cwall_login( # username="一只fish (づ ●─● )づ", # password="zy842530", # headless=False # ) # cwall_ww_sub( # info="Xes什么私人UI,这个C拉的比面条还长", # 必需 # kind="Open Cwall API", # 必需 # kind_enable=True, # image_update_enable=True, # image="C://Users/张雨.WIN-KF4D01O81O2/desktop/屏幕截图 2025-05-17 185324.png", # 可选 # headless=True, # keep_open=False # ) # 高级用法 - 保持浏览器打开 # automate_cwall_login( # username="testuser", # password="testpass", # keep_open=True # ) # get_cwall_ww_info( # page= "1" # ) # clear_auth() # ww_like( # post_id="3432", # keep_open = True, # timeout=30000, # ) ww_Comment( info="OpenCwallAPI评论测试", # 必需 post_id="3438", # 必需 headless=False, keep_open=False, timeout=30000, )