Python爬取小红书用户主页帖子附完整代码(2024.11.28)

声明:本项目仅供学习参考使用,请勿做任何商业化乃至违法行为

随着小红书成为越来越多用户分享和获取内容的热门平台,针对其数据的爬取需求也随之增加。本文将分享如何通过Python脚本爬取小红书用户数据,包括用户的帖子信息,并实现图片的批量下载,并根据用户ID和帖子ID分组存储,支持多账号切换,突破了频率访问限制。

废话不多说,先上结果:

1. 项目背景

本项目实现了对小红书用户数据的爬取,支持以下功能:

  1. 批量爬取多个用户的帖子信息。
  2. 对爬取的帖子进行数据解析,提取标题、内容、点赞数、评论数、收藏数、转发数以及话题等信息。
  3. 自动下载帖子中的图片,并按照用户ID和帖子ID进行分组存储。
  4. 支持多账号轮换,规避频率访问限制。

2. 环境准备

在开始之前,请确保已经安装了以下必要的Python库:

pip install requests pandas PyExecJS loguru openpyxl

3. 核心功能解析

3.1 爬取用户数据

通过requests模块,我们向小红书的用户接口发起GET请求,从响应中提取数据。为了提高成功率,脚本采用多组cookies轮换请求。

url = "https://edith.xiaohongshu.com/api/sns/web/v1/user_posted"
params = {
    "num": "30",
    "cursor": "",
    "user_id": user_id,
    "image_formats": "jpg,webp,avif",
    "xsec_token": "",
    "xsec_source": "pc_note"
}
response = requests.get(url, headers=headers, cookies=current_cookies, params=params)
if response.status_code == 200 and response.json().get('success') == True:
    data_page = response.json()
    notes = data_page.get('data', {}).get('notes', [])
    has_more = data_page.get('data', {}).get('has_more', False)
 

3.2 数据解析与存储

爬取的JSON数据包含用户帖子的信息。我们通过parse_data方法解析数据,提取有用的字段,并存储为CSV文件。

def parse_data(data):
    items = data.get('data', {}).get('items', [])
    parsed_info = []
    for item in items:
        note = item.get('note_card', {})
        parsed_info.append({
            '标题': note.get('title', ''),
            '内容': note.get('desc', '').strip(),
            '点赞数': note.get('interact_info', {}).get('liked_count', 0),
            '评论数': note.get('interact_info', {}).get('comment_count', 0),
            '收藏数': note.get('interact_info', {}).get('collected_count', 0),
            '转发数': note.get('interact_info', {}).get('share_count', 0),
            '话题': [word.strip('#') for word in note.get('desc', '').split() if '[话题]' in word]
        })
    return parsed_info
 

3.3 图片下载

帖子中的图片信息可以通过解析接口返回的数据获取。脚本实现了图片的批量下载,并将图片按用户ID和帖子ID分类存储。

def download_img(data, img_path, user_id, note_id):
    image_list = data["data"]["items"][0]["note_card"]["image_list"]
    image_urls = [img["url_default"] for img in image_list]
    output_dir = f"./{img_path}/{user_id}/{note_id}"
    os.makedirs(output_dir, exist_ok=True)
    for idx, url in enumerate(image_urls):
        try:
            response = requests.get(url)
            if response.status_code == 200:
                with open(os.path.join(output_dir, f"image_{idx + 1}.jpg"), "wb") as f:
                    f.write(response.content)
        except Exception as e:
            print(f"图片下载出错: {e}")
 

4. 突破频率限制

为应对小红书的频率限制,脚本采用以下策略:

  1. 多Cookies轮换: 通过cookies_list提供多组登录信息,切换访问身份。
  2. 动态签名: 利用JavaScript脚本生成接口调用所需的签名,确保请求合法。

def update_headers(api, data, current_cookies):
    with open('1.js', 'r', encoding='utf-8') as f:
        js_script = f.read()
        context = execjs.compile(js_script)
        sign = context.call('getXs', api, data, current_cookies['a1'])
        return sign
 

