From 24c486900a5f5a150188e6f123b4178dcc5d1833 Mon Sep 17 00:00:00 2001 From: lzh Date: Fri, 24 Apr 2026 10:10:04 +0800 Subject: [PATCH] =?UTF-8?q?feat(iot):=20Wave=204=20Round=202=20=E2=80=94?= =?UTF-8?q?=20B6/B7/B18=20ActionProvider=20+=20=E5=88=86=E6=94=AF=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=20+=20DataRule=E8=BF=81=E7=A7=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit B6 ActionProvider SPI + 5 核心动作(alarm/notify/device-ctrl): - ActionProvider 接口(extends NodeProvider,默认 bridge execute) - ActionResult record(SUCCESS/FAILURE/SKIP + output + message) - ActionProviderManager(Spring 自动收集 + fail-fast 重复 type) - AlarmTriggerAction(调用 IotAlarmRecordApi.triggerAlarm,模板变量解析) - AlarmClearAction(alarmId 从 config 或 ctx.metadata 解析,幂等) - NotifyAction(4 通道并发 + 部分失败不阻塞,第一期 stub) - DeviceServiceInvokeAction(调用 IotDeviceControlApi.invokeService) - DevicePropertySetAction(第一期 stub,B27 补全 Redis/MySQL) - IotAlarmRecordApi + DTO(rule 模块→server 跨模块接口) - IotAlarmRecordApiImpl(server 端 FeignClient 实现,委托 Service) - 14 单元测试全绿 B7 分支执行逻辑(executeAnyway if/else-if/else): - BranchConfiguration POJO(branches[] + executeAnyway + BranchCondition) - BranchExecutor(核心语义:else/executeAnyway/条件异常短路/action 异常隔离) - BranchNode NodeProvider(ACTION/"branch",内联执行命中 branch actions) - DagExecutor 最小扩展(ctx.metadata 传递 CompiledRuleChain 供 BranchNode 使用) - 9 单元测试全绿(含 validate else 位置校验) B18 DataRule → DAG 自动转换工具: - DataRuleToChainMapper(v1→v2 映射,6 种 Sink,合并/拆分多 source) - DataRuleMigrator(dry-run + execute + 幂等映射表) - DataRuleMigrationController(3 端点:dry-run/execute/mapping) - 8 单元测试全绿 测试总计:rule 模块 159/159 ✓,server 模块 8/8(B18)✓ Co-Authored-By: Claude Sonnet 4.6 (1M context) --- .../iot/api/alarm/IotAlarmRecordApi.java | 36 ++ .../api/alarm/dto/IotAlarmClearReqDTO.java | 31 ++ .../api/alarm/dto/IotAlarmTriggerReqDTO.java | 55 +++ .../rule/action/ActionProviderManager.java | 63 +++ .../iot/rule/action/AlarmClearAction.java | 93 ++++ .../iot/rule/action/AlarmTriggerAction.java | 94 ++++ .../rule/action/DevicePropertySetAction.java | 75 +++ .../action/DeviceServiceInvokeAction.java | 93 ++++ .../module/iot/rule/action/NotifyAction.java | 161 +++++++ .../module/iot/rule/engine/DagExecutor.java | 4 + .../engine/branch/BranchConfiguration.java | 104 +++++ .../rule/engine/branch/BranchExecutor.java | 153 +++++++ .../iot/rule/engine/branch/BranchNode.java | 153 +++++++ .../module/iot/rule/result/ActionResult.java | 54 +++ .../module/iot/rule/result/RpcCommand.java | 37 ++ .../module/iot/rule/spi/ActionProvider.java | 78 ++++ .../iot/rule/action/ActionProviderTest.java | 286 ++++++++++++ .../engine/branch/BranchExecutorTest.java | 269 +++++++++++ .../iot/api/alarm/IotAlarmRecordApiImpl.java | 62 +++ .../DataRuleMigrationController.java | 62 +++ .../iot/migration/DataRuleMigrator.java | 265 +++++++++++ .../mapping/DataRuleToChainMapper.java | 314 +++++++++++++ .../iot/migration/DataRuleMigratorTest.java | 427 ++++++++++++++++++ 23 files changed, 2969 insertions(+) create mode 100644 viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/alarm/IotAlarmRecordApi.java create mode 100644 viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/alarm/dto/IotAlarmClearReqDTO.java create mode 100644 viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/alarm/dto/IotAlarmTriggerReqDTO.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/ActionProviderManager.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/AlarmClearAction.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/AlarmTriggerAction.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/DevicePropertySetAction.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/DeviceServiceInvokeAction.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/NotifyAction.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/branch/BranchConfiguration.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/branch/BranchExecutor.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/branch/BranchNode.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/result/ActionResult.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/result/RpcCommand.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/spi/ActionProvider.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/action/ActionProviderTest.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/branch/BranchExecutorTest.java create mode 100644 viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/api/alarm/IotAlarmRecordApiImpl.java create mode 100644 viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/migration/DataRuleMigrationController.java create mode 100644 viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/migration/DataRuleMigrator.java create mode 100644 viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/migration/mapping/DataRuleToChainMapper.java create mode 100644 viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/migration/DataRuleMigratorTest.java diff --git a/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/alarm/IotAlarmRecordApi.java b/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/alarm/IotAlarmRecordApi.java new file mode 100644 index 00000000..92f1eb68 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/alarm/IotAlarmRecordApi.java @@ -0,0 +1,36 @@ +package com.viewsh.module.iot.api.alarm; + +import com.viewsh.framework.common.pojo.CommonResult; +import com.viewsh.module.iot.api.alarm.dto.IotAlarmClearReqDTO; +import com.viewsh.module.iot.api.alarm.dto.IotAlarmTriggerReqDTO; +import com.viewsh.module.iot.enums.ApiConstants; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import jakarta.validation.Valid; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; + +/** + * IoT 告警记录 API(供 rule 模块调用) + *

+ * 第一期仅暴露规则引擎 AlarmTriggerAction / AlarmClearAction 需要的两个方法; + * 人工确认 / 归档等管理接口保留在 server 模块 Service 本地调用。 + * + * @author B6 + */ +@FeignClient(name = ApiConstants.NAME) +@Tag(name = "RPC 服务 - IoT 告警记录") +public interface IotAlarmRecordApi { + + String PREFIX = ApiConstants.PREFIX + "/alarm/record"; + + @PostMapping(PREFIX + "/trigger") + @Operation(summary = "触发告警(幂等:同 deviceId+configId 活跃记录存在则 trigger_count++)") + CommonResult triggerAlarm(@Valid @RequestBody IotAlarmTriggerReqDTO reqDTO); + + @PostMapping(PREFIX + "/clear") + @Operation(summary = "清除告警(幂等:已清除重复调用返回 success)") + CommonResult clearAlarm(@Valid @RequestBody IotAlarmClearReqDTO reqDTO); + +} diff --git a/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/alarm/dto/IotAlarmClearReqDTO.java b/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/alarm/dto/IotAlarmClearReqDTO.java new file mode 100644 index 00000000..14f1fe2a --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/alarm/dto/IotAlarmClearReqDTO.java @@ -0,0 +1,31 @@ +package com.viewsh.module.iot.api.alarm.dto; + +import jakarta.validation.constraints.NotNull; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 告警清除请求 DTO + *

+ * 幂等:对已清除告警重复调用为 no-op(Service 层保证)。 + * + * @author B6 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class IotAlarmClearReqDTO { + + @NotNull(message = "告警记录 ID 不能为空") + private Long alarmId; + + /** 操作人(规则引擎触发通常为 "rule-engine:{chainId}") */ + private String operator; + + /** 备注 */ + private String remark; + +} diff --git a/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/alarm/dto/IotAlarmTriggerReqDTO.java b/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/alarm/dto/IotAlarmTriggerReqDTO.java new file mode 100644 index 00000000..41865808 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/alarm/dto/IotAlarmTriggerReqDTO.java @@ -0,0 +1,55 @@ +package com.viewsh.module.iot.api.alarm.dto; + +import com.fasterxml.jackson.databind.JsonNode; +import jakarta.validation.constraints.Max; +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotNull; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 告警触发请求 DTO(规则引擎 AlarmTriggerAction 调用) + *

