新增 TTS 队列相关文件

This commit is contained in:
lzh
2026-02-01 02:05:08 +08:00
parent fecaa28bc7
commit 41c4f57f05
3 changed files with 593 additions and 0 deletions

View File

@@ -0,0 +1,53 @@
package com.viewsh.module.ops.environment.job;
import com.viewsh.module.ops.environment.service.voice.TtsQueueConsumer;
import com.viewsh.framework.tenant.core.job.TenantJob;
import com.xxl.job.core.handler.annotation.XxlJob;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* TTS 语音播报队列处理 Job
* <p>
* 职责:
* 1. 定时调用 TtsQueueConsumer 处理播报队列
* 2. 记录处理结果
* <p>
* XXL-Job 配置:
* - JobHandler: ttsQueueProcessJob
* - Cron: 0/1 * * * * ? (每 1 秒执行一次)
* - 执行器路由策略:轮询
*
* @author lzh
*/
@Slf4j
@Component
public class TtsQueueProcessJob {
@Resource
private TtsQueueConsumer ttsQueueConsumer;
/**
* 处理 TTS 播报队列
* <p>
* XXL-Job 调度入口
*
* @return 执行结果
*/
@XxlJob("ttsQueueProcessJob")
@TenantJob
public String execute() {
if (!ttsQueueConsumer.isEnabled()) {
return "TTS队列未启用";
}
try {
int processedCount = ttsQueueConsumer.processAllQueues();
return String.format("TTS队列处理完成: 处理 %d 条消息", processedCount);
} catch (Exception e) {
log.error("[TTS队列Job] 处理失败", e);
return String.format("TTS队列处理失败: %s", e.getMessage());
}
}
}

View File

