XHS / importer /mapper.py
Trae Bot
Upload Spider_XHS project
c481f8a
from __future__ import annotations

import json
import re
import unicodedata
from datetime import datetime, timezone
from hashlib import sha256
from typing import Any
# Matches an http(s) URL: the scheme followed by a run of non-whitespace characters.
_URL_RE = re.compile(r"https?://\S+")
# Captures the path segment right after "/explore/" (stops at "/", "?" or "#").
_NOTE_ID_RE = re.compile(r"/explore/([^/?#]+)")
# Captures the path segment right after "/user/profile/" (stops at "/", "?" or "#").
_USER_ID_RE = re.compile(r"/user/profile/([^/?#]+)")
def clean_cell(value: Any) -> Any:
    """Normalize a single cell value.

    Strings are stripped of surrounding whitespace, and a string that is
    empty after stripping becomes ``None``. Every other value (including
    ``None``) is returned unchanged.
    """
    if not isinstance(value, str):
        return value
    stripped = value.strip()
    return stripped or None
def clean_raw_row(raw: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *raw* with stripped string keys and cleaned values.

    Keys are converted with ``str()`` and stripped; entries whose key is
    ``None`` or empty after stripping are dropped. Values go through
    ``clean_cell``. When two keys collapse to the same name, the later
    value wins.
    """
    named_cells = (
        ("" if header is None else str(header).strip(), cell)
        for header, cell in raw.items()
    )
    return {name: clean_cell(cell) for name, cell in named_cells if name}
def _pick_by_headers(raw: dict[str, Any], candidates: set[str]) -> Any:
    """Return the first non-``None`` value whose header is one of *candidates*.

    Headers are compared after ``str()`` conversion, stripping and
    lower-casing. If that exact form does not match, the header is also
    compared in its NFKC-folded form so that fullwidth punctuation and
    letters (e.g. fullwidth parentheses around a time-zone note, or a
    fullwidth "URL") match their ASCII spelling in *candidates*.

    Args:
        raw: Row mapping of header -> cell value, scanned in iteration order.
        candidates: Accepted header spellings, expected in lower case.

    Returns:
        The matching cell value, or ``None`` when no header matches or every
        matching cell is ``None``.
    """
    for header, value in raw.items():
        if value is None:
            continue
        label = str(header).strip().lower()
        if label in candidates:
            return value
        # Compatibility folding maps fullwidth forms onto their ASCII
        # equivalents; strip/lower again because folding can expose new
        # whitespace or cased characters.
        folded = unicodedata.normalize("NFKC", label).strip().lower()
        if folded != label and folded in candidates:
            return value
    return None
def _find_first_url(raw: dict[str, Any]) -> str | None:
    """Return the first http(s) URL found in any string value of *raw*.

    Values are scanned in the mapping's iteration order; non-string values
    are ignored. Returns ``None`` when no value contains a URL.
    """
    texts = (cell for cell in raw.values() if isinstance(cell, str))
    hits = (_URL_RE.search(text) for text in texts)
    return next((hit.group(0) for hit in hits if hit), None)
def _extract_note_id(url: str) -> str | None:
    """Return the path segment following ``/explore/`` in *url*, or ``None``."""
    found = _NOTE_ID_RE.search(url)
    if found is None:
        return None
    return found.group(1)
def _extract_user_id(url: str) -> str | None:
    """Return the path segment following ``/user/profile/`` in *url*, or ``None``."""
    found = _USER_ID_RE.search(url)
    if found is None:
        return None
    return found.group(1)
# Multipliers for the magnitude suffixes used in abbreviated counters.
_COUNT_UNIT_MULTIPLIERS = {
    "k": 1_000,
    "千": 1_000,
    "w": 10_000,
    "万": 10_000,
    "亿": 100_000_000,
}

# A number directly followed by a magnitude suffix at the end of the text,
# optionally with a trailing "+" (as in "10万+").
_SCALED_COUNT_RE = re.compile(r"(-?(?:\d+\.?\d*|\.\d+))\s*([kKwW千万亿])\s*\+?$")


def _parse_int(value: Any) -> int | None:
    """Best-effort conversion of a cell value to an integer.

    Handling by input type:

    * ``None`` -> ``None``.
    * ``bool`` / ``int`` -> the integer value.
    * ``float`` -> truncated toward zero; NaN and infinities give ``None``
      instead of raising.
    * ``str`` -> ASCII and fullwidth thousands separators are removed, then:
      a trailing ``<number><unit>`` with unit ``k``/``千`` (x1,000),
      ``w``/``万`` (x10,000) or ``亿`` (x100,000,000), case-insensitive and
      optionally followed by ``+``, is scaled and truncated; otherwise the
      first decimal number in the text is truncated to an integer.
    * anything else -> ``None``.

    Returns:
        The parsed integer, or ``None`` when nothing numeric can be extracted.
    """
    if value is None:
        return None
    if isinstance(value, bool):
        return int(value)
    if isinstance(value, int):
        return value
    if isinstance(value, float):
        try:
            return int(value)
        except (ValueError, OverflowError):
            # int() rejects NaN (ValueError) and +/-inf (OverflowError).
            return None
    if not isinstance(value, str):
        return None

    text = value.strip()
    if not text:
        return None
    # Drop thousands separators, both ASCII "," and fullwidth U+FF0C.
    text = text.replace(",", "").replace("\uff0c", "")

    scaled = _SCALED_COUNT_RE.search(text)
    if scaled is not None:
        multiplier = _COUNT_UNIT_MULTIPLIERS[scaled.group(2).lower()]
        try:
            return int(float(scaled.group(1)) * multiplier)
        except (ValueError, OverflowError):
            return None

    plain = re.search(r"-?\d+(\.\d+)?", text)
    if plain is None:
        return None
    try:
        return int(float(plain.group(0)))
    except (ValueError, OverflowError):
        # Absurdly long digit runs overflow float -> int conversion.
        return None
def _parse_time_iso(value: Any) -> str | None:
    """Convert a cell value to an ISO-8601 string, or ``None`` if unparseable.

    * ``datetime`` instances are formatted with ``isoformat()`` as they are.
    * ``int`` / ``float`` (but not ``bool``) are read as epoch timestamps in
      UTC; values above 10,000,000,000 are divided by 1000 first.
    * Strings are tried with ``datetime.fromisoformat`` and then with a fixed
      list of ``-``, ``/`` and ``.`` separated date / date-time layouts.
    * Everything else yields ``None``.
    """
    if value is None:
        return None
    if isinstance(value, datetime):
        return value.isoformat()

    is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
    if is_number:
        seconds = float(value)
        try:
            if seconds > 10_000_000_000:
                seconds = seconds / 1000.0
            moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
            return moment.isoformat()
        except Exception:
            return None

    if not isinstance(value, str):
        return None

    text = value.strip()
    if not text:
        return None

    try:
        return datetime.fromisoformat(text).isoformat()
    except Exception:
        pass

    layouts = (
        "%Y-%m-%d %H:%M:%S",
        "%Y/%m/%d %H:%M:%S",
        "%Y-%m-%d %H:%M",
        "%Y/%m/%d %H:%M",
        "%Y-%m-%d",
        "%Y/%m/%d",
        "%Y.%m.%d %H:%M:%S",
        "%Y.%m.%d %H:%M",
        "%Y.%m.%d",
    )
    for layout in layouts:
        try:
            parsed = datetime.strptime(text, layout)
        except Exception:
            continue
        return parsed.isoformat()
    return None
def compute_row_hash(raw: dict[str, Any]) -> str:
    """Return the SHA-256 hex digest of *raw* serialized as JSON.

    The row is dumped with sorted keys and without ASCII escaping, so the
    digest does not depend on key order; values JSON cannot encode are
    stringified with ``str``.
    """
    serialized = json.dumps(raw, ensure_ascii=False, sort_keys=True, default=str)
    digest = sha256()
    digest.update(serialized.encode("utf-8"))
    return digest.hexdigest()
def _pick_text(raw: dict[str, Any], candidates: set[str]) -> str | None:
    """Return the stripped string under the first matching header, else ``None``."""
    value = _pick_by_headers(raw, candidates)
    return value.strip() if isinstance(value, str) else None


# (normalized field name, accepted header spellings) for every integer metric,
# in the order the fields are written to the normalized record.
_METRIC_HEADERS = (
    ("like_count", {"like", "likes", "点赞", "点赞数", "赞", "赞数", "获赞", "喜欢"}),
    ("comment_count", {"comment", "comments", "评论", "评论数"}),
    ("collect_count", {"collect", "collects", "收藏", "收藏数"}),
    ("share_count", {"share", "shares", "分享", "分享数", "转发", "转发数"}),
    ("exposure", {"exposure", "曝光", "曝光量"}),
    ("read", {"read", "阅读", "阅读量"}),
    ("view", {"view", "views", "浏览", "浏览量"}),
    ("interact", {"interact", "interaction", "互动", "互动量"}),
    ("follow", {"follow", "转粉", "涨粉"}),
)


def map_row(raw_row: dict[str, Any]) -> tuple[dict[str, Any] | None, str | None, list[str]]:
    """Map one raw spreadsheet row onto a normalized record.

    The row is cleaned, then known header spellings (English and Chinese) are
    looked up to fill the record. The record ``kind`` is decided as follows:

    * a URL containing ``xiaohongshu.com`` with an ``/explore/<id>`` path ->
      ``"note"``; with a ``/user/profile/<id>`` path -> ``"user"``; otherwise
      ``"url"``;
    * else, a query/keyword cell -> ``"search"``;
    * else, any other URL -> ``"url"``;
    * else no kind is set and an issue is recorded.

    Args:
        raw_row: Mapping of column header -> cell value for one row.

    Returns:
        ``(normalized, dedup_key, issues)``. ``normalized`` is ``None`` when
        no field could be extracted. ``dedup_key`` is ``None`` when neither a
        URL nor a query was found. ``issues`` lists problems found in the row.
    """
    issues: list[str] = []
    raw = clean_raw_row(raw_row)

    url = _pick_text(
        raw,
        {
            "url",
            "link",
            "note_url",
            "user_url",
            "note link",
            "链接",
            "笔记url",
            "笔记链接",
            "笔记地址",
            "作品链接",
            "作品url",
            "作品地址",
            "主页链接",
            "用户主页",
            "用户url",
        },
    )
    if not url:
        # No URL column: fall back to the first URL embedded in any cell.
        url = _find_first_url(raw)

    query = _pick_text(raw, {"query", "keyword", "关键词", "搜索词"})
    title = _pick_text(raw, {"title", "标题", "笔记标题", "作品标题"})
    nickname = _pick_text(raw, {"nickname", "author", "作者", "昵称", "作者昵称", "博主昵称", "博主"})
    content_type = _pick_text(raw, {"content_type", "内容类型", "笔记类型", "内容形式"})

    publish_time_iso = _parse_time_iso(
        _pick_by_headers(
            raw,
            {
                "publish_time",
                "published_at",
                "发布时间",
                "发布/发布时间",
                "发布",
                # Both the ASCII and the fullwidth-parenthesis spelling; the
                # latter is written with escapes so it survives normalization.
                "发布时间(北京时间)",
                "发布时间\uff08北京时间\uff09",
                "上传时间",
            },
        )
    )

    normalized: dict[str, Any] = {}
    dedup_key: str | None = None

    if url and "xiaohongshu.com" in url:
        note_id = _extract_note_id(url)
        user_id = _extract_user_id(url)
        if note_id:
            normalized["kind"] = "note"
            normalized["note_url"] = url
            normalized["note_id"] = note_id
            dedup_key = f"note:{note_id}"
        elif user_id:
            normalized["kind"] = "user"
            normalized["user_url"] = url
            normalized["user_id"] = user_id
            dedup_key = f"user:{user_id}"
        else:
            normalized["kind"] = "url"
            normalized["url"] = url
            dedup_key = f"url:{url}"
    elif query:
        normalized["kind"] = "search"
        normalized["query"] = query
        dedup_key = f"search:{query}"
    elif url:
        normalized["kind"] = "url"
        normalized["url"] = url
        dedup_key = f"url:{url}"
    else:
        issues.append("未识别到 url 或 query")

    if title:
        normalized["title"] = title
    if nickname:
        # The same value is exposed under both keys.
        normalized["nickname"] = nickname
        normalized["author"] = nickname
    if publish_time_iso:
        normalized["publish_time"] = publish_time_iso

    for field, headers in _METRIC_HEADERS:
        count = _parse_int(_pick_by_headers(raw, headers))
        # Keep zeros: only a missing or unparseable metric is omitted.
        if count is not None:
            normalized[field] = count

    if content_type:
        normalized["content_type"] = content_type

    if not normalized:
        return None, None, issues
    return normalized, dedup_key, issues