Spring Boot与IoTDB集成:实现高效时序数据管理模式

在物联网(IoT)和工业互联网领域,时序数据的高效管理和分析是至关重要的。Apache IoTDB 是一个高性能的时序数据库,专为处理大规模时序数据而设计。本文将介绍如何在 Spring Boot 应用程序中采用IotDB-Session的方式集成 IoTDB,实现数据的插入、查询和聚合等功能。

1. 项目背景

随着物联网设备的普及,数据量呈爆发式增长。这些数据通常是时间序列数据,具有高吞吐量、高并发和快速写入的特点。传统的关系型数据库在处理这类数据时往往力不从心,而时序数据库(如 IoTDB)则能够更好地满足这些需求。

IoTDB 是一个开源的时序数据库,支持高效的数据插入和查询,特别适合处理设备传感器数据、工业数据等。通过 Spring Boot 集成 IoTDB,我们可以快速构建一个高性能的时序数据管理系统。

2. 环境准备

在开始之前,确保你已经安装了以下环境:

  1. Java Development Kit (JDK):推荐使用 JDK 1.8 或更高版本。

  2. Maven:用于项目构建和依赖管理。

  3. IoTDB:下载并安装 IoTDB,并确保其服务已启动。

  4. Spring Boot:用于构建应用程序。

3. 项目依赖配置

pom.xml 中,添加 IoTDB 的 Java 客户端依赖以及其他必要的 Spring Boot 依赖:

 <dependency>
     <groupId>org.apache.iotdb</groupId>
     <artifactId>iotdb-session</artifactId>
     <version>1.3.0</version>
 </dependency>

4. 配置文件

application-dev.yml 中,配置 IoTDB 的连接信息:

spring:
  iotdb:
    username: root
    password: root
    hostUrl:
     - 127.0.0.1:6667
    maxSize: 100

5. IoTDB 会话管理

创建 IotDbSessionPoolManager 类,用于管理 IoTDB 的会话连接池:

@Component
@Slf4j
@ConfigurationProperties(prefix = "spring.iotdb")
@Setter
public class IotDbSessionPoolManager {

    private String username;

    private String password;

    private List<String> hostUrl;

    private int maxSize;

    private static SessionPool sessionPool;

    public SessionPool getSessionPool() {
        if (sessionPool == null) {
            sessionPool = new SessionPool(hostUrl, username, password, maxSize);
        }
        return sessionPool;
    }

    @PostConstruct
    public void init()  {
        // 创建 SessionPool
        log.info("====SessionPool init====");
        sessionPool = new SessionPool(hostUrl, username, password, maxSize);
    }

    @PreDestroy
    public void destroy() {
        // 关闭 SessionPool
        log.info("====SessionPool destroy====");
        close();
    }

    /**
     * 关闭连接池
     */
    public void close() {
        if (sessionPool != null) {
            sessionPool.close();
            sessionPool = null;
        }
    }

}

6. 服务接口与实现

定义 IotDBService 接口,提供各种数据操作方法,例如插入数据、查询实时数据、查询历史数据、聚合查询等:

public interface IotDBService {

    /**
     * 根据设备获取指标实时数据
     *
     * @param db     iotdb库名
     * @param device iotdb设备路径
     * @param points iotdb测点编码
     * @return
     */
    List<RealtimeValueDto> realtime(String db, String device, List<String> points);

    /**
     * 根据设备获取指标点码平均值
     *
     * @param device    iotdb设备路径 用点隔开
     * @param points    iotdb设备路径 用点隔开
     * @param startTime 开始时间 秒级时间戳
     * @param endTime   结束时间 秒级时间戳
     * @return
     */
    List<AvgValueDto> avgValue(String device, List<String> points, Long startTime, Long endTime);

    /**
     * 根据设备获取指标点码最大值和最小值
     *
     * @param device    iotdb设备路径 用点隔开
     * @param points    iotdb设备路径 用点隔开
     * @param startTime 开始时间 秒级时间戳
     * @param endTime   结束时间 秒级时间戳
     * @return
     */
    List<StatisticsValueDto> statisticsValue(String device, List<String> points, Long startTime, Long endTime);

    /**
     * 根据设备求和指标点码
     *
     * @param device
     * @param points
     * @return
     */
    List<SumValueDto> sumValue(String device, List<String> points, Long startTime, Long endTime);

    /**
     * 获取点码历史数据
     *
     * @param device
     * @param: points
     * @param: page
     * @param: pageSize
     * @param: startTime
     * @param: endTime
     */
    HistoryValuesDto historyData(String device, List<String> points, Integer page, Integer pageSize, Long startTime, Long endTime);

    /**
     * 根据时间窗口按照时间间隔获取点码历史数据
     *
     * @param device
     * @param: points
     * @param: page
     * @param: pageSize
     * @param: startTime
     * @param: endTime
     * @param: windowSize 间隔窗口大小
     */
    HistoryValuesDto windowAggHistoryData(String device, List<String> points, Integer page, Integer pageSize, Long startTime, Long endTime, Integer windowSize, String interval);

    /**
     * 插入数据到IoTDB
     *
     * @param device   deviceId: String - 设备的唯一标识符。
     * @param points   pointArr: String[] - 要插入的数据点名称列表。
     * @param types    types: TSDataType[] - 各数据点对应的数据类型列表,支持的数据类型(BOOLEAN,INT32,INT64,FLOAT,DOUBLE,TEXT)。
     * @param dataJson dataJson: String - 包含要插入的数据的JSON字符串。
     *                 {"data":[{"timestamp":1617187200,"temperature":"111","speed":"1"},{"timestamp":1617187200,"temperature":"111","speed":"1"}]}
     *                 timestamp: Long (可选) - 数据点的时间戳,单位为毫秒。如果不传,则默认为当前时间。
     */
    int insertData(String device, List<String> points, List<TSDataType> types, String dataJson);

    /**
     * 多设备获取指标实时数据
     *
     * @param deviceIds iotdb设备路径
     * @param points    iotdb测点编码
     * @return
     */
    List<BatchRealtimeValueDto> batchRealtime(List<String> deviceIds, List<String> points);

    /**
     * 多设备获取指标点码历史数据
     *
     * @param deviceIds iotdb设备路径
     * @param points    iotdb测点编码
     * @return
     */
    BatchHistoryValuesDto batchHistoryData(List<String> deviceIds, List<String> points, Integer page, Integer pageSize, Long startTime, Long endTime);

    /**
     * 根据时间窗口按照时间间隔获取点码历史数据
     *
     * @param deviceIds
     * @param: points
     * @param: page
     * @param: pageSize
     * @param: startTime
     * @param: endTime
     * @param: windowSize 间隔窗口大小
     */
    BatchHistoryValuesDto batchWindowAggHistoryData(List<String> deviceIds, List<String> points, Integer page, Integer pageSize, Long startTime, Long endTime, Integer windowSize, String interval);

}

实现 IotDBServiceImpl 类,具体实现上述接口中的方法:

@Slf4j
@Service
public class IotDBServiceImpl implements IotDBService {

    @Autowired
    IotDbSessionPoolManager ioTDBSessionPoolManager;

    @Override
    public List<RealtimeValueDto> realtime(String db, String device, List<String> points) {
        //如果请求为空 直接返回null
        if (StringUtils.isEmpty(db) || StringUtils.isEmpty(device) || CollectionUtils.isEmpty(points)) {
            return null;
        }
        SessionPool session = null;
        SessionDataSetWrapper dataSetWrapper = null;
        try {
            session = ioTDBSessionPoolManager.getSessionPool();
            dataSetWrapper = session.executeLastDataQueryForOneDevice(db, device, points, false);
            SessionDataSet resultSet = dataSetWrapper.getSessionDataSet();
            List<RealtimeValueDto> realtimeValueDtoList = new ArrayList<>();
            while (resultSet.hasNext()) {
                RealtimeValueDto realtimeValueDto = RealtimeValueDto.builder().build();
                RowRecord record = resultSet.next();
                realtimeValueDto.setTs(record.getTimestamp());
                List<Field> fields = record.getFields();
                realtimeValueDto.setTimeSeries(fields.get(0).getStringValue());
                realtimeValueDto.setVal(fields.get(1).getStringValue());
                realtimeValueDtoList.add(realtimeValueDto);
            }
            return realtimeValueDtoList;
        } catch (Throwable ex) {
            log.error("realtime error", ex);
            throw new RuntimeException(ex);
        } finally {
            // 释放资源
            if (session != null && dataSetWrapper != null) {
                session.closeResultSet(dataSetWrapper);
            }
        }
    }

