feat(iot): Wave 4 Round 2 — B6/B7/B18 ActionProvider + 分支执行 + DataRule迁移

B6 ActionProvider SPI + 5 核心动作(alarm/notify/device-ctrl):
- ActionProvider 接口(extends NodeProvider,默认 bridge execute)
- ActionResult record(SUCCESS/FAILURE/SKIP + output + message)
- ActionProviderManager(Spring 自动收集 + fail-fast 重复 type)
- AlarmTriggerAction(调用 IotAlarmRecordApi.triggerAlarm,模板变量解析)
- AlarmClearAction(alarmId 从 config 或 ctx.metadata 解析,幂等)
- NotifyAction(4 通道并发 + 部分失败不阻塞,第一期 stub)
- DeviceServiceInvokeAction(调用 IotDeviceControlApi.invokeService)
- DevicePropertySetAction(第一期 stub,B27 补全 Redis/MySQL)
- IotAlarmRecordApi + DTO(rule 模块→server 跨模块接口)
- IotAlarmRecordApiImpl(server 端 FeignClient 实现,委托 Service)
- 14 单元测试全绿

B7 分支执行逻辑(executeAnyway if/else-if/else):
- BranchConfiguration POJO(branches[] + executeAnyway + BranchCondition)
- BranchExecutor(核心语义:else/executeAnyway/条件异常短路/action 异常隔离)
- BranchNode NodeProvider(ACTION/"branch",内联执行命中 branch actions)
- DagExecutor 最小扩展(ctx.metadata 传递 CompiledRuleChain 供 BranchNode 使用)
- 9 单元测试全绿(含 validate else 位置校验)

B18 DataRule → DAG 自动转换工具:
- DataRuleToChainMapper(v1→v2 映射,6 种 Sink,合并/拆分多 source)
- DataRuleMigrator(dry-run + execute + 幂等映射表)
- DataRuleMigrationController(3 端点:dry-run/execute/mapping)
- 8 单元测试全绿

测试总计:rule 模块 159/159 ✓,server 模块 8/8(B18)✓

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
lzh
2026-04-24 10:10:04 +08:00
parent 42466363c7
commit 24c486900a
23 changed files with 2969 additions and 0 deletions

View File

@@ -0,0 +1,62 @@
package com.viewsh.module.iot.api.alarm;
import com.viewsh.framework.common.pojo.CommonResult;
import com.viewsh.module.iot.api.alarm.dto.IotAlarmClearReqDTO;
import com.viewsh.module.iot.api.alarm.dto.IotAlarmTriggerReqDTO;
import com.viewsh.module.iot.service.alarm.IotAlarmRecordService;
import com.viewsh.module.iot.service.alarm.dto.AlarmStateTransitionRequest;
import com.viewsh.module.iot.service.alarm.dto.AlarmTriggerRequest;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Primary;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import static com.viewsh.framework.common.pojo.CommonResult.success;
/**
* IoT 告警记录 API 实现(供 rule 模块 FeignClient 调用)
*
* @author B6
*/
@RestController
@Validated
@Primary
@Slf4j
public class IotAlarmRecordApiImpl implements IotAlarmRecordApi {
@Resource
private IotAlarmRecordService alarmRecordService;
@Override
@PostMapping(PREFIX + "/trigger")
public CommonResult<Long> triggerAlarm(@RequestBody IotAlarmTriggerReqDTO reqDTO) {
AlarmTriggerRequest req = AlarmTriggerRequest.builder()
.deviceId(reqDTO.getDeviceId())
.alarmConfigId(reqDTO.getAlarmConfigId())
.severity(reqDTO.getSeverity())
.alarmName(reqDTO.getAlarmName())
.productId(reqDTO.getProductId())
.subsystemId(reqDTO.getSubsystemId())
.ruleChainId(reqDTO.getRuleChainId())
.sceneRuleId(reqDTO.getSceneRuleId())
.details(reqDTO.getDetails())
.build();
Long alarmId = alarmRecordService.triggerAlarm(req);
return success(alarmId);
}
@Override
@PostMapping(PREFIX + "/clear")
public CommonResult<Boolean> clearAlarm(@RequestBody IotAlarmClearReqDTO reqDTO) {
AlarmStateTransitionRequest req = AlarmStateTransitionRequest.builder()
.alarmId(reqDTO.getAlarmId())
.operator(reqDTO.getOperator())
.remark(reqDTO.getRemark())
.build();
alarmRecordService.clearAlarm(req);
return success(true);
}
}

View File

