Python Ray 扩展指南(三)
原文:
annas-archive.org/md5/95872ff5b3ec96901f7e3cfb51cd271f
译者:飞龙
协议:CC BY-NC-SA 4.0
第十一章:使用 Ray 与 GPU 和加速器
虽然 Ray 主要专注于水平扩展,但有时使用像 GPU 这样的特殊加速器可能比仅仅投入更多“常规”计算节点更便宜和更快。GPU 特别适合执行向量化操作,一次对数据块执行相同操作。机器学习,以及更广泛的线性代数,是一些顶级用例,¹ 因为深度学习极易向量化。
通常情况下,GPU 资源比 CPU 资源更昂贵,因此 Ray 的架构使得在必要时仅需请求 GPU 资源变得更加容易。要利用 GPU,您需要使用专门的库,并且由于这些库涉及直接内存访问,它们的结果可能并不总是可串行化的。在 GPU 计算世界中,NVIDIA 和 AMD 是两个主要选项,具有不同的集成库。
GPU 擅长什么?
并非每个问题都适合 GPU 加速。GPU 特别擅长同时在许多数据点上执行相同计算。如果一个问题非常适合向量化,那么 GPU 可能非常适合解决这个问题。
以下是从 GPU 加速中受益的常见问题:
机器学习
线性代数
物理学模拟
图形(这不奇怪)
GPU 不适合分支密集的非向量化工作流程,或者数据复制成本与计算成本相似或更高的工作流程。
构建模块
使用 GPU 需要额外的开销,类似于分发任务的开销(尽管速度稍快)。这些开销来自于数据串行化以及通信,尽管 CPU 和 GPU 之间的链接通常比网络链接更快。与 Ray 的分布式任务不同,GPU 没有 Python 解释器。相反,您的高级工具通常会生成或调用本机 GPU 代码。CUDA 和 Radeon Open Compute(ROCm)是与 GPU 交互的两个事实上的低级库,分别来自 NVIDIA 和 AMD。
NVIDIA 首先发布了 CUDA,并迅速在许多高级库和工具中获得了广泛应用,包括 TensorFlow。AMD 的 ROCm 起步较慢,并未见到同样程度的采纳。一些高级工具,包括 PyTorch,现在已经集成了 ROCm 支持,但许多其他工具需要使用特殊分支的 ROCm 版本,例如 TensorFlow(tensorflow-rocm)或 LAPACK(rocSOLVER)。
弄清楚构建模块可能会出人意料地具有挑战性。例如,在我们的经验中,让 NVIDIA GPU Docker 容器在 Linux4Tegra 上与 Ray 构建需要几天的时间。 ROCm 和 CUDA 库有支持特定硬件的特定版本,同样,您可能希望使用的更高级程序可能仅支持某些版本。如果您正在运行 Kubernetes 或类似的容器化平台,则可以从像 NVIDIA 的 CUDA 映像 或 AMD 的 ROCm 映像 这样的预构建容器开始受益,作为基础。
更高级的库
除非您有特殊需求,否则您可能会发现与为您生成 GPU 代码的更高级库一起工作最容易,例如基本线性代数子程序(BLAS)、TensorFlow 或 Numba。您应尝试将这些库安装在您正在使用的基础容器或机器映像中,因为它们在安装期间通常需要大量编译时间。
一些库,例如 Numba,执行动态重写您的 Python 代码。要使 Numba 对您的代码起作用,您需要在函数中添加装饰器(例如 @numba.jit
)。不幸的是,numba.jit
和其他对函数的动态重写在 Ray 中不受直接支持。相反,如果您使用此类库,只需像示例 11-1 中所示包装调用即可。
示例 11-1. 简单的 CUDA 示例
from numba import cuda, float32
# CUDA kernel
@cuda.jit
def mul_two(io_array):
pos = cuda.grid(1)
if pos < io_array.size:
io_array[pos] *= 2 # do the computation
@ray.remote
def remote_mul(input_array):
# This implicitly transfers the array into the GPU and back, which is not free
return mul_two(input_array)
注意
与 Ray 的分布式函数类似,这些工具通常会为您处理数据复制,但重要的是要记住移动数据进出 GPU 不是免费的。由于这些数据集可能很大,大多数库尝试在相同数据上执行多个操作。如果您有一个重复使用数据的迭代算法,则使用 actor 来持有 GPU 资源并将数据保留在 GPU 中可以减少此成本。
无论您选择哪个库(或者是否决定编写自己的 GPU 代码),都需要确保 Ray 将您的代码调度到具有 GPU 的节点上。
获取和释放 GPU 和加速器资源
您可以通过将 num_gpus
添加到 ray.remote
装饰器来请求 GPU 资源,与内存和 CPU 的方式类似。与 Ray 中的其他资源(包括内存)一样,Ray 中的 GPU 不能保证,并且 Ray 不会自动为您清理资源。虽然 Ray 不会自动为您清理内存,但 Python(在一定程度上)会,这使得 GPU 泄漏比内存泄漏更有可能。
许多高级库在 Python VM 退出之前不会释放 GPU。您可以在每次调用后强制 Python VM 退出,从而释放任何 GPU 资源,方法是在您的 ray.remote
装饰器中添加 max_calls=1
,如示例 11-2 所示。
示例 11-2. 请求和释放 GPU 资源
# Request a full GPU, like CPUs we can request fractional
@ray.remote(num_gpus=1)
def do_serious_work():
# Restart entire worker after each call
@ray.remote(num_gpus=1, max_calls=1)
def do_serious_work():
重新启动的一个缺点是它会移除你在 GPU 或加速器中重用现有数据的能力。你可以通过使用长生命周期的 actors 来解决这个问题,但这会牺牲资源在这些 actors 中的锁定。
Ray 的机器学习库
你也可以配置 Ray 的内置机器学习库来使用 GPU。为了让 Ray Train 启动 PyTorch 使用 GPU 资源进行训练,你需要在Trainer
构造函数调用中设置use_gpu=True
,就像你配置工作进程数量一样。Ray Tune 为资源请求提供了更大的灵活性,你可以在tune.run
中指定资源,使用与ray.remote
相同的字典。例如,要在每次试验中使用两个 CPU 和一个 GPU,你会调用tune.run(trainable, num_samples=10, resources_per_trial={"cpu": 2, "gpu": 2})
。
带有 GPU 和加速器的自动缩放器
Ray 的自动缩放器具有理解不同类型节点并选择基于请求资源调度的能力。这在 GPU 方面尤为重要,因为 GPU 比其他资源更昂贵(且供应更少)。在我们的集群中,由于只有四个带有 GPU 的节点,我们配置自动缩放器如下(https://oreil.ly/juA4y):
imagePullSecrets: []
# In practice you _might_ want an official Ray image
# but this is for a bleeding-edge mixed arch cluster,
# which still is not fully supported by Ray's official
# wheels & containers.
image: holdenk/ray-ray:nightly
operatorImage: holdenk/ray-ray:nightly
podTypes:
rayGPUWorkerType:
memory: 10Gi
maxWorkers: 4
minWorkers: 1
# Normally you'd ask for a GPU but NV auto labeler is...funky on ARM
CPU: 1
rayResources:
CPU: 1
GPU: 1
memory: 1000000000
nodeSelector:
node.kubernetes.io/gpu: gpu
rayWorkerType:
memory: 10Gi
maxWorkers: 4
minWorkers: 1
CPU: 1
rayHeadType:
memory: 3Gi
CPU: 1
这样,自动缩放器可以分配不带 GPU 资源的容器,从而使 Kubernetes 能够将这些 pod 放置在仅 CPU 节点上。
CPU 回退作为一种设计模式
大多数可以通过 GPU 加速的高级库也具有 CPU 回退功能。Ray 没有内置的表达 CPU 回退或“如果可用则使用 GPU”的方法。在 Ray 中,如果你请求一个资源,调度程序找不到它,并且自动缩放器无法为其创建一个实例,该函数或 actor 将永远阻塞。通过一些创意,你可以在 Ray 中构建自己的 CPU 回退代码。
如果你希望在集群有 GPU 资源时使用它们,并在没有 GPU 时回退到 CPU,你需要做一些额外的工作。确定集群是否有可用的 GPU 资源的最简单方法是请求 Ray 运行一个带有 GPU 的远程任务,然后基于此设置资源,如示例 11-3 所示。
示例 11-3. 如果不存在 GPU,则回退到 CPU
# Function that requests a GPU
@ray.remote(num_gpus=1)
def do_i_have_gpus():
return True
# Give it at most 4 minutes to see if we can get a GPU
# We want to give the autoscaler some time to see if it can spin up
# a GPU node for us.
futures = [do_i_have_gpus.remote()]
ready_futures, rest_futures = ray.wait(futures, timeout=240)
resources = {"num_cpus": 1}
# If we have a ready future, we have a GPU node in our cluster
if ready_futures:
resources["num_gpus"] =1
# "splat" the resources
@ray.remote(** resources)
def optional_gpu_task():
你使用的任何库也需要回退到基于 CPU 的代码。如果它们不会自动这样做(例如,根据 CPU 与 GPU 调用不同的两个函数,如mul_two_cuda
和mul_two_np
),你可以通过传递一个布尔值来指示集群是否有 GPU。
警告
如果 GPU 资源没有得到正确释放,这仍可能导致在 GPU 集群上失败。理想情况下,你应该修复 GPU 释放问题,但在多租户集群上,这可能不是一个选项。你还可以在每个函数内部尝试/捕获获取 GPU 的异常。
其他(非 GPU)加速器
尽管本章的大部分内容集中在 GPU 加速器上,但相同的一般技术也适用于其他类型的硬件加速。例如,Numba 能够利用特殊的 CPU 特性,而 TensorFlow 则可以利用张量处理单元(TPU)。在某些情况下,资源可能不需要代码更改,而只是通过相同的 API 提供更快的性能,例如具有非易失性存储器快速执行(NVMe)驱动器的机器。在所有这些情况下,您可以配置自动扩展器来标记并使这些资源可用,方式与 GPU 类似。
结论
GPU 是在 Ray 上加速某些工作流程的绝佳工具。虽然 Ray 本身没有用于利用 GPU 加速代码的钩子,但它与各种您可以用于 GPU 计算的库集成良好。许多这些库并非为共享计算而创建,因此需要特别注意意外资源泄漏,尤其是由于 GPU 资源往往更昂贵。
¹ 其他顶级用例之一是加密货币挖矿,但你不需要像 Ray 这样的系统。使用 GPU 进行加密货币挖矿导致需求增加,许多显卡售价高于官方价格,而 NVIDIA 已经在尝试用其最新的 GPU 抑制加密货币挖矿。
第十二章:Ray 在企业中
在企业环境中部署软件通常需要满足额外的要求,特别是在安全方面。企业部署往往涉及多个利益相关者,并且需要为更大的科学家/工程师群体提供服务。虽然不是必需的,但许多企业集群往往具有某种形式的多租户性质,以允许更有效地利用资源(包括人力资源,如运营人员)。
Ray 依赖项安全问题
不幸的是,Ray 的默认要求文件引入了一些不安全的库。许多企业环境都有某种容器扫描或类似系统来检测此类问题¹。在某些情况下,您可以简单地删除或升级标记的依赖项问题,但当 Ray 将依赖项包含在其 wheel 中时(例如,Apache Log4j 问题),限制自己使用预构建的 wheel 会有严重的缺点。如果发现 Java 或本地库有问题,则需要使用升级版本从源代码重新构建 Ray。Derwen.ai 在其ray_base repo中有一个关于在 Docker 中执行此操作的示例。
与现有工具进行交互
企业部署通常涉及与现有工具及其产生的数据的交互。此处进行集成的一些潜在点包括使用 Ray 的数据集通用 Arrow 接口与其他工具交互。当数据处于“静止”状态时,Parquet 是与其他工具交互的最佳格式。
使用 Ray 与 CI/CD 工具
在大型团队中工作时,持续集成和交付(CI/CD)是项目有效协作的重要组成部分。使用 Ray 与 CI/CD 的最简单选择是在本地模式下使用 Ray,并将其视为正常的 Python 项目。另外,您可以通过使用 Ray 的作业提交 API 提交测试作业并验证结果。这可以让您测试超出单台计算机规模的 Ray 作业。无论您使用 Ray 的作业 API 还是 Ray 的本地模式,都可以使用 Ray 与任何 CI/CD 工具和虚拟环境。
与 Ray 进行身份验证
Ray的默认部署使您可以轻松入门,因此,在客户端和服务器之间没有任何身份验证。这种缺乏身份验证意味着任何能连接到您的 Ray 服务器的人都可能提交作业并执行任意代码。通常,企业环境需要比默认配置提供的更高级别的访问控制。
Ray 的 gRPC 端点(而不是作业服务器)可以配置为在客户端和服务器之间进行互相认证的传输层安全性(TLS)。Ray 在客户端和头节点之间以及工作节点之间使用相同的 TLS 通信机制。
警告
Ray 的 TLS 实现要求客户端具有私钥。您应该将 Ray 的 TLS 实现视为类似于共享密钥加密,但速度较慢。
另一种选项是使用作业服务器,将端点保持不安全,但限制可以与端点通信的人。这可以通过入口控制器、网络规则甚至作为虚拟私有网络(VPN)的集成部分来完成,例如Tailscale 的 Grafana RBAC 规则示例。幸运的是,Ray 的仪表板——以及作业服务器端点——已绑定到localhost/127.0.0.1,并在 8265 端口上运行。例如,如果你在 Kubernetes 上使用 Traefik 作为入口,你可以像这样通过基本身份验证暴露作业 API:
apiVersion: traefik.containo.us/v1alpha1
kind: Middleware
metadata:
name: basicauth
namespace: ray-cluster
spec:
basicAuth:
secret: basic-auth
-
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: longhorn-ingress
namespace: longhorn-system
annotations:
traefik.ingress.kubernetes.io/router.entrypoints: websecure
traefik.ingress.kubernetes.io/router.tls.certresolver: le
traefik.ingress.kubernetes.io/router.tls: "true"
kubernetes.io/ingress.class: traefik
traefik.ingress.kubernetes.io/router.middlewares: ba-ray-cluster@kubernetescrd
spec:
rules:
- host: "mymagicendpoints.pigscanfly.ca"
http:
paths:
- pathType: Prefix
path: "/"
backend:
service:
name: ray-head-svc
port:
number: 8265
依赖于限制端点访问的方式存在一个缺点,即任何可以访问该计算机的人都可以向你的集群提交作业,因此对于共享计算资源效果不佳。
Ray 上的多租户
在箱外,Ray 集群支持多个运行作业。当所有作业都来自同一用户且你不关心隔离作业时,你不需要考虑多租户的影响。
在我们看来,Ray 的租户隔离相对于其他部分来说发展不足。Ray 通过将独立的工作节点绑定到作业来实现每个用户的多租户安全,从而降低了不同用户之间意外信息泄露的机会。与 Ray 的执行环境一样,你的用户可以安装不同的 Python 库,但 Ray 不会隔离系统级库(例如 CUDA)。
我们认为 Ray 中的租户隔离就像门上的锁。它的存在是为了保持诚实的人诚实,并防止意外泄露。然而,像命名的演员这样的命名资源可以从任何其他作业中调用。这是命名演员的预期功能,但由于 Ray 经常使用 cloudpickle,你应该考虑任何命名演员都有允许同一集群上的恶意演员执行任意代码的潜力。
警告
命名资源会破坏 Ray 的租户隔离。
虽然 Ray 对多租户有一些支持,但我们建议部署多租户 Kubernetes 或 Yarn 集群。多租户很好地引出了为数据源提供凭据的下一个问题。
数据源的凭据
多租户使得数据源的凭据变得复杂,因为你不能依赖基于实例的角色/配置。通过向运行环境添加env_vars
,你可以在整个作业中指定凭据。理想情况下,你不应该在源代码中硬编码这些凭据,而是从类似 Kubernetes 秘钥中获取并传播这些值:
ray.init(
runtime_env={
"env_vars": {
"AWS_ACCESS_KEY_ID": "key",
"AWS_SECRET_ACCESS_KEY": "secret",
}
}
)
你还可以使用相同的技术为每个函数分配凭据(例如,如果只有一个演员应该具有写权限),通过分配带有.option
的运行环境。然而,在实践中,跟踪这些单独的凭据可能会成为一个头疼的问题。
永久与临时集群
部署 Ray 时,您必须选择永久集群还是瞬时集群。对于永久集群,多租户问题和确保自动缩放器能够缩小(例如,没有悬空资源)尤为重要。然而,随着越来越多的企业采用 Kubernetes 或其他云原生技术,我们认为瞬时集群的吸引力将增加。
瞬时集群
瞬时集群有许多好处。最重要的两个好处是低成本和不需要多租户集群。瞬时集群允许在计算结束时完全释放资源。通过提供瞬时集群,您可以避免多租户问题,这可以减少操作负担。瞬时集群使得试验新版本的 Ray 和新的本地库相对轻量化。这也可以防止强制迁移带来的问题,每个团队可以运行自己的 Ray 版本。⁴
瞬时集群在做出选择时也有一些您应该注意的缺点。最明显的两个缺点是需要等待集群启动,以及在应用程序启动时间之上,不能在集群上使用缓存/持久性。启动瞬时集群取决于能够分配计算资源,这取决于您的环境和预算,可能需要从几秒到几天的时间(在云问题期间)。如果您的计算依赖于大量状态或数据,每次在新集群上启动应用程序时,它都会先读取大量信息,这可能会相当慢。
永久集群
除了成本和多租户问题之外,永久集群还带来了额外的缺点。永久集群更容易积累配置“残留物”,当迁移到新集群时可能更难重新创建。随着基础硬件老化,这些集群随时间变得更加脆弱。即使在云中,长时间运行的实例越来越可能遇到故障。永久集群中的长期资源可能最终会包含需要基于监管原因清除的信息。
永久集群还有一些重要的好处,可以很有用。从开发者的角度来看,一个优势是能够拥有长期存在的参与者或其他资源。从运营的角度来看,永久集群不需要同样的启动时间,因此如果需要执行新任务,你不必等待集群变得可用。表 12-1 总结了瞬时和永久集群之间的差异。
表 12-1. 瞬时和永久集群比较表
瞬时/瞬时集群 | 永久集群 | |
---|---|---|
资源成本 | 通常较低,除非运行时,工作负载可以进行二进制打包或在用户之间共享资源 | 当资源泄漏阻止自动缩放器缩减时成本较高 |
库隔离 | 灵活(包括本地) | 仅在 venv/Conda 环境级别隔离 |
尝试新版本 Ray 的能力 | 是,可能需要针对新 API 进行代码更改 | 开销较大 |
最长 actor 生命周期 | 短暂(与集群一起) | “永久”(除非集群崩溃/重新部署) |
共享 actors | 否 | 是 |
启动新应用程序的时间 | 可能较长(依赖云) | 可变(如果集群具有几乎即时的备用容量;否则,依赖于云) |
数据读取摊销 | 否(每个集群必须读取任何共享数据集) | 可能(如果结构良好) |
使用短暂集群或永久集群的选择取决于您的用例和要求。在某些部署中,短暂集群和永久集群的混合可能提供正确的权衡。
监控
随着您组织中 Ray 集群的规模或数量增长,监控变得越来越重要。Ray 通过其内部仪表板或 Prometheus 提供内置的度量报告,尽管 Prometheus 默认情况下是禁用的。
注意
安装 ray[default]
时会安装 Ray 的内部仪表板,但仅安装 ray
不会。
当您独自工作或调试生产问题时,Ray 的仪表板非常出色。如果安装了仪表板,Ray 将打印一条包含指向仪表板的链接的信息日志(例如,在 http://127.0.0.1:8265 查看 Ray 仪表板
)。此外,ray.init
的结果包含 webui_url
,指向度量仪表板。然而,Ray 的仪表板无法创建警报,因此仅在您知道出现问题时才有帮助。Ray 的仪表板 UI 正在 Ray 2 中升级;图 12-1 显示旧版仪表板,而 图 12-2 显示新版仪表板。
https://github.com/OpenDocCN/ibooker-python-zh/raw/master/docs/scl-py-ray/img/spwr_1201.png
图 12-1. 旧版(2.0 之前)Ray 仪表板
https://github.com/OpenDocCN/ibooker-python-zh/raw/master/docs/scl-py-ray/img/spwr_1202.png
图 12-2. 新的 Ray 仪表板
如您所见,新仪表板并非自然演化而来;相反,它是经过有意设计的,并包含新信息。两个版本的仪表板均包含有关执行器进程和内存使用情况的信息。新仪表板还具有用于通过 ID 查找对象的 Web UI。
警告
仪表板不应公开,作业 API 使用相同的端口。
Ray 的指标也可以导出到 Prometheus,Ray 默认会选择一个随机端口。您可以通过查看 ray.init 的结果中的 metrics_export_port
来找到端口,或者在启动 Ray 的主节点时指定一个固定的端口 --metrics-export-port=
。Ray 与 Prometheus 的集成不仅提供了与 Grafana 等指标可视化工具的集成(见图 12-3),而且在某些参数超出预定范围时添加了警报功能。
https://github.com/OpenDocCN/ibooker-python-zh/raw/master/docs/scl-py-ray/img/spwr_1203.png
图 12-3. Ray 的示例 Grafana 仪表板⁵
要获取导出的指标,需要配置 Prometheus 来抓取哪些主机或 Pod。对于静态集群的用户,只需提供一个主机文件即可;但对于动态用户,有多种选择。Kubernetes 用户可以使用pod monitors配置 Prometheus 的 Pod 抓取。由于 Ray 集群没有统一的标签适用于所有节点,因此这里我们使用了两个 Pod Monitor——一个用于主节点,一个用于工作节点。
非 Kubernetes 用户可以使用 Prometheus 的file-based discovery,使用 Ray 在主节点自动生成的文件*/tmp/ray/prom_metrics_service_discovery.json*。
除了监控 Ray 本身,您还可以在 Ray 内部对代码进行仪表化。您可以将自己的指标添加到 Ray 的 Prometheus 指标中,或者与 OpenTelemetry 集成。正确的指标和仪表化主要取决于您的组织其余部分的使用情况。比较 OpenTelemetry 和 Prometheus 超出了本书的范围。
使用 Ray 指标仪表化您的代码
Ray 的内置指标很好地报告了集群的健康状况,但我们通常关心的是应用程序的健康状况。例如,由于所有作业都处于停滞状态而导致的低内存使用的集群在集群级别看起来很好,但我们实际关心的(为用户提供服务、训练模型等)并没有发生。幸运的是,您可以向 Ray 添加自己的指标来监视应用程序的使用情况。
Tip
您添加到 Ray 指标的指标会像 Ray 的内置指标一样暴露为 Prometheus 指标。
Ray 指标支持在 ray.util.metrics
内的 counter, gauge 和 histogram 指标类型。这些指标对象不可序列化,因为它们引用了 C 对象。在记录任何值之前,您需要明确创建该指标。在创建新指标时,可以指定名称、描述和标签。一个常用的标签是指标在 actor 内部使用的 actor 名称,用于 actor 分片。由于它们不可序列化,您需要将它们要么创建并在 actors 内使用,如 Example 12-1,要么使用 lazy singleton 模式,如 Example 12-2。
Example 12-1. 在 actor 内部使用 Ray 计数器
# Singleton for reporting a Ray metric
@ray.remote
class MySpecialActor(object):
def __init__(self, name):
self.total = 0
from ray.util.metrics import Counter, Gauge
self.failed_withdrawls = Counter(
"failed_withdrawls", description="Number of failed withdrawls.",
tag_keys=("actor_name",), # Useful if you end up sharding actors
)
self.failed_withdrawls.set_default_tags({"actor_name": name})
self.total_guage = Gauge(
"money",
description="How much money we have in total. Goes up and down.",
tag_keys=("actor_name",), # Useful if you end up sharding actors
)
self.total_guage.set_default_tags({"actor_name": name})
self.accounts = {}
def deposit(self, account, amount):
if account not in self.accounts:
self.accounts[account] = 0
self.accounts[account] += amount
self.total += amount
self.total_guage.set(self.total)
def withdrawl(self, account, amount):
if account not in self.accounts:
self.failed_withdrawls.inc()
raise Exception("No account")
if self.accounts[account] < amount:
self.failed_withdrawls.inc()
raise Exception("Not enough money")
self.accounts[account] -= amount
self.total -= amount
self.total_guage.set(self.total)
Example 12-2. 使用全局单例方法使 Ray 计数器与远程函数一起使用
# Singleton for reporting a Ray metric
class FailureCounter(object):
_instance = None
def __new__(cls):
if cls._instance is None:
print('Creating the object')
cls._instance = super(FailureCounter, cls).__new__(cls)
from ray.util.metrics import Counter
cls._instance.counter = Counter(
"failure",
description="Number of failures (goes up only).")
return cls._instance
# This will fail with every zero because divide by zero
@ray.remote
def remote_fun(x):
try:
return 10 / x
except:
FailureCounter().counter.inc()
return None
OpenTelemetry 可以在包括 Python 在内的多种语言中使用。Ray 具有基本的 OpenTelemetry 实现,但其使用范围不如其 Prometheus 插件广泛。
使用 Ray 包装自定义程序
Python 的一个强大功能是使用 subprocess 模块⁶ 启动子进程。这些进程可以是系统上的任何 shell 命令或任何应用程序。这种能力允许在 Ray 实现中有许多有趣的选项。我们将在这里展示其中一个选项,即作为 Ray 执行的一部分运行任何自定义 Docker 镜像⁷。Example 12-3 演示了如何实现这一点。
Example 12-3. 在 Ray 远程函数中执行 Docker 镜像
ray.init(address='ray://<*`your IP`*>:10001')
@ray.remote(num_cpus=6)
def runDocker(cmd):
with open("result.txt", "w") as output:
result = subprocess.run(
cmd,
shell=True, # Pass single string to shell, let it handle.
stdout=output,
stderr=output
)
print(f"return code {result.returncode}")
with open("result.txt", "r") as output:
log = output.read()
return log
cmd='docker run --rm busybox echo "Hello world"'
result=runDocker.remote(cmd)
print(f"result: {ray.get(result)}")
此代码包含一个简单的远程函数,执行外部命令并返回执行结果。主函数向其传递一个简单的 docker run
命令,然后打印调用结果。
此方法允许您在 Ray 远程函数执行的一部分中执行任何现有的 Docker 镜像,这反过来允许多语言 Ray 实现,甚至执行具有特定库需求的 Python 需要为此远程函数运行创建虚拟环境。它还允许在 Ray 执行中轻松包含预构建的镜像。
在 Ray 内部使用 subprocess
运行 Docker 镜像只是其有用应用之一。一般来说,可以通过这种方法调用安装在 Ray 节点上的任何应用程序。
结论
尽管 Ray 最初是在研究实验室中创建的,您可以通过这里描述的实现增强将 Ray 引入主流企业计算基础设施。具体来说,请确保执行以下操作:
仔细评估此操作可能带来的安全性和多租户问题。
要注意与 CI/CD 和可观察性工具的集成。
决定是否需要永久或短暂的 Ray 集群。
这些考虑因您的企业环境和 Ray 的具体用例而异。
到达本书的这一点,您应该对所有 Ray 基础知识有了扎实的掌握,并了解下一步的指引。我们期待您加入 Ray 社区,并鼓励您查看社区资源,包括Ray 的 Slack 频道。如果您想看看如何将 Ray 的各个部分组合起来,附录 A 探讨了如何为开源卫星通信系统构建后端的一种方式。
¹ 一些常见的安全扫描工具包括 Grype、Anchore 和 Dagda。
² 使其与 gRPC 客户端配合工作更复杂,因为 Ray 的工作节点需要能够与头节点和 Redis 服务器通信,这在使用本地主机进行绑定时会出现问题。
³ 本书作者中有些人有在 Tailscale 工作的朋友,其他解决方案也完全可以。
⁴ 在实际操作中,我们建议仅支持少数几个版本的 Ray,因为它在快速发展。
⁵ 查看Ray metrics-1650932823424.json获取配置信息。
⁶ 特别感谢 Michael Behrendt 建议本节讨论的实现方法。
⁷ 这仅适用于使用 VM 上的 Ray 安装的云安装环境。参考附录 B 了解如何在 IBM Cloud 和 AWS 上执行此操作。
附录 A. Space Beaver 案例研究:Actors、Kubernetes 等
Space Beaver 项目(来自 Pigs Can Fly Labs)利用 Swarm 和简单邮件传输协议(SMTP)提供被礼貌称为性价比高(即便宜)的离网消息服务。¹ Space Beaver 核心架构的初稿使用了 Scala 和 Akka,但后来我们转而使用 Ray。通过使用 Python 的 Ray 而不是 Scala 的 Akka,我们能够重用网站的对象关系映射(ORM)并简化部署。
虽然在 Kubernetes 上部署 Akka 应用是可能的,但(依据 Holden 的意见)相比使用 Ray 完成相同任务要复杂得多。² 在本附录中,我们将概述 Space Beaver 后端的一般设计,各种 actor 的代码,并展示如何部署它(以及类似的应用)。
注意
您可以在 Pigs Can Fly Labs GitHub 仓库 找到此案例研究的代码。
高级设计
Space Beaver 的核心要求是作为电子邮件(通过 SMTP)、短信(通过 Twilio)和 Swarm 卫星 API 之间的桥梁。其中大部分涉及一定程度的状态,例如运行 SMTP 服务器,但出站邮件消息可以在没有任何状态的情况下实现。图 A-1 展示了设计的大致轮廓。
https://github.com/OpenDocCN/ibooker-python-zh/raw/master/docs/scl-py-ray/img/spwr_aa01.png
图 A-1. Actor 布局
实施
现在您已经看到了一个大致的设计,是时候探索您在整本书中学到的模式是如何应用来将所有内容整合在一起的了。
出站邮件客户端
出站邮件客户端是唯一一个无状态的代码,因为它为每个出站消息建立连接。由于它是无状态的,我们将其实现为常规的远程函数,每个传入请求创建一个。根据传入请求的数量,Ray 可以根据需要扩展或缩减远程函数实例的数量。由于客户端可能在外部主机上阻塞,因此能够扩展包含邮件客户端的远程函数实例的数量非常有用。
提示
调度每个远程函数调用都需要一些开销。在我们的情况下,预期的消息速率并不高。如果您对所需并发有很好的了解,应考虑使用 Ray 的 multiprocessing.Pool
来避免函数创建开销。
但是,我们希望序列化某些设置,比如在设置类中,所以我们用一个特殊的方法包装出站邮件客户端函数,通过自引用传递,尽管它不是一个 actor,如 示例 A-1 所示。
示例 A-1. 邮件客户端
class MailClient(object):
"""
Mail Client
"""
def __init__(self, settings: Settings):
self.settings = settings
def send_message(self, *args, **kwargs):
"""
Wrap send_msg to include settings.
"""
return self.send_msg.remote(self, *args, **kwargs)
@ray.remote(retry_exceptions=True)
def send_msg(self, msg_from: str, msg_to: str, data: str):
message = MIMEMultipart("alternative")
message["From"] = msg_from
message["To"] = msg_to
message["Subject"] = f"A satelite msg: f{data[0:20]}"
part1 = MIMEText(data, "plain")
# Possible later: HTML
message.attach(part1)
with SMTP(self.settings.mail_server, port=self.settings.mail_port) as smtp:
if self.settings.mail_username is not None:
smtp.login(self.settings.mail_username,
self.settings.mail_password)
logging.info(f"Sending message {message}")
r = smtp.sendmail(
msg=str(message),
from_addr=msg_from,
to_addrs=msg_to)
return r
另一个合理的方法是使其有状态,并跨消息维持连接。
共享的 actor 模式和工具
系统的其余组件在长期网络连接或数据库连接的上下文中都是有状态的。 由于用户参与者需要与系统中运行的所有其他参与者进行通信(反之亦然),为了简化发现其他运行中参与者的过程,我们添加了一个LazyNamedActorPool
,它结合了命名参与者和参与者池的概念(示例 A-2)。³
示例 A-2. 懒加载命名参与者池
class LazyNamedPool:
"""
Lazily constructed pool by name.
"""
def __init__(self, name, size, min_size=1):
self._actors = []
self.name = name
self.size = size
self.min_actors = min_size
def _get_actor(self, idx):
actor_name = f"{self.name}_{idx}"
try:
return [ray.get_actor(actor_name)]
except Exception as e:
print(f"Failed to fetch {actor_name}: {e} ({type(e)})")
return []
def _get_actors(self):
"""
Get actors by name, caches result once we have the "full" set.
"""
if len(self._actors) < self.size:
return list(flat_map(self._get_actor, range(0, self.size)))
def get_pool(self):
new_actors = self._get_actors()
# Wait for at least min_actors to show up
c = 0
while len(new_actors) < self.min_actors and c < 10:
print(f"Have {new_actors} waiting for {self.min_actors}")
time.sleep(2)
new_actors = self._get_actors()
c = c + 1
# If we got more actors
if (len(new_actors) > len(self._actors)):
self._actors = new_actors
self._pool = ActorPool(new_actors)
if len(new_actors) < self.min_actors:
raise Exception("Could not find enough actors to launch pool.")
return self._pool
我们使用的另一个共享模式是优雅关闭,在此模式下,我们要求参与者停止处理新消息。 一旦参与者停止接受新消息,队列中的现有消息将被排出,根据需要发送到卫星网络或 SMTP 网络。 然后可以删除参与者,而不必持久化和恢复参与者正在处理的消息。 我们将在接下来看到的邮件服务器中实现此模式,如示例 A-3 所示。
示例 A-3. 停止进行升级
async def prepare_for_shutdown(self):
"""
Prepare for shutdown, so stop remove pod label (if present)
then stop accepting connections.
"""
if self.label is not None:
try:
self.update_label(opp="remove")
await asyncio.sleep(120)
except Exception:
pass
self.server.stop()
邮件服务器参与者
邮件服务器参与者负责接受新的入站消息并将其传递给用户参与者。 这是作为 aiosmtpd 服务器处理程序实现的,如示例 A-4 所示。
示例 A-4. 邮件服务器消息处理
async def handle_RCPT(self, server, session, envelope, address, rcpt_options):
"""
Call back for RCPT. This only accepts email for us, no relaying.
"""
logging.info(f"RCPT to with {address} received.")
if not address.endswith(f"@{self.domain}"):
self.emails_rejected.inc()
return '550 not relaying to that domain'
# Do we really want to support multiple emails? idk.
envelope.rcpt_tos.append(address)
return '250 OK'
async def handle_DATA(self, server, session, envelope):
"""
Call back for the message data.
"""
logging.info(f"Received message {envelope}")
print('Message for %s' % envelope.rcpt_tos)
parsed_email = message_from_bytes(envelope.content, policy=policy.SMTPUTF8)
text = ""
if "subject" in parsed_email:
subject = parsed_email["subject"]
text = f"{subject}\n"
body = None
# You would think "get_body" would give us the body but...maybe not? ugh
try:
body = (parsed_email.get_body(preferencelist=('plain', 'html',)).
get_content())
except Exception:
if parsed_email.is_multipart():
for part in parsed_email.walk():
ctype = part.get_content_type()
cdispo = str(part.get('Content-Disposition'))
# skip any text/plain (txt) attachments
if ctype == 'text/plain' and 'attachment' not in cdispo:
body = part.get_payload(decode=True) # decode
break
# not multipart - i.e. plain text, no attachments,
# keeping fingers crossed
else:
body = parsed_email.get_payload(decode=True)
text = f"{text}{body}"
text = text.replace("\r\n", "\n").rstrip("\n")
self.emails_forwaded.inc()
for rcpt in envelope.rcpt_tos:
message = CombinedMessage(
text=text,
to=parseaddr(rcpt)[1].split('@')[0],
msg_from=envelope.mail_from,
from_device=False,
protocol=EMAIL_PROTOCOL)
self.user_pool.get_pool().submit(
lambda actor, message: actor.handle_message.remote(message),
message)
return '250 Message accepted for delivery'
拥有邮件服务器的一个重要部分是外部用户可以连接到服务器。 对于 HTTP 服务(如推理服务器),您可以使用 Ray Serve 公开您的服务。 但是,邮件服务器使用 SMTP,目前无法使用 Ray Serve 公开。 因此,为了允许 Kubernetes 将请求路由到正确的主机,邮件参与者会像示例 A-5 中所示那样标记自身。
示例 A-5. 邮件服务器 Kubernetes 标记
def update_label(self, opp="add"):
label = self.label
patch_json = (
"[{" +
f""" "op": "{opp}", "path": "/metadata/labels/{label}", """ +
f""" "value": "present" """ +
"}]")
print(f"Preparing to patch with {patch_json}")
try:
kube_host = os.getenv("KUBERNETES_SERVICE_HOST")
kube_port = os.getenv("KUBERNETES_PORT_443_TCP_PORT", "443")
pod_namespace = os.getenv("POD_NAMESPACE")
pod_name = os.getenv("POD_NAME")
url = f"http://{kube_host}:{kube_port}/api/v1/namespace/" +
f"{pod_namespace}/pods/{pod_name}"
headers = {"Content-Type": "application/json-patch+json"}
print(f"Patching with url {url}")
result = requests.post(url, data=patch_json, headers=headers)
logging.info(f"Got back {result} updating header.")
print(f"Got patch result {result}")
if result.status_code != 200:
raise Exception(f"Got back a bad status code {result.status_code}")
except Exception as e:
print(f"Got an error trying to patch with https API {e}")
patch_cmd = [
"kubectl",
"patch",
"pod",
"-n",
pod_namespace,
pod_name,
"--type=json",
f"-p={patch_json}"]
print("Running cmd:")
print(" ".join(patch_cmd))
out = subprocess.check_output(patch_cmd)
print(f"Got {out} from patching pod.")
print("Pod patched?")
卫星参与者
卫星参与者类似于邮件服务器参与者,但不是接受入站请求,而是通过轮询获取新消息,并且我们也通过它发送消息。 轮询就像在车上开着一个六岁的孩子一样,不停地问:“我们到了吗?” 但在我们的情况下,问题是“你有没有新消息?” 在 Ray 中,异步参与者是实现轮询的最佳选项,因为轮询循环永远运行,但您仍然希望能够处理其他消息。 示例 A-6 展示了卫星参与者的轮询实现。
示例 A-6. 卫星参与者轮询
async def run(self):
print("Prepairing to run.")
internal_retries = 0
self.running = True
while self.running:
try:
self._login()
while True:
await asyncio.sleep(self.delay)
await self.check_msgs()
internal_retries = 0 # On success reset retry counter.
except Exception as e:
print(f"Error {e} while checking messages.")
logging.error(f"Error {e}, retrying")
internal_retries = internal_retries + 1
if (internal_retries > self.max_internal_retries):
raise e
此轮询循环大部分逻辑委托给check_msgs
,如示例 A-7 所示。
示例 A-7. 卫星检查消息
async def check_msgs(self):
print("Checking messages...")
res = self.session.get(
self._getMessageURL,
headers=self.hdrs,
params={'count': self._page_request_size, 'status': 0})
messages = res.json()
for item in messages:
# Is this a message we are responsible for
if int(item["messageId"]) % self.poolsize == self.idx:
try:
await self._process_mesage(item)
except Exception as e:
logging.error(f"Error {e} processing {item}")
self.session.post(
self._ackMessageURL.format(item['packetId']),
headers=self.hdrs)
print("Done!")
在卫星参与者中我们使用的另一个有趣模式是在测试中暴露可序列化的结果,但在正常流程中保持数据以更高效的异步表示。 这种模式在消息解码方式中展示,如示例 A-8 所示。
示例 A-8. 卫星处理消息
async def _decode_message(self, item: dict) -> AsyncIterator[CombinedMessage]:
"""
Decode a message. Note: result is not serializable.
"""
raw_msg_data = item["data"]
logging.info(f"msg: {raw_msg_data}")
messagedata = MessageDataPB() # noqa
bin_data = base64.b64decode(raw_msg_data)
# Note: this really does no validation, so if it gets a message instead
# of MessageDataPb it just gives back nothing
messagedata.ParseFromString(bin_data)
logging.info(f"Formatted: {text_format.MessageToString(messagedata)}")
if (len(messagedata.message) < 1):
logging.warn(f"Received {raw_msg_data} with no messages?")
for message in messagedata.message:
yield CombinedMessage(
text=message.text, to=message.to, protocol=message.protocol,
msg_from=item["deviceId"], from_device=True
)
async def _ser_decode_message(self, item: dict) -> List[CombinedMessage]:
"""
Decode a message. Serializeable but blocking. Exposed for testing.
"""
gen = self._decode_message(item)
# See PEP-0530
return [i async for i in gen]
async def _process_message(self, item: dict):
messages = self._decode_message(item)
async for message in messages:
self.user_pool.get_pool().submit(
lambda actor, msg: actor.handle_message.remote(msg),
message)
用户演员
虽然其他演员都是异步的,允许在演员内部进行并行处理,但用户演员是同步的,因为 ORM 尚未处理异步执行。用户演员的代码在 示例 A-9 中展示得相当完整,因此你可以看到共享的模式(其他演员因简洁起见而跳过)。
示例 A-9. 用户演员
class UserActorBase():
"""
Base client class for talking to the swarm.space APIs.
Note: this actor is not async because Django's ORM is not happy with
async.
"""
def __init__(self, settings: Settings, idx: int, poolsize: int):
print(f"Running on {platform.machine()}")
self.settings = settings
self.idx = idx
self.poolsize = poolsize
self.satellite_pool = utils.LazyNamedPool("satellite", poolsize)
self.outbound_sms = utils.LazyNamedPool("sms", poolsize)
self.mail_client = MailClient(self.settings)
self.messages_forwarded = Counter(
"messages_forwarded",
description="Messages forwarded",
tag_keys=("idx",),
)
self.messages_forwarded.set_default_tags(
{"idx": str(idx)})
self.messages_rejected = Counter(
"messages_rejected",
description="Rejected messages",
tag_keys=("idx",),
)
self.messages_rejected.set_default_tags(
{"idx": str(idx)})
print(f"Starting user actor {idx}")
def _fetch_user(self, msg: CombinedMessage) -> User:
"""
Find the user associated with the message.
"""
if (msg.from_device):
device = Device.objects.get(serial_number=msg.msg_from)
return device.user
elif (msg.protocol == EMAIL_PROTOCOL):
username = msg.to
print(f"Fetching user {msg.to}")
try:
return User.objects.get(username=username)
except Exception as e:
print(f"Failed to get user: {username}?")
raise e
elif (msg.protocol == SMS_PROTOCOL):
print(f"Looking up user for phone {msg.to}")
try:
return User.objects.get(twillion_number=str(msg.to))
except Exception as e:
print(f"Failed to get user: {username}?")
raise e
else:
raise Exception(f"Unhandled protocol? - {msg.protocol}")
def prepare_for_shutdown(self):
"""
Prepare for shutdown (not needed for sync DB connection)
"""
pass
def handle_message(self, input_msg: CombinedMessage):
"""
Handle messages.
"""
print(f"Handling message {input_msg}")
user = self._fetch_user(input_msg)
self.messages_forwarded.inc()
if (input_msg.from_device):
msg = {
"data": input_msg.text,
"msg_from": f"{user.username}@spacebeaver.com",
"msg_to": input_msg.to
}
# Underneath this calls a ray.remote method.
self.mail_client.send_message(**msg)
else:
msg = {
"protocol": input_msg.protocol,
"msg_from": input_msg.msg_from,
"msg_to": user.device.serial_number,
"data": input_msg.text
}
self.satellite_pool.get_pool().submit(
lambda actor, msg: actor.send_message.remote(**msg),
msg)
@ray.remote(max_restarts=-1)
class UserActor(UserActorBase):
"""
Routes messages and checks the user account info.
"""
注释
Django 是一个流行的 Python Web 开发框架,包括许多组件,包括我们正在使用的 ORM。
SMS 演员和 Serve 实现
除了卫星和电子邮件网关的演员外,Space Beaver 还使用 Ray Serve 来公开 phone-api
,如 示例 A-10 所示。
示例 A-10. 使用 Ray Serve 处理入站短信
from messaging.utils import utils
from pydantic import BaseModel, Field
from fastapi import FastAPI, HTTPException, Request
from ray import serve
from messaging.settings.settings import Settings
from messaging.proto.MessageDataPB_pb2 import SMS as SMS_PROTOCOL
from messaging.internal_types import CombinedMessage
from typing import Optional
from twilio.request_validator import RequestValidator
# 1: Define a FastAPI app and wrap it in a deployment with a route handler.
app = FastAPI()
class InboundMessage(BaseModel):
x_twilio_signature: str
message_from: str = Field(None, alias='from')
to: str
body: str
msg_type: Optional[str] = Field(None, alias="type")
@serve.deployment(num_replicas=3, route_prefix="/")
@serve.ingress(app)
class PhoneWeb:
def __init__(self, settings: Settings, poolsize: int):
self.settings = settings
self.poolsize = poolsize
self.user_pool = utils.LazyNamedPool("user", poolsize)
self.validator = RequestValidator(settings.TW_AUTH_TOKEN)
# FastAPI will automatically parse the HTTP request for us.
@app.get("/sms")
async def inbound_message(self, request: Request,
message: InboundMessage) -> str:
# Validate the message
request_valid = self.validator.validate(
request.url,
request.form,
request.headers.get('X-TWILIO-SIGNATURE', ''))
if request_valid:
internal_message = CombinedMessage(
text=message.body, to=message.to, protocol=SMS_PROTOCOL,
msg_from=message.message_from, from_device=False
)
self.user_pool.get_pool().submit(
lambda actor, msg: actor.handle_message.remote(msg),
internal_message)
return ""
else:
raise HTTPException(status_code=403, detail="Validation failed.")
测试
为了方便测试,演员代码被分解为一个基类,然后扩展为演员类。这允许独立测试邮件服务器,而不依赖其在 Ray 上的部署,如 示例 A-11 中所示。
示例 A-11. 独立邮件测试
class StandaloneMailServerActorTests(unittest.TestCase):
port = 7779 + 100 * random.randint(0, 9)
def setUp(self):
self.port = self.port + 1
self.actor = mailserver_actor.MailServerActorBase(
idx=1, poolsize=1, port=self.port, hostname="0.0.0.0",
label=None)
self.actor.user_pool = test_utils.FakeLazyNamedPool("u", 1)
self.pool = self.actor.user_pool.get_pool()
def tearDown(self):
self.actor.server.stop()
self.server = None
def test_constructor_makes_server(self):
self.assertEquals(self.actor.server.hostname, "0.0.0.0")
def test_extract_body_and_connect(self):
client = Client("localhost", self.port)
msg_text = "Hi Boop, this is timbit."
client.sendmail("c@gull.com", "boop@spacebeaver.com",
msg_text)
self.assertEquals(self.pool.submitted[0][1].text, msg_text)
self.assertEquals(self.pool.submitted[0][1].protocol, EMAIL_PROTOCOL)
self.assertEquals(self.pool.submitted[0][1].from_device, False)
尽管这些独立测试可以减少开销,但最好还是有一些完整的演员测试。你可以通过在测试中重复使用 Ray 上下文来加快速度(尽管当出现问题时,调试是很痛苦的),就像在 示例 A-12 中展示的那样。
示例 A-12. 完整演员测试
@ray.remote
class MailServerActorForTesting(mailserver_actor.MailServerActorBase):
def __init__(self, idx, poolsize, port, hostname):
mailserver_actor.MailServerActorBase.__init__(self, idx, poolsize,
port, hostname)
self.user_pool = test_utils.FakeLazyNamedPool("user", 1)
class MailServerActorTestCases(unittest.TestCase):
@classmethod
def setUpClass(cls):
ray.init()
@classmethod
def tearDownClass(cls):
ray.shutdown()
def test_mail_server_actor_construct(self):
mailserver_actor.MailServerActor.remote(0, 1, 7587, "localhost")
部署
尽管 Ray 处理了大部分部署工作,我们仍然需要创建一个 Kubernetes 服务来使我们的 SMTP 和 SMS 服务可访问。在我们的测试集群上,我们通过暴露一个负载均衡器服务来实现,如 示例 A-13 所示。
示例 A-13. SMTP 和 SMS 服务
apiVersion: v1
kind: Service
metadata:
name: message-backend-svc
namespace: spacebeaver
spec:
selector:
mail_ingress: present
ports:
- name: smtp
protocol: TCP
port: 25
targetPort: 7420
type: LoadBalancer
loadBalancerIP: 23.177.16.210
sessionAffinity: None
---
apiVersion: v1
kind: Service
metadata:
name: phone-api-svc
namespace: spacebeaver
spec:
selector:
ray-cluster-name: spacebeaver
ports:
- name: http
protocol: TCP
port: 80
targetPort: 8000
type: LoadBalancer
sessionAffinity: None
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: spacebeaver-phone-api-ingress
namespace: spacebeaver
annotations:
cert-manager.io/cluster-issuer: letsencrypt
cert-manager.io/issue-temporary-certificate: "true"
acme.cert-manager.io/http01-edit-in-place: "true"
spec:
ingressClassName: nginx
tls:
- hosts:
- phone-api.spacebeaver.com
secretName: phone-api-tls-secret
rules:
- host: "phone-api.spacebeaver.com"
http:
paths:
- pathType: Prefix
path: "/"
backend:
service:
name: phone-api-svc
port:
number: 80
如图所示,SMTP 和 SMS 服务使用不同的节点选择器将请求路由到正确的 pod。
结论
Space Beaver 消息后端的 Ray 移植大大减少了部署和打包的复杂性,同时增加了代码复用。部分原因来自于广泛的 Python 生态系统(流行的前端工具和后端工具),但其余部分来自于 Ray 的无服务器特性。与之相对应的 Akka 系统需要用户在调度演员时考虑意图,而使用 Ray,我们可以把这些交给调度器。当然,Akka 带来了许多好处,比如强大的 JVM 生态系统,但希望这个案例研究已经展示了你可以如何有趣地使用 Ray。
¹ Holden Karau 是 Pigs Can Fly Labs 的管理合伙人,虽然她真的希望你会购买这款离线消息设备,但她意识到阅读编程书籍的人群和需要低成本开源卫星电子邮件消息的人群之间的交集相当小。实际上,对于许多消费者使用案例来说,Garmin inReach Mini2 或者 Apple 可能更好。
² 在 Akka on Kubernetes 中,用户需要手动将 actors 调度到单独的容器上并重新启动 actors,而 Ray 可以为我们处理这些。
³ 另一种解决方案是让主程序或启动程序在创建 actors 时通过引用来调用它们。
附录 B. 安装和部署 Ray
Ray 的强大之处在于它支持各种部署模型,从单节点部署——允许您在本地进行 Ray 实验——到包含数千台机器的集群。在本附录中,我们将展示编写本书时评估的一些安装选项。
在本地安装 Ray
最简单的 Ray 安装是使用 pip
在本地进行的。使用以下命令:
pip install -U ray
此命令安装了运行本地 Ray 程序或在 Ray 集群上启动程序所需的所有代码(参见 “使用 Ray 集群”)。该命令安装了最新的官方发布版本。此外,还可以从 每日发布 或 特定提交 安装 Ray。还可以在 Conda 环境 中安装 Ray。最后,您可以按照 Ray 文档 中的说明从源代码构建 Ray。
使用 Ray Docker 镜像
除了在本地机器上进行本地安装外,Ray 还提供了通过运行提供的 Docker 镜像 的选项。Ray 项目提供了丰富的 Docker 镜像,适用于各种 Python 版本和硬件选项。这些镜像可以用来通过启动相应的 Ray 镜像来执行 Ray 的代码:
docker run --rm --shm-size=<*shm-size*> -t -i <*image name*>
在这里 <*shm-size*>
是 Ray 内部用于对象存储的内存大小。对于此值的一个良好估计是使用您可用内存的大约 30%;<*image name*>
是所使用的镜像的名称。
执行此命令后,您将收到一个命令行提示符,并可以输入任何 Ray 代码。
使用 Ray 集群
尽管本地 Ray 安装对于实验和初始调试非常有用,但 Ray 的真正强大之处在于其能够在机器集群上运行和扩展。
Ray 集群节点是基于 Docker 镜像的逻辑节点。Ray 项目提供的 Docker 镜像包含了运行逻辑节点所需的所有代码,但不一定包含运行用户应用程序所需的所有代码。问题在于用户的代码可能需要特定的 Python 库,而这些库并不包含在 Ray 的 Docker 镜像中。
为了解决这个问题,Ray 允许在集群安装的一部分中向节点安装特定库,这对于初始测试非常有帮助,但可能会显著影响节点的创建性能。因此,在生产安装中,通常建议使用从 Ray 提供的自定义镜像派生的镜像并添加所需的库。
Ray 提供了两种主要的安装选项:直接安装在硬件节点或云提供商的 VM 上,以及在 Kubernetes 上安装。在这里,我们将讨论 Ray 在云提供商和 Kubernetes 上的安装。有关 Ray 在硬件节点上的安装信息,请参阅Ray 文档。
官方文档描述了 Ray 在包括 AWS、Azure、Google Cloud、阿里巴巴和自定义云在内的多个云提供商上的安装。在这里,我们将讨论在 AWS 上的安装(因为它是最流行的)和 IBM Cloud 上的安装(因为其中一位合著者在 IBM 工作,采用了独特的方法)。¹
在 AWS 上安装 Ray
AWS 云安装利用了 Python 的 Boto3 AWS SDK,并且需要在*~/.aws/credentials*文件中配置您的 AWS 凭证。²
一旦凭证创建并安装了 Boto3,您可以使用从Ray GitHub 存储库适配的ray-aws.yaml文件,通过以下命令在 AWS 上安装 Ray:
ray up <*your location*>/ray-aws.yaml
此命令创建了集群,并提供了一组您可以使用的有用命令:
Monitor autoscaling with
ray exec ~/Projects/Platform-Infrastructure/middleware\
/ray/install/ray-aws.yaml\
'tail -n 100 -f /tmp/ray/session_latest/logs/monitor*'
Connect to a terminal on the cluster head:
ray attach ~/Projects/Platform-Infrastructure/middleware\
/ray/install/ray-aws.yaml
Get a remote shell to the cluster manually:
ssh -o IdentitiesOnly=yes\
-i /Users/boris/Downloads/id.rsa.ray-boris root@52.118.80.225
请注意,您将看到的 IP 地址与此处显示的不同。在创建集群时,它使用了一个只允许通过 Secure Shell(SSH)连接到集群的防火墙。如果您想访问集群的仪表板,您需要打开 8265 端口;如果需要 gRPC 访问,请使用 10001 端口。为此,请在 Amazon Elastic Compute Cloud(EC2)仪表板中找到您的节点,点击“安全组”选项卡,选择安全组,并修改入站规则。Figure B-1 显示了一个允许来自任何地方的实例端口访问的新规则。有关入站规则配置的更多信息,请参阅AWS 文档。
https://github.com/OpenDocCN/ibooker-python-zh/raw/master/docs/scl-py-ray/img/spwr_ab01.png
图 B-1. AWS 控制台中的实例视图
按照您的 YAML 文件的请求,您只能看到一个头部,并且工作节点将被创建以满足提交作业的执行要求。要验证集群是否正常运行,您可以使用 GitHub 上localPython.py中的代码,该代码验证它是否可以连接到集群及其节点。
使用 VM 直接安装 Ray 的替代方法是直接在 VM 上安装 Ray。这种方法的优势在于能够轻松地向 VM 添加附加软件,这在实际中非常有用。一个明显的用例是管理 Python 库。您可以使用基于 Docker 的安装来做到这一点,但随后需要为每个库配置构建 Docker 镜像。在基于 VM 的方法中,无需创建和管理 Docker 镜像;只需适当地使用pip
进行安装即可。此外,您还可以在 VM 上安装应用程序,以便在 Ray 执行中利用它们(参见“使用 Ray 包装自定义程序”)。
提示
在 VM 上安装 Ray 需要大量的设置命令,因此 Ray 节点启动可能需要相当长的时间。推荐的方法是先启动 Ray 集群一次,创建一个新镜像,然后使用此镜像并移除额外的设置命令。
在 IBM Cloud 上安装 Ray
IBM Cloud 安装基于Gen2 连接器,该连接器使 Ray 集群可以部署在 IBM 的 Gen2 云基础设施上。与在 AWS 上使用 Ray 一样,您将首先在 YAML 文件中创建集群规范。如果您不想手动创建 YAML 文件,可以交互式地使用 Lithopscloud 完成此操作。您可以像平常一样使用pip
安装 Lithopscloud:
pip3 install lithopscloud
要使用 Lithopscloud,您首先需要创建一个API 密钥或重用现有的密钥。有了您的 API 密钥,您可以运行 lithopscloud -o cluster.yaml
来生成一个cluster.yaml文件。启动 Lithopscloud 后,按照提示生成文件(您需要使用上下箭头进行选择)。您可以在GitHub上找到生成文件的示例。
自动生成文件的限制在于它对头节点和工作节点使用相同的镜像类型,这并不总是理想的。通常情况下,您可能希望为这些节点提供不同的类型。要做到这一点,您可以修改自动生成的cluster.yaml文件如下:
available_node_types:
ray_head_default:
max_workers: 0
min_workers: 0
node_config:
boot_volume_capacity: 100
image_id: r006-dd164da8-c4d9-46ba-87c4-03c614f0532c
instance_profile_name: bx2-4x16
key_id: r006-d6d823da-5c41-4e92-a6b6-6e98dcc90c8e
resource_group_id: 5f6b028dc4ef41b9b8189bbfb90f2a79
security_group_id: r006-c8e44f9c-7159-4041-a7ab-cf63cdb0dca7
subnet_id: 0737-213b5b33-cee3-41d0-8d25-95aef8e86470
volume_tier_name: general-purpose
vpc_id: r006-50485f78-a76f-4401-a742-ce0a748b46f9
resources:
CPU: 4
ray_worker_default:
max_workers: 10
min_workers: 0
node_config:
boot_volume_capacity: 100
image_id: r006-dd164da8-c4d9-46ba-87c4-03c614f0532c
instance_profile_name: bx2-8x32
key_id: r006-d6d823da-5c41-4e92-a6b6-6e98dcc90c8e
resource_group_id: 5f6b028dc4ef41b9b8189bbfb90f2a79
security_group_id: r006-c8e44f9c-7159-4041-a7ab-cf63cdb0dca7
subnet_id: 0737-213b5b33-cee3-41d0-8d25-95aef8e86470
volume_tier_name: general-purpose
vpc_id: r006-50485f78-a76f-4401-a742-ce0a748b46f9
resources:
CPU: 8
在这里,您定义了两种类型的节点:默认的头节点和默认的工作节点(您可以定义多个工作节点类型,并设置每次的最大工作节点数)。因此,您现在可以拥有一个相对较小的头节点(始终运行),以及会根据需要创建的更大的工作节点。
提示
如果您查看生成的 YAML 文件,您会注意到它包含许多设置命令,因此 Ray 节点启动可能需要相当长的时间。推荐的方法是先启动 Ray 集群一次,创建一个新镜像,然后使用此镜像并移除设置命令。
生成 YAML 文件后,您可以安装 Gen2 连接器以便使用它。运行 pip3 install gen2-connector
。然后,通过运行 ray up cluster.yaml
来创建您的集群。
类似于在 AWS 上安装 Ray,此安装显示了一系列有用的命令:
Monitor autoscaling with
ray exec /Users/boris/Downloads/cluster.yaml \
'tail -n 100 -f /tmp/ray/session_latest/logs/monitor*'
Connect to a terminal on the cluster head:
ray attach ~/Downloads/cluster.yaml
Get a remote shell to the cluster manually:
ssh -o IdentitiesOnly=yes -i ~/Downloads/id.rsa.ray-boris root@52.118.80.225
要访问集群,请确保按照IBM Cloud 文档(图 B-2)开放所需端口。
https://github.com/OpenDocCN/ibooker-python-zh/raw/master/docs/scl-py-ray/img/spwr_ab02.png
图 B-2. IBM Cloud 控制台显示防火墙规则
根据您的 YAML 文件的请求,您只能看到一个头部;工作节点将被创建以满足提交作业的执行需求。要验证集群是否正确运行,请执行 localPython.py 脚本。
在 Kubernetes 上安装 Ray
在实际集群在 Kubernetes 上的安装中,Ray 提供了两种基本机制:
集群启动器
与使用虚拟机进行安装类似,这使得在任何云上部署 Ray 集群变得简单。它将使用云提供商的 SDK 创建新的实例或机器,执行 shell 命令以使用提供的选项设置 Ray,并初始化集群。
Ray Kubernetes 操作器
这简化了在现有 Kubernetes 集群上部署 Ray 的过程。操作器定义了一个称为 RayCluster
的自定义资源,它描述了 Ray 集群的期望状态,以及一个自定义控制器,即 Ray 操作器,它处理 RayCluster 资源并管理 Ray 集群。
提示
当您使用集群启动器和操作器在 Kubernetes 集群上安装 Ray 时,Ray 利用 Kubernetes 的能力创建一个新的 Ray 节点,形式为 Kubernetes Pod。虽然 Ray 自动扩展器的工作方式相同,但它有效地从 Kubernetes 集群中“窃取”资源。因此,您的 Kubernetes 集群要么足够大以支持 Ray 的所有资源需求,要么提供自己的自动缩放机制。此外,由于 Ray 的节点在这种情况下是作为底层 Kubernetes Pod 实现的,因此 Kubernetes 资源管理器可以随时删除这些 Pod 以获取额外的资源。
在 kind 集群上安装 Ray
为了演示两种方法,让我们首先在 kind (Kubernetes in Docker) 集群 上安装并访问 Ray 集群。这个流行的工具通过使用 Docker 容器“节点”来运行本地 Kubernetes 集群,并且经常用于本地开发。为此,您需要首先通过运行以下命令来创建一个集群:
kind create cluster
这将使用默认配置创建一个集群。要修改配置,请参考配置文档。一旦集群启动运行,您可以使用 ray up
或 Kubernetes 操作器来创建 Ray 集群。
使用 ray up
要使用 ray up
创建 Ray 集群,必须在一个 YAML 文件中指定资源需求,例如raycluster.yaml,该文件改编自 Ray GitHub 仓库 中的 Ray Kubernetes 自动缩放器默认值。该文件包含创建 Ray 集群所需的所有信息:
关于集群名称和自动缩放参数的一般信息。
关于集群提供程序(在我们的情况下是 Kubernetes)的信息,包括创建 Ray 集群节点所需的特定于提供程序的信息。
特定于节点的信息(CPU/内存等)。这还包括节点启动命令列表,包括安装所需的 Python 库。
有了这个文件,创建集群的命令看起来像这样:
ray up <*your location*>/raycluster.yaml
完成集群创建后,您可以看到有几个 pod 在运行:
> get pods -n ray
NAME READY STATUS RESTARTS AGE
ray-ray-head-88978 1/1 Running 0 2m15s
ray-ray-worker-czqlx 1/1 Running 0 23s
ray-ray-worker-lcdmm 1/1 Running 0 23s
根据我们的 YAML 文件的请求,您可以看到一个 head 和两个 worker 节点。要验证集群是否正常运行,可以使用以下 作业:
kubectl create -f <your location>/jobexample.yaml -n ray
执行的结果类似于这样:
> kubectl logs ray-test-job-bx4xj-4nfbl -n ray
--2021-09-28 15:18:59-- https://raw.githubusercontent.com/scalingpythonml/...
Resolving raw.githubusercontent.com (raw.githubusercontent.com) ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com) ...
Length: 1750 (1.7K) [text/plain]
Saving to: ‘servicePython.py’
0K . 100% 9.97M=0s
2021-09-28 15:18:59 (9.97 MB/s) - ‘servicePython.py’ saved [1750/1750]
Connecting to Ray at service ray-ray-head, port 10001
Iteration 0
Counter({('ray-ray-head-88978', 'ray-ray-head-88978'): 30, ...
Iteration 1
……………………………….
Success!
作业启动后,您还可以通过运行以下命令将 ray-ray-head
服务进行端口转发:³
kubectl port-forward -n ray service/ray-ray-head 10001
然后,使用书籍示例文件中的localPython.py测试脚本从本地机器连接到它。执行此代码会产生与先前显示的相同结果。
此外,您可以将 ray 服务端口转发到 8265 端口以查看 Ray 仪表板:
kubectl port-forward -n ray service/ray-ray-head 8265
完成后,您可以查看 Ray 仪表板(图 B-3)。
https://github.com/OpenDocCN/ibooker-python-zh/raw/master/docs/scl-py-ray/img/spwr_ab03.png
图 B-3. Ray 仪表板
使用以下命令可以卸载 Ray 集群:⁴
ray down <*your location*>/raycluster.yaml
使用 Ray Kubernetes 操作员
对于部署到 Kubernetes 集群,我们还可以使用 Ray 操作员,这是一种推荐的方法。为了简化操作员的使用,Ray 提供了作为 Ray GitHub 仓库的一部分可用的Helm 图表。在这里,我们使用多个 YAML 文件来部署 Ray,以使安装变得更加简单,而不是使用 Helm 图表。
我们的部署分为三个文件:operatorcrd.yaml,其中包含用于 CustomResourceDefinition(CRD)创建的所有命令;operator.yaml,其中包含用于操作员创建的所有命令;以及rayoperatorcluster.yaml,其中包含用于集群创建的所有命令。这些文件假定操作员是在 ray 命名空间中创建的。
要安装操作员本身,我们需要执行这两个命令:
kubectl apply -f <*your location*>/operatorcrd.yaml
kubectl apply -f <*your location*>/operator.yaml
完成后,请使用以下命令确保操作员 pod 正在运行:
> kubectl get pods -n ray
NAME READY STATUS RESTARTS AGE
ray-operator-6c9954cddf-cjn9c 1/1 Running 0 110s
一旦操作员启动并运行,您可以使用以下命令启动集群本身:⁵
kubectl apply -f <*your location*>/rayoperatorcluster.yaml -n ray
rayoperatorcluster.yaml 的内容与 raycluster.yaml 类似,但格式略有不同。一旦集群运行起来,你可以使用与之前描述的 ray up
相同的验证代码。
在 OpenShift 上安装 Ray
OpenShift 是一种 Kubernetes 集群类型,因此理论上可以使用 Kubernetes 运算符在 OpenShift 集群上安装 Ray。不过,这种安装过程会稍微复杂一些。
如果你曾经使用过 OpenShift,你会知道默认情况下,所有 OpenShift 中的 pod 都以 限制模式 运行。此模式拒绝访问所有主机功能,并要求 pod 使用分配给命名空间的唯一标识符 (UID) 和安全增强型 Linux (SELinux) 上下文来运行。
不幸的是,对于 Ray operator 来说,设计为以用户 1000 运行的情况不太适用。为了启用此功能,你需要对安装在 kind(以及任何其他纯 Kubernetes 集群)上的文件进行几处更改:
添加 ray-operator-serviceaccount
服务账户,该账户由运算符使用,设置为 anyuid
模式。这允许用户使用任何非根 UID 运行:
oc adm policy add-scc-to-user anyuid -z ray-operator-serviceaccount
修改 operator.yaml,确保运算符 pod 以用户 1000 的身份运行。
此外,必须稍微修改 测试作业,以用户 1000 的身份运行。这需要创建一个 ray-node-serviceaccount
服务账户用于运行作业,并将该服务账户设置为 anyuid
模式,允许用户使用任何非根 UID 运行。
结论
Ray 提供丰富的部署选项。当使用 Ray 解决特定问题时,你需要决定哪个选项最适合你的具体情况。
¹ 为了透明起见:Boris 目前在 IBM 工作,Holden 曾在 IBM 工作过。Holden 也曾在 Google、Microsoft 和 Amazon 工作过。
² 参见 “Boto3 Docs 1.24.95” 文档 获取有关设置 Boto3 配置的信息。
³ 理论上,你也可以创建一个入口来连接 Ray 集群。但遗憾的是,在 NGINX 入口控制器的情况下,它将无法工作。问题在于 Ray 客户端使用的是不安全的 gRPC,而 NGINX 入口控制器仅支持安全的 gRPC 调用。在使用 Ray 集群时,请检查入口是否支持不安全的 gRPC,在将 Ray 的 head 服务公开为入口之前。
⁴ 此命令会删除 pod,并且会留下作为集群一部分创建的服务。你必须手动删除服务以进行完全清理。
⁵ 尽管文档提到了集群范围的部署运算符,但它仅适用于部署运算符的命名空间。
附录 C. 使用 Ray 进行调试
根据您的调试技术,迁移到分布式系统可能需要一套新的技术。幸运的是,像 Pdb 和 PyCharm 这样的工具允许您连接远程调试器,而 Ray 的本地模式则允许您在许多其他情况下使用现有的调试工具。有些错误发生在 Python 之外,使得它们更难调试,如容器内存不足(OOM)错误、分段错误和其他本地错误。
注意
此附录的一些部分与使用 Dask 扩展 Python共享,因为它们是调试所有类型分布式系统的一般良好建议。
使用 Ray 的一般调试技巧
您可能有自己的标准 Python 代码调试技术,本附录不旨在替代它们。以下是一些在使用 Ray 时更有意义的一般技术:
将失败的函数拆分为较小的函数。由于ray.remote
在函数块上调度,较小的函数使问题更容易隔离。
注意任何意外的作用域捕获。
使用示例数据尝试在本地重现它(本地调试通常更容易)。
使用 Mypy 进行类型检查。虽然我们并未在所有示例中包含类型,但在生产代码中,使用自由的类型可以捕获棘手的错误。
当问题无论并行化如何出现时,请在单线程模式下调试您的代码,这样更容易理解发生了什么。
现在,有了这些额外的一般技巧,是时候了解更多有关工具和技术,帮助您进行 Ray 调试了。
序列化错误
序列化在 Ray 中起着重要作用,但也可能是头痛的来源,因为小的更改可能导致意外的变量捕获和序列化失败。幸运的是,Ray 在 ray.util
中有一个实用函数 inspect_serializability
,您可以使用它来调试序列化错误。如果您有意定义一个捕获非可序列化数据的函数,比如 示例 C-1,您可以运行 inspect_serializability
看看它如何报告失败(如 示例 C-2)。
示例 C-1. 错误的序列化示例
pool = Pool(5)
def special_business(x):
def inc(y):
return y + x
return pool.map(inc, range(0, x))
ray.util.inspect_serializability(special_business)
示例 C-2. 错误的序列化结果
=========================================================================
Checking Serializability of <function special_business at 0x7f78802820d0>
=========================================================================
!!! FAIL serialization: pool objects cannot be passed between processes or pickled
Detected 1 global variables. Checking serializability...
Serializing 'pool' <multiprocessing.pool.Pool state=RUN pool_size=5>...
!!! FAIL serialization: pool objects cannot be passed between processes ...
...
在此示例中,Ray 检查元素的可序列化性,并指出非序列化值 pool
来自全局范围。
使用 Ray 本地调试
在本地模式下使用 Ray,可以让您使用习惯的工具,而无需处理设置远程调试的复杂性。我们不会涵盖各种本地 Python 调试工具,因此这一部分存在的意义仅在于提醒您首先尝试在本地模式下重现问题,然后再开始使用本附录其余部分涵盖的高级调试技术。
远程调试
远程调试可以是一个很好的工具,但需要更多对集群的访问权限,这在某些情况下可能并不总是可用。Ray 的特殊集成工具 ray debug
支持跨整个集群的追踪。不幸的是,其他远程 Python 调试器一次只能附加到一个机器上,因此你不能简单地将调试器指向整个集群。
警告
远程调试可能会导致性能变化和安全隐患。在启用集群上的远程调试之前,通知所有用户非常重要。
如果你控制自己的环境,设置远程调试相对比较简单,但在企业部署中,你可能会遇到启用这一功能的阻力。在这种情况下,使用本地集群或请求一个开发集群进行调试是你最好的选择。
提示
对于交互式调试器,你可能需要与系统管理员合作,以公开集群中额外的端口。
Ray 的集成调试器(通过 Pdb)
Ray 支持通过 Pdb 进行调试的集成,允许你跨集群跟踪代码。你仍然需要修改启动命令 (ray start
) 来包括 (ray start --ray-debugger-external
) 来加载调试器。启用 Ray 的外部调试器后,Pdb 将在额外端口上监听(无需任何认证),以供调试器连接。
一旦你配置并启动了集群,你可以在主节点上启动 Ray 调试器。¹ 要启动调试器,只需运行 ray debug
,然后你可以使用所有你喜欢的 Pdb 调试命令。
其他工具
对于非集成工具,由于每次对远程函数的调用可能被安排在不同的工作节点上,你可能会发现将你的无状态函数暂时转换为执行器会更容易调试。这将会对性能产生真实的影响,因此在生产环境中可能不适用,但确实意味着重复调用将路由到同一台机器,从而使调试任务更加简单。
PyCharm
PyCharm 是一个集成调试器的流行 Python IDE。虽然它不像 Pdb 那样集成,但你可以通过几个简单的更改使其工作。第一步是将 pydevd-pycharm
包添加到你的容器/要求中。然后,在你想要调试的执行器中,你可以像 示例 C-3 中展示的那样启用 PyCharm 调试。
示例 C-3. 启用 PyCharm 远程调试
@ray.remote
class Bloop():
def __init__(self, dev_host):
import pydevd_pycharm
# Requires ability to connect to dev from prod.
try:
pydevd_pycharm.settrace(
dev_host, port=7779, stdoutToServer=True, stderrToServer=True)
except ConnectionRefusedError:
print("Skipping debug")
pass
def dothing(x):
return x + 1
你的执行器将会从执行者回到你的 PyCharm IDE 创建连接。
Python 分析器
Python 分析器可以帮助追踪内存泄漏、热代码路径以及其他重要但不是错误状态。
从安全角度来看,性能分析器比远程实时调试问题少,因为它们不需要从您的计算机直接连接到集群。 相反,分析器运行并生成报告,您可以离线查看。 性能分析仍然会带来性能开销,因此在决定是否启用时要小心。
要在执行器上启用 Python 内存分析,您可以更改启动命令以添加前缀mprof run -E --include-children, -o memory_profile.dat --python
。 然后,您可以收集memory_profile
并在您的计算机上用matplotlib
绘制它们,以查看是否有什么异常。
类似地,您可以通过在启动命令中用echo "from ray.scripts.scripts import main; main()" > launch.py; python -m cProfile -o stats launch.py
替换ray start
来在ray execute
中启用函数分析。 这比使用mprof
略微复杂,因为默认的 Ray 启动脚本与cProfile
不兼容,因此您需要创建一个不同的入口点—但在概念上是等效的。
警告
用于基于注释的分析的line_profiler
包与 Ray 不兼容,因此您必须使用整体程序分析。
Ray 和容器退出代码
退出代码是程序退出时设置的数字代码,除了 0 以外的任何值通常表示失败。 这些代码(按照惯例)通常有意义,但不是 100%一致。 以下是一些常见的退出代码:
0
成功(但通常误报,特别是在 shell 脚本中)
1
通用错误
127
命令未找到(在 shell 脚本中)
130
用户终止(Ctrl-C 或 kill)
137
Out-of-memory error 或 kill -9(强制终止,不可忽略)
139
段错误(通常是本地代码中的空指针解引用)
您可以使用echo $?
打印出上次运行命令的退出代码。 在运行严格模式脚本(如某些 Ray 启动脚本)时,您可以打印出退出代码,同时传播错误[raycommand] || (error=$?; echo $error; exit $error)
。²
Ray 日志
Ray 的日志与许多其他分布式应用程序的日志行为不同。 由于 Ray 倾向于在容器启动之后在容器上启动工作进程,³ 与容器关联的 stdout 和 stderr 通常不包含您需要的调试信息。 相反,您可以通过查找 Ray 在头节点上创建符号链接到的最新会话目录*/tmp/ray/session_latest*来访问工作容器日志。
容器错误
调试容器错误可能特别具有挑战性,因为到目前为止探索的许多标准调试技术都有挑战。 这些错误可以从常见的错误(如 OOM 错误)到更神秘的错误。 很难区分容器错误或退出的原因,因为容器退出有时会删除日志。
在 Kubernetes 上,有时可以通过在日志请求中添加-p
来获取已经退出的容器的日志(例如,kubectl logs -p
)。你还可以配置terminationMessagePath
来指向包含有关终止退出信息的文件。如果你的 Ray worker 退出,定制 Ray 容器启动脚本以增加更多日志记录可能是有意义的。常见的附加日志类型包括从syslog或dmesg中获取的最后几行(查找 OOMs),将其记录到稍后可以用于调试的文件位置。
最常见的容器错误之一,本地内存泄漏,可能很难调试。像Valgrind这样的工具有时可以追踪本地内存泄漏。使用类似 Valgrind 的工具的详细信息超出了本书的范围,请查看Python Valgrind 文档。你可能想尝试的另一个“技巧”是有效地二分你的代码;因为本地内存泄漏最常发生在库调用中,你可以尝试注释它们并运行测试,看看哪个库调用是泄漏的来源。
本地错误
本地错误和核心转储与容器错误具有相同的调试挑战。由于这些类型的错误通常导致容器退出,因此访问调试信息变得更加困难。这个问题的“快速”解决方案是在 Ray 启动脚本中(失败时)添加sleep
,以便你可以连接到容器(例如,[raylaunchcommand] || sleep 100000
)并使用本地调试工具。
然而,在许多生产环境中,访问容器的内部可能比表面看起来更容易。出于安全原因,你可能无法获得远程访问(例如,在 Kubernetes 上使用kubectl exec
)。如果是这种情况,你可以(有时)向容器规范添加关闭脚本,将核心文件复制到容器关闭后仍然存在的位置(例如,s3、HDFS 或 NFS)。
结论
在 Ray 中,启动调试工具可能需要更多工作,如果可能的话,Ray 的本地模式是远程调试的一个很好的选择。你可以利用 Ray 的 actors 来使远程函数调度更可预测,这样更容易知道在哪里附加你的调试工具。并非所有错误都是平等的,一些错误,比如本地代码中的分段错误,尤其难以调试。祝你找到 Bug(s)!我们相信你。
¹ Ray 有ray attach
命令来创建到主节点的 SSH 连接;但并非所有主节点都会有 SSH 服务器。在 Ray on Kubernetes 上,你可以通过运行kubectl exec -it -n [rayns] [podname] – /bin/bash
来到达主节点。每个集群管理器在此处略有不同,因此你可能需要查阅你的集群管理器的文档。
² 这个更改的确切配置位置取决于所使用的集群管理器。对于在 Kube 上使用自动缩放器的 Ray,你可以修改 workerStartRayCommands
。对于在 AWS 上的 Ray,修改 worker_start_ray_commands
,等等。
³ 这可以通过 ssh
或 kubectl exec
完成。
作者:绝不原创的飞龙