@@ -0,0 +1,390 @@
package com.viewsh.module.ops.environment.service.voice;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.viewsh.module.iot.api.device.IotDeviceControlApi;
import com.viewsh.module.iot.api.device.dto.IotDeviceServiceInvokeReqDTO;
import com.viewsh.module.ops.infrastructure.log.enumeration.EventDomain;
import com.viewsh.module.ops.infrastructure.log.enumeration.EventLevel;
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecord;
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder;
import cn.hutool.core.map.MapUtil;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* TTS 语音播报队列消费者
* <p>
* 职责:
* 1. 管理播报队列(入队、出队、查询)
* 2. 控制播报间隔,防止连续播报被覆盖
* 3. 支持优先级插队
* 4. 失败重试机制
* <p>
* 设计说明:
* - 每个设备独立队列,互不干扰
* - 同一设备两条播报间隔至少 1-2 秒
* - 紧急消息可以插队(优先级高)
* - 过期消息自动丢弃
* - 由 {@link com.viewsh.module.ops.environment.job.TtsQueueProcessJob} 定时调用处理
*
* @author lzh
*/
@Slf4j
@Service
public class TtsQueueConsumer {
private static final String QUEUE_KEY_PREFIX = "ops:tts:queue:";
@Value("${ops.tts.queue.enabled:true}")
private boolean queueEnabled;
@Value("${ops.tts.queue.interval-ms:1500}")
private long broadcastIntervalMs;
@Value("${ops.tts.queue.max-queue-size:50}")
private int maxQueueSize;
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Resource
private IotDeviceControlApi iotDeviceControlApi;
@Resource
private EventLogRecorder eventLogRecorder;
@Resource
private ObjectMapper objectMapper;
/**
* 设备播报锁(防止同一设备并发播报)
* key: deviceId, value: 上次播报时间戳
*/
private final Map<Long, Long> deviceBroadcastLock = new ConcurrentHashMap<>();
/**
* 服务是否运行中
*/
private volatile boolean running = true;
/**
* 获取队列 key
*/
public String getQueueKey(Long deviceId) {
return QUEUE_KEY_PREFIX + deviceId;
}
/**
* 将消息加入队列
*
* @param message 消息
* @return 是否加入成功
*/
public boolean enqueue(TtsQueueMessage message) {
if (!queueEnabled || message == null || message.getDeviceId() == null) {
return false;
}
try {
String key = getQueueKey(message.getDeviceId());
// 检查队列长度,防止无限增长
Long size = redisTemplate.opsForList().size(key);
if (size != null && size >= maxQueueSize) {
log.warn("[TTS队列] 队列已满,丢弃消息: deviceId={}, size={}",
message.getDeviceId(), size);
return false;
}
// 序列化消息
String json = objectMapper.writeValueAsString(message);
// 根据优先级决定插入位置
// 优先级 0-2 为紧急,插入队列头部;其他插入尾部
if (message.getPriority() != null && message.getPriority() <= 2) {
redisTemplate.opsForList().leftPush(key, json);
} else {
redisTemplate.opsForList().rightPush(key, json);
}
// 设置过期时间1小时
redisTemplate.expire(key, 1, TimeUnit.HOURS);
log.debug("[TTS队列] 消息入队: deviceId={}, text={}, priority={}",
message.getDeviceId(), message.getText(), message.getPriority());
return true;
} catch (Exception e) {
log.error("[TTS队列] 入队失败: deviceId={}", message.getDeviceId(), e);
return false;
}
}
/**
* 处理单个设备的队列(公开方法,供 Job 调用)
*
* @param queueKey 队列 key
* @return 是否处理了消息
*/
public boolean processSingleQueue(String queueKey) {
// 从 key 中提取 deviceId
String deviceIdStr = queueKey.substring(QUEUE_KEY_PREFIX.length());
Long deviceId;
try {
deviceId = Long.parseLong(deviceIdStr);
} catch (NumberFormatException e) {
log.warn("[TTS队列] 无效的 deviceId: {}", deviceIdStr);
return false;
}
// 检查设备播报锁
long now = System.currentTimeMillis();
Long lastBroadcastTime = deviceBroadcastLock.get(deviceId);
if (lastBroadcastTime != null && now - lastBroadcastTime < broadcastIntervalMs) {
// 距离上次播报时间太短,跳过
return false;
}
// 从队列中取出一条消息
Object messageObj = redisTemplate.opsForList().leftPop(queueKey);
if (messageObj == null) {
// 队列为空
return false;
}
try {
// 反序列化消息
TtsQueueMessage message = parseMessage(messageObj);
if (message == null) {
return false;
}
// 检查消息是否过期
if (message.isExpired()) {
log.debug("[TTS队列] 消息已过期,跳过: deviceId={}, text={}",
deviceId, message.getText());
return false;
}
// 执行播报
boolean success = doBroadcast(message);
if (success) {
// 播报成功,更新锁时间
deviceBroadcastLock.put(deviceId, now);
log.debug("[TTS队列] 播报成功: deviceId={}, text={}",
deviceId, message.getText());
return true;
} else {
// 播报失败,检查是否需要重试
handleFailure(message, queueKey);
return false;
}
} catch (Exception e) {
log.error("[TTS队列] 处理消息失败: deviceId={}", deviceId, e);
return false;
}
}
/**
* 处理所有队列(供 XXL-Job 调用)
*
* @return 处理的消息数量
*/
public int processAllQueues() {
if (!queueEnabled || !running) {
return 0;
}
int processedCount = 0;
try {
// 获取所有队列 key
String pattern = QUEUE_KEY_PREFIX + "*";
List<String> keys = redisTemplate.keys(pattern);
if (keys == null || keys.isEmpty()) {
return 0;
}
// 处理每个队列
for (String key : keys) {
try {
if (processSingleQueue(key)) {
processedCount++;
}
} catch (Exception e) {
log.error("[TTS队列] 处理队列失败: key={}", key, e);
}
}
} catch (Exception e) {
log.error("[TTS队列] 处理队列异常", e);
}
return processedCount;
}
/**
* 解析消息
*/
private TtsQueueMessage parseMessage(Object messageObj) {
try {
String json = messageObj.toString();
return objectMapper.readValue(json, TtsQueueMessage.class);
} catch (Exception e) {
log.error("[TTS队列] 解析消息失败: {}", messageObj, e);
return null;
}
}
/**
* 执行实际的播报操作
*/
private boolean doBroadcast(TtsQueueMessage message) {
try {
IotDeviceServiceInvokeReqDTO reqDTO = new IotDeviceServiceInvokeReqDTO();
reqDTO.setDeviceId(message.getDeviceId());
reqDTO.setIdentifier("TTS");
reqDTO.setParams(MapUtil.<String, Object>builder()
.put("tts_text", message.getText())
.put("tts_flag", message.getTtsFlag())
.build());
iotDeviceControlApi.invokeService(reqDTO);
// 记录日志
if (message.getOrderId() != null) {
eventLogRecorder.info("clean", EventDomain.DEVICE, "TTS_SENT",
"语音播报: " + message.getText(), message.getOrderId(), message.getDeviceId(), null);
}
return true;
} catch (Exception e) {
log.error("[TTS队列] 播报失败: deviceId={}, text={}",
message.getDeviceId(), message.getText(), e);
// 记录错误日志
eventLogRecorder.record(EventLogRecord.builder()
.module("clean")
.domain(EventDomain.DEVICE)
.eventType("TTS_FAILED")
.message("语音播报失败: " + e.getMessage())
.targetId(message.getOrderId())
.targetType("order")
.deviceId(message.getDeviceId())
.level(EventLevel.ERROR)
.build());
return false;
}
}
/**
* 处理播报失败
*/
private void handleFailure(TtsQueueMessage message, String queueKey) {
message.incrementRetry();
if (message.canRetry()) {
// 可以重试,放回队列头部
try {
String json = objectMapper.writeValueAsString(message);
redisTemplate.opsForList().leftPush(queueKey, json);
log.debug("[TTS队列] 重试: deviceId={}, retryCount={}",
message.getDeviceId(), message.getRetryCount());
} catch (Exception e) {
log.error("[TTS队列] 重试入队失败", e);
}
} else {
// 超过最大重试次数,丢弃
log.warn("[TTS队列] 超过最大重试次数,丢弃消息: deviceId={}, text={}",
message.getDeviceId(), message.getText());
}
}
/**
* 获取队列长度
*/
public long getQueueSize(Long deviceId) {
String key = getQueueKey(deviceId);
Long size = redisTemplate.opsForList().size(key);
return size != null ? size : 0;
}
/**
* 清空指定设备的队列
*/
public void clearQueue(Long deviceId) {
String key = getQueueKey(deviceId);
redisTemplate.delete(key);
deviceBroadcastLock.remove(deviceId);
log.info("[TTS队列] 清空队列: deviceId={}", deviceId);
}
/**
* 清空所有队列
*/
public void clearAllQueues() {
String pattern = QUEUE_KEY_PREFIX + "*";
List<String> keys = redisTemplate.keys(pattern);
if (keys != null && !keys.isEmpty()) {
redisTemplate.delete(keys);
deviceBroadcastLock.clear();
log.info("[TTS队列] 清空所有队列, count={}", keys.size());
}
}
/**
* 获取所有队列状态
*/
public Map<String, Object> getQueueStatus() {
Map<String, Object> status = new ConcurrentHashMap<>();
String pattern = QUEUE_KEY_PREFIX + "*";
List<String> keys = redisTemplate.keys(pattern);
if (keys != null && !keys.isEmpty()) {
for (String key : keys) {
String deviceIdStr = key.substring(QUEUE_KEY_PREFIX.length());
Long size = redisTemplate.opsForList().size(key);
status.put(deviceIdStr, size != null ? size : 0);
}
status.put("totalDevices", keys.size());
} else {
status.put("totalDevices", 0);
}
status.put("broadcastIntervalMs", broadcastIntervalMs);
status.put("enabled", queueEnabled);
return status;
}
/**
* 检查是否启用队列
*/
public boolean isEnabled() {
return queueEnabled;
}
/**
* 服务关闭时清理资源
*/
@PreDestroy
public void destroy() {
running = false;
log.info("[TTS队列] 消费者已停止");
}
}

