Python中PySpark的应用实战

目录

一、前言

二、pyspark库的安装

三、构建pyspark执行环境入口对象

四、数据输入

4.1  RDD对象

4.2  python数据容器转RDD对象

五、数据计算

5.1  map方法

5.2 flatMap方法

5.3  reduceByKey算子

5.4 练习案例1

5.5  Filter方法

5.6  distinct算子

5.7 sortBy算子

5.8  练习案例2

六、数据输出

6.1  输出为python对象

1  collect算子

2 reduce算子

3  take算子

6.2  输出到文件中

1. saveAsTextFile算子


一、前言

Spark是一个开源的大数据处理框架,它以其高速、易用性和对复杂分析的支持而闻名。以下是Spark的一些关键特点和组件:

  1. 高速性能:Spark能够快速进行数据处理和分析,特别是它可以将数据缓存在内存中,从而加快处理速度。
  2. 易用性:Spark支持多种编程语言,包括Scala、Java、Python和R,这使得它对于不同背景的开发者都很容易上手。
  3. 多样性支持:Spark不仅可以处理批量数据,还能处理实时数据流,支持各种数据源和存储格式。
  4. 组件丰富:Spark包含多个组件,如Spark Core、Spark SQL、Spark Streaming、MLlib(机器学习库)和GraphX(图计算库)。
  5. 与Hadoop的关系:Spark可以运行在Hadoop集群上,并与Hadoop的HDFS和YARN集成,但它的处理速度通常比MapReduce快很多。
  6. 执行流程:Spark的执行流程涉及将数据转化为弹性分布式数据集(RDDs),然后通过转换和动作来进行处理。
  7. 性能调优:Spark提供了丰富的配置选项来优化其性能,包括内存管理、分区策略和任务调度等。
  8. 社区和支持:Spark有一个活跃的社区,提供了大量的文档、教程和案例,帮助开发者学习和使用Spark。
  9. 版本更新:随着技术的发展,Spark也在不断更新,引入新的功能和改进,例如Spark 3.0带来了许多新特性和性能提升。
  10. 教育资源:有许多在线课程和视频教程可以帮助初学者从零开始学习Spark,涵盖环境搭建、核心概念、流处理、SQL、结构化流处理、综合案例等内容。

Spark是一个强大的工具,适用于大数据处理和分析,无论是学术研究还是工业应用,都能找到其用武之地。

二、pyspark库的安装

pip install pyspark

三、构建pyspark执行环境入口对象

想要使用pyspark库完成数据处理,首先需要构建一个执行环境入口环境。

pyspark的执行环境入口对象是:类SparkContext的类对象

"""
演示获取pyspark的执行环境入库对象:SparkContext
并通过SparkContext对象获取当前pyspark的版本
"""
# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 打印pyspark的运行版本
print(sc.version)
# 停止SparkContext对象的运行(停止pyspark程序)
sc.stop()

四、数据输入

4.1  RDD对象

RDD(Resilient Distributed Datasets,弹性分布式数据集)是Apache Spark中的核心概念,它是一个不可变的分布式对象集合