    @Override
    public List<AvgValueDto> avgValue(String device, List<String> points, Long startTime, Long endTime) {
        //如果请求为空 直接返回null
        if (StringUtils.isEmpty(device) || CollectionUtils.isEmpty(points)) {
            return null;
        }

        // 构建查询语句
        String sql = buildAvgValueQuery(device, points, startTime, endTime);
        log.info("avgValue sql::{}", sql);

        SessionPool session = null;
        SessionDataSetWrapper dataSetWrapper = null;
        // 执行查询
        try {
            session = ioTDBSessionPoolManager.getSessionPool();
            dataSetWrapper = session.executeQueryStatement(sql);
            SessionDataSet resultSet = dataSetWrapper.getSessionDataSet();

            List<AvgValueDto> avgValueList = new ArrayList<>();
            while (resultSet.hasNext()) {
                RowRecord record = resultSet.next();
                List<Field> fields = record.getFields();
                for (int i = 0; i < fields.size(); i++) {
                    AvgValueDto avgValue = AvgValueDto.builder().build();
                    avgValue.setPoint(points.get(i));
                    if (!Objects.equals(fields.get(i).getStringValue(), "null")) {
                        avgValue.setVal(fields.get(i).getDoubleV());
                    }
                    avgValueList.add(avgValue);
                }
            }
            return avgValueList;
        } catch (Throwable ex) {
            log.error("avgValue error", ex);
            throw new RuntimeException(ex);
        } finally {
            // 释放资源
            if (session != null && dataSetWrapper != null) {
                session.closeResultSet(dataSetWrapper);
            }
        }
    }

    @Override
    public List<StatisticsValueDto> statisticsValue(String device, List<String> points, Long startTime, Long endTime) {
        //如果请求为空 直接返回null
        if (StringUtils.isEmpty(device) || CollectionUtils.isEmpty(points)) {
            return null;
        }

        // 构建查询语句
        String sql = buildStatisticsValueQuery(device, points, startTime, endTime);
        log.info("statisticsValue sql :: {}", sql);

        // 执行查询
        SessionPool session = null;
        SessionDataSetWrapper dataSetWrapper = null;
        try {
            session = ioTDBSessionPoolManager.getSessionPool();
            dataSetWrapper = session.executeQueryStatement(sql);
            SessionDataSet resultSet = dataSetWrapper.getSessionDataSet();

            List<StatisticsValueDto> statisticsValueDtoList = new ArrayList<>();
            while (resultSet.hasNext()) {
                RowRecord record = resultSet.next();
                List<Field> fields = record.getFields();
                for (int i = 0; i < points.size(); i++) {
                    StatisticsValueDto value = StatisticsValueDto.builder().build();
                    value.setPoint(points.get(i));
                    value.setMaxVal(fields.get(i * 2).getStringValue());
                    value.setMinVal(fields.get(i * 2 + 1).getStringValue());
                    statisticsValueDtoList.add(value);
                }
            }
            return statisticsValueDtoList;
        } catch (Throwable ex) {
            log.error("statisticsValue error", ex);
            throw new RuntimeException(ex);
        } finally {
            // 释放资源
            if (session != null && dataSetWrapper != null) {
                session.closeResultSet(dataSetWrapper);
            }
        }
    }

    @Override
    public List<SumValueDto> sumValue(String device, List<String> points, Long startTime, Long endTime) {
        //如果请求为空 直接返回null
        if (StringUtils.isEmpty(device) || CollectionUtils.isEmpty(points)) {
            return null;
        }

        // 构建查询语句
        String sql = buildSumValueQuery(device, points, startTime, endTime);
        log.info("sumValue sql::{}", sql);

        // 执行查询
        SessionPool session = null;
        SessionDataSetWrapper dataSetWrapper = null;
        try {
            session = ioTDBSessionPoolManager.getSessionPool();
            dataSetWrapper = session.executeQueryStatement(sql);
            SessionDataSet resultSet = dataSetWrapper.getSessionDataSet();

            List<SumValueDto> sumValueDtoList = new ArrayList<>();
            while (resultSet.hasNext()) {
                RowRecord record = resultSet.next();
                List<Field> fields = record.getFields();
                for (int i = 0; i < fields.size(); i++) {
                    SumValueDto value = SumValueDto.builder().build();
                    value.setPoint(points.get(i));
                    if (!Objects.equals(fields.get(i).getStringValue(), "null")) {
                        value.setVal(fields.get(i).getDoubleV());
                    }
                    sumValueDtoList.add(value);
                }
            }
            return sumValueDtoList;
        } catch (Throwable ex) {
            log.error("sumValue error", ex);
            throw new RuntimeException(ex);
        } finally {
            // 释放资源
            if (session != null && dataSetWrapper != null) {
                session.closeResultSet(dataSetWrapper);
            }
        }
    }

    @Override
    public HistoryValuesDto historyData(String device, List<String> points, Integer page, Integer pageSize, Long startTime, Long endTime) {
        // 如果请求为空,直接返回null
        if (StringUtils.isEmpty(device) || CollectionUtils.isEmpty(points) || page == null || pageSize == null) {
            return null;
        }

        // 执行计数查询
        long totalRecords = getQueryResultCount(device, points, startTime, endTime);
        if (totalRecords == 0) {
            // 如果没有记录,直接返回null
            return null;
        }

        // 将页码转换为基于0的索引
        int offset = (page - 1) * pageSize;

        // 构建查询语句
        String sql = buildHistoryDataQuery(device, points, startTime, endTime, offset, pageSize);
        log.info("historyData sql::{}", sql);

        // 执行查询
        SessionPool session = null;
        SessionDataSetWrapper dataSetWrapper = null;
        try {
            session = ioTDBSessionPoolManager.getSessionPool();
            dataSetWrapper = session.executeQueryStatement(sql);
            SessionDataSet resultSet = dataSetWrapper.getSessionDataSet();

            HistoryValuesDto historyValuesDto = new HistoryValuesDto();
            List<HistoryDataDto> historyDataList = new ArrayList<>();
            Map<String, HistoryDataDto> historyDataMap = new HashMap<>();

            // 计算总页数
            int totalPage = (int) Math.ceil((double) totalRecords / pageSize);

            while (resultSet.hasNext()) {
                RowRecord record = resultSet.next();
                // 获取时间戳
                long timestamp = record.getTimestamp();
                List<Field> fields = record.getFields();

                // 遍历所有数据点
                for (int i = 0; i < points.size(); i++) {
                    String point = points.get(i); // 当前数据点的名称
                    Object val = fields.get(i).getStringValue(); // 当前数据点的值

                    // 创建或获取HistoryData对象
                    HistoryDataDto historyData = historyDataMap.computeIfAbsent(point, k -> {
                        HistoryDataDto newHistoryData = new HistoryDataDto();
                        newHistoryData.setPoint(k);
                        newHistoryData.setValues(new ArrayList<>());
                        return newHistoryData;
                    });

                    // 创建HistoryValue对象并添加到HistoryData的values列表中
                    HistoryValueDto historyValueDto = new HistoryValueDto();
                    historyValueDto.setTimestamp(timestamp);
                    historyValueDto.setVal(val);
                    historyData.getValues().add(historyValueDto);
                }
            }

            // 将Map中的所有HistoryData对象添加到列表中
            historyDataList.addAll(historyDataMap.values());
            historyValuesDto.setHistoryData(historyDataList);
            historyValuesDto.setTotal((int) totalRecords);
            historyValuesDto.setPageSize(pageSize);
            historyValuesDto.setPage(totalPage);
            historyValuesDto.setCurrent(page);
            return historyValuesDto;
        } catch (Throwable ex) {
            log.error("historyData error", ex);
            throw new RuntimeException(ex);
        } finally {
            // 释放资源
            if (session != null && dataSetWrapper != null) {
                session.closeResultSet(dataSetWrapper);
            }
        }
    }


