摸鱼大数据——Spark Structured Steaming——物联网数据分析案例

1、数据模拟器代码

  • 1- 创建一个topic, 放置后续物联网的数据 search-log-topic

  •  ./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --create --topic search-log-topic --partitions 3 --replication-factor 2
  • 2- 将代码放置到项目中:

  • import json
    import random
    import sys
    import time
    import os
    from kafka import KafkaProducer
    from kafka.errors import KafkaError

    # 锁定远端操作环境, 避免存在多个版本环境的问题
    os.environ['SPARK_HOME'] = '/export/server/spark'
    os.environ["PYSPARK_PYTHON"] = "/root/anaconda3/bin/python"
    os.environ["PYSPARK_DRIVER_PYTHON"] = "/root/anaconda3/bin/python"

    # 快捷键:  main 回车
    if __name__ == '__main__':
        print("模拟物联网数据")

        # 1- 构建一个kafka的生产者:
        producer = KafkaProducer(
            bootstrap_servers=['node1:9092', 'node2:9092', 'node3:9092'],
            acks='all',
            value_serializer=lambda m: json.dumps(m).encode("utf-8")
        )

        # 2- 物联网设备类型
        deviceTypes = ["洗衣机", "油烟机", "空调", "窗帘", "灯", "窗户", "煤气报警器", "水表", "燃气表"]

        while True:
            index = random.choice(range(0, len(deviceTypes)))
            deviceID = f'device_{index}_{random.randrange(1, 20)}'  # 设备ID
            deviceType = deviceTypes[index]  # 设备类型
            deviceSignal = random.choice(range(10, 100)) # 设备信号

            # 组装数据集
            print({'deviceID': deviceID, 'deviceType': deviceType, 'deviceSignal': deviceSignal,
                   'time': time.strftime('%s')})

            # 发送数据
            producer.send(topic='search-log-topic',
                          value={'deviceID': deviceID, 'deviceType': deviceType, 'deviceSignal': deviceSignal,
                                           'time': time.strftime('%s')}
            )

            # 间隔时间 5s内随机
            time.sleep(random.choice(range(1, 5)))

  • 测试, 观察是否可以正常生成:

  •  ./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --describe --topic search-log-topic

    2、需求说明

    目前咱们有一个模拟器程序, 可以向Kafka不断的写入数据

    要做的是, 用Spark的结构化流接收数据, 并且对数据进行统计分析操作:

  • 求: 各种信号强度>30各种类型的设备数量 和 它们的平均信号强度

  • 需求分析:

    1- 需要按照设备类型进行分组,也就是维度是设备类型deviceType

    2- 指标

    设备数量:deviceID

    平均信号强度:deviceSignal

    示例数据:

    {'deviceID': 'device_1_1', 'deviceType': '油烟机', 'deviceSignal': 23, 'time': '1668848417'} {'deviceID': 'device_0_4', 'deviceType': '洗衣机', 'deviceSignal': 55, 'time': '1668848418'}

     deviceID: 设备ID
     deviceType: 设备类型
     deviceSignal: 设备信号
     time : 设备发送时间戳

    3、代码实现

     from pyspark import SparkConf, SparkContext
     import os
     from pyspark.sql import SparkSession
     import pyspark.sql.functions as F
     ​
     # 绑定指定的Python解释器
     os.environ['SPARK_HOME'] = '/export/server/spark'
     os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
     os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
     ​
     ​
     def sql():
         # SQL
         # 3.2- 拆解数据结构。将json解析得到单个的字段
         """
             get_json_object(参数1,参数2):用来解析json串。一次只能得到一个字段的值
                 参数1:要解析的json字段名称
                 参数2:字段的解析路径 $.字段路径
         """
         etl_df = spark.sql("""
             select
                 get_json_object(value,'$.deviceID') as deviceID,
                 get_json_object(value,'$.deviceType') as deviceType,
                 get_json_object(value,'$.deviceSignal') as deviceSignal,
                 get_json_object(value,'$.time') as time
             from iot
         """)
         etl_df.createTempView("etl")
     ​
         # 3.3- 各种信号强度>30各种类型的设备数量  和  它们的平均信号强度
         result_df = spark.sql("""
             select
                 deviceType,
                 count(deviceID) as cnt_deviceID,
                 round(avg(deviceSignal),2) as avg_deviceSignal
             from etl
             where deviceSignal>30
             group by deviceType
         """)
     ​
         # 4- 数据输出
         # 5- 启动流式任务
         result_df.writeStream.format('console').outputMode('complete').start().awaitTermination()
     ​
     ​
     def dsl():
         result_df = etl_tmp_df.select(
             F.get_json_object('value', '$.deviceID').alias('deviceID'),
             F.get_json_object('value', '$.deviceType').alias('deviceType'),
             F.get_json_object('value', '$.deviceSignal').alias('deviceSignal'),
             F.get_json_object('value', '$.time').alias('time')
         ).where('deviceSignal>30').groupBy('deviceType').agg(
             F.count('deviceID').alias('cnt_deviceID'),
             F.round(F.avg('deviceSignal'), 2).alias('avg_deviceSignal')
         )
         
         # 4- 数据输出
         # 5- 启动流式任务
         result_df.writeStream.format('console').outputMode('complete').start().awaitTermination()
     ​
     ​
     if __name__ == '__main__':
         # 1- 创建SparkSession对象
         spark = SparkSession.builder\
             .config("spark.sql.shuffle.partitions",2)\
             .appName('iot')\
             .master('local[*]')\
             .getOrCreate()
     ​
         # 2- 数据输入
         init_df = spark.readStream\
             .format("kafka") \
             .option("kafka.bootstrap.servers", "node1.itcast.cn:9092,node2.itcast.cn:9092") \
             .option("subscribe", "search-log-topic") \
             .load()
     ​
         # 3- 数据处理
         # 3.1- 数据ETL:进行数据类型转换,将value字段bytes->字符串
         etl_tmp_df = init_df.selectExpr("cast(value as string) as value")
         etl_tmp_df.createTempView('iot')
     ​
         # SQL
         # sql()
     ​
         # DSL
         dsl()

    运行结果截图:

    结构化流不支持的操作:

  • 多个流同时聚合

  • limit和take不能使用

  • 不能使用去重操作

  • Few types of outer joins on streaming Datasets are not supported. See the support matrix in the Join Operations section for more details.

  • 作者:困了就倒头睡

    物联沃分享整理
    物联沃-IOTWORD物联网 » 摸鱼大数据——Spark Structured Steaming——物联网数据分析案例

    发表回复