from __future__ import annotations

# Utilities for cleaning raw row dicts and mapping them onto a normalized
# record (note / user / url / search) together with a de-duplication key.

import json
import math
import re
from datetime import datetime, timezone
from hashlib import sha256
from typing import Any

_URL_RE = re.compile(r"https?://\S+")
_NOTE_ID_RE = re.compile(r"/explore/([^/?#]+)")
_USER_ID_RE = re.compile(r"/user/profile/([^/?#]+)")
_NUMBER_RE = re.compile(r"-?\d+(\.\d+)?")

# Thousands separators removed before numeric parsing (ASCII and full-width).
_THOUSANDS_SEPARATORS = str.maketrans("", "", ",\uff0c")
# Suffixes meaning "x 10000" ("万" and its common "w"/"W" shorthand).
_TEN_THOUSAND_SUFFIXES = ("万", "w", "W")

# strptime patterns tried, in order, when datetime.fromisoformat() rejects a string.
_TIME_FORMATS = (
    "%Y-%m-%d %H:%M:%S",
    "%Y/%m/%d %H:%M:%S",
    "%Y-%m-%d %H:%M",
    "%Y/%m/%d %H:%M",
    "%Y-%m-%d",
    "%Y/%m/%d",
    "%Y.%m.%d %H:%M:%S",
    "%Y.%m.%d %H:%M",
    "%Y.%m.%d",
)

# Header aliases (compared after strip + lower-casing the row's keys).
_URL_HEADERS = frozenset(
    {
        "url",
        "link",
        "note_url",
        "user_url",
        "note link",
        "链接",
        "笔记url",
        "笔记链接",
        "笔记地址",
        "作品链接",
        "作品url",
        "作品地址",
        "主页链接",
        "用户主页",
        "用户url",
    }
)
_QUERY_HEADERS = frozenset({"query", "keyword", "关键词", "搜索词"})
_TITLE_HEADERS = frozenset({"title", "标题", "笔记标题", "作品标题"})
_NICKNAME_HEADERS = frozenset(
    {"nickname", "author", "作者", "昵称", "作者昵称", "博主昵称", "博主"}
)
_PUBLISH_TIME_HEADERS = frozenset(
    {
        "publish_time",
        "published_at",
        "发布时间",
        "发布/发布时间",
        "发布",
        "发布时间(北京时间)",
        # Full-width parentheses variant of the header above.
        "发布时间\uff08北京时间\uff09",
        "上传时间",
    }
)
_CONTENT_TYPE_HEADERS = frozenset({"content_type", "内容类型", "笔记类型", "内容形式"})

# (output key, header aliases) for every integer metric, in output order.
_COUNT_FIELDS: tuple[tuple[str, frozenset[str]], ...] = (
    ("like_count", frozenset({"like", "likes", "点赞", "点赞数", "赞", "赞数", "获赞", "喜欢"})),
    ("comment_count", frozenset({"comment", "comments", "评论", "评论数"})),
    ("collect_count", frozenset({"collect", "collects", "收藏", "收藏数"})),
    ("share_count", frozenset({"share", "shares", "分享", "分享数", "转发", "转发数"})),
    ("exposure", frozenset({"exposure", "曝光", "曝光量"})),
    ("read", frozenset({"read", "阅读", "阅读量"})),
    ("view", frozenset({"view", "views", "浏览", "浏览量"})),
    ("interact", frozenset({"interact", "interaction", "互动", "互动量"})),
    ("follow", frozenset({"follow", "转粉", "涨粉"})),
)


def clean_cell(value: Any) -> Any:
    """Return *value* with strings stripped; blank strings and None become None."""
    if value is None:
        return None
    if isinstance(value, str):
        stripped = value.strip()
        return stripped if stripped else None
    return value