    @Override
    public HistoryValuesDto windowAggHistoryData(String device, List<String> points, Integer page, Integer pageSize, Long startTime, Long endTime, Integer windowSize, String interval) {
        // 如果请求为空,直接返回null
        if (StringUtils.isEmpty(device) || CollectionUtils.isEmpty(points) || page == null || pageSize == null || windowSize == null || StringUtils.isEmpty(interval)) {
            return null;
        }

        // 执行计数查询
        long totalRecords = getQueryResultCountForSingleDevices(device, points, startTime, endTime, windowSize, interval);
        if (totalRecords == 0) {
            // 如果没有记录,直接返回null
            return null;
        }

        // 将页码转换为基于0的索引
        int offset = (page - 1) * pageSize;

        // 构建查询语句
        String sql = buildWindowAggQuery(device, points, startTime, endTime, windowSize, interval, offset, pageSize);
        log.info("windowAggHistoryData sql::{}", sql);

        // 执行查询
        SessionPool session = null;
        SessionDataSetWrapper dataSetWrapper = null;
        try {
            session = ioTDBSessionPoolManager.getSessionPool();
            dataSetWrapper = session.executeQueryStatement(sql);
            SessionDataSet resultSet = dataSetWrapper.getSessionDataSet();

            HistoryValuesDto historyValuesDto = new HistoryValuesDto();
            List<HistoryDataDto> historyDataList = new ArrayList<>();

            // 计算总页数
            int totalPage = (int) Math.ceil((double) totalRecords / pageSize);

            log.info("totalPage::{},totalRecords::{},::pageSize{}", totalPage, totalRecords, pageSize);

            while (resultSet.hasNext()) {
                RowRecord record = resultSet.next();
                // 获取时间戳
                long timestamp = record.getTimestamp();
                List<Field> fields = record.getFields();

                // 遍历所有数据点
                for (int i = 0; i < points.size(); i++) {
                    String point = points.get(i); // 当前数据点的名称
                    Object val = fields.get(i).getStringValue(); // 当前数据点的值

                    // 创建或获取HistoryData对象
                    HistoryDataDto historyData = historyDataList.stream()
                            .filter(h -> h.getPoint().equals(point))
                            .findFirst()
                            .orElseGet(() -> {
                                HistoryDataDto newHistoryData = new HistoryDataDto();
                                newHistoryData.setPoint(point);
                                newHistoryData.setValues(new ArrayList<>());
                                historyDataList.add(newHistoryData);
                                return newHistoryData;
                            });

                    // 创建HistoryValue对象并添加到HistoryData的values列表中
                    HistoryValueDto historyValueDto = new HistoryValueDto();
                    historyValueDto.setTimestamp(timestamp);
                    historyValueDto.setVal(val);
                    historyData.getValues().add(historyValueDto);
                }
            }

            historyValuesDto.setHistoryData(historyDataList);
            historyValuesDto.setTotal((int) totalRecords);
            historyValuesDto.setPageSize(pageSize);
            historyValuesDto.setPage(totalPage);
            historyValuesDto.setCurrent(page);
            return historyValuesDto;
        } catch (Throwable ex) {
            log.error("windowAggHistoryData error", ex);
            throw new RuntimeException(ex);
        } finally {
            // 释放资源
            if (session != null && dataSetWrapper != null) {
                session.closeResultSet(dataSetWrapper);
            }
        }
    }

    /**
     * 插入数据到IoTDB
     *
     * @param deviceId 设备的唯一标识符
     * @param pointArr 要插入的数据点名称列表
     * @param types    各数据点对应的数据类型列表
     * @param dataJson 包含要插入的数据的JSON字符串
     *                 timestamp 数据点的时间戳,单位为毫秒。如果不传,则默认为当前时间。
     */
    @Override
    public int insertData(String deviceId, List<String> pointArr, List<TSDataType> types, String dataJson) {
//        log.info("dataJson===>{}", dataJson);
        List<MeasurementSchema> schemaList = new ArrayList<>();
        int insertedRowCount = 0; // 用于跟踪成功插入的行数
        try {
            SessionPool session = ioTDBSessionPoolManager.getSessionPool();
            for (int i = 0; i < pointArr.size(); i++) {
                schemaList.add(new MeasurementSchema(pointArr.get(i), types.get(i)));
            }
            JSONObject dataJsonObj = new JSONObject(dataJson);
            JSONArray dataArray = dataJsonObj.getJSONArray("data");

            if (dataArray != null && dataArray.length() > 0) {
                Tablet tablet = new Tablet(deviceId, schemaList, dataArray.length());
                tablet.rowSize = dataArray.length();
                for (int i = 0; i < tablet.rowSize; i++) {
                    JSONObject dataObj = dataArray.getJSONObject(i);
                    long timestamp = dataObj.getLong("timestamp");
                    tablet.addTimestamp(i, timestamp);
                    for (int j = 0; j < pointArr.size(); j++) {
                        String point = pointArr.get(j);
                        if (dataObj.has(point)) {
                            Object value = dataObj.get(point);
                            // 根据数据类型将值添加到 Tablet
                            if (types.get(j) == TSDataType.BOOLEAN) {
                                tablet.addValue(point, i, Boolean.parseBoolean(value.toString()));
                            } else if (types.get(j) == TSDataType.INT32) {
                                tablet.addValue(point, i, Integer.parseInt(value.toString()));
                            } else if (types.get(j) == TSDataType.INT64) {
                                tablet.addValue(point, i, Long.parseLong(value.toString()));
                            } else if (types.get(j) == TSDataType.FLOAT) {
                                tablet.addValue(point, i, Float.parseFloat(value.toString()));
                            } else if (types.get(j) == TSDataType.DOUBLE) {
                                tablet.addValue(point, i, Double.parseDouble(value.toString()));
                            } else if (types.get(j) == TSDataType.TEXT) {
                                tablet.addValue(point, i, value.toString());
                            }
                        }
                    }
                }
                if (tablet.rowSize != 0) {
                    session.insertTablet(tablet);
                    log.info("Inserted data into device: {}", deviceId);
                    insertedRowCount = tablet.rowSize; // 更新成功插入的行数
                }
            }
        } catch (Throwable ex) {
            log.error("insertData error", ex);
            throw new RuntimeException(ex);
        }
        return insertedRowCount; // 返回成功插入的行数
    }

    @Override
    public List<BatchRealtimeValueDto> batchRealtime(List<String> deviceIds, List<String> points) {
        List<BatchRealtimeValueDto> batchRealtimeValues = new ArrayList<>();
        SessionPool session = null;
        SessionDataSetWrapper dataSetWrapper = null;
        try {
            String sql = buildLastDataQuery(deviceIds, points);
            log.info("batchRealtime sql::{}", sql);
            session = ioTDBSessionPoolManager.getSessionPool();
            dataSetWrapper = session.executeQueryStatement(sql);
            SessionDataSet resultSet = dataSetWrapper.getSessionDataSet();

            // 用于存储每个设备的最新数据
            Map<String, BatchRealtimeValueDto> deviceDataMap = new HashMap<>();

            while (resultSet.hasNext()) {
                // 获取设备路径和时间序列名称和时间戳
                RowRecord record = resultSet.next();
                long timestamp = record.getTimestamp();
                String timeSeries = record.getFields().get(0).getStringValue();
                String currentDeviceId = timeSeries.substring(0, timeSeries.lastIndexOf("."));
                for (String deviceId : deviceIds) {
                    // 如果当前记录的设备ID与遍历的设备ID匹配
                    if (currentDeviceId.equals(deviceId)) {
                        // 初始化设备的最新数据对象
                        BatchRealtimeValueDto batchRealtimeValue = deviceDataMap.computeIfAbsent(deviceId, k -> {
                            BatchRealtimeValueDto newValue = new BatchRealtimeValueDto();
                            newValue.setDeviceId(k);
                            newValue.setRealtimeData(new ArrayList<>());
                            return newValue;
                        });
                        RealtimeValueDto realtimeValueDto = RealtimeValueDto.builder().build();
                        realtimeValueDto.setTs(timestamp);
                        realtimeValueDto.setTimeSeries(timeSeries);
                        // 假设数据类型为 STRING,需要根据实际数据类型进行转换
                        realtimeValueDto.setVal(record.getFields().get(1).getStringValue());
                        batchRealtimeValue.getRealtimeData().add(realtimeValueDto);
                    }
                }
            }
            // 将所有设备的最新数据添加到结果列表
            batchRealtimeValues.addAll(deviceDataMap.values());
        } catch (Exception ex) {
            log.error("batchRealtime error", ex);
            throw new RuntimeException(ex);
        } finally {
            // 释放资源
            if (session != null && dataSetWrapper != null) {
                session.closeResultSet(dataSetWrapper);
            }
        }
        return batchRealtimeValues;
    }

