From 64928f5a3f2258ad8e3d85611fa68f848506b301 Mon Sep 17 00:00:00 2001 From: lzh Date: Sat, 17 Jan 2026 17:44:30 +0800 Subject: [PATCH] =?UTF-8?q?feat(ops):=20add-iot-clean-order-integration?= =?UTF-8?q?=E9=98=B6=E6=AE=B53-=E5=AE=A2=E6=B5=81=E8=AE=A1=E6=95=B0?= =?UTF-8?q?=E5=99=A8=E9=87=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/api/device/IotDeviceControlApi.java | 8 +++ .../device/dto/ResetTrafficCounterReqDTO.java | 37 ++++++++++++ .../api/device/IotDeviceControlApiImpl.java | 38 ++++++++++++ .../clean/TrafficCounterBaseRedisDAO.java | 21 +++++++ .../IotDevicePropertyServiceImpl.java | 8 ++- .../job/TrafficCounterBaseResetJob.java | 49 +++++++++++++++ .../TrafficThresholdRuleProcessor.java | 13 ++-- .../CleanOrderCreateEventHandler.java | 60 +++++++++++++++++++ 8 files changed, 227 insertions(+), 7 deletions(-) create mode 100644 viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/device/dto/ResetTrafficCounterReqDTO.java create mode 100644 viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/job/TrafficCounterBaseResetJob.java diff --git a/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/device/IotDeviceControlApi.java b/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/device/IotDeviceControlApi.java index 0b96245..8b8d839 100644 --- a/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/device/IotDeviceControlApi.java +++ b/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/device/IotDeviceControlApi.java @@ -3,6 +3,7 @@ package com.viewsh.module.iot.api.device; import com.viewsh.framework.common.pojo.CommonResult; import com.viewsh.module.iot.api.device.dto.IotDeviceServiceInvokeReqDTO; import com.viewsh.module.iot.api.device.dto.IotDeviceServiceInvokeRespDTO; +import com.viewsh.module.iot.api.device.dto.ResetTrafficCounterReqDTO; import com.viewsh.module.iot.enums.ApiConstants; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; @@ -22,6 +23,7 @@ import java.util.List; * 支持功能: * - 服务调用(语音播报、震动提醒等) * - 批量服务调用 + * - 重置客流计数器 * * @author lzh */ @@ -41,4 +43,10 @@ public interface IotDeviceControlApi { @Operation(summary = "批量调用设备服务") CommonResult> invokeServiceBatch(@Valid @RequestBody List reqDTOList); + // ==================== 客流计数器 ==================== + + @PostMapping(PREFIX + "/reset-traffic-counter") + @Operation(summary = "重置客流计数器基准值") + CommonResult resetTrafficCounter(@Valid @RequestBody ResetTrafficCounterReqDTO reqDTO); + } diff --git a/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/device/dto/ResetTrafficCounterReqDTO.java b/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/device/dto/ResetTrafficCounterReqDTO.java new file mode 100644 index 0000000..44af1d9 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/device/dto/ResetTrafficCounterReqDTO.java @@ -0,0 +1,37 @@ +package com.viewsh.module.iot.api.device.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotNull; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 重置客流计数器请求 DTO + *

+ * 用于 Ops 模块在工单创建成功后,通知 IoT 模块重置客流计数器基准值 + * + * @author AI + */ +@Schema(description = "重置客流计数器请求") +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class ResetTrafficCounterReqDTO { + + @Schema(description = "设备ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "1001") + @NotNull(message = "设备ID不能为空") + private Long deviceId; + + @Schema(description = "新的基准值", requiredMode = Schema.RequiredMode.REQUIRED, example = "1500") + @NotNull(message = "基准值不能为空") + private Long newBaseValue; + + @Schema(description = "关联的工单ID(用于日志追踪)", example = "12345") + private Long orderId; + + @Schema(description = "操作说明", example = "工单创建后重置计数器") + private String remark; +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/api/device/IotDeviceControlApiImpl.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/api/device/IotDeviceControlApiImpl.java index c72af38..6f9cedf 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/api/device/IotDeviceControlApiImpl.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/api/device/IotDeviceControlApiImpl.java @@ -4,10 +4,12 @@ import cn.hutool.core.map.MapUtil; import com.viewsh.framework.common.pojo.CommonResult; import com.viewsh.module.iot.api.device.dto.IotDeviceServiceInvokeReqDTO; import com.viewsh.module.iot.api.device.dto.IotDeviceServiceInvokeRespDTO; +import com.viewsh.module.iot.api.device.dto.ResetTrafficCounterReqDTO; import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum; import com.viewsh.module.iot.core.enums.IotDeviceStateEnum; import com.viewsh.module.iot.core.mq.message.IotDeviceMessage; import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO; +import com.viewsh.module.iot.dal.redis.clean.TrafficCounterBaseRedisDAO; import com.viewsh.module.iot.service.device.IotDeviceService; import com.viewsh.module.iot.service.device.message.IotDeviceMessageService; import io.swagger.v3.oas.annotations.Operation; @@ -45,6 +47,9 @@ public class IotDeviceControlApiImpl implements IotDeviceControlApi { @Resource private IotDeviceMessageService deviceMessageService; + @Resource + private TrafficCounterBaseRedisDAO trafficCounterBaseRedisDAO; + @Override @PostMapping(PREFIX + "/invoke-service") @Operation(summary = "调用设备服务") @@ -119,5 +124,38 @@ public class IotDeviceControlApiImpl implements IotDeviceControlApi { return success(results); } + @Override + @PostMapping(PREFIX + "/reset-traffic-counter") + @Operation(summary = "重置客流计数器基准值") + public CommonResult resetTrafficCounter(@RequestBody ResetTrafficCounterReqDTO reqDTO) { + try { + Long deviceId = reqDTO.getDeviceId(); + Long newBaseValue = reqDTO.getNewBaseValue(); + + log.info("[resetTrafficCounter] 重置客流计数器: deviceId={}, newBaseValue={}, orderId={}, remark={}", + deviceId, newBaseValue, reqDTO.getOrderId(), reqDTO.getRemark()); + + // 1. 验证设备存在 + IotDeviceDO device = deviceService.getDeviceFromCache(deviceId); + if (device == null) { + log.warn("[resetTrafficCounter] 设备不存在: deviceId={}", deviceId); + return success(false); + } + + // 2. 重置基准值 + trafficCounterBaseRedisDAO.setBaseValue(deviceId, newBaseValue); + + log.info("[resetTrafficCounter] 客流计数器重置成功: deviceId={}, newBaseValue={}", + deviceId, newBaseValue); + + return success(true); + + } catch (Exception e) { + log.error("[resetTrafficCounter] 重置客流计数器失败: deviceId={}, newBaseValue={}", + reqDTO.getDeviceId(), reqDTO.getNewBaseValue(), e); + return success(false); + } + } + } diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/dal/redis/clean/TrafficCounterBaseRedisDAO.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/dal/redis/clean/TrafficCounterBaseRedisDAO.java index 2cc9a78..8e05ac7 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/dal/redis/clean/TrafficCounterBaseRedisDAO.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/dal/redis/clean/TrafficCounterBaseRedisDAO.java @@ -84,6 +84,27 @@ public class TrafficCounterBaseRedisDAO { stringRedisTemplate.delete(key); } + /** + * 清除所有基准值 + *

+ * 用于定时任务,每天 00:00 清零所有客流计数器基准值 + * + * @return 清除的数量 + */ + public int resetAll() { + String pattern = BASE_KEY_PATTERN.replace("%s", "*"); + var keys = stringRedisTemplate.keys(pattern); + + if (keys == null || keys.isEmpty()) { + return 0; + } + + // 批量删除 + stringRedisTemplate.delete(keys); + + return keys.size(); + } + /** * 格式化 Redis Key */ diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/property/IotDevicePropertyServiceImpl.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/property/IotDevicePropertyServiceImpl.java index 7412be9..3afb5e6 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/property/IotDevicePropertyServiceImpl.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/property/IotDevicePropertyServiceImpl.java @@ -91,6 +91,9 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService { @Resource private com.viewsh.module.iot.service.rule.clean.processor.BeaconDetectionRuleProcessor beaconDetectionRuleProcessor; + @Resource + private com.viewsh.module.iot.service.rule.clean.processor.ButtonEventRuleProcessor buttonEventRuleProcessor; + // ========== 设备属性相关操作 ========== @Override @@ -194,7 +197,7 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService { */ private void processRuleProcessors(IotDeviceDO device, Map properties) { try { - // 遍历所有属性,调用规则处理器 + // 遍历所有属性,调用规��处理器 for (Map.Entry entry : properties.entrySet()) { String identifier = entry.getKey(); Object value = entry.getValue(); @@ -204,6 +207,9 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService { // 调用蓝牙信标检测规则处理器 beaconDetectionRuleProcessor.processPropertyChange(device.getId(), identifier, value); + + // 调用按键事件规则处理器 + buttonEventRuleProcessor.processPropertyChange(device.getId(), identifier, value); } } catch (Exception e) { // 规则处理器异常不应阻塞属性上报主流程 diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/job/TrafficCounterBaseResetJob.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/job/TrafficCounterBaseResetJob.java new file mode 100644 index 0000000..cfb2cf6 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/job/TrafficCounterBaseResetJob.java @@ -0,0 +1,49 @@ +package com.viewsh.module.iot.service.job; + +import com.viewsh.module.iot.dal.redis.clean.TrafficCounterBaseRedisDAO; +import com.xxl.job.core.handler.annotation.XxlJob; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * 客流计数器基准值清零任务 + *

+ * 每天 00:00 执行,清除所有客流计数器的基准值缓存 + *

+ * 用途:确保每日客流统计从零开始 + * + * @author AI + */ +@Slf4j +@Component +public class TrafficCounterBaseResetJob { + + @Resource + private TrafficCounterBaseRedisDAO trafficCounterBaseRedisDAO; + + /** + * 清零所有客流计数器基准值 + *

+ * XxlJob 配置: + * - Cron: 0 0 0 * * ? (每天 00:00) + * + * @return 执行结果 + */ + @XxlJob("trafficCounterBaseResetJob") + public String execute() { + log.info("[TrafficCounterBaseResetJob] 开始执行客流计数器基准值清零任务"); + + try { + // 调用 Redis DAO 清除所有基准值 + int count = trafficCounterBaseRedisDAO.resetAll(); + + log.info("[TrafficCounterBaseResetJob] 客流计数器基准值清零完成: 清除数量={}", count); + return "成功清除 " + count + " 个基准值"; + + } catch (Exception e) { + log.error("[TrafficCounterBaseResetJob] 客流计数器基准值清零失败", e); + return "清零失败: " + e.getMessage(); + } + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrafficThresholdRuleProcessor.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrafficThresholdRuleProcessor.java index 354eea2..f907f66 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrafficThresholdRuleProcessor.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrafficThresholdRuleProcessor.java @@ -82,16 +82,17 @@ public class TrafficThresholdRuleProcessor { // 4. 计算实际客流(当前值 - 基准值) Long baseValue = trafficBaseRedisDAO.getBaseValue(deviceId); - Long actualCount = currentCount - (baseValue != null ? baseValue : 0L); - // 防止负数(设备重启后计数器归零) - if (actualCount < 0) { - log.warn("[TrafficThreshold] 检测到负数客流,重置基准值:deviceId={}, currentCount={}, baseValue={}", + // 动态校准:如果 currentCount < baseValue,说明设备已重置,则自动更新 baseValue = 0 + if (baseValue != null && currentCount < baseValue) { + log.warn("[TrafficThreshold] 检测到设备计数器重置,校准基准值:deviceId={}, currentCount={}, oldBaseValue={}", deviceId, currentCount, baseValue); - trafficBaseRedisDAO.setBaseValue(deviceId, currentCount); - actualCount = 0L; + trafficBaseRedisDAO.setBaseValue(deviceId, 0L); + baseValue = 0L; } + Long actualCount = currentCount - (baseValue != null ? baseValue : 0L); + log.debug("[TrafficThreshold] 客流统计:deviceId={}, currentCount={}, baseValue={}, actualCount={}, threshold={}", deviceId, currentCount, baseValue, actualCount, thresholdConfig.getThreshold()); diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderCreateEventHandler.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderCreateEventHandler.java index 3fa6c4a..ac88c41 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderCreateEventHandler.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderCreateEventHandler.java @@ -1,6 +1,8 @@ package com.viewsh.module.ops.environment.integration.consumer; import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.module.iot.api.device.IotDeviceControlApi; +import com.viewsh.module.iot.api.device.dto.ResetTrafficCounterReqDTO; import com.viewsh.module.ops.dal.dataobject.dto.OpsOrderCreateReqDTO; import com.viewsh.module.ops.enums.PriorityEnum; import com.viewsh.module.ops.enums.SourceTypeEnum; @@ -56,6 +58,9 @@ public class CleanOrderCreateEventHandler implements RocketMQListener { @Resource private OpsOrderService opsOrderService; + @Resource + private IotDeviceControlApi iotDeviceControlApi; + @Override public void onMessage(String message) { try { @@ -109,10 +114,65 @@ public class CleanOrderCreateEventHandler implements RocketMQListener { event.getTriggerDeviceKey() ); + // 4. 如果是客流触发的工单,重置客流计数器基准值 + if ("IOT_TRAFFIC".equals(event.getTriggerSource()) && event.getTriggerData() != null) { + resetTrafficCounter(event, orderId); + } + log.info("[CleanOrderCreateEventHandler] 工单创建成功: eventId={}, orderId={}, areaId={}", event.getEventId(), orderId, event.getAreaId()); } + /** + * 重置客流计数器基准值 + *

+ * 工单创建成功后,通知 IoT 模块重置客流计数器,以便统计下一波客流 + * + * @param event 工单创建事件 + * @param orderId 工单ID + */ + private void resetTrafficCounter(CleanOrderCreateEventDTO event, Long orderId) { + try { + // 获取当前客流值作为新的基准值 + Object currentCountObj = event.getTriggerData().get("actualCount"); + Object baseValueObj = event.getTriggerData().get("baseValue"); + + if (currentCountObj == null || baseValueObj == null) { + log.warn("[CleanOrderCreateEventHandler] 缺少客流数据,跳过重置: eventId={}, actualCount={}, baseValue={}", + event.getEventId(), currentCountObj, baseValueObj); + return; + } + + // 计算新的基准值 = 当前值 + Long currentCount = ((Number) currentCountObj).longValue(); + Long newBaseValue = currentCount; // 将当前客流设为新的基准值 + + // 构建重置请求 + ResetTrafficCounterReqDTO reqDTO = ResetTrafficCounterReqDTO.builder() + .deviceId(event.getTriggerDeviceId()) + .newBaseValue(newBaseValue) + .orderId(orderId) + .remark("工单创建后重置计数器") + .build(); + + // 调用 IoT 模块 RPC 接口 + var result = iotDeviceControlApi.resetTrafficCounter(reqDTO); + + if (result.getData() != null && result.getData()) { + log.info("[CleanOrderCreateEventHandler] 客流计数器重置成功: eventId={}, deviceId={}, newBaseValue={}", + event.getEventId(), event.getTriggerDeviceId(), newBaseValue); + } else { + log.warn("[CleanOrderCreateEventHandler] 客流计数器重置失败: eventId={}, deviceId={}", + event.getEventId(), event.getTriggerDeviceId()); + } + + } catch (Exception e) { + // 重置失败不应影响主流程 + log.error("[CleanOrderCreateEventHandler] 客流计数器重置异常: eventId={}, deviceId={}", + event.getEventId(), event.getTriggerDeviceId(), e); + } + } + /** * 生成工单标题 */