Python异步爬虫:高并发、从经纬度到谷歌地图瓦片获取、拼接

前言

​ 我的研究方向是图片地理定位,简而言之,就是给定一张在全球随机位置拍摄的图片,在不包含exif等额外信息的前提下,仅仅通过图像来确定这个图片的拍摄经纬度。在CVPR2024上发表了的论文《OpenStreetView-5M: The Many Roads to Global Visual Geolocation》推出了一个适用于我的领域的全新数据集,其中有在全球拍摄的图片和对应的经纬度信息,一共有五百万多张图片。我的研究需要对这个数据集进行拓展,不仅要街景图片,也要经纬度对应的卫星图片。因此我的本次作业打算写一个基于python的爬虫,通过经纬度映射到url参数 ,然后获取谷歌地图的瓦片。然而网络上的相关教程都是几年前的了,谷歌的url都换了,我就浅浅重新从0来实现这个程序,并记录一下实现过程,正好也把我的python大作业完成了。

​ 考虑到这个数据集很大,单线程的爬虫肯定是不行的。正好python课上老师教了多线程,因此就用多线程(伪)+异步的方式来做。另外要学习经纬度到url参数的转换原理并进行实验,然后要采用代理池的方式来防止谷歌的反爬。

​ 总结要完成的任务,就是以下几点:

  • 多线程爬虫的实现

  • 经纬度到瓦片参数的转换

  • 卫星图片的拼接

  • 任务的分配

  • 数据的存储

  • 反爬:使用代理池、随机请求头、线程呼吸

    好的,让我们开始。

  • 多线程爬虫实现

    首先,使用burpsuite抓包,获得能够成功爬取的包头

    ​ 有关burp的使用,我之前也写过一篇博客,这里就不再赘述了

    ​ 有了这个包,基于这个包进行单元测试,看看能不能在纯python环境中成功实现。这里我先使用了本地的clash for windows代理,然后不使用任务队列,仅运行单个任务。fetch函数存储了api,main函数运行任务队列。

    import aiohttp
    import asyncio
    
    async def fetch(session, x, y, z):
        url = f'https://khms1.google.com/kh/v=991?x={x}&y={y}&z={z}'
        headers = {
            'Host': 'khms1.google.com',
            'Origin': 'https://www.google.com',
            'Sec-Ch-Ua-Platform': '"Windows"',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Sec-Ch-Ua': '"Not?A_Brand";v="99", "Chromium";v="130"',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.6723.70 Safari/537.36',
            'Sec-Ch-Ua-Mobile': '?0',
            'Accept': 'image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8',
            'X-Client-Data': 'CJ+MywE=',
            'Sec-Fetch-Site': 'same-site',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Dest': 'image',
            'Referer': 'https://www.google.com/',
            'Accept-Encoding': 'gzip, deflate, br',
            'Priority': 'i'
        }
    
        # 使用本地代理
        proxy = 'http://127.0.0.1:7890'
    
        async with session.get(url, headers=headers, proxy=proxy) as response:
            # 我们需要的是图片内容,所以直接读取内容
            content = await response.read()
            return content
    
    async def main(x, y, z):
        # 使用aiohttp的ClientSession
        async with aiohttp.ClientSession() as session:
            content = await fetch(session, x, y, z)
            with open(f'image_x{x}_y{y}_z{z}.png', 'wb') as f:
                f.write(content)
            print(f'图片已保存:image_x{x}_y{y}_z{z}.png')
    
    # 传入参数x, y, z
    x = 0
    y = 0
    z = 0
    # 运行异步爬虫
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(x, y, z))
    

    ​ 爬到的图片就是下面这个玩意,一张大小为256×256的png图片。谷歌地图当下的api为 https://khms1.google.com/kh/v=991?x={x}&y={y}&z={z},其中x,y是区域坐标,z是缩放大小。我们使用的测试参数为0,0,0,缩放大小为0,谷歌返回了整个地球的照片,图片的中点为经纬度为0,0的点,非洲西部的赤道几内亚湾。当缩放等级为0的时候,x,y都只有0。

    ​ 当我们将缩放设置成1时,x和y可以是(0,0)(1,0)(0,1)(1,1)四个点。其中中点坐标就是偏移一半经纬度后的点,比如当参数为x=0,y=0,z=1时,得到的图片中心经纬度为(-90,45)。

    ​ 显然,z越大,图片就越清晰,道理很简单。下面要解决的问题是经纬度如何映射到x,y,z上,以及如何确定一组参数,获得各个缩放等级的图像,以综合更多维度的地理信息。

    经纬度到瓦片参数的转换

    ​ 谷歌地图瓦片系统是一个四叉树结构,其中每个瓦片都有一个唯一的 x, y, z 坐标。以下是转换过程的步骤:

    1. 确定缩放级别 z: 谷歌地图的缩放级别 z 范围是从 0 到 21(或更高,但通常最高到 21)。每个缩放级别将地球表面分成 2^z * 2^z 个瓦片。
    2. 将经纬度转换为瓦片坐标:
    3. 将经度转换为 x 坐标:
    4. 地球可以看作是 -180 到 180 经度的范围。
    5. 将经度范围 [-180, 180] 映射到 [0, 2^z – 1]。
    6. 将纬度转换为 y 坐标:
    7. 地球可以看作是 -85.0511 到 85.0511 纬度的范围(这是谷歌地图可以显示的最大纬度范围)。
    8. 将纬度范围 [-85.0511, 85.0511] 映射到 [0, 2^z – 1]。

    数学解释:

    ​ 经度转换为 x 坐标: (lon + 180.0) / 360.0 将经度从 [-180, 180] 范围映射到 [0, 1]。 * (1 << zoom) 将这个比例乘以 2^zoom,得到最终的瓦片 x 坐标。
    ​ 纬度转换为 y 坐标: math.log(math.tan(lat * math.pi / 180.0) + 1.0 / math.cos(lat * math.pi / 180.0)) / math.pi 是一个将纬度从 [-π/2, π/2] 范围映射到 [0, 1] 的函数。 (1.0 – …) 反转这个比例,因为瓦片坐标的 y 轴是从上到下增长的,而纬度是从下到上增长的。 * (1 << zoom) 同样地,将这个比例乘以 2^zoom,得到最终的瓦片 y 坐标。

    import math
    
    def latlon_to_tile(lat, lon, zoom):
        """
        将经纬度转换为谷歌地图瓦片坐标。
        
        参数:
        lat -- 纬度
        lon -- 经度
        zoom -- 缩放级别
        
        返回:
        (x, y) -- 瓦片坐标
        """
        # 确保纬度在谷歌地图的显示范围内
        lat = max(min(lat, 85.0511), -85.0511)
        
        # 计算瓦片坐标
        x = int((lon + 180.0) / 360.0 * (1 << zoom))
        y = int((1.0 - math.log(math.tan(lat * math.pi / 180.0) + 1.0 / math.cos(lat * math.pi / 180.0)) / math.pi) / 2.0 * (1 << zoom))
        
        return x, y
    
    # 示例使用
    zoom_level = 1
    latitude = 0
    longitude = 0
    tile_x, tile_y = latlon_to_tile(latitude, longitude, zoom_level)
    print(f"Tile coordinates at zoom level {zoom_level} for latitude {latitude} and longitude {longitude}: x={tile_x}, y={tile_y}")
    
    

    下面我们测试一下这个代码能不能正常运行。我选取了西安青少年宫的坐标(34.265056615386364, 108.95181859260963),设置缩放等级为19,最终生成的瓦片参数为x=420816, y=208970,放到之前的脚本中,得到的图片如下:

    我们再去谷歌地图上看一下,可以看到那个很有特点的建筑,确实是成功地把对应的图片获取到了。

    卫星图片的拼接

    已经获得了图片,但是我们的目标是获得尽可能清晰的卫星图。因此我们实现再实现一个小任务,将四个高级缩放的图片拼接成一个低级缩放的图片。来看看获得的图片和上一级相比是否更清晰,以确定最优的爬取缩放等级,来节省资源。

    我们定义两个新函数,latlon_to_tile_bigger用于生成临近的四张图片的参数,combine_tiles用于拼接这四张图片。

    def latlon_to_tile_bigger(lat, lon, zoom):
        """
        获取所给经纬度周围的四个瓦片图片的坐标进行拼接
        参数:
        lat -- 纬度
        lon -- 经度
        zoom -- 缩放级别
        
        返回:
        (x, y) -- 瓦片坐标
        """
        zoom = zoom
        # 确保纬度在谷歌地图的显示范围内
        lat = max(min(lat, 85.0511), -85.0511)
        
        # 计算瓦片坐标
        x = int((lon + 180.0) / 360.0 * (1 << zoom))
        y = int((1.0 - math.log(math.tan(lat * math.pi / 180.0) + 1.0 / math.cos(lat * math.pi / 180.0)) / math.pi) / 2.0 * (1 << zoom))
        
        return [(x, y+1),(x-1, y+1),(x, y),(x-1, y)]
    
    from PIL import Image
    
    def combine_tiles(tile_images):
        """
        将四个瓦片图片拼接成一张大图。
    
        参数:
        tile_images -- 包含四个 Image 对象的列表,分别是 (x, y), (x-1, y), (x, y-1), (x-1, y-1) 的瓦片图片。
    
        返回:
        Image -- 拼接后的图片。
        """
        # 确保提供了四张图片
        if len(tile_images) != 4:
            raise ValueError("需要四张图片来拼接")
        for i,byte_data in enumerate(tile_images):
            # 使用io.BytesIO创建一个字节流
            image_stream = io.BytesIO(byte_data)
            
            # 使用Image.open从字节流中打开图像
            tile_images[i] = Image.open(image_stream)
        # 假设所有图片尺寸相同
        width, height = tile_images[0].size
    
        # 创建一个新的空白图片,大小是单张图片的两倍
        combined_image = Image.new('RGB', (width * 2, height * 2))
    
        # 将四张图片放置到正确的位置
        combined_image.paste(tile_images[2], (width, 0))  # (x, y)
        combined_image.paste(tile_images[3], (0, 0))      # (x-1, y)
        combined_image.paste(tile_images[0], (width, height))  # (x, y-1)
        combined_image.paste(tile_images[1], (0, height))      # (x-1, y-1)
    
        return combined_image
    
    

    然后修改主函数,使其能一次运行四个爬取任务,随后用上面的函数存为一个大图:

    async def main_multitask(lat, lon, z):
        # 使用aiohttp的ClientSession
        async with aiohttp.ClientSession() as session:
            parse = latlon_to_tile_bigger(lat,lon,z)
            tasks = [fetch(session,i[0],i[1],z) for i in parse]
            results = await asyncio.gather(*tasks)
            return results
    z = 20
    result = asyncio.run(main_multitask(lat,lon,z))
    bigger_image = combine_tiles(result)
    print(bigger_image.size)
    bigger_image.save("bigger_image.png")
    

    最终我们用缩放等级为20,爬取四张图进行拼接后得到的大图如下。

    和上面那个图对比分析,可以看到明显要比zoom=19的那张清晰一点。但是两张图片很明显是同一张图,猜想谷歌地图的低缩放tile就是高缩放tile大图的压缩版本。

    ​ 因此我们想要尽可能获得更清晰的图片的话,就要用最小的tile来拼接。已知,谷歌地图的最大缩放等级为21,我们用z=21,生成16张图进行拼接。但是尝试后发现没啥变化。因此,四张缩放等级为20的图片拼接后能尽可能避免图片压缩,同时,可以尽量获得周围的环境信息。

    ​ 到这里,我们的程序已经很出色了。提取几个osv5m中的样本进行测试,获取到图片的中心正是照片的拍摄位置~



    ​ 然而,这里有个比较坑的地方。如果减小缩放等级数值,会发现低级缩放图片并不位于图片正中。为了确保低级图片在正中,一个可行的解法是获取高两级的缩放图片,然后爬取该图片上下左右以及四个角位置的共八张图片后进行拼接。虽然需要爬取的图片数量变多了,但是这样能保证下级图片处在图片中心五分之一的位置,这是我需要的。

    任务分配与数据存储

    ​ 一张图片分别存为jpg和png,大小差了九倍。果断采用jpg进行读取。

    ​ 我们想要获取各个缩放等级的,包含不同范围内特征的图片,需要从大缩放等级拓展到小缩放等级。对于每一个经纬度,我们获取z=10-19的所有图片,观察哪些是需要爬取的。

  • z=19-17:街道、街区级

  • z=16-14:城市、轮廓级

  • z=13-10,地貌级,看不出城市细节

  • ​ 关于怎么选择缩放大小,还需要做实验来判断。目前从19开始间隔选择,选择4\7\10\13\16\19这几个缩放级别。当前数据集大小为200G左右,在数据扩展后,量级大概会来到1.4T。这个数据量倒也能够接收,不过得买一块新硬盘了,还得再买一个新vpn。

    ​ 数据的存储就很简单了,每一个缩放等级新建一个文件夹,按照原文件名存就好。

    ​ 这样,每张图片有6个缩放等级,每个缩放等级请求九次。每张图进行54个请求,一次异步请求n张图片,来组成一个异步队列。

    async def run_5X5(session,lat, lon, z,id):
        # 使用aiohttp的ClientSession
            print(f"执行:id={id}.z={z}")
            parse = latlon_to_tile_bigger5X5(lat,lon,z+2)
            tasks = [fetch(session,i[0],i[1],z+2) for i in parse]
            results = await asyncio.gather(*tasks)
            try:
                bigger_image = combine_tiles5X5(byte2img(results))
                bigger_image.save(f"test/{z}/{id}.jpg")
            except:
                print(results[:10],lat,lon,z,id)
    
    async def run_queqe(data):
        async with aiohttp.ClientSession() as session:
            tasks = [run_5X5(session,i[1],i[2],z,i[0]) for i in data for z in range(4,20,3)]
            await asyncio.gather(*tasks)
            
    data = [
        [1,53.32936106,44.832726],
    [2,40.453126	,-119.334174],
    [3,60.64325302	,9.051189134],
    [4,17.1758181	,77.88178872],
    [5,32.02595871	,-93.53468712],
    [6,-23.97027904	,-53.6356253],
    
    ]
    asyncio.run(run_queqe(data))
    

    结果:

    ​ 感觉不错,然后就是反反爬了。

    反反爬

    使用队列呼吸

    ​ 在组织任务的时候,每次发多少并发请求是有讲究的。长期维持高并发会被识别出来,太低的并发根本短时间里又完成不了我们五百万级的任务。在处理这类问题时,常采用短时间维持高并发,长时间维持中低并发的方式,在模拟用户操作的同时,防止被服务器封禁。

    ​ 我们以10000张图片为一个循环,前1000条使用20个请求并发,然后2000条使用40个请求并发,中间4000条使用50个请求并发,最后3000条使用20个请求并发,然后随机休眠3-5分钟。全都运行完之后应该会报一个索引不存在的错误(数据量不能被10000整除)。到时候对剩下的图片进行统一爬取即可。

    from tqdm import tqdm
    import pandas as pd
    import time
    import random
    df = pd.read_csv("test.csv")
    value  = df[['id','latitude','longitude']].values.tolist()
    del df
    for i in range(len(value)//10000):
        datatemp = value[i*10000:i*10000+1000]
        print(f"第{i}轮开始")
        start_time = time.time()
    
        for j in tqdm(range(1000//10)):
            asyncio.run(run_queqe(datatemp[j*10:j*10+10]))
        datatemp = value[i*10000+1000:i*10000+3000]
        for j in tqdm(range(2000//20)):
            asyncio.run(run_queqe(datatemp[j*20:j*20+20]))
        datatemp = value[i*10000+3000:i*10000+7000]
        for j in tqdm(range(4000//50)):
            asyncio.run(run_queqe(datatemp[j*50:j*50+50]))
        datatemp = value[i*10000+7000:(i+1)*10000]
        for j in tqdm(range(3000//20)):
            asyncio.run(run_queqe(datatemp[j*20:j*20+20]))
        sleep_duration = random.randint(180, 300)
        end_time = time.time()
        elapsed_time = end_time - start_time
        print(f"第{i}轮结束,执行时间{elapsed_time}秒,将睡眠{sleep_duration}秒")
        time.sleep(sleep_duration)
    
    使用随机代理

    ​ 有很多种实现方法,我采用了比较简单的一种,修改clash for windows的yaml文件,启用负载均衡。

    https://blog.csdn.net/weixin_49117441/article/details/140072838?spm=1001.2101.3001.6650.2&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ECtr-2-140072838-blog-137092240.235%5Ev43%5Epc_blog_bottom_relevance_base3&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ECtr-2-140072838-blog-137092240.235%5Ev43%5Epc_blog_bottom_relevance_base3&utm_relevant_index=5

    随机请求头

    ​ 我这个项目好像不需要。

    下面是所有代码

    import aiohttp
    import asyncio
    import math
    import io
    
    def latlon_to_tile(lat, lon, zoom):
    
        # 确保纬度在谷歌地图的显示范围内
        lat = max(min(lat, 85.0511), -85.0511)
        
        # 计算瓦片坐标
        x = int((lon + 180.0) / 360.0 * (1 << zoom))
        y = int((1.0 - math.log(math.tan(lat * math.pi / 180.0) + 1.0 / math.cos(lat * math.pi / 180.0)) / math.pi) / 2.0 * (1 << zoom))
        
        return x, y
    
    def latlon_to_tile_bigger(lat, lon, zoom):
        """
        获取所给经纬度周围的四个瓦片图片的坐标进行拼接
        参数:
        lat -- 纬度
        lon -- 经度
        zoom -- 缩放级别
        
        返回:
        (x, y) -- 瓦片坐标
        """
        zoom = zoom
        # 确保纬度在谷歌地图的显示范围内
        lat = max(min(lat, 85.0511), -85.0511)
        
        # 计算瓦片坐标
        x = int((lon + 180.0) / 360.0 * (1 << zoom))
        y = int((1.0 - math.log(math.tan(lat * math.pi / 180.0) + 1.0 / math.cos(lat * math.pi / 180.0)) / math.pi) / 2.0 * (1 << zoom))
        
        return [(x-1,y+1),(x,y+1),(x+1,y+1),(x-1,y),(x,y),(x+1,y),(x-1,y-1),(x,y-1),(x+1,y-1)]
    def latlon_to_tile_bigger5X5(lat, lon, zoom):
        """
        获取所给经纬度周围的四个瓦片图片的坐标进行拼接
        参数:
        lat -- 纬度
        lon -- 经度
        zoom -- 缩放级别
        
        返回:
        (x, y) -- 瓦片坐标
        """
        zoom = zoom
        # 确保纬度在谷歌地图的显示范围内
        lat = max(min(lat, 85.0511), -85.0511)
        
        # 计算瓦片坐标
        x = int((lon + 180.0) / 360.0 * (1 << zoom))
        y = int((1.0 - math.log(math.tan(lat * math.pi / 180.0) + 1.0 / math.cos(lat * math.pi / 180.0)) / math.pi) / 2.0 * (1 << zoom))
        
        return [(x-2,y+2),(x-1,y+2),(x,y+2),(x+1,y+2),(x+2,y+2),
                (x-2,y+1),(x-1,y+1),(x,y+1),(x+1,y+1),(x+2,y+1),
                (x-2,y ),(x-1,y ),(x,y ),(x+1,y ),(x+2,y ),
                (x-2,y-1),(x-1,y-1),(x,y-1),(x+1,y-1),(x+2,y-1),
                (x-2,y-2),(x-1,y-2),(x,y-2),(x+1,y-2),(x+2,y-2)
                ]
    from PIL import Image
    
    def byte2img(tile_images):
        for i,byte_data in enumerate(tile_images):
                # 使用io.BytesIO创建一个字节流
                image_stream = io.BytesIO(byte_data)
                
                # 使用Image.open从字节流中打开图像
                tile_images[i] = Image.open(image_stream)
        return tile_images
    def combine_tiles2x2(tile_images):
        """
        将四个瓦片图片拼接成一张大图。
    
        参数:
        tile_images -- 包含四个 Image 对象的列表,分别是 (x, y), (x-1, y), (x, y-1), (x-1, y-1) 的瓦片图片。
    
        返回:
        Image -- 拼接后的图片。
        """
        # 假设所有图片尺寸相同
        width, height = tile_images[0].size
    
        # 创建一个新的空白图片,大小是单张图片的两倍
        combined_image = Image.new('RGB', (width * 2, height * 2))
    
        # 将四张图片放置到正确的位置
        combined_image.paste(tile_images[2], (width, 0))  # (x, y)
        combined_image.paste(tile_images[3], (0, 0))      # (x-1, y)
        combined_image.paste(tile_images[0], (width, height))  # (x, y-1)
        combined_image.paste(tile_images[1], (0, height))      # (x-1, y-1)
    
        return combined_image
    
    def combine_tiles(tile_images):
        """
        将九个瓦片图片拼接成一张大图。
    
        参数:
        tile_images -- 包含九个 Image 对象的列表,分别是
                       (x-1, y+1), (x, y+1), (x+1, y+1),
                       (x-1, y),   (x, y),   (x+1, y),
                       (x-1, y-1), (x, y-1), (x+1, y-1) 的瓦片图片。
    
        返回:
        Image -- 拼接后的图片。
        """
        # 假设所有图片尺寸相同
        width, height = tile_images[4].size  # 使用中心图片的尺寸
        # 创建一个新的空白图片,大小是单张图片的三倍
        combined_image = Image.new('RGB', (width * 3, height * 3))
        # 将九张图片放置到正确的位置
        combined_image.paste(tile_images[6], (0, 0))      # (x-1, y+1)
        combined_image.paste(tile_images[7], (width, 0))  # (x+1, y+1)
        combined_image.paste(tile_images[8], (width * 2, 0))          # (x-1, y)
        combined_image.paste(tile_images[3], (0, height))       # (x+1, y)
        combined_image.paste(tile_images[4], (width, height))           # (x, y+1)
        combined_image.paste(tile_images[5], (width*2, height))  # (x, y-1)
        combined_image.paste(tile_images[0], (0, height * 2))      # (x-1, y-1)
        combined_image.paste(tile_images[1], (width , height * 2))  # (x+1, y-1)
        combined_image.paste(tile_images[2], (width * 2, height * 2))               # (x-1, y)
        return combined_image
    
    def combine_tiles5X5(tile_images):
        """
        将九个瓦片图片拼接成一张大图。
    
        参数:
        tile_images -- 包含九个 Image 对象的列表,分别是
                       (x-1, y+1), (x, y+1), (x+1, y+1),
                       (x-1, y),   (x, y),   (x+1, y),
                       (x-1, y-1), (x, y-1), (x+1, y-1) 的瓦片图片。
    
        返回:
        Image -- 拼接后的图片。
        """
        # 假设所有图片尺寸相同
        width, height = tile_images[4].size  # 使用中心图片的尺寸
        # 创建一个新的空白图片,大小是单张图片的三倍
        combined_image = Image.new('RGB', (width * 5, height * 5))
        # 将九张图片放置到正确的位置             
        for i in range(5):
            for j in range(5):
                combined_image.paste(tile_images[i*5+j], (width * j, height * (4-i)))      
           
        return combined_image
    async def fetch(session, x, y, z):
        url = f'https://khms1.google.com/kh/v=991?x={x}&y={y}&z={z}'
        headers = {
            'Host': 'khms1.google.com',
            'Origin': 'https://www.google.com',
            'Sec-Ch-Ua-Platform': '"Windows"',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Sec-Ch-Ua': '"Not?A_Brand";v="99", "Chromium";v="130"',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.6723.70 Safari/537.36',
            'Sec-Ch-Ua-Mobile': '?0',
            'Accept': 'image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8',
            'X-Client-Data': 'CJ+MywE=',
            'Sec-Fetch-Site': 'same-site',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Dest': 'image',
            'Referer': 'https://www.google.com/',
            'Accept-Encoding': 'gzip, deflate, br',
            'Priority': 'i'
        }
    
        # 使用本地代理
        proxy = 'http://127.0.0.1:7890'
    
        async with session.get(url, headers=headers, proxy=proxy) as response:
            # 这里假设我们需要的是图片内容,所以直接读取内容
            content = await response.read()
            return content
    
    async def main(lat, lon, z):
        # 使用aiohttp的ClientSession
        async with aiohttp.ClientSession() as session:
            i = latlon_to_tile(lat,lon,z)
            content = await fetch(session, i[0], i[1], z)
            # 假设我们获取的是图片,可以将其保存到本地
                
            with open(f'image_x{i[0]}_y{i[1]}_z{z}.png', 'wb') as f:
                f.write(content)
    async def run_5X5(session,lat, lon, z,id):
        # 使用aiohttp的ClientSession
            print(f"执行:id={id}.z={z}")
            parse = latlon_to_tile_bigger5X5(lat,lon,z+2)
            tasks = [fetch(session,i[0],i[1],z+2) for i in parse]
            results = await asyncio.gather(*tasks)
            try:
                bigger_image = combine_tiles5X5(byte2img(results))
                bigger_image.save(f"test/{z}/{int(id)}.jpg")
            except:
                print(results[:10],lat,lon,z,id)
    
    async def run_3X3(lat, lon, z,id):
        # 使用aiohttp的ClientSession
        async with aiohttp.ClientSession() as session:
            parse = latlon_to_tile_bigger(lat,lon,z+2)
            tasks = [fetch(session,i[0],i[1],z+2) for i in parse]
            results = await asyncio.gather(*tasks)
            bigger_image = combine_tiles(byte2img(results))
            bigger_image.save(f"data/{z}/{int(id)}.jpg")
    
    async def run_queqe(data):
        async with aiohttp.ClientSession() as session:
            tasks = [run_5X5(session,i[1],i[2],z,i[0]) for i in data for z in range(4,20,3)]
            await asyncio.gather(*tasks)
    # 传入参数x, y, z
    # lat = 17.03617016
    
    # lon = 95.55530584
    
    # z = 17
    # for z in range(5,21,3):
    #     asyncio.run(run_5X5(lat,lon,z))
    data = [
        [1,53.32936106,44.832726],
    [2,40.453126	,-119.334174],
    [3,60.64325302	,9.051189134],
    [4,17.1758181	,77.88178872],
    [5,32.02595871	,-93.53468712],
    [6,-23.97027904	,-53.6356253],
    
    ]
    from tqdm import tqdm
    import pandas as pd
    import time
    import random
    df = pd.read_csv("test.csv")
    value  = df[['id','latitude','longitude']].values.tolist()
    del df
    for i in range(len(value)//10000):
        datatemp = value[i*10000:i*10000+1000]
        print(f"第{i}轮开始")
        start_time = time.time()
    
        for j in tqdm(range(1000//10)):
            asyncio.run(run_queqe(datatemp[j*10:j*10+10]))
        datatemp = value[i*10000+1000:i*10000+3000]
        for j in tqdm(range(2000//20)):
            asyncio.run(run_queqe(datatemp[j*20:j*20+20]))
        datatemp = value[i*10000+3000:i*10000+7000]
        for j in tqdm(range(4000//50)):
            asyncio.run(run_queqe(datatemp[j*50:j*50+50]))
        datatemp = value[i*10000+7000:(i+1)*10000]
        for j in tqdm(range(3000//20)):
            asyncio.run(run_queqe(datatemp[j*20:j*20+20]))
        sleep_duration = random.randint(180, 300)
        end_time = time.time()
        elapsed_time = end_time - start_time
        print(f"第{i}轮结束,执行时间{elapsed_time}秒,将睡眠{sleep_duration}秒")
        time.sleep(sleep_duration)
        
    
    
    

    作者:navigateException

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python异步爬虫:高并发、从经纬度到谷歌地图瓦片获取、拼接

    发表回复