feat(iot): 抽象时序数据库访问层,支持 TDengine + CTSDB(InfluxDB) 双引擎

将原有 TDengine 强耦合的 Mapper 层重构为统一的 TsDb 抽象接口:
- 新增 IotTsDbDeviceMessageDao / IotTsDbDevicePropertyDao 接口
- 实现 TDengine 和 CTSDB(InfluxDB) 两套适配器
- 通过 viewsh.iot.tsdb.type 配置项切换时序数据库引擎
- Service 层从直接依赖 TDengine Mapper 改为依赖抽象 Dao 接口
- 新增 influxdb-client-java 7.2.0 依赖
- 删除旧的 TDengineTableInitRunner,统一由 TsDbTableInitRunner 管理

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
lzh
2026-04-13 14:14:51 +08:00
parent 88a6651d59
commit d451aaf449
20 changed files with 1423 additions and 84 deletions

View File

@@ -69,6 +69,11 @@
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
</dependency>
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.viewsh</groupId>

View File

@@ -10,7 +10,7 @@ import com.viewsh.module.iot.controller.admin.device.vo.message.IotDeviceMessage
import com.viewsh.module.iot.controller.admin.device.vo.message.IotDeviceMessageSendReqVO;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceMessageDO;
import com.viewsh.module.iot.dal.tdengine.IotDeviceMessageMapper;
import com.viewsh.module.iot.service.device.IotDeviceService;
import com.viewsh.module.iot.service.device.message.IotDeviceMessageService;
import com.viewsh.module.iot.service.thingmodel.IotThingModelService;
@@ -41,8 +41,6 @@ public class IotDeviceMessageController {
private IotDeviceService deviceService;
@Resource
private IotThingModelService thingModelService;
@Resource
private IotDeviceMessageMapper deviceMessageMapper;
@GetMapping("/page")
@Operation(summary = "获得设备消息分页")

View File

@@ -1,34 +0,0 @@
package com.viewsh.module.iot.framework.tdengine.config;
import com.viewsh.module.iot.service.device.message.IotDeviceMessageService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
/**
* TDengine 表初始化的 Configuration
*
* @author alwayssuper
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class TDengineTableInitRunner implements ApplicationRunner {
private final IotDeviceMessageService deviceMessageService;
@Override
public void run(ApplicationArguments args) {
try {
// 初始化设备消息表
deviceMessageService.defineDeviceMessageStable();
} catch (Exception ex) {
// 初始化失败时打印错误消息并退出系统
log.error("[run][TDengine初始化设备消息表结构失败系统无法正常运行即将退出]", ex);
System.exit(1);
}
}
}

View File

@@ -0,0 +1,35 @@
package com.viewsh.module.iot.framework.tsdb;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
/**
* 按小时统计的设备消息计数 DTO
*
* 替代 {@code Map<String, Object>},解决 TDengine 返回 {@code java.sql.Timestamp}
* 而 CTSDB 返回 {@code java.time.Instant} 导致 Service 层强转 ClassCastException 的问题
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class HourlyMessageCountDTO {
/**
* 小时时间窗口的起始时间
*/
private LocalDateTime time;
/**
* 上行消息数量
*/
private Integer upstreamCount;
/**
* 下行消息数量
*/
private Integer downstreamCount;
}

View File

@@ -0,0 +1,72 @@
package com.viewsh.module.iot.framework.tsdb;
import com.viewsh.module.iot.controller.admin.device.vo.message.IotDeviceMessagePageReqVO;
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceMessageDO;
import com.baomidou.mybatisplus.core.metadata.IPage;
import java.util.Collection;
import java.util.List;
/**
* 设备消息时序数据访问接口
*
* 抽象不同时序数据库TDengine、CTSDB/InfluxDB的设备消息操作
*/
public interface IotTsDbDeviceMessageDao {
/**
* 创建设备消息的 Schema超级表/Measurement
*/
void createSchema();
/**
* 检查设备消息的 Schema 是否存在
*
* @return 是否存在
*/
boolean schemaExists();
/**
* 插入设备消息数据
*
* @param message 设备消息数据
*/
void insert(IotDeviceMessageDO message);
/**
* 获得设备消息分页
*
* @param page 分页参数
* @param reqVO 查询条件
* @return 分页结果
*/
IPage<IotDeviceMessageDO> selectPage(IPage<IotDeviceMessageDO> page, IotDeviceMessagePageReqVO reqVO);
/**
* 统计设备消息数量
*
* @param createTime 创建时间(毫秒时间戳),如果为空,则统计所有消息数量
* @return 消息数量
*/
Long selectCountByCreateTime(Long createTime);
/**
* 按照 requestIds 批量查询消息
*
* @param deviceId 设备编号
* @param requestIds 请求编号集合
* @param reply 是否回复消息
* @return 消息列表
*/
List<IotDeviceMessageDO> selectListByRequestIdsAndReply(Long deviceId, Collection<String> requestIds, Boolean reply);
/**
* 按照时间范围(小时),统计设备的消息数量
*
* @param startTime 开始时间(毫秒时间戳)
* @param endTime 结束时间(毫秒时间戳)
* @return 按小时分组的统计数据
*/
List<HourlyMessageCountDTO> selectDeviceMessageCountGroupByDate(Long startTime, Long endTime);
}

View File

@@ -0,0 +1,61 @@
package com.viewsh.module.iot.framework.tsdb;
import com.viewsh.module.iot.controller.admin.device.vo.property.IotDevicePropertyHistoryListReqVO;
import com.viewsh.module.iot.controller.admin.device.vo.property.IotDevicePropertyRespVO;
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
import java.util.List;
import java.util.Map;
/**
* 设备属性时序数据访问接口
*
* 抽象不同时序数据库TDengine、CTSDB/InfluxDB的设备属性操作
*/
public interface IotTsDbDevicePropertyDao {
/**
* 获取产品属性表的字段列表
*
* @param productId 产品编号
* @return 字段列表
*/
List<TsDbTableField> getTableFields(Long productId);
/**
* 创建产品属性表
*
* @param productId 产品编号
* @param fields 字段列表
*/
void createPropertyTable(Long productId, List<TsDbTableField> fields);
/**
* 变更产品属性表结构(对比旧字段和新字段,自动执行 add/modify/drop
*
* @param productId 产品编号
* @param oldFields 旧字段列表
* @param newFields 新字段列表
*/
void alterPropertyTable(Long productId, List<TsDbTableField> oldFields, List<TsDbTableField> newFields);
/**
* 插入设备属性数据
*
* @param device 设备
* @param properties 属性数据
* @param reportTime 上报时间(毫秒时间戳)
*/
void insert(IotDeviceDO device, Map<String, Object> properties, Long reportTime);
/**
* 查询设备属性历史数据
*
* @param reqVO 查询条件
* @param productId 产品编号CTSDB 实现需要此参数精确定位 measurement
* TDengine 实现可忽略(通过子表名定位)
* @return 历史数据列表
*/
List<IotDevicePropertyRespVO> selectHistory(IotDevicePropertyHistoryListReqVO reqVO, Long productId);
}

View File

@@ -0,0 +1,57 @@
package com.viewsh.module.iot.framework.tsdb;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 时序数据库通用字段定义
*
* 用于抽象不同时序数据库TDengine、CTSDB/InfluxDB的表字段描述
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TsDbTableField {
// ========== 通用类型常量 ==========
public static final String TYPE_TINYINT = "TINYINT";
public static final String TYPE_INT = "INT";
public static final String TYPE_FLOAT = "FLOAT";
public static final String TYPE_DOUBLE = "DOUBLE";
public static final String TYPE_BOOL = "BOOL";
public static final String TYPE_STRING = "STRING";
public static final String TYPE_TIMESTAMP = "TIMESTAMP";
/**
* 字段长度 - STRING 默认长度
*/
public static final int LENGTH_DEFAULT_STRING = 1024;
/**
* 字段名
*/
private String field;
/**
* 字段类型(通用类型)
*/
private String type;
/**
* 字段长度
*/
private Integer length;
/**
* 是否为 TAG 字段
*/
private boolean tag;
public TsDbTableField(String field, String type) {
this.field = field;
this.type = type;
}
}

View File