5. 主程序实现

主程序负责读取用户ID列表,按顺序调用上述功能,逐一爬取用户数据并保存为CSV文件。

def main(file_path, cookies_list, headers_init, img_path, output_file_path):
    data = pd.read_excel(file_path)
    id_data = data['用户id']
    for user_id in id_data:
        logger.info(f'正在爬取用户 {user_id} 的帖子信息')
        # 逐用户爬取数据
        # 保存数据到 CSV 文件
 

6.结果展示

7. 注意事项

  1. 合法使用: 爬取数据仅供学习研究使用,请勿用于非法用途。
  2. 数据敏感性: 确保对爬取的数据进行妥善处理,避免泄露用户隐私。
  3. 防止封禁: 请控制请求频率,避免对目标服务器造成压力。

8. 结语

本文介绍了如何通过Python爬取小红书用户数据并实现图片的批量下载,希望对您有所帮助。如果您有任何疑问或改进建议,欢迎在评论区留言。

如有任何问题可以联系:zx_luckfe

9.完整代码:

import time
import execjs
import urllib.parse
from loguru import logger
import pandas as pd
import os
from fetch_note_detail import fetch_xiaohongshu_data
import requests

def convert_to_int(value):
    if '万' in value:
        value = value.replace('万', '')
        return float(value) * 10000  # 转换为万单位的整数
    else:
        return value
def parse_data(data):
    items = data.get('data', {}).get('items', [])
    parsed_info = []
    for item in items:
        note = item.get('note_card', {})
        title = note.get('title', '')
        desc = note.get('desc', '')
        topics = [word.strip('#').replace('[话题]', '').strip() for word in desc.split() if '[话题]' in word]
        desc_cleaned = ' '.join([word for word in desc.split() if '[话题]' not in word]).strip()

        interact_info = note.get('interact_info', {})
        liked_count = interact_info.get('liked_count', 0)
        comment_count = interact_info.get('comment_count', 0)
        collected_count = interact_info.get('collected_count', 0)
        share_count = interact_info.get('share_count', 0)

        parsed_info.append({
            '标题': title,
            '内容': desc_cleaned,
            '点赞数': liked_count,
            '评论数': comment_count,
            '收藏数': collected_count,
            '转发数': share_count,
            '话题': topics
        })
    return parsed_info
def download_img(data,img_path,user_id,note_id):
    image_list = data["data"]["items"][0]["note_card"]["image_list"]
    image_urls = [img["url_default"] for img in image_list]
    output_dir = f"./{img_path}/{user_id}/{note_id}"
    os.makedirs(output_dir, exist_ok=True)
    for idx, url in enumerate(image_urls):
        image_path = os.path.join(output_dir, f"image_{idx + 1}.jpg")
        try:
            response = requests.get(url)
            if response.status_code == 200:
                with open(image_path, "wb") as f:
                    f.write(response.content)
                print(f"图片已下载: {image_path}")
            else:
                print(f"下载失败,状态码: {response.status_code}")
        except Exception as e:
            print(f"下载出错: {e}")
def update_headers(api, data, current_cookies):
    with open('1.js', 'r', encoding='utf-8') as f:
        js_script = f.read()
        context = execjs.compile(js_script)
        sign = context.call('getXs', api, data, current_cookies['a1'])
        return sign

cookies_list =[  ‘你自己的cookies’  ]
headers_init = {‘你自己的headers’}

