Spark算子 – Python(头歌)

头歌Spark算子 – Python

第1关:Transformation – map

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    #********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2.创建一个1到5的列表List
    data = [1, 2, 3, 4, 5]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.使用rdd.collect() 收集 rdd 的元素。
    print(rdd.collect())

    """
    使用 map 算子,将 rdd 的数据 (1, 2, 3, 4, 5) 按照下面的规则进行转换操作,规则如下:
    需求:
        偶数转换成该数的平方
        奇数转换成该数的立方
    """
    # 5.使用 map 算子完成以上需求
    rdd = rdd.map(lambda x: x ** 2 if x % 2 == 0 else x ** 3)

    # 6.使用rdd.collect() 收集完成 map 转换的元素
    print(rdd.collect())

    # 7.停止 SparkContext
    sc.stop()

    #********** End **********#

第2关:Transformation – mapPartitions

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

#********** Begin **********#

# 导入 string 模块
import string

#********** End **********#

if __name__ == "__main__":
    #********** Begin **********#
    
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2. 一个内容为("dog", "salmon", "salmon", "rat", "elephant")的列表List
    data = ["dog", "salmon", "salmon", "rat", "elephant"]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.使用rdd.collect() 收集 rdd 的元素。
    print(rdd.collect())

    """
    使用 mapPartitions 算子,将 rdd 的数据 ("dog", "salmon", "salmon", "rat", "elephant") 按照下面的规则进行转换操作,规则如下:
    需求:
        将字符串与该字符串的长度组合成一个元组,例如:
        dog  -->  (dog,3)
        salmon   -->  (salmon,6)
    """

    # 5.使用 mapPartitions 算子完成以上需求
    rdd = rdd.mapPartitions(lambda partition: [(word, len(word)) for word in partition])

    # 6.使用rdd.collect() 收集完成 mapPartitions 转换的元素
    print(rdd.collect())

    # 7.停止 SparkContext
    sc.stop()

    #********** End **********#

第3关:Transformation – filter

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    #********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2.创建一个1到8的列表List
    data = [1, 2, 3, 4, 5, 6, 7, 8]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.使用rdd.collect() 收集 rdd 的元素。
    print(rdd.collect())

    """
    使用 filter 算子,将 rdd 的数据 (1, 2, 3, 4, 5, 6, 7, 8) 按照下面的规则进行转换操作,规则如下:
    需求:
        过滤掉rdd中的奇数
    """
    # 5.使用 filter 算子完成以上需求
    rdd = rdd.filter(lambda x: x % 2 == 0)

    # 6.使用rdd.collect() 收集完成 filter 转换的元素
    print(rdd.collect())

    # 7.停止 SparkContext
    sc.stop()

    #********** End **********#

第4关:Transformation – flatMap

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    #********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2.创建一个[[1, 2, 3], [4, 5, 6], [7, 8, 9]] 的列表List
    data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.使用rdd.collect() 收集 rdd 的元素。
    print(rdd.collect())

    """
    使用 flatMap 算子,将 rdd 的数据 ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的规则进行转换操作,规则如下:
    需求:
        合并RDD的元素,例如:
                        ([1,2,3],[4,5,6])  -->  (1,2,3,4,5,6)
                        ([2,3],[4,5],[6])  -->  (1,2,3,4,5,6)
    """
    # 5.使用 flatMap 算子完成以上需求
    rdd = rdd.flatMap(lambda x: x)

    # 6.使用rdd.collect() 收集完成 flatMap 转换的元素
    print(rdd.collect())

    # 7.停止 SparkContext
    sc.stop()

    #********** End **********#

第5关:Transformation – distinct

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    #********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2.创建一个内容为(1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1)的列表List
    data = [1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())

    """
    使用 distinct 算子,将 rdd 的数据 (1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1) 按照下面的规则进行转换操作,规则如下:
    需求:
        元素去重,例如:
                     1,2,3,3,2,1  --> 1,2,3
                     1,1,1,1,     --> 1
    """
    # 5.使用 distinct 算子完成以上需求
    rdd = rdd.distinct()

    # 6.使用rdd.collect() 收集完成 distinct 转换的元素
    print(rdd.collect())

    # 7.停止 SparkContext
    sc.stop()

    #********** End **********#