@@ -0,0 +1,84 @@
package com.viewsh.module.iot.framework.tsdb.config;
import com.viewsh.module.iot.dal.tdengine.IotDeviceMessageMapper;
import com.viewsh.module.iot.dal.tdengine.IotDevicePropertyMapper;
import com.viewsh.module.iot.framework.tsdb.IotTsDbDeviceMessageDao;
import com.viewsh.module.iot.framework.tsdb.IotTsDbDevicePropertyDao;
import com.viewsh.module.iot.framework.tsdb.tdengine.TDengineDeviceMessageDaoImpl;
import com.viewsh.module.iot.framework.tsdb.tdengine.TDengineDevicePropertyDaoImpl;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 时序数据库自动装配
*
* 根据配置 viewsh.iot.tsdb.type 选择 TDengine 或 CTSDB 实现
*/
@Configuration
@Slf4j
public class TsDbAutoConfiguration {
/**
* 配置校验:确保 viewsh.iot.tsdb.type 的值合法,避免拼写错误导致两种实现都未生效
*/
@Value("${viewsh.iot.tsdb.type:tdengine}")
private String tsDbType;
@PostConstruct
public void validateTsDbType() {
if (!"tdengine".equalsIgnoreCase(tsDbType) && !"ctsdb".equalsIgnoreCase(tsDbType)) {
throw new IllegalArgumentException(
String.format("无效的时序数据库类型: '%s',仅支持 'tdengine' 或 'ctsdb'" +
"请检查配置项 viewsh.iot.tsdb.type", tsDbType));
}
log.info("[validateTsDbType][时序数据库类型: {}]", tsDbType);
}
// ========== TDengine 实现 ==========
@Bean
@ConditionalOnProperty(name = "viewsh.iot.tsdb.type", havingValue = "tdengine", matchIfMissing = true)
public IotTsDbDeviceMessageDao tdengineDeviceMessageDao(IotDeviceMessageMapper mapper) {
return new TDengineDeviceMessageDaoImpl(mapper);
}
@Bean
@ConditionalOnProperty(name = "viewsh.iot.tsdb.type", havingValue = "tdengine", matchIfMissing = true)
public IotTsDbDevicePropertyDao tdengineDevicePropertyDao(IotDevicePropertyMapper mapper) {
return new TDengineDevicePropertyDaoImpl(mapper);
}
// ========== CTSDB (InfluxDB) 实现 ==========
// 使用内部类 + @ConditionalOnClass 保护,避免 classpath 无 InfluxDB 依赖时 NoClassDefFoundError
@Configuration
@ConditionalOnClass(name = "com.influxdb.client.InfluxDBClient")
@ConditionalOnProperty(name = "viewsh.iot.tsdb.type", havingValue = "ctsdb")
static class CtsdbDaoConfiguration {
@Bean
public IotTsDbDeviceMessageDao ctsdbDeviceMessageDao(
com.influxdb.client.InfluxDBClient influxDBClient,
com.viewsh.module.iot.framework.tsdb.ctsdb.CtsdbProperties properties,
com.influxdb.client.WriteApi ctsdbWriteApi) {
return new com.viewsh.module.iot.framework.tsdb.ctsdb.CtsdbDeviceMessageDaoImpl(
influxDBClient, properties, ctsdbWriteApi);
}
@Bean
public IotTsDbDevicePropertyDao ctsdbDevicePropertyDao(
com.influxdb.client.InfluxDBClient influxDBClient,
com.viewsh.module.iot.framework.tsdb.ctsdb.CtsdbProperties properties,
com.influxdb.client.WriteApi ctsdbWriteApi) {
return new com.viewsh.module.iot.framework.tsdb.ctsdb.CtsdbDevicePropertyDaoImpl(
influxDBClient, properties, ctsdbWriteApi);
}
}
}

View File

@@ -0,0 +1,55 @@
package com.viewsh.module.iot.framework.tsdb.config;
import com.viewsh.module.iot.service.device.message.IotDeviceMessageService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
/**
* 时序数据库表初始化 Runner
*
* 替代原 TDengineTableInitRunner支持 TDengine 和 CTSDB 两种时序数据库。
* 初始化失败时重试最多 3 次(间隔 5 秒),全部失败后抛出异常让 Spring 容器启动失败,
* 而非直接 System.exit(1),以便容器编排工具(如 K8s能正确感知启动失败并触发重启。
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class TsDbTableInitRunner implements ApplicationRunner {
private static final int MAX_RETRIES = 3;
private static final long RETRY_INTERVAL_MS = 5000;
private final IotDeviceMessageService deviceMessageService;
@Override
public void run(ApplicationArguments args) {
Exception lastException = null;
for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
try {
// 初始化设备消息表TDengine 会创建超级表CTSDB 会跳过)
deviceMessageService.defineDeviceMessageStable();
log.info("[run][时序数据库初始化设备消息表结构成功]");
return;
} catch (Exception ex) {
lastException = ex;
log.warn("[run][时序数据库初始化失败,第 {}/{} 次尝试]", attempt, MAX_RETRIES, ex);
if (attempt < MAX_RETRIES) {
try {
Thread.sleep(RETRY_INTERVAL_MS);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}
// 重试全部失败,抛出异常让 Spring 容器启动失败
throw new IllegalStateException(
"时序数据库初始化设备消息表结构失败(已重试 " + MAX_RETRIES + " 次),系统无法正常运行",
lastException);
}
}

View File

@@ -0,0 +1,55 @@
package com.viewsh.module.iot.framework.tsdb.ctsdb;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApi;
import com.influxdb.client.WriteOptions;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* CTSDB (InfluxDB) 客户端配置
*
* 添加 @ConditionalOnClass 保护,避免 classpath 无 InfluxDB 依赖时类加载失败
*/
@Configuration
@ConditionalOnClass(name = "com.influxdb.client.InfluxDBClient")
@ConditionalOnProperty(name = "viewsh.iot.tsdb.type", havingValue = "ctsdb")
@EnableConfigurationProperties(CtsdbProperties.class)
public class CtsdbClientConfig {
@Bean(destroyMethod = "close")
public InfluxDBClient influxDBClient(CtsdbProperties properties) {
return InfluxDBClientFactory.create(
properties.getUrl(),
properties.getToken().toCharArray(),
properties.getOrg(),
properties.getBucket()
);
}
/**
* 异步批量写入 API适合 IoT 高频写入场景
*
* - batchSize: 累计 1000 个 Point 后批量刷写
* - flushInterval: 每 1 秒自动刷写一次(即使未达 batchSize
* - retryInterval: 写入失败后 2 秒重试
* - maxRetries: 最多重试 3 次
*
* 注意WriteApi 实现了 AutoCloseabledestroyMethod="close" 确保应用关闭时数据全部刷写
*/
@Bean(destroyMethod = "close")
public WriteApi ctsdbWriteApi(InfluxDBClient influxDBClient) {
WriteOptions writeOptions = WriteOptions.builder()
.batchSize(1000)
.flushInterval(1000)
.retryInterval(2000)
.maxRetries(3)
.build();
return influxDBClient.makeWriteApi(writeOptions);
}
}

View File

