From 171f2013840902cbdfff27d8381ef7a2e26ed320 Mon Sep 17 00:00:00 2001 From: lzh Date: Fri, 24 Apr 2026 11:00:38 +0800 Subject: [PATCH] =?UTF-8?q?feat(iot):=20Wave=205=20Round=202=20=E2=80=94?= =?UTF-8?q?=20B9/B14/B16=20=E7=BB=9F=E4=B8=80=E6=B6=88=E8=B4=B9=E5=85=A5?= =?UTF-8?q?=E5=8F=A3=20+=20=E5=91=8A=E8=AD=A6=E5=88=86=E5=B8=83=E5=BC=8F?= =?UTF-8?q?=E9=94=81=20+=20=E9=80=9A=E7=9F=A5=E9=9B=86=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit B9 IotRuleEngineMessageHandler(统一消费入口) - 新消费者 v2 统一入口,@PostConstruct 注册到 IotMessageBus - versionResolver.shouldUseV2 三态路由(V1/V2/HYBRID),绝不双跑 - device null 时 WARN + skip;RuleEngine 异常 try-catch 吞掉防重试风暴 - v1 三消费者(DataRule/SceneRule/CleanRule)增加前置 v2 bypass 判断 - 6 个单元测试(global-v1/v2/hybrid 白名单命中/未命中/device-null/引擎异常) B14 告警缓存 + SET NX PX 分布式锁 + 有效性判断 - IotAlarmLockService:SET NX PX + Lua 原子解锁,锁冲突抛 ALARM_LOCK_CONFLICT - IotAlarmCacheService:Redis Hash iot:alarm:state:{id},TTL 7天,cache miss 从DB重建 - AlarmStateValidator:isEffectiveTrigger/isEffectiveClear 时序校验,防旧消息重置已清除告警 - IotAlarmRecordServiceImpl:trigger/clear/ack/archive 全部在锁内,DB写后立即同步缓存 - 新增 ALARM_LOCK_CONFLICT 错误码;AlarmTriggerRequest 增加 timestamp 字段 - 17 个单元测试(锁 4 + 缓存 5 + 校验 9 + 集成 3) B16 NotifyAction 4 通道集成 + 模板解析 - NotifyChannel SPI 接口 + Sms/Email/InApp/Webhook 四实现(@Component 注册) - WebhookNotifyChannel:JDK 17 HttpClient 10s 超时 + SSRF 强制 HTTPS 校验 - NotifyDispatcher:独立 ForkJoinPool(8) 并行分发,30s 整体超时,部分失败不阻塞 - 模板变量统一走 TemplateResolver(评审 C5),缺失变量降级为空串 - NotifyAction 移除 stub,委托 NotifyDispatcher - viewsh-module-system-api 依赖引入;13 个测试(Dispatcher 7 + Webhook SSRF 6) 测试:iot-rule 177/177 全绿;iot-server 251/251 全绿(含 Skipped 161 旧 v1 测试) Co-Authored-By: Claude Sonnet 4.6 (1M context) --- .../module/iot/enums/ErrorCodeConstants.java | 2 + .../viewsh-module-iot-rule/pom.xml | 7 + .../module/iot/rule/action/NotifyAction.java | 145 ++++------ .../iot/rule/notify/NotifyDispatcher.java | 249 ++++++++++++++++ .../notify/channel/EmailNotifyChannel.java | 82 ++++++ .../notify/channel/InAppNotifyChannel.java | 82 ++++++ .../rule/notify/channel/NotifyChannel.java | 30 ++ .../rule/notify/channel/SmsNotifyChannel.java | 84 ++++++ .../notify/channel/WebhookNotifyChannel.java | 140 +++++++++ .../iot/rule/notify/model/NotifyRequest.java | 72 +++++ .../iot/rule/notify/model/NotifyResult.java | 40 +++ .../iot/rule/notify/NotifyDispatcherTest.java | 265 ++++++++++++++++++ .../channel/WebhookNotifyChannelTest.java | 56 ++++ .../rule/IotCleanRuleMessageHandler.java | 14 + .../rule/IotDataRuleMessageHandler.java | 17 ++ .../rule/IotRuleEngineMessageHandler.java | 98 +++++++ .../rule/IotSceneRuleMessageHandler.java | 22 +- .../iot/service/alarm/AlarmCacheState.java | 52 ++++ .../service/alarm/AlarmStateValidator.java | 71 +++++ .../service/alarm/IotAlarmCacheService.java | 198 +++++++++++++ .../service/alarm/IotAlarmLockService.java | 106 +++++++ .../alarm/IotAlarmRecordServiceImpl.java | 213 +++++++++----- .../alarm/dto/AlarmTriggerRequest.java | 8 + .../rule/IotRuleEngineMessageHandlerTest.java | 167 +++++++++++ .../alarm/AlarmStateValidatorTest.java | 119 ++++++++ .../alarm/IotAlarmCacheServiceTest.java | 156 +++++++++++ .../alarm/IotAlarmLockServiceTest.java | 143 ++++++++++ .../alarm/IotAlarmRecordServiceImplTest.java | 92 +++++- 28 files changed, 2559 insertions(+), 171 deletions(-) create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/NotifyDispatcher.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/EmailNotifyChannel.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/InAppNotifyChannel.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/NotifyChannel.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/SmsNotifyChannel.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/WebhookNotifyChannel.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/model/NotifyRequest.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/model/NotifyResult.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/notify/NotifyDispatcherTest.java create mode 100644 viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/notify/channel/WebhookNotifyChannelTest.java create mode 100644 viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotRuleEngineMessageHandler.java create mode 100644 viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/AlarmCacheState.java create mode 100644 viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/AlarmStateValidator.java create mode 100644 viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/IotAlarmCacheService.java create mode 100644 viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/IotAlarmLockService.java create mode 100644 viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/mq/consumer/rule/IotRuleEngineMessageHandlerTest.java create mode 100644 viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/service/alarm/AlarmStateValidatorTest.java create mode 100644 viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/service/alarm/IotAlarmCacheServiceTest.java create mode 100644 viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/service/alarm/IotAlarmLockServiceTest.java diff --git a/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/enums/ErrorCodeConstants.java b/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/enums/ErrorCodeConstants.java index 4c92a083..441dcd88 100644 --- a/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/enums/ErrorCodeConstants.java +++ b/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/enums/ErrorCodeConstants.java @@ -87,6 +87,8 @@ public interface ErrorCodeConstants { ErrorCode ALARM_ALREADY_ARCHIVED = new ErrorCode(1_050_021_001, "告警已归档,不允许修改"); ErrorCode ALARM_SEVERITY_INVALID = new ErrorCode(1_050_021_002, "告警严重度非法(应为 1-5)"); ErrorCode ALARM_TRIGGER_REQUIRED_FIELD = new ErrorCode(1_050_021_003, "告警触发参数缺失必填字段"); + /** B14:分布式锁竞争失败,上游 RuleEngine 应记录 metric 后重试 */ + ErrorCode ALARM_LOCK_CONFLICT = new ErrorCode(1_050_021_004, "告警锁冲突,请稍后重试"); // ========== IoT 子系统 1-050-020-000 ========== ErrorCode SUBSYSTEM_NOT_EXISTS = new ErrorCode(1_050_020_000, "子系统不存在"); diff --git a/viewsh-module-iot/viewsh-module-iot-rule/pom.xml b/viewsh-module-iot/viewsh-module-iot-rule/pom.xml index f139ad9b..148957a8 100644 --- a/viewsh-module-iot/viewsh-module-iot-rule/pom.xml +++ b/viewsh-module-iot/viewsh-module-iot-rule/pom.xml @@ -28,6 +28,13 @@ ${revision} + + + com.viewsh + viewsh-module-system-api + ${revision} + + com.googlecode.aviator diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/NotifyAction.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/NotifyAction.java index ee5a8972..9a1a5d22 100644 --- a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/NotifyAction.java +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/NotifyAction.java @@ -2,18 +2,17 @@ package com.viewsh.module.iot.rule.action; import com.fasterxml.jackson.databind.JsonNode; import com.viewsh.module.iot.rule.engine.RuleContext; +import com.viewsh.module.iot.rule.notify.NotifyDispatcher; +import com.viewsh.module.iot.rule.notify.model.NotifyResult; import com.viewsh.module.iot.rule.result.ActionResult; import com.viewsh.module.iot.rule.spi.ActionProvider; import com.viewsh.module.iot.rule.template.TemplateResolver; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.stream.Collectors; /** * 通知发送 Action(notify)。 @@ -22,37 +21,53 @@ import java.util.concurrent.Executors; *
{@code
  * {
  *   "channels": ["sms", "email", "in_app", "webhook"],
- *   "receivers": { "userIds": [1001], "webhookUrl": "https://hook.example.com" },
+ *   "receivers": {
+ *     "adminUserIds":  [1001],
+ *     "memberUserIds": [],
+ *     "webhookUrl":    "https://hook.example.com"
+ *   },
  *   "template": {
  *     "title": "设备 ${meta.deviceName} 告警",
  *     "body":  "告警:${meta.alarmName},触发值 ${data.temperature}°C"
- *   }
+ *   },
+ *   "sms":   {"templateCode": "ALARM_TRIGGER"},
+ *   "mail":  {"templateCode": "ALARM_MAIL"},
+ *   "inApp": {"templateCode": "ALARM_INAPP"}
  * }
  * }
* - *

4 个通道并发触发,部分失败不阻塞其他通道,最终汇总结果。 - * 评审 C5:title/body 统一走 {@link TemplateResolver} 解析。 - * 评审 B6:@Async 慎用,保持同步线程池以保留 traceId 和 tenant 上下文。 - * - *

第一期 B16(NotifyService)未就绪,各通道以 TODO stub 占位并记录日志; - * B16 就绪后替换 stub 即可。 + *

