diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/TtsQueueConsumer.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/TtsQueueConsumer.java index 80cd9f9..c9bc254 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/TtsQueueConsumer.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/TtsQueueConsumer.java @@ -45,6 +45,8 @@ public class TtsQueueConsumer { private static final String QUEUE_KEY_PREFIX = "ops:tts:queue:"; + private static final String LOCK_KEY_PREFIX = "ops:tts:lock:"; + @Value("${ops.tts.queue.enabled:true}") private boolean queueEnabled; @@ -134,6 +136,11 @@ public class TtsQueueConsumer { /** * 处理单个设备的队列(公开方法,供 Job 调用) * + * 保证顺序和间隔: + * 1. 使用 Redis 存储上次播报时间(跨线程/跨实例共享) + * 2. 使用 watch + 事务保证原子性 + * 3. 如果间隔不够,消息留在队列,下次再试 + * * @param queueKey 队列 key * @return 是否处理了消息 */ @@ -148,23 +155,61 @@ public class TtsQueueConsumer { return false; } - // 检查设备播报锁 + String lastBroadcastKey = LOCK_KEY_PREFIX + deviceId; long now = System.currentTimeMillis(); - Long lastBroadcastTime = deviceBroadcastLock.get(deviceId); - if (lastBroadcastTime != null && now - lastBroadcastTime < broadcastIntervalMs) { - // 距离上次播报时间太短,跳过 - log.debug("[TTS队列] 间隔太近跳过: deviceId={}, 间隔={}ms", deviceId, now - lastBroadcastTime); - return false; - } - // 从队列中取出一条消息 - Object messageObj = redisTemplate.opsForList().leftPop(queueKey); - if (messageObj == null) { - // 队列为空 - return false; - } + // 使用 watch 监听上次播报时间 key + redisTemplate.watch(lastBroadcastKey); try { + // 获取上次播报���间 + Object lastTimeObj = redisTemplate.opsForValue().get(lastBroadcastKey); + long lastBroadcastTime = 0; + if (lastTimeObj != null) { + try { + lastBroadcastTime = Long.parseLong(String.valueOf(lastTimeObj)); + } catch (NumberFormatException e) { + log.warn("[TTS队列] 上次播报时间格式异常: deviceId={}, value={}", deviceId, lastTimeObj); + lastBroadcastTime = 0; + } + } + + // 检查间隔 + if (lastBroadcastTime > 0 && now - lastBroadcastTime < broadcastIntervalMs) { + // 间隔不够,取消 watch,消息留在队列下次再试 + redisTemplate.unwatch(); + log.debug("[TTS队列] 间隔不够,跳过: deviceId={}, 间隔={}ms, 需要={}ms", + deviceId, now - lastBroadcastTime, broadcastIntervalMs); + return false; + } + + // 开启 Redis 事务 + redisTemplate.multi(); + + // 1. 取出消息 + redisTemplate.opsForList().leftPop(queueKey); + + // 2. 更新播报时间 + redisTemplate.opsForValue().set(lastBroadcastKey, String.valueOf(now), + broadcastIntervalMs, TimeUnit.MILLISECONDS); + + // 执行事务 + List execResult = redisTemplate.exec(); + if (execResult == null) { + // 事务被取消(其他线程修改了 key),跳过 + log.debug("[TTS队列] 事务被取消,跳过: deviceId={}", deviceId); + return false; + } + + // 检查取出的消息 + Object messageObj = execResult.isEmpty() ? null : execResult.get(0); + + if (messageObj == null) { + // 队列为空,回滚时间记录 + redisTemplate.delete(lastBroadcastKey); + return false; + } + // 反序列化消息 TtsQueueMessage message = parseMessage(messageObj); if (message == null) { @@ -182,18 +227,22 @@ public class TtsQueueConsumer { boolean success = doBroadcast(message); if (success) { - // 播报成功,更新锁时间 + // 同步更新内存锁(用于快速检查,非强制) deviceBroadcastLock.put(deviceId, now); log.info("[TTS队列] 播报成功: deviceId={}, text={}", deviceId, message.getText()); return true; } else { - // 播报失败,检查是否需要重试 + // 播报失败,清除时间记录(允许立即重试) + redisTemplate.delete(lastBroadcastKey); + // 检查是否需要重试 handleFailure(message, queueKey); return false; } } catch (Exception e) { + // 异常时取消 watch + redisTemplate.unwatch(); log.error("[TTS队列] 处理消息失败: deviceId={}", deviceId, e); return false; }