@@ -0,0 +1,445 @@
package com.viewsh.module.iot.framework.tsdb.ctsdb;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.StrUtil;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.QueryApi;
import com.influxdb.client.WriteApi;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
import com.viewsh.module.iot.controller.admin.device.vo.message.IotDeviceMessagePageReqVO;
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceMessageDO;
import com.viewsh.module.iot.framework.tsdb.HourlyMessageCountDTO;
import com.viewsh.module.iot.framework.tsdb.IotTsDbDeviceMessageDao;
import com.baomidou.mybatisplus.core.metadata.IPage;
import lombok.extern.slf4j.Slf4j;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.*;
import static com.viewsh.module.iot.framework.tsdb.ctsdb.FluxQuerySanitizer.escapeStringLiteral;
/**
* CTSDB (InfluxDB) 实现的设备消息时序 DAO
*
* 变更说明:
* - Flux 查询字符串拼接全部使用 FluxQuerySanitizer 转义,防止注入
* - insert() 改用异步批量 WriteApi提升高频写入性能
* - selectPage()/countMessages() 默认最近 30 天,避免 range(start:0) 全表扫描
* - 查询方法添加 tenant_id 过滤,保障多租户数据隔离
* - selectDeviceMessageCountGroupByDate() 返回 HourlyMessageCountDTO消除类型转换问题
*/
@Slf4j
public class CtsdbDeviceMessageDaoImpl implements IotTsDbDeviceMessageDao {
private static final String MEASUREMENT = "device_message";
private final InfluxDBClient influxDBClient;
private final CtsdbProperties properties;
/**
* 异步批量写入 API由 CtsdbClientConfig 创建。
* 相比 WriteApiBlocking支持自动批量刷写和失败重试适合 IoT 高频写入场景。
* 可为 null向下兼容旧构造函数此时回退到同步写入。
*/
private final WriteApi writeApi;
/**
* 向下兼容构造函数(不使用异步写入)
*/
public CtsdbDeviceMessageDaoImpl(InfluxDBClient influxDBClient, CtsdbProperties properties) {
this(influxDBClient, properties, null);
}
public CtsdbDeviceMessageDaoImpl(InfluxDBClient influxDBClient, CtsdbProperties properties, WriteApi writeApi) {
this.influxDBClient = influxDBClient;
this.properties = properties;
this.writeApi = writeApi;
}
@Override
public void createSchema() {
// InfluxDB 是 schema-on-write无需预建表
log.info("[createSchema][CTSDB/InfluxDB schema-on-write跳过建表]");
}
@Override
public boolean schemaExists() {
try {
String flux = String.format(
"import \"influxdata/influxdb/schema\"\n" +
"schema.measurements(bucket: \"%s\")",
escapeStringLiteral(properties.getBucket()));
QueryApi queryApi = influxDBClient.getQueryApi();
List<FluxTable> tables = queryApi.query(flux);
if (CollUtil.isEmpty(tables)) {
return false;
}
return tables.get(0).getRecords().stream()
.anyMatch(r -> MEASUREMENT.equals(r.getValueByKey("_value")));
} catch (Exception e) {
log.warn("[schemaExists][查询 measurement 失败,视为不存在]", e);
return false;
}
}
@Override
public void insert(IotDeviceMessageDO message) {
Point point = Point.measurement(MEASUREMENT)
.addTag("device_id", String.valueOf(message.getDeviceId()))
.time(message.getTs() != null ? message.getTs() : System.currentTimeMillis(),
WritePrecision.MS);
// 添加 fields
if (message.getId() != null) {
point.addField("id", message.getId());
}
if (message.getReportTime() != null) {
point.addField("report_time", message.getReportTime());
}
if (message.getTenantId() != null) {
point.addField("tenant_id", message.getTenantId());
}
if (message.getServerId() != null) {
point.addField("server_id", message.getServerId());
}
if (message.getUpstream() != null) {
point.addField("upstream", message.getUpstream());
}
if (message.getReply() != null) {
point.addField("reply", message.getReply());
}
if (message.getIdentifier() != null) {
point.addField("identifier", message.getIdentifier());
}
if (message.getRequestId() != null) {
point.addField("request_id", message.getRequestId());
}
if (message.getMethod() != null) {
point.addField("method", message.getMethod());
}
if (message.getParams() != null) {
point.addField("params", String.valueOf(message.getParams()));
}
if (message.getData() != null) {
point.addField("data", String.valueOf(message.getData()));
}
if (message.getCode() != null) {
point.addField("code", message.getCode());
}
if (message.getMsg() != null) {
point.addField("msg", message.getMsg());
}
// 优先使用异步批量写入 API回退到同步写入
if (writeApi != null) {
writeApi.writePoint(point);
} else {
influxDBClient.getWriteApiBlocking().writePoint(point);
}
}
@Override
public IPage<IotDeviceMessageDO> selectPage(IPage<IotDeviceMessageDO> page, IotDeviceMessagePageReqVO reqVO) {
StringBuilder flux = new StringBuilder();
flux.append(String.format("from(bucket: \"%s\")\n", escapeStringLiteral(properties.getBucket())));
// 修复全表扫描:优先使用调用方传入的时间范围,否则默认最近 30 天
appendTimeRange(flux, reqVO.getTimes());
flux.append(String.format(" |> filter(fn: (r) => r._measurement == \"%s\")\n", MEASUREMENT));
flux.append(String.format(" |> filter(fn: (r) => r.device_id == \"%s\")\n",
Long.toString(reqVO.getDeviceId())));
// pivot 使每行包含所有 field
flux.append(" |> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")\n");
// 多租户过滤:确保查询结果只返回当前租户的数据
appendTenantFilter(flux);
// 可选过滤条件(使用转义防止注入)
if (StrUtil.isNotEmpty(reqVO.getMethod())) {
flux.append(String.format(" |> filter(fn: (r) => r.method == \"%s\")\n",
escapeStringLiteral(reqVO.getMethod())));
}
if (reqVO.getUpstream() != null) {
flux.append(String.format(" |> filter(fn: (r) => r.upstream == %s)\n", reqVO.getUpstream()));
}
if (reqVO.getReply() != null) {
flux.append(String.format(" |> filter(fn: (r) => r.reply == %s)\n", reqVO.getReply()));
}
if (StrUtil.isNotEmpty(reqVO.getIdentifier())) {
flux.append(String.format(" |> filter(fn: (r) => r.identifier == \"%s\")\n",
escapeStringLiteral(reqVO.getIdentifier())));
}
// 排序和分页
flux.append(" |> sort(columns: [\"_time\"], desc: true)\n");
long offset = (page.getCurrent() - 1) * page.getSize();
flux.append(String.format(" |> limit(n: %d, offset: %d)\n", page.getSize(), offset));
try {
QueryApi queryApi = influxDBClient.getQueryApi();
List<FluxTable> tables = queryApi.query(flux.toString());
List<IotDeviceMessageDO> records = new ArrayList<>();
for (FluxTable table : tables) {
for (FluxRecord record : table.getRecords()) {
records.add(mapToDeviceMessageDO(record));
}
}
page.setRecords(records);
page.setTotal(countMessages(reqVO));
} catch (Exception e) {
log.error("[selectPage][查询设备消息分页失败]", e);
page.setRecords(Collections.emptyList());
page.setTotal(0);
}
return page;
}
private long countMessages(IotDeviceMessagePageReqVO reqVO) {
StringBuilder flux = new StringBuilder();
flux.append(String.format("from(bucket: \"%s\")\n", escapeStringLiteral(properties.getBucket())));
// count 查询也使用时间范围,避免全表扫描
appendTimeRange(flux, reqVO.getTimes());
flux.append(String.format(" |> filter(fn: (r) => r._measurement == \"%s\")\n", MEASUREMENT));
flux.append(String.format(" |> filter(fn: (r) => r.device_id == \"%s\")\n",
Long.toString(reqVO.getDeviceId())));
flux.append(" |> filter(fn: (r) => r._field == \"id\")\n");
flux.append(" |> count()\n");
try {
QueryApi queryApi = influxDBClient.getQueryApi();
List<FluxTable> tables = queryApi.query(flux.toString());
if (CollUtil.isNotEmpty(tables) && CollUtil.isNotEmpty(tables.get(0).getRecords())) {
Object value = tables.get(0).getRecords().get(0).getValue();
if (value instanceof Number) {
return ((Number) value).longValue();
}
}
} catch (Exception e) {
log.warn("[countMessages][统计消息数量失败]", e);
}
return 0;
}
@Override
public Long selectCountByCreateTime(Long createTime) {
StringBuilder flux = new StringBuilder();
flux.append(String.format("from(bucket: \"%s\")\n", escapeStringLiteral(properties.getBucket())));
if (createTime != null) {
flux.append(String.format(" |> range(start: %s)\n",
Instant.ofEpochMilli(createTime).toString()));
} else {
flux.append(" |> range(start: 0)\n");
}
flux.append(String.format(" |> filter(fn: (r) => r._measurement == \"%s\")\n", MEASUREMENT));
flux.append(" |> filter(fn: (r) => r._field == \"id\")\n");
flux.append(" |> count()\n");
try {
QueryApi queryApi = influxDBClient.getQueryApi();
List<FluxTable> tables = queryApi.query(flux.toString());
if (CollUtil.isNotEmpty(tables) && CollUtil.isNotEmpty(tables.get(0).getRecords())) {
Object value = tables.get(0).getRecords().get(0).getValue();
if (value instanceof Number) {
return ((Number) value).longValue();
}
}
} catch (Exception e) {
log.error("[selectCountByCreateTime][统计消息数量失败]", e);
}
return 0L;
}
@Override
public List<IotDeviceMessageDO> selectListByRequestIdsAndReply(Long deviceId, Collection<String> requestIds,
Boolean reply) {
if (CollUtil.isEmpty(requestIds)) {
return Collections.emptyList();
}
// 构建 request_id OR 过滤表达式(对每个 requestId 转义防止注入)
List<String> idList = requestIds.stream().map(String::valueOf).toList();
StringBuilder filterExpr = new StringBuilder();
for (int i = 0; i < idList.size(); i++) {
if (i > 0) {
filterExpr.append(" or ");
}
filterExpr.append(String.format("r.request_id == \"%s\"", escapeStringLiteral(idList.get(i))));
}
StringBuilder flux = new StringBuilder();
flux.append(String.format("from(bucket: \"%s\")\n", escapeStringLiteral(properties.getBucket())));
flux.append(" |> range(start: -30d)\n");
flux.append(String.format(" |> filter(fn: (r) => r._measurement == \"%s\")\n", MEASUREMENT));
flux.append(String.format(" |> filter(fn: (r) => r.device_id == \"%s\")\n",
Long.toString(deviceId)));
flux.append(" |> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")\n");
flux.append(String.format(" |> filter(fn: (r) => %s)\n", filterExpr));
if (reply != null) {
flux.append(String.format(" |> filter(fn: (r) => r.reply == %s)\n", reply));
}
try {
QueryApi queryApi = influxDBClient.getQueryApi();
List<FluxTable> tables = queryApi.query(flux.toString());
List<IotDeviceMessageDO> result = new ArrayList<>();
for (FluxTable table : tables) {
for (FluxRecord record : table.getRecords()) {
result.add(mapToDeviceMessageDO(record));
}
}
return result;
} catch (Exception e) {
log.error("[selectListByRequestIdsAndReply][查询消息失败]", e);
return Collections.emptyList();
}
}
@Override
public List<HourlyMessageCountDTO> selectDeviceMessageCountGroupByDate(Long startTime, Long endTime) {
List<HourlyMessageCountDTO> result = new ArrayList<>();
// 分别查询 upstream 和 downstream 的数量,按小时聚合
Map<Instant, Long> upstreamCounts = queryHourlyCount(startTime, endTime, true);
Map<Instant, Long> downstreamCounts = queryHourlyCount(startTime, endTime, false);
// 合并结果,统一转换为 LocalDateTime 消除 TDengine/CTSDB 返回类型差异
Set<Instant> allTimes = new TreeSet<>();
allTimes.addAll(upstreamCounts.keySet());
allTimes.addAll(downstreamCounts.keySet());
for (Instant time : allTimes) {
LocalDateTime localTime = LocalDateTime.ofInstant(time, ZoneId.systemDefault());
result.add(new HourlyMessageCountDTO(
localTime,
upstreamCounts.getOrDefault(time, 0L).intValue(),
downstreamCounts.getOrDefault(time, 0L).intValue()));
}
return result;
}
private Map<Instant, Long> queryHourlyCount(Long startTime, Long endTime, boolean upstream) {
Map<Instant, Long> counts = new LinkedHashMap<>();
StringBuilder flux = new StringBuilder();
flux.append(String.format("from(bucket: \"%s\")\n", escapeStringLiteral(properties.getBucket())));
flux.append(String.format(" |> range(start: %s, stop: %s)\n",
Instant.ofEpochMilli(startTime).toString(),
Instant.ofEpochMilli(endTime).toString()));
flux.append(String.format(" |> filter(fn: (r) => r._measurement == \"%s\")\n", MEASUREMENT));
flux.append(String.format(" |> filter(fn: (r) => r._field == \"upstream\" and r._value == %s)\n", upstream));
flux.append(" |> window(every: 1h)\n");
flux.append(" |> count()\n");
flux.append(" |> duplicate(column: \"_start\", as: \"_time\")\n");
flux.append(" |> window(every: inf)\n");
try {
QueryApi queryApi = influxDBClient.getQueryApi();
List<FluxTable> tables = queryApi.query(flux.toString());
for (FluxTable table : tables) {
for (FluxRecord record : table.getRecords()) {
Instant time = record.getTime();
Object value = record.getValue();
if (time != null && value instanceof Number) {
counts.put(time, ((Number) value).longValue());
}
}
}
} catch (Exception e) {
log.error("[queryHourlyCount][按小时统计消息数量失败, upstream={}]", upstream, e);
}
return counts;
}
// ========== 辅助方法 ==========
/**
* 追加时间范围过滤,优先使用传入的时间范围,否则默认最近 30 天
*/
private void appendTimeRange(StringBuilder flux, java.time.LocalDateTime[] times) {
if (times != null && times.length >= 2 && times[0] != null && times[1] != null) {
long start = LocalDateTimeUtil.toEpochMilli(times[0]);
long stop = LocalDateTimeUtil.toEpochMilli(times[1]);
flux.append(String.format(" |> range(start: %s, stop: %s)\n",
Instant.ofEpochMilli(start).toString(),
Instant.ofEpochMilli(stop).toString()));
} else {
flux.append(" |> range(start: -30d)\n");
}
}
/**
* 追加多租户过滤条件
*/
private void appendTenantFilter(StringBuilder flux) {
Long tenantId = TenantContextHolder.getTenantId();
if (tenantId != null) {
flux.append(String.format(" |> filter(fn: (r) => r.tenant_id == %d)\n", tenantId));
}
}
// ========== 结果映射 ==========
private IotDeviceMessageDO mapToDeviceMessageDO(FluxRecord record) {
IotDeviceMessageDO message = new IotDeviceMessageDO();
message.setId(getStringValue(record, "id"));
message.setTs(record.getTime() != null ? record.getTime().toEpochMilli() : null);
message.setReportTime(getLongValue(record, "report_time"));
message.setDeviceId(getLongValue(record, "device_id"));
message.setTenantId(getLongValue(record, "tenant_id"));
message.setServerId(getStringValue(record, "server_id"));
message.setUpstream(getBooleanValue(record, "upstream"));
message.setReply(getBooleanValue(record, "reply"));
message.setIdentifier(getStringValue(record, "identifier"));
message.setRequestId(getStringValue(record, "request_id"));
message.setMethod(getStringValue(record, "method"));
message.setParams(getStringValue(record, "params"));
message.setData(getStringValue(record, "data"));
message.setCode(getIntValue(record, "code"));
message.setMsg(getStringValue(record, "msg"));
return message;
}
private String getStringValue(FluxRecord record, String key) {
Object value = record.getValueByKey(key);
return value != null ? String.valueOf(value) : null;
}
private Long getLongValue(FluxRecord record, String key) {
Object value = record.getValueByKey(key);
if (value instanceof Number) {
return ((Number) value).longValue();
}
if (value instanceof String str) {
try {
return Long.parseLong(str);
} catch (NumberFormatException ignored) {}
}
return null;
}
private Integer getIntValue(FluxRecord record, String key) {
Long value = getLongValue(record, key);
return value != null ? value.intValue() : null;
}
private Boolean getBooleanValue(FluxRecord record, String key) {
Object value = record.getValueByKey(key);
if (value instanceof Boolean) {
return (Boolean) value;
}
if (value instanceof String str) {
return Boolean.parseBoolean(str);
}
return null;
}
}

