fix(ops): BADGE 绑定/解绑后即时同步工牌缓存
问题:
1. 设备先上线、后绑定为 BADGE 时,"可分配工牌"列表迟迟不出现该设备;
2. 已绑定区域的设备收不到工单(被分派策略过滤掉)。
根因:实时上线路径 BadgeDeviceStatusEventHandler.onMessage 在写
ops:badge:status:{deviceId} Redis 缓存前,先 isBadgeDevice() 校验
ops_area_device_relation 是否存在 BADGE 关系。设备如果在绑定前就上线,
事件被丢弃;建立关系后又没有任何机制回填 Redis,得等 5/30 分钟的
BadgeDeviceStatusSyncJob 对账才会被发现(线上日志可见 deviceId=58 在
绑定后 30 分钟才被对账修正:reason=定时对账修正-上线)。
解绑同样有反向缺口:关系记录删了但 Redis 缓存得等 24h TTL 自然过期,
期间 listAvailableBadges 仍可能返回该设备。
修复:在 ops-biz 引入 AreaDeviceBoundEvent / AreaDeviceUnboundEvent,
bindDevice / unbindDevice 成功后通过 ApplicationEventPublisher 发布;
environment-biz 新增 BadgeAreaBoundEventListener 仅订阅 BADGE 类型,
使用 @TransactionalEventListener(AFTER_COMMIT) + @Async 确保事务提交后
异步执行不阻塞接口:
- 绑定:单次调 IotDeviceQueryApi.getDevice 取齐 state + nickname +
deviceName,根据状态写 Redis(IDLE/OFFLINE);
- 解绑:直接调 BadgeDeviceStatusService.deleteBadgeStatus 清理 Redis。
依赖方向遵循 environment-biz → ops-biz;ops-biz 不反向依赖条线模块,
通过事件解耦,与现有 OrderStateChangedEvent 模式一致。
测试:
- AreaDeviceRelationServiceTest 补 iotDeviceQueryApi mock 让原本因缺
mock 而 RED 的 testBindDevice_TrafficCounter_Success 转绿;
- 新增对 bound / unbound 事件 publishEvent 调用的 verify。
- 全套 10/10 通过。
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,111 @@
|
|||||||
|
package com.viewsh.module.ops.environment.integration.listener;
|
||||||
|
|
||||||
|
import com.viewsh.framework.common.pojo.CommonResult;
|
||||||
|
import com.viewsh.module.iot.api.device.IotDeviceQueryApi;
|
||||||
|
import com.viewsh.module.iot.api.device.dto.IotDeviceSimpleRespDTO;
|
||||||
|
import com.viewsh.module.ops.enums.BadgeDeviceStatusEnum;
|
||||||
|
import com.viewsh.module.ops.environment.integration.dto.IotDeviceStatusChangedEventDTO;
|
||||||
|
import com.viewsh.module.ops.environment.service.badge.BadgeDeviceStatusService;
|
||||||
|
import com.viewsh.module.ops.service.area.event.AreaDeviceBoundEvent;
|
||||||
|
import com.viewsh.module.ops.service.area.event.AreaDeviceUnboundEvent;
|
||||||
|
import jakarta.annotation.Resource;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.scheduling.annotation.Async;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.transaction.event.TransactionPhase;
|
||||||
|
import org.springframework.transaction.event.TransactionalEventListener;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 区域-工牌设备绑定/解绑事件监听器
|
||||||
|
* <p>
|
||||||
|
* 绑定({@link AreaDeviceBoundEvent}):
|
||||||
|
* BADGE 关系建立前,IoT 实时上线事件会被 {@code BadgeDeviceStatusEventHandler.isBadgeDevice()}
|
||||||
|
* 拒掉;建立关系后没有任何机制回填 Redis,导致设备直到下次定时对账(5/30 分钟)才会出现在
|
||||||
|
* "可分配工牌"列表,期间收到的工单也无法派给该设备。监听器在绑定事务提交后定向查询一次
|
||||||
|
* IoT 设备信息(含状态、昵称),回写 Ops 工牌缓存。
|
||||||
|
* <p>
|
||||||
|
* 解绑({@link AreaDeviceUnboundEvent}):
|
||||||
|
* 解绑后 SyncJob 因关系记录消失不会再扫到该设备,Redis 工牌缓存得等 24h TTL 自然过期,
|
||||||
|
* 期间该设备仍可能出现在"可分配/活跃工牌"列表里。监听器在解绑事务提交后立即清理 Redis 状态,
|
||||||
|
* 与绑定路径形成闭环。
|
||||||
|
* <p>
|
||||||
|
* 二者均使用 AFTER_COMMIT + @Async:事务提交后才在独立线程执行,不阻塞绑定/解绑接口响应。
|
||||||
|
*
|
||||||
|
* @author lzh
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
public class BadgeAreaBoundEventListener {
|
||||||
|
|
||||||
|
private static final String TYPE_BADGE = "BADGE";
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private IotDeviceQueryApi iotDeviceQueryApi;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private BadgeDeviceStatusService badgeDeviceStatusService;
|
||||||
|
|
||||||
|
@Async("ops-task-executor")
|
||||||
|
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, fallbackExecution = true)
|
||||||
|
public void onAreaDeviceBound(AreaDeviceBoundEvent event) {
|
||||||
|
if (event == null || !TYPE_BADGE.equals(event.getRelationType())) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Long deviceId = event.getDeviceId();
|
||||||
|
Long areaId = event.getAreaId();
|
||||||
|
if (deviceId == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 单次 RPC 取齐 state + nickname + deviceName(IotDeviceSimpleRespDTO 已含 state 字段)
|
||||||
|
CommonResult<IotDeviceSimpleRespDTO> result = iotDeviceQueryApi.getDevice(deviceId);
|
||||||
|
if (result == null || !result.isSuccess() || result.getData() == null) {
|
||||||
|
log.warn("[BadgeAreaBoundEventListener] 查询 IoT 设备失败,跳过回填: deviceId={}, msg={}",
|
||||||
|
deviceId, result != null ? result.getMsg() : "null");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
IotDeviceSimpleRespDTO device = result.getData();
|
||||||
|
|
||||||
|
// IotDeviceSimpleRespDTO.state 与 IotDeviceStatusChangedEventDTO 的 status 编码一致
|
||||||
|
// (0=未激活,1=在线,2=离线),未激活/离线统一回写 OFFLINE
|
||||||
|
BadgeDeviceStatusEnum target = IotDeviceStatusChangedEventDTO.STATUS_ONLINE.equals(device.getState())
|
||||||
|
? BadgeDeviceStatusEnum.IDLE
|
||||||
|
: BadgeDeviceStatusEnum.OFFLINE;
|
||||||
|
|
||||||
|
badgeDeviceStatusService.updateBadgeOnlineStatus(
|
||||||
|
deviceId,
|
||||||
|
device.getDeviceName(),
|
||||||
|
device.getNickname(),
|
||||||
|
target == BadgeDeviceStatusEnum.IDLE ? areaId : null,
|
||||||
|
target,
|
||||||
|
"BADGE 绑定后回填");
|
||||||
|
|
||||||
|
log.info("[BadgeAreaBoundEventListener] 工牌设备状态回填完成: deviceId={}, areaId={}, target={}",
|
||||||
|
deviceId, areaId, target);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[BadgeAreaBoundEventListener] 工牌设备状态回填失败: deviceId={}, areaId={}",
|
||||||
|
deviceId, areaId, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Async("ops-task-executor")
|
||||||
|
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, fallbackExecution = true)
|
||||||
|
public void onAreaDeviceUnbound(AreaDeviceUnboundEvent event) {
|
||||||
|
if (event == null || !TYPE_BADGE.equals(event.getRelationType())) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Long deviceId = event.getDeviceId();
|
||||||
|
if (deviceId == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
badgeDeviceStatusService.deleteBadgeStatus(deviceId);
|
||||||
|
log.info("[BadgeAreaBoundEventListener] 工牌设备解绑后 Redis 状态已清理: deviceId={}, areaId={}",
|
||||||
|
deviceId, event.getAreaId());
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[BadgeAreaBoundEventListener] 工牌设备解绑后清理失败: deviceId={}", deviceId, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -12,8 +12,11 @@ import com.viewsh.module.ops.dal.dataobject.vo.area.AreaDeviceBindReqVO;
|
|||||||
import com.viewsh.module.ops.dal.dataobject.vo.area.AreaDeviceRelationRespVO;
|
import com.viewsh.module.ops.dal.dataobject.vo.area.AreaDeviceRelationRespVO;
|
||||||
import com.viewsh.module.ops.dal.dataobject.vo.area.AreaDeviceUpdateReqVO;
|
import com.viewsh.module.ops.dal.dataobject.vo.area.AreaDeviceUpdateReqVO;
|
||||||
import com.viewsh.module.ops.enums.ErrorCodeConstants;
|
import com.viewsh.module.ops.enums.ErrorCodeConstants;
|
||||||
|
import com.viewsh.module.ops.service.area.event.AreaDeviceBoundEvent;
|
||||||
|
import com.viewsh.module.ops.service.area.event.AreaDeviceUnboundEvent;
|
||||||
import jakarta.annotation.Resource;
|
import jakarta.annotation.Resource;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.context.ApplicationEventPublisher;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
import org.springframework.validation.annotation.Validated;
|
import org.springframework.validation.annotation.Validated;
|
||||||
@@ -46,6 +49,9 @@ public class AreaDeviceRelationServiceImpl implements AreaDeviceRelationService
|
|||||||
@Resource
|
@Resource
|
||||||
private AreaDeviceService areaDeviceService;
|
private AreaDeviceService areaDeviceService;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private ApplicationEventPublisher eventPublisher;
|
||||||
|
|
||||||
private static final String TYPE_TRAFFIC_COUNTER = "TRAFFIC_COUNTER";
|
private static final String TYPE_TRAFFIC_COUNTER = "TRAFFIC_COUNTER";
|
||||||
private static final String TYPE_BEACON = "BEACON";
|
private static final String TYPE_BEACON = "BEACON";
|
||||||
private static final String TYPE_BADGE = "BADGE";
|
private static final String TYPE_BADGE = "BADGE";
|
||||||
@@ -116,6 +122,16 @@ public class AreaDeviceRelationServiceImpl implements AreaDeviceRelationService
|
|||||||
// 清除可能存在的 NULL_CACHE 标记
|
// 清除可能存在的 NULL_CACHE 标记
|
||||||
areaDeviceService.evictConfigCache(relation.getAreaId(), relation.getRelationType());
|
areaDeviceService.evictConfigCache(relation.getAreaId(), relation.getRelationType());
|
||||||
|
|
||||||
|
// 发布绑定事件
|
||||||
|
// 用途:BADGE 绑定前的实时上线事件会被丢弃(无 BADGE 关系),
|
||||||
|
// 条线监听器订阅此事件后可立即从 IoT 拉取当前状态,回填 Redis 工牌缓存,
|
||||||
|
// 避免新绑定的设备直到下次 5/30 分钟对账才能被派单或显示在"可分配工牌"列表。
|
||||||
|
eventPublisher.publishEvent(AreaDeviceBoundEvent.builder()
|
||||||
|
.areaId(relation.getAreaId())
|
||||||
|
.deviceId(relation.getDeviceId())
|
||||||
|
.relationType(relation.getRelationType())
|
||||||
|
.build());
|
||||||
|
|
||||||
return relation.getId();
|
return relation.getId();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -158,6 +174,15 @@ public class AreaDeviceRelationServiceImpl implements AreaDeviceRelationService
|
|||||||
if (deleted) {
|
if (deleted) {
|
||||||
// 同步 Redis 缓存
|
// 同步 Redis 缓存
|
||||||
areaDeviceService.evictConfigCache(existing.getAreaId(), existing.getRelationType());
|
areaDeviceService.evictConfigCache(existing.getAreaId(), existing.getRelationType());
|
||||||
|
|
||||||
|
// 发布解绑事件,与绑定路径形成闭环
|
||||||
|
// 用途:解绑后 SyncJob 不再扫到该设备,BADGE 类型 Redis 缓存得等 24h TTL 才过期,
|
||||||
|
// 期间设备仍可能出现在"可分配工牌"列表里。条线监听器收到事件立即清理 Redis。
|
||||||
|
eventPublisher.publishEvent(AreaDeviceUnboundEvent.builder()
|
||||||
|
.areaId(existing.getAreaId())
|
||||||
|
.deviceId(existing.getDeviceId())
|
||||||
|
.relationType(existing.getRelationType())
|
||||||
|
.build());
|
||||||
}
|
}
|
||||||
return deleted;
|
return deleted;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,34 @@
|
|||||||
|
package com.viewsh.module.ops.service.area.event;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Builder;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 区域-设备绑定完成事件
|
||||||
|
* <p>
|
||||||
|
* 在 {@code AreaDeviceRelationService.bindDevice()} 成功插入关系记录后发布。
|
||||||
|
* <p>
|
||||||
|
* 业务背景:BADGE 关系建立前,IoT 实时上线事件会因 {@code BadgeDeviceStatusEventHandler.isBadgeDevice()}
|
||||||
|
* 返回 false 而被丢弃;建立关系后没有任何机制回填 Redis,需等定时对账 Job 才能恢复,
|
||||||
|
* 表现为 "可分配工牌列表" 不出现新绑定的设备、新工单也不会派给它。
|
||||||
|
* 监听方(条线模块)通过订阅本事件完成一次定向状态同步。
|
||||||
|
*
|
||||||
|
* @author lzh
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@Builder
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class AreaDeviceBoundEvent {
|
||||||
|
|
||||||
|
/** 区域ID */
|
||||||
|
private Long areaId;
|
||||||
|
|
||||||
|
/** 设备ID */
|
||||||
|
private Long deviceId;
|
||||||
|
|
||||||
|
/** 关联类型:TRAFFIC_COUNTER / BEACON / BADGE */
|
||||||
|
private String relationType;
|
||||||
|
}
|
||||||
@@ -0,0 +1,33 @@
|
|||||||
|
package com.viewsh.module.ops.service.area.event;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Builder;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 区域-设备解绑完成事件
|
||||||
|
* <p>
|
||||||
|
* 在 {@code AreaDeviceRelationService.unbindDevice()} 成功删除关系记录后发布。
|
||||||
|
* <p>
|
||||||
|
* 业务背景:BADGE 解绑后 SyncJob 不再扫到该设备,Redis 工牌缓存等 24h TTL 才过期,
|
||||||
|
* 期间该设备仍可能出现在"可分配/活跃工牌"列表里。条线监听器订阅本事件后立即清理 Redis,
|
||||||
|
* 与 {@link AreaDeviceBoundEvent} 的回填路径形成闭环。
|
||||||
|
*
|
||||||
|
* @author lzh
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@Builder
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class AreaDeviceUnboundEvent {
|
||||||
|
|
||||||
|
/** 区域ID */
|
||||||
|
private Long areaId;
|
||||||
|
|
||||||
|
/** 设备ID */
|
||||||
|
private Long deviceId;
|
||||||
|
|
||||||
|
/** 关联类型:TRAFFIC_COUNTER / BEACON / BADGE */
|
||||||
|
private String relationType;
|
||||||
|
}
|
||||||
@@ -1,5 +1,8 @@
|
|||||||
package com.viewsh.module.ops.service.area;
|
package com.viewsh.module.ops.service.area;
|
||||||
|
|
||||||
|
import com.viewsh.framework.common.pojo.CommonResult;
|
||||||
|
import com.viewsh.module.iot.api.device.IotDeviceQueryApi;
|
||||||
|
import com.viewsh.module.iot.api.device.dto.IotDeviceSimpleRespDTO;
|
||||||
import com.viewsh.module.ops.dal.dataobject.area.OpsAreaDeviceRelationDO;
|
import com.viewsh.module.ops.dal.dataobject.area.OpsAreaDeviceRelationDO;
|
||||||
import com.viewsh.module.ops.dal.dataobject.area.OpsBusAreaDO;
|
import com.viewsh.module.ops.dal.dataobject.area.OpsBusAreaDO;
|
||||||
import com.viewsh.module.ops.dal.mysql.area.OpsAreaDeviceRelationMapper;
|
import com.viewsh.module.ops.dal.mysql.area.OpsAreaDeviceRelationMapper;
|
||||||
@@ -15,6 +18,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
|
|||||||
import org.mockito.InjectMocks;
|
import org.mockito.InjectMocks;
|
||||||
import org.mockito.Mock;
|
import org.mockito.Mock;
|
||||||
import org.mockito.junit.jupiter.MockitoExtension;
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
import org.springframework.context.ApplicationEventPublisher;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
@@ -49,6 +53,12 @@ class AreaDeviceRelationServiceTest {
|
|||||||
@Mock
|
@Mock
|
||||||
private AreaDeviceService areaDeviceService;
|
private AreaDeviceService areaDeviceService;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private ApplicationEventPublisher eventPublisher;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private IotDeviceQueryApi iotDeviceQueryApi;
|
||||||
|
|
||||||
@InjectMocks
|
@InjectMocks
|
||||||
private AreaDeviceRelationServiceImpl areaDeviceRelationService;
|
private AreaDeviceRelationServiceImpl areaDeviceRelationService;
|
||||||
|
|
||||||
@@ -121,12 +131,22 @@ class AreaDeviceRelationServiceTest {
|
|||||||
return 1;
|
return 1;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// bindDevice 内部会调 IoT 接口阻断式校验设备存在性
|
||||||
|
IotDeviceSimpleRespDTO iotDevice = new IotDeviceSimpleRespDTO();
|
||||||
|
iotDevice.setId(50001L);
|
||||||
|
iotDevice.setDeviceName("TRAFFIC_COUNTER_001");
|
||||||
|
iotDevice.setProductId(10L);
|
||||||
|
iotDevice.setProductKey("traffic_counter_v1");
|
||||||
|
when(iotDeviceQueryApi.getDevice(50001L)).thenReturn(CommonResult.success(iotDevice));
|
||||||
|
|
||||||
// When
|
// When
|
||||||
Long relationId = areaDeviceRelationService.bindDevice(bindReq);
|
Long relationId = areaDeviceRelationService.bindDevice(bindReq);
|
||||||
|
|
||||||
// Then
|
// Then
|
||||||
assertNotNull(relationId);
|
assertNotNull(relationId);
|
||||||
verify(opsAreaDeviceRelationMapper, times(1)).insert(any(OpsAreaDeviceRelationDO.class));
|
verify(opsAreaDeviceRelationMapper, times(1)).insert(any(OpsAreaDeviceRelationDO.class));
|
||||||
|
// 验证绑定成功后发布事件,供条线监听器回填 Redis
|
||||||
|
verify(eventPublisher, times(1)).publishEvent(any(com.viewsh.module.ops.service.area.event.AreaDeviceBoundEvent.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -225,6 +245,8 @@ class AreaDeviceRelationServiceTest {
|
|||||||
// Then
|
// Then
|
||||||
assertTrue(result);
|
assertTrue(result);
|
||||||
verify(opsAreaDeviceRelationMapper, times(1)).deleteById(1L);
|
verify(opsAreaDeviceRelationMapper, times(1)).deleteById(1L);
|
||||||
|
// 验证解绑后发布事件,供条线监听器清理 Redis
|
||||||
|
verify(eventPublisher, times(1)).publishEvent(any(com.viewsh.module.ops.service.area.event.AreaDeviceUnboundEvent.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -238,6 +260,8 @@ class AreaDeviceRelationServiceTest {
|
|||||||
// Then
|
// Then
|
||||||
assertFalse(result); // 第一次就返回false
|
assertFalse(result); // 第一次就返回false
|
||||||
verify(opsAreaDeviceRelationMapper, never()).deleteById(anyLong());
|
verify(opsAreaDeviceRelationMapper, never()).deleteById(anyLong());
|
||||||
|
// 不存在的关联不应触发事件
|
||||||
|
verify(eventPublisher, never()).publishEvent(any());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user