| import json |
| import os |
| import re |
| import time |
| import openpyxl |
| import requests |
| from loguru import logger |
| from retry import retry |
|
|
|
|
def norm_str(str):
    """Strip characters that are unsafe in file or directory names.

    Removes backslash, slash, colon, asterisk, question mark, double quote,
    angle brackets, pipe, space, line feed and carriage return.

    Args:
        str: Text to sanitise.  The parameter name shadows the builtin; it is
            kept unchanged so existing keyword callers continue to work.

    Returns:
        The input text with every listed character removed.
    """
    # The previous pattern started with an empty alternative ("|[...]+"),
    # which matches the empty string at every position.  On Python < 3.7 that
    # empty match always won and the character class was never applied, so
    # only the class itself is kept.  Line breaks are folded into the class
    # instead of being removed by two extra replace() passes.
    return re.sub(r'[\\/:*?"<>| \n\r]+', '', str)
|
|
def norm_text(text):
    """Remove ASCII control characters from ``text``.

    The characters 0x00-0x08, 0x0B-0x0C and 0x0E-0x1F are dropped; tab, line
    feed and carriage return are left in place.

    Args:
        text: String to clean.

    Returns:
        The cleaned string.
    """
    return re.sub(r'[\000-\010]|[\013-\014]|[\016-\037]', r'', text)
|
|
|
|
def timestamp_to_str(timestamp):
    """Format a millisecond timestamp as a local ``YYYY-MM-DD HH:MM:SS`` string.

    Args:
        timestamp: Number of milliseconds since the Unix epoch.

    Returns:
        The formatted time in the machine's local time zone.
    """
    seconds = timestamp / 1000
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(seconds))
|
|
def handle_user_info(data, user_id):
    """Flatten a raw user-profile payload into a single-level dict.

    Args:
        data: Mapping with a ``basic_info`` mapping, an ``interactions``
            sequence of at least three items (each with a ``count``) and a
            ``tags`` sequence.
        user_id: Identifier used to build the profile URL.

    Returns:
        A dict with the keys ``user_id``, ``home_url``, ``nickname``,
        ``avatar``, ``red_id``, ``gender``, ``ip_location``, ``desc``,
        ``follows``, ``fans``, ``interaction`` and ``tags``, in that order.

    Raises:
        KeyError: If a required field is missing from ``data``.
        IndexError: If ``interactions`` has fewer than three items.
    """
    basic_info = data['basic_info']
    interactions = data['interactions']

    # Gender code 0 maps to male, 1 to female; anything else is "unknown".
    gender = basic_info['gender']
    if gender == 0:
        gender = '男'
    elif gender == 1:
        gender = '女'
    else:
        gender = '未知'

    # Skip tag entries that carry no name instead of hiding every possible
    # error behind a bare ``except``.
    tags = [
        tag['name']
        for tag in data['tags']
        if isinstance(tag, dict) and 'name' in tag
    ]

    return {
        'user_id': user_id,
        'home_url': f'https://www.xiaohongshu.com/user/profile/{user_id}',
        'nickname': basic_info['nickname'],
        'avatar': basic_info['imageb'],
        'red_id': basic_info['red_id'],
        'gender': gender,
        'ip_location': basic_info['ip_location'],
        'desc': basic_info['desc'],
        'follows': interactions[0]['count'],
        'fans': interactions[1]['count'],
        'interaction': interactions[2]['count'],
        'tags': tags,
    }
