Python并行–使用 ArcPy 进行多进程处理
一、需求背景
在处理或分析大规模GIS数据时,使用并行计算技术是一种有效的解决方案。结合ArcPy的丰富地理处理工具与开源库,如Geopandas和Shapely,可以灵活地满足复杂需求并实现高效开发。该技术路线充分发挥了ArcPy的功能,同时利用Geopandas和Shapely的优势,能够高效地处理和分析大规模空间数据。然而,在实际开发中,可能会遇到以下挑战:
1.如何在ArcPy中实现多进程处理;
2.如何在同一脚本中同时使用ArcPy和开源库;
3.如何在Pro的Python工具箱中实现多进程处理(★★★★★)。
针对这些问题,需要探索具体的解决方案,以确保在高效性与功能性之间取得平衡。
在对以上内容进行展开介绍前,先快速补充一下相关知识。
二、多进程和并发处理
2.1 纯 Python 中的多进程和并发处理
多进程是一种编程模型,它允许一个程序同时执行多个独立的任务或计算过程。在多进程模型中,一个程序可以创建多个进程,每个进程都有自己的内存空间和资源,这些进程可以并行地执行不同的任务。这种模型特别适用于利用多核处理器或多台计算机的能力来加速程序的执行速度。
2.1.1 什么是多进程?
多进程是指在一个程序中创建多个独立的进程来并行执行任务的能力。每个进程都是一个独立的执行单元,具有自己的私有内存空间、堆栈和数据段。这意味着每个进程都可以独立地执行自己的代码,并且不会影响其他进程的状态。
多进程的优势:
-
资源隔离:每个进程都有自己的内存空间,这使得进程之间的资源隔离更加容易实现,从而降低了数据竞争的风险。
-
稳定性:如果一个进程崩溃或出现问题,其他进程通常不受影响,这提高了整个程序的稳定性和可靠性。
-
并行计算:在多核处理器上,多进程可以充分利用多个核心,从而提高程序的执行效率和响应速度。
-
易于管理和扩展:由于进程之间相对独立,可以更容易地管理每个进程,并且可以根据需要轻松地添加更多的进程来扩展程序。
多进程的挑战:
-
通信成本:进程间通信通常比线程间通信更昂贵,因为它们需要通过操作系统提供的机制来进行通信,例如管道、套接字或共享内存。
-
上下文切换:虽然多进程可以利用多核处理器,但在不同进程之间切换也会消耗一定的时间和资源。
-
资源管理:每个进程都需要分配和管理自己的资源,这可能会导致额外的资源消耗和管理复杂度。
多进程与多线程的区别:
多线程:在一个进程中创建多个线程,共享相同的内存空间。线程之间的通信更快,但资源隔离较差,可能导致数据竞争。
多进程:创建多个独立的进程,每个进程都有自己的内存空间。进程间的通信较慢,但资源隔离更好,更稳定。
实际应用:
在实际应用中,多进程经常被用于处理那些可以并行化的工作负载,例如大规模的数据处理、模拟计算、网络服务等。例如,在科学计算中,多进程可以用来并行处理大型数据集,显著加快处理速度。
2.1.2 什么不是多进程?
不属于多进程的情况:
串行程序
在一个进程中一次执行一条指令。
指令从开始执行到结束。
程序崩溃会导致整个程序终止。
需要重新开始执行。
失败则关闭程序。
并发程序
在一个进程中同时执行多条指令。
指令可能“休眠”以让其他指令运行。
任何时候只有一条指令在执行。
分散式程序
网络上的多台计算机。
进程/指令在网络中运行。
由一个中央“枢纽”控制。
分布式程序
网络上的多台计算机。
进程/指令在网络中运行。
每台计算机独立运行。
2.2 Python 多进程理念
替换循环为并行迭代:
概念:在串行程序中,循环通常用于重复执行某项任务。在多进程环境下,可以通过将循环中的任务分配给不同的进程来实现并行处理。
实践:使用 multiprocessing.Pool
中的 map
或 imap
方法可以很容易地将循环操作转换为并行迭代。
替换集合为迭代器/生成器:
概念:在多进程场景下,使用迭代器和生成器可以节省内存并提高效率,因为它们按需产生数据而不是一次性加载所有数据到内存中。
实践:可以使用生成器表达式或者定义一个生成器函数来创建迭代器。
结合多进程和并发:
概念:除了多进程之外,还可以利用多线程或异步 I/O(如 asyncio)来进一步提高程序的并发能力。
实践:可以使用 concurrent.futures
模块来轻松地结合多进程和多线程。
并行函数与并发指令:
概念:将原本顺序执行的函数并行化,同时确保指令能够并发执行。
实践:使用 multiprocessing.Process
或 multiprocessing.Pool
来启动并行任务。
容错性:
概念:即使某个子进程发生错误也不会导致整个应用程序崩溃。
实践:利用 multiprocessing
模块提供的异常处理机制,比如使用 Process
的 join()
方法来捕获异常。
受限于输入或“映射”函数:
概念:控制并行任务的执行速度,通常由输入数据流或处理数据的函数决定。
实践:使用 multiprocessing.Queue
或 multiprocessing.Pipe
来控制数据流和任务调度。
验证数据,并发送到可用的 CPU 上进行处理:
概念:确保传入的数据有效,并将其分发给空闲的处理器。
实践:可以在主进程中进行数据验证,并使用 multiprocessing.Pool
的 apply_async
方法来异步地提交任务。
两种形式的输出:
离散返回:每个并行任务完成后立即返回结果,适用于不需要合并结果的情况。
聚合“减少”:将多个任务的结果合并成单一结果,通常用于需要汇总分析的情况。
实践:对于离散返回,可以直接使用 imap
或 imap_unordered
;对于聚合减少,可以使用 reduce
函数或类似方法来合并结果。
2.3 Python 模块
2.3.1 threading
核心开发者:通常只有在非常熟悉 Python 内部机制的核心开发者才会考虑使用 threading
模块。
全局解释器锁 (Global Interpreter Lock, GIL):在 CPython 解释器中,GIL 是一种同步机制,它确保了同一时刻只有一个线程可以执行 Python 字节码。这意味着即使在多核处理器上,由单个 python.exe
控制的两个线程也不能同时运行 Python 代码。
实践:threading
模块适合用于 I/O 密集型任务,如网络请求、文件读写等,而不适合 CPU 密集型任务,因为在 CPU 密集型任务中 GIL 会成为瓶颈。
除非有非常明确的原因才使用
2.3.2 multiprocessing
不受 GIL 问题的影响:multiprocessing
模块通过创建多个 Python 进程来绕过 GIL 的限制,这意味着每个进程都有自己的 Python 解释器和内存空间,因此不受 GIL 的影响。
操作系统负责 python.exe
的线程调度:每个进程中的线程调度由操作系统负责,这意味着在多核处理器上,不同的进程可以并行执行。
实践:multiprocessing
模块非常适合 CPU 密集型任务,因为它可以让多个 CPU 核心同时工作。它也适用于那些需要在多个进程中并行处理数据的任务。
创建多个 python.exe
实例
2.3.3 subprocess
串行或并行:subprocess
模块可以用来启动外部程序(非 Python 程序),这些程序可以在串行或并行模式下运行。
回调允许子进程并行运行:通过使用 subprocess.Popen
或 concurrent.futures
中的 ProcessPoolExecutor
,可以实现子进程的并行执行。例如,可以通过回调函数来处理子进程的输出,或者使用 concurrent.futures
来并行启动多个子进程。
实践:subprocess
模块非常适合需要调用外部程序或脚本的情况,比如执行系统命令、调用编译器、运行其他语言编写的程序等。通过这种方式,可以在 Python 程序中集成多种外部工具和服务。
用于启动非 python.exe
进程
2.3.4 Twisted
开源 Python 包:Twisted 是一个开源的 Python 库,它能够帮助你管理来自 threading
模块的线程,简化并发编程。
设计用于处理 I/O 并发:Twisted 被设计用于处理 I/O 密集型任务,特别是网络通信。它提供了一个高性能的事件循环,使你可以编写非阻塞的网络服务器和客户端。
Twisted 替你管理 threading
中的线程
2.3.5 asyncio
设计用于处理 I/O 并发:与 Twisted 类似,asyncio
也是专为 I/O 密集型任务设计的,但它是基于协程的,使你能够编写简洁且高效的异步代码。
全新!完全接受于 Python 3.6.0:虽然 asyncio
最初是在 Python 3.4 版本中引入的,但在 Python 3.6.0 版本中得到了进一步的发展和完善,并被广泛接受和使用。从那时起,asyncio
成为了编写异步代码的标准方式。
纯 Python 包:asyncio
是一个内置于 Python 标准库中的包,用于编写单线程并发代码。
2.4 Python并行示例
示例1:multiprocessing
这段代码演示了如何使用 multiprocessing
模块来并行处理任务,并使用 tqdm
来显示每个任务的进度。
import multiprocessing
import time
from tqdm import trange
def mp_worker(inputs):
i = inputs[0]
d, t = inputs[1][0], inputs[1][1]
for _ in trange(20, desc=d, position=i):
time.sleep(t)
def mp_handler():
data = list(enumerate([['a', 2], ['b', 4], ['c', 1], ['d', 8]]))
p = multiprocessing.Pool(4)
p.map(mp_worker, data)
if __name__ == "__main__":
mp_handler()
示例2:subprocess
这段代码演示了如何使用 subprocess
模块来执行 conda list
命令,并使用 tqdm
来显示进度条。
import os
import subprocess
import sys
import time
from tqdm import trange, tqdm
path = os.path.dirname(sys.executable) + os.sep + "scripts" + os.sep
sys.path.append(path)
exe = "conda.exe"
args = ["list", "--json"]
def sp_handler():
s = None
for i in trange(10, desc='Python.exe', position=1):
if i != 5:
time.sleep(1)
else:
s = subprocess.check_output([exe] + args)
l = s.splitlines()
for p in trange(len(l), desc='conda list', position=2):
tqdm.write(l[p].decode())
time.sleep(0.1)
if __name__ == "__main__":
sp_handler()
三、多进程与 GIS 中的并发处理
GIS 本质上比单纯的 Python 更加复杂,因为它涉及到地理学的知识和数据处理。将编程与地理学结合起来会带来一系列挑战:
挑战
投影数据:GIS 数据通常需要在不同的坐标系之间进行转换,这涉及到复杂的数学计算和投影变换。
用户界面考虑:GIS 应用程序往往需要直观且易于使用的图形用户界面,以便用户可以方便地浏览地图、查询数据和进行空间分析。
并发和多进程在 GIS 中的考虑因素
在 GIS 应用中使用并发和多进程时,还需要考虑以下几点:
数据一致性:在处理地理空间数据时,必须确保数据的一致性和完整性,尤其是在并发操作中。
资源管理:GIS 数据通常较大,因此在多进程或并发处理时需要有效地管理内存和其他资源。
空间索引:为了提高查询性能,GIS 数据通常需要建立空间索引,这在并发处理时也需要特别注意。
数据同步:当多个进程或线程同时访问 GIS 数据时,需要确保数据的同步,避免数据竞争。
图形渲染:GIS 应用通常需要实时渲染图形,这可能需要特殊的并发策略来优化渲染性能。
地理分析:地理分析通常涉及复杂的算法和大量的数据处理,使用多进程可以显著提高处理速度。
综上所述,在 GIS 领域使用多进程和并发处理时,需要综合考虑地理数据的特点、用户界面的需求以及并发编程带来的挑战。
3.1 多进程在 ArcGIS 中的应用
multiprocessing
模块提供了在给定机器上分配工作的功能,利用多核 CPU 和更大的系统内存。在 ArcGIS 中处理大量数据时,多进程可以在某些情况下提高性能和可扩展性。然而,在其他情况下,多进程可能会降低性能,甚至在某些情况下不应该使用。
示例1:处理大量数据集
第一个示例展示了在工作空间或一组工作空间中对大量数据集执行特定操作。当有大量的数据集时,利用多进程可以更快地完成任务。
以下代码演示了使用 multiprocessing
模块定义投影、添加字段并计算字段,适用于大量 shapefile 文件的简单模式。此 Python 代码将创建一个等于可用 CPU 或 CPU 核心数目的进程池,并使用该进程池处理特征类。
import os
import re
import multiprocessing
import arcpy
def update_shapefiles(shapefile):
'''Worker function'''
# 定义投影为 WGS84 -- 工厂代码为 4326
arcpy.management.DefineProjection(shapefile, 4326)
# 添加名为 CITY 的 TEXT 类型字段
arcpy.management.AddField(shapefile, 'CITY', 'TEXT')
# 计算字段 'CITY',从 shapefile 名称中去除 '_base'
city_name = shapefile.split('_base')[0]
city_name = re.sub('_', ' ', city_name)
arcpy.management.CalculateField(shapefile, 'CITY',
'"{0}"'.format(city_name.upper()), 'PYTHON')
def main():
''' Create a pool class and run the jobs.'''
# 设置工作空间
workspace = 'C:/GISData/USA/usa'
arcpy.env.workspace = workspace
fcs = arcpy.ListFeatureClasses('*')
fc_list = [os.path.join(workspace, fc) for fc in fcs]
pool = multiprocessing.Pool()
pool.map(update_shapefiles, fc_list)
# 同步主进程与作业进程以确保适当的清理
pool.close()
pool.join()
if __name__ == '__main__':
main()
示例2:处理具有大量特征的单个数据集
第二个示例着眼于在具有大量特征和记录的单个数据集上进行地理处理。在这种情况下,我们可以通过将数据分割成组并同时处理这些组来从多进程中受益。
import multiprocessing
import numpy
import arcpy
arcpy.env.overwriteOutput = True
def find_identical(oid):
'''Worker function to perform Find Identical, and return'''
'''a list of numpy arrays as the results.'''
# 创建 fishnet 中的特征层
tile = arcpy.management.MakeFeatureLayer(
'C:/testing/testing.gdb/fishnet',
'layer{0}'.format(oid[0]),
"""OID = {0}""".format((oid[0])))
# 获取特征层的范围并设置环境
tile_row = arcpy.da.SearchCursor(tile, 'shape@')
geometry = tile_row.next()[0]
arcpy.env.extent = geometry.extent
# 执行 Find Identical
identical_table = arcpy.management.FindIdentical(
'C:/testing/testing.gdb/Points',
'in_memory/identical', 'Shape')
# 将结果表转换为 numpy 数组并返回
result_array = arcpy.da.TableToNumPyArray(identical_table, ["*"])
return result_array
def main():
# 创建用于分组输入的 OID 列表
fishnet_rows = arcpy.SearchCursor(
'C:/testing/testing.gdb/fishnet', '', '', 'OID')
oids = [[row.getValue('OID')] for row in fishnet_rows]
# 创建进程池并运行任务
pool = multiprocessing.Pool()
result_arrays = pool.map(find_identical, oids)
# 合并结果数组并创建输出表
result_array = numpy.concatenate(result_arrays, axis=0)
arcpy.da.NumPyArrayToTable(result_array,
'C:/testing/testing.gdb/identical')
# 同步主进程与作业进程以确保适当的清理
pool.close()
pool.join()
if __name__ == '__main__':
main()
示例3:处理基于对象 ID 范围的数据
下面的例子展示了如何不依赖空间范围来分割数据,而是基于对象 ID 范围来处理数据。
import multiprocessing
import numpy
import arcpy
def generate_near_table(ranges):
i, j = ranges[0], ranges[1]
lyr = arcpy.management.MakeFeatureLayer(
'c:/testing/testing.gdb/random1mil',
'layer{0}'.format(i), """OID >= {0} AND
OID <= {1}""".format((i, j)))
gn_table = arcpy.analysis.GenerateNearTable(
lyr, 'c:/testing/testing.gdb/random300',
'in_memory/outnear{0}'.format(i))
result_array = arcpy.da.TableToNumPyArray(gn_table, ["*"])
arcpy.management.Delete(gn_table)
return result_array
def main():
ranges = [[0, 250000], [250001, 500000], [500001, 750000],
[750001, 1000001]]
# 创建进程池并运行任务
pool = multiprocessing.Pool()
result_arrays = pool.map(generate_near_table, ranges)
# 合并结果数组并创建输出表
result_array = numpy.concatenate(result_arrays, axis=0)
arcpy.da.NumPyArrayToTable(result_array, 'c:/testing/testing.gdb/gn3')
# 同步主进程与作业进程以确保适当的清理
pool.close()
pool.join()
if __name__ == '__main__':
main()
3.2 考虑因素
在决定使用多进程之前,请考虑以下重要因素:
在第一个示例所示的情况下,如果使用的是文件地理数据库中的要素类,则不会正常工作,因为每次更新都需要获取模式锁定,这会阻止任何其他进程同时更新文件地理数据库。但是,此示例可以适用于 shapefile 和 ArcSDE 地理数据库数据。
对于每个进程来说,加载 arcpy
库的启动成本约为 1-3 秒。根据数据的复杂性和大小,这可能会导致使用多进程的脚本运行时间比没有使用多进程的脚本更长。在许多情况下,多进程工作流的最后一部是聚合所有结果,这也是额外的成本。
判断多进程是否适用于您的工作流通常是一个试错的过程。虽然这种试错过程可能会使一次性操作中使用多进程的优势失效,但如果最终的工作流需要多次运行或应用于使用大量数据的类似工作流,则试错过程可能非常有价值。
尽可能利用 “in_memory” 工作空间来创建临时数据。这通常可以提高性能,而不是将数据写入磁盘。但是,根据在内存中创建的数据量,可能需要将临时数据写入磁盘。由于模式锁定,无法在文件地理数据库中创建临时数据集。完成使用内存中的数据集后删除它,可以防止内存溢出错误。
以上只是一些示例,展示了如何使用多进程来提高地理处理时的性能和可扩展性。然而,重要的是要记住多进程并不总是意味着更好的性能。
四、同一脚本中同时使用ArcPy和开源库
当有人想要开始使用 Python 进行空间分析或自动化涉及空间数据的任务时,通常会看到两种类型的人:
(a)拥有丰富编程经验但缺乏处理空间数据经验的数据科学家;
(b)对空间数据有着深厚知识但对 Python 缺乏经验的 GIS 从业人员。
在选择正确的 Python 工具包时,这两种人都可能选择不当的道路,后者更有可能这样做。
4.1 主流空间数据分析技术
-
GIS 软件:传统的 GIS 软件(如 ArcGIS 和 QGIS)提供了全面的空间分析功能,支持从数据管理到高级空间分析的各类任务。这些软件通常配有图形用户界面(GUI)和脚本编程接口,方便用户进行操作和自动化。
-
空间数据库:空间数据库(如 PostGIS、Oracle Spatial)提供了强大的空间数据存储和管理功能。它们支持复杂的空间查询和分析,并能处理大规模空间数据集。
-
Python 库:
-
Shapely:用于几何对象的创建和操作,提供了丰富的空间操作功能。
-
Fiona:用于读写空间数据格式,作为读取和写入空间数据的工具。
-
Geopandas:在 Pandas 的基础上扩展,提供了处理空间数据的能力,包括空间操作、数据转换和可视化。
-
Pyproj:用于空间坐标转换和投影变换。
4.2 ArcPy 与 Geopandas 的优缺点比较
ArcPy
ArcPy 是 ESRI 提供的 Python 库,用于与 ArcGIS 环境进行交互。它能够访问 ArcGIS 中的各种空间分析工具和功能。
优点:
强大的功能:ArcPy 能够调用 ESRI 的专业工具,支持复杂的空间分析和制图需求。
企业集成:适用于企业级 GIS 环境,能够处理和管理 ESRI geodatabase 格式的数据。
全面的支持:提供了广泛的功能支持,包括地图制图、空间分析和数据管理。
缺点:
复杂性:与 ArcGIS 的 GUI 操作相比,ArcPy 的编程接口显得复杂和笨重。频繁的磁盘读写和中间文件的创建使得操作不够高效。
闭源限制:依赖于 ESRI 的专有软件和数据格式,限制了与其他工具的集成和数据交换。
性能问题:处理大量数据时,性能表现不如现代开源工具高效。
本质上,使用 ArcPy 进行分析任务就像是运行无界面版本的 Pro。这与大多数 Python 工作流明显不同。有编程经验的人会觉得这种方法笨拙而尴尬。没有编程经验的人可能会认为编写脚本来解决问题得不偿失。
基本上,ArcPy 是为了模仿点击按钮的工作流程而创建的,而脚本的模式应该是完全不同的。ArcMap 以及 ArcGIS Pro 的许多功能旨在尽可能友好地为初学者服务,缺点是严重限制了高级用户。脚本是一种强大的方法,不应该被归结为次等的工作流程。
Geopandas
Geopandas 是一个开源 Python 库,扩展了 Pandas 的功能,以支持空间数据的处理和分析。它集成了多种开源工具,如 Fiona 和 Shapely,以实现空间数据的高效处理。
优点:
简洁高效:Geopandas 提供了类似 Pandas 的直观接口,支持在内存中进行高效的数据操作和分析。
开源和灵活:支持多种数据格式,如 GeoJSON、Shapefile 和 PostGIS,便于数据交换和集成。
现代工具集成:与 Python 的数据科学库(如 matplotlib 和 seaborn)兼容,便于数据可视化和进一步分析。
缺点:
功能限制:对于某些 ESRI 专有功能,Geopandas 可能缺乏直接的支持,特别是在高级空间分析方面。
学习曲线:尽管操作直观,但使用 Geopandas 需要一定的学习成本,特别是与其他数据科学库结合时。
代码示例
在下面的示例中,我将提供两项任务的对比:
-
简单缓冲区分析
-
属性表修改
可能有人会认为我只是挑选了一些让 ArcPy 看起来较差的例子。相反,我故意选择了这些例子,以便 ArcPy 的解决方案不至于过于复杂。如果出现更复杂的需求——比如从互联网导入数据、使用空间数据 API、合并来自不同来源的数据——那么 ArcPy 的示例将会更加繁琐。
示例 1: 缓冲区分析
此示例将涉及多个操作:
从磁盘读取空间数据集
将数据转换为不同的坐标参考系统
选择某些要素
创建缓冲区
保存输出
在 GeoPandas 中,工作流程如下:
import geopandas as gpd
# 从磁盘读取数据
crashes = gpd.read_file('/home/matt/Downloads/Crashes_Involving_Cyclists.geojson')
# 转换 x/y 数据
crashes_proj = crashes.to_crs(32119)
# 选择驾驶员超过一名的事故
crashes_mult = crashes_proj[crashes_proj['drivers'] > 1]
# 缓冲
crashes_buf = crashes_mult.buffer(500)
# 保存
crashes_buf.to_file('/home/matt/Downloads/crashes_buffer.shp')
在 ArcPy 中,工作流程如下:
import arcpy
from arcpy import env
# 设置工作空间以便创建中间文件
env.workspace = 'C:/Users/haffnerm/Downloads/'
# 从磁盘读取数据并转换
arcpy.management.Project('Crashes_Involving_Cyclists.shp',
'crashes_proj.shp',
arcpy.SpatialReference(32119))
# 选择司机数量大于 1 的事故
arcpy.Select_analysis('crashes_proj.shp',
'crashes_mult.shp',
'drivers > 1')
# 缓冲区
arcpy.Buffer_analysis('crashes_mult.shp',
'crashes_buf.shp',
'500 meters')
这两个示例在几个方面存在根本性差异。
首先,在 geopandas 示例中,中间操作在内存中处理,而在 arcpy 示例中,一切都写入磁盘。后者不仅速度较慢,还会创建许多不必要的中间文件,占用文件系统空间。尽管可以在 arcpy 中将数据保存为“内存”图层,但这并不是标准做法,几乎没有示例展示这种方法。即使不考虑磁盘和内存的差异,以下问题更为严重。
由于 arcpy 没有创建中间 Python 对象,因此必须使用 arcpy 提取数据的每一个方面,而不是使用 Python 中的方法或属性。这是非常不便的。例如,要检索行数、列数、字段名和 CRS 信息,可以在 geopandas 中直接使用:
# 行数
crashes_buf.shape[0]
# 列数
crashes_buf.shape[1]
# 字段名
crashes.columns
# CRS 信息
crashes.crs
而在 arcpy 中则需要这样做:
# 行数
len(arcpy.GetCount_management('crashes_buf.shp'))
# 列数
len(arcpy.ListFields('crashes_buf.shp'))
# 字段名
arcpy.ListFields('crashes_buf.shp')
# CRS 信息
arcpy.Describe('crashes_buf.shp').spatialReference
示例 2:属性表修改
pandas 和 geopandas 这些库的出现使数据分析变得更加简单。pandas 数据框设计用来模仿 R 风格的数据结构,使得用户能够利用矢量化操作,避免使用循环。例如,借用我们之前的数据集,假设用户想创建一个名为 injury_total
的字段,该字段将 type_a_injury
、type_b_injury
和 type_c_injury
列相加。在 geopandas 中,这可以这样完成:
import geopandas as gpd
# 从磁盘读取数据
crashes = gpd.read_file('/home/matt/Downloads/Crashes_Involving_Cyclists.geojson')
# 创建新列
crashes['injury_total'] = crashes['type_a_injury'] + crashes['type_b_injury'] + crashes['type_c_injury']
这非常简单。+ 操作符用于将值相加,新的列被“即时”创建。
然而,要在 arcpy 中完成相同的任务,必须执行以下步骤:
import arcpy
# 设置工作空间
arcpy.env.workspace = 'C:/Users/haffnerm/Downloads/'
# 添加新字段
arcpy.AddField_management('Crashes_Involving_Cyclists.shp', 'injury_total', 'LONG')
# 计算字段
arcpy.CalculateField_management('Crashes_Involving_Cyclists.shp', 'injury_total',
'!type_a_injury! + !type_b_injury! + !type_c_injury!', 'PYTHON3')
这个过程涉及更多的步骤和设置,比起 geopandas 的单行代码更加复杂。首先需要添加字段,然后再计算字段值。这种方法对于频繁需要修改或分析属性数据的场景非常繁琐。
在空间数据分析中,ArcPy 和 Geopandas 各有其优势和不足。ArcPy 强大且功能全面,适用于需要复杂分析和企业集成的场景,而 Geopandas 提供了简洁高效的操作方式,更适合快速分析和开放数据格式的处理。通过将两者结合使用,用户可以在保留现有工作流程的同时,充分利用开源工具的灵活性和效率,从而满足不同的分析需求。
4.3 ArcPy 与 Geopandas 等开源库共存
本文给出两种方式:
(1)克隆Pro的Python环境;
(2)在Pro的Python环境下,动态导入其他开源库;
方式一:
在 ArcGIS Pro 中克隆 Python 环境是一种常见的做法,用于确保你在进行开发或数据分析时所使用的 Python 环境与 ArcGIS Pro 内置的环境保持一致。这有助于避免因环境差异导致的问题,比如版本冲突或依赖项不匹配等。
可以通过3种方式实现Python环境的克隆:
(a)通过Pro 应用程序中包管理器中的克隆按钮实现克隆;
(b)通过直接拷贝C:\Program Files\ArcGIS\Pro\bin\Python\envs目录下(默认安装目录)的arcgispro-py3文件夹至任意目录,再将其添加到包管理器中;
(c)从命令提示符访问conda,并克隆Python环境;
之后,在Pro中激活克隆的环境,就可以安装其他所需要的Python包。(注意,若使用pycharm等集成开发环境时,开发环境中的python解释器必须与Pro中已激活的环境一致)
优点:
在此环境中导入安装的任意包或库,几乎不会有导入失败的情况,对于后期将脚本封装为python工具箱时很容易。且在python脚本中导入包没有导入顺序限制。
缺点:
由于pro中numpy和pandas等包版本一般较低,很多开源库只能安装很低的版本或直接安装不成功。
在克隆环境下开发的工具,分享给其他人时,比较繁琐;至少需要两步,一是复制整个克隆环境,二是需要在Pro中将活动环境设置为克隆的环境。
方式二(推荐):
在conda环境中,已安装指定开源库的环境下,如有一个“名为 my_env 的环境”,在该环境下的 ……\Lib\site-packages 目录内,拷贝安装完成的开源库到任意文件夹。
(1)可将该文件夹定义为一个python包(添加__init__.py),并在__init__.py文件中添加如下代码:
import os
import sys
# 获取当前模块所在目录的路径
module_dir = os.path.dirname(__file__)
# 将当前模块所在目录添加到sys.path中
sys.path.insert(0, module_dir)
通过from A import B的方式,利用import导入包的原理,动态将开源库导入python脚本中,实现与Arcpy的同时使用;
(2)获取当前文件夹的绝对路径,在python脚本导入包时,添加如下代码,实现开源库的导入。
import sys
th = r'D:\home1\myabcd'
sys.path.append(th)
import geopandas
优点:
可以安装任意想安装的库或包,可以使用到开源库中的最新特性和能力,并且能使用arcpy中的模块。
缺点:
由于Pro中numpy、pandas等包版本较低,安装的开源库中的numpy、pandas等包版本较高,在python脚本中,必须先导入arcpy、arcgis等包,再导入开源包,如:
import math
import re
import sys
import os
import arcpy
import time
th = r'D:\home1\myabcd'
sys.path.append(th)
import fiona
from shapely.geometry import shape, mapping
import geopandas as gpd
import pandas
import multiprocessing
from multiprocessing import Pool, cpu_count
properties_file_path = r"D:\home\mymodel"
sys.path.append(properties_file_path)
from pyhanlp import HanLP
但是,某些情况下,可能存在导入包失败。尤其是封装python工具箱时,检测语法常出现这种情况,这时候关闭pro重新开启一个pro项目都不好使,一般得重启电脑解决。
五、在Pro的Python工具箱中实现多进程处理
尽管你完成了在一个脚本文件中同时使用arcpy和开源库,并且你的脚本文件在执行多进程时,没有任何问题,但在Pro的Python工具箱中,执行该脚本文件中的函数或方法时,却不尽如意,主要问题如下:
(1)启动多个(与多进程中任务数一样)Pro应用程序,无法关闭,工具一直运行不会结束;
(2)启动多个命令行窗口,窗口无法关闭,工具一直运行不会结束;
(3)工具箱中的工具一直处于运行中,不会结束;
下面就每一种情况进行分析和举例说明。
5.1 启动多个Pro应用程序(不正确)
在Python工具箱中,工具执行时,默认使用的python解释器的路径是:
>>> import os, sys
>>> print(sys.executable)
C:\Program Files\ArcGIS\Pro\bin\ArcGISPro.exe
这意味着,多进程使用 ArcGISPro.exe
而不是 python.exe
来运行子进程。当你执行多进程任务时,会打开与进程任务数相同的Pro应用程序,且无法关闭,可以在任务管理器中看到,所有启动的Pro应用程序进程,占用的CPU都是0,也就是说,这些进程并没有执行任何任务。
简单的测试脚本ccc.py内容如下,直接在pycharm中运行没有问题:
from multiprocessing import Pool, cpu_count
def square(number):
"""计算给定数字的平方"""
result = 0
for _ in range(10000): # 增加循环次数来提高计算复杂度
result += number * number
return result
def test_multiprocessing_pool():
# 创建一个包含一些数字的列表
numbers = list(range(1, 10000))
# 使用 Pool 来并行处理这些数字
job_num = int(cpu_count() * 0.5)
with Pool(job_num) as pool:
results = pool.map(square, numbers)
# 打印结果
print("Squared numbers:", results)
return results
if __name__ == '__main__':
# 直接调用函数
test_multiprocessing_pool()
当然,在Pro的Python工具箱(pyt文件)中,你需要重新组织一下代码。
# -*- coding: utf-8 -*-
import arcpy
import sys
sys.path.append(r"D:\home1")
import ccc
from multiprocessing import Pool
class Toolbox(object):
def __init__(self):
"""Define the toolbox (the name of the toolbox is the name of the
.pyt file)."""
self.label = "Toolbox"
self.alias = "toolbox"
self.tools = [Tool]
class Tool(object):
def __init__(self):
"""Define the tool (tool name is the name of the class)."""
self.label = "Tool"
self.description = ""
self.canRunInBackground = False
def getParameterInfo(self):
"""Define parameter definitions"""
params = None
return params
def isLicensed(self):
"""Set whether tool is licensed to execute."""
return True
def updateParameters(self, parameters):
"""Modify the values and properties of parameters before internal
validation is performed. This method is called whenever a parameter
has been changed."""
return
def updateMessages(self, parameters):
"""Modify the messages created by internal validation for each tool
parameter. This method is called after internal validation."""
return
def execute(self, parameters, messages):
"""The source code of the tool."""
ccc.test_multiprocessing_pool()
return
def postExecute(self, parameters):
"""This method takes place after outputs are processed and
added to the display."""
return
运行工具后,首先启动了指定进程数量的Pro进程,然后不断打开Pro应用程序,运行过程如下:
5.2 启动多个命令行窗口(不正确)
在ArcMap中有一个选项可以禁用“在进程中”运行脚本的功能,但是在 ArcGIS Pro 中没有找到这样的选项。尝试将python解释器设置为Pro自带的conda环境下的python.exe
>>> import os, sys
>>> print(sys.executable)
C:\Program Files\ArcGIS\Pro\bin\ArcGISPro.exe
>>> print(os.path.join(sys.exec_prefix, 'python.exe'))
C:\Program Files\ArcGIS\Pro\bin\Python\envs\arcgispro-py3\python.exe
在导入multiprocessing包后,可以使用multiprocessing.set_executable()
函数来指定python.exe
的路径。
import os
import sys
import multiprocessing
multiprocessing.set_executable(os.path.join(sys.exec_prefix, 'python.exe'))
仅需在脚本ccc.py中导入包的部分增加设置指定python.exe
路径的逻辑,在pycharm中运行过程如下:
但是,您最终发现有几个python运行窗口打开了,并且这些窗口直到关闭 ArcGIS Pro 或工具运行完成才消失。
5.3 工具一直处于运行中,不会结束(不正确)
如果使用pythonw.exe
,它是为GUI应用程序设计的,不会打开Python控制台窗口。仅需在设置python解释器时将python.exe
改为pythonw.exe
multiprocessing.set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
某些场景下,这种方式运行的很好,比如多进程任务中未包含非空间计算时。但涉及空间计算或一些复杂的任务时,使用pythonw.exe
时,实例会显示在任务管理器中的ArcGIS Pro应用下(需要点击下拉菜单才能看到)。当程序卡住时,在任务管理器的CPU列中,每个实例的CPU使用率降至0%,内存显示一个值但保持不变。其他列也都降至零。随后,脚本工具在ArcGIS Pro中挂起并且无法继续;有时候这种挂起会持续数小时(甚至一夜)。
5.4 使用 subprocess 在独立进程中执行多进程脚本(正确)
使用命令行执行多进程脚本
从脚本工具背后的代码中使用subprocess
来调用这些模块,调用示例如下。
cmd = '"' + os.path.join(sys.exec_prefix, 'python.exe" "') + "C:\\temp\\ScriptUsingMultiprocessing.py"'
completed = subprocess.run(cmd, shell=True)
在命令行中执行的过程如下:
从任务管理器中可以看到,在windows命令处理程序下,启动了多个python进程。如下如所示:
在python工具箱中执行多进程脚本
编写一个名为justtest.py的测试脚本,该脚本的功能为使用NLP模型对兴趣点名称进行标注换行处理,涉及本文内容的关键关键部分主要包括:arcpy与开源库的同时使用,以及在"__main__"函数中获取参数并执行换行标注的函数。示例如下:
import math
import re
import sys
import os
import arcpy
th = r'D:\home1\myabcd'
sys.path.append(th)
import fiona
from shapely.geometry import shape, mapping
import geopandas as gpd
import pandas
import multiprocessing
multiprocessing.set_executable(os.path.join(sys.exec_prefix, 'python.exe'))
from multiprocessing import Pool, cpu_count
properties_file_path = r"D:\home\mymodel"
sys.path.append(properties_file_path)
from pyhanlp import HanLP
……
def set_label(fc_path, scratch_ws, lable_name):
"""
对标注字段进行简称处理
"""
fc_path = arcpy.Describe(fc_path).catalogPath
obj = GeoFC(fc_path)
gdf = obj.set_oid("geo_oid")
gdf['group_num'] = gdf['geo_oid'] % (cpu_count())
data_group = gdf.groupby('group_num', as_index=False)
groups = [(group, lable_name) for _, group in data_group]
# 处理换行标注
HanLP.Config.ShowTermNature = False
job_num = int(cpu_count() * 0.5)
# 使用 Pool 来并行处理这些数字
with Pool(3) as pool:
results = pool.map(cal_group, groups)
rdf = pandas.concat(results)
rdf.reset_index(drop=True, inplace=True)
rdf.drop(['geo_oid', 'group_num'], axis=1, inplace=True)
name = os.path.basename(fc_path).split(".")[0]
result_name = "{}_label".format(name)
# 写入到文件
rdf.to_file(f"{scratch_ws}\scratch.gdb",
layer=result_name, driver='OpenFileGDB', encoding='utf-8')
if __name__ == "__main__":
# 从命令行获取参数并运行并行任务
params = sys.argv[1:] # 获取命令行参数
set_label(*params)
在python工具箱中,构建命令行,并执行脚本,代码示例如下:
# -*- coding: utf-8 -*-
import arcpy
import subprocess
class Toolbox(object):
def __init__(self):
"""定义工具箱"""
self.label = "POI标注换行工具集"
self.alias = "toolbox"
# 工具类列表
self.tools = [LabelProperties]
class LabelProperties(object):
def __init__(self):
"""定义工具类"""
self.label = "POI标注换行"
self.description = "POI点根据语义换行工具"
self.canRunInBackground = False
self.category = "POI数据处理工具集"
def getParameterInfo(self):
"""定义参数"""
in_feature = arcpy.Parameter(displayName="输入要素",
name="in_feature_name",
datatype="GPFeatureLayer",
parameterType="Required",
direction="Input")
in_feature.filter.list = ["Point"]
in_feature.value = r"D:\7.python 工具箱\工具箱部署\3.测试数据\testdata.gdb\ds\宗教场所点"
f_name = arcpy.Parameter(displayName="名称",
name="Input name filed",
datatype="Field",
parameterType="Required",
direction="Input")
f_name.parameterDependencies = [in_feature.name]
f_name.filter.list = ['Text']
f_name.value = 'NAME'
scratch_ws = arcpy.Parameter(displayName="输出路径",
name="scratch_ws",
datatype="DEFolder",
parameterType="Required",
direction="Input")
scratch_ws.value = r"D:\tempdata"
return [in_feature, f_name, scratch_ws]
def isLicensed(self):
"""许可检查"""
return True
def updateParameters(self, parameters):
"""参数更新"""
return
def updateMessages(self, parameters):
"""信息更新"""
return
def execute(self, parameters, messages):
"""执行函数"""
in_feature = parameters[0].valueAsText
label_name = parameters[1].valueAsText
scratch_ws = parameters[2].valueAsText
arcpy.AddMessage("正在处理标注...")
# 构建命令行
python_exe = os.path.join(sys.exec_prefix, 'python.exe')
script_path = r'D:\home1\pystdtools1\justtest.py'
cmd = f'"{python_exe}" "{script_path}" "{in_feature}" "{scratch_ws}" "{label_name}"'
# 执行外部脚本
completed = subprocess.run(cmd, shell=True)
if completed.returncode == 0:
arcpy.AddMessage("标注处理完成")
else:
arcpy.AddError("标注处理失败")
arcpy.AddError(completed.stderr)
return
这样在Pro的python工具箱中,启动带有多进程的代码,代码执行没有错误,并且产生了预期的结果。
输出的结果,对NAME字段,按hanlp的分词结果,根据标注规则进行换行处理(通过添加英文逗号实现换行),如下图所示:
作者:craybb