• Stripping whitespace

  • s = ' this string has spaces '  # avoid naming the variable str, which shadows the built-in
    print(f'strip both ends={s.strip()}')
    print(f'strip left ={s.lstrip()}')
    print(f'strip right ={s.rstrip()}')
    print(f'remove all spaces={s.replace(" ", "")}')
  • Returning values from a function with yield

  • yield ':'.join([ip, port])
    
    yield {
        'ranking': ranking,
        'name': name,
        'img': img,
        'score': score,
        'author': author,
        'desc': desc,
    }
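
    A minimal sketch of how such a yield sits inside a generator function and how the caller consumes it; gen_addresses and the sample data are illustrative only, not from the original code:
    
    def gen_addresses(pairs):
        # each iteration hands back one 'ip:port' string instead of building a whole list first
        for ip, port in pairs:
            yield ':'.join([ip, port])
    
    for addr in gen_addresses([('127.0.0.1', '8080'), ('10.0.0.2', '3128')]):
        print(addr)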
  • String formatting with format()

  • BASE_URL = 'https://proxylist.geonode.com/api/proxy-list?limit=500&page={page}&sort_by=lastChecked&sort_type=desc'
    
    MAX_PAGE = 18
    
    urls = [BASE_URL.format(page=page) for page in range(1, MAX_PAGE + 1)]
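
    For a single URL the same substitution can also be written with an f-string (already used in other snippets here); a small illustrative equivalent:
    
    page = 1
    url = f'https://proxylist.geonode.com/api/proxy-list?limit=500&page={page}&sort_by=lastChecked&sort_type=desc'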
  • Splitting strings with split()

  • addr_split = addr.split(':')
    if len(addr_split) == 2:
        host = addr_split[0]
        port = addr_split[1]
        yield Proxy(host=host, port=port)  # Proxy is a data class defined elsewhere
  • Joining strings

  • tmp = ['b', 'a', 'c']
    print(",".join(str(i) for i in tmp))
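
    The str(i) conversion matters when the items are not already strings, because join() only accepts strings; a small illustrative example:
    
    nums = [3, 1, 2]
    print(",".join(str(n) for n in nums))  # without str(), join() raises TypeError on ints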
  • Parsing time strings with strptime

  • from datetime import datetime
    
    targetTimeStr = "2024-10-08 10:08:00"
    target_time = datetime.strptime(targetTimeStr, "%Y-%m-%d %H:%M:%S")
    
    # get the current time
    current_time = datetime.now()
    print(f"current time: {current_time.strftime('%Y-%m-%d %H:%M:%S.%f')}")
    # check whether the current time has reached the target time
    if current_time >= target_time:
        print("target time reached")
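
    If the goal is to pause until the target time before continuing, a simple polling loop works; a hedged sketch (the one-second interval is an arbitrary choice):
    
    import time
    from datetime import datetime
    
    while datetime.now() < target_time:
        time.sleep(1)  # re-check once per second until the target time passes
    print("target time reached, continue with the task")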
  • JSON to object 【json.loads】

  • result = json.loads(html)

  • Object to JSON 【json.dumps】

  • json.dumps(item, ensure_ascii=False)
  • Reading a value from the parsed object

  • proxy_list = result['data']
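
    A small self-contained round trip tying the three snippets above together; the JSON text here is made up for illustration:
    
    import json
    
    html = '{"data": [{"ip": "127.0.0.1", "port": "8080"}], "total": 1}'  # e.g. a response body
    result = json.loads(html)                              # JSON text -> dict
    proxy_list = result['data']                            # plain key access on the dict
    print(json.dumps(proxy_list[0], ensure_ascii=False))   # dict -> JSON text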

  • Sending an HTTP request and returning the response body

  • import requests
    
    # 1. basic version
    def request_dangdang(url):
        try:
            response = requests.get(url)
            if response.status_code == 200:
                return response.text
            return None
        except requests.RequestException as e:
            print(e)
            return None
    
    
    # 2. extended version: set request headers
    def request_data(url):
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36',
            'cookie': 'bid=dT8Z3OE5_cY; _pk_id.100001.4cf6=97307cec25d927ab.1727414571.; __utmz=30149280.1727414571.1.1.utmcsr=(direct)|utmccn=(direct)|utmcmd=(none); __utmz=223695111.1727414571.1.1.utmcsr=(direct)|utmccn=(direct)|utmcmd=(none); __yadk_uid=kz8b5hlKFkxH8Y9DITjOMuxWgBYikz0h; ll="108296"; _vwo_uuid_v2=D5055D26948C52E0832B26F1769798A7F|836a6dfe85637bb8c39462e0dadf8747; __utmc=30149280; __utmc=223695111; _pk_ses.100001.4cf6=1; ap_v=0,6.0; __utma=30149280.1074059804.1727414571.1728178377.1728193731.4; __utmb=30149280.0.10.1728193731; __utma=223695111.688962959.1727414571.1728178377.1728193731.4; __utmb=223695111.0.10.1728193731'
        }
        try:
            response = requests.get(url=url, headers=headers)
            if response.status_code == 200:
                return response.text
            return None
        except requests.RequestException:
            return None
  • Writing to a txt file

  • import json
    
    def write_item_to_file(item):
        """Append one scraped item to the file."""
        print('writing item ====> ' + str(item))
        with open('dangdang_top_500_book.txt', 'a', encoding='UTF-8') as f:
            f.write(json.dumps(item, ensure_ascii=False) + '\n')
            # no explicit f.close() needed: the with block closes the file automatically
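
    Each line written above is one JSON object, so reading the file back is just json.loads per line; a minimal sketch:
    
    import json
    
    with open('dangdang_top_500_book.txt', 'r', encoding='UTF-8') as f:
        items = [json.loads(line) for line in f if line.strip()]
    print(len(items), 'items loaded')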
  • Writing to Excel

  • import xlwt
    
    # write the scraped data to an Excel file
    def save_to_excel(result_list):
        if result_list is None:
            return
        book = xlwt.Workbook(encoding='utf-8')
        sheet = book.add_sheet('豆瓣电影Top250', cell_overwrite_ok=True)
        sheet.write(0, 0, '排名')
        sheet.write(0, 1, '电影名称')
        sheet.write(0, 2, '图片')
        sheet.write(0, 3, '评分')
        sheet.write(0, 4, '作者')
        sheet.write(0, 5, '简介')
    
        for i, item in enumerate(result_list):
            row = i + 1
            sheet.write(row, 0, item['ranking'])
            sheet.write(row, 1, item['name'])
            sheet.write(row, 2, item['img'])
            sheet.write(row, 3, item['score'])
            sheet.write(row, 4, item['author'])
            sheet.write(row, 5, item['desc'])
    
        book.save('豆瓣电影Top250.xls')
  • for loop

  • # 1-25页
    if __name__ == '__main__':
        for i in range(1, 26):
            print(f'正在抓取第{i}页')
            main(i) #自己的业务处理函数
  • HTML parsing with BeautifulSoup

  • import requests
    from bs4 import BeautifulSoup
    import xlwt
    
    # collect the data
    def collect_data():
        result_list = []
        # fetch the data page by page
        for i in range(0, 10):
            url = 'https://movie.douban.com/top250?start=' + str(i * 25) + '&filter='
            html = request_data(url)
            if html is None:
                continue  # skip pages whose request failed
            soup = BeautifulSoup(html, 'lxml')
            # parse_result is a generator, so materialise it before checking/extending
            movie_sub_list = list(parse_result(soup))
            if movie_sub_list:
                result_list.extend(movie_sub_list)
        # print(f'movie_list={movie_list}')
        if result_list:
            save_to_excel(result_list)
    
    
    
    # parse one page of results
    def parse_result(soup):
        movie_list_this_page = soup.find('ol', class_='grid_view').find_all('li')
        for item in movie_list_this_page:
            ranking = item.find('em').get_text()
            name = item.find(class_='title').string
            img = item.find('a').find('img').get('src')
            score = item.find(class_='rating_num').string
            author = item.find('p', class_='').get_text(strip=True)
            if item.find(class_='inq') is not None:
                desc = item.find(class_='inq').string
            else:
                desc = '暂无'
    
            print(ranking, name, img, score, author, desc)
            yield {
                'ranking': ranking,
                'name': name,
                'img': img,
                'score': score,
                'author': author,
                'desc': desc,
            }
  • Using a process pool with multiprocessing

  • import multiprocessing
    
    import requests
    from bs4 import BeautifulSoup
    import xlwt
    import time
    import sys
    
    # collect the data (multi-process version)
    def collect_data():
        result_list = []
        start_time = time.time()
        urls = []
        try:
            # size the pool to the number of CPU cores
            pool = multiprocessing.Pool(multiprocessing.cpu_count())
            # pool = multiprocessing.Pool(1)
            # build the per-page URLs
            for i in range(0, 10):
                url = 'https://movie.douban.com/top250?start=' + str(i * 25) + '&filter='
                urls.append(url)
            # each worker returns a list of dicts, so the combined result looks like [[{}, {}], [{}]]
            result_list_mid = pool.map(collect_data_pool, urls)
            # result_list_mid = pool.apply_async(collect_data_pool, urls)
            print('pages collected =======', len(result_list_mid))
            if result_list_mid:
                result_list = [item for sublist in result_list_mid for item in sublist]
                # result_list.sort(key=lambda x: x['ranking'], reverse=True)
                save_to_excel(result_list)
            pool.close()
            pool.join()
        except Exception as e:
            print('error while collecting data:', str(e))
        end_time = time.time()
        print('total time elapsed:', end_time - start_time)
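
    collect_data_pool is used above but not shown in the original; a plausible per-URL worker reusing request_data and parse_result from the earlier snippets (an assumed reconstruction, not the author's code):
    
    def collect_data_pool(url):
        # runs in a worker process: fetch one page and parse it into a list of dicts
        html = request_data(url)
        if html is None:
            return []
        soup = BeautifulSoup(html, 'lxml')
        return list(parse_result(soup))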
  • Continuously updated and expanded……

    Author: Defry
