fix(ops): 修复 TTS 语音播报并发问题,保证顺序和间隔

问题:
- 多线程同时处理同一设备的 TTS 队列,导致多条语音同时下发
- 设备只能播报最后一条,前面的被覆盖

解决方案:
- 使用 Redis watch + multi + exec 事务保证原子性
- 使用 Redis 存储上次播报时间(跨线程/跨实例共享)
- 间隔不够时取消 watch,消息留在队列下次再试
- 播报失败时清除时间记录,允许立即重试

效果:
- 保证播报顺序:先入队的消息先播报(FIFO)
- 保证播报间隔:两条播报之间至少间隔 3 秒
- 并发安全:多线程/多实例环境下不会冲突

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
lzh
2026-02-03 23:59:01 +08:00
parent 5edbc9f287
commit d34f3bd80d

View File

@@ -45,6 +45,8 @@ public class TtsQueueConsumer {
private static final String QUEUE_KEY_PREFIX = "ops:tts:queue:";
private static final String LOCK_KEY_PREFIX = "ops:tts:lock:";
@Value("${ops.tts.queue.enabled:true}")
private boolean queueEnabled;
@@ -134,6 +136,11 @@ public class TtsQueueConsumer {
/**
* 处理单个设备的队列(公开方法,供 Job 调用)
*
* 保证顺序和间隔:
* 1. 使用 Redis 存储上次播报时间(跨线程/跨实例共享)
* 2. 使用 watch + 事务保证原子性
* 3. 如果间隔不够,消息留在队列,下次再试
*
* @param queueKey 队列 key
* @return 是否处理了消息
*/
@@ -148,23 +155,61 @@ public class TtsQueueConsumer {
return false;
}
// 检查设备播报锁
String lastBroadcastKey = LOCK_KEY_PREFIX + deviceId;
long now = System.currentTimeMillis();
Long lastBroadcastTime = deviceBroadcastLock.get(deviceId);
if (lastBroadcastTime != null && now - lastBroadcastTime < broadcastIntervalMs) {
// 距离上次播报时间太短,跳过
log.debug("[TTS队列] 间隔太近跳过: deviceId={}, 间隔={}ms", deviceId, now - lastBroadcastTime);
return false;
}
// 从队列中取出一条消息
Object messageObj = redisTemplate.opsForList().leftPop(queueKey);
if (messageObj == null) {
// 队列为空
return false;
}
// 使用 watch 监听上次播报时间 key
redisTemplate.watch(lastBroadcastKey);
try {
// 获取上次播报<E692AD><E68AA5><EFBFBD>
Object lastTimeObj = redisTemplate.opsForValue().get(lastBroadcastKey);
long lastBroadcastTime = 0;
if (lastTimeObj != null) {
try {
lastBroadcastTime = Long.parseLong(String.valueOf(lastTimeObj));
} catch (NumberFormatException e) {
log.warn("[TTS队列] 上次播报时间格式异常: deviceId={}, value={}", deviceId, lastTimeObj);
lastBroadcastTime = 0;
}
}
// 检查间隔
if (lastBroadcastTime > 0 && now - lastBroadcastTime < broadcastIntervalMs) {
// 间隔不够,取消 watch消息留在队列下次再试
redisTemplate.unwatch();
log.debug("[TTS队列] 间隔不够,跳过: deviceId={}, 间隔={}ms, 需要={}ms",
deviceId, now - lastBroadcastTime, broadcastIntervalMs);
return false;
}
// 开启 Redis 事务
redisTemplate.multi();
// 1. 取出消息
redisTemplate.opsForList().leftPop(queueKey);
// 2. 更新播报时间
redisTemplate.opsForValue().set(lastBroadcastKey, String.valueOf(now),
broadcastIntervalMs, TimeUnit.MILLISECONDS);
// 执行事务
List<Object> execResult = redisTemplate.exec();
if (execResult == null) {
// 事务被取消(其他线程修改了 key跳过
log.debug("[TTS队列] 事务被取消,跳过: deviceId={}", deviceId);
return false;
}
// 检查取出的消息
Object messageObj = execResult.isEmpty() ? null : execResult.get(0);
if (messageObj == null) {
// 队列为空,回滚时间记录
redisTemplate.delete(lastBroadcastKey);
return false;
}
// 反序列化消息
TtsQueueMessage message = parseMessage(messageObj);
if (message == null) {
@@ -182,18 +227,22 @@ public class TtsQueueConsumer {
boolean success = doBroadcast(message);
if (success) {
// 播报成功,更新锁时间
// 同步更新内存锁(用于快速检查,非强制)
deviceBroadcastLock.put(deviceId, now);
log.info("[TTS队列] 播报成功: deviceId={}, text={}",
deviceId, message.getText());
return true;
} else {
// 播报失败,检查是否需要重试
// 播报失败,清除时间记录(允许立即重试
redisTemplate.delete(lastBroadcastKey);
// 检查是否需要重试
handleFailure(message, queueKey);
return false;
}
} catch (Exception e) {
// 异常时取消 watch
redisTemplate.unwatch();
log.error("[TTS队列] 处理消息失败: deviceId={}", deviceId, e);
return false;
}