| from __future__ import annotations |
|
|
| import json |
| import re |
| from hashlib import sha256 |
| from datetime import datetime, timezone |
| from typing import Any |
|
|
|
|
# Compiled once at import time; the helpers below only ever call ``.search``.

# An http(s) URL: the scheme followed by everything up to the next whitespace.
_URL_RE: re.Pattern[str] = re.compile(r"https?://\S+")
# Group 1 is the path segment right after "/explore/", cut at "/", "?" or "#".
_NOTE_ID_RE: re.Pattern[str] = re.compile(r"/explore/([^/?#]+)")
# Group 1 is the path segment right after "/user/profile/", cut at "/", "?" or "#".
_USER_ID_RE: re.Pattern[str] = re.compile(r"/user/profile/([^/?#]+)")
|
|
|
|
def clean_cell(value: Any) -> Any:
    """Normalise a single cell value.

    A string is returned with surrounding whitespace removed, or as ``None``
    when nothing is left after stripping. Any other value, including
    ``None``, is passed through untouched.
    """
    if not isinstance(value, str):
        return value
    stripped = value.strip()
    return stripped or None
|
|
|
|
def clean_raw_row(raw: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *raw* with tidy headers and cleaned cell values.

    Each header is converted with ``str`` and stripped; entries whose header
    is ``None`` or empty after stripping are dropped. Every remaining value
    goes through ``clean_cell``. When two headers collapse to the same
    stripped text, the later value wins.
    """
    labelled = (
        ("" if header is None else str(header).strip(), cell)
        for header, cell in raw.items()
    )
    return {label: clean_cell(cell) for label, cell in labelled if label}
|
|
|
|
def _pick_by_headers(raw: dict[str, Any], candidates: set[str]) -> Any:
    """Return the first non-``None`` value stored under a recognised header.

    Headers are compared after ``str()``, ``strip()`` and ``lower()``, so
    *candidates* must hold lower-case names. Entries are visited in the
    row's own order, not in any priority order among the candidates.
    Returns ``None`` when no header matches or every match is ``None``.
    """
    hits = (
        cell
        for header, cell in raw.items()
        if str(header).strip().lower() in candidates and cell is not None
    )
    return next(hits, None)
|
|
|
|
def _find_first_url(raw: dict[str, Any]) -> str | None:
    """Scan the row's string values in order and return the first URL found.

    Non-string values are skipped. The returned text is exactly what
    ``_URL_RE`` matched; ``None`` means no string value contained a URL.
    """
    texts = (cell for cell in raw.values() if isinstance(cell, str))
    matches = (_URL_RE.search(text) for text in texts)
    return next((hit.group(0) for hit in matches if hit), None)
|
|
|
|
def _extract_note_id(url: str) -> str | None:
    """Return the note id captured by ``_NOTE_ID_RE`` in *url*, or ``None``."""
    found = _NOTE_ID_RE.search(url)
    if found is None:
        return None
    return found.group(1)
|
|
|
|
def _extract_user_id(url: str) -> str | None:
    """Return the user id captured by ``_USER_ID_RE`` in *url*, or ``None``."""
    found = _USER_ID_RE.search(url)
    if found is None:
        return None
    return found.group(1)
|
|
|
|
# Multipliers for the magnitude suffixes accepted after a number
# ("1.2万", "3w", "5k"). Keys are lower-case; lookups lower-case the suffix.
_COUNT_UNITS: dict[str, int] = {
    "k": 1_000,
    "千": 1_000,
    "w": 10_000,
    "万": 10_000,
    "亿": 100_000_000,
}

# First number in a string plus an optional unit right after it. The negative
# look-ahead stops the leading letter of an ordinary word ("3weeks", "5 km")
# from being read as a unit.
_COUNT_RE = re.compile(r"(-?\d+(?:\.\d+)?)(?:\s*([kw千万亿])(?![a-z]))?", re.IGNORECASE)


def _scale_count(number: str, multiplier: int) -> int | None:
    """Return ``float(number) * multiplier`` truncated toward zero, or ``None``."""
    try:
        scaled = float(number) * multiplier
        if multiplier != 1:
            # Binary floats can land a hair below the intended integer
            # (1.14 * 10000 == 11399.999999999998); snap to 6 decimals so the
            # truncation below does not lose a whole unit.
            scaled = round(scaled, 6)
        return int(scaled)
    except (ValueError, OverflowError):
        # ValueError: text float() rejects, or NaN; OverflowError: infinity.
        return None


def _parse_int(value: Any) -> int | None:
    """Best-effort conversion of a spreadsheet cell to an integer count.

    Args:
        value: ``None``, a bool, an int, a float or a string. Any other type
            yields ``None``.

    Returns:
        * bools and ints as ``int``;
        * floats truncated toward zero, or ``None`` for NaN / infinity;
        * for strings: ASCII and full-width commas are removed, then the text
          is read as a number with an optional magnitude suffix
          (``k``/``千`` = 1 000, ``w``/``万`` = 10 000, ``亿`` = 100 000 000,
          letters case-insensitive, a trailing ``+`` ignored). If the whole
          string is not such a number, the first number found anywhere in it
          is used, honouring a unit that directly follows it. Fractions are
          truncated toward zero;
        * ``None`` when nothing numeric can be extracted.

    Examples:
        >>> _parse_int("1,234")
        1234
        >>> _parse_int("1.2万")
        12000
        >>> _parse_int("10W+")
        100000
        >>> _parse_int("点赞 35")
        35
    """
    if value is None:
        return None
    if isinstance(value, bool):
        return int(value)
    if isinstance(value, int):
        return value
    if isinstance(value, float):
        try:
            return int(value)
        except (ValueError, OverflowError):
            # NaN / infinity, e.g. an empty numeric cell read as NaN.
            return None
    if not isinstance(value, str):
        return None

    # Drop thousands separators, both ASCII "," and full-width U+FF0C.
    text = value.strip().replace(",", "").replace("\uff0c", "")
    if not text:
        return None

    # Fast path: the whole cell is "<number><unit>", optionally followed by
    # "+" as in "10万+". float() is used here so everything it accepts
    # (leading sign, exponent, inner spaces before the unit) keeps working.
    body = text.rstrip("+").rstrip()
    multiplier = _COUNT_UNITS.get(body[-1:].lower())
    if multiplier is not None:
        exact = _scale_count(body[:-1], multiplier)
        if exact is not None:
            return exact

    # Fallback: first number embedded anywhere in the text.
    found = _COUNT_RE.search(text)
    if found is None:
        return None
    unit = found.group(2)
    return _scale_count(found.group(1), _COUNT_UNITS[unit.lower()] if unit else 1)
|
|
|
|
# strptime layouts tried, in order, when ``datetime.fromisoformat`` rejects a string.
_TIME_FORMATS: tuple[str, ...] = (
    "%Y-%m-%d %H:%M:%S",
    "%Y/%m/%d %H:%M:%S",
    "%Y-%m-%d %H:%M",
    "%Y/%m/%d %H:%M",
    "%Y-%m-%d",
    "%Y/%m/%d",
    "%Y.%m.%d %H:%M:%S",
    "%Y.%m.%d %H:%M",
    "%Y.%m.%d",
)


def _parse_time_iso(value: Any) -> str | None:
    """Convert a cell holding a point in time to an ISO-8601 string.

    Args:
        value: A ``datetime``, a Unix timestamp (int/float; values above
            10_000_000_000 are taken to be milliseconds), or a date/time
            string. Bools and other types are not treated as times.

    Returns:
        ``datetime.isoformat()`` text, or ``None`` when the value is empty or
        cannot be interpreted. Numeric timestamps are rendered in UTC (with a
        ``+00:00`` offset). ``datetime`` objects and strings keep whatever
        timezone information they carry, so strings without an offset yield
        a naive timestamp.
    """
    if value is None:
        return None
    if isinstance(value, datetime):
        return value.isoformat()

    if isinstance(value, (int, float)) and not isinstance(value, bool):
        try:
            # float() is inside the try: an int too large for a float raises
            # OverflowError just like an out-of-range timestamp does.
            seconds = float(value)
            if seconds > 10_000_000_000:
                seconds /= 1000.0
            return datetime.fromtimestamp(seconds, tz=timezone.utc).isoformat()
        except (OverflowError, OSError, ValueError):
            return None

    if not isinstance(value, str):
        return None
    text = value.strip()
    if not text:
        return None

    # ``fromisoformat`` only accepts a trailing "Z" from Python 3.11 on; retry
    # with an explicit UTC offset so older interpreters parse it as well.
    iso_candidates = [text]
    if text[-1] in "Zz":
        iso_candidates.append(text[:-1] + "+00:00")
    for candidate in iso_candidates:
        try:
            return datetime.fromisoformat(candidate).isoformat()
        except ValueError:
            continue

    for layout in _TIME_FORMATS:
        try:
            return datetime.strptime(text, layout).isoformat()
        except ValueError:
            continue
    return None
|
|
|
|
def compute_row_hash(raw: dict[str, Any]) -> str:
    """Return the SHA-256 hex digest of a canonical JSON dump of *raw*.

    The row is serialised with sorted keys and non-ASCII characters kept
    verbatim; values ``json`` cannot encode are converted with ``str``.
    The resulting text is UTF-8 encoded before hashing.
    """
    canonical = json.dumps(raw, sort_keys=True, ensure_ascii=False, default=str)
    digest = sha256()
    digest.update(canonical.encode("utf-8"))
    return digest.hexdigest()
|
|
|
|
# Header aliases recognised for each logical column. ``_pick_by_headers``
# lower-cases and strips a header before the lookup, so every alias here is
# written in lower case. Built once at import time instead of on every row.
_URL_HEADERS: set[str] = {
    "url",
    "link",
    "note_url",
    "user_url",
    "note link",
    "链接",
    "笔记url",
    "笔记链接",
    "笔记地址",
    "作品链接",
    "作品url",
    "作品地址",
    "主页链接",
    "用户主页",
    "用户url",
}
_QUERY_HEADERS: set[str] = {"query", "keyword", "关键词", "搜索词"}
_TITLE_HEADERS: set[str] = {"title", "标题", "笔记标题", "作品标题"}
_NICKNAME_HEADERS: set[str] = {"nickname", "author", "作者", "昵称", "作者昵称", "博主昵称", "博主"}
_PUBLISH_TIME_HEADERS: set[str] = {
    "publish_time",
    "published_at",
    "发布时间",
    "发布/发布时间",
    "发布",
    "发布时间(北京时间)",
    # Same header typed with full-width parentheses (U+FF08 / U+FF09); written
    # as escapes so the two spellings cannot be collapsed into one again.
    "发布时间\uff08北京时间\uff09",
    "上传时间",
}
_CONTENT_TYPE_HEADERS: set[str] = {"content_type", "内容类型", "笔记类型", "内容形式"}

# (output key, header aliases) for every integer metric, in output order.
_COUNT_FIELDS: tuple[tuple[str, set[str]], ...] = (
    ("like_count", {"like", "likes", "点赞", "点赞数", "赞", "赞数", "获赞", "喜欢"}),
    ("comment_count", {"comment", "comments", "评论", "评论数"}),
    ("collect_count", {"collect", "collects", "收藏", "收藏数"}),
    ("share_count", {"share", "shares", "分享", "分享数", "转发", "转发数"}),
    ("exposure", {"exposure", "曝光", "曝光量"}),
    ("read", {"read", "阅读", "阅读量"}),
    ("view", {"view", "views", "浏览", "浏览量"}),
    ("interact", {"interact", "interaction", "互动", "互动量"}),
    ("follow", {"follow", "转粉", "涨粉"}),
)


def _pick_text(raw: dict[str, Any], headers: set[str]) -> str | None:
    """Return the stripped string stored under one of *headers*, else ``None``.

    A non-string value under a matching header counts as missing.
    """
    value = _pick_by_headers(raw, headers)
    return value.strip() if isinstance(value, str) else None


def _identify_target(url: str | None, query: str | None) -> tuple[dict[str, Any], str | None]:
    """Classify what a row points at and derive its de-duplication key.

    Precedence: a xiaohongshu.com URL (note id, then user id, then the bare
    URL), then a search query, then any other URL. Returns ``({}, None)``
    when neither a URL nor a query is available.
    """
    if url and "xiaohongshu.com" in url:
        note_id = _extract_note_id(url)
        if note_id:
            return {"kind": "note", "note_url": url, "note_id": note_id}, f"note:{note_id}"
        user_id = _extract_user_id(url)
        if user_id:
            return {"kind": "user", "user_url": url, "user_id": user_id}, f"user:{user_id}"
        return {"kind": "url", "url": url}, f"url:{url}"
    if query:
        return {"kind": "search", "query": query}, f"search:{query}"
    if url:
        return {"kind": "url", "url": url}, f"url:{url}"
    return {}, None


def map_row(raw_row: dict[str, Any]) -> tuple[dict[str, Any] | None, str | None, list[str]]:
    """Map one raw spreadsheet row onto the normalised record layout.

    The row is first cleaned with ``clean_raw_row``; columns are then located
    by header alias (case-insensitive). When no URL column holds a string,
    the first URL found in any string cell is used instead.

    Args:
        raw_row: Mapping of column header to cell value, as read from the
            source sheet.

    Returns:
        A ``(normalized, dedup_key, issues)`` tuple:

        * ``normalized`` - dict with ``kind`` plus the identifying fields for
          that kind, followed by whichever of ``title``, ``nickname`` /
          ``author``, ``publish_time``, the integer metrics and
          ``content_type`` could be extracted; ``None`` when nothing at all
          was recognised.
        * ``dedup_key`` - ``"note:<id>"``, ``"user:<id>"``, ``"url:<url>"`` or
          ``"search:<query>"``; ``None`` when the row has no URL or query.
        * ``issues`` - human-readable problems found while mapping.
    """
    issues: list[str] = []
    raw = clean_raw_row(raw_row)

    url = _pick_text(raw, _URL_HEADERS) or _find_first_url(raw)
    query = _pick_text(raw, _QUERY_HEADERS)

    normalized, dedup_key = _identify_target(url, query)
    if not normalized:
        issues.append("未识别到 url 或 query")

    title = _pick_text(raw, _TITLE_HEADERS)
    if title:
        normalized["title"] = title

    nickname = _pick_text(raw, _NICKNAME_HEADERS)
    if nickname:
        # The same value is exposed under both keys.
        normalized["nickname"] = nickname
        normalized["author"] = nickname

    publish_time_iso = _parse_time_iso(_pick_by_headers(raw, _PUBLISH_TIME_HEADERS))
    if publish_time_iso:
        normalized["publish_time"] = publish_time_iso

    for field, headers in _COUNT_FIELDS:
        count = _parse_int(_pick_by_headers(raw, headers))
        # 0 is a legitimate count, so only a missing value is skipped.
        if count is not None:
            normalized[field] = count

    content_type = _pick_text(raw, _CONTENT_TYPE_HEADERS)
    if content_type:
        normalized["content_type"] = content_type

    if not normalized:
        return None, None, issues
    return normalized, dedup_key, issues
|
|
|
|