diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/job/TtsQueueProcessJob.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/job/TtsQueueProcessJob.java new file mode 100644 index 0000000..a11dc26 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/job/TtsQueueProcessJob.java @@ -0,0 +1,53 @@ +package com.viewsh.module.ops.environment.job; + +import com.viewsh.module.ops.environment.service.voice.TtsQueueConsumer; +import com.viewsh.framework.tenant.core.job.TenantJob; +import com.xxl.job.core.handler.annotation.XxlJob; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * TTS 语音播报队列处理 Job + *

+ * 职责: + * 1. 定时调用 TtsQueueConsumer 处理播报队列 + * 2. 记录处理结果 + *

+ * XXL-Job 配置: + * - JobHandler: ttsQueueProcessJob + * - Cron: 0/1 * * * * ? (每 1 秒执行一次) + * - 执行器路由策略:轮询 + * + * @author lzh + */ +@Slf4j +@Component +public class TtsQueueProcessJob { + + @Resource + private TtsQueueConsumer ttsQueueConsumer; + + /** + * 处理 TTS 播报队列 + *

+ * XXL-Job 调度入口 + * + * @return 执行结果 + */ + @XxlJob("ttsQueueProcessJob") + @TenantJob + public String execute() { + if (!ttsQueueConsumer.isEnabled()) { + return "TTS队列未启用"; + } + + try { + int processedCount = ttsQueueConsumer.processAllQueues(); + return String.format("TTS队列处理完成: 处理 %d 条消息", processedCount); + } catch (Exception e) { + log.error("[TTS队列Job] 处理失败", e); + return String.format("TTS队列处理失败: %s", e.getMessage()); + } + } +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/TtsQueueConsumer.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/TtsQueueConsumer.java new file mode 100644 index 0000000..90c1886 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/TtsQueueConsumer.java @@ -0,0 +1,390 @@ +package com.viewsh.module.ops.environment.service.voice; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.module.iot.api.device.IotDeviceControlApi; +import com.viewsh.module.iot.api.device.dto.IotDeviceServiceInvokeReqDTO; +import com.viewsh.module.ops.infrastructure.log.enumeration.EventDomain; +import com.viewsh.module.ops.infrastructure.log.enumeration.EventLevel; +import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecord; +import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder; +import cn.hutool.core.map.MapUtil; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +/** + * TTS 语音播报队列消费者 + *

+ * 职责: + * 1. 管理播报队列(入队、出队、查询) + * 2. 控制播报间隔,防止连续播报被覆盖 + * 3. 支持优先级插队 + * 4. 失败重试机制 + *

+ * 设计说明: + * - 每个设备独立队列,互不干扰 + * - 同一设备两条播报间隔至少 1-2 秒 + * - 紧急消息可以插队(优先级高) + * - 过期消息自动丢弃 + * - 由 {@link com.viewsh.module.ops.environment.job.TtsQueueProcessJob} 定时调用处理 + * + * @author lzh + */ +@Slf4j +@Service +public class TtsQueueConsumer { + + private static final String QUEUE_KEY_PREFIX = "ops:tts:queue:"; + + @Value("${ops.tts.queue.enabled:true}") + private boolean queueEnabled; + + @Value("${ops.tts.queue.interval-ms:1500}") + private long broadcastIntervalMs; + + @Value("${ops.tts.queue.max-queue-size:50}") + private int maxQueueSize; + + @Resource + private RedisTemplate redisTemplate; + + @Resource + private IotDeviceControlApi iotDeviceControlApi; + + @Resource + private EventLogRecorder eventLogRecorder; + + @Resource + private ObjectMapper objectMapper; + + /** + * 设备播报锁(防止同一设备并发播报) + * key: deviceId, value: 上次播报时间戳 + */ + private final Map deviceBroadcastLock = new ConcurrentHashMap<>(); + + /** + * 服务是否运行中 + */ + private volatile boolean running = true; + + /** + * 获取队列 key + */ + public String getQueueKey(Long deviceId) { + return QUEUE_KEY_PREFIX + deviceId; + } + + /** + * 将消息加入队列 + * + * @param message 消息 + * @return 是否加入成功 + */ + public boolean enqueue(TtsQueueMessage message) { + if (!queueEnabled || message == null || message.getDeviceId() == null) { + return false; + } + + try { + String key = getQueueKey(message.getDeviceId()); + + // 检查队列长度,防止无限增长 + Long size = redisTemplate.opsForList().size(key); + if (size != null && size >= maxQueueSize) { + log.warn("[TTS队列] 队列已满,丢弃消息: deviceId={}, size={}", + message.getDeviceId(), size); + return false; + } + + // 序列化消息 + String json = objectMapper.writeValueAsString(message); + + // 根据优先级决定插入位置 + // 优先级 0-2 为紧急,插入队列头部;其他插入尾部 + if (message.getPriority() != null && message.getPriority() <= 2) { + redisTemplate.opsForList().leftPush(key, json); + } else { + redisTemplate.opsForList().rightPush(key, json); + } + + // 设置过期时间(1小时) + redisTemplate.expire(key, 1, TimeUnit.HOURS); + + log.debug("[TTS队列] 消息入队: deviceId={}, text={}, priority={}", + message.getDeviceId(), message.getText(), message.getPriority()); + + return true; + + } catch (Exception e) { + log.error("[TTS队列] 入队失败: deviceId={}", message.getDeviceId(), e); + return false; + } + } + + /** + * 处理单个设备的队列(公开方法,供 Job 调用) + * + * @param queueKey 队列 key + * @return 是否处理了消息 + */ + public boolean processSingleQueue(String queueKey) { + // 从 key 中提取 deviceId + String deviceIdStr = queueKey.substring(QUEUE_KEY_PREFIX.length()); + Long deviceId; + try { + deviceId = Long.parseLong(deviceIdStr); + } catch (NumberFormatException e) { + log.warn("[TTS队列] 无效的 deviceId: {}", deviceIdStr); + return false; + } + + // 检查设备播报锁 + long now = System.currentTimeMillis(); + Long lastBroadcastTime = deviceBroadcastLock.get(deviceId); + if (lastBroadcastTime != null && now - lastBroadcastTime < broadcastIntervalMs) { + // 距离上次播报时间太短,跳过 + return false; + } + + // 从队列中取出一条消息 + Object messageObj = redisTemplate.opsForList().leftPop(queueKey); + if (messageObj == null) { + // 队列为空 + return false; + } + + try { + // 反序列化消息 + TtsQueueMessage message = parseMessage(messageObj); + if (message == null) { + return false; + } + + // 检查消息是否过期 + if (message.isExpired()) { + log.debug("[TTS队列] 消息已过期,跳过: deviceId={}, text={}", + deviceId, message.getText()); + return false; + } + + // 执行播报 + boolean success = doBroadcast(message); + + if (success) { + // 播报成功,更新锁时间 + deviceBroadcastLock.put(deviceId, now); + log.debug("[TTS队列] 播报成功: deviceId={}, text={}", + deviceId, message.getText()); + return true; + } else { + // 播报失败,检查是否需要重试 + handleFailure(message, queueKey); + return false; + } + + } catch (Exception e) { + log.error("[TTS队列] 处理消息失败: deviceId={}", deviceId, e); + return false; + } + } + + /** + * 处理所有队列(供 XXL-Job 调用) + * + * @return 处理的消息数量 + */ + public int processAllQueues() { + if (!queueEnabled || !running) { + return 0; + } + + int processedCount = 0; + + try { + // 获取所有队列 key + String pattern = QUEUE_KEY_PREFIX + "*"; + List keys = redisTemplate.keys(pattern); + + if (keys == null || keys.isEmpty()) { + return 0; + } + + // 处理每个队列 + for (String key : keys) { + try { + if (processSingleQueue(key)) { + processedCount++; + } + } catch (Exception e) { + log.error("[TTS队列] 处理队列失败: key={}", key, e); + } + } + + } catch (Exception e) { + log.error("[TTS队列] 处理队列异常", e); + } + + return processedCount; + } + + /** + * 解析消息 + */ + private TtsQueueMessage parseMessage(Object messageObj) { + try { + String json = messageObj.toString(); + return objectMapper.readValue(json, TtsQueueMessage.class); + } catch (Exception e) { + log.error("[TTS队列] 解析消息失败: {}", messageObj, e); + return null; + } + } + + /** + * 执行实际的播报操作 + */ + private boolean doBroadcast(TtsQueueMessage message) { + try { + IotDeviceServiceInvokeReqDTO reqDTO = new IotDeviceServiceInvokeReqDTO(); + reqDTO.setDeviceId(message.getDeviceId()); + reqDTO.setIdentifier("TTS"); + reqDTO.setParams(MapUtil.builder() + .put("tts_text", message.getText()) + .put("tts_flag", message.getTtsFlag()) + .build()); + + iotDeviceControlApi.invokeService(reqDTO); + + // 记录日志 + if (message.getOrderId() != null) { + eventLogRecorder.info("clean", EventDomain.DEVICE, "TTS_SENT", + "语音播报: " + message.getText(), message.getOrderId(), message.getDeviceId(), null); + } + + return true; + + } catch (Exception e) { + log.error("[TTS队列] 播报失败: deviceId={}, text={}", + message.getDeviceId(), message.getText(), e); + + // 记录错误日志 + eventLogRecorder.record(EventLogRecord.builder() + .module("clean") + .domain(EventDomain.DEVICE) + .eventType("TTS_FAILED") + .message("语音播报失败: " + e.getMessage()) + .targetId(message.getOrderId()) + .targetType("order") + .deviceId(message.getDeviceId()) + .level(EventLevel.ERROR) + .build()); + + return false; + } + } + + /** + * 处理播报失败 + */ + private void handleFailure(TtsQueueMessage message, String queueKey) { + message.incrementRetry(); + + if (message.canRetry()) { + // 可以重试,放回队列头部 + try { + String json = objectMapper.writeValueAsString(message); + redisTemplate.opsForList().leftPush(queueKey, json); + log.debug("[TTS队列] 重试: deviceId={}, retryCount={}", + message.getDeviceId(), message.getRetryCount()); + } catch (Exception e) { + log.error("[TTS队列] 重试入队失败", e); + } + } else { + // 超过最大重试次数,丢弃 + log.warn("[TTS队列] 超过最大重试次数,丢弃消息: deviceId={}, text={}", + message.getDeviceId(), message.getText()); + } + } + + /** + * 获取队列长度 + */ + public long getQueueSize(Long deviceId) { + String key = getQueueKey(deviceId); + Long size = redisTemplate.opsForList().size(key); + return size != null ? size : 0; + } + + /** + * 清空指定设备的队列 + */ + public void clearQueue(Long deviceId) { + String key = getQueueKey(deviceId); + redisTemplate.delete(key); + deviceBroadcastLock.remove(deviceId); + log.info("[TTS队列] 清空队列: deviceId={}", deviceId); + } + + /** + * 清空所有队列 + */ + public void clearAllQueues() { + String pattern = QUEUE_KEY_PREFIX + "*"; + List keys = redisTemplate.keys(pattern); + if (keys != null && !keys.isEmpty()) { + redisTemplate.delete(keys); + deviceBroadcastLock.clear(); + log.info("[TTS队列] 清空所有队列, count={}", keys.size()); + } + } + + /** + * 获取所有队列状态 + */ + public Map getQueueStatus() { + Map status = new ConcurrentHashMap<>(); + + String pattern = QUEUE_KEY_PREFIX + "*"; + List keys = redisTemplate.keys(pattern); + + if (keys != null && !keys.isEmpty()) { + for (String key : keys) { + String deviceIdStr = key.substring(QUEUE_KEY_PREFIX.length()); + Long size = redisTemplate.opsForList().size(key); + status.put(deviceIdStr, size != null ? size : 0); + } + status.put("totalDevices", keys.size()); + } else { + status.put("totalDevices", 0); + } + + status.put("broadcastIntervalMs", broadcastIntervalMs); + status.put("enabled", queueEnabled); + + return status; + } + + /** + * 检查是否启用队列 + */ + public boolean isEnabled() { + return queueEnabled; + } + + /** + * 服务关闭时清理资源 + */ + @PreDestroy + public void destroy() { + running = false; + log.info("[TTS队列] 消费者已停止"); + } +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/TtsQueueMessage.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/TtsQueueMessage.java new file mode 100644 index 0000000..cb142ec --- /dev/null +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/TtsQueueMessage.java @@ -0,0 +1,150 @@ +package com.viewsh.module.ops.environment.service.voice; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * TTS 语音播报队列消息 + *

+ * 存储在 Redis List 中,用于异步播报 + * + * @author lzh + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TtsQueueMessage implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * TTS 播报标志:静默执行 + */ + public static final int TTS_FLAG_SILENT = 0x01; + + /** + * TTS 播报标志:普通通知 + */ + public static final int TTS_FLAG_NORMAL = 0x08; + + /** + * TTS 播报标志:紧急通知 + */ + public static final int TTS_FLAG_URGENT = 0x09; + + /** + * 设备ID + */ + private Long deviceId; + + /** + * 播报文本 + */ + private String text; + + /** + * 播报标志(0x01=静默, 0x08=普通, 0x09=紧急) + */ + private Integer ttsFlag; + + /** + * 工单ID(可选,用于日志记录) + */ + private Long orderId; + + /** + * 消息创建时间戳(毫秒) + */ + private Long createTime; + + /** + * 消息优先级(数字越小优先级越高,0-9) + * 默认为 5,紧急消息可设为 0-2 + */ + private Integer priority; + + /** + * 重试次数 + */ + private Integer retryCount; + + /** + * 最大重试次数 + */ + private Integer maxRetry; + + /** + * 创建普通消息 + */ + public static TtsQueueMessage normal(Long deviceId, String text) { + return TtsQueueMessage.builder() + .deviceId(deviceId) + .text(text) + .ttsFlag(TTS_FLAG_NORMAL) + .priority(5) + .createTime(System.currentTimeMillis()) + .retryCount(0) + .maxRetry(2) + .build(); + } + + /** + * 创建紧急消息 + */ + public static TtsQueueMessage urgent(Long deviceId, String text) { + return TtsQueueMessage.builder() + .deviceId(deviceId) + .text(text) + .ttsFlag(TTS_FLAG_URGENT) + .priority(1) + .createTime(System.currentTimeMillis()) + .retryCount(0) + .maxRetry(3) + .build(); + } + + /** + * 创建带工单ID的消息 + */ + public static TtsQueueMessage withOrder(Long deviceId, String text, int ttsFlag, Long orderId) { + return TtsQueueMessage.builder() + .deviceId(deviceId) + .text(text) + .ttsFlag(ttsFlag) + .orderId(orderId) + .priority(ttsFlag == TTS_FLAG_URGENT ? 1 : 5) + .createTime(System.currentTimeMillis()) + .retryCount(0) + .maxRetry(ttsFlag == TTS_FLAG_URGENT ? 3 : 2) + .build(); + } + + /** + * 增加重试次数 + */ + public void incrementRetry() { + this.retryCount = (this.retryCount == null ? 0 : this.retryCount) + 1; + } + + /** + * 检查是否可以重试 + */ + public boolean canRetry() { + return retryCount == null || retryCount < maxRetry; + } + + /** + * 检查消息是否过期(超过30秒认为过期) + */ + public boolean isExpired() { + if (createTime == null) { + return false; + } + return System.currentTimeMillis() - createTime > 30_000; + } +}