|
|
def handle_note_info(data):
    """Flatten a raw note payload into a single-level dict.

    Args:
        data: Mapping with ``id``, ``url`` and a ``note_card`` mapping that
            holds the note's type, author, title, description, interaction
            counters, image list, tag list, time and optional video and
            ``ip_location`` entries.

    Returns:
        A dict with the keys ``note_id``, ``note_url``, ``note_type``,
        ``user_id``, ``home_url``, ``nickname``, ``avatar``, ``title``,
        ``desc``, ``liked_count``, ``collected_count``, ``comment_count``,
        ``share_count``, ``video_cover``, ``video_addr``, ``image_list``,
        ``tags``, ``upload_time`` and ``ip_location``, in that order.
        ``video_cover`` and ``video_addr`` are ``None`` for image notes and
        may be ``None`` for video notes when no URL can be found.

    Raises:
        KeyError: If a required field is missing from ``data``.
    """
    note_id = data['id']
    note_url = data['url']
    note_card = data['note_card']
    author = note_card['user']
    interact_info = note_card['interact_info']

    # Every type other than 'normal' is treated as a video note.
    note_type = '图集' if note_card['type'] == 'normal' else '视频'
    user_id = author['user_id']

    title = note_card['title']
    if title.strip() == '':
        title = '无标题'

    # Entries without a second info_list item are skipped; only the lookup
    # errors that can actually occur here are tolerated.
    image_list = []
    for image in note_card['image_list']:
        try:
            image_list.append(image['info_list'][1]['url'])
        except (KeyError, IndexError, TypeError):
            continue

    video_cover = None
    video_addr = None
    if note_type == '视频':
        video_cover = image_list[0] if image_list else None
        # ``or {}`` also covers keys that are present but null, which the
        # plain ``.get(key, {})`` form would turn into an AttributeError.
        video_info = note_card.get('video') or {}
        media = video_info.get('media') or {}
        stream = media.get('stream') or {}
        h264_streams = stream.get('h264') or []
        if h264_streams:
            first_stream = h264_streams[0]
            video_addr = first_stream.get('master_url') or first_stream.get('url')
        if not video_addr:
            # Fall back to building the URL from the original video key.
            consumer = video_info.get('consumer') or {}
            origin_key = consumer.get('origin_video_key')
            if origin_key:
                video_addr = f"https://sns-video-bd.xhscdn.com/{origin_key}"

    tags = [
        tag['name']
        for tag in note_card['tag_list']
        if isinstance(tag, dict) and 'name' in tag
    ]

    return {
        'note_id': note_id,
        'note_url': note_url,
        'note_type': note_type,
        'user_id': user_id,
        'home_url': f'https://www.xiaohongshu.com/user/profile/{user_id}',
        'nickname': author['nickname'],
        'avatar': author['avatar'],
        'title': title,
        'desc': note_card['desc'],
        'liked_count': interact_info['liked_count'],
        'collected_count': interact_info['collected_count'],
        'comment_count': interact_info['comment_count'],
        'share_count': interact_info['share_count'],
        'video_cover': video_cover,
        'video_addr': video_addr,
        'image_list': image_list,
        'tags': tags,
        'upload_time': timestamp_to_str(note_card['time']),
        'ip_location': note_card.get('ip_location', '未知'),
    }
|
|
def handle_comment_info(data):
    """Flatten a raw comment payload into a single-level dict.

    Args:
        data: Mapping with ``note_id``, ``note_url``, ``id``, ``user_info``,
            ``content``, ``show_tags``, ``like_count`` and ``create_time``
            entries, plus optional ``ip_location`` and ``pictures``.

    Returns:
        A dict with the keys ``note_id``, ``note_url``, ``comment_id``,
        ``user_id``, ``home_url``, ``nickname``, ``avatar``, ``content``,
        ``show_tags``, ``like_count``, ``upload_time``, ``ip_location`` and
        ``pictures``, in that order.

    Raises:
        KeyError: If a required field is missing from ``data``.
    """
    user_info = data['user_info']
    user_id = user_info['user_id']

    # Pictures are optional; entries without a second info_list item are
    # skipped.  Only the expected lookup errors are tolerated rather than
    # swallowing everything with a bare ``except``.
    pictures = []
    for picture in data.get('pictures') or []:
        try:
            pictures.append(picture['info_list'][1]['url'])
        except (KeyError, IndexError, TypeError):
            continue

    return {
        'note_id': data['note_id'],
        'note_url': data['note_url'],
        'comment_id': data['id'],
        'user_id': user_id,
        'home_url': f'https://www.xiaohongshu.com/user/profile/{user_id}',
        'nickname': user_info['nickname'],
        'avatar': user_info['image'],
        'content': data['content'],
        'show_tags': data['show_tags'],
        'like_count': data['like_count'],
        'upload_time': timestamp_to_str(data['create_time']),
        'ip_location': data.get('ip_location', '未知'),
        'pictures': pictures,
    }