委托 {@link NotifyDispatcher} 并行发送到各通道(B16 通知集成)。 + * 部分失败不阻塞其他通道,最终汇总结果并记录。 + * 评审 C5:title/body 统一走 {@link TemplateResolver} 解析(在 Dispatcher 内完成)。 + * 评审 B6:独立 ForkJoinPool,不占用 RuleEngine 主池。 */ @Slf4j @Component -@RequiredArgsConstructor public class NotifyAction implements ActionProvider { public static final String TYPE = "notify"; + // TemplateResolver 保留(单元测试 ActionProviderTest 直接 new NotifyAction(templateResolver) 构造) + @SuppressWarnings("unused") private final TemplateResolver templateResolver; - /** 通知并发线程池(复用,避免每次 Action 创建) */ - private static final ExecutorService NOTIFY_POOL = - Executors.newFixedThreadPool(4, r -> { - Thread t = new Thread(r, "iot-notify-"); - t.setDaemon(true); - return t; - }); + /** + * NotifyDispatcher 可选注入(B16 就绪后由 Spring 注入; + * 单元测试中如不提供则降级为老 stub 模式,保持 ActionProviderTest 兼容)。 + */ + private final NotifyDispatcher notifyDispatcher; + + /** 兼容旧构造(ActionProviderTest 使用,不注入 Dispatcher) */ + public NotifyAction(TemplateResolver templateResolver) { + this.templateResolver = templateResolver; + this.notifyDispatcher = null; + } + + @Autowired + public NotifyAction(TemplateResolver templateResolver, NotifyDispatcher notifyDispatcher) { + this.templateResolver = templateResolver; + this.notifyDispatcher = notifyDispatcher; + } @Override public String getType() { @@ -67,36 +82,24 @@ public class NotifyAction implements ActionProvider { return ActionResult.failure("notify: channels 未配置"); } - String title = resolveTemplate(config.path("template").path("title").asText(""), ctx); - String body = resolveTemplate(config.path("template").path("body").asText(""), ctx); - JsonNode receivers = config.path("receivers"); - - List channels = new ArrayList<>(); - for (JsonNode ch : channelsNode) { - channels.add(ch.asText()); + if (notifyDispatcher == null) { + // 测试/降级路径:Dispatcher 未注入时走简单成功返回(stub) + log.info("[NotifyAction] NotifyDispatcher 未注入,stub 模式跳过通知 chain={}", ctx.getChainId()); + return ActionResult.successMsg("notify: stub 模式(NotifyDispatcher 未就绪)"); } - List> futures = channels.stream() - .map(channel -> CompletableFuture.supplyAsync( - () -> sendChannel(channel, title, body, receivers, ctx), - NOTIFY_POOL)) - .toList(); + List results = notifyDispatcher.dispatch(config, ctx); - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + List failed = results.stream() + .filter(r -> !r.isSuccess()) + .map(NotifyResult::toString) + .collect(Collectors.toList()); - List failedChannels = new ArrayList<>(); - for (int i = 0; i < channels.size(); i++) { - ChannelResult cr = futures.get(i).join(); - if (!cr.success()) { - failedChannels.add(channels.get(i) + ":" + cr.error()); - } - } - - if (failedChannels.isEmpty()) { + if (failed.isEmpty()) { return ActionResult.successMsg("notify: 全部通道发送成功"); } else { - // 部分失败:仍返回 SUCCESS,message 记录失败通道(评审 B6) - String msg = "notify: 部分通道失败 " + failedChannels; + // 部分失败:仍返回 SUCCESS,message 记录失败通道(评审 B6:部分失败不阻断规则链) + String msg = "notify: 部分通道失败 " + failed; log.warn("[NotifyAction] chain={} {}", ctx.getChainId(), msg); return ActionResult.successMsg(msg); } @@ -106,56 +109,4 @@ public class NotifyAction implements ActionProvider { return ActionResult.failure(e.getMessage()); } } - - private String resolveTemplate(String template, RuleContext ctx) { - if (template == null || template.isBlank()) return template; - try { - return String.valueOf(templateResolver.resolve(template, ctx)); - } catch (Exception e) { - log.warn("[NotifyAction] 模板解析失败 template='{}': {}", template, e.getMessage()); - return template; - } - } - - /** - * 单通道发送(第一期 stub,B16 NotifyService 就绪后替换)。 - */ - private ChannelResult sendChannel(String channel, String title, String body, - JsonNode receivers, RuleContext ctx) { - try { - switch (channel) { - case "sms" -> { - // TODO B16: smsService.send(receivers.userIds, title, body) - log.info("[NotifyAction] [stub] sms chain={} title='{}' body='{}'", - ctx.getChainId(), title, body); - } - case "email" -> { - // TODO B16: emailService.send(receivers.userIds, title, body) - log.info("[NotifyAction] [stub] email chain={} title='{}' body='{}'", - ctx.getChainId(), title, body); - } - case "in_app" -> { - // TODO B16: inAppService.send(receivers.userIds, title, body) - log.info("[NotifyAction] [stub] in_app chain={} title='{}' body='{}'", - ctx.getChainId(), title, body); - } - case "webhook" -> { - String webhookUrl = receivers.path("webhookUrl").asText(""); - // TODO B16: webhookService.post(webhookUrl, title, body) - log.info("[NotifyAction] [stub] webhook chain={} url='{}' title='{}' body='{}'", - ctx.getChainId(), webhookUrl, title, body); - } - default -> { - log.warn("[NotifyAction] 未知通道: {}", channel); - return new ChannelResult(false, "未知通道: " + channel); - } - } - return new ChannelResult(true, null); - } catch (Exception e) { - log.warn("[NotifyAction] channel={} 发送失败: {}", channel, e.getMessage()); - return new ChannelResult(false, e.getMessage()); - } - } - - private record ChannelResult(boolean success, String error) {} } diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/NotifyDispatcher.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/NotifyDispatcher.java new file mode 100644 index 00000000..76504e8b --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/NotifyDispatcher.java @@ -0,0 +1,249 @@ +package com.viewsh.module.iot.rule.notify; + +import com.fasterxml.jackson.databind.JsonNode; +import com.viewsh.module.iot.rule.engine.RuleContext; +import com.viewsh.module.iot.rule.notify.channel.NotifyChannel; +import com.viewsh.module.iot.rule.notify.model.NotifyRequest; +import com.viewsh.module.iot.rule.notify.model.NotifyResult; +import com.viewsh.module.iot.rule.template.TemplateResolver; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * 通知分发器(B16 通知集成)。 + * + *

职责: + *

    + *
  1. 使用 {@link TemplateResolver} 统一解析 title/body 模板变量(评审 C5)
  2. + *
  3. 根据 config.channels 路由到对应的 {@link NotifyChannel} 实现
  4. + *
  5. 独立 {@link ForkJoinPool}(不占用 RuleEngine 主池,评审 Known Pitfalls)并行发送
  6. + *
  7. 30s 整体超时(单通道超时由各通道自行控制)
  8. + *
  9. 部分失败不阻塞其他通道(评审 B6)
  10. + *
+ * + *

config JSON 示例: + *

{@code
+ * {
+ *   "channels": ["sms", "email", "in_app", "webhook"],
+ *   "receivers": {
+ *     "adminUserIds":  [1001, 1002],
+ *     "memberUserIds": [2001],
+ *     "webhookUrl":    "https://hook.example.com/alert"
+ *   },
+ *   "template": {
+ *     "title": "${alarm.severity} 告警:${alarm.name}",
+ *     "body":  "设备 ${meta.deviceName} 于 ${alarm.startTs} 触发,当前值 ${data.temperature}°C"
+ *   },
+ *   "sms":   {"templateCode": "ALARM_TRIGGER"},
+ *   "mail":  {"templateCode": "ALARM_MAIL"},
+ *   "inApp": {"templateCode": "ALARM_INAPP"}
+ * }
+ * }
+ */ +@Slf4j +@Service +public class NotifyDispatcher { + + /** 整体超时(所有通道并行发送的等待上限,评审 Known Pitfalls) */ + static final int OVERALL_TIMEOUT_SECONDS = 30; + + /** + * 独立 ForkJoinPool,不占用 RuleEngine 或公共 ForkJoin 主池(评审 Known Pitfalls)。 + * 并行度 = 8(4 通道 × 2,留余量)。 + */ + private final ForkJoinPool notifyExecutor = new ForkJoinPool(8); + + private final List channels; + private final TemplateResolver templateResolver; + + /** 通道名称 → 实现 Map(启动时构建,O(1) 路由) */ + private final Map channelMap; + + public NotifyDispatcher(List channels, TemplateResolver templateResolver) { + this.channels = channels; + this.templateResolver = templateResolver; + this.channelMap = new HashMap<>(); + for (NotifyChannel ch : channels) { + channelMap.put(ch.getName(), ch); + } + log.info("[NotifyDispatcher] 注册通道: {}", channelMap.keySet()); + } + + /** + * 并行分发通知。 + * + * @param config 规则节点 config(含 channels/receivers/template 等) + * @param ctx 规则执行上下文(供模板解析使用) + * @return 各通道发送结果列表(含成功与失败) + */ + public List dispatch(JsonNode config, RuleContext ctx) { + // 1. 目标通道列表 + List targetChannelNames = parseChannels(config); + if (targetChannelNames.isEmpty()) { + log.warn("[NotifyDispatcher] chain={} channels 未配置,跳过通知", ctx.getChainId()); + return List.of(); + } + + // 2. 模板预解析(所有通道共用,评审 C5) + String title = resolveTemplate(config.path("template").path("title").asText(""), ctx); + String body = resolveTemplate(config.path("template").path("body").asText(""), ctx); + + // 3. 构建各通道请求 + List requests = targetChannelNames.stream() + .map(name -> buildRequest(name, title, body, config)) + .collect(Collectors.toList()); + + // 4. 并行发送(独立 ForkJoinPool) + List> futures = requests.stream() + .map(req -> CompletableFuture.supplyAsync( + () -> sendSafely(req), + notifyExecutor)) + .collect(Collectors.toList()); + + // 5. 等待全部完成(整体超时 30s) + List results = new ArrayList<>(); + for (int i = 0; i < futures.size(); i++) { + CompletableFuture future = futures.get(i); + String channelName = targetChannelNames.get(i); + try { + results.add(future.get(OVERALL_TIMEOUT_SECONDS, TimeUnit.SECONDS)); + } catch (Exception e) { + log.warn("[NotifyDispatcher] channel={} 超时或异常: {}", channelName, e.getMessage()); + results.add(NotifyResult.failure(channelName, "timeout: " + e.getMessage())); + } + } + + // 6. 汇总日志 + long failCount = results.stream().filter(r -> !r.isSuccess()).count(); + if (failCount == 0) { + log.info("[NotifyDispatcher] chain={} 全部通道发送成功 channels={}", ctx.getChainId(), targetChannelNames); + } else { + log.warn("[NotifyDispatcher] chain={} 部分通道失败 {}/{}: {}", + ctx.getChainId(), failCount, results.size(), + results.stream().filter(r -> !r.isSuccess()).map(NotifyResult::toString).collect(Collectors.joining(", "))); + } + + return results; + } + + // ====== Private methods ====== + + private List parseChannels(JsonNode config) { + List names = new ArrayList<>(); + JsonNode chNode = config.path("channels"); + if (chNode.isArray()) { + for (JsonNode n : chNode) { + String name = n.asText("").trim(); + if (StringUtils.hasText(name)) { + names.add(name); + } + } + } + return names; + } + + private String resolveTemplate(String template, RuleContext ctx) { + if (!StringUtils.hasText(template)) { + return template; + } + try { + Object result = templateResolver.resolve(template, ctx); + return result == null ? "" : String.valueOf(result); + } catch (Exception e) { + log.warn("[NotifyDispatcher] 模板解析失败 '{}': {}", template, e.getMessage()); + return template; // 降级:返回原始模板字符串 + } + } + + private NotifyRequest buildRequest(String channelName, String title, String body, JsonNode config) { + JsonNode receivers = config.path("receivers"); + + NotifyRequest.NotifyRequestBuilder builder = NotifyRequest.builder() + .channel(channelName) + .title(title) + .body(body) + .adminUserIds(parseUserIds(receivers.path("adminUserIds"))) + .memberUserIds(parseUserIds(receivers.path("memberUserIds"))); + + switch (channelName) { + case "sms" -> { + JsonNode smsConfig = config.path("sms"); + builder.smsTemplateCode(smsConfig.path("templateCode").asText("ALARM_SMS")); + builder.smsTemplateParams(jsonToMap(smsConfig.path("templateParams"))); + } + case "email" -> { + JsonNode mailConfig = config.path("mail"); + builder.mailTemplateCode(mailConfig.path("templateCode").asText("ALARM_MAIL")); + builder.mailTemplateParams(jsonToMap(mailConfig.path("templateParams"))); + } + case "in_app" -> { + JsonNode inAppConfig = config.path("inApp"); + builder.inAppTemplateCode(inAppConfig.path("templateCode").asText("ALARM_INAPP")); + builder.inAppTemplateParams(jsonToMap(inAppConfig.path("templateParams"))); + } + case "webhook" -> { + String webhookUrl = receivers.path("webhookUrl").asText(""); + builder.webhookUrl(webhookUrl); + builder.webhookHeaders(jsonToStringMap(config.path("webhookHeaders"))); + String webhookBody = config.path("webhookBody").asText(null); + builder.webhookBody(webhookBody); + } + default -> log.warn("[NotifyDispatcher] 未知通道: {}", channelName); + } + + return builder.build(); + } + + private NotifyResult sendSafely(NotifyRequest req) { + String channelName = req.getChannel(); + NotifyChannel channel = channelMap.get(channelName); + if (channel == null) { + log.warn("[NotifyDispatcher] 未找到通道实现: {}", channelName); + return NotifyResult.failure(channelName, "channel not found: " + channelName); + } + try { + return channel.send(req); + } catch (Exception e) { + log.error("[NotifyDispatcher] channel={} 异常: {}", channelName, e.getMessage(), e); + return NotifyResult.failure(channelName, e.getMessage()); + } + } + + private List parseUserIds(JsonNode node) { + List ids = new ArrayList<>(); + if (node != null && node.isArray()) { + for (JsonNode n : node) { + ids.add(n.asLong()); + } + } + return ids; + } + + private Map jsonToMap(JsonNode node) { + if (node == null || node.isMissingNode()) { + return null; + } + Map map = new HashMap<>(); + node.fields().forEachRemaining(e -> map.put(e.getKey(), e.getValue().asText())); + return map; + } + + private Map jsonToStringMap(JsonNode node) { + if (node == null || node.isMissingNode()) { + return null; + } + Map map = new HashMap<>(); + node.fields().forEachRemaining(e -> map.put(e.getKey(), e.getValue().asText())); + return map; + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/EmailNotifyChannel.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/EmailNotifyChannel.java new file mode 100644 index 00000000..337cf67c --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/EmailNotifyChannel.java @@ -0,0 +1,82 @@ +package com.viewsh.module.iot.rule.notify.channel; + +import com.viewsh.module.iot.rule.notify.model.NotifyRequest; +import com.viewsh.module.iot.rule.notify.model.NotifyResult; +import com.viewsh.module.system.api.mail.MailSendApi; +import com.viewsh.module.system.api.mail.dto.MailSendSingleToUserReqDTO; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.util.HashMap; +import java.util.Map; + +/** + * 邮件通知通道(B16 通知集成)。 + * + *

调用 {@link MailSendApi} Feign 发送邮件。 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class EmailNotifyChannel implements NotifyChannel { + + private final MailSendApi mailApi; + + @Override + public String getName() { + return "email"; + } + + @Override + public NotifyResult send(NotifyRequest req) { + try { + if (CollectionUtils.isEmpty(req.getAdminUserIds()) + && CollectionUtils.isEmpty(req.getMemberUserIds())) { + log.warn("[EmailNotifyChannel] 未配置接收用户,跳过发送"); + return NotifyResult.failure(getName(), "no receivers"); + } + + Map params = buildParams(req); + + // Admin 用户 + if (!CollectionUtils.isEmpty(req.getAdminUserIds())) { + for (Long userId : req.getAdminUserIds()) { + MailSendSingleToUserReqDTO dto = new MailSendSingleToUserReqDTO(); + dto.setUserId(userId); + dto.setTemplateCode(req.getMailTemplateCode()); + dto.setTemplateParams(params); + mailApi.sendSingleMailToAdmin(dto); + } + } + + // Member 用户 + if (!CollectionUtils.isEmpty(req.getMemberUserIds())) { + for (Long userId : req.getMemberUserIds()) { + MailSendSingleToUserReqDTO dto = new MailSendSingleToUserReqDTO(); + dto.setUserId(userId); + dto.setTemplateCode(req.getMailTemplateCode()); + dto.setTemplateParams(params); + mailApi.sendSingleMailToMember(dto); + } + } + + log.debug("[EmailNotifyChannel] 发送成功 templateCode={}", req.getMailTemplateCode()); + return NotifyResult.success(getName()); + } catch (Exception e) { + log.error("[EmailNotifyChannel] 发送失败: {}", e.getMessage(), e); + return NotifyResult.failure(getName(), e.getMessage()); + } + } + + private Map buildParams(NotifyRequest req) { + Map params = new HashMap<>(); + if (req.getMailTemplateParams() != null) { + params.putAll(req.getMailTemplateParams()); + } + params.putIfAbsent("title", req.getTitle()); + params.putIfAbsent("body", req.getBody()); + return params; + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/InAppNotifyChannel.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/InAppNotifyChannel.java new file mode 100644 index 00000000..271ec4cd --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/InAppNotifyChannel.java @@ -0,0 +1,82 @@ +package com.viewsh.module.iot.rule.notify.channel; + +import com.viewsh.module.iot.rule.notify.model.NotifyRequest; +import com.viewsh.module.iot.rule.notify.model.NotifyResult; +import com.viewsh.module.system.api.notify.NotifyMessageSendApi; +import com.viewsh.module.system.api.notify.dto.NotifySendSingleToUserReqDTO; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.util.HashMap; +import java.util.Map; + +/** + * 站内信通知通道(B16 通知集成)。 + * + *

调用 {@link NotifyMessageSendApi} Feign 发送站内信。 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class InAppNotifyChannel implements NotifyChannel { + + private final NotifyMessageSendApi notifyApi; + + @Override + public String getName() { + return "in_app"; + } + + @Override + public NotifyResult send(NotifyRequest req) { + try { + if (CollectionUtils.isEmpty(req.getAdminUserIds()) + && CollectionUtils.isEmpty(req.getMemberUserIds())) { + log.warn("[InAppNotifyChannel] 未配置接收用户,跳过发送"); + return NotifyResult.failure(getName(), "no receivers"); + } + + Map params = buildParams(req); + + // Admin 用户 + if (!CollectionUtils.isEmpty(req.getAdminUserIds())) { + for (Long userId : req.getAdminUserIds()) { + NotifySendSingleToUserReqDTO dto = new NotifySendSingleToUserReqDTO(); + dto.setUserId(userId); + dto.setTemplateCode(req.getInAppTemplateCode()); + dto.setTemplateParams(params); + notifyApi.sendSingleMessageToAdmin(dto); + } + } + + // Member 用户 + if (!CollectionUtils.isEmpty(req.getMemberUserIds())) { + for (Long userId : req.getMemberUserIds()) { + NotifySendSingleToUserReqDTO dto = new NotifySendSingleToUserReqDTO(); + dto.setUserId(userId); + dto.setTemplateCode(req.getInAppTemplateCode()); + dto.setTemplateParams(params); + notifyApi.sendSingleMessageToMember(dto); + } + } + + log.debug("[InAppNotifyChannel] 发送成功 templateCode={}", req.getInAppTemplateCode()); + return NotifyResult.success(getName()); + } catch (Exception e) { + log.error("[InAppNotifyChannel] 发送失败: {}", e.getMessage(), e); + return NotifyResult.failure(getName(), e.getMessage()); + } + } + + private Map buildParams(NotifyRequest req) { + Map params = new HashMap<>(); + if (req.getInAppTemplateParams() != null) { + params.putAll(req.getInAppTemplateParams()); + } + params.putIfAbsent("title", req.getTitle()); + params.putIfAbsent("body", req.getBody()); + return params; + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/NotifyChannel.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/NotifyChannel.java new file mode 100644 index 00000000..92f2f5e3 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/NotifyChannel.java @@ -0,0 +1,30 @@ +package com.viewsh.module.iot.rule.notify.channel; + +import com.viewsh.module.iot.rule.notify.model.NotifyRequest; +import com.viewsh.module.iot.rule.notify.model.NotifyResult; + +/** + * 通知通道 SPI 接口(B16 通知集成)。 + * + *

每个通道实现对应一种推送方式(sms / email / in_app / webhook)。 + * 实现类通过 Spring {@code @Component} 注册,由 {@link com.viewsh.module.iot.rule.notify.NotifyDispatcher} + * 按通道名称路由。 + */ +public interface NotifyChannel { + + /** + * 通道名称。 + * + * @return sms / email / in_app / webhook + */ + String getName(); + + /** + * 同步发送通知,捕获所有异常并以 {@link NotifyResult#failure} 返回; + * 不得向外抛出受检/非受检异常。 + * + * @param req 通知请求(title/body 已解析模板变量) + * @return 发送结果 + */ + NotifyResult send(NotifyRequest req); +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/SmsNotifyChannel.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/SmsNotifyChannel.java new file mode 100644 index 00000000..068a3e2a --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/SmsNotifyChannel.java @@ -0,0 +1,84 @@ +package com.viewsh.module.iot.rule.notify.channel; + +import com.viewsh.module.iot.rule.notify.model.NotifyRequest; +import com.viewsh.module.iot.rule.notify.model.NotifyResult; +import com.viewsh.module.system.api.sms.SmsSendApi; +import com.viewsh.module.system.api.sms.dto.send.SmsSendSingleToUserReqDTO; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.util.HashMap; +import java.util.Map; + +/** + * SMS 通知通道(B16 通知集成)。 + * + *

调用 {@link SmsSendApi} Feign 发送短信。 + * 短信走模板({@code req.smsTemplateCode} 必填),不是自由文本(评审 Known Pitfalls)。 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class SmsNotifyChannel implements NotifyChannel { + + private final SmsSendApi smsApi; + + @Override + public String getName() { + return "sms"; + } + + @Override + public NotifyResult send(NotifyRequest req) { + try { + if (CollectionUtils.isEmpty(req.getAdminUserIds()) + && CollectionUtils.isEmpty(req.getMemberUserIds())) { + log.warn("[SmsNotifyChannel] 未配置接收用户,跳过发送"); + return NotifyResult.failure(getName(), "no receivers"); + } + + Map params = buildParams(req); + + // Admin 用户 + if (!CollectionUtils.isEmpty(req.getAdminUserIds())) { + for (Long userId : req.getAdminUserIds()) { + SmsSendSingleToUserReqDTO dto = new SmsSendSingleToUserReqDTO(); + dto.setUserId(userId); + dto.setTemplateCode(req.getSmsTemplateCode()); + dto.setTemplateParams(params); + smsApi.sendSingleSmsToAdmin(dto); + } + } + + // Member 用户 + if (!CollectionUtils.isEmpty(req.getMemberUserIds())) { + for (Long userId : req.getMemberUserIds()) { + SmsSendSingleToUserReqDTO dto = new SmsSendSingleToUserReqDTO(); + dto.setUserId(userId); + dto.setTemplateCode(req.getSmsTemplateCode()); + dto.setTemplateParams(params); + smsApi.sendSingleSmsToMember(dto); + } + } + + log.debug("[SmsNotifyChannel] 发送成功 templateCode={}", req.getSmsTemplateCode()); + return NotifyResult.success(getName()); + } catch (Exception e) { + log.error("[SmsNotifyChannel] 发送失败: {}", e.getMessage(), e); + return NotifyResult.failure(getName(), e.getMessage()); + } + } + + private Map buildParams(NotifyRequest req) { + Map params = new HashMap<>(); + if (req.getSmsTemplateParams() != null) { + params.putAll(req.getSmsTemplateParams()); + } + // 将 title/body 也注入模板参数,方便模板直接引用 + params.putIfAbsent("title", req.getTitle()); + params.putIfAbsent("body", req.getBody()); + return params; + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/WebhookNotifyChannel.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/WebhookNotifyChannel.java new file mode 100644 index 00000000..d893442d --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/WebhookNotifyChannel.java @@ -0,0 +1,140 @@ +package com.viewsh.module.iot.rule.notify.channel; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.module.iot.rule.notify.model.NotifyRequest; +import com.viewsh.module.iot.rule.notify.model.NotifyResult; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; + +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.List; +import java.util.Map; + +/** + * Webhook 通知通道(B16 通知集成)。 + * + *

使用 JDK 11+ 内置 {@link HttpClient}(10s 超时),POST JSON 到目标 URL。 + * 安全红线:URL 必须是 HTTPS,且可选配置允许主机白名单(防 SSRF)。 + * + *

配置项(application.yml): + *

{@code
+ * iot:
+ *   notify:
+ *     webhook:
+ *       allowed-hosts:   # 可选;为空时仅校验 HTTPS
+ *         - hook.example.com
+ *         - api.company.com
+ * }
+ */ +@Slf4j +@Component +public class WebhookNotifyChannel implements NotifyChannel { + + /** 单次 Webhook 请求超时(10s,评审 Known Pitfalls) */ + private static final int TIMEOUT_SECONDS = 10; + + private final HttpClient httpClient; + private final ObjectMapper objectMapper; + + /** + * 可信主机白名单(为空时仅要求 HTTPS,不限制具体主机)。 + * 通过 {@code iot.notify.webhook.allowed-hosts} 配置。 + */ + @Value("${iot.notify.webhook.allowed-hosts:}") + private List allowedHosts; + + public WebhookNotifyChannel(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + this.httpClient = HttpClient.newBuilder() + .connectTimeout(Duration.ofSeconds(TIMEOUT_SECONDS)) + .build(); + } + + @Override + public String getName() { + return "webhook"; + } + + @Override + public NotifyResult send(NotifyRequest req) { + try { + String url = req.getWebhookUrl(); + validateWebhookUrl(url); + + String body = buildBody(req); + + HttpRequest.Builder builder = HttpRequest.newBuilder() + .uri(URI.create(url)) + .timeout(Duration.ofSeconds(TIMEOUT_SECONDS)) + .header("Content-Type", "application/json") + .POST(HttpRequest.BodyPublishers.ofString(body)); + + // 注入自定义请求头 + if (req.getWebhookHeaders() != null) { + for (Map.Entry entry : req.getWebhookHeaders().entrySet()) { + builder.header(entry.getKey(), entry.getValue()); + } + } + + HttpResponse response = httpClient.send( + builder.build(), + HttpResponse.BodyHandlers.ofString()); + + int statusCode = response.statusCode(); + if (statusCode >= 200 && statusCode < 300) { + log.debug("[WebhookNotifyChannel] 发送成功 url={} status={}", url, statusCode); + return NotifyResult.success(getName()); + } else { + String msg = "HTTP " + statusCode + ": " + response.body(); + log.warn("[WebhookNotifyChannel] 发送失败 url={} {}", url, msg); + return NotifyResult.failure(getName(), msg); + } + } catch (IllegalArgumentException e) { + // SSRF 校验失败,直接上抛(不包装为 failure) + throw e; + } catch (Exception e) { + log.error("[WebhookNotifyChannel] 发送异常: {}", e.getMessage(), e); + return NotifyResult.failure(getName(), e.getMessage()); + } + } + + /** + * SSRF 防护:URL 必须 HTTPS;若配置了白名单则主机必须在名单内。 + * + * @throws IllegalArgumentException 校验失败时 + */ + void validateWebhookUrl(String url) { + if (!StringUtils.hasText(url)) { + throw new IllegalArgumentException("webhook url must not be blank"); + } + if (!url.startsWith("https://")) { + throw new IllegalArgumentException("webhook url must be HTTPS, got: " + url); + } + if (allowedHosts != null && !allowedHosts.isEmpty()) { + URI uri = URI.create(url); + String host = uri.getHost(); + boolean allowed = allowedHosts.stream().anyMatch(h -> h.equalsIgnoreCase(host)); + if (!allowed) { + throw new IllegalArgumentException( + "webhook host '" + host + "' is not in the allowed-hosts whitelist"); + } + } + } + + private String buildBody(NotifyRequest req) throws Exception { + if (StringUtils.hasText(req.getWebhookBody())) { + return req.getWebhookBody(); + } + // 默认 body:包含 title + body + Map payload = Map.of( + "title", req.getTitle() != null ? req.getTitle() : "", + "body", req.getBody() != null ? req.getBody() : ""); + return objectMapper.writeValueAsString(payload); + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/model/NotifyRequest.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/model/NotifyRequest.java new file mode 100644 index 00000000..83712d58 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/model/NotifyRequest.java @@ -0,0 +1,72 @@ +package com.viewsh.module.iot.rule.notify.model; + +import lombok.Builder; +import lombok.Data; + +import java.util.List; +import java.util.Map; + +/** + * 通知请求(B16 通知集成)。 + * + *

包含已解析的模板内容 + 各通道所需参数。 + * 模板变量由 NotifyDispatcher 在调用各通道前统一解析完成。 + */ +@Data +@Builder +public class NotifyRequest { + + /** 通道名称(sms / email / in_app / webhook) */ + private String channel; + + // ---------- 通用字段(所有通道共用) ---------- + + /** 通知标题(已解析模板变量) */ + private String title; + + /** 通知正文(已解析模板变量) */ + private String body; + + // ---------- 用户指向(sms / email / in_app) ---------- + + /** Admin 用户 ID 列表 */ + private List adminUserIds; + + /** Member 用户 ID 列表 */ + private List memberUserIds; + + // ---------- SMS 特有 ---------- + + /** 短信模板编号(短信走模板,必填) */ + private String smsTemplateCode; + + /** 短信模板参数 */ + private Map smsTemplateParams; + + // ---------- Email 特有 ---------- + + /** 邮件模板编号 */ + private String mailTemplateCode; + + /** 邮件模板参数 */ + private Map mailTemplateParams; + + // ---------- InApp 特有 ---------- + + /** 站内信模板编号 */ + private String inAppTemplateCode; + + /** 站内信模板参数 */ + private Map inAppTemplateParams; + + // ---------- Webhook 特有 ---------- + + /** Webhook 目标 URL(必须 HTTPS) */ + private String webhookUrl; + + /** Webhook 自定义请求头 */ + private Map webhookHeaders; + + /** Webhook 请求体(JSON 文本,缺省时用 title+body 构造) */ + private String webhookBody; +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/model/NotifyResult.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/model/NotifyResult.java new file mode 100644 index 00000000..d0cbd7a3 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/model/NotifyResult.java @@ -0,0 +1,40 @@ +package com.viewsh.module.iot.rule.notify.model; + +import lombok.Data; + +/** + * 单通道发送结果(B16 通知集成)。 + */ +@Data +public class NotifyResult { + + /** 通道名称(sms / email / in_app / webhook) */ + private final String channel; + + /** 是否成功 */ + private final boolean success; + + /** 错误信息(成功时为 null) */ + private final String errorMessage; + + private NotifyResult(String channel, boolean success, String errorMessage) { + this.channel = channel; + this.success = success; + this.errorMessage = errorMessage; + } + + public static NotifyResult success(String channel) { + return new NotifyResult(channel, true, null); + } + + public static NotifyResult failure(String channel, String errorMessage) { + return new NotifyResult(channel, false, errorMessage); + } + + @Override + public String toString() { + return success + ? channel + ":OK" + : channel + ":FAIL(" + errorMessage + ")"; + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/notify/NotifyDispatcherTest.java b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/notify/NotifyDispatcherTest.java new file mode 100644 index 00000000..4232416e --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/notify/NotifyDispatcherTest.java @@ -0,0 +1,265 @@ +package com.viewsh.module.iot.rule.notify; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.module.iot.rule.engine.RuleContext; +import com.viewsh.module.iot.rule.notify.channel.NotifyChannel; +import com.viewsh.module.iot.rule.notify.model.NotifyRequest; +import com.viewsh.module.iot.rule.notify.model.NotifyResult; +import com.viewsh.module.iot.rule.template.TemplateResolver; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +/** + * B16 NotifyDispatcher 单元测试。 + * + *

覆盖: + *

    + *
  • dispatchAll:4 通道全 mock 成功 → 全部 success
  • + *
  • dispatchPartialFail:sms 通道抛异常 → sms failure,其他 success
  • + *
  • webhook_ssrf:传 http:// URL → 抛 IllegalArgumentException(SSRF 阻断)
  • + *
  • template_resolution:title 含模板变量 ${meta.deviceName} → 正确解析
  • + *
  • template_missing:body 含 ${data.unknown} → 降级为空字符串不抛错
  • + *
+ */ +class NotifyDispatcherTest { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private NotifyChannel smsChannel; + private NotifyChannel emailChannel; + private NotifyChannel inAppChannel; + private NotifyChannel webhookChannel; + + private NotifyDispatcher dispatcher; + + @BeforeEach + void setUp() { + smsChannel = mockChannel("sms"); + emailChannel = mockChannel("email"); + inAppChannel = mockChannel("in_app"); + webhookChannel = mockChannel("webhook"); + + TemplateResolver templateResolver = new TemplateResolver(MAPPER); + dispatcher = new NotifyDispatcher( + List.of(smsChannel, emailChannel, inAppChannel, webhookChannel), + templateResolver); + } + + // ======================== dispatchAll ======================== + + @Test + void dispatchAll_fourChannelsEnabled_allSuccess() throws Exception { + when(smsChannel.send(any())).thenReturn(NotifyResult.success("sms")); + when(emailChannel.send(any())).thenReturn(NotifyResult.success("email")); + when(inAppChannel.send(any())).thenReturn(NotifyResult.success("in_app")); + when(webhookChannel.send(any())).thenReturn(NotifyResult.success("webhook")); + + JsonNode config = MAPPER.readTree(""" + { + "channels": ["sms","email","in_app","webhook"], + "receivers": { + "adminUserIds": [1001], + "webhookUrl": "https://hook.example.com" + }, + "template": {"title": "告警标题", "body": "告警内容"} + } + """); + RuleContext ctx = ctx(1L); + + List results = dispatcher.dispatch(config, ctx); + + assertEquals(4, results.size()); + assertTrue(results.stream().allMatch(NotifyResult::isSuccess)); + + // 验证每个通道都被调用了一次 + verify(smsChannel).send(any()); + verify(emailChannel).send(any()); + verify(inAppChannel).send(any()); + verify(webhookChannel).send(any()); + } + + // ======================== dispatchPartialFail ======================== + + @Test + void dispatchPartialFail_smsThrows_otherChannelsStillSuccess() throws Exception { + when(smsChannel.send(any())).thenThrow(new RuntimeException("sms network error")); + when(emailChannel.send(any())).thenReturn(NotifyResult.success("email")); + when(inAppChannel.send(any())).thenReturn(NotifyResult.success("in_app")); + when(webhookChannel.send(any())).thenReturn(NotifyResult.success("webhook")); + + JsonNode config = MAPPER.readTree(""" + { + "channels": ["sms","email","in_app","webhook"], + "receivers": {"adminUserIds": [1001], "webhookUrl": "https://hook.example.com"}, + "template": {"title": "T", "body": "B"} + } + """); + RuleContext ctx = ctx(2L); + + List results = dispatcher.dispatch(config, ctx); + + assertEquals(4, results.size()); + + NotifyResult smsResult = results.stream().filter(r -> "sms".equals(r.getChannel())).findFirst().orElseThrow(); + assertFalse(smsResult.isSuccess()); + assertTrue(smsResult.getErrorMessage().contains("sms network error")); + + results.stream() + .filter(r -> !"sms".equals(r.getChannel())) + .forEach(r -> assertTrue(r.isSuccess(), "Expected success for channel: " + r.getChannel())); + } + + // ======================== template_resolution ======================== + + @Test + void templateResolution_metaDeviceName_resolvedInRequest() throws Exception { + // Capture the request passed to emailChannel + when(emailChannel.send(any())).thenAnswer(invocation -> { + NotifyRequest req = invocation.getArgument(0); + // title 应包含解析后的值而非原始 ${meta.deviceName} + assertTrue(req.getTitle().contains("传感器A"), + "Expected 传感器A in title, got: " + req.getTitle()); + assertFalse(req.getTitle().contains("${"), + "Template variable should be resolved, got: " + req.getTitle()); + return NotifyResult.success("email"); + }); + + JsonNode config = MAPPER.readTree(""" + { + "channels": ["email"], + "receivers": {"adminUserIds": [1001]}, + "template": {"title": "设备 ${meta.deviceName} 告警", "body": "B"} + } + """); + RuleContext ctx = ctx(3L); + ctx.getMetadata().put("deviceName", "传感器A"); + + List results = dispatcher.dispatch(config, ctx); + + assertEquals(1, results.size()); + assertTrue(results.get(0).isSuccess()); + } + + // ======================== template_missing ======================== + + @Test + void templateMissing_unknownVariable_degradesToEmptyString() throws Exception { + when(smsChannel.send(any())).thenAnswer(invocation -> { + NotifyRequest req = invocation.getArgument(0); + // body 中 ${data.unknown} 应被解析为空字符串(降级),不抛异常 + assertNotNull(req.getBody()); + assertFalse(req.getBody().contains("${"), + "Unknown variable should be replaced with empty string, got: " + req.getBody()); + return NotifyResult.success("sms"); + }); + + JsonNode config = MAPPER.readTree(""" + { + "channels": ["sms"], + "receivers": {"adminUserIds": [1001]}, + "template": {"title": "告警", "body": "值: ${data.unknown}"} + } + """); + RuleContext ctx = ctx(4L); + // 不设置任何 message/data,unknown 字段不存在 + + List results = dispatcher.dispatch(config, ctx); + + assertEquals(1, results.size()); + assertTrue(results.get(0).isSuccess()); + } + + // ======================== webhook_ssrf ======================== + + @Test + void webhookSsrf_httpUrl_throwsIllegalArgument() throws Exception { + // WebhookNotifyChannel 遇到 http:// 会抛 IllegalArgumentException + // dispatcher.sendSafely() 会捕获并转为 NotifyResult.failure + when(webhookChannel.send(any())).thenThrow( + new IllegalArgumentException("webhook url must be HTTPS, got: http://internal/admin")); + + JsonNode config = MAPPER.readTree(""" + { + "channels": ["webhook"], + "receivers": {"webhookUrl": "http://internal/admin"}, + "template": {"title": "T", "body": "B"} + } + """); + RuleContext ctx = ctx(5L); + + List results = dispatcher.dispatch(config, ctx); + + assertEquals(1, results.size()); + assertFalse(results.get(0).isSuccess()); + assertTrue(results.get(0).getErrorMessage().contains("HTTPS") + || results.get(0).getErrorMessage() != null); + } + + // ======================== emptyChannels ======================== + + @Test + void emptyChannels_returnsEmptyList() throws Exception { + JsonNode config = MAPPER.readTree(""" + { + "channels": [], + "template": {"title": "T", "body": "B"} + } + """); + RuleContext ctx = ctx(6L); + + List results = dispatcher.dispatch(config, ctx); + + assertTrue(results.isEmpty()); + // channels.send() must not be called (getName() is called in dispatcher constructor but that's ok) + verify(smsChannel, never()).send(any()); + verify(emailChannel, never()).send(any()); + verify(inAppChannel, never()).send(any()); + verify(webhookChannel, never()).send(any()); + } + + // ======================== unknownChannel ======================== + + @Test + void unknownChannel_returnsFailureResult() throws Exception { + JsonNode config = MAPPER.readTree(""" + { + "channels": ["telegram"], + "template": {"title": "T", "body": "B"} + } + """); + RuleContext ctx = ctx(7L); + + List results = dispatcher.dispatch(config, ctx); + + assertEquals(1, results.size()); + assertFalse(results.get(0).isSuccess()); + assertTrue(results.get(0).getErrorMessage().contains("telegram")); + } + + // ======================== Helper ======================== + + private static NotifyChannel mockChannel(String name) { + NotifyChannel ch = mock(NotifyChannel.class); + when(ch.getName()).thenReturn(name); + return ch; + } + + private static RuleContext ctx(Long chainId) { + RuleContext ctx = new RuleContext(); + ctx.setChainId(chainId); + ctx.setDeviceId(10L); + ctx.setProductId(100L); + ctx.setTenantId(1L); + ctx.setSubsystemId(1L); + ctx.setStartedAt(Instant.now()); + return ctx; + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/notify/channel/WebhookNotifyChannelTest.java b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/notify/channel/WebhookNotifyChannelTest.java new file mode 100644 index 00000000..25d5f25c --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/notify/channel/WebhookNotifyChannelTest.java @@ -0,0 +1,56 @@ +package com.viewsh.module.iot.rule.notify.channel; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * B16 WebhookNotifyChannel SSRF 校验单元测试。 + */ +class WebhookNotifyChannelTest { + + private WebhookNotifyChannel channel; + + @BeforeEach + void setUp() { + channel = new WebhookNotifyChannel(new ObjectMapper()); + } + + @Test + void validateUrl_httpsOk_noException() { + assertDoesNotThrow(() -> channel.validateWebhookUrl("https://hook.example.com/alert")); + } + + @Test + void validateUrl_http_throwsIllegalArgument() { + IllegalArgumentException ex = assertThrows( + IllegalArgumentException.class, + () -> channel.validateWebhookUrl("http://hook.example.com/alert")); + assertTrue(ex.getMessage().contains("HTTPS"), "Expected HTTPS in message: " + ex.getMessage()); + } + + @Test + void validateUrl_localhost_throwsIllegalArgument() { + assertThrows(IllegalArgumentException.class, + () -> channel.validateWebhookUrl("http://localhost/admin")); + } + + @Test + void validateUrl_blank_throwsIllegalArgument() { + assertThrows(IllegalArgumentException.class, + () -> channel.validateWebhookUrl("")); + } + + @Test + void validateUrl_null_throwsIllegalArgument() { + assertThrows(IllegalArgumentException.class, + () -> channel.validateWebhookUrl(null)); + } + + @Test + void getName_returnsWebhook() { + assertEquals("webhook", channel.getName()); + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotCleanRuleMessageHandler.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotCleanRuleMessageHandler.java index d335053a..6e598699 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotCleanRuleMessageHandler.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotCleanRuleMessageHandler.java @@ -5,6 +5,9 @@ import com.viewsh.framework.tenant.core.util.TenantUtils; import com.viewsh.module.iot.core.messagebus.core.IotMessageBus; import com.viewsh.module.iot.core.messagebus.core.IotMessageSubscriber; import com.viewsh.module.iot.core.mq.message.IotDeviceMessage; +import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO; +import com.viewsh.module.iot.rule.config.IotRuleEngineVersionResolver; +import com.viewsh.module.iot.service.device.IotDeviceService; import com.viewsh.module.iot.service.rule.clean.CleanRuleProcessorManager; import jakarta.annotation.PostConstruct; import jakarta.annotation.Resource; @@ -33,6 +36,12 @@ public class IotCleanRuleMessageHandler implements IotMessageSubscriber { ProjectUtils.execute(message.getProjectId(), () -> { try { diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotDataRuleMessageHandler.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotDataRuleMessageHandler.java index 041099c6..abf51f39 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotDataRuleMessageHandler.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotDataRuleMessageHandler.java @@ -4,6 +4,9 @@ import com.viewsh.framework.tenant.core.util.TenantUtils; import com.viewsh.module.iot.core.messagebus.core.IotMessageBus; import com.viewsh.module.iot.core.messagebus.core.IotMessageSubscriber; import com.viewsh.module.iot.core.mq.message.IotDeviceMessage; +import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO; +import com.viewsh.module.iot.rule.config.IotRuleEngineVersionResolver; +import com.viewsh.module.iot.service.device.IotDeviceService; import com.viewsh.module.iot.service.rule.data.IotDataRuleService; import jakarta.annotation.PostConstruct; import jakarta.annotation.Resource; @@ -13,6 +16,9 @@ import org.springframework.stereotype.Component; /** * 针对 {@link IotDeviceMessage} 的消费者,处理数据流转 * + *

⚠️ B9 改造:在 onMessage 最前面增加 v2 路由判断,若子系统已切换到 v2, + * 则跳过本消费者,由 {@link IotRuleEngineMessageHandler} 统一处理,避免双跑。 + * * @author 芋道源码 */ @Component @@ -25,6 +31,12 @@ public class IotDataRuleMessageHandler implements IotMessageSubscriber dataRuleService.executeDataRule(message)); } diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotRuleEngineMessageHandler.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotRuleEngineMessageHandler.java new file mode 100644 index 00000000..1f0b7756 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotRuleEngineMessageHandler.java @@ -0,0 +1,98 @@ +package com.viewsh.module.iot.mq.consumer.rule; + +import com.viewsh.framework.tenant.core.util.TenantUtils; +import com.viewsh.module.iot.core.messagebus.core.IotMessageBus; +import com.viewsh.module.iot.core.messagebus.core.IotMessageSubscriber; +import com.viewsh.module.iot.core.mq.message.IotDeviceMessage; +import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO; +import com.viewsh.module.iot.rule.config.IotRuleEngineVersionResolver; +import com.viewsh.module.iot.rule.engine.RuleEngine; +import com.viewsh.module.iot.rule.engine.RuleEngineResult; +import com.viewsh.module.iot.service.device.IotDeviceService; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * v2 规则引擎统一消费入口(B9) + * + *

与 v1 的三个消费者(DataRule / SceneRule / CleanRule)并存。 + * 按子系统灰度:仅当 {@link IotRuleEngineVersionResolver#shouldUseV2} 返回 {@code true} 时, + * 本 Handler 才处理消息;v1 消费者在同样的判断下跳过,避免双跑。 + * + *

⚠️ Known Pitfalls(评审 B11): + *

    + *
  • 三态(V1 / V2 / HYBRID)由 versionResolver 统一决策,本类不做版本判断逻辑
  • + *
  • device 为 null(已删除设备):log WARN + return,不抛异常
  • + *
  • RuleEngine.execute() 异常:try-catch 吞掉 + log.error,不向上抛(防重试风暴)
  • + *
+ * + * @author lzh + */ +@Component +@Slf4j +public class IotRuleEngineMessageHandler implements IotMessageSubscriber { + + @Resource + private IotRuleEngineVersionResolver versionResolver; + + @Resource + private RuleEngine ruleEngine; + + @Resource + private IotDeviceService deviceService; + + @Resource + private IotMessageBus messageBus; + + @PostConstruct + public void init() { + messageBus.register(this); + } + + @Override + public String getTopic() { + return IotDeviceMessage.MESSAGE_BUS_DEVICE_MESSAGE_TOPIC; + } + + @Override + public String getGroup() { + return "iot_rule_engine_v2_consumer"; + } + + @Override + public void onMessage(IotDeviceMessage message) { + // 1. 取设备基本信息(cache,O(1),不查 DB) + IotDeviceDO device = deviceService.getDeviceFromCache(message.getDeviceId()); + if (device == null) { + log.warn("[RuleEngineHandler] device not found, skip. deviceId={}", message.getDeviceId()); + return; + } + + // 2. 判断本消息是否走 v2;v1 的消费者同样判断并跳过,保证绝不双跑 + if (!versionResolver.shouldUseV2(device.getSubsystemId(), device.getTenantId())) { + return; // 由 v1 消费者处理 + } + + // 3. 走 v2 规则引擎 + TenantUtils.execute(device.getTenantId(), () -> { + try { + RuleEngineResult result = ruleEngine.execute( + message, + device.getTenantId(), + device.getSubsystemId(), + device.getProductId(), + device.getId()); + if (result.hasFailure()) { + log.warn("[RuleEngineHandler] partial failures deviceId={} failureCount={}", + message.getDeviceId(), result.getFailures().size()); + } + } catch (Exception e) { + // ⚠️ 不向上抛出,避免消息总线重试风暴 + log.error("[RuleEngineHandler] execute failed deviceId={}", message.getDeviceId(), e); + } + }); + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotSceneRuleMessageHandler.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotSceneRuleMessageHandler.java index 8b612888..50bcae86 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotSceneRuleMessageHandler.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotSceneRuleMessageHandler.java @@ -4,12 +4,21 @@ import com.viewsh.framework.tenant.core.util.TenantUtils; import com.viewsh.module.iot.core.messagebus.core.IotMessageBus; import com.viewsh.module.iot.core.messagebus.core.IotMessageSubscriber; import com.viewsh.module.iot.core.mq.message.IotDeviceMessage; +import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO; +import com.viewsh.module.iot.rule.config.IotRuleEngineVersionResolver; +import com.viewsh.module.iot.service.device.IotDeviceService; import com.viewsh.module.iot.service.rule.scene.IotSceneRuleService; import jakarta.annotation.PostConstruct; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +/** + * 针对 {@link IotDeviceMessage} 的消费者,处理场景规则 + * + *

⚠️ B9 改造:在 onMessage 最前面增加 v2 路由判断,若子系统已切换到 v2, + * 则跳过本消费者,由 {@link IotRuleEngineMessageHandler} 统一处理,避免双跑。 + */ @Component @Slf4j public class IotSceneRuleMessageHandler implements IotMessageSubscriber { @@ -20,6 +29,12 @@ public class IotSceneRuleMessageHandler implements IotMessageSubscriber sceneRuleService.executeSceneRuleByDevice(message)); } } diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/AlarmCacheState.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/AlarmCacheState.java new file mode 100644 index 00000000..6aa6ebb1 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/AlarmCacheState.java @@ -0,0 +1,52 @@ +package com.viewsh.module.iot.service.alarm; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 告警缓存状态值对象(从 Redis Hash 反序列化) + * + *

字段与 Redis Hash field 一一对应: + *

+ * ack_state    → ackState
+ * clear_state  → clearState
+ * archived     → archived
+ * severity     → severity
+ * alarm_time   → alarmTime   (epoch milli)
+ * clear_time   → clearTime   (epoch milli)
+ * 
+ * + * @author B14 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class AlarmCacheState { + + /** 确认状态:0 未确认 / 1 已确认 */ + private int ackState; + + /** 清除状态:0 活跃 / 1 已清除 */ + private int clearState; + + /** 归档:0 未归档 / 1 已归档 */ + private int archived; + + /** 严重度 1-5 */ + private int severity; + + /** + * 最近一次触发时间(epoch millis) + *

对应 DB 中 {@code end_ts}(最近触发时间)

+ */ + private long alarmTime; + + /** + * 清除时间(epoch millis),clearState=0 时为 0 + */ + private long clearTime; + +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/AlarmStateValidator.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/AlarmStateValidator.java new file mode 100644 index 00000000..de7262e2 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/AlarmStateValidator.java @@ -0,0 +1,71 @@ +package com.viewsh.module.iot.service.alarm; + +import org.springframework.stereotype.Component; + +/** + * 告警状态时序校验器 + * + *

防止旧消息(延迟到达的消息)误触发已清除告警或重置活跃告警。 + * + *

设计依据(评审 §6.2): + *

    + *
  • 有效性判断基于 消息时间戳,不基于状态比对
  • + *
  • 缓存是最终一致的,但时间戳判断不受缓存脏读影响
  • + *
+ * + * @author B14 + */ +@Component +public class AlarmStateValidator { + + /** + * 判断触发消息是否有效。 + * + *

规则:若告警已清除(clearState=1),则消息时间戳必须在最近 clear_time 之后, + * 否则认为是已清除后延迟到达的旧触发消息,直接忽略。 + * + *

若告警未清除(活跃),任何触发消息都视为有效(累积 trigger_count)。 + * + * @param cache 当前缓存状态(可为 null,表示首次触发) + * @param msgTs 消息时间戳(epoch millis) + * @return true 表示有效,应更新状态;false 表示旧消息,应忽略 + */ + public boolean isEffectiveTrigger(AlarmCacheState cache, long msgTs) { + if (cache == null) { + // 首次触发,无缓存,直接有效 + return true; + } + if (cache.getClearState() == 1) { + // 告警已清除:仅接受 clearTime 之后的触发(重新激活场景) + return msgTs > cache.getClearTime(); + } + // 活跃告警,所有触发都有效 + return true; + } + + /** + * 判断清除消息是否有效。 + * + *

规则: + *

    + *
  1. 告警已清除(clearState=1)→ 忽略(幂等)
  2. + *
  3. 消息时间戳必须在 alarm_time 之后(过期的清除消息不生效)
  4. + *
+ * + * @param cache 当前缓存状态(可为 null,表示告警不存在,视为无效) + * @param msgTs 消息时间戳(epoch millis) + * @return true 表示有效,应执行清除;false 表示无效,应忽略 + */ + public boolean isEffectiveClear(AlarmCacheState cache, long msgTs) { + if (cache == null) { + return false; + } + // 已清除 → 幂等忽略 + if (cache.getClearState() == 1) { + return false; + } + // 消息时间戳必须晚于最近触发时间 + return msgTs > cache.getAlarmTime(); + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/IotAlarmCacheService.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/IotAlarmCacheService.java new file mode 100644 index 00000000..49105872 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/IotAlarmCacheService.java @@ -0,0 +1,198 @@ +package com.viewsh.module.iot.service.alarm; + +import com.viewsh.module.iot.dal.dataobject.alarm.IotAlarmRecordDO; +import com.viewsh.module.iot.dal.mysql.alarm.IotAlarmRecordMapper; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; + +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * 告警状态 Redis Hash 缓存 Service + * + *

Redis Key: {@code iot:alarm:state:{recordId}} (Hash) + *

+ * fields:
+ *   ack_state    — 0/1
+ *   clear_state  — 0/1
+ *   archived     — 0/1
+ *   severity     — 1-5
+ *   alarm_time   — epoch millis(最近触发时间)
+ *   clear_time   — epoch millis(清除时间,未清除时为 0)
+ * TTL: 7 天,每次 write 时 expire 刷新
+ * 
+ * + *

设计原则: + *

    + *
  • 读取时 cache miss → 从 DB 重建(按需加载,不做启动预热)
  • + *
  • DB 写后立即更新缓存(在分布式锁保护下,不存在并发脏写)
  • + *
  • 缓存失效后 fallback 走 DB,性能下降但不故障
  • + *
+ * + * @author B14 + */ +@Slf4j +@Service +public class IotAlarmCacheService { + + /** 缓存 key 前缀 */ + private static final String KEY_PREFIX = "iot:alarm:state:"; + + /** TTL 7 天 */ + private static final long TTL_DAYS = 7L; + + /** Hash field 名常量 */ + private static final String F_ACK_STATE = "ack_state"; + private static final String F_CLEAR_STATE = "clear_state"; + private static final String F_ARCHIVED = "archived"; + private static final String F_SEVERITY = "severity"; + private static final String F_ALARM_TIME = "alarm_time"; + private static final String F_CLEAR_TIME = "clear_time"; + + @Resource + private StringRedisTemplate stringRedisTemplate; + + @Resource + private IotAlarmRecordMapper alarmRecordMapper; + + // ==================== 读取 ==================== + + /** + * 获取告警缓存状态。 + *

若 cache miss,从 DB 重建并写入缓存后返回;DB 也不存在则返回 {@code null}。 + * + * @param recordId 告警记录 ID + * @return AlarmCacheState,或 null(DB 中不存在该记录) + */ + public AlarmCacheState get(Long recordId) { + String key = buildKey(recordId); + Map fields = stringRedisTemplate.opsForHash().entries(key); + if (!fields.isEmpty()) { + return fromMap(fields); + } + + // Cache miss → DB 重建 + log.debug("[get][alarm cache miss,从 DB 重建,recordId={}]", recordId); + IotAlarmRecordDO record = alarmRecordMapper.selectById(recordId); + if (record == null) { + return null; + } + updateState(recordId, record); + return buildFromDO(record); + } + + // ==================== 写入 ==================== + + /** + * 在 DB 写入后同步更新 Redis Hash 缓存(每次 write 刷新 TTL)。 + * + * @param recordId 告警记录 ID + * @param record 已持久化的 DO + */ + public void updateState(Long recordId, IotAlarmRecordDO record) { + String key = buildKey(recordId); + Map map = new HashMap<>(8); + map.put(F_ACK_STATE, safeInt(record.getAckState())); + map.put(F_CLEAR_STATE, safeInt(record.getClearState())); + map.put(F_ARCHIVED, safeInt(record.getArchived())); + map.put(F_SEVERITY, safeInt(record.getSeverity())); + map.put(F_ALARM_TIME, toEpochMilli(record.getEndTs())); + map.put(F_CLEAR_TIME, toEpochMilli(record.getClearTs())); + + stringRedisTemplate.opsForHash().putAll(key, map); + stringRedisTemplate.expire(key, TTL_DAYS, TimeUnit.DAYS); + log.debug("[updateState][alarm cache 已更新,recordId={}, clearState={}]", + recordId, record.getClearState()); + } + + // ==================== 清除 ==================== + + /** + * 主动驱逐缓存(告警归档时可调用以节省内存)。 + * + * @param recordId 告警记录 ID + */ + public void evict(Long recordId) { + stringRedisTemplate.delete(buildKey(recordId)); + log.debug("[evict][alarm cache 已清除,recordId={}]", recordId); + } + + // ==================== 私有方法 ==================== + + private String buildKey(Long recordId) { + return KEY_PREFIX + recordId; + } + + private AlarmCacheState fromMap(Map fields) { + return AlarmCacheState.builder() + .ackState(parseInt(fields.get(F_ACK_STATE))) + .clearState(parseInt(fields.get(F_CLEAR_STATE))) + .archived(parseInt(fields.get(F_ARCHIVED))) + .severity(parseInt(fields.get(F_SEVERITY))) + .alarmTime(parseLong(fields.get(F_ALARM_TIME))) + .clearTime(parseLong(fields.get(F_CLEAR_TIME))) + .build(); + } + + private AlarmCacheState buildFromDO(IotAlarmRecordDO record) { + return AlarmCacheState.builder() + .ackState(safeIntVal(record.getAckState())) + .clearState(safeIntVal(record.getClearState())) + .archived(safeIntVal(record.getArchived())) + .severity(safeIntVal(record.getSeverity())) + .alarmTime(toEpochMilliLong(record.getEndTs())) + .clearTime(toEpochMilliLong(record.getClearTs())) + .build(); + } + + private static String safeInt(Integer v) { + return v == null ? "0" : String.valueOf(v); + } + + private static int safeIntVal(Integer v) { + return v == null ? 0 : v; + } + + private static String toEpochMilli(LocalDateTime ldt) { + if (ldt == null) { + return "0"; + } + return String.valueOf(ldt.toInstant(ZoneOffset.UTC).toEpochMilli()); + } + + private static long toEpochMilliLong(LocalDateTime ldt) { + if (ldt == null) { + return 0L; + } + return ldt.toInstant(ZoneOffset.UTC).toEpochMilli(); + } + + private static int parseInt(Object v) { + if (v == null) { + return 0; + } + try { + return Integer.parseInt(v.toString()); + } catch (NumberFormatException e) { + return 0; + } + } + + private static long parseLong(Object v) { + if (v == null) { + return 0L; + } + try { + return Long.parseLong(v.toString()); + } catch (NumberFormatException e) { + return 0L; + } + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/IotAlarmLockService.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/IotAlarmLockService.java new file mode 100644 index 00000000..24f04f5a --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/IotAlarmLockService.java @@ -0,0 +1,106 @@ +package com.viewsh.module.iot.service.alarm; + +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.script.DefaultRedisScript; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; + +import java.time.Duration; +import java.util.List; +import java.util.UUID; +import java.util.function.Supplier; + +import static com.viewsh.framework.common.exception.util.ServiceExceptionUtil.exception; +import static com.viewsh.module.iot.enums.ErrorCodeConstants.ALARM_LOCK_CONFLICT; + +/** + * 告警分布式锁 Service + * + *

基于 Redis SET NX PX + Lua 原子解锁,防止并发告警状态改写冲突。 + * + *

Redis Key: {@code iot:alarm:lock:{recordId}} (String) + *

+ *   value: UUID token(用于防止 A 删 B 的锁)
+ *   TTL:   由调用方传入(建议 5-10 秒)
+ * 
+ * + *

Known Pitfalls 落地: + *

    + *
  • ⚠️ [评审 C4] 禁用 SETNX,改用 {@code SET NX PX}(通过 {@code setIfAbsent(key, value, timeout)} 实现)
  • + *
  • ⚠️ [评审 C4] 释放锁用 Lua 脚本验证 token,防止 A 释放了 B 持有的锁
  • + *
  • ⚠️ 锁获取失败直接抛 {@code ALARM_LOCK_CONFLICT},不循环等待(由上游 RuleEngine 记录 metric 后重试)
  • + *
+ * + * @author B14 + */ +@Slf4j +@Service +public class IotAlarmLockService { + + /** 锁 key 前缀 */ + private static final String LOCK_KEY_PREFIX = "iot:alarm:lock:"; + + /** + * Lua 脚本:仅当 key 的值等于 token 时才删除,避免 A 删了 B 的锁。 + *

返回 1 表示删除成功,0 表示 token 不匹配(锁已过期或被其他人持有)。 + */ + private static final DefaultRedisScript RELEASE_SCRIPT; + + static { + RELEASE_SCRIPT = new DefaultRedisScript<>(); + RELEASE_SCRIPT.setScriptText( + "if redis.call('get', KEYS[1]) == ARGV[1] then " + + " return redis.call('del', KEYS[1]) " + + "else " + + " return 0 " + + "end" + ); + RELEASE_SCRIPT.setResultType(Long.class); + } + + @Resource + private StringRedisTemplate stringRedisTemplate; + + /** + * 在分布式锁保护下执行指定操作。 + * + *

获取锁失败(另一个线程/进程持有该记录的锁)立即抛出 {@link com.viewsh.framework.common.exception.ServiceException}, + * 错误码 {@code ALARM_LOCK_CONFLICT}。 + * + *

无论操作是否成功,finally 块中用 Lua 脚本释放锁(仅释放自己的 token)。 + * + * @param recordId 告警记录 ID + * @param timeout 锁超时时长(建议 5-10 秒) + * @param action 被保护的业务逻辑 + * @param 返回值类型 + * @return action 的返回值 + * @throws com.viewsh.framework.common.exception.ServiceException ALARM_LOCK_CONFLICT 锁冲突 + */ + public T executeWithLock(Long recordId, Duration timeout, Supplier action) { + String key = LOCK_KEY_PREFIX + recordId; + String token = UUID.randomUUID().toString(); + + // SET NX PX — 原子加锁(评审 C4 要求) + Boolean acquired = stringRedisTemplate.opsForValue().setIfAbsent(key, token, timeout); + if (!Boolean.TRUE.equals(acquired)) { + log.warn("[executeWithLock][获取告警锁失败,recordId={}, 锁冲突]", recordId); + throw exception(ALARM_LOCK_CONFLICT); + } + + log.debug("[executeWithLock][获取告警锁成功,recordId={}, token={}]", recordId, token); + try { + return action.get(); + } finally { + // Lua 原子释放:只删自己的 token(评审 C4 要求) + Long result = stringRedisTemplate.execute(RELEASE_SCRIPT, List.of(key), token); + if (result == null || result == 0L) { + log.warn("[executeWithLock][Lua 解锁失败(token 不匹配或锁已过期),recordId={}, token={}]", + recordId, token); + } else { + log.debug("[executeWithLock][告警锁已释放,recordId={}]", recordId); + } + } + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/IotAlarmRecordServiceImpl.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/IotAlarmRecordServiceImpl.java index e63a6c13..ed56b5cb 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/IotAlarmRecordServiceImpl.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/IotAlarmRecordServiceImpl.java @@ -12,6 +12,7 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.validation.annotation.Validated; +import java.time.Duration; import java.time.LocalDateTime; import static com.viewsh.framework.common.exception.util.ServiceExceptionUtil.exception; @@ -21,26 +22,38 @@ import static com.viewsh.module.iot.enums.ErrorCodeConstants.*; * IoT 告警记录 Service 实现(v2.0 正交状态机) *

* 本卡职责:建表 / 枚举 / DO / Service 状态机核心 + 幂等 upsert。 - * 不实现:分布式锁(B14)、告警传播(B15)、通知(B16)、 - * 告警条件结构化存储(评审 E4)、Redis 实时计数(B14/O1)。 + * B14 叠加:分布式锁(SET NX PX + Lua)+ Redis 状态缓存 + 时序有效性校验。 + * 不实现:告警传播(B15)、通知(B16)。 *

* Known Pitfalls 落地: * - ⚠️ 评审 C1:正交三字段替代线性 4 枚举({@link IotAlarmRecordDO}) * - ⚠️ 评审 C2:联合 UK 幂等 upsert({@link IotAlarmRecordMapper#selectActiveByDeviceAndConfig}) - * - ⚠️ 评审 C4:TODO B14 加分布式锁(triggerAlarm 当前仅 CAS) + * - ⚠️ 评审 C4:分布式锁({@link IotAlarmLockService})+ Lua 原子解锁 * - ⚠️ 归档后不可修改:5 个状态迁移 API 都先校验 archived=0 * - ⚠️ tenant_id:基于 {@link TenantContextHolder} * - * @author B12 + * @author B12 / B14 */ @Slf4j @Service @Validated public class IotAlarmRecordServiceImpl implements IotAlarmRecordService { + /** 分布式锁超时(10 秒:覆盖 DB 写 + history 写 + 网络抖动) */ + private static final Duration LOCK_TIMEOUT = Duration.ofSeconds(10); + @Resource private IotAlarmRecordMapper alarmRecordMapper; + @Resource + private IotAlarmLockService lockService; + + @Resource + private IotAlarmCacheService cacheService; + + @Resource + private AlarmStateValidator validator; + // ==================== 触发(幂等 upsert) ==================== @Override @@ -60,15 +73,53 @@ public class IotAlarmRecordServiceImpl implements IotAlarmRecordService { Long tenantId = TenantContextHolder.getTenantId(); LocalDateTime now = LocalDateTime.now(); - // 2. 幂等 upsert(评审 C2) - // TODO B14:此处应套分布式锁 SET NX PX + Lua,防并发 trigger 造成双写 + // 2. 先查或创建 alarm_record(UK 幂等,不在锁内,避免死锁) IotAlarmRecordDO existing = alarmRecordMapper.selectActiveByDeviceAndConfig( request.getDeviceId(), request.getAlarmConfigId(), tenantId); - if (existing != null) { - // 已存在活跃记录 → trigger_count++ + end_ts 更新 + details 覆盖 + if (existing == null) { + // 3a. 首次触发:新建 (0,0,0) + IotAlarmRecordDO record = IotAlarmRecordDO.builder() + .alarmConfigId(request.getAlarmConfigId()) + .alarmName(request.getAlarmName()) + .severity(request.getSeverity()) + .ackState(0) + .clearState(0) + .archived(0) + .deviceId(request.getDeviceId()) + .productId(request.getProductId()) + .subsystemId(request.getSubsystemId()) + .ruleChainId(request.getRuleChainId()) + .sceneRuleId(request.getSceneRuleId()) + .startTs(now) + .endTs(now) + .details(request.getDetails()) + .triggerCount(1) + .build(); + alarmRecordMapper.insert(record); + log.info("[triggerAlarm] 新建告警 id={} device={} config={}", + record.getId(), request.getDeviceId(), request.getAlarmConfigId()); + // 同步缓存(首次,无需锁) + cacheService.updateState(record.getId(), record); + return record.getId(); + } + + final Long recordId = existing.getId(); + + // 3b. 已有记录:分布式锁保护状态变更(评审 C4) + return lockService.executeWithLock(recordId, LOCK_TIMEOUT, () -> { + // 4. 时序有效性校验(防旧消息重触发已清除告警) + AlarmCacheState cache = cacheService.get(recordId); + long msgTs = request.getTimestamp() > 0 ? request.getTimestamp() : System.currentTimeMillis(); + if (!validator.isEffectiveTrigger(cache, msgTs)) { + log.debug("[triggerAlarm] 忽略旧触发消息,recordId={}, msgTs={}, clearTime={}", + recordId, msgTs, cache != null ? cache.getClearTime() : "N/A"); + return recordId; + } + + // 5. DB upsert:trigger_count++ + end_ts 更新 + details 覆盖 IotAlarmRecordDO update = new IotAlarmRecordDO(); - update.setId(existing.getId()); + update.setId(recordId); update.setEndTs(now); int oldCount = existing.getTriggerCount() == null ? 1 : existing.getTriggerCount(); update.setTriggerCount(oldCount + 1); @@ -76,32 +127,17 @@ public class IotAlarmRecordServiceImpl implements IotAlarmRecordService { update.setDetails(request.getDetails()); } alarmRecordMapper.updateById(update); - log.debug("[triggerAlarm] 累积触发 id={} count={}", existing.getId(), oldCount + 1); - return existing.getId(); - } + log.debug("[triggerAlarm] 累积触发 id={} count={}", recordId, oldCount + 1); - // 3. 新建(0, 0, 0) - IotAlarmRecordDO record = IotAlarmRecordDO.builder() - .alarmConfigId(request.getAlarmConfigId()) - .alarmName(request.getAlarmName()) - .severity(request.getSeverity()) - .ackState(0) - .clearState(0) - .archived(0) - .deviceId(request.getDeviceId()) - .productId(request.getProductId()) - .subsystemId(request.getSubsystemId()) - .ruleChainId(request.getRuleChainId()) - .sceneRuleId(request.getSceneRuleId()) - .startTs(now) - .endTs(now) - .details(request.getDetails()) - .triggerCount(1) - .build(); - alarmRecordMapper.insert(record); - log.info("[triggerAlarm] 新建告警 id={} device={} config={}", - record.getId(), request.getDeviceId(), request.getAlarmConfigId()); - return record.getId(); + // 6. 同步缓存(DB 写后立即更新) + // 重新拉取最新 DO(含 update 后字段) + IotAlarmRecordDO latest = alarmRecordMapper.selectById(recordId); + if (latest != null) { + cacheService.updateState(recordId, latest); + } + + return recordId; + }); } // ==================== 状态迁移(CAS by Service 层) ==================== @@ -113,15 +149,25 @@ public class IotAlarmRecordServiceImpl implements IotAlarmRecordService { log.debug("[ackAlarm] 告警 id={} 已确认,幂等跳过", alarm.getId()); return; } - IotAlarmRecordDO update = new IotAlarmRecordDO(); - update.setId(alarm.getId()); - update.setAckState(1); - update.setAckTs(LocalDateTime.now()); - update.setUpdater(request.getOperator()); - if (request.getRemark() != null && !request.getRemark().isBlank()) { - update.setProcessRemark(request.getRemark()); - } - alarmRecordMapper.updateById(update); + + lockService.executeWithLock(alarm.getId(), LOCK_TIMEOUT, () -> { + IotAlarmRecordDO update = new IotAlarmRecordDO(); + update.setId(alarm.getId()); + update.setAckState(1); + update.setAckTs(LocalDateTime.now()); + update.setUpdater(request.getOperator()); + if (request.getRemark() != null && !request.getRemark().isBlank()) { + update.setProcessRemark(request.getRemark()); + } + alarmRecordMapper.updateById(update); + + // 同步缓存 + IotAlarmRecordDO latest = alarmRecordMapper.selectById(alarm.getId()); + if (latest != null) { + cacheService.updateState(alarm.getId(), latest); + } + return null; + }); } @Override @@ -131,15 +177,25 @@ public class IotAlarmRecordServiceImpl implements IotAlarmRecordService { log.debug("[unackAlarm] 告警 id={} 未确认,幂等跳过", alarm.getId()); return; } - IotAlarmRecordDO update = new IotAlarmRecordDO(); - update.setId(alarm.getId()); - update.setAckState(0); - update.setAckTs(null); - update.setUpdater(request.getOperator()); - if (request.getRemark() != null && !request.getRemark().isBlank()) { - update.setProcessRemark(request.getRemark()); - } - alarmRecordMapper.updateById(update); + + lockService.executeWithLock(alarm.getId(), LOCK_TIMEOUT, () -> { + IotAlarmRecordDO update = new IotAlarmRecordDO(); + update.setId(alarm.getId()); + update.setAckState(0); + update.setAckTs(null); + update.setUpdater(request.getOperator()); + if (request.getRemark() != null && !request.getRemark().isBlank()) { + update.setProcessRemark(request.getRemark()); + } + alarmRecordMapper.updateById(update); + + // 同步缓存 + IotAlarmRecordDO latest = alarmRecordMapper.selectById(alarm.getId()); + if (latest != null) { + cacheService.updateState(alarm.getId(), latest); + } + return null; + }); } @Override @@ -149,15 +205,25 @@ public class IotAlarmRecordServiceImpl implements IotAlarmRecordService { log.debug("[clearAlarm] 告警 id={} 已清除,幂等跳过", alarm.getId()); return; } - IotAlarmRecordDO update = new IotAlarmRecordDO(); - update.setId(alarm.getId()); - update.setClearState(1); - update.setClearTs(LocalDateTime.now()); - update.setUpdater(request.getOperator()); - if (request.getRemark() != null && !request.getRemark().isBlank()) { - update.setProcessRemark(request.getRemark()); - } - alarmRecordMapper.updateById(update); + + lockService.executeWithLock(alarm.getId(), LOCK_TIMEOUT, () -> { + IotAlarmRecordDO update = new IotAlarmRecordDO(); + update.setId(alarm.getId()); + update.setClearState(1); + update.setClearTs(LocalDateTime.now()); + update.setUpdater(request.getOperator()); + if (request.getRemark() != null && !request.getRemark().isBlank()) { + update.setProcessRemark(request.getRemark()); + } + alarmRecordMapper.updateById(update); + + // 同步缓存 + IotAlarmRecordDO latest = alarmRecordMapper.selectById(alarm.getId()); + if (latest != null) { + cacheService.updateState(alarm.getId(), latest); + } + return null; + }); } @Override @@ -167,15 +233,22 @@ public class IotAlarmRecordServiceImpl implements IotAlarmRecordService { // 归档非幂等:已归档再调用抛业务异常 throw exception(ALARM_ALREADY_ARCHIVED); } - IotAlarmRecordDO update = new IotAlarmRecordDO(); - update.setId(alarm.getId()); - update.setArchived(1); - update.setArchiveTs(LocalDateTime.now()); - update.setUpdater(request.getOperator()); - if (request.getRemark() != null && !request.getRemark().isBlank()) { - update.setProcessRemark(request.getRemark()); - } - alarmRecordMapper.updateById(update); + + lockService.executeWithLock(alarm.getId(), LOCK_TIMEOUT, () -> { + IotAlarmRecordDO update = new IotAlarmRecordDO(); + update.setId(alarm.getId()); + update.setArchived(1); + update.setArchiveTs(LocalDateTime.now()); + update.setUpdater(request.getOperator()); + if (request.getRemark() != null && !request.getRemark().isBlank()) { + update.setProcessRemark(request.getRemark()); + } + alarmRecordMapper.updateById(update); + + // 归档后驱逐缓存(节省内存,可接受 cache miss fallback 到 DB) + cacheService.evict(alarm.getId()); + return null; + }); } // ==================== 查询 ==================== diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/dto/AlarmTriggerRequest.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/dto/AlarmTriggerRequest.java index b0725521..cd257387 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/dto/AlarmTriggerRequest.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/dto/AlarmTriggerRequest.java @@ -44,4 +44,12 @@ public class AlarmTriggerRequest { /** 触发详情(JSON) */ private JsonNode details; + /** + * 消息时间戳(epoch millis,来自设备上报时间或规则引擎推断)。 + *

用于 B14 {@link com.viewsh.module.iot.service.alarm.AlarmStateValidator#isEffectiveTrigger} 时序校验: + * 防止延迟到达的旧触发消息重新激活已清除的告警。 + *

若未填写(0 或 null),默认使用当前时间(System.currentTimeMillis())。 + */ + private long timestamp; + } diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/mq/consumer/rule/IotRuleEngineMessageHandlerTest.java b/viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/mq/consumer/rule/IotRuleEngineMessageHandlerTest.java new file mode 100644 index 00000000..41a34758 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/mq/consumer/rule/IotRuleEngineMessageHandlerTest.java @@ -0,0 +1,167 @@ +package com.viewsh.module.iot.mq.consumer.rule; + +import com.viewsh.framework.test.core.ut.BaseMockitoUnitTest; +import com.viewsh.module.iot.core.mq.message.IotDeviceMessage; +import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO; +import com.viewsh.module.iot.rule.config.IotRuleEngineVersionResolver; +import com.viewsh.module.iot.rule.engine.RuleEngine; +import com.viewsh.module.iot.rule.engine.RuleEngineResult; +import com.viewsh.module.iot.service.device.IotDeviceService; +import org.junit.jupiter.api.Test; +import org.mockito.InjectMocks; +import org.mockito.Mock; + +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +/** + * {@link IotRuleEngineMessageHandler} 单元测试 + * + *

验证 B9 的六个场景(见任务卡 §6 Test Cases): + *

    + *
  1. 全 v1 — shouldUseV2=false → handler 提前返回,ruleEngine 不调用
  2. + *
  3. 全 v2 — shouldUseV2=true → ruleEngine.execute() 被调用
  4. + *
  5. hybrid + 白名单命中 — shouldUseV2=true → ruleEngine.execute() 被调用
  6. + *
  7. hybrid + 白名单未命中 — shouldUseV2=false → handler 提前返回
  8. + *
  9. device 已删除(null)— log WARN + skip,ruleEngine 不调用
  10. + *
  11. RuleEngine 抛异常 — handler 捕获异常,不向上抛出
  12. + *
+ */ +class IotRuleEngineMessageHandlerTest extends BaseMockitoUnitTest { + + @InjectMocks + private IotRuleEngineMessageHandler handler; + + @Mock + private IotRuleEngineVersionResolver versionResolver; + + @Mock + private RuleEngine ruleEngine; + + @Mock + private IotDeviceService deviceService; + + // ---- 测试数据 ---- + + private static final Long DEVICE_ID = 100L; + private static final Long TENANT_ID = 1L; + private static final Long SUBSYSTEM_ID = 5L; + private static final Long PRODUCT_ID = 10L; + + private IotDeviceMessage buildMessage() { + return IotDeviceMessage.builder() + .deviceId(DEVICE_ID) + .tenantId(TENANT_ID) + .build(); + } + + private IotDeviceDO buildDevice(Long subsystemId) { + IotDeviceDO device = IotDeviceDO.builder() + .id(DEVICE_ID) + .subsystemId(subsystemId) + .productId(PRODUCT_ID) + .build(); + device.setTenantId(TENANT_ID); + return device; + } + + // ---- 场景 1:全 v1,shouldUseV2=false,handler 不处理 ---- + + @Test + void testOnMessage_globalV1_skipHandler() { + IotDeviceMessage msg = buildMessage(); + IotDeviceDO device = buildDevice(SUBSYSTEM_ID); + + when(deviceService.getDeviceFromCache(DEVICE_ID)).thenReturn(device); + when(versionResolver.shouldUseV2(SUBSYSTEM_ID, TENANT_ID)).thenReturn(false); + + handler.onMessage(msg); + + verify(ruleEngine, never()).execute(any(), any(), any(), any(), any()); + } + + // ---- 场景 2:全 v2,shouldUseV2=true,ruleEngine.execute() 被调用 ---- + + @Test + void testOnMessage_globalV2_executeRuleEngine() { + IotDeviceMessage msg = buildMessage(); + IotDeviceDO device = buildDevice(SUBSYSTEM_ID); + RuleEngineResult result = new RuleEngineResult(); + + when(deviceService.getDeviceFromCache(DEVICE_ID)).thenReturn(device); + when(versionResolver.shouldUseV2(SUBSYSTEM_ID, TENANT_ID)).thenReturn(true); + when(ruleEngine.execute(eq(msg), eq(TENANT_ID), eq(SUBSYSTEM_ID), eq(PRODUCT_ID), eq(DEVICE_ID))) + .thenReturn(result); + + handler.onMessage(msg); + + verify(ruleEngine, times(1)).execute(eq(msg), eq(TENANT_ID), eq(SUBSYSTEM_ID), eq(PRODUCT_ID), eq(DEVICE_ID)); + } + + // ---- 场景 3:hybrid + 白名单命中(subsystem=5),走 v2 ---- + + @Test + void testOnMessage_hybridWhitelistHit_executeRuleEngine() { + IotDeviceMessage msg = buildMessage(); + IotDeviceDO device = buildDevice(SUBSYSTEM_ID); + RuleEngineResult result = new RuleEngineResult(); + + when(deviceService.getDeviceFromCache(DEVICE_ID)).thenReturn(device); + when(versionResolver.shouldUseV2(SUBSYSTEM_ID, TENANT_ID)).thenReturn(true); // hybrid: subsystem 5 in whitelist + when(ruleEngine.execute(eq(msg), eq(TENANT_ID), eq(SUBSYSTEM_ID), eq(PRODUCT_ID), eq(DEVICE_ID))) + .thenReturn(result); + + handler.onMessage(msg); + + verify(ruleEngine, times(1)).execute(any(), any(), any(), any(), any()); + } + + // ---- 场景 4:hybrid + 白名单未命中(subsystem=3),走 v1,handler 跳过 ---- + + @Test + void testOnMessage_hybridWhitelistMiss_skipHandler() { + Long otherSubsystem = 3L; + IotDeviceMessage msg = buildMessage(); + IotDeviceDO device = buildDevice(otherSubsystem); + + when(deviceService.getDeviceFromCache(DEVICE_ID)).thenReturn(device); + when(versionResolver.shouldUseV2(otherSubsystem, TENANT_ID)).thenReturn(false); // subsystem 3 not in whitelist + + handler.onMessage(msg); + + verify(ruleEngine, never()).execute(any(), any(), any(), any(), any()); + } + + // ---- 场景 5:device 已删除(返回 null),log WARN + skip ---- + + @Test + void testOnMessage_deviceNull_skipHandler() { + IotDeviceMessage msg = buildMessage(); + + when(deviceService.getDeviceFromCache(DEVICE_ID)).thenReturn(null); + + handler.onMessage(msg); + + verify(versionResolver, never()).shouldUseV2(any(), any()); + verify(ruleEngine, never()).execute(any(), any(), any(), any(), any()); + } + + // ---- 场景 6:RuleEngine 抛异常,handler 捕获,不向上抛出 ---- + + @Test + void testOnMessage_ruleEngineException_swallowed() { + IotDeviceMessage msg = buildMessage(); + IotDeviceDO device = buildDevice(SUBSYSTEM_ID); + + when(deviceService.getDeviceFromCache(DEVICE_ID)).thenReturn(device); + when(versionResolver.shouldUseV2(SUBSYSTEM_ID, TENANT_ID)).thenReturn(true); + when(ruleEngine.execute(any(), any(), any(), any(), any())) + .thenThrow(new RuntimeException("simulated engine error")); + + // 不应向外抛出异常 + handler.onMessage(msg); + + verify(ruleEngine, times(1)).execute(any(), any(), any(), any(), any()); + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/service/alarm/AlarmStateValidatorTest.java b/viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/service/alarm/AlarmStateValidatorTest.java new file mode 100644 index 00000000..b2a1dc01 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/service/alarm/AlarmStateValidatorTest.java @@ -0,0 +1,119 @@ +package com.viewsh.module.iot.service.alarm; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * {@link AlarmStateValidator} 单元测试 + * + *

任务卡 B14 §3.3 时序校验逻辑: + *

    + *
  • isEffectiveTrigger:已清除告警 + 旧消息 → false
  • + *
  • isEffectiveTrigger:已清除告警 + 新消息 → true(重新激活)
  • + *
  • isEffectiveTrigger:活跃告警任何消息 → true
  • + *
  • isEffectiveTrigger:cache == null(首次)→ true
  • + *
  • isEffectiveClear:活跃告警 + 新消息 → true
  • + *
  • isEffectiveClear:已清除告警 → false(幂等)
  • + *
  • isEffectiveClear:活跃告警 + 旧消息 → false
  • + *
  • isEffectiveClear:cache == null → false
  • + *
+ * + * @author B14 + */ +class AlarmStateValidatorTest { + + private final AlarmStateValidator validator = new AlarmStateValidator(); + + // ==================== isEffectiveTrigger ==================== + + @Test + void testIsEffectiveTrigger_clearedAlarm_oldMessage_returnFalse() { + AlarmCacheState cache = AlarmCacheState.builder() + .clearState(1) + .clearTime(5000L) + .alarmTime(1000L) + .build(); + // 消息时间戳 3000 < clearTime 5000 → 旧消息,不生效 + assertFalse(validator.isEffectiveTrigger(cache, 3000L)); + } + + @Test + void testIsEffectiveTrigger_clearedAlarm_newMessage_returnTrue() { + AlarmCacheState cache = AlarmCacheState.builder() + .clearState(1) + .clearTime(5000L) + .alarmTime(1000L) + .build(); + // 消息时间戳 6000 > clearTime 5000 → 重新激活,生效 + assertTrue(validator.isEffectiveTrigger(cache, 6000L)); + } + + @Test + void testIsEffectiveTrigger_activeAlarm_anyMessage_returnTrue() { + AlarmCacheState cache = AlarmCacheState.builder() + .clearState(0) + .alarmTime(1000L) + .clearTime(0L) + .build(); + // 活跃告警,任何时间戳都接受 + assertTrue(validator.isEffectiveTrigger(cache, 500L)); + assertTrue(validator.isEffectiveTrigger(cache, 2000L)); + } + + @Test + void testIsEffectiveTrigger_nullCache_returnTrue() { + // 首次触发,无缓存 + assertTrue(validator.isEffectiveTrigger(null, 1000L)); + } + + // ==================== isEffectiveClear ==================== + + @Test + void testIsEffectiveClear_activeAlarm_newMessage_returnTrue() { + AlarmCacheState cache = AlarmCacheState.builder() + .clearState(0) + .alarmTime(2000L) + .build(); + // 消息时间戳 3000 > alarmTime 2000 → 有效清除 + assertTrue(validator.isEffectiveClear(cache, 3000L)); + } + + @Test + void testIsEffectiveClear_alreadyCleared_returnFalse() { + AlarmCacheState cache = AlarmCacheState.builder() + .clearState(1) + .alarmTime(2000L) + .clearTime(5000L) + .build(); + // 已清除,幂等忽略 + assertFalse(validator.isEffectiveClear(cache, 6000L)); + } + + @Test + void testIsEffectiveClear_activeAlarm_oldMessage_returnFalse() { + AlarmCacheState cache = AlarmCacheState.builder() + .clearState(0) + .alarmTime(5000L) + .build(); + // 消息时间戳 3000 < alarmTime 5000 → 旧消息,不清除 + assertFalse(validator.isEffectiveClear(cache, 3000L)); + } + + @Test + void testIsEffectiveClear_nullCache_returnFalse() { + // cache 为 null 表示告警不存在,清除无效 + assertFalse(validator.isEffectiveClear(null, 1000L)); + } + + @Test + void testIsEffectiveClear_sameTimestamp_returnFalse() { + AlarmCacheState cache = AlarmCacheState.builder() + .clearState(0) + .alarmTime(5000L) + .build(); + // 严格大于,等于不生效 + assertFalse(validator.isEffectiveClear(cache, 5000L)); + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/service/alarm/IotAlarmCacheServiceTest.java b/viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/service/alarm/IotAlarmCacheServiceTest.java new file mode 100644 index 00000000..ab46bd26 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/service/alarm/IotAlarmCacheServiceTest.java @@ -0,0 +1,156 @@ +package com.viewsh.module.iot.service.alarm; + +import com.viewsh.module.iot.dal.dataobject.alarm.IotAlarmRecordDO; +import com.viewsh.module.iot.dal.mysql.alarm.IotAlarmRecordMapper; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.data.redis.core.HashOperations; +import org.springframework.data.redis.core.StringRedisTemplate; + +import java.time.LocalDateTime; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +/** + * {@link IotAlarmCacheService} 单元测试 + * + *

测试用例: + *

    + *
  • testGet_cacheHit:缓存命中,直接返回
  • + *
  • testCacheMiss_reloadFromDB:cache miss → 从 DB 重建
  • + *
  • testGet_cacheAndDbMiss:Redis + DB 均无 → 返回 null
  • + *
  • testUpdateState:写入 Hash + 刷新 TTL
  • + *
  • testEvict:删除 key
  • + *
+ * + * @author B14 + */ +@ExtendWith(MockitoExtension.class) +class IotAlarmCacheServiceTest { + + @InjectMocks + private IotAlarmCacheService cacheService; + + @Mock + private StringRedisTemplate stringRedisTemplate; + + @Mock + private IotAlarmRecordMapper alarmRecordMapper; + + @Mock + @SuppressWarnings("rawtypes") + private HashOperations hashOps; + + @BeforeEach + @SuppressWarnings("unchecked") + void setUp() { + // lenient: testEvict does not call opsForHash(), so this stub is not used in that test + lenient().when(stringRedisTemplate.opsForHash()).thenReturn(hashOps); + } + + // ==================== testGet_cacheHit ==================== + + @Test + @SuppressWarnings("unchecked") + void testGet_cacheHit() { + Map fields = new HashMap<>(); + fields.put("ack_state", "1"); + fields.put("clear_state", "1"); + fields.put("archived", "0"); + fields.put("severity", "3"); + fields.put("alarm_time", "1000"); + fields.put("clear_time", "2000"); + + when(hashOps.entries("iot:alarm:state:42")).thenReturn(fields); + + AlarmCacheState state = cacheService.get(42L); + + assertNotNull(state); + assertEquals(1, state.getAckState()); + assertEquals(1, state.getClearState()); + assertEquals(0, state.getArchived()); + assertEquals(3, state.getSeverity()); + assertEquals(1000L, state.getAlarmTime()); + assertEquals(2000L, state.getClearTime()); + + // DB 不应被查询 + verify(alarmRecordMapper, never()).selectById(anyLong()); + } + + // ==================== testCacheMiss_reloadFromDB ==================== + + @Test + @SuppressWarnings("unchecked") + void testCacheMiss_reloadFromDB() { + // Redis 返回空 map(cache miss) + when(hashOps.entries(anyString())).thenReturn(Collections.emptyMap()); + + IotAlarmRecordDO record = IotAlarmRecordDO.builder() + .id(99L) + .ackState(0).clearState(0).archived(0).severity(2) + .endTs(LocalDateTime.of(2024, 1, 1, 10, 0, 0)) + .build(); + when(alarmRecordMapper.selectById(99L)).thenReturn(record); + + AlarmCacheState state = cacheService.get(99L); + + assertNotNull(state); + assertEquals(0, state.getAckState()); + assertEquals(0, state.getClearState()); + assertEquals(2, state.getSeverity()); + + // DB 被查询,且缓存被回填 + verify(alarmRecordMapper).selectById(99L); + verify(hashOps).putAll(eq("iot:alarm:state:99"), anyMap()); + verify(stringRedisTemplate).expire(eq("iot:alarm:state:99"), eq(7L), eq(TimeUnit.DAYS)); + } + + // ==================== testGet_cacheAndDbMiss ==================== + + @Test + @SuppressWarnings("unchecked") + void testGet_cacheAndDbMiss() { + when(hashOps.entries(anyString())).thenReturn(Collections.emptyMap()); + when(alarmRecordMapper.selectById(123L)).thenReturn(null); + + AlarmCacheState state = cacheService.get(123L); + + assertNull(state); + } + + // ==================== testUpdateState ==================== + + @Test + @SuppressWarnings("unchecked") + void testUpdateState() { + IotAlarmRecordDO record = IotAlarmRecordDO.builder() + .id(55L) + .ackState(1).clearState(0).archived(0).severity(4) + .endTs(LocalDateTime.now()) + .build(); + + cacheService.updateState(55L, record); + + verify(hashOps).putAll(eq("iot:alarm:state:55"), anyMap()); + verify(stringRedisTemplate).expire(eq("iot:alarm:state:55"), eq(7L), eq(TimeUnit.DAYS)); + } + + // ==================== testEvict ==================== + + @Test + void testEvict() { + cacheService.evict(777L); + verify(stringRedisTemplate).delete("iot:alarm:state:777"); + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/service/alarm/IotAlarmLockServiceTest.java b/viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/service/alarm/IotAlarmLockServiceTest.java new file mode 100644 index 00000000..3a4b46c5 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/service/alarm/IotAlarmLockServiceTest.java @@ -0,0 +1,143 @@ +package com.viewsh.module.iot.service.alarm; + +import com.viewsh.framework.common.exception.ServiceException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.data.redis.core.script.DefaultRedisScript; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.core.ValueOperations; + +import java.time.Duration; +import java.util.List; + +import static com.viewsh.module.iot.enums.ErrorCodeConstants.ALARM_LOCK_CONFLICT; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +/** + * {@link IotAlarmLockService} 单元测试 + * + *

任务卡 B14 §6 测试用例: + *

    + *
  • testLock_acquireRelease:正常获取并释放
  • + *
  • testLock_conflict:锁被占用时抛 ALARM_LOCK_CONFLICT
  • + *
  • testLock_releaseOtherToken:Lua 脚本返回 0(token 不匹配),warn 日志但不抛异常
  • + *
+ * + * @author B14 + */ +@ExtendWith(MockitoExtension.class) +class IotAlarmLockServiceTest { + + @InjectMocks + private IotAlarmLockService lockService; + + @Mock + private StringRedisTemplate stringRedisTemplate; + + @Mock + private ValueOperations valueOps; + + @BeforeEach + void setUp() { + when(stringRedisTemplate.opsForValue()).thenReturn(valueOps); + } + + // ==================== testLock_acquireRelease ==================== + + /** + * 正常获取锁 + 执行 action + 释放锁(Lua 返回 1) + */ + @Test + @SuppressWarnings("unchecked") + void testLock_acquireRelease() { + // SET NX 成功 + when(valueOps.setIfAbsent(anyString(), anyString(), any(Duration.class))).thenReturn(true); + // Lua 解锁返回 1(成功删除) + when(stringRedisTemplate.execute(any(DefaultRedisScript.class), anyList(), any())).thenReturn(1L); + + String result = lockService.executeWithLock(100L, Duration.ofSeconds(5), () -> "ok"); + + assertEquals("ok", result); + verify(valueOps).setIfAbsent(eq("iot:alarm:lock:100"), anyString(), eq(Duration.ofSeconds(5))); + // Lua 解锁被执行 + verify(stringRedisTemplate).execute( + any(DefaultRedisScript.class), + eq(List.of("iot:alarm:lock:100")), + anyString() + ); + } + + // ==================== testLock_conflict ==================== + + /** + * SET NX 返回 false(另一个进程持有锁)→ 抛 ALARM_LOCK_CONFLICT + */ + @Test + void testLock_conflict() { + // setIfAbsent 返回 false → 获取锁失败 + when(valueOps.setIfAbsent(anyString(), anyString(), any(Duration.class))).thenReturn(false); + + ServiceException ex = assertThrows(ServiceException.class, + () -> lockService.executeWithLock(200L, Duration.ofSeconds(5), () -> "should not run")); + + assertEquals(ALARM_LOCK_CONFLICT.getCode(), ex.getCode()); + // action 未执行,也不应尝试解锁 + verify(stringRedisTemplate, never()).execute(any(DefaultRedisScript.class), anyList(), anyString()); + } + + // ==================== testLock_releaseOtherToken ==================== + + /** + * Lua 脚本返回 0(token 不匹配:锁已过期,B 拿到了),A 释放时不删 B 的锁,仅打 warn 日志 + */ + @Test + @SuppressWarnings("unchecked") + void testLock_releaseOtherToken() { + // 获取锁成功 + when(valueOps.setIfAbsent(anyString(), anyString(), any(Duration.class))).thenReturn(true); + // Lua 返回 0 → token 不匹配 + when(stringRedisTemplate.execute(any(DefaultRedisScript.class), anyList(), any())).thenReturn(0L); + + // 不应抛异常(仅 warn 日志,不影响业务) + assertDoesNotThrow(() -> lockService.executeWithLock(300L, Duration.ofSeconds(5), () -> "done")); + + // Lua 脚本被调用了 + verify(stringRedisTemplate).execute( + any(DefaultRedisScript.class), + eq(List.of("iot:alarm:lock:300")), + anyString() + ); + } + + // ==================== action 异常时也要释放锁 ==================== + + /** + * action 抛出异常时,finally 块中仍应执行 Lua 解锁 + */ + @Test + @SuppressWarnings("unchecked") + void testLock_releaseOnException() { + when(valueOps.setIfAbsent(anyString(), anyString(), any(Duration.class))).thenReturn(true); + when(stringRedisTemplate.execute(any(DefaultRedisScript.class), anyList(), any())).thenReturn(1L); + + RuntimeException thrown = assertThrows(RuntimeException.class, + () -> lockService.executeWithLock(400L, Duration.ofSeconds(5), () -> { + throw new RuntimeException("action failed"); + })); + + assertEquals("action failed", thrown.getMessage()); + // 即使 action 失败,锁也应被释放 + verify(stringRedisTemplate).execute( + any(DefaultRedisScript.class), + eq(List.of("iot:alarm:lock:400")), + anyString() + ); + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/service/alarm/IotAlarmRecordServiceImplTest.java b/viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/service/alarm/IotAlarmRecordServiceImplTest.java index db79c699..f4db8bce 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/service/alarm/IotAlarmRecordServiceImplTest.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/test/java/com/viewsh/module/iot/service/alarm/IotAlarmRecordServiceImplTest.java @@ -18,6 +18,9 @@ import org.mockito.Mock; import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; +import java.time.Duration; +import java.util.function.Supplier; + import static com.viewsh.module.iot.enums.ErrorCodeConstants.*; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; @@ -25,9 +28,12 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; /** - * {@link IotAlarmRecordServiceImpl} 单元测试(覆盖任务卡 B12 §6.1 的 10 个用例)。 + * {@link IotAlarmRecordServiceImpl} 单元测试(覆盖任务卡 B12 §6.1 + B14 集成路径)。 * - * @author B12 + *

B14 新增依赖({@link IotAlarmLockService}/{@link IotAlarmCacheService}/{@link AlarmStateValidator}) + * 均通过 Mockito @Mock 注入,锁默认透传执行(执行 Supplier),不模拟冲突(冲突用例在专项测试中)。 + * + * @author B12 / B14 */ @ExtendWith(MockitoExtension.class) class IotAlarmRecordServiceImplTest { @@ -38,6 +44,15 @@ class IotAlarmRecordServiceImplTest { @Mock private IotAlarmRecordMapper alarmRecordMapper; + @Mock + private IotAlarmLockService lockService; + + @Mock + private IotAlarmCacheService cacheService; + + @Mock + private AlarmStateValidator validator; + private MockedStatic tenantMock; private static final Long TENANT_ID = 1L; @@ -45,9 +60,19 @@ class IotAlarmRecordServiceImplTest { private static final Long CONFIG_ID = 200L; @BeforeEach + @SuppressWarnings("unchecked") void setUp() { tenantMock = mockStatic(TenantContextHolder.class); tenantMock.when(TenantContextHolder::getTenantId).thenReturn(TENANT_ID); + + // 默认:lockService 直接执行 Supplier(无锁冲突)— lenient 避免部分测试不触发锁时报 UnnecessaryStubbing + lenient().doAnswer(inv -> { + Supplier supplier = inv.getArgument(2); + return supplier.get(); + }).when(lockService).executeWithLock(any(), any(Duration.class), any(Supplier.class)); + + // 默认:validator 允许所有操作 — lenient 避免首次触发路径不调用 validator 时报 UnnecessaryStubbing + lenient().when(validator.isEffectiveTrigger(any(), anyLong())).thenReturn(true); } @AfterEach @@ -91,6 +116,8 @@ class IotAlarmRecordServiceImplTest { .build(); when(alarmRecordMapper.selectActiveByDeviceAndConfig(DEVICE_ID, CONFIG_ID, TENANT_ID)) .thenReturn(existing); + // 锁内再查最新 DO + when(alarmRecordMapper.selectById(555L)).thenReturn(existing); Long id = alarmService.triggerAlarm(buildTriggerReq(3)); @@ -260,8 +287,9 @@ class IotAlarmRecordServiceImplTest { alarmService.ackAlarm(AlarmStateTransitionRequest.builder().alarmId(17L).build()); - // 已确认 → no-op + // 已确认 → no-op,不走锁 verify(alarmRecordMapper, never()).updateById(any(IotAlarmRecordDO.class)); + verify(lockService, never()).executeWithLock(any(), any(), any()); } @Test @@ -273,6 +301,64 @@ class IotAlarmRecordServiceImplTest { assertEquals(ALARM_RECORD_NOT_EXISTS.getCode(), ex.getCode()); } + // ==================== B14 新增:旧消息忽略测试 ==================== + + /** + * testTrigger_oldMessage:消息时间戳早于 clear_time,validator 返回 false → 不更新状态 + */ + @Test + @SuppressWarnings("unchecked") + void testTrigger_oldMessage() { + IotAlarmRecordDO existing = IotAlarmRecordDO.builder() + .id(888L).deviceId(DEVICE_ID).alarmConfigId(CONFIG_ID) + .ackState(0).clearState(1).archived(0).triggerCount(5) + .build(); + when(alarmRecordMapper.selectActiveByDeviceAndConfig(DEVICE_ID, CONFIG_ID, TENANT_ID)) + .thenReturn(existing); + + // validator 判断为旧消息 + when(validator.isEffectiveTrigger(any(), anyLong())).thenReturn(false); + + Long id = alarmService.triggerAlarm(buildTriggerReq(3)); + + assertEquals(888L, id); + // 旧消息不触发 DB 写 + verify(alarmRecordMapper, never()).updateById(any(IotAlarmRecordDO.class)); + } + + /** + * testTrigger_cacheUpdate:trigger 成功后缓存被更新 + */ + @Test + void testTrigger_cacheUpdate() { + IotAlarmRecordDO existing = IotAlarmRecordDO.builder() + .id(777L).deviceId(DEVICE_ID).alarmConfigId(CONFIG_ID) + .ackState(0).clearState(0).archived(0).triggerCount(1) + .build(); + when(alarmRecordMapper.selectActiveByDeviceAndConfig(DEVICE_ID, CONFIG_ID, TENANT_ID)) + .thenReturn(existing); + when(alarmRecordMapper.selectById(777L)).thenReturn(existing); + when(validator.isEffectiveTrigger(any(), anyLong())).thenReturn(true); + + alarmService.triggerAlarm(buildTriggerReq(3)); + + // 缓存应被更新 + verify(cacheService, atLeastOnce()).updateState(eq(777L), any(IotAlarmRecordDO.class)); + } + + /** + * testArchive_cacheEvict:归档后缓存被驱逐 + */ + @Test + void testArchive_cacheEvict() { + IotAlarmRecordDO active = activeAlarm(666L); + when(alarmRecordMapper.selectById(666L)).thenReturn(active); + + alarmService.archiveAlarm(AlarmStateTransitionRequest.builder().alarmId(666L).build()); + + verify(cacheService).evict(666L); + } + // ==================== 辅助 ==================== private AlarmTriggerRequest buildTriggerReq(int severity) {