Java 集成物联网时序数据库:IoTDB

引言

        随着物联网(IoT)的快速发展,软件和硬件的交互越发紧密,数据的管理和分析因此变得尤为重要,虽然我们常用的数据库也可以完成这种操作的,但是数据入库时比较复杂,查询时效率低下,所以时序数据库就此诞生了。时序数据库是专门用于存储、查询和分析时间序列数据的数据库系统,今天我们就了解一个由清华大学大数据系统软件团队研发的时序数据库:IoTDB

简介

        Apache IoTDB是一款聚焦工业物联网、高性能轻量级的开源时序数据管理系统,旨在为用户提供数据收集、存储和分析等服务,解决企业组建物联网大数据平台管理时序数据时所遇到的应用场景复杂、数据体量大、采样频率高、数据乱序多、数据处理耗时长、分析需求多样、存储与运维成本高等多种问题

系统架构

特点

  1. 灵活的部署方式
  2. 云端一键部署
  3. 终端解压即用
  4. 终端-云端无缝连接(数据云端同步工具)
  5. 低硬件成本的存储解决方案
  6. 高压缩比的磁盘存储
  7. 目录结构的时间序列组织管理方式
  8. 支持复杂结构的智能网联设备的时间序列组织
  9. 支持大量同类物联网设备的时间序列组织
  10. 可用模糊方式对海量复杂的时间序列目录结构进行检索
  11. 高通量的时间序列数据读写
  12. 支持百万级低功耗强连接设备数据接入
  13. 支持智能网联设备数据高速读写
  14. 以及同时具备上述特点的混合负载
  15. 面向时间序列的丰富查询语义
  16. 跨设备、跨传感器的时间序列时间对齐
  17. 面向时序数据特征的计算
  18. 提供面向时间维度的丰富聚合函数支持
  19. 极低的学习门槛
  20. 支持类 SQL 的数据操作
  21. 提供 JDBC 的编程接口
  22. 完善的导入导出工具
  23. 完美对接开源生态环境
  24. 支持开源数据分析生态系统:Hadoop、Spark
  25. 支持开源可视化工具对接:Grafana
  26. 统一的数据访问模式
  27. 无需进行分库分表处理
  28. 无需区分实时库和历史库
  29. 高可用性支持
  30. 支持HA分布式架构,系统提供7*24小时不间断的实时数据库服务
  31. 应用访问系统,可以连接集群中的任何一个节点进行
  32. 一个物理节点宕机或网络故障,不会影响系统的正常运行
  33. 物理节点的增加、删除或过热,系统会自动进行计算/存储资源的负载均衡处理
  34. 支持异构环境,不同类型、不同性能的服务器可以组建集群,系统根据物理机的配置,自动负载均衡

下载

访问官方下载地址:发行版本 | IoTDB Website

安装

linux系统 – 单机

解压下载的压缩包

upzip apache-iotdb-{version}-all-bin.zip

进入安装目录下的sbin

cd apache-iotdb-{version}-all-bin/sbin

一起或者单个启动ConfigNode和DataNode

#一起启动ConfigNode和DataNode
./start-standalone.sh -d #“-d”参数将在后台进行启动
#单独启动ConfigNode
./start-confignode.sh -d #“-d”参数将在后台进行启动 
#单独启动DataNode
./start-datanode.sh -d #“-d”参数将在后台进行启动 

验证部署

./start-cli.sh -h ip(本机ip或域名) -p 端口号(6667)

出现如下界面即代表部署成功

详情可参考: 单机版部署 | IoTDB Website

数据模型

设备

        物理设备,通常是一组测点的集合,由一到多个标签定位标识

测点

        多个数据点按时间戳递增排列形成的一个时间序列,通常一个测点代表一个采集点位,能够定期采集所在环境的物理量

数据点

        由一个时间戳和一个数值组成,时间戳为 long 类型,数值可以为 BOOLEAN、FLOAT、INT32 等各种类型

时间序列命名

        IoTDB 采用树形结构定义数据模式,以 root 为根节点到叶子节点的路径来命名一个时间序列,层次间以“.”连接,结合数据模型,我们可以这样命名时间序列:root.区域.企业.设备.测点

