com.viewsh
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/dal/dataobject/enums/RuleNodeCategory.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/dal/dataobject/enums/RuleNodeCategory.java
index 646e3588..632c9845 100644
--- a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/dal/dataobject/enums/RuleNodeCategory.java
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/dal/dataobject/enums/RuleNodeCategory.java
@@ -22,4 +22,13 @@ public enum RuleNodeCategory {
private final String label;
+ public static RuleNodeCategory of(String value) {
+ for (RuleNodeCategory c : values()) {
+ if (c.value.equals(value)) {
+ return c;
+ }
+ }
+ return null;
+ }
+
}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/ChainIndex.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/ChainIndex.java
new file mode 100644
index 00000000..c2d5a8c2
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/ChainIndex.java
@@ -0,0 +1,100 @@
+package com.viewsh.module.iot.rule.engine;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 规则链三层绑定索引(subsystemId, productId, deviceId)+ 匹配去重。
+ *
+ * 按"最左匹配"组合预建 4 种索引 key(评审 B3 + §十一-B):
+ *
+ * key1: (sub, prod, dev) — 最细粒度:指定设备专属规则
+ * key2: (sub, prod, *) — 产品级
+ * key3: (sub, *, *) — 子系统级
+ * key4: (*, *, *) — 全局规则
+ *
+ *
+ * {@link #match(Long, Long, Long, Long)} 返回去重 + 按 priority ASC 排序后的规则链列表。
+ *
+ *
线程安全:加载 / 更新用整体替换,读取无锁(ConcurrentHashMap)。
+ * 运行时 B8 负责增量驱逐 + 重建。
+ */
+@Slf4j
+public class ChainIndex {
+
+ /** tenantId 前缀 → (key → chains);key 由 (sub,prod,dev) 组合,"_" 表示通配 */
+ private final Map>> indexPerTenant = new ConcurrentHashMap<>();
+
+ /**
+ * 全量加载 / 重建某租户的索引(B8 调用)。
+ */
+ public void load(Long tenantId, Collection chains) {
+ Map> fresh = new HashMap<>();
+ for (CompiledRuleChain c : chains) {
+ String k = key(c.getSubsystemId(), c.getProductId(), c.getDeviceId());
+ fresh.computeIfAbsent(k, x -> new ArrayList<>()).add(c);
+ }
+ // 冻结为不可变,保护读端
+ Map> frozen = new HashMap<>();
+ fresh.forEach((k, list) -> frozen.put(k, List.copyOf(list)));
+ indexPerTenant.put(prefix(tenantId), Collections.unmodifiableMap(frozen));
+ log.info("[ChainIndex] tenant={} 载入 {} 条规则链,{} 个 key 组合", tenantId, chains.size(), frozen.size());
+ }
+
+ public void evictTenant(Long tenantId) {
+ indexPerTenant.remove(prefix(tenantId));
+ }
+
+ /**
+ * 按三层 ID 查出所有匹配规则链。命中 4 种 key 组合,去重后按 priority ASC 排序。
+ */
+ public List match(Long tenantId, Long subsystemId, Long productId, Long deviceId) {
+ Map> tenantIndex = tenantIndex(tenantId);
+ if (tenantIndex.isEmpty()) return Collections.emptyList();
+
+ List all = new ArrayList<>();
+ appendKey(all, tenantIndex, subsystemId, productId, deviceId);
+ appendKey(all, tenantIndex, subsystemId, productId, null);
+ appendKey(all, tenantIndex, subsystemId, null, null);
+ appendKey(all, tenantIndex, null, null, null);
+
+ if (all.isEmpty()) return Collections.emptyList();
+
+ // 【评审 B3】LinkedHashMap 去重,保留首个出现的(按上面顺序,更具体的先匹配)
+ Map dedup = new LinkedHashMap<>();
+ for (CompiledRuleChain c : all) {
+ dedup.putIfAbsent(c.getId(), c);
+ }
+
+ return dedup.values().stream()
+ .sorted(Comparator.comparingInt(CompiledRuleChain::getPriority))
+ .toList();
+ }
+
+ /** 单元测试辅助:查询当前索引条目数 */
+ public int sizeOf(Long tenantId) {
+ return tenantIndex(tenantId).values().stream().mapToInt(List::size).sum();
+ }
+
+ private Map> tenantIndex(Long tenantId) {
+ Map> m = indexPerTenant.get(prefix(tenantId));
+ return m == null ? Collections.emptyMap() : m;
+ }
+
+ private void appendKey(List target,
+ Map> tenantIndex,
+ Long sub, Long prod, Long dev) {
+ List list = tenantIndex.get(key(sub, prod, dev));
+ if (list != null) target.addAll(list);
+ }
+
+ private static String prefix(Long tenantId) {
+ return tenantId == null ? "_" : tenantId.toString();
+ }
+
+ private static String key(Long sub, Long prod, Long dev) {
+ return (sub == null ? "_" : sub) + ":" + (prod == null ? "_" : prod) + ":" + (dev == null ? "_" : dev);
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/CompiledLink.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/CompiledLink.java
new file mode 100644
index 00000000..be99ee14
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/CompiledLink.java
@@ -0,0 +1,21 @@
+package com.viewsh.module.iot.rule.engine;
+
+import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleLinkRelationType;
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * 编译后规则链连线(rule_link 的内存态)。
+ */
+@Data
+@Builder
+public class CompiledLink {
+
+ private Long id;
+ private String sourceNodeId;
+ private String targetNodeId;
+ private RuleLinkRelationType relationType;
+ /** 连线上的附加表达式条件(可选,JSON 原文;解析延后到节点内) */
+ private String condition;
+ private int sortOrder;
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/CompiledNode.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/CompiledNode.java
new file mode 100644
index 00000000..06f2ed2c
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/CompiledNode.java
@@ -0,0 +1,22 @@
+package com.viewsh.module.iot.rule.engine;
+
+import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleNodeCategory;
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * 编译后规则节点(rule_node 的内存态)。
+ *
+ * 节点执行由 {@link NodeProviderRegistry} 按 (category, type) 分发到对应的 {@link NodeProvider}。
+ */
+@Data
+@Builder
+public class CompiledNode {
+
+ private String id;
+ private String name;
+ private RuleNodeCategory category;
+ private String type;
+ /** 节点配置 JSON 原文(由 Provider 自行解析) */
+ private String configuration;
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/CompiledRuleChain.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/CompiledRuleChain.java
new file mode 100644
index 00000000..f4dd6f26
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/CompiledRuleChain.java
@@ -0,0 +1,51 @@
+package com.viewsh.module.iot.rule.engine;
+
+import lombok.Builder;
+import lombok.Data;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 编译后规则链(rule_chain + rule_node + rule_link 的内存态)。
+ *
+ *
由 {@code RuleChainCompiler} 从 DO 构造,供 DagExecutor 快速遍历:
+ *
+ * - {@link #firstNode}:唯一 Trigger 入口(B2 Service 层保证单 Trigger)
+ * - {@link #nodesById}:按 id 索引所有节点
+ * - {@link #outgoingByNode}:每个节点的出边列表(已按 sortOrder 排序)
+ *
+ */
+@Data
+@Builder
+public class CompiledRuleChain {
+
+ private Long id;
+ private String name;
+ private Long tenantId;
+
+ /** 评审 A5:默认 100,ASC 排序 */
+ private int priority;
+ /** 评审 B9:乐观锁 / 多实例缓存一致性校验 */
+ private long version;
+ private boolean debugMode;
+
+ /** 三层绑定(评审 §十一-B) */
+ private Long subsystemId;
+ private Long productId;
+ private Long deviceId;
+
+ private CompiledNode firstNode;
+ private Map nodesById;
+ private Map> outgoingByNode;
+
+ public List outgoingOf(String nodeId) {
+ if (outgoingByNode == null) return Collections.emptyList();
+ return outgoingByNode.getOrDefault(nodeId, Collections.emptyList());
+ }
+
+ public CompiledNode nodeById(String nodeId) {
+ return nodesById == null ? null : nodesById.get(nodeId);
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/DagExecutor.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/DagExecutor.java
new file mode 100644
index 00000000..c436cd41
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/DagExecutor.java
@@ -0,0 +1,105 @@
+package com.viewsh.module.iot.rule.engine;
+
+import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleLinkRelationType;
+import com.viewsh.module.iot.rule.engine.exception.RuleChainException;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Timer;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.*;
+
+/**
+ * DAG 规则链遍历执行器。BFS 按 relation_type 选择 outgoing links。
+ *
+ * 约束(上游保证,本执行器信任):
+ *
+ * - B2 Service 层保证 DAG 无环(DFS 校验)
+ * - B2 Service 层保证单 Trigger(CompiledRuleChain.firstNode 是唯一入口)
+ * - relation_type 是封闭 6 值枚举(评审 B4)
+ *
+ *
+ * 性能:单节点执行记录 Micrometer Timer,tag 含 chainId + nodeType。
+ * 指标基数上限由规则链总数管控(评审 11.6 限制规则链 ≤ 500)。
+ */
+@Slf4j
+public class DagExecutor {
+
+ /** 单次遍历最大节点数(兜底防止脏数据绕过无环校验) */
+ private static final int MAX_NODES_PER_EXECUTION = 1000;
+
+ private final NodeProviderRegistry providerRegistry;
+ private final MeterRegistry meterRegistry;
+
+ public DagExecutor(NodeProviderRegistry providerRegistry, MeterRegistry meterRegistry) {
+ this.providerRegistry = providerRegistry;
+ this.meterRegistry = meterRegistry;
+ }
+
+ public void execute(CompiledRuleChain chain, RuleContext ctx) {
+ CompiledNode entry = chain.getFirstNode();
+ if (entry == null) {
+ throw new RuleChainException(chain.getId(), null, "规则链缺少 Trigger 入口节点");
+ }
+
+ Deque queue = new ArrayDeque<>();
+ queue.offer(entry);
+
+ int executed = 0;
+ while (!queue.isEmpty()) {
+ if (++executed > MAX_NODES_PER_EXECUTION) {
+ throw new RuleChainException(chain.getId(), null,
+ "DAG 执行超过 " + MAX_NODES_PER_EXECUTION + " 个节点,疑似脏数据");
+ }
+ CompiledNode current = queue.poll();
+ ctx.fork(current.getId());
+ NodeResult result = executeSingleNode(chain, current, ctx);
+ ctx.recordNodeOutput(current.getId(), result.getOutputs());
+
+ // 按 relationType 选 outgoing links;SKIP 语义:下游不再触发
+ if (result.getRelationType() == RuleLinkRelationType.SKIP) {
+ log.debug("[DagExecutor] chain={} node={} SKIP:{}", chain.getId(), current.getId(), result.getMessage());
+ continue;
+ }
+
+ List outgoing = chain.outgoingOf(current.getId());
+ for (CompiledLink link : outgoing) {
+ if (link.getRelationType() == result.getRelationType()) {
+ CompiledNode next = chain.nodeById(link.getTargetNodeId());
+ if (next != null) queue.offer(next);
+ }
+ }
+ }
+ }
+
+ private NodeResult executeSingleNode(CompiledRuleChain chain, CompiledNode node, RuleContext ctx) {
+ Timer.Sample sample = meterRegistry == null ? null : Timer.start(meterRegistry);
+ String outcome = "success";
+ try {
+ NodeProvider provider = providerRegistry.resolve(node.getCategory(), node.getType());
+ NodeResult result = provider.execute(ctx, node.getConfiguration());
+ if (result == null) {
+ outcome = "null-result";
+ return NodeResult.failure("Provider 返回 null:" + node.getCategory() + ":" + node.getType());
+ }
+ outcome = result.getRelationType().name().toLowerCase();
+ return result;
+ } catch (RuleChainException e) {
+ outcome = "rule-chain-error";
+ throw e;
+ } catch (RuntimeException e) {
+ outcome = "runtime-error";
+ log.warn("[DagExecutor] chain={} node={} type={} 执行异常:{}",
+ chain.getId(), node.getId(), node.getType(), e.getMessage());
+ return NodeResult.failure(e.getMessage(), e);
+ } finally {
+ if (sample != null) {
+ sample.stop(Timer.builder("iot.rule.execution")
+ .tag("chainId", String.valueOf(chain.getId()))
+ .tag("nodeCategory", node.getCategory().getValue())
+ .tag("nodeType", node.getType())
+ .tag("outcome", outcome)
+ .register(meterRegistry));
+ }
+ }
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/DefaultRuleEngine.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/DefaultRuleEngine.java
new file mode 100644
index 00000000..635b78f5
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/DefaultRuleEngine.java
@@ -0,0 +1,88 @@
+package com.viewsh.module.iot.rule.engine;
+
+import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.MeterRegistry;
+import lombok.extern.slf4j.Slf4j;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * 默认 RuleEngine 实现。
+ *
+ * 决议 #3 / 评审 11.2:链级 try-catch 隔离,单链异常不影响其他链;失败计数
+ * 上报 Micrometer 计数器 {@code iot.rule.failure}(tag=chainId / errorType)。
+ *
+ *
评审 A5:chains 顺序执行,不使用 Reactor flatMap 并发(现有系统非响应式)。
+ */
+@Slf4j
+public class DefaultRuleEngine implements RuleEngine {
+
+ private final ChainIndex chainIndex;
+ private final DagExecutor dagExecutor;
+ private final MeterRegistry meterRegistry;
+
+ public DefaultRuleEngine(ChainIndex chainIndex, DagExecutor dagExecutor, MeterRegistry meterRegistry) {
+ this.chainIndex = chainIndex;
+ this.dagExecutor = dagExecutor;
+ this.meterRegistry = meterRegistry;
+ }
+
+ @Override
+ public RuleEngineResult execute(IotDeviceMessage message, Long tenantId,
+ Long subsystemId, Long productId, Long deviceId) {
+ String traceId = UUID.randomUUID().toString();
+ RuleEngineResult result = new RuleEngineResult();
+ result.setTraceId(traceId);
+
+ List chains = chainIndex.match(tenantId, subsystemId, productId, deviceId);
+ result.setMatchedChainCount(chains.size());
+
+ if (chains.isEmpty()) {
+ log.debug("[RuleEngine] trace={} tenant={} sub={} prod={} dev={} 无匹配规则链",
+ traceId, tenantId, subsystemId, productId, deviceId);
+ return result;
+ }
+
+ for (CompiledRuleChain chain : chains) {
+ RuleContext ctx = newContext(chain, message, tenantId, subsystemId, productId, deviceId, traceId);
+ try {
+ dagExecutor.execute(chain, ctx);
+ result.addSuccess(chain.getId());
+ } catch (Exception e) {
+ log.error("[RuleEngine] chain={} trace={} 执行失败", chain.getId(), traceId, e);
+ recordFailure(chain.getId(), e);
+ result.addFailure(chain.getId(), e.getMessage(), e);
+ // 不中断,继续执行下一条链
+ }
+ }
+ return result;
+ }
+
+ private RuleContext newContext(CompiledRuleChain chain, IotDeviceMessage message,
+ Long tenantId, Long subsystemId, Long productId, Long deviceId,
+ String traceId) {
+ RuleContext ctx = new RuleContext();
+ ctx.setTraceId(traceId);
+ ctx.setChainId(chain.getId());
+ ctx.setMessage(message);
+ ctx.setTenantId(tenantId);
+ ctx.setSubsystemId(subsystemId);
+ ctx.setProductId(productId);
+ ctx.setDeviceId(deviceId);
+ ctx.setStartedAt(Instant.now());
+ ctx.setDebugMode(chain.isDebugMode());
+ return ctx;
+ }
+
+ private void recordFailure(Long chainId, Throwable e) {
+ if (meterRegistry == null) return;
+ Counter.builder("iot.rule.failure")
+ .tag("chainId", String.valueOf(chainId))
+ .tag("errorType", e.getClass().getSimpleName())
+ .register(meterRegistry)
+ .increment();
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/NodeProvider.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/NodeProvider.java
new file mode 100644
index 00000000..7644d6a7
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/NodeProvider.java
@@ -0,0 +1,30 @@
+package com.viewsh.module.iot.rule.engine;
+
+import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleNodeCategory;
+
+/**
+ * 节点执行 SPI(B4 / B5 / B6 实现)。
+ *
+ * 注册方式:Spring @Component + getCategory()/getType() 索引
+ * (规范清单明确禁用 ServiceLoader / @SPI)。
+ *
+ *
DagExecutor 遍历到节点时,按 {@code (category, type)} 在
+ * {@link NodeProviderRegistry} 查找对应 Provider 并 {@link #execute(RuleContext, String)}。
+ *
+ *
返回 {@link NodeResult#getRelationType()} 决定 DAG 走哪条 outgoing link。
+ */
+public interface NodeProvider {
+
+ /** 节点类别:trigger / condition / action */
+ RuleNodeCategory getCategory();
+
+ /** 节点类型标识,如 "device_property" / "alarm_trigger"(与 rule_node.type 对齐) */
+ String getType();
+
+ /**
+ * 执行节点。config 是 rule_node.configuration 的 JSON 原文,由 Provider 自行解析。
+ * 实现必须是幂等可重试的,并在失败时返回 {@link NodeResult#failure(String)};
+ * 抛出 RuntimeException 将被 DagExecutor 转为 FAILURE 并交由链级 try-catch 兜底。
+ */
+ NodeResult execute(RuleContext ctx, String config);
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/NodeProviderRegistry.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/NodeProviderRegistry.java
new file mode 100644
index 00000000..33f3829e
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/NodeProviderRegistry.java
@@ -0,0 +1,48 @@
+package com.viewsh.module.iot.rule.engine;
+
+import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleNodeCategory;
+import com.viewsh.module.iot.rule.engine.exception.RuleChainException;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * NodeProvider 注册表(按 (category, type) 路由)。
+ *
+ *
启动时由 Spring 注入所有 {@link NodeProvider} Bean,构造一个不可变索引。
+ * 运行时 {@link #resolve(RuleNodeCategory, String)} 纯内存查表,性能 < 1μs。
+ */
+@Slf4j
+public class NodeProviderRegistry {
+
+ private final Map providersByKey;
+
+ public NodeProviderRegistry(List providers) {
+ this.providersByKey = providers.stream()
+ .collect(Collectors.toUnmodifiableMap(
+ p -> key(p.getCategory(), p.getType()),
+ Function.identity(),
+ (a, b) -> {
+ throw new IllegalStateException(
+ "Duplicate NodeProvider registered: " + a.getCategory() + ":" + a.getType()
+ + " / " + b.getClass().getName());
+ }));
+ log.info("[NodeProviderRegistry] 已注册 {} 个节点 Provider:{}", providersByKey.size(), providersByKey.keySet());
+ }
+
+ public NodeProvider resolve(RuleNodeCategory category, String type) {
+ NodeProvider p = providersByKey.get(key(category, type));
+ if (p == null) {
+ throw new RuleChainException(
+ "未注册的节点 Provider:category=" + category + ", type=" + type);
+ }
+ return p;
+ }
+
+ private static String key(RuleNodeCategory category, String type) {
+ return category.getValue() + ":" + type;
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/NodeResult.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/NodeResult.java
new file mode 100644
index 00000000..1c7e750a
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/NodeResult.java
@@ -0,0 +1,76 @@
+package com.viewsh.module.iot.rule.engine;
+
+import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleLinkRelationType;
+import lombok.Getter;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * 节点执行结果。relationType 决定 DAG 选哪条 outgoing link 走下游。
+ *
+ * 约定(评审 B4,封闭枚举):
+ * - Trigger 节点成功:SUCCESS
+ * - Condition 节点:TRUE / FALSE
+ * - Action 节点:SUCCESS / FAILURE
+ * - Action/Node 超时:TIMEOUT
+ * - 静默跳过(ShakeLimit/黑名单):SKIP
+ */
+@Getter
+public class NodeResult {
+
+ private final RuleLinkRelationType relationType;
+ private final Map outputs;
+ private final String message;
+ private final Throwable error;
+
+ private NodeResult(RuleLinkRelationType relationType,
+ Map outputs,
+ String message,
+ Throwable error) {
+ this.relationType = relationType;
+ this.outputs = outputs == null ? Collections.emptyMap() : outputs;
+ this.message = message;
+ this.error = error;
+ }
+
+ public static NodeResult of(RuleLinkRelationType relationType) {
+ return new NodeResult(relationType, Collections.emptyMap(), null, null);
+ }
+
+ public static NodeResult of(RuleLinkRelationType relationType, Map outputs) {
+ return new NodeResult(relationType, outputs, null, null);
+ }
+
+ public static NodeResult success() {
+ return of(RuleLinkRelationType.SUCCESS);
+ }
+
+ public static NodeResult success(Map outputs) {
+ return of(RuleLinkRelationType.SUCCESS, outputs);
+ }
+
+ public static NodeResult failure(String message) {
+ return new NodeResult(RuleLinkRelationType.FAILURE, Collections.emptyMap(), message, null);
+ }
+
+ public static NodeResult failure(String message, Throwable error) {
+ return new NodeResult(RuleLinkRelationType.FAILURE, Collections.emptyMap(), message, error);
+ }
+
+ public static NodeResult truthy() {
+ return of(RuleLinkRelationType.TRUE);
+ }
+
+ public static NodeResult falsy() {
+ return of(RuleLinkRelationType.FALSE);
+ }
+
+ public static NodeResult timeout(String message) {
+ return new NodeResult(RuleLinkRelationType.TIMEOUT, Collections.emptyMap(), message, null);
+ }
+
+ public static NodeResult skip(String reason) {
+ return new NodeResult(RuleLinkRelationType.SKIP, Collections.emptyMap(), reason, null);
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleChainCompiler.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleChainCompiler.java
new file mode 100644
index 00000000..156f2def
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleChainCompiler.java
@@ -0,0 +1,134 @@
+package com.viewsh.module.iot.rule.engine;
+
+import com.viewsh.module.iot.rule.dal.dataobject.IotRuleChainDO;
+import com.viewsh.module.iot.rule.dal.dataobject.IotRuleLinkDO;
+import com.viewsh.module.iot.rule.dal.dataobject.IotRuleNodeDO;
+import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleLinkRelationType;
+import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleNodeCategory;
+import com.viewsh.module.iot.rule.engine.exception.RuleChainException;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * 从 DO 三元组(chain + nodes + links)构造 CompiledRuleChain(B8 缓存加载 / 测试可复用)。
+ *
+ * 保留 DAG 无环校验作为兜底(正常路径由 B2 Service 层保证);
+ * 未通过校验直接抛 {@link RuleChainException} 阻止脏数据进入缓存。
+ */
+public final class RuleChainCompiler {
+
+ private enum DfsState { NEW, VISITING, DONE }
+
+ private RuleChainCompiler() {}
+
+ public static CompiledRuleChain compile(IotRuleChainDO chainDO,
+ List nodeDOs,
+ List linkDOs) {
+ Objects.requireNonNull(chainDO, "chainDO");
+ if (nodeDOs == null || nodeDOs.isEmpty()) {
+ throw new RuleChainException(chainDO.getId(), null, "规则链无节点");
+ }
+
+ // 1. 节点映射
+ Map nodesById = new HashMap<>(nodeDOs.size() * 2);
+ CompiledNode firstNode = null;
+ int triggerCount = 0;
+ for (IotRuleNodeDO dto : nodeDOs) {
+ RuleNodeCategory category = RuleNodeCategory.of(dto.getCategory());
+ if (category == null) {
+ throw new RuleChainException(chainDO.getId(), String.valueOf(dto.getId()),
+ "非法节点 category:" + dto.getCategory());
+ }
+ CompiledNode node = CompiledNode.builder()
+ .id(String.valueOf(dto.getId()))
+ .name(dto.getName())
+ .category(category)
+ .type(dto.getType())
+ .configuration(dto.getConfiguration())
+ .build();
+ nodesById.put(node.getId(), node);
+ if (category == RuleNodeCategory.TRIGGER) {
+ triggerCount++;
+ firstNode = node;
+ }
+ }
+ if (triggerCount != 1) {
+ throw new RuleChainException(chainDO.getId(), null,
+ "规则链必须有且仅有一个 Trigger 节点,当前 " + triggerCount + " 个");
+ }
+
+ // 2. 出边映射(已按 sortOrder 排序)
+ Map> outgoingByNode = new HashMap<>();
+ if (linkDOs != null) {
+ for (IotRuleLinkDO link : linkDOs) {
+ RuleLinkRelationType relation = RuleLinkRelationType.of(link.getRelationType());
+ if (relation == null) {
+ throw new RuleChainException(chainDO.getId(), String.valueOf(link.getSourceNodeId()),
+ "非法 relation_type:" + link.getRelationType());
+ }
+ CompiledLink compiled = CompiledLink.builder()
+ .id(link.getId())
+ .sourceNodeId(String.valueOf(link.getSourceNodeId()))
+ .targetNodeId(String.valueOf(link.getTargetNodeId()))
+ .relationType(relation)
+ .condition(link.getCondition())
+ .sortOrder(link.getSortOrder() == null ? 0 : link.getSortOrder())
+ .build();
+ outgoingByNode
+ .computeIfAbsent(compiled.getSourceNodeId(), k -> new ArrayList<>())
+ .add(compiled);
+ }
+ outgoingByNode.values().forEach(list ->
+ list.sort(Comparator.comparingInt(CompiledLink::getSortOrder)));
+ }
+
+ // 3. DAG 无环兜底(DFS)
+ assertAcyclic(chainDO.getId(), nodesById, outgoingByNode);
+
+ return CompiledRuleChain.builder()
+ .id(chainDO.getId())
+ .name(chainDO.getName())
+ .tenantId(chainDO.getTenantId())
+ .priority(chainDO.getPriority() == null ? 100 : chainDO.getPriority())
+ .version(chainDO.getVersion() == null ? 0L : chainDO.getVersion())
+ .debugMode(Boolean.TRUE.equals(chainDO.getDebugMode()))
+ .subsystemId(chainDO.getSubsystemId())
+ .productId(chainDO.getProductId())
+ .deviceId(chainDO.getDeviceId())
+ .firstNode(firstNode)
+ .nodesById(Map.copyOf(nodesById))
+ .outgoingByNode(outgoingByNode.entrySet().stream()
+ .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> List.copyOf(e.getValue()))))
+ .build();
+ }
+
+ private static void assertAcyclic(Long chainId,
+ Map nodesById,
+ Map> outgoingByNode) {
+ Map state = new HashMap<>();
+ for (String id : nodesById.keySet()) state.put(id, DfsState.NEW);
+ for (String id : nodesById.keySet()) {
+ if (state.get(id) == DfsState.NEW) {
+ dfs(chainId, id, state, outgoingByNode);
+ }
+ }
+ }
+
+ private static void dfs(Long chainId, String nodeId,
+ Map state,
+ Map> outgoing) {
+ state.put(nodeId, DfsState.VISITING);
+ for (CompiledLink link : outgoing.getOrDefault(nodeId, List.of())) {
+ DfsState s = state.get(link.getTargetNodeId());
+ if (s == DfsState.VISITING) {
+ throw new RuleChainException(chainId, nodeId,
+ "DAG 存在环:" + nodeId + " → " + link.getTargetNodeId());
+ }
+ if (s == DfsState.NEW) {
+ dfs(chainId, link.getTargetNodeId(), state, outgoing);
+ }
+ }
+ state.put(nodeId, DfsState.DONE);
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleContext.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleContext.java
new file mode 100644
index 00000000..847a59ad
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleContext.java
@@ -0,0 +1,48 @@
+package com.viewsh.module.iot.rule.engine;
+
+import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
+import lombok.Data;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * 规则链执行上下文。每一条消息匹配到的每条规则链,会生成一份独立的 RuleContext。
+ *
+ * 线程模型:一个 RuleContext 对应一次 chain 执行,BFS 遍历中节点间串行传递。
+ * fork(node) 用于标记当前执行到的节点(不复制状态),由 DagExecutor 驱动。
+ */
+@Data
+public class RuleContext {
+
+ private String traceId;
+ private Long chainId;
+ private String currentNodeId;
+
+ private IotDeviceMessage message;
+ private Long tenantId;
+ private Long subsystemId;
+ private Long productId;
+ private Long deviceId;
+
+ /** 节点间共享元数据(富化结果、中间结果) */
+ private final Map metadata = new HashMap<>();
+ /** 每个节点的输出(按 nodeId 索引),调试 / 后继节点可引用 */
+ private final Map> nodeOutputs = new HashMap<>();
+
+ private Instant startedAt;
+ private boolean debugMode;
+
+ public RuleContext fork(String nextNodeId) {
+ this.currentNodeId = nextNodeId;
+ return this;
+ }
+
+ public void recordNodeOutput(String nodeId, Map outputs) {
+ if (outputs != null && !outputs.isEmpty()) {
+ nodeOutputs.put(nodeId, outputs);
+ metadata.putAll(outputs);
+ }
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleEngine.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleEngine.java
new file mode 100644
index 00000000..97f43703
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleEngine.java
@@ -0,0 +1,31 @@
+package com.viewsh.module.iot.rule.engine;
+
+import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
+
+/**
+ * 规则引擎对外入口(B9 Handler 调用)。
+ *
+ * 一次 {@link #execute(IotDeviceMessage, Long, Long, Long, Long)} 内会:
+ *
+ * - 按三层 ID 从 {@link ChainIndex} 匹配规则链(含去重 + 排序)
+ * - 对每条链独立 try-catch,失败不中断其他链(决议 #3 / 评审 11.2)
+ * - 逐链构造 {@link RuleContext} 并交由 {@link DagExecutor} 遍历
+ *
+ */
+public interface RuleEngine {
+
+ /**
+ * 执行所有匹配规则链,返回汇总结果。
+ *
+ * @param message 原始设备消息
+ * @param tenantId 租户(所有查询必带)
+ * @param subsystemId 子系统(可 null,匹配 NULL wildcard)
+ * @param productId 产品(可 null)
+ * @param deviceId 设备(可 null)
+ */
+ RuleEngineResult execute(IotDeviceMessage message,
+ Long tenantId,
+ Long subsystemId,
+ Long productId,
+ Long deviceId);
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleEngineConfiguration.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleEngineConfiguration.java
new file mode 100644
index 00000000..f6cd6b3e
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleEngineConfiguration.java
@@ -0,0 +1,55 @@
+package com.viewsh.module.iot.rule.engine;
+
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+import org.springframework.beans.factory.ObjectProvider;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * 规则引擎自动装配。
+ *
+ * 注册以下核心 Bean:
+ *
+ * - {@link NodeProviderRegistry}:聚合所有 {@link NodeProvider} Bean(B4/B5/B6 提供)
+ * - {@link ChainIndex}:规则链索引(B8 负责加载)
+ * - {@link DagExecutor} / {@link RuleEngine}
+ *
+ *
+ * MeterRegistry 由 spring-boot-starter-actuator 提供;无该 starter 时回退到 SimpleMeterRegistry,保证单模块可运行。
+ */
+@Configuration
+public class RuleEngineConfiguration {
+
+ @Bean
+ public NodeProviderRegistry nodeProviderRegistry(ObjectProvider providers) {
+ List list = providers.orderedStream().toList();
+ return new NodeProviderRegistry(list.isEmpty() ? Collections.emptyList() : list);
+ }
+
+ @Bean
+ @ConditionalOnMissingBean
+ public ChainIndex chainIndex() {
+ return new ChainIndex();
+ }
+
+ @Bean
+ @ConditionalOnMissingBean(MeterRegistry.class)
+ public MeterRegistry ruleEngineMeterRegistryFallback() {
+ return new SimpleMeterRegistry();
+ }
+
+ @Bean
+ public DagExecutor dagExecutor(NodeProviderRegistry registry, MeterRegistry meterRegistry) {
+ return new DagExecutor(registry, meterRegistry);
+ }
+
+ @Bean
+ public RuleEngine ruleEngine(ChainIndex chainIndex, DagExecutor dagExecutor, MeterRegistry meterRegistry) {
+ return new DefaultRuleEngine(chainIndex, dagExecutor, meterRegistry);
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleEngineResult.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleEngineResult.java
new file mode 100644
index 00000000..668a54be
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleEngineResult.java
@@ -0,0 +1,48 @@
+package com.viewsh.module.iot.rule.engine;
+
+import lombok.Data;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * 一条消息执行后的汇总结果(含每条规则链的成功 / 失败)。
+ *
+ * 调用方(B9 Handler)可根据 result 做指标统计、告警回退等。
+ */
+@Data
+public class RuleEngineResult {
+
+ private String traceId;
+ private int matchedChainCount;
+ private final List successChainIds = new ArrayList<>();
+ private final List failures = new ArrayList<>();
+
+ public void addSuccess(Long chainId) {
+ successChainIds.add(chainId);
+ }
+
+ public void addFailure(Long chainId, String message, Throwable cause) {
+ failures.add(new ChainFailure(chainId, message, cause));
+ }
+
+ public List getSuccessChainIds() {
+ return Collections.unmodifiableList(successChainIds);
+ }
+
+ public List getFailures() {
+ return Collections.unmodifiableList(failures);
+ }
+
+ public boolean hasFailure() {
+ return !failures.isEmpty();
+ }
+
+ @Data
+ public static class ChainFailure {
+ private final Long chainId;
+ private final String message;
+ private final Throwable cause;
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/exception/RuleChainException.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/exception/RuleChainException.java
new file mode 100644
index 00000000..87945f45
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/exception/RuleChainException.java
@@ -0,0 +1,32 @@
+package com.viewsh.module.iot.rule.engine.exception;
+
+/**
+ * 规则链级异常(可恢复)。链级 try-catch 捕获此异常仅记录失败,
+ * 不中断其他规则链执行(决议 #3 / 评审 11.2)。
+ */
+public class RuleChainException extends RuntimeException {
+
+ private final Long chainId;
+ private final String nodeId;
+
+ public RuleChainException(String message) {
+ this(null, null, message, null);
+ }
+
+ public RuleChainException(String message, Throwable cause) {
+ this(null, null, message, cause);
+ }
+
+ public RuleChainException(Long chainId, String nodeId, String message) {
+ this(chainId, nodeId, message, null);
+ }
+
+ public RuleChainException(Long chainId, String nodeId, String message, Throwable cause) {
+ super(message, cause);
+ this.chainId = chainId;
+ this.nodeId = nodeId;
+ }
+
+ public Long getChainId() { return chainId; }
+ public String getNodeId() { return nodeId; }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/ChainIndexTest.java b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/ChainIndexTest.java
new file mode 100644
index 00000000..18efd7f3
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/ChainIndexTest.java
@@ -0,0 +1,112 @@
+package com.viewsh.module.iot.rule.engine;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * {@link ChainIndex} 单元测试:覆盖 4 种 key 组合匹配、去重、priority 排序。
+ */
+class ChainIndexTest {
+
+ private static final Long TENANT = 1L;
+
+ private ChainIndex index;
+
+ @BeforeEach
+ void setUp() {
+ index = new ChainIndex();
+ }
+
+ @Test
+ void testMatch_fullSpec() {
+ index.load(TENANT, List.of(chain(10L, 5L, 6L, 7L, 100)));
+ List result = index.match(TENANT, 5L, 6L, 7L);
+ assertEquals(1, result.size());
+ assertEquals(10L, result.get(0).getId());
+ }
+
+ @Test
+ void testMatch_productLevelWildcard() {
+ index.load(TENANT, List.of(chain(11L, 5L, 6L, null, 100)));
+ // 查带 deviceId,走 (sub, prod, null) 匹配
+ List result = index.match(TENANT, 5L, 6L, 999L);
+ assertEquals(1, result.size());
+ assertEquals(11L, result.get(0).getId());
+ }
+
+ @Test
+ void testMatch_subsystemLevelWildcard() {
+ index.load(TENANT, List.of(chain(12L, 5L, null, null, 100)));
+ List result = index.match(TENANT, 5L, 999L, 999L);
+ assertEquals(1, result.size());
+ assertEquals(12L, result.get(0).getId());
+ }
+
+ @Test
+ void testMatch_globalWildcard() {
+ index.load(TENANT, List.of(chain(13L, null, null, null, 100)));
+ List result = index.match(TENANT, 5L, 6L, 7L);
+ assertEquals(1, result.size());
+ assertEquals(13L, result.get(0).getId());
+ }
+
+ @Test
+ void testMatch_dedupSameChainAcrossKeys() {
+ // 同一条链既建了具体绑定,又建了通配 — 应只返回一次(【评审 B3】)
+ CompiledRuleChain specific = chain(20L, 5L, 6L, 7L, 100);
+ CompiledRuleChain wildcard = chain(20L, null, null, null, 100);
+ index.load(TENANT, List.of(specific, wildcard));
+
+ List result = index.match(TENANT, 5L, 6L, 7L);
+ assertEquals(1, result.size(), "同一 chainId 应去重为 1 条");
+ assertEquals(20L, result.get(0).getId());
+ }
+
+ @Test
+ void testMatch_prioritySortAsc() {
+ index.load(TENANT, List.of(
+ chain(30L, 5L, 6L, 7L, 300),
+ chain(31L, 5L, 6L, 7L, 100),
+ chain(32L, 5L, 6L, 7L, 200)));
+ List result = index.match(TENANT, 5L, 6L, 7L);
+ assertEquals(3, result.size());
+ assertEquals(31L, result.get(0).getId());
+ assertEquals(32L, result.get(1).getId());
+ assertEquals(30L, result.get(2).getId());
+ }
+
+ @Test
+ void testMatch_emptyWhenNoChainLoaded() {
+ List result = index.match(TENANT, 5L, 6L, 7L);
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ void testEvictTenant_clearsIndex() {
+ index.load(TENANT, List.of(chain(40L, 5L, 6L, 7L, 100)));
+ assertEquals(1, index.sizeOf(TENANT));
+ index.evictTenant(TENANT);
+ assertEquals(0, index.sizeOf(TENANT));
+ assertTrue(index.match(TENANT, 5L, 6L, 7L).isEmpty());
+ }
+
+ @Test
+ void testMatch_tenantIsolation() {
+ index.load(1L, List.of(chain(50L, 5L, 6L, 7L, 100)));
+ index.load(2L, List.of(chain(51L, 5L, 6L, 7L, 100)));
+ assertEquals(50L, index.match(1L, 5L, 6L, 7L).get(0).getId());
+ assertEquals(51L, index.match(2L, 5L, 6L, 7L).get(0).getId());
+ }
+
+ private static CompiledRuleChain chain(Long id, Long sub, Long prod, Long dev, int priority) {
+ return CompiledRuleChain.builder()
+ .id(id).name("chain-" + id).tenantId(TENANT)
+ .priority(priority).version(1L)
+ .subsystemId(sub).productId(prod).deviceId(dev)
+ .build();
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/DagExecutorTest.java b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/DagExecutorTest.java
new file mode 100644
index 00000000..7459795c
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/DagExecutorTest.java
@@ -0,0 +1,215 @@
+package com.viewsh.module.iot.rule.engine;
+
+import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
+import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleLinkRelationType;
+import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleNodeCategory;
+import com.viewsh.module.iot.rule.engine.exception.RuleChainException;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * {@link DagExecutor} 单元测试:覆盖线性链、分支链、并行动作、错误转 FAILURE、元数据传递、SKIP 截断。
+ */
+class DagExecutorTest {
+
+ private DagExecutor executor;
+
+ /** 每次测试用可变的 Map 登记节点执行顺序,用于断言 */
+ private List executed;
+ /** Provider 用户指定每个 (category:type) 的行为(返回什么 NodeResult) */
+ private Map behaviors;
+
+ @BeforeEach
+ void setUp() {
+ executed = new ArrayList<>();
+ behaviors = new HashMap<>();
+
+ NodeProviderRegistry registry = new NodeProviderRegistry(List.of(
+ new RecordingProvider(RuleNodeCategory.TRIGGER, "test_trigger"),
+ new RecordingProvider(RuleNodeCategory.CONDITION, "test_condition"),
+ new RecordingProvider(RuleNodeCategory.ACTION, "test_action_a"),
+ new RecordingProvider(RuleNodeCategory.ACTION, "test_action_b"),
+ new RecordingProvider(RuleNodeCategory.ACTION, "test_action_c")));
+
+ executor = new DagExecutor(registry, new SimpleMeterRegistry());
+ }
+
+ @Test
+ void testLinearChain_triggerThenAction() {
+ // Trigger(1) --Success--> Action(2)
+ CompiledNode trigger = node("1", RuleNodeCategory.TRIGGER, "test_trigger");
+ CompiledNode action = node("2", RuleNodeCategory.ACTION, "test_action_a");
+
+ CompiledRuleChain chain = chain(trigger, List.of(trigger, action),
+ Map.of("1", List.of(link("1", "2", RuleLinkRelationType.SUCCESS))));
+
+ executor.execute(chain, ctx(chain));
+
+ assertEquals(List.of("1", "2"), executed);
+ }
+
+ @Test
+ void testBranchChain_conditionTrueTakesTrueBranch() {
+ // Trigger(1) --Success--> Condition(2) --True--> Action(3)
+ // --False--> Action(4)
+ behaviors.put("condition:test_condition", ctx -> NodeResult.truthy());
+ CompiledNode t = node("1", RuleNodeCategory.TRIGGER, "test_trigger");
+ CompiledNode c = node("2", RuleNodeCategory.CONDITION, "test_condition");
+ CompiledNode a = node("3", RuleNodeCategory.ACTION, "test_action_a");
+ CompiledNode b = node("4", RuleNodeCategory.ACTION, "test_action_b");
+
+ CompiledRuleChain chain = chain(t, List.of(t, c, a, b), Map.of(
+ "1", List.of(link("1", "2", RuleLinkRelationType.SUCCESS)),
+ "2", List.of(
+ link("2", "3", RuleLinkRelationType.TRUE),
+ link("2", "4", RuleLinkRelationType.FALSE))));
+
+ executor.execute(chain, ctx(chain));
+
+ assertTrue(executed.contains("3"), "True 分支应执行");
+ assertFalse(executed.contains("4"), "False 分支不应执行");
+ }
+
+ @Test
+ void testParallelActions_fanOut() {
+ // Trigger(1) --Success--> Action(2), Action(3), Action(4)(三条并列 Success 边)
+ CompiledNode t = node("1", RuleNodeCategory.TRIGGER, "test_trigger");
+ CompiledNode a = node("2", RuleNodeCategory.ACTION, "test_action_a");
+ CompiledNode b = node("3", RuleNodeCategory.ACTION, "test_action_b");
+ CompiledNode c = node("4", RuleNodeCategory.ACTION, "test_action_c");
+
+ CompiledRuleChain chain = chain(t, List.of(t, a, b, c), Map.of(
+ "1", List.of(
+ link("1", "2", RuleLinkRelationType.SUCCESS),
+ link("1", "3", RuleLinkRelationType.SUCCESS),
+ link("1", "4", RuleLinkRelationType.SUCCESS))));
+
+ executor.execute(chain, ctx(chain));
+
+ assertTrue(executed.containsAll(List.of("2", "3", "4")));
+ }
+
+ @Test
+ void testActionFailure_convertsRuntimeExceptionToFailure() {
+ // action_a 抛 RuntimeException → 应被转为 FAILURE,不向上抛
+ behaviors.put("action:test_action_a", ctx -> { throw new RuntimeException("boom"); });
+
+ CompiledNode t = node("1", RuleNodeCategory.TRIGGER, "test_trigger");
+ CompiledNode a = node("2", RuleNodeCategory.ACTION, "test_action_a");
+ CompiledNode onFailure = node("3", RuleNodeCategory.ACTION, "test_action_b");
+
+ CompiledRuleChain chain = chain(t, List.of(t, a, onFailure), Map.of(
+ "1", List.of(link("1", "2", RuleLinkRelationType.SUCCESS)),
+ "2", List.of(link("2", "3", RuleLinkRelationType.FAILURE))));
+
+ assertDoesNotThrow(() -> executor.execute(chain, ctx(chain)));
+ assertTrue(executed.contains("3"), "FAILURE 分支应被走到");
+ }
+
+ @Test
+ void testMetadataPropagation_acrossNodes() {
+ // Node1 写 meta['foo']=bar → Node2 读取
+ AtomicInteger readValue = new AtomicInteger();
+ behaviors.put("trigger:test_trigger", ctx ->
+ NodeResult.success(Map.of("foo", 42)));
+ behaviors.put("action:test_action_a", ctx -> {
+ Object v = ctx.getMetadata().get("foo");
+ readValue.set(v instanceof Integer ? (Integer) v : -1);
+ return NodeResult.success();
+ });
+
+ CompiledNode t = node("1", RuleNodeCategory.TRIGGER, "test_trigger");
+ CompiledNode a = node("2", RuleNodeCategory.ACTION, "test_action_a");
+
+ CompiledRuleChain chain = chain(t, List.of(t, a),
+ Map.of("1", List.of(link("1", "2", RuleLinkRelationType.SUCCESS))));
+
+ executor.execute(chain, ctx(chain));
+ assertEquals(42, readValue.get());
+ }
+
+ @Test
+ void testSkipResult_prunesDownstream() {
+ // Action(2) 返回 SKIP → 下游 Action(3) 不执行
+ behaviors.put("action:test_action_a", ctx -> NodeResult.skip("shake-limit"));
+
+ CompiledNode t = node("1", RuleNodeCategory.TRIGGER, "test_trigger");
+ CompiledNode a = node("2", RuleNodeCategory.ACTION, "test_action_a");
+ CompiledNode b = node("3", RuleNodeCategory.ACTION, "test_action_b");
+
+ CompiledRuleChain chain = chain(t, List.of(t, a, b), Map.of(
+ "1", List.of(link("1", "2", RuleLinkRelationType.SUCCESS)),
+ "2", List.of(link("2", "3", RuleLinkRelationType.SUCCESS))));
+
+ executor.execute(chain, ctx(chain));
+ assertTrue(executed.contains("2"));
+ assertFalse(executed.contains("3"), "SKIP 下游不应执行");
+ }
+
+ @Test
+ void testMissingTrigger_throwsRuleChainException() {
+ CompiledRuleChain chain = CompiledRuleChain.builder()
+ .id(1L).tenantId(1L).priority(100)
+ .firstNode(null).nodesById(Map.of()).outgoingByNode(Map.of())
+ .build();
+ assertThrows(RuleChainException.class, () -> executor.execute(chain, ctx(chain)));
+ }
+
+ // --- helpers ---
+
+ private RuleContext ctx(CompiledRuleChain chain) {
+ RuleContext ctx = new RuleContext();
+ ctx.setChainId(chain.getId());
+ ctx.setTenantId(chain.getTenantId());
+ ctx.setTraceId("trace-test");
+ ctx.setMessage(IotDeviceMessage.requestOf("thing.property.report"));
+ return ctx;
+ }
+
+ private static CompiledNode node(String id, RuleNodeCategory category, String type) {
+ return CompiledNode.builder().id(id).name("node-" + id)
+ .category(category).type(type).configuration("{}").build();
+ }
+
+ private static CompiledLink link(String from, String to, RuleLinkRelationType rel) {
+ return CompiledLink.builder()
+ .id(Long.parseLong(from + to))
+ .sourceNodeId(from).targetNodeId(to)
+ .relationType(rel).sortOrder(0).build();
+ }
+
+ private static CompiledRuleChain chain(CompiledNode first, List nodes,
+ Map> outgoing) {
+ Map byId = new HashMap<>();
+ nodes.forEach(n -> byId.put(n.getId(), n));
+ return CompiledRuleChain.builder()
+ .id(1L).tenantId(1L).name("test").priority(100).version(1L)
+ .firstNode(first).nodesById(byId).outgoingByNode(outgoing)
+ .build();
+ }
+
+ /** 可插入 behaviors 的 Provider:默认返回 SUCCESS,可被覆写 */
+ @FunctionalInterface
+ private interface NodeBehavior {
+ NodeResult apply(RuleContext ctx);
+ }
+
+ private class RecordingProvider implements NodeProvider {
+ private final RuleNodeCategory category;
+ private final String type;
+ RecordingProvider(RuleNodeCategory c, String t) { this.category = c; this.type = t; }
+ @Override public RuleNodeCategory getCategory() { return category; }
+ @Override public String getType() { return type; }
+ @Override public NodeResult execute(RuleContext ctx, String config) {
+ executed.add(ctx.getCurrentNodeId());
+ NodeBehavior b = behaviors.get(category.getValue() + ":" + type);
+ return b == null ? NodeResult.success() : b.apply(ctx);
+ }
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/DefaultRuleEngineTest.java b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/DefaultRuleEngineTest.java
new file mode 100644
index 00000000..ce4c484d
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/DefaultRuleEngineTest.java
@@ -0,0 +1,100 @@
+package com.viewsh.module.iot.rule.engine;
+
+import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
+import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleNodeCategory;
+import com.viewsh.module.iot.rule.engine.exception.RuleChainException;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * {@link DefaultRuleEngine} 单元测试:重点覆盖决议 #3 的链级 try-catch 隔离。
+ */
+class DefaultRuleEngineTest {
+
+ private static final Long TENANT = 1L;
+
+ private ChainIndex chainIndex;
+ private AtomicInteger chain1Executions;
+ private AtomicInteger chain3Executions;
+
+ private DefaultRuleEngine engine;
+
+ @BeforeEach
+ void setUp() {
+ chainIndex = new ChainIndex();
+ chain1Executions = new AtomicInteger();
+ chain3Executions = new AtomicInteger();
+
+ NodeProviderRegistry registry = new NodeProviderRegistry(List.of(
+ new NodeProvider() {
+ @Override public RuleNodeCategory getCategory() { return RuleNodeCategory.TRIGGER; }
+ @Override public String getType() { return "ok"; }
+ @Override public NodeResult execute(RuleContext ctx, String config) {
+ if (ctx.getChainId() == 1L) chain1Executions.incrementAndGet();
+ if (ctx.getChainId() == 3L) chain3Executions.incrementAndGet();
+ return NodeResult.success();
+ }
+ },
+ new NodeProvider() {
+ @Override public RuleNodeCategory getCategory() { return RuleNodeCategory.TRIGGER; }
+ @Override public String getType() { return "boom"; }
+ @Override public NodeResult execute(RuleContext ctx, String config) {
+ // 模拟链级错误(如配置解析失败)— 由链级 try-catch 隔离
+ throw new RuleChainException(ctx.getChainId(), ctx.getCurrentNodeId(),
+ "provider explode");
+ }
+ }));
+
+ DagExecutor dagExecutor = new DagExecutor(registry, new SimpleMeterRegistry());
+ engine = new DefaultRuleEngine(chainIndex, dagExecutor, new SimpleMeterRegistry());
+ }
+
+ @Test
+ void testChainLevelIsolation_failureInMiddleDoesNotBreakOthers() {
+ // chain 1: ok, chain 2: boom, chain 3: ok
+ CompiledRuleChain c1 = buildChain(1L, "ok", 100);
+ CompiledRuleChain c2 = buildChain(2L, "boom", 200);
+ CompiledRuleChain c3 = buildChain(3L, "ok", 300);
+ chainIndex.load(TENANT, List.of(c1, c2, c3));
+
+ RuleEngineResult result = engine.execute(
+ IotDeviceMessage.requestOf("thing.property.report"),
+ TENANT, 5L, 6L, 7L);
+
+ assertEquals(3, result.getMatchedChainCount());
+ assertEquals(2, result.getSuccessChainIds().size(), "chain 1 和 3 成功");
+ assertTrue(result.getSuccessChainIds().containsAll(List.of(1L, 3L)));
+ assertEquals(1, result.getFailures().size(), "chain 2 失败");
+ assertEquals(2L, result.getFailures().get(0).getChainId());
+ assertEquals(1, chain1Executions.get(), "chain 1 执行了 1 次");
+ assertEquals(1, chain3Executions.get(), "chain 3 执行了 1 次");
+ }
+
+ @Test
+ void testNoMatch_returnsEmptyResult() {
+ RuleEngineResult result = engine.execute(
+ IotDeviceMessage.requestOf("thing.property.report"),
+ TENANT, 5L, 6L, 7L);
+ assertEquals(0, result.getMatchedChainCount());
+ assertFalse(result.hasFailure());
+ assertNotNull(result.getTraceId());
+ }
+
+ private CompiledRuleChain buildChain(Long id, String triggerType, int priority) {
+ CompiledNode trigger = CompiledNode.builder()
+ .id("n" + id).name("t").category(RuleNodeCategory.TRIGGER)
+ .type(triggerType).configuration("{}").build();
+ return CompiledRuleChain.builder()
+ .id(id).tenantId(TENANT).name("chain-" + id).priority(priority).version(1L)
+ .subsystemId(5L).productId(6L).deviceId(7L)
+ .firstNode(trigger).nodesById(Map.of(trigger.getId(), trigger))
+ .outgoingByNode(Map.of(trigger.getId(), List.of()))
+ .build();
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/RuleChainCompilerTest.java b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/RuleChainCompilerTest.java
new file mode 100644
index 00000000..d74b3e16
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/RuleChainCompilerTest.java
@@ -0,0 +1,135 @@
+package com.viewsh.module.iot.rule.engine;
+
+import com.viewsh.module.iot.rule.dal.dataobject.IotRuleChainDO;
+import com.viewsh.module.iot.rule.dal.dataobject.IotRuleLinkDO;
+import com.viewsh.module.iot.rule.dal.dataobject.IotRuleNodeDO;
+import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleLinkRelationType;
+import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleNodeCategory;
+import com.viewsh.module.iot.rule.engine.exception.RuleChainException;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * {@link RuleChainCompiler} 单元测试:
+ * 覆盖正常编译 / 单 Trigger 校验兜底 / DAG 无环兜底 / 非法枚举。
+ */
+class RuleChainCompilerTest {
+
+ @Test
+ void testCompile_linearChain_ok() {
+ IotRuleChainDO chain = chainDO(1L);
+ List nodes = List.of(
+ nodeDO(10L, RuleNodeCategory.TRIGGER, "device_property"),
+ nodeDO(11L, RuleNodeCategory.ACTION, "log"));
+ List links = List.of(linkDO(100L, 10L, 11L, RuleLinkRelationType.SUCCESS));
+
+ CompiledRuleChain compiled = RuleChainCompiler.compile(chain, nodes, links);
+
+ assertEquals(1L, compiled.getId());
+ assertNotNull(compiled.getFirstNode());
+ assertEquals("10", compiled.getFirstNode().getId());
+ assertEquals(1, compiled.outgoingOf("10").size());
+ assertEquals("11", compiled.outgoingOf("10").get(0).getTargetNodeId());
+ }
+
+ @Test
+ void testCompile_multiTrigger_rejected() {
+ IotRuleChainDO chain = chainDO(1L);
+ List nodes = List.of(
+ nodeDO(10L, RuleNodeCategory.TRIGGER, "device_property"),
+ nodeDO(11L, RuleNodeCategory.TRIGGER, "device_event"),
+ nodeDO(12L, RuleNodeCategory.ACTION, "log"));
+
+ RuleChainException ex = assertThrows(RuleChainException.class,
+ () -> RuleChainCompiler.compile(chain, nodes, List.of()));
+ assertTrue(ex.getMessage().contains("Trigger"));
+ }
+
+ @Test
+ void testCompile_noNodes_rejected() {
+ assertThrows(RuleChainException.class,
+ () -> RuleChainCompiler.compile(chainDO(1L), List.of(), List.of()));
+ }
+
+ @Test
+ void testCompile_cycleDetected() {
+ IotRuleChainDO chain = chainDO(1L);
+ List nodes = List.of(
+ nodeDO(10L, RuleNodeCategory.TRIGGER, "t"),
+ nodeDO(11L, RuleNodeCategory.ACTION, "a"),
+ nodeDO(12L, RuleNodeCategory.ACTION, "b"));
+ List links = List.of(
+ linkDO(100L, 10L, 11L, RuleLinkRelationType.SUCCESS),
+ linkDO(101L, 11L, 12L, RuleLinkRelationType.SUCCESS),
+ linkDO(102L, 12L, 11L, RuleLinkRelationType.SUCCESS)); // cycle
+
+ RuleChainException ex = assertThrows(RuleChainException.class,
+ () -> RuleChainCompiler.compile(chain, nodes, links));
+ assertTrue(ex.getMessage().contains("环"));
+ }
+
+ @Test
+ void testCompile_invalidCategory_rejected() {
+ IotRuleChainDO chain = chainDO(1L);
+ IotRuleNodeDO bad = new IotRuleNodeDO();
+ bad.setId(10L); bad.setCategory("unknown"); bad.setType("t"); bad.setConfiguration("{}");
+ assertThrows(RuleChainException.class,
+ () -> RuleChainCompiler.compile(chain, List.of(bad), List.of()));
+ }
+
+ @Test
+ void testCompile_invalidRelationType_rejected() {
+ IotRuleChainDO chain = chainDO(1L);
+ List nodes = List.of(
+ nodeDO(10L, RuleNodeCategory.TRIGGER, "t"),
+ nodeDO(11L, RuleNodeCategory.ACTION, "a"));
+ IotRuleLinkDO bad = new IotRuleLinkDO();
+ bad.setId(100L); bad.setSourceNodeId(10L); bad.setTargetNodeId(11L);
+ bad.setRelationType("InvalidRelation"); bad.setSortOrder(0);
+ assertThrows(RuleChainException.class,
+ () -> RuleChainCompiler.compile(chain, nodes, List.of(bad)));
+ }
+
+ @Test
+ void testCompile_linksSortedBySortOrder() {
+ IotRuleChainDO chain = chainDO(1L);
+ List nodes = List.of(
+ nodeDO(10L, RuleNodeCategory.TRIGGER, "t"),
+ nodeDO(11L, RuleNodeCategory.ACTION, "a"),
+ nodeDO(12L, RuleNodeCategory.ACTION, "b"));
+ IotRuleLinkDO l1 = linkDO(100L, 10L, 11L, RuleLinkRelationType.SUCCESS); l1.setSortOrder(2);
+ IotRuleLinkDO l2 = linkDO(101L, 10L, 12L, RuleLinkRelationType.SUCCESS); l2.setSortOrder(1);
+
+ CompiledRuleChain compiled = RuleChainCompiler.compile(chain, nodes, List.of(l1, l2));
+ List sorted = compiled.outgoingOf("10");
+ assertEquals("12", sorted.get(0).getTargetNodeId(), "sortOrder=1 应排前面");
+ assertEquals("11", sorted.get(1).getTargetNodeId());
+ }
+
+ // --- helpers ---
+
+ private static IotRuleChainDO chainDO(Long id) {
+ IotRuleChainDO d = new IotRuleChainDO();
+ d.setId(id); d.setName("chain-" + id); d.setTenantId(1L);
+ d.setPriority(100); d.setVersion(0L); d.setDebugMode(false);
+ return d;
+ }
+
+ private static IotRuleNodeDO nodeDO(Long id, RuleNodeCategory category, String type) {
+ IotRuleNodeDO d = new IotRuleNodeDO();
+ d.setId(id); d.setName("node-" + id);
+ d.setCategory(category.getValue()); d.setType(type);
+ d.setConfiguration("{}");
+ return d;
+ }
+
+ private static IotRuleLinkDO linkDO(Long id, Long src, Long tgt, RuleLinkRelationType rel) {
+ IotRuleLinkDO d = new IotRuleLinkDO();
+ d.setId(id); d.setSourceNodeId(src); d.setTargetNodeId(tgt);
+ d.setRelationType(rel.getValue()); d.setSortOrder(0);
+ return d;
+ }
+}