refactor(ops): 重构 TTS 语音播报队列,解耦 ttsFlag 与队列优先级
移除 Thread.sleep(5000) 阻塞,改由 TTS 队列按设备维度控制播报顺序和间隔: - 解耦 ttsFlag(硬件行为 0x09)与 priority(队列位置),全部使用 0x09 发送 - TtsQueueMessage 新增 inOrder/urgent 工厂方法,VoiceBroadcastService 精简为 broadcastInOrder(FIFO rightPush)和 broadcastUrgent(leftPush 插队)两个入口 - 同设备播报间隔 3s → 6s,消息过期时间 30s → 60s - 修复原 leftPush+leftPop LIFO 导致连续入队顺序反转的问题 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -199,7 +199,7 @@ public class CleanOrderAuditEventHandler implements RocketMQListener<String> {
|
||||
*/
|
||||
private void sendTts(Long deviceId, String text, Long orderId) {
|
||||
try {
|
||||
voiceBroadcastService.broadcast(deviceId, text, orderId);
|
||||
voiceBroadcastService.broadcastInOrder(deviceId, text, orderId);
|
||||
log.info("[CleanOrderAuditEventHandler] TTS 下发成功: deviceId={}, text={}", deviceId, text);
|
||||
} catch (Exception e) {
|
||||
log.error("[CleanOrderAuditEventHandler] TTS 下发异常: deviceId={}", deviceId, e);
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
package com.viewsh.module.ops.environment.integration.listener;
|
||||
|
||||
import com.viewsh.module.iot.api.device.IotDeviceControlApi;
|
||||
import com.viewsh.module.iot.api.device.dto.ResetTrafficCounterReqDTO;
|
||||
import com.viewsh.module.ops.api.queue.OrderQueueService;
|
||||
import com.viewsh.module.ops.core.dispatch.DispatchEngine;
|
||||
import com.viewsh.module.ops.core.dispatch.model.DispatchResult;
|
||||
@@ -15,6 +13,7 @@ import com.viewsh.module.ops.enums.PriorityEnum;
|
||||
import com.viewsh.module.ops.environment.constants.CleanNotificationConstants;
|
||||
import com.viewsh.module.ops.environment.dal.dataobject.workorder.OpsOrderCleanExtDO;
|
||||
import com.viewsh.module.ops.environment.dal.mysql.workorder.OpsOrderCleanExtMapper;
|
||||
import com.viewsh.module.ops.environment.dal.redis.TrafficActiveOrderRedisDAO;
|
||||
import com.viewsh.module.ops.environment.service.cleanorder.CleanOrderService;
|
||||
import com.viewsh.module.ops.environment.service.voice.VoiceBroadcastService;
|
||||
import com.viewsh.module.ops.infrastructure.log.enumeration.EventDomain;
|
||||
@@ -78,15 +77,15 @@ public class CleanOrderEventListener {
|
||||
@Resource
|
||||
private NotifyMessageSendApi notifyMessageSendApi;
|
||||
|
||||
@Resource
|
||||
private IotDeviceControlApi iotDeviceControlApi;
|
||||
|
||||
@Resource
|
||||
private EventLogRecorder eventLogRecorder;
|
||||
|
||||
@Resource
|
||||
private OrderQueueService orderQueueService;
|
||||
|
||||
@Resource
|
||||
private TrafficActiveOrderRedisDAO trafficActiveOrderRedisDAO;
|
||||
|
||||
// ==================== 工单创建事件 ====================
|
||||
|
||||
/**
|
||||
@@ -108,10 +107,7 @@ public class CleanOrderEventListener {
|
||||
// 异步触发调度
|
||||
asyncDispatchAfterCreated(event);
|
||||
|
||||
// 如果是客流触发的工单,重置客流计数器
|
||||
if ("IOT_TRAFFIC".equals(event.getPayload().get("triggerSource"))) {
|
||||
asyncResetTrafficCounter(event);
|
||||
}
|
||||
// 注意:客流计数器重置已移至 CleanOrderCreateEventHandler,在消息消费时统一处理
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -152,50 +148,6 @@ public class CleanOrderEventListener {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 异步重置客流计数器
|
||||
* <p>
|
||||
* 在工单创建成功后重置阈值计数器,确保:
|
||||
* 1. 工单创建和计数器重置在同一事务语义下(AFTER_COMMIT)
|
||||
* 2. 如果工单创建失败,计数器不会被错误重置
|
||||
* 3. 由 Ops 模块(业务方)决定重置时机,职责清晰
|
||||
*/
|
||||
@Async("ops-task-executor")
|
||||
public void asyncResetTrafficCounter(OrderCreatedEvent event) {
|
||||
try {
|
||||
Long deviceId = (Long) event.getPayload().get("triggerDeviceId");
|
||||
|
||||
if (deviceId == null) {
|
||||
log.warn("[CleanOrderEventListener] 缺少设备ID,跳过重置: orderId={}",
|
||||
event.getOrderId());
|
||||
return;
|
||||
}
|
||||
|
||||
// 构建重置请求
|
||||
ResetTrafficCounterReqDTO reqDTO = ResetTrafficCounterReqDTO.builder()
|
||||
.deviceId(deviceId)
|
||||
.orderId(event.getOrderId())
|
||||
.remark("工单创建成功后重置阈值计数器")
|
||||
.build();
|
||||
|
||||
// 调用 IoT 模块 RPC 接口
|
||||
var result = iotDeviceControlApi.resetTrafficCounter(reqDTO);
|
||||
|
||||
if (result.getData() != null && result.getData()) {
|
||||
log.info("[CleanOrderEventListener] 阈值计数器重置成功: orderId={}, deviceId={}",
|
||||
event.getOrderId(), deviceId);
|
||||
} else {
|
||||
log.error("[CleanOrderEventListener] 阈值计数器重置失败: orderId={}, deviceId={}",
|
||||
event.getOrderId(), deviceId);
|
||||
// TODO: 发送告警,需要人工介入检查
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[CleanOrderEventListener] 阈值计数器重置异常: orderId={}", event.getOrderId(), e);
|
||||
// TODO: 发送告警
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== 状态变更事件 ====================
|
||||
|
||||
/**
|
||||
@@ -216,25 +168,31 @@ public class CleanOrderEventListener {
|
||||
switch (event.getNewStatus()) {
|
||||
case DISPATCHED:
|
||||
handleDispatched(event);
|
||||
updateTrafficActiveOrderStatus(event);
|
||||
break;
|
||||
case CONFIRMED:
|
||||
handleConfirmed(event);
|
||||
updateTrafficActiveOrderStatus(event);
|
||||
break;
|
||||
case ARRIVED:
|
||||
handleArrived(event);
|
||||
updateTrafficActiveOrderStatus(event);
|
||||
break;
|
||||
case PAUSED:
|
||||
handlePaused(event);
|
||||
break;
|
||||
case COMPLETED:
|
||||
handleCompleted(event);
|
||||
clearTrafficActiveOrder(event);
|
||||
break;
|
||||
case CANCELLED:
|
||||
// 设备状态由 BadgeDeviceStatusEventListener 统一处理
|
||||
log.debug("[CleanOrderEventListener] CANCELLED 状态已处理: orderId={}", event.getOrderId());
|
||||
clearTrafficActiveOrder(event);
|
||||
break;
|
||||
case QUEUED:
|
||||
handleQueued(event);
|
||||
updateTrafficActiveOrderStatus(event);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
@@ -437,7 +395,7 @@ public class CleanOrderEventListener {
|
||||
|
||||
Long deviceId = event.getAssigneeId();
|
||||
if (deviceId != null) {
|
||||
// 异步执行:先发送完成通知,等待5秒后再派发下一个任务
|
||||
// 异步执行:发送完成通知,然后派发下一个任务(TTS队列控制播报间隔)
|
||||
asyncCompleteAndDispatchNext(event.getOrderId(), deviceId);
|
||||
}
|
||||
}
|
||||
@@ -445,23 +403,16 @@ public class CleanOrderEventListener {
|
||||
/**
|
||||
* 异步执行工单完成后的通知和派单
|
||||
* <p>
|
||||
* 保证顺序:先播报"工单已完成",间隔5秒后再派发下一个任务(触发"新工单来啦")
|
||||
* 避免两条语音播报顺序混乱或被覆盖
|
||||
* 先发送完成通知,再立即触发下一个任务派发。
|
||||
* 语音播报顺序和间隔由 TTS 队列({@link com.viewsh.module.ops.environment.service.voice.TtsQueueConsumer})
|
||||
* 按设备维度控制,同一设备前后播报间隔由 ops.tts.queue.interval-ms 配置。
|
||||
*/
|
||||
@Async("ops-task-executor")
|
||||
public void asyncCompleteAndDispatchNext(Long orderId, Long deviceId) {
|
||||
// 1. 先发送完成通知
|
||||
// 1. 发送完成通知(入TTS队列)
|
||||
sendOrderCompletedNotification(orderId, deviceId);
|
||||
|
||||
// 2. 等待5秒,确保完成语音播报完毕后再触发下一个任务的通知
|
||||
try {
|
||||
Thread.sleep(5000);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
log.warn("[CleanOrderEventListener] 等待被中断: orderId={}", orderId);
|
||||
}
|
||||
|
||||
// 3. 自动推送下一个任务
|
||||
// 2. 自动推送下一个任务(新任务通知也入TTS队列,由队列控制播报间隔)
|
||||
cleanOrderService.autoDispatchNextOrder(orderId, deviceId);
|
||||
}
|
||||
|
||||
@@ -552,9 +503,9 @@ public class CleanOrderEventListener {
|
||||
try {
|
||||
log.warn("[P0紧急通知] deviceId={}, orderCode={}", deviceId, orderCode);
|
||||
|
||||
// 1. 语音播报(使用统一模板构建器)
|
||||
// 1. 语音播报(P0紧急,插队到队列头部)
|
||||
String voiceMessage = CleanNotificationConstants.VoiceBuilder.buildPriorityUpgrade(orderCode);
|
||||
playVoice(deviceId, voiceMessage, orderId);
|
||||
playVoiceUrgent(deviceId, voiceMessage, orderId);
|
||||
|
||||
// 2. 发送站内信
|
||||
sendNotifyMessage(1L,
|
||||
@@ -637,11 +588,14 @@ public class CleanOrderEventListener {
|
||||
}
|
||||
|
||||
/**
|
||||
* 语音播报(带工单ID)
|
||||
* 语音播报(带工单ID,按序入队 FIFO)
|
||||
* <p>
|
||||
* 大多数业务通知使用此方法,保证同一设备上的播报按入队顺序播放。
|
||||
* 仅 P0 紧急插队场景使用 {@link #playVoiceUrgent}。
|
||||
*/
|
||||
private void playVoice(Long deviceId, String message, Long orderId) {
|
||||
try {
|
||||
voiceBroadcastService.broadcast(deviceId, message, orderId);
|
||||
voiceBroadcastService.broadcastInOrder(deviceId, message, orderId);
|
||||
log.debug("[语音播报] 调用成功: deviceId={}, message={}", deviceId, message);
|
||||
|
||||
} catch (Exception e) {
|
||||
@@ -649,6 +603,21 @@ public class CleanOrderEventListener {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 紧急语音播报(插队到队列头部)
|
||||
* <p>
|
||||
* 仅用于 P0 紧急任务打断等需要立即播报的场景
|
||||
*/
|
||||
private void playVoiceUrgent(Long deviceId, String message, Long orderId) {
|
||||
try {
|
||||
voiceBroadcastService.broadcastUrgent(deviceId, message, orderId);
|
||||
log.debug("[语音播报-紧急] 调用成功: deviceId={}, message={}", deviceId, message);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[语音播报-紧急] 调用失败: deviceId={}, message={}", deviceId, message, e);
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== 站内信发送方法 ====================
|
||||
|
||||
/**
|
||||
@@ -679,6 +648,39 @@ public class CleanOrderEventListener {
|
||||
return "某区域";
|
||||
}
|
||||
|
||||
/**
|
||||
* 工单状态变更时,更新 Redis 中的活跃工单状态标记
|
||||
* <p>
|
||||
* 仅处理客流触发的工单
|
||||
*/
|
||||
private void updateTrafficActiveOrderStatus(OrderStateChangedEvent event) {
|
||||
try {
|
||||
OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId());
|
||||
if (order != null && "IOT_TRAFFIC".equals(order.getTriggerSource()) && order.getAreaId() != null) {
|
||||
trafficActiveOrderRedisDAO.updateStatus(order.getAreaId(), event.getNewStatus().getStatus());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("[CleanOrderEventListener] 更新客流活跃工单状态失败: orderId={}", event.getOrderId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 工单终态时,清除 Redis 中的活跃工单标记
|
||||
* <p>
|
||||
* 仅处理客流触发的工单。清除后下次客流达标将创建新工单(新周期)。
|
||||
*/
|
||||
private void clearTrafficActiveOrder(OrderStateChangedEvent event) {
|
||||
try {
|
||||
OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId());
|
||||
if (order != null && "IOT_TRAFFIC".equals(order.getTriggerSource()) && order.getAreaId() != null) {
|
||||
trafficActiveOrderRedisDAO.removeActive(order.getAreaId());
|
||||
log.info("[CleanOrderEventListener] 客流工单周期结束,已清除区域{}活跃标记", order.getAreaId());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("[CleanOrderEventListener] 清除客流活跃工单标记失败: orderId={}", event.getOrderId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 记录暂停开始时间
|
||||
*/
|
||||
|
||||
@@ -49,7 +49,7 @@ public class TtsQueueConsumer {
|
||||
@Value("${ops.tts.queue.enabled:true}")
|
||||
private boolean queueEnabled;
|
||||
|
||||
@Value("${ops.tts.queue.interval-ms:3000}")
|
||||
@Value("${ops.tts.queue.interval-ms:6000}")
|
||||
private long broadcastIntervalMs;
|
||||
|
||||
@Value("${ops.tts.queue.max-queue-size:50}")
|
||||
|
||||
@@ -37,6 +37,16 @@ public class TtsQueueMessage implements Serializable {
|
||||
*/
|
||||
public static final int TTS_FLAG_URGENT = 0x09;
|
||||
|
||||
/**
|
||||
* 队列优先级:紧急(leftPush 插队到头部)
|
||||
*/
|
||||
public static final int PRIORITY_URGENT = 1;
|
||||
|
||||
/**
|
||||
* 队列优先级:普通(rightPush 按序追加到尾部)
|
||||
*/
|
||||
public static final int PRIORITY_NORMAL = 5;
|
||||
|
||||
/**
|
||||
* 设备ID
|
||||
*/
|
||||
@@ -79,14 +89,18 @@ public class TtsQueueMessage implements Serializable {
|
||||
private Integer maxRetry;
|
||||
|
||||
/**
|
||||
* 创建普通消息
|
||||
* 创建按序消息(FIFO,rightPush 追加到尾部)
|
||||
* <p>
|
||||
* ttsFlag=0x09(紧急通知,带显示),priority=5(普通优先级)
|
||||
* 适用于大多数业务通知,保证同一设备播报按入队顺序播放
|
||||
*/
|
||||
public static TtsQueueMessage normal(Long deviceId, String text) {
|
||||
public static TtsQueueMessage inOrder(Long deviceId, String text, Long orderId) {
|
||||
return TtsQueueMessage.builder()
|
||||
.deviceId(deviceId)
|
||||
.text(text)
|
||||
.ttsFlag(TTS_FLAG_NORMAL)
|
||||
.priority(5)
|
||||
.ttsFlag(TTS_FLAG_URGENT)
|
||||
.orderId(orderId)
|
||||
.priority(PRIORITY_NORMAL)
|
||||
.createTime(System.currentTimeMillis())
|
||||
.retryCount(0)
|
||||
.maxRetry(2)
|
||||
@@ -94,36 +108,24 @@ public class TtsQueueMessage implements Serializable {
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建紧急消息
|
||||
* 创建紧急消息(leftPush 插队到队列头部)
|
||||
* <p>
|
||||
* ttsFlag=0x09(紧急通知,带显示),priority=1(紧急优先级)
|
||||
* 仅用于 P0 紧急任务打断等需要立即播报的场景
|
||||
*/
|
||||
public static TtsQueueMessage urgent(Long deviceId, String text) {
|
||||
public static TtsQueueMessage urgent(Long deviceId, String text, Long orderId) {
|
||||
return TtsQueueMessage.builder()
|
||||
.deviceId(deviceId)
|
||||
.text(text)
|
||||
.ttsFlag(TTS_FLAG_URGENT)
|
||||
.priority(1)
|
||||
.orderId(orderId)
|
||||
.priority(PRIORITY_URGENT)
|
||||
.createTime(System.currentTimeMillis())
|
||||
.retryCount(0)
|
||||
.maxRetry(3)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建带工单ID的消息
|
||||
*/
|
||||
public static TtsQueueMessage withOrder(Long deviceId, String text, int ttsFlag, Long orderId) {
|
||||
return TtsQueueMessage.builder()
|
||||
.deviceId(deviceId)
|
||||
.text(text)
|
||||
.ttsFlag(ttsFlag)
|
||||
.orderId(orderId)
|
||||
.priority(ttsFlag == TTS_FLAG_URGENT ? 1 : 5)
|
||||
.createTime(System.currentTimeMillis())
|
||||
.retryCount(0)
|
||||
.maxRetry(ttsFlag == TTS_FLAG_URGENT ? 3 : 2)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* 增加重试次数
|
||||
*/
|
||||
@@ -139,12 +141,12 @@ public class TtsQueueMessage implements Serializable {
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查消息是否过期(超过30秒认为过期)
|
||||
* 检查消息是否过期(超过60秒认为过期)
|
||||
*/
|
||||
public boolean isExpired() {
|
||||
if (createTime == null) {
|
||||
return false;
|
||||
}
|
||||
return System.currentTimeMillis() - createTime > 30_000;
|
||||
return System.currentTimeMillis() - createTime > 60_000;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,7 +10,6 @@ import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
@@ -18,20 +17,13 @@ import org.springframework.stereotype.Service;
|
||||
* <p>
|
||||
* 职责:
|
||||
* 1. 统一所有 TTS 下发入口
|
||||
* 2. 提供同步/异步播报接口
|
||||
* 3. 通过队列控制播报间隔,防止连续播报被覆盖
|
||||
* 4. 记录播报日志
|
||||
* <p>
|
||||
* 设计原则:
|
||||
* - 接受 deviceId 参数(而非 cleanerId)
|
||||
* - 默认使用队列模式,确保播报不丢失
|
||||
* - 支持直接播报模式(特殊场景)
|
||||
* - 按设备分组,独立队列管理
|
||||
* 2. 通过队列控制播报间隔,防止连续播报被覆盖
|
||||
* 3. 记录播报日志
|
||||
* <p>
|
||||
* 队列机制:
|
||||
* - 相同设备的播报请求进入 Redis 队列
|
||||
* - 消费者按顺序取出,间隔 1-2 秒播报
|
||||
* - 紧急消息可插队(优先级高)
|
||||
* - 消费者按顺序取出,按配置间隔播报
|
||||
* - 紧急消息可插队(leftPush 到队列头部)
|
||||
* - 支持失败重试(最多 2-3 次)
|
||||
* <p>
|
||||
* JT808 TTS 播报标志 (tts_flag) 说明:
|
||||
@@ -50,27 +42,6 @@ public class VoiceBroadcastService {
|
||||
*/
|
||||
private static final String TTS_IDENTIFIER = "TTS";
|
||||
|
||||
/**
|
||||
* TTS 播报标志:静默执行
|
||||
* <p>
|
||||
* 设备收到后解析指令,修改参数,回复 0x0001,但不发声
|
||||
*/
|
||||
public static final int TTS_FLAG_SILENT = 0x01;
|
||||
|
||||
/**
|
||||
* TTS 播报标志:普通通知
|
||||
* <p>
|
||||
* 播放语音,设备将文本内容通过喇叭朗读出来
|
||||
*/
|
||||
public static final int TTS_FLAG_NORMAL = 0x08;
|
||||
|
||||
/**
|
||||
* TTS 播报标志:紧急通知
|
||||
* <p>
|
||||
* 播放语音(通常带显示),用于重要通知
|
||||
*/
|
||||
public static final int TTS_FLAG_URGENT = 0x09;
|
||||
|
||||
@Resource
|
||||
private IotDeviceControlApi iotDeviceControlApi;
|
||||
|
||||
@@ -86,85 +57,48 @@ public class VoiceBroadcastService {
|
||||
@Value("${ops.tts.queue.enabled:true}")
|
||||
private boolean queueEnabled;
|
||||
|
||||
// ==================== 队列模式播报(推荐) ====================
|
||||
// ==================== 队列模式播报 ====================
|
||||
|
||||
/**
|
||||
* 播报语音(紧急通知,默认,使用队列)
|
||||
* 按序播报(FIFO,rightPush 追加到队列尾部)
|
||||
* <p>
|
||||
* 使用 tts_flag=0x09,适用于工单确认、到岗提醒等重要通知
|
||||
* 使用 tts_flag=0x09(紧急通知,带显示),以普通优先级入队,
|
||||
* 保证同一设备上的播报按入队顺序播放。适用于大多数业务通知。
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param text 播报文本
|
||||
* @param orderId 工单ID(可选,用于日志记录)
|
||||
*/
|
||||
public void broadcast(Long deviceId, String text) {
|
||||
broadcast(deviceId, text, TTS_FLAG_URGENT, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* 播报语音(带工单ID,使用队列)
|
||||
*/
|
||||
public void broadcast(Long deviceId, String text, Long orderId) {
|
||||
broadcast(deviceId, text, TTS_FLAG_URGENT, orderId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 播报语音(指定播报类型,使用队列)
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param text 播报文本
|
||||
* @param ttsFlag 播报标志(0x01=静默, 0x08=普通, 0x09=紧急)
|
||||
*/
|
||||
public void broadcast(Long deviceId, String text, int ttsFlag) {
|
||||
broadcast(deviceId, text, ttsFlag, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* 播报语音(指定播报类型和工单ID,使用队列)
|
||||
* <p>
|
||||
* 优先使用队列模式,确保播报不丢失
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param text 播报文本
|
||||
* @param ttsFlag 播报标志
|
||||
* @param orderId 工单ID(可选)
|
||||
*/
|
||||
public void broadcast(Long deviceId, String text, int ttsFlag, Long orderId) {
|
||||
public void broadcastInOrder(Long deviceId, String text, Long orderId) {
|
||||
if (deviceId == null || text == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 优先使用队列模式
|
||||
if (queueEnabled) {
|
||||
broadcastWithQueue(deviceId, text, ttsFlag, orderId);
|
||||
enqueueOrFallback(TtsQueueMessage.inOrder(deviceId, text, orderId));
|
||||
} else {
|
||||
broadcastDirect(deviceId, text, ttsFlag, orderId);
|
||||
broadcastDirect(deviceId, text, TtsQueueMessage.TTS_FLAG_URGENT, orderId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 播报语音(普通通知,使用队列)
|
||||
* 紧急播报(leftPush 插队到队列头部)
|
||||
* <p>
|
||||
* 使用 tts_flag=0x09(紧急通知,带显示),以紧急优先级入队,
|
||||
* 仅用于 P0 紧急任务打断等需要立即播报的场景。
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param text 播报文本
|
||||
* @param orderId 工单ID(可选,用于日志记录)
|
||||
*/
|
||||
public void broadcastNormal(Long deviceId, String text) {
|
||||
broadcast(deviceId, text, TTS_FLAG_NORMAL);
|
||||
}
|
||||
|
||||
/**
|
||||
* 播报语音(紧急通知,使用队列)
|
||||
*/
|
||||
public void broadcastUrgent(Long deviceId, String text) {
|
||||
broadcast(deviceId, text, TTS_FLAG_URGENT, null);
|
||||
}
|
||||
|
||||
public void broadcastUrgent(Long deviceId, String text, Long orderId) {
|
||||
broadcast(deviceId, text, TTS_FLAG_URGENT, orderId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 播报语音(异步,使用队列)
|
||||
*/
|
||||
@Async("ops-task-executor")
|
||||
public void broadcastAsync(Long deviceId, String text) {
|
||||
broadcast(deviceId, text);
|
||||
if (deviceId == null || text == null) {
|
||||
return;
|
||||
}
|
||||
if (queueEnabled) {
|
||||
enqueueOrFallback(TtsQueueMessage.urgent(deviceId, text, orderId));
|
||||
} else {
|
||||
broadcastDirect(deviceId, text, TtsQueueMessage.TTS_FLAG_URGENT, orderId);
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== 直接播报模式(特殊场景) ====================
|
||||
@@ -206,38 +140,31 @@ public class VoiceBroadcastService {
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== 队列播报(内部方法) ====================
|
||||
// ==================== 内部方法 ====================
|
||||
|
||||
/**
|
||||
* 通过队列播报语音
|
||||
* 入队,失败时降级为直接播报
|
||||
*/
|
||||
private void broadcastWithQueue(Long deviceId, String text, int ttsFlag, Long orderId) {
|
||||
private void enqueueOrFallback(TtsQueueMessage message) {
|
||||
try {
|
||||
TtsQueueMessage message = TtsQueueMessage.builder()
|
||||
.deviceId(deviceId)
|
||||
.text(text)
|
||||
.ttsFlag(ttsFlag)
|
||||
.orderId(orderId)
|
||||
.priority(ttsFlag == TTS_FLAG_URGENT ? 1 : 5)
|
||||
.createTime(System.currentTimeMillis())
|
||||
.retryCount(0)
|
||||
.maxRetry(ttsFlag == TTS_FLAG_URGENT ? 3 : 2)
|
||||
.build();
|
||||
|
||||
boolean enqueued = ttsQueueConsumer.enqueue(message);
|
||||
|
||||
if (enqueued) {
|
||||
log.debug("[VoiceBroadcast] 消息入队: deviceId={}, text={}, flag=0x{}, priority={}",
|
||||
deviceId, text, Integer.toHexString(ttsFlag), message.getPriority());
|
||||
message.getDeviceId(), message.getText(),
|
||||
Integer.toHexString(message.getTtsFlag()), message.getPriority());
|
||||
} else {
|
||||
// 队列满或其他原因入队失败,降级为直接播报
|
||||
log.warn("[VoiceBroadcast] 入队失败,降级为直接播报: deviceId={}, text={}", deviceId, text);
|
||||
broadcastDirect(deviceId, text, ttsFlag, orderId);
|
||||
log.warn("[VoiceBroadcast] 入队失败,降级为直接播报: deviceId={}, text={}",
|
||||
message.getDeviceId(), message.getText());
|
||||
broadcastDirect(message.getDeviceId(), message.getText(),
|
||||
message.getTtsFlag(), message.getOrderId());
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[VoiceBroadcast] 队列入队异常,降级为直接播报: deviceId={}, text={}", deviceId, text, e);
|
||||
broadcastDirect(deviceId, text, ttsFlag, orderId);
|
||||
log.error("[VoiceBroadcast] 队列入队异常,降级为直接播报: deviceId={}, text={}",
|
||||
message.getDeviceId(), message.getText(), e);
|
||||
broadcastDirect(message.getDeviceId(), message.getText(),
|
||||
message.getTtsFlag(), message.getOrderId());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -338,7 +338,7 @@ public class CleanOrderEndToEndTest {
|
||||
verify(eventLogRecorder).record(any());
|
||||
|
||||
// 2. TTS sent (orderId can be null for TTS_REQUEST events)
|
||||
verify(voiceBroadcastService).broadcast(eq(5001L), contains("请回到作业区域"), eq((Long) null));
|
||||
verify(voiceBroadcastService).broadcastInOrder(eq(5001L), contains("请回到作业区域"), eq((Long) null));
|
||||
}
|
||||
|
||||
// ==========================================
|
||||
|
||||
@@ -149,7 +149,7 @@ ops:
|
||||
tts:
|
||||
queue:
|
||||
enabled: true # 是否启用 TTS 语音播报队列
|
||||
interval-ms: 3000 # 同一设备播报间隔(毫秒)
|
||||
interval-ms: 6000 # 同一设备播报间隔(毫秒)
|
||||
max-queue-size: 50 # 单个设备队列最大长度
|
||||
|
||||
debug: false
|
||||
|
||||
Reference in New Issue
Block a user