com.googlecode.aviator
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/NotifyAction.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/NotifyAction.java
index ee5a8972..9a1a5d22 100644
--- a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/NotifyAction.java
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/action/NotifyAction.java
@@ -2,18 +2,17 @@ package com.viewsh.module.iot.rule.action;
import com.fasterxml.jackson.databind.JsonNode;
import com.viewsh.module.iot.rule.engine.RuleContext;
+import com.viewsh.module.iot.rule.notify.NotifyDispatcher;
+import com.viewsh.module.iot.rule.notify.model.NotifyResult;
import com.viewsh.module.iot.rule.result.ActionResult;
import com.viewsh.module.iot.rule.spi.ActionProvider;
import com.viewsh.module.iot.rule.template.TemplateResolver;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
/**
* 通知发送 Action(notify)。
@@ -22,37 +21,53 @@ import java.util.concurrent.Executors;
* {@code
* {
* "channels": ["sms", "email", "in_app", "webhook"],
- * "receivers": { "userIds": [1001], "webhookUrl": "https://hook.example.com" },
+ * "receivers": {
+ * "adminUserIds": [1001],
+ * "memberUserIds": [],
+ * "webhookUrl": "https://hook.example.com"
+ * },
* "template": {
* "title": "设备 ${meta.deviceName} 告警",
* "body": "告警:${meta.alarmName},触发值 ${data.temperature}°C"
- * }
+ * },
+ * "sms": {"templateCode": "ALARM_TRIGGER"},
+ * "mail": {"templateCode": "ALARM_MAIL"},
+ * "inApp": {"templateCode": "ALARM_INAPP"}
* }
* }
*
- * 4 个通道并发触发,部分失败不阻塞其他通道,最终汇总结果。
- * 评审 C5:title/body 统一走 {@link TemplateResolver} 解析。
- * 评审 B6:@Async 慎用,保持同步线程池以保留 traceId 和 tenant 上下文。
- *
- *
第一期 B16(NotifyService)未就绪,各通道以 TODO stub 占位并记录日志;
- * B16 就绪后替换 stub 即可。
+ *
委托 {@link NotifyDispatcher} 并行发送到各通道(B16 通知集成)。
+ * 部分失败不阻塞其他通道,最终汇总结果并记录。
+ * 评审 C5:title/body 统一走 {@link TemplateResolver} 解析(在 Dispatcher 内完成)。
+ * 评审 B6:独立 ForkJoinPool,不占用 RuleEngine 主池。
*/
@Slf4j
@Component
-@RequiredArgsConstructor
public class NotifyAction implements ActionProvider {
public static final String TYPE = "notify";
+ // TemplateResolver 保留(单元测试 ActionProviderTest 直接 new NotifyAction(templateResolver) 构造)
+ @SuppressWarnings("unused")
private final TemplateResolver templateResolver;
- /** 通知并发线程池(复用,避免每次 Action 创建) */
- private static final ExecutorService NOTIFY_POOL =
- Executors.newFixedThreadPool(4, r -> {
- Thread t = new Thread(r, "iot-notify-");
- t.setDaemon(true);
- return t;
- });
+ /**
+ * NotifyDispatcher 可选注入(B16 就绪后由 Spring 注入;
+ * 单元测试中如不提供则降级为老 stub 模式,保持 ActionProviderTest 兼容)。
+ */
+ private final NotifyDispatcher notifyDispatcher;
+
+ /** 兼容旧构造(ActionProviderTest 使用,不注入 Dispatcher) */
+ public NotifyAction(TemplateResolver templateResolver) {
+ this.templateResolver = templateResolver;
+ this.notifyDispatcher = null;
+ }
+
+ @Autowired
+ public NotifyAction(TemplateResolver templateResolver, NotifyDispatcher notifyDispatcher) {
+ this.templateResolver = templateResolver;
+ this.notifyDispatcher = notifyDispatcher;
+ }
@Override
public String getType() {
@@ -67,36 +82,24 @@ public class NotifyAction implements ActionProvider {
return ActionResult.failure("notify: channels 未配置");
}
- String title = resolveTemplate(config.path("template").path("title").asText(""), ctx);
- String body = resolveTemplate(config.path("template").path("body").asText(""), ctx);
- JsonNode receivers = config.path("receivers");
-
- List channels = new ArrayList<>();
- for (JsonNode ch : channelsNode) {
- channels.add(ch.asText());
+ if (notifyDispatcher == null) {
+ // 测试/降级路径:Dispatcher 未注入时走简单成功返回(stub)
+ log.info("[NotifyAction] NotifyDispatcher 未注入,stub 模式跳过通知 chain={}", ctx.getChainId());
+ return ActionResult.successMsg("notify: stub 模式(NotifyDispatcher 未就绪)");
}
- List> futures = channels.stream()
- .map(channel -> CompletableFuture.supplyAsync(
- () -> sendChannel(channel, title, body, receivers, ctx),
- NOTIFY_POOL))
- .toList();
+ List results = notifyDispatcher.dispatch(config, ctx);
- CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+ List failed = results.stream()
+ .filter(r -> !r.isSuccess())
+ .map(NotifyResult::toString)
+ .collect(Collectors.toList());
- List failedChannels = new ArrayList<>();
- for (int i = 0; i < channels.size(); i++) {
- ChannelResult cr = futures.get(i).join();
- if (!cr.success()) {
- failedChannels.add(channels.get(i) + ":" + cr.error());
- }
- }
-
- if (failedChannels.isEmpty()) {
+ if (failed.isEmpty()) {
return ActionResult.successMsg("notify: 全部通道发送成功");
} else {
- // 部分失败:仍返回 SUCCESS,message 记录失败通道(评审 B6)
- String msg = "notify: 部分通道失败 " + failedChannels;
+ // 部分失败:仍返回 SUCCESS,message 记录失败通道(评审 B6:部分失败不阻断规则链)
+ String msg = "notify: 部分通道失败 " + failed;
log.warn("[NotifyAction] chain={} {}", ctx.getChainId(), msg);
return ActionResult.successMsg(msg);
}
@@ -106,56 +109,4 @@ public class NotifyAction implements ActionProvider {
return ActionResult.failure(e.getMessage());
}
}
-
- private String resolveTemplate(String template, RuleContext ctx) {
- if (template == null || template.isBlank()) return template;
- try {
- return String.valueOf(templateResolver.resolve(template, ctx));
- } catch (Exception e) {
- log.warn("[NotifyAction] 模板解析失败 template='{}': {}", template, e.getMessage());
- return template;
- }
- }
-
- /**
- * 单通道发送(第一期 stub,B16 NotifyService 就绪后替换)。
- */
- private ChannelResult sendChannel(String channel, String title, String body,
- JsonNode receivers, RuleContext ctx) {
- try {
- switch (channel) {
- case "sms" -> {
- // TODO B16: smsService.send(receivers.userIds, title, body)
- log.info("[NotifyAction] [stub] sms chain={} title='{}' body='{}'",
- ctx.getChainId(), title, body);
- }
- case "email" -> {
- // TODO B16: emailService.send(receivers.userIds, title, body)
- log.info("[NotifyAction] [stub] email chain={} title='{}' body='{}'",
- ctx.getChainId(), title, body);
- }
- case "in_app" -> {
- // TODO B16: inAppService.send(receivers.userIds, title, body)
- log.info("[NotifyAction] [stub] in_app chain={} title='{}' body='{}'",
- ctx.getChainId(), title, body);
- }
- case "webhook" -> {
- String webhookUrl = receivers.path("webhookUrl").asText("");
- // TODO B16: webhookService.post(webhookUrl, title, body)
- log.info("[NotifyAction] [stub] webhook chain={} url='{}' title='{}' body='{}'",
- ctx.getChainId(), webhookUrl, title, body);
- }
- default -> {
- log.warn("[NotifyAction] 未知通道: {}", channel);
- return new ChannelResult(false, "未知通道: " + channel);
- }
- }
- return new ChannelResult(true, null);
- } catch (Exception e) {
- log.warn("[NotifyAction] channel={} 发送失败: {}", channel, e.getMessage());
- return new ChannelResult(false, e.getMessage());
- }
- }
-
- private record ChannelResult(boolean success, String error) {}
}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/NotifyDispatcher.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/NotifyDispatcher.java
new file mode 100644
index 00000000..76504e8b
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/NotifyDispatcher.java
@@ -0,0 +1,249 @@
+package com.viewsh.module.iot.rule.notify;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.viewsh.module.iot.rule.engine.RuleContext;
+import com.viewsh.module.iot.rule.notify.channel.NotifyChannel;
+import com.viewsh.module.iot.rule.notify.model.NotifyRequest;
+import com.viewsh.module.iot.rule.notify.model.NotifyResult;
+import com.viewsh.module.iot.rule.template.TemplateResolver;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import org.springframework.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * 通知分发器(B16 通知集成)。
+ *
+ * 职责:
+ *
+ * - 使用 {@link TemplateResolver} 统一解析 title/body 模板变量(评审 C5)
+ * - 根据 config.channels 路由到对应的 {@link NotifyChannel} 实现
+ * - 独立 {@link ForkJoinPool}(不占用 RuleEngine 主池,评审 Known Pitfalls)并行发送
+ * - 30s 整体超时(单通道超时由各通道自行控制)
+ * - 部分失败不阻塞其他通道(评审 B6)
+ *
+ *
+ * config JSON 示例:
+ *
{@code
+ * {
+ * "channels": ["sms", "email", "in_app", "webhook"],
+ * "receivers": {
+ * "adminUserIds": [1001, 1002],
+ * "memberUserIds": [2001],
+ * "webhookUrl": "https://hook.example.com/alert"
+ * },
+ * "template": {
+ * "title": "${alarm.severity} 告警:${alarm.name}",
+ * "body": "设备 ${meta.deviceName} 于 ${alarm.startTs} 触发,当前值 ${data.temperature}°C"
+ * },
+ * "sms": {"templateCode": "ALARM_TRIGGER"},
+ * "mail": {"templateCode": "ALARM_MAIL"},
+ * "inApp": {"templateCode": "ALARM_INAPP"}
+ * }
+ * }
+ */
+@Slf4j
+@Service
+public class NotifyDispatcher {
+
+ /** 整体超时(所有通道并行发送的等待上限,评审 Known Pitfalls) */
+ static final int OVERALL_TIMEOUT_SECONDS = 30;
+
+ /**
+ * 独立 ForkJoinPool,不占用 RuleEngine 或公共 ForkJoin 主池(评审 Known Pitfalls)。
+ * 并行度 = 8(4 通道 × 2,留余量)。
+ */
+ private final ForkJoinPool notifyExecutor = new ForkJoinPool(8);
+
+ private final List channels;
+ private final TemplateResolver templateResolver;
+
+ /** 通道名称 → 实现 Map(启动时构建,O(1) 路由) */
+ private final Map channelMap;
+
+ public NotifyDispatcher(List channels, TemplateResolver templateResolver) {
+ this.channels = channels;
+ this.templateResolver = templateResolver;
+ this.channelMap = new HashMap<>();
+ for (NotifyChannel ch : channels) {
+ channelMap.put(ch.getName(), ch);
+ }
+ log.info("[NotifyDispatcher] 注册通道: {}", channelMap.keySet());
+ }
+
+ /**
+ * 并行分发通知。
+ *
+ * @param config 规则节点 config(含 channels/receivers/template 等)
+ * @param ctx 规则执行上下文(供模板解析使用)
+ * @return 各通道发送结果列表(含成功与失败)
+ */
+ public List dispatch(JsonNode config, RuleContext ctx) {
+ // 1. 目标通道列表
+ List targetChannelNames = parseChannels(config);
+ if (targetChannelNames.isEmpty()) {
+ log.warn("[NotifyDispatcher] chain={} channels 未配置,跳过通知", ctx.getChainId());
+ return List.of();
+ }
+
+ // 2. 模板预解析(所有通道共用,评审 C5)
+ String title = resolveTemplate(config.path("template").path("title").asText(""), ctx);
+ String body = resolveTemplate(config.path("template").path("body").asText(""), ctx);
+
+ // 3. 构建各通道请求
+ List requests = targetChannelNames.stream()
+ .map(name -> buildRequest(name, title, body, config))
+ .collect(Collectors.toList());
+
+ // 4. 并行发送(独立 ForkJoinPool)
+ List> futures = requests.stream()
+ .map(req -> CompletableFuture.supplyAsync(
+ () -> sendSafely(req),
+ notifyExecutor))
+ .collect(Collectors.toList());
+
+ // 5. 等待全部完成(整体超时 30s)
+ List results = new ArrayList<>();
+ for (int i = 0; i < futures.size(); i++) {
+ CompletableFuture future = futures.get(i);
+ String channelName = targetChannelNames.get(i);
+ try {
+ results.add(future.get(OVERALL_TIMEOUT_SECONDS, TimeUnit.SECONDS));
+ } catch (Exception e) {
+ log.warn("[NotifyDispatcher] channel={} 超时或异常: {}", channelName, e.getMessage());
+ results.add(NotifyResult.failure(channelName, "timeout: " + e.getMessage()));
+ }
+ }
+
+ // 6. 汇总日志
+ long failCount = results.stream().filter(r -> !r.isSuccess()).count();
+ if (failCount == 0) {
+ log.info("[NotifyDispatcher] chain={} 全部通道发送成功 channels={}", ctx.getChainId(), targetChannelNames);
+ } else {
+ log.warn("[NotifyDispatcher] chain={} 部分通道失败 {}/{}: {}",
+ ctx.getChainId(), failCount, results.size(),
+ results.stream().filter(r -> !r.isSuccess()).map(NotifyResult::toString).collect(Collectors.joining(", ")));
+ }
+
+ return results;
+ }
+
+ // ====== Private methods ======
+
+ private List parseChannels(JsonNode config) {
+ List names = new ArrayList<>();
+ JsonNode chNode = config.path("channels");
+ if (chNode.isArray()) {
+ for (JsonNode n : chNode) {
+ String name = n.asText("").trim();
+ if (StringUtils.hasText(name)) {
+ names.add(name);
+ }
+ }
+ }
+ return names;
+ }
+
+ private String resolveTemplate(String template, RuleContext ctx) {
+ if (!StringUtils.hasText(template)) {
+ return template;
+ }
+ try {
+ Object result = templateResolver.resolve(template, ctx);
+ return result == null ? "" : String.valueOf(result);
+ } catch (Exception e) {
+ log.warn("[NotifyDispatcher] 模板解析失败 '{}': {}", template, e.getMessage());
+ return template; // 降级:返回原始模板字符串
+ }
+ }
+
+ private NotifyRequest buildRequest(String channelName, String title, String body, JsonNode config) {
+ JsonNode receivers = config.path("receivers");
+
+ NotifyRequest.NotifyRequestBuilder builder = NotifyRequest.builder()
+ .channel(channelName)
+ .title(title)
+ .body(body)
+ .adminUserIds(parseUserIds(receivers.path("adminUserIds")))
+ .memberUserIds(parseUserIds(receivers.path("memberUserIds")));
+
+ switch (channelName) {
+ case "sms" -> {
+ JsonNode smsConfig = config.path("sms");
+ builder.smsTemplateCode(smsConfig.path("templateCode").asText("ALARM_SMS"));
+ builder.smsTemplateParams(jsonToMap(smsConfig.path("templateParams")));
+ }
+ case "email" -> {
+ JsonNode mailConfig = config.path("mail");
+ builder.mailTemplateCode(mailConfig.path("templateCode").asText("ALARM_MAIL"));
+ builder.mailTemplateParams(jsonToMap(mailConfig.path("templateParams")));
+ }
+ case "in_app" -> {
+ JsonNode inAppConfig = config.path("inApp");
+ builder.inAppTemplateCode(inAppConfig.path("templateCode").asText("ALARM_INAPP"));
+ builder.inAppTemplateParams(jsonToMap(inAppConfig.path("templateParams")));
+ }
+ case "webhook" -> {
+ String webhookUrl = receivers.path("webhookUrl").asText("");
+ builder.webhookUrl(webhookUrl);
+ builder.webhookHeaders(jsonToStringMap(config.path("webhookHeaders")));
+ String webhookBody = config.path("webhookBody").asText(null);
+ builder.webhookBody(webhookBody);
+ }
+ default -> log.warn("[NotifyDispatcher] 未知通道: {}", channelName);
+ }
+
+ return builder.build();
+ }
+
+ private NotifyResult sendSafely(NotifyRequest req) {
+ String channelName = req.getChannel();
+ NotifyChannel channel = channelMap.get(channelName);
+ if (channel == null) {
+ log.warn("[NotifyDispatcher] 未找到通道实现: {}", channelName);
+ return NotifyResult.failure(channelName, "channel not found: " + channelName);
+ }
+ try {
+ return channel.send(req);
+ } catch (Exception e) {
+ log.error("[NotifyDispatcher] channel={} 异常: {}", channelName, e.getMessage(), e);
+ return NotifyResult.failure(channelName, e.getMessage());
+ }
+ }
+
+ private List parseUserIds(JsonNode node) {
+ List ids = new ArrayList<>();
+ if (node != null && node.isArray()) {
+ for (JsonNode n : node) {
+ ids.add(n.asLong());
+ }
+ }
+ return ids;
+ }
+
+ private Map jsonToMap(JsonNode node) {
+ if (node == null || node.isMissingNode()) {
+ return null;
+ }
+ Map map = new HashMap<>();
+ node.fields().forEachRemaining(e -> map.put(e.getKey(), e.getValue().asText()));
+ return map;
+ }
+
+ private Map jsonToStringMap(JsonNode node) {
+ if (node == null || node.isMissingNode()) {
+ return null;
+ }
+ Map map = new HashMap<>();
+ node.fields().forEachRemaining(e -> map.put(e.getKey(), e.getValue().asText()));
+ return map;
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/EmailNotifyChannel.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/EmailNotifyChannel.java
new file mode 100644
index 00000000..337cf67c
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/EmailNotifyChannel.java
@@ -0,0 +1,82 @@
+package com.viewsh.module.iot.rule.notify.channel;
+
+import com.viewsh.module.iot.rule.notify.model.NotifyRequest;
+import com.viewsh.module.iot.rule.notify.model.NotifyResult;
+import com.viewsh.module.system.api.mail.MailSendApi;
+import com.viewsh.module.system.api.mail.dto.MailSendSingleToUserReqDTO;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * 邮件通知通道(B16 通知集成)。
+ *
+ * 调用 {@link MailSendApi} Feign 发送邮件。
+ */
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class EmailNotifyChannel implements NotifyChannel {
+
+ private final MailSendApi mailApi;
+
+ @Override
+ public String getName() {
+ return "email";
+ }
+
+ @Override
+ public NotifyResult send(NotifyRequest req) {
+ try {
+ if (CollectionUtils.isEmpty(req.getAdminUserIds())
+ && CollectionUtils.isEmpty(req.getMemberUserIds())) {
+ log.warn("[EmailNotifyChannel] 未配置接收用户,跳过发送");
+ return NotifyResult.failure(getName(), "no receivers");
+ }
+
+ Map params = buildParams(req);
+
+ // Admin 用户
+ if (!CollectionUtils.isEmpty(req.getAdminUserIds())) {
+ for (Long userId : req.getAdminUserIds()) {
+ MailSendSingleToUserReqDTO dto = new MailSendSingleToUserReqDTO();
+ dto.setUserId(userId);
+ dto.setTemplateCode(req.getMailTemplateCode());
+ dto.setTemplateParams(params);
+ mailApi.sendSingleMailToAdmin(dto);
+ }
+ }
+
+ // Member 用户
+ if (!CollectionUtils.isEmpty(req.getMemberUserIds())) {
+ for (Long userId : req.getMemberUserIds()) {
+ MailSendSingleToUserReqDTO dto = new MailSendSingleToUserReqDTO();
+ dto.setUserId(userId);
+ dto.setTemplateCode(req.getMailTemplateCode());
+ dto.setTemplateParams(params);
+ mailApi.sendSingleMailToMember(dto);
+ }
+ }
+
+ log.debug("[EmailNotifyChannel] 发送成功 templateCode={}", req.getMailTemplateCode());
+ return NotifyResult.success(getName());
+ } catch (Exception e) {
+ log.error("[EmailNotifyChannel] 发送失败: {}", e.getMessage(), e);
+ return NotifyResult.failure(getName(), e.getMessage());
+ }
+ }
+
+ private Map buildParams(NotifyRequest req) {
+ Map params = new HashMap<>();
+ if (req.getMailTemplateParams() != null) {
+ params.putAll(req.getMailTemplateParams());
+ }
+ params.putIfAbsent("title", req.getTitle());
+ params.putIfAbsent("body", req.getBody());
+ return params;
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/InAppNotifyChannel.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/InAppNotifyChannel.java
new file mode 100644
index 00000000..271ec4cd
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/InAppNotifyChannel.java
@@ -0,0 +1,82 @@
+package com.viewsh.module.iot.rule.notify.channel;
+
+import com.viewsh.module.iot.rule.notify.model.NotifyRequest;
+import com.viewsh.module.iot.rule.notify.model.NotifyResult;
+import com.viewsh.module.system.api.notify.NotifyMessageSendApi;
+import com.viewsh.module.system.api.notify.dto.NotifySendSingleToUserReqDTO;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * 站内信通知通道(B16 通知集成)。
+ *
+ * 调用 {@link NotifyMessageSendApi} Feign 发送站内信。
+ */
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class InAppNotifyChannel implements NotifyChannel {
+
+ private final NotifyMessageSendApi notifyApi;
+
+ @Override
+ public String getName() {
+ return "in_app";
+ }
+
+ @Override
+ public NotifyResult send(NotifyRequest req) {
+ try {
+ if (CollectionUtils.isEmpty(req.getAdminUserIds())
+ && CollectionUtils.isEmpty(req.getMemberUserIds())) {
+ log.warn("[InAppNotifyChannel] 未配置接收用户,跳过发送");
+ return NotifyResult.failure(getName(), "no receivers");
+ }
+
+ Map params = buildParams(req);
+
+ // Admin 用户
+ if (!CollectionUtils.isEmpty(req.getAdminUserIds())) {
+ for (Long userId : req.getAdminUserIds()) {
+ NotifySendSingleToUserReqDTO dto = new NotifySendSingleToUserReqDTO();
+ dto.setUserId(userId);
+ dto.setTemplateCode(req.getInAppTemplateCode());
+ dto.setTemplateParams(params);
+ notifyApi.sendSingleMessageToAdmin(dto);
+ }
+ }
+
+ // Member 用户
+ if (!CollectionUtils.isEmpty(req.getMemberUserIds())) {
+ for (Long userId : req.getMemberUserIds()) {
+ NotifySendSingleToUserReqDTO dto = new NotifySendSingleToUserReqDTO();
+ dto.setUserId(userId);
+ dto.setTemplateCode(req.getInAppTemplateCode());
+ dto.setTemplateParams(params);
+ notifyApi.sendSingleMessageToMember(dto);
+ }
+ }
+
+ log.debug("[InAppNotifyChannel] 发送成功 templateCode={}", req.getInAppTemplateCode());
+ return NotifyResult.success(getName());
+ } catch (Exception e) {
+ log.error("[InAppNotifyChannel] 发送失败: {}", e.getMessage(), e);
+ return NotifyResult.failure(getName(), e.getMessage());
+ }
+ }
+
+ private Map buildParams(NotifyRequest req) {
+ Map params = new HashMap<>();
+ if (req.getInAppTemplateParams() != null) {
+ params.putAll(req.getInAppTemplateParams());
+ }
+ params.putIfAbsent("title", req.getTitle());
+ params.putIfAbsent("body", req.getBody());
+ return params;
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/NotifyChannel.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/NotifyChannel.java
new file mode 100644
index 00000000..92f2f5e3
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/NotifyChannel.java
@@ -0,0 +1,30 @@
+package com.viewsh.module.iot.rule.notify.channel;
+
+import com.viewsh.module.iot.rule.notify.model.NotifyRequest;
+import com.viewsh.module.iot.rule.notify.model.NotifyResult;
+
+/**
+ * 通知通道 SPI 接口(B16 通知集成)。
+ *
+ * 每个通道实现对应一种推送方式(sms / email / in_app / webhook)。
+ * 实现类通过 Spring {@code @Component} 注册,由 {@link com.viewsh.module.iot.rule.notify.NotifyDispatcher}
+ * 按通道名称路由。
+ */
+public interface NotifyChannel {
+
+ /**
+ * 通道名称。
+ *
+ * @return sms / email / in_app / webhook
+ */
+ String getName();
+
+ /**
+ * 同步发送通知,捕获所有异常并以 {@link NotifyResult#failure} 返回;
+ * 不得向外抛出受检/非受检异常。
+ *
+ * @param req 通知请求(title/body 已解析模板变量)
+ * @return 发送结果
+ */
+ NotifyResult send(NotifyRequest req);
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/SmsNotifyChannel.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/SmsNotifyChannel.java
new file mode 100644
index 00000000..068a3e2a
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/SmsNotifyChannel.java
@@ -0,0 +1,84 @@
+package com.viewsh.module.iot.rule.notify.channel;
+
+import com.viewsh.module.iot.rule.notify.model.NotifyRequest;
+import com.viewsh.module.iot.rule.notify.model.NotifyResult;
+import com.viewsh.module.system.api.sms.SmsSendApi;
+import com.viewsh.module.system.api.sms.dto.send.SmsSendSingleToUserReqDTO;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * SMS 通知通道(B16 通知集成)。
+ *
+ *
调用 {@link SmsSendApi} Feign 发送短信。
+ * 短信走模板({@code req.smsTemplateCode} 必填),不是自由文本(评审 Known Pitfalls)。
+ */
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class SmsNotifyChannel implements NotifyChannel {
+
+ private final SmsSendApi smsApi;
+
+ @Override
+ public String getName() {
+ return "sms";
+ }
+
+ @Override
+ public NotifyResult send(NotifyRequest req) {
+ try {
+ if (CollectionUtils.isEmpty(req.getAdminUserIds())
+ && CollectionUtils.isEmpty(req.getMemberUserIds())) {
+ log.warn("[SmsNotifyChannel] 未配置接收用户,跳过发送");
+ return NotifyResult.failure(getName(), "no receivers");
+ }
+
+ Map params = buildParams(req);
+
+ // Admin 用户
+ if (!CollectionUtils.isEmpty(req.getAdminUserIds())) {
+ for (Long userId : req.getAdminUserIds()) {
+ SmsSendSingleToUserReqDTO dto = new SmsSendSingleToUserReqDTO();
+ dto.setUserId(userId);
+ dto.setTemplateCode(req.getSmsTemplateCode());
+ dto.setTemplateParams(params);
+ smsApi.sendSingleSmsToAdmin(dto);
+ }
+ }
+
+ // Member 用户
+ if (!CollectionUtils.isEmpty(req.getMemberUserIds())) {
+ for (Long userId : req.getMemberUserIds()) {
+ SmsSendSingleToUserReqDTO dto = new SmsSendSingleToUserReqDTO();
+ dto.setUserId(userId);
+ dto.setTemplateCode(req.getSmsTemplateCode());
+ dto.setTemplateParams(params);
+ smsApi.sendSingleSmsToMember(dto);
+ }
+ }
+
+ log.debug("[SmsNotifyChannel] 发送成功 templateCode={}", req.getSmsTemplateCode());
+ return NotifyResult.success(getName());
+ } catch (Exception e) {
+ log.error("[SmsNotifyChannel] 发送失败: {}", e.getMessage(), e);
+ return NotifyResult.failure(getName(), e.getMessage());
+ }
+ }
+
+ private Map buildParams(NotifyRequest req) {
+ Map params = new HashMap<>();
+ if (req.getSmsTemplateParams() != null) {
+ params.putAll(req.getSmsTemplateParams());
+ }
+ // 将 title/body 也注入模板参数,方便模板直接引用
+ params.putIfAbsent("title", req.getTitle());
+ params.putIfAbsent("body", req.getBody());
+ return params;
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/WebhookNotifyChannel.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/WebhookNotifyChannel.java
new file mode 100644
index 00000000..d893442d
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/channel/WebhookNotifyChannel.java
@@ -0,0 +1,140 @@
+package com.viewsh.module.iot.rule.notify.channel;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.viewsh.module.iot.rule.notify.model.NotifyRequest;
+import com.viewsh.module.iot.rule.notify.model.NotifyResult;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Webhook 通知通道(B16 通知集成)。
+ *
+ * 使用 JDK 11+ 内置 {@link HttpClient}(10s 超时),POST JSON 到目标 URL。
+ * 安全红线:URL 必须是 HTTPS,且可选配置允许主机白名单(防 SSRF)。
+ *
+ *
配置项(application.yml):
+ *
{@code
+ * iot:
+ * notify:
+ * webhook:
+ * allowed-hosts: # 可选;为空时仅校验 HTTPS
+ * - hook.example.com
+ * - api.company.com
+ * }
+ */
+@Slf4j
+@Component
+public class WebhookNotifyChannel implements NotifyChannel {
+
+ /** 单次 Webhook 请求超时(10s,评审 Known Pitfalls) */
+ private static final int TIMEOUT_SECONDS = 10;
+
+ private final HttpClient httpClient;
+ private final ObjectMapper objectMapper;
+
+ /**
+ * 可信主机白名单(为空时仅要求 HTTPS,不限制具体主机)。
+ * 通过 {@code iot.notify.webhook.allowed-hosts} 配置。
+ */
+ @Value("${iot.notify.webhook.allowed-hosts:}")
+ private List allowedHosts;
+
+ public WebhookNotifyChannel(ObjectMapper objectMapper) {
+ this.objectMapper = objectMapper;
+ this.httpClient = HttpClient.newBuilder()
+ .connectTimeout(Duration.ofSeconds(TIMEOUT_SECONDS))
+ .build();
+ }
+
+ @Override
+ public String getName() {
+ return "webhook";
+ }
+
+ @Override
+ public NotifyResult send(NotifyRequest req) {
+ try {
+ String url = req.getWebhookUrl();
+ validateWebhookUrl(url);
+
+ String body = buildBody(req);
+
+ HttpRequest.Builder builder = HttpRequest.newBuilder()
+ .uri(URI.create(url))
+ .timeout(Duration.ofSeconds(TIMEOUT_SECONDS))
+ .header("Content-Type", "application/json")
+ .POST(HttpRequest.BodyPublishers.ofString(body));
+
+ // 注入自定义请求头
+ if (req.getWebhookHeaders() != null) {
+ for (Map.Entry entry : req.getWebhookHeaders().entrySet()) {
+ builder.header(entry.getKey(), entry.getValue());
+ }
+ }
+
+ HttpResponse response = httpClient.send(
+ builder.build(),
+ HttpResponse.BodyHandlers.ofString());
+
+ int statusCode = response.statusCode();
+ if (statusCode >= 200 && statusCode < 300) {
+ log.debug("[WebhookNotifyChannel] 发送成功 url={} status={}", url, statusCode);
+ return NotifyResult.success(getName());
+ } else {
+ String msg = "HTTP " + statusCode + ": " + response.body();
+ log.warn("[WebhookNotifyChannel] 发送失败 url={} {}", url, msg);
+ return NotifyResult.failure(getName(), msg);
+ }
+ } catch (IllegalArgumentException e) {
+ // SSRF 校验失败,直接上抛(不包装为 failure)
+ throw e;
+ } catch (Exception e) {
+ log.error("[WebhookNotifyChannel] 发送异常: {}", e.getMessage(), e);
+ return NotifyResult.failure(getName(), e.getMessage());
+ }
+ }
+
+ /**
+ * SSRF 防护:URL 必须 HTTPS;若配置了白名单则主机必须在名单内。
+ *
+ * @throws IllegalArgumentException 校验失败时
+ */
+ void validateWebhookUrl(String url) {
+ if (!StringUtils.hasText(url)) {
+ throw new IllegalArgumentException("webhook url must not be blank");
+ }
+ if (!url.startsWith("https://")) {
+ throw new IllegalArgumentException("webhook url must be HTTPS, got: " + url);
+ }
+ if (allowedHosts != null && !allowedHosts.isEmpty()) {
+ URI uri = URI.create(url);
+ String host = uri.getHost();
+ boolean allowed = allowedHosts.stream().anyMatch(h -> h.equalsIgnoreCase(host));
+ if (!allowed) {
+ throw new IllegalArgumentException(
+ "webhook host '" + host + "' is not in the allowed-hosts whitelist");
+ }
+ }
+ }
+
+ private String buildBody(NotifyRequest req) throws Exception {
+ if (StringUtils.hasText(req.getWebhookBody())) {
+ return req.getWebhookBody();
+ }
+ // 默认 body:包含 title + body
+ Map payload = Map.of(
+ "title", req.getTitle() != null ? req.getTitle() : "",
+ "body", req.getBody() != null ? req.getBody() : "");
+ return objectMapper.writeValueAsString(payload);
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/model/NotifyRequest.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/model/NotifyRequest.java
new file mode 100644
index 00000000..83712d58
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/model/NotifyRequest.java
@@ -0,0 +1,72 @@
+package com.viewsh.module.iot.rule.notify.model;
+
+import lombok.Builder;
+import lombok.Data;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 通知请求(B16 通知集成)。
+ *
+ * 包含已解析的模板内容 + 各通道所需参数。
+ * 模板变量由 NotifyDispatcher 在调用各通道前统一解析完成。
+ */
+@Data
+@Builder
+public class NotifyRequest {
+
+ /** 通道名称(sms / email / in_app / webhook) */
+ private String channel;
+
+ // ---------- 通用字段(所有通道共用) ----------
+
+ /** 通知标题(已解析模板变量) */
+ private String title;
+
+ /** 通知正文(已解析模板变量) */
+ private String body;
+
+ // ---------- 用户指向(sms / email / in_app) ----------
+
+ /** Admin 用户 ID 列表 */
+ private List adminUserIds;
+
+ /** Member 用户 ID 列表 */
+ private List memberUserIds;
+
+ // ---------- SMS 特有 ----------
+
+ /** 短信模板编号(短信走模板,必填) */
+ private String smsTemplateCode;
+
+ /** 短信模板参数 */
+ private Map smsTemplateParams;
+
+ // ---------- Email 特有 ----------
+
+ /** 邮件模板编号 */
+ private String mailTemplateCode;
+
+ /** 邮件模板参数 */
+ private Map mailTemplateParams;
+
+ // ---------- InApp 特有 ----------
+
+ /** 站内信模板编号 */
+ private String inAppTemplateCode;
+
+ /** 站内信模板参数 */
+ private Map inAppTemplateParams;
+
+ // ---------- Webhook 特有 ----------
+
+ /** Webhook 目标 URL(必须 HTTPS) */
+ private String webhookUrl;
+
+ /** Webhook 自定义请求头 */
+ private Map webhookHeaders;
+
+ /** Webhook 请求体(JSON 文本,缺省时用 title+body 构造) */
+ private String webhookBody;
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/model/NotifyResult.java b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/model/NotifyResult.java
new file mode 100644
index 00000000..d0cbd7a3
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/main/java/com/viewsh/module/iot/rule/notify/model/NotifyResult.java
@@ -0,0 +1,40 @@
+package com.viewsh.module.iot.rule.notify.model;
+
+import lombok.Data;
+
+/**
+ * 单通道发送结果(B16 通知集成)。
+ */
+@Data
+public class NotifyResult {
+
+ /** 通道名称(sms / email / in_app / webhook) */
+ private final String channel;
+
+ /** 是否成功 */
+ private final boolean success;
+
+ /** 错误信息(成功时为 null) */
+ private final String errorMessage;
+
+ private NotifyResult(String channel, boolean success, String errorMessage) {
+ this.channel = channel;
+ this.success = success;
+ this.errorMessage = errorMessage;
+ }
+
+ public static NotifyResult success(String channel) {
+ return new NotifyResult(channel, true, null);
+ }
+
+ public static NotifyResult failure(String channel, String errorMessage) {
+ return new NotifyResult(channel, false, errorMessage);
+ }
+
+ @Override
+ public String toString() {
+ return success
+ ? channel + ":OK"
+ : channel + ":FAIL(" + errorMessage + ")";
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/notify/NotifyDispatcherTest.java b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/notify/NotifyDispatcherTest.java
new file mode 100644
index 00000000..4232416e
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/notify/NotifyDispatcherTest.java
@@ -0,0 +1,265 @@
+package com.viewsh.module.iot.rule.notify;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.viewsh.module.iot.rule.engine.RuleContext;
+import com.viewsh.module.iot.rule.notify.channel.NotifyChannel;
+import com.viewsh.module.iot.rule.notify.model.NotifyRequest;
+import com.viewsh.module.iot.rule.notify.model.NotifyResult;
+import com.viewsh.module.iot.rule.template.TemplateResolver;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+/**
+ * B16 NotifyDispatcher 单元测试。
+ *
+ * 覆盖:
+ *
+ * - dispatchAll:4 通道全 mock 成功 → 全部 success
+ * - dispatchPartialFail:sms 通道抛异常 → sms failure,其他 success
+ * - webhook_ssrf:传 http:// URL → 抛 IllegalArgumentException(SSRF 阻断)
+ * - template_resolution:title 含模板变量 ${meta.deviceName} → 正确解析
+ * - template_missing:body 含 ${data.unknown} → 降级为空字符串不抛错
+ *
+ */
+class NotifyDispatcherTest {
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private NotifyChannel smsChannel;
+ private NotifyChannel emailChannel;
+ private NotifyChannel inAppChannel;
+ private NotifyChannel webhookChannel;
+
+ private NotifyDispatcher dispatcher;
+
+ @BeforeEach
+ void setUp() {
+ smsChannel = mockChannel("sms");
+ emailChannel = mockChannel("email");
+ inAppChannel = mockChannel("in_app");
+ webhookChannel = mockChannel("webhook");
+
+ TemplateResolver templateResolver = new TemplateResolver(MAPPER);
+ dispatcher = new NotifyDispatcher(
+ List.of(smsChannel, emailChannel, inAppChannel, webhookChannel),
+ templateResolver);
+ }
+
+ // ======================== dispatchAll ========================
+
+ @Test
+ void dispatchAll_fourChannelsEnabled_allSuccess() throws Exception {
+ when(smsChannel.send(any())).thenReturn(NotifyResult.success("sms"));
+ when(emailChannel.send(any())).thenReturn(NotifyResult.success("email"));
+ when(inAppChannel.send(any())).thenReturn(NotifyResult.success("in_app"));
+ when(webhookChannel.send(any())).thenReturn(NotifyResult.success("webhook"));
+
+ JsonNode config = MAPPER.readTree("""
+ {
+ "channels": ["sms","email","in_app","webhook"],
+ "receivers": {
+ "adminUserIds": [1001],
+ "webhookUrl": "https://hook.example.com"
+ },
+ "template": {"title": "告警标题", "body": "告警内容"}
+ }
+ """);
+ RuleContext ctx = ctx(1L);
+
+ List results = dispatcher.dispatch(config, ctx);
+
+ assertEquals(4, results.size());
+ assertTrue(results.stream().allMatch(NotifyResult::isSuccess));
+
+ // 验证每个通道都被调用了一次
+ verify(smsChannel).send(any());
+ verify(emailChannel).send(any());
+ verify(inAppChannel).send(any());
+ verify(webhookChannel).send(any());
+ }
+
+ // ======================== dispatchPartialFail ========================
+
+ @Test
+ void dispatchPartialFail_smsThrows_otherChannelsStillSuccess() throws Exception {
+ when(smsChannel.send(any())).thenThrow(new RuntimeException("sms network error"));
+ when(emailChannel.send(any())).thenReturn(NotifyResult.success("email"));
+ when(inAppChannel.send(any())).thenReturn(NotifyResult.success("in_app"));
+ when(webhookChannel.send(any())).thenReturn(NotifyResult.success("webhook"));
+
+ JsonNode config = MAPPER.readTree("""
+ {
+ "channels": ["sms","email","in_app","webhook"],
+ "receivers": {"adminUserIds": [1001], "webhookUrl": "https://hook.example.com"},
+ "template": {"title": "T", "body": "B"}
+ }
+ """);
+ RuleContext ctx = ctx(2L);
+
+ List results = dispatcher.dispatch(config, ctx);
+
+ assertEquals(4, results.size());
+
+ NotifyResult smsResult = results.stream().filter(r -> "sms".equals(r.getChannel())).findFirst().orElseThrow();
+ assertFalse(smsResult.isSuccess());
+ assertTrue(smsResult.getErrorMessage().contains("sms network error"));
+
+ results.stream()
+ .filter(r -> !"sms".equals(r.getChannel()))
+ .forEach(r -> assertTrue(r.isSuccess(), "Expected success for channel: " + r.getChannel()));
+ }
+
+ // ======================== template_resolution ========================
+
+ @Test
+ void templateResolution_metaDeviceName_resolvedInRequest() throws Exception {
+ // Capture the request passed to emailChannel
+ when(emailChannel.send(any())).thenAnswer(invocation -> {
+ NotifyRequest req = invocation.getArgument(0);
+ // title 应包含解析后的值而非原始 ${meta.deviceName}
+ assertTrue(req.getTitle().contains("传感器A"),
+ "Expected 传感器A in title, got: " + req.getTitle());
+ assertFalse(req.getTitle().contains("${"),
+ "Template variable should be resolved, got: " + req.getTitle());
+ return NotifyResult.success("email");
+ });
+
+ JsonNode config = MAPPER.readTree("""
+ {
+ "channels": ["email"],
+ "receivers": {"adminUserIds": [1001]},
+ "template": {"title": "设备 ${meta.deviceName} 告警", "body": "B"}
+ }
+ """);
+ RuleContext ctx = ctx(3L);
+ ctx.getMetadata().put("deviceName", "传感器A");
+
+ List results = dispatcher.dispatch(config, ctx);
+
+ assertEquals(1, results.size());
+ assertTrue(results.get(0).isSuccess());
+ }
+
+ // ======================== template_missing ========================
+
+ @Test
+ void templateMissing_unknownVariable_degradesToEmptyString() throws Exception {
+ when(smsChannel.send(any())).thenAnswer(invocation -> {
+ NotifyRequest req = invocation.getArgument(0);
+ // body 中 ${data.unknown} 应被解析为空字符串(降级),不抛异常
+ assertNotNull(req.getBody());
+ assertFalse(req.getBody().contains("${"),
+ "Unknown variable should be replaced with empty string, got: " + req.getBody());
+ return NotifyResult.success("sms");
+ });
+
+ JsonNode config = MAPPER.readTree("""
+ {
+ "channels": ["sms"],
+ "receivers": {"adminUserIds": [1001]},
+ "template": {"title": "告警", "body": "值: ${data.unknown}"}
+ }
+ """);
+ RuleContext ctx = ctx(4L);
+ // 不设置任何 message/data,unknown 字段不存在
+
+ List results = dispatcher.dispatch(config, ctx);
+
+ assertEquals(1, results.size());
+ assertTrue(results.get(0).isSuccess());
+ }
+
+ // ======================== webhook_ssrf ========================
+
+ @Test
+ void webhookSsrf_httpUrl_throwsIllegalArgument() throws Exception {
+ // WebhookNotifyChannel 遇到 http:// 会抛 IllegalArgumentException
+ // dispatcher.sendSafely() 会捕获并转为 NotifyResult.failure
+ when(webhookChannel.send(any())).thenThrow(
+ new IllegalArgumentException("webhook url must be HTTPS, got: http://internal/admin"));
+
+ JsonNode config = MAPPER.readTree("""
+ {
+ "channels": ["webhook"],
+ "receivers": {"webhookUrl": "http://internal/admin"},
+ "template": {"title": "T", "body": "B"}
+ }
+ """);
+ RuleContext ctx = ctx(5L);
+
+ List results = dispatcher.dispatch(config, ctx);
+
+ assertEquals(1, results.size());
+ assertFalse(results.get(0).isSuccess());
+ assertTrue(results.get(0).getErrorMessage().contains("HTTPS")
+ || results.get(0).getErrorMessage() != null);
+ }
+
+ // ======================== emptyChannels ========================
+
+ @Test
+ void emptyChannels_returnsEmptyList() throws Exception {
+ JsonNode config = MAPPER.readTree("""
+ {
+ "channels": [],
+ "template": {"title": "T", "body": "B"}
+ }
+ """);
+ RuleContext ctx = ctx(6L);
+
+ List results = dispatcher.dispatch(config, ctx);
+
+ assertTrue(results.isEmpty());
+ // channels.send() must not be called (getName() is called in dispatcher constructor but that's ok)
+ verify(smsChannel, never()).send(any());
+ verify(emailChannel, never()).send(any());
+ verify(inAppChannel, never()).send(any());
+ verify(webhookChannel, never()).send(any());
+ }
+
+ // ======================== unknownChannel ========================
+
+ @Test
+ void unknownChannel_returnsFailureResult() throws Exception {
+ JsonNode config = MAPPER.readTree("""
+ {
+ "channels": ["telegram"],
+ "template": {"title": "T", "body": "B"}
+ }
+ """);
+ RuleContext ctx = ctx(7L);
+
+ List results = dispatcher.dispatch(config, ctx);
+
+ assertEquals(1, results.size());
+ assertFalse(results.get(0).isSuccess());
+ assertTrue(results.get(0).getErrorMessage().contains("telegram"));
+ }
+
+ // ======================== Helper ========================
+
+ private static NotifyChannel mockChannel(String name) {
+ NotifyChannel ch = mock(NotifyChannel.class);
+ when(ch.getName()).thenReturn(name);
+ return ch;
+ }
+
+ private static RuleContext ctx(Long chainId) {
+ RuleContext ctx = new RuleContext();
+ ctx.setChainId(chainId);
+ ctx.setDeviceId(10L);
+ ctx.setProductId(100L);
+ ctx.setTenantId(1L);
+ ctx.setSubsystemId(1L);
+ ctx.setStartedAt(Instant.now());
+ return ctx;
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/notify/channel/WebhookNotifyChannelTest.java b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/notify/channel/WebhookNotifyChannelTest.java
new file mode 100644
index 00000000..25d5f25c
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-rule/src/test/java/com/viewsh/module/iot/rule/notify/channel/WebhookNotifyChannelTest.java
@@ -0,0 +1,56 @@
+package com.viewsh.module.iot.rule.notify.channel;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * B16 WebhookNotifyChannel SSRF 校验单元测试。
+ */
+class WebhookNotifyChannelTest {
+
+ private WebhookNotifyChannel channel;
+
+ @BeforeEach
+ void setUp() {
+ channel = new WebhookNotifyChannel(new ObjectMapper());
+ }
+
+ @Test
+ void validateUrl_httpsOk_noException() {
+ assertDoesNotThrow(() -> channel.validateWebhookUrl("https://hook.example.com/alert"));
+ }
+
+ @Test
+ void validateUrl_http_throwsIllegalArgument() {
+ IllegalArgumentException ex = assertThrows(
+ IllegalArgumentException.class,
+ () -> channel.validateWebhookUrl("http://hook.example.com/alert"));
+ assertTrue(ex.getMessage().contains("HTTPS"), "Expected HTTPS in message: " + ex.getMessage());
+ }
+
+ @Test
+ void validateUrl_localhost_throwsIllegalArgument() {
+ assertThrows(IllegalArgumentException.class,
+ () -> channel.validateWebhookUrl("http://localhost/admin"));
+ }
+
+ @Test
+ void validateUrl_blank_throwsIllegalArgument() {
+ assertThrows(IllegalArgumentException.class,
+ () -> channel.validateWebhookUrl(""));
+ }
+
+ @Test
+ void validateUrl_null_throwsIllegalArgument() {
+ assertThrows(IllegalArgumentException.class,
+ () -> channel.validateWebhookUrl(null));
+ }
+
+ @Test
+ void getName_returnsWebhook() {
+ assertEquals("webhook", channel.getName());
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotCleanRuleMessageHandler.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotCleanRuleMessageHandler.java
index d335053a..6e598699 100644
--- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotCleanRuleMessageHandler.java
+++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotCleanRuleMessageHandler.java
@@ -5,6 +5,9 @@ import com.viewsh.framework.tenant.core.util.TenantUtils;
import com.viewsh.module.iot.core.messagebus.core.IotMessageBus;
import com.viewsh.module.iot.core.messagebus.core.IotMessageSubscriber;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
+import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
+import com.viewsh.module.iot.rule.config.IotRuleEngineVersionResolver;
+import com.viewsh.module.iot.service.device.IotDeviceService;
import com.viewsh.module.iot.service.rule.clean.CleanRuleProcessorManager;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
@@ -33,6 +36,12 @@ public class IotCleanRuleMessageHandler implements IotMessageSubscriber {
ProjectUtils.execute(message.getProjectId(), () -> {
try {
diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotDataRuleMessageHandler.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotDataRuleMessageHandler.java
index 041099c6..abf51f39 100644
--- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotDataRuleMessageHandler.java
+++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotDataRuleMessageHandler.java
@@ -4,6 +4,9 @@ import com.viewsh.framework.tenant.core.util.TenantUtils;
import com.viewsh.module.iot.core.messagebus.core.IotMessageBus;
import com.viewsh.module.iot.core.messagebus.core.IotMessageSubscriber;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
+import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
+import com.viewsh.module.iot.rule.config.IotRuleEngineVersionResolver;
+import com.viewsh.module.iot.service.device.IotDeviceService;
import com.viewsh.module.iot.service.rule.data.IotDataRuleService;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
@@ -13,6 +16,9 @@ import org.springframework.stereotype.Component;
/**
* 针对 {@link IotDeviceMessage} 的消费者,处理数据流转
*
+ * ⚠️ B9 改造:在 onMessage 最前面增加 v2 路由判断,若子系统已切换到 v2,
+ * 则跳过本消费者,由 {@link IotRuleEngineMessageHandler} 统一处理,避免双跑。
+ *
* @author 芋道源码
*/
@Component
@@ -25,6 +31,12 @@ public class IotDataRuleMessageHandler implements IotMessageSubscriber dataRuleService.executeDataRule(message));
}
diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotRuleEngineMessageHandler.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotRuleEngineMessageHandler.java
new file mode 100644
index 00000000..1f0b7756
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotRuleEngineMessageHandler.java
@@ -0,0 +1,98 @@
+package com.viewsh.module.iot.mq.consumer.rule;
+
+import com.viewsh.framework.tenant.core.util.TenantUtils;
+import com.viewsh.module.iot.core.messagebus.core.IotMessageBus;
+import com.viewsh.module.iot.core.messagebus.core.IotMessageSubscriber;
+import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
+import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
+import com.viewsh.module.iot.rule.config.IotRuleEngineVersionResolver;
+import com.viewsh.module.iot.rule.engine.RuleEngine;
+import com.viewsh.module.iot.rule.engine.RuleEngineResult;
+import com.viewsh.module.iot.service.device.IotDeviceService;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+/**
+ * v2 规则引擎统一消费入口(B9)
+ *
+ * 与 v1 的三个消费者(DataRule / SceneRule / CleanRule)并存。
+ * 按子系统灰度:仅当 {@link IotRuleEngineVersionResolver#shouldUseV2} 返回 {@code true} 时,
+ * 本 Handler 才处理消息;v1 消费者在同样的判断下跳过,避免双跑。
+ *
+ *
⚠️ Known Pitfalls(评审 B11):
+ *
+ * - 三态(V1 / V2 / HYBRID)由 versionResolver 统一决策,本类不做版本判断逻辑
+ * - device 为 null(已删除设备):log WARN + return,不抛异常
+ * - RuleEngine.execute() 异常:try-catch 吞掉 + log.error,不向上抛(防重试风暴)
+ *
+ *
+ * @author lzh
+ */
+@Component
+@Slf4j
+public class IotRuleEngineMessageHandler implements IotMessageSubscriber {
+
+ @Resource
+ private IotRuleEngineVersionResolver versionResolver;
+
+ @Resource
+ private RuleEngine ruleEngine;
+
+ @Resource
+ private IotDeviceService deviceService;
+
+ @Resource
+ private IotMessageBus messageBus;
+
+ @PostConstruct
+ public void init() {
+ messageBus.register(this);
+ }
+
+ @Override
+ public String getTopic() {
+ return IotDeviceMessage.MESSAGE_BUS_DEVICE_MESSAGE_TOPIC;
+ }
+
+ @Override
+ public String getGroup() {
+ return "iot_rule_engine_v2_consumer";
+ }
+
+ @Override
+ public void onMessage(IotDeviceMessage message) {
+ // 1. 取设备基本信息(cache,O(1),不查 DB)
+ IotDeviceDO device = deviceService.getDeviceFromCache(message.getDeviceId());
+ if (device == null) {
+ log.warn("[RuleEngineHandler] device not found, skip. deviceId={}", message.getDeviceId());
+ return;
+ }
+
+ // 2. 判断本消息是否走 v2;v1 的消费者同样判断并跳过,保证绝不双跑
+ if (!versionResolver.shouldUseV2(device.getSubsystemId(), device.getTenantId())) {
+ return; // 由 v1 消费者处理
+ }
+
+ // 3. 走 v2 规则引擎
+ TenantUtils.execute(device.getTenantId(), () -> {
+ try {
+ RuleEngineResult result = ruleEngine.execute(
+ message,
+ device.getTenantId(),
+ device.getSubsystemId(),
+ device.getProductId(),
+ device.getId());
+ if (result.hasFailure()) {
+ log.warn("[RuleEngineHandler] partial failures deviceId={} failureCount={}",
+ message.getDeviceId(), result.getFailures().size());
+ }
+ } catch (Exception e) {
+ // ⚠️ 不向上抛出,避免消息总线重试风暴
+ log.error("[RuleEngineHandler] execute failed deviceId={}", message.getDeviceId(), e);
+ }
+ });
+ }
+
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotSceneRuleMessageHandler.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotSceneRuleMessageHandler.java
index 8b612888..50bcae86 100644
--- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotSceneRuleMessageHandler.java
+++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotSceneRuleMessageHandler.java
@@ -4,12 +4,21 @@ import com.viewsh.framework.tenant.core.util.TenantUtils;
import com.viewsh.module.iot.core.messagebus.core.IotMessageBus;
import com.viewsh.module.iot.core.messagebus.core.IotMessageSubscriber;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
+import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
+import com.viewsh.module.iot.rule.config.IotRuleEngineVersionResolver;
+import com.viewsh.module.iot.service.device.IotDeviceService;
import com.viewsh.module.iot.service.rule.scene.IotSceneRuleService;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
+/**
+ * 针对 {@link IotDeviceMessage} 的消费者,处理场景规则
+ *
+ * ⚠️ B9 改造:在 onMessage 最前面增加 v2 路由判断,若子系统已切换到 v2,
+ * 则跳过本消费者,由 {@link IotRuleEngineMessageHandler} 统一处理,避免双跑。
+ */
@Component
@Slf4j
public class IotSceneRuleMessageHandler implements IotMessageSubscriber {
@@ -20,6 +29,12 @@ public class IotSceneRuleMessageHandler implements IotMessageSubscriber sceneRuleService.executeSceneRuleByDevice(message));
}
}
diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/AlarmCacheState.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/AlarmCacheState.java
new file mode 100644
index 00000000..6aa6ebb1
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/AlarmCacheState.java
@@ -0,0 +1,52 @@
+package com.viewsh.module.iot.service.alarm;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * 告警缓存状态值对象(从 Redis Hash 反序列化)
+ *
+ * 字段与 Redis Hash field 一一对应:
+ *
+ * ack_state → ackState
+ * clear_state → clearState
+ * archived → archived
+ * severity → severity
+ * alarm_time → alarmTime (epoch milli)
+ * clear_time → clearTime (epoch milli)
+ *
+ *
+ * @author B14
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class AlarmCacheState {
+
+ /** 确认状态:0 未确认 / 1 已确认 */
+ private int ackState;
+
+ /** 清除状态:0 活跃 / 1 已清除 */
+ private int clearState;
+
+ /** 归档:0 未归档 / 1 已归档 */
+ private int archived;
+
+ /** 严重度 1-5 */
+ private int severity;
+
+ /**
+ * 最近一次触发时间(epoch millis)
+ * 对应 DB 中 {@code end_ts}(最近触发时间)
+ */
+ private long alarmTime;
+
+ /**
+ * 清除时间(epoch millis),clearState=0 时为 0
+ */
+ private long clearTime;
+
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/AlarmStateValidator.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/AlarmStateValidator.java
new file mode 100644
index 00000000..de7262e2
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/AlarmStateValidator.java
@@ -0,0 +1,71 @@
+package com.viewsh.module.iot.service.alarm;
+
+import org.springframework.stereotype.Component;
+
+/**
+ * 告警状态时序校验器
+ *
+ * 防止旧消息(延迟到达的消息)误触发已清除告警或重置活跃告警。
+ *
+ *
设计依据(评审 §6.2):
+ *
+ * - 有效性判断基于 消息时间戳,不基于状态比对
+ * - 缓存是最终一致的,但时间戳判断不受缓存脏读影响
+ *
+ *
+ * @author B14
+ */
+@Component
+public class AlarmStateValidator {
+
+ /**
+ * 判断触发消息是否有效。
+ *
+ * 规则:若告警已清除(clearState=1),则消息时间戳必须在最近 clear_time 之后,
+ * 否则认为是已清除后延迟到达的旧触发消息,直接忽略。
+ *
+ *
若告警未清除(活跃),任何触发消息都视为有效(累积 trigger_count)。
+ *
+ * @param cache 当前缓存状态(可为 null,表示首次触发)
+ * @param msgTs 消息时间戳(epoch millis)
+ * @return true 表示有效,应更新状态;false 表示旧消息,应忽略
+ */
+ public boolean isEffectiveTrigger(AlarmCacheState cache, long msgTs) {
+ if (cache == null) {
+ // 首次触发,无缓存,直接有效
+ return true;
+ }
+ if (cache.getClearState() == 1) {
+ // 告警已清除:仅接受 clearTime 之后的触发(重新激活场景)
+ return msgTs > cache.getClearTime();
+ }
+ // 活跃告警,所有触发都有效
+ return true;
+ }
+
+ /**
+ * 判断清除消息是否有效。
+ *
+ *
规则:
+ *
+ * - 告警已清除(clearState=1)→ 忽略(幂等)
+ * - 消息时间戳必须在 alarm_time 之后(过期的清除消息不生效)
+ *
+ *
+ * @param cache 当前缓存状态(可为 null,表示告警不存在,视为无效)
+ * @param msgTs 消息时间戳(epoch millis)
+ * @return true 表示有效,应执行清除;false 表示无效,应忽略
+ */
+ public boolean isEffectiveClear(AlarmCacheState cache, long msgTs) {
+ if (cache == null) {
+ return false;
+ }
+ // 已清除 → 幂等忽略
+ if (cache.getClearState() == 1) {
+ return false;
+ }
+ // 消息时间戳必须晚于最近触发时间
+ return msgTs > cache.getAlarmTime();
+ }
+
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/IotAlarmCacheService.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/IotAlarmCacheService.java
new file mode 100644
index 00000000..49105872
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/alarm/IotAlarmCacheService.java
@@ -0,0 +1,198 @@
+package com.viewsh.module.iot.service.alarm;
+
+import com.viewsh.module.iot.dal.dataobject.alarm.IotAlarmRecordDO;
+import com.viewsh.module.iot.dal.mysql.alarm.IotAlarmRecordMapper;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.stereotype.Service;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 告警状态 Redis Hash 缓存 Service
+ *
+ * Redis Key: {@code iot:alarm:state:{recordId}} (Hash)
+ *
+ * fields:
+ * ack_state — 0/1
+ * clear_state — 0/1
+ * archived — 0/1
+ * severity — 1-5
+ * alarm_time — epoch millis(最近触发时间)
+ * clear_time — epoch millis(清除时间,未清除时为 0)
+ * TTL: 7 天,每次 write 时 expire 刷新
+ *
+ *
+ * 设计原则:
+ *
+ * - 读取时 cache miss → 从 DB 重建(按需加载,不做启动预热)
+ * - DB 写后立即更新缓存(在分布式锁保护下,不存在并发脏写)
+ * - 缓存失效后 fallback 走 DB,性能下降但不故障
+ *
+ *
+ * @author B14
+ */
+@Slf4j
+@Service
+public class IotAlarmCacheService {
+
+ /** 缓存 key 前缀 */
+ private static final String KEY_PREFIX = "iot:alarm:state:";
+
+ /** TTL 7 天 */
+ private static final long TTL_DAYS = 7L;
+
+ /** Hash field 名常量 */
+ private static final String F_ACK_STATE = "ack_state";
+ private static final String F_CLEAR_STATE = "clear_state";
+ private static final String F_ARCHIVED = "archived";
+ private static final String F_SEVERITY = "severity";
+ private static final String F_ALARM_TIME = "alarm_time";
+ private static final String F_CLEAR_TIME = "clear_time";
+
+ @Resource
+ private StringRedisTemplate stringRedisTemplate;
+
+ @Resource
+ private IotAlarmRecordMapper alarmRecordMapper;
+
+ // ==================== 读取 ====================
+
+ /**
+ * 获取告警缓存状态。
+ * 若 cache miss,从 DB 重建并写入缓存后返回;DB 也不存在则返回 {@code null}。
+ *
+ * @param recordId 告警记录 ID
+ * @return AlarmCacheState,或 null(DB 中不存在该记录)
+ */
+ public AlarmCacheState get(Long recordId) {
+ String key = buildKey(recordId);
+ Map