Compare commits
3 Commits
3443d4dcd4
...
940a968560
| Author | SHA1 | Date | |
|---|---|---|---|
| 940a968560 | |||
| d34f3bd80d | |||
| 5edbc9f287 |
@@ -19,6 +19,7 @@ import org.springframework.stereotype.Component;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* 蓝牙信标检测规则处理器
|
||||
@@ -53,6 +54,12 @@ public class BeaconDetectionRuleProcessor {
|
||||
@Resource
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
/**
|
||||
* 上次检测的工单ID缓存(设备ID -> 工单ID)
|
||||
* 用于检测工单切换,清理旧工单的检测状态
|
||||
*/
|
||||
private final Map<Long, Long> lastDetectedOrderCache = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 处理蓝牙属性上报
|
||||
* <p>
|
||||
@@ -75,12 +82,27 @@ public class BeaconDetectionRuleProcessor {
|
||||
|
||||
if (currentOrder == null || currentOrder.getAreaId() == null) {
|
||||
log.debug("[BeaconDetection] 无当前工单,跳过检测:deviceId={}", deviceId);
|
||||
// 无工单时清理本地缓存
|
||||
lastDetectedOrderCache.remove(deviceId);
|
||||
return;
|
||||
}
|
||||
|
||||
Long areaId = currentOrder.getAreaId();
|
||||
Long orderId = currentOrder.getOrderId();
|
||||
|
||||
// 3. 检测工单切换,清理旧工单的检测状态
|
||||
Long lastOrderId = lastDetectedOrderCache.get(deviceId);
|
||||
if (lastOrderId != null && !lastOrderId.equals(orderId)) {
|
||||
log.warn("[BeaconDetection] 检测到工单切换,清理旧工单的检测状态: " +
|
||||
"deviceId={}, oldOrderId={}, newOrderId={}", deviceId, lastOrderId, orderId);
|
||||
// 清理旧的检测状态(清理当前设备的所有区域检测状态)
|
||||
cleanupAllDetectionState(deviceId);
|
||||
}
|
||||
// 更新缓存
|
||||
lastDetectedOrderCache.put(deviceId, orderId);
|
||||
|
||||
log.debug("[BeaconDetection] 从工单状态获取区域:deviceId={}, areaId={}, orderId={}",
|
||||
deviceId, areaId, currentOrder.getOrderId());
|
||||
deviceId, areaId, orderId);
|
||||
|
||||
// 3. 获取该区域的信标配置(从 BEACON 类型的设备获取)
|
||||
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper beaconConfigWrapper = configService
|
||||
@@ -331,4 +353,26 @@ public class BeaconDetectionRuleProcessor {
|
||||
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder = badgeDeviceStatusRedisDAO.getCurrentOrder(deviceId);
|
||||
return currentOrder != null && !currentOrder.getAreaId().equals(areaId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理设备所有区域的检测状态
|
||||
* <p>
|
||||
* 用于工单切换场景,清理本地缓存。
|
||||
* Redis 数据(arrivedTime、signalLoss、rssiWindow)由以下路径清理:
|
||||
* <ul>
|
||||
* <li>工单完成时:SignalLossRuleProcessor.cleanupRedisData()</li>
|
||||
* <li>自然过期:Redis TTL 自动清理</li>
|
||||
* <li>新数据覆盖:每次检测都会更新滑动窗口</li>
|
||||
* </ul>
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
*/
|
||||
private void cleanupAllDetectionState(Long deviceId) {
|
||||
if (deviceId == null) {
|
||||
return;
|
||||
}
|
||||
// 清理本地缓存
|
||||
lastDetectedOrderCache.remove(deviceId);
|
||||
log.info("[BeaconDetection] 已清理设备工单切换检测状态: deviceId={}", deviceId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,11 +3,14 @@ package com.viewsh.module.ops.environment.integration.listener;
|
||||
import com.viewsh.module.ops.api.badge.BadgeDeviceStatusDTO;
|
||||
import com.viewsh.module.ops.api.queue.OrderQueueService;
|
||||
import com.viewsh.module.ops.core.event.OrderStateChangedEvent;
|
||||
import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
|
||||
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
|
||||
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
|
||||
import com.viewsh.module.ops.enums.BadgeDeviceStatusEnum;
|
||||
import com.viewsh.module.ops.enums.OperatorTypeEnum;
|
||||
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
|
||||
import com.viewsh.module.ops.environment.service.badge.BadgeDeviceStatusService;
|
||||
import com.viewsh.module.ops.environment.service.badge.BadgeDeviceStatusServiceImpl.BadgeDeviceOfflineEvent;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.event.EventListener;
|
||||
@@ -78,6 +81,9 @@ public class BadgeDeviceStatusEventListener {
|
||||
@Resource
|
||||
private OrderQueueService orderQueueService;
|
||||
|
||||
@Resource
|
||||
private OrderLifecycleManager orderLifecycleManager;
|
||||
|
||||
/**
|
||||
* 监听工单状态变更事件,同步更新设备工单关联
|
||||
* <p>
|
||||
@@ -135,7 +141,7 @@ public class BadgeDeviceStatusEventListener {
|
||||
* </ul>
|
||||
*/
|
||||
private void handleOrderStatusTransition(Long deviceId, Long orderId, WorkOrderStatusEnum newStatus,
|
||||
OrderStateChangedEvent event, OpsOrderDO order) {
|
||||
OrderStateChangedEvent event, OpsOrderDO order) {
|
||||
switch (newStatus) {
|
||||
case DISPATCHED:
|
||||
handleDispatched(deviceId, orderId, order);
|
||||
@@ -170,6 +176,37 @@ public class BadgeDeviceStatusEventListener {
|
||||
* 处理工单推送状态(首次设置工单关联)
|
||||
*/
|
||||
private void handleDispatched(Long deviceId, Long orderId, OpsOrderDO order) {
|
||||
// 检查并清理旧工单(防止工单切换时状态残留)
|
||||
BadgeDeviceStatusDTO deviceStatus = badgeDeviceStatusService.getBadgeStatus(deviceId);
|
||||
if (deviceStatus != null && deviceStatus.getCurrentOpsOrderId() != null) {
|
||||
Long oldOrderId = deviceStatus.getCurrentOpsOrderId();
|
||||
if (!oldOrderId.equals(orderId)) {
|
||||
log.warn("[BadgeDeviceStatusEventListener] 派发新工单时检测到旧工单残留: " +
|
||||
"deviceId={}, oldOrderId={}, newOrderId={}", deviceId, oldOrderId, orderId);
|
||||
|
||||
// 检查旧工单是否仍在进行中,如果是则先取消
|
||||
OpsOrderDO oldOrder = opsOrderMapper.selectById(oldOrderId);
|
||||
if (oldOrder != null) {
|
||||
WorkOrderStatusEnum oldStatus = WorkOrderStatusEnum.fromStatus(oldOrder.getStatus());
|
||||
if (oldStatus == WorkOrderStatusEnum.DISPATCHED
|
||||
|| oldStatus == WorkOrderStatusEnum.CONFIRMED
|
||||
|| oldStatus == WorkOrderStatusEnum.ARRIVED) {
|
||||
// 旧工单仍在进行,先取消
|
||||
log.warn("[BadgeDeviceStatusEventListener] 取消残留的旧工单: oldOrderId={}", oldOrderId);
|
||||
try {
|
||||
orderLifecycleManager.cancelOrder(oldOrderId, deviceId,
|
||||
OperatorTypeEnum.SYSTEM, "新工单派发,自动取消旧工单");
|
||||
} catch (Exception e) {
|
||||
log.error("[BadgeDeviceStatusEventListener] 取消旧工单失败: oldOrderId={}", oldOrderId, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 确保设备状态清理(无论旧工单是否取消成功)
|
||||
badgeDeviceStatusService.clearCurrentOrder(deviceId);
|
||||
}
|
||||
}
|
||||
|
||||
// 设备状态转为 BUSY
|
||||
badgeDeviceStatusService.updateBadgeStatus(deviceId, BadgeDeviceStatusEnum.BUSY, null, "新工单已推送");
|
||||
|
||||
@@ -271,4 +308,61 @@ public class BadgeDeviceStatusEventListener {
|
||||
orderId, currentOrderId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 监听设备离线事件,自动取消未完成的工单
|
||||
*/
|
||||
@EventListener
|
||||
public void onDeviceOffline(BadgeDeviceOfflineEvent event) {
|
||||
try {
|
||||
Long orderId = event.getOrderId();
|
||||
if (orderId == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("[BadgeDeviceStatusEventListener] 收到设备离线事件: deviceId={}, orderId={}, areaId={}",
|
||||
event.getDeviceId(), orderId, event.getAreaId());
|
||||
|
||||
// 查询工单状态
|
||||
OpsOrderDO order = opsOrderMapper.selectById(orderId);
|
||||
if (order == null) {
|
||||
log.warn("[BadgeDeviceStatusEventListener] 工单不存在,清除设备关联: orderId={}", orderId);
|
||||
badgeDeviceStatusService.clearCurrentOrder(event.getDeviceId());
|
||||
return;
|
||||
}
|
||||
|
||||
WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus());
|
||||
|
||||
// 只有进行中的工单才自动取消
|
||||
if (currentStatus == WorkOrderStatusEnum.DISPATCHED
|
||||
|| currentStatus == WorkOrderStatusEnum.CONFIRMED
|
||||
|| currentStatus == WorkOrderStatusEnum.ARRIVED) {
|
||||
|
||||
log.info("[BadgeDeviceStatusEventListener] 设备离线,自动取消工单: orderId={}, currentStatus={}",
|
||||
orderId, currentStatus);
|
||||
|
||||
// 取消工单(系统操作,operatorId 传 null)
|
||||
orderLifecycleManager.cancelOrder(orderId, null,
|
||||
OperatorTypeEnum.SYSTEM, "设备离线自动取消");
|
||||
|
||||
// 注意:工单取消后,handleCancelled 方法会被调用,会自动清理设备状态
|
||||
} else if (currentStatus == WorkOrderStatusEnum.COMPLETED
|
||||
|| currentStatus == WorkOrderStatusEnum.CANCELLED) {
|
||||
// 终态工单,清理设备状态(如果未清理)
|
||||
BadgeDeviceStatusDTO deviceStatus = badgeDeviceStatusService.getBadgeStatus(event.getDeviceId());
|
||||
if (deviceStatus != null && orderId.equals(deviceStatus.getCurrentOpsOrderId())) {
|
||||
log.info("[BadgeDeviceStatusEventListener] 工单已终态,清除设备关联: orderId={}, currentStatus={}",
|
||||
orderId, currentStatus);
|
||||
badgeDeviceStatusService.clearCurrentOrder(event.getDeviceId());
|
||||
}
|
||||
} else {
|
||||
log.warn("[BadgeDeviceStatusEventListener] 工单状态异常,清除设备关联: orderId={}, currentStatus={}",
|
||||
orderId, currentStatus);
|
||||
badgeDeviceStatusService.clearCurrentOrder(event.getDeviceId());
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[BadgeDeviceStatusEventListener] 处理设备离线事件失败: deviceId={}", event.getDeviceId(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,7 +46,7 @@ public class TtsQueueProcessJob {
|
||||
try {
|
||||
int processedCount = ttsQueueConsumer.processAllQueues();
|
||||
if (processedCount > 0) {
|
||||
log.info("[TTS队列Job] 处理完成: 处理 {} 条消<EFBFBD><EFBFBD>", processedCount);
|
||||
log.info("[TTS队列Job] 处理完成: 处理 {} 条消息", processedCount);
|
||||
}
|
||||
return String.format("TTS队列处理完成: 处理 %d 条消息", processedCount);
|
||||
} catch (Exception e) {
|
||||
|
||||
@@ -167,7 +167,7 @@ public interface BadgeDeviceStatusService {
|
||||
void updateOrderStatusAndBeacon(Long deviceId, Long orderId, String orderStatus, String beaconMac);
|
||||
|
||||
/**
|
||||
* 清除当前工单(包括工单ID、工单状态、信标MAC)
|
||||
* 清除当前工单(包括工单ID、工单状态、区域ID、区域名称、信标MAC)
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
*/
|
||||
|
||||
@@ -6,6 +6,7 @@ import com.viewsh.module.ops.service.area.AreaDeviceService;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@@ -41,6 +42,9 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
@Resource
|
||||
private AreaDeviceService areaDeviceService;
|
||||
|
||||
@Resource
|
||||
private ApplicationEventPublisher eventPublisher;
|
||||
|
||||
/**
|
||||
* Redis Key 前缀
|
||||
*/
|
||||
@@ -247,6 +251,19 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
// 获取当前状态
|
||||
Map<Object, Object> currentMap = redisTemplate.opsForHash().entries(key);
|
||||
|
||||
// 设备离线时,检查是否有正在执行的工单,发布事件处理
|
||||
if (status == BadgeDeviceStatusEnum.OFFLINE && currentMap.containsKey("currentOpsOrderId")) {
|
||||
Object currentOrderIdObj = currentMap.get("currentOpsOrderId");
|
||||
Object currentAreaIdObj = currentMap.get("currentAreaId");
|
||||
if (currentOrderIdObj != null) {
|
||||
log.warn("[updateBadgeOnlineStatus] 设备离线,有进行中工单,发布取消事件: deviceId={}, orderId={}",
|
||||
deviceId, currentOrderIdObj);
|
||||
// 发布设备离线事件,由监听器处理工单取消
|
||||
eventPublisher.publishEvent(new BadgeDeviceOfflineEvent(deviceId,
|
||||
getLong(currentOrderIdObj), getLong(currentAreaIdObj), reason));
|
||||
}
|
||||
}
|
||||
|
||||
// 核心修复逻辑:如果不为 OFFLINE (即 IDLE/BUSY),且当前有正在进行的工单,则强制保持 BUSY 状态
|
||||
// 防止设备心跳/上线事件将 BUSY 重置为 IDLE,导致调度状态不一致
|
||||
if (status != BadgeDeviceStatusEnum.OFFLINE && currentMap.containsKey("currentOpsOrderId")) {
|
||||
@@ -282,19 +299,17 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
}
|
||||
|
||||
// 更新区域信息
|
||||
// 注意:currentAreaId 应该表示当前工单所属区域,而非设备物理位置
|
||||
// 只有在没有工单时,才同步 IoT 上报的物理位置
|
||||
// 注意:currentAreaId 仅表示当前工单所属区域,不存储设备物理位置
|
||||
// 无工单时不填充 currentAreaId,只在派单时由工单区域设置
|
||||
if (areaId != null) {
|
||||
// 检查是否有正在执行的工单
|
||||
Object currentOrderId = currentMap.get("currentOpsOrderId");
|
||||
if (currentOrderId != null) {
|
||||
// 有工单:保持 currentAreaId 不变(用工单区域),不覆盖
|
||||
// 有工单:保持 currentAreaId 不变(用工单区域),不覆盖 IoT 物理位置
|
||||
log.debug("设备 {} 有执行中工单 {},保持 currentAreaId 不变,忽略 IoT 上报的物理位置 {}",
|
||||
deviceId, currentOrderId, areaId);
|
||||
} else {
|
||||
// 无工单:用 IoT 上报的物理位置
|
||||
statusMap.put("currentAreaId", areaId);
|
||||
}
|
||||
// 无工单:不填充 currentAreaId,保持为空
|
||||
}
|
||||
|
||||
// 保持当前工单相关字段(如果存在)
|
||||
@@ -417,8 +432,8 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
|
||||
try {
|
||||
String key = BADGE_STATUS_KEY_PREFIX + deviceId;
|
||||
// 清除工单相关字段:currentOpsOrderId、currentOrderStatus、beaconMac
|
||||
redisTemplate.opsForHash().delete(key, "currentOpsOrderId", "currentOrderStatus", "beaconMac");
|
||||
// 清除工单相关字段:currentOpsOrderId、currentOrderStatus、currentAreaId、currentAreaName、beaconMac
|
||||
redisTemplate.opsForHash().delete(key, "currentOpsOrderId", "currentOrderStatus", "currentAreaId", "currentAreaName", "beaconMac");
|
||||
log.info("清除工牌设备当前工单: deviceId={}", deviceId);
|
||||
} catch (Exception e) {
|
||||
log.error("清除工牌设备当前工单失败: deviceId={}", deviceId, e);
|
||||
@@ -585,4 +600,45 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 工牌设备离线事件
|
||||
* <p>
|
||||
* 当设备离线且有正在执行的工单时发布,由监听器处理工单自动取消
|
||||
*/
|
||||
public static class BadgeDeviceOfflineEvent {
|
||||
private final Long deviceId;
|
||||
private final Long orderId;
|
||||
private final Long areaId;
|
||||
private final String reason;
|
||||
private final long timestamp;
|
||||
|
||||
public BadgeDeviceOfflineEvent(Long deviceId, Long orderId, Long areaId, String reason) {
|
||||
this.deviceId = deviceId;
|
||||
this.orderId = orderId;
|
||||
this.areaId = areaId;
|
||||
this.reason = reason;
|
||||
this.timestamp = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
public Long getDeviceId() {
|
||||
return deviceId;
|
||||
}
|
||||
|
||||
public Long getOrderId() {
|
||||
return orderId;
|
||||
}
|
||||
|
||||
public Long getAreaId() {
|
||||
return areaId;
|
||||
}
|
||||
|
||||
public String getReason() {
|
||||
return reason;
|
||||
}
|
||||
|
||||
public long getTimestamp() {
|
||||
return timestamp;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,6 +45,8 @@ public class TtsQueueConsumer {
|
||||
|
||||
private static final String QUEUE_KEY_PREFIX = "ops:tts:queue:";
|
||||
|
||||
private static final String LOCK_KEY_PREFIX = "ops:tts:lock:";
|
||||
|
||||
@Value("${ops.tts.queue.enabled:true}")
|
||||
private boolean queueEnabled;
|
||||
|
||||
@@ -134,6 +136,11 @@ public class TtsQueueConsumer {
|
||||
/**
|
||||
* 处理单个设备的队列(公开方法,供 Job 调用)
|
||||
*
|
||||
* 保证顺序和间隔:
|
||||
* 1. 使用 Redis 存储上次播报时间(跨线程/跨实例共享)
|
||||
* 2. 使用 watch + 事务保证原子性
|
||||
* 3. 如果间隔不够,消息留在队列,下次再试
|
||||
*
|
||||
* @param queueKey 队列 key
|
||||
* @return 是否处理了消息
|
||||
*/
|
||||
@@ -148,23 +155,61 @@ public class TtsQueueConsumer {
|
||||
return false;
|
||||
}
|
||||
|
||||
// 检查设备播报锁
|
||||
String lastBroadcastKey = LOCK_KEY_PREFIX + deviceId;
|
||||
long now = System.currentTimeMillis();
|
||||
Long lastBroadcastTime = deviceBroadcastLock.get(deviceId);
|
||||
if (lastBroadcastTime != null && now - lastBroadcastTime < broadcastIntervalMs) {
|
||||
// 距离上次播报时间太短,跳过
|
||||
log.debug("[TTS队列] 间隔太近跳过: deviceId={}, 间隔={}ms", deviceId, now - lastBroadcastTime);
|
||||
return false;
|
||||
}
|
||||
|
||||
// 从队列中取出一条消息
|
||||
Object messageObj = redisTemplate.opsForList().leftPop(queueKey);
|
||||
if (messageObj == null) {
|
||||
// 队列为空
|
||||
return false;
|
||||
}
|
||||
// 使用 watch 监听上次播报时间 key
|
||||
redisTemplate.watch(lastBroadcastKey);
|
||||
|
||||
try {
|
||||
// 获取上次播报<E692AD><E68AA5><EFBFBD>间
|
||||
Object lastTimeObj = redisTemplate.opsForValue().get(lastBroadcastKey);
|
||||
long lastBroadcastTime = 0;
|
||||
if (lastTimeObj != null) {
|
||||
try {
|
||||
lastBroadcastTime = Long.parseLong(String.valueOf(lastTimeObj));
|
||||
} catch (NumberFormatException e) {
|
||||
log.warn("[TTS队列] 上次播报时间格式异常: deviceId={}, value={}", deviceId, lastTimeObj);
|
||||
lastBroadcastTime = 0;
|
||||
}
|
||||
}
|
||||
|
||||
// 检查间隔
|
||||
if (lastBroadcastTime > 0 && now - lastBroadcastTime < broadcastIntervalMs) {
|
||||
// 间隔不够,取消 watch,消息留在队列下次再试
|
||||
redisTemplate.unwatch();
|
||||
log.debug("[TTS队列] 间隔不够,跳过: deviceId={}, 间隔={}ms, 需要={}ms",
|
||||
deviceId, now - lastBroadcastTime, broadcastIntervalMs);
|
||||
return false;
|
||||
}
|
||||
|
||||
// 开启 Redis 事务
|
||||
redisTemplate.multi();
|
||||
|
||||
// 1. 取出消息
|
||||
redisTemplate.opsForList().leftPop(queueKey);
|
||||
|
||||
// 2. 更新播报时间
|
||||
redisTemplate.opsForValue().set(lastBroadcastKey, String.valueOf(now),
|
||||
broadcastIntervalMs, TimeUnit.MILLISECONDS);
|
||||
|
||||
// 执行事务
|
||||
List<Object> execResult = redisTemplate.exec();
|
||||
if (execResult == null) {
|
||||
// 事务被取消(其他线程修改了 key),跳过
|
||||
log.debug("[TTS队列] 事务被取消,跳过: deviceId={}", deviceId);
|
||||
return false;
|
||||
}
|
||||
|
||||
// 检查取出的消息
|
||||
Object messageObj = execResult.isEmpty() ? null : execResult.get(0);
|
||||
|
||||
if (messageObj == null) {
|
||||
// 队列为空,回滚时间记录
|
||||
redisTemplate.delete(lastBroadcastKey);
|
||||
return false;
|
||||
}
|
||||
|
||||
// 反序列化消息
|
||||
TtsQueueMessage message = parseMessage(messageObj);
|
||||
if (message == null) {
|
||||
@@ -182,18 +227,22 @@ public class TtsQueueConsumer {
|
||||
boolean success = doBroadcast(message);
|
||||
|
||||
if (success) {
|
||||
// 播报成功,更新锁时间
|
||||
// 同步更新内存锁(用于快速检查,非强制)
|
||||
deviceBroadcastLock.put(deviceId, now);
|
||||
log.info("[TTS队列] 播报成功: deviceId={}, text={}",
|
||||
deviceId, message.getText());
|
||||
return true;
|
||||
} else {
|
||||
// 播报失败,检查是否需要重试
|
||||
// 播报失败,清除时间记录(允许立即重试)
|
||||
redisTemplate.delete(lastBroadcastKey);
|
||||
// 检查是否需要重试
|
||||
handleFailure(message, queueKey);
|
||||
return false;
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
// 异常时取消 watch
|
||||
redisTemplate.unwatch();
|
||||
log.error("[TTS队列] 处理消息失败: deviceId={}", deviceId, e);
|
||||
return false;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user