XHS / xhs_utils /data_util.py
Trae Bot
Upload Spider_XHS project
c481f8a
import json
import os
import re
import time
import openpyxl
import requests
from loguru import logger
from retry import retry
def norm_str(str):
new_str = re.sub(r"|[\\/:*?\"<>| ]+", "", str).replace('\n', '').replace('\r', '')
return new_str
def norm_text(text):
ILLEGAL_CHARACTERS_RE = re.compile(r'[\000-\010]|[\013-\014]|[\016-\037]')
text = ILLEGAL_CHARACTERS_RE.sub(r'', text)
return text
def timestamp_to_str(timestamp):
time_local = time.localtime(timestamp / 1000)
dt = time.strftime("%Y-%m-%d %H:%M:%S", time_local)
return dt
def handle_user_info(data, user_id):
home_url = f'https://www.xiaohongshu.com/user/profile/{user_id}'
nickname = data['basic_info']['nickname']
avatar = data['basic_info']['imageb']
red_id = data['basic_info']['red_id']
gender = data['basic_info']['gender']
if gender == 0:
gender = '男'
elif gender == 1:
gender = '女'
else:
gender = '未知'
ip_location = data['basic_info']['ip_location']
desc = data['basic_info']['desc']
follows = data['interactions'][0]['count']
fans = data['interactions'][1]['count']
interaction = data['interactions'][2]['count']
tags_temp = data['tags']
tags = []
for tag in tags_temp:
try:
tags.append(tag['name'])
except:
pass
return {
'user_id': user_id,
'home_url': home_url,
'nickname': nickname,
'avatar': avatar,
'red_id': red_id,
'gender': gender,
'ip_location': ip_location,
'desc': desc,
'follows': follows,
'fans': fans,
'interaction': interaction,
'tags': tags,
}
def handle_note_info(data):
note_id = data['id']
note_url = data['url']
note_type = data['note_card']['type']
if note_type == 'normal':
note_type = '图集'
else:
note_type = '视频'
user_id = data['note_card']['user']['user_id']
home_url = f'https://www.xiaohongshu.com/user/profile/{user_id}'
nickname = data['note_card']['user']['nickname']
avatar = data['note_card']['user']['avatar']
title = data['note_card']['title']
if title.strip() == '':
title = f'无标题'
desc = data['note_card']['desc']
liked_count = data['note_card']['interact_info']['liked_count']
collected_count = data['note_card']['interact_info']['collected_count']
comment_count = data['note_card']['interact_info']['comment_count']
share_count = data['note_card']['interact_info']['share_count']
image_list_temp = data['note_card']['image_list']
image_list = []
for image in image_list_temp:
try:
image_list.append(image['info_list'][1]['url'])
# success, msg, img_url = XHS_Apis.get_note_no_water_img(image['info_list'][1]['url'])
# image_list.append(img_url)
except:
pass
if note_type == '视频':
video_cover = image_list[0] if image_list else None
video_addr = None
video_info = data.get('note_card', {}).get('video', {})
streams = video_info.get('media', {}).get('stream', {}).get('h264', [])
if streams:
video_addr = streams[0].get('master_url') or streams[0].get('url')
if not video_addr and 'consumer' in video_info:
origin_key = video_info['consumer'].get('origin_video_key')
if origin_key:
video_addr = f"https://sns-video-bd.xhscdn.com/{origin_key}"
else:
video_cover = None
video_addr = None
tags_temp = data['note_card']['tag_list']
tags = []
for tag in tags_temp:
try:
tags.append(tag['name'])
except:
pass
upload_time = timestamp_to_str(data['note_card']['time'])
if 'ip_location' in data['note_card']:
ip_location = data['note_card']['ip_location']
else:
ip_location = '未知'
return {
'note_id': note_id,
'note_url': note_url,
'note_type': note_type,
'user_id': user_id,
'home_url': home_url,
'nickname': nickname,
'avatar': avatar,
'title': title,
'desc': desc,
'liked_count': liked_count,
'collected_count': collected_count,
'comment_count': comment_count,
'share_count': share_count,
'video_cover': video_cover,
'video_addr': video_addr,
'image_list': image_list,
'tags': tags,
'upload_time': upload_time,
'ip_location': ip_location,
}
def handle_comment_info(data):
note_id = data['note_id']
note_url = data['note_url']
comment_id = data['id']
user_id = data['user_info']['user_id']
home_url = f'https://www.xiaohongshu.com/user/profile/{user_id}'
nickname = data['user_info']['nickname']
avatar = data['user_info']['image']
content = data['content']
show_tags = data['show_tags']
like_count = data['like_count']
upload_time = timestamp_to_str(data['create_time'])
try:
ip_location = data['ip_location']
except:
ip_location = '未知'
pictures = []
try:
pictures_temp = data['pictures']
for picture in pictures_temp:
try:
pictures.append(picture['info_list'][1]['url'])
# success, msg, img_url = XHS_Apis.get_note_no_water_img(picture['info_list'][1]['url'])
# pictures.append(img_url)
except:
pass
except:
pass
return {
'note_id': note_id,
'note_url': note_url,
'comment_id': comment_id,
'user_id': user_id,
'home_url': home_url,
'nickname': nickname,
'avatar': avatar,
'content': content,
'show_tags': show_tags,
'like_count': like_count,
'upload_time': upload_time,
'ip_location': ip_location,
'pictures': pictures,
}
def save_to_xlsx(datas, file_path, type='note'):
wb = openpyxl.Workbook()
ws = wb.active
if type == 'note':
headers = ['笔记id', '笔记url', '笔记类型', '用户id', '用户主页url', '昵称', '头像url', '标题', '描述', '点赞数量', '收藏数量', '评论数量', '分享数量', '视频封面url', '视频地址url', '图片地址url列表', '标签', '上传时间', 'ip归属地']
elif type == 'user':
headers = ['用户id', '用户主页url', '用户名', '头像url', '小红书号', '性别', 'ip地址', '介绍', '关注数量', '粉丝数量', '作品被赞和收藏数量', '标签']
else:
headers = ['笔记id', '笔记url', '评论id', '用户id', '用户主页url', '昵称', '头像url', '评论内容', '评论标签', '点赞数量', '上传时间', 'ip归属地', '图片地址url列表']
ws.append(headers)
for data in datas:
data = {k: norm_text(str(v)) for k, v in data.items()}
ws.append(list(data.values()))
wb.save(file_path)
logger.info(f'数据保存至 {file_path}')
def download_media(path, name, url, type):
if type == 'image':
content = requests.get(url).content
with open(path + '/' + name + '.jpg', mode="wb") as f:
f.write(content)
elif type == 'video':
res = requests.get(url, stream=True)
size = 0
chunk_size = 1024 * 1024
with open(path + '/' + name + '.mp4', mode="wb") as f:
for data in res.iter_content(chunk_size=chunk_size):
f.write(data)
size += len(data)
def save_user_detail(user, path):
with open(f'{path}/detail.txt', mode="w", encoding="utf-8") as f:
# 逐行输出到txt里
f.write(f"用户id: {user['user_id']}\n")
f.write(f"用户主页url: {user['home_url']}\n")
f.write(f"用户名: {user['nickname']}\n")
f.write(f"头像url: {user['avatar']}\n")
f.write(f"小红书号: {user['red_id']}\n")
f.write(f"性别: {user['gender']}\n")
f.write(f"ip地址: {user['ip_location']}\n")
f.write(f"介绍: {user['desc']}\n")
f.write(f"关注数量: {user['follows']}\n")
f.write(f"粉丝数量: {user['fans']}\n")
f.write(f"作品被赞和收藏数量: {user['interaction']}\n")
f.write(f"标签: {user['tags']}\n")
def save_note_detail(note, path):
with open(f'{path}/detail.txt', mode="w", encoding="utf-8") as f:
# 逐行输出到txt里
f.write(f"笔记id: {note['note_id']}\n")
f.write(f"笔记url: {note['note_url']}\n")
f.write(f"笔记类型: {note['note_type']}\n")
f.write(f"用户id: {note['user_id']}\n")
f.write(f"用户主页url: {note['home_url']}\n")
f.write(f"昵称: {note['nickname']}\n")
f.write(f"头像url: {note['avatar']}\n")
f.write(f"标题: {note['title']}\n")
f.write(f"描述: {note['desc']}\n")
f.write(f"点赞数量: {note['liked_count']}\n")
f.write(f"收藏数量: {note['collected_count']}\n")
f.write(f"评论数量: {note['comment_count']}\n")
f.write(f"分享数量: {note['share_count']}\n")
f.write(f"视频封面url: {note['video_cover']}\n")
f.write(f"视频地址url: {note['video_addr']}\n")
f.write(f"图片地址url列表: {note['image_list']}\n")
f.write(f"标签: {note['tags']}\n")
f.write(f"上传时间: {note['upload_time']}\n")
f.write(f"ip归属地: {note['ip_location']}\n")
@retry(tries=3, delay=1)
def download_note(note_info, path, save_choice):
note_id = note_info['note_id']
user_id = note_info['user_id']
title = note_info['title']
title = norm_str(title)[:40]
nickname = note_info['nickname']
nickname = norm_str(nickname)[:20]
if title.strip() == '':
title = f'无标题'
save_path = f'{path}/{nickname}_{user_id}/{title}_{note_id}'
check_and_create_path(save_path)
with open(f'{save_path}/info.json', mode='w', encoding='utf-8') as f:
f.write(json.dumps(note_info) + '\n')
note_type = note_info['note_type']
save_note_detail(note_info, save_path)
if note_type == '图集' and save_choice in ['media', 'media-image', 'all']:
for img_index, img_url in enumerate(note_info['image_list']):
download_media(save_path, f'image_{img_index}', img_url, 'image')
elif note_type == '视频' and save_choice in ['media', 'media-video', 'all']:
download_media(save_path, 'cover', note_info['video_cover'], 'image')
download_media(save_path, 'video', note_info['video_addr'], 'video')
return save_path
def check_and_create_path(path):
if not os.path.exists(path):
os.makedirs(path)