refactor(ops): 重构语音播报服务为统一入口
- 删除 VoiceBroadcastDeduplicationService(去重服务) - 新增 VoiceBroadcastService 作为 TTS 统一入口 - broadcast(deviceId, text): 同步播报 - broadcast(deviceId, text, volume): 带音量播报 - broadcastAsync(): 异步播报 - 简化设计:接受 deviceId 参数,不实现复杂去重逻辑 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -1,228 +0,0 @@
|
||||
package com.viewsh.module.ops.environment.service.voice;
|
||||
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 语音播报去重服务
|
||||
*
|
||||
* 功能:
|
||||
* 1. 短时间内多个工单入队,合并播报
|
||||
* 2. 使用计数器累积播报请求
|
||||
* 3. 定时任务定期合并播报
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class VoiceBroadcastDeduplicationService {
|
||||
|
||||
@Resource
|
||||
private RedisTemplate<String, Object> redisTemplate;
|
||||
|
||||
/**
|
||||
* 本地缓存:存储待合并的播报请求(保洁员ID -> 新增工单数量)
|
||||
* Key: cleanerId
|
||||
* Value: 新增工单数量
|
||||
*/
|
||||
private final Map<Long, Integer> pendingBroadcasts = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 本地缓存:存储最后播报时间(保洁员ID -> 最后播报时间)
|
||||
*/
|
||||
private final Map<Long, LocalDateTime> lastBroadcastTime = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 播报合并时间窗口(秒)
|
||||
* 在此时间窗口内的多个播报请求将被合并
|
||||
*/
|
||||
private static final int BROADCAST_WINDOW_SECONDS = 5;
|
||||
|
||||
/**
|
||||
* Redis Key 前缀
|
||||
*/
|
||||
private static final String REDIS_KEY_PREFIX = "ops:voice:pending:";
|
||||
private static final String REDIS_LAST_BROADCAST_PREFIX = "ops:voice:last:";
|
||||
|
||||
/**
|
||||
* 记录播报请求(不立即播报,而是累积)
|
||||
*
|
||||
* @param cleanerId 保洁员ID
|
||||
* @param orderCount 新增工单数量(通常为1)
|
||||
*/
|
||||
public void recordBroadcastRequest(Long cleanerId, int orderCount) {
|
||||
// 1. 累积到本地缓存
|
||||
pendingBroadcasts.merge(cleanerId, orderCount, Integer::sum);
|
||||
|
||||
// 2. 同步到 Redis(支持分布式环境)
|
||||
String redisKey = REDIS_KEY_PREFIX + cleanerId;
|
||||
redisTemplate.opsForValue().increment(redisKey, orderCount);
|
||||
redisTemplate.expire(redisKey, BROADCAST_WINDOW_SECONDS + 2, TimeUnit.SECONDS);
|
||||
|
||||
log.debug("记录播报请求: cleanerId={}, orderCount={}, totalPending={}",
|
||||
cleanerId, orderCount, pendingBroadcasts.getOrDefault(cleanerId, 0));
|
||||
}
|
||||
|
||||
/**
|
||||
* 记录播报请求并立即播报
|
||||
* 如果距离上次播报时间过短,则合并播报
|
||||
*
|
||||
* @param cleanerId 保洁员ID
|
||||
* @param orderCount 新增工单数量
|
||||
* @param immediate 是否立即播报
|
||||
*/
|
||||
public void recordAndBroadcast(Long cleanerId, int orderCount, boolean immediate) {
|
||||
recordBroadcastRequest(cleanerId, orderCount);
|
||||
|
||||
if (immediate) {
|
||||
// 检查是否应该立即播报(距离上次播报超过时间窗口)
|
||||
LocalDateTime lastTime = lastBroadcastTime.get(cleanerId);
|
||||
if (lastTime == null ||
|
||||
lastTime.plusSeconds(BROADCAST_WINDOW_SECONDS).isBefore(LocalDateTime.now())) {
|
||||
// 立即播报
|
||||
flushBroadcast(cleanerId);
|
||||
}
|
||||
// 否则等待定时任务合并播报
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 立即播报(合并累积的请求)
|
||||
*
|
||||
* @param cleanerId 保洁员ID
|
||||
*/
|
||||
public void flushBroadcast(Long cleanerId) {
|
||||
// 1. 获取累积的工单数量
|
||||
Integer totalOrders = pendingBroadcasts.get(cleanerId);
|
||||
if (totalOrders == null || totalOrders == 0) {
|
||||
// 从 Redis 获取
|
||||
String redisKey = REDIS_KEY_PREFIX + cleanerId;
|
||||
Object value = redisTemplate.opsForValue().get(redisKey);
|
||||
if (value instanceof Integer) {
|
||||
totalOrders = (Integer) value;
|
||||
} else {
|
||||
return; // 没有待播报的请求
|
||||
}
|
||||
}
|
||||
|
||||
if (totalOrders <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 计算总待办数量(当前队列中的工单数)
|
||||
// TODO: 调用 OrderQueueService 获取队列中的工单数量
|
||||
// int totalQueueCount = orderQueueService.countByUserId(cleanerId);
|
||||
int totalQueueCount = totalOrders; // 临时使用 totalOrders
|
||||
|
||||
// 3. 生成播报内容
|
||||
String message = String.format("新增%d项待办,您共有%d个待办工单",
|
||||
totalOrders, totalQueueCount);
|
||||
|
||||
// 4. 调用 IoT 设备服务播报
|
||||
// TODO: iotDeviceService.playVoice(cleanerId, message);
|
||||
|
||||
// 5. 清除累积的请求
|
||||
pendingBroadcasts.remove(cleanerId);
|
||||
String redisKey = REDIS_KEY_PREFIX + cleanerId;
|
||||
redisTemplate.delete(redisKey);
|
||||
|
||||
// 6. 更新最后播报时间
|
||||
lastBroadcastTime.put(cleanerId, LocalDateTime.now());
|
||||
String lastBroadcastKey = REDIS_LAST_BROADCAST_PREFIX + cleanerId;
|
||||
redisTemplate.opsForValue().set(lastBroadcastKey,
|
||||
LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME),
|
||||
1, TimeUnit.HOURS);
|
||||
|
||||
log.info("语音播报已发送: cleanerId={}, newOrders={}, totalQueue={}, message={}",
|
||||
cleanerId, totalOrders, totalQueueCount, message);
|
||||
}
|
||||
|
||||
/**
|
||||
* 定时任务:定期合并播报(每5秒执行一次)
|
||||
* 将累积的播报请求合并后发送
|
||||
*/
|
||||
@Scheduled(fixedDelay = BROADCAST_WINDOW_SECONDS * 1000)
|
||||
public void scheduledFlush() {
|
||||
if (pendingBroadcasts.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
log.debug("定时合并播报开始: pendingCount={}", pendingBroadcasts.size());
|
||||
|
||||
// 复制一份,避免并发修改
|
||||
Map<Long, Integer> toFlush = new HashMap<>(pendingBroadcasts);
|
||||
|
||||
for (Map.Entry<Long, Integer> entry : toFlush.entrySet()) {
|
||||
Long cleanerId = entry.getKey();
|
||||
try {
|
||||
flushBroadcast(cleanerId);
|
||||
} catch (Exception e) {
|
||||
log.error("定时播报失败: cleanerId={}", cleanerId, e);
|
||||
}
|
||||
}
|
||||
|
||||
log.debug("定时合并播报完成");
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理过期的播报记录(每小时执行一次)
|
||||
*/
|
||||
@Scheduled(fixedDelay = 3600 * 1000)
|
||||
public void cleanupExpiredRecords() {
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
lastBroadcastTime.entrySet().removeIf(entry ->
|
||||
entry.getValue().plusHours(2).isBefore(now));
|
||||
|
||||
log.debug("清理过期播报记录完成");
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取待播报的工单数量
|
||||
*
|
||||
* @param cleanerId 保洁员ID
|
||||
* @return 待播报的工单数量
|
||||
*/
|
||||
public int getPendingOrders(Long cleanerId) {
|
||||
Integer count = pendingBroadcasts.getOrDefault(cleanerId, 0);
|
||||
if (count == 0) {
|
||||
String redisKey = REDIS_KEY_PREFIX + cleanerId;
|
||||
Object value = redisTemplate.opsForValue().get(redisKey);
|
||||
if (value instanceof Integer) {
|
||||
count = (Integer) value;
|
||||
}
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查是否可以立即播报
|
||||
*
|
||||
* @param cleanerId 保洁员ID
|
||||
* @return true 如果距离上次播报超过时间窗口
|
||||
*/
|
||||
public boolean canBroadcastImmediately(Long cleanerId) {
|
||||
LocalDateTime lastTime = lastBroadcastTime.get(cleanerId);
|
||||
if (lastTime == null) {
|
||||
// 从 Redis 获取
|
||||
String lastBroadcastKey = REDIS_LAST_BROADCAST_PREFIX + cleanerId;
|
||||
Object value = redisTemplate.opsForValue().get(lastBroadcastKey);
|
||||
if (value instanceof String) {
|
||||
lastTime = LocalDateTime.parse((String) value, DateTimeFormatter.ISO_LOCAL_DATE_TIME);
|
||||
lastBroadcastTime.put(cleanerId, lastTime);
|
||||
} else {
|
||||
return true; // 没有播报记录,可以立即播报
|
||||
}
|
||||
}
|
||||
|
||||
return lastTime.plusSeconds(BROADCAST_WINDOW_SECONDS).isBefore(LocalDateTime.now());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,94 @@
|
||||
package com.viewsh.module.ops.environment.service.voice;
|
||||
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import com.viewsh.module.iot.api.device.IotDeviceControlApi;
|
||||
import com.viewsh.module.iot.api.device.dto.IotDeviceServiceInvokeReqDTO;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* 语音播报服务(统一入口)
|
||||
* <p>
|
||||
* 职责:
|
||||
* 1. 统一所有 TTS 下发入口
|
||||
* 2. 提供同步/异步播报接口
|
||||
* 3. 管理播报音量参数
|
||||
* <p>
|
||||
* 设计原则:
|
||||
* - 接受 deviceId 参数(而非 cleanerId)
|
||||
* - 简单可靠,不实现复杂的去重逻辑,由调用方控制
|
||||
* - 直接调用 IoT 设备控制接口
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class VoiceBroadcastService {
|
||||
|
||||
@Resource
|
||||
private IotDeviceControlApi iotDeviceControlApi;
|
||||
|
||||
/**
|
||||
* 播报语音(同步)
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param text 播报文本
|
||||
*/
|
||||
public void broadcast(Long deviceId, String text) {
|
||||
broadcast(deviceId, text, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* 播报语音(带音量)
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param text 播报文本
|
||||
* @param volume 音量(0-100)
|
||||
*/
|
||||
public void broadcast(Long deviceId, String text, Integer volume) {
|
||||
if (deviceId == null || text == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
IotDeviceServiceInvokeReqDTO reqDTO = new IotDeviceServiceInvokeReqDTO();
|
||||
reqDTO.setDeviceId(deviceId);
|
||||
reqDTO.setIdentifier("tts");
|
||||
reqDTO.setParams(MapUtil.<String, Object>builder()
|
||||
.put("text", text)
|
||||
.put("playMode", 1) // 立即播报
|
||||
.put("volume", volume != null ? volume : 50)
|
||||
.build());
|
||||
|
||||
iotDeviceControlApi.invokeService(reqDTO);
|
||||
log.debug("[VoiceBroadcast] 播报成功: deviceId={}, text={}", deviceId, text);
|
||||
} catch (Exception e) {
|
||||
log.error("[VoiceBroadcast] 播报失败: deviceId={}, text={}", deviceId, text, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 播报语音(异步)
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param text 播报文本
|
||||
*/
|
||||
@Async("ops-task-executor")
|
||||
public void broadcastAsync(Long deviceId, String text) {
|
||||
broadcast(deviceId, text);
|
||||
}
|
||||
|
||||
/**
|
||||
* 播报语音(异步,带音量)
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param text 播报文本
|
||||
* @param volume 音量(0-100)
|
||||
*/
|
||||
@Async("ops-task-executor")
|
||||
public void broadcastAsync(Long deviceId, String text, Integer volume) {
|
||||
broadcast(deviceId, text, volume);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user