feat(iot): Wave 5 Round 2 — B9/B14/B16 统一消费入口 + 告警分布式锁 + 通知集成

B9 IotRuleEngineMessageHandler(统一消费入口)
- 新消费者 v2 统一入口,@PostConstruct 注册到 IotMessageBus
- versionResolver.shouldUseV2 三态路由(V1/V2/HYBRID),绝不双跑
- device null 时 WARN + skip;RuleEngine 异常 try-catch 吞掉防重试风暴
- v1 三消费者(DataRule/SceneRule/CleanRule)增加前置 v2 bypass 判断
- 6 个单元测试(global-v1/v2/hybrid 白名单命中/未命中/device-null/引擎异常)

B14 告警缓存 + SET NX PX 分布式锁 + 有效性判断
- IotAlarmLockService:SET NX PX + Lua 原子解锁,锁冲突抛 ALARM_LOCK_CONFLICT
- IotAlarmCacheService:Redis Hash iot:alarm:state:{id},TTL 7天,cache miss 从DB重建
- AlarmStateValidator:isEffectiveTrigger/isEffectiveClear 时序校验,防旧消息重置已清除告警
- IotAlarmRecordServiceImpl:trigger/clear/ack/archive 全部在锁内,DB写后立即同步缓存
- 新增 ALARM_LOCK_CONFLICT 错误码;AlarmTriggerRequest 增加 timestamp 字段
- 17 个单元测试(锁 4 + 缓存 5 + 校验 9 + 集成 3)

B16 NotifyAction 4 通道集成 + 模板解析
- NotifyChannel SPI 接口 + Sms/Email/InApp/Webhook 四实现(@Component 注册)
- WebhookNotifyChannel:JDK 17 HttpClient 10s 超时 + SSRF 强制 HTTPS 校验
- NotifyDispatcher:独立 ForkJoinPool(8) 并行分发,30s 整体超时,部分失败不阻塞
- 模板变量统一走 TemplateResolver(评审 C5),缺失变量降级为空串
- NotifyAction 移除 stub,委托 NotifyDispatcher
- viewsh-module-system-api 依赖引入;13 个测试(Dispatcher 7 + Webhook SSRF 6)

测试:iot-rule 177/177 全绿;iot-server 251/251 全绿(含 Skipped 161 旧 v1 测试)

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
lzh
2026-04-24 11:00:38 +08:00
parent 8e7631987f
commit 171f201384
28 changed files with 2559 additions and 171 deletions

View File

