# File size: 8,585 Bytes (c481f8a)
from __future__ import annotations
import json
import re
from hashlib import sha256
from datetime import datetime, timezone
from typing import Any
# Matches an http(s) URL inside free text: the scheme followed by a run of
# non-whitespace characters.
_URL_RE = re.compile(r"https?://\S+")
# Captures the path segment that follows "/explore/" (up to the next "/", "?"
# or "#"); _extract_note_id returns this capture as the note id.
_NOTE_ID_RE = re.compile(r"/explore/([^/?#]+)")
# Captures the path segment that follows "/user/profile/" (same terminators);
# _extract_user_id returns this capture as the user id.
_USER_ID_RE = re.compile(r"/user/profile/([^/?#]+)")
def clean_cell(value: Any) -> Any:
    """Normalize a single cell value.

    Strings are stripped of surrounding whitespace, and a string that is empty
    after stripping becomes ``None``. Every other value (including ``None``)
    is returned unchanged.
    """
    if not isinstance(value, str):
        return value
    stripped = value.strip()
    return stripped or None
def clean_raw_row(raw: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *raw* with tidied headers and cell values.

    Each key is converted to ``str`` and stripped; entries whose key is
    ``None`` or blank after stripping are dropped. Each remaining value is
    passed through ``clean_cell``. When two keys collapse to the same cleaned
    header, the later value wins.
    """
    renamed = (
        ("" if header is None else str(header).strip(), cell)
        for header, cell in raw.items()
    )
    return {name: clean_cell(cell) for name, cell in renamed if name}
def _pick_by_headers(raw: dict[str, Any], candidates: set[str]) -> Any:
for k, v in raw.items():
if str(k).strip().lower() in candidates and v is not None:
return v
return None
def _find_first_url(raw: dict[str, Any]) -> str | None:
    """Return the first http(s) URL found in any string value of *raw*.

    Values are scanned in the dict's iteration order; non-string values are
    skipped. Returns ``None`` when no value contains a URL.
    """
    text_cells = (cell for cell in raw.values() if isinstance(cell, str))
    for cell in text_cells:
        found = _URL_RE.search(cell)
        if found is not None:
            return found.group(0)
    return None
def _extract_note_id(url: str) -> str | None:
    """Return the path segment following ``/explore/`` in *url*, or ``None``."""
    found = _NOTE_ID_RE.search(url)
    if found is None:
        return None
    return found.group(1)
def _extract_user_id(url: str) -> str | None:
    """Return the path segment following ``/user/profile/`` in *url*, or ``None``."""
    found = _USER_ID_RE.search(url)
    if found is None:
        return None
    return found.group(1)
def _parse_int(value: Any) -> int | None:
if value is None:
return None
if isinstance(value, bool):
return int(value)
if isinstance(value, int):
return value
if isinstance(value, float):
return int(value)
if isinstance(value, str):
s = value.strip()
if not s:
return None
s = s.replace(",", "").replace(",", "")
if s.endswith("万"):
try:
return int(float(s[:-1]) * 10000)
except Exception:
return None
if s.endswith("w"):
try:
return int(float(s[:-1]) * 10000)
except Exception:
return None
m = re.search(r"-?\d+(\.\d+)?", s)
if not m:
return None
try:
return int(float(m.group(0)))
except Exception:
return None
return None
def _parse_time_iso(value: Any) -> str | None:
if value is None:
return None
if isinstance(value, datetime):
return value.isoformat()
if isinstance(value, (int, float)) and not isinstance(value, bool):
ts = float(value)
try:
if ts > 10_000_000_000:
ts = ts / 1000.0
return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()
except Exception:
return None
if isinstance(value, str):
s = value.strip()
if not s:
return None
try:
return datetime.fromisoformat(s).isoformat()
except Exception:
pass
for fmt in (
"%Y-%m-%d %H:%M:%S",
"%Y/%m/%d %H:%M:%S",
"%Y-%m-%d %H:%M",
"%Y/%m/%d %H:%M",
"%Y-%m-%d",
"%Y/%m/%d",
"%Y.%m.%d %H:%M:%S",
"%Y.%m.%d %H:%M",
"%Y.%m.%d",
):
try:
return datetime.strptime(s, fmt).isoformat()
except Exception:
continue
return None
def compute_row_hash(raw: dict[str, Any]) -> str:
    """Return the SHA-256 hex digest of *raw* serialized as canonical JSON.

    The row is dumped with sorted keys and without ASCII escaping, so the
    digest does not depend on key order. Values that JSON cannot encode are
    converted with ``str``.
    """
    canonical = json.dumps(raw, ensure_ascii=False, sort_keys=True, default=str)
    hasher = sha256()
    hasher.update(canonical.encode("utf-8"))
    return hasher.hexdigest()
# Header aliases for each logical column. _pick_by_headers lower-cases the
# row's headers before the lookup, so every alias here is lower-case.
_URL_HEADERS = frozenset({
    "url",
    "link",
    "note_url",
    "user_url",
    "note link",
    "链接",
    "笔记url",
    "笔记链接",
    "笔记地址",
    "作品链接",
    "作品url",
    "作品地址",
    "主页链接",
    "用户主页",
    "用户url",
})
_QUERY_HEADERS = frozenset({"query", "keyword", "关键词", "搜索词"})
_TITLE_HEADERS = frozenset({"title", "标题", "笔记标题", "作品标题"})
_NICKNAME_HEADERS = frozenset(
    {"nickname", "author", "作者", "昵称", "作者昵称", "博主昵称", "博主"}
)
_PUBLISH_TIME_HEADERS = frozenset({
    "publish_time",
    "published_at",
    "发布时间",
    "发布/发布时间",
    "发布",
    # Both parenthesis styles occur in exported headers. The full-width pair
    # is written with escapes so it cannot collapse into the ASCII variant.
    "发布时间(北京时间)",
    "发布时间\uff08北京时间\uff09",
    "上传时间",
})
_CONTENT_TYPE_HEADERS = frozenset({"content_type", "内容类型", "笔记类型", "内容形式"})
# (output key, header aliases) for the integer metrics, in output order.
_METRIC_FIELDS = (
    ("like_count", frozenset({"like", "likes", "点赞", "点赞数", "赞", "赞数", "获赞", "喜欢"})),
    ("comment_count", frozenset({"comment", "comments", "评论", "评论数"})),
    ("collect_count", frozenset({"collect", "collects", "收藏", "收藏数"})),
    ("share_count", frozenset({"share", "shares", "分享", "分享数", "转发", "转发数"})),
    ("exposure", frozenset({"exposure", "曝光", "曝光量"})),
    ("read", frozenset({"read", "阅读", "阅读量"})),
    ("view", frozenset({"view", "views", "浏览", "浏览量"})),
    ("interact", frozenset({"interact", "interaction", "互动", "互动量"})),
    ("follow", frozenset({"follow", "转粉", "涨粉"})),
)


def _pick_text(raw: dict[str, Any], candidates: frozenset[str]) -> str | None:
    """Return the stripped string stored under a matching header, else ``None``."""
    value = _pick_by_headers(raw, candidates)
    return value.strip() if isinstance(value, str) else None


def _classify_target(
    url: str | None, query: str | None
) -> tuple[dict[str, Any], str | None]:
    """Build the identifying fields and dedup key from a row's url/query.

    Returns ``({}, None)`` when neither a url nor a query is available.
    """
    if url and "xiaohongshu.com" in url:
        note_id = _extract_note_id(url)
        if note_id:
            return {"kind": "note", "note_url": url, "note_id": note_id}, f"note:{note_id}"
        user_id = _extract_user_id(url)
        if user_id:
            return {"kind": "user", "user_url": url, "user_id": user_id}, f"user:{user_id}"
        return {"kind": "url", "url": url}, f"url:{url}"
    # A search query outranks a URL that is not on xiaohongshu.com.
    if query:
        return {"kind": "search", "query": query}, f"search:{query}"
    if url:
        return {"kind": "url", "url": url}, f"url:{url}"
    return {}, None


def map_row(raw_row: dict[str, Any]) -> tuple[dict[str, Any] | None, str | None, list[str]]:
    """Map one raw import row onto the normalized record layout.

    The row is cleaned with ``clean_raw_row``, then each logical column is
    looked up through its header aliases. The url comes from a url-like
    column or, failing that, from the first URL found in any cell.

    Args:
        raw_row: Mapping of column header to cell value.

    Returns:
        A ``(normalized, dedup_key, issues)`` tuple.

        * ``normalized`` holds ``kind`` plus its identifying fields (``note``,
          ``user``, ``url`` or ``search``) and whichever optional fields were
          present. It is ``None`` when nothing at all was recognized.
        * ``dedup_key`` is ``"<kind>:<id>"``, or ``None`` when no url or query
          was recognized.
        * ``issues`` lists problems found while mapping the row.
    """
    issues: list[str] = []
    raw = clean_raw_row(raw_row)

    url = _pick_text(raw, _URL_HEADERS) or _find_first_url(raw)
    query = _pick_text(raw, _QUERY_HEADERS)
    normalized, dedup_key = _classify_target(url, query)
    if not normalized:
        issues.append("未识别到 url 或 query")

    title = _pick_text(raw, _TITLE_HEADERS)
    if title:
        normalized["title"] = title

    nickname = _pick_text(raw, _NICKNAME_HEADERS)
    if nickname:
        # The same value is exposed under both keys.
        normalized["nickname"] = nickname
        normalized["author"] = nickname

    publish_time_iso = _parse_time_iso(_pick_by_headers(raw, _PUBLISH_TIME_HEADERS))
    if publish_time_iso:
        normalized["publish_time"] = publish_time_iso

    for field, headers in _METRIC_FIELDS:
        count = _parse_int(_pick_by_headers(raw, headers))
        # Zero is a legitimate count, so only a missing value is skipped.
        if count is not None:
            normalized[field] = count

    content_type = _pick_text(raw, _CONTENT_TYPE_HEADERS)
    if content_type:
        normalized["content_type"] = content_type

    if not normalized:
        return None, None, issues
    return normalized, dedup_key, issues
|