    @Override
    public BatchHistoryValuesDto batchHistoryData(List<String> deviceIds, List<String> points, Integer page, Integer pageSize, Long startTime, Long endTime) {
        // 如果请求为空,直接返回null
        if (CollectionUtils.isEmpty(deviceIds) || CollectionUtils.isEmpty(points) || page == null || pageSize == null) {
            return null;
        }

        // 执行计数查询
        long totalRecords = getQueryResultCountForMultipleDevices(deviceIds, points, startTime, endTime);
        if (totalRecords == 0) {
            // 如果没有记录,直接返回null
            return null;
        }

        // 将页码转换为基于0的索引
        int offset = (page - 1) * pageSize;
        // 计算总页数和偏移量
        int totalPage = (int) Math.ceil((double) totalRecords / pageSize);

        // 构建查询语句
        String sql = buildHistoryDataQuery(deviceIds, points, startTime, endTime, offset, pageSize);
        log.info("batchHistoryData sql::{}", sql);

        SessionPool session = null;
        SessionDataSetWrapper dataSetWrapper = null;
        try {
            session = ioTDBSessionPoolManager.getSessionPool();
            dataSetWrapper = session.executeQueryStatement(sql);
            SessionDataSet resultSet = dataSetWrapper.getSessionDataSet();

            BatchHistoryValuesDto batchHistoryValuesDto = new BatchHistoryValuesDto();
            List<HistoryDataByDeviceDto> historyDataByDeviceList = new ArrayList<>();
            Map<String, HistoryDataByDeviceDto> deviceDataMap = new HashMap<>();

            // 获取列名
            List<String> columnNames = resultSet.getColumnNames();

            while (resultSet.hasNext()) {
                RowRecord record = resultSet.next();
                long timestamp = record.getTimestamp();

                // 遍历字段,从第一个字段开始(跳过时间字段)
                for (int i = 1; i < columnNames.size(); i++) { // 从1开始跳过时间字段
                    String fieldName = columnNames.get(i);
                    Object value = record.getFields().get(i - 1).getStringValue(); // 获取值
                    // 根据字段名解析设备 ID 和测点名
                    String deviceId = fieldName.substring(0, fieldName.lastIndexOf(".")); // 设备 ID
                    String point = fieldName.substring(fieldName.lastIndexOf(".") + 1); // 测点名


                    // 获取或创建 HistoryDataByDeviceDto 对象
                    HistoryDataByDeviceDto deviceData = deviceDataMap.computeIfAbsent(deviceId, k -> {
                        HistoryDataByDeviceDto newDeviceData = new HistoryDataByDeviceDto();
                        newDeviceData.setDeviceId(k);
                        newDeviceData.setHistoryData(new ArrayList<>());
                        return newDeviceData;
                    });

                    // 创建或获取 HistoryDataDto 对象
                    HistoryDataDto historyData = deviceData.getHistoryData().stream()
                            .filter(h -> h.getPoint().equals(point))
                            .findFirst()
                            .orElseGet(() -> {
                                HistoryDataDto newHistoryData = new HistoryDataDto();
                                newHistoryData.setPoint(point);
                                newHistoryData.setValues(new ArrayList<>());
                                deviceData.getHistoryData().add(newHistoryData);
                                return newHistoryData;
                            });

                    // 创建并添加 HistoryValueDto 对象
                    HistoryValueDto historyValueDto = new HistoryValueDto();
                    historyValueDto.setTimestamp(timestamp);
                    historyValueDto.setVal(value);
                    historyData.getValues().add(historyValueDto);
                }
            }


            historyDataByDeviceList.addAll(deviceDataMap.values());
            batchHistoryValuesDto.setHistoryDataByDevice(historyDataByDeviceList);
            batchHistoryValuesDto.setTotal((int) totalRecords);
            batchHistoryValuesDto.setPage(totalPage);
            batchHistoryValuesDto.setCurrent(page);
            batchHistoryValuesDto.setPageSize(pageSize);

            return batchHistoryValuesDto;
        } catch (Throwable ex) {
            log.error("batchHistoryData error", ex);
            throw new RuntimeException("Error fetching batch history data", ex);
        } finally {
            // 释放资源
            if (session != null && dataSetWrapper != null) {
                session.closeResultSet(dataSetWrapper);
            }
        }
    }


    @Override
    public BatchHistoryValuesDto batchWindowAggHistoryData(List<String> deviceIds, List<String> points, Integer page, Integer pageSize, Long startTime, Long endTime, Integer windowSize, String interval) {
        // 如果请求为空,直接返回null
        if (CollectionUtils.isEmpty(deviceIds) || CollectionUtils.isEmpty(points) || page == null || pageSize == null || windowSize == null || interval == null) {
            return null;
        }

        // 执行计数查询
        long totalRecords = getQueryResultCountForMultipleDevices(deviceIds, points, startTime, endTime, windowSize, interval, pageSize, page);
        if (totalRecords == 0) {
            return null;
        }

        // 分页计算
        int offset = (page - 1) * pageSize;
        int totalPage = (int) Math.ceil((double) totalRecords / pageSize);

        // 构建查询语句
        String sql = buildBatchWindowAggQuery(deviceIds, points, startTime, endTime, windowSize, interval, offset, pageSize);
        log.info("batchWindowAggHistoryData sql::{}", sql);

        SessionPool session = null;
        SessionDataSetWrapper dataSetWrapper = null;
        try {
            session = ioTDBSessionPoolManager.getSessionPool();
            dataSetWrapper = session.executeQueryStatement(sql);
            SessionDataSet resultSet = dataSetWrapper.getSessionDataSet();

            BatchHistoryValuesDto batchHistoryValuesDto = new BatchHistoryValuesDto();
            List<HistoryDataByDeviceDto> historyDataByDeviceList = new ArrayList<>();
            Map<String, HistoryDataByDeviceDto> deviceDataMap = new HashMap<>();

            // 获取列名
            List<String> columnNames = resultSet.getColumnNames();

            while (resultSet.hasNext()) {
                RowRecord record = resultSet.next();
                long timestamp = record.getTimestamp();

                // 遍历字段,从第一个字段开始(跳过时间字段)
                for (int i = 1; i < columnNames.size(); i++) { // 从1开始跳过时间字段
                    String fieldName = columnNames.get(i);
                    Object value = record.getFields().get(i - 1).getStringValue(); // 获取值

                    // fieldName 形式如 "avg(root.ln.wf01.wt01.temperature)"
                    // 确保提取正确的设备 ID 和测点名
                    String cleanedFieldName = fieldName.trim(); // 去掉首尾空格

                    // 使用正则表达式来提取设备 ID 和测点名
                    // 修改这里的正则表达式以提取完整的 deviceId
                    Pattern pattern = Pattern.compile("avg\\((.+?)\\.([\\w`]+)\\)"); // 匹配 deviceId 和 point
                    Matcher matcher = pattern.matcher(cleanedFieldName);

                    if (matcher.find()) {
                        String deviceId = matcher.group(1); // 获取到除最后一段的设备 ID
                        String point = matcher.group(2); // 获取最后一段作为测点名
                        // 创建或获取 HistoryDataByDeviceDto 对象
                        HistoryDataByDeviceDto deviceData = deviceDataMap.computeIfAbsent(deviceId, k -> {
                            HistoryDataByDeviceDto newDeviceData = new HistoryDataByDeviceDto();
                            newDeviceData.setDeviceId(k);
                            newDeviceData.setHistoryData(new ArrayList<>());
                            return newDeviceData;
                        });

                        // 创建或获取 HistoryDataDto 对象
                        HistoryDataDto historyData = deviceData.getHistoryData().stream()
                                .filter(h -> h.getPoint().equals(point))
                                .findFirst()
                                .orElseGet(() -> {
                                    HistoryDataDto newHistoryData = new HistoryDataDto();
                                    newHistoryData.setPoint(point);
                                    newHistoryData.setValues(new ArrayList<>());
                                    deviceData.getHistoryData().add(newHistoryData);
                                    return newHistoryData;
                                });

                        // 创建并添加 HistoryValueDto 对象
                        HistoryValueDto historyValueDto = new HistoryValueDto();
                        historyValueDto.setTimestamp(timestamp);
                        historyValueDto.setVal(value);
                        historyData.getValues().add(historyValueDto);
                    }
                }
            }

            historyDataByDeviceList.addAll(deviceDataMap.values());
            batchHistoryValuesDto.setHistoryDataByDevice(historyDataByDeviceList);
            batchHistoryValuesDto.setTotal((int) totalRecords);
            batchHistoryValuesDto.setPage(totalPage);
            batchHistoryValuesDto.setCurrent(page);
            batchHistoryValuesDto.setPageSize(pageSize);

            return batchHistoryValuesDto;
        } catch (Throwable ex) {
            log.error("batchWindowAggHistoryData error", ex);
            throw new RuntimeException("Error fetching batch history data", ex);
        } finally {
            //释放资源
            if (session != null && dataSetWrapper != null) {
                session.closeResultSet(dataSetWrapper);
            }
        }
    }

