From 4f737a5dd132b900a534a1c31aac4be441a73d29 Mon Sep 17 00:00:00 2001 From: lzh Date: Sun, 8 Feb 2026 00:19:24 +0800 Subject: [PATCH] =?UTF-8?q?refactor(ops):=20=E9=87=8D=E6=9E=84=20TTS=20?= =?UTF-8?q?=E8=AF=AD=E9=9F=B3=E6=92=AD=E6=8A=A5=E9=98=9F=E5=88=97=EF=BC=8C?= =?UTF-8?q?=E8=A7=A3=E8=80=A6=20ttsFlag=20=E4=B8=8E=E9=98=9F=E5=88=97?= =?UTF-8?q?=E4=BC=98=E5=85=88=E7=BA=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 移除 Thread.sleep(5000) 阻塞,改由 TTS 队列按设备维度控制播报顺序和间隔: - 解耦 ttsFlag(硬件行为 0x09)与 priority(队列位置),全部使用 0x09 发送 - TtsQueueMessage 新增 inOrder/urgent 工厂方法,VoiceBroadcastService 精简为 broadcastInOrder(FIFO rightPush)和 broadcastUrgent(leftPush 插队)两个入口 - 同设备播报间隔 3s → 6s,消息过期时间 30s → 60s - 修复原 leftPush+leftPop LIFO 导致连续入队顺序反转的问题 Co-Authored-By: Claude Opus 4.6 --- .../consumer/CleanOrderAuditEventHandler.java | 2 +- .../listener/CleanOrderEventListener.java | 142 ++++++++-------- .../service/voice/TtsQueueConsumer.java | 2 +- .../service/voice/TtsQueueMessage.java | 52 +++--- .../service/voice/VoiceBroadcastService.java | 155 +++++------------- .../cleanorder/CleanOrderEndToEndTest.java | 2 +- .../src/main/resources/application.yaml | 2 +- 7 files changed, 144 insertions(+), 213 deletions(-) diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderAuditEventHandler.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderAuditEventHandler.java index 0836cef..229e15c 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderAuditEventHandler.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderAuditEventHandler.java @@ -199,7 +199,7 @@ public class CleanOrderAuditEventHandler implements RocketMQListener { */ private void sendTts(Long deviceId, String text, Long orderId) { try { - voiceBroadcastService.broadcast(deviceId, text, orderId); + voiceBroadcastService.broadcastInOrder(deviceId, text, orderId); log.info("[CleanOrderAuditEventHandler] TTS 下发成功: deviceId={}, text={}", deviceId, text); } catch (Exception e) { log.error("[CleanOrderAuditEventHandler] TTS 下发异常: deviceId={}", deviceId, e); diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/listener/CleanOrderEventListener.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/listener/CleanOrderEventListener.java index 89f55f3..6b24c5f 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/listener/CleanOrderEventListener.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/listener/CleanOrderEventListener.java @@ -1,7 +1,5 @@ package com.viewsh.module.ops.environment.integration.listener; -import com.viewsh.module.iot.api.device.IotDeviceControlApi; -import com.viewsh.module.iot.api.device.dto.ResetTrafficCounterReqDTO; import com.viewsh.module.ops.api.queue.OrderQueueService; import com.viewsh.module.ops.core.dispatch.DispatchEngine; import com.viewsh.module.ops.core.dispatch.model.DispatchResult; @@ -15,6 +13,7 @@ import com.viewsh.module.ops.enums.PriorityEnum; import com.viewsh.module.ops.environment.constants.CleanNotificationConstants; import com.viewsh.module.ops.environment.dal.dataobject.workorder.OpsOrderCleanExtDO; import com.viewsh.module.ops.environment.dal.mysql.workorder.OpsOrderCleanExtMapper; +import com.viewsh.module.ops.environment.dal.redis.TrafficActiveOrderRedisDAO; import com.viewsh.module.ops.environment.service.cleanorder.CleanOrderService; import com.viewsh.module.ops.environment.service.voice.VoiceBroadcastService; import com.viewsh.module.ops.infrastructure.log.enumeration.EventDomain; @@ -78,15 +77,15 @@ public class CleanOrderEventListener { @Resource private NotifyMessageSendApi notifyMessageSendApi; - @Resource - private IotDeviceControlApi iotDeviceControlApi; - @Resource private EventLogRecorder eventLogRecorder; @Resource private OrderQueueService orderQueueService; + @Resource + private TrafficActiveOrderRedisDAO trafficActiveOrderRedisDAO; + // ==================== 工单创建事件 ==================== /** @@ -108,10 +107,7 @@ public class CleanOrderEventListener { // 异步触发调度 asyncDispatchAfterCreated(event); - // 如果是客流触发的工单,重置客流计数器 - if ("IOT_TRAFFIC".equals(event.getPayload().get("triggerSource"))) { - asyncResetTrafficCounter(event); - } + // 注意:客流计数器重置已移至 CleanOrderCreateEventHandler,在消息消费时统一处理 } /** @@ -152,50 +148,6 @@ public class CleanOrderEventListener { } } - /** - * 异步重置客流计数器 - *

- * 在工单创建成功后重置阈值计数器,确保: - * 1. 工单创建和计数器重置在同一事务语义下(AFTER_COMMIT) - * 2. 如果工单创建失败,计数器不会被错误重置 - * 3. 由 Ops 模块(业务方)决定重置时机,职责清晰 - */ - @Async("ops-task-executor") - public void asyncResetTrafficCounter(OrderCreatedEvent event) { - try { - Long deviceId = (Long) event.getPayload().get("triggerDeviceId"); - - if (deviceId == null) { - log.warn("[CleanOrderEventListener] 缺少设备ID,跳过重置: orderId={}", - event.getOrderId()); - return; - } - - // 构建重置请求 - ResetTrafficCounterReqDTO reqDTO = ResetTrafficCounterReqDTO.builder() - .deviceId(deviceId) - .orderId(event.getOrderId()) - .remark("工单创建成功后重置阈值计数器") - .build(); - - // 调用 IoT 模块 RPC 接口 - var result = iotDeviceControlApi.resetTrafficCounter(reqDTO); - - if (result.getData() != null && result.getData()) { - log.info("[CleanOrderEventListener] 阈值计数器重置成功: orderId={}, deviceId={}", - event.getOrderId(), deviceId); - } else { - log.error("[CleanOrderEventListener] 阈值计数器重置失败: orderId={}, deviceId={}", - event.getOrderId(), deviceId); - // TODO: 发送告警,需要人工介入检查 - } - - } catch (Exception e) { - log.error("[CleanOrderEventListener] 阈值计数器重置异常: orderId={}", event.getOrderId(), e); - // TODO: 发送告警 - } - } - // ==================== 状态变更事件 ==================== /** @@ -216,25 +168,31 @@ public class CleanOrderEventListener { switch (event.getNewStatus()) { case DISPATCHED: handleDispatched(event); + updateTrafficActiveOrderStatus(event); break; case CONFIRMED: handleConfirmed(event); + updateTrafficActiveOrderStatus(event); break; case ARRIVED: handleArrived(event); + updateTrafficActiveOrderStatus(event); break; case PAUSED: handlePaused(event); break; case COMPLETED: handleCompleted(event); + clearTrafficActiveOrder(event); break; case CANCELLED: // 设备状态由 BadgeDeviceStatusEventListener 统一处理 log.debug("[CleanOrderEventListener] CANCELLED 状态已处理: orderId={}", event.getOrderId()); + clearTrafficActiveOrder(event); break; case QUEUED: handleQueued(event); + updateTrafficActiveOrderStatus(event); break; default: break; @@ -437,7 +395,7 @@ public class CleanOrderEventListener { Long deviceId = event.getAssigneeId(); if (deviceId != null) { - // 异步执行:先发送完成通知,等待5秒后再派发下一个任务 + // 异步执行:发送完成通知,然后派发下一个任务(TTS队列控制播报间隔) asyncCompleteAndDispatchNext(event.getOrderId(), deviceId); } } @@ -445,23 +403,16 @@ public class CleanOrderEventListener { /** * 异步执行工单完成后的通知和派单 *

- * 保证顺序:先播报"工单已完成",间隔5秒后再派发下一个任务(触发"新工单来啦") - * 避免两条语音播报顺序混乱或被覆盖 + * 先发送完成通知,再立即触发下一个任务派发。 + * 语音播报顺序和间隔由 TTS 队列({@link com.viewsh.module.ops.environment.service.voice.TtsQueueConsumer}) + * 按设备维度控制,同一设备前后播报间隔由 ops.tts.queue.interval-ms 配置。 */ @Async("ops-task-executor") public void asyncCompleteAndDispatchNext(Long orderId, Long deviceId) { - // 1. 先发送完成通知 + // 1. 发送完成通知(入TTS队列) sendOrderCompletedNotification(orderId, deviceId); - // 2. 等待5秒,确保完成语音播报完毕后再触发下一个任务的通知 - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.warn("[CleanOrderEventListener] 等待被中断: orderId={}", orderId); - } - - // 3. 自动推送下一个任务 + // 2. 自动推送下一个任务(新任务通知也入TTS队列,由队列控制播报间隔) cleanOrderService.autoDispatchNextOrder(orderId, deviceId); } @@ -552,9 +503,9 @@ public class CleanOrderEventListener { try { log.warn("[P0紧急通知] deviceId={}, orderCode={}", deviceId, orderCode); - // 1. 语音播报(使用统一模板构建器) + // 1. 语音播报(P0紧急,插队到队列头部) String voiceMessage = CleanNotificationConstants.VoiceBuilder.buildPriorityUpgrade(orderCode); - playVoice(deviceId, voiceMessage, orderId); + playVoiceUrgent(deviceId, voiceMessage, orderId); // 2. 发送站内信 sendNotifyMessage(1L, @@ -637,11 +588,14 @@ public class CleanOrderEventListener { } /** - * 语音播报(带工单ID) + * 语音播报(带工单ID,按序入队 FIFO) + *

+ * 大多数业务通知使用此方法,保证同一设备上的播报按入队顺序播放。 + * 仅 P0 紧急插队场景使用 {@link #playVoiceUrgent}。 */ private void playVoice(Long deviceId, String message, Long orderId) { try { - voiceBroadcastService.broadcast(deviceId, message, orderId); + voiceBroadcastService.broadcastInOrder(deviceId, message, orderId); log.debug("[语音播报] 调用成功: deviceId={}, message={}", deviceId, message); } catch (Exception e) { @@ -649,6 +603,21 @@ public class CleanOrderEventListener { } } + /** + * 紧急语音播报(插队到队列头部) + *

+ * 仅用于 P0 紧急任务打断等需要立即播报的场景 + */ + private void playVoiceUrgent(Long deviceId, String message, Long orderId) { + try { + voiceBroadcastService.broadcastUrgent(deviceId, message, orderId); + log.debug("[语音播报-紧急] 调用成功: deviceId={}, message={}", deviceId, message); + + } catch (Exception e) { + log.error("[语音播报-紧急] 调用失败: deviceId={}, message={}", deviceId, message, e); + } + } + // ==================== 站内信发送方法 ==================== /** @@ -679,6 +648,39 @@ public class CleanOrderEventListener { return "某区域"; } + /** + * 工单状态变更时,更新 Redis 中的活跃工单状态标记 + *

+ * 仅处理客流触发的工单 + */ + private void updateTrafficActiveOrderStatus(OrderStateChangedEvent event) { + try { + OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId()); + if (order != null && "IOT_TRAFFIC".equals(order.getTriggerSource()) && order.getAreaId() != null) { + trafficActiveOrderRedisDAO.updateStatus(order.getAreaId(), event.getNewStatus().getStatus()); + } + } catch (Exception e) { + log.warn("[CleanOrderEventListener] 更新客流活跃工单状态失败: orderId={}", event.getOrderId(), e); + } + } + + /** + * 工单终态时,清除 Redis 中的活跃工单标记 + *

+ * 仅处理客流触发的工单。清除后下次客流达标将创建新工单(新周期)。 + */ + private void clearTrafficActiveOrder(OrderStateChangedEvent event) { + try { + OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId()); + if (order != null && "IOT_TRAFFIC".equals(order.getTriggerSource()) && order.getAreaId() != null) { + trafficActiveOrderRedisDAO.removeActive(order.getAreaId()); + log.info("[CleanOrderEventListener] 客流工单周期结束,已清除区域{}活跃标记", order.getAreaId()); + } + } catch (Exception e) { + log.warn("[CleanOrderEventListener] 清除客流活跃工单标记失败: orderId={}", event.getOrderId(), e); + } + } + /** * 记录暂停开始时间 */ diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/TtsQueueConsumer.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/TtsQueueConsumer.java index 07bc5fc..e418a46 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/TtsQueueConsumer.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/TtsQueueConsumer.java @@ -49,7 +49,7 @@ public class TtsQueueConsumer { @Value("${ops.tts.queue.enabled:true}") private boolean queueEnabled; - @Value("${ops.tts.queue.interval-ms:3000}") + @Value("${ops.tts.queue.interval-ms:6000}") private long broadcastIntervalMs; @Value("${ops.tts.queue.max-queue-size:50}") diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/TtsQueueMessage.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/TtsQueueMessage.java index cb142ec..7cdad09 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/TtsQueueMessage.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/TtsQueueMessage.java @@ -37,6 +37,16 @@ public class TtsQueueMessage implements Serializable { */ public static final int TTS_FLAG_URGENT = 0x09; + /** + * 队列优先级:紧急(leftPush 插队到头部) + */ + public static final int PRIORITY_URGENT = 1; + + /** + * 队列优先级:普通(rightPush 按序追加到尾部) + */ + public static final int PRIORITY_NORMAL = 5; + /** * 设备ID */ @@ -79,14 +89,18 @@ public class TtsQueueMessage implements Serializable { private Integer maxRetry; /** - * 创建普通消息 + * 创建按序消息(FIFO,rightPush 追加到尾部) + *

+ * ttsFlag=0x09(紧急通知,带显示),priority=5(普通优先级) + * 适用于大多数业务通知,保证同一设备播报按入队顺序播放 */ - public static TtsQueueMessage normal(Long deviceId, String text) { + public static TtsQueueMessage inOrder(Long deviceId, String text, Long orderId) { return TtsQueueMessage.builder() .deviceId(deviceId) .text(text) - .ttsFlag(TTS_FLAG_NORMAL) - .priority(5) + .ttsFlag(TTS_FLAG_URGENT) + .orderId(orderId) + .priority(PRIORITY_NORMAL) .createTime(System.currentTimeMillis()) .retryCount(0) .maxRetry(2) @@ -94,36 +108,24 @@ public class TtsQueueMessage implements Serializable { } /** - * 创建紧急消息 + * 创建紧急消息(leftPush 插队到队列头部) + *

+ * ttsFlag=0x09(紧急通知,带显示),priority=1(紧急优先级) + * 仅用于 P0 紧急任务打断等需要立即播报的场景 */ - public static TtsQueueMessage urgent(Long deviceId, String text) { + public static TtsQueueMessage urgent(Long deviceId, String text, Long orderId) { return TtsQueueMessage.builder() .deviceId(deviceId) .text(text) .ttsFlag(TTS_FLAG_URGENT) - .priority(1) + .orderId(orderId) + .priority(PRIORITY_URGENT) .createTime(System.currentTimeMillis()) .retryCount(0) .maxRetry(3) .build(); } - /** - * 创建带工单ID的消息 - */ - public static TtsQueueMessage withOrder(Long deviceId, String text, int ttsFlag, Long orderId) { - return TtsQueueMessage.builder() - .deviceId(deviceId) - .text(text) - .ttsFlag(ttsFlag) - .orderId(orderId) - .priority(ttsFlag == TTS_FLAG_URGENT ? 1 : 5) - .createTime(System.currentTimeMillis()) - .retryCount(0) - .maxRetry(ttsFlag == TTS_FLAG_URGENT ? 3 : 2) - .build(); - } - /** * 增加重试次数 */ @@ -139,12 +141,12 @@ public class TtsQueueMessage implements Serializable { } /** - * 检查消息是否过期(超过30秒认为过期) + * 检查消息是否过期(超过60秒认为过期) */ public boolean isExpired() { if (createTime == null) { return false; } - return System.currentTimeMillis() - createTime > 30_000; + return System.currentTimeMillis() - createTime > 60_000; } } diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/VoiceBroadcastService.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/VoiceBroadcastService.java index b93eb1b..4b3e9bd 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/VoiceBroadcastService.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/VoiceBroadcastService.java @@ -10,7 +10,6 @@ import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; /** @@ -18,20 +17,13 @@ import org.springframework.stereotype.Service; *

* 职责: * 1. 统一所有 TTS 下发入口 - * 2. 提供同步/异步播报接口 - * 3. 通过队列控制播报间隔,防止连续播报被覆盖 - * 4. 记录播报日志 - *

- * 设计原则: - * - 接受 deviceId 参数(而非 cleanerId) - * - 默认使用队列模式,确保播报不丢失 - * - 支持直接播报模式(特殊场景) - * - 按设备分组,独立队列管理 + * 2. 通过队列控制播报间隔,防止连续播报被覆盖 + * 3. 记录播报日志 *

* 队列机制: * - 相同设备的播报请求进入 Redis 队列 - * - 消费者按顺序取出,间隔 1-2 秒播报 - * - 紧急消息可插队(优先级高) + * - 消费者按顺序取出,按配置间隔播报 + * - 紧急消息可插队(leftPush 到队列头部) * - 支持失败重试(最多 2-3 次) *

* JT808 TTS 播报标志 (tts_flag) 说明: @@ -50,27 +42,6 @@ public class VoiceBroadcastService { */ private static final String TTS_IDENTIFIER = "TTS"; - /** - * TTS 播报标志:静默执行 - *

- * 设备收到后解析指令,修改参数,回复 0x0001,但不发声 - */ - public static final int TTS_FLAG_SILENT = 0x01; - - /** - * TTS 播报标志:普通通知 - *

- * 播放语音,设备将文本内容通过喇叭朗读出来 - */ - public static final int TTS_FLAG_NORMAL = 0x08; - - /** - * TTS 播报标志:紧急通知 - *

- * 播放语音(通常带显示),用于重要通知 - */ - public static final int TTS_FLAG_URGENT = 0x09; - @Resource private IotDeviceControlApi iotDeviceControlApi; @@ -86,85 +57,48 @@ public class VoiceBroadcastService { @Value("${ops.tts.queue.enabled:true}") private boolean queueEnabled; - // ==================== 队列模式播报(推荐) ==================== + // ==================== 队列模式播报 ==================== /** - * 播报语音(紧急通知,默认,使用队列) + * 按序播报(FIFO,rightPush 追加到队列尾部) *

- * 使用 tts_flag=0x09,适用于工单确认、到岗提醒等重要通知 + * 使用 tts_flag=0x09(紧急通知,带显示),以普通优先级入队, + * 保证同一设备上的播报按入队顺序播放。适用于大多数业务通知。 * * @param deviceId 设备ID * @param text 播报文本 + * @param orderId 工单ID(可选,用于日志记录) */ - public void broadcast(Long deviceId, String text) { - broadcast(deviceId, text, TTS_FLAG_URGENT, null); - } - - /** - * 播报语音(带工单ID,使用队列) - */ - public void broadcast(Long deviceId, String text, Long orderId) { - broadcast(deviceId, text, TTS_FLAG_URGENT, orderId); - } - - /** - * 播报语音(指定播报类型,使用队列) - * - * @param deviceId 设备ID - * @param text 播报文本 - * @param ttsFlag 播报标志(0x01=静默, 0x08=普通, 0x09=紧急) - */ - public void broadcast(Long deviceId, String text, int ttsFlag) { - broadcast(deviceId, text, ttsFlag, null); - } - - /** - * 播报语音(指定播报类型和工单ID,使用队列) - *

- * 优先使用队列模式,确保播报不丢失 - * - * @param deviceId 设备ID - * @param text 播报文本 - * @param ttsFlag 播报标志 - * @param orderId 工单ID(可选) - */ - public void broadcast(Long deviceId, String text, int ttsFlag, Long orderId) { + public void broadcastInOrder(Long deviceId, String text, Long orderId) { if (deviceId == null || text == null) { return; } - - // 优先使用队列模式 if (queueEnabled) { - broadcastWithQueue(deviceId, text, ttsFlag, orderId); + enqueueOrFallback(TtsQueueMessage.inOrder(deviceId, text, orderId)); } else { - broadcastDirect(deviceId, text, ttsFlag, orderId); + broadcastDirect(deviceId, text, TtsQueueMessage.TTS_FLAG_URGENT, orderId); } } /** - * 播报语音(普通通知,使用队列) + * 紧急播报(leftPush 插队到队列头部) + *

+ * 使用 tts_flag=0x09(紧急通知,带显示),以紧急优先级入队, + * 仅用于 P0 紧急任务打断等需要立即播报的场景。 + * + * @param deviceId 设备ID + * @param text 播报文本 + * @param orderId 工单ID(可选,用于日志记录) */ - public void broadcastNormal(Long deviceId, String text) { - broadcast(deviceId, text, TTS_FLAG_NORMAL); - } - - /** - * 播报语音(紧急通知,使用队列) - */ - public void broadcastUrgent(Long deviceId, String text) { - broadcast(deviceId, text, TTS_FLAG_URGENT, null); - } - public void broadcastUrgent(Long deviceId, String text, Long orderId) { - broadcast(deviceId, text, TTS_FLAG_URGENT, orderId); - } - - /** - * 播报语音(异步,使用队列) - */ - @Async("ops-task-executor") - public void broadcastAsync(Long deviceId, String text) { - broadcast(deviceId, text); + if (deviceId == null || text == null) { + return; + } + if (queueEnabled) { + enqueueOrFallback(TtsQueueMessage.urgent(deviceId, text, orderId)); + } else { + broadcastDirect(deviceId, text, TtsQueueMessage.TTS_FLAG_URGENT, orderId); + } } // ==================== 直接播报模式(特殊场景) ==================== @@ -206,38 +140,31 @@ public class VoiceBroadcastService { } } - // ==================== 队列播报(内部方法) ==================== + // ==================== 内部方法 ==================== /** - * 通过队列播报语音 + * 入队,失败时降级为直接播报 */ - private void broadcastWithQueue(Long deviceId, String text, int ttsFlag, Long orderId) { + private void enqueueOrFallback(TtsQueueMessage message) { try { - TtsQueueMessage message = TtsQueueMessage.builder() - .deviceId(deviceId) - .text(text) - .ttsFlag(ttsFlag) - .orderId(orderId) - .priority(ttsFlag == TTS_FLAG_URGENT ? 1 : 5) - .createTime(System.currentTimeMillis()) - .retryCount(0) - .maxRetry(ttsFlag == TTS_FLAG_URGENT ? 3 : 2) - .build(); - boolean enqueued = ttsQueueConsumer.enqueue(message); if (enqueued) { log.debug("[VoiceBroadcast] 消息入队: deviceId={}, text={}, flag=0x{}, priority={}", - deviceId, text, Integer.toHexString(ttsFlag), message.getPriority()); + message.getDeviceId(), message.getText(), + Integer.toHexString(message.getTtsFlag()), message.getPriority()); } else { - // 队列满或其他原因入队失败,降级为直接播报 - log.warn("[VoiceBroadcast] 入队失败,降级为直接播报: deviceId={}, text={}", deviceId, text); - broadcastDirect(deviceId, text, ttsFlag, orderId); + log.warn("[VoiceBroadcast] 入队失败,降级为直接播报: deviceId={}, text={}", + message.getDeviceId(), message.getText()); + broadcastDirect(message.getDeviceId(), message.getText(), + message.getTtsFlag(), message.getOrderId()); } } catch (Exception e) { - log.error("[VoiceBroadcast] 队列入队异常,降级为直接播报: deviceId={}, text={}", deviceId, text, e); - broadcastDirect(deviceId, text, ttsFlag, orderId); + log.error("[VoiceBroadcast] 队列入队异常,降级为直接播报: deviceId={}, text={}", + message.getDeviceId(), message.getText(), e); + broadcastDirect(message.getDeviceId(), message.getText(), + message.getTtsFlag(), message.getOrderId()); } } diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/test/java/com/viewsh/module/ops/environment/service/cleanorder/CleanOrderEndToEndTest.java b/viewsh-module-ops/viewsh-module-environment-biz/src/test/java/com/viewsh/module/ops/environment/service/cleanorder/CleanOrderEndToEndTest.java index b71080c..2722bea 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/test/java/com/viewsh/module/ops/environment/service/cleanorder/CleanOrderEndToEndTest.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/test/java/com/viewsh/module/ops/environment/service/cleanorder/CleanOrderEndToEndTest.java @@ -338,7 +338,7 @@ public class CleanOrderEndToEndTest { verify(eventLogRecorder).record(any()); // 2. TTS sent (orderId can be null for TTS_REQUEST events) - verify(voiceBroadcastService).broadcast(eq(5001L), contains("请回到作业区域"), eq((Long) null)); + verify(voiceBroadcastService).broadcastInOrder(eq(5001L), contains("请回到作业区域"), eq((Long) null)); } // ========================================== diff --git a/viewsh-module-ops/viewsh-module-ops-server/src/main/resources/application.yaml b/viewsh-module-ops/viewsh-module-ops-server/src/main/resources/application.yaml index 8c18da4..fcd5750 100644 --- a/viewsh-module-ops/viewsh-module-ops-server/src/main/resources/application.yaml +++ b/viewsh-module-ops/viewsh-module-ops-server/src/main/resources/application.yaml @@ -149,7 +149,7 @@ ops: tts: queue: enabled: true # 是否启用 TTS 语音播报队列 - interval-ms: 3000 # 同一设备播报间隔(毫秒) + interval-ms: 6000 # 同一设备播报间隔(毫秒) max-queue-size: 50 # 单个设备队列最大长度 debug: false