Revert "fix(ops): RocketMQ 消费者添加租户上下文防御性兜底"

This reverts commit 7d19e7bafa.
This commit is contained in:
lzh
2026-03-30 15:55:35 +08:00
parent c14ea56dd8
commit bd6a98b766
11 changed files with 649 additions and 479 deletions

View File

@@ -1,128 +1,195 @@
package com.viewsh.module.ops.environment.integration.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
import com.viewsh.framework.tenant.core.util.TenantUtils;
import com.viewsh.module.ops.dal.dataobject.area.OpsAreaDeviceRelationDO;
import com.viewsh.module.ops.enums.BadgeDeviceStatusEnum;
import com.viewsh.module.ops.environment.integration.dto.IotDeviceStatusChangedEventDTO;
import com.viewsh.module.ops.environment.service.badge.BadgeDeviceStatusService;
import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder;
import com.viewsh.module.ops.service.area.AreaDeviceService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
@RocketMQMessageListener(
topic = "integration-device-status",
consumerGroup = "ops-badge-status-consumer",
consumeMode = ConsumeMode.CONCURRENTLY,
selectorExpression = "*",
accessKey = "${rocketmq.consumer.access-key:}",
secretKey = "${rocketmq.consumer.secret-key:}"
)
public class BadgeDeviceStatusEventHandler implements RocketMQListener<String> {
private static final int DEDUP_TTL_SECONDS = 120;
@Resource
private ObjectMapper objectMapper;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private BadgeDeviceStatusService badgeDeviceStatusService;
@Resource
private AreaDeviceService areaDeviceService;
@Override
public void onMessage(String message) {
try {
IotDeviceStatusChangedEventDTO event = objectMapper.readValue(message, IotDeviceStatusChangedEventDTO.class);
log.debug("[BadgeDeviceStatusEventHandler] 收到设备状态变更事件: eventId={}, deviceId={}, {} -> {}",
event.getEventId(), event.getDeviceId(), event.getOldStatus(), event.getNewStatus());
String dedupKey = OpsRedisKeyBuilder.badgeDedup(event.getTenantId(), event.getEventId());
Boolean firstTime = stringRedisTemplate.opsForValue()
.setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS);
if (Boolean.FALSE.equals(firstTime)) {
log.debug("[BadgeDeviceStatusEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId());
return;
}
executeInTenantContext(event.getTenantId(), () -> handleDeviceStatusChange(event));
} catch (Exception e) {
log.error("[BadgeDeviceStatusEventHandler] 消息处理失败: message={}", message, e);
}
}
private void handleDeviceStatusChange(IotDeviceStatusChangedEventDTO event) {
if (!isBadgeDevice(event.getDeviceId())) {
log.debug("[BadgeDeviceStatusEventHandler] 非工牌设备,忽略状态同步: deviceId={}", event.getDeviceId());
return;
}
Long areaId = getAreaIdByDeviceId(event.getDeviceId());
if (event.isOnline()) {
badgeDeviceStatusService.updateBadgeOnlineStatus(
event.getDeviceId(), event.getDeviceName(), event.getNickname(), areaId,
BadgeDeviceStatusEnum.IDLE, "设备上线");
return;
}
if (event.isOffline()) {
badgeDeviceStatusService.updateBadgeOnlineStatus(
event.getDeviceId(), event.getDeviceName(), event.getNickname(), null,
BadgeDeviceStatusEnum.OFFLINE, "设备离线");
return;
}
log.debug("[BadgeDeviceStatusEventHandler] 忽略未处理的状态变更: deviceId={}, newStatus={}",
event.getDeviceId(), event.getNewStatus());
}
private boolean isBadgeDevice(Long deviceId) {
if (deviceId == null) {
return false;
}
try {
OpsAreaDeviceRelationDO relation = areaDeviceService.getByDeviceId(deviceId);
return relation != null && "BADGE".equals(relation.getRelationType());
} catch (Exception e) {
log.warn("[BadgeDeviceStatusEventHandler] 查询设备绑定关系失败: deviceId={}", deviceId, e);
return false;
}
}
private Long getAreaIdByDeviceId(Long deviceId) {
if (deviceId == null) {
return null;
}
try {
OpsAreaDeviceRelationDO relation = areaDeviceService.getByDeviceId(deviceId);
if (relation != null && "BADGE".equals(relation.getRelationType())) {
return relation.getAreaId();
}
return null;
} catch (Exception e) {
log.warn("[BadgeDeviceStatusEventHandler] 查询设备所属区域失败: deviceId={}", deviceId, e);
return null;
}
}
private void executeInTenantContext(Long tenantId, Runnable runnable) {
Long currentTenantId = TenantContextHolder.getTenantId();
if (tenantId == null || Objects.equals(currentTenantId, tenantId)) {
runnable.run();
return;
}
TenantUtils.execute(tenantId, runnable);
}
}
package com.viewsh.module.ops.environment.integration.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.viewsh.module.ops.dal.dataobject.area.OpsAreaDeviceRelationDO;
import com.viewsh.module.ops.enums.BadgeDeviceStatusEnum;
import com.viewsh.module.ops.environment.integration.dto.IotDeviceStatusChangedEventDTO;
import com.viewsh.module.ops.environment.service.badge.BadgeDeviceStatusService;
import com.viewsh.module.ops.service.area.AreaDeviceService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* 工牌设备状态变更事件消费者
* <p>
* 监听 IoT 模块发布的设备状态变更事件(上线/离线)
* <p>
* 调用链路:
*
* <pre>
* IoT 设备连接/断开
*
* IotDeviceServiceImpl.updateDeviceState()
* ↓
* RocketMQ Topic: integration-device-status
* ↓
* BadgeDeviceStatusEventHandler.onMessage()
* - 幂等性检查
* - 判断是否为工牌设备
* ↓
* badgeDeviceStatusService.updateBadgeOnlineStatus()
* - 创建/更新 Redis 状态记录
* </pre>
*
* @author AI
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = "integration-device-status",
consumerGroup = "ops-badge-status-consumer",
consumeMode = ConsumeMode.CONCURRENTLY,
selectorExpression = "*",
accessKey = "${rocketmq.consumer.access-key:}",
secretKey = "${rocketmq.consumer.secret-key:}"
)
public class BadgeDeviceStatusEventHandler implements RocketMQListener<String> {
/**
* 幂等性控制 Key 模式
*/
private static final String DEDUP_KEY_PATTERN = "ops:badge:dedup:status:%s";
/**
* 幂等性控制 TTL
*/
private static final int DEDUP_TTL_SECONDS = 120;
@Resource
private ObjectMapper objectMapper;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private BadgeDeviceStatusService badgeDeviceStatusService;
@Resource
private AreaDeviceService areaDeviceService;
@Override
public void onMessage(String message) {
try {
// 1. JSON 反序列化
IotDeviceStatusChangedEventDTO event = objectMapper.readValue(message,
IotDeviceStatusChangedEventDTO.class);
log.debug("[BadgeDeviceStatusEventHandler] 收到设备状态变更事件: eventId={}, deviceId={}, {} -> {}",
event.getEventId(), event.getDeviceId(), event.getOldStatus(), event.getNewStatus());
// 2. 幂等性检查
String dedupKey = String.format(DEDUP_KEY_PATTERN, event.getEventId());
Boolean firstTime = stringRedisTemplate.opsForValue()
.setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS);
if (Boolean.FALSE.equals(firstTime)) {
log.debug("[BadgeDeviceStatusEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId());
return;
}
// 3. 判断是否为工牌设备
if (!isBadgeDevice(event.getDeviceId())) {
log.debug("[BadgeDeviceStatusEventHandler] 非工牌设备,跳过处理: deviceId={}", event.getDeviceId());
return;
}
// 4. 处理状态变更
handleDeviceStatusChange(event);
} catch (Exception e) {
log.error("[BadgeDeviceStatusEventHandler] 消息处理失败: message={}", message, e);
// 不抛出异常,避免消息重试导致重复处理
}
}
/**
* 处理设备状态变更
*/
private void handleDeviceStatusChange(IotDeviceStatusChangedEventDTO event) {
Long deviceId = event.getDeviceId();
String deviceCode = event.getDeviceName();
String nickname = event.getNickname();
// 获取设备所属区域
Long areaId = getAreaIdByDeviceId(deviceId);
if (event.isOnline()) {
// 设备上线
log.info("[BadgeDeviceStatusEventHandler] 工牌设备上线: deviceId={}, deviceCode={}, nickname={}, areaId={}",
deviceId, deviceCode, nickname, areaId);
badgeDeviceStatusService.updateBadgeOnlineStatus(
deviceId,
deviceCode,
nickname,
areaId,
BadgeDeviceStatusEnum.IDLE,
"设备上线");
} else if (event.isOffline()) {
// 设备离线
log.info("[BadgeDeviceStatusEventHandler] 工牌设备离线: deviceId={}, deviceCode={}",
deviceId, deviceCode);
badgeDeviceStatusService.updateBadgeOnlineStatus(
deviceId,
deviceCode,
nickname,
null,
BadgeDeviceStatusEnum.OFFLINE,
"设备离线");
} else {
// 其他状态(如 INACTIVE暂不处理
log.debug("[BadgeDeviceStatusEventHandler] 忽略状态变更: deviceId={}, newStatus={}",
deviceId, event.getNewStatus());
}
}
/**
* 判断是否为工牌设备
* <p>
* 通过查询 ops_area_device_relation 表判断设备是否为 BADGE 类型
*/
private boolean isBadgeDevice(Long deviceId) {
if (deviceId == null) {
return false;
}
try {
// 通过区域设备关系判断是否为工牌设备
OpsAreaDeviceRelationDO relation = areaDeviceService.getByDeviceId(deviceId);
return relation != null && "BADGE".equals(relation.getRelationType());
} catch (Exception e) {
log.warn("[BadgeDeviceStatusEventHandler] 查询设备类型失败: deviceId={}", deviceId, e);
return false;
}
}
/**
* 获取设备所属区域ID
*/
private Long getAreaIdByDeviceId(Long deviceId) {
if (deviceId == null) {
return null;
}
try {
OpsAreaDeviceRelationDO relation = areaDeviceService.getByDeviceId(deviceId);
if (relation != null && "BADGE".equals(relation.getRelationType())) {
return relation.getAreaId();
}
return null;
} catch (Exception e) {
log.warn("[BadgeDeviceStatusEventHandler] 获取设备区域失败: deviceId={}", deviceId, e);
return null;
}
}
}

