From 5142b38d12b32f982cfbb878ba8c7cce13be41a5 Mon Sep 17 00:00:00 2001 From: lzh Date: Thu, 29 Jan 2026 11:35:11 +0800 Subject: [PATCH] refactor(ops): optimize badge device status management and add sync job --- .../api/device/IotDeviceStatusQueryApi.java | 9 +- .../device/IotDeviceStatusQueryApiImpl.java | 47 ++++- .../BadgeDeviceStatusEventHandler.java | 188 +++++++++++++++++ .../dto/IotDeviceStatusChangedEventDTO.java | 104 ++++++++++ .../job/BadgeDeviceStatusInitializer.java | 40 ++++ .../job/BadgeDeviceStatusSyncJob.java | 192 ++++++++++++++++++ .../badge/BadgeDeviceStatusService.java | 48 +++-- .../badge/BadgeDeviceStatusServiceImpl.java | 83 +++++++- .../viewsh-module-ops-biz/pom.xml | 7 + 9 files changed, 690 insertions(+), 28 deletions(-) create mode 100644 viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/BadgeDeviceStatusEventHandler.java create mode 100644 viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/IotDeviceStatusChangedEventDTO.java create mode 100644 viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/job/BadgeDeviceStatusInitializer.java create mode 100644 viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/job/BadgeDeviceStatusSyncJob.java diff --git a/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/device/IotDeviceStatusQueryApi.java b/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/device/IotDeviceStatusQueryApi.java index 1b9429f..6c2c663 100644 --- a/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/device/IotDeviceStatusQueryApi.java +++ b/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/device/IotDeviceStatusQueryApi.java @@ -10,8 +10,10 @@ import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; +import java.util.List; + /** - * IoT 设备状态��询 API + * IoT 设备状态查询 API *

* 提供 RPC 接口供其他模块(如 Ops 模块)查询设备状态 * @@ -33,4 +35,9 @@ public interface IotDeviceStatusQueryApi { @Parameter(name = "deviceId", description = "设备ID", required = true, example = "1") CommonResult getDeviceStatus(@RequestParam("deviceId") Long deviceId); + @GetMapping(PREFIX + "/batch-get-status") + @Operation(summary = "批量获取设备状态") + @Parameter(name = "deviceIds", description = "设备ID列表", required = true, example = "[1,2,3]") + CommonResult> batchGetDeviceStatus(@RequestParam("deviceIds") List deviceIds); + } diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/api/device/IotDeviceStatusQueryApiImpl.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/api/device/IotDeviceStatusQueryApiImpl.java index 52d9be2..672fb41 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/api/device/IotDeviceStatusQueryApiImpl.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/api/device/IotDeviceStatusQueryApiImpl.java @@ -14,6 +14,8 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import java.util.List; + import static com.viewsh.framework.common.pojo.CommonResult.success; /** @@ -54,13 +56,46 @@ public class IotDeviceStatusQueryApiImpl implements IotDeviceStatusQueryApi { .status(IotDeviceStateEnum.OFFLINE.getState()) .build()); } - return success(DeviceStatusRespDTO.builder() + return success(buildDeviceStatusRespDTO(device)); + } + + @Override + @GetMapping(PREFIX + "/batch-get-status") + @Operation(summary = "批量获取设备状态") + public CommonResult> batchGetDeviceStatus( + @RequestParam("deviceIds") List deviceIds) { + if (deviceIds == null || deviceIds.isEmpty()) { + return success(List.of()); + } + + List result = deviceIds.stream() + .map(deviceId -> { + IotDeviceDO device = deviceService.getDeviceFromCache(deviceId); + if (device == null) { + return DeviceStatusRespDTO.builder() + .deviceId(deviceId) + .status(IotDeviceStateEnum.OFFLINE.getState()) + .build(); + } + return buildDeviceStatusRespDTO(device); + }) + .toList(); + + return success(result); + } + + /** + * 构建设备状态响应 DTO + */ + private DeviceStatusRespDTO buildDeviceStatusRespDTO(IotDeviceDO device) { + return DeviceStatusRespDTO.builder() .deviceId(device.getId()) - .deviceCode(device.getSerialNumber()) - .status(device.getState()) - .statusChangeTime(IotDeviceStateEnum.ONLINE.getState().equals(device.getState()) ? - device.getOnlineTime() : device.getOfflineTime()) - .build()); + .deviceCode(device.getSerialNumber()) + .status(device.getState()) + .statusChangeTime( + IotDeviceStateEnum.ONLINE.getState().equals(device.getState()) ? device.getOnlineTime() + : device.getOfflineTime()) + .build(); } } diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/BadgeDeviceStatusEventHandler.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/BadgeDeviceStatusEventHandler.java new file mode 100644 index 0000000..26a4492 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/BadgeDeviceStatusEventHandler.java @@ -0,0 +1,188 @@ +package com.viewsh.module.ops.environment.integration.consumer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.module.ops.dal.dataobject.area.OpsAreaDeviceRelationDO; +import com.viewsh.module.ops.enums.BadgeDeviceStatusEnum; +import com.viewsh.module.ops.environment.integration.dto.IotDeviceStatusChangedEventDTO; +import com.viewsh.module.ops.environment.service.badge.BadgeDeviceStatusService; +import com.viewsh.module.ops.service.area.AreaDeviceService; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.ConsumeMode; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +import java.util.concurrent.TimeUnit; + +/** + * 工牌设备状态变更事件消费者 + *

+ * 监听 IoT 模块发布的设备状态变更事件(上线/离线) + *

+ * 调用链路: + * + *

+ * IoT 设备连接/断开
+ *     ↓
+ * IotDeviceServiceImpl.updateDeviceState()
+ *     ↓
+ * RocketMQ Topic: integration-device-status
+ *     ↓
+ * BadgeDeviceStatusEventHandler.onMessage()
+ *     - 幂等性检查
+ *     - 判断是否为工牌设备
+ *     ↓
+ * badgeDeviceStatusService.updateBadgeOnlineStatus()
+ *     - 创建/更新 Redis 状态记录
+ * 
+ * + * @author AI + */ +@Slf4j +@Component +@RocketMQMessageListener(topic = "integration-device-status", + consumerGroup = "ops-badge-status-consumer", + consumeMode = ConsumeMode.CONCURRENTLY, + selectorExpression = "*") +public class BadgeDeviceStatusEventHandler implements RocketMQListener { + + /** + * 幂等性控制 Key 模式 + */ + private static final String DEDUP_KEY_PATTERN = "ops:badge:dedup:status:%s"; + + /** + * 幂等性控制 TTL(秒) + */ + private static final int DEDUP_TTL_SECONDS = 120; + + @Resource + private ObjectMapper objectMapper; + + @Resource + private StringRedisTemplate stringRedisTemplate; + + @Resource + private BadgeDeviceStatusService badgeDeviceStatusService; + + @Resource + private AreaDeviceService areaDeviceService; + + @Override + public void onMessage(String message) { + try { + // 1. JSON 反序列化 + IotDeviceStatusChangedEventDTO event = objectMapper.readValue(message, + IotDeviceStatusChangedEventDTO.class); + + log.debug("[BadgeDeviceStatusEventHandler] 收到设备状态变更事件: eventId={}, deviceId={}, {} -> {}", + event.getEventId(), event.getDeviceId(), event.getOldStatus(), event.getNewStatus()); + + // 2. 幂等性检查 + String dedupKey = String.format(DEDUP_KEY_PATTERN, event.getEventId()); + Boolean firstTime = stringRedisTemplate.opsForValue() + .setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS); + + if (Boolean.FALSE.equals(firstTime)) { + log.debug("[BadgeDeviceStatusEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId()); + return; + } + + // 3. 判断是否为工牌设备 + if (!isBadgeDevice(event.getDeviceId())) { + log.debug("[BadgeDeviceStatusEventHandler] 非工牌设备,跳过处理: deviceId={}", event.getDeviceId()); + return; + } + + // 4. 处理状态变更 + handleDeviceStatusChange(event); + + } catch (Exception e) { + log.error("[BadgeDeviceStatusEventHandler] 消息处理失败: message={}", message, e); + // 不抛出异常,避免消息重试导致重复处理 + } + } + + /** + * 处理设备状态变更 + */ + private void handleDeviceStatusChange(IotDeviceStatusChangedEventDTO event) { + Long deviceId = event.getDeviceId(); + String deviceCode = event.getDeviceName(); + + // 获取设备所属区域 + Long areaId = getAreaIdByDeviceId(deviceId); + + if (event.isOnline()) { + // 设备上线 + log.info("[BadgeDeviceStatusEventHandler] 工牌设备上线: deviceId={}, deviceCode={}, areaId={}", + deviceId, deviceCode, areaId); + + badgeDeviceStatusService.updateBadgeOnlineStatus( + deviceId, + deviceCode, + areaId, + BadgeDeviceStatusEnum.IDLE, + "设备上线"); + + } else if (event.isOffline()) { + // 设备离线 + log.info("[BadgeDeviceStatusEventHandler] 工牌设备离线: deviceId={}, deviceCode={}", + deviceId, deviceCode); + + badgeDeviceStatusService.updateBadgeOnlineStatus( + deviceId, + deviceCode, + null, + BadgeDeviceStatusEnum.OFFLINE, + "设备离线"); + + } else { + // 其他状态(如 INACTIVE),暂不处理 + log.debug("[BadgeDeviceStatusEventHandler] 忽略状态变更: deviceId={}, newStatus={}", + deviceId, event.getNewStatus()); + } + } + + /** + * 判断是否为工牌设备 + *

+ * 通过查询 ops_area_device_relation 表判断设备是否为 BADGE 类型 + */ + private boolean isBadgeDevice(Long deviceId) { + if (deviceId == null) { + return false; + } + + try { + // 通过区域设备关系判断是否为工牌设备 + OpsAreaDeviceRelationDO relation = areaDeviceService.getByDeviceId(deviceId); + return relation != null && "BADGE".equals(relation.getRelationType()); + } catch (Exception e) { + log.warn("[BadgeDeviceStatusEventHandler] 查询设备类型失败: deviceId={}", deviceId, e); + return false; + } + } + + /** + * 获取设备所属区域ID + */ + private Long getAreaIdByDeviceId(Long deviceId) { + if (deviceId == null) { + return null; + } + + try { + OpsAreaDeviceRelationDO relation = areaDeviceService.getByDeviceId(deviceId); + if (relation != null && "BADGE".equals(relation.getRelationType())) { + return relation.getAreaId(); + } + return null; + } catch (Exception e) { + log.warn("[BadgeDeviceStatusEventHandler] 获取设备区域失败: deviceId={}", deviceId, e); + return null; + } + } +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/IotDeviceStatusChangedEventDTO.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/IotDeviceStatusChangedEventDTO.java new file mode 100644 index 0000000..f049df0 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/IotDeviceStatusChangedEventDTO.java @@ -0,0 +1,104 @@ +package com.viewsh.module.ops.environment.integration.dto; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; + +/** + * IoT 设备状态变更事件 DTO + *

+ * 用于接收 IoT 模块发布的设备状态变更事件 + * 解耦 Ops 模块与 iot-core 的依赖 + * + * @author AI + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class IotDeviceStatusChangedEventDTO { + + /** + * 事件ID(唯一标识,用于幂等性处理) + */ + private String eventId; + + /** + * 设备ID + */ + private Long deviceId; + + /** + * 设备名称(deviceName) + */ + private String deviceName; + + /** + * 产品ID + */ + private Long productId; + + /** + * 产品标识符(productKey) + */ + private String productKey; + + /** + * 租户ID + */ + private Long tenantId; + + /** + * 事件时间 + */ + private LocalDateTime eventTime; + + /** + * 旧状态(0=INACTIVE, 1=ONLINE, 2=OFFLINE) + */ + private Integer oldStatus; + + /** + * 新状态(0=INACTIVE, 1=ONLINE, 2=OFFLINE) + */ + private Integer newStatus; + + /** + * 状态变更原因 + */ + private String reason; + + // ==================== 状态常量 ==================== + + /** + * 设备状态:在线 + */ + public static final Integer STATUS_ONLINE = 1; + + /** + * 设备状态:离线 + */ + public static final Integer STATUS_OFFLINE = 2; + + /** + * 设备状态:未激活 + */ + public static final Integer STATUS_INACTIVE = 0; + + /** + * 判断是否上线 + */ + public boolean isOnline() { + return STATUS_ONLINE.equals(newStatus); + } + + /** + * 判断是否离线 + */ + public boolean isOffline() { + return STATUS_OFFLINE.equals(newStatus); + } +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/job/BadgeDeviceStatusInitializer.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/job/BadgeDeviceStatusInitializer.java new file mode 100644 index 0000000..01c2d5d --- /dev/null +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/job/BadgeDeviceStatusInitializer.java @@ -0,0 +1,40 @@ +package com.viewsh.module.ops.environment.job; + +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; + +/** + * 工牌设备状态启动初始化器 + *

+ * 职责:服务启动时,主动拉取所有工牌设备的当前状态,确保 Redis 数据正确 + *

+ * 解决场景: + * - 服务重启期间的状态事件丢失 + * - 首次部署时的状态初始化 + * + * @author AI + */ +@Slf4j +@Component +public class BadgeDeviceStatusInitializer implements ApplicationRunner { + + @Resource + private BadgeDeviceStatusSyncJob syncJob; + + @Override + public void run(ApplicationArguments args) { + log.info("[初始化] 服务启动,开始同步工牌设备状态..."); + + try { + // 复用定时对账逻辑 + BadgeDeviceStatusSyncJob.SyncResult result = syncJob.syncAllBadgeDeviceStatus(); + log.info("[初始化] 工牌设备状态同步完成:处理 {} 台,修正 {} 台,耗时 {} ms", + result.syncCount(), result.correctedCount(), result.durationMs()); + } catch (Exception e) { + log.error("[初始化] 工牌设备状态同步失败", e); + } + } +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/job/BadgeDeviceStatusSyncJob.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/job/BadgeDeviceStatusSyncJob.java new file mode 100644 index 0000000..c3eaf35 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/job/BadgeDeviceStatusSyncJob.java @@ -0,0 +1,192 @@ +package com.viewsh.module.ops.environment.job; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.StrUtil; +import com.viewsh.framework.common.pojo.CommonResult; +import com.viewsh.framework.tenant.core.job.TenantJob; +import com.viewsh.module.iot.api.device.IotDeviceStatusQueryApi; +import com.viewsh.module.iot.api.device.dto.status.DeviceStatusRespDTO; +import com.viewsh.module.ops.api.badge.BadgeDeviceStatusDTO; +import com.viewsh.module.ops.dal.dataobject.area.OpsAreaDeviceRelationDO; +import com.viewsh.module.ops.dal.mysql.area.OpsAreaDeviceRelationMapper; +import com.viewsh.module.ops.enums.BadgeDeviceStatusEnum; +import com.viewsh.module.ops.environment.integration.dto.IotDeviceStatusChangedEventDTO; +import com.viewsh.module.ops.environment.service.badge.BadgeDeviceStatusService; +import com.xxl.job.core.handler.annotation.XxlJob; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * 工牌设备状态定时对账 Job + *

+ * 职责: + * 1. 定时从 IoT 模块查询所有工牌设备的当前状态 + * 2. 与 Ops 模块本地状态对比并修正 + *

+ * 场景: + * - 服务重启后恢复状态 + * - 修正因消息丢失导致的状态不一致 + * - 发现新注册的工牌设备 + *

+ * XXL-Job 配置: + * - JobHandler: badgeDeviceStatusSyncJob + * - Cron: 0 0/5 * * * ? (每 5 分钟) + * + * @author AI + */ +@Slf4j +@Component +public class BadgeDeviceStatusSyncJob { + + @Resource + private IotDeviceStatusQueryApi iotDeviceStatusQueryApi; + + @Resource + private OpsAreaDeviceRelationMapper areaDeviceRelationMapper; + + @Resource + private BadgeDeviceStatusService badgeDeviceStatusService; + + /** + * 执行全量对账 + *

+ * XXL-Job 调度入口 + * + * @return 执行结果 + */ + @XxlJob("badgeDeviceStatusSyncJob") + @TenantJob + public String execute() { + try { + SyncResult result = syncAllBadgeDeviceStatus(); + return StrUtil.format("工牌状态对账完成: 处理 {} 台,修正 {} 台,耗时 {} ms", + result.syncCount, result.correctedCount, result.durationMs); + } catch (Exception e) { + log.error("[SyncJob] 全量对账失败", e); + return StrUtil.format("工牌状态对账失败: {}", e.getMessage()); + } + } + + /** + * 执行全量对账(返回结果) + *

+ * 可由 XXL-Job 或 ApplicationRunner 调用 + * + * @return 同步结果 + */ + public SyncResult syncAllBadgeDeviceStatus() { + log.info("[SyncJob] 开始全量对账工牌设备状态"); + long startTime = System.currentTimeMillis(); + int syncCount = 0; + int correctedCount = 0; + + // 1. 获取所有工牌设备关联关系 + List badgeRelations = areaDeviceRelationMapper + .selectListByAreaIdAndRelationType(null, "BADGE"); + + if (CollUtil.isEmpty(badgeRelations)) { + log.info("[SyncJob] 无工牌设备,跳过对账"); + return new SyncResult(0, 0, System.currentTimeMillis() - startTime); + } + + List deviceIds = badgeRelations.stream() + .map(OpsAreaDeviceRelationDO::getDeviceId) + .distinct() + .toList(); + + log.info("[SyncJob] 待对账工牌设备数量: {}", deviceIds.size()); + + // 2. 批量查询 IoT 设备状态 + CommonResult> iotResult = iotDeviceStatusQueryApi.batchGetDeviceStatus(deviceIds); + if (!iotResult.isSuccess() || iotResult.getData() == null) { + log.error("[SyncJob] 查询 IoT 设备状态失败: {}", iotResult.getMsg()); + throw new RuntimeException("查询 IoT 设备状态失败: " + iotResult.getMsg()); + } + + // 3. 构建设备关系映射(deviceId -> areaId) + Map deviceAreaMap = badgeRelations.stream() + .collect(Collectors.toMap( + OpsAreaDeviceRelationDO::getDeviceId, + OpsAreaDeviceRelationDO::getAreaId, + (existing, replacement) -> existing)); + + // 4. 逐一对账并修正 + for (DeviceStatusRespDTO iotStatus : iotResult.getData()) { + boolean corrected = syncSingleDevice(iotStatus, deviceAreaMap.get(iotStatus.getDeviceId())); + syncCount++; + if (corrected) { + correctedCount++; + } + } + + long duration = System.currentTimeMillis() - startTime; + log.info("[SyncJob] 全量对账完成,共处理 {} 台设备,修正 {} 台,耗时 {} ms", + syncCount, correctedCount, duration); + + return new SyncResult(syncCount, correctedCount, duration); + } + + /** + * 对账单个设备 + * + * @param iotStatus IoT 设备状态 + * @param areaId 设备所属区域ID + * @return 是否进行了修正 + */ + private boolean syncSingleDevice(DeviceStatusRespDTO iotStatus, Long areaId) { + Long deviceId = iotStatus.getDeviceId(); + + try { + // 获取 Ops 当前状态 + BadgeDeviceStatusDTO opsStatus = badgeDeviceStatusService.getBadgeStatus(deviceId); + + // 判断是否需要同步(使用本地常量) + boolean iotOnline = IotDeviceStatusChangedEventDTO.STATUS_ONLINE.equals(iotStatus.getStatus()); + boolean opsOnline = opsStatus != null && opsStatus.getStatus() != null + && opsStatus.getStatus().isActive(); + + // 如果状态一致,无需修正 + if (iotOnline == opsOnline) { + return false; + } + + // 状态不一致,以 IoT 为准进行修正 + if (iotOnline) { + badgeDeviceStatusService.updateBadgeOnlineStatus( + deviceId, + iotStatus.getDeviceCode(), + areaId, + BadgeDeviceStatusEnum.IDLE, + "定时对账修正-上线"); + log.info("[SyncJob] 修正设备状态:deviceId={}, IoT=ONLINE, Ops={}", + deviceId, opsOnline ? "ACTIVE" : "OFFLINE/NULL"); + } else { + badgeDeviceStatusService.updateBadgeOnlineStatus( + deviceId, + iotStatus.getDeviceCode(), + null, + BadgeDeviceStatusEnum.OFFLINE, + "定时对账修正-离线"); + log.info("[SyncJob] 修正设备状态:deviceId={}, IoT=OFFLINE, Ops=ACTIVE", + deviceId); + } + + return true; + + } catch (Exception e) { + log.error("[SyncJob] 对账设备失败:deviceId={}", deviceId, e); + return false; + } + } + + /** + * 同步结果 + */ + public record SyncResult(int syncCount, int correctedCount, long durationMs) { + } +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/badge/BadgeDeviceStatusService.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/badge/BadgeDeviceStatusService.java index b25eb52..ad2c298 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/badge/BadgeDeviceStatusService.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/badge/BadgeDeviceStatusService.java @@ -31,17 +31,17 @@ public interface BadgeDeviceStatusService { *

* 状态转换会记录状态变更时间和操作原因 * - * @param deviceId 设备ID - * @param status 目标状态 + * @param deviceId 设备ID + * @param status 目标状态 * @param operatorId 操作人ID(可为null,表示系统操作) - * @param reason 状态变更原因 + * @param reason 状态变更原因 */ void updateBadgeStatus(Long deviceId, BadgeDeviceStatusEnum status, Long operatorId, String reason); /** * 批量更新工牌设备状态 * - * @param deviceIds 设备ID列表 + * @param deviceIds 设备ID列表 * @param status 目标状态 * @param operatorId 操作人ID * @param reason 状态变更原因 @@ -98,9 +98,9 @@ public interface BadgeDeviceStatusService { *

* 更新最后心跳时间和电量: *

* * @param deviceId 设备ID @@ -119,7 +119,29 @@ public interface BadgeDeviceStatusService { * @param areaName 当前所在区域名称 */ void handleHeartbeatWithArea(Long deviceId, String deviceCode, Integer batteryLevel, - Long areaId, String areaName); + Long areaId, String areaName); + + /** + * 更新工牌设备在线状态 + *

+ * 用于处理 IoT 设备状态变更事件和定时对账: + *

+ *

+ * 与 handleHeartbeat 区别: + * - handleHeartbeat:处理位置上报心跳,包含电量等详细信息 + * - updateBadgeOnlineStatus:仅处理在线/离线状态变更 + * + * @param deviceId 设备ID + * @param deviceCode 设备编码 + * @param areaId 区域ID(可为null) + * @param status 目标状态(IDLE 或 OFFLINE) + * @param reason 状态变更原因 + */ + void updateBadgeOnlineStatus(Long deviceId, String deviceCode, Long areaId, + BadgeDeviceStatusEnum status, String reason); // ==================== 在线状态检查 ==================== @@ -163,11 +185,11 @@ public interface BadgeDeviceStatusService { * 一次性设置工单ID、工单状态、区域ID、信标MAC * 使用 Redis Hash 的 hset() 操作,保证原子性 * - * @param deviceId 设备ID - * @param orderId 工单ID - * @param orderStatus 工单状态(DISPATCHED/ARRIVED/PAUSED) - * @param areaId 区域ID - * @param beaconMac 信标MAC地址(可为null) + * @param deviceId 设备ID + * @param orderId 工单ID + * @param orderStatus 工单状态(DISPATCHED/ARRIVED/PAUSED) + * @param areaId 区域ID + * @param beaconMac 信标MAC地址(可为null) */ void setCurrentOrderInfo(Long deviceId, Long orderId, String orderStatus, Long areaId, String beaconMac); diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/badge/BadgeDeviceStatusServiceImpl.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/badge/BadgeDeviceStatusServiceImpl.java index b219dc3..e209d19 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/badge/BadgeDeviceStatusServiceImpl.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/badge/BadgeDeviceStatusServiceImpl.java @@ -1,8 +1,6 @@ package com.viewsh.module.ops.environment.service.badge; -import com.fasterxml.jackson.databind.ObjectMapper; import com.viewsh.module.ops.api.badge.BadgeDeviceStatusDTO; -import com.viewsh.module.ops.dal.dataobject.area.OpsAreaDeviceRelationDO; import com.viewsh.module.ops.enums.BadgeDeviceStatusEnum; import com.viewsh.module.ops.service.area.AreaDeviceService; import jakarta.annotation.Resource; @@ -29,7 +27,9 @@ import java.util.stream.Collectors; * 4. 心跳超时检查 *

* 设计说明: - * - 状态变更事件由 {@link com.viewsh.module.ops.environment.integration.listener.BadgeDeviceStatusEventListener} 处理 + * - 状态变更事件由 + * {@link com.viewsh.module.ops.environment.integration.listener.BadgeDeviceStatusEventListener} + * 处理 * - 本类只提供基础的服务方法 * * @author lzh @@ -41,9 +41,6 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I @Resource private RedisTemplate redisTemplate; - @Resource - private ObjectMapper objectMapper; - @Resource private AreaDeviceService areaDeviceService; @@ -119,7 +116,8 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I } @Override - public void batchUpdateBadgeStatus(List deviceIds, BadgeDeviceStatusEnum status, Long operatorId, String reason) { + public void batchUpdateBadgeStatus(List deviceIds, BadgeDeviceStatusEnum status, Long operatorId, + String reason) { if (deviceIds == null || deviceIds.isEmpty() || status == null) { return; } @@ -256,7 +254,7 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I @Override public void handleHeartbeatWithArea(Long deviceId, String deviceCode, Integer batteryLevel, - Long areaId, String areaName) { + Long areaId, String areaName) { if (deviceId == null) { return; } @@ -315,6 +313,75 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I } } + @Override + public void updateBadgeOnlineStatus(Long deviceId, String deviceCode, Long areaId, + BadgeDeviceStatusEnum status, String reason) { + if (deviceId == null || status == null) { + return; + } + + try { + String key = BADGE_STATUS_KEY_PREFIX + deviceId; + Long now = System.currentTimeMillis(); + + // 获取当前状态 + Map currentMap = redisTemplate.opsForHash().entries(key); + + // 构建状态数据 + Map statusMap = new HashMap<>(); + statusMap.put("deviceId", deviceId); + statusMap.put("deviceCode", deviceCode != null ? deviceCode : currentMap.get("deviceCode")); + statusMap.put("status", status.getCode()); + statusMap.put("statusChangeTime", LocalDateTime.now().toString()); + statusMap.put("statusChangeReason", reason); + + // 如果是上线(非 OFFLINE 状态),更新心跳时间 + if (status.isActive()) { + statusMap.put("lastHeartbeatTime", now); + } + + // 更新区域信息 + if (areaId != null) { + statusMap.put("currentAreaId", areaId); + // 更新区域索引 + addToAreaIndex(deviceId, areaId); + } else { + // 保持现有区域信息 + Object existingAreaId = currentMap.get("currentAreaId"); + if (existingAreaId != null) { + statusMap.put("currentAreaId", existingAreaId); + } + Object existingAreaName = currentMap.get("currentAreaName"); + if (existingAreaName != null) { + statusMap.put("currentAreaName", existingAreaName); + } + } + + // 保持当前工单相关字段(如果存在) + if (currentMap.containsKey("currentOpsOrderId")) { + statusMap.put("currentOpsOrderId", currentMap.get("currentOpsOrderId")); + } + if (currentMap.containsKey("currentOrderStatus")) { + statusMap.put("currentOrderStatus", currentMap.get("currentOrderStatus")); + } + if (currentMap.containsKey("beaconMac")) { + statusMap.put("beaconMac", currentMap.get("beaconMac")); + } + if (currentMap.containsKey("batteryLevel")) { + statusMap.put("batteryLevel", currentMap.get("batteryLevel")); + } + + redisTemplate.opsForHash().putAll(key, statusMap); + redisTemplate.expire(key, STATUS_EXPIRE_HOURS, TimeUnit.HOURS); + + log.info("[updateBadgeOnlineStatus] 更新工牌在线状态: deviceId={}, deviceCode={}, status={}, reason={}", + deviceId, deviceCode, status, reason); + + } catch (Exception e) { + log.error("[updateBadgeOnlineStatus] 更新工牌在线状态失败: deviceId={}", deviceId, e); + } + } + // ==================== 在线状态检查 ==================== @Override diff --git a/viewsh-module-ops/viewsh-module-ops-biz/pom.xml b/viewsh-module-ops/viewsh-module-ops-biz/pom.xml index ba0d6fd..52a7248 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/pom.xml +++ b/viewsh-module-ops/viewsh-module-ops-biz/pom.xml @@ -47,6 +47,13 @@ viewsh-spring-boot-starter-biz-tenant + + + com.viewsh + viewsh-spring-boot-starter-job + + + com.viewsh