View File

@@ -0,0 +1,208 @@
package com.viewsh.module.iot.framework.tsdb.ctsdb;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.StrUtil;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.QueryApi;
import com.influxdb.client.WriteApi;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
import com.viewsh.module.iot.controller.admin.device.vo.property.IotDevicePropertyHistoryListReqVO;
import com.viewsh.module.iot.controller.admin.device.vo.property.IotDevicePropertyRespVO;
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
import com.viewsh.module.iot.framework.tsdb.IotTsDbDevicePropertyDao;
import com.viewsh.module.iot.framework.tsdb.TsDbTableField;
import lombok.extern.slf4j.Slf4j;
import java.time.Instant;
import java.util.*;
import static com.viewsh.module.iot.framework.tsdb.ctsdb.FluxQuerySanitizer.escapeStringLiteral;
/**
* CTSDB (InfluxDB) 实现的设备属性时序 DAO
*
* 变更说明:
* - Flux 查询字符串拼接全部使用 FluxQuerySanitizer 转义,防止注入
* - insert() 改用异步批量 WriteApi
* - selectHistory() 新增 productId 参数,精确匹配 measurement 替代正则全扫描
*/
@Slf4j
public class CtsdbDevicePropertyDaoImpl implements IotTsDbDevicePropertyDao {
private static final String MEASUREMENT_PREFIX = "product_property_";
private final InfluxDBClient influxDBClient;
private final CtsdbProperties properties;
private final WriteApi writeApi;
/**
* 向下兼容构造函数(不使用异步写入)
*/
public CtsdbDevicePropertyDaoImpl(InfluxDBClient influxDBClient, CtsdbProperties properties) {
this(influxDBClient, properties, null);
}
public CtsdbDevicePropertyDaoImpl(InfluxDBClient influxDBClient, CtsdbProperties properties, WriteApi writeApi) {
this.influxDBClient = influxDBClient;
this.properties = properties;
this.writeApi = writeApi;
}
@Override
public List<TsDbTableField> getTableFields(Long productId) {
String measurement = MEASUREMENT_PREFIX + productId;
// 使用 Flux 查询 field keysbucket 和 measurement 名转义)
String flux = String.format(
"import \"influxdata/influxdb/schema\"\n" +
"schema.fieldKeys(bucket: \"%s\", measurement: \"%s\")",
escapeStringLiteral(this.properties.getBucket()),
escapeStringLiteral(measurement));
List<TsDbTableField> fields = new ArrayList<>();
try {
QueryApi queryApi = influxDBClient.getQueryApi();
List<FluxTable> tables = queryApi.query(flux);
for (FluxTable table : tables) {
for (FluxRecord record : table.getRecords()) {
String fieldName = (String) record.getValueByKey("_value");
if (fieldName != null && !"report_time".equals(fieldName)) {
TsDbTableField field = new TsDbTableField();
field.setField(fieldName);
field.setType(TsDbTableField.TYPE_STRING);
field.setTag(false);
fields.add(field);
}
}
}
// 添加 device_id tag
TsDbTableField deviceIdTag = new TsDbTableField();
deviceIdTag.setField("device_id");
deviceIdTag.setType(TsDbTableField.TYPE_INT);
deviceIdTag.setTag(true);
fields.add(deviceIdTag);
} catch (Exception e) {
if (e.getMessage() != null && e.getMessage().contains("not found")) {
log.debug("[getTableFields][measurement {} 不存在]", measurement);
return Collections.emptyList();
}
throw new RuntimeException("Table does not exist", e);
}
return fields;
}
@Override
public void createPropertyTable(Long productId, List<TsDbTableField> fields) {
// InfluxDB 是 schema-on-write无需预建表
log.info("[createPropertyTable][CTSDB/InfluxDB schema-on-write产品({}) 跳过建表]", productId);
}
@Override
public void alterPropertyTable(Long productId, List<TsDbTableField> oldFields, List<TsDbTableField> newFields) {
// InfluxDB 是 schema-on-write无需 ALTER TABLE
log.info("[alterPropertyTable][CTSDB/InfluxDB schema-on-write产品({}) 跳过表结构变更]", productId);
}
@Override
public void insert(IotDeviceDO device, Map<String, Object> properties, Long reportTime) {
String measurement = MEASUREMENT_PREFIX + device.getProductId();
Point point = Point.measurement(measurement)
.addTag("device_id", String.valueOf(device.getId()))
.time(System.currentTimeMillis(), WritePrecision.MS);
// report_time
if (reportTime != null) {
point.addField("report_time", reportTime);
}
// 动态属性
properties.forEach((key, value) -> {
String fieldName = StrUtil.toUnderlineCase(key);
if (value instanceof Integer intVal) {
point.addField(fieldName, intVal);
} else if (value instanceof Long longVal) {
point.addField(fieldName, longVal);
} else if (value instanceof Float floatVal) {
point.addField(fieldName, floatVal);
} else if (value instanceof Double doubleVal) {
point.addField(fieldName, doubleVal);
} else if (value instanceof Boolean boolVal) {
point.addField(fieldName, boolVal);
} else if (value != null) {
point.addField(fieldName, String.valueOf(value));
}
});
// 优先使用异步批量写入 API回退到同步写入
if (writeApi != null) {
writeApi.writePoint(point);
} else {
influxDBClient.getWriteApiBlocking().writePoint(point);
}
}
@Override
public List<IotDevicePropertyRespVO> selectHistory(IotDevicePropertyHistoryListReqVO reqVO, Long productId) {
String fieldName = StrUtil.toUnderlineCase(reqVO.getIdentifier());
StringBuilder flux = new StringBuilder();
flux.append(String.format("from(bucket: \"%s\")\n", escapeStringLiteral(this.properties.getBucket())));
// 时间范围
if (reqVO.getTimes() != null && reqVO.getTimes().length >= 2
&& reqVO.getTimes()[0] != null && reqVO.getTimes()[1] != null) {
long start = LocalDateTimeUtil.toEpochMilli(reqVO.getTimes()[0]);
long stop = LocalDateTimeUtil.toEpochMilli(reqVO.getTimes()[1]);
flux.append(String.format(" |> range(start: %s, stop: %s)\n",
Instant.ofEpochMilli(start).toString(),
Instant.ofEpochMilli(stop).toString()));
} else {
flux.append(" |> range(start: -30d)\n");
}
// 精确匹配 measurement通过 productId替代原来的正则全扫描
if (productId != null) {
String measurement = MEASUREMENT_PREFIX + productId;
flux.append(String.format(" |> filter(fn: (r) => r._measurement == \"%s\")\n",
escapeStringLiteral(measurement)));
} else {
// 兜底:如果 productId 未传入,使用前缀正则匹配(性能较差)
log.warn("[selectHistory][productId 为空,将使用正则扫描所有 measurement性能可能较差]");
flux.append(String.format(" |> filter(fn: (r) => r._measurement =~ /^%s/)\n",
FluxQuerySanitizer.escapeRegexLiteral(MEASUREMENT_PREFIX)));
}
flux.append(String.format(" |> filter(fn: (r) => r.device_id == \"%s\")\n",
Long.toString(reqVO.getDeviceId())));
flux.append(String.format(" |> filter(fn: (r) => r._field == \"%s\")\n",
escapeStringLiteral(fieldName)));
flux.append(" |> sort(columns: [\"_time\"], desc: true)\n");
List<IotDevicePropertyRespVO> result = new ArrayList<>();
try {
QueryApi queryApi = influxDBClient.getQueryApi();
List<FluxTable> tables = queryApi.query(flux.toString());
for (FluxTable table : tables) {
for (FluxRecord record : table.getRecords()) {
IotDevicePropertyRespVO vo = new IotDevicePropertyRespVO();
vo.setIdentifier(reqVO.getIdentifier());
vo.setValue(record.getValue());
if (record.getTime() != null) {
vo.setUpdateTime(record.getTime().toEpochMilli());
}
result.add(vo);
}
}
} catch (Exception e) {
if (e.getMessage() != null && e.getMessage().contains("not found")) {
log.debug("[selectHistory][measurement 不存在]");
return Collections.emptyList();
}
throw new RuntimeException("Table does not exist", e);
}
return result;
}
}

