refactor(iot): 移除Redis DAO,使用统一服务接口

- 删除 DeviceCurrentOrderRedisDAO
- 各 RuleProcessor 直接调用 BadgeDeviceStatusService
- 简化调用链,移除中间层

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
lzh
2026-01-25 18:23:15 +08:00
parent 116c1ec773
commit 373f379b01
6 changed files with 35 additions and 264 deletions

View File

@@ -1,178 +0,0 @@
package com.viewsh.module.iot.dal.redis.clean;
import com.viewsh.framework.common.util.json.JsonUtils;
import jakarta.annotation.Resource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Repository;
import java.util.concurrent.TimeUnit;
/**
* 设备当前工单缓存 Redis DAO
* <p>
* 用于缓存设备当前执行的工单信息(由 Ops 下发)
* 减少物联网模块查询数据库的频率
*
* @author AI
*/
@Repository
public class DeviceCurrentOrderRedisDAO {
/**
* 工单缓存 Key 模式
* <p>
* 格式ops:clean:device:order:{deviceId}
*/
private static final String ORDER_KEY_PATTERN = "ops:clean:device:order:%s";
/**
* 工单缓存的 TTL
* <p>
* 默认保留 1 小时
*/
private static final int ORDER_TTL_SECONDS = 3600;
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 缓存设备当前工单
*
* @param deviceId 设备ID
* @param orderInfo 工单缓存信息
*/
public void cacheCurrentOrder(Long deviceId, OrderCacheInfo orderInfo) {
String key = formatKey(deviceId);
String json = JsonUtils.toJsonString(orderInfo);
stringRedisTemplate.opsForValue().set(key, json, ORDER_TTL_SECONDS, TimeUnit.SECONDS);
}
/**
* 获取当前工单
*
* @param deviceId 设备ID
* @return 工单缓存信息,如果不存在返回 null
*/
public OrderCacheInfo getCurrentOrder(Long deviceId) {
String key = formatKey(deviceId);
String json = stringRedisTemplate.opsForValue().get(key);
if (json == null) {
return null;
}
return JsonUtils.parseObject(json, OrderCacheInfo.class);
}
/**
* 清除当前工单缓存
*
* @param deviceId 设备ID
*/
public void clearCurrentOrder(Long deviceId) {
String key = formatKey(deviceId);
stringRedisTemplate.delete(key);
}
/**
* 缓存被暂停的工单ID
* <p>
* 用于 P0 插队场景,恢复被暂停的工单
*
* @param deviceId 设备ID
* @param pausedOrderId 被暂停的工单ID
*/
public void cachePausedOrder(Long deviceId, Long pausedOrderId) {
String key = formatKey(deviceId) + ":paused";
stringRedisTemplate.opsForValue().set(key, String.valueOf(pausedOrderId),
ORDER_TTL_SECONDS, TimeUnit.SECONDS);
}
/**
* 获取被暂停的工单ID
*
* @param deviceId 设备ID
* @return 被暂停的工单ID如果不存在返回 null
*/
public Long getPausedOrderId(Long deviceId) {
String key = formatKey(deviceId) + ":paused";
String orderIdStr = stringRedisTemplate.opsForValue().get(key);
return orderIdStr != null ? Long.parseLong(orderIdStr) : null;
}
/**
* 清除被暂停的工单缓存
*
* @param deviceId 设备ID
*/
public void clearPausedOrder(Long deviceId) {
String key = formatKey(deviceId) + ":paused";
stringRedisTemplate.delete(key);
}
/**
* 格式化 Redis Key
*/
private static String formatKey(Long deviceId) {
return String.format(ORDER_KEY_PATTERN, deviceId);
}
/**
* 工单缓存信息
*/
public static class OrderCacheInfo {
/**
* 工单ID
*/
private Long orderId;
/**
* 工单状态
* <p>
* DISPATCHED/ARRIVED/PAUSED/COMPLETED
*/
private String status;
/**
* 区域ID
*/
private Long areaId;
/**
* 信标 MAC 地址
*/
private String beaconMac;
public Long getOrderId() {
return orderId;
}
public void setOrderId(Long orderId) {
this.orderId = orderId;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public Long getAreaId() {
return areaId;
}
public void setAreaId(Long areaId) {
this.areaId = areaId;
}
public String getBeaconMac() {
return beaconMac;
}
public void setBeaconMac(String beaconMac) {
this.beaconMac = beaconMac;
}
}
}

View File

@@ -4,9 +4,9 @@ import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics;
import com.viewsh.module.iot.core.integration.event.clean.CleanOrderArriveEvent;
import com.viewsh.module.iot.core.integration.event.clean.CleanOrderAuditEvent;
import com.viewsh.module.iot.dal.dataobject.integration.clean.BeaconPresenceConfig;
import com.viewsh.module.iot.dal.redis.clean.BadgeDeviceStatusRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.BeaconArrivedTimeRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.BeaconRssiWindowRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.DeviceCurrentOrderRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.SignalLossRedisDAO;
import com.viewsh.module.iot.service.integration.clean.CleanOrderIntegrationConfigService;
import com.viewsh.module.iot.service.rule.clean.detector.RssiSlidingWindowDetector;
@@ -42,7 +42,7 @@ public class BeaconDetectionRuleProcessor {
private SignalLossRedisDAO signalLossRedisDAO;
@Resource
private DeviceCurrentOrderRedisDAO deviceCurrentOrderRedisDAO;
private BadgeDeviceStatusRedisDAO badgeDeviceStatusRedisDAO;
@Resource
private CleanOrderIntegrationConfigService configService;
@@ -110,8 +110,8 @@ public class BeaconDetectionRuleProcessor {
List<Integer> window = windowRedisDAO.getWindow(deviceId, areaId);
// 6. 获取设备当前工单状态
DeviceCurrentOrderRedisDAO.OrderCacheInfo currentOrder =
deviceCurrentOrderRedisDAO.getCurrentOrder(deviceId);
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder =
badgeDeviceStatusRedisDAO.getCurrentOrder(deviceId);
// 7. 确定当前状态
RssiSlidingWindowDetector.AreaState currentState = determineState(currentOrder, areaId);
@@ -284,7 +284,7 @@ public class BeaconDetectionRuleProcessor {
* 确定当前状态
*/
private RssiSlidingWindowDetector.AreaState determineState(
DeviceCurrentOrderRedisDAO.OrderCacheInfo currentOrder, Long areaId) {
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder, Long areaId) {
if (currentOrder == null) {
return RssiSlidingWindowDetector.AreaState.OUT_AREA;
@@ -308,8 +308,8 @@ public class BeaconDetectionRuleProcessor {
* @return true-工单切换场景false-正常离岗场景
*/
private boolean isSwitchingOrder(Long deviceId, Long areaId) {
DeviceCurrentOrderRedisDAO.OrderCacheInfo currentOrder =
deviceCurrentOrderRedisDAO.getCurrentOrder(deviceId);
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder =
badgeDeviceStatusRedisDAO.getCurrentOrder(deviceId);
return currentOrder != null && !currentOrder.getAreaId().equals(areaId);
}
}

View File

@@ -2,7 +2,7 @@ package com.viewsh.module.iot.service.rule.clean.processor;
import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics;
import com.viewsh.module.iot.dal.dataobject.integration.clean.ButtonEventConfig;
import com.viewsh.module.iot.dal.redis.clean.DeviceCurrentOrderRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.BadgeDeviceStatusRedisDAO;
import com.viewsh.module.iot.service.integration.clean.CleanOrderIntegrationConfigService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
@@ -34,7 +34,7 @@ public class ButtonEventRuleProcessor {
private CleanOrderIntegrationConfigService configService;
@Resource
private DeviceCurrentOrderRedisDAO deviceCurrentOrderRedisDAO;
private BadgeDeviceStatusRedisDAO badgeDeviceStatusRedisDAO;
@Resource
private RocketMQTemplate rocketMQTemplate;
@@ -104,20 +104,15 @@ public class ButtonEventRuleProcessor {
log.info("[ButtonEvent] 确认键按下deviceId={}, buttonId={}", deviceId, buttonId);
// 1. 查询设备当前工单
DeviceCurrentOrderRedisDAO.OrderCacheInfo currentOrderJson = deviceCurrentOrderRedisDAO.getCurrentOrder(deviceId);
if (currentOrderJson == null) {
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder = badgeDeviceStatusRedisDAO.getCurrentOrder(deviceId);
if (currentOrder == null) {
log.warn("[ButtonEvent] 设备无当前工单跳过确认deviceId={}", deviceId);
return;
}
// 2. 解析工单ID简单解析生产环境可用 JSON 库)
Long orderId = extractOrderIdFromJson(currentOrderJson.getOrderId().toString());
if (orderId == null) {
log.warn("[ButtonEvent] 工单ID解析失败deviceId={}, json={}", deviceId, currentOrderJson);
return;
}
Long orderId = currentOrder.getOrderId();
// 3. 防重复检查(短时间内同一工单的确认操作去重)
// 2. 防重复检查(短时间内同一工单的确认操作去重)
String dedupKey = String.format("iot:clean:button:dedup:confirm:%s:%s", deviceId, orderId);
Boolean firstTime = stringRedisTemplate.opsForValue()
.setIfAbsent(dedupKey, "1", 10, java.util.concurrent.TimeUnit.SECONDS);
@@ -127,7 +122,7 @@ public class ButtonEventRuleProcessor {
return;
}
// 4. 发布工单确认事件
// 3. 发布工单确认事件
publishConfirmEvent(configWrapper, orderId, buttonId);
log.info("[ButtonEvent] 发布工单确认事件deviceId={}, orderId={}", deviceId, orderId);
@@ -145,25 +140,18 @@ public class ButtonEventRuleProcessor {
log.info("[ButtonEvent] 查询键按下deviceId={}, buttonId={}", deviceId, buttonId);
// 1. 查询设备当前工单
DeviceCurrentOrderRedisDAO.OrderCacheInfo currentOrderJson = deviceCurrentOrderRedisDAO.getCurrentOrder(deviceId);
if (currentOrderJson == null) {
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder = badgeDeviceStatusRedisDAO.getCurrentOrder(deviceId);
if (currentOrder == null) {
log.info("[ButtonEvent] 设备无当前工单deviceId={}", deviceId);
// 发布查询结果事件(无工单)
publishQueryEvent(configWrapper, null, buttonId, "当前无工单");
return;
}
// 2. 解析工单信息
Long orderId = extractOrderIdFromJson(currentOrderJson.getOrderId().toString());
if (orderId == null) {
log.warn("[ButtonEvent] 工单ID解析失败deviceId={}, json={}", deviceId, currentOrderJson);
return;
}
// 2. 发布查询事件
publishQueryEvent(configWrapper, currentOrder.getOrderId(), buttonId, "查询当前工单");
// 3. 发布查询事件
publishQueryEvent(configWrapper, orderId, buttonId, "查询当前工单");
log.info("[ButtonEvent] 发布工单查询事件deviceId={}, orderId={}", deviceId, orderId);
log.info("[ButtonEvent] 发布工单查询事件deviceId={}, orderId={}", deviceId, currentOrder.getOrderId());
}
/**
@@ -225,41 +213,6 @@ public class ButtonEventRuleProcessor {
}
}
/**
* 从 JSON 字符串中提取工单ID
* <p>
* 简单实现,生产环境建议使用 Jackson 或 Gson
*/
private Long extractOrderIdFromJson(String json) {
try {
// 简单解析:查找 "orderId":12345
int orderIdIndex = json.indexOf("\"orderId\"");
if (orderIdIndex == -1) {
return null;
}
int colonIndex = json.indexOf(":", orderIdIndex);
if (colonIndex == -1) {
return null;
}
int endIndex = json.indexOf(",", colonIndex);
if (endIndex == -1) {
endIndex = json.indexOf("}", colonIndex);
}
if (endIndex == -1) {
return null;
}
String idStr = json.substring(colonIndex + 1, endIndex).trim();
return Long.parseLong(idStr);
} catch (Exception e) {
log.error("[ButtonEvent] 解析工单ID失败json={}", json, e);
return null;
}
}
/**
* 获取配置包装器
*/

View File

@@ -4,9 +4,9 @@ import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics;
import com.viewsh.module.iot.core.integration.event.clean.CleanOrderAuditEvent;
import com.viewsh.module.iot.core.integration.event.clean.CleanOrderCompleteEvent;
import com.viewsh.module.iot.dal.dataobject.integration.clean.BeaconPresenceConfig;
import com.viewsh.module.iot.dal.redis.clean.BadgeDeviceStatusRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.BeaconArrivedTimeRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.BeaconRssiWindowRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.DeviceCurrentOrderRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.SignalLossRedisDAO;
import com.viewsh.module.iot.service.integration.clean.CleanOrderIntegrationConfigService;
import com.xxl.job.core.handler.annotation.XxlJob;
@@ -43,7 +43,7 @@ public class SignalLossRuleProcessor {
private BeaconRssiWindowRedisDAO windowRedisDAO;
@Resource
private DeviceCurrentOrderRedisDAO deviceCurrentOrderRedisDAO;
private BadgeDeviceStatusRedisDAO badgeDeviceStatusRedisDAO;
@Resource
private CleanOrderIntegrationConfigService configService;
@@ -85,12 +85,12 @@ public class SignalLossRuleProcessor {
// 解析 deviceId 和 areaId
// Key 格式iot:clean:signal:loss:{deviceId}:{areaId}
String[] parts = key.split(":");
if (parts.length < 6) {
continue;
}
Long deviceId = Long.parseLong(parts[4]);
Long areaId = Long.parseLong(parts[5]);
if (parts.length < 6) {
continue;
}
Long deviceId = Long.parseLong(parts[4]);
Long areaId = Long.parseLong(parts[5]);
// 检查超时
checkTimeoutForDevice(deviceId, areaId);
@@ -227,8 +227,8 @@ public class SignalLossRuleProcessor {
deviceId, areaId, durationMs);
// 1. 获取当前工单
DeviceCurrentOrderRedisDAO.OrderCacheInfo currentOrder =
deviceCurrentOrderRedisDAO.getCurrentOrder(deviceId);
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder =
badgeDeviceStatusRedisDAO.getCurrentOrder(deviceId);
if (currentOrder == null) {
log.warn("[SignalLoss] 设备无当前工单deviceId={}", deviceId);
@@ -332,8 +332,8 @@ public class SignalLossRuleProcessor {
* @return true-工单切换场景false-正常离岗场景
*/
private boolean isSwitchingOrder(Long deviceId, Long areaId) {
DeviceCurrentOrderRedisDAO.OrderCacheInfo currentOrder =
deviceCurrentOrderRedisDAO.getCurrentOrder(deviceId);
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder =
badgeDeviceStatusRedisDAO.getCurrentOrder(deviceId);
return currentOrder != null && !currentOrder.getAreaId().equals(areaId);
}
}

View File

@@ -3,7 +3,6 @@ package com.viewsh.module.iot.service.rule.clean.processor;
import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics;
import com.viewsh.module.iot.core.integration.event.clean.CleanOrderCreateEvent;
import com.viewsh.module.iot.dal.dataobject.integration.clean.TrafficThresholdConfig;
import com.viewsh.module.iot.dal.redis.clean.DeviceCurrentOrderRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.TrafficCounterBaseRedisDAO;
import com.viewsh.module.iot.service.integration.clean.CleanOrderIntegrationConfigService;
import jakarta.annotation.Resource;
@@ -34,9 +33,6 @@ public class TrafficThresholdRuleProcessor {
@Resource
private TrafficCounterBaseRedisDAO trafficBaseRedisDAO;
@Resource
private DeviceCurrentOrderRedisDAO deviceCurrentOrderRedisDAO;
@Resource
private RocketMQTemplate rocketMQTemplate;

View File

@@ -3,9 +3,9 @@ package com.viewsh.module.iot.service.rule.clean.processor;
import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics;
import com.viewsh.module.iot.dal.dataobject.integration.clean.BeaconPresenceConfig;
import com.viewsh.module.iot.dal.dataobject.integration.clean.CleanOrderIntegrationConfig;
import com.viewsh.module.iot.dal.redis.clean.BadgeDeviceStatusRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.BeaconArrivedTimeRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.BeaconRssiWindowRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.DeviceCurrentOrderRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.SignalLossRedisDAO;
import com.viewsh.module.iot.service.integration.clean.CleanOrderIntegrationConfigService;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
@@ -37,7 +37,7 @@ class SignalLossRuleProcessorTest {
@Mock
private BeaconRssiWindowRedisDAO windowRedisDAO;
@Mock
private DeviceCurrentOrderRedisDAO deviceCurrentOrderRedisDAO;
private BadgeDeviceStatusRedisDAO badgeDeviceStatusRedisDAO;
@Mock
private CleanOrderIntegrationConfigService configService;
@Mock
@@ -71,10 +71,10 @@ class SignalLossRuleProcessorTest {
when(arrivedTimeRedisDAO.getArrivedTime(DEVICE_ID, AREA_ID)).thenReturn(arrivedTime);
// Mock Current Order
DeviceCurrentOrderRedisDAO.OrderCacheInfo orderInfo = new DeviceCurrentOrderRedisDAO.OrderCacheInfo();
BadgeDeviceStatusRedisDAO.OrderInfo orderInfo = new BadgeDeviceStatusRedisDAO.OrderInfo();
orderInfo.setOrderId(500L);
orderInfo.setAreaId(AREA_ID); // Same area, valid
when(deviceCurrentOrderRedisDAO.getCurrentOrder(DEVICE_ID)).thenReturn(orderInfo);
when(badgeDeviceStatusRedisDAO.getCurrentOrder(DEVICE_ID)).thenReturn(orderInfo);
// Mock Config
BeaconPresenceConfig.ExitConfig exitConfig = new BeaconPresenceConfig.ExitConfig();