    public long getQueryResultCountForSingleDevices(String device, List<String> points, Long startTime, Long endTime, Integer windowSize, String interval) {
        long count = 0;
        SessionPool session = null;
        SessionDataSetWrapper dataSetWrapper = null;
        try {
            session = ioTDBSessionPoolManager.getSessionPool();
            // 构建查询语句
            String sql = buildWindowAggQuery(device, points, startTime, endTime, windowSize, interval);
            log.info("getQueryResultCountForSingleDevices sql::{}", sql);

            // 执行查询
            dataSetWrapper = session.executeQueryStatement(sql);
            SessionDataSet resultSet = dataSetWrapper.getSessionDataSet();

            // 由于是聚合查询,获取行数的方法是简单地调用 next(),不需要记录具体的字段
            while (resultSet.hasNext()) {
                resultSet.next();
                count++;
            }
        } catch (Exception ex) {
            log.error("getQueryResultCountForSingleDevices error", ex);
            throw new RuntimeException(ex);
        } finally {
            // 释放资源
            if (session != null && dataSetWrapper != null) {
                session.closeResultSet(dataSetWrapper);
            }
        }
        return count;
    }

    public long getQueryResultCountForMultipleDevices(List<String> devices, List<String> points, Long startTime, Long endTime, Integer windowSize, String interval, int offset, int limit) {
        long count = 0;
        SessionPool session = null;
        SessionDataSetWrapper dataSetWrapper = null;
        try {
            session = ioTDBSessionPoolManager.getSessionPool();
            // 构建查询语句
            String sql = buildBatchWindowAggQueryCount(devices, points, startTime, endTime, windowSize, interval);
            log.info("getQueryResultCountForMultipleDevices sql::{}", sql);

            // 执行查询
            dataSetWrapper = session.executeQueryStatement(sql);
            SessionDataSet resultSet = dataSetWrapper.getSessionDataSet();

            // 由于是聚合查询,获取行数的方法是简单地调用 next(),不需要记录具体的字段
            while (resultSet.hasNext()) {
                resultSet.next();
                count++;
            }
        } catch (Exception ex) {
            log.error("getQueryResultCountForMultipleDevices error", ex);
            throw new RuntimeException(ex);
        } finally {
            // 释放资源
            if (session != null && dataSetWrapper != null) {
                session.closeResultSet(dataSetWrapper);
            }
        }
        return count;
    }

    private static String buildBatchWindowAggQueryCount(List<String> devices, List<String> points, Long startTime, Long endTime, Integer windowSize, String interval) {
        if (devices == null || devices.isEmpty() || points == null || points.isEmpty() || startTime == null || endTime == null || windowSize == null || interval == null) {
            throw new IllegalArgumentException("Invalid input parameters");
        }

        StringBuilder queryBuilder = new StringBuilder();
        queryBuilder.append("SELECT ");

        // 构建SELECT部分,多个测点的平均值
        for (int i = 0; i < points.size(); i++) {
            queryBuilder.append("avg(").append(points.get(i)).append(")");
            if (i < points.size() - 1) {
                queryBuilder.append(", ");
            }
        }

        // FROM部分,多个设备用逗号分隔
        queryBuilder.append(" FROM ").append(String.join(", ", devices));

        // GROUP BY部分
        queryBuilder.append(" GROUP BY ([")
                .append(startTime).append(", ")
                .append(endTime).append("), ")
                .append(windowSize).append(interval)
                .append(") ");

        // ORDER BY部分
        queryBuilder.append("ORDER BY time ASC ");
        return queryBuilder.toString();
    }

    private static String buildBatchWindowAggQuery(List<String> devices, List<String> points, Long startTime, Long endTime, Integer windowSize, String interval, int offset, int limit) {
        if (devices == null || devices.isEmpty() || points == null || points.isEmpty() || startTime == null || endTime == null || windowSize == null || interval == null) {
            throw new IllegalArgumentException("Invalid input parameters");
        }

        StringBuilder queryBuilder = new StringBuilder();
        queryBuilder.append("SELECT ");

        // 构建SELECT部分,多个测点的平均值
        for (int i = 0; i < points.size(); i++) {
            queryBuilder.append("avg(").append(points.get(i)).append(")");
            if (i < points.size() - 1) {
                queryBuilder.append(", ");
            }
        }

        // FROM部分,多个设备用逗号分隔
        queryBuilder.append(" FROM ").append(String.join(", ", devices));

        // GROUP BY部分
        queryBuilder.append(" GROUP BY ([")
                .append(startTime).append(", ")
                .append(endTime).append("), ")
                .append(windowSize).append(interval)
                .append(") ");

        // ORDER BY部分
        queryBuilder.append("ORDER BY time ASC fill(previous) ");
//        queryBuilder.append("ORDER BY time ASC ");

        // LIMIT和OFFSET部分
        queryBuilder.append("LIMIT ").append(limit).append(" OFFSET ").append(offset);

        return queryBuilder.toString();
    }


    private String buildHistoryDataQuery(List<String> deviceIds, List<String> points, Long startTime, Long endTime, int offset, int limit) {
        StringBuilder sqlBuilder = new StringBuilder("SELECT ");
        for (int i = 0; i < points.size(); i++) {
            sqlBuilder.append(points.get(i));
            if (i < points.size() - 1) {
                sqlBuilder.append(", ");
            }
        }
        sqlBuilder.append(" FROM ");
        for (int i = 0; i < deviceIds.size(); i++) {
            sqlBuilder.append(deviceIds.get(i));
            if (i < deviceIds.size() - 1) {
                sqlBuilder.append(", ");
            }
        }
        sqlBuilder.append(" WHERE time >= ").append(startTime).append(" AND time <= ").append(endTime);
        sqlBuilder.append(" ORDER BY time ASC");
        sqlBuilder.append(" LIMIT ").append(limit).append(" OFFSET ").append(offset);
        return sqlBuilder.toString();
    }


    /**
     * 根据传入的测量值列表构建查询最新值语句
     */
    public static String buildLastDataQuery(List<String> deviceIds, List<String> points) {
        StringBuilder sqlBuilder = new StringBuilder("select last ");
        // 为每个数据点添加到 SELECT 子句
        for (int i = 0; i < points.size(); i++) {
            sqlBuilder.append(points.get(i));
            if (i < points.size() - 1) {
                sqlBuilder.append(", ");
            }
        }
        // 添加 FROM 子句,包括所有设备
        sqlBuilder.append(" from ");
        for (int i = 0; i < deviceIds.size(); i++) {
            sqlBuilder.append(deviceIds.get(i));
            if (i < deviceIds.size() - 1) {
                sqlBuilder.append(", ");
            }
        }
        // 返回构建的 SQL 查询语句
        return sqlBuilder.toString();
    }

    /**
     * 根据传入的参数构建窗口聚合查询语句 降采样聚合查询
     */
    private static String buildWindowAggQuery(String device, List<String> points, Long startTime, Long endTime, Integer windowSize, String interval) {
        StringBuilder queryBuilder = new StringBuilder();
        queryBuilder.append("SELECT ");
        for (int i = 0; i < points.size(); i++) {
            queryBuilder.append("avg(").append(points.get(i)).append(") as avg_").append(points.get(i));
            if (i < points.size() - 1) {
                queryBuilder.append(", ");
            }
        }
        queryBuilder.append(" FROM ").append(device);
        queryBuilder.append(" GROUP BY ([").append(startTime).append(", ").append(endTime).append("), ").append(windowSize).append(interval);
        queryBuilder.append(") ORDER BY time asc");
        return queryBuilder.toString();
    }

    /**
     * 根据传入的参数构建窗口聚合查询语句 降采样聚合查询
     */
    private static String buildWindowAggQuery(String device, List<String> points, Long startTime, Long endTime, Integer windowSize, String interval, int offset, int limit) {
        StringBuilder queryBuilder = new StringBuilder();
        queryBuilder.append("SELECT ");
        for (int i = 0; i < points.size(); i++) {
            queryBuilder.append("avg(").append(points.get(i)).append(") as avg_").append(points.get(i));
            if (i < points.size() - 1) {
                queryBuilder.append(", ");
            }
        }
        queryBuilder.append(" FROM ").append(device);
        queryBuilder.append(" GROUP BY ([").append(startTime).append(", ").append(endTime).append("), ").append(windowSize).append(interval);
        queryBuilder.append(") ORDER BY time asc");
        queryBuilder.append(" LIMIT ").append(limit).append(" OFFSET ").append(offset);
        return queryBuilder.toString();
    }

    /**
     * 获取指定查询返回的条数
     *
     * @param device    设备的唯一标识符
     * @param points    要查询的数据点名称列表
     * @param startTime 查询的开始时间戳
     * @param endTime   查询的结束时间戳
     * @return 查询返回的条数
     */
    public long getQueryResultCount(String device, List<String> points, Long startTime, Long endTime) {
        long maxCount = 0;
        SessionPool session = null;
        SessionDataSetWrapper dataSetWrapper = null;
        try {
            session = ioTDBSessionPoolManager.getSessionPool();
            // 构建查询语句
            String sql = buildQueryResultCountSql(device, points, startTime, endTime);
            log.info("getQueryResultCount sql::{}", sql);

            // 执行查询
            dataSetWrapper = session.executeQueryStatement(sql);
            SessionDataSet resultSet = dataSetWrapper.getSessionDataSet();

            // 遍历结果集并计数
            while (resultSet.hasNext()) {
                resultSet.next();
                maxCount++;
            }
        } catch (Exception ex) {
            log.error("getQueryResultCount error", ex);
            throw new RuntimeException(ex);
        } finally {
            // 释放资源
            if (session != null && dataSetWrapper != null) {
                session.closeResultSet(dataSetWrapper);
            }
        }
        return maxCount;
    }

