refactor(ops): optimize badge device status management and add sync job
This commit is contained in:
@@ -0,0 +1,188 @@
|
||||
package com.viewsh.module.ops.environment.integration.consumer;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.viewsh.module.ops.dal.dataobject.area.OpsAreaDeviceRelationDO;
|
||||
import com.viewsh.module.ops.enums.BadgeDeviceStatusEnum;
|
||||
import com.viewsh.module.ops.environment.integration.dto.IotDeviceStatusChangedEventDTO;
|
||||
import com.viewsh.module.ops.environment.service.badge.BadgeDeviceStatusService;
|
||||
import com.viewsh.module.ops.service.area.AreaDeviceService;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 工牌设备状态变更事件消费者
|
||||
* <p>
|
||||
* 监听 IoT 模块发布的设备状态变更事件(上线/离线)
|
||||
* <p>
|
||||
* 调用链路:
|
||||
*
|
||||
* <pre>
|
||||
* IoT 设备连接/断开
|
||||
* ↓
|
||||
* IotDeviceServiceImpl.updateDeviceState()
|
||||
* ↓
|
||||
* RocketMQ Topic: integration-device-status
|
||||
* ↓
|
||||
* BadgeDeviceStatusEventHandler.onMessage()
|
||||
* - 幂等性检查
|
||||
* - 判断是否为工牌设备
|
||||
* ↓
|
||||
* badgeDeviceStatusService.updateBadgeOnlineStatus()
|
||||
* - 创建/更新 Redis 状态记录
|
||||
* </pre>
|
||||
*
|
||||
* @author AI
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(topic = "integration-device-status",
|
||||
consumerGroup = "ops-badge-status-consumer",
|
||||
consumeMode = ConsumeMode.CONCURRENTLY,
|
||||
selectorExpression = "*")
|
||||
public class BadgeDeviceStatusEventHandler implements RocketMQListener<String> {
|
||||
|
||||
/**
|
||||
* 幂等性控制 Key 模式
|
||||
*/
|
||||
private static final String DEDUP_KEY_PATTERN = "ops:badge:dedup:status:%s";
|
||||
|
||||
/**
|
||||
* 幂等性控制 TTL(秒)
|
||||
*/
|
||||
private static final int DEDUP_TTL_SECONDS = 120;
|
||||
|
||||
@Resource
|
||||
private ObjectMapper objectMapper;
|
||||
|
||||
@Resource
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
@Resource
|
||||
private BadgeDeviceStatusService badgeDeviceStatusService;
|
||||
|
||||
@Resource
|
||||
private AreaDeviceService areaDeviceService;
|
||||
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
try {
|
||||
// 1. JSON 反序列化
|
||||
IotDeviceStatusChangedEventDTO event = objectMapper.readValue(message,
|
||||
IotDeviceStatusChangedEventDTO.class);
|
||||
|
||||
log.debug("[BadgeDeviceStatusEventHandler] 收到设备状态变更事件: eventId={}, deviceId={}, {} -> {}",
|
||||
event.getEventId(), event.getDeviceId(), event.getOldStatus(), event.getNewStatus());
|
||||
|
||||
// 2. 幂等性检查
|
||||
String dedupKey = String.format(DEDUP_KEY_PATTERN, event.getEventId());
|
||||
Boolean firstTime = stringRedisTemplate.opsForValue()
|
||||
.setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS);
|
||||
|
||||
if (Boolean.FALSE.equals(firstTime)) {
|
||||
log.debug("[BadgeDeviceStatusEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId());
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 判断是否为工牌设备
|
||||
if (!isBadgeDevice(event.getDeviceId())) {
|
||||
log.debug("[BadgeDeviceStatusEventHandler] 非工牌设备,跳过处理: deviceId={}", event.getDeviceId());
|
||||
return;
|
||||
}
|
||||
|
||||
// 4. 处理状态变更
|
||||
handleDeviceStatusChange(event);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[BadgeDeviceStatusEventHandler] 消息处理失败: message={}", message, e);
|
||||
// 不抛出异常,避免消息重试导致重复处理
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理设备状态变更
|
||||
*/
|
||||
private void handleDeviceStatusChange(IotDeviceStatusChangedEventDTO event) {
|
||||
Long deviceId = event.getDeviceId();
|
||||
String deviceCode = event.getDeviceName();
|
||||
|
||||
// 获取设备所属区域
|
||||
Long areaId = getAreaIdByDeviceId(deviceId);
|
||||
|
||||
if (event.isOnline()) {
|
||||
// 设备上线
|
||||
log.info("[BadgeDeviceStatusEventHandler] 工牌设备上线: deviceId={}, deviceCode={}, areaId={}",
|
||||
deviceId, deviceCode, areaId);
|
||||
|
||||
badgeDeviceStatusService.updateBadgeOnlineStatus(
|
||||
deviceId,
|
||||
deviceCode,
|
||||
areaId,
|
||||
BadgeDeviceStatusEnum.IDLE,
|
||||
"设备上线");
|
||||
|
||||
} else if (event.isOffline()) {
|
||||
// 设备离线
|
||||
log.info("[BadgeDeviceStatusEventHandler] 工牌设备离线: deviceId={}, deviceCode={}",
|
||||
deviceId, deviceCode);
|
||||
|
||||
badgeDeviceStatusService.updateBadgeOnlineStatus(
|
||||
deviceId,
|
||||
deviceCode,
|
||||
null,
|
||||
BadgeDeviceStatusEnum.OFFLINE,
|
||||
"设备离线");
|
||||
|
||||
} else {
|
||||
// 其他状态(如 INACTIVE),暂不处理
|
||||
log.debug("[BadgeDeviceStatusEventHandler] 忽略状态变更: deviceId={}, newStatus={}",
|
||||
deviceId, event.getNewStatus());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否为工牌设备
|
||||
* <p>
|
||||
* 通过查询 ops_area_device_relation 表判断设备是否为 BADGE 类型
|
||||
*/
|
||||
private boolean isBadgeDevice(Long deviceId) {
|
||||
if (deviceId == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
// 通过区域设备关系判断是否为工牌设备
|
||||
OpsAreaDeviceRelationDO relation = areaDeviceService.getByDeviceId(deviceId);
|
||||
return relation != null && "BADGE".equals(relation.getRelationType());
|
||||
} catch (Exception e) {
|
||||
log.warn("[BadgeDeviceStatusEventHandler] 查询设备类型失败: deviceId={}", deviceId, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取设备所属区域ID
|
||||
*/
|
||||
private Long getAreaIdByDeviceId(Long deviceId) {
|
||||
if (deviceId == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
OpsAreaDeviceRelationDO relation = areaDeviceService.getByDeviceId(deviceId);
|
||||
if (relation != null && "BADGE".equals(relation.getRelationType())) {
|
||||
return relation.getAreaId();
|
||||
}
|
||||
return null;
|
||||
} catch (Exception e) {
|
||||
log.warn("[BadgeDeviceStatusEventHandler] 获取设备区域失败: deviceId={}", deviceId, e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,104 @@
|
||||
package com.viewsh.module.ops.environment.integration.dto;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
/**
|
||||
* IoT 设备状态变更事件 DTO
|
||||
* <p>
|
||||
* 用于接收 IoT 模块发布的设备状态变更事件
|
||||
* 解耦 Ops 模块与 iot-core 的依赖
|
||||
*
|
||||
* @author AI
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Builder
|
||||
public class IotDeviceStatusChangedEventDTO {
|
||||
|
||||
/**
|
||||
* 事件ID(唯一标识,用于幂等性处理)
|
||||
*/
|
||||
private String eventId;
|
||||
|
||||
/**
|
||||
* 设备ID
|
||||
*/
|
||||
private Long deviceId;
|
||||
|
||||
/**
|
||||
* 设备名称(deviceName)
|
||||
*/
|
||||
private String deviceName;
|
||||
|
||||
/**
|
||||
* 产品ID
|
||||
*/
|
||||
private Long productId;
|
||||
|
||||
/**
|
||||
* 产品标识符(productKey)
|
||||
*/
|
||||
private String productKey;
|
||||
|
||||
/**
|
||||
* 租户ID
|
||||
*/
|
||||
private Long tenantId;
|
||||
|
||||
/**
|
||||
* 事件时间
|
||||
*/
|
||||
private LocalDateTime eventTime;
|
||||
|
||||
/**
|
||||
* 旧状态(0=INACTIVE, 1=ONLINE, 2=OFFLINE)
|
||||
*/
|
||||
private Integer oldStatus;
|
||||
|
||||
/**
|
||||
* 新状态(0=INACTIVE, 1=ONLINE, 2=OFFLINE)
|
||||
*/
|
||||
private Integer newStatus;
|
||||
|
||||
/**
|
||||
* 状态变更原因
|
||||
*/
|
||||
private String reason;
|
||||
|
||||
// ==================== 状态常量 ====================
|
||||
|
||||
/**
|
||||
* 设备状态:在线
|
||||
*/
|
||||
public static final Integer STATUS_ONLINE = 1;
|
||||
|
||||
/**
|
||||
* 设备状态:离线
|
||||
*/
|
||||
public static final Integer STATUS_OFFLINE = 2;
|
||||
|
||||
/**
|
||||
* 设备状态:未激活
|
||||
*/
|
||||
public static final Integer STATUS_INACTIVE = 0;
|
||||
|
||||
/**
|
||||
* 判断是否上线
|
||||
*/
|
||||
public boolean isOnline() {
|
||||
return STATUS_ONLINE.equals(newStatus);
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否离线
|
||||
*/
|
||||
public boolean isOffline() {
|
||||
return STATUS_OFFLINE.equals(newStatus);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,40 @@
|
||||
package com.viewsh.module.ops.environment.job;
|
||||
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.ApplicationArguments;
|
||||
import org.springframework.boot.ApplicationRunner;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 工牌设备状态启动初始化器
|
||||
* <p>
|
||||
* 职责:服务启动时,主动拉取所有工牌设备的当前状态,确保 Redis 数据正确
|
||||
* <p>
|
||||
* 解决场景:
|
||||
* - 服务重启期间的状态事件丢失
|
||||
* - 首次部署时的状态初始化
|
||||
*
|
||||
* @author AI
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class BadgeDeviceStatusInitializer implements ApplicationRunner {
|
||||
|
||||
@Resource
|
||||
private BadgeDeviceStatusSyncJob syncJob;
|
||||
|
||||
@Override
|
||||
public void run(ApplicationArguments args) {
|
||||
log.info("[初始化] 服务启动,开始同步工牌设备状态...");
|
||||
|
||||
try {
|
||||
// 复用定时对账逻辑
|
||||
BadgeDeviceStatusSyncJob.SyncResult result = syncJob.syncAllBadgeDeviceStatus();
|
||||
log.info("[初始化] 工牌设备状态同步完成:处理 {} 台,修正 {} 台,耗时 {} ms",
|
||||
result.syncCount(), result.correctedCount(), result.durationMs());
|
||||
} catch (Exception e) {
|
||||
log.error("[初始化] 工牌设备状态同步失败", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,192 @@
|
||||
package com.viewsh.module.ops.environment.job;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.viewsh.framework.common.pojo.CommonResult;
|
||||
import com.viewsh.framework.tenant.core.job.TenantJob;
|
||||
import com.viewsh.module.iot.api.device.IotDeviceStatusQueryApi;
|
||||
import com.viewsh.module.iot.api.device.dto.status.DeviceStatusRespDTO;
|
||||
import com.viewsh.module.ops.api.badge.BadgeDeviceStatusDTO;
|
||||
import com.viewsh.module.ops.dal.dataobject.area.OpsAreaDeviceRelationDO;
|
||||
import com.viewsh.module.ops.dal.mysql.area.OpsAreaDeviceRelationMapper;
|
||||
import com.viewsh.module.ops.enums.BadgeDeviceStatusEnum;
|
||||
import com.viewsh.module.ops.environment.integration.dto.IotDeviceStatusChangedEventDTO;
|
||||
import com.viewsh.module.ops.environment.service.badge.BadgeDeviceStatusService;
|
||||
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 工牌设备状态定时对账 Job
|
||||
* <p>
|
||||
* 职责:
|
||||
* 1. 定时从 IoT 模块查询所有工牌设备的当前状态
|
||||
* 2. 与 Ops 模块本地状态对比并修正
|
||||
* <p>
|
||||
* 场景:
|
||||
* - 服务重启后恢复状态
|
||||
* - 修正因消息丢失导致的状态不一致
|
||||
* - 发现新注册的工牌设备
|
||||
* <p>
|
||||
* XXL-Job 配置:
|
||||
* - JobHandler: badgeDeviceStatusSyncJob
|
||||
* - Cron: 0 0/5 * * * ? (每 5 分钟)
|
||||
*
|
||||
* @author AI
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class BadgeDeviceStatusSyncJob {
|
||||
|
||||
@Resource
|
||||
private IotDeviceStatusQueryApi iotDeviceStatusQueryApi;
|
||||
|
||||
@Resource
|
||||
private OpsAreaDeviceRelationMapper areaDeviceRelationMapper;
|
||||
|
||||
@Resource
|
||||
private BadgeDeviceStatusService badgeDeviceStatusService;
|
||||
|
||||
/**
|
||||
* 执行全量对账
|
||||
* <p>
|
||||
* XXL-Job 调度入口
|
||||
*
|
||||
* @return 执行结果
|
||||
*/
|
||||
@XxlJob("badgeDeviceStatusSyncJob")
|
||||
@TenantJob
|
||||
public String execute() {
|
||||
try {
|
||||
SyncResult result = syncAllBadgeDeviceStatus();
|
||||
return StrUtil.format("工牌状态对账完成: 处理 {} 台,修正 {} 台,耗时 {} ms",
|
||||
result.syncCount, result.correctedCount, result.durationMs);
|
||||
} catch (Exception e) {
|
||||
log.error("[SyncJob] 全量对账失败", e);
|
||||
return StrUtil.format("工牌状态对账失败: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行全量对账(返回结果)
|
||||
* <p>
|
||||
* 可由 XXL-Job 或 ApplicationRunner 调用
|
||||
*
|
||||
* @return 同步结果
|
||||
*/
|
||||
public SyncResult syncAllBadgeDeviceStatus() {
|
||||
log.info("[SyncJob] 开始全量对账工牌设备状态");
|
||||
long startTime = System.currentTimeMillis();
|
||||
int syncCount = 0;
|
||||
int correctedCount = 0;
|
||||
|
||||
// 1. 获取所有工牌设备关联关系
|
||||
List<OpsAreaDeviceRelationDO> badgeRelations = areaDeviceRelationMapper
|
||||
.selectListByAreaIdAndRelationType(null, "BADGE");
|
||||
|
||||
if (CollUtil.isEmpty(badgeRelations)) {
|
||||
log.info("[SyncJob] 无工牌设备,跳过对账");
|
||||
return new SyncResult(0, 0, System.currentTimeMillis() - startTime);
|
||||
}
|
||||
|
||||
List<Long> deviceIds = badgeRelations.stream()
|
||||
.map(OpsAreaDeviceRelationDO::getDeviceId)
|
||||
.distinct()
|
||||
.toList();
|
||||
|
||||
log.info("[SyncJob] 待对账工牌设备数量: {}", deviceIds.size());
|
||||
|
||||
// 2. 批量查询 IoT 设备状态
|
||||
CommonResult<List<DeviceStatusRespDTO>> iotResult = iotDeviceStatusQueryApi.batchGetDeviceStatus(deviceIds);
|
||||
if (!iotResult.isSuccess() || iotResult.getData() == null) {
|
||||
log.error("[SyncJob] 查询 IoT 设备状态失败: {}", iotResult.getMsg());
|
||||
throw new RuntimeException("查询 IoT 设备状态失败: " + iotResult.getMsg());
|
||||
}
|
||||
|
||||
// 3. 构建设备关系映射(deviceId -> areaId)
|
||||
Map<Long, Long> deviceAreaMap = badgeRelations.stream()
|
||||
.collect(Collectors.toMap(
|
||||
OpsAreaDeviceRelationDO::getDeviceId,
|
||||
OpsAreaDeviceRelationDO::getAreaId,
|
||||
(existing, replacement) -> existing));
|
||||
|
||||
// 4. 逐一对账并修正
|
||||
for (DeviceStatusRespDTO iotStatus : iotResult.getData()) {
|
||||
boolean corrected = syncSingleDevice(iotStatus, deviceAreaMap.get(iotStatus.getDeviceId()));
|
||||
syncCount++;
|
||||
if (corrected) {
|
||||
correctedCount++;
|
||||
}
|
||||
}
|
||||
|
||||
long duration = System.currentTimeMillis() - startTime;
|
||||
log.info("[SyncJob] 全量对账完成,共处理 {} 台设备,修正 {} 台,耗时 {} ms",
|
||||
syncCount, correctedCount, duration);
|
||||
|
||||
return new SyncResult(syncCount, correctedCount, duration);
|
||||
}
|
||||
|
||||
/**
|
||||
* 对账单个设备
|
||||
*
|
||||
* @param iotStatus IoT 设备状态
|
||||
* @param areaId 设备所属区域ID
|
||||
* @return 是否进行了修正
|
||||
*/
|
||||
private boolean syncSingleDevice(DeviceStatusRespDTO iotStatus, Long areaId) {
|
||||
Long deviceId = iotStatus.getDeviceId();
|
||||
|
||||
try {
|
||||
// 获取 Ops 当前状态
|
||||
BadgeDeviceStatusDTO opsStatus = badgeDeviceStatusService.getBadgeStatus(deviceId);
|
||||
|
||||
// 判断是否需要同步(使用本地常量)
|
||||
boolean iotOnline = IotDeviceStatusChangedEventDTO.STATUS_ONLINE.equals(iotStatus.getStatus());
|
||||
boolean opsOnline = opsStatus != null && opsStatus.getStatus() != null
|
||||
&& opsStatus.getStatus().isActive();
|
||||
|
||||
// 如果状态一致,无需修正
|
||||
if (iotOnline == opsOnline) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// 状态不一致,以 IoT 为准进行修正
|
||||
if (iotOnline) {
|
||||
badgeDeviceStatusService.updateBadgeOnlineStatus(
|
||||
deviceId,
|
||||
iotStatus.getDeviceCode(),
|
||||
areaId,
|
||||
BadgeDeviceStatusEnum.IDLE,
|
||||
"定时对账修正-上线");
|
||||
log.info("[SyncJob] 修正设备状态:deviceId={}, IoT=ONLINE, Ops={}",
|
||||
deviceId, opsOnline ? "ACTIVE" : "OFFLINE/NULL");
|
||||
} else {
|
||||
badgeDeviceStatusService.updateBadgeOnlineStatus(
|
||||
deviceId,
|
||||
iotStatus.getDeviceCode(),
|
||||
null,
|
||||
BadgeDeviceStatusEnum.OFFLINE,
|
||||
"定时对账修正-离线");
|
||||
log.info("[SyncJob] 修正设备状态:deviceId={}, IoT=OFFLINE, Ops=ACTIVE",
|
||||
deviceId);
|
||||
}
|
||||
|
||||
return true;
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[SyncJob] 对账设备失败:deviceId={}", deviceId, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 同步结果
|
||||
*/
|
||||
public record SyncResult(int syncCount, int correctedCount, long durationMs) {
|
||||
}
|
||||
}
|
||||
@@ -31,17 +31,17 @@ public interface BadgeDeviceStatusService {
|
||||
* <p>
|
||||
* 状态转换会记录状态变更时间和操作原因
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param status 目标状态
|
||||
* @param deviceId 设备ID
|
||||
* @param status 目标状态
|
||||
* @param operatorId 操作人ID(可为null,表示系统操作)
|
||||
* @param reason 状态变更原因
|
||||
* @param reason 状态变更原因
|
||||
*/
|
||||
void updateBadgeStatus(Long deviceId, BadgeDeviceStatusEnum status, Long operatorId, String reason);
|
||||
|
||||
/**
|
||||
* 批量更新工牌设备状态
|
||||
*
|
||||
* @param deviceIds 设备ID列表
|
||||
* @param deviceIds 设备ID列表
|
||||
* @param status 目标状态
|
||||
* @param operatorId 操作人ID
|
||||
* @param reason 状态变更原因
|
||||
@@ -98,9 +98,9 @@ public interface BadgeDeviceStatusService {
|
||||
* <p>
|
||||
* 更新最后心跳时间和电量:
|
||||
* <ul>
|
||||
* <li>如果设备之前为 OFFLINE,转为 IDLE</li>
|
||||
* <li>如果设备已存在,更新心跳时间和电量</li>
|
||||
* <li>如果设备不存在,创建新记录(状态为 IDLE)</li>
|
||||
* <li>如果设备之前为 OFFLINE,转为 IDLE</li>
|
||||
* <li>如果设备已存在,更新心跳时间和电量</li>
|
||||
* <li>如果设备不存在,创建新记录(状态为 IDLE)</li>
|
||||
* </ul>
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
@@ -119,7 +119,29 @@ public interface BadgeDeviceStatusService {
|
||||
* @param areaName 当前所在区域名称
|
||||
*/
|
||||
void handleHeartbeatWithArea(Long deviceId, String deviceCode, Integer batteryLevel,
|
||||
Long areaId, String areaName);
|
||||
Long areaId, String areaName);
|
||||
|
||||
/**
|
||||
* 更新工牌设备在线状态
|
||||
* <p>
|
||||
* 用于处理 IoT 设备状态变更事件和定时对账:
|
||||
* <ul>
|
||||
* <li>设备上线:创建或更新状态记录,状态设为 IDLE</li>
|
||||
* <li>设备离线:更新状态记录,状态设为 OFFLINE</li>
|
||||
* </ul>
|
||||
* <p>
|
||||
* 与 handleHeartbeat 区别:
|
||||
* - handleHeartbeat:处理位置上报心跳,包含电量等详细信息
|
||||
* - updateBadgeOnlineStatus:仅处理在线/离线状态变更
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param deviceCode 设备编码
|
||||
* @param areaId 区域ID(可为null)
|
||||
* @param status 目标状态(IDLE 或 OFFLINE)
|
||||
* @param reason 状态变更原因
|
||||
*/
|
||||
void updateBadgeOnlineStatus(Long deviceId, String deviceCode, Long areaId,
|
||||
BadgeDeviceStatusEnum status, String reason);
|
||||
|
||||
// ==================== 在线状态检查 ====================
|
||||
|
||||
@@ -163,11 +185,11 @@ public interface BadgeDeviceStatusService {
|
||||
* 一次性设置工单ID、工单状态、区域ID、信标MAC
|
||||
* 使用 Redis Hash 的 hset() 操作,保证原子性
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param orderId 工单ID
|
||||
* @param orderStatus 工单状态(DISPATCHED/ARRIVED/PAUSED)
|
||||
* @param areaId 区域ID
|
||||
* @param beaconMac 信标MAC地址(可为null)
|
||||
* @param deviceId 设备ID
|
||||
* @param orderId 工单ID
|
||||
* @param orderStatus 工单状态(DISPATCHED/ARRIVED/PAUSED)
|
||||
* @param areaId 区域ID
|
||||
* @param beaconMac 信标MAC地址(可为null)
|
||||
*/
|
||||
void setCurrentOrderInfo(Long deviceId, Long orderId, String orderStatus, Long areaId, String beaconMac);
|
||||
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
package com.viewsh.module.ops.environment.service.badge;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.viewsh.module.ops.api.badge.BadgeDeviceStatusDTO;
|
||||
import com.viewsh.module.ops.dal.dataobject.area.OpsAreaDeviceRelationDO;
|
||||
import com.viewsh.module.ops.enums.BadgeDeviceStatusEnum;
|
||||
import com.viewsh.module.ops.service.area.AreaDeviceService;
|
||||
import jakarta.annotation.Resource;
|
||||
@@ -29,7 +27,9 @@ import java.util.stream.Collectors;
|
||||
* 4. 心跳超时检查
|
||||
* <p>
|
||||
* 设计说明:
|
||||
* - 状态变更事件由 {@link com.viewsh.module.ops.environment.integration.listener.BadgeDeviceStatusEventListener} 处理
|
||||
* - 状态变更事件由
|
||||
* {@link com.viewsh.module.ops.environment.integration.listener.BadgeDeviceStatusEventListener}
|
||||
* 处理
|
||||
* - 本类只提供基础的服务方法
|
||||
*
|
||||
* @author lzh
|
||||
@@ -41,9 +41,6 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
@Resource
|
||||
private RedisTemplate<String, Object> redisTemplate;
|
||||
|
||||
@Resource
|
||||
private ObjectMapper objectMapper;
|
||||
|
||||
@Resource
|
||||
private AreaDeviceService areaDeviceService;
|
||||
|
||||
@@ -119,7 +116,8 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
}
|
||||
|
||||
@Override
|
||||
public void batchUpdateBadgeStatus(List<Long> deviceIds, BadgeDeviceStatusEnum status, Long operatorId, String reason) {
|
||||
public void batchUpdateBadgeStatus(List<Long> deviceIds, BadgeDeviceStatusEnum status, Long operatorId,
|
||||
String reason) {
|
||||
if (deviceIds == null || deviceIds.isEmpty() || status == null) {
|
||||
return;
|
||||
}
|
||||
@@ -256,7 +254,7 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
|
||||
@Override
|
||||
public void handleHeartbeatWithArea(Long deviceId, String deviceCode, Integer batteryLevel,
|
||||
Long areaId, String areaName) {
|
||||
Long areaId, String areaName) {
|
||||
if (deviceId == null) {
|
||||
return;
|
||||
}
|
||||
@@ -315,6 +313,75 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateBadgeOnlineStatus(Long deviceId, String deviceCode, Long areaId,
|
||||
BadgeDeviceStatusEnum status, String reason) {
|
||||
if (deviceId == null || status == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
String key = BADGE_STATUS_KEY_PREFIX + deviceId;
|
||||
Long now = System.currentTimeMillis();
|
||||
|
||||
// 获取当前状态
|
||||
Map<Object, Object> currentMap = redisTemplate.opsForHash().entries(key);
|
||||
|
||||
// 构建状态数据
|
||||
Map<String, Object> statusMap = new HashMap<>();
|
||||
statusMap.put("deviceId", deviceId);
|
||||
statusMap.put("deviceCode", deviceCode != null ? deviceCode : currentMap.get("deviceCode"));
|
||||
statusMap.put("status", status.getCode());
|
||||
statusMap.put("statusChangeTime", LocalDateTime.now().toString());
|
||||
statusMap.put("statusChangeReason", reason);
|
||||
|
||||
// 如果是上线(非 OFFLINE 状态),更新心跳时间
|
||||
if (status.isActive()) {
|
||||
statusMap.put("lastHeartbeatTime", now);
|
||||
}
|
||||
|
||||
// 更新区域信息
|
||||
if (areaId != null) {
|
||||
statusMap.put("currentAreaId", areaId);
|
||||
// 更新区域索引
|
||||
addToAreaIndex(deviceId, areaId);
|
||||
} else {
|
||||
// 保持现有区域信息
|
||||
Object existingAreaId = currentMap.get("currentAreaId");
|
||||
if (existingAreaId != null) {
|
||||
statusMap.put("currentAreaId", existingAreaId);
|
||||
}
|
||||
Object existingAreaName = currentMap.get("currentAreaName");
|
||||
if (existingAreaName != null) {
|
||||
statusMap.put("currentAreaName", existingAreaName);
|
||||
}
|
||||
}
|
||||
|
||||
// 保持当前工单相关字段(如果存在)
|
||||
if (currentMap.containsKey("currentOpsOrderId")) {
|
||||
statusMap.put("currentOpsOrderId", currentMap.get("currentOpsOrderId"));
|
||||
}
|
||||
if (currentMap.containsKey("currentOrderStatus")) {
|
||||
statusMap.put("currentOrderStatus", currentMap.get("currentOrderStatus"));
|
||||
}
|
||||
if (currentMap.containsKey("beaconMac")) {
|
||||
statusMap.put("beaconMac", currentMap.get("beaconMac"));
|
||||
}
|
||||
if (currentMap.containsKey("batteryLevel")) {
|
||||
statusMap.put("batteryLevel", currentMap.get("batteryLevel"));
|
||||
}
|
||||
|
||||
redisTemplate.opsForHash().putAll(key, statusMap);
|
||||
redisTemplate.expire(key, STATUS_EXPIRE_HOURS, TimeUnit.HOURS);
|
||||
|
||||
log.info("[updateBadgeOnlineStatus] 更新工牌在线状态: deviceId={}, deviceCode={}, status={}, reason={}",
|
||||
deviceId, deviceCode, status, reason);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[updateBadgeOnlineStatus] 更新工牌在线状态失败: deviceId={}", deviceId, e);
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== 在线状态检查 ====================
|
||||
|
||||
@Override
|
||||
|
||||
@@ -47,6 +47,13 @@
|
||||
<artifactId>viewsh-spring-boot-starter-biz-tenant</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- 定时任务 XXL-Job -->
|
||||
<dependency>
|
||||
<groupId>com.viewsh</groupId>
|
||||
<artifactId>viewsh-spring-boot-starter-job</artifactId>
|
||||
</dependency>
|
||||
|
||||
|
||||
<!-- 依赖 IoT API 模块:获取设备消息类、RPC 接口和枚举 -->
|
||||
<dependency>
|
||||
<groupId>com.viewsh</groupId>
|
||||
|
||||
Reference in New Issue
Block a user