clearAlarm(@Valid @RequestBody IotAlarmClearReqDTO reqDTO);
+
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/alarm/dto/IotAlarmClearReqDTO.java b/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/alarm/dto/IotAlarmClearReqDTO.java
new file mode 100644
index 00000000..14f1fe2a
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/alarm/dto/IotAlarmClearReqDTO.java
@@ -0,0 +1,31 @@
+package com.viewsh.module.iot.api.alarm.dto;
+
+import jakarta.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * 告警清除请求 DTO
+ *
+ * 幂等:对已清除告警重复调用为 no-op(Service 层保证)。
+ *
+ * @author B6
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class IotAlarmClearReqDTO {
+
+ @NotNull(message = "告警记录 ID 不能为空")
+ private Long alarmId;
+
+ /** 操作人(规则引擎触发通常为 "rule-engine:{chainId}") */
+ private String operator;
+
+ /** 备注 */
+ private String remark;
+
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/alarm/dto/IotAlarmTriggerReqDTO.java b/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/alarm/dto/IotAlarmTriggerReqDTO.java
new file mode 100644
index 00000000..41865808
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/alarm/dto/IotAlarmTriggerReqDTO.java
@@ -0,0 +1,55 @@
+package com.viewsh.module.iot.api.alarm.dto;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import jakarta.validation.constraints.Max;
+import jakarta.validation.constraints.Min;
+import jakarta.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * 告警触发请求 DTO(规则引擎 AlarmTriggerAction 调用)
+ *
+ * 幂等语义:同一 (deviceId, alarmConfigId) 活跃记录已存在 → trigger_count++;
+ * 否则插入新记录。详见 IotAlarmRecordService.triggerAlarm。
+ *
+ * @author B6
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class IotAlarmTriggerReqDTO {
+
+ @NotNull(message = "设备 ID 不能为空")
+ private Long deviceId;
+
+ @NotNull(message = "告警配置 ID 不能为空")
+ private Long alarmConfigId;
+
+ @NotNull(message = "告警严重度不能为空")
+ @Min(value = 1, message = "告警严重度最小为 1")
+ @Max(value = 5, message = "告警严重度最大为 5")
+ private Integer severity;
+
+ /** 告警名称(冗余便于查询) */
+ private String alarmName;
+
+ /** 产品 ID(冗余,可选) */
+ private Long productId;
+
+ /** 所属子系统 ID */
+ private Long subsystemId;
+
+ /** v2 规则链 ID */
+ private Long ruleChainId;
+
+ /** v1 场景规则 ID(灰度期兼容) */
+ private Long sceneRuleId;
+
+ /** 触发详情(模板解析后的 JSON) */
+ private JsonNode details;
+
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/ActionProviderManager.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/ActionProviderManager.java
new file mode 100644
index 00000000..45f49cb4
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/ActionProviderManager.java
@@ -0,0 +1,63 @@
+package com.viewsh.module.iot.rule.action;
+
+import com.viewsh.module.iot.rule.spi.ActionProvider;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * ActionProvider 注册表(Spring 容器扫描 + getType() 索引)。
+ *
+ *
启动时通过 {@link #autoRegister(List)} 自动收集所有 Spring Bean 中的
+ * {@link ActionProvider},按 {@link ActionProvider#getType()} 建立索引。
+ *
+ *
Fail-fast:同 type 重复注册时抛出 {@link IllegalStateException},
+ * 确保启动阶段即暴露问题,不留运行时隐患。
+ *
+ *
注意:{@link ActionProvider} 实现同时也是 {@link com.viewsh.module.iot.rule.engine.NodeProvider},
+ * 会被 {@link com.viewsh.module.iot.rule.engine.NodeProviderRegistry} 直接收集并路由。
+ * 本 Manager 仅提供按 type 查找的便捷入口(供测试 / 管理接口使用)。
+ */
+@Slf4j
+@Component
+public class ActionProviderManager {
+
+ private final Map providers = new ConcurrentHashMap<>();
+
+ @Autowired(required = false)
+ public void autoRegister(List list) {
+ if (list == null || list.isEmpty()) {
+ log.warn("[ActionProviderManager] 未发现任何 ActionProvider 实现");
+ return;
+ }
+ for (ActionProvider p : list) {
+ ActionProvider dup = providers.put(p.getType(), p);
+ if (dup != null) {
+ throw new IllegalStateException(
+ "duplicate action type: " + p.getType()
+ + " — " + dup.getClass().getName() + " vs " + p.getClass().getName());
+ }
+ }
+ log.info("[ActionProviderManager] 已注册 {} 个 ActionProvider:{}", providers.size(), providers.keySet());
+ }
+
+ public ActionProvider get(String type) {
+ ActionProvider p = providers.get(type);
+ if (p == null) {
+ throw new IllegalArgumentException("未注册的 ActionProvider type=" + type);
+ }
+ return p;
+ }
+
+ public boolean contains(String type) {
+ return providers.containsKey(type);
+ }
+
+ public Map all() {
+ return Map.copyOf(providers);
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/AlarmClearAction.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/AlarmClearAction.java
new file mode 100644
index 00000000..7a1d6d44
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/AlarmClearAction.java
@@ -0,0 +1,93 @@
+package com.viewsh.module.iot.rule.action;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.viewsh.framework.common.pojo.CommonResult;
+import com.viewsh.module.iot.api.alarm.IotAlarmRecordApi;
+import com.viewsh.module.iot.api.alarm.dto.IotAlarmClearReqDTO;
+import com.viewsh.module.iot.rule.engine.RuleContext;
+import com.viewsh.module.iot.rule.result.ActionResult;
+import com.viewsh.module.iot.rule.spi.ActionProvider;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+/**
+ * 告警清除 Action(alarm_clear)。
+ *
+ * 配置示例:
+ *
{@code
+ * {
+ * "alarmId": "${meta.alarmId}",
+ * "operator": "rule-engine"
+ * }
+ * }
+ *
+ * alarmId 优先从 config 读取,若未配置则从 ctx.metadata.alarmId 获取(与 alarm_trigger 联动)。
+ * 幂等:已清除告警重复调用为 no-op(Service 层保证)。
+ */
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class AlarmClearAction implements ActionProvider {
+
+ public static final String TYPE = "alarm_clear";
+
+ private final IotAlarmRecordApi alarmRecordApi;
+
+ @Override
+ public String getType() {
+ return TYPE;
+ }
+
+ @Override
+ public ActionResult executeAction(RuleContext ctx, JsonNode config) {
+ try {
+ Long alarmId = resolveAlarmId(ctx, config);
+ if (alarmId == null || alarmId == 0) {
+ return ActionResult.failure("alarm_clear: alarmId 缺失(config 中未配置,ctx.metadata.alarmId 亦为空)");
+ }
+
+ String operator = config.path("operator").asText("rule-engine:" + ctx.getChainId());
+
+ IotAlarmClearReqDTO req = IotAlarmClearReqDTO.builder()
+ .alarmId(alarmId)
+ .operator(operator)
+ .build();
+
+ CommonResult result = alarmRecordApi.clearAlarm(req);
+ if (result == null || !result.isSuccess()) {
+ String errMsg = result != null ? result.getMsg() : "API 返回 null";
+ log.warn("[AlarmClearAction] chain={} alarmId={} clearAlarm 失败: {}",
+ ctx.getChainId(), alarmId, errMsg);
+ return ActionResult.failure("clearAlarm 失败: " + errMsg);
+ }
+
+ log.debug("[AlarmClearAction] chain={} alarmId={} 清除成功", ctx.getChainId(), alarmId);
+ return ActionResult.success();
+
+ } catch (Exception e) {
+ log.warn("[AlarmClearAction] chain={} 异常: {}", ctx.getChainId(), e.getMessage());
+ return ActionResult.failure(e.getMessage());
+ }
+ }
+
+ private Long resolveAlarmId(RuleContext ctx, JsonNode config) {
+ if (config.hasNonNull("alarmId")) {
+ JsonNode node = config.get("alarmId");
+ if (node.isNumber()) return node.asLong();
+ // 支持 "${meta.alarmId}" 语法的静态解析
+ String raw = node.asText();
+ if (raw.startsWith("${meta.") && raw.endsWith("}")) {
+ String key = raw.substring(7, raw.length() - 1);
+ Object val = ctx.getMetadata().get(key);
+ if (val instanceof Long l) return l;
+ if (val instanceof Number n) return n.longValue();
+ }
+ }
+ // 从 ctx.metadata 兜底获取(与 alarm_trigger 同链联动)
+ Object meta = ctx.getMetadata().get("alarmId");
+ if (meta instanceof Long l) return l;
+ if (meta instanceof Number n) return n.longValue();
+ return null;
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/AlarmTriggerAction.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/AlarmTriggerAction.java
new file mode 100644
index 00000000..3c36460f
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/AlarmTriggerAction.java
@@ -0,0 +1,94 @@
+package com.viewsh.module.iot.rule.action;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.viewsh.framework.common.pojo.CommonResult;
+import com.viewsh.module.iot.api.alarm.IotAlarmRecordApi;
+import com.viewsh.module.iot.api.alarm.dto.IotAlarmTriggerReqDTO;
+import com.viewsh.module.iot.rule.engine.RuleContext;
+import com.viewsh.module.iot.rule.result.ActionResult;
+import com.viewsh.module.iot.rule.spi.ActionProvider;
+import com.viewsh.module.iot.rule.template.TemplateResolver;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+/**
+ * 告警触发 Action(alarm_trigger)。
+ *
+ * 配置示例:
+ *
{@code
+ * {
+ * "alarmConfigId": 100,
+ * "severity": 3,
+ * "name": "设备 ${meta.deviceName} 温度过高",
+ * "details": { "temperature": "${data.temperature}" }
+ * }
+ * }
+ *
+ * 调用 {@link IotAlarmRecordApi#triggerAlarm} 触发告警(API 层幂等);
+ * 将 alarmId 写入 ctx.metadata 供后继节点引用。
+ *
+ *
评审 C5:模板变量统一走 {@link TemplateResolver},不重复实现。
+ * 评审 C1:幂等由 Service 层保证(UK on device_id + alarm_config_id),Action 层无需判重。
+ */
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class AlarmTriggerAction implements ActionProvider {
+
+ public static final String TYPE = "alarm_trigger";
+
+ private final IotAlarmRecordApi alarmRecordApi;
+ private final TemplateResolver templateResolver;
+
+ @Override
+ public String getType() {
+ return TYPE;
+ }
+
+ @Override
+ public ActionResult executeAction(RuleContext ctx, JsonNode config) {
+ try {
+ Long alarmConfigId = config.path("alarmConfigId").asLong(0);
+ if (alarmConfigId == 0) {
+ return ActionResult.failure("alarm_trigger: alarmConfigId 缺失");
+ }
+ int severity = config.path("severity").asInt(3);
+
+ String alarmName = config.hasNonNull("name")
+ ? String.valueOf(templateResolver.resolve(config.get("name").asText(), ctx))
+ : "规则告警";
+
+ JsonNode details = config.path("details");
+ // details 字段本身也可含模板变量;第一期直接透传
+ IotAlarmTriggerReqDTO req = IotAlarmTriggerReqDTO.builder()
+ .deviceId(ctx.getDeviceId())
+ .alarmConfigId(alarmConfigId)
+ .severity(severity)
+ .alarmName(alarmName)
+ .productId(ctx.getProductId())
+ .subsystemId(ctx.getSubsystemId())
+ .ruleChainId(ctx.getChainId())
+ .details(details.isNull() ? null : details)
+ .build();
+
+ CommonResult result = alarmRecordApi.triggerAlarm(req);
+ if (result == null || !result.isSuccess()) {
+ String errMsg = result != null ? result.getMsg() : "API 返回 null";
+ log.warn("[AlarmTriggerAction] chain={} device={} triggerAlarm 失败: {}",
+ ctx.getChainId(), ctx.getDeviceId(), errMsg);
+ return ActionResult.failure("triggerAlarm 失败: " + errMsg);
+ }
+
+ Long alarmId = result.getData();
+ ctx.getMetadata().put("alarmId", alarmId);
+ log.debug("[AlarmTriggerAction] chain={} device={} alarmId={}", ctx.getChainId(), ctx.getDeviceId(), alarmId);
+ return ActionResult.success(alarmId);
+
+ } catch (Exception e) {
+ log.warn("[AlarmTriggerAction] chain={} device={} 异常: {}",
+ ctx.getChainId(), ctx.getDeviceId(), e.getMessage());
+ return ActionResult.failure(e.getMessage());
+ }
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/DevicePropertySetAction.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/DevicePropertySetAction.java
new file mode 100644
index 00000000..53249066
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/DevicePropertySetAction.java
@@ -0,0 +1,75 @@
+package com.viewsh.module.iot.rule.action;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.viewsh.module.iot.rule.engine.RuleContext;
+import com.viewsh.module.iot.rule.result.ActionResult;
+import com.viewsh.module.iot.rule.spi.ActionProvider;
+import com.viewsh.module.iot.rule.template.TemplateResolver;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+/**
+ * 设备属性设置 Action(device_property_set)。
+ *
+ * 配置示例:
+ *
{@code
+ * {
+ * "properties": {
+ * "targetTemperature": "${data.temperature}",
+ * "mode": "auto"
+ * }
+ * }
+ * }
+ *
+ * 第一期:写 Redis(设备影子属性缓存);MySQL 同步在第二期 B27 完整实现(Shared 属性下发)。
+ * 评审 C5:属性值若含模板变量,统一走 {@link TemplateResolver} 解析。
+ */
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class DevicePropertySetAction implements ActionProvider {
+
+ public static final String TYPE = "device_property_set";
+
+ private final TemplateResolver templateResolver;
+
+ @Override
+ public String getType() {
+ return TYPE;
+ }
+
+ @Override
+ public ActionResult executeAction(RuleContext ctx, JsonNode config) {
+ try {
+ JsonNode propertiesNode = config.path("properties");
+ if (propertiesNode.isMissingNode() || !propertiesNode.isObject()) {
+ return ActionResult.failure("device_property_set: properties 配置缺失或非 Object");
+ }
+ if (ctx.getDeviceId() == null) {
+ return ActionResult.failure("device_property_set: ctx.deviceId 为 null");
+ }
+
+ // 解析所有属性值(支持模板变量)
+ var fields = propertiesNode.fields();
+ int count = 0;
+ while (fields.hasNext()) {
+ var entry = fields.next();
+ String key = entry.getKey();
+ String rawValue = entry.getValue().asText();
+ Object resolved = templateResolver.resolve(rawValue, ctx);
+ // TODO B27: 写 Redis 设备影子 + MySQL(Shared 属性下发)
+ log.info("[DevicePropertySetAction] [stub] chain={} device={} property={}={}",
+ ctx.getChainId(), ctx.getDeviceId(), key, resolved);
+ count++;
+ }
+
+ return ActionResult.successMsg("device_property_set: 已解析 " + count + " 个属性(第一期 stub)");
+
+ } catch (Exception e) {
+ log.warn("[DevicePropertySetAction] chain={} device={} 异常: {}",
+ ctx.getChainId(), ctx.getDeviceId(), e.getMessage());
+ return ActionResult.failure(e.getMessage());
+ }
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/DeviceServiceInvokeAction.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/DeviceServiceInvokeAction.java
new file mode 100644
index 00000000..1951d42c
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/DeviceServiceInvokeAction.java
@@ -0,0 +1,93 @@
+package com.viewsh.module.iot.rule.action;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.viewsh.framework.common.pojo.CommonResult;
+import com.viewsh.module.iot.api.device.IotDeviceControlApi;
+import com.viewsh.module.iot.api.device.dto.IotDeviceServiceInvokeReqDTO;
+import com.viewsh.module.iot.api.device.dto.IotDeviceServiceInvokeRespDTO;
+import com.viewsh.module.iot.rule.engine.RuleContext;
+import com.viewsh.module.iot.rule.result.ActionResult;
+import com.viewsh.module.iot.rule.spi.ActionProvider;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+
+/**
+ * 设备服务调用 Action(device_service_invoke)。
+ *
+ *
配置示例:
+ *
{@code
+ * {
+ * "identifier": "playVoice",
+ * "params": { "text": "温度过高,请注意!", "volume": 80 },
+ * "timeoutSeconds": 30
+ * }
+ * }
+ *
+ * 第一期调用 {@link IotDeviceControlApi#invokeService} 同步下发服务指令;
+ * 评审 D1:第二期 B27-B34 引入 B29 后改为创建 RpcCommand 持久化记录并异步跟踪。
+ */
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class DeviceServiceInvokeAction implements ActionProvider {
+
+ public static final String TYPE = "device_service_invoke";
+
+ private final IotDeviceControlApi deviceControlApi;
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ @Override
+ public String getType() {
+ return TYPE;
+ }
+
+ @Override
+ public ActionResult executeAction(RuleContext ctx, JsonNode config) {
+ try {
+ String identifier = config.path("identifier").asText("");
+ if (identifier.isBlank()) {
+ return ActionResult.failure("device_service_invoke: identifier 缺失");
+ }
+ if (ctx.getDeviceId() == null) {
+ return ActionResult.failure("device_service_invoke: ctx.deviceId 为 null");
+ }
+
+ Map params = null;
+ JsonNode paramsNode = config.path("params");
+ if (paramsNode.isObject()) {
+ params = OBJECT_MAPPER.convertValue(paramsNode,
+ OBJECT_MAPPER.getTypeFactory().constructMapType(Map.class, String.class, Object.class));
+ }
+
+ int timeout = config.path("timeoutSeconds").asInt(30);
+
+ IotDeviceServiceInvokeReqDTO req = IotDeviceServiceInvokeReqDTO.builder()
+ .deviceId(ctx.getDeviceId())
+ .identifier(identifier)
+ .params(params)
+ .timeoutSeconds(timeout)
+ .build();
+
+ CommonResult result = deviceControlApi.invokeService(req);
+ if (result == null || !result.isSuccess()) {
+ String errMsg = result != null ? result.getMsg() : "API 返回 null";
+ log.warn("[DeviceServiceInvokeAction] chain={} device={} identifier={} 失败: {}",
+ ctx.getChainId(), ctx.getDeviceId(), identifier, errMsg);
+ return ActionResult.failure("invokeService 失败: " + errMsg);
+ }
+
+ log.debug("[DeviceServiceInvokeAction] chain={} device={} identifier={} 调用成功",
+ ctx.getChainId(), ctx.getDeviceId(), identifier);
+ return ActionResult.success(result.getData());
+
+ } catch (Exception e) {
+ log.warn("[DeviceServiceInvokeAction] chain={} device={} 异常: {}",
+ ctx.getChainId(), ctx.getDeviceId(), e.getMessage());
+ return ActionResult.failure(e.getMessage());
+ }
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/NotifyAction.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/NotifyAction.java
new file mode 100644
index 00000000..ee5a8972
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/NotifyAction.java
@@ -0,0 +1,161 @@
+package com.viewsh.module.iot.rule.action;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.viewsh.module.iot.rule.engine.RuleContext;
+import com.viewsh.module.iot.rule.result.ActionResult;
+import com.viewsh.module.iot.rule.spi.ActionProvider;
+import com.viewsh.module.iot.rule.template.TemplateResolver;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * 通知发送 Action(notify)。
+ *
+ * 配置示例:
+ *
{@code
+ * {
+ * "channels": ["sms", "email", "in_app", "webhook"],
+ * "receivers": { "userIds": [1001], "webhookUrl": "https://hook.example.com" },
+ * "template": {
+ * "title": "设备 ${meta.deviceName} 告警",
+ * "body": "告警:${meta.alarmName},触发值 ${data.temperature}°C"
+ * }
+ * }
+ * }
+ *
+ * 4 个通道并发触发,部分失败不阻塞其他通道,最终汇总结果。
+ * 评审 C5:title/body 统一走 {@link TemplateResolver} 解析。
+ * 评审 B6:@Async 慎用,保持同步线程池以保留 traceId 和 tenant 上下文。
+ *
+ *
第一期 B16(NotifyService)未就绪,各通道以 TODO stub 占位并记录日志;
+ * B16 就绪后替换 stub 即可。
+ */
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class NotifyAction implements ActionProvider {
+
+ public static final String TYPE = "notify";
+
+ private final TemplateResolver templateResolver;
+
+ /** 通知并发线程池(复用,避免每次 Action 创建) */
+ private static final ExecutorService NOTIFY_POOL =
+ Executors.newFixedThreadPool(4, r -> {
+ Thread t = new Thread(r, "iot-notify-");
+ t.setDaemon(true);
+ return t;
+ });
+
+ @Override
+ public String getType() {
+ return TYPE;
+ }
+
+ @Override
+ public ActionResult executeAction(RuleContext ctx, JsonNode config) {
+ try {
+ JsonNode channelsNode = config.path("channels");
+ if (channelsNode.isMissingNode() || !channelsNode.isArray() || channelsNode.isEmpty()) {
+ return ActionResult.failure("notify: channels 未配置");
+ }
+
+ String title = resolveTemplate(config.path("template").path("title").asText(""), ctx);
+ String body = resolveTemplate(config.path("template").path("body").asText(""), ctx);
+ JsonNode receivers = config.path("receivers");
+
+ List channels = new ArrayList<>();
+ for (JsonNode ch : channelsNode) {
+ channels.add(ch.asText());
+ }
+
+ List> futures = channels.stream()
+ .map(channel -> CompletableFuture.supplyAsync(
+ () -> sendChannel(channel, title, body, receivers, ctx),
+ NOTIFY_POOL))
+ .toList();
+
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+
+ List failedChannels = new ArrayList<>();
+ for (int i = 0; i < channels.size(); i++) {
+ ChannelResult cr = futures.get(i).join();
+ if (!cr.success()) {
+ failedChannels.add(channels.get(i) + ":" + cr.error());
+ }
+ }
+
+ if (failedChannels.isEmpty()) {
+ return ActionResult.successMsg("notify: 全部通道发送成功");
+ } else {
+ // 部分失败:仍返回 SUCCESS,message 记录失败通道(评审 B6)
+ String msg = "notify: 部分通道失败 " + failedChannels;
+ log.warn("[NotifyAction] chain={} {}", ctx.getChainId(), msg);
+ return ActionResult.successMsg(msg);
+ }
+
+ } catch (Exception e) {
+ log.warn("[NotifyAction] chain={} 异常: {}", ctx.getChainId(), e.getMessage());
+ return ActionResult.failure(e.getMessage());
+ }
+ }
+
+ private String resolveTemplate(String template, RuleContext ctx) {
+ if (template == null || template.isBlank()) return template;
+ try {
+ return String.valueOf(templateResolver.resolve(template, ctx));
+ } catch (Exception e) {
+ log.warn("[NotifyAction] 模板解析失败 template='{}': {}", template, e.getMessage());
+ return template;
+ }
+ }
+
+ /**
+ * 单通道发送(第一期 stub,B16 NotifyService 就绪后替换)。
+ */
+ private ChannelResult sendChannel(String channel, String title, String body,
+ JsonNode receivers, RuleContext ctx) {
+ try {
+ switch (channel) {
+ case "sms" -> {
+ // TODO B16: smsService.send(receivers.userIds, title, body)
+ log.info("[NotifyAction] [stub] sms chain={} title='{}' body='{}'",
+ ctx.getChainId(), title, body);
+ }
+ case "email" -> {
+ // TODO B16: emailService.send(receivers.userIds, title, body)
+ log.info("[NotifyAction] [stub] email chain={} title='{}' body='{}'",
+ ctx.getChainId(), title, body);
+ }
+ case "in_app" -> {
+ // TODO B16: inAppService.send(receivers.userIds, title, body)
+ log.info("[NotifyAction] [stub] in_app chain={} title='{}' body='{}'",
+ ctx.getChainId(), title, body);
+ }
+ case "webhook" -> {
+ String webhookUrl = receivers.path("webhookUrl").asText("");
+ // TODO B16: webhookService.post(webhookUrl, title, body)
+ log.info("[NotifyAction] [stub] webhook chain={} url='{}' title='{}' body='{}'",
+ ctx.getChainId(), webhookUrl, title, body);
+ }
+ default -> {
+ log.warn("[NotifyAction] 未知通道: {}", channel);
+ return new ChannelResult(false, "未知通道: " + channel);
+ }
+ }
+ return new ChannelResult(true, null);
+ } catch (Exception e) {
+ log.warn("[NotifyAction] channel={} 发送失败: {}", channel, e.getMessage());
+ return new ChannelResult(false, e.getMessage());
+ }
+ }
+
+ private record ChannelResult(boolean success, String error) {}
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/DagExecutor.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/DagExecutor.java
index c436cd41..9f8df6ea 100644
--- a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/DagExecutor.java
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/DagExecutor.java
@@ -1,6 +1,7 @@
package com.viewsh.module.iot.rule.engine;
import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleLinkRelationType;
+import com.viewsh.module.iot.rule.engine.branch.BranchNode;
import com.viewsh.module.iot.rule.engine.exception.RuleChainException;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
@@ -41,6 +42,9 @@ public class DagExecutor {
throw new RuleChainException(chain.getId(), null, "规则链缺少 Trigger 入口节点");
}
+ // Branch 节点需要通过 ctx.metadata 获取 CompiledRuleChain 以执行内联 action
+ ctx.getMetadata().put(BranchNode.CTX_CHAIN_KEY, chain);
+
Deque queue = new ArrayDeque<>();
queue.offer(entry);
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/branch/BranchConfiguration.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/branch/BranchConfiguration.java
new file mode 100644
index 00000000..da003d2a
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/branch/BranchConfiguration.java
@@ -0,0 +1,104 @@
+package com.viewsh.module.iot.rule.engine.branch;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.JsonNode;
+import lombok.Data;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Branch 节点配置 POJO(反序列化自 rule_node.configuration JSON)。
+ *
+ * 配置示例:
+ *
{@code
+ * {
+ * "branches": [
+ * {
+ * "name": "高温",
+ * "condition": { "type": "expression", "config": { "expression": "${data.temp} > 40" } },
+ * "actions": ["nodeId_alarm"],
+ * "executeAnyway": false
+ * },
+ * {
+ * "name": "else",
+ * "condition": null,
+ * "actions": ["nodeId_log"]
+ * }
+ * ]
+ * }
+ * }
+ *
+ * 约束(B7 规格):
+ *
+ * - {@code condition=null} 的 else 分支必须是最后一个
+ * - {@code executeAnyway=true}:命中后继续评估后续分支(重叠触发)
+ * - {@code executeAnyway=false}(默认):命中后跳过后续分支(if/else-if 语义)
+ *
+ */
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class BranchConfiguration {
+
+ private List branches = Collections.emptyList();
+
+ /**
+ * 校验配置合法性:else 分支(condition=null)必须是最后一个。
+ *
+ * @throws IllegalArgumentException 配置非法时抛出
+ */
+ public void validate() {
+ if (branches == null || branches.isEmpty()) {
+ return;
+ }
+ for (int i = 0; i < branches.size() - 1; i++) {
+ if (branches.get(i).getCondition() == null) {
+ throw new IllegalArgumentException(
+ "[BranchConfiguration] else 分支(condition=null)必须是最后一个,但在 index=" + i + " 处发现");
+ }
+ }
+ }
+
+ /**
+ * 单个分支配置。
+ */
+ @Data
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ public static class BranchItem {
+
+ /** 分支名称(用于日志 / metadata 标记,必填) */
+ private String name;
+
+ /**
+ * 条件配置(null 表示 else 分支)。
+ * 格式:{"type": "expression", "config": {...}}
+ */
+ private BranchCondition condition;
+
+ /**
+ * 命中时需要执行的下游节点 ID 列表。
+ * BranchNode 按此列表在 CompiledRuleChain 中查找对应节点并内联执行。
+ */
+ private List actions = Collections.emptyList();
+
+ /**
+ * 是否无论前面是否命中都继续评估此分支(重叠触发)。
+ * true = 执行后继续评估后续分支;false(默认)= 命中后跳过后续分支。
+ */
+ private boolean executeAnyway = false;
+ }
+
+ /**
+ * 分支条件配置(内嵌 type + config)。
+ */
+ @Data
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ public static class BranchCondition {
+
+ /** 条件类型,对应 {@link com.viewsh.module.iot.rule.spi.ConditionEvaluator#getType()} */
+ private String type;
+
+ /** 条件具体配置(由对应的 ConditionEvaluator 解析) */
+ private JsonNode config;
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/branch/BranchExecutor.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/branch/BranchExecutor.java
new file mode 100644
index 00000000..be94d1aa
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/branch/BranchExecutor.java
@@ -0,0 +1,153 @@
+package com.viewsh.module.iot.rule.engine.branch;
+
+import com.viewsh.module.iot.rule.condition.ConditionEvaluatorManager;
+import com.viewsh.module.iot.rule.engine.RuleContext;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Branch 节点核心执行语义(executeAnyway if/else-if/else)。
+ *
+ * 执行流程(B7 §3.2 规格):
+ *
+ * - 按数组顺序遍历 branches
+ * - condition=null 视为 else:仅当前面所有分支都未命中时执行
+ * - executeAnyway=false(默认):命中后执行 actions,跳过后续分支
+ * - executeAnyway=true:命中后执行 actions,继续评估后续分支
+ * - 条件评估异常:该分支跳过(短路求值,不当失败),继续下一个
+ * - action 执行异常:记录日志后继续执行同 branch 其余 action(异常隔离)
+ *
+ *
+ * 此类不依赖 Spring,便于单元测试(mock ConditionEvaluatorManager 和 ActionExecutor)。
+ */
+@Slf4j
+@RequiredArgsConstructor
+public class BranchExecutor {
+
+ private final ConditionEvaluatorManager conditionEvaluatorManager;
+
+ /**
+ * 执行分支逻辑。
+ *
+ * @param ctx 规则执行上下文
+ * @param configuration branch 节点配置(已解析,已 validate)
+ * @param actionExecutor 回调:执行指定 nodeId 对应的 action(由 BranchNode 提供真实实现,测试可 mock)
+ * @return 命中的分支名列表(可能为空)
+ */
+ public List execute(RuleContext ctx,
+ BranchConfiguration configuration,
+ ActionExecutor actionExecutor) {
+ List branches = configuration.getBranches();
+ List matchedBranches = new ArrayList<>();
+
+ boolean anyPreviousMatched = false;
+
+ for (BranchConfiguration.BranchItem branch : branches) {
+ boolean match = evaluateMatch(ctx, branch, anyPreviousMatched);
+
+ if (match) {
+ String branchName = branch.getName();
+ log.debug("[BranchExecutor] chain={} branch='{}' 命中,executeAnyway={}",
+ ctx.getChainId(), branchName, branch.isExecuteAnyway());
+
+ matchedBranches.add(branchName);
+ executeActions(ctx, branch, actionExecutor);
+
+ if (!branch.isExecuteAnyway()) {
+ // if/else-if 语义:命中后跳过后续
+ log.debug("[BranchExecutor] chain={} branch='{}' executeAnyway=false,终止后续分支评估",
+ ctx.getChainId(), branchName);
+ break;
+ }
+ // executeAnyway=true:继续评估后续分支
+ anyPreviousMatched = true;
+ } else {
+ if (branch.isExecuteAnyway()) {
+ // executeAnyway=true 但条件未命中:跳过,继续后续
+ log.debug("[BranchExecutor] chain={} branch='{}' 未命中(executeAnyway=true),继续评估",
+ ctx.getChainId(), branch.getName());
+ }
+ // 不更新 anyPreviousMatched(未命中的 executeAnyway 分支不算 "previous matched")
+ }
+ }
+
+ if (matchedBranches.isEmpty()) {
+ log.debug("[BranchExecutor] chain={} 无分支命中", ctx.getChainId());
+ }
+
+ return matchedBranches;
+ }
+
+ /**
+ * 判断某个分支是否命中。
+ *
+ * @param anyPreviousMatched 前面是否有非 executeAnyway 的分支已命中
+ */
+ private boolean evaluateMatch(RuleContext ctx,
+ BranchConfiguration.BranchItem branch,
+ boolean anyPreviousMatched) {
+ BranchConfiguration.BranchCondition condition = branch.getCondition();
+ if (condition == null) {
+ // else 分支:仅当前面都未命中时执行
+ return !anyPreviousMatched;
+ }
+
+ try {
+ boolean result = conditionEvaluatorManager.evaluate(condition.getType(), ctx, condition.getConfig());
+ log.debug("[BranchExecutor] chain={} branch='{}' 条件评估结果={}",
+ ctx.getChainId(), branch.getName(), result);
+ return result;
+ } catch (Exception e) {
+ // 短路求值:条件异常 → 跳过该 branch,不当失败
+ log.warn("[BranchExecutor] chain={} branch='{}' 条件评估异常,跳过:{}",
+ ctx.getChainId(), branch.getName(), e.getMessage());
+ return false;
+ }
+ }
+
+ /**
+ * 执行分支内所有 actions(action 异常隔离,不影响其他 action)。
+ */
+ private void executeActions(RuleContext ctx,
+ BranchConfiguration.BranchItem branch,
+ ActionExecutor actionExecutor) {
+ String branchName = branch.getName();
+ List actions = branch.getActions();
+ if (actions == null || actions.isEmpty()) {
+ return;
+ }
+
+ for (String actionNodeId : actions) {
+ try {
+ log.debug("[BranchExecutor] chain={} branch='{}' 执行 action nodeId={}",
+ ctx.getChainId(), branchName, actionNodeId);
+ actionExecutor.execute(ctx, branchName, actionNodeId);
+ } catch (Exception e) {
+ // action 异常隔离:记录日志后继续下一个 action
+ log.warn("[BranchExecutor] chain={} branch='{}' action nodeId={} 执行异常(隔离):{}",
+ ctx.getChainId(), branchName, actionNodeId, e.getMessage());
+ }
+ }
+ }
+
+ /**
+ * action 执行回调接口(用于解耦 BranchExecutor 与 NodeProvider/DagExecutor)。
+ *
+ * 生产实现由 {@link BranchNode} 提供(通过 CompiledRuleChain + NodeProviderRegistry 执行);
+ * 测试时可使用 mock 或 lambda 替代。
+ */
+ @FunctionalInterface
+ public interface ActionExecutor {
+ /**
+ * 执行指定分支内的一个 action 节点。
+ *
+ * @param ctx 规则上下文(已含 branch_name metadata)
+ * @param branchName 所属分支名(供日志 / metadata 使用)
+ * @param actionNodeId 节点 ID(来自 BranchConfiguration.BranchItem.actions)
+ */
+ void execute(RuleContext ctx, String branchName, String actionNodeId);
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/branch/BranchNode.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/branch/BranchNode.java
new file mode 100644
index 00000000..f0a3d734
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/branch/BranchNode.java
@@ -0,0 +1,153 @@
+package com.viewsh.module.iot.rule.engine.branch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.viewsh.module.iot.rule.condition.ConditionEvaluatorManager;
+import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleNodeCategory;
+import com.viewsh.module.iot.rule.engine.CompiledNode;
+import com.viewsh.module.iot.rule.engine.CompiledRuleChain;
+import com.viewsh.module.iot.rule.engine.NodeProvider;
+import com.viewsh.module.iot.rule.engine.NodeProviderRegistry;
+import com.viewsh.module.iot.rule.engine.NodeResult;
+import com.viewsh.module.iot.rule.engine.RuleContext;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Branch 类型节点 Provider(category=ACTION, type="branch")。
+ *
+ *
职责:
+ *
+ * - 将 rule_node.configuration JSON 解析为 {@link BranchConfiguration}
+ * - 校验 else 分支(condition=null)是否在最后
+ * - 调用 {@link BranchExecutor} 执行分支语义(if/else-if/executeAnyway)
+ * - 命中的 branch 的 actions 通过 {@link NodeProviderRegistry} 内联执行
+ * - 将命中分支名列表写入 ctx.metadata["branch_names"],供下游节点参考
+ * - 返回 {@link NodeResult#success()} 让 DagExecutor 沿 SUCCESS 出边走公共下游
+ *
+ *
+ * 内联执行说明:DAG 中 BranchNode 的 SUCCESS 出边指向 Branch 之后的公共节点;
+ * Branch 内 actions(互斥子节点)由 BranchNode 自行执行,不依赖 DagExecutor 的出边路由。
+ * 因此 Branch actions 对应的子节点在 rule_node 中存在但没有从 BranchNode 发出的 link,
+ * 或者有 link 但 DagExecutor 会通过 BranchNode 内联执行后跳过(取决于链设计)。
+ *
+ *
CompiledRuleChain 通过 DagExecutor 写入 ctx.metadata["{@value #CTX_CHAIN_KEY}"] 获得,
+ * 供 BranchNode 查找子节点配置。
+ */
+@Slf4j
+@Component
+public class BranchNode implements NodeProvider {
+
+ /** DagExecutor 在执行前将 CompiledRuleChain 写入 ctx.metadata 的 key */
+ public static final String CTX_CHAIN_KEY = "__compiledRuleChain__";
+
+ public static final String TYPE = "branch";
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private final ConditionEvaluatorManager conditionEvaluatorManager;
+ private final NodeProviderRegistry nodeProviderRegistry;
+ private final BranchExecutor branchExecutor;
+
+ public BranchNode(ConditionEvaluatorManager conditionEvaluatorManager,
+ NodeProviderRegistry nodeProviderRegistry) {
+ this.conditionEvaluatorManager = conditionEvaluatorManager;
+ this.nodeProviderRegistry = nodeProviderRegistry;
+ this.branchExecutor = new BranchExecutor(conditionEvaluatorManager);
+ }
+
+ @Override
+ public RuleNodeCategory getCategory() {
+ return RuleNodeCategory.ACTION;
+ }
+
+ @Override
+ public String getType() {
+ return TYPE;
+ }
+
+ @Override
+ public NodeResult execute(RuleContext ctx, String config) {
+ // 1. 解析配置
+ BranchConfiguration configuration;
+ try {
+ configuration = MAPPER.readValue(config == null ? "{}" : config, BranchConfiguration.class);
+ } catch (Exception e) {
+ log.warn("[BranchNode] chain={} config JSON 解析失败: {}", ctx.getChainId(), e.getMessage());
+ return NodeResult.failure("BranchNode config 解析失败: " + e.getMessage(), e);
+ }
+
+ // 2. 校验(else 分支必须在最后)
+ try {
+ configuration.validate();
+ } catch (IllegalArgumentException e) {
+ log.warn("[BranchNode] chain={} 配置校验失败: {}", ctx.getChainId(), e.getMessage());
+ return NodeResult.failure(e.getMessage());
+ }
+
+ if (configuration.getBranches().isEmpty()) {
+ log.debug("[BranchNode] chain={} branches 为空,no-op", ctx.getChainId());
+ return NodeResult.success();
+ }
+
+ // 3. 获取 CompiledRuleChain(用于内联执行 action 节点)
+ CompiledRuleChain chain = (CompiledRuleChain) ctx.getMetadata().get(CTX_CHAIN_KEY);
+
+ // 4. 构建 ActionExecutor(使用 NodeProviderRegistry 内联执行)
+ BranchExecutor.ActionExecutor actionExecutor = buildActionExecutor(ctx, chain);
+
+ // 5. 执行分支逻辑
+ List matchedBranches = branchExecutor.execute(ctx, configuration, actionExecutor);
+
+ // 6. 将命中分支名写入 metadata,供下游节点参考
+ if (!matchedBranches.isEmpty()) {
+ ctx.getMetadata().put("branch_names", matchedBranches);
+ ctx.getMetadata().put("branch_name", matchedBranches.get(0));
+ }
+
+ return NodeResult.success(Map.of("branch_names", matchedBranches));
+ }
+
+ /**
+ * 构建 ActionExecutor:通过 NodeProviderRegistry 内联执行 action 节点。
+ *
+ * 若 CompiledRuleChain 不可用(chain=null 或节点不存在),则打印 warn 日志跳过。
+ */
+ private BranchExecutor.ActionExecutor buildActionExecutor(RuleContext ctx, CompiledRuleChain chain) {
+ return (ruleCtx, branchName, actionNodeId) -> {
+ if (chain == null) {
+ log.warn("[BranchNode] chain={} branch='{}' 无法获取 CompiledRuleChain,跳过 action={}",
+ ruleCtx.getChainId(), branchName, actionNodeId);
+ return;
+ }
+
+ CompiledNode actionNode = chain.nodeById(actionNodeId);
+ if (actionNode == null) {
+ log.warn("[BranchNode] chain={} branch='{}' action nodeId={} 不存在于链中,跳过",
+ ruleCtx.getChainId(), branchName, actionNodeId);
+ return;
+ }
+
+ try {
+ NodeProvider provider = nodeProviderRegistry.resolve(actionNode.getCategory(), actionNode.getType());
+ // 更新 ctx 的当前节点(与 DagExecutor.fork 行为一致)
+ ruleCtx.fork(actionNodeId);
+ // 将 branch_name 写入 metadata,便于 action 内的日志追踪
+ ruleCtx.getMetadata().put("branch_name", branchName);
+
+ NodeResult result = provider.execute(ruleCtx, actionNode.getConfiguration());
+ if (result != null) {
+ ruleCtx.recordNodeOutput(actionNodeId, result.getOutputs());
+ }
+ log.debug("[BranchNode] chain={} branch='{}' action={} 执行完成,result={}",
+ ruleCtx.getChainId(), branchName, actionNodeId,
+ result != null ? result.getRelationType() : "null");
+ } catch (Exception e) {
+ // 异常已在 BranchExecutor.executeActions 中捕获,此处不再 re-throw
+ throw new RuntimeException("action=" + actionNodeId + " 执行异常: " + e.getMessage(), e);
+ }
+ };
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/result/ActionResult.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/result/ActionResult.java
new file mode 100644
index 00000000..45ff9298
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/result/ActionResult.java
@@ -0,0 +1,54 @@
+package com.viewsh.module.iot.rule.result;
+
+import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleLinkRelationType;
+
+/**
+ * Action 执行结果。
+ *
+ *
relationType 决定 DAG 走哪条 outgoing link:
+ *
+ * - {@link RuleLinkRelationType#SUCCESS} — Action 执行成功
+ * - {@link RuleLinkRelationType#FAILURE} — Action 执行失败(内部捕获,不上抛)
+ * - {@link RuleLinkRelationType#SKIP} — 静默跳过(如告警已清除,重复清除为 no-op)
+ *
+ *
+ * output 会注入 {@code ctx.nodeOutputs}(key="output"),可供后继节点引用。
+ */
+public record ActionResult(
+ RuleLinkRelationType relationType,
+ String message,
+ Object output
+) {
+
+ public static ActionResult success() {
+ return new ActionResult(RuleLinkRelationType.SUCCESS, null, null);
+ }
+
+ public static ActionResult success(Object output) {
+ return new ActionResult(RuleLinkRelationType.SUCCESS, null, output);
+ }
+
+ public static ActionResult successMsg(String message) {
+ return new ActionResult(RuleLinkRelationType.SUCCESS, message, null);
+ }
+
+ public static ActionResult success(String message, Object output) {
+ return new ActionResult(RuleLinkRelationType.SUCCESS, message, output);
+ }
+
+ public static ActionResult failure(String message) {
+ return new ActionResult(RuleLinkRelationType.FAILURE, message, null);
+ }
+
+ public static ActionResult skip(String reason) {
+ return new ActionResult(RuleLinkRelationType.SKIP, reason, null);
+ }
+
+ public boolean isSuccess() {
+ return relationType == RuleLinkRelationType.SUCCESS;
+ }
+
+ public boolean isFailure() {
+ return relationType == RuleLinkRelationType.FAILURE;
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/result/RpcCommand.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/result/RpcCommand.java
new file mode 100644
index 00000000..89dc0025
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/result/RpcCommand.java
@@ -0,0 +1,37 @@
+package com.viewsh.module.iot.rule.result;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * 设备 RPC 指令封装(预留,B29 期实现完整持久化)。
+ *
+ *
第一期 DeviceServiceInvokeAction 先调用现有 {@code IotDeviceControlApi.invokeService()};
+ * 第二期 B27-B34 引入 B29 后,Action 层改为创建 RpcCommand 记录并异步跟踪执行结果。
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class RpcCommand {
+
+ /** 目标设备 ID */
+ private Long deviceId;
+
+ /** 服务标识符(物模型 identifier) */
+ private String serviceId;
+
+ /** 入参(JSON 格式) */
+ private String inputParams;
+
+ /** 调用超时(ms),默认 30000 */
+ private Integer timeoutMs;
+
+ /** 关联的规则链 ID(用于追踪) */
+ private Long ruleChainId;
+
+ /** 关联的节点 ID(用于追踪) */
+ private String nodeId;
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/spi/ActionProvider.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/spi/ActionProvider.java
new file mode 100644
index 00000000..c66653b9
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/spi/ActionProvider.java
@@ -0,0 +1,78 @@
+package com.viewsh.module.iot.rule.spi;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleNodeCategory;
+import com.viewsh.module.iot.rule.engine.NodeProvider;
+import com.viewsh.module.iot.rule.engine.NodeResult;
+import com.viewsh.module.iot.rule.engine.RuleContext;
+import com.viewsh.module.iot.rule.result.ActionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Action SPI — 规则链动作节点执行接口。
+ *
+ *
实现通过 Spring {@code @Component} + {@link #getType()} 注册,由
+ * {@link com.viewsh.module.iot.rule.action.ActionProviderManager} 管理。
+ * 同时实现 {@link NodeProvider}(category=ACTION),使其可被 DagExecutor 直接路由。
+ *
+ *
约定(评审 B6):
+ *
+ * - 所有模板变量解析统一走 {@link com.viewsh.module.iot.rule.template.TemplateResolver}
+ * - Action 异常必须捕获并转为 {@link ActionResult#failure(String)},不上抛到链级
+ * - 不使用 {@code @Async},保持同步以保留 traceId 和 tenant 上下文
+ *
+ */
+public interface ActionProvider extends NodeProvider {
+
+ Logger LOG = LoggerFactory.getLogger(ActionProvider.class);
+ ObjectMapper CONFIG_MAPPER = new ObjectMapper();
+
+ @Override
+ default RuleNodeCategory getCategory() {
+ return RuleNodeCategory.ACTION;
+ }
+
+ /**
+ * 执行 Action(JsonNode config 版本)。
+ *
+ * @param ctx 规则执行上下文
+ * @param config 节点 configuration(已解析为 JsonNode)
+ * @return ActionResult(不允许返回 null)
+ */
+ ActionResult executeAction(RuleContext ctx, JsonNode config);
+
+ /**
+ * NodeProvider.execute 默认桥接:将 String config 解析为 JsonNode,
+ * 执行 {@link #executeAction(RuleContext, JsonNode)},并映射为 {@link NodeResult}。
+ *
+ * config 解析异常或 executeAction 异常,均转为 {@link NodeResult#failure(String)}。
+ */
+ @Override
+ default NodeResult execute(RuleContext ctx, String config) {
+ JsonNode cfg;
+ try {
+ cfg = CONFIG_MAPPER.readTree(config == null ? "{}" : config);
+ } catch (Exception e) {
+ LOG.warn("[ActionProvider] type={} config JSON 解析失败: {}", getType(), e.getMessage());
+ return NodeResult.failure("config JSON 解析失败: " + e.getMessage(), e);
+ }
+ try {
+ ActionResult result = executeAction(ctx, cfg);
+ if (result == null) {
+ LOG.warn("[ActionProvider] type={} executeAction 返回 null", getType());
+ return NodeResult.failure("executeAction 返回 null");
+ }
+ Map outputs = result.output() != null
+ ? Map.of("output", result.output())
+ : Map.of();
+ return NodeResult.of(result.relationType(), outputs);
+ } catch (Exception e) {
+ LOG.warn("[ActionProvider] type={} 执行异常(转为 FAILURE): {}", getType(), e.getMessage());
+ return NodeResult.failure(e.getMessage(), e);
+ }
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/action/ActionProviderTest.java b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/action/ActionProviderTest.java
new file mode 100644
index 00000000..1177bc73
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/action/ActionProviderTest.java
@@ -0,0 +1,286 @@
+package com.viewsh.module.iot.rule.action;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.viewsh.framework.common.pojo.CommonResult;
+import com.viewsh.module.iot.api.alarm.IotAlarmRecordApi;
+import com.viewsh.module.iot.api.alarm.dto.IotAlarmTriggerReqDTO;
+import com.viewsh.module.iot.api.alarm.dto.IotAlarmClearReqDTO;
+import com.viewsh.module.iot.api.device.IotDeviceControlApi;
+import com.viewsh.module.iot.api.device.dto.IotDeviceServiceInvokeReqDTO;
+import com.viewsh.module.iot.api.device.dto.IotDeviceServiceInvokeRespDTO;
+import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
+import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleLinkRelationType;
+import com.viewsh.module.iot.rule.engine.NodeResult;
+import com.viewsh.module.iot.rule.engine.RuleContext;
+import com.viewsh.module.iot.rule.result.ActionResult;
+import com.viewsh.module.iot.rule.template.TemplateResolver;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.time.Instant;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+/**
+ * B6 ActionProvider 单元测试:5 个 Action + ActionProvider 接口桥接(NodeProvider)。
+ */
+class ActionProviderTest {
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+ private IotAlarmRecordApi alarmRecordApi;
+ private IotDeviceControlApi deviceControlApi;
+ private TemplateResolver templateResolver;
+
+ @BeforeEach
+ void setUp() {
+ alarmRecordApi = mock(IotAlarmRecordApi.class);
+ deviceControlApi = mock(IotDeviceControlApi.class);
+ templateResolver = new TemplateResolver(MAPPER);
+ }
+
+ // ======================== AlarmTriggerAction ========================
+
+ @Test
+ void alarmTrigger_firstTime_callsServiceAndWritesAlarmIdToMeta() throws Exception {
+ when(alarmRecordApi.triggerAlarm(any())).thenReturn(CommonResult.success(42L));
+
+ AlarmTriggerAction action = new AlarmTriggerAction(alarmRecordApi, templateResolver);
+ RuleContext ctx = ctx(1L, 10L);
+
+ JsonNode config = MAPPER.readTree("""
+ {"alarmConfigId": 100, "severity": 3, "name": "温度告警"}
+ """);
+ ActionResult result = action.executeAction(ctx, config);
+
+ assertTrue(result.isSuccess());
+ assertEquals(42L, ctx.getMetadata().get("alarmId"));
+ ArgumentCaptor cap = ArgumentCaptor.forClass(IotAlarmTriggerReqDTO.class);
+ verify(alarmRecordApi).triggerAlarm(cap.capture());
+ assertEquals(100L, cap.getValue().getAlarmConfigId());
+ assertEquals(3, cap.getValue().getSeverity());
+ }
+
+ @Test
+ void alarmTrigger_missingAlarmConfigId_returnsFailure() throws Exception {
+ AlarmTriggerAction action = new AlarmTriggerAction(alarmRecordApi, templateResolver);
+ RuleContext ctx = ctx(1L, 10L);
+
+ JsonNode config = MAPPER.readTree("""
+ {"severity": 3}
+ """);
+ ActionResult result = action.executeAction(ctx, config);
+
+ assertTrue(result.isFailure());
+ verifyNoInteractions(alarmRecordApi);
+ }
+
+ @Test
+ void alarmTrigger_templateName_resolved() throws Exception {
+ when(alarmRecordApi.triggerAlarm(any())).thenReturn(CommonResult.success(99L));
+
+ AlarmTriggerAction action = new AlarmTriggerAction(alarmRecordApi, templateResolver);
+ RuleContext ctx = ctx(1L, 10L);
+ ctx.getMetadata().put("deviceName", "传感器A");
+
+ JsonNode config = MAPPER.readTree("""
+ {"alarmConfigId": 1, "severity": 2, "name": "设备 ${meta.deviceName} 过热"}
+ """);
+ action.executeAction(ctx, config);
+
+ ArgumentCaptor cap = ArgumentCaptor.forClass(IotAlarmTriggerReqDTO.class);
+ verify(alarmRecordApi).triggerAlarm(cap.capture());
+ assertEquals("设备 传感器A 过热", cap.getValue().getAlarmName());
+ }
+
+ // ======================== AlarmClearAction ========================
+
+ @Test
+ void alarmClear_withAlarmId_callsService() throws Exception {
+ when(alarmRecordApi.clearAlarm(any())).thenReturn(CommonResult.success(true));
+
+ AlarmClearAction action = new AlarmClearAction(alarmRecordApi);
+ RuleContext ctx = ctx(1L, 10L);
+
+ JsonNode config = MAPPER.readTree("""
+ {"alarmId": 55, "operator": "rule-engine"}
+ """);
+ ActionResult result = action.executeAction(ctx, config);
+
+ assertTrue(result.isSuccess());
+ ArgumentCaptor cap = ArgumentCaptor.forClass(IotAlarmClearReqDTO.class);
+ verify(alarmRecordApi).clearAlarm(cap.capture());
+ assertEquals(55L, cap.getValue().getAlarmId());
+ }
+
+ @Test
+ void alarmClear_alarmIdFromMeta_resolvedCorrectly() throws Exception {
+ when(alarmRecordApi.clearAlarm(any())).thenReturn(CommonResult.success(true));
+
+ AlarmClearAction action = new AlarmClearAction(alarmRecordApi);
+ RuleContext ctx = ctx(1L, 10L);
+ ctx.getMetadata().put("alarmId", 77L);
+
+ JsonNode config = MAPPER.readTree("""
+ {"alarmId": "${meta.alarmId}"}
+ """);
+ ActionResult result = action.executeAction(ctx, config);
+
+ assertTrue(result.isSuccess());
+ ArgumentCaptor cap = ArgumentCaptor.forClass(IotAlarmClearReqDTO.class);
+ verify(alarmRecordApi).clearAlarm(cap.capture());
+ assertEquals(77L, cap.getValue().getAlarmId());
+ }
+
+ @Test
+ void alarmClear_noAlarmId_returnsFailure() throws Exception {
+ AlarmClearAction action = new AlarmClearAction(alarmRecordApi);
+ RuleContext ctx = ctx(1L, 10L);
+
+ JsonNode config = MAPPER.readTree("{}");
+ ActionResult result = action.executeAction(ctx, config);
+
+ assertTrue(result.isFailure());
+ verifyNoInteractions(alarmRecordApi);
+ }
+
+ // ======================== DeviceServiceInvokeAction ========================
+
+ @Test
+ void deviceServiceInvoke_success_returnsSuccess() throws Exception {
+ IotDeviceServiceInvokeRespDTO resp = IotDeviceServiceInvokeRespDTO.builder()
+ .success(true).build();
+ when(deviceControlApi.invokeService(any())).thenReturn(CommonResult.success(resp));
+
+ DeviceServiceInvokeAction action = new DeviceServiceInvokeAction(deviceControlApi);
+ RuleContext ctx = ctx(1L, 10L);
+
+ JsonNode config = MAPPER.readTree("""
+ {"identifier": "playVoice", "params": {"text": "告警!"}}
+ """);
+ ActionResult result = action.executeAction(ctx, config);
+
+ assertTrue(result.isSuccess());
+ ArgumentCaptor cap = ArgumentCaptor.forClass(IotDeviceServiceInvokeReqDTO.class);
+ verify(deviceControlApi).invokeService(cap.capture());
+ assertEquals(10L, cap.getValue().getDeviceId());
+ assertEquals("playVoice", cap.getValue().getIdentifier());
+ }
+
+ @Test
+ void deviceServiceInvoke_missingIdentifier_returnsFailure() throws Exception {
+ DeviceServiceInvokeAction action = new DeviceServiceInvokeAction(deviceControlApi);
+ RuleContext ctx = ctx(1L, 10L);
+
+ JsonNode config = MAPPER.readTree("{}");
+ ActionResult result = action.executeAction(ctx, config);
+
+ assertTrue(result.isFailure());
+ verifyNoInteractions(deviceControlApi);
+ }
+
+ // ======================== DevicePropertySetAction ========================
+
+ @Test
+ void devicePropertySet_withProperties_resolveTemplateAndReturnSuccess() throws Exception {
+ DevicePropertySetAction action = new DevicePropertySetAction(templateResolver);
+ RuleContext ctx = ctx(1L, 10L);
+ IotDeviceMessage msg = new IotDeviceMessage();
+ ctx.setMessage(msg);
+
+ JsonNode config = MAPPER.readTree("""
+ {"properties": {"mode": "auto", "targetTemp": "28"}}
+ """);
+ ActionResult result = action.executeAction(ctx, config);
+
+ assertTrue(result.isSuccess());
+ assertNotNull(result.message());
+ assertTrue(result.message().contains("2"));
+ }
+
+ @Test
+ void devicePropertySet_missingProperties_returnsFailure() throws Exception {
+ DevicePropertySetAction action = new DevicePropertySetAction(templateResolver);
+ RuleContext ctx = ctx(1L, 10L);
+
+ JsonNode config = MAPPER.readTree("{}");
+ ActionResult result = action.executeAction(ctx, config);
+
+ assertTrue(result.isFailure());
+ }
+
+ // ======================== NotifyAction ========================
+
+ @Test
+ void notify_allChannelsOk_returnsSuccess() throws Exception {
+ NotifyAction action = new NotifyAction(templateResolver);
+ RuleContext ctx = ctx(1L, 10L);
+
+ JsonNode config = MAPPER.readTree("""
+ {
+ "channels": ["sms","email","in_app","webhook"],
+ "receivers": {"webhookUrl": "https://example.com"},
+ "template": {"title": "告警", "body": "消息体"}
+ }
+ """);
+ ActionResult result = action.executeAction(ctx, config);
+
+ assertTrue(result.isSuccess());
+ }
+
+ @Test
+ void notify_noChannels_returnsFailure() throws Exception {
+ NotifyAction action = new NotifyAction(templateResolver);
+ RuleContext ctx = ctx(1L, 10L);
+
+ JsonNode config = MAPPER.readTree("""
+ {"channels": [], "template": {"title": "T", "body": "B"}}
+ """);
+ ActionResult result = action.executeAction(ctx, config);
+
+ assertTrue(result.isFailure());
+ }
+
+ // ======================== NodeProvider bridge ========================
+
+ @Test
+ void actionProvider_executeViaNodeProviderBridge_convertsToNodeResult() throws Exception {
+ when(alarmRecordApi.triggerAlarm(any())).thenReturn(CommonResult.success(99L));
+ AlarmTriggerAction action = new AlarmTriggerAction(alarmRecordApi, templateResolver);
+ RuleContext ctx = ctx(1L, 10L);
+
+ String configJson = """
+ {"alarmConfigId": 1, "severity": 2}
+ """;
+ NodeResult result = action.execute(ctx, configJson);
+
+ assertEquals(RuleLinkRelationType.SUCCESS, result.getRelationType());
+ }
+
+ @Test
+ void actionProvider_badConfigJson_returnsFailureNodeResult() throws Exception {
+ AlarmTriggerAction action = new AlarmTriggerAction(alarmRecordApi, templateResolver);
+ RuleContext ctx = ctx(1L, 10L);
+
+ NodeResult result = action.execute(ctx, "{invalid json}");
+
+ assertEquals(RuleLinkRelationType.FAILURE, result.getRelationType());
+ verifyNoInteractions(alarmRecordApi);
+ }
+
+ // ======================== Helper ========================
+
+ private RuleContext ctx(Long chainId, Long deviceId) {
+ RuleContext ctx = new RuleContext();
+ ctx.setChainId(chainId);
+ ctx.setDeviceId(deviceId);
+ ctx.setProductId(100L);
+ ctx.setTenantId(1L);
+ ctx.setSubsystemId(1L);
+ ctx.setStartedAt(Instant.now());
+ return ctx;
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/branch/BranchExecutorTest.java b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/branch/BranchExecutorTest.java
new file mode 100644
index 00000000..2c2c8701
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/branch/BranchExecutorTest.java
@@ -0,0 +1,269 @@
+package com.viewsh.module.iot.rule.engine.branch;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.viewsh.module.iot.rule.condition.ConditionEvaluatorManager;
+import com.viewsh.module.iot.rule.engine.RuleContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.*;
+
+/**
+ * {@link BranchExecutor} 单元测试(7 个用例,B7 §6)。
+ *
+ * 使用 Mockito mock ConditionEvaluatorManager,不启动 Spring 容器。
+ * ActionExecutor 通过 lambda 记录执行了哪些 nodeId,供断言使用。
+ */
+class BranchExecutorTest {
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private ConditionEvaluatorManager conditionEvaluatorManager;
+ private BranchExecutor branchExecutor;
+ private RuleContext ctx;
+
+ /** 记录 actionExecutor 被调用的 (branchName, actionNodeId) 对 */
+ private List executedActions;
+
+ @BeforeEach
+ void setUp() {
+ conditionEvaluatorManager = mock(ConditionEvaluatorManager.class);
+ branchExecutor = new BranchExecutor(conditionEvaluatorManager);
+
+ ctx = new RuleContext();
+ ctx.setChainId(1L);
+ ctx.setTraceId("test-trace");
+
+ executedActions = new ArrayList<>();
+ }
+
+ /**
+ * 用例 1: if_match — [A] A=true → A 执行
+ */
+ @Test
+ void testIfMatch_singleBranchTrue() throws Exception {
+ when(conditionEvaluatorManager.evaluate(eq("expression"), any(), any())).thenReturn(true);
+
+ BranchConfiguration config = buildConfig(
+ branch("A", "expression", false, "nodeA")
+ );
+
+ List matched = branchExecutor.execute(ctx, config, recordingExecutor());
+
+ assertEquals(List.of("A"), matched);
+ assertEquals(List.of("nodeA"), executedActions);
+ }
+
+ /**
+ * 用例 2: if_else (A=true) — [A, null] A=true → A 执行,else 跳过
+ */
+ @Test
+ void testIfElse_firstBranchTrue_elseSkipped() throws Exception {
+ when(conditionEvaluatorManager.evaluate(eq("expression"), any(), any())).thenReturn(true);
+
+ BranchConfiguration config = buildConfig(
+ branch("A", "expression", false, "nodeA"),
+ elseBranch("else", "nodeElse")
+ );
+
+ List matched = branchExecutor.execute(ctx, config, recordingExecutor());
+
+ assertEquals(List.of("A"), matched);
+ assertEquals(List.of("nodeA"), executedActions);
+ assertFalse(executedActions.contains("nodeElse"), "else 分支不应执行");
+ }
+
+ /**
+ * 用例 3: if_else (A=false) — [A, null] A=false → else 执行
+ */
+ @Test
+ void testIfElse_firstBranchFalse_elseExecuted() throws Exception {
+ when(conditionEvaluatorManager.evaluate(eq("expression"), any(), any())).thenReturn(false);
+
+ BranchConfiguration config = buildConfig(
+ branch("A", "expression", false, "nodeA"),
+ elseBranch("else", "nodeElse")
+ );
+
+ List matched = branchExecutor.execute(ctx, config, recordingExecutor());
+
+ assertEquals(List.of("else"), matched);
+ assertFalse(executedActions.contains("nodeA"), "A 分支不应执行");
+ assertEquals(List.of("nodeElse"), executedActions);
+ }
+
+ /**
+ * 用例 4: else_if — [A, B] A=false, B=true → B 执行
+ */
+ @Test
+ void testElseIf_firstFalseSecondTrue() throws Exception {
+ // A=false, B=true
+ when(conditionEvaluatorManager.evaluate(eq("exprA"), any(), any())).thenReturn(false);
+ when(conditionEvaluatorManager.evaluate(eq("exprB"), any(), any())).thenReturn(true);
+
+ BranchConfiguration config = buildConfigWithTypes(
+ branchWithType("A", "exprA", false, "nodeA"),
+ branchWithType("B", "exprB", false, "nodeB")
+ );
+
+ List matched = branchExecutor.execute(ctx, config, recordingExecutor());
+
+ assertEquals(List.of("B"), matched);
+ assertFalse(executedActions.contains("nodeA"));
+ assertEquals(List.of("nodeB"), executedActions);
+ }
+
+ /**
+ * 用例 5: executeAnyway=true — [A(executeAnyway=true), B] A=true, B=true → A+B 都执行
+ */
+ @Test
+ void testExecuteAnyway_true_bothExecuted() throws Exception {
+ // A=true (executeAnyway=true), B=true
+ when(conditionEvaluatorManager.evaluate(eq("exprA"), any(), any())).thenReturn(true);
+ when(conditionEvaluatorManager.evaluate(eq("exprB"), any(), any())).thenReturn(true);
+
+ BranchConfiguration config = buildConfigWithTypes(
+ branchWithType("A", "exprA", true, "nodeA"), // executeAnyway=true
+ branchWithType("B", "exprB", false, "nodeB")
+ );
+
+ List matched = branchExecutor.execute(ctx, config, recordingExecutor());
+
+ assertEquals(List.of("A", "B"), matched);
+ assertTrue(executedActions.contains("nodeA"), "A 分支应执行");
+ assertTrue(executedActions.contains("nodeB"), "B 分支应执行");
+ }
+
+ /**
+ * 用例 6: executeAnyway=false — [A, B] A=true → 只 A 执行(B 跳过)
+ */
+ @Test
+ void testExecuteAnyway_false_onlyFirstExecuted() throws Exception {
+ when(conditionEvaluatorManager.evaluate(eq("exprA"), any(), any())).thenReturn(true);
+ when(conditionEvaluatorManager.evaluate(eq("exprB"), any(), any())).thenReturn(true);
+
+ BranchConfiguration config = buildConfigWithTypes(
+ branchWithType("A", "exprA", false, "nodeA"), // executeAnyway=false(默认)
+ branchWithType("B", "exprB", false, "nodeB")
+ );
+
+ List matched = branchExecutor.execute(ctx, config, recordingExecutor());
+
+ assertEquals(List.of("A"), matched);
+ assertTrue(executedActions.contains("nodeA"), "A 分支应执行");
+ assertFalse(executedActions.contains("nodeB"), "B 分支不应执行(已被 A 命中后跳过)");
+ }
+
+ /**
+ * 用例 7: all_no_match — [A, B] A=false, B=false → 都不执行(无 else)
+ */
+ @Test
+ void testAllNoMatch_noExecution() throws Exception {
+ when(conditionEvaluatorManager.evaluate(eq("exprA"), any(), any())).thenReturn(false);
+ when(conditionEvaluatorManager.evaluate(eq("exprB"), any(), any())).thenReturn(false);
+
+ BranchConfiguration config = buildConfigWithTypes(
+ branchWithType("A", "exprA", false, "nodeA"),
+ branchWithType("B", "exprB", false, "nodeB")
+ );
+
+ List matched = branchExecutor.execute(ctx, config, recordingExecutor());
+
+ assertTrue(matched.isEmpty(), "无分支命中");
+ assertTrue(executedActions.isEmpty(), "无 action 执行");
+ }
+
+ /**
+ * 额外:条件评估异常 → 跳过该分支(短路求值,不当失败),继续下一个
+ */
+ @Test
+ void testConditionException_branchSkipped() throws Exception {
+ when(conditionEvaluatorManager.evaluate(eq("exprA"), any(), any()))
+ .thenThrow(new RuntimeException("evaluator error"));
+ when(conditionEvaluatorManager.evaluate(eq("exprB"), any(), any())).thenReturn(true);
+
+ BranchConfiguration config = buildConfigWithTypes(
+ branchWithType("A", "exprA", false, "nodeA"),
+ branchWithType("B", "exprB", false, "nodeB")
+ );
+
+ // 不抛异常
+ List matched = assertDoesNotThrow(
+ () -> branchExecutor.execute(ctx, config, recordingExecutor()));
+
+ assertFalse(executedActions.contains("nodeA"), "A 条件异常,不执行");
+ assertTrue(executedActions.contains("nodeB"), "B 条件正常,执行");
+ assertEquals(List.of("B"), matched);
+ }
+
+ /**
+ * 额外:else 分支不在最后 → validate() 抛 IllegalArgumentException
+ */
+ @Test
+ void testElseNotLast_validationFails() {
+ BranchConfiguration config = buildConfig(
+ elseBranch("else", "nodeElse"), // else 不在最后
+ branch("A", "expression", false, "nodeA")
+ );
+
+ assertThrows(IllegalArgumentException.class, config::validate,
+ "else 分支不在最后时,validate() 应抛出异常");
+ }
+
+ // --- helpers ---
+
+ private BranchExecutor.ActionExecutor recordingExecutor() {
+ return (ctx, branchName, actionNodeId) -> executedActions.add(actionNodeId);
+ }
+
+ private BranchConfiguration buildConfig(BranchConfiguration.BranchItem... items) {
+ BranchConfiguration config = new BranchConfiguration();
+ config.setBranches(Arrays.asList(items));
+ return config;
+ }
+
+ private BranchConfiguration buildConfigWithTypes(BranchConfiguration.BranchItem... items) {
+ return buildConfig(items);
+ }
+
+ /** 带 condition 的分支(condition.type = "expression") */
+ private BranchConfiguration.BranchItem branch(String name, String conditionType,
+ boolean executeAnyway, String... actionIds) {
+ return branchWithType(name, conditionType, executeAnyway, actionIds);
+ }
+
+ /** 带 condition 的分支(condition.type 可自定义) */
+ private BranchConfiguration.BranchItem branchWithType(String name, String conditionType,
+ boolean executeAnyway, String... actionIds) {
+ BranchConfiguration.BranchItem item = new BranchConfiguration.BranchItem();
+ item.setName(name);
+ item.setExecuteAnyway(executeAnyway);
+ item.setActions(Arrays.asList(actionIds));
+
+ BranchConfiguration.BranchCondition condition = new BranchConfiguration.BranchCondition();
+ condition.setType(conditionType);
+ condition.setConfig(MAPPER.createObjectNode().put("expression", "dummy"));
+ item.setCondition(condition);
+
+ return item;
+ }
+
+ /** else 分支(condition=null) */
+ private BranchConfiguration.BranchItem elseBranch(String name, String... actionIds) {
+ BranchConfiguration.BranchItem item = new BranchConfiguration.BranchItem();
+ item.setName(name);
+ item.setCondition(null);
+ item.setActions(Arrays.asList(actionIds));
+ item.setExecuteAnyway(false);
+ return item;
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/api/alarm/IotAlarmRecordApiImpl.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/api/alarm/IotAlarmRecordApiImpl.java
new file mode 100644
index 00000000..45f3a482
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/api/alarm/IotAlarmRecordApiImpl.java
@@ -0,0 +1,62 @@
+package com.viewsh.module.iot.api.alarm;
+
+import com.viewsh.framework.common.pojo.CommonResult;
+import com.viewsh.module.iot.api.alarm.dto.IotAlarmClearReqDTO;
+import com.viewsh.module.iot.api.alarm.dto.IotAlarmTriggerReqDTO;
+import com.viewsh.module.iot.service.alarm.IotAlarmRecordService;
+import com.viewsh.module.iot.service.alarm.dto.AlarmStateTransitionRequest;
+import com.viewsh.module.iot.service.alarm.dto.AlarmTriggerRequest;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.annotation.Primary;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+
+import static com.viewsh.framework.common.pojo.CommonResult.success;
+
+/**
+ * IoT 告警记录 API 实现(供 rule 模块 FeignClient 调用)
+ *
+ * @author B6
+ */
+@RestController
+@Validated
+@Primary
+@Slf4j
+public class IotAlarmRecordApiImpl implements IotAlarmRecordApi {
+
+ @Resource
+ private IotAlarmRecordService alarmRecordService;
+
+ @Override
+ @PostMapping(PREFIX + "/trigger")
+ public CommonResult triggerAlarm(@RequestBody IotAlarmTriggerReqDTO reqDTO) {
+ AlarmTriggerRequest req = AlarmTriggerRequest.builder()
+ .deviceId(reqDTO.getDeviceId())
+ .alarmConfigId(reqDTO.getAlarmConfigId())
+ .severity(reqDTO.getSeverity())
+ .alarmName(reqDTO.getAlarmName())
+ .productId(reqDTO.getProductId())
+ .subsystemId(reqDTO.getSubsystemId())
+ .ruleChainId(reqDTO.getRuleChainId())
+ .sceneRuleId(reqDTO.getSceneRuleId())
+ .details(reqDTO.getDetails())
+ .build();
+ Long alarmId = alarmRecordService.triggerAlarm(req);
+ return success(alarmId);
+ }
+
+ @Override
+ @PostMapping(PREFIX + "/clear")
+ public CommonResult clearAlarm(@RequestBody IotAlarmClearReqDTO reqDTO) {
+ AlarmStateTransitionRequest req = AlarmStateTransitionRequest.builder()
+ .alarmId(reqDTO.getAlarmId())
+ .operator(reqDTO.getOperator())
+ .remark(reqDTO.getRemark())
+ .build();
+ alarmRecordService.clearAlarm(req);
+ return success(true);
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/migration/DataRuleMigrationController.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/migration/DataRuleMigrationController.java
new file mode 100644
index 00000000..68c43ad2
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/migration/DataRuleMigrationController.java
@@ -0,0 +1,62 @@
+package com.viewsh.module.iot.migration;
+
+import com.viewsh.framework.common.pojo.CommonResult;
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.Parameter;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import jakarta.annotation.Resource;
+import org.springframework.security.access.prepost.PreAuthorize;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.*;
+
+import java.util.List;
+
+import static com.viewsh.framework.common.pojo.CommonResult.success;
+
+/**
+ * B18 — DataRule 迁移 REST API
+ *
+ * 3 个端点(与 B17 对称):
+ *
+ * - POST /iot/migration/data-rule/dry-run — 预览迁移结果
+ * - POST /iot/migration/data-rule/execute — 执行迁移(幂等)
+ * - GET /iot/migration/data-rule/mapping — 查询已迁移映射关系
+ *
+ */
+@Tag(name = "管理后台 - IoT DataRule 迁移工具(B18)")
+@RestController
+@RequestMapping("/iot/migration/data-rule")
+@Validated
+public class DataRuleMigrationController {
+
+ @Resource
+ private DataRuleMigrator datRuleMigrator;
+
+ @PostMapping("/dry-run")
+ @Operation(summary = "预览 DataRule 迁移结果(不写库)",
+ description = "返回每条 v1 DataRule 将被转换成几个 v2 RuleChain 以及 chain 名称列表")
+ @PreAuthorize("@ss.hasPermission('iot:migration:data-rule:dry-run')")
+ public CommonResult> dryRun() {
+ return success(datRuleMigrator.dryRun());
+ }
+
+ @PostMapping("/execute")
+ @Operation(summary = "执行 DataRule 迁移(幂等)",
+ description = "将 v1 iot_data_rule + iot_data_sink 迁移为 v2 DAG RuleChain;已迁移的会被跳过")
+ @PreAuthorize("@ss.hasPermission('iot:migration:data-rule:execute')")
+ public CommonResult execute(
+ @RequestParam(value = "migrator", defaultValue = "system")
+ @Parameter(description = "操作人标识(审计用)", example = "admin")
+ String migrator) {
+ return success(datRuleMigrator.execute(migrator));
+ }
+
+ @GetMapping("/mapping")
+ @Operation(summary = "查询已迁移的映射关系",
+ description = "查询 iot_data_rule_migration 表中的所有映射记录")
+ @PreAuthorize("@ss.hasPermission('iot:migration:data-rule:mapping')")
+ public CommonResult> queryMapping() {
+ return success(datRuleMigrator.queryMappings());
+ }
+
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/migration/DataRuleMigrator.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/migration/DataRuleMigrator.java
new file mode 100644
index 00000000..34290d16
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/migration/DataRuleMigrator.java
@@ -0,0 +1,265 @@
+package com.viewsh.module.iot.migration;
+
+import com.viewsh.module.iot.dal.dataobject.rule.IotDataRuleDO;
+import com.viewsh.module.iot.dal.dataobject.rule.IotDataSinkDO;
+import com.viewsh.module.iot.dal.mysql.rule.IotDataRuleMapper;
+import com.viewsh.module.iot.dal.mysql.rule.IotDataSinkMapper;
+import com.viewsh.module.iot.migration.mapping.DataRuleToChainMapper;
+import com.viewsh.module.iot.rule.controller.admin.vo.IotRuleChainSaveReqVO;
+import com.viewsh.module.iot.rule.service.IotRuleChainService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.dao.DuplicateKeyException;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.time.LocalDateTime;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * B18 — DataRule + DataSink → v2 DAG RuleChain 迁移主服务
+ *
+ * 策略:
+ *
+ * - 读取所有 v1 data_rule
+ * - 通过 {@link DataRuleToChainMapper} 转换为 v2 chain 请求
+ * - 调用 {@link IotRuleChainService#createRuleChain} 写入 v2 chain
+ * - 向映射表 {@code iot_data_rule_migration} 写入记录(幂等:UK 冲突时跳过)
+ *
+ *
+ * 幂等:映射表唯一键 {@code uk_old_source (old_rule_id, source_index, tenant_id)},
+ * 重复执行时 INSERT IGNORE 跳过已迁移记录。
+ *
+ *
映射表 DDL(参考):
+ *
{@code
+ * CREATE TABLE iot_data_rule_migration (
+ * id BIGINT PRIMARY KEY AUTO_INCREMENT,
+ * old_rule_id BIGINT NOT NULL,
+ * new_chain_id BIGINT NOT NULL,
+ * source_index INT DEFAULT 0,
+ * migrated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
+ * migrator VARCHAR(64),
+ * tenant_id BIGINT NOT NULL,
+ * UNIQUE KEY uk_old_source (old_rule_id, source_index, tenant_id)
+ * ) COMMENT='DataRule 迁移映射';
+ * }
+ */
+@Service
+@Slf4j
+@RequiredArgsConstructor
+public class DataRuleMigrator {
+
+ private final IotDataRuleMapper dataRuleMapper;
+ private final IotDataSinkMapper dataSinkMapper;
+ private final DataRuleToChainMapper chainMapper;
+ private final IotRuleChainService ruleChainService;
+ private final JdbcTemplate jdbcTemplate;
+
+ // -------------------------------------------------------------------------
+ // Public API
+ // -------------------------------------------------------------------------
+
+ /**
+ * 预览(dry-run):不实际写库,仅返回每条 v1 rule 会生成几个 v2 chain
+ *
+ * @return 预览结果列表
+ */
+ public List dryRun() {
+ List rules = dataRuleMapper.selectList(null);
+ if (rules.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ // 批量加载所有 sink
+ Map sinkMap = loadAllSinks();
+
+ List results = new ArrayList<>();
+ for (IotDataRuleDO rule : rules) {
+ List sinks = resolveSinks(rule, sinkMap);
+ List candidates = chainMapper.toChainCandidates(rule, sinks);
+ results.add(new DryRunResult(
+ rule.getId(),
+ rule.getName(),
+ candidates.size(),
+ candidates.stream()
+ .map(c -> c.saveReqVO().getName())
+ .collect(Collectors.toList())
+ ));
+ }
+ return results;
+ }
+
+ /**
+ * 执行迁移(幂等):已迁移的 rule+sourceIndex 会被跳过
+ *
+ * @param migrator 操作人标识(用于审计)
+ * @return 执行结果汇总
+ */
+ @Transactional(rollbackFor = Exception.class)
+ public ExecuteResult execute(String migrator) {
+ List rules = dataRuleMapper.selectList(null);
+ if (rules.isEmpty()) {
+ return new ExecuteResult(0, 0, 0, Collections.emptyList());
+ }
+
+ Map sinkMap = loadAllSinks();
+
+ int totalRules = rules.size();
+ int migratedCount = 0;
+ int skippedCount = 0;
+ List errors = new ArrayList<>();
+
+ for (IotDataRuleDO rule : rules) {
+ try {
+ List sinks = resolveSinks(rule, sinkMap);
+ List candidates = chainMapper.toChainCandidates(rule, sinks);
+
+ for (DataRuleToChainMapper.ChainCandidate candidate : candidates) {
+ boolean inserted = persistCandidate(rule.getId(), candidate, migrator);
+ if (inserted) {
+ migratedCount++;
+ } else {
+ skippedCount++;
+ }
+ }
+ } catch (Exception ex) {
+ log.error("[B18] 迁移 DataRule(id={}) 失败", rule.getId(), ex);
+ errors.add("rule=" + rule.getId() + ": " + ex.getMessage());
+ }
+ }
+
+ return new ExecuteResult(totalRules, migratedCount, skippedCount, errors);
+ }
+
+ /**
+ * 查询已迁移的映射关系
+ *
+ * @return 映射记录列表
+ */
+ public List queryMappings() {
+ String sql = "SELECT id, old_rule_id, new_chain_id, source_index, migrated_at, migrator, tenant_id "
+ + "FROM iot_data_rule_migration ORDER BY old_rule_id, source_index";
+ return jdbcTemplate.query(sql, (rs, rowNum) -> new MappingRecord(
+ rs.getLong("id"),
+ rs.getLong("old_rule_id"),
+ rs.getLong("new_chain_id"),
+ rs.getInt("source_index"),
+ rs.getObject("migrated_at", LocalDateTime.class),
+ rs.getString("migrator"),
+ rs.getLong("tenant_id")
+ ));
+ }
+
+ // -------------------------------------------------------------------------
+ // Private helpers
+ // -------------------------------------------------------------------------
+
+ /**
+ * 尝试持久化单个 ChainCandidate:
+ *
+ * - 调用 createRuleChain 写入 v2 chain
+ * - INSERT IGNORE 写映射表
+ *
+ *
+ * @return true=新增迁移;false=已存在(幂等跳过)
+ */
+ private boolean persistCandidate(Long oldRuleId,
+ DataRuleToChainMapper.ChainCandidate candidate,
+ String migrator) {
+ // 先检查映射表是否已存在(幂等判断)
+ Integer existing = jdbcTemplate.queryForObject(
+ "SELECT COUNT(1) FROM iot_data_rule_migration WHERE old_rule_id = ? AND source_index = ?",
+ Integer.class,
+ oldRuleId, candidate.sourceIndex());
+ if (existing != null && existing > 0) {
+ log.info("[B18] DataRule(id={}) sourceIndex={} 已迁移,跳过", oldRuleId, candidate.sourceIndex());
+ return false;
+ }
+
+ // 创建 v2 chain(使用空 links 列表,节点本身携带 DAG 语义信息)
+ IotRuleChainSaveReqVO req = candidate.saveReqVO();
+ if (req.getLinks() == null) {
+ req.setLinks(Collections.emptyList());
+ }
+ Long newChainId = ruleChainService.createRuleChain(req);
+
+ // 写映射表
+ try {
+ jdbcTemplate.update(
+ "INSERT INTO iot_data_rule_migration (old_rule_id, new_chain_id, source_index, migrator, tenant_id) "
+ + "VALUES (?, ?, ?, ?, ?)",
+ oldRuleId, newChainId, candidate.sourceIndex(), migrator, 0L /* tenantId 由多租户框架注入,此处占位 */
+ );
+ } catch (DuplicateKeyException e) {
+ // 并发场景下极少出现,直接忽略
+ log.warn("[B18] 映射表 UK 冲突,DataRule(id={}) sourceIndex={},忽略", oldRuleId, candidate.sourceIndex());
+ }
+
+ log.info("[B18] DataRule(id={}) sourceIndex={} → RuleChain(id={})", oldRuleId, candidate.sourceIndex(), newChainId);
+ return true;
+ }
+
+ /** 批量加载所有 Sink,构成 id→DO 索引 */
+ private Map loadAllSinks() {
+ List allSinks = dataSinkMapper.selectList(null);
+ Map map = new HashMap<>();
+ for (IotDataSinkDO sink : allSinks) {
+ map.put(sink.getId(), sink);
+ }
+ return map;
+ }
+
+ /** 根据 rule.sinkIds 从 map 中取出对应 Sink 列表 */
+ private List resolveSinks(IotDataRuleDO rule, Map sinkMap) {
+ if (rule.getSinkIds() == null || rule.getSinkIds().isEmpty()) {
+ return Collections.emptyList();
+ }
+ return rule.getSinkIds().stream()
+ .map(sinkMap::get)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ }
+
+ // -------------------------------------------------------------------------
+ // Result DTOs
+ // -------------------------------------------------------------------------
+
+ /**
+ * dry-run 结果:每条 v1 rule 会生成几个 v2 chain
+ */
+ public record DryRunResult(
+ Long oldRuleId,
+ String oldRuleName,
+ int willCreateChainCount,
+ List chainNames
+ ) {
+ }
+
+ /**
+ * 执行结果汇总
+ */
+ public record ExecuteResult(
+ int totalRules,
+ int migratedCount,
+ int skippedCount,
+ List errors
+ ) {
+ }
+
+ /**
+ * 映射表记录 DTO
+ */
+ public record MappingRecord(
+ Long id,
+ Long oldRuleId,
+ Long newChainId,
+ Integer sourceIndex,
+ LocalDateTime migratedAt,
+ String migrator,
+ Long tenantId
+ ) {
+ }
+
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/migration/mapping/DataRuleToChainMapper.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/migration/mapping/DataRuleToChainMapper.java
new file mode 100644
index 00000000..47ac8b45
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/migration/mapping/DataRuleToChainMapper.java
@@ -0,0 +1,314 @@
+package com.viewsh.module.iot.migration.mapping;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.viewsh.module.iot.dal.dataobject.rule.IotDataRuleDO;
+import com.viewsh.module.iot.dal.dataobject.rule.IotDataSinkDO;
+import com.viewsh.module.iot.dal.dataobject.rule.config.*;
+import com.viewsh.module.iot.enums.rule.IotDataSinkTypeEnum;
+import com.viewsh.module.iot.rule.controller.admin.vo.IotRuleChainSaveReqVO;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * v1 DataRule + DataSink → v2 DAG RuleChain 映射器
+ *
+ * 映射规则(评审 B7):
+ *
+ * - http → http_push
+ * - rocketmq → mq_push (provider=rocketmq)
+ * - kafka → mq_push (provider=kafka)
+ * - rabbitmq → mq_push (provider=rabbitmq)
+ * - redis → redis_push
+ * - tcp → tcp_push
+ *
+ *
+ * 多 sourceConfig 处理策略(评审 A5):
+ *
+ * - 若所有 sourceConfig 的 productId 相同且 method 相同 → 合并为 1 chain(identifiers 数组合并)
+ * - 否则 → 拆分为多个 chain,名称加 "-source{n}"
+ *
+ */
+@Component
+@Slf4j
+@RequiredArgsConstructor
+public class DataRuleToChainMapper {
+
+ private final ObjectMapper objectMapper;
+
+ /**
+ * 将 1 个 v1 DataRule + 对应 Sinks 转换为 1 或 N 个 v2 RuleChain 请求 VO
+ *
+ * @param rule v1 数据流转规则
+ * @param sinks v1 数据目的列表(与 rule.sinkIds 对应)
+ * @return 转换结果列表,每个元素对应一个 chain(含 sourceIndex)
+ */
+ public List toChainCandidates(IotDataRuleDO rule, List sinks) {
+ List sources = rule.getSourceConfigs();
+ if (sources == null || sources.isEmpty()) {
+ log.warn("[B18] DataRule(id={}) 无 sourceConfigs,跳过", rule.getId());
+ return Collections.emptyList();
+ }
+
+ // 尝试合并策略:所有 source 的 productId 和 method 相同
+ boolean mergeable = canMerge(sources);
+ if (mergeable) {
+ // 合并为单 chain
+ IotRuleChainSaveReqVO req = buildChainReq(rule, sources, sinks, null);
+ return List.of(new ChainCandidate(0, req));
+ }
+
+ // 降级:每个 source 单独生成一个 chain
+ List candidates = new ArrayList<>();
+ for (int i = 0; i < sources.size(); i++) {
+ IotDataRuleDO.SourceConfig src = sources.get(i);
+ IotRuleChainSaveReqVO req = buildChainReq(rule, List.of(src), sinks, i);
+ candidates.add(new ChainCandidate(i, req));
+ }
+ return candidates;
+ }
+
+ /**
+ * 判断多个 sourceConfig 是否可以合并(productId 相同 + method 相同)
+ */
+ public boolean canMerge(List sources) {
+ if (sources.size() <= 1) {
+ return true;
+ }
+ Long firstProductId = sources.get(0).getProductId();
+ String firstMethod = sources.get(0).getMethod();
+ return sources.stream().allMatch(s ->
+ Objects.equals(s.getProductId(), firstProductId)
+ && Objects.equals(s.getMethod(), firstMethod));
+ }
+
+ /**
+ * 构建单个 RuleChain 请求 VO
+ *
+ * 节点顺序约定(匹配 IotRuleChainServiceImpl.validateNoCycle 的临时 ID 规则):
+ *
+ * - index 0 → trigger 节点,临时 key = -1
+ * - index 1..N → action 节点,临时 key = -(2), -(3), ...
+ *
+ * links 的 sourceNodeId/targetNodeId 使用上述临时 key(负数)。
+ *
+ * @param rule v1 规则
+ * @param sources 用于本 chain 的 sourceConfigs(1 个或合并多个)
+ * @param sinks v1 sinks
+ * @param sourceIndex 若为拆分模式,传序号(0-based);合并模式传 null
+ */
+ private IotRuleChainSaveReqVO buildChainReq(
+ IotDataRuleDO rule,
+ List sources,
+ List sinks,
+ Integer sourceIndex) {
+
+ String chainName = sourceIndex == null
+ ? rule.getName()
+ : rule.getName() + "-source" + (sourceIndex + 1);
+
+ IotRuleChainSaveReqVO req = new IotRuleChainSaveReqVO();
+ req.setName(chainName);
+ req.setDescription(rule.getDescription());
+ req.setType("DATA");
+ req.setPriority(10);
+ req.setDebugMode(false);
+
+ // 取第一个 source 的 productId/deviceId 作为 chain 级别过滤
+ IotDataRuleDO.SourceConfig firstSrc = sources.get(0);
+ req.setProductId(firstSrc.getProductId());
+ req.setDeviceId(firstSrc.getDeviceId());
+
+ List nodes = new ArrayList<>();
+ List links = new ArrayList<>();
+
+ // --- Trigger 节点(index=0,临时 key=-1)---
+ IotRuleChainSaveReqVO.NodeVO triggerNode = buildTriggerNode(sources);
+ triggerNode.setPositionX(100);
+ triggerNode.setPositionY(200);
+ nodes.add(triggerNode); // index 0
+
+ // --- Action 节点(index=1..N,临时 key=-(i+1))---
+ int actionY = 100;
+ int nodeIndex = 1; // 从 1 开始,trigger 占 0
+ for (IotDataSinkDO sink : sinks) {
+ IotRuleChainSaveReqVO.NodeVO actionNode = buildActionNode(sink);
+ if (actionNode == null) {
+ log.warn("[B18] DataSink(id={}, type={}) 无法映射,跳过", sink.getId(), sink.getType());
+ continue;
+ }
+ actionNode.setPositionX(400);
+ actionNode.setPositionY(actionY);
+ actionY += 120;
+ nodes.add(actionNode);
+
+ // 建立 trigger → action 的 link(临时 key)
+ // trigger 的临时 key = -(0+1) = -1
+ // action 的临时 key = -(nodeIndex+1)
+ IotRuleChainSaveReqVO.LinkVO link = new IotRuleChainSaveReqVO.LinkVO();
+ link.setSourceNodeId(-1L);
+ link.setTargetNodeId(-(long) (nodeIndex + 1));
+ link.setRelationType("Success");
+ link.setSortOrder(nodeIndex - 1);
+ links.add(link);
+
+ nodeIndex++;
+ }
+
+ req.setNodes(nodes);
+ req.setLinks(links);
+ return req;
+ }
+
+ /**
+ * 构建 Trigger 节点(device.property.trigger 或 device.event.trigger)
+ *
+ * 若 sources 均为同 productId,则 identifiers 合并为数组(JSON config)
+ */
+ private IotRuleChainSaveReqVO.NodeVO buildTriggerNode(List sources) {
+ IotRuleChainSaveReqVO.NodeVO node = new IotRuleChainSaveReqVO.NodeVO();
+ node.setCategory("trigger");
+ node.setName("设备数据触发器");
+
+ // 根据 method 决定 trigger 类型
+ String method = sources.get(0).getMethod();
+ String triggerType;
+ if (method != null && method.toLowerCase().contains("event")) {
+ triggerType = "device.event.trigger";
+ } else {
+ triggerType = "device.property.trigger";
+ }
+ node.setType(triggerType);
+
+ // 收集 identifiers
+ List identifiers = sources.stream()
+ .map(IotDataRuleDO.SourceConfig::getIdentifier)
+ .filter(Objects::nonNull)
+ .distinct()
+ .collect(Collectors.toList());
+
+ // 构建 configuration JSON
+ Map config = new LinkedHashMap<>();
+ config.put("productId", sources.get(0).getProductId());
+ config.put("deviceId", sources.get(0).getDeviceId());
+ config.put("method", method);
+ if (!identifiers.isEmpty()) {
+ config.put("identifiers", identifiers);
+ }
+
+ node.setConfiguration(toJson(config));
+ return node;
+ }
+
+ /**
+ * 将 v1 DataSink 映射为 v2 Action 节点
+ *
+ * @return null 表示无法映射(未知类型),调用方跳过
+ */
+ IotRuleChainSaveReqVO.NodeVO buildActionNode(IotDataSinkDO sink) {
+ if (sink.getType() == null || sink.getConfig() == null) {
+ return null;
+ }
+
+ IotRuleChainSaveReqVO.NodeVO node = new IotRuleChainSaveReqVO.NodeVO();
+ node.setCategory("action");
+ node.setName(sink.getName() != null ? sink.getName() : "推送动作");
+
+ int type = sink.getType();
+ Map config = new LinkedHashMap<>();
+
+ if (type == IotDataSinkTypeEnum.HTTP.getType()) {
+ // http → http_push
+ node.setType("http_push");
+ IotDataSinkHttpConfig http = (IotDataSinkHttpConfig) sink.getConfig();
+ config.put("url", http.getUrl());
+ config.put("method", http.getMethod());
+ config.put("headers", http.getHeaders());
+ config.put("body", http.getBody());
+
+ } else if (type == IotDataSinkTypeEnum.ROCKETMQ.getType()) {
+ // rocketmq → mq_push (provider=rocketmq)
+ node.setType("mq_push");
+ IotDataSinkRocketMQConfig rmq = (IotDataSinkRocketMQConfig) sink.getConfig();
+ config.put("provider", "rocketmq");
+ config.put("nameServer", rmq.getNameServer());
+ config.put("topic", rmq.getTopic());
+ config.put("tag", rmq.getTags());
+ config.put("group", rmq.getGroup());
+
+ } else if (type == IotDataSinkTypeEnum.KAFKA.getType()) {
+ // kafka → mq_push (provider=kafka)
+ node.setType("mq_push");
+ IotDataSinkKafkaConfig kafka = (IotDataSinkKafkaConfig) sink.getConfig();
+ config.put("provider", "kafka");
+ config.put("bootstrapServers", kafka.getBootstrapServers());
+ config.put("topic", kafka.getTopic());
+
+ } else if (type == IotDataSinkTypeEnum.RABBITMQ.getType()) {
+ // rabbitmq → mq_push (provider=rabbitmq)
+ node.setType("mq_push");
+ IotDataSinkRabbitMQConfig rmq = (IotDataSinkRabbitMQConfig) sink.getConfig();
+ config.put("provider", "rabbitmq");
+ config.put("host", rmq.getHost());
+ config.put("port", rmq.getPort());
+ config.put("exchange", rmq.getExchange());
+ config.put("routingKey", rmq.getRoutingKey());
+
+ } else if (type == IotDataSinkTypeEnum.REDIS.getType()) {
+ // redis → redis_push
+ node.setType("redis_push");
+ IotDataSinkRedisConfig redis = (IotDataSinkRedisConfig) sink.getConfig();
+ config.put("host", redis.getHost());
+ config.put("port", redis.getPort());
+ config.put("channel", redis.getTopic());
+ config.put("key", redis.getTopic());
+ config.put("dataStructure", redis.getDataStructure());
+
+ } else if (type == IotDataSinkTypeEnum.TCP.getType()) {
+ // tcp → tcp_push
+ node.setType("tcp_push");
+ IotDataSinkTcpConfig tcp = (IotDataSinkTcpConfig) sink.getConfig();
+ config.put("host", tcp.getHost());
+ config.put("port", tcp.getPort());
+
+ } else {
+ // 未知类型(WebSocket/MQTT/DATABASE 暂不支持)
+ log.warn("[B18] DataSink type={} 暂不支持迁移,跳过 sinkId={}", type, sink.getId());
+ return null;
+ }
+
+ node.setConfiguration(toJson(config));
+ return node;
+ }
+
+ /**
+ * 将 Map 序列化为 JSON 字符串(迁移工具内部使用)
+ */
+ private String toJson(Map map) {
+ try {
+ return objectMapper.writeValueAsString(map);
+ } catch (Exception e) {
+ log.error("[B18] JSON 序列化失败", e);
+ return "{}";
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // 内部 DTO
+ // -------------------------------------------------------------------------
+
+ /**
+ * 单次迁移候选(对应一个将要创建的 v2 chain)
+ */
+ public record ChainCandidate(
+ /** 对应 v1 sourceConfigs 的序号(合并模式固定为 0) */
+ int sourceIndex,
+ /** 待创建的 chain 请求 VO */
+ IotRuleChainSaveReqVO saveReqVO
+ ) {
+ }
+
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/migration/DataRuleMigratorTest.java b/viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/migration/DataRuleMigratorTest.java
new file mode 100644
index 00000000..0449e8a6
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/migration/DataRuleMigratorTest.java
@@ -0,0 +1,427 @@
+package com.viewsh.module.iot.migration;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.viewsh.framework.test.core.ut.BaseMockitoUnitTest;
+import com.viewsh.module.iot.dal.dataobject.rule.IotDataRuleDO;
+import com.viewsh.module.iot.dal.dataobject.rule.IotDataSinkDO;
+import com.viewsh.module.iot.dal.dataobject.rule.config.*;
+import com.viewsh.module.iot.dal.mysql.rule.IotDataRuleMapper;
+import com.viewsh.module.iot.dal.mysql.rule.IotDataSinkMapper;
+import com.viewsh.module.iot.enums.rule.IotDataSinkTypeEnum;
+import com.viewsh.module.iot.migration.mapping.DataRuleToChainMapper;
+import com.viewsh.module.iot.rule.controller.admin.vo.IotRuleChainSaveReqVO;
+import com.viewsh.module.iot.rule.service.IotRuleChainService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.springframework.dao.DuplicateKeyException;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.RowMapper;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * B18 — {@link DataRuleMigrator} 单元测试
+ *
+ * 6 个测试用例:
+ *
+ * - single_source_http — 1 source + 1 http sink → 1 chain (Trigger + HttpPush)
+ * - multi_sink — 1 source + 5 sinks → 1 chain (Trigger + 5 Actions)
+ * - multi_source_mergeable — 2 sources 同 productId + method → 合并为 1 chain
+ * - multi_source_split — 2 sources 不同 productId → 拆为 2 chain
+ * - mq_provider_preserve — sink=rocketmq → mq_push, config.provider="rocketmq"
+ * - idempotent — 重跑 → 跳过(映射表 UK 冲突不抛错)
+ *
+ */
+class DataRuleMigratorTest extends BaseMockitoUnitTest {
+
+ @Mock
+ private IotDataRuleMapper dataRuleMapper;
+
+ @Mock
+ private IotDataSinkMapper dataSinkMapper;
+
+ @Mock
+ private IotRuleChainService ruleChainService;
+
+ @Mock
+ private JdbcTemplate jdbcTemplate;
+
+ private DataRuleToChainMapper chainMapper;
+ private DataRuleMigrator migrator;
+
+ @BeforeEach
+ void setUp() {
+ chainMapper = new DataRuleToChainMapper(new ObjectMapper());
+ migrator = new DataRuleMigrator(dataRuleMapper, dataSinkMapper, chainMapper, ruleChainService, jdbcTemplate);
+ }
+
+ // =========================================================================
+ // 用例 1:single_source_http
+ // =========================================================================
+ @Test
+ @DisplayName("single_source_http: 1 source + 1 http sink → 1 chain with Trigger + HttpPush action")
+ void testSingleSourceHttp() {
+ // Given
+ Long ruleId = 1L;
+ Long sinkId = 100L;
+
+ IotDataRuleDO rule = buildRule(ruleId, "HTTP 规则",
+ List.of(buildSource(10L, 101L, "testProp", "property")),
+ List.of(sinkId));
+
+ IotDataSinkDO httpSink = buildSink(sinkId, "HTTP推送", IotDataSinkTypeEnum.HTTP.getType(), buildHttpConfig());
+
+ mockMappers(List.of(rule), List.of(httpSink));
+ mockMigrationTableEmpty(ruleId, 0);
+ when(ruleChainService.createRuleChain(any())).thenReturn(200L);
+ mockInsertMapping();
+
+ // When
+ DataRuleMigrator.ExecuteResult result = migrator.execute("test");
+
+ // Then
+ assertThat(result.totalRules()).isEqualTo(1);
+ assertThat(result.migratedCount()).isEqualTo(1);
+ assertThat(result.skippedCount()).isEqualTo(0);
+ assertThat(result.errors()).isEmpty();
+
+ // 验证 createRuleChain 被调用,且 chain type=DATA
+ ArgumentCaptor captor = ArgumentCaptor.forClass(IotRuleChainSaveReqVO.class);
+ verify(ruleChainService).createRuleChain(captor.capture());
+ IotRuleChainSaveReqVO req = captor.getValue();
+ assertThat(req.getType()).isEqualTo("DATA");
+ // trigger 节点
+ IotRuleChainSaveReqVO.NodeVO trigger = req.getNodes().stream()
+ .filter(n -> "trigger".equals(n.getCategory())).findFirst().orElseThrow();
+ assertThat(trigger.getType()).startsWith("device.");
+ // action 节点
+ IotRuleChainSaveReqVO.NodeVO action = req.getNodes().stream()
+ .filter(n -> "action".equals(n.getCategory())).findFirst().orElseThrow();
+ assertThat(action.getType()).isEqualTo("http_push");
+ }
+
+ // =========================================================================
+ // 用例 2:multi_sink
+ // =========================================================================
+ @Test
+ @DisplayName("multi_sink: 1 source + 5 sinks → 1 chain with Trigger + 5 action nodes")
+ void testMultiSink() {
+ // Given
+ Long ruleId = 2L;
+ List sinkIds = List.of(201L, 202L, 203L, 204L, 205L);
+
+ IotDataRuleDO rule = buildRule(ruleId, "多 Sink 规则",
+ List.of(buildSource(10L, 0L, null, "property")),
+ sinkIds);
+
+ List sinks = List.of(
+ buildSink(201L, "HTTP", IotDataSinkTypeEnum.HTTP.getType(), buildHttpConfig()),
+ buildSink(202L, "RocketMQ", IotDataSinkTypeEnum.ROCKETMQ.getType(), buildRocketMQConfig()),
+ buildSink(203L, "Kafka", IotDataSinkTypeEnum.KAFKA.getType(), buildKafkaConfig()),
+ buildSink(204L, "RabbitMQ", IotDataSinkTypeEnum.RABBITMQ.getType(), buildRabbitMQConfig()),
+ buildSink(205L, "Redis", IotDataSinkTypeEnum.REDIS.getType(), buildRedisConfig())
+ );
+
+ mockMappers(List.of(rule), sinks);
+ mockMigrationTableEmpty(ruleId, 0);
+ when(ruleChainService.createRuleChain(any())).thenReturn(300L);
+ mockInsertMapping();
+
+ // When
+ DataRuleMigrator.ExecuteResult result = migrator.execute("test");
+
+ // Then
+ assertThat(result.migratedCount()).isEqualTo(1);
+ ArgumentCaptor captor = ArgumentCaptor.forClass(IotRuleChainSaveReqVO.class);
+ verify(ruleChainService).createRuleChain(captor.capture());
+ IotRuleChainSaveReqVO req = captor.getValue();
+
+ long triggerCount = req.getNodes().stream().filter(n -> "trigger".equals(n.getCategory())).count();
+ long actionCount = req.getNodes().stream().filter(n -> "action".equals(n.getCategory())).count();
+ assertThat(triggerCount).isEqualTo(1);
+ assertThat(actionCount).isEqualTo(5);
+ }
+
+ // =========================================================================
+ // 用例 3:multi_source_mergeable
+ // =========================================================================
+ @Test
+ @DisplayName("multi_source_mergeable: 2 sources 同 productId + method → 合并为 1 chain")
+ void testMultiSourceMergeable() {
+ // Given - same productId + method
+ Long ruleId = 3L;
+ Long sinkId = 301L;
+
+ IotDataRuleDO rule = buildRule(ruleId, "可合并多源",
+ List.of(
+ buildSource(50L, 0L, "temp", "property"),
+ buildSource(50L, 0L, "humidity", "property")
+ ),
+ List.of(sinkId));
+
+ IotDataSinkDO sink = buildSink(sinkId, "HTTP", IotDataSinkTypeEnum.HTTP.getType(), buildHttpConfig());
+
+ mockMappers(List.of(rule), List.of(sink));
+ mockMigrationTableEmpty(ruleId, 0);
+ when(ruleChainService.createRuleChain(any())).thenReturn(400L);
+ mockInsertMapping();
+
+ // When
+ DataRuleMigrator.ExecuteResult result = migrator.execute("test");
+
+ // Then: only 1 chain created (merged)
+ assertThat(result.migratedCount()).isEqualTo(1);
+ verify(ruleChainService, times(1)).createRuleChain(any());
+
+ // 验证 trigger config 中 identifiers 包含两个值
+ ArgumentCaptor captor = ArgumentCaptor.forClass(IotRuleChainSaveReqVO.class);
+ verify(ruleChainService).createRuleChain(captor.capture());
+ IotRuleChainSaveReqVO req = captor.getValue();
+ IotRuleChainSaveReqVO.NodeVO trigger = req.getNodes().stream()
+ .filter(n -> "trigger".equals(n.getCategory())).findFirst().orElseThrow();
+ assertThat(trigger.getConfiguration()).contains("temp").contains("humidity");
+ }
+
+ // =========================================================================
+ // 用例 4:multi_source_split
+ // =========================================================================
+ @Test
+ @DisplayName("multi_source_split: 2 sources 不同 productId → 拆为 2 chain")
+ void testMultiSourceSplit() {
+ // Given - different productId
+ Long ruleId = 4L;
+ Long sinkId = 401L;
+
+ IotDataRuleDO rule = buildRule(ruleId, "拆分多源",
+ List.of(
+ buildSource(60L, 0L, "temp", "property"),
+ buildSource(70L, 0L, "temp", "property")
+ ),
+ List.of(sinkId));
+
+ IotDataSinkDO sink = buildSink(sinkId, "HTTP", IotDataSinkTypeEnum.HTTP.getType(), buildHttpConfig());
+
+ mockMappers(List.of(rule), List.of(sink));
+ // source_index 0 未迁移,source_index 1 未迁移
+ mockMigrationTableEmpty(ruleId, 0);
+ mockMigrationTableEmpty(ruleId, 1);
+ when(ruleChainService.createRuleChain(any())).thenReturn(500L).thenReturn(501L);
+ mockInsertMapping();
+
+ // When
+ DataRuleMigrator.ExecuteResult result = migrator.execute("test");
+
+ // Then: 2 chains created (split)
+ assertThat(result.migratedCount()).isEqualTo(2);
+ verify(ruleChainService, times(2)).createRuleChain(any());
+
+ // 验证 chain 名称带序号
+ ArgumentCaptor captor = ArgumentCaptor.forClass(IotRuleChainSaveReqVO.class);
+ verify(ruleChainService, times(2)).createRuleChain(captor.capture());
+ List reqs = captor.getAllValues();
+ assertThat(reqs.get(0).getName()).endsWith("-source1");
+ assertThat(reqs.get(1).getName()).endsWith("-source2");
+ }
+
+ // =========================================================================
+ // 用例 5:mq_provider_preserve
+ // =========================================================================
+ @Test
+ @DisplayName("mq_provider_preserve: sink=rocketmq → action type=mq_push, config.provider=rocketmq")
+ void testMqProviderPreserve() {
+ // Given
+ Long ruleId = 5L;
+ Long sinkId = 501L;
+
+ IotDataRuleDO rule = buildRule(ruleId, "RocketMQ 规则",
+ List.of(buildSource(10L, 0L, "temp", "property")),
+ List.of(sinkId));
+
+ IotDataSinkDO rmqSink = buildSink(sinkId, "RMQ", IotDataSinkTypeEnum.ROCKETMQ.getType(), buildRocketMQConfig());
+
+ mockMappers(List.of(rule), List.of(rmqSink));
+ mockMigrationTableEmpty(ruleId, 0);
+ when(ruleChainService.createRuleChain(any())).thenReturn(600L);
+ mockInsertMapping();
+
+ // When
+ migrator.execute("test");
+
+ // Then: action type=mq_push + provider=rocketmq
+ ArgumentCaptor captor = ArgumentCaptor.forClass(IotRuleChainSaveReqVO.class);
+ verify(ruleChainService).createRuleChain(captor.capture());
+ IotRuleChainSaveReqVO req = captor.getValue();
+
+ IotRuleChainSaveReqVO.NodeVO action = req.getNodes().stream()
+ .filter(n -> "action".equals(n.getCategory())).findFirst().orElseThrow();
+ assertThat(action.getType()).isEqualTo("mq_push");
+ assertThat(action.getConfiguration()).contains("\"provider\"").contains("rocketmq");
+ }
+
+ // =========================================================================
+ // 用例 6:idempotent
+ // =========================================================================
+ @Test
+ @DisplayName("idempotent: 重跑时映射表已存在 → 跳过,不重复创建 chain")
+ void testIdempotent() {
+ // Given
+ Long ruleId = 6L;
+ Long sinkId = 601L;
+
+ IotDataRuleDO rule = buildRule(ruleId, "幂等测试",
+ List.of(buildSource(10L, 0L, "temp", "property")),
+ List.of(sinkId));
+
+ IotDataSinkDO sink = buildSink(sinkId, "HTTP", IotDataSinkTypeEnum.HTTP.getType(), buildHttpConfig());
+
+ mockMappers(List.of(rule), List.of(sink));
+
+ // 模拟:映射表已存在该记录(COUNT=1)
+ when(jdbcTemplate.queryForObject(
+ contains("COUNT(1)"),
+ eq(Integer.class),
+ eq(ruleId), eq(0)
+ )).thenReturn(1);
+
+ // When
+ DataRuleMigrator.ExecuteResult result = migrator.execute("test");
+
+ // Then: skipped=1, createRuleChain 不被调用
+ assertThat(result.skippedCount()).isEqualTo(1);
+ assertThat(result.migratedCount()).isEqualTo(0);
+ verify(ruleChainService, never()).createRuleChain(any());
+ }
+
+ // =========================================================================
+ // Helper: DataRuleToChainMapper 直接测试(canMerge 逻辑)
+ // =========================================================================
+ @Test
+ @DisplayName("canMerge: 相同 productId + method 可合并")
+ void testCanMerge_same() {
+ DataRuleToChainMapper m = new DataRuleToChainMapper(new ObjectMapper());
+ List sources = List.of(
+ buildSource(1L, 0L, "a", "property"),
+ buildSource(1L, 0L, "b", "property")
+ );
+ assertThat(m.canMerge(sources)).isTrue();
+ }
+
+ @Test
+ @DisplayName("canMerge: 不同 productId 不可合并")
+ void testCanMerge_diff() {
+ DataRuleToChainMapper m = new DataRuleToChainMapper(new ObjectMapper());
+ List sources = List.of(
+ buildSource(1L, 0L, "a", "property"),
+ buildSource(2L, 0L, "a", "property")
+ );
+ assertThat(m.canMerge(sources)).isFalse();
+ }
+
+ // =========================================================================
+ // Mock helpers
+ // =========================================================================
+
+ private void mockMappers(List rules, List sinks) {
+ when(dataRuleMapper.selectList(null)).thenReturn(rules);
+ when(dataSinkMapper.selectList(null)).thenReturn(sinks);
+ }
+
+ /** 模拟映射表 COUNT 查询返回 0(未迁移) */
+ private void mockMigrationTableEmpty(Long ruleId, int sourceIndex) {
+ when(jdbcTemplate.queryForObject(
+ contains("COUNT(1)"),
+ eq(Integer.class),
+ eq(ruleId), eq(sourceIndex)
+ )).thenReturn(0);
+ }
+
+ /** 模拟 INSERT 映射表(正常写入) */
+ private void mockInsertMapping() {
+ when(jdbcTemplate.update(anyString(), any(), any(), any(), any(), any())).thenReturn(1);
+ }
+
+ // =========================================================================
+ // DO builders
+ // =========================================================================
+
+ private IotDataRuleDO buildRule(Long id, String name,
+ List sources,
+ List sinkIds) {
+ IotDataRuleDO rule = new IotDataRuleDO();
+ rule.setId(id);
+ rule.setName(name);
+ rule.setSourceConfigs(sources);
+ rule.setSinkIds(sinkIds);
+ return rule;
+ }
+
+ private IotDataRuleDO.SourceConfig buildSource(Long productId, Long deviceId, String identifier, String method) {
+ IotDataRuleDO.SourceConfig src = new IotDataRuleDO.SourceConfig();
+ src.setProductId(productId);
+ src.setDeviceId(deviceId);
+ src.setIdentifier(identifier);
+ src.setMethod(method);
+ return src;
+ }
+
+ private IotDataSinkDO buildSink(Long id, String name, Integer type, IotAbstractDataSinkConfig config) {
+ IotDataSinkDO sink = new IotDataSinkDO();
+ sink.setId(id);
+ sink.setName(name);
+ sink.setType(type);
+ sink.setConfig(config);
+ return sink;
+ }
+
+ // Config builders
+ private IotDataSinkHttpConfig buildHttpConfig() {
+ IotDataSinkHttpConfig cfg = new IotDataSinkHttpConfig();
+ cfg.setUrl("http://example.com/api");
+ cfg.setMethod("POST");
+ cfg.setBody("{\"msg\":\"hello\"}");
+ return cfg;
+ }
+
+ private IotDataSinkRocketMQConfig buildRocketMQConfig() {
+ IotDataSinkRocketMQConfig cfg = new IotDataSinkRocketMQConfig();
+ cfg.setNameServer("127.0.0.1:9876");
+ cfg.setTopic("iot-topic");
+ cfg.setTags("tag1");
+ cfg.setGroup("iot-group");
+ return cfg;
+ }
+
+ private IotDataSinkKafkaConfig buildKafkaConfig() {
+ IotDataSinkKafkaConfig cfg = new IotDataSinkKafkaConfig();
+ cfg.setBootstrapServers("127.0.0.1:9092");
+ cfg.setTopic("iot-kafka-topic");
+ return cfg;
+ }
+
+ private IotDataSinkRabbitMQConfig buildRabbitMQConfig() {
+ IotDataSinkRabbitMQConfig cfg = new IotDataSinkRabbitMQConfig();
+ cfg.setHost("127.0.0.1");
+ cfg.setPort(5672);
+ cfg.setExchange("iot-exchange");
+ cfg.setRoutingKey("iot.key");
+ return cfg;
+ }
+
+ private IotDataSinkRedisConfig buildRedisConfig() {
+ IotDataSinkRedisConfig cfg = new IotDataSinkRedisConfig();
+ cfg.setHost("127.0.0.1");
+ cfg.setPort(6379);
+ cfg.setTopic("iot-channel");
+ return cfg;
+ }
+
+}