def clean_raw_row(raw: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *raw* with stripped string keys and cleaned values.

    Entries whose key is None or blank after stripping are dropped.
    """
    cleaned: dict[str, Any] = {}
    for k, v in raw.items():
        key = str(k).strip() if k is not None else ""
        if not key:
            continue
        cleaned[key] = clean_cell(v)
    return cleaned


def _pick_by_headers(raw: dict[str, Any], candidates: frozenset[str] | set[str]) -> Any:
    """Return the first non-None value whose header matches one of *candidates*.

    Headers are compared after stripping and lower-casing; the scan follows the
    row's own key order, not the order of *candidates*.
    """
    for k, v in raw.items():
        if str(k).strip().lower() in candidates and v is not None:
            return v
    return None


def _pick_text(raw: dict[str, Any], candidates: frozenset[str] | set[str]) -> str | None:
    """Like ``_pick_by_headers`` but only accept strings (returned stripped)."""
    value = _pick_by_headers(raw, candidates)
    return value.strip() if isinstance(value, str) else None


def _find_first_url(raw: dict[str, Any]) -> str | None:
    """Return the first http(s) URL found in any string value of *raw*."""
    for v in raw.values():
        if isinstance(v, str):
            m = _URL_RE.search(v)
            if m:
                return m.group(0)
    return None


def _extract_note_id(url: str) -> str | None:
    """Return the path segment following ``/explore/`` in *url*, if any."""
    m = _NOTE_ID_RE.search(url)
    return m.group(1) if m else None


def _extract_user_id(url: str) -> str | None:
    """Return the path segment following ``/user/profile/`` in *url*, if any."""
    m = _USER_ID_RE.search(url)
    return m.group(1) if m else None


def _parse_int(value: Any) -> int | None:
    """Best-effort conversion of a cell value to an int.

    Accepts bools, ints, finite floats (truncated) and strings such as
    ``"1,234"``, ``"1.5万"``, ``"2w"`` or ``"10万+"``. Returns None when no
    number can be extracted; never raises.
    """
    if value is None:
        return None
    if isinstance(value, bool):
        return int(value)
    if isinstance(value, int):
        return value
    if isinstance(value, float):
        # int(nan) / int(inf) raise; treat non-finite cells as missing.
        return int(value) if math.isfinite(value) else None
    if not isinstance(value, str):
        return None

    s = value.strip()
    if not s:
        return None
    # Drop thousands separators and a trailing "+" ("10万+") so that the
    # multiplier suffix check below sees the real last character.
    s = s.translate(_THOUSANDS_SEPARATORS).rstrip("+\uff0b")

    if s.endswith(_TEN_THOUSAND_SUFFIXES):
        try:
            return int(float(s[:-1]) * 10000)
        except (ValueError, OverflowError):
            return None

    m = _NUMBER_RE.search(s)
    if not m:
        return None
    try:
        return int(float(m.group(0)))
    except (ValueError, OverflowError):
        # A very long digit run becomes float("inf"), which int() rejects.
        return None


def _parse_time_iso(value: Any) -> str | None:
    """Best-effort conversion of a cell value to an ISO-8601 string.

    * ``datetime`` objects are formatted as-is.
    * Numbers are read as Unix timestamps (values above 10_000_000_000 are
      taken to be milliseconds) and rendered in UTC.
    * Strings are tried with ``datetime.fromisoformat`` and then a list of
      common ``strptime`` patterns.

    NOTE: parsed strings keep whatever offset they carried (usually none, i.e.
    a naive result), whereas numeric timestamps always yield a UTC-aware result.
    Returns None when the value cannot be interpreted; never raises.
    """
    if value is None:
        return None
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        try:
            # float() lives inside the try: huge ints raise OverflowError here.
            ts = float(value)
            if ts > 10_000_000_000:
                ts = ts / 1000.0
            return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()
        except (ValueError, OverflowError, OSError):
            return None
    if isinstance(value, str):
        s = value.strip()
        if not s:
            return None
        try:
            return datetime.fromisoformat(s).isoformat()
        except ValueError:
            pass
        for fmt in _TIME_FORMATS:
            try:
                return datetime.strptime(s, fmt).isoformat()
            except ValueError:
                continue
    return None


def compute_row_hash(raw: dict[str, Any]) -> str:
    """Return the SHA-256 hex digest of *raw* serialized as key-sorted JSON.

    Values that JSON cannot encode are stringified via ``default=str``.
    """
    data = json.dumps(raw, ensure_ascii=False, sort_keys=True, default=str).encode("utf-8")
    return sha256(data).hexdigest()


def map_row(raw_row: dict[str, Any]) -> tuple[dict[str, Any] | None, str | None, list[str]]:
    """Map one raw row onto a normalized record.

    Returns ``(normalized, dedup_key, issues)``:

    * ``normalized`` holds ``kind`` ("note", "user", "url" or "search") plus
      whichever optional fields could be extracted, or is None when nothing
      at all was recognized.
    * ``dedup_key`` is ``"<kind>:<identifier>"`` or None when neither a URL
      nor a query was found.
    * ``issues`` lists human-readable problems found while mapping.
    """
    issues: list[str] = []
    raw = clean_raw_row(raw_row)

    # Prefer an explicit URL column; otherwise take the first URL in any cell.
    url = _pick_text(raw, _URL_HEADERS) or _find_first_url(raw)
    query = _pick_text(raw, _QUERY_HEADERS)
    title = _pick_text(raw, _TITLE_HEADERS)
    nickname = _pick_text(raw, _NICKNAME_HEADERS)
    publish_time_iso = _parse_time_iso(_pick_by_headers(raw, _PUBLISH_TIME_HEADERS))
    content_type = _pick_text(raw, _CONTENT_TYPE_HEADERS)

    normalized: dict[str, Any] = {}
    dedup_key: str | None = None

    if url and "xiaohongshu.com" in url:
        note_id = _extract_note_id(url)
        user_id = _extract_user_id(url)
        if note_id:
            normalized["kind"] = "note"
            normalized["note_url"] = url
            normalized["note_id"] = note_id
            dedup_key = f"note:{note_id}"
        elif user_id:
            normalized["kind"] = "user"
            normalized["user_url"] = url
            normalized["user_id"] = user_id
            dedup_key = f"user:{user_id}"
        else:
            normalized["kind"] = "url"
            normalized["url"] = url
            dedup_key = f"url:{url}"
    elif query:
        # A search query outranks a URL that is not on xiaohongshu.com.
        normalized["kind"] = "search"
        normalized["query"] = query
        dedup_key = f"search:{query}"
    elif url:
        normalized["kind"] = "url"
        normalized["url"] = url
        dedup_key = f"url:{url}"
    else:
        issues.append("未识别到 url 或 query")

    if title:
        normalized["title"] = title
    if nickname:
        normalized["nickname"] = nickname
        normalized["author"] = nickname
    if publish_time_iso:
        normalized["publish_time"] = publish_time_iso

    for out_key, headers in _COUNT_FIELDS:
        count = _parse_int(_pick_by_headers(raw, headers))
        if count is not None:
            normalized[out_key] = count

    if content_type:
        normalized["content_type"] = content_type

    if not normalized:
        return None, None, issues
    return normalized, dedup_key, issues