View File

@@ -1,118 +1,160 @@
package com.viewsh.module.ops.environment.integration.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
import com.viewsh.framework.tenant.core.util.TenantUtils;
import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
import com.viewsh.module.ops.enums.OperatorTypeEnum;
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
import com.viewsh.module.ops.environment.integration.dto.CleanOrderArriveEventDTO;
import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
@RocketMQMessageListener(
topic = "ops-order-arrive",
consumerGroup = "ops-clean-order-arrive-group",
consumeMode = ConsumeMode.CONCURRENTLY,
selectorExpression = "*",
accessKey = "${rocketmq.consumer.access-key:}",
secretKey = "${rocketmq.consumer.secret-key:}"
)
public class CleanOrderArriveEventHandler implements RocketMQListener<String> {
private static final int DEDUP_TTL_SECONDS = 300;
@Resource
private ObjectMapper objectMapper;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private OpsOrderMapper opsOrderMapper;
@Resource
private OrderLifecycleManager orderLifecycleManager;
@Override
public void onMessage(String message) {
try {
CleanOrderArriveEventDTO event = objectMapper.readValue(message, CleanOrderArriveEventDTO.class);
String dedupKey = OpsRedisKeyBuilder.eventDedup(event.getTenantId(), "arrive", event.getEventId());
Boolean firstTime = stringRedisTemplate.opsForValue()
.setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS);
if (!Boolean.TRUE.equals(firstTime)) {
log.debug("[CleanOrderArriveEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId());
return;
}
executeInTenantContext(event.getTenantId(), () -> handleOrderArrive(event));
} catch (Exception e) {
log.error("[CleanOrderArriveEventHandler] 消息处理失败: message={}", message, e);
throw new RuntimeException("保洁工单到岗事件处理失败", e);
}
}
private void handleOrderArrive(CleanOrderArriveEventDTO event) {
log.info("[CleanOrderArriveEventHandler] 收到到岗事件: eventId={}, orderId={}, deviceId={}, areaId={}",
event.getEventId(), event.getOrderId(), event.getDeviceId(), event.getAreaId());
OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId());
if (order == null) {
log.warn("[CleanOrderArriveEventHandler] 工单不存在: orderId={}", event.getOrderId());
return;
}
WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus());
if (!currentStatus.canStartWorking()) {
log.warn("[CleanOrderArriveEventHandler] 当前状态不允许到岗: orderId={}, status={}",
event.getOrderId(), order.getStatus());
return;
}
Map<String, Object> payload = new HashMap<>();
payload.put("deviceId", event.getDeviceId());
payload.put("deviceKey", event.getDeviceKey());
payload.put("areaId", event.getAreaId());
payload.put("triggerSource", event.getTriggerSource());
if (event.getTriggerData() != null) {
payload.putAll(event.getTriggerData());
}
OrderTransitionRequest request = OrderTransitionRequest.builder()
.orderId(event.getOrderId())
.targetStatus(WorkOrderStatusEnum.ARRIVED)
.operatorType(OperatorTypeEnum.SYSTEM)
.reason("设备触发自动到岗")
.payload(payload)
.build();
orderLifecycleManager.transition(request);
log.info("[CleanOrderArriveEventHandler] 工单到岗完成: eventId={}, orderId={}, deviceId={}",
event.getEventId(), event.getOrderId(), event.getDeviceId());
}
private void executeInTenantContext(Long tenantId, Runnable runnable) {
Long currentTenantId = TenantContextHolder.getTenantId();
if (tenantId == null || Objects.equals(currentTenantId, tenantId)) {
runnable.run();
return;
}
TenantUtils.execute(tenantId, runnable);
}
}
package com.viewsh.module.ops.environment.integration.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
import com.viewsh.module.ops.enums.OperatorTypeEnum;
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
import com.viewsh.module.ops.environment.integration.dto.CleanOrderArriveEventDTO;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* 保洁工单到岗事件消费者
* <p>
* 订阅 IoT 模块发布的保洁工单到岗事件
* <p>
* 调用链路:
* <pre>
* IoT 发布 RocketMQ 消息 (ops-order-arrive)
* ↓
* CleanOrderArriveEventHandler.onMessage()
* - 幂等性检查
* - 状态检查
*
* orderLifecycleManager.transition(ARRIVED) [@Transactional]
* - 更新工单状态
* - 发布 OrderStateChangedEvent
*
* 事务提交
* ↓
* CleanOrderEventListener.handleArrived() [@Async, AFTER_COMMIT]
* - 记录到岗时间到扩展表
* - 更新设备工单关联信息areaId, beaconMac
* - 记录业务日志
* </pre>
* <p>
* 设计说明:
* - Handler 只负责消息接收和状态转换
* - 业务日志、设备缓存更新由 CleanOrderEventListener 在事务提交后处理
* - BadgeDeviceStatusServiceImpl 在 BEFORE_COMMIT 阶段跳过 ARRIVED由 Listener 处理完整信息)
*
* @author AI
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = "ops-order-arrive",
consumerGroup = "ops-clean-order-arrive-group",
consumeMode = ConsumeMode.CONCURRENTLY,
selectorExpression = "*",
accessKey = "${rocketmq.consumer.access-key:}",
secretKey = "${rocketmq.consumer.secret-key:}"
)
public class CleanOrderArriveEventHandler implements RocketMQListener<String> {
/**
* 幂等性控制 Key 模式
*/
private static final String DEDUP_KEY_PATTERN = "ops:clean:dedup:arrive:%s";
/**
* 幂等性控制 TTL
*/
private static final int DEDUP_TTL_SECONDS = 300;
@Resource
private ObjectMapper objectMapper;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private OpsOrderMapper opsOrderMapper;
@Resource
private OrderLifecycleManager orderLifecycleManager;
@Override
public void onMessage(String message) {
try {
// 1. JSON 反序列化
CleanOrderArriveEventDTO event = objectMapper.readValue(message, CleanOrderArriveEventDTO.class);
// 2. 幂等性检查
String dedupKey = String.format(DEDUP_KEY_PATTERN, event.getEventId());
Boolean firstTime = stringRedisTemplate.opsForValue()
.setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS);
if (!firstTime) {
log.debug("[CleanOrderArriveEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId());
return;
}
// 3. 业务处理
handleOrderArrive(event);
} catch (Exception e) {
log.error("[CleanOrderArriveEventHandler] 消息处理失败: message={}", message, e);
throw new RuntimeException("保洁工单到岗事件处理失败", e);
}
}
/**
* 处理工单到岗
*/
private void handleOrderArrive(CleanOrderArriveEventDTO event) {
log.info("[CleanOrderArriveEventHandler] 收到到岗事件: eventId={}, orderId={}, deviceId={}, areaId={}",
event.getEventId(), event.getOrderId(), event.getDeviceId(), event.getAreaId());
// 1. 查询工单
OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId());
if (order == null) {
log.warn("[CleanOrderArriveEventHandler] 工单不存在: orderId={}", event.getOrderId());
return;
}
// 2. 检查工单状态
WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus());
if (!currentStatus.canStartWorking()) {
log.warn("[CleanOrderArriveEventHandler] 工单状态不允许到岗: orderId={}, status={}",
event.getOrderId(), order.getStatus());
return;
}
// 3. 构建状态转换请求(包含完整信息,供 Listener 使用)
Map<String, Object> payload = new HashMap<>();
payload.put("deviceId", event.getDeviceId());
payload.put("deviceKey", event.getDeviceKey());
payload.put("areaId", event.getAreaId());
payload.put("triggerSource", event.getTriggerSource());
if (event.getTriggerData() != null) {
payload.putAll(event.getTriggerData());
}
OrderTransitionRequest request = OrderTransitionRequest.builder()
.orderId(event.getOrderId())
.targetStatus(WorkOrderStatusEnum.ARRIVED)
.operatorType(OperatorTypeEnum.SYSTEM)
.reason("蓝牙信标自动到岗确认")
.payload(payload)
.build();
// 4. 执行状态转换
// 注意:业务日志和设备缓存更新由 CleanOrderEventListener 在 AFTER_COMMIT 阶段处理
orderLifecycleManager.transition(request);
log.info("[CleanOrderArriveEventHandler] 工单到岗成功: eventId={}, orderId={}, deviceId={}",
event.getEventId(), event.getOrderId(), event.getDeviceId());
}
}

