refactor(iot,ops): 重构客流计数器重置为按区域删除阈值 key

- ResetTrafficCounterReqDTO: 废弃 newBaseValue 字段
- IotDeviceControlApiImpl: 重置逻辑改为通过区域关联查询后删除阈值 key
- CleanOrderEventListener: 简化异步重置调用,移除 triggerData 依赖

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
lzh
2026-02-03 15:37:23 +08:00
parent 46024fd043
commit 13571faa59
3 changed files with 41 additions and 36 deletions

View File

@@ -25,8 +25,8 @@ public class ResetTrafficCounterReqDTO {
@NotNull(message = "设备ID不能为空")
private Long deviceId;
@Schema(description = "新的基准值", requiredMode = Schema.RequiredMode.REQUIRED, example = "1500")
@NotNull(message = "基准值不能为空")
@Schema(description = "新的基准值(已废弃,无需传递)", example = "1500")
@Deprecated
private Long newBaseValue;
@Schema(description = "关联的工单ID用于日志追踪", example = "12345")

View File

@@ -9,7 +9,8 @@ import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum;
import com.viewsh.module.iot.core.enums.IotDeviceStateEnum;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
import com.viewsh.module.iot.dal.redis.clean.TrafficCounterBaseRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.TrafficCounterRedisDAO;
import com.viewsh.module.iot.service.integration.clean.CleanOrderIntegrationConfigService;
import com.viewsh.module.iot.service.device.IotDeviceService;
import com.viewsh.module.iot.service.device.message.IotDeviceMessageService;
import io.swagger.v3.oas.annotations.Operation;
@@ -48,7 +49,10 @@ public class IotDeviceControlApiImpl implements IotDeviceControlApi {
private IotDeviceMessageService deviceMessageService;
@Resource
private TrafficCounterBaseRedisDAO trafficCounterBaseRedisDAO;
private TrafficCounterRedisDAO trafficCounterRedisDAO;
@Resource
private CleanOrderIntegrationConfigService configService;
@Override
@PostMapping(PREFIX + "/invoke-service")
@@ -130,29 +134,36 @@ public class IotDeviceControlApiImpl implements IotDeviceControlApi {
public CommonResult<Boolean> resetTrafficCounter(@RequestBody ResetTrafficCounterReqDTO reqDTO) {
try {
Long deviceId = reqDTO.getDeviceId();
Long newBaseValue = reqDTO.getNewBaseValue();
log.info("[resetTrafficCounter] 重置客流计数器: deviceId={}, newBaseValue={}, orderId={}, remark={}",
deviceId, newBaseValue, reqDTO.getOrderId(), reqDTO.getRemark());
log.info("[resetTrafficCounter] 重置客流计数器deviceId={}, orderId={}, remark={}",
deviceId, reqDTO.getOrderId(), reqDTO.getRemark());
// 1. 验证设备存在
IotDeviceDO device = deviceService.getDeviceFromCache(deviceId);
if (device == null) {
log.warn("[resetTrafficCounter] 设备不存在: deviceId={}", deviceId);
log.warn("[resetTrafficCounter] 设备不存在deviceId={}", deviceId);
return success(false);
}
// 2. 重置基准值
trafficCounterBaseRedisDAO.setBaseValue(deviceId, newBaseValue);
// 2. 获取区域ID
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper wrapper =
configService.getConfigWrapperByDeviceId(deviceId);
if (wrapper == null) {
log.warn("[resetTrafficCounter] 设备无区域关联deviceId={}", deviceId);
return success(false);
}
log.info("[resetTrafficCounter] 客流计数器重置成功: deviceId={}, newBaseValue={}",
deviceId, newBaseValue);
// 3. 重置阈值计数器(删除 key
trafficCounterRedisDAO.resetThreshold(deviceId, wrapper.getAreaId());
log.info("[resetTrafficCounter] 客流计数器重置成功deviceId={}, areaId={}",
deviceId, wrapper.getAreaId());
return success(true);
} catch (Exception e) {
log.error("[resetTrafficCounter] 重置客流计数器失败: deviceId={}, newBaseValue={}",
reqDTO.getDeviceId(), reqDTO.getNewBaseValue(), e);
log.error("[resetTrafficCounter] 重置客流计数器失败deviceId={}",
reqDTO.getDeviceId(), e);
return success(false);
}
}

View File

@@ -154,51 +154,45 @@ public class CleanOrderEventListener {
/**
* 异步重置客流计数器
* <p>
* 在工单创建成功后重置阈值计数器,确保:
* 1. 工单创建和计数器重置在同一事务语义下AFTER_COMMIT
* 2. 如果工单创建失败,计数器不会被错误重置
* 3. 由 Ops 模块(业务方)决定重置时机,职责清晰
*/
@Async("ops-task-executor")
public void asyncResetTrafficCounter(OrderCreatedEvent event) {
try {
Long deviceId = (Long) event.getPayload().get("triggerDeviceId");
@SuppressWarnings("unchecked")
Map<String, Object> triggerData = (Map<String, Object>) event.getPayload().get("triggerData");
if (deviceId == null || triggerData == null) {
log.warn("[CleanOrderEventListener] 缺少客流数据,跳过重置: orderId={}, deviceId={}, triggerData={}",
event.getOrderId(), deviceId, triggerData);
if (deviceId == null) {
log.warn("[CleanOrderEventListener] 缺少设备ID,跳过重置: orderId={}",
event.getOrderId());
return;
}
Object actualCountObj = triggerData.get("actualCount");
if (actualCountObj == null) {
log.warn("[CleanOrderEventListener] 缺少客流值,跳过重置: orderId={}", event.getOrderId());
return;
}
// 将当前客流值设为新的基准值
Long newBaseValue = ((Number) actualCountObj).longValue();
// 构建重置请求
ResetTrafficCounterReqDTO reqDTO = ResetTrafficCounterReqDTO.builder()
.deviceId(deviceId)
.newBaseValue(newBaseValue)
.orderId(event.getOrderId())
.remark("工单创建后重置计数器")
.remark("工单创建成功后重置阈值计数器")
.build();
// 调用 IoT 模块 RPC 接口
var result = iotDeviceControlApi.resetTrafficCounter(reqDTO);
if (result.getData() != null && result.getData()) {
log.info("[CleanOrderEventListener] 客流计数器重置成功: orderId={}, deviceId={}, newBaseValue={}",
event.getOrderId(), deviceId, newBaseValue);
} else {
log.warn("[CleanOrderEventListener] 客流计数器重置失败: orderId={}, deviceId={}",
log.info("[CleanOrderEventListener] 阈值计数器重置成功: orderId={}, deviceId={}",
event.getOrderId(), deviceId);
} else {
log.error("[CleanOrderEventListener] 阈值计数器重置失败: orderId={}, deviceId={}",
event.getOrderId(), deviceId);
// TODO: 发送告警,需要人工介入检查
}
} catch (Exception e) {
// 重置失败不应影响主流程
log.error("[CleanOrderEventListener] 客流计数器重置异常: orderId={}", event.getOrderId(), e);
log.error("[CleanOrderEventListener] 阈值计数器重置异常: orderId={}", event.getOrderId(), e);
// TODO: 发送告警
}
}