    /**
     * 根据传入的参数构建查询语句
     */
    private String buildQueryResultCountSql(String device, List<String> points, Long startTime, Long endTime) {
        StringBuilder sqlBuilder = new StringBuilder("SELECT ");
        for (int i = 0; i < points.size(); i++) {
            sqlBuilder.append(points.get(i));
            if (i < points.size() - 1) {
                sqlBuilder.append(", ");
            }
        }
        sqlBuilder.append(" FROM ").append(device);
        sqlBuilder.append(" WHERE time >= ").append(startTime).append(" AND time <= ").append(endTime);
        return sqlBuilder.toString();
    }

    /**
     * 获取多设备指定查询返回的总条数
     *
     * @param deviceIds 设备的唯一标识符列表
     * @param points    要查询的数据点名称列表
     * @param startTime 查询的开始时间戳
     * @param endTime   查询的结束时间戳
     * @return 查询返回的总条数
     */
    public long getQueryResultCountForMultipleDevices(List<String> deviceIds, List<String> points, Long startTime, Long endTime) {
        long count = 0;
        SessionPool session = null;
        SessionDataSetWrapper dataSetWrapper = null;
        try {
            session = ioTDBSessionPoolManager.getSessionPool();
            // 构建查询语句
            String sql = buildQueryResultForMultipleDevices(deviceIds, points, startTime, endTime);
            log.info("getQueryResultCountForMultipleDevices sql::{}", sql);

            // 执行查询
            dataSetWrapper = session.executeQueryStatement(sql);
            SessionDataSet resultSet = dataSetWrapper.getSessionDataSet();

            // 遍历结果集并计数
            while (resultSet.hasNext()) {
                resultSet.next();
                count++;
            }
        } catch (Exception ex) {
            log.error("getQueryResultCountForMultipleDevices error", ex);
            throw new RuntimeException(ex);
        } finally {
            // 释放资源
            if (session != null && dataSetWrapper != null) {
                session.closeResultSet(dataSetWrapper);
            }
        }
        return count;
    }

    /**
     * 根据传入的参数构建多设备查询语句
     */
    private String buildQueryResultForMultipleDevices(List<String> deviceIds, List<String> points, Long startTime, Long endTime) {
        StringBuilder sqlBuilder = new StringBuilder("SELECT ");
        for (int i = 0; i < points.size(); i++) {
            sqlBuilder.append(points.get(i));
            if (i < points.size() - 1) {
                sqlBuilder.append(", ");
            }
        }
        sqlBuilder.append(" FROM ");
        for (int i = 0; i < deviceIds.size(); i++) {
            sqlBuilder.append(deviceIds.get(i));
            if (i < deviceIds.size() - 1) {
                sqlBuilder.append(", ");
            }
        }
        sqlBuilder.append(" WHERE time >= ").append(startTime).append(" AND time <= ").append(endTime);
        return sqlBuilder.toString();
    }

    /**
     * 根据传入的测量值列表构建查询语句
     */
    private static String buildHistoryDataQuery(String device, List<String> points, Long startTime, Long endTime, int offset, int limit) {
        StringBuilder queryBuilder = new StringBuilder();
        queryBuilder.append("select ");
        for (int i = 0; i < points.size(); i++) {
            queryBuilder.append(points.get(i));
            if (i < points.size() - 1) {
                queryBuilder.append(", ");
            }
        }
        queryBuilder.append(" from ").append(device);
        queryBuilder.append(" where time >= ").append(startTime).append(" and time <= ").append(endTime);
        queryBuilder.append(" order by time asc");
        queryBuilder.append(" limit ").append(limit).append(" offset ").append(offset);
        return queryBuilder.toString();
    }

    /**
     * 根据传入的测量值列表构建查询语句
     */
    private static String buildAvgValueQuery(String device, List<String> points, Long startTime, Long endTime) {
        StringBuilder queryBuilder = new StringBuilder();
        queryBuilder.append("select ");

        // 为每个测量值构建 AVG 聚合函数
        for (int i = 0; i < points.size(); i++) {
            queryBuilder.append("avg(").append(points.get(i)).append(")");
            if (i < points.size() - 1) {
                queryBuilder.append(", ");
            }
        }

        // 指定查询的时间序列路径
        queryBuilder.append(" from  " + device);

        // 时间范围条件(可根据需求调整)
        if (Objects.nonNull(startTime) && Objects.nonNull(endTime)) {
            queryBuilder.append(" where time >= " + startTime + " and time <= " + endTime);
        }
        return queryBuilder.toString();
    }

    /**
     * 根据传入的测量值列表构建查询最大值和最小值语句
     */
    private static String buildStatisticsValueQuery(String device, List<String> points, Long startTime, Long
            endTime) {
        StringBuilder queryBuilder = new StringBuilder();
        queryBuilder.append("select ");

        // 为每个测量值构建 MAX 和 MIN 聚合函数
        for (int i = 0; i < points.size(); i++) {
            queryBuilder.append("max_value(").append(points.get(i)).append("), ").append("min_value(").append(points.get(i)).append(")");
            if (i < points.size() - 1) {
                queryBuilder.append(", ");
            }
        }

        // 指定查询的时间序列路径
        queryBuilder.append(" from  " + device);
        // 时间范围条件(可根据需求调整)
        if (Objects.nonNull(startTime) && Objects.nonNull(endTime)) {
            queryBuilder.append(" where time >= " + startTime + " and time <= " + endTime);
        }
        return queryBuilder.toString();
    }


    /**
     * 根据传入的测量值列表构建求和查询语句
     */
    private static String buildSumValueQuery(String device, List<String> points, Long startTime, Long endTime) {
        StringBuilder queryBuilder = new StringBuilder();
        queryBuilder.append("select ");

        // 为每个测量值构建 AVG 聚合函数
        for (int i = 0; i < points.size(); i++) {
            queryBuilder.append("sum(").append(points.get(i)).append(")");
            if (i < points.size() - 1) {
                queryBuilder.append(", ");
            }
        }

        // 指定查询的时间序列路径
        queryBuilder.append(" from  " + device);
        // 时间范围条件(可根据需求调整)
        if (Objects.nonNull(startTime) && Objects.nonNull(endTime)) {
            queryBuilder.append(" where time >= " + startTime + " and time <= " + endTime);
        }
        return queryBuilder.toString();
    }

    /**
     * 根据传入的测量值列表构建求和查询语句
     */
    private static String buildHistoryQuery(String device, List<String> points, Long startTime, Long endTime) {
        StringBuilder queryBuilder = new StringBuilder();
        queryBuilder.append("select ");

        // 为每个测量值构建 AVG 聚合函数
        for (int i = 0; i < points.size(); i++) {
            queryBuilder.append("sum(").append(points.get(i)).append(")");
            if (i < points.size() - 1) {
                queryBuilder.append(", ");
            }
        }

        // 指定查询的时间序列路径
        queryBuilder.append(" from  " + device);
        // 时间范围条件(可根据需求调整)
        if (Objects.nonNull(startTime) && Objects.nonNull(endTime)) {
            queryBuilder.append(" where time >= " + startTime + " and time <= " + endTime);
        }
        return queryBuilder.toString();
    }

}

7. 数据传输对象(DTO)& 前端展示‌对象(VO)

定义一系列dto和vo,用于封装请求和响应数据。

@Data
@Builder
public class AvgValueDto {

    /**
     * 测点
     */
    private String point;

    /**
     * 平均值
     */
    private Double val;

}
@Data
public class BatchHistoryValuesDto {

    /**
     * 历史数据
     */
    private List<HistoryDataByDeviceDto> historyDataByDevice;

    /**
     * 总共多少条数据
     */
    private Integer total;

    /**
     * 总共多少页
     */
    private Integer page;

    /**
     * 当前第几页
     */
    private Integer current;

    /**
     * 页大小
     */
    private Integer pageSize;

}
@Data
public class BatchRealtimeValueDto {

    String deviceId;

