Revert "fix(iot): IoT 事件发布补充 tenantId 并修复租户上下文缺陷"

This reverts commit fef3e13ff4.
This commit is contained in:
lzh
2026-03-30 15:55:35 +08:00
parent aab3c670f6
commit c14ea56dd8
8 changed files with 30 additions and 55 deletions

View File

@@ -1,6 +1,5 @@
package com.viewsh.module.iot.mq.consumer.rule;
import com.viewsh.framework.tenant.core.util.TenantUtils;
import com.viewsh.module.iot.core.messagebus.core.IotMessageBus;
import com.viewsh.module.iot.core.messagebus.core.IotMessageSubscriber;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
@@ -10,6 +9,12 @@ import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
// TODO @puhui999后面重构哈
/**
* 针对 {@link IotDeviceMessage} 的消费者,处理规则场景
*
* @author 芋道源码
*/
@Component
@Slf4j
public class IotSceneRuleMessageHandler implements IotMessageSubscriber<IotDeviceMessage> {
@@ -37,7 +42,8 @@ public class IotSceneRuleMessageHandler implements IotMessageSubscriber<IotDevic
@Override
public void onMessage(IotDeviceMessage message) {
log.info("[onMessage][娑堟伅鍐呭({})]", message);
TenantUtils.execute(message.getTenantId(), () -> sceneRuleService.executeSceneRuleByDevice(message));
log.info("[onMessage][消息内容({})]", message);
sceneRuleService.executeSceneRuleByDevice(message);
}
}

View File

