diff --git a/.gitattributes b/.gitattributes index a6344aac8c09253b3b630fb776ae94478aa0275b..fae66d1c313617a6eacb0a742c495013f82318c2 100644 --- a/.gitattributes +++ b/.gitattributes @@ -33,3 +33,4 @@ saved_model/**/* filter=lfs diff=lfs merge=lfs -text *.zip filter=lfs diff=lfs merge=lfs -text *.zst filter=lfs diff=lfs merge=lfs -text *tfevents* filter=lfs diff=lfs merge=lfs -text +*.js filter=lfs diff=lfs merge=lfs -text diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..eec7996732528a322f354e7267a91324bd263753 --- /dev/null +++ b/.gitignore @@ -0,0 +1,31 @@ +__pycache__ +node_modules/ +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +MANIFEST +*.manifest +*.spec +.cache +*.log +local_settings.py +db.sqlite3 +__pypackages__/ +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..7ed41384b278ca7df2009f76a78ceeff248b6240 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,30 @@ +FROM python:3.11-slim + +WORKDIR /app + +RUN apt-get update && apt-get install -y --no-install-recommends \ + curl \ + ca-certificates \ + gnupg \ + && rm -rf /var/lib/apt/lists/* + +RUN curl -fsSL https://deb.nodesource.com/setup_20.x | bash - \ + && apt-get install -y --no-install-recommends nodejs \ + && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt \ + && playwright install chromium \ + && playwright install-deps chromium + +COPY package.json . +RUN npm install + +COPY . . + +ENV PYTHONPATH=/app +ENV PYTHONUNBUFFERED=1 + +EXPOSE 7860 + +CMD ["uvicorn", "service.app:app", "--host", "0.0.0.0", "--port", "7860"] diff --git a/PRD.md b/PRD.md new file mode 100644 index 0000000000000000000000000000000000000000..123b6084684ef681e7b8049698e693a0dd059d32 --- /dev/null +++ b/PRD.md @@ -0,0 +1,169 @@ +# 小红书稳定采集微服务 (Spider_XHS) - 产品需求文档 (PRD) + +## 1. 产品概述 + +### 1.1 产品名称 +小红书稳定采集微服务 (Spider_XHS Stability Data Service) + +### 1.2 产品定位 +一款面向企业级大模型(AI Agent)、数据分析、内容运营等上游业务,提供**高可用、抗风控、全链路可溯源**的小红书数据采集基础设施。本产品将底层的反爬对抗与数据清洗封装为标准 RESTful API,使得上游业务无需关注账号、风控及底层协议细节,即可稳定获取所需数据。 + +### 1.3 背景与痛点 +- **风控严苛**:小红书针对协议级 API 采集有着极强的反爬策略(封号、IP 限流、滑动验证码)。单一的协议破解(如逆向 `x-s` 签名)往往在遭遇强风控时全线崩溃。 +- **效率与稳定性的博弈**:传统的浏览器自动化方案(Playwright/Selenium)虽能较好地模拟真人绕过风控,但并发低、极其消耗 CPU/内存资源,无法满足大规模跑批需求。 +- **业务不能断流**:对于上游的内容生成和监控业务,数据的断流意味着业务停滞,必须拥有 100% 可用的兜底机制。 + +### 1.4 核心解决方案 +**“稳定性调度大脑 (Stability Controller) + 三引擎自动降级 + RPA 回传兜底 + AI Agent 自动化获客闭环”**: +1. **主链路(三引擎自动降级)**: + - 引擎 A (Spider_XHS): 协议逆向极速采集(高并发、低成本) + - 引擎 B (MediaCrawler): Playwright Stealth 增强页面采集(含拟人化) + - 引擎 D (AgenticCrawler): 基于大模型视觉的智能自适应页面解析引擎(抗前端 DOM 改版) +2. **统一调度与资源池**:前置 Stability Controller 负责基于错误策略(`auth/rate/risk/captcha/timeout` 等)对 Account Pool(账号冷却池)、Session Pool(会话轮换)和 Proxy Pool(动态代理评分与剔除)进行资源分配、重试与引擎降级决策。在遇到复杂验证码时,主动唤醒 **AI Agentic Captcha Solver** 智能解除风控。 +3. **兜底链路(Chrome 插件 RPA 回传)**:当所有自动引擎和 AI 解除验证码均失效,任务进入 `WAITING_RPA` 时,由人工在真实浏览器环境中通过插件采集并调用 `POST /api/v1/import/extension` 回传结果。 +4. **离线导入链路(人工离线导入)**:支持运营人员导出的 Excel(小红书数据助手等)格式化导入。 +5. **AI 编排脚本(Orchestrator)**:提供基于 SQLite 的业务编排闭环,包含数据清洗、AI 智能生成图文内容、基于 AI Agent(`browser-use`)的**全自动真实发布**与**高意向线索自动私信触达**。 +6. **运营控制台 (Ops Console)**:提供基于 React + Ant Design 的前端看板,实现资源池(账号/会话/代理)监控、错误分析聚合与内容库的只读可视化。 + +--- + +## 2. 核心架构设计 + +### 2.1 系统架构图概览 +```text +上游业务 (Java/Agent) ──► [ OpenAPI / Webhook ] ──► (Spider_XHS FastAPI 微服务) + │ + ┌──────────────────────────────────────────────────────┤ + ▼ ▼ +[ 稳定性大脑: Stability Controller ] [ 兜底链路:数据导入 (Importer) ] + ├─► 资源分配:Account Pool + Session Pool + Proxy Pool ├─► 解析器: xhs_data_assistant + ├─► AI Agent 风控解除 (Captcha Solver) ├─► 解析器: fixed_template + ▼ │ +[ 多引擎执行链路 (Runner) ] └─► 字段标准化、元数据注入 + ├─► 引擎 A (Spider_XHS): 协议逆向、极速并发 │ + │ └─ 遇 timeout/rate/auth/risk/captcha ─重试/降级─┐ │ + │ ▼ │ + ├─► 引擎 B (MediaCrawler): Stealth 防抖模拟人行为 │ + │ └─ DOM 解析失效/改版 ─降级─┐ │ + │ ▼ │ + └─► 引擎 D (Agentic Crawler): AI 视觉自动解析 ───────────┤ + └─ 若仍被验证码拦截 → 状态 WAITING_RPA(等待人工回传) │ + ┌──────────────────────┴───────────────────────────────┘ + ▼ +[ 持久化与监控 ] + ├─► 文件存储 (JSON + HTML快照) 带有 fcntl 进程锁 + ├─► 限流拦截 (IP 滑动窗口 RateLimit) + └─► Prometheus 指标监控 (/metrics) + │ + ▼ +[ AI 自动化业务编排 (Orchestrator) ] + ├─► 核心数据库: SQLite (14张业务表: 关键词、生成草稿、线索等) + ├─► AI Agent 自动操作: 发布图文 (publish_tracker.py) + ├─► AI Agent 自动操作: 线索私信触达 (lead_service.py) + └─► 生态协同: 数据清洗、AI 图文生成、飞书同步 + +[ 运营控制台 (Ops Console) ] + ├─► 资源池中心 (账号/会话/代理健康快照) + ├─► 错误中心 (任务失败聚合与异常扫描) + └─► 内容库看板 (基于 SQLite 数据的全量只读展示) + + (第三兜底链路:Chrome 插件 RPA 回传) + 浏览器插件 ──► POST /api/v1/import/extension ──► 写入任务结果 +``` + +--- + +## 3. 功能需求说明 + +### 3.1 核心采集业务 (Data Scraping) +- **笔记详情采集 (`note_url`)**:输入笔记链接,获取无水印图文、视频信息、正文、点赞/收藏/评论等互动数据。 +- **用户主页采集 (`user_profile`)**:输入用户 ID 或主页链接,获取博主基本信息、粉丝数、关注数、获赞与收藏总数。 +- **关键词搜索采集 (`search`)**:输入关键词,获取相关笔记列表,支持按综合/最新/热门排序。 + +### 3.2 智能引擎调度与稳定性控制 (Stability Controller) +- **资源池化管理**:系统内置账号池 (Account Pool)、会话池 (Session Pool) 和代理池 (Proxy Pool),所有采集任务通过 Stability Controller 动态获取最佳健康资源。 + - **代理打分与剔除**:代理池支持多 provider 聚合,根据任务执行结果实时回写分数,自动降权并剔除高失败率代理。 + - **账号防封冷却**:连续遭遇 `rate` 或 `risk` 错误的账号将进入冷却窗口,避免被平台拉黑。 +- **三引擎容灾与降级(服务端自动执行)**: + - 任务默认以 `auto` 策略下发,优先分配给 **Spider_XHS 引擎 (Engine A)** 以最高效率执行。 + - 监听执行异常并按策略表处置:`timeout` 换代理重试;`rate` 账号冷却+换账号降频;`auth` 标记会话失效+换 Session。 + - 遇到强风控 (`risk`/`captcha`) 时,降级至 **MediaCrawler 引擎 (Engine B)**。 + - Engine B 内置 Stealth 脚本与随机拟人化行为。若 Engine B 报 `parse` 错误(说明 DOM 结构发生变化),任务自动流转至 **Agentic Crawler (Engine D)**,由大模型视觉接管。 + - **智能验证码解除 (Captcha Solver)**:遇到复杂滑块或点选验证码时,系统主动实例化基于大模型的 `AgenticCaptchaSolver`,尝试自动操控浏览器完成过验。若 Agent 仍失败,任务进入 `WAITING_RPA` 状态。 +- **终极免疫链路(人工 RPA 回传)**:处于 `WAITING_RPA` 的任务可由人工通过 Chrome 浏览器插件拦截并回传数据,完成闭环;该链路属于人工兜底通道,而非服务端自动执行的引擎线程。 + +### 3.3 异步任务与状态流转 (Task Lifecycle) +- **扩展任务状态机**:支持 `queued` -> `running` -> `retrying` -> `fallback_running` -> `succeeded` / `failed` / `waiting_rpa` / `rpa_imported` / `risk_paused` 等细粒度状态。 +- **异步拉取**:上游业务通过创建任务获取 `task_id`,并可通过长轮询获取执行结果。 +- **Webhook 回调**:支持配置全局 `CALLBACK_URL`。任务到达终态后自动触发回调推送。 + - **回调重试**:如遇上游网络抖动,系统自动采用指数退避算法最多重试 5 次,并在头部携带 `Idempotency-Key` 确保上游幂等消费。 + +### 3.4 离线数据人工兜底 (Offline Import) +- **Excel 文件解析**:提供 `/api/v1/import/excel` 接口,支持上传 Excel 文件。 +- **模板智能识别**:自动识别“小红书数据助手”等特定报表格式。 +- **数据标准化**:提取“曝光、阅读、互动、转粉”等高阶流量指标,将其与自动化采集的数据结构对齐 (`normalized` 数据契约)。 +- **全链路溯源**:保留 `operator` (操作人)、`source_name` (来源)、行号及表名等元数据,便于数据审计。 + +### 3.5 运营控制台与可视化 (Ops Console) +- **资源池中心**:直接复用内存池(Account/Session/Proxy Pool)状态快照,提供只读的可视化列表,方便运维人员实时监控风控与资源健康度。 +- **错误聚合分析**:基于本地任务文件系统,扫描近期(如近 1000 条)采集任务,并根据错误类别(auth/rate/risk/captcha 等)进行高阶聚合分析,提供失败列表过滤能力。 +- **内容库基础页**:对接 Orchestrator 的 SQLite 底座,为业务人员提供原始笔记和标准化笔记的分页查询与模糊检索视图。 + +--- + +## 4. 非功能需求 + +### 4.1 并发与数据一致性 +- **无状态服务设计**:服务进程应尽量无状态化,数据落盘依赖本地文件系统(`./storage`)。 +- **并发控制锁**:针对单机多进程/多线程场景,写入任务状态时必须使用 `fcntl.flock` 实现进程间排他锁,配合 `.tmp` 文件原子替换 (`os.replace`),杜绝高并发下的文件写坏或数据覆盖问题。 + +### 4.2 限流与防雪崩 +- **IP 级滑动窗口限流**:内置轻量级基于内存的 API 限流(如 `100 次 / 60 秒`),超出阈值立即返回 HTTP 429 状态码与 `Retry-After` 头,防止恶意或异常流量打垮服务。 + +### 4.3 监控与可观测性 +- **Prometheus 集成**:暴露标准 `/api/v1/metrics` 接口,输出以下核心指标: + - `spider_xhs_tasks_total{engine, status}`:各引擎任务执行计数。 + - `spider_xhs_queue_length` 与 `spider_xhs_tasks_inflight`:当前排队及在途队列长度。 + - `spider_xhs_recent_failure_rate`:滑动窗口内的实时失败率报警指标。 + - 代理池指标:`spider_xhs_proxy_pool_size`、`spider_xhs_proxy_pool_avg_score` 及失败原因分布统计。 +- **结构化日志**:使用 `loguru` 输出结构化日志并按天切割;对部分错误日志做基础脱敏处理,但不保证对所有自定义日志字段自动脱敏,敏感凭证应只通过环境变量注入且避免打印。 + +### 4.4 部署与兼容性 +- **容器化支持**:提供 Dockerfile 与 Docker Compose 编排,支持一键部署。存储目录独立挂载,保障数据持久化。 +- **向后兼容**:针对历史未带版本号的旧 API,可通过配置 `ENABLE_LEGACY_ROUTES=1` 提供兼容层平滑过渡。 + +--- + +## 5. API 接口契约说明 + +### 5.1 全局契约 +所有 RESTful API 的响应体均采用标准三段式包装: +```json +{ + "code": 200, // 业务状态码,200 为成功,100xx 为特定业务错误 + "msg": "success", // 提示信息 + "data": { ... } // 载荷数据 +} +``` + +### 5.2 核心端点规划 +| 端点路径 | Method | 用途 | 核心参数/说明 | +|---|---|---|---| +| `/api/v1/tasks` | POST | 创建采集任务 | 必传: `task_type`, `target`; 可选: `engine`, `payload` | +| `/api/v1/tasks/{id}` | GET | 查询任务状态 | 返回当前状态 (`queued`, `running` 等) | +| `/api/v1/tasks/{id}/result`| GET | 获取任务结果 | 若未完成返回 409;完成后返回 `raw`, `normalized`, `meta` | +| `/api/v1/tasks/{id}/callback/retry` | POST | 手动重试回调 | 用于 Webhook 推送彻底失败后的人工介入 | +| `/api/v1/import/excel` | POST | 上传并解析 Excel | 必传: `file`, `operator`; 输出标准化清洗结果 | +| `/api/v1/import/extension` | POST | 插件 RPA 结果回传 | 必传: `task_id`, `raw`, `normalized`; 更新任务为 `RPA_IMPORTED` | +| `/api/v1/metrics` | GET | Prometheus 指标 | 面向监控采集系统 (如 Grafana),含任务、队列、代理池状态 | +| `/api/v1/health` | GET | 健康检查与看板 | 包含队列长度、在途数、各引擎执行统计及实时失败率 | + +--- + +## 6. 演进规划 (Roadmap) + +- **Phase 1 (V1.0) - 历史版本**:完成双引擎双链路架构重构,实现核心采集、容灾降级、IP限流、并发控制与容器化监控交付。 +- **Phase 2 (MVP) - 当前版本**:新增轻量级 Orchestrator 编排脚本与 SQLite 核心业务数据库,提供“采集同步→内容清洗→Mock 飞书同步/告警”的最小闭环样例。 +- **Phase 3 (Stability) - 当前版本**:完成 Stability Controller (稳定性大脑) 重构,补齐 Account Pool、Session Pool 与 Proxy Pool 的全链路生命周期管理。完成 Chrome 插件回传通道(`/api/v1/import/extension` + `WAITING_RPA` 状态机),形成“双引擎自动采集 + 人工 RPA 回传兜底”的可控闭环。新增基于 React 的运营控制台,落地资源池中心、错误中心与内容库基础页的只读可视化。 +- **Phase 4 (V1.1)**:增加任务优先级队列 (Priority Queue),支持高优紧急任务插队;集成 Redis 作为分布式存储与分布式锁的选项,支持多节点横向扩容部署。 +- **Phase 5 (V2.0)**:接入大模型智能提取链路(针对未知格式或乱码的 Excel,通过 LLM 自动提取结构化内容);提供可视化热更新管理后台(用于账号池、代理池调优)。 diff --git a/PROJECT_STATUS.md b/PROJECT_STATUS.md new file mode 100644 index 0000000000000000000000000000000000000000..aa8b24bcca8575c058576ab88bc582908f7a90d0 --- /dev/null +++ b/PROJECT_STATUS.md @@ -0,0 +1,92 @@ +# Spider_XHS (小红书稳定采集微服务) - 项目完成情况介绍 + +## 1. 项目概览 + +Spider_XHS 最初是一个基于小红书 PC 端和创作者平台 API 逆向签名的纯脚本库。经过最近几轮的大型架构重构,该项目已经脱胎换骨,全面升级为一个**企业级、高可用、抗风控**的稳定采集微服务,并在生产层面引入了“稳定性调度大脑 + 资源池化 + 双自动引擎 + RPA 回传兜底”的组合策略。 + +目前,项目已完成 V1.0.0 技术规格书中定义的所有核心指标,实现了从“个人学习用爬虫”到“AI Agent 数据底座”的跨越。 + +--- + +## 2. 核心架构完成情况 + +### 2.1 稳定性大脑与多链路协同 (Stability Controller & Multi-Path Fallback) +已**完全实现**。 +- **稳定性调度大脑 (Stability Controller)**:不再将降级写死在代码中,而是抽象出统一的大脑,依据细粒度错误(`timeout/rate/auth/risk/captcha`)决定:重试、账号冷却、会话失效、引擎降级或暂停任务。 +- **资源池化底座**: + - `Account Pool`:提供账号风险分管理与连续报错(`rate/risk`)的自动冷却窗口。 + - `Session Pool`:统一管理 API Cookie 会话与 Playwright `storage_state`,具备轻量级健康检查与会话轮换能力。 + - `Proxy Pool`:支持从多个外部 API 或文件动态拉取 IP,基于执行结果同步打分,并具备高失败代理自动剔除逻辑。 +- **多引擎自动采集 + 一条人工 RPA 兜底链路**: + - **Engine A (API 引擎)**:作为极速并发的首选。 + - **Engine B (Browser 引擎)**:增强了 Stealth 抹机脚本与随机拟人化行为,作为遭遇风控验证码时的第二链路。 + - **Engine D (Agentic Crawler)**:全新引入的基于 `browser-use` 和视觉大模型的智能引擎,专治小红书前端 DOM 改版导致传统爬虫失效的场景。 + - **Agentic Captcha Solver**:智能风控解除。系统在遇到复杂验证码时,会自动唤醒大模型视觉实例去识别并完成验证码点选/滑块。 + - **RPA 回传链路(Chrome 扩展)**:项目已提供可加载的 Chrome 插件与服务端回传接口 `/api/v1/import/extension`。该链路是**最后兜底通道**,用于将处于 `WAITING_RPA` 的任务结果回填并继续回调/入库流程。 + +### 2.2 人工兜底采集链路 (Offline Excel Import) +已**完全实现**。 +- 针对极端情况(全网 IP 被封或大版本升级),开发了 `/api/v1/import/excel` 接口。 +- 支持直接上传由“小红书数据助手”或第三方平台导出的离线 Excel 数据。 +- 解析器(`xhs_data_assistant`)能够智能识别变体表头,自动完成字段映射(如提取曝光、阅读、互动、转粉等指标),并将其洗牌为标准的 `normalized` 数据结构,实现线上自动化与线下人工数据的无缝衔接。 + +--- + +## 3. 服务端能力完成情况 + +### 3.1 异步任务与 Webhook 回调 +已**完全实现**。 +- 采用 FastAPI 构建了无状态的轻量级微服务(`service/app.py`)。 +- 支持长轮询模式(`/api/v1/tasks/{id}/result`)。 +- 支持更丰富的细粒度任务状态流转(`queued` -> `running` -> `retrying` / `fallback_running` -> `waiting_rpa` / `rpa_running` -> `succeeded/failed/rpa_imported` 等)。 +- 实现了基于 `Idempotency-Key` 的 Webhook 回调推送机制,并在回调失败时支持指数退避的自动重试(最多 5 次),确保上游(如 Java 服务端)能够稳定、幂等地接收采集结果。 + +### 3.2 极高并发与文件锁存储 +已**完全实现**。 +- 数据持久化层(`service/storage.py`)采用了基于本地文件系统(`./storage`)的 JSON 分片存储策略。 +- **原子落盘 + 进程锁**:写入采用临时文件 + `os.replace` 原子替换;为避免多进程并发更新造成覆盖,关键更新路径使用 `fcntl.flock` 做轻量进程互斥,保证一致性与可恢复性。 + +### 3.3 企业级运维与监控保障 +已**完全实现**。 +- **防雪崩 IP 限流**:在 API 网关层实现了基于内存滑动窗口的 IP 限流器(`PerIPRateLimitMiddleware`),默认 `100次/60秒`,超出阈值立即返回 `429 Retry-After`,同时优化了清理逻辑的 O(N) 开销,防止恶意流量打垮服务。 +- **Prometheus 监控**:暴露了标准的 `/api/v1/metrics` 接口,可直出各引擎的 `spider_xhs_tasks_total`、队列积压及在途任务长度,以及 `spider_xhs_recent_failure_rate`、代理池规模、评分及失败原因等核心业务指标,可对接 Grafana 看板。 +- **结构化日志与基础脱敏**:使用 `loguru` 实现日志按天切割、保留与压缩;对部分错误日志做基础脱敏处理,但不保证对所有自定义日志字段自动脱敏,敏感凭证应只通过环境变量注入且避免打印。 +- **自动清理**:服务内置后台守护线程,自动清理过期(默认 7 天)的 Browser 引擎生成的 HTML 原始快照。 + +### 3.4 自动化获客闭环 MVP (Automated Customer Acquisition Closed-Loop) +已**完全实现**。 +- 在服务上层新增了 `orchestrator/` 编排层,实现了“采集同步 → 内容清洗 → AI 生成图文 → AI 自动发布 → AI 自动触达高意向用户 → 飞书线索同步”的完整业务闭环。 +- **本地数据库底座**:基于 SQLite 构建了 14 张核心业务表(`orchestrator/data/mvp.db`),覆盖了从数据获取、图文草稿管理、任务重试到客户线索的所有生命周期。 +- **AI 生成与自动操作能力**: + - 基于大模型实现了批量仿写与原创文案的生成(`ai_generation.py`)。 + - 引入了基于 `browser-use` 框架的 AI Browser Agent,成功实现了无人值守的**真实小红书笔记自动发布**(`publish_tracker.py`)与**高意向线索的自动私信回复**(`lead_service.py`)。 +- **生态协同**:`feishu_sync.py` 与 `alert.py` 完成了线索数据向上游 CRM(飞书多维表格)和报警群的 Mock 流转。 + +### 3.5 运营控制台可视化 (Ops Console Visualization) +已**完全实现**。 +- 基于 React + Vite + Ant Design 构建了轻量级前端控制台(`frontend/`)。 +- 实现了只读模式的**资源池中心**(账号/会话/代理状态快照)、**错误中心**(任务错误聚合与失败任务扫描)以及**内容库基础页**(从 Orchestrator SQLite 数据库读取原始笔记与清洗后数据)。 +- 通过复用内存中的 `AccountPool`, `SessionPool`, `ProxyPool` 的快照和本地任务文件列表,以最小侵入的方式实现了前后端数据打通,提供了友好的看板展示与空态引导。 + +--- + +## 4. 部署与兼容性完成情况 + +### 4.1 容器化与独立运行 +已**完全实现**。 +- 提供了完整的 `Dockerfile` 与 `docker-compose.yml`。 +- 支持一键拉起包括 FastAPI 服务、本地文件挂载(`./storage`)在内的所有组件。 +- 提供了 `MEDIACRAWLER_MOCK` 模式,允许在没有安装 Playwright 依赖(或 CI/CD 环境)的情况下,通过 Mock HTML 直接验证服务调度、容灾及解析逻辑。 + +### 4.2 遗留系统兼容 +已**完全实现**。 +- 通过环境变量 `ENABLE_LEGACY_ROUTES=1`,支持无缝兼容不带 `/api/v1` 前缀的旧版路由请求。 +- 存储层支持启动时自动识别并热迁移旧版的单一 `tasks.json` 文件到全新的碎片化目录结构中,做到了对老用户的零感知升级。 + +--- + +## 5. 总结 + +目前,**Spider_XHS 已达成当前版本既定的产品化目标**。它不仅保留了作为底层爬虫框架逆向签名的硬核能力,更通过引入 FastAPI 微服务架构、稳定性调度大脑 (Stability Controller)、资源池化底座、双引擎自动容灾(API + Playwright Stealth)与 RPA 回传兜底、原子落盘 + 进程锁的本地持久化、Prometheus 监控以及 IP 防雪崩限流,形成了一个可用于上游 AI/数据业务调用的稳定采集底座。 + +此外,项目提供了一个**脚本样例级的编排 MVP**(`orchestrator/` + SQLite),用于演示采集同步与基础清洗、以及 Mock 的飞书同步与告警落库,为后续完善 AI 生成/合规模块预留了清晰扩展点。同时,配合基于 React 的**前端运营控制台**,打通了资源池监控、错误聚合排查与内容展示的可视化闭环,极大提升了项目的可用性与运维体验。 diff --git a/README.md b/README.md index 6472fcaf3a258c28eea4163df72c80d56472e821..dd806ab49a576c653c71a1e7481db83e52bbac18 100644 --- a/README.md +++ b/README.md @@ -8,3 +8,745 @@ pinned: false --- Check out the configuration reference at https://huggingface.co/docs/hub/spaces-config-reference +
+ +# Spider_XHS + +**专业的小红书数据采集 & 全域运营解决方案 & Agent Skills** + +[![Python](https://img.shields.io/badge/python-3.10%2B-blue)](https://www.python.org/) +[![Node.js](https://img.shields.io/badge/nodejs-20%2B-green)](https://nodejs.org/) +[![License](https://img.shields.io/badge/license-MIT-orange)](LICENSE) + +
+ +> **在 AI 大模型爆发的时代,内容运营的竞争本质是效率竞争。** +> 本项目封装了小红书平台完整的数据采集与内容发布能力,为开发者构建 AI 运营智能体提供可靠、稳定的底层 API 支撑。 +> **本项目已升级为「稳定性大脑 + 三引擎自动降级 + AI Agent 全链路闭环」架构:服务端自动执行 API 协议采集 + Playwright Stealth + 基于大模型视觉的 Agentic Crawler,并支持 AI Agent 全自动发布笔记与自动触达私信线索。** + +**⚠️ 本项目仅供学习交流使用,禁止任何商业化行为,如有违反,后果自负** + +--- + +## 为什么需要这个项目? + +``` +采集竞品笔记 ──► [Spider_XHS] ──► 你的 AI Agent(改写 / 生成 / 分析)──► 自动上传发布 + ▲ │ + └──────────── 获取数据 / 管理账号 ◄──────────────────────┘ +``` + +小红书没有开放完整的内容运营接口。想要接入 AI 大模型实现内容批量采集、智能改写、一键发布,首先需要能**稳定读写平台数据**。Spider_XHS 解决的正是这个前置问题: + +- 逆向还原了小红书 PC 端与创作者平台的签名算法(x-s / x-t / x-s-common / x_b3_traceid / sign / q-signature参数) +- 封装全部核心 HTTP 接口,签名参数已透明处理 +- 同时覆盖 **数据采集**(PC端)、**内容发布**(创作者平台)、**KOL数据**(蒲公英)三大场景 + +**你负责接 AI 大脑,我们负责打通小红书的神经。** + +--- + +## ⭐ 已实现功能 + +| 模块 | 功能 | 状态 | +|------|------|------| +| **小红书 PC 端** | 二维码登录 / 手机验证码登录 | ✅ | +| | 获取主页所有频道 & 推荐笔记 | ✅ | +| | 获取用户主页信息 / 自己的账号信息 | ✅ | +| | 获取用户发布 / 喜欢 / 收藏的所有笔记 | ✅ | +| | 获取笔记详细内容(无水印图片 & 视频) | ✅ | +| | 搜索笔记 & 搜索用户 | ✅ | +| | 获取笔记评论 | ✅ | +| | 获取未读消息 / 评论@提醒 / 点赞收藏 / 新增关注 | ✅ | +| **创作者平台** | 二维码登录 / 手机验证码登录 | ✅ | +| | 上传图集作品 | ✅ | +| | 上传视频作品 | ✅ | +| | 查看已发布作品列表 | ✅ | +| **蒲公英平台** | 获取 KOL 博主列表 & 详细数据 | ✅ | +| | 获取博主粉丝画像 & 历史趋势 | ✅ | +| | 发起合作邀请 | ✅ | +| **千帆平台** | 获取分销商列表 & 详细数据 | ✅ | +| | 获取分销商合作品类 / 店铺 / 商品信息 | ✅ | + +--- + +## 🌟 核心特性:稳定性大脑与 AI Agent 多链路容灾 + +为了解决小红书日益严格的风控(封号、验证码、前端频繁改版)问题,本项目重构了底层的采集架构,全面升级为**三自动引擎 + RPA 回传兜底 + 稳定性大脑 (Stability Controller)** 模式,完美适配生产级高可用业务需求。 + +### 1. 稳定性大脑与资源池化 +不再将异常写死在爬虫逻辑中,而是引入统一的 `Stability Controller`,并辅以三大资源池,实现防封禁的生命周期管理: +- **Account Pool(账号池)**:自动管理多账号风险分,遭遇高频风控时触发防封冷却窗。 +- **Session Pool(会话池)**:兼容 API 登录态与浏览器 Storage State,支持失效检测与自动轮换。 +- **Proxy Pool(代理池)**:支持多数据源动态拉取 IP,根据网络响应执行同步打分,自动剔除高失败率代理。 +- **Agentic Captcha Solver**:智能风控解除。系统在遇到复杂验证码时,会自动唤醒基于 `browser-use` 和视觉大模型的 Agent,自动完成滑块拖动和文字点选。 + +### 2. 自动化主采集链路(三引擎容灾) +提供标准的 FastAPI 轻量服务,服务端自动执行三大引擎: +- **Engine A (API 协议引擎)**:通过逆向签名算法直接请求接口。极速、并发高、资源占用极小,作为绝对主力。 +- **Engine B (Browser 浏览器引擎)**:引入 Playwright 自动化框架,并提供 Stealth 脚本注入与随机拟人化动作(鼠标轨迹、滚动,支持开关)以降低自动化特征。 +- **Engine D (Agentic Crawler)**:引入了**基于视觉大模型的自适应页面提取引擎**。当小红书前端 DOM 结构发生变化导致 Engine B 的固定解析规则失效时,系统自动降级唤醒 Engine D,大模型会自己“看”懂页面并提取标题、作者、正文等结构化数据,无视任何前端改版。 + +### 3. 人工兜底链路(Chrome 插件 RPA 回传) +Chrome 插件并不是服务端“自动执行的第三引擎线程”,而是一个**最后的人工兜底通道**:当所有自动引擎和 AI Agent 都无法突破风控,任务自动进入 `WAITING_RPA` 时,运营人员在真实浏览器环境下采集页面内容/数据后,通过插件调用服务端回传接口完成闭环。 + +### 4. 人工兜底采集链路(Excel 导入解析) +在极端情况下(如全网 IP 被封或大面积升级),支持通过**小红书数据助手**或第三方平台导出的 Excel 进行全量解析: +- 提供 `/api/v1/import/excel` 接口,自动识别各种变体表头(曝光、阅读、互动、转粉等)。 +- 自动完成字段映射、标准化清洗,将离线人工数据无缝对接到自动化流中,确保业务“不断流”。 + +### 5. 企业级运维保障 +- **防雪崩限流**:内置基于 IP 的滑动窗口限流(Rate Limit),超过阈值返回 `429 Retry-After`。 +- **普罗米修斯监控**:暴露 `/api/v1/metrics` 接口,直出 `spider_xhs_tasks_total`、`spider_xhs_recent_failure_rate` 等核心指标。 +- **并发与持久化**:采用基于 `fcntl.flock` 的文件锁分片存储,保证跨进程写文件的绝对原子性。 +- **回调重试机制**:支持结果 Webhook 推送,内置 `Idempotency-Key` 幂等控制与指数退避重试。 + +### 6. AI 自动化获客编排 (Orchestrator MVP) +基于采集服务之上,新增轻量级 Python 编排脚本与 SQLite 本地数据库,提供完整的获客流转: +- **采集同步与清洗**:`crawl_sync.py` & `note_cleaner.py` +- **AI 批量生成图文**:`ai_generation.py` +- **AI 全自动发布笔记**:`publish_tracker.py`(接入 `browser-use`,大模型直接操控创作者中心) +- **AI 全自动触达线索**:`lead_service.py`(AI 自动打开小红书网页版私信,向高意向用户发送留资话术) +- **飞书同步与告警**:`feishu_sync.py` & `alert.py` (Mock 流转) + +### 7. 运营控制台可视化 +提供基于 React + Vite 的前端可视化看板(`frontend/`),支持资源池(账号/会话/代理)健康度监控、任务错误聚合扫描以及底层 SQLite 内容库的只读分页检索,实现了前后端数据链路的直观打通。 + +--- + +## 🤖 接入 AI 智能体 + +Spider_XHS 天然适合作为 AI 运营 Agent 的数据底座,以下是几种典型用法: + +### 场景一:竞品笔记采集 + AI 改写 + 自动发布 + +```python +from apis.xhs_pc_apis import XHS_Apis +from apis.xhs_creator_apis import XHS_Creator_Apis + +pc_api = XHS_Apis() +creator_api = XHS_Creator_Apis() + +# 1. 采集竞品笔记 +success, msg, note = pc_api.get_note_info(note_url, cookies_str) + +# 2. 交给 AI 改写(接入任意大模型) +rewritten = your_ai_agent(note['content']) # GPT / Claude / Qwen / 本地模型 + +# 3. 自动上传到创作者平台 +creator_api.post_note({ + "title": rewritten['title'], + "desc": rewritten['desc'], + "media_type": "image", + "images": [...], + ... +}, creator_cookies_str) +``` + +### 场景二:关键词监控 + AI 情报分析 + +```python +# 搜索指定关键词的最新笔记,交给 AI 分析趋势 +success, msg, notes = pc_api.search_some_note(query, require_num, cookies_str, ...) +analysis = your_ai_agent(notes) +``` + +### 场景三:KOL 筛选 + 智能匹配 + +```python +from apis.xhs_pugongying_apis import PuGongYingAPI + +pgy = PuGongYingAPI() +# 获取目标类目的 KOL 数据,交给 AI 评估匹配度 +kol_list = pgy.get_some_user(num=50, cookies=cookies) +best_kols = your_ai_agent(kol_list, brand_profile) +``` + +--- + +## 🧩 Skills 支持 + +当前项目已经支持基于 skills 的能力接入,既可以直接作为 `Spider_XHS` 的底层能力仓库使用,也可以通过标准化 skills 方式被上层 Agent 工具链引入。 + +如果你希望直接复用已经封装好的 skills,可以直接使用本项目内置的 Agent Skills。该模块目前可被 `Clawbot`、`Claude Code`、`Codex` 等支持 skills 的工具直接引入与集成。 + +--- + +## 🎨 爬虫效果图 + +### 处理后的所有用户 +![image](https://github.com/cv-cat/Spider_XHS/assets/94289429/00902dbd-4da1-45bc-90bb-19f5856a04ad) + +### 某个用户所有的笔记 +![image](https://github.com/cv-cat/Spider_XHS/assets/94289429/880884e8-4a1d-4dc1-a4dc-e168dd0e9896) + +### 某个笔记具体的内容 +![image](https://github.com/cv-cat/Spider_XHS/assets/94289429/d17f3f4e-cd44-4d3a-b9f6-d880da626cc8) + +### 保存的 Excel +![image](https://github.com/user-attachments/assets/707f20ed-be27-4482-89b3-a5863bc360e7) + +--- + +## 🛠️ 快速开始 + +### ⛳ 环境要求 + +- Python 3.10+ +- Node.js 20+ + +### 🎯 安装依赖 + +```bash +pip install -r requirements.txt +npm install +``` + +### 🧩 Playwright(必选:用于 Browser 引擎兜底与扫码登录) + +如果你希望使用**浏览器引擎 (MediaCrawler) 作为兜底策略**,或使用“内置浏览器扫码登录后自动获取 Cookie 并写回 `.env`”功能,必须安装 Playwright 浏览器内核: + +```bash +playwright install chromium +``` + +### 🎨 配置 Cookie + +在项目根目录的 `.env` 文件中填入你的登录 Cookie: + +``` +COOKIES='' +``` + +Cookie 获取方式:浏览器登录小红书后,按 `F12` 打开开发者工具 → 网络 → Fetch/XHR → 找任意一个请求 → 复制请求头中的 `cookie` 字段。 + +![image](https://github.com/user-attachments/assets/6a7e4ecb-0432-4581-890a-577e0eae463d) + +![image](https://github.com/user-attachments/assets/5e62bc35-d758-463e-817c-7dcaacbee13c) + +> **注意:必须是登录后的 Cookie,未登录状态无效。** + +### 🚀 运行项目 + +```bash +python main.py +``` + +### 🧰 CLI(推荐) + +项目提供 CLI 方便团队跑批与断点续跑(默认状态文件 `datas/state.json`): + +```bash +python cli.py --help +``` + +#### 扫码登录并自动写回 Cookie(推荐) + +```bash +python cli.py login pc-qrcode --save-cookies --write-env +``` + +成功后会同时写入: + +- `datas/cookies.json` +- `.env` 中的 `COOKIES="..."` + +#### 抓取示例 + +```bash +python cli.py note --url 'https://www.xiaohongshu.com/explore/xxxx?...' --save-choice excel --excel-name demo --resume +python cli.py user --url 'https://www.xiaohongshu.com/user/profile/xxxx?...' --resume +python cli.py search --query 榴莲 --num 50 --resume +``` + +如需代理: + +```bash +python cli.py search --query 榴莲 --num 50 --proxy http://127.0.0.1:7890 +``` + +--- + +## 🧪 FastAPI 服务(对接 Java/任务系统) +- 适合场景:把 Spider_XHS 作为“采集/解析微服务”,由上游(Java/Agent/工作流引擎)提交任务并拉取结果;也可配置回调推送。 +- 提示:服务端不会要求你把 Cookie 写进代码;推荐通过环境变量或任务 payload 注入,且不要在日志中打印 Cookie。 + +### 🚀 启动服务(uvicorn) +在项目根目录执行: + +```bash +python -m uvicorn Spider_XHS.service.app:app --host 0.0.0.0 --port 8000 +``` + +开发调试(热加载): + +```bash +python -m uvicorn Spider_XHS.service.app:app --host 0.0.0.0 --port 8000 --reload +``` + +启动后: +- OpenAPI JSON:`http://localhost:8000/openapi.json` +- Swagger UI:`http://localhost:8000/docs` + +### 🖥️ 运营控制台前端(frontend/) +`frontend/` 提供基于 Vite + React + TypeScript 的运营控制台,用于查看健康状态、任务中心、RPA 回传、错误中心、资源池中心、内容库与监控指标等能力。 + +页面路由: +- `/dashboard`:服务健康概览 +- `/tasks`、`/tasks/:id`:任务中心与任务详情 +- `/rpa`:RPA 回传(Chrome 插件) +- `/errors`:错误中心 +- `/resources/accounts`、`/resources/sessions`、`/resources/proxies`:资源池中心(账号/会话/代理) +- `/content/raw-notes`、`/content/cleaned-notes`:内容库(原始笔记/清洗笔记) +- `/metrics`:监控指标 + +#### 1) 安装依赖 +```bash +cd frontend +npm install +``` + +#### 2) 配置后端地址(VITE_API_BASE_URL) +默认后端地址为 `http://localhost:8000/api/v1`。如需调整,可通过环境变量覆盖: + +- 推荐本地联调(走 Vite dev proxy,避免 CORS): +```bash +export VITE_API_BASE_URL=/api/v1 +``` + +- 或指定完整地址(跨域访问,需后端允许 CORS): +```bash +export VITE_API_BASE_URL=http://localhost:8000/api/v1 +``` + +#### 3) 启动前端(开发模式) +```bash +npm run dev +``` + +访问地址:`http://localhost:5173/` + +#### 4) 与 FastAPI 同时启动(本地联调) +终端 A(后端): +```bash +python -m uvicorn Spider_XHS.service.app:app --host 0.0.0.0 --port 8000 --reload +``` + +终端 B(前端): +```bash +cd frontend +export VITE_API_BASE_URL=/api/v1 +npm run dev +``` + +### 🧩 API 概览(/api/v1) +服务默认只开放带版本前缀的接口(推荐): +- Base URL:`http://:8000/api/v1` +- 统一响应包装:`{ "code": 200, "msg": "success", "data": ... }` +- 兼容旧路由(不带 `/api/v1`)可通过 `ENABLE_LEGACY_ROUTES=1` 打开,默认关闭 + +#### 统一响应格式 +所有接口(包括错误)均返回如下结构: + +```json +{ + "code": 200, + "msg": "success", + "data": {} +} +``` + +常用错误码(节选): +| 场景 | HTTP | code | 说明 | +|------|------|------|------| +| 鉴权/登录态失效 | 401 | 10001 | auth required | +| 单 IP 限流 | 429 | 10002 | rate limited | +| 风控/验证码 | 403 | 10003 | risk control | +| 参数/路径无效 | 400/404 | 10007 | invalid target | +| 任务不存在 | 404 | 10008 | task not found | +| 结果未就绪 | 409 | 10009 | task not ready | + +#### 端点一览 +| 方法 | 路径 | 说明 | +|------|------|------| +| GET | `/tasks` | 分页查询任务列表(过滤/排序) | +| POST | `/tasks` | 创建任务 | +| GET | `/tasks/{task_id}` | 查询任务状态 | +| GET | `/tasks/{task_id}/result` | 获取任务结果 | +| POST | `/tasks/{task_id}/callback/retry` | 手动重试回调 | +| POST | `/import/excel` | Excel 导入 | +| POST | `/import/extension` | Chrome 插件回填 | +| GET | `/errors/summary` | 错误聚合统计与失败任务列表 | +| GET | `/resources/accounts` | 账号池快照(只读) | +| GET | `/resources/sessions` | 会话池轻量检查(只读) | +| GET | `/resources/proxies` | 代理池快照(只读) | +| GET | `/content/raw-notes` | 内容库 raw_note 列表(SQLite,只读) | +| GET | `/content/cleaned-notes` | 内容库 cleaned_note 列表(SQLite,只读) | +| GET | `/health` | 健康检查 | +| GET | `/metrics` | Prometheus 指标 | + +#### 运维:GET /api/v1/health +返回字段(节选): +- `queue_length`:当前 `queued` 任务数量 +- `engine_usage_counts`:各引擎累计处理任务数(内存计数器,随进程重启清零) +- `success_count` / `fail_count`:累计成功/失败任务数(内存计数器,随进程重启清零) +- `recent_failure_rate`:近期失败率(滑动窗口) +- `recent_failure_window_seconds` / `recent_failure_total` / `recent_failure_failed`:失败率窗口与分子分母 + +#### 监控:GET /api/v1/metrics +Prometheus 文本格式(`text/plain; version=0.0.4`),核心指标(节选): +- `spider_xhs_queue_length` +- `spider_xhs_tasks_total{engine,status}` +- `spider_xhs_tasks_succeeded_total` / `spider_xhs_tasks_failed_total` +- `spider_xhs_recent_failure_rate` + +#### 错误中心:GET /api/v1/errors/summary +用途:聚合最近 N 条任务的 `error_kind` 分布,并返回失败任务列表(可分页/过滤)。 + +Query 参数(节选): +- `scan_limit`:扫描最近任务数量(默认取 `ERROR_SUMMARY_SCAN_LIMIT`,未配置则 1000) +- `status`:逗号分隔过滤(例如 `failed,waiting_rpa`) +- `error_kind`:逗号分隔过滤(例如 `auth,rate`) + +#### 资源池中心:GET /api/v1/resources/* +- `/resources/accounts`:账号池快照(risk_score/cooldown 等) +- `/resources/sessions`:会话池轻量检查结果(ok/reason) +- `/resources/proxies`:代理池快照(规模/均分/失败原因分布等) + +#### 内容库:GET /api/v1/content/* +说明:内容库从编排层 SQLite 读取数据,数据库路径由 `ORCHESTRATOR_DB_PATH` 指定(默认 `orchestrator/data/mvp.db`)。 + +- `/content/raw-notes`:raw_note 列表(author/url/content 模糊搜索) +- `/content/cleaned-notes`:cleaned_note 列表(cleaned_content + 关联 raw_note 信息) + +#### 日志字段(结构化) +服务日志会在 `extra` 中附带结构化字段(示例字段): +- `task_id` / `task_type` / `engine` / `status` +不要把 Cookie、回调签名、Authorization 等敏感信息写入日志;错误信息会对 URL token/cookies 等做脱敏。 + +#### 1) 创建任务:POST /api/v1/tasks +用途:提交一个采集/解析任务,立即返回 task_id,服务端后台执行。 + +请求体(JSON): +- `task_type`:`note_url` / `search` / `user_profile` +- `target`:采集目标(URL/关键词/用户ID),推荐使用;会自动写入 payload(向后兼容) +- `payload`:任务参数(见下方示例) +- `engine`:可选,`auto` / `api` / `browser`(不传则使用环境变量 `ENGINE_STRATEGY`) + +示例(抓取笔记详情): + +```bash +curl -X POST "http://localhost:8000/api/v1/tasks" \ + -H "Content-Type: application/json" \ + -d '{ + "task_type": "note_url", + "target": "https://www.xiaohongshu.com/explore/xxxx?xsec_token=yyyy", + "engine": "auto", + "payload": { + "operator": "demo" + } + }' +``` + +说明: +- API 引擎(Spider_XHS)Cookie 来源优先级:`payload.cookies`/`payload.cookie`/`payload.cookies_str` > 环境变量 `COOKIES`/`XHS_COOKIES` +- 代理来源优先级:`payload.proxies`(对象)> `payload.proxy`/`payload.proxies`(字符串)> 环境变量 `SERVICE_PROXY`;浏览器引擎可额外配置 `SERVICE_PROXY_POOL` 做简单轮换 + +不同 task_type 的 payload 约定: +- `note_url` + - `note_url`:笔记链接(必填) + - `cookies`:可选(推荐从环境变量注入,避免透传到上游日志) +- `search` + - `query` 或 `keyword`:关键词(必填) + - `require_num` 或 `limit`:期望条数(默认 20) + - `sort_type_choice` / `note_type` / `note_time` / `note_range` / `pos_distance` / `geo`:可选(与 CLI/现有 API 对齐) +- `user_profile` + - `user_id` 或 `uid`:用户 id(必填其一) + - 或 `user_url`/`url`:用户主页链接(可由 URL 自动解析 user_id) + +#### 2) 查询任务状态:GET /api/v1/tasks/{task_id} + +```bash +curl "http://localhost:8000/api/v1/tasks/" +``` + +返回字段(节选): +- `task.status`:`queued` / `running` / `succeeded` / `failed` +- `task.engine`:实际执行引擎(例如 `spider_xhs` / `mediacrawler`) +- `task.callback`:回调状态(配置了回调才会出现) + +#### 3) 获取任务结果:GET /api/v1/tasks/{task_id}/result +- 若任务仍在 `queued/running`,会返回 `409`(result not ready)。 +- 任务结束后返回: + - `raw`:引擎原始返回(结构可能随上游接口变化) + - `normalized`:标准化后的关键字段(便于入库/检索) + - `meta`:任务元信息与错误分类 + +```bash +curl "http://localhost:8000/api/v1/tasks//result" +``` + +`meta` 常用字段: +- `ok`:是否成功 +- `error_kind`:失败类型(如 `auth` / `rate` / `risk` / `parse` / `timeout` / `missing_dependency`) +- `primary_engine` / `final_engine`:自动策略下的主引擎与最终引擎 +- `fallback_reason`:触发兜底的原因(如阈值触发、鉴权/风控失败等) + +#### 4) 回调失败手动重试:POST /api/v1/tasks/{task_id}/callback/retry +仅当配置了回调地址且结果已生成时可用。服务端会: +- 使用 `Idempotency-Key` 请求头进行幂等控制 +- 最多尝试 5 次(指数退避 + 抖动) +- 若仍失败,会把失败请求落盘,供后续重试 + +```bash +curl -X POST "http://localhost:8000/api/v1/tasks//callback/retry" +``` + +#### 5) Excel 导入(人工兜底):POST /api/v1/import/excel +用途:当线上采集受风控/验证码影响,可用人工导出的 Excel 进行导入与标准化。 + +请求(multipart/form-data): +- `file`:Excel 文件 +- `operator`:操作者/来源标识(必填) +- `source_name`:可选,来源名称(如“运营同学A导出”) + +```bash +curl -X POST "http://localhost:8000/api/v1/import/excel" \ + -F "file=@/path/to/demo.xlsx" \ + -F "operator=ops" \ + -F "source_name=manual_export" +``` + +模板识别(自动): +- `xhs_data_assistant`:命中“曝光/阅读/浏览/互动/转粉”等数据助手指标列,并同时包含“笔记标题/笔记链接/作者昵称”等识别列 +- `fixed_template`:兜底模板(任意表头),会尝试从行内提取 URL/关键词等信息 + +`xhs_data_assistant` 支持的常见列(中英文均可,大小写/空格不敏感): +- 识别列:`笔记链接`/`笔记url`/`作品链接` → `note_url`,`笔记标题` → `title`,`作者昵称` → `author` +- 时间列:`发布时间`/`上传时间` → `publish_time` +- 互动指标:`点赞` → `like`,`评论` → `comment`,`收藏` → `collect`,`分享`/`转发` → `share` +- 流量指标:`曝光` → `exposure`,`阅读` → `read`,`浏览` → `view`,`互动` → `interact`,`转粉` → `follow` +- 内容类型:`内容类型`/`笔记类型` → `content_type` + +标准化输出(`records[].normalized`): +- 基础:`kind`(note/user/search/url)、`note_url`/`note_id`/`user_url`/`user_id`/`query` +- 补充:`title`、`author`/`nickname`、`publish_time`、`like_count`、`comment_count`、`collect_count`、`share_count`、`exposure`、`read`、`view`、`interact`、`follow`、`content_type` + +溯源信息(`records[].meta`): +- `import_id`、`operator`、`source_name`、`row_number`、`dedup_key`、`parser`、`filename`、`sheet` + +### ⚙️ 环境变量配置(建议仅在部署环境注入,不要写入代码/仓库) +> 下表仅展示变量名与用途;不要在日志/报错里输出真实 Cookie、回调签名、账号信息等敏感数据。 + +| 变量 | 默认值 | 说明 | +|------|--------|------| +| `ENABLE_LEGACY_ROUTES` | `0` | 是否兼容旧接口(不带 `/api/v1` 前缀);建议生产关闭 | +| `STORAGE_ROOT` / `SERVICE_STORAGE_ROOT` / `SERVICE_STORAGE_DIR` | `./storage` | 服务落盘根目录(任务/结果/回调失败/原始快照/日志) | +| `SERVICE_CONCURRENCY` | `4` | 任务执行并发度(建议和上游并发一起收敛) | +| `SERVICE_PROXY` | 空 | 代理(同时影响 API 引擎与浏览器引擎) | +| `SERVICE_PROXY_POOL` | 空 | 代理池(逗号分隔多个代理,浏览器引擎按重试次数轮换;会与 `SERVICE_PROXY` 合并去重) | +| `ENGINE_STRATEGY` | `auto` | `auto`/`api`/`browser` | +| `ENGINE_FALLBACK_THRESHOLD` | `3` | `auto` 模式下切换浏览器引擎的失败阈值 | +| `RAW_DATA_RETENTION_DAYS` | `7` | 原始 HTML 快照保留天数(启动时清理 + 定时清理) | +| `ORCHESTRATOR_DB_PATH` | `orchestrator/data/mvp.db` | Orchestrator SQLite 路径(内容库接口 `/content/*` 读取,只读打开) | +| `ERROR_SUMMARY_SCAN_LIMIT` | `1000` | 错误中心默认扫描最近任务数(用于 `/errors/summary` 的默认 `scan_limit`) | +| `CALLBACK_URL` / `SERVICE_CALLBACK_URL` | 空 | 任务完成后回调推送地址(可选) | +| `MEDIACRAWLER_STORAGE_STATE_PATHS` | 空 | 浏览器引擎登录态文件路径(逗号分隔多个候选) | +| `MEDIACRAWLER_STORAGE_STATE_PATH` | 空 | 同上(单路径兼容) | +| `MEDIACRAWLER_MOCK` | `0` | 浏览器引擎 mock 模式(不依赖 Playwright,可用于自测/验证脚本) | +| `MEDIACRAWLER_RETRY_MAX` | `3` | 浏览器引擎页面访问重试次数(含首次) | +| `MEDIACRAWLER_RETRY_BASE_S` | `0.8` | 浏览器引擎重试指数退避基准秒数 | +| `MEDIACRAWLER_RETRY_CAP_S` | `12` | 浏览器引擎重试指数退避上限秒数 | +| `MEDIACRAWLER_DELAY_MIN_S` | `0.4` | 浏览器引擎随机延迟下限(秒) | +| `MEDIACRAWLER_DELAY_MAX_S` | `1.2` | 浏览器引擎随机延迟上限(秒) | +| `COOKIES` / `XHS_COOKIES` | 空 | API 引擎登录 Cookie(敏感:建议仅在服务侧注入,不要写入镜像/仓库) | +| `LOG_LEVEL` | `INFO` | 日志级别 | +| `LOG_ROTATION` | `1 day` | 日志切分策略(loguru rotation) | +| `LOG_RETENTION` | `14 days` | 日志保留策略(loguru retention) | +| `LOG_COMPRESSION` | `zip` | 压缩策略(loguru compression) | + +`MEDIACRAWLER_STORAGE_STATE_PATHS` 示例: + +```bash +export MEDIACRAWLER_STORAGE_STATE_PATHS="/data/xhs/storage_state.json,/app/datas/state.json" +``` + +### 🐳 Docker Compose 部署(推荐) +项目根目录已提供 `docker-compose.yml`,默认把宿主机 `./storage` 挂载到容器 `/app/storage`: + +```bash +docker compose up -d --build +``` + +建议做法: +- 使用单独的环境文件(例如 `.env.local`)注入 `COOKIES`/`CALLBACK_URL` 等敏感变量,并确保不提交到仓库 +- 生产环境用反向代理(Nginx/Traefik)提供 TLS,并设置 `x-forwarded-for` 以获得真实 IP(限流按 IP 生效) + +### ☕ Java 对接示例 +#### 模式 A:拉取(推荐,最简单稳定) +流程: +1) Java 调用 `POST /api/v1/tasks` 创建任务拿到 task_id +2) Java 轮询 `GET /api/v1/tasks/{task_id}` 或直接轮询 `GET /api/v1/tasks/{task_id}/result` +3) 拿到 `normalized/meta` 入库;`raw` 可按需归档 + +OkHttp 示例(精简版): + +```java +import okhttp3.*; +import java.util.concurrent.TimeUnit; + +public class SpiderXhsClient { + private final OkHttpClient http = new OkHttpClient.Builder() + .connectTimeout(5, TimeUnit.SECONDS) + .readTimeout(30, TimeUnit.SECONDS) + .build(); + + public String createTask(String baseUrl, String noteUrl) throws Exception { + String json = "{" + + "\"task_type\":\"note_url\"," + + "\"engine\":\"auto\"," + + "\"payload\":{\"note_url\":\"" + noteUrl + "\",\"operator\":\"java\"}" + + "}"; + Request req = new Request.Builder() + .url(baseUrl + "/api/v1/tasks") + .post(RequestBody.create(json, MediaType.parse("application/json"))) + .build(); + try (Response resp = http.newCall(req).execute()) { + if (!resp.isSuccessful()) throw new RuntimeException("createTask http=" + resp.code()); + return resp.body().string(); + } + } +} +``` + +生产环境请解析 JSON 响应,读取 `task.id` 作为 task_id。 + +建议: +- `GET /tasks/{task_id}/result` 返回 409 时,说明结果未就绪;可退避重试(例如 0.5s/1s/2s 递增)。 +- 如果 `meta.ok=false` 且 `meta.error_kind=risk/auth`,通常需要人工处理验证码或更新 Cookie/登录态。 + +#### 模式 B:回调推送(可选) +配置: +- 在 Spider_XHS 服务侧设置 `CALLBACK_URL`(或 `SERVICE_CALLBACK_URL`) + +回调行为: +- 服务完成任务后会 `POST CALLBACK_URL` +- 请求头:`Idempotency-Key: ` +- 请求体:`{"task_id":"...","result":{...}}` + +Spring Boot 接收示例(示意,需自行实现幂等去重存储): + +```java +import org.springframework.web.bind.annotation.*; +import java.util.Map; + +@RestController +public class SpiderXhsCallbackController { + @PostMapping("/spider-xhs/callback") + public Map callback( + @RequestHeader(value = "Idempotency-Key", required = false) String idemKey, + @RequestBody Map body + ) { + return Map.of("ok", true); + } +} +``` + +生产环境建议使用 `Idempotency-Key` 做幂等去重(例如落库或 Redis SETNX),同时避免把任何敏感字段写入日志。 + +回调失败处理: +- 服务会自动重试(最多 5 次),仍失败会落盘记录 +- 上游可调用 `POST /api/v1/tasks/{task_id}/callback/retry` 触发再次推送 + +### 🛡️ 风控/验证码/封号风险建议(务必阅读) +- 限速:控制 QPS 与随机抖动(尤其是搜索/批量详情),避免固定频率与高峰期密集请求。 +- 并发:客户端线程数与服务端并发同时收敛;建议从小并发开始(例如 1~3)逐步压测。 +- 代理:优先稳定的住宅/独享代理;按账号/会话做粘性,避免频繁切换出口;异常时再切换。 +- 人工验证码:当出现验证码/风控(`meta.error_kind=risk/auth`),建议人工在浏览器完成验证并更新 Cookie 或 Playwright storage_state。 +- 账号隔离:不同业务/环境使用不同账号与独立存储目录;避免共享 Cookie 造成互相踢下线。 + +### 🧰 运维说明(日志/清理/迁移) +- 日志目录:`/logs/`,默认文件 `service.log`(按 `LOG_ROTATION` 轮转,保留 `LOG_RETENTION`,按需压缩) +- 原始快照:浏览器引擎可能会将 HTML 快照写入 `/raw/`,由 `RAW_DATA_RETENTION_DAYS` 控制保留天数(启动时清理 + 定时清理) +- 存储迁移:若检测到旧版 `tasks.json`,服务启动时会自动迁移到 `/tasks/{task_id}.json`,并将旧文件重命名为 `tasks.json.migrated` +- 备份建议:按目录整体备份 `` 即可(包含任务、结果、回调失败、快照与日志) + +### ✅ 合规红线(务必遵守) +- 仅在获得授权、符合平台协议与当地法律法规前提下使用本项目 +- 不要采集/存储/传播敏感个人信息,避免长期留存可识别数据(PII) +- Cookie、storage_state、回调签名等属于敏感凭证,严禁写入仓库、镜像、公开日志与错误回传 +- 如需留存数据,建议最小化采集字段、设置保留期、建立审计与删除机制 + +--- + +## 📁 项目结构 + +``` +Spider_XHS/ +├── main.py # 主入口:爬虫调用示例 +├── apis/ +│ ├── xhs_pc_apis.py # 小红书PC端完整API(采集) +│ ├── xhs_creator_apis.py # 创作者平台API(上传发布) +│ ├── xhs_pc_login_apis.py # PC端登录(二维码/手机验证码) +│ ├── xhs_creator_login_apis.py # 创作者平台登录 +│ ├── xhs_pugongying_apis.py # 蒲公英平台API(KOL数据) +│ └── xhs_qianfan_apis.py # 千帆平台API(分销商数据) +├── xhs_utils/ +│ ├── common_util.py # 初始化工具(读取.env配置) +│ ├── cookie_util.py # Cookie解析 +│ ├── data_util.py # 数据处理(Excel保存、媒体下载) +│ ├── xhs_util.py # PC端签名算法封装 +│ ├── xhs_creator_util.py # 创作者平台签名算法封装 +│ ├── xhs_pugongying_util.py # 蒲公英平台工具 +│ └── xhs_qianfan_util.py # 千帆平台工具 +├── static/ +│ ├── xhs_main_260411.js # PC端签名核心JS(最新版) +│ ├── xhs_creator_260411.js # 创作者平台签名核心JS(最新版) +│ └── ... +├── .env # Cookie配置(不要提交到git) +├── requirements.txt +├── Dockerfile +└── package.json +``` + +--- + +## 🗝️ 注意事项 + +- `main.py` 是爬虫入口,可根据需求修改调用逻辑 +- `apis/xhs_pc_apis.py` 包含所有 PC 端数据接口 +- `apis/xhs_creator_apis.py` 包含创作者平台发布接口 +- Cookie 有时效性,失效后需重新获取 +- 建议配合代理(proxies 参数)使用,降低封号风险 + +--- + +## 🍥 更新日志 + +| 日期 | 说明 | +|------|------| +| 23/08/09 | 首次提交 | +| 23/09/13 | API 更改 params 增加两个字段,修复图片无法下载,修复部分页面无法访问报错 | +| 23/09/16 | 修复较大视频编码问题,加入异常处理 | +| 23/09/18 | 代码重构,加入失败重试 | +| 23/09/19 | 新增下载搜索结果功能 | +| 23/10/05 | 新增跳过已下载功能,获取更详细的笔记和用户信息 | +| 23/10/08 | 上传至 PyPI,可通过 pip install 安装 | +| 23/10/17 | 搜索下载新增排序方式(综合 / 热门 / 最新) | +| 23/10/21 | 新增图形化界面,上传至 release v2.1.0 | +| 23/10/28 | Fix Bug:修复搜索功能隐藏问题 | +| 25/03/18 | 更新 API,修复部分问题 | +| 25/06/07 | 更新 search 接口,区分视频和图集下载,新增创作者平台 API | +| 25/07/15 | 更新 xs version56 & 小红书创作者接口 | +| 26/04/11 | 重构创作者平台 API(图集 / 视频上传),新增蒲公英 KOL 数据 API,新增千帆分销商 API,签名算法升级至最新版 | +| 26/04/16 | **架构升级**:全面升级为“稳定性大脑 + 双自动引擎 + RPA 回传兜底”架构(Spider_XHS + MediaCrawler兜底 + Extension 回传),新增 FastAPI 微服务、Webhook 回调、数据助手 Excel 解析、Prometheus 监控与 IP 级限流机制 | +| 26/04/16 | **业务闭环(样例级)**:新增 `orchestrator` 编排层,基于 SQLite 搭建 14 张核心业务表,提供“采集同步 -> 基础清洗 -> Mock 飞书同步/告警落库”的最小闭环样例 | +| 26/04/16 | **稳定性增强**:引入 **Stability Controller** 大脑与资源池(Account/Session/Proxy Pool),升级为**双自动引擎 + RPA 回传兜底**,增强浏览器引擎拟人化 Stealth,任务状态机升级为多阶段容灾流转 | +| 26/04/16 | **可视化控制台**:新增基于 React + Vite 的前端可视化看板,落地资源池中心(账号/会话/代理状态)、错误中心(失败任务聚合分析)与内容库基础页的可视化闭环 | + +--- + +## 🧸 额外说明 + +1. 欢迎 PR 和 Issue。 +2. 项目会持续更新,致力于提供更稳定的数据采集服务。 diff --git a/USAGE.md b/USAGE.md new file mode 100644 index 0000000000000000000000000000000000000000..b8c3d7f5d64a7f40b9f8d51d0be977b718466e87 --- /dev/null +++ b/USAGE.md @@ -0,0 +1,299 @@ +# 小红书稳定采集微服务 (Spider_XHS) 使用说明文档 + +本文档旨在指导开发者、运维人员及业务方如何快速部署、配置、对接与运维 Spider_XHS 数据采集微服务。 + +--- + +## 1. 快速启动与部署 + +本项目推荐使用 Docker Compose 进行容器化部署,以保证运行环境的一致性。 + +### 1.1 前置要求 +- **环境**:已安装 Docker 和 Docker Compose。 +- **配置**:建议在宿主机(如项目根目录)准备 `.env` 文件。 +- **数据目录**:宿主机需要预留足够的磁盘空间用于挂载 `./storage` 目录(存放任务状态、结果、回调失败记录、日志及 HTML 快照)。 + +### 1.2 启动步骤 +在项目根目录执行以下命令,构建并以后台模式启动服务: +```bash +docker compose up -d --build +``` +启动成功后,FastAPI 服务默认监听宿主机的 `8000` 端口。你可以通过访问 `http://localhost:8000/docs` 查看 Swagger UI 接口文档。 + +### 1.3 运营控制台前端(frontend/,可选) +项目内置一个运营控制台前端(Vite + React),用于在浏览器中查看健康状态、任务列表、任务详情与监控指标。 + +在项目根目录执行: +```bash +cd frontend +npm install +export VITE_API_BASE_URL=/api/v1 +npm run dev +``` + +访问地址:`http://localhost:5173/` + +页面路由: +- `/dashboard`:服务健康概览 +- `/tasks`、`/tasks/:id`:任务中心与任务详情 +- `/rpa`:RPA 回传(Chrome 插件) +- `/errors`:错误中心 +- `/resources/accounts`、`/resources/sessions`、`/resources/proxies`:资源池中心 +- `/content/raw-notes`、`/content/cleaned-notes`:内容库 +- `/metrics`:监控指标 + +本地联调建议同时启动 FastAPI: +```bash +python -m uvicorn Spider_XHS.service.app:app --host 0.0.0.0 --port 8000 --reload +``` + +--- + +## 2. 核心配置说明 (环境变量) + +服务的大部分行为通过环境变量进行控制,建议将敏感信息(如 Cookie)通过环境注入,切勿硬编码。 + +| 环境变量名 | 默认值 | 作用与说明 | +|---|---|---| +| `ENGINE_STRATEGY` | `auto` | 采集引擎调度策略(服务端自动执行的只有 A/B 两个引擎)。可选值:
- `auto`: 优先使用 API 引擎,遇风控自动降级到浏览器引擎
- `api`: 仅使用极速协议级引擎 (Spider_XHS)
- `browser`: 仅使用自动化浏览器引擎 (MediaCrawler) | +| `COOKIES` / `XHS_COOKIES` | 无 | 小红书登录后的 Cookie 字符串。支持多账号,可通过 `COOKIES_LIST` 等按逗号/换行分隔配置。 | +| `ACCOUNT_COOLDOWN_SECONDS` | `900` | 当账号遇到严重风控 (rate/risk) 时,自动进入冷却状态的时长(秒)。 | +| `SERVICE_PROXY` | 无 | 单一代理服务器地址(如 `http://127.0.0.1:7890`)。 | +| `SERVICE_PROXY_POOL` | 无 | 代理列表(逗号分隔)。浏览器引擎在重试时会按次数轮询;同时会与 `SERVICE_PROXY` 合并去重。 | +| `PROXY_API_URL` | 无 | 动态代理池提取 API(单个地址,向后兼容);建议使用 `PROXY_API_URLS`。 | +| `PROXY_API_URLS` | 无 | 动态代理池的提取 API,支持逗号分隔配置多个 Provider,系统将自动定时拉取、验活与打分剔除。 | +| `PROXY_FILE_PATH` | 无 | 静态代理池文件路径(每行一个代理),支持与 API 代理池聚合使用。 | +| `CALLBACK_URL` | 无 | 任务完成后的 Webhook 回调推送地址。若配置,服务将在采集成功/失败后主动推送数据。 | +| `STORAGE_ROOT` | `./storage` | 容器内的本地文件存储路径。 | +| `ORCHESTRATOR_DB_PATH` | `orchestrator/data/mvp.db` | Orchestrator SQLite 路径(内容库接口 `/content/*` 读取,只读打开)。 | +| `RAW_DATA_RETENTION_DAYS` | `7` | 浏览器引擎采集时保留的原始 HTML 快照天数,过期自动清理。 | +| `ERROR_SUMMARY_SCAN_LIMIT` | `1000` | 错误中心默认扫描最近任务数(用于 `/errors/summary` 的默认 `scan_limit`)。 | +| `MEDIACRAWLER_STEALTH` | `1` | 浏览器引擎是否启用 Stealth 脚本注入(`0` 关闭)。 | +| `MEDIACRAWLER_HUMANIZE` | `1` | 浏览器引擎是否启用随机拟人化动作(`0` 关闭)。 | +| `AGENT_LLM_API_KEY` | 无 | **[必填]** 用于驱动 AI Agent 操作浏览器的视觉大模型 API 密钥(如 OpenAI Key)。 | +| `AGENT_LLM_MODEL` | `gpt-4o` | AI Agent 使用的模型名称。必须支持 Vision 多模态能力(如 `gpt-4o`, `claude-3-5-sonnet-20241022`)。 | +| `AGENT_LLM_BASE_URL` | 无 | 大模型 API 的自定义 Base URL(如使用中转代理地址时填写)。 | +| `OPENAI_API_KEY` | 无 | 独立配置:如果 AI 图文生成(`ai_generation.py`)需要使用不同的模型或通道,可在此配置。 | + +--- + +## 3. 数据抓取实战操作指南 + +系统提供了多种灵活的抓取方式,以适应本地测试、运营操作和系统集成等不同场景。 + +### 3.1 方式一:命令行工具 (CLI) 本地抓取(推荐测试与获取 Cookie) +适合开发者在本地快速测试抓取效果,并获取最新、最完整的防风控 Cookie。 +1. **扫码登录并保存 Cookie**: + ```bash + python cli.py login pc-qrcode --save-cookies --write-env + ``` + > 运行后会弹出真实的浏览器界面供你扫码登录,登录成功后会自动将完整的 Cookie 和浏览器状态(包含 `storage_state.json`)更新到 `.env` 文件中。这是解决 `auth` 风控拦截的最佳方案。 +2. **执行抓取命令**: + ```bash + python cli.py search --query "小红书架构设计" --num 10 + ``` + > 支持的子命令包括 `search` 等,结果会默认保存到项目目录下的 `datas/excel_datas/` 文件夹中。 + +### 3.2 方式二:前端运营控制台 (Web UI) 可视化抓取 +适合运营人员或无需编写代码的用户进行可视化交互操作。 +1. **启动服务**:确保后端 FastAPI 和前端 Vite 服务已正常启动(参考文档 1.2 和 1.3 节)。 +2. **访问控制台**:在浏览器中打开 `http://localhost:5173/tasks` 进入任务中心。 +3. **创建任务**: + - **task_type**:填写 `search`(搜索)、`note_url`(单篇笔记)或 `user_profile`(用户主页)。 + - **target**:填写对应的关键词、笔记链接或用户主页链接。 + - 点击 **创建任务**。 +4. **查看结果**:任务创建后会在列表中显示,点击列表中的任务 ID 即可进入详情页,实时查看轮询状态、Raw(原始快照数据)和 Normalized(清洗后标准数据)。 + +### 3.3 方式三:API 接口调用 (上游业务集成) +适合将数据采集能力集成到 Java/Go/Python 等后端微服务中。 +- **操作步骤**:通过发送 HTTP POST 请求发起任务,再通过 GET 请求轮询结果,或配置 Webhook 接收自动回调。 +- **详细说明**:请参考本文档第 **4. 对接指南 (上游业务调用)** 节。 + +### 3.4 方式四:离线兜底与插件 RPA 回传 +当线上自动化引擎均遭遇极严格风控拦截时的人工兜底方案。 +- **操作步骤**:利用小红书数据助手导出 Excel 或使用 Chrome 插件在真实浏览器中抓取,随后调用对应的导入接口将数据回传至服务端。 +- **详细说明**:请参考本文档 **4.4 场景三:离线数据兜底导入** 及 **4.5 浏览器插件 RPA 回传**。 + +--- + +## 4. 对接指南 (上游业务调用) + +### 4.1 接口规范 +所有 API 请求路径前缀为 `/api/v1`。 +所有响应统一为 JSON 格式: +```json +{ + "code": 200, // 200 为成功,100xx 为特定业务错误 + "msg": "success", + "data": { ... } // 具体业务数据 +} +``` + +### 4.2 场景一:异步轮询拉取 (推荐) +最简单且稳定的对接方式。上游(如 Java 服务)发起任务后,定期查询结果。 + +**步骤 1:创建任务** +```bash +curl -X POST "http://localhost:8000/api/v1/tasks" \ + -H "Content-Type: application/json" \ + -d '{ + "task_type": "note_url", + "target": "https://www.xiaohongshu.com/explore/xxxx", + "engine": "auto" + }' +``` +> 返回值中的 `data.task.id` 即为 `task_id`。 + +**步骤 2:轮询获取结果** +```bash +curl "http://localhost:8000/api/v1/tasks//result" +``` +> **注意**: +> - 如果任务还在执行中,会返回 HTTP `409` (result not ready),上游应捕获 409 并等待 1~2 秒后重试。 +> - 任务结束后返回 200,其中 `data.normalized` 为标准化后的数据字段,`data.meta` 包含采集消耗的引擎与溯源信息。 + +### 4.3 场景二:Webhook 回调推送 +如果上游不希望轮询,可以在环境变量中配置 `CALLBACK_URL`。 + +服务在任务完成后(无论成功或失败),会自动向该地址发起 POST 请求: +- **Header**: 包含 `Idempotency-Key` (基于 task_id 和内容生成的哈希),上游应据此做**幂等去重**。 +- **Body**: +```json +{ + "task_id": "...", + "result": { + "raw": {...}, + "normalized": {...}, + "meta": {"ok": true, ...} + } +} +``` +> 如果回调失败(网络抖动或上游返回 5xx),服务会自动使用指数退避算法重试最多 5 次。 + +### 4.4 场景三:离线数据兜底导入 +当线上风控极其严格,导致 API 和浏览器双双失效时,业务方可通过“小红书数据助手”导出 Excel,并由上游系统或运营人员调用导入接口,将离线数据平滑注入现有业务流。 + +```bash +curl -X POST "http://localhost:8000/api/v1/import/excel" \ + -F "file=@/path/to/data_assistant.xlsx" \ + -F "operator=运营人员张三" +``` +> 服务会自动识别 Excel 表头(如曝光、阅读、互动等),并返回标准化的 `normalized` 数据数组。 + +### 4.5 浏览器插件 RPA 回传:POST /api/v1/import/extension +用途:该链路属于**人工兜底通道**(不在服务端自动执行引擎内)。通常用于处于 `waiting_rpa` 的任务:运营人员在真实浏览器环境中完成登录/验证码后,通过 Chrome 插件采集页面并回传;服务端目前只校验 `task_id` 存在性,然后写入结果并将任务更新为 `rpa_imported`。 + +```bash +curl -X POST "http://localhost:8000/api/v1/import/extension" \ + -H "Content-Type: application/json" \ + -d '{ + "task_id": "...", + "raw": {...}, + "normalized": {...} + }' +``` + +### 4.6 运营控制台只读接口 +资源池中心(只读): +- `GET /api/v1/resources/accounts` +- `GET /api/v1/resources/sessions` +- `GET /api/v1/resources/proxies` + +错误中心(聚合统计): +- `GET /api/v1/errors/summary` + - `scan_limit`:扫描最近任务数量(默认取 `ERROR_SUMMARY_SCAN_LIMIT`,未配置则 1000) + - `status` / `error_kind`:逗号分隔过滤 + +内容库(Orchestrator SQLite,只读): +- `GET /api/v1/content/raw-notes`(`query` 模糊匹配 author/url/content) +- `GET /api/v1/content/cleaned-notes`(`query` 模糊匹配 cleaned_content/raw_note 字段) + +--- + +## 5. 支持的任务类型 (task_type) + +在调用 `POST /api/v1/tasks` 时,支持以下三种主要采集任务: + +| task_type | target 含义 | 示例 | 产出数据 (`normalized` 核心字段) | +|---|---|---|---| +| `note_url` | 笔记链接 | `https://www.xiaohongshu.com/explore/xxx` | `note_id`, `title`, `author`, `publish_time` | +| `user_profile` | 用户主页链接或 ID | `https://www.xiaohongshu.com/user/profile/xxx` | `user_id`, `nickname`, `title` (个性签名) | +| `search` | 搜索关键词 | `AI Agent` | 匹配该关键词下的多条笔记基础信息列表 | + +--- + +## 6. 运维与监控 + +### 6.1 IP 限流防雪崩 +服务内置了防雪崩限流机制(默认每个 IP `100次 / 60秒`)。超出阈值的请求会立即被拒绝,并返回 HTTP `429 Too Many Requests` 以及 `Retry-After` Header。 +> **建议**:上游业务在收到 429 时,应遵守 `Retry-After` 指定的秒数进行休眠,避免持续轰炸。 + +### 6.2 Prometheus 指标监控 +服务直接暴露标准的 Prometheus Metrics 接口: +```bash +curl http://localhost:8000/api/v1/metrics +``` +**核心监控指标**: +- `spider_xhs_tasks_total{engine="api", status="succeeded"}`:按引擎和状态统计的任务总数。 +- `spider_xhs_queue_length`:当前正在排队中的积压任务数。 +- `spider_xhs_recent_failure_rate`:近 5 分钟内的实时失败率(可用于配置 Grafana 报警:如 > 20% 时触发风控预警)。 +- 代理池指标(若启用 Proxy Pool):`spider_xhs_proxy_pool_size`、`spider_xhs_proxy_pool_avg_score`、`spider_xhs_proxy_pool_ejected_total`、`spider_xhs_proxy_pool_failures_total{reason}`。 + +### 6.3 存储与数据清理 +所有任务状态、采集结果、失败回调记录以及日志,均持久化存储在宿主机挂载的 `./storage` 目录下。 +- **原子落盘 + 进程锁**:写入采用临时文件 + `os.replace` 原子替换;关键更新路径使用 `fcntl.flock` 做跨进程互斥,避免并发更新覆盖。 +- **自动清理**:服务后台线程会每天定期扫描 `./storage/raw` 目录,自动清理超过 `RAW_DATA_RETENTION_DAYS`(默认 7 天)的 HTML 页面快照,防止磁盘爆满。 + +### 6.4 常见错误排查 + +如果在获取结果时发现 `data.meta.ok = false`,请查看 `data.meta.error_kind`: + +- **`auth`**: 鉴权失败。可能是由于 Cookie 失效。**系统策略**:标记为失效并切换 Session Pool,若耗尽则报错。 +- **`rate`**: 遭遇频控。**系统策略**:为当前账号设置冷却窗(例如 15 分钟),换号、降频并重试。 +- **`risk` / `captcha`**: 触发滑动验证码。**系统策略**:主引擎报错,降级至浏览器引擎;若被验证码拦截,系统会唤醒 **Agentic Captcha Solver**(基于视觉大模型)自动拖动滑块/点选验证码;若 AI 依然失败,则置为 `waiting_rpa`,等待人工借助 Chrome 插件拉取。 +- **`parse`**: 页面解析失败。通常发生在小红书前端 DOM 结构发生重大改版时。**系统策略**:自动降级唤醒 **Engine D (Agentic Crawler)**,利用大模型的视觉和推理能力自动寻找并提取页面数据,无视 DOM 结构变化。 +- **`timeout` / `proxy_failed`**: 网络超时。**系统策略**:代理池立刻将此 IP 降分或剔除,换新代理并重新请求。 + +--- + +## 7. AI 自动化编排模块 (Orchestrator MVP) + +本项目在底层采集服务之上,提供了一套基于 Python 脚本的轻量级业务编排样例(位于 `orchestrator/` 目录),演示了借助大模型和 Agentic 技术实现的**获客全链路自动化**。 + +### 7.1 初始化数据库与测试数据 +编排模块依赖 SQLite (`orchestrator/data/mvp.db`) 作为主数据库,首次使用前需初始化: +```bash +# 生成 14 张核心业务表 +python orchestrator/db_init.py + +# 录入测试用的关键词与竞品账号 +python orchestrator/seed_data.py +``` + +### 7.2 核心业务流转脚本 +请确保后台 FastAPI 采集服务已启动,并且在环境变量中配置了 `AGENT_LLM_API_KEY`。可按顺序执行以下脚本体验闭环: + +1. **采集同步**:读取关键词并请求爬虫微服务,将 JSON 数据落库。 + ```bash + python orchestrator/crawl_sync.py + ``` +2. **数据清洗**:从原始快照中清洗提取标题、互动量等结构化字段。 + ```bash + python orchestrator/note_cleaner.py + ``` +3. **AI 图文生成**:利用大模型对爆款笔记进行分析、仿写与原创配图生成。 + ```bash + python orchestrator/ai_generation.py + ``` +4. **AI 自动发布**:唤醒基于 `browser-use` 的 AI 浏览器代理,自动操控小红书创作者中心进行真实发帖。 + ```bash + python orchestrator/publish_tracker.py + ``` +5. **AI 自动私信触达**:筛选出高意向互动用户,利用 AI Agent 自动打开私信窗口进行获客留资话术的回复。 + ```bash + python orchestrator/lead_service.py + ``` +6. **飞书同步与告警(Mock)**:将获取的线索同步至上游 CRM(如飞书多维表格)。 + ```bash + python orchestrator/feishu_sync.py + ``` diff --git a/apis/__init__.py b/apis/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/apis/xhs_creator_apis.py b/apis/xhs_creator_apis.py new file mode 100644 index 0000000000000000000000000000000000000000..a08a994bd6dfc95c5fbdd94fb7210ac73780e8e9 --- /dev/null +++ b/apis/xhs_creator_apis.py @@ -0,0 +1,428 @@ +import json +import time + +import cv2 +import numpy as np +from xhs_utils.cookie_util import trans_cookies +from xhs_utils.xhs_creator_util import get_upload_media_headers, get_post_note_headers, \ + get_loc_data, signature_js, get_fileIds_params, get_query_transcode_headers, \ + get_encryption_headers, sign_js, get_post_note_video_data, get_post_note_image_data, get_common_headers, \ + generate_xs, generate_xsc, get_search_location_headers +from xhs_utils.xhs_util import splice_str, generate_x_b3_traceid +from xhs_utils.http_client import HttpClient + + +class XHS_Creator_Apis(): + def __init__(self): + self.base_url = "https://creator.xiaohongshu.com" + self.upload_url = "https://ros-upload.xiaohongshu.com" + self.edith_url = "https://edith.xiaohongshu.com" + self.xhs_web_url = "https://www.xiaohongshu.com" + self.client = HttpClient() + + def get_topic(self, keyword, cookies): + try: + api = "/web_api/sns/v1/search/topic" + data = { + "keyword": keyword, + "suggest_topic_request": { + "title": "", + "desc": f"#{keyword}" + }, + "page": { + "page_size": 20, + "page": 1 + } + } + headers = get_common_headers() + xs, xt, data = generate_xs(cookies['a1'], api, data) + headers['x-s'], headers['x-t'] = xs, str(xt) + result = self.client.request_json("POST", self.edith_url + api, headers=headers, cookies=cookies, data=data.encode('utf-8')) + if not result.ok: + raise Exception(result.msg) + res_json = result.json + success, msg = res_json["success"], res_json["msg"] + except Exception as e: + res_json = None + success, msg = False, str(e) + return success, msg, res_json + + def get_location_info(self, keyword, cookies): + try: + data = get_loc_data(keyword) + api = "/web_api/sns/v1/local/poi/creator/search" + headers = get_search_location_headers() + h = generate_xsc(cookies['a1'], api, data) + headers.update(h) + if data: + data = json.dumps(data, separators=(',', ':'), ensure_ascii=False) + result = self.client.request_json("POST", self.edith_url + api, headers=headers, cookies=cookies, data=data.encode('utf-8')) + if not result.ok: + raise Exception(result.msg) + res_json = result.json + success, msg = res_json["success"], res_json["msg"] + except Exception as e: + res_json = None + success, msg = False, str(e) + return success, msg, res_json + + # media_type: image or video + def get_fileIds(self, media_type, cookies): + try: + api = "/api/media/v1/upload/creator/permit" + headers = { + "accept": "application/json, text/plain, */*", + "accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6", + "authorization;": "", + "cache-control": "no-cache", + "pragma": "no-cache", + "priority": "u=1, i", + "referer": "https://creator.xiaohongshu.com/publish/publish?source=official&from=menu&target=image", + "sec-ch-ua": "\"Not)A;Brand\";v=\"8\", \"Chromium\";v=\"138\", \"Microsoft Edge\";v=\"138\"", + "sec-ch-ua-mobile": "?0", + "sec-ch-ua-platform": "\"Windows\"", + "sec-fetch-dest": "empty", + "sec-fetch-mode": "cors", + "sec-fetch-site": "same-origin", + "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36 Edg/138.0.0.0", + "x-b3-traceid": "f4f93b86e05f2402", + "x-s": "XYS_2UQhPsHCH0c1PjhFHjIj2erjwjQM89PjNsQhPjHCHS4kJfz647PjNsQhPUHCHfM1qAZlPebKPbYxwrk9+LEt4p4OJbmLG98e4M4HLgihGDE9y9krzd8r+DEI4Bz+pepY4n+w//QfafzgwBH94A+G2BQxcAmG/ApG2gYyLrhE2rhl8ePlanWM//8Y+f+OLLH9/rzjpe4aabSayBYBL9kVz/YNPLiFGDkjJLSy2dps4n8GGnHF/fRs+M+bnDEtyA8Y+nq62dY8PFRH40zozFkwNAm+wBFMGjHVHdWFH0ijHdF=", + "x-t": str(int(time.time() * 1000)) + } + params = get_fileIds_params(media_type) + splice_api = splice_str(api, params) + + xs, xt, _ = generate_xs(cookies['a1'], splice_api) + headers['x-s'], headers['x-t'] = xs, str(xt) + result = self.client.request_json("GET", self.base_url + splice_api, headers=headers, cookies=cookies) + if not result.ok: + raise Exception(result.msg) + res_json = result.json + success, msg = res_json["success"], '获取fileIds成功' + except Exception as e: + return False, str(e), (None, None) + return success, msg, (res_json, xt) + + + def upload_media(self, path_or_file, media_type, cookies): + res = { + "fileIds": '', + "width": '', + "height": '', + "video_id": '' + } + try: + success, msg, (data, xt) = self.get_fileIds(media_type, cookies) + if not success: + raise Exception(msg) + data = data['data']['uploadTempPermits'][0] + fileIds, expireTime, token = data['fileIds'][0].split('/')[-1], data['expireTime'], data['token'] + res['fileIds'] = fileIds + xt, expireTime = str(xt)[:10], str(expireTime)[:10] + message = f"{xt};{expireTime}" + if media_type == "image": + width, height, file, file_size = self.get_file_info(path_or_file, media_type="image") + res['width'] = width + res['height'] = height + else: + file, file_size = self.get_file_info(path_or_file, media_type="video") + signature = signature_js.call('getSignature', message, fileIds, file_size) + headers = get_upload_media_headers(message, signature, token) + api = f"/spectrum/{fileIds}" + result = self.client.request_text("PUT", self.upload_url + api, headers=headers, cookies=cookies, data=file) + if not result.ok: + raise Exception(result.msg) + if media_type == "video" and result.headers: + res['video_id'] = result.headers.get('X-Ros-Video-Id', '') + except Exception as e: + return False, str(e), None + return True, "上传成功", res + + def query_transcode(self, video_id, cookies): + res_json = None + success, msg = False, '' + try: + api = "/fe_api/burdock/v2/note/query_transcode" + headers = get_query_transcode_headers() + data = { + "videoId": video_id + } + xs, xt, data = generate_xs(cookies['a1'], api, data) + headers['x-b3-traceid'] = generate_x_b3_traceid() + headers['x-s'], headers['x-t'] = xs, str(xt) + result = self.client.request_json("POST", self.xhs_web_url + api, headers=headers, cookies=cookies, data=data) + if not result.ok: + raise Exception(result.msg) + res_json = result.json + success = res_json["success"] + if 'msg' in res_json: + msg = res_json['msg'] + except Exception as e: + success, msg = False, str(e) + return success, msg, res_json + + def encryption(self, file_id, cookies): + res_json = None + success, msg = False, '' + try: + api = "/web_api/sns/v5/creator/file/encryption" + headers = get_encryption_headers() + params = { + "file_id": file_id, + "type": "image", + "ts": str(int(time.time() * 1000)), + "sign": "" + } + sign = sign_js.call('urlSing', file_id) + params['sign'] = sign + splice_api = splice_str(api, params) + xs, xt, _ = generate_xs(cookies['a1'], splice_api) + headers['x-b3-traceid'] = generate_x_b3_traceid() + headers['x-s'], headers['x-t'] = xs, str(xt) + result = self.client.request_json("GET", self.xhs_web_url + splice_api, headers=headers, cookies=cookies) + if not result.ok: + raise Exception(result.msg) + res_json = result.json + success, msg = res_json["success"], res_json["msg"] + except Exception as e: + success, msg = False, str(e) + return success, msg, res_json + + + def post_note(self, noteInfo, cookies_str): + post_api = "/web_api/sns/v2/note" + headers = get_post_note_headers() + cookies = trans_cookies(cookies_str) + title = noteInfo['title'] + desc = noteInfo['desc'] + postTime = noteInfo['postTime'] + location = noteInfo['location'] + type = noteInfo['type'] + media_type = noteInfo['media_type'] + + if location is not None: + success, msg, location_info = self.get_location_info(location, cookies) + if not success: + raise Exception(msg) + if len(location_info['data']['poi_list']) == 0: + raise Exception('未找到该地点') + location = location_info['data']['poi_list'][0] + post_loc = { + "name": location['name'], + "subname": location['full_address'], + "poi_id": location['poi_id'], + "poi_type": location['poi_type'], + } + else: + post_loc = {} + if media_type == 'video': + video = noteInfo['video'] + success, msg, fileInfo = self.upload_media(video, media_type, cookies) + if not success: + raise Exception(msg) + firstFrameFileId = '' + while True: + success, msg, res = self.query_transcode(fileInfo['video_id'], cookies) + if not success: + raise Exception(msg) + if res['data']['hasFirstFrame'] == True: + firstFrameFileId = res['data']['firstFrameFileId'] + break + time.sleep(3) + success, msg, res = self.encryption('/' + firstFrameFileId, cookies) + if not success: + raise Exception(msg) + data = get_post_note_video_data(title, desc, postTime, post_loc, type, fileInfo, firstFrameFileId) + else: + fileInfos = [] + images = noteInfo['images'] + for image in images: + success, msg, fileInfo = self.upload_media(image, media_type, cookies) + if not success: + raise Exception(msg) + fileInfos.append(fileInfo) + data = get_post_note_image_data(title, desc, postTime, post_loc, type, fileInfos) + topics = noteInfo['topics'] + for topic in topics: + success, msg, res_json = self.get_topic(topic, cookies) + if not success: + raise Exception(msg) + if len(res_json['data']['topic_info_dtos']) == 0: + raise Exception(f'未找到话题{topic}') + insert_topic = res_json['data']['topic_info_dtos'][0] + insert_topic = { + "id": insert_topic['id'], + "link": insert_topic['link'], + "name": insert_topic['name'], + "type": 'topic' + } + data['common']['hash_tag'].append(insert_topic) + data['common']['desc'] += f" #{insert_topic['name']}[话题]# " + + # headers['x-s'] = xs + # headers['x-t'] = str(int(time.time() * 1000)) + + xs, xt, _ = generate_xs(cookies['a1'], post_api, data) + headers['x-s'], headers['x-t'] = xs, str(xt) + + if data: + data = json.dumps(data, separators=(',', ':'), ensure_ascii=False) + + # xs, xt, data = generate_xs(cookies['a1'], post_api, data) + # headers['x-s'], headers['x-t'] = xs, str(xt) + result = self.client.request_json("POST", self.edith_url + post_api, headers=headers, cookies=cookies, data=data.encode('utf-8')) + if not result.ok: + raise Exception(result.msg) + res_json = result.json + success, msg = res_json["success"], res_json["msg"] + return success, msg, res_json + + def get_file_info(self, file, media_type="image"): + file_size = len(file) + if media_type == "image": + size = cv2.imdecode(np.frombuffer(file, np.uint8), cv2.IMREAD_COLOR).shape + w, h = size[1], size[0] + if w > 2 * h: + h = int(w / 2) + return w, h, file, file_size + else: + return file, file_size + + + # # page: 页数 + # # time: 最近几天的时间 + # def get_publish_note_info(self, page: int, time: int, cookies_str): + # success = False + # msg = '成功' + # res_json = None + # try: + # api = "/api/galaxy/creator/data/note_stats/new" + # headers = get_common_headers() + # cookies = trans_cookies(cookies_str) + # xs, xt, _ = generate_xs(cookies['a1'], '/api/galaxy/creator/data/note_stats/new', '') + # headers['x-s'], headers['x-t'] = xs, str(xt) + # headers['x-b3-traceid'] = generate_x_b3_traceid() + # params = { + # "page": str(page), + # "page_size": "12", + # "sort_by": "time", + # "note_type": "0", + # "time": str(time), + # "is_recent": "false" + # } + # response = requests.get(self.base_url + api, headers=headers, cookies=cookies, params=params) + # res_json = response.json() + # success = res_json["success"] + # except Exception as e: + # success, msg = False, str(e) + # return success, msg, res_json + # + # + # # 获取全部的发布信息 + # def get_all_publish_note_info(self, cookies_str): + # page = 1 + # time = 7 + # success, msg, res_json = self.get_publish_note_info(page, time, cookies_str) + # if not success: + # return False, msg, None + # notes = res_json['data']['note_infos'] + # total = res_json['data']['total'] + # while len(notes) < total: + # page += 1 + # success, msg, res_json = self.get_publish_note_info(page, time, cookies_str) + # if not success: + # return False, msg, None + # notes += res_json['data']['note_infos'] + # return True, '成功', notes + + # page: 页数 + # time: 最近几天的时间 + def get_publish_note_info(self, page, cookies_str): + success = False + msg = '成功' + res_json = None + try: + api = "/api/galaxy/creator/note/user/posted" + headers = get_common_headers() + cookies = trans_cookies(cookies_str) + xs, xt, _ = generate_xs(cookies['a1'], api, '') + headers['x-s'], headers['x-t'] = xs, str(xt) + headers['x-b3-traceid'] = generate_x_b3_traceid() + params = { + "tab": '0', + } + if page: + params["page"] = str(page) + result = self.client.request_json("GET", self.base_url + api, headers=headers, cookies=cookies, params=params) + if not result.ok: + raise Exception(result.msg) + res_json = result.json + success = res_json["success"] + except Exception as e: + success, msg = False, str(e) + return success, msg, res_json + + + # 获取全部的发布信息 + def get_all_publish_note_info(self, cookies_str): + page = None + notes = [] + while True: + success, msg, res_json = self.get_publish_note_info(page, cookies_str) + print(success, msg, res_json) + if not success: + return False, msg, notes + notes += res_json['data']['notes'] + page = res_json['data']['page'] + if page == -1: + break + return True, '成功', notes + + +if __name__ == '__main__': + xhs_creator_apis = XHS_Creator_Apis() + # 创作者平台 https://creator.xiaohongshu.com/login 的cookie + cookies_str = r'' + noteInfos = [ + { + # 标题 + "title": "21121121212", + # 描述 + "desc": "dwadaw最后一把直接神之一手直接立直后第一轮就胡牌了,最近吃点好的,哈哈", + # 13位时间戳 数字类型 + "postTime": None, + # 设置地点 "河海大学" + "location": '南京', + # 0:公开 1:私密 + "type": 1, + "media_type": "image", + # 设置话题 + # "topics": ["雀魂", "麻将"], + "topics": [], + # 图片路径 最多15张 + "images": [ + open(r"D:\Desktop\签名\QQ图片20240903150607.jpg", 'rb').read(), + ], + }, + { + "title": "test2", + "desc": "dwadawd20240815", + "postTime": None, + "location": '河海大学', + "topics": ["北京"], + # "topics": [], + "type": 1, + "media_type": "video", + "video": open(r"D:\data\Videos\2024-05-02 21-14-45.mkv", 'rb').read(), + } + ] + for noteInfo in noteInfos: + success, msg, info = xhs_creator_apis.post_note(noteInfo, cookies_str) + print(success, msg, info) + print('========') + + # topics = ["雀魂", "麻将"] + # cookies = trans_cookies(cookies_str) + # for topic in topics: + # success, msg, res_json = xhs_creator_apis.get_topic(topic, cookies) + # print(success, msg, res_json) diff --git a/apis/xhs_creator_login_apis.py b/apis/xhs_creator_login_apis.py new file mode 100644 index 0000000000000000000000000000000000000000..89071fc6347962e462fb61e4aa6dff9322f8d335 --- /dev/null +++ b/apis/xhs_creator_login_apis.py @@ -0,0 +1,317 @@ +import json +from threading import Thread + +import aiohttp +import asyncio + +import qrcode + +from apis.xhs_creator_apis import XHS_Creator_Apis +from xhs_utils.xhs_creator_util import generate_xs, splice_str, get_common_headers +from playwright.async_api import async_playwright + + +class XHSLoginApi: + def __init__(self): + self.base_url = "https://customer.xiaohongshu.com" + self.home_url = 'https://creator.xiaohongshu.com' + + # 生成初始cookies + async def creatorCheckInitCookies(self, page): + while True: + cookies = dict() + page_cookies = await page.context.cookies() + for cookie in page_cookies: + cookies[cookie['name']] = cookie['value'] + if "a1" in cookies and "xsecappid" in cookies and "webId" in cookies and "acw_tc" in cookies and "gid" in cookies and "websectiga" in cookies and "sec_poison_id" in cookies: + break + await asyncio.sleep(1) + return cookies + + async def creatorGenerateInitCookies(self, headless=True): + async with async_playwright() as p: + browser = await p.chromium.launch( + headless=headless, + args=[ + '--disable-blink-features=AutomationControlled', + ], + ) + page = await browser.new_page() + await page.goto(self.home_url + '/login') + cookies = await self.creatorCheckInitCookies(page) + await browser.close() + return cookies + + # 手机验证码登录 + async def creatorGeneratePhoneCode(self, phone, cookies): + res_json = None + try: + api = "/api/cas/customer/web/verify-code" + data = { + "service": "https://creator.xiaohongshu.com", + "phone": phone, + "zone": "86" + } + headers = get_common_headers() + xs, xt, data = generate_xs(cookies['a1'], api, data) + headers['x-s'], headers['x-t'] = xs, str(xt) + async with aiohttp.ClientSession() as session: + async with session.post(self.base_url + api, headers=headers, cookies=cookies, data=data) as response: + res_json = await response.json() + success, msg = res_json['success'], res_json['msg'] + except Exception as e: + return False, str(e), res_json + return success, msg, res_json + + async def creatorLoginByPhone(self, phone, code, cookies): + res_json = None + try: + api = "/api/cas/customer/web/service-ticket" + data = { + "service": "https://creator.xiaohongshu.com", + "zone": "86", + "phone": phone, + "verify_code": code, + "source": "", + "type": 'phoneVerifyCode' + } + headers = get_common_headers() + xs, xt, data = generate_xs(cookies['a1'], api, data) + headers['x-s'], headers['x-t'] = xs, str(xt) + async with aiohttp.ClientSession() as session: + async with session.post(self.base_url + api, headers=headers, cookies=cookies, data=data) as response: + res_json = await response.json() + success, msg = res_json['success'], res_json['msg'] + add_cookies = dict() + return_cookies = response.cookies + for item in return_cookies.keys(): + add_cookies[return_cookies[item].key] = return_cookies[item].value + cookies.update(add_cookies) + except Exception as e: + return False, str(e), res_json + return success, msg, { + "cookies": cookies, + "res_json": res_json + } + + # 二维码扫描登录 + async def creatorGenerateQRcode(self, cookies): + try: + api = '/api/cas/customer/web/qr-code' + data = { + "service": "https://creator.xiaohongshu.com" + } + headers = get_common_headers() + xs, xt, data = generate_xs(cookies['a1'], api, data) + headers['x-s'], headers['x-t'] = xs, str(xt) + async with aiohttp.ClientSession() as session: + async with session.post(self.base_url + api, headers=headers, cookies=cookies, data=data) as response: + res = await response.json() + qr_id, verify_url = res['data']['id'], res["data"]["url"] + success, msg = res['success'], res['msg'] + except Exception as e: + return False, str(e), { + "cookies": cookies, + "qr_id": None, + "verify_url": None + } + return success, msg, { + "cookies": cookies, + "qr_id": qr_id, + "verify_url": verify_url + } + + async def creatorCheckQRCodeLogin(self, qr_id, cookies): + params = { + "service": "https://creator.xiaohongshu.com", + "qr_code_id": qr_id, + "source": "" + } + ticket = None + try: + api = f"/api/cas/customer/web/qr-code" + splice_api = splice_str(api, params) + headers = get_common_headers() + xs, xt, _ = generate_xs(cookies['a1'], api) + headers['x-s'], headers['x-t'] = xs, str(xt) + async with aiohttp.ClientSession() as session: + async with session.get(self.base_url + splice_api, headers=headers, cookies=cookies) as response: + res = await response.json() + success, msg = res['success'], res['msg'] + code_status = res['data']['status'] + if code_status == 1: + add_cookies = dict() + return_cookies = response.cookies + for item in return_cookies.keys(): + add_cookies[return_cookies[item].key] = return_cookies[item].value + cookies.update(add_cookies) + ticket = res['data'].get('ticket', None) + msg = "验证成功" + elif code_status == 2: + msg = "请扫描二维码" + elif code_status == 3: + msg = "请确认登录" + elif code_status == -1: + msg = "验证码过期" + raise Exception(msg) + else: + msg = "未知错误" + raise Exception(msg) + except Exception as e: + success, msg = False, str(e) + return success, msg, { + 'cookies': cookies, + 'ticket': ticket + } + + async def creatorLoginStep1(self, ticket, cookies): + api = "/sso/customer_login" + data = { + "ticket": ticket, + "login_service": "https://creator.xiaohongshu.com", + "subsystem_alias": "creator", + "set_global_domain": True + } + msg = '成功' + try: + headers = get_common_headers() + xs, xt, data = generate_xs(cookies['a1'], api, data) + headers['x-s'], headers['x-t'] = xs, str(xt) + async with aiohttp.ClientSession() as session: + async with session.post(self.home_url + api, headers=headers, cookies=cookies, data=data) as response: + res = await response.json() + success = res['success'] + add_cookies = dict() + return_cookies = response.cookies + for item in return_cookies.keys(): + add_cookies[return_cookies[item].key] = return_cookies[item].value + cookies.update(add_cookies) + except Exception as e: + success, msg = False, str(e) + return success, msg, { + 'cookies': cookies, + "userInfo": res + } + + async def creatorLoginStep2(self, cookies): + api = "/api/galaxy/user/cas/login" + msg = '成功' + try: + headers = get_common_headers() + xs, xt, _ = generate_xs(cookies['a1'], api) + headers['x-s'], headers['x-t'] = xs, str(xt) + async with aiohttp.ClientSession() as session: + async with session.post(self.home_url + api, headers=headers, cookies=cookies) as response: + res = await response.json() + success = res['success'] + add_cookies = dict() + return_cookies = response.cookies + for item in return_cookies.keys(): + add_cookies[return_cookies[item].key] = return_cookies[item].value + cookies.update(add_cookies) + except Exception as e: + success, msg = False, str(e) + return success, msg, cookies + + def transfer_cookies(self, cookies): + cookies_str = "" + for key, value in cookies.items(): + cookies_str += f"{key}={value}; " + cookies_str = cookies_str[:-2] + return cookies_str + + def generateQrcode(self, verify_url): + qr = qrcode.QRCode( + version=1, + error_correction=qrcode.constants.ERROR_CORRECT_L, + box_size=10, + border=4, + ) + qr.add_data(verify_url) + qr.make(fit=True) + img = qr.make_image(fill_color="black", back_color="white") + img.show() + + async def qrcodeMain(self): + cookies = await self.creatorGenerateInitCookies() + print('获取初始cookies') + success, msg, qrcode_dict = await self.creatorGenerateQRcode(cookies) + print('获取二维码', success, msg) + print(qrcode_dict) + qrcode_thread = Thread(target=self.generateQrcode, args=(qrcode_dict['verify_url'],)) + qrcode_thread.start() + while True: + success, msg, res = await self.creatorCheckQRCodeLogin(qrcode_dict['qr_id'], qrcode_dict['cookies']) + print('检查二维码登录', success, msg) + print(res) + if msg == "验证成功": + cookies = res['cookies'] + ticket = res['ticket'] + break + await asyncio.sleep(10) + + if ticket is None: + print('登录成功') + else: + print('需要ticket继续认证') + success, msg, res = await self.creatorLoginStep1(ticket, cookies) + print('ticket认证第一步', success, msg) + print(res) + cookies = res['cookies'] + userInfo = res['userInfo'] + success, msg, cookies = await self.creatorLoginStep2(cookies) + print('ticket认证第二步', success, msg) + print(cookies) + print('登录成功') + cookies_str = self.transfer_cookies(cookies) + print(f'cookies_str: {cookies_str}') + + async def phoneMain(self): + cookies = await self.creatorGenerateInitCookies() + print('获取初始cookies') + phone_num = "" + phone_num = "" + success, msg, res_json = await self.creatorGeneratePhoneCode(phone_num, cookies) + print('获取手机验证码', success, msg, res_json) + code = input("请输入验证码:") + success, msg, res_json = await self.creatorLoginByPhone(phone_num, code, cookies) + print('手机验证码登录', success, msg, res_json) + cookies = res_json['cookies'] + cookies_str = self.transfer_cookies(cookies) + print(f'cookies_str: {cookies_str}') + self.test(cookies_str) + + def test(self, cookies_str): + xhs_creator_apis = XHS_Creator_Apis() + noteInfos = [ + { + # 标题 + "title": "我是笨蛋", + # 描述 + "desc": "我", + # 13位时间戳 数字类型 + "postTime": None, + # 设置地点 "河海大学" + "location": None, + # 0:公开 1:私密 + "type": 1, + "topics": ["测试"], + "media_type": "image", + # 图片路径 最多15张 + "images": [ + open(r"D:\Desktop\Data\images\temp\22.jpg", 'rb').read(), + open(r"D:\Desktop\Data\images\temp\22.jpg", 'rb').read(), + ], + }, + ] + for noteInfo in noteInfos: + success, msg, info = xhs_creator_apis.post_note(noteInfo, cookies_str) + print(success, msg, info) + print('========') + + +if __name__ == '__main__': + login_util = XHSLoginApi() + loop = asyncio.get_event_loop() + # loop.run_until_complete(login_util.qrcodeMain()) + loop.run_until_complete(login_util.phoneMain()) diff --git a/apis/xhs_pc_apis.py b/apis/xhs_pc_apis.py new file mode 100644 index 0000000000000000000000000000000000000000..0a6b3c3bc06f268c4cfcd8a015573e930f4195c0 --- /dev/null +++ b/apis/xhs_pc_apis.py @@ -0,0 +1,1050 @@ +# encoding: utf-8 +import json +import re +import urllib +from xhs_utils.xhs_util import splice_str, generate_request_params, generate_x_b3_traceid, get_common_headers +from loguru import logger +from xhs_utils.http_client import HttpClient + +""" + 获小红书的api + :param cookies_str: 你的cookies +""" +class XHS_Apis(): + def __init__(self): + self.base_url = "https://edith.xiaohongshu.com" + self.client = HttpClient() + + def get_homefeed_all_channel(self, cookies_str: str, proxies: dict = None): + """ + 获取主页的所有频道 + 返回主页的所有频道 + """ + res_json = None + try: + api = "/api/sns/web/v1/homefeed/category" + headers, cookies, data = generate_request_params(cookies_str, api, '', 'GET') + result = self.client.request_json("GET", self.base_url + api, headers=headers, cookies=cookies, proxies=proxies) + res_json = result.json + if not result.ok: + raise Exception(result.msg) + success, msg = res_json["success"], res_json["msg"] + except Exception as e: + success = False + msg = str(e) + return success, msg, res_json + + def get_homefeed_recommend(self, category, cursor_score, refresh_type, note_index, cookies_str: str, proxies: dict = None): + """ + 获取主页推荐的笔记 + :param category: 你想要获取的频道 + :param cursor_score: 你想要获取的笔记的cursor + :param refresh_type: 你想要获取的笔记的刷新类型 + :param note_index: 你想要获取的笔记的index + :param cookies_str: 你的cookies + 返回主页推荐的笔记 + """ + res_json = None + try: + api = f"/api/sns/web/v1/homefeed" + data = { + "cursor_score": cursor_score, + "num": 20, + "refresh_type": refresh_type, + "note_index": note_index, + "unread_begin_note_id": "", + "unread_end_note_id": "", + "unread_note_count": 0, + "category": category, + "search_key": "", + "need_num": 10, + "image_formats": [ + "jpg", + "webp", + "avif" + ], + "need_filter_image": False + } + headers, cookies, trans_data = generate_request_params(cookies_str, api, data, 'POST') + result = self.client.request_json("POST", self.base_url + api, headers=headers, cookies=cookies, data=trans_data, proxies=proxies) + res_json = result.json + if not result.ok: + raise Exception(result.msg) + success, msg = res_json["success"], res_json["msg"] + except Exception as e: + success = False + msg = str(e) + return success, msg, res_json + + def get_homefeed_recommend_by_num(self, category, require_num, cookies_str: str, proxies: dict = None): + """ + 根据数量获取主页推荐的笔记 + :param category: 你想要获取的频道 + :param require_num: 你想要获取的笔记的数量 + :param cookies_str: 你的cookies + 根据数量返回主页推荐的笔记 + """ + cursor_score, refresh_type, note_index = "", 1, 0 + note_list = [] + try: + while True: + success, msg, res_json = self.get_homefeed_recommend(category, cursor_score, refresh_type, note_index, cookies_str, proxies) + if not success: + raise Exception(msg) + if "items" not in res_json["data"]: + break + notes = res_json["data"]["items"] + note_list.extend(notes) + cursor_score = res_json["data"]["cursor_score"] + refresh_type = 3 + note_index += 20 + if len(note_list) > require_num: + break + except Exception as e: + success = False + msg = str(e) + if len(note_list) > require_num: + note_list = note_list[:require_num] + return success, msg, note_list + + def get_user_info(self, user_id: str, cookies_str: str, proxies: dict = None): + """ + 获取用户的信息 + :param user_id: 你想要获取的用户的id + :param cookies_str: 你的cookies + 返回用户的信息 + """ + res_json = None + try: + api = f"/api/sns/web/v1/user/otherinfo" + params = { + "target_user_id": user_id + } + splice_api = splice_str(api, params) + headers, cookies, data = generate_request_params(cookies_str, splice_api, '', 'GET') + result = self.client.request_json("GET", self.base_url + splice_api, headers=headers, cookies=cookies, proxies=proxies) + res_json = result.json + if not result.ok: + raise Exception(result.msg) + success, msg = res_json["success"], res_json["msg"] + except Exception as e: + success = False + msg = str(e) + return success, msg, res_json + + def get_user_self_info(self, cookies_str: str, proxies: dict = None): + """ + 获取用户自己的信息1 + :param cookies_str: 你的cookies + 返回用户自己的信息1 + """ + res_json = None + try: + api = f"/api/sns/web/v1/user/selfinfo" + headers, cookies, data = generate_request_params(cookies_str, api, '', 'GET') + result = self.client.request_json("GET", self.base_url + api, headers=headers, cookies=cookies, proxies=proxies) + res_json = result.json + if not result.ok: + raise Exception(result.msg) + success, msg = res_json["success"], res_json["msg"] + except Exception as e: + success = False + msg = str(e) + return success, msg, res_json + + + def get_user_self_info2(self, cookies_str: str, proxies: dict = None): + """ + 获取用户自己的信息2 + :param cookies_str: 你的cookies + 返回用户自己的信息2 + """ + res_json = None + try: + api = f"/api/sns/web/v2/user/me" + headers, cookies, data = generate_request_params(cookies_str, api, '', 'GET') + result = self.client.request_json("GET", self.base_url + api, headers=headers, cookies=cookies, proxies=proxies) + res_json = result.json + if not result.ok: + raise Exception(result.msg) + success, msg = res_json["success"], res_json["msg"] + except Exception as e: + success = False + msg = str(e) + return success, msg, res_json + + def get_user_note_info(self, user_id: str, cursor: str, cookies_str: str, xsec_token='', xsec_source='', proxies: dict = None): + """ + 获取用户指定位置的笔记 + :param user_id: 你想要获取的用户的id + :param cursor: 你想要获取的笔记的cursor + :param cookies_str: 你的cookies + 返回用户指定位置的笔记 + """ + res_json = None + try: + api = f"/api/sns/web/v1/user_posted" + params = { + "num": "30", + "cursor": cursor, + "user_id": user_id, + "image_formats": "jpg,webp,avif", + "xsec_token": xsec_token, + "xsec_source": xsec_source, + } + splice_api = splice_str(api, params) + headers, cookies, data = generate_request_params(cookies_str, splice_api, '', 'GET') + result = self.client.request_json("GET", self.base_url + splice_api, headers=headers, cookies=cookies, proxies=proxies) + res_json = result.json + if not result.ok: + raise Exception(result.msg) + success, msg = res_json["success"], res_json["msg"] + except Exception as e: + success = False + msg = str(e) + return success, msg, res_json + + + def get_user_all_notes(self, user_url: str, cookies_str: str, proxies: dict = None): + """ + 获取用户所有笔记 + :param user_id: 你想要获取的用户的id + :param cookies_str: 你的cookies + 返回用户的所有笔记 + """ + cursor = '' + note_list = [] + try: + urlParse = urllib.parse.urlparse(user_url) + user_id = urlParse.path.split("/")[-1] + kvs = urlParse.query.split('&') + kvDist = {kv.split('=')[0]: kv.split('=')[1] for kv in kvs} + xsec_token = kvDist['xsec_token'] if 'xsec_token' in kvDist else "" + xsec_source = kvDist['xsec_source'] if 'xsec_source' in kvDist else "pc_search" + while True: + success, msg, res_json = self.get_user_note_info(user_id, cursor, cookies_str, xsec_token, xsec_source, proxies) + if not success: + raise Exception(msg) + notes = res_json["data"]["notes"] + if 'cursor' in res_json["data"]: + cursor = str(res_json["data"]["cursor"]) + else: + break + note_list.extend(notes) + if len(notes) == 0 or not res_json["data"]["has_more"]: + break + except Exception as e: + success = False + msg = str(e) + return success, msg, note_list + + def get_user_like_note_info(self, user_id: str, cursor: str, cookies_str: str, xsec_token='', xsec_source='', proxies: dict = None): + """ + 获取用户指定位置喜欢的笔记 + :param user_id: 你想要获取的用户的id + :param cursor: 你想要获取的笔记的cursor + :param cookies_str: 你的cookies + 返回用户指定位置喜欢的笔记 + """ + res_json = None + try: + api = f"/api/sns/web/v1/note/like/page" + params = { + "num": "30", + "cursor": cursor, + "user_id": user_id, + "image_formats": "jpg,webp,avif", + "xsec_token": xsec_token, + "xsec_source": xsec_source, + } + splice_api = splice_str(api, params) + headers, cookies, data = generate_request_params(cookies_str, splice_api, '', 'GET') + result = self.client.request_json("GET", self.base_url + splice_api, headers=headers, cookies=cookies, proxies=proxies) + res_json = result.json + if not result.ok: + raise Exception(result.msg) + success, msg = res_json["success"], res_json["msg"] + except Exception as e: + success = False + msg = str(e) + return success, msg, res_json + + def get_user_all_like_note_info(self, user_url: str, cookies_str: str, proxies: dict = None): + """ + 获取用户所有喜欢笔记 + :param user_id: 你想要获取的用户的id + :param cookies_str: 你的cookies + 返回用户的所有喜欢笔记 + """ + cursor = '' + note_list = [] + try: + urlParse = urllib.parse.urlparse(user_url) + user_id = urlParse.path.split("/")[-1] + kvs = urlParse.query.split('&') + kvDist = {kv.split('=')[0]: kv.split('=')[1] for kv in kvs} + xsec_token = kvDist['xsec_token'] if 'xsec_token' in kvDist else "" + xsec_source = kvDist['xsec_source'] if 'xsec_source' in kvDist else "pc_user" + while True: + success, msg, res_json = self.get_user_like_note_info(user_id, cursor, cookies_str, xsec_token, + xsec_source, proxies) + if not success: + raise Exception(msg) + notes = res_json["data"]["notes"] + if 'cursor' in res_json["data"]: + cursor = str(res_json["data"]["cursor"]) + else: + break + note_list.extend(notes) + if len(notes) == 0 or not res_json["data"]["has_more"]: + break + except Exception as e: + success = False + msg = str(e) + return success, msg, note_list + + def get_user_collect_note_info(self, user_id: str, cursor: str, cookies_str: str, xsec_token='', xsec_source='', proxies: dict = None): + """ + 获取用户指定位置收藏的笔记 + :param user_id: 你想要获取的用户的id + :param cursor: 你想要获取的笔记的cursor + :param cookies_str: 你的cookies + 返回用户指定位置收藏的笔记 + """ + res_json = None + try: + api = f"/api/sns/web/v2/note/collect/page" + params = { + "num": "30", + "cursor": cursor, + "user_id": user_id, + "image_formats": "jpg,webp,avif", + "xsec_token": xsec_token, + "xsec_source": xsec_source, + } + splice_api = splice_str(api, params) + headers, cookies, data = generate_request_params(cookies_str, splice_api, '', 'GET') + result = self.client.request_json("GET", self.base_url + splice_api, headers=headers, cookies=cookies, proxies=proxies) + res_json = result.json + if not result.ok: + raise Exception(result.msg) + success, msg = res_json["success"], res_json["msg"] + except Exception as e: + success = False + msg = str(e) + return success, msg, res_json + + def get_user_all_collect_note_info(self, user_url: str, cookies_str: str, proxies: dict = None): + """ + 获取用户所有收藏笔记 + :param user_id: 你想要获取的用户的id + :param cookies_str: 你的cookies + 返回用户的所有收藏笔记 + """ + cursor = '' + note_list = [] + try: + urlParse = urllib.parse.urlparse(user_url) + user_id = urlParse.path.split("/")[-1] + kvs = urlParse.query.split('&') + kvDist = {kv.split('=')[0]: kv.split('=')[1] for kv in kvs} + xsec_token = kvDist['xsec_token'] if 'xsec_token' in kvDist else "" + xsec_source = kvDist['xsec_source'] if 'xsec_source' in kvDist else "pc_search" + while True: + success, msg, res_json = self.get_user_collect_note_info(user_id, cursor, cookies_str, xsec_token, + xsec_source, proxies) + if not success: + raise Exception(msg) + notes = res_json["data"]["notes"] + if 'cursor' in res_json["data"]: + cursor = str(res_json["data"]["cursor"]) + else: + break + note_list.extend(notes) + if len(notes) == 0 or not res_json["data"]["has_more"]: + break + except Exception as e: + success = False + msg = str(e) + return success, msg, note_list + + def get_note_info(self, url: str, cookies_str: str, proxies: dict = None): + """ + 获取笔记的详细 + :param url: 你想要获取的笔记的url + :param cookies_str: 你的cookies + :param xsec_source: 你的xsec_source 默认为pc_search pc_user pc_feed + 返回笔记的详细 + """ + res_json = None + try: + urlParse = urllib.parse.urlparse(url) + note_id = urlParse.path.split("/")[-1] + kvs = urlParse.query.split('&') + kvDist = {kv.split('=')[0]: kv.split('=')[1] for kv in kvs} + api = f"/api/sns/web/v1/feed" + data = { + "source_note_id": note_id, + "image_formats": [ + "jpg", + "webp", + "avif" + ], + "extra": { + "need_body_topic": "1" + }, + "xsec_source": kvDist['xsec_source'] if 'xsec_source' in kvDist else "pc_search", + "xsec_token": kvDist['xsec_token'] + } + headers, cookies, data = generate_request_params(cookies_str, api, data, 'POST') + result = self.client.request_json("POST", self.base_url + api, headers=headers, cookies=cookies, data=data, proxies=proxies) + res_json = result.json + if not result.ok: + raise Exception(result.msg) + success, msg = res_json["success"], res_json["msg"] + except Exception as e: + success = False + msg = str(e) + return success, msg, res_json + + + def get_search_keyword(self, word: str, cookies_str: str, proxies: dict = None): + """ + 获取搜索关键词 + :param word: 你的关键词 + :param cookies_str: 你的cookies + 返回搜索关键词 + """ + res_json = None + try: + api = "/api/sns/web/v1/search/recommend" + params = { + "keyword": urllib.parse.quote(word) + } + splice_api = splice_str(api, params) + headers, cookies, data = generate_request_params(cookies_str, splice_api, '', 'GET') + result = self.client.request_json("GET", self.base_url + splice_api, headers=headers, cookies=cookies, proxies=proxies) + res_json = result.json + if not result.ok: + raise Exception(result.msg) + success, msg = res_json["success"], res_json["msg"] + except Exception as e: + success = False + msg = str(e) + return success, msg, res_json + + def search_note(self, query: str, cookies_str: str, page=1, sort_type_choice=0, note_type=0, note_time=0, note_range=0, pos_distance=0, geo="", proxies: dict = None): + """ + 获取搜索笔记的结果 + :param query 搜索的关键词 + :param cookies_str 你的cookies + :param page 搜索的页数 + :param sort_type_choice 排序方式 0 综合排序, 1 最新, 2 最多点赞, 3 最多评论, 4 最多收藏 + :param note_type 笔记类型 0 不限, 1 视频笔记, 2 普通笔记 + :param note_time 笔记时间 0 不限, 1 一天内, 2 一周内天, 3 半年内 + :param note_range 笔记范围 0 不限, 1 已看过, 2 未看过, 3 已关注 + :param pos_distance 位置距离 0 不限, 1 同城, 2 附近 指定这个必须要指定 geo + 返回搜索的结果 + """ + res_json = None + sort_type = "general" + if sort_type_choice == 1: + sort_type = "time_descending" + elif sort_type_choice == 2: + sort_type = "popularity_descending" + elif sort_type_choice == 3: + sort_type = "comment_descending" + elif sort_type_choice == 4: + sort_type = "collect_descending" + filter_note_type = "不限" + if note_type == 1: + filter_note_type = "视频笔记" + elif note_type == 2: + filter_note_type = "普通笔记" + filter_note_time = "不限" + if note_time == 1: + filter_note_time = "一天内" + elif note_time == 2: + filter_note_time = "一周内" + elif note_time == 3: + filter_note_time = "半年内" + filter_note_range = "不限" + if note_range == 1: + filter_note_range = "已看过" + elif note_range == 2: + filter_note_range = "未看过" + elif note_range == 3: + filter_note_range = "已关注" + filter_pos_distance = "不限" + if pos_distance == 1: + filter_pos_distance = "同城" + elif pos_distance == 2: + filter_pos_distance = "附近" + if geo: + geo = json.dumps(geo, separators=(',', ':')) + try: + api = "/api/sns/web/v1/search/notes" + data = { + "keyword": query, + "page": page, + "page_size": 20, + "search_id": generate_x_b3_traceid(21), + "sort": "general", + "note_type": 0, + "ext_flags": [], + "filters": [ + { + "tags": [ + sort_type + ], + "type": "sort_type" + }, + { + "tags": [ + filter_note_type + ], + "type": "filter_note_type" + }, + { + "tags": [ + filter_note_time + ], + "type": "filter_note_time" + }, + { + "tags": [ + filter_note_range + ], + "type": "filter_note_range" + }, + { + "tags": [ + filter_pos_distance + ], + "type": "filter_pos_distance" + } + ], + "geo": geo, + "image_formats": [ + "jpg", + "webp", + "avif" + ] + } + headers, cookies, data = generate_request_params(cookies_str, api, data, 'POST') + result = self.client.request_json("POST", self.base_url + api, headers=headers, cookies=cookies, data=data, proxies=proxies) + res_json = result.json + if not result.ok: + raise Exception(result.msg) + success, msg = res_json["success"], res_json["msg"] + except Exception as e: + success = False + msg = str(e) + return success, msg, res_json + + def search_some_note(self, query: str, require_num: int, cookies_str: str, sort_type_choice=0, note_type=0, note_time=0, note_range=0, pos_distance=0, geo="", proxies: dict = None): + """ + 指定数量搜索笔记,设置排序方式和笔记类型和笔记数量 + :param query 搜索的关键词 + :param require_num 搜索的数量 + :param cookies_str 你的cookies + :param sort_type_choice 排序方式 0 综合排序, 1 最新, 2 最多点赞, 3 最多评论, 4 最多收藏 + :param note_type 笔记类型 0 不限, 1 视频笔记, 2 普通笔记 + :param note_time 笔记时间 0 不限, 1 一天内, 2 一周内天, 3 半年内 + :param note_range 笔记范围 0 不限, 1 已看过, 2 未看过, 3 已关注 + :param pos_distance 位置距离 0 不限, 1 同城, 2 附近 指定这个必须要指定 geo + :param geo: 定位信息 经纬度 + 返回搜索的结果 + """ + page = 1 + note_list = [] + try: + while True: + success, msg, res_json = self.search_note(query, cookies_str, page, sort_type_choice, note_type, note_time, note_range, pos_distance, geo, proxies) + if not success: + raise Exception(msg) + if "items" not in res_json["data"]: + break + notes = res_json["data"]["items"] + note_list.extend(notes) + page += 1 + if len(note_list) >= require_num or not res_json["data"]["has_more"]: + break + except Exception as e: + success = False + msg = str(e) + if len(note_list) > require_num: + note_list = note_list[:require_num] + return success, msg, note_list + + def search_user(self, query: str, cookies_str: str, page=1, proxies: dict = None): + """ + 获取搜索用户的结果 + :param query 搜索的关键词 + :param cookies_str 你的cookies + :param page 搜索的页数 + 返回搜索的结果 + """ + res_json = None + try: + api = "/api/sns/web/v1/search/usersearch" + data = { + "search_user_request": { + "keyword": query, + "search_id": "2dn9they1jbjxwawlo4xd", + "page": page, + "page_size": 15, + "biz_type": "web_search_user", + "request_id": "22471139-1723999898524" + } + } + headers, cookies, data = generate_request_params(cookies_str, api, data, 'POST') + result = self.client.request_json("POST", self.base_url + api, headers=headers, cookies=cookies, data=data, proxies=proxies) + res_json = result.json + if not result.ok: + raise Exception(result.msg) + success, msg = res_json["success"], res_json["msg"] + except Exception as e: + success = False + msg = str(e) + return success, msg, res_json + + def search_some_user(self, query: str, require_num: int, cookies_str: str, proxies: dict = None): + """ + 指定数量搜索用户 + :param query 搜索的关键词 + :param require_num 搜索的数量 + :param cookies_str 你的cookies + 返回搜索的结果 + """ + page = 1 + user_list = [] + try: + while True: + success, msg, res_json = self.search_user(query, cookies_str, page, proxies) + if not success: + raise Exception(msg) + if "users" not in res_json["data"]: + break + users = res_json["data"]["users"] + user_list.extend(users) + page += 1 + if len(user_list) >= require_num or not res_json["data"]["has_more"]: + break + except Exception as e: + success = False + msg = str(e) + if len(user_list) > require_num: + user_list = user_list[:require_num] + return success, msg, user_list + + def get_note_out_comment(self, note_id: str, cursor: str, xsec_token: str, cookies_str: str, proxies: dict = None): + """ + 获取指定位置的笔记一级评论 + :param note_id 笔记的id + :param cursor 指定位置的评论的cursor + :param cookies_str 你的cookies + 返回指定位置的笔记一级评论 + """ + res_json = None + try: + api = "/api/sns/web/v2/comment/page" + params = { + "note_id": note_id, + "cursor": cursor, + "top_comment_id": "", + "image_formats": "jpg,webp,avif", + "xsec_token": xsec_token + } + splice_api = splice_str(api, params) + headers, cookies, data = generate_request_params(cookies_str, splice_api, '', 'GET') + result = self.client.request_json("GET", self.base_url + splice_api, headers=headers, cookies=cookies, proxies=proxies) + res_json = result.json + if not result.ok: + raise Exception(result.msg) + success, msg = res_json["success"], res_json["msg"] + except Exception as e: + success = False + msg = str(e) + return success, msg, res_json + + def get_note_all_out_comment(self, note_id: str, xsec_token: str, cookies_str: str, proxies: dict = None): + """ + 获取笔记的全部一级评论 + :param note_id 笔记的id + :param cookies_str 你的cookies + 返回笔记的全部一级评论 + """ + cursor = '' + note_out_comment_list = [] + try: + while True: + success, msg, res_json = self.get_note_out_comment(note_id, cursor, xsec_token, cookies_str, proxies) + if not success: + raise Exception(msg) + comments = res_json["data"]["comments"] + if 'cursor' in res_json["data"]: + cursor = str(res_json["data"]["cursor"]) + else: + break + note_out_comment_list.extend(comments) + if len(note_out_comment_list) == 0 or not res_json["data"]["has_more"]: + break + except Exception as e: + success = False + msg = str(e) + return success, msg, note_out_comment_list + + def get_note_inner_comment(self, comment: dict, cursor: str, xsec_token: str, cookies_str: str, proxies: dict = None): + """ + 获取指定位置的笔记二级评论 + :param comment 笔记的一级评论 + :param cursor 指定位置的评论的cursor + :param cookies_str 你的cookies + 返回指定位置的笔记二级评论 + """ + res_json = None + try: + api = "/api/sns/web/v2/comment/sub/page" + params = { + "note_id": comment['note_id'], + "root_comment_id": comment['id'], + "num": "10", + "cursor": cursor, + "image_formats": "jpg,webp,avif", + "top_comment_id": '', + "xsec_token": xsec_token + } + splice_api = splice_str(api, params) + headers, cookies, data = generate_request_params(cookies_str, splice_api, '', 'GET') + result = self.client.request_json("GET", self.base_url + splice_api, headers=headers, cookies=cookies, proxies=proxies) + res_json = result.json + if not result.ok: + raise Exception(result.msg) + success, msg = res_json["success"], res_json["msg"] + except Exception as e: + success = False + msg = str(e) + return success, msg, res_json + + def get_note_all_inner_comment(self, comment: dict, xsec_token: str, cookies_str: str, proxies: dict = None): + """ + 获取笔记的全部二级评论 + :param comment 笔记的一级评论 + :param cookies_str 你的cookies + 返回笔记的全部二级评论 + """ + try: + if not comment['sub_comment_has_more']: + return True, 'success', comment + cursor = comment['sub_comment_cursor'] + inner_comment_list = [] + while True: + success, msg, res_json = self.get_note_inner_comment(comment, cursor, xsec_token, cookies_str, proxies) + if not success: + raise Exception(msg) + comments = res_json["data"]["comments"] + if 'cursor' in res_json["data"]: + cursor = str(res_json["data"]["cursor"]) + else: + break + inner_comment_list.extend(comments) + if not res_json["data"]["has_more"]: + break + comment['sub_comments'].extend(inner_comment_list) + except Exception as e: + success = False + msg = str(e) + return success, msg, comment + + def get_note_all_comment(self, url: str, cookies_str: str, proxies: dict = None): + """ + 获取一篇文章的所有评论 + :param note_id: 你想要获取的笔记的id + :param cookies_str: 你的cookies + 返回一篇文章的所有评论 + """ + out_comment_list = [] + try: + urlParse = urllib.parse.urlparse(url) + note_id = urlParse.path.split("/")[-1] + kvs = urlParse.query.split('&') + kvDist = {kv.split('=')[0]: kv.split('=')[1] for kv in kvs} + success, msg, out_comment_list = self.get_note_all_out_comment(note_id, kvDist['xsec_token'], cookies_str, proxies) + if not success: + raise Exception(msg) + for comment in out_comment_list: + success, msg, new_comment = self.get_note_all_inner_comment(comment, kvDist['xsec_token'], cookies_str, proxies) + if not success: + raise Exception(msg) + except Exception as e: + success = False + msg = str(e) + return success, msg, out_comment_list + + def get_unread_message(self, cookies_str: str, proxies: dict = None): + """ + 获取未读消息 + :param cookies_str: 你的cookies + 返回未读消息 + """ + res_json = None + try: + api = "/api/sns/web/unread_count" + headers, cookies, data = generate_request_params(cookies_str, api, '', 'GET') + result = self.client.request_json("GET", self.base_url + api, headers=headers, cookies=cookies, proxies=proxies) + res_json = result.json + if not result.ok: + raise Exception(result.msg) + success, msg = res_json["success"], res_json["msg"] + except Exception as e: + success = False + msg = str(e) + return success, msg, res_json + + def get_metions(self, cursor: str, cookies_str: str, proxies: dict = None): + """ + 获取评论和@提醒 + :param cursor: 你想要获取的评论和@提醒的cursor + :param cookies_str: 你的cookies + 返回评论和@提醒 + """ + res_json = None + try: + api = "/api/sns/web/v1/you/mentions" + params = { + "num": "20", + "cursor": cursor + } + splice_api = splice_str(api, params) + headers, cookies, data = generate_request_params(cookies_str, splice_api, '', 'GET') + result = self.client.request_json("GET", self.base_url + splice_api, headers=headers, cookies=cookies, proxies=proxies) + res_json = result.json + if not result.ok: + raise Exception(result.msg) + success, msg = res_json["success"], res_json["msg"] + except Exception as e: + success = False + msg = str(e) + return success, msg, res_json + + def get_all_metions(self, cookies_str: str, proxies: dict = None): + """ + 获取全部的评论和@提醒 + :param cookies_str: 你的cookies + 返回全部的评论和@提醒 + """ + cursor = '' + metions_list = [] + try: + while True: + success, msg, res_json = self.get_metions(cursor, cookies_str, proxies) + if not success: + raise Exception(msg) + metions = res_json["data"]["message_list"] + if 'cursor' in res_json["data"]: + cursor = str(res_json["data"]["cursor"]) + else: + break + metions_list.extend(metions) + if not res_json["data"]["has_more"]: + break + except Exception as e: + success = False + msg = str(e) + return success, msg, metions_list + + def get_likesAndcollects(self, cursor: str, cookies_str: str, proxies: dict = None): + """ + 获取赞和收藏 + :param cursor: 你想要获取的赞和收藏的cursor + :param cookies_str: 你的cookies + 返回赞和收藏 + """ + res_json = None + try: + api = "/api/sns/web/v1/you/likes" + params = { + "num": "20", + "cursor": cursor + } + splice_api = splice_str(api, params) + headers, cookies, data = generate_request_params(cookies_str, splice_api, '', 'GET') + result = self.client.request_json("GET", self.base_url + splice_api, headers=headers, cookies=cookies, proxies=proxies) + res_json = result.json + if not result.ok: + raise Exception(result.msg) + success, msg = res_json["success"], res_json["msg"] + except Exception as e: + success = False + msg = str(e) + return success, msg, res_json + + def get_all_likesAndcollects(self, cookies_str: str, proxies: dict = None): + """ + 获取全部的赞和收藏 + :param cookies_str: 你的cookies + 返回全部的赞和收藏 + """ + cursor = '' + likesAndcollects_list = [] + try: + while True: + success, msg, res_json = self.get_likesAndcollects(cursor, cookies_str, proxies) + if not success: + raise Exception(msg) + likesAndcollects = res_json["data"]["message_list"] + if 'cursor' in res_json["data"]: + cursor = str(res_json["data"]["cursor"]) + else: + break + likesAndcollects_list.extend(likesAndcollects) + if not res_json["data"]["has_more"]: + break + except Exception as e: + success = False + msg = str(e) + return success, msg, likesAndcollects_list + + def get_new_connections(self, cursor: str, cookies_str: str, proxies: dict = None): + """ + 获取新增关注 + :param cursor: 你想要获取的新增关注的cursor + :param cookies_str: 你的cookies + 返回新增关注 + """ + res_json = None + try: + api = "/api/sns/web/v1/you/connections" + params = { + "num": "20", + "cursor": cursor + } + splice_api = splice_str(api, params) + headers, cookies, data = generate_request_params(cookies_str, splice_api, '', 'GET') + result = self.client.request_json("GET", self.base_url + splice_api, headers=headers, cookies=cookies, proxies=proxies) + res_json = result.json + if not result.ok: + raise Exception(result.msg) + success, msg = res_json["success"], res_json["msg"] + except Exception as e: + success = False + msg = str(e) + return success, msg, res_json + + def get_all_new_connections(self, cookies_str: str, proxies: dict = None): + """ + 获取全部的新增关注 + :param cookies_str: 你的cookies + 返回全部的新增关注 + """ + cursor = '' + connections_list = [] + try: + while True: + success, msg, res_json = self.get_new_connections(cursor, cookies_str, proxies) + if not success: + raise Exception(msg) + connections = res_json["data"]["message_list"] + if 'cursor' in res_json["data"]: + cursor = str(res_json["data"]["cursor"]) + else: + break + connections_list.extend(connections) + if not res_json["data"]["has_more"]: + break + except Exception as e: + success = False + msg = str(e) + return success, msg, connections_list + + @staticmethod + def get_note_no_water_video(note_id): + """ + 获取笔记无水印视频 + :param note_id: 你想要获取的笔记的id + 返回笔记无水印视频 + """ + success = True + msg = '成功' + video_addr = None + try: + headers = get_common_headers() + url = f"https://www.xiaohongshu.com/explore/{note_id}" + client = HttpClient() + result = client.request_text("GET", url, headers=headers) + if not result.ok: + raise Exception(result.msg) + res = result.text + video_addr = re.findall(r'', res)[0] + except Exception as e: + success = False + msg = str(e) + return success, msg, video_addr + + + @staticmethod + def get_note_no_water_img(img_url): + """ + 获取笔记无水印图片 + :param img_url: 你想要获取的图片的url + 返回笔记无水印图片 + """ + success = True + msg = '成功' + new_url = None + try: + # 新版图片资源优先保留 notes_pre_post token,使用 ci.xiaohongshu.com 输出 JPEG。 + # 例: + # https://sns-webpic-qc.xhscdn.com/