def save_to_xlsx(datas, file_path, type='note'):
    """Write a list of record dicts to a new xlsx workbook.

    Args:
        datas: Iterable of dicts; each dict's values become one row, in the
            dict's own order.
        file_path: Destination path of the workbook.
        type: ``'note'`` or ``'user'`` selects the matching header row; any
            other value selects the comment header row.
    """
    note_headers = ['笔记id', '笔记url', '笔记类型', '用户id', '用户主页url', '昵称', '头像url', '标题', '描述', '点赞数量', '收藏数量', '评论数量', '分享数量', '视频封面url', '视频地址url', '图片地址url列表', '标签', '上传时间', 'ip归属地']
    user_headers = ['用户id', '用户主页url', '用户名', '头像url', '小红书号', '性别', 'ip地址', '介绍', '关注数量', '粉丝数量', '作品被赞和收藏数量', '标签']
    comment_headers = ['笔记id', '笔记url', '评论id', '用户id', '用户主页url', '昵称', '头像url', '评论内容', '评论标签', '点赞数量', '上传时间', 'ip归属地', '图片地址url列表']

    if type == 'note':
        header_row = note_headers
    else:
        header_row = user_headers if type == 'user' else comment_headers

    workbook = openpyxl.Workbook()
    sheet = workbook.active
    sheet.append(header_row)
    for record in datas:
        # Every value is stringified and cleaned with norm_text before it is
        # appended to the sheet.
        sheet.append([norm_text(str(value)) for _, value in record.items()])
    workbook.save(file_path)
    logger.info(f'数据保存至 {file_path}')
|
|
def download_media(path, name, url, type, timeout=30):
    """Download one image or video into the directory ``path``.

    Args:
        path: Existing directory that receives the file.
        name: File name without extension.
        url: Address of the media to fetch.
        type: ``'image'`` saves ``<name>.jpg``; ``'video'`` streams the body
            into ``<name>.mp4``.  Any other value does nothing.
        timeout: Seconds passed to ``requests.get`` so a stalled server
            cannot block the caller indefinitely.

    Raises:
        requests.RequestException: On connection failure, timeout, or an
            HTTP error status.  Failing here keeps an error page from being
            saved as if it were media.
    """
    if type == 'image':
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        with open(f'{path}/{name}.jpg', mode="wb") as f:
            f.write(response.content)
    elif type == 'video':
        chunk_size = 1024 * 1024
        # The context manager releases the streamed connection even when
        # writing fails part-way through.
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with open(f'{path}/{name}.mp4', mode="wb") as f:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    f.write(chunk)
|
|
def save_user_detail(user, path):
    """Write the user's fields to ``<path>/detail.txt``, one per line.

    Args:
        user: Dict holding the keys listed in ``labelled_keys`` below.
        path: Existing directory that receives ``detail.txt``.
    """
    # (label written to the file, key looked up in ``user``), in output order.
    labelled_keys = (
        ('用户id', 'user_id'),
        ('用户主页url', 'home_url'),
        ('用户名', 'nickname'),
        ('头像url', 'avatar'),
        ('小红书号', 'red_id'),
        ('性别', 'gender'),
        ('ip地址', 'ip_location'),
        ('介绍', 'desc'),
        ('关注数量', 'follows'),
        ('粉丝数量', 'fans'),
        ('作品被赞和收藏数量', 'interaction'),
        ('标签', 'tags'),
    )
    with open(f'{path}/detail.txt', mode="w", encoding="utf-8") as out:
        for label, key in labelled_keys:
            out.write(f"{label}: {user[key]}\n")
