fix(ops): 修复 TTS 队列 Redisson 事务不兼容和客流统计租户上下文缺失
Some checks failed
Java CI with Maven / build (11) (push) Has been cancelled
Java CI with Maven / build (17) (push) Has been cancelled
Java CI with Maven / build (8) (push) Has been cancelled

1. TtsQueueConsumer: 用 SETNX+TTL 替代 watch/multi/exec 事务模式,
   解决 Redisson 不支持 WATCH 导致 exec() 抛出异常的问题
2. TrafficStatisticsPersistJob: 用 TenantUtils.executeIgnore() 包裹
   设备区域查询,解决 xxl-job 线程无租户上下文导致查询失败的问题

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
lzh
2026-02-04 10:13:41 +08:00
parent a18d1a7d8d
commit 117ad2c405
2 changed files with 23 additions and 54 deletions

View File

@@ -15,7 +15,6 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -137,9 +136,9 @@ public class TtsQueueConsumer {
* 处理单个设备的队列(公开方法,供 Job 调用)
*
* 保证顺序和间隔:
* 1. 使用 Redis 存储上次播报时间(跨线程/跨实例共享
* 2. 使用 watch + 事务保证原子性
* 3. 如果间隔不够,消息留在队列下次再试
* 1. 使用 Redis SETNX 作为播报间隔锁(原子操作,兼容 Redisson
* 2. 锁的 TTL 等于播报间隔,自然过期后允许下一次播报
* 3. 如果获取锁失败,说明间隔不够,消息留在队列下次再试
*
* @param queueKey 队列 key
* @return 是否处理了消息
@@ -155,58 +154,26 @@ public class TtsQueueConsumer {
return false;
}
String lastBroadcastKey = LOCK_KEY_PREFIX + deviceId;
long now = System.currentTimeMillis();
// 使用 watch 监听上次播报时间 key
redisTemplate.watch(lastBroadcastKey);
String lockKey = LOCK_KEY_PREFIX + deviceId;
try {
// 获取上次播报<EFBFBD><EFBFBD><EFBFBD>
Object lastTimeObj = redisTemplate.opsForValue().get(lastBroadcastKey);
long lastBroadcastTime = 0;
if (lastTimeObj != null) {
try {
lastBroadcastTime = Long.parseLong(String.valueOf(lastTimeObj));
} catch (NumberFormatException e) {
log.warn("[TTS队列] 上次播报时间格式异常: deviceId={}, value={}", deviceId, lastTimeObj);
lastBroadcastTime = 0;
}
}
// 尝试获取播报间隔锁SETNX + TTL原子操作
Boolean locked = redisTemplate.opsForValue()
.setIfAbsent(lockKey, String.valueOf(System.currentTimeMillis()),
broadcastIntervalMs, TimeUnit.MILLISECONDS);
// 检查间隔
if (lastBroadcastTime > 0 && now - lastBroadcastTime < broadcastIntervalMs) {
// 间隔不够,取消 watch消息留在队列下次再试
redisTemplate.unwatch();
log.debug("[TTS队列] 间隔不够,跳过: deviceId={}, 间隔={}ms, 需要={}ms",
deviceId, now - lastBroadcastTime, broadcastIntervalMs);
if (!Boolean.TRUE.equals(locked)) {
// 间隔不够,消息留在队列下次再试
log.debug("[TTS队列] 间隔不够,跳过: deviceId={}, 需要={}ms", deviceId, broadcastIntervalMs);
return false;
}
// 开启 Redis 事务
redisTemplate.multi();
// 1. 取出消息
redisTemplate.opsForList().leftPop(queueKey);
// 2. 更新播报时间
redisTemplate.opsForValue().set(lastBroadcastKey, String.valueOf(now),
broadcastIntervalMs, TimeUnit.MILLISECONDS);
// 执行事务
List<Object> execResult = redisTemplate.exec();
if (execResult == null) {
// 事务被取消(其他线程修改了 key跳过
log.debug("[TTS队列] 事务被取消,跳过: deviceId={}", deviceId);
return false;
}
// 检查取出的消息
Object messageObj = execResult.isEmpty() ? null : execResult.get(0);
// 获取锁成功,从队列取出消息
Object messageObj = redisTemplate.opsForList().leftPop(queueKey);
if (messageObj == null) {
// 队列为空,回滚时间记录
redisTemplate.delete(lastBroadcastKey);
// 队列为空,释放锁(允许其他线程立即使用)
redisTemplate.delete(lockKey);
return false;
}
@@ -220,6 +187,8 @@ public class TtsQueueConsumer {
if (message.isExpired()) {
log.info("[TTS队列] 消息已过期: deviceId={}, text={}",
deviceId, message.getText());
// 释放锁,继续尝试下一条消息
redisTemplate.delete(lockKey);
return false;
}
@@ -228,21 +197,19 @@ public class TtsQueueConsumer {
if (success) {
// 同步更新内存锁(用于快速检查,非强制)
deviceBroadcastLock.put(deviceId, now);
deviceBroadcastLock.put(deviceId, System.currentTimeMillis());
log.info("[TTS队列] 播报成功: deviceId={}, text={}",
deviceId, message.getText());
return true;
} else {
// 播报失败,清除时间记录(允许立即重试)
redisTemplate.delete(lastBroadcastKey);
// 播报失败,释放锁(允许立即重试)
redisTemplate.delete(lockKey);
// 检查是否需要重试
handleFailure(message, queueKey);
return false;
}
} catch (Exception e) {
// 异常时取消 watch
redisTemplate.unwatch();
log.error("[TTS队列] 处理消息失败: deviceId={}", deviceId, e);
return false;
}

View File

@@ -1,5 +1,6 @@
package com.viewsh.module.ops.service.job;
import com.viewsh.framework.tenant.core.util.TenantUtils;
import com.viewsh.module.ops.dal.dataobject.statistics.OpsTrafficStatisticsDO;
import com.viewsh.module.ops.dal.mysql.statistics.OpsTrafficStatisticsMapper;
import com.viewsh.module.ops.service.area.OpsBusAreaService;
@@ -243,7 +244,8 @@ public class TrafficStatisticsPersistJob {
private Long getAreaIdForDevice(Long deviceId) {
try {
// 通过 OpsBusAreaService 查询设备关联的区域
return areaService.getAreaIdByDeviceId(deviceId);
// 使用 executeIgnore 忽略租户过滤,因为 xxl-job 线程无租户上下文
return TenantUtils.executeIgnore(() -> areaService.getAreaIdByDeviceId(deviceId));
} catch (Exception e) {
log.error("[TrafficStatisticsPersistJob] 查询设备区域失败deviceId={}", deviceId, e);
return null;