    List<RealtimeValueDto> realtimeData;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class HistoryDataByDeviceDto {

    /**
     * 测点
     */
    private String deviceId;

    /**
     * 平均值
     */
    private List<HistoryDataDto> historyData;

}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class HistoryDataDto {

    /**
     * 测点
     */
    private String point;

    /**
     * 平均值
     */
    private List<HistoryValueDto> values;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class HistoryValueDto {

    /**
     * 值
     */
    private Object val;

    /**
     * 时间戳
     */
    private long timestamp;

}
@Data
public class HistoryValuesDto {

    /**
     * 历史数据
     *
     * @param null
     */
    private List<HistoryDataDto> historyData;

    /**
     * 总共多少条数据
     */
    private Integer total;

    /**
     * 总共多少页
     */
    private Integer page;

    /**
     * 当前第几页
     */
    private Integer current;

    /**
     * 页大小
     */
    private Integer pageSize;

}
@Data
public class IotDbInsertBeanDto {

    /**
     * 设备id
     */
    private String deviceId;

    /**
     * 数据json
     */
    private String dataJson;

    /**
     * 测点列表
     */
    private List<String> pointArr;

    /**
     * 数据类型列表
     */
    private List<TSDataType> types;

}
public class RealtimeValueDto {

    /**
     * 时间戳
     */
    private long ts;

    /**
     * 时间序列
     */
    private String timeSeries;

    /**
     * 值
     */
    private Object val;
}
@Data
@Builder
public class StatisticsValueDto {

    /**
     * 测点
     */
    private String point;

    /**
     * 最大值
     */
    private String maxVal;

    /**
     * 最小值
     */
    private String minVal;

    /**
     * 平均值
     */
    private Double val;

}
@Data
@Builder
public class SumValueDto {

    /**
     * 测点
     */
    private String point;

    /**
     * 求和值
     */
    private Double val;

}
@Data
public class IotDbBatchHistoryRequestQueryVo {

    /**
     * 设备完整的树路径
     */
    private List<String> deviceIds;

    /**
     * 测点编码
     */
    private List<String> points;

    /**
     * 开始时间
     */
    private Long startTime;

    /**
     * 结束时间
     */
    private Long endTime;

    /**
     * 分页
     */
    private Integer page;

    /**
     * 页大小
     */
    private Integer pageSize;

    /**
     * (窗口大小)
     */
    private Integer windowSize;

    /**
     * 单位 天/毫秒/秒 d/ms/s
     */
    private String interval;

}
@Data
public class IotDbBatchRequestQueryVo {

    /**
     * 数据库名称
     */
    private String db;

    /**
     * 设备完整的树路径
     */
    private List<String> deviceIds;

    /**
     * 测点编码
     */
    private List<String> points;

}
@Data
public class IotDbRequestQueryVo {

    /**
     * 测点编码
     */
    private List<String> points;

    /**
     * 设备完整的树路径
     */
    private String device;

    /**
     * 数据库名称
     */
    private String db;

    /**
     * 开始时间
     */
    private Long startTime;

    /**
     * 结束时间
     */
    private Long endTime;

    /**
     * 分页
     */
    private Integer page;

    /**
     * 页大小
     */
    private Integer pageSize;

    /**
     * (窗口大小)
     */
    private Integer windowSize;

    /**
     * 单位 天/毫秒/秒 d/ms/s
     */
    private String interval;

}
public class ResponseBuilder<T> {

    private static final Logger LOG = LoggerFactory.getLogger(ResponseBuilder.class);

    private int code;

    private String message;

    private T data;

    private String timestamp;

    private String version;

    private ResponseBuilder() {
    }

    public static <R> ResponseVo<R> buildSuccessResponse() {
        return new ResponseVo<R>(ResponseCode.SUCCESS.code, ResponseCode.SUCCESS.desc);
    }

    public static <R> ResponseVo<R> buildSuccessResponse(R data) {
        return new ResponseVo<>(ResponseCode.SUCCESS.code, ResponseCode.SUCCESS.desc, data);
    }

    public static <R> ResponseVo<R> buildSuccessResponse(ResponseCode respCode, R data, String serverVersion) {

        return ResponseBuilder.<R>newBuilder().code(respCode.code)
                .message(respCode.desc)
                .data(data)
                .version(serverVersion)
                .build();
    }

    public static <R> ResponseVo<R> buildErrorResponse() {
        return new ResponseVo<>(ResponseCode.UNEXPECTED_ERROR.code, ResponseCode.UNEXPECTED_ERROR.desc);
    }

    public static <R> ResponseVo<R> buildErrorResponse(ResponseCode respCode) {
        return new ResponseVo<>(respCode.code, respCode.desc);
    }

    public static <R> ResponseVo<R> buildErrorResponse(String message) {
        return new ResponseVo<>(ResponseCode.UNEXPECTED_ERROR.code, message);
    }

    public static <R> ResponseVo<R> buildErrorResponse(int respCode, String message) {
        if (respCode == ResponseCode.SUCCESS.getCode()) {
            LOG.warn("error code match failed respCode:{}", respCode);
            //纠正过来 避免使用合作方的错误码 造成歧义
            respCode = ResponseCode.FAILURE.getCode();
        }
        return new ResponseVo<>(respCode, message);
    }

    public static <R> ResponseVo<R> buildResponse(int respCode, String message) {
        return new ResponseVo<>(respCode, message);
    }

    public static <R> ResponseVo<R> buildErrorResponse(int respCode, String message, R data) {
        return new ResponseVo<R>(respCode, message, data);
    }

    public static <R> ResponseBuilder<R> newBuilder() {
        return new ResponseBuilder<R>();
    }

    public ResponseVo<T> build() {
        return new ResponseVo<>(this);
    }

    public ResponseBuilder<T> code(int code) {
        this.code = code;
        return this;
    }

    public ResponseBuilder<T> message(String message) {
        this.message = message;
        return this;
    }

    public ResponseBuilder<T> data(T data) {
        this.data = data;
        return this;
    }

    public ResponseBuilder<T> timestamp(String timestamp) {
        this.timestamp = timestamp;
        return this;
    }

    public ResponseBuilder<T> version(String version) {
        this.version = version;
        return this;
    }

    public int getCode() {
        return code;
    }

    public void setCode(int code) {
        this.code = code;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public T getData() {
        return data;
    }

    public void setData(T data) {
        this.data = data;
    }

    public String getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }

    public String getVersion() {
        return version;
    }

    public void setVersion(String version) {
        this.version = version;
    }
}
public enum ResponseCode {

    /**
     * 未知错误(各类异常抛出时指定)
     */
    UNEXPECTED_ERROR(0, "Unexpected error"),


    /**
     * 请求成功
     */
    SUCCESS(1, "Successful"),

    /**
     * 处理错误
     */
    FAILURE(2, "Failure"),

    /**
     * 缺失参数、参数类型错误等
     */
    PARAMS_ERROR(3, "Params error"),

    ;

    public final int code;

    public final String desc;

    ResponseCode(int code, String desc) {
        this.code = code;
        this.desc = desc;
    }

    public int getCode() {
        return code;
    }

    public String getDesc() {
        return desc;
    }
}
public class ResponseVo<T> {

    private int code;

    private String message;

    private T data;

    private String timestamp = String.valueOf(System.currentTimeMillis());

    public ResponseVo() {
    }

    ResponseVo(ResponseBuilder<T> builder) {
        this.code = builder.getCode();
        this.message = builder.getMessage();

        this.data = builder.getData();

        this.timestamp = builder.getTimestamp();
    }

    public ResponseVo(int code, String message) {
        this.code = code;
        this.message = message;
    }

    public ResponseVo(int code, String message, T data) {
        this.code = code;
        this.message = message;
        this.data = data;
    }

    public int getCode() {
        return code;
    }

    public void setCode(int code) {
        this.code = code;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public String getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }


    public T getData() {
        return data;
    }

    public void setData(T data) {
        this.data = data;
    }

    public boolean isSuccess() {
        return this.code == ResponseCode.SUCCESS.getCode();
    }

    @Override
    public int hashCode() {
        return Objects.hash(this.code, this.message, this.data, this.timestamp);
    }

    @Override
    public boolean equals(Object obj) {
        return super.equals(obj);
    }


    @Override
    public String toString() {
        return new ToStringBuilder(this)
                .append("code", code)
                .append("message", message)
                .append("data", data)
                .append("timestamp", timestamp)
                .toString();
    }
}

8. 控制器层

创建 IotDBApiController,提供 RESTful API 接口,用于与前端或其他服务交互。例如,查询实时数据的接口如下:

@RestController
@Slf4j
@RequestMapping("/api")
public class IotDBApiController {

    //http://localhost:9091/data-core/api/test

    @Autowired
    IotDBService iotDBService;

    @PostMapping(value = "/realtime", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
    public ResponseVo<?> realtime(@RequestBody IotDbRequestQueryVo request) {
        log.info("获取点码实时数据::{}", request);
        try {
            Asserts.notNull(request, "参数为空");
            Asserts.notNull(request.getDb(), "数据库为空");
            Asserts.notNull(request.getDevice(), "设备路径为空");
            Asserts.notNull(request.getPoints(), "测点编码为空");
            return ResponseBuilder.buildSuccessResponse(iotDBService.realtime(request.getDb(), request.getDevice(), request.getPoints()));
        } catch (Exception e) {
            return ResponseBuilder.buildErrorResponse(e.getMessage());
        }
    }

    @PostMapping(value = "/avgValue", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
    public ResponseVo<?> avgValue(@RequestBody IotDbRequestQueryVo request) {
        log.info("获取点码平均数据::{}", request);
        try {
            Asserts.notNull(request, "参数为空");
            Asserts.notNull(request.getDb(), "数据库为空");
            Asserts.notNull(request.getDevice(), "设备路径为空");
            Asserts.notNull(request.getPoints(), "测点编码为空");
            return ResponseBuilder.buildSuccessResponse(iotDBService.avgValue(request.getDevice(), request.getPoints(), request.getStartTime(), request.getEndTime()));
        } catch (Exception e) {
            return ResponseBuilder.buildErrorResponse(e.getMessage());
        }
    }

    @PostMapping(value = "/statisticsValue", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
    public ResponseVo<?> statisticsValue(@RequestBody IotDbRequestQueryVo request) {
        log.info("获取点码最大最小值数据::{}", request);
        try {
            Asserts.notNull(request, "参数为空");
            Asserts.notNull(request.getDevice(), "设备路径为空");
            Asserts.notNull(request.getPoints(), "测点编码为空");
            return ResponseBuilder.buildSuccessResponse(iotDBService.statisticsValue(request.getDevice(), request.getPoints(), request.getStartTime(), request.getEndTime()));
        } catch (Exception e) {
            return ResponseBuilder.buildErrorResponse(e.getMessage());
        }
    }

    @PostMapping(value = "/sumValue", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
    public ResponseVo<?> sumValue(@RequestBody IotDbRequestQueryVo request) {
        log.info("获取点码求和数据::{}", request);
        try {
            Asserts.notNull(request, "参数为空");
            Asserts.notNull(request.getDevice(), "设备路径为空");
            Asserts.notNull(request.getPoints(), "测点编码为空");
            return ResponseBuilder.buildSuccessResponse(iotDBService.sumValue(request.getDevice(), request.getPoints(), request.getStartTime(), request.getEndTime()));
        } catch (Exception e) {
            return ResponseBuilder.buildErrorResponse(e.getMessage());
        }
    }

    @PostMapping(value = "/history", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
    public ResponseVo<?> history(@RequestBody IotDbRequestQueryVo request) {
        log.info("获取点码历史数据::{}", request);
        try {
            Asserts.notNull(request, "参数为空");
            Asserts.notNull(request.getDevice(), "设备路径为空");
            Asserts.notNull(request.getPoints(), "测点编码为空");
            Asserts.notNull(request.getStartTime(), "开始时间为空");
            Asserts.notNull(request.getEndTime(), "结束时间为空");
            Asserts.notNull(request.getPage(), "分页为空");
            Asserts.notNull(request.getPageSize(), "页大小为空");
            return ResponseBuilder.buildSuccessResponse(iotDBService.historyData(request.getDevice(), request.getPoints(), request.getPage(), request.getPageSize(), request.getStartTime(), request.getEndTime()));
        } catch (Exception e) {
            return ResponseBuilder.buildErrorResponse(e.getMessage());
        }
    }

    @PostMapping(value = "/windowAgg", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
    public ResponseVo<?> windowAgg(@RequestBody IotDbRequestQueryVo request) {
        log.info("根据时间窗口按照时间间隔获取点码历史数据::{}", request);
        try {
            Asserts.notNull(request, "参数为空");
            Asserts.notNull(request.getDevice(), "设备路径为空");
            Asserts.notNull(request.getPoints(), "测点编码为空");
            Asserts.notNull(request.getStartTime(), "开始时间为空");
            Asserts.notNull(request.getEndTime(), "结束时间为空");
            Asserts.notNull(request.getPage(), "分页为空");
            Asserts.notNull(request.getPageSize(), "页大小为空");
            return ResponseBuilder.buildSuccessResponse(iotDBService.windowAggHistoryData(request.getDevice(), request.getPoints(), request.getPage(), request.getPageSize(), request.getStartTime(), request.getEndTime(), request.getWindowSize(), request.getInterval()));
        } catch (Exception e) {
            return ResponseBuilder.buildErrorResponse(e.getMessage());
        }
    }

    @PostMapping(value = "/batchInsert", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
    public ResponseVo<?> batchInsert(@RequestBody IotDbInsertBeanDto insertBean) {
//        log.info("批量插入数据::{}", insertBean);
        try {
            Asserts.notNull(insertBean, "参数为空");
            Asserts.notNull(insertBean.getDeviceId(), "设备路径为空");
            Asserts.notNull(insertBean.getPointArr(), "测点编码为空");
            Asserts.notNull(insertBean.getTypes(), "数据类型为空");
            //成功插入的条数
            int total = iotDBService.insertData(insertBean.getDeviceId(), insertBean.getPointArr(), insertBean.getTypes(), insertBean.getDataJson());
            return ResponseBuilder.buildSuccessResponse(total);
        } catch (Exception e) {
            return ResponseBuilder.buildErrorResponse(e.getMessage());
        }
    }

    @PostMapping(value = "/batch/realtime", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
    public ResponseVo<?> batchRealtime(@RequestBody IotDbBatchRequestQueryVo request) {
        log.info("获取点码实时数据::{}", request);
        try {
            Asserts.notNull(request, "参数为空");
            Asserts.notNull(request.getDeviceIds(), "设备路径为空");
            Asserts.notNull(request.getPoints(), "测点编码为空");
            return ResponseBuilder.buildSuccessResponse(iotDBService.batchRealtime(request.getDeviceIds(), request.getPoints()));
        } catch (Exception e) {
            return ResponseBuilder.buildErrorResponse(e.getMessage());
        }
    }

    @PostMapping(value = "/batch/history", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
    public ResponseVo<?> batchHistory(@RequestBody IotDbBatchHistoryRequestQueryVo request) {
        log.info("获取点码历史数据::{}", request);
        try {
            Asserts.notNull(request, "参数为空");
            Asserts.notNull(request.getDeviceIds(), "设备路径为空");
            Asserts.notNull(request.getPoints(), "测点编码为空");
            Asserts.notNull(request.getStartTime(), "开始时间为空");
            Asserts.notNull(request.getEndTime(), "结束时间为空");
            Asserts.notNull(request.getPage(), "分页为空");
            Asserts.notNull(request.getPageSize(), "页大小为空");
            return ResponseBuilder.buildSuccessResponse(iotDBService.batchHistoryData(request.getDeviceIds(), request.getPoints(), request.getPage(), request.getPageSize(), request.getStartTime(), request.getEndTime()));
        } catch (Exception e) {
            return ResponseBuilder.buildErrorResponse(e.getMessage());
        }
    }


    @PostMapping(value = "/batch/windowAgg", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
    public ResponseVo<?> batchWindowAgg(@RequestBody IotDbBatchHistoryRequestQueryVo request) {
        log.info("多设备多测点根据时间窗口按照时间间隔获取点码历史数据::{}", request);
        try {
            Asserts.notNull(request, "参数为空");
            Asserts.notNull(request.getDeviceIds(), "设备路径为空");
            Asserts.notNull(request.getPoints(), "测点编码为空");
            Asserts.notNull(request.getStartTime(), "开始时间为空");
            Asserts.notNull(request.getEndTime(), "结束时间为空");
            Asserts.notNull(request.getPage(), "分页为空");
            Asserts.notNull(request.getPageSize(), "页大小为空");
            return ResponseBuilder.buildSuccessResponse(iotDBService.batchWindowAggHistoryData(request.getDeviceIds(), request.getPoints(), request.getPage(), request.getPageSize(), request.getStartTime(), request.getEndTime(), request.getWindowSize(), request.getInterval()));
        } catch (Exception e) {
            return ResponseBuilder.buildErrorResponse(e.getMessage());
        }
    }
}

9. 总结

通过本文介绍的 Spring Boot 和 IoTDB 集成方法,我们可以轻松实现高效时序数据的管理。IoTDB 的高性能特性和 Spring Boot 的灵活架构相结合,为物联网和工业互联网应用提供了强大的数据处理能力。

作者:stay_hungry&&modest

物联沃分享整理
物联沃-IOTWORD物联网 » Spring Boot与IoTDB集成:实现高效时序数据管理模式

发表回复