@@ -0,0 +1,62 @@
package com.viewsh.module.iot.migration;
import com.viewsh.framework.common.pojo.CommonResult;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.Resource;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import static com.viewsh.framework.common.pojo.CommonResult.success;
/**
* B18 — DataRule 迁移 REST API
*
* <p>3 个端点(与 B17 对称):
* <ul>
* <li>POST /iot/migration/data-rule/dry-run — 预览迁移结果</li>
* <li>POST /iot/migration/data-rule/execute — 执行迁移(幂等)</li>
* <li>GET /iot/migration/data-rule/mapping — 查询已迁移映射关系</li>
* </ul>
*/
@Tag(name = "管理后台 - IoT DataRule 迁移工具B18")
@RestController
@RequestMapping("/iot/migration/data-rule")
@Validated
public class DataRuleMigrationController {
@Resource
private DataRuleMigrator datRuleMigrator;
@PostMapping("/dry-run")
@Operation(summary = "预览 DataRule 迁移结果(不写库)",
description = "返回每条 v1 DataRule 将被转换成几个 v2 RuleChain 以及 chain 名称列表")
@PreAuthorize("@ss.hasPermission('iot:migration:data-rule:dry-run')")
public CommonResult<List<DataRuleMigrator.DryRunResult>> dryRun() {
return success(datRuleMigrator.dryRun());
}
@PostMapping("/execute")
@Operation(summary = "执行 DataRule 迁移(幂等)",
description = "将 v1 iot_data_rule + iot_data_sink 迁移为 v2 DAG RuleChain已迁移的会被跳过")
@PreAuthorize("@ss.hasPermission('iot:migration:data-rule:execute')")
public CommonResult<DataRuleMigrator.ExecuteResult> execute(
@RequestParam(value = "migrator", defaultValue = "system")
@Parameter(description = "操作人标识(审计用)", example = "admin")
String migrator) {
return success(datRuleMigrator.execute(migrator));
}
@GetMapping("/mapping")
@Operation(summary = "查询已迁移的映射关系",
description = "查询 iot_data_rule_migration 表中的所有映射记录")
@PreAuthorize("@ss.hasPermission('iot:migration:data-rule:mapping')")
public CommonResult<List<DataRuleMigrator.MappingRecord>> queryMapping() {
return success(datRuleMigrator.queryMappings());
}
}

View File