View File

@@ -0,0 +1,35 @@
package com.viewsh.module.iot.framework.tsdb.ctsdb;
import lombok.Data;
import lombok.ToString;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* CTSDB (InfluxDB) 配置属性
*/
@Data
@ConfigurationProperties(prefix = "viewsh.iot.tsdb.ctsdb")
public class CtsdbProperties {
/**
* InfluxDB HTTP API 地址
*/
private String url = "http://localhost:8086";
/**
* 认证 Token排除在 toString 输出之外,避免日志泄漏)
*/
@ToString.Exclude
private String token = "";
/**
* 组织名称
*/
private String org = "aiot";
/**
* 存储桶名称
*/
private String bucket = "aiot_platform";
}

View File

@@ -0,0 +1,57 @@
package com.viewsh.module.iot.framework.tsdb.ctsdb;
/**
* Flux 查询安全工具类
*
* 对拼接到 Flux 查询中的字符串值进行转义,防止 Flux 注入攻击。
* InfluxDB Flux 查询语言中,字符串字面量使用双引号包裹,
* 需要转义的特殊字符包括: \, ", $, {, }
*/
public final class FluxQuerySanitizer {
private FluxQuerySanitizer() {
}
/**
* 转义 Flux 字符串字面量中的特殊字符
*
* @param value 原始字符串
* @return 转义后的安全字符串,可直接放入 Flux 双引号内
*/
public static String escapeStringLiteral(String value) {
if (value == null) {
return "";
}
StringBuilder sb = new StringBuilder(value.length() + 8);
for (int i = 0; i < value.length(); i++) {
char c = value.charAt(i);
switch (c) {
case '\\' -> sb.append("\\\\");
case '"' -> sb.append("\\\"");
case '$' -> sb.append("\\$");
case '{' -> sb.append("\\{");
case '}' -> sb.append("\\}");
case '\n' -> sb.append("\\n");
case '\r' -> sb.append("\\r");
case '\t' -> sb.append("\\t");
default -> sb.append(c);
}
}
return sb.toString();
}
/**
* 转义 Flux 正则表达式中的特殊字符
*
* @param value 原始字符串
* @return 转义后的安全字符串,可直接放入 Flux 正则中
*/
public static String escapeRegexLiteral(String value) {
if (value == null) {
return "";
}
// 正则特殊字符: . * + ? ^ $ { } [ ] ( ) | \
return value.replaceAll("([.\\\\*+?^${}\\[\\]()|])", "\\\\$1");
}
}

