From cd8d38149c00035d961687e90fe97db30a35b203 Mon Sep 17 00:00:00 2001 From: lzh Date: Fri, 9 Jan 2026 17:42:46 +0800 Subject: [PATCH] =?UTF-8?q?chore:=20=E3=80=90ops=E3=80=91=E8=AF=AD?= =?UTF-8?q?=E9=9F=B3=E6=92=AD=E6=8A=A5=E5=8E=BB=E9=87=8D=E7=AE=80=E6=98=93?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../VoiceBroadcastDeduplicationService.java | 228 ++++++++++++++++++ 1 file changed, 228 insertions(+) create mode 100644 viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/VoiceBroadcastDeduplicationService.java diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/VoiceBroadcastDeduplicationService.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/VoiceBroadcastDeduplicationService.java new file mode 100644 index 0000000..1636dff --- /dev/null +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/voice/VoiceBroadcastDeduplicationService.java @@ -0,0 +1,228 @@ +package com.viewsh.module.ops.environment.service.voice; + +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +/** + * 语音播报去重服务 + * + * 功能: + * 1. 短时间内多个工单入队,合并播报 + * 2. 使用计数器累积播报请求 + * 3. 定时任务定期合并播报 + * + * @author lzh + */ +@Slf4j +@Service +public class VoiceBroadcastDeduplicationService { + + @Resource + private RedisTemplate redisTemplate; + + /** + * 本地缓存:存储待合并的播报请求(保洁员ID -> 新增工单数量) + * Key: cleanerId + * Value: 新增工单数量 + */ + private final Map pendingBroadcasts = new ConcurrentHashMap<>(); + + /** + * 本地缓存:存储最后播报时间(保洁员ID -> 最后播报时间) + */ + private final Map lastBroadcastTime = new ConcurrentHashMap<>(); + + /** + * 播报合并时间窗口(秒) + * 在此时间窗口内的多个播报请求将被合并 + */ + private static final int BROADCAST_WINDOW_SECONDS = 5; + + /** + * Redis Key 前缀 + */ + private static final String REDIS_KEY_PREFIX = "ops:voice:pending:"; + private static final String REDIS_LAST_BROADCAST_PREFIX = "ops:voice:last:"; + + /** + * 记录播报请求(不立即播报,而是累积) + * + * @param cleanerId 保洁员ID + * @param orderCount 新增工单数量(通常为1) + */ + public void recordBroadcastRequest(Long cleanerId, int orderCount) { + // 1. 累积到本地缓存 + pendingBroadcasts.merge(cleanerId, orderCount, Integer::sum); + + // 2. 同步到 Redis(支持分布式环境) + String redisKey = REDIS_KEY_PREFIX + cleanerId; + redisTemplate.opsForValue().increment(redisKey, orderCount); + redisTemplate.expire(redisKey, BROADCAST_WINDOW_SECONDS + 2, TimeUnit.SECONDS); + + log.debug("记录播报请求: cleanerId={}, orderCount={}, totalPending={}", + cleanerId, orderCount, pendingBroadcasts.getOrDefault(cleanerId, 0)); + } + + /** + * 记录播报请求并立即播报 + * 如果距离上次播报时间过短,则合并播报 + * + * @param cleanerId 保洁员ID + * @param orderCount 新增工单数量 + * @param immediate 是否立即播报 + */ + public void recordAndBroadcast(Long cleanerId, int orderCount, boolean immediate) { + recordBroadcastRequest(cleanerId, orderCount); + + if (immediate) { + // 检查是否应该立即播报(距离上次播报超过时间窗口) + LocalDateTime lastTime = lastBroadcastTime.get(cleanerId); + if (lastTime == null || + lastTime.plusSeconds(BROADCAST_WINDOW_SECONDS).isBefore(LocalDateTime.now())) { + // 立即播报 + flushBroadcast(cleanerId); + } + // 否则等待定时任务合并播报 + } + } + + /** + * 立即播报(合并累积的请求) + * + * @param cleanerId 保洁员ID + */ + public void flushBroadcast(Long cleanerId) { + // 1. 获取累积的工单数量 + Integer totalOrders = pendingBroadcasts.get(cleanerId); + if (totalOrders == null || totalOrders == 0) { + // 从 Redis 获取 + String redisKey = REDIS_KEY_PREFIX + cleanerId; + Object value = redisTemplate.opsForValue().get(redisKey); + if (value instanceof Integer) { + totalOrders = (Integer) value; + } else { + return; // 没有待播报的请求 + } + } + + if (totalOrders <= 0) { + return; + } + + // 2. 计算总待办数量(当前队列中的工单数) + // TODO: 调用 OrderQueueService 获取队列中的工单数量 + // int totalQueueCount = orderQueueService.countByUserId(cleanerId); + int totalQueueCount = totalOrders; // 临时使用 totalOrders + + // 3. 生成播报内容 + String message = String.format("新增%d项待办,您共有%d个待办工单", + totalOrders, totalQueueCount); + + // 4. 调用 IoT 设备服务播报 + // TODO: iotDeviceService.playVoice(cleanerId, message); + + // 5. 清除累积的请求 + pendingBroadcasts.remove(cleanerId); + String redisKey = REDIS_KEY_PREFIX + cleanerId; + redisTemplate.delete(redisKey); + + // 6. 更新最后播报时间 + lastBroadcastTime.put(cleanerId, LocalDateTime.now()); + String lastBroadcastKey = REDIS_LAST_BROADCAST_PREFIX + cleanerId; + redisTemplate.opsForValue().set(lastBroadcastKey, + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME), + 1, TimeUnit.HOURS); + + log.info("语音播报已发送: cleanerId={}, newOrders={}, totalQueue={}, message={}", + cleanerId, totalOrders, totalQueueCount, message); + } + + /** + * 定时任务:定期合并播报(每5秒执行一次) + * 将累积的播报请求合并后发送 + */ + @Scheduled(fixedDelay = BROADCAST_WINDOW_SECONDS * 1000) + public void scheduledFlush() { + if (pendingBroadcasts.isEmpty()) { + return; + } + + log.debug("定时合并播报开始: pendingCount={}", pendingBroadcasts.size()); + + // 复制一份,避免并发修改 + Map toFlush = new HashMap<>(pendingBroadcasts); + + for (Map.Entry entry : toFlush.entrySet()) { + Long cleanerId = entry.getKey(); + try { + flushBroadcast(cleanerId); + } catch (Exception e) { + log.error("定时播报失败: cleanerId={}", cleanerId, e); + } + } + + log.debug("定时合并播报完成"); + } + + /** + * 清理过期的播报记录(每小时执行一次) + */ + @Scheduled(fixedDelay = 3600 * 1000) + public void cleanupExpiredRecords() { + LocalDateTime now = LocalDateTime.now(); + lastBroadcastTime.entrySet().removeIf(entry -> + entry.getValue().plusHours(2).isBefore(now)); + + log.debug("清理过期播报记录完成"); + } + + /** + * 获取待播报的工单数量 + * + * @param cleanerId 保洁员ID + * @return 待播报的工单数量 + */ + public int getPendingOrders(Long cleanerId) { + Integer count = pendingBroadcasts.getOrDefault(cleanerId, 0); + if (count == 0) { + String redisKey = REDIS_KEY_PREFIX + cleanerId; + Object value = redisTemplate.opsForValue().get(redisKey); + if (value instanceof Integer) { + count = (Integer) value; + } + } + return count; + } + + /** + * 检查是否可以立即播报 + * + * @param cleanerId 保洁员ID + * @return true 如果距离上次播报超过时间窗口 + */ + public boolean canBroadcastImmediately(Long cleanerId) { + LocalDateTime lastTime = lastBroadcastTime.get(cleanerId); + if (lastTime == null) { + // 从 Redis 获取 + String lastBroadcastKey = REDIS_LAST_BROADCAST_PREFIX + cleanerId; + Object value = redisTemplate.opsForValue().get(lastBroadcastKey); + if (value instanceof String) { + lastTime = LocalDateTime.parse((String) value, DateTimeFormatter.ISO_LOCAL_DATE_TIME); + lastBroadcastTime.put(cleanerId, lastTime); + } else { + return true; // 没有播报记录,可以立即播报 + } + } + + return lastTime.plusSeconds(BROADCAST_WINDOW_SECONDS).isBefore(LocalDateTime.now()); + } +}