第6关:Transformation – sortBy

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2.创建一个内容为(1, 3, 5, 7, 9, 8, 6, 4, 2)的列表List
    data = [1, 3, 5, 7, 9, 8, 6, 4, 2]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())

    """
       使用 sortBy 算子,将 rdd 的数据 (1, 3, 5, 7, 9, 8, 6, 4, 2) 按照下面的规则进行转换操作,规则如下:
       需求:
           元素排序,例如:
            5,4,3,1,2  --> 1,2,3,4,5
       """
    # 5.使用 sortBy 算子完成以上需求
    rdd = rdd.sortBy(lambda x: x)

    # 6.使用rdd.collect() 收集完成 sortBy 转换的元素
    print(rdd.collect())

    # 7.停止 SparkContext
    sc.stop()

    #********** End **********#

第7关:Transformation – sortByKey

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2.创建一个内容为[(B',1),('A',2),('C',3)]的列表List
    data = [('B', 1), ('A', 2), ('C', 3)]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())

    """
       使用 sortByKey 算子,将 rdd 的数据 ('B', 1), ('A', 2), ('C', 3) 按照下面的规则进行转换操作,规则如下:
       需求:
           元素排序,例如:
            [(3,3),(2,2),(1,1)]  -->  [(1,1),(2,2),(3,3)]
       """
    # 5.使用 sortByKey 算子完成以上需求
    rdd = rdd.sortByKey()

    # 6.使用rdd.collect() 收集完成 sortByKey 转换的元素
    print(rdd.collect())

    # 7.停止 SparkContext
    sc.stop()

    # ********** End **********#

第8关:Transformation – mapValues

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2.创建一个内容为[("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]的列表List
    data = [("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())

    """
           使用 mapValues 算子,将 rdd 的数据 ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5) 按照下面的规则进行转换操作,规则如下:
           需求:
               元素(key,value)的value进行以下操作:
                                                偶数转换成该数的平方
                                                奇数转换成该数的立方
    """
    # 5.使用 mapValues 算子完成以上需求
    rdd = rdd.mapValues(lambda x: x ** 2 if x % 2 == 0 else x ** 3)

    # 6.使用rdd.collect() 收集完成 mapValues 转换的元素
    print(rdd.collect())

    # 7.停止 SparkContext
    sc.stop()

    # ********** End **********#

第9关:Transformations – reduceByKey

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2.创建一个内容为[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]的列表List
    data = [("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())

    """
          使用 reduceByKey 算子,将 rdd 的数据[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)] 按照下面的规则进行转换操作,规则如下:
          需求:
              元素(key-value)的value累加操作,例如:
                                                (1,1),(1,1),(1,2)  --> (1,4)
                                                (1,1),(1,1),(2,2),(2,2)  --> (1,2),(2,4)
    """
    # 5.使用 reduceByKey 算子完成以上需求
    rdd = rdd.reduceByKey(lambda x, y: x + y)

    # 6.使用rdd.collect() 收集完成 reduceByKey 转换的元素
    print(rdd.collect())

    # 7.停止 SparkContext
    sc.stop()

    # ********** End **********#

第10关:Actions – 常用算子

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2.创建一个内容为[1, 3, 5, 7, 9, 8, 6, 4, 2]的列表List
    data = [1, 3, 5, 7, 9, 8, 6, 4, 2]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.收集rdd的所有元素并print输出
    print(rdd.collect())

    # 5.统计rdd的元素个数并print输出
    print(rdd.count())

    # 6.获取rdd的第一个元素并print输出
    print(rdd.first())

    # 7.获取rdd的前3个元素并print输出
    print(rdd.take(3))

    # 8.聚合rdd的所有元素并print输出
    print(rdd.reduce(lambda x, y: x + y))

    # 9.停止 SparkContext
    sc.stop()

    # ********** End **********#

 

作者:学不好python的小猫

物联沃分享整理
物联沃-IOTWORD物联网 » Spark算子 – Python(头歌)

发表回复