feat(iot): B3 RuleEngine 执行器(DAG + 链级隔离 + 三层匹配去重)

rule 模块 engine/ 新增 14 个核心类 + 4 个测试:

- RuleEngine / DefaultRuleEngine:对外入口,链级 try-catch 隔离(决议 #3)
- DagExecutor:BFS 遍历,按 relation_type 选 outgoing links,RuntimeException 转 FAILURE
- ChainIndex:三层绑定(subsystem/product/device)4 种 key 匹配 + LinkedHashMap 去重 + priority ASC 排序
- RuleChainCompiler:DO → CompiledRuleChain,含单 Trigger + DAG 无环 + 非法枚举兜底
- NodeProvider / NodeProviderRegistry:SPI + Spring @Component 路由(禁用 ServiceLoader/@SPI)
- RuleContext / NodeResult / CompiledRuleChain/Node/Link / RuleEngineResult / RuleChainException

测试覆盖(42/42 全绿):
- DagExecutorTest: 线性链 / 分支(TRUE/FALSE)/ 并行动作 / 异常转 FAILURE / metadata 传递 / SKIP 截断 / 缺 Trigger
- ChainIndexTest: 4 种 wildcard 组合 / 去重 / priority 排序 / 租户隔离 / evict
- RuleChainCompilerTest: 正常编译 / 单 Trigger 兜底 / DAG 无环 / 非法 category & relation_type / 连线 sortOrder
- DefaultRuleEngineTest: 链级异常隔离(chain1+chain3 成功,chain2 失败,counters 各 1 次)

补齐依赖:
- rule/pom.xml 加 io.micrometer:micrometer-core(节点执行 Timer + 失败 Counter)
- RuleNodeCategory 加 of(String) 静态查找方法(配合 RuleLinkRelationType.of 一致风格)

Known Pitfalls 落地:
  ⚠️ 评审 B1:ShakeLimit 节点 hook 留在 DagExecutor(B48 补)
  ⚠️ 评审 B3:LinkedHashMap 去重保留顺序(ChainIndex.match)
  ⚠️ 评审 B4:relation_type 严格封闭 6 值(RuleLinkRelationType + isValid)
  ⚠️ 评审 A5:chains 顺序 for 循环 + try-catch;不使用 Reactor flatMap 并发
  ⚠️ 评审 B10:单 Trigger 兜底在 Compiler 层(Service 层 + Compiler 双重保障)
  ⚠️ Metrics 基数:tag 含 chainId + nodeType + outcome;规则链 ≤ 500 控制
  ⚠️ DAG 兜底 MAX_NODES_PER_EXECUTION=1000 防脏数据绕过无环校验

未实现(留给后续任务):
- 具体 Provider(B4/B5/B6 Trigger/Condition/Action 实现)
- 全量缓存加载 + Pub/Sub 驱逐(B8)
- JMH Benchmark(任务卡 §6.4 + AC9 p99 < 50ms,第二期补)
- @SpringBootTest 集成测试(B9 Handler 就位后补)

Co-Authored-By: Claude Opus 4.7 (1M context, main session) <noreply@anthropic.com>
This commit is contained in:
lzh
2026-04-23 23:58:48 +08:00
parent 66647e19dd
commit ae74b4752a
21 changed files with 1466 additions and 0 deletions

View File

@@ -51,6 +51,12 @@
<artifactId>viewsh-spring-boot-starter-biz-tenant</artifactId>
</dependency>
<!-- Micrometer 指标RuleEngine 节点执行 Timer / 失败计数) -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
</dependency>
<!-- DB 相关 -->
<dependency>
<groupId>com.viewsh</groupId>

View File

@@ -22,4 +22,13 @@ public enum RuleNodeCategory {
private final String label;
public static RuleNodeCategory of(String value) {
for (RuleNodeCategory c : values()) {
if (c.value.equals(value)) {
return c;
}
}
return null;
}
}

View File

