open-cwall-api / Wright.py
ZHZ1024's picture
Update Wright.py
852cc01 verified
from playwright.sync_api import sync_playwright
import time
from typing import Optional
import asyncio
from sympy import false
# from win32gui import FlashWindowEx
import json
import requests
import os
def automate_cwall_login(
username: str,
password: str,
headless: bool = False,
timeout: int = 30000,
keep_open: bool = False
):
"""
自动化登录cwall网站的函数
参数:
username (str): 用户名
password (str): 密码
headless (bool): 是否无头模式,默认显示浏览器
timeout (int): 超时时间(毫秒),默认30秒
keep_open (bool): 完成后是否保持浏览器打开
"""
with sync_playwright() as p:
browser = p.chromium.launch(headless=headless)
context = browser.new_context()
page = context.new_page()
try:
# 设置默认超时
page.set_default_timeout(timeout)
print("正在导航到登录页面...")
page.goto("https://cwall.czoffice.top/login.php", wait_until="networkidle")
# 输入凭据
print("正在输入用户名和密码...")
page.fill("xpath=/html/body/div[2]/form/div[1]/input", username)
page.fill("xpath=/html/body/div[2]/form/div[2]/input", password)
# 点击登录
print("正在提交登录表单...")
page.click("xpath=/html/body/div[2]/form/input[@type='submit']")
# 等待登录完成
print("等待登录完成...")
try:
# 尝试检测登录成功元素
page.wait_for_selector("xpath=//*[contains(text(),'欢迎') or contains(text(),'Dashboard')]", timeout=5000)
print("登录成功!")
except:
print("未检测到欢迎信息,但可能已登录成功")
if keep_open:
print(f"浏览器将保持打开,按Ctrl+C终止程序...")
while True:
time.sleep(1)
context.storage_state(path="auth.json")
return True
except Exception as e:
print(f"自动化过程中发生错误: {str(e)}")
# 出错时截图保存
page.screenshot(path="error_screenshot.png")
print("已保存错误截图: error_screenshot.png")
return False
finally:
if not keep_open:
browser.close()
def cwall_ww_sub(
info: str,
kind: str,
kind_enable: bool = False,
image_update_enable: bool = False,
image: Optional[str] = None,
headless: bool = False,
timeout: int = 30000,
keep_open: bool = False
) -> bool:
"""
Cwall WW自动发送参数
参数:
info (str): 发送内容
kind_enable (bool): 是否启用分类
kind (str): 分类(只支持一个)
image_update_enable (bool): 是否启用图片上传
image (str): 图片路径
headless (bool): 是否无头模式,默认显示浏览器
timeout (int): 超时时间(毫秒),默认30秒
keep_open (bool): 完成后是否保持浏览器打开
"""
with sync_playwright() as p:
browser = p.chromium.launch(headless=headless)
context = browser.new_context(storage_state="auth.json")
page = context.new_page()
try:
page.set_default_timeout(timeout)
print("正在导航ww页面...")
page.goto("https://cwall.czoffice.top/ww/", wait_until="networkidle")
# 输入内容
print("正在输入ww内容...")
page.fill("xpath=/html/body/div[3]/textarea", info)
# 处理分类
if kind_enable:
print(f"正在设置分类: {kind}")
page.fill("xpath=/html/body/div[3]/input", kind)
page.press("xpath=/html/body/div[3]/input", "Enter")
# 处理图片上传
if image_update_enable and image:
print(f"正在上传图片: {image}")
# 先触发文件选择对话框的打开(如点击上传按钮)
# page.click("xpath=/html/body/div[3]/button[1]")
#
# # 然后等待文件输入框出现并上传
# with page.expect_file_chooser() as fc:
# page.click("text=打开(O)") # 触发文件选择器的元素
# file_chooser = fc.value
# file_chooser.set_files(image)
with page.expect_file_chooser() as fc_info:
page.get_by_role("button", name="上传图片").click()
file_chooser = fc_info.value
file_chooser.set_files(image)
# 提交表单
print("正在提交内容...")
page.click("xpath=/html/body/div[3]/button[2]")
time.sleep(3)
# 等待提交完成
# page.wait_for_selector("text=提交成功", timeout=15000)
# print("提交成功!")
if keep_open:
print("浏览器将保持打开,按Ctrl+C退出...")
while True:
time.sleep(1)
return True
except Exception as e:
print(f"操作失败: {str(e)}")
page.screenshot(path="ww_submission_error.png")
print("错误截图已保存为 ww_submission_error.png")
return False
finally:
if not keep_open:
browser.close()
def Get_PHPSESSID(
username: str,
password: str,
headless: bool = False,
timeout: int = 30000,
keep_open: bool = False
):
"""
自动化获得PHPSESSID的函数
参数:
username (str): 用户名
password (str): 密码
headless (bool): 是否无头模式,默认显示浏览器
timeout (int): 超时时间(毫秒),默认30秒
keep_open (bool): 完成后是否保持浏览器打开
"""
with sync_playwright() as p:
browser = p.chromium.launch(headless=headless)
context = browser.new_context()
page = context.new_page()
try:
# 设置默认超时
page.set_default_timeout(timeout)
print("正在导航到登录页面...")
page.goto("https://cwall.czoffice.top/login.php", wait_until="networkidle")
# 输入凭据
print("正在输入用户名和密码...")
page.fill("xpath=/html/body/div[2]/form/div[1]/input", username)
page.fill("xpath=/html/body/div[2]/form/div[2]/input", password)
# 点击登录
print("正在提交登录表单...")
page.click("xpath=/html/body/div[2]/form/input[@type='submit']")
# 等待登录完成
print("等待登录完成...")
try:
# 尝试检测登录成功元素
page.wait_for_selector("xpath=//*[contains(text(),'欢迎') or contains(text(),'Dashboard')]",
timeout=5000)
print("登录成功!")
except:
print("未检测到欢迎信息,但可能已登录成功")
if keep_open:
print(f"浏览器将保持打开,按Ctrl+C终止程序...")
while True:
time.sleep(1)
context.storage_state(path="auth.json")
# 读取auth.json文件
with open('auth.json', 'r', encoding='utf-8') as file:
data = json.load(file)
# 提取PHPSESSID
phpsessid = None
for cookie in data['cookies']:
if cookie['name'] == 'PHPSESSID':
phpsessid = cookie['value']
break
if phpsessid:
print("提取到的PHPSESSID是:", phpsessid)
else:
print("未找到PHPSESSID")
return True
except Exception as e:
print(f"自动化过程中发生错误: {str(e)}")
# 出错时截图保存
page.screenshot(path="error_screenshot.png")
print("已保存错误截图: error_screenshot.png")
return False
finally:
if not keep_open:
browser.close()
def get_cwall_ww_info(
page: str = "1"
):
"""
自动化解析Cwall ww内容
参数:
page (str): 页码
返回:
list: 包含所有ww信息的列表,每个元素是一个字典
"""
result = []
#请求Cwall API
response = requests.get("https://cwall.czoffice.top/ww/fetch_contents.php?page="+page)
# 将响应内容解析为 Python 字典
if response.status_code == 200:
data_dict = response.json()
# 检查请求是否成功
if data_dict.get('success'):
# 遍历 data 列表
for item in data_dict.get('data', []):
result.append({
"ID": item.get('id'),
"UserID": item.get('user_id'),
"用户名": item.get('username'),
"内容": item.get('content'),
"发布时间": item.get('publish_time'),
"点赞数": item.get('like'),
"图片列表": item.get('images')
})
return result
else:
print("请求失败")
return None
else:
print(f"请求失败,状态码: {response.status_code}")
return None
def clear_auth():
if os.path.exists("auth.json"):
os.remove("auth.json")
print("auth.json已删除")
else:
print("auth.json不存在")
def ww_like(
post_id: str,
headless: bool = False,
timeout: int = 30000,
keep_open: bool = False
):
"""
自动化ww点赞/取消
参数:
Post_id(str): ww的ID
headless (bool): 是否无头模式,默认显示浏览器
timeout (int): 超时时间(毫秒),默认30秒
keep_open (bool): 完成后是否保持浏览器打开
"""
with sync_playwright() as p:
browser = p.chromium.launch(headless=headless)
context = browser.new_context(storage_state="auth.json")
page = context.new_page()
try:
page.set_default_timeout(timeout)
print("正在导航ww页面...")
page.goto("https://cwall.czoffice.top/ww/detail.php?id="+post_id)
#点击点赞按钮
print("正在点赞")
page.click("xpath=/html/body/div/div[1]/button[1]")
time.sleep(1)
if keep_open:
print(f"浏览器将保持打开,按Ctrl+C终止程序...")
while True:
time.sleep(1)
return True
except Exception as e:
print("自动化过程出现错误")
page.screenshot(path="error-like.png")
print("已保存错误截图:error-like.png")
return False
finally:
if not keep_open:
browser.close()
def ww_Comment(
info: str,
post_id: str,
headless: bool = False,
timeout: int = 30000,
keep_open: bool = False
):
"""
自动化ww评论
参数:
info(str): 评论内容
Post_id(str): ww的ID
headless (bool): 是否无头模式,默认显示浏览器
timeout (int): 超时时间(毫秒),默认30秒
keep_open (bool): 完成后是否保持浏览器打开
"""
with sync_playwright() as p:
browser = p.chromium.launch(headless=headless)
context = browser.new_context(storage_state="auth.json")
page = context.new_page()
try:
page.set_default_timeout(timeout)
print("正在导航ww页面...")
page.goto("https://cwall.czoffice.top/ww/detail.php?id=" + post_id)
# 输入
page.fill("xpath=/html/body/div/form/textarea",info)
#提交
page.click("xpath=/html/body/div/form/button")
if keep_open:
print(f"浏览器将保持打开,按Ctrl+C终止程序...")
while True:
time.sleep(1)
return True
except Exception as e:
print("自动化过程出现错误")
page.screenshot(path="error-comment.png")
print("已保存错误截图:error-comment.png")
return False
finally:
if not keep_open:
browser.close()
# 使用示例
if __name__ == "__main__":
# # 基本用法
# automate_cwall_login(
# username="一只fish (づ ●─● )づ",
# password="zy842530",
# headless=False
# )
# cwall_ww_sub(
# info="Xes什么私人UI,这个C拉的比面条还长", # 必需
# kind="Open Cwall API", # 必需
# kind_enable=True,
# image_update_enable=True,
# image="C://Users/张雨.WIN-KF4D01O81O2/desktop/屏幕截图 2025-05-17 185324.png", # 可选
# headless=True,
# keep_open=False
# )
# 高级用法 - 保持浏览器打开
# automate_cwall_login(
# username="testuser",
# password="testpass",
# keep_open=True
# )
# get_cwall_ww_info(
# page= "1"
# )
# clear_auth()
# ww_like(
# post_id="3432",
# keep_open = True,
# timeout=30000,
# )
ww_Comment(
info="OpenCwallAPI评论测试", # 必需
post_id="3438", # 必需
headless=False,
keep_open=False,
timeout=30000,
)