From ae74b4752a041baa2de3de3d67f891d643705c04 Mon Sep 17 00:00:00 2001 From: lzh Date: Thu, 23 Apr 2026 23:58:48 +0800 Subject: [PATCH] =?UTF-8?q?feat(iot):=20B3=20RuleEngine=20=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E5=99=A8=EF=BC=88DAG=20+=20=E9=93=BE=E7=BA=A7?= =?UTF-8?q?=E9=9A=94=E7=A6=BB=20+=20=E4=B8=89=E5=B1=82=E5=8C=B9=E9=85=8D?= =?UTF-8?q?=E5=8E=BB=E9=87=8D=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit rule 模块 engine/ 新增 14 个核心类 + 4 个测试: - RuleEngine / DefaultRuleEngine:对外入口,链级 try-catch 隔离(决议 #3) - DagExecutor:BFS 遍历,按 relation_type 选 outgoing links,RuntimeException 转 FAILURE - ChainIndex:三层绑定(subsystem/product/device)4 种 key 匹配 + LinkedHashMap 去重 + priority ASC 排序 - RuleChainCompiler:DO → CompiledRuleChain,含单 Trigger + DAG 无环 + 非法枚举兜底 - NodeProvider / NodeProviderRegistry:SPI + Spring @Component 路由(禁用 ServiceLoader/@SPI) - RuleContext / NodeResult / CompiledRuleChain/Node/Link / RuleEngineResult / RuleChainException 测试覆盖(42/42 全绿): - DagExecutorTest: 线性链 / 分支(TRUE/FALSE)/ 并行动作 / 异常转 FAILURE / metadata 传递 / SKIP 截断 / 缺 Trigger - ChainIndexTest: 4 种 wildcard 组合 / 去重 / priority 排序 / 租户隔离 / evict - RuleChainCompilerTest: 正常编译 / 单 Trigger 兜底 / DAG 无环 / 非法 category & relation_type / 连线 sortOrder - DefaultRuleEngineTest: 链级异常隔离(chain1+chain3 成功,chain2 失败,counters 各 1 次) 补齐依赖: - rule/pom.xml 加 io.micrometer:micrometer-core(节点执行 Timer + 失败 Counter) - RuleNodeCategory 加 of(String) 静态查找方法(配合 RuleLinkRelationType.of 一致风格) Known Pitfalls 落地: ⚠️ 评审 B1:ShakeLimit 节点 hook 留在 DagExecutor(B48 补) ⚠️ 评审 B3:LinkedHashMap 去重保留顺序(ChainIndex.match) ⚠️ 评审 B4:relation_type 严格封闭 6 值(RuleLinkRelationType + isValid) ⚠️ 评审 A5:chains 顺序 for 循环 + try-catch;不使用 Reactor flatMap 并发 ⚠️ 评审 B10:单 Trigger 兜底在 Compiler 层(Service 层 + Compiler 双重保障) ⚠️ Metrics 基数:tag 含 chainId + nodeType + outcome;规则链 ≤ 500 控制 ⚠️ DAG 兜底 MAX_NODES_PER_EXECUTION=1000 防脏数据绕过无环校验 未实现(留给后续任务): - 具体 Provider(B4/B5/B6 Trigger/Condition/Action 实现) - 全量缓存加载 + Pub/Sub 驱逐(B8) - JMH Benchmark(任务卡 §6.4 + AC9 p99 < 50ms,第二期补) - @SpringBootTest 集成测试(B9 Handler 就位后补) Co-Authored-By: Claude Opus 4.7 (1M context, main session) --- .../viewsh-module-iot-rule/pom.xml | 6 + .../dataobject/enums/RuleNodeCategory.java | 9 + .../module/iot/rule/engine/ChainIndex.java | 100 ++++++++ .../module/iot/rule/engine/CompiledLink.java | 21 ++ .../module/iot/rule/engine/CompiledNode.java | 22 ++ .../iot/rule/engine/CompiledRuleChain.java | 51 +++++ .../module/iot/rule/engine/DagExecutor.java | 105 +++++++++ .../iot/rule/engine/DefaultRuleEngine.java | 88 +++++++ .../module/iot/rule/engine/NodeProvider.java | 30 +++ .../iot/rule/engine/NodeProviderRegistry.java | 48 ++++ .../module/iot/rule/engine/NodeResult.java | 76 +++++++ .../iot/rule/engine/RuleChainCompiler.java | 134 +++++++++++ .../module/iot/rule/engine/RuleContext.java | 48 ++++ .../module/iot/rule/engine/RuleEngine.java | 31 +++ .../rule/engine/RuleEngineConfiguration.java | 55 +++++ .../iot/rule/engine/RuleEngineResult.java | 48 ++++ .../engine/exception/RuleChainException.java | 32 +++ .../iot/rule/engine/ChainIndexTest.java | 112 +++++++++ .../iot/rule/engine/DagExecutorTest.java | 215 ++++++++++++++++++ .../rule/engine/DefaultRuleEngineTest.java | 100 ++++++++ .../rule/engine/RuleChainCompilerTest.java | 135 +++++++++++ 21 files changed, 1466 insertions(+) create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/ChainIndex.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/CompiledLink.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/CompiledNode.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/CompiledRuleChain.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/DagExecutor.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/DefaultRuleEngine.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/NodeProvider.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/NodeProviderRegistry.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/NodeResult.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleChainCompiler.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleContext.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleEngine.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleEngineConfiguration.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleEngineResult.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/exception/RuleChainException.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/ChainIndexTest.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/DagExecutorTest.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/DefaultRuleEngineTest.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/RuleChainCompilerTest.java diff --git a/viewsh-module-iot/viewsh-module-iot-rule/pom.xml b/viewsh-module-iot/viewsh-module-iot-rule/pom.xml index 9fc918f2..f139ad9b 100644 --- a/viewsh-module-iot/viewsh-module-iot-rule/pom.xml +++ b/viewsh-module-iot/viewsh-module-iot-rule/pom.xml @@ -51,6 +51,12 @@ viewsh-spring-boot-starter-biz-tenant + + + io.micrometer + micrometer-core + + com.viewsh diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/dal/dataobject/enums/RuleNodeCategory.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/dal/dataobject/enums/RuleNodeCategory.java index 646e3588..632c9845 100644 --- a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/dal/dataobject/enums/RuleNodeCategory.java +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/dal/dataobject/enums/RuleNodeCategory.java @@ -22,4 +22,13 @@ public enum RuleNodeCategory { private final String label; + public static RuleNodeCategory of(String value) { + for (RuleNodeCategory c : values()) { + if (c.value.equals(value)) { + return c; + } + } + return null; + } + } diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/ChainIndex.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/ChainIndex.java new file mode 100644 index 00000000..c2d5a8c2 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/ChainIndex.java @@ -0,0 +1,100 @@ +package com.viewsh.module.iot.rule.engine; + +import lombok.extern.slf4j.Slf4j; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 规则链三层绑定索引(subsystemId, productId, deviceId)+ 匹配去重。 + * + *

按"最左匹配"组合预建 4 种索引 key(评审 B3 + §十一-B): + *

+ *   key1: (sub, prod, dev)   — 最细粒度:指定设备专属规则
+ *   key2: (sub, prod, *)     — 产品级
+ *   key3: (sub, *, *)        — 子系统级
+ *   key4: (*, *, *)          — 全局规则
+ * 
+ * + *

{@link #match(Long, Long, Long, Long)} 返回去重 + 按 priority ASC 排序后的规则链列表。 + * + *

线程安全:加载 / 更新用整体替换,读取无锁(ConcurrentHashMap)。 + * 运行时 B8 负责增量驱逐 + 重建。 + */ +@Slf4j +public class ChainIndex { + + /** tenantId 前缀 → (key → chains);key 由 (sub,prod,dev) 组合,"_" 表示通配 */ + private final Map>> indexPerTenant = new ConcurrentHashMap<>(); + + /** + * 全量加载 / 重建某租户的索引(B8 调用)。 + */ + public void load(Long tenantId, Collection chains) { + Map> fresh = new HashMap<>(); + for (CompiledRuleChain c : chains) { + String k = key(c.getSubsystemId(), c.getProductId(), c.getDeviceId()); + fresh.computeIfAbsent(k, x -> new ArrayList<>()).add(c); + } + // 冻结为不可变,保护读端 + Map> frozen = new HashMap<>(); + fresh.forEach((k, list) -> frozen.put(k, List.copyOf(list))); + indexPerTenant.put(prefix(tenantId), Collections.unmodifiableMap(frozen)); + log.info("[ChainIndex] tenant={} 载入 {} 条规则链,{} 个 key 组合", tenantId, chains.size(), frozen.size()); + } + + public void evictTenant(Long tenantId) { + indexPerTenant.remove(prefix(tenantId)); + } + + /** + * 按三层 ID 查出所有匹配规则链。命中 4 种 key 组合,去重后按 priority ASC 排序。 + */ + public List match(Long tenantId, Long subsystemId, Long productId, Long deviceId) { + Map> tenantIndex = tenantIndex(tenantId); + if (tenantIndex.isEmpty()) return Collections.emptyList(); + + List all = new ArrayList<>(); + appendKey(all, tenantIndex, subsystemId, productId, deviceId); + appendKey(all, tenantIndex, subsystemId, productId, null); + appendKey(all, tenantIndex, subsystemId, null, null); + appendKey(all, tenantIndex, null, null, null); + + if (all.isEmpty()) return Collections.emptyList(); + + // 【评审 B3】LinkedHashMap 去重,保留首个出现的(按上面顺序,更具体的先匹配) + Map dedup = new LinkedHashMap<>(); + for (CompiledRuleChain c : all) { + dedup.putIfAbsent(c.getId(), c); + } + + return dedup.values().stream() + .sorted(Comparator.comparingInt(CompiledRuleChain::getPriority)) + .toList(); + } + + /** 单元测试辅助:查询当前索引条目数 */ + public int sizeOf(Long tenantId) { + return tenantIndex(tenantId).values().stream().mapToInt(List::size).sum(); + } + + private Map> tenantIndex(Long tenantId) { + Map> m = indexPerTenant.get(prefix(tenantId)); + return m == null ? Collections.emptyMap() : m; + } + + private void appendKey(List target, + Map> tenantIndex, + Long sub, Long prod, Long dev) { + List list = tenantIndex.get(key(sub, prod, dev)); + if (list != null) target.addAll(list); + } + + private static String prefix(Long tenantId) { + return tenantId == null ? "_" : tenantId.toString(); + } + + private static String key(Long sub, Long prod, Long dev) { + return (sub == null ? "_" : sub) + ":" + (prod == null ? "_" : prod) + ":" + (dev == null ? "_" : dev); + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/CompiledLink.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/CompiledLink.java new file mode 100644 index 00000000..be99ee14 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/CompiledLink.java @@ -0,0 +1,21 @@ +package com.viewsh.module.iot.rule.engine; + +import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleLinkRelationType; +import lombok.Builder; +import lombok.Data; + +/** + * 编译后规则链连线(rule_link 的内存态)。 + */ +@Data +@Builder +public class CompiledLink { + + private Long id; + private String sourceNodeId; + private String targetNodeId; + private RuleLinkRelationType relationType; + /** 连线上的附加表达式条件(可选,JSON 原文;解析延后到节点内) */ + private String condition; + private int sortOrder; +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/CompiledNode.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/CompiledNode.java new file mode 100644 index 00000000..06f2ed2c --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/CompiledNode.java @@ -0,0 +1,22 @@ +package com.viewsh.module.iot.rule.engine; + +import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleNodeCategory; +import lombok.Builder; +import lombok.Data; + +/** + * 编译后规则节点(rule_node 的内存态)。 + * + *

节点执行由 {@link NodeProviderRegistry} 按 (category, type) 分发到对应的 {@link NodeProvider}。 + */ +@Data +@Builder +public class CompiledNode { + + private String id; + private String name; + private RuleNodeCategory category; + private String type; + /** 节点配置 JSON 原文(由 Provider 自行解析) */ + private String configuration; +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/CompiledRuleChain.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/CompiledRuleChain.java new file mode 100644 index 00000000..f4dd6f26 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/CompiledRuleChain.java @@ -0,0 +1,51 @@ +package com.viewsh.module.iot.rule.engine; + +import lombok.Builder; +import lombok.Data; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * 编译后规则链(rule_chain + rule_node + rule_link 的内存态)。 + * + *

由 {@code RuleChainCompiler} 从 DO 构造,供 DagExecutor 快速遍历: + *

    + *
  • {@link #firstNode}:唯一 Trigger 入口(B2 Service 层保证单 Trigger)
  • + *
  • {@link #nodesById}:按 id 索引所有节点
  • + *
  • {@link #outgoingByNode}:每个节点的出边列表(已按 sortOrder 排序)
  • + *
+ */ +@Data +@Builder +public class CompiledRuleChain { + + private Long id; + private String name; + private Long tenantId; + + /** 评审 A5:默认 100,ASC 排序 */ + private int priority; + /** 评审 B9:乐观锁 / 多实例缓存一致性校验 */ + private long version; + private boolean debugMode; + + /** 三层绑定(评审 §十一-B) */ + private Long subsystemId; + private Long productId; + private Long deviceId; + + private CompiledNode firstNode; + private Map nodesById; + private Map> outgoingByNode; + + public List outgoingOf(String nodeId) { + if (outgoingByNode == null) return Collections.emptyList(); + return outgoingByNode.getOrDefault(nodeId, Collections.emptyList()); + } + + public CompiledNode nodeById(String nodeId) { + return nodesById == null ? null : nodesById.get(nodeId); + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/DagExecutor.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/DagExecutor.java new file mode 100644 index 00000000..c436cd41 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/DagExecutor.java @@ -0,0 +1,105 @@ +package com.viewsh.module.iot.rule.engine; + +import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleLinkRelationType; +import com.viewsh.module.iot.rule.engine.exception.RuleChainException; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Timer; +import lombok.extern.slf4j.Slf4j; + +import java.util.*; + +/** + * DAG 规则链遍历执行器。BFS 按 relation_type 选择 outgoing links。 + * + *

约束(上游保证,本执行器信任): + *

    + *
  • B2 Service 层保证 DAG 无环(DFS 校验)
  • + *
  • B2 Service 层保证单 Trigger(CompiledRuleChain.firstNode 是唯一入口)
  • + *
  • relation_type 是封闭 6 值枚举(评审 B4)
  • + *
+ * + *

性能:单节点执行记录 Micrometer Timer,tag 含 chainId + nodeType。 + * 指标基数上限由规则链总数管控(评审 11.6 限制规则链 ≤ 500)。 + */ +@Slf4j +public class DagExecutor { + + /** 单次遍历最大节点数(兜底防止脏数据绕过无环校验) */ + private static final int MAX_NODES_PER_EXECUTION = 1000; + + private final NodeProviderRegistry providerRegistry; + private final MeterRegistry meterRegistry; + + public DagExecutor(NodeProviderRegistry providerRegistry, MeterRegistry meterRegistry) { + this.providerRegistry = providerRegistry; + this.meterRegistry = meterRegistry; + } + + public void execute(CompiledRuleChain chain, RuleContext ctx) { + CompiledNode entry = chain.getFirstNode(); + if (entry == null) { + throw new RuleChainException(chain.getId(), null, "规则链缺少 Trigger 入口节点"); + } + + Deque queue = new ArrayDeque<>(); + queue.offer(entry); + + int executed = 0; + while (!queue.isEmpty()) { + if (++executed > MAX_NODES_PER_EXECUTION) { + throw new RuleChainException(chain.getId(), null, + "DAG 执行超过 " + MAX_NODES_PER_EXECUTION + " 个节点,疑似脏数据"); + } + CompiledNode current = queue.poll(); + ctx.fork(current.getId()); + NodeResult result = executeSingleNode(chain, current, ctx); + ctx.recordNodeOutput(current.getId(), result.getOutputs()); + + // 按 relationType 选 outgoing links;SKIP 语义:下游不再触发 + if (result.getRelationType() == RuleLinkRelationType.SKIP) { + log.debug("[DagExecutor] chain={} node={} SKIP:{}", chain.getId(), current.getId(), result.getMessage()); + continue; + } + + List outgoing = chain.outgoingOf(current.getId()); + for (CompiledLink link : outgoing) { + if (link.getRelationType() == result.getRelationType()) { + CompiledNode next = chain.nodeById(link.getTargetNodeId()); + if (next != null) queue.offer(next); + } + } + } + } + + private NodeResult executeSingleNode(CompiledRuleChain chain, CompiledNode node, RuleContext ctx) { + Timer.Sample sample = meterRegistry == null ? null : Timer.start(meterRegistry); + String outcome = "success"; + try { + NodeProvider provider = providerRegistry.resolve(node.getCategory(), node.getType()); + NodeResult result = provider.execute(ctx, node.getConfiguration()); + if (result == null) { + outcome = "null-result"; + return NodeResult.failure("Provider 返回 null:" + node.getCategory() + ":" + node.getType()); + } + outcome = result.getRelationType().name().toLowerCase(); + return result; + } catch (RuleChainException e) { + outcome = "rule-chain-error"; + throw e; + } catch (RuntimeException e) { + outcome = "runtime-error"; + log.warn("[DagExecutor] chain={} node={} type={} 执行异常:{}", + chain.getId(), node.getId(), node.getType(), e.getMessage()); + return NodeResult.failure(e.getMessage(), e); + } finally { + if (sample != null) { + sample.stop(Timer.builder("iot.rule.execution") + .tag("chainId", String.valueOf(chain.getId())) + .tag("nodeCategory", node.getCategory().getValue()) + .tag("nodeType", node.getType()) + .tag("outcome", outcome) + .register(meterRegistry)); + } + } + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/DefaultRuleEngine.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/DefaultRuleEngine.java new file mode 100644 index 00000000..635b78f5 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/DefaultRuleEngine.java @@ -0,0 +1,88 @@ +package com.viewsh.module.iot.rule.engine; + +import com.viewsh.module.iot.core.mq.message.IotDeviceMessage; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import lombok.extern.slf4j.Slf4j; + +import java.time.Instant; +import java.util.List; +import java.util.UUID; + +/** + * 默认 RuleEngine 实现。 + * + *

决议 #3 / 评审 11.2:链级 try-catch 隔离,单链异常不影响其他链;失败计数 + * 上报 Micrometer 计数器 {@code iot.rule.failure}(tag=chainId / errorType)。 + * + *

评审 A5:chains 顺序执行,不使用 Reactor flatMap 并发(现有系统非响应式)。 + */ +@Slf4j +public class DefaultRuleEngine implements RuleEngine { + + private final ChainIndex chainIndex; + private final DagExecutor dagExecutor; + private final MeterRegistry meterRegistry; + + public DefaultRuleEngine(ChainIndex chainIndex, DagExecutor dagExecutor, MeterRegistry meterRegistry) { + this.chainIndex = chainIndex; + this.dagExecutor = dagExecutor; + this.meterRegistry = meterRegistry; + } + + @Override + public RuleEngineResult execute(IotDeviceMessage message, Long tenantId, + Long subsystemId, Long productId, Long deviceId) { + String traceId = UUID.randomUUID().toString(); + RuleEngineResult result = new RuleEngineResult(); + result.setTraceId(traceId); + + List chains = chainIndex.match(tenantId, subsystemId, productId, deviceId); + result.setMatchedChainCount(chains.size()); + + if (chains.isEmpty()) { + log.debug("[RuleEngine] trace={} tenant={} sub={} prod={} dev={} 无匹配规则链", + traceId, tenantId, subsystemId, productId, deviceId); + return result; + } + + for (CompiledRuleChain chain : chains) { + RuleContext ctx = newContext(chain, message, tenantId, subsystemId, productId, deviceId, traceId); + try { + dagExecutor.execute(chain, ctx); + result.addSuccess(chain.getId()); + } catch (Exception e) { + log.error("[RuleEngine] chain={} trace={} 执行失败", chain.getId(), traceId, e); + recordFailure(chain.getId(), e); + result.addFailure(chain.getId(), e.getMessage(), e); + // 不中断,继续执行下一条链 + } + } + return result; + } + + private RuleContext newContext(CompiledRuleChain chain, IotDeviceMessage message, + Long tenantId, Long subsystemId, Long productId, Long deviceId, + String traceId) { + RuleContext ctx = new RuleContext(); + ctx.setTraceId(traceId); + ctx.setChainId(chain.getId()); + ctx.setMessage(message); + ctx.setTenantId(tenantId); + ctx.setSubsystemId(subsystemId); + ctx.setProductId(productId); + ctx.setDeviceId(deviceId); + ctx.setStartedAt(Instant.now()); + ctx.setDebugMode(chain.isDebugMode()); + return ctx; + } + + private void recordFailure(Long chainId, Throwable e) { + if (meterRegistry == null) return; + Counter.builder("iot.rule.failure") + .tag("chainId", String.valueOf(chainId)) + .tag("errorType", e.getClass().getSimpleName()) + .register(meterRegistry) + .increment(); + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/NodeProvider.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/NodeProvider.java new file mode 100644 index 00000000..7644d6a7 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/NodeProvider.java @@ -0,0 +1,30 @@ +package com.viewsh.module.iot.rule.engine; + +import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleNodeCategory; + +/** + * 节点执行 SPI(B4 / B5 / B6 实现)。 + * + *

注册方式:Spring @Component + getCategory()/getType() 索引 + * (规范清单明确禁用 ServiceLoader / @SPI)。 + * + *

DagExecutor 遍历到节点时,按 {@code (category, type)} 在 + * {@link NodeProviderRegistry} 查找对应 Provider 并 {@link #execute(RuleContext, String)}。 + * + *

返回 {@link NodeResult#getRelationType()} 决定 DAG 走哪条 outgoing link。 + */ +public interface NodeProvider { + + /** 节点类别:trigger / condition / action */ + RuleNodeCategory getCategory(); + + /** 节点类型标识,如 "device_property" / "alarm_trigger"(与 rule_node.type 对齐) */ + String getType(); + + /** + * 执行节点。config 是 rule_node.configuration 的 JSON 原文,由 Provider 自行解析。 + * 实现必须是幂等可重试的,并在失败时返回 {@link NodeResult#failure(String)}; + * 抛出 RuntimeException 将被 DagExecutor 转为 FAILURE 并交由链级 try-catch 兜底。 + */ + NodeResult execute(RuleContext ctx, String config); +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/NodeProviderRegistry.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/NodeProviderRegistry.java new file mode 100644 index 00000000..33f3829e --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/NodeProviderRegistry.java @@ -0,0 +1,48 @@ +package com.viewsh.module.iot.rule.engine; + +import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleNodeCategory; +import com.viewsh.module.iot.rule.engine.exception.RuleChainException; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * NodeProvider 注册表(按 (category, type) 路由)。 + * + *

启动时由 Spring 注入所有 {@link NodeProvider} Bean,构造一个不可变索引。 + * 运行时 {@link #resolve(RuleNodeCategory, String)} 纯内存查表,性能 < 1μs。 + */ +@Slf4j +public class NodeProviderRegistry { + + private final Map providersByKey; + + public NodeProviderRegistry(List providers) { + this.providersByKey = providers.stream() + .collect(Collectors.toUnmodifiableMap( + p -> key(p.getCategory(), p.getType()), + Function.identity(), + (a, b) -> { + throw new IllegalStateException( + "Duplicate NodeProvider registered: " + a.getCategory() + ":" + a.getType() + + " / " + b.getClass().getName()); + })); + log.info("[NodeProviderRegistry] 已注册 {} 个节点 Provider:{}", providersByKey.size(), providersByKey.keySet()); + } + + public NodeProvider resolve(RuleNodeCategory category, String type) { + NodeProvider p = providersByKey.get(key(category, type)); + if (p == null) { + throw new RuleChainException( + "未注册的节点 Provider:category=" + category + ", type=" + type); + } + return p; + } + + private static String key(RuleNodeCategory category, String type) { + return category.getValue() + ":" + type; + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/NodeResult.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/NodeResult.java new file mode 100644 index 00000000..1c7e750a --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/NodeResult.java @@ -0,0 +1,76 @@ +package com.viewsh.module.iot.rule.engine; + +import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleLinkRelationType; +import lombok.Getter; + +import java.util.Collections; +import java.util.Map; + +/** + * 节点执行结果。relationType 决定 DAG 选哪条 outgoing link 走下游。 + * + * 约定(评审 B4,封闭枚举): + * - Trigger 节点成功:SUCCESS + * - Condition 节点:TRUE / FALSE + * - Action 节点:SUCCESS / FAILURE + * - Action/Node 超时:TIMEOUT + * - 静默跳过(ShakeLimit/黑名单):SKIP + */ +@Getter +public class NodeResult { + + private final RuleLinkRelationType relationType; + private final Map outputs; + private final String message; + private final Throwable error; + + private NodeResult(RuleLinkRelationType relationType, + Map outputs, + String message, + Throwable error) { + this.relationType = relationType; + this.outputs = outputs == null ? Collections.emptyMap() : outputs; + this.message = message; + this.error = error; + } + + public static NodeResult of(RuleLinkRelationType relationType) { + return new NodeResult(relationType, Collections.emptyMap(), null, null); + } + + public static NodeResult of(RuleLinkRelationType relationType, Map outputs) { + return new NodeResult(relationType, outputs, null, null); + } + + public static NodeResult success() { + return of(RuleLinkRelationType.SUCCESS); + } + + public static NodeResult success(Map outputs) { + return of(RuleLinkRelationType.SUCCESS, outputs); + } + + public static NodeResult failure(String message) { + return new NodeResult(RuleLinkRelationType.FAILURE, Collections.emptyMap(), message, null); + } + + public static NodeResult failure(String message, Throwable error) { + return new NodeResult(RuleLinkRelationType.FAILURE, Collections.emptyMap(), message, error); + } + + public static NodeResult truthy() { + return of(RuleLinkRelationType.TRUE); + } + + public static NodeResult falsy() { + return of(RuleLinkRelationType.FALSE); + } + + public static NodeResult timeout(String message) { + return new NodeResult(RuleLinkRelationType.TIMEOUT, Collections.emptyMap(), message, null); + } + + public static NodeResult skip(String reason) { + return new NodeResult(RuleLinkRelationType.SKIP, Collections.emptyMap(), reason, null); + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleChainCompiler.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleChainCompiler.java new file mode 100644 index 00000000..156f2def --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleChainCompiler.java @@ -0,0 +1,134 @@ +package com.viewsh.module.iot.rule.engine; + +import com.viewsh.module.iot.rule.dal.dataobject.IotRuleChainDO; +import com.viewsh.module.iot.rule.dal.dataobject.IotRuleLinkDO; +import com.viewsh.module.iot.rule.dal.dataobject.IotRuleNodeDO; +import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleLinkRelationType; +import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleNodeCategory; +import com.viewsh.module.iot.rule.engine.exception.RuleChainException; + +import java.util.*; +import java.util.stream.Collectors; + +/** + * 从 DO 三元组(chain + nodes + links)构造 CompiledRuleChain(B8 缓存加载 / 测试可复用)。 + * + *

保留 DAG 无环校验作为兜底(正常路径由 B2 Service 层保证); + * 未通过校验直接抛 {@link RuleChainException} 阻止脏数据进入缓存。 + */ +public final class RuleChainCompiler { + + private enum DfsState { NEW, VISITING, DONE } + + private RuleChainCompiler() {} + + public static CompiledRuleChain compile(IotRuleChainDO chainDO, + List nodeDOs, + List linkDOs) { + Objects.requireNonNull(chainDO, "chainDO"); + if (nodeDOs == null || nodeDOs.isEmpty()) { + throw new RuleChainException(chainDO.getId(), null, "规则链无节点"); + } + + // 1. 节点映射 + Map nodesById = new HashMap<>(nodeDOs.size() * 2); + CompiledNode firstNode = null; + int triggerCount = 0; + for (IotRuleNodeDO dto : nodeDOs) { + RuleNodeCategory category = RuleNodeCategory.of(dto.getCategory()); + if (category == null) { + throw new RuleChainException(chainDO.getId(), String.valueOf(dto.getId()), + "非法节点 category:" + dto.getCategory()); + } + CompiledNode node = CompiledNode.builder() + .id(String.valueOf(dto.getId())) + .name(dto.getName()) + .category(category) + .type(dto.getType()) + .configuration(dto.getConfiguration()) + .build(); + nodesById.put(node.getId(), node); + if (category == RuleNodeCategory.TRIGGER) { + triggerCount++; + firstNode = node; + } + } + if (triggerCount != 1) { + throw new RuleChainException(chainDO.getId(), null, + "规则链必须有且仅有一个 Trigger 节点,当前 " + triggerCount + " 个"); + } + + // 2. 出边映射(已按 sortOrder 排序) + Map> outgoingByNode = new HashMap<>(); + if (linkDOs != null) { + for (IotRuleLinkDO link : linkDOs) { + RuleLinkRelationType relation = RuleLinkRelationType.of(link.getRelationType()); + if (relation == null) { + throw new RuleChainException(chainDO.getId(), String.valueOf(link.getSourceNodeId()), + "非法 relation_type:" + link.getRelationType()); + } + CompiledLink compiled = CompiledLink.builder() + .id(link.getId()) + .sourceNodeId(String.valueOf(link.getSourceNodeId())) + .targetNodeId(String.valueOf(link.getTargetNodeId())) + .relationType(relation) + .condition(link.getCondition()) + .sortOrder(link.getSortOrder() == null ? 0 : link.getSortOrder()) + .build(); + outgoingByNode + .computeIfAbsent(compiled.getSourceNodeId(), k -> new ArrayList<>()) + .add(compiled); + } + outgoingByNode.values().forEach(list -> + list.sort(Comparator.comparingInt(CompiledLink::getSortOrder))); + } + + // 3. DAG 无环兜底(DFS) + assertAcyclic(chainDO.getId(), nodesById, outgoingByNode); + + return CompiledRuleChain.builder() + .id(chainDO.getId()) + .name(chainDO.getName()) + .tenantId(chainDO.getTenantId()) + .priority(chainDO.getPriority() == null ? 100 : chainDO.getPriority()) + .version(chainDO.getVersion() == null ? 0L : chainDO.getVersion()) + .debugMode(Boolean.TRUE.equals(chainDO.getDebugMode())) + .subsystemId(chainDO.getSubsystemId()) + .productId(chainDO.getProductId()) + .deviceId(chainDO.getDeviceId()) + .firstNode(firstNode) + .nodesById(Map.copyOf(nodesById)) + .outgoingByNode(outgoingByNode.entrySet().stream() + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> List.copyOf(e.getValue())))) + .build(); + } + + private static void assertAcyclic(Long chainId, + Map nodesById, + Map> outgoingByNode) { + Map state = new HashMap<>(); + for (String id : nodesById.keySet()) state.put(id, DfsState.NEW); + for (String id : nodesById.keySet()) { + if (state.get(id) == DfsState.NEW) { + dfs(chainId, id, state, outgoingByNode); + } + } + } + + private static void dfs(Long chainId, String nodeId, + Map state, + Map> outgoing) { + state.put(nodeId, DfsState.VISITING); + for (CompiledLink link : outgoing.getOrDefault(nodeId, List.of())) { + DfsState s = state.get(link.getTargetNodeId()); + if (s == DfsState.VISITING) { + throw new RuleChainException(chainId, nodeId, + "DAG 存在环:" + nodeId + " → " + link.getTargetNodeId()); + } + if (s == DfsState.NEW) { + dfs(chainId, link.getTargetNodeId(), state, outgoing); + } + } + state.put(nodeId, DfsState.DONE); + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleContext.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleContext.java new file mode 100644 index 00000000..847a59ad --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleContext.java @@ -0,0 +1,48 @@ +package com.viewsh.module.iot.rule.engine; + +import com.viewsh.module.iot.core.mq.message.IotDeviceMessage; +import lombok.Data; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; + +/** + * 规则链执行上下文。每一条消息匹配到的每条规则链,会生成一份独立的 RuleContext。 + * + *

线程模型:一个 RuleContext 对应一次 chain 执行,BFS 遍历中节点间串行传递。 + * fork(node) 用于标记当前执行到的节点(不复制状态),由 DagExecutor 驱动。 + */ +@Data +public class RuleContext { + + private String traceId; + private Long chainId; + private String currentNodeId; + + private IotDeviceMessage message; + private Long tenantId; + private Long subsystemId; + private Long productId; + private Long deviceId; + + /** 节点间共享元数据(富化结果、中间结果) */ + private final Map metadata = new HashMap<>(); + /** 每个节点的输出(按 nodeId 索引),调试 / 后继节点可引用 */ + private final Map> nodeOutputs = new HashMap<>(); + + private Instant startedAt; + private boolean debugMode; + + public RuleContext fork(String nextNodeId) { + this.currentNodeId = nextNodeId; + return this; + } + + public void recordNodeOutput(String nodeId, Map outputs) { + if (outputs != null && !outputs.isEmpty()) { + nodeOutputs.put(nodeId, outputs); + metadata.putAll(outputs); + } + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleEngine.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleEngine.java new file mode 100644 index 00000000..97f43703 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleEngine.java @@ -0,0 +1,31 @@ +package com.viewsh.module.iot.rule.engine; + +import com.viewsh.module.iot.core.mq.message.IotDeviceMessage; + +/** + * 规则引擎对外入口(B9 Handler 调用)。 + * + *

一次 {@link #execute(IotDeviceMessage, Long, Long, Long, Long)} 内会: + *

    + *
  1. 按三层 ID 从 {@link ChainIndex} 匹配规则链(含去重 + 排序)
  2. + *
  3. 对每条链独立 try-catch,失败不中断其他链(决议 #3 / 评审 11.2)
  4. + *
  5. 逐链构造 {@link RuleContext} 并交由 {@link DagExecutor} 遍历
  6. + *
+ */ +public interface RuleEngine { + + /** + * 执行所有匹配规则链,返回汇总结果。 + * + * @param message 原始设备消息 + * @param tenantId 租户(所有查询必带) + * @param subsystemId 子系统(可 null,匹配 NULL wildcard) + * @param productId 产品(可 null) + * @param deviceId 设备(可 null) + */ + RuleEngineResult execute(IotDeviceMessage message, + Long tenantId, + Long subsystemId, + Long productId, + Long deviceId); +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleEngineConfiguration.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleEngineConfiguration.java new file mode 100644 index 00000000..f6cd6b3e --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleEngineConfiguration.java @@ -0,0 +1,55 @@ +package com.viewsh.module.iot.rule.engine; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.Collections; +import java.util.List; + +/** + * 规则引擎自动装配。 + * + *

注册以下核心 Bean: + *

    + *
  • {@link NodeProviderRegistry}:聚合所有 {@link NodeProvider} Bean(B4/B5/B6 提供)
  • + *
  • {@link ChainIndex}:规则链索引(B8 负责加载)
  • + *
  • {@link DagExecutor} / {@link RuleEngine}
  • + *
+ * + *

MeterRegistry 由 spring-boot-starter-actuator 提供;无该 starter 时回退到 SimpleMeterRegistry,保证单模块可运行。 + */ +@Configuration +public class RuleEngineConfiguration { + + @Bean + public NodeProviderRegistry nodeProviderRegistry(ObjectProvider providers) { + List list = providers.orderedStream().toList(); + return new NodeProviderRegistry(list.isEmpty() ? Collections.emptyList() : list); + } + + @Bean + @ConditionalOnMissingBean + public ChainIndex chainIndex() { + return new ChainIndex(); + } + + @Bean + @ConditionalOnMissingBean(MeterRegistry.class) + public MeterRegistry ruleEngineMeterRegistryFallback() { + return new SimpleMeterRegistry(); + } + + @Bean + public DagExecutor dagExecutor(NodeProviderRegistry registry, MeterRegistry meterRegistry) { + return new DagExecutor(registry, meterRegistry); + } + + @Bean + public RuleEngine ruleEngine(ChainIndex chainIndex, DagExecutor dagExecutor, MeterRegistry meterRegistry) { + return new DefaultRuleEngine(chainIndex, dagExecutor, meterRegistry); + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleEngineResult.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleEngineResult.java new file mode 100644 index 00000000..668a54be --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/RuleEngineResult.java @@ -0,0 +1,48 @@ +package com.viewsh.module.iot.rule.engine; + +import lombok.Data; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * 一条消息执行后的汇总结果(含每条规则链的成功 / 失败)。 + * + *

调用方(B9 Handler)可根据 result 做指标统计、告警回退等。 + */ +@Data +public class RuleEngineResult { + + private String traceId; + private int matchedChainCount; + private final List successChainIds = new ArrayList<>(); + private final List failures = new ArrayList<>(); + + public void addSuccess(Long chainId) { + successChainIds.add(chainId); + } + + public void addFailure(Long chainId, String message, Throwable cause) { + failures.add(new ChainFailure(chainId, message, cause)); + } + + public List getSuccessChainIds() { + return Collections.unmodifiableList(successChainIds); + } + + public List getFailures() { + return Collections.unmodifiableList(failures); + } + + public boolean hasFailure() { + return !failures.isEmpty(); + } + + @Data + public static class ChainFailure { + private final Long chainId; + private final String message; + private final Throwable cause; + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/exception/RuleChainException.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/exception/RuleChainException.java new file mode 100644 index 00000000..87945f45 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/engine/exception/RuleChainException.java @@ -0,0 +1,32 @@ +package com.viewsh.module.iot.rule.engine.exception; + +/** + * 规则链级异常(可恢复)。链级 try-catch 捕获此异常仅记录失败, + * 不中断其他规则链执行(决议 #3 / 评审 11.2)。 + */ +public class RuleChainException extends RuntimeException { + + private final Long chainId; + private final String nodeId; + + public RuleChainException(String message) { + this(null, null, message, null); + } + + public RuleChainException(String message, Throwable cause) { + this(null, null, message, cause); + } + + public RuleChainException(Long chainId, String nodeId, String message) { + this(chainId, nodeId, message, null); + } + + public RuleChainException(Long chainId, String nodeId, String message, Throwable cause) { + super(message, cause); + this.chainId = chainId; + this.nodeId = nodeId; + } + + public Long getChainId() { return chainId; } + public String getNodeId() { return nodeId; } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/ChainIndexTest.java b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/ChainIndexTest.java new file mode 100644 index 00000000..18efd7f3 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/ChainIndexTest.java @@ -0,0 +1,112 @@ +package com.viewsh.module.iot.rule.engine; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * {@link ChainIndex} 单元测试:覆盖 4 种 key 组合匹配、去重、priority 排序。 + */ +class ChainIndexTest { + + private static final Long TENANT = 1L; + + private ChainIndex index; + + @BeforeEach + void setUp() { + index = new ChainIndex(); + } + + @Test + void testMatch_fullSpec() { + index.load(TENANT, List.of(chain(10L, 5L, 6L, 7L, 100))); + List result = index.match(TENANT, 5L, 6L, 7L); + assertEquals(1, result.size()); + assertEquals(10L, result.get(0).getId()); + } + + @Test + void testMatch_productLevelWildcard() { + index.load(TENANT, List.of(chain(11L, 5L, 6L, null, 100))); + // 查带 deviceId,走 (sub, prod, null) 匹配 + List result = index.match(TENANT, 5L, 6L, 999L); + assertEquals(1, result.size()); + assertEquals(11L, result.get(0).getId()); + } + + @Test + void testMatch_subsystemLevelWildcard() { + index.load(TENANT, List.of(chain(12L, 5L, null, null, 100))); + List result = index.match(TENANT, 5L, 999L, 999L); + assertEquals(1, result.size()); + assertEquals(12L, result.get(0).getId()); + } + + @Test + void testMatch_globalWildcard() { + index.load(TENANT, List.of(chain(13L, null, null, null, 100))); + List result = index.match(TENANT, 5L, 6L, 7L); + assertEquals(1, result.size()); + assertEquals(13L, result.get(0).getId()); + } + + @Test + void testMatch_dedupSameChainAcrossKeys() { + // 同一条链既建了具体绑定,又建了通配 — 应只返回一次(【评审 B3】) + CompiledRuleChain specific = chain(20L, 5L, 6L, 7L, 100); + CompiledRuleChain wildcard = chain(20L, null, null, null, 100); + index.load(TENANT, List.of(specific, wildcard)); + + List result = index.match(TENANT, 5L, 6L, 7L); + assertEquals(1, result.size(), "同一 chainId 应去重为 1 条"); + assertEquals(20L, result.get(0).getId()); + } + + @Test + void testMatch_prioritySortAsc() { + index.load(TENANT, List.of( + chain(30L, 5L, 6L, 7L, 300), + chain(31L, 5L, 6L, 7L, 100), + chain(32L, 5L, 6L, 7L, 200))); + List result = index.match(TENANT, 5L, 6L, 7L); + assertEquals(3, result.size()); + assertEquals(31L, result.get(0).getId()); + assertEquals(32L, result.get(1).getId()); + assertEquals(30L, result.get(2).getId()); + } + + @Test + void testMatch_emptyWhenNoChainLoaded() { + List result = index.match(TENANT, 5L, 6L, 7L); + assertTrue(result.isEmpty()); + } + + @Test + void testEvictTenant_clearsIndex() { + index.load(TENANT, List.of(chain(40L, 5L, 6L, 7L, 100))); + assertEquals(1, index.sizeOf(TENANT)); + index.evictTenant(TENANT); + assertEquals(0, index.sizeOf(TENANT)); + assertTrue(index.match(TENANT, 5L, 6L, 7L).isEmpty()); + } + + @Test + void testMatch_tenantIsolation() { + index.load(1L, List.of(chain(50L, 5L, 6L, 7L, 100))); + index.load(2L, List.of(chain(51L, 5L, 6L, 7L, 100))); + assertEquals(50L, index.match(1L, 5L, 6L, 7L).get(0).getId()); + assertEquals(51L, index.match(2L, 5L, 6L, 7L).get(0).getId()); + } + + private static CompiledRuleChain chain(Long id, Long sub, Long prod, Long dev, int priority) { + return CompiledRuleChain.builder() + .id(id).name("chain-" + id).tenantId(TENANT) + .priority(priority).version(1L) + .subsystemId(sub).productId(prod).deviceId(dev) + .build(); + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/DagExecutorTest.java b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/DagExecutorTest.java new file mode 100644 index 00000000..7459795c --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/DagExecutorTest.java @@ -0,0 +1,215 @@ +package com.viewsh.module.iot.rule.engine; + +import com.viewsh.module.iot.core.mq.message.IotDeviceMessage; +import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleLinkRelationType; +import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleNodeCategory; +import com.viewsh.module.iot.rule.engine.exception.RuleChainException; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * {@link DagExecutor} 单元测试:覆盖线性链、分支链、并行动作、错误转 FAILURE、元数据传递、SKIP 截断。 + */ +class DagExecutorTest { + + private DagExecutor executor; + + /** 每次测试用可变的 Map 登记节点执行顺序,用于断言 */ + private List executed; + /** Provider 用户指定每个 (category:type) 的行为(返回什么 NodeResult) */ + private Map behaviors; + + @BeforeEach + void setUp() { + executed = new ArrayList<>(); + behaviors = new HashMap<>(); + + NodeProviderRegistry registry = new NodeProviderRegistry(List.of( + new RecordingProvider(RuleNodeCategory.TRIGGER, "test_trigger"), + new RecordingProvider(RuleNodeCategory.CONDITION, "test_condition"), + new RecordingProvider(RuleNodeCategory.ACTION, "test_action_a"), + new RecordingProvider(RuleNodeCategory.ACTION, "test_action_b"), + new RecordingProvider(RuleNodeCategory.ACTION, "test_action_c"))); + + executor = new DagExecutor(registry, new SimpleMeterRegistry()); + } + + @Test + void testLinearChain_triggerThenAction() { + // Trigger(1) --Success--> Action(2) + CompiledNode trigger = node("1", RuleNodeCategory.TRIGGER, "test_trigger"); + CompiledNode action = node("2", RuleNodeCategory.ACTION, "test_action_a"); + + CompiledRuleChain chain = chain(trigger, List.of(trigger, action), + Map.of("1", List.of(link("1", "2", RuleLinkRelationType.SUCCESS)))); + + executor.execute(chain, ctx(chain)); + + assertEquals(List.of("1", "2"), executed); + } + + @Test + void testBranchChain_conditionTrueTakesTrueBranch() { + // Trigger(1) --Success--> Condition(2) --True--> Action(3) + // --False--> Action(4) + behaviors.put("condition:test_condition", ctx -> NodeResult.truthy()); + CompiledNode t = node("1", RuleNodeCategory.TRIGGER, "test_trigger"); + CompiledNode c = node("2", RuleNodeCategory.CONDITION, "test_condition"); + CompiledNode a = node("3", RuleNodeCategory.ACTION, "test_action_a"); + CompiledNode b = node("4", RuleNodeCategory.ACTION, "test_action_b"); + + CompiledRuleChain chain = chain(t, List.of(t, c, a, b), Map.of( + "1", List.of(link("1", "2", RuleLinkRelationType.SUCCESS)), + "2", List.of( + link("2", "3", RuleLinkRelationType.TRUE), + link("2", "4", RuleLinkRelationType.FALSE)))); + + executor.execute(chain, ctx(chain)); + + assertTrue(executed.contains("3"), "True 分支应执行"); + assertFalse(executed.contains("4"), "False 分支不应执行"); + } + + @Test + void testParallelActions_fanOut() { + // Trigger(1) --Success--> Action(2), Action(3), Action(4)(三条并列 Success 边) + CompiledNode t = node("1", RuleNodeCategory.TRIGGER, "test_trigger"); + CompiledNode a = node("2", RuleNodeCategory.ACTION, "test_action_a"); + CompiledNode b = node("3", RuleNodeCategory.ACTION, "test_action_b"); + CompiledNode c = node("4", RuleNodeCategory.ACTION, "test_action_c"); + + CompiledRuleChain chain = chain(t, List.of(t, a, b, c), Map.of( + "1", List.of( + link("1", "2", RuleLinkRelationType.SUCCESS), + link("1", "3", RuleLinkRelationType.SUCCESS), + link("1", "4", RuleLinkRelationType.SUCCESS)))); + + executor.execute(chain, ctx(chain)); + + assertTrue(executed.containsAll(List.of("2", "3", "4"))); + } + + @Test + void testActionFailure_convertsRuntimeExceptionToFailure() { + // action_a 抛 RuntimeException → 应被转为 FAILURE,不向上抛 + behaviors.put("action:test_action_a", ctx -> { throw new RuntimeException("boom"); }); + + CompiledNode t = node("1", RuleNodeCategory.TRIGGER, "test_trigger"); + CompiledNode a = node("2", RuleNodeCategory.ACTION, "test_action_a"); + CompiledNode onFailure = node("3", RuleNodeCategory.ACTION, "test_action_b"); + + CompiledRuleChain chain = chain(t, List.of(t, a, onFailure), Map.of( + "1", List.of(link("1", "2", RuleLinkRelationType.SUCCESS)), + "2", List.of(link("2", "3", RuleLinkRelationType.FAILURE)))); + + assertDoesNotThrow(() -> executor.execute(chain, ctx(chain))); + assertTrue(executed.contains("3"), "FAILURE 分支应被走到"); + } + + @Test + void testMetadataPropagation_acrossNodes() { + // Node1 写 meta['foo']=bar → Node2 读取 + AtomicInteger readValue = new AtomicInteger(); + behaviors.put("trigger:test_trigger", ctx -> + NodeResult.success(Map.of("foo", 42))); + behaviors.put("action:test_action_a", ctx -> { + Object v = ctx.getMetadata().get("foo"); + readValue.set(v instanceof Integer ? (Integer) v : -1); + return NodeResult.success(); + }); + + CompiledNode t = node("1", RuleNodeCategory.TRIGGER, "test_trigger"); + CompiledNode a = node("2", RuleNodeCategory.ACTION, "test_action_a"); + + CompiledRuleChain chain = chain(t, List.of(t, a), + Map.of("1", List.of(link("1", "2", RuleLinkRelationType.SUCCESS)))); + + executor.execute(chain, ctx(chain)); + assertEquals(42, readValue.get()); + } + + @Test + void testSkipResult_prunesDownstream() { + // Action(2) 返回 SKIP → 下游 Action(3) 不执行 + behaviors.put("action:test_action_a", ctx -> NodeResult.skip("shake-limit")); + + CompiledNode t = node("1", RuleNodeCategory.TRIGGER, "test_trigger"); + CompiledNode a = node("2", RuleNodeCategory.ACTION, "test_action_a"); + CompiledNode b = node("3", RuleNodeCategory.ACTION, "test_action_b"); + + CompiledRuleChain chain = chain(t, List.of(t, a, b), Map.of( + "1", List.of(link("1", "2", RuleLinkRelationType.SUCCESS)), + "2", List.of(link("2", "3", RuleLinkRelationType.SUCCESS)))); + + executor.execute(chain, ctx(chain)); + assertTrue(executed.contains("2")); + assertFalse(executed.contains("3"), "SKIP 下游不应执行"); + } + + @Test + void testMissingTrigger_throwsRuleChainException() { + CompiledRuleChain chain = CompiledRuleChain.builder() + .id(1L).tenantId(1L).priority(100) + .firstNode(null).nodesById(Map.of()).outgoingByNode(Map.of()) + .build(); + assertThrows(RuleChainException.class, () -> executor.execute(chain, ctx(chain))); + } + + // --- helpers --- + + private RuleContext ctx(CompiledRuleChain chain) { + RuleContext ctx = new RuleContext(); + ctx.setChainId(chain.getId()); + ctx.setTenantId(chain.getTenantId()); + ctx.setTraceId("trace-test"); + ctx.setMessage(IotDeviceMessage.requestOf("thing.property.report")); + return ctx; + } + + private static CompiledNode node(String id, RuleNodeCategory category, String type) { + return CompiledNode.builder().id(id).name("node-" + id) + .category(category).type(type).configuration("{}").build(); + } + + private static CompiledLink link(String from, String to, RuleLinkRelationType rel) { + return CompiledLink.builder() + .id(Long.parseLong(from + to)) + .sourceNodeId(from).targetNodeId(to) + .relationType(rel).sortOrder(0).build(); + } + + private static CompiledRuleChain chain(CompiledNode first, List nodes, + Map> outgoing) { + Map byId = new HashMap<>(); + nodes.forEach(n -> byId.put(n.getId(), n)); + return CompiledRuleChain.builder() + .id(1L).tenantId(1L).name("test").priority(100).version(1L) + .firstNode(first).nodesById(byId).outgoingByNode(outgoing) + .build(); + } + + /** 可插入 behaviors 的 Provider:默认返回 SUCCESS,可被覆写 */ + @FunctionalInterface + private interface NodeBehavior { + NodeResult apply(RuleContext ctx); + } + + private class RecordingProvider implements NodeProvider { + private final RuleNodeCategory category; + private final String type; + RecordingProvider(RuleNodeCategory c, String t) { this.category = c; this.type = t; } + @Override public RuleNodeCategory getCategory() { return category; } + @Override public String getType() { return type; } + @Override public NodeResult execute(RuleContext ctx, String config) { + executed.add(ctx.getCurrentNodeId()); + NodeBehavior b = behaviors.get(category.getValue() + ":" + type); + return b == null ? NodeResult.success() : b.apply(ctx); + } + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/DefaultRuleEngineTest.java b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/DefaultRuleEngineTest.java new file mode 100644 index 00000000..ce4c484d --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/DefaultRuleEngineTest.java @@ -0,0 +1,100 @@ +package com.viewsh.module.iot.rule.engine; + +import com.viewsh.module.iot.core.mq.message.IotDeviceMessage; +import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleNodeCategory; +import com.viewsh.module.iot.rule.engine.exception.RuleChainException; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * {@link DefaultRuleEngine} 单元测试:重点覆盖决议 #3 的链级 try-catch 隔离。 + */ +class DefaultRuleEngineTest { + + private static final Long TENANT = 1L; + + private ChainIndex chainIndex; + private AtomicInteger chain1Executions; + private AtomicInteger chain3Executions; + + private DefaultRuleEngine engine; + + @BeforeEach + void setUp() { + chainIndex = new ChainIndex(); + chain1Executions = new AtomicInteger(); + chain3Executions = new AtomicInteger(); + + NodeProviderRegistry registry = new NodeProviderRegistry(List.of( + new NodeProvider() { + @Override public RuleNodeCategory getCategory() { return RuleNodeCategory.TRIGGER; } + @Override public String getType() { return "ok"; } + @Override public NodeResult execute(RuleContext ctx, String config) { + if (ctx.getChainId() == 1L) chain1Executions.incrementAndGet(); + if (ctx.getChainId() == 3L) chain3Executions.incrementAndGet(); + return NodeResult.success(); + } + }, + new NodeProvider() { + @Override public RuleNodeCategory getCategory() { return RuleNodeCategory.TRIGGER; } + @Override public String getType() { return "boom"; } + @Override public NodeResult execute(RuleContext ctx, String config) { + // 模拟链级错误(如配置解析失败)— 由链级 try-catch 隔离 + throw new RuleChainException(ctx.getChainId(), ctx.getCurrentNodeId(), + "provider explode"); + } + })); + + DagExecutor dagExecutor = new DagExecutor(registry, new SimpleMeterRegistry()); + engine = new DefaultRuleEngine(chainIndex, dagExecutor, new SimpleMeterRegistry()); + } + + @Test + void testChainLevelIsolation_failureInMiddleDoesNotBreakOthers() { + // chain 1: ok, chain 2: boom, chain 3: ok + CompiledRuleChain c1 = buildChain(1L, "ok", 100); + CompiledRuleChain c2 = buildChain(2L, "boom", 200); + CompiledRuleChain c3 = buildChain(3L, "ok", 300); + chainIndex.load(TENANT, List.of(c1, c2, c3)); + + RuleEngineResult result = engine.execute( + IotDeviceMessage.requestOf("thing.property.report"), + TENANT, 5L, 6L, 7L); + + assertEquals(3, result.getMatchedChainCount()); + assertEquals(2, result.getSuccessChainIds().size(), "chain 1 和 3 成功"); + assertTrue(result.getSuccessChainIds().containsAll(List.of(1L, 3L))); + assertEquals(1, result.getFailures().size(), "chain 2 失败"); + assertEquals(2L, result.getFailures().get(0).getChainId()); + assertEquals(1, chain1Executions.get(), "chain 1 执行了 1 次"); + assertEquals(1, chain3Executions.get(), "chain 3 执行了 1 次"); + } + + @Test + void testNoMatch_returnsEmptyResult() { + RuleEngineResult result = engine.execute( + IotDeviceMessage.requestOf("thing.property.report"), + TENANT, 5L, 6L, 7L); + assertEquals(0, result.getMatchedChainCount()); + assertFalse(result.hasFailure()); + assertNotNull(result.getTraceId()); + } + + private CompiledRuleChain buildChain(Long id, String triggerType, int priority) { + CompiledNode trigger = CompiledNode.builder() + .id("n" + id).name("t").category(RuleNodeCategory.TRIGGER) + .type(triggerType).configuration("{}").build(); + return CompiledRuleChain.builder() + .id(id).tenantId(TENANT).name("chain-" + id).priority(priority).version(1L) + .subsystemId(5L).productId(6L).deviceId(7L) + .firstNode(trigger).nodesById(Map.of(trigger.getId(), trigger)) + .outgoingByNode(Map.of(trigger.getId(), List.of())) + .build(); + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/RuleChainCompilerTest.java b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/RuleChainCompilerTest.java new file mode 100644 index 00000000..d74b3e16 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/engine/RuleChainCompilerTest.java @@ -0,0 +1,135 @@ +package com.viewsh.module.iot.rule.engine; + +import com.viewsh.module.iot.rule.dal.dataobject.IotRuleChainDO; +import com.viewsh.module.iot.rule.dal.dataobject.IotRuleLinkDO; +import com.viewsh.module.iot.rule.dal.dataobject.IotRuleNodeDO; +import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleLinkRelationType; +import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleNodeCategory; +import com.viewsh.module.iot.rule.engine.exception.RuleChainException; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * {@link RuleChainCompiler} 单元测试: + * 覆盖正常编译 / 单 Trigger 校验兜底 / DAG 无环兜底 / 非法枚举。 + */ +class RuleChainCompilerTest { + + @Test + void testCompile_linearChain_ok() { + IotRuleChainDO chain = chainDO(1L); + List nodes = List.of( + nodeDO(10L, RuleNodeCategory.TRIGGER, "device_property"), + nodeDO(11L, RuleNodeCategory.ACTION, "log")); + List links = List.of(linkDO(100L, 10L, 11L, RuleLinkRelationType.SUCCESS)); + + CompiledRuleChain compiled = RuleChainCompiler.compile(chain, nodes, links); + + assertEquals(1L, compiled.getId()); + assertNotNull(compiled.getFirstNode()); + assertEquals("10", compiled.getFirstNode().getId()); + assertEquals(1, compiled.outgoingOf("10").size()); + assertEquals("11", compiled.outgoingOf("10").get(0).getTargetNodeId()); + } + + @Test + void testCompile_multiTrigger_rejected() { + IotRuleChainDO chain = chainDO(1L); + List nodes = List.of( + nodeDO(10L, RuleNodeCategory.TRIGGER, "device_property"), + nodeDO(11L, RuleNodeCategory.TRIGGER, "device_event"), + nodeDO(12L, RuleNodeCategory.ACTION, "log")); + + RuleChainException ex = assertThrows(RuleChainException.class, + () -> RuleChainCompiler.compile(chain, nodes, List.of())); + assertTrue(ex.getMessage().contains("Trigger")); + } + + @Test + void testCompile_noNodes_rejected() { + assertThrows(RuleChainException.class, + () -> RuleChainCompiler.compile(chainDO(1L), List.of(), List.of())); + } + + @Test + void testCompile_cycleDetected() { + IotRuleChainDO chain = chainDO(1L); + List nodes = List.of( + nodeDO(10L, RuleNodeCategory.TRIGGER, "t"), + nodeDO(11L, RuleNodeCategory.ACTION, "a"), + nodeDO(12L, RuleNodeCategory.ACTION, "b")); + List links = List.of( + linkDO(100L, 10L, 11L, RuleLinkRelationType.SUCCESS), + linkDO(101L, 11L, 12L, RuleLinkRelationType.SUCCESS), + linkDO(102L, 12L, 11L, RuleLinkRelationType.SUCCESS)); // cycle + + RuleChainException ex = assertThrows(RuleChainException.class, + () -> RuleChainCompiler.compile(chain, nodes, links)); + assertTrue(ex.getMessage().contains("环")); + } + + @Test + void testCompile_invalidCategory_rejected() { + IotRuleChainDO chain = chainDO(1L); + IotRuleNodeDO bad = new IotRuleNodeDO(); + bad.setId(10L); bad.setCategory("unknown"); bad.setType("t"); bad.setConfiguration("{}"); + assertThrows(RuleChainException.class, + () -> RuleChainCompiler.compile(chain, List.of(bad), List.of())); + } + + @Test + void testCompile_invalidRelationType_rejected() { + IotRuleChainDO chain = chainDO(1L); + List nodes = List.of( + nodeDO(10L, RuleNodeCategory.TRIGGER, "t"), + nodeDO(11L, RuleNodeCategory.ACTION, "a")); + IotRuleLinkDO bad = new IotRuleLinkDO(); + bad.setId(100L); bad.setSourceNodeId(10L); bad.setTargetNodeId(11L); + bad.setRelationType("InvalidRelation"); bad.setSortOrder(0); + assertThrows(RuleChainException.class, + () -> RuleChainCompiler.compile(chain, nodes, List.of(bad))); + } + + @Test + void testCompile_linksSortedBySortOrder() { + IotRuleChainDO chain = chainDO(1L); + List nodes = List.of( + nodeDO(10L, RuleNodeCategory.TRIGGER, "t"), + nodeDO(11L, RuleNodeCategory.ACTION, "a"), + nodeDO(12L, RuleNodeCategory.ACTION, "b")); + IotRuleLinkDO l1 = linkDO(100L, 10L, 11L, RuleLinkRelationType.SUCCESS); l1.setSortOrder(2); + IotRuleLinkDO l2 = linkDO(101L, 10L, 12L, RuleLinkRelationType.SUCCESS); l2.setSortOrder(1); + + CompiledRuleChain compiled = RuleChainCompiler.compile(chain, nodes, List.of(l1, l2)); + List sorted = compiled.outgoingOf("10"); + assertEquals("12", sorted.get(0).getTargetNodeId(), "sortOrder=1 应排前面"); + assertEquals("11", sorted.get(1).getTargetNodeId()); + } + + // --- helpers --- + + private static IotRuleChainDO chainDO(Long id) { + IotRuleChainDO d = new IotRuleChainDO(); + d.setId(id); d.setName("chain-" + id); d.setTenantId(1L); + d.setPriority(100); d.setVersion(0L); d.setDebugMode(false); + return d; + } + + private static IotRuleNodeDO nodeDO(Long id, RuleNodeCategory category, String type) { + IotRuleNodeDO d = new IotRuleNodeDO(); + d.setId(id); d.setName("node-" + id); + d.setCategory(category.getValue()); d.setType(type); + d.setConfiguration("{}"); + return d; + } + + private static IotRuleLinkDO linkDO(Long id, Long src, Long tgt, RuleLinkRelationType rel) { + IotRuleLinkDO d = new IotRuleLinkDO(); + d.setId(id); d.setSourceNodeId(src); d.setTargetNodeId(tgt); + d.setRelationType(rel.getValue()); d.setSortOrder(0); + return d; + } +}