From 2afc173e18f9cbe19da1c4bf7825b53502bc40a1 Mon Sep 17 00:00:00 2001 From: lzh Date: Sun, 1 Feb 2026 02:05:41 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20ops=5Fbusiness=5Fevent=5Fl?= =?UTF-8?q?og=20=E8=A1=A8=E4=B8=AD=E6=97=A5=E5=BF=97=E7=BC=BA=E5=B0=91=20t?= =?UTF-8?q?argetType=20=E5=92=8C=20targetId=20=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../listener/CleanOrderEventListener.java | 46 ++-- .../service/cleanorder/CleanOrderService.java | 2 +- .../cleanorder/CleanOrderServiceImpl.java | 6 +- .../service/voice/VoiceBroadcastService.java | 198 +++++++++++++----- .../ops/core/dispatch/DispatchEngineImpl.java | 4 +- .../log/aspect/BusinessLogAspect.java | 24 +++ 6 files changed, 214 insertions(+), 66 deletions(-) diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/listener/CleanOrderEventListener.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/listener/CleanOrderEventListener.java index a6022ca..62b5be6 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/listener/CleanOrderEventListener.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/listener/CleanOrderEventListener.java @@ -295,7 +295,7 @@ public class CleanOrderEventListener { int queueCount = waitingTasks != null ? waitingTasks.size() : 0; // 发送待办增加通知 - sendQueuedOrderNotification(deviceId, queueCount); + sendQueuedOrderNotification(deviceId, queueCount, orderId); log.info("[CleanOrderEventListener] 入队语音播报已发送: deviceId={}, queueCount={}", deviceId, queueCount); } catch (Exception e) { @@ -324,7 +324,7 @@ public class CleanOrderEventListener { // 发送确认成功语音播报(使用统一模板) OpsOrderDO order = opsOrderMapper.selectById(orderId); String confirmMessage = CleanNotificationConstants.VoiceBuilder.buildOrderConfirmed(order); - playVoice(deviceId, confirmMessage); + playVoice(deviceId, confirmMessage, orderId); } log.info("[CleanOrderEventListener] 工单已确认: orderId={}, deviceId={}", orderId, deviceId); @@ -356,7 +356,7 @@ public class CleanOrderEventListener { if (deviceId != null) { OpsOrderDO order = opsOrderMapper.selectById(orderId); String arrivedMessage = CleanNotificationConstants.VoiceBuilder.buildOrderArrived(order); - playVoice(deviceId, arrivedMessage); + playVoice(deviceId, arrivedMessage, orderId); } // 4. 记录到岗业务日志 @@ -393,11 +393,31 @@ public class CleanOrderEventListener { // 获取 deviceId(优先从 payload 获取,其次从工单获取) Long deviceId = event.getPayloadLong("deviceId"); - // 记录完成时间到扩展表 - OpsOrderCleanExtDO updateObj = new OpsOrderCleanExtDO(); - updateObj.setOpsOrderId(orderId); - updateObj.setCompletedTime(LocalDateTime.now()); - cleanExtMapper.insertOnDuplicateKeyUpdate(updateObj); + // 记录完成时间并计算完成时长 + LocalDateTime completedTime = LocalDateTime.now(); + OpsOrderCleanExtDO ext = cleanExtMapper.selectByOpsOrderId(orderId); + + // 更新扩展表完成时间 + OpsOrderCleanExtDO updateExt = new OpsOrderCleanExtDO(); + updateExt.setOpsOrderId(orderId); + updateExt.setCompletedTime(completedTime); + cleanExtMapper.insertOnDuplicateKeyUpdate(updateExt); + + // 计算并更新完成时长到主表 + if (ext != null && ext.getArrivedTime() != null) { + int totalPauseSeconds = ext.getTotalPauseSeconds() != null ? ext.getTotalPauseSeconds() : 0; + long completionSeconds = java.time.Duration.between(ext.getArrivedTime(), completedTime).getSeconds(); + completionSeconds = Math.max(completionSeconds - totalPauseSeconds, 0); + + OpsOrderDO orderUpdate = new OpsOrderDO(); + orderUpdate.setId(orderId); + orderUpdate.setCompletionSeconds((int) completionSeconds); + opsOrderMapper.updateById(orderUpdate); + + log.info("[CleanOrderEventListener] 完成时长已更新: orderId={}, 完成时长={}秒", orderId, completionSeconds); + } else { + log.warn("[CleanOrderEventListener] 缺少到岗时间,无法计算完成时长: orderId={}", orderId); + } // 记录业务日志 recordOrderCompletedLog(orderId, deviceId, event); @@ -461,7 +481,7 @@ public class CleanOrderEventListener { // 1. 语音播报(使用统一模板构建器) String voiceMessage = CleanNotificationConstants.VoiceBuilder.buildNewOrder(order, true); - playVoice(deviceId, voiceMessage); + playVoice(deviceId, voiceMessage, orderId); // 2. 发送站内信(暂时发送到管理员) sendNotifyMessage(1L, @@ -480,13 +500,13 @@ public class CleanOrderEventListener { * 发送待办增加通知 */ @Async("ops-task-executor") - public void sendQueuedOrderNotification(Long deviceId, int queueCount) { + public void sendQueuedOrderNotification(Long deviceId, int queueCount, Long orderId) { try { log.info("[待办增加通知] deviceId={}, queueCount={}", deviceId, queueCount); // 1. 语音播报(使用统一模板构建器) String voiceMessage = CleanNotificationConstants.VoiceBuilder.buildQueuedOrder(queueCount); - playVoice(deviceId, voiceMessage); + playVoice(deviceId, voiceMessage, orderId); // 2. 发送站内信(待办数量较多时) if (queueCount >= 3) { @@ -526,13 +546,13 @@ public class CleanOrderEventListener { * 发送P0紧急任务插队通知 */ @Async("ops-task-executor") - public void sendPriorityUpgradeNotification(Long deviceId, String orderCode) { + public void sendPriorityUpgradeNotification(Long deviceId, String orderCode, Long orderId) { try { log.warn("[P0紧急通知] deviceId={}, orderCode={}", deviceId, orderCode); // 1. 语音播报(使用统一模板构建器) String voiceMessage = CleanNotificationConstants.VoiceBuilder.buildPriorityUpgrade(orderCode); - playVoice(deviceId, voiceMessage); + playVoice(deviceId, voiceMessage, orderId); // 2. 发送站内信 sendNotifyMessage(1L, diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/cleanorder/CleanOrderService.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/cleanorder/CleanOrderService.java index b4ece92..f800dae 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/cleanorder/CleanOrderService.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/cleanorder/CleanOrderService.java @@ -109,7 +109,7 @@ public interface CleanOrderService { * @param deviceId 工牌设备ID * @param queueCount 当前待办数量 */ - void playVoiceForQueuedOrder(Long deviceId, int queueCount); + void playVoiceForQueuedOrder(Long deviceId, int queueCount, Long orderId); /** * 语音播报:任务完成后自动推送下一个 diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/cleanorder/CleanOrderServiceImpl.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/cleanorder/CleanOrderServiceImpl.java index 96808f7..4d61b01 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/cleanorder/CleanOrderServiceImpl.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/cleanorder/CleanOrderServiceImpl.java @@ -248,7 +248,7 @@ public class CleanOrderServiceImpl implements CleanOrderService { DispatchResult result = dispatchEngine.urgentInterrupt(orderId, queueDTO.getUserId()); // 6. 发送优先级升级通知 - cleanOrderEventListener.sendPriorityUpgradeNotification(queueDTO.getUserId(), order.getOrderCode()); + cleanOrderEventListener.sendPriorityUpgradeNotification(queueDTO.getUserId(), order.getOrderCode(), orderId); return result.isSuccess(); } @@ -349,8 +349,8 @@ public class CleanOrderServiceImpl implements CleanOrderService { } @Override - public void playVoiceForQueuedOrder(Long deviceId, int queueCount) { - cleanOrderEventListener.sendQueuedOrderNotification(deviceId, queueCount); + public void playVoiceForQueuedOrder(Long deviceId, int queueCount, Long orderId) { + cleanOrderEventListener.sendQueuedOrderNotification(deviceId, queueCount, orderId); } @Override diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/VoiceBroadcastService.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/VoiceBroadcastService.java index d101d6c..b93eb1b 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/VoiceBroadcastService.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/VoiceBroadcastService.java @@ -9,6 +9,7 @@ import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecord; import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; @@ -18,12 +19,20 @@ import org.springframework.stereotype.Service; * 职责: * 1. 统一所有 TTS 下发入口 * 2. 提供同步/异步播报接口 - * 3. 记录播报日志 + * 3. 通过队列控制播报间隔,防止连续播报被覆盖 + * 4. 记录播报日志 *

* 设计原则: * - 接受 deviceId 参数(而非 cleanerId) - * - 简单可靠,不实现复杂的去重逻辑,由调用方控制 - * - 直接调用 IoT 设备控制接口 + * - 默认使用队列模式,确保播报不丢失 + * - 支持直接播报模式(特殊场景) + * - 按设备分组,独立队列管理 + *

+ * 队列机制: + * - 相同设备的播报请求进入 Redis 队列 + * - 消费者按顺序取出,间隔 1-2 秒播报 + * - 紧急消息可插队(优先级高) + * - 支持失败重试(最多 2-3 次) *

* JT808 TTS 播报标志 (tts_flag) 说明: * - 0x01: 静默执行 - 设备收到后解析指令,修改参数,回复 0x0001,但不发声 @@ -68,8 +77,19 @@ public class VoiceBroadcastService { @Resource private EventLogRecorder eventLogRecorder; + @Resource + private TtsQueueConsumer ttsQueueConsumer; + /** - * 播报语音(紧急通知,默认) + * 是否启用队列模式 + */ + @Value("${ops.tts.queue.enabled:true}") + private boolean queueEnabled; + + // ==================== 队列模式播报(推荐) ==================== + + /** + * 播报语音(紧急通知,默认,使用队列) *

* 使用 tts_flag=0x09,适用于工单确认、到岗提醒等重要通知 * @@ -81,14 +101,14 @@ public class VoiceBroadcastService { } /** - * 播报语音(带工单ID) + * 播报语音(带工单ID,使用队列) */ public void broadcast(Long deviceId, String text, Long orderId) { broadcast(deviceId, text, TTS_FLAG_URGENT, orderId); } /** - * 播报语音(指定播报类型) + * 播报语音(指定播报类型,使用队列) * * @param deviceId 设备ID * @param text 播报文本 @@ -99,7 +119,9 @@ public class VoiceBroadcastService { } /** - * 播报语音(指定播报类型和工单ID) + * 播报语音(指定播报类型和工单ID,使用队列) + *

+ * 优先使用队列模式,确保播报不丢失 * * @param deviceId 设备ID * @param text 播报文本 @@ -111,6 +133,58 @@ public class VoiceBroadcastService { return; } + // 优先使用队列模式 + if (queueEnabled) { + broadcastWithQueue(deviceId, text, ttsFlag, orderId); + } else { + broadcastDirect(deviceId, text, ttsFlag, orderId); + } + } + + /** + * 播报语音(普通通知,使用队列) + */ + public void broadcastNormal(Long deviceId, String text) { + broadcast(deviceId, text, TTS_FLAG_NORMAL); + } + + /** + * 播报语音(紧急通知,使用队列) + */ + public void broadcastUrgent(Long deviceId, String text) { + broadcast(deviceId, text, TTS_FLAG_URGENT, null); + } + + public void broadcastUrgent(Long deviceId, String text, Long orderId) { + broadcast(deviceId, text, TTS_FLAG_URGENT, orderId); + } + + /** + * 播报语音(异步,使用队列) + */ + @Async("ops-task-executor") + public void broadcastAsync(Long deviceId, String text) { + broadcast(deviceId, text); + } + + // ==================== 直接播报模式(特殊场景) ==================== + + /** + * 直接播报语音(不经过队列) + *

+ * 用于需要立即播报的场景,如紧急告警 + * 注意:连续调用可能导致播报被覆盖 + * + * @param deviceId 设备ID + * @param text 播报文本 + * @param ttsFlag 播报标志 + * @param orderId 工单ID(可选) + */ + public void broadcastDirect(Long deviceId, String text, int ttsFlag, Long orderId) { + if (deviceId == null || text == null) { + return; + } + try { IotDeviceServiceInvokeReqDTO reqDTO = new IotDeviceServiceInvokeReqDTO(); reqDTO.setDeviceId(deviceId); @@ -121,10 +195,59 @@ public class VoiceBroadcastService { .build()); iotDeviceControlApi.invokeService(reqDTO); - log.debug("[VoiceBroadcast] 播报成功: deviceId={}, text={}, flag=0x{}", + log.debug("[VoiceBroadcast] 直接播报成功: deviceId={}, text={}, flag=0x{}", deviceId, text, Integer.toHexString(ttsFlag)); - // 记录日志 + recordLog(deviceId, text, orderId, true, null); + + } catch (Exception e) { + log.error("[VoiceBroadcast] 直接播报失败: deviceId={}, text={}", deviceId, text, e); + recordLog(deviceId, text, orderId, false, e); + } + } + + // ==================== 队列播报(内部方法) ==================== + + /** + * 通过队列播报语音 + */ + private void broadcastWithQueue(Long deviceId, String text, int ttsFlag, Long orderId) { + try { + TtsQueueMessage message = TtsQueueMessage.builder() + .deviceId(deviceId) + .text(text) + .ttsFlag(ttsFlag) + .orderId(orderId) + .priority(ttsFlag == TTS_FLAG_URGENT ? 1 : 5) + .createTime(System.currentTimeMillis()) + .retryCount(0) + .maxRetry(ttsFlag == TTS_FLAG_URGENT ? 3 : 2) + .build(); + + boolean enqueued = ttsQueueConsumer.enqueue(message); + + if (enqueued) { + log.debug("[VoiceBroadcast] 消息入队: deviceId={}, text={}, flag=0x{}, priority={}", + deviceId, text, Integer.toHexString(ttsFlag), message.getPriority()); + } else { + // 队列满或其他原因入队失败,降级为直接播报 + log.warn("[VoiceBroadcast] 入队失败,降级为直接播报: deviceId={}, text={}", deviceId, text); + broadcastDirect(deviceId, text, ttsFlag, orderId); + } + + } catch (Exception e) { + log.error("[VoiceBroadcast] 队列入队异常,降级为直接播报: deviceId={}, text={}", deviceId, text, e); + broadcastDirect(deviceId, text, ttsFlag, orderId); + } + } + + // ==================== 辅助方法 ==================== + + /** + * 记录播报日志 + */ + private void recordLog(Long deviceId, String text, Long orderId, boolean success, Exception e) { + if (success) { if (orderId != null) { eventLogRecorder.info("clean", EventDomain.DEVICE, "TTS_SENT", "语音播报: " + text, orderId, deviceId, null); @@ -132,22 +255,13 @@ public class VoiceBroadcastService { eventLogRecorder.info("clean", EventDomain.DEVICE, "TTS_SENT", "语音播报: " + text, deviceId); } - - } catch (Exception e) { - log.error("[VoiceBroadcast] 播报失败: deviceId={}, text={}", deviceId, text, e); - - // 记录错误日志 + } else { if (orderId != null) { - eventLogRecorder.error("clean", EventDomain.DEVICE, "TTS_FAILED", - "语音播报失败: " + e.getMessage(), deviceId, e); // error方法目前没有支持orderId的重载,或者需要检查一下 - // 检查 EventLogRecorder 接口,error 方法可能有带 orderId 的重载吗? - // 刚才看 EventLogRecorderImpl 好像没有特别全的重载,或者我需要用 record() 方法手动构建 - // 让我先用简单方式,error 日志暂时不强求 orderId,或者手动构建 eventLogRecorder.record(EventLogRecord.builder() .module("clean") .domain(EventDomain.DEVICE) .eventType("TTS_FAILED") - .message("语音播报失败: " + e.getMessage()) + .message("语音播报失败: " + (e != null ? e.getMessage() : "unknown")) .targetId(orderId) .targetType("order") .deviceId(deviceId) @@ -155,47 +269,35 @@ public class VoiceBroadcastService { .build()); } else { eventLogRecorder.error("clean", EventDomain.DEVICE, "TTS_FAILED", - "语音播报失败: " + e.getMessage(), deviceId, e); + "语音播报失败: " + (e != null ? e.getMessage() : "unknown"), deviceId, e); } } } /** - * 播报语音(普通通知) - *

- * 使用 tts_flag=0x08,适用于一般性通知 - * - * @param deviceId 设备ID - * @param text 播报文本 + * 获取指定设备的队列长度 */ - public void broadcastNormal(Long deviceId, String text) { - broadcast(deviceId, text, TTS_FLAG_NORMAL); + public long getQueueSize(Long deviceId) { + if (!queueEnabled) { + return 0; + } + return ttsQueueConsumer.getQueueSize(deviceId); } /** - * 播报语音(紧急通知) - *

- * 使用 tts_flag=0x09,适用于工单确认、到岗提醒等重要通知 - * - * @param deviceId 设备ID - * @param text 播报文本 + * 清空指定设备的播报队列 */ - public void broadcastUrgent(Long deviceId, String text) { - broadcast(deviceId, text, TTS_FLAG_URGENT, null); - } - - public void broadcastUrgent(Long deviceId, String text, Long orderId) { - broadcast(deviceId, text, TTS_FLAG_URGENT, orderId); + public void clearQueue(Long deviceId) { + if (!queueEnabled) { + return; + } + ttsQueueConsumer.clearQueue(deviceId); } /** - * 播报语音(异步) - * - * @param deviceId 设备ID - * @param text 播报文本 + * 检查是否启用队列模式 */ - @Async("ops-task-executor") - public void broadcastAsync(Long deviceId, String text) { - broadcast(deviceId, text); + public boolean isQueueEnabled() { + return queueEnabled; } } diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java index 066c912..1ffef64 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java @@ -83,7 +83,9 @@ public class DispatchEngineImpl implements DispatchEngine { includeParams = true, includeResult = true, result = "#result.success", - params = {"#context.orderId", "#context.businessType", "#context.priority"} + params = {"#context.orderId", "#context.businessType", "#context.priority"}, + targetId = "#context.orderId", + targetType = "order" ) @Transactional(rollbackFor = Exception.class) public DispatchResult dispatch(OrderDispatchContext context) { diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/infrastructure/log/aspect/BusinessLogAspect.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/infrastructure/log/aspect/BusinessLogAspect.java index dac2e44..e4f692c 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/infrastructure/log/aspect/BusinessLogAspect.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/infrastructure/log/aspect/BusinessLogAspect.java @@ -157,6 +157,30 @@ public class BusinessLogAspect { // 提取业务实体ID if (!businessLog.targetId().isEmpty()) { extractId(record, businessLog.targetId(), evalContext, "targetId", true); + } else if (businessLog.scope() == LogScope.ORDER) { + // 当使用旧版注解且 scope 为 ORDER 时,尝试从参数中提取 orderId 作为 targetId + // 常见的参数名:orderId, context.orderId, request.orderId 等 + String[] possibleExpressions = { + "#orderId", + "#context.orderId", + "#request.orderId", + "#param.orderId", + "#ctx.orderId" + }; + + for (String expr : possibleExpressions) { + try { + Expression expression = parser.parseExpression(expr); + Object value = expression.getValue(evalContext); + if (value instanceof Long || value instanceof Integer) { + long longValue = value instanceof Long ? (Long) value : ((Integer) value).longValue(); + record.setTargetId(longValue); + break; + } + } catch (Exception e) { + // 忽略解析错误,尝试下一个表达式 + } + } } // 设置业务实体类型