@@ -2,7 +2,6 @@ package com.viewsh.module.iot.service.integration.clean;
import com.viewsh.framework.common.pojo.CommonResult;
import com.viewsh.framework.common.util.json.JsonUtils;
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
import com.viewsh.module.iot.dal.dataobject.integration.clean.CleanOrderIntegrationConfig;
import com.viewsh.module.ops.api.area.AreaDeviceApi;
import com.viewsh.module.ops.api.area.AreaDeviceDTO;
@@ -30,6 +29,8 @@ public class CleanOrderIntegrationConfigServiceImpl implements CleanOrderIntegra
/**
* Redis Key 前缀
*/
private static final String AREA_TYPE_CACHE_KEY_PREFIX = "ops:area:%s:type:%s";
private static final String DEVICE_INDEX_KEY_PREFIX = "ops:device:index:%s";
private static final String NULL_CACHE = "NULL";
/**
@@ -92,7 +93,7 @@ public class CleanOrderIntegrationConfigServiceImpl implements CleanOrderIntegra
log.debug("[CleanOrderConfig] 查询单个区域配置areaId={}, relationType={}", areaId, relationType);
// 1. 先读 Redis 缓存
String cacheKey = buildAreaTypeCacheKey(areaId, relationType);
String cacheKey = String.format(AREA_TYPE_CACHE_KEY_PREFIX, areaId, relationType);
try {
String cached = stringRedisTemplate.opsForValue().get(cacheKey);
if (cached != null) {
@@ -159,7 +160,7 @@ public class CleanOrderIntegrationConfigServiceImpl implements CleanOrderIntegra
log.debug("[CleanOrderConfig] 查询设备配置deviceId={}", deviceId);
// 1. 查询设备反向索引缓存
String indexKey = buildDeviceIndexCacheKey(deviceId);
String indexKey = String.format(DEVICE_INDEX_KEY_PREFIX, deviceId);
try {
Map<Object, Object> indexMap = stringRedisTemplate.opsForHash().entries(indexKey);
@@ -233,7 +234,7 @@ public class CleanOrderIntegrationConfigServiceImpl implements CleanOrderIntegra
*/
private void writeDeviceIndexCache(Long deviceId, Long areaId, String relationType, Boolean enabled) {
try {
String cacheKey = buildDeviceIndexCacheKey(deviceId);
String cacheKey = String.format(DEVICE_INDEX_KEY_PREFIX, deviceId);
Map<String, String> indexData = new HashMap<>();
indexData.put("areaId", String.valueOf(areaId));
indexData.put("relationType", relationType);
@@ -254,7 +255,7 @@ public class CleanOrderIntegrationConfigServiceImpl implements CleanOrderIntegra
*/
private void evictDeviceIndexCache(Long deviceId) {
try {
String cacheKey = buildDeviceIndexCacheKey(deviceId);
String cacheKey = String.format(DEVICE_INDEX_KEY_PREFIX, deviceId);
stringRedisTemplate.delete(cacheKey);
log.debug("[CleanOrderConfig] 清除设备索引缓存deviceId={}", deviceId);
} catch (Exception e) {
@@ -342,12 +343,4 @@ public class CleanOrderIntegrationConfigServiceImpl implements CleanOrderIntegra
convertConfig(dto.getConfigData())
);
}
private String buildAreaTypeCacheKey(Long areaId, String relationType) {
return "ops:area:t" + TenantContextHolder.getRequiredTenantId() + ":" + areaId + ":type:" + relationType;
}
private String buildDeviceIndexCacheKey(Long deviceId) {
return "ops:device:index:t" + TenantContextHolder.getRequiredTenantId() + ":" + deviceId;
}
}

View File

@@ -1,6 +1,5 @@
package com.viewsh.module.iot.service.rule.clean.processor;
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics;
import com.viewsh.module.iot.core.integration.event.clean.CleanOrderArriveEvent;
import com.viewsh.module.iot.core.integration.event.clean.CleanOrderAuditEvent;
@@ -253,7 +252,6 @@ public class BeaconDetectionRuleProcessor {
private void publishArriveEvent(Long deviceId, Long orderId, Long areaId, Map<String, Object> triggerData) {
try {
CleanOrderArriveEvent event = CleanOrderArriveEvent.builder()
.tenantId(TenantContextHolder.getTenantId())
.eventId(java.util.UUID.randomUUID().toString())
.orderType("CLEAN")
.orderId(orderId)
@@ -279,7 +277,6 @@ public class BeaconDetectionRuleProcessor {
Long areaId, Long orderId, String message, Map<String, Object> data) {
try {
CleanOrderAuditEvent event = CleanOrderAuditEvent.builder()
.tenantId(TenantContextHolder.getTenantId())
.eventId(java.util.UUID.randomUUID().toString())
.auditType(auditType)
.deviceId(deviceId)

View File

@@ -1,7 +1,6 @@
package com.viewsh.module.iot.service.rule.clean.processor;
import com.viewsh.framework.common.util.json.JsonUtils;
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics;
import com.viewsh.module.iot.dal.dataobject.integration.clean.ButtonEventConfig;
import com.viewsh.module.iot.dal.redis.clean.BadgeDeviceStatusRedisDAO;
@@ -142,7 +141,6 @@ public class ButtonEventRuleProcessor {
Map<String, Object> event = new HashMap<>();
event.put("eventId", UUID.randomUUID().toString());
event.put("tenantId", TenantContextHolder.getTenantId());
event.put("orderType", "CLEAN");
event.put("orderId", orderId);
event.put("deviceId", deviceId);
@@ -173,7 +171,6 @@ public class ButtonEventRuleProcessor {
Map<String, Object> event = new HashMap<>();
event.put("eventId", UUID.randomUUID().toString());
event.put("tenantId", TenantContextHolder.getTenantId());
event.put("orderType", "CLEAN");
event.put("orderId", orderId);
event.put("deviceId", deviceId);

View File

@@ -1,6 +1,5 @@
package com.viewsh.module.iot.service.rule.clean.processor;
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics;
import com.viewsh.module.iot.core.integration.event.clean.CleanOrderAuditEvent;
import com.viewsh.module.iot.core.integration.event.clean.CleanOrderCompleteEvent;
@@ -76,7 +75,7 @@ public class SignalLossRuleProcessor {
public String checkLossTimeout() {
// TODO: 设置租户上下文单租户场景使用固定租户ID=1
// 确保后续发送的 RocketMQ 消息正确携带租户信息
return doCheckLossTimeout();
return TenantUtils.execute(1L, () -> doCheckLossTimeout());
}
/**
@@ -109,12 +108,7 @@ public class SignalLossRuleProcessor {
Long areaId = Long.parseLong(parts[5]);
// 检查超时
IotDeviceDO device = deviceService.getDevice(deviceId);
if (device == null || device.getTenantId() == null) {
log.warn("[SignalLoss] 璁惧涓嶅瓨鍦ㄦ垨缂哄皯绉熸埛淇℃伅: deviceId={}", deviceId);
continue;
}
TenantUtils.execute(device.getTenantId(), () -> checkTimeoutForDevice(deviceId, areaId));
checkTimeoutForDevice(deviceId, areaId);
} catch (Exception e) {
log.error("[SignalLoss] 处理离岗记录失败key={}", key, e);
@@ -266,7 +260,6 @@ public class SignalLossRuleProcessor {
// 3. 发布完成事件
try {
CleanOrderCompleteEvent event = CleanOrderCompleteEvent.builder()
.tenantId(TenantContextHolder.getTenantId())
.eventId(java.util.UUID.randomUUID().toString())
.orderType("CLEAN")
.orderId(currentOrder.getOrderId())
@@ -317,7 +310,6 @@ public class SignalLossRuleProcessor {
Long areaId, Long orderId, String message, Map<String, Object> data) {
try {
CleanOrderAuditEvent event = CleanOrderAuditEvent.builder()
.tenantId(TenantContextHolder.getTenantId())
.eventId(java.util.UUID.randomUUID().toString())
.auditType(auditType)
.deviceId(deviceId)

View File

@@ -1,6 +1,5 @@
package com.viewsh.module.iot.service.rule.clean.processor;
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics;
import com.viewsh.module.iot.core.integration.event.clean.CleanOrderCreateEvent;
import com.viewsh.module.iot.dal.dataobject.integration.clean.TrafficThresholdConfig;
@@ -146,7 +145,6 @@ public class TrafficThresholdRuleProcessor {
Long accumulated, Integer threshold) {
try {
CleanOrderCreateEvent event = CleanOrderCreateEvent.builder()
.tenantId(TenantContextHolder.getTenantId())
.orderType("CLEAN")
.areaId(configWrapper.getAreaId())
.triggerSource("IOT_TRAFFIC")

View File

@@ -104,7 +104,6 @@ class RssiSlidingWindowDetectorTest {
BeaconPresenceConfig.ExitConfig exitConfig = new BeaconPresenceConfig.ExitConfig();
exitConfig.setWeakRssiThreshold(-85);
exitConfig.setHitCount(2);
exitConfig.setWindowSize(5);
// 场景:在区域内,信号变弱但未达退出阈值 [-80, -82, -84] -> 都在 -70 和 -85 之间 -> 无变化
List<Integer> window = Arrays.asList(-80, -82, -84);

View File

@@ -1,14 +1,12 @@
package com.viewsh.module.iot.service.rule.clean.processor;
import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics;
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
import com.viewsh.module.iot.dal.dataobject.integration.clean.BeaconPresenceConfig;
import com.viewsh.module.iot.dal.dataobject.integration.clean.CleanOrderIntegrationConfig;
import com.viewsh.module.iot.dal.redis.clean.BadgeDeviceStatusRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.BeaconArrivedTimeRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.BeaconRssiWindowRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.SignalLossRedisDAO;
import com.viewsh.module.iot.service.device.IotDeviceService;
import com.viewsh.module.iot.service.integration.clean.CleanOrderIntegrationConfigService;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.jupiter.api.BeforeEach;
@@ -43,8 +41,6 @@ class SignalLossRuleProcessorTest {
@Mock
private CleanOrderIntegrationConfigService configService;
@Mock
private IotDeviceService deviceService;
@Mock
private RocketMQTemplate rocketMQTemplate;
@Mock
private StringRedisTemplate stringRedisTemplate;
@@ -59,12 +55,6 @@ class SignalLossRuleProcessorTest {
when(stringRedisTemplate.keys(anyString())).thenReturn(
Set.of("iot:clean:signal:loss:" + DEVICE_ID + ":" + AREA_ID)
);
// Mock device lookup (for tenant context)
IotDeviceDO device = new IotDeviceDO();
device.setId(DEVICE_ID);
device.setTenantId(1L);
when(deviceService.getDevice(DEVICE_ID)).thenReturn(device);
}
@Test
@@ -118,19 +108,18 @@ class SignalLossRuleProcessorTest {
}
@Test
void testCheckLossTimeout_ShortDuration_StillCompletes() {
// 注意当前生产代码已暂时取消作业时长不足的抑制逻辑TODO 注释),
// 所有超时情况均触发完成。此测试验证该行为。
void testCheckLossTimeout_Suppressed_InvalidDuration() {
// Setup times
long now = System.currentTimeMillis();
long firstLossTime = now - 6 * 60 * 1000; // 6 minutes ago (timeout)
long lastLossTime = now;
long arrivedTime = now - 5 * 60 * 1000; // Only 5 minutes work
long arrivedTime = now - 5 * 60 * 1000; // Only 5 minutes work (min is 10)
// Mock DAOs
when(signalLossRedisDAO.getFirstLossTime(DEVICE_ID, AREA_ID)).thenReturn(firstLossTime);
when(signalLossRedisDAO.getLastLossTime(DEVICE_ID, AREA_ID)).thenReturn(lastLossTime);
when(arrivedTimeRedisDAO.getArrivedTime(DEVICE_ID, AREA_ID)).thenReturn(arrivedTime);
// Mock Current Order (Valid area)
BadgeDeviceStatusRedisDAO.OrderInfo orderInfo = new BadgeDeviceStatusRedisDAO.OrderInfo();
orderInfo.setOrderId(500L);
@@ -141,13 +130,13 @@ class SignalLossRuleProcessorTest {
BeaconPresenceConfig.ExitConfig exitConfig = new BeaconPresenceConfig.ExitConfig();
exitConfig.setLossTimeoutMinutes(5);
exitConfig.setMinValidWorkMinutes(10);
BeaconPresenceConfig bpConfig = new BeaconPresenceConfig();
bpConfig.setExit(exitConfig);
CleanOrderIntegrationConfig mainConfig = new CleanOrderIntegrationConfig();
mainConfig.setBeaconPresence(bpConfig);
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper wrapper =
new CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper();
wrapper.setConfig(mainConfig);
@@ -158,7 +147,11 @@ class SignalLossRuleProcessorTest {
// Execute
processor.checkLossTimeout();
// Verify: 即使作业时长不足,当前仍会触发完成(抑制逻辑已暂时关闭)
verify(rocketMQTemplate).syncSend(eq(CleanOrderTopics.ORDER_COMPLETE), any(Message.class));
// Verify
// 1. Should NOT send complete message
verify(rocketMQTemplate, never()).syncSend(eq(CleanOrderTopics.ORDER_COMPLETE), any(Message.class));
// 2. Should send Audit Event (TTS)
verify(rocketMQTemplate, atLeastOnce()).syncSend(eq(CleanOrderTopics.ORDER_AUDIT), any(Message.class));
}
}