Scraping Xiaohongshu User Profile Posts with Python, with Full Code (2024.11.28)
Disclaimer: this project is for learning and reference only. Do not use it for any commercial or illegal purposes.
As Xiaohongshu has become a popular platform for sharing and consuming content, demand for scraping its data has grown with it. This article shows how to scrape Xiaohongshu user data with a Python script: collecting each user's post information, batch-downloading post images into folders grouped by user ID and note ID, and rotating across multiple accounts to work around rate limits.
1. Project Background
This project scrapes Xiaohongshu user data and supports the following:
- Batch-scrape post information for multiple users.
- Parse the scraped posts, extracting title, body text, like count, comment count, collect count, share count, and topics.
- Automatically download the images in each post, storing them in folders grouped by user ID and note ID.
- Rotate across multiple accounts to work around rate limits.
2. Environment Setup
Before starting, make sure the following Python libraries are installed:
pip install requests pandas PyExecJS loguru openpyxl
3. Core Features
3.1 Scraping User Data
Using the requests module, we send a GET request to Xiaohongshu's user-posts endpoint and extract the data from the response. To improve the success rate, the script rotates through several sets of cookies.
url = "https://edith.xiaohongshu.com/api/sns/web/v1/user_posted"
params = {
"num": "30",
"cursor": "",
"user_id": user_id,
"image_formats": "jpg,webp,avif",
"xsec_token": "",
"xsec_source": "pc_note"
}
response = requests.get(url, headers=headers, cookies=current_cookies, params=params)
if response.status_code == 200 and response.json().get('success') == True:
data_page = response.json()
notes = data_page.get('data', {}).get('notes', [])
has_more = data_page.get('data', {}).get('has_more', False)
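Pagination works through the cursor field: each response carries data.cursor and data.has_more, and the cursor is passed back on the next request. A minimal sketch of that loop, assuming url, params, headers, and current_cookies are prepared as above (note that the x-s/x-t signature, covered in section 4, must be regenerated whenever the query string changes):

cursor = ""
has_more = True
while has_more:
    params["cursor"] = cursor
    # re-sign headers here, since the encoded query string has changed
    response = requests.get(url, headers=headers, cookies=current_cookies, params=params)
    page = response.json()
    if response.status_code != 200 or page.get('success') is not True:
        break  # request refused: rotate to the next cookies before retrying
    has_more = page.get('data', {}).get('has_more', False)
    cursor = page.get('data', {}).get('cursor', "")
    # ... process page.get('data', {}).get('notes', []) here ...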
3.2 Parsing and Storing the Data
The scraped JSON contains the user's post information. The parse_data function extracts the useful fields, which are then stored as a CSV file.
def parse_data(data):
    items = data.get('data', {}).get('items', [])
    parsed_info = []
    for item in items:
        note = item.get('note_card', {})
        interact_info = note.get('interact_info', {})
        parsed_info.append({
            'title': note.get('title', ''),
            'text': note.get('desc', '').strip(),
            'likes': interact_info.get('liked_count', 0),
            'comments': interact_info.get('comment_count', 0),
            'collects': interact_info.get('collected_count', 0),
            'shares': interact_info.get('share_count', 0),
            # topics are embedded in the description as "#xxx[话题]#" markers
            'topics': [word.strip('#').replace('[话题]', '').strip()
                       for word in note.get('desc', '').split() if '[话题]' in word]
        })
    return parsed_info
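Writing the parsed rows out then takes a single pandas call. A minimal sketch, appending to result.csv (the same output file the full code in section 9 uses):

import pandas as pd

rows = parse_data(note_data)  # note_data: JSON returned by the note-detail endpoint
pd.DataFrame(rows).to_csv('result.csv', mode='a', index=False,
                          header=False, encoding='utf-8-sig')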
3.3 Downloading Images
The image information for each post can be extracted from the data the endpoint returns. The script downloads the images in batch and stores them in folders grouped by user ID and note ID.
def download_img(data, img_path, user_id, note_id):
    image_list = data["data"]["items"][0]["note_card"]["image_list"]
    image_urls = [img["url_default"] for img in image_list]
    output_dir = f"./{img_path}/{user_id}/{note_id}"
    os.makedirs(output_dir, exist_ok=True)
    for idx, url in enumerate(image_urls):
        try:
            response = requests.get(url)
            if response.status_code == 200:
                with open(os.path.join(output_dir, f"image_{idx + 1}.jpg"), "wb") as f:
                    f.write(response.content)
        except Exception as e:
            print(f"Image download failed: {e}")
4. Working Around Rate Limits
To cope with Xiaohongshu's rate limiting, the script uses the following strategies:
- Cookie rotation: cookies_list holds several sets of login credentials, and the script switches identities between them.
- Dynamic signing: a JavaScript script generates the signature required for each API call, so requests pass validation.
def update_headers(api, data, current_cookies):
    # Compile the signing script (1.js) and call getXs to generate the signature
    with open('1.js', 'r', encoding='utf-8') as f:
        js_script = f.read()
    context = execjs.compile(js_script)
    sign = context.call('getXs', api, data, current_cookies['a1'])
    return sign
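The dict returned by getXs carries the X-s and X-t values, which must be copied into the request headers before every call, exactly as the full code in section 9 does:

import urllib.parse

params_encoded = urllib.parse.urlencode(params)
sign = update_headers(f'/api/sns/web/v1/user_posted?{params_encoded}', None, current_cookies)
headers = headers_init.copy()
headers['x-s'] = sign['X-s']       # request signature
headers['x-t'] = str(sign['X-t'])  # signing timestamp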
5. Main Program
The main program reads the list of user IDs, calls the functions above for each one in turn, and saves the scraped data to a CSV file.
def main(file_path, cookies_list, headers_init, img_path, output_file_path):
    data = pd.read_excel(file_path)
    id_data = data['用户id']
    for user_id in id_data:
        logger.info(f'Scraping posts of user {user_id}')
        # scrape this user's data
        # append the rows to the CSV file
6. Results
7. Notes
- Legal use: the scraped data is for study and research only; do not use it for illegal purposes.
- Data sensitivity: handle the scraped data carefully and avoid leaking user privacy.
- Avoiding bans: throttle your request rate so you don't put pressure on the target server (see the sketch below).
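The simplest throttle is a randomized pause between consecutive requests. A minimal sketch; the 1-3 second range is an assumption, tune it to your needs:

import random
import time

time.sleep(random.uniform(1, 3))  # wait 1-3 seconds between consecutive requests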
8. Conclusion
This article showed how to scrape Xiaohongshu user data with Python and batch-download the post images. I hope it helps; questions and suggestions are welcome in the comments.
For any questions, contact: zx_luckfe
9. Full Code:
import os
import time
import urllib.parse

import execjs
import pandas as pd
import requests
from loguru import logger

from fetch_note_detail import fetch_xiaohongshu_data  # companion module (not shown) that fetches a single note's detail
def convert_to_int(value):
    # Interaction counts may come back as strings like "1.2万" (万 = 10,000)
    if isinstance(value, str) and '万' in value:
        return int(float(value.replace('万', '')) * 10000)
    return int(value)
def parse_data(data):
    items = data.get('data', {}).get('items', [])
    parsed_info = []
    for item in items:
        note = item.get('note_card', {})
        title = note.get('title', '')
        desc = note.get('desc', '')
        # topics are embedded in the description as "#xxx[话题]#" markers
        topics = [word.strip('#').replace('[话题]', '').strip()
                  for word in desc.split() if '[话题]' in word]
        # strip the topic markers out of the body text
        desc_cleaned = ' '.join(word for word in desc.split() if '[话题]' not in word).strip()
        interact_info = note.get('interact_info', {})
        parsed_info.append({
            'title': title,
            'text': desc_cleaned,
            'likes': interact_info.get('liked_count', 0),
            'comments': interact_info.get('comment_count', 0),
            'collects': interact_info.get('collected_count', 0),
            'shares': interact_info.get('share_count', 0),
            'topics': topics
        })
    return parsed_info
def download_img(data, img_path, user_id, note_id):
    image_list = data["data"]["items"][0]["note_card"]["image_list"]
    image_urls = [img["url_default"] for img in image_list]
    output_dir = f"./{img_path}/{user_id}/{note_id}"
    os.makedirs(output_dir, exist_ok=True)
    for idx, url in enumerate(image_urls):
        image_path = os.path.join(output_dir, f"image_{idx + 1}.jpg")
        try:
            response = requests.get(url)
            if response.status_code == 200:
                with open(image_path, "wb") as f:
                    f.write(response.content)
                print(f"Image saved: {image_path}")
            else:
                print(f"Download failed, status code: {response.status_code}")
        except Exception as e:
            print(f"Download error: {e}")
def update_headers(api, data, current_cookies):
    # Compile the signing script (1.js) and call getXs to generate the X-s / X-t signature
    with open('1.js', 'r', encoding='utf-8') as f:
        js_script = f.read()
    context = execjs.compile(js_script)
    sign = context.call('getXs', api, data, current_cookies['a1'])
    return sign
cookies_list = [
    # fill in your own cookies: each entry is a dict that must at least contain the 'a1' key
]
headers_init = {
    # fill in your own request headers (User-Agent, etc.)
}
def main(file_path, cookies_list, headers_init, img_path, output_file_path):
    data = pd.read_excel(file_path)
    id_data = data['用户id']
    # Write the CSV header once
    if not os.path.exists(output_file_path):
        with open(output_file_path, mode="w", encoding="utf-8-sig", newline="") as f:
            f.write("note_id,xsec_token,type,title,user_id,text,topics,likes,comments,collects,shares\n")
    url = "https://edith.xiaohongshu.com/api/sns/web/v1/user_posted"
    people_index = 0
    for user_id in id_data:
        people_index += 1
        has_more = True
        logger.info(f'Scraping posts of user #{people_index}: {user_id}')
        params = {
            "num": "30",
            "cursor": "",
            "user_id": user_id,
            "image_formats": "jpg,webp,avif",
            "xsec_token": "",
            "xsec_source": "pc_note"
        }
        k = 0
        current_cookie_index = 0
        # Stop when the pages run out, or when every cookie set has been exhausted
        while has_more and current_cookie_index < len(cookies_list):
            while current_cookie_index < len(cookies_list):
                current_cookies = cookies_list[current_cookie_index]
                # Sign the request for the current query string and cookies
                params_encoded = urllib.parse.urlencode(params)
                headers = headers_init.copy()
                sign_headers = update_headers(f'/api/sns/web/v1/user_posted?{params_encoded}', None, current_cookies)
                headers['x-s'] = sign_headers['X-s']
                headers['x-t'] = str(sign_headers['X-t'])
                response1 = requests.get(url, headers=headers, cookies=current_cookies, params=params)
                if response1.status_code == 200 and response1.json().get('success') is True:
                    data_page = response1.json()
                    notes = data_page.get('data', {}).get('notes', [])
                    has_more = data_page.get('data', {}).get('has_more', False)
                    for note in notes:
                        k += 1
                        logger.info(f'Scraping note {k} of user #{people_index}')
                        xsec_token = note.get('xsec_token')
                        note_id = note.get('note_id')
                        note_data, status_code_result, headers_result = fetch_xiaohongshu_data(note_id, xsec_token, current_cookies)
                        if (status_code_result == 200 and note_data.get('success') is False) or status_code_result == 461:
                            # Rate limit hit: switch to the next cookies and abandon the rest of this page
                            current_cookie_index += 1
                            print('Rate limit hit; switching to the next cookies and skipping the rest of this page')
                            print(note_data)
                            break
                        if status_code_result == 200 and note_data.get('success') is True:
                            download_img(note_data, img_path, user_id, note_id)
                            result = parse_data(note_data)
                            note_type = note.get('type', 'N/A')
                            text = result[0]['text'].replace("\n", "").strip()
                            likes = convert_to_int(result[0]['likes'])
                            comments = convert_to_int(result[0]['comments'])
                            collects = convert_to_int(result[0]['collects'])
                            shares = convert_to_int(result[0]['shares'])
                            topics = ", ".join(result[0]['topics']).replace("\n", "").strip()
                            display_title = note.get('display_title', 'N/A')
                            data_row = {
                                'note_id': note_id,
                                'xsec_token': xsec_token,
                                'type': note_type,
                                'title': display_title,
                                'user_id': user_id,
                                'text': text,
                                'topics': topics,
                                'likes': likes,
                                'comments': comments,
                                'collects': collects,
                                'shares': shares
                            }
                            df = pd.DataFrame([data_row])
                            df.to_csv(output_file_path, mode="a", index=False, header=False, encoding="utf-8-sig", quoting=1)
                    # Advance the pagination cursor for the next request
                    cursor = data_page.get('data', {}).get('cursor', "")
                    params['cursor'] = cursor
                    logger.info(f'Cursor for the next page: {cursor}')
                    if not has_more:
                        break
                else:
                    logger.info('------------------------------------')
                    logger.info('Request failed; switching to the next cookies')
                    logger.info('------------------------------------')
                    current_cookie_index += 1
    logger.info('All user data processed')
if __name__ == '__main__':
    # Output CSV path
    output_file_path = "result.csv"
    # Image folder
    img_path = 'img'
    # Excel file listing the user IDs
    file_path = '用户id.xlsx'
    # Run the main program
    main(file_path, cookies_list, headers_init, img_path, output_file_path)
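Before running: prepare 用户id.xlsx with a 用户id column listing the target user IDs, fill in cookies_list and headers_init with your own values, and place the signing script 1.js and the fetch_note_detail module alongside this script.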
Author: 才华是浅浅的耐心