数据库

        存储规则:一个数据库中的数据会存储在同一批文件夹下,不同数据库的数据会存储在磁盘的不同文件夹下,从而实现物理隔离。一般情况下建议设置一个数据库

        时间序列的任意前缀路径都可设置成数据库,后续在其下新增设备,也将属于该数据库

        我们需要根据使用场景去创建数据库,比如数据模型中的场景

                1.如果是针对四川的多个医院进行数据存储,创建的数据库为root.四川

                2.如果是针对医院的多个设备进行数据存储,创建的数据库为root.四川.医院

设备模板

        定义:实现同类型不同实体的物理量元数据共享,减少元数据内存占用,同时简化同类型实体的管理

        如果没有设备模板,相同类型的设备我们就需要重复创建时间序列

        一般情况下是将测点设置为设备模板,建议将其挂载到数据库节点

Java 集成 IoTDB

引入依赖

<dependencies>
    <dependency>
      <groupId>org.apache.iotdb</groupId>
      <artifactId>iotdb-session</artifactId>
      <version>1.3.2</version>
    </dependency>
</dependencies>

创建会话交给Spring容器管理

/**
 * IoTDB配置
 * @author muze
 */
@Data
@Slf4j
@Configuration
@ConfigurationProperties(prefix = "iotdb")
public class IoTDBConfig {
    /**
     * 主机
     */
    private String host;
    /**
     * 端口
     */
    private int port;
    /**
     * 用户名
     */
    private String username;
    /**
     * 密码
     */
    private String password;
    /**
     * 创建IoTDB会话交给Spring容器管理,并打开会话
     * @return IoTDB会话
     */
    @Bean
    public Session iotdbSession() {
        // 使用配置数据创建IoTDB会话
        Session session = new Session.Builder().host(host).port(port).username(username).password(password).build();
        try {
            // 打开会话
            session.open();
        } catch (IoTDBConnectionException e) {
            log.error(e.getMessage());
        }
        // 返回会话
        return session;
    }
}

在application.yml中添加IoTDB相关信息

iotdb:
  host: 主机
  port: 端口
  username: 账号
  password: 密码

注入使用

创建生命体征实体

/**
 * 生命体征实体类
 * @author muze
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class VitalSign {
    /**
     * 时间
     */
    private String time;
    /**
     * 体温
     */
    private String temperature;
    /**
     * 心率
     */
    private String heartRate;
    /**
     * 血压
     */
    private String bloodPressure;
}

编写测试类 

@Slf4j
@SpringBootTest
public class IoTDBTest {
    @Autowired
    private Session iotdbSession;
    /**
     * 时间格式
     */
    private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    @Test
    public void test() {
        // 数据库名称
        String databaseName = "root.四川.医院";
        // 设备模板名称
        String templateName = "template";
        // 设备模板字段
        List<String> templateFieldList = Arrays.asList("体温", "心率", "血压");
        // 设备名称集合
        List<String> deviceNameList = Arrays.asList("生命体征监测仪1", "生命体征监测仪2", "生命体征监测仪3");
        // 模拟数据
        VitalSign vitalSign1 = new VitalSign("2024-10-12 17:50:00", "36.3", "101", "71.3");
        VitalSign vitalSign2 = new VitalSign("2024-10-12 17:51:00", "36.1", "99", "72.1");
        VitalSign vitalSign3 = new VitalSign("2024-10-12 17:52:00", "36.2", "103", "71.7");
        Map<String, List<VitalSign>> map = new HashMap<>();
        map.put("生命体征监测仪1", List.of(vitalSign1));
        map.put("生命体征监测仪2", List.of(vitalSign2));
        map.put("生命体征监测仪3", List.of(vitalSign3));
        try {
            // 创建数据库
            iotdbSession.setStorageGroup(databaseName);
            Template template = buildDeviceTemplate(templateName, templateFieldList);
            // 创建设备模板
            iotdbSession.createSchemaTemplate(template);
            // 挂在设备模板
            iotdbSession.setSchemaTemplate(templateName, databaseName);
            // 使用模板为设备创建时序列表
            List<String> devicePathList = buildDevicePathList(databaseName, deviceNameList);
            iotdbSession.createTimeseriesUsingSchemaTemplate(devicePathList);
            /*
             使用Table插入数据
             1.写入效率高
             2.支持批量写入
             3.支持写入空值
             */
            Map<String, Tablet> tabletMap = buildTableMap(map, databaseName, templateFieldList, deviceNameList);
            // 以子表格式数据Map向IoTDB插入设备数据
            iotdbSession.insertTablets(tabletMap);
        } catch (IoTDBConnectionException | StatementExecutionException | IOException e) {
            log.error("IoTDB异常:{}", e.getMessage());
        }
    }