pyspark支持多种数据的输入,再输入完成后,都会得到一个RDD对象,pyspark针对数据的处理,都是以RDD对象作为载体,即:

  • 数据存储在RDD内
  • 各类数据的计算方法,也都是RDD的成员方法
  • RDD的数据计算方法,返回值依旧是RDD对象
  • 4.2  python数据容器转RDD对象

    pyspark支持通过SparkContext对象的parallelize成员方法,将

  • list
  • tuple
  • set
  • dict
  • str
  • 转换为pyspark的RDD对象

    (字符串会被拆分出1个个的字符,存入RDD对象;字典仅有key会被存入RDD对象)

    from pyspark import SparkConf,SparkContext
    
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc = SparkContext(conf=conf)
    
    # 通过parallelize方法将python对象加载到spark内,成为RDD对象
    add1 = sc.parallelize([1, 2, 3, 4, 5])
    add2 = sc.parallelize((1, 2, 3, 4, 5))
    add3 = sc.parallelize("abcdefg")
    add4 = sc.parallelize({1, 2, 3, 4, 5})
    add5 = sc.parallelize({"key1":"value1", "key2":"value2"})
    
    # 如果要查看RDD里面有什么内容,需要用collect()方法
    print(add1.collect())
    print(add2.collect())
    print(add3.collect())
    print(add4.collect())
    print(add5.collect())
    
    sc.stop()

    结果如下:

    通过读取文件转RDD对象

    先准备好一个hello.txt文件,内容为:

    from pyspark import SparkConf,SparkContext
    
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc = SparkContext(conf=conf)
    
    
    # 通过textFile方法,读取文件按数据加载到spark内,成为RDD对象
    add = sc.textFile("D:/pydaima/8day速成python/shuju/hello.txt")
    print(add.collect())
    
    sc.stop()

    运行结果:

    五、数据计算

    5.1  map方法

    功能:map算子,是将RDD的数据一条条处理(处理的逻辑基于map算子中接受的处理函数),返回新的RDD

    语法:

    
    """
    演示RDD的map方法的使用
    """
    from pyspark import SparkConf,SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe"   # 需要指向python解释器
    
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc = SparkContext(conf=conf)
    
    # 准备一个RDD
    add = sc.parallelize([1, 2, 3, 4, 5])
    
    # 通过map方法将全部数据都乘以10
    def func(data):
        return data * 10
    
    add2 = add.map(lambda x: x * 10)
    print(add2.collect())
    
    sc.stop()

    5.2 flatMap方法

    功能:对add执行map操作,然后进行解除嵌套操作。

    from pyspark import SparkConf,SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe"   # 需要指向python解释器
    
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc = SparkContext(conf=conf)
    
    # 准备一个RDD
    add = sc.parallelize(["python student 588", "python 588 588", "process finished"])
    
    # 需求:将RDD数据里面的一个个单词提取出来
    add2 = add.flatMap(lambda x: x.split(' '))
    print(add2.collect())

    5.3  reduceByKey算子

    功能:针对KV型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据的聚合操作。

    from pyspark import SparkConf,SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe"   # 需要指向python解释器
    
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc = SparkContext(conf=conf)
    
    # 准备一个RDD
    add = sc.parallelize([('男', 99), ('女', 99), ('女', 912), ('男', 99)])
    
    # 求男生和女生两个组的成绩之和
    add2 = add.reduceByKey(lambda a, b: a + b)
    print(add2.collect())

    5.4 练习案例1

    使用学习到的内容,完成:

  • 读取文件
  • 统计文件内,单词的出现数量
  • 先准备一个文件,文件内容如下:

    # 1.统计执行环境入口对象
    from pyspark import SparkConf,SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe"   # 需要指向python解释器
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc = SparkContext(conf=conf)
    
    # 2.读取数据文件
    add = sc.textFile("D:/pydaima/8day速成python/shuju/wordcount.txt")
    # 3.取出全部单词
    word_add = add.flatMap(lambda x: x.split(' '))
    # 4.将所有单词都转换为二元元组,单词为key,value设置为1
    add2 = word_add.map(lambda word: (word, 1))
    # 5.分组并求和
    result_add = add2.reduceByKey(lambda a, b: a + b)
    # 6.打印输出结果
    print(result_add.collect())

    运行结果:

    5.5  Filter方法

    from pyspark import SparkConf,SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe"   # 需要指向python解释器
    
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc = SparkContext(conf=conf)
    
    
    # 准备一个RDD
    rdd = sc.parallelize([1, 2, 3, 4, 5])
    # 对RDD的数据进行过滤
    rdd2 = rdd.filter(lambda num: num % 2 == 0)
    print(rdd2.collect())

    运行示例:

    5.6  distinct算子

    from pyspark import SparkConf,SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe"   # 需要指向python解释器
    
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc = SparkContext(conf=conf)
    # 准备一个RDD
    rdd = sc.parallelize([1, 1, 3, 3, 5, 5, 7, 8, 8, 9])
    # 对RDD的数据进行去重
    rdd2 = rdd.distinct()
    print(rdd2.collect())

    5.7 sortBy算子

    from pyspark import SparkConf,SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe"   # 需要指向python解释器
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc = SparkContext(conf=conf)
    
    # 2.读取数据文件
    add = sc.textFile("D:/pydaima/8day速成python/shuju/wordcount.txt")
    # 3.取出全部单词
    word_add = add.flatMap(lambda x: x.split(' '))
    # 4.将所有单词都转换为二元元组,单词为key,value设置为1
    add2 = word_add.map(lambda word: (word, 1))
    # 5.分组并求和
    result_add = add2.reduceByKey(lambda a, b: a + b)
    # 以上使用综合案例1的代码进行补充
    
    # 对结果进行排序
    final_rdd = result_add.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
    
    print(final_rdd.collect())

    5.8  练习案例2

    对数据文件进行读取,将数据按照各个城市销售额排名,从大到小;全部城市有哪些商品类别在售卖;北京市有哪些商品类别在售卖。

    import json
    from pyspark import SparkConf,SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe"   # 需要指向python解释器
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc = SparkContext(conf=conf)
    
    # 需求1:城市销售额排名
    # 1.1 读取文件得到RDD
    file_rdd = sc.textFile("D:/pydaima/8day速成python/shuju/orders.txt")
    # 1.2 取出一个个json字符串
    json_str_rdd = file_rdd.flatMap(lambda x: x.split('|'))
    # 1.3 将一个个json字符串转为字典
    dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
    # 1.4 取出城市和销售额数据    (城市,销售额)
    city_with_money_rdd = dict_rdd.map(lambda x: (x['areaName'], int(x['money'])))
    # 1.5 按城市分组销售额聚合
    city_result_rdd = city_with_money_rdd.reduceByKey(lambda a, b: a + b)
    # 1.6 按照销售额聚合结果进行排序
    result1_rdd = city_result_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
    print("需求1的结果:", result1_rdd.collect())
    
    # 需求2:全部城市有哪些商品类别在售卖
    # 2.1 取出全部的商品类别, 并去重
    category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
    print("需求2的结果:", category_rdd.collect())
    
    # 需求3: 北京市有哪些商品类别在售卖
    # 3.1 过滤北京的数
    bj_data_rdd = dict_rdd.filter(lambda x: x['areaName'] == '北京')
    # 3.2  取出商品类别,并去重
    result3_rdd = bj_data_rdd.map(lambda x: x['category']).distinct()
    print("需求3的结果:", result3_rdd.collect())

    六、数据输出

    6.1  输出为python对象

    1  collect算子

    2 reduce算子

    3  take算子

    from pyspark import SparkConf,SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe"   # 需要指向python解释器
    
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    sc = SparkContext(conf=conf)
    
    # 准备RDD
    rdd = sc.parallelize([1, 2, 3, 4, 5])
    
    # collect算子,输出为list对象
    rdd_list = rdd.collect()
    print(rdd_list)
    print(type(rdd_list))
    
    # reduce算子,输出RDD为list对象
    num = rdd.reduce(lambda a, b: a + b)
    print(num)
    
    # take算子,取出RDD的前N个元素,组成list返回
    take_list = rdd.take(3)
    print(f"前3个元素是:{take_list}")
    
    # count算子,统计RDD内有多少条数据,返回值为数字
    num_count = rdd.count()
    print(f"rdd内有{num_count}个元素")
    
    sc.stop()

    6.2  输出到文件中

    1. saveAsTextFile算子

    使用saveAsTextFile需要配置相关环境,调用保存文件的算子,配置Hadoop依赖。

    from pyspark import SparkConf,SparkContext
    import os
    os.environ['PYSPARK_PYTHON'] = "C:/Program Files/Python39/python.exe"   # 需要指向python解释器
    os.environ['HADOOP-HOME'] = "D:/hadoop-py/hadoop-3.0.0"
    
    conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
    # 设置全局并型度为1
    conf.set("spark.default.parallelism", "1")
    sc = SparkContext(conf=conf)
    
    # 准备RDD1
    rdd1 = sc.parallelize([1, 2, 3, 4, 5])
    
    # 准备RDD2
    rdd2 = sc.parallelize([('hello', 3), ('Spark', 5), ('Hi', 7)])
    
    # 准备RDD3
    rdd3 = sc.parallelize([[1, 3, 5], [6, 7, 9], [11, 13, 11]])
    
    # 输出到文件中
    rdd1.saveAsTextFile("D:/pydaima/8day速成python/shuju/output1.txt")
    rdd2.saveAsTextFile("D:/pydaima/8day速成python/shuju/output2.txt")
    rdd3.saveAsTextFile("D:/pydaima/8day速成python/shuju/output3.txt")
    

    作者:星星法术嗲人

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python中PySpark的应用实战

    发表回复