diff --git a/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/device/IotDeviceStatusQueryApi.java b/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/device/IotDeviceStatusQueryApi.java index 1b9429f..6c2c663 100644 --- a/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/device/IotDeviceStatusQueryApi.java +++ b/viewsh-module-iot/viewsh-module-iot-api/src/main/java/com/viewsh/module/iot/api/device/IotDeviceStatusQueryApi.java @@ -10,8 +10,10 @@ import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; +import java.util.List; + /** - * IoT 设备状态��询 API + * IoT 设备状态查询 API *
* 提供 RPC 接口供其他模块(如 Ops 模块)查询设备状态
*
@@ -33,4 +35,9 @@ public interface IotDeviceStatusQueryApi {
@Parameter(name = "deviceId", description = "设备ID", required = true, example = "1")
CommonResult
+ * 监听 IoT 模块发布的设备状态变更事件(上线/离线)
+ *
+ * 调用链路:
+ *
+ *
+ * 通过查询 ops_area_device_relation 表判断设备是否为 BADGE 类型
+ */
+ private boolean isBadgeDevice(Long deviceId) {
+ if (deviceId == null) {
+ return false;
+ }
+
+ try {
+ // 通过区域设备关系判断是否为工牌设备
+ OpsAreaDeviceRelationDO relation = areaDeviceService.getByDeviceId(deviceId);
+ return relation != null && "BADGE".equals(relation.getRelationType());
+ } catch (Exception e) {
+ log.warn("[BadgeDeviceStatusEventHandler] 查询设备类型失败: deviceId={}", deviceId, e);
+ return false;
+ }
+ }
+
+ /**
+ * 获取设备所属区域ID
+ */
+ private Long getAreaIdByDeviceId(Long deviceId) {
+ if (deviceId == null) {
+ return null;
+ }
+
+ try {
+ OpsAreaDeviceRelationDO relation = areaDeviceService.getByDeviceId(deviceId);
+ if (relation != null && "BADGE".equals(relation.getRelationType())) {
+ return relation.getAreaId();
+ }
+ return null;
+ } catch (Exception e) {
+ log.warn("[BadgeDeviceStatusEventHandler] 获取设备区域失败: deviceId={}", deviceId, e);
+ return null;
+ }
+ }
+}
diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/IotDeviceStatusChangedEventDTO.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/IotDeviceStatusChangedEventDTO.java
new file mode 100644
index 0000000..f049df0
--- /dev/null
+++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/IotDeviceStatusChangedEventDTO.java
@@ -0,0 +1,104 @@
+package com.viewsh.module.ops.environment.integration.dto;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.LocalDateTime;
+
+/**
+ * IoT 设备状态变更事件 DTO
+ *
+ * 用于接收 IoT 模块发布的设备状态变更事件
+ * 解耦 Ops 模块与 iot-core 的依赖
+ *
+ * @author AI
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+public class IotDeviceStatusChangedEventDTO {
+
+ /**
+ * 事件ID(唯一标识,用于幂等性处理)
+ */
+ private String eventId;
+
+ /**
+ * 设备ID
+ */
+ private Long deviceId;
+
+ /**
+ * 设备名称(deviceName)
+ */
+ private String deviceName;
+
+ /**
+ * 产品ID
+ */
+ private Long productId;
+
+ /**
+ * 产品标识符(productKey)
+ */
+ private String productKey;
+
+ /**
+ * 租户ID
+ */
+ private Long tenantId;
+
+ /**
+ * 事件时间
+ */
+ private LocalDateTime eventTime;
+
+ /**
+ * 旧状态(0=INACTIVE, 1=ONLINE, 2=OFFLINE)
+ */
+ private Integer oldStatus;
+
+ /**
+ * 新状态(0=INACTIVE, 1=ONLINE, 2=OFFLINE)
+ */
+ private Integer newStatus;
+
+ /**
+ * 状态变更原因
+ */
+ private String reason;
+
+ // ==================== 状态常量 ====================
+
+ /**
+ * 设备状态:在线
+ */
+ public static final Integer STATUS_ONLINE = 1;
+
+ /**
+ * 设备状态:离线
+ */
+ public static final Integer STATUS_OFFLINE = 2;
+
+ /**
+ * 设备状态:未激活
+ */
+ public static final Integer STATUS_INACTIVE = 0;
+
+ /**
+ * 判断是否上线
+ */
+ public boolean isOnline() {
+ return STATUS_ONLINE.equals(newStatus);
+ }
+
+ /**
+ * 判断是否离线
+ */
+ public boolean isOffline() {
+ return STATUS_OFFLINE.equals(newStatus);
+ }
+}
diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/job/BadgeDeviceStatusInitializer.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/job/BadgeDeviceStatusInitializer.java
new file mode 100644
index 0000000..01c2d5d
--- /dev/null
+++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/job/BadgeDeviceStatusInitializer.java
@@ -0,0 +1,40 @@
+package com.viewsh.module.ops.environment.job;
+
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.stereotype.Component;
+
+/**
+ * 工牌设备状态启动初始化器
+ *
+ * 职责:服务启动时,主动拉取所有工牌设备的当前状态,确保 Redis 数据正确
+ *
+ * 解决场景:
+ * - 服务重启期间的状态事件丢失
+ * - 首次部署时的状态初始化
+ *
+ * @author AI
+ */
+@Slf4j
+@Component
+public class BadgeDeviceStatusInitializer implements ApplicationRunner {
+
+ @Resource
+ private BadgeDeviceStatusSyncJob syncJob;
+
+ @Override
+ public void run(ApplicationArguments args) {
+ log.info("[初始化] 服务启动,开始同步工牌设备状态...");
+
+ try {
+ // 复用定时对账逻辑
+ BadgeDeviceStatusSyncJob.SyncResult result = syncJob.syncAllBadgeDeviceStatus();
+ log.info("[初始化] 工牌设备状态同步完成:处理 {} 台,修正 {} 台,耗时 {} ms",
+ result.syncCount(), result.correctedCount(), result.durationMs());
+ } catch (Exception e) {
+ log.error("[初始化] 工牌设备状态同步失败", e);
+ }
+ }
+}
diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/job/BadgeDeviceStatusSyncJob.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/job/BadgeDeviceStatusSyncJob.java
new file mode 100644
index 0000000..c3eaf35
--- /dev/null
+++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/job/BadgeDeviceStatusSyncJob.java
@@ -0,0 +1,192 @@
+package com.viewsh.module.ops.environment.job;
+
+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.util.StrUtil;
+import com.viewsh.framework.common.pojo.CommonResult;
+import com.viewsh.framework.tenant.core.job.TenantJob;
+import com.viewsh.module.iot.api.device.IotDeviceStatusQueryApi;
+import com.viewsh.module.iot.api.device.dto.status.DeviceStatusRespDTO;
+import com.viewsh.module.ops.api.badge.BadgeDeviceStatusDTO;
+import com.viewsh.module.ops.dal.dataobject.area.OpsAreaDeviceRelationDO;
+import com.viewsh.module.ops.dal.mysql.area.OpsAreaDeviceRelationMapper;
+import com.viewsh.module.ops.enums.BadgeDeviceStatusEnum;
+import com.viewsh.module.ops.environment.integration.dto.IotDeviceStatusChangedEventDTO;
+import com.viewsh.module.ops.environment.service.badge.BadgeDeviceStatusService;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * 工牌设备状态定时对账 Job
+ *
+ * 职责:
+ * 1. 定时从 IoT 模块查询所有工牌设备的当前状态
+ * 2. 与 Ops 模块本地状态对比并修正
+ *
+ * 场景:
+ * - 服务重启后恢复状态
+ * - 修正因消息丢失导致的状态不一致
+ * - 发现新注册的工牌设备
+ *
+ * XXL-Job 配置:
+ * - JobHandler: badgeDeviceStatusSyncJob
+ * - Cron: 0 0/5 * * * ? (每 5 分钟)
+ *
+ * @author AI
+ */
+@Slf4j
+@Component
+public class BadgeDeviceStatusSyncJob {
+
+ @Resource
+ private IotDeviceStatusQueryApi iotDeviceStatusQueryApi;
+
+ @Resource
+ private OpsAreaDeviceRelationMapper areaDeviceRelationMapper;
+
+ @Resource
+ private BadgeDeviceStatusService badgeDeviceStatusService;
+
+ /**
+ * 执行全量对账
+ *
+ * XXL-Job 调度入口
+ *
+ * @return 执行结果
+ */
+ @XxlJob("badgeDeviceStatusSyncJob")
+ @TenantJob
+ public String execute() {
+ try {
+ SyncResult result = syncAllBadgeDeviceStatus();
+ return StrUtil.format("工牌状态对账完成: 处理 {} 台,修正 {} 台,耗时 {} ms",
+ result.syncCount, result.correctedCount, result.durationMs);
+ } catch (Exception e) {
+ log.error("[SyncJob] 全量对账失败", e);
+ return StrUtil.format("工牌状态对账失败: {}", e.getMessage());
+ }
+ }
+
+ /**
+ * 执行全量对账(返回结果)
+ *
+ * 可由 XXL-Job 或 ApplicationRunner 调用
+ *
+ * @return 同步结果
+ */
+ public SyncResult syncAllBadgeDeviceStatus() {
+ log.info("[SyncJob] 开始全量对账工牌设备状态");
+ long startTime = System.currentTimeMillis();
+ int syncCount = 0;
+ int correctedCount = 0;
+
+ // 1. 获取所有工牌设备关联关系
+ List
* 状态转换会记录状态变更时间和操作原因
*
- * @param deviceId 设备ID
- * @param status 目标状态
+ * @param deviceId 设备ID
+ * @param status 目标状态
* @param operatorId 操作人ID(可为null,表示系统操作)
- * @param reason 状态变更原因
+ * @param reason 状态变更原因
*/
void updateBadgeStatus(Long deviceId, BadgeDeviceStatusEnum status, Long operatorId, String reason);
/**
* 批量更新工牌设备状态
*
- * @param deviceIds 设备ID列表
+ * @param deviceIds 设备ID列表
* @param status 目标状态
* @param operatorId 操作人ID
* @param reason 状态变更原因
@@ -98,9 +98,9 @@ public interface BadgeDeviceStatusService {
*
* 更新最后心跳时间和电量:
*
+ * 用于处理 IoT 设备状态变更事件和定时对账:
+ *
+ * 与 handleHeartbeat 区别:
+ * - handleHeartbeat:处理位置上报心跳,包含电量等详细信息
+ * - updateBadgeOnlineStatus:仅处理在线/离线状态变更
+ *
+ * @param deviceId 设备ID
+ * @param deviceCode 设备编码
+ * @param areaId 区域ID(可为null)
+ * @param status 目标状态(IDLE 或 OFFLINE)
+ * @param reason 状态变更原因
+ */
+ void updateBadgeOnlineStatus(Long deviceId, String deviceCode, Long areaId,
+ BadgeDeviceStatusEnum status, String reason);
// ==================== 在线状态检查 ====================
@@ -163,11 +185,11 @@ public interface BadgeDeviceStatusService {
* 一次性设置工单ID、工单状态、区域ID、信标MAC
* 使用 Redis Hash 的 hset() 操作,保证原子性
*
- * @param deviceId 设备ID
- * @param orderId 工单ID
- * @param orderStatus 工单状态(DISPATCHED/ARRIVED/PAUSED)
- * @param areaId 区域ID
- * @param beaconMac 信标MAC地址(可为null)
+ * @param deviceId 设备ID
+ * @param orderId 工单ID
+ * @param orderStatus 工单状态(DISPATCHED/ARRIVED/PAUSED)
+ * @param areaId 区域ID
+ * @param beaconMac 信标MAC地址(可为null)
*/
void setCurrentOrderInfo(Long deviceId, Long orderId, String orderStatus, Long areaId, String beaconMac);
diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/badge/BadgeDeviceStatusServiceImpl.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/badge/BadgeDeviceStatusServiceImpl.java
index b219dc3..e209d19 100644
--- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/badge/BadgeDeviceStatusServiceImpl.java
+++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/badge/BadgeDeviceStatusServiceImpl.java
@@ -1,8 +1,6 @@
package com.viewsh.module.ops.environment.service.badge;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.viewsh.module.ops.api.badge.BadgeDeviceStatusDTO;
-import com.viewsh.module.ops.dal.dataobject.area.OpsAreaDeviceRelationDO;
import com.viewsh.module.ops.enums.BadgeDeviceStatusEnum;
import com.viewsh.module.ops.service.area.AreaDeviceService;
import jakarta.annotation.Resource;
@@ -29,7 +27,9 @@ import java.util.stream.Collectors;
* 4. 心跳超时检查
*
* 设计说明:
- * - 状态变更事件由 {@link com.viewsh.module.ops.environment.integration.listener.BadgeDeviceStatusEventListener} 处理
+ * - 状态变更事件由
+ * {@link com.viewsh.module.ops.environment.integration.listener.BadgeDeviceStatusEventListener}
+ * 处理
* - 本类只提供基础的服务方法
*
* @author lzh
@@ -41,9 +41,6 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
@Resource
private RedisTemplate> batchGetDeviceStatus(@RequestParam("deviceIds") List
> batchGetDeviceStatus(
+ @RequestParam("deviceIds") List
+ * IoT 设备连接/断开
+ * ↓
+ * IotDeviceServiceImpl.updateDeviceState()
+ * ↓
+ * RocketMQ Topic: integration-device-status
+ * ↓
+ * BadgeDeviceStatusEventHandler.onMessage()
+ * - 幂等性检查
+ * - 判断是否为工牌设备
+ * ↓
+ * badgeDeviceStatusService.updateBadgeOnlineStatus()
+ * - 创建/更新 Redis 状态记录
+ *
+ *
+ * @author AI
+ */
+@Slf4j
+@Component
+@RocketMQMessageListener(topic = "integration-device-status",
+ consumerGroup = "ops-badge-status-consumer",
+ consumeMode = ConsumeMode.CONCURRENTLY,
+ selectorExpression = "*")
+public class BadgeDeviceStatusEventHandler implements RocketMQListener> iotResult = iotDeviceStatusQueryApi.batchGetDeviceStatus(deviceIds);
+ if (!iotResult.isSuccess() || iotResult.getData() == null) {
+ log.error("[SyncJob] 查询 IoT 设备状态失败: {}", iotResult.getMsg());
+ throw new RuntimeException("查询 IoT 设备状态失败: " + iotResult.getMsg());
+ }
+
+ // 3. 构建设备关系映射(deviceId -> areaId)
+ Map
- *
*
* @param deviceId 设备ID
@@ -119,7 +119,29 @@ public interface BadgeDeviceStatusService {
* @param areaName 当前所在区域名称
*/
void handleHeartbeatWithArea(Long deviceId, String deviceCode, Integer batteryLevel,
- Long areaId, String areaName);
+ Long areaId, String areaName);
+
+ /**
+ * 更新工牌设备在线状态
+ *
+ *
+ *