View File

@@ -0,0 +1,78 @@
package com.viewsh.module.iot.framework.tsdb.tdengine;
import cn.hutool.core.util.StrUtil;
import com.viewsh.module.iot.controller.admin.device.vo.message.IotDeviceMessagePageReqVO;
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceMessageDO;
import com.viewsh.module.iot.dal.tdengine.IotDeviceMessageMapper;
import com.viewsh.module.iot.framework.tsdb.HourlyMessageCountDTO;
import com.viewsh.module.iot.framework.tsdb.IotTsDbDeviceMessageDao;
import com.baomidou.mybatisplus.core.metadata.IPage;
import lombok.RequiredArgsConstructor;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import static com.viewsh.framework.common.util.collection.CollectionUtils.convertList;
/**
* TDengine 实现的设备消息时序 DAO
*
* 委托给现有的 {@link IotDeviceMessageMapper}
*/
@RequiredArgsConstructor
public class TDengineDeviceMessageDaoImpl implements IotTsDbDeviceMessageDao {
private final IotDeviceMessageMapper deviceMessageMapper;
@Override
public void createSchema() {
deviceMessageMapper.createSTable();
}
@Override
public boolean schemaExists() {
return StrUtil.isNotEmpty(deviceMessageMapper.showSTable());
}
@Override
public void insert(IotDeviceMessageDO message) {
deviceMessageMapper.insert(message);
}
@Override
public IPage<IotDeviceMessageDO> selectPage(IPage<IotDeviceMessageDO> page, IotDeviceMessagePageReqVO reqVO) {
return deviceMessageMapper.selectPage(page, reqVO);
}
@Override
public Long selectCountByCreateTime(Long createTime) {
return deviceMessageMapper.selectCountByCreateTime(createTime);
}
@Override
public List<IotDeviceMessageDO> selectListByRequestIdsAndReply(Long deviceId, Collection<String> requestIds,
Boolean reply) {
return deviceMessageMapper.selectListByRequestIdsAndReply(deviceId, requestIds, reply);
}
@Override
public List<HourlyMessageCountDTO> selectDeviceMessageCountGroupByDate(Long startTime, Long endTime) {
// TDengine Mapper 返回 Map<String, Object>,其中 time 为 java.sql.Timestamp
// 在此统一转换为 HourlyMessageCountDTO消除上层对 Timestamp 的强依赖
List<Map<String, Object>> rawList = deviceMessageMapper.selectDeviceMessageCountGroupByDate(startTime, endTime);
return convertList(rawList, row -> {
LocalDateTime time = null;
Object timeObj = row.get("time");
if (timeObj instanceof Timestamp ts) {
time = ts.toLocalDateTime();
}
Integer upstreamCount = row.get("upstream_count") instanceof Number n ? n.intValue() : 0;
Integer downstreamCount = row.get("downstream_count") instanceof Number n ? n.intValue() : 0;
return new HourlyMessageCountDTO(time, upstreamCount, downstreamCount);
});
}
}

View File

@@ -0,0 +1,109 @@
package com.viewsh.module.iot.framework.tsdb.tdengine;
import com.viewsh.module.iot.controller.admin.device.vo.property.IotDevicePropertyHistoryListReqVO;
import com.viewsh.module.iot.controller.admin.device.vo.property.IotDevicePropertyRespVO;
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
import com.viewsh.module.iot.dal.tdengine.IotDevicePropertyMapper;
import com.viewsh.module.iot.framework.tdengine.core.TDengineTableField;
import com.viewsh.module.iot.framework.tsdb.IotTsDbDevicePropertyDao;
import com.viewsh.module.iot.framework.tsdb.TsDbTableField;
import lombok.RequiredArgsConstructor;
import java.util.List;
import java.util.Map;
import static com.viewsh.framework.common.util.collection.CollectionUtils.convertList;
/**
* TDengine 实现的设备属性时序 DAO
*
* 委托给现有的 {@link IotDevicePropertyMapper}
*/
@RequiredArgsConstructor
public class TDengineDevicePropertyDaoImpl implements IotTsDbDevicePropertyDao {
private final IotDevicePropertyMapper devicePropertyMapper;
@Override
public List<TsDbTableField> getTableFields(Long productId) {
List<TDengineTableField> tdFields = devicePropertyMapper.getProductPropertySTableFieldList(productId);
return convertList(tdFields, TDengineDevicePropertyDaoImpl::toTsDbField);
}
@Override
public void createPropertyTable(Long productId, List<TsDbTableField> fields) {
devicePropertyMapper.createProductPropertySTable(productId, convertList(fields,
TDengineDevicePropertyDaoImpl::toTDengineField));
}
@Override
public void alterPropertyTable(Long productId, List<TsDbTableField> oldFields, List<TsDbTableField> newFields) {
devicePropertyMapper.alterProductPropertySTable(productId,
convertList(oldFields, TDengineDevicePropertyDaoImpl::toTDengineField),
convertList(newFields, TDengineDevicePropertyDaoImpl::toTDengineField));
}
@Override
public void insert(IotDeviceDO device, Map<String, Object> properties, Long reportTime) {
devicePropertyMapper.insert(device, properties, reportTime);
}
@Override
public List<IotDevicePropertyRespVO> selectHistory(IotDevicePropertyHistoryListReqVO reqVO, Long productId) {
// TDengine 通过子表名 device_property_{deviceId} 定位数据,不需要 productId
return devicePropertyMapper.selectListByHistory(reqVO);
}
// ========== TsDbTableField <-> TDengineTableField 转换 ==========
static TDengineTableField toTDengineField(TsDbTableField field) {
TDengineTableField tdField = new TDengineTableField();
tdField.setField(field.getField());
tdField.setType(toTDengineType(field.getType()));
tdField.setLength(field.getLength());
tdField.setNote(field.isTag() ? TDengineTableField.NOTE_TAG : null);
return tdField;
}
static TsDbTableField toTsDbField(TDengineTableField tdField) {
TsDbTableField field = new TsDbTableField();
field.setField(tdField.getField());
field.setType(fromTDengineType(tdField.getType()));
field.setLength(tdField.getLength());
field.setTag(TDengineTableField.NOTE_TAG.equals(tdField.getNote()));
return field;
}
private static String toTDengineType(String tsDbType) {
if (tsDbType == null) {
return null;
}
return switch (tsDbType) {
case TsDbTableField.TYPE_TINYINT -> TDengineTableField.TYPE_TINYINT;
case TsDbTableField.TYPE_INT -> TDengineTableField.TYPE_INT;
case TsDbTableField.TYPE_FLOAT -> TDengineTableField.TYPE_FLOAT;
case TsDbTableField.TYPE_DOUBLE -> TDengineTableField.TYPE_DOUBLE;
case TsDbTableField.TYPE_BOOL -> TDengineTableField.TYPE_BOOL;
case TsDbTableField.TYPE_STRING -> TDengineTableField.TYPE_VARCHAR;
case TsDbTableField.TYPE_TIMESTAMP -> TDengineTableField.TYPE_TIMESTAMP;
default -> tsDbType;
};
}
private static String fromTDengineType(String tdType) {
if (tdType == null) {
return null;
}
return switch (tdType.toUpperCase()) {
case "TINYINT" -> TsDbTableField.TYPE_TINYINT;
case "INT" -> TsDbTableField.TYPE_INT;
case "FLOAT" -> TsDbTableField.TYPE_FLOAT;
case "DOUBLE" -> TsDbTableField.TYPE_DOUBLE;
case "BOOL" -> TsDbTableField.TYPE_BOOL;
case "VARCHAR", "NCHAR" -> TsDbTableField.TYPE_STRING;
case "TIMESTAMP" -> TsDbTableField.TYPE_TIMESTAMP;
default -> tdType;
};
}
}