@@ -5,6 +5,9 @@ import com.viewsh.framework.tenant.core.util.TenantUtils;
import com.viewsh.module.iot.core.messagebus.core.IotMessageBus;
import com.viewsh.module.iot.core.messagebus.core.IotMessageSubscriber;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
import com.viewsh.module.iot.rule.config.IotRuleEngineVersionResolver;
import com.viewsh.module.iot.service.device.IotDeviceService;
import com.viewsh.module.iot.service.rule.clean.CleanRuleProcessorManager;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
@@ -33,6 +36,12 @@ public class IotCleanRuleMessageHandler implements IotMessageSubscriber<IotDevic
@Resource
private IotMessageBus messageBus;
@Resource
private IotDeviceService deviceService;
@Resource
private IotRuleEngineVersionResolver versionResolver;
@PostConstruct
public void init() {
messageBus.register(this);
@@ -50,6 +59,11 @@ public class IotCleanRuleMessageHandler implements IotMessageSubscriber<IotDevic
@Override
public void onMessage(IotDeviceMessage message) {
// ⚠️ B9v2 路由前置判断,避免与 IotRuleEngineMessageHandler 双跑
IotDeviceDO device = deviceService.getDeviceFromCache(message.getDeviceId());
if (device != null && versionResolver.shouldUseV2(device.getSubsystemId(), device.getTenantId())) {
return; // v2 处理,跳过 v1
}
TenantUtils.execute(message.getTenantId(), () -> {
ProjectUtils.execute(message.getProjectId(), () -> {
try {

View File

@@ -4,6 +4,9 @@ import com.viewsh.framework.tenant.core.util.TenantUtils;
import com.viewsh.module.iot.core.messagebus.core.IotMessageBus;
import com.viewsh.module.iot.core.messagebus.core.IotMessageSubscriber;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
import com.viewsh.module.iot.rule.config.IotRuleEngineVersionResolver;
import com.viewsh.module.iot.service.device.IotDeviceService;
import com.viewsh.module.iot.service.rule.data.IotDataRuleService;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
@@ -13,6 +16,9 @@ import org.springframework.stereotype.Component;
/**
* 针对 {@link IotDeviceMessage} 的消费者,处理数据流转
*
* <p>⚠️ B9 改造:在 onMessage 最前面增加 v2 路由判断,若子系统已切换到 v2
* 则跳过本消费者,由 {@link IotRuleEngineMessageHandler} 统一处理,避免双跑。
*
* @author 芋道源码
*/
@Component
@@ -25,6 +31,12 @@ public class IotDataRuleMessageHandler implements IotMessageSubscriber<IotDevice
@Resource
private IotMessageBus messageBus;
@Resource
private IotDeviceService deviceService;
@Resource
private IotRuleEngineVersionResolver versionResolver;
@PostConstruct
public void init() {
messageBus.register(this);
@@ -42,6 +54,11 @@ public class IotDataRuleMessageHandler implements IotMessageSubscriber<IotDevice
@Override
public void onMessage(IotDeviceMessage message) {
// ⚠️ B9v2 路由前置判断,避免与 IotRuleEngineMessageHandler 双跑
IotDeviceDO device = deviceService.getDeviceFromCache(message.getDeviceId());
if (device != null && versionResolver.shouldUseV2(device.getSubsystemId(), device.getTenantId())) {
return; // v2 处理,跳过 v1
}
TenantUtils.execute(message.getTenantId(), () -> dataRuleService.executeDataRule(message));
}

View File

@@ -0,0 +1,98 @@
package com.viewsh.module.iot.mq.consumer.rule;
import com.viewsh.framework.tenant.core.util.TenantUtils;
import com.viewsh.module.iot.core.messagebus.core.IotMessageBus;
import com.viewsh.module.iot.core.messagebus.core.IotMessageSubscriber;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
import com.viewsh.module.iot.rule.config.IotRuleEngineVersionResolver;
import com.viewsh.module.iot.rule.engine.RuleEngine;
import com.viewsh.module.iot.rule.engine.RuleEngineResult;
import com.viewsh.module.iot.service.device.IotDeviceService;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* v2 规则引擎统一消费入口B9
*
* <p>与 v1 的三个消费者DataRule / SceneRule / CleanRule并存。
* 按子系统灰度:仅当 {@link IotRuleEngineVersionResolver#shouldUseV2} 返回 {@code true} 时,
* 本 Handler 才处理消息v1 消费者在同样的判断下跳过,避免双跑。
*
* <p>⚠️ Known Pitfalls评审 B11
* <ul>
* <li>三态V1 / V2 / HYBRID由 versionResolver 统一决策,本类不做版本判断逻辑</li>
* <li>device 为 null已删除设备log WARN + return不抛异常</li>
* <li>RuleEngine.execute() 异常try-catch 吞掉 + log.error不向上抛防重试风暴</li>
* </ul>
*
* @author lzh
*/
@Component
@Slf4j
public class IotRuleEngineMessageHandler implements IotMessageSubscriber<IotDeviceMessage> {
@Resource
private IotRuleEngineVersionResolver versionResolver;
@Resource
private RuleEngine ruleEngine;
@Resource
private IotDeviceService deviceService;
@Resource
private IotMessageBus messageBus;
@PostConstruct
public void init() {
messageBus.register(this);
}
@Override
public String getTopic() {
return IotDeviceMessage.MESSAGE_BUS_DEVICE_MESSAGE_TOPIC;
}
@Override
public String getGroup() {
return "iot_rule_engine_v2_consumer";
}
@Override
public void onMessage(IotDeviceMessage message) {
// 1. 取设备基本信息cacheO(1),不查 DB
IotDeviceDO device = deviceService.getDeviceFromCache(message.getDeviceId());
if (device == null) {
log.warn("[RuleEngineHandler] device not found, skip. deviceId={}", message.getDeviceId());
return;
}
// 2. 判断本消息是否走 v2v1 的消费者同样判断并跳过,保证绝不双跑
if (!versionResolver.shouldUseV2(device.getSubsystemId(), device.getTenantId())) {
return; // 由 v1 消费者处理
}
// 3. 走 v2 规则引擎
TenantUtils.execute(device.getTenantId(), () -> {
try {
RuleEngineResult result = ruleEngine.execute(
message,
device.getTenantId(),
device.getSubsystemId(),
device.getProductId(),
device.getId());
if (result.hasFailure()) {
log.warn("[RuleEngineHandler] partial failures deviceId={} failureCount={}",
message.getDeviceId(), result.getFailures().size());
}
} catch (Exception e) {
// ⚠️ 不向上抛出,避免消息总线重试风暴
log.error("[RuleEngineHandler] execute failed deviceId={}", message.getDeviceId(), e);
}
});
}
}

View File

@@ -4,12 +4,21 @@ import com.viewsh.framework.tenant.core.util.TenantUtils;
import com.viewsh.module.iot.core.messagebus.core.IotMessageBus;
import com.viewsh.module.iot.core.messagebus.core.IotMessageSubscriber;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
import com.viewsh.module.iot.rule.config.IotRuleEngineVersionResolver;
import com.viewsh.module.iot.service.device.IotDeviceService;
import com.viewsh.module.iot.service.rule.scene.IotSceneRuleService;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* 针对 {@link IotDeviceMessage} 的消费者,处理场景规则
*
* <p>⚠️ B9 改造:在 onMessage 最前面增加 v2 路由判断,若子系统已切换到 v2
* 则跳过本消费者,由 {@link IotRuleEngineMessageHandler} 统一处理,避免双跑。
*/
@Component
@Slf4j
public class IotSceneRuleMessageHandler implements IotMessageSubscriber<IotDeviceMessage> {
@@ -20,6 +29,12 @@ public class IotSceneRuleMessageHandler implements IotMessageSubscriber<IotDevic
@Resource
private IotMessageBus messageBus;
@Resource
private IotDeviceService deviceService;
@Resource
private IotRuleEngineVersionResolver versionResolver;
@PostConstruct
public void init() {
messageBus.register(this);
@@ -37,7 +52,12 @@ public class IotSceneRuleMessageHandler implements IotMessageSubscriber<IotDevic
@Override
public void onMessage(IotDeviceMessage message) {
log.info("[onMessage][娑堟伅鍐呭({})]", message);
// ⚠️ B9v2 路由前置判断,避免与 IotRuleEngineMessageHandler 双跑
IotDeviceDO device = deviceService.getDeviceFromCache(message.getDeviceId());
if (device != null && versionResolver.shouldUseV2(device.getSubsystemId(), device.getTenantId())) {
return; // v2 处理,跳过 v1
}
log.info("[onMessage][消息内容({})]", message);
TenantUtils.execute(message.getTenantId(), () -> sceneRuleService.executeSceneRuleByDevice(message));
}
}

View File

@@ -0,0 +1,52 @@
package com.viewsh.module.iot.service.alarm;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 告警缓存状态值对象(从 Redis Hash 反序列化)
*
* <p>字段与 Redis Hash field 一一对应:
* <pre>
* ack_state → ackState
* clear_state → clearState
* archived → archived
* severity → severity
* alarm_time → alarmTime (epoch milli)
* clear_time → clearTime (epoch milli)
* </pre>
*
* @author B14
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AlarmCacheState {
/** 确认状态0 未确认 / 1 已确认 */
private int ackState;
/** 清除状态0 活跃 / 1 已清除 */
private int clearState;
/** 归档0 未归档 / 1 已归档 */
private int archived;
/** 严重度 1-5 */
private int severity;
/**
* 最近一次触发时间epoch millis
* <p>对应 DB 中 {@code end_ts}(最近触发时间)</p>
*/
private long alarmTime;
/**
* 清除时间epoch millisclearState=0 时为 0
*/
private long clearTime;
}

View File

@@ -0,0 +1,71 @@
package com.viewsh.module.iot.service.alarm;
import org.springframework.stereotype.Component;
/**
* 告警状态时序校验器
*
* <p>防止旧消息(延迟到达的消息)误触发已清除告警或重置活跃告警。
*
* <p>设计依据(评审 §6.2
* <ul>
* <li>有效性判断基于 <b>消息时间戳</b>,不基于状态比对</li>
* <li>缓存是最终一致的,但时间戳判断不受缓存脏读影响</li>
* </ul>
*
* @author B14
*/
@Component
public class AlarmStateValidator {
/**
* 判断触发消息是否有效。
*
* <p>规则若告警已清除clearState=1则消息时间戳必须在最近 clear_time 之后,
* 否则认为是已清除后延迟到达的旧触发消息,直接忽略。
*
* <p>若告警未清除(活跃),任何触发消息都视为有效(累积 trigger_count
*
* @param cache 当前缓存状态(可为 null表示首次触发
* @param msgTs 消息时间戳epoch millis
* @return true 表示有效应更新状态false 表示旧消息,应忽略
*/
public boolean isEffectiveTrigger(AlarmCacheState cache, long msgTs) {
if (cache == null) {
// 首次触发,无缓存,直接有效
return true;
}
if (cache.getClearState() == 1) {
// 告警已清除:仅接受 clearTime 之后的触发(重新激活场景)
return msgTs > cache.getClearTime();
}
// 活跃告警,所有触发都有效
return true;
}
/**
* 判断清除消息是否有效。
*
* <p>规则:
* <ol>
* <li>告警已清除clearState=1→ 忽略(幂等)</li>
* <li>消息时间戳必须在 alarm_time 之后(过期的清除消息不生效)</li>
* </ol>
*
* @param cache 当前缓存状态(可为 null表示告警不存在视为无效
* @param msgTs 消息时间戳epoch millis
* @return true 表示有效应执行清除false 表示无效,应忽略
*/
public boolean isEffectiveClear(AlarmCacheState cache, long msgTs) {
if (cache == null) {
return false;
}
// 已清除 → 幂等忽略
if (cache.getClearState() == 1) {
return false;
}
// 消息时间戳必须晚于最近触发时间
return msgTs > cache.getAlarmTime();
}
}

View File

@@ -0,0 +1,198 @@
package com.viewsh.module.iot.service.alarm;
import com.viewsh.module.iot.dal.dataobject.alarm.IotAlarmRecordDO;
import com.viewsh.module.iot.dal.mysql.alarm.IotAlarmRecordMapper;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* 告警状态 Redis Hash 缓存 Service
*
* <p>Redis Key: {@code iot:alarm:state:{recordId}} (Hash)
* <pre>
* fields:
* ack_state — 0/1
* clear_state — 0/1
* archived — 0/1
* severity — 1-5
* alarm_time — epoch millis最近触发时间
* clear_time — epoch millis清除时间未清除时为 0
* TTL: 7 天,每次 write 时 expire 刷新
* </pre>
*
* <p>设计原则:
* <ul>
* <li>读取时 cache miss → 从 DB 重建(按需加载,不做启动预热)</li>
* <li>DB 写后立即更新缓存(在分布式锁保护下,不存在并发脏写)</li>
* <li>缓存失效后 fallback 走 DB性能下降但不故障</li>
* </ul>
*
* @author B14
*/
@Slf4j
@Service
public class IotAlarmCacheService {
/** 缓存 key 前缀 */
private static final String KEY_PREFIX = "iot:alarm:state:";
/** TTL 7 天 */
private static final long TTL_DAYS = 7L;
/** Hash field 名常量 */
private static final String F_ACK_STATE = "ack_state";
private static final String F_CLEAR_STATE = "clear_state";
private static final String F_ARCHIVED = "archived";
private static final String F_SEVERITY = "severity";
private static final String F_ALARM_TIME = "alarm_time";
private static final String F_CLEAR_TIME = "clear_time";
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private IotAlarmRecordMapper alarmRecordMapper;
// ==================== 读取 ====================
/**
* 获取告警缓存状态。
* <p>若 cache miss从 DB 重建并写入缓存后返回DB 也不存在则返回 {@code null}。
*
* @param recordId 告警记录 ID
* @return AlarmCacheState或 nullDB 中不存在该记录)
*/
public AlarmCacheState get(Long recordId) {
String key = buildKey(recordId);
Map<Object, Object> fields = stringRedisTemplate.opsForHash().entries(key);
if (!fields.isEmpty()) {
return fromMap(fields);
}
// Cache miss → DB 重建
log.debug("[get][alarm cache miss从 DB 重建recordId={}]", recordId);
IotAlarmRecordDO record = alarmRecordMapper.selectById(recordId);
if (record == null) {
return null;
}
updateState(recordId, record);
return buildFromDO(record);
}
// ==================== 写入 ====================
/**
* 在 DB 写入后同步更新 Redis Hash 缓存(每次 write 刷新 TTL
*
* @param recordId 告警记录 ID
* @param record 已持久化的 DO
*/
public void updateState(Long recordId, IotAlarmRecordDO record) {
String key = buildKey(recordId);
Map<String, String> map = new HashMap<>(8);
map.put(F_ACK_STATE, safeInt(record.getAckState()));
map.put(F_CLEAR_STATE, safeInt(record.getClearState()));
map.put(F_ARCHIVED, safeInt(record.getArchived()));
map.put(F_SEVERITY, safeInt(record.getSeverity()));
map.put(F_ALARM_TIME, toEpochMilli(record.getEndTs()));
map.put(F_CLEAR_TIME, toEpochMilli(record.getClearTs()));
stringRedisTemplate.opsForHash().putAll(key, map);
stringRedisTemplate.expire(key, TTL_DAYS, TimeUnit.DAYS);
log.debug("[updateState][alarm cache 已更新recordId={}, clearState={}]",
recordId, record.getClearState());
}
// ==================== 清除 ====================
/**
* 主动驱逐缓存(告警归档时可调用以节省内存)。
*
* @param recordId 告警记录 ID
*/
public void evict(Long recordId) {
stringRedisTemplate.delete(buildKey(recordId));
log.debug("[evict][alarm cache 已清除recordId={}]", recordId);
}
// ==================== 私有方法 ====================
private String buildKey(Long recordId) {
return KEY_PREFIX + recordId;
}
private AlarmCacheState fromMap(Map<Object, Object> fields) {
return AlarmCacheState.builder()
.ackState(parseInt(fields.get(F_ACK_STATE)))
.clearState(parseInt(fields.get(F_CLEAR_STATE)))
.archived(parseInt(fields.get(F_ARCHIVED)))
.severity(parseInt(fields.get(F_SEVERITY)))
.alarmTime(parseLong(fields.get(F_ALARM_TIME)))
.clearTime(parseLong(fields.get(F_CLEAR_TIME)))
.build();
}
private AlarmCacheState buildFromDO(IotAlarmRecordDO record) {
return AlarmCacheState.builder()
.ackState(safeIntVal(record.getAckState()))
.clearState(safeIntVal(record.getClearState()))
.archived(safeIntVal(record.getArchived()))
.severity(safeIntVal(record.getSeverity()))
.alarmTime(toEpochMilliLong(record.getEndTs()))
.clearTime(toEpochMilliLong(record.getClearTs()))
.build();
}
private static String safeInt(Integer v) {
return v == null ? "0" : String.valueOf(v);
}
private static int safeIntVal(Integer v) {
return v == null ? 0 : v;
}
private static String toEpochMilli(LocalDateTime ldt) {
if (ldt == null) {
return "0";
}
return String.valueOf(ldt.toInstant(ZoneOffset.UTC).toEpochMilli());
}
private static long toEpochMilliLong(LocalDateTime ldt) {
if (ldt == null) {
return 0L;
}
return ldt.toInstant(ZoneOffset.UTC).toEpochMilli();
}
private static int parseInt(Object v) {
if (v == null) {
return 0;
}
try {
return Integer.parseInt(v.toString());
} catch (NumberFormatException e) {
return 0;
}
}
private static long parseLong(Object v) {
if (v == null) {
return 0L;
}
try {
return Long.parseLong(v.toString());
} catch (NumberFormatException e) {
return 0L;
}
}
}

View File

@@ -0,0 +1,106 @@
package com.viewsh.module.iot.service.alarm;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.List;
import java.util.UUID;
import java.util.function.Supplier;
import static com.viewsh.framework.common.exception.util.ServiceExceptionUtil.exception;
import static com.viewsh.module.iot.enums.ErrorCodeConstants.ALARM_LOCK_CONFLICT;
/**
* 告警分布式锁 Service
*
* <p>基于 Redis SET NX PX + Lua 原子解锁,防止并发告警状态改写冲突。
*
* <p>Redis Key: {@code iot:alarm:lock:{recordId}} (String)
* <pre>
* value: UUID token用于防止 A 删 B 的锁)
* TTL: 由调用方传入(建议 5-10 秒)
* </pre>
*
* <p>Known Pitfalls 落地:
* <ul>
* <li>⚠️ [评审 C4] 禁用 SETNX改用 {@code SET NX PX}(通过 {@code setIfAbsent(key, value, timeout)} 实现)</li>
* <li>⚠️ [评审 C4] 释放锁用 Lua 脚本验证 token防止 A 释放了 B 持有的锁</li>
* <li>⚠️ 锁获取失败直接抛 {@code ALARM_LOCK_CONFLICT},不循环等待(由上游 RuleEngine 记录 metric 后重试)</li>
* </ul>
*
* @author B14
*/
@Slf4j
@Service
public class IotAlarmLockService {
/** 锁 key 前缀 */
private static final String LOCK_KEY_PREFIX = "iot:alarm:lock:";
/**
* Lua 脚本:仅当 key 的值等于 token 时才删除,避免 A 删了 B 的锁。
* <p>返回 1 表示删除成功0 表示 token 不匹配(锁已过期或被其他人持有)。
*/
private static final DefaultRedisScript<Long> RELEASE_SCRIPT;
static {
RELEASE_SCRIPT = new DefaultRedisScript<>();
RELEASE_SCRIPT.setScriptText(
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end"
);
RELEASE_SCRIPT.setResultType(Long.class);
}
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 在分布式锁保护下执行指定操作。
*
* <p>获取锁失败(另一个线程/进程持有该记录的锁)立即抛出 {@link com.viewsh.framework.common.exception.ServiceException}
* 错误码 {@code ALARM_LOCK_CONFLICT}。
*
* <p>无论操作是否成功finally 块中用 Lua 脚本释放锁(仅释放自己的 token
*
* @param recordId 告警记录 ID
* @param timeout 锁超时时长(建议 5-10 秒)
* @param action 被保护的业务逻辑
* @param <T> 返回值类型
* @return action 的返回值
* @throws com.viewsh.framework.common.exception.ServiceException ALARM_LOCK_CONFLICT 锁冲突
*/
public <T> T executeWithLock(Long recordId, Duration timeout, Supplier<T> action) {
String key = LOCK_KEY_PREFIX + recordId;
String token = UUID.randomUUID().toString();
// SET NX PX — 原子加锁(评审 C4 要求)
Boolean acquired = stringRedisTemplate.opsForValue().setIfAbsent(key, token, timeout);
if (!Boolean.TRUE.equals(acquired)) {
log.warn("[executeWithLock][获取告警锁失败recordId={}, 锁冲突]", recordId);
throw exception(ALARM_LOCK_CONFLICT);
}
log.debug("[executeWithLock][获取告警锁成功recordId={}, token={}]", recordId, token);
try {
return action.get();
} finally {
// Lua 原子释放:只删自己的 token评审 C4 要求)
Long result = stringRedisTemplate.execute(RELEASE_SCRIPT, List.of(key), token);
if (result == null || result == 0L) {
log.warn("[executeWithLock][Lua 解锁失败token 不匹配或锁已过期recordId={}, token={}]",
recordId, token);
} else {
log.debug("[executeWithLock][告警锁已释放recordId={}]", recordId);
}
}
}
}

View File

@@ -12,6 +12,7 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.annotation.Validated;
import java.time.Duration;
import java.time.LocalDateTime;
import static com.viewsh.framework.common.exception.util.ServiceExceptionUtil.exception;
@@ -21,26 +22,38 @@ import static com.viewsh.module.iot.enums.ErrorCodeConstants.*;
* IoT 告警记录 Service 实现v2.0 正交状态机)
* <p>
* 本卡职责:建表 / 枚举 / DO / Service 状态机核心 + 幂等 upsert。
* <b>不实现</b>:分布式锁(B14、告警传播B15、通知B16
* 告警条件结构化存储(评审 E4、Redis 实时计数B14/O1)。
* <b>B14 叠加</b>:分布式锁(SET NX PX + Lua+ Redis 状态缓存 + 时序有效性校验。
* <b>不实现</b>告警传播B15、通知B16)。
* <p>
* Known Pitfalls 落地:
* - ⚠️ 评审 C1正交三字段替代线性 4 枚举({@link IotAlarmRecordDO}
* - ⚠️ 评审 C2联合 UK 幂等 upsert{@link IotAlarmRecordMapper#selectActiveByDeviceAndConfig}
* - ⚠️ 评审 C4TODO B14 加分布式锁triggerAlarm 当前仅 CAS
* - ⚠️ 评审 C4分布式锁({@link IotAlarmLockService}+ Lua 原子解锁
* - ⚠️ 归档后不可修改5 个状态迁移 API 都先校验 archived=0
* - ⚠️ tenant_id基于 {@link TenantContextHolder}
*
* @author B12
* @author B12 / B14
*/
@Slf4j
@Service
@Validated
public class IotAlarmRecordServiceImpl implements IotAlarmRecordService {
/** 分布式锁超时10 秒:覆盖 DB 写 + history 写 + 网络抖动) */
private static final Duration LOCK_TIMEOUT = Duration.ofSeconds(10);
@Resource
private IotAlarmRecordMapper alarmRecordMapper;
@Resource
private IotAlarmLockService lockService;
@Resource
private IotAlarmCacheService cacheService;
@Resource
private AlarmStateValidator validator;
// ==================== 触发(幂等 upsert ====================
@Override
@@ -60,15 +73,53 @@ public class IotAlarmRecordServiceImpl implements IotAlarmRecordService {
Long tenantId = TenantContextHolder.getTenantId();
LocalDateTime now = LocalDateTime.now();
// 2. 幂等 upsert评审 C2
// TODO B14此处应套分布式锁 SET NX PX + Lua防并发 trigger 造成双写
// 2. 先查或创建 alarm_recordUK 幂等,不在锁内,避免死锁
IotAlarmRecordDO existing = alarmRecordMapper.selectActiveByDeviceAndConfig(
request.getDeviceId(), request.getAlarmConfigId(), tenantId);
if (existing != null) {
// 已存在活跃记录 → trigger_count++ + end_ts 更新 + details 覆盖
if (existing == null) {
// 3a. 首次触发:新建 (0,0,0)
IotAlarmRecordDO record = IotAlarmRecordDO.builder()
.alarmConfigId(request.getAlarmConfigId())
.alarmName(request.getAlarmName())
.severity(request.getSeverity())
.ackState(0)
.clearState(0)
.archived(0)
.deviceId(request.getDeviceId())
.productId(request.getProductId())
.subsystemId(request.getSubsystemId())
.ruleChainId(request.getRuleChainId())
.sceneRuleId(request.getSceneRuleId())
.startTs(now)
.endTs(now)
.details(request.getDetails())
.triggerCount(1)
.build();
alarmRecordMapper.insert(record);
log.info("[triggerAlarm] 新建告警 id={} device={} config={}",
record.getId(), request.getDeviceId(), request.getAlarmConfigId());
// 同步缓存(首次,无需锁)
cacheService.updateState(record.getId(), record);
return record.getId();
}
final Long recordId = existing.getId();
// 3b. 已有记录:分布式锁保护状态变更(评审 C4
return lockService.executeWithLock(recordId, LOCK_TIMEOUT, () -> {
// 4. 时序有效性校验(防旧消息重触发已清除告警)
AlarmCacheState cache = cacheService.get(recordId);
long msgTs = request.getTimestamp() > 0 ? request.getTimestamp() : System.currentTimeMillis();
if (!validator.isEffectiveTrigger(cache, msgTs)) {
log.debug("[triggerAlarm] 忽略旧触发消息recordId={}, msgTs={}, clearTime={}",
recordId, msgTs, cache != null ? cache.getClearTime() : "N/A");
return recordId;
}
// 5. DB upserttrigger_count++ + end_ts 更新 + details 覆盖
IotAlarmRecordDO update = new IotAlarmRecordDO();
update.setId(existing.getId());
update.setId(recordId);
update.setEndTs(now);
int oldCount = existing.getTriggerCount() == null ? 1 : existing.getTriggerCount();
update.setTriggerCount(oldCount + 1);
@@ -76,32 +127,17 @@ public class IotAlarmRecordServiceImpl implements IotAlarmRecordService {
update.setDetails(request.getDetails());
}
alarmRecordMapper.updateById(update);
log.debug("[triggerAlarm] 累积触发 id={} count={}", existing.getId(), oldCount + 1);
return existing.getId();
}
log.debug("[triggerAlarm] 累积触发 id={} count={}", recordId, oldCount + 1);
// 3. 新建0, 0, 0
IotAlarmRecordDO record = IotAlarmRecordDO.builder()
.alarmConfigId(request.getAlarmConfigId())
.alarmName(request.getAlarmName())
.severity(request.getSeverity())
.ackState(0)
.clearState(0)
.archived(0)
.deviceId(request.getDeviceId())
.productId(request.getProductId())
.subsystemId(request.getSubsystemId())
.ruleChainId(request.getRuleChainId())
.sceneRuleId(request.getSceneRuleId())
.startTs(now)
.endTs(now)
.details(request.getDetails())
.triggerCount(1)
.build();
alarmRecordMapper.insert(record);
log.info("[triggerAlarm] 新建告警 id={} device={} config={}",
record.getId(), request.getDeviceId(), request.getAlarmConfigId());
return record.getId();
// 6. 同步缓存DB 写后立即更新
// 重新拉取最新 DO含 update 后字段)
IotAlarmRecordDO latest = alarmRecordMapper.selectById(recordId);
if (latest != null) {
cacheService.updateState(recordId, latest);
}
return recordId;
});
}
// ==================== 状态迁移CAS by Service 层) ====================
@@ -113,15 +149,25 @@ public class IotAlarmRecordServiceImpl implements IotAlarmRecordService {
log.debug("[ackAlarm] 告警 id={} 已确认,幂等跳过", alarm.getId());
return;
}
IotAlarmRecordDO update = new IotAlarmRecordDO();
update.setId(alarm.getId());
update.setAckState(1);
update.setAckTs(LocalDateTime.now());
update.setUpdater(request.getOperator());
if (request.getRemark() != null && !request.getRemark().isBlank()) {
update.setProcessRemark(request.getRemark());
}
alarmRecordMapper.updateById(update);
lockService.executeWithLock(alarm.getId(), LOCK_TIMEOUT, () -> {
IotAlarmRecordDO update = new IotAlarmRecordDO();
update.setId(alarm.getId());
update.setAckState(1);
update.setAckTs(LocalDateTime.now());
update.setUpdater(request.getOperator());
if (request.getRemark() != null && !request.getRemark().isBlank()) {
update.setProcessRemark(request.getRemark());
}
alarmRecordMapper.updateById(update);
// 同步缓存
IotAlarmRecordDO latest = alarmRecordMapper.selectById(alarm.getId());
if (latest != null) {
cacheService.updateState(alarm.getId(), latest);
}
return null;
});
}
@Override
@@ -131,15 +177,25 @@ public class IotAlarmRecordServiceImpl implements IotAlarmRecordService {
log.debug("[unackAlarm] 告警 id={} 未确认,幂等跳过", alarm.getId());
return;
}
IotAlarmRecordDO update = new IotAlarmRecordDO();
update.setId(alarm.getId());
update.setAckState(0);
update.setAckTs(null);
update.setUpdater(request.getOperator());
if (request.getRemark() != null && !request.getRemark().isBlank()) {
update.setProcessRemark(request.getRemark());
}
alarmRecordMapper.updateById(update);
lockService.executeWithLock(alarm.getId(), LOCK_TIMEOUT, () -> {
IotAlarmRecordDO update = new IotAlarmRecordDO();
update.setId(alarm.getId());
update.setAckState(0);
update.setAckTs(null);
update.setUpdater(request.getOperator());
if (request.getRemark() != null && !request.getRemark().isBlank()) {
update.setProcessRemark(request.getRemark());
}
alarmRecordMapper.updateById(update);
// 同步缓存
IotAlarmRecordDO latest = alarmRecordMapper.selectById(alarm.getId());
if (latest != null) {
cacheService.updateState(alarm.getId(), latest);
}
return null;
});
}
@Override
@@ -149,15 +205,25 @@ public class IotAlarmRecordServiceImpl implements IotAlarmRecordService {
log.debug("[clearAlarm] 告警 id={} 已清除,幂等跳过", alarm.getId());
return;
}
IotAlarmRecordDO update = new IotAlarmRecordDO();
update.setId(alarm.getId());
update.setClearState(1);
update.setClearTs(LocalDateTime.now());
update.setUpdater(request.getOperator());
if (request.getRemark() != null && !request.getRemark().isBlank()) {
update.setProcessRemark(request.getRemark());
}
alarmRecordMapper.updateById(update);
lockService.executeWithLock(alarm.getId(), LOCK_TIMEOUT, () -> {
IotAlarmRecordDO update = new IotAlarmRecordDO();
update.setId(alarm.getId());
update.setClearState(1);
update.setClearTs(LocalDateTime.now());
update.setUpdater(request.getOperator());
if (request.getRemark() != null && !request.getRemark().isBlank()) {
update.setProcessRemark(request.getRemark());
}
alarmRecordMapper.updateById(update);
// 同步缓存
IotAlarmRecordDO latest = alarmRecordMapper.selectById(alarm.getId());
if (latest != null) {
cacheService.updateState(alarm.getId(), latest);
}
return null;
});
}
@Override
@@ -167,15 +233,22 @@ public class IotAlarmRecordServiceImpl implements IotAlarmRecordService {
// 归档非幂等:已归档再调用抛业务异常
throw exception(ALARM_ALREADY_ARCHIVED);
}
IotAlarmRecordDO update = new IotAlarmRecordDO();
update.setId(alarm.getId());
update.setArchived(1);
update.setArchiveTs(LocalDateTime.now());
update.setUpdater(request.getOperator());
if (request.getRemark() != null && !request.getRemark().isBlank()) {
update.setProcessRemark(request.getRemark());
}
alarmRecordMapper.updateById(update);
lockService.executeWithLock(alarm.getId(), LOCK_TIMEOUT, () -> {
IotAlarmRecordDO update = new IotAlarmRecordDO();
update.setId(alarm.getId());
update.setArchived(1);
update.setArchiveTs(LocalDateTime.now());
update.setUpdater(request.getOperator());
if (request.getRemark() != null && !request.getRemark().isBlank()) {
update.setProcessRemark(request.getRemark());
}
alarmRecordMapper.updateById(update);
// 归档后驱逐缓存(节省内存,可接受 cache miss fallback 到 DB
cacheService.evict(alarm.getId());
return null;
});
}
// ==================== 查询 ====================

View File

@@ -44,4 +44,12 @@ public class AlarmTriggerRequest {
/** 触发详情JSON */
private JsonNode details;
/**
* 消息时间戳epoch millis来自设备上报时间或规则引擎推断
* <p>用于 B14 {@link com.viewsh.module.iot.service.alarm.AlarmStateValidator#isEffectiveTrigger} 时序校验:
* 防止延迟到达的旧触发消息重新激活已清除的告警。
* <p>若未填写0 或 null默认使用当前时间System.currentTimeMillis())。
*/
private long timestamp;
}

View File

@@ -0,0 +1,167 @@
package com.viewsh.module.iot.mq.consumer.rule;
import com.viewsh.framework.test.core.ut.BaseMockitoUnitTest;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
import com.viewsh.module.iot.rule.config.IotRuleEngineVersionResolver;
import com.viewsh.module.iot.rule.engine.RuleEngine;
import com.viewsh.module.iot.rule.engine.RuleEngineResult;
import com.viewsh.module.iot.service.device.IotDeviceService;
import org.junit.jupiter.api.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
/**
* {@link IotRuleEngineMessageHandler} 单元测试
*
* <p>验证 B9 的六个场景(见任务卡 §6 Test Cases
* <ol>
* <li>全 v1 — shouldUseV2=false → handler 提前返回ruleEngine 不调用</li>
* <li>全 v2 — shouldUseV2=true → ruleEngine.execute() 被调用</li>
* <li>hybrid + 白名单命中 — shouldUseV2=true → ruleEngine.execute() 被调用</li>
* <li>hybrid + 白名单未命中 — shouldUseV2=false → handler 提前返回</li>
* <li>device 已删除null— log WARN + skipruleEngine 不调用</li>
* <li>RuleEngine 抛异常 — handler 捕获异常,不向上抛出</li>
* </ol>
*/
class IotRuleEngineMessageHandlerTest extends BaseMockitoUnitTest {
@InjectMocks
private IotRuleEngineMessageHandler handler;
@Mock
private IotRuleEngineVersionResolver versionResolver;
@Mock
private RuleEngine ruleEngine;
@Mock
private IotDeviceService deviceService;
// ---- 测试数据 ----
private static final Long DEVICE_ID = 100L;
private static final Long TENANT_ID = 1L;
private static final Long SUBSYSTEM_ID = 5L;
private static final Long PRODUCT_ID = 10L;
private IotDeviceMessage buildMessage() {
return IotDeviceMessage.builder()
.deviceId(DEVICE_ID)
.tenantId(TENANT_ID)
.build();
}
private IotDeviceDO buildDevice(Long subsystemId) {
IotDeviceDO device = IotDeviceDO.builder()
.id(DEVICE_ID)
.subsystemId(subsystemId)
.productId(PRODUCT_ID)
.build();
device.setTenantId(TENANT_ID);
return device;
}
// ---- 场景 1全 v1shouldUseV2=falsehandler 不处理 ----
@Test
void testOnMessage_globalV1_skipHandler() {
IotDeviceMessage msg = buildMessage();
IotDeviceDO device = buildDevice(SUBSYSTEM_ID);
when(deviceService.getDeviceFromCache(DEVICE_ID)).thenReturn(device);
when(versionResolver.shouldUseV2(SUBSYSTEM_ID, TENANT_ID)).thenReturn(false);
handler.onMessage(msg);
verify(ruleEngine, never()).execute(any(), any(), any(), any(), any());
}
// ---- 场景 2全 v2shouldUseV2=trueruleEngine.execute() 被调用 ----
@Test
void testOnMessage_globalV2_executeRuleEngine() {
IotDeviceMessage msg = buildMessage();
IotDeviceDO device = buildDevice(SUBSYSTEM_ID);
RuleEngineResult result = new RuleEngineResult();
when(deviceService.getDeviceFromCache(DEVICE_ID)).thenReturn(device);
when(versionResolver.shouldUseV2(SUBSYSTEM_ID, TENANT_ID)).thenReturn(true);
when(ruleEngine.execute(eq(msg), eq(TENANT_ID), eq(SUBSYSTEM_ID), eq(PRODUCT_ID), eq(DEVICE_ID)))
.thenReturn(result);
handler.onMessage(msg);
verify(ruleEngine, times(1)).execute(eq(msg), eq(TENANT_ID), eq(SUBSYSTEM_ID), eq(PRODUCT_ID), eq(DEVICE_ID));
}
// ---- 场景 3hybrid + 白名单命中subsystem=5走 v2 ----
@Test
void testOnMessage_hybridWhitelistHit_executeRuleEngine() {
IotDeviceMessage msg = buildMessage();
IotDeviceDO device = buildDevice(SUBSYSTEM_ID);
RuleEngineResult result = new RuleEngineResult();
when(deviceService.getDeviceFromCache(DEVICE_ID)).thenReturn(device);
when(versionResolver.shouldUseV2(SUBSYSTEM_ID, TENANT_ID)).thenReturn(true); // hybrid: subsystem 5 in whitelist
when(ruleEngine.execute(eq(msg), eq(TENANT_ID), eq(SUBSYSTEM_ID), eq(PRODUCT_ID), eq(DEVICE_ID)))
.thenReturn(result);
handler.onMessage(msg);
verify(ruleEngine, times(1)).execute(any(), any(), any(), any(), any());
}
// ---- 场景 4hybrid + 白名单未命中subsystem=3走 v1handler 跳过 ----
@Test
void testOnMessage_hybridWhitelistMiss_skipHandler() {
Long otherSubsystem = 3L;
IotDeviceMessage msg = buildMessage();
IotDeviceDO device = buildDevice(otherSubsystem);
when(deviceService.getDeviceFromCache(DEVICE_ID)).thenReturn(device);
when(versionResolver.shouldUseV2(otherSubsystem, TENANT_ID)).thenReturn(false); // subsystem 3 not in whitelist
handler.onMessage(msg);
verify(ruleEngine, never()).execute(any(), any(), any(), any(), any());
}
// ---- 场景 5device 已删除(返回 nulllog WARN + skip ----
@Test
void testOnMessage_deviceNull_skipHandler() {
IotDeviceMessage msg = buildMessage();
when(deviceService.getDeviceFromCache(DEVICE_ID)).thenReturn(null);
handler.onMessage(msg);
verify(versionResolver, never()).shouldUseV2(any(), any());
verify(ruleEngine, never()).execute(any(), any(), any(), any(), any());
}
// ---- 场景 6RuleEngine 抛异常handler 捕获,不向上抛出 ----
@Test
void testOnMessage_ruleEngineException_swallowed() {
IotDeviceMessage msg = buildMessage();
IotDeviceDO device = buildDevice(SUBSYSTEM_ID);
when(deviceService.getDeviceFromCache(DEVICE_ID)).thenReturn(device);
when(versionResolver.shouldUseV2(SUBSYSTEM_ID, TENANT_ID)).thenReturn(true);
when(ruleEngine.execute(any(), any(), any(), any(), any()))
.thenThrow(new RuntimeException("simulated engine error"));
// 不应向外抛出异常
handler.onMessage(msg);
verify(ruleEngine, times(1)).execute(any(), any(), any(), any(), any());
}
}

View File

@@ -0,0 +1,119 @@
package com.viewsh.module.iot.service.alarm;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
/**
* {@link AlarmStateValidator} 单元测试
*
* <p>任务卡 B14 §3.3 时序校验逻辑:
* <ul>
* <li>isEffectiveTrigger已清除告警 + 旧消息 → false</li>
* <li>isEffectiveTrigger已清除告警 + 新消息 → true重新激活</li>
* <li>isEffectiveTrigger活跃告警任何消息 → true</li>
* <li>isEffectiveTriggercache == null首次→ true</li>
* <li>isEffectiveClear活跃告警 + 新消息 → true</li>
* <li>isEffectiveClear已清除告警 → false幂等</li>
* <li>isEffectiveClear活跃告警 + 旧消息 → false</li>
* <li>isEffectiveClearcache == null → false</li>
* </ul>
*
* @author B14
*/
class AlarmStateValidatorTest {
private final AlarmStateValidator validator = new AlarmStateValidator();
// ==================== isEffectiveTrigger ====================
@Test
void testIsEffectiveTrigger_clearedAlarm_oldMessage_returnFalse() {
AlarmCacheState cache = AlarmCacheState.builder()
.clearState(1)
.clearTime(5000L)
.alarmTime(1000L)
.build();
// 消息时间戳 3000 < clearTime 5000 → 旧消息,不生效
assertFalse(validator.isEffectiveTrigger(cache, 3000L));
}
@Test
void testIsEffectiveTrigger_clearedAlarm_newMessage_returnTrue() {
AlarmCacheState cache = AlarmCacheState.builder()
.clearState(1)
.clearTime(5000L)
.alarmTime(1000L)
.build();
// 消息时间戳 6000 > clearTime 5000 → 重新激活,生效
assertTrue(validator.isEffectiveTrigger(cache, 6000L));
}
@Test
void testIsEffectiveTrigger_activeAlarm_anyMessage_returnTrue() {
AlarmCacheState cache = AlarmCacheState.builder()
.clearState(0)
.alarmTime(1000L)
.clearTime(0L)
.build();
// 活跃告警,任何时间戳都接受
assertTrue(validator.isEffectiveTrigger(cache, 500L));
assertTrue(validator.isEffectiveTrigger(cache, 2000L));
}
@Test
void testIsEffectiveTrigger_nullCache_returnTrue() {
// 首次触发,无缓存
assertTrue(validator.isEffectiveTrigger(null, 1000L));
}
// ==================== isEffectiveClear ====================
@Test
void testIsEffectiveClear_activeAlarm_newMessage_returnTrue() {
AlarmCacheState cache = AlarmCacheState.builder()
.clearState(0)
.alarmTime(2000L)
.build();
// 消息时间戳 3000 > alarmTime 2000 → 有效清除
assertTrue(validator.isEffectiveClear(cache, 3000L));
}
@Test
void testIsEffectiveClear_alreadyCleared_returnFalse() {
AlarmCacheState cache = AlarmCacheState.builder()
.clearState(1)
.alarmTime(2000L)
.clearTime(5000L)
.build();
// 已清除,幂等忽略
assertFalse(validator.isEffectiveClear(cache, 6000L));
}
@Test
void testIsEffectiveClear_activeAlarm_oldMessage_returnFalse() {
AlarmCacheState cache = AlarmCacheState.builder()
.clearState(0)
.alarmTime(5000L)
.build();
// 消息时间戳 3000 < alarmTime 5000 → 旧消息,不清除
assertFalse(validator.isEffectiveClear(cache, 3000L));
}
@Test
void testIsEffectiveClear_nullCache_returnFalse() {
// cache 为 null 表示告警不存在,清除无效
assertFalse(validator.isEffectiveClear(null, 1000L));
}
@Test
void testIsEffectiveClear_sameTimestamp_returnFalse() {
AlarmCacheState cache = AlarmCacheState.builder()
.clearState(0)
.alarmTime(5000L)
.build();
// 严格大于,等于不生效
assertFalse(validator.isEffectiveClear(cache, 5000L));
}
}

View File

@@ -0,0 +1,156 @@
package com.viewsh.module.iot.service.alarm;
import com.viewsh.module.iot.dal.dataobject.alarm.IotAlarmRecordDO;
import com.viewsh.module.iot.dal.mysql.alarm.IotAlarmRecordMapper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
/**
* {@link IotAlarmCacheService} 单元测试
*
* <p>测试用例:
* <ul>
* <li>testGet_cacheHit缓存命中直接返回</li>
* <li>testCacheMiss_reloadFromDBcache miss → 从 DB 重建</li>
* <li>testGet_cacheAndDbMissRedis + DB 均无 → 返回 null</li>
* <li>testUpdateState写入 Hash + 刷新 TTL</li>
* <li>testEvict删除 key</li>
* </ul>
*
* @author B14
*/
@ExtendWith(MockitoExtension.class)
class IotAlarmCacheServiceTest {
@InjectMocks
private IotAlarmCacheService cacheService;
@Mock
private StringRedisTemplate stringRedisTemplate;
@Mock
private IotAlarmRecordMapper alarmRecordMapper;
@Mock
@SuppressWarnings("rawtypes")
private HashOperations hashOps;
@BeforeEach
@SuppressWarnings("unchecked")
void setUp() {
// lenient: testEvict does not call opsForHash(), so this stub is not used in that test
lenient().when(stringRedisTemplate.opsForHash()).thenReturn(hashOps);
}
// ==================== testGet_cacheHit ====================
@Test
@SuppressWarnings("unchecked")
void testGet_cacheHit() {
Map<Object, Object> fields = new HashMap<>();
fields.put("ack_state", "1");
fields.put("clear_state", "1");
fields.put("archived", "0");
fields.put("severity", "3");
fields.put("alarm_time", "1000");
fields.put("clear_time", "2000");
when(hashOps.entries("iot:alarm:state:42")).thenReturn(fields);
AlarmCacheState state = cacheService.get(42L);
assertNotNull(state);
assertEquals(1, state.getAckState());
assertEquals(1, state.getClearState());
assertEquals(0, state.getArchived());
assertEquals(3, state.getSeverity());
assertEquals(1000L, state.getAlarmTime());
assertEquals(2000L, state.getClearTime());
// DB 不应被查询
verify(alarmRecordMapper, never()).selectById(anyLong());
}
// ==================== testCacheMiss_reloadFromDB ====================
@Test
@SuppressWarnings("unchecked")
void testCacheMiss_reloadFromDB() {
// Redis 返回空 mapcache miss
when(hashOps.entries(anyString())).thenReturn(Collections.emptyMap());
IotAlarmRecordDO record = IotAlarmRecordDO.builder()
.id(99L)
.ackState(0).clearState(0).archived(0).severity(2)
.endTs(LocalDateTime.of(2024, 1, 1, 10, 0, 0))
.build();
when(alarmRecordMapper.selectById(99L)).thenReturn(record);
AlarmCacheState state = cacheService.get(99L);
assertNotNull(state);
assertEquals(0, state.getAckState());
assertEquals(0, state.getClearState());
assertEquals(2, state.getSeverity());
// DB 被查询,且缓存被回填
verify(alarmRecordMapper).selectById(99L);
verify(hashOps).putAll(eq("iot:alarm:state:99"), anyMap());
verify(stringRedisTemplate).expire(eq("iot:alarm:state:99"), eq(7L), eq(TimeUnit.DAYS));
}
// ==================== testGet_cacheAndDbMiss ====================
@Test
@SuppressWarnings("unchecked")
void testGet_cacheAndDbMiss() {
when(hashOps.entries(anyString())).thenReturn(Collections.emptyMap());
when(alarmRecordMapper.selectById(123L)).thenReturn(null);
AlarmCacheState state = cacheService.get(123L);
assertNull(state);
}
// ==================== testUpdateState ====================
@Test
@SuppressWarnings("unchecked")
void testUpdateState() {
IotAlarmRecordDO record = IotAlarmRecordDO.builder()
.id(55L)
.ackState(1).clearState(0).archived(0).severity(4)
.endTs(LocalDateTime.now())
.build();
cacheService.updateState(55L, record);
verify(hashOps).putAll(eq("iot:alarm:state:55"), anyMap());
verify(stringRedisTemplate).expire(eq("iot:alarm:state:55"), eq(7L), eq(TimeUnit.DAYS));
}
// ==================== testEvict ====================
@Test
void testEvict() {
cacheService.evict(777L);
verify(stringRedisTemplate).delete("iot:alarm:state:777");
}
}

View File

@@ -0,0 +1,143 @@
package com.viewsh.module.iot.service.alarm;
import com.viewsh.framework.common.exception.ServiceException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import java.time.Duration;
import java.util.List;
import static com.viewsh.module.iot.enums.ErrorCodeConstants.ALARM_LOCK_CONFLICT;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
/**
* {@link IotAlarmLockService} 单元测试
*
* <p>任务卡 B14 §6 测试用例:
* <ul>
* <li>testLock_acquireRelease正常获取并释放</li>
* <li>testLock_conflict锁被占用时抛 ALARM_LOCK_CONFLICT</li>
* <li>testLock_releaseOtherTokenLua 脚本返回 0token 不匹配warn 日志但不抛异常</li>
* </ul>
*
* @author B14
*/
@ExtendWith(MockitoExtension.class)
class IotAlarmLockServiceTest {
@InjectMocks
private IotAlarmLockService lockService;
@Mock
private StringRedisTemplate stringRedisTemplate;
@Mock
private ValueOperations<String, String> valueOps;
@BeforeEach
void setUp() {
when(stringRedisTemplate.opsForValue()).thenReturn(valueOps);
}
// ==================== testLock_acquireRelease ====================
/**
* 正常获取锁 + 执行 action + 释放锁Lua 返回 1
*/
@Test
@SuppressWarnings("unchecked")
void testLock_acquireRelease() {
// SET NX 成功
when(valueOps.setIfAbsent(anyString(), anyString(), any(Duration.class))).thenReturn(true);
// Lua 解锁返回 1成功删除
when(stringRedisTemplate.execute(any(DefaultRedisScript.class), anyList(), any())).thenReturn(1L);
String result = lockService.executeWithLock(100L, Duration.ofSeconds(5), () -> "ok");
assertEquals("ok", result);
verify(valueOps).setIfAbsent(eq("iot:alarm:lock:100"), anyString(), eq(Duration.ofSeconds(5)));
// Lua 解锁被执行
verify(stringRedisTemplate).execute(
any(DefaultRedisScript.class),
eq(List.of("iot:alarm:lock:100")),
anyString()
);
}
// ==================== testLock_conflict ====================
/**
* SET NX 返回 false另一个进程持有锁→ 抛 ALARM_LOCK_CONFLICT
*/
@Test
void testLock_conflict() {
// setIfAbsent 返回 false → 获取锁失败
when(valueOps.setIfAbsent(anyString(), anyString(), any(Duration.class))).thenReturn(false);
ServiceException ex = assertThrows(ServiceException.class,
() -> lockService.executeWithLock(200L, Duration.ofSeconds(5), () -> "should not run"));
assertEquals(ALARM_LOCK_CONFLICT.getCode(), ex.getCode());
// action 未执行,也不应尝试解锁
verify(stringRedisTemplate, never()).execute(any(DefaultRedisScript.class), anyList(), anyString());
}
// ==================== testLock_releaseOtherToken ====================
/**
* Lua 脚本返回 0token 不匹配锁已过期B 拿到了A 释放时不删 B 的锁,仅打 warn 日志
*/
@Test
@SuppressWarnings("unchecked")
void testLock_releaseOtherToken() {
// 获取锁成功
when(valueOps.setIfAbsent(anyString(), anyString(), any(Duration.class))).thenReturn(true);
// Lua 返回 0 → token 不匹配
when(stringRedisTemplate.execute(any(DefaultRedisScript.class), anyList(), any())).thenReturn(0L);
// 不应抛异常(仅 warn 日志,不影响业务)
assertDoesNotThrow(() -> lockService.executeWithLock(300L, Duration.ofSeconds(5), () -> "done"));
// Lua 脚本被调用了
verify(stringRedisTemplate).execute(
any(DefaultRedisScript.class),
eq(List.of("iot:alarm:lock:300")),
anyString()
);
}
// ==================== action 异常时也要释放锁 ====================
/**
* action 抛出异常时finally 块中仍应执行 Lua 解锁
*/
@Test
@SuppressWarnings("unchecked")
void testLock_releaseOnException() {
when(valueOps.setIfAbsent(anyString(), anyString(), any(Duration.class))).thenReturn(true);
when(stringRedisTemplate.execute(any(DefaultRedisScript.class), anyList(), any())).thenReturn(1L);
RuntimeException thrown = assertThrows(RuntimeException.class,
() -> lockService.executeWithLock(400L, Duration.ofSeconds(5), () -> {
throw new RuntimeException("action failed");
}));
assertEquals("action failed", thrown.getMessage());
// 即使 action 失败,锁也应被释放
verify(stringRedisTemplate).execute(
any(DefaultRedisScript.class),
eq(List.of("iot:alarm:lock:400")),
anyString()
);
}
}

View File

@@ -18,6 +18,9 @@ import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import java.time.Duration;
import java.util.function.Supplier;
import static com.viewsh.module.iot.enums.ErrorCodeConstants.*;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
@@ -25,9 +28,12 @@ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;
/**
* {@link IotAlarmRecordServiceImpl} 单元测试(覆盖任务卡 B12 §6.1 的 10 个用例)。
* {@link IotAlarmRecordServiceImpl} 单元测试(覆盖任务卡 B12 §6.1 + B14 集成路径)。
*
* @author B12
* <p>B14 新增依赖({@link IotAlarmLockService}/{@link IotAlarmCacheService}/{@link AlarmStateValidator}
* 均通过 Mockito @Mock 注入,锁默认透传执行(执行 Supplier不模拟冲突冲突用例在专项测试中
*
* @author B12 / B14
*/
@ExtendWith(MockitoExtension.class)
class IotAlarmRecordServiceImplTest {
@@ -38,6 +44,15 @@ class IotAlarmRecordServiceImplTest {
@Mock
private IotAlarmRecordMapper alarmRecordMapper;
@Mock
private IotAlarmLockService lockService;
@Mock
private IotAlarmCacheService cacheService;
@Mock
private AlarmStateValidator validator;
private MockedStatic<TenantContextHolder> tenantMock;
private static final Long TENANT_ID = 1L;
@@ -45,9 +60,19 @@ class IotAlarmRecordServiceImplTest {
private static final Long CONFIG_ID = 200L;
@BeforeEach
@SuppressWarnings("unchecked")
void setUp() {
tenantMock = mockStatic(TenantContextHolder.class);
tenantMock.when(TenantContextHolder::getTenantId).thenReturn(TENANT_ID);
// 默认lockService 直接执行 Supplier无锁冲突— lenient 避免部分测试不触发锁时报 UnnecessaryStubbing
lenient().doAnswer(inv -> {
Supplier<?> supplier = inv.getArgument(2);
return supplier.get();
}).when(lockService).executeWithLock(any(), any(Duration.class), any(Supplier.class));
// 默认validator 允许所有操作 — lenient 避免首次触发路径不调用 validator 时报 UnnecessaryStubbing
lenient().when(validator.isEffectiveTrigger(any(), anyLong())).thenReturn(true);
}
@AfterEach
@@ -91,6 +116,8 @@ class IotAlarmRecordServiceImplTest {
.build();
when(alarmRecordMapper.selectActiveByDeviceAndConfig(DEVICE_ID, CONFIG_ID, TENANT_ID))
.thenReturn(existing);
// 锁内再查最新 DO
when(alarmRecordMapper.selectById(555L)).thenReturn(existing);
Long id = alarmService.triggerAlarm(buildTriggerReq(3));
@@ -260,8 +287,9 @@ class IotAlarmRecordServiceImplTest {
alarmService.ackAlarm(AlarmStateTransitionRequest.builder().alarmId(17L).build());
// 已确认 → no-op
// 已确认 → no-op,不走锁
verify(alarmRecordMapper, never()).updateById(any(IotAlarmRecordDO.class));
verify(lockService, never()).executeWithLock(any(), any(), any());
}
@Test
@@ -273,6 +301,64 @@ class IotAlarmRecordServiceImplTest {
assertEquals(ALARM_RECORD_NOT_EXISTS.getCode(), ex.getCode());
}
// ==================== B14 新增:旧消息忽略测试 ====================
/**
* testTrigger_oldMessage消息时间戳早于 clear_timevalidator 返回 false → 不更新状态
*/
@Test
@SuppressWarnings("unchecked")
void testTrigger_oldMessage() {
IotAlarmRecordDO existing = IotAlarmRecordDO.builder()
.id(888L).deviceId(DEVICE_ID).alarmConfigId(CONFIG_ID)
.ackState(0).clearState(1).archived(0).triggerCount(5)
.build();
when(alarmRecordMapper.selectActiveByDeviceAndConfig(DEVICE_ID, CONFIG_ID, TENANT_ID))
.thenReturn(existing);
// validator 判断为旧消息
when(validator.isEffectiveTrigger(any(), anyLong())).thenReturn(false);
Long id = alarmService.triggerAlarm(buildTriggerReq(3));
assertEquals(888L, id);
// 旧消息不触发 DB 写
verify(alarmRecordMapper, never()).updateById(any(IotAlarmRecordDO.class));
}
/**
* testTrigger_cacheUpdatetrigger 成功后缓存被更新
*/
@Test
void testTrigger_cacheUpdate() {
IotAlarmRecordDO existing = IotAlarmRecordDO.builder()
.id(777L).deviceId(DEVICE_ID).alarmConfigId(CONFIG_ID)
.ackState(0).clearState(0).archived(0).triggerCount(1)
.build();
when(alarmRecordMapper.selectActiveByDeviceAndConfig(DEVICE_ID, CONFIG_ID, TENANT_ID))
.thenReturn(existing);
when(alarmRecordMapper.selectById(777L)).thenReturn(existing);
when(validator.isEffectiveTrigger(any(), anyLong())).thenReturn(true);
alarmService.triggerAlarm(buildTriggerReq(3));
// 缓存应被更新
verify(cacheService, atLeastOnce()).updateState(eq(777L), any(IotAlarmRecordDO.class));
}
/**
* testArchive_cacheEvict归档后缓存被驱逐
*/
@Test
void testArchive_cacheEvict() {
IotAlarmRecordDO active = activeAlarm(666L);
when(alarmRecordMapper.selectById(666L)).thenReturn(active);
alarmService.archiveAlarm(AlarmStateTransitionRequest.builder().alarmId(666L).build());
verify(cacheService).evict(666L);
}
// ==================== 辅助 ====================
private AlarmTriggerRequest buildTriggerReq(int severity) {