View File

@@ -0,0 +1,150 @@
package com.viewsh.module.ops.environment.service.voice;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* TTS 语音播报队列消息
* <p>
* 存储在 Redis List 中,用于异步播报
*
* @author lzh
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TtsQueueMessage implements Serializable {
private static final long serialVersionUID = 1L;
/**
* TTS 播报标志:静默执行
*/
public static final int TTS_FLAG_SILENT = 0x01;
/**
* TTS 播报标志:普通通知
*/
public static final int TTS_FLAG_NORMAL = 0x08;
/**
* TTS 播报标志:紧急通知
*/
public static final int TTS_FLAG_URGENT = 0x09;
/**
* 设备ID
*/
private Long deviceId;
/**
* 播报文本
*/
private String text;
/**
* 播报标志0x01=静默, 0x08=普通, 0x09=紧急)
*/
private Integer ttsFlag;
/**
* 工单ID可选用于日志记录
*/
private Long orderId;
/**
* 消息创建时间戳(毫秒)
*/
private Long createTime;
/**
* 消息优先级数字越小优先级越高0-9
* 默认为 5紧急消息可设为 0-2
*/
private Integer priority;
/**
* 重试次数
*/
private Integer retryCount;
/**
* 最大重试次数
*/
private Integer maxRetry;
/**
* 创建普通消息
*/
public static TtsQueueMessage normal(Long deviceId, String text) {
return TtsQueueMessage.builder()
.deviceId(deviceId)
.text(text)
.ttsFlag(TTS_FLAG_NORMAL)
.priority(5)
.createTime(System.currentTimeMillis())
.retryCount(0)
.maxRetry(2)
.build();
}
/**
* 创建紧急消息
*/
public static TtsQueueMessage urgent(Long deviceId, String text) {
return TtsQueueMessage.builder()
.deviceId(deviceId)
.text(text)
.ttsFlag(TTS_FLAG_URGENT)
.priority(1)
.createTime(System.currentTimeMillis())
.retryCount(0)
.maxRetry(3)
.build();
}
/**
* 创建带工单ID的消息
*/
public static TtsQueueMessage withOrder(Long deviceId, String text, int ttsFlag, Long orderId) {
return TtsQueueMessage.builder()
.deviceId(deviceId)
.text(text)
.ttsFlag(ttsFlag)
.orderId(orderId)
.priority(ttsFlag == TTS_FLAG_URGENT ? 1 : 5)
.createTime(System.currentTimeMillis())
.retryCount(0)
.maxRetry(ttsFlag == TTS_FLAG_URGENT ? 3 : 2)
.build();
}
/**
* 增加重试次数
*/
public void incrementRetry() {
this.retryCount = (this.retryCount == null ? 0 : this.retryCount) + 1;
}
/**
* 检查是否可以重试
*/
public boolean canRetry() {
return retryCount == null || retryCount < maxRetry;
}
/**
* 检查消息是否过期超过30秒认为过期
*/
public boolean isExpired() {
if (createTime == null) {
return false;
}
return System.currentTimeMillis() - createTime > 30_000;
}
}