Compare commits

2 Commits

Author SHA1 Message Date
lzh
0775ead5ff feat(ops): 客流工单周期化,同区域复用活跃工单并逐级升级优先级
Some checks failed
Java CI with Maven / build (11) (push) Has been cancelled
Java CI with Maven / build (17) (push) Has been cancelled
Java CI with Maven / build (8) (push) Has been cancelled
解决同一区域客流持续达标时重复创建工单的问题。改为:无活跃工单时
创建新工单,有未派发工单(PENDING/QUEUED)时升级优先级一级,有已派发
工单时忽略,所有分支均重置阈值计数器。工单终态时清除活跃标记。

- 新增 TrafficActiveOrderRedisDAO 管理区域活跃工单 Redis 标记
- 新增 CleanOrderService.upgradeOneLevelPriority 逐级升级优先级
- 改造 CleanOrderCreateEventHandler 实现客流触发周期化分支逻辑
- 新增 OpsOrderMapper.selectActiveTrafficOrder 作为 DB 兜底查询

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 00:21:14 +08:00
lzh
4f737a5dd1 refactor(ops): 重构 TTS 语音播报队列,解耦 ttsFlag 与队列优先级
移除 Thread.sleep(5000) 阻塞,改由 TTS 队列按设备维度控制播报顺序和间隔:
- 解耦 ttsFlag(硬件行为 0x09)与 priority(队列位置),全部使用 0x09 发送
- TtsQueueMessage 新增 inOrder/urgent 工厂方法,VoiceBroadcastService 精简为
  broadcastInOrder(FIFO rightPush)和 broadcastUrgent(leftPush 插队)两个入口
- 同设备播报间隔 3s → 6s,消息过期时间 30s → 60s
- 修复原 leftPush+leftPop LIFO 导致连续入队顺序反转的问题

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 00:19:24 +08:00
13 changed files with 598 additions and 248 deletions

View File

@@ -0,0 +1,35 @@
package com.viewsh.module.ops.environment.dal.redis;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 客流触发的活跃工单信息 DTO
* <p>
* 存储在 Redis Hash 中,用于快速判断区域是否有活跃的客流触发工单
*
* @author AI
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ActiveOrderInfo {
/**
* 工单ID
*/
private Long orderId;
/**
* 工单状态
*/
private String status;
/**
* 工单优先级
*/
private Integer priority;
}

View File

@@ -0,0 +1,106 @@
package com.viewsh.module.ops.environment.dal.redis;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Repository;
import java.util.Map;
/**
* 客流触发活跃工单 Redis DAO
* <p>
* 用于标记某区域是否有客流触发的活跃工单,避免重复创建。
* <p>
* Redis Key: ops:clean:traffic:active-order:{areaId}
* Value: Hash { orderId, status, priority }
* TTL: 无(由终态主动删除)
*
* @author AI
*/
@Slf4j
@Repository
public class TrafficActiveOrderRedisDAO {
private static final String KEY_PATTERN = "ops:clean:traffic:active-order:%s";
private static final String FIELD_ORDER_ID = "orderId";
private static final String FIELD_STATUS = "status";
private static final String FIELD_PRIORITY = "priority";
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 标记区域有活跃的客流触发工单
*/
public void markActive(Long areaId, Long orderId, String status, Integer priority) {
String key = buildKey(areaId);
stringRedisTemplate.opsForHash().putAll(key, Map.of(
FIELD_ORDER_ID, String.valueOf(orderId),
FIELD_STATUS, status,
FIELD_PRIORITY, String.valueOf(priority)
));
log.debug("[TrafficActiveOrderRedisDAO] 标记活跃工单: areaId={}, orderId={}, status={}, priority={}",
areaId, orderId, status, priority);
}
/**
* 获取区域的活跃工单信息
*
* @return 活跃工单信息,不存在返回 null
*/
public ActiveOrderInfo getActive(Long areaId) {
String key = buildKey(areaId);
Map<Object, Object> entries = stringRedisTemplate.opsForHash().entries(key);
if (entries == null || entries.isEmpty()) {
return null;
}
String orderIdStr = (String) entries.get(FIELD_ORDER_ID);
if (orderIdStr == null) {
return null;
}
return ActiveOrderInfo.builder()
.orderId(Long.parseLong(orderIdStr))
.status((String) entries.get(FIELD_STATUS))
.priority(Integer.parseInt((String) entries.get(FIELD_PRIORITY)))
.build();
}
/**
* 更新活跃工单的状态
*/
public void updateStatus(Long areaId, String newStatus) {
String key = buildKey(areaId);
if (Boolean.TRUE.equals(stringRedisTemplate.hasKey(key))) {
stringRedisTemplate.opsForHash().put(key, FIELD_STATUS, newStatus);
log.debug("[TrafficActiveOrderRedisDAO] 更新状态: areaId={}, newStatus={}", areaId, newStatus);
}
}
/**
* 更新活跃工单的优先级
*/
public void updatePriority(Long areaId, Integer newPriority) {
String key = buildKey(areaId);
if (Boolean.TRUE.equals(stringRedisTemplate.hasKey(key))) {
stringRedisTemplate.opsForHash().put(key, FIELD_PRIORITY, String.valueOf(newPriority));
log.debug("[TrafficActiveOrderRedisDAO] 更新优先级: areaId={}, newPriority={}", areaId, newPriority);
}
}
/**
* 移除区域的活跃工单标记(工单终态时调用)
*/
public void removeActive(Long areaId) {
String key = buildKey(areaId);
stringRedisTemplate.delete(key);
log.debug("[TrafficActiveOrderRedisDAO] 移除活跃标记: areaId={}", areaId);
}
private String buildKey(Long areaId) {
return String.format(KEY_PATTERN, areaId);
}
}

View File