def main(file_path,cookies_list,headers_init,img_path,output_file_path):
    data = pd.read_excel(file_path)
    id_data = data['用户id']
    if not os.path.exists(output_file_path):
        with open(output_file_path, mode="w", encoding="utf-8-sig", newline="") as f:
            f.write("note_id,xsec_token,type,title,user_id,text,topics,likes,comments,collects,shares\n")
    url = "https://edith.xiaohongshu.com/api/sns/web/v1/user_posted"
    people_index = 0
    for user_id in id_data:
        people_index += 1
        has_more = True
        logger.info(f'正在爬取第{people_index}个人 {user_id} 的帖子信息')
        params = {
            "num": "30",
            "cursor": "",
            "user_id": user_id,
            "image_formats": "jpg,webp,avif",
            "xsec_token": "",
            "xsec_source": "pc_note"
        }
        k = 0
        current_cookie_index = 0
        while has_more:
            while current_cookie_index < len(cookies_list):
                current_cookies = cookies_list[current_cookie_index]
                params_encoded = urllib.parse.urlencode(params)
                headers = headers_init.copy()
                sign_headers = update_headers(f'/api/sns/web/v1/user_posted?{params_encoded}', None, current_cookies)
                headers['x-s'] = sign_headers['X-s']
                headers['x-t'] = str(sign_headers['X-t'])
                response1 = requests.get(url, headers=headers, cookies=current_cookies, params=params)
                # print(response1.status_code)
                # print(response1.json())
                if response1.status_code == 200 and response1.json().get('success') == True:
                    data_page = response1.json()
                    notes = data_page.get('data', {}).get('notes', [])
                    has_more = data_page.get('data', {}).get('has_more', False)
                    for note in notes:
                        logger.info(f'正在爬取第{people_index}个人的第{k}个帖子')
                        k += 1
                        xsec_token = note.get('xsec_token')
                        note_id = note.get('note_id')
                        current_cookies=cookies_list[current_cookie_index]
                        note_data,status_code_result,headers_result = fetch_xiaohongshu_data(note_id, xsec_token, current_cookies)
                        if (status_code_result == 200 and note_data.get('success') == False) or status_code_result == 461:
                            current_cookie_index+=1
                            print('出现频率访问异常,切换下一个cookies,跳出当前页笔记')
                            print(note_data)
                            break
                        else:
                            pass
                        if status_code_result==200 and note_data.get('success') == True:
                            download_img(note_data,img_path,user_id,note_id)
                            result = parse_data(note_data)
                            note_type = note.get('type', 'N/A')
                            text = result[0]['内容'].replace("\n", "").strip()
                            likes = convert_to_int(result[0]['点赞数'])
                            comments = convert_to_int(result[0]['评论数'])
                            collects = convert_to_int(result[0]['收藏数'])
                            shares = convert_to_int(result[0]['转发数'])
                            topics = ", ".join(result[0]['话题']).replace("\n", "").strip()
                            display_title = note.get('display_title', 'N/A')
                            data_row = {
                                'note_id': note_id,
                                'xsec_token': xsec_token,
                                'type': note_type,
                                "title": display_title,
                                "user_id": user_id,
                                "text": text,
                                "topics": topics,
                                "likes": likes,
                                "comments": comments,
                                "collects": collects,
                                "shares": shares
                            }
                            df = pd.DataFrame([data_row])
                            df.to_csv(output_file_path, mode="a", index=False, header=False, encoding="utf-8-sig", quoting=1)
                    cursor = data_page.get('data', {}).get('cursor', "")
                    params['cursor']= data_page.get('data', {}).get('cursor', "")
                    logger.info(f'当前页的游标为:{cursor}')
                    if has_more==False:
                        break
                    else:
                        pass
                else:
                    logger.info('------------------------------------')
                    logger.info('请求失败,切换下一个 cookies')
                    logger.info('------------------------------------')
                    current_cookie_index += 1


    logger.info("所有用户数据处理完毕")

if __name__ == '__main__':
    #输出文件路径
    output_file_path = "result.csv"
    #图片路径
    img_path = 'img'
    #用户id文件
    file_path = '用户id.xlsx'
    #运行主程序
    main(file_path,cookies_list,headers_init,img_path,output_file_path)

作者:才华是浅浅的耐心

物联沃分享整理
物联沃-IOTWORD物联网 » Python爬取小红书用户主页帖子附完整代码(2024.11.28)

发表回复