+ * 幂等语义:同一 (deviceId, alarmConfigId) 活跃记录已存在 → trigger_count++; + * 否则插入新记录。详见 IotAlarmRecordService.triggerAlarm。 + * + * @author B6 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class IotAlarmTriggerReqDTO { + + @NotNull(message = "设备 ID 不能为空") + private Long deviceId; + + @NotNull(message = "告警配置 ID 不能为空") + private Long alarmConfigId; + + @NotNull(message = "告警严重度不能为空") + @Min(value = 1, message = "告警严重度最小为 1") + @Max(value = 5, message = "告警严重度最大为 5") + private Integer severity; + + /** 告警名称(冗余便于查询) */ + private String alarmName; + + /** 产品 ID(冗余,可选) */ + private Long productId; + + /** 所属子系统 ID */ + private Long subsystemId; + + /** v2 规则链 ID */ + private Long ruleChainId; + + /** v1 场景规则 ID(灰度期兼容) */ + private Long sceneRuleId; + + /** 触发详情(模板解析后的 JSON) */ + private JsonNode details; + +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/ActionProviderManager.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/ActionProviderManager.java new file mode 100644 index 00000000..45f49cb4 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/ActionProviderManager.java @@ -0,0 +1,63 @@ +package com.viewsh.module.iot.rule.action; + +import com.viewsh.module.iot.rule.spi.ActionProvider; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * ActionProvider 注册表(Spring 容器扫描 + getType() 索引)。 + * + *

启动时通过 {@link #autoRegister(List)} 自动收集所有 Spring Bean 中的 + * {@link ActionProvider},按 {@link ActionProvider#getType()} 建立索引。 + * + *

Fail-fast:同 type 重复注册时抛出 {@link IllegalStateException}, + * 确保启动阶段即暴露问题,不留运行时隐患。 + * + *

注意:{@link ActionProvider} 实现同时也是 {@link com.viewsh.module.iot.rule.engine.NodeProvider}, + * 会被 {@link com.viewsh.module.iot.rule.engine.NodeProviderRegistry} 直接收集并路由。 + * 本 Manager 仅提供按 type 查找的便捷入口(供测试 / 管理接口使用)。 + */ +@Slf4j +@Component +public class ActionProviderManager { + + private final Map providers = new ConcurrentHashMap<>(); + + @Autowired(required = false) + public void autoRegister(List list) { + if (list == null || list.isEmpty()) { + log.warn("[ActionProviderManager] 未发现任何 ActionProvider 实现"); + return; + } + for (ActionProvider p : list) { + ActionProvider dup = providers.put(p.getType(), p); + if (dup != null) { + throw new IllegalStateException( + "duplicate action type: " + p.getType() + + " — " + dup.getClass().getName() + " vs " + p.getClass().getName()); + } + } + log.info("[ActionProviderManager] 已注册 {} 个 ActionProvider:{}", providers.size(), providers.keySet()); + } + + public ActionProvider get(String type) { + ActionProvider p = providers.get(type); + if (p == null) { + throw new IllegalArgumentException("未注册的 ActionProvider type=" + type); + } + return p; + } + + public boolean contains(String type) { + return providers.containsKey(type); + } + + public Map all() { + return Map.copyOf(providers); + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/AlarmClearAction.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/AlarmClearAction.java new file mode 100644 index 00000000..7a1d6d44 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/AlarmClearAction.java @@ -0,0 +1,93 @@ +package com.viewsh.module.iot.rule.action; + +import com.fasterxml.jackson.databind.JsonNode; +import com.viewsh.framework.common.pojo.CommonResult; +import com.viewsh.module.iot.api.alarm.IotAlarmRecordApi; +import com.viewsh.module.iot.api.alarm.dto.IotAlarmClearReqDTO; +import com.viewsh.module.iot.rule.engine.RuleContext; +import com.viewsh.module.iot.rule.result.ActionResult; +import com.viewsh.module.iot.rule.spi.ActionProvider; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * 告警清除 Action(alarm_clear)。 + * + *

配置示例: + *

{@code
+ * {
+ *   "alarmId": "${meta.alarmId}",
+ *   "operator": "rule-engine"
+ * }
+ * }
+ * + *

alarmId 优先从 config 读取,若未配置则从 ctx.metadata.alarmId 获取(与 alarm_trigger 联动)。 + * 幂等:已清除告警重复调用为 no-op(Service 层保证)。 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class AlarmClearAction implements ActionProvider { + + public static final String TYPE = "alarm_clear"; + + private final IotAlarmRecordApi alarmRecordApi; + + @Override + public String getType() { + return TYPE; + } + + @Override + public ActionResult executeAction(RuleContext ctx, JsonNode config) { + try { + Long alarmId = resolveAlarmId(ctx, config); + if (alarmId == null || alarmId == 0) { + return ActionResult.failure("alarm_clear: alarmId 缺失(config 中未配置,ctx.metadata.alarmId 亦为空)"); + } + + String operator = config.path("operator").asText("rule-engine:" + ctx.getChainId()); + + IotAlarmClearReqDTO req = IotAlarmClearReqDTO.builder() + .alarmId(alarmId) + .operator(operator) + .build(); + + CommonResult result = alarmRecordApi.clearAlarm(req); + if (result == null || !result.isSuccess()) { + String errMsg = result != null ? result.getMsg() : "API 返回 null"; + log.warn("[AlarmClearAction] chain={} alarmId={} clearAlarm 失败: {}", + ctx.getChainId(), alarmId, errMsg); + return ActionResult.failure("clearAlarm 失败: " + errMsg); + } + + log.debug("[AlarmClearAction] chain={} alarmId={} 清除成功", ctx.getChainId(), alarmId); + return ActionResult.success(); + + } catch (Exception e) { + log.warn("[AlarmClearAction] chain={} 异常: {}", ctx.getChainId(), e.getMessage()); + return ActionResult.failure(e.getMessage()); + } + } + + private Long resolveAlarmId(RuleContext ctx, JsonNode config) { + if (config.hasNonNull("alarmId")) { + JsonNode node = config.get("alarmId"); + if (node.isNumber()) return node.asLong(); + // 支持 "${meta.alarmId}" 语法的静态解析 + String raw = node.asText(); + if (raw.startsWith("${meta.") && raw.endsWith("}")) { + String key = raw.substring(7, raw.length() - 1); + Object val = ctx.getMetadata().get(key); + if (val instanceof Long l) return l; + if (val instanceof Number n) return n.longValue(); + } + } + // 从 ctx.metadata 兜底获取(与 alarm_trigger 同链联动) + Object meta = ctx.getMetadata().get("alarmId"); + if (meta instanceof Long l) return l; + if (meta instanceof Number n) return n.longValue(); + return null; + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/AlarmTriggerAction.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/AlarmTriggerAction.java new file mode 100644 index 00000000..3c36460f --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/AlarmTriggerAction.java @@ -0,0 +1,94 @@ +package com.viewsh.module.iot.rule.action; + +import com.fasterxml.jackson.databind.JsonNode; +import com.viewsh.framework.common.pojo.CommonResult; +import com.viewsh.module.iot.api.alarm.IotAlarmRecordApi; +import com.viewsh.module.iot.api.alarm.dto.IotAlarmTriggerReqDTO; +import com.viewsh.module.iot.rule.engine.RuleContext; +import com.viewsh.module.iot.rule.result.ActionResult; +import com.viewsh.module.iot.rule.spi.ActionProvider; +import com.viewsh.module.iot.rule.template.TemplateResolver; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * 告警触发 Action(alarm_trigger)。 + * + *

配置示例: + *

{@code
+ * {
+ *   "alarmConfigId": 100,
+ *   "severity": 3,
+ *   "name": "设备 ${meta.deviceName} 温度过高",
+ *   "details": { "temperature": "${data.temperature}" }
+ * }
+ * }
+ * + *

调用 {@link IotAlarmRecordApi#triggerAlarm} 触发告警(API 层幂等); + * 将 alarmId 写入 ctx.metadata 供后继节点引用。 + * + *

评审 C5:模板变量统一走 {@link TemplateResolver},不重复实现。 + * 评审 C1:幂等由 Service 层保证(UK on device_id + alarm_config_id),Action 层无需判重。 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class AlarmTriggerAction implements ActionProvider { + + public static final String TYPE = "alarm_trigger"; + + private final IotAlarmRecordApi alarmRecordApi; + private final TemplateResolver templateResolver; + + @Override + public String getType() { + return TYPE; + } + + @Override + public ActionResult executeAction(RuleContext ctx, JsonNode config) { + try { + Long alarmConfigId = config.path("alarmConfigId").asLong(0); + if (alarmConfigId == 0) { + return ActionResult.failure("alarm_trigger: alarmConfigId 缺失"); + } + int severity = config.path("severity").asInt(3); + + String alarmName = config.hasNonNull("name") + ? String.valueOf(templateResolver.resolve(config.get("name").asText(), ctx)) + : "规则告警"; + + JsonNode details = config.path("details"); + // details 字段本身也可含模板变量;第一期直接透传 + IotAlarmTriggerReqDTO req = IotAlarmTriggerReqDTO.builder() + .deviceId(ctx.getDeviceId()) + .alarmConfigId(alarmConfigId) + .severity(severity) + .alarmName(alarmName) + .productId(ctx.getProductId()) + .subsystemId(ctx.getSubsystemId()) + .ruleChainId(ctx.getChainId()) + .details(details.isNull() ? null : details) + .build(); + + CommonResult result = alarmRecordApi.triggerAlarm(req); + if (result == null || !result.isSuccess()) { + String errMsg = result != null ? result.getMsg() : "API 返回 null"; + log.warn("[AlarmTriggerAction] chain={} device={} triggerAlarm 失败: {}", + ctx.getChainId(), ctx.getDeviceId(), errMsg); + return ActionResult.failure("triggerAlarm 失败: " + errMsg); + } + + Long alarmId = result.getData(); + ctx.getMetadata().put("alarmId", alarmId); + log.debug("[AlarmTriggerAction] chain={} device={} alarmId={}", ctx.getChainId(), ctx.getDeviceId(), alarmId); + return ActionResult.success(alarmId); + + } catch (Exception e) { + log.warn("[AlarmTriggerAction] chain={} device={} 异常: {}", + ctx.getChainId(), ctx.getDeviceId(), e.getMessage()); + return ActionResult.failure(e.getMessage()); + } + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/DevicePropertySetAction.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/DevicePropertySetAction.java new file mode 100644 index 00000000..53249066 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/DevicePropertySetAction.java @@ -0,0 +1,75 @@ +package com.viewsh.module.iot.rule.action; + +import com.fasterxml.jackson.databind.JsonNode; +import com.viewsh.module.iot.rule.engine.RuleContext; +import com.viewsh.module.iot.rule.result.ActionResult; +import com.viewsh.module.iot.rule.spi.ActionProvider; +import com.viewsh.module.iot.rule.template.TemplateResolver; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * 设备属性设置 Action(device_property_set)。 + * + *

配置示例: + *

{@code
+ * {
+ *   "properties": {
+ *     "targetTemperature": "${data.temperature}",
+ *     "mode": "auto"
+ *   }
+ * }
+ * }
+ * + *

第一期:写 Redis(设备影子属性缓存);MySQL 同步在第二期 B27 完整实现(Shared 属性下发)。 + * 评审 C5:属性值若含模板变量,统一走 {@link TemplateResolver} 解析。 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class DevicePropertySetAction implements ActionProvider { + + public static final String TYPE = "device_property_set"; + + private final TemplateResolver templateResolver; + + @Override + public String getType() { + return TYPE; + } + + @Override + public ActionResult executeAction(RuleContext ctx, JsonNode config) { + try { + JsonNode propertiesNode = config.path("properties"); + if (propertiesNode.isMissingNode() || !propertiesNode.isObject()) { + return ActionResult.failure("device_property_set: properties 配置缺失或非 Object"); + } + if (ctx.getDeviceId() == null) { + return ActionResult.failure("device_property_set: ctx.deviceId 为 null"); + } + + // 解析所有属性值(支持模板变量) + var fields = propertiesNode.fields(); + int count = 0; + while (fields.hasNext()) { + var entry = fields.next(); + String key = entry.getKey(); + String rawValue = entry.getValue().asText(); + Object resolved = templateResolver.resolve(rawValue, ctx); + // TODO B27: 写 Redis 设备影子 + MySQL(Shared 属性下发) + log.info("[DevicePropertySetAction] [stub] chain={} device={} property={}={}", + ctx.getChainId(), ctx.getDeviceId(), key, resolved); + count++; + } + + return ActionResult.successMsg("device_property_set: 已解析 " + count + " 个属性(第一期 stub)"); + + } catch (Exception e) { + log.warn("[DevicePropertySetAction] chain={} device={} 异常: {}", + ctx.getChainId(), ctx.getDeviceId(), e.getMessage()); + return ActionResult.failure(e.getMessage()); + } + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/DeviceServiceInvokeAction.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/DeviceServiceInvokeAction.java new file mode 100644 index 00000000..1951d42c --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/DeviceServiceInvokeAction.java @@ -0,0 +1,93 @@ +package com.viewsh.module.iot.rule.action; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.framework.common.pojo.CommonResult; +import com.viewsh.module.iot.api.device.IotDeviceControlApi; +import com.viewsh.module.iot.api.device.dto.IotDeviceServiceInvokeReqDTO; +import com.viewsh.module.iot.api.device.dto.IotDeviceServiceInvokeRespDTO; +import com.viewsh.module.iot.rule.engine.RuleContext; +import com.viewsh.module.iot.rule.result.ActionResult; +import com.viewsh.module.iot.rule.spi.ActionProvider; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.Map; + +/** + * 设备服务调用 Action(device_service_invoke)。 + * + *

配置示例: + *

{@code
+ * {
+ *   "identifier": "playVoice",
+ *   "params": { "text": "温度过高,请注意!", "volume": 80 },
+ *   "timeoutSeconds": 30
+ * }
+ * }
+ * + *

第一期调用 {@link IotDeviceControlApi#invokeService} 同步下发服务指令; + * 评审 D1:第二期 B27-B34 引入 B29 后改为创建 RpcCommand 持久化记录并异步跟踪。 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class DeviceServiceInvokeAction implements ActionProvider { + + public static final String TYPE = "device_service_invoke"; + + private final IotDeviceControlApi deviceControlApi; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Override + public String getType() { + return TYPE; + } + + @Override + public ActionResult executeAction(RuleContext ctx, JsonNode config) { + try { + String identifier = config.path("identifier").asText(""); + if (identifier.isBlank()) { + return ActionResult.failure("device_service_invoke: identifier 缺失"); + } + if (ctx.getDeviceId() == null) { + return ActionResult.failure("device_service_invoke: ctx.deviceId 为 null"); + } + + Map params = null; + JsonNode paramsNode = config.path("params"); + if (paramsNode.isObject()) { + params = OBJECT_MAPPER.convertValue(paramsNode, + OBJECT_MAPPER.getTypeFactory().constructMapType(Map.class, String.class, Object.class)); + } + + int timeout = config.path("timeoutSeconds").asInt(30); + + IotDeviceServiceInvokeReqDTO req = IotDeviceServiceInvokeReqDTO.builder() + .deviceId(ctx.getDeviceId()) + .identifier(identifier) + .params(params) + .timeoutSeconds(timeout) + .build(); + + CommonResult result = deviceControlApi.invokeService(req); + if (result == null || !result.isSuccess()) { + String errMsg = result != null ? result.getMsg() : "API 返回 null"; + log.warn("[DeviceServiceInvokeAction] chain={} device={} identifier={} 失败: {}", + ctx.getChainId(), ctx.getDeviceId(), identifier, errMsg); + return ActionResult.failure("invokeService 失败: " + errMsg); + } + + log.debug("[DeviceServiceInvokeAction] chain={} device={} identifier={} 调用成功", + ctx.getChainId(), ctx.getDeviceId(), identifier); + return ActionResult.success(result.getData()); + + } catch (Exception e) { + log.warn("[DeviceServiceInvokeAction] chain={} device={} 异常: {}", + ctx.getChainId(), ctx.getDeviceId(), e.getMessage()); + return ActionResult.failure(e.getMessage()); + } + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/NotifyAction.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/NotifyAction.java new file mode 100644 index 00000000..ee5a8972 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/NotifyAction.java @@ -0,0 +1,161 @@ +package com.viewsh.module.iot.rule.action; + +import com.fasterxml.jackson.databind.JsonNode; +import com.viewsh.module.iot.rule.engine.RuleContext; +import com.viewsh.module.iot.rule.result.ActionResult; +import com.viewsh.module.iot.rule.spi.ActionProvider; +import com.viewsh.module.iot.rule.template.TemplateResolver; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * 通知发送 Action(notify)。 + * + *

配置示例: + *

{@code
+ * {
+ *   "channels": ["sms", "email", "in_app", "webhook"],
+ *   "receivers": { "userIds": [1001], "webhookUrl": "https://hook.example.com" },
+ *   "template": {
+ *     "title": "设备 ${meta.deviceName} 告警",
+ *     "body":  "告警:${meta.alarmName},触发值 ${data.temperature}°C"
+ *   }
+ * }
+ * }
+ * + *

4 个通道并发触发,部分失败不阻塞其他通道,最终汇总结果。 + * 评审 C5:title/body 统一走 {@link TemplateResolver} 解析。 + * 评审 B6:@Async 慎用,保持同步线程池以保留 traceId 和 tenant 上下文。 + * + *

第一期 B16(NotifyService)未就绪,各通道以 TODO stub 占位并记录日志; + * B16 就绪后替换 stub 即可。 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class NotifyAction implements ActionProvider { + + public static final String TYPE = "notify"; + + private final TemplateResolver templateResolver; + + /** 通知并发线程池(复用,避免每次 Action 创建) */ + private static final ExecutorService NOTIFY_POOL = + Executors.newFixedThreadPool(4, r -> { + Thread t = new Thread(r, "iot-notify-"); + t.setDaemon(true); + return t; + }); + + @Override + public String getType() { + return TYPE; + } + + @Override + public ActionResult executeAction(RuleContext ctx, JsonNode config) { + try { + JsonNode channelsNode = config.path("channels"); + if (channelsNode.isMissingNode() || !channelsNode.isArray() || channelsNode.isEmpty()) { + return ActionResult.failure("notify: channels 未配置"); + } + + String title = resolveTemplate(config.path("template").path("title").asText(""), ctx); + String body = resolveTemplate(config.path("template").path("body").asText(""), ctx); + JsonNode receivers = config.path("receivers"); + + List channels = new ArrayList<>(); + for (JsonNode ch : channelsNode) { + channels.add(ch.asText()); + } + + List> futures = channels.stream() + .map(channel -> CompletableFuture.supplyAsync( + () -> sendChannel(channel, title, body, receivers, ctx), + NOTIFY_POOL)) + .toList(); + + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + + List failedChannels = new ArrayList<>(); + for (int i = 0; i < channels.size(); i++) { + ChannelResult cr = futures.get(i).join(); + if (!cr.success()) { + failedChannels.add(channels.get(i) + ":" + cr.error()); + } + } + + if (failedChannels.isEmpty()) { + return ActionResult.successMsg("notify: 全部通道发送成功"); + } else { + // 部分失败:仍返回 SUCCESS,message 记录失败通道(评审 B6) + String msg = "notify: 部分通道失败 " + failedChannels; + log.warn("[NotifyAction] chain={} {}", ctx.getChainId(), msg); + return ActionResult.successMsg(msg); + } + + } catch (Exception e) { + log.warn("[NotifyAction] chain={} 异常: {}", ctx.getChainId(), e.getMessage()); + return ActionResult.failure(e.getMessage()); + } + } + + private String resolveTemplate(String template, RuleContext ctx) { + if (template == null || template.isBlank()) return template; + try { + return String.valueOf(templateResolver.resolve(template, ctx)); + } catch (Exception e) { + log.warn("[NotifyAction] 模板解析失败 template='{}': {}", template, e.getMessage()); + return template; + } + } + + /** + * 单通道发送(第一期 stub,B16 NotifyService 就绪后替换)。 + */ + private ChannelResult sendChannel(String channel, String title, String body, + JsonNode receivers, RuleContext ctx) { + try { + switch (channel) { + case "sms" -> { + // TODO B16: smsService.send(receivers.userIds, title, body) + log.info("[NotifyAction] [stub] sms chain={} title='{}' body='{}'", + ctx.getChainId(), title, body); + } + case "email" -> { + // TODO B16: emailService.send(receivers.userIds, title, body) + log.info("[NotifyAction] [stub] email chain={} title='{}' body='{}'", + ctx.getChainId(), title, body); + } + case "in_app" -> { + // TODO B16: inAppService.send(receivers.userIds, title, body) + log.info("[NotifyAction] [stub] in_app chain={} title='{}' body='{}'", + ctx.getChainId(), title, body); + } + case "webhook" -> { + String webhookUrl = receivers.path("webhookUrl").asText(""); + // TODO B16: webhookService.post(webhookUrl, title, body) + log.info("[NotifyAction] [stub] webhook chain={} url='{}' title='{}' body='{}'", + ctx.getChainId(), webhookUrl, title, body); + } + default -> { + log.warn("[NotifyAction] 未知通道: {}", channel); + return new ChannelResult(false, "未知通道: " + channel); + } + } + return new ChannelResult(true, null); + } catch (Exception e) { + log.warn("[NotifyAction] channel={} 发送失败: {}", channel, e.getMessage()); + return new ChannelResult(false, e.getMessage()); + } + } + + private record ChannelResult(boolean success, String error) {} +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/DagExecutor.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/DagExecutor.java index c436cd41..9f8df6ea 100644 --- a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/DagExecutor.java +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/DagExecutor.java @@ -1,6 +1,7 @@ package com.viewsh.module.iot.rule.engine; import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleLinkRelationType; +import com.viewsh.module.iot.rule.engine.branch.BranchNode; import com.viewsh.module.iot.rule.engine.exception.RuleChainException; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Timer; @@ -41,6 +42,9 @@ public class DagExecutor { throw new RuleChainException(chain.getId(), null, "规则链缺少 Trigger 入口节点"); } + // Branch 节点需要通过 ctx.metadata 获取 CompiledRuleChain 以执行内联 action + ctx.getMetadata().put(BranchNode.CTX_CHAIN_KEY, chain); + Deque queue = new ArrayDeque<>(); queue.offer(entry); diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/branch/BranchConfiguration.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/branch/BranchConfiguration.java new file mode 100644 index 00000000..da003d2a --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/branch/BranchConfiguration.java @@ -0,0 +1,104 @@ +package com.viewsh.module.iot.rule.engine.branch; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.JsonNode; +import lombok.Data; + +import java.util.Collections; +import java.util.List; + +/** + * Branch 节点配置 POJO(反序列化自 rule_node.configuration JSON)。 + * + *

配置示例: + *

{@code
+ * {
+ *   "branches": [
+ *     {
+ *       "name": "高温",
+ *       "condition": { "type": "expression", "config": { "expression": "${data.temp} > 40" } },
+ *       "actions": ["nodeId_alarm"],
+ *       "executeAnyway": false
+ *     },
+ *     {
+ *       "name": "else",
+ *       "condition": null,
+ *       "actions": ["nodeId_log"]
+ *     }
+ *   ]
+ * }
+ * }
+ * + *

约束(B7 规格): + *

+ */ +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class BranchConfiguration { + + private List branches = Collections.emptyList(); + + /** + * 校验配置合法性:else 分支(condition=null)必须是最后一个。 + * + * @throws IllegalArgumentException 配置非法时抛出 + */ + public void validate() { + if (branches == null || branches.isEmpty()) { + return; + } + for (int i = 0; i < branches.size() - 1; i++) { + if (branches.get(i).getCondition() == null) { + throw new IllegalArgumentException( + "[BranchConfiguration] else 分支(condition=null)必须是最后一个,但在 index=" + i + " 处发现"); + } + } + } + + /** + * 单个分支配置。 + */ + @Data + @JsonIgnoreProperties(ignoreUnknown = true) + public static class BranchItem { + + /** 分支名称(用于日志 / metadata 标记,必填) */ + private String name; + + /** + * 条件配置(null 表示 else 分支)。 + * 格式:{"type": "expression", "config": {...}} + */ + private BranchCondition condition; + + /** + * 命中时需要执行的下游节点 ID 列表。 + * BranchNode 按此列表在 CompiledRuleChain 中查找对应节点并内联执行。 + */ + private List actions = Collections.emptyList(); + + /** + * 是否无论前面是否命中都继续评估此分支(重叠触发)。 + * true = 执行后继续评估后续分支;false(默认)= 命中后跳过后续分支。 + */ + private boolean executeAnyway = false; + } + + /** + * 分支条件配置(内嵌 type + config)。 + */ + @Data + @JsonIgnoreProperties(ignoreUnknown = true) + public static class BranchCondition { + + /** 条件类型,对应 {@link com.viewsh.module.iot.rule.spi.ConditionEvaluator#getType()} */ + private String type; + + /** 条件具体配置(由对应的 ConditionEvaluator 解析) */ + private JsonNode config; + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/branch/BranchExecutor.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/branch/BranchExecutor.java new file mode 100644 index 00000000..be94d1aa --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/branch/BranchExecutor.java @@ -0,0 +1,153 @@ +package com.viewsh.module.iot.rule.engine.branch; + +import com.viewsh.module.iot.rule.condition.ConditionEvaluatorManager; +import com.viewsh.module.iot.rule.engine.RuleContext; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.List; + +/** + * Branch 节点核心执行语义(executeAnyway if/else-if/else)。 + * + *

执行流程(B7 §3.2 规格): + *

    + *
  1. 按数组顺序遍历 branches
  2. + *
  3. condition=null 视为 else:仅当前面所有分支都未命中时执行
  4. + *
  5. executeAnyway=false(默认):命中后执行 actions,跳过后续分支
  6. + *
  7. executeAnyway=true:命中后执行 actions,继续评估后续分支
  8. + *
  9. 条件评估异常:该分支跳过(短路求值,不当失败),继续下一个
  10. + *
  11. action 执行异常:记录日志后继续执行同 branch 其余 action(异常隔离)
  12. + *
+ * + *

此类不依赖 Spring,便于单元测试(mock ConditionEvaluatorManager 和 ActionExecutor)。 + */ +@Slf4j +@RequiredArgsConstructor +public class BranchExecutor { + + private final ConditionEvaluatorManager conditionEvaluatorManager; + + /** + * 执行分支逻辑。 + * + * @param ctx 规则执行上下文 + * @param configuration branch 节点配置(已解析,已 validate) + * @param actionExecutor 回调:执行指定 nodeId 对应的 action(由 BranchNode 提供真实实现,测试可 mock) + * @return 命中的分支名列表(可能为空) + */ + public List execute(RuleContext ctx, + BranchConfiguration configuration, + ActionExecutor actionExecutor) { + List branches = configuration.getBranches(); + List matchedBranches = new ArrayList<>(); + + boolean anyPreviousMatched = false; + + for (BranchConfiguration.BranchItem branch : branches) { + boolean match = evaluateMatch(ctx, branch, anyPreviousMatched); + + if (match) { + String branchName = branch.getName(); + log.debug("[BranchExecutor] chain={} branch='{}' 命中,executeAnyway={}", + ctx.getChainId(), branchName, branch.isExecuteAnyway()); + + matchedBranches.add(branchName); + executeActions(ctx, branch, actionExecutor); + + if (!branch.isExecuteAnyway()) { + // if/else-if 语义:命中后跳过后续 + log.debug("[BranchExecutor] chain={} branch='{}' executeAnyway=false,终止后续分支评估", + ctx.getChainId(), branchName); + break; + } + // executeAnyway=true:继续评估后续分支 + anyPreviousMatched = true; + } else { + if (branch.isExecuteAnyway()) { + // executeAnyway=true 但条件未命中:跳过,继续后续 + log.debug("[BranchExecutor] chain={} branch='{}' 未命中(executeAnyway=true),继续评估", + ctx.getChainId(), branch.getName()); + } + // 不更新 anyPreviousMatched(未命中的 executeAnyway 分支不算 "previous matched") + } + } + + if (matchedBranches.isEmpty()) { + log.debug("[BranchExecutor] chain={} 无分支命中", ctx.getChainId()); + } + + return matchedBranches; + } + + /** + * 判断某个分支是否命中。 + * + * @param anyPreviousMatched 前面是否有非 executeAnyway 的分支已命中 + */ + private boolean evaluateMatch(RuleContext ctx, + BranchConfiguration.BranchItem branch, + boolean anyPreviousMatched) { + BranchConfiguration.BranchCondition condition = branch.getCondition(); + if (condition == null) { + // else 分支:仅当前面都未命中时执行 + return !anyPreviousMatched; + } + + try { + boolean result = conditionEvaluatorManager.evaluate(condition.getType(), ctx, condition.getConfig()); + log.debug("[BranchExecutor] chain={} branch='{}' 条件评估结果={}", + ctx.getChainId(), branch.getName(), result); + return result; + } catch (Exception e) { + // 短路求值:条件异常 → 跳过该 branch,不当失败 + log.warn("[BranchExecutor] chain={} branch='{}' 条件评估异常,跳过:{}", + ctx.getChainId(), branch.getName(), e.getMessage()); + return false; + } + } + + /** + * 执行分支内所有 actions(action 异常隔离,不影响其他 action)。 + */ + private void executeActions(RuleContext ctx, + BranchConfiguration.BranchItem branch, + ActionExecutor actionExecutor) { + String branchName = branch.getName(); + List actions = branch.getActions(); + if (actions == null || actions.isEmpty()) { + return; + } + + for (String actionNodeId : actions) { + try { + log.debug("[BranchExecutor] chain={} branch='{}' 执行 action nodeId={}", + ctx.getChainId(), branchName, actionNodeId); + actionExecutor.execute(ctx, branchName, actionNodeId); + } catch (Exception e) { + // action 异常隔离:记录日志后继续下一个 action + log.warn("[BranchExecutor] chain={} branch='{}' action nodeId={} 执行异常(隔离):{}", + ctx.getChainId(), branchName, actionNodeId, e.getMessage()); + } + } + } + + /** + * action 执行回调接口(用于解耦 BranchExecutor 与 NodeProvider/DagExecutor)。 + * + *

生产实现由 {@link BranchNode} 提供(通过 CompiledRuleChain + NodeProviderRegistry 执行); + * 测试时可使用 mock 或 lambda 替代。 + */ + @FunctionalInterface + public interface ActionExecutor { + /** + * 执行指定分支内的一个 action 节点。 + * + * @param ctx 规则上下文(已含 branch_name metadata) + * @param branchName 所属分支名(供日志 / metadata 使用) + * @param actionNodeId 节点 ID(来自 BranchConfiguration.BranchItem.actions) + */ + void execute(RuleContext ctx, String branchName, String actionNodeId); + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/branch/BranchNode.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/branch/BranchNode.java new file mode 100644 index 00000000..f0a3d734 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/branch/BranchNode.java @@ -0,0 +1,153 @@ +package com.viewsh.module.iot.rule.engine.branch; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.module.iot.rule.condition.ConditionEvaluatorManager; +import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleNodeCategory; +import com.viewsh.module.iot.rule.engine.CompiledNode; +import com.viewsh.module.iot.rule.engine.CompiledRuleChain; +import com.viewsh.module.iot.rule.engine.NodeProvider; +import com.viewsh.module.iot.rule.engine.NodeProviderRegistry; +import com.viewsh.module.iot.rule.engine.NodeResult; +import com.viewsh.module.iot.rule.engine.RuleContext; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Map; + +/** + * Branch 类型节点 Provider(category=ACTION, type="branch")。 + * + *

职责: + *

    + *
  1. 将 rule_node.configuration JSON 解析为 {@link BranchConfiguration}
  2. + *
  3. 校验 else 分支(condition=null)是否在最后
  4. + *
  5. 调用 {@link BranchExecutor} 执行分支语义(if/else-if/executeAnyway)
  6. + *
  7. 命中的 branch 的 actions 通过 {@link NodeProviderRegistry} 内联执行
  8. + *
  9. 将命中分支名列表写入 ctx.metadata["branch_names"],供下游节点参考
  10. + *
  11. 返回 {@link NodeResult#success()} 让 DagExecutor 沿 SUCCESS 出边走公共下游
  12. + *
+ * + *

内联执行说明:DAG 中 BranchNode 的 SUCCESS 出边指向 Branch 之后的公共节点; + * Branch 内 actions(互斥子节点)由 BranchNode 自行执行,不依赖 DagExecutor 的出边路由。 + * 因此 Branch actions 对应的子节点在 rule_node 中存在但没有从 BranchNode 发出的 link, + * 或者有 link 但 DagExecutor 会通过 BranchNode 内联执行后跳过(取决于链设计)。 + * + *

CompiledRuleChain 通过 DagExecutor 写入 ctx.metadata["{@value #CTX_CHAIN_KEY}"] 获得, + * 供 BranchNode 查找子节点配置。 + */ +@Slf4j +@Component +public class BranchNode implements NodeProvider { + + /** DagExecutor 在执行前将 CompiledRuleChain 写入 ctx.metadata 的 key */ + public static final String CTX_CHAIN_KEY = "__compiledRuleChain__"; + + public static final String TYPE = "branch"; + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private final ConditionEvaluatorManager conditionEvaluatorManager; + private final NodeProviderRegistry nodeProviderRegistry; + private final BranchExecutor branchExecutor; + + public BranchNode(ConditionEvaluatorManager conditionEvaluatorManager, + NodeProviderRegistry nodeProviderRegistry) { + this.conditionEvaluatorManager = conditionEvaluatorManager; + this.nodeProviderRegistry = nodeProviderRegistry; + this.branchExecutor = new BranchExecutor(conditionEvaluatorManager); + } + + @Override + public RuleNodeCategory getCategory() { + return RuleNodeCategory.ACTION; + } + + @Override + public String getType() { + return TYPE; + } + + @Override + public NodeResult execute(RuleContext ctx, String config) { + // 1. 解析配置 + BranchConfiguration configuration; + try { + configuration = MAPPER.readValue(config == null ? "{}" : config, BranchConfiguration.class); + } catch (Exception e) { + log.warn("[BranchNode] chain={} config JSON 解析失败: {}", ctx.getChainId(), e.getMessage()); + return NodeResult.failure("BranchNode config 解析失败: " + e.getMessage(), e); + } + + // 2. 校验(else 分支必须在最后) + try { + configuration.validate(); + } catch (IllegalArgumentException e) { + log.warn("[BranchNode] chain={} 配置校验失败: {}", ctx.getChainId(), e.getMessage()); + return NodeResult.failure(e.getMessage()); + } + + if (configuration.getBranches().isEmpty()) { + log.debug("[BranchNode] chain={} branches 为空,no-op", ctx.getChainId()); + return NodeResult.success(); + } + + // 3. 获取 CompiledRuleChain(用于内联执行 action 节点) + CompiledRuleChain chain = (CompiledRuleChain) ctx.getMetadata().get(CTX_CHAIN_KEY); + + // 4. 构建 ActionExecutor(使用 NodeProviderRegistry 内联执行) + BranchExecutor.ActionExecutor actionExecutor = buildActionExecutor(ctx, chain); + + // 5. 执行分支逻辑 + List matchedBranches = branchExecutor.execute(ctx, configuration, actionExecutor); + + // 6. 将命中分支名写入 metadata,供下游节点参考 + if (!matchedBranches.isEmpty()) { + ctx.getMetadata().put("branch_names", matchedBranches); + ctx.getMetadata().put("branch_name", matchedBranches.get(0)); + } + + return NodeResult.success(Map.of("branch_names", matchedBranches)); + } + + /** + * 构建 ActionExecutor:通过 NodeProviderRegistry 内联执行 action 节点。 + * + *

若 CompiledRuleChain 不可用(chain=null 或节点不存在),则打印 warn 日志跳过。 + */ + private BranchExecutor.ActionExecutor buildActionExecutor(RuleContext ctx, CompiledRuleChain chain) { + return (ruleCtx, branchName, actionNodeId) -> { + if (chain == null) { + log.warn("[BranchNode] chain={} branch='{}' 无法获取 CompiledRuleChain,跳过 action={}", + ruleCtx.getChainId(), branchName, actionNodeId); + return; + } + + CompiledNode actionNode = chain.nodeById(actionNodeId); + if (actionNode == null) { + log.warn("[BranchNode] chain={} branch='{}' action nodeId={} 不存在于链中,跳过", + ruleCtx.getChainId(), branchName, actionNodeId); + return; + } + + try { + NodeProvider provider = nodeProviderRegistry.resolve(actionNode.getCategory(), actionNode.getType()); + // 更新 ctx 的当前节点(与 DagExecutor.fork 行为一致) + ruleCtx.fork(actionNodeId); + // 将 branch_name 写入 metadata,便于 action 内的日志追踪 + ruleCtx.getMetadata().put("branch_name", branchName); + + NodeResult result = provider.execute(ruleCtx, actionNode.getConfiguration()); + if (result != null) { + ruleCtx.recordNodeOutput(actionNodeId, result.getOutputs()); + } + log.debug("[BranchNode] chain={} branch='{}' action={} 执行完成,result={}", + ruleCtx.getChainId(), branchName, actionNodeId, + result != null ? result.getRelationType() : "null"); + } catch (Exception e) { + // 异常已在 BranchExecutor.executeActions 中捕获,此处不再 re-throw + throw new RuntimeException("action=" + actionNodeId + " 执行异常: " + e.getMessage(), e); + } + }; + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/result/ActionResult.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/result/ActionResult.java new file mode 100644 index 00000000..45ff9298 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/result/ActionResult.java @@ -0,0 +1,54 @@ +package com.viewsh.module.iot.rule.result; + +import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleLinkRelationType; + +/** + * Action 执行结果。 + * + *

relationType 决定 DAG 走哪条 outgoing link: + *

    + *
  • {@link RuleLinkRelationType#SUCCESS} — Action 执行成功
  • + *
  • {@link RuleLinkRelationType#FAILURE} — Action 执行失败(内部捕获,不上抛)
  • + *
  • {@link RuleLinkRelationType#SKIP} — 静默跳过(如告警已清除,重复清除为 no-op)
  • + *
+ * + *

output 会注入 {@code ctx.nodeOutputs}(key="output"),可供后继节点引用。 + */ +public record ActionResult( + RuleLinkRelationType relationType, + String message, + Object output +) { + + public static ActionResult success() { + return new ActionResult(RuleLinkRelationType.SUCCESS, null, null); + } + + public static ActionResult success(Object output) { + return new ActionResult(RuleLinkRelationType.SUCCESS, null, output); + } + + public static ActionResult successMsg(String message) { + return new ActionResult(RuleLinkRelationType.SUCCESS, message, null); + } + + public static ActionResult success(String message, Object output) { + return new ActionResult(RuleLinkRelationType.SUCCESS, message, output); + } + + public static ActionResult failure(String message) { + return new ActionResult(RuleLinkRelationType.FAILURE, message, null); + } + + public static ActionResult skip(String reason) { + return new ActionResult(RuleLinkRelationType.SKIP, reason, null); + } + + public boolean isSuccess() { + return relationType == RuleLinkRelationType.SUCCESS; + } + + public boolean isFailure() { + return relationType == RuleLinkRelationType.FAILURE; + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/result/RpcCommand.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/result/RpcCommand.java new file mode 100644 index 00000000..89dc0025 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/result/RpcCommand.java @@ -0,0 +1,37 @@ +package com.viewsh.module.iot.rule.result; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 设备 RPC 指令封装(预留,B29 期实现完整持久化)。 + * + *

第一期 DeviceServiceInvokeAction 先调用现有 {@code IotDeviceControlApi.invokeService()}; + * 第二期 B27-B34 引入 B29 后,Action 层改为创建 RpcCommand 记录并异步跟踪执行结果。 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class RpcCommand { + + /** 目标设备 ID */ + private Long deviceId; + + /** 服务标识符(物模型 identifier) */ + private String serviceId; + + /** 入参(JSON 格式) */ + private String inputParams; + + /** 调用超时(ms),默认 30000 */ + private Integer timeoutMs; + + /** 关联的规则链 ID(用于追踪) */ + private Long ruleChainId; + + /** 关联的节点 ID(用于追踪) */ + private String nodeId; +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/spi/ActionProvider.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/spi/ActionProvider.java new file mode 100644 index 00000000..c66653b9 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/spi/ActionProvider.java @@ -0,0 +1,78 @@ +package com.viewsh.module.iot.rule.spi; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleNodeCategory; +import com.viewsh.module.iot.rule.engine.NodeProvider; +import com.viewsh.module.iot.rule.engine.NodeResult; +import com.viewsh.module.iot.rule.engine.RuleContext; +import com.viewsh.module.iot.rule.result.ActionResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * Action SPI — 规则链动作节点执行接口。 + * + *

实现通过 Spring {@code @Component} + {@link #getType()} 注册,由 + * {@link com.viewsh.module.iot.rule.action.ActionProviderManager} 管理。 + * 同时实现 {@link NodeProvider}(category=ACTION),使其可被 DagExecutor 直接路由。 + * + *

约定(评审 B6): + *

    + *
  • 所有模板变量解析统一走 {@link com.viewsh.module.iot.rule.template.TemplateResolver}
  • + *
  • Action 异常必须捕获并转为 {@link ActionResult#failure(String)},不上抛到链级
  • + *
  • 不使用 {@code @Async},保持同步以保留 traceId 和 tenant 上下文
  • + *
+ */ +public interface ActionProvider extends NodeProvider { + + Logger LOG = LoggerFactory.getLogger(ActionProvider.class); + ObjectMapper CONFIG_MAPPER = new ObjectMapper(); + + @Override + default RuleNodeCategory getCategory() { + return RuleNodeCategory.ACTION; + } + + /** + * 执行 Action(JsonNode config 版本)。 + * + * @param ctx 规则执行上下文 + * @param config 节点 configuration(已解析为 JsonNode) + * @return ActionResult(不允许返回 null) + */ + ActionResult executeAction(RuleContext ctx, JsonNode config); + + /** + * NodeProvider.execute 默认桥接:将 String config 解析为 JsonNode, + * 执行 {@link #executeAction(RuleContext, JsonNode)},并映射为 {@link NodeResult}。 + * + *

config 解析异常或 executeAction 异常,均转为 {@link NodeResult#failure(String)}。 + */ + @Override + default NodeResult execute(RuleContext ctx, String config) { + JsonNode cfg; + try { + cfg = CONFIG_MAPPER.readTree(config == null ? "{}" : config); + } catch (Exception e) { + LOG.warn("[ActionProvider] type={} config JSON 解析失败: {}", getType(), e.getMessage()); + return NodeResult.failure("config JSON 解析失败: " + e.getMessage(), e); + } + try { + ActionResult result = executeAction(ctx, cfg); + if (result == null) { + LOG.warn("[ActionProvider] type={} executeAction 返回 null", getType()); + return NodeResult.failure("executeAction 返回 null"); + } + Map outputs = result.output() != null + ? Map.of("output", result.output()) + : Map.of(); + return NodeResult.of(result.relationType(), outputs); + } catch (Exception e) { + LOG.warn("[ActionProvider] type={} 执行异常(转为 FAILURE): {}", getType(), e.getMessage()); + return NodeResult.failure(e.getMessage(), e); + } + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/action/ActionProviderTest.java b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/action/ActionProviderTest.java new file mode 100644 index 00000000..1177bc73 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/action/ActionProviderTest.java @@ -0,0 +1,286 @@ +package com.viewsh.module.iot.rule.action; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.framework.common.pojo.CommonResult; +import com.viewsh.module.iot.api.alarm.IotAlarmRecordApi; +import com.viewsh.module.iot.api.alarm.dto.IotAlarmTriggerReqDTO; +import com.viewsh.module.iot.api.alarm.dto.IotAlarmClearReqDTO; +import com.viewsh.module.iot.api.device.IotDeviceControlApi; +import com.viewsh.module.iot.api.device.dto.IotDeviceServiceInvokeReqDTO; +import com.viewsh.module.iot.api.device.dto.IotDeviceServiceInvokeRespDTO; +import com.viewsh.module.iot.core.mq.message.IotDeviceMessage; +import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleLinkRelationType; +import com.viewsh.module.iot.rule.engine.NodeResult; +import com.viewsh.module.iot.rule.engine.RuleContext; +import com.viewsh.module.iot.rule.result.ActionResult; +import com.viewsh.module.iot.rule.template.TemplateResolver; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import java.time.Instant; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +/** + * B6 ActionProvider 单元测试:5 个 Action + ActionProvider 接口桥接(NodeProvider)。 + */ +class ActionProviderTest { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + private IotAlarmRecordApi alarmRecordApi; + private IotDeviceControlApi deviceControlApi; + private TemplateResolver templateResolver; + + @BeforeEach + void setUp() { + alarmRecordApi = mock(IotAlarmRecordApi.class); + deviceControlApi = mock(IotDeviceControlApi.class); + templateResolver = new TemplateResolver(MAPPER); + } + + // ======================== AlarmTriggerAction ======================== + + @Test + void alarmTrigger_firstTime_callsServiceAndWritesAlarmIdToMeta() throws Exception { + when(alarmRecordApi.triggerAlarm(any())).thenReturn(CommonResult.success(42L)); + + AlarmTriggerAction action = new AlarmTriggerAction(alarmRecordApi, templateResolver); + RuleContext ctx = ctx(1L, 10L); + + JsonNode config = MAPPER.readTree(""" + {"alarmConfigId": 100, "severity": 3, "name": "温度告警"} + """); + ActionResult result = action.executeAction(ctx, config); + + assertTrue(result.isSuccess()); + assertEquals(42L, ctx.getMetadata().get("alarmId")); + ArgumentCaptor cap = ArgumentCaptor.forClass(IotAlarmTriggerReqDTO.class); + verify(alarmRecordApi).triggerAlarm(cap.capture()); + assertEquals(100L, cap.getValue().getAlarmConfigId()); + assertEquals(3, cap.getValue().getSeverity()); + } + + @Test + void alarmTrigger_missingAlarmConfigId_returnsFailure() throws Exception { + AlarmTriggerAction action = new AlarmTriggerAction(alarmRecordApi, templateResolver); + RuleContext ctx = ctx(1L, 10L); + + JsonNode config = MAPPER.readTree(""" + {"severity": 3} + """); + ActionResult result = action.executeAction(ctx, config); + + assertTrue(result.isFailure()); + verifyNoInteractions(alarmRecordApi); + } + + @Test + void alarmTrigger_templateName_resolved() throws Exception { + when(alarmRecordApi.triggerAlarm(any())).thenReturn(CommonResult.success(99L)); + + AlarmTriggerAction action = new AlarmTriggerAction(alarmRecordApi, templateResolver); + RuleContext ctx = ctx(1L, 10L); + ctx.getMetadata().put("deviceName", "传感器A"); + + JsonNode config = MAPPER.readTree(""" + {"alarmConfigId": 1, "severity": 2, "name": "设备 ${meta.deviceName} 过热"} + """); + action.executeAction(ctx, config); + + ArgumentCaptor cap = ArgumentCaptor.forClass(IotAlarmTriggerReqDTO.class); + verify(alarmRecordApi).triggerAlarm(cap.capture()); + assertEquals("设备 传感器A 过热", cap.getValue().getAlarmName()); + } + + // ======================== AlarmClearAction ======================== + + @Test + void alarmClear_withAlarmId_callsService() throws Exception { + when(alarmRecordApi.clearAlarm(any())).thenReturn(CommonResult.success(true)); + + AlarmClearAction action = new AlarmClearAction(alarmRecordApi); + RuleContext ctx = ctx(1L, 10L); + + JsonNode config = MAPPER.readTree(""" + {"alarmId": 55, "operator": "rule-engine"} + """); + ActionResult result = action.executeAction(ctx, config); + + assertTrue(result.isSuccess()); + ArgumentCaptor cap = ArgumentCaptor.forClass(IotAlarmClearReqDTO.class); + verify(alarmRecordApi).clearAlarm(cap.capture()); + assertEquals(55L, cap.getValue().getAlarmId()); + } + + @Test + void alarmClear_alarmIdFromMeta_resolvedCorrectly() throws Exception { + when(alarmRecordApi.clearAlarm(any())).thenReturn(CommonResult.success(true)); + + AlarmClearAction action = new AlarmClearAction(alarmRecordApi); + RuleContext ctx = ctx(1L, 10L); + ctx.getMetadata().put("alarmId", 77L); + + JsonNode config = MAPPER.readTree(""" + {"alarmId": "${meta.alarmId}"} + """); + ActionResult result = action.executeAction(ctx, config); + + assertTrue(result.isSuccess()); + ArgumentCaptor cap = ArgumentCaptor.forClass(IotAlarmClearReqDTO.class); + verify(alarmRecordApi).clearAlarm(cap.capture()); + assertEquals(77L, cap.getValue().getAlarmId()); + } + + @Test + void alarmClear_noAlarmId_returnsFailure() throws Exception { + AlarmClearAction action = new AlarmClearAction(alarmRecordApi); + RuleContext ctx = ctx(1L, 10L); + + JsonNode config = MAPPER.readTree("{}"); + ActionResult result = action.executeAction(ctx, config); + + assertTrue(result.isFailure()); + verifyNoInteractions(alarmRecordApi); + } + + // ======================== DeviceServiceInvokeAction ======================== + + @Test + void deviceServiceInvoke_success_returnsSuccess() throws Exception { + IotDeviceServiceInvokeRespDTO resp = IotDeviceServiceInvokeRespDTO.builder() + .success(true).build(); + when(deviceControlApi.invokeService(any())).thenReturn(CommonResult.success(resp)); + + DeviceServiceInvokeAction action = new DeviceServiceInvokeAction(deviceControlApi); + RuleContext ctx = ctx(1L, 10L); + + JsonNode config = MAPPER.readTree(""" + {"identifier": "playVoice", "params": {"text": "告警!"}} + """); + ActionResult result = action.executeAction(ctx, config); + + assertTrue(result.isSuccess()); + ArgumentCaptor cap = ArgumentCaptor.forClass(IotDeviceServiceInvokeReqDTO.class); + verify(deviceControlApi).invokeService(cap.capture()); + assertEquals(10L, cap.getValue().getDeviceId()); + assertEquals("playVoice", cap.getValue().getIdentifier()); + } + + @Test + void deviceServiceInvoke_missingIdentifier_returnsFailure() throws Exception { + DeviceServiceInvokeAction action = new DeviceServiceInvokeAction(deviceControlApi); + RuleContext ctx = ctx(1L, 10L); + + JsonNode config = MAPPER.readTree("{}"); + ActionResult result = action.executeAction(ctx, config); + + assertTrue(result.isFailure()); + verifyNoInteractions(deviceControlApi); + } + + // ======================== DevicePropertySetAction ======================== + + @Test + void devicePropertySet_withProperties_resolveTemplateAndReturnSuccess() throws Exception { + DevicePropertySetAction action = new DevicePropertySetAction(templateResolver); + RuleContext ctx = ctx(1L, 10L); + IotDeviceMessage msg = new IotDeviceMessage(); + ctx.setMessage(msg); + + JsonNode config = MAPPER.readTree(""" + {"properties": {"mode": "auto", "targetTemp": "28"}} + """); + ActionResult result = action.executeAction(ctx, config); + + assertTrue(result.isSuccess()); + assertNotNull(result.message()); + assertTrue(result.message().contains("2")); + } + + @Test + void devicePropertySet_missingProperties_returnsFailure() throws Exception { + DevicePropertySetAction action = new DevicePropertySetAction(templateResolver); + RuleContext ctx = ctx(1L, 10L); + + JsonNode config = MAPPER.readTree("{}"); + ActionResult result = action.executeAction(ctx, config); + + assertTrue(result.isFailure()); + } + + // ======================== NotifyAction ======================== + + @Test + void notify_allChannelsOk_returnsSuccess() throws Exception { + NotifyAction action = new NotifyAction(templateResolver); + RuleContext ctx = ctx(1L, 10L); + + JsonNode config = MAPPER.readTree(""" + { + "channels": ["sms","email","in_app","webhook"], + "receivers": {"webhookUrl": "https://example.com"}, + "template": {"title": "告警", "body": "消息体"} + } + """); + ActionResult result = action.executeAction(ctx, config); + + assertTrue(result.isSuccess()); + } + + @Test + void notify_noChannels_returnsFailure() throws Exception { + NotifyAction action = new NotifyAction(templateResolver); + RuleContext ctx = ctx(1L, 10L); + + JsonNode config = MAPPER.readTree(""" + {"channels": [], "template": {"title": "T", "body": "B"}} + """); + ActionResult result = action.executeAction(ctx, config); + + assertTrue(result.isFailure()); + } + + // ======================== NodeProvider bridge ======================== + + @Test + void actionProvider_executeViaNodeProviderBridge_convertsToNodeResult() throws Exception { + when(alarmRecordApi.triggerAlarm(any())).thenReturn(CommonResult.success(99L)); + AlarmTriggerAction action = new AlarmTriggerAction(alarmRecordApi, templateResolver); + RuleContext ctx = ctx(1L, 10L); + + String configJson = """ + {"alarmConfigId": 1, "severity": 2} + """; + NodeResult result = action.execute(ctx, configJson); + + assertEquals(RuleLinkRelationType.SUCCESS, result.getRelationType()); + } + + @Test + void actionProvider_badConfigJson_returnsFailureNodeResult() throws Exception { + AlarmTriggerAction action = new AlarmTriggerAction(alarmRecordApi, templateResolver); + RuleContext ctx = ctx(1L, 10L); + + NodeResult result = action.execute(ctx, "{invalid json}"); + + assertEquals(RuleLinkRelationType.FAILURE, result.getRelationType()); + verifyNoInteractions(alarmRecordApi); + } + + // ======================== Helper ======================== + + private RuleContext ctx(Long chainId, Long deviceId) { + RuleContext ctx = new RuleContext(); + ctx.setChainId(chainId); + ctx.setDeviceId(deviceId); + ctx.setProductId(100L); + ctx.setTenantId(1L); + ctx.setSubsystemId(1L); + ctx.setStartedAt(Instant.now()); + return ctx; + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/branch/BranchExecutorTest.java b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/branch/BranchExecutorTest.java new file mode 100644 index 00000000..2c2c8701 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/branch/BranchExecutorTest.java @@ -0,0 +1,269 @@ +package com.viewsh.module.iot.rule.engine.branch; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.module.iot.rule.condition.ConditionEvaluatorManager; +import com.viewsh.module.iot.rule.engine.RuleContext; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; + +/** + * {@link BranchExecutor} 单元测试(7 个用例,B7 §6)。 + * + *

使用 Mockito mock ConditionEvaluatorManager,不启动 Spring 容器。 + * ActionExecutor 通过 lambda 记录执行了哪些 nodeId,供断言使用。 + */ +class BranchExecutorTest { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private ConditionEvaluatorManager conditionEvaluatorManager; + private BranchExecutor branchExecutor; + private RuleContext ctx; + + /** 记录 actionExecutor 被调用的 (branchName, actionNodeId) 对 */ + private List executedActions; + + @BeforeEach + void setUp() { + conditionEvaluatorManager = mock(ConditionEvaluatorManager.class); + branchExecutor = new BranchExecutor(conditionEvaluatorManager); + + ctx = new RuleContext(); + ctx.setChainId(1L); + ctx.setTraceId("test-trace"); + + executedActions = new ArrayList<>(); + } + + /** + * 用例 1: if_match — [A] A=true → A 执行 + */ + @Test + void testIfMatch_singleBranchTrue() throws Exception { + when(conditionEvaluatorManager.evaluate(eq("expression"), any(), any())).thenReturn(true); + + BranchConfiguration config = buildConfig( + branch("A", "expression", false, "nodeA") + ); + + List matched = branchExecutor.execute(ctx, config, recordingExecutor()); + + assertEquals(List.of("A"), matched); + assertEquals(List.of("nodeA"), executedActions); + } + + /** + * 用例 2: if_else (A=true) — [A, null] A=true → A 执行,else 跳过 + */ + @Test + void testIfElse_firstBranchTrue_elseSkipped() throws Exception { + when(conditionEvaluatorManager.evaluate(eq("expression"), any(), any())).thenReturn(true); + + BranchConfiguration config = buildConfig( + branch("A", "expression", false, "nodeA"), + elseBranch("else", "nodeElse") + ); + + List matched = branchExecutor.execute(ctx, config, recordingExecutor()); + + assertEquals(List.of("A"), matched); + assertEquals(List.of("nodeA"), executedActions); + assertFalse(executedActions.contains("nodeElse"), "else 分支不应执行"); + } + + /** + * 用例 3: if_else (A=false) — [A, null] A=false → else 执行 + */ + @Test + void testIfElse_firstBranchFalse_elseExecuted() throws Exception { + when(conditionEvaluatorManager.evaluate(eq("expression"), any(), any())).thenReturn(false); + + BranchConfiguration config = buildConfig( + branch("A", "expression", false, "nodeA"), + elseBranch("else", "nodeElse") + ); + + List matched = branchExecutor.execute(ctx, config, recordingExecutor()); + + assertEquals(List.of("else"), matched); + assertFalse(executedActions.contains("nodeA"), "A 分支不应执行"); + assertEquals(List.of("nodeElse"), executedActions); + } + + /** + * 用例 4: else_if — [A, B] A=false, B=true → B 执行 + */ + @Test + void testElseIf_firstFalseSecondTrue() throws Exception { + // A=false, B=true + when(conditionEvaluatorManager.evaluate(eq("exprA"), any(), any())).thenReturn(false); + when(conditionEvaluatorManager.evaluate(eq("exprB"), any(), any())).thenReturn(true); + + BranchConfiguration config = buildConfigWithTypes( + branchWithType("A", "exprA", false, "nodeA"), + branchWithType("B", "exprB", false, "nodeB") + ); + + List matched = branchExecutor.execute(ctx, config, recordingExecutor()); + + assertEquals(List.of("B"), matched); + assertFalse(executedActions.contains("nodeA")); + assertEquals(List.of("nodeB"), executedActions); + } + + /** + * 用例 5: executeAnyway=true — [A(executeAnyway=true), B] A=true, B=true → A+B 都执行 + */ + @Test + void testExecuteAnyway_true_bothExecuted() throws Exception { + // A=true (executeAnyway=true), B=true + when(conditionEvaluatorManager.evaluate(eq("exprA"), any(), any())).thenReturn(true); + when(conditionEvaluatorManager.evaluate(eq("exprB"), any(), any())).thenReturn(true); + + BranchConfiguration config = buildConfigWithTypes( + branchWithType("A", "exprA", true, "nodeA"), // executeAnyway=true + branchWithType("B", "exprB", false, "nodeB") + ); + + List matched = branchExecutor.execute(ctx, config, recordingExecutor()); + + assertEquals(List.of("A", "B"), matched); + assertTrue(executedActions.contains("nodeA"), "A 分支应执行"); + assertTrue(executedActions.contains("nodeB"), "B 分支应执行"); + } + + /** + * 用例 6: executeAnyway=false — [A, B] A=true → 只 A 执行(B 跳过) + */ + @Test + void testExecuteAnyway_false_onlyFirstExecuted() throws Exception { + when(conditionEvaluatorManager.evaluate(eq("exprA"), any(), any())).thenReturn(true); + when(conditionEvaluatorManager.evaluate(eq("exprB"), any(), any())).thenReturn(true); + + BranchConfiguration config = buildConfigWithTypes( + branchWithType("A", "exprA", false, "nodeA"), // executeAnyway=false(默认) + branchWithType("B", "exprB", false, "nodeB") + ); + + List matched = branchExecutor.execute(ctx, config, recordingExecutor()); + + assertEquals(List.of("A"), matched); + assertTrue(executedActions.contains("nodeA"), "A 分支应执行"); + assertFalse(executedActions.contains("nodeB"), "B 分支不应执行(已被 A 命中后跳过)"); + } + + /** + * 用例 7: all_no_match — [A, B] A=false, B=false → 都不执行(无 else) + */ + @Test + void testAllNoMatch_noExecution() throws Exception { + when(conditionEvaluatorManager.evaluate(eq("exprA"), any(), any())).thenReturn(false); + when(conditionEvaluatorManager.evaluate(eq("exprB"), any(), any())).thenReturn(false); + + BranchConfiguration config = buildConfigWithTypes( + branchWithType("A", "exprA", false, "nodeA"), + branchWithType("B", "exprB", false, "nodeB") + ); + + List matched = branchExecutor.execute(ctx, config, recordingExecutor()); + + assertTrue(matched.isEmpty(), "无分支命中"); + assertTrue(executedActions.isEmpty(), "无 action 执行"); + } + + /** + * 额外:条件评估异常 → 跳过该分支(短路求值,不当失败),继续下一个 + */ + @Test + void testConditionException_branchSkipped() throws Exception { + when(conditionEvaluatorManager.evaluate(eq("exprA"), any(), any())) + .thenThrow(new RuntimeException("evaluator error")); + when(conditionEvaluatorManager.evaluate(eq("exprB"), any(), any())).thenReturn(true); + + BranchConfiguration config = buildConfigWithTypes( + branchWithType("A", "exprA", false, "nodeA"), + branchWithType("B", "exprB", false, "nodeB") + ); + + // 不抛异常 + List matched = assertDoesNotThrow( + () -> branchExecutor.execute(ctx, config, recordingExecutor())); + + assertFalse(executedActions.contains("nodeA"), "A 条件异常,不执行"); + assertTrue(executedActions.contains("nodeB"), "B 条件正常,执行"); + assertEquals(List.of("B"), matched); + } + + /** + * 额外:else 分支不在最后 → validate() 抛 IllegalArgumentException + */ + @Test + void testElseNotLast_validationFails() { + BranchConfiguration config = buildConfig( + elseBranch("else", "nodeElse"), // else 不在最后 + branch("A", "expression", false, "nodeA") + ); + + assertThrows(IllegalArgumentException.class, config::validate, + "else 分支不在最后时,validate() 应抛出异常"); + } + + // --- helpers --- + + private BranchExecutor.ActionExecutor recordingExecutor() { + return (ctx, branchName, actionNodeId) -> executedActions.add(actionNodeId); + } + + private BranchConfiguration buildConfig(BranchConfiguration.BranchItem... items) { + BranchConfiguration config = new BranchConfiguration(); + config.setBranches(Arrays.asList(items)); + return config; + } + + private BranchConfiguration buildConfigWithTypes(BranchConfiguration.BranchItem... items) { + return buildConfig(items); + } + + /** 带 condition 的分支(condition.type = "expression") */ + private BranchConfiguration.BranchItem branch(String name, String conditionType, + boolean executeAnyway, String... actionIds) { + return branchWithType(name, conditionType, executeAnyway, actionIds); + } + + /** 带 condition 的分支(condition.type 可自定义) */ + private BranchConfiguration.BranchItem branchWithType(String name, String conditionType, + boolean executeAnyway, String... actionIds) { + BranchConfiguration.BranchItem item = new BranchConfiguration.BranchItem(); + item.setName(name); + item.setExecuteAnyway(executeAnyway); + item.setActions(Arrays.asList(actionIds)); + + BranchConfiguration.BranchCondition condition = new BranchConfiguration.BranchCondition(); + condition.setType(conditionType); + condition.setConfig(MAPPER.createObjectNode().put("expression", "dummy")); + item.setCondition(condition); + + return item; + } + + /** else 分支(condition=null) */ + private BranchConfiguration.BranchItem elseBranch(String name, String... actionIds) { + BranchConfiguration.BranchItem item = new BranchConfiguration.BranchItem(); + item.setName(name); + item.setCondition(null); + item.setActions(Arrays.asList(actionIds)); + item.setExecuteAnyway(false); + return item; + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/api/alarm/IotAlarmRecordApiImpl.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/api/alarm/IotAlarmRecordApiImpl.java new file mode 100644 index 00000000..45f3a482 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/api/alarm/IotAlarmRecordApiImpl.java @@ -0,0 +1,62 @@ +package com.viewsh.module.iot.api.alarm; + +import com.viewsh.framework.common.pojo.CommonResult; +import com.viewsh.module.iot.api.alarm.dto.IotAlarmClearReqDTO; +import com.viewsh.module.iot.api.alarm.dto.IotAlarmTriggerReqDTO; +import com.viewsh.module.iot.service.alarm.IotAlarmRecordService; +import com.viewsh.module.iot.service.alarm.dto.AlarmStateTransitionRequest; +import com.viewsh.module.iot.service.alarm.dto.AlarmTriggerRequest; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Primary; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +import static com.viewsh.framework.common.pojo.CommonResult.success; + +/** + * IoT 告警记录 API 实现(供 rule 模块 FeignClient 调用) + * + * @author B6 + */ +@RestController +@Validated +@Primary +@Slf4j +public class IotAlarmRecordApiImpl implements IotAlarmRecordApi { + + @Resource + private IotAlarmRecordService alarmRecordService; + + @Override + @PostMapping(PREFIX + "/trigger") + public CommonResult triggerAlarm(@RequestBody IotAlarmTriggerReqDTO reqDTO) { + AlarmTriggerRequest req = AlarmTriggerRequest.builder() + .deviceId(reqDTO.getDeviceId()) + .alarmConfigId(reqDTO.getAlarmConfigId()) + .severity(reqDTO.getSeverity()) + .alarmName(reqDTO.getAlarmName()) + .productId(reqDTO.getProductId()) + .subsystemId(reqDTO.getSubsystemId()) + .ruleChainId(reqDTO.getRuleChainId()) + .sceneRuleId(reqDTO.getSceneRuleId()) + .details(reqDTO.getDetails()) + .build(); + Long alarmId = alarmRecordService.triggerAlarm(req); + return success(alarmId); + } + + @Override + @PostMapping(PREFIX + "/clear") + public CommonResult clearAlarm(@RequestBody IotAlarmClearReqDTO reqDTO) { + AlarmStateTransitionRequest req = AlarmStateTransitionRequest.builder() + .alarmId(reqDTO.getAlarmId()) + .operator(reqDTO.getOperator()) + .remark(reqDTO.getRemark()) + .build(); + alarmRecordService.clearAlarm(req); + return success(true); + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/migration/DataRuleMigrationController.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/migration/DataRuleMigrationController.java new file mode 100644 index 00000000..68c43ad2 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/migration/DataRuleMigrationController.java @@ -0,0 +1,62 @@ +package com.viewsh.module.iot.migration; + +import com.viewsh.framework.common.pojo.CommonResult; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.tags.Tag; +import jakarta.annotation.Resource; +import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.*; + +import java.util.List; + +import static com.viewsh.framework.common.pojo.CommonResult.success; + +/** + * B18 — DataRule 迁移 REST API + * + *

3 个端点(与 B17 对称): + *

    + *
  • POST /iot/migration/data-rule/dry-run — 预览迁移结果
  • + *
  • POST /iot/migration/data-rule/execute — 执行迁移(幂等)
  • + *
  • GET /iot/migration/data-rule/mapping — 查询已迁移映射关系
  • + *
+ */ +@Tag(name = "管理后台 - IoT DataRule 迁移工具(B18)") +@RestController +@RequestMapping("/iot/migration/data-rule") +@Validated +public class DataRuleMigrationController { + + @Resource + private DataRuleMigrator datRuleMigrator; + + @PostMapping("/dry-run") + @Operation(summary = "预览 DataRule 迁移结果(不写库)", + description = "返回每条 v1 DataRule 将被转换成几个 v2 RuleChain 以及 chain 名称列表") + @PreAuthorize("@ss.hasPermission('iot:migration:data-rule:dry-run')") + public CommonResult> dryRun() { + return success(datRuleMigrator.dryRun()); + } + + @PostMapping("/execute") + @Operation(summary = "执行 DataRule 迁移(幂等)", + description = "将 v1 iot_data_rule + iot_data_sink 迁移为 v2 DAG RuleChain;已迁移的会被跳过") + @PreAuthorize("@ss.hasPermission('iot:migration:data-rule:execute')") + public CommonResult execute( + @RequestParam(value = "migrator", defaultValue = "system") + @Parameter(description = "操作人标识(审计用)", example = "admin") + String migrator) { + return success(datRuleMigrator.execute(migrator)); + } + + @GetMapping("/mapping") + @Operation(summary = "查询已迁移的映射关系", + description = "查询 iot_data_rule_migration 表中的所有映射记录") + @PreAuthorize("@ss.hasPermission('iot:migration:data-rule:mapping')") + public CommonResult> queryMapping() { + return success(datRuleMigrator.queryMappings()); + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/migration/DataRuleMigrator.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/migration/DataRuleMigrator.java new file mode 100644 index 00000000..34290d16 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/migration/DataRuleMigrator.java @@ -0,0 +1,265 @@ +package com.viewsh.module.iot.migration; + +import com.viewsh.module.iot.dal.dataobject.rule.IotDataRuleDO; +import com.viewsh.module.iot.dal.dataobject.rule.IotDataSinkDO; +import com.viewsh.module.iot.dal.mysql.rule.IotDataRuleMapper; +import com.viewsh.module.iot.dal.mysql.rule.IotDataSinkMapper; +import com.viewsh.module.iot.migration.mapping.DataRuleToChainMapper; +import com.viewsh.module.iot.rule.controller.admin.vo.IotRuleChainSaveReqVO; +import com.viewsh.module.iot.rule.service.IotRuleChainService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.dao.DuplicateKeyException; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; +import java.util.*; +import java.util.stream.Collectors; + +/** + * B18 — DataRule + DataSink → v2 DAG RuleChain 迁移主服务 + * + *

策略: + *

    + *
  1. 读取所有 v1 data_rule
  2. + *
  3. 通过 {@link DataRuleToChainMapper} 转换为 v2 chain 请求
  4. + *
  5. 调用 {@link IotRuleChainService#createRuleChain} 写入 v2 chain
  6. + *
  7. 向映射表 {@code iot_data_rule_migration} 写入记录(幂等:UK 冲突时跳过)
  8. + *
+ * + *

幂等:映射表唯一键 {@code uk_old_source (old_rule_id, source_index, tenant_id)}, + * 重复执行时 INSERT IGNORE 跳过已迁移记录。 + * + *

映射表 DDL(参考): + *

{@code
+ * CREATE TABLE iot_data_rule_migration (
+ *     id          BIGINT PRIMARY KEY AUTO_INCREMENT,
+ *     old_rule_id BIGINT NOT NULL,
+ *     new_chain_id BIGINT NOT NULL,
+ *     source_index INT DEFAULT 0,
+ *     migrated_at  DATETIME DEFAULT CURRENT_TIMESTAMP,
+ *     migrator     VARCHAR(64),
+ *     tenant_id    BIGINT NOT NULL,
+ *     UNIQUE KEY uk_old_source (old_rule_id, source_index, tenant_id)
+ * ) COMMENT='DataRule 迁移映射';
+ * }
+ */ +@Service +@Slf4j +@RequiredArgsConstructor +public class DataRuleMigrator { + + private final IotDataRuleMapper dataRuleMapper; + private final IotDataSinkMapper dataSinkMapper; + private final DataRuleToChainMapper chainMapper; + private final IotRuleChainService ruleChainService; + private final JdbcTemplate jdbcTemplate; + + // ------------------------------------------------------------------------- + // Public API + // ------------------------------------------------------------------------- + + /** + * 预览(dry-run):不实际写库,仅返回每条 v1 rule 会生成几个 v2 chain + * + * @return 预览结果列表 + */ + public List dryRun() { + List rules = dataRuleMapper.selectList(null); + if (rules.isEmpty()) { + return Collections.emptyList(); + } + + // 批量加载所有 sink + Map sinkMap = loadAllSinks(); + + List results = new ArrayList<>(); + for (IotDataRuleDO rule : rules) { + List sinks = resolveSinks(rule, sinkMap); + List candidates = chainMapper.toChainCandidates(rule, sinks); + results.add(new DryRunResult( + rule.getId(), + rule.getName(), + candidates.size(), + candidates.stream() + .map(c -> c.saveReqVO().getName()) + .collect(Collectors.toList()) + )); + } + return results; + } + + /** + * 执行迁移(幂等):已迁移的 rule+sourceIndex 会被跳过 + * + * @param migrator 操作人标识(用于审计) + * @return 执行结果汇总 + */ + @Transactional(rollbackFor = Exception.class) + public ExecuteResult execute(String migrator) { + List rules = dataRuleMapper.selectList(null); + if (rules.isEmpty()) { + return new ExecuteResult(0, 0, 0, Collections.emptyList()); + } + + Map sinkMap = loadAllSinks(); + + int totalRules = rules.size(); + int migratedCount = 0; + int skippedCount = 0; + List errors = new ArrayList<>(); + + for (IotDataRuleDO rule : rules) { + try { + List sinks = resolveSinks(rule, sinkMap); + List candidates = chainMapper.toChainCandidates(rule, sinks); + + for (DataRuleToChainMapper.ChainCandidate candidate : candidates) { + boolean inserted = persistCandidate(rule.getId(), candidate, migrator); + if (inserted) { + migratedCount++; + } else { + skippedCount++; + } + } + } catch (Exception ex) { + log.error("[B18] 迁移 DataRule(id={}) 失败", rule.getId(), ex); + errors.add("rule=" + rule.getId() + ": " + ex.getMessage()); + } + } + + return new ExecuteResult(totalRules, migratedCount, skippedCount, errors); + } + + /** + * 查询已迁移的映射关系 + * + * @return 映射记录列表 + */ + public List queryMappings() { + String sql = "SELECT id, old_rule_id, new_chain_id, source_index, migrated_at, migrator, tenant_id " + + "FROM iot_data_rule_migration ORDER BY old_rule_id, source_index"; + return jdbcTemplate.query(sql, (rs, rowNum) -> new MappingRecord( + rs.getLong("id"), + rs.getLong("old_rule_id"), + rs.getLong("new_chain_id"), + rs.getInt("source_index"), + rs.getObject("migrated_at", LocalDateTime.class), + rs.getString("migrator"), + rs.getLong("tenant_id") + )); + } + + // ------------------------------------------------------------------------- + // Private helpers + // ------------------------------------------------------------------------- + + /** + * 尝试持久化单个 ChainCandidate: + *
    + *
  1. 调用 createRuleChain 写入 v2 chain
  2. + *
  3. INSERT IGNORE 写映射表
  4. + *
+ * + * @return true=新增迁移;false=已存在(幂等跳过) + */ + private boolean persistCandidate(Long oldRuleId, + DataRuleToChainMapper.ChainCandidate candidate, + String migrator) { + // 先检查映射表是否已存在(幂等判断) + Integer existing = jdbcTemplate.queryForObject( + "SELECT COUNT(1) FROM iot_data_rule_migration WHERE old_rule_id = ? AND source_index = ?", + Integer.class, + oldRuleId, candidate.sourceIndex()); + if (existing != null && existing > 0) { + log.info("[B18] DataRule(id={}) sourceIndex={} 已迁移,跳过", oldRuleId, candidate.sourceIndex()); + return false; + } + + // 创建 v2 chain(使用空 links 列表,节点本身携带 DAG 语义信息) + IotRuleChainSaveReqVO req = candidate.saveReqVO(); + if (req.getLinks() == null) { + req.setLinks(Collections.emptyList()); + } + Long newChainId = ruleChainService.createRuleChain(req); + + // 写映射表 + try { + jdbcTemplate.update( + "INSERT INTO iot_data_rule_migration (old_rule_id, new_chain_id, source_index, migrator, tenant_id) " + + "VALUES (?, ?, ?, ?, ?)", + oldRuleId, newChainId, candidate.sourceIndex(), migrator, 0L /* tenantId 由多租户框架注入,此处占位 */ + ); + } catch (DuplicateKeyException e) { + // 并发场景下极少出现,直接忽略 + log.warn("[B18] 映射表 UK 冲突,DataRule(id={}) sourceIndex={},忽略", oldRuleId, candidate.sourceIndex()); + } + + log.info("[B18] DataRule(id={}) sourceIndex={} → RuleChain(id={})", oldRuleId, candidate.sourceIndex(), newChainId); + return true; + } + + /** 批量加载所有 Sink,构成 id→DO 索引 */ + private Map loadAllSinks() { + List allSinks = dataSinkMapper.selectList(null); + Map map = new HashMap<>(); + for (IotDataSinkDO sink : allSinks) { + map.put(sink.getId(), sink); + } + return map; + } + + /** 根据 rule.sinkIds 从 map 中取出对应 Sink 列表 */ + private List resolveSinks(IotDataRuleDO rule, Map sinkMap) { + if (rule.getSinkIds() == null || rule.getSinkIds().isEmpty()) { + return Collections.emptyList(); + } + return rule.getSinkIds().stream() + .map(sinkMap::get) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + + // ------------------------------------------------------------------------- + // Result DTOs + // ------------------------------------------------------------------------- + + /** + * dry-run 结果:每条 v1 rule 会生成几个 v2 chain + */ + public record DryRunResult( + Long oldRuleId, + String oldRuleName, + int willCreateChainCount, + List chainNames + ) { + } + + /** + * 执行结果汇总 + */ + public record ExecuteResult( + int totalRules, + int migratedCount, + int skippedCount, + List errors + ) { + } + + /** + * 映射表记录 DTO + */ + public record MappingRecord( + Long id, + Long oldRuleId, + Long newChainId, + Integer sourceIndex, + LocalDateTime migratedAt, + String migrator, + Long tenantId + ) { + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/migration/mapping/DataRuleToChainMapper.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/migration/mapping/DataRuleToChainMapper.java new file mode 100644 index 00000000..47ac8b45 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/migration/mapping/DataRuleToChainMapper.java @@ -0,0 +1,314 @@ +package com.viewsh.module.iot.migration.mapping; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.module.iot.dal.dataobject.rule.IotDataRuleDO; +import com.viewsh.module.iot.dal.dataobject.rule.IotDataSinkDO; +import com.viewsh.module.iot.dal.dataobject.rule.config.*; +import com.viewsh.module.iot.enums.rule.IotDataSinkTypeEnum; +import com.viewsh.module.iot.rule.controller.admin.vo.IotRuleChainSaveReqVO; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.*; +import java.util.stream.Collectors; + +/** + * v1 DataRule + DataSink → v2 DAG RuleChain 映射器 + * + *

映射规则(评审 B7): + *

    + *
  • http → http_push
  • + *
  • rocketmq → mq_push (provider=rocketmq)
  • + *
  • kafka → mq_push (provider=kafka)
  • + *
  • rabbitmq → mq_push (provider=rabbitmq)
  • + *
  • redis → redis_push
  • + *
  • tcp → tcp_push
  • + *
+ * + *

多 sourceConfig 处理策略(评审 A5): + *

    + *
  • 若所有 sourceConfig 的 productId 相同且 method 相同 → 合并为 1 chain(identifiers 数组合并)
  • + *
  • 否则 → 拆分为多个 chain,名称加 "-source{n}"
  • + *
+ */ +@Component +@Slf4j +@RequiredArgsConstructor +public class DataRuleToChainMapper { + + private final ObjectMapper objectMapper; + + /** + * 将 1 个 v1 DataRule + 对应 Sinks 转换为 1 或 N 个 v2 RuleChain 请求 VO + * + * @param rule v1 数据流转规则 + * @param sinks v1 数据目的列表(与 rule.sinkIds 对应) + * @return 转换结果列表,每个元素对应一个 chain(含 sourceIndex) + */ + public List toChainCandidates(IotDataRuleDO rule, List sinks) { + List sources = rule.getSourceConfigs(); + if (sources == null || sources.isEmpty()) { + log.warn("[B18] DataRule(id={}) 无 sourceConfigs,跳过", rule.getId()); + return Collections.emptyList(); + } + + // 尝试合并策略:所有 source 的 productId 和 method 相同 + boolean mergeable = canMerge(sources); + if (mergeable) { + // 合并为单 chain + IotRuleChainSaveReqVO req = buildChainReq(rule, sources, sinks, null); + return List.of(new ChainCandidate(0, req)); + } + + // 降级:每个 source 单独生成一个 chain + List candidates = new ArrayList<>(); + for (int i = 0; i < sources.size(); i++) { + IotDataRuleDO.SourceConfig src = sources.get(i); + IotRuleChainSaveReqVO req = buildChainReq(rule, List.of(src), sinks, i); + candidates.add(new ChainCandidate(i, req)); + } + return candidates; + } + + /** + * 判断多个 sourceConfig 是否可以合并(productId 相同 + method 相同) + */ + public boolean canMerge(List sources) { + if (sources.size() <= 1) { + return true; + } + Long firstProductId = sources.get(0).getProductId(); + String firstMethod = sources.get(0).getMethod(); + return sources.stream().allMatch(s -> + Objects.equals(s.getProductId(), firstProductId) + && Objects.equals(s.getMethod(), firstMethod)); + } + + /** + * 构建单个 RuleChain 请求 VO + * + *

节点顺序约定(匹配 IotRuleChainServiceImpl.validateNoCycle 的临时 ID 规则): + *

    + *
  • index 0 → trigger 节点,临时 key = -1
  • + *
  • index 1..N → action 节点,临时 key = -(2), -(3), ...
  • + *
+ * links 的 sourceNodeId/targetNodeId 使用上述临时 key(负数)。 + * + * @param rule v1 规则 + * @param sources 用于本 chain 的 sourceConfigs(1 个或合并多个) + * @param sinks v1 sinks + * @param sourceIndex 若为拆分模式,传序号(0-based);合并模式传 null + */ + private IotRuleChainSaveReqVO buildChainReq( + IotDataRuleDO rule, + List sources, + List sinks, + Integer sourceIndex) { + + String chainName = sourceIndex == null + ? rule.getName() + : rule.getName() + "-source" + (sourceIndex + 1); + + IotRuleChainSaveReqVO req = new IotRuleChainSaveReqVO(); + req.setName(chainName); + req.setDescription(rule.getDescription()); + req.setType("DATA"); + req.setPriority(10); + req.setDebugMode(false); + + // 取第一个 source 的 productId/deviceId 作为 chain 级别过滤 + IotDataRuleDO.SourceConfig firstSrc = sources.get(0); + req.setProductId(firstSrc.getProductId()); + req.setDeviceId(firstSrc.getDeviceId()); + + List nodes = new ArrayList<>(); + List links = new ArrayList<>(); + + // --- Trigger 节点(index=0,临时 key=-1)--- + IotRuleChainSaveReqVO.NodeVO triggerNode = buildTriggerNode(sources); + triggerNode.setPositionX(100); + triggerNode.setPositionY(200); + nodes.add(triggerNode); // index 0 + + // --- Action 节点(index=1..N,临时 key=-(i+1))--- + int actionY = 100; + int nodeIndex = 1; // 从 1 开始,trigger 占 0 + for (IotDataSinkDO sink : sinks) { + IotRuleChainSaveReqVO.NodeVO actionNode = buildActionNode(sink); + if (actionNode == null) { + log.warn("[B18] DataSink(id={}, type={}) 无法映射,跳过", sink.getId(), sink.getType()); + continue; + } + actionNode.setPositionX(400); + actionNode.setPositionY(actionY); + actionY += 120; + nodes.add(actionNode); + + // 建立 trigger → action 的 link(临时 key) + // trigger 的临时 key = -(0+1) = -1 + // action 的临时 key = -(nodeIndex+1) + IotRuleChainSaveReqVO.LinkVO link = new IotRuleChainSaveReqVO.LinkVO(); + link.setSourceNodeId(-1L); + link.setTargetNodeId(-(long) (nodeIndex + 1)); + link.setRelationType("Success"); + link.setSortOrder(nodeIndex - 1); + links.add(link); + + nodeIndex++; + } + + req.setNodes(nodes); + req.setLinks(links); + return req; + } + + /** + * 构建 Trigger 节点(device.property.trigger 或 device.event.trigger) + * + *

若 sources 均为同 productId,则 identifiers 合并为数组(JSON config) + */ + private IotRuleChainSaveReqVO.NodeVO buildTriggerNode(List sources) { + IotRuleChainSaveReqVO.NodeVO node = new IotRuleChainSaveReqVO.NodeVO(); + node.setCategory("trigger"); + node.setName("设备数据触发器"); + + // 根据 method 决定 trigger 类型 + String method = sources.get(0).getMethod(); + String triggerType; + if (method != null && method.toLowerCase().contains("event")) { + triggerType = "device.event.trigger"; + } else { + triggerType = "device.property.trigger"; + } + node.setType(triggerType); + + // 收集 identifiers + List identifiers = sources.stream() + .map(IotDataRuleDO.SourceConfig::getIdentifier) + .filter(Objects::nonNull) + .distinct() + .collect(Collectors.toList()); + + // 构建 configuration JSON + Map config = new LinkedHashMap<>(); + config.put("productId", sources.get(0).getProductId()); + config.put("deviceId", sources.get(0).getDeviceId()); + config.put("method", method); + if (!identifiers.isEmpty()) { + config.put("identifiers", identifiers); + } + + node.setConfiguration(toJson(config)); + return node; + } + + /** + * 将 v1 DataSink 映射为 v2 Action 节点 + * + * @return null 表示无法映射(未知类型),调用方跳过 + */ + IotRuleChainSaveReqVO.NodeVO buildActionNode(IotDataSinkDO sink) { + if (sink.getType() == null || sink.getConfig() == null) { + return null; + } + + IotRuleChainSaveReqVO.NodeVO node = new IotRuleChainSaveReqVO.NodeVO(); + node.setCategory("action"); + node.setName(sink.getName() != null ? sink.getName() : "推送动作"); + + int type = sink.getType(); + Map config = new LinkedHashMap<>(); + + if (type == IotDataSinkTypeEnum.HTTP.getType()) { + // http → http_push + node.setType("http_push"); + IotDataSinkHttpConfig http = (IotDataSinkHttpConfig) sink.getConfig(); + config.put("url", http.getUrl()); + config.put("method", http.getMethod()); + config.put("headers", http.getHeaders()); + config.put("body", http.getBody()); + + } else if (type == IotDataSinkTypeEnum.ROCKETMQ.getType()) { + // rocketmq → mq_push (provider=rocketmq) + node.setType("mq_push"); + IotDataSinkRocketMQConfig rmq = (IotDataSinkRocketMQConfig) sink.getConfig(); + config.put("provider", "rocketmq"); + config.put("nameServer", rmq.getNameServer()); + config.put("topic", rmq.getTopic()); + config.put("tag", rmq.getTags()); + config.put("group", rmq.getGroup()); + + } else if (type == IotDataSinkTypeEnum.KAFKA.getType()) { + // kafka → mq_push (provider=kafka) + node.setType("mq_push"); + IotDataSinkKafkaConfig kafka = (IotDataSinkKafkaConfig) sink.getConfig(); + config.put("provider", "kafka"); + config.put("bootstrapServers", kafka.getBootstrapServers()); + config.put("topic", kafka.getTopic()); + + } else if (type == IotDataSinkTypeEnum.RABBITMQ.getType()) { + // rabbitmq → mq_push (provider=rabbitmq) + node.setType("mq_push"); + IotDataSinkRabbitMQConfig rmq = (IotDataSinkRabbitMQConfig) sink.getConfig(); + config.put("provider", "rabbitmq"); + config.put("host", rmq.getHost()); + config.put("port", rmq.getPort()); + config.put("exchange", rmq.getExchange()); + config.put("routingKey", rmq.getRoutingKey()); + + } else if (type == IotDataSinkTypeEnum.REDIS.getType()) { + // redis → redis_push + node.setType("redis_push"); + IotDataSinkRedisConfig redis = (IotDataSinkRedisConfig) sink.getConfig(); + config.put("host", redis.getHost()); + config.put("port", redis.getPort()); + config.put("channel", redis.getTopic()); + config.put("key", redis.getTopic()); + config.put("dataStructure", redis.getDataStructure()); + + } else if (type == IotDataSinkTypeEnum.TCP.getType()) { + // tcp → tcp_push + node.setType("tcp_push"); + IotDataSinkTcpConfig tcp = (IotDataSinkTcpConfig) sink.getConfig(); + config.put("host", tcp.getHost()); + config.put("port", tcp.getPort()); + + } else { + // 未知类型(WebSocket/MQTT/DATABASE 暂不支持) + log.warn("[B18] DataSink type={} 暂不支持迁移,跳过 sinkId={}", type, sink.getId()); + return null; + } + + node.setConfiguration(toJson(config)); + return node; + } + + /** + * 将 Map 序列化为 JSON 字符串(迁移工具内部使用) + */ + private String toJson(Map map) { + try { + return objectMapper.writeValueAsString(map); + } catch (Exception e) { + log.error("[B18] JSON 序列化失败", e); + return "{}"; + } + } + + // ------------------------------------------------------------------------- + // 内部 DTO + // ------------------------------------------------------------------------- + + /** + * 单次迁移候选(对应一个将要创建的 v2 chain) + */ + public record ChainCandidate( + /** 对应 v1 sourceConfigs 的序号(合并模式固定为 0) */ + int sourceIndex, + /** 待创建的 chain 请求 VO */ + IotRuleChainSaveReqVO saveReqVO + ) { + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/migration/DataRuleMigratorTest.java b/viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/migration/DataRuleMigratorTest.java new file mode 100644 index 00000000..0449e8a6 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/migration/DataRuleMigratorTest.java @@ -0,0 +1,427 @@ +package com.viewsh.module.iot.migration; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.framework.test.core.ut.BaseMockitoUnitTest; +import com.viewsh.module.iot.dal.dataobject.rule.IotDataRuleDO; +import com.viewsh.module.iot.dal.dataobject.rule.IotDataSinkDO; +import com.viewsh.module.iot.dal.dataobject.rule.config.*; +import com.viewsh.module.iot.dal.mysql.rule.IotDataRuleMapper; +import com.viewsh.module.iot.dal.mysql.rule.IotDataSinkMapper; +import com.viewsh.module.iot.enums.rule.IotDataSinkTypeEnum; +import com.viewsh.module.iot.migration.mapping.DataRuleToChainMapper; +import com.viewsh.module.iot.rule.controller.admin.vo.IotRuleChainSaveReqVO; +import com.viewsh.module.iot.rule.service.IotRuleChainService; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.springframework.dao.DuplicateKeyException; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowMapper; + +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +/** + * B18 — {@link DataRuleMigrator} 单元测试 + * + *

6 个测试用例: + *

    + *
  1. single_source_http — 1 source + 1 http sink → 1 chain (Trigger + HttpPush)
  2. + *
  3. multi_sink — 1 source + 5 sinks → 1 chain (Trigger + 5 Actions)
  4. + *
  5. multi_source_mergeable — 2 sources 同 productId + method → 合并为 1 chain
  6. + *
  7. multi_source_split — 2 sources 不同 productId → 拆为 2 chain
  8. + *
  9. mq_provider_preserve — sink=rocketmq → mq_push, config.provider="rocketmq"
  10. + *
  11. idempotent — 重跑 → 跳过(映射表 UK 冲突不抛错)
  12. + *
+ */ +class DataRuleMigratorTest extends BaseMockitoUnitTest { + + @Mock + private IotDataRuleMapper dataRuleMapper; + + @Mock + private IotDataSinkMapper dataSinkMapper; + + @Mock + private IotRuleChainService ruleChainService; + + @Mock + private JdbcTemplate jdbcTemplate; + + private DataRuleToChainMapper chainMapper; + private DataRuleMigrator migrator; + + @BeforeEach + void setUp() { + chainMapper = new DataRuleToChainMapper(new ObjectMapper()); + migrator = new DataRuleMigrator(dataRuleMapper, dataSinkMapper, chainMapper, ruleChainService, jdbcTemplate); + } + + // ========================================================================= + // 用例 1:single_source_http + // ========================================================================= + @Test + @DisplayName("single_source_http: 1 source + 1 http sink → 1 chain with Trigger + HttpPush action") + void testSingleSourceHttp() { + // Given + Long ruleId = 1L; + Long sinkId = 100L; + + IotDataRuleDO rule = buildRule(ruleId, "HTTP 规则", + List.of(buildSource(10L, 101L, "testProp", "property")), + List.of(sinkId)); + + IotDataSinkDO httpSink = buildSink(sinkId, "HTTP推送", IotDataSinkTypeEnum.HTTP.getType(), buildHttpConfig()); + + mockMappers(List.of(rule), List.of(httpSink)); + mockMigrationTableEmpty(ruleId, 0); + when(ruleChainService.createRuleChain(any())).thenReturn(200L); + mockInsertMapping(); + + // When + DataRuleMigrator.ExecuteResult result = migrator.execute("test"); + + // Then + assertThat(result.totalRules()).isEqualTo(1); + assertThat(result.migratedCount()).isEqualTo(1); + assertThat(result.skippedCount()).isEqualTo(0); + assertThat(result.errors()).isEmpty(); + + // 验证 createRuleChain 被调用,且 chain type=DATA + ArgumentCaptor captor = ArgumentCaptor.forClass(IotRuleChainSaveReqVO.class); + verify(ruleChainService).createRuleChain(captor.capture()); + IotRuleChainSaveReqVO req = captor.getValue(); + assertThat(req.getType()).isEqualTo("DATA"); + // trigger 节点 + IotRuleChainSaveReqVO.NodeVO trigger = req.getNodes().stream() + .filter(n -> "trigger".equals(n.getCategory())).findFirst().orElseThrow(); + assertThat(trigger.getType()).startsWith("device."); + // action 节点 + IotRuleChainSaveReqVO.NodeVO action = req.getNodes().stream() + .filter(n -> "action".equals(n.getCategory())).findFirst().orElseThrow(); + assertThat(action.getType()).isEqualTo("http_push"); + } + + // ========================================================================= + // 用例 2:multi_sink + // ========================================================================= + @Test + @DisplayName("multi_sink: 1 source + 5 sinks → 1 chain with Trigger + 5 action nodes") + void testMultiSink() { + // Given + Long ruleId = 2L; + List sinkIds = List.of(201L, 202L, 203L, 204L, 205L); + + IotDataRuleDO rule = buildRule(ruleId, "多 Sink 规则", + List.of(buildSource(10L, 0L, null, "property")), + sinkIds); + + List sinks = List.of( + buildSink(201L, "HTTP", IotDataSinkTypeEnum.HTTP.getType(), buildHttpConfig()), + buildSink(202L, "RocketMQ", IotDataSinkTypeEnum.ROCKETMQ.getType(), buildRocketMQConfig()), + buildSink(203L, "Kafka", IotDataSinkTypeEnum.KAFKA.getType(), buildKafkaConfig()), + buildSink(204L, "RabbitMQ", IotDataSinkTypeEnum.RABBITMQ.getType(), buildRabbitMQConfig()), + buildSink(205L, "Redis", IotDataSinkTypeEnum.REDIS.getType(), buildRedisConfig()) + ); + + mockMappers(List.of(rule), sinks); + mockMigrationTableEmpty(ruleId, 0); + when(ruleChainService.createRuleChain(any())).thenReturn(300L); + mockInsertMapping(); + + // When + DataRuleMigrator.ExecuteResult result = migrator.execute("test"); + + // Then + assertThat(result.migratedCount()).isEqualTo(1); + ArgumentCaptor captor = ArgumentCaptor.forClass(IotRuleChainSaveReqVO.class); + verify(ruleChainService).createRuleChain(captor.capture()); + IotRuleChainSaveReqVO req = captor.getValue(); + + long triggerCount = req.getNodes().stream().filter(n -> "trigger".equals(n.getCategory())).count(); + long actionCount = req.getNodes().stream().filter(n -> "action".equals(n.getCategory())).count(); + assertThat(triggerCount).isEqualTo(1); + assertThat(actionCount).isEqualTo(5); + } + + // ========================================================================= + // 用例 3:multi_source_mergeable + // ========================================================================= + @Test + @DisplayName("multi_source_mergeable: 2 sources 同 productId + method → 合并为 1 chain") + void testMultiSourceMergeable() { + // Given - same productId + method + Long ruleId = 3L; + Long sinkId = 301L; + + IotDataRuleDO rule = buildRule(ruleId, "可合并多源", + List.of( + buildSource(50L, 0L, "temp", "property"), + buildSource(50L, 0L, "humidity", "property") + ), + List.of(sinkId)); + + IotDataSinkDO sink = buildSink(sinkId, "HTTP", IotDataSinkTypeEnum.HTTP.getType(), buildHttpConfig()); + + mockMappers(List.of(rule), List.of(sink)); + mockMigrationTableEmpty(ruleId, 0); + when(ruleChainService.createRuleChain(any())).thenReturn(400L); + mockInsertMapping(); + + // When + DataRuleMigrator.ExecuteResult result = migrator.execute("test"); + + // Then: only 1 chain created (merged) + assertThat(result.migratedCount()).isEqualTo(1); + verify(ruleChainService, times(1)).createRuleChain(any()); + + // 验证 trigger config 中 identifiers 包含两个值 + ArgumentCaptor captor = ArgumentCaptor.forClass(IotRuleChainSaveReqVO.class); + verify(ruleChainService).createRuleChain(captor.capture()); + IotRuleChainSaveReqVO req = captor.getValue(); + IotRuleChainSaveReqVO.NodeVO trigger = req.getNodes().stream() + .filter(n -> "trigger".equals(n.getCategory())).findFirst().orElseThrow(); + assertThat(trigger.getConfiguration()).contains("temp").contains("humidity"); + } + + // ========================================================================= + // 用例 4:multi_source_split + // ========================================================================= + @Test + @DisplayName("multi_source_split: 2 sources 不同 productId → 拆为 2 chain") + void testMultiSourceSplit() { + // Given - different productId + Long ruleId = 4L; + Long sinkId = 401L; + + IotDataRuleDO rule = buildRule(ruleId, "拆分多源", + List.of( + buildSource(60L, 0L, "temp", "property"), + buildSource(70L, 0L, "temp", "property") + ), + List.of(sinkId)); + + IotDataSinkDO sink = buildSink(sinkId, "HTTP", IotDataSinkTypeEnum.HTTP.getType(), buildHttpConfig()); + + mockMappers(List.of(rule), List.of(sink)); + // source_index 0 未迁移,source_index 1 未迁移 + mockMigrationTableEmpty(ruleId, 0); + mockMigrationTableEmpty(ruleId, 1); + when(ruleChainService.createRuleChain(any())).thenReturn(500L).thenReturn(501L); + mockInsertMapping(); + + // When + DataRuleMigrator.ExecuteResult result = migrator.execute("test"); + + // Then: 2 chains created (split) + assertThat(result.migratedCount()).isEqualTo(2); + verify(ruleChainService, times(2)).createRuleChain(any()); + + // 验证 chain 名称带序号 + ArgumentCaptor captor = ArgumentCaptor.forClass(IotRuleChainSaveReqVO.class); + verify(ruleChainService, times(2)).createRuleChain(captor.capture()); + List reqs = captor.getAllValues(); + assertThat(reqs.get(0).getName()).endsWith("-source1"); + assertThat(reqs.get(1).getName()).endsWith("-source2"); + } + + // ========================================================================= + // 用例 5:mq_provider_preserve + // ========================================================================= + @Test + @DisplayName("mq_provider_preserve: sink=rocketmq → action type=mq_push, config.provider=rocketmq") + void testMqProviderPreserve() { + // Given + Long ruleId = 5L; + Long sinkId = 501L; + + IotDataRuleDO rule = buildRule(ruleId, "RocketMQ 规则", + List.of(buildSource(10L, 0L, "temp", "property")), + List.of(sinkId)); + + IotDataSinkDO rmqSink = buildSink(sinkId, "RMQ", IotDataSinkTypeEnum.ROCKETMQ.getType(), buildRocketMQConfig()); + + mockMappers(List.of(rule), List.of(rmqSink)); + mockMigrationTableEmpty(ruleId, 0); + when(ruleChainService.createRuleChain(any())).thenReturn(600L); + mockInsertMapping(); + + // When + migrator.execute("test"); + + // Then: action type=mq_push + provider=rocketmq + ArgumentCaptor captor = ArgumentCaptor.forClass(IotRuleChainSaveReqVO.class); + verify(ruleChainService).createRuleChain(captor.capture()); + IotRuleChainSaveReqVO req = captor.getValue(); + + IotRuleChainSaveReqVO.NodeVO action = req.getNodes().stream() + .filter(n -> "action".equals(n.getCategory())).findFirst().orElseThrow(); + assertThat(action.getType()).isEqualTo("mq_push"); + assertThat(action.getConfiguration()).contains("\"provider\"").contains("rocketmq"); + } + + // ========================================================================= + // 用例 6:idempotent + // ========================================================================= + @Test + @DisplayName("idempotent: 重跑时映射表已存在 → 跳过,不重复创建 chain") + void testIdempotent() { + // Given + Long ruleId = 6L; + Long sinkId = 601L; + + IotDataRuleDO rule = buildRule(ruleId, "幂等测试", + List.of(buildSource(10L, 0L, "temp", "property")), + List.of(sinkId)); + + IotDataSinkDO sink = buildSink(sinkId, "HTTP", IotDataSinkTypeEnum.HTTP.getType(), buildHttpConfig()); + + mockMappers(List.of(rule), List.of(sink)); + + // 模拟:映射表已存在该记录(COUNT=1) + when(jdbcTemplate.queryForObject( + contains("COUNT(1)"), + eq(Integer.class), + eq(ruleId), eq(0) + )).thenReturn(1); + + // When + DataRuleMigrator.ExecuteResult result = migrator.execute("test"); + + // Then: skipped=1, createRuleChain 不被调用 + assertThat(result.skippedCount()).isEqualTo(1); + assertThat(result.migratedCount()).isEqualTo(0); + verify(ruleChainService, never()).createRuleChain(any()); + } + + // ========================================================================= + // Helper: DataRuleToChainMapper 直接测试(canMerge 逻辑) + // ========================================================================= + @Test + @DisplayName("canMerge: 相同 productId + method 可合并") + void testCanMerge_same() { + DataRuleToChainMapper m = new DataRuleToChainMapper(new ObjectMapper()); + List sources = List.of( + buildSource(1L, 0L, "a", "property"), + buildSource(1L, 0L, "b", "property") + ); + assertThat(m.canMerge(sources)).isTrue(); + } + + @Test + @DisplayName("canMerge: 不同 productId 不可合并") + void testCanMerge_diff() { + DataRuleToChainMapper m = new DataRuleToChainMapper(new ObjectMapper()); + List sources = List.of( + buildSource(1L, 0L, "a", "property"), + buildSource(2L, 0L, "a", "property") + ); + assertThat(m.canMerge(sources)).isFalse(); + } + + // ========================================================================= + // Mock helpers + // ========================================================================= + + private void mockMappers(List rules, List sinks) { + when(dataRuleMapper.selectList(null)).thenReturn(rules); + when(dataSinkMapper.selectList(null)).thenReturn(sinks); + } + + /** 模拟映射表 COUNT 查询返回 0(未迁移) */ + private void mockMigrationTableEmpty(Long ruleId, int sourceIndex) { + when(jdbcTemplate.queryForObject( + contains("COUNT(1)"), + eq(Integer.class), + eq(ruleId), eq(sourceIndex) + )).thenReturn(0); + } + + /** 模拟 INSERT 映射表(正常写入) */ + private void mockInsertMapping() { + when(jdbcTemplate.update(anyString(), any(), any(), any(), any(), any())).thenReturn(1); + } + + // ========================================================================= + // DO builders + // ========================================================================= + + private IotDataRuleDO buildRule(Long id, String name, + List sources, + List sinkIds) { + IotDataRuleDO rule = new IotDataRuleDO(); + rule.setId(id); + rule.setName(name); + rule.setSourceConfigs(sources); + rule.setSinkIds(sinkIds); + return rule; + } + + private IotDataRuleDO.SourceConfig buildSource(Long productId, Long deviceId, String identifier, String method) { + IotDataRuleDO.SourceConfig src = new IotDataRuleDO.SourceConfig(); + src.setProductId(productId); + src.setDeviceId(deviceId); + src.setIdentifier(identifier); + src.setMethod(method); + return src; + } + + private IotDataSinkDO buildSink(Long id, String name, Integer type, IotAbstractDataSinkConfig config) { + IotDataSinkDO sink = new IotDataSinkDO(); + sink.setId(id); + sink.setName(name); + sink.setType(type); + sink.setConfig(config); + return sink; + } + + // Config builders + private IotDataSinkHttpConfig buildHttpConfig() { + IotDataSinkHttpConfig cfg = new IotDataSinkHttpConfig(); + cfg.setUrl("http://example.com/api"); + cfg.setMethod("POST"); + cfg.setBody("{\"msg\":\"hello\"}"); + return cfg; + } + + private IotDataSinkRocketMQConfig buildRocketMQConfig() { + IotDataSinkRocketMQConfig cfg = new IotDataSinkRocketMQConfig(); + cfg.setNameServer("127.0.0.1:9876"); + cfg.setTopic("iot-topic"); + cfg.setTags("tag1"); + cfg.setGroup("iot-group"); + return cfg; + } + + private IotDataSinkKafkaConfig buildKafkaConfig() { + IotDataSinkKafkaConfig cfg = new IotDataSinkKafkaConfig(); + cfg.setBootstrapServers("127.0.0.1:9092"); + cfg.setTopic("iot-kafka-topic"); + return cfg; + } + + private IotDataSinkRabbitMQConfig buildRabbitMQConfig() { + IotDataSinkRabbitMQConfig cfg = new IotDataSinkRabbitMQConfig(); + cfg.setHost("127.0.0.1"); + cfg.setPort(5672); + cfg.setExchange("iot-exchange"); + cfg.setRoutingKey("iot.key"); + return cfg; + } + + private IotDataSinkRedisConfig buildRedisConfig() { + IotDataSinkRedisConfig cfg = new IotDataSinkRedisConfig(); + cfg.setHost("127.0.0.1"); + cfg.setPort(6379); + cfg.setTopic("iot-channel"); + return cfg; + } + +}