    /**
     * 构建设备模板
     * @param templateName 设备模板名称
     * @param templateFieldList 设备模板字段集合
     * @return 设备模板
     * @throws StatementExecutionException 语句执行异常
     */
    private static Template buildDeviceTemplate(String templateName, List<String> templateFieldList) throws StatementExecutionException {
        Template template = new Template(templateName);
        for (String templateField : templateFieldList) {
            template.addToTemplate(new MeasurementNode(templateField, TSDataType.TEXT, TSEncoding.PLAIN, CompressionType.LZ4));
        }
        return template;
    }

    /**
     * 构建设备路径集合
     * @param databaseName 数据库名称
     * @param deviceNameList 设备名称集合
     * @return 设备路径集合
     */
    private static List<String> buildDevicePathList(String databaseName, List<String> deviceNameList) {
        List<String> devicePathList = new ArrayList<>();
        for (String deviceName : deviceNameList) {
            devicePathList.add(databaseName + "." + deviceName);
        }
        return devicePathList;
    }

    /**
     * 构建子表结构数据Map
     * @param deviceNameDataMap 设备名称数据Map
     * @param databaseName 数据库名称
     * @param templateFieldList 设备模板字段集合
     * @param deviceNameList 设备名称集合
     * @return 子表结构数据Map
     * @throws ParseException 解析异常
     */
    private static Map<String, Tablet> buildTableMap(Map<String, List<VitalSign>> deviceNameDataMap, String databaseName, List<String> templateFieldList, List<String> deviceNameList) {
        Map<String, Tablet> tabletMap = new HashMap<>();
        List<MeasurementSchema> measurementSchemaList = new ArrayList<>();
        for (String templateFiled : templateFieldList) {
            measurementSchemaList.add(new MeasurementSchema(templateFiled, TSDataType.TEXT));
        }
        for (String deviceName : deviceNameList) {
            // 根据设备名称获取生命体征数据集合
            List<VitalSign> vitalSignList = deviceNameDataMap.get(deviceName);
            int deviceDataListSize = vitalSignList.size();
            Tablet tablet = new Tablet(databaseName + "." + deviceName, measurementSchemaList, deviceDataListSize);
            tablet.rowSize = deviceDataListSize;
            for (int rowIndex = 0; rowIndex < deviceDataListSize; rowIndex++) {
                // 创建子表数据结构
                for (VitalSign vitalSign : vitalSignList) {
                    try {
                        tablet.addTimestamp(rowIndex, simpleDateFormat.parse(vitalSign.getTime()).getTime());
                    } catch (ParseException e) {
                        log.error("时间格式解析异常:{}", e.getMessage());
                    }
                    tablet.addValue("体温", rowIndex, vitalSign.getTemperature());
                    tablet.addValue("心率", rowIndex, vitalSign.getHeartRate());
                    tablet.addValue("血压", rowIndex, vitalSign.getBloodPressure());
                }
                // 将设备路径作为key,子表格式数据作为value放入子表格式数据Map
                tabletMap.put(databaseName + "." + deviceName, tablet);

            }
        }
        return tabletMap;
    }
}

运行结果

到这里我们就在Java 中成功集成了 IoTDB,快去试试吧 

总结

        IoTDB作为一款高性能的时序数据库,具有轻量级架构、高性能、丰富的功能集等特点,能够满足物联网行业中大规模数据存储、高速数据摄入和复杂数据分析的需求,当你遇到这样的场景时就可以考虑使用它,希望这篇文章能对你有所帮助

作者:牧泽_

物联沃分享整理
物联沃-IOTWORD物联网 » Java 集成物联网时序数据库:IoTDB

发表回复