@@ -199,7 +199,7 @@ public class CleanOrderAuditEventHandler implements RocketMQListener<String> {
*/
private void sendTts(Long deviceId, String text, Long orderId) {
try {
voiceBroadcastService.broadcast(deviceId, text, orderId);
voiceBroadcastService.broadcastInOrder(deviceId, text, orderId);
log.info("[CleanOrderAuditEventHandler] TTS 下发成功: deviceId={}, text={}", deviceId, text);
} catch (Exception e) {
log.error("[CleanOrderAuditEventHandler] TTS 下发异常: deviceId={}", deviceId, e);

View File

@@ -1,8 +1,15 @@
package com.viewsh.module.ops.environment.integration.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.viewsh.module.iot.api.device.IotDeviceControlApi;
import com.viewsh.module.iot.api.device.dto.ResetTrafficCounterReqDTO;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
import com.viewsh.module.ops.enums.PriorityEnum;
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
import com.viewsh.module.ops.environment.dal.dataobject.CleanOrderAutoCreateReqDTO;
import com.viewsh.module.ops.environment.dal.redis.ActiveOrderInfo;
import com.viewsh.module.ops.environment.dal.redis.TrafficActiveOrderRedisDAO;
import com.viewsh.module.ops.environment.integration.dto.CleanOrderCreateEventDTO;
import com.viewsh.module.ops.environment.service.cleanorder.CleanOrderService;
import com.viewsh.module.ops.infrastructure.log.enumeration.EventDomain;
@@ -19,12 +26,19 @@ import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* 保洁工单创建事件消费者
* <p>
* 订阅 IoT 模块发布的保洁工单创建事件
* 订阅 IoT 模块发布的保洁工单创建事件
* <p>
* 客流触发逻辑(周期化):
* 1. 无活跃工单 → 创建新工单 → 标记活跃 → 重置阈值
* 2. 有未派发工单(PENDING/QUEUED) → 升级优先级一级 → 重置阈值
* 3. 有已派发工单(DISPATCHED/CONFIRMED/ARRIVED) → 忽略 → 重置阈值
* 4. 已是 P0 → 不升级,记录审计日志 → 重置阈值
* <p>
* RocketMQ 配置:
* - Topic: ops-order-create
@@ -47,6 +61,14 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
*/
private static final int DEDUP_TTL_SECONDS = 300;
/**
* 未派发状态集合(可升级优先级)
*/
private static final Set<String> UNDISPATCHED_STATUSES = Set.of(
WorkOrderStatusEnum.PENDING.getStatus(),
WorkOrderStatusEnum.QUEUED.getStatus()
);
@Resource
private ObjectMapper objectMapper;
@@ -59,6 +81,15 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
@Resource
private EventLogRecorder eventLogRecorder;
@Resource
private TrafficActiveOrderRedisDAO trafficActiveOrderRedisDAO;
@Resource
private IotDeviceControlApi iotDeviceControlApi;
@Resource
private OpsOrderMapper opsOrderMapper;
@Override
public void onMessage(String message) {
try {
@@ -70,7 +101,7 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
Boolean firstTime = stringRedisTemplate.opsForValue()
.setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS);
if (!firstTime) {
if (!Boolean.TRUE.equals(firstTime)) {
log.debug("[CleanOrderCreateEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId());
return;
}
@@ -91,10 +122,152 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
log.info("[CleanOrderCreateEventHandler] 收到工单创建事件: eventId={}, areaId={}, triggerSource={}",
event.getEventId(), event.getAreaId(), event.getTriggerSource());
// 1. 构建创建请求(使用 CleanOrderAutoCreateReqDTO
// 客流触发 → 周期化逻辑
if ("IOT_TRAFFIC".equals(event.getTriggerSource())) {
handleTrafficTrigger(event);
return;
}
// 非客流触发 → 原逻辑
handleNonTrafficTrigger(event);
}
/**
* 处理客流触发的工单创建(周期化逻辑)
*/
private void handleTrafficTrigger(CleanOrderCreateEventDTO event) {
Long areaId = event.getAreaId();
// 1. 查询活跃工单Redis 优先DB 兜底)
ActiveOrderInfo activeOrder = trafficActiveOrderRedisDAO.getActive(areaId);
if (activeOrder == null) {
activeOrder = queryActiveTrafficOrderFromDb(areaId);
if (activeOrder != null) {
// 恢复 Redis 标记
trafficActiveOrderRedisDAO.markActive(areaId, activeOrder.getOrderId(),
activeOrder.getStatus(), activeOrder.getPriority());
log.info("[CleanOrderCreateEventHandler] Redis标记丢失已恢复: areaId={}, orderId={}",
areaId, activeOrder.getOrderId());
}
}
// 2. 有活跃工单 → 升级或忽略
if (activeOrder != null) {
String status = activeOrder.getStatus();
if (UNDISPATCHED_STATUSES.contains(status)) {
// 未派发 → 升级优先级一级
PriorityEnum result = cleanOrderService.upgradeOneLevelPriority(
activeOrder.getOrderId(), "客流持续达标自动升级");
if (result != null) {
trafficActiveOrderRedisDAO.updatePriority(areaId, result.getPriority());
recordUpgradeLog(event, activeOrder.getOrderId(), result);
log.info("[CleanOrderCreateEventHandler] 工单优先级已升级: areaId={}, orderId={}, newPriority={}",
areaId, activeOrder.getOrderId(), result);
} else {
// 已是 P0记录审计日志
recordP0CeilingLog(event, activeOrder.getOrderId());
log.info("[CleanOrderCreateEventHandler] 工单已是P0封顶不再升级: areaId={}, orderId={}",
areaId, activeOrder.getOrderId());
}
} else {
// 已派发DISPATCHED/CONFIRMED/ARRIVED→ 忽略
log.info("[CleanOrderCreateEventHandler] 区域{}已有已派发工单{}(状态:{}),忽略本次客流触发",
areaId, activeOrder.getOrderId(), status);
}
// ★ 所有分支都重置阈值
resetTrafficCounter(event);
return;
}
// 3. 无活跃工单 → 创建新工单
CleanOrderAutoCreateReqDTO createReq = buildCreateRequest(event);
Long orderId = cleanOrderService.createAutoCleanOrder(createReq);
// 标记 Redis 活跃工单
trafficActiveOrderRedisDAO.markActive(areaId, orderId,
WorkOrderStatusEnum.PENDING.getStatus(), createReq.getPriority());
// 重置阈值
resetTrafficCounter(event);
// 记录业务日志
recordOrderCreatedLog(event, orderId, createReq);
log.info("[CleanOrderCreateEventHandler] 客流工单创建成功: eventId={}, orderId={}, areaId={}",
event.getEventId(), orderId, areaId);
}
/**
* 处理非客流触发的工单创建(原逻辑不变)
*/
private void handleNonTrafficTrigger(CleanOrderCreateEventDTO event) {
CleanOrderAutoCreateReqDTO createReq = buildCreateRequest(event);
Long orderId = cleanOrderService.createAutoCleanOrder(createReq);
recordOrderCreatedLog(event, orderId, createReq);
log.info("[CleanOrderCreateEventHandler] 工单创建成功: eventId={}, orderId={}, areaId={}",
event.getEventId(), orderId, event.getAreaId());
}
// ==================== 阈值重置 ====================
/**
* 重置客流阈值计数器
*/
private void resetTrafficCounter(CleanOrderCreateEventDTO event) {
Long deviceId = event.getTriggerDeviceId();
if (deviceId == null) {
log.warn("[CleanOrderCreateEventHandler] 缺少设备ID跳过重置阈值: eventId={}", event.getEventId());
return;
}
try {
ResetTrafficCounterReqDTO reqDTO = ResetTrafficCounterReqDTO.builder()
.deviceId(deviceId)
.remark("客流达标后重置阈值计数器")
.build();
var result = iotDeviceControlApi.resetTrafficCounter(reqDTO);
if (result.getData() != null && result.getData()) {
log.info("[CleanOrderCreateEventHandler] 阈值计数器重置成功: deviceId={}", deviceId);
} else {
log.error("[CleanOrderCreateEventHandler] 阈值计数器重置失败: deviceId={}", deviceId);
}
} catch (Exception e) {
log.error("[CleanOrderCreateEventHandler] 阈值计数器重置异常: deviceId={}", deviceId, e);
}
}
// ==================== DB 兜底查询 ====================
/**
* 从 DB 查询区域内客流触发的活跃工单Redis 标记丢失时兜底)
*/
private ActiveOrderInfo queryActiveTrafficOrderFromDb(Long areaId) {
OpsOrderDO order = opsOrderMapper.selectActiveTrafficOrder(areaId);
if (order == null) {
return null;
}
return ActiveOrderInfo.builder()
.orderId(order.getId())
.status(order.getStatus())
.priority(order.getPriority())
.build();
}
// ==================== 构建请求 ====================
/**
* 构建工单创建请求
*/
private CleanOrderAutoCreateReqDTO buildCreateRequest(CleanOrderCreateEventDTO event) {
CleanOrderAutoCreateReqDTO createReq = new CleanOrderAutoCreateReqDTO();
createReq.setOrderType("CLEAN");
createReq.setSourceType("TRAFFIC"); // 系统触发
createReq.setSourceType("TRAFFIC");
createReq.setTitle(generateOrderTitle(event));
createReq.setDescription(generateOrderDescription(event));
createReq.setPriority(
@@ -113,28 +286,19 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
createReq.setTriggerDeviceKey(event.getTriggerDeviceKey());
createReq.setTriggerData(event.getTriggerData());
// 2. 创建工单(同时创建主表+扩展表)
Long orderId = cleanOrderService.createAutoCleanOrder(createReq);
// 3. 记录业务日志
recordOrderCreatedLog(event, orderId, createReq);
// 注意:客流计数器重置已移至 CleanOrderEventListener在事务提交后执行
log.info("[CleanOrderCreateEventHandler] 工单创建成功: eventId={}, orderId={}, areaId={}",
event.getEventId(), orderId, event.getAreaId());
return createReq;
}
// ==================== 业务日志 ====================
/**
* 记录工单创建业务日志
*/
private void recordOrderCreatedLog(CleanOrderCreateEventDTO event, Long orderId,
CleanOrderAutoCreateReqDTO createReq) {
try {
// 确定事件域
EventDomain domain = determineDomain(event.getTriggerSource());
// 构建扩展信息
Map<String, Object> extra = new HashMap<>();
extra.put("eventId", event.getEventId());
extra.put("triggerSource", event.getTriggerSource());
@@ -145,7 +309,6 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
extra.putAll(event.getTriggerData());
}
// 记录日志(合并为一次调用)
eventLogRecorder.record(EventLogRecord.builder()
.module("clean")
.domain(domain)
@@ -163,6 +326,62 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
}
}
/**
* 记录优先级升级日志
*/
private void recordUpgradeLog(CleanOrderCreateEventDTO event, Long orderId, PriorityEnum newPriority) {
try {
Map<String, Object> extra = new HashMap<>();
extra.put("eventId", event.getEventId());
extra.put("areaId", event.getAreaId());
extra.put("newPriority", newPriority.getPriority());
extra.put("reason", "客流持续达标自动升级");
eventLogRecorder.record(EventLogRecord.builder()
.module("clean")
.domain(EventDomain.TRAFFIC)
.eventType("PRIORITY_UPGRADE")
.message(String.format("客流持续达标,工单优先级升级至%s [区域:%d]",
newPriority.getDescription(), event.getAreaId()))
.targetId(orderId)
.targetType("order")
.deviceId(event.getTriggerDeviceId())
.level(EventLevel.WARN)
.payload(extra)
.build());
} catch (Exception e) {
log.warn("[CleanOrderCreateEventHandler] 记录升级日志失败: orderId={}", orderId, e);
}
}
/**
* 记录 P0 封顶审计日志
*/
private void recordP0CeilingLog(CleanOrderCreateEventDTO event, Long orderId) {
try {
Map<String, Object> extra = new HashMap<>();
extra.put("eventId", event.getEventId());
extra.put("areaId", event.getAreaId());
extra.put("reason", "已是P0最高优先级无法继续升级");
eventLogRecorder.record(EventLogRecord.builder()
.module("clean")
.domain(EventDomain.TRAFFIC)
.eventType("PRIORITY_CEILING")
.message(String.format("客流持续达标但工单已是P0封顶 [区域:%d]", event.getAreaId()))
.targetId(orderId)
.targetType("order")
.deviceId(event.getTriggerDeviceId())
.level(EventLevel.WARN)
.payload(extra)
.build());
} catch (Exception e) {
log.warn("[CleanOrderCreateEventHandler] 记录P0封顶日志失败: orderId={}", orderId, e);
}
}
/**
* 确定事件域
*/
@@ -192,9 +411,8 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
event.getTriggerDeviceKey(), event.getTriggerSource());
}
/**
* 生成工单标题
*/
// ==================== 工具方法 ====================
private String generateOrderTitle(CleanOrderCreateEventDTO event) {
if ("IOT_TRAFFIC".equals(event.getTriggerSource())) {
return "客流阈值触发保洁";
@@ -207,9 +425,6 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
}
}
/**
* 生成工单描述
*/
private String generateOrderDescription(CleanOrderCreateEventDTO event) {
StringBuilder desc = new StringBuilder();
desc.append("触发来源: ").append(event.getTriggerSource()).append("\n");
@@ -227,31 +442,23 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
return desc.toString();
}
/**
* 计算预计作业时长(分钟)
*/
private Integer calculateExpectedDuration(CleanOrderCreateEventDTO event) {
// 根据触发来源和客流数据估算时长
if ("IOT_TRAFFIC".equals(event.getTriggerSource()) && event.getTriggerData() != null) {
Object actualCountObj = event.getTriggerData().get("actualCount");
if (actualCountObj != null) {
Long actualCount = ((Number) actualCountObj).longValue();
// 客流越大,预计耗时越长
if (actualCount > 50) {
return 45; // 高客流
return 45;
} else if (actualCount > 20) {
return 30; // 中客流
return 30;
} else {
return 20; // 低客流
return 20;
}
}
}
return 30; // 默认30分钟
return 30;
}
/**
* 从事件数据中提取规则ID
*/
private Long extractRuleId(CleanOrderCreateEventDTO event) {
if (event.getTriggerData() != null && event.getTriggerData().containsKey("ruleId")) {
Object ruleIdObj = event.getTriggerData().get("ruleId");

View File

@@ -1,7 +1,5 @@
package com.viewsh.module.ops.environment.integration.listener;
import com.viewsh.module.iot.api.device.IotDeviceControlApi;
import com.viewsh.module.iot.api.device.dto.ResetTrafficCounterReqDTO;
import com.viewsh.module.ops.api.queue.OrderQueueService;
import com.viewsh.module.ops.core.dispatch.DispatchEngine;
import com.viewsh.module.ops.core.dispatch.model.DispatchResult;
@@ -15,6 +13,7 @@ import com.viewsh.module.ops.enums.PriorityEnum;
import com.viewsh.module.ops.environment.constants.CleanNotificationConstants;
import com.viewsh.module.ops.environment.dal.dataobject.workorder.OpsOrderCleanExtDO;
import com.viewsh.module.ops.environment.dal.mysql.workorder.OpsOrderCleanExtMapper;
import com.viewsh.module.ops.environment.dal.redis.TrafficActiveOrderRedisDAO;
import com.viewsh.module.ops.environment.service.cleanorder.CleanOrderService;
import com.viewsh.module.ops.environment.service.voice.VoiceBroadcastService;
import com.viewsh.module.ops.infrastructure.log.enumeration.EventDomain;
@@ -78,15 +77,15 @@ public class CleanOrderEventListener {
@Resource
private NotifyMessageSendApi notifyMessageSendApi;
@Resource
private IotDeviceControlApi iotDeviceControlApi;
@Resource
private EventLogRecorder eventLogRecorder;
@Resource
private OrderQueueService orderQueueService;
@Resource
private TrafficActiveOrderRedisDAO trafficActiveOrderRedisDAO;
// ==================== 工单创建事件 ====================
/**
@@ -108,10 +107,7 @@ public class CleanOrderEventListener {
// 异步触发调度
asyncDispatchAfterCreated(event);
// 如果是客流触发的工单,重置客流计数器
if ("IOT_TRAFFIC".equals(event.getPayload().get("triggerSource"))) {
asyncResetTrafficCounter(event);
}
// 注意:客流计数器重置已移至 CleanOrderCreateEventHandler在消息消费时统一处理
}
/**
@@ -152,50 +148,6 @@ public class CleanOrderEventListener {
}
}
/**
* 异步重置客流计数器
* <p>
* 在工单创建成功后重置阈值计数器,确保:
* 1. 工单创建和计数器重置在同一事务语义下AFTER_COMMIT
* 2. 如果工单创建失败,计数器不会被错误重置
* 3. 由 Ops 模块(业务方)决定重置时机,职责清晰
*/
@Async("ops-task-executor")
public void asyncResetTrafficCounter(OrderCreatedEvent event) {
try {
Long deviceId = (Long) event.getPayload().get("triggerDeviceId");
if (deviceId == null) {
log.warn("[CleanOrderEventListener] 缺少设备ID跳过重置: orderId={}",
event.getOrderId());
return;
}
// 构建重置请求
ResetTrafficCounterReqDTO reqDTO = ResetTrafficCounterReqDTO.builder()
.deviceId(deviceId)
.orderId(event.getOrderId())
.remark("工单创建成功后重置阈值计数器")
.build();
// 调用 IoT 模块 RPC 接口
var result = iotDeviceControlApi.resetTrafficCounter(reqDTO);
if (result.getData() != null && result.getData()) {
log.info("[CleanOrderEventListener] 阈值计数器重置成功: orderId={}, deviceId={}",
event.getOrderId(), deviceId);
} else {
log.error("[CleanOrderEventListener] 阈值计数器重置失败: orderId={}, deviceId={}",
event.getOrderId(), deviceId);
// TODO: 发送告警,需要人工介入检查
}
} catch (Exception e) {
log.error("[CleanOrderEventListener] 阈值计数器重置异常: orderId={}", event.getOrderId(), e);
// TODO: 发送告警
}
}
// ==================== 状态变更事件 ====================
/**
@@ -216,25 +168,31 @@ public class CleanOrderEventListener {
switch (event.getNewStatus()) {
case DISPATCHED:
handleDispatched(event);
updateTrafficActiveOrderStatus(event);
break;
case CONFIRMED:
handleConfirmed(event);
updateTrafficActiveOrderStatus(event);
break;
case ARRIVED:
handleArrived(event);
updateTrafficActiveOrderStatus(event);
break;
case PAUSED:
handlePaused(event);
break;
case COMPLETED:
handleCompleted(event);
clearTrafficActiveOrder(event);
break;
case CANCELLED:
// 设备状态由 BadgeDeviceStatusEventListener 统一处理
log.debug("[CleanOrderEventListener] CANCELLED 状态已处理: orderId={}", event.getOrderId());
clearTrafficActiveOrder(event);
break;
case QUEUED:
handleQueued(event);
updateTrafficActiveOrderStatus(event);
break;
default:
break;
@@ -437,7 +395,7 @@ public class CleanOrderEventListener {
Long deviceId = event.getAssigneeId();
if (deviceId != null) {
// 异步执行:发送完成通知,等待5秒后再派发下一个任务
// 异步执行:发送完成通知,然后派发下一个任务TTS队列控制播报间隔
asyncCompleteAndDispatchNext(event.getOrderId(), deviceId);
}
}
@@ -445,23 +403,16 @@ public class CleanOrderEventListener {
/**
* 异步执行工单完成后的通知和派单
* <p>
* 保证顺序:先播报"工单已完成"间隔5秒后再派发下一个任务触发"新工单来啦"
* 避免两条语音播报顺序混乱或被覆盖
* 先发送完成通知,再立即触发下一个任务派发。
* 语音播报顺序和间隔由 TTS 队列({@link com.viewsh.module.ops.environment.service.voice.TtsQueueConsumer}
* 按设备维度控制,同一设备前后播报间隔由 ops.tts.queue.interval-ms 配置。
*/
@Async("ops-task-executor")
public void asyncCompleteAndDispatchNext(Long orderId, Long deviceId) {
// 1. 发送完成通知
// 1. 发送完成通知入TTS队列
sendOrderCompletedNotification(orderId, deviceId);
// 2. 等待5秒确保完成语音播报完毕后再触发下一个任务的通知
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("[CleanOrderEventListener] 等待被中断: orderId={}", orderId);
}
// 3. 自动推送下一个任务
// 2. 自动推送下一个任务新任务通知也入TTS队列由队列控制播报间隔
cleanOrderService.autoDispatchNextOrder(orderId, deviceId);
}
@@ -552,9 +503,9 @@ public class CleanOrderEventListener {
try {
log.warn("[P0紧急通知] deviceId={}, orderCode={}", deviceId, orderCode);
// 1. 语音播报(使用统一模板构建器
// 1. 语音播报(P0紧急插队到队列头部
String voiceMessage = CleanNotificationConstants.VoiceBuilder.buildPriorityUpgrade(orderCode);
playVoice(deviceId, voiceMessage, orderId);
playVoiceUrgent(deviceId, voiceMessage, orderId);
// 2. 发送站内信
sendNotifyMessage(1L,
@@ -637,11 +588,14 @@ public class CleanOrderEventListener {
}
/**
* 语音播报带工单ID
* 语音播报带工单ID,按序入队 FIFO
* <p>
* 大多数业务通知使用此方法,保证同一设备上的播报按入队顺序播放。
* 仅 P0 紧急插队场景使用 {@link #playVoiceUrgent}。
*/
private void playVoice(Long deviceId, String message, Long orderId) {
try {
voiceBroadcastService.broadcast(deviceId, message, orderId);
voiceBroadcastService.broadcastInOrder(deviceId, message, orderId);
log.debug("[语音播报] 调用成功: deviceId={}, message={}", deviceId, message);
} catch (Exception e) {
@@ -649,6 +603,21 @@ public class CleanOrderEventListener {
}
}
/**
* 紧急语音播报(插队到队列头部)
* <p>
* 仅用于 P0 紧急任务打断等需要立即播报的场景
*/
private void playVoiceUrgent(Long deviceId, String message, Long orderId) {
try {
voiceBroadcastService.broadcastUrgent(deviceId, message, orderId);
log.debug("[语音播报-紧急] 调用成功: deviceId={}, message={}", deviceId, message);
} catch (Exception e) {
log.error("[语音播报-紧急] 调用失败: deviceId={}, message={}", deviceId, message, e);
}
}
// ==================== 站内信发送方法 ====================
/**
@@ -679,6 +648,39 @@ public class CleanOrderEventListener {
return "某区域";
}
/**
* 工单状态变更时,更新 Redis 中的活跃工单状态标记
* <p>
* 仅处理客流触发的工单
*/
private void updateTrafficActiveOrderStatus(OrderStateChangedEvent event) {
try {
OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId());
if (order != null && "IOT_TRAFFIC".equals(order.getTriggerSource()) && order.getAreaId() != null) {
trafficActiveOrderRedisDAO.updateStatus(order.getAreaId(), event.getNewStatus().getStatus());
}
} catch (Exception e) {
log.warn("[CleanOrderEventListener] 更新客流活跃工单状态失败: orderId={}", event.getOrderId(), e);
}
}
/**
* 工单终态时,清除 Redis 中的活跃工单标记
* <p>
* 仅处理客流触发的工单。清除后下次客流达标将创建新工单(新周期)。
*/
private void clearTrafficActiveOrder(OrderStateChangedEvent event) {
try {
OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId());
if (order != null && "IOT_TRAFFIC".equals(order.getTriggerSource()) && order.getAreaId() != null) {
trafficActiveOrderRedisDAO.removeActive(order.getAreaId());
log.info("[CleanOrderEventListener] 客流工单周期结束,已清除区域{}活跃标记", order.getAreaId());
}
} catch (Exception e) {
log.warn("[CleanOrderEventListener] 清除客流活跃工单标记失败: orderId={}", event.getOrderId(), e);
}
}
/**
* 记录暂停开始时间
*/

View File

@@ -1,5 +1,6 @@
package com.viewsh.module.ops.environment.service.cleanorder;
import com.viewsh.module.ops.enums.PriorityEnum;
import com.viewsh.module.ops.environment.dal.dataobject.CleanOrderAutoCreateReqDTO;
/**
@@ -133,6 +134,18 @@ public interface CleanOrderService {
*/
boolean upgradePriorityToP0(Long orderId, String reason);
/**
* 升级工单优先级一级(客流持续达标自动升级)
* <p>
* P3→P2, P2→P1, P1→P0。已是 P0 时不升级。
* 如果升级到 P0 且工单在队列中,会触发 P0 打断逻辑。
*
* @param orderId 工单ID
* @param reason 升级原因
* @return 升级后的优先级枚举,如果已是 P0 返回 null
*/
PriorityEnum upgradeOneLevelPriority(Long orderId, String reason);
// ========== 作业时长计算 ==========
/**

View File

@@ -256,6 +256,49 @@ public class CleanOrderServiceImpl implements CleanOrderService {
return true;
}
@Override
@Transactional(rollbackFor = Exception.class)
public PriorityEnum upgradeOneLevelPriority(Long orderId, String reason) {
log.info("升级工单优先级一级: orderId={}, reason={}", orderId, reason);
// 1. 查询工单
OpsOrderDO order = opsOrderMapper.selectById(orderId);
if (order == null) {
log.error("工单不存在: orderId={}", orderId);
return null;
}
// 2. 检查当前优先级
PriorityEnum currentPriority = PriorityEnum.fromPriority(order.getPriority());
if (currentPriority == PriorityEnum.P0) {
log.info("工单已经是P0优先级无法再升级: orderId={}", orderId);
return null;
}
// 3. 计算新优先级priority 数值越小越高)
PriorityEnum newPriority = PriorityEnum.fromPriority(currentPriority.getPriority() - 1);
log.info("工单优先级升级: orderId={}, {} -> {}", orderId, currentPriority, newPriority);
// 4. 更新工单优先级
order.setPriority(newPriority.getPriority());
opsOrderMapper.updateById(order);
// 5. 如果工单在队列中,同步队列分数
OrderQueueDTO queueDTO = orderQueueService.getByOpsOrderId(orderId);
if (queueDTO != null) {
orderQueueService.adjustPriority(queueDTO.getId(), newPriority, reason);
// 6. 如果升级到 P0触发紧急打断逻辑
if (newPriority == PriorityEnum.P0) {
DispatchResult result = dispatchEngine.urgentInterrupt(orderId, queueDTO.getUserId());
cleanOrderEventListener.sendPriorityUpgradeNotification(queueDTO.getUserId(), order.getOrderCode(), orderId);
log.warn("客流升级到P0触发紧急打断: orderId={}, success={}", orderId, result.isSuccess());
}
}
return newPriority;
}
// ==================== 保洁特有的状态转换 ====================
@Override

View File

@@ -49,7 +49,7 @@ public class TtsQueueConsumer {
@Value("${ops.tts.queue.enabled:true}")
private boolean queueEnabled;
@Value("${ops.tts.queue.interval-ms:3000}")
@Value("${ops.tts.queue.interval-ms:6000}")
private long broadcastIntervalMs;
@Value("${ops.tts.queue.max-queue-size:50}")

View File

@@ -37,6 +37,16 @@ public class TtsQueueMessage implements Serializable {
*/
public static final int TTS_FLAG_URGENT = 0x09;
/**
* 队列优先级紧急leftPush 插队到头部)
*/
public static final int PRIORITY_URGENT = 1;
/**
* 队列优先级普通rightPush 按序追加到尾部)
*/
public static final int PRIORITY_NORMAL = 5;
/**
* 设备ID
*/
@@ -79,14 +89,18 @@ public class TtsQueueMessage implements Serializable {
private Integer maxRetry;
/**
* 创建普通消息
* 创建按序消息FIFOrightPush 追加到尾部)
* <p>
* ttsFlag=0x09紧急通知带显示priority=5普通优先级
* 适用于大多数业务通知,保证同一设备播报按入队顺序播放
*/
public static TtsQueueMessage normal(Long deviceId, String text) {
public static TtsQueueMessage inOrder(Long deviceId, String text, Long orderId) {
return TtsQueueMessage.builder()
.deviceId(deviceId)
.text(text)
.ttsFlag(TTS_FLAG_NORMAL)
.priority(5)
.ttsFlag(TTS_FLAG_URGENT)
.orderId(orderId)
.priority(PRIORITY_NORMAL)
.createTime(System.currentTimeMillis())
.retryCount(0)
.maxRetry(2)
@@ -94,36 +108,24 @@ public class TtsQueueMessage implements Serializable {
}
/**
* 创建紧急消息
* 创建紧急消息leftPush 插队到队列头部)
* <p>
* ttsFlag=0x09紧急通知带显示priority=1紧急优先级
* 仅用于 P0 紧急任务打断等需要立即播报的场景
*/
public static TtsQueueMessage urgent(Long deviceId, String text) {
public static TtsQueueMessage urgent(Long deviceId, String text, Long orderId) {
return TtsQueueMessage.builder()
.deviceId(deviceId)
.text(text)
.ttsFlag(TTS_FLAG_URGENT)
.priority(1)
.orderId(orderId)
.priority(PRIORITY_URGENT)
.createTime(System.currentTimeMillis())
.retryCount(0)
.maxRetry(3)
.build();
}
/**
* 创建带工单ID的消息
*/
public static TtsQueueMessage withOrder(Long deviceId, String text, int ttsFlag, Long orderId) {
return TtsQueueMessage.builder()
.deviceId(deviceId)
.text(text)
.ttsFlag(ttsFlag)
.orderId(orderId)
.priority(ttsFlag == TTS_FLAG_URGENT ? 1 : 5)
.createTime(System.currentTimeMillis())
.retryCount(0)
.maxRetry(ttsFlag == TTS_FLAG_URGENT ? 3 : 2)
.build();
}
/**
* 增加重试次数
*/
@@ -139,12 +141,12 @@ public class TtsQueueMessage implements Serializable {
}
/**
* 检查消息是否过期(超过30秒认为过期
* 检查消息是否过期(超过60秒认为过期
*/
public boolean isExpired() {
if (createTime == null) {
return false;
}
return System.currentTimeMillis() - createTime > 30_000;
return System.currentTimeMillis() - createTime > 60_000;
}
}

View File

@@ -10,7 +10,6 @@ import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
@@ -18,20 +17,13 @@ import org.springframework.stereotype.Service;
* <p>
* 职责:
* 1. 统一所有 TTS 下发入口
* 2. 提供同步/异步播报接口
* 3. 通过队列控制播报间隔,防止连续播报被覆盖
* 4. 记录播报日志
* <p>
* 设计原则:
* - 接受 deviceId 参数(而非 cleanerId
* - 默认使用队列模式,确保播报不丢失
* - 支持直接播报模式(特殊场景)
* - 按设备分组,独立队列管理
* 2. 通过队列控制播报间隔,防止连续播报被覆盖
* 3. 记录播报日志
* <p>
* 队列机制:
* - 相同设备的播报请求进入 Redis 队列
* - 消费者按顺序取出,间隔 1-2 秒播报
* - 紧急消息可插队(优先级高
* - 消费者按顺序取出,按配置间隔播报
* - 紧急消息可插队(leftPush 到队列头部
* - 支持失败重试(最多 2-3 次)
* <p>
* JT808 TTS 播报标志 (tts_flag) 说明:
@@ -50,27 +42,6 @@ public class VoiceBroadcastService {
*/
private static final String TTS_IDENTIFIER = "TTS";
/**
* TTS 播报标志:静默执行
* <p>
* 设备收到后解析指令,修改参数,回复 0x0001但不发声
*/
public static final int TTS_FLAG_SILENT = 0x01;
/**
* TTS 播报标志:普通通知
* <p>
* 播放语音,设备将文本内容通过喇叭朗读出来
*/
public static final int TTS_FLAG_NORMAL = 0x08;
/**
* TTS 播报标志:紧急通知
* <p>
* 播放语音(通常带显示),用于重要通知
*/
public static final int TTS_FLAG_URGENT = 0x09;
@Resource
private IotDeviceControlApi iotDeviceControlApi;
@@ -86,85 +57,48 @@ public class VoiceBroadcastService {
@Value("${ops.tts.queue.enabled:true}")
private boolean queueEnabled;
// ==================== 队列模式播报(推荐) ====================
// ==================== 队列模式播报 ====================
/**
* 播报语音(紧急通知,默认,使用队列
* 按序播报FIFOrightPush 追加到队列尾部
* <p>
* 使用 tts_flag=0x09,适用于工单确认、到岗提醒等重要通知
* 使用 tts_flag=0x09(紧急通知,带显示),以普通优先级入队,
* 保证同一设备上的播报按入队顺序播放。适用于大多数业务通知。
*
* @param deviceId 设备ID
* @param text 播报文本
* @param orderId 工单ID可选用于日志记录
*/
public void broadcast(Long deviceId, String text) {
broadcast(deviceId, text, TTS_FLAG_URGENT, null);
}
/**
* 播报语音带工单ID使用队列
*/
public void broadcast(Long deviceId, String text, Long orderId) {
broadcast(deviceId, text, TTS_FLAG_URGENT, orderId);
}
/**
* 播报语音(指定播报类型,使用队列)
*
* @param deviceId 设备ID
* @param text 播报文本
* @param ttsFlag 播报标志0x01=静默, 0x08=普通, 0x09=紧急)
*/
public void broadcast(Long deviceId, String text, int ttsFlag) {
broadcast(deviceId, text, ttsFlag, null);
}
/**
* 播报语音指定播报类型和工单ID使用队列
* <p>
* 优先使用队列模式,确保播报不丢失
*
* @param deviceId 设备ID
* @param text 播报文本
* @param ttsFlag 播报标志
* @param orderId 工单ID可选
*/
public void broadcast(Long deviceId, String text, int ttsFlag, Long orderId) {
public void broadcastInOrder(Long deviceId, String text, Long orderId) {
if (deviceId == null || text == null) {
return;
}
// 优先使用队列模式
if (queueEnabled) {
broadcastWithQueue(deviceId, text, ttsFlag, orderId);
enqueueOrFallback(TtsQueueMessage.inOrder(deviceId, text, orderId));
} else {
broadcastDirect(deviceId, text, ttsFlag, orderId);
broadcastDirect(deviceId, text, TtsQueueMessage.TTS_FLAG_URGENT, orderId);
}
}
/**
* 播报语音(普通通知,使用队列
* 紧急播报leftPush 插队到队列头部
* <p>
* 使用 tts_flag=0x09紧急通知带显示以紧急优先级入队
* 仅用于 P0 紧急任务打断等需要立即播报的场景。
*
* @param deviceId 设备ID
* @param text 播报文本
* @param orderId 工单ID可选用于日志记录
*/
public void broadcastNormal(Long deviceId, String text) {
broadcast(deviceId, text, TTS_FLAG_NORMAL);
}
/**
* 播报语音(紧急通知,使用队列)
*/
public void broadcastUrgent(Long deviceId, String text) {
broadcast(deviceId, text, TTS_FLAG_URGENT, null);
}
public void broadcastUrgent(Long deviceId, String text, Long orderId) {
broadcast(deviceId, text, TTS_FLAG_URGENT, orderId);
}
/**
* 播报语音(异步,使用队列)
*/
@Async("ops-task-executor")
public void broadcastAsync(Long deviceId, String text) {
broadcast(deviceId, text);
if (deviceId == null || text == null) {
return;
}
if (queueEnabled) {
enqueueOrFallback(TtsQueueMessage.urgent(deviceId, text, orderId));
} else {
broadcastDirect(deviceId, text, TtsQueueMessage.TTS_FLAG_URGENT, orderId);
}
}
// ==================== 直接播报模式(特殊场景) ====================
@@ -206,38 +140,31 @@ public class VoiceBroadcastService {
}
}
// ==================== 队列播报(内部方法 ====================
// ==================== 内部方法 ====================
/**
* 通过队列播报语音
* 入队,失败时降级为直接播报
*/
private void broadcastWithQueue(Long deviceId, String text, int ttsFlag, Long orderId) {
private void enqueueOrFallback(TtsQueueMessage message) {
try {
TtsQueueMessage message = TtsQueueMessage.builder()
.deviceId(deviceId)
.text(text)
.ttsFlag(ttsFlag)
.orderId(orderId)
.priority(ttsFlag == TTS_FLAG_URGENT ? 1 : 5)
.createTime(System.currentTimeMillis())
.retryCount(0)
.maxRetry(ttsFlag == TTS_FLAG_URGENT ? 3 : 2)
.build();
boolean enqueued = ttsQueueConsumer.enqueue(message);
if (enqueued) {
log.debug("[VoiceBroadcast] 消息入队: deviceId={}, text={}, flag=0x{}, priority={}",
deviceId, text, Integer.toHexString(ttsFlag), message.getPriority());
message.getDeviceId(), message.getText(),
Integer.toHexString(message.getTtsFlag()), message.getPriority());
} else {
// 队列满或其他原因入队失败,降级为直接播报
log.warn("[VoiceBroadcast] 入队失败,降级为直接播报: deviceId={}, text={}", deviceId, text);
broadcastDirect(deviceId, text, ttsFlag, orderId);
log.warn("[VoiceBroadcast] 入队失败,降级为直接播报: deviceId={}, text={}",
message.getDeviceId(), message.getText());
broadcastDirect(message.getDeviceId(), message.getText(),
message.getTtsFlag(), message.getOrderId());
}
} catch (Exception e) {
log.error("[VoiceBroadcast] 队列入队异常,降级为直接播报: deviceId={}, text={}", deviceId, text, e);
broadcastDirect(deviceId, text, ttsFlag, orderId);
log.error("[VoiceBroadcast] 队列入队异常,降级为直接播报: deviceId={}, text={}",
message.getDeviceId(), message.getText(), e);
broadcastDirect(message.getDeviceId(), message.getText(),
message.getTtsFlag(), message.getOrderId());
}
}

View File

@@ -338,7 +338,7 @@ public class CleanOrderEndToEndTest {
verify(eventLogRecorder).record(any());
// 2. TTS sent (orderId can be null for TTS_REQUEST events)
verify(voiceBroadcastService).broadcast(eq(5001L), contains("请回到作业区域"), eq((Long) null));
verify(voiceBroadcastService).broadcastInOrder(eq(5001L), contains("请回到作业区域"), eq((Long) null));
}
// ==========================================

View File

@@ -56,6 +56,21 @@ public interface OpsOrderMapper extends BaseMapperX<OpsOrderDO> {
.orderByAsc(OpsOrderDO::getCreateTime));
}
/**
* 查询区域内客流触发的活跃工单(非终态)
*
* @param areaId 区域ID
* @return 最近创建的活跃客流工单,不存在返回 null
*/
default OpsOrderDO selectActiveTrafficOrder(Long areaId) {
return selectOne(new LambdaQueryWrapperX<OpsOrderDO>()
.eq(OpsOrderDO::getAreaId, areaId)
.eq(OpsOrderDO::getTriggerSource, "IOT_TRAFFIC")
.notIn(OpsOrderDO::getStatus, "COMPLETED", "CANCELLED")
.orderByDesc(OpsOrderDO::getCreateTime)
.last("LIMIT 1"));
}
// 注意分页查询方法需要在Service层实现这里只提供基础查询方法
// 具体分页查询请参考Service实现

View File

@@ -149,7 +149,7 @@ ops:
tts:
queue:
enabled: true # 是否启用 TTS 语音播报队列
interval-ms: 3000 # 同一设备播报间隔(毫秒)
interval-ms: 6000 # 同一设备播报间隔(毫秒)
max-queue-size: 50 # 单个设备队列最大长度
debug: false