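Integrating Spring Boot with IoTDB: An Efficient Pattern for Managing Time-Series Data
In the Internet of Things (IoT) and industrial internet, managing and analyzing time-series data efficiently is essential. Apache IoTDB is a high-performance time-series database designed for large volumes of such data. This article shows how to integrate IoTDB into a Spring Boot application through the iotdb-session client, covering data insertion, queries, and aggregation.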
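1. Project Background
As IoT devices become widespread, data volumes grow explosively. Most of this data is time-series data, which is written at high throughput, under high concurrency, and must be ingested quickly. Traditional relational databases often struggle with this workload, while time-series databases such as IoTDB are built for it.
IoTDB is an open-source time-series database with efficient insertion and querying, and it is particularly well suited to device sensor data and industrial data. Integrating it with Spring Boot lets us build a high-performance time-series data management system quickly.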
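2. Environment Setup
Before you start, make sure the following are installed:
- Java Development Kit (JDK): JDK 1.8 or later is recommended.
- Maven: used for building the project and managing dependencies.
- IoTDB: download and install IoTDB, and make sure the service is running.
- Spring Boot: used to build the application.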
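3. Project Dependencies
In pom.xml, add the IoTDB Java client dependency shown here, alongside the Spring Boot dependencies your project already needs. The code later in this article also uses Lombok annotations (@Slf4j, @Setter) and the JSONObject/JSONArray classes, which appear to come from org.json, so those libraries must be on the classpath too.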
<dependency>
    <groupId>org.apache.iotdb</groupId>
    <artifactId>iotdb-session</artifactId>
    <version>1.3.0</version>
</dependency>
4. Configuration File
In application-dev.yml (loaded when the dev profile is active), configure the IoTDB connection:
spring:
  iotdb:
    username: root
    password: root
    hostUrl:
      - 127.0.0.1:6667
    maxSize: 100
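Each hostUrl entry is a plain host:port string, so a typo would typically surface only when the pool first tries to connect. One option is to check the entries at startup. The helper below is a minimal sketch under that assumption: NodeUrls and validate are hypothetical names introduced here for illustration and are not part of the IoTDB client or Spring.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration; not part of the IoTDB client. */
final class NodeUrls {
    private NodeUrls() {
    }

    /** Returns the trimmed entries, or throws if any entry is not "host:port". */
    static List<String> validate(List<String> hostUrls) {
        if (hostUrls == null || hostUrls.isEmpty()) {
            throw new IllegalArgumentException("hostUrl must list at least one node");
        }
        List<String> cleaned = new ArrayList<>();
        for (String raw : hostUrls) {
            String url = raw == null ? "" : raw.trim();
            int colon = url.lastIndexOf(':');
            // Need a non-empty host before the colon and a non-empty port after it
            if (colon <= 0 || colon == url.length() - 1) {
                throw new IllegalArgumentException("expected host:port but got '" + url + "'");
            }
            int port;
            try {
                port = Integer.parseInt(url.substring(colon + 1));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("port is not a number in '" + url + "'", e);
            }
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("port out of range in '" + url + "'");
            }
            cleaned.add(url);
        }
        return cleaned;
    }
}
```

A call such as NodeUrls.validate(hostUrl) could run before the pool is created, so that a malformed entry fails fast with a readable message.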
5. IoTDB Session Management
Create an IotDbSessionPoolManager class to manage the IoTDB session connection pool:
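import java.util.List;
import javax.annotation.PostConstruct; // jakarta.annotation.* on Spring Boot 3
import javax.annotation.PreDestroy;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.iotdb.session.pool.SessionPool;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@ConfigurationProperties(prefix = "spring.iotdb")
@Setter
public class IotDbSessionPoolManager {
    private String username;
    private String password;
    private List<String> hostUrl;
    private int maxSize;
    // One pool per manager bean. Access is synchronized so that a lazy
    // re-creation after close() cannot race with another caller.
    private SessionPool sessionPool;

    public synchronized SessionPool getSessionPool() {
        if (sessionPool == null) {
            sessionPool = new SessionPool(hostUrl, username, password, maxSize);
        }
        return sessionPool;
    }

    @PostConstruct
    public void init() {
        // Create the SessionPool at startup
        log.info("====SessionPool init====");
        getSessionPool();
    }

    @PreDestroy
    public void destroy() {
        // Close the SessionPool on shutdown
        log.info("====SessionPool destroy====");
        close();
    }

    /**
     * Closes the connection pool.
     */
    public synchronized void close() {
        if (sessionPool != null) {
            sessionPool.close();
            sessionPool = null;
        }
    }
}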
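A getter that creates a shared resource on first use needs some care once several threads can call it at the same time. The sketch below shows one common, library-free way to do that (double-checked locking on a volatile field). LazyResource is a hypothetical class written for this illustration; it is not part of IoTDB or Spring, and a plain synchronized getter is an equally reasonable choice when contention is low.

```java
import java.util.function.Supplier;

/** Hypothetical holder for illustration; not part of IoTDB or Spring. */
final class LazyResource<T extends AutoCloseable> {
    private final Supplier<T> factory;
    // volatile so that a fully constructed instance is visible to all threads
    private volatile T instance;

    LazyResource(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Returns the shared instance, creating it at most once per open/close cycle. */
    T get() {
        T local = instance;
        if (local == null) {
            synchronized (this) {
                local = instance;
                if (local == null) {
                    local = factory.get();
                    instance = local;
                }
            }
        }
        return local;
    }

    /** Closes the current instance, if any; the next get() creates a new one. */
    synchronized void close() {
        T local = instance;
        instance = null;
        if (local != null) {
            try {
                local.close();
            } catch (Exception e) {
                throw new IllegalStateException("failed to close resource", e);
            }
        }
    }
}
```

With a holder like this, the factory passed to the constructor would be the place where the pool is built from the configured properties.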
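6. Service Interface and Implementation
Define an IotDBService interface that exposes the data operations: inserting data, querying real-time data, querying historical data, aggregate queries, and so on: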
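/**
 * Time arguments are described as second-level timestamps in the method docs.
 * The query builders shown in this article place them into the SQL unchanged,
 * so they must use the server's timestamp precision (milliseconds by default in IoTDB).
 */
public interface IotDBService {
    /**
     * Returns the latest (real-time) values of the given measurements on one device.
     *
     * @param db     IoTDB database name
     * @param device IoTDB device path
     * @param points IoTDB measurement names
     * @return the latest value of each measurement
     */
    List<RealtimeValueDto> realtime(String db, String device, List<String> points);

    /**
     * Returns the average of each measurement on one device.
     *
     * @param device    IoTDB device path, dot-separated
     * @param points    IoTDB measurement names
     * @param startTime start time, timestamp in seconds
     * @param endTime   end time, timestamp in seconds
     * @return the average of each measurement
     */
    List<AvgValueDto> avgValue(String device, List<String> points, Long startTime, Long endTime);

    /**
     * Returns the maximum and minimum of each measurement on one device.
     *
     * @param device    IoTDB device path, dot-separated
     * @param points    IoTDB measurement names
     * @param startTime start time, timestamp in seconds
     * @param endTime   end time, timestamp in seconds
     * @return the maximum and minimum of each measurement
     */
    List<StatisticsValueDto> statisticsValue(String device, List<String> points, Long startTime, Long endTime);

    /**
     * Returns the sum of each measurement on one device.
     *
     * @param device    IoTDB device path
     * @param points    IoTDB measurement names
     * @param startTime start time
     * @param endTime   end time
     * @return the sum of each measurement
     */
    List<SumValueDto> sumValue(String device, List<String> points, Long startTime, Long endTime);

    /**
     * Returns one page of historical data for the given measurements.
     *
     * @param device    IoTDB device path
     * @param points    IoTDB measurement names
     * @param page      page number, starting at 1
     * @param pageSize  rows per page
     * @param startTime start time
     * @param endTime   end time
     */
    HistoryValuesDto historyData(String device, List<String> points, Integer page, Integer pageSize, Long startTime, Long endTime);

    /**
     * Returns historical data aggregated over fixed time windows.
     *
     * @param device     IoTDB device path
     * @param points     IoTDB measurement names
     * @param page       page number, starting at 1
     * @param pageSize   rows per page
     * @param startTime  start time
     * @param endTime    end time
     * @param windowSize size of each window
     * @param interval   time unit appended to windowSize in the GROUP BY clause
     *                   (for example, windowSize 5 with interval "m" gives 5m)
     */
    HistoryValuesDto windowAggHistoryData(String device, List<String> points, Integer page, Integer pageSize, Long startTime, Long endTime, Integer windowSize, String interval);

    /**
     * Inserts data into IoTDB.
     *
     * @param device   unique identifier (path) of the device
     * @param points   names of the measurements to insert
     * @param types    data type of each measurement, in the same order as points;
     *                 supported types: BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT
     * @param dataJson JSON string containing the rows to insert, for example
     *                 {"data":[{"timestamp":1617187200,"temperature":"111","speed":"1"},{"timestamp":1617187200,"temperature":"111","speed":"1"}]}
     *                 timestamp: Long (optional), in milliseconds; defaults to the current time when absent
     * @return number of rows inserted
     */
    int insertData(String device, List<String> points, List<TSDataType> types, String dataJson);

    /**
     * Returns the latest (real-time) values for several devices.
     *
     * @param deviceIds IoTDB device paths
     * @param points    IoTDB measurement names
     * @return the latest values grouped by device
     */
    List<BatchRealtimeValueDto> batchRealtime(List<String> deviceIds, List<String> points);

    /**
     * Returns one page of historical data for several devices.
     *
     * @param deviceIds IoTDB device paths
     * @param points    IoTDB measurement names
     * @return historical values grouped by device
     */
    BatchHistoryValuesDto batchHistoryData(List<String> deviceIds, List<String> points, Integer page, Integer pageSize, Long startTime, Long endTime);

    /**
     * Returns historical data for several devices, aggregated over fixed time windows.
     *
     * @param deviceIds  IoTDB device paths
     * @param points     IoTDB measurement names
     * @param page       page number, starting at 1
     * @param pageSize   rows per page
     * @param startTime  start time
     * @param endTime    end time
     * @param windowSize size of each window
     * @param interval   time unit appended to windowSize in the GROUP BY clause
     */
    BatchHistoryValuesDto batchWindowAggHistoryData(List<String> deviceIds, List<String> points, Integer page, Integer pageSize, Long startTime, Long endTime, Integer windowSize, String interval);
}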
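To make the paging and window parameters above concrete, here is a minimal sketch of how page, pageSize, windowSize and interval can map onto one IoTDB statement. It follows the statement shape used by the implementation in this article, but the input whitelist is an assumption added here: because the SQL is assembled by string concatenation, the sketch only accepts plain dot-separated paths, plain measurement names and a small subset of time units. WindowAggSql, build and totalPages are hypothetical names, not part of the article's service.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical query builder for illustration only. */
final class WindowAggSql {
    // Assumed whitelist: "root" followed by plain path nodes, and plain measurement names
    private static final Pattern DEVICE = Pattern.compile("root(\\.[A-Za-z0-9_]+)+");
    private static final Pattern POINT = Pattern.compile("[A-Za-z0-9_]+");
    // Assumed subset of the duration units IoTDB accepts
    private static final Pattern UNIT = Pattern.compile("ms|s|m|h|d");

    private WindowAggSql() {
    }

    static String build(String device, List<String> points, long startTime, long endTime,
                        int windowSize, String interval, int page, int pageSize) {
        if (device == null || !DEVICE.matcher(device).matches()) {
            throw new IllegalArgumentException("bad device path: " + device);
        }
        if (points == null || points.isEmpty()) {
            throw new IllegalArgumentException("at least one point is required");
        }
        for (String p : points) {
            if (p == null || !POINT.matcher(p).matches()) {
                throw new IllegalArgumentException("bad point name: " + p);
            }
        }
        if (interval == null || !UNIT.matcher(interval).matches()) {
            throw new IllegalArgumentException("bad interval unit: " + interval);
        }
        if (startTime >= endTime || windowSize < 1 || page < 1 || pageSize < 1) {
            throw new IllegalArgumentException("bad range, window or paging values");
        }
        // Convert the 1-based page number to a 0-based row offset
        long offset = (long) (page - 1) * pageSize;
        String select = points.stream()
                .map(p -> "avg(" + p + ") as avg_" + p)
                .collect(Collectors.joining(", "));
        return "SELECT " + select + " FROM " + device
                + " GROUP BY ([" + startTime + ", " + endTime + "), " + windowSize + interval + ")"
                + " ORDER BY time asc LIMIT " + pageSize + " OFFSET " + offset;
    }

    /** Number of pages needed for totalRecords rows, using integer arithmetic. */
    static int totalPages(long totalRecords, int pageSize) {
        return (int) ((totalRecords + pageSize - 1) / pageSize);
    }
}
```

For example, page 2 with a pageSize of 20 becomes LIMIT 20 OFFSET 20, and 41 matching rows need 3 pages.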
Then implement the interface in an IotDBServiceImpl class:
@Slf4j
@Service
public class IotDBServiceImpl implements IotDBService {
    @Autowired
    IotDbSessionPoolManager ioTDBSessionPoolManager;

    @Override
    public List<RealtimeValueDto> realtime(String db, String device, List<String> points) {
        // Empty request: return null
        if (StringUtils.isEmpty(db) || StringUtils.isEmpty(device) || CollectionUtils.isEmpty(points)) {
            return null;
        }
        SessionPool session = null;
        SessionDataSetWrapper dataSetWrapper = null;
        try {
            session = ioTDBSessionPoolManager.getSessionPool();
            dataSetWrapper = session.executeLastDataQueryForOneDevice(db, device, points, false);
            SessionDataSet resultSet = dataSetWrapper.getSessionDataSet();
            List<RealtimeValueDto> realtimeValueDtoList = new ArrayList<>();
            while (resultSet.hasNext()) {
                RealtimeValueDto realtimeValueDto = RealtimeValueDto.builder().build();
                RowRecord record = resultSet.next();
                realtimeValueDto.setTs(record.getTimestamp());
                // Field 0 is the time series path, field 1 its latest value
                List<Field> fields = record.getFields();
                realtimeValueDto.setTimeSeries(fields.get(0).getStringValue());
                realtimeValueDto.setVal(fields.get(1).getStringValue());
                realtimeValueDtoList.add(realtimeValueDto);
            }
            return realtimeValueDtoList;
        } catch (Exception ex) {
            log.error("realtime error", ex);
            throw new RuntimeException(ex);
        } finally {
            // Release the result set
            if (session != null && dataSetWrapper != null) {
                session.closeResultSet(dataSetWrapper);
            }
        }
    }

    @Override
    public List<AvgValueDto> avgValue(String device, List<String> points, Long startTime, Long endTime) {
        // Empty request: return null
        if (StringUtils.isEmpty(device) || CollectionUtils.isEmpty(points)) {
            return null;
        }
        // Build the query
        String sql = buildAvgValueQuery(device, points, startTime, endTime);
        log.info("avgValue sql::{}", sql);
        SessionPool session = null;
        SessionDataSetWrapper dataSetWrapper = null;
        // Run the query
        try {
            session = ioTDBSessionPoolManager.getSessionPool();
            dataSetWrapper = session.executeQueryStatement(sql);
            SessionDataSet resultSet = dataSetWrapper.getSessionDataSet();
            List<AvgValueDto> avgValueList = new ArrayList<>();
            while (resultSet.hasNext()) {
                RowRecord record = resultSet.next();
                List<Field> fields = record.getFields();
                // Result columns are expected in the same order as points
                for (int i = 0; i < fields.size(); i++) {
                    AvgValueDto avgValue = AvgValueDto.builder().build();
                    avgValue.setPoint(points.get(i));
                    // Leave val unset when the aggregate is null
                    if (!Objects.equals(fields.get(i).getStringValue(), "null")) {
                        avgValue.setVal(fields.get(i).getDoubleV());
                    }
                    avgValueList.add(avgValue);
                }
            }
            return avgValueList;
        } catch (Exception ex) {
            log.error("avgValue error", ex);
            throw new RuntimeException(ex);
        } finally {
            // Release the result set
            if (session != null && dataSetWrapper != null) {
                session.closeResultSet(dataSetWrapper);
            }
        }
    }

    @Override
    public List<StatisticsValueDto> statisticsValue(String device, List<String> points, Long startTime, Long endTime) {
        // Empty request: return null
        if (StringUtils.isEmpty(device) || CollectionUtils.isEmpty(points)) {
            return null;
        }
        // Build the query
        String sql = buildStatisticsValueQuery(device, points, startTime, endTime);
        log.info("statisticsValue sql :: {}", sql);
        // Run the query
        SessionPool session = null;
        SessionDataSetWrapper dataSetWrapper = null;
        try {
            session = ioTDBSessionPoolManager.getSessionPool();
            dataSetWrapper = session.executeQueryStatement(sql);
            SessionDataSet resultSet = dataSetWrapper.getSessionDataSet();
            List<StatisticsValueDto> statisticsValueDtoList = new ArrayList<>();
            while (resultSet.hasNext()) {
                RowRecord record = resultSet.next();
                List<Field> fields = record.getFields();
                // The query is expected to return a (max, min) pair of columns per point
                for (int i = 0; i < points.size(); i++) {
                    StatisticsValueDto value = StatisticsValueDto.builder().build();
                    value.setPoint(points.get(i));
                    value.setMaxVal(fields.get(i * 2).getStringValue());
                    value.setMinVal(fields.get(i * 2 + 1).getStringValue());
                    statisticsValueDtoList.add(value);
                }
            }
            return statisticsValueDtoList;
        } catch (Exception ex) {
            log.error("statisticsValue error", ex);
            throw new RuntimeException(ex);
        } finally {
            // Release the result set
            if (session != null && dataSetWrapper != null) {
                session.closeResultSet(dataSetWrapper);
            }
        }
    }

    @Override
    public List<SumValueDto> sumValue(String device, List<String> points, Long startTime, Long endTime) {
        // Empty request: return null
        if (StringUtils.isEmpty(device) || CollectionUtils.isEmpty(points)) {
            return null;
        }
        // Build the query
        String sql = buildSumValueQuery(device, points, startTime, endTime);
        log.info("sumValue sql::{}", sql);
        // Run the query
        SessionPool session = null;
        SessionDataSetWrapper dataSetWrapper = null;
        try {
            session = ioTDBSessionPoolManager.getSessionPool();
            dataSetWrapper = session.executeQueryStatement(sql);
            SessionDataSet resultSet = dataSetWrapper.getSessionDataSet();
            List<SumValueDto> sumValueDtoList = new ArrayList<>();
            while (resultSet.hasNext()) {
                RowRecord record = resultSet.next();
                List<Field> fields = record.getFields();
                // Result columns are expected in the same order as points
                for (int i = 0; i < fields.size(); i++) {
                    SumValueDto value = SumValueDto.builder().build();
                    value.setPoint(points.get(i));
                    // Leave val unset when the aggregate is null
                    if (!Objects.equals(fields.get(i).getStringValue(), "null")) {
                        value.setVal(fields.get(i).getDoubleV());
                    }
                    sumValueDtoList.add(value);
                }
            }
            return sumValueDtoList;
        } catch (Exception ex) {
            log.error("sumValue error", ex);
            throw new RuntimeException(ex);
        } finally {
            // Release the result set
            if (session != null && dataSetWrapper != null) {
                session.closeResultSet(dataSetWrapper);
            }
        }
    }

    @Override
    public HistoryValuesDto historyData(String device, List<String> points, Integer page, Integer pageSize, Long startTime, Long endTime) {
        // Empty request or non-positive paging values: return null
        if (StringUtils.isEmpty(device) || CollectionUtils.isEmpty(points)
                || page == null || pageSize == null || page < 1 || pageSize < 1) {
            return null;
        }
        // Count the matching rows first
        long totalRecords = getQueryResultCount(device, points, startTime, endTime);
        if (totalRecords == 0) {
            // No records: return null
            return null;
        }
        // Convert the 1-based page number to a 0-based row offset
        int offset = (page - 1) * pageSize;
        // Build the query
        String sql = buildHistoryDataQuery(device, points, startTime, endTime, offset, pageSize);
        log.info("historyData sql::{}", sql);
        // Run the query
        SessionPool session = null;
        SessionDataSetWrapper dataSetWrapper = null;
        try {
            session = ioTDBSessionPoolManager.getSessionPool();
            dataSetWrapper = session.executeQueryStatement(sql);
            SessionDataSet resultSet = dataSetWrapper.getSessionDataSet();
            HistoryValuesDto historyValuesDto = new HistoryValuesDto();
            List<HistoryDataDto> historyDataList = new ArrayList<>();
            // LinkedHashMap keeps the points in the order they were requested
            Map<String, HistoryDataDto> historyDataMap = new LinkedHashMap<>();
            // Total number of pages
            int totalPage = (int) Math.ceil((double) totalRecords / pageSize);
            while (resultSet.hasNext()) {
                RowRecord record = resultSet.next();
                // Row timestamp
                long timestamp = record.getTimestamp();
                List<Field> fields = record.getFields();
                // One field per requested point
                for (int i = 0; i < points.size(); i++) {
                    String point = points.get(i); // name of the current point
                    Object val = fields.get(i).getStringValue(); // value of the current point
                    // Get or create the HistoryDataDto for this point
                    HistoryDataDto historyData = historyDataMap.computeIfAbsent(point, k -> {
                        HistoryDataDto newHistoryData = new HistoryDataDto();
                        newHistoryData.setPoint(k);
                        newHistoryData.setValues(new ArrayList<>());
                        return newHistoryData;
                    });
                    // Append this (timestamp, value) pair to the point's values
                    HistoryValueDto historyValueDto = new HistoryValueDto();
                    historyValueDto.setTimestamp(timestamp);
                    historyValueDto.setVal(val);
                    historyData.getValues().add(historyValueDto);
                }
            }
            // Copy the per-point entries into the result list
            historyDataList.addAll(historyDataMap.values());
            historyValuesDto.setHistoryData(historyDataList);
            historyValuesDto.setTotal((int) totalRecords);
            historyValuesDto.setPageSize(pageSize);
            historyValuesDto.setPage(totalPage);
            historyValuesDto.setCurrent(page);
            return historyValuesDto;
        } catch (Exception ex) {
            log.error("historyData error", ex);
            throw new RuntimeException(ex);
        } finally {
            // Release the result set
            if (session != null && dataSetWrapper != null) {
                session.closeResultSet(dataSetWrapper);
            }
        }
    }

    @Override
    public HistoryValuesDto windowAggHistoryData(String device, List<String> points, Integer page, Integer pageSize, Long startTime, Long endTime, Integer windowSize, String interval) {
        // Empty request or non-positive paging values: return null
        if (StringUtils.isEmpty(device) || CollectionUtils.isEmpty(points)
                || page == null || pageSize == null || page < 1 || pageSize < 1
                || windowSize == null || StringUtils.isEmpty(interval)) {
            return null;
        }
        // Count the windows first
        long totalRecords = getQueryResultCountForSingleDevices(device, points, startTime, endTime, windowSize, interval);
        if (totalRecords == 0) {
            // No records: return null
            return null;
        }
        // Convert the 1-based page number to a 0-based row offset
        int offset = (page - 1) * pageSize;
        // Build the query
        String sql = buildWindowAggQuery(device, points, startTime, endTime, windowSize, interval, offset, pageSize);
        log.info("windowAggHistoryData sql::{}", sql);
        // Run the query
        SessionPool session = null;
        SessionDataSetWrapper dataSetWrapper = null;
        try {
            session = ioTDBSessionPoolManager.getSessionPool();
            dataSetWrapper = session.executeQueryStatement(sql);
            SessionDataSet resultSet = dataSetWrapper.getSessionDataSet();
            HistoryValuesDto historyValuesDto = new HistoryValuesDto();
            List<HistoryDataDto> historyDataList = new ArrayList<>();
            // Total number of pages
            int totalPage = (int) Math.ceil((double) totalRecords / pageSize);
            log.info("totalPage::{},totalRecords::{},pageSize::{}", totalPage, totalRecords, pageSize);
            while (resultSet.hasNext()) {
                RowRecord record = resultSet.next();
                // Window start timestamp
                long timestamp = record.getTimestamp();
                List<Field> fields = record.getFields();
                // One field per requested point
                for (int i = 0; i < points.size(); i++) {
                    String point = points.get(i); // name of the current point
                    Object val = fields.get(i).getStringValue(); // value of the current point
                    // Get or create the HistoryDataDto for this point
                    HistoryDataDto historyData = historyDataList.stream()
                            .filter(h -> h.getPoint().equals(point))
                            .findFirst()
                            .orElseGet(() -> {
                                HistoryDataDto newHistoryData = new HistoryDataDto();
                                newHistoryData.setPoint(point);
                                newHistoryData.setValues(new ArrayList<>());
                                historyDataList.add(newHistoryData);
                                return newHistoryData;
                            });
                    // Append this (timestamp, value) pair to the point's values
                    HistoryValueDto historyValueDto = new HistoryValueDto();
                    historyValueDto.setTimestamp(timestamp);
                    historyValueDto.setVal(val);
                    historyData.getValues().add(historyValueDto);
                }
            }
            historyValuesDto.setHistoryData(historyDataList);
            historyValuesDto.setTotal((int) totalRecords);
            historyValuesDto.setPageSize(pageSize);
            historyValuesDto.setPage(totalPage);
            historyValuesDto.setCurrent(page);
            return historyValuesDto;
        } catch (Exception ex) {
            log.error("windowAggHistoryData error", ex);
            throw new RuntimeException(ex);
        } finally {
            // Release the result set
            if (session != null && dataSetWrapper != null) {
                session.closeResultSet(dataSetWrapper);
            }
        }
    }

    /**
     * Inserts data into IoTDB.
     *
     * @param deviceId unique identifier (path) of the device
     * @param pointArr names of the measurements to insert
     * @param types    data type of each measurement, in the same order as pointArr
     * @param dataJson JSON string with the rows to insert; each row may carry a "timestamp"
     *                 in milliseconds and defaults to the current time without one
     * @return number of rows inserted
     */
    @Override
    public int insertData(String deviceId, List<String> pointArr, List<TSDataType> types, String dataJson) {
        List<MeasurementSchema> schemaList = new ArrayList<>();
        int insertedRowCount = 0; // number of rows successfully inserted
        try {
            SessionPool session = ioTDBSessionPoolManager.getSessionPool();
            for (int i = 0; i < pointArr.size(); i++) {
                schemaList.add(new MeasurementSchema(pointArr.get(i), types.get(i)));
            }
            JSONObject dataJsonObj = new JSONObject(dataJson);
            JSONArray dataArray = dataJsonObj.getJSONArray("data");
            if (dataArray != null && dataArray.length() > 0) {
                Tablet tablet = new Tablet(deviceId, schemaList, dataArray.length());
                tablet.rowSize = dataArray.length();
                for (int i = 0; i < tablet.rowSize; i++) {
                    JSONObject dataObj = dataArray.getJSONObject(i);
                    // Fall back to the current time when the row has no timestamp
                    long timestamp = dataObj.optLong("timestamp", System.currentTimeMillis());
                    tablet.addTimestamp(i, timestamp);
                    for (int j = 0; j < pointArr.size(); j++) {
                        String point = pointArr.get(j);
                        if (!dataObj.has(point)) {
                            // Measurement absent from this row: nothing is added for it.
                            // Check how your client version stores such unset cells.
                            continue;
                        }
                        String value = dataObj.get(point).toString();
                        // Convert the value according to the declared data type
                        switch (types.get(j)) {
                            case BOOLEAN:
                                tablet.addValue(point, i, Boolean.parseBoolean(value));
                                break;
                            case INT32:
                                tablet.addValue(point, i, Integer.parseInt(value));
                                break;
                            case INT64:
                                tablet.addValue(point, i, Long.parseLong(value));
                                break;
                            case FLOAT:
                                tablet.addValue(point, i, Float.parseFloat(value));
                                break;
                            case DOUBLE:
                                tablet.addValue(point, i, Double.parseDouble(value));
                                break;
                            case TEXT:
                                tablet.addValue(point, i, value);
                                break;
                            default:
                                break; // other types are not handled here
                        }
                    }
                }
                if (tablet.rowSize != 0) {
                    session.insertTablet(tablet);
                    log.info("Inserted data into device: {}", deviceId);
                    insertedRowCount = tablet.rowSize; // record the number of inserted rows
                }
            }
        } catch (Exception ex) {
            log.error("insertData error", ex);
            throw new RuntimeException(ex);
        }
        return insertedRowCount; // number of rows successfully inserted
    }

    @Override
    public List<BatchRealtimeValueDto> batchRealtime(List<String> deviceIds, List<String> points) {
        List<BatchRealtimeValueDto> batchRealtimeValues = new ArrayList<>();
        // Empty request: nothing to query (an empty list would also produce invalid SQL)
        if (CollectionUtils.isEmpty(deviceIds) || CollectionUtils.isEmpty(points)) {
            return batchRealtimeValues;
        }
        SessionPool session = null;
        SessionDataSetWrapper dataSetWrapper = null;
        try {
            String sql = buildLastDataQuery(deviceIds, points);
            log.info("batchRealtime sql::{}", sql);
            session = ioTDBSessionPoolManager.getSessionPool();
            dataSetWrapper = session.executeQueryStatement(sql);
            SessionDataSet resultSet = dataSetWrapper.getSessionDataSet();
            // Latest data per device
            Map<String, BatchRealtimeValueDto> deviceDataMap = new HashMap<>();
            while (resultSet.hasNext()) {
                // Each row carries the timestamp, the time series path and its latest value
                RowRecord record = resultSet.next();
                long timestamp = record.getTimestamp();
                String timeSeries = record.getFields().get(0).getStringValue();
                // The device path is the series path without its last node
                String currentDeviceId = timeSeries.substring(0, timeSeries.lastIndexOf("."));
                // Only keep rows that belong to a requested device
                if (!deviceIds.contains(currentDeviceId)) {
                    continue;
                }
                // Get or create the entry for this device
                BatchRealtimeValueDto batchRealtimeValue = deviceDataMap.computeIfAbsent(currentDeviceId, k -> {
                    BatchRealtimeValueDto newValue = new BatchRealtimeValueDto();
                    newValue.setDeviceId(k);
                    newValue.setRealtimeData(new ArrayList<>());
                    return newValue;
                });
                RealtimeValueDto realtimeValueDto = RealtimeValueDto.builder().build();
                realtimeValueDto.setTs(timestamp);
                realtimeValueDto.setTimeSeries(timeSeries);
                // The value is read as a string; convert it to the real data type as needed
                realtimeValueDto.setVal(record.getFields().get(1).getStringValue());
                batchRealtimeValue.getRealtimeData().add(realtimeValueDto);
            }
            // Add the latest data of all devices to the result
            batchRealtimeValues.addAll(deviceDataMap.values());
        } catch (Exception ex) {
            log.error("batchRealtime error", ex);
            throw new RuntimeException(ex);
        } finally {
            // Release the result set
            if (session != null && dataSetWrapper != null) {
                session.closeResultSet(dataSetWrapper);
            }
        }
        return batchRealtimeValues;
    }

    @Override
    public BatchHistoryValuesDto batchHistoryData(List<String> deviceIds, List<String> points, Integer page, Integer pageSize, Long startTime, Long endTime) {
        // Empty request or non-positive paging values: return null
        if (CollectionUtils.isEmpty(deviceIds) || CollectionUtils.isEmpty(points)
                || page == null || pageSize == null || page < 1 || pageSize < 1) {
            return null;
        }
        // Count the matching rows first
        long totalRecords = getQueryResultCountForMultipleDevices(deviceIds, points, startTime, endTime);
        if (totalRecords == 0) {
            // No records: return null
            return null;
        }
        // Convert the 1-based page number to a 0-based row offset
        int offset = (page - 1) * pageSize;
        // Total number of pages
        int totalPage = (int) Math.ceil((double) totalRecords / pageSize);
        // Build the query
        String sql = buildHistoryDataQuery(deviceIds, points, startTime, endTime, offset, pageSize);
        log.info("batchHistoryData sql::{}", sql);
        SessionPool session = null;
        SessionDataSetWrapper dataSetWrapper = null;
        try {
            session = ioTDBSessionPoolManager.getSessionPool();
            dataSetWrapper = session.executeQueryStatement(sql);
            SessionDataSet resultSet = dataSetWrapper.getSessionDataSet();
            BatchHistoryValuesDto batchHistoryValuesDto = new BatchHistoryValuesDto();
            List<HistoryDataByDeviceDto> historyDataByDeviceList = new ArrayList<>();
            Map<String, HistoryDataByDeviceDto> deviceDataMap = new HashMap<>();
            // Column names; index 0 is the time column
            List<String> columnNames = resultSet.getColumnNames();
            while (resultSet.hasNext()) {
                RowRecord record = resultSet.next();
                long timestamp = record.getTimestamp();
                // Start at 1 to skip the time column; fields are shifted by one
                for (int i = 1; i < columnNames.size(); i++) {
                    String fieldName = columnNames.get(i);
                    Object value = record.getFields().get(i - 1).getStringValue();
                    // Split the column name into device path and measurement name
                    String deviceId = fieldName.substring(0, fieldName.lastIndexOf("."));
                    String point = fieldName.substring(fieldName.lastIndexOf(".") + 1);
                    // Get or create the HistoryDataByDeviceDto for this device
                    HistoryDataByDeviceDto deviceData = deviceDataMap.computeIfAbsent(deviceId, k -> {
                        HistoryDataByDeviceDto newDeviceData = new HistoryDataByDeviceDto();
                        newDeviceData.setDeviceId(k);
                        newDeviceData.setHistoryData(new ArrayList<>());
                        return newDeviceData;
                    });
                    // Get or create the HistoryDataDto for this point
                    HistoryDataDto historyData = deviceData.getHistoryData().stream()
                            .filter(h -> h.getPoint().equals(point))
                            .findFirst()
                            .orElseGet(() -> {
                                HistoryDataDto newHistoryData = new HistoryDataDto();
                                newHistoryData.setPoint(point);
                                newHistoryData.setValues(new ArrayList<>());
                                deviceData.getHistoryData().add(newHistoryData);
                                return newHistoryData;
                            });
                    // Append this (timestamp, value) pair
                    HistoryValueDto historyValueDto = new HistoryValueDto();
                    historyValueDto.setTimestamp(timestamp);
                    historyValueDto.setVal(value);
                    historyData.getValues().add(historyValueDto);
                }
            }
            historyDataByDeviceList.addAll(deviceDataMap.values());
            batchHistoryValuesDto.setHistoryDataByDevice(historyDataByDeviceList);
            batchHistoryValuesDto.setTotal((int) totalRecords);
            batchHistoryValuesDto.setPage(totalPage);
            batchHistoryValuesDto.setCurrent(page);
            batchHistoryValuesDto.setPageSize(pageSize);
            return batchHistoryValuesDto;
        } catch (Exception ex) {
            log.error("batchHistoryData error", ex);
            throw new RuntimeException("Error fetching batch history data", ex);
        } finally {
            // Release the result set
            if (session != null && dataSetWrapper != null) {
                session.closeResultSet(dataSetWrapper);
            }
        }
    }

    // Column names look like "avg(root.ln.wf01.wt01.temperature)":
    // group 1 is the device path, group 2 the measurement name.
    private static final Pattern AVG_COLUMN = Pattern.compile("avg\\((.+?)\\.([\\w`]+)\\)");

    @Override
    public BatchHistoryValuesDto batchWindowAggHistoryData(List<String> deviceIds, List<String> points, Integer page, Integer pageSize, Long startTime, Long endTime, Integer windowSize, String interval) {
        // Empty request or non-positive paging values: return null
        if (CollectionUtils.isEmpty(deviceIds) || CollectionUtils.isEmpty(points)
                || page == null || pageSize == null || page < 1 || pageSize < 1
                || windowSize == null || interval == null) {
            return null;
        }
        // Paging arithmetic: 1-based page number to 0-based row offset
        int offset = (page - 1) * pageSize;
        // Count the windows first (the count query ignores offset and limit)
        long totalRecords = getQueryResultCountForMultipleDevices(deviceIds, points, startTime, endTime, windowSize, interval, offset, pageSize);
        if (totalRecords == 0) {
            return null;
        }
        int totalPage = (int) Math.ceil((double) totalRecords / pageSize);
        // Build the query
        String sql = buildBatchWindowAggQuery(deviceIds, points, startTime, endTime, windowSize, interval, offset, pageSize);
        log.info("batchWindowAggHistoryData sql::{}", sql);
        SessionPool session = null;
        SessionDataSetWrapper dataSetWrapper = null;
        try {
            session = ioTDBSessionPoolManager.getSessionPool();
            dataSetWrapper = session.executeQueryStatement(sql);
            SessionDataSet resultSet = dataSetWrapper.getSessionDataSet();
            BatchHistoryValuesDto batchHistoryValuesDto = new BatchHistoryValuesDto();
            List<HistoryDataByDeviceDto> historyDataByDeviceList = new ArrayList<>();
            Map<String, HistoryDataByDeviceDto> deviceDataMap = new HashMap<>();
            // Column names; index 0 is the time column
            List<String> columnNames = resultSet.getColumnNames();
            while (resultSet.hasNext()) {
                RowRecord record = resultSet.next();
                long timestamp = record.getTimestamp();
                // Start at 1 to skip the time column; fields are shifted by one
                for (int i = 1; i < columnNames.size(); i++) {
                    String fieldName = columnNames.get(i);
                    Object value = record.getFields().get(i - 1).getStringValue();
                    // Extract device path and measurement name from the aggregate column name
                    Matcher matcher = AVG_COLUMN.matcher(fieldName.trim());
                    if (matcher.find()) {
                        String deviceId = matcher.group(1); // everything before the last node
                        String point = matcher.group(2); // the last node is the measurement name
                        // Get or create the HistoryDataByDeviceDto for this device
                        HistoryDataByDeviceDto deviceData = deviceDataMap.computeIfAbsent(deviceId, k -> {
                            HistoryDataByDeviceDto newDeviceData = new HistoryDataByDeviceDto();
                            newDeviceData.setDeviceId(k);
                            newDeviceData.setHistoryData(new ArrayList<>());
                            return newDeviceData;
                        });
                        // Get or create the HistoryDataDto for this point
                        HistoryDataDto historyData = deviceData.getHistoryData().stream()
                                .filter(h -> h.getPoint().equals(point))
                                .findFirst()
                                .orElseGet(() -> {
                                    HistoryDataDto newHistoryData = new HistoryDataDto();
                                    newHistoryData.setPoint(point);
                                    newHistoryData.setValues(new ArrayList<>());
                                    deviceData.getHistoryData().add(newHistoryData);
                                    return newHistoryData;
                                });
                        // Append this (timestamp, value) pair
                        HistoryValueDto historyValueDto = new HistoryValueDto();
                        historyValueDto.setTimestamp(timestamp);
                        historyValueDto.setVal(value);
                        historyData.getValues().add(historyValueDto);
                    }
                }
            }
            historyDataByDeviceList.addAll(deviceDataMap.values());
            batchHistoryValuesDto.setHistoryDataByDevice(historyDataByDeviceList);
            batchHistoryValuesDto.setTotal((int) totalRecords);
            batchHistoryValuesDto.setPage(totalPage);
            batchHistoryValuesDto.setCurrent(page);
            batchHistoryValuesDto.setPageSize(pageSize);
            return batchHistoryValuesDto;
        } catch (Exception ex) {
            log.error("batchWindowAggHistoryData error", ex);
            throw new RuntimeException("Error fetching batch history data", ex);
        } finally {
            // Release the result set
            if (session != null && dataSetWrapper != null) {
                session.closeResultSet(dataSetWrapper);
            }
        }
    }

    public long getQueryResultCountForSingleDevices(String device, List<String> points, Long startTime, Long endTime, Integer windowSize, String interval) {
        long count = 0;
        SessionPool session = null;
        SessionDataSetWrapper dataSetWrapper = null;
        try {
            session = ioTDBSessionPoolManager.getSessionPool();
            // Build the query
            String sql = buildWindowAggQuery(device, points, startTime, endTime, windowSize, interval);
            log.info("getQueryResultCountForSingleDevices sql::{}", sql);
            // Run the query
            dataSetWrapper = session.executeQueryStatement(sql);
            SessionDataSet resultSet = dataSetWrapper.getSessionDataSet();
            // For this aggregate query the row count is obtained by walking the
            // result set; the field values themselves are not needed
            while (resultSet.hasNext()) {
                resultSet.next();
                count++;
            }
        } catch (Exception ex) {
            log.error("getQueryResultCountForSingleDevices error", ex);
            throw new RuntimeException(ex);
        } finally {
            // Release the result set
            if (session != null && dataSetWrapper != null) {
                session.closeResultSet(dataSetWrapper);
            }
        }
        return count;
    }

    // Note: offset and limit are accepted but not used by the count query.
    public long getQueryResultCountForMultipleDevices(List<String> devices, List<String> points, Long startTime, Long endTime, Integer windowSize, String interval, int offset, int limit) {
        long count = 0;
        SessionPool session = null;
        SessionDataSetWrapper dataSetWrapper = null;
        try {
            session = ioTDBSessionPoolManager.getSessionPool();
            // Build the query
            String sql = buildBatchWindowAggQueryCount(devices, points, startTime, endTime, windowSize, interval);
            log.info("getQueryResultCountForMultipleDevices sql::{}", sql);
            // Run the query
            dataSetWrapper = session.executeQueryStatement(sql);
            SessionDataSet resultSet = dataSetWrapper.getSessionDataSet();
            // For this aggregate query the row count is obtained by walking the
            // result set; the field values themselves are not needed
            while (resultSet.hasNext()) {
                resultSet.next();
                count++;
            }
        } catch (Exception ex) {
            log.error("getQueryResultCountForMultipleDevices error", ex);
            throw new RuntimeException(ex);
        } finally {
            // Release the result set
            if (session != null && dataSetWrapper != null) {
                session.closeResultSet(dataSetWrapper);
            }
        }
        return count;
    }

    private static String buildBatchWindowAggQueryCount(List<String> devices, List<String> points, Long startTime, Long endTime, Integer windowSize, String interval) {
        if (devices == null || devices.isEmpty() || points == null || points.isEmpty() || startTime == null || endTime == null || windowSize == null || interval == null) {
            throw new IllegalArgumentException("Invalid input parameters");
        }
        StringBuilder queryBuilder = new StringBuilder();
        queryBuilder.append("SELECT ");
        // SELECT clause: the average of each point
        for (int i = 0; i < points.size(); i++) {
            queryBuilder.append("avg(").append(points.get(i)).append(")");
            if (i < points.size() - 1) {
                queryBuilder.append(", ");
            }
        }
        // FROM clause: devices separated by commas
        queryBuilder.append(" FROM ").append(String.join(", ", devices));
        // GROUP BY clause: left-closed, right-open time range and window length
        queryBuilder.append(" GROUP BY ([")
                .append(startTime).append(", ")
                .append(endTime).append("), ")
                .append(windowSize).append(interval)
                .append(") ");
        // ORDER BY clause
        queryBuilder.append("ORDER BY time ASC ");
        return queryBuilder.toString();
    }

    private static String buildBatchWindowAggQuery(List<String> devices, List<String> points, Long startTime, Long endTime, Integer windowSize, String interval, int offset, int limit) {
        if (devices == null || devices.isEmpty() || points == null || points.isEmpty() || startTime == null || endTime == null || windowSize == null || interval == null) {
            throw new IllegalArgumentException("Invalid input parameters");
        }
        StringBuilder queryBuilder = new StringBuilder();
        queryBuilder.append("SELECT ");
        // SELECT clause: the average of each point
        for (int i = 0; i < points.size(); i++) {
            queryBuilder.append("avg(").append(points.get(i)).append(")");
            if (i < points.size() - 1) {
                queryBuilder.append(", ");
            }
        }
        // FROM clause: devices separated by commas
        queryBuilder.append(" FROM ").append(String.join(", ", devices));
        // GROUP BY clause: left-closed, right-open time range and window length
        queryBuilder.append(" GROUP BY ([")
                .append(startTime).append(", ")
                .append(endTime).append("), ")
                .append(windowSize).append(interval)
                .append(") ");
        // ORDER BY clause, filling empty windows with the previous value
        queryBuilder.append("ORDER BY time ASC fill(previous) ");
        // LIMIT and OFFSET clauses
        queryBuilder.append("LIMIT ").append(limit).append(" OFFSET ").append(offset);
        return queryBuilder.toString();
    }

    private String buildHistoryDataQuery(List<String> deviceIds, List<String> points, Long startTime, Long endTime, int offset, int limit) {
        StringBuilder sqlBuilder = new StringBuilder("SELECT ");
        // Points and devices are both joined with commas
        sqlBuilder.append(String.join(", ", points));
        sqlBuilder.append(" FROM ");
        sqlBuilder.append(String.join(", ", deviceIds));
        sqlBuilder.append(" WHERE time >= ").append(startTime).append(" AND time <= ").append(endTime);
        sqlBuilder.append(" ORDER BY time ASC");
        sqlBuilder.append(" LIMIT ").append(limit).append(" OFFSET ").append(offset);
        return sqlBuilder.toString();
    }

    /**
     * Builds the "select last" statement for the given devices and points.
     */
    public static String buildLastDataQuery(List<String> deviceIds, List<String> points) {
        StringBuilder sqlBuilder = new StringBuilder("select last ");
        // Add each point to the SELECT clause
        for (int i = 0; i < points.size(); i++) {
            sqlBuilder.append(points.get(i));
            if (i < points.size() - 1) {
                sqlBuilder.append(", ");
            }
        }
        // Add the FROM clause with all devices
        sqlBuilder.append(" from ");
        for (int i = 0; i < deviceIds.size(); i++) {
            sqlBuilder.append(deviceIds.get(i));
            if (i < deviceIds.size() - 1) {
                sqlBuilder.append(", ");
            }
        }
        // Return the assembled SQL statement
        return sqlBuilder.toString();
    }

    /**
     * Builds the window aggregation (downsampling) query without paging;
     * used to count the number of windows.
     */
    private static String buildWindowAggQuery(String device, List<String> points, Long startTime, Long endTime, Integer windowSize, String interval) {
        StringBuilder queryBuilder = new StringBuilder();
        queryBuilder.append("SELECT ");
        for (int i = 0; i < points.size(); i++) {
            queryBuilder.append("avg(").append(points.get(i)).append(") as avg_").append(points.get(i));
            if (i < points.size() - 1) {
                queryBuilder.append(", ");
            }
        }
        queryBuilder.append(" FROM ").append(device);
        queryBuilder.append(" GROUP BY ([").append(startTime).append(", ").append(endTime).append("), ").append(windowSize).append(interval);
        queryBuilder.append(") ORDER BY time asc");
        return queryBuilder.toString();
    }

    /**
     * Builds the window aggregation (downsampling) query with LIMIT and OFFSET.
     */
    private static String buildWindowAggQuery(String device, List<String> points, Long startTime, Long endTime, Integer windowSize, String interval, int offset, int limit) {
        StringBuilder queryBuilder = new StringBuilder();
        queryBuilder.append("SELECT ");
        for (int i = 0; i < points.size(); i++) {
            queryBuilder.append("avg(").append(points.get(i)).append(") as avg_").append(points.get(i));
            if (i < points.size() - 1) {
                queryBuilder.append(", ");
            }
        }
        queryBuilder.append(" FROM ").append(device);
        queryBuilder.append(" GROUP BY ([").append(startTime).append(", ").append(endTime).append("), ").append(windowSize).append(interval);
        queryBuilder.append(") ORDER BY time asc");
        queryBuilder.append(" LIMIT ").append(limit).append(" OFFSET ").append(offset);
        return queryBuilder.toString();
    }
/**
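* Returns the number of rows the query yields.
*
* @param device unique identifier (path) of the device
* @param points names of the data points to query
* @param startTime start timestamp of the query
* @param endTime end timestamp of the query
* @return number of rows returned by the query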
*/
public long getQueryResultCount(String device, List<String> points, Long startTime, Long endTime) {
long maxCount = 0;
SessionPool session = null;
SessionDataSetWrapper dataSetWrapper = null;
try {
session = ioTDBSessionPoolManager.getSessionPool();
// Build the query statement
String sql = buildQueryResultCountSql(device, points, startTime, endTime);
log.info("getQueryResultCount sql::{}", sql);
// Execute the query
dataSetWrapper = session.executeQueryStatement(sql);
SessionDataSet resultSet = dataSetWrapper.getSessionDataSet();
// Iterate over the result set and count the rows
while (resultSet.hasNext()) {
resultSet.next();
maxCount++;
}
} catch (Exception ex) {
log.error("getQueryResultCount error", ex);
throw new RuntimeException(ex);
} finally {
// Release the result set back to the pool
if (session != null && dataSetWrapper != null) {
session.closeResultSet(dataSetWrapper);
}
}
return maxCount;
}
/**
* Builds the single-device range query whose rows are counted.
*/
private String buildQueryResultCountSql(String device, List<String> points, Long startTime, Long endTime) {
StringBuilder sqlBuilder = new StringBuilder("SELECT ");
for (int i = 0; i < points.size(); i++) {
sqlBuilder.append(points.get(i));
if (i < points.size() - 1) {
sqlBuilder.append(", ");
}
}
sqlBuilder.append(" FROM ").append(device);
sqlBuilder.append(" WHERE time >= ").append(startTime).append(" AND time <= ").append(endTime);
return sqlBuilder.toString();
}
/**
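* Returns the total number of rows the query yields across multiple devices.
*
* @param deviceIds unique identifiers (paths) of the devices
* @param points names of the data points to query
* @param startTime start timestamp of the query
* @param endTime end timestamp of the query
* @return total number of rows returned by the query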
*/
public long getQueryResultCountForMultipleDevices(List<String> deviceIds, List<String> points, Long startTime, Long endTime) {
long count = 0;
SessionPool session = null;
SessionDataSetWrapper dataSetWrapper = null;
try {
session = ioTDBSessionPoolManager.getSessionPool();
// Build the query statement
String sql = buildQueryResultForMultipleDevices(deviceIds, points, startTime, endTime);
log.info("getQueryResultCountForMultipleDevices sql::{}", sql);
// Execute the query
dataSetWrapper = session.executeQueryStatement(sql);
SessionDataSet resultSet = dataSetWrapper.getSessionDataSet();
// Iterate over the result set and count the rows
while (resultSet.hasNext()) {
resultSet.next();
count++;
}
} catch (Exception ex) {
log.error("getQueryResultCountForMultipleDevices error", ex);
throw new RuntimeException(ex);
} finally {
// Release the result set back to the pool
if (session != null && dataSetWrapper != null) {
session.closeResultSet(dataSetWrapper);
}
}
return count;
}
/**
* 根据传入的参数构建多设备查询语句
*/
private String buildQueryResultForMultipleDevices(List<String> deviceIds, List<String> points, Long startTime, Long endTime) {
StringBuilder sqlBuilder = new StringBuilder("SELECT ");
for (int i = 0; i < points.size(); i++) {
sqlBuilder.append(points.get(i));
if (i < points.size() - 1) {
sqlBuilder.append(", ");
}
}
sqlBuilder.append(" FROM ");
for (int i = 0; i < deviceIds.size(); i++) {
sqlBuilder.append(deviceIds.get(i));
if (i < deviceIds.size() - 1) {
sqlBuilder.append(", ");
}
}
sqlBuilder.append(" WHERE time >= ").append(startTime).append(" AND time <= ").append(endTime);
return sqlBuilder.toString();
}
/**
* Builds a paged raw history query for the given measurement points.
*/
private static String buildHistoryDataQuery(String device, List<String> points, Long startTime, Long endTime, int offset, int limit) {
StringBuilder queryBuilder = new StringBuilder();
queryBuilder.append("select ");
for (int i = 0; i < points.size(); i++) {
queryBuilder.append(points.get(i));
if (i < points.size() - 1) {
queryBuilder.append(", ");
}
}
queryBuilder.append(" from ").append(device);
queryBuilder.append(" where time >= ").append(startTime).append(" and time <= ").append(endTime);
queryBuilder.append(" order by time asc");
queryBuilder.append(" limit ").append(limit).append(" offset ").append(offset);
return queryBuilder.toString();
}
/**
* Builds an average-value aggregation query for the given measurement points.
*/
private static String buildAvgValueQuery(String device, List<String> points, Long startTime, Long endTime) {
StringBuilder queryBuilder = new StringBuilder();
queryBuilder.append("select ");
// Wrap each measurement point in the AVG aggregate function
for (int i = 0; i < points.size(); i++) {
queryBuilder.append("avg(").append(points.get(i)).append(")");
if (i < points.size() - 1) {
queryBuilder.append(", ");
}
}
// Device path to query
queryBuilder.append(" from ").append(device);
// Optional time range filter, applied only when both bounds are given
if (Objects.nonNull(startTime) && Objects.nonNull(endTime)) {
queryBuilder.append(" where time >= ").append(startTime).append(" and time <= ").append(endTime);
}
return queryBuilder.toString();
}
/**
* Builds a query for the maximum and minimum value of each given measurement point.
*/
private static String buildStatisticsValueQuery(String device, List<String> points, Long startTime, Long endTime) {
StringBuilder queryBuilder = new StringBuilder();
queryBuilder.append("select ");
// Wrap each measurement point in the MAX_VALUE and MIN_VALUE aggregate functions
for (int i = 0; i < points.size(); i++) {
queryBuilder.append("max_value(").append(points.get(i)).append("), ").append("min_value(").append(points.get(i)).append(")");
if (i < points.size() - 1) {
queryBuilder.append(", ");
}
}
// Device path to query
queryBuilder.append(" from ").append(device);
// Optional time range filter, applied only when both bounds are given
if (Objects.nonNull(startTime) && Objects.nonNull(endTime)) {
queryBuilder.append(" where time >= ").append(startTime).append(" and time <= ").append(endTime);
}
return queryBuilder.toString();
}
/**
* Builds a sum aggregation query for the given measurement points.
*/
private static String buildSumValueQuery(String device, List<String> points, Long startTime, Long endTime) {
StringBuilder queryBuilder = new StringBuilder();
queryBuilder.append("select ");
// Wrap each measurement point in the SUM aggregate function
for (int i = 0; i < points.size(); i++) {
queryBuilder.append("sum(").append(points.get(i)).append(")");
if (i < points.size() - 1) {
queryBuilder.append(", ");
}
}
// Device path to query
queryBuilder.append(" from ").append(device);
// Optional time range filter, applied only when both bounds are given
if (Objects.nonNull(startTime) && Objects.nonNull(endTime)) {
queryBuilder.append(" where time >= ").append(startTime).append(" and time <= ").append(endTime);
}
return queryBuilder.toString();
}
/**
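* Builds a sum aggregation query for the given measurement points.
* Note: despite its name, this method produces the same statement as buildSumValueQuery.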
*/
private static String buildHistoryQuery(String device, List<String> points, Long startTime, Long endTime) {
StringBuilder queryBuilder = new StringBuilder();
queryBuilder.append("select ");
// Wrap each measurement point in the SUM aggregate function
for (int i = 0; i < points.size(); i++) {
queryBuilder.append("sum(").append(points.get(i)).append(")");
if (i < points.size() - 1) {
queryBuilder.append(", ");
}
}
// Device path to query
queryBuilder.append(" from ").append(device);
// Optional time range filter, applied only when both bounds are given
if (Objects.nonNull(startTime) && Objects.nonNull(endTime)) {
queryBuilder.append(" where time >= ").append(startTime).append(" and time <= ").append(endTime);
}
return queryBuilder.toString();
}
}
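The query builders in this class concatenate device paths and point names straight into the SQL text. When those values arrive in a request body, it may be worth validating them before the statement is assembled. The following is a minimal sketch under stated assumptions: the class name `SafeQueryBuilder`, its methods, and the regular expression are hypothetical and not part of the original service or of IoTDB. The pattern only accepts dot-separated nodes made of letters, digits, and underscores, so it would also reject legitimate paths that use backtick quoting or wildcards.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; not part of the original service. */
final class SafeQueryBuilder {
    // Assumption: path nodes consist of letters, digits and underscores, joined by dots.
    private static final Pattern PATH = Pattern.compile("[A-Za-z0-9_]+(\\.[A-Za-z0-9_]+)*");

    private SafeQueryBuilder() {
    }

    /** Returns the path unchanged if it matches the allow-list pattern, otherwise throws. */
    static String requireValidPath(String path) {
        if (path == null || !PATH.matcher(path).matches()) {
            throw new IllegalArgumentException("Illegal path: " + path);
        }
        return path;
    }

    /** Builds a multi-device range query after validating every device path and point name. */
    static String buildRangeQuery(List<String> devices, List<String> points, long startTime, long endTime) {
        if (devices == null || devices.isEmpty() || points == null || points.isEmpty()) {
            throw new IllegalArgumentException("devices and points must not be empty");
        }
        devices.forEach(SafeQueryBuilder::requireValidPath);
        points.forEach(SafeQueryBuilder::requireValidPath);
        // String.join replaces the manual index loop with comma handling
        return "SELECT " + String.join(", ", points)
                + " FROM " + String.join(", ", devices)
                + " WHERE time >= " + startTime + " AND time <= " + endTime;
    }

    public static void main(String[] args) {
        System.out.println(buildRangeQuery(
                Arrays.asList("root.demo.d1", "root.demo.d2"),
                Arrays.asList("temperature", "humidity"),
                0L, 1000L));
    }
}
```

Rejecting anything outside a narrow allow-list is a deliberately conservative choice; a real deployment would need to widen the pattern to match its own naming rules.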
7. Data Transfer Objects (DTO) & View Objects (VO)
Define a set of DTOs and VOs that wrap the request and response data.
@Data
@Builder
public class AvgValueDto {
/**
* Measurement point
*/
private String point;
/**
* Average value
*/
private Double val;
}
@Data
public class BatchHistoryValuesDto {
/**
* History data, grouped by device
*/
private List<HistoryDataByDeviceDto> historyDataByDevice;
/**
* Total number of records
*/
private Integer total;
/**
* Total number of pages
*/
private Integer page;
/**
* Current page number
*/
private Integer current;
/**
* Page size
*/
private Integer pageSize;
}
@Data
public class BatchRealtimeValueDto {
String deviceId;
List<RealtimeValueDto> realtimeData;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class HistoryDataByDeviceDto {
/**
* Device id
*/
private String deviceId;
/**
* History data of this device, one entry per measurement point
*/
private List<HistoryDataDto> historyData;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class HistoryDataDto {
/**
* Measurement point
*/
private String point;
/**
* Historical values of this measurement point
*/
private List<HistoryValueDto> values;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class HistoryValueDto {
/**
* Value
*/
private Object val;
/**
* Timestamp
*/
private long timestamp;
}
@Data
public class HistoryValuesDto {
/**
* History data
*/
private List<HistoryDataDto> historyData;
/**
* Total number of records
*/
private Integer total;
/**
* Total number of pages
*/
private Integer page;
/**
* Current page number
*/
private Integer current;
/**
* Page size
*/
private Integer pageSize;
}
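The paging fields in these DTOs (`total`, `page`, `current`, `pageSize`) map onto the `LIMIT` and `OFFSET` values used by the query builders. A minimal sketch of that arithmetic follows; the class `PageMath` and its method names are hypothetical, and it assumes that `current` is 1-based, which the original code does not state.

```java
/** Hypothetical paging helper for illustration; not part of the original service. */
final class PageMath {

    private PageMath() {
    }

    /** Total number of pages needed for {@code total} rows, rounding up. */
    static int totalPages(long total, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        return (int) ((total + pageSize - 1) / pageSize);
    }

    /** OFFSET for the given page, assuming page numbers start at 1. */
    static long offset(int current, int pageSize) {
        if (current < 1 || pageSize <= 0) {
            throw new IllegalArgumentException("current must be >= 1 and pageSize positive");
        }
        return (long) (current - 1) * pageSize;
    }

    public static void main(String[] args) {
        // 101 rows at 20 per page need 6 pages; page 3 starts at offset 40
        System.out.println(totalPages(101, 20) + " pages, offset " + offset(3, 20));
    }
}
```

Computing the offset in `long` avoids overflow for large page numbers, which can matter because the row count returned by the service is itself a `long`.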
@Data
public class IotDbInsertBeanDto {
/**
* Device id
*/
private String deviceId;
/**
* Data as a JSON string
*/
private String dataJson;
/**
* List of measurement points
*/
private List<String> pointArr;
/**
* List of data types
*/
private List<TSDataType> types;
}
@Data
public class RealtimeValueDto {
/**
* Timestamp
*/
private long ts;
/**
* Time series path
*/
private String timeSeries;
/**
* Value
*/
private Object val;
}
@Data
@Builder
public class StatisticsValueDto {
/**
* Measurement point
*/
private String point;
/**
* Maximum value
*/
private String maxVal;
/**
* Minimum value
*/
private String minVal;
/**
* Average value
*/
private Double val;
}
@Data
@Builder
public class SumValueDto {
/**
* Measurement point
*/
private String point;
/**
* Sum of the values
*/
private Double val;
}
@Data
public class IotDbBatchHistoryRequestQueryVo {
/**
* Full tree paths of the devices
*/
private List<String> deviceIds;
/**
* Measurement point codes
*/
private List<String> points;
/**
* Start time
*/
private Long startTime;
/**
* End time
*/
private Long endTime;
/**
* Page number
*/
private Integer page;
/**
* Page size
*/
private Integer pageSize;
/**
* Window size
*/
private Integer windowSize;
/**
* Window unit: day / millisecond / second (d / ms / s)
*/
private String interval;
}
@Data
public class IotDbBatchRequestQueryVo {
/**
* Database name
*/
private String db;
/**
* Full tree paths of the devices
*/
private List<String> deviceIds;
/**
* Measurement point codes
*/
private List<String> points;
}
@Data
public class IotDbRequestQueryVo {
/**
* Measurement point codes
*/
private List<String> points;
/**
* Full tree path of the device
*/
private String device;
/**
* Database name
*/
private String db;
/**
* Start time
*/
private Long startTime;
/**
* End time
*/
private Long endTime;
/**
* Page number
*/
private Integer page;
/**
* Page size
*/
private Integer pageSize;
/**
* Window size
*/
private Integer windowSize;
/**
* Window unit: day / millisecond / second (d / ms / s)
*/
private String interval;
}
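The `windowSize` and `interval` fields of the request objects are combined into the duration literal of the `GROUP BY` clause built by `buildWindowAggQuery`, for example `15m`. Because both values come from the request body, a small check before concatenation may help. This is only a sketch: the class `WindowInterval`, its methods, and the allow-list of units are hypothetical. The field comment names `d`, `ms`, and `s`; adding `h` and `m` here is an assumption.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper for illustration; not part of the original service. */
final class WindowInterval {
    // Assumption: these units are enough for the service; the field comment lists d, ms and s.
    private static final Set<String> UNITS = new HashSet<>(Arrays.asList("d", "h", "m", "s", "ms"));

    private WindowInterval() {
    }

    /** Combines size and unit into a duration literal such as "15m". */
    static String toDuration(Integer windowSize, String interval) {
        if (windowSize == null || windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be a positive number");
        }
        if (interval == null || !UNITS.contains(interval)) {
            throw new IllegalArgumentException("Unsupported interval unit: " + interval);
        }
        return windowSize + interval;
    }

    /** Builds the GROUP BY clause in the same shape the query builder uses: left-closed, right-open range. */
    static String groupByClause(long startTime, long endTime, Integer windowSize, String interval) {
        return "GROUP BY ([" + startTime + ", " + endTime + "), " + toDuration(windowSize, interval) + ")";
    }

    public static void main(String[] args) {
        System.out.println(groupByClause(0L, 3600000L, 15, "m"));
    }
}
```

Failing early on a missing window size or unit also avoids sending a statement containing the text `null` to the database.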
public class ResponseBuilder<T> {
private static final Logger LOG = LoggerFactory.getLogger(ResponseBuilder.class);
private int code;
private String message;
private T data;
private String timestamp;
private String version;
private ResponseBuilder() {
}
public static <R> ResponseVo<R> buildSuccessResponse() {
return new ResponseVo<R>(ResponseCode.SUCCESS.code, ResponseCode.SUCCESS.desc);
}
public static <R> ResponseVo<R> buildSuccessResponse(R data) {
return new ResponseVo<>(ResponseCode.SUCCESS.code, ResponseCode.SUCCESS.desc, data);
}
public static <R> ResponseVo<R> buildSuccessResponse(ResponseCode respCode, R data, String serverVersion) {
return ResponseBuilder.<R>newBuilder().code(respCode.code)
.message(respCode.desc)
.data(data)
.version(serverVersion)
.build();
}
public static <R> ResponseVo<R> buildErrorResponse() {
return new ResponseVo<>(ResponseCode.UNEXPECTED_ERROR.code, ResponseCode.UNEXPECTED_ERROR.desc);
}
public static <R> ResponseVo<R> buildErrorResponse(ResponseCode respCode) {
return new ResponseVo<>(respCode.code, respCode.desc);
}
public static <R> ResponseVo<R> buildErrorResponse(String message) {
return new ResponseVo<>(ResponseCode.UNEXPECTED_ERROR.code, message);
}
public static <R> ResponseVo<R> buildErrorResponse(int respCode, String message) {
if (respCode == ResponseCode.SUCCESS.getCode()) {
LOG.warn("error code match failed respCode:{}", respCode);
// Correct the code so that a partner's error code equal to SUCCESS is not misread as success
respCode = ResponseCode.FAILURE.getCode();
}
return new ResponseVo<>(respCode, message);
}
public static <R> ResponseVo<R> buildResponse(int respCode, String message) {
return new ResponseVo<>(respCode, message);
}
public static <R> ResponseVo<R> buildErrorResponse(int respCode, String message, R data) {
return new ResponseVo<R>(respCode, message, data);
}
public static <R> ResponseBuilder<R> newBuilder() {
return new ResponseBuilder<R>();
}
public ResponseVo<T> build() {
return new ResponseVo<>(this);
}
public ResponseBuilder<T> code(int code) {
this.code = code;
return this;
}
public ResponseBuilder<T> message(String message) {
this.message = message;
return this;
}
public ResponseBuilder<T> data(T data) {
this.data = data;
return this;
}
public ResponseBuilder<T> timestamp(String timestamp) {
this.timestamp = timestamp;
return this;
}
public ResponseBuilder<T> version(String version) {
this.version = version;
return this;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public T getData() {
return data;
}
public void setData(T data) {
this.data = data;
}
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
}
public enum ResponseCode {
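/**
* Unknown error (used when an exception is thrown)
*/
UNEXPECTED_ERROR(0, "Unexpected error"),
/**
* Request succeeded
*/
SUCCESS(1, "Successful"),
/**
* Processing error
*/
FAILURE(2, "Failure"),
/**
* Missing parameters, wrong parameter types, and similar
*/
PARAMS_ERROR(3, "Params error"),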
;
public final int code;
public final String desc;
ResponseCode(int code, String desc) {
this.code = code;
this.desc = desc;
}
public int getCode() {
return code;
}
public String getDesc() {
return desc;
}
}
public class ResponseVo<T> {
private int code;
private String message;
private T data;
private String timestamp = String.valueOf(System.currentTimeMillis());
public ResponseVo() {
}
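ResponseVo(ResponseBuilder<T> builder) {
this.code = builder.getCode();
this.message = builder.getMessage();
this.data = builder.getData();
// Keep the default timestamp unless the builder supplied one explicitly
if (builder.getTimestamp() != null) {
this.timestamp = builder.getTimestamp();
}
}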
public ResponseVo(int code, String message) {
this.code = code;
this.message = message;
}
public ResponseVo(int code, String message, T data) {
this.code = code;
this.message = message;
this.data = data;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public T getData() {
return data;
}
public void setData(T data) {
this.data = data;
}
public boolean isSuccess() {
return this.code == ResponseCode.SUCCESS.getCode();
}
@Override
public int hashCode() {
return Objects.hash(this.code, this.message, this.data, this.timestamp);
}
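@Override
public boolean equals(Object obj) {
// Compare the same fields that hashCode() uses so the two methods stay consistent
if (this == obj) {
return true;
}
if (!(obj instanceof ResponseVo)) {
return false;
}
ResponseVo<?> other = (ResponseVo<?>) obj;
return this.code == other.code
&& Objects.equals(this.message, other.message)
&& Objects.equals(this.data, other.data)
&& Objects.equals(this.timestamp, other.timestamp);
}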
@Override
public String toString() {
return new ToStringBuilder(this)
.append("code", code)
.append("message", message)
.append("data", data)
.append("timestamp", timestamp)
.toString();
}
}
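8. Controller Layer
Create IotDBApiController, which exposes RESTful API endpoints for front ends or other services. For example, the endpoint for querying real-time data looks like this: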
@RestController
@Slf4j
@RequestMapping("/api")
public class IotDBApiController {
//http://localhost:9091/data-core/api/test
@Autowired
IotDBService iotDBService;
@PostMapping(value = "/realtime", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
public ResponseVo<?> realtime(@RequestBody IotDbRequestQueryVo request) {
log.info("Fetching real-time data for points::{}", request);
try {
Asserts.notNull(request, "Request is empty");
Asserts.notNull(request.getDb(), "Database is empty");
Asserts.notNull(request.getDevice(), "Device path is empty");
Asserts.notNull(request.getPoints(), "Point codes are empty");
return ResponseBuilder.buildSuccessResponse(iotDBService.realtime(request.getDb(), request.getDevice(), request.getPoints()));
} catch (Exception e) {
return ResponseBuilder.buildErrorResponse(e.getMessage());
}
}
@PostMapping(value = "/avgValue", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
public ResponseVo<?> avgValue(@RequestBody IotDbRequestQueryVo request) {
log.info("Fetching average values for points::{}", request);
try {
Asserts.notNull(request, "Request is empty");
Asserts.notNull(request.getDb(), "Database is empty");
Asserts.notNull(request.getDevice(), "Device path is empty");
Asserts.notNull(request.getPoints(), "Point codes are empty");
return ResponseBuilder.buildSuccessResponse(iotDBService.avgValue(request.getDevice(), request.getPoints(), request.getStartTime(), request.getEndTime()));
} catch (Exception e) {
return ResponseBuilder.buildErrorResponse(e.getMessage());
}
}
@PostMapping(value = "/statisticsValue", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
public ResponseVo<?> statisticsValue(@RequestBody IotDbRequestQueryVo request) {
log.info("Fetching maximum and minimum values for points::{}", request);
try {
Asserts.notNull(request, "Request is empty");
Asserts.notNull(request.getDevice(), "Device path is empty");
Asserts.notNull(request.getPoints(), "Point codes are empty");
return ResponseBuilder.buildSuccessResponse(iotDBService.statisticsValue(request.getDevice(), request.getPoints(), request.getStartTime(), request.getEndTime()));
} catch (Exception e) {
return ResponseBuilder.buildErrorResponse(e.getMessage());
}
}
@PostMapping(value = "/sumValue", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
public ResponseVo<?> sumValue(@RequestBody IotDbRequestQueryVo request) {
log.info("Fetching summed values for points::{}", request);
try {
Asserts.notNull(request, "Request is empty");
Asserts.notNull(request.getDevice(), "Device path is empty");
Asserts.notNull(request.getPoints(), "Point codes are empty");
return ResponseBuilder.buildSuccessResponse(iotDBService.sumValue(request.getDevice(), request.getPoints(), request.getStartTime(), request.getEndTime()));
} catch (Exception e) {
return ResponseBuilder.buildErrorResponse(e.getMessage());
}
}
@PostMapping(value = "/history", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
public ResponseVo<?> history(@RequestBody IotDbRequestQueryVo request) {
log.info("Fetching history data for points::{}", request);
try {
Asserts.notNull(request, "Request is empty");
Asserts.notNull(request.getDevice(), "Device path is empty");
Asserts.notNull(request.getPoints(), "Point codes are empty");
Asserts.notNull(request.getStartTime(), "Start time is empty");
Asserts.notNull(request.getEndTime(), "End time is empty");
Asserts.notNull(request.getPage(), "Page number is empty");
Asserts.notNull(request.getPageSize(), "Page size is empty");
return ResponseBuilder.buildSuccessResponse(iotDBService.historyData(request.getDevice(), request.getPoints(), request.getPage(), request.getPageSize(), request.getStartTime(), request.getEndTime()));
} catch (Exception e) {
return ResponseBuilder.buildErrorResponse(e.getMessage());
}
}
@PostMapping(value = "/windowAgg", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
public ResponseVo<?> windowAgg(@RequestBody IotDbRequestQueryVo request) {
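log.info("Fetching history data for points, aggregated by time window::{}", request);
try {
Asserts.notNull(request, "Request is empty");
Asserts.notNull(request.getDevice(), "Device path is empty");
Asserts.notNull(request.getPoints(), "Point codes are empty");
Asserts.notNull(request.getStartTime(), "Start time is empty");
Asserts.notNull(request.getEndTime(), "End time is empty");
Asserts.notNull(request.getPage(), "Page number is empty");
Asserts.notNull(request.getPageSize(), "Page size is empty");
// The window size and unit are concatenated into the GROUP BY clause, so both are required
Asserts.notNull(request.getWindowSize(), "Window size is empty");
Asserts.notNull(request.getInterval(), "Window unit is empty");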
return ResponseBuilder.buildSuccessResponse(iotDBService.windowAggHistoryData(request.getDevice(), request.getPoints(), request.getPage(), request.getPageSize(), request.getStartTime(), request.getEndTime(), request.getWindowSize(), request.getInterval()));
} catch (Exception e) {
return ResponseBuilder.buildErrorResponse(e.getMessage());
}
}
@PostMapping(value = "/batchInsert", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
public ResponseVo<?> batchInsert(@RequestBody IotDbInsertBeanDto insertBean) {
// log.info("Batch inserting data::{}", insertBean);
try {
Asserts.notNull(insertBean, "Request is empty");
Asserts.notNull(insertBean.getDeviceId(), "Device path is empty");
Asserts.notNull(insertBean.getPointArr(), "Point codes are empty");
Asserts.notNull(insertBean.getTypes(), "Data types are empty");
// Number of rows inserted successfully
int total = iotDBService.insertData(insertBean.getDeviceId(), insertBean.getPointArr(), insertBean.getTypes(), insertBean.getDataJson());
return ResponseBuilder.buildSuccessResponse(total);
} catch (Exception e) {
return ResponseBuilder.buildErrorResponse(e.getMessage());
}
}
@PostMapping(value = "/batch/realtime", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
public ResponseVo<?> batchRealtime(@RequestBody IotDbBatchRequestQueryVo request) {
log.info("Fetching real-time data for points on multiple devices::{}", request);
try {
Asserts.notNull(request, "Request is empty");
Asserts.notNull(request.getDeviceIds(), "Device paths are empty");
Asserts.notNull(request.getPoints(), "Point codes are empty");
return ResponseBuilder.buildSuccessResponse(iotDBService.batchRealtime(request.getDeviceIds(), request.getPoints()));
} catch (Exception e) {
return ResponseBuilder.buildErrorResponse(e.getMessage());
}
}
@PostMapping(value = "/batch/history", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
public ResponseVo<?> batchHistory(@RequestBody IotDbBatchHistoryRequestQueryVo request) {
log.info("Fetching history data for points on multiple devices::{}", request);
try {
Asserts.notNull(request, "Request is empty");
Asserts.notNull(request.getDeviceIds(), "Device paths are empty");
Asserts.notNull(request.getPoints(), "Point codes are empty");
Asserts.notNull(request.getStartTime(), "Start time is empty");
Asserts.notNull(request.getEndTime(), "End time is empty");
Asserts.notNull(request.getPage(), "Page number is empty");
Asserts.notNull(request.getPageSize(), "Page size is empty");
return ResponseBuilder.buildSuccessResponse(iotDBService.batchHistoryData(request.getDeviceIds(), request.getPoints(), request.getPage(), request.getPageSize(), request.getStartTime(), request.getEndTime()));
} catch (Exception e) {
return ResponseBuilder.buildErrorResponse(e.getMessage());
}
}
@PostMapping(value = "/batch/windowAgg", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
public ResponseVo<?> batchWindowAgg(@RequestBody IotDbBatchHistoryRequestQueryVo request) {
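log.info("Fetching history data for multiple devices and points, aggregated by time window::{}", request);
try {
Asserts.notNull(request, "Request is empty");
Asserts.notNull(request.getDeviceIds(), "Device paths are empty");
Asserts.notNull(request.getPoints(), "Point codes are empty");
Asserts.notNull(request.getStartTime(), "Start time is empty");
Asserts.notNull(request.getEndTime(), "End time is empty");
Asserts.notNull(request.getPage(), "Page number is empty");
Asserts.notNull(request.getPageSize(), "Page size is empty");
// The window size and unit are concatenated into the GROUP BY clause, so both are required
Asserts.notNull(request.getWindowSize(), "Window size is empty");
Asserts.notNull(request.getInterval(), "Window unit is empty");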
return ResponseBuilder.buildSuccessResponse(iotDBService.batchWindowAggHistoryData(request.getDeviceIds(), request.getPoints(), request.getPage(), request.getPageSize(), request.getStartTime(), request.getEndTime(), request.getWindowSize(), request.getInterval()));
} catch (Exception e) {
return ResponseBuilder.buildErrorResponse(e.getMessage());
}
}
}
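To try the `/api/realtime` endpoint from another Java program, a request could be assembled roughly as follows. This is a sketch under several assumptions: it needs Java 11 or later for `java.net.http`, the base URL is taken from the URL in the controller comment and may differ in a real deployment, and the class name, database name, device path, and point names are made up for illustration. The JSON body is built by hand and assumes the values contain no characters that need escaping.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical client-side example; not part of the original project. */
final class RealtimeRequestExample {
    // Assumption: host, port and context path follow the URL in the controller comment.
    static final String BASE_URL = "http://localhost:9091/data-core";

    private RealtimeRequestExample() {
    }

    /** Builds the JSON body matching the db, device and points fields of IotDbRequestQueryVo. */
    static String realtimeBody(String db, String device, List<String> points) {
        String pointArray = points.stream()
                .map(p -> "\"" + p + "\"")
                .collect(Collectors.joining(",", "[", "]"));
        return "{\"db\":\"" + db + "\",\"device\":\"" + device + "\",\"points\":" + pointArray + "}";
    }

    /** Builds (but does not send) a POST request for the real-time endpoint. */
    static HttpRequest realtimeRequest(String db, String device, List<String> points) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/api/realtime"))
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(realtimeBody(db, device, points)))
                .build();
    }

    public static void main(String[] args) {
        List<String> points = Arrays.asList("temperature", "humidity");
        HttpRequest request = realtimeRequest("root.demo", "root.demo.d1", points);
        // Only print the request; sending it requires the service to be running
        System.out.println(request.method() + " " + request.uri());
        System.out.println(realtimeBody("root.demo", "root.demo.d1", points));
    }
}
```

Sending the request is then a matter of passing it to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`; the response body should carry the `code`, `message`, `data`, and `timestamp` fields of `ResponseVo`.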
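9. Summary
With the integration approach described in this article, a Spring Boot application can insert, query, and aggregate time-series data in IoTDB through a session pool. Combining IoTDB's high-performance time-series storage with Spring Boot's flexible architecture gives IoT and industrial-internet applications a solid foundation for data processing.
Author: stay_hungry&&modest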