View File

@@ -2,7 +2,6 @@ package com.viewsh.module.iot.service.device.message;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.viewsh.framework.common.exception.ServiceException;
@@ -20,7 +19,7 @@ import com.viewsh.module.iot.core.mq.producer.IotDeviceMessageProducer;
import com.viewsh.module.iot.core.util.IotDeviceMessageUtils;
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceMessageDO;
import com.viewsh.module.iot.dal.tdengine.IotDeviceMessageMapper;
import com.viewsh.module.iot.framework.tsdb.IotTsDbDeviceMessageDao;
import com.viewsh.module.iot.service.device.IotDeviceService;
import com.viewsh.module.iot.service.device.property.IotDevicePropertyService;
import com.viewsh.module.iot.service.ota.IotOtaTaskRecordService;
@@ -34,7 +33,6 @@ import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
@@ -62,20 +60,20 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
private IotOtaTaskRecordService otaTaskRecordService;
@Resource
private IotDeviceMessageMapper deviceMessageMapper;
private IotTsDbDeviceMessageDao tsDbDeviceMessageDao;
@Resource
private IotDeviceMessageProducer deviceMessageProducer;
@Override
public void defineDeviceMessageStable() {
if (StrUtil.isNotEmpty(deviceMessageMapper.showSTable())) {
log.info("[defineDeviceMessageStable][设备消息超级表已存在,创建跳过]");
if (tsDbDeviceMessageDao.schemaExists()) {
log.info("[defineDeviceMessageStable][设备消息表已存在,创建跳过]");
return;
}
log.info("[defineDeviceMessageStable][设备消息超级表不存在,创建开始...]");
deviceMessageMapper.createSTable();
log.info("[defineDeviceMessageStable][设备消息超级表不存在,创建成功]");
log.info("[defineDeviceMessageStable][设备消息表不存在,创建开始...]");
tsDbDeviceMessageDao.createSchema();
log.info("[defineDeviceMessageStable][设备消息表不存在,创建成功]");
}
@Async
@@ -90,7 +88,7 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
if (messageDO.getData() != null) {
messageDO.setData(JsonUtils.toJsonString(messageDO.getData()));
}
deviceMessageMapper.insert(messageDO);
tsDbDeviceMessageDao.insert(messageDO);
}
@Override
@@ -224,7 +222,7 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
@Override
public PageResult<IotDeviceMessageDO> getDeviceMessagePage(IotDeviceMessagePageReqVO pageReqVO) {
try {
IPage<IotDeviceMessageDO> page = deviceMessageMapper.selectPage(
IPage<IotDeviceMessageDO> page = tsDbDeviceMessageDao.selectPage(
new Page<>(pageReqVO.getPageNo(), pageReqVO.getPageSize()), pageReqVO);
return new PageResult<>(page.getRecords(), page.getTotal());
} catch (Exception exception) {
@@ -239,33 +237,36 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
public List<IotDeviceMessageDO> getDeviceMessageListByRequestIdsAndReply(Long deviceId,
List<String> requestIds,
Boolean reply) {
return deviceMessageMapper.selectListByRequestIdsAndReply(deviceId, requestIds, reply);
return tsDbDeviceMessageDao.selectListByRequestIdsAndReply(deviceId, requestIds, reply);
}
@Override
public Long getDeviceMessageCount(LocalDateTime createTime) {
return deviceMessageMapper.selectCountByCreateTime(
return tsDbDeviceMessageDao.selectCountByCreateTime(
createTime != null ? LocalDateTimeUtil.toEpochMilli(createTime) : null);
}
@Override
public List<IotStatisticsDeviceMessageSummaryByDateRespVO> getDeviceMessageSummaryByDate(
IotStatisticsDeviceMessageReqVO reqVO) {
// 1. 按小时统计,获取分项统计数据
List<Map<String, Object>> countList = deviceMessageMapper.selectDeviceMessageCountGroupByDate(
LocalDateTimeUtil.toEpochMilli(reqVO.getTimes()[0]),
LocalDateTimeUtil.toEpochMilli(reqVO.getTimes()[1]));
// 1. 按小时统计,获取分项统计数据(返回类型已统一为 HourlyMessageCountDTO
List<com.viewsh.module.iot.framework.tsdb.HourlyMessageCountDTO> countList =
tsDbDeviceMessageDao.selectDeviceMessageCountGroupByDate(
LocalDateTimeUtil.toEpochMilli(reqVO.getTimes()[0]),
LocalDateTimeUtil.toEpochMilli(reqVO.getTimes()[1]));
// 2. 按照日期间隔,合并数据
List<LocalDateTime[]> timeRanges = LocalDateTimeUtils.getDateRangeList(reqVO.getTimes()[0], reqVO.getTimes()[1],
reqVO.getInterval());
return convertList(timeRanges, times -> {
Integer upstreamCount = countList.stream()
.filter(vo -> LocalDateTimeUtils.isBetween(times[0], times[1], (Timestamp) vo.get("time")))
.mapToInt(value -> MapUtil.getInt(value, "upstream_count")).sum();
.filter(dto -> dto.getTime() != null
&& LocalDateTimeUtil.isIn(dto.getTime(), times[0], times[1]))
.mapToInt(dto -> dto.getUpstreamCount() != null ? dto.getUpstreamCount() : 0).sum();
Integer downstreamCount = countList.stream()
.filter(vo -> LocalDateTimeUtils.isBetween(times[0], times[1], (Timestamp) vo.get("time")))
.mapToInt(value -> MapUtil.getInt(value, "downstream_count")).sum();
.filter(dto -> dto.getTime() != null
&& LocalDateTimeUtil.isIn(dto.getTime(), times[0], times[1]))
.mapToInt(dto -> dto.getDownstreamCount() != null ? dto.getDownstreamCount() : 0).sum();
return new IotStatisticsDeviceMessageSummaryByDateRespVO()
.setTime(LocalDateTimeUtils.formatDateRange(times[0], times[1], reqVO.getInterval()))
.setUpstreamCount(upstreamCount).setDownstreamCount(downstreamCount);

View File

@@ -20,10 +20,11 @@ import com.viewsh.module.iot.dal.dataobject.thingmodel.model.dataType.ThingModel
import com.viewsh.module.iot.dal.redis.device.DevicePropertyRedisDAO;
import com.viewsh.module.iot.dal.redis.device.DeviceReportTimeRedisDAO;
import com.viewsh.module.iot.dal.redis.device.DeviceServerIdRedisDAO;
import com.viewsh.module.iot.dal.tdengine.IotDevicePropertyMapper;
import com.viewsh.module.iot.enums.thingmodel.IotDataSpecsDataTypeEnum;
import com.viewsh.module.iot.enums.thingmodel.IotThingModelTypeEnum;
import com.viewsh.module.iot.framework.tdengine.core.TDengineTableField;
import com.viewsh.module.iot.framework.tsdb.IotTsDbDevicePropertyDao;
import com.viewsh.module.iot.framework.tsdb.TsDbTableField;
import com.viewsh.module.iot.service.device.IotDeviceService;
import com.viewsh.module.iot.service.product.IotProductService;
import com.viewsh.module.iot.service.thingmodel.IotThingModelService;
import jakarta.annotation.Resource;
@@ -47,20 +48,18 @@ import static com.viewsh.framework.common.util.collection.CollectionUtils.*;
public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
/**
* 物模型的数据类型,与 TDengine 数据类型的映射关系
*
* @see <a href="https://docs.taosdata.com/reference/taos-sql/data-type/">TDEngine 数据类型</a>
* 物模型的数据类型,与时序数据库通用类型的映射关系
*/
private static final Map<String, String> TYPE_MAPPING = MapUtil.<String, String>builder()
.put(IotDataSpecsDataTypeEnum.INT.getDataType(), TDengineTableField.TYPE_INT)
.put(IotDataSpecsDataTypeEnum.FLOAT.getDataType(), TDengineTableField.TYPE_FLOAT)
.put(IotDataSpecsDataTypeEnum.DOUBLE.getDataType(), TDengineTableField.TYPE_DOUBLE)
.put(IotDataSpecsDataTypeEnum.ENUM.getDataType(), TDengineTableField.TYPE_TINYINT)
.put(IotDataSpecsDataTypeEnum.BOOL.getDataType(), TDengineTableField.TYPE_TINYINT)
.put(IotDataSpecsDataTypeEnum.TEXT.getDataType(), TDengineTableField.TYPE_VARCHAR)
.put(IotDataSpecsDataTypeEnum.DATE.getDataType(), TDengineTableField.TYPE_TIMESTAMP)
.put(IotDataSpecsDataTypeEnum.STRUCT.getDataType(), TDengineTableField.TYPE_VARCHAR)
.put(IotDataSpecsDataTypeEnum.ARRAY.getDataType(), TDengineTableField.TYPE_VARCHAR)
.put(IotDataSpecsDataTypeEnum.INT.getDataType(), TsDbTableField.TYPE_INT)
.put(IotDataSpecsDataTypeEnum.FLOAT.getDataType(), TsDbTableField.TYPE_FLOAT)
.put(IotDataSpecsDataTypeEnum.DOUBLE.getDataType(), TsDbTableField.TYPE_DOUBLE)
.put(IotDataSpecsDataTypeEnum.ENUM.getDataType(), TsDbTableField.TYPE_TINYINT)
.put(IotDataSpecsDataTypeEnum.BOOL.getDataType(), TsDbTableField.TYPE_TINYINT)
.put(IotDataSpecsDataTypeEnum.TEXT.getDataType(), TsDbTableField.TYPE_STRING)
.put(IotDataSpecsDataTypeEnum.DATE.getDataType(), TsDbTableField.TYPE_TIMESTAMP)
.put(IotDataSpecsDataTypeEnum.STRUCT.getDataType(), TsDbTableField.TYPE_STRING)
.put(IotDataSpecsDataTypeEnum.ARRAY.getDataType(), TsDbTableField.TYPE_STRING)
.build();
@Resource
@@ -68,6 +67,9 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
@Resource
@Lazy // 延迟加载,解决循环依赖
private IotProductService productService;
@Resource
@Lazy
private IotDeviceService deviceService;
@Resource
private DevicePropertyRedisDAO deviceDataRedisDAO;
@@ -77,7 +79,7 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
private DeviceServerIdRedisDAO deviceServerIdRedisDAO;
@Resource
private IotDevicePropertyMapper devicePropertyMapper;
private IotTsDbDevicePropertyDao tsDbDevicePropertyDao;
@Resource
private RedisMQTemplate redisMQTemplate;
@@ -97,9 +99,9 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
List<IotThingModelDO> thingModels = filterList(thingModelService.getThingModelListByProductId(productId),
thingModel -> IotThingModelTypeEnum.PROPERTY.getType().equals(thingModel.getType()));
// 1.2 解析 DB 里的字段
List<TDengineTableField> oldFields = new ArrayList<>();
List<TsDbTableField> oldFields = new ArrayList<>();
try {
oldFields.addAll(devicePropertyMapper.getProductPropertySTableFieldList(product.getId()));
oldFields.addAll(tsDbDevicePropertyDao.getTableFields(product.getId()));
} catch (Exception e) {
if (!e.getMessage().contains("Table does not exist")) {
throw e;
@@ -107,30 +109,30 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
}
// 2.1 情况一:如果是新增的时候,需要创建表
List<TDengineTableField> newFields = buildTableFieldList(thingModels);
List<TsDbTableField> newFields = buildTableFieldList(thingModels);
if (CollUtil.isEmpty(oldFields)) {
if (CollUtil.isEmpty(newFields)) {
log.info("[defineDevicePropertyData][productId({}) 没有需要定义的属性]", productId);
return;
}
devicePropertyMapper.createProductPropertySTable(product.getId(), newFields);
tsDbDevicePropertyDao.createPropertyTable(product.getId(), newFields);
return;
}
// 2.2 情况二:如果是修改的时候,需要更新表
devicePropertyMapper.alterProductPropertySTable(product.getId(), oldFields, newFields);
tsDbDevicePropertyDao.alterPropertyTable(product.getId(), oldFields, newFields);
}
private List<TDengineTableField> buildTableFieldList(List<IotThingModelDO> thingModels) {
private List<TsDbTableField> buildTableFieldList(List<IotThingModelDO> thingModels) {
return convertList(thingModels, thingModel -> {
TDengineTableField field = new TDengineTableField(
StrUtil.toUnderlineCase(thingModel.getIdentifier()), // TDengine 字段默认都是小写
TsDbTableField field = new TsDbTableField(
StrUtil.toUnderlineCase(thingModel.getIdentifier()),
TYPE_MAPPING.get(thingModel.getProperty().getDataType()));
String dataType = thingModel.getProperty().getDataType();
if (Objects.equals(dataType, IotDataSpecsDataTypeEnum.TEXT.getDataType())) {
field.setLength(((ThingModelDateOrTextDataSpecs) thingModel.getProperty().getDataSpecs()).getLength());
} else if (ObjectUtils.equalsAny(dataType, IotDataSpecsDataTypeEnum.STRUCT.getDataType(),
IotDataSpecsDataTypeEnum.ARRAY.getDataType())) {
field.setLength(TDengineTableField.LENGTH_VARCHAR);
field.setLength(TsDbTableField.LENGTH_DEFAULT_STRING);
}
return field;
});
@@ -167,7 +169,7 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
}
// 2.1 保存设备属性【数据】
devicePropertyMapper.insert(device, properties, LocalDateTimeUtil.toEpochMilli(message.getReportTime()));
tsDbDevicePropertyDao.insert(device, properties, LocalDateTimeUtil.toEpochMilli(message.getReportTime()));
// 2.2 保存设备属性【日志】
Map<String, IotDevicePropertyDO> properties2 = convertMap(properties.entrySet(), Map.Entry::getKey, entry ->
@@ -243,7 +245,10 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
@Override
public List<IotDevicePropertyRespVO> getHistoryDevicePropertyList(IotDevicePropertyHistoryListReqVO listReqVO) {
try {
return devicePropertyMapper.selectListByHistory(listReqVO);
// 通过 deviceId 查询 productId用于 CTSDB 精确匹配 measurement
IotDeviceDO device = deviceService.getDevice(listReqVO.getDeviceId());
Long productId = device != null ? device.getProductId() : null;
return tsDbDevicePropertyDao.selectHistory(listReqVO, productId);
} catch (Exception exception) {
if (exception.getMessage().contains("Table does not exist")) {
return Collections.emptyList();

View File

@@ -153,6 +153,13 @@ viewsh:
iot:
message-bus:
type: redis # 消息总线的类型
tsdb:
type: tdengine # 时序数据库类型: tdengine | ctsdb
ctsdb: # CTSDB (InfluxDB) 配置,仅 type=ctsdb 时生效
url: http://${CTSDB_HOST:localhost}:${CTSDB_PORT:8086}
token: ${CTSDB_TOKEN:}
org: ${CTSDB_ORG:aiot}
bucket: ${CTSDB_BUCKET:aiot_platform}
# 跨模块事件总线配置IntegrationEventBus
integration:
mq: