diff --git a/viewsh-dependencies/pom.xml b/viewsh-dependencies/pom.xml index 56eb78ce..14e760c7 100644 --- a/viewsh-dependencies/pom.xml +++ b/viewsh-dependencies/pom.xml @@ -35,6 +35,7 @@ 8.6.0 5.1.0 3.7.9 + 7.2.0 2.3.5 @@ -305,6 +306,11 @@ taos-jdbcdriver ${taos.version} + + com.influxdb + influxdb-client-java + ${influxdb-client.version} + diff --git a/viewsh-module-iot/viewsh-module-iot-server/pom.xml b/viewsh-module-iot/viewsh-module-iot-server/pom.xml index 3d84f0a9..af21b27a 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/pom.xml +++ b/viewsh-module-iot/viewsh-module-iot-server/pom.xml @@ -69,6 +69,11 @@ com.taosdata.jdbc taos-jdbcdriver + + com.influxdb + influxdb-client-java + true + com.viewsh diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/controller/admin/device/IotDeviceMessageController.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/controller/admin/device/IotDeviceMessageController.java index 489ba5ad..c2155f57 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/controller/admin/device/IotDeviceMessageController.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/controller/admin/device/IotDeviceMessageController.java @@ -10,7 +10,7 @@ import com.viewsh.module.iot.controller.admin.device.vo.message.IotDeviceMessage import com.viewsh.module.iot.controller.admin.device.vo.message.IotDeviceMessageSendReqVO; import com.viewsh.module.iot.core.mq.message.IotDeviceMessage; import com.viewsh.module.iot.dal.dataobject.device.IotDeviceMessageDO; -import com.viewsh.module.iot.dal.tdengine.IotDeviceMessageMapper; + import com.viewsh.module.iot.service.device.IotDeviceService; import com.viewsh.module.iot.service.device.message.IotDeviceMessageService; import com.viewsh.module.iot.service.thingmodel.IotThingModelService; @@ -41,8 +41,6 @@ public class IotDeviceMessageController { private IotDeviceService deviceService; @Resource private IotThingModelService thingModelService; - @Resource - private IotDeviceMessageMapper deviceMessageMapper; @GetMapping("/page") @Operation(summary = "获得设备消息分页") diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tdengine/config/TDengineTableInitRunner.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tdengine/config/TDengineTableInitRunner.java deleted file mode 100644 index d0ac7d7d..00000000 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tdengine/config/TDengineTableInitRunner.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.viewsh.module.iot.framework.tdengine.config; - -import com.viewsh.module.iot.service.device.message.IotDeviceMessageService; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; -import org.springframework.stereotype.Component; - -/** - * TDengine 表初始化的 Configuration - * - * @author alwayssuper - */ -@Component -@RequiredArgsConstructor -@Slf4j -public class TDengineTableInitRunner implements ApplicationRunner { - - private final IotDeviceMessageService deviceMessageService; - - @Override - public void run(ApplicationArguments args) { - try { - // 初始化设备消息表 - deviceMessageService.defineDeviceMessageStable(); - } catch (Exception ex) { - // 初始化失败时打印错误消息并退出系统 - log.error("[run][TDengine初始化设备消息表结构失败,系统无法正常运行,即将退出]", ex); - System.exit(1); - } - } - -} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/HourlyMessageCountDTO.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/HourlyMessageCountDTO.java new file mode 100644 index 00000000..c3fa21ce --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/HourlyMessageCountDTO.java @@ -0,0 +1,35 @@ +package com.viewsh.module.iot.framework.tsdb; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; + +/** + * 按小时统计的设备消息计数 DTO + * + * 替代 {@code Map},解决 TDengine 返回 {@code java.sql.Timestamp} + * 而 CTSDB 返回 {@code java.time.Instant} 导致 Service 层强转 ClassCastException 的问题 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class HourlyMessageCountDTO { + + /** + * 小时时间窗口的起始时间 + */ + private LocalDateTime time; + + /** + * 上行消息数量 + */ + private Integer upstreamCount; + + /** + * 下行消息数量 + */ + private Integer downstreamCount; + +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/IotTsDbDeviceMessageDao.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/IotTsDbDeviceMessageDao.java new file mode 100644 index 00000000..3ac9d2e2 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/IotTsDbDeviceMessageDao.java @@ -0,0 +1,72 @@ +package com.viewsh.module.iot.framework.tsdb; + +import com.viewsh.module.iot.controller.admin.device.vo.message.IotDeviceMessagePageReqVO; +import com.viewsh.module.iot.dal.dataobject.device.IotDeviceMessageDO; +import com.baomidou.mybatisplus.core.metadata.IPage; + +import java.util.Collection; +import java.util.List; + +/** + * 设备消息时序数据访问接口 + * + * 抽象不同时序数据库(TDengine、CTSDB/InfluxDB)的设备消息操作 + */ +public interface IotTsDbDeviceMessageDao { + + /** + * 创建设备消息的 Schema(超级表/Measurement) + */ + void createSchema(); + + /** + * 检查设备消息的 Schema 是否存在 + * + * @return 是否存在 + */ + boolean schemaExists(); + + /** + * 插入设备消息数据 + * + * @param message 设备消息数据 + */ + void insert(IotDeviceMessageDO message); + + /** + * 获得设备消息分页 + * + * @param page 分页参数 + * @param reqVO 查询条件 + * @return 分页结果 + */ + IPage selectPage(IPage page, IotDeviceMessagePageReqVO reqVO); + + /** + * 统计设备消息数量 + * + * @param createTime 创建时间(毫秒时间戳),如果为空,则统计所有消息数量 + * @return 消息数量 + */ + Long selectCountByCreateTime(Long createTime); + + /** + * 按照 requestIds 批量查询消息 + * + * @param deviceId 设备编号 + * @param requestIds 请求编号集合 + * @param reply 是否回复消息 + * @return 消息列表 + */ + List selectListByRequestIdsAndReply(Long deviceId, Collection requestIds, Boolean reply); + + /** + * 按照时间范围(小时),统计设备的消息数量 + * + * @param startTime 开始时间(毫秒时间戳) + * @param endTime 结束时间(毫秒时间戳) + * @return 按小时分组的统计数据 + */ + List selectDeviceMessageCountGroupByDate(Long startTime, Long endTime); + +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/IotTsDbDevicePropertyDao.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/IotTsDbDevicePropertyDao.java new file mode 100644 index 00000000..bebddcc1 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/IotTsDbDevicePropertyDao.java @@ -0,0 +1,61 @@ +package com.viewsh.module.iot.framework.tsdb; + +import com.viewsh.module.iot.controller.admin.device.vo.property.IotDevicePropertyHistoryListReqVO; +import com.viewsh.module.iot.controller.admin.device.vo.property.IotDevicePropertyRespVO; +import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO; + +import java.util.List; +import java.util.Map; + +/** + * 设备属性时序数据访问接口 + * + * 抽象不同时序数据库(TDengine、CTSDB/InfluxDB)的设备属性操作 + */ +public interface IotTsDbDevicePropertyDao { + + /** + * 获取产品属性表的字段列表 + * + * @param productId 产品编号 + * @return 字段列表 + */ + List getTableFields(Long productId); + + /** + * 创建产品属性表 + * + * @param productId 产品编号 + * @param fields 字段列表 + */ + void createPropertyTable(Long productId, List fields); + + /** + * 变更产品属性表结构(对比旧字段和新字段,自动执行 add/modify/drop) + * + * @param productId 产品编号 + * @param oldFields 旧字段列表 + * @param newFields 新字段列表 + */ + void alterPropertyTable(Long productId, List oldFields, List newFields); + + /** + * 插入设备属性数据 + * + * @param device 设备 + * @param properties 属性数据 + * @param reportTime 上报时间(毫秒时间戳) + */ + void insert(IotDeviceDO device, Map properties, Long reportTime); + + /** + * 查询设备属性历史数据 + * + * @param reqVO 查询条件 + * @param productId 产品编号,CTSDB 实现需要此参数精确定位 measurement, + * TDengine 实现可忽略(通过子表名定位) + * @return 历史数据列表 + */ + List selectHistory(IotDevicePropertyHistoryListReqVO reqVO, Long productId); + +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/TsDbTableField.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/TsDbTableField.java new file mode 100644 index 00000000..d96bcdf5 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/TsDbTableField.java @@ -0,0 +1,57 @@ +package com.viewsh.module.iot.framework.tsdb; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 时序数据库通用字段定义 + * + * 用于抽象不同时序数据库(TDengine、CTSDB/InfluxDB)的表字段描述 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class TsDbTableField { + + // ========== 通用类型常量 ========== + + public static final String TYPE_TINYINT = "TINYINT"; + public static final String TYPE_INT = "INT"; + public static final String TYPE_FLOAT = "FLOAT"; + public static final String TYPE_DOUBLE = "DOUBLE"; + public static final String TYPE_BOOL = "BOOL"; + public static final String TYPE_STRING = "STRING"; + public static final String TYPE_TIMESTAMP = "TIMESTAMP"; + + /** + * 字段长度 - STRING 默认长度 + */ + public static final int LENGTH_DEFAULT_STRING = 1024; + + /** + * 字段名 + */ + private String field; + + /** + * 字段类型(通用类型) + */ + private String type; + + /** + * 字段长度 + */ + private Integer length; + + /** + * 是否为 TAG 字段 + */ + private boolean tag; + + public TsDbTableField(String field, String type) { + this.field = field; + this.type = type; + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/config/TsDbAutoConfiguration.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/config/TsDbAutoConfiguration.java new file mode 100644 index 00000000..cc0f831e --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/config/TsDbAutoConfiguration.java @@ -0,0 +1,84 @@ +package com.viewsh.module.iot.framework.tsdb.config; + +import com.viewsh.module.iot.dal.tdengine.IotDeviceMessageMapper; +import com.viewsh.module.iot.dal.tdengine.IotDevicePropertyMapper; +import com.viewsh.module.iot.framework.tsdb.IotTsDbDeviceMessageDao; +import com.viewsh.module.iot.framework.tsdb.IotTsDbDevicePropertyDao; +import com.viewsh.module.iot.framework.tsdb.tdengine.TDengineDeviceMessageDaoImpl; +import com.viewsh.module.iot.framework.tsdb.tdengine.TDengineDevicePropertyDaoImpl; +import jakarta.annotation.PostConstruct; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * 时序数据库自动装配 + * + * 根据配置 viewsh.iot.tsdb.type 选择 TDengine 或 CTSDB 实现 + */ +@Configuration +@Slf4j +public class TsDbAutoConfiguration { + + /** + * 配置校验:确保 viewsh.iot.tsdb.type 的值合法,避免拼写错误导致两种实现都未生效 + */ + @Value("${viewsh.iot.tsdb.type:tdengine}") + private String tsDbType; + + @PostConstruct + public void validateTsDbType() { + if (!"tdengine".equalsIgnoreCase(tsDbType) && !"ctsdb".equalsIgnoreCase(tsDbType)) { + throw new IllegalArgumentException( + String.format("无效的时序数据库类型: '%s',仅支持 'tdengine' 或 'ctsdb'," + + "请检查配置项 viewsh.iot.tsdb.type", tsDbType)); + } + log.info("[validateTsDbType][时序数据库类型: {}]", tsDbType); + } + + // ========== TDengine 实现 ========== + + @Bean + @ConditionalOnProperty(name = "viewsh.iot.tsdb.type", havingValue = "tdengine", matchIfMissing = true) + public IotTsDbDeviceMessageDao tdengineDeviceMessageDao(IotDeviceMessageMapper mapper) { + return new TDengineDeviceMessageDaoImpl(mapper); + } + + @Bean + @ConditionalOnProperty(name = "viewsh.iot.tsdb.type", havingValue = "tdengine", matchIfMissing = true) + public IotTsDbDevicePropertyDao tdengineDevicePropertyDao(IotDevicePropertyMapper mapper) { + return new TDengineDevicePropertyDaoImpl(mapper); + } + + // ========== CTSDB (InfluxDB) 实现 ========== + // 使用内部类 + @ConditionalOnClass 保护,避免 classpath 无 InfluxDB 依赖时 NoClassDefFoundError + + @Configuration + @ConditionalOnClass(name = "com.influxdb.client.InfluxDBClient") + @ConditionalOnProperty(name = "viewsh.iot.tsdb.type", havingValue = "ctsdb") + static class CtsdbDaoConfiguration { + + @Bean + public IotTsDbDeviceMessageDao ctsdbDeviceMessageDao( + com.influxdb.client.InfluxDBClient influxDBClient, + com.viewsh.module.iot.framework.tsdb.ctsdb.CtsdbProperties properties, + com.influxdb.client.WriteApi ctsdbWriteApi) { + return new com.viewsh.module.iot.framework.tsdb.ctsdb.CtsdbDeviceMessageDaoImpl( + influxDBClient, properties, ctsdbWriteApi); + } + + @Bean + public IotTsDbDevicePropertyDao ctsdbDevicePropertyDao( + com.influxdb.client.InfluxDBClient influxDBClient, + com.viewsh.module.iot.framework.tsdb.ctsdb.CtsdbProperties properties, + com.influxdb.client.WriteApi ctsdbWriteApi) { + return new com.viewsh.module.iot.framework.tsdb.ctsdb.CtsdbDevicePropertyDaoImpl( + influxDBClient, properties, ctsdbWriteApi); + } + + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/config/TsDbTableInitRunner.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/config/TsDbTableInitRunner.java new file mode 100644 index 00000000..7141afbd --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/config/TsDbTableInitRunner.java @@ -0,0 +1,55 @@ +package com.viewsh.module.iot.framework.tsdb.config; + +import com.viewsh.module.iot.service.device.message.IotDeviceMessageService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; + +/** + * 时序数据库表初始化 Runner + * + * 替代原 TDengineTableInitRunner,支持 TDengine 和 CTSDB 两种时序数据库。 + * 初始化失败时重试最多 3 次(间隔 5 秒),全部失败后抛出异常让 Spring 容器启动失败, + * 而非直接 System.exit(1),以便容器编排工具(如 K8s)能正确感知启动失败并触发重启。 + */ +@Component +@RequiredArgsConstructor +@Slf4j +public class TsDbTableInitRunner implements ApplicationRunner { + + private static final int MAX_RETRIES = 3; + private static final long RETRY_INTERVAL_MS = 5000; + + private final IotDeviceMessageService deviceMessageService; + + @Override + public void run(ApplicationArguments args) { + Exception lastException = null; + for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) { + try { + // 初始化设备消息表(TDengine 会创建超级表,CTSDB 会跳过) + deviceMessageService.defineDeviceMessageStable(); + log.info("[run][时序数据库初始化设备消息表结构成功]"); + return; + } catch (Exception ex) { + lastException = ex; + log.warn("[run][时序数据库初始化失败,第 {}/{} 次尝试]", attempt, MAX_RETRIES, ex); + if (attempt < MAX_RETRIES) { + try { + Thread.sleep(RETRY_INTERVAL_MS); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + } + } + // 重试全部失败,抛出异常让 Spring 容器启动失败 + throw new IllegalStateException( + "时序数据库初始化设备消息表结构失败(已重试 " + MAX_RETRIES + " 次),系统无法正常运行", + lastException); + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/ctsdb/CtsdbClientConfig.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/ctsdb/CtsdbClientConfig.java new file mode 100644 index 00000000..dc0eab57 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/ctsdb/CtsdbClientConfig.java @@ -0,0 +1,55 @@ +package com.viewsh.module.iot.framework.tsdb.ctsdb; + +import com.influxdb.client.InfluxDBClient; +import com.influxdb.client.InfluxDBClientFactory; +import com.influxdb.client.WriteApi; +import com.influxdb.client.WriteOptions; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * CTSDB (InfluxDB) 客户端配置 + * + * 添加 @ConditionalOnClass 保护,避免 classpath 无 InfluxDB 依赖时类加载失败 + */ +@Configuration +@ConditionalOnClass(name = "com.influxdb.client.InfluxDBClient") +@ConditionalOnProperty(name = "viewsh.iot.tsdb.type", havingValue = "ctsdb") +@EnableConfigurationProperties(CtsdbProperties.class) +public class CtsdbClientConfig { + + @Bean(destroyMethod = "close") + public InfluxDBClient influxDBClient(CtsdbProperties properties) { + return InfluxDBClientFactory.create( + properties.getUrl(), + properties.getToken().toCharArray(), + properties.getOrg(), + properties.getBucket() + ); + } + + /** + * 异步批量写入 API,适合 IoT 高频写入场景 + * + * - batchSize: 累计 1000 个 Point 后批量刷写 + * - flushInterval: 每 1 秒自动刷写一次(即使未达 batchSize) + * - retryInterval: 写入失败后 2 秒重试 + * - maxRetries: 最多重试 3 次 + * + * 注意:WriteApi 实现了 AutoCloseable,destroyMethod="close" 确保应用关闭时数据全部刷写 + */ + @Bean(destroyMethod = "close") + public WriteApi ctsdbWriteApi(InfluxDBClient influxDBClient) { + WriteOptions writeOptions = WriteOptions.builder() + .batchSize(1000) + .flushInterval(1000) + .retryInterval(2000) + .maxRetries(3) + .build(); + return influxDBClient.makeWriteApi(writeOptions); + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/ctsdb/CtsdbDeviceMessageDaoImpl.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/ctsdb/CtsdbDeviceMessageDaoImpl.java new file mode 100644 index 00000000..c12afcd6 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/ctsdb/CtsdbDeviceMessageDaoImpl.java @@ -0,0 +1,445 @@ +package com.viewsh.module.iot.framework.tsdb.ctsdb; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.date.LocalDateTimeUtil; +import cn.hutool.core.util.StrUtil; +import com.influxdb.client.InfluxDBClient; +import com.influxdb.client.QueryApi; +import com.influxdb.client.WriteApi; +import com.influxdb.client.domain.WritePrecision; +import com.influxdb.client.write.Point; +import com.influxdb.query.FluxRecord; +import com.influxdb.query.FluxTable; +import com.viewsh.framework.tenant.core.context.TenantContextHolder; +import com.viewsh.module.iot.controller.admin.device.vo.message.IotDeviceMessagePageReqVO; +import com.viewsh.module.iot.dal.dataobject.device.IotDeviceMessageDO; +import com.viewsh.module.iot.framework.tsdb.HourlyMessageCountDTO; +import com.viewsh.module.iot.framework.tsdb.IotTsDbDeviceMessageDao; +import com.baomidou.mybatisplus.core.metadata.IPage; +import lombok.extern.slf4j.Slf4j; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.*; + +import static com.viewsh.module.iot.framework.tsdb.ctsdb.FluxQuerySanitizer.escapeStringLiteral; + +/** + * CTSDB (InfluxDB) 实现的设备消息时序 DAO + * + * 变更说明: + * - Flux 查询字符串拼接全部使用 FluxQuerySanitizer 转义,防止注入 + * - insert() 改用异步批量 WriteApi,提升高频写入性能 + * - selectPage()/countMessages() 默认最近 30 天,避免 range(start:0) 全表扫描 + * - 查询方法添加 tenant_id 过滤,保障多租户数据隔离 + * - selectDeviceMessageCountGroupByDate() 返回 HourlyMessageCountDTO,消除类型转换问题 + */ +@Slf4j +public class CtsdbDeviceMessageDaoImpl implements IotTsDbDeviceMessageDao { + + private static final String MEASUREMENT = "device_message"; + + private final InfluxDBClient influxDBClient; + private final CtsdbProperties properties; + /** + * 异步批量写入 API,由 CtsdbClientConfig 创建。 + * 相比 WriteApiBlocking,支持自动批量刷写和失败重试,适合 IoT 高频写入场景。 + * 可为 null(向下兼容旧构造函数),此时回退到同步写入。 + */ + private final WriteApi writeApi; + + /** + * 向下兼容构造函数(不使用异步写入) + */ + public CtsdbDeviceMessageDaoImpl(InfluxDBClient influxDBClient, CtsdbProperties properties) { + this(influxDBClient, properties, null); + } + + public CtsdbDeviceMessageDaoImpl(InfluxDBClient influxDBClient, CtsdbProperties properties, WriteApi writeApi) { + this.influxDBClient = influxDBClient; + this.properties = properties; + this.writeApi = writeApi; + } + + @Override + public void createSchema() { + // InfluxDB 是 schema-on-write,无需预建表 + log.info("[createSchema][CTSDB/InfluxDB schema-on-write,跳过建表]"); + } + + @Override + public boolean schemaExists() { + try { + String flux = String.format( + "import \"influxdata/influxdb/schema\"\n" + + "schema.measurements(bucket: \"%s\")", + escapeStringLiteral(properties.getBucket())); + QueryApi queryApi = influxDBClient.getQueryApi(); + List tables = queryApi.query(flux); + if (CollUtil.isEmpty(tables)) { + return false; + } + return tables.get(0).getRecords().stream() + .anyMatch(r -> MEASUREMENT.equals(r.getValueByKey("_value"))); + } catch (Exception e) { + log.warn("[schemaExists][查询 measurement 失败,视为不存在]", e); + return false; + } + } + + @Override + public void insert(IotDeviceMessageDO message) { + Point point = Point.measurement(MEASUREMENT) + .addTag("device_id", String.valueOf(message.getDeviceId())) + .time(message.getTs() != null ? message.getTs() : System.currentTimeMillis(), + WritePrecision.MS); + + // 添加 fields + if (message.getId() != null) { + point.addField("id", message.getId()); + } + if (message.getReportTime() != null) { + point.addField("report_time", message.getReportTime()); + } + if (message.getTenantId() != null) { + point.addField("tenant_id", message.getTenantId()); + } + if (message.getServerId() != null) { + point.addField("server_id", message.getServerId()); + } + if (message.getUpstream() != null) { + point.addField("upstream", message.getUpstream()); + } + if (message.getReply() != null) { + point.addField("reply", message.getReply()); + } + if (message.getIdentifier() != null) { + point.addField("identifier", message.getIdentifier()); + } + if (message.getRequestId() != null) { + point.addField("request_id", message.getRequestId()); + } + if (message.getMethod() != null) { + point.addField("method", message.getMethod()); + } + if (message.getParams() != null) { + point.addField("params", String.valueOf(message.getParams())); + } + if (message.getData() != null) { + point.addField("data", String.valueOf(message.getData())); + } + if (message.getCode() != null) { + point.addField("code", message.getCode()); + } + if (message.getMsg() != null) { + point.addField("msg", message.getMsg()); + } + + // 优先使用异步批量写入 API,回退到同步写入 + if (writeApi != null) { + writeApi.writePoint(point); + } else { + influxDBClient.getWriteApiBlocking().writePoint(point); + } + } + + @Override + public IPage selectPage(IPage page, IotDeviceMessagePageReqVO reqVO) { + StringBuilder flux = new StringBuilder(); + flux.append(String.format("from(bucket: \"%s\")\n", escapeStringLiteral(properties.getBucket()))); + + // 修复全表扫描:优先使用调用方传入的时间范围,否则默认最近 30 天 + appendTimeRange(flux, reqVO.getTimes()); + + flux.append(String.format(" |> filter(fn: (r) => r._measurement == \"%s\")\n", MEASUREMENT)); + flux.append(String.format(" |> filter(fn: (r) => r.device_id == \"%s\")\n", + Long.toString(reqVO.getDeviceId()))); + + // pivot 使每行包含所有 field + flux.append(" |> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")\n"); + + // 多租户过滤:确保查询结果只返回当前租户的数据 + appendTenantFilter(flux); + + // 可选过滤条件(使用转义防止注入) + if (StrUtil.isNotEmpty(reqVO.getMethod())) { + flux.append(String.format(" |> filter(fn: (r) => r.method == \"%s\")\n", + escapeStringLiteral(reqVO.getMethod()))); + } + if (reqVO.getUpstream() != null) { + flux.append(String.format(" |> filter(fn: (r) => r.upstream == %s)\n", reqVO.getUpstream())); + } + if (reqVO.getReply() != null) { + flux.append(String.format(" |> filter(fn: (r) => r.reply == %s)\n", reqVO.getReply())); + } + if (StrUtil.isNotEmpty(reqVO.getIdentifier())) { + flux.append(String.format(" |> filter(fn: (r) => r.identifier == \"%s\")\n", + escapeStringLiteral(reqVO.getIdentifier()))); + } + + // 排序和分页 + flux.append(" |> sort(columns: [\"_time\"], desc: true)\n"); + long offset = (page.getCurrent() - 1) * page.getSize(); + flux.append(String.format(" |> limit(n: %d, offset: %d)\n", page.getSize(), offset)); + + try { + QueryApi queryApi = influxDBClient.getQueryApi(); + List tables = queryApi.query(flux.toString()); + List records = new ArrayList<>(); + for (FluxTable table : tables) { + for (FluxRecord record : table.getRecords()) { + records.add(mapToDeviceMessageDO(record)); + } + } + page.setRecords(records); + page.setTotal(countMessages(reqVO)); + } catch (Exception e) { + log.error("[selectPage][查询设备消息分页失败]", e); + page.setRecords(Collections.emptyList()); + page.setTotal(0); + } + return page; + } + + private long countMessages(IotDeviceMessagePageReqVO reqVO) { + StringBuilder flux = new StringBuilder(); + flux.append(String.format("from(bucket: \"%s\")\n", escapeStringLiteral(properties.getBucket()))); + + // count 查询也使用时间范围,避免全表扫描 + appendTimeRange(flux, reqVO.getTimes()); + + flux.append(String.format(" |> filter(fn: (r) => r._measurement == \"%s\")\n", MEASUREMENT)); + flux.append(String.format(" |> filter(fn: (r) => r.device_id == \"%s\")\n", + Long.toString(reqVO.getDeviceId()))); + flux.append(" |> filter(fn: (r) => r._field == \"id\")\n"); + flux.append(" |> count()\n"); + + try { + QueryApi queryApi = influxDBClient.getQueryApi(); + List tables = queryApi.query(flux.toString()); + if (CollUtil.isNotEmpty(tables) && CollUtil.isNotEmpty(tables.get(0).getRecords())) { + Object value = tables.get(0).getRecords().get(0).getValue(); + if (value instanceof Number) { + return ((Number) value).longValue(); + } + } + } catch (Exception e) { + log.warn("[countMessages][统计消息数量失败]", e); + } + return 0; + } + + @Override + public Long selectCountByCreateTime(Long createTime) { + StringBuilder flux = new StringBuilder(); + flux.append(String.format("from(bucket: \"%s\")\n", escapeStringLiteral(properties.getBucket()))); + if (createTime != null) { + flux.append(String.format(" |> range(start: %s)\n", + Instant.ofEpochMilli(createTime).toString())); + } else { + flux.append(" |> range(start: 0)\n"); + } + flux.append(String.format(" |> filter(fn: (r) => r._measurement == \"%s\")\n", MEASUREMENT)); + flux.append(" |> filter(fn: (r) => r._field == \"id\")\n"); + flux.append(" |> count()\n"); + + try { + QueryApi queryApi = influxDBClient.getQueryApi(); + List tables = queryApi.query(flux.toString()); + if (CollUtil.isNotEmpty(tables) && CollUtil.isNotEmpty(tables.get(0).getRecords())) { + Object value = tables.get(0).getRecords().get(0).getValue(); + if (value instanceof Number) { + return ((Number) value).longValue(); + } + } + } catch (Exception e) { + log.error("[selectCountByCreateTime][统计消息数量失败]", e); + } + return 0L; + } + + @Override + public List selectListByRequestIdsAndReply(Long deviceId, Collection requestIds, + Boolean reply) { + if (CollUtil.isEmpty(requestIds)) { + return Collections.emptyList(); + } + + // 构建 request_id OR 过滤表达式(对每个 requestId 转义防止注入) + List idList = requestIds.stream().map(String::valueOf).toList(); + StringBuilder filterExpr = new StringBuilder(); + for (int i = 0; i < idList.size(); i++) { + if (i > 0) { + filterExpr.append(" or "); + } + filterExpr.append(String.format("r.request_id == \"%s\"", escapeStringLiteral(idList.get(i)))); + } + + StringBuilder flux = new StringBuilder(); + flux.append(String.format("from(bucket: \"%s\")\n", escapeStringLiteral(properties.getBucket()))); + flux.append(" |> range(start: -30d)\n"); + flux.append(String.format(" |> filter(fn: (r) => r._measurement == \"%s\")\n", MEASUREMENT)); + flux.append(String.format(" |> filter(fn: (r) => r.device_id == \"%s\")\n", + Long.toString(deviceId))); + flux.append(" |> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")\n"); + flux.append(String.format(" |> filter(fn: (r) => %s)\n", filterExpr)); + if (reply != null) { + flux.append(String.format(" |> filter(fn: (r) => r.reply == %s)\n", reply)); + } + + try { + QueryApi queryApi = influxDBClient.getQueryApi(); + List tables = queryApi.query(flux.toString()); + List result = new ArrayList<>(); + for (FluxTable table : tables) { + for (FluxRecord record : table.getRecords()) { + result.add(mapToDeviceMessageDO(record)); + } + } + return result; + } catch (Exception e) { + log.error("[selectListByRequestIdsAndReply][查询消息失败]", e); + return Collections.emptyList(); + } + } + + @Override + public List selectDeviceMessageCountGroupByDate(Long startTime, Long endTime) { + List result = new ArrayList<>(); + + // 分别查询 upstream 和 downstream 的数量,按小时聚合 + Map upstreamCounts = queryHourlyCount(startTime, endTime, true); + Map downstreamCounts = queryHourlyCount(startTime, endTime, false); + + // 合并结果,统一转换为 LocalDateTime 消除 TDengine/CTSDB 返回类型差异 + Set allTimes = new TreeSet<>(); + allTimes.addAll(upstreamCounts.keySet()); + allTimes.addAll(downstreamCounts.keySet()); + + for (Instant time : allTimes) { + LocalDateTime localTime = LocalDateTime.ofInstant(time, ZoneId.systemDefault()); + result.add(new HourlyMessageCountDTO( + localTime, + upstreamCounts.getOrDefault(time, 0L).intValue(), + downstreamCounts.getOrDefault(time, 0L).intValue())); + } + return result; + } + + private Map queryHourlyCount(Long startTime, Long endTime, boolean upstream) { + Map counts = new LinkedHashMap<>(); + StringBuilder flux = new StringBuilder(); + flux.append(String.format("from(bucket: \"%s\")\n", escapeStringLiteral(properties.getBucket()))); + flux.append(String.format(" |> range(start: %s, stop: %s)\n", + Instant.ofEpochMilli(startTime).toString(), + Instant.ofEpochMilli(endTime).toString())); + flux.append(String.format(" |> filter(fn: (r) => r._measurement == \"%s\")\n", MEASUREMENT)); + flux.append(String.format(" |> filter(fn: (r) => r._field == \"upstream\" and r._value == %s)\n", upstream)); + flux.append(" |> window(every: 1h)\n"); + flux.append(" |> count()\n"); + flux.append(" |> duplicate(column: \"_start\", as: \"_time\")\n"); + flux.append(" |> window(every: inf)\n"); + + try { + QueryApi queryApi = influxDBClient.getQueryApi(); + List tables = queryApi.query(flux.toString()); + for (FluxTable table : tables) { + for (FluxRecord record : table.getRecords()) { + Instant time = record.getTime(); + Object value = record.getValue(); + if (time != null && value instanceof Number) { + counts.put(time, ((Number) value).longValue()); + } + } + } + } catch (Exception e) { + log.error("[queryHourlyCount][按小时统计消息数量失败, upstream={}]", upstream, e); + } + return counts; + } + + // ========== 辅助方法 ========== + + /** + * 追加时间范围过滤,优先使用传入的时间范围,否则默认最近 30 天 + */ + private void appendTimeRange(StringBuilder flux, java.time.LocalDateTime[] times) { + if (times != null && times.length >= 2 && times[0] != null && times[1] != null) { + long start = LocalDateTimeUtil.toEpochMilli(times[0]); + long stop = LocalDateTimeUtil.toEpochMilli(times[1]); + flux.append(String.format(" |> range(start: %s, stop: %s)\n", + Instant.ofEpochMilli(start).toString(), + Instant.ofEpochMilli(stop).toString())); + } else { + flux.append(" |> range(start: -30d)\n"); + } + } + + /** + * 追加多租户过滤条件 + */ + private void appendTenantFilter(StringBuilder flux) { + Long tenantId = TenantContextHolder.getTenantId(); + if (tenantId != null) { + flux.append(String.format(" |> filter(fn: (r) => r.tenant_id == %d)\n", tenantId)); + } + } + + // ========== 结果映射 ========== + + private IotDeviceMessageDO mapToDeviceMessageDO(FluxRecord record) { + IotDeviceMessageDO message = new IotDeviceMessageDO(); + message.setId(getStringValue(record, "id")); + message.setTs(record.getTime() != null ? record.getTime().toEpochMilli() : null); + message.setReportTime(getLongValue(record, "report_time")); + message.setDeviceId(getLongValue(record, "device_id")); + message.setTenantId(getLongValue(record, "tenant_id")); + message.setServerId(getStringValue(record, "server_id")); + message.setUpstream(getBooleanValue(record, "upstream")); + message.setReply(getBooleanValue(record, "reply")); + message.setIdentifier(getStringValue(record, "identifier")); + message.setRequestId(getStringValue(record, "request_id")); + message.setMethod(getStringValue(record, "method")); + message.setParams(getStringValue(record, "params")); + message.setData(getStringValue(record, "data")); + message.setCode(getIntValue(record, "code")); + message.setMsg(getStringValue(record, "msg")); + return message; + } + + private String getStringValue(FluxRecord record, String key) { + Object value = record.getValueByKey(key); + return value != null ? String.valueOf(value) : null; + } + + private Long getLongValue(FluxRecord record, String key) { + Object value = record.getValueByKey(key); + if (value instanceof Number) { + return ((Number) value).longValue(); + } + if (value instanceof String str) { + try { + return Long.parseLong(str); + } catch (NumberFormatException ignored) {} + } + return null; + } + + private Integer getIntValue(FluxRecord record, String key) { + Long value = getLongValue(record, key); + return value != null ? value.intValue() : null; + } + + private Boolean getBooleanValue(FluxRecord record, String key) { + Object value = record.getValueByKey(key); + if (value instanceof Boolean) { + return (Boolean) value; + } + if (value instanceof String str) { + return Boolean.parseBoolean(str); + } + return null; + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/ctsdb/CtsdbDevicePropertyDaoImpl.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/ctsdb/CtsdbDevicePropertyDaoImpl.java new file mode 100644 index 00000000..d41724c0 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/ctsdb/CtsdbDevicePropertyDaoImpl.java @@ -0,0 +1,208 @@ +package com.viewsh.module.iot.framework.tsdb.ctsdb; + +import cn.hutool.core.date.LocalDateTimeUtil; +import cn.hutool.core.util.StrUtil; +import com.influxdb.client.InfluxDBClient; +import com.influxdb.client.QueryApi; +import com.influxdb.client.WriteApi; +import com.influxdb.client.domain.WritePrecision; +import com.influxdb.client.write.Point; +import com.influxdb.query.FluxRecord; +import com.influxdb.query.FluxTable; +import com.viewsh.module.iot.controller.admin.device.vo.property.IotDevicePropertyHistoryListReqVO; +import com.viewsh.module.iot.controller.admin.device.vo.property.IotDevicePropertyRespVO; +import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO; +import com.viewsh.module.iot.framework.tsdb.IotTsDbDevicePropertyDao; +import com.viewsh.module.iot.framework.tsdb.TsDbTableField; +import lombok.extern.slf4j.Slf4j; + +import java.time.Instant; +import java.util.*; + +import static com.viewsh.module.iot.framework.tsdb.ctsdb.FluxQuerySanitizer.escapeStringLiteral; + +/** + * CTSDB (InfluxDB) 实现的设备属性时序 DAO + * + * 变更说明: + * - Flux 查询字符串拼接全部使用 FluxQuerySanitizer 转义,防止注入 + * - insert() 改用异步批量 WriteApi + * - selectHistory() 新增 productId 参数,精确匹配 measurement 替代正则全扫描 + */ +@Slf4j +public class CtsdbDevicePropertyDaoImpl implements IotTsDbDevicePropertyDao { + + private static final String MEASUREMENT_PREFIX = "product_property_"; + + private final InfluxDBClient influxDBClient; + private final CtsdbProperties properties; + private final WriteApi writeApi; + + /** + * 向下兼容构造函数(不使用异步写入) + */ + public CtsdbDevicePropertyDaoImpl(InfluxDBClient influxDBClient, CtsdbProperties properties) { + this(influxDBClient, properties, null); + } + + public CtsdbDevicePropertyDaoImpl(InfluxDBClient influxDBClient, CtsdbProperties properties, WriteApi writeApi) { + this.influxDBClient = influxDBClient; + this.properties = properties; + this.writeApi = writeApi; + } + + @Override + public List getTableFields(Long productId) { + String measurement = MEASUREMENT_PREFIX + productId; + // 使用 Flux 查询 field keys(bucket 和 measurement 名转义) + String flux = String.format( + "import \"influxdata/influxdb/schema\"\n" + + "schema.fieldKeys(bucket: \"%s\", measurement: \"%s\")", + escapeStringLiteral(this.properties.getBucket()), + escapeStringLiteral(measurement)); + + List fields = new ArrayList<>(); + try { + QueryApi queryApi = influxDBClient.getQueryApi(); + List tables = queryApi.query(flux); + for (FluxTable table : tables) { + for (FluxRecord record : table.getRecords()) { + String fieldName = (String) record.getValueByKey("_value"); + if (fieldName != null && !"report_time".equals(fieldName)) { + TsDbTableField field = new TsDbTableField(); + field.setField(fieldName); + field.setType(TsDbTableField.TYPE_STRING); + field.setTag(false); + fields.add(field); + } + } + } + // 添加 device_id tag + TsDbTableField deviceIdTag = new TsDbTableField(); + deviceIdTag.setField("device_id"); + deviceIdTag.setType(TsDbTableField.TYPE_INT); + deviceIdTag.setTag(true); + fields.add(deviceIdTag); + } catch (Exception e) { + if (e.getMessage() != null && e.getMessage().contains("not found")) { + log.debug("[getTableFields][measurement {} 不存在]", measurement); + return Collections.emptyList(); + } + throw new RuntimeException("Table does not exist", e); + } + return fields; + } + + @Override + public void createPropertyTable(Long productId, List fields) { + // InfluxDB 是 schema-on-write,无需预建表 + log.info("[createPropertyTable][CTSDB/InfluxDB schema-on-write,产品({}) 跳过建表]", productId); + } + + @Override + public void alterPropertyTable(Long productId, List oldFields, List newFields) { + // InfluxDB 是 schema-on-write,无需 ALTER TABLE + log.info("[alterPropertyTable][CTSDB/InfluxDB schema-on-write,产品({}) 跳过表结构变更]", productId); + } + + @Override + public void insert(IotDeviceDO device, Map properties, Long reportTime) { + String measurement = MEASUREMENT_PREFIX + device.getProductId(); + Point point = Point.measurement(measurement) + .addTag("device_id", String.valueOf(device.getId())) + .time(System.currentTimeMillis(), WritePrecision.MS); + + // report_time + if (reportTime != null) { + point.addField("report_time", reportTime); + } + + // 动态属性 + properties.forEach((key, value) -> { + String fieldName = StrUtil.toUnderlineCase(key); + if (value instanceof Integer intVal) { + point.addField(fieldName, intVal); + } else if (value instanceof Long longVal) { + point.addField(fieldName, longVal); + } else if (value instanceof Float floatVal) { + point.addField(fieldName, floatVal); + } else if (value instanceof Double doubleVal) { + point.addField(fieldName, doubleVal); + } else if (value instanceof Boolean boolVal) { + point.addField(fieldName, boolVal); + } else if (value != null) { + point.addField(fieldName, String.valueOf(value)); + } + }); + + // 优先使用异步批量写入 API,回退到同步写入 + if (writeApi != null) { + writeApi.writePoint(point); + } else { + influxDBClient.getWriteApiBlocking().writePoint(point); + } + } + + @Override + public List selectHistory(IotDevicePropertyHistoryListReqVO reqVO, Long productId) { + String fieldName = StrUtil.toUnderlineCase(reqVO.getIdentifier()); + + StringBuilder flux = new StringBuilder(); + flux.append(String.format("from(bucket: \"%s\")\n", escapeStringLiteral(this.properties.getBucket()))); + + // 时间范围 + if (reqVO.getTimes() != null && reqVO.getTimes().length >= 2 + && reqVO.getTimes()[0] != null && reqVO.getTimes()[1] != null) { + long start = LocalDateTimeUtil.toEpochMilli(reqVO.getTimes()[0]); + long stop = LocalDateTimeUtil.toEpochMilli(reqVO.getTimes()[1]); + flux.append(String.format(" |> range(start: %s, stop: %s)\n", + Instant.ofEpochMilli(start).toString(), + Instant.ofEpochMilli(stop).toString())); + } else { + flux.append(" |> range(start: -30d)\n"); + } + + // 精确匹配 measurement(通过 productId),替代原来的正则全扫描 + if (productId != null) { + String measurement = MEASUREMENT_PREFIX + productId; + flux.append(String.format(" |> filter(fn: (r) => r._measurement == \"%s\")\n", + escapeStringLiteral(measurement))); + } else { + // 兜底:如果 productId 未传入,使用前缀正则匹配(性能较差) + log.warn("[selectHistory][productId 为空,将使用正则扫描所有 measurement,性能可能较差]"); + flux.append(String.format(" |> filter(fn: (r) => r._measurement =~ /^%s/)\n", + FluxQuerySanitizer.escapeRegexLiteral(MEASUREMENT_PREFIX))); + } + + flux.append(String.format(" |> filter(fn: (r) => r.device_id == \"%s\")\n", + Long.toString(reqVO.getDeviceId()))); + flux.append(String.format(" |> filter(fn: (r) => r._field == \"%s\")\n", + escapeStringLiteral(fieldName))); + flux.append(" |> sort(columns: [\"_time\"], desc: true)\n"); + + List result = new ArrayList<>(); + try { + QueryApi queryApi = influxDBClient.getQueryApi(); + List tables = queryApi.query(flux.toString()); + for (FluxTable table : tables) { + for (FluxRecord record : table.getRecords()) { + IotDevicePropertyRespVO vo = new IotDevicePropertyRespVO(); + vo.setIdentifier(reqVO.getIdentifier()); + vo.setValue(record.getValue()); + if (record.getTime() != null) { + vo.setUpdateTime(record.getTime().toEpochMilli()); + } + result.add(vo); + } + } + } catch (Exception e) { + if (e.getMessage() != null && e.getMessage().contains("not found")) { + log.debug("[selectHistory][measurement 不存在]"); + return Collections.emptyList(); + } + throw new RuntimeException("Table does not exist", e); + } + return result; + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/ctsdb/CtsdbProperties.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/ctsdb/CtsdbProperties.java new file mode 100644 index 00000000..8c033dda --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/ctsdb/CtsdbProperties.java @@ -0,0 +1,35 @@ +package com.viewsh.module.iot.framework.tsdb.ctsdb; + +import lombok.Data; +import lombok.ToString; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * CTSDB (InfluxDB) 配置属性 + */ +@Data +@ConfigurationProperties(prefix = "viewsh.iot.tsdb.ctsdb") +public class CtsdbProperties { + + /** + * InfluxDB HTTP API 地址 + */ + private String url = "http://localhost:8086"; + + /** + * 认证 Token(排除在 toString 输出之外,避免日志泄漏) + */ + @ToString.Exclude + private String token = ""; + + /** + * 组织名称 + */ + private String org = "aiot"; + + /** + * 存储桶名称 + */ + private String bucket = "aiot_platform"; + +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/ctsdb/FluxQuerySanitizer.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/ctsdb/FluxQuerySanitizer.java new file mode 100644 index 00000000..059d4e78 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/ctsdb/FluxQuerySanitizer.java @@ -0,0 +1,57 @@ +package com.viewsh.module.iot.framework.tsdb.ctsdb; + +/** + * Flux 查询安全工具类 + * + * 对拼接到 Flux 查询中的字符串值进行转义,防止 Flux 注入攻击。 + * InfluxDB Flux 查询语言中,字符串字面量使用双引号包裹, + * 需要转义的特殊字符包括: \, ", $, {, } + */ +public final class FluxQuerySanitizer { + + private FluxQuerySanitizer() { + } + + /** + * 转义 Flux 字符串字面量中的特殊字符 + * + * @param value 原始字符串 + * @return 转义后的安全字符串,可直接放入 Flux 双引号内 + */ + public static String escapeStringLiteral(String value) { + if (value == null) { + return ""; + } + StringBuilder sb = new StringBuilder(value.length() + 8); + for (int i = 0; i < value.length(); i++) { + char c = value.charAt(i); + switch (c) { + case '\\' -> sb.append("\\\\"); + case '"' -> sb.append("\\\""); + case '$' -> sb.append("\\$"); + case '{' -> sb.append("\\{"); + case '}' -> sb.append("\\}"); + case '\n' -> sb.append("\\n"); + case '\r' -> sb.append("\\r"); + case '\t' -> sb.append("\\t"); + default -> sb.append(c); + } + } + return sb.toString(); + } + + /** + * 转义 Flux 正则表达式中的特殊字符 + * + * @param value 原始字符串 + * @return 转义后的安全字符串,可直接放入 Flux 正则中 + */ + public static String escapeRegexLiteral(String value) { + if (value == null) { + return ""; + } + // 正则特殊字符: . * + ? ^ $ { } [ ] ( ) | \ + return value.replaceAll("([.\\\\*+?^${}\\[\\]()|])", "\\\\$1"); + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/tdengine/TDengineDeviceMessageDaoImpl.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/tdengine/TDengineDeviceMessageDaoImpl.java new file mode 100644 index 00000000..28589c6d --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/tdengine/TDengineDeviceMessageDaoImpl.java @@ -0,0 +1,78 @@ +package com.viewsh.module.iot.framework.tsdb.tdengine; + +import cn.hutool.core.util.StrUtil; +import com.viewsh.module.iot.controller.admin.device.vo.message.IotDeviceMessagePageReqVO; +import com.viewsh.module.iot.dal.dataobject.device.IotDeviceMessageDO; +import com.viewsh.module.iot.dal.tdengine.IotDeviceMessageMapper; +import com.viewsh.module.iot.framework.tsdb.HourlyMessageCountDTO; +import com.viewsh.module.iot.framework.tsdb.IotTsDbDeviceMessageDao; +import com.baomidou.mybatisplus.core.metadata.IPage; +import lombok.RequiredArgsConstructor; + +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import static com.viewsh.framework.common.util.collection.CollectionUtils.convertList; + +/** + * TDengine 实现的设备消息时序 DAO + * + * 委托给现有的 {@link IotDeviceMessageMapper} + */ +@RequiredArgsConstructor +public class TDengineDeviceMessageDaoImpl implements IotTsDbDeviceMessageDao { + + private final IotDeviceMessageMapper deviceMessageMapper; + + @Override + public void createSchema() { + deviceMessageMapper.createSTable(); + } + + @Override + public boolean schemaExists() { + return StrUtil.isNotEmpty(deviceMessageMapper.showSTable()); + } + + @Override + public void insert(IotDeviceMessageDO message) { + deviceMessageMapper.insert(message); + } + + @Override + public IPage selectPage(IPage page, IotDeviceMessagePageReqVO reqVO) { + return deviceMessageMapper.selectPage(page, reqVO); + } + + @Override + public Long selectCountByCreateTime(Long createTime) { + return deviceMessageMapper.selectCountByCreateTime(createTime); + } + + @Override + public List selectListByRequestIdsAndReply(Long deviceId, Collection requestIds, + Boolean reply) { + return deviceMessageMapper.selectListByRequestIdsAndReply(deviceId, requestIds, reply); + } + + @Override + public List selectDeviceMessageCountGroupByDate(Long startTime, Long endTime) { + // TDengine Mapper 返回 Map,其中 time 为 java.sql.Timestamp + // 在此统一转换为 HourlyMessageCountDTO,消除上层对 Timestamp 的强依赖 + List> rawList = deviceMessageMapper.selectDeviceMessageCountGroupByDate(startTime, endTime); + return convertList(rawList, row -> { + LocalDateTime time = null; + Object timeObj = row.get("time"); + if (timeObj instanceof Timestamp ts) { + time = ts.toLocalDateTime(); + } + Integer upstreamCount = row.get("upstream_count") instanceof Number n ? n.intValue() : 0; + Integer downstreamCount = row.get("downstream_count") instanceof Number n ? n.intValue() : 0; + return new HourlyMessageCountDTO(time, upstreamCount, downstreamCount); + }); + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/tdengine/TDengineDevicePropertyDaoImpl.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/tdengine/TDengineDevicePropertyDaoImpl.java new file mode 100644 index 00000000..5fb73927 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/framework/tsdb/tdengine/TDengineDevicePropertyDaoImpl.java @@ -0,0 +1,109 @@ +package com.viewsh.module.iot.framework.tsdb.tdengine; + +import com.viewsh.module.iot.controller.admin.device.vo.property.IotDevicePropertyHistoryListReqVO; +import com.viewsh.module.iot.controller.admin.device.vo.property.IotDevicePropertyRespVO; +import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO; +import com.viewsh.module.iot.dal.tdengine.IotDevicePropertyMapper; +import com.viewsh.module.iot.framework.tdengine.core.TDengineTableField; +import com.viewsh.module.iot.framework.tsdb.IotTsDbDevicePropertyDao; +import com.viewsh.module.iot.framework.tsdb.TsDbTableField; +import lombok.RequiredArgsConstructor; + +import java.util.List; +import java.util.Map; + +import static com.viewsh.framework.common.util.collection.CollectionUtils.convertList; + +/** + * TDengine 实现的设备属性时序 DAO + * + * 委托给现有的 {@link IotDevicePropertyMapper} + */ +@RequiredArgsConstructor +public class TDengineDevicePropertyDaoImpl implements IotTsDbDevicePropertyDao { + + private final IotDevicePropertyMapper devicePropertyMapper; + + @Override + public List getTableFields(Long productId) { + List tdFields = devicePropertyMapper.getProductPropertySTableFieldList(productId); + return convertList(tdFields, TDengineDevicePropertyDaoImpl::toTsDbField); + } + + @Override + public void createPropertyTable(Long productId, List fields) { + devicePropertyMapper.createProductPropertySTable(productId, convertList(fields, + TDengineDevicePropertyDaoImpl::toTDengineField)); + } + + @Override + public void alterPropertyTable(Long productId, List oldFields, List newFields) { + devicePropertyMapper.alterProductPropertySTable(productId, + convertList(oldFields, TDengineDevicePropertyDaoImpl::toTDengineField), + convertList(newFields, TDengineDevicePropertyDaoImpl::toTDengineField)); + } + + @Override + public void insert(IotDeviceDO device, Map properties, Long reportTime) { + devicePropertyMapper.insert(device, properties, reportTime); + } + + @Override + public List selectHistory(IotDevicePropertyHistoryListReqVO reqVO, Long productId) { + // TDengine 通过子表名 device_property_{deviceId} 定位数据,不需要 productId + return devicePropertyMapper.selectListByHistory(reqVO); + } + + // ========== TsDbTableField <-> TDengineTableField 转换 ========== + + static TDengineTableField toTDengineField(TsDbTableField field) { + TDengineTableField tdField = new TDengineTableField(); + tdField.setField(field.getField()); + tdField.setType(toTDengineType(field.getType())); + tdField.setLength(field.getLength()); + tdField.setNote(field.isTag() ? TDengineTableField.NOTE_TAG : null); + return tdField; + } + + static TsDbTableField toTsDbField(TDengineTableField tdField) { + TsDbTableField field = new TsDbTableField(); + field.setField(tdField.getField()); + field.setType(fromTDengineType(tdField.getType())); + field.setLength(tdField.getLength()); + field.setTag(TDengineTableField.NOTE_TAG.equals(tdField.getNote())); + return field; + } + + private static String toTDengineType(String tsDbType) { + if (tsDbType == null) { + return null; + } + return switch (tsDbType) { + case TsDbTableField.TYPE_TINYINT -> TDengineTableField.TYPE_TINYINT; + case TsDbTableField.TYPE_INT -> TDengineTableField.TYPE_INT; + case TsDbTableField.TYPE_FLOAT -> TDengineTableField.TYPE_FLOAT; + case TsDbTableField.TYPE_DOUBLE -> TDengineTableField.TYPE_DOUBLE; + case TsDbTableField.TYPE_BOOL -> TDengineTableField.TYPE_BOOL; + case TsDbTableField.TYPE_STRING -> TDengineTableField.TYPE_VARCHAR; + case TsDbTableField.TYPE_TIMESTAMP -> TDengineTableField.TYPE_TIMESTAMP; + default -> tsDbType; + }; + } + + private static String fromTDengineType(String tdType) { + if (tdType == null) { + return null; + } + return switch (tdType.toUpperCase()) { + case "TINYINT" -> TsDbTableField.TYPE_TINYINT; + case "INT" -> TsDbTableField.TYPE_INT; + case "FLOAT" -> TsDbTableField.TYPE_FLOAT; + case "DOUBLE" -> TsDbTableField.TYPE_DOUBLE; + case "BOOL" -> TsDbTableField.TYPE_BOOL; + case "VARCHAR", "NCHAR" -> TsDbTableField.TYPE_STRING; + case "TIMESTAMP" -> TsDbTableField.TYPE_TIMESTAMP; + default -> tdType; + }; + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/message/IotDeviceMessageServiceImpl.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/message/IotDeviceMessageServiceImpl.java index eb70bf3f..a2e6093e 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/message/IotDeviceMessageServiceImpl.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/message/IotDeviceMessageServiceImpl.java @@ -2,7 +2,6 @@ package com.viewsh.module.iot.service.device.message; import cn.hutool.core.date.LocalDateTimeUtil; import cn.hutool.core.lang.Assert; -import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.extra.spring.SpringUtil; import com.viewsh.framework.common.exception.ServiceException; @@ -20,7 +19,7 @@ import com.viewsh.module.iot.core.mq.producer.IotDeviceMessageProducer; import com.viewsh.module.iot.core.util.IotDeviceMessageUtils; import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO; import com.viewsh.module.iot.dal.dataobject.device.IotDeviceMessageDO; -import com.viewsh.module.iot.dal.tdengine.IotDeviceMessageMapper; +import com.viewsh.module.iot.framework.tsdb.IotTsDbDeviceMessageDao; import com.viewsh.module.iot.service.device.IotDeviceService; import com.viewsh.module.iot.service.device.property.IotDevicePropertyService; import com.viewsh.module.iot.service.ota.IotOtaTaskRecordService; @@ -34,7 +33,6 @@ import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.validation.annotation.Validated; -import java.sql.Timestamp; import java.time.LocalDateTime; import java.util.List; import java.util.Map; @@ -62,20 +60,20 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService { private IotOtaTaskRecordService otaTaskRecordService; @Resource - private IotDeviceMessageMapper deviceMessageMapper; + private IotTsDbDeviceMessageDao tsDbDeviceMessageDao; @Resource private IotDeviceMessageProducer deviceMessageProducer; @Override public void defineDeviceMessageStable() { - if (StrUtil.isNotEmpty(deviceMessageMapper.showSTable())) { - log.info("[defineDeviceMessageStable][设备消息超级表已存在,创建跳过]"); + if (tsDbDeviceMessageDao.schemaExists()) { + log.info("[defineDeviceMessageStable][设备消息表已存在,创建跳过]"); return; } - log.info("[defineDeviceMessageStable][设备消息超级表不存在,创建开始...]"); - deviceMessageMapper.createSTable(); - log.info("[defineDeviceMessageStable][设备消息超级表不存在,创建成功]"); + log.info("[defineDeviceMessageStable][设备消息表不存在,创建开始...]"); + tsDbDeviceMessageDao.createSchema(); + log.info("[defineDeviceMessageStable][设备消息表不存在,创建成功]"); } @Async @@ -90,7 +88,7 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService { if (messageDO.getData() != null) { messageDO.setData(JsonUtils.toJsonString(messageDO.getData())); } - deviceMessageMapper.insert(messageDO); + tsDbDeviceMessageDao.insert(messageDO); } @Override @@ -224,7 +222,7 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService { @Override public PageResult getDeviceMessagePage(IotDeviceMessagePageReqVO pageReqVO) { try { - IPage page = deviceMessageMapper.selectPage( + IPage page = tsDbDeviceMessageDao.selectPage( new Page<>(pageReqVO.getPageNo(), pageReqVO.getPageSize()), pageReqVO); return new PageResult<>(page.getRecords(), page.getTotal()); } catch (Exception exception) { @@ -239,33 +237,36 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService { public List getDeviceMessageListByRequestIdsAndReply(Long deviceId, List requestIds, Boolean reply) { - return deviceMessageMapper.selectListByRequestIdsAndReply(deviceId, requestIds, reply); + return tsDbDeviceMessageDao.selectListByRequestIdsAndReply(deviceId, requestIds, reply); } @Override public Long getDeviceMessageCount(LocalDateTime createTime) { - return deviceMessageMapper.selectCountByCreateTime( + return tsDbDeviceMessageDao.selectCountByCreateTime( createTime != null ? LocalDateTimeUtil.toEpochMilli(createTime) : null); } @Override public List getDeviceMessageSummaryByDate( IotStatisticsDeviceMessageReqVO reqVO) { - // 1. 按小时统计,获取分项统计数据 - List> countList = deviceMessageMapper.selectDeviceMessageCountGroupByDate( - LocalDateTimeUtil.toEpochMilli(reqVO.getTimes()[0]), - LocalDateTimeUtil.toEpochMilli(reqVO.getTimes()[1])); + // 1. 按小时统计,获取分项统计数据(返回类型已统一为 HourlyMessageCountDTO) + List countList = + tsDbDeviceMessageDao.selectDeviceMessageCountGroupByDate( + LocalDateTimeUtil.toEpochMilli(reqVO.getTimes()[0]), + LocalDateTimeUtil.toEpochMilli(reqVO.getTimes()[1])); // 2. 按照日期间隔,合并数据 List timeRanges = LocalDateTimeUtils.getDateRangeList(reqVO.getTimes()[0], reqVO.getTimes()[1], reqVO.getInterval()); return convertList(timeRanges, times -> { Integer upstreamCount = countList.stream() - .filter(vo -> LocalDateTimeUtils.isBetween(times[0], times[1], (Timestamp) vo.get("time"))) - .mapToInt(value -> MapUtil.getInt(value, "upstream_count")).sum(); + .filter(dto -> dto.getTime() != null + && LocalDateTimeUtil.isIn(dto.getTime(), times[0], times[1])) + .mapToInt(dto -> dto.getUpstreamCount() != null ? dto.getUpstreamCount() : 0).sum(); Integer downstreamCount = countList.stream() - .filter(vo -> LocalDateTimeUtils.isBetween(times[0], times[1], (Timestamp) vo.get("time"))) - .mapToInt(value -> MapUtil.getInt(value, "downstream_count")).sum(); + .filter(dto -> dto.getTime() != null + && LocalDateTimeUtil.isIn(dto.getTime(), times[0], times[1])) + .mapToInt(dto -> dto.getDownstreamCount() != null ? dto.getDownstreamCount() : 0).sum(); return new IotStatisticsDeviceMessageSummaryByDateRespVO() .setTime(LocalDateTimeUtils.formatDateRange(times[0], times[1], reqVO.getInterval())) .setUpstreamCount(upstreamCount).setDownstreamCount(downstreamCount); diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/property/IotDevicePropertyServiceImpl.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/property/IotDevicePropertyServiceImpl.java index 7775fbfe..3f4919c4 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/property/IotDevicePropertyServiceImpl.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/property/IotDevicePropertyServiceImpl.java @@ -20,10 +20,11 @@ import com.viewsh.module.iot.dal.dataobject.thingmodel.model.dataType.ThingModel import com.viewsh.module.iot.dal.redis.device.DevicePropertyRedisDAO; import com.viewsh.module.iot.dal.redis.device.DeviceReportTimeRedisDAO; import com.viewsh.module.iot.dal.redis.device.DeviceServerIdRedisDAO; -import com.viewsh.module.iot.dal.tdengine.IotDevicePropertyMapper; import com.viewsh.module.iot.enums.thingmodel.IotDataSpecsDataTypeEnum; import com.viewsh.module.iot.enums.thingmodel.IotThingModelTypeEnum; -import com.viewsh.module.iot.framework.tdengine.core.TDengineTableField; +import com.viewsh.module.iot.framework.tsdb.IotTsDbDevicePropertyDao; +import com.viewsh.module.iot.framework.tsdb.TsDbTableField; +import com.viewsh.module.iot.service.device.IotDeviceService; import com.viewsh.module.iot.service.product.IotProductService; import com.viewsh.module.iot.service.thingmodel.IotThingModelService; import jakarta.annotation.Resource; @@ -47,20 +48,18 @@ import static com.viewsh.framework.common.util.collection.CollectionUtils.*; public class IotDevicePropertyServiceImpl implements IotDevicePropertyService { /** - * 物模型的数据类型,与 TDengine 数据类型的映射关系 - * - * @see TDEngine 数据类型 + * 物模型的数据类型,与时序数据库通用类型的映射关系 */ private static final Map TYPE_MAPPING = MapUtil.builder() - .put(IotDataSpecsDataTypeEnum.INT.getDataType(), TDengineTableField.TYPE_INT) - .put(IotDataSpecsDataTypeEnum.FLOAT.getDataType(), TDengineTableField.TYPE_FLOAT) - .put(IotDataSpecsDataTypeEnum.DOUBLE.getDataType(), TDengineTableField.TYPE_DOUBLE) - .put(IotDataSpecsDataTypeEnum.ENUM.getDataType(), TDengineTableField.TYPE_TINYINT) - .put(IotDataSpecsDataTypeEnum.BOOL.getDataType(), TDengineTableField.TYPE_TINYINT) - .put(IotDataSpecsDataTypeEnum.TEXT.getDataType(), TDengineTableField.TYPE_VARCHAR) - .put(IotDataSpecsDataTypeEnum.DATE.getDataType(), TDengineTableField.TYPE_TIMESTAMP) - .put(IotDataSpecsDataTypeEnum.STRUCT.getDataType(), TDengineTableField.TYPE_VARCHAR) - .put(IotDataSpecsDataTypeEnum.ARRAY.getDataType(), TDengineTableField.TYPE_VARCHAR) + .put(IotDataSpecsDataTypeEnum.INT.getDataType(), TsDbTableField.TYPE_INT) + .put(IotDataSpecsDataTypeEnum.FLOAT.getDataType(), TsDbTableField.TYPE_FLOAT) + .put(IotDataSpecsDataTypeEnum.DOUBLE.getDataType(), TsDbTableField.TYPE_DOUBLE) + .put(IotDataSpecsDataTypeEnum.ENUM.getDataType(), TsDbTableField.TYPE_TINYINT) + .put(IotDataSpecsDataTypeEnum.BOOL.getDataType(), TsDbTableField.TYPE_TINYINT) + .put(IotDataSpecsDataTypeEnum.TEXT.getDataType(), TsDbTableField.TYPE_STRING) + .put(IotDataSpecsDataTypeEnum.DATE.getDataType(), TsDbTableField.TYPE_TIMESTAMP) + .put(IotDataSpecsDataTypeEnum.STRUCT.getDataType(), TsDbTableField.TYPE_STRING) + .put(IotDataSpecsDataTypeEnum.ARRAY.getDataType(), TsDbTableField.TYPE_STRING) .build(); @Resource @@ -68,6 +67,9 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService { @Resource @Lazy // 延迟加载,解决循环依赖 private IotProductService productService; + @Resource + @Lazy + private IotDeviceService deviceService; @Resource private DevicePropertyRedisDAO deviceDataRedisDAO; @@ -77,7 +79,7 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService { private DeviceServerIdRedisDAO deviceServerIdRedisDAO; @Resource - private IotDevicePropertyMapper devicePropertyMapper; + private IotTsDbDevicePropertyDao tsDbDevicePropertyDao; @Resource private RedisMQTemplate redisMQTemplate; @@ -97,9 +99,9 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService { List thingModels = filterList(thingModelService.getThingModelListByProductId(productId), thingModel -> IotThingModelTypeEnum.PROPERTY.getType().equals(thingModel.getType())); // 1.2 解析 DB 里的字段 - List oldFields = new ArrayList<>(); + List oldFields = new ArrayList<>(); try { - oldFields.addAll(devicePropertyMapper.getProductPropertySTableFieldList(product.getId())); + oldFields.addAll(tsDbDevicePropertyDao.getTableFields(product.getId())); } catch (Exception e) { if (!e.getMessage().contains("Table does not exist")) { throw e; @@ -107,30 +109,30 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService { } // 2.1 情况一:如果是新增的时候,需要创建表 - List newFields = buildTableFieldList(thingModels); + List newFields = buildTableFieldList(thingModels); if (CollUtil.isEmpty(oldFields)) { if (CollUtil.isEmpty(newFields)) { log.info("[defineDevicePropertyData][productId({}) 没有需要定义的属性]", productId); return; } - devicePropertyMapper.createProductPropertySTable(product.getId(), newFields); + tsDbDevicePropertyDao.createPropertyTable(product.getId(), newFields); return; } // 2.2 情况二:如果是修改的时候,需要更新表 - devicePropertyMapper.alterProductPropertySTable(product.getId(), oldFields, newFields); + tsDbDevicePropertyDao.alterPropertyTable(product.getId(), oldFields, newFields); } - private List buildTableFieldList(List thingModels) { + private List buildTableFieldList(List thingModels) { return convertList(thingModels, thingModel -> { - TDengineTableField field = new TDengineTableField( - StrUtil.toUnderlineCase(thingModel.getIdentifier()), // TDengine 字段默认都是小写 + TsDbTableField field = new TsDbTableField( + StrUtil.toUnderlineCase(thingModel.getIdentifier()), TYPE_MAPPING.get(thingModel.getProperty().getDataType())); String dataType = thingModel.getProperty().getDataType(); if (Objects.equals(dataType, IotDataSpecsDataTypeEnum.TEXT.getDataType())) { field.setLength(((ThingModelDateOrTextDataSpecs) thingModel.getProperty().getDataSpecs()).getLength()); } else if (ObjectUtils.equalsAny(dataType, IotDataSpecsDataTypeEnum.STRUCT.getDataType(), IotDataSpecsDataTypeEnum.ARRAY.getDataType())) { - field.setLength(TDengineTableField.LENGTH_VARCHAR); + field.setLength(TsDbTableField.LENGTH_DEFAULT_STRING); } return field; }); @@ -167,7 +169,7 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService { } // 2.1 保存设备属性【数据】 - devicePropertyMapper.insert(device, properties, LocalDateTimeUtil.toEpochMilli(message.getReportTime())); + tsDbDevicePropertyDao.insert(device, properties, LocalDateTimeUtil.toEpochMilli(message.getReportTime())); // 2.2 保存设备属性【日志】 Map properties2 = convertMap(properties.entrySet(), Map.Entry::getKey, entry -> @@ -243,7 +245,10 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService { @Override public List getHistoryDevicePropertyList(IotDevicePropertyHistoryListReqVO listReqVO) { try { - return devicePropertyMapper.selectListByHistory(listReqVO); + // 通过 deviceId 查询 productId,用于 CTSDB 精确匹配 measurement + IotDeviceDO device = deviceService.getDevice(listReqVO.getDeviceId()); + Long productId = device != null ? device.getProductId() : null; + return tsDbDevicePropertyDao.selectHistory(listReqVO, productId); } catch (Exception exception) { if (exception.getMessage().contains("Table does not exist")) { return Collections.emptyList(); diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/resources/application.yaml b/viewsh-module-iot/viewsh-module-iot-server/src/main/resources/application.yaml index c8ac4931..00eddc78 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/resources/application.yaml +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/resources/application.yaml @@ -153,6 +153,13 @@ viewsh: iot: message-bus: type: redis # 消息总线的类型 + tsdb: + type: tdengine # 时序数据库类型: tdengine | ctsdb + ctsdb: # CTSDB (InfluxDB) 配置,仅 type=ctsdb 时生效 + url: http://${CTSDB_HOST:localhost}:${CTSDB_PORT:8086} + token: ${CTSDB_TOKEN:} + org: ${CTSDB_ORG:aiot} + bucket: ${CTSDB_BUCKET:aiot_platform} # 跨模块事件总线配置(IntegrationEventBus) integration: mq: