fix(ops): 恢复集成消费项目上下文

This commit is contained in:
lzh
2026-04-29 22:21:02 +08:00
parent 026a126824
commit f080cff7b7
17 changed files with 842 additions and 666 deletions

View File

@@ -46,6 +46,9 @@ public class BadgeDeviceStatusEventHandler implements RocketMQListener<String> {
@Resource @Resource
private AreaDeviceService areaDeviceService; private AreaDeviceService areaDeviceService;
@Resource
private IntegrationProjectContextExecutor projectContextExecutor;
@Override @Override
public void onMessage(String message) { public void onMessage(String message) {
try { try {
@@ -66,12 +69,16 @@ public class BadgeDeviceStatusEventHandler implements RocketMQListener<String> {
} }
private void handleDeviceStatusChange(IotDeviceStatusChangedEventDTO event) { private void handleDeviceStatusChange(IotDeviceStatusChangedEventDTO event) {
if (!isBadgeDevice(event.getDeviceId())) { OpsAreaDeviceRelationDO relation = getBadgeRelation(event.getDeviceId());
log.debug("[BadgeDeviceStatusEventHandler] 非工牌设备,忽略状态同步: deviceId={}", event.getDeviceId()); if (relation == null) {
log.debug("[BadgeDeviceStatusEventHandler] ignore non-badge device status event: deviceId={}", event.getDeviceId());
return; return;
} }
projectContextExecutor.execute(event.getProjectId(), null, relation.getAreaId(), event.getDeviceId(), event.getEventId(),
() -> updateBadgeDeviceStatus(event, relation.getAreaId()));
}
Long areaId = getAreaIdByDeviceId(event.getDeviceId()); private void updateBadgeDeviceStatus(IotDeviceStatusChangedEventDTO event, Long areaId) {
if (event.isOnline()) { if (event.isOnline()) {
badgeDeviceStatusService.updateBadgeOnlineStatus( badgeDeviceStatusService.updateBadgeOnlineStatus(
event.getDeviceId(), event.getDeviceName(), event.getNickname(), areaId, event.getDeviceId(), event.getDeviceName(), event.getNickname(), areaId,
@@ -84,39 +91,22 @@ public class BadgeDeviceStatusEventHandler implements RocketMQListener<String> {
BadgeDeviceStatusEnum.OFFLINE, "设备离线"); BadgeDeviceStatusEnum.OFFLINE, "设备离线");
return; return;
} }
log.debug("[BadgeDeviceStatusEventHandler] 忽略未处理的状态变更: deviceId={}, newStatus={}", log.debug("[BadgeDeviceStatusEventHandler] ignore device status event: deviceId={}, newStatus={}",
event.getDeviceId(), event.getNewStatus()); event.getDeviceId(), event.getNewStatus());
} }
private boolean isBadgeDevice(Long deviceId) { private OpsAreaDeviceRelationDO getBadgeRelation(Long deviceId) {
if (deviceId == null) {
return false;
}
try {
OpsAreaDeviceRelationDO relation = areaDeviceService.getByDeviceId(deviceId);
return relation != null && "BADGE".equals(relation.getRelationType());
} catch (Exception e) {
log.warn("[BadgeDeviceStatusEventHandler] 查询设备绑定关系失败: deviceId={}", deviceId, e);
return false;
}
}
private Long getAreaIdByDeviceId(Long deviceId) {
if (deviceId == null) { if (deviceId == null) {
return null; return null;
} }
try { try {
OpsAreaDeviceRelationDO relation = areaDeviceService.getByDeviceId(deviceId); OpsAreaDeviceRelationDO relation = areaDeviceService.getByDeviceId(deviceId);
if (relation != null && "BADGE".equals(relation.getRelationType())) { return relation != null && "BADGE".equals(relation.getRelationType()) ? relation : null;
return relation.getAreaId();
}
return null;
} catch (Exception e) { } catch (Exception e) {
log.warn("[BadgeDeviceStatusEventHandler] 查询设备所属区域失败: deviceId={}", deviceId, e); log.warn("[BadgeDeviceStatusEventHandler] query badge relation failed: deviceId={}", deviceId, e);
return null; return null;
} }
} }
private void executeInTenantContext(Long tenantId, Runnable runnable) { private void executeInTenantContext(Long tenantId, Runnable runnable) {
Long currentTenantId = TenantContextHolder.getTenantId(); Long currentTenantId = TenantContextHolder.getTenantId();
if (tenantId == null || Objects.equals(currentTenantId, tenantId)) { if (tenantId == null || Objects.equals(currentTenantId, tenantId)) {

View File

@@ -50,6 +50,9 @@ public class CleanOrderArriveEventHandler implements RocketMQListener<String> {
@Resource @Resource
private OrderLifecycleManager orderLifecycleManager; private OrderLifecycleManager orderLifecycleManager;
@Resource
private IntegrationProjectContextExecutor projectContextExecutor;
@Override @Override
public void onMessage(String message) { public void onMessage(String message) {
try { try {
@@ -61,7 +64,9 @@ public class CleanOrderArriveEventHandler implements RocketMQListener<String> {
log.debug("[CleanOrderArriveEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId()); log.debug("[CleanOrderArriveEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId());
return; return;
} }
executeInTenantContext(event.getTenantId(), () -> handleOrderArrive(event)); executeInTenantContext(event.getTenantId(), () -> projectContextExecutor.execute(
event.getProjectId(), event.getOrderId(), event.getAreaId(), event.getDeviceId(),
event.getEventId(), () -> handleOrderArrive(event)));
} catch (Exception e) { } catch (Exception e) {
log.error("[CleanOrderArriveEventHandler] 消息处理失败: message={}", message, e); log.error("[CleanOrderArriveEventHandler] 消息处理失败: message={}", message, e);
throw new RuntimeException("保洁工单到岗事件处理失败", e); throw new RuntimeException("保洁工单到岗事件处理失败", e);

View File

@@ -81,6 +81,9 @@ public class CleanOrderAuditEventHandler implements RocketMQListener<String> {
@Resource @Resource
private OpsOrderMapper opsOrderMapper; private OpsOrderMapper opsOrderMapper;
@Resource
private IntegrationProjectContextExecutor projectContextExecutor;
@Override @Override
public void onMessage(String message) { public void onMessage(String message) {
try { try {
@@ -98,7 +101,9 @@ public class CleanOrderAuditEventHandler implements RocketMQListener<String> {
} }
// 3. 业务处理 // 3. 业务处理
executeInTenantContext(event.getTenantId(), () -> handleAuditEvent(event)); executeInTenantContext(event.getTenantId(), () -> projectContextExecutor.execute(
event.getProjectId(), event.getOrderId(), event.getAreaId(), event.getDeviceId(),
event.getEventId(), () -> handleAuditEvent(event)));
} catch (Exception e) { } catch (Exception e) {
log.error("[CleanOrderAuditEventHandler] 消息处理失败: message={}", message, e); log.error("[CleanOrderAuditEventHandler] 消息处理失败: message={}", message, e);

View File

@@ -50,6 +50,9 @@ public class CleanOrderCompleteEventHandler implements RocketMQListener<String>
@Resource @Resource
private OrderLifecycleManager orderLifecycleManager; private OrderLifecycleManager orderLifecycleManager;
@Resource
private IntegrationProjectContextExecutor projectContextExecutor;
@Override @Override
public void onMessage(String message) { public void onMessage(String message) {
try { try {
@@ -61,7 +64,9 @@ public class CleanOrderCompleteEventHandler implements RocketMQListener<String>
log.debug("[CleanOrderCompleteEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId()); log.debug("[CleanOrderCompleteEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId());
return; return;
} }
executeInTenantContext(event.getTenantId(), () -> handleOrderComplete(event)); executeInTenantContext(event.getTenantId(), () -> projectContextExecutor.execute(
event.getProjectId(), event.getOrderId(), event.getAreaId(), event.getDeviceId(),
event.getEventId(), () -> handleOrderComplete(event)));
} catch (Exception e) { } catch (Exception e) {
log.error("[CleanOrderCompleteEventHandler] 消息处理失败: message={}", message, e); log.error("[CleanOrderCompleteEventHandler] 消息处理失败: message={}", message, e);
throw new RuntimeException("保洁工单完单事件处理失败", e); throw new RuntimeException("保洁工单完单事件处理失败", e);

View File

@@ -47,6 +47,9 @@ public class CleanOrderConfirmEventHandler implements RocketMQListener<String> {
@Resource @Resource
private OrderLifecycleManager orderLifecycleManager; private OrderLifecycleManager orderLifecycleManager;
@Resource
private IntegrationProjectContextExecutor projectContextExecutor;
@Override @Override
public void onMessage(String message) { public void onMessage(String message) {
try { try {
@@ -58,7 +61,9 @@ public class CleanOrderConfirmEventHandler implements RocketMQListener<String> {
log.debug("[CleanOrderConfirmEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId()); log.debug("[CleanOrderConfirmEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId());
return; return;
} }
executeInTenantContext(event.getTenantId(), () -> handleConfirmEvent(event)); executeInTenantContext(event.getTenantId(), () -> projectContextExecutor.execute(
event.getProjectId(), event.getOrderId(), event.getAreaId(), event.getDeviceId(),
event.getEventId(), () -> handleConfirmEvent(event)));
} catch (Exception e) { } catch (Exception e) {
log.error("[CleanOrderConfirmEventHandler] 消息处理失败: message={}", message, e); log.error("[CleanOrderConfirmEventHandler] 消息处理失败: message={}", message, e);
throw new RuntimeException("保洁工单确认事件处理失败", e); throw new RuntimeException("保洁工单确认事件处理失败", e);

View File

@@ -1,11 +1,15 @@
package com.viewsh.module.ops.environment.integration.consumer; package com.viewsh.module.ops.environment.integration.consumer;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.viewsh.framework.tenant.core.context.ProjectContextHolder;
import com.viewsh.framework.tenant.core.context.TenantContextHolder; import com.viewsh.framework.tenant.core.context.TenantContextHolder;
import com.viewsh.framework.tenant.core.util.ProjectUtils;
import com.viewsh.framework.tenant.core.util.TenantUtils; import com.viewsh.framework.tenant.core.util.TenantUtils;
import com.viewsh.module.iot.api.device.IotDeviceControlApi; import com.viewsh.module.iot.api.device.IotDeviceControlApi;
import com.viewsh.module.iot.api.device.dto.ResetTrafficCounterReqDTO; import com.viewsh.module.iot.api.device.dto.ResetTrafficCounterReqDTO;
import com.viewsh.module.ops.dal.dataobject.area.OpsBusAreaDO;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
import com.viewsh.module.ops.dal.mysql.area.OpsBusAreaMapper;
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
import com.viewsh.module.ops.enums.PriorityEnum; import com.viewsh.module.ops.enums.PriorityEnum;
import com.viewsh.module.ops.enums.WorkOrderStatusEnum; import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
@@ -103,6 +107,9 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
@Resource @Resource
private OpsOrderMapper opsOrderMapper; private OpsOrderMapper opsOrderMapper;
@Resource
private OpsBusAreaMapper opsBusAreaMapper;
@Override @Override
public void onMessage(String message) { public void onMessage(String message) {
try { try {
@@ -120,7 +127,8 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
} }
// 3. 业务处理 // 3. 业务处理
executeInTenantContext(event.getTenantId(), () -> handleOrderCreate(event)); executeInTenantContext(event.getTenantId(), () ->
executeInProjectContext(event, () -> handleOrderCreate(event)));
} catch (Exception e) { } catch (Exception e) {
log.error("[CleanOrderCreateEventHandler] 消息处理失败: message={}", message, e); log.error("[CleanOrderCreateEventHandler] 消息处理失败: message={}", message, e);
@@ -538,6 +546,41 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
return null; return null;
} }
private void executeInProjectContext(CleanOrderCreateEventDTO event, Runnable runnable) {
Long projectId = resolveProjectId(event);
Long currentProjectId = ProjectContextHolder.getProjectId();
if (projectId == null) {
throw new IllegalStateException("Clean order create event missing projectId, eventId="
+ event.getEventId() + ", areaId=" + event.getAreaId());
}
if (Objects.equals(currentProjectId, projectId)) {
runnable.run();
return;
}
ProjectUtils.execute(projectId, runnable);
}
private Long resolveProjectId(CleanOrderCreateEventDTO event) {
if (event.getProjectId() != null) {
return event.getProjectId();
}
Long currentProjectId = ProjectContextHolder.getProjectId();
if (currentProjectId != null) {
return currentProjectId;
}
Long areaId = event.getAreaId();
if (areaId == null) {
return null;
}
OpsBusAreaDO area = ProjectUtils.executeIgnore(() -> opsBusAreaMapper.selectById(areaId));
if (area == null) {
log.warn("[CleanOrderCreateEventHandler] area not found when resolving projectId: eventId={}, areaId={}",
event.getEventId(), areaId);
return null;
}
return area.getProjectId();
}
private void executeInTenantContext(Long tenantId, Runnable runnable) { private void executeInTenantContext(Long tenantId, Runnable runnable) {
Long currentTenantId = TenantContextHolder.getTenantId(); Long currentTenantId = TenantContextHolder.getTenantId();
if (tenantId == null || Objects.equals(currentTenantId, tenantId)) { if (tenantId == null || Objects.equals(currentTenantId, tenantId)) {

View File

@@ -0,0 +1,91 @@
package com.viewsh.module.ops.environment.integration.consumer;
import com.viewsh.framework.tenant.core.context.ProjectContextHolder;
import com.viewsh.framework.tenant.core.util.ProjectUtils;
import com.viewsh.module.ops.dal.dataobject.area.OpsAreaDeviceRelationDO;
import com.viewsh.module.ops.dal.dataobject.area.OpsBusAreaDO;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
import com.viewsh.module.ops.dal.mysql.area.OpsAreaDeviceRelationMapper;
import com.viewsh.module.ops.dal.mysql.area.OpsBusAreaMapper;
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Objects;
@Slf4j
@Component
public class IntegrationProjectContextExecutor {
@Resource
private OpsOrderMapper opsOrderMapper;
@Resource
private OpsBusAreaMapper opsBusAreaMapper;
@Resource
private OpsAreaDeviceRelationMapper opsAreaDeviceRelationMapper;
public void execute(Long projectId, Long orderId, Long areaId, Long deviceId, String eventId, Runnable runnable) {
Long resolvedProjectId = resolveProjectId(projectId, orderId, areaId, deviceId, eventId);
Long currentProjectId = ProjectContextHolder.getProjectId();
if (resolvedProjectId == null) {
throw new IllegalStateException("integration event missing projectId, eventId=" + eventId
+ ", orderId=" + orderId + ", areaId=" + areaId + ", deviceId=" + deviceId);
}
if (Objects.equals(currentProjectId, resolvedProjectId)) {
runnable.run();
return;
}
ProjectUtils.execute(resolvedProjectId, runnable);
}
private Long resolveProjectId(Long projectId, Long orderId, Long areaId, Long deviceId, String eventId) {
if (projectId != null) {
return projectId;
}
Long currentProjectId = ProjectContextHolder.getProjectId();
if (currentProjectId != null) {
return currentProjectId;
}
Long projectIdByOrder = resolveProjectIdByOrder(orderId);
if (projectIdByOrder != null) {
return projectIdByOrder;
}
Long projectIdByArea = resolveProjectIdByArea(areaId);
if (projectIdByArea != null) {
return projectIdByArea;
}
Long projectIdByDevice = resolveProjectIdByDevice(deviceId);
if (projectIdByDevice == null) {
log.warn("[IntegrationProjectContextExecutor] projectId not resolved: eventId={}, orderId={}, areaId={}, deviceId={}",
eventId, orderId, areaId, deviceId);
}
return projectIdByDevice;
}
private Long resolveProjectIdByOrder(Long orderId) {
if (orderId == null) {
return null;
}
OpsOrderDO order = ProjectUtils.executeIgnore(() -> opsOrderMapper.selectById(orderId));
return order != null ? order.getProjectId() : null;
}
private Long resolveProjectIdByArea(Long areaId) {
if (areaId == null) {
return null;
}
OpsBusAreaDO area = ProjectUtils.executeIgnore(() -> opsBusAreaMapper.selectById(areaId));
return area != null ? area.getProjectId() : null;
}
private Long resolveProjectIdByDevice(Long deviceId) {
if (deviceId == null) {
return null;
}
OpsAreaDeviceRelationDO relation = ProjectUtils.executeIgnore(() -> opsAreaDeviceRelationMapper.selectByDeviceId(deviceId));
return relation != null ? relation.getProjectId() : null;
}
}

View File

@@ -42,6 +42,9 @@ public class TrajectoryEnterEventHandler implements RocketMQListener<String> {
@Resource @Resource
private DeviceTrajectoryService trajectoryService; private DeviceTrajectoryService trajectoryService;
@Resource
private IntegrationProjectContextExecutor projectContextExecutor;
@Override @Override
public void onMessage(String message) { public void onMessage(String message) {
try { try {
@@ -57,9 +60,17 @@ public class TrajectoryEnterEventHandler implements RocketMQListener<String> {
event.getEventId(), event.getDeviceId(), event.getAreaId()); event.getEventId(), event.getDeviceId(), event.getAreaId());
// 解析事件时间 // 解析事件时间
LocalDateTime enterTime = parseEventTime(event.getEventTime()); projectContextExecutor.execute(event.getProjectId(), null, event.getAreaId(), event.getDeviceId(),
event.getEventId(), () -> handleTrajectoryEnter(event));
// 创建轨迹记录 } catch (Exception e) {
log.error("[TrajectoryEnterHandler] 消息处理失败message={}", message, e);
throw new RuntimeException("轨迹进入事件处理失败", e);
}
}
private void handleTrajectoryEnter(TrajectoryEnterEventDTO event) {
LocalDateTime enterTime = parseEventTime(event.getEventTime());
trajectoryService.recordEnter( trajectoryService.recordEnter(
event.getDeviceId(), event.getDeviceId(),
event.getDeviceName(), event.getDeviceName(),
@@ -68,11 +79,6 @@ public class TrajectoryEnterEventHandler implements RocketMQListener<String> {
event.getBeaconMac(), event.getBeaconMac(),
event.getEnterRssi(), event.getEnterRssi(),
enterTime); enterTime);
} catch (Exception e) {
log.error("[TrajectoryEnterHandler] 消息处理失败message={}", message, e);
throw new RuntimeException("轨迹进入事件处理失败", e);
}
} }
private LocalDateTime parseEventTime(String eventTime) { private LocalDateTime parseEventTime(String eventTime) {

View File

@@ -42,6 +42,9 @@ public class TrajectoryLeaveEventHandler implements RocketMQListener<String> {
@Resource @Resource
private DeviceTrajectoryService trajectoryService; private DeviceTrajectoryService trajectoryService;
@Resource
private IntegrationProjectContextExecutor projectContextExecutor;
@Override @Override
public void onMessage(String message) { public void onMessage(String message) {
try { try {
@@ -57,20 +60,23 @@ public class TrajectoryLeaveEventHandler implements RocketMQListener<String> {
event.getEventId(), event.getDeviceId(), event.getAreaId(), event.getLeaveReason()); event.getEventId(), event.getDeviceId(), event.getAreaId(), event.getLeaveReason());
// 解析事件时间 // 解析事件时间
LocalDateTime leaveTime = parseEventTime(event.getEventTime()); projectContextExecutor.execute(event.getProjectId(), null, event.getAreaId(), event.getDeviceId(),
event.getEventId(), () -> handleTrajectoryLeave(event));
// 更新轨迹记录 } catch (Exception e) {
log.error("[TrajectoryLeaveHandler] 消息处理失败message={}", message, e);
throw new RuntimeException("轨迹离开事件处理失败", e);
}
}
private void handleTrajectoryLeave(TrajectoryLeaveEventDTO event) {
LocalDateTime leaveTime = parseEventTime(event.getEventTime());
trajectoryService.recordLeave( trajectoryService.recordLeave(
event.getDeviceId(), event.getDeviceId(),
event.getAreaId(), event.getAreaId(),
event.getLeaveReason(), event.getLeaveReason(),
event.getEnterTimestamp(), event.getEnterTimestamp(),
leaveTime); leaveTime);
} catch (Exception e) {
log.error("[TrajectoryLeaveHandler] 消息处理失败message={}", message, e);
throw new RuntimeException("轨迹离开事件处理失败", e);
}
} }
private LocalDateTime parseEventTime(String eventTime) { private LocalDateTime parseEventTime(String eventTime) {

View File

@@ -51,6 +51,9 @@ public class BaseDeviceEventDTO {
@JsonProperty("tenantId") @JsonProperty("tenantId")
private Long tenantId; private Long tenantId;
@JsonProperty("projectId")
private Long projectId;
/** /**
* 事件时间 * 事件时间
*/ */

View File

@@ -52,6 +52,8 @@ public class CleanOrderArriveEventDTO {
private Long tenantId; private Long tenantId;
private Long projectId;
/** /**
* 触发来源IOT_BEACON=蓝牙信标检测) * 触发来源IOT_BEACON=蓝牙信标检测)
*/ */

View File

@@ -67,6 +67,8 @@ public class CleanOrderAuditEventDTO {
private Long tenantId; private Long tenantId;
private Long projectId;
/** /**
* 消息内容 * 消息内容
*/ */

View File

@@ -52,6 +52,8 @@ public class CleanOrderCompleteEventDTO {
private Long tenantId; private Long tenantId;
private Long projectId;
/** /**
* 触发来源IOT_SIGNAL_LOSS=信号丢失超时) * 触发来源IOT_SIGNAL_LOSS=信号丢失超时)
*/ */

View File

@@ -57,6 +57,11 @@ public class CleanOrderCreateEventDTO {
private Long tenantId; private Long tenantId;
/**
* Project id.
*/
private Long projectId;
/** /**
* 触发数据JSON 格式的附加信息) * 触发数据JSON 格式的附加信息)
* <p> * <p>

View File

@@ -56,6 +56,8 @@ public class IotDeviceStatusChangedEventDTO {
*/ */
private Long tenantId; private Long tenantId;
private Long projectId;
/** /**
* 事件时间 * 事件时间
*/ */

View File

@@ -62,4 +62,6 @@ public class TrajectoryEnterEventDTO {
* 租户ID * 租户ID
*/ */
private Long tenantId; private Long tenantId;
private Long projectId;
} }

View File

@@ -69,4 +69,6 @@ public class TrajectoryLeaveEventDTO {
* 租户ID * 租户ID
*/ */
private Long tenantId; private Long tenantId;
private Long projectId;
} }