chore: 【ops】语音播报去重简易实现

This commit is contained in:
lzh
2026-01-09 17:42:46 +08:00
parent 6be3e5b0e5
commit cd8d38149c

View File

@@ -0,0 +1,228 @@
package com.viewsh.module.ops.environment.service.voice;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* 语音播报去重服务
*
* 功能:
* 1. 短时间内多个工单入队,合并播报
* 2. 使用计数器累积播报请求
* 3. 定时任务定期合并播报
*
* @author lzh
*/
@Slf4j
@Service
public class VoiceBroadcastDeduplicationService {
@Resource
private RedisTemplate<String, Object> redisTemplate;
/**
* 本地缓存存储待合并的播报请求保洁员ID -> 新增工单数量)
* Key: cleanerId
* Value: 新增工单数量
*/
private final Map<Long, Integer> pendingBroadcasts = new ConcurrentHashMap<>();
/**
* 本地缓存存储最后播报时间保洁员ID -> 最后播报时间)
*/
private final Map<Long, LocalDateTime> lastBroadcastTime = new ConcurrentHashMap<>();
/**
* 播报合并时间窗口(秒)
* 在此时间窗口内的多个播报请求将被合并
*/
private static final int BROADCAST_WINDOW_SECONDS = 5;
/**
* Redis Key 前缀
*/
private static final String REDIS_KEY_PREFIX = "ops:voice:pending:";
private static final String REDIS_LAST_BROADCAST_PREFIX = "ops:voice:last:";
/**
* 记录播报请求(不立即播报,而是累积)
*
* @param cleanerId 保洁员ID
* @param orderCount 新增工单数量通常为1
*/
public void recordBroadcastRequest(Long cleanerId, int orderCount) {
// 1. 累积到本地缓存
pendingBroadcasts.merge(cleanerId, orderCount, Integer::sum);
// 2. 同步到 Redis支持分布式环境
String redisKey = REDIS_KEY_PREFIX + cleanerId;
redisTemplate.opsForValue().increment(redisKey, orderCount);
redisTemplate.expire(redisKey, BROADCAST_WINDOW_SECONDS + 2, TimeUnit.SECONDS);
log.debug("记录播报请求: cleanerId={}, orderCount={}, totalPending={}",
cleanerId, orderCount, pendingBroadcasts.getOrDefault(cleanerId, 0));
}
/**
* 记录播报请求并立即播报
* 如果距离上次播报时间过短,则合并播报
*
* @param cleanerId 保洁员ID
* @param orderCount 新增工单数量
* @param immediate 是否立即播报
*/
public void recordAndBroadcast(Long cleanerId, int orderCount, boolean immediate) {
recordBroadcastRequest(cleanerId, orderCount);
if (immediate) {
// 检查是否应该立即播报(距离上次播报超过时间窗口)
LocalDateTime lastTime = lastBroadcastTime.get(cleanerId);
if (lastTime == null ||
lastTime.plusSeconds(BROADCAST_WINDOW_SECONDS).isBefore(LocalDateTime.now())) {
// 立即播报
flushBroadcast(cleanerId);
}
// 否则等待定时任务合并播报
}
}
/**
* 立即播报(合并累积的请求)
*
* @param cleanerId 保洁员ID
*/
public void flushBroadcast(Long cleanerId) {
// 1. 获取累积的工单数量
Integer totalOrders = pendingBroadcasts.get(cleanerId);
if (totalOrders == null || totalOrders == 0) {
// 从 Redis 获取
String redisKey = REDIS_KEY_PREFIX + cleanerId;
Object value = redisTemplate.opsForValue().get(redisKey);
if (value instanceof Integer) {
totalOrders = (Integer) value;
} else {
return; // 没有待播报的请求
}
}
if (totalOrders <= 0) {
return;
}
// 2. 计算总待办数量(当前队列中的工单数)
// TODO: 调用 OrderQueueService 获取队列中的工单数量
// int totalQueueCount = orderQueueService.countByUserId(cleanerId);
int totalQueueCount = totalOrders; // 临时使用 totalOrders
// 3. 生成播报内容
String message = String.format("新增%d项待办您共有%d个待办工单",
totalOrders, totalQueueCount);
// 4. 调用 IoT 设备服务播报
// TODO: iotDeviceService.playVoice(cleanerId, message);
// 5. 清除累积的请求
pendingBroadcasts.remove(cleanerId);
String redisKey = REDIS_KEY_PREFIX + cleanerId;
redisTemplate.delete(redisKey);
// 6. 更新最后播报时间
lastBroadcastTime.put(cleanerId, LocalDateTime.now());
String lastBroadcastKey = REDIS_LAST_BROADCAST_PREFIX + cleanerId;
redisTemplate.opsForValue().set(lastBroadcastKey,
LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME),
1, TimeUnit.HOURS);
log.info("语音播报已发送: cleanerId={}, newOrders={}, totalQueue={}, message={}",
cleanerId, totalOrders, totalQueueCount, message);
}
/**
* 定时任务定期合并播报每5秒执行一次
* 将累积的播报请求合并后发送
*/
@Scheduled(fixedDelay = BROADCAST_WINDOW_SECONDS * 1000)
public void scheduledFlush() {
if (pendingBroadcasts.isEmpty()) {
return;
}
log.debug("定时合并播报开始: pendingCount={}", pendingBroadcasts.size());
// 复制一份,避免并发修改
Map<Long, Integer> toFlush = new HashMap<>(pendingBroadcasts);
for (Map.Entry<Long, Integer> entry : toFlush.entrySet()) {
Long cleanerId = entry.getKey();
try {
flushBroadcast(cleanerId);
} catch (Exception e) {
log.error("定时播报失败: cleanerId={}", cleanerId, e);
}
}
log.debug("定时合并播报完成");
}
/**
* 清理过期的播报记录(每小时执行一次)
*/
@Scheduled(fixedDelay = 3600 * 1000)
public void cleanupExpiredRecords() {
LocalDateTime now = LocalDateTime.now();
lastBroadcastTime.entrySet().removeIf(entry ->
entry.getValue().plusHours(2).isBefore(now));
log.debug("清理过期播报记录完成");
}
/**
* 获取待播报的工单数量
*
* @param cleanerId 保洁员ID
* @return 待播报的工单数量
*/
public int getPendingOrders(Long cleanerId) {
Integer count = pendingBroadcasts.getOrDefault(cleanerId, 0);
if (count == 0) {
String redisKey = REDIS_KEY_PREFIX + cleanerId;
Object value = redisTemplate.opsForValue().get(redisKey);
if (value instanceof Integer) {
count = (Integer) value;
}
}
return count;
}
/**
* 检查是否可以立即播报
*
* @param cleanerId 保洁员ID
* @return true 如果距离上次播报超过时间窗口
*/
public boolean canBroadcastImmediately(Long cleanerId) {
LocalDateTime lastTime = lastBroadcastTime.get(cleanerId);
if (lastTime == null) {
// 从 Redis 获取
String lastBroadcastKey = REDIS_LAST_BROADCAST_PREFIX + cleanerId;
Object value = redisTemplate.opsForValue().get(lastBroadcastKey);
if (value instanceof String) {
lastTime = LocalDateTime.parse((String) value, DateTimeFormatter.ISO_LOCAL_DATE_TIME);
lastBroadcastTime.put(cleanerId, lastTime);
} else {
return true; // 没有播报记录,可以立即播报
}
}
return lastTime.plusSeconds(BROADCAST_WINDOW_SECONDS).isBefore(LocalDateTime.now());
}
}