From d34f3bd80d7e71c2253751ff402c0ae094dbe139 Mon Sep 17 00:00:00 2001 From: lzh Date: Tue, 3 Feb 2026 23:59:01 +0800 Subject: [PATCH] =?UTF-8?q?fix(ops):=20=E4=BF=AE=E5=A4=8D=20TTS=20?= =?UTF-8?q?=E8=AF=AD=E9=9F=B3=E6=92=AD=E6=8A=A5=E5=B9=B6=E5=8F=91=E9=97=AE?= =?UTF-8?q?=E9=A2=98=EF=BC=8C=E4=BF=9D=E8=AF=81=E9=A1=BA=E5=BA=8F=E5=92=8C?= =?UTF-8?q?=E9=97=B4=E9=9A=94?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 问题: - 多线程同时处理同一设备的 TTS 队列,导致多条语音同时下发 - 设备只能播报最后一条,前面的被覆盖 解决方案: - 使用 Redis watch + multi + exec 事务保证原子性 - 使用 Redis 存储上次播报时间(跨线程/跨实例共享) - 间隔不够时取消 watch,消息留在队列下次再试 - 播报失败时清除时间记录,允许立即重试 效果: - 保证播报顺序:先入队的消息先播报(FIFO) - 保证播报间隔:两条播报之间至少间隔 3 秒 - 并发安全:多线程/多实例环境下不会冲突 Co-Authored-By: Claude Opus 4.5 --- .../service/voice/TtsQueueConsumer.java | 79 +++++++++++++++---- 1 file changed, 64 insertions(+), 15 deletions(-) diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/TtsQueueConsumer.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/TtsQueueConsumer.java index 80cd9f9..c9bc254 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/TtsQueueConsumer.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/TtsQueueConsumer.java @@ -45,6 +45,8 @@ public class TtsQueueConsumer { private static final String QUEUE_KEY_PREFIX = "ops:tts:queue:"; + private static final String LOCK_KEY_PREFIX = "ops:tts:lock:"; + @Value("${ops.tts.queue.enabled:true}") private boolean queueEnabled; @@ -134,6 +136,11 @@ public class TtsQueueConsumer { /** * 处理单个设备的队列(公开方法,供 Job 调用) * + * 保证顺序和间隔: + * 1. 使用 Redis 存储上次播报时间(跨线程/跨实例共享) + * 2. 使用 watch + 事务保证原子性 + * 3. 如果间隔不够,消息留在队列,下次再试 + * * @param queueKey 队列 key * @return 是否处理了消息 */ @@ -148,23 +155,61 @@ public class TtsQueueConsumer { return false; } - // 检查设备播报锁 + String lastBroadcastKey = LOCK_KEY_PREFIX + deviceId; long now = System.currentTimeMillis(); - Long lastBroadcastTime = deviceBroadcastLock.get(deviceId); - if (lastBroadcastTime != null && now - lastBroadcastTime < broadcastIntervalMs) { - // 距离上次播报时间太短,跳过 - log.debug("[TTS队列] 间隔太近跳过: deviceId={}, 间隔={}ms", deviceId, now - lastBroadcastTime); - return false; - } - // 从队列中取出一条消息 - Object messageObj = redisTemplate.opsForList().leftPop(queueKey); - if (messageObj == null) { - // 队列为空 - return false; - } + // 使用 watch 监听上次播报时间 key + redisTemplate.watch(lastBroadcastKey); try { + // 获取上次播报���间 + Object lastTimeObj = redisTemplate.opsForValue().get(lastBroadcastKey); + long lastBroadcastTime = 0; + if (lastTimeObj != null) { + try { + lastBroadcastTime = Long.parseLong(String.valueOf(lastTimeObj)); + } catch (NumberFormatException e) { + log.warn("[TTS队列] 上次播报时间格式异常: deviceId={}, value={}", deviceId, lastTimeObj); + lastBroadcastTime = 0; + } + } + + // 检查间隔 + if (lastBroadcastTime > 0 && now - lastBroadcastTime < broadcastIntervalMs) { + // 间隔不够,取消 watch,消息留在队列下次再试 + redisTemplate.unwatch(); + log.debug("[TTS队列] 间隔不够,跳过: deviceId={}, 间隔={}ms, 需要={}ms", + deviceId, now - lastBroadcastTime, broadcastIntervalMs); + return false; + } + + // 开启 Redis 事务 + redisTemplate.multi(); + + // 1. 取出消息 + redisTemplate.opsForList().leftPop(queueKey); + + // 2. 更新播报时间 + redisTemplate.opsForValue().set(lastBroadcastKey, String.valueOf(now), + broadcastIntervalMs, TimeUnit.MILLISECONDS); + + // 执行事务 + List execResult = redisTemplate.exec(); + if (execResult == null) { + // 事务被取消(其他线程修改了 key),跳过 + log.debug("[TTS队列] 事务被取消,跳过: deviceId={}", deviceId); + return false; + } + + // 检查取出的消息 + Object messageObj = execResult.isEmpty() ? null : execResult.get(0); + + if (messageObj == null) { + // 队列为空,回滚时间记录 + redisTemplate.delete(lastBroadcastKey); + return false; + } + // 反序列化消息 TtsQueueMessage message = parseMessage(messageObj); if (message == null) { @@ -182,18 +227,22 @@ public class TtsQueueConsumer { boolean success = doBroadcast(message); if (success) { - // 播报成功,更新锁时间 + // 同步更新内存锁(用于快速检查,非强制) deviceBroadcastLock.put(deviceId, now); log.info("[TTS队列] 播报成功: deviceId={}, text={}", deviceId, message.getText()); return true; } else { - // 播报失败,检查是否需要重试 + // 播报失败,清除时间记录(允许立即重试) + redisTemplate.delete(lastBroadcastKey); + // 检查是否需要重试 handleFailure(message, queueKey); return false; } } catch (Exception e) { + // 异常时取消 watch + redisTemplate.unwatch(); log.error("[TTS队列] 处理消息失败: deviceId={}", deviceId, e); return false; }