feat(ops): add-iot-clean-order-integration阶段3-客流计数器重置
This commit is contained in:
@@ -3,6 +3,7 @@ package com.viewsh.module.iot.api.device;
|
||||
import com.viewsh.framework.common.pojo.CommonResult;
|
||||
import com.viewsh.module.iot.api.device.dto.IotDeviceServiceInvokeReqDTO;
|
||||
import com.viewsh.module.iot.api.device.dto.IotDeviceServiceInvokeRespDTO;
|
||||
import com.viewsh.module.iot.api.device.dto.ResetTrafficCounterReqDTO;
|
||||
import com.viewsh.module.iot.enums.ApiConstants;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
@@ -22,6 +23,7 @@ import java.util.List;
|
||||
* 支持功能:
|
||||
* - 服务调用(语音播报、震动提醒等)
|
||||
* - 批量服务调用
|
||||
* - 重置客流计数器
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@@ -41,4 +43,10 @@ public interface IotDeviceControlApi {
|
||||
@Operation(summary = "批量调用设备服务")
|
||||
CommonResult<List<IotDeviceServiceInvokeRespDTO>> invokeServiceBatch(@Valid @RequestBody List<IotDeviceServiceInvokeReqDTO> reqDTOList);
|
||||
|
||||
// ==================== 客流计数器 ====================
|
||||
|
||||
@PostMapping(PREFIX + "/reset-traffic-counter")
|
||||
@Operation(summary = "重置客流计数器基准值")
|
||||
CommonResult<Boolean> resetTrafficCounter(@Valid @RequestBody ResetTrafficCounterReqDTO reqDTO);
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,37 @@
|
||||
package com.viewsh.module.iot.api.device.dto;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* 重置客流计数器请求 DTO
|
||||
* <p>
|
||||
* 用于 Ops 模块在工单创建成功后,通知 IoT 模块重置客流计数器基准值
|
||||
*
|
||||
* @author AI
|
||||
*/
|
||||
@Schema(description = "重置客流计数器请求")
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class ResetTrafficCounterReqDTO {
|
||||
|
||||
@Schema(description = "设备ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "1001")
|
||||
@NotNull(message = "设备ID不能为空")
|
||||
private Long deviceId;
|
||||
|
||||
@Schema(description = "新的基准值", requiredMode = Schema.RequiredMode.REQUIRED, example = "1500")
|
||||
@NotNull(message = "基准值不能为空")
|
||||
private Long newBaseValue;
|
||||
|
||||
@Schema(description = "关联的工单ID(用于日志追踪)", example = "12345")
|
||||
private Long orderId;
|
||||
|
||||
@Schema(description = "操作说明", example = "工单创建后重置计数器")
|
||||
private String remark;
|
||||
}
|
||||
@@ -4,10 +4,12 @@ import cn.hutool.core.map.MapUtil;
|
||||
import com.viewsh.framework.common.pojo.CommonResult;
|
||||
import com.viewsh.module.iot.api.device.dto.IotDeviceServiceInvokeReqDTO;
|
||||
import com.viewsh.module.iot.api.device.dto.IotDeviceServiceInvokeRespDTO;
|
||||
import com.viewsh.module.iot.api.device.dto.ResetTrafficCounterReqDTO;
|
||||
import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum;
|
||||
import com.viewsh.module.iot.core.enums.IotDeviceStateEnum;
|
||||
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
|
||||
import com.viewsh.module.iot.dal.redis.clean.TrafficCounterBaseRedisDAO;
|
||||
import com.viewsh.module.iot.service.device.IotDeviceService;
|
||||
import com.viewsh.module.iot.service.device.message.IotDeviceMessageService;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
@@ -45,6 +47,9 @@ public class IotDeviceControlApiImpl implements IotDeviceControlApi {
|
||||
@Resource
|
||||
private IotDeviceMessageService deviceMessageService;
|
||||
|
||||
@Resource
|
||||
private TrafficCounterBaseRedisDAO trafficCounterBaseRedisDAO;
|
||||
|
||||
@Override
|
||||
@PostMapping(PREFIX + "/invoke-service")
|
||||
@Operation(summary = "调用设备服务")
|
||||
@@ -119,5 +124,38 @@ public class IotDeviceControlApiImpl implements IotDeviceControlApi {
|
||||
return success(results);
|
||||
}
|
||||
|
||||
@Override
|
||||
@PostMapping(PREFIX + "/reset-traffic-counter")
|
||||
@Operation(summary = "重置客流计数器基准值")
|
||||
public CommonResult<Boolean> resetTrafficCounter(@RequestBody ResetTrafficCounterReqDTO reqDTO) {
|
||||
try {
|
||||
Long deviceId = reqDTO.getDeviceId();
|
||||
Long newBaseValue = reqDTO.getNewBaseValue();
|
||||
|
||||
log.info("[resetTrafficCounter] 重置客流计数器: deviceId={}, newBaseValue={}, orderId={}, remark={}",
|
||||
deviceId, newBaseValue, reqDTO.getOrderId(), reqDTO.getRemark());
|
||||
|
||||
// 1. 验证设备存在
|
||||
IotDeviceDO device = deviceService.getDeviceFromCache(deviceId);
|
||||
if (device == null) {
|
||||
log.warn("[resetTrafficCounter] 设备不存在: deviceId={}", deviceId);
|
||||
return success(false);
|
||||
}
|
||||
|
||||
// 2. 重置基准值
|
||||
trafficCounterBaseRedisDAO.setBaseValue(deviceId, newBaseValue);
|
||||
|
||||
log.info("[resetTrafficCounter] 客流计数器重置成功: deviceId={}, newBaseValue={}",
|
||||
deviceId, newBaseValue);
|
||||
|
||||
return success(true);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[resetTrafficCounter] 重置客流计数器失败: deviceId={}, newBaseValue={}",
|
||||
reqDTO.getDeviceId(), reqDTO.getNewBaseValue(), e);
|
||||
return success(false);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -84,6 +84,27 @@ public class TrafficCounterBaseRedisDAO {
|
||||
stringRedisTemplate.delete(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* 清除所有基准值
|
||||
* <p>
|
||||
* 用于定时任务,每天 00:00 清零所有客流计数器基准值
|
||||
*
|
||||
* @return 清除的数量
|
||||
*/
|
||||
public int resetAll() {
|
||||
String pattern = BASE_KEY_PATTERN.replace("%s", "*");
|
||||
var keys = stringRedisTemplate.keys(pattern);
|
||||
|
||||
if (keys == null || keys.isEmpty()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// 批量删除
|
||||
stringRedisTemplate.delete(keys);
|
||||
|
||||
return keys.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* 格式化 Redis Key
|
||||
*/
|
||||
|
||||
@@ -91,6 +91,9 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
|
||||
@Resource
|
||||
private com.viewsh.module.iot.service.rule.clean.processor.BeaconDetectionRuleProcessor beaconDetectionRuleProcessor;
|
||||
|
||||
@Resource
|
||||
private com.viewsh.module.iot.service.rule.clean.processor.ButtonEventRuleProcessor buttonEventRuleProcessor;
|
||||
|
||||
// ========== 设备属性相关操作 ==========
|
||||
|
||||
@Override
|
||||
@@ -194,7 +197,7 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
|
||||
*/
|
||||
private void processRuleProcessors(IotDeviceDO device, Map<String, Object> properties) {
|
||||
try {
|
||||
// 遍历所有属性,调用规则处理器
|
||||
// 遍历所有属性,调用规<EFBFBD><EFBFBD>处理器
|
||||
for (Map.Entry<String, Object> entry : properties.entrySet()) {
|
||||
String identifier = entry.getKey();
|
||||
Object value = entry.getValue();
|
||||
@@ -204,6 +207,9 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
|
||||
|
||||
// 调用蓝牙信标检测规则处理器
|
||||
beaconDetectionRuleProcessor.processPropertyChange(device.getId(), identifier, value);
|
||||
|
||||
// 调用按键事件规则处理器
|
||||
buttonEventRuleProcessor.processPropertyChange(device.getId(), identifier, value);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// 规则处理器异常不应阻塞属性上报主流程
|
||||
|
||||
@@ -0,0 +1,49 @@
|
||||
package com.viewsh.module.iot.service.job;
|
||||
|
||||
import com.viewsh.module.iot.dal.redis.clean.TrafficCounterBaseRedisDAO;
|
||||
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 客流计数器基准值清零任务
|
||||
* <p>
|
||||
* 每天 00:00 执行,清除所有客流计数器的基准值缓存
|
||||
* <p>
|
||||
* 用途:确保每日客流统计从零开始
|
||||
*
|
||||
* @author AI
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class TrafficCounterBaseResetJob {
|
||||
|
||||
@Resource
|
||||
private TrafficCounterBaseRedisDAO trafficCounterBaseRedisDAO;
|
||||
|
||||
/**
|
||||
* 清零所有客流计数器基准值
|
||||
* <p>
|
||||
* XxlJob 配置:
|
||||
* - Cron: 0 0 0 * * ? (每天 00:00)
|
||||
*
|
||||
* @return 执行结果
|
||||
*/
|
||||
@XxlJob("trafficCounterBaseResetJob")
|
||||
public String execute() {
|
||||
log.info("[TrafficCounterBaseResetJob] 开始执行客流计数器基准值清零任务");
|
||||
|
||||
try {
|
||||
// 调用 Redis DAO 清除所有基准值
|
||||
int count = trafficCounterBaseRedisDAO.resetAll();
|
||||
|
||||
log.info("[TrafficCounterBaseResetJob] 客流计数器基准值清零完成: 清除数量={}", count);
|
||||
return "成功清除 " + count + " 个基准值";
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[TrafficCounterBaseResetJob] 客流计数器基准值清零失败", e);
|
||||
return "清零失败: " + e.getMessage();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -82,16 +82,17 @@ public class TrafficThresholdRuleProcessor {
|
||||
|
||||
// 4. 计算实际客流(当前值 - 基准值)
|
||||
Long baseValue = trafficBaseRedisDAO.getBaseValue(deviceId);
|
||||
Long actualCount = currentCount - (baseValue != null ? baseValue : 0L);
|
||||
|
||||
// 防止负数(设备重启后计数器归零)
|
||||
if (actualCount < 0) {
|
||||
log.warn("[TrafficThreshold] 检测到负数客流,重置基准值:deviceId={}, currentCount={}, baseValue={}",
|
||||
// 动态校准:如果 currentCount < baseValue,说明设备已重置,则自动更新 baseValue = 0
|
||||
if (baseValue != null && currentCount < baseValue) {
|
||||
log.warn("[TrafficThreshold] 检测到设备计数器重置,校准基准值:deviceId={}, currentCount={}, oldBaseValue={}",
|
||||
deviceId, currentCount, baseValue);
|
||||
trafficBaseRedisDAO.setBaseValue(deviceId, currentCount);
|
||||
actualCount = 0L;
|
||||
trafficBaseRedisDAO.setBaseValue(deviceId, 0L);
|
||||
baseValue = 0L;
|
||||
}
|
||||
|
||||
Long actualCount = currentCount - (baseValue != null ? baseValue : 0L);
|
||||
|
||||
log.debug("[TrafficThreshold] 客流统计:deviceId={}, currentCount={}, baseValue={}, actualCount={}, threshold={}",
|
||||
deviceId, currentCount, baseValue, actualCount, thresholdConfig.getThreshold());
|
||||
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package com.viewsh.module.ops.environment.integration.consumer;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.viewsh.module.iot.api.device.IotDeviceControlApi;
|
||||
import com.viewsh.module.iot.api.device.dto.ResetTrafficCounterReqDTO;
|
||||
import com.viewsh.module.ops.dal.dataobject.dto.OpsOrderCreateReqDTO;
|
||||
import com.viewsh.module.ops.enums.PriorityEnum;
|
||||
import com.viewsh.module.ops.enums.SourceTypeEnum;
|
||||
@@ -56,6 +58,9 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
|
||||
@Resource
|
||||
private OpsOrderService opsOrderService;
|
||||
|
||||
@Resource
|
||||
private IotDeviceControlApi iotDeviceControlApi;
|
||||
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
try {
|
||||
@@ -109,10 +114,65 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
|
||||
event.getTriggerDeviceKey()
|
||||
);
|
||||
|
||||
// 4. 如果是客流触发的工单,重置客流计数器基准值
|
||||
if ("IOT_TRAFFIC".equals(event.getTriggerSource()) && event.getTriggerData() != null) {
|
||||
resetTrafficCounter(event, orderId);
|
||||
}
|
||||
|
||||
log.info("[CleanOrderCreateEventHandler] 工单创建成功: eventId={}, orderId={}, areaId={}",
|
||||
event.getEventId(), orderId, event.getAreaId());
|
||||
}
|
||||
|
||||
/**
|
||||
* 重置客流计数器基准值
|
||||
* <p>
|
||||
* 工单创建成功后,通知 IoT 模块重置客流计数器,以便统计下一波客流
|
||||
*
|
||||
* @param event 工单创建事件
|
||||
* @param orderId 工单ID
|
||||
*/
|
||||
private void resetTrafficCounter(CleanOrderCreateEventDTO event, Long orderId) {
|
||||
try {
|
||||
// 获取当前客流值作为新的基准值
|
||||
Object currentCountObj = event.getTriggerData().get("actualCount");
|
||||
Object baseValueObj = event.getTriggerData().get("baseValue");
|
||||
|
||||
if (currentCountObj == null || baseValueObj == null) {
|
||||
log.warn("[CleanOrderCreateEventHandler] 缺少客流数据,跳过重置: eventId={}, actualCount={}, baseValue={}",
|
||||
event.getEventId(), currentCountObj, baseValueObj);
|
||||
return;
|
||||
}
|
||||
|
||||
// 计算新的基准值 = 当前值
|
||||
Long currentCount = ((Number) currentCountObj).longValue();
|
||||
Long newBaseValue = currentCount; // 将当前客流设为新的基准值
|
||||
|
||||
// 构建重置请求
|
||||
ResetTrafficCounterReqDTO reqDTO = ResetTrafficCounterReqDTO.builder()
|
||||
.deviceId(event.getTriggerDeviceId())
|
||||
.newBaseValue(newBaseValue)
|
||||
.orderId(orderId)
|
||||
.remark("工单创建后重置计数器")
|
||||
.build();
|
||||
|
||||
// 调用 IoT 模块 RPC 接口
|
||||
var result = iotDeviceControlApi.resetTrafficCounter(reqDTO);
|
||||
|
||||
if (result.getData() != null && result.getData()) {
|
||||
log.info("[CleanOrderCreateEventHandler] 客流计数器重置成功: eventId={}, deviceId={}, newBaseValue={}",
|
||||
event.getEventId(), event.getTriggerDeviceId(), newBaseValue);
|
||||
} else {
|
||||
log.warn("[CleanOrderCreateEventHandler] 客流计数器重置失败: eventId={}, deviceId={}",
|
||||
event.getEventId(), event.getTriggerDeviceId());
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
// 重置失败不应影响主流程
|
||||
log.error("[CleanOrderCreateEventHandler] 客流计数器重置异常: eventId={}, deviceId={}",
|
||||
event.getEventId(), event.getTriggerDeviceId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 生成工单标题
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user