|
|
def save_note_detail(note, path):
    """Write the note's fields to ``<path>/detail.txt``, one per line.

    Args:
        note: Dict holding the keys listed in ``labelled_keys`` below.
        path: Existing directory that receives ``detail.txt``.
    """
    # (label written to the file, key looked up in ``note``), in output order.
    labelled_keys = (
        ('笔记id', 'note_id'),
        ('笔记url', 'note_url'),
        ('笔记类型', 'note_type'),
        ('用户id', 'user_id'),
        ('用户主页url', 'home_url'),
        ('昵称', 'nickname'),
        ('头像url', 'avatar'),
        ('标题', 'title'),
        ('描述', 'desc'),
        ('点赞数量', 'liked_count'),
        ('收藏数量', 'collected_count'),
        ('评论数量', 'comment_count'),
        ('分享数量', 'share_count'),
        ('视频封面url', 'video_cover'),
        ('视频地址url', 'video_addr'),
        ('图片地址url列表', 'image_list'),
        ('标签', 'tags'),
        ('上传时间', 'upload_time'),
        ('ip归属地', 'ip_location'),
    )
    with open(f'{path}/detail.txt', mode="w", encoding="utf-8") as out:
        for label, key in labelled_keys:
            out.write(f"{label}: {note[key]}\n")
|
|
|
|
|
|
@retry(tries=3, delay=1)
def download_note(note_info, path, save_choice):
    """Save one note's metadata and, optionally, its media to disk.

    Creates ``<path>/<nickname>_<user_id>/<title>_<note_id>``, writes
    ``info.json`` and ``detail.txt`` into it, then downloads media according
    to ``save_choice``.

    Args:
        note_info: Flattened note dict; reads ``note_id``, ``user_id``,
            ``title``, ``nickname``, ``note_type``, ``image_list``,
            ``video_cover`` and ``video_addr``.
        path: Root directory for all downloads.
        save_choice: ``'media'`` and ``'all'`` download everything;
            ``'media-image'`` only image notes; ``'media-video'`` only video
            notes.  Any other value saves metadata only.

    Returns:
        The directory the note was saved into.
    """
    note_id = note_info['note_id']
    user_id = note_info['user_id']
    # Sanitise and truncate the parts that end up in directory names.
    title = norm_str(note_info['title'])[:40]
    nickname = norm_str(note_info['nickname'])[:20]
    if title.strip() == '':
        title = '无标题'
    save_path = f'{path}/{nickname}_{user_id}/{title}_{note_id}'
    check_and_create_path(save_path)
    with open(f'{save_path}/info.json', mode='w', encoding='utf-8') as f:
        f.write(json.dumps(note_info) + '\n')
    note_type = note_info['note_type']
    save_note_detail(note_info, save_path)
    if note_type == '图集' and save_choice in ['media', 'media-image', 'all']:
        for img_index, img_url in enumerate(note_info['image_list']):
            download_media(save_path, f'image_{img_index}', img_url, 'image')
    elif note_type == '视频' and save_choice in ['media', 'media-video', 'all']:
        # A video note can carry no cover or no address (None).  Requesting
        # a None URL would only fail, be retried, and abort the whole note,
        # so a missing URL is skipped with a warning instead.
        video_media = (
            ('cover', note_info['video_cover'], 'image'),
            ('video', note_info['video_addr'], 'video'),
        )
        for media_name, media_url, media_kind in video_media:
            if not media_url:
                logger.warning(f'笔记 {note_id} 缺少 {media_name} 地址,跳过下载')
                continue
            download_media(save_path, media_name, media_url, media_kind)
    return save_path
|
|
|
|
def check_and_create_path(path):
    """Ensure the directory ``path`` exists, creating parents as needed.

    Args:
        path: Directory to create.

    Raises:
        FileExistsError: If ``path`` already exists but is not a directory.
    """
    # exist_ok avoids the race between a separate existence check and the
    # creation when another thread or process creates the directory first.
    os.makedirs(path, exist_ok=True)
|
|