View File

@@ -1,8 +1,6 @@
package com.viewsh.module.ops.environment.integration.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
import com.viewsh.framework.tenant.core.util.TenantUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
@@ -15,9 +13,8 @@ import com.viewsh.module.ops.infrastructure.log.enumeration.EventDomain;
import com.viewsh.module.ops.infrastructure.log.enumeration.EventLevel;
import com.viewsh.module.ops.infrastructure.log.enumeration.LogModule;
import com.viewsh.module.ops.infrastructure.log.enumeration.LogType;
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecord;
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder;
import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder;
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecord;
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
@@ -26,9 +23,8 @@ import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
/**
* 保洁工单审计事件消费者
@@ -88,7 +84,7 @@ public class CleanOrderAuditEventHandler implements RocketMQListener<String> {
CleanOrderAuditEventDTO event = objectMapper.readValue(message, CleanOrderAuditEventDTO.class);
// 2. 幂等性检查
String dedupKey = OpsRedisKeyBuilder.eventDedup(event.getTenantId(), "audit", event.getEventId());
String dedupKey = String.format(DEDUP_KEY_PATTERN, event.getEventId());
Boolean firstTime = stringRedisTemplate.opsForValue()
.setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS);
@@ -98,7 +94,7 @@ public class CleanOrderAuditEventHandler implements RocketMQListener<String> {
}
// 3. 业务处理
executeInTenantContext(event.getTenantId(), () -> handleAuditEvent(event));
handleAuditEvent(event);
} catch (Exception e) {
log.error("[CleanOrderAuditEventHandler] 消息处理失败: message={}", message, e);
@@ -260,14 +256,5 @@ public class CleanOrderAuditEventHandler implements RocketMQListener<String> {
return EventLevel.WARN;
}
return EventLevel.INFO;
}
private void executeInTenantContext(Long tenantId, Runnable runnable) {
Long currentTenantId = TenantContextHolder.getTenantId();
if (tenantId == null || Objects.equals(currentTenantId, tenantId)) {
runnable.run();
return;
}
TenantUtils.execute(tenantId, runnable);
}
}
}
}

View File

@@ -1,130 +1,191 @@
package com.viewsh.module.ops.environment.integration.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
import com.viewsh.framework.tenant.core.util.TenantUtils;
import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
import com.viewsh.module.ops.enums.OperatorTypeEnum;
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
import com.viewsh.module.ops.environment.integration.dto.CleanOrderCompleteEventDTO;
import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
@RocketMQMessageListener(
topic = "ops-order-complete",
consumerGroup = "ops-clean-order-complete-group",
consumeMode = ConsumeMode.CONCURRENTLY,
selectorExpression = "*",
accessKey = "${rocketmq.consumer.access-key:}",
secretKey = "${rocketmq.consumer.secret-key:}"
)
public class CleanOrderCompleteEventHandler implements RocketMQListener<String> {
private static final int DEDUP_TTL_SECONDS = 300;
@Resource
private ObjectMapper objectMapper;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private OpsOrderMapper opsOrderMapper;
@Resource
private OrderLifecycleManager orderLifecycleManager;
@Override
public void onMessage(String message) {
try {
CleanOrderCompleteEventDTO event = objectMapper.readValue(message, CleanOrderCompleteEventDTO.class);
String dedupKey = OpsRedisKeyBuilder.eventDedup(event.getTenantId(), "complete", event.getEventId());
Boolean firstTime = stringRedisTemplate.opsForValue()
.setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS);
if (!Boolean.TRUE.equals(firstTime)) {
log.debug("[CleanOrderCompleteEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId());
return;
}
executeInTenantContext(event.getTenantId(), () -> handleOrderComplete(event));
} catch (Exception e) {
log.error("[CleanOrderCompleteEventHandler] 消息处理失败: message={}", message, e);
throw new RuntimeException("保洁工单完单事件处理失败", e);
}
}
private void handleOrderComplete(CleanOrderCompleteEventDTO event) {
log.info("[CleanOrderCompleteEventHandler] 收到完单事件: eventId={}, orderId={}, deviceId={}, areaId={}",
event.getEventId(), event.getOrderId(), event.getDeviceId(), event.getAreaId());
OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId());
if (order == null) {
log.warn("[CleanOrderCompleteEventHandler] 工单不存在: orderId={}", event.getOrderId());
return;
}
WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus());
if (!currentStatus.canComplete()) {
log.warn("[CleanOrderCompleteEventHandler] 当前状态不允许完单: orderId={}, status={}",
event.getOrderId(), order.getStatus());
return;
}
Map<String, Object> payload = new HashMap<>();
payload.put("deviceId", event.getDeviceId());
payload.put("deviceKey", event.getDeviceKey());
payload.put("areaId", event.getAreaId());
payload.put("triggerSource", event.getTriggerSource());
if (event.getTriggerData() != null) {
payload.putAll(event.getTriggerData());
}
OrderTransitionRequest request = OrderTransitionRequest.builder()
.orderId(event.getOrderId())
.targetStatus(WorkOrderStatusEnum.COMPLETED)
.operatorType(OperatorTypeEnum.SYSTEM)
.assigneeId(event.getDeviceId())
.reason(buildCompletionRemark(event))
.payload(payload)
.build();
orderLifecycleManager.transition(request);
log.info("[CleanOrderCompleteEventHandler] 工单完单完成: eventId={}, orderId={}",
event.getEventId(), event.getOrderId());
}
private String buildCompletionRemark(CleanOrderCompleteEventDTO event) {
StringBuilder remark = new StringBuilder("设备触发自动完单");
if (event.getTriggerData() != null) {
Object durationMs = event.getTriggerData().get("durationMs");
if (durationMs instanceof Number number) {
remark.append(",作业时长约 ").append(number.longValue() / 60000).append(" 分钟");
}
}
return remark.toString();
}
private void executeInTenantContext(Long tenantId, Runnable runnable) {
Long currentTenantId = TenantContextHolder.getTenantId();
if (tenantId == null || Objects.equals(currentTenantId, tenantId)) {
runnable.run();
return;
}
TenantUtils.execute(tenantId, runnable);
}
}
package com.viewsh.module.ops.environment.integration.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
import com.viewsh.module.ops.enums.OperatorTypeEnum;
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
import com.viewsh.module.ops.environment.integration.dto.CleanOrderCompleteEventDTO;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* 保洁工单完成事件消费者
* <p>
* 订阅 IoT 模块发布的保洁工单完成事件
* <p>
* 调用链路:
*
* <pre>
* IoT 发布 RocketMQ 消息 (ops-order-complete)
*
* CleanOrderCompleteEventHandler.onMessage()
* - 幂等性检查
* - 状态检查
* ↓
* orderLifecycleManager.transition(COMPLETED) [@Transactional]
* - 更新工单状态
* - 发布 OrderStateChangedEvent
* ↓
* 事务提交
* ↓
* BadgeDeviceStatusEventListener.onOrderStateChanged() [BEFORE_COMMIT]
* - 清除设备工单关联
* - 检查等待任务,决定设备状态 (BUSY/IDLE)
* ↓
* CleanOrderEventListener.handleCompleted() [AFTER_COMMIT]
* - 记录完成时间到扩展表
* - 记录业务日志
* CleanOrderEventListener.onOrderCompleted() [AFTER_COMMIT]
* - 自动调度下一个任务
* - 发送完成通知
* </pre>
* <p>
* 设计说明:
* - Handler 只负责消息接收和状态转换
* - 设备状态由 BadgeDeviceStatusEventListener 在 BEFORE_COMMIT 阶段处理
* - 业务日志、自动调度由 CleanOrderEventListener 在 AFTER_COMMIT 阶段处理
*
* @author AI
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = "ops-order-complete",
consumerGroup = "ops-clean-order-complete-group",
consumeMode = ConsumeMode.CONCURRENTLY,
selectorExpression = "*",
accessKey = "${rocketmq.consumer.access-key:}",
secretKey = "${rocketmq.consumer.secret-key:}"
)
public class CleanOrderCompleteEventHandler implements RocketMQListener<String> {
/**
* 幂等性控制 Key 模式
*/
private static final String DEDUP_KEY_PATTERN = "ops:clean:dedup:complete:%s";
/**
* 幂等性控制 TTL
*/
private static final int DEDUP_TTL_SECONDS = 300;
@Resource
private ObjectMapper objectMapper;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private OpsOrderMapper opsOrderMapper;
@Resource
private OrderLifecycleManager orderLifecycleManager;
@Override
public void onMessage(String message) {
try {
// 1. JSON 反序列化
CleanOrderCompleteEventDTO event = objectMapper.readValue(message, CleanOrderCompleteEventDTO.class);
// 2. 幂等性检查
String dedupKey = String.format(DEDUP_KEY_PATTERN, event.getEventId());
Boolean firstTime = stringRedisTemplate.opsForValue()
.setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS);
if (!firstTime) {
log.debug("[CleanOrderCompleteEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId());
return;
}
// 3. 业务处理
handleOrderComplete(event);
} catch (Exception e) {
log.error("[CleanOrderCompleteEventHandler] 消息处理失败: message={}", message, e);
throw new RuntimeException("保洁工单完成事件处理失败", e);
}
}
/**
* 处理工单完成
*/
private void handleOrderComplete(CleanOrderCompleteEventDTO event) {
log.info("[CleanOrderCompleteEventHandler] 收到完成事件: eventId={}, orderId={}, deviceId={}, areaId={}",
event.getEventId(), event.getOrderId(), event.getDeviceId(), event.getAreaId());
// 1. 查询工单
OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId());
if (order == null) {
log.warn("[CleanOrderCompleteEventHandler] 工单不存在: orderId={}", event.getOrderId());
return;
}
// 2. 检查工单状态
WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus());
if (!currentStatus.canComplete()) {
log.warn("[CleanOrderCompleteEventHandler] 工单状态不允许完成: orderId={}, status={}",
event.getOrderId(), order.getStatus());
return;
}
// 3. 计算作业时长
String remark = buildCompletionRemark(event);
// 4. 构建状态转换请求
Map<String, Object> payload = new HashMap<>();
payload.put("deviceId", event.getDeviceId());
payload.put("deviceKey", event.getDeviceKey());
payload.put("areaId", event.getAreaId());
payload.put("triggerSource", event.getTriggerSource());
if (event.getTriggerData() != null) {
payload.putAll(event.getTriggerData());
}
OrderTransitionRequest request = OrderTransitionRequest.builder()
.orderId(event.getOrderId())
.targetStatus(WorkOrderStatusEnum.COMPLETED)
.operatorType(OperatorTypeEnum.SYSTEM)
.assigneeId(event.getDeviceId()) // 设备ID用于后续自动调度
.reason(remark)
.payload(payload)
.build();
// 5. 执行状态转换
// 注意:设备状态、业务日志、自动调度由事件监听器处理
orderLifecycleManager.transition(request);
log.info("[CleanOrderCompleteEventHandler] 工单完成成功: eventId={}, orderId={}",
event.getEventId(), event.getOrderId());
}
/**
* 构建完成备注
*/
private String buildCompletionRemark(CleanOrderCompleteEventDTO event) {
StringBuilder remark = new StringBuilder();
remark.append("信号丢失超时自动完成");
if (event.getTriggerData() != null) {
Object durationMs = event.getTriggerData().get("durationMs");
if (durationMs != null) {
long durationMinutes = ((Number) durationMs).longValue() / 60000;
remark.append(",作业时长: ").append(durationMinutes).append("分钟");
}
}
return remark.toString();
}
}

View File

@@ -1,8 +1,6 @@
package com.viewsh.module.ops.environment.integration.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
import com.viewsh.framework.tenant.core.util.TenantUtils;
import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
@@ -10,7 +8,6 @@ import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
import com.viewsh.module.ops.enums.OperatorTypeEnum;
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
import com.viewsh.module.ops.environment.integration.dto.CleanOrderConfirmEventDTO;
import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
@@ -19,9 +16,39 @@ import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* 保洁工单确认事件消费者
* <p>
* 订阅 IoT 模块发布的工单确认事件(如:工牌按键确认)
* <p>
* 调用链路:
* <pre>
* IoT 发布 RocketMQ 消息 (ops-order-confirm)
* ↓
* CleanOrderConfirmEventHandler.onMessage()
* - 幂等性检查
* - 状态检查
* ↓
* orderLifecycleManager.transition(CONFIRMED) [@Transactional]
* - 更新工单状态
* - 发布 OrderStateChangedEvent
* ↓
* 事务提交
* ↓
* CleanOrderEventListener.onOrderStateChanged() [@TransactionalEventListener(AFTER_COMMIT)]
* - 记录业务日志
* - 发送 TTS "工单已确认,请前往作业区域开始作业"
* </pre>
* <p>
* 设计说明:
* - Handler 只负责消息接收和状态转换
* - 日志记录和通知发送由 CleanOrderEventListener 在事务提交后处理
* - 设备状态由 BadgeDeviceStatusServiceImpl 在 BEFORE_COMMIT 阶段同步
*
* @author AI
*/
@Slf4j
@Component
@RocketMQMessageListener(
@@ -33,6 +60,7 @@ import java.util.concurrent.TimeUnit;
)
public class CleanOrderConfirmEventHandler implements RocketMQListener<String> {
private static final String DEDUP_KEY_PATTERN = "ops:clean:dedup:confirm:%s";
private static final int DEDUP_TTL_SECONDS = 300;
@Resource
@@ -50,61 +78,66 @@ public class CleanOrderConfirmEventHandler implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
try {
// 1. JSON 反序列化
CleanOrderConfirmEventDTO event = objectMapper.readValue(message, CleanOrderConfirmEventDTO.class);
String dedupKey = OpsRedisKeyBuilder.eventDedup(event.getTenantId(), "confirm", event.getEventId());
String eventId = event.getEventId();
// 2. 幂等性检查
String dedupKey = String.format(DEDUP_KEY_PATTERN, eventId);
Boolean firstTime = stringRedisTemplate.opsForValue()
.setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS);
if (!Boolean.TRUE.equals(firstTime)) {
log.debug("[CleanOrderConfirmEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId());
log.debug("[CleanOrderConfirmEventHandler] 重复事件,跳过处理: eventId={}", eventId);
return;
}
executeInTenantContext(event.getTenantId(), () -> handleConfirmEvent(event));
// 3. 加载工单
Long orderId = event.getOrderId();
OpsOrderDO order = opsOrderMapper.selectById(orderId);
if (order == null) {
log.warn("[CleanOrderConfirmEventHandler] 工单不存在: orderId={}", orderId);
return;
}
WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus());
log.info("[CleanOrderConfirmEventHandler] 收到确认事件: eventId={}, orderId={}, currentStatus={}, deviceId={}",
eventId, orderId, currentStatus, event.getDeviceId());
// 4. 状态检查
// 如果已在进行中 (CONFIRMED or ARRIVED)直接返回TTS 由 Listener 处理)
if (currentStatus == WorkOrderStatusEnum.CONFIRMED || currentStatus == WorkOrderStatusEnum.ARRIVED) {
log.debug("[CleanOrderConfirmEventHandler] 工单已在进行中: orderId={}, status={}", orderId, currentStatus);
return;
}
// 检查是否可以确认
if (!currentStatus.canConfirm()) {
log.warn("[CleanOrderConfirmEventHandler] 当前状态无法确认工单: orderId={}, status={}", orderId, currentStatus);
return;
}
// 5. 构建状态转换请求(包含 deviceId供 Listener 使用)
OrderTransitionRequest request = OrderTransitionRequest.builder()
.orderId(orderId)
.targetStatus(WorkOrderStatusEnum.CONFIRMED)
.reason("工牌按键确认")
.operatorType(OperatorTypeEnum.CLEANER)
.operatorId(event.getDeviceId() != null ? event.getDeviceId() : order.getAssigneeId())
.build();
// 将 deviceId 放入 payload供 Listener 使用
request.putPayload("deviceId", event.getDeviceId());
request.putPayload("triggerSource", "BADGE_BUTTON");
// 6. 执行状态转换
// 注意:日志记录和 TTS 由 CleanOrderEventListener 在 AFTER_COMMIT 阶段处理
orderLifecycleManager.transition(request);
log.info("[CleanOrderConfirmEventHandler] 工单确认成功: orderId={}, deviceId={}", orderId, event.getDeviceId());
} catch (Exception e) {
log.error("[CleanOrderConfirmEventHandler] 消息处理失败: message={}", message, e);
throw new RuntimeException("保洁工单确认事件处理失败", e);
}
}
private void handleConfirmEvent(CleanOrderConfirmEventDTO event) {
Long orderId = event.getOrderId();
OpsOrderDO order = opsOrderMapper.selectById(orderId);
if (order == null) {
log.warn("[CleanOrderConfirmEventHandler] 工单不存在: orderId={}", orderId);
return;
}
WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus());
log.info("[CleanOrderConfirmEventHandler] 收到确认事件: eventId={}, orderId={}, currentStatus={}, deviceId={}",
event.getEventId(), orderId, currentStatus, event.getDeviceId());
if (currentStatus == WorkOrderStatusEnum.CONFIRMED || currentStatus == WorkOrderStatusEnum.ARRIVED) {
log.debug("[CleanOrderConfirmEventHandler] 工单已确认或已到岗,跳过处理: orderId={}, status={}", orderId, currentStatus);
return;
}
if (!currentStatus.canConfirm()) {
log.warn("[CleanOrderConfirmEventHandler] 当前状态不允许确认: orderId={}, status={}", orderId, currentStatus);
return;
}
OrderTransitionRequest request = OrderTransitionRequest.builder()
.orderId(orderId)
.targetStatus(WorkOrderStatusEnum.CONFIRMED)
.reason("设备确认工单")
.operatorType(OperatorTypeEnum.CLEANER)
.operatorId(event.getDeviceId() != null ? event.getDeviceId() : order.getAssigneeId())
.build();
request.putPayload("deviceId", event.getDeviceId());
request.putPayload("triggerSource", "BADGE_BUTTON");
orderLifecycleManager.transition(request);
log.info("[CleanOrderConfirmEventHandler] 工单确认完成: orderId={}, deviceId={}", orderId, event.getDeviceId());
}
private void executeInTenantContext(Long tenantId, Runnable runnable) {
Long currentTenantId = TenantContextHolder.getTenantId();
if (tenantId == null || Objects.equals(currentTenantId, tenantId)) {
runnable.run();
return;
}
TenantUtils.execute(tenantId, runnable);
}
}

View File

@@ -1,8 +1,6 @@
package com.viewsh.module.ops.environment.integration.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
import com.viewsh.framework.tenant.core.util.TenantUtils;
import com.viewsh.module.iot.api.device.IotDeviceControlApi;
import com.viewsh.module.iot.api.device.dto.ResetTrafficCounterReqDTO;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
@@ -20,7 +18,6 @@ import com.viewsh.module.ops.infrastructure.log.enumeration.LogModule;
import com.viewsh.module.ops.infrastructure.log.enumeration.LogType;
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecord;
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder;
import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
@@ -31,7 +28,6 @@ import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -110,7 +106,7 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
CleanOrderCreateEventDTO event = objectMapper.readValue(message, CleanOrderCreateEventDTO.class);
// 2. 幂等性检查
String dedupKey = OpsRedisKeyBuilder.eventDedup(event.getTenantId(), "create", event.getEventId());
String dedupKey = String.format(DEDUP_KEY_PATTERN, event.getEventId());
Boolean firstTime = stringRedisTemplate.opsForValue()
.setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS);
@@ -120,7 +116,7 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
}
// 3. 业务处理
executeInTenantContext(event.getTenantId(), () -> handleOrderCreate(event));
handleOrderCreate(event);
} catch (Exception e) {
log.error("[CleanOrderCreateEventHandler] 消息处理失败: message={}", message, e);
@@ -509,13 +505,4 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
}
return null;
}
private void executeInTenantContext(Long tenantId, Runnable runnable) {
Long currentTenantId = TenantContextHolder.getTenantId();
if (tenantId == null || Objects.equals(currentTenantId, tenantId)) {
runnable.run();
return;
}
TenantUtils.execute(tenantId, runnable);
}
}

View File

@@ -1,7 +1,6 @@
package com.viewsh.module.ops.environment.integration.consumer;
import cn.hutool.core.util.StrUtil;
import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
@@ -20,6 +19,8 @@ import java.time.Duration;
@Service
public class IntegrationEventDeduplicationService {
private static final String DEDUP_KEY_PREFIX = "integration:event:dedup:";
@Resource
private StringRedisTemplate stringRedisTemplate;
@@ -35,7 +36,7 @@ public class IntegrationEventDeduplicationService {
return false;
}
String key = OpsRedisKeyBuilder.eventDedup("integration", eventId);
String key = DEDUP_KEY_PREFIX + eventId;
// setNX如果 key 不存在则设置,返回 true
Boolean success = stringRedisTemplate.opsForValue()
@@ -59,7 +60,7 @@ public class IntegrationEventDeduplicationService {
return;
}
String key = OpsRedisKeyBuilder.eventDedup("integration", eventId);
String key = DEDUP_KEY_PREFIX + eventId;
stringRedisTemplate.opsForValue().set(key, "1", Duration.ofHours(24));
}
@@ -74,7 +75,7 @@ public class IntegrationEventDeduplicationService {
return false;
}
String key = OpsRedisKeyBuilder.eventDedup("integration", eventId);
String key = DEDUP_KEY_PREFIX + eventId;
Boolean exists = stringRedisTemplate.hasKey(key);
return Boolean.TRUE.equals(exists);
}

View File

@@ -48,9 +48,7 @@ public class CleanOrderArriveEventDTO {
/**
* 区域ID
*/
private Long areaId;
private Long tenantId;
private Long areaId;
/**
* 触发来源IOT_BEACON=蓝牙信标检测)

View File

@@ -63,9 +63,7 @@ public class CleanOrderAuditEventDTO {
/**
* 区域ID
*/
private Long areaId;
private Long tenantId;
private Long areaId;
/**
* 消息内容

View File

@@ -48,9 +48,7 @@ public class CleanOrderCompleteEventDTO {
/**
* 区域ID
*/
private Long areaId;
private Long tenantId;
private Long areaId;
/**
* 触发来源IOT_SIGNAL_LOSS=信号丢失超时)

View File

@@ -53,9 +53,7 @@ public class CleanOrderCreateEventDTO {
/**
* 优先级0=P0紧急 1=P1重要 2=P2普通
*/
private Integer priority;
private Long tenantId;
private Integer priority;
/**
* 触发数据JSON 格式的附加信息)