【Python】大数据处理利器:Dask与PySpark实战解析
Python大数据处理:Dask、PySpark
1. Dask 库
1.1 安装
Dask 可以通过多种方式安装,最常见的是使用 pip
包管理器。
# 安装基本的 Dask 库
pip install dask
# 安装包含常用扩展的完整版本,如用于数据处理、机器学习等的扩展
pip install dask[complete]
此外,还可以通过 conda
安装:
conda install -c conda-forge dask
1.2 基本概念
@delayed
装饰器标记函数,然后可以将这些函数组合起来形成一个计算图,Dask 调度器会根据这个计算图来并行执行任务。1.3 示例代码
import dask.array as da
import dask.dataframe as dd
from dask import delayed
# Dask 数组示例
# 创建一个形状为 (10000, 10000) 的 Dask 数组,每个小块大小为 (1000, 1000)
arr = da.random.normal(size=(10000, 10000), chunks=(1000, 1000))
# 计算每列的均值
col_means = arr.mean(axis=0).compute()
print("Dask 数组每列均值:", col_means)
# Dask DataFrame 示例
# 读取一个大型 CSV 文件(假设文件较大,无法一次性读入内存)
df = dd.read_csv('large_file.csv')
# 筛选出某列值大于特定值的行
filtered_df = df[df['column_name'] > 100]
# 按某列进行分组并计算每组的数量
grouped_result = filtered_df.groupby('category_column').size().compute()
print("Dask DataFrame 分组结果:", grouped_result)
# Dask Delayed 示例
@delayed
def multiply(a, b):
return a * b
@delayed
def add(a, b):
return a + b
x = 5
y = 10
z = 15
# 构建计算图
result_delayed = add(multiply(x, y), multiply(y, z))
# 执行计算
print("Dask Delayed 计算结果:", result_delayed.compute())
1.4 Dask 的高级特性
dask.distributed
模块实现。可以创建集群对象,将任务提交到集群中并行执行。from dask.distributed import Client, LocalCluster
# 创建本地集群
cluster = LocalCluster()
# 创建客户端连接到集群
client = Client(cluster)
# 提交 Dask 任务到集群执行
# 例如,提交一个 Dask 数组的计算任务
arr = da.random.normal(size=(10000, 10000), chunks=(1000, 1000))
result = arr.mean(axis=0).compute()
# 关闭客户端和集群
client.close()
cluster.close()
2. PySpark 库
2.1 安装
使用 pip
安装 PySpark:
pip install pyspark
也可以使用 conda
安装:
conda install -c conda-forge pyspark
此外,还需要下载并配置 Apache Spark 运行环境。
2.2 基本概念
SparkSession.builder
来构建一个 SparkSession
对象,并设置应用名称、主节点等参数。map
、filter
等;行动操作返回一个具体的结果,如 collect
、count
等。2.3 示例代码
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# 创建 SparkSession
spark = SparkSession.builder \
.appName("PySpark Example") \
.master("local[*]") # 使用本地所有可用核心
.getOrCreate()
# 创建 DataFrame
data = [("Alice", 25, "Female"), ("Bob", 30, "Male"), ("Charlie", 35, "Male")]
df = spark.createDataFrame(data, ["name", "age", "gender"])
# 数据操作
# 筛选出年龄大于 28 的行
filtered_df = df.filter(col("age") > 28)
# 按性别分组并计算每组的平均年龄
grouped_result = filtered_df.groupBy("gender").avg("age")
grouped_result.show()
# SQL 查询示例
df.createOrReplaceTempView("people")
sql_result = spark.sql("SELECT gender, AVG(age) AS avg_age FROM people WHERE age > 28 GROUP BY gender")
sql_result.show()
# RDD 示例
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
# 对 RDD 进行转换操作
mapped_rdd = rdd.map(lambda x: x * 2)
# 执行行动操作
print("RDD 结果:", mapped_rdd.collect())
# 停止 SparkSession
spark.stop()
2.4 PySpark 的高级特性
cache()
方法将数据缓存到内存中。df = spark.read.csv('large_file.csv', header=True, inferSchema=True)
df.cache() # 缓存数据到内存
# 进行一些计算操作
result = df.groupBy('column').count()
result.show()
StreamingContext
或 SparkSession.readStream
可以创建流数据源,并进行实时的数据处理和分析。3. Dask 与 PySpark 的比较
综上所述,Dask 和 PySpark 各有优势,在实际应用中需要根据具体的需求和场景来选择合适的工具。
作者:浪子西科