diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/TtsQueueConsumer.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/TtsQueueConsumer.java index c9bc254..07bc5fc 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/TtsQueueConsumer.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/TtsQueueConsumer.java @@ -15,7 +15,6 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -137,9 +136,9 @@ public class TtsQueueConsumer { * 处理单个设备的队列(公开方法,供 Job 调用) * * 保证顺序和间隔: - * 1. 使用 Redis 存储上次播报时间(跨线程/跨实例共享) - * 2. 使用 watch + 事务保证原子性 - * 3. 如果间隔不够,消息留在队列,下次再试 + * 1. 使用 Redis SETNX 作为播报间隔锁(原子操作,兼容 Redisson) + * 2. 锁的 TTL 等于播报间隔,自然过期后允许下一次播报 + * 3. 如果获取锁失败,说明间隔不够,消息留在队列下次再试 * * @param queueKey 队列 key * @return 是否处理了消息 @@ -155,58 +154,26 @@ public class TtsQueueConsumer { return false; } - String lastBroadcastKey = LOCK_KEY_PREFIX + deviceId; - long now = System.currentTimeMillis(); - - // 使用 watch 监听上次播报时间 key - redisTemplate.watch(lastBroadcastKey); + String lockKey = LOCK_KEY_PREFIX + deviceId; try { - // 获取上次播报���间 - Object lastTimeObj = redisTemplate.opsForValue().get(lastBroadcastKey); - long lastBroadcastTime = 0; - if (lastTimeObj != null) { - try { - lastBroadcastTime = Long.parseLong(String.valueOf(lastTimeObj)); - } catch (NumberFormatException e) { - log.warn("[TTS队列] 上次播报时间格式异常: deviceId={}, value={}", deviceId, lastTimeObj); - lastBroadcastTime = 0; - } - } + // 尝试获取播报间隔锁(SETNX + TTL,原子操作) + Boolean locked = redisTemplate.opsForValue() + .setIfAbsent(lockKey, String.valueOf(System.currentTimeMillis()), + broadcastIntervalMs, TimeUnit.MILLISECONDS); - // 检查间隔 - if (lastBroadcastTime > 0 && now - lastBroadcastTime < broadcastIntervalMs) { - // 间隔不够,取消 watch,消息留在队列下次再试 - redisTemplate.unwatch(); - log.debug("[TTS队列] 间隔不够,跳过: deviceId={}, 间隔={}ms, 需要={}ms", - deviceId, now - lastBroadcastTime, broadcastIntervalMs); + if (!Boolean.TRUE.equals(locked)) { + // 间隔不够,消息留在队列下次再试 + log.debug("[TTS队列] 间隔不够,跳过: deviceId={}, 需要={}ms", deviceId, broadcastIntervalMs); return false; } - // 开启 Redis 事务 - redisTemplate.multi(); - - // 1. 取出消息 - redisTemplate.opsForList().leftPop(queueKey); - - // 2. 更新播报时间 - redisTemplate.opsForValue().set(lastBroadcastKey, String.valueOf(now), - broadcastIntervalMs, TimeUnit.MILLISECONDS); - - // 执行事务 - List execResult = redisTemplate.exec(); - if (execResult == null) { - // 事务被取消(其他线程修改了 key),跳过 - log.debug("[TTS队列] 事务被取消,跳过: deviceId={}", deviceId); - return false; - } - - // 检查取出的消息 - Object messageObj = execResult.isEmpty() ? null : execResult.get(0); + // 获取锁成功,从队列取出消息 + Object messageObj = redisTemplate.opsForList().leftPop(queueKey); if (messageObj == null) { - // 队列为空,回滚时间记录 - redisTemplate.delete(lastBroadcastKey); + // 队列为空,释放锁(允许其他线程立即使用) + redisTemplate.delete(lockKey); return false; } @@ -220,6 +187,8 @@ public class TtsQueueConsumer { if (message.isExpired()) { log.info("[TTS队列] 消息已过期: deviceId={}, text={}", deviceId, message.getText()); + // 释放锁,继续尝试下一条消息 + redisTemplate.delete(lockKey); return false; } @@ -228,21 +197,19 @@ public class TtsQueueConsumer { if (success) { // 同步更新内存锁(用于快速检查,非强制) - deviceBroadcastLock.put(deviceId, now); + deviceBroadcastLock.put(deviceId, System.currentTimeMillis()); log.info("[TTS队列] 播报成功: deviceId={}, text={}", deviceId, message.getText()); return true; } else { - // 播报失败,清除时间记录(允许立即重试) - redisTemplate.delete(lastBroadcastKey); + // 播报失败,释放锁(允许立即重试) + redisTemplate.delete(lockKey); // 检查是否需要重试 handleFailure(message, queueKey); return false; } } catch (Exception e) { - // 异常时取消 watch - redisTemplate.unwatch(); log.error("[TTS队列] 处理消息失败: deviceId={}", deviceId, e); return false; } diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/job/TrafficStatisticsPersistJob.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/job/TrafficStatisticsPersistJob.java index e522f18..c262c0b 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/job/TrafficStatisticsPersistJob.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/job/TrafficStatisticsPersistJob.java @@ -1,5 +1,6 @@ package com.viewsh.module.ops.service.job; +import com.viewsh.framework.tenant.core.util.TenantUtils; import com.viewsh.module.ops.dal.dataobject.statistics.OpsTrafficStatisticsDO; import com.viewsh.module.ops.dal.mysql.statistics.OpsTrafficStatisticsMapper; import com.viewsh.module.ops.service.area.OpsBusAreaService; @@ -243,7 +244,8 @@ public class TrafficStatisticsPersistJob { private Long getAreaIdForDevice(Long deviceId) { try { // 通过 OpsBusAreaService 查询设备关联的区域 - return areaService.getAreaIdByDeviceId(deviceId); + // 使用 executeIgnore 忽略租户过滤,因为 xxl-job 线程无租户上下文 + return TenantUtils.executeIgnore(() -> areaService.getAreaIdByDeviceId(deviceId)); } catch (Exception e) { log.error("[TrafficStatisticsPersistJob] 查询设备区域失败:deviceId={}", deviceId, e); return null;