使用telegraf采集物联网终端设备信息

背景

低功耗设备上资源有限,但又比较重要。对其的管理难度很大,有些时候又必须时刻了解其运行状况。我们自然想到的是能否有办法监控它呢?当时是有的!而且很成熟的解决方案。TICK技术栈,那TICK是什么呢?
TICK是由InfluxData开源的监控技术栈,由 Telegraf, InfluxDB, Chronograf, Kapacitor 四个工具的首字母组成。

  • Telegraf:go语言开发的数据采集工具;
  • InfluxDB:go语言开发的时序数据库;
  • Chronograf:数据可视化报表展示;
  • Kapacitor:时序数据的监控告警;

    以上只是做个了解,不在本文重点。而且每个人遇到的工作场景不同,这样的架构也不一定适合自己。取其所需整合到自己的系统中,可以完美的满足业务需求最好。当然了解这个解决方案也是必须的。自己去花时间吧。
  • 问题

    在我的尝试中使用telegraf+influxdb+业务系统去跟踪并管理终端设备,通过交叉编译环境–现场设备–数据采集–数据入库–设备管理等正式环境运行后,欣喜的发现数据上来了。但是没过多久telegraf数据时断时续,最终挂掉而导致数据出现中断。个人模糊的感觉可能是入库时出现了什么我不知道的问题。通过抓包分析发现,其在对influxdb建立网络连接时网络不稳定,主程序进程占用的内存也多,多方面因数导致数据不能及时发出去最终撑爆了telegraf。也有一种通过限制telegraf连接失败时Telegraf将尝试重新连接InfluxDB的最大次数,influx_max_retry_interval参数指定了重试连接的最大时间间隔。但这样不能解决我数据丢失的问题呀。

    架构图
    项目摘取部分架构

    解决

    针对上面的问题决定替换直接连接influxdb的想法,利用终端本身与mqtt交互的方式,将数据回传到系统中。于是有了上面的截图。再一番修改后,数据到了server时又出现了问题。telegraf采集时,上报到mqtt过程中设置了数据格式为“json”,想着通过server转手时再做点自动分析处理的工作。然后再将数据直接入库!想法是好的,也是可行的。就是数据格式中出现了integer与float的交替出现导致了influxdb入库失败。查阅资料时发现可以直接配置数据的类型为influx,Telegraf数据格式配置

    于是我修改配置如下:

    root@lm70:/tffs0a# cat telegraf.conf
    ## 配置在采集终端的telegraf.conf,注意配置信息的正确性
    
    # Telegraf Configuration
    # Global tags can be specified here in key="value" format.
    [global_tags]
      device="3"
    # Configuration for telegraf agent
    [agent]
      interval = "10s"
      round_interval = true
      metric_batch_size = 1000
      metric_buffer_limit = 10000
      collection_jitter = "0s"
      flush_interval = "10s"
      flush_jitter = "0s"
      precision = ""
      debug = false
      quiet = false
      hostname = ""
      omit_hostname = false
    
    # CPU input plugin
    [[inputs.cpu]]
      percpu = true
      totalcpu = true
      collect_cpu_time = false
      report_active = false
    # Memory input plugin
    [[inputs.mem]]
    # Disk input plugin
    [[inputs.disk]]
      ignore_fs = ["tmpfs", "devtmpfs", "devfs"]
    # System input plugin (for boot time)
    [[inputs.system]]
    # Net input plugin (for network traffic)
    [[inputs.net]]
    
    # 配置influxdb信息,出现数据中断的,后改为下面的mqtt方式接收数据
    # InfluxDB v2 output plugin
    #[[outputs.influxdb_v2]]
    #  urls = ["http://remote_ip:port"]
    #  token = "token"
    #  organization = "org"
    #  bucket = "bucket"
    
    # 配置mqtt的数据接收方式,第一次选择了data_format="json"的格式入库,出现了数据类型不匹配的情况,
    # 当然可以自行处理,如果不想处理也是可以直接设置data_format=“influx”这种方式去规避
    [[outputs.mqtt]]
      servers = ["tcp://remote_ip:port"]
      topic = "topic"
      qos = 1
      username = "admin"
      password = "passwd"
      data_format = "influx"
    

    通过这么配置后,就能在server中直接转存到infludb中即可

    # 对着上图的Server中运行的python代码片段
    
    import paho.mqtt.client as mqtt
    from influxdb_client import InfluxDBClient
    from concurrent.futures import ThreadPoolExecutor
    from influxdb_client.client.write_api import SYNCHRONOUS
    
    # 线程池
    executor = ThreadPoolExecutor(max_workers=20)
    # Influxdb连接信息,此处版本2.x
    client = InfluxDBClient(url=ENV_INFLUX_URL, token=ENV_INFLUX_TOKEN, org=ENV_INFLUX_ORG, timezone='Asia/Shanghai')
    write_api = client.write_api(write_options=SYNCHRONOUS)
    
    def save_data(bytes_array):
    	try:
            print(bytes_array)
            write_api.write("bucket", 'org', bytes_array)
            
            # 你可以对数据在做点什么也不是不可以....
            # dosomething...
            
        except Exception as e:
            print(f"监控数据入库失败...{e}")
    
    def on_connect(client, userdata, flags, rc):
        """订阅设备监控数据的topic"""
        print("MQTT连接成功")
        client.subscribe(ENV_MQTT_MT)
      
    def on_message(client, userdata, msg):
        if "mt_device" in msg.topic:
            executor.submit(save_data, msg.payload)
      	else:
      		pass
    
    if __name__ == '__main__':
    	# mqtt配置信息根据自己的配置
        client = mqtt.Client()
        client.on_connect = on_connect
        client.on_message = on_message
        client.connect(ENV_MQTT_IP, int(ENV_MQTT_PORT), 60)
        client.loop_forever()
    

    官方链接:
    https://github.com/influxdata/telegraf/tree/master/plugins

            到这里我遇到的问题就解决了,而且没有再出现断断续续的情况,一直很稳定。这就不禁让我感到疑惑。同样的网络环境下为什么这种方式可以安全平稳的运行,而直接配置influxdb数据库就会出现问题呢?

    讨论

    如果你有好的解决办法,请在评论区告诉我。

    作者:王大惑

    物联沃分享整理
    物联沃-IOTWORD物联网 » 使用telegraf采集物联网终端设备信息

    发表回复