Python中PySpark的应用实战
目录
一、前言
二、pyspark库的安装
三、构建pyspark执行环境入口对象
四、数据输入
4.1 RDD对象
4.2 python数据容器转RDD对象
五、数据计算
5.1 map方法
5.2 flatMap方法
5.3 reduceByKey算子
5.4 练习案例1
5.5 Filter方法
5.6 distinct算子
5.7 sortBy算子
5.8 练习案例2
六、数据输出
6.1 输出为python对象
1 collect算子
2 reduce算子
3 take算子
6.2 输出到文件中
1. saveAsTextFile算子
一、前言
Spark是一个开源的大数据处理框架,它以其高速、易用性和对复杂分析的支持而闻名。以下是Spark的一些关键特点和组件:
- 高速性能:Spark能够快速进行数据处理和分析,特别是它可以将数据缓存在内存中,从而加快处理速度。
- 易用性:Spark支持多种编程语言,包括Scala、Java、Python和R,这使得它对于不同背景的开发者都很容易上手。
- 多样性支持:Spark不仅可以处理批量数据,还能处理实时数据流,支持各种数据源和存储格式。
- 组件丰富:Spark包含多个组件,如Spark Core、Spark SQL、Spark Streaming、MLlib(机器学习库)和GraphX(图计算库)。
- 与Hadoop的关系:Spark可以运行在Hadoop集群上,并与Hadoop的HDFS和YARN集成,但它的处理速度通常比MapReduce快很多。
- 执行流程:Spark的执行流程涉及将数据转化为弹性分布式数据集(RDDs),然后通过转换和动作来进行处理。
- 性能调优:Spark提供了丰富的配置选项来优化其性能,包括内存管理、分区策略和任务调度等。
- 社区和支持:Spark有一个活跃的社区,提供了大量的文档、教程和案例,帮助开发者学习和使用Spark。
- 版本更新:随着技术的发展,Spark也在不断更新,引入新的功能和改进,例如Spark 3.0带来了许多新特性和性能提升。
- 教育资源:有许多在线课程和视频教程可以帮助初学者从零开始学习Spark,涵盖环境搭建、核心概念、流处理、SQL、结构化流处理、综合案例等内容。
Spark是一个强大的工具,适用于大数据处理和分析,无论是学术研究还是工业应用,都能找到其用武之地。
二、pyspark库的安装
pip install pyspark
三、构建pyspark执行环境入口对象
想要使用pyspark库完成数据处理,首先需要构建一个执行环境入口环境。
pyspark的执行环境入口对象是:类SparkContext的类对象
"""
演示获取pyspark的执行环境入库对象:SparkContext
并通过SparkContext对象获取当前pyspark的版本
"""
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 打印pyspark的运行版本
print(sc.version)
# 停止SparkContext对象的运行(停止pyspark程序)
sc.stop()
四、数据输入
4.1 RDD对象
RDD(Resilient Distributed Datasets,弹性分布式数据集)是Apache Spark中的核心概念,它是一个不可变的分布式对象集合。
pyspark支持多种数据的输入,再输入完成后,都会得到一个RDD对象,pyspark针对数据的处理,都是以RDD对象作为载体,即:
4.2 python数据容器转RDD对象
pyspark支持通过SparkContext对象的parallelize成员方法,将
转换为pyspark的RDD对象
(字符串会被拆分出1个个的字符,存入RDD对象;字典仅有key会被存入RDD对象)
from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 通过parallelize方法将python对象加载到spark内,成为RDD对象
add1 = sc.parallelize([1, 2, 3, 4, 5])
add2 = sc.parallelize((1, 2, 3, 4, 5))
add3 = sc.parallelize("abcdefg")
add4 = sc.parallelize({1, 2, 3, 4, 5})
add5 = sc.parallelize({"key1":"value1", "key2":"value2"})
# 如果要查看RDD里面有什么内容,需要用collect()方法
print(add1.collect())
print(add2.collect())
print(add3.collect())
print(add4.collect())
print(add5.collect())
sc.stop()
结果如下:
通过读取文件转RDD对象
先准备好一个hello.txt文件,内容为:
from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 通过textFile方法,读取文件按数据加载到spark内,成为RDD对象
add = sc.textFile("D:/pydaima/8day速成python/shuju/hello.txt")
print(add.collect())
sc.stop()
运行结果:
五、数据计算
5.1 map方法
功能:map算子,是将RDD的数据一条条处理(处理的逻辑基于map算子中接受的处理函数),返回新的RDD
语法:
"""
演示RDD的map方法的使用
"""
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 准备一个RDD
add = sc.parallelize([1, 2, 3, 4, 5])
# 通过map方法将全部数据都乘以10
def func(data):
return data * 10
add2 = add.map(lambda x: x * 10)
print(add2.collect())
sc.stop()
5.2 flatMap方法
功能:对add执行map操作,然后进行解除嵌套操作。
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 准备一个RDD
add = sc.parallelize(["python student 588", "python 588 588", "process finished"])
# 需求:将RDD数据里面的一个个单词提取出来
add2 = add.flatMap(lambda x: x.split(' '))
print(add2.collect())
5.3 reduceByKey算子
功能:针对KV型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据的聚合操作。
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 准备一个RDD
add = sc.parallelize([('男', 99), ('女', 99), ('女', 912), ('男', 99)])
# 求男生和女生两个组的成绩之和
add2 = add.reduceByKey(lambda a, b: a + b)
print(add2.collect())
5.4 练习案例1
使用学习到的内容,完成:
先准备一个文件,文件内容如下:
# 1.统计执行环境入口对象
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 2.读取数据文件
add = sc.textFile("D:/pydaima/8day速成python/shuju/wordcount.txt")
# 3.取出全部单词
word_add = add.flatMap(lambda x: x.split(' '))
# 4.将所有单词都转换为二元元组,单词为key,value设置为1
add2 = word_add.map(lambda word: (word, 1))
# 5.分组并求和
result_add = add2.reduceByKey(lambda a, b: a + b)
# 6.打印输出结果
print(result_add.collect())
运行结果:
5.5 Filter方法
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 对RDD的数据进行过滤
rdd2 = rdd.filter(lambda num: num % 2 == 0)
print(rdd2.collect())
运行示例:
5.6 distinct算子
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 准备一个RDD
rdd = sc.parallelize([1, 1, 3, 3, 5, 5, 7, 8, 8, 9])
# 对RDD的数据进行去重
rdd2 = rdd.distinct()
print(rdd2.collect())
5.7 sortBy算子
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 2.读取数据文件
add = sc.textFile("D:/pydaima/8day速成python/shuju/wordcount.txt")
# 3.取出全部单词
word_add = add.flatMap(lambda x: x.split(' '))
# 4.将所有单词都转换为二元元组,单词为key,value设置为1
add2 = word_add.map(lambda word: (word, 1))
# 5.分组并求和
result_add = add2.reduceByKey(lambda a, b: a + b)
# 以上使用综合案例1的代码进行补充
# 对结果进行排序
final_rdd = result_add.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
print(final_rdd.collect())
5.8 练习案例2
对数据文件进行读取,将数据按照各个城市销售额排名,从大到小;全部城市有哪些商品类别在售卖;北京市有哪些商品类别在售卖。
import json
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 需求1:城市销售额排名
# 1.1 读取文件得到RDD
file_rdd = sc.textFile("D:/pydaima/8day速成python/shuju/orders.txt")
# 1.2 取出一个个json字符串
json_str_rdd = file_rdd.flatMap(lambda x: x.split('|'))
# 1.3 将一个个json字符串转为字典
dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
# 1.4 取出城市和销售额数据 (城市,销售额)
city_with_money_rdd = dict_rdd.map(lambda x: (x['areaName'], int(x['money'])))
# 1.5 按城市分组销售额聚合
city_result_rdd = city_with_money_rdd.reduceByKey(lambda a, b: a + b)
# 1.6 按照销售额聚合结果进行排序
result1_rdd = city_result_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
print("需求1的结果:", result1_rdd.collect())
# 需求2:全部城市有哪些商品类别在售卖
# 2.1 取出全部的商品类别, 并去重
category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
print("需求2的结果:", category_rdd.collect())
# 需求3: 北京市有哪些商品类别在售卖
# 3.1 过滤北京的数
bj_data_rdd = dict_rdd.filter(lambda x: x['areaName'] == '北京')
# 3.2 取出商品类别,并去重
result3_rdd = bj_data_rdd.map(lambda x: x['category']).distinct()
print("需求3的结果:", result3_rdd.collect())
六、数据输出
6.1 输出为python对象
1 collect算子
2 reduce算子
3 take算子
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 准备RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# collect算子,输出为list对象
rdd_list = rdd.collect()
print(rdd_list)
print(type(rdd_list))
# reduce算子,输出RDD为list对象
num = rdd.reduce(lambda a, b: a + b)
print(num)
# take算子,取出RDD的前N个元素,组成list返回
take_list = rdd.take(3)
print(f"前3个元素是:{take_list}")
# count算子,统计RDD内有多少条数据,返回值为数字
num_count = rdd.count()
print(f"rdd内有{num_count}个元素")
sc.stop()
6.2 输出到文件中
1. saveAsTextFile算子
使用saveAsTextFile需要配置相关环境,调用保存文件的算子,配置Hadoop依赖。
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe" # 需要指向python解释器
os.environ['HADOOP-HOME'] = "D:/hadoop-py/hadoop-3.0.0"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
# 设置全局并型度为1
conf.set("spark.default.parallelism", "1")
sc = SparkContext(conf=conf)
# 准备RDD1
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
# 准备RDD2
rdd2 = sc.parallelize([('hello', 3), ('Spark', 5), ('Hi', 7)])
# 准备RDD3
rdd3 = sc.parallelize([[1, 3, 5], [6, 7, 9], [11, 13, 11]])
# 输出到文件中
rdd1.saveAsTextFile("D:/pydaima/8day速成python/shuju/output1.txt")
rdd2.saveAsTextFile("D:/pydaima/8day速成python/shuju/output2.txt")
rdd3.saveAsTextFile("D:/pydaima/8day速成python/shuju/output3.txt")
作者:星星法术嗲人