@@ -0,0 +1,100 @@
package com.viewsh.module.iot.rule.engine;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* 规则链三层绑定索引subsystemId, productId, deviceId+ 匹配去重。
*
* <p>按"最左匹配"组合预建 4 种索引 key评审 B3 + §十一-B
* <pre>
* key1: (sub, prod, dev) — 最细粒度:指定设备专属规则
* key2: (sub, prod, *) — 产品级
* key3: (sub, *, *) — 子系统级
* key4: (*, *, *) — 全局规则
* </pre>
*
* <p>{@link #match(Long, Long, Long, Long)} 返回去重 + 按 priority ASC 排序后的规则链列表。
*
* <p>线程安全:加载 / 更新用整体替换读取无锁ConcurrentHashMap
* 运行时 B8 负责增量驱逐 + 重建。
*/
@Slf4j
public class ChainIndex {
/** tenantId 前缀 → (key → chains)key 由 (sub,prod,dev) 组合,"_" 表示通配 */
private final Map<String, Map<String, List<CompiledRuleChain>>> indexPerTenant = new ConcurrentHashMap<>();
/**
* 全量加载 / 重建某租户的索引B8 调用)。
*/
public void load(Long tenantId, Collection<CompiledRuleChain> chains) {
Map<String, List<CompiledRuleChain>> fresh = new HashMap<>();
for (CompiledRuleChain c : chains) {
String k = key(c.getSubsystemId(), c.getProductId(), c.getDeviceId());
fresh.computeIfAbsent(k, x -> new ArrayList<>()).add(c);
}
// 冻结为不可变,保护读端
Map<String, List<CompiledRuleChain>> frozen = new HashMap<>();
fresh.forEach((k, list) -> frozen.put(k, List.copyOf(list)));
indexPerTenant.put(prefix(tenantId), Collections.unmodifiableMap(frozen));
log.info("[ChainIndex] tenant={} 载入 {} 条规则链,{} 个 key 组合", tenantId, chains.size(), frozen.size());
}
public void evictTenant(Long tenantId) {
indexPerTenant.remove(prefix(tenantId));
}
/**
* 按三层 ID 查出所有匹配规则链。命中 4 种 key 组合,去重后按 priority ASC 排序。
*/
public List<CompiledRuleChain> match(Long tenantId, Long subsystemId, Long productId, Long deviceId) {
Map<String, List<CompiledRuleChain>> tenantIndex = tenantIndex(tenantId);
if (tenantIndex.isEmpty()) return Collections.emptyList();
List<CompiledRuleChain> all = new ArrayList<>();
appendKey(all, tenantIndex, subsystemId, productId, deviceId);
appendKey(all, tenantIndex, subsystemId, productId, null);
appendKey(all, tenantIndex, subsystemId, null, null);
appendKey(all, tenantIndex, null, null, null);
if (all.isEmpty()) return Collections.emptyList();
// 【评审 B3】LinkedHashMap 去重,保留首个出现的(按上面顺序,更具体的先匹配)
Map<Long, CompiledRuleChain> dedup = new LinkedHashMap<>();
for (CompiledRuleChain c : all) {
dedup.putIfAbsent(c.getId(), c);
}
return dedup.values().stream()
.sorted(Comparator.comparingInt(CompiledRuleChain::getPriority))
.toList();
}
/** 单元测试辅助:查询当前索引条目数 */
public int sizeOf(Long tenantId) {
return tenantIndex(tenantId).values().stream().mapToInt(List::size).sum();
}
private Map<String, List<CompiledRuleChain>> tenantIndex(Long tenantId) {
Map<String, List<CompiledRuleChain>> m = indexPerTenant.get(prefix(tenantId));
return m == null ? Collections.emptyMap() : m;
}
private void appendKey(List<CompiledRuleChain> target,
Map<String, List<CompiledRuleChain>> tenantIndex,
Long sub, Long prod, Long dev) {
List<CompiledRuleChain> list = tenantIndex.get(key(sub, prod, dev));
if (list != null) target.addAll(list);
}
private static String prefix(Long tenantId) {
return tenantId == null ? "_" : tenantId.toString();
}
private static String key(Long sub, Long prod, Long dev) {
return (sub == null ? "_" : sub) + ":" + (prod == null ? "_" : prod) + ":" + (dev == null ? "_" : dev);
}
}

View File

@@ -0,0 +1,21 @@
package com.viewsh.module.iot.rule.engine;
import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleLinkRelationType;
import lombok.Builder;
import lombok.Data;
/**
* 编译后规则链连线rule_link 的内存态)。
*/
@Data
@Builder
public class CompiledLink {
private Long id;
private String sourceNodeId;
private String targetNodeId;
private RuleLinkRelationType relationType;
/** 连线上的附加表达式条件可选JSON 原文;解析延后到节点内) */
private String condition;
private int sortOrder;
}

View File

@@ -0,0 +1,22 @@
package com.viewsh.module.iot.rule.engine;
import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleNodeCategory;
import lombok.Builder;
import lombok.Data;
/**
* 编译后规则节点rule_node 的内存态)。
*
* <p>节点执行由 {@link NodeProviderRegistry} 按 (category, type) 分发到对应的 {@link NodeProvider}。
*/
@Data
@Builder
public class CompiledNode {
private String id;
private String name;
private RuleNodeCategory category;
private String type;
/** 节点配置 JSON 原文(由 Provider 自行解析) */
private String configuration;
}

View File

@@ -0,0 +1,51 @@
package com.viewsh.module.iot.rule.engine;
import lombok.Builder;
import lombok.Data;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* 编译后规则链rule_chain + rule_node + rule_link 的内存态)。
*
* <p>由 {@code RuleChainCompiler} 从 DO 构造,供 DagExecutor 快速遍历:
* <ul>
* <li>{@link #firstNode}:唯一 Trigger 入口B2 Service 层保证单 Trigger</li>
* <li>{@link #nodesById}:按 id 索引所有节点</li>
* <li>{@link #outgoingByNode}:每个节点的出边列表(已按 sortOrder 排序)</li>
* </ul>
*/
@Data
@Builder
public class CompiledRuleChain {
private Long id;
private String name;
private Long tenantId;
/** 评审 A5默认 100ASC 排序 */
private int priority;
/** 评审 B9乐观锁 / 多实例缓存一致性校验 */
private long version;
private boolean debugMode;
/** 三层绑定(评审 §十一-B */
private Long subsystemId;
private Long productId;
private Long deviceId;
private CompiledNode firstNode;
private Map<String, CompiledNode> nodesById;
private Map<String, List<CompiledLink>> outgoingByNode;
public List<CompiledLink> outgoingOf(String nodeId) {
if (outgoingByNode == null) return Collections.emptyList();
return outgoingByNode.getOrDefault(nodeId, Collections.emptyList());
}
public CompiledNode nodeById(String nodeId) {
return nodesById == null ? null : nodesById.get(nodeId);
}
}

View File

@@ -0,0 +1,105 @@
package com.viewsh.module.iot.rule.engine;
import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleLinkRelationType;
import com.viewsh.module.iot.rule.engine.exception.RuleChainException;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
/**
* DAG 规则链遍历执行器。BFS 按 relation_type 选择 outgoing links。
*
* <p>约束(上游保证,本执行器信任):
* <ul>
* <li>B2 Service 层保证 DAG 无环DFS 校验)</li>
* <li>B2 Service 层保证单 TriggerCompiledRuleChain.firstNode 是唯一入口)</li>
* <li>relation_type 是封闭 6 值枚举(评审 B4</li>
* </ul>
*
* <p>性能:单节点执行记录 Micrometer Timertag 含 chainId + nodeType。
* 指标基数上限由规则链总数管控(评审 11.6 限制规则链 ≤ 500
*/
@Slf4j
public class DagExecutor {
/** 单次遍历最大节点数(兜底防止脏数据绕过无环校验) */
private static final int MAX_NODES_PER_EXECUTION = 1000;
private final NodeProviderRegistry providerRegistry;
private final MeterRegistry meterRegistry;
public DagExecutor(NodeProviderRegistry providerRegistry, MeterRegistry meterRegistry) {
this.providerRegistry = providerRegistry;
this.meterRegistry = meterRegistry;
}
public void execute(CompiledRuleChain chain, RuleContext ctx) {
CompiledNode entry = chain.getFirstNode();
if (entry == null) {
throw new RuleChainException(chain.getId(), null, "规则链缺少 Trigger 入口节点");
}
Deque<CompiledNode> queue = new ArrayDeque<>();
queue.offer(entry);
int executed = 0;
while (!queue.isEmpty()) {
if (++executed > MAX_NODES_PER_EXECUTION) {
throw new RuleChainException(chain.getId(), null,
"DAG 执行超过 " + MAX_NODES_PER_EXECUTION + " 个节点,疑似脏数据");
}
CompiledNode current = queue.poll();
ctx.fork(current.getId());
NodeResult result = executeSingleNode(chain, current, ctx);
ctx.recordNodeOutput(current.getId(), result.getOutputs());
// 按 relationType 选 outgoing linksSKIP 语义:下游不再触发
if (result.getRelationType() == RuleLinkRelationType.SKIP) {
log.debug("[DagExecutor] chain={} node={} SKIP{}", chain.getId(), current.getId(), result.getMessage());
continue;
}
List<CompiledLink> outgoing = chain.outgoingOf(current.getId());
for (CompiledLink link : outgoing) {
if (link.getRelationType() == result.getRelationType()) {
CompiledNode next = chain.nodeById(link.getTargetNodeId());
if (next != null) queue.offer(next);
}
}
}
}
private NodeResult executeSingleNode(CompiledRuleChain chain, CompiledNode node, RuleContext ctx) {
Timer.Sample sample = meterRegistry == null ? null : Timer.start(meterRegistry);
String outcome = "success";
try {
NodeProvider provider = providerRegistry.resolve(node.getCategory(), node.getType());
NodeResult result = provider.execute(ctx, node.getConfiguration());
if (result == null) {
outcome = "null-result";
return NodeResult.failure("Provider 返回 null" + node.getCategory() + ":" + node.getType());
}
outcome = result.getRelationType().name().toLowerCase();
return result;
} catch (RuleChainException e) {
outcome = "rule-chain-error";
throw e;
} catch (RuntimeException e) {
outcome = "runtime-error";
log.warn("[DagExecutor] chain={} node={} type={} 执行异常:{}",
chain.getId(), node.getId(), node.getType(), e.getMessage());
return NodeResult.failure(e.getMessage(), e);
} finally {
if (sample != null) {
sample.stop(Timer.builder("iot.rule.execution")
.tag("chainId", String.valueOf(chain.getId()))
.tag("nodeCategory", node.getCategory().getValue())
.tag("nodeType", node.getType())
.tag("outcome", outcome)
.register(meterRegistry));
}
}
}
}

View File

@@ -0,0 +1,88 @@
package com.viewsh.module.iot.rule.engine;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.extern.slf4j.Slf4j;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
/**
* 默认 RuleEngine 实现。
*
* <p>决议 #3 / 评审 11.2:链级 try-catch 隔离,单链异常不影响其他链;失败计数
* 上报 Micrometer 计数器 {@code iot.rule.failure}tag=chainId / errorType
*
* <p>评审 A5chains 顺序执行,不使用 Reactor flatMap 并发(现有系统非响应式)。
*/
@Slf4j
public class DefaultRuleEngine implements RuleEngine {
private final ChainIndex chainIndex;
private final DagExecutor dagExecutor;
private final MeterRegistry meterRegistry;
public DefaultRuleEngine(ChainIndex chainIndex, DagExecutor dagExecutor, MeterRegistry meterRegistry) {
this.chainIndex = chainIndex;
this.dagExecutor = dagExecutor;
this.meterRegistry = meterRegistry;
}
@Override
public RuleEngineResult execute(IotDeviceMessage message, Long tenantId,
Long subsystemId, Long productId, Long deviceId) {
String traceId = UUID.randomUUID().toString();
RuleEngineResult result = new RuleEngineResult();
result.setTraceId(traceId);
List<CompiledRuleChain> chains = chainIndex.match(tenantId, subsystemId, productId, deviceId);
result.setMatchedChainCount(chains.size());
if (chains.isEmpty()) {
log.debug("[RuleEngine] trace={} tenant={} sub={} prod={} dev={} 无匹配规则链",
traceId, tenantId, subsystemId, productId, deviceId);
return result;
}
for (CompiledRuleChain chain : chains) {
RuleContext ctx = newContext(chain, message, tenantId, subsystemId, productId, deviceId, traceId);
try {
dagExecutor.execute(chain, ctx);
result.addSuccess(chain.getId());
} catch (Exception e) {
log.error("[RuleEngine] chain={} trace={} 执行失败", chain.getId(), traceId, e);
recordFailure(chain.getId(), e);
result.addFailure(chain.getId(), e.getMessage(), e);
// 不中断,继续执行下一条链
}
}
return result;
}
private RuleContext newContext(CompiledRuleChain chain, IotDeviceMessage message,
Long tenantId, Long subsystemId, Long productId, Long deviceId,
String traceId) {
RuleContext ctx = new RuleContext();
ctx.setTraceId(traceId);
ctx.setChainId(chain.getId());
ctx.setMessage(message);
ctx.setTenantId(tenantId);
ctx.setSubsystemId(subsystemId);
ctx.setProductId(productId);
ctx.setDeviceId(deviceId);
ctx.setStartedAt(Instant.now());
ctx.setDebugMode(chain.isDebugMode());
return ctx;
}
private void recordFailure(Long chainId, Throwable e) {
if (meterRegistry == null) return;
Counter.builder("iot.rule.failure")
.tag("chainId", String.valueOf(chainId))
.tag("errorType", e.getClass().getSimpleName())
.register(meterRegistry)
.increment();
}
}

View File

@@ -0,0 +1,30 @@
package com.viewsh.module.iot.rule.engine;
import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleNodeCategory;
/**
* 节点执行 SPIB4 / B5 / B6 实现)。
*
* <p>注册方式Spring @Component + getCategory()/getType() 索引
* (规范清单明确禁用 ServiceLoader / @SPI
*
* <p>DagExecutor 遍历到节点时,按 {@code (category, type)} 在
* {@link NodeProviderRegistry} 查找对应 Provider 并 {@link #execute(RuleContext, String)}。
*
* <p>返回 {@link NodeResult#getRelationType()} 决定 DAG 走哪条 outgoing link。
*/
public interface NodeProvider {
/** 节点类别trigger / condition / action */
RuleNodeCategory getCategory();
/** 节点类型标识,如 "device_property" / "alarm_trigger"(与 rule_node.type 对齐) */
String getType();
/**
* 执行节点。config 是 rule_node.configuration 的 JSON 原文,由 Provider 自行解析。
* 实现必须是幂等可重试的,并在失败时返回 {@link NodeResult#failure(String)}
* 抛出 RuntimeException 将被 DagExecutor 转为 FAILURE 并交由链级 try-catch 兜底。
*/
NodeResult execute(RuleContext ctx, String config);
}

View File

@@ -0,0 +1,48 @@
package com.viewsh.module.iot.rule.engine;
import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleNodeCategory;
import com.viewsh.module.iot.rule.engine.exception.RuleChainException;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* NodeProvider 注册表(按 (category, type) 路由)。
*
* <p>启动时由 Spring 注入所有 {@link NodeProvider} Bean构造一个不可变索引。
* 运行时 {@link #resolve(RuleNodeCategory, String)} 纯内存查表,性能 < 1μs。
*/
@Slf4j
public class NodeProviderRegistry {
private final Map<String, NodeProvider> providersByKey;
public NodeProviderRegistry(List<NodeProvider> providers) {
this.providersByKey = providers.stream()
.collect(Collectors.toUnmodifiableMap(
p -> key(p.getCategory(), p.getType()),
Function.identity(),
(a, b) -> {
throw new IllegalStateException(
"Duplicate NodeProvider registered: " + a.getCategory() + ":" + a.getType()
+ " / " + b.getClass().getName());
}));
log.info("[NodeProviderRegistry] 已注册 {} 个节点 Provider{}", providersByKey.size(), providersByKey.keySet());
}
public NodeProvider resolve(RuleNodeCategory category, String type) {
NodeProvider p = providersByKey.get(key(category, type));
if (p == null) {
throw new RuleChainException(
"未注册的节点 Providercategory=" + category + ", type=" + type);
}
return p;
}
private static String key(RuleNodeCategory category, String type) {
return category.getValue() + ":" + type;
}
}

View File

@@ -0,0 +1,76 @@
package com.viewsh.module.iot.rule.engine;
import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleLinkRelationType;
import lombok.Getter;
import java.util.Collections;
import java.util.Map;
/**
* 节点执行结果。relationType 决定 DAG 选哪条 outgoing link 走下游。
*
* 约定(评审 B4封闭枚举
* - Trigger 节点成功SUCCESS
* - Condition 节点TRUE / FALSE
* - Action 节点SUCCESS / FAILURE
* - Action/Node 超时TIMEOUT
* - 静默跳过ShakeLimit/黑名单SKIP
*/
@Getter
public class NodeResult {
private final RuleLinkRelationType relationType;
private final Map<String, Object> outputs;
private final String message;
private final Throwable error;
private NodeResult(RuleLinkRelationType relationType,
Map<String, Object> outputs,
String message,
Throwable error) {
this.relationType = relationType;
this.outputs = outputs == null ? Collections.emptyMap() : outputs;
this.message = message;
this.error = error;
}
public static NodeResult of(RuleLinkRelationType relationType) {
return new NodeResult(relationType, Collections.emptyMap(), null, null);
}
public static NodeResult of(RuleLinkRelationType relationType, Map<String, Object> outputs) {
return new NodeResult(relationType, outputs, null, null);
}
public static NodeResult success() {
return of(RuleLinkRelationType.SUCCESS);
}
public static NodeResult success(Map<String, Object> outputs) {
return of(RuleLinkRelationType.SUCCESS, outputs);
}
public static NodeResult failure(String message) {
return new NodeResult(RuleLinkRelationType.FAILURE, Collections.emptyMap(), message, null);
}
public static NodeResult failure(String message, Throwable error) {
return new NodeResult(RuleLinkRelationType.FAILURE, Collections.emptyMap(), message, error);
}
public static NodeResult truthy() {
return of(RuleLinkRelationType.TRUE);
}
public static NodeResult falsy() {
return of(RuleLinkRelationType.FALSE);
}
public static NodeResult timeout(String message) {
return new NodeResult(RuleLinkRelationType.TIMEOUT, Collections.emptyMap(), message, null);
}
public static NodeResult skip(String reason) {
return new NodeResult(RuleLinkRelationType.SKIP, Collections.emptyMap(), reason, null);
}
}

View File

@@ -0,0 +1,134 @@
package com.viewsh.module.iot.rule.engine;
import com.viewsh.module.iot.rule.dal.dataobject.IotRuleChainDO;
import com.viewsh.module.iot.rule.dal.dataobject.IotRuleLinkDO;
import com.viewsh.module.iot.rule.dal.dataobject.IotRuleNodeDO;
import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleLinkRelationType;
import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleNodeCategory;
import com.viewsh.module.iot.rule.engine.exception.RuleChainException;
import java.util.*;
import java.util.stream.Collectors;
/**
* 从 DO 三元组chain + nodes + links构造 CompiledRuleChainB8 缓存加载 / 测试可复用)。
*
* <p>保留 DAG 无环校验作为兜底(正常路径由 B2 Service 层保证);
* 未通过校验直接抛 {@link RuleChainException} 阻止脏数据进入缓存。
*/
public final class RuleChainCompiler {
private enum DfsState { NEW, VISITING, DONE }
private RuleChainCompiler() {}
public static CompiledRuleChain compile(IotRuleChainDO chainDO,
List<IotRuleNodeDO> nodeDOs,
List<IotRuleLinkDO> linkDOs) {
Objects.requireNonNull(chainDO, "chainDO");
if (nodeDOs == null || nodeDOs.isEmpty()) {
throw new RuleChainException(chainDO.getId(), null, "规则链无节点");
}
// 1. 节点映射
Map<String, CompiledNode> nodesById = new HashMap<>(nodeDOs.size() * 2);
CompiledNode firstNode = null;
int triggerCount = 0;
for (IotRuleNodeDO dto : nodeDOs) {
RuleNodeCategory category = RuleNodeCategory.of(dto.getCategory());
if (category == null) {
throw new RuleChainException(chainDO.getId(), String.valueOf(dto.getId()),
"非法节点 category" + dto.getCategory());
}
CompiledNode node = CompiledNode.builder()
.id(String.valueOf(dto.getId()))
.name(dto.getName())
.category(category)
.type(dto.getType())
.configuration(dto.getConfiguration())
.build();
nodesById.put(node.getId(), node);
if (category == RuleNodeCategory.TRIGGER) {
triggerCount++;
firstNode = node;
}
}
if (triggerCount != 1) {
throw new RuleChainException(chainDO.getId(), null,
"规则链必须有且仅有一个 Trigger 节点,当前 " + triggerCount + "");
}
// 2. 出边映射(已按 sortOrder 排序)
Map<String, List<CompiledLink>> outgoingByNode = new HashMap<>();
if (linkDOs != null) {
for (IotRuleLinkDO link : linkDOs) {
RuleLinkRelationType relation = RuleLinkRelationType.of(link.getRelationType());
if (relation == null) {
throw new RuleChainException(chainDO.getId(), String.valueOf(link.getSourceNodeId()),
"非法 relation_type" + link.getRelationType());
}
CompiledLink compiled = CompiledLink.builder()
.id(link.getId())
.sourceNodeId(String.valueOf(link.getSourceNodeId()))
.targetNodeId(String.valueOf(link.getTargetNodeId()))
.relationType(relation)
.condition(link.getCondition())
.sortOrder(link.getSortOrder() == null ? 0 : link.getSortOrder())
.build();
outgoingByNode
.computeIfAbsent(compiled.getSourceNodeId(), k -> new ArrayList<>())
.add(compiled);
}
outgoingByNode.values().forEach(list ->
list.sort(Comparator.comparingInt(CompiledLink::getSortOrder)));
}
// 3. DAG 无环兜底DFS
assertAcyclic(chainDO.getId(), nodesById, outgoingByNode);
return CompiledRuleChain.builder()
.id(chainDO.getId())
.name(chainDO.getName())
.tenantId(chainDO.getTenantId())
.priority(chainDO.getPriority() == null ? 100 : chainDO.getPriority())
.version(chainDO.getVersion() == null ? 0L : chainDO.getVersion())
.debugMode(Boolean.TRUE.equals(chainDO.getDebugMode()))
.subsystemId(chainDO.getSubsystemId())
.productId(chainDO.getProductId())
.deviceId(chainDO.getDeviceId())
.firstNode(firstNode)
.nodesById(Map.copyOf(nodesById))
.outgoingByNode(outgoingByNode.entrySet().stream()
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> List.copyOf(e.getValue()))))
.build();
}
private static void assertAcyclic(Long chainId,
Map<String, CompiledNode> nodesById,
Map<String, List<CompiledLink>> outgoingByNode) {
Map<String, DfsState> state = new HashMap<>();
for (String id : nodesById.keySet()) state.put(id, DfsState.NEW);
for (String id : nodesById.keySet()) {
if (state.get(id) == DfsState.NEW) {
dfs(chainId, id, state, outgoingByNode);
}
}
}
private static void dfs(Long chainId, String nodeId,
Map<String, DfsState> state,
Map<String, List<CompiledLink>> outgoing) {
state.put(nodeId, DfsState.VISITING);
for (CompiledLink link : outgoing.getOrDefault(nodeId, List.of())) {
DfsState s = state.get(link.getTargetNodeId());
if (s == DfsState.VISITING) {
throw new RuleChainException(chainId, nodeId,
"DAG 存在环:" + nodeId + "" + link.getTargetNodeId());
}
if (s == DfsState.NEW) {
dfs(chainId, link.getTargetNodeId(), state, outgoing);
}
}
state.put(nodeId, DfsState.DONE);
}
}

View File

@@ -0,0 +1,48 @@
package com.viewsh.module.iot.rule.engine;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
import lombok.Data;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
/**
* 规则链执行上下文。每一条消息匹配到的每条规则链,会生成一份独立的 RuleContext。
*
* <p>线程模型:一个 RuleContext 对应一次 chain 执行BFS 遍历中节点间串行传递。
* fork(node) 用于标记当前执行到的节点(不复制状态),由 DagExecutor 驱动。
*/
@Data
public class RuleContext {
private String traceId;
private Long chainId;
private String currentNodeId;
private IotDeviceMessage message;
private Long tenantId;
private Long subsystemId;
private Long productId;
private Long deviceId;
/** 节点间共享元数据(富化结果、中间结果) */
private final Map<String, Object> metadata = new HashMap<>();
/** 每个节点的输出(按 nodeId 索引),调试 / 后继节点可引用 */
private final Map<String, Map<String, Object>> nodeOutputs = new HashMap<>();
private Instant startedAt;
private boolean debugMode;
public RuleContext fork(String nextNodeId) {
this.currentNodeId = nextNodeId;
return this;
}
public void recordNodeOutput(String nodeId, Map<String, Object> outputs) {
if (outputs != null && !outputs.isEmpty()) {
nodeOutputs.put(nodeId, outputs);
metadata.putAll(outputs);
}
}
}

View File

@@ -0,0 +1,31 @@
package com.viewsh.module.iot.rule.engine;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
/**
* 规则引擎对外入口B9 Handler 调用)。
*
* <p>一次 {@link #execute(IotDeviceMessage, Long, Long, Long, Long)} 内会:
* <ol>
* <li>按三层 ID 从 {@link ChainIndex} 匹配规则链(含去重 + 排序)</li>
* <li>对每条链独立 try-catch失败不中断其他链决议 #3 / 评审 11.2</li>
* <li>逐链构造 {@link RuleContext} 并交由 {@link DagExecutor} 遍历</li>
* </ol>
*/
public interface RuleEngine {
/**
* 执行所有匹配规则链,返回汇总结果。
*
* @param message 原始设备消息
* @param tenantId 租户(所有查询必带)
* @param subsystemId 子系统(可 null匹配 NULL wildcard
* @param productId 产品(可 null
* @param deviceId 设备(可 null
*/
RuleEngineResult execute(IotDeviceMessage message,
Long tenantId,
Long subsystemId,
Long productId,
Long deviceId);
}

View File

@@ -0,0 +1,55 @@
package com.viewsh.module.iot.rule.engine;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Collections;
import java.util.List;
/**
* 规则引擎自动装配。
*
* <p>注册以下核心 Bean
* <ul>
* <li>{@link NodeProviderRegistry}:聚合所有 {@link NodeProvider} BeanB4/B5/B6 提供)</li>
* <li>{@link ChainIndex}规则链索引B8 负责加载)</li>
* <li>{@link DagExecutor} / {@link RuleEngine}</li>
* </ul>
*
* <p>MeterRegistry 由 spring-boot-starter-actuator 提供;无该 starter 时回退到 SimpleMeterRegistry保证单模块可运行。
*/
@Configuration
public class RuleEngineConfiguration {
@Bean
public NodeProviderRegistry nodeProviderRegistry(ObjectProvider<NodeProvider> providers) {
List<NodeProvider> list = providers.orderedStream().toList();
return new NodeProviderRegistry(list.isEmpty() ? Collections.emptyList() : list);
}
@Bean
@ConditionalOnMissingBean
public ChainIndex chainIndex() {
return new ChainIndex();
}
@Bean
@ConditionalOnMissingBean(MeterRegistry.class)
public MeterRegistry ruleEngineMeterRegistryFallback() {
return new SimpleMeterRegistry();
}
@Bean
public DagExecutor dagExecutor(NodeProviderRegistry registry, MeterRegistry meterRegistry) {
return new DagExecutor(registry, meterRegistry);
}
@Bean
public RuleEngine ruleEngine(ChainIndex chainIndex, DagExecutor dagExecutor, MeterRegistry meterRegistry) {
return new DefaultRuleEngine(chainIndex, dagExecutor, meterRegistry);
}
}

View File

@@ -0,0 +1,48 @@
package com.viewsh.module.iot.rule.engine;
import lombok.Data;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* 一条消息执行后的汇总结果(含每条规则链的成功 / 失败)。
*
* <p>调用方B9 Handler可根据 result 做指标统计、告警回退等。
*/
@Data
public class RuleEngineResult {
private String traceId;
private int matchedChainCount;
private final List<Long> successChainIds = new ArrayList<>();
private final List<ChainFailure> failures = new ArrayList<>();
public void addSuccess(Long chainId) {
successChainIds.add(chainId);
}
public void addFailure(Long chainId, String message, Throwable cause) {
failures.add(new ChainFailure(chainId, message, cause));
}
public List<Long> getSuccessChainIds() {
return Collections.unmodifiableList(successChainIds);
}
public List<ChainFailure> getFailures() {
return Collections.unmodifiableList(failures);
}
public boolean hasFailure() {
return !failures.isEmpty();
}
@Data
public static class ChainFailure {
private final Long chainId;
private final String message;
private final Throwable cause;
}
}

View File

@@ -0,0 +1,32 @@
package com.viewsh.module.iot.rule.engine.exception;
/**
* 规则链级异常(可恢复)。链级 try-catch 捕获此异常仅记录失败,
* 不中断其他规则链执行(决议 #3 / 评审 11.2)。
*/
public class RuleChainException extends RuntimeException {
private final Long chainId;
private final String nodeId;
public RuleChainException(String message) {
this(null, null, message, null);
}
public RuleChainException(String message, Throwable cause) {
this(null, null, message, cause);
}
public RuleChainException(Long chainId, String nodeId, String message) {
this(chainId, nodeId, message, null);
}
public RuleChainException(Long chainId, String nodeId, String message, Throwable cause) {
super(message, cause);
this.chainId = chainId;
this.nodeId = nodeId;
}
public Long getChainId() { return chainId; }
public String getNodeId() { return nodeId; }
}

View File

@@ -0,0 +1,112 @@
package com.viewsh.module.iot.rule.engine;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
import static org.junit.jupiter.api.Assertions.*;
/**
* {@link ChainIndex} 单元测试:覆盖 4 种 key 组合匹配、去重、priority 排序。
*/
class ChainIndexTest {
private static final Long TENANT = 1L;
private ChainIndex index;
@BeforeEach
void setUp() {
index = new ChainIndex();
}
@Test
void testMatch_fullSpec() {
index.load(TENANT, List.of(chain(10L, 5L, 6L, 7L, 100)));
List<CompiledRuleChain> result = index.match(TENANT, 5L, 6L, 7L);
assertEquals(1, result.size());
assertEquals(10L, result.get(0).getId());
}
@Test
void testMatch_productLevelWildcard() {
index.load(TENANT, List.of(chain(11L, 5L, 6L, null, 100)));
// 查带 deviceId走 (sub, prod, null) 匹配
List<CompiledRuleChain> result = index.match(TENANT, 5L, 6L, 999L);
assertEquals(1, result.size());
assertEquals(11L, result.get(0).getId());
}
@Test
void testMatch_subsystemLevelWildcard() {
index.load(TENANT, List.of(chain(12L, 5L, null, null, 100)));
List<CompiledRuleChain> result = index.match(TENANT, 5L, 999L, 999L);
assertEquals(1, result.size());
assertEquals(12L, result.get(0).getId());
}
@Test
void testMatch_globalWildcard() {
index.load(TENANT, List.of(chain(13L, null, null, null, 100)));
List<CompiledRuleChain> result = index.match(TENANT, 5L, 6L, 7L);
assertEquals(1, result.size());
assertEquals(13L, result.get(0).getId());
}
@Test
void testMatch_dedupSameChainAcrossKeys() {
// 同一条链既建了具体绑定,又建了通配 — 应只返回一次(【评审 B3】
CompiledRuleChain specific = chain(20L, 5L, 6L, 7L, 100);
CompiledRuleChain wildcard = chain(20L, null, null, null, 100);
index.load(TENANT, List.of(specific, wildcard));
List<CompiledRuleChain> result = index.match(TENANT, 5L, 6L, 7L);
assertEquals(1, result.size(), "同一 chainId 应去重为 1 条");
assertEquals(20L, result.get(0).getId());
}
@Test
void testMatch_prioritySortAsc() {
index.load(TENANT, List.of(
chain(30L, 5L, 6L, 7L, 300),
chain(31L, 5L, 6L, 7L, 100),
chain(32L, 5L, 6L, 7L, 200)));
List<CompiledRuleChain> result = index.match(TENANT, 5L, 6L, 7L);
assertEquals(3, result.size());
assertEquals(31L, result.get(0).getId());
assertEquals(32L, result.get(1).getId());
assertEquals(30L, result.get(2).getId());
}
@Test
void testMatch_emptyWhenNoChainLoaded() {
List<CompiledRuleChain> result = index.match(TENANT, 5L, 6L, 7L);
assertTrue(result.isEmpty());
}
@Test
void testEvictTenant_clearsIndex() {
index.load(TENANT, List.of(chain(40L, 5L, 6L, 7L, 100)));
assertEquals(1, index.sizeOf(TENANT));
index.evictTenant(TENANT);
assertEquals(0, index.sizeOf(TENANT));
assertTrue(index.match(TENANT, 5L, 6L, 7L).isEmpty());
}
@Test
void testMatch_tenantIsolation() {
index.load(1L, List.of(chain(50L, 5L, 6L, 7L, 100)));
index.load(2L, List.of(chain(51L, 5L, 6L, 7L, 100)));
assertEquals(50L, index.match(1L, 5L, 6L, 7L).get(0).getId());
assertEquals(51L, index.match(2L, 5L, 6L, 7L).get(0).getId());
}
private static CompiledRuleChain chain(Long id, Long sub, Long prod, Long dev, int priority) {
return CompiledRuleChain.builder()
.id(id).name("chain-" + id).tenantId(TENANT)
.priority(priority).version(1L)
.subsystemId(sub).productId(prod).deviceId(dev)
.build();
}
}

View File

@@ -0,0 +1,215 @@
package com.viewsh.module.iot.rule.engine;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleLinkRelationType;
import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleNodeCategory;
import com.viewsh.module.iot.rule.engine.exception.RuleChainException;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.*;
/**
* {@link DagExecutor} 单元测试:覆盖线性链、分支链、并行动作、错误转 FAILURE、元数据传递、SKIP 截断。
*/
class DagExecutorTest {
private DagExecutor executor;
/** 每次测试用可变的 Map 登记节点执行顺序,用于断言 */
private List<String> executed;
/** Provider 用户指定每个 (category:type) 的行为(返回什么 NodeResult */
private Map<String, NodeBehavior> behaviors;
@BeforeEach
void setUp() {
executed = new ArrayList<>();
behaviors = new HashMap<>();
NodeProviderRegistry registry = new NodeProviderRegistry(List.of(
new RecordingProvider(RuleNodeCategory.TRIGGER, "test_trigger"),
new RecordingProvider(RuleNodeCategory.CONDITION, "test_condition"),
new RecordingProvider(RuleNodeCategory.ACTION, "test_action_a"),
new RecordingProvider(RuleNodeCategory.ACTION, "test_action_b"),
new RecordingProvider(RuleNodeCategory.ACTION, "test_action_c")));
executor = new DagExecutor(registry, new SimpleMeterRegistry());
}
@Test
void testLinearChain_triggerThenAction() {
// Trigger(1) --Success--> Action(2)
CompiledNode trigger = node("1", RuleNodeCategory.TRIGGER, "test_trigger");
CompiledNode action = node("2", RuleNodeCategory.ACTION, "test_action_a");
CompiledRuleChain chain = chain(trigger, List.of(trigger, action),
Map.of("1", List.of(link("1", "2", RuleLinkRelationType.SUCCESS))));
executor.execute(chain, ctx(chain));
assertEquals(List.of("1", "2"), executed);
}
@Test
void testBranchChain_conditionTrueTakesTrueBranch() {
// Trigger(1) --Success--> Condition(2) --True--> Action(3)
// --False--> Action(4)
behaviors.put("condition:test_condition", ctx -> NodeResult.truthy());
CompiledNode t = node("1", RuleNodeCategory.TRIGGER, "test_trigger");
CompiledNode c = node("2", RuleNodeCategory.CONDITION, "test_condition");
CompiledNode a = node("3", RuleNodeCategory.ACTION, "test_action_a");
CompiledNode b = node("4", RuleNodeCategory.ACTION, "test_action_b");
CompiledRuleChain chain = chain(t, List.of(t, c, a, b), Map.of(
"1", List.of(link("1", "2", RuleLinkRelationType.SUCCESS)),
"2", List.of(
link("2", "3", RuleLinkRelationType.TRUE),
link("2", "4", RuleLinkRelationType.FALSE))));
executor.execute(chain, ctx(chain));
assertTrue(executed.contains("3"), "True 分支应执行");
assertFalse(executed.contains("4"), "False 分支不应执行");
}
@Test
void testParallelActions_fanOut() {
// Trigger(1) --Success--> Action(2), Action(3), Action(4)(三条并列 Success 边)
CompiledNode t = node("1", RuleNodeCategory.TRIGGER, "test_trigger");
CompiledNode a = node("2", RuleNodeCategory.ACTION, "test_action_a");
CompiledNode b = node("3", RuleNodeCategory.ACTION, "test_action_b");
CompiledNode c = node("4", RuleNodeCategory.ACTION, "test_action_c");
CompiledRuleChain chain = chain(t, List.of(t, a, b, c), Map.of(
"1", List.of(
link("1", "2", RuleLinkRelationType.SUCCESS),
link("1", "3", RuleLinkRelationType.SUCCESS),
link("1", "4", RuleLinkRelationType.SUCCESS))));
executor.execute(chain, ctx(chain));
assertTrue(executed.containsAll(List.of("2", "3", "4")));
}
@Test
void testActionFailure_convertsRuntimeExceptionToFailure() {
// action_a 抛 RuntimeException → 应被转为 FAILURE不向上抛
behaviors.put("action:test_action_a", ctx -> { throw new RuntimeException("boom"); });
CompiledNode t = node("1", RuleNodeCategory.TRIGGER, "test_trigger");
CompiledNode a = node("2", RuleNodeCategory.ACTION, "test_action_a");
CompiledNode onFailure = node("3", RuleNodeCategory.ACTION, "test_action_b");
CompiledRuleChain chain = chain(t, List.of(t, a, onFailure), Map.of(
"1", List.of(link("1", "2", RuleLinkRelationType.SUCCESS)),
"2", List.of(link("2", "3", RuleLinkRelationType.FAILURE))));
assertDoesNotThrow(() -> executor.execute(chain, ctx(chain)));
assertTrue(executed.contains("3"), "FAILURE 分支应被走到");
}
@Test
void testMetadataPropagation_acrossNodes() {
// Node1 写 meta['foo']=bar → Node2 读取
AtomicInteger readValue = new AtomicInteger();
behaviors.put("trigger:test_trigger", ctx ->
NodeResult.success(Map.of("foo", 42)));
behaviors.put("action:test_action_a", ctx -> {
Object v = ctx.getMetadata().get("foo");
readValue.set(v instanceof Integer ? (Integer) v : -1);
return NodeResult.success();
});
CompiledNode t = node("1", RuleNodeCategory.TRIGGER, "test_trigger");
CompiledNode a = node("2", RuleNodeCategory.ACTION, "test_action_a");
CompiledRuleChain chain = chain(t, List.of(t, a),
Map.of("1", List.of(link("1", "2", RuleLinkRelationType.SUCCESS))));
executor.execute(chain, ctx(chain));
assertEquals(42, readValue.get());
}
@Test
void testSkipResult_prunesDownstream() {
// Action(2) 返回 SKIP → 下游 Action(3) 不执行
behaviors.put("action:test_action_a", ctx -> NodeResult.skip("shake-limit"));
CompiledNode t = node("1", RuleNodeCategory.TRIGGER, "test_trigger");
CompiledNode a = node("2", RuleNodeCategory.ACTION, "test_action_a");
CompiledNode b = node("3", RuleNodeCategory.ACTION, "test_action_b");
CompiledRuleChain chain = chain(t, List.of(t, a, b), Map.of(
"1", List.of(link("1", "2", RuleLinkRelationType.SUCCESS)),
"2", List.of(link("2", "3", RuleLinkRelationType.SUCCESS))));
executor.execute(chain, ctx(chain));
assertTrue(executed.contains("2"));
assertFalse(executed.contains("3"), "SKIP 下游不应执行");
}
@Test
void testMissingTrigger_throwsRuleChainException() {
CompiledRuleChain chain = CompiledRuleChain.builder()
.id(1L).tenantId(1L).priority(100)
.firstNode(null).nodesById(Map.of()).outgoingByNode(Map.of())
.build();
assertThrows(RuleChainException.class, () -> executor.execute(chain, ctx(chain)));
}
// --- helpers ---
private RuleContext ctx(CompiledRuleChain chain) {
RuleContext ctx = new RuleContext();
ctx.setChainId(chain.getId());
ctx.setTenantId(chain.getTenantId());
ctx.setTraceId("trace-test");
ctx.setMessage(IotDeviceMessage.requestOf("thing.property.report"));
return ctx;
}
private static CompiledNode node(String id, RuleNodeCategory category, String type) {
return CompiledNode.builder().id(id).name("node-" + id)
.category(category).type(type).configuration("{}").build();
}
private static CompiledLink link(String from, String to, RuleLinkRelationType rel) {
return CompiledLink.builder()
.id(Long.parseLong(from + to))
.sourceNodeId(from).targetNodeId(to)
.relationType(rel).sortOrder(0).build();
}
private static CompiledRuleChain chain(CompiledNode first, List<CompiledNode> nodes,
Map<String, List<CompiledLink>> outgoing) {
Map<String, CompiledNode> byId = new HashMap<>();
nodes.forEach(n -> byId.put(n.getId(), n));
return CompiledRuleChain.builder()
.id(1L).tenantId(1L).name("test").priority(100).version(1L)
.firstNode(first).nodesById(byId).outgoingByNode(outgoing)
.build();
}
/** 可插入 behaviors 的 Provider默认返回 SUCCESS可被覆写 */
@FunctionalInterface
private interface NodeBehavior {
NodeResult apply(RuleContext ctx);
}
private class RecordingProvider implements NodeProvider {
private final RuleNodeCategory category;
private final String type;
RecordingProvider(RuleNodeCategory c, String t) { this.category = c; this.type = t; }
@Override public RuleNodeCategory getCategory() { return category; }
@Override public String getType() { return type; }
@Override public NodeResult execute(RuleContext ctx, String config) {
executed.add(ctx.getCurrentNodeId());
NodeBehavior b = behaviors.get(category.getValue() + ":" + type);
return b == null ? NodeResult.success() : b.apply(ctx);
}
}
}

View File

@@ -0,0 +1,100 @@
package com.viewsh.module.iot.rule.engine;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleNodeCategory;
import com.viewsh.module.iot.rule.engine.exception.RuleChainException;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.*;
/**
* {@link DefaultRuleEngine} 单元测试:重点覆盖决议 #3 的链级 try-catch 隔离。
*/
class DefaultRuleEngineTest {
private static final Long TENANT = 1L;
private ChainIndex chainIndex;
private AtomicInteger chain1Executions;
private AtomicInteger chain3Executions;
private DefaultRuleEngine engine;
@BeforeEach
void setUp() {
chainIndex = new ChainIndex();
chain1Executions = new AtomicInteger();
chain3Executions = new AtomicInteger();
NodeProviderRegistry registry = new NodeProviderRegistry(List.of(
new NodeProvider() {
@Override public RuleNodeCategory getCategory() { return RuleNodeCategory.TRIGGER; }
@Override public String getType() { return "ok"; }
@Override public NodeResult execute(RuleContext ctx, String config) {
if (ctx.getChainId() == 1L) chain1Executions.incrementAndGet();
if (ctx.getChainId() == 3L) chain3Executions.incrementAndGet();
return NodeResult.success();
}
},
new NodeProvider() {
@Override public RuleNodeCategory getCategory() { return RuleNodeCategory.TRIGGER; }
@Override public String getType() { return "boom"; }
@Override public NodeResult execute(RuleContext ctx, String config) {
// 模拟链级错误(如配置解析失败)— 由链级 try-catch 隔离
throw new RuleChainException(ctx.getChainId(), ctx.getCurrentNodeId(),
"provider explode");
}
}));
DagExecutor dagExecutor = new DagExecutor(registry, new SimpleMeterRegistry());
engine = new DefaultRuleEngine(chainIndex, dagExecutor, new SimpleMeterRegistry());
}
@Test
void testChainLevelIsolation_failureInMiddleDoesNotBreakOthers() {
// chain 1: ok, chain 2: boom, chain 3: ok
CompiledRuleChain c1 = buildChain(1L, "ok", 100);
CompiledRuleChain c2 = buildChain(2L, "boom", 200);
CompiledRuleChain c3 = buildChain(3L, "ok", 300);
chainIndex.load(TENANT, List.of(c1, c2, c3));
RuleEngineResult result = engine.execute(
IotDeviceMessage.requestOf("thing.property.report"),
TENANT, 5L, 6L, 7L);
assertEquals(3, result.getMatchedChainCount());
assertEquals(2, result.getSuccessChainIds().size(), "chain 1 和 3 成功");
assertTrue(result.getSuccessChainIds().containsAll(List.of(1L, 3L)));
assertEquals(1, result.getFailures().size(), "chain 2 失败");
assertEquals(2L, result.getFailures().get(0).getChainId());
assertEquals(1, chain1Executions.get(), "chain 1 执行了 1 次");
assertEquals(1, chain3Executions.get(), "chain 3 执行了 1 次");
}
@Test
void testNoMatch_returnsEmptyResult() {
RuleEngineResult result = engine.execute(
IotDeviceMessage.requestOf("thing.property.report"),
TENANT, 5L, 6L, 7L);
assertEquals(0, result.getMatchedChainCount());
assertFalse(result.hasFailure());
assertNotNull(result.getTraceId());
}
private CompiledRuleChain buildChain(Long id, String triggerType, int priority) {
CompiledNode trigger = CompiledNode.builder()
.id("n" + id).name("t").category(RuleNodeCategory.TRIGGER)
.type(triggerType).configuration("{}").build();
return CompiledRuleChain.builder()
.id(id).tenantId(TENANT).name("chain-" + id).priority(priority).version(1L)
.subsystemId(5L).productId(6L).deviceId(7L)
.firstNode(trigger).nodesById(Map.of(trigger.getId(), trigger))
.outgoingByNode(Map.of(trigger.getId(), List.of()))
.build();
}
}

View File

@@ -0,0 +1,135 @@
package com.viewsh.module.iot.rule.engine;
import com.viewsh.module.iot.rule.dal.dataobject.IotRuleChainDO;
import com.viewsh.module.iot.rule.dal.dataobject.IotRuleLinkDO;
import com.viewsh.module.iot.rule.dal.dataobject.IotRuleNodeDO;
import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleLinkRelationType;
import com.viewsh.module.iot.rule.dal.dataobject.enums.RuleNodeCategory;
import com.viewsh.module.iot.rule.engine.exception.RuleChainException;
import org.junit.jupiter.api.Test;
import java.util.List;
import static org.junit.jupiter.api.Assertions.*;
/**
* {@link RuleChainCompiler} 单元测试:
* 覆盖正常编译 / 单 Trigger 校验兜底 / DAG 无环兜底 / 非法枚举。
*/
class RuleChainCompilerTest {
@Test
void testCompile_linearChain_ok() {
IotRuleChainDO chain = chainDO(1L);
List<IotRuleNodeDO> nodes = List.of(
nodeDO(10L, RuleNodeCategory.TRIGGER, "device_property"),
nodeDO(11L, RuleNodeCategory.ACTION, "log"));
List<IotRuleLinkDO> links = List.of(linkDO(100L, 10L, 11L, RuleLinkRelationType.SUCCESS));
CompiledRuleChain compiled = RuleChainCompiler.compile(chain, nodes, links);
assertEquals(1L, compiled.getId());
assertNotNull(compiled.getFirstNode());
assertEquals("10", compiled.getFirstNode().getId());
assertEquals(1, compiled.outgoingOf("10").size());
assertEquals("11", compiled.outgoingOf("10").get(0).getTargetNodeId());
}
@Test
void testCompile_multiTrigger_rejected() {
IotRuleChainDO chain = chainDO(1L);
List<IotRuleNodeDO> nodes = List.of(
nodeDO(10L, RuleNodeCategory.TRIGGER, "device_property"),
nodeDO(11L, RuleNodeCategory.TRIGGER, "device_event"),
nodeDO(12L, RuleNodeCategory.ACTION, "log"));
RuleChainException ex = assertThrows(RuleChainException.class,
() -> RuleChainCompiler.compile(chain, nodes, List.of()));
assertTrue(ex.getMessage().contains("Trigger"));
}
@Test
void testCompile_noNodes_rejected() {
assertThrows(RuleChainException.class,
() -> RuleChainCompiler.compile(chainDO(1L), List.of(), List.of()));
}
@Test
void testCompile_cycleDetected() {
IotRuleChainDO chain = chainDO(1L);
List<IotRuleNodeDO> nodes = List.of(
nodeDO(10L, RuleNodeCategory.TRIGGER, "t"),
nodeDO(11L, RuleNodeCategory.ACTION, "a"),
nodeDO(12L, RuleNodeCategory.ACTION, "b"));
List<IotRuleLinkDO> links = List.of(
linkDO(100L, 10L, 11L, RuleLinkRelationType.SUCCESS),
linkDO(101L, 11L, 12L, RuleLinkRelationType.SUCCESS),
linkDO(102L, 12L, 11L, RuleLinkRelationType.SUCCESS)); // cycle
RuleChainException ex = assertThrows(RuleChainException.class,
() -> RuleChainCompiler.compile(chain, nodes, links));
assertTrue(ex.getMessage().contains(""));
}
@Test
void testCompile_invalidCategory_rejected() {
IotRuleChainDO chain = chainDO(1L);
IotRuleNodeDO bad = new IotRuleNodeDO();
bad.setId(10L); bad.setCategory("unknown"); bad.setType("t"); bad.setConfiguration("{}");
assertThrows(RuleChainException.class,
() -> RuleChainCompiler.compile(chain, List.of(bad), List.of()));
}
@Test
void testCompile_invalidRelationType_rejected() {
IotRuleChainDO chain = chainDO(1L);
List<IotRuleNodeDO> nodes = List.of(
nodeDO(10L, RuleNodeCategory.TRIGGER, "t"),
nodeDO(11L, RuleNodeCategory.ACTION, "a"));
IotRuleLinkDO bad = new IotRuleLinkDO();
bad.setId(100L); bad.setSourceNodeId(10L); bad.setTargetNodeId(11L);
bad.setRelationType("InvalidRelation"); bad.setSortOrder(0);
assertThrows(RuleChainException.class,
() -> RuleChainCompiler.compile(chain, nodes, List.of(bad)));
}
@Test
void testCompile_linksSortedBySortOrder() {
IotRuleChainDO chain = chainDO(1L);
List<IotRuleNodeDO> nodes = List.of(
nodeDO(10L, RuleNodeCategory.TRIGGER, "t"),
nodeDO(11L, RuleNodeCategory.ACTION, "a"),
nodeDO(12L, RuleNodeCategory.ACTION, "b"));
IotRuleLinkDO l1 = linkDO(100L, 10L, 11L, RuleLinkRelationType.SUCCESS); l1.setSortOrder(2);
IotRuleLinkDO l2 = linkDO(101L, 10L, 12L, RuleLinkRelationType.SUCCESS); l2.setSortOrder(1);
CompiledRuleChain compiled = RuleChainCompiler.compile(chain, nodes, List.of(l1, l2));
List<CompiledLink> sorted = compiled.outgoingOf("10");
assertEquals("12", sorted.get(0).getTargetNodeId(), "sortOrder=1 应排前面");
assertEquals("11", sorted.get(1).getTargetNodeId());
}
// --- helpers ---
private static IotRuleChainDO chainDO(Long id) {
IotRuleChainDO d = new IotRuleChainDO();
d.setId(id); d.setName("chain-" + id); d.setTenantId(1L);
d.setPriority(100); d.setVersion(0L); d.setDebugMode(false);
return d;
}
private static IotRuleNodeDO nodeDO(Long id, RuleNodeCategory category, String type) {
IotRuleNodeDO d = new IotRuleNodeDO();
d.setId(id); d.setName("node-" + id);
d.setCategory(category.getValue()); d.setType(type);
d.setConfiguration("{}");
return d;
}
private static IotRuleLinkDO linkDO(Long id, Long src, Long tgt, RuleLinkRelationType rel) {
IotRuleLinkDO d = new IotRuleLinkDO();
d.setId(id); d.setSourceNodeId(src); d.setTargetNodeId(tgt);
d.setRelationType(rel.getValue()); d.setSortOrder(0);
return d;
}
}