@@ -0,0 +1,265 @@
package com.viewsh.module.iot.migration;
import com.viewsh.module.iot.dal.dataobject.rule.IotDataRuleDO;
import com.viewsh.module.iot.dal.dataobject.rule.IotDataSinkDO;
import com.viewsh.module.iot.dal.mysql.rule.IotDataRuleMapper;
import com.viewsh.module.iot.dal.mysql.rule.IotDataSinkMapper;
import com.viewsh.module.iot.migration.mapping.DataRuleToChainMapper;
import com.viewsh.module.iot.rule.controller.admin.vo.IotRuleChainSaveReqVO;
import com.viewsh.module.iot.rule.service.IotRuleChainService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;
/**
* B18 — DataRule + DataSink → v2 DAG RuleChain 迁移主服务
*
* <p>策略:
* <ol>
* <li>读取所有 v1 data_rule</li>
* <li>通过 {@link DataRuleToChainMapper} 转换为 v2 chain 请求</li>
* <li>调用 {@link IotRuleChainService#createRuleChain} 写入 v2 chain</li>
* <li>向映射表 {@code iot_data_rule_migration} 写入记录幂等UK 冲突时跳过)</li>
* </ol>
*
* <p>幂等:映射表唯一键 {@code uk_old_source (old_rule_id, source_index, tenant_id)}
* 重复执行时 INSERT IGNORE 跳过已迁移记录。
*
* <p>映射表 DDL参考
* <pre>{@code
* CREATE TABLE iot_data_rule_migration (
* id BIGINT PRIMARY KEY AUTO_INCREMENT,
* old_rule_id BIGINT NOT NULL,
* new_chain_id BIGINT NOT NULL,
* source_index INT DEFAULT 0,
* migrated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
* migrator VARCHAR(64),
* tenant_id BIGINT NOT NULL,
* UNIQUE KEY uk_old_source (old_rule_id, source_index, tenant_id)
* ) COMMENT='DataRule 迁移映射';
* }</pre>
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class DataRuleMigrator {
private final IotDataRuleMapper dataRuleMapper;
private final IotDataSinkMapper dataSinkMapper;
private final DataRuleToChainMapper chainMapper;
private final IotRuleChainService ruleChainService;
private final JdbcTemplate jdbcTemplate;
// -------------------------------------------------------------------------
// Public API
// -------------------------------------------------------------------------
/**
* 预览dry-run不实际写库仅返回每条 v1 rule 会生成几个 v2 chain
*
* @return 预览结果列表
*/
public List<DryRunResult> dryRun() {
List<IotDataRuleDO> rules = dataRuleMapper.selectList(null);
if (rules.isEmpty()) {
return Collections.emptyList();
}
// 批量加载所有 sink
Map<Long, IotDataSinkDO> sinkMap = loadAllSinks();
List<DryRunResult> results = new ArrayList<>();
for (IotDataRuleDO rule : rules) {
List<IotDataSinkDO> sinks = resolveSinks(rule, sinkMap);
List<DataRuleToChainMapper.ChainCandidate> candidates = chainMapper.toChainCandidates(rule, sinks);
results.add(new DryRunResult(
rule.getId(),
rule.getName(),
candidates.size(),
candidates.stream()
.map(c -> c.saveReqVO().getName())
.collect(Collectors.toList())
));
}
return results;
}
/**
* 执行迁移(幂等):已迁移的 rule+sourceIndex 会被跳过
*
* @param migrator 操作人标识(用于审计)
* @return 执行结果汇总
*/
@Transactional(rollbackFor = Exception.class)
public ExecuteResult execute(String migrator) {
List<IotDataRuleDO> rules = dataRuleMapper.selectList(null);
if (rules.isEmpty()) {
return new ExecuteResult(0, 0, 0, Collections.emptyList());
}
Map<Long, IotDataSinkDO> sinkMap = loadAllSinks();
int totalRules = rules.size();
int migratedCount = 0;
int skippedCount = 0;
List<String> errors = new ArrayList<>();
for (IotDataRuleDO rule : rules) {
try {
List<IotDataSinkDO> sinks = resolveSinks(rule, sinkMap);
List<DataRuleToChainMapper.ChainCandidate> candidates = chainMapper.toChainCandidates(rule, sinks);
for (DataRuleToChainMapper.ChainCandidate candidate : candidates) {
boolean inserted = persistCandidate(rule.getId(), candidate, migrator);
if (inserted) {
migratedCount++;
} else {
skippedCount++;
}
}
} catch (Exception ex) {
log.error("[B18] 迁移 DataRule(id={}) 失败", rule.getId(), ex);
errors.add("rule=" + rule.getId() + ": " + ex.getMessage());
}
}
return new ExecuteResult(totalRules, migratedCount, skippedCount, errors);
}
/**
* 查询已迁移的映射关系
*
* @return 映射记录列表
*/
public List<MappingRecord> queryMappings() {
String sql = "SELECT id, old_rule_id, new_chain_id, source_index, migrated_at, migrator, tenant_id "
+ "FROM iot_data_rule_migration ORDER BY old_rule_id, source_index";
return jdbcTemplate.query(sql, (rs, rowNum) -> new MappingRecord(
rs.getLong("id"),
rs.getLong("old_rule_id"),
rs.getLong("new_chain_id"),
rs.getInt("source_index"),
rs.getObject("migrated_at", LocalDateTime.class),
rs.getString("migrator"),
rs.getLong("tenant_id")
));
}
// -------------------------------------------------------------------------
// Private helpers
// -------------------------------------------------------------------------
/**
* 尝试持久化单个 ChainCandidate
* <ol>
* <li>调用 createRuleChain 写入 v2 chain</li>
* <li>INSERT IGNORE 写映射表</li>
* </ol>
*
* @return true=新增迁移false=已存在(幂等跳过)
*/
private boolean persistCandidate(Long oldRuleId,
DataRuleToChainMapper.ChainCandidate candidate,
String migrator) {
// 先检查映射表是否已存在(幂等判断)
Integer existing = jdbcTemplate.queryForObject(
"SELECT COUNT(1) FROM iot_data_rule_migration WHERE old_rule_id = ? AND source_index = ?",
Integer.class,
oldRuleId, candidate.sourceIndex());
if (existing != null && existing > 0) {
log.info("[B18] DataRule(id={}) sourceIndex={} 已迁移,跳过", oldRuleId, candidate.sourceIndex());
return false;
}
// 创建 v2 chain使用空 links 列表,节点本身携带 DAG 语义信息)
IotRuleChainSaveReqVO req = candidate.saveReqVO();
if (req.getLinks() == null) {
req.setLinks(Collections.emptyList());
}
Long newChainId = ruleChainService.createRuleChain(req);
// 写映射表
try {
jdbcTemplate.update(
"INSERT INTO iot_data_rule_migration (old_rule_id, new_chain_id, source_index, migrator, tenant_id) "
+ "VALUES (?, ?, ?, ?, ?)",
oldRuleId, newChainId, candidate.sourceIndex(), migrator, 0L /* tenantId 由多租户框架注入,此处占位 */
);
} catch (DuplicateKeyException e) {
// 并发场景下极少出现,直接忽略
log.warn("[B18] 映射表 UK 冲突DataRule(id={}) sourceIndex={},忽略", oldRuleId, candidate.sourceIndex());
}
log.info("[B18] DataRule(id={}) sourceIndex={} → RuleChain(id={})", oldRuleId, candidate.sourceIndex(), newChainId);
return true;
}
/** 批量加载所有 Sink构成 id→DO 索引 */
private Map<Long, IotDataSinkDO> loadAllSinks() {
List<IotDataSinkDO> allSinks = dataSinkMapper.selectList(null);
Map<Long, IotDataSinkDO> map = new HashMap<>();
for (IotDataSinkDO sink : allSinks) {
map.put(sink.getId(), sink);
}
return map;
}
/** 根据 rule.sinkIds 从 map 中取出对应 Sink 列表 */
private List<IotDataSinkDO> resolveSinks(IotDataRuleDO rule, Map<Long, IotDataSinkDO> sinkMap) {
if (rule.getSinkIds() == null || rule.getSinkIds().isEmpty()) {
return Collections.emptyList();
}
return rule.getSinkIds().stream()
.map(sinkMap::get)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
// -------------------------------------------------------------------------
// Result DTOs
// -------------------------------------------------------------------------
/**
* dry-run 结果:每条 v1 rule 会生成几个 v2 chain
*/
public record DryRunResult(
Long oldRuleId,
String oldRuleName,
int willCreateChainCount,
List<String> chainNames
) {
}
/**
* 执行结果汇总
*/
public record ExecuteResult(
int totalRules,
int migratedCount,
int skippedCount,
List<String> errors
) {
}
/**
* 映射表记录 DTO
*/
public record MappingRecord(
Long id,
Long oldRuleId,
Long newChainId,
Integer sourceIndex,
LocalDateTime migratedAt,
String migrator,
Long tenantId
) {
}
}

View File

@@ -0,0 +1,314 @@
package com.viewsh.module.iot.migration.mapping;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.viewsh.module.iot.dal.dataobject.rule.IotDataRuleDO;
import com.viewsh.module.iot.dal.dataobject.rule.IotDataSinkDO;
import com.viewsh.module.iot.dal.dataobject.rule.config.*;
import com.viewsh.module.iot.enums.rule.IotDataSinkTypeEnum;
import com.viewsh.module.iot.rule.controller.admin.vo.IotRuleChainSaveReqVO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.stream.Collectors;
/**
* v1 DataRule + DataSink → v2 DAG RuleChain 映射器
*
* <p>映射规则(评审 B7
* <ul>
* <li>http → http_push</li>
* <li>rocketmq → mq_push (provider=rocketmq)</li>
* <li>kafka → mq_push (provider=kafka)</li>
* <li>rabbitmq → mq_push (provider=rabbitmq)</li>
* <li>redis → redis_push</li>
* <li>tcp → tcp_push</li>
* </ul>
*
* <p>多 sourceConfig 处理策略(评审 A5
* <ul>
* <li>若所有 sourceConfig 的 productId 相同且 method 相同 → 合并为 1 chainidentifiers 数组合并)</li>
* <li>否则 → 拆分为多个 chain名称加 "-source{n}"</li>
* </ul>
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class DataRuleToChainMapper {
private final ObjectMapper objectMapper;
/**
* 将 1 个 v1 DataRule + 对应 Sinks 转换为 1 或 N 个 v2 RuleChain 请求 VO
*
* @param rule v1 数据流转规则
* @param sinks v1 数据目的列表(与 rule.sinkIds 对应)
* @return 转换结果列表,每个元素对应一个 chain含 sourceIndex
*/
public List<ChainCandidate> toChainCandidates(IotDataRuleDO rule, List<IotDataSinkDO> sinks) {
List<IotDataRuleDO.SourceConfig> sources = rule.getSourceConfigs();
if (sources == null || sources.isEmpty()) {
log.warn("[B18] DataRule(id={}) 无 sourceConfigs跳过", rule.getId());
return Collections.emptyList();
}
// 尝试合并策略:所有 source 的 productId 和 method 相同
boolean mergeable = canMerge(sources);
if (mergeable) {
// 合并为单 chain
IotRuleChainSaveReqVO req = buildChainReq(rule, sources, sinks, null);
return List.of(new ChainCandidate(0, req));
}
// 降级:每个 source 单独生成一个 chain
List<ChainCandidate> candidates = new ArrayList<>();
for (int i = 0; i < sources.size(); i++) {
IotDataRuleDO.SourceConfig src = sources.get(i);
IotRuleChainSaveReqVO req = buildChainReq(rule, List.of(src), sinks, i);
candidates.add(new ChainCandidate(i, req));
}
return candidates;
}
/**
* 判断多个 sourceConfig 是否可以合并productId 相同 + method 相同)
*/
public boolean canMerge(List<IotDataRuleDO.SourceConfig> sources) {
if (sources.size() <= 1) {
return true;
}
Long firstProductId = sources.get(0).getProductId();
String firstMethod = sources.get(0).getMethod();
return sources.stream().allMatch(s ->
Objects.equals(s.getProductId(), firstProductId)
&& Objects.equals(s.getMethod(), firstMethod));
}
/**
* 构建单个 RuleChain 请求 VO
*
* <p>节点顺序约定(匹配 IotRuleChainServiceImpl.validateNoCycle 的临时 ID 规则):
* <ul>
* <li>index 0 → trigger 节点,临时 key = -1</li>
* <li>index 1..N → action 节点,临时 key = -(2), -(3), ...</li>
* </ul>
* links 的 sourceNodeId/targetNodeId 使用上述临时 key负数
*
* @param rule v1 规则
* @param sources 用于本 chain 的 sourceConfigs1 个或合并多个)
* @param sinks v1 sinks
* @param sourceIndex 若为拆分模式传序号0-based合并模式传 null
*/
private IotRuleChainSaveReqVO buildChainReq(
IotDataRuleDO rule,
List<IotDataRuleDO.SourceConfig> sources,
List<IotDataSinkDO> sinks,
Integer sourceIndex) {
String chainName = sourceIndex == null
? rule.getName()
: rule.getName() + "-source" + (sourceIndex + 1);
IotRuleChainSaveReqVO req = new IotRuleChainSaveReqVO();
req.setName(chainName);
req.setDescription(rule.getDescription());
req.setType("DATA");
req.setPriority(10);
req.setDebugMode(false);
// 取第一个 source 的 productId/deviceId 作为 chain 级别过滤
IotDataRuleDO.SourceConfig firstSrc = sources.get(0);
req.setProductId(firstSrc.getProductId());
req.setDeviceId(firstSrc.getDeviceId());
List<IotRuleChainSaveReqVO.NodeVO> nodes = new ArrayList<>();
List<IotRuleChainSaveReqVO.LinkVO> links = new ArrayList<>();
// --- Trigger 节点index=0临时 key=-1---
IotRuleChainSaveReqVO.NodeVO triggerNode = buildTriggerNode(sources);
triggerNode.setPositionX(100);
triggerNode.setPositionY(200);
nodes.add(triggerNode); // index 0
// --- Action 节点index=1..N临时 key=-(i+1)---
int actionY = 100;
int nodeIndex = 1; // 从 1 开始trigger 占 0
for (IotDataSinkDO sink : sinks) {
IotRuleChainSaveReqVO.NodeVO actionNode = buildActionNode(sink);
if (actionNode == null) {
log.warn("[B18] DataSink(id={}, type={}) 无法映射,跳过", sink.getId(), sink.getType());
continue;
}
actionNode.setPositionX(400);
actionNode.setPositionY(actionY);
actionY += 120;
nodes.add(actionNode);
// 建立 trigger → action 的 link临时 key
// trigger 的临时 key = -(0+1) = -1
// action 的临时 key = -(nodeIndex+1)
IotRuleChainSaveReqVO.LinkVO link = new IotRuleChainSaveReqVO.LinkVO();
link.setSourceNodeId(-1L);
link.setTargetNodeId(-(long) (nodeIndex + 1));
link.setRelationType("Success");
link.setSortOrder(nodeIndex - 1);
links.add(link);
nodeIndex++;
}
req.setNodes(nodes);
req.setLinks(links);
return req;
}
/**
* 构建 Trigger 节点device.property.trigger 或 device.event.trigger
*
* <p>若 sources 均为同 productId则 identifiers 合并为数组JSON config
*/
private IotRuleChainSaveReqVO.NodeVO buildTriggerNode(List<IotDataRuleDO.SourceConfig> sources) {
IotRuleChainSaveReqVO.NodeVO node = new IotRuleChainSaveReqVO.NodeVO();
node.setCategory("trigger");
node.setName("设备数据触发器");
// 根据 method 决定 trigger 类型
String method = sources.get(0).getMethod();
String triggerType;
if (method != null && method.toLowerCase().contains("event")) {
triggerType = "device.event.trigger";
} else {
triggerType = "device.property.trigger";
}
node.setType(triggerType);
// 收集 identifiers
List<String> identifiers = sources.stream()
.map(IotDataRuleDO.SourceConfig::getIdentifier)
.filter(Objects::nonNull)
.distinct()
.collect(Collectors.toList());
// 构建 configuration JSON
Map<String, Object> config = new LinkedHashMap<>();
config.put("productId", sources.get(0).getProductId());
config.put("deviceId", sources.get(0).getDeviceId());
config.put("method", method);
if (!identifiers.isEmpty()) {
config.put("identifiers", identifiers);
}
node.setConfiguration(toJson(config));
return node;
}
/**
* 将 v1 DataSink 映射为 v2 Action 节点
*
* @return null 表示无法映射(未知类型),调用方跳过
*/
IotRuleChainSaveReqVO.NodeVO buildActionNode(IotDataSinkDO sink) {
if (sink.getType() == null || sink.getConfig() == null) {
return null;
}
IotRuleChainSaveReqVO.NodeVO node = new IotRuleChainSaveReqVO.NodeVO();
node.setCategory("action");
node.setName(sink.getName() != null ? sink.getName() : "推送动作");
int type = sink.getType();
Map<String, Object> config = new LinkedHashMap<>();
if (type == IotDataSinkTypeEnum.HTTP.getType()) {
// http → http_push
node.setType("http_push");
IotDataSinkHttpConfig http = (IotDataSinkHttpConfig) sink.getConfig();
config.put("url", http.getUrl());
config.put("method", http.getMethod());
config.put("headers", http.getHeaders());
config.put("body", http.getBody());
} else if (type == IotDataSinkTypeEnum.ROCKETMQ.getType()) {
// rocketmq → mq_push (provider=rocketmq)
node.setType("mq_push");
IotDataSinkRocketMQConfig rmq = (IotDataSinkRocketMQConfig) sink.getConfig();
config.put("provider", "rocketmq");
config.put("nameServer", rmq.getNameServer());
config.put("topic", rmq.getTopic());
config.put("tag", rmq.getTags());
config.put("group", rmq.getGroup());
} else if (type == IotDataSinkTypeEnum.KAFKA.getType()) {
// kafka → mq_push (provider=kafka)
node.setType("mq_push");
IotDataSinkKafkaConfig kafka = (IotDataSinkKafkaConfig) sink.getConfig();
config.put("provider", "kafka");
config.put("bootstrapServers", kafka.getBootstrapServers());
config.put("topic", kafka.getTopic());
} else if (type == IotDataSinkTypeEnum.RABBITMQ.getType()) {
// rabbitmq → mq_push (provider=rabbitmq)
node.setType("mq_push");
IotDataSinkRabbitMQConfig rmq = (IotDataSinkRabbitMQConfig) sink.getConfig();
config.put("provider", "rabbitmq");
config.put("host", rmq.getHost());
config.put("port", rmq.getPort());
config.put("exchange", rmq.getExchange());
config.put("routingKey", rmq.getRoutingKey());
} else if (type == IotDataSinkTypeEnum.REDIS.getType()) {
// redis → redis_push
node.setType("redis_push");
IotDataSinkRedisConfig redis = (IotDataSinkRedisConfig) sink.getConfig();
config.put("host", redis.getHost());
config.put("port", redis.getPort());
config.put("channel", redis.getTopic());
config.put("key", redis.getTopic());
config.put("dataStructure", redis.getDataStructure());
} else if (type == IotDataSinkTypeEnum.TCP.getType()) {
// tcp → tcp_push
node.setType("tcp_push");
IotDataSinkTcpConfig tcp = (IotDataSinkTcpConfig) sink.getConfig();
config.put("host", tcp.getHost());
config.put("port", tcp.getPort());
} else {
// 未知类型WebSocket/MQTT/DATABASE 暂不支持)
log.warn("[B18] DataSink type={} 暂不支持迁移,跳过 sinkId={}", type, sink.getId());
return null;
}
node.setConfiguration(toJson(config));
return node;
}
/**
* 将 Map 序列化为 JSON 字符串(迁移工具内部使用)
*/
private String toJson(Map<String, Object> map) {
try {
return objectMapper.writeValueAsString(map);
} catch (Exception e) {
log.error("[B18] JSON 序列化失败", e);
return "{}";
}
}
// -------------------------------------------------------------------------
// 内部 DTO
// -------------------------------------------------------------------------
/**
* 单次迁移候选(对应一个将要创建的 v2 chain
*/
public record ChainCandidate(
/** 对应 v1 sourceConfigs 的序号(合并模式固定为 0 */
int sourceIndex,
/** 待创建的 chain 请求 VO */
IotRuleChainSaveReqVO saveReqVO
) {
}
}

View File

@@ -0,0 +1,427 @@
package com.viewsh.module.iot.migration;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.viewsh.framework.test.core.ut.BaseMockitoUnitTest;
import com.viewsh.module.iot.dal.dataobject.rule.IotDataRuleDO;
import com.viewsh.module.iot.dal.dataobject.rule.IotDataSinkDO;
import com.viewsh.module.iot.dal.dataobject.rule.config.*;
import com.viewsh.module.iot.dal.mysql.rule.IotDataRuleMapper;
import com.viewsh.module.iot.dal.mysql.rule.IotDataSinkMapper;
import com.viewsh.module.iot.enums.rule.IotDataSinkTypeEnum;
import com.viewsh.module.iot.migration.mapping.DataRuleToChainMapper;
import com.viewsh.module.iot.rule.controller.admin.vo.IotRuleChainSaveReqVO;
import com.viewsh.module.iot.rule.service.IotRuleChainService;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import java.util.Collections;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
/**
* B18 — {@link DataRuleMigrator} 单元测试
*
* <p>6 个测试用例:
* <ol>
* <li>single_source_http — 1 source + 1 http sink → 1 chain (Trigger + HttpPush)</li>
* <li>multi_sink — 1 source + 5 sinks → 1 chain (Trigger + 5 Actions)</li>
* <li>multi_source_mergeable — 2 sources 同 productId + method → 合并为 1 chain</li>
* <li>multi_source_split — 2 sources 不同 productId → 拆为 2 chain</li>
* <li>mq_provider_preserve — sink=rocketmq → mq_push, config.provider="rocketmq"</li>
* <li>idempotent — 重跑 → 跳过(映射表 UK 冲突不抛错)</li>
* </ol>
*/
class DataRuleMigratorTest extends BaseMockitoUnitTest {
@Mock
private IotDataRuleMapper dataRuleMapper;
@Mock
private IotDataSinkMapper dataSinkMapper;
@Mock
private IotRuleChainService ruleChainService;
@Mock
private JdbcTemplate jdbcTemplate;
private DataRuleToChainMapper chainMapper;
private DataRuleMigrator migrator;
@BeforeEach
void setUp() {
chainMapper = new DataRuleToChainMapper(new ObjectMapper());
migrator = new DataRuleMigrator(dataRuleMapper, dataSinkMapper, chainMapper, ruleChainService, jdbcTemplate);
}
// =========================================================================
// 用例 1single_source_http
// =========================================================================
@Test
@DisplayName("single_source_http: 1 source + 1 http sink → 1 chain with Trigger + HttpPush action")
void testSingleSourceHttp() {
// Given
Long ruleId = 1L;
Long sinkId = 100L;
IotDataRuleDO rule = buildRule(ruleId, "HTTP 规则",
List.of(buildSource(10L, 101L, "testProp", "property")),
List.of(sinkId));
IotDataSinkDO httpSink = buildSink(sinkId, "HTTP推送", IotDataSinkTypeEnum.HTTP.getType(), buildHttpConfig());
mockMappers(List.of(rule), List.of(httpSink));
mockMigrationTableEmpty(ruleId, 0);
when(ruleChainService.createRuleChain(any())).thenReturn(200L);
mockInsertMapping();
// When
DataRuleMigrator.ExecuteResult result = migrator.execute("test");
// Then
assertThat(result.totalRules()).isEqualTo(1);
assertThat(result.migratedCount()).isEqualTo(1);
assertThat(result.skippedCount()).isEqualTo(0);
assertThat(result.errors()).isEmpty();
// 验证 createRuleChain 被调用,且 chain type=DATA
ArgumentCaptor<IotRuleChainSaveReqVO> captor = ArgumentCaptor.forClass(IotRuleChainSaveReqVO.class);
verify(ruleChainService).createRuleChain(captor.capture());
IotRuleChainSaveReqVO req = captor.getValue();
assertThat(req.getType()).isEqualTo("DATA");
// trigger 节点
IotRuleChainSaveReqVO.NodeVO trigger = req.getNodes().stream()
.filter(n -> "trigger".equals(n.getCategory())).findFirst().orElseThrow();
assertThat(trigger.getType()).startsWith("device.");
// action 节点
IotRuleChainSaveReqVO.NodeVO action = req.getNodes().stream()
.filter(n -> "action".equals(n.getCategory())).findFirst().orElseThrow();
assertThat(action.getType()).isEqualTo("http_push");
}
// =========================================================================
// 用例 2multi_sink
// =========================================================================
@Test
@DisplayName("multi_sink: 1 source + 5 sinks → 1 chain with Trigger + 5 action nodes")
void testMultiSink() {
// Given
Long ruleId = 2L;
List<Long> sinkIds = List.of(201L, 202L, 203L, 204L, 205L);
IotDataRuleDO rule = buildRule(ruleId, "多 Sink 规则",
List.of(buildSource(10L, 0L, null, "property")),
sinkIds);
List<IotDataSinkDO> sinks = List.of(
buildSink(201L, "HTTP", IotDataSinkTypeEnum.HTTP.getType(), buildHttpConfig()),
buildSink(202L, "RocketMQ", IotDataSinkTypeEnum.ROCKETMQ.getType(), buildRocketMQConfig()),
buildSink(203L, "Kafka", IotDataSinkTypeEnum.KAFKA.getType(), buildKafkaConfig()),
buildSink(204L, "RabbitMQ", IotDataSinkTypeEnum.RABBITMQ.getType(), buildRabbitMQConfig()),
buildSink(205L, "Redis", IotDataSinkTypeEnum.REDIS.getType(), buildRedisConfig())
);
mockMappers(List.of(rule), sinks);
mockMigrationTableEmpty(ruleId, 0);
when(ruleChainService.createRuleChain(any())).thenReturn(300L);
mockInsertMapping();
// When
DataRuleMigrator.ExecuteResult result = migrator.execute("test");
// Then
assertThat(result.migratedCount()).isEqualTo(1);
ArgumentCaptor<IotRuleChainSaveReqVO> captor = ArgumentCaptor.forClass(IotRuleChainSaveReqVO.class);
verify(ruleChainService).createRuleChain(captor.capture());
IotRuleChainSaveReqVO req = captor.getValue();
long triggerCount = req.getNodes().stream().filter(n -> "trigger".equals(n.getCategory())).count();
long actionCount = req.getNodes().stream().filter(n -> "action".equals(n.getCategory())).count();
assertThat(triggerCount).isEqualTo(1);
assertThat(actionCount).isEqualTo(5);
}
// =========================================================================
// 用例 3multi_source_mergeable
// =========================================================================
@Test
@DisplayName("multi_source_mergeable: 2 sources 同 productId + method → 合并为 1 chain")
void testMultiSourceMergeable() {
// Given - same productId + method
Long ruleId = 3L;
Long sinkId = 301L;
IotDataRuleDO rule = buildRule(ruleId, "可合并多源",
List.of(
buildSource(50L, 0L, "temp", "property"),
buildSource(50L, 0L, "humidity", "property")
),
List.of(sinkId));
IotDataSinkDO sink = buildSink(sinkId, "HTTP", IotDataSinkTypeEnum.HTTP.getType(), buildHttpConfig());
mockMappers(List.of(rule), List.of(sink));
mockMigrationTableEmpty(ruleId, 0);
when(ruleChainService.createRuleChain(any())).thenReturn(400L);
mockInsertMapping();
// When
DataRuleMigrator.ExecuteResult result = migrator.execute("test");
// Then: only 1 chain created (merged)
assertThat(result.migratedCount()).isEqualTo(1);
verify(ruleChainService, times(1)).createRuleChain(any());
// 验证 trigger config 中 identifiers 包含两个值
ArgumentCaptor<IotRuleChainSaveReqVO> captor = ArgumentCaptor.forClass(IotRuleChainSaveReqVO.class);
verify(ruleChainService).createRuleChain(captor.capture());
IotRuleChainSaveReqVO req = captor.getValue();
IotRuleChainSaveReqVO.NodeVO trigger = req.getNodes().stream()
.filter(n -> "trigger".equals(n.getCategory())).findFirst().orElseThrow();
assertThat(trigger.getConfiguration()).contains("temp").contains("humidity");
}
// =========================================================================
// 用例 4multi_source_split
// =========================================================================
@Test
@DisplayName("multi_source_split: 2 sources 不同 productId → 拆为 2 chain")
void testMultiSourceSplit() {
// Given - different productId
Long ruleId = 4L;
Long sinkId = 401L;
IotDataRuleDO rule = buildRule(ruleId, "拆分多源",
List.of(
buildSource(60L, 0L, "temp", "property"),
buildSource(70L, 0L, "temp", "property")
),
List.of(sinkId));
IotDataSinkDO sink = buildSink(sinkId, "HTTP", IotDataSinkTypeEnum.HTTP.getType(), buildHttpConfig());
mockMappers(List.of(rule), List.of(sink));
// source_index 0 未迁移source_index 1 未迁移
mockMigrationTableEmpty(ruleId, 0);
mockMigrationTableEmpty(ruleId, 1);
when(ruleChainService.createRuleChain(any())).thenReturn(500L).thenReturn(501L);
mockInsertMapping();
// When
DataRuleMigrator.ExecuteResult result = migrator.execute("test");
// Then: 2 chains created (split)
assertThat(result.migratedCount()).isEqualTo(2);
verify(ruleChainService, times(2)).createRuleChain(any());
// 验证 chain 名称带序号
ArgumentCaptor<IotRuleChainSaveReqVO> captor = ArgumentCaptor.forClass(IotRuleChainSaveReqVO.class);
verify(ruleChainService, times(2)).createRuleChain(captor.capture());
List<IotRuleChainSaveReqVO> reqs = captor.getAllValues();
assertThat(reqs.get(0).getName()).endsWith("-source1");
assertThat(reqs.get(1).getName()).endsWith("-source2");
}
// =========================================================================
// 用例 5mq_provider_preserve
// =========================================================================
@Test
@DisplayName("mq_provider_preserve: sink=rocketmq → action type=mq_push, config.provider=rocketmq")
void testMqProviderPreserve() {
// Given
Long ruleId = 5L;
Long sinkId = 501L;
IotDataRuleDO rule = buildRule(ruleId, "RocketMQ 规则",
List.of(buildSource(10L, 0L, "temp", "property")),
List.of(sinkId));
IotDataSinkDO rmqSink = buildSink(sinkId, "RMQ", IotDataSinkTypeEnum.ROCKETMQ.getType(), buildRocketMQConfig());
mockMappers(List.of(rule), List.of(rmqSink));
mockMigrationTableEmpty(ruleId, 0);
when(ruleChainService.createRuleChain(any())).thenReturn(600L);
mockInsertMapping();
// When
migrator.execute("test");
// Then: action type=mq_push + provider=rocketmq
ArgumentCaptor<IotRuleChainSaveReqVO> captor = ArgumentCaptor.forClass(IotRuleChainSaveReqVO.class);
verify(ruleChainService).createRuleChain(captor.capture());
IotRuleChainSaveReqVO req = captor.getValue();
IotRuleChainSaveReqVO.NodeVO action = req.getNodes().stream()
.filter(n -> "action".equals(n.getCategory())).findFirst().orElseThrow();
assertThat(action.getType()).isEqualTo("mq_push");
assertThat(action.getConfiguration()).contains("\"provider\"").contains("rocketmq");
}
// =========================================================================
// 用例 6idempotent
// =========================================================================
@Test
@DisplayName("idempotent: 重跑时映射表已存在 → 跳过,不重复创建 chain")
void testIdempotent() {
// Given
Long ruleId = 6L;
Long sinkId = 601L;
IotDataRuleDO rule = buildRule(ruleId, "幂等测试",
List.of(buildSource(10L, 0L, "temp", "property")),
List.of(sinkId));
IotDataSinkDO sink = buildSink(sinkId, "HTTP", IotDataSinkTypeEnum.HTTP.getType(), buildHttpConfig());
mockMappers(List.of(rule), List.of(sink));
// 模拟映射表已存在该记录COUNT=1
when(jdbcTemplate.queryForObject(
contains("COUNT(1)"),
eq(Integer.class),
eq(ruleId), eq(0)
)).thenReturn(1);
// When
DataRuleMigrator.ExecuteResult result = migrator.execute("test");
// Then: skipped=1, createRuleChain 不被调用
assertThat(result.skippedCount()).isEqualTo(1);
assertThat(result.migratedCount()).isEqualTo(0);
verify(ruleChainService, never()).createRuleChain(any());
}
// =========================================================================
// Helper: DataRuleToChainMapper 直接测试canMerge 逻辑)
// =========================================================================
@Test
@DisplayName("canMerge: 相同 productId + method 可合并")
void testCanMerge_same() {
DataRuleToChainMapper m = new DataRuleToChainMapper(new ObjectMapper());
List<IotDataRuleDO.SourceConfig> sources = List.of(
buildSource(1L, 0L, "a", "property"),
buildSource(1L, 0L, "b", "property")
);
assertThat(m.canMerge(sources)).isTrue();
}
@Test
@DisplayName("canMerge: 不同 productId 不可合并")
void testCanMerge_diff() {
DataRuleToChainMapper m = new DataRuleToChainMapper(new ObjectMapper());
List<IotDataRuleDO.SourceConfig> sources = List.of(
buildSource(1L, 0L, "a", "property"),
buildSource(2L, 0L, "a", "property")
);
assertThat(m.canMerge(sources)).isFalse();
}
// =========================================================================
// Mock helpers
// =========================================================================
private void mockMappers(List<IotDataRuleDO> rules, List<IotDataSinkDO> sinks) {
when(dataRuleMapper.selectList(null)).thenReturn(rules);
when(dataSinkMapper.selectList(null)).thenReturn(sinks);
}
/** 模拟映射表 COUNT 查询返回 0未迁移 */
private void mockMigrationTableEmpty(Long ruleId, int sourceIndex) {
when(jdbcTemplate.queryForObject(
contains("COUNT(1)"),
eq(Integer.class),
eq(ruleId), eq(sourceIndex)
)).thenReturn(0);
}
/** 模拟 INSERT 映射表(正常写入) */
private void mockInsertMapping() {
when(jdbcTemplate.update(anyString(), any(), any(), any(), any(), any())).thenReturn(1);
}
// =========================================================================
// DO builders
// =========================================================================
private IotDataRuleDO buildRule(Long id, String name,
List<IotDataRuleDO.SourceConfig> sources,
List<Long> sinkIds) {
IotDataRuleDO rule = new IotDataRuleDO();
rule.setId(id);
rule.setName(name);
rule.setSourceConfigs(sources);
rule.setSinkIds(sinkIds);
return rule;
}
private IotDataRuleDO.SourceConfig buildSource(Long productId, Long deviceId, String identifier, String method) {
IotDataRuleDO.SourceConfig src = new IotDataRuleDO.SourceConfig();
src.setProductId(productId);
src.setDeviceId(deviceId);
src.setIdentifier(identifier);
src.setMethod(method);
return src;
}
private IotDataSinkDO buildSink(Long id, String name, Integer type, IotAbstractDataSinkConfig config) {
IotDataSinkDO sink = new IotDataSinkDO();
sink.setId(id);
sink.setName(name);
sink.setType(type);
sink.setConfig(config);
return sink;
}
// Config builders
private IotDataSinkHttpConfig buildHttpConfig() {
IotDataSinkHttpConfig cfg = new IotDataSinkHttpConfig();
cfg.setUrl("http://example.com/api");
cfg.setMethod("POST");
cfg.setBody("{\"msg\":\"hello\"}");
return cfg;
}
private IotDataSinkRocketMQConfig buildRocketMQConfig() {
IotDataSinkRocketMQConfig cfg = new IotDataSinkRocketMQConfig();
cfg.setNameServer("127.0.0.1:9876");
cfg.setTopic("iot-topic");
cfg.setTags("tag1");
cfg.setGroup("iot-group");
return cfg;
}
private IotDataSinkKafkaConfig buildKafkaConfig() {
IotDataSinkKafkaConfig cfg = new IotDataSinkKafkaConfig();
cfg.setBootstrapServers("127.0.0.1:9092");
cfg.setTopic("iot-kafka-topic");
return cfg;
}
private IotDataSinkRabbitMQConfig buildRabbitMQConfig() {
IotDataSinkRabbitMQConfig cfg = new IotDataSinkRabbitMQConfig();
cfg.setHost("127.0.0.1");
cfg.setPort(5672);
cfg.setExchange("iot-exchange");
cfg.setRoutingKey("iot.key");
return cfg;
}
private IotDataSinkRedisConfig buildRedisConfig() {
IotDataSinkRedisConfig cfg = new IotDataSinkRedisConfig();
cfg.setHost("127.0.0.1");
cfg.setPort(6379);
